在嵌入式终端上基于本地大模型实现的离线语音聊天机器人

引言

今年上半年,我曾在基于RK3566的嵌入式开发板上本地部署并运行大模型,当时仅能在终端界面使用文字进行交互,而我的进一步目标是实现本地的大模型语音交互。但限于我手里的开发板内存不足,无法加载本地的语音识别模型,同时经费有限,只能暂时搁置。下半年,遇到电源网和得捷举办DIY活动,提供600元的经费报销,使得该项目得以继续。

本文将介绍在嵌入式终端上,基于本地大模型实现的多语言离线语音聊天机器人。其中,语音识别、大模型推理、文字转语音(TTS)都是在嵌入式终端基于本地模型实现的,无需接入互联网环境。

特点

  • 基于嵌入式终端实现
  • 无需联网,完全离线运行
  • 多语言支持(中、英)
  • 代码开源,推理模型易部署及更换

硬件环境

  • Raspberry Pi 5(quad-core Arm Cortex A76 processor @ 2.4GHz, 8GB RAM)
  • WM8960 音频模块
  • 2寸LCD主屏+ 0.96寸OLED双副屏

rasp

软件及模型

整体框架

Scheme-it

(按要求使用Scheme-it绘制)

环境准备

安装llama推理框架

直接用pip安装:

1
pip install llama-cpp-python

ins_llama

安装SenseVoice依赖

在SenseVoice的Github仓库,提供了requirements.txt。如果是直接使用我提供的源码,无需拉取SenseVoice仓库,requirements.txt存放于\models\SenseVoiceSmall目录。使用pip安装必要的依赖:

1
pip install -r requirements.txt

ins-sensevoice

安装Piper TTS

Piper TTS是我目前找到的较优离线TTS,语音接近人声,加载速度快,完全离线运行。它无需特别安装,只需要下载编译好的二进制可执行文件,即可使用,我提供的源码已经直接包含,存放于\piper目录。

特别说明:上述安装截图中pip安装含有--break-system-packages选项,这是我的系统Python结构的原因,在其它系统Python环境下,可能是不需要的。

安装音频及显示驱动

本文使用的音频及显示模块,均来自微雪电子,可直接参考对应模块的教程,进行驱动安装即可,如有问题,可联系其技术支持。

获取本项目源码

  • 获取项目源码:

    本项目完整源码,通过gitee开源,可通过git拉取

  • 放置模型文件:

    受限于git仓库对单个文件的大小的限制,两个较大的模型文件单独提供网盘下载

注:源码网址见文末

代码说明

线程结构

本项目代码主要由多个线程组成,包含按键线程、录音线程、语音识别线程、模型推理线程、文字转语音线程、显示线程等,各线程通过事件进行触发并流转运作。

thread

主要线程代码

Key线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Device.pin_factory = LGPIOFactory()
# key init
key2 = Button(17)
def key2_pressed():
start_record_event.set()
stop_tts_event.set()
show_record_event.set()
def key2_released():
stop_record_event.set()
model_doing_event.set()
stop_tts_event.clear()
show_record_event.clear()
# Bind key press event
key2.when_pressed = key2_pressed
key2.when_released = key2_released

该线程主要监听按键的按下和释放事件,以触发语音录制及识别等相关动作。

录音线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def recording_thread():
while True:
start_record_event.wait()
start_record_event.clear()
device = "default"
wavfile = wave.open(f"{current_dir}/record.wav", "wb")
mic = alsaaudio.PCM(
alsaaudio.PCM_CAPTURE,
alsaaudio.PCM_NONBLOCK,
channels=1,
rate=44100,
format=alsaaudio.PCM_FORMAT_S16_LE,
periodsize=160,
device=device,
)
wavfile.setnchannels(1)
wavfile.setsampwidth(2) # PCM_FORMAT_S16_LE
wavfile.setframerate(44100)
print("Start speaking...")
time_start = datetime.now()
while True:
if stop_record_event.is_set():
stop_record_event.clear()
time_stop = datetime.now()
print("Stop speaking...")
wavfile.close()
if time_stop.timestamp() - time_start.timestamp() >= 1:
trig_sensevoice_event.set()
else:
print("The speaking time is too short")
model_doing_event.clear()
break
# Read data from device
l, data = mic.read()
if l:
wavfile.writeframes(data)
time.sleep(0.001)

