如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的 kafka 密钥?

问题描述 投票:0回答:2

我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,该消息的

kafka_key
元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码有效负载的模式存储在模式注册表中,Benthos 有一个
schema_registry_decode
处理器来解码它们。我希望为每个包含两个字段的 Kafka 消息生成一条输出 JSON 消息,一个名为
content
包含解码的 AVRO 消息,另一个名为
metadata
包含 Benthos 收集的各种 元数据字段,包括解码的
 kafka_key
有效负载。

go apache-kafka confluent-schema-registry benthos
2个回答
4
投票

事实证明,可以使用

branch
处理器来实现这一点,如下所示:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
        root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}

-1
投票

我希望将其中生成的json键指定为输出中的键值,但是我不希望o/p主题中的元数据字段,如何实现?

© www.soinside.com 2019 - 2024. All rights reserved.