点击蘑菇云创造 关注我们
项目背景
百模大战愈演愈烈,大模型在各个社会场景中释放出的潜力已肉眼可见,不断推动着生产力的变革。技术的迭代升级,让开发门槛进一步降低,为开发者带来了新机遇。
2023科大讯飞全球1024开发者节上,讯飞星火认知大模型V3.0如约而至,七大能力持续提升,整体超越ChatGPT,医疗六大核心能力超越GPT-4。
现在讯飞星火V3.0已向开发者开放调用,多项能力提升的同时,新增FunctionCall 、System指令和大模型定制训练平台:https://www.xfyun.cn/solutions/training,满足开发者更多场景调用需求,带来优质人机交互体验。
讯飞星火FunctionCall:
让大模型实现与外部物理世界的交互,其中的关键就是对外部功能的调用,而其底层就是大模型FunctionCall的支持能力。讯飞星火新推出的FunctionCall 可调用自有函数生成内容,让大模型能更准确理解用户意图并与函数调用相匹配。对于开发者来说,Functioncall 功能的推出不仅使研发成本大大降低,还有助于打造更懂用户的应用。
项目设计
本项目使用行空板结合星火大语言模型,让机器人理解我说的话(话的内容为控制机器人做动作的指令:抬起手、放下手,但话的内容不固定),做出相应的动作。同时配合讯飞语音识别和语音合成功能,用户可以与行空板机器人进行语音对话。
演示视频
Function Call分析
从人机交互上来说, Function Call 本质上只做了一件事,那就是实现了「准确识别用户的语义,将其转为结构化的指令」。而这,非常了不起。
它实现的最大的价值,就是让机器轻易地理解了用户模糊化的输入,将其转换为机器可以理解的技术指令。这对于人机交互的范式来说,完全是质的改变。
官方文档里给出了使用模型进行Function Call 工具调用的流程:https://www.xfyun.cn/doc/spark/Web.html#_2-function-call%E8%AF%B4%E6%98%8E
我:明天我应该穿什么衣服?星火:{"header":{"code":0,"message":"Success","sid":"cht000b8e77@dx18ce887dd74b8f3550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"datetime":"明天","location":"CURRENT_CITY"}","name":"天气查询"},"index":0}]},"usage":{"text":{"question_tokens":6,"prompt_tokens":6,"completion_tokens":0,"total_tokens":6}}}}
假设我们正在开发一个聊天类的AI应用,它需要根据用户输入的问题来生成相应的回答。我们在提问上并没有直接问明天的天气如何,而是问「我明天应该穿什么衣服?」此时 语言完全理解了我的问题,而且知道穿衣建议是需要了解相关天气情况的。那么机器人应该返回一个关于今天天气的字符串。为了实现这个功能,我们可以定义一个名为“天气查询”的函数,它接受两个参数:一个表示日期的字符串、一个表示所在城市的字符串。然后,我们可以将这个函数的描述传递给大模型,让它生成一个包含调用该函数所需参数的JSON对象。最后,当用户输入一个问题时,我们可以让大模型分析问题并确定是否需要调用“天气查询”函数。如果需要,我们可以从JSON对象中提取所需的参数,并将它们传递给"天气查"询函数。这样,我们就可以根据用户输入的问题来生成相应的回答了。
为了让机器理解人类的意图,过去我们想方设法去「约束用户行为」或者「猜测用户意图』。但时代已经开始变了,通过 Function Call, 我们只需要在发送给 GPT 请求时加一个functions的参数,告知 AI 可以调用的外部方法有什么,然后 AI 就能够自动分析问题的上下文,并通过多轮对话来收集必要的调用参数,最后拼合返回调用方法的 JSON。
修改functions
定制了一个"机器人控制"functions,描述为:“机器人控制可以控制机器人的左右手相应动作,你可以让机器人抬起、放下左右手。可以是只控制一只手,也可能同时控制两只手。”
"payload": {"message": {"text": question},"functions": {"text": [{"name": "机器人控制","description": "机器人控制可以控制机器人的左右手相应动作,你可以让机器人抬起、放下左右手。可以是只控制一只手,也可能同时控制两只手","parameters": {"type": "object","properties": {"hand": {"type": "string","description": "控制对象,左手或右手或左右手"},"action": {"type": "string","description": "动作,抬起或放下"}},"required": ["hand","action"]}}]}}
经过对话发现星火对"让两只手都抬起",给出的反馈“hand”为“双手”,此处给星火点赞!
我:让机器人抬起左手星火:{"header":{"code":0,"message":"Success","sid":"cht000bf06b@dx18ce8ce92c5b8f3550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"左手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":4,"prompt_tokens":4,"completion_tokens":0,"total_tokens":4}}}}我:放下右手星火:{"header":{"code":0,"message":"Success","sid":"cht000bf6ba@dx18ce8cee8eab8f2550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"放下","hand":"右手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":2,"prompt_tokens":6,"completion_tokens":0,"total_tokens":6}}}}我:抬起右手星火:{"header":{"code":0,"message":"Success","sid":"cht000b09e3@dx18ce8d1b7e69a4b550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"右手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":2,"prompt_tokens":2,"completion_tokens":0,"total_tokens":2}}}}我:让两只手都抬起星火:{"header":{"code":0,"message":"Success","sid":"cht000b0bea@dx18ce8d202b79a4b550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"双手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":5,"prompt_tokens":7,"completion_tokens":0,"total_tokens":7}}}}
定义行空板控制舵机函数
定义控制舵机函数“RobotControl”,根据用户输入参数的值。"hand"指控制操作的对象:左手、右手、双手。“action”指控制动作:抬起、放下。
def RobotControl(hand,action):if hand=="左手":if action=="抬起":servoL.write_angle(90)else:servoL.write_angle(178)elif hand=="右手":if action=="抬起":servoR.write_angle(90)else:servoR.write_angle(2)elif hand=="双手":if action=="抬起":servoL.write_angle(90)servoR.write_angle(90)else:servoR.write_angle(2)servoL.write_angle(178)
让我兴奋的是,我输入“把右手举起来”,星火也能反馈回来action:抬起。(当然此时,我已看到我的简易机器人,把它的“右手”“抬起”来了。
我:把右手举起来星火:{"header":{"code":0,"message":"Success","sid":"cht000b8d3e@dx18ce96fcae5b8f2550","status":2},"payload":{"choices":{"status":2,"seq":0,"text":[{"content":"","role":"assistant","content_type":"text","function_call":{"arguments":"{"action":"抬起","hand":"右手"}","name":"机器人控制"},"index":0}]},"usage":{"text":{"question_tokens":4,"prompt_tokens":4,"completion_tokens":0,"total_tokens":4}}}}
简易机器人制作

SparkApi模块文件完整代码
import _thread as threadimport base64import datetimeimport hashlibimport hmacimport jsonfrom urllib.parse import urlparseimport sslfrom datetime import datetimefrom time import mktimefrom urllib.parse import urlencodefrom wsgiref.handlers import format_date_timeimport websocket # 使用websocket_clientfrom pinpong.extension.unihiker import *from pinpong.board import Servofrom pinpong.board import Boardfrom pinpong.board import PinBoard().begin()pin1 = Pin(Pin.D23)pin2 = Pin(Pin.D22)servoL = Servo(pin1)#左手舵机servoR = Servo(pin2)#右手舵机#左右手放平servoL.write_angle(178)servoR.write_angle(2)hand=""action=""answer = ""class Ws_Param(object):# 初始化def __init__(self, APPID, APIKey, APISecret, Spark_url):self.APPID = APPIDself.APIKey = APIKeyself.APISecret = APISecretself.host = urlparse(Spark_url).netlocself.path = urlparse(Spark_url).pathself.Spark_url = Spark_url# 生成urldef create_url(self):# 生成RFC1123格式的时间戳now = datetime.now()date = format_date_time(mktime(now.timetuple()))# 拼接字符串signature_origin = "host: " + self.host + "\n"signature_origin += "date: " + date + "\n"signature_origin += "GET " + self.path + " HTTP/1.1"# 进行hmac-sha256进行加密signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),digestmod=hashlib.sha256).digest()signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')# 将请求的鉴权参数组合为字典v = {"authorization": authorization,"date": date,"host": self.host}# 拼接鉴权参数,生成urlurl = self.Spark_url + '?' + urlencode(v)# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致return url# 收到websocket错误的处理def on_error(ws, error):print("### error:", error)# 收到websocket关闭的处理def on_close(ws,one,two):print(" ")# 收到websocket连接建立的处理def on_open(ws):thread.start_new_thread(run, (ws,))def run(ws, *args):data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))ws.send(data)# 收到websocket消息的处理def on_message(ws, message):global hand,action#print(message)data = json.loads(message)code = data['header']['code']if code != 0:print(f'请求错误: {code}, {data}')ws.close()else:choices = data["payload"]["choices"]status = choices["status"]content = choices["text"][0]["content"]print(content,end ="")global answeranswer += content# print(1)if choices["text"][0]["function_call"] is not None:function_call=choices["text"][0]["function_call"]if function_call["name"]=="机器人控制":arguments=json.loads(function_call["arguments"])action=arguments["action"]hand=arguments["hand"]RobotControl(hand,action)if status == 2:ws.close()def gen_params(appid, domain,question):"""通过appid和用户的提问来生成请参数"""data = {"header": {"app_id": appid,"uid": "1234"},"parameter": {"chat": {"domain": domain,"temperature": 0.5,"max_tokens": 2048}},"payload": {"message": {"text": question},"functions": {"text": [{"name": "机器人控制","description": "“机器人控制”可以控制机器人的左右手相应动作,你可以使用动作指令让机器人抬起、放下左右手。动作指令只包括抬起和放下。可以是只控制一只手,也可能同时控制双手","parameters": {"type": "object","properties": {"hand": {"type": "string","description": "控制对象,左手或右手或左右手"},"action": {"type": "string","description": "动作,抬起或放下"}},"required": ["hand","action"]}}]}}}return datadef RobotControl(hand,action):if hand=="左手":if action=="抬起":servoL.write_angle(90)else:servoL.write_angle(178)elif hand=="右手":if action=="抬起":servoR.write_angle(90)else:servoR.write_angle(2)elif hand=="双手":if action=="抬起":servoL.write_angle(90)servoR.write_angle(90)else:servoR.write_angle(2)servoL.write_angle(178)def main(appid, api_key, api_secret, Spark_url,domain, question):# print("星火:")wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)websocket.enableTrace(False)wsUrl = wsParam.create_url()ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)ws.appid = appidws.question = questionws.domain = domainws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
使用语音交互
1.语音输入
(1)音量监听#主程序
import listening
音量监听listening.py模块文件代码
import pyaudio,waveimport numpy as npdef listen():temp = 20CHUNK = 1024FORMAT = pyaudio.paInt16CHANNELS = 1RATE = 16000RECORD_SECONDS = 2WAVE_OUTPUT_FILENAME = 'record.wav'mindb=1500 #最小声音,大于则开始录音,否则结束delayTime=1.3 #小声1.3秒后自动终止p = pyaudio.PyAudio()stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)#snowboydecoder.play_audio_file()print("开始!计时")frames = []flag = False # 开始录音节点stat = True #判断是否继续录音stat2 = False #判断声音小了tempnum = 0 #tempnum、tempnum2、tempnum3为时间tempnum2 = 0while stat:data = stream.read(CHUNK,exception_on_overflow = False)audio_data = np.frombuffer(data, dtype=np.short)temp = np.max(audio_data)if temp > mindb and flag==False:flag =Trueprint("开始录音")tempnum2=tempnumif flag:frames.append(data)if(temp < mindb and stat2==False):stat2 = Truetempnum2 = tempnumprint("声音小,且之前是是大的或刚开始,记录当前点")if(temp > mindb):stat2 =Falsetempnum2 = tempnum#刷新if(tempnum > tempnum2 + delayTime*15 and stat2==True):print("间隔%.2lfs后开始检测是否还是小声"%delayTime)if(stat2 and temp < mindb):stat = False#还是小声,则stat=Trueprint("小声!")else:stat2 = Falseprint("大声!")print(str(temp) + " " + str(tempnum))tempnum = tempnum + 1if tempnum > 3600:tempnum=0 #超时直接退出#stat = Falseprint("录音结束")stream.stop_stream()stream.close()p.terminate()wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')wf.setnchannels(CHANNELS)wf.setsampwidth(p.get_sample_size(FORMAT))wf.setframerate(RATE)wf.writeframes(b''.join(frames))wf.close()
(2)讯飞语音识别
主程序 导入讯飞语音识别库
import xunfeiasrxunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)
将主程序文字输入修改为讯飞语音识别文本输入。
修改:
Input = input("\n" +"我:")
为:
listening.listen()ShiBieNaRong=xunfeiasr.xunfeiasr(r"record.wav")
经测试星火有时会反馈其它action,所以在描述description中增加对动作指令的限制。
"description": "“机器人控制”可以控制机器人的左右手相应动作,你可以使用动作指令让机器人抬起、放下左右手。动作指令只包括抬起和放下。可以是只控制一只手,也可能同时控制双手",
使用语音输入如:把右手向上,也能实现机器人抬起右手。nice!
主程序代码:
import SparkApiimport listening#以下密钥信息从控制台获取appid = "*********" #填写控制台中获取的 APPID 信息api_secret = "***********" #填写控制台中获取的 APISecret 信息api_key ="**************" #填写控制台中获取的 APIKey 信息import xunfeiasrxunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)#用于配置大模型版本,默认“general/generalv2”#domain = "general" # v1.5版本# domain = "generalv2" # v2.0版本domain = "generalv3" # v3.0版本#云端环境的服务地址#Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"text =[]# length = 0def getText(role,content):jsoncon = {}jsoncon["role"] = rolejsoncon["content"] = contenttext.append(jsoncon)return textdef getlength(text):length = 0for content in text:temp = content["content"]leng = len(temp)length += lengreturn lengthdef checklen(text):while (getlength(text) > 8000):del text[0]return textif __name__ == '__main__':text.clearwhile(1):listening.listen()Input=xunfeiasr.xunfeiasr(r"record.wav")question = checklen(getText("user",Input))SparkApi.answer =""print("星火:",end = "")SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question)getText("assistant",SparkApi.answer)# print(str(text))
2.行空板屏幕表情
from unihiker import GUIu_gui=GUI()表情=u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=0.2)
3.测试视频:
4.机器人语音回答
(1)设置行空板蓝牙连接蓝牙音箱。
在行空板终端窗口下使用bluetoothctl配置蓝牙连接蓝牙音箱。

