大数跨境
0
0

【小智AI语音开发板】开箱+性能测试+通过MCP服务实现自动化控制家电

【小智AI语音开发板】开箱+性能测试+通过MCP服务实现自动化控制家电 安信可科技
2025-08-04
1
导读:自动化控制家里的电器,效果非常惊喜!
图片

小伙伴们,小安派AI语音开发板上线一段时间了,安信可社区“疯狂发板”模式后,不少伙伴已经拿到开发板了,来看看大家的实际体验如何~


以下作品由安信可社区用户
zzbinfo制作


开箱



整个开发板组件装在防静电袋子中,很简洁。



打开防静电袋子,还有一个塑料袋子,袋子里面是个硬壳的盒子,开发组件就在盒子里面,防止运输过程中压坏开发板。



pcb前后都垫有减震棉,包装确实很用心。我们来仔细看看开发板。




开发板pcb尺寸



30mm.png


开发板35mm*30mm,尺寸小巧紧凑。很容易放置进合适的外壳中,对于我们业余DIY、开发测试非常友好。




主要芯片、模块



12F.png


主控模组就是安信可的Ai-WB2-12F WiFi模块,该模组搭载BL602芯片作为核心处理器,支持Wi-Fi 802.11b/g/n协议和BLE 5.0协议。


BL602芯片内置低功耗的32位RISC CPU,276KB RAM和丰富的外围接口,包括SDIO\SPI\UART\I2C\IR remote\PWM\ADC\DAC\PIR 和 GPIO等。每个外接端子都有丝端子采用的是SH1.25。


芯片.png

背面丝印LTH7的是pw4054,单节锂电充电芯片,支持3.7V锂电池的充电,输入电压4.5-6.5V,充电电流由旁边的电阻控制,支持10mA-500mA。板载的电阻是5k,换成2k,可以支持到500mA。板子上没有放电控制电路,所以外接的锂电池要带充放电控制板。


离线语音识别采用的是上海华振电子的语音大脑VB6824芯片方案。


内核采用高性能 32 位 RISC 内核,主频 240MHz,支持硬件浮点运算,内置 1MB SPI FLASH。


该方案支持AI语音识别,采用最新的神经网络(TDNN)算法,具有识别精准,误判率低等优势,5米远场可靠识别。支持语音降噪,可以有效过滤掉稳态噪声、对动态噪声也有很好的抑制作用,噪音下也可准确识别。支持 MP3,WAV,WMA,APE,FLAC,AAC,MP4,M4A,AIF,AIFC音频解码。蓝牙支持 SBC,AAC 音频解码音频,支持 mSBC 语音编解码器。符合蓝牙 V5.1+BR+EDR+ BLE 规范,提供+6dbm 发射功率。




上电



加电.png


接上type-c线后打开开关,随着红灯闪烁,语音提示清晰宏亮,根据语音提示和快速上手指南进行配网后就可以愉快地玩耍了。



小安AiPi-PalChatV1性能测试




配网测试



从输入密码点击确定开始,到提示配网成功大概耗时8秒到10秒,这个跟网络环境也有很大关系。网络不好,会造成对话反应变慢,声音输出打嗝等现象。具体可以看一下测试视频。






打断测试



由于采用的是VB6824语音识别方案,实现的对话中打断,效果还是不错的。可能是咪头和输出喇叭比较近的缘故,偶尔也会出现打断不了的情况,不过总体使用感觉还是很流畅。打断、识别的效果也和咪头有比较大的关系,我测试了两种手上有的咪头,效果相差很大。





识别距离测试



再次提醒,识别距离和环境有很大关系,笔者测试的房间外面是狭长的走廊,在走廊里面有比较大的回音,经过芯片的消回音处理,识别的效果就大打折扣了,实际测试时也确实在房间内有很好的识别效果,走廊里面就完全识别不到语音了。




AiPi-PalChatV1语音开发板通过MCP服务实现自动化控制加电





实现途径



有了AiPi-PalChatv1开发板和小智人工智能后台的加持,不实现点实用功能难免浪费,首先肯定是自动化控制家里的电器,通过查阅资料,有多种途径:


1、可以直接对开发板编程,定制自己的固件。

2、可以通过home assistant mcp server,这个需要有home assistant ,并且还要满足一定的要求。

3、自己写MCP server,小智后台提供了MCP接口和示例。


