
16. Formatting logs with logstash

1. grok syntax

%{SYNTAX:SEMANTIC}
#SYNTAX: the pattern type the value is matched against, e.g. 3.44 matches NUMBER, 127.0.0.1 matches IP

#SEMANTIC: the field name the matched value is stored under, e.g. 3.44 can be captured with %{NUMBER:duration}, 127.0.0.1 with %{IP:client}
# See the official site for more grok usage:
# https://www.elastic.co/cn/blog/do-you-grok-grok
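
As a minimal sketch of the syntax (the field names client and duration are only illustrations), a grok filter that parses a line such as "127.0.0.1 3.44" could look like this:

filter {
    grok {
        # capture the IP into "client" and the number into "duration"
        match => { "message" => "%{IP:client} %{NUMBER:duration}" }
    }
}
# resulting event fields: "client" => "127.0.0.1", "duration" => "3.44"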

2. Analyzing the log messages

#Entry 1: needs to be dropped
"message" => "# Time: 2021-12-10T07:17:17.100835Z"

#Entry 2: the individual fields need to be extracted!
"message" => "# User@Host: root[root] @ localhost []  Id:     6\n# Query_time: 5.759042  Lock_time: 0.000231 Rows_sent: 1  Rows_examined: 19999999\nSET timestamp=1639120637;\nselect * from db1.t1 where id=8;"

1. Drop the Time line

 #删除 "message" => "# Time: 181105 4:31:16"行
 #匹配 message 中,行"# Time: "后加入一个 tag drop
 grok {
     match => { "message" => "# Time: " }
     add_tag => [ "drop" ]
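     #tag_on_failure => [] keeps events without "# Time: " from being tagged _grokparsefailure by this grok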
     tag_on_failure => []
     }

 #Drop events whose tags contain "drop"
 if "drop" in [tags] {
     drop {}
     }

2. Extract the useful fields

    grok {
         match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
         }
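
If the pattern matches the second sample message, the event should carry fields roughly like the following (a hand-written sketch, not captured output):

    "query_user"    => "root"
    "query_time"    => 5.759042
    "lock_time"     => 0.000231
    "rows_sent"     => 1
    "rows_examined" => 19999999
    "timestamp"     => "1639120637"
    "action"        => "select"
    "query"         => "select * from db1.t1 where id=8;"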

3. Regex analysis (failed!)

"message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+#Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)"
# Log message to match
"message" => "# User@Host: root[root] @ localhost []  Id:     6\n# Query_time: 5.759042  Lock_time: 0.000231 Rows_sent: 1  Rows_examined: 19999999\nSET timestamp=1639120637;\nselect * from db1.t1 where id=8;"

4. Breaking the regex into segments

"message",
"(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\]
(?:\s*Id: %{NUMBER:id:int})?
\s+#Query_time: %{NUMBER:query_time:float}
\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)"

Comparing the segments against the sample log message reveals two mismatches: the pattern expects "#Query_time:" but the log reads "# Query_time:" (with a space after "#"), and "Id: %{NUMBER:id:int}" allows only one space after "Id:" while the log contains "Id:     6". Either one on its own is enough to make the whole match fail.

3. The complete grok statement

filter {
    grok {
         match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @(?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+#Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
        }

    #删除 "message" => "# Time: 181105 4:31:16"行
    #匹配 message 中,行"# Time: "后加入一个 tag drop
     grok {
         match => { "message" => "# Time: " }
         add_tag => [ "drop" ]
         tag_on_failure => []
        }

     #Drop events whose tags contain "drop"
     if "drop" in [tags] {
        drop {}
        }

     #Convert the timestamp
     date {
         match => ["mysql.slowlog.timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
         target => "@timestamp"
         timezone => "Asia/Shanghai"
         }
     ruby {
        code => "event.set('[@metadata][today]',Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
        }

     #Remove the message field
     mutate {
         remove_field => [ "message" ]
        }
}
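
Before wiring the filter into the Beats pipeline, it can be exercised in isolation. A minimal sketch (the /tmp/grok-test.conf path and the multiline codec settings are illustrative):

# /tmp/grok-test.conf
input {
    stdin {
        # a slow-log entry spans several lines; group them into one event
        codec => multiline {
            pattern => "^# Time:"
            negate => true
            what => "previous"
        }
    }
}
# ... the filter block above, unchanged ...
output { stdout { codec => rubydebug } }

# then paste a slow-log entry on stdin:
# /usr/share/logstash/bin/logstash -f /tmp/grok-test.conf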

4. Modify the logstash config file

vim /etc/logstash/conf.d/logstash-to-elasticsearch.conf
# Add the filter between input and output!
filter {
    grok {
         match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+#Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
        }
     grok {
         match => { "message" => "# Time: " }
         add_tag => [ "drop" ]
         tag_on_failure => []
        }
     if "drop" in [tags] {
        drop {}
        }
     date {
         match => ["mysql.slowlog.timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
         target => "@timestamp"
         timezone => "Asia/Shanghai"
         }
     ruby {
        code => "event.set('[@metadata][today]',Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
        }
     mutate {
         remove_field => [ "message" ]
        }
}

The complete config file is as follows

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}


filter {
    grok {
         match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\](?:\s*Id: %{NUMBER:id:int})?\s+#Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ]
        }
     grok {
         match => { "message" => "# Time: " }
         add_tag => [ "drop" ]
         tag_on_failure => []
        }
     if "drop" in [tags] {
        drop {}
        }
     date {
         match => ["mysql.slowlog.timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
         target => "@timestamp"
         timezone => "Asia/Shanghai"
         }
     ruby {
        code => "event.set('[@metadata][today]',Time.at(event.get('@timestamp').to_i).localtime.strftime('%Y.%m.%d'))"
        }
     mutate {
         remove_field => [ "message" ]
        }
}


output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }

  stdout {
    codec => rubydebug
  }
}

5. Restart logstash and test

# Window 1: press Ctrl+C to stop the previous run, then start it again with:
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-elasticsearch.conf
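
# Optionally validate the syntax first with Logstash's built-in config check:
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-elasticsearch.conf --config.test_and_exit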

6. Generate a slow query and observe

# Window 2
mysql -uroot -p123456

select * from db1.t1 where id=8;
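
The query only lands in the slow log if it runs longer than long_query_time; the threshold can be checked first (this assumes the slow log was already enabled in the earlier MySQL chapter):

show variables like 'slow_query_log';
show variables like 'long_query_time';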

7. Window 1: check the current logstash output

# The output is now in JSON format, and there is only one event!!!
[WARN ] 2022-01-01 23:27:49.454 [defaultEventExecutorGroup-4-1] plain - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
{
         "input" => {
        "type" => "log"
    },
           "ecs" => {
        "version" => "1.12.0"
    },
     "container" => {
        "id" => "web-slow.log"
    },
      "@version" => "1",
          "tags" => [
        [0] "beats_input_codec_plain_applied",
        [1] "_grokparsefailure"
    ],
    "@timestamp" => 2022-01-01T15:27:46.790Z,
           "log" => {
        "offset" => 1140,
          "file" => {
            "path" => "/usr/local/mysql/data/web-slow.log"
        },
         "flags" => [
            [0] "multiline"
        ]
    },
         "agent" => {
            "hostname" => "4c16g",
                "type" => "filebeat",
                  "id" => "edca8f85-bc52-4365-b94b-bdf17eb8209f",
                "name" => "4c16g",
        "ephemeral_id" => "3568ddcd-f4a5-499f-84f4-1feac879b4b9",
             "version" => "7.16.2"
    },
          "host" => {
                  "mac" => [
            [ 0] "52:54:00:3c:12:5f",
            [ 1] "02:42:e8:51:6b:3d",
            [ 2] "02:42:f0:96:22:00",
            [ 3] "02:42:ae:33:d3:59",
            [ 4] "42:d0:00:10:12:eb",
            [ 5] "ba:2e:bd:27:16:c5",
            [ 6] "0e:9b:05:8b:48:de",
            [ 7] "ee:ee:ee:ee:ee:ee",
            [ 8] "ee:ee:ee:ee:ee:ee",
            [ 9] "f6:7c:4a:e2:03:7c",
            [10] "ee:ee:ee:ee:ee:ee",
            [11] "f6:09:3a:34:31:e5",
            [12] "ee:ee:ee:ee:ee:ee",
            [13] "ee:ee:ee:ee:ee:ee",
            [14] "aa:c6:05:0a:f9:c8",
            [15] "ce:6b:23:9b:4a:c2",
            [16] "72:6b:1a:93:c0:9c",
            [17] "fa:e0:ab:c5:95:20",
            [18] "02:da:89:5b:24:91",
            [19] "66:2c:c8:3b:be:dc",
            [20] "4a:1d:16:20:02:66"
        ],
             "hostname" => "4c16g",
                   "os" => {
                "type" => "linux",
              "kernel" => "5.4.0-90-generic",
                "name" => "Ubuntu",
            "codename" => "focal",
              "family" => "debian",
            "platform" => "ubuntu",
             "version" => "20.04 LTS (Focal Fossa)"
        },
                   "id" => "b3856b2bce5c47ab962ede7e592b054c",
                   "ip" => [
            [ 0] "10.0.16.15",
            [ 1] "fe80::5054:ff:fe3c:125f",
            [ 2] "172.17.0.1",
            [ 3] "fe80::42:e8ff:fe51:6b3d",
            [ 4] "10.0.8.5",
            [ 5] "10.244.75.128",
            [ 6] "192.168.250.1",
            [ 7] "fe80::42:f0ff:fe96:2200",
            [ 8] "172.18.0.1",
            [ 9] "fe80::42:aeff:fe33:d359",
            [10] "fe80::40d0:ff:fe10:12eb",
            [11] "fe80::b82e:bdff:fe27:16c5",
            [12] "fe80::c9b:5ff:fe8b:48de",
            [13] "fe80::ecee:eeff:feee:eeee",
            [14] "fe80::ecee:eeff:feee:eeee",
            [15] "fe80::f47c:4aff:fee2:37c",
            [16] "fe80::ecee:eeff:feee:eeee",
            [17] "fe80::f409:3aff:fe34:31e5",
            [18] "fe80::ecee:eeff:feee:eeee",
            [19] "fe80::ecee:eeff:feee:eeee",
            [20] "fe80::a8c6:5ff:fe0a:f9c8",
            [21] "fe80::cc6b:23ff:fe9b:4ac2",
            [22] "fe80::706b:1aff:fe93:c09c",
            [23] "fe80::f8e0:abff:fec5:9520",
            [24] "fe80::da:89ff:fe5b:2491",
            [25] "fe80::642c:c8ff:fe3b:bedc",
            [26] "fe80::481d:16ff:fe20:266"
        ],
                 "name" => "4c16g",
        "containerized" => false,
         "architecture" => "x86_64"
    }
}

There is a problem!!!

# The fields were not extracted according to our own definitions!!! The "_grokparsefailure" tag above shows that the extraction grok never matched (see the mismatches identified in the segment analysis of step 2); the regex still needs more analysis!

8. Run logstash in the background

# Optionally edit the config first and remove the stdout section that prints to the screen

# Run logstash in the background
cd /tmp
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-elasticsearch.conf &

#Check the startup log
tail -f nohup.out 

# Check port 5044
netstat -tunlp|grep 5044
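
# If Logstash was installed from the official deb/rpm package (an assumption
# about this environment), the bundled systemd unit is an alternative to nohup:
systemctl start logstash
systemctl enable logstash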

At this point, the logstash setup is complete!


Last updated: 2022-02-19 13:05:46