基于logstash中消息值的多个Kafka主题

问题描述 投票:0回答:1

我有一个如下的logstash 输出配置。这工作正常。

output {
  kafka {
    topic_id => "topic1"
    bootstrap_servers => "172.172.172.172:1722"
    codec => json
    acks => "0"
    partitioner => round_robin
    compression_type => gzip
    max_request_size => 10485760
  }
}

每条消息都有一个 JSON 结构,并且有固定的键值对。这些值根据操作等不断变化。有一个键“ENV”,表示环境。我想根据 ENV 值向不同的 kafka 主题发送消息。

我尝试读取消息值并在其周围添加 if 但它不起作用。

output {
  if [ENV] == "DEV" {
    kafka {
      topic_id => "topic_DEV"
      bootstrap_servers => "172.172.172.172:1722"
      codec => json
      acks => "0"
      partitioner => round_robin
      compression_type => gzip
      max_request_size => 10485760
    }
  }
  else if [ENV] == "PRD" {
    kafka {
      topic_id => "topic_Production"
      bootstrap_servers => "172.172.172.172:1722"
      codec => json
      acks => "0"
      partitioner => round_robin
      compression_type => gzip
      max_request_size => 10485760
    }
  }
}

但是它并没有按预期工作。可能有一些消息被丢弃。我究竟做错了什么?我的 ENV 值仅作为 DEV 和 PRD。

我浏览了弹性文档https://www.elastic.co/guide/en/logstash/current/lookup-enrichment.html

apache-kafka logstash
1个回答
0
投票
input {
      # define your input here 
      beats {
      port => 5044
    }
  }
# we can use the filter to extract the namespace
# Make sure to parse it properly here 
# example if the namespace is part of the message, you might use grok to extract it 
filter {
    grok {
          match => {"message => "%{DATA: namespace} %{GREEDYDATA:log}"}
    }
# If the namespace is already a field you might not need grok and just 
  proceed with conditionals 
}
output {
  if [namespace] == "DEV" {
    kafka {
      topic_id => "topic_DEV"
      bootstrap_servers => "172.172.172.172:1722"
      codec => json
      acks => "0"
      partitioner => round_robin
      compression_type => gzip
      max_request_size => 10485760
    }
  }
  else if [namespace] == "PRD" {
    kafka {
      topic_id => "topic_Production"
      bootstrap_servers => "172.172.172.172:1722"
      codec => json
      acks => "0"
      partitioner => round_robin
      compression_type => gzip
      max_request_size => 10485760
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.