想要创建持续运行的 Spark 流式查询,该查询从 MemoryStream[String] 读取并输出到控制台

问题描述 投票:0回答:0

我问的问题——一旦回答——帮助我写一些测试 Spark 结构化流管道,从流源读取和 写入 s3/parquet,但我简化了一些事情,所以目标是简单地写入 控制台,因为新记录被添加到 MemoryStream。我几乎有这个工作。但唯一的问题是 下面的代码是我添加到 Stream 的第一个项目确实输出到控制台。然而, 在我睡眠之后(故意超出查询的触发处理间隔),我没有看到 我在睡眠后添加的项目出现在控制台上。

输出基本上是这样的:

    Batch: 0
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |Alice|
    |  Bob|
    +-----+

我想看到第二批值为“Mouse”,但当我运行下面的代码时它从未出现过。 在此先感谢您的指导!

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object StreamTableDemo {
  def main(args: Array[String]): Unit = {
    implicit val spark = SparkSession
      .builder
      .appName("StreamTableDemo")
      .master("local[*]")
      .getOrCreate()

    // Define the schema for the stream data
    val schema = new StructType() .add("id", IntegerType)

    implicit val stringEncoder = ExpressionEncoder[String]

    // Create a MemoryStream and a DataFrame based on the schema
    val stream = new MemoryStream[String](2, spark.sqlContext, Some(1))
    val streamDF = stream.toDF()


    // Create a temporary view from the stream DataFrame
    streamDF.createOrReplaceTempView("T")

    // Define a streaming query to output the records in T to the console
    val query = spark.table("T")
      .writeStream
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .format("console")
      .start()

    // Add data to the stream
    stream.addData("Alice")
    stream.addData("Bob")

    // Wait for the query to terminate
    query.awaitTermination()
    Thread.sleep(2 * 1000)
    stream.addData("Mouse")
  }
}
apache-spark memory console spark-streaming
© www.soinside.com 2019 - 2024. All rights reserved.