当前位置:   article > 正文

星火大模型 function calling Python 调用思路_星火大模型的function call

星火大模型的function call

讯飞星火大模型虽说开发文档上写着支持 function calling,但是只给了参数构造示例,没有给Python代码,只给了api调用的代码。本以为把两者融合挺容易,没想到坑太多了,连借助gpt都不能很快写好。星火的还是太冷门了,在网上一个可行的例子,哪怕是其他语言的都找不到。
这里按照接口说明“缝合”了起来,但是星火对返回结果的拼接使用实在是找不到方案,所以这块的处理还有待完善。
目前的处理是把函数调用的情况添加到用户的发言中。但是模型经常不考虑调用结果,还是自顾自地回答,输出一堆细节。

[{'role': 'user', 'content': "2024.7.6北京什么天气\n请参考这个函数调用情况回答:函数调用 天气查询 ,输入参数是:{'date': '2024.7.6', 'location': '北京'},返回结果是: 高温,同时有冰雹"}]
  • 1

不过刚才的是Max版本,Pro版本可能会好点,看来模型越蠢,越容易被忽悠。

Max版本: 目前我只能提供昨天的历史天气数据。北京昨天的天气状况为多云转雾,温度范围在22℃~31℃,风向风力为南风微风。
具体天气情况如下:
天气:多云转雾。 - 最高温度:31℃。 - 最低温度:22℃。 - 日出时间:04:51。 - 日落时间:19:46。 - 风向风力:南风微风。 - 相对湿度:52%。 - 空气质量:良。
Pro版本:在2024年7月6日,北京的天气预计会非常炎热,同时可能会有冰雹出现。请注意防暑降温并关注气象预报,做好冰雹防范措施。

代码导入SparkApi的包,重写了 on_message和gen_params方法。


# coding: utf-8
import SparkApi
import time
import os
import json
from dotenv import load_dotenv, find_dotenv
load_dotenv()
appid=os.getenv("SPARKAI_APP_ID")
api_secret=os.getenv("SPARKAI_API_SECRET")
api_key=os.getenv("SPARKAI_API_KEY")

domain = "generalv3.5"      # Max版本
# domain = "generalv3"       # Pro版本
# domain = "general"         # Lite版本

Spark_url = "wss://spark-api.xf-yun.com/v3.5/chat"   # Max服务地址
# Spark_url = "wss://spark-api.xf-yun.com/v3.1/chat"  # Pro服务地址
# Spark_url = "wss://spark-api.xf-yun.com/v1.1/chat"  # Lite服务地址

#初始上下文内容,当前可传system、user、assistant 等角色
text =[
    # {"role": "system", "content": "你现在扮演李白,你豪情万丈,狂放不羁;接下来请用李白的口吻和用户对话。"} , # 设置对话背景或者模型角色
    # {"role": "user", "content": "你是谁"},  # 用户的历史问题
    # {"role": "assistant", "content": "....."} , # AI的历史回答结果
    # # ....... 省略的历史对话
    # {"role": "user", "content": "你会做什么"}  # 最新的一条问题,如无需上下文,可只传最新一条问题
]

def getText(role,content):
    jsoncon = {}
    jsoncon["role"] = role
    jsoncon["content"] = content
    text.append(jsoncon)
    return text

def getlength(text):
    length = 0
    for content in text:
        temp = content["content"]
        leng = len(temp)
        length += leng
    return length

def checklen(text):
    while (getlength(text) > 8000):
        del text[0]
    return text

# 定义示例函数
def add_numbers(a, b):
    return a + b

def get_weather(location, date=None):
    # return f"正在调用示例函数:查询{location}的天气信息,日期: {date}"
    return '高温,同时有冰雹'

# 提取 query 中的参数
def extract_parameters(query):
    function_call = query["payload"]["choices"]["text"][0]["function_call"]
    function_name = function_call.get("name")
    function_args = json.loads(function_call.get("arguments", "{}"))
    return function_name, function_args

