我有一个如下的logstash 输出配置。这工作正常。
output {
kafka {
topic_id => "topic1"
bootstrap_servers => "172.172.172.172:1722"
codec => json
acks => "0"
partitioner => round_robin
compression_type => gzip
max_request_size => 10485760
}
}
每条消息都有一个 JSON 结构,并且有固定的键值对。这些值根据操作等不断变化。有一个键“ENV”,表示环境。我想根据 ENV 值向不同的 kafka 主题发送消息。
我尝试读取消息值并在其周围添加 if 但它不起作用。
output {
if [ENV] == "DEV" {
kafka {
topic_id => "topic_DEV"
bootstrap_servers => "172.172.172.172:1722"
codec => json
acks => "0"
partitioner => round_robin
compression_type => gzip
max_request_size => 10485760
}
}
else if [ENV] == "PRD" {
kafka {
topic_id => "topic_Production"
bootstrap_servers => "172.172.172.172:1722"
codec => json
acks => "0"
partitioner => round_robin
compression_type => gzip
max_request_size => 10485760
}
}
}
但是它并没有按预期工作。可能有一些消息被丢弃。我究竟做错了什么?我的 ENV 值仅作为 DEV 和 PRD。
我浏览了弹性文档https://www.elastic.co/guide/en/logstash/current/lookup-enrichment.html
input {
# define your input here
beats {
port => 5044
}
}
# we can use the filter to extract the namespace
# Make sure to parse it properly here
# example if the namespace is part of the message, you might use grok to extract it
filter {
grok {
match => {"message => "%{DATA: namespace} %{GREEDYDATA:log}"}
}
# If the namespace is already a field you might not need grok and just
proceed with conditionals
}
output {
if [namespace] == "DEV" {
kafka {
topic_id => "topic_DEV"
bootstrap_servers => "172.172.172.172:1722"
codec => json
acks => "0"
partitioner => round_robin
compression_type => gzip
max_request_size => 10485760
}
}
else if [namespace] == "PRD" {
kafka {
topic_id => "topic_Production"
bootstrap_servers => "172.172.172.172:1722"
codec => json
acks => "0"
partitioner => round_robin
compression_type => gzip
max_request_size => 10485760
}
}
}