8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

结合线程和 asyncio 通过 WebSocket 连接处理音频流

Mike Bannister 1月前

24 0

概述我有一台服务器,它与一个客户端应用程序建立了开放的 WebSocket 连接。此客户端应用程序(Android 应用程序)可以发送实时麦克风音频数据。服务器在

概述

我有一台服务器,它与一个客户端应用程序建立了开放的 WebSocket 连接。此客户端应用程序(Android 应用程序)可以发送实时麦克风音频数据。服务器在收到此数据后需要做的是回复部分转录,以便用户可以看到他们所说的内容被实时转录。我为此使用了 Google Speech-to-text API。

我也清楚地知道,Android 有一个内置的语音识别器,可以实现这一点。

服务器使用 启动 asyncio.run ,传入的数据被传递给所有使用异步方法的处理程序。这些方法负责处理音频帧的接收:

elif action == util.ActionMessages.AUDIO_FRAME:
    audio_id, audio = content["id"], content["audio"]
    await self._audio_handler.receive_audio(audio, audio_id)


# Audio handler method
class AudioHandler:
    def __init__(self, client_handler: ClientHandler):
        self._client_handler = client_handler

        self._audio_finished = dict()

        self._is_streaming = False
        self._audio_queue = queue.Queue()
        self._languages = "en-US"

        self._speech_client = speech.SpeechClient()
        config = speech.RecognitionConfig(...)
        self._streaming_config = speech.StreamingRecognitionConfig(...)

        self._executor = ThreadPoolExecutor(max_workers=1)


    async def receive_audio(self, content: str | None, audio_id: str):
        is_audio_complete = self._audio_finished.setdefault(audio_id, False)
        if content and not is_audio_complete:
            self._is_streaming = True
            content = base64.b64decode(content)
            self._audio_queue.put(content)

            future = self._executor.submit(self._build_requests)
            future.add_done_callback(lambda f: self._on_audio_processing_complete(f, audio_id))
            self._request_built = True

        elif is_audio_complete:
            # TODO: Implement audio processing complete like clean up dictionary
            pass

        else:
            self._is_streaming = False
            self._audio_queue.put(None)

    def _on_audio_processing_complete(self, future, audio_id):
        self._audio_finished[audio_id] = True
        self._request_built = False

    def _read_audio(self):
        while self._is_streaming:
            chunk = self._audio_queue.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = self._audio_queue.get_nowait()
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)

    def _listen_print_loop(self, responses):
        num_chars_printed = 0
        for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript

            overwrite_chars = " " * (num_chars_printed - len(transcript))

            # Send transcript clients
            print(transcript + overwrite_chars)

            if not result.is_final:
                num_chars_printed = len(transcript)

            else:
                self._is_streaming = False
                return

    def _build_requests(self):
        audio_generator = self._read_audio()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in audio_generator
        )

        responses = self._speech_client.streaming_recognize(self._streaming_config, requests)
        self._listen_print_loop(responses)

当音频带有话语 ID(音频 ID)时,它会填满一个队列。第一次到达时,会启动一个新线程,该线程实例化一个生成器,该生成器从队列中读取音频样本并将其转换为适当的类型。谷歌语音客户端使用此生成器执行转录。此语音客户端返回一个响应生成器,该 _listen_print_loop 方法使用它(目前)打印响应/转录。

使用 Google 语音 API 的逻辑主要基于他们的 docs .

问题

可以想象,打印转录服务器端并不是我想要的。我想将这些部分转录发送到我的客户端应用程序。但是,我用来通过套接字发送消息的方法是异步的,因此在此实现中无法从方法中发送, _listen_print_loop 因为它本身不是异步的。这就是我的意思:

def _listen_print_loop(self, responses):
        num_chars_printed = 0
        for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript

            overwrite_chars = " " * (num_chars_printed - len(transcript))

            # Cannot do this!
            await send_to_client(transcript + overwrite_chars)

            if not result.is_final:
                num_chars_printed = len(transcript)

            else:
                self._is_streaming = False
                return

我想知道最好的解决方案是什么。是从使用线程切换到仅使用 asyncio 吗?如果是这样,那是否意味着我必须实现异步生成器函数?这不会导致语音客户端出现问题吗?

我对 asyncio 还比较陌生,如能得到任何指点我将非常感激!

帖子版权声明 1、本帖标题:结合线程和 asyncio 通过 WebSocket 连接处理音频流
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Mike Bannister在本站《python》版块原创发布, 转载请注明出处!
最新回复 (0)
返回
作者最近主题: