python-asyncio 相关问题

此标记用于asyncio Python包,它提供了编写单线程并发代码的机制。 asyncio包提供从Python 3.4开始的异步I / O,事件循环,协同程序和任务。

Python 异步等待和通知

我正在尝试做类似 C# ManualResetEvent 的事情,但是是在 Python 中。 我尝试在 python 中执行此操作,但似乎不起作用。 导入异步 cond = asyncio.Condition() 异步 def main(...

回答 2 投票 0

使用asyncio处理多个IO阻塞服务

这不一定是关于 Python 的问题,它可能是关于一般异步编程的问题。简而言之,我有未定义数量的对象,每个对象都执行......

回答 2 投票 0

Python |异步IO |类型错误:需要一个协程

我正在尝试使用 asyncio 进行 python 协程编程。这是我的代码。 导入异步 异步 def coro_function(): 返回 2 + 2 异步 def get(): 返回等待 coro_function() 打印(异步。

回答 2 投票 0

Azure 功能损坏了模块?适用于一个功能应用程序(开发),不适用于另一功能应用程序(客户端)

所以我有一个用于CMS的python v2函数应用程序,它可以与以下包(requirements.txt)一起使用 - azure 存储 blob 皮茨 python-dotenv azure 搜索文档 azure 存储文件共享 aiohttp

回答 1 投票 0

如何使用 asyncio.gather 运行热解图

来自热解图导入客户端,过滤器 导入异步 应用程序=客户端(“Myapi”) 异步 def heart_beat(): 而真实: 等待 asyncio.sleep(30) 等待beat_function() 异步定义

回答 1 投票 0

运行时错误:任务附加到不同的循环

嗨,我正在使用 AsyncIOMotorClient 对 mongoDb 进行异步数据库调用。 下面是我的代码。 xyz.py 异步 def insertMany(self,collection_name,documents_to_insert): 尝试: 集合=self.dat...

回答 5 投票 0

Azure IoT 中心上并发设备连接和遥测的支持请求

azure-iot-device:2.14.0 azure-iot-hub:2.6.1 asyncio:3.4.3 Python 版本:3.10.12 Azure IoTHub 详细信息:层: S1 问题描述: 我使用 Azure IoT S 开发了一个 Python 脚本...

回答 1 投票 0

Interactive Broker 的 API (IB_sync) 跳过来自 Polygon WebSocket Stream 的订单

我只是想根据实时 Polygon.io 数据流中的特定条件自动化盈透证券中的一些交易(请参见下面的示例)。我在

回答 1 投票 0

无法使用 ib_insync 下whatIfOrder

我尝试使用 ib_insync 放置假设订单,但在调用 WhatIfOrder 时收到与 asyncio 事件循环相关的错误: 该事件循环已经在运行。 改为常规订单...

回答 1 投票 0

在 AsyncGenerator/AsyncIterator 中检查 __contains__

在同步Python中,我可以使用in运算符检查迭代器是否包含值: def my_iterator(): 打印('A') 产量 1 打印('B') 产量 2 打印('C') 产量 3 打印...

回答 1 投票 0

不用asyncio编写异步代码

我正在尝试编写相当于繁忙循环的代码 异步 def wait_for_callback(): 而真实: 如果

回答 1 投票 0

Python-binance 和 asyncio

遇到奇怪的问题。我正在尝试从 Binance 获取一些数据(使用 python-binance 包)。我的非常简单的代码: 导入异步 从 binance.client 导入 AsyncClient 异步 def get_data(

回答 1 投票 0

如何检测任务组取消任务

给定任务组和正在运行的任务数量,根据任务组文档,如果任何任务引发错误,则组中的其余任务将被取消。 如果其中一些任务需要执行清理操作...

回答 1 投票 0

FastAPI 和 Pytest。将 Future 附加到不同的循环

问题是这样的。当我尝试对 FastAPI 应用程序进行测试时,出现错误。 失败的测试/test_users_api.py::test_create_jwt - 运行时错误:任务 问题是这样的。当我尝试对 FastAPI 应用程序进行测试时,出现错误。 FAILED tests/test_users_api.py::test_create_jwt - RuntimeError: Task <Task pending name='Task-153' coro=<test_create_jwt() running at /backend/tests/test_users_api.py:22> cb=[_run_until_complete_cb() at /usr/local/lib/python3.12/asyncio/base_events.py:182]> got Future <Future pending cb=[BaseProtocol._on_waiter_completed()]> attached to a different loop FAILED tests/test_users_api.py::test_create_user_with_valid_data - RuntimeError: Task <Task pending name='Task-179' coro=<test_create_user_with_valid_data() running at /backend/tests/test_users_api.py:122> cb=[_run_until_complete_cb() at /usr/local/lib/python3.12/asyncio/base_events.py:182]> got Future <Future pending cb=[BaseProtocol._on_waiter_completed()]> attached to a different loop 在测试我通过 sqlalchemy 使用数据库的那些函数时发生错误。 这是我的数据库会话和 FastAPI 的 dependency_override 的代码: @pytest_asyncio.fixture(scope="session") async def engine(app_database_url, migrations) -> AsyncEngine: engine = create_async_engine(app_database_url) yield engine await engine.dispose() @pytest_asyncio.fixture async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: async with engine.connect() as connection: transaction = await connection.begin() session = AsyncSession(bind=connection, join_transaction_mode="create_savepoint", expire_on_commit=False) try: yield session except Exception: await session.rollback() raise finally: await session.close() await transaction.rollback() await connection.close() @pytest.fixture def session_override(db_session): async def get_session_override() -> AsyncGenerator[AsyncSession, None]: yield db_session main_app.dependency_overrides[db_helper.get_session] = get_session_override 我的异步客户端 @pytest_asyncio.fixture async def client(session_override) -> AsyncGenerator[AsyncClient, None]: async with AsyncClient(app=main_app, base_url="http://localhost:8000") as ac: yield ac 我尝试为 event_loop 制作固定装置 @pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() 但这并没有改变现状 我的 pyproject.toml 部分与 pytest 设置 [tool.pytest.ini_options] addopts = [ "-vvv", "--cov=app", "--cov-report=term-missing", "--cov-config=pyproject.toml" ] python_files = "test_*.py" filterwarnings = [ "ignore::DeprecationWarning", "ignore::SyntaxWarning", ] pythonpath = [ ".", "backend", ] asyncio_mode="auto" asyncio_default_fixture_loop_scope = "session" 项目 github https://github.com/DenisMaslennikov/to-do-list-FastAPI 我认为你必须在你的conftest.py(或其他地方)中做这样的事情,所以首先你创建一个新的loop,然后用asyncio设置它。 @pytest.fixture(scope='session') def event_loop(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) yield loop loop.close()

回答 1 投票 0

asyncio.run() 与 asyncio.get_event_loop().run_until_complete()

我需要从同步函数中调用异步函数。 有人可以告诉我以下内容:sync_fn_a 和sync_fn_b 之间的显着区别是什么,我什么时候会选择...

回答 1 投票 0

错误:ThreadPoolExecutor 中的事件循环已关闭

在 .py 模块中我创建了这个函数 从 utils.telegram_utils 导入 telegram_message ... 计数器 = 计数(1) def open_websites_with_progress(url): 效果 =

回答 1 投票 0

为什么自定义协程调用时不会调用_run_once?

我做了什么 异步 def inner() -> 无: 打印(“内部”) 异步 def main() -> 无: print("主要开始") 等待内部() print("主要结束") 如果__name__ ...

回答 1 投票 0

关闭 httpx 客户端会导致“运行时错误:事件循环已关闭”

我需要在代码中维护一个持久的 httpx 客户端,以便在应用程序的整个生命周期中利用其连接池。下面是我的实现的简化版本: 导入异步 ...

回答 1 投票 0

TypeError:“协程”对象不支持 pytest_asyncio 中的异步上下文管理器协议

我正在尝试对使用 aiohttp.ClientSession 执行 HTTPS 请求的类执行异步测试。 我遇到的问题是,当我用 session.get 调用 async 作为秋季响应时......

回答 1 投票 0

使用队列时Python Asyncio 阻塞

我正在尝试将asyncio与sync(将是python程序的其余部分)和async块一起使用,并让sync块通过asyncio.queues发送数据。 无需排队,一切都有效......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.