融合的 Kafka 到 S3 接收器自定义 s3 命名以方便分区

问题描述 投票:0回答:1

我正在使用 Confluence 的 kafka-connect-s3 将我的 kafka 主题备份到 s3 https://www.confluence.io/hub/confluenceinc/kafka-connect-s3。我希望能够使用 Athena 轻松查询这些数据,并对其进行正确分区,以实现廉价/快速读取。

我想按(年/月/日/主题)元组进行分区。我已经通过使用每日分区器解决了年/月/日部分https://docs.confluence.io/kafka-connect-s3-sink/current/index.html#partitioning-records-into-s3-objects 。现在,年=YYYY/月=MM/日=DD 已纳入路径中,因此任何基于 Hive 的查询都会按时优化/分区。查看 msck 解释,请注意使用

userid=

的示例

https://docs.aws.amazon.com/athena/latest/ug/msck-repair-table.html

但是,基于这些文档https://docs.confluence.io/kafka-connect-s3-sink/current/index.html#s3-object-names我在路径中得到了 {topic} 但没有办法将其修改为topic={topic}。我可以将其添加到前缀中(而不是 env={env},前缀将是 env={env}/topic={topic}),但这对于其下面的另一个独生子目录 {topic} 来说似乎是多余的。

我还注意到主题名称位于由 + 分隔的消息名称中(以及分区和起始偏移量)。

我的问题。 。 。如何在我的路径中获取 topic={topic} 以便基于配置单元的查询自动创建该分区?或者我是否已经通过将其放在路径中(没有 topic=)或消息名称中(同样,没有 topic=)来免费获得它

amazon-s3 apache-kafka amazon-athena confluent-platform s3-kafka-connector
1个回答
1
投票

如何在我的路径中获取 topic={topic} 以便基于 hive 的查询自动创建该分区?

没有,开箱即用。您必须重写连接器的 Partitioner 类。否则,我认为

__filename__
是一个特殊的Hive列,可以查询和解析,其中文件中有主题名称。

建议为每个主题创建一个分区表,而不是让主题本身成为分区

© www.soinside.com 2019 - 2024. All rights reserved.