Logstash field never appears after aggregation


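I'm running Logstash 7.8.0. Can someone tell me why the aggregation below never adds the thread_id field to the document? My field, thread_id, is set at the end of the aggregate code.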

example.log:

2024-12-14 12:00:01 thread-1 SOAP message <<Envelope 011>>
2024-12-14 12:00:02 thread-1 SOAP message >>Envelope 012<<
2024-12-14 12:05:03 thread-2 SOAP message <<Envelope 021>>
2024-12-14 12:05:04 thread-2 SOAP message >>Envelope 022<<

Logstash filter with grok:

filter {
  grok {
    match => {
      "message" => [
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
      ]
    }
  }
  aggregate {
    task_id => "%{thread_id}"
    code => "
      map['soap_in'] ||= []
      map['soap_out'] ||= []

      # Capture soap_in with timestamp if exists
      if event.get('soap_in')
        map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Capture soap_out with timestamp if exists
      if event.get('soap_out')
        map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Once both soap_in and soap_out are available, emit the aggregated event
      if map['soap_in'] && map['soap_out']
        event.set('soap_in', map['soap_in'])
        event.set('soap_out', map['soap_out'])
        event.set('thread_id', event.get('thread_id'))
        event.cancel()
      end
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  mutate {
    remove_field => ["message"]
  }
}

The result never shows thread_id:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "app-aggregate-2024.12.13",
        "_type" : "_doc",
        "_id" : "qop5wJMB81mNBoMqWzzC",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "soap_out" : [
            {
              "log_timestamp" : "2024-12-14 12:00:02",
              "soap_out" : "Envelope 012"
            }
          ],
          "@timestamp" : "2024-12-13T14:43:18.985Z",
          "soap_in" : [
            {
              "log_timestamp" : "2024-12-14 12:00:01",
              "soap_in" : "Envelope 011"
            }
          ]
        }
      },
      {
        "_index" : "app-aggregate-2024.12.13",
        "_type" : "_doc",
        "_id" : "rop5wJMB81mNBoMqbTwN",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "soap_out" : [
            {
              "log_timestamp" : "2024-12-14 12:05:04",
              "soap_out" : "Envelope 022"
            }
          ],
          "@timestamp" : "2024-12-13T14:43:23.504Z",
          "soap_in" : [
            {
              "log_timestamp" : "2024-12-14 12:05:03",
              "soap_in" : "Envelope 021"
            }
          ]
        }
      }
    ]
  }
}

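So in the Elasticsearch results we get 2 documents instead of 4, which is good, but I still don't understand why the thread_id field never appears even though it is set in the aggregate code. Thanks in advance.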

elasticsearch logstash aggregate kibana grok
1 Answer
0 votes

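Figuring out how to solve this wasn't straightforward, but in the end I found the solution. The document that push_previous_map_as_event emits is built from the map, and the original events are dropped by event.cancel(), so thread_id has to be stored in the map as well: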

filter {
  grok {
    match => {
      "message" => [
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
      ]
    }
  }
  aggregate {
    task_id => "%{thread_id}"
    code => "
      map['soap_in'] ||= []
      map['soap_out'] ||= []
      # Store thread_id in the map so it ends up in the pushed event
      map['thread_id'] = event.get('thread_id')

      if event.get('soap_in')
        map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
      end

      if event.get('soap_out')
        map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
      end

      # Arrays are always truthy in Ruby, so this branch runs for every event
      if map['soap_in'] && map['soap_out']
        event.set('thread_id', map['thread_id'])
        event.set('soap_in', map['soap_in'])
        event.set('soap_out', map['soap_out'])
        # The original event is dropped; only the map is emitted later
        event.cancel()
      end
    "
    push_previous_map_as_event => true
    timeout => 3
  }
  mutate {
    remove_field => ["message"]
  }
}
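
To see why storing thread_id in the map makes the difference, here is a rough plain-Ruby sketch of the behaviour, run against the four sample lines from the question. It is a simplified stand-in and not the aggregate plugin's actual implementation: the helper names (parse_line, aggregate, keep_thread_id) and the regular expressions that replace the grok patterns are assumptions made for illustration. The only idea it models is that each original event is cancelled and the per-task map is what gets emitted as the document.

```ruby
# Simplified stand-in for the aggregate filter; not the plugin's real code.
SAMPLE_LINES = [
  '2024-12-14 12:00:01 thread-1 SOAP message <<Envelope 011>>',
  '2024-12-14 12:00:02 thread-1 SOAP message >>Envelope 012<<',
  '2024-12-14 12:05:03 thread-2 SOAP message <<Envelope 021>>',
  '2024-12-14 12:05:04 thread-2 SOAP message >>Envelope 022<<'
].freeze

# Plain regexes standing in for the two grok patterns (an approximation).
IN_PATTERN  = /\A(\S+ \S+) thread-(\d+) SOAP message <<(.*?)>>\z/
OUT_PATTERN = /\A(\S+ \S+) thread-(\d+) SOAP message >>(.*?)<<\z/

# Turn one log line into a hash of fields, or nil if nothing matches.
def parse_line(line)
  if (m = IN_PATTERN.match(line))
    { 'log_timestamp' => m[1], 'thread_id' => m[2], 'soap_in' => m[3] }
  elsif (m = OUT_PATTERN.match(line))
    { 'log_timestamp' => m[1], 'thread_id' => m[2], 'soap_out' => m[3] }
  end
end

# Build one map per thread_id and return the maps as the emitted documents.
# keep_thread_id toggles the single line the answer added.
def aggregate(lines, keep_thread_id:)
  maps = {} # task_id => map; Ruby hashes keep insertion order
  lines.each do |line|
    event = parse_line(line)
    next if event.nil?

    map = (maps[event['thread_id']] ||= {})
    map['soap_in'] ||= []
    map['soap_out'] ||= []
    map['thread_id'] = event['thread_id'] if keep_thread_id

    %w[soap_in soap_out].each do |key|
      next unless event[key]

      map[key] << { key => event[key], 'log_timestamp' => event['log_timestamp'] }
    end
    # The original event is cancelled at this point, so fields set on it are lost.
  end
  maps.values
end

without_id = aggregate(SAMPLE_LINES, keep_thread_id: false)
with_id    = aggregate(SAMPLE_LINES, keep_thread_id: true)

puts without_id.first.key?('thread_id')              # false
puts with_id.map { |doc| doc['thread_id'] }.inspect  # ["1", "2"]
```

With keep_thread_id set to false the two resulting documents contain only soap_in and soap_out, matching the search result in the question; with it set to true each document also carries its thread_id.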