Dask: optimizing an embarrassingly parallel loop

Problem description

I'm trying to speed up some computations on a pandas.DataFrame. About half of the time of each iteration is spent just creating a copy of the dataframe (it has roughly 1.5 million rows and 10 columns). Is there a way to skip the copy, or an alternative approach, while still being able to reproduce the results? Note: I'm running this on a single local machine.

import numpy as np
import dask
from scipy.stats import norm

def iteration(df, seed):
    dfCopy = df.copy()
    rng = np.random.default_rng(seed)
    factor = norm.ppf(rng.uniform())
    factors = norm.ppf(rng.uniform(size=len(dfCopy)))
   
   dfCopy["result"] = dfCopy["important"] * factor + dfCopy["important"] * factors
   #  and some more operations involving the dataframe and using factor and factors
   
    return dfCopy.groupby(by="ID").agg({"result": "sum"}).reset_index()


sg = np.random.SeedSequence(112622607388004198928332989692281456100)
seeds = sg.generate_state(10000)

delayed_results = []
for seed in seeds:
    result = dask.delayed(iteration)(df, seed)
    delayed_results.append(result)
results = dask.compute(*delayed_results)

I tried creating new columns in the dataframe, doing the calculations in them and deleting them afterwards, but with that many columns being inserted an error is raised. Also, I don't think it makes sense to run the operations on a dask.DataFrame, since they aren't that complex; the only things that take a lot of time are copying the dataframe and the sheer number of iterations.

python pandas multithreading multiprocessing dask
1 Answer

Instead of copying the whole dataframe, build a small temporary dataframe from the result.

import numpy as np
import pandas as pd
import dask
from scipy.stats import norm

def iteration(df, seed):
    rng = np.random.default_rng(seed)
    factor = norm.ppf(rng.uniform())
    factors = norm.ppf(rng.uniform(size=len(df)))

    result = df["important"] * factor + df["important"] * factors
   #  and some more operations involving the dataframe and using factor and factors
   
   # do this.
   tmp_df = pd.DataFrame({'ID': df['ID'], 'result': result})
   
   return tmp_df.groupby(by="ID").agg({"result": "sum"}).reset_index()


sg = np.random.SeedSequence(112622607388004198928332989692281456100)
seeds = sg.generate_state(10000)

delayed_results = []
for seed in seeds:
    result = dask.delayed(iteration)(df, seed)
    delayed_results.append(result)
results = dask.compute(*delayed_results)

I'm assuming your operations are vectorized. This should save you some time, because instead of creating a copy you only hold references to the original df and never modify it.
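Going one step further, here is a minimal sketch (the function name iteration_arrays is hypothetical, and it assumes each iteration only needs the "important" and "ID" columns; any other columns it touches would be passed in the same way). It hands each task plain NumPy arrays instead of the whole dataframe, and wraps those arrays in dask.delayed once so they are not re-tokenized for every one of the 10,000 tasks.

import numpy as np
import pandas as pd
import dask
from scipy.stats import norm

def iteration_arrays(important, ids, seed):
    # Work only on the two columns; nothing from the original frame is copied.
    rng = np.random.default_rng(seed)
    factor = norm.ppf(rng.uniform())
    factors = norm.ppf(rng.uniform(size=len(important)))

    result = important * factor + important * factors
    # Sum the per-row results by ID and return a two-column frame.
    return (
        pd.Series(result, name="result")
        .groupby(ids)
        .sum()
        .rename_axis("ID")
        .reset_index()
    )

# Extract the columns once and wrap them in delayed, so every task
# references the same objects instead of hashing them again.
important = dask.delayed(df["important"].to_numpy())
ids = dask.delayed(df["ID"].to_numpy())

sg = np.random.SeedSequence(112622607388004198928332989692281456100)
seeds = sg.generate_state(10000)

delayed_results = [dask.delayed(iteration_arrays)(important, ids, seed) for seed in seeds]
results = dask.compute(*delayed_results)

Whether this pays off depends on how much of the remaining work really only needs those arrays; if the other operations need more columns, pass them along in the same way.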
