大数跨境
0
0

用大语言模型控制行空板机器人

用大语言模型控制行空板机器人 蘑菇云创造
2024-01-17
2
导读:“举起手来”

点击蘑菇云创造 关注我们


项目背景

百模大战愈演愈烈,大模型在各个社会场景中释放出的潜力已肉眼可见,不断推动着生产力的变革。技术的迭代升级,让开发门槛进一步降低,为开发者带来了新机遇。


2023科大讯飞全球1024开发者节上,讯飞星火认知大模型V3.0如约而至,七大能力持续提升,整体超越ChatGPT,医疗六大核心能力超越GPT-4。

现在讯飞星火V3.0已向开发者开放调用,多项能力提升的同时,新增FunctionCall 、System指令和大模型定制训练平台:https://www.xfyun.cn/solutions/training,满足开发者更多场景调用需求,带来优质人机交互体验。

讯飞星火FunctionCall:
让大模型实现与外部物理世界的交互,其中的关键就是对外部功能的调用,而其底层就是大模型FunctionCall的支持能力。讯飞星火新推出的FunctionCall 可调用自有函数生成内容,让大模型能更准确理解用户意图并与函数调用相匹配。对于开发者来说,Functioncall 功能的推出不仅使研发成本大大降低,还有助于打造更懂用户的应用。


项目设计

本项目使用行空板结合星火大语言模型,让机器人理解我说的话(话的内容为控制机器人做动作的指令:抬起手、放下手,但话的内容不固定),做出相应的动作。同时配合讯飞语音识别和语音合成功能,用户可以与行空板机器人进行语音对话。


演示视频


Function Call分析

从人机交互上来说, Function Call 本质上只做了一件事,那就是实现了「准确识别用户的语义,将其转为结构化的指令」。而这,非常了不起。


它实现的最大的价值,就是让机器轻易地理解了用户模糊化的输入,将其转换为机器可以理解的技术指令。这对于人机交互的范式来说,完全是质的改变。


官方文档里给出了使用模型进行Function Call 工具调用的流程:https://www.xfyun.cn/doc/spark/Web.html#_2-function-call%E8%AF%B4%E6%98%8E


我:明天我应该穿什么衣服?星火:{"header":{"code":0,"message":"Success","sid":"cht000b8e77@dx18ce887dd74b8f3550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"datetime":"明天","location":"CURRENT_CITY"}","name":"天气查询"},"index":0}]},"usage":{"text":{"question_tokens":6,"prompt_tokens":6,"completion_tokens":0,"total_tokens":6}}}}


假设我们正在开发一个聊天类的AI应用,它需要根据用户输入的问题来生成相应的回答。我们在提问上并没有直接问明天的天气如何,而是问「我明天应该穿什么衣服?」此时 语言完全理解了我的问题,而且知道穿衣建议是需要了解相关天气情况的。那么机器人应该返回一个关于今天天气的字符串。为了实现这个功能,我们可以定义一个名为“天气查询”的函数,它接受两个参数:一个表示日期的字符串、一个表示所在城市的字符串。然后,我们可以将这个函数的描述传递给大模型,让它生成一个包含调用该函数所需参数的JSON对象。最后,当用户输入一个问题时,我们可以让大模型分析问题并确定是否需要调用“天气查询”函数。如果需要,我们可以从JSON对象中提取所需的参数,并将它们传递给"天气查"询函数。这样,我们就可以根据用户输入的问题来生成相应的回答了。


为了让机器理解人类的意图,过去我们想方设法去「约束用户行为」或者「猜测用户意图』。但时代已经开始变了,通过 Function Call, 我们只需要在发送给 GPT 请求时加一个functions的参数,告知 AI 可以调用的外部方法有什么,然后 AI 就能够自动分析问题的上下文,并通过多轮对话来收集必要的调用参数,最后拼合返回调用方法的 JSON。


修改functions

定制了一个"机器人控制"functions,描述为:“机器人控制可以控制机器人的左右手相应动作,你可以让机器人抬起、放下左右手。可以是只控制一只手,也可能同时控制两只手。”

       "payload": {            "message": {                "text": question            },            "functions": {                "text": [                    {                        "name": "机器人控制",                        "description": "机器人控制可以控制机器人的左右手相应动作,你可以让机器人抬起、放下左右手。可以是只控制一只手,也可能同时控制两只手",                        "parameters": {                            "type": "object",                            "properties": {                                "hand": {                                    "type": "string",                                    "description": "控制对象,左手或右手或左右手"                                },                                "action": {                                    "type": "string",                                    "description": "动作,抬起或放下"                                }                            },                            "required": [                                "hand","action"                            ]                        }                    }                ]            }            }


经过对话发现星火对"让两只手都抬起",给出的反馈“hand”为“双手”,此处给星火点赞!


