我有一个数据帧,我使用 Spark Structured Streaming .readStream() 进行流式传输:
身份证 | json_数据 |
---|---|
123 | {颜色:“红色”,值:“#f00”} |
125 | {颜色:“蓝色”,值:“#f45”} |
我想将每行中的每个 json_data 作为 json 有效负载发送到 Rest API。最好的方法是什么?
我知道 Databricks 有一个数据框编写器(https://docs.databricks.com/en/structed-streaming/foreach.html),但不清楚我会如何做到这一点。
我需要将列写入Python字典吗?
有点困惑这个脚本如何工作,流数据进入并附加到数据帧上,但我需要 json_data 列(存储为字符串)作为有效负载。
试试这个:
def export_to_api(microBatchOutputDF, batchId):
microBatchOutputDF_array = microBatchOutputDF.collect()
for row in microBatchOutputDF_array:
json_content = row.json_data
# Enter solution for exporting to api
<>
# Write the output of a streaming aggregation query into Delta table
(streaming_data.writeStream
.format("delta")
.foreachBatch(export_to_api)
)