How can I initialize a Pool() inside a child daemon process, but only once?


On Python 3.11 on Ubuntu, I have a task that fires asynchronous calls at fixed intervals (not asyncio), and the multiprocessing work runs inside a child process. I have 36 cores / 72 logical processors. The problem is that initializing a new Pool(72) takes 0.3 seconds, which is far too long for my task, since performance matters. From the post "Python Process Pool non-daemonic?" I learned how to create a new pool inside a pool (using NoDaemonProcess). But how can I initialize the child pool only once? concurrent.futures is not an option for me: I benchmarked it, and it is slower than multiprocessing.

Here is a working example; I need to modify it somehow so that the pool inside the child process is initialized only once.

parent pid=907058

2024-06-01 19:16:44.856839 start
2024-06-01 19:16:44.861229 sleep 4 sec
2024-06-01 19:16:44.861777 [907059] on_message(): 1
2024-06-01 19:16:44.866430 [907059] starting pool..
2024-06-01 19:16:44.867275 worker_function(), a=907059_1
2024-06-01 19:16:44.867373 worker_function(), a=907059_2
2024-06-01 19:16:44.867410 worker_function(), a=907059_3

2024-06-01 19:16:48.861738 start
2024-06-01 19:16:48.864965 sleep 4 sec
2024-06-01 19:16:48.865581 [907070] on_message(): 2
2024-06-01 19:16:48.870826 [907070] starting pool..
2024-06-01 19:16:48.871544 worker_function(), a=907070_1
2024-06-01 19:16:48.871638 worker_function(), a=907070_2
2024-06-01 19:16:48.871695 worker_function(), a=907070_3

2024-06-01 19:16:52.865456 long sleep..
2024-06-01 19:16:56.867489 end worker_function(), a=907059_1
2024-06-01 19:16:56.867657 end worker_function(), a=907059_3
2024-06-01 19:16:56.867666 end worker_function(), a=907059_2
2024-06-01 19:16:56.868269 [907059] pool ended
2024-06-01 19:16:56.870487 [907059] finished on_message(): 1
2024-06-01 19:17:00.871746 end worker_function(), a=907070_1
2024-06-01 19:17:00.871896 end worker_function(), a=907070_2
2024-06-01 19:17:00.871903 end worker_function(), a=907070_3
2024-06-01 19:17:00.872659 [907070] pool ended
2024-06-01 19:17:00.874545 [907070] finished on_message(): 2
2024-06-01 19:17:12.865676 finished

Code:

import os
import time
import traceback
from datetime import datetime
from multiprocessing import Pool
import multiprocessing.pool

# https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False
    @daemon.setter
    def daemon(self, value):
        pass

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)


class Message():

    def __init__(self):

        # self.pool_3 = Pool(3)
        pass

    def worker_function(self, a):

        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} worker_function(), a={a}")
        time.sleep(12)
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} end worker_function(), a={a}")

        return None

    def on_message(self, message):

        try:
            pid = os.getpid()
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] on_message(): {message}")

            # I need to change this code so that a new Pool() is NOT created here:
            # my server has 72 logical processors and the Pool takes ~300 ms to
            # start, which is far too long for my task, so I want to create the
            # Pool() once rather than on every call to on_message().

            # This could be a possible solution, but it does not work:
            # the Pool(3) created in __init__() never starts.
            # res = self.pool_3.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
            # And if I create the Pool as self.pool_3 here instead, I get the error
            # "Pool objects cannot be passed between processes or pickled".

            with Pool(3) as pool:
                print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] starting pool..")
                res = pool.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
                print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] pool ended")

            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] finished on_message(): {message}")
            # os.kill(pid, 9)
        except Exception as e:
            print(traceback.format_exc())
            print(e)



if __name__ == "__main__":

    print(f"parent pid={os.getpid()}")
    
    # https://stackoverflow.com/a/44719580/1802225 process.terminate()

    me = Message()
    
    for i in range(1, 3):
        print()
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} start")
        # starting pool non-daemonic to start pool inside
        pool = NestablePool(1)
        pool.starmap_async(me.on_message, [(i,)])

        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} sleep 4 sec")
        time.sleep(4)
    
    print()
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} long sleep..")
    time.sleep(20)
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} finished")
       
python python-3.x multithreading multiprocessing python-multiprocessing
1 Answer

You haven't really shown how on_message is called in your actual application. The function's name suggests it is invoked whenever a new message arrives from somewhere. If we additionally pass in a multiprocessing pool instance that has been allocated up front and performs the CPU-intensive processing, why can't on_message run in a multithreading pool instead? For example:

from multiprocessing.pool import Pool, ThreadPool
import time

def convert_char_to_integer(ch):
    time.sleep(.3)  # Simulate real processing
    return ord(ch)

def on_message(multiprocessing_pool, message):
    # Get the sum of each character of the message after it has been
    # converted to an integer:
    result = sum(multiprocessing_pool.map(convert_char_to_integer, message))
    print(f'{message!r}: {result}')

def await_next_message():
    yield 'Message 1'
    time.sleep(.1)
    yield 'Message 2'
    time.sleep(.1)
    yield 'Message 3'

def main():
    with Pool() as multiprocessing_pool, \
    ThreadPool(3) as multithreading_pool:
        for message in await_next_message():
            multithreading_pool.apply_async(on_message, args=(multiprocessing_pool, message))
        # Wait for all submitted tasks to complete
        multithreading_pool.close()
        multithreading_pool.join()

if __name__ == '__main__':
    main()

Prints:

'Message 1': 790
'Message 2': 791
'Message 3': 792
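The same idea can be carried back to the question's Message class. A hedged sketch (the worker body is simplified, and worker_function is moved to module level so that pickling it does not also try to pickle the instance that owns the Pool): because on_message now runs in a thread of the same process, the Pool created once in __init__() is shared directly, and the "Pool objects cannot be passed between processes or pickled" error never arises.

```python
from multiprocessing.pool import Pool, ThreadPool

def worker_function(a):
    # Module-level so that pickling it does not drag the Message
    # instance (and its unpicklable Pool attribute) into the workers.
    return a * 10

class Message:
    def __init__(self):
        # The expensive Pool is created exactly once, here.
        self.pool_3 = Pool(3)

    def on_message(self, message):
        # Threads share self.pool_3 directly; Pool methods are
        # safe to call from multiple threads.
        return sum(self.pool_3.map(worker_function, [1, 2, 3]))

if __name__ == "__main__":
    me = Message()
    with ThreadPool(2) as tp:
        async_results = [tp.apply_async(me.on_message, (i,)) for i in range(2)]
        print([r.get() for r in async_results])  # -> [60, 60]
    me.pool_3.close()
    me.pool_3.join()
```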