在一个进程中获取数据并使用两个单独的进程(在Python中,多处理)分析这些数据

问题描述 投票:0回答:1

我的目标是在一个进程中获取数据,并使用两个并行运行的独立进程来分析这些数据。

在提供的最小示例中,初始进程(每 1 秒)生成由三个数组组成的数据:array1、array2、array3。随后,另外两个进程会分析这些数组。

我正在寻求确认这种方法的正确性,特别是在数据分析方面。最好在这里分析数据吗:

# This is where I would do some processing on the data

from multiprocessing import shared_memory, Process, Lock, Value
import numpy as np
import time


# Write new random data into the shared memory once per second (producer).
def producer(n, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration):
    """Fill the three float32 arrays in shared memory with random data.

    Runs 10 iterations, one per second.  Each write happens under ``lock``;
    afterwards both consumer flags are raised and ``iteration`` records the
    iteration index so the consumers can label their output.

    Fix vs. original: the segment is mapped ONCE instead of being re-opened
    and re-closed inside the loop — calling ``close()`` while numpy arrays
    still reference slices of ``buf`` raises BufferError ("cannot close
    exported pointers exist") in CPython.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        for i in range(10):
            with lock:
                np_array1[:] = np.random.randint(0, 1000, np_array1.shape)
                np_array2[:] = np.random.randint(0, 1000, np_array2.shape)
                np_array3[:] = np.random.randint(0, 1000, np_array3.shape)
                # Publish iteration before raising the flags so consumers
                # never see a new flag with a stale iteration number.
                iteration.value = i
                new_value_flag1.value = 1
                new_value_flag2.value = 1
            time.sleep(1)
    finally:
        # Drop the array views (releasing their exported buffer slices)
        # before closing, otherwise close() raises BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Poll the shared memory from a separate process (consumer 1).
def consumer1(n, shape1, shape2, shape3, lock, new_value_flag1, iteration):
    """Snapshot and process the shared arrays whenever flag 1 is raised.

    Under ``lock`` the three arrays are copied out of shared memory and the
    flag is cleared; the (example) processing then runs outside the lock on
    the private copies.  Loops until the process is killed.
    """
    bytes1 = np.prod(shape1) * 4            # float32 -> 4 bytes per element
    bytes12 = bytes1 + np.prod(shape2) * 4
    while True:
        if new_value_flag1.value == 1:
            with lock:
                print('Start consumer1', time.time())
                shm = shared_memory.SharedMemory(name=n)
                snap1 = np.ndarray(shape1, dtype=np.float32, buffer=shm.buf[:bytes1]).copy()
                snap2 = np.ndarray(shape2, dtype=np.float32, buffer=shm.buf[bytes1:bytes12]).copy()
                snap3 = np.ndarray(shape3, dtype=np.float32, buffer=shm.buf[bytes12:]).copy()
                print(f"consumer 1, Iteration {iteration.value}:")
                shm.close()
                new_value_flag1.value = 0
                print('Stop consumer1', time.time())
            # This is where I would do some processing on the data
            print(snap1.mean())
            print(snap2.mean())
            print(snap3.mean())
        time.sleep(0.01)

def consumer2(n, shape1, shape2, shape3, lock, new_value_flag2, iteration):
    """Snapshot and process the shared arrays whenever flag 2 is raised.

    Identical protocol to consumer 1, driven by its own flag: copy the
    arrays under ``lock``, clear the flag, then process the copies outside
    the lock.  Loops until the process is killed.
    """
    bytes1 = np.prod(shape1) * 4            # float32 -> 4 bytes per element
    bytes12 = bytes1 + np.prod(shape2) * 4
    while True:
        if new_value_flag2.value == 1:
            with lock:
                print('Start consumer2', time.time())
                shm = shared_memory.SharedMemory(name=n)
                snap1 = np.ndarray(shape1, dtype=np.float32, buffer=shm.buf[:bytes1]).copy()
                snap2 = np.ndarray(shape2, dtype=np.float32, buffer=shm.buf[bytes1:bytes12]).copy()
                snap3 = np.ndarray(shape3, dtype=np.float32, buffer=shm.buf[bytes12:]).copy()
                print(f"consumer 2, Iteration {iteration.value}:")
                shm.close()
                new_value_flag2.value = 0
                print('Stop consumer2', time.time())
            # This is where I would do some processing on the data
            print(snap1.mean())
            print(snap2.mean())
            print(snap3.mean())
        time.sleep(0.01)
        
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes (float32)
    shape1 = (2000, 20000)
    shape2 = (2000, 30000)
    shape3 = (2000, 40000)
    # float32 -> 4 bytes per element; int() keeps the size a plain Python int
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    lock = Lock()
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag2, iteration))

    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()

        p2.join()
        p3.join()
        p1.join()
    finally:
        # The consumers loop forever, so normally we only get here via
        # Ctrl-C; release the shared-memory segment instead of leaking it.
        shm.close()
        shm.unlink()

编辑在第一条评论之后,我有一个新代码:

from multiprocessing import shared_memory, Process, Value
import numpy as np
import time
# Write new random data into the shared memory (producer).
def producer(n, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration):
    """Fill the three float32 shared arrays whenever both consumers are done.

    Runs up to 100 iterations at 0.5 s intervals; a new batch is written only
    when both consumer flags are 0 (i.e. both consumers have finished with
    the previous batch), so writes never overlap reads.

    Fixes vs. original: (1) ``iteration.value`` is published BEFORE the
    flags are raised — there is no lock in this version, so a consumer could
    otherwise print a stale iteration number; (2) the numpy views are
    released before ``close()``, which would otherwise raise BufferError
    because slices of ``buf`` are still exported.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        for i in range(100):
            if new_value_flag1.value == 0 and new_value_flag2.value == 0:
                start_time = time.time()
                np_array1[:] = np.random.randint(0, 255, np_array1.shape)
                np_array2[:] = np.random.randint(0, 255, np_array2.shape)
                np_array3[:] = np.random.randint(0, 255, np_array3.shape)
                iteration.value = i
                new_value_flag1.value = 1
                new_value_flag2.value = 1
                print('producer', i, time.time()-start_time)
            time.sleep(0.5)
    finally:
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Read from the shared memory in a separate process (consumer 1).
def consumer1(n, shape1, shape2, shape3, new_value_flag1, iteration):
    """Process the shared arrays in place whenever flag 1 is raised.

    Maps the segment once and polls every 10 ms.  Reading without a lock is
    safe here because the producer only writes while both flags are 0.
    Loops until the process is killed.

    Fix vs. original: ``existing_shm.close()`` sat AFTER the infinite loop
    and was unreachable dead code; cleanup now lives in ``finally`` so it
    runs on KeyboardInterrupt/terminate.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        while True:
            if new_value_flag1.value == 1:
                # This is where I would do some processing on the data
                print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
                new_value_flag1.value = 0
            time.sleep(0.01)
    finally:
        # Release the views before close() to avoid BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Read from the shared memory in a separate process (consumer 2).
def consumer2(n, shape1, shape2, shape3, new_value_flag2, iteration):
    """Process the shared arrays in place whenever flag 2 is raised.

    Same protocol as consumer 1, driven by its own flag.  Loops until the
    process is killed.

    Fix vs. original: ``existing_shm.close()`` sat AFTER the infinite loop
    and was unreachable dead code; cleanup now lives in ``finally``.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        while True:
            if new_value_flag2.value == 1:
                # This is where I would do some processing on the data
                print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
                new_value_flag2.value = 0
            time.sleep(0.01)
    finally:
        # Release the views before close() to avoid BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()
    
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (200, 200)
    shape2 = (200, 300)
    shape3 = (200, 400)
    # float32 -> 4 bytes per element; int() keeps the size a plain Python int
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_value_flag2, iteration))

    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()

        p2.join()
        p3.join()
        p1.join()
    finally:
        # The consumers loop forever, so normally we only get here via
        # Ctrl-C; release the shared-memory segment instead of leaking it.
        shm.close()
        shm.unlink()

