Python:批量上传(POST)大量音频文件并获取结果

问题描述 投票:0回答:1

Python

我的本地计算机上有数千个音频文件,我使用 FastAPI 运行一个 API,该 API 从音频文件中提取特征并将这些特征发送回给我 (json)。

过程如下:首先我发送音频文件,然后得到一个令牌(token)。之后我可以用这个令牌反复查询处理是否已经完成。如果已完成,我就能取回提取出的特征。

我成功发布了一个音频文件:

# Submission endpoint of the feature-extraction API.
url = 'https://something/submit/'

# Audio file to upload.
path = 'dir/interview-10001.wav'

# Upload the file under the form field 'file'; the with-block closes the
# handle once the request has completed.
with open(path, 'rb') as fobj:
    res = requests.post(url, files={'file': fobj})

# Show the raw response body.
print(res.text)

{"message":"Submitted successfully","token":"abcd"}

...我也成功通过以下方式获取了单个文件的特征:

# Feature endpoint; the last path segment is the token returned on submission.
url = 'https://something/features/abcd'

res_feat = requests.get(url)

# Decode the JSON body; as a bare expression this only displays a value in an
# interactive session such as a notebook.
res_feat.json()

{'filesize': 253812,
 'language': 'de',
 'language_prob': 0.9755016565322876,
 'n_words': 15, ... }

我现在如何并行或异步发出数百或数千个请求并收集结果?

