将音频流式传输到 Azure 实时分类服务

问题描述 投票:0回答:1

我有一个 udp 音频流,我想尽可能接近实时地转录。

为此,我正在使用 Azure 语音服务(https://learn.microsoft.com/en-us/azure/ai-services/speech-service/get-started-stt-diarization?tabs=windows&pivots=programming -language-python) 在 python 中。

使用麦克风作为源输入,系统运行良好。然而,当尝试使用 PullAudioStreamCallback 选项时,该服务甚至不会进入 read 方法。

对这种方法有什么想法吗?

class MyAudioStream(speechsdk.audio.PullAudioInputStreamCallback):
    def __init__(self, socket, channel):
        super().__init__()
        print("Init")
        self.socket = socket
        self.channel = channel  # Channel 0 for left, 1 for right
        self.buffer = bytearray()

    def read(self, buffer_size):
        # Fetch new data from socket if buffer is insufficient
        while len(self.buffer) < buffer_size * 2:  # x2 because stereo data
            data, _ = self.socket.recvfrom(buffer_size * 2)
            self.buffer.extend(data)

        # Extract the requested buffer size * 2 (to handle stereo data)
        data = self.buffer[:buffer_size * 2]
        del self.buffer[:buffer_size * 2]

        # Assuming 16-bit samples, split the stereo into mono
        audio_data = np.frombuffer(data, dtype=np.int16)  # Read data as 16-bit samples
        mono_data = audio_data[self.channel::2]  # Extract one channel
        # print(audio_data)
        return mono_data.tobytes()

    def close(self):
        print("Closed")
        self.socket.close()
        
def conversation_transcriber_recognition_canceled_cb(evt: speechsdk.SessionEventArgs):
    print('Canceled event')

def conversation_transcriber_session_stopped_cb(evt: speechsdk.SessionEventArgs):
    print('SessionStopped event')

def conversation_transcriber_transcribed_cb(evt: speechsdk.SpeechRecognitionEventArgs, file_handle):
    if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
        s_id, text = evt.result.speaker_id, evt.result.text
        file_handle.write('Speaker ID={}: {}\n'.format(s_id, text))
        print('Speaker ID={}: {}\n'.format(s_id, text))
    elif evt.result.reason == speechsdk.ResultReason.NoMatch:
        file_handle.write('NOMATCH: Speech could not be TRANSCRIBED: {}\n'.format(evt.result.no_match_details))


def conversation_transcriber_session_started_cb(evt: speechsdk.SessionEventArgs):
    print('SessionStarted event')


def create_transcriber(sock, sample_rate, channel):
    
    speech_config = speechsdk.SpeechConfig(subscription=key_, region=region_)
    speech_config.speech_recognition_language="en-US"
    # Set the expected audio format (monaural)
    audio_format = speechsdk.audio.AudioStreamFormat(samples_per_second=sample_rate, bits_per_sample=16, channels=1)
    # Create an instance of PullAudioInputStream with the custom stream callback
    stream_callback = MyAudioStream(sock, channel)
    audio_input = speechsdk.audio.PullAudioInputStream(stream_callback, audio_format)
    audio_config = speechsdk.audio.AudioConfig(stream=audio_input)
    conversation_transcriber = speechsdk.transcription.ConversationTranscriber(speech_config=speech_config, audio_config=audio_config)

    return conversation_transcriber 

def main():
    # Setup the UDP socket common for both channels
    UDP_IP = MY_IP
    UDP_PORT = MY_PORT
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))
      
    transcribing_stop = False
    conversation_transcriber = create_transcriber(sock, 16000, 0)

    def stop_cb(evt: speechsdk.SessionEventArgs):
        #"""callback that signals to stop continuous recognition upon receiving an event `evt`"""
        print('CLOSING on {}'.format(evt))
        nonlocal transcribing_stop
        transcribing_stop = True

    output_file_path = "test.txt"
    with open(output_file_path, 'w') as file_handle:
        conversation_transcriber.transcribed.connect(lambda evt: conversation_transcriber_transcribed_cb(evt, file_handle))
        conversation_transcriber.session_started.connect(conversation_transcriber_session_started_cb)
        conversation_transcriber.session_stopped.connect(conversation_transcriber_session_stopped_cb)
        conversation_transcriber.canceled.connect(conversation_transcriber_recognition_canceled_cb)
        conversation_transcriber.session_stopped.connect(stop_cb)
        conversation_transcriber.canceled.connect(stop_cb)

        conversation_transcriber.start_transcribing_async()

        try:
            while not transcribing_stop:
                time.sleep(.5)
        except Exception as e:
            print(e)
            conversation_transcriber.stop_transcribing_async()
            conversation_transcriber.close()
            sock.close()

