我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,该消息的
kafka_key
元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码有效负载的模式存储在模式注册表中,Benthos 有一个 schema_registry_decode
处理器来解码它们。我希望为每个包含两个字段的 Kafka 消息生成一条输出 JSON 消息,一个名为 content
包含解码的 AVRO 消息,另一个名为 metadata
包含 Benthos 收集的各种 元数据字段,包括解码的 kafka_key
有效负载。
branch
处理器来实现这一点,如下所示:
input:
kafka:
addresses:
- localhost:9092
consumer_group: benthos_consumer_group
topics:
- benthos_input
pipeline:
processors:
# Decode the message
- schema_registry_decode:
url: http://localhost:8081
# Populate output content field
- bloblang: |
root.content = this
# Decode kafka_key metadata payload and populate output metadata field
- branch:
request_map: |
root = meta("kafka_key")
processors:
- schema_registry_decode:
url: http://localhost:8081
result_map: |
root.metadata = meta()
root.metadata.kafka_key = this
output:
stdout: {}
我希望将其中生成的json键指定为输出中的键值,但是我不希望o/p主题中的元数据字段,如何实现?