如何使用异步Python代码优化我的性能

问题描述 投票:0回答:2

我正在寻求优化我的代码以便更快地处理信息。第一次玩异步请求。而且对 Python 来说还是个新手。我希望我的代码有意义。

我使用 FastAPI 作为框架。和 aiohttp 发送我的请求。 现在,我只对获取每个搜索单词的结果总数感兴趣。之后我会将 json 转储到数据库中。

我的代码正在向公共 crossref API 发送请求 (crossref) 例如,我正在搜索从 2022-06-02 到 2022-06-03(含)的术语。正在搜索的术语有:“纸”(3146 个结果)、“铵”(1430 个结果)和“漂白剂”(23 个结果)。示例:

https://api.crossref.org/works?rows=1000&sort=created&[email protected]&query=paper&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=*

这将返回 3146 行。我一次只需要搜索一个术语。我也没有尝试每天拆分它来看看它是否更快。

这里面还有一个递归上下文。这就是我觉得我对异步概念处理不当的地方。这就是为什么我需要递归调用。

深度分页请求

使用游标的深度分页可用于迭代大型结果集,而对其大小没有任何限制。 要使用深度分页,请像平常一样进行查询,但包含值为 * 的游标参数,例如:

https://api.crossref.org/works?rows=1000&sort=created&[email protected]&query=ammonium&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=*

JSON 响应中将提供下一个光标字段。要获取下一页结果,请将 next-cursor 的值作为游标参数传递。例如:

https://api.crossref.org/works?rows=1000&sort=created&[email protected]&query=ammonium&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=<value of next-cursor parameter>

来自 CrossRef 文档的建议

客户应检查退回物品的数量。如果返回的项目数等于预期行数,则已到达结果集末尾。超出此点使用下一个光标将导致响应具有空项目列表。

仅 3 个单词(和 7 个请求),我的处理时间仍然非常快,超过 15 秒。如果可能的话,我想把它减少到 5 秒以内?使用postman,最长的请求花了大约4秒才返回

这是我到目前为止所拥有的,如果你想尝试一下的话。

schema.py

class CrossRefSearchRequest(BaseModel):
    keywords: List[str]
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None

控制器.py

import time
from fastapi import FastAPI, APIRouter, Request

app = FastAPI(title="CrossRef API", openapi_url=f"{settings.API_V1_STR}/openapi.json")
api_router = APIRouter()
service = CrossRefService()

@api_router.post("/search", status_code=201)
async def search_keywords(*, search_args: CrossRefSearchRequest) -> dict:
    fixed_search_args = {
        "sort": "created",
        "rows": "1000",
        "cursor": "*"
    }
    results = await service.cross_ref_request(search_args, **fixed_search_args)
    return {k: len(v) for k, v in results.items()}

# sets the header X-Process-Time, in order to have the time for each request
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

app.include_router(api_router)

if __name__ == "__main__":
    # Use this for debugging purposes only
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8001, log_level="debug")

服务.py

from datetime import datetime, timedelta

def _setup_date_default(date_from_req: datetime, date_to_req: datetime):
    yesterday = datetime.utcnow()- timedelta(days=1)

    date_from = yesterday if date_from_req is None else date_from_req
    date_to = yesterday if date_to_req is None else date_to_req

    return date_from.strftime(DATE_FORMAT_CROSS_REF), date_to.strftime(DATE_FORMAT_CROSS_REF)


class CrossRefService:

    def __init__(self):
        self.client = CrossRefClient()
    
    # my recursive call for the next cursor
    async def _send_client_request(self ,final_result: dict[str, list[str]], keywords: [str], date_from: str, date_to: str, **kwargs):
        json_responses = await self.client.cross_ref_request_date_range(keywords, date_from, date_to, **kwargs)

        for json_response in json_responses:

            message = json_response.get('message', {})
            keyword = message.get('query').get('search-terms')
            next_cursor = message.get('next-cursor')
            total_results = message.get('total-results')
            search_results = message.get('items', [{}]) if total_results > 0 else []

            if final_result[keyword] is None:
                final_result[keyword] = search_results
            else:
                final_result[keyword].extend(search_results)

            if total_results > int(kwargs['rows']) and len(search_results) == int(kwargs['rows']):
                kwargs['cursor'] = next_cursor
                await self._send_client_request(final_result, [keyword], date_from, date_to, **kwargs)

    async def cross_ref_request(self, request: CrossRefSearchRequest, **kwargs) -> dict[str, list[str]]:
        date_from, date_to = _setup_date(request.date_from, request.date_to)
        results: dict[str, list[str]] = dict.fromkeys(request.keywords)

        await self._send_client_request(results, request.keywords, date_from, date_to, **kwargs)

        return results

客户端.py

import asyncio
from aiohttp import ClientSession

async def _send_request_task(session: ClientSession, url: str):
    try:
        async with session.get(url) as response:
            await response.read()
            return response
    # exception handler to come
    except Exception as e:
        print(f"exception for {url}")
        print(str(e))
        
        
class CrossRefClient:
    base_url = "https://api.crossref.org/works?" \
               "query={}&" \
               "filter=from-index-date:{},until-index-date:{}&" \
               "sort={}&" \
               "rows={}&" \
               "cursor={}"

    def __init__(self) -> None:
        self.headers = {
            "User-Agent": f"my_app/v0.1 (example.com/; mailto:[email protected]) using FastAPI"
        }

    async def cross_ref_request_date_range(
            self, keywords: [str], date_from: str, date_to: str, **kwargs
    ) -> list:
        async with ClientSession(headers=self.headers) as session:
            tasks = [
                asyncio.create_task(
                    _send_request_task(session, self.base_url.format(
                             keyword, date_from, date_to, kwargs['sort'], kwargs['rows'], kwargs['cursor']
                         )),
                    name=TASK_NAME_BASE.format(keyword, date_from, date_to)
                )
                for keyword in keywords
            ]

            responses = await asyncio.gather(*tasks)

            return [await response.json() for response in responses]

如何更好地优化这个并更好地使用异步调用?此外,这种递归循环可能也不是最好的方法。对此还有什么想法吗?

我实现了一个同步调用的解决方案,它甚至更慢。所以我想我离这个还不算太远。

谢谢!

python python-3.x asynchronous recursion fastapi
2个回答
1
投票

您的代码看起来不错,并且您没有滥用异步概念。

也许您受到客户端会话数量的限制,一次最多只能有 100 个连接。看看 https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.BaseConnector

也许上游服务器只是对大量请求响应缓慢。


0
投票

创建每个任务后,使用await asyncio.sleep(0)创建任务时应该返回到事件循环,否则所有任务将按顺序执行。

© www.soinside.com 2019 - 2024. All rights reserved.