我将压缩数据发送到事件中心,以克服 Azure 事件中心中的 1 MB 硬限制。 我还必须在 Py-spark 中阅读此内容并更新增量表。
发送到事件中心的压缩数据在 Py-spark 流中为空。怎么读?
这就是我从事件中心阅读的方式
df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
这就是我将数据发送到事件中心的方式。 event_data_batch =等待生产者.create_batch()
# Add events to the batch.
body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'
# Compress the JSON string using the gzip algorithm.
compressed_body = gzip.compress(body.encode('utf-8'))
# Encode the compressed JSON string to base64.
encoded_compressed_body = base64.b64encode(compressed_body)
event_data_batch.add(EventData(encoded_compressed_body))
我尝试使用 gzip 选项进行阅读,但它给了我 null 。
df_stream = spark.readStream.format("eventhubs")\
.options(**ehConf)\
.option("compression", "gzip") \
.load()
您需要对列
body
进行解压缩,选项.option("compression", "gzip")
适用于压缩文件,而不是列上的压缩数据。
因此,需要创建用户定义的函数来解压缩和解码数据。 使用下面的代码。
UDF
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import gzip,base64,json
def unZip(binary_string):
return gzip.decompress(base64.b64decode(binary_string)).decode('utf-8')
unzip = F.udf(unZip, StringType())
接下来,使用解压缩的数据创建新列。
df.withColumn("bd", unzip(F.col("body").cast('string'))).display()
输出: