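The `pathos` framework runs tasks concurrently in separate processes. Under the hood this is done with `ppft`, which is part of `pathos`. My current approach uses a `pathos.multiprocessing.ProcessPool` instance. I submit multiple jobs to it one at a time, without blocking, using `apipe()`. The main process then monitors the workers with `ready()` and `get()`.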
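For reference, `apipe()` on a pathos `ProcessPool` appears to forward the call to the underlying `multiprocess` pool's `apply_async`. If so, the handle it returns should follow the standard `AsyncResult` interface. The sketch below assumes that mapping and uses the standard library's `multiprocessing.Pool` directly. The names `square` and `run_polling` are hypothetical. It shows the non-blocking submit, the `ready()` check and `get()`. It also uses `AsyncResult.wait(timeout)` to pause between polls instead of spinning.

```python
import multiprocessing as mp


def square(x):
    """Hypothetical stand-in task; any picklable top-level function works."""
    return x * x


def run_polling(values):
    """Submit one job per value without blocking, then poll until all finish."""
    results = {}
    with mp.Pool(processes=2) as pool:
        # apply_async() returns an AsyncResult immediately (assumed to mirror apipe()).
        pending = {i: pool.apply_async(square, (v,)) for i, v in enumerate(values)}
        while pending:
            for i, res in list(pending.items()):
                if res.ready():              # non-blocking completion check
                    results[i] = res.get()   # re-raises a worker exception, if any
                    del pending[i]
            if pending:
                # Wait up to 0.1 s on one outstanding job instead of busy-spinning.
                next(iter(pending.values())).wait(timeout=0.1)
    return [results[i] for i in range(len(values))]


if __name__ == "__main__":
    print(run_polling([1, 2, 3]))  # prints [1, 4, 9]
```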
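This approach works fine, but a worker's result only arrives once its job has finished. However, I need a way to get intermediate results from the workers. I couldn't find anything explicit about this in the `pathos`/`ppft` documentation, but the hints they contain suggest it should be possible with their functionality. How can I do inter-process communication with `pathos`/`ppft` in combination with `ProcessPool`?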
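The following demo code illustrates my approach. How can I send intermediate results to the main process? For example, how can a worker report the list of primes found so far each time its length is a multiple of 100?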
```python
#!/usr/bin/env python3
import time

import pathos


def worker_func(limit):
    """
    Dummy task: Find list of all prime numbers smaller than "limit".
    """
    return limit, [
        num for num in range(2, limit)
        if all(num % i != 0 for i in range(2, num))
    ]


if __name__ == "__main__":  # required where child processes are spawned
    pool = pathos.pools.ProcessPool()
    jobs = []
    jobs.append(pool.apipe(worker_func, 10000))
    jobs.append(pool.apipe(worker_func, 15000))
    jobs.append(pool.apipe(worker_func, 20000))

    count_done_jobs = 0
    while count_done_jobs < len(jobs):
        for job_idx, job in enumerate(jobs):
            if job is not None and job.ready():
                limit, primes = job.get()
                jobs[job_idx] = None
                count_done_jobs += 1
                print("Job {}: There are {} primes smaller than {}."
                      .format(job_idx, len(primes), limit))
        time.sleep(0.1)  # avoid busy-spinning while jobs are still running
```
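OK, I figured it out. I realized that `pathos` uses `multiprocess` for `ProcessPool`, and `multiprocess` is a fork of the official Python module `multiprocessing` with only minor differences (mentioned here). So you can do inter-process communication the `multiprocessing` way, as in the example given in the Python documentation, but using `multiprocess.Pipe` objects instead of `multiprocessing.Pipe` objects. Applying this to my example code: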
```python
#!/usr/bin/env python3
import pathos
import multiprocess as mp
import multiprocess.connection  # makes mp.connection.wait available explicitly

WAIT_TIMEOUT_SECS = 1.0


def worker_func(worker_no, conn, limit):
    """
    Dummy task: Count all prime numbers smaller than "limit".
    """
    count = 0
    for num in range(2, limit):
        if all(num % i != 0 for i in range(2, num)):
            count += 1
        # Report progress to main process now and then (every 10% of the range)
        if num % (limit // 10) == 0:
            percent_done = int(num / limit * 100)
            conn.send([worker_no, percent_done, count, num])
    return limit, count


if __name__ == "__main__":  # required where child processes are spawned
    pool = pathos.pools.ProcessPool()

    # Make workers: one pipe per job; the child end is passed to the worker
    workers, parent_pipes = [], []
    for worker_no, limit in enumerate([100000, 150000, 200000]):
        parent_conn, child_conn = mp.Pipe()
        workers.append(pool.apipe(
            worker_func,
            worker_no,
            child_conn,
            limit
        ))
        parent_pipes.append(parent_conn)

    # Watch workers
    count_done_workers = 0
    while count_done_workers < len(workers):
        # Check for done workers
        for worker_no, worker in enumerate(workers):
            if worker is not None and worker.ready():
                limit, primes_count = worker.get()
                workers[worker_no] = None
                parent_pipes[worker_no] = None
                count_done_workers += 1
                print("Worker {} has completed: There are {} primes smaller than {}."
                      .format(worker_no, primes_count, limit))

        # Read progress reports, blocking for at most WAIT_TIMEOUT_SECS
        objects_ready = mp.connection.wait(
            [p for p in parent_pipes if p is not None],
            WAIT_TIMEOUT_SECS
        )
        for obj in objects_ready:
            worker_no, percent_done, count, num = obj.recv()
            print("Worker {} is {}% done: There are {} primes smaller than {}."
                  .format(worker_no, percent_done, count, num))
```
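The answer above reports a count at every 10% of the range. The question asks for something slightly different: send the list of primes found so far each time its length is a multiple of 100. For that, a single shared queue can replace the per-worker pipes. The sketch below is an alternative technique, not the approach from the answer. It uses a `multiprocessing.Manager().Queue()` with the standard library's `Pool.apply_async`. A manager queue's proxy can be pickled, so it can be passed to pool workers as an ordinary argument. The names `find_primes`, `run` and `report_every` are hypothetical. With pathos, `multiprocess` should offer the same `Manager` API since it is a fork of `multiprocessing`, but that substitution is an untested assumption here.

```python
import math
import multiprocessing as mp
import queue


def find_primes(worker_no, progress_q, limit, report_every=100):
    """Hypothetical worker: collect primes below `limit`, reporting progress.

    Each time the number of primes found is a multiple of `report_every`,
    a snapshot of the list is put on `progress_q`.
    """
    primes = []
    for num in range(2, limit):
        # Trial division up to sqrt(num) is enough to test primality.
        if all(num % i != 0 for i in range(2, math.isqrt(num) + 1)):
            primes.append(num)
            if len(primes) % report_every == 0:
                # Send a copy so later appends cannot affect the report.
                progress_q.put((worker_no, list(primes)))
    return worker_no, primes


def run(limits, report_every=100):
    """Run one job per limit; return (progress reports, final prime lists)."""
    reports, finals = [], {}

    def handle(worker_no, snapshot):
        reports.append((worker_no, len(snapshot)))
        print(f"Worker {worker_no}: {len(snapshot)} primes so far, "
              f"largest is {snapshot[-1]}")

    with mp.Manager() as manager, mp.Pool() as pool:
        # A manager queue is a picklable proxy, so it can be passed to pool
        # workers as an argument (a plain mp.Queue cannot be passed this way).
        progress_q = manager.Queue()
        pending = {
            no: pool.apply_async(find_primes, (no, progress_q, limit, report_every))
            for no, limit in enumerate(limits)
        }
        while pending:
            try:
                handle(*progress_q.get(timeout=0.1))
            except queue.Empty:
                pass
            for no, job in list(pending.items()):
                if job.ready():
                    finals[no] = job.get()[1]
                    del pending[no]
        # Drain reports still queued after the last job finished.
        while True:
            try:
                handle(*progress_q.get_nowait())
            except queue.Empty:
                break
    return reports, finals


if __name__ == "__main__":
    reports, finals = run([10000, 15000, 20000])
    for no, primes in sorted(finals.items()):
        print(f"Worker {no} done: {len(primes)} primes")
```

A manager queue's `put()` returns only after the manager process has stored the item. So every report a worker sends is already queued by the time its job is `ready()`, and the final drain loop collects any that the polling loop had not yet read.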