Python multiprocessing: Queue.put() kills the process in a Docker container

Problem description

I am running a small multiprocessing test in a Docker container. The test uses a multiprocessing.Queue with a consumer and a producer. The minimal code that triggers the crash, along with the Dockerfile, can be found below. In the code, the main process starts a new process that receives a reference to the Queue and uses Queue.put() to send integers back to the main process. What I observe is that the producer process gets killed during the call to .put(), without any Exception being raised.

Any ideas as to why .put() kills the process? The code runs fine locally (macOS) but not in a container built from the Python base image. The Python version is 3.9.16.

import multiprocessing as mp
import time
import traceback
from typing import Any, Optional

import psutil

base_file = "logs.txt"


def main() -> None:
    queue: Any = mp.Queue()
    print("Queue created")
    print("Starting producer process")

    p = mp.get_context("spawn").Process(target=producer, args=(queue,), daemon=True)
    p.start()
    print(f"Main: producer started: {p.pid}")

    alive = True
    while alive:
        alive = p.is_alive()
        print(f"Ha ha ha staying alive, producer: {p.is_alive()}")
        time.sleep(1)

    print("Every process is dead :( ")

def producer(q: mp.Queue) -> None:
    with open(f"producer.{base_file}", "w") as f:
        print("Producer: started", file=f, flush=True)
        current_value: int = 0
        while True:
            print(f"Producer: Adding value {current_value} to queue", file=f, flush=True)
            try:
                q.put(current_value, block=False)
            except BaseException as e:
                print(f"Producer: exception: {e}", file=f, flush=True)
                print(f"{traceback.format_exc()}", file=f, flush=True)
                raise e
            print(f"Producer: Value {current_value} added to queue", file=f, flush=True)
            print("Producer: Sleeping for 1 second", file=f, flush=True)
            time.sleep(1)
            current_value += 1


if __name__ == "__main__":
    main()
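A quick way to confirm that the child really is being killed, rather than dying from an uncaught exception, is to inspect p.exitcode after the monitoring loop (a small diagnostic sketch, not part of the original script):

# Hypothetical addition at the end of main(): a negative exitcode -N means
# the child was terminated by signal N (e.g. -9 for SIGKILL, -11 for SIGSEGV),
# whereas an uncaught exception in the child would leave exitcode == 1.
print(f"Producer exit code: {p.exitcode}")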
The Dockerfile:

FROM python:3.9.16

RUN apt-get update && apt-get install -y gettext git mime-support && apt-get clean

RUN python3 -m pip install psutil

COPY ./multiprocessing_e2e.py /src/multiprocessing_e2e.py

WORKDIR /src

CMD ["python", "-u", "multiprocessing_e2e.py"]

python python-3.x docker multiprocessing
1 Answer

When trying to run your example on Fedora 39 under WSL, I get an error message that likely explains the problem:

    raise RuntimeError('A SemLock created in a fork context is being '
RuntimeError: A SemLock created in a fork context is being shared with a process in a spawn context. This is not supported. Please use the same context to create multiprocessing objects and Process.
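The mismatch arises because mp.Queue() uses the interpreter's default context, while the Process is explicitly created in the "spawn" context. The default start method can be checked directly (a minimal illustration, not part of the original answer):

import multiprocessing as mp

# The default start method is "fork" on Linux (and therefore inside the
# Docker container), but "spawn" on macOS and Windows.
print(mp.get_start_method())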

This is why it does not crash on macOS: there, "spawn" is already the default start method. Indeed, changing the code so that both the queue and the process are created from the "spawn" context makes the example work:

import multiprocessing as mp
import time
import traceback
from typing import Any, Optional

import psutil

base_file = "logs.txt"


def main() -> None:
    mp_ctx = mp.get_context("spawn")
    queue: Any = mp_ctx.Queue()
    print("Queue created")
    print("Starting producer process")

    p = mp_ctx.Process(target=producer, args=(queue,), daemon=True)
    p.start()
    print(f"Main: producer started: {p.pid}")

    alive = True
    while alive:
        alive = p.is_alive()
        print(f"Ha ha ha staying alive, producer: {p.is_alive()}")
        time.sleep(1)

    print("Every process is dead :( ")

...
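An equivalent fix, if you would rather not pass a context object around, is to set the start method globally before any multiprocessing objects are created (a sketch of the same idea, not from the original answer):

import multiprocessing as mp

if __name__ == "__main__":
    # Must run before any Queue or Process is created; pass force=True only
    # if the start method may already have been set elsewhere.
    mp.set_start_method("spawn")
    main()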