#重写on_message以支持function_call
def new_on_message(ws, message):
    print(message)
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        # global sid
        # sid = data["header"]["sid"]
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        function_name, arguments = extract_parameters(data)
        # print('function_name, arguments为',function_name, arguments)

        if function_name:
            if function_name == "add_numbers":
                result = add_numbers(**arguments)
            elif function_name == "天气查询":
                result = get_weather(**arguments)
            else:
                result = "未知的函数调用"

            response_query = f"函数调用 {function_name} ,输入参数是:{arguments},返回结果是: {result}"
            #增加要求大模型查看函数调用情况的提示
            please_use='\n请参考这个函数调用情况回答:'
            text[-1]['content']+=please_use+response_query
            answer = checklen(text)
            SparkApi.answer =""
            print("response_query内容:",response_query)
            #已有函数调用结果时,按照原始方式生成
            SparkApi.on_message=old_on_message
            SparkApi.gen_params=old_gen_params
            SparkApi.main(appid,api_key,api_secret,Spark_url,domain,answer)
            #恢复到准备函数调用状态
            SparkApi.on_message=new_on_message
            SparkApi.gen_params=new_gen_params
        else:
            SparkApi.answer += content
            
        if status == 2:
            print("#### 关闭会话")
            ws.close()


#重写gen_params以支持function_call
def new_gen_params(appid, domain, question):
    print('appid, domain, question为',appid, domain, question)
    """
    通过appid和用户的提问来生成请求参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "random_threshold": 0.5,
                "max_tokens": 2048,
                # "top_k": 5, #另一版本有的
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                # "text": [
                #     {"role": "user", "content": question}  # 用户的提问 #写成列表就报错
                # ]
                "text": question
            },
            "functions": {
                "text": [
                    {
                        "name": "天气查询",
                        "description": "天气插件可以提供天气相关信息。你可以提供指定的地点信息、指定的时间点或者时间段信息,来精准检索到天气信息。",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "location": {
                                    "type": "string",
                                    "description": "地点,比如北京。"
                                },
                                "date": {
                                    "type": "string",
                                    "description": "日期。"
                                }
                            },
                            "required": [
                                "location"
                            ]
                        }
                    }
                ]
            }
        }
    }
    return data

#更新函数,并保留原始函数
old_on_message=SparkApi.on_message
SparkApi.on_message=new_on_message
old_gen_params=SparkApi.gen_params
SparkApi.gen_params=new_gen_params

if __name__ == '__main__':
    while(1):
        Input = input("\n" +"我:")
        question = checklen(getText("user",Input))
        SparkApi.answer =""
        print("星火:",end ="")
        SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question)
        # print('完整回答:',SparkApi.answer)
        getText("assistant",SparkApi.answer)

# Input='2024.7.6北京什么天气'
# [{'role': 'user', 'content': '你喜欢简洁,不喜欢多说话。随便给我一个数字'},
#  {'role': 'assistant', 'content': '1'},
#  {'role': 'user', 'content': '他加一是多少'},
#  {'role': 'assistant', 'content': '$1+1$ $=2$'}]

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189

参考:
星火认知大模型Web API文档 | 讯飞开放平台文档中心 https://www.xfyun.cn/doc/spark/Web.html
网页中可下载 Python 调用示例(SparkApi.py),Python 带上下文调用示例

附:
本文所需的SparkApi.py文件

import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
import time
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket  # 使用websocket_client
answer = ""
sid = ''

class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(Spark_url).netloc
        self.path = urlparse(Spark_url).path
        self.Spark_url = Spark_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数,生成url
        url = self.Spark_url + '?' + urlencode(v)
        # print(url)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws,one,two):
    print(" ")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    # print(time.time())
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        global sid
        sid = data["header"]["sid"]
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end ="")
        global answer
        answer += content
        # print(1)
        if status == 2:
            ws.close()


def gen_params(appid, domain,question):
    """
    通过appid和用户的提问来生成请参数
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {

            "chat": {
                "domain": domain,
                "temperature": 0.8,
                "max_tokens": 2048,
                "top_k": 5,

                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data


def main(appid, api_key, api_secret, Spark_url,domain, question):
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/898358
推荐阅读
相关标签
  

闽ICP备14008679号