if __name__=="__main__":
      main()

或者,我测试了通过 HTTP 传输数据并使用 PushAudioInputStream 对象的系统。在这种情况下,数据似乎到达了 azure,但识别部分总是返回空。

这是第二次测试的代码:

def transcribe_stream(stream_url):
            
    # Initialize the Azure Speech SDK
    speech_config = speechsdk.SpeechConfig(subscription=key_, region=region_)
    # Using a pull stream to get audio from an HTTP stream
    stream = speechsdk.audio.PushAudioInputStream()
    audio_input = speechsdk.AudioConfig(stream=stream)
    speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_input)

    # Function to handle recognized text
    def recognized(args):
        if args.result.reason == speechsdk.ResultReason.RecognizedSpeech:
            print(args)
            print("Recognized: {}".format(args.result.text))
        elif args.result.reason == speechsdk.ResultReason.NoMatch:
            print("No speech could be recognized")

    # Connect to the recognizer's events
    speech_recognizer.recognized.connect(recognized)

    # Begin continuous speech recognition
    speech_recognizer.start_continuous_recognition()
    try:
        # Stream the audio from the HTTP source
        with requests.get(stream_url, stream=True) as r:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    stream.write(chunk)
            stream.close()
    except Exception as e:
        print(f"Error streaming audio: {e}")
    finally:
        # Stop the continuous recognition once the stream ends
        print("END")
        speech_recognizer.stop_continuous_recognition()

# URL of the VLC stream
stream_url = 'http://my_address/end_point'
transcribe_stream(stream_url)

任何指导表示赞赏:)

python azure udp streaming azure-speech
1个回答
0
投票

要将提供的代码片段与 Azure 语音服务集成以对通过 UDP 接收的音频流进行实时分类,我们需要在服务器和客户端之间建立 UDP 连接。

  • 与 TCP 不同,UDP 不提供可靠性或顺序保证。因此,服务器只是将音频数据包发送到客户端,而不等待确认。
  • 客户端接收数据包并播放音频。因此,我们使用 Azure 语音服务播放接收到的音频进行分类。

我参考了这个链接来通过UDP套接字传输音频数据。

import socket
import threading
import wave
import pyaudio
import time
import math

host_ip = '192.168.1.104'  # Change this to the server's IP address
port = 9633

def audio_stream_UDP():
    BUFF_SIZE = 65536
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFF_SIZE)
    server_socket.bind((host_ip, port))

    CHUNK = 10 * 1024
    wf = wave.open("temp.wav", 'rb')
    p = pyaudio.PyAudio()

    print('Server listening at', (host_ip, port), wf.getframerate())
    stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    input=True,
                    frames_per_buffer=CHUNK)

    sample_rate = wf.getframerate()
    while True:
        msg, client_addr = server_socket.recvfrom(BUFF_SIZE)
        print('Got connection from', client_addr, msg)
        
        DATA_SIZE = math.ceil(wf.getnframes() / CHUNK)
        DATA_SIZE = str(DATA_SIZE).encode()
        print('Sending data size...', wf.getnframes() / sample_rate)
        server_socket.sendto(DATA_SIZE, client_addr)
        cnt = 0
        while True:
            data = wf.readframes(CHUNK)
            server_socket.sendto(data, client_addr)
            time.sleep(0.001)  # Adjust the delay according to your requirements
            print(cnt)
            if cnt > (wf.getnframes() / CHUNK):
                break
            cnt += 1

        break
    print('Data transmission complete')

