16.logstash格式化日志
1. grok语法¶
%{SYNTAX:SEMANTIC}
#SYNTAX:代表匹配值的类型,如:3.44用NUMBER类型匹配,127.0.0.1用IP类型匹配
#SEMANTIC:代表存储该值的一个变量名称,如:3.44可以用%{NUMBER:duration}来匹配,127.0.0.1可以用%{IP:client}来匹配。
# 更多grok用法见官网
# https://www.elastic.co/cn/blog/do-you-grok-grok
2. 分析日志信息¶
#第1条,需要删除
"message" => "# Time: 2021-12-10T07:17:17.100835Z"
#第2条,需要提取出各条信息!
"message" => "# User@Host: root[root] @ localhost [] Id: 6\n# Query_time: 5.759042 Lock_time: 0.000231 Rows_sent: 1 Rows_examined: 19999999\nSET timestamp=1639120637;\nselect * from db1.t1 where id=8;"
1. 删除time行¶
#Delete the lines like: "message" => "# Time: 181105 4:31:16"
#When a message matches "# Time: ", attach the tag "drop"
grok {
match => { "message" => "# Time: " }
add_tag => [ "drop" ]
tag_on_failure => []
}
#Discard every event whose tags contain "drop" (the "# Time:" header lines)
if "drop" in [tags] {
drop {}
}
2. 提取有用信息¶
# Parse one multi-line MySQL slow-log entry into structured fields.
# (?m) lets the pattern span the newlines inside the multiline message.
# NOTE(review): the original pattern was line-wrapped by the terminal,
# splitting tokens mid-word ("Rows_s/ent", "%{D/ATA"), which made it invalid;
# it is rejoined onto a single line here. "# Query_time" keeps its space to
# match the sample log line "# Query_time: 5.759042".
grok {
  match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
}
3. 正则匹配分析(失败!)¶
"message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+#Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)"
# 日志信息
"message" => "# User@Host: root[root] @ localhost [] Id: 6\n# Query_time: 5.759042 Lock_time: 0.000231 Rows_sent: 1 Rows_examined: 19999999\nSET timestamp=1639120637;\nselect * from db1.t1 where id=8;"
4.¶
"message",
"(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\]
(?:\s*Id: %{NUMBER:id:int})?
\s+#Query_time: %{NUMBER:query_time:float}
\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)"
3. 完整的grok语句¶
filter {
  # Parse the multi-line MySQL slow-log entry into structured fields:
  # user, host, client ip, connection id, timings, row counts, optional
  # database, and the query text (first word captured as "action").
  # Fixes vs. original: "@ (?:" needs the space (log reads "root[root] @ localhost")
  # and "# Query_time" needs the space (log reads "# Query_time: 5.759042").
  grok {
    match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
  }
  # Delete the lines like: "message" => "# Time: 181105 4:31:16"
  # When a message matches "# Time: ", attach the tag "drop"...
  grok {
    match => { "message" => "# Time: " }
    add_tag => [ "drop" ]
    tag_on_failure => []
  }
  # ...and discard every event that carries it.
  if "drop" in [tags] {
    drop {}
  }
  # Convert the slow-log epoch into @timestamp.
  # NOTE(review): the grok above stores the epoch in the "timestamp" field,
  # not "mysql.slowlog.timestamp", so the original date filter never matched.
  date {
    match => ["timestamp", "UNIX"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  ruby {
    code => "event.set('[@metadata][today]',Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
  }
  # The raw message is no longer needed once it has been parsed.
  mutate {
    remove_field => [ "message" ]
  }
}
4. 修改logstash配置文件¶
vim /etc/logstash/conf.d/logstash-to-elasticsearch.conf
# 在input和output之间添加filter!
filter {
  # Parse the multi-line MySQL slow-log entry into structured fields.
  # Fix vs. original: "# Query_time" needs the space — the log line reads
  # "# Query_time: 5.759042", so "#Query_time" could never match.
  grok {
    match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
  }
  # Tag the "# Time: ..." header lines...
  grok {
    match => { "message" => "# Time: " }
    add_tag => [ "drop" ]
    tag_on_failure => []
  }
  # ...and discard them.
  if "drop" in [tags] {
    drop {}
  }
  # Convert the epoch captured by grok into @timestamp.
  # NOTE(review): grok stores it in "timestamp", not "mysql.slowlog.timestamp";
  # the original field reference never matched anything.
  date {
    match => ["timestamp", "UNIX"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  ruby {
    code => "event.set('[@metadata][today]',Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
  }
  # The raw message is no longer needed once it has been parsed.
  mutate {
    remove_field => [ "message" ]
  }
}
完整配置文件如下¶
1 # Sample Logstash configuration for creating a simple
2 # Beats -> Logstash -> Elasticsearch pipeline.
3
4 input {
5 beats {
6 port => 5044
7 }
8 }
9
10
11 filter {
12 grok {
13 match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+#Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
14 }
15 grok {
16 match => { "message" => "# Time: " }
17 add_tag => [ "drop" ]
18 tag_on_failure => []
19 }
20 if "drop" in [tags] {
21 drop {}
22 }
23 date {
24 match => ["mysql.slowlog.timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
25 target => "@timestamp"
26 timezone => "Asia/Shanghai"
27 }
28 ruby {
29 code => "event.set('[@metadata][today]',Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
30 }
31 mutate {
32 remove_field => [ "message" ]
33 }
34 }
35
36
37
38 output {
39 elasticsearch {
40 hosts => ["http://localhost:9200"]
41 index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
42 #user => "elastic"
43 #password => "changeme"
44 }
45
46 stdout {
47 codec => rubydebug
48 }
49
50 }
5. 重新启动logstash测试¶
# 窗口1,ctrl+c取消后,再次执行如下语句
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-elasticsearch.conf
6. 生成慢日志查看¶
# 窗口2
mysql -uroot -p123456
select * from db1.t1 where id=8;
7. 窗口1:查看现在的logstash日志¶
# 发现已经变成json格式了!且只有一条日志信息!!!
[WARN ] 2022-01-01 23:27:49.454 [defaultEventExecutorGroup-4-1] plain - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
{
"input" => {
"type" => "log"
},
"ecs" => {
"version" => "1.12.0"
},
"container" => {
"id" => "web-slow.log"
},
"@version" => "1",
"tags" => [
[0] "beats_input_codec_plain_applied",
[1] "_grokparsefailure"
],
"@timestamp" => 2022-01-01T15:27:46.790Z,
"log" => {
"offset" => 1140,
"file" => {
"path" => "/usr/local/mysql/data/web-slow.log"
},
"flags" => [
[0] "multiline"
]
},
"agent" => {
"hostname" => "4c16g",
"type" => "filebeat",
"id" => "edca8f85-bc52-4365-b94b-bdf17eb8209f",
"name" => "4c16g",
"ephemeral_id" => "3568ddcd-f4a5-499f-84f4-1feac879b4b9",
"version" => "7.16.2"
},
"host" => {
"mac" => [
[ 0] "52:54:00:3c:12:5f",
[ 1] "02:42:e8:51:6b:3d",
[ 2] "02:42:f0:96:22:00",
[ 3] "02:42:ae:33:d3:59",
[ 4] "42:d0:00:10:12:eb",
[ 5] "ba:2e:bd:27:16:c5",
[ 6] "0e:9b:05:8b:48:de",
[ 7] "ee:ee:ee:ee:ee:ee",
[ 8] "ee:ee:ee:ee:ee:ee",
[ 9] "f6:7c:4a:e2:03:7c",
[10] "ee:ee:ee:ee:ee:ee",
[11] "f6:09:3a:34:31:e5",
[12] "ee:ee:ee:ee:ee:ee",
[13] "ee:ee:ee:ee:ee:ee",
[14] "aa:c6:05:0a:f9:c8",
[15] "ce:6b:23:9b:4a:c2",
[16] "72:6b:1a:93:c0:9c",
[17] "fa:e0:ab:c5:95:20",
[18] "02:da:89:5b:24:91",
[19] "66:2c:c8:3b:be:dc",
[20] "4a:1d:16:20:02:66"
],
"hostname" => "4c16g",
"os" => {
"type" => "linux",
"kernel" => "5.4.0-90-generic",
"name" => "Ubuntu",
"codename" => "focal",
"family" => "debian",
"platform" => "ubuntu",
"version" => "20.04 LTS (Focal Fossa)"
},
"id" => "b3856b2bce5c47ab962ede7e592b054c",
"ip" => [
[ 0] "10.0.16.15",
[ 1] "fe80::5054:ff:fe3c:125f",
[ 2] "172.17.0.1",
[ 3] "fe80::42:e8ff:fe51:6b3d",
[ 4] "10.0.8.5",
[ 5] "10.244.75.128",
[ 6] "192.168.250.1",
[ 7] "fe80::42:f0ff:fe96:2200",
[ 8] "172.18.0.1",
[ 9] "fe80::42:aeff:fe33:d359",
[10] "fe80::40d0:ff:fe10:12eb",
[11] "fe80::b82e:bdff:fe27:16c5",
[12] "fe80::c9b:5ff:fe8b:48de",
[13] "fe80::ecee:eeff:feee:eeee",
[14] "fe80::ecee:eeff:feee:eeee",
[15] "fe80::f47c:4aff:fee2:37c",
[16] "fe80::ecee:eeff:feee:eeee",
[17] "fe80::f409:3aff:fe34:31e5",
[18] "fe80::ecee:eeff:feee:eeee",
[19] "fe80::ecee:eeff:feee:eeee",
[20] "fe80::a8c6:5ff:fe0a:f9c8",
[21] "fe80::cc6b:23ff:fe9b:4ac2",
[22] "fe80::706b:1aff:fe93:c09c",
[23] "fe80::f8e0:abff:fec5:9520",
[24] "fe80::da:89ff:fe5b:2491",
[25] "fe80::642c:c8ff:fe3b:bedc",
[26] "fe80::481d:16ff:fe20:266"
],
"name" => "4c16g",
"containerized" => false,
"architecture" => "x86_64"
}
}
有问题!!!¶
# 不是按照自己的定义取的日志信息!!!还是得分析正则!
8. 将logstash放到后台运行¶
# 可以先修改配置文件,将输出到屏幕的语句删除
# 放到后台运行logstash
cd /tmp
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-elasticsearch.conf &
#查看启动情况
tail -f nohup.out
# 查看端口5044
netstat -tunlp|grep 5044
至此,logstash设置完成!¶
最后更新:
2022-02-19 13:05:46