如何使用 asyncio 添加连接超时?

问题描述 投票:0回答:2

我想非常快速地连接到许多不同站点的列表。 我使用 asyncio 以异步方式执行此操作,现在想要 添加超时,如果连接响应时间过长,则应忽略连接。

我该如何实施?

import ssl
import asyncio
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
import time


@asyncio.coroutine
def run():
    while True:
        host = yield from q.get()
        if not host:
            break

        with suppress(ssl.CertificateError):
            reader, writer = yield from asyncio.open_connection(host[1], 443, ssl=True) #timout option?
            reader.close()
            writer.close()


@asyncio.coroutine
def load_q():
    # only 3 entries for debugging reasons
    for host in [[1, 'python.org'], [2, 'qq.com'], [3, 'google.com']]:
        yield from q.put(host)
    for _ in range(NUM):
        q.put(None)


if __name__ == "__main__":
    NUM = 1000
    q = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.set_default_executor(ThreadPoolExecutor(NUM))

    start = time.time()
    coros = [asyncio.async(run()) for i in range(NUM)]
    loop.run_until_complete(load_q())
    loop.run_until_complete(asyncio.wait(coros))
    end = time.time()
    print(end-start)

(旁注:有人知道如何优化这个吗?)

python python-3.x asynchronous timeout python-asyncio
2个回答
29
投票

您可以将对

open_connection
的调用包装在
asyncio.wait_for
中,这样您就可以指定超时:

    with suppress(ssl.CertificateError):
        fut = asyncio.open_connection(host[1], 443, ssl=True)
        try:
            # Wait for 3 seconds, then raise TimeoutError
            reader, writer = yield from asyncio.wait_for(fut, timeout=3)
        except asyncio.TimeoutError:
            print("Timeout, skipping {}".format(host[1]))
            continue

请注意,当

TimeoutError
升高时,
open_connection
协程也会被取消。如果您不希望它被取消(尽管我认为在这种情况下您确实希望它被取消),您可以将呼叫包装在
asyncio.shield
中。


0
投票

这是我用 Python 3.11 和 3.12 测试过的类似片段。它会发出“保持活动”而不是超时,但您可以删除

while True
来执行相同的操作。

T = TypeVar('T')
U = TypeVar('U')


async def emit_keepalive_chunks(
        underlying: AsyncIterator[U],
        timeout: float | None,
        sentinel: T,
) -> AsyncIterator[U | T]:
    # Emit an initial keepalive, in case our async chunks are enormous
    yield sentinel

    maybe_next: asyncio.Future[U] | None = None

    try:
        maybe_next = asyncio.ensure_future(underlying.__anext__())
        while True:
            try:
                yield await asyncio.wait_for(asyncio.shield(maybe_next), timeout)
                maybe_next = asyncio.ensure_future(underlying.__anext__())
            except asyncio.TimeoutError:
                yield sentinel

    except StopAsyncIteration:
        pass

    finally:
        if maybe_next is not None:
            maybe_next.cancel()

技术说明:

  • 我认为 Python 3.7(在另一个答案中提到)的改变是添加
    asyncio.shield
  • 我认为输出的类型提示并不严格正确(这是一个
    AsyncGenerator
    ?)
© www.soinside.com 2019 - 2024. All rights reserved.