I have a UDP audio stream that I want to transcribe as close to real time as possible.
For this I am using the Azure Speech service in Python (https://learn.microsoft.com/en-us/azure/ai-services/speech-service/get-started-stt-diarization?tabs=windows&pivots=programming-language-python).
With the microphone as the input source the system works fine. However, when I try the PullAudioInputStreamCallback option, the service never even enters the read method.
Any ideas about this approach?
import socket
import time

import numpy as np
import azure.cognitiveservices.speech as speechsdk


class MyAudioStream(speechsdk.audio.PullAudioInputStreamCallback):
    def __init__(self, socket, channel):
        super().__init__()
        print("Init")
        self.socket = socket
        self.channel = channel  # Channel 0 for left, 1 for right
        self.buffer = bytearray()

    def read(self, buffer_size):
        # Fetch new data from the socket if the buffer is insufficient
        while len(self.buffer) < buffer_size * 2:  # x2 because stereo data
            data, _ = self.socket.recvfrom(buffer_size * 2)
            self.buffer.extend(data)
        # Extract the requested buffer size * 2 (to handle stereo data)
        data = self.buffer[:buffer_size * 2]
        del self.buffer[:buffer_size * 2]
        # Assuming 16-bit samples, split the stereo into mono
        audio_data = np.frombuffer(data, dtype=np.int16)  # Read data as 16-bit samples
        mono_data = audio_data[self.channel::2]  # Extract one channel
        # print(audio_data)
        return mono_data.tobytes()

    def close(self):
        print("Closed")
        self.socket.close()


def conversation_transcriber_recognition_canceled_cb(evt: speechsdk.SessionEventArgs):
    print('Canceled event')


def conversation_transcriber_session_stopped_cb(evt: speechsdk.SessionEventArgs):
    print('SessionStopped event')


def conversation_transcriber_transcribed_cb(evt: speechsdk.SpeechRecognitionEventArgs, file_handle):
    if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
        s_id, text = evt.result.speaker_id, evt.result.text
        file_handle.write('Speaker ID={}: {}\n'.format(s_id, text))
        print('Speaker ID={}: {}\n'.format(s_id, text))
    elif evt.result.reason == speechsdk.ResultReason.NoMatch:
        file_handle.write('NOMATCH: Speech could not be TRANSCRIBED: {}\n'.format(evt.result.no_match_details))


def conversation_transcriber_session_started_cb(evt: speechsdk.SessionEventArgs):
    print('SessionStarted event')


def create_transcriber(sock, sample_rate, channel):
    speech_config = speechsdk.SpeechConfig(subscription=key_, region=region_)
    speech_config.speech_recognition_language = "en-US"
    # Set the expected audio format (monaural)
    audio_format = speechsdk.audio.AudioStreamFormat(samples_per_second=sample_rate, bits_per_sample=16, channels=1)
    # Create an instance of PullAudioInputStream with the custom stream callback
    stream_callback = MyAudioStream(sock, channel)
    audio_input = speechsdk.audio.PullAudioInputStream(stream_callback, audio_format)
    audio_config = speechsdk.audio.AudioConfig(stream=audio_input)
    conversation_transcriber = speechsdk.transcription.ConversationTranscriber(speech_config=speech_config, audio_config=audio_config)
    return conversation_transcriber


def main():
    # Set up the UDP socket common to both channels
    UDP_IP = MY_IP
    UDP_PORT = MY_PORT
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))

    transcribing_stop = False
    conversation_transcriber = create_transcriber(sock, 16000, 0)

    def stop_cb(evt: speechsdk.SessionEventArgs):
        """Callback that signals to stop continuous recognition upon receiving an event `evt`."""
        print('CLOSING on {}'.format(evt))
        nonlocal transcribing_stop
        transcribing_stop = True

    output_file_path = "test.txt"
    with open(output_file_path, 'w') as file_handle:
        conversation_transcriber.transcribed.connect(lambda evt: conversation_transcriber_transcribed_cb(evt, file_handle))
        conversation_transcriber.session_started.connect(conversation_transcriber_session_started_cb)
        conversation_transcriber.session_stopped.connect(conversation_transcriber_session_stopped_cb)
        conversation_transcriber.canceled.connect(conversation_transcriber_recognition_canceled_cb)
        conversation_transcriber.session_stopped.connect(stop_cb)
        conversation_transcriber.canceled.connect(stop_cb)

        conversation_transcriber.start_transcribing_async()
        try:
            while not transcribing_stop:
                time.sleep(.5)
        except Exception as e:
            print(e)
        conversation_transcriber.stop_transcribing_async()
        conversation_transcriber.close()
    sock.close()


if __name__ == "__main__":
    main()
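For reference, the SDK's pull-stream sample defines the callback with a different read shape than mine: read() receives a writable memoryview and returns the number of bytes copied into it (0 meaning end of stream). Below is a minimal sketch of that shape, assuming the UDP payload is already 16 kHz, 16-bit mono PCM; the class name and the 65536-byte recv size are placeholders of mine, not tested code.

class UdpPullStream(speechsdk.audio.PullAudioInputStreamCallback):
    """Sketch of the documented callback shape: fill `buffer`, return the byte count."""

    def __init__(self, sock):
        super().__init__()
        self.sock = sock
        self.pending = bytearray()

    def read(self, buffer: memoryview) -> int:
        # Top up the local buffer until the SDK's request can be satisfied.
        while len(self.pending) < buffer.nbytes:
            data, _ = self.sock.recvfrom(65536)
            if not data:
                break
            self.pending.extend(data)
        n = min(buffer.nbytes, len(self.pending))
        buffer[:n] = self.pending[:n]
        del self.pending[:n]
        return n  # 0 tells the SDK the stream has ended

    def close(self) -> None:
        self.sock.close()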
Alternatively, I tested a setup that streams the data over HTTP and uses a PushAudioInputStream object. In that case the data does seem to reach Azure, but the recognition always comes back empty.
Here is the code for the second test:
import requests
import azure.cognitiveservices.speech as speechsdk


def transcribe_stream(stream_url):
    # Initialize the Azure Speech SDK
    speech_config = speechsdk.SpeechConfig(subscription=key_, region=region_)

    # Use a push stream to feed audio pulled from an HTTP stream
    stream = speechsdk.audio.PushAudioInputStream()
    audio_input = speechsdk.audio.AudioConfig(stream=stream)
    speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_input)

    # Function to handle recognized text
    def recognized(args):
        if args.result.reason == speechsdk.ResultReason.RecognizedSpeech:
            print(args)
            print("Recognized: {}".format(args.result.text))
        elif args.result.reason == speechsdk.ResultReason.NoMatch:
            print("No speech could be recognized")

    # Connect to the recognizer's events
    speech_recognizer.recognized.connect(recognized)

    # Begin continuous speech recognition
    speech_recognizer.start_continuous_recognition()

    try:
        # Stream the audio from the HTTP source
        with requests.get(stream_url, stream=True) as r:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    stream.write(chunk)
        stream.close()
    except Exception as e:
        print(f"Error streaming audio: {e}")
    finally:
        # Stop the continuous recognition once the stream ends
        print("END")
        speech_recognizer.stop_continuous_recognition()


# URL of the VLC stream
stream_url = 'http://my_address/end_point'
transcribe_stream(stream_url)
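One thing I am unsure about here is the stream format: PushAudioInputStream accepts an explicit AudioStreamFormat, and the SDK's default for a push stream is 16 kHz, 16-bit, mono PCM. If the HTTP stream is anything else (for example MP3 coming out of VLC), pushing the raw bytes would presumably explain the empty results. A minimal sketch of declaring the format explicitly (the values are assumptions about my stream, not verified):

# Sketch (assumed values): tell the SDK what the pushed bytes contain.
# A compressed stream such as MP3 would instead need a compressed
# AudioStreamFormat plus GStreamer support on the machine.
pcm_format = speechsdk.audio.AudioStreamFormat(samples_per_second=16000,
                                               bits_per_sample=16,
                                               channels=1)
stream = speechsdk.audio.PushAudioInputStream(stream_format=pcm_format)
audio_input = speechsdk.audio.AudioConfig(stream=stream)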
Any guidance is appreciated :)
To integrate the provided code snippets with the Azure Speech service and run real-time diarized transcription on an audio stream received over UDP, we need to establish a UDP connection between a server and a client.
I referred to this link for streaming audio data over a UDP socket.
import socket
import threading
import wave
import pyaudio
import time
import math

host_ip = '192.168.1.104'  # Change this to the server's IP address
port = 9633


def audio_stream_UDP():
    BUFF_SIZE = 65536
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFF_SIZE)
    server_socket.bind((host_ip, port))

    CHUNK = 10 * 1024
    wf = wave.open("temp.wav", 'rb')
    p = pyaudio.PyAudio()
    print('Server listening at', (host_ip, port), wf.getframerate())
    stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    input=True,
                    frames_per_buffer=CHUNK)
    sample_rate = wf.getframerate()

    while True:
        msg, client_addr = server_socket.recvfrom(BUFF_SIZE)
        print('Got connection from', client_addr, msg)
        DATA_SIZE = math.ceil(wf.getnframes() / CHUNK)
        DATA_SIZE = str(DATA_SIZE).encode()
        print('Sending data size...', wf.getnframes() / sample_rate)
        server_socket.sendto(DATA_SIZE, client_addr)

        cnt = 0
        while True:
            data = wf.readframes(CHUNK)
            server_socket.sendto(data, client_addr)
            time.sleep(0.001)  # Adjust the delay according to your requirements
            print(cnt)
            if cnt > (wf.getnframes() / CHUNK):
                break
            cnt += 1
        break
    print('Data transmission complete')


t1 = threading.Thread(target=audio_stream_UDP, args=())
t1.start()
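For completeness, here is a minimal client-side sketch of how that server could be consumed; this is a hypothetical helper of mine that just mirrors the send/receive protocol above (announce with one datagram, read the chunk count, then receive raw PCM frames):

import socket

host_ip = '192.168.1.104'  # the server's address from the snippet above
port = 9633

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b'hello', (host_ip, port))   # any datagram wakes the server loop
size_msg, _ = client.recvfrom(65536)       # number of chunks the server will send
num_chunks = int(size_msg.decode())
for _ in range(num_chunks):
    chunk, _ = client.recvfrom(65536)      # raw frames from wf.readframes(CHUNK)
    # hand `chunk` to the transcription pipeline below (e.g. via the socket MyAudioStream reads)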
For the transcription side, I referred to the Azure Speech service link.
import socket
import time

import numpy as np
import azure.cognitiveservices.speech as speechsdk


class MyAudioStream(speechsdk.audio.PullAudioInputStreamCallback):
    def __init__(self, socket, channel):
        super().__init__()
        print("Init")
        self.socket = socket
        self.channel = channel  # Channel 0 for left, 1 for right
        self.buffer = bytearray()

    def read(self, buffer_size):
        # Fetch new data from the socket if the buffer is insufficient
        while len(self.buffer) < buffer_size * 2:  # x2 because stereo data
            data, _ = self.socket.recvfrom(buffer_size * 2)
            self.buffer.extend(data)
        data = self.buffer[:buffer_size * 2]
        del self.buffer[:buffer_size * 2]
        # Assuming 16-bit samples, split the stereo into mono
        audio_data = np.frombuffer(data, dtype=np.int16)  # Read data as 16-bit samples
        mono_data = audio_data[self.channel::2]  # Extract one channel
        return mono_data.tobytes()

    def close(self):
        print("Closed")
        self.socket.close()


def conversation_transcriber_transcribed_cb(evt: speechsdk.SpeechRecognitionEventArgs, file_handle):
    if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
        s_id, text = evt.result.speaker_id, evt.result.text
        file_handle.write('Speaker ID={}: {}\n'.format(s_id, text))
        print('Speaker ID={}: {}\n'.format(s_id, text))
    elif evt.result.reason == speechsdk.ResultReason.NoMatch:
        file_handle.write('NOMATCH: Speech could not be TRANSCRIBED: {}\n'.format(evt.result.no_match_details))


def create_transcriber(sock, sample_rate, channel):
    speech_config = speechsdk.SpeechConfig(subscription=key_, region=region_)
    speech_config.speech_recognition_language = "en-US"
    # Set the expected audio format (monaural)
    audio_format = speechsdk.audio.AudioStreamFormat(samples_per_second=sample_rate, bits_per_sample=16, channels=1)
    stream_callback = MyAudioStream(sock, channel)
    audio_input = speechsdk.audio.PullAudioInputStream(stream_callback, audio_format)
    audio_config = speechsdk.audio.AudioConfig(stream=audio_input)
    conversation_transcriber = speechsdk.transcription.ConversationTranscriber(speech_config=speech_config, audio_config=audio_config)
    return conversation_transcriber


def main():
    # Set up the UDP socket common to both channels
    UDP_IP = "YourIPAddress"
    UDP_PORT = 12345  # Replace with your UDP port
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))

    transcribing_stop = False
    conversation_transcriber = create_transcriber(sock, 16000, 0)

    def stop_cb(evt: speechsdk.SessionEventArgs):
        print('CLOSING on {}'.format(evt))
        nonlocal transcribing_stop
        transcribing_stop = True

    output_file_path = "transcription.txt"
    with open(output_file_path, 'w') as file_handle:
        conversation_transcriber.transcribed.connect(lambda evt: conversation_transcriber_transcribed_cb(evt, file_handle))
        conversation_transcriber.session_stopped.connect(stop_cb)

        conversation_transcriber.start_transcribing_async()
        try:
            while not transcribing_stop:
                time.sleep(.5)
        except Exception as e:
            print(e)
        conversation_transcriber.stop_transcribing_async()
        conversation_transcriber.close()
    sock.close()
For an alternative approach to using a UDP channel as input to the ConversationTranscriber, refer to this link.
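As an illustration of that alternative (a minimal sketch of mine, not the linked article's code): instead of a pull callback, a background thread can read datagrams from the UDP socket and write them into a PushAudioInputStream that feeds the ConversationTranscriber. The function names and the 16 kHz, 16-bit mono PCM assumption are placeholders.

import socket
import threading
import azure.cognitiveservices.speech as speechsdk


def feed_push_stream(sock, push_stream):
    """Hypothetical pump: copy UDP payloads into the SDK push stream."""
    try:
        while True:
            data, _ = sock.recvfrom(65536)
            if not data:
                break
            push_stream.write(data)   # assumed to already be 16 kHz, 16-bit mono PCM
    finally:
        push_stream.close()           # signals end of audio to the SDK


def transcriber_from_udp(sock, key, region):
    speech_config = speechsdk.SpeechConfig(subscription=key, region=region)
    fmt = speechsdk.audio.AudioStreamFormat(samples_per_second=16000, bits_per_sample=16, channels=1)
    push_stream = speechsdk.audio.PushAudioInputStream(stream_format=fmt)
    audio_config = speechsdk.audio.AudioConfig(stream=push_stream)
    transcriber = speechsdk.transcription.ConversationTranscriber(speech_config=speech_config, audio_config=audio_config)
    # Pump audio in the background while the transcriber pulls from the push stream.
    threading.Thread(target=feed_push_stream, args=(sock, push_stream), daemon=True).start()
    return transcriber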