如何使用Python将批量数据插入Cosmos DB?

问题描述 投票:0回答:2

我正在使用 Python 开发一个应用程序,它使用 Azure Cosmos DB 作为主数据库。在应用程序中的某个时刻,我需要将批量数据(一批项目)插入到 Cosmos DB 中。到目前为止,我一直在使用 Azure Cosmos DB Python SDK for SQL API 与 Cosmos DB 进行通信;但是,它没有提供批量数据插入的方法。

据我了解,这些是这个SDK中提供的插入方法,这两种方法都只支持单个项目插入,在

for
循环中使用时会非常慢:

  • .upsert_item()
  • .create_item()

是否有另一种方法可以使用此 SDK 插入批量数据,而不是在

for
循环中使用上述方法?如果没有,是否有可以处理批量数据插入的 Azure REST API?

python azure azure-cosmosdb azure-cosmosdb-sqlapi
2个回答
3
投票

Cosmos DB 服务不通过其 REST API 提供此服务。 Bulk模式是在SDK层实现的,遗憾的是Python SDK还不支持bulk模式。但它确实支持异步 IO。这是一个可能对您有帮助的示例。

from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
DATABASE_NAME = 'myDatabase'
CONTAINER_NAME = 'myContainer'

async def create_products():
    async with CosmosClient(URL, credential=KEY) as client:
        database = client.get_database_client(DATABASE_NAME)
        container = database.get_container_client(CONTAINER_NAME)
        for i in range(10):
            await container.upsert_item({
                'id': 'item{0}'.format(i),
                'productName': 'Widget',
                'productModel': 'Model {0}'.format(i)
            }
        )

更新:我记得在 Cosmos DB for Python SDK 中进行批量插入的另一种方法是使用存储过程。有一些关于如何编写这些的示例,包括演示传递数组的示例,这正是您想要做的。我还会看看有界执行,因为您也想实现它。您可以在这里学习如何编写它们,如何编写存储过程。那么如何注册和调用这里,如何使用存储过程注意: 这些只能在传递分区键值时使用,因此只能在逻辑分区内执行批处理。


0
投票

更新 - 2024 年 6 月:

官方

Azure Cosmos DB Python SDK 存储库中描述了针对此限制的解决方法

虽然 SDK 支持事务批处理,但 Python SDK 尚未实现对批量请求的支持。您可以将异步客户端与我们开发的

此并发示例一起使用,作为可能的解决方法的参考。

它基本上使用

asyncio 方法、wait

create_task
 来同时创建一批项目,这意味着当一个连接正在等待 IO(如等待数据到达)时,Python 可以将上下文切换到另一个连接连接并在那里取得进展。

这是一个例子:

import asyncio from azure.cosmos.aio import CosmosClient async def main(): async with CosmosClient.from_connection_string(CONN_STR) as client: db = client.get_database_client(DATABASE_NAME) container = db.get_container_client(CONTAINER_NAME) # A list of items to be upserted (bulk) items: list[dict] = ... await asyncio.wait( [asyncio.create_task(container.upsert_item(item) for item in items)] ) if __name__ == "__main__": asyncio.run(main())
    
© www.soinside.com 2019 - 2024. All rights reserved.