多处理 - 在不破坏池的情况下取消池中的剩余作业

问题描述 投票:1回答:2

我正在使用map_async创建一个包含4个工作线程的池。并给它一个图像文件列表来处理[Set 1]。 有时,我需要取消之间的处理,以便我可以改为处理一组不同的文件[Set 2]。

所以一个示例情况是,我给了map_async 1000个文件来处理。然后想要在处理了大约200个文件后取消剩余作业的处理。 此外,我想在不破坏/终止池的情况下进行此取消。这可能吗?

我不想终止池,因为在Windows上重新创建池是一个缓慢的过程(因为它使用'spawn'而不是'fork')。我需要使用同一个池来处理一组不同的图像文件[Set 2] ..

# Putting job_set1 through processing. It may consist of 1000 images
cpu = multiprocessing.cpu_count()
pool = Pool(processes=cpu)
result = pool.map_async(job_set1, thumb_ts_list, chunksize=chunksize)

现在介于两者之间,我需要取消对此设置1的处理。并转到另一组(等待所有1000个图像完成处理不是一个选项,但我可以等待处理的当前图像完成)

<Somehow cancel processing of job_set1>
result = pool.map_async(job_set2, thumb_ts_list, chunksize=chunksize)
python python-3.x multiprocessing python-multiprocessing python-internals
2个回答
1
投票

现在是时候了fundamental theorem of software engineering:虽然multiprocessing.Pool没有提供取消作为一个功能,我们可以通过从精心制作的可迭代中读取Pool来添加它。然而,有一个生成器,yields从列表中的值,但在某些信号上停止短路是不够的,因为Pool急切地消耗任何给它的发生器。所以我们需要一个非常精心设计的可迭代。

A lazy Pool

我们需要的通用工具是一种仅在工作人员可用时才构建Pool任务的方法(或者在构建它们需要很长时间的情况下,最多只能执行一项任务)。基本的想法是减慢Pool的线程收集工作,只有在任务完成时才能提升信号量。 (我们知道这样的线程存在于imap_unordered的可观察行为中。)

import multiprocessing
from threading import Semaphore

size=multiprocessing.cpu_count()  # or whatever Pool size to use

# How many workers are waiting for work?  Add one to buffer one task.
work=Semaphore(size)

def feed0(it):
  it=iter(it)
  try:
    while True:
      # Don't ask the iterable until we have a customer, in case better
      # instructions become available:
      work.acquire()
      yield next(it)
  except StopIteration: pass
  work.release()
def feed(p,f,it):
  import sys,traceback
  iu=p.imap_unordered(f,feed0(it))
  while True:
    try: x=next(iu)
    except StopIteration: return
    except Exception: traceback.print_exception(*sys.exc_info())
    work.release()
    yield x

try中的feed可防止孩子们破坏信号量计数,但请注意,它不能防止父母的失败。

A cancelable iterator

现在我们可以实时控制Pool输入,制定任何调度策略都很简单。例如,这里有类似itertools.chain的东西,但能够异步丢弃其中一个输入序列中的任何剩余元素:

import collections,queue

class Cancel:
  closed=False
  cur=()
  def __init__(self): self.data=queue.Queue() # of deques
  def add(self,d):
    d=collections.deque(d)
    self.data.put(d)
    return d
  def __iter__(self):
    while True:
      try: yield self.cur.popleft()
      except IndexError:
        self.cur=self.data.get()
        if self.cur is None: break
  @staticmethod
  def cancel(d): d.clear()
  def close(self): self.data.put(None)

这是线程安全的(至少在CPython中),尽管缺少锁定,因为像deque.clear这样的操作在Python检查方面是原子的(我们不单独检查self.cur是否为空)。

Usage

制作其中一个看起来像

pool=mp.Pool(size)
can=Cancel()
many=can.add(range(1000))
few=can.add(["some","words"])
can.close()
for x in feed(pool,assess_happiness,can):
  if happy_with(x): can.cancel(many)  # straight onto few, then out

当然,adds和close本身可以在循环中。


0
投票

multiprocessing模块似乎没有取消的概念。当您有足够的结果时,您可以使用concurrent.futures.ProcessPoolExecutor包装并取消待处理的期货。

这是一个从一组路径中挑选出10个JPEG的示例,并取消待处理的期货,同时让流程池可用:

import concurrent.futures


def interesting_path(path):
    """Gives path if is a JPEG else ``None``."""
    with open(path, 'rb') as f:
        if f.read(3) == b'\xff\xd8\xff':
            return path
        return None


def find_interesting(paths, count=10):
     """Yields count from paths which are 'interesting' by multiprocess task."""
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = {pool.submit(interesting_path, p) for p in paths}
        print ('Started {}'.format(len(futures)))
        for future in concurrent.futures.as_completed(futures):
            res = future.result()
            futures.remove(future)
            if res is not None:
                yield res
                count -= 1
                if count == 0:
                    break
        cancelled = 0
        for future in futures:
            cancelled += future.cancel()
        print ('Cancelled {}'.format(cancelled))
        concurrent.futures.wait(futures)
        # Can still use pool here for more processing as needed

请注意,选择如何将工作分解为期货仍然是棘手的,更大的设置是更多的开销,但也可以意味着更少的浪费工作。这也可以很容易地适应Python 3.6异步语法。

© www.soinside.com 2019 - 2024. All rights reserved.