我尝试使用 Spark Stream API 从 Kafka 读取数据并将结果作为增量表写入 S3。对我来说,在 S3 上放置更少的对象很重要,因此我使用 coalesce(2) 在每个批次中创建两个对象。我还想每 3 小时运行一次作业。
df = (spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", cf.kafka_bootstrap_servers)
.option("subscribe", cf.topic_name)
.option("startingOffsets", cf.starting_offsets)
.option("maxOffsetsPerTrigger", cf.max_batch)
.option("minOffsetsPerTrigger", cf.min_batch)
.option("failOnDataLoss", cf.fail_on_data_loss)
.option("maxTriggerDelay", cf.max_trigger_delay)
.load()
)
### some transformation here ###
df = df.coalesce(2)
query = df.writeStream \
.format("delta") \
.partitionBy("created_date") \
.outputMode("append") \
.option("mergeSchema", "true") \
.option("checkpointLocation", cf.checkpoints_location) \
.trigger(availableNow=True) \
.start(cf.sink_location)
query.awaitTermination()
正如您在代码中看到的,我设置了 minOffsetsPerTrigger 和 maxOffsetsPerTrigger,并在查询中使用了 AvailableNow Trigger。
基于文件:
Available-now 微批次 类似于查询一次性微批次 触发时,查询将处理所有可用数据,然后停止 靠它自己。不同之处在于,它将处理数据 (可能)基于源选项的多个微批次(例如 maxFilesPerTrigger 对于文件源),这将导致更好的查询 可扩展性。
maxBytesPerTrigger 属性确定每个微批次的最大大小。 对于AvailableNow来说,它不仅仅代表一个触发器;如果有更多数据需要处理,它确实可以涉及多个触发器。
它开始以微批次的方式消费来自 Kafka 的数据,但它只是将最后一批数据存储在 s3 中,所以完成后我可以看到最后一批数据和之前的批次数据丢失了。
更新:
似乎直到完成才创建对象,在处理批次期间,S3 上没有创建对象。
我发现问题了。它与触发器无关。在转换中,有一个 from_avro 函数用于反序列化 Kafka 消息。不幸的是,架构发生了变化,from_avro 无法解析旧消息。