上述法规偶尔会导致状态不一致甚至悬挂。 我的问题是:
在异步任务和同步线程之间共享可变状态的最佳实践是什么? 我是否应该仅依靠线锁,还是有助于确保线程安全的异步特定构造(或其他设计模式)? 将所有状态突变局限于Asyncio线程并通过消息队列或类似模式与其他线程进行通信? 任何解释强大解决方案的见解或代码示例都将不胜感激。
使用Asyncio任务时唯一可以使您独立状态的事情是Python的“上下文variables”和Contexts。 Asyncio已经演变为以引擎盖下的无缝方式使用这些上下文,并且由于Python 3.10(或3.11我不记得),甚至允许大多数Asyncio任务入口点将明确的上下文作为参数。问题是,当您引入线程时,异步的“自动状态分离”事物与众不同:尽管每个胎面都会保留单独的上下文(因此,其
ContextVar
S的单独值),默认情况下,所有上下文vars都重置为新线程中的默认值。相比之下,创建新任务将接收当前运行上下文的副本。
I will see if I can set an example for your simplified code, if I can make sense of it - and them point you to use myextracontext
set
ContextPreservingExecutor
ThreadPoolExecutor
in asyncio's
loop.run_in_executor
call, which will share a context copy in a transparent way with sync function called in the thread.(由于您的示例不包含来自同步代码的线程不同步调用,因此我无法在您的代码中调整它的示例。)。 Extracontext Repo在这里https://github.com/jsbueno/jsbueno/extracontext(用于适当生成文档的HELP),否则,该包装很好,以“ Python-Extracontext”发表,pypytextext上的“ Python-Extracontext”)
回到您的示例代码:您的简化示例也许是简化的,其中要共享的状态是在全局字典中 - 这是“免费”的“免费”。
lock
使用的一个大问题是:在异步上下文中,thrreading锁定是在阻塞。 如果您确实在实际代码中使用它,最终将阻止事件循环。 (我有一个“变速箱”异步任务的食谱,该任务将与螺纹锁的somerwher同步,如果您最终需要它,请与syncio.lock保持同步,稍后让我知道)。由于您在这里只运行一个任务,因此我很难想象要保持哪些数据。
我在您的方法中看到的是尝试从另一个线程启动事件环:问题是,您无法控制事件循环的生命周期:在最好的情况下,您将无法找到停止此应用程序的好方法。我更喜欢在辅助线程本身中较小函数中创建环路本身的方法。然后,它可以设置一个较小的任务,该任务可以接收要执行的联合行驶,您可以拥有一个以干净的方式停止循环本身的哨兵值 - 此示例中的代码将适用于此。 如上所述:对于数据共享的数据:螺纹锁将起作用:但是,如果在同步线程中使用时,这只会使Python切换到其他线程,当在异步线程中使用时,它将the loop the loop
block,并且该线程中的所有同步任务都将在asyncio loop.incio loop.incio loop.inds中。
如果您需要不同的数据在不同的任务中似乎是在偏离异步循环中的不同任务中,正如我上面说的那样,您的示例代码不涵盖它(然后,Extracontext软件包和ContextVars应该将您带到那里)除此之外,此示例可以显示出一种干净的方式来运行外线事件循环:
import asyncio
import threading
import time
loop_registry = {}
shared_state = {}
lock = loop = t = None
STOP_LOOP = object()
async def async_worker():
global stop_loop
print("starting async worker")
while True:
await asyncio.sleep(0.1)
# Update shared state
with lock:
# I opted to show two inccrements with a pause to show the sync thread won't run with the lock on:
value = shared_state['value'] = shared_state.get('value', 0) + 1
print("Async:", shared_state['value'])
await asyncio.sleep(0.3)
value = shared_state['value'] = shared_state.get('value', 0) + 1
print("Async:", shared_state['value'])
async def loop_manager(queue):
tasks = set()
while True:
print("awaiting new coro")
new_coro = await queue.get()
print("got coro: ", new_coro)
if new_coro is STOP_LOOP:
for task in tasks:
task.cancel()
return
tasks.add(asyncio.create_task(new_coro))
await asyncio.sleep(0.5)
async def dispatch_coro(queue, coro):
queue.put_nowait(coro)
def thread_loop_driver(queue):
loop = asyncio.new_event_loop()
loop_registry[threading.current_thread()] = loop
loop.run_until_complete(loop_manager(queue))
loop.close()
# Meanwhile, update shared state from the main thread
def update_from_main():
for _ in range(5):
with lock:
shared_state['value'] = shared_state.get('value', 0) + 10
print("Main thread update:", shared_state['value'])
time.sleep(0.2)
def main():
global lock, loop, t
lock = threading.Lock()
queue = asyncio.Queue() # FIXME: in "real world" code, there should be one queue for each thread/asyncio loop: just stuff them in the "loop registry" as well
time.sleep(0.01) # give time to thread worker to setup the loop
print(loop_registry)
loop = loop_registry[t]
# Schedule async_worker in the separate event loop
asyncio.run_coroutine_threadsafe(dispatch_coro(queue, async_worker()), loop)
update_from_main()
asyncio.run_coroutine_threadsafe(dispatch_coro(queue, STOP_LOOP), loop)
main()