我正在从 Databricks 增量表中读取数据作为流并将其写入另一个增量表(使用屏幕截图中的控制台以便于调试),我想利用 Spark 的 StreamingQueryListener() 并使用 onQueryProgress() 来打印输入此处代码片段中的批处理中的行用于调试。不知道我在这里错过了什么!
这让我想到这个功能是否仅适用于 Kafka,但我通过使用 Kafka 源也得到了相同的结果。任何帮助表示赞赏
%scala
import org.apache.spark.sql.streaming._
val streamingCountsListener = new StreamingQueryListener() {
override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {
println("query started")
}
override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {
println("query made stopped")
}
override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = {
queryProgress.progress.sources.foreach(src => {
println(src.numInputRows)
println("\n\n\n\n")
})
}
}
// Add this query listener to the session
spark.streams.addListener(streamingCountsListener)
var x = spark.readStream.format("delta")
.option("ignoreChanges", "true")
.table(s"qastg.student")
val query = x.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination()
尝试使用Kafka Source并使用简单的打印语句进行调试,但没有成功
看到
%scala
,我认为这是在笔记本内的Databricks上。如果是这样,则开箱即用提供此信息。如果您需要此默认 Databricks 侦听器未提供的内容,您应该能够在 驱动程序的 stdout 日志中看到自定义侦听器输出。
将事件传递给查询监听器:
queryStarted: StreamingQueryListener.QueryStartedEvent queryStarted
这些 println 语句输出到 stdout。
从 Databricks UI 转到“集群详细信息”内的“驱动程序日志”选项卡,您将看到预期的输出。