因此,我构建了一个数据管道,它从 api 端点获取报告,进行一些基本的清理转换,然后将数据连接到输出数据帧中。最初我使用了多处理,但这在我们使用的平台中不再是一个选项。由于渴望尝试异步,我制作了一个测试套件,并使用异步、多线程和同步循环调用,将 n 次运行的性能与 4 个报告进行了比较。 然而,令我惊讶的是,我的异步代码在统计上(使用 anova 1-way)并不比同步代码快。我只能假设我没有以最佳方式编写异步。如果这里有问题,有人可以建议吗:
class AsyncApiCaller(ApiCaller):
# snip
async def async_build_and_assert_dataframe_from_csv(self, data):
df = pd.read_csv(StringIO(data))
if len(df) > 0:
print("Warning: Dataframe is empty")
return df
async def process(self, session, url):
result = await self.api_get(session, url)
return await self.async_build_and_assert_dataframe_from_csv(
result.decode("utf-8")
)
async def make_all_requests(self, ids):
async with aiohttp.ClientSession() as session:
tasks = set()
for url in ids:
task = asyncio.create_task(coro=self.process(session=session, url=url))
tasks.add(task)
return await asyncio.gather(*tasks, return_exceptions=False)
async def api_get(self, session, url: str):
async with session.get(url, headers=self.headers) as response:
return await response.content.read()
async def main(self) -> pd.DataFrame:
# snip
urls = # a list of urls to run
results = await self.make_all_requests(urls)
# concat results into single DataFrame
final_df = pd.concat(results)
final_df = self.format_df(final_df)
# snip
return final_df
#snip
父类是同步版本,我将在这里省略大部分细节...
class ApiCaller:
# snip
def go_single(self, id: ID) -> pd.DataFrame:
url = self.get_url(id) # turn id into an endpoint url
response = self.call_api(url, "GET", headers) # requests library wrapper
data = self.parse_json_response(response, "data")
dataframe = self.build_and_assert_dataframe_from_json(data)
return dataframe
def main(self) -> pd.DataFrame:
# snip
ids = # list of ids
tasks = [self.go_single(id) for id in ids]
dataframe = pd.concat(list(tasks), axis=0, ignore_index=False)
dataframe = self.format_df(dataframe)
# snip
return dataframe
多线程代码省略。
1-确保您没有每次都重新创建 ClientSession。
2-如果存在 API 速率限制,则使用 asyncio.Semaphore 限制并发请求。
3-不要使用 create_task,而是直接尝试使用 asyncio.gather
tasks = [self.process(session, url) for url in ids]
results = await asyncio.gather(*tasks)
4-将 asyncio.to_thread 用于 CPU 密集型任务,例如 pd.read_csv 和 pd.concat
df = await asyncio.to_thread(pd.read_csv, StringIO(data))
5-速度还取决于您的计算资源、CPU 核心和可用的工作线程。