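I'm using Structured Streaming in Databricks to load batch files into a UC (Unity Catalog) table. It works, but whenever foreachBatch doesn't finish within 60 seconds, the following error is raised: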
Force terminating query xxxxxxxxxxxxx due to not receiving any updates in 60 seconds. Spark Session ID is yyyyyyyyyyy
Timedout query xxxxxxxxxxxxx terminated
Here is the full code:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType # Import necessary types
from delta.tables import DeltaTable
from pyspark.sql.functions import lit
if debug:
    print('Streaming enabled: read')
# Read the data into stream
landing_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("mergeSchema", "true") \
    .load(source_path)
landing_df = dropMetaColumns(landing_df)
def merge_batches(batchDF, batch_id):
    batchDF = get_latest_records(batchDF)
    # Check if the Delta table exists
    if not table_exists:
        print('Initial Load')
        # Add metadata columns
        batchDF = add_metadata_bulkload(batchDF)
        # saveAsTable() returns None, so its result is not assigned
        batchDF.write \
            .mode('overwrite') \
            .option('overwriteSchema', 'true') \
            .saveAsTable(destination_table)
    else:
        print('Merge Load')
        # If the Delta table exists, merge the data
        deltaTable = DeltaTable.forName(spark, destination_table)
        merge_condition = get_merge_condition(primary_key)
        # Add any columns that exist in the source but not yet in the Delta table
        add_missing_columns(destination_table, batchDF)
        # Get columns in target Delta table
        existing_columns = [field.name for field in spark.table(destination_table).schema.fields]
        # upsert_columns = { f"target.`{c.name}`" : f"source.`{c.name}`" for c in batchDF.schema if c.name in existing_columns and c.name }
        # Build the column mappings for the update and insert clauses
        update_columns = get_upsert_condition(batchDF, loadAction.UPDATE.name, existing_columns)
        insert_columns = get_upsert_condition(batchDF, loadAction.INSERT.name, existing_columns)
        # Perform the merge operation for each batch
        deltaTable.alias("target").merge(
            batchDF.alias("source"),
            merge_condition
        ) \
            .whenMatchedUpdate(set=update_columns) \
            .whenNotMatchedInsert(values=insert_columns) \
            .execute()
# Stream data in micro-batches, merging it into the Delta table
landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start() \
    .awaitTermination()
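The code above calls get_merge_condition() and get_upsert_condition() without showing them. As a rough illustration only, here is a pure-Python sketch of what such helpers might return. build_merge_condition and build_upsert_mapping are hypothetical names, and the "target"/"source" aliases and backtick-quoted key format simply mirror the merge call and the commented-out dict comprehension in the question; check the key format that whenMatchedUpdate/whenNotMatchedInsert expect against the Delta Lake docs for your runtime.

```python
from typing import Dict, Sequence

# Hypothetical stand-ins for the question's get_merge_condition() and
# get_upsert_condition(); the real helpers are not shown in the post.


def build_merge_condition(primary_key: Sequence[str]) -> str:
    """Join one equality test per key column with AND, using the
    'target' and 'source' aliases from the MERGE call."""
    if not primary_key:
        raise ValueError("primary_key must contain at least one column")
    return " AND ".join(f"target.`{c}` = source.`{c}`" for c in primary_key)


def build_upsert_mapping(source_columns: Sequence[str],
                         existing_columns: Sequence[str]) -> Dict[str, str]:
    """Map each source column that also exists in the target table to the
    matching source column, in the same shape as the commented-out
    dict comprehension (target.`col` -> source.`col`)."""
    existing = set(existing_columns)
    return {f"target.`{c}`": f"source.`{c}`"
            for c in source_columns if c in existing}
```

Inside merge_batches these could be called as build_merge_condition(primary_key) and build_upsert_mapping(batchDF.columns, existing_columns), assuming primary_key is a list of column names.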
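However, the data still seems to load even though the error appears. I'm not sure whether it just keeps reporting the error and carries on because the session is still running.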
I tried adding a timeout value to awaitTermination(), but it didn't seem to make any difference.
Force terminating query xxxxxxxxxxxxx due to not receiving any updates in 60 seconds. Spark Session ID is yyyyyyyyyyy
Timedout query xxxxxxxxxxxxx terminated
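The error above occurs because Databricks enforces a timeout on the query. If the foreachBatch processing takes longer than that, Databricks treats the query as inactive and force-terminates it.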
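To confirm whether individual batches really run longer than the 60-second window, one option is to time each foreachBatch call. The sketch below is a plain-Python decorator; timed_batch, threshold_s and the durations attribute are hypothetical names, and where the log output ends up (notebook cell or driver log) depends on the cluster type, so treat it as a diagnostic aid rather than a fix.

```python
import functools
import time
from typing import Callable, List, Tuple


def timed_batch(threshold_s: float = 60.0,
                log: Callable[[str], None] = print):
    """Hypothetical decorator for a foreachBatch function: measures how long
    each call takes and logs a warning when it exceeds threshold_s seconds."""
    def decorator(fn):
        # (batch_id, elapsed_seconds) for every call, kept per wrapped function
        durations: List[Tuple[int, float]] = []

        @functools.wraps(fn)
        def wrapper(batch_df, batch_id):
            start = time.monotonic()
            try:
                return fn(batch_df, batch_id)
            finally:
                # Runs even if the batch function raises
                elapsed = time.monotonic() - start
                durations.append((batch_id, elapsed))
                if elapsed > threshold_s:
                    log(f"batch {batch_id} took {elapsed:.1f}s, "
                        f"over the {threshold_s:g}s threshold")
                else:
                    log(f"batch {batch_id} took {elapsed:.1f}s")

        wrapper.durations = durations
        return wrapper
    return decorator
```

It would be applied when starting the stream, for example .foreachBatch(timed_batch(60)(merge_batches)), leaving merge_batches itself unchanged.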
I have tried the following:
spark.conf.set("spark.databricks.streaming.timeout", "300")
Set spark.databricks.streaming.timeout to a value greater than the default of 60 seconds. This gives each foreachBatch call more time to finish its batch without being interrupted.
Then start the stream as before:
landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start() \
    .awaitTermination()