I am working with PySpark Structured Streaming on Databricks and want to catch the exceptions that I raise myself inside the function I pass to the stream via .foreachBatch.
Here is my sample code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryException

table_location = "hive_metastore.default.test_example"
checkpoint_location = "/tmp/test_checkpoint"

schema = StructType([
    StructField("BestellID", IntegerType(), True),
    StructField("CRDAT", StringType(), True),
    StructField("Menge", IntegerType(), True),
    StructField("__cmi_ingestion_ts", StringType(), True)
])

data = [(1, '20240901', 3, '20241002')]
df = spark.createDataFrame(data, schema)
df.write.mode("overwrite").saveAsTable(table_location)

deltaTable = DeltaTable.forName(spark, table_location)

def mergetoDF(df, batchID):
    raise ValueError("This is an error")

def test_run():
    try:
        inbound_data = (spark.readStream.format("delta").table(table_location))
        streamQuery = (inbound_data
                       .writeStream
                       .format("delta")
                       .outputMode("append")
                       .foreachBatch(mergetoDF)
                       .trigger(once=True)
                       .option("checkpointLocation", checkpoint_location)
                       .start()
                       )
        streamQuery.awaitTermination()
    except ValueError:
        print('I am a value error')
    except StreamingQueryException as e:
        if 'FOREACH_BATCH_USER_FUNCTION_ERROR' in str(e):
            print('I am a StreamingQueryException')

test_run()
I want to catch the error raised inside the foreachBatch function without any additional error messages being printed to the screen. At the moment, however, the StreamingQueryException is caught and the following error is still reported:
ERROR: Some streams terminated before this command could finish!
org.apache.spark.api.python.PythonException: Found error inside foreachBatch Python process: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 99, in main
process(df_ref_id, int(batch_id))
File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 86, in process
func(batch_df, batch_id)
File "/home/spark-19129aeb-c024-45c4-ac47-ef/.ipykernel/54200/command-3034895255467445-2795320295", line 7, in mergetoDF
ValueError: This is an error
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$6(StreamingForeachBatchHelper.scala:199)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$6$adapted(StreamingForeachBatchHelper.scala:174)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1(StreamingForeachBatchHelper.scala:91)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1$adapted(StreamingForeachBatchHelper.scala:80)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:172)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchOptimized$2(ForeachBatchSink.scala:247)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:190)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchOptimized(ForeachBatchSink.scala:247)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:99)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:537)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:91)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.addBatch(MicroBatchExecution.scala:1238)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1456)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.markAndTimeCollectBatch(MicroBatchExecution.scala:1246)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1456)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:462)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:800)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:334)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:205)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:737)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1449)
at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1449)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$6(MicroBatchExecution.scala:760)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1868)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:760)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1813)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:756)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:716)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:74)
at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:175)
at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:729)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:738)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:615)
at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:710)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:671)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:671)
at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:85)
at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:73)
at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.runOneBatch(TriggerExecutor.scala:97)
at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:671)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:448)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:454)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:401)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:74)
at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:175)
at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:729)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:738)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:615)
at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:86)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:381)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:283)
This leaves the script with the status "Last execution failed".
What I actually want is to write pytest tests and expect the exception in the test.
As far as I know, if the function you pass to .foreachBatch(mergetoDF), i.e. mergetoDF(), throws an exception, it will always propagate to the main thread (the one that calls streamQuery.start()). The only way to "handle" an error raised in the code of the function passed to foreachBatch() is inside that function itself (mergetoDF in this case), somewhat similar to the way you handle errors inside a UDF. So you could, for example, add a new errors column and store the error in it.
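For illustration, a minimal sketch of what that could look like inside the batch function. The errors and failed_at columns and the test_example_errors quarantine table are made-up names, and the ValueError stands in for whatever your real merge logic can raise:

from pyspark.sql.functions import lit, current_timestamp

def mergetoDF(df, batchID):
    try:
        # the real merge against deltaTable would go here; the ValueError
        # below just stands in for whatever can fail inside it
        raise ValueError("This is an error")
    except ValueError as e:
        # record the failure on the batch itself instead of letting it escape;
        # the "errors" column and the quarantine table name are illustrative only
        (df.withColumn("errors", lit(str(e)))
           .withColumn("failed_at", current_timestamp())
           .write.mode("append")
           .saveAsTable("hive_metastore.default.test_example_errors"))

With something like this in place the stream itself completes normally and nothing ever reaches the except blocks in test_run().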
You mentioned that you actually want to write pytest tests and expect the exception in the test. Once you move the error handling inside mergetoDF(), your tests only need to cover the various error scenarios in which mergetoDF() raises an exception.
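For those scenarios you can call mergetoDF directly with a small DataFrame and never start a stream. A rough sketch, assuming a local SparkSession fixture and that mergetoDF can be imported without the rest of your script running (the streaming_job module name is just a placeholder):

import pytest
from pyspark.sql import SparkSession

from streaming_job import mergetoDF  # placeholder: import from wherever you define it

@pytest.fixture(scope="module")
def spark():
    # small local session just for the tests; on Databricks the notebook session already exists
    return SparkSession.builder.master("local[1]").appName("mergetoDF-tests").getOrCreate()

def test_mergetodf_raises(spark):
    batch_df = spark.createDataFrame(
        [(1, "20240901", 3, "20241002")],
        ["BestellID", "CRDAT", "Menge", "__cmi_ingestion_ts"],
    )
    # call the batch function directly, exactly as foreachBatch would,
    # and assert on the exception without ever starting a stream
    with pytest.raises(ValueError, match="This is an error"):
        mergetoDF(batch_df, 0)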
A test for test_run() would mock most of spark, i.e. you test your own code rather than Spark's code.
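Such a test could look roughly like the sketch below, which replaces spark with a MagicMock so the readStream/writeStream chain never touches a cluster. The streaming_job module name is again a placeholder, and this assumes the table-creation code at the top of your script does not run on import:

from unittest.mock import MagicMock

import streaming_job  # placeholder module holding test_run, mergetoDF and the global spark

def test_run_wires_up_foreachbatch(monkeypatch):
    # builder methods on the writer return the writer itself, so the whole
    # writeStream chain in test_run() resolves without a real Spark session
    writer = MagicMock()
    for name in ("format", "outputMode", "foreachBatch", "trigger", "option"):
        getattr(writer, name).return_value = writer

    fake_spark = MagicMock()
    fake_spark.readStream.format.return_value.table.return_value.writeStream = writer
    monkeypatch.setattr(streaming_job, "spark", fake_spark, raising=False)

    streaming_job.test_run()

    # only our own wiring is verified; Spark itself is never exercised
    writer.foreachBatch.assert_called_once_with(streaming_job.mergetoDF)
    writer.start.return_value.awaitTermination.assert_called_once()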