t1 = threading.Thread(target=audio_stream_UDP, args=())
t1.start()

我提到了 Azure 语音服务的链接

class MyAudioStream(speechsdk.audio.PullAudioInputStreamCallback):
    def __init__(self, socket, channel):
        super().__init__()
        print("Init")
        self.socket = socket
        self.channel = channel  # Channel 0 for left, 1 for right
        self.buffer = bytearray()

    def read(self, buffer_size):
        # Fetch new data from socket if buffer is insufficient
        while len(self.buffer) < buffer_size * 2:  # x2 because stereo data
            data, _ = self.socket.recvfrom(buffer_size * 2)
            self.buffer.extend(data)

      
        data = self.buffer[:buffer_size * 2]
        del self.buffer[:buffer_size * 2]

        # Assuming 16-bit samples, split the stereo into mono
        audio_data = np.frombuffer(data, dtype=np.int16)  # Read data as 16-bit samples
        mono_data = audio_data[self.channel::2]  # Extract one channel
        return mono_data.tobytes()

    def close(self):
        print("Closed")
        self.socket.close()

def conversation_transcriber_transcribed_cb(evt: speechsdk.SpeechRecognitionEventArgs, file_handle):
    if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
        s_id, text = evt.result.speaker_id, evt.result.text
        file_handle.write('Speaker ID={}: {}\n'.format(s_id, text))
        print('Speaker ID={}: {}\n'.format(s_id, text))
    elif evt.result.reason == speechsdk.ResultReason.NoMatch:
        file_handle.write('NOMATCH: Speech could not be TRANSCRIBED: {}\n'.format(evt.result.no_match_details))

def create_transcriber(sock, sample_rate, channel):
    speech_config = speechsdk.SpeechConfig(subscription=key_, region=region_)
    speech_config.speech_recognition_language="en-US"
    # Set the expected audio format (monaural)
    audio_format = speechsdk.audio.AudioStreamFormat(samples_per_second=sample_rate, bits_per_sample=16, channels=1)
    
    stream_callback = MyAudioStream(sock, channel)
    audio_input = speechsdk.audio.PullAudioInputStream(stream_callback, audio_format)
    audio_config = speechsdk.audio.AudioConfig(stream=audio_input)
    conversation_transcriber = speechsdk.transcription.ConversationTranscriber(speech_config=speech_config, audio_config=audio_config)

    return conversation_transcriber 

def main():
    # Setup the UDP socket common for both channels
    UDP_IP = "YourIPAddress"
    UDP_PORT = 12345  # Replace with your UDP port
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))
      
    transcribing_stop = False
    conversation_transcriber = create_transcriber(sock, 16000, 0)

    def stop_cb(evt: speechsdk.SessionEventArgs):
        print('CLOSING on {}'.format(evt))
        nonlocal transcribing_stop
        transcribing_stop = True

    output_file_path = "transcription.txt"
    with open(output_file_path, 'w') as file_handle:
        conversation_transcriber.transcribed.connect(lambda evt: conversation_transcriber_transcribed_cb(evt, file_handle))
        conversation_transcriber.session_stopped.connect(stop_cb)

        conversation_transcriber.start_transcribing_async()

        try:
            while not transcribing_stop:
                time.sleep(.5)
        except Exception as e:
            print(e)
            conversation_transcriber.stop_transcribing_async()
            conversation_transcriber.close()
            sock.close()

enter image description here

有关使用 UDP 通道作为 ConversationTranscriber 输入的替代方法,请参阅此链接

© www.soinside.com 2019 - 2024. All rights reserved.