我正在查询 CosmosDb 集合,并且能够打印结果。 当我尝试将结果存储到 Spark DataFrame 时,它失败了。
以本网站为例:
按照上述链接中的具体步骤进行操作。另外,尝试以下
df = spark.createDataFrame(dataset)
这会引发此错误:
ValueError:某些类型推断后无法确定
值错误 回溯(最近一次调用最后一次)
createDataFrame 中的
在()
25 打印(数据集)
26
---> 27 df = Spark.createDataFrame(数据集)
28 df.show()
29/databricks/spark/python/pyspark/sql/session.py(自我,数据,架构,samplingRatio,verifySchema)
第443章
第808章 第809章:
--> 810 rdd, schema = self._createFromLocal(map(准备, 数据), schema)
第811章 第812章
/databricks/spark/python/pyspark/sql/session.py 在 _createFromLocal(self, data, schema)
第440章 440 第441章 --> 442 数据,模式 = self._wrap_data_schema(数据,模式)
但是,希望将其保存为 Spark DataFrame
任何帮助将不胜感激。 谢谢!!!>
为了推断字段类型,PySpark 会查看每个字段中的非无记录。如果字段只有 None 记录,PySpark 无法推断类型并会引发该错误。
手动定义架构将解决该问题
希望有帮助。
我发现您正在遵循我之前的答案,使用旧的 DocumentDB Python SDK 来查询 CosmosDB 文档以创建 PySpark DataFrame 对象。但你不能直接将
docs
方法的结果
client.ReadDocuments
data
传递给函数
SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
,因为数据类型不同,如下所示。函数
createDataFrame
需要一个参数
data
,该参数必须是RDD
或
list
或pandas.DataFrame
但是,我从
https://pypi.org/project/pydocumentdb/#files 下载了 pydocumentdb-2.3.3.tar.gz
的源代码并查看了代码文件 document_client.py
&
query_iterable.py
。
# from document_client.py
def ReadDocuments(self, collection_link, feed_options=None):
"""Reads all documents in a collection.
:param str collection_link:
The link to the document collection.
:param dict feed_options:
:return:
Query Iterable of Documents.
:rtype:
query_iterable.QueryIterable
"""
if feed_options is None:
feed_options = {}
return self.QueryDocuments(collection_link, None, feed_options)
# query_iterable.py
class QueryIterable(object):
"""Represents an iterable object of the query results.
QueryIterable is a wrapper for query execution context.
"""
因此,要解决您的问题,您必须首先通过从
pandas.DataFrame
方法迭代结果 Query Iterable of Documents
创建一个
ReadDocuments
对象,然后通过
spark.createDataFrame(pandas_df)
创建一个 PySpark DataFrame 对象。