我的目标是在一个进程中获取数据,并使用两个并行运行的独立进程来分析这些数据。
在提供的最小示例中,初始过程(每 1 秒)生成由三个数组组成的数据:
array1
、array2
和 array3
。随后,另外两个进程会分析这些数组。
我正在寻求确认这种方法的正确性,特别是在数据分析方面。最好在这里分析数据吗:
# This is where I would do some processing on the data
?
from multiprocessing import shared_memory, Process, Lock, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration):
    """Write three random float32 arrays into the shared-memory block *n*
    once per second (10 iterations) and signal both consumers via flags.

    The arrays are laid out back to back in the block: shape1 first,
    then shape2, then shape3.
    """
    # Byte offsets of the three arrays (float32 = 4 bytes per element).
    off2 = int(np.prod(shape1)) * 4
    off3 = off2 + int(np.prod(shape2)) * 4
    # Map the block once instead of re-opening it on every iteration.
    existing_shm = shared_memory.SharedMemory(name=n)
    try:
        for i in range(10):
            with lock:
                # Use offset= rather than slicing shm.buf: an ndarray that
                # keeps a sliced memoryview alive makes SharedMemory.close()
                # raise BufferError ("cannot close exported pointers exist").
                np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
                np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=off2)
                np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=off3)
                np_array1[:] = np.random.randint(0, 1000, np_array1.shape)
                np_array2[:] = np.random.randint(0, 1000, np_array2.shape)
                np_array3[:] = np.random.randint(0, 1000, np_array3.shape)
                # Drop the views so no buffer export remains on the segment.
                del np_array1, np_array2, np_array3
            # Publish: tell both consumers a new batch is available.
            new_value_flag1.value = 1
            new_value_flag2.value = 1
            iteration.value = i
            time.sleep(1)
    finally:
        existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, lock, new_value_flag1, iteration):
    """Poll new_value_flag1; when set, copy the three arrays out of the
    shared-memory block *n* under the lock, then print their means."""
    # Element counts used to compute the byte ranges of each array.
    count1 = np.prod(shape1)
    count2 = np.prod(shape2)
    while True:
        if new_value_flag1.value == 1:
            with lock:
                print('Start consumer1', time.time())
                existing_shm = shared_memory.SharedMemory(name=n)
                buf = existing_shm.buf
                # .copy() detaches the data so the segment can be closed
                # before the (potentially slow) processing step.
                np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=buf[:count1 * 4]).copy()
                np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=buf[4 * count1:4 * (count1 + count2)]).copy()
                np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=buf[4 * (count1 + count2):]).copy()
                print(f"consumer 1, Iteration {iteration.value}:")
                existing_shm.close()
                new_value_flag1.value = 0
                print('Stop consumer1', time.time())
            # This is where I would do some processing on the data
            print(np_array1.mean())
            print(np_array2.mean())
            print(np_array3.mean())
        time.sleep(0.01)
def consumer2(n, shape1, shape2, shape3, lock, new_value_flag2, iteration):
    """Second analysis process: mirrors consumer1 but is gated on
    new_value_flag2, so both consumers see every batch independently."""
    elems1 = np.prod(shape1)
    elems2 = np.prod(shape2)
    boundary = 4 * (elems1 + elems2)  # byte offset where array3 begins
    while True:
        if new_value_flag2.value != 1:
            time.sleep(0.01)
            continue
        with lock:
            print('Start consumer2', time.time())
            existing_shm = shared_memory.SharedMemory(name=n)
            # Copy out each array so processing happens on private data.
            np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:elems1 * 4]).copy()
            np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4 * elems1:boundary]).copy()
            np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[boundary:]).copy()
            print(f"consumer 2, Iteration {iteration.value}:")
            existing_shm.close()
            new_value_flag2.value = 0
            print('Stop consumer2', time.time())
        # This is where I would do some processing on the data
        print(np_array1.mean())
        print(np_array2.mean())
        print(np_array3.mean())
        time.sleep(0.01)
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes (float32)
    shape1 = (2000, 20000)
    shape2 = (2000, 30000)
    shape3 = (2000, 40000)
    # total bytes: 4 bytes per float32 element across all three arrays
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    lock = Lock()
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag2, iteration))
    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()
        p2.join()
        p3.join()
        p1.join()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop: the consumers loop forever.
        for p in (p1, p2, p3):
            if p.is_alive():
                p.terminate()
    finally:
        # Always release the segment; the original cleanup was commented
        # out and unreachable, leaking the shared memory on every run.
        shm.close()
        shm.unlink()