编辑2:无需不必要的映射和取消映射 + 使用 `Event`

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time

# Write new random data into the shared memory (producer).
def producer(n, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration):
    """Fill the three float32 shared arrays whenever both consumers are done.

    Event-based variant: a new batch is written only when both consumer
    events are clear (i.e. both consumers processed the previous batch).

    Fixes vs. original: (1) ``iteration.value`` is published BEFORE the
    events are set, so a consumer never pairs a new event with a stale
    iteration number; (2) the numpy views are released before ``close()``,
    which would otherwise raise BufferError because slices of ``buf`` are
    still exported.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        for i in range(100):
            if not new_values_event_1.is_set() and not new_values_event_2.is_set():
                start_time = time.time()
                np_array1[:] = np.random.randint(0, 255, np_array1.shape)
                np_array2[:] = np.random.randint(0, 255, np_array2.shape)
                np_array3[:] = np.random.randint(0, 255, np_array3.shape)
                iteration.value = i
                new_values_event_1.set()
                new_values_event_2.set()
                print('producer', i, time.time()-start_time)
            time.sleep(0.5)
    finally:
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Read from the shared memory in a separate process (consumer 1).
def consumer1(n, shape1, shape2, shape3, new_values_event_1, iteration):
    """Process the shared arrays in place whenever event 1 is set.

    Maps the segment once and polls every 10 ms; clears its own event when
    done so the producer may write the next batch.  Loops until the process
    is killed.

    Fix vs. original: ``existing_shm.close()`` sat AFTER the infinite loop
    and was unreachable dead code; cleanup now lives in ``finally``.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        while True:
            if new_values_event_1.is_set():
                # This is where I would do some processing on the data
                print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
                new_values_event_1.clear()
            time.sleep(0.01)
    finally:
        # Release the views before close() to avoid BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Read from the shared memory in a separate process (consumer 2).
