我正在尝试将 json 消息提取到 AZURE 事件中心 我的问题是 json 消息的大小,因为事件中心的限制为 1MB 我有一个很大的 json 消息,它由多个 json 消息组成
DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
此数据是一个示例。 DATA 已经是 json 格式,但 DATA 包含 10000+ 具有相同格式的 json 事件 我想将此 json 消息提取到事件中心
任何人都可以帮我吗?我如何将这一重要消息提取到事件中心?通过切片或其他方式
非常感谢!
尹植
我尝试对其进行切片,但一条 json 消息中的事件数量总是不同且非常大...
最有效的方法是将庞大的消息列表分成多个批次,然后将这些批次一一发送到Eventhub。 批次的大小由每条消息的大小决定,请记住,您只能在单个批次中发送 1MB。假设单个消息的平均大小为 100 字节,则每批大约会收到 10K 条消息。为了安全起见,您可以将其减少到 5000-8000。
下面是一段代码,它将原始消息 JSON 数组 (DATA) 分解为单独批次的 JSON 数组,并将它们一次发送一个到 eventhub。您可以将batch_limit调整为5000-8000之间的任何值。既然您说数组中的消息数量可以是10K+,您将只需要2-3个批次即可发送该消息。
import time
import asyncio
import os
import json
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData
CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxx='
EVENTHUB_NAME = 'xxxxxxxxxxxxxx'
DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
batch_limit = 2
async def run():
print('started')
producer = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
)
batch_cnt = 0
msg_array = []
for DATA_msg in DATA:
msg_array.append(DATA_msg)
batch_cnt += 1
if batch_cnt > batch_limit:
async with producer:
event_data_batch = await producer.create_batch()
for msg in msg_array:
event_data_batch.add(EventData(json.dumps(msg)))
await producer.send_batch(event_data_batch)
print('sent a batch of messages')
batch_cnt = 0
msg_array = []
if batch_cnt > 0:
async with producer:
event_data_batch = await producer.create_batch()
for msg in msg_array:
event_data_batch.add(EventData(json.dumps(msg)))
await producer.send_batch(event_data_batch)
print('sent remaining messages as last batch')
start_time = time.time()
asyncio.run(run())
print("Send messages in {} seconds.".format(time.time() - start_time))