Kafka和Spark的Structured Streaming结果应该如何根据特定列插入到Iceberg Table中?没有分区

问题描述 投票:0回答:1

我已经成功设置了 Spark 的 Session 并从 Kafka 的 Topic 流式传输消息。

kafka_stream_df = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:10000, localhost:10001, localhost:10002') \
    .option('subscribe', 'responses') \
    .option('startingOffsets', 'latest') \
    .load()

然后,经过多次转换过程,只选择真正进入冰山表的数据。

schema = StructType() \
    .add("event_date", StringType()) \
    .add("event_timestamp", LongType()) \
    .add("event_name", StringType()) \
    .add("projectKey", StringType()) \
    .add("customerKey", StringType()) \
    .add("GA4ResponseId", StringType()) \
    .add("exitFieldId", LongType()) \
    .add("surveyTimeStamp", LongType()) \
    .add("currentFieldId", LongType()) \
    .add("rejectionFieldId", LongType())

type_transformed_df = kafka_stream_df.selectExpr("CAST(value AS STRING)")
parsed_df = type_transformed_df.select(from_json(col("value"), schema).alias("data")) \
    .select(
        col("data.event_date"),
        col("data.event_timestamp"),
        col("data.event_name"),
        col("data.projectKey").alias("project_key"),
        col("data.customerKey").alias("customer_key"),
        col("data.GA4ResponseId").alias("ga4_response_id"),
        col("data.exitFieldId").alias("exit_field_id"),
        col("data.surveyTimeStamp").alias("survey_time_stamp"),
        col("data.currentFieldId").alias("current_field_id"),
        col("data.rejectionFieldId").alias("rejection_field_id")
    )

现在重要的是在转换后的Streaming Dataframe中,每一行数据必须插入到对应的项目关键冰山表中。

使用 Iceberg 日志表中的 project_key 列进行分区会很方便,但它不能用于我的产品............

所有消息必须快速写入冰山表。

我该怎么办?有哪些方法?

apache-spark pyspark apache-kafka spark-structured-streaming apache-iceberg
1个回答
0
投票

Iceberg 有分区和非分区方法,但这是在创建 Iceberg 表时定义的。

根据您的情况/愿望,您可以

  1. 不分区插入 Iceberg 或
  2. 使用
    foreachBatch
    缓存插入不同的Iceberg表。
    • 这是多个水槽的标准方法。
    • 如果表不存在,只需使用追加模式,Iceberg 创建表。
© www.soinside.com 2019 - 2024. All rights reserved.