我正在使用map_async创建一个包含4个工作线程的池。并给它一个图像文件列表来处理[Set 1]。 有时,我需要取消之间的处理,以便我可以改为处理一组不同的文件[Set 2]。
所以一个示例情况是,我给了map_async 1000个文件来处理。然后想要在处理了大约200个文件后取消剩余作业的处理。 此外,我想在不破坏/终止池的情况下进行此取消。这可能吗?
我不想终止池,因为在Windows上重新创建池是一个缓慢的过程(因为它使用'spawn'而不是'fork')。我需要使用同一个池来处理一组不同的图像文件[Set 2] ..
# Putting job_set1 through processing. It may consist of 1000 images
cpu = multiprocessing.cpu_count()
pool = Pool(processes=cpu)
result = pool.map_async(job_set1, thumb_ts_list, chunksize=chunksize)
现在介于两者之间,我需要取消对此设置1的处理。并转到另一组(等待所有1000个图像完成处理不是一个选项,但我可以等待处理的当前图像完成)
<Somehow cancel processing of job_set1>
result = pool.map_async(job_set2, thumb_ts_list, chunksize=chunksize)
现在是时候了fundamental theorem of software engineering:虽然multiprocessing.Pool
没有提供取消作为一个功能,我们可以通过从精心制作的可迭代中读取Pool
来添加它。然而,有一个生成器,yield
s从列表中的值,但在某些信号上停止短路是不够的,因为Pool
急切地消耗任何给它的发生器。所以我们需要一个非常精心设计的可迭代。
Pool
我们需要的通用工具是一种仅在工作人员可用时才构建Pool
任务的方法(或者在构建它们需要很长时间的情况下,最多只能执行一项任务)。基本的想法是减慢Pool
的线程收集工作,只有在任务完成时才能提升信号量。 (我们知道这样的线程存在于imap_unordered
的可观察行为中。)
import multiprocessing
from threading import Semaphore
size=multiprocessing.cpu_count() # or whatever Pool size to use
# How many workers are waiting for work? Add one to buffer one task.
work=Semaphore(size)
def feed0(it):
it=iter(it)
try:
while True:
# Don't ask the iterable until we have a customer, in case better
# instructions become available:
work.acquire()
yield next(it)
except StopIteration: pass
work.release()
def feed(p,f,it):
import sys,traceback
iu=p.imap_unordered(f,feed0(it))
while True:
try: x=next(iu)
except StopIteration: return
except Exception: traceback.print_exception(*sys.exc_info())
work.release()
yield x
try
中的feed
可防止孩子们破坏信号量计数,但请注意,它不能防止父母的失败。
现在我们可以实时控制Pool
输入,制定任何调度策略都很简单。例如,这里有类似itertools.chain
的东西,但能够异步丢弃其中一个输入序列中的任何剩余元素:
import collections,queue
class Cancel:
closed=False
cur=()
def __init__(self): self.data=queue.Queue() # of deques
def add(self,d):
d=collections.deque(d)
self.data.put(d)
return d
def __iter__(self):
while True:
try: yield self.cur.popleft()
except IndexError:
self.cur=self.data.get()
if self.cur is None: break
@staticmethod
def cancel(d): d.clear()
def close(self): self.data.put(None)
这是线程安全的(至少在CPython中),尽管缺少锁定,因为像deque.clear
这样的操作在Python检查方面是原子的(我们不单独检查self.cur
是否为空)。
制作其中一个看起来像
pool=mp.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
if happy_with(x): can.cancel(many) # straight onto few, then out
当然,add
s和close
本身可以在循环中。
multiprocessing
模块似乎没有取消的概念。当您有足够的结果时,您可以使用concurrent.futures.ProcessPoolExecutor
包装并取消待处理的期货。
这是一个从一组路径中挑选出10个JPEG的示例,并取消待处理的期货,同时让流程池可用:
import concurrent.futures
def interesting_path(path):
"""Gives path if is a JPEG else ``None``."""
with open(path, 'rb') as f:
if f.read(3) == b'\xff\xd8\xff':
return path
return None
def find_interesting(paths, count=10):
"""Yields count from paths which are 'interesting' by multiprocess task."""
with concurrent.futures.ProcessPoolExecutor() as pool:
futures = {pool.submit(interesting_path, p) for p in paths}
print ('Started {}'.format(len(futures)))
for future in concurrent.futures.as_completed(futures):
res = future.result()
futures.remove(future)
if res is not None:
yield res
count -= 1
if count == 0:
break
cancelled = 0
for future in futures:
cancelled += future.cancel()
print ('Cancelled {}'.format(cancelled))
concurrent.futures.wait(futures)
# Can still use pool here for more processing as needed
请注意,选择如何将工作分解为期货仍然是棘手的,更大的设置是更多的开销,但也可以意味着更少的浪费工作。这也可以很容易地适应Python 3.6异步语法。