spark 结构化流 - 使用 availableNow 触发器从 kafka 读取

问题描述 投票:0回答:1

我尝试使用 Spark Stream API 从 Kafka 读取数据并将结果作为增量表写入 S3。对我来说,在 S3 上放置更少的对象很重要,因此我使用 coalesce(2) 在每个批次中创建两个对象。我还想每 3 小时运行一次作业。

df = (spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", cf.kafka_bootstrap_servers)
  .option("subscribe", cf.topic_name)
  .option("startingOffsets", cf.starting_offsets)
  .option("maxOffsetsPerTrigger", cf.max_batch)
  .option("minOffsetsPerTrigger", cf.min_batch)
  .option("failOnDataLoss", cf.fail_on_data_loss)
  .option("maxTriggerDelay", cf.max_trigger_delay)
  .load()
  )

### some transformation here ###

df = df.coalesce(2)

query = df.writeStream \
    .format("delta") \
    .partitionBy("created_date") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", cf.checkpoints_location) \
    .trigger(availableNow=True) \
    .start(cf.sink_location)

query.awaitTermination()

正如您在代码中看到的,我设置了 minOffsetsPerTrigger 和 maxOffsetsPerTrigger,并在查询中使用了 AvailableNow Trigger。

基于文件:

Available-now 微批次 类似于查询一次性微批次 触发时,查询将处理所有可用数据,然后停止 靠它自己。不同之处在于,它将处理数据 (可能)基于源选项的多个微批次(例如 maxFilesPerTrigger 对于文件源),这将导致更好的查询 可扩展性。

maxBytesPerTrigger 属性确定每个微批次的最大大小。 对于AvailableNow来说,它不仅仅代表一个触发器;如果有更多数据需要处理,它确实可以涉及多个触发器。

它开始以微批次的方式消费来自 Kafka 的数据,但它只是将最后一批数据存储在 s3 中,所以完成后我可以看到最后一批数据和之前的批次数据丢失了。

更新:

似乎直到完成才创建对象,在处理批次期间,S3 上没有创建对象。

apache-spark apache-kafka spark-streaming spark-structured-streaming delta-lake
1个回答
0
投票

我发现问题了。它与触发器无关。在转换中,有一个 from_avro 函数用于反序列化 Kafka 消息。不幸的是,架构发生了变化,from_avro 无法解析旧消息。

© www.soinside.com 2019 - 2024. All rights reserved.