当前位置:   article > 正文

微信接入讯飞星火(Windows)_nextchat 接入讯飞星火

nextchat 接入讯飞星火

展示

微信与星火

准备

  1. 讯飞账号
  2. uiautomation模块

流程

        使用uiautomation获取聊天消息(文本),将聊天消息传给星火大模型,得到大模型的输出,最后将大模型的输出作为微信聊天的输入即可

部署

一、下载api

1.进入星火官网(注册)登陆账号,进入以下页面

完成个人认证后可拥有10000的免费token

进入Web文档

我习惯使用Python,点击Python调用示例,会自动下载一个压缩包

解压后得到两个文件

二、写自动化脚本

创建一个py文件,我给其命名为Ai_chat.py

首先定位到微信窗口

  1. import uiautomation as auto
  2. wx = auto.WindowControl(Name="微信", ClassName='WeChatMainWndForPC')
  3. wx.SwitchToThisWindow()

遍历每一个会话名,是否有新消息在其中,有新消息则点击

  1. for session in sessions_c.GetChildren():
  2. if '条新消息' in session.Name:
  3. current_contact = wx.GetLastChildControl().GetFirstChildControl().GetLastChildControl().TextControl().Name
  4. print('cur:' + current_contact)
  5. contact = session.GetChildren()[0].GetChildren()[1].GetChildren()[0].GetChildren()[0].Name
  6. print('cont' + contact)
  7. # 重复点击会被微信理解为双击动作,双击会弹出到一个新窗口
  8. # 只有当新消息会话和当前会话不同时才点击切换
  9. if current_contact != contact:
  10. session.Click()

获取当前会话里的最后一个文本

  1. messages = wx.ListControl(Name='消息')
  2. message = messages.GetChildren()[-1]

导入上面下载好的SparkApi,并将test.py中定义的函数及前面的密钥信息等,放入我们刚刚写的Ai_chat.py里

  1. # 以下密钥信息从控制台获取
  2. appid = ""
  3. api_secret = ""
  4. api_key = ""
  5. domain = "generalv3.5" # v3.0版本
  6. Spark_url = "wss://spark-api.xf-yun.com/v3.5/chat" # v3.5环境的地址

环境的填写官网有说的

将question传给大模型,大模型生成的回复会以str格式保存到SparkApi.answer中

  1. setting = '用幽默高情商风格回复字数不超15:'
  2. question = checklen(getText("user", setting + Input))
  3. SparkApi.answer = ""
  4. SparkApi.main(appid, api_key, api_secret, Spark_url, domain, question)

 最后把回复填到会话中,模拟按下Enter键

  1. # 切换会话时会自动进入编辑,所以可以直接用刚开始定义的wx句柄
  2. wx.SendKeys(SparkApi.answer, waitTime=2)
  3. wx.SendKeys('{Enter}', waitTime=1)

完整代码

  1. import uiautomation as auto
  2. import re
  3. import SparkApi
  4. # 以下密钥信息从控制台获取
  5. appid = "" # 填写控制台中获取的 APPID 信息
  6. api_secret = "" # 填写控制台中获取的 APISecret 信息
  7. api_key = "" # 填写控制台中获取的 APIKey 信息
  8. domain = "generalv3.5" # v3.0版本
  9. Spark_url = "wss://spark-api.xf-yun.com/v3.5/chat" # v3.5环境的地址
  10. text = []
  11. def getText(role, content):
  12. jsoncon = {}
  13. jsoncon["role"] = role
  14. jsoncon["content"] = content
  15. text.append(jsoncon)
  16. return text
  17. def getlength(text):
  18. length = 0
  19. for content in text:
  20. temp = content["content"]
  21. leng = len(temp)
  22. length += leng
  23. return length
  24. def checklen(text):
  25. while getlength(text) > 8000:
  26. del text[0]
  27. return text
  28. wx = auto.WindowControl(Name="微信", ClassName='WeChatMainWndForPC')
  29. wx.SwitchToThisWindow()
  30. # 消息类型过滤
  31. filter_type = ['[视频]', '[动画表情]', '[图片]', '[链接]', '发出红包,请在手机上查看']
  32. filter_meaningless = ['嗯', '嗯嗯', '好的', '好', '哦', '噢噢', 'ok', 'OK', '欧克']
  33. filter_time = '\d{4}年\d{1,2}月\d{1,2}日 \d{1,2}:\d{2}'
  34. sessions_c = wx.ListControl(Name='会话')
  35. # 当满足一系列条件才开启与大模型的对话
  36. open_answer = False
  37. while True:
  38. Input = ''
  39. for session in sessions_c.GetChildren():
  40. if '条新消息' in session.Name:
  41. current_contact = wx.GetLastChildControl().GetFirstChildControl().GetLastChildControl().TextControl().Name
  42. print('cur:' + current_contact)
  43. contact = session.GetChildren()[0].GetChildren()[1].GetChildren()[0].GetChildren()[0].Name
  44. print('cont' + contact)
  45. if current_contact != contact:
  46. session.Click()
  47. messages = wx.ListControl(Name='消息')
  48. message = messages.GetChildren()[-1]
  49. if message.Name not in filter_type:
  50. if not re.match(filter_time, message.Name):
  51. if message.Name not in filter_meaningless:
  52. open_answer = True
  53. Input = message.Name
  54. if open_answer:
  55. text.clear()
  56. setting = '用幽默高情商风格回复字数不超15:'
  57. question = checklen(getText("user", setting + Input))
  58. SparkApi.answer = ""
  59. SparkApi.main(appid, api_key, api_secret, Spark_url, domain, question)
  60. getText("assistant", SparkApi.answer)
  61. # print(str(text))
  62. # 切换会话时会自动进入编辑
  63. wx.SendKeys(SparkApi.answer, waitTime=2)
  64. wx.SendKeys('{Enter}', waitTime=1)
  65. open_answer = False

