// Blog Entry
基于意图路由的实时语音Agent
当提及ai的应用时候,我想到的就是一个语音对话助手.大概是因为我是蜂群的原因,从去年的MoeChat的尝试,到更早的Fake-neuro,我一直期待的拥有一个属于我自己的个人的Neuro,随着ai的落地应用的不断发展,我真的有时候感觉这个时间越来越快的到来了
现在,让我们开始分析一下Neruo实现了什么的功能(except Live 2D)
最基础的:感知,思考,表达.这三者的实现是一个最基础的语音助手,即像是”豆包”或是其他云端api调用的本地语音聊天
在进阶一点,我们知道,Neruo是通过osu!发展而来的2,同样在MineCraft中进行游戏而知名.所以基本是实现了类似YOLO的多模态识别(osu和MineCraft大概是接的程序脚本)和function-calling.此外,作为虚拟主播,Neuro毫无置疑的拥有长短时记忆
actually,我这个太简陋太简陋了,也就是说,我这个的流程和Neuro的基本一致,但是,在质量上真的是相差甚远,仅仅作为了解相关知识的实操还算勉强够格.
我们先来看看最基础的第一部分:感知,思考和表达.
感知,我目前也就搞了两个部分,屏幕截图以及ASR识别音频(whisper).
ASR主要是通过sounddevice 库直接挂载到系统的默认录音设备,异步采集每一帧声音,然后被存进一个列表.首尾拼接成一个长音频
async def record_until_stop(self, output_path: Optional[Path] = None) -> Path:
"""录制音频直到调用 stop()"""
try:
import sounddevice as sd
# ... (路径初始化逻辑)
logger.info("[RECORDER] 开始录音...")
self.is_recording = True
self.frames = [] # 1. 磁带初始化:清空之前的录音数据
# 2. 核心回调函数:这是由声卡硬件驱动定期触发的
def callback(indata, frames, time, status):
if status:
logger.warning(f"[RECORDER] 状态: {status}")
if self.is_recording:
# 3. 磁带延伸:将当前采集到的声音片段(chunk)存入列表
# 必须使用 .copy(),因为 indata 的内存空间会被声卡循环复用
self.frames.append(indata.copy())
# 4. 开启异步流:这是与硬件通信的桥梁
with sd.InputStream(
samplerate=self.sample_rate, # 16000Hz
channels=self.channels, # 单声道:减少处理开销
callback=callback # 绑定回调函数
):
# 5. 挂起等待:只要 self.is_recording 为 True,磁带就会一直录制
while self.is_recording:
await asyncio.sleep(0.1)
# 6. 后处理:将无数个小片段拼成一整段完整的音频
if self.frames:
import numpy as np
audio_data = np.concatenate(self.frames, axis=0) # 把磁带首尾拼接
self._save_wav(output_path, audio_data) # 持久化到硬盘供 ASR 识别
return output_path
这部分的优化进阶是等到self.frames 达到一定长度(比如 200ms 的音频数据),就立即将其转换成字节流发送出去.ASR服务不断返回中间识别结果.流式输入的优点一是输入识别的文字更快,二是实现了语音的打断
等到self.silence_frames(静音时长)达到阈值时, 置process_flag并通知后端处理.即调用client.audio.transcriptions.create发送到 Whisper 模型,结果返回transcript.text.
对于视觉,我采用的是定时截图并调用Gemini1.5进行分析,然后如果有工具调用的需求进行主动截图并调用Gemini.
思考这部分是个非常大的部分,
按理来说,像是Neruo和ai琉璃,这些都是预训练模型经过微调后的.我本来也是想搞预训练的,结果Liunx系统炸掉了导致不了了之.龟龟总不能自己预训练一个LLM吧
如果要微调大模型的话,对于我的需求来说,7B的应该是可以满足日常需求的.
目前来说,我是简单的调用了Deepseek的api以及逆向出来的ikuncode中转站的codex的api(这个延迟很高后来弃用了).\
import asyncio
from openai import AsyncOpenAI
class DeepSeekClient:
def __init__(self, api_key, base_url="https://api.deepseek.com"):
# 优先初始化专用客户端
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
async def chat_with_deepseek(self, messages, model="deepseek-chat", stream=True):
try:
# 执行异步请求
response = await self.client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
stream=stream
)
if stream:
# 返回一个异步生成器,用于实时渲染文字
return self._stream_generator(response)
else:
return response.choices[0].message.content
except Exception as e:
return f"[API Error] 呼叫 DeepSeek 失败: {str(e)}"
async def _stream_generator(self, response):
async for chunk in response:
if chunk.choices and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
输出方面也是简单采用的GPT-SoVITs,GPT-SoVITs有两种方案,一种是“即插即用”的 Few-shot(少样本克隆),另一种是比较复杂的 Fine-tuning(微调训练)。我目前使用的是第一种.我倒是也有第二种的语气权重(在B站乞讨到的),不过目前还没有进行运行.
Few-shot是将参考音频作为一种“视觉/听觉提示词(Prompt)”,将提供的参考音频转换成声学向量(Acoustic Tokens),同时将参考音频对应的文本转为音素(Phonemes)。GPT 模型会学习参考音频里的语速、情感起伏、停顿习惯。当输入新的目标文本时,GPT 模型会参考上述的“提示”,预测出目标文本应该对应的声学特征序列。
而Fine-tuning就是使其默认输出就是该角色的声音。
然后就是流式输出,流式输出真的很重要,真的,如果输出很长的句子的话可能会差出来十几秒的时间差.流式输出真的太重要了
流式输出主要是在 api_client.py 中,开启 stream=True,此时LLM一个字一个字的输出,由于 TTS 无法直接合成单个汉字(没有语调),设置一个缓冲区,遇到标点符号时阶段,抄送到TTS进行输出.
async def synthesize_stream(self, text: str):
params = {
"text": text,
"streaming_mode": 1, # 开启 API 端的流式输出
"media_type": "wav"
}
# 通过异步迭代器,边收到音频块边抛给播放器
async for chunk in response.content.iter_any():
if chunk:
yield chunk
到此为止,你的一个简单的单纯和你聊天的语音ai就构建完毕了,但是如果你想要ai帮助你做一些事情,此时就需要FunctionCalling以及Agent了(简而言之,Function是一段固定好的代码,而agent是一个以大模型为核心引擎的自主系统.)
对于Tools,我简单的定义了四个: pc_control(包括get_time,system_info,screenshot和files操作);web_search;file_search(通过everything的CLI);document_qa
那么什么时候应该调用什么工具呢?我本来是采用的FunctionCalling,即提取特征关键词 -> 返回JSON指令.不过对于口语化的表达,关键词往往不能全部的覆盖,判断往往会不如人意
# intent_detector.py
class IntentDetector:
def __init__(self):
# 时间查询关键词
self.time_keywords = [
"几点", "现在时间", "什么时候", "当前时间", "time", "现在几点"
]
# 截图关键词
self.screenshot_keywords = [
"截图", "看看屏幕", "看一下", "截屏", "screenshot", "我在干嘛"
]
# 文件搜索关键词
self.search_keywords = [
"搜索", "查找", "找", "search", "find"
]
# 联网搜索关键词
self.web_search_keywords = [
"搜一下", "查一下", "百度", "谷歌", "天气", "新闻"
]
于是我决定采用Workflow,即使用一个更小、更快的模型做Router,如果它判断不需要调用工具,直接结束;如果需要,再调大模型。
class RouterEngine:
def __init__(self):
self.api_client = get_api_client()
# 指定一个极速、廉价的模型作为路由器
self.router_model = config.get_setting("models.router", "deepseek-chat")
async def analyze(self, user_input: str) -> Dict[str, Any]:
return await self._ai_based_check(user_input)
async def _ai_based_check(self, user_input: str) -> Dict[str, Any]:
# 构造极其精简的 Prompt,只要求返回 JSON
prompt = f"你是一个意图分析助手。用户输入: '{user_input}'。判断:1.是否需调用工具? 2.哪种工具(time/file/web/control/screenshot)?只返回 JSON。"
response = await self.api_client.chat(
messages=[{"role": "user", "content": prompt}],
model=self.router_model,
stream=False,
temperature=0.1 # 极低温度确保输出结果稳定
)
# 提取并解析 JSON 指令
match = re.search(r'\{.*\}', response, re.DOTALL)
if match:
result = json.loads(match.group())
return {
"need_tool": result.get("need_tool", False),
"tool_type": result.get("tool_type"),
"reason": result.get("reason", "AI 语义分析")
}
return {"need_tool": False}
但是,但是,这样对于一个以对话为主要目的的大模型还是本末倒置了,导致了平均10s多的对话延迟,同时还要兼顾和我对话导致Tools的调用决策更差
于是针对对话延迟以及性能优化,我决定采用双层模型架构agent:
第一层是聊天层,它不负责查资料,只负责根据当前的氛围说出一些闲聊
第二层是工具层,与聊天层同步启动 _tool_layer 任务,通过 RouterEngine 进行 DeepSeek 快速分析,判断是否需要调用工具。
当工具层拿到数据后(如查询到当前时间),它会竖起 has_data 的旗标,系统再次调用聊天层,生成第二段语音输出。
async def process(self, user_input: str) -> List[str]:
# 同时启动两个异步任务
chat_task = asyncio.create_task(self._chat_layer_fast()) # 快速生成过渡回复
tool_task = asyncio.create_task(self._tool_layer(user_input)) # 后台异步执行工具
# 等待极速回复(通常 3-5s 即返回第一阶段音频内容)
chat_response = await chat_task
responses = [chat_response]
# 后台获取工具执行结果
tool_result = await tool_task
# 若工具获取到真实数据,则触发第二次独立对话生成最终回复
if tool_result and tool_result.get("has_data"):
new_user_msg = f"[系统提示] 刚才查询的结果是:{self._format_tool_data_simple(tool_result['data'])}"
self.memory.add_user_message(new_user_msg)
final_response = await self._chat_layer_fast()
responses.append(final_response)
return responses
梦想有一个“数字生命”,长短时记忆绝对是不可或缺的部分,我选择上下文滑动窗口作为短期记忆.
简单的RAG进行长期记忆的算法:
在生成回复前,Agent 会先去 long_term_memory 里搜一下是否有相关的“陈年旧事”。
如果有,就通过 _format_long_term_memories 返回得分最高的前两条(Top-K)。包装成“过往回忆”塞进 Prompt 里。\
我的RAG虽然简单,但它避免了引入复杂的向量数据库(Vector DB),对于个人项目来说非常轻量且够用。
# long_term.py 的关键词检索片段
for mem in self._memories:
content = mem.get("content", "")
# 统计关键词在历史记忆中命中的次数
score = sum(1 for word in query_lower.split() if word in content.lower())
if score > 0:
scored.append((score, mem))
到这里,我的简单的实时语音助手就算是完成了demo
未来还是想要完善 很多东西,比如说live2D =)
还有YOLO实时的检测屏幕和对话,什么主动的发起聊天啊,玩游戏时给我进行建议etc,采用记忆摘要(虽然有可能爆token)以及完善优化我的RAG…