在 Databricks 中调用一次 Trigger 来处理 Kinesis Stream

问题描述 投票:0回答:3

我正在寻找一种方法来触发我的 Databricks 笔记本一次来处理 Kinesis Stream 并使用以下模式

 import org.apache.spark.sql.streaming.Trigger

// Load your Streaming DataFrame
   val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
   sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")

AWS Kinesis 似乎不可能实现这一点,Databricks 文档也是如此建议的。我的问题是我们还能做些什么来实现这一目标?

scala databricks spark-structured-streaming amazon-kinesis aws-databricks
3个回答

1
投票
解决方法是在 X 运行后停止,而不触发。它将保证每次运行固定的行数。 唯一的问题是,如果队列中有数百万行等待,您将无法保证处理所有这些行

在scala中你可以添加一个事件监听器,在python中计算批次数。

from time import sleep s = sdf.writeStream.format("delta").start("/out/path") #by defaut keep spark.sql.streaming.numRecentProgressUpdates=100 in the list. Stop after 10 microbatch #maxRecordsPerFetch is 10 000 by default, so we will consume a max value of 10x10 000= 100 000 messages per run while len(s.recentProgress) < 10: print("Batchs #:"+str(len(s.recentProgress))) sleep(10) s.stop()
您可以使用更高级的逻辑来计算每批处理的消息数量,并在队列为空时停止(一旦全部消耗完,吞吐量就会降低,因为您只能获得“实时”流,而不是历史记录) 


0
投票
自 Databricks DBR 13.3 起,现在可以使用

Trigger.AvailableNow

(pyspark 中的 
.trigger(availableNow=True)
)批量处理来自 Kinesis 的数据。

请参阅此处的文档

© www.soinside.com 2019 - 2024. All rights reserved.