补充:

因为官网的Python调用示例有时会更新,我们写好的自动化脚本会因为SparkApi中的参数调整、变量调整等原因而执行失败,所以这里我将可以成功执行的SparkApi方到这里

  1. import _thread as thread
  2. import base64
  3. import datetime
  4. import hashlib
  5. import hmac
  6. import json
  7. from urllib.parse import urlparse
  8. import ssl
  9. from datetime import datetime
  10. from time import mktime
  11. from urllib.parse import urlencode
  12. from wsgiref.handlers import format_date_time
  13. import websocket # 使用websocket_client
  14. answer = ""
  15. class Ws_Param(object):
  16. # 初始化
  17. def __init__(self, APPID, APIKey, APISecret, Spark_url):
  18. self.APPID = APPID
  19. self.APIKey = APIKey
  20. self.APISecret = APISecret
  21. self.host = urlparse(Spark_url).netloc
  22. self.path = urlparse(Spark_url).path
  23. self.Spark_url = Spark_url
  24. # 生成url
  25. def create_url(self):
  26. # 生成RFC1123格式的时间戳
  27. now = datetime.now()
  28. date = format_date_time(mktime(now.timetuple()))
  29. # 拼接字符串
  30. signature_origin = "host: " + self.host + "\n"
  31. signature_origin += "date: " + date + "\n"
  32. signature_origin += "GET " + self.path + " HTTP/1.1"
  33. # 进行hmac-sha256进行加密
  34. signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
  35. digestmod=hashlib.sha256).digest()
  36. signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
  37. authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
  38. authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
  39. # 将请求的鉴权参数组合为字典
  40. v = {
  41. "authorization": authorization,
  42. "date": date,
  43. "host": self.host
  44. }
  45. # 拼接鉴权参数,生成url
  46. url = self.Spark_url + '?' + urlencode(v)
  47. # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
  48. return url
  49. # 收到websocket错误的处理
  50. def on_error(ws, error):
  51. print("### error:", error)
  52. # 收到websocket关闭的处理
  53. def on_close(ws, one, two):
  54. print(" ")
  55. # 收到websocket连接建立的处理
  56. def on_open(ws):
  57. thread.start_new_thread(run, (ws,))
  58. def run(ws, *args):
  59. data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question))
  60. ws.send(data)
  61. # 收到websocket消息的处理
  62. def on_message(ws, message):
  63. # print(message)
  64. data = json.loads(message)
  65. code = data['header']['code']
  66. if code != 0:
  67. print(f'请求错误: {code}, {data}')
  68. ws.close()
  69. else:
  70. choices = data["payload"]["choices"]
  71. status = choices["status"]
  72. content = choices["text"][0]["content"]
  73. print(content, end="")
  74. global answer
  75. answer += content
  76. # print(1)
  77. if status == 2:
  78. ws.close()
  79. def gen_params(appid, domain, question):
  80. """
  81. 通过appid和用户的提问来生成请参数
  82. """
  83. data = {
  84. "header": {
  85. "app_id": appid,
  86. "uid": "1234"
  87. },
  88. "parameter": {
  89. "chat": {
  90. "domain": domain,
  91. "temperature": 0.5,
  92. "max_tokens": 2048
  93. }
  94. },
  95. "payload": {
  96. "message": {
  97. "text": question
  98. }
  99. }
  100. }
  101. return data
  102. def main(appid, api_key, api_secret, Spark_url, domain, question):
  103. # print("星火:")
  104. wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
  105. websocket.enableTrace(False)
  106. wsUrl = wsParam.create_url()
  107. ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
  108. ws.appid = appid
  109. ws.question = question
  110. ws.domain = domain
  111. ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/454900
推荐阅读
相关标签
  

闽ICP备14008679号