我正在尝试同时对 2 个函数进行多进程处理。这些功能之一是一个 websocket 连接,它不断地向 django 站点发送消息。另一个只是一个无限循环。我的第二个函数没有输出。
这是我的脚本
import asyncio
import websockets
import json
from multiprocessing import Process
from multiprocessing import freeze_support
async def SendSMS():
async with websockets.connect('ws://localhost:8000/ws/endpoint/chat/', ping_interval=None) as websocket:
# await websocket.send(json.dumps({'message': f'red', 'token': 'hello'}))
i = 0
while True:
i+=1
if i%2==0:
await websocket.send(json.dumps({'message': f'red', 'token': 'hello'}))
print(str(i)+" were saying red")
else:
await websocket.send(json.dumps({'message': f'green', 'token': 'hello'}))
print(str(i)+" were saying green")
await asyncio.sleep(1)
import time
def function2():#
while True:
print("popopopopopopopopop")
time.sleep(1)
if __name__ == '__main__':
freeze_support()
loop = asyncio.get_event_loop()
p1 = Process(target=loop.run_until_complete(SendSMS()))
p2 = Process(target=function2)
p2.start()
p1.start()
p2.join()
p1.join()
它几乎就像 function2 永远不会启动
这里是一些输出
C:\Users\tgmjack\Desktop\New folder (57)\mi_external_websocket\external.py:47: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
1 were saying green
2 were saying red
3 were saying green
4 were saying red
5 were saying green
6 were saying red
python 多进程异步函数可以吗?
为什么我的第二个函数没有输出?
#######更新
显然问题是像我正在做的那样将我的函数传递给进程,称之为 imediatley 并且我们永远不会通过那条线。
我最初这样调用我的异步函数(在多处理之前)
asyncio.get_event_loop().run_until_complete(SendSMS())
如何设置异步的多进程进程?
我尝试了以下选项
尝试 1)
p1 = Process(target=lambda: loop.run_until_complete(SendSMS()))
我说错了
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x0000019AEEEF01F0>: attribute lookup <lambda> on __main__ failed
和
line 102, in spawn_main
source_process = _winapi.OpenProcess(
OSError: [WinError 87] The parameter is incorrec
尝试 2)
p1 = Process(target = SendSMS)
我得到...
popopopopopopopopop
C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\multiprocessing\process.py:108: RuntimeWarning: coroutine 'SendSMS' was never awaited
self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
popopopopopopopopop
popopopopopopopopop
popopopopopopopopop
尝试 3)
p1 = Process(target = asyncio.get_event_loop().run_until_complete(SendSMS))
我得到...
C:\Users\tgmjack\Desktop\New folder (57)\mi_external_websocket\external.py:44: DeprecationWarning: There is no current event loop
p1 = Process(target = asyncio.get_event_loop().run_until_complete(SendSMS))
Traceback (most recent call last):
File "C:\Users\tgmjack\Desktop\New folder (57)\mi_external_websocket\external.py", line 44, in <module>
p1 = Process(target = asyncio.get_event_loop().run_until_complete(SendSMS))
File "C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 625, in run_until_complete
future = tasks.ensure_future(future, loop=self)
File "C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\asyncio\tasks.py", line 615, in ensure_future
return _ensure_future(coro_or_future, loop=loop)
File "C:\Users\tgmjack\AppData\Local\Programs\Python\Python310\lib\asyncio\tasks.py", line 630, in _ensure_future
raise TypeError('An asyncio.Future, a coroutine or an awaitable '
TypeError: An asyncio.Future, a coroutine or an awaitable is required
您不应该将事件循环或异步函数传递给
multiprocessing.Process
,因为它们不是“简单数据类型”(可序列化),它们与操作系统特定的句柄相关联,并且不能在进程之间简单地传递。
相反,您需要子进程执行一个函数,该函数将在子进程中创建事件循环并让它执行您想要的函数,使用
asyncio.run
,这个函数应该在全局范围内定义,以便它可以被子进程。
import asyncio
from multiprocessing import Process
from multiprocessing import freeze_support
async def SendSMS():
while True:
print("SendSMS")
await asyncio.sleep(1)
def send_SMS_caller():
asyncio.run(SendSMS())
import time
def function2(): #
while True:
print("function2", flush=True)
time.sleep(1)
if __name__ == '__main__':
freeze_support()
p1 = Process(target=send_SMS_caller)
p2 = Process(target=function2)
p2.start()
p1.start()
p2.join()
p1.join()