嗨,我正在使用 AsyncIOMotorClient 对 mongoDb 进行异步数据库调用。 下面是我的代码。
xyz.py
async def insertMany(self,collection_name,documents_to_insert):
try:
collection=self.database[collection_name]
document_inserted = await collection.insert_many(documents_to_insert)
return document_inserted
except Exception:
raise
def insertManyFn(self,collection_name,documents_to_insert):
try:
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop1=asyncio.get_event_loop()
inserted_documents_count = loop1.run_until_complete(self.insertMany(collection_name, documents_to_insert))
if inserted_documents_count==len(documents_to_insert):
document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
except Exception:
raise
xyz1.py
t=Timer(10,xyz.insertManyFn,\
(collection_name,documents_to_insert))
t.start()
运行此程序时出现异常
RuntimeError: Task <Task pending coro=<xyz.insertMany() running at <my workspace location>/xyz.py:144> cb=[_run_until_complete_cb() at /usr/lib64/python3.5/asyncio/base_events.py:164]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib64/python3.5/asyncio/futures.py:431]> attached to a different loop
在上面的程序中,insertManyFn将在10秒后被调用并执行插入操作。但是当它第一次调用 insertMany 时,我遇到了异常。
我仍然希望我的 MotorClient 位于模块的顶层,所以这就是我所做的:我修补
MotorClient.get_io_loop
以始终返回当前循环。
import asyncio
import motor.core
from motor.motor_asyncio import (
AsyncIOMotorClient as MotorClient,
)
# MongoDB client
client = MotorClient('mongodb://localhost:27017/test')
client.get_io_loop = asyncio.get_running_loop
# The current database ("test")
db = client.get_default_database()
# async context
async def main():
posts = db.posts
await posts.insert_one({'title': 'great success!')
# Run main()
asyncio.run(main())
根据 documentation,如果您不使用默认的 ioloop,则应向
AsyncIOMotorClient
传递一个 ioloop。 创建事件循环后尝试创建客户端:
loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = AsyncIOMotorClient(io_loop=loop)
我修改了代码并且它可以工作。
def insertManyFn(self,loop,collection_name,documents_to_insert):
try:
inserted_documents_count = loop.run_until_complete(self.insertMany(event_loop,collection_name, documents_to_insert))
if len(inserted_documents_count)==len(documents_to_insert):
document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
except Exception:
raise
loop=asyncio.get_event_loop()
t=Timer(10,self.xyz.insertManyFn,(loop,collection_name,documents_to_insert))
t.start()
说明-我正在使用 python 线程计时器,它创建自己的线程以在一定时间后执行函数。因此,在这个线程中,我正在获取事件循环,这不应该是正确的方法,它应该首先获取事件循环并在其中创建一个计时器线程。 我想这是唯一的原因。
@kolypto 的答案拯救了我的夜晚,但是如果你想通过 Motor 客户端修补一些框架或 ORM 代码,你需要修补客户端,在我的例子中,我在
fastapi_contrib
上使用 pytest
MongoDB 模型,我必须修补 AgnosticClient
类。
import asyncio
from motor.core import AgnosticClient
AgnosticClient.get_io_loop = asyncio.get_running_loop
对 kolypto 发表评论。当你修补 get_io_loop 方法时,你应该确保有一个正在运行的事件循环。您可以将其放在其他模块创建事件循环之后。例如使用uvicorn时,应该在生命周期内打补丁;使用scrapy时,应该在spider_start信号等中修补它
更重要的是,为了防止在 get_event_loop 方法中进行额外的检查,你可以这样做:
client._io_loop = asyncio.get_event_loop()