18. 语音与实时交互
语音交互是 LLM 应用从文本走向多模态的关键一步。本章覆盖 STT、TTS、实时语音 Agent、VAD、音频处理流水线、延迟优化等核心话题,帮助你构建低延迟、高质量的语音交互系统。
Q: 语音交互系统的整体架构是怎样的?⭐⭐
答:
一个完整的语音交互系统通常包含以下模块:
用户麦克风 → VAD(语音活动检测) → STT(语音转文本) → LLM(大语言模型) → TTS(文本转语音) → 扬声器

核心流水线:
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator
@dataclass
class AudioConfig:
    """Audio capture parameters for the voice pipeline."""

    # Samples per second.
    sample_rate: int = 16000
    # Number of audio channels.
    channels: int = 1
    # Duration of each audio chunk, in milliseconds.
    chunk_duration_ms: int = 30
class VoicePipeline:
    """Core voice loop: VAD-gated capture -> STT -> LLM -> streamed TTS."""

    def __init__(self, stt_engine, tts_engine, llm_client):
        self.stt = stt_engine
        self.tts = tts_engine
        self.llm = llm_client
        self.vad = WebRTCVAD(aggressiveness=2)

    async def process_audio_stream(
        self, audio_stream: AsyncIterator[bytes]
    ) -> AsyncIterator[bytes]:
        """Turn a stream of microphone chunks into a stream of reply audio.

        Chunks the VAD classifies as speech are accumulated. The first
        non-speech chunk after speech closes the utterance, which is then
        transcribed, answered by the LLM and synthesised. An utterance that is
        still open when ``audio_stream`` ends is processed as well instead of
        being discarded.

        Args:
            audio_stream: Async iterator of raw audio chunks.

        Yields:
            Audio chunks produced by ``self.tts.stream_speak``.
        """
        audio_buffer = bytearray()
        is_speaking = False

        async for chunk in audio_stream:
            if self.vad.is_speech(chunk):
                is_speaking = True
                audio_buffer.extend(chunk)
                continue
            if not is_speaking:
                # Silence outside an utterance: nothing to do.
                continue

            # Speech just ended: hand the captured utterance to the pipeline.
            is_speaking = False
            utterance = bytes(audio_buffer)
            audio_buffer.clear()
            async for audio_chunk in self._respond(utterance):
                yield audio_chunk

        # The stream ended mid-utterance; process what was captured.
        if is_speaking and audio_buffer:
            async for audio_chunk in self._respond(bytes(audio_buffer)):
                yield audio_chunk

    async def _respond(self, utterance: bytes) -> AsyncIterator[bytes]:
        """Run STT -> LLM -> TTS for one utterance and yield the reply audio."""
        text = await self.stt.transcribe(utterance)
        if not text.strip():
            # Nothing intelligible was recognised; stay silent.
            return
        response = await self.llm.chat(text)
        async for audio_chunk in self.tts.stream_speak(response):
            yield audio_chunk


# 关键设计要点:
- 流式处理:每个环节尽量流式,减少整体延迟
- 异步并发:LLM 推理和 TTS 可以 pipeline 化
- VAD 断句:准确的语音端点检测直接影响用户体验
Q: 如何使用 OpenAI Whisper 和 faster-whisper 进行语音转文本?⭐⭐⭐
答:
方案一:OpenAI Whisper(本地部署)
import whisper
import numpy as np
class WhisperSTT:
    """Local speech-to-text using the ``whisper`` package."""

    # Sample rate the in-memory audio is converted to before transcription.
    TARGET_SAMPLE_RATE = 16000

    def __init__(self, model_size="base"):
        """Load a Whisper model.

        model_size: tiny, base, small, medium, large
        - tiny: ~1GB VRAM, fastest, average accuracy
        - base: ~1GB VRAM, fairly fast
        - small: ~2GB VRAM, balanced
        - medium: ~5GB VRAM, more accurate
        - large: ~10GB VRAM, most accurate
        """
        self.model = whisper.load_model(model_size)

    def transcribe(self, audio_path: str, language: str = None) -> str:
        """Transcribe an audio file and return the recognised text.

        Args:
            audio_path: Path of the audio file.
            language: Optional language code; specifying it can improve
                accuracy.
        """
        result = self.model.transcribe(
            audio_path,
            language=language,
            fp16=False,  # set to False when running on CPU
            # NOTE(review): this Mandarin prompt is sent for every language;
            # confirm that is intended for non-Chinese audio.
            initial_prompt="以下是普通话的句子。",
        )
        return result["text"]

    def transcribe_bytes(self, audio_bytes: bytes, sample_rate: int = 16000) -> str:
        """Transcribe 16-bit PCM audio held in memory.

        Args:
            audio_bytes: Raw little-endian int16 samples.
            sample_rate: Sample rate of ``audio_bytes``. Audio at any other
                rate than 16 kHz is linearly resampled to 16 kHz first
                (previously this argument was ignored).

        Returns:
            The recognised text.
        """
        # A trailing half-sample would make np.frombuffer raise; drop it.
        usable = len(audio_bytes) - (len(audio_bytes) % 2)
        samples = (
            np.frombuffer(audio_bytes[:usable], dtype=np.int16).astype(np.float32)
            / 32768.0
        )

        target_rate = self.TARGET_SAMPLE_RATE
        if sample_rate != target_rate and samples.size:
            # Simple linear interpolation; no anti-alias filtering is applied.
            target_len = max(1, int(round(samples.size * target_rate / sample_rate)))
            src_times = np.arange(samples.size) / sample_rate
            dst_times = np.arange(target_len) / target_rate
            samples = np.interp(dst_times, src_times, samples).astype(np.float32)

        result = self.model.transcribe(samples, fp16=False)
        return result["text"]


# 方案二:faster-whisper(推荐,基于 CTranslate2 加速)
from faster_whisper import WhisperModel
import time
class FasterWhisperSTT:
    """Speech-to-text using faster-whisper's ``WhisperModel``."""

    def __init__(self, model_size="base", device="auto", compute_type="float32"):
        """Load the model.

        compute_type: float32, float16, int8
        (int8 quantisation can give a 4x speed-up on CPU.)
        """
        self.model = WhisperModel(
            model_size,
            device=device,  # auto, cpu, cuda
            compute_type=compute_type,
        )

    @staticmethod
    def _segment_to_dict(segment) -> dict:
        """Convert one recognised segment to a plain dict."""
        return {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text,
        }

    def transcribe(self, audio_path: str, language: str = None) -> dict:
        """Transcribe a whole file.

        Returns:
            A dict with keys ``text`` (all segment texts concatenated),
            ``language`` and ``duration`` (taken from the model's info object)
            and ``segments`` (list of ``{"start", "end", "text"}`` dicts).
        """
        segments, info = self.model.transcribe(
            audio_path,
            language=language,
            beam_size=5,
            vad_filter=True,  # enable VAD filtering of silent stretches
            vad_parameters=dict(
                min_silence_duration_ms=500,
                speech_pad_ms=200,
            ),
        )
        segment_list = [self._segment_to_dict(segment) for segment in segments]
        return {
            # Join once instead of growing a string inside the loop.
            "text": "".join(item["text"] for item in segment_list),
            "language": info.language,
            "duration": info.duration,
            "segments": segment_list,
        }

    def transcribe_streaming(self, audio_path: str):
        """Yield segment dicts one at a time (suited to long audio)."""
        segments, _ = self.model.transcribe(audio_path, beam_size=5)
        for segment in segments:
            yield self._segment_to_dict(segment)
# Usage example (guarded so importing this module does not load a model
# or read "recording.wav").
if __name__ == "__main__":
    stt = FasterWhisperSTT(model_size="small", compute_type="int8")
    result = stt.transcribe("recording.wav", language="zh")
    print(f"识别结果: {result['text']}")
    print(f"语言: {result['language']}, 时长: {result['duration']:.1f}s")