(2)讯飞语音合成
初始化
from df_xfyun_speech import XfTtsfrom unihiker import Audiou_audio = Audio()options = {}business_args = {"aue":"raw","vcn":"xiaoyan","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}options["business_args"] = business_argstts = XfTts(appid, api_key, api_secret, options)
主程序循环体while(1)中增加,当星火有回复时:
if SparkApi.answer!="":tts.synthesis(answer+"……", "speech.wav")u_audio.play("speech.wav")
当命中机器人控制时:
if SparkApi.hand!="":tts.synthesis("好的"+SparkApi.hand+SparkApi.action+"了", "speech.wav")u_audio.play("speech.wav")
主程序完整代码
import SparkApiimport listeningimport time#以下密钥信息从控制台获取appid = "*******************" #填写控制台中获取的 APPID 信息api_secret = "***************************" #填写控制台中获取的 APISecret 信息api_key ="*******************************" #填写控制台中获取的 APIKey 信息import xunfeiasrxunfeiasr.xunfeiasr_set(APPID=appid,APISecret=api_secret,APIKey=api_key)from df_xfyun_speech import XfTtsfrom unihiker import Audiou_audio = Audio()options = {}business_args = {"aue":"raw","vcn":"xiaoyan","tte":"utf8","speed":50,"volume":50,"pitch":50,"bgs":0}options["business_args"] = business_argsglobal ttstts = XfTts(appid, api_key, api_secret, options)#用于配置大模型版本,默认“general/generalv2”#domain = "general" # v1.5版本# domain = "generalv2" # v2.0版本domain = "generalv3" # v3.0版本#云端环境的服务地址#Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat"text =[]from unihiker import GUIu_gui=GUI()表情=u_gui.draw_emoji(emoji="Peace",x=0,y=0,duration=0.2)# length = 0def getText(role,content):jsoncon = {}jsoncon["role"] = rolejsoncon["content"] = contenttext.append(jsoncon)return textdef getlength(text):length = 0for content in text:temp = content["content"]leng = len(temp)length += lengreturn lengthdef checklen(text):while (getlength(text) > 8000):del text[0]return textif __name__ == '__main__':text.clearwhile(1):listening.listen()表情.config(emoji="Wink")Input=xunfeiasr.xunfeiasr(r"record.wav")print(Input)question = checklen(getText("user",Input))SparkApi.answer =""SparkApi.hand=""print("星火:",end = "")SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question)if SparkApi.answer!="":tts.synthesis(SparkApi.answer+"……", "speech.wav")u_audio.play("speech.wav")time.sleep(3)if SparkApi.hand!="":tts.synthesis("好的"+SparkApi.hand+SparkApi.action+"了", "speech.wav")u_audio.play("speech.wav")getText("assistant",SparkApi.answer)表情.config(emoji="Peace")# print(str(text))
【注:System指令】
System指令可对模型在多轮对话中的角色信息进行快捷约束,避免模型定制,即要求模型按照约定的方式进行后续响应。相比prompt约束,可以解决prompt拼接不连贯、不支持多轮的问题。
往期推荐

