[pyspark] MutableFloat cannot be cast to MutableDouble when merging parquet files


My S3 folder contains many parquet files. Each one has columns "A", "B", and "C". Columns "A" and "B" have string data types, but column "C" is Float in some files and Double in others. I want to merge these parquet files into larger files. I am doing the merge in AWS Glue with PySpark.

When I try to write the DataFrame to S3 with
     output_s3_path = "s3://new_path/"
     df.write.mode("overwrite").parquet(output_s3_path)

I get this error:

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableFloat cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableDouble
    at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setDouble(SpecificInternalRow.scala:284)
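
For reference, the mismatch can be reproduced with a minimal setup along these lines (paths are placeholders; the exact exception depends on the Spark version and on whether the vectorized reader is enabled):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType

spark = SparkSession.builder.getOrCreate()

base = [StructField("A", StringType()), StructField("B", StringType())]
float_schema = StructType(base + [StructField("C", FloatType())])
double_schema = StructType(base + [StructField("C", DoubleType())])

# Two parquet files under sibling paths, differing only in C's physical type.
spark.createDataFrame([("a1", "b1", 1.0)], float_schema).write.parquet("/tmp/mixed/p1")
spark.createDataFrame([("a2", "b2", 2.0)], double_schema).write.parquet("/tmp/mixed/p2")

# Scanning both with one inferred schema triggers the float/double type clash.
spark.read.parquet("/tmp/mixed/p1", "/tmp/mixed/p2").show()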

I have tried the following:

  1. spark.read.option("mergeSchema", "true")
     to merge the schemas, for example:

     s3_df = spark.read.option("mergeSchema", "true").parquet("s3://path/")

  2. spark.read.option("overwriteSchema", "true")
     to overwrite the schema with the new data types

  3. spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

None of these worked. Setting the schema explicitly is not a good option for me, because I want to reuse the same solution to merge many different sets of parquet files.
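
For reference, a schema-agnostic fallback (not among the attempts above) is to read each file on its own, widen any float columns to double, and union the parts. A sketch, assuming Spark >= 3.1 for allowMissingColumns and that the individual file paths have already been listed (e.g. with boto3); reading files one by one does add overhead:

from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

# Hypothetical list of the individual parquet file paths.
paths = ["s3://path/part-0001.parquet", "s3://path/part-0002.parquet"]

merged = None
for p in paths:
    part = spark.read.parquet(p)
    # Widen any float column so every part agrees on double before the union.
    for field in part.schema.fields:
        if isinstance(field.dataType, FloatType):
            part = part.withColumn(field.name, F.col(field.name).cast("double"))
    merged = part if merged is None else merged.unionByName(part, allowMissingColumns=True)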

How can I solve this?

apache-spark pyspark aws-glue
1 Answer

Try casting the column to double in PySpark:

df = df.withColumn("C", F.col("C").cast("double"))

A complete Glue job might look like this:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

# Initialize Glue context and job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 paths
input_path = "s3://your-input-bucket/path-to-parquet-files/"
output_path = "s3://your-output-bucket/path-to-merged-parquet-file/"

# Read the parquet files into a DynamicFrame
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [input_path]},
    format="parquet"
)

# Convert DynamicFrame to DataFrame
df = dyf.toDF()

# Cast the 'C' column to Double to ensure consistency
df = df.withColumn("C", F.col("C").cast("double"))

# Convert DataFrame back to DynamicFrame
dyf_cleaned = DynamicFrame.fromDF(df, glueContext, "dyf_cleaned")

# Write the merged data back to S3
glueContext.write_dynamic_frame.from_options(
    frame=dyf_cleaned,
    connection_type="s3",
    connection_options={"path": output_path},
    format="parquet"
)

# Commit the job
job.commit()
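
Two hedged refinements, beyond the answer as written: if Glue infers a choice type for "C" because the input files disagree on its type, it can be settled on the DynamicFrame with resolveChoice before the DataFrame conversion; and the cast can be generalized to every float column so the same job works across datasets without naming columns. A sketch under those assumptions:

from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

# If the files disagree on C's type, Glue may expose it as a choice type;
# resolving it on the DynamicFrame avoids ending up with a struct column.
dyf = dyf.resolveChoice(specs=[("C", "cast:double")])
df = dyf.toDF()

# Generic variant: widen every float column to double, so the job can be
# reused for datasets whose column names are not known in advance.
for field in df.schema.fields:
    if isinstance(field.dataType, FloatType):
        df = df.withColumn(field.name, F.col(field.name).cast("double"))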