type
status
date
slug
summary
tags
category
icon
password
Logstash介绍
Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。输入源 → logstash → 输出
输入:采集各种样式、大小和来源的数据。数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
过滤器:实时解析和转换数据。数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。
输出:选择你的存储,导出你的数据。尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
Logstash 和 Filebeat 的比较:
Logstash 功能更加丰富,除了能够处理各种日志格式外,还支持将非 JSON 格式的日志转换为 JSON 格式,并支持多目标输出。与 Filebeat 相比,Logstash 拥有更强大的过滤和转换功能,适用于日志的复杂加工处理,处理完成后可以将结果交给 Elasticsearch。
Filebeat 主要负责轻量级的数据采集,适合直接将日志转发到 Elasticsearch 或 Logstash,功能上相对简单。Logstash 的资源消耗较高,因此不适合在每个日志主机上部署,而更适合在集中式的日志处理环境中使用。
部署Logstash
注:ElasticSearch+Logstash+Kibana 三个版本号必须保持一致
解压安装包
修改 Logstash 配置
创建目录
内存优化,我们这里测试演示,设置的为1g
如果是以rpm安装的Logstash,默认使用logstash用户运行,如果logstash需要收集本机的日志,可能会有权限问题,可以修改为root。我们这里是源码部署,此配置忽略
logstash命令常用选项
-e:指定配置内容
-f:指定配置文件,支持绝对路径,如果用相对路径,是相对于/usr/share/logstash/的路径
-t:语法检查
-r:修改配置文件后自动加载生效,注意:有时候修改配置还需要重新启动生效
首先,让我们通过最基本的Logstash管道来测试一下刚刚安装的Logstash。Logstash管道有两个必需的元素
输入和输出,以及一个可选元素过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。测试是否可以启动,从控制台输入输出输出如下内容,表示启动成功
在终端输入 hello logstash 进行测试

输入Json格式数据
{ "name":"zhangsan","age": "18","WeChat":"897215783"},不自动解析
指定输入信息为Json格式,
codec => json 指定为json格式输入 codec => rubydebug 指输出格式,是默认值,可以省略输入Json格式数据
{ "name":"zhangsan","age": "18","WeChat":"897215783"},自动解析
输入非Json格式信息,告警提示无法自动解析,存放message字段

标准输入输出
在 Logstash 中,stdin 插件用于从标准输入(终端)接收数据,而 stdout 插件则用于将处理后的数据输出到终端屏幕,通常用于测试与调试配置。示例配置
检查语法是否正常
启动 Logstash 服务
在终端输入 hello logstash 进行测试

收集系统日志并输出至文件和ES
Logstash 支持多输出,我们将收集系统日志并输出至文件和ES,前提是需要 logstash 用户对被收集的日志文件有读权限,并对要写入的文件有写权限。Logstash 会记录每个文件的读取位置,下次自动从此位置继续向后读取,每个文件的读取位置记录在
/var/lib/logstash/plugins/inputs/file/.sincedb_xxxx对应的文件中,此文件包括文件的inode号,大小等信息HTTP数据采集输出到文件
Logstash可以通过TCP端口采集数据信息,在logstash目录下修改配置文件内容如下
启动Logstash
发现监听了8888端口
然后我们发送一条数据,返回ok
查看输出的文件

我们再来发送一条json格式的数据,首先先将/tmp/test.log文件内容清空
查看输出的文件

插件filter
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便进行更强大的分析和实现商业价值。Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响,常见的 Filter 插件:
1、利用 Grok 从非结构化数据中转化为结构数据
2、利用 GEOIP 根据 IP 地址找出对应的地理位置坐标
3、利用 useragent 从请求中分析操作系统、设备类型
Grok 插件
Grok 是一个过滤器插件,可帮助您描述日志格式的结构。有超过200种 grok 模式抽象概念,如IPv6地址、UNIX路径和月份名称。为了将日志行与格式匹配,生产环境常需要将非结构化的数据解析成 json 结构化数据格式。比如下面行:
使用 Grok 插件可以基于正则表达式技术,利用其内置的正则表达式的别名来表示和匹配上面的日志,如下
以
%{NUMBER:response_code}举例,NUMBER 是 Grok 内置的正则表达式别名,用于匹配数字([0-9]+),在这个例子中,它会匹配日志中的 200。response_code 是你自定义的字段名(键名),用于把匹配到的值提取出来,并赋值给这个键。最终转换为以下格式利用kibana可以将如上日志自动生成grok的内置格式代码,点击开发工具 → Grok Debugger

现在我们使用 grok 插件将 Nginx 日志格式化为 json 格式,每3秒采集一次,输出到屏幕终端进行调试
检查配置
启动logstash
访问nginx,查看终端输出发现数据正常输出

修改配置文件,输出到ES
启动logstash
访问nginx,在kibana中查看

Geoip 插件
GeoIP 是一种通过 IP 地址解析出其所属地理位置信息的技术。它可以提取出包括经纬度、国家、城市等地域信息,便于在日志分析、访问统计、风控审计等场景中进行地理分布分析和可视化展示。
现在我们使用 Geoip 插件分析 Nginx 日志,获取地域信息
检查配置
启动logstash
访问nginx,日志内容如下
查看终端输出

