我需要线程安全原语(锁定、条件信号量),它们存在于 asyncio 生态系统中吗?
我自己创建了一些代码,但感觉有点迟缓:
import asyncio
from threading import get_ident,Lock,Event
class AsyncThreadsafeEvent():
def __init__(self,*args,**kwargs):
super(AsyncThreadsafeEvent, self).__init__()
self.waiters = {}
self.event = Event()
def set(self):
self.event.set()
events = self.waiters.values()
for loop,event in events:
loop.call_soon_threadsafe(event.set)
async def wait(self):
event = asyncio.Event()
#not strictly necessary but could provide tiny speedup
if self.event.is_set():
return
self.waiters[get_ident()] = (asyncio.get_event_loop(),event)
#to ensure thread safty
if self.event.is_set():
return
await event.wait()
如有任何帮助,我们将不胜感激!
它们确实存在。您可以使用
aiologic.Lock
、aiologic.Semaphore
、aiologic.Condition
等(我是 aiologic 的创建者)。 aiologic
提供的所有原语都是完全线程安全的。当然,它们都是异步感知和线程感知的:它们可以在异步和同步环境中使用。
import asyncio
from aiologic import Condition
cond = Condition()
def work():
with cond:
print("thread")
cond.notify()
cond.wait()
print("thread")
async def main():
async with asyncio.TaskGroup() as tasks, cond:
tasks.create_task(asyncio.to_thread(work))
print("asyncio")
await cond
cond.notify()
print("asyncio")
asyncio.run(main())
但是,由于它们的双重性质,它们在接口和语义上与您可以在 threading 模块或 asyncio 模块中找到的原语存在一些差异。因此,请随意在 GitHub 上创建新的讨论。我很乐意回答您的问题。