我正在尝试使用 asyncio (async & await) 重写 python 脚本。 该脚本从文件中读取数据,然后使用请求库根据 Web 服务检查每一行。我了解到我必须将请求更改为启用异步的“httpx”库,但没有掌握如何使用 asyncio 的想法,希望从这个例子中学习。 将请求更改为 httpx 后,脚本如下所示。
#!/usr/bin/env python3
import sys
import json
import httpx
def main( ):
with open('output','w') as res:
for rr in sys.stdin:
r=rr.rstrip().split();
rsp=httpx.post('http://localhost:8080/api/v1/service',
->>>json={'f1': r[0], 'f2': r[1] })
json_result=json.loads(rsp.text)
if json_result['error'] :
print (rr.rstrip(),file=res);
print (rsp.text,file=res,flush=True)
main()
我可以运行此脚本的多个实例,因为该服务能够并行处理多个请求。我想更改脚本,以便它可以自己并行发送多个请求。 我想保留关于“for 循环”的脚本的整体结构,因为在更复杂的场景中,输入数据将从数据库中读取(查询中的记录:)。如果可能的话,我希望在达到预先配置的未完成请求数时暂停读取输入数据,并在此类请求数减少时恢复。
这可能吗?实现该目标的最佳方法是什么?
是的,您可以修改脚本以并行发送多个请求,同时保持脚本的整体结构不变。给你:
#!/usr/bin/env python3
import sys
import json
import httpx
import concurrent.futures
def send_request(url, data):
with httpx.Client() as client:
rsp = client.post(url, json=data)
json_result = json.loads(rsp.text)
if json_result['error']:
return (data, rsp.text)
else:
return None
def main():
with open('output', 'w') as res:
url = 'http://localhost:8080/api/v1/service'
max_requests = 10 # Set the maximum number of concurrent requests
data_list = []
for rr in sys.stdin:
r = rr.rstrip().split()
data_list.append({'f1': r[0], 'f2': r[1]})
if len(data_list) >= max_requests:
with concurrent.futures.ThreadPoolExecutor(max_workers=max_requests) as executor:
futures = [executor.submit(send_request, url, data) for data in data_list]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
print(result[0], file=res)
print(result[1], file=res, flush=True)
data_list = []
if data_list:
with concurrent.futures.ThreadPoolExecutor(max_workers=len(data_list)) as executor:
futures = [executor.submit(send_request, url, data) for data in data_list]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
print(result[0], file=res)
print(result[1], file=res, flush=True)
main()