我试图在pyspark中向数据框插入多行。这是我的代码:
首先我导入包:
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
然后,我定义connectionPolicy:
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = {"Western Europe"}
凭证:
masterKey = 'yourmasterkey'
host = 'https://testcosmosdbasdada.documents.azure.com:443/'
client = document_client.DocumentClient(host,{'masterKey': masterKey}, connectionPolicy)
然后我定义数据库和集合的名称:
databaseId = 'pruebadb'
collectionId = 'collection1'
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId
注意:我应该在Azure套件中创建一个具有此名称的数据库和集合。然后我可以使用或CreateDocument或UpsertDocument。我将使用UpsertDocument。
client.UpsertDocument(collLink,{'attribute1': 4}, options=None)
这有效!正如您在文档中看到的那样:https://docs.microsoft.com/en-us/python/api/pydocumentdb/pydocumentdb.document_client.documentclient?view=azure-python#upsertdocument
但是我不知道如何一次插入一些行。这些证明不起作用:
1)
client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)
'list'对象没有属性'get'
2)
client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)
'list'对象没有属性'get'
3)
df = spark.read.csv('/FileStore/tables/points.csv', sep=';', header=True)
client.UpsertDocument(collLink, df, options=None)
'list'对象没有属性'get'
这些证明不起作用,因为我需要一个dict作为UpsertDocument()的第二个参数。
为了做到这一点,有pydocumentdb或其他python库的任何功能吗?
使用pyspark将数据从数据框插入CosmosDB的最佳性能方法如何?
您可以使用Spark MongoDB连接器提供的DataFrameWriter
API,而不是依赖于CosmosDB API。
以下代码应该有效:
df.write.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri", "<CosmosDB URI>")\
.option("database","CosmosDB Database Name")\
.option("collection","CosmosDB Collection Name")\
.mode("append").save()
您需要通过在spark-submit命令中使用--jars
参数或--packages
参数将Spark-MongoDB连接器添加到类路径中。
例如:spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 <YOUR_SRC_FILE>.py
有关DataFrameWriter API的更多信息,请访问:http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter
感谢Sivaprasanna Sethuraman,我一直在调查。没有必要使用MongoDB。最后我发现:https://github.com/Azure/azure-cosmosdb-spark
如果需要在非空数据框上插入,请注意使用mode append:
writeConfig = {
"Endpoint" : "yourhostcosmosdb",
"Masterkey" : "yourmasterkey",
"Database" : "pruebadb",
"Collection" : "collection1",
}
df.write.format("com.microsoft.azure.cosmosdb.spark").mode('append').options(**writeConfig).save()