Catching an exception raised inside a foreachBatch function

Problem description

I am working with PySpark Structured Streaming on Databricks, and I want to catch exceptions that I raise myself inside the function I pass to the stream via .foreachBatch.

Here is my sample code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryException

table_location = "hive_metastore.default.test_example"
checkpoint_location = "/tmp/test_checkpoint"

schema = StructType([
    StructField("BestellID", IntegerType(), True),
    StructField("CRDAT", StringType(), True),
    StructField("Menge", IntegerType(), True),
    StructField("__cmi_ingestion_ts", StringType(), True)
])

# `spark` is the SparkSession that Databricks notebooks provide by default.
data = [(1, '20240901', 3, '20241002')]
df = spark.createDataFrame(data, schema)
df.write.mode("overwrite").saveAsTable(table_location)

deltaTable = DeltaTable.forName(spark, table_location)

def mergetoDF(df, batchID):
    # Deliberately fail to simulate an error inside the batch function.
    raise ValueError("This is an error")

def test_run():

    try:
        inbound_data = (spark.readStream.format("delta").table(table_location))

        streamQuery = (inbound_data                           
                    .writeStream
                    .format("delta")
                    .outputMode("append")
                    .foreachBatch(mergetoDF)
                    .trigger(once=True)
                    .option("checkpointLocation", checkpoint_location)
                    .start()
                )
        streamQuery.awaitTermination()
    except ValueError:
        print('I am a value error')
    except StreamingQueryException as e:
        if 'FOREACH_BATCH_USER_FUNCTION_ERROR' in str(e):
            print('I am a StreamingQueryException')

test_run()

I want to be able to catch the error raised inside the foreachBatch function without any further error messages being printed to the screen. Currently, however, the StreamingQueryException is caught but additional errors are still reported:

ERROR: Some streams terminated before this command could finish!
org.apache.spark.api.python.PythonException: Found error inside foreachBatch Python process: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 99, in main
    process(df_ref_id, int(batch_id))
  File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 86, in process
    func(batch_df, batch_id)
  File "/home/spark-19129aeb-c024-45c4-ac47-ef/.ipykernel/54200/command-3034895255467445-2795320295", line 7, in mergetoDF
ValueError: This is an error

    at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$6(StreamingForeachBatchHelper.scala:199)
    at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$6$adapted(StreamingForeachBatchHelper.scala:174)
    at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1(StreamingForeachBatchHelper.scala:91)
    at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1$adapted(StreamingForeachBatchHelper.scala:80)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:172)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchOptimized$2(ForeachBatchSink.scala:247)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:190)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchOptimized(ForeachBatchSink.scala:247)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:99)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:537)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:91)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.addBatch(MicroBatchExecution.scala:1238)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1456)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.markAndTimeCollectBatch(MicroBatchExecution.scala:1246)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1456)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:462)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:800)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:334)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:205)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:737)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1449)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1449)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$6(MicroBatchExecution.scala:760)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1868)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:760)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1813)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:756)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:716)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
    at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
    at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:74)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:175)
    at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:617)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:729)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:738)
    at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:617)
    at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:615)
    at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:710)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:671)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:671)
    at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:85)
    at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:73)
    at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.runOneBatch(TriggerExecutor.scala:97)
    at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:104)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:671)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:448)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:454)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:401)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
    at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
    at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:74)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:175)
    at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:617)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:729)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:738)
    at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:617)
    at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:615)
    at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:86)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:381)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:283)

This leaves the script in the state "Last execution failed".

Ultimately, I want to write pytest tests and expect the exception in the test.

Tags: python, pyspark, databricks, spark-structured-streaming
1 Answer

.foreachBatch(mergetoDF)

As far as I know, if mergetoDF() throws an exception, it always propagates to the main thread (the one that called streamQuery.start()).
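
On Databricks with Spark Connect, the ValueError raised in the batch function therefore reaches the driver wrapped in a StreamingQueryException rather than as a ValueError, which is why the first except clause in the question never fires. A minimal sketch of inspecting the wrapped error at the call site, matching on the message string just as the question's own code does:

from pyspark.errors import StreamingQueryException  # PySpark 3.4+; the question imports it from pyspark.sql.streaming

try:
    streamQuery.awaitTermination()
except StreamingQueryException as e:
    # The original ValueError is not re-raised as-is; it only shows up
    # inside the wrapped error message of the StreamingQueryException.
    if "FOREACH_BATCH_USER_FUNCTION_ERROR" in str(e):
        print("foreachBatch user function failed:", e)
    else:
        raise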

The only way to "handle" errors in the code of the function passed to foreachBatch() is inside the function you pass (here, mergetoDF), somewhat similar to how errors inside UDFs are handled. So, for example, you could (see the sketch after this list):

  • Write the errors to a file on a filesystem shared across the cluster, such as DBFS, S3, or ADLS.
  • Add a new column named errors to the output DataFrame and store the errors in it.
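
A minimal sketch of the second option. The merge itself is an illustrative assumption (the original mergetoDF only raises), as are the join condition and the quarantine path:

from pyspark.sql import functions as F

def mergetoDF(df, batchID):
    try:
        # Happy path: merge the micro-batch into the target table
        # (the join condition on BestellID is an assumption).
        (deltaTable.alias("t")
            .merge(df.alias("s"), "t.BestellID = s.BestellID")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    except Exception as e:
        # Record the failure instead of letting it kill the stream:
        # tag the batch rows with the error and append them to a
        # quarantine table (the path is a placeholder).
        (df.withColumn("errors", F.lit(str(e)))
            .write.mode("append")
            .format("delta")
            .save("/tmp/merge_errors"))

This keeps the stream alive and makes the failure queryable afterwards.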

Ultimately, I want to write pytest tests and expect the exception in the test.

Once you move the error handling inside mergetoDF(), your tests only need to cover the various error scenarios in which mergetoDF() produces an exception.
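
For example, with pytest you can call the batch function directly with a small DataFrame and assert on the exception, without ever starting a stream. A minimal sketch, assuming a spark fixture that provides a local SparkSession and that mergetoDF and schema are importable from your module:

import pytest

def test_mergetodf_raises(spark):
    # Build a one-row batch, just like a micro-batch Spark would hand in.
    df = spark.createDataFrame([(1, '20240901', 3, '20241002')], schema)
    with pytest.raises(ValueError, match="This is an error"):
        mergetoDF(df, batchID=0)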

A test of test_run(), by contrast, would have to mock most of spark. The point is to test your own code, not Spark's.
