Multiprocessing causes Python to crash with the error: may have been in progress in another thread when fork() was called

Question · 0 votes · 6 answers

I am relatively new to Python and am trying to use the multiprocessing module for a for loop.

I have an array of image URLs stored in img_urls, which I need to download and run some Google Vision processing on.

if __name__ == '__main__':

    start_time = time.time()  # was referenced below but never defined
    img_urls = [ALL_MY_Image_URLS]
    runAll(img_urls)
    print("--- %s seconds ---" % (time.time() - start_time))

This is my runAll() method:

def runAll(img_urls):
    num_cores = multiprocessing.cpu_count()

    print("Image URLS {}".format(len(img_urls)))
    if len(img_urls) > 2:
        numberOfImages = 0
    else:
        numberOfImages = 1  # note: numberOfImages is computed but never used

    start_timeProcess = time.time()

    pool = multiprocessing.Pool()
    pool.map(annotate, img_urls)
    pool.close()
    pool.join()
    end_timeProcess = time.time()
    print('\n Time to complete ', end_timeProcess - start_timeProcess)

    print(full_matching_pages)


def annotate(img_path):
    """Returns web annotations given the path to an image."""
    file = requests.get(img_path).content
    print("file is", file)
    print('Process Working under ', os.getpid())
    image = types.Image(content=file)
    web_detection = vision_client.web_detection(image=image).web_detection
    report(web_detection)

This is the warning I get when I run it and Python crashes:

objc[67570]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[67570]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
(the same pair of messages repeats for each worker process: 67567, 67568, 67569, 67571, 67572)
python python-3.x multithreading macos
6 Answers
418 votes

This error occurs because of added security restricting multithreading in macOS High Sierra and later versions of macOS. I know this answer is a bit late, but I solved the problem using the following method:

Set an environment variable in .bash_profile (or .zshrc for recent macOS) to allow multithreaded applications or scripts under the new macOS High Sierra security rules.

Open a terminal:

$ nano .bash_profile

Add the following line to the end of the file:

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

Save, exit, close the terminal, and re-open it. Check that the environment variable is now set:

$ env

You will see output similar to:

TERM_PROGRAM=Apple_Terminal
SHELL=/bin/bash
TERM=xterm-256color
TMPDIR=/var/folders/pn/vasdlj3ojO#OOas4dasdffJq/T/
Apple_PubSub_Socket_Render=/private/tmp/com.apple.launchd.E7qLFJDSo/Render
TERM_PROGRAM_VERSION=404
TERM_SESSION_ID=NONE
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

You should now be able to run your Python script with multithreading.
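If you would rather not make the setting permanent, you can scope the flag to a single run instead of exporting it in your profile. A small sketch (`my_script.py` is a placeholder name, not from the question):

```shell
# Scope the flag to one invocation instead of exporting it globally, e.g.:
#   OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python3 my_script.py
# (my_script.py is a placeholder). Verify the child process actually sees it:
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python3 -c 'import os; print(os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"])'
```

This keeps the workaround from silently affecting every other program you launch from that shell.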


48 votes

Running on a Mac with z-shell, I had to add the following to my .zshrc file:

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

Then, on the command line:

source ~/.zshrc

And then it worked.


25 votes

Other answers tell you to set OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES, but don't do this! You'd just be putting tape over the warning light. You may need it on a case-by-case basis for some legacy software, but under no circumstances should you set it in your .bash_profile!

This issue has been fixed in https://bugs.python.org/issue33725 (python3.8+), but the best practice is to use:

with multiprocessing.get_context("spawn").Pool() as pool:
    pool.map(annotate,img_urls)
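Applied to the question's pattern, a minimal self-contained sketch might look like this (the worker here is a stand-in `square` function, not the question's `annotate`):

```python
import multiprocessing

def square(n):
    # Stand-in for the question's annotate(); any picklable function works.
    return n * n

def run_all(items):
    # "spawn" starts each worker as a fresh interpreter instead of fork()ing,
    # so no Objective-C runtime state is copied into the children and the
    # objc_initializeAfterForkError crash cannot occur.
    with multiprocessing.get_context("spawn").Pool(2) as pool:
        return pool.map(square, items)

if __name__ == "__main__":
    # The __main__ guard is mandatory with spawn: workers re-import this module.
    print(run_all([1, 2, 3]))  # [1, 4, 9]
```

Spawned workers do not inherit globals from the parent, so anything the worker needs (clients, sessions) should be created inside the worker function or via a pool initializer.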

8 votes

The OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES solution did not work for me. Another potential solution is to set no_proxy=* in the script's environment, as described here.

Besides the causes others have mentioned, this error message can also be network-related. My script had a TCP server. I wasn't even using a pool, only os.fork and a multiprocessing.Queue to pass messages. The forks worked fine until I added the queue.

Setting no_proxy by itself fixed my case. If your script has a networking component, try this fix, perhaps in combination with OBJC_DISABLE_INITIALIZE_FORK_SAFETY.
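If you prefer not to touch shell configuration, both variables can also be set from inside the script itself. A sketch under the assumption that this code runs before any pools, forks, or network clients are created (whether that is early enough depends on what your imports have already initialized):

```python
import os

# Must run before any fork() happens and before any networking library
# reads the proxy settings from the environment.
os.environ["no_proxy"] = "*"
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
```

Child processes created by fork() inherit the parent's environment, so setting these at the very top of the entry-point module covers the workers as well.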


2 votes

The solution that worked for me without the OBJC_DISABLE_INITIALIZE_FORK_SAFETY flag in the environment involves initializing the multiprocessing.Pool class right after the program starts, in main().

This is most likely not the fastest solution, and I am not sure whether it works in all cases; however, warming up the worker processes early, before the program gets going, does not cause any ... may have been in progress in another thread when fork() was called errors, and I do get a significant performance boost compared to the non-parallelized code.

I created a convenience class, Parallelizer, which I start very early and then use throughout the program's lifetime. The full version can be found here.

# entry point to my program
def main():
    parallelizer = Parallelizer()
    ...

Then, whenever you want to parallelize:

# this function is parallelized. it is run by each child process.
def processing_function(input):
    ...
    return output

...
inputs = [...]
results = parallelizer.map(
    inputs,
    processing_function
)

And the Parallelizer class:

class Parallelizer:
    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool(multiprocessing.cpu_count(),
                                         Parallelizer._run,
                                         (self.input_queue, self.output_queue,))

    def map(self, contents, processing_func):
        size = 0
        for content in contents:
            self.input_queue.put((content, processing_func))
            size += 1
        results = []
        while size > 0:
            result = self.output_queue.get(block=True)
            results.append(result)
            size -= 1
        return results

    @staticmethod
    def _run(input_queue, output_queue):
        while True:
            content, processing_func = input_queue.get(block=True)
            result = processing_func(content)
            output_queue.put(result)

One caveat: parallelized code can be hard to debug, so I also prepared a non-parallelized version of the class, which I enable when something goes wrong in a child process:

class NullParallelizer:
    @staticmethod
    def map(contents, processing_func):
        results = []
        for content in contents:
            results.append(processing_func(content))
        return results

0 votes

I ran into this problem on macOS, and the following flag worked for me.

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES