如何并行运行不同处理速率的函数

问题描述 投票:0回答:1

我见过很多 python 异步和多处理工具(ray、asyncio、Process、Thread) 我不确定哪一个(如果有的话)最适合我想要做的事情。

基本上,我正在使用计算机传入的数据实时更新动画。数据以 10KHz(100us) 的速率传入,我有一个函数来读取它。同时,我有一个以 60fps 运行动画的函数。动画将以 60fps 标记拍摄数据快照以更新下一帧。

问题在于,一些用于运行动画的行速度“慢”,需要超过 100us 的时间来处理,导致当代码返回读取时数据流变得混乱。 python 中是否有某种东西允许我在后台静默运行和发布 readData 函数,同时让动画函数运行并在需要获取下一帧的数据时监听流?

python asynchronous parallel-processing multiprocessing
1个回答
0
投票

由于我对视频不太了解,所以这篇文章可能离题很远。但在我看来,如果您的数据(“帧”)每 1 微秒到达一次,并且您希望以每秒 60 帧的速度制作动画,那么您希望每隔 1666.6667 帧显示一次。这些帧编号(其中第一帧是帧编号 1)将为 16666、333333、50000、66666、83333、100000 等。这意味着一个进程(“读取器”)每隔 1 就会将一个新帧放入队列中/60秒为其他进程,“动画师”得到。

这可以按照以下模拟来完成。

from multiprocessing import Process, Value, Queue
import itertools

def reader(clock, queue):
    # This will average out to every other 16666.6667 frames:
    frame_to_send_modulos = itertools.cycle((16666, 16667, 16667))
    frame_to_send_modulo = next(frame_to_send_modulos)
    frame_number = 0
    frames_sent = 0
    skip_count = 0

    def frame_arrived(frame):
        """This gets invoked every microsecond with a new frame"""

        nonlocal frame_to_send_modulo, frame_number, skip_count, frames_sent

        frame_number += 1
        skip_count += 1
        if skip_count % frame_to_send_modulo == 0:
            #print(frame_number)
            queue.put(frame)
            frames_sent += 1
            # Reset for next frame
            skip_count = 0
            frame_to_send_modulo = next(frame_to_send_modulos)

    # Simulate a new frame arriving every microsecond
    # Send 600 frames:
    while frames_sent < 600:
        clock.value += 1 # Advance clock 1 microsecond
        frame_arrived(1) # For simulation, use any value
    queue.put(None) # Sentinel to tell animator we are done

def animator(clock, queue):
    # Animate until we get a sentinel
    frame_count = 0
    for frame in iter(queue.get, None):
        # Animate:
        #print(frame)
        frame_count += 1
    v = clock.value # Microseconds
    seconds = v / 1_000_000
    print(f'{frame_count} frames in {seconds} seconds. The frame rate is {frame_count / seconds} fps.')

if __name__ == '__main__':
    clock = Value('i', 0, lock=False)
    queue = Queue()

    p1 = Process(target=reader, args=(clock, queue))
    p2 = Process(target=animator, args=(clock, queue))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

打印:

600 frames in 10.0 seconds. The frame rate is 60.0 fps.
© www.soinside.com 2019 - 2024. All rights reserved.