Force terminating query xxxxxx due to not receiving any updates in 60 seconds

Problem description

I'm using Structured Streaming in Databricks to load batches of files into a Unity Catalog (UC) table. It works, but if the foreachBatch call does not complete within 60 seconds it produces the following error:

Force terminating query xxxxxxxxxxxxx due to not receiving any updates in 60 seconds. Spark Session ID is yyyyyyyyyyy
Timedout query xxxxxxxxxxxxx terminated

Here is the full code:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType  # Import necessary types

from delta.tables import DeltaTable
from pyspark.sql.functions import lit

if debug:
    print('Streaming enabled: read')

# Read the landing files as a stream with Auto Loader
landing_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("mergeSchema", "true") \
    .load(source_path)

landing_df = dropMetaColumns(landing_df)

def merge_batches(batchDF, batch_id):
    # Keep only the latest record per key within this micro-batch
    batchDF = get_latest_records(batchDF)

    # Check if the Delta table exists
    if not table_exists:
        print('Initial Load')

        # Add metadata columns, then create the target table with an overwrite
        batchDF = add_metadata_bulkload(batchDF)

        batchDF.write \
            .mode('overwrite') \
            .option('overwriteSchema', 'true') \
            .saveAsTable(destination_table)
    else:
        print('Merge Load')

        # If the Delta table exists, merge the data
        deltaTable = DeltaTable.forName(spark, destination_table)
        merge_condition = get_merge_condition(primary_key)

        # Add any columns to the target that exist in the source but not in Delta
        add_missing_columns(destination_table, batchDF)

        # Get columns in the target Delta table
        existing_columns = [field.name for field in spark.table(destination_table).schema.fields]

        # upsert_columns = { f"target.`{c.name}`": f"source.`{c.name}`" for c in batchDF.schema if c.name in existing_columns and c.name }

        # Build the update and insert column mappings for the merge
        update_columns = get_upsert_condition(batchDF, loadAction.UPDATE.name, existing_columns)
        insert_columns = get_upsert_condition(batchDF, loadAction.INSERT.name, existing_columns)

        # Perform the merge operation for each batch
        deltaTable.alias("target").merge(
            batchDF.alias("source"),
            merge_condition
        ) \
        .whenMatchedUpdate(set=update_columns) \
        .whenNotMatchedInsert(values=insert_columns) \
        .execute()

# Stream data in micro-batches, merging it into the Delta table
landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start() \
    .awaitTermination()
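The helpers referenced above (get_latest_records, get_merge_condition, get_upsert_condition, add_missing_columns, add_metadata_bulkload) are defined elsewhere in my project. For readers following the merge logic, here is a minimal, hypothetical sketch of the two that drive the merge, assuming primary_key is a list of key column names; the real implementations may differ:

def get_merge_condition(primary_key):
    # e.g. ["id", "region"] -> "target.`id` = source.`id` AND target.`region` = source.`region`"
    return " AND ".join(f"target.`{c}` = source.`{c}`" for c in primary_key)

def get_upsert_condition(batchDF, action, existing_columns):
    # Map each source column that also exists in the target to its source expression;
    # 'action' is accepted for signature parity with the call sites above but unused here.
    return {c.name: f"source.`{c.name}`" for c in batchDF.schema if c.name in existing_columns}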

However, the data still seems to load even though the error appears. I'm not sure whether it simply keeps raising the error and carries on because the session is still running.

I tried adding a timeout value to awaitTermination(), but it didn't seem to make any difference.
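For reference, the timeout argument to awaitTermination() only bounds how long the driver call blocks; it returns a boolean and does not stop the query or change the Databricks-side timeout that produces the error, which would explain why it made no difference. A small sketch (the 300-second value is arbitrary):

query = landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .start()

# Wait up to 300 seconds; returns True if the query terminated in time, False otherwise.
finished = query.awaitTermination(timeout=300)
if not finished:
    print(f"Query {query.id} is still running; last progress: {query.lastProgress}")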

apache-spark databricks azure-databricks spark-structured-streaming
1 Answer

Force terminating query xxxxxxxxxxxxx due to not receiving any updates in 60 seconds. Spark Session ID is yyyyyyyyyyy
Timedout query xxxxxxxxxxxxx terminated

The above error occurs because Databricks applies a timeout to the query. If the foreachBatch processing takes longer than that, Databricks force-terminates the query due to inactivity.

I have tried the following approach:

spark.conf.set("spark.databricks.streaming.timeout", "300")

This sets spark.databricks.streaming.timeout to a value higher than the default 60 seconds, giving your foreachBatch operation more time to complete each batch without being interrupted.

Then you can start the writeStream as before:

landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start() \
    .awaitTermination()