录音线程,在KEY按下后被触发执行循环录制,KEY释放后退出录制。此处还做了简单的录音时长的判断,因为当录音时长过短时,后续的语音识别可能会报错。

语音识别线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def sensevoice_thread():
from model import SenseVoiceSmall
from funasr.utils.postprocess_utils import rich_transcription_postprocess

model_dir = f"{current_dir}/models/SenseVoiceSmall"
m, kwargs = SenseVoiceSmall.from_pretrained(model=model_dir, device="cuda:0")
m.eval()
senvc_load_done.set()
print("Load sensevoice model done")

while True:
trig_sensevoice_event.wait()
trig_sensevoice_event.clear()
res = m.inference(
data_in=f"{current_dir}/record.wav",
language="auto", # "zh", "en", "yue", "ja", "ko", "nospeech"
use_itn=False,
ban_emo_unk=False,
**kwargs,
)

text = rich_transcription_postprocess(res[0][0]["text"])
ask_text_q.put(text)
trig_llama_event.set()

在录音线程正常执行完成后,触发执行语音识别线程。SenseVoice语音识别使用较为简单,可自动识别多种语言,生成的文本直接放置到消息队列中,供下一步模型推理使用。模块的初始import是较费时间的,为了不影响程序的整体加载时间,所以关于SenseVoice模块的import处理也放置在了线程中,而不是统一放在文件的开头。

模型推理线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def llama_thread():
global bool_Chinese_tts
model = llama_cpp.Llama(
model_path=f"{current_dir}/models/llama/qwen1_5-0_5b-chat-q4_0.gguf",
# n_ctx = 4096,
verbose=False,
)
ch_punctuations_re = "[,。?;]"
llama_load_done.set()
messages_history = []
max_msg_history = 2
print("Load llama model done")

while True:
trig_llama_event.wait()
trig_llama_event.clear()
ask_text = ask_text_q.get()
print(ask_text)
messages_history.append({"role": "user", "content": ask_text})
if len(messages_history) > max_msg_history:
messages_history = messages_history[-max_msg_history:]
ans_text = model.create_chat_completion(
messages=messages_history,
logprobs=False,
# stream=True,
repeat_penalty = 1.2,
max_tokens=100,
)
ans_text = ans_text["choices"][0]["message"]["content"]
messages_history.append({"role": "assistant", "content": ans_text})
if len(messages_history) > max_msg_history:
messages_history = messages_history[-max_msg_history:]
print(ans_text)
ans_text_tts = ans_text.replace(",", "。")
bool_Chinese_tts = bool(re.search(r"[\u4e00-\u9fff]", ans_text_tts)) # Chinese?
ans_text_q.put(ans_text_tts)
model_doing_event.clear()

模型推理部分,做了一些特殊处理:

  • 模型传入的输入信息,是包含前一次模型的输入及输出的,以便让模型每次推理具有一定的上下文信息。但增加上下文长度,会牺牲模型的推理速度,所以应该根据实际算力情况,合理规划上下文长度。
  • 模型做了一些参数设置,例如限制了最大输出Tokens数,以及重复性惩罚repeat_penalty等,避免模型一次输出太多信息,甚至重复输出一些无效信息。
  • 将模式输出文本做了处理,将中文逗号全部替换成中文句号,这主要是简单解决Piper TTS对中文逗号几乎没有语音停顿的局限性。
  • 判断模型输出的语言类型,以便让Piper TTS对应加载不同的语言模型。

