Python
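I have thousands of audio files on my local machine, and I run an API with FastAPI that extracts features from an audio file and sends those features back to me as JSON.
The process works as follows: first I send the audio file and get a token back. I can then use the token to check repeatedly whether processing has finished. If it has, I get the features back.
I can successfully post a single audio file: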
import requests

url = 'https://something/submit/'
path = 'dir/interview-10001.wav'
with open(path, 'rb') as fobj:
    res = requests.post(url, files={'file': fobj})
print(res.text)
{"message":"Submitted successfully","token":"abcd"}
...and I manage to retrieve the features for one file like this:
url = 'https://something/features/abcd'
res_feat = requests.get(url)
res_feat.json()
{'filesize': 253812,
'language': 'de',
'language_prob': 0.9755016565322876,
'n_words': 15, ... }
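How can I now make hundreds or thousands of requests in parallel or asynchronously and collect the results?
I tried, for example, this code in a Jupyter notebook (https://dev.to/ndrbrt/python-upload-multiple-files-concurrently-with-aiohttp-and-show-progress-bars-with-tqdm-32l7):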
import asyncio
import os

import aiofiles
import aiohttp
from tqdm import tqdm


class FileManager():
    def __init__(self, file_name: str):
        self.name = file_name
        self.size = os.path.getsize(self.name)
        self.pbar = None

    def __init_pbar(self):
        self.pbar = tqdm(
            total=self.size,
            desc=self.name,
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
            leave=True)

    async def file_reader(self):
        self.__init_pbar()
        chunk_size = 64*1024
        async with aiofiles.open(self.name, 'rb') as f:
            chunk = await f.read(chunk_size)
            while chunk:
                self.pbar.update(chunk_size)
                yield chunk
                chunk = await f.read(chunk_size)
            self.pbar.close()


async def upload(file: FileManager, url: str, session: aiohttp.ClientSession):
    try:
        data = {'file': open(file.name, 'rb')}
        async with session.post(url, data={'data': data}) as res:
            # NB: if you also need the response content, you have to await it
            return res
    except Exception as e:
        # handle error(s) according to your needs
        print(e)


async def main(files):
    url = 'https://something/submit/'
    files = [FileManager(file) for file in files]
    async with aiohttp.ClientSession() as session:
        res = await asyncio.gather(*[upload(file, url, session) for file in files])
    print(f'All files have been uploaded ({len(res)})')
    return res
I run it with:
path_1 = '/media/SPEAKER_01/interview-10001.wav'
path_2 = '/media/SPEAKER_00/interview-10001.wav'
files = [path_1, path_2]
res = await main(files)
res
But I get back:
[422 Unprocessable Entity]>
<CIMultiDictProxy('Content-Length': '89', 'Content-Type': 'application/json', 'Date': 'Fri, 19 Jul 2024 09:54:29 GMT', 'Server': 'envoy', 'x-envoy-upstream-service-time': '23')>, ...
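A likely cause of the 422, assuming the endpoint expects a multipart field named `file` (as the working `requests.post(url, files={'file': fobj})` call suggests): `upload` wraps the open file in a nested dict, `data={'data': {'file': ...}}`, so no form field called `file` reaches the server. Below is a minimal sketch of building the form explicitly with `aiohttp.FormData`. The helper names `build_form` and `upload_one` and the `audio/wav` content type are assumptions for illustration, not part of the original code.

```python
import os

import aiohttp


def build_form(path: str, payload: bytes) -> aiohttp.FormData:
    """Build a multipart form with a single field named 'file'."""
    form = aiohttp.FormData()
    form.add_field(
        "file",                            # same field name as in the requests example
        payload,
        filename=os.path.basename(path),
        content_type="audio/wav",          # assumption: the uploads are WAV files
    )
    return form


async def upload_one(session: aiohttp.ClientSession, url: str, path: str) -> dict:
    """POST one file and return the decoded JSON body."""
    with open(path, "rb") as fobj:
        payload = fobj.read()              # blocking read; acceptable for small files
    async with session.post(url, data=build_form(path, payload)) as res:
        # Read the body inside the context manager, before the
        # connection is released back to the pool.
        return await res.json()
```

Returning the decoded body rather than the response object also avoids handing back a response whose connection has already been released.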
I solved my problem with the following code:
import asyncio
import json
import os

import aiohttp


async def post_audio_file(url: str = None,
                          file_path: str = None,
                          session: aiohttp.ClientSession = None):
    # read_audio is not defined in this post; it is expected to return the file content
    async with session.post(url, data={'file': read_audio(file_path)}) as response:
        response_text = await response.text()
        # The response body is JSON, so parse it as JSON
        response_text = json.loads(response_text)
        response_text['file_name'] = os.path.basename(file_path)
        return response_text
# ----------------------------------------------------- #
async def get_feature(url: str = None,
                      file_name: str = None,
                      token: str = None,
                      session: aiohttp.ClientSession = None):
    # Build the URL with '/' explicitly; os.path.join would use '\' on Windows
    async with session.get(f"{url.rstrip('/')}/{token}") as response:
        json_data = await response.json()
        all_data = {'file_name': file_name}
        all_data.update(json_data)
        status_code = response.status
        return {'file_name': file_name,
                'status_code': status_code,
                'json': all_data}
# ---------------------------------------------------- #
async def post_audio_files(url: str = None,
                           file_paths: list = None,
                           client_session: aiohttp.ClientSession = None):
    # The session is closed when this block exits, so pass a fresh one per call
    async with client_session as session:
        res = await asyncio.gather(*[post_audio_file(url=url,
                                                     file_path=file_path,
                                                     session=session)
                                     for file_path in file_paths])
    print(f'All files have been uploaded ({len(res)})')
    return res
# ----------------------------------------------------- #
async def get_features_stat200(url: str = None,
                               token_file_name_lst: list = None,
                               client_session: aiohttp.ClientSession = None):
    async with client_session as session:
        animation = "|/-\\"
        idx = 0
        res_bool_lst = [False]
        # Poll every token again and again until all requests return status 200
        while not all(res_bool_lst):
            print('Gather and check for results: ',
                  animation[idx % len(animation)], end="\r")
            idx += 1
            res = await asyncio.gather(*[get_feature(url=url,
                                                     file_name=token_file_name[1],
                                                     token=token_file_name[0],
                                                     session=session)
                                         for token_file_name in token_file_name_lst])
            res_bool_lst = [resp_dict['status_code'] == 200 for resp_dict in res]
    print(f'Received features from {len(token_file_name_lst)} tokens.')
    return res
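One thing to watch with `get_features_stat200`: it re-requests every token in each round, including those that already returned 200, and it does not pause between rounds. With thousands of files this can put a lot of load on the server. The following is a rough sketch of an alternative polling loop, not a drop-in replacement. It only re-polls pending tokens, sleeps between rounds, and caps the number of in-flight requests with a semaphore. The name `poll_until_done`, the `fetch` callable, and the defaults for `delay`, `max_rounds` and `max_concurrency` are all assumptions chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable


async def poll_until_done(
    tokens: Iterable[str],
    fetch: Callable[[str], Awaitable[dict]],
    delay: float = 2.0,
    max_rounds: int = 100,
    max_concurrency: int = 20,
) -> Dict[str, dict]:
    """Poll each token until fetch() reports status 200 or rounds run out.

    fetch(token) must return a dict with a 'status_code' key, like the
    dicts returned by get_feature.
    """
    limiter = asyncio.Semaphore(max_concurrency)

    async def fetch_limited(token: str) -> dict:
        async with limiter:                # cap the number of in-flight requests
            return await fetch(token)

    done: Dict[str, dict] = {}
    pending = list(tokens)
    for _ in range(max_rounds):
        if not pending:
            break
        results = await asyncio.gather(*(fetch_limited(t) for t in pending))
        still_pending = []
        for token, result in zip(pending, results):
            if result["status_code"] == 200:
                done[token] = result       # finished: never requested again
            else:
                still_pending.append(token)
        pending = still_pending
        if pending:
            await asyncio.sleep(delay)     # give the server time between rounds
    return done
```

With the functions above, `fetch` could be something like `lambda tok: get_feature(url=url, file_name=names[tok], token=tok, session=session)`, where `names` is a hypothetical token-to-file-name mapping. Tokens that never reach 200 within `max_rounds` are simply absent from the returned dict, so the caller can see which files are still outstanding.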