在pyspark中并行化for循环;每次迭代一张表

问题描述 投票:0回答:1

我在 Databricks 中有几十个 Spark 表,大小在 ~1 到 ~20 GB 之间,并且想要在每个表上执行一个函数。由于每个查询的结果之间不存在相互依赖性,因此应该很容易并行化。

但是我不知道如何指示 pyspark 并行执行以下代码。它只是一张又一张地进行。

这是一个简单的演示,展示我的代码结构:

单元格 1(创建一些演示表):

tables = []
columns = list("abc")
for i in range(10):
    nrows = int(1E6)
    ncols = len(columns)
    data = np.random.rand(ncols * nrows).reshape((nrows, ncols))
    schema = ", ".join([f"{_}: float" for _ in columns])
    table = spark.createDataFrame(data=data, schema=schema)
    tables.append(table)

单元格 2(对每个单元格执行操作):

quantiles = {}
for i, table in enumerate(tables):
    quantiles[i] = table.approxQuantile(columns, [0.01, 0.99], relativeError=0.001)

注意:演示有点简化。实际上,我每个表上都有不同的列,所以我不能只是将它们连接起来。

apache-spark pyspark databricks
1个回答
0
投票

你不能只使用

parallelize
吗?

列表

tables
中的每个元素都应该是一个 PySpark DataFrame,使用
spark.createDataFrame

创建
# create a RDD of the tables
# `sc` a.k.a. SparkContext should be already created once you started the session in Databricks
rdd_tables = sc.parallelize(tables)

# apply the function to each table in parallel
rdd_quantiles = rdd_tables.map(compute_quantiles)

quantiles = rdd_quantiles.collect()

然后,您可以首先创建数据框列表,然后使用

parallelize
map
函数。

© www.soinside.com 2019 - 2024. All rights reserved.