# 方案三:Groq Whisper API(云端,极快)
import httpx
import io
class GroqWhisperSTT:
    """Speech-to-text through Groq's hosted Whisper Large V3 endpoint."""

    MODEL = "whisper-large-v3"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.groq.com/openai/v1"

    async def transcribe(self, audio_path: str, language: str = "zh") -> str:
        """Upload the file at ``audio_path`` and return the recognised text."""
        with open(audio_path, "rb") as f:
            return await self._post_audio(
                ("audio.wav", f, "audio/wav"),
                {
                    "model": self.MODEL,
                    "language": language,
                    "response_format": "verbose_json",
                },
            )

    async def transcribe_bytes(
        self, audio_bytes: bytes, filename: str = "audio.wav", language: str = "zh"
    ) -> str:
        """Transcribe audio held in memory.

        Args:
            audio_bytes: Encoded audio content.
            filename: File name reported in the multipart upload.
            language: Language code sent to the API (previously fixed to
                ``"zh"``; the default keeps that behaviour).
        """
        return await self._post_audio(
            (filename, io.BytesIO(audio_bytes), "audio/wav"),
            {"model": self.MODEL, "language": language},
        )

    async def _post_audio(self, file_tuple, form: dict) -> str:
        """POST one multipart transcription request and return its ``text``.

        Raises:
            httpx.HTTPStatusError: If the API answers with a 4xx/5xx status,
                rather than failing later with a KeyError on the error body.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/audio/transcriptions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                files={"file": file_tuple},
                data=form,
                timeout=30,
            )
        response.raise_for_status()
        return response.json()["text"]


# 三种方案对比:
| 特性 | OpenAI Whisper | faster-whisper | Groq Whisper API |
|---|---|---|---|
| 部署方式 | 本地 | 本地 | 云端 |
| 速度 | 基准 | 4-8x 加速 | 极快(硬件加速) |
| 成本 | GPU 硬件成本 | GPU/CPU 均可 | 按量付费 |
| 离线可用 | ✅ | ✅ | ❌ |
| 推荐场景 | 研究/原型 | 生产环境本地部署 | 生产环境云端 |
Q: 如何实现高质量的 TTS(文本转语音)?⭐⭐⭐
答:
方案一:Edge TTS(免费,微软 Azure 引擎)
import edge_tts
import asyncio
class EdgeTTSEngine:
    """Text-to-speech through the ``edge_tts`` package (free, many voices)."""

    # Short aliases for commonly used voices.
    VOICES = {
        "zh-female": "zh-CN-XiaoxiaoNeural",  # lively female voice
        "zh-male": "zh-CN-YunxiNeural",  # male voice
        "zh-female-warm": "zh-CN-XiaohanNeural",  # warm female voice
        "zh-female-news": "zh-CN-XiaomoNeural",  # news-reader voice
        "en-female": "en-US-JennyNeural",  # English female voice
        "en-male": "en-US-GuyNeural",  # English male voice
    }

    def __init__(self, voice: str = "zh-CN-XiaoxiaoNeural"):
        """Select the default voice.

        Args:
            voice: A full voice name, or one of the alias keys in ``VOICES``
                (e.g. ``"zh-male"``). Unknown values are used unchanged.
        """
        self.voice = self.VOICES.get(voice, voice)

    async def synthesize(self, text: str, output_path: str):
        """Synthesise ``text`` and save the audio to ``output_path``."""
        communicate = edge_tts.Communicate(text, self.voice)
        await communicate.save(output_path)

    async def stream_synthesize(
        self,
        text: str,
        voice: str = None,
        *,
        rate: str = "+0%",
        volume: str = "+0%",
        pitch: str = "+0Hz",
    ):
        """Synthesise ``text`` and yield the audio data chunk by chunk.

        Args:
            text: Text to speak.
            voice: Optional per-call voice (full name or ``VOICES`` alias);
                defaults to the engine's voice.
            rate: Speaking-rate adjustment (original notes: -50% ~ +100%).
            volume: Volume adjustment (original notes: -50% ~ +100%).
            pitch: Pitch adjustment (original notes: -50Hz ~ +50Hz).
        """
        chosen_voice = self.VOICES.get(voice, voice) if voice else self.voice
        communicate = edge_tts.Communicate(
            text,
            chosen_voice,
            rate=rate,
            volume=volume,
            pitch=pitch,
        )
        async for chunk in communicate.stream():
            # Only entries typed "audio" carry audio data.
            if chunk["type"] == "audio":
                yield chunk["data"]

    async def list_voices(self, language: str = "zh"):
        """Return the available voices whose locale starts with ``language``."""
        voices = await edge_tts.list_voices()
        return [v for v in voices if v["Locale"].startswith(language)]
# Usage example
async def main():
    """Synthesise one sentence to a file and stream another into a file."""
    tts = EdgeTTSEngine(voice="zh-CN-XiaoxiaoNeural")
    # Save to a file.
    await tts.synthesize("你好,欢迎使用语音助手!", "output.mp3")
    # Streamed synthesis.
    with open("stream_output.mp3", "wb") as f:
        async for chunk in tts.stream_synthesize("这是一段流式合成的测试文本。"):
            f.write(chunk)


# Guarded so that importing this module does not start synthesis.
if __name__ == "__main__":
    asyncio.run(main())


# 方案二:OpenAI TTS
import openai
from openai import AsyncOpenAI
class OpenAITTSEngine:
    """Text-to-speech through the OpenAI speech API (six voices)."""

    VOICES = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]

    def __init__(self, api_key: str, model: str = "tts-1"):
        """
        model: tts-1 (low latency) or tts-1-hd (high quality)
        """
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def synthesize(self, text: str, output_path: str, voice: str = "nova"):
        """Synthesise ``text`` and write the MP3 audio to ``output_path``.

        The request goes through the client's streaming-response helper so
        the body is written with the async ``stream_to_file``; the previous
        code called a synchronous writer on an async client's response.
        """
        async with self.client.audio.speech.with_streaming_response.create(
            model=self.model,
            voice=voice,
            input=text,
            response_format="mp3",  # mp3, opus, aac, flac, wav, pcm
            speed=1.0,  # 0.25 ~ 4.0
        ) as response:
            await response.stream_to_file(output_path)

    async def stream_synthesize(self, text: str, voice: str = "nova"):
        """Yield the synthesised audio as PCM chunks while it downloads."""
        async with self.client.audio.speech.with_streaming_response.create(
            model=self.model,
            voice=voice,
            input=text,
            response_format="pcm",  # PCM suits streaming playback
        ) as response:
            # On the streamed async response iter_bytes is an async iterator.
            async for chunk in response.iter_bytes(chunk_size=4096):
                yield chunk
# Usage example
async def main():
    """Synthesise one English sentence to ``output.mp3``."""
    tts = OpenAITTSEngine(api_key="sk-xxx")
    await tts.synthesize("Hello, this is a test.", "output.mp3", voice="nova")


# Guarded so that importing this module does not call the API.
if __name__ == "__main__":
    asyncio.run(main())


# 方案三:MiniMax TTS(高质量中文语音)
import httpx
class MiniMaxTTSEngine:
    """Text-to-speech through the MiniMax ``t2a_v2`` HTTP API."""

    MODEL = "speech-01-turbo"

    def __init__(self, api_key: str, group_id: str):
        self.api_key = api_key
        self.group_id = group_id
        self.base_url = "https://api.minimax.chat/v1"

    def _headers(self) -> dict:
        """Return the request headers shared by both endpoints."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    def _query(self):
        """Return the query parameters, or None when no group id is set.

        The group id was accepted by the constructor but never sent; it is
        now passed as the ``GroupId`` query parameter.
        NOTE(review): confirm the parameter name against the MiniMax docs.
        """
        return {"GroupId": self.group_id} if self.group_id else None

    async def synthesize(self, text: str, voice_id: str = "female-shaonv") -> bytes:
        """Synthesise ``text`` and return the decoded MP3 bytes.

        voice_id options:
        - female-shaonv: young female
        - female-yujie: mature female
        - male-qn-jingying: professional male
        - male-qn-badao: domineering male
        - presenter_male: male presenter
        - presenter_female: female presenter
        - audiobook_male_1: audiobook male
        - audiobook_female_1: audiobook female

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
            RuntimeError: If the response body carries no audio.
        """
        payload = {
            "model": self.MODEL,
            "text": text,
            "voice_setting": {
                "voice_id": voice_id,
                "speed": 1.0,
                "vol": 1.0,
                "pitch": 0,
            },
            "audio_setting": {
                "sample_rate": 32000,
                "bitrate": 128000,
                "format": "mp3",
            },
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/t2a_v2",
                params=self._query(),
                headers=self._headers(),
                json=payload,
                timeout=60,
            )
        response.raise_for_status()
        result = response.json()
        # The audio is returned hex-encoded.
        audio_hex = (result.get("data") or {}).get("audio")
        if not audio_hex:
            raise RuntimeError(
                f"MiniMax TTS returned no audio: {result.get('base_resp')}"
            )
        return bytes.fromhex(audio_hex)

    async def stream_synthesize(self, text: str, voice_id: str = "female-shaonv"):
        """Synthesise ``text`` and yield decoded audio chunks as they arrive."""
        import json  # imported once here rather than on every received line

        payload = {
            "model": self.MODEL,
            "text": text,
            "stream": True,
            "voice_setting": {"voice_id": voice_id},
        }
        prefix = "data:"
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/t2a_v2",
                params=self._query(),
                headers=self._headers(),
                json=payload,
                timeout=60,
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if not line.startswith(prefix):
                        continue
                    data = json.loads(line[len(prefix):])
                    if "data" in data and "audio" in data["data"]:
                        yield bytes.fromhex(data["data"]["audio"])


