我正在寻求优化我的代码以便更快地处理信息。第一次玩异步请求。而且对 Python 来说还是个新手。我希望我的代码有意义。
我使用 FastAPI 作为框架。和 aiohttp 发送我的请求。 现在,我只对获取每个搜索单词的结果总数感兴趣。之后我会将 json 转储到数据库中。
我的代码正在向公共 crossref API 发送请求 (crossref) 例如,我正在搜索从 2022-06-02 到 2022-06-03(含)的术语。正在搜索的术语有:“纸”(3146 个结果)、“铵”(1430 个结果)和“漂白剂”(23 个结果)。示例:
https://api.crossref.org/works?rows=1000&sort=created&[email protected]&query=paper&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=*
这将返回 3146 行。我一次只需要搜索一个术语。我也没有尝试每天拆分它来看看它是否更快。
这里面还有一个递归上下文。这就是我觉得我对异步概念处理不当的地方。这就是为什么我需要递归调用。
深度分页请求
使用游标的深度分页可用于迭代大型结果集,而对其大小没有任何限制。 要使用深度分页,请像平常一样进行查询,但包含值为 * 的游标参数,例如:
https://api.crossref.org/works?rows=1000&sort=created&[email protected]&query=ammonium&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=*
JSON 响应中将提供下一个光标字段。要获取下一页结果,请将 next-cursor 的值作为游标参数传递。例如:
https://api.crossref.org/works?rows=1000&sort=created&[email protected]&query=ammonium&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=<value of next-cursor parameter>
来自 CrossRef 文档的建议
客户应检查退回物品的数量。如果返回的项目数等于预期行数,则已到达结果集末尾。超出此点使用下一个光标将导致响应具有空项目列表。
仅 3 个单词(和 7 个请求),我的处理时间仍然非常快,超过 15 秒。如果可能的话,我想把它减少到 5 秒以内?使用postman,最长的请求花了大约4秒才返回
这是我到目前为止所拥有的,如果你想尝试一下的话。
schema.py
class CrossRefSearchRequest(BaseModel):
keywords: List[str]
date_from: Optional[datetime] = None
date_to: Optional[datetime] = None
控制器.py
import time
from fastapi import FastAPI, APIRouter, Request
app = FastAPI(title="CrossRef API", openapi_url=f"{settings.API_V1_STR}/openapi.json")
api_router = APIRouter()
service = CrossRefService()
@api_router.post("/search", status_code=201)
async def search_keywords(*, search_args: CrossRefSearchRequest) -> dict:
fixed_search_args = {
"sort": "created",
"rows": "1000",
"cursor": "*"
}
results = await service.cross_ref_request(search_args, **fixed_search_args)
return {k: len(v) for k, v in results.items()}
# sets the header X-Process-Time, in order to have the time for each request
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
app.include_router(api_router)
if __name__ == "__main__":
# Use this for debugging purposes only
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001, log_level="debug")
服务.py
from datetime import datetime, timedelta
def _setup_date_default(date_from_req: datetime, date_to_req: datetime):
yesterday = datetime.utcnow()- timedelta(days=1)
date_from = yesterday if date_from_req is None else date_from_req
date_to = yesterday if date_to_req is None else date_to_req
return date_from.strftime(DATE_FORMAT_CROSS_REF), date_to.strftime(DATE_FORMAT_CROSS_REF)
class CrossRefService:
def __init__(self):
self.client = CrossRefClient()
# my recursive call for the next cursor
async def _send_client_request(self ,final_result: dict[str, list[str]], keywords: [str], date_from: str, date_to: str, **kwargs):
json_responses = await self.client.cross_ref_request_date_range(keywords, date_from, date_to, **kwargs)
for json_response in json_responses:
message = json_response.get('message', {})
keyword = message.get('query').get('search-terms')
next_cursor = message.get('next-cursor')
total_results = message.get('total-results')
search_results = message.get('items', [{}]) if total_results > 0 else []
if final_result[keyword] is None:
final_result[keyword] = search_results
else:
final_result[keyword].extend(search_results)
if total_results > int(kwargs['rows']) and len(search_results) == int(kwargs['rows']):
kwargs['cursor'] = next_cursor
await self._send_client_request(final_result, [keyword], date_from, date_to, **kwargs)
async def cross_ref_request(self, request: CrossRefSearchRequest, **kwargs) -> dict[str, list[str]]:
date_from, date_to = _setup_date(request.date_from, request.date_to)
results: dict[str, list[str]] = dict.fromkeys(request.keywords)
await self._send_client_request(results, request.keywords, date_from, date_to, **kwargs)
return results
客户端.py
import asyncio
from aiohttp import ClientSession
async def _send_request_task(session: ClientSession, url: str):
try:
async with session.get(url) as response:
await response.read()
return response
# exception handler to come
except Exception as e:
print(f"exception for {url}")
print(str(e))
class CrossRefClient:
base_url = "https://api.crossref.org/works?" \
"query={}&" \
"filter=from-index-date:{},until-index-date:{}&" \
"sort={}&" \
"rows={}&" \
"cursor={}"
def __init__(self) -> None:
self.headers = {
"User-Agent": f"my_app/v0.1 (example.com/; mailto:[email protected]) using FastAPI"
}
async def cross_ref_request_date_range(
self, keywords: [str], date_from: str, date_to: str, **kwargs
) -> list:
async with ClientSession(headers=self.headers) as session:
tasks = [
asyncio.create_task(
_send_request_task(session, self.base_url.format(
keyword, date_from, date_to, kwargs['sort'], kwargs['rows'], kwargs['cursor']
)),
name=TASK_NAME_BASE.format(keyword, date_from, date_to)
)
for keyword in keywords
]
responses = await asyncio.gather(*tasks)
return [await response.json() for response in responses]
如何更好地优化这个并更好地使用异步调用?此外,这种递归循环可能也不是最好的方法。对此还有什么想法吗?
我实现了一个同步调用的解决方案,它甚至更慢。所以我想我离这个还不算太远。
谢谢!
您的代码看起来不错,并且您没有滥用异步概念。
也许您受到客户端会话数量的限制,一次最多只能有 100 个连接。看看 https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.BaseConnector
也许上游服务器只是对大量请求响应缓慢。
创建每个任务后,使用await asyncio.sleep(0)创建任务时应该返回到事件循环,否则所有任务将按顺序执行。