如何读取 Databricks 中的 Azure CosmosDb 集合并写入 Spark DataFrame

问题描述 投票:0回答:2

我正在查询 CosmosDb 集合,并且能够打印结果。 当我尝试将结果存储到 Spark DataFrame 时,它失败了。

以本网站为例:

如何用python从Azure的CosmosDB读取数据

按照上述链接中的具体步骤进行操作。另外,尝试以下

 df = spark.createDataFrame(dataset)

这会引发此错误:

ValueError:某些类型推断后无法确定

值错误 回溯(最近一次调用最后一次)
在()
25 打印(数据集)
26
---> 27 df = Spark.createDataFrame(数据集)
28 df.show()
29

createDataFrame 中的

/databricks/spark/python/pyspark/sql/session.py(自我,数据,架构,samplingRatio,verifySchema)
第808章 第809章:
--> 810 rdd, schema = self._createFromLocal(map(准备, 数据), schema)
第811章 第812章
/databricks/spark/python/pyspark/sql/session.py 在 _createFromLocal(self, data, schema)
第440章 440 第441章 --> 442 数据,模式 = self._wrap_data_schema(数据,模式)

第443章


但是,希望将其保存为 Spark DataFrame

任何帮助将不胜感激。 谢谢!!!>

为了推断字段类型,PySpark 会查看每个字段中的非无记录。如果字段只有 None 记录,PySpark 无法推断类型并会引发该错误。

手动定义架构将解决该问题

python azure azure-cosmosdb
2个回答
0
投票

希望有帮助。

我发现您正在遵循我之前的答案,使用旧的 DocumentDB Python SDK 来查询 CosmosDB 文档以创建 PySpark DataFrame 对象。但你不能直接将 
docs

方法的结果

client.ReadDocuments

0
投票
data

传递给函数

SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
,因为数据类型不同,如下所示。

函数createDataFrame
需要一个参数
data
,该参数必须是

RDD

list
pandas.DataFrame



但是,我从

https://pypi.org/project/pydocumentdb/#files 下载了 enter image description herepydocumentdb-2.3.3.tar.gz

的源代码并查看了代码文件 document_client.py

 & 
query_iterable.py # from document_client.py def ReadDocuments(self, collection_link, feed_options=None): """Reads all documents in a collection. :param str collection_link: The link to the document collection. :param dict feed_options: :return: Query Iterable of Documents. :rtype: query_iterable.QueryIterable """ if feed_options is None: feed_options = {} return self.QueryDocuments(collection_link, None, feed_options) # query_iterable.py class QueryIterable(object): """Represents an iterable object of the query results. QueryIterable is a wrapper for query execution context. """

因此,要解决您的问题,您必须首先通过从
pandas.DataFrame
方法迭代结果

Query Iterable of Documents
 创建一个 
ReadDocuments

对象,然后通过

spark.createDataFrame(pandas_df)
创建一个 PySpark DataFrame 对象。
    

© www.soinside.com 2019 - 2024. All rights reserved.