使用异步重写 http 请求

问题描述 投票:0回答:1

我正在尝试使用 asyncio (async & await) 重写 python 脚本。 该脚本从文件中读取数据,然后使用请求库根据 Web 服务检查每一行。我了解到我必须将请求更改为启用异步的“httpx”库,但没有掌握如何使用 asyncio 的想法,希望从这个例子中学习。 将请求更改为 httpx 后,脚本如下所示。

#!/usr/bin/env python3                                                                                                                                                                                    
                                                                                                                                                                                                          
import sys                                                                                                                                                                                                
import json                                                                                                                                                                                               
import httpx                                                                                                                                                                                              
def main( ):                                                                                                                                                                                              
    with  open('output','w') as res:                                                                                                                                                                      
        for rr in sys.stdin:                                                                                                                                                                              
            r=rr.rstrip().split();                                                                                                                                                                        
            rsp=httpx.post('http://localhost:8080/api/v1/service',                                                                                                                                        
            ->>>json={'f1': r[0], 'f2': r[1] })                                                                                                                                                           
            json_result=json.loads(rsp.text)                                                                                                                                                              
            if json_result['error'] :                                                                                                                                                                     
                  print (rr.rstrip(),file=res);                                                                                                                                                           
                  print (rsp.text,file=res,flush=True)                                                                                                                                                    
main()                                                                                                                                                                                                    

我可以运行此脚本的多个实例,因为该服务能够并行处理多个请求。我想更改脚本,以便它可以自己并行发送多个请求。 我想保留关于“for 循环”的脚本的整体结构,因为在更复杂的场景中,输入数据将从数据库中读取(查询中的记录:)。如果可能的话,我希望在达到预先配置的未完成请求数时暂停读取输入数据,并在此类请求数减少时恢复。

这可能吗?实现该目标的最佳方法是什么?

python asynchronous python-requests
1个回答
0
投票

是的,您可以修改脚本以并行发送多个请求,同时保持脚本的整体结构不变。给你:

#!/usr/bin/env python3

import sys
import json
import httpx
import concurrent.futures

def send_request(url, data):
    with httpx.Client() as client:
        rsp = client.post(url, json=data)
        json_result = json.loads(rsp.text)
        if json_result['error']:
            return (data, rsp.text)
        else:
            return None

def main():
    with open('output', 'w') as res:
        url = 'http://localhost:8080/api/v1/service'
        max_requests = 10  # Set the maximum number of concurrent requests
        data_list = []
        for rr in sys.stdin:
            r = rr.rstrip().split()
            data_list.append({'f1': r[0], 'f2': r[1]})
            if len(data_list) >= max_requests:
                with concurrent.futures.ThreadPoolExecutor(max_workers=max_requests) as executor:
                    futures = [executor.submit(send_request, url, data) for data in data_list]
                    for future in concurrent.futures.as_completed(futures):
                        result = future.result()
                        if result:
                            print(result[0], file=res)
                            print(result[1], file=res, flush=True)
                data_list = []
        if data_list:
            with concurrent.futures.ThreadPoolExecutor(max_workers=len(data_list)) as executor:
                futures = [executor.submit(send_request, url, data) for data in data_list]
                for future in concurrent.futures.as_completed(futures):
                    result = future.result()
                    if result:
                        print(result[0], file=res)
                        print(result[1], file=res, flush=True)

main()
© www.soinside.com 2019 - 2024. All rights reserved.