python 多进程异步函数可以吗?

问题描述 投票:0回答:1

我正在尝试同时对 2 个函数进行多进程处理。这些功能之一是一个 websocket 连接,它不断地向 django 站点发送消息。另一个只是一个无限循环。我的第二个函数没有输出。

这是我的脚本

import asyncio
import websockets
import json

from multiprocessing import Process
from multiprocessing import freeze_support

async def SendSMS():
    async with websockets.connect('ws://localhost:8000/ws/endpoint/chat/', ping_interval=None) as websocket:
        # await websocket.send(json.dumps({'message': f'red', 'token': 'hello'}))
        i = 0
        while True:
            i+=1
            if i%2==0:
                await websocket.send(json.dumps({'message': f'red', 'token': 'hello'}))
                print(str(i)+"   were saying red")
            else:
                await websocket.send(json.dumps({'message': f'green', 'token': 'hello'}))
                print(str(i)+"   were saying green")
            await asyncio.sleep(1)

import time
def function2():#
    while True:
        print("popopopopopopopopop")
        time.sleep(1)
        
if __name__ == '__main__':
    freeze_support()
    loop = asyncio.get_event_loop()
    p1 = Process(target=loop.run_until_complete(SendSMS()))
    p2 = Process(target=function2)
    p2.start()
    p1.start()
    p2.join()
    p1.join()

它几乎就像 function2 永远不会启动

这里是一些输出

 C:\Users\tgmjack\Desktop\New folder (57)\mi_external_websocket\external.py:47: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
1   were saying green
2   were saying red
3   were saying green
4   were saying red
5   were saying green
6   were saying red

python 多进程异步函数可以吗?

为什么我的第二个函数没有输出?

#######更新

显然问题是像我正在做的那样将我的函数传递给进程,称之为 imediatley 并且我们永远不会通过那条线。

我最初这样调用我的异步函数(在多处理之前)

asyncio.get_event_loop().run_until_complete(SendSMS())

如何设置异步的多进程进程?

我尝试了以下选项

尝试 1)

p1 = Process(target=lambda: loop.run_until_complete(SendSMS()))

我说错了

ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x0000019AEEEF01F0>: attribute lookup <lambda> on __main__ failed

 line 102, in spawn_main
    source_process = _winapi.OpenProcess(
OSError: [WinError 87] The parameter is incorrec

尝试 2)

p1 = Process(target = SendSMS) 

我得到...

    popopopopopopopopop
C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\multiprocessing\process.py:108: RuntimeWarning: coroutine 'SendSMS' was never awaited
  self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
popopopopopopopopop
popopopopopopopopop
popopopopopopopopop

尝试 3)

p1 = Process(target = asyncio.get_event_loop().run_until_complete(SendSMS)) 

我得到...

C:\Users\tgmjack\Desktop\New folder (57)\mi_external_websocket\external.py:44: DeprecationWarning: There is no current event loop
  p1 = Process(target = asyncio.get_event_loop().run_until_complete(SendSMS))
Traceback (most recent call last):
  File "C:\Users\tgmjack\Desktop\New folder (57)\mi_external_websocket\external.py", line 44, in <module>
    p1 = Process(target = asyncio.get_event_loop().run_until_complete(SendSMS))
  File "C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 625, in run_until_complete
    future = tasks.ensure_future(future, loop=self)
  File "C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\asyncio\tasks.py", line 615, in ensure_future
    return _ensure_future(coro_or_future, loop=loop)
  File "C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\asyncio\tasks.py", line 630, in _ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable '
TypeError: An asyncio.Future, a coroutine or an awaitable is required
python asynchronous websocket multiprocessing
1个回答
1
投票

您不应该将事件循环或异步函数传递给

multiprocessing.Process
,因为它们不是“简单数据类型”(可序列化),它们与操作系统特定的句柄相关联,并且不能在进程之间简单地传递。

相反,您需要子进程执行一个函数,该函数将在子进程中创建事件循环并让它执行您想要的函数,使用

asyncio.run
,这个函数应该在全局范围内定义,以便它可以被子进程。

import asyncio

from multiprocessing import Process
from multiprocessing import freeze_support


async def SendSMS():
    while True:
        print("SendSMS")
        await asyncio.sleep(1)


def send_SMS_caller():
    asyncio.run(SendSMS())


import time


def function2():  #
    while True:
        print("function2", flush=True)
        time.sleep(1)


if __name__ == '__main__':
    freeze_support()
    p1 = Process(target=send_SMS_caller)
    p2 = Process(target=function2)
    p2.start()
    p1.start()
    p2.join()
    p1.join()
© www.soinside.com 2019 - 2024. All rights reserved.