我:让机器人抬起左手星火:{"header":{"code":0,"message":"Success","sid":"cht000bf06b@dx18ce8ce92c5b8f3550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"左手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":4,"prompt_tokens":4,"completion_tokens":0,"total_tokens":4}}}}我:放下右手星火:{"header":{"code":0,"message":"Success","sid":"cht000bf6ba@dx18ce8cee8eab8f2550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"放下","hand":"右手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":2,"prompt_tokens":6,"completion_tokens":0,"total_tokens":6}}}}
我:抬起右手星火:{"header":{"code":0,"message":"Success","sid":"cht000b09e3@dx18ce8d1b7e69a4b550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"右手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":2,"prompt_tokens":2,"completion_tokens":0,"total_tokens":2}}}} 我:让两只手都抬起星火:{"header":{"code":0,"message":"Success","sid":"cht000b0bea@dx18ce8d202b79a4b550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"双手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":5,"prompt_tokens":7,"completion_tokens":0,"total_tokens":7}}}}


定义行空板控制舵机函数

定义控制舵机函数“RobotControl”,根据用户输入参数的值。"hand"指控制操作的对象:左手、右手、双手。“action”指控制动作:抬起、放下。


def RobotControl(hand,action): if hand=="左手": if action=="抬起": servoL.write_angle(90) else: servoL.write_angle(178) elif hand=="右手": if action=="抬起": servoR.write_angle(90) else: servoR.write_angle(2) elif hand=="双手": if action=="抬起": servoL.write_angle(90) servoR.write_angle(90) else: servoR.write_angle(2) servoL.write_angle(178)


