Skip to content

18. 语音与实时交互

语音交互是 LLM 应用从文本走向多模态的关键一步。本章覆盖 STT、TTS、实时语音 Agent、VAD、音频处理流水线、延迟优化等核心话题,帮助你构建低延迟、高质量的语音交互系统。


Q: 语音交互系统的整体架构是怎样的?⭐⭐

答:

一个完整的语音交互系统通常包含以下模块:

用户麦克风 → VAD(语音活动检测) → STT(语音转文本) → LLM(大语言模型) → TTS(文本转语音) → 扬声器

核心流水线:

python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class AudioConfig:
    # Audio format settings; field order defines the positional constructor.
    sample_rate: int = 16000
    channels: int = 1
    chunk_duration_ms: int = 30  # duration of each audio chunk (milliseconds)

class VoicePipeline:
    """Core voice loop: VAD-segmented speech -> STT -> LLM -> TTS audio."""

    def __init__(self, stt_engine, tts_engine, llm_client):
        self.stt = stt_engine
        self.tts = tts_engine
        self.llm = llm_client
        self.vad = WebRTCVAD(aggressiveness=2)

    async def process_audio_stream(self, audio_stream: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
        """Consume microphone chunks and yield synthesized reply audio.

        Chunks flagged as speech by the VAD are buffered. The first
        non-speech chunk after speech closes the utterance, which is then
        transcribed, answered by the LLM and streamed back through the TTS
        engine. An utterance that is still open when ``audio_stream`` ends
        is processed as well instead of being dropped.

        Args:
            audio_stream: Async iterator of raw audio chunks.

        Yields:
            Audio chunks produced by ``self.tts.stream_speak``.
        """
        audio_buffer = bytearray()
        is_speaking = False

        async for chunk in audio_stream:
            if self.vad.is_speech(chunk):
                is_speaking = True
                audio_buffer.extend(chunk)
            elif is_speaking:
                # Speech just ended: hand the buffered utterance downstream.
                is_speaking = False
                utterance = bytes(audio_buffer)
                audio_buffer.clear()
                async for audio_chunk in self._respond(utterance):
                    yield audio_chunk

        # The stream closed mid-utterance; the buffered speech still
        # deserves an answer.
        if is_speaking and audio_buffer:
            async for audio_chunk in self._respond(bytes(audio_buffer)):
                yield audio_chunk

    async def _respond(self, utterance: bytes) -> AsyncIterator[bytes]:
        """Run STT -> LLM -> TTS for one utterance and yield the reply audio."""
        text = await self.stt.transcribe(utterance)
        if not text.strip():
            # Nothing intelligible was recognised; stay silent.
            return

        response = await self.llm.chat(text)

        async for audio_chunk in self.tts.stream_speak(response):
            yield audio_chunk

关键设计要点:

  • 流式处理:每个环节尽量流式,减少整体延迟
  • 异步并发:LLM 推理和 TTS 可以 pipeline 化
  • VAD 断句:准确的语音端点检测直接影响用户体验

Q: 如何使用 OpenAI Whisper 和 faster-whisper 进行语音转文本?⭐⭐⭐

答:

方案一:OpenAI Whisper(本地部署)

python
import whisper
import numpy as np

class WhisperSTT:
    """Speech-to-text using a locally loaded OpenAI Whisper model."""

    # Sample rate Whisper expects for in-memory waveforms.
    SAMPLE_RATE = 16000

    def __init__(self, model_size="base"):
        """
        model_size: tiny, base, small, medium, large
        - tiny:   ~1GB VRAM, fastest, mediocre accuracy
        - base:   ~1GB VRAM, fast
        - small:  ~2GB VRAM, balanced
        - medium: ~5GB VRAM, more accurate
        - large:  ~10GB VRAM, most accurate
        """
        self.model = whisper.load_model(model_size)

    def transcribe(self, audio_path: str, language: str = None) -> str:
        """Transcribe the audio file at ``audio_path`` and return its text."""
        result = self.model.transcribe(
            audio_path,
            language=language,       # giving the language can improve accuracy
            fp16=False,              # set to False for CPU environments
            initial_prompt="以下是普通话的句子。",  # Chinese prompt to help Chinese recognition
        )
        return result["text"]

    def transcribe_bytes(self, audio_bytes: bytes, sample_rate: int = 16000) -> str:
        """Transcribe in-memory 16-bit little-endian mono PCM.

        Args:
            audio_bytes: Raw PCM samples (int16). A dangling odd byte is ignored.
            sample_rate: Sample rate of ``audio_bytes``. Audio that is not
                already at 16 kHz is linearly resampled before transcription.

        Returns:
            The recognised text, or ``""`` when there are no samples.
        """
        # np.frombuffer requires a whole number of int16 samples.
        usable = len(audio_bytes) - (len(audio_bytes) % 2)
        samples = np.frombuffer(audio_bytes[:usable], dtype=np.int16).astype(np.float32) / 32768.0
        if samples.size == 0:
            return ""

        if sample_rate != self.SAMPLE_RATE:
            # Basic linear-interpolation resampling so the waveform matches
            # the rate the model assumes; previously the argument was ignored.
            target_len = max(1, int(round(samples.size * self.SAMPLE_RATE / sample_rate)))
            src_times = np.arange(samples.size) / sample_rate
            dst_times = np.arange(target_len) / self.SAMPLE_RATE
            samples = np.interp(dst_times, src_times, samples).astype(np.float32)

        result = self.model.transcribe(samples, fp16=False)
        return result["text"]

方案二:faster-whisper(推荐,基于 CTranslate2 加速)

python
from faster_whisper import WhisperModel
import time

class FasterWhisperSTT:
    """Speech-to-text backed by a faster-whisper ``WhisperModel``."""

    def __init__(self, model_size="base", device="auto", compute_type="float32"):
        """
        compute_type: float32, float16, int8
        (the original note says int8 quantization gives about 4x speedup on CPU)
        device: auto, cpu, cuda
        """
        self.model = WhisperModel(model_size, device=device, compute_type=compute_type)

    @staticmethod
    def _segment_to_dict(segment) -> dict:
        """Project a segment object onto the plain dict returned to callers."""
        return {"start": segment.start, "end": segment.end, "text": segment.text}

    def transcribe(self, audio_path: str, language: str = None) -> dict:
        """Transcribe a file; return text, language, duration and segments."""
        # VAD filtering skips silent stretches before decoding.
        vad_options = {"min_silence_duration_ms": 500, "speech_pad_ms": 200}
        segments, info = self.model.transcribe(
            audio_path,
            language=language,
            beam_size=5,
            vad_filter=True,
            vad_parameters=vad_options,
        )

        # Single pass over the segment iterable; the text is assembled from
        # the collected entries afterwards.
        segment_list = [self._segment_to_dict(seg) for seg in segments]
        full_text = "".join(entry["text"] for entry in segment_list)

        return {
            "text": full_text,
            "language": info.language,
            "duration": info.duration,
            "segments": segment_list,
        }

    def transcribe_streaming(self, audio_path: str):
        """Yield recognised segments one at a time (suited to long audio)."""
        segments, _info = self.model.transcribe(audio_path, beam_size=5)
        for seg in segments:
            yield self._segment_to_dict(seg)

# Usage example: transcribe a local recording with the int8 "small" model
# and print the text plus the language/duration fields of the result dict.
stt = FasterWhisperSTT(model_size="small", compute_type="int8")
result = stt.transcribe("recording.wav", language="zh")
print(f"识别结果: {result['text']}")
print(f"语言: {result['language']}, 时长: {result['duration']:.1f}s")

方案三:Groq Whisper API(云端,极快)

python
import httpx
import io

class GroqWhisperSTT:
    """Speech-to-text through Groq's hosted Whisper Large V3 endpoint."""

    MODEL = "whisper-large-v3"
    TIMEOUT_SECONDS = 30

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.groq.com/openai/v1"

    async def transcribe(self, audio_path: str, language: str = "zh") -> str:
        """Upload the file at ``audio_path`` and return the recognised text.

        Raises:
            httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
        """
        with open(audio_path, "rb") as f:
            return await self._post_audio(
                ("audio.wav", f, "audio/wav"),
                language,
                response_format="verbose_json",
            )

    async def transcribe_bytes(
        self,
        audio_bytes: bytes,
        filename: str = "audio.wav",
        language: str = "zh",
    ) -> str:
        """Transcribe audio held in memory.

        ``language`` was previously fixed to ``"zh"``; it is now a parameter
        with the same default so it matches :meth:`transcribe`.

        Raises:
            httpx.HTTPStatusError: if the API answers with a 4xx/5xx status.
        """
        return await self._post_audio(
            (filename, io.BytesIO(audio_bytes), "audio/wav"),
            language,
        )

    async def _post_audio(self, file_tuple, language: str, **extra_fields) -> str:
        """POST one multipart transcription request and return its ``text``."""
        form = {"model": self.MODEL, "language": language, **extra_fields}
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/audio/transcriptions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                files={"file": file_tuple},
                data=form,
                timeout=self.TIMEOUT_SECONDS,
            )
        # Surface auth/quota/validation failures as HTTP errors instead of a
        # confusing KeyError on the missing "text" field.
        response.raise_for_status()
        return response.json()["text"]

三种方案对比:

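| 特性 | OpenAI Whisper | faster-whisper | Groq Whisper API |
| --- | --- | --- | --- |
| 部署方式 | 本地 | 本地 | 云端 |
| 速度 | 基准 | 4-8x 加速 | 极快(硬件加速) |
| 成本 | GPU 硬件成本 | GPU/CPU 均可 | 按量付费 |
| 离线可用 | 是 | 是 | 否 |
| 推荐场景 | 研究/原型 | 生产环境本地部署 | 生产环境云端 |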

Q: 如何实现高质量的 TTS(文本转语音)?⭐⭐⭐

答:

方案一:Edge TTS(免费,微软 Azure 引擎)

python
import edge_tts
import asyncio

class EdgeTTSEngine:
    """Text-to-speech through the ``edge_tts`` package."""

    # Short aliases for commonly used voices.
    VOICES = {
        "zh-female": "zh-CN-XiaoxiaoNeural",      # lively female voice
        "zh-male": "zh-CN-YunxiNeural",            # male voice
        "zh-female-warm": "zh-CN-XiaohanNeural",   # warm female voice
        "zh-female-news": "zh-CN-XiaomoNeural",    # news-reader style
        "en-female": "en-US-JennyNeural",           # English female voice
        "en-male": "en-US-GuyNeural",              # English male voice
    }

    def __init__(self, voice: str = "zh-CN-XiaoxiaoNeural"):
        self.voice = voice

    async def synthesize(self, text: str, output_path: str):
        """Synthesize ``text`` and write the audio to ``output_path``."""
        await edge_tts.Communicate(text, self.voice).save(output_path)

    async def stream_synthesize(self, text: str):
        """Yield the audio for ``text`` chunk by chunk as it is produced."""
        prosody = {
            "rate": "+0%",     # speaking rate: -50% ~ +100%
            "volume": "+0%",   # volume: -50% ~ +100%
            "pitch": "+0Hz",   # pitch: -50Hz ~ +50Hz
        }
        communicate = edge_tts.Communicate(text, self.voice, **prosody)

        async for event in communicate.stream():
            # Only audio events are forwarded; other event types are skipped.
            if event["type"] != "audio":
                continue
            yield event["data"]

    async def list_voices(self, language: str = "zh"):
        """Return the available voices whose locale starts with ``language``."""
        matching = []
        for voice_info in await edge_tts.list_voices():
            if voice_info["Locale"].startswith(language):
                matching.append(voice_info)
        return matching

# Usage example
async def main():
    """Exercise both EdgeTTSEngine modes: save-to-file and streaming."""
    tts = EdgeTTSEngine(voice="zh-CN-XiaoxiaoNeural")
    
    # Save to a file
    await tts.synthesize("你好,欢迎使用语音助手!", "output.mp3")
    
    # Streaming synthesis: append each chunk to the file as it arrives
    with open("stream_output.mp3", "wb") as f:
        async for chunk in tts.stream_synthesize("这是一段流式合成的测试文本。"):
            f.write(chunk)

# NOTE(review): runs at import time; there is no __main__ guard.
asyncio.run(main())

方案二:OpenAI TTS

python
import openai
from openai import AsyncOpenAI

class OpenAITTSEngine:
    """Text-to-speech through the OpenAI speech API (async client)."""

    VOICES = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]

    def __init__(self, api_key: str, model: str = "tts-1"):
        """
        model: tts-1 (low latency) or tts-1-hd (high quality)
        """
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def synthesize(self, text: str, output_path: str, voice: str = "nova"):
        """Synthesize ``text`` as MP3 and write it to ``output_path``."""
        # The streaming-response context manager is the supported way to
        # write audio to disk; ``stream_to_file`` on the plain response is
        # deprecated in the SDK.
        async with self.client.audio.speech.with_streaming_response.create(
            model=self.model,
            voice=voice,
            input=text,
            response_format="mp3",  # mp3, opus, aac, flac, wav, pcm
            speed=1.0,              # 0.25 ~ 4.0
        ) as response:
            await response.stream_to_file(output_path)

    async def stream_synthesize(self, text: str, voice: str = "nova"):
        """Yield raw PCM audio for ``text`` in chunks of up to 4096 bytes."""
        # ``async for`` needs an async byte iterator; the streaming response
        # provides one, whereas the plain response's ``iter_bytes`` is
        # synchronous and would raise TypeError here.
        async with self.client.audio.speech.with_streaming_response.create(
            model=self.model,
            voice=voice,
            input=text,
            response_format="pcm",  # PCM suits streaming playback
        ) as response:
            async for chunk in response.iter_bytes(chunk_size=4096):
                yield chunk

# Usage example
async def main():
    """Synthesize one English sentence to output.mp3 with the "nova" voice."""
    # "sk-xxx" is a placeholder; supply a real API key.
    tts = OpenAITTSEngine(api_key="sk-xxx")
    await tts.synthesize("Hello, this is a test.", "output.mp3", voice="nova")

# NOTE(review): this snippet uses asyncio but its own import lines only
# import openai - confirm asyncio is imported when run standalone.
asyncio.run(main())

方案三:MiniMax TTS(高质量中文语音)

python
import httpx

class MiniMaxTTSEngine:
    """Text-to-speech through the MiniMax ``t2a_v2`` HTTP API."""

    MODEL = "speech-01-turbo"
    TIMEOUT_SECONDS = 60

    def __init__(self, api_key: str, group_id: str):
        self.api_key = api_key
        self.group_id = group_id
        self.base_url = "https://api.minimax.chat/v1"

    def _headers(self) -> dict:
        """HTTP headers shared by both request styles."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    def _params(self) -> dict:
        """Query parameters; the account group id travels as ``GroupId``."""
        return {"GroupId": self.group_id}

    async def synthesize(self, text: str, voice_id: str = "female-shaonv") -> bytes:
        """Synthesize ``text`` and return the complete MP3 audio as bytes.

        voice_id options:
        - female-shaonv: young female voice
        - female-yujie: mature female voice
        - male-qn-jingying: elite-style male voice
        - male-qn-badao: domineering male voice
        - presenter_male: male presenter
        - presenter_female: female presenter
        - audiobook_male_1: audiobook male voice
        - audiobook_female_1: audiobook female voice

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx HTTP status.
            RuntimeError: if the response body carries no audio payload.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/t2a_v2",
                params=self._params(),
                headers=self._headers(),
                json={
                    "model": self.MODEL,
                    "text": text,
                    "voice_setting": {
                        "voice_id": voice_id,
                        "speed": 1.0,
                        "vol": 1.0,
                        "pitch": 0,
                    },
                    "audio_setting": {
                        "sample_rate": 32000,
                        "bitrate": 128000,
                        "format": "mp3",
                    },
                },
                timeout=self.TIMEOUT_SECONDS,
            )
        response.raise_for_status()
        result = response.json()
        # The audio payload is hex-encoded.
        audio_hex = (result.get("data") or {}).get("audio")
        if not audio_hex:
            # API-level failures come back without audio; report the status
            # block instead of failing with a bare KeyError.
            raise RuntimeError(
                f"MiniMax TTS returned no audio: {result.get('base_resp')}"
            )
        return bytes.fromhex(audio_hex)

    async def stream_synthesize(self, text: str, voice_id: str = "female-shaonv"):
        """Yield decoded audio chunks from the server-sent event stream.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx HTTP status.
        """
        import json  # local import: this snippet's import block only has httpx

        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/t2a_v2",
                params=self._params(),
                headers=self._headers(),
                json={
                    "model": self.MODEL,
                    "text": text,
                    "stream": True,
                    "voice_setting": {"voice_id": voice_id},
                },
                timeout=self.TIMEOUT_SECONDS,
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if not line.startswith("data:"):
                        continue
                    event = json.loads(line[5:])
                    # NOTE(review): per the vendor sample, the closing event
                    # carries "extra_info" and repeats the whole clip, so it
                    # is skipped to avoid duplicated audio - confirm against
                    # the current API docs.
                    if "extra_info" in event:
                        continue
                    audio_hex = (event.get("data") or {}).get("audio")
                    if audio_hex:
                        yield bytes.fromhex(audio_hex)

Q: 如何实现 WebSocket 实时语音交互?⭐⭐⭐

答:

实时语音交互需要低延迟的双向通信,WebSocket 是最佳选择。

服务端:

python
import asyncio
import websockets
import json
import numpy as np
from faster_whisper import WhisperModel

class RealtimeVoiceServer:
    """WebSocket handler: PCM frames in, transcription + reply audio out.

    Binary messages are treated as 16 kHz / 16-bit PCM and buffered; text
    messages are JSON control commands (``config`` and ``end_of_speech``).
    """

    # One second of 16 kHz, 16-bit audio.
    CHUNK_BYTES = 16000 * 2

    def __init__(self):
        self.stt = WhisperModel("base", device="cpu", compute_type="int8")
        self.active_sessions = {}

    async def handle_client(self, websocket, path=None):
        """Serve one client connection until it closes."""
        session_id = id(websocket)
        audio_buffer = bytearray()
        print(f"Client {session_id} connected")

        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    # Audio data frame
                    audio_buffer.extend(message)
                    # Attempt recognition for every second of accumulated audio.
                    if len(audio_buffer) >= self.CHUNK_BYTES:
                        await self._flush(websocket, audio_buffer)
                elif isinstance(message, str):
                    await self._handle_control(
                        websocket, session_id, message, audio_buffer
                    )
        finally:
            self.active_sessions.pop(session_id, None)
            print(f"Client {session_id} disconnected")

    async def _handle_control(self, websocket, session_id, message, audio_buffer):
        """Apply one JSON control message; malformed ones are ignored."""
        try:
            cmd = json.loads(message)
        except json.JSONDecodeError:
            # A bad control frame should not tear down the whole connection.
            print(f"Client {session_id} sent an invalid control message")
            return
        if not isinstance(cmd, dict):
            return

        kind = cmd.get("type")
        if kind == "config":
            self.active_sessions[session_id] = cmd.get("config", {})
        elif kind == "end_of_speech":
            # The user ended the utterance explicitly: process what is
            # buffered through the same path as the size-triggered case.
            await self._flush(websocket, audio_buffer)

    async def _flush(self, websocket, audio_buffer: bytearray):
        """Transcribe the buffered audio and, if it has text, answer it."""
        # Only whole int16 samples can be decoded; a dangling byte stays in
        # the buffer so later samples remain aligned.
        usable = len(audio_buffer) - (len(audio_buffer) % 2)
        if usable == 0:
            return
        pcm = bytes(audio_buffer[:usable])
        del audio_buffer[:usable]

        text = await self._transcribe(pcm)
        if text.strip():
            await self._respond(websocket, text)

    async def _respond(self, websocket, text: str):
        """Send the transcription, then the LLM reply text and its audio."""
        await websocket.send(json.dumps({
            "type": "transcription",
            "text": text,
        }))

        response = await self._get_llm_response(text)

        await websocket.send(json.dumps({
            "type": "response_start",
            "text": response,
        }))

        async for audio_chunk in self._tts_synthesize(response):
            await websocket.send(audio_chunk)

        await websocket.send(json.dumps({
            "type": "response_end",
        }))

    async def _transcribe(self, audio_bytes: bytes) -> str:
        """Run speech recognition in a worker thread and return the text."""
        usable = len(audio_bytes) - (len(audio_bytes) % 2)
        if usable == 0:
            return ""
        audio = np.frombuffer(audio_bytes[:usable], dtype=np.int16).astype(np.float32) / 32768.0

        def _decode() -> str:
            # faster-whisper returns segments lazily, so the decoding work
            # happens while iterating; do that inside the worker thread to
            # keep the event loop responsive.
            segments, _ = self.stt.transcribe(audio, beam_size=3)
            return "".join(s.text for s in segments)

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _decode)

    async def _get_llm_response(self, text: str) -> str:
        # Placeholder: call the LLM API here.
        return f"Echo: {text}"

    async def _tts_synthesize(self, text: str):
        # Placeholder: call the TTS engine here.
        yield b""

# Start the service
async def main():
    """Start the WebSocket server on port 8765 and wait until it closes."""
    # Binds to all interfaces (0.0.0.0); max_size caps a single incoming
    # message at 10 MB.
    server = await websockets.serve(
        RealtimeVoiceServer().handle_client,
        "0.0.0.0",
        8765,
        max_size=10 * 1024 * 1024,  # 10MB
    )
    print("WebSocket server started on ws://0.0.0.0:8765")
    await server.wait_closed()

# NOTE(review): runs at import time; there is no __main__ guard.
asyncio.run(main())

客户端(浏览器 JavaScript):

javascript
/**
 * Browser client for the realtime voice WebSocket protocol.
 *
 * Sends microphone audio as 16-bit PCM binary frames and handles two kinds
 * of server messages: binary audio chunks (played back in arrival order)
 * and JSON control messages (`transcription`, `response_start`,
 * `response_end`). Optional callbacks: `onTranscription`, `onResponse`,
 * `onResponseEnd`.
 */
class RealtimeVoiceClient {
    constructor(wsUrl) {
        this.wsUrl = wsUrl;
        this.ws = null;
        this.audioContext = null;
        this.mediaStream = null;
        this.isRecording = false;
        // Capture graph nodes, kept so stopRecording() can disconnect them.
        this.sourceNode = null;
        this.processorNode = null;
        // A single shared playback context: browsers cap the number of live
        // AudioContext instances, so one per chunk would eventually fail.
        this.playbackContext = null;
        this.nextPlaybackTime = 0;
        // Serialises decode + scheduling so chunks play in arrival order.
        this.playbackChain = Promise.resolve();
    }

    /**
     * Open the WebSocket. Resolves when the connection is open and rejects
     * if the connection attempt fails.
     */
    async connect() {
        this.ws = new WebSocket(this.wsUrl);
        this.ws.binaryType = 'arraybuffer';

        this.ws.onmessage = (event) => {
            if (event.data instanceof ArrayBuffer) {
                // Audio data: play it
                this.playAudioChunk(event.data);
            } else {
                // JSON control message
                const msg = JSON.parse(event.data);
                switch (msg.type) {
                    case 'transcription':
                        console.log('识别结果:', msg.text);
                        this.onTranscription?.(msg.text);
                        break;
                    case 'response_start':
                        console.log('AI回复:', msg.text);
                        this.onResponse?.(msg.text);
                        break;
                    case 'response_end':
                        console.log('回复结束');
                        this.onResponseEnd?.();
                        break;
                }
            }
        };

        return new Promise((resolve, reject) => {
            this.ws.onopen = resolve;
            // Without this the promise would hang forever on a failed connect.
            this.ws.onerror = reject;
        });
    }

    /** Request the microphone and start streaming Int16 PCM frames. */
    async startRecording() {
        this.mediaStream = await navigator.mediaDevices.getUserMedia({
            audio: {
                sampleRate: 16000,
                channelCount: 1,
                echoCancellation: true,
                noiseSuppression: true,
            }
        });

        this.audioContext = new AudioContext({ sampleRate: 16000 });
        this.sourceNode = this.audioContext.createMediaStreamSource(this.mediaStream);

        // ScriptProcessorNode is used here; AudioWorklet is the alternative.
        this.processorNode = this.audioContext.createScriptProcessor(4096, 1, 1);

        this.processorNode.onaudioprocess = (e) => {
            if (!this.isRecording) return;
            // Sending on a socket that is not open throws, so drop the frame.
            if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
            const inputData = e.inputBuffer.getChannelData(0);
            // Float32 -> Int16, clamped to the representable range
            const int16Data = new Int16Array(inputData.length);
            for (let i = 0; i < inputData.length; i++) {
                int16Data[i] = Math.max(-32768, Math.min(32767, inputData[i] * 32768));
            }
            this.ws.send(int16Data.buffer);
        };

        this.sourceNode.connect(this.processorNode);
        this.processorNode.connect(this.audioContext.destination);
        this.isRecording = true;
    }

    /** Stop capturing and release the microphone and capture graph. */
    stopRecording() {
        this.isRecording = false;
        if (this.processorNode) {
            this.processorNode.onaudioprocess = null;
            this.processorNode.disconnect();
            this.processorNode = null;
        }
        this.sourceNode?.disconnect();
        this.sourceNode = null;
        this.mediaStream?.getTracks().forEach(t => t.stop());
        this.mediaStream = null;
        this.audioContext?.close();
        this.audioContext = null;
    }

    /**
     * Queue a received audio chunk for playback. Chunks are decoded one at
     * a time and scheduled back to back so they neither overlap nor reorder.
     */
    playAudioChunk(arrayBuffer) {
        this.playbackChain = this.playbackChain.then(
            () => this._decodeAndSchedule(arrayBuffer)
        );
        return this.playbackChain;
    }

    /** Decode one chunk and schedule it right after the previous one. */
    async _decodeAndSchedule(arrayBuffer) {
        // Empty frames carry nothing to play and would fail to decode.
        if (!arrayBuffer || arrayBuffer.byteLength === 0) return;

        if (!this.playbackContext) {
            this.playbackContext = new AudioContext();
        }
        const ctx = this.playbackContext;

        let audioBuffer;
        try {
            audioBuffer = await ctx.decodeAudioData(arrayBuffer);
        } catch (err) {
            // One undecodable chunk must not break the rest of the stream.
            console.warn('Failed to decode audio chunk:', err);
            return;
        }

        const source = ctx.createBufferSource();
        source.buffer = audioBuffer;
        source.connect(ctx.destination);
        const startAt = Math.max(ctx.currentTime, this.nextPlaybackTime);
        source.start(startAt);
        this.nextPlaybackTime = startAt + audioBuffer.duration;
    }
}

// Usage: connect, then start streaming microphone audio.
// Uses top-level await, so this must run in an ES module or be wrapped in
// an async function.
const client = new RealtimeVoiceClient('ws://localhost:8765');
await client.connect();
await client.startRecording();

Q: 如何实现语音活动检测(VAD)?⭐⭐⭐

答:

VAD 用于检测音频中是否包含人声,是语音交互系统的核心组件。

方案一:WebRTC VAD(轻量级)

python
import webrtcvad
import collections
import struct

class WebRTCVAD:
    """Frame-level voice activity detection backed by ``webrtcvad``."""

    # Frame lengths accepted by WebRTC VAD, in milliseconds.
    SUPPORTED_FRAME_MS = (10, 20, 30)

    def __init__(self, aggressiveness: int = 2, sample_rate: int = 16000,
                 frame_duration_ms: int = 30):
        """
        aggressiveness: 0-3, higher is stricter
        - 0: most permissive, may classify noise as speech
        - 3: strictest, may miss some speech
        - recommended: 2 (balanced)
        sample_rate: sample rate of the 16-bit PCM frames passed to is_speech
        frame_duration_ms: frame length; WebRTC VAD only supports 10/20/30 ms

        Raises:
            ValueError: if ``frame_duration_ms`` is not 10, 20 or 30.
        """
        if frame_duration_ms not in self.SUPPORTED_FRAME_MS:
            raise ValueError(
                f"frame_duration_ms must be one of {self.SUPPORTED_FRAME_MS}, "
                f"got {frame_duration_ms}"
            )
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        # Samples per frame. This previously referenced a bare, undefined
        # ``frame_duration_ms`` name and raised NameError on construction.
        self.frame_size = int(sample_rate * self.frame_duration_ms / 1000)

    def is_speech(self, audio_frame: bytes) -> bool:
        """Return whether one audio frame contains speech.

        Short frames are zero-padded and long frames truncated so the
        underlying detector always receives exactly one frame.
        """
        expected_bytes = self.frame_size * 2  # 16bit = 2 bytes per sample
        if len(audio_frame) < expected_bytes:
            audio_frame += b'\x00' * (expected_bytes - len(audio_frame))
        return self.vad.is_speech(audio_frame[:expected_bytes], self.sample_rate)


class VoiceActivityDetector:
    """Stateful VAD that turns per-frame decisions into start/end events."""

    # Fraction of the onset window that must be voiced to declare speech.
    START_RATIO = 0.8

    def __init__(self, aggressiveness=2, sample_rate=16000,
                 silence_duration_ms=800, speech_pad_ms=300):
        """
        Args:
            aggressiveness: Passed to ``WebRTCVAD``.
            sample_rate: Sample rate of the incoming 16-bit PCM frames.
            silence_duration_ms: Continuous silence that ends an utterance.
            speech_pad_ms: Length of the look-back window kept before speech
                onset; it is also the window used to decide that speech
                has started.
        """
        self.vad = WebRTCVAD(aggressiveness, sample_rate)
        self.sample_rate = sample_rate
        self.frame_duration_ms = self.vad.frame_duration_ms

        self.silence_duration_ms = silence_duration_ms
        self.speech_pad_ms = speech_pad_ms

        # Frame counts; at least one frame each so tiny durations still work
        # (a zero-length window could never trigger).
        self.silence_frames = max(1, int(silence_duration_ms / self.frame_duration_ms))
        self.pad_frames = max(1, int(speech_pad_ms / self.frame_duration_ms))

        # The onset window is sized by the padding duration. It used to be
        # sized by the silence timeout, which left ``pad_frames`` unused and
        # required most of that (much longer) window to be voiced before any
        # speech was reported.
        self.ring_buffer = collections.deque(maxlen=self.pad_frames)
        self.triggered = False
        self.voiced_frames = []
        self.silence_counter = 0

    def process_frame(self, audio_frame: bytes) -> dict:
        """
        Process one audio frame and return an event dict:
        {"event": "speech_start" | "speech_end" | "speaking" | "silence",
         "audio": bytes or None}

        ``speech_start`` carries the buffered look-back frames; ``speech_end``
        carries every frame received after the start event.
        """
        is_speech = self.vad.is_speech(audio_frame)

        if self.triggered:
            return self._while_speaking(audio_frame, is_speech)

        self.ring_buffer.append((audio_frame, is_speech))
        voiced = sum(1 for _, flag in self.ring_buffer if flag)
        if voiced <= self.START_RATIO * self.ring_buffer.maxlen:
            return {"event": "silence", "audio": None}

        # Enough of the window is voiced: speech has started. Hand back the
        # whole window so the onset is not clipped.
        self.triggered = True
        self.silence_counter = 0
        lead_in = b"".join(frame for frame, _ in self.ring_buffer)
        self.ring_buffer.clear()
        return {"event": "speech_start", "audio": lead_in}

    def _while_speaking(self, audio_frame: bytes, is_speech: bool) -> dict:
        """Collect a frame during speech and detect the end of the utterance."""
        self.voiced_frames.append(audio_frame)
        self.silence_counter = 0 if is_speech else self.silence_counter + 1

        if self.silence_counter < self.silence_frames:
            return {"event": "speaking", "audio": None}

        # Silence lasted long enough: close the utterance.
        audio = b"".join(self.voiced_frames)
        self.reset()
        return {"event": "speech_end", "audio": audio}

    def reset(self):
        """Drop all buffered audio and return to the idle state."""
        self.ring_buffer.clear()
        self.triggered = False
        self.voiced_frames.clear()
        self.silence_counter = 0

# 使用示例
import wave

def detect_speech_in_file(wav_path: str):
    """Split a 16 kHz, 16-bit mono WAV file into speech segments.

    Args:
        wav_path: Path of the WAV file to scan.

    Returns:
        A list of ``bytes`` objects, one per detected speech segment. Speech
        that is still in progress when the file ends is returned as a final
        segment.

    Raises:
        ValueError: if the file is not 16-bit, 16 kHz, single-channel PCM.
    """
    vad = VoiceActivityDetector(aggressiveness=2, silence_duration_ms=600)
    speech_segments = []
    current_segment = bytearray()

    with wave.open(wav_path, "rb") as wf:
        # Explicit checks instead of ``assert`` so they survive ``python -O``.
        if wf.getsampwidth() != 2:
            raise ValueError("expected 16-bit samples")
        if wf.getframerate() != 16000:
            raise ValueError("expected a 16000 Hz sample rate")
        if wf.getnchannels() != 1:
            raise ValueError("expected mono audio")

        frame_size = int(16000 * 30 / 1000)  # 30 ms frames

        while True:
            frame = wf.readframes(frame_size)
            if len(frame) < frame_size * 2:
                # A short read marks the end of the file; a partial trailing
                # frame is ignored.
                break

            result = vad.process_frame(frame)
            event = result["event"]

            if event == "speech_start":
                current_segment = bytearray(result["audio"])
            elif event == "speech_end":
                current_segment.extend(result["audio"] or b"")
                speech_segments.append(bytes(current_segment))
                print(f"检测到语音段: {len(current_segment) / 16000 / 2:.2f}s")
                current_segment.clear()
            # "speaking" / "silence": the detector keeps collecting frames.

    # The file ended mid-utterance: the detector still holds the frames it
    # collected since speech started, so emit them as a final segment.
    if vad.triggered:
        current_segment.extend(b"".join(vad.voiced_frames))
        if current_segment:
            speech_segments.append(bytes(current_segment))
            print(f"检测到语音段: {len(current_segment) / 16000 / 2:.2f}s")

    return speech_segments

方案二:Silero VAD(深度学习模型,更准确)

python
import torch

class SileroVAD:
    """Voice activity detection using the Silero model loaded via torch.hub."""

    def __init__(self, threshold: float = 0.5):
        loaded = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
        )
        self.model, helpers = loaded
        # The hub entry returns a 5-tuple of helpers; only the first and
        # third are kept.
        timestamps_fn, _second, read_fn, _fourth, _fifth = helpers
        self.get_speech_timestamps = timestamps_fn
        self.read_audio = read_fn
        self.threshold = threshold

    def detect_speech_timestamps(self, audio_path: str) -> list:
        """Return the speech timestamp segments found in an audio file."""
        waveform = self.read_audio(audio_path, sampling_rate=16000)
        options = {
            "threshold": self.threshold,
            "min_silence_duration_ms": 500,
            "speech_pad_ms": 200,
        }
        return self.get_speech_timestamps(waveform, self.model, **options)

    def is_speech_chunk(self, audio_chunk: torch.Tensor) -> float:
        """Return the model's speech probability for a single audio chunk."""
        probability = self.model(audio_chunk, 16000)
        return probability.item()

# Usage: print every detected speech segment in seconds.
vad = SileroVAD(threshold=0.5)
# Fixed: the detector was referenced as ``vAD``, an undefined name.
timestamps = vad.detect_speech_timestamps("audio.wav")
for ts in timestamps:
    # The example treats 'start'/'end' as sample indices at 16 kHz and
    # converts them to seconds.
    start = ts['start'] / 16000
    end = ts['end'] / 16000
    print(f"语音段: {start:.2f}s - {end:.2f}s")

Q: 如何设计完整的语音 Agent 架构?⭐⭐⭐

答:

一个生产级语音 Agent 需要考虑并发、容错、会话管理等。

python
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Callable, Awaitable
from enum import Enum

class SessionState(Enum):
    """States of the per-session conversation state machine."""

    IDLE = "idle"
    LISTENING = "listening"
    PROCESSING = "processing"
    SPEAKING = "speaking"


@dataclass
class VoiceSession:
    """Mutable state of one client conversation."""

    session_id: str
    state: SessionState = SessionState.IDLE
    conversation_history: list = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    audio_buffer: bytearray = field(default_factory=bytearray)
    language: str = "zh"
    # Consecutive non-speech frames seen while LISTENING.
    silence_frames: int = 0


class VoiceAgent:
    """
    Voice agent with multiple sessions, a per-session state machine and
    barge-in (the user can interrupt the agent while it is speaking).
    """

    # Non-speech frames in a row that end an utterance (~750 ms at 30 ms/frame).
    END_OF_SPEECH_SILENCE_FRAMES = 25
    # Characters that close a sentence and trigger synthesis of the buffer.
    SENTENCE_DELIMITERS = "。!?.!?\n"

    def __init__(self, stt, tts, llm, vad):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.vad = vad
        self.sessions: Dict[str, VoiceSession] = {}
        self._audio_queues: Dict[str, asyncio.Queue] = {}
        # Running response task per session. Keeping the reference prevents
        # the task from being garbage-collected and lets barge-in cancel it.
        self._response_tasks: Dict[str, asyncio.Task] = {}

    def create_session(self) -> str:
        """Register a new session and return its id."""
        session_id = str(uuid.uuid4())
        session = VoiceSession(session_id=session_id)
        self.sessions[session_id] = session
        self._audio_queues[session_id] = asyncio.Queue()
        return session_id

    def close_session(self, session_id: str) -> None:
        """Cancel any running response and forget the session."""
        task = self._response_tasks.pop(session_id, None)
        if task is not None:
            task.cancel()
        self._audio_queues.pop(session_id, None)
        self.sessions.pop(session_id, None)

    async def handle_audio_frame(self, session_id: str, audio_data: bytes):
        """Feed one audio frame from the client into the state machine.

        Frames for unknown sessions are ignored, as are frames that arrive
        while the session is PROCESSING.
        """
        session = self.sessions.get(session_id)
        if not session:
            return

        session.last_active = time.time()

        if session.state == SessionState.SPEAKING:
            # Barge-in: the user talking over the playback interrupts it.
            if self.vad.is_speech(audio_data):
                await self._interrupt_speech(session)
                self._start_listening(session, audio_data)
            return

        if session.state == SessionState.IDLE:
            if self.vad.is_speech(audio_data):
                self._start_listening(session, audio_data)

        elif session.state == SessionState.LISTENING:
            session.audio_buffer.extend(audio_data)

            if self.vad.is_speech(audio_data):
                session.silence_frames = 0
                return

            session.silence_frames += 1
            if session.silence_frames > self.END_OF_SPEECH_SILENCE_FRAMES:
                # Reset so the next utterance starts counting from zero;
                # a stale count used to end it on its first silent frame.
                session.silence_frames = 0
                session.state = SessionState.PROCESSING
                self._response_tasks[session.session_id] = asyncio.create_task(
                    self._process_speech(session)
                )

    def _start_listening(self, session: VoiceSession, first_frame: bytes) -> None:
        """Enter LISTENING with a fresh buffer seeded by the triggering frame."""
        session.state = SessionState.LISTENING
        session.silence_frames = 0
        session.audio_buffer.clear()
        session.audio_buffer.extend(first_frame)

    async def _process_speech(self, session: VoiceSession):
        """Transcribe the buffered utterance and speak the LLM's answer."""
        full_response = ""
        try:
            audio_bytes = bytes(session.audio_buffer)
            session.audio_buffer.clear()

            text = await self.stt.transcribe(audio_bytes)
            if not text.strip():
                return

            session.conversation_history.append({"role": "user", "content": text})

            session.state = SessionState.SPEAKING

            # Streaming TTS: synthesize each sentence as soon as the LLM has
            # finished it, which cuts the time to the first audio.
            sentence_buffer = ""
            async for token in self.llm.stream_chat(session.conversation_history):
                full_response += token
                sentence_buffer += token

                if any(p in token for p in self.SENTENCE_DELIMITERS):
                    await self._speak_text(session, sentence_buffer)
                    sentence_buffer = ""

            if sentence_buffer:
                await self._speak_text(session, sentence_buffer)

            session.conversation_history.append({
                "role": "assistant",
                "content": full_response
            })

        except asyncio.CancelledError:
            # Interrupted mid-answer: record what was generated so far so the
            # history keeps alternating user/assistant turns.
            if full_response:
                session.conversation_history.append({
                    "role": "assistant",
                    "content": full_response
                })
            raise
        except Exception as e:
            print(f"Error processing speech: {e}")
        finally:
            # Only fall back to IDLE if this task still owns the state; after
            # a barge-in the session may already be LISTENING again.
            if session.state in (SessionState.PROCESSING, SessionState.SPEAKING):
                session.state = SessionState.IDLE
            if self._response_tasks.get(session.session_id) is asyncio.current_task():
                del self._response_tasks[session.session_id]

    async def _speak_text(self, session: VoiceSession, text: str):
        """Synthesize ``text`` and enqueue the audio for the session."""
        async for audio_chunk in self.tts.stream_synthesize(text):
            await self._audio_queues[session.session_id].put(audio_chunk)

    async def _interrupt_speech(self, session: VoiceSession):
        """Stop the running response and discard audio not yet delivered."""
        task = self._response_tasks.pop(session.session_id, None)
        if task is not None and not task.done():
            task.cancel()
            # Wait for the task to unwind so it cannot enqueue more audio
            # after the queue has been drained.
            await asyncio.gather(task, return_exceptions=True)

        queue = self._audio_queues.get(session.session_id)
        if queue:
            while not queue.empty():
                try:
                    queue.get_nowait()
                except asyncio.QueueEmpty:
                    break

Q: 如何优化语音交互的端到端延迟?⭐⭐⭐

答:

语音交互的延迟直接影响用户体验,目标是将首字节延迟控制在 500ms 以内,端到端总延迟控制在 1s 以内。

python
"""
延迟优化策略总览:

用户说话结束 → STT (200-500ms) → LLM 首 token (200-500ms) → TTS 首帧 (100-300ms)
总延迟目标: < 1s(人感知流畅的阈值)
"""

import asyncio
from concurrent.futures import ThreadPoolExecutor

class LowLatencyVoicePipeline:
    """Voice pipeline tuned for low end-to-end latency.

    Bundles four strategies: streaming STT, sentence-level LLM -> TTS
    pipelining, model warm-up, and a jitter buffer for smooth playback.

    Args:
        stt: Engine exposing ``transcribe`` and ``transcribe_streaming``.
        tts: Engine exposing the async generator ``stream_synthesize(text)``.
        llm: Client exposing the async generator ``stream_chat(prompt)``.
    """

    # Characters that close a sentence and trigger TTS for the buffered text.
    # Both the ASCII and the full-width forms are listed so that Chinese
    # sentences ending in "!" or "?" are split as well.
    _SENTENCE_ENDINGS = "。!?.!?\n"

    def __init__(self, stt, tts, llm):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.executor = ThreadPoolExecutor(max_workers=4)

    # Strategy 1: streaming STT - recognize while the user is still speaking.
    async def streaming_stt(self, audio_chunks):
        """Yield partial transcripts as audio arrives.

        Args:
            audio_chunks: Async iterable of audio chunks.

        Yields:
            The transcript text each time it differs from the previously
            yielded one (the first result is always yielded).
        """
        no_result = object()
        last_text = no_result
        async for chunk in audio_chunks:
            result = await self.stt.transcribe_streaming(chunk)
            if result.text != last_text:
                last_text = result.text
                yield result.text

    # Strategy 2: pipeline LLM and TTS - synthesize while still generating.
    async def pipelined_response(self, user_text: str):
        """Stream the LLM reply and synthesize it sentence by sentence.

        A background task consumes the LLM stream and starts one TTS task per
        completed sentence, so synthesis of several sentences overlaps with
        generation. Audio is yielded strictly in sentence order, and each
        sentence is yielded as soon as its own synthesis finishes rather than
        waiting for the next sentence boundary.

        Args:
            user_text: Prompt forwarded to ``self.llm.stream_chat``.

        Yields:
            One ``bytes`` object per sentence containing its complete audio.

        Raises:
            Whatever the LLM stream or a TTS task raises.
        """
        ready: asyncio.Queue = asyncio.Queue()
        end_of_stream = object()
        spawned = []

        def start_tts(sentence: str) -> None:
            # Whitespace-only fragments (e.g. a lone newline) carry no speech.
            if not sentence.strip():
                return
            task = asyncio.create_task(self._collect_tts_audio(sentence))
            spawned.append(task)
            ready.put_nowait(task)

        async def feed_sentences() -> None:
            parts = []
            try:
                async for token in self.llm.stream_chat(user_text):
                    parts.append(token)
                    if any(mark in token for mark in self._SENTENCE_ENDINGS):
                        start_tts("".join(parts))
                        parts.clear()
                # Whatever is left after the stream ends is the last sentence.
                start_tts("".join(parts))
            finally:
                # Always wake the consumer, even when the LLM stream failed.
                ready.put_nowait(end_of_stream)

        feeder = asyncio.create_task(feed_sentences())
        try:
            while True:
                item = await ready.get()
                if item is end_of_stream:
                    break
                yield await item
            # Surface an LLM-side failure to the caller.
            await feeder
        finally:
            # If the consumer stops early or an error occurs, do not leave
            # background work running. Cancelling finished tasks is a no-op.
            feeder.cancel()
            for task in spawned:
                task.cancel()

    async def _collect_tts_audio(self, text: str) -> bytes:
        """Synthesize ``text`` and return all audio chunks joined together."""
        return b"".join(
            [chunk async for chunk in self.tts.stream_synthesize(text)]
        )

    # Strategy 3: warm the models up.
    async def warmup(self):
        """Run one dummy request through STT, TTS and LLM concurrently."""
        await asyncio.gather(
            self._warmup_stt(),
            self._warmup_tts(),
            self._warmup_llm(),
        )
        print("All models warmed up")

    async def _warmup_stt(self):
        """Warm up the STT model with 16000 zero samples (float32)."""
        import numpy as np
        dummy_audio = np.zeros(16000, dtype=np.float32)
        await self.stt.transcribe(dummy_audio)

    async def _warmup_tts(self):
        """Warm up the TTS model with a short text, discarding the audio."""
        async for _ in self.tts.stream_synthesize("你好"):
            pass

    async def _warmup_llm(self):
        """Warm up the LLM; stop after the first streamed token."""
        async for _ in self.llm.stream_chat("hi"):
            break

    # Strategy 4: audio buffering for smooth playback.
    class AudioJitterBuffer:
        """Jitter buffer that re-chunks incoming audio into fixed-size blocks.

        Args:
            min_buffer_ms: Duration of one playback block in milliseconds.
            sample_rate: Samples per second of the buffered audio.
            sample_width: Bytes per sample (2 for 16-bit audio).

        Raises:
            ValueError: If ``sample_rate`` or ``sample_width`` is not positive.
        """

        def __init__(self, min_buffer_ms: int = 100,
                     sample_rate: int = 16000, sample_width: int = 2):
            if sample_rate <= 0 or sample_width <= 0:
                raise ValueError("sample_rate and sample_width must be positive")
            self.buffer = bytearray()
            raw_bytes = int(sample_rate * sample_width * min_buffer_ms / 1000)
            # Keep blocks aligned to whole samples and never let the block
            # size drop to zero, which would make draining loop forever.
            self.min_buffer_bytes = max(
                sample_width, raw_bytes - raw_bytes % sample_width
            )
            self.playing = False

        async def add_chunk(self, chunk: bytes) -> list[bytes]:
            """Append ``chunk`` and return every complete block now available.

            Returns:
                Zero or more blocks of exactly ``min_buffer_bytes`` bytes, in
                playback order. A partial remainder stays buffered.
            """
            self.buffer.extend(chunk)
            block_size = self.min_buffer_bytes

            if not self.playing and len(self.buffer) >= block_size:
                self.playing = True

            blocks = []
            # Emit all full blocks so a large chunk cannot pile up behind a
            # one-block-per-call limit.
            while self.playing and len(self.buffer) >= block_size:
                blocks.append(bytes(self.buffer[:block_size]))
                del self.buffer[:block_size]
            return blocks

        def flush(self) -> bytes:
            """Return the buffered remainder (possibly empty) and reset."""
            tail = bytes(self.buffer)
            self.buffer.clear()
            self.playing = False
            return tail

延迟优化 Checklist:

| 优化点 | 方法 | 预期收益 |
| --- | --- | --- |
| STT 加速 | faster-whisper int8 量化 | 减少 50%+ 推理时间 |
| LLM 首 token | 使用流式 API | 感知延迟大幅降低 |
| TTS 流式 | 边生成边合成 | 减少 200-400ms |
| 句子级流水线 | 不等 LLM 全部生成就开始 TTS | 减少 300-500ms |
| 模型预热 | 启动时加载并跑一次 | 避免首次冷启动 |
| 音频编码 | 使用 Opus 编码 | 减少带宽,降低传输延迟 |
| 地理就近 | 就近部署或使用 CDN | 减少网络延迟 |

Q: 如何实现 GPT-4o 风格的实时语音对话?⭐⭐⭐

答:

GPT-4o Realtime API 使用 WebSocket 进行全双工语音通信。

python
import asyncio
import websockets
import json
import base64
import numpy as np

class GPT4oRealtimeVoice:
    """Client for the OpenAI Realtime API over WebSocket.

    Supports full-duplex voice interaction: audio can be sent while server
    events are being received.

    Callbacks (``on_audio``, ``on_text``, ``on_transcript``) may be plain
    functions or coroutine functions; each receives a single argument.

    Args:
        api_key: API key sent as a bearer token.
        model: Realtime model name appended to the connection URL.
        instructions: System instructions sent in the session configuration.
        voice: Voice name sent in the session configuration.
    """

    def __init__(self, api_key: str, model: str = "gpt-4o-realtime-preview",
                 instructions: str = "你是一个有帮助的中文语音助手。请用中文回答,语气自然亲切。",
                 voice: str = "alloy"):
        self.api_key = api_key
        self.model = model
        self.instructions = instructions
        self.voice = voice
        self.ws = None
        self.on_audio = None       # callback: decoded audio bytes received
        self.on_text = None        # callback: assistant transcript delta received
        self.on_transcript = None  # callback: user speech transcript received

    async def connect(self):
        """Open the WebSocket connection and send the session configuration."""
        url = f"wss://api.openai.com/v1/realtime?model={self.model}"

        # NOTE(review): the header keyword differs between websockets
        # releases (`additional_headers` vs. `extra_headers`) - confirm
        # against the pinned version.
        self.ws = await websockets.connect(
            url,
            additional_headers={
                "Authorization": f"Bearer {self.api_key}",
                "OpenAI-Beta": "realtime=v1",
            },
        )

        await self._send_event({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "instructions": self.instructions,
                "voice": self.voice,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1",
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 500,
                },
                "temperature": 0.8,
            },
        })

    async def _send_event(self, event: dict):
        """Serialize ``event`` as JSON and send it.

        Raises:
            RuntimeError: If called before ``connect()`` or after ``close()``.
        """
        if self.ws is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")
        await self.ws.send(json.dumps(event))

    async def send_audio(self, audio_bytes: bytes):
        """Append audio to the input buffer (PCM16, 24 kHz, mono)."""
        audio_b64 = base64.b64encode(audio_bytes).decode()
        await self._send_event({
            "type": "input_audio_buffer.append",
            "audio": audio_b64,
        })

    async def send_text(self, text: str):
        """Add a user text message and request a response."""
        await self._send_event({
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}],
            },
        })
        await self._send_event({"type": "response.create"})

    @staticmethod
    async def _dispatch(callback, payload):
        """Invoke ``callback(payload)``, awaiting the result if awaitable."""
        if not callback:
            return
        import inspect
        outcome = callback(payload)
        if inspect.isawaitable(outcome):
            await outcome

    async def listen(self):
        """Receive server events until the connection closes.

        Dispatches audio deltas (base64-decoded) to ``on_audio``, assistant
        transcript deltas to ``on_text`` and completed user transcriptions to
        ``on_transcript``. Error events are printed.
        """
        async for message in self.ws:
            event = json.loads(message)
            event_type = event.get("type", "")

            if event_type == "response.audio.delta":
                audio_bytes = base64.b64decode(event["delta"])
                await self._dispatch(self.on_audio, audio_bytes)

            elif event_type == "response.audio_transcript.delta":
                await self._dispatch(self.on_text, event["delta"])

            elif event_type == "conversation.item.input_audio_transcription.completed":
                await self._dispatch(self.on_transcript, event["transcript"])

            elif event_type == "error":
                print(f"Error: {event}")

    async def close(self):
        """Close the connection if open; safe to call more than once."""
        if self.ws:
            await self.ws.close()
            self.ws = None

# 使用示例
async def main():
    """Usage example: stream the microphone to the Realtime API.

    Registers printing/playback callbacks, connects, runs the listener in
    the background while microphone audio is sent, and always cancels the
    listener and closes the connection on the way out.
    """
    import inspect
    import os

    # Prefer the environment over a key embedded in source code.
    client = GPT4oRealtimeVoice(api_key=os.environ.get("OPENAI_API_KEY", "sk-xxx"))

    # The callbacks are coroutine functions because listen() awaits them.
    async def on_transcript(transcript):
        print(f"用户: {transcript}")

    async def on_text(delta):
        print(f"AI: {delta}", end="", flush=True)

    async def on_audio(audio):
        # play_audio is defined elsewhere; support sync and async variants.
        outcome = play_audio(audio)
        if inspect.isawaitable(outcome):
            await outcome

    client.on_transcript = on_transcript
    client.on_text = on_text
    client.on_audio = on_audio

    await client.connect()

    # Listen for server events while audio is being sent.
    listen_task = asyncio.create_task(client.listen())
    try:
        # Capture from the microphone and send.
        await stream_microphone(client)
    finally:
        listen_task.cancel()
        # Wait for the cancellation to finish before closing the socket.
        await asyncio.gather(listen_task, return_exceptions=True)
        await client.close()

async def stream_microphone(client, sample_rate: int = 24000,
                            frames_per_buffer: int = 1024):
    """Capture microphone audio and forward it to ``client.send_audio``.

    Opens a mono 16-bit PyAudio input stream and loops until cancelled or an
    error occurs. Each blocking device read runs in the default executor so
    the event loop stays free to process incoming events in the meantime.

    Args:
        client: Object exposing ``async send_audio(bytes)``.
        sample_rate: Capture rate in Hz.
        frames_per_buffer: Frames read per iteration.
    """
    import pyaudio

    def read_block():
        return stream.read(frames_per_buffer, exception_on_overflow=False)

    pa = pyaudio.PyAudio()
    try:
        stream = pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            input=True,
            frames_per_buffer=frames_per_buffer,
        )
        loop = asyncio.get_running_loop()
        in_flight = None
        try:
            while True:
                # NOTE(review): no extra sleep is used for pacing on the
                # assumption that the read itself blocks until the requested
                # frames are captured - confirm against the PyAudio docs.
                in_flight = loop.run_in_executor(None, read_block)
                # shield: cancelling this coroutine must not abandon a read
                # that is still running in the worker thread.
                data = await asyncio.shield(in_flight)
                await client.send_audio(data)
        finally:
            # Let an in-progress read finish before the stream is closed
            # underneath it.
            if in_flight is not None and not in_flight.done():
                await asyncio.wait([in_flight])
            stream.stop_stream()
            stream.close()
    finally:
        pa.terminate()

Q: 如何实现多语言语音支持?⭐⭐

答:

多语言支持需要在 STT、LLM、TTS 三个环节都做好处理。

python
import asyncio
from typing import Optional
from langdetect import detect

class MultiLanguageVoiceAgent:
    """Voice agent that handles STT, LLM and TTS per detected language.

    Args:
        stt: Engine exposing ``async transcribe(audio, **options)``.
        tts: Engine exposing the async generator
            ``stream_synthesize(text, voice=...)``.
        llm: Client exposing ``async chat(messages)``.
    """

    # Per-language settings, keyed by ISO 639-1 code.
    LANG_CONFIG = {
        "zh": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "以下是普通话的句子。",
            "tts_voice": "zh-CN-XiaoxiaoNeural",
            "tts_edge_voice": "zh-CN-YunxiNeural",
            "system_prompt": "请用中文回答。",
        },
        "en": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "",
            "tts_voice": "en-US-JennyNeural",
            "tts_edge_voice": "en-US-GuyNeural",
            "system_prompt": "Please respond in English.",
        },
        "ja": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "",
            "tts_voice": "ja-JP-NanamiNeural",
            "tts_edge_voice": "ja-JP-KeitaNeural",
            "system_prompt": "日本語で回答してください。",
        },
        "ko": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "",
            "tts_voice": "ko-KR-SunHiNeural",
            "tts_edge_voice": "ko-KR-InJoonNeural",
            "system_prompt": "한국어로 대답해 주세요.",
        },
    }

    # Detector codes that map onto a LANG_CONFIG key.
    _LANG_ALIASES = {"zh-cn": "zh", "zh-tw": "zh"}

    def __init__(self, stt, tts, llm):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.current_lang = "zh"  # default language, also the fallback

    def detect_language(self, text: str) -> str:
        """Return the ``LANG_CONFIG`` key that best matches ``text``.

        Detected codes outside ``LANG_CONFIG`` and the alias table map to
        ``"en"``. If detection fails, ``self.current_lang`` is returned.
        """
        try:
            lang = detect(text)
        except Exception:
            # Detection is best-effort: any detector failure falls back to
            # the language currently in use instead of aborting the turn.
            return self.current_lang

        if lang in self.LANG_CONFIG:
            return lang
        return self._LANG_ALIASES.get(lang, "en")

    async def transcribe_with_lang(self, audio_bytes: bytes,
                                    language: Optional[str] = None) -> tuple[str, str]:
        """Transcribe audio and return ``(text, language)``.

        Args:
            audio_bytes: Audio to transcribe.
            language: Language hint. When given it is passed to the STT
                engine together with the configured prompt (an empty prompt
                if the language has no ``LANG_CONFIG`` entry). When omitted,
                the engine runs without a hint and the language is detected
                from the transcript.
        """
        if language:
            # Unknown languages must not raise; respond() and
            # synthesize_speech() fall back the same way.
            config = self.LANG_CONFIG.get(language, {})
            text = await self.stt.transcribe(
                audio_bytes,
                language=language,
                initial_prompt=config.get("stt_prompt", ""),
            )
            return text, language

        text = await self.stt.transcribe(audio_bytes)
        return text, self.detect_language(text)

    async def respond(self, user_text: str, language: str) -> str:
        """Generate a reply, instructing the LLM to answer in ``language``.

        Languages without a ``LANG_CONFIG`` entry use the English prompt.
        """
        config = self.LANG_CONFIG.get(language, self.LANG_CONFIG["en"])

        messages = [
            {"role": "system", "content": config["system_prompt"]},
            {"role": "user", "content": user_text},
        ]

        return await self.llm.chat(messages)

    async def synthesize_speech(self, text: str, language: str) -> bytes:
        """Synthesize ``text`` with the voice configured for ``language``.

        Languages without a ``LANG_CONFIG`` entry use the English voice.
        """
        config = self.LANG_CONFIG.get(language, self.LANG_CONFIG["en"])

        return b"".join([
            chunk
            async for chunk in self.tts.stream_synthesize(
                text, voice=config["tts_voice"]
            )
        ])

    async def process_audio(self, audio_bytes: bytes) -> bytes:
        """Run one full turn: STT with detection -> LLM -> TTS.

        Returns:
            The synthesized reply audio, or ``b""`` when the transcript is
            empty (nothing was said, so no reply is generated).
        """
        # 1. STT with language detection
        text, language = await self.transcribe_with_lang(audio_bytes)
        if not text.strip():
            return b""
        # Remember the language so later detection failures fall back to it.
        self.current_lang = language
        print(f"[{language}] 用户: {text}")

        # 2. Generate the reply
        response = await self.respond(text, language)
        print(f"[{language}] AI: {response}")

        # 3. TTS
        return await self.synthesize_speech(response, language)

Q: 如何构建音频处理流水线?⭐⭐

答:

生产环境中,音频在送入 STT 之前需要经过一系列预处理。

python
import numpy as np
from scipy import signal
import io
import wave

class AudioProcessor:
    """Stateless audio pre-processing steps applied before STT.

    All methods take and return 1-D sample arrays unless stated otherwise.
    """

    @staticmethod
    def convert_sample_rate(audio: np.ndarray,
                            orig_sr: int, target_sr: int) -> np.ndarray:
        """Resample ``audio`` from ``orig_sr`` to ``target_sr`` (e.g. 48 kHz -> 16 kHz).

        Returns the input unchanged when both rates are equal; otherwise a
        float32 array resampled with ``scipy.signal.resample``.
        """
        if orig_sr == target_sr:
            return audio

        duration = len(audio) / orig_sr
        target_length = int(duration * target_sr)
        if target_length <= 0:
            # Nothing (or less than one output sample) to resample.
            return np.zeros(0, dtype=np.float32)

        resampled = signal.resample(audio, target_length)
        return resampled.astype(np.float32)

    @staticmethod
    def normalize_audio(audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """Scale ``audio`` so that its RMS level equals ``target_db`` (dB).

        Empty and all-zero input is returned unchanged.
        """
        if audio.size == 0:
            return audio

        rms = float(np.sqrt(np.mean(np.square(audio, dtype=np.float64))))
        if rms == 0:
            return audio

        current_db = 20 * np.log10(rms)
        # A plain Python float keeps the dtype of ``audio`` in the product.
        scale = float(10 ** ((target_db - current_db) / 20))
        return audio * scale

    @staticmethod
    def remove_silence(audio: np.ndarray, sample_rate: int = 16000,
                       threshold_db: float = -40,
                       min_silence_ms: int = 200) -> np.ndarray:
        """Trim leading and trailing silence.

        Energy is measured per 10 ms frame (the last frame may be shorter).
        The result spans the first to the last frame whose RMS exceeds the
        threshold, extended by five frames on each side and clipped to the
        input bounds.

        Args:
            audio: Samples to trim.
            sample_rate: Sample rate in Hz; determines the frame size.
            threshold_db: RMS threshold in dB (linear ``10 ** (dB / 20)``).
            min_silence_ms: Currently unused; kept for interface
                compatibility.

        Returns:
            A slice of ``audio``, or an empty float32 array when no frame
            exceeds the threshold.
        """
        threshold = 10 ** (threshold_db / 20)
        total = len(audio)
        if total == 0:
            return np.array([], dtype=np.float32)

        frame_size = max(1, int(sample_rate * 0.01))  # 10 ms frames
        # Frame start offsets cover the whole signal, including the final
        # (possibly partial) frame.
        starts = np.arange(0, total, frame_size)
        power = np.add.reduceat(np.square(audio, dtype=np.float64), starts)
        frame_lengths = np.diff(np.append(starts, total))
        energy = np.sqrt(power / frame_lengths)

        active = np.flatnonzero(energy > threshold)
        if active.size == 0:
            return np.array([], dtype=np.float32)

        padding = frame_size * 5
        start = max(0, int(active[0]) * frame_size - padding)
        # The last active frame ends at (index + 1) * frame_size.
        end = min(total, (int(active[-1]) + 1) * frame_size + padding)

        return audio[start:end]

    @staticmethod
    def apply_noise_gate(audio: np.ndarray, threshold_db: float = -35) -> np.ndarray:
        """Noise gate: zero every sample whose magnitude is at or below the threshold."""
        threshold = 10 ** (threshold_db / 20)
        gate = np.abs(audio) > threshold
        return audio * gate

    @staticmethod
    def apply_high_pass_filter(audio: np.ndarray, sample_rate: int = 16000,
                                cutoff_hz: float = 80) -> np.ndarray:
        """Apply a 4th-order Butterworth high-pass filter (zero-phase).

        Removes low-frequency noise such as hum. Inputs too short for
        ``filtfilt``'s edge padding are returned unfiltered (as float32)
        instead of raising.
        """
        nyquist = sample_rate / 2
        normalized_cutoff = cutoff_hz / nyquist
        b, a = signal.butter(4, normalized_cutoff, btype='high')

        # filtfilt needs more samples than its default padding length.
        padlen = 3 * max(len(a), len(b))
        if len(audio) <= padlen:
            return np.asarray(audio, dtype=np.float32)

        return signal.filtfilt(b, a, audio).astype(np.float32)

    @staticmethod
    def convert_format(audio_bytes: bytes,
                       from_format: str, to_format: str,
                       sample_rate: int = 16000) -> bytes:
        """Convert encoded audio to ``to_format`` (mono) by piping through ffmpeg.

        Args:
            audio_bytes: Encoded input audio.
            from_format: Not passed to ffmpeg; kept for interface
                compatibility.
            to_format: ffmpeg output format name (``-f``).
            sample_rate: Output sample rate in Hz (``-ar``).

        Returns:
            The converted audio bytes.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        import subprocess

        completed = subprocess.run(
            ['ffmpeg', '-hide_banner', '-loglevel', 'error',
             '-i', 'pipe:0',
             '-f', to_format,
             '-ar', str(sample_rate),
             '-ac', '1',
             'pipe:1'],
            input=audio_bytes,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        if completed.returncode != 0:
            detail = completed.stderr.decode("utf-8", errors="replace").strip()
            raise RuntimeError(
                f"ffmpeg failed with exit code {completed.returncode}: {detail}"
            )
        return completed.stdout


class AudioPipeline:
    """End-to-end audio pre-processing pipeline.

    Args:
        target_sr: Sample rate in Hz of the returned audio.
    """

    def __init__(self, target_sr: int = 16000):
        self.target_sr = target_sr
        self.processor = AudioProcessor()

    @staticmethod
    def _decode_wav(wav_bytes: bytes) -> tuple[np.ndarray, int]:
        """Decode WAV bytes into mono float32 samples plus the sample rate.

        8-, 16- and 32-bit integer PCM is scaled to [-1, 1); multi-channel
        audio is down-mixed by averaging the channels.

        Raises:
            ValueError: If the sample width is not 1, 2 or 4 bytes.
        """
        with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
            channels = wf.getnchannels()
            width = wf.getsampwidth()
            rate = wf.getframerate()
            raw_data = wf.readframes(wf.getnframes())

        # Drop a trailing partial sample so frombuffer never fails on a
        # truncated stream.
        raw_data = raw_data[:len(raw_data) - len(raw_data) % width]

        if width == 2:
            samples = np.frombuffer(raw_data, dtype=np.int16).astype(np.float32) / 32768.0
        elif width == 1:
            # 8-bit WAV samples are unsigned with the midpoint at 128.
            samples = (np.frombuffer(raw_data, dtype=np.uint8).astype(np.float32) - 128.0) / 128.0
        elif width == 4:
            samples = (np.frombuffer(raw_data, dtype=np.int32).astype(np.float64) / 2147483648.0)
        else:
            raise ValueError(f"Unsupported WAV sample width: {width} bytes")

        if channels > 1:
            # Samples are interleaved per frame; average channels to mono.
            usable = len(samples) - len(samples) % channels
            samples = samples[:usable].reshape(-1, channels).mean(axis=1)

        return samples.astype(np.float32), rate

    def process(self, audio_bytes: bytes,
                source_sr: int = 44100,
                source_format: str = "wav") -> np.ndarray:
        """Decode and clean up audio for STT.

        Steps: decode (via ffmpeg for non-WAV input) -> resample to
        ``target_sr`` -> high-pass filter -> normalize to -20 dB ->
        trim leading/trailing silence.

        Args:
            audio_bytes: Encoded input audio.
            source_sr: Ignored; the rate is always read from the decoded WAV
                header. Kept for interface compatibility.
            source_format: ``"wav"`` to decode directly; any other value is
                converted to WAV first.

        Returns:
            Mono float32 samples at ``target_sr`` (possibly empty).
        """
        # 1. Decode to a numpy array
        if source_format != "wav":
            audio_bytes = self.processor.convert_format(
                audio_bytes, source_format, "wav", self.target_sr
            )
        audio, source_sr = self._decode_wav(audio_bytes)

        if audio.size == 0:
            # Nothing to process; the filtering steps need actual samples.
            return audio

        # 2. Resample
        audio = self.processor.convert_sample_rate(audio, source_sr, self.target_sr)

        # 3. High-pass filter
        audio = self.processor.apply_high_pass_filter(audio, self.target_sr)

        # 4. Loudness normalization
        audio = self.processor.normalize_audio(audio, target_db=-20.0)

        # 5. Trim leading/trailing silence
        audio = self.processor.remove_silence(audio, self.target_sr)

        return audio

Q: 如何在生产环境中监控和调试语音系统?⭐⭐

答:

python
import time
import logging
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class VoiceMetrics:
    """Performance metrics recorded for one voice interaction."""
    
    # Latency metrics (ms), one per pipeline stage
    vad_latency_ms: float = 0
    stt_latency_ms: float = 0
    llm_first_token_ms: float = 0
    llm_total_ms: float = 0
    tts_first_chunk_ms: float = 0
    tts_total_ms: float = 0
    
    # End to end: user stops speaking -> first audio is heard
    e2e_latency_ms: float = 0
    
    # Quality metrics
    stt_confidence: float = 0
    audio_duration_ms: float = 0
    tts_audio_duration_ms: float = 0
    
    # Session information
    session_id: str = ""
    language: str = ""
    text_input: str = ""
    text_output: str = ""

class VoiceMetricsCollector:
    """Collects per-interaction metrics and produces latency summaries.

    Args:
        max_history: Maximum number of records kept in ``metrics_history``;
            the oldest records are dropped beyond that. ``None`` keeps
            everything (unbounded memory growth in a long-running process).
    """

    # Number of most recent interactions a summary is computed over.
    _SUMMARY_WINDOW = 100

    def __init__(self, max_history: int = 10000):
        self.metrics_history: list["VoiceMetrics"] = []
        self.max_history = max_history
        # Records trimmed from the history still count as interactions.
        self._dropped = 0

    def calculate_e2e(self, m: "VoiceMetrics") -> float:
        """Estimate end-to-end latency as STT + LLM first token + TTS first chunk.

        Stores the estimate in ``m.e2e_latency_ms`` and returns it.
        """
        m.e2e_latency_ms = (
            m.stt_latency_ms +
            m.llm_first_token_ms +
            m.tts_first_chunk_ms
        )
        return m.e2e_latency_ms

    def get_summary(self) -> dict:
        """Summarize the most recent interactions (up to 100).

        Averages and P95 values ignore records whose value is not positive
        (treated as "not measured").

        Returns:
            An empty dict when nothing has been recorded, otherwise the
            total interaction count plus average and P95 latency for the
            end-to-end, STT, LLM-first-token and TTS-first-chunk metrics.
        """
        if not self.metrics_history:
            return {}

        recent = self.metrics_history[-self._SUMMARY_WINDOW:]

        def measured(key):
            return [getattr(m, key) for m in recent if getattr(m, key) > 0]

        def avg(key):
            values = measured(key)
            return sum(values) / len(values) if values else 0

        def p95(key):
            values = sorted(measured(key))
            if not values:
                return 0
            idx = int(len(values) * 0.95)
            return values[min(idx, len(values) - 1)]

        return {
            "total_interactions": self._dropped + len(self.metrics_history),
            "avg_e2e_latency_ms": round(avg("e2e_latency_ms"), 1),
            "p95_e2e_latency_ms": round(p95("e2e_latency_ms"), 1),
            "avg_stt_ms": round(avg("stt_latency_ms"), 1),
            "p95_stt_ms": round(p95("stt_latency_ms"), 1),
            "avg_llm_first_token_ms": round(avg("llm_first_token_ms"), 1),
            "p95_llm_first_token_ms": round(p95("llm_first_token_ms"), 1),
            "avg_tts_first_chunk_ms": round(avg("tts_first_chunk_ms"), 1),
            "p95_tts_first_chunk_ms": round(p95("tts_first_chunk_ms"), 1),
        }

    def log_metrics(self, metrics: "VoiceMetrics"):
        """Record ``metrics`` and emit one INFO log line.

        An end-to-end latency already measured by the caller (positive
        value) is kept; otherwise it is estimated from the stage latencies.
        """
        if metrics.e2e_latency_ms <= 0:
            self.calculate_e2e(metrics)

        self.metrics_history.append(metrics)
        if self.max_history is not None:
            overflow = len(self.metrics_history) - self.max_history
            if overflow > 0:
                del self.metrics_history[:overflow]
                self._dropped += overflow

        logging.info(
            "[VoiceMetrics] session=%s lang=%s stt=%.0fms "
            "llm_ft=%.0fms tts_ft=%.0fms e2e=%.0fms",
            metrics.session_id[:8],
            metrics.language,
            metrics.stt_latency_ms,
            metrics.llm_first_token_ms,
            metrics.tts_first_chunk_ms,
            metrics.e2e_latency_ms,
        )


class InstrumentedVoicePipeline:
    """Voice pipeline that records latency metrics for every interaction.

    Args:
        stt: Engine exposing ``async transcribe(audio)``.
        tts: Engine exposing the async generator ``stream_synthesize(text)``.
        llm: Client exposing the async generator ``stream_chat(text)``.
        vad: Voice activity detector; stored but not used by ``process``.
    """

    def __init__(self, stt, tts, llm, vad):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.vad = vad
        self.metrics = VoiceMetricsCollector()

    async def process(self, audio_bytes: bytes, session_id: str = "") -> bytes:
        """Run STT -> LLM -> TTS sequentially and record per-stage timings.

        Args:
            audio_bytes: The user's utterance.
            session_id: Identifier stored with the metrics.

        Returns:
            The synthesized reply audio, or ``b""`` when the transcript is
            empty (nothing to answer).
        """
        m = VoiceMetrics(session_id=session_id)
        started = time.monotonic()

        # STT
        t0 = time.monotonic()
        text = await self.stt.transcribe(audio_bytes)
        m.stt_latency_ms = (time.monotonic() - t0) * 1000
        m.text_input = text

        if not text.strip():
            # Nothing was recognized: skip the LLM and TTS round trips.
            self.metrics.log_metrics(m)
            return b""

        # LLM (streaming)
        t0 = time.monotonic()
        tokens = []
        async for token in self.llm.stream_chat(text):
            if not tokens:
                m.llm_first_token_ms = (time.monotonic() - t0) * 1000
            tokens.append(token)
        m.llm_total_ms = (time.monotonic() - t0) * 1000
        response_text = "".join(tokens)
        m.text_output = response_text

        # TTS (streaming)
        t0 = time.monotonic()
        audio_chunks = []
        measured_e2e_ms = 0.0
        async for chunk in self.tts.stream_synthesize(response_text):
            if not audio_chunks:
                now = time.monotonic()
                m.tts_first_chunk_ms = (now - t0) * 1000
                # Wall-clock time to first audio. TTS only starts after the
                # LLM has finished here, so this includes the full LLM
                # generation time, not just its first-token latency.
                measured_e2e_ms = (now - started) * 1000
            audio_chunks.append(chunk)
        m.tts_total_ms = (time.monotonic() - t0) * 1000

        # Record the metrics
        if measured_e2e_ms > 0:
            m.e2e_latency_ms = measured_e2e_ms
        self.metrics.log_metrics(m)
        # log_metrics may replace e2e with a sum-of-stages estimate; keep
        # the measured value on the stored record.
        if measured_e2e_ms > 0:
            m.e2e_latency_ms = measured_e2e_ms

        return b"".join(audio_chunks)

指标看板示例输出:

=== Voice System Metrics ===
Total interactions: 1523
Avg E2E latency: 847ms (P95: 1243ms)
  STT:          avg 234ms (P95: 412ms)
  LLM 1st token: avg 312ms (P95: 523ms)
  TTS 1st chunk: avg 189ms (P95: 301ms)
===========================

本章总结: 语音交互系统的核心挑战在于低延迟和高质量。关键优化策略包括:使用 faster-whisper 进行本地加速推理、LLM + TTS pipeline 化减少等待、VAD 实现准确的语音端点检测、WebSocket 实现全双工通信。生产环境中需要重点关注端到端延迟监控,目标控制在 1 秒以内以提供流畅体验。

LLM 应用 & Agent 开发面试准备