如何在将大型 JSON 文件转换为 JSON 之前使用 AWSglueContext 对其进行拆分/分块?

问题描述 投票:0回答:2

我正在尝试使用 AWS Glue 将 20GB JSON gzip 文件转换为 parquet。

我已经使用 Pyspark 设置了一个作业,代码如下。

我收到此日志警告消息:

LOG.WARN: Loading one large unsplittable file s3://aws-glue-data.json.gz with only one partition, because the file is compressed by unsplittable compression codec.

我想知道是否有办法分割/分块文件?我知道我可以用 pandas 做到这一点,但不幸的是这需要太长时间(12 个小时以上)。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
import pyspark.sql.functions
from pyspark.sql.functions import col, concat, reverse, translate
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

test = glueContext.create_dynamic_frame_from_catalog(
             database="test_db",
             table_name="aws-glue-test_table")


# Create Spark DataFrame, remove timestamp field and re-name other fields
reconfigure = test.drop_fields(['timestamp']).rename_field('name', 'FirstName').rename_field('LName', 'LastName').rename_field('type', 'record_type')

# Create pyspark DF
spark_df = reconfigure.toDF()
# Filter and only return 'a' record types 
spark_df = spark_df.where("record_type == 'a'")
# Once filtered, remove the record_type column
spark_df = spark_df.drop('record_type')
spark_df = spark_df.withColumn("LastName", translate("LastName", "LName:", ""))
spark_df = spark_df.withColumn("FirstName", reverse("FirstName"))

spark_df.write.parquet("s3a://aws-glue-bucket/parquet/test.parquet")

json amazon-web-services apache-spark pyspark bigdata
2个回答
1
投票

Spark 不会并行读取单个 gzip 文件。但是,您可以将其分成块。

此外,Spark 读取 gzip 文件的速度非常慢(因为它没有并行化)。您可以这样做来加快速度:

file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda _: gzip.open(_).readlines())

0
投票

我遇到了一个使用 Gzip 压缩的大型不可分割 CSV 文件的问题。我相信接受的答案仅适用于文件列表。

我使用新的 AWS Glue for ray 和 AWS Wrangler 批量读取分区,如下所示:

import awswrangler as wr
import ray
import pandas
from ray import data
import pandas as pd


ray.init('auto')
large_dataset = wr.s3.read_csv(
        's3://path_to_large_csv.gz,
        sep=',',
        header = True,
        chunksize = 1e5,
        
    
    )


@ray.remote
def read_batch(batch):
    #print (batch.shape)
    return batch
futures = [read_batch.remote(part) for part in large_dataset]

large_distributed_dataset = data.from_pandas(ray.get(futures))
large_distributed_dataset.write_parquet(
    "s3://path_to_output/"
    )

awswrangler[modin] 必须按照胶水文档中的指导使用 --pip-install 添加到作业中。

© www.soinside.com 2019 - 2024. All rights reserved.