使用python将json消息提取到甚至集线器中

问题描述 投票:0回答:1

我正在尝试将 json 消息提取到 AZURE 事件中心 我的问题是 json 消息的大小,因为事件中心的限制为 1MB 我有一个很大的 json 消息,它由多个 json 消息组成

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]

此数据是一个示例。 DATA 已经是 json 格式,但 DATA 包含 10000+ 具有相同格式的 json 事件 我想将此 json 消息提取到事件中心

任何人都可以帮我吗?我如何将这一重要消息提取到事件中心?通过切片或其他方式

非常感谢!

尹植

我尝试对其进行切片,但一条 json 消息中的事件数量总是不同且非常大...

json azure slice data-ingestion eventhub
1个回答
0
投票

最有效的方法是将庞大的消息列表分成多个批次,然后将这些批次一一发送到Eventhub。 批次的大小由每条消息的大小决定,请记住,您只能在单个批次中发送 1MB。假设单个消息的平均大小为 100 字节,则每批大约会收到 10K 条消息。为了安全起见,您可以将其减少到 5000-8000。

下面是一段代码,它将原始消息 JSON 数组 (DATA) 分解为单独批次的 JSON 数组,并将它们一次发送一个到 eventhub。您可以将batch_limit调整为5000-8000之间的任何值。既然您说数组中的消息数量可以是10K+,您将只需要2-3个批次即可发送该消息。

import time
import asyncio
import os
import json

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData

CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxx='
EVENTHUB_NAME = 'xxxxxxxxxxxxxx'

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
batch_limit = 2
async def run():
    print('started')
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME
    )
    batch_cnt = 0
    msg_array = []
    for DATA_msg in DATA:
      msg_array.append(DATA_msg)
      batch_cnt += 1
      if batch_cnt > batch_limit:
        async with producer:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent a batch of messages')
        batch_cnt = 0
        msg_array = []
    if batch_cnt > 0:
       async with producer:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent remaining messages as last batch')
      
start_time = time.time()
asyncio.run(run())
print("Send messages in {} seconds.".format(time.time() - start_time))
© www.soinside.com 2019 - 2024. All rights reserved.