# Q: 如何实现 WebSocket 实时语音交互?⭐⭐⭐
答:
实时语音交互需要低延迟的双向通信,WebSocket 是最佳选择。
服务端:
import asyncio
import websockets
import json
import numpy as np
from faster_whisper import WhisperModel
class RealtimeVoiceServer:
    """WebSocket server: receives PCM audio, replies with text and audio."""

    # One second of 16 kHz, 16-bit audio; recognition is attempted whenever
    # this much has been buffered.
    RECOGNIZE_AFTER_BYTES = 16000 * 2

    def __init__(self):
        self.stt = WhisperModel("base", device="cpu", compute_type="int8")
        self.active_sessions = {}

    async def handle_client(self, websocket, path=None):
        """Serve one client connection until it closes.

        Binary messages are audio frames appended to a per-connection buffer.
        Text messages are JSON control messages: ``config`` stores the
        session config and ``end_of_speech`` processes whatever audio is
        buffered. Malformed control messages are ignored.
        """
        session_id = id(websocket)
        audio_buffer = bytearray()
        print(f"Client {session_id} connected")
        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    audio_buffer.extend(message)
                    if len(audio_buffer) >= self.RECOGNIZE_AFTER_BYTES:
                        await self._process_buffer(websocket, audio_buffer)
                elif isinstance(message, str):
                    try:
                        cmd = json.loads(message)
                    except ValueError:
                        continue
                    if not isinstance(cmd, dict):
                        continue
                    kind = cmd.get("type")
                    if kind == "config":
                        self.active_sessions[session_id] = cmd.get("config", {})
                    elif kind == "end_of_speech" and audio_buffer:
                        # The user ended the utterance explicitly: run the
                        # same pipeline as for a full buffer.
                        await self._process_buffer(websocket, audio_buffer)
        finally:
            self.active_sessions.pop(session_id, None)
            print(f"Client {session_id} disconnected")

    async def _process_buffer(self, websocket, audio_buffer: bytearray):
        """Transcribe the buffered audio, then send transcript and reply."""
        pcm = bytes(audio_buffer)
        audio_buffer.clear()
        text = await self._transcribe(pcm)
        if not text.strip():
            return
        await websocket.send(json.dumps({
            "type": "transcription",
            "text": text,
        }))
        response = await self._get_llm_response(text)
        # Announce the reply text, stream its audio, then mark the end.
        await websocket.send(json.dumps({
            "type": "response_start",
            "text": response,
        }))
        async for audio_chunk in self._tts_synthesize(response):
            await websocket.send(audio_chunk)
        await websocket.send(json.dumps({
            "type": "response_end",
        }))

    async def _transcribe(self, audio_bytes: bytes) -> str:
        """Transcribe int16 PCM bytes in a worker thread.

        The returned segments are joined inside the worker as well, so that
        consuming them never runs on the event loop.
        """
        audio = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0

        def _run() -> str:
            segments, _ = self.stt.transcribe(audio, beam_size=3)
            return "".join(s.text for s in segments)

        return await asyncio.get_running_loop().run_in_executor(None, _run)

    async def _get_llm_response(self, text: str) -> str:
        # Placeholder: call the LLM API here.
        return f"Echo: {text}"

    async def _tts_synthesize(self, text: str):
        # Placeholder: call the TTS engine here.
        yield b""
# Start the server
async def main():
    """Serve ``RealtimeVoiceServer`` on port 8765 until the server closes."""
    server = await websockets.serve(
        RealtimeVoiceServer().handle_client,
        "0.0.0.0",
        8765,
        max_size=10 * 1024 * 1024,  # 10MB
    )
    print("WebSocket server started on ws://0.0.0.0:8765")
    await server.wait_closed()


# Guarded so that importing this module does not start the server.
if __name__ == "__main__":
    asyncio.run(main())


# 客户端(浏览器 JavaScript):
/**
 * Browser client for the realtime voice WebSocket server: streams microphone
 * audio as 16-bit PCM and plays back the audio the server returns.
 */
class RealtimeVoiceClient {
  /** @param {string} wsUrl WebSocket URL of the voice server. */
  constructor(wsUrl) {
    this.wsUrl = wsUrl;
    this.ws = null;
    this.audioContext = null;
    this.mediaStream = null;
    this.isRecording = false;
    // Capture graph nodes, kept so stopRecording() can disconnect them.
    this.sourceNode = null;
    this.processorNode = null;
    // Playback state: one shared context, a cursor so chunks play back to
    // back, and a promise chain so chunks are decoded in arrival order.
    this.playbackContext = null;
    this.playbackCursor = 0;
    this.playbackChain = Promise.resolve();
  }

  /**
   * Open the WebSocket. Resolves when the socket is open and rejects if the
   * connection fails (previously the promise never settled on failure).
   */
  async connect() {
    this.ws = new WebSocket(this.wsUrl);
    this.ws.binaryType = 'arraybuffer';
    this.ws.onmessage = (event) => this.handleServerMessage(event);
    return new Promise((resolve, reject) => {
      this.ws.onopen = resolve;
      this.ws.onerror = reject;
    });
  }

  /** Route one server message: binary is audio, text is a JSON control message. */
  handleServerMessage(event) {
    if (event.data instanceof ArrayBuffer) {
      this.playAudioChunk(event.data);
      return;
    }
    const msg = JSON.parse(event.data);
    switch (msg.type) {
      case 'transcription':
        console.log('识别结果:', msg.text);
        this.onTranscription?.(msg.text);
        break;
      case 'response_start':
        console.log('AI回复:', msg.text);
        this.onResponse?.(msg.text);
        break;
      case 'response_end':
        console.log('回复结束');
        this.onResponseEnd?.();
        break;
    }
  }

  /** Request the microphone and start sending Int16 PCM frames to the server. */
  async startRecording() {
    this.mediaStream = await navigator.mediaDevices.getUserMedia({
      audio: {
        sampleRate: 16000,
        channelCount: 1,
        echoCancellation: true,
        noiseSuppression: true,
      }
    });
    this.audioContext = new AudioContext({ sampleRate: 16000 });
    this.sourceNode = this.audioContext.createMediaStreamSource(this.mediaStream);
    // ScriptProcessorNode is used here; an AudioWorklet is the alternative.
    this.processorNode = this.audioContext.createScriptProcessor(4096, 1, 1);
    this.processorNode.onaudioprocess = (e) => {
      if (!this.isRecording) return;
      // Do not send on a socket that is not (or no longer) open.
      if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
      const inputData = e.inputBuffer.getChannelData(0);
      // Float32 -> Int16, clamped to the int16 range.
      const int16Data = new Int16Array(inputData.length);
      for (let i = 0; i < inputData.length; i++) {
        int16Data[i] = Math.max(-32768, Math.min(32767, inputData[i] * 32768));
      }
      this.ws.send(int16Data.buffer);
    };
    this.sourceNode.connect(this.processorNode);
    this.processorNode.connect(this.audioContext.destination);
    this.isRecording = true;
  }

  /** Stop capturing and release the microphone and the capture graph. */
  stopRecording() {
    this.isRecording = false;
    if (this.processorNode) {
      this.processorNode.onaudioprocess = null;
      this.processorNode.disconnect();
      this.processorNode = null;
    }
    this.sourceNode?.disconnect();
    this.sourceNode = null;
    this.mediaStream?.getTracks().forEach(t => t.stop());
    this.audioContext?.close();
  }

  /**
   * Queue one received audio chunk for playback. Chunks are decoded in
   * arrival order on a single shared AudioContext; decode errors are logged
   * instead of surfacing as unhandled rejections.
   */
  playAudioChunk(arrayBuffer) {
    this.playbackChain = this.playbackChain
      .then(() => this.decodeAndSchedule(arrayBuffer))
      .catch((err) => console.error('Audio playback failed:', err));
    return this.playbackChain;
  }

  /** Decode one chunk and schedule it right after the previously scheduled one. */
  async decodeAndSchedule(arrayBuffer) {
    if (arrayBuffer.byteLength === 0) return;
    if (!this.playbackContext) {
      this.playbackContext = new AudioContext();
    }
    const ctx = this.playbackContext;
    const audioBuffer = await ctx.decodeAudioData(arrayBuffer);
    const source = ctx.createBufferSource();
    source.buffer = audioBuffer;
    source.connect(ctx.destination);
    // Start no earlier than "now" and no earlier than the end of the
    // previous chunk, so consecutive chunks do not overlap.
    const startAt = Math.max(ctx.currentTime, this.playbackCursor);
    source.start(startAt);
    this.playbackCursor = startAt + audioBuffer.duration;
  }
}
// Usage: wrapped in an async function so it also runs where top-level
// `await` is unavailable, and so a failed connection or a denied microphone
// permission is reported.
(async () => {
  const client = new RealtimeVoiceClient('ws://localhost:8765');
  await client.connect();
  await client.startRecording();
})().catch((err) => console.error(err));

