Spark 中的 StreamQueryListener 不执行 onQueryProgress() 中的代码

问题描述 投票:0回答:3

我正在从 Databricks 增量表中读取数据作为流并将其写入另一个增量表(使用屏幕截图中的控制台以便于调试),我想利用 Spark 的 StreamingQueryListener() 并使用 onQueryProgress() 来打印输入此处代码片段中的批处理中的行用于调试。不知道我在这里错过了什么!

这让我想到这个功能是否仅适用于 Kafka,但我通过使用 Kafka 源也得到了相同的结果。任何帮助表示赞赏

%scala
import org.apache.spark.sql.streaming._
val streamingCountsListener = new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {
      println("query started")
  }
  override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("query made stopped")
      
  }
  override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = {
    queryProgress.progress.sources.foreach(src => {
      println(src.numInputRows)
      println("\n\n\n\n")
    })
  }
}

// Add this query listener to the session
spark.streams.addListener(streamingCountsListener)

var x = spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .table(s"qastg.student")

val query = x.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()

代码片段

尝试使用Kafka Source并使用简单的打印语句进行调试,但没有成功

apache-spark databricks etl spark-streaming spark-structured-streaming
3个回答
1
投票

看到

%scala
,我认为这是在笔记本内的Databricks上。如果是这样,则开箱即用提供此信息。如果您需要此默认 Databricks 侦听器未提供的内容,您应该能够在 驱动程序的 stdout 日志中看到自定义侦听器输出。


0
投票

将事件传递给查询监听器:

queryStarted: StreamingQueryListener.QueryStartedEvent queryStarted

0
投票

这些 println 语句输出到 stdout。

从 Databricks UI 转到“集群详细信息”内的“驱动程序日志”选项卡,您将看到预期的输出。

© www.soinside.com 2019 - 2024. All rights reserved.