我正在寻找一种方法,来减少在 pandas DataFrame 上计算多个聚合函数所需的时间。下面是我正在尝试做的事情,在我的电脑上需要 3 分多钟。是否存在可用于聚合函数的 pandarallel 等效方案,或者其他能加速计算的方法?pandarallel 似乎不支持聚合。
这是我使用的代码:
import pandas as pd
import numpy as np
from empyrical import (cagr,
annual_volatility,
max_drawdown,
)
import warnings
warnings.filterwarnings('ignore')
# Example: N simulated price series aggregated into a performance/risk table.
N = 10000
mu = 0.1 / 252
# NOTE(review): mu annualises with 252 days but sigma with 250 — confirm intended.
sigma = 0.15 / np.sqrt(250)
# Business-day date index covering the whole sample period.
date_range = pd.date_range(start='1990-01-01', end='2020-01-01', freq='B')
# Build all N random-walk columns in a single constructor call.  The original
# loop (df[f"Série {i+1}"] = series) inserted 10 000 columns one at a time,
# which fragments the DataFrame (pandas PerformanceWarning) and accounts for a
# large part of the slow setup.
df = pd.DataFrame(
    {
        f"Série {i+1}": 100 + np.random.normal(mu, sigma, len(date_range)).cumsum()
        for i in range(N)
    },
    index=date_range,
)
tab = (df
       .sort_index()
       .aggregate([
           # First / last valid observation date.
           lambda x: x.first_valid_index().date(),
           lambda x: x.last_valid_index().date(),
           # Performance: annualised return over the trailing 1 / 3 / 5 years of
           # weekly data (the 1-year figure is NaN when more than one value is
           # missing).  Explicit .iloc makes the positional slicing unambiguous.
           lambda x: 100*cagr(x.iloc[-52:].pct_change(), period='weekly') if x.iloc[-52:].isnull().sum() <= 1 else np.nan,
           lambda x: 100*cagr(x.iloc[-3*52:].pct_change(), period='weekly'),
           lambda x: 100*cagr(x.iloc[-5*52:].pct_change(), period='weekly'),
           # Risk: annualised volatility over the same windows.
           lambda x: 100*annual_volatility(x.iloc[-52:].pct_change(), period='weekly'),
           lambda x: 100*annual_volatility(x.iloc[-3*52:].pct_change(), period='weekly'),
           lambda x: 100*annual_volatility(x.iloc[-5*52:].pct_change(), period='weekly'),
           # Risk: maximum drawdown over the same windows.
           lambda x: 100*max_drawdown(x.iloc[-52:].pct_change()),
           lambda x: 100*max_drawdown(x.iloc[-3*52:].pct_change()),
           lambda x: 100*max_drawdown(x.iloc[-5*52:].pct_change()),
       ],)
       .set_axis([
           'Date de début', 'Date de fin',
           'Perf 1 an', 'Perf 3 ans', 'Perf 5 ans',
           'Volat 1 an', 'Volat 3 ans', 'Volat 5 ans',
           'Max DD 1 an', 'Max DD 3 ans', 'Max DD 5 ans',
       ])
       .T
       .dropna()
)
tab
您可以尝试使用 `multiprocessing.Pool`,在单独的进程中处理每个 `pd.Series`:
import multiprocessing as mp
import time
import warnings
import numpy as np
import pandas as pd
from empyrical import annual_volatility, cagr, max_drawdown
warnings.filterwarnings("ignore")
def _weekly_returns(x, weeks):
    """Simple returns over the trailing `weeks` observations of `x`."""
    return x.iloc[-weeks:].pct_change()

# One aggregation callable per output column; order matches ``agg_labels``.
agg_funcs = [
    # First / last valid observation dates.
    lambda x: x.first_valid_index().date(),
    lambda x: x.last_valid_index().date(),
    # Performance: annualised return over 1 / 3 / 5 years of weekly data
    # (the 1-year figure is NaN when more than one observation is missing).
    lambda x: (
        100 * cagr(_weekly_returns(x, 52), period="weekly")
        if x.iloc[-52:].isnull().sum() <= 1
        else np.nan
    ),
    lambda x: 100 * cagr(_weekly_returns(x, 3 * 52), period="weekly"),
    lambda x: 100 * cagr(_weekly_returns(x, 5 * 52), period="weekly"),
    # Risk: annualised volatility over the same windows.
    lambda x: 100 * annual_volatility(_weekly_returns(x, 52), period="weekly"),
    lambda x: 100 * annual_volatility(_weekly_returns(x, 3 * 52), period="weekly"),
    lambda x: 100 * annual_volatility(_weekly_returns(x, 5 * 52), period="weekly"),
    # Risk: maximum drawdown over the same windows.
    lambda x: 100 * max_drawdown(_weekly_returns(x, 52)),
    lambda x: 100 * max_drawdown(_weekly_returns(x, 3 * 52)),
    lambda x: 100 * max_drawdown(_weekly_returns(x, 5 * 52)),
]
# Output column labels, aligned one-to-one with ``agg_funcs``: the two date
# columns followed by each metric over the 1 / 3 / 5-year horizons.
agg_labels = ["Date de début", "Date de fin"] + [
    f"{metric} {horizon}"
    for metric in ("Perf", "Volat", "Max DD")
    for horizon in ("1 an", "3 ans", "5 ans")
]
def calculate_agg(series):
    """Apply every aggregation in ``agg_funcs`` to one (name, Series) pair.

    ``series`` is a ``(column_name, pd.Series)`` tuple; the result is a
    ``pd.Series`` indexed by ``agg_labels`` and named after the column, so the
    worker results can be concatenated back into a DataFrame.
    """
    label, values = series
    return pd.Series(
        [fn(values) for fn in agg_funcs],
        index=agg_labels,
        name=label,
    )
if __name__ == "__main__":
# Exemple
N = 10000
# N = 10
mu = 0.1 / 252
sigma = 0.15 / np.sqrt(250)
# Créer un DataFrame vide pour stocker les séries temporelles
date_range = pd.date_range(start="1990-01-01", end="2020-01-01", freq="B")
# Créer un DataFrame vide avec l'index de dates
df = pd.DataFrame(index=date_range)
# Générer les séries temporelles
np.random.seed(42)
for i in range(N):
series = 100 + np.random.normal(mu, sigma, len(date_range)).cumsum()
df[f"Série {i+1}"] = series
start_time = time.time()
df_out = []
with mp.Pool(processes=16) as pool: # <-- adjust number of processes accordingly
for result in pool.imap_unordered(
calculate_agg, ((c, df[c]) for c in df.columns)
):
df_out.append(result)
df_out = pd.concat(df_out, axis=1).T
end_time = time.time()
print(f"--- {time.time() - start_time} seconds ---")
print("Final shape:", df_out.shape)
在我的计算机 (AMD 5700X) 上的输出:
--- 5.798572540283203 seconds ---
Final shape: (10000, 11)