我尝试过,例如jupyter笔记本中的这段代码(https://dev.to/ndrbrt/python-upload-multiple-files-concurrently-with-aiohttp-and-show-progress-bars-with-tqdm-32l7):

class FileManager():
    """Wrap one file on disk and stream it in chunks with a tqdm progress bar.

    Attributes:
        name: Path of the file, as passed to the constructor.
        size: Size of the file in bytes, read once at construction time.
        pbar: The tqdm progress bar; ``None`` until ``file_reader`` starts.
    """

    def __init__(self, file_name: str):
        self.name = file_name
        self.size = os.path.getsize(self.name)
        self.pbar = None

    def __init_pbar(self):
        """Create the progress bar, sized to the file's byte count."""
        self.pbar = tqdm(
            total=self.size,
            desc=self.name,
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
            leave=True)

    async def file_reader(self):
        """Asynchronously yield the file's content in chunks of up to 64 KiB.

        The progress bar is advanced by the number of bytes actually read and
        is closed when the generator finishes, fails, or is closed early.
        """
        self.__init_pbar()
        chunk_size = 64*1024
        try:
            async with aiofiles.open(self.name, 'rb') as f:
                chunk = await f.read(chunk_size)
                while chunk:
                    # The final chunk is usually shorter than chunk_size;
                    # counting real bytes keeps the bar from overshooting.
                    self.pbar.update(len(chunk))
                    yield chunk
                    chunk = await f.read(chunk_size)
        finally:
            # Close the bar even if reading raised or the consumer stopped
            # iterating, so no stale bar is left behind.
            self.pbar.close()


async def upload(file: FileManager, url: str, session: aiohttp.ClientSession):
    """POST one file to *url* using the shared aiohttp *session*.

    Returns the response object, or ``None`` when an exception was caught
    (the exception is only printed).
    """
    try:
        # NOTE(review): this handle is opened but never closed in this function.
        data = {'file': open(file.name, 'rb')}
        # NOTE(review): the payload is the dict above nested under the key
        # 'data', so no top-level 'file' field is sent. This is presumably the
        # cause of the 422 response; the working single-file request passes
        # the file object directly under 'file'. Confirm against the API.
        async with session.post(url, data={'data': data}) as res:
            # NB: if you also need the response content, you have to await it
            # NOTE(review): returning from inside the async-with means the
            # context manager has already exited when the caller receives res.
            return res
    except Exception as e:
        # handle error(s) according to your needs
        print(e)


async def main(files):
    """Upload every path in *files* concurrently and return the gathered results.

    Each path is wrapped in a FileManager, all uploads share one aiohttp
    session, and the results come back in the same order as *files*.
    """
    url = 'https://something/submit/'

    managers = []
    for file_path in files:
        managers.append(FileManager(file_path))

    async with aiohttp.ClientSession() as session:
        # Create all upload coroutines first, then await them together.
        pending = [upload(manager, url, session) for manager in managers]
        res = await asyncio.gather(*pending)

    print(f'All files have been uploaded ({len(res)})')

    return res

我这样运行它:

# Two sample recordings to upload.
path_1 = '/media/SPEAKER_01/interview-10001.wav'

path_2 = '/media/SPEAKER_00/interview-10001.wav'

files = [path_1, path_2]

# Top-level await: works in an environment that already runs an event loop
# (the question runs this in a Jupyter notebook), not in a plain script.
res = await main(files)

# Bare expression so the notebook displays the gathered responses.
res

但我得到的返回结果是:

[<ClientResponse(https://something/submit/) [422 Unprocessable Entity]>
 <CIMultiDictProxy('Content-Length': '89', 'Content-Type': 'application/json', 'Date': 'Fri, 19 Jul 2024 09:54:29 GMT', 'Server': 'envoy', 'x-envoy-upstream-service-time': '23')>, ...
python asynchronous audio https
1个回答
0
投票

我用以下代码解决了我的问题:

async def post_audio_file(url: str = None,
                   file_path: str = None,
                   session: 'aiohttp.ClientSession' = None):
    """Upload one audio file and return the parsed submission response.

    Args:
        url: Submission endpoint.
        file_path: Path of the audio file to upload.
        session: An open aiohttp client session used for the request.

    Returns:
        The decoded JSON response with an added ``'file_name'`` key holding
        the base name of *file_path*, so the token can be matched to its file.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    # Local import so the block does not rely on a module-level `import json`.
    import json

    # NOTE(review): `read_audio` is not defined in the visible code; it
    # presumably returns the file's bytes or an open handle. Confirm.
    async with session.post(url, data={'file': read_audio(file_path)}) as response:
        response_text = await response.text()

    # The body is JSON, not a Python literal: ast.literal_eval would fail on
    # true/false/null, so parse it with the JSON decoder.
    payload = json.loads(response_text)
    payload['file_name'] = os.path.basename(file_path)

    return payload

# ----------------------------------------------------- #

async def get_feature(url: str = None,
                  file_name: str = None,
                  token: str = None,
                  session: 'aiohttp.ClientSession' = None):
    """Request the features for one token and report the HTTP status.

    Args:
        url: Base URL of the feature endpoint; the token is appended to it.
        file_name: Name of the audio file the token belongs to.
        token: Token returned when the file was submitted.
        session: An open aiohttp client session used for the request.

    Returns:
        A dict with ``'file_name'``, ``'status_code'`` (the HTTP status) and
        ``'json'`` (the decoded response body merged over
        ``{'file_name': file_name}``).
    """
    # Join with '/' explicitly: os.path.join uses the OS path separator,
    # which is a backslash on Windows and would produce an invalid URL.
    feature_url = f"{url.rstrip('/')}/{token}"

    async with session.get(feature_url) as response:
        json_data = await response.json()
        status_code = response.status

    all_data = {'file_name': file_name}
    all_data.update(json_data)

    return {'file_name': file_name,
            'status_code': status_code,
            'json': all_data}

# ---------------------------------------------------- #

async def post_audio_files(url: str = None,
           file_paths: list = None,
           client_session: 'aiohttp.ClientSession()' = None):
    """Upload all *file_paths* concurrently and return their responses.

    The given *client_session* is entered as an async context manager, so it
    is closed when this function returns. Results are returned in the same
    order as *file_paths*.
    """
    async with client_session as session:
        uploads = []
        for file_path in file_paths:
            uploads.append(post_audio_file(url=url,
                                           file_path=file_path,
                                           session=session))
        res = await asyncio.gather(*uploads)

    print(f'All files have been uploaded ({len(res)})')

    return res

# ----------------------------------------------------- #

async def get_features_stat200(url: str = None,
               token_file_name_lst: list = None,
               client_session: 'aiohttp.ClientSession()' = None,
               poll_interval: float = 1.0):
    """Poll the feature endpoint until every token answers with HTTP 200.

    Args:
        url: Base URL of the feature endpoint.
        token_file_name_lst: Sequence of ``(token, file_name)`` pairs; index 0
            is the token and index 1 the file name.
        client_session: aiohttp client session; it is entered as an async
            context manager and therefore closed when this function returns.
        poll_interval: Seconds to wait between polling rounds, so the server
            is not queried in a tight loop.

    Returns:
        The list of result dicts from the final polling round, one per token,
        in the order of *token_file_name_lst*.

    Note:
        There is no upper bound on the number of rounds: if a token never
        reaches status 200, this keeps polling.
    """
    animation = "|/-\\"
    idx = 0

    async with client_session as session:
        while True:
            # Spinner on a single console line ("\r" returns to line start).
            print('Gather and check for results: ',
                  animation[idx % len(animation)], end="\r")
            idx += 1

            res = await asyncio.gather(*[get_feature(url=url,
                                                     file_name=token_file_name[1],
                                                     token=token_file_name[0],
                                                     session=session)
                                         for token_file_name in token_file_name_lst])

            if all(resp_dict['status_code'] == 200 for resp_dict in res):
                break

            # Give the server time to finish before asking again.
            await asyncio.sleep(poll_interval)

    # The original referenced an undefined name here (`token_lst`), which
    # raised NameError after all results had been collected.
    print(f'Received features from {len(token_file_name_lst)} tokens.')

    return res
© www.soinside.com 2019 - 2024. All rights reserved.