笔者采用第三种方式,把之前厨房的甲烷传感器,卫生间的热水器,空调,灯等能接入的全部实现了自动控制,效果非常惊喜。




硬件优化



ldo.jpg


从之前的测试和最近一段时间的摸索,发现语音识别依赖于采集的声音的质量,笔者把开发板上的两个开关电源芯片禁用了(也可以直接拆掉)。


通过外接的LDO实现的5V和3.3V的供电,实际使用效果良好,同样的咪头,唤醒距离增加一倍,对话流畅。(如下视频)






MCP服务器代码

# -*- coding: utf-8 -*-# 以下代码在2025年6月18日 python3.11环境下运行通过import syssys.path.append('/usr/local/lib/python3.7/dist-packages')import paho.mqtt.client as mqtt#import syssys.path.append('/usr/lib/python3/dist-packages')import requestsimport jsonfrom mcp.server.fastmcp import FastMCPimport logging import socketimport threadingimport timedef connTCP(topic:str,msg:str):    global tcp_client_socket    # 创建socket    tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    # IP 和端口    server_ip = 'bemfa.com'    server_port = 8344    uid = 'UUID'    try:        # 连接服务器        tcp_client_socket.connect((server_ip, server_port))        #发送订阅指令        substr = 'cmd=2&uid=' + uid + '&topic=' + topic + '&msg=' + msg + '\r\n'        tcp_client_socket.send(substr.encode("utf-8"))        time.sleep(2)        tcp_client_socket.shutdown(2)        tcp_client_socket.close()    except:        time.sleep(2)        tcp_client_socket.shutdown(2)        tcp_client_socket.close()        #connTCP()#Kitchen_temperature = ""#Kitchen_CH4 = ""def on_connect(client, userdata, flags, rc):    print("Connected with result code "+str(rc))    client.subscribe("****"# 订阅消息#消息接收def on_message(client, userdata, msg):    global Ktemperature    global KCH4    #print("主题:"+msg.topic+" 消息:"+str(msg.payload.decode('utf-8')))    if msg.topic == topicA:  #判断topic是否是topicA       #print(" topicA msg")       msgstr = str(msg.payload.decode('utf-8'))       Ktemperature = msgstr.split("#")[-2]       KCH4 = msgstr.split("#")[-1]       #print(Kitchen_temperature)       #print(" topicA msg")       #print(Kitchen_CH4)       #if str(msg.payload.decode('utf-8')) == "ON":  #如果接收字符on,亮灯       #   connTCP('topic','ON') #开灯函数       #if str(msg.payload.decode('utf-8')) == "OFF":  #如果接收字亮灯       #   connTCP('topic','OFF')   #关灯函数#订阅成功def on_subscribe(client, userdata, mid, granted_qos):    print("On Subscribed: qos = %d" % granted_qos)# 失去连接def on_disconnect(client, userdata, rc):    if rc != 0:        print("Unexpected disconnection %s" % rc)logger = logging.getLogger('WHcontrol')# Create an MCP servermcp = FastMCP("WHcontrol")  # Add an addition tool@mcp.tool() def get_kitchen_temperature() -> dict:    """用于查询厨房温度,返回温度值,单位摄氏度。"""    global Ktemperature    if float(Ktemperature) > 45:       Ktemperature = f'{float(Ktemperature)-18:.1f}'  #修正温度值,由于电子温度计装的高,不修正,人工智能容易报警    result = Ktemperature    #print(result)    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def get_kitchen_CH4() -> dict:    """用于查询厨房天然气或是甲烷含量,返回天然气或是甲烷PPM值。"""    global KCH4    if int(KCH4) > 440:       KCH4 = f'{int(KCH4)-440}'    result = KCH4    # print(result)    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def get_wh_timer_switch() -> dict:    """用于查询热水器定时器开关的状态,on是打开,off是关闭。"""    headers={ "content-type""application/x-www-form-urlencoded"}    urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'    response = requests.get(url=urlmsg,headers=headers,timeout=5)    if response.status_code != 200:       raise ConnectionError(f'{url} status code is {response.status_code}.')    response = json.loads(response.content)    if 'data' not in response.keys():       raise ValueError(f'{url} miss key msg.')    response = json.dumps(response['data'][0])    response = json.loads(response)    if 'msg' not in response.keys():       raise ValueError(f'{url} miss key msg.')    result = response['msg'].split("#")[-1]    print(result)    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def set_wh_timer_switch(sw:str) -> dict:    """设置热水器定时加热开关状态,on是打开,off是关闭。打开后在每天晚上20点自动开始加热,一个小时后关闭加热开关。"""    if sw == "on":       connTCP("topic","G2on")    if sw == "off":       connTCP("topic","G2off")    result = sw    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def get_wh_switch() -> dict:    """用于查询热水器加热开关的状态,on是打开,off是关闭。"""    headers={ "content-type""application/x-www-form-urlencoded"}    urlmsg = f'https://apis.bemfa.com/va/getmsg?uid=UUID&topic=reshuiqi&type=3&num=1'    response = requests.get(url=urlmsg,headers=headers,timeout=5)    if response.status_code != 200:       raise ConnectionError(f'{url} status code is {response.status_code}.')    response = json.loads(response.content)    if 'data' not in response.keys():       raise ValueError(f'{url} miss key msg.')    response = json.dumps(response['data'][0])    response = json.loads(response)    if 'msg' not in response.keys():       raise ValueError(f'{url} miss key msg.')    result = response['msg'].split("#")[-2]    print(result)    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def set_wh_switch(sw:str) -> dict:    """设置热水器加热开关状态,on是打开热水器加热开关,off是关闭热水器加热开关。"""    if sw == "on":       connTCP("topic","G1on")    if sw == "off":       connTCP("topic","G1off")    result = sw    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def set_light_switch(sw:str) -> dict:    """设置客厅照明灯的开关状态,on是打开照明灯,off是关闭照明灯。"""    if sw == "on":       connTCP("topic","A857A4AC6C"#我是通过433网关来发的控制指令,这个是我的灯的指令,修改成你们自己的    if sw == "off":       connTCP("topic","A857A4AC69")    result = sw    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def set_light_brightness(sw:str) -> dict:    """设置客厅照明灯的亮度,add是增加亮度,fall是减小亮度。"""    if sw == "fall":       connTCP("topic","A857A4AC64")    if sw == "add":       connTCP("topic","A857A4AC68")    result = sw    logger.info(f"result: {result}")    return {"success"True"result": result}@mcp.tool()def set_air_switch(sw:str) -> dict:    """设置卧室空调的开关状态,on是打开空调,off是关闭空调。"""    if sw == "on":       connTCP("topic","H823CB2602002403053D000000007F")  #我是通过红外网关来发的控制指令,这个是我的空调的指令,修改成你们自己的    if sw == "off":       connTCP("topic","H823CB26020020030F3A0000000082")    result = sw    logger.info(f"result: {result}")    return {"success"True"result": result}# Start the serverif __name__ == "__main__":    MQTTHOST = "bemfa.com"    MQTTPORT = 9501    client_id = "UUID"    topicA = "tipoc"    client = mqtt.Client(client_id)    client.username_pw_set("userName""passwd")    client.on_connect = on_connect    client.on_message = on_message    client.on_subscribe = on_subscribe    client.on_disconnect = on_disconnect    client.connect(MQTTHOST, MQTTPORT, 60)    client.loop_start()    time.sleep(3)    mcp.run(transport="stdio")


笔者的设备都是通过bemfa云控制的,具体接入文档可以参照巴法云的说明。


MCP server运行在一个树莓派上,先按照小智MCP示例安装运行环境,然后直接把小智MCP接入点的字符串写到mcp_pipe.py文件中,省的设置环境变量。通过命令启动MCP server :python mcp_pipe.py switch-py.py。


在小智后台 --》配置角色 最下面的MCP接入点中就应该能看到已经接入的MCP服务了。


mcp.jpg




最后代码


里面的一些变量需要自己修改,代码写的比较乱,大家凑合看。

upload 附件:mcp代码.rar



1.gif


微信宣传推广动态二维码(1).gif

AI-Thinker-logo-原版-转曲.png

更多信息请点击:





产品https://docs.ai-thinker.com/


教程https://blog.csdn.net/Boantong_


官网www.ai-thinker.com


社区https://bbs.ai-thinker.com


业务咨询请联系:18022036575



未命名(3) (2).gif

【声明】内容源于网络
0
0
安信可科技
全球领先的联网模组、智能家居等物联网硬件方案提供商。
内容 393
粉丝 0
安信可科技 全球领先的联网模组、智能家居等物联网硬件方案提供商。
总阅读94
粉丝0
内容393