我们在 Databricks 中利用结构化流,使用 foreach 功能进行转换和操作,并最终将数据写入 Delta 表。我们的数据源是 Eventhub、Delta 表或 Azure Cosmos 更改源。当源(Delta 表、Azure SQL 或 Cosmos)中发生重大数据更改时,我们需要在流处理过程中将数据拆分为更小的块,同时从源读取更改数据。具体来说,我们需要在结构化流中的 foreach 功能中设置微批次计数限制。
对于 Delta 表,设置选项 maxFilesPerTrigger 和/或 maxBytesPerTrigger:
spark.readStream
.format("delta")
.option("maxFilesPerTrigger", "100")
.option("maxBytesPerTrigger", "5g")
请注意,如果您有一个大文件,它不会被分成多个批次