How to do inter-process communication with pathos/ppft?


I am using the pathos framework to execute tasks concurrently in separate processes. Under the hood this is done via ppft, which is part of pathos. My current approach uses a pathos.multiprocessing.ProcessPool instance. Several jobs are submitted to it, one at a time, with apipe(), which does not block. The main process then uses ready() and get() to supervise the worker processes.

This works fine, but the result of a worker process is only received once it has finished (i.e. the process has ended). However, I need a way to obtain intermediate results from the worker processes. I could not find anything explicit about this in the pathos/ppft documentation, but from the hints it contains it seems clear that this should be possible with their functionality. How can I do inter-process communication with pathos/ppft in combination with a ProcessPool?

The following demo code illustrates my approach. How could intermediate results be sent to the main process, e.g. reporting the list of primes found so far whenever its length is a multiple of 100?

#!/usr/bin/env python3

import pathos


def worker_func(limit):
    """
    Dummy task: Find list of all prime numbers smaller than "limit".
    """
    return limit, [
        num for num in range(2, limit) \
        if all(num % i != 0 for i in range(2, num))
    ]


pool = pathos.pools.ProcessPool()

jobs = []
jobs.append(pool.apipe(worker_func, 10000))
jobs.append(pool.apipe(worker_func, 15000))
jobs.append(pool.apipe(worker_func, 20000))

count_done_jobs = 0
while count_done_jobs < len(jobs):
    for job_idx, job in enumerate(jobs):
        if job is not None and job.ready():
            limit, primes = job.get()
            jobs[job_idx] = None
            count_done_jobs += 1
            print("Job {}: There are {} primes smaller than {}." \
                .format(job_idx, len(primes), limit))
1 Answer

OK, I figured it out. I realized that pathos uses multiprocess for its ProcessPool, and multiprocess is a fork of the official Python module multiprocessing with only minor differences (as mentioned here). You can therefore do inter-process communication the multiprocessing way, for which the Python documentation gives an example, but using multiprocess.Pipe objects instead of multiprocessing.Pipe objects.
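
Before applying this to the pool, here is a minimal sketch of the Pipe mechanism on its own, assuming only multiprocess itself; the child_task function and the message tuples are illustrative names, not part of pathos or of the original answer:

#!/usr/bin/env python3

import multiprocess as mp


def child_task(conn):
    # Send a few intermediate messages through the pipe, then a final one.
    for step in range(3):
        conn.send(("progress", step))
    conn.send(("done", None))
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=child_task, args=(child_conn,))
    proc.start()
    while True:
        kind, payload = parent_conn.recv()   # blocks until a message arrives
        print(kind, payload)
        if kind == "done":
            break
    proc.join()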

Applying this to my example code:

#!/usr/bin/env python3

import pathos
import multiprocess as mp

WAIT_TIMEOUT_SECS = 1.0


def worker_func(worker_no, conn, limit):
    """
    Dummy task: Count all prime numbers smaller than "limit".
    """
    count = 0
    for num in range(2, limit):
        if all(num % i != 0 for i in range(2, num)):
            count += 1
        # Report progress to main process now and then
        if num % (limit//10) == 0:
            percent_done = int(num/limit*100)
            conn.send([worker_no, percent_done, count, num])
    return limit, count


pool = pathos.pools.ProcessPool()

# Make workers
workers, parent_pipes = [], []
for worker_no, limit in enumerate([100000, 150000, 200000]):
    parent_conn, child_conn = mp.Pipe()
    workers.append(pool.apipe(
        worker_func, worker_no, child_conn, limit
    ))
    parent_pipes.append(parent_conn)

# Watch workers
count_done_workers = 0
while count_done_workers < len(workers):

    # Check for done workers
    for worker_no, worker in enumerate(workers):
        if worker is not None and worker.ready():
            limit, primes_count = worker.get()
            workers[worker_no] = None
            parent_pipes[worker_no] = None
            count_done_workers += 1
            print("Worker {} has completed: There are {} primes smaller than {}." \
                .format(worker_no, primes_count, limit))

    # Read progress reports
    objects_ready = mp.connection.wait(
        [p for p in parent_pipes if p is not None],
        WAIT_TIMEOUT_SECS
    )
    for obj in objects_ready:
        worker_no, percent_done, count, num = obj.recv()
        print("Worker {} is {}% done: There are {} primes smaller than {}." \
            .format(worker_no, percent_done, count, num))
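
As an alternative (not from the original answer, just a sketch under the same assumptions): instead of one Pipe per worker, a single queue created by a multiprocess Manager can collect progress messages from all workers, since the manager's proxy objects can be passed to pool workers:

#!/usr/bin/env python3

import time

import pathos
import multiprocess as mp


def worker_func(worker_no, queue, limit):
    """
    Same dummy task, but reporting progress through a shared queue.
    """
    count = 0
    for num in range(2, limit):
        if all(num % i != 0 for i in range(2, num)):
            count += 1
        if num % (limit//10) == 0:
            queue.put((worker_no, int(num/limit*100), count))
    return limit, count


pool = pathos.pools.ProcessPool()
manager = mp.Manager()
queue = manager.Queue()  # proxy object, safe to pass to pool workers

jobs = [
    pool.apipe(worker_func, worker_no, queue, limit)
    for worker_no, limit in enumerate([100000, 150000, 200000])
]

while any(job is not None for job in jobs):
    # Drain any progress messages that have arrived so far
    while not queue.empty():
        worker_no, percent_done, count = queue.get()
        print("Worker {} is {}% done: {} primes found so far." \
            .format(worker_no, percent_done, count))
    # Collect finished workers
    for worker_no, job in enumerate(jobs):
        if job is not None and job.ready():
            limit, count = job.get()
            jobs[worker_no] = None
            print("Worker {} has completed: There are {} primes smaller than {}." \
                .format(worker_no, count, limit))
    time.sleep(0.2)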
    