我正在开发一个音频应用程序,其中“音频循环”中的所有函数执行时间都需要 << 1ms. I am aware that Python is not the right programming language for this task, however, Python has come a long way and I am confident that with the right tips and tweaks I can get it to work.
目前我正在研究如何在线程之间传递数据,我发现了一些奇怪的结果。
我运行以下基准程序来测试不同的方法:
import multiprocessing
import threading
import queue
import numpy as np
import time
SIZE = 2048
myarray1 = np.ones(SIZE)
myarray2 = np.ones(SIZE)
def test_multiprocessing(num: int, put: list, get: list):
# init
shared_array = multiprocessing.Array('f', SIZE, lock=True)
for _ in range(nums):
starttime = time.perf_counter()
# put
with shared_array.get_lock():
np.copyto(np.frombuffer(shared_array.get_obj(), dtype=np.float32), myarray1)
enddtime = time.perf_counter()
put.append(enddtime-starttime)
starttime = time.perf_counter()
# get
with shared_array.get_lock():
np.copyto(myarray2, np.frombuffer(shared_array.get_obj(), dtype=np.float32))
enddtime = time.perf_counter()
get.append(enddtime-starttime)
def test_threading_copy(num: int, put: list, get: list):
# init
free = threading.Semaphore(value=1)
used = threading.Semaphore(value=0)
transfer = np.empty(SIZE)
for _ in range(nums):
starttime = time.perf_counter()
# put
free.acquire()
np.copyto(transfer, myarray1)
used.release()
enddtime = time.perf_counter()
put.append(enddtime-starttime)
starttime = time.perf_counter()
# get
used.acquire()
np.copyto(myarray2, transfer)
free.release()
enddtime = time.perf_counter()
get.append(enddtime-starttime)
def test_queue(num: int, put: list, get: list):
# init
q = queue.Queue(maxsize=1)
for _ in range(num):
starttime = time.perf_counter()
# put
q.put(myarray1)
enddtime = time.perf_counter()
put.append(enddtime-starttime)
starttime = time.perf_counter()
# get
myarray2 = q.get()
enddtime = time.perf_counter()
get.append(enddtime-starttime)
if __name__ == "__main__":
nums = int(1e6)
for test in [test_multiprocessing, test_threading_copy, test_queue]:
put = []; get = []
test(nums, put, get)
print("results:")
print(f"\tput_avg = {sum(put) / len(put)}")
print(f"\tput_max = {max(put)}")
print(f"\tget_avg = {sum(get) / len(get)}")
print(f"\tget_max = {max(get)}")
我机器上的结果看起来像这样:
results:
put_avg = 3.930823400122108e-06
put_max = 0.002819699999918157
get_avg = 3.895689899812624e-06
get_max = 0.0016344000000572123
results:
put_avg = 3.603283000182273e-06
put_max = 0.007975700000088182
get_avg = 3.501774700153874e-06
get_max = 0.010190099999817903
results:
put_avg = 1.4336647000006905e-06
put_max = 0.0008001000001058856
get_avg = 1.2777225000797898e-06
get_max = 0.00023200000009637733
平均时间对于我的应用来说完全没问题,但最大值却让我烦恼,因为它们高于或接近 1 毫秒。除了 test_queue 示例之外,我的代码都没有分配新内存。
- 您知道为什么会发生这种情况吗?
许多事情都在您的系统上同时运行。 系统在它们之间共享可用资源(CPU、内存带宽、设备 I/O)。
您的脚本对系统资源没有无条件的第一优先权。 事实上,可能有一些偶尔的任务在运行时具有更高的优先级,并且很多事情具有相同的优先级。 每隔一段时间,您的一次传输就必须等待相对较长的时间,系统才能执行其他操作。 如果您需要保证脚本可能需要等待多长时间才能执行其中一项传输,那么您需要适当利用实时操作系统的功能。
- 我该如何解决这个问题,或者加快代码速度?
您可能无法阻止偶尔出现的运行时间峰值,除非您准备安装 RT 操作系统并在那里运行代码。 对于这样的答案,您需要做什么的细节有点太宽泛了。
有了足够的系统权限,您也许能够提高给定正在运行的作业的优先级。 这可能对一些人有帮助。 详细信息取决于您的操作系统。
加速 Python 代码的通常答案是使用本机代码来运行较慢的部分。
- 是否有一些常规的 Python 设置可以防止这种行为?
我不这么认为。 您观察到的尖峰并不是 Python 特有的。
- 你会如何调试这个?
我不会。 你所观察到的情况对我来说似乎并不异常。