我想非常快速地连接到许多不同站点的列表。 我使用 asyncio 以异步方式执行此操作,现在想要 添加超时,如果连接响应时间过长,则应忽略连接。
我该如何实施?
import ssl
import asyncio
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
import time
@asyncio.coroutine
def run():
while True:
host = yield from q.get()
if not host:
break
with suppress(ssl.CertificateError):
reader, writer = yield from asyncio.open_connection(host[1], 443, ssl=True) #timout option?
reader.close()
writer.close()
@asyncio.coroutine
def load_q():
# only 3 entries for debugging reasons
for host in [[1, 'python.org'], [2, 'qq.com'], [3, 'google.com']]:
yield from q.put(host)
for _ in range(NUM):
q.put(None)
if __name__ == "__main__":
NUM = 1000
q = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(NUM))
start = time.time()
coros = [asyncio.async(run()) for i in range(NUM)]
loop.run_until_complete(load_q())
loop.run_until_complete(asyncio.wait(coros))
end = time.time()
print(end-start)
(旁注:有人知道如何优化这个吗?)
您可以将对
open_connection
的调用包装在 asyncio.wait_for
中,这样您就可以指定超时:
with suppress(ssl.CertificateError):
fut = asyncio.open_connection(host[1], 443, ssl=True)
try:
# Wait for 3 seconds, then raise TimeoutError
reader, writer = yield from asyncio.wait_for(fut, timeout=3)
except asyncio.TimeoutError:
print("Timeout, skipping {}".format(host[1]))
continue
请注意,当
TimeoutError
升高时,open_connection
协程也会被取消。如果您不希望它被取消(尽管我认为在这种情况下您确实希望它被取消),您可以将呼叫包装在asyncio.shield
中。
这是我用 Python 3.11 和 3.12 测试过的类似片段。它会发出“保持活动”而不是超时,但您可以删除
while True
来执行相同的操作。
T = TypeVar('T')
U = TypeVar('U')
async def emit_keepalive_chunks(
underlying: AsyncIterator[U],
timeout: float | None,
sentinel: T,
) -> AsyncIterator[U | T]:
# Emit an initial keepalive, in case our async chunks are enormous
yield sentinel
maybe_next: asyncio.Future[U] | None = None
try:
maybe_next = asyncio.ensure_future(underlying.__anext__())
while True:
try:
yield await asyncio.wait_for(asyncio.shield(maybe_next), timeout)
maybe_next = asyncio.ensure_future(underlying.__anext__())
except asyncio.TimeoutError:
yield sentinel
except StopAsyncIteration:
pass
finally:
if maybe_next is not None:
maybe_next.cancel()
技术说明:
asyncio.shield
。AsyncGenerator
?)