让我兴奋的是,我输入“把右手举起来”,星火也能反馈回来action:抬起。(当然此时,我已看到我的简易机器人,把它的“右手”“抬起”来了。

我:把右手举起来星火:{"header":{"code":0,"message":"Success","sid":"cht000b8d3e@dx18ce96fcae5b8f2550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"右手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":4,"prompt_tokens":4,"completion_tokens":0,"total_tokens":4}}}}


简易机器人制作



SparkApi模块文件完整代码



import _thread as threadimport base64import datetimeimport hashlibimport hmacimport jsonfrom urllib.parse import urlparseimport sslfrom datetime import datetimefrom time import mktimefrom urllib.parse import urlencodefrom wsgiref.handlers import format_date_time
import websocket # 使用websocket_clientfrom pinpong.extension.unihiker import *from pinpong.board import Servofrom pinpong.board import Boardfrom pinpong.board import PinBoard().begin()pin1 = Pin(Pin.D23)pin2 = Pin(Pin.D22)servoL = Servo(pin1)#左手舵机servoR = Servo(pin2)#右手舵机#左右手放平servoL.write_angle(178)servoR.write_angle(2)hand=""action=""
answer = ""
class Ws_Param(object): # 初始化 def __init__(self, APPID, APIKey, APISecret, Spark_url): self.APPID = APPID self.APIKey = APIKey self.APISecret = APISecret self.host = urlparse(Spark_url).netloc self.path = urlparse(Spark_url).path self.Spark_url = Spark_url
# 生成url def create_url(self): # 生成RFC1123格式的时间戳 now = datetime.now() date = format_date_time(mktime(now.timetuple()))
# 拼接字符串 signature_origin = "host: " + self.host + "\n" signature_origin += "date: " + date + "\n" signature_origin += "GET " + self.path + " HTTP/1.1"
# 进行hmac-sha256进行加密 signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), digestmod=hashlib.sha256).digest()
signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# 将请求的鉴权参数组合为字典 v = { "authorization": authorization, "date": date, "host": self.host } # 拼接鉴权参数,生成url url = self.Spark_url + '?' + urlencode(v) # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 return url

# 收到websocket错误的处理def on_error(ws, error): print("### error:", error)

# 收到websocket关闭的处理def on_close(ws,one,two): print(" ")

# 收到websocket连接建立的处理def on_open(ws): thread.start_new_thread(run, (ws,))

def run(ws, *args): data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question)) ws.send(data)

# 收到websocket消息的处理def on_message(ws, message): global hand,action #print(message) data = json.loads(message) code = data['header']['code'] if code != 0: print(f'请求错误: {code}, {data}') ws.close() else: choices = data["payload"]["choices"] status = choices["status"] content = choices["text"][0]["content"] print(content,end ="") global answer answer += content # print(1) if choices["text"][0]["function_call"] is not None: function_call=choices["text"][0]["function_call"] if function_call["name"]=="机器人控制": arguments=json.loads(function_call["arguments"]) action=arguments["action"] hand=arguments["hand"]
RobotControl(hand,action) if status == 2: ws.close()

def gen_params(appid, domain,question): """ 通过appid和用户的提问来生成请参数 """ data = { "header": { "app_id": appid, "uid": "1234" }, "parameter": { "chat": { "domain": domain, "temperature": 0.5, "max_tokens": 2048 } }, "payload": { "message": { "text": question }, "functions": { "text": [ { "name": "机器人控制", "description": "“机器人控制”可以控制机器人的左右手相应动作,你可以使用动作指令让机器人抬起、放下左右手。动作指令只包括抬起和放下。可以是只控制一只手,也可能同时控制双手", "parameters": { "type": "object", "properties": { "hand": { "type": "string", "description": "控制对象,左手或右手或左右手" }, "action": { "type": "string", "description": "动作,抬起或放下" } }, "required": [ "hand","action" ] } } ] } } } return data
def RobotControl(hand,action): if hand=="左手": if action=="抬起": servoL.write_angle(90) else: servoL.write_angle(178) elif hand=="右手": if action=="抬起": servoR.write_angle(90) else: servoR.write_angle(2) elif hand=="双手": if action=="抬起": servoL.write_angle(90) servoR.write_angle(90) else: servoR.write_angle(2) servoL.write_angle(178)def main(appid, api_key, api_secret, Spark_url,domain, question): # print("星火:") wsParam = Ws_Param(appid, api_key, api_secret, Spark_url) websocket.enableTrace(False) wsUrl = wsParam.create_url() ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open) ws.appid = appid ws.question = question ws.domain = domain ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})




使用语音交互


1.语音输入
(1)音量监听#主程序

import listening


音量监听listening.py模块文件代码

import pyaudio,waveimport numpy as npdef listen():
temp = 20 CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 RECORD_SECONDS = 2 WAVE_OUTPUT_FILENAME = 'record.wav'
mindb=1500 #最小声音,大于则开始录音,否则结束 delayTime=1.3 #小声1.3秒后自动终止 p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) #snowboydecoder.play_audio_file() print("开始!计时")
frames = [] flag = False # 开始录音节点 stat = True #判断是否继续录音 stat2 = False #判断声音小了
tempnum = 0 #tempnum、tempnum2、tempnum3为时间 tempnum2 = 0
while stat: data = stream.read(CHUNK,exception_on_overflow = False) audio_data = np.frombuffer(data, dtype=np.short) temp = np.max(audio_data) if temp > mindb and flag==False: flag =True print("开始录音") tempnum2=tempnum
if flag: frames.append(data) if(temp < mindb and stat2==False): stat2 = True tempnum2 = tempnum print("声音小,且之前是是大的或刚开始,记录当前点") if(temp > mindb): stat2 =False tempnum2 = tempnum #刷新
if(tempnum > tempnum2 + delayTime*15 and stat2==True): print("间隔%.2lfs后开始检测是否还是小声"%delayTime) if(stat2 and temp < mindb): stat = False #还是小声,则stat=True print("小声!") else: stat2 = False print("大声!")

print(str(temp) + " " + str(tempnum)) tempnum = tempnum + 1 if tempnum > 3600: tempnum=0 #超时直接退出 #stat = False print("录音结束")
stream.stop_stream() stream.close() p.terminate() wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close()


(2)讯飞语音识别
主程序 导入讯飞语音识别库

import xunfeiasrxunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)


将主程序文字输入修改为讯飞语音识别文本输入。
修改:

Input = input("\n" +"我:")


为:

listening.listen()ShiBieNaRong=xunfeiasr.xunfeiasr(r"record.wav")


经测试星火有时会反馈其它action,所以在描述description中增加对动作指令的限制。

 "description": "“机器人控制”可以控制机器人的左右手相应动作,你可以使用动作指令让机器人抬起、放下左右手。动作指令只包括抬起和放下。可以是只控制一只手,也可能同时控制双手",


使用语音输入如:把右手向上,也能实现机器人抬起右手。nice!

主程序代码:


import SparkApiimport listening#以下密钥信息从控制台获取appid = "*********" #填写控制台中获取的 APPID 信息api_secret = "***********" #填写控制台中获取的 APISecret 信息api_key ="**************" #填写控制台中获取的 APIKey 信息import xunfeiasrxunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)#用于配置大模型版本,默认“general/generalv2”#domain = "general" # v1.5版本# domain = "generalv2" # v2.0版本domain = "generalv3" # v3.0版本#云端环境的服务地址#Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"
text =[]
# length = 0
def getText(role,content): jsoncon = {} jsoncon["role"] = role jsoncon["content"] = content text.append(jsoncon) return text
def getlength(text): length = 0 for content in text: temp = content["content"] leng = len(temp) length += leng return length
def checklen(text): while (getlength(text) > 8000): del text[0] return text

if __name__ == '__main__': text.clear while(1): listening.listen() Input=xunfeiasr.xunfeiasr(r"record.wav") question = checklen(getText("user",Input)) SparkApi.answer ="" print("星火:",end = "") SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question) getText("assistant",SparkApi.answer) # print(str(text))



