我在Google存储空间中有一些JSON文件,其中包含大量数据(介于500Gb和1Tb之间)。这些文件每行包含1个JSON对象,格式如下:
{"country":"US", "col1":"val1", "col2":"val2", "col3":"val3"}
{"country":"CA", "col1":"val4", "col2":"val5", "col3":"val6"}
我的目标是为我可以在此数据中找到的10个国家/地区在BigQuery中创建不同的表格。所以我最终得到10张桌子,例如,一张桌子是名为data_us的架构:col1,col2,col3。
我目前的操作方式是使用pyspark并在Google Dataproc的计算机集群上运行作业。
data = spark.read.json(bucket_source)
data.createOrReplaceTempView('data')
for c in country_list:
table_name = "data_{}".format(c)
query = "select col1, col2, col3, from data where language = '{}'".format(c)
result_folder = "result_{}".format(c)
result = spark.sql(query)
push_bigquery(bucket_dest, cluster_name, project_name, dataset_name, result, result_folder, table_name)
因此,基本上,我只是加载数据,创建视图并要求pyspark对每个国家/地区执行1个请求。然后,我调用push_bigquery函数,该函数将结果转储到CSV文件并将其加载到BigQuery中。此解决方案有效,但是对于大量数据来说似乎有点慢(对于接近1Tb的数据大小,大约需要12小时)。我有2个问题:
我目前在每个国家/地区进行1个查询,因此将对每个查询的全部数据进行分析。有办法吗只是“分析”每一行并立即将其写入正确的结果/文件中?我觉得会更快但由于我对spark / pyspark不太熟悉,但我不知道此解决方案是否有意义。
与使用spark相比,有没有一种完全不同且更好的方法来执行此任务?
谢谢您的帮助
我想念.cache,但是以下是我基于N国家 N表要求的首次尝试:
通过df.repartition(country).write...partitionBy(country)...
使用适当的选项以非拼花形式(无需柱状)读取,重新分区和写出。
然后,我将依赖分区意识在每个国家/地区应用您的for循环。