我想为异步函数创建一个上下文管理器,每次执行“移动”到另一个上下文时都会调用一个函数
例如
import os
import asyncio
class AsyncContextChangeDir:
def __init__(self, newdir):
self.curdir = os.getcwd()
self.newdir = newdir
async def __aenter__(self):
os.chdir(self.newdir)
async def __aexit__(self, exc_type, exc_value, traceback):
os.chdir(self.curdir)
async def workon_mypath():
async with AsyncContextChangeDir("/tmp"):
print("working in /tmp context manager, cwd:" + os.getcwd()) # /mypath
await asyncio.sleep(100)
print("working in /tmp context manager, cwd:" + os.getcwd()) # ???
async def workon_someotherpath():
await asyncio.sleep(10)
os.chdir("/home")
print("working in other context cwd:" + os.getcwd())
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
workon_mypath(),
workon_someotherpath()))
我希望第二次打印打印
/mypath
,并且显然每次执行“切换”到另一个上下文时都恢复以前的工作目录
最好的方法是什么?
与您从名称中所期望的相反,上下文管理器作为一个概念与上下文切换没有任何关系。
常规上下文管理器和异步上下文管理器都不会被告知事件循环“上下文切换”。上下文管理器无法检测到事件循环将开始运行另一个协程,并且上下文管理器无法在发生这种情况时执行代码。
我想我找到了一种检测上下文切换的方法,但正如任何人都会说的那样,这是一种肮脏的黑客行为,强烈建议不要在生产中使用它。另一个问题是我可能还不知道如何获取任务当前正在执行的特定函数。此过程涉及修改
BaseEventLoop._run_once
方法,如本例所示。
输出:
================= Context switching happen for main() in asyncio_testing_2.py, line 9 =================
start main
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 35 =================
start multiple_calls 1
start example_coroutine 1
start example_coroutine_extended 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 35 =================
start multiple_calls 2
start example_coroutine 2
start example_coroutine_extended 2
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 37 =================
end example_coroutine_extended 1
start example_coroutine_extended 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 37 =================
end example_coroutine_extended 2
start example_coroutine_extended 2
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 37 =================
end example_coroutine_extended 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 37 =================
end example_coroutine_extended 2
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 37 =================
end example_coroutine 1
start example_coroutine 1
start example_coroutine_extended 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 37 =================
end example_coroutine 2
start example_coroutine 2
start example_coroutine_extended 2
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 38 =================
end example_coroutine_extended 1
start example_coroutine_extended 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 38 =================
end example_coroutine_extended 2
start example_coroutine_extended 2
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 38 =================
end example_coroutine_extended 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 38 =================
end example_coroutine_extended 2
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 38 =================
end example_coroutine 1
start blocking_coroutine 1
end blocking_coroutine 1
end multiple_calls 1
================= Context switching happen for multiple_calls() in asyncio_testing_2.py, line 38 =================
end example_coroutine 2
start blocking_coroutine 2
end blocking_coroutine 2
end multiple_calls 2
================= Context switching happen for main() in asyncio_testing_2.py, line 13 =================
end main
Process finished with exit code 0
代码:
import asyncio
import heapq
import time
# noinspection PyUnresolvedReferences
from asyncio.base_events import MAXIMUM_SELECT_TIMEOUT, _MIN_SCHEDULED_TIMER_HANDLES, _MIN_CANCELLED_TIMER_HANDLES_FRACTION
from types import MethodType
async def main():
print("start main")
task1 = asyncio.create_task(multiple_calls(1))
task2 = asyncio.create_task(multiple_calls(2))
await asyncio.gather(task1, task2)
print("end main")
async def example_coroutine_extended(_id):
print(f"start example_coroutine_extended {_id}")
await asyncio.sleep(1)
print(f"end example_coroutine_extended {_id}")
async def example_coroutine(_id):
print(f"start example_coroutine {_id}")
await example_coroutine_extended(_id)
await example_coroutine_extended(_id)
await asyncio.sleep(1)
print(f"end example_coroutine {_id}")
async def blocking_coroutine(_id):
print(f"start blocking_coroutine {_id}")
time.sleep(1)
print(f"end blocking_coroutine {_id}")
async def multiple_calls(_id):
print(f"start multiple_calls {_id}")
await example_coroutine(_id)
await example_coroutine(_id)
await blocking_coroutine(_id)
print(f"end multiple_calls {_id}")
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
if timeout > MAXIMUM_SELECT_TIMEOUT:
timeout = MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
timeout = 0
event_list = self._selector.select(timeout)
self._process_events(event_list)
# Needed to break cycles when an exception occurs.
event_list = None
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
#################### MODIFIED ####################
task = None
if type(handle._callback).__name__!='function':
task = handle._callback.__self__
if isinstance(task, asyncio.Task):
on_context_switch_event_handler(handle, task)
handle._run()
##################################################
handle = None # Needed to break cycles when an exception occurs.
def on_context_switch_event_handler(handle, task):
task_code_info = task.get_coro().cr_code
task_frame_info = task.get_coro().cr_frame
func_name = get_coro_name(task.get_coro())
line_pos = task_frame_info.f_lineno
file_name = task_code_info.co_filename.replace("\\", "/").split("/")[-1]
print(f"================= Context switching happen for {func_name} in {file_name}, line {line_pos} ================= ")
def get_coro_name(coro):
# Coroutines compiled with Cython sometimes don't have
# proper __qualname__ or __name__. While that is a bug
# in Cython, asyncio shouldn't crash with an AttributeError
# in its __repr__ functions.
if hasattr(coro, '__qualname__') and coro.__qualname__:
coro_name = coro.__qualname__
elif hasattr(coro, '__name__') and coro.__name__:
coro_name = coro.__name__
else:
# Stop masking Cython bugs, expose them in a friendly way.
coro_name = f'<{type(coro).__name__} without __name__>'
return f'{coro_name}()'
loop = asyncio.get_event_loop()
loop._run_once = MethodType(_run_once, loop)
loop.run_until_complete(main())