我有logstash版本7.8.0 有人可以告诉我为什么下面的聚合从未将 THREAD_ID 字段显示到文档中吗? 我的字段:thread_id 添加在聚合末尾..
2024-12-14 12:00:01 thread-1 SOAP message <<Envelope 011>>
2024-12-14 12:00:02 thread-1 SOAP message >>Envelope 012<<
2024-12-14 12:05:03 thread-2 SOAP message <<Envelope 021>>
2024-12-14 12:05:04 thread-2 SOAP message >>Envelope 022<<
filter {
grok {
match => {
"message" => [
'%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
'%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
]
}
}
aggregate {
task_id => "%{thread_id}"
code => "
map['soap_in'] ||= []
map['soap_out'] ||= []
# Capture soap_in with timestamp if exists
if event.get('soap_in')
map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
end
# Capture soap_out with timestamp if exists
if event.get('soap_out')
map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
end
# Once both soap_in and soap_out are available, emit the aggregated event
if map['soap_in'] && map['soap_out']
event.set('soap_in', map['soap_in'])
event.set('soap_out', map['soap_out'])
event.set('thread_id', event.get('thread_id'))
event.cancel()
end
"
push_previous_map_as_event => true
timeout => 3
}
mutate {
remove_field => ["message"]
}
}
结果从未显示thread_id
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "app-aggregate-2024.12.13",
"_type" : "_doc",
"_id" : "qop5wJMB81mNBoMqWzzC",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"soap_out" : [
{
"log_timestamp" : "2024-12-14 12:00:02",
"soap_out" : "Envelope 012"
}
],
"@timestamp" : "2024-12-13T14:43:18.985Z",
"soap_in" : [
{
"log_timestamp" : "2024-12-14 12:00:01",
"soap_in" : "Envelope 011"
}
]
}
},
{
"_index" : "app-aggregate-2024.12.13",
"_type" : "_doc",
"_id" : "rop5wJMB81mNBoMqbTwN",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"soap_out" : [
{
"log_timestamp" : "2024-12-14 12:05:04",
"soap_out" : "Envelope 022"
}
],
"@timestamp" : "2024-12-13T14:43:23.504Z",
"soap_in" : [
{
"log_timestamp" : "2024-12-14 12:05:03",
"soap_in" : "Envelope 021"
}
]
}
}
]
}
}
所以在elasticsearch的结果中,我们可以看到我们有2个值而不是4个,这很好但是 我仍然不知道为什么这从未显示字段 thread_id 即使它在聚合中提到 预先感谢您,
找出如何解决这个问题并不简单,但最终我得到了解决方案:
filter {
grok {
match => {
"message" => [
'%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
'%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
]
}
}
aggregate {
task_id => "%{thread_id}"
code => "
map['soap_in'] ||= []
map['soap_out'] ||= []
map['thread_id'] ||= []
map['thread_id'] = event.get('thread_id')
if event.get('soap_in')
map['soap_in'] << {'soap_in' => event.get('soap_in'), 'log_timestamp' => event.get('log_timestamp')}
end
if event.get('soap_out')
map['soap_out'] << {'soap_out' => event.get('soap_out'), 'log_timestamp' => event.get('log_timestamp')}
end
if map['soap_in'] && map['soap_out']
event.set('thread_id', map['thread_id'])
event.set('soap_in', map['soap_in'])
event.set('soap_out', map['soap_out'])
event.cancel()
end
"
push_previous_map_as_event => true
timeout => 3
}
mutate {
remove_field => ["message"]
}
}