Python:同时在异步协程和同步函数上挂起

问题描述 投票:0回答:1

我想在执行同步功能期间建立SSH SOCKs隧道(使用asyncssh)。功能完成后,我想拆除隧道并退出。

显然,必须使异步功能保持正常运行,以便保持隧道正常工作,因此重要的是conn.wait_closed()和同步功能是同时执行的。所以我很确定我确实需要第二个线程。我首先使用ThreadPoolExecutorrun_in_executor尝试了一些更聪明的方法,但最终遇到了下面的糟糕透顶的变体。#! /usr/bin/env python3 import traceback from threading import Thread from concurrent.futures import ThreadPoolExecutor import asyncio, asyncssh, sys _server="127.0.0.1" _port=22 _proxy_port=8080 async def run_client(): conn = await asyncio.wait_for( asyncssh.connect( _server, port=_port, options=asyncssh.SSHClientConnectionOptions(client_host_keysign=True), ), 10, ) listener = await conn.forward_socks('127.0.0.1', _proxy_port) return conn async def do_stuff(func): try: conn = await run_client() print("SSH tunnel active") def start_loop(loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f"worker loop: {e}") async def thread_func(): ret=await func() print("Func done - tearing done worker thread and SSH connection") conn.close() # asyncio.get_event_loop().stop() return ret func_loop = asyncio.new_event_loop() func_thread = Thread(target=start_loop, args=(func_loop,)) func_thread.start() print("thread started") fut = asyncio.run_coroutine_threadsafe(thread_func(), func_loop) print(f"fut scheduled: {fut}") done = await asyncio.gather(asyncio.wrap_future(fut), conn.wait_closed()) print("wait done") for ret in done: print(f"ret={ret}") # Canceling pending tasks and stopping the loop # asyncio.gather(*asyncio.Task.all_tasks()).cancel() print("stopping func_loop") func_loop.call_soon_threadsafe(func_loop.stop()) print("joining func_thread") func_thread.join() print("joined func_thread") except (OSError, asyncssh.Error) as exc: sys.exit('SSH connection failed: ' + str(exc)) except (Exception) as exc: sys.exit('Unhandled exception: ' + str(exc)) traceback.print_exc() async def just_wait(): print("starting just_wait") input() print("ending just_wait") return 42 asyncio.get_event_loop().run_until_complete(do_stuff(just_wait))

它实际上“正确”地“正常”工作,直到结束join辅助线程时出现异常。我猜想是因为我做的事情不是线程安全的。

Exception in callback None() handle: <Handle> Traceback (most recent call last): File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run self._context.run(self._callback, *self._args) TypeError: 'NoneType' object is not callable

要测试代码,您必须具有运行本地SSH服务器,并为您的用户设置密钥文件。您可能需要更改_port变量。

我正在寻找异常的原因和/或程序版本,该版本在线程中需要较少的手动干预,并且可能仅使用单个事件循环。当我想await两件事时(例如在asyncio.gather调用中),我不知道如何实现后者。

我想在执行同步功能期间建立SSH SOCKs隧道(使用asyncssh)。功能完成后,我想拆除隧道并退出。显然有些异步...

python-3.x async-await python-asyncio python-multithreading
1个回答
1
投票
您的错误的直接原因是此行:
© www.soinside.com 2019 - 2024. All rights reserved.