我有一个“服务器”线程,它应该监听来自其他服务器线程的调用/事件,同时执行一些其他代码。最近我经常使用 Node.js,所以我认为使用 async/await 创建一个事件循环会很好,我可以在其中等待其他线程加入事件循环并在它们最终加入时处理它们的响应。
为了测试这个想法,我用 Python 3.5 编写了以下测试脚本:
# http://stackabuse.com/python-async-await-tutorial/
# Testing out Python's asynchronous features
import asyncio
from time import sleep
import threading
from threading import Thread
import random
class MyThread(Thread):
def __init__(self, message):
Thread.__init__(self)
self._message = message
def run(self):
self._return = self._message + " oli viesti"
a = random.randint(1, 5)
print("Sleep for ", a)
sleep(a)
print("Thread exiting...")
def join(self):
Thread.join(self)
return self._return
async def send(message):
t = MyThread(message) # daemon = True
t.start()
print("asd")
return t.join()
async def sendmsg(msg):
response = await send(msg)
print("response is ", response)
if __name__ == "__main__":
# Initiate a new thread and pass in keyword argument dictionary as parameters
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(sendmsg("hippa1"), loop=loop),
asyncio.ensure_future(sendmsg("hippa2"), loop=loop),
asyncio.ensure_future(sendmsg("hippa3"), loop=loop)
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
在示例中,我想启动三个具有不同字符串的工作线程并等待它们完成。工作人员睡眠的时间是随机的,因此我希望他们在脚本多次运行时以随机的顺序完成。事实证明,它们似乎是按顺序执行的,第二个线程在第一个线程之后启动。
我的错误是什么? sleep 不应该只阻塞它所在的线程吗?我的事件循环设置正确吗?我可以异步/等待连接吗?
最终我想向其他线程发送消息并等待它们的响应,然后运行具有返回值的回调函数。
编辑:为了澄清,最终我想在主线程中等待带有 async/await 的条件变量并运行其他代码,直到某些条件变量允许执行。在此示例代码中,我尝试对工作线程的连接执行相同的操作。
最终,由于这段代码,它是按顺序运行的:
async def send(message):
t = MyThread(message) # daemon = True
t.start()
print("asd")
return t.join()
您启动一个线程,然后立即等待该线程完成,然后再继续。这就是为什么它们按顺序执行。
Node.js 和 asyncio 不一定会创建新线程来执行其操作。例如,Node.js 仅使用单个线程,但它使用内核级函数(例如“epoll”)来调用您在发生某些新网络活动时指示的回调。这允许单个线程管理数百个网络连接。
这就是为什么当您在没有 Thread 实例的情况下执行此操作时,您会在当前运行的线程(与主线程相同)上调用 sleep 。当您将 asyncio 与网络功能结合使用时,您可以使用“yield from”结构,这允许在其他任务与其他远程服务一起执行操作时执行其他代码块。
主体结构正确。你想要这段代码:
loop.run_until_complete(asyncio.wait(tasks))
但是不要依赖‘sleep’来测试功能,你需要进行网络调用,或者使用:
yield from asyncio.sleep(1)
在这种情况下不需要启动单独的线程。
你已经知道原因了:
Thread.join()
阻塞了事件循环。所以我会直接讨论解决方案。
如果你真的需要异步
Thread.join()
,那你就不走运了。因为这种情况下你需要启动一个单独的线程来等待:
await asyncio.to_thread(t.join)
或
await asyncio.get_running_loop().run_in_executor(executor, t.join)
因此,由于
threading
模块中线程的实现方式。当然它也有缺点:您无法取消已启动的线程。您将不得不忘记异步超时,因为每次尝试执行 Thread.join()
都会启动另一个线程。你实际上会遇到线程泄漏!
替代解决方案和组合解决方案都是轮询线程是否处于活动状态。这样你就会浪费CPU资源。如果你启动一个额外的线程来等待,那么返回会很快。但如果不是,返回的时间将与您设置的超时时间一样长。
如果您必须等待来自另一个线程的事件,那么您很幸运。这个问题已经在 StackOverflow 上被问过。只需使用那里的任何解决方案并在线程中调用 set()
方法即可。
aiologic.CountdownEvent
(我是
aiologic的创造者)。
import time
import asyncio
from threading import Thread
from aiologic import CountdownEvent
def work(event):
try:
time.sleep(1)
finally:
event.down() # -1 thread to wait
async def main():
event = CountdownEvent()
for _ in range(4):
event.up() # +1 thread to wait
Thread(target=work, args=[event]).start()
print("before")
await event # wait for all threads
print("after")
asyncio.run(main())
这是一个通用同步原语,您可以使用它等待您想要的内容。
up()
和
down()
方法分别增加和减少倒计时事件值。当它为零时,所有等待任务都被唤醒。
aiologic
中的同步原语,例如
aiologic.Lock
、aiologic.Semaphore
或 aiologic.Condition
。他们只是工作。