2.行空板屏幕表情


from unihiker import GUIu_gui=GUI()表情=u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=0.2)


3.测试视频:



4.机器人语音回答
(1)设置行空板蓝牙连接蓝牙音箱。

在行空板终端窗口下使用bluetoothctl配置蓝牙连接蓝牙音箱。



(2)讯飞语音合成
初始化

from df_xfyun_speech import XfTtsfrom unihiker import Audiou_audio = Audio()options = {}business_args = {"aue":"raw","vcn":"xiaoyan","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}options["business_args"] = business_argstts = XfTts(appid, api_key, api_secret, options)


主程序循环体while(1)中增加,当星火有回复时:

       if SparkApi.answer!="":             tts.synthesis(answer+"……", "speech.wav")             u_audio.play("speech.wav")


当命中机器人控制时:


if SparkApi.hand!="": tts.synthesis("好的"+SparkApi.hand+SparkApi.action+"了", "speech.wav") u_audio.play("speech.wav")


主程序完整代码


import SparkApiimport listeningimport time#以下密钥信息从控制台获取appid = "*******************" #填写控制台中获取的 APPID 信息api_secret = "***************************" #填写控制台中获取的 APISecret 信息api_key ="*******************************" #填写控制台中获取的 APIKey 信息import xunfeiasrxunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)from df_xfyun_speech import XfTtsfrom unihiker import Audiou_audio = Audio()options = {}business_args = {"aue":"raw","vcn":"xiaoyan","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}options["business_args"] = business_argsglobal ttstts = XfTts(appid, api_key, api_secret, options)
#用于配置大模型版本,默认“general/generalv2”#domain = "general" # v1.5版本# domain = "generalv2" # v2.0版本domain = "generalv3" # v3.0版本#云端环境的服务地址#Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"
text =[]from unihiker import GUIu_gui=GUI()表情=u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=0.2)
# length = 0
def getText(role,content): jsoncon = {} jsoncon["role"] = role jsoncon["content"] = content text.append(jsoncon) return text
def getlength(text): length = 0 for content in text: temp = content["content"] leng = len(temp) length += leng return length
def checklen(text): while (getlength(text) > 8000): del text[0] return text

if __name__ == '__main__': text.clear while(1): listening.listen() 表情.config(emoji="Wink") Input=xunfeiasr.xunfeiasr(r"record.wav") print(Input) question = checklen(getText("user",Input)) SparkApi.answer ="" SparkApi.hand="" print("星火:",end = "") SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question) if SparkApi.answer!="": tts.synthesis(SparkApi.answer+"……", "speech.wav") u_audio.play("speech.wav") time.sleep(3) if SparkApi.hand!="": tts.synthesis("好的"+SparkApi.hand+SparkApi.action+"了", "speech.wav") u_audio.play("speech.wav") getText("assistant",SparkApi.answer) 表情.config(emoji="Peace") # print(str(text))



【注:System指令】

System指令可对模型在多轮对话中的角色信息进行快捷约束,避免模型定制,即要求模型按照约定的方式进行后续响应。相比prompt约束,可以解决prompt拼接不连贯、不支持多轮的问题。







往期推荐

[行空板+大模型]智能家居助手——GPT3.5 function calling控制硬件

使用 ChatGPT API 和 Azure Speech API 在 行空板单板计算机上构建 AI 助手

物联网农业土壤养分和气象站监测系统

【比赛优秀案例】物联网室内甲醛监控和智能遥控窗户通风系统

【新课标】信息科技跨学科案例-八年级 校园气象站系统

【比赛优秀案例】智能植物育苗养护数据采集系统

【比赛优秀案例】低碳生活-传统酿醋工艺产生的二氧化碳的监测和利用

【比赛优秀案例】闯关游戏-魔镜传奇 勇敢救小猪

【比赛优秀案例】智慧校园“开心农场”物联网可视化数据平台

【比赛优秀案例】气象及大气质量监测可视化平台

无线行空板, 远程控制人脸追踪的麦轮小车

用行空板自制桌面盖革计数器—实时核辐射监测

【声明】内容源于网络
0
0
蘑菇云创造
蘑菇云是DFRobot旗下专注于AI人工智能、创客、STEAM、劳动教育的科技创新教育品牌;以为中国培养下一代科技创新人才为使命,为学校提供k12全龄段科技创新教育解决方案。
内容 969
粉丝 0
蘑菇云创造 蘑菇云是DFRobot旗下专注于AI人工智能、创客、STEAM、劳动教育的科技创新教育品牌;以为中国培养下一代科技创新人才为使命,为学校提供k12全龄段科技创新教育解决方案。
总阅读1.1k
粉丝0
内容969