Python 线程之间的快速通信

问题描述 投票:0回答:1

我正在开发一个音频应用程序,其中“音频循环”中的所有函数执行时间都需要 << 1ms. I am aware that Python is not the right programming language for this task, however, Python has come a long way and I am confident that with the right tips and tweaks I can get it to work.

目前我正在研究如何在线程之间传递数据,我发现了一些奇怪的结果。

我运行以下基准程序来测试不同的方法:

import multiprocessing
import threading
import queue
import numpy as np
import time

SIZE = 2048
myarray1 = np.ones(SIZE)
myarray2 = np.ones(SIZE)

def test_multiprocessing(num: int, put: list, get: list):
  # init
  shared_array = multiprocessing.Array('f', SIZE, lock=True)
  for _ in range(nums):
    starttime = time.perf_counter()
    # put
    with shared_array.get_lock():
      np.copyto(np.frombuffer(shared_array.get_obj(), dtype=np.float32), myarray1)
    enddtime = time.perf_counter()
    put.append(enddtime-starttime)

    starttime = time.perf_counter()
    # get
    with shared_array.get_lock():
      np.copyto(myarray2, np.frombuffer(shared_array.get_obj(), dtype=np.float32))
    enddtime = time.perf_counter()
    get.append(enddtime-starttime)

def test_threading_copy(num: int, put: list, get: list):
  # init
  free = threading.Semaphore(value=1)
  used = threading.Semaphore(value=0)
  transfer = np.empty(SIZE)
  for _ in range(nums):
    
    starttime = time.perf_counter()
    # put
    free.acquire()
    np.copyto(transfer, myarray1)
    used.release()
    enddtime = time.perf_counter()
    put.append(enddtime-starttime)

    starttime = time.perf_counter()
    # get
    used.acquire()
    np.copyto(myarray2, transfer)
    free.release()
    enddtime = time.perf_counter()
    get.append(enddtime-starttime)

def test_queue(num: int, put: list, get: list):
  # init
  q = queue.Queue(maxsize=1)
  for _ in range(num):
    starttime = time.perf_counter()
    # put
    q.put(myarray1)
    enddtime = time.perf_counter()
    put.append(enddtime-starttime)
    
    starttime = time.perf_counter()
    # get
    myarray2 = q.get()
    enddtime = time.perf_counter()
    get.append(enddtime-starttime)

if __name__ == "__main__":
  nums = int(1e6)
  for test in [test_multiprocessing, test_threading_copy, test_queue]:
    put = []; get = []
    test(nums, put, get)
    print("results:")
    print(f"\tput_avg = {sum(put) / len(put)}")
    print(f"\tput_max = {max(put)}")
    print(f"\tget_avg = {sum(get) / len(get)}")
    print(f"\tget_max = {max(get)}")

我机器上的结果看起来像这样:

results:
        put_avg = 3.930823400122108e-06
        put_max = 0.002819699999918157
        get_avg = 3.895689899812624e-06
        get_max = 0.0016344000000572123
results:
        put_avg = 3.603283000182273e-06
        put_max = 0.007975700000088182
        get_avg = 3.501774700153874e-06
        get_max = 0.010190099999817903
results:
        put_avg = 1.4336647000006905e-06
        put_max = 0.0008001000001058856
        get_avg = 1.2777225000797898e-06
        get_max = 0.00023200000009637733

平均时间对于我的应用来说完全没问题,但最大值却让我烦恼,因为它们高于或接近 1 毫秒。除了 test_queue 示例之外,我的代码都没有分配新内存。

  • 您知道为什么会发生这种情况吗?
  • 我该如何解决这个问题,或者加快代码速度?
  • 是否有一些常规的 Python 设置可以防止这种行为?
  • 你会如何调试这个?
python multithreading performance multiprocessing low-latency
1个回答
0
投票
  • 您知道为什么会发生这种情况吗?

许多事情都在您的系统上同时运行。 系统在它们之间共享可用资源(CPU、内存带宽、设备 I/O)。

您的脚本对系统资源没有无条件的第一优先权。 事实上,可能有一些偶尔的任务在运行时具有更高的优先级,并且很多事情具有相同的优先级。 每隔一段时间,您的一次传输就必须等待相对较长的时间,系统才能执行其他操作。 如果您需要保证脚本可能需要等待多长时间才能执行其中一项传输,那么您需要适当利用实时操作系统的功能。

  • 我该如何解决这个问题,或者加快代码速度?

您可能无法阻止偶尔出现的运行时间峰值,除非您准备安装 RT 操作系统并在那里运行代码。 对于这样的答案,您需要做什么的细节有点太宽泛了。

有了足够的系统权限,您也许能够提高给定正在运行的作业的优先级。 这可能对一些人有帮助。 详细信息取决于您的操作系统。

加速 Python 代码的通常答案是使用本机代码来运行较慢的部分。

  • 是否有一些常规的 Python 设置可以防止这种行为?

我不这么认为。 您观察到的尖峰并不是 Python 特有的。

  • 你会如何调试这个?

我不会。 你所观察到的情况对我来说似乎并不异常。

© www.soinside.com 2019 - 2024. All rights reserved.