我正在使用 Python 开发一个应用程序,它使用 Azure Cosmos DB 作为主数据库。在应用程序中的某个时刻,我需要将批量数据(一批项目)插入到 Cosmos DB 中。到目前为止,我一直在使用 Azure Cosmos DB Python SDK for SQL API 与 Cosmos DB 进行通信;但是,它没有提供批量数据插入的方法。
据我了解,这些是这个SDK中提供的插入方法,这两种方法都只支持单个项目插入,在
for
循环中使用时会非常慢:
.upsert_item()
.create_item()
是否有另一种方法可以使用此 SDK 插入批量数据,而不是在
for
循环中使用上述方法?如果没有,是否有可以处理批量数据插入的 Azure REST API?
Cosmos DB 服务不通过其 REST API 提供此服务。 Bulk模式是在SDK层实现的,遗憾的是Python SDK还不支持bulk模式。但它确实支持异步 IO。这是一个可能对您有帮助的示例。
from azure.cosmos.aio import CosmosClient
import os
URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
DATABASE_NAME = 'myDatabase'
CONTAINER_NAME = 'myContainer'
async def create_products():
async with CosmosClient(URL, credential=KEY) as client:
database = client.get_database_client(DATABASE_NAME)
container = database.get_container_client(CONTAINER_NAME)
for i in range(10):
await container.upsert_item({
'id': 'item{0}'.format(i),
'productName': 'Widget',
'productModel': 'Model {0}'.format(i)
}
)
更新:我记得在 Cosmos DB for Python SDK 中进行批量插入的另一种方法是使用存储过程。有一些关于如何编写这些的示例,包括演示传递数组的示例,这正是您想要做的。我还会看看有界执行,因为您也想实现它。您可以在这里学习如何编写它们,如何编写存储过程。那么如何注册和调用这里,如何使用存储过程。 注意: 这些只能在传递分区键值时使用,因此只能在逻辑分区内执行批处理。
更新 - 2024 年 6 月:
官方Azure Cosmos DB Python SDK 存储库中描述了针对此限制的解决方法:
虽然 SDK 支持事务批处理,但 Python SDK 尚未实现对批量请求的支持。您可以将异步客户端与我们开发的它基本上使用此并发示例一起使用,作为可能的解决方法的参考。
asyncio 方法、wait
和
create_task
来同时创建一批项目,这意味着当一个连接正在等待 IO(如等待数据到达)时,Python 可以将上下文切换到另一个连接连接并在那里取得进展。这是一个例子:
import asyncio
from azure.cosmos.aio import CosmosClient
async def main():
async with CosmosClient.from_connection_string(CONN_STR) as client:
db = client.get_database_client(DATABASE_NAME)
container = db.get_container_client(CONTAINER_NAME)
# A list of items to be upserted (bulk)
items: list[dict] = ...
await asyncio.wait(
[asyncio.create_task(container.upsert_item(item) for item in items)]
)
if __name__ == "__main__":
asyncio.run(main())