Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka

问题描述 投票:0回答:1

我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下:

df_source = app.spark.readStream.format("kafka") \
                 .option("kafka.bootstrap.servers", "XXXXXXX") \
                 .option("subscribe", "XXXXXXX") \
                 .option("startingOffsets", "earliest") \
                 .option("kafka.group.id", "924ee006-c268-11ed-afa1-0242ac120002") \
                 .load() \

df_source.createOrReplaceTempView("df_source")

然后我逐渐从 kafka 消息中获取 json 字符串并将 json 解析为我需要的模式:

df0 = app.spark.sql("""\
----------------------
SELECT
    CAST(value AS STRING) AS json
FROM df_source
----------------------
""")

df0.createOrReplaceTempView("df0")
df1 = app.spark.sql("""\
----------------------
SELECT
    from_json(json,        'struct< `metadata`   : struct< `namespace` : string 
                                                         , `name`      : string
                                                         , `name0`     : string 
                                                         , `size0`     : int
                                                         , `message0`  : struct< `id`           : string
                                                                               , `type`         : string
                                                                               , `timestamp`    : string
                                                                               , `date`         : string
                                                                               , `time`         : string
                                                                               , `process_name` : string
                                                                               , `loglevel`     : string
                                                                               , `process_id`   : string
                                                                               >
                                                         >
                                  , `spec`       : struct< `fix`                  : string
                                                         , `source_process_name`  : string
                                                         , `sink_process_name`    : string
                                                         , `source_CLORDID`       : string
                                                         , `sink_CLORDID`         : string
                                                         , `action`               : string
                                  >
                                  , `@timestamp` : timestamp
                                  >
                           ')               AS dict
FROM df0
----------------------
""")

df1.createOrReplaceTempView("df1")

我正在尝试在 df1 上执行自连接,其中 dict.spec.sink_CLORDID == dict.spec.source_CLORDID。 我已经关注了spark结构化流媒体文档

我正在根据我的事件时间戳定义水印:

df1_new = df1.withWatermark("`dict.@timestamp`", "2 minutes")

然后我尝试做自连接:

df2 = df1.alias("orig").join(df1_new.alias("new"), 
                     expr("""
                         orig.dict.spec.sink_CLORDID = new.dict.spec.source_CLORDID AND
                         \"new.dict.@timestamp\" >= \"orig.dict.@timestamp\" AND
                         \"new.dict.@timestamp\" <= \"orig.dict.@timestamp\" + interval 1 minute
                    """),
                     "leftOuter"
                )

但是我收到以下错误消息:

如果连接键中没有水印,或者可空侧的水印和适当的范围条件,则不支持两个流式 DataFrame/Dataset 之间的流流 LeftOuter 连接

我有几个问题:

  1. 我完全遵循了关于连接的文档示例,为什么我仍然收到水印错误?
  2. 当我从流中增量定义数据帧 -> 我想加入的数据集时,我是否需要为所有数据定义一个水印以便清除状态?
apache-spark pyspark apache-kafka spark-structured-streaming
1个回答
0
投票

将问题留给将来有相同/类似问题的任何人,我的问题是由@timestamp 列中的转义字符引起的

\"new.dict.@timestamp\" 

等这些被视为字符串而不是列

© www.soinside.com 2019 - 2024. All rights reserved.