改善pyspark作业分析数据

问题描述 投票:0回答:1

我在Google存储空间中有一些JSON文件,其中包含大量数据(介于500Gb和1Tb之间)。这些文件每行包含1个JSON对象,格式如下:

{"country":"US", "col1":"val1", "col2":"val2", "col3":"val3"}
{"country":"CA", "col1":"val4", "col2":"val5", "col3":"val6"}

我的目标是为我可以在此数据中找到的10个国家/地区在BigQuery中创建不同的表格。所以我最终得到10张桌子,例如,一张桌子是名为data_us的架构:col1,col2,col3。

我目前的操作方式是使用pyspark并在Google Dataproc的计算机集群上运行作业。

    data = spark.read.json(bucket_source)
    data.createOrReplaceTempView('data')

    for c in country_list:
        table_name = "data_{}".format(c)
        query = "select col1, col2, col3, from data where language = '{}'".format(c)
        result_folder = "result_{}".format(c)
        result = spark.sql(query)
        push_bigquery(bucket_dest, cluster_name, project_name, dataset_name, result, result_folder, table_name)

因此,基本上,我只是加载数据,创建视图并要求pyspark对每个国家/地区执行1个请求。然后,我调用push_bigquery函数,该函数将结果转储到CSV文件并将其加载到BigQuery中。此解决方案有效,但是对于大量数据来说似乎有点慢(对于接近1Tb的数据大小,大约需要12小时)。我有2个问题:

  • 我目前在每个国家/地区进行1个查询,因此将对每个查询的全部数据进行分析。有办法吗只是“分析”每一行并立即将其写入正确的结果/文件中?我觉得会更快但由于我对spark / pyspark不太熟悉,但我不知道此解决方案是否有意义。

  • 与使用spark相比,有没有一种完全不同且更好的方法来执行此任务?

谢谢您的帮助

apache-spark pyspark google-cloud-dataproc
1个回答
0
投票

我想念.cache,但是以下是我基于N国家 N表要求的首次尝试:

  1. 无缓存。
  2. 通过df.repartition(country).write...partitionBy(country)...使用适当的选项以非拼花形式(无需柱状)读取,重新分区和写出。

  3. 然后,我将依赖分区意识在每个国家/地区应用您的for循环。

© www.soinside.com 2019 - 2024. All rights reserved.