为什么我的异步代码没有比同步更快?

问题描述 投票:0回答:1

因此,我构建了一个数据管道,它从 api 端点获取报告,进行一些基本的清理转换,然后将数据连接到输出数据帧中。最初我使用了多处理,但这在我们使用的平台中不再是一个选项。由于渴望尝试异步,我制作了一个测试套件,并使用异步、多线程和同步循环调用,将 n 次运行的性能与 4 个报告进行了比较。 然而,令我惊讶的是,我的异步代码在统计上(使用 anova 1-way)并不比同步代码快。我只能假设我没有以最佳方式编写异步。如果这里有问题,有人可以建议吗:

class AsyncApiCaller(ApiCaller):
    # snip
    async def async_build_and_assert_dataframe_from_csv(self, data):
        df = pd.read_csv(StringIO(data))
        if len(df) > 0:
            print("Warning: Dataframe is empty")
        return df

    async def process(self, session, url):
        result = await self.api_get(session, url)
        return await self.async_build_and_assert_dataframe_from_csv(
            result.decode("utf-8")
        )

    async def make_all_requests(self, ids):
        async with aiohttp.ClientSession() as session:
            tasks = set()
            for url in ids:
                task = asyncio.create_task(coro=self.process(session=session, url=url))
                tasks.add(task)
            return await asyncio.gather(*tasks, return_exceptions=False)

    async def api_get(self, session, url: str):
        async with session.get(url, headers=self.headers) as response:
            return await response.content.read()

    async def main(self) -> pd.DataFrame:
        # snip
        urls = # a list of urls to run
        results = await self.make_all_requests(urls)

        # concat results into single DataFrame
        final_df = pd.concat(results)
        final_df = self.format_df(final_df)
        # snip
        return final_df
    #snip

父类是同步版本,我将在这里省略大部分细节...

class ApiCaller:
   # snip
    def go_single(self, id: ID) -> pd.DataFrame:
        url = self.get_url(id) # turn id into an endpoint url
        response = self.call_api(url, "GET", headers) # requests library wrapper 
        data = self.parse_json_response(response, "data")
        dataframe = self.build_and_assert_dataframe_from_json(data)

        return dataframe

    def main(self) -> pd.DataFrame:
        # snip
        ids = # list of ids
        tasks = [self.go_single(id) for id in ids]
        dataframe = pd.concat(list(tasks), axis=0, ignore_index=False)
        dataframe = self.format_df(dataframe)
        # snip
        return dataframe
        

多线程代码省略。

python multithreading performance asynchronous synchronous
1个回答
0
投票

1-确保您没有每次都重新创建 ClientSession。

2-如果存在 API 速率限制,则使用 asyncio.Semaphore 限制并发请求。

3-不要使用 create_task,而是直接尝试使用 asyncio.gather

tasks = [self.process(session, url) for url in ids]
results = await asyncio.gather(*tasks)

4-将 asyncio.to_thread 用于 CPU 密集型任务,例如 pd.read_csv 和 pd.concat

df = await asyncio.to_thread(pd.read_csv, StringIO(data))

5-速度还取决于您的计算资源、CPU 核心和可用的工作线程。

© www.soinside.com 2019 - 2024. All rights reserved.