概述我有一台服务器,它与一个客户端应用程序建立了开放的 WebSocket 连接。此客户端应用程序(Android 应用程序)可以发送实时麦克风音频数据。服务器在
我有一台服务器,它与一个客户端应用程序建立了开放的 WebSocket 连接。此客户端应用程序(Android 应用程序)可以发送实时麦克风音频数据。服务器在收到此数据后需要做的是回复部分转录,以便用户可以看到他们所说的内容被实时转录。我为此使用了 Google Speech-to-text API。
我也清楚地知道,Android 有一个内置的语音识别器,可以实现这一点。
服务器使用 启动 asyncio.run
,传入的数据被传递给所有使用异步方法的处理程序。这些方法负责处理音频帧的接收:
elif action == util.ActionMessages.AUDIO_FRAME:
audio_id, audio = content["id"], content["audio"]
await self._audio_handler.receive_audio(audio, audio_id)
# Audio handler method
class AudioHandler:
def __init__(self, client_handler: ClientHandler):
self._client_handler = client_handler
self._audio_finished = dict()
self._is_streaming = False
self._audio_queue = queue.Queue()
self._languages = "en-US"
self._speech_client = speech.SpeechClient()
config = speech.RecognitionConfig(...)
self._streaming_config = speech.StreamingRecognitionConfig(...)
self._executor = ThreadPoolExecutor(max_workers=1)
async def receive_audio(self, content: str | None, audio_id: str):
is_audio_complete = self._audio_finished.setdefault(audio_id, False)
if content and not is_audio_complete:
self._is_streaming = True
content = base64.b64decode(content)
self._audio_queue.put(content)
future = self._executor.submit(self._build_requests)
future.add_done_callback(lambda f: self._on_audio_processing_complete(f, audio_id))
self._request_built = True
elif is_audio_complete:
# TODO: Implement audio processing complete like clean up dictionary
pass
else:
self._is_streaming = False
self._audio_queue.put(None)
def _on_audio_processing_complete(self, future, audio_id):
self._audio_finished[audio_id] = True
self._request_built = False
def _read_audio(self):
while self._is_streaming:
chunk = self._audio_queue.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = self._audio_queue.get_nowait()
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b"".join(data)
def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
# Send transcript clients
print(transcript + overwrite_chars)
if not result.is_final:
num_chars_printed = len(transcript)
else:
self._is_streaming = False
return
def _build_requests(self):
audio_generator = self._read_audio()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)
responses = self._speech_client.streaming_recognize(self._streaming_config, requests)
self._listen_print_loop(responses)
当音频带有话语 ID(音频 ID)时,它会填满一个队列。第一次到达时,会启动一个新线程,该线程实例化一个生成器,该生成器从队列中读取音频样本并将其转换为适当的类型。谷歌语音客户端使用此生成器执行转录。此语音客户端返回一个响应生成器,该 _listen_print_loop
方法使用它(目前)打印响应/转录。
使用 Google 语音 API 的逻辑主要基于他们的 docs .
可以想象,打印转录服务器端并不是我想要的。我想将这些部分转录发送到我的客户端应用程序。但是,我用来通过套接字发送消息的方法是异步的,因此在此实现中无法从方法中发送, _listen_print_loop
因为它本身不是异步的。这就是我的意思:
def _listen_print_loop(self, responses):
num_chars_printed = 0
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
# Cannot do this!
await send_to_client(transcript + overwrite_chars)
if not result.is_final:
num_chars_printed = len(transcript)
else:
self._is_streaming = False
return
我想知道最好的解决方案是什么。是从使用线程切换到仅使用 asyncio 吗?如果是这样,那是否意味着我必须实现异步生成器函数?这不会导致语音客户端出现问题吗?
我对 asyncio 还比较陌生,如能得到任何指点我将非常感激!