从pyspark到cosmosdb插入多行

问题描述 投票:0回答:2

我试图在pyspark中向数据框插入多行。这是我的代码:

首先我导入包:

import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents

然后,我定义connectionPolicy:

connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = {"Western Europe"}

凭证:

masterKey = 'yourmasterkey'
host = 'https://testcosmosdbasdada.documents.azure.com:443/'
client = document_client.DocumentClient(host,{'masterKey': masterKey}, connectionPolicy)

然后我定义数据库和集合的名称:

databaseId = 'pruebadb'
collectionId = 'collection1'

dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

注意:我应该在Azure套件中创建一个具有此名称的数据库和集合。然后我可以使用或CreateDocument或UpsertDocument。我将使用UpsertDocument。

client.UpsertDocument(collLink,{'attribute1': 4}, options=None)

这有效!正如您在文档中看到的那样:https://docs.microsoft.com/en-us/python/api/pydocumentdb/pydocumentdb.document_client.documentclient?view=azure-python#upsertdocument

但是我不知道如何一次插入一些行。这些证明不起作用:

1)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

'list'对象没有属性'get'

2)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

'list'对象没有属性'get'

3)

df = spark.read.csv('/FileStore/tables/points.csv', sep=';', header=True)
client.UpsertDocument(collLink, df, options=None)

'list'对象没有属性'get'

这些证明不起作用,因为我需要一个dict作为UpsertDocument()的第二个参数。

为了做到这一点,有pydocumentdb或其他python库的任何功能吗?

使用pyspark将数据从数据框插入CosmosDB的最佳性能方法如何?

pyspark azure-cosmosdb
2个回答
1
投票

您可以使用Spark MongoDB连接器提供的DataFrameWriter API,而不是依赖于CosmosDB API。

以下代码应该有效:

df.write.format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri", "<CosmosDB URI>")\
        .option("database","CosmosDB Database Name")\
        .option("collection","CosmosDB Collection Name")\
        .mode("append").save()

您需要通过在spark-submit命令中使用--jars参数或--packages参数将Spark-MongoDB连接器添加到类路径中。

例如:spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 <YOUR_SRC_FILE>.py

有关DataFrameWriter API的更多信息,请访问:http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter


0
投票

感谢Sivaprasanna Sethuraman,我一直在调查。没有必要使用MongoDB。最后我发现:https://github.com/Azure/azure-cosmosdb-spark

如果需要在非空数据框上插入,请注意使用mode append:

writeConfig = {
 "Endpoint" : "yourhostcosmosdb",
 "Masterkey" : "yourmasterkey",
 "Database" : "pruebadb",
 "Collection" : "collection1",
}
df.write.format("com.microsoft.azure.cosmosdb.spark").mode('append').options(**writeConfig).save()
© www.soinside.com 2019 - 2024. All rights reserved.