PySpark 流代码中的动态键/列重命名

问题描述 投票:0回答:1

我正在尝试动态重命名流数据中的列,但标准方法似乎不适用于流。

我的代码如下。

json_df = spark.readStream.format("eventhubs").options(**ehConf).load()
json_df = json_df.withColumn("body", json_df.body.cast("string"))
json_df = json_df.withColumn("body", F.from_json(json_df.body, MapType(StringType(), StringType())))

# Rename the MAIN_MESSAGE column to the value of TYPE
type_value = json_df.select("TYPE").distinct().collect()[0][0]
json_df = json_df.withColumnRenamed("MAIN_MESSAGE", type_value)

我收到的错误消息如下

AnalysisException: Queries with streaming sources must be executed with writeStream.start(); eventhubs

错误突出显示以下行

type_value = json_df.select("TYPE").distinct().collect()[0][0]

pyspark databricks spark-streaming
1个回答
0
投票

readStream()
返回流数据帧。它的工作原理不同。了解如何使用流数据帧。您需要根据您想要做什么来选择正确的水槽。例如

json_df = spark.readStream.format("eventhubs").options(**ehConf).load()
json_df = json_df.withColumn("body", json_df.body.cast("string"))
json_df = json_df.withColumn("body", F.from_json(json_df.body, MapType(StringType(), StringType())))

streaming_query = (
    json_df.select('TYPE')
    .writeStream
    .format("memory")
    .queryName("temp_type_table")
    .start()
)

# Rename the MAIN_MESSAGE column to the value of TYPE
type_value = spark.sql('select distinct TYPE from temp_type_table')[0].asDict()['TYPE']
# same as following:
# type_value = spark.read.table('temp_type_table').select('TYPE').distinct().collect()[0].asDict()['TYPE']
json_df = json_df.withColumnRenamed("MAIN_MESSAGE", type_value)

你还必须清理:

streaming_query.awaitTermination()   # OR
streaming_query.stop()

# maybe drop the temp_type_table

您还应该考虑如何开始直播,使用诸如

availableNow
之类的触发器或任何适合您的账单的触发器。

根据经验,所有流数据帧写入操作中的最后一个函数调用是

start()
(这是您的错误消息)。

参见 流式源的查询必须使用 writeStream.start();

执行

总而言之,请花几个小时阅读手册

© www.soinside.com 2019 - 2024. All rights reserved.