我在 Databricks 中有几十个 Spark 表,大小在 ~1 到 ~20 GB 之间,并且想要在每个表上执行一个函数。由于每个查询的结果之间不存在相互依赖性,因此应该很容易并行化。
但是我不知道如何指示 pyspark 并行执行以下代码。它只是一张又一张地进行。
这是一个简单的演示,展示我的代码结构:
单元格 1(创建一些演示表):
tables = []
columns = list("abc")
for i in range(10):
nrows = int(1E6)
ncols = len(columns)
data = np.random.rand(ncols * nrows).reshape((nrows, ncols))
schema = ", ".join([f"{_}: float" for _ in columns])
table = spark.createDataFrame(data=data, schema=schema)
tables.append(table)
单元格 2(对每个单元格执行操作):
quantiles = {}
for i, table in enumerate(tables):
quantiles[i] = table.approxQuantile(columns, [0.01, 0.99], relativeError=0.001)
注意:演示有点简化。实际上,我每个表上都有不同的列,所以我不能只是将它们连接起来。
你不能只使用
parallelize
吗?
列表
tables
中的每个元素都应该是一个 PySpark DataFrame,使用 spark.createDataFrame
创建
# create a RDD of the tables
# `sc` a.k.a. SparkContext should be already created once you started the session in Databricks
rdd_tables = sc.parallelize(tables)
# apply the function to each table in parallel
rdd_quantiles = rdd_tables.map(compute_quantiles)
quantiles = rdd_quantiles.collect()
然后,您可以首先创建数据框列表,然后使用
parallelize
和 map
函数。