我问的问题——一旦回答——帮助我写一些测试 Spark 结构化流管道,从流源读取和 写入 s3/parquet,但我简化了一些事情,所以目标是简单地写入 控制台,因为新记录被添加到 MemoryStream。我几乎有这个工作。但唯一的问题是 下面的代码是我添加到 Stream 的第一个项目确实输出到控制台。然而, 在我睡眠之后(故意超出查询的触发处理间隔),我没有看到 我在睡眠后添加的项目出现在控制台上。
输出基本上是这样的:
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|Alice|
| Bob|
+-----+
我想看到第二批值为“Mouse”,但当我运行下面的代码时它从未出现过。 在此先感谢您的指导!
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
object StreamTableDemo {
def main(args: Array[String]): Unit = {
implicit val spark = SparkSession
.builder
.appName("StreamTableDemo")
.master("local[*]")
.getOrCreate()
// Define the schema for the stream data
val schema = new StructType() .add("id", IntegerType)
implicit val stringEncoder = ExpressionEncoder[String]
// Create a MemoryStream and a DataFrame based on the schema
val stream = new MemoryStream[String](2, spark.sqlContext, Some(1))
val streamDF = stream.toDF()
// Create a temporary view from the stream DataFrame
streamDF.createOrReplaceTempView("T")
// Define a streaming query to output the records in T to the console
val query = spark.table("T")
.writeStream
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime("1 seconds"))
.format("console")
.start()
// Add data to the stream
stream.addData("Alice")
stream.addData("Bob")
// Wait for the query to terminate
query.awaitTermination()
Thread.sleep(2 * 1000)
stream.addData("Mouse")
}
}