// Q: 如何实现语音活动检测(VAD)?⭐⭐⭐
答:
VAD 用于检测音频中是否包含人声,是语音交互系统的核心组件。
方案一:WebRTC VAD(轻量级)
import webrtcvad
import collections
import struct
class WebRTCVAD:
    """Thin wrapper around ``webrtcvad.Vad`` for fixed-size 16-bit frames."""

    # Frame durations (ms) accepted by WebRTC VAD.
    SUPPORTED_FRAME_MS = (10, 20, 30)

    def __init__(self, aggressiveness: int = 2, sample_rate: int = 16000,
                 frame_duration_ms: int = 30):
        """
        aggressiveness: 0-3, higher is stricter
        - 0: most permissive, may classify noise as speech
        - 3: strictest, may miss some speech
        - recommended: 2 (balanced)

        frame_duration_ms: duration of each frame; must be 10, 20 or 30.

        Raises:
            ValueError: If ``frame_duration_ms`` is not a supported duration.
        """
        if frame_duration_ms not in self.SUPPORTED_FRAME_MS:
            raise ValueError(
                f"frame_duration_ms must be one of {self.SUPPORTED_FRAME_MS}, "
                f"got {frame_duration_ms}"
            )
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        # Samples per frame. (The original read an undefined local name here
        # and raised NameError on construction.)
        self.frame_size = int(sample_rate * self.frame_duration_ms / 1000)

    def is_speech(self, audio_frame: bytes) -> bool:
        """Return whether a single audio frame contains speech.

        Short frames are zero-padded and long frames truncated to exactly one
        frame. The caller's buffer is never modified, even if it is a
        ``bytearray``.
        """
        expected_bytes = self.frame_size * 2  # 16bit = 2 bytes per sample
        frame = bytes(audio_frame[:expected_bytes])
        if len(frame) < expected_bytes:
            frame += b"\x00" * (expected_bytes - len(frame))
        return self.vad.is_speech(frame, self.sample_rate)
class VoiceActivityDetector:
    """Stateful VAD that reports speech start / end events frame by frame."""

    # Fraction of the pre-roll window that must be voiced to start a segment.
    TRIGGER_RATIO = 0.8

    def __init__(self, aggressiveness=2, sample_rate=16000,
                 silence_duration_ms=800, speech_pad_ms=300):
        """
        Args:
            aggressiveness: Passed to ``WebRTCVAD``.
            sample_rate: Passed to ``WebRTCVAD``.
            silence_duration_ms: How much continuous silence ends a segment.
            speech_pad_ms: Length of the pre-roll window kept before the
                speech start (also the window used for start detection).
        """
        self.vad = WebRTCVAD(aggressiveness, sample_rate)
        self.sample_rate = sample_rate
        self.frame_duration_ms = self.vad.frame_duration_ms
        self.silence_duration_ms = silence_duration_ms
        self.speech_pad_ms = speech_pad_ms
        # Frame counts; at least one frame each so a tiny duration can neither
        # end a segment on its first frame nor prevent one from ever starting.
        self.silence_frames = max(1, int(silence_duration_ms / self.frame_duration_ms))
        self.pad_frames = max(1, int(speech_pad_ms / self.frame_duration_ms))
        # The pre-roll window is sized by the padding, not by the (longer)
        # end-of-speech silence: sized the old way, speech had to fill about
        # 80% of ``silence_duration_ms`` before a start was reported, and
        # ``pad_frames`` was never used.
        self.ring_buffer = collections.deque(maxlen=self.pad_frames)
        self.triggered = False
        self.voiced_frames = []
        self.silence_counter = 0

    def process_frame(self, audio_frame: bytes) -> dict:
        """Process one audio frame and return an event.

        Returns:
            ``{"event": ..., "audio": ...}`` where event is one of:
            - ``"silence"``: no segment in progress; audio is None.
            - ``"speech_start"``: a segment starts; audio is the buffered
              pre-roll frames, including the current frame.
            - ``"speaking"``: segment in progress; audio is None.
            - ``"speech_end"``: the segment ends; audio is every frame
              received since ``speech_start`` (pre-roll not included).
        """
        is_speech = self.vad.is_speech(audio_frame)

        if not self.triggered:
            self.ring_buffer.append((audio_frame, is_speech))
            num_voiced = sum(1 for _, speech in self.ring_buffer if speech)
            if num_voiced > self.TRIGGER_RATIO * self.ring_buffer.maxlen:
                self.triggered = True
                self.silence_counter = 0
                preroll = b"".join(frame for frame, _ in self.ring_buffer)
                self.ring_buffer.clear()
                return {"event": "speech_start", "audio": preroll}
            return {"event": "silence", "audio": None}

        self.voiced_frames.append(audio_frame)
        # Count consecutive silent frames; any speech frame resets the count.
        self.silence_counter = 0 if is_speech else self.silence_counter + 1
        if self.silence_counter >= self.silence_frames:
            self.triggered = False
            audio = b"".join(self.voiced_frames)
            self.voiced_frames.clear()
            self.silence_counter = 0
            return {"event": "speech_end", "audio": audio}
        return {"event": "speaking", "audio": None}

    def reset(self):
        """Forget all buffered frames and return to the idle state."""
        self.ring_buffer.clear()
        self.triggered = False
        self.voiced_frames.clear()
        self.silence_counter = 0
# 使用示例
import wave
def detect_speech_in_file(wav_path: str):
    """Split a 16 kHz, 16-bit mono WAV file into speech segments.

    Args:
        wav_path: Path of the WAV file.

    Returns:
        A list of ``bytes`` objects, one per detected segment. Each is the
        pre-roll returned at speech start followed by the audio returned at
        speech end. A segment still open at the end of the file is included.

    Raises:
        ValueError: If the file is not 16-bit, 16 kHz, single-channel audio.
    """
    vad = VoiceActivityDetector(aggressiveness=2, silence_duration_ms=600)
    speech_segments = []

    def _emit(segment: bytearray) -> None:
        speech_segments.append(bytes(segment))
        print(f"检测到语音段: {len(segment) / 16000 / 2:.2f}s")

    with wave.open(wav_path, "rb") as wf:
        # Explicit checks rather than assert, which is stripped under -O.
        if wf.getsampwidth() != 2:
            raise ValueError("expected 16-bit samples")
        if wf.getframerate() != 16000:
            raise ValueError("expected a 16000 Hz sample rate")
        if wf.getnchannels() != 1:
            # The frame-size arithmetic below assumes one channel.
            raise ValueError("expected mono audio")

        frame_size = int(16000 * 30 / 1000)  # 30 ms frame, in samples
        frame_bytes = frame_size * 2
        current_segment = bytearray()

        while True:
            frame = wf.readframes(frame_size)
            if len(frame) < frame_bytes:
                # End of file, or a trailing partial frame.
                break
            result = vad.process_frame(frame)
            event = result["event"]
            if event == "speech_start":
                current_segment = bytearray(result["audio"])
            elif event == "speech_end":
                current_segment.extend(result["audio"] or b"")
                _emit(current_segment)
                current_segment.clear()
            # "speaking" / "silence": the detector keeps collecting frames.

        # The file ended while speech was in progress: keep that segment too.
        if vad.triggered:
            current_segment.extend(b"".join(vad.voiced_frames))
            if current_segment:
                _emit(current_segment)

    return speech_segments


# 方案二:Silero VAD(深度学习模型,更准确)
import torch
class SileroVAD:
    """Deep-learning VAD based on the Silero model loaded via ``torch.hub``."""

    def __init__(self, threshold: float = 0.5, sample_rate: int = 16000):
        """
        Args:
            threshold: Speech threshold passed to the timestamp helper.
            sample_rate: Sample rate used when reading audio and running the
                model (previously hard-coded to 16000 in each method).
        """
        self.model, utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
        )
        # Only the first and third helpers of the returned tuple are used.
        self.get_speech_timestamps, _, self.read_audio, _, _ = utils
        self.threshold = threshold
        self.sample_rate = sample_rate

    def detect_speech_timestamps(self, audio_path: str) -> list:
        """Return the speech timestamps found in the file at ``audio_path``.

        The items are whatever the Silero helper returns; the usage example
        in this file reads their ``start`` and ``end`` keys as sample indices.
        """
        wav = self.read_audio(audio_path, sampling_rate=self.sample_rate)
        return self.get_speech_timestamps(
            wav,
            self.model,
            threshold=self.threshold,
            sampling_rate=self.sample_rate,
            min_silence_duration_ms=500,
            speech_pad_ms=200,
        )

    def is_speech_chunk(self, audio_chunk: torch.Tensor) -> float:
        """Return the model's score for a single audio chunk as a float."""
        # Inference only: no gradients needed.
        with torch.no_grad():
            return self.model(audio_chunk, self.sample_rate).item()
