使用pySpark和Cloud Storage过滤数百万个文件

问题描述 投票:3回答:1

我正面临以下任务:我将各个文件(例如Mb)存储在Google Cloud Storage Bucket中,并按日期在目录中分组(每个目录包含大约5k个文件)。我需要查看每个文件(xml),过滤适当的文件,然后将其放入Mongo,或以拼写格式将其写回到Google Cloud Storage。我编写了一个简单的pySpark程序,如下所示:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = (
    SparkSession
    .builder
    .appName('myApp')
    .config("spark.mongodb.output.uri", "mongodb://<mongo_connection>") 
    .config("spark.mongodb.output.database", "test") 
    .config("spark.mongodb.output.collection", "test")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()
)

spark_context = spark.sparkContext
spark_context.setLogLevel("INFO")
sql_context   = pyspark.SQLContext(spark_context)

# configure Hadoop
hadoop_conf = spark_context._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")


# DataFrame schema
schema = StructType([
    StructField('filename', StringType(), True),
    StructField("date", DateType(), True),
    StructField("xml", StringType(), True)
])

# -------------------------
# Main operation
# -------------------------
# get all files
files = spark_context.wholeTextFiles('gs://bucket/*/*.gz')

rows = files \
    .map(lambda x: custom_checking_map(x)) \
    .filter(lambda x: x is not None)

# transform to DataFrame 
df = sql_context.createDataFrame(rows, schema)

# write to mongo
df.write.format("mongo").mode("append").save()

# write back to Cloud Storage
df.write.parquet('gs://bucket/test.parquet')

spark_context.stop()

我在一个子集(单个目录gs://bucket/20191010/*.gz)上对其进行了测试,并且可以正常工作。我将其部署在Google Dataproc集群上,但怀疑在19/11/06 15:41:40 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1573054807908_0001

之后,日志停止会发生任何事情

我正在运行3个具有4个核心和15GB RAM + 500GB HDD的工作集群。 Spark版本2.3.3,scala 2.11 mongo-connector-spark_2.11-2.3.3。我是Spark的新手,所以任何建议都值得赞赏。通常,我会使用Python多重处理编写此工作,但想移到“更好”的东西,但现在我不确定。

pyspark google-cloud-storage google-cloud-dataproc
1个回答
2
投票

[在GCS中列出大量文件可能要花费大量时间-最有可能是您的工作“挂起”,而Spark驱动程序在开始处理之前列出了所有文件。

通过首先列出所有目录,然后在每个目录中处理文件,您将获得更好的性能-为了获得最佳性能,您可以并行处理目录,但要考虑到每个目录有5k文件,并且群集中只有3个工作线程,依次处理目录可能就足够了。

© www.soinside.com 2019 - 2024. All rights reserved.