在 python
3.11
和 Ubuntu
上,我有一个任务在每个时间间隔(不是 asyncio
)初始化异步调用,并且在子进程内部执行多处理任务。我有 36 个核心/72 个处理器。问题是,当我初始化 new Pool(72)
时,需要 0.3 秒,这对于我的任务来说太多了,因为性能很重要。通过这篇文章Python进程池非守护进程?我发现了如何在池内创建新池(使用NoDaemonProcess
)。但是如何只初始化一次子池呢? concurrent.futures
对我来说不好,因为我做了测试,它比multiprocessing
慢。
这是工作示例,我需要以某种方式进行修改以仅在子进程中初始化一次池。
parent pid=907058
2024-06-01 19:16:44.856839 start
2024-06-01 19:16:44.861229 sleep 4 sec
2024-06-01 19:16:44.861777 [907059] on_message(): 1
2024-06-01 19:16:44.866430 [907059] starting pool..
2024-06-01 19:16:44.867275 worker_function(), a=907059_1
2024-06-01 19:16:44.867373 worker_function(), a=907059_2
2024-06-01 19:16:44.867410 worker_function(), a=907059_3
2024-06-01 19:16:48.861738 start
2024-06-01 19:16:48.864965 sleep 4 sec
2024-06-01 19:16:48.865581 [907070] on_message(): 2
2024-06-01 19:16:48.870826 [907070] starting pool..
2024-06-01 19:16:48.871544 worker_function(), a=907070_1
2024-06-01 19:16:48.871638 worker_function(), a=907070_2
2024-06-01 19:16:48.871695 worker_function(), a=907070_3
2024-06-01 19:16:52.865456 long sleep..
2024-06-01 19:16:56.867489 end worker_function(), a=907059_1
2024-06-01 19:16:56.867657 end worker_function(), a=907059_3
2024-06-01 19:16:56.867666 end worker_function(), a=907059_2
2024-06-01 19:16:56.868269 [907059] pool ended
2024-06-01 19:16:56.870487 [907059] finished on_message(): 1
2024-06-01 19:17:00.871746 end worker_function(), a=907070_1
2024-06-01 19:17:00.871896 end worker_function(), a=907070_2
2024-06-01 19:17:00.871903 end worker_function(), a=907070_3
2024-06-01 19:17:00.872659 [907070] pool ended
2024-06-01 19:17:00.874545 [907070] finished on_message(): 2
2024-06-01 19:17:12.865676 finished
代码:
import os
import time
import traceback
from datetime import datetime
from multiprocessing import Pool
import multiprocessing.pool
# https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)
class Message():
def __init__(self):
# self.pool_3 = Pool(3)
pass
def worker_function(self, a):
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} worker_function(), a={a}")
time.sleep(12)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} end worker_function(), a={a}")
return None
def on_message(self, message):
try:
pid = os.getpid()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] on_message(): {message}")
# I need to make code that here I don't init new Pool()
# because my server has 72 logic processos and it takes 300ms to init
# for my task it's super long, so I want to init Pool() once, but not everytime when calling on_message()
# this could be possible solution
# but it does not work, in __init__() the Pool(3) is not initing
# res = self.pool_3.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
# if I init Pool with self. here, I will get error
# "Pool objects cannot be passed between processes or pickled"
with Pool(3) as pool:
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] starting pool..")
res = pool.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] pool ended")
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] finished on_message(): {message}")
# os.kill(pid, 9)
except Exception as e:
print(traceback.format_exc())
print(e)
if __name__ == "__main__":
print(f"parent pid={os.getpid()}")
# https://stackoverflow.com/a/44719580/1802225 process.terminate()
me = Message()
for i in range(1, 3):
print()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} start")
# starting pool non-daemonic to start pool inside
pool = NestablePool(1)
pool.starmap_async(me.on_message, [(i,)])
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} sleep 4 sec")
time.sleep(4)
print()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} long sleep..")
time.sleep(20)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} finished")
您还没有真正展示如何在实际应用程序中调用
on_message
。该函数的名称表明当从某处收到新消息时会调用它。如果我们另外传递一个已预先分配且执行 CPU 密集型处理的 multiprocessing池实例,为什么
on_message
不能在 multithreading 池中执行?例如:
from multiprocessing.pool import Pool, ThreadPool
import time
def convert_char_to_integer(ch):
time.sleep(.3) # Simulate real processing
return ord(ch)
def on_message(multiprocessing_pool, message):
# Get the sum of each character of the message after it has been
# converted to an integer:
result = sum(multiprocessing_pool.map(convert_char_to_integer, message))
print(f'{message!r}: {result}')
def await_next_message():
yield 'Message 1'
time.sleep(.1)
yield 'Message 2'
time.sleep(.1)
yield 'Message 3'
def main():
with Pool() as multiprocessing_pool, \
ThreadPool(3) as multithreading_pool:
for message in await_next_message():
multithreading_pool.apply_async(on_message, args=(multiprocessing_pool, message))
# Wait for all submitted tasks to complete
multithreading_pool.close()
multithreading_pool.join()
if __name__ == '__main__':
main()
打印:
'Message 1': 790
'Message 2': 791
'Message 3': 792