编辑:根据第一条评论的建议,我写了新的代码:
from multiprocessing import shared_memory, Process, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration):
    """Map the shared arrays once, then publish up to 100 random batches,
    overwriting the data only after BOTH consumers cleared their flags.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Byte offsets of the three float32 arrays inside the block.
    off2 = int(np.prod(shape1)) * 4
    off3 = off2 + int(np.prod(shape2)) * 4
    # Use offset= instead of slicing shm.buf: ndarrays holding sliced
    # memoryviews make existing_shm.close() below raise BufferError.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=off2)
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=off3)
    for i in range(100):
        # Only overwrite once both consumers have consumed the last batch.
        if new_value_flag1.value == 0 and new_value_flag2.value == 0:
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            new_value_flag1.value = 1
            new_value_flag2.value = 1
            iteration.value = i
            print('producer', i, time.time() - start_time)
        time.sleep(0.5)
    # Release the views first, otherwise close() raises BufferError while
    # the arrays still export the underlying buffer.
    del np_array1, np_array2, np_array3
    existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_value_flag1, iteration):
    """Map the three shared arrays once, then poll new_value_flag1 and
    print the mean of each array whenever the producer publishes data."""
    existing_shm = shared_memory.SharedMemory(name=n)
    size1 = np.prod(shape1)
    size2 = np.prod(shape2)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:size1 * 4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4 * size1:4 * (size1 + size2)])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4 * (size1 + size2):])
    while True:
        if new_value_flag1.value == 1:
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_value_flag1.value = 0
        time.sleep(0.01)
    # NOTE(review): unreachable — the loop above never exits.
    existing_shm.close()
# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_value_flag2, iteration):
    """Second analysis process; identical to consumer1 but driven by its
    own flag so both consumers see every batch."""
    existing_shm = shared_memory.SharedMemory(name=n)
    elems1 = np.prod(shape1)
    boundary = 4 * (elems1 + np.prod(shape2))  # byte offset of array3
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:elems1 * 4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4 * elems1:boundary])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[boundary:])
    while True:
        if new_value_flag2.value != 1:
            time.sleep(0.01)
            continue
        # This is where I would do some processing on the data
        print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        new_value_flag2.value = 0
        time.sleep(0.01)
    # NOTE(review): unreachable — the loop above never exits.
    existing_shm.close()
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (200, 200)
    shape2 = (200, 300)
    shape3 = (200, 400)
    # total bytes: 4 bytes per float32 element across all three arrays
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_value_flag2, iteration))
    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()
        p2.join()
        p3.join()
        p1.join()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop: the consumers loop forever.
        for p in (p1, p2, p3):
            if p.is_alive():
                p.terminate()
    finally:
        # Always release the segment; the original cleanup was commented
        # out and unreachable, leaking the shared memory on every run.
        shm.close()
        shm.unlink()
编辑2 无需不必要的映射和取消映射 + 使用
Event
from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration):
existing_shm = shared_memory.SharedMemory(name=n)
np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
for i in range(100):
if not new_values_event_1.is_set() and not new_values_event_2.is_set():
start_time = time.time()
np_array1[:] = np.random.randint(0, 255, np_array1.shape)
np_array2[:] = np.random.randint(0, 255, np_array2.shape)
np_array3[:] = np.random.randint(0, 255, np_array3.shape)
new_values_event_1.set()
new_values_event_2.set()
iteration.value = i
print('producer', i, time.time()-start_time)
time.sleep(0.5)
existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_values_event_1, iteration):
existing_shm = shared_memory.SharedMemory(name=n)
np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
while True:
if new_values_event_1.is_set():
# This is where I would do some processing on the data
print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
new_values_event_1.clear()
time.sleep(0.01)
existing_shm.close()
# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_values_event_2, iteration):
existing_shm = shared_memory.SharedMemory(name=n)
np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
while True:
if new_values_event_2.is_set():
# This is where I would do some processing on the data
print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
new_values_event_2.clear()
time.sleep(0.01)
existing_shm.close()
if __name__ == '__main__':
# assume we have 3 arrays of different sizes
shape1 = (50, 50)
shape2 = (50, 50)
shape3 = (50, 50)
total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
shm = shared_memory.SharedMemory(create=True, size= total_size)
new_values_event_1 = Event()
new_values_event_2 = Event()
iteration = Value('i', 0)
p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration))
p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_values_event_1, iteration))
p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_values_event_2, iteration))
p2.start()
p3.start()
time.sleep(2) # delay to make sure the consumer processes are ready
p1.start()
p2.join()
p3.join()
p1.join()
# I know I have to crtl-c to stop the program
#shm.close()
#shm.unlink()
编辑3 无需不必要的映射和取消映射 + 使用
Event
+ 删除不必要的轮询
from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration):
    """Produce 100 batches of random data, waiting for both consumers to
    acknowledge the previous batch before overwriting the shared arrays.
    """
    existing_shm = shared_memory.SharedMemory(name=n)
    # Byte offsets of the three float32 arrays inside the block.
    off2 = int(np.prod(shape1)) * 4
    off3 = off2 + int(np.prod(shape2)) * 4
    # Use offset= instead of slicing shm.buf: ndarrays holding sliced
    # memoryviews make existing_shm.close() below raise BufferError.
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf, offset=0)
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf, offset=off2)
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf, offset=off3)
    for i in range(100):
        event_consumer1.wait()   # Wait until consumer 1 is ready
        event_consumer1.clear()  # Reset the event
        event_consumer2.wait()   # Wait until consumer 2 is ready
        event_consumer2.clear()  # Reset the event
        start_time = time.time()
        np_array1[:] = np.random.randint(0, 255, np_array1.shape)
        np_array2[:] = np.random.randint(0, 255, np_array2.shape)
        np_array3[:] = np.random.randint(0, 255, np_array3.shape)
        iteration.value = i
        print('producer', i, time.time() - start_time)
        # NOTE(review): a single event_producer is shared by both consumers,
        # and each consumer clear()s it after waking; one consumer can clear
        # it before the other has seen it, losing that wakeup. A separate
        # event per consumer would avoid this — confirm intended design.
        event_producer.set()  # Signal the consumers that new data is available
        time.sleep(2)  # delay to simulate the time at which the data is produced
    # Release the views first, otherwise close() raises BufferError while
    # the arrays still export the underlying buffer.
    del np_array1, np_array2, np_array3
    existing_shm.close()
# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration):
    """Block on event_producer, report the mean of each shared array, then
    hand the turn back to the producer via event_consumer1."""
    existing_shm = shared_memory.SharedMemory(name=n)
    size1 = np.prod(shape1)
    size2 = np.prod(shape2)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:size1 * 4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4 * size1:4 * (size1 + size2)])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4 * (size1 + size2):])
    while True:
        # NOTE(review): event_producer is shared with consumer2 and cleared
        # here — the other consumer may miss the signal. Verify intent.
        event_producer.wait()   # Wait until the producer has produced new data
        event_producer.clear()  # Reset the event
        # This is where I would do some processing on the data
        print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1)  # delay to simulate the time at which the data is processed
        event_consumer1.set()  # Signal the producer that the data has been processed
    # NOTE(review): unreachable — the loop above never exits.
    existing_shm.close()
# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration):
    """Second analysis process; mirrors consumer1 but acknowledges through
    event_consumer2."""
    existing_shm = shared_memory.SharedMemory(name=n)
    elems1 = np.prod(shape1)
    boundary = 4 * (elems1 + np.prod(shape2))  # byte offset of array3
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:elems1 * 4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4 * elems1:boundary])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[boundary:])
    while True:
        # NOTE(review): event_producer is shared with consumer1 and cleared
        # here — the other consumer may miss the signal. Verify intent.
        event_producer.wait()   # Wait until the producer has produced new data
        event_producer.clear()  # Reset the event
        # This is where I would do some processing on the data
        print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1)  # delay to simulate the time at which the data is processed
        event_consumer2.set()  # Signal the producer that the data has been processed
    # NOTE(review): unreachable — the loop above never exits.
    existing_shm.close()
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)
    # total bytes: 4 bytes per float32 element across all three arrays
    total_size = int(np.prod(shape1) + np.prod(shape2) + np.prod(shape3)) * 4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    event_consumer1 = Event()
    event_consumer2 = Event()
    event_consumer1.set()  # Set the event to allow the producer to start
    event_consumer2.set()  # Set the event to allow the producer to start
    event_producer = Event()
    iteration = Value('i', 0)
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer, iteration))
    try:
        p2.start()
        p3.start()
        time.sleep(2)  # delay to make sure the consumer processes are ready
        p1.start()
        p2.join()
        p3.join()
        p1.join()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop: the consumers loop forever.
        for p in (p1, p2, p3):
            if p.is_alive():
                p.terminate()
    finally:
        # Always release the segment; the original cleanup was commented
        # out and unreachable, leaking the shared memory on every run.
        shm.close()
        shm.unlink()
共享内存是一种方法。但我可能会使用队列与消费者进程进行通信。这样,您不需要任何标志来同步消费者。这些消费者将闲置直到有新数据可用。它还允许将分析结果发送回生产者(如果需要)。
查看 python 文档以获取如何使用队列的示例: https://docs.python.org/3/library/multiprocessing.html#examples
我会使用从生产者到每个消费者的单独队列。否则,您可以将 3 个数组放入一个元组中,并将该元组两次添加到两个消费者都可以访问的队列中。假设分析花费的时间更长,则后续元组很可能由不同的消费者处理。