使用事件中心管理 PySpark Streaming 中的数据封装

问题描述 投票:0回答:1

使用 PySpark 流式传输数据时,我收到的主要消息封装在名为“body”的键中。

spark.readStream.format("eventhubs").options(**ehConf).load()

有没有办法避免这种情况,直接获取数据?

我正在尝试展平

body
,但传入的数据缺乏一致的结构,这使得处理具有挑战性。

azure pyspark databricks spark-streaming
1个回答
0
投票

正如您在使用 PySpark 使用流数据时提到的,您被封装在名为“body”的键中。

body属性中的数据以二进制格式存储。 要提取其元素,您需要显式定义结构。

就像下面的例子:

from pyspark.sql.types import *
Schema = StructType([StructField("name", StringType(), True),
                      StructField("dt", LongType(), True),
                      StructField("main", StructType( 
                          [StructField("temp", DoubleType()), 
                           StructField("pressure", DoubleType())])),
                      StructField("coord", StructType( 
                          [StructField("lon", DoubleType()), 
                           StructField("lat", DoubleType())]))
                    ])

                
from pyspark.sql.functions import *
rawData = df. \
  selectExpr("cast(Body as string) as json"). \
  select(from_json("json", Schema).alias("data")). \
  select("data.*")
  On the resulting dataframe you can apply functions, e. g. call the custom function u_make_hash on the column 'name':
parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))

在上面的 coe 中,我将二进制“body”列解码为

string
并使用模式进行解析。 为“
name
”列定义自定义 hash 函数。

结果:

+-----+--------------------+
|name |namehash            |
+-----+--------------------+
|CityA|-3611951129487740868|
|CityB|5969622088498840125 |
+-----+--------------------+



© www.soinside.com 2019 - 2024. All rights reserved.