How to join 2 Spark SQL streams


Environment: Scala; Spark version: 2.1.1

These are my streams (read from Kafka):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("JoinStreams")

val spark = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._

val schema = StructType(
  List(
    StructField("t", DataTypes.StringType),
    StructField("dst", DataTypes.StringType),
    StructField("dstPort", DataTypes.IntegerType),
    StructField("src", DataTypes.StringType),
    StructField("srcPort", DataTypes.IntegerType),
    StructField("ts", DataTypes.LongType),
    StructField("len", DataTypes.IntegerType),
    StructField("cpu", DataTypes.DoubleType),
    StructField("l", DataTypes.StringType),
    StructField("headers", DataTypes.createArrayType(DataTypes.StringType))
  )
)
val baseDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", 'topic')
  .load()
  .selectExpr("cast (value as string) as json")
  .select(from_json($"json", schema).as("data"))
  .select($"data.*")

// Split the stream into requests and responses; a generated rowId serves as the intended join key
val requestsDataFrame = baseDataFrame
  .filter("t = 'REQUEST'")
  .repartition($"dst")
  .withColumn("rowId", monotonically_increasing_id())

val responseDataFrame = baseDataFrame
  .filter("t = 'RESPONSE'")
  .repartition($"src")
  .withColumn("rowId", monotonically_increasing_id())

responseDataFrame.createOrReplaceTempView("responses")
requestsDataFrame.createOrReplaceTempView("requests")


val dataFrame = spark.sql("select * from requests left join responses ON requests.rowId = responses.rowId")

When I start the application I get this error:

org.apache.spark.sql.AnalysisException: Left outer/semi/anti joins with a streaming DataFrame/Dataset on the right is not supported;;

How can I join these two streams? I also tried a direct join and got the same error. Should I first write the streams to files and then read them back? What is the best practice?

scala apache-spark-sql
1 Answer

It looks like you need Spark 2.3:

"In Spark 2.3, we have added support for stream-stream joins ..."

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins
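
For reference, here is a minimal sketch of a stream-stream join against the question's schema. It assumes Spark 2.3+, that ts holds epoch milliseconds, and that a response matches a request when the response's src equals the request's dst (the repartition keys in the question hint at this). The column names reqDst/reqTime/respSrc/respTime and the 10-second/1-minute bounds are illustrative assumptions, not values from the question:

import org.apache.spark.sql.functions._

// Assumed: ts is epoch milliseconds; convert it to an event-time column.
val requests = baseDataFrame
  .filter("t = 'REQUEST'")
  .select(col("dst").as("reqDst"), (col("ts") / 1000).cast("timestamp").as("reqTime"))
  .withWatermark("reqTime", "10 seconds")  // bound the state buffered for late data

val responses = baseDataFrame
  .filter("t = 'RESPONSE'")
  .select(col("src").as("respSrc"), (col("ts") / 1000).cast("timestamp").as("respTime"))
  .withWatermark("respTime", "10 seconds")

// Outer stream-stream joins require watermarks on both sides plus an
// event-time range condition, so Spark can decide when an unmatched row is final.
val joined = requests.join(
  responses,
  expr("""
    reqDst = respSrc AND
    respTime >= reqTime AND
    respTime <= reqTime + interval 1 minute
  """),
  joinType = "leftOuter")

Only the join keys are selected here for brevity; in practice you would carry whatever payload columns you need through the two select calls. Note also that monotonically_increasing_id assigns ids independently per stream and per partition, so it is not a reliable cross-stream join key; a business key plus an event-time range, as sketched above, is the approach the documentation describes.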
