有什么方法可以加速Python中多处理处理程序和主线程之间的缓冲区交换吗?

问题描述 投票:0回答:1

我想实现一个句柄函数来接收消息并将其共享到主线程以进行进一步处理。处理程序和主线程上的接收时间之间的延迟非常大。我尝试过 collection.deque 和 manager.Queue 作为缓冲区,但它们看起来是一样的。实现是这样的:

def handler(msg):
    global msgBuff
    dict_msg=pickle.loads(msg.data)
    timer=time.time()
    print("handler latency",timer-dict_msg['timestamp'])
    msgBuff.append((dict_msg,time.time()))

if __name__ == "__main__":
    global msgBuff
    #manager = multiprocessing.Manager()
    #msgBuff = manager.Queue()
    msgBuff=collections.deque()
    msg_receiver(handler) # a multiprocessing function to receive message  
    while 1:
        if len(msgBuff)==0:
            time.sleep(0.001)
            continue
        dict_msg,rec_time=msgBuff.popleft()
        print("main latency",time.time()-dict_msg['timestamp'],time.time()-rec_time)

这是输出

handler latency 0.008012533187866211
main latency 0.022023439407348633 0.014010906219482422
handler latency 0.007892608642578125
main latency 0.023406028747558594 0.015513420104980469
handler latency 0.007999897003173828
main latency 0.02299976348876953 0.013998270034790039
handler latency 0.006999969482421875
main latency 0.02259969711303711 0.015599727630615234
handler latency 0.009000301361083984

有什么改进建议吗?

python queue handler
1个回答
0
投票

您向我们展示了某些阶段经过的时间 处理时间大约为 7、14 或 21 毫秒。 看来序列化/反序列化你的数据大约需要 7 毫秒, 而且你不止一次这样做。

缩小

msg.data
的大小将允许更快的 {ser,deser}ialize。 考虑传递数据文件名而不是原始数据。

Pickle() 有时会引入比您预期更多的依赖项。 考虑使用替代格式,可能是 JSON 或 CSV,这样可以 您可以方便地检查数据以验证其是否是您期望的数量。

序列化和反序列化会花费时间。 考虑使用像 pyarrow 这样的二进制格式。 甚至可以实现零拷贝切换 进入管道的下一阶段。

© www.soinside.com 2019 - 2024. All rights reserved.