Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

问题描述 投票:0回答:2

我需要从 Kafka 主题读取 JSON 序列化消息,将它们转换为 Parquet 并保留在 S3 中。

背景

官方S3-Sink-Connector支持Parquet输出格式,但是:

对于此连接器,您必须使用 AvroConverter、ProtobufConverter 或 JsonSchemaConverter 以及 ParquetFormat。尝试使用 JsonConverter(带或不带模式)会导致 NullPointerException 和 StackOverflowException。

如果消息不是使用 JSON Schema 序列化写入的,JsonSchemaConverter 会抛出错误。

问题陈述

因此,我正在寻找一种方法来读取最初以 JSON 格式编写的 Kafka 主题消息,以某种方式将它们转换为 JSON Schema 格式,然后将它们插入 S3 连接器,该连接器将以 Parquet 格式写入 S3。

或者,我也愿意接受替代解决方案(-不涉及编写JAVA代码-)给定主要需求(获取Kafka消息,将其作为Parquet文件放入S3中)。谢谢!

PS: 不幸的是,目前我无法选择更改这些 Kafka 消息最初的写入方式(例如使用 JSON Schema 序列化Schema Discovery)。

json apache-kafka parquet apache-kafka-connect s3-kafka-connector
2个回答
0
投票
一般来说,您的数据需要有一个架构,因为 Parquet 需要它(S3 parquet writer 转换为 Avro 作为中间步骤)

您可以考虑使用

此 Connect 转换,它接受架构,并尝试应用 JSON 架构 - 参见测试。由于这会返回一个 Struct

 对象,因此您可以尝试使用 
JsonSchemaConverter
 作为接收器的一部分。

但是,如果您只是将随机 JSON 数据放入单个主题中,而没有任何一致的字段或值,那么您将很难应用任何模式


0
投票
如果生产者没有架构注册表,有两种方法可以实现此目的:

    启动架构注册表和 KSQL。使用消息架构创建一个 base_stream,并创建另一个流作为“select * from base_stream”WITH (VALUE_FORMAT="avro")。
  1. 如果您已正确配置 KSQL 和架构注册表,则 KSQL 具有提交架构。
  2. 使用带有 AVRO 转换器和 Parquet 格式的 S3 接收器连接器。 Kafka-connect 也应该连接到模式注册表才能正常工作。
© www.soinside.com 2019 - 2024. All rights reserved.