使用 PySpark 流式传输数据时,我收到的主要消息封装在名为“body”的键中。
spark.readStream.format("eventhubs").options(**ehConf).load()
有没有办法避免这种情况,直接获取数据?
我正在尝试展平
body
,但传入的数据缺乏一致的结构,这使得处理具有挑战性。
正如您在使用 PySpark 使用流数据时提到的,您被封装在名为“body”的键中。
body属性中的数据以二进制格式存储。 要提取其元素,您需要显式定义结构。
就像下面的例子:
from pyspark.sql.types import *
Schema = StructType([StructField("name", StringType(), True),
StructField("dt", LongType(), True),
StructField("main", StructType(
[StructField("temp", DoubleType()),
StructField("pressure", DoubleType())])),
StructField("coord", StructType(
[StructField("lon", DoubleType()),
StructField("lat", DoubleType())]))
])
from pyspark.sql.functions import *
rawData = df. \
selectExpr("cast(Body as string) as json"). \
select(from_json("json", Schema).alias("data")). \
select("data.*")
On the resulting dataframe you can apply functions, e. g. call the custom function u_make_hash on the column 'name':
parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))
在上面的 coe 中,我将二进制“body”列解码为
string
并使用模式进行解析。
为“name”列定义自定义
hash
函数。
结果:
+-----+--------------------+
|name |namehash |
+-----+--------------------+
|CityA|-3611951129487740868|
|CityB|5969622088498840125 |
+-----+--------------------+