当前位置:   article > 正文

Python代码调用扣子平台大模型,结合wxauto优秀开源项目实现微信自动回复好友消息_大模型api调用使用流式的请求,使用python怎么截取里面的回答内容

大模型api调用使用流式的请求,使用python怎么截取里面的回答内容

最近看到微信自动化回复,于是想调用大模型,自动回复好友消息。以下文章将对代码流程进行详细解释,文章末尾附源码

1.在扣子平台创建发布一个大模型智能问答助手,获取API-key等。在扣子平台有详细文档。

2.wxauto安装。pip install wxauto

项目地址是​​​​​​cluic/wxauto: Windows版本微信客户端(非网页版)自动化,可实现简单的发送、接收微信消息,简单微信机器人 (github.com)https://github.com/cluic/wxauto?tab=readme-ov-fileicon-default.png?t=N7T8https://github.com/cluic/wxauto?tab=readme-ov-file

作者有详细文档介绍。接收消息以及发送消息。

3.代码基本原理:利用wxauto接收到好友消息,以网络请求方式发送到扣子,调用大模型,获取回答,解析回答内容,利用wxauto发送至好友,以此实现自动回复流程。

4.代码如下,很简短。token , bot_id需要自己从扣子平台获取。不会的可以留言。

import requests

import json

import re

import uuid

from wxauto import WeChat

# 使用Coze API获取AI生成的回答

def coze_chat(query):

    print(f"发送到Coze的消息内容: {query}")  # 调试信息

    url = 'https://api.coze.cn/open_api/v2/chat'

    token = '扣子平台获取'

    bot_id = "7380746........"

    stream = False  # True: 流式返回  False: 非流式返回

    headers = {

        'Authorization': f'Bearer {token}',

        'Content-Type': 'application/json',

        'Accept': '*/*',

        'Host': 'api.coze.cn',

        'Connection': 'keep-alive'

    }

    data = {

        "conversation_id": "123",

        "bot_id": bot_id,

        "user": str(uuid.uuid4()),

        "query": query,

        "user_photo": None,

        "stream": stream,

    }

    response = requests.post(url, headers=headers, data=json.dumps(data))

    response = str(response.content, encoding='utf-8')

    if stream:

        answers = extract_answers(response)

    else:

        answers = extract_answers_non_stream(response)

    #print(f"从Coze接收到的回复: {answers}")  

    return answers

# 流式返回提取模型回答内容

def extract_answers(response):

    lines = response.split('\n')

    answers_parts = []

    content_re = re.compile(r'"type":"answer".*?"content":"(.*?)"')

    for line in lines:

        if line.startswith('data:'):

            match = content_re.search(line)

            if match:

                answers_parts.append(match.group(1))

    full_answer = ''.join(answers_parts)

    return full_answer

# 非流式返回提取模型回答内容

def extract_answers_non_stream(response):

    data = json.loads(response)

    answers_parts = []

    for message in data['messages']:

        if message['type'] == 'answer':

            answers_parts.append(message['content'])

    full_answer = ''.join(answers_parts)

    return full_answer


 

# 主函数,自动回复

def main(wx, msg_content):

    print(f"收到的微信消息: {msg_content}")  

    response_msg = coze_chat(msg_content)

   

    #发送给备注名是一的

    wx.SendMsg(msg=response_msg + "--此内容为AI生成", who="一")

    print(f"发送给一的消息: {response_msg}")  



 

if __name__ == '__main__':

    wx = WeChat()

    while True:

        msgs = wx.GetAllMessage()

        #print(f"收到的消息列表: {msgs}")  

        if msgs:

            last_msg = msgs[-1]

            # print(f"最新一条消息: {last_msg}")  

            # print(f"消息备注名: {last_msg.sender_remark}")  # 打印备注名

            # print(f"消息类型: {last_msg.type}")  # 打印消息类型

           

            # 判断是否是好友消息,并且备注名是“一”

            if last_msg.type == "friend" and last_msg.sender_remark == "一":

                print("符合条件的消息,准备处理...")

                main(wx, last_msg.content)

           

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号