我有一个约 14,000 行的数据框,并试图通过调用 API 将一些数据填充到新列中。下面的代码检索了预期的响应,但是,似乎每次迭代都在等待响应以转到下一行。
这里是函数:
def market_sector_des(isin):
isin = '/isin/' + isin
return blp.bdp(tickers = isin, flds = ['market_sector_des']).iloc[0]
我正在使用 xbbg 调用 Bloomberg API。
.apply() 函数返回预期的响应,
df['new_column'] = df['ISIN'].apply(market_sector_des)
但每个响应大约需要 2 秒,在 14,000 行时大约需要 8 小时。
有没有办法让这个apply函数异步,让所有的请求都并行发送?我已经将 dask 视为替代方案,但是,我也遇到了使用它的问题。
您可以使用
multiprocessing
并行化 API 调用。将您的 Series 分成 THREAD 块,然后每个块运行一个进程:
import multiprocessing as mp
import pandas as pd
import numpy as np
THREADS = mp.cpu_count() - 1
def market_sector_des(isin):
isin = '/isin/' + isin
return isin.lower()
return blp.bdp(tickers = isin, flds = ['market_sector_des']).iloc[0]
def proxy_func(sr):
return pd.Series([market_sector_des(isin) for isin in sr], index=sr.index)
if __name__ == '__main__':
# df = your_dataframe_here
split = np.array_split(df['ISIN'], THREADS)
with mp.Pool(THREADS) as pool:
data = pool.map(proxy_func, split)
df['new_column'] = pd.concat(data)