如何从 PySpark DataFrame 批量处理项目

问题描述 投票:0回答:3

我有一个 PySpark 数据框,对于每条(批次)记录,我想调用一个 API。所以基本上说我有 100000k 条记录,我想将项目批量分成 1000 条组并调用 API。我如何使用 PySpark 做到这一点?进行批处理的原因是 API 可能不会接受来自大数据系统的大量数据。

我首先想到了

LIMIT
,但这不会是“确定性的”。而且看起来效率很低?

apache-spark pyspark
3个回答
4
投票
df.foreachPartition { ele =>
   ele.grouped(1000).foreach { chunk =>
   postToServer(chunk)
}

代码是scala中的,你可以在python中检查相同的代码。它将创建 1000 个批次。


3
投票

使用

foreachPartition
,然后像这样如何将可迭代对象拆分为恒定大小的块,将可迭代对象批处理为 1000 个组,这在 Spark 资源使用方面可以说是最有效的方法。

def handle_iterator(it):
    # batch the iterable and call API
    pass
df.foreachPartition(handle_iterator)

注意:这将从执行器中并行调用 API,并且可能不是实践中的方法,例如速率限制是一个问题。


0
投票

如果顺序不是强制性的,那么您可以使用

randomSplit()
将记录划分为大致相等数量的批次。 参考这里

df_count = 575641
batch_size = 15000
num_batches = (df_count + batch_size - 1) // batch_size
offset = 0

ids_set = set()
orig_df = spark.range(1,df_count)
for i in range(0,num_batches):
    batch_df = orig_df.offset(offset).limit(batch_size)
    ids_set = ids_set.union([r[0] for r in batch_df.select('id').collect()])
    print(batch_df.count(), len(ids_set), i)
© www.soinside.com 2019 - 2024. All rights reserved.