Python中百万级json对象高效解析处理

问题描述 投票:0回答:1

我有一些工作代码需要显着提高运行时间,但我很迷失。本质上,我将获得包含数万个 json 文件的 zip 文件夹,每个文件包含大约 1,000 条 json 消息。每个文件中散布着大约 15 种不同类型的 json 对象,其中一些对象内部包含字典列表,而其他对象则非常简单。我需要读入所有数据,解析对象并提取相关信息,然后将解析后的数据传回并使用第三方软件的 API 将其插入到不同的程序中(类似于专有实现的包装器) SQL)。

所以我有代码可以完成所有这些工作。问题是每次运行大约需要 4-5 小时,我需要将其缩短到 30 分钟。

我当前的代码严重依赖 asyncio。我用它来获得一些并发性,特别是在读取 json 文件时。我还开始分析我的代码,到目前为止已转向使用 orjson 读取每个文件中的数据,并在 cython 中重写我的每个解析器函数,以在这方面也获得一些改进。但是,我使用异步队列来回传递内容,我的分析器显示大量时间花费在

queue.get
queue.put
调用上。我还研究了 msgspec 来改进 json 数据的读取,虽然速度更快,但当我必须将
msgspec.Struct
对象发送到我的 cython 代码中并使用它们而不仅仅是字典时,它变得更慢。

所以只是希望获得一些关于如何改进这个过程的一般帮助。我读过有关 multiprocessing.pools 和并发.futures 的多处理,但这两者都比我当前的实现慢。我在想也许我需要改变通过队列传递内容的方式,所以我传递了每个文件的完整 json 数据,而不是每个单独的消息(每个消息大约 1,000 个文档),但这没有帮助。

我读过很多问题/答案,但似乎很多人都有非常统一的 json 数据(不是 15 种不同的消息类型)。我研究了批处理,但我不完全理解这如何改变事情 - 这就是我使用并发.futures 所做的,但实际上它又花了更长的时间。

总的来说,我想将其保留为队列,因为将来我想对流数据运行相同的过程,因此该部分将取代 json 读取,而是将通过流接收到的每条消息放入队列和其他一切都会一样工作。

下面包含一些伪代码。

主.py

import asyncio
from glob import glob
import orjson
from parser_dispatcher import ParserDispatcher
from sql_dispatcher import SqlDispatcher

async def load_json(file_path, queue):
    async with aiofiles.open(file_path, mode="rb") as f:
        data = await f.read()
        json_data = await asyncio.to_thread(orjson.loads(data))
        for msg in json_data:
            await queue.put(msg)

async def load_all_json_files(base_path, queue):
    file_list = glob(f"{base_path}/*.json")
    tasks = [load_json(file_path, queue) for file_path in file_list]
    await asyncio.gather(*tasks)
    await queue.put(None) # to end the processing

def main()
    base_path = "\path\to\json\folder"
    paser_queue = asyncio.queue()
    sql_queue = asyncio.queue()
    
    parser_dispatch = ParserDispatcher()
    sql_dispatch = SqlDispatcher()

    load_task = load_all_json_files(base_path, parser_queue)
    parser_task = parser_dispatch.process_queue(parser_queue, sql_queue)
    sql_task = sql_dispatch.process_queue(sql_queue)

    await asyncio.gather(load_task, erdp_task, sqlr_task)

if __name__ -- "__main__":
    asyncio.run(main))

parser_dispatcher.py

import asyncio
import message_parsers as mp

class ParserDispatcher:
    def __init__(self):
        self.parsers = {
            ("1", "2", "3"): mp.parser1,
            .... etc
        } # this is a dictionary where keys are tuples and values are the parser functions

    def dispatch(self, msg):
        parser_key = tuple(msg.get("type"), msg.get("source"), msg.get("channel"))
        parser = self.parsers.get(parser_key)
        if parser:
            new_msg = parser(msg)
        else:
            new_msg = []
        return new_msg
    
    async def process_queue(self, parser_queue, sql_queue):
        while True:
            msg = await process_queue.get()
            if msg is None:
                await sql_put.put(None)
            process_queue.task_done()
            parsed_messages = self.dispatch(msg)
            for parsed_message in parsed_messages:
                await sql_queue.put(parsed_message)

sql_dispatcher.py

import asycnio
import proprietarySqlLibrary as sql

class SqlDispatcher:
    def __init__(self):
        # do all the connections to the DB in here

    async def process_queue(self, sql_queue):
        while True:
            msg = await sql_queue.get()
            # then go through and add this data to the DB
            # this part is also relatively slow but I'm focusing on the first half for now
            # since I don't have control over the DB stuff

python json python-asyncio python-multiprocessing
1个回答
0
投票

我的第一直觉是一台机器的并行处理不会比单线程快很多。我猜大部分时间都花在处理数据上,而不是从磁盘读取数据(您可以通过运行编程并注释掉处理数据的部分,只留下读取的部分来证明/反驳这一点)。

这个问题可能与您的问题相关。 Python 3.13 引入了某些功能来解决 GIL 的性能限制。

如果您可以访问多台机器,您可能会考虑使用队列将一些处理外包给多台机器(我喜欢https://python-rq.org/)。即使这不能让您一路回家,队列中的作业也可能会读取原始文件并将预先消化的文件写回磁盘,从而允许您的程序处理预先消化的文件,理论上这些文件应该比当前程序运行得更快.

© www.soinside.com 2019 - 2024. All rights reserved.