只保留解析的指定字段

Date 插件
在 Logstash 中,默认生成的 @timestamp 字段表示的是日志进入 Logstash 的时间,而不是日志本身的产生时间。如果你希望用日志中自带的时间来替代 @timestamp,或者写入到其他字段,就可以使用 date 插件。date 插件:将字符串格式的时间解析为真正的时间戳。
当你的日志中包含时间字段,比如:
14/Jul/2020:15:07:27 +0800。这个时间是日志实际产生的时间,但它是一个字符串格式,Elasticsearch 无法直接识别。我们可以使用 date 插件将其转换为标准时间格式。配置示例:match:指定源字段和其时间格式,格式必须和日志中的时间字符串一一对应。
target:可选,表示解析结果写入哪个字段。默认是 @timestamp
timezone:可选,指定解析时间时使用的时区,默认是 UTC
部分时间格式对照表
示例时间 | 格式 |
14/Jul/2020:15:07:27 +0800 | dd/MMM/yyyy:HH:mm:ss Z |
2020-07-14 15:07:27 | yyyy-MM-dd HH:mm:ss |
2020-07-14T15:07:27.000Z | ISO8601(可直接使用) |
useragent插件
Logstash 提供的 useragent 插件可以从 User-Agent 字符串中提取出浏览器名称、版本、操作系统、设备类型等信息,方便我们对访问来源做更细粒度的分析,比如区分 PC / 移动设备、不同浏览器等。使用示例:
启动logstash
用 curl 提交日志
经过插件解析后,你会得到如下结构

如果你的日志是结构化 JSON,比如 nginx 的日志格式中已经提取了 user-agent 到 user_agent.original,可以用:
source => "[user_agent][original]"。解析后可以用来统计:浏览器种类分布、操作系统分布等。mutate 插件
mutate 插件是 Logstash 中最常用的过滤插件之一,主要用于对事件中的字段进行增删改查等操作。通过它可以方便地调整字段的值、格式或类型,适用于大多数数据清洗场景。以下是 mutate 常用的函数说明:
remove_field:删除一个或多个字段
add_field:添加一个新字段,或为已有字段赋新值,支持使用变量
rename:修改字段名称
convert:转换字段的数据类型,支持:integer,float,string,boolean 等
split:将字符串按指定分隔符切割为数组(类似 awk 取列)
gsub:使用正则表达式进行字符串替换
lowercase:将字符串转为小写
split 切割字符
mutate 中的 split 字符串切割,指定字符做为分隔符,切割的结果用于生成新的列表元素
启动logstash
用 curl 提交日志
观察结果

add_field 添加字段
用指定源字段添加新的字段,添加完成后源字段还存在
启动logstash
用 curl 提交日志
观察结果

remove_field 删除字段
启动logstash
用 curl 提交日志
观察结果

convert 类型转换
mutate 中的 convert 可以实现数据类型的转换。 支持转换integer、float、string等类型
启动logstash
用 curl 提交日志
gsub 替换
gsub 实现字符串的替换
条件判断
在 Logstash 的 filter 阶段,有时我们只想在 满足某些条件 时,才对事件做进一步处理。比如添加字段、重命名、分流等。这时就可以用
if ... else if ... else ... 的语法来实现条件判断。基本语法格式:比如我们想根据 path 字段中日志文件的路径,判断这是哪类日志,并添加一个 type 字段用于后续统计:
如果 path 字段中包含 access,我们认为是访问日志,打上标签 access_log。如果是 error,认为是错误日志。否则归为 other_log 类别。更多条件判断方式:
[status] == "200":状态码等于 200
[method] != "GET":方法不是 GET
"error" in [message]:message 中包含 "error"
[path] =~ /access/:path 字段匹配 access 关键字multiline 插件收集 Java 错误日志
在 Java 应用日志中,遇到异常时通常会输出多行错误堆栈信息,如 Exception 和 stack trace等。这些多行实际上属于同一条日志,为了便于 Elasticsearch 分析,需要将它们合并成一条完整的日志事件。Logstash 提供了 codec => multiline 插件来实现这个功能。它可以通过正则匹配,灵活地将多行日志合并为一条,常配合 what => "previous" 或 what => "next" 来控制合并方向。
假设我们有如下数据,需要将如下的数据做一个多行展示。129904 的作为一行展示,129905 的异常堆栈数据作为一行展示

从上方简化的日志可知,我们的日志每行都是以一个数字开头,那么可以认为以数字开头的行是一个单独的行,以非数字开头的行是隶属于上一行的,即是一个多行的。编写配置文件,输出到屏幕终端进行调试
检查配置,测试语法是否正确
启动logstash
java报错日志参考如下
查看输出结果

测试没问题后,将输出改为 Elasticsearch
启动logstash
Kibana 界面添加索引

Kibana 界面查看数据

- 链接:www.qianshuai.cn/article/logstash
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。











