我正在尝试动态重命名流数据中的列,但标准方法似乎不适用于流。
我的代码如下。
json_df = spark.readStream.format("eventhubs").options(**ehConf).load()
json_df = json_df.withColumn("body", json_df.body.cast("string"))
json_df = json_df.withColumn("body", F.from_json(json_df.body, MapType(StringType(), StringType())))
# Rename the MAIN_MESSAGE column to the value of TYPE
type_value = json_df.select("TYPE").distinct().collect()[0][0]
json_df = json_df.withColumnRenamed("MAIN_MESSAGE", type_value)
我收到的错误消息如下
AnalysisException: Queries with streaming sources must be executed with writeStream.start(); eventhubs
错误突出显示以下行
type_value = json_df.select("TYPE").distinct().collect()[0][0]
readStream()
返回流数据帧。它的工作原理不同。了解如何使用流数据帧。您需要根据您想要做什么来选择正确的水槽。例如
json_df = spark.readStream.format("eventhubs").options(**ehConf).load()
json_df = json_df.withColumn("body", json_df.body.cast("string"))
json_df = json_df.withColumn("body", F.from_json(json_df.body, MapType(StringType(), StringType())))
streaming_query = (
json_df.select('TYPE')
.writeStream
.format("memory")
.queryName("temp_type_table")
.start()
)
# Rename the MAIN_MESSAGE column to the value of TYPE
type_value = spark.sql('select distinct TYPE from temp_type_table')[0].asDict()['TYPE']
# same as following:
# type_value = spark.read.table('temp_type_table').select('TYPE').distinct().collect()[0].asDict()['TYPE']
json_df = json_df.withColumnRenamed("MAIN_MESSAGE", type_value)
你还必须清理:
streaming_query.awaitTermination() # OR
streaming_query.stop()
# maybe drop the temp_type_table
您还应该考虑如何开始直播,使用诸如
availableNow
之类的触发器或任何适合您的账单的触发器。
根据经验,所有流数据帧写入操作中的最后一个函数调用是
start()
(这是您的错误消息)。
参见 流式源的查询必须使用 writeStream.start();
执行总而言之,请花几个小时阅读手册。