我正在尝试使用 AWS Glue 将 20GB JSON gzip 文件转换为 parquet。
我已经使用 Pyspark 设置了一个作业,代码如下。
我收到此日志警告消息:
LOG.WARN: Loading one large unsplittable file s3://aws-glue-data.json.gz with only one partition, because the file is compressed by unsplittable compression codec.
我想知道是否有办法分割/分块文件?我知道我可以用 pandas 做到这一点,但不幸的是这需要太长时间(12 个小时以上)。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
import pyspark.sql.functions
from pyspark.sql.functions import col, concat, reverse, translate
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
test = glueContext.create_dynamic_frame_from_catalog(
database="test_db",
table_name="aws-glue-test_table")
# Create Spark DataFrame, remove timestamp field and re-name other fields
reconfigure = test.drop_fields(['timestamp']).rename_field('name', 'FirstName').rename_field('LName', 'LastName').rename_field('type', 'record_type')
# Create pyspark DF
spark_df = reconfigure.toDF()
# Filter and only return 'a' record types
spark_df = spark_df.where("record_type == 'a'")
# Once filtered, remove the record_type column
spark_df = spark_df.drop('record_type')
spark_df = spark_df.withColumn("LastName", translate("LastName", "LName:", ""))
spark_df = spark_df.withColumn("FirstName", reverse("FirstName"))
spark_df.write.parquet("s3a://aws-glue-bucket/parquet/test.parquet")
Spark 不会并行读取单个 gzip 文件。但是,您可以将其分成块。
此外,Spark 读取 gzip 文件的速度非常慢(因为它没有并行化)。您可以这样做来加快速度:
file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda _: gzip.open(_).readlines())
我遇到了一个使用 Gzip 压缩的大型不可分割 CSV 文件的问题。我相信接受的答案仅适用于文件列表。
我使用新的 AWS Glue for ray 和 AWS Wrangler 批量读取分区,如下所示:
import awswrangler as wr
import ray
import pandas
from ray import data
import pandas as pd
ray.init('auto')
large_dataset = wr.s3.read_csv(
's3://path_to_large_csv.gz,
sep=',',
header = True,
chunksize = 1e5,
)
@ray.remote
def read_batch(batch):
#print (batch.shape)
return batch
futures = [read_batch.remote(part) for part in large_dataset]
large_distributed_dataset = data.from_pandas(ray.get(futures))
large_distributed_dataset.write_parquet(
"s3://path_to_output/"
)
awswrangler[modin] 必须按照胶水文档中的指导使用 --pip-install 添加到作业中。