When I try to convert a pyspark DataFrame to a pandas DataFrame with the toPandas method, I get the following error. I don't understand what causes it:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/tmp/ipykernel_64705/3870041712.py in <module>
----> 1 df_who.limit(10).toPandas()
/opt/miniforge/miniforge/envs/jupyterlab/lib/python3.7/site-packages/pyspark/sql/dataframe.py in toPandas(self)
2130 if len(batches) > 0:
2131 table = pyarrow.Table.from_batches(batches)
-> 2132 pdf = table.to_pandas()
2133 pdf = _check_dataframe_convert_date(pdf, self.schema)
2134 return _check_dataframe_localize_timestamps(pdf, timezone)
/opt/miniforge/miniforge/envs/jupyterlab/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib._PandasConvertible.to_pandas()
/opt/miniforge/miniforge/envs/jupyterlab/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.Table._to_pandas()
/opt/miniforge/miniforge/envs/jupyterlab/lib/python3.7/site-packages/pyarrow/pandas_compat.py in table_to_blockmanager(options, table, categories, ignore_metadata, types_mapper)
786
787 _check_data_column_metadata_consistency(all_columns)
--> 788 columns = _deserialize_column_index(table, all_columns, column_indexes)
789 blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
790
/opt/miniforge/miniforge/envs/jupyterlab/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _deserialize_column_index(block_table, all_columns, column_indexes)
901
902 # ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
--> 903 columns = _flatten_single_level_multiindex(columns)
904
905 return columns
/opt/miniforge/miniforge/envs/jupyterlab/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _flatten_single_level_multiindex(index)
1142 # Cheaply check that we do not somehow have duplicate column names
1143 if not index.is_unique:
-> 1144 raise ValueError('Found non-unique column index')
1145
1146 return pd.Index(
ValueError: Found non-unique column index
Check the columns of your pyspark DataFrame. According to the error, it contains duplicate column names.
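You can check whether the table has duplicate names with the assertion below. If it fails, the message lists every column name with its index:
assert len(set(pyarrow_table.schema.names)) == len(pyarrow_table.schema.names), list(enumerate(pyarrow_table.schema.names))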
You can then locate the offending columns and remove them, using a line like this for each one:
pyarrow_table = pyarrow_table.remove_column(index_of_duplicate_column)
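As a rough sketch of how to work out which indices to pass to remove_column, the helper below takes a list of names such as pyarrow_table.schema.names and returns the position of every second or later occurrence. The name duplicate_indices and the sample column names are hypothetical, not part of pyarrow or the question. The indices come back in descending order, because removing a column shifts the positions of the columns after it.

```python
from collections import Counter


def duplicate_indices(names):
    """Return indices of repeated names (all but the first occurrence), highest first."""
    seen = Counter()
    repeats = []
    for i, name in enumerate(names):
        if seen[name] > 0:
            repeats.append(i)
        seen[name] += 1
    # Highest index first, so earlier removals do not shift later targets
    return sorted(repeats, reverse=True)


# Made-up column names; in practice pass pyarrow_table.schema.names
names = ["id", "country", "id", "year", "country", "id"]
to_drop = duplicate_indices(names)
print(to_drop)

# Assumed usage with a pyarrow table:
# for i in duplicate_indices(pyarrow_table.schema.names):
#     pyarrow_table = pyarrow_table.remove_column(i)
```

This keeps the first column with each name and drops the rest, which is only appropriate if the repeated columns really are redundant.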
Alternatively, you can rename the duplicate columns. Here is a solution that renames all of the duplicated columns:
from collections import Counter

seen_col_names = Counter()
for i, col_name in enumerate(pyarrow_table.schema.names):
    # Second and later occurrences get a suffix with the number of earlier occurrences
    if seen_col_names[col_name] > 0:
        pyarrow_table = pyarrow_table.set_column(i, f'{col_name}_{seen_col_names[col_name]}', pyarrow_table[i])
    seen_col_names.update([col_name])
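In the question, the Arrow table is built inside toPandas itself, so it may be easier to apply the same renaming idea to the Spark DataFrame before converting. The sketch below is one possible way to do that: make_unique is a hypothetical helper (not a pyspark or pyarrow function) that only works on a plain list of names, and the commented usage assumes df_who is the DataFrame from the question.

```python
def make_unique(names):
    """Return a copy of names where repeats get a numeric suffix (_1, _2, ...)."""
    seen = set()
    counts = {}
    result = []
    for name in names:
        candidate = name
        # Keep bumping the suffix until the candidate is not already taken
        while candidate in seen:
            counts[name] = counts.get(name, 0) + 1
            candidate = f"{name}_{counts[name]}"
        seen.add(candidate)
        result.append(candidate)
    return result


# Made-up column names for illustration
print(make_unique(["id", "country", "id", "id"]))

# Assumed usage on the Spark side, before the conversion:
# df_unique = df_who.toDF(*make_unique(df_who.columns))
# pdf = df_unique.limit(10).toPandas()
```

The while loop guards against a generated name such as id_1 colliding with a column that already carries that name.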