我已经成功设置了 Spark 的 Session 并从 Kafka 的 Topic 流式传输消息。
kafka_stream_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:10000, localhost:10001, localhost:10002') \
.option('subscribe', 'responses') \
.option('startingOffsets', 'latest') \
.load()
然后,经过多次转换过程,只选择真正进入冰山表的数据。
schema = StructType() \
.add("event_date", StringType()) \
.add("event_timestamp", LongType()) \
.add("event_name", StringType()) \
.add("projectKey", StringType()) \
.add("customerKey", StringType()) \
.add("GA4ResponseId", StringType()) \
.add("exitFieldId", LongType()) \
.add("surveyTimeStamp", LongType()) \
.add("currentFieldId", LongType()) \
.add("rejectionFieldId", LongType())
type_transformed_df = kafka_stream_df.selectExpr("CAST(value AS STRING)")
parsed_df = type_transformed_df.select(from_json(col("value"), schema).alias("data")) \
.select(
col("data.event_date"),
col("data.event_timestamp"),
col("data.event_name"),
col("data.projectKey").alias("project_key"),
col("data.customerKey").alias("customer_key"),
col("data.GA4ResponseId").alias("ga4_response_id"),
col("data.exitFieldId").alias("exit_field_id"),
col("data.surveyTimeStamp").alias("survey_time_stamp"),
col("data.currentFieldId").alias("current_field_id"),
col("data.rejectionFieldId").alias("rejection_field_id")
)
现在重要的是在转换后的Streaming Dataframe中,每一行数据必须插入到对应的项目关键冰山表中。
使用 Iceberg 日志表中的 project_key 列进行分区会很方便,但它不能用于我的产品............
所有消息必须快速写入冰山表。
我该怎么办?有哪些方法?
Iceberg 有分区和非分区方法,但这是在创建 Iceberg 表时定义的。
根据您的情况/愿望,您可以
foreachBatch
和缓存插入不同的Iceberg表。