由于 Spark 中没有对读取 excel 文件的开箱即用支持,所以我首先将 excel 文件读入 pandas 数据帧,然后尝试将 pandas 数据帧转换为 Spark 数据帧,但出现以下错误 (我使用的是spark 1.5.1)
import pandas as pd
from pandas import ExcelFile
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
pdf=pd.read_excel('/home/testdata/test.xlsx')
df = sqlContext.createDataFrame(pdf)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark/spark-hadoop/python/pyspark/sql/context.py", line 406, in createDataFrame
rdd, schema = self._createFromLocal(data, schema)
File "/opt/spark/spark-hadoop/python/pyspark/sql/context.py", line 337, in _createFromLocal
data = [schema.toInternal(row) for row in data]
File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 541, in toInternal
return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 541, in <genexpr>
return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 435, in toInternal
return self.dataType.toInternal(obj)
File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 191, in toInternal
else time.mktime(dt.timetuple()))
AttributeError: 'datetime.time' object has no attribute 'timetuple'
有人知道如何解决吗?
我最好的猜测是,当您使用 Pandas 读取数据时,您的问题是“错误”解析
datetime
数据
以下代码“正常工作”:
import pandas as pd
from pandas import ExcelFile
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
pdf = pd.read_excel('test.xlsx', parse_dates=['Created on','Confirmation time'])
sc = SparkContext()
sqlContext = SQLContext(sc)
sqlContext.createDataFrame(data=pdf).collect()
[Row(Customer=1000935702, Country='TW', ...
请注意,您还有一个日期时间列
'Confirmation date'
,在您的示例中由 NaT
组成,因此使用您的短样本读取 RDD 不会出现问题,但是如果您碰巧在完整数据集中有一些数据,则您'我也必须关心该专栏。
显式定义模式可以解决问题。根据您的用例,您可以动态指定架构,如下面的代码片段所示;
from pyspark.sql.types import *
schema = StructType([
StructField(name,
TimestampType() if pd.api.types.is_datetime64_dtype(col) else
DateType() if pd.api.types.is_datetime64_any_dtype(col) else
DoubleType() if pd.api.types.is_float_dtype(col) else StringType(), True)
for name, col in zip(df.columns, df.dtypes)])
sparkDf = spark.createDataFrame(df, schema)