插入顶点和边时出现错误“TooManyRequests”

问题描述 投票:0回答:1

我正在尝试使用下面的代码插入顶点和边。

# Load data from Databricks
df = spark.sql("SELECT * FROM MyTable)")

# Transform data into vertices and edges
vertices = df.selectExpr("Client","Child","Parent","Relation","Name")

edges = df.selectExpr("Child","Parent","Relation")

# Write vertices to Cosmos DB 

for row in vertices.collect():

    client.submit(f"g.addV('Party').property('Id', '{row.Child}').property('Name', '{row.Name}')

    client.submit(f"g.addV('Party').property('Id', '{row.Parent}').property('Name', '{row.Name}')

# Write edges to Cosmos DB

for row in edges.collect():

    client.submit(f"g.V().has('Party', 'Id','{row.Child}').addE('knows').to(g.V().has('Party', 'Id','{row.Parent}')).property('Relation', '{row.Relation}')")  

对于加载大约 800 条记录,它运行良好,但一旦记录数量增加,就会出现错误

 "ExceptionMessage : Microsoft.Azure.Cosmos.CosmosException : Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: ab30cd85-000; Reason: (\r\nErrors : [\r\n  "Request rate is large."

并且插入了 800 条记录,除此之外什么也没有。

我尝试增加“请求单位”,但这没有帮助。

我正在考虑使用 for 循环,但不知道如何循环它,所以如果有人也可以帮助解决这个问题。

azure-cosmosdb azure-databricks azure-cosmosdb-gremlinapi
1个回答
0
投票
"ExceptionMessage : Microsoft.Azure.Cosmos.CosmosException : Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: ab30cd85-000; Reason: (\r\nErrors : [\r\n  "Request rate is large."

当您使用 Cosmos DB 时会出现上述错误,您可以使用的资源量(请求单位或 RU)是有限制的。 如果您的应用程序超出此限制,可用吞吐量、预配置吞吐量将耗尽,这可能导致请求受到速率限制并引发异常。

这些异常(称为 429 错误)是在客户端使用的资源(每秒 RU)多于预配置限制时发生的。

为了解决这个问题,您可以通过增加 RU/s 来实现:

enter image description here

Spark 扩展可用于管理速率限制。

我已经安装了libaray 从 Spark 连接到 Azure Cosmos DB for Apache Cassandra

com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0

cosmos_config = {
    "spark.cosmosdb.account.endpoint": "<YOUR_COSMOS_DB_ENDPOINT>",
    "spark.cosmosdb.account.key": "<YOUR_COSMOS_DB_KEY>",
    "spark.cosmosdb.keyspace": "YourKeyspace",
    "spark.cosmosdb.table": "YourTable",
    "spark.cosmosdb.cassandra.connection.timeout_ms": "5000",
    "spark.cosmosdb.cassandra.read.timeout_ms": "10000",
    df = spark.sql("SELECT * FROM MyTable") 
df.foreachPartition(lambda partition: process_partition_with_rate_limit(partition, cosmos_config, batch_size=50, delay=1.0))

参考:在 Azure Cosmos DB 容器上配置标准(手动)吞吐量 - NoSQL 的 API

处理 Azure Cosmos DB Cassandra API 中的速率限制和多区域故障转移

© www.soinside.com 2019 - 2024. All rights reserved.