def consumer2(n, shape1, shape2, shape3, new_values_event_2, iteration):
    """Process the shared arrays in place whenever event 2 is set.

    Same protocol as consumer 1, driven by its own event.  Loops until the
    process is killed.

    Fix vs. original: ``existing_shm.close()`` sat AFTER the infinite loop
    and was unreachable dead code; cleanup now lives in ``finally``.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        while True:
            if new_values_event_2.is_set():
                # This is where I would do some processing on the data
                print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
                new_values_event_2.clear()
            time.sleep(0.01)
    finally:
        # Release the views before close() to avoid BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (50, 50)
    shape2 = (50, 50)
    shape3 = (50, 50)
    # float32 -> 4 bytes per element; int() keeps the size a plain Python int
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    new_values_event_1 = Event()
    new_values_event_2 = Event()
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_values_event_1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_values_event_2, iteration))

    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()

        p2.join()
        p3.join()
        p1.join()
    finally:
        # The consumers loop forever, so normally we only get here via
        # Ctrl-C; release the shared-memory segment instead of leaking it.
        shm.close()
        shm.unlink()

编辑3:无需不必要的映射和取消映射 + 使用 `Event` + 删除不必要的轮询

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time


# Write new random data into the shared memory (producer).
def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration):
    """Produce a batch every ~2 s after both consumers signalled readiness.

    Hand-shake: wait for (and clear) each consumer's ready event, write the
    three float32 arrays, publish ``iteration``, then set ``event_producer``
    to wake the consumers.

    Fixes vs. original: the loop body's non-standard 8-space indentation is
    normalized, a truncated comment is completed, and the numpy views are
    released before ``close()`` — otherwise close() raises BufferError
    because slices of ``buf`` are still exported.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        for i in range(100):
            event_consumer1.wait()   # Wait until consumer 1 is ready
            event_consumer1.clear()  # Reset the event
            event_consumer2.wait()   # Wait until consumer 2 is ready
            event_consumer2.clear()  # Reset the event
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            iteration.value = i
            print('producer', i, time.time()-start_time)
            event_producer.set()  # Signal the consumers that new data is available
            time.sleep(2)  # delay to simulate the time at which the data is produced
    finally:
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Read from the shared memory in a separate process (consumer 1).
def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration):
    """Block on the producer's event, process the arrays, signal readiness.

    Loops until the process is killed.

    NOTE(review): both consumers wait on and clear the SAME
    ``event_producer``.  Whichever consumer wakes first can clear it before
    the other has seen it, leaving that consumer blocked — and then the
    producer blocked waiting for its ready event.  Consider one
    producer-side Event per consumer; confirm against the intended design.

    Fix vs. original: ``existing_shm.close()`` sat AFTER the infinite loop
    and was unreachable dead code; cleanup now lives in ``finally``.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        while True:
            event_producer.wait()   # Wait until the producer has produced new data
            event_producer.clear()  # Reset the event
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            time.sleep(0.1)  # delay to simulate the time at which the data is processed
            event_consumer1.set()  # Signal the producer that the data has been processed
    finally:
        # Release the views before close() to avoid BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()

# Read from the shared memory in a separate process (consumer 2).
def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration):
    """Block on the producer's event, process the arrays, signal readiness.

    Loops until the process is killed.

    NOTE(review): shares (and clears) the same ``event_producer`` as
    consumer 1 — a lost-wakeup/deadlock risk; see the note on consumer1.

    Fix vs. original: ``existing_shm.close()`` sat AFTER the infinite loop
    and was unreachable dead code; cleanup now lives in ``finally``.  A
    stray ``)`` in a trailing comment was also removed.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    end1 = int(np.prod(shape1)) * 4            # float32 -> 4 bytes/element
    end2 = end1 + int(np.prod(shape2)) * 4
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:end1])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[end1:end2])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[end2:])
    try:
        while True:
            event_producer.wait()   # Wait until the producer has produced new data
            event_producer.clear()  # Reset the event
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            time.sleep(0.1)  # delay to simulate the time at which the data is processed
            event_consumer2.set()  # Signal the producer that the data has been processed
    finally:
        # Release the views before close() to avoid BufferError.
        del np_array1, np_array2, np_array3
        existing_shm.close()

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)

    # float32 -> 4 bytes per element; int() keeps the size a plain Python int
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    event_consumer1 = Event()
    event_consumer2 = Event()
    event_consumer1.set()  # Set the event to allow the producer to start
    event_consumer2.set()  # Set the event to allow the producer to start
    event_producer = Event()
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer, iteration))

    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()

        p2.join()
        p3.join()
        p1.join()
    finally:
        # The consumers loop forever, so normally we only get here via
        # Ctrl-C; release the shared-memory segment instead of leaking it.
        shm.close()
        shm.unlink()
python multiprocessing data-acquisition
1个回答
0
投票

共享内存是一种方法。但我可能会使用队列与消费者进程进行通信。这样,您不需要任何标志来同步消费者。这些消费者将闲置直到有新数据可用。它还允许将分析结果发送回生产者(如果需要)。

查看 python 文档以获取如何使用队列的示例: https://docs.python.org/3/library/multiprocessing.html#examples

我会使用从生产者到每个消费者的单独队列。否则,您可以将 3 个数组放入一个元组中,并将该元组两次添加到两个消费者都可以访问的队列中。假设分析花费的时间更长,则后续元组很可能由不同的消费者处理。

© www.soinside.com 2019 - 2024. All rights reserved.