我正在尝试使用左外连接使用 Spark 结构化流进行流-流自连接,以便我可以在之后拆分连接和未连接的行。 我的设置如下:
df_source = app.spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "XXXXXXX") \
.option("subscribe", "XXXXXXX") \
.option("startingOffsets", "earliest") \
.option("kafka.group.id", "924ee006-c268-11ed-afa1-0242ac120002") \
.load() \
df_source.createOrReplaceTempView("df_source")
然后我逐渐从 kafka 消息中获取 json 字符串并将 json 解析为我需要的模式:
df0 = app.spark.sql("""\
----------------------
SELECT
CAST(value AS STRING) AS json
FROM df_source
----------------------
""")
df0.createOrReplaceTempView("df0")
df1 = app.spark.sql("""\
----------------------
SELECT
from_json(json, 'struct< `metadata` : struct< `namespace` : string
, `name` : string
, `name0` : string
, `size0` : int
, `message0` : struct< `id` : string
, `type` : string
, `timestamp` : string
, `date` : string
, `time` : string
, `process_name` : string
, `loglevel` : string
, `process_id` : string
>
>
, `spec` : struct< `fix` : string
, `source_process_name` : string
, `sink_process_name` : string
, `source_CLORDID` : string
, `sink_CLORDID` : string
, `action` : string
>
, `@timestamp` : timestamp
>
') AS dict
FROM df0
----------------------
""")
df1.createOrReplaceTempView("df1")
我正在尝试在 df1 上执行自连接,其中 dict.spec.sink_CLORDID == dict.spec.source_CLORDID。 我已经关注了spark结构化流媒体文档
我正在根据我的事件时间戳定义水印:
df1_new = df1.withWatermark("`dict.@timestamp`", "2 minutes")
然后我尝试做自连接:
df2 = df1.alias("orig").join(df1_new.alias("new"),
expr("""
orig.dict.spec.sink_CLORDID = new.dict.spec.source_CLORDID AND
\"new.dict.@timestamp\" >= \"orig.dict.@timestamp\" AND
\"new.dict.@timestamp\" <= \"orig.dict.@timestamp\" + interval 1 minute
"""),
"leftOuter"
)
但是我收到以下错误消息:
如果连接键中没有水印,或者可空侧的水印和适当的范围条件,则不支持两个流式 DataFrame/Dataset 之间的流流 LeftOuter 连接
我有几个问题:
将问题留给将来有相同/类似问题的任何人,我的问题是由@timestamp 列中的转义字符引起的
\"new.dict.@timestamp\"
等这些被视为字符串而不是列