Can't use os.fork() to bind multiple processes to one socket server when using asyncio


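We all know that asyncio can significantly improve the performance of a socket server. Things would obviously get even better if we could make use of all the CPU cores, perhaps via the multiprocessing module, os.fork(), etc.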

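I'm now trying to build a multi-core socket server demo. The idea is to have one asyncio socket server listening on each core, with all of them bound to one port. I simply create an async server, then call os.fork() and let the processes compete for incoming connections.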

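However, code that works fine on a single core runs into trouble when I fork. There seems to be a problem with registering the same file descriptor from different processes in the epoll selector module.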
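The epoll behaviour described above can be reproduced without asyncio at all. The sketch below is a Linux-only illustration, and the function name register_after_fork is hypothetical.

It creates a selectors.EpollSelector before forking, as a running event loop would. The child then registers a listening socket, and afterwards the parent tries the same registration. The parent's in-memory map was copied at fork time, so it still says the socket is not registered. However, both processes share one kernel epoll object, so the parent's epoll_ctl "add" is expected to fail with FileExistsError (errno 17, EEXIST). That is the same pair of exceptions (KeyError, then FileExistsError) seen in the traceback further down.

```python
import os
import selectors
import socket

def register_after_fork():
    """Linux-only sketch: a selector created before fork is shared in the kernel."""
    sel = selectors.EpollSelector()   # created BEFORE fork, like a loop's selector
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen()
    ready_r, ready_w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: register the inherited socket through the inherited selector.
        try:
            sel.register(listener, selectors.EVENT_READ)
            os.write(ready_w, b"1")
        finally:
            os._exit(0)
    os.read(ready_r, 1)               # wait until the child has registered
    os.waitpid(pid, 0)
    # Parent: its Python-side map was copied before the child registered
    # anything, so the socket looks unregistered here...
    known_in_parent = listener in sel.get_map()
    try:
        # ...but the shared kernel epoll object already contains this fd.
        sel.register(listener, selectors.EVENT_READ)
        outcome = "registered"
    except FileExistsError:
        outcome = "FileExistsError"
    finally:
        sel.close()
        listener.close()
        os.close(ready_r)
        os.close(ready_w)
    return known_in_parent, outcome

if __name__ == "__main__":
    print(register_after_fork())
```

The child's registration survives the child's exit. This is because epoll only drops an entry once every descriptor referring to that open socket is closed, and the parent still holds one.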

I've shown some code below. Can anyone help me?


Here is the code for a simple, logically clear echo server using asyncio:

import os
import asyncio #,uvloop
from socket import *

# handler sends each incoming message straight back
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

It works fine until I fork after the socket is bound and before the server starts serving. (The same logic works fine in synchronous, thread-based code.)
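For comparison, here is a rough sketch of one simple form of synchronous pre-fork server. It is an illustration, not the asker's actual synchronous code, which isn't shown. The names handle, prefork and stop_workers are hypothetical, and the sketch assumes Unix, a blocking listening socket, and Python 3.8+ (for :=).

Every worker blocks in accept() on the inherited listening socket, and the kernel hands each new connection to one of the waiting workers. There is no user-space selector state that could get out of sync between the processes.

```python
import os
import signal
import socket

def handle(client):
    # Blocking echo for one connection; this worker is busy until it closes.
    with client:
        while data := client.recv(64):
            client.sendall(data)

def prefork(sock, n):
    """Fork n workers that all block in accept() on the same listening socket."""
    pids = []
    for _ in range(n):
        pid = os.fork()
        if pid == 0:
            try:
                while True:
                    # The kernel gives each incoming connection to one waiting worker.
                    client, _addr = sock.accept()
                    try:
                        handle(client)
                    except OSError:
                        pass  # a broken client connection should not kill the worker
            finally:
                os._exit(0)  # never return into the parent's code path
        pids.append(pid)
    return pids

def stop_workers(pids):
    # Terminate and reap the worker processes.
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
        os.waitpid(pid, 0)
```

To use it, bind and listen on a blocking socket (port 25000 in the question) and call prefork(sock, os.cpu_count()). Then have the parent wait on the returned PIDs.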


When I try this:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # the child stops forking, so we end up with
        break               # as many processes as CPU cores

loop.create_task(serving(loop, sock))
loop.run_forever()

In theory, shouldn't the forked processes all be bound to the same socket and run in the same event loop, and then just work?

But I get these error messages:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python version: 3.7.3

I'm completely confused about what's going on.

Can anyone help? Thanks.

python multiprocessing selector python-asyncio epoll
2 Answers
1 vote

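According to the bug tracker issue, forking an existing asyncio event loop and trying to use it from multiple processes is not supported. However, according to Yury's comment on the same issue, multiprocessing can be achieved by forking before the loop is started. That way, each child process runs a completely independent asyncio loop.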

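Your code actually confirms that this is possible. Although create_server is declared async def, it doesn't await anything and doesn't use the loop argument. So we can implement Yury's approach as follows:

- make create_server a regular function and remove the loop argument;
- call it before os.fork();
- run the event loop only after forking.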

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # the child stops forking, so we end up with
        break               # as many processes as CPU cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
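The same fork-before-the-loop idea can also be written with asyncio's higher-level streams API. The sketch below is a variant under stated assumptions, not the answer's own code. It binds the socket in a plain function, forks, and only then has each child call asyncio.run() with asyncio.start_server(..., sock=sock) on the inherited socket. As a result, every process owns a separate loop and a separate epoll instance.

The sketch also makes these assumptions:

- it assumes Unix and Python 3.8+ (for :=);
- it binds to an ephemeral port on 127.0.0.1 instead of port 25000 so that the demo is self-contained;
- the helper names create_server_socket, echo, serve, start_workers, echo_once and stop_workers are hypothetical.

```python
import asyncio
import os
import signal
import socket

def create_server_socket(host="127.0.0.1", port=0):
    # Plain (non-async) function: bind and listen BEFORE forking,
    # while no event loop exists yet.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    sock.setblocking(False)
    return sock

async def echo(reader, writer):
    # Echo data back until the client closes its sending side.
    while data := await reader.read(64):
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def serve(sock):
    # Each worker wraps the inherited listening socket in its own server.
    server = await asyncio.start_server(echo, sock=sock)
    async with server:
        await server.serve_forever()

def start_workers(sock, n):
    """Fork n workers; each child runs its own, freshly created event loop."""
    pids = []
    for _ in range(n):
        pid = os.fork()
        if pid == 0:
            try:
                asyncio.run(serve(sock))  # new loop, created after the fork
            finally:
                os._exit(0)  # never fall back into the parent's code path
        pids.append(pid)
    return pids

def echo_once(port, payload):
    # Blocking client used from the parent to exercise the workers.
    with socket.create_connection(("127.0.0.1", port), timeout=5) as conn:
        conn.sendall(payload)
        conn.shutdown(socket.SHUT_WR)
        received = b""
        while chunk := conn.recv(64):
            received += chunk
    return received

def stop_workers(pids):
    # Terminate and reap the worker processes.
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
        os.waitpid(pid, 0)

if __name__ == "__main__":
    listener = create_server_socket()
    port = listener.getsockname()[1]
    workers = start_workers(listener, os.cpu_count() or 1)
    try:
        for i in range(4):
            print(echo_once(port, f"ping {i}".encode()))
    finally:
        stop_workers(workers)
        listener.close()
```

Here each worker's selector is created inside asyncio.run() after the fork. In the question's code, by contrast, asyncio.get_event_loop() ran before os.fork(), so all processes inherited one loop and one epoll object.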

0 votes

I just did a quick test, and it behaves as I expected:

import asyncio
import os
import time
import socket
from pathlib import Path

async def other():
    while True:
       await asyncio.sleep(1)
       print("other {}".format(os.getuid()))

async def f():
    pid = os.fork()
    if pid == 0:
        loop = asyncio.new_event_loop()
        os.setuid(1000)
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        Path("/tmp/sock.whatever").unlink(missing_ok=True)
        server.bind("/tmp/sock.whatever")
        server.listen(1)
        while True:
            loop.run_until_complete(asyncio.sleep(1))
            print("child {} {}".format(os.getuid(), pid))
            connection, client_address = server.accept()
            data = connection.recv(1024)
            print('{} {} Received data: {}'.format(os.getuid(), pid, data.decode()))
    else:
        while True:
            print("parent {} {}".format(os.getuid(), pid))
            if Path("/tmp/sock.whatever").exists():
                client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                try:
                    client.connect("/tmp/sock.whatever")
                    message = 'foobar works {} {}'.format(os.getuid(), pid)
                    client.sendall(message.encode())
                    client.close()
                except OSError:
                    pass  # the child may not be listening yet; retry on the next tick
            await asyncio.sleep(1)

async def main():
    # Run both coroutines concurrently; the TaskGroup waits for both to finish.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(other())
        tg.create_task(f())

loop = asyncio.new_event_loop()
loop.run_until_complete(main())

Save this as a script named fork.py and run it with sudo:

sudo python fork.py
parent 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
other 0
child 1000 0
parent 0 591480
1000 0 Received data: foobar works 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480

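From what I've been able to gather on this topic, you don't want to override the parent's loop. Instead, you have to create a new loop in the child so that the child has its own event loop.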
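Here is a stripped-down version of that pattern, without the setuid and UNIX-socket details. A coroutine forks while the parent's loop is running. The child never touches the inherited loop: it runs its work on asyncio.new_event_loop() and then leaves with os._exit(). The names child_job and fork_with_fresh_loop are hypothetical, and the pipe only serves to report the child's PID back to the parent.

As the output above also suggests, this relies on asyncio not treating the parent's loop as "running" inside the forked child. That assumption should be checked on the Python version you actually use.

```python
import asyncio
import os

async def child_job():
    # Some work that runs on the child's own, freshly created loop.
    await asyncio.sleep(0.01)
    return os.getpid()

async def fork_with_fresh_loop():
    """Fork while the parent's loop is running; the child builds its own loop."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        status = 1
        try:
            os.close(read_fd)
            loop = asyncio.new_event_loop()   # never reuse the inherited loop
            try:
                child_pid = loop.run_until_complete(child_job())
            finally:
                loop.close()
            os.write(write_fd, str(child_pid).encode())
            status = 0
        finally:
            os._exit(status)                  # never return into the parent's loop
    # Parent: keep using the original loop; read the child's report off-loop
    # so that the blocking os.read() does not stall other tasks.
    os.close(write_fd)
    loop = asyncio.get_running_loop()
    report = await loop.run_in_executor(None, os.read, read_fd, 64)
    os.close(read_fd)
    await loop.run_in_executor(None, os.waitpid, pid, 0)
    return pid, int(report.decode())

if __name__ == "__main__":
    print(asyncio.run(fork_with_fresh_loop()))
```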
