我想在执行同步功能期间建立SSH SOCKs隧道(使用asyncssh
)。功能完成后,我想拆除隧道并退出。
显然,必须使异步功能保持正常运行,以便保持隧道正常工作,因此重要的是conn.wait_closed()
和同步功能是同时执行的。所以我很确定我确实需要第二个线程。我首先使用ThreadPoolExecutor
和run_in_executor
尝试了一些更聪明的方法,但最终遇到了下面的糟糕透顶的变体。#! /usr/bin/env python3
import traceback
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
import asyncio, asyncssh, sys
_server="127.0.0.1"
_port=22
_proxy_port=8080
async def run_client():
conn = await asyncio.wait_for(
asyncssh.connect(
_server,
port=_port,
options=asyncssh.SSHClientConnectionOptions(client_host_keysign=True),
),
10,
)
listener = await conn.forward_socks('127.0.0.1', _proxy_port)
return conn
async def do_stuff(func):
try:
conn = await run_client()
print("SSH tunnel active")
def start_loop(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"worker loop: {e}")
async def thread_func():
ret=await func()
print("Func done - tearing done worker thread and SSH connection")
conn.close()
# asyncio.get_event_loop().stop()
return ret
func_loop = asyncio.new_event_loop()
func_thread = Thread(target=start_loop, args=(func_loop,))
func_thread.start()
print("thread started")
fut = asyncio.run_coroutine_threadsafe(thread_func(), func_loop)
print(f"fut scheduled: {fut}")
done = await asyncio.gather(asyncio.wrap_future(fut), conn.wait_closed())
print("wait done")
for ret in done:
print(f"ret={ret}")
# Canceling pending tasks and stopping the loop
# asyncio.gather(*asyncio.Task.all_tasks()).cancel()
print("stopping func_loop")
func_loop.call_soon_threadsafe(func_loop.stop())
print("joining func_thread")
func_thread.join()
print("joined func_thread")
except (OSError, asyncssh.Error) as exc:
sys.exit('SSH connection failed: ' + str(exc))
except (Exception) as exc:
sys.exit('Unhandled exception: ' + str(exc))
traceback.print_exc()
async def just_wait():
print("starting just_wait")
input()
print("ending just_wait")
return 42
asyncio.get_event_loop().run_until_complete(do_stuff(just_wait))
它实际上“正确”地“正常”工作,直到结束join
辅助线程时出现异常。我猜想是因为我做的事情不是线程安全的。
Exception in callback None()
handle: <Handle>
Traceback (most recent call last):
File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
TypeError: 'NoneType' object is not callable
要测试代码,您必须具有运行本地SSH服务器,并为您的用户设置密钥文件。您可能需要更改_port
变量。
我正在寻找异常的原因和/或程序版本,该版本在线程中需要较少的手动干预,并且可能仅使用单个事件循环。当我想await
两件事时(例如在asyncio.gather
调用中),我不知道如何实现后者。
我想在执行同步功能期间建立SSH SOCKs隧道(使用asyncssh)。功能完成后,我想拆除隧道并退出。显然有些异步...