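We all know that asyncio can significantly improve the performance of a socket server, and things obviously get even better if we can use all the cores in the CPU (perhaps via the multiprocessing module or os.fork(), etc.).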
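I'm now trying to build a multi-core socket server demo, with one asyncio socket server listening on each core, all bound to the same port. The idea is simply to create one asyncio server, then call os.fork() and let the processes compete for incoming work.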
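However, as soon as I fork, code that works fine on a single core runs into trouble. There seems to be a problem with registering the same file descriptor from different processes in the epoll selector.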
I've included some code below. Can anyone help me?
Here is a simple, logically clear echo server using asyncio:
import os
import asyncio  # , uvloop
from socket import *

# handler sends the incoming message straight back
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a request is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()
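It works fine until I try to fork after the socket is bound and before the server starts serving. (The same logic works fine in synchronous, thread-based code.)
When I try this: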
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:  # fork as many processes as
        break     # I have CPU cores
loop.create_task(serving(loop, sock))
loop.run_forever()
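In theory, shouldn't the forked processes all be bound to the same socket and run in the same event loop, and therefore just work?
But I get these error messages: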
Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists
Python version: 3.7.3
I'm completely confused about what is going on.
Can anyone help? Thanks.
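The FileExistsError can be reproduced without asyncio. Below is a minimal, Linux-only sketch that assumes, consistent with the traceback though not proven by it, that the cause is the epoll object created before os.fork(). Parent and child then share one kernel epoll instance. Once one process registers the listening fd, the other process's selector still has no entry for it in its own Python-side bookkeeping (the KeyError), so it tries to register the fd again and the kernel answers EEXIST. The function name shared_epoll_register_conflict is hypothetical. select.epoll and socket.socketpair are standard-library calls.

```python
import os
import select
import socket


def shared_epoll_register_conflict() -> bool:
    """Return True if the parent gets EEXIST when registering an fd that a
    forked child already registered in an epoll object created before fork()."""
    ep = select.epoll()  # created BEFORE fork, like the loop's selector in the question
    sock, peer = socket.socketpair()
    try:
        pid = os.fork()
        if pid == 0:
            # Child: register the fd in the kernel epoll instance it shares with the parent.
            try:
                ep.register(sock.fileno(), select.EPOLLIN)
            finally:
                os._exit(0)  # never fall back into the parent's code path
        os.waitpid(pid, 0)  # make sure the child's registration has happened
        try:
            # Parent: same fd number, same open socket, same kernel epoll instance.
            ep.register(sock.fileno(), select.EPOLLIN)
        except FileExistsError:
            return True
        return False
    finally:
        ep.close()
        sock.close()
        peer.close()


if __name__ == "__main__":
    print(shared_epoll_register_conflict())
```

In the question's second snippet the loop, and therefore its epoll selector, is created by asyncio.get_event_loop() before the fork. Each process then calls add_reader() on the same inherited listening fd, which appears to be the situation this sketch imitates.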
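According to the tracker issue, forking an existing asyncio event loop and trying to use it from multiple processes is not supported. However, according to Yury's comment on the same issue, multiprocessing can be achieved by forking before starting the loop, so that each child process runs a completely independent asyncio loop.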
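Your code actually allows for this: although create_server is an async def, it doesn't await anything and doesn't use the loop argument. So we can implement Yury's approach by making create_server a regular function, removing the loop argument, calling it before os.fork(), and running the event loop only after forking: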
import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server (a regular function: no event loop is needed yet)
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a request is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()  # bind and listen BEFORE forking
for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:  # fork as many processes as
        break     # I have CPU cores

# each process creates its own event loop only after the fork
loop = asyncio.new_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
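The same fork-before-the-loop ordering also works with asyncio's higher-level streams API. The sketch below is an illustration, not the answerer's code. It assumes Python 3.8+ (for the := operator), and the helper names make_listening_socket, spawn_workers and stop_workers are hypothetical. The listening socket is created before fork(). Each child then hands it to asyncio.start_server(sock=...) inside its own asyncio.run() call, so no selector is ever shared between processes.

```python
import asyncio
import os
import signal
import socket


async def echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Send every chunk back until the client closes its side.
    while data := await reader.read(64):
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()


def make_listening_socket(host: str = "127.0.0.1", port: int = 25000) -> socket.socket:
    # Bind and listen BEFORE fork(); no event loop has been created yet.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    sock.setblocking(False)
    return sock


async def serve(sock: socket.socket) -> None:
    # Wrap the inherited listening socket in this process's own asyncio server.
    server = await asyncio.start_server(echo, sock=sock)
    async with server:
        await server.serve_forever()


def spawn_workers(sock: socket.socket, count: int) -> list:
    # Fork `count` children; each one starts a brand-new loop via asyncio.run().
    pids = []
    for _ in range(count):
        pid = os.fork()
        if pid == 0:
            try:
                asyncio.run(serve(sock))
            finally:
                os._exit(0)  # never fall back into the parent's code path
        pids.append(pid)
    return pids


def stop_workers(pids: list) -> None:
    # Terminate and reap every worker process.
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
        os.waitpid(pid, 0)
```

For the question's setup, one would call spawn_workers(make_listening_socket(), os.cpu_count() or 1) and then wait on the returned pids. Which worker handles a given connection depends on whichever process's accept() call succeeds first.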
I just did a quick test, and this works the way I expected:
import asyncio
import os
import socket
from pathlib import Path

async def other():
    while True:
        await asyncio.sleep(1)
        print("other {}".format(os.getuid()))

async def f():
    pid = os.fork()
    if pid == 0:
        # child: create a new event loop rather than reusing the parent's
        loop = asyncio.new_event_loop()
        os.setuid(1000)  # switch to uid 1000; this needs root, hence sudo
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        Path("/tmp/sock.whatever").unlink(missing_ok=True)  # missing_ok needs Python 3.8+
        server.bind("/tmp/sock.whatever")
        server.listen(1)
        while True:
            loop.run_until_complete(asyncio.sleep(1))
            print("child {} {}".format(os.getuid(), pid))
            connection, client_address = server.accept()  # blocking accept
            with connection:
                data = connection.recv(1024)
            print('{} {} Received data: {}'.format(os.getuid(), pid, data.decode()))
    else:
        # parent: keeps running on the original loop
        while True:
            print("parent {} {}".format(os.getuid(), pid))
            if Path("/tmp/sock.whatever").exists():
                with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
                    try:
                        client.connect("/tmp/sock.whatever")
                        message = 'foobar works {} {}'.format(os.getuid(), pid)
                        client.sendall(message.encode())
                    except OSError:
                        pass  # the child may not be listening yet
            await asyncio.sleep(1)

async def main():
    # asyncio.TaskGroup requires Python 3.11+
    async with asyncio.TaskGroup() as tg:
        tg.create_task(other())
        tg.create_task(f())

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
Save this as a script named fork.py and run it with sudo:
sudo python fork.py
parent 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
other 0
child 1000 0
parent 0 591480
1000 0 Received data: foobar works 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
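As far as I can tell (based on what I've been able to gather on this topic), you don't want to override the parent's loop. Instead, the child has to create a new loop and run its own event loop.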
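That rule can be shown in a small sketch. It follows the earlier answer's ordering, forking before any loop exists, rather than forking from inside a running coroutine as fork.py does. The names fork_with_own_loop, child_job and parent_job are hypothetical. Each process starts its own loop with asyncio.run(), and a pipe is used only to show that both independent loops make progress.

```python
import asyncio
import os


def fork_with_own_loop(make_coro) -> int:
    """Fork; the child runs make_coro() in a fresh event loop and then exits.

    Returns the child's pid to the parent. Call this before the parent has
    started its own loop, so the child never inherits a live loop.
    """
    pid = os.fork()
    if pid == 0:
        status = 1
        try:
            asyncio.run(make_coro())  # brand-new loop owned by the child
            status = 0
        finally:
            os._exit(status)  # never return into the parent's code
    return pid


async def child_job(write_fd: int) -> None:
    # Child side: do some async work, then report its pid through the pipe.
    await asyncio.sleep(0.1)
    os.write(write_fd, str(os.getpid()).encode())
    os.close(write_fd)


async def parent_job(read_fd: int) -> bytes:
    # Parent side: its own, separate loop waits for the child's message
    # by running the blocking os.read() in the default thread pool.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, os.read, read_fd, 64)
```

Usage: create the pipe, fork, and only then start the parent's loop, for example read_fd, write_fd = os.pipe(), then fork_with_own_loop(lambda: child_job(write_fd)), os.close(write_fd), and finally asyncio.run(parent_job(read_fd)).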