# Usage (guarded so importing this module does not download the model).
if __name__ == "__main__":
    vad = SileroVAD(threshold=0.5)
    # The original referenced the undefined name `vAD` here (NameError).
    timestamps = vad.detect_speech_timestamps("audio.wav")
    for ts in timestamps:
        # Timestamps are sample indices at 16 kHz; convert to seconds.
        start = ts['start'] / 16000
        end = ts['end'] / 16000
        print(f"语音段: {start:.2f}s - {end:.2f}s")


# Q: 如何设计完整的语音 Agent 架构?⭐⭐⭐
答:
一个生产级语音 Agent 需要考虑并发、容错、会话管理等。
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Callable, Awaitable
from enum import Enum
class SessionState(Enum):
    """States a voice session moves through."""

    IDLE = "idle"
    LISTENING = "listening"
    PROCESSING = "processing"
    SPEAKING = "speaking"
@dataclass
class VoiceSession:
    """State kept for one voice conversation."""

    session_id: str
    # Current position in the session state machine.
    state: SessionState = SessionState.IDLE
    # Chat messages exchanged so far.
    conversation_history: list = field(default_factory=list)
    # Timestamps from time.time().
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    # Audio collected for the utterance in progress.
    audio_buffer: bytearray = field(default_factory=bytearray)
    language: str = "zh"
class VoiceAgent:
    """
    Voice agent with multiple sessions, a per-session state machine and
    interruption (barge-in) handling.
    """

    # Consecutive non-speech frames that end an utterance (~750 ms of silence).
    END_OF_SPEECH_FRAMES = 25
    # A token containing any of these characters closes a sentence for TTS.
    SENTENCE_ENDINGS = "。!?.!?\n"

    def __init__(self, stt, tts, llm, vad):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.vad = vad
        self.sessions: Dict[str, "VoiceSession"] = {}
        self._audio_queues: Dict[str, asyncio.Queue] = {}
        # In-flight response task per session, so a barge-in can cancel it.
        self._tasks: Dict[str, asyncio.Task] = {}

    def create_session(self) -> str:
        """Create a session with its outgoing audio queue; return its id."""
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = VoiceSession(session_id=session_id)
        self._audio_queues[session_id] = asyncio.Queue()
        return session_id

    async def handle_audio_frame(self, session_id: str, audio_data: bytes):
        """Feed one client audio frame into the session state machine.

        - SPEAKING: user speech interrupts the reply and switches to LISTENING.
        - IDLE: the first speech frame starts an utterance.
        - LISTENING: frames are buffered; enough consecutive non-speech
          frames end the utterance and start processing.
        - PROCESSING: frames are ignored.
        Frames for unknown sessions are ignored.
        """
        session = self.sessions.get(session_id)
        if not session:
            return
        session.last_active = time.time()

        if session.state == SessionState.SPEAKING:
            if self.vad.is_speech(audio_data):
                await self._interrupt_speech(session)
                session.state = SessionState.LISTENING
                session.audio_buffer.clear()
                session._silence_frames = 0
            return

        if session.state == SessionState.IDLE:
            if self.vad.is_speech(audio_data):
                session.state = SessionState.LISTENING
                # Start every utterance with a fresh silence count; a stale
                # count from the previous utterance would end this one on its
                # first quiet frame.
                session._silence_frames = 0
                session.audio_buffer.extend(audio_data)
        elif session.state == SessionState.LISTENING:
            session.audio_buffer.extend(audio_data)
            if self.vad.is_speech(audio_data):
                session._silence_frames = 0
                return
            session._silence_frames = getattr(session, "_silence_frames", 0) + 1
            if session._silence_frames > self.END_OF_SPEECH_FRAMES:
                session._silence_frames = 0
                session.state = SessionState.PROCESSING
                self._start_processing(session)

    def _start_processing(self, session: "VoiceSession"):
        """Run ``_process_speech`` as a tracked background task."""
        session_id = session.session_id
        task = asyncio.create_task(self._process_speech(session))
        # Hold a reference so the task is not garbage-collected mid-flight.
        self._tasks[session_id] = task

        def _forget(done_task, sid=session_id):
            if self._tasks.get(sid) is done_task:
                self._tasks.pop(sid, None)

        task.add_done_callback(_forget)

    async def _process_speech(self, session: "VoiceSession"):
        """Transcribe the buffered utterance and stream the spoken reply.

        Sets the session back to IDLE when it finishes or fails. When it is
        cancelled by a barge-in it leaves the state alone (the interrupter
        sets it) and records whatever part of the reply was generated.
        """
        response_parts = []
        try:
            # STT
            audio_bytes = bytes(session.audio_buffer)
            session.audio_buffer.clear()
            text = await self.stt.transcribe(audio_bytes)
            if not text.strip():
                session.state = SessionState.IDLE
                return
            session.conversation_history.append({"role": "user", "content": text})

            # LLM (streamed); each finished sentence goes to TTS immediately
            # to cut the delay before the first audio.
            session.state = SessionState.SPEAKING
            sentence_buffer = ""
            async for token in self.llm.stream_chat(session.conversation_history):
                response_parts.append(token)
                sentence_buffer += token
                if any(mark in token for mark in self.SENTENCE_ENDINGS):
                    await self._speak_text(session, sentence_buffer)
                    sentence_buffer = ""
            if sentence_buffer:
                await self._speak_text(session, sentence_buffer)

            session.conversation_history.append({
                "role": "assistant",
                "content": "".join(response_parts),
            })
            session.state = SessionState.IDLE
        except asyncio.CancelledError:
            if response_parts:
                session.conversation_history.append({
                    "role": "assistant",
                    "content": "".join(response_parts),
                })
            raise
        except Exception as e:
            print(f"Error processing speech: {e}")
            session.state = SessionState.IDLE

    async def _speak_text(self, session: "VoiceSession", text: str):
        """Synthesise ``text`` and enqueue the audio for the session."""
        async for audio_chunk in self.tts.stream_synthesize(text):
            await self._audio_queues[session.session_id].put(audio_chunk)

    async def _interrupt_speech(self, session: "VoiceSession"):
        """Stop the current reply: cancel its task, then drop queued audio."""
        task = self._tasks.pop(session.session_id, None)
        if task is not None and not task.done():
            task.cancel()
            # Wait for the cancellation to land so that nothing more is
            # enqueued after the queue has been drained below.
            await asyncio.gather(task, return_exceptions=True)

        queue = self._audio_queues.get(session.session_id)
        if queue is not None:
            while not queue.empty():
                try:
                    queue.get_nowait()
                except asyncio.QueueEmpty:
                    break


