我正在寻找一种方法来触发我的 Databricks 笔记本一次来处理 Kinesis Stream 并使用以下模式
import org.apache.spark.sql.streaming.Trigger
// Load your Streaming DataFrame
val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")
AWS Kinesis 似乎不可能实现这一点,Databricks 文档也是如此建议的。我的问题是我们还能做些什么来实现这一目标?
正如您在问题中提到的,Kinesis 不支持触发一次。
但是您可以通过在图片中添加Kinesis Data Firehose 来实现您所需要的,它将数据 从 Kinesis 写入 S3 存储桶(您可以选择您需要的格式,例如 Parquet、ORC,或者直接离开JSON 格式),然后您可以将流作业指向给定的存储桶,并使用 Trigger.Once,因为它是普通的流源(为了提高效率,最好使用 Databricks 上提供的Auto Loader)。 此外,为了控制成本,您可以为 S3 目标设置保留策略,以便在一段时间(例如 1 周或一个月)后删除或存档文件。
在scala中你可以添加一个事件监听器,在python中计算批次数。
from time import sleep
s = sdf.writeStream.format("delta").start("/out/path")
#by defaut keep spark.sql.streaming.numRecentProgressUpdates=100 in the list. Stop after 10 microbatch
#maxRecordsPerFetch is 10 000 by default, so we will consume a max value of 10x10 000= 100 000 messages per run
while len(s.recentProgress) < 10:
print("Batchs #:"+str(len(s.recentProgress)))
sleep(10)
s.stop()
您可以使用更高级的逻辑来计算每批处理的消息数量,并在队列为空时停止(一旦全部消耗完,吞吐量就会降低,因为您只能获得“实时”流,而不是历史记录)
Trigger.AvailableNow
(pyspark 中的
.trigger(availableNow=True)
)批量处理来自 Kinesis 的数据。
请参阅此处的文档。