我正在尝试使用下面的代码插入顶点和边。
# Load data from Databricks
df = spark.sql("SELECT * FROM MyTable)")
# Transform data into vertices and edges
vertices = df.selectExpr("Client","Child","Parent","Relation","Name")
edges = df.selectExpr("Child","Parent","Relation")
# Write vertices to Cosmos DB
for row in vertices.collect():
client.submit(f"g.addV('Party').property('Id', '{row.Child}').property('Name', '{row.Name}')
client.submit(f"g.addV('Party').property('Id', '{row.Parent}').property('Name', '{row.Name}')
# Write edges to Cosmos DB
for row in edges.collect():
client.submit(f"g.V().has('Party', 'Id','{row.Child}').addE('knows').to(g.V().has('Party', 'Id','{row.Parent}')).property('Relation', '{row.Relation}')")
对于加载大约 800 条记录,它运行良好,但一旦记录数量增加,就会出现错误
"ExceptionMessage : Microsoft.Azure.Cosmos.CosmosException : Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: ab30cd85-000; Reason: (\r\nErrors : [\r\n "Request rate is large."
并且插入了 800 条记录,除此之外什么也没有。
我尝试增加“请求单位”,但这没有帮助。
我正在考虑使用 for 循环,但不知道如何循环它,所以如果有人也可以帮助解决这个问题。
"ExceptionMessage : Microsoft.Azure.Cosmos.CosmosException : Response status code does not indicate success: TooManyRequests (429); Substatus: 3200; ActivityId: ab30cd85-000; Reason: (\r\nErrors : [\r\n "Request rate is large."
当您使用 Cosmos DB 时会出现上述错误,您可以使用的资源量(请求单位或 RU)是有限制的。 如果您的应用程序超出此限制,可用吞吐量、预配置吞吐量将耗尽,这可能导致请求受到速率限制并引发异常。
这些异常(称为 429 错误)是在客户端使用的资源(每秒 RU)多于预配置限制时发生的。
为了解决这个问题,您可以通过增加 RU/s 来实现:
Spark 扩展可用于管理速率限制。
我已经安装了libaray 从 Spark 连接到 Azure Cosmos DB for Apache Cassandra
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
cosmos_config = {
"spark.cosmosdb.account.endpoint": "<YOUR_COSMOS_DB_ENDPOINT>",
"spark.cosmosdb.account.key": "<YOUR_COSMOS_DB_KEY>",
"spark.cosmosdb.keyspace": "YourKeyspace",
"spark.cosmosdb.table": "YourTable",
"spark.cosmosdb.cassandra.connection.timeout_ms": "5000",
"spark.cosmosdb.cassandra.read.timeout_ms": "10000",
df = spark.sql("SELECT * FROM MyTable")
df.foreachPartition(lambda partition: process_partition_with_rate_limit(partition, cosmos_config, batch_size=50, delay=1.0))