注:

1.如果需要更换模型,只需要简单修改代码中model_path指定到对应的gguf文件即可

2.由于增加了1次对话上下文信息,所以一定程度牺牲了模型推理速度

3.考虑后续TTS输出的连贯性,模型未采用流式输出,所以感官上从模型输入到完整输出,推理会有较长的等待时间

文本转语音线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def tts_thread():
global bool_Chinese_tts
piper_cmd_zh = f"{current_dir}/piper/piper --model {current_dir}/models/piper/zh_CN-huayan-medium.onnx --output-raw | aplay -r 22050 -f S16_LE -t raw -"
piper_cmd_en = f"{current_dir}/piper/piper --model {current_dir}/models/piper/en_GB-jenny_dioco-medium.onnx --output-raw | aplay -r 22050 -f S16_LE -t raw -"
while True:
tts_text = ans_text_q.get()
if bool_Chinese_tts:
command = f'echo "{tts_text}" | {piper_cmd_zh}'
else:
command = f"echo {shlex.quote(tts_text)} | {piper_cmd_en}"
process = subprocess.Popen(
command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
while True:
if stop_tts_event.is_set():
terminate_process(process.pid)
break
if process.poll() is not None:
break
time.sleep(0.01)
process.wait()

Piper TTS通过命令行进行调用,所以对于英文的文本要使用shlex.quote()函数,对特殊字符进行预处理,避免文本里的字符影响命令行的结构。同时,在播放过程中,如果录音KEY被按下,则TTS会主动中断退出。

显示线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def oled_thread(oled_device, dir):
with Image.open(f"{current_dir}/img/{dir}_logo.bmp") as img:
img_resized = img.convert("1").resize((128, 64))
img_resized = ImageOps.invert(img_resized)
oled_device.display(img_resized)
llama_load_done.wait()
senvc_load_done.wait()
frames_eye = []
durations_eye = []
with Image.open(f"{current_dir}/img/{dir}_eye.gif") as img:
for frame in ImageSequence.Iterator(img):
frames_eye.append(frame.convert("1").resize((128, 64)))
durations_eye.append(frame.info.get("duration", 100) / 1000.0)
frames_rcd = []
durations_rcd = []
with Image.open(f"{current_dir}/img/record.gif") as img:
for frame in ImageSequence.Iterator(img):
frames_rcd.append(frame.convert("1").resize((128, 64)))
durations_rcd.append(frame.info.get("duration", 100) / 1000.0)

while True:
if show_record_event.is_set():
for frame, duration in zip(frames_rcd, durations_rcd):
oled_device.display(frame)
time.sleep(duration)
if not show_record_event.is_set():
break
else:
for frame, duration in zip(frames_eye, durations_eye):
if model_doing_event.is_set() and duration > 1:
continue
if duration > 1:
duration = duration * 2
oled_device.display(frame)
show_record_event.wait(timeout=duration)
if show_record_event.is_set():
break
else:
if dir == "left":
oled_events["left"].set()
oled_events["right"].wait()
oled_events["right"].clear()
else:
oled_events["right"].set()
oled_events["left"].wait()
oled_events["left"].clear()

显示线程主要用于协同显示当前的程序运行状态,例如录音时显示音频波形、模型推理时显示闭眼思考状态,推理完成后睁开眼睛并眨眼等。

效果演示




B站链接:

https://www.bilibili.com/video/BV1dSDzY6EHw/?spm_id_from=333.999.0.0&vd_source=b3875874dae83ca6abad9d14e2e3d762

小结

本文介绍了在嵌入式终端上,基于本地大模型实现的多语言离线语音聊天机器人。本项目在树莓派5上具体实现,代码完全开源,理论上可以运行于任何具有相当算力和资源的嵌入式终端。

(本项目完整源码请关注“ETRD”公众号,并回复关键词“聊天机器人” 获取)

ETRD