# Q: 如何优化语音交互的端到端延迟?⭐⭐⭐
答:
语音交互的延迟直接影响用户体验,目标是 首字节延迟 < 500ms。
"""
延迟优化策略总览:
用户说话结束 → STT (200-500ms) → LLM 首 token (200-500ms) → TTS 首帧 (100-300ms)
总延迟目标: < 1s(人感知流畅的阈值)
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
class LowLatencyVoicePipeline:
    """Voice pipeline built around streaming and pipelining to cut latency."""

    # A token containing any of these characters closes a sentence for TTS.
    SENTENCE_ENDINGS = "。!?.!?\n"

    def __init__(self, stt, tts, llm):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.executor = ThreadPoolExecutor(max_workers=4)

    # Strategy 1: streaming STT - recognise while recording
    async def streaming_stt(self, audio_chunks):
        """Yield a partial transcript each time it differs from the last one.

        The first result is always yielded.
        """
        unset = object()
        last_text = unset
        async for chunk in audio_chunks:
            result = await self.stt.transcribe_streaming(chunk)
            if last_text is unset or result.text != last_text:
                last_text = result.text
                yield result.text

    # Strategy 2: pipeline LLM + TTS - synthesise while generating
    async def pipelined_response(self, user_text: str):
        """Stream the LLM reply and yield synthesised audio sentence by sentence.

        A TTS task starts as soon as a sentence is complete. Audio is yielded
        in sentence order, with at most one further synthesis running ahead
        while generation continues. TTS tasks still pending when the
        generator is closed early are cancelled.
        """
        from collections import deque

        pending = deque()
        sentence_buffer = ""
        try:
            async for token in self.llm.stream_chat(user_text):
                sentence_buffer += token
                if any(mark in token for mark in self.SENTENCE_ENDINGS):
                    pending.append(
                        asyncio.create_task(self._collect_tts_audio(sentence_buffer))
                    )
                    sentence_buffer = ""
                if len(pending) >= 2:
                    audio = await pending[0]
                    pending.popleft()
                    yield audio

            # Text left over after the last sentence-ending mark.
            if sentence_buffer:
                pending.append(
                    asyncio.create_task(self._collect_tts_audio(sentence_buffer))
                )

            while pending:
                audio = await pending[0]
                pending.popleft()
                yield audio
        finally:
            for task in pending:
                task.cancel()

    async def _collect_tts_audio(self, text: str) -> bytes:
        """Collect all audio produced by TTS for ``text`` into one bytes."""
        chunks = [chunk async for chunk in self.tts.stream_synthesize(text)]
        return b"".join(chunks)

    # Strategy 3: warm up the models
    async def warmup(self):
        """Warm up STT, TTS and LLM concurrently (call at startup)."""
        await asyncio.gather(
            self._warmup_stt(),
            self._warmup_tts(),
            self._warmup_llm(),
        )
        print("All models warmed up")

    async def _warmup_stt(self):
        """Warm up the STT model with one second of silence."""
        import numpy as np
        dummy_audio = np.zeros(16000, dtype=np.float32)
        await self.stt.transcribe(dummy_audio)

    async def _warmup_tts(self):
        """Warm up the TTS model with a short text."""
        async for _ in self.tts.stream_synthesize("你好"):
            pass

    async def _warmup_llm(self):
        """Warm up the LLM by requesting a reply and reading one token."""
        stream = self.llm.stream_chat("hi")
        try:
            async for _ in stream:
                break
        finally:
            # Close the abandoned stream explicitly when it supports it.
            aclose = getattr(stream, "aclose", None)
            if aclose is not None:
                await aclose()
# 策略4: 音频缓冲与播放优化
class AudioJitterBuffer:
    """Jitter buffer that releases audio in fixed-size blocks."""

    def __init__(self, min_buffer_ms: int = 100, sample_rate: int = 16000,
                 sample_width: int = 2):
        """
        Args:
            min_buffer_ms: Amount of audio to accumulate before playback
                starts; also the duration of each released block.
            sample_rate: Samples per second of the PCM stream.
            sample_width: Bytes per sample.
        """
        self.buffer = bytearray()
        self.min_buffer_bytes = int(sample_rate * sample_width * min_buffer_ms / 1000)
        self.playing = False

    async def add_chunk(self, chunk: bytes) -> list[bytes]:
        """Add an audio chunk and return every block that is ready to play.

        All complete blocks are returned, not just one, so input arriving in
        chunks larger than a block cannot build up an ever-growing backlog.
        """
        self.buffer.extend(chunk)
        block = self.min_buffer_bytes

        if block <= 0:
            # No pre-buffering configured: pass the data straight through.
            ready = [bytes(self.buffer)] if self.buffer else []
            self.buffer.clear()
            return ready

        if len(self.buffer) >= block:
            self.playing = True

        ready = []
        while self.playing and len(self.buffer) >= block:
            ready.append(bytes(self.buffer[:block]))
            del self.buffer[:block]  # in place; no copy of the remainder
        return ready

    def flush(self) -> bytes:
        """Return and clear whatever is buffered (e.g. at end of stream)."""
        tail = bytes(self.buffer)
        self.buffer.clear()
        self.playing = False
        return tail


# 延迟优化 Checklist:
| 优化点 | 方法 | 预期收益 |
|---|---|---|
| STT 加速 | faster-whisper int8 量化 | 减少 50%+ 推理时间 |
| LLM 首 token | 使用流式 API | 感知延迟大幅降低 |
| TTS 流式 | 边生成边合成 | 减少 200-400ms |
| 句子级流水线 | 不等 LLM 全部生成就开始 TTS | 减少 300-500ms |
| 模型预热 | 启动时加载并跑一次 | 避免首次冷启动 |
| 音频编码 | 使用 Opus 编码 | 减少带宽,降低传输延迟 |
| 地理就近 | 就近部署或使用 CDN | 减少网络延迟 |
Q: 如何实现 GPT-4o 风格的实时语音对话?⭐⭐⭐
答:
GPT-4o Realtime API 使用 WebSocket 进行全双工语音通信。
import asyncio
import websockets
import json
import base64
import numpy as np
class GPT4oRealtimeVoice:
    """
    OpenAI Realtime API client.
    Supports full-duplex voice interaction: listening and speaking at once.

    The ``on_audio`` / ``on_text`` / ``on_transcript`` callbacks may be plain
    functions or coroutine functions.
    """

    def __init__(self, api_key: str, model: str = "gpt-4o-realtime-preview"):
        self.api_key = api_key
        self.model = model
        self.ws = None
        self.on_audio = None  # callback: audio received
        self.on_text = None  # callback: text received
        self.on_transcript = None  # callback: transcript received

    async def connect(self):
        """Open the WebSocket connection and configure the session."""
        url = f"wss://api.openai.com/v1/realtime?model={self.model}"
        self.ws = await websockets.connect(
            url,
            additional_headers={
                "Authorization": f"Bearer {self.api_key}",
                "OpenAI-Beta": "realtime=v1",
            },
        )
        # Configure the session.
        await self._send_event({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "instructions": "你是一个有帮助的中文语音助手。请用中文回答,语气自然亲切。",
                "voice": "alloy",
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1",
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 500,
                },
                "temperature": 0.8,
            },
        })

    async def _send_event(self, event: dict):
        """Serialise ``event`` as JSON and send it over the socket."""
        await self.ws.send(json.dumps(event))

    async def send_audio(self, audio_bytes: bytes):
        """Send audio data (PCM16, 24kHz, mono), base64-encoded."""
        audio_b64 = base64.b64encode(audio_bytes).decode()
        await self._send_event({
            "type": "input_audio_buffer.append",
            "audio": audio_b64,
        })

    async def send_text(self, text: str):
        """Send a user text message and request a response."""
        await self._send_event({
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}],
            },
        })
        await self._send_event({"type": "response.create"})

    @staticmethod
    async def _emit(callback, payload):
        """Invoke ``callback`` with ``payload`` and await it if it is awaitable.

        Previously every callback result was awaited unconditionally, so a
        plain function or lambda raised TypeError.
        """
        import inspect

        if callback is None:
            return
        outcome = callback(payload)
        if inspect.isawaitable(outcome):
            await outcome

    async def listen(self):
        """Read server events until the socket closes and dispatch them."""
        async for message in self.ws:
            event = json.loads(message)
            event_type = event.get("type", "")
            if event_type == "response.audio.delta":
                # Audio fragment, base64-encoded.
                await self._emit(self.on_audio, base64.b64decode(event["delta"]))
            elif event_type == "response.audio_transcript.delta":
                # Text of the assistant's speech.
                await self._emit(self.on_text, event["delta"])
            elif event_type == "conversation.item.input_audio_transcription.completed":
                # Transcript of the user's speech.
                await self._emit(self.on_transcript, event["transcript"])
            elif event_type == "error":
                print(f"Error: {event}")

    async def close(self):
        """Close the socket if one is open."""
        if self.ws:
            await self.ws.close()
# Usage example
async def main():
    """Connect, print transcripts, play reply audio and stream the microphone."""
    client = GPT4oRealtimeVoice(api_key="sk-xxx")

    # listen() awaits its callbacks, so they are defined as coroutines.
    async def on_transcript(t):
        print(f"用户: {t}")

    async def on_text(t):
        print(f"AI: {t}", end="", flush=True)

    async def on_audio(a):
        play_audio(a)  # play the audio

    client.on_transcript = on_transcript
    client.on_text = on_text
    client.on_audio = on_audio

    await client.connect()
    # Start the listener task.
    listen_task = asyncio.create_task(client.listen())
    try:
        # Send audio captured from the microphone.
        await stream_microphone(client)
    finally:
        # Stop the listener, wait for it to finish, then close the socket.
        listen_task.cancel()
        await asyncio.gather(listen_task, return_exceptions=True)
        await client.close()
async def stream_microphone(client):
    """Capture microphone audio and send it to ``client`` until cancelled.

    Audio is captured as 16-bit mono at 24 kHz in 1024-frame buffers. The
    blocking ``stream.read`` runs in a worker thread so that other tasks on
    the event loop keep running while waiting for the next buffer; the read
    itself paces the loop, so no extra sleep is needed.
    """
    import pyaudio

    pa = pyaudio.PyAudio()
    stream = None
    loop = asyncio.get_running_loop()
    try:
        stream = pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=24000,
            input=True,
            frames_per_buffer=1024,
        )
        while True:
            data = await loop.run_in_executor(
                None, lambda: stream.read(1024, exception_on_overflow=False)
            )
            await client.send_audio(data)
    finally:
        if stream is not None:
            stream.stop_stream()
            stream.close()
        # Release PortAudio even when opening the stream failed.
        pa.terminate()


# Q: 如何实现多语言语音支持?⭐⭐
答:
多语言支持需要在 STT、LLM、TTS 三个环节都做好处理。
import asyncio
from typing import Optional
from langdetect import detect
class MultiLanguageVoiceAgent:
    """Voice agent that picks STT prompt, system prompt and voice per language."""

    # Per-language configuration.
    LANG_CONFIG = {
        "zh": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "以下是普通话的句子。",
            "tts_voice": "zh-CN-XiaoxiaoNeural",
            "tts_edge_voice": "zh-CN-YunxiNeural",
            "system_prompt": "请用中文回答。",
        },
        "en": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "",
            "tts_voice": "en-US-JennyNeural",
            "tts_edge_voice": "en-US-GuyNeural",
            "system_prompt": "Please respond in English.",
        },
        "ja": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "",
            "tts_voice": "ja-JP-NanamiNeural",
            "tts_edge_voice": "ja-JP-KeitaNeural",
            "system_prompt": "日本語で回答してください。",
        },
        "ko": {
            "stt_model": "whisper-large-v3",
            "stt_prompt": "",
            "tts_voice": "ko-KR-SunHiNeural",
            "tts_edge_voice": "ko-KR-InJoonNeural",
            "system_prompt": "한국어로 대답해 주세요.",
        },
    }

    # Detector codes that map onto a configured language.
    _LANG_ALIASES = {"zh-cn": "zh", "zh-tw": "zh"}

    def __init__(self, stt, tts, llm):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.current_lang = "zh"  # default language

    def _config_for(self, language: str) -> dict:
        """Return the config for ``language``, falling back to English."""
        return self.LANG_CONFIG.get(language, self.LANG_CONFIG["en"])

    def detect_language(self, text: str) -> str:
        """Detect the language of ``text``.

        Returns a key of ``LANG_CONFIG``: the detected code when configured,
        ``"zh"`` for ``zh-cn`` / ``zh-tw``, otherwise ``"en"``. If detection
        fails, ``self.current_lang`` is returned.
        """
        try:
            lang = detect(text)
        except Exception:
            # Best effort: fall back to the default language. Unlike a bare
            # ``except:`` this lets KeyboardInterrupt / SystemExit propagate.
            return self.current_lang
        if lang in self.LANG_CONFIG:
            return lang
        return self._LANG_ALIASES.get(lang, "en")

    async def transcribe_with_lang(self, audio_bytes: bytes,
                                   language: Optional[str] = None) -> tuple[str, str]:
        """Transcribe audio and return ``(text, language)``.

        With ``language`` given, it is passed to STT together with that
        language's prompt (the English config is used for languages that are
        not configured, as in the other methods, instead of raising
        KeyError). Otherwise STT runs without a language and the language is
        detected from the text.
        """
        if language:
            config = self._config_for(language)
            text = await self.stt.transcribe(
                audio_bytes,
                language=language,
                initial_prompt=config["stt_prompt"],
            )
            return text, language

        # Auto-detect.
        text = await self.stt.transcribe(audio_bytes)
        return text, self.detect_language(text)

    async def respond(self, user_text: str, language: str) -> str:
        """Generate a reply using the system prompt for ``language``."""
        messages = [
            {"role": "system", "content": self._config_for(language)["system_prompt"]},
            {"role": "user", "content": user_text},
        ]
        return await self.llm.chat(messages)

    async def synthesize_speech(self, text: str, language: str) -> bytes:
        """Synthesise ``text`` with the voice configured for ``language``."""
        voice = self._config_for(language)["tts_voice"]
        audio_chunks = [
            chunk async for chunk in self.tts.stream_synthesize(text, voice=voice)
        ]
        return b"".join(audio_chunks)

    async def process_audio(self, audio_bytes: bytes) -> bytes:
        """Full flow: STT with language detection, reply generation, TTS."""
        # 1. STT (with language detection)
        text, language = await self.transcribe_with_lang(audio_bytes)
        print(f"[{language}] 用户: {text}")
        # 2. Generate the reply
        response = await self.respond(text, language)
        print(f"[{language}] AI: {response}")
        # 3. TTS
        return await self.synthesize_speech(response, language)


# Q: 如何构建音频处理流水线?⭐⭐
答:
生产环境中,音频在送入 STT 之前需要经过一系列预处理。
import numpy as np
from scipy import signal
import io
import wave
class AudioProcessor:
    """Audio pre-processing steps operating on float sample arrays."""

    @staticmethod
    def convert_sample_rate(audio: np.ndarray,
                            orig_sr: int, target_sr: int) -> np.ndarray:
        """Resample ``audio`` from ``orig_sr`` to ``target_sr`` (e.g. 48kHz -> 16kHz).

        Returns the input unchanged when the rates match or the input is
        empty; otherwise a float32 array.
        """
        if orig_sr == target_sr or len(audio) == 0:
            return audio
        # Integer arithmetic avoids float rounding in the output length.
        target_length = len(audio) * target_sr // orig_sr
        if target_length == 0:
            return np.array([], dtype=np.float32)
        resampled = signal.resample(audio, target_length)
        return resampled.astype(np.float32)

    @staticmethod
    def normalize_audio(audio: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """Scale ``audio`` so its RMS level equals ``target_db``.

        Empty or all-zero input is returned unchanged.
        """
        if audio.size == 0:
            return audio
        rms = np.sqrt(np.mean(audio ** 2))
        if rms == 0:
            return audio
        current_db = 20 * np.log10(rms)
        gain = target_db - current_db
        return audio * (10 ** (gain / 20))

    @staticmethod
    def remove_silence(audio: np.ndarray, sample_rate: int = 16000,
                       threshold_db: float = -40,
                       min_silence_ms: int = 200) -> np.ndarray:
        """Trim leading and trailing silence.

        Energy is measured over 10 ms frames. The result spans the first to
        the last frame above ``threshold_db``, plus five frames of padding on
        each side. Returns an empty float32 array when no frame is above the
        threshold.

        ``min_silence_ms`` is accepted for interface compatibility and is not
        used.
        """
        threshold = 10 ** (threshold_db / 20)
        frame_size = max(1, int(sample_rate * 0.01))  # 10 ms frame
        pad = frame_size * 5

        # Every complete frame is measured, including the last one (the old
        # range() bound skipped it when the length was an exact multiple of
        # the frame size).
        n_frames = len(audio) // frame_size
        frames = audio[:n_frames * frame_size].reshape(n_frames, frame_size)
        energy = np.sqrt(np.mean(frames ** 2, axis=1))

        above_threshold = np.where(energy > threshold)[0]
        if len(above_threshold) == 0:
            return np.array([], dtype=np.float32)

        first, last = int(above_threshold[0]), int(above_threshold[-1])
        start = max(0, first * frame_size - pad)
        # Measure the tail padding from the END of the last loud frame, so
        # that the frame itself is always kept in full.
        end = min(len(audio), (last + 1) * frame_size + pad)
        return audio[start:end]

    @staticmethod
    def apply_noise_gate(audio: np.ndarray, threshold_db: float = -35) -> np.ndarray:
        """Noise gate: set samples whose magnitude is below the threshold to zero."""
        threshold = 10 ** (threshold_db / 20)
        gate = np.abs(audio) > threshold
        return audio * gate

    @staticmethod
    def apply_high_pass_filter(audio: np.ndarray, sample_rate: int = 16000,
                               cutoff_hz: float = 80) -> np.ndarray:
        """High-pass filter to remove low-frequency noise (e.g. air-conditioner hum).

        Uses a 4th-order Butterworth filter applied forwards and backwards.
        Input too short for that filtering is returned unfiltered (as
        float32) instead of raising.
        """
        nyquist = sample_rate / 2
        normalized_cutoff = cutoff_hz / nyquist
        b, a = signal.butter(4, normalized_cutoff, btype='high')
        # filtfilt needs more samples than its default edge padding.
        padlen = 3 * max(len(a), len(b))
        if len(audio) <= padlen:
            return np.asarray(audio, dtype=np.float32)
        return signal.filtfilt(b, a, audio).astype(np.float32)

    @staticmethod
    def convert_format(audio_bytes: bytes,
                       from_format: str, to_format: str,
                       sample_rate: int = 16000) -> bytes:
        """Convert encoded audio to mono ``to_format`` by piping through ffmpeg.

        ``from_format`` is not passed to ffmpeg; the input is left to
        ffmpeg's own detection.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status (previously
                the failure was ignored and the partial or empty output was
                returned).
        """
        import subprocess

        completed = subprocess.run(
            ['ffmpeg', '-i', 'pipe:0',
             '-f', to_format,
             '-ar', str(sample_rate),
             '-ac', '1',
             'pipe:1'],
            input=audio_bytes,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        if completed.returncode != 0:
            detail = completed.stderr.decode("utf-8", errors="replace")[-500:]
            raise RuntimeError(
                f"ffmpeg failed with exit code {completed.returncode}: {detail}"
            )
        return completed.stdout
class AudioPipeline:
    """Complete audio preprocessing pipeline: decode, resample, filter, normalize, trim."""

    # PCM sample width in bytes -> (little-endian dtype, zero offset, full-scale divisor)
    _PCM_FORMATS = {
        1: ("u1", 128.0, 128.0),           # 8-bit WAV samples are unsigned
        2: ("<i2", 0.0, 32768.0),
        4: ("<i4", 0.0, 2147483648.0),
    }

    def __init__(self, target_sr: int = 16000):
        self.target_sr = target_sr
        self.processor = AudioProcessor()

    def _decode_wav(self, wav_bytes: bytes) -> tuple[np.ndarray, int]:
        """Decode PCM WAV bytes into a mono float32 signal and its sample rate.

        Multi-channel audio is down-mixed by averaging the channels.

        Raises:
            ValueError: If the sample width is not 1, 2 or 4 bytes.
        """
        with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
            channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            frame_rate = wf.getframerate()
            raw_data = wf.readframes(wf.getnframes())
        if sample_width not in self._PCM_FORMATS:
            raise ValueError(
                f"Unsupported WAV sample width: {sample_width} bytes"
            )
        dtype, offset, scale = self._PCM_FORMATS[sample_width]
        # Drop a truncated trailing frame so the buffer divides evenly.
        frame_bytes = channels * sample_width
        raw_data = raw_data[:len(raw_data) - len(raw_data) % frame_bytes]
        samples = (np.frombuffer(raw_data, dtype=dtype).astype(np.float32) - offset) / scale
        if channels > 1:
            # Samples are interleaved per frame: average the channels to mono.
            samples = samples.reshape(-1, channels).mean(axis=1)
        return samples.astype(np.float32), frame_rate

    def process(self, audio_bytes: bytes,
                source_sr: int = 44100,
                source_format: str = "wav") -> np.ndarray:
        """Run the full preprocessing chain on encoded audio.

        Steps: decode (via ffmpeg for non-WAV input) -> resample to
        ``target_sr`` -> high-pass filter -> volume normalization ->
        leading/trailing silence removal.

        Args:
            audio_bytes: Encoded audio data.
            source_sr: Ignored; the sample rate is always read from the WAV
                header. Kept for interface compatibility.
            source_format: ``"wav"`` to decode directly; any other value is
                first converted to WAV with ``convert_format``.

        Returns:
            The processed mono float32 signal; empty when the input holds no
            samples.
        """
        # 1. Decode into a numpy array
        if source_format != "wav":
            # Convert with ffmpeg (also resamples to target_sr)
            audio_bytes = self.processor.convert_format(
                audio_bytes, source_format, "wav", self.target_sr
            )
        audio, source_sr = self._decode_wav(audio_bytes)
        if audio.size == 0:
            # Nothing to process; the filter stage cannot handle empty input.
            return audio
        # 2. Sample-rate conversion
        audio = self.processor.convert_sample_rate(audio, source_sr, self.target_sr)
        # 3. High-pass filter
        audio = self.processor.apply_high_pass_filter(audio, self.target_sr)
        # 4. Volume normalization
        audio = self.processor.normalize_audio(audio, target_db=-20.0)
        # 5. Trim leading and trailing silence
        audio = self.processor.remove_silence(audio, self.target_sr)
        return audio
# Q: 如何在生产环境中监控和调试语音系统?⭐⭐
答:
import time
import logging
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class VoiceMetrics:
    """Performance metrics recorded for one voice interaction."""

    # Latency metrics (ms)
    vad_latency_ms: float = 0.0
    stt_latency_ms: float = 0.0
    llm_first_token_ms: float = 0.0
    llm_total_ms: float = 0.0
    tts_first_chunk_ms: float = 0.0
    tts_total_ms: float = 0.0
    # End to end: user stops speaking -> first audio is heard
    e2e_latency_ms: float = 0.0
    # Quality metrics
    stt_confidence: float = 0.0
    audio_duration_ms: float = 0.0
    tts_audio_duration_ms: float = 0.0
    # Session information
    session_id: str = ""
    language: str = ""
    text_input: str = ""
    text_output: str = ""
class VoiceMetricsCollector:
    """Stores per-interaction metrics and summarizes the most recent ones."""

    def __init__(self):
        self.metrics_history: list["VoiceMetrics"] = []

    def calculate_e2e(self, m: "VoiceMetrics") -> float:
        """Set and return ``m.e2e_latency_ms``.

        The value is the sum of the STT latency, the LLM first-token latency
        and the TTS first-chunk latency.
        """
        m.e2e_latency_ms = (
            m.stt_latency_ms +
            m.llm_first_token_ms +
            m.tts_first_chunk_ms
        )
        return m.e2e_latency_ms

    def get_summary(self) -> dict:
        """Return aggregate statistics over the last 100 interactions.

        Values that are not greater than zero are treated as "not measured"
        and excluded from the averages and percentiles. Returns an empty dict
        when nothing has been recorded yet.
        """
        if not self.metrics_history:
            return {}
        recent = self.metrics_history[-100:]  # last 100 interactions

        def measured(key):
            return [v for v in (getattr(m, key) for m in recent) if v > 0]

        def avg(key):
            values = measured(key)
            return sum(values) / len(values) if values else 0

        def p95(key):
            values = sorted(measured(key))
            if not values:
                return 0
            # Nearest-rank percentile: rank = ceil(0.95 * n), computed with
            # integers to avoid float rounding. int(n * 0.95) used as an index
            # is one rank too high and always yields the maximum for n <= 20.
            rank = (len(values) * 95 + 99) // 100
            return values[rank - 1]

        return {
            "total_interactions": len(self.metrics_history),
            "avg_e2e_latency_ms": round(avg("e2e_latency_ms"), 1),
            "p95_e2e_latency_ms": round(p95("e2e_latency_ms"), 1),
            "avg_stt_ms": round(avg("stt_latency_ms"), 1),
            "avg_llm_first_token_ms": round(avg("llm_first_token_ms"), 1),
            "avg_tts_first_chunk_ms": round(avg("tts_first_chunk_ms"), 1),
        }

    def log_metrics(self, metrics: "VoiceMetrics"):
        """Record ``metrics``, fill in its end-to-end latency and log one summary line."""
        self.metrics_history.append(metrics)
        self.calculate_e2e(metrics)
        # Lazy %-style arguments: the message is only formatted when the
        # record is actually emitted.
        logging.info(
            "[VoiceMetrics] session=%s lang=%s stt=%.0fms llm_ft=%.0fms "
            "tts_ft=%.0fms e2e=%.0fms",
            metrics.session_id[:8],
            metrics.language,
            metrics.stt_latency_ms,
            metrics.llm_first_token_ms,
            metrics.tts_first_chunk_ms,
            metrics.e2e_latency_ms,
        )
class InstrumentedVoicePipeline:
    """Voice pipeline that records per-stage latency for every request."""

    def __init__(self, stt, tts, llm, vad):
        self.stt = stt
        self.tts = tts
        self.llm = llm
        self.vad = vad
        self.metrics = VoiceMetricsCollector()

    async def process(self, audio_bytes: bytes, session_id: str = "") -> bytes:
        """Run STT -> streaming LLM -> streaming TTS and log the timings.

        Args:
            audio_bytes: Audio passed to ``self.stt.transcribe``.
            session_id: Identifier stored on the recorded metrics.

        Returns:
            All TTS audio chunks concatenated.
        """
        m = VoiceMetrics(session_id=session_id)

        # STT
        stage_start = time.monotonic()
        text = await self.stt.transcribe(audio_bytes)
        m.stt_latency_ms = (time.monotonic() - stage_start) * 1000
        m.text_input = text

        # LLM (streaming)
        stage_start = time.monotonic()
        tokens = []
        async for token in self.llm.stream_chat(text):
            if not tokens:
                # First token: record time-to-first-token once.
                m.llm_first_token_ms = (time.monotonic() - stage_start) * 1000
            tokens.append(token)
        # Join once at the end instead of growing a string per token.
        response_text = "".join(tokens)
        m.llm_total_ms = (time.monotonic() - stage_start) * 1000
        m.text_output = response_text

        # TTS (streaming)
        stage_start = time.monotonic()
        audio_chunks = []
        async for chunk in self.tts.stream_synthesize(response_text):
            if not audio_chunks:
                # First chunk: record time-to-first-audio once.
                m.tts_first_chunk_ms = (time.monotonic() - stage_start) * 1000
            audio_chunks.append(chunk)
        m.tts_total_ms = (time.monotonic() - stage_start) * 1000

        # Record the metrics
        self.metrics.log_metrics(m)
        return b"".join(audio_chunks)
# 指标看板示例输出:
=== Voice System Metrics ===
Total interactions: 1523
Avg E2E latency: 847ms (P95: 1243ms)
STT: avg 234ms (P95: 412ms)
LLM 1st token: avg 312ms (P95: 523ms)
TTS 1st chunk: avg 189ms (P95: 301ms)
===========================
本章总结: 语音交互系统的核心挑战在于低延迟和高质量。关键优化策略包括:使用 faster-whisper 进行本地加速推理、LLM + TTS pipeline 化减少等待、VAD 实现准确的语音端点检测、WebSocket 实现全双工通信。生产环境中需要重点关注端到端延迟监控,目标控制在 1 秒以内以提供流畅体验。