赞
踩
上一章,我们搭建对接文心一言(ERNIE-Bot大模型)接口的微信聊天机器人,但只支持一轮对话。这一章我们加入数据持久化的逻辑,以便保持上下文,防止服务重启造成的聊天记录丢失。
数据持久化有很多方式,比较好的一种方式是使用数据库,方便扩展功能。eggjs可以使用Sequelize(ORM框架)对数据库进行操作(官网文档点这里)。我们使用目前比较流行的PostgreSQL数据库(以下简称pg)。
pg数据库安装文件从官网上下载,点击Download the installer
我选择版本13 x86-64(注:本人的Navicat版本比较低,安装版本16连接不上)
下载完开始安装
pgAdmin是数据库客户端,如果你机子上已经装了Navicat,也可以不用装pgAdmin。
数据文件所在路径
数据库访问密码,我们设置为pg-ernic
数据库服务访问端口5432
后面一路next就行
我们打开Navicat,配置一下数据库连接。
连接名 localhost_pgsql(可自定义)
主机 localhost
端口5432
初始数据库postgres
用户名postgres
密码pg-ernic
配置完,点击左下角【测试连接】,如果正确就会提示连接成功。点击确定。
双击侧边栏localhost_pgsql,可以逐个展开,我们可以看到下面已经有默认的数据库postgres,目前里面没有任何表。我们待会可以用eggjs的Sequelize(ORM框架)进行建表。
我们打开config/plugin.js,增加一下插件
plugin.js代码如下:
- 'use strict';
-
- /** @type Egg.EggPlugin */
- module.exports = {
- sequelize: {
- enable: true,
- package: 'egg-sequelize',
- },
- };
config.default.js中加入数据库连接配置,还有联系上下文的配置。这里稍微扩展一下,会话接口联系上下文会导致输入的数据变多,接口计费的规则。我们要对上下文的时间做个限制,比如超过1个小时的聊天记录不再计入传参数据。
- config.sequelize = { //Sequelize 数据库配置
- dialect: 'postgres',
- database: 'postgres',
- host: '127.0.0.1',
- port: '5432',
- username: 'postgres',
- password: 'pg-ernic',
- timezone: '+08:00', // 数据库时间加8小时
- define: {
- freezeTableName: true, //使用自己配置的表名,避免sequelize自动将表名转换为复数
- createdAt: false, //不默认加创建时间字段:
- updatedAt: false, //不默认加修改时间字段:
- }
- };
-
- //配置wechat
- config.wechat = {
- manageName: '蓝莲花', //管理员昵称 登录成功会发消息给管理员
- relaContextStatus: true, //需要联系上下文,会导致输入文字变多,接口费用增加
- relaContextHours: 1 //需要联系上下文 几个小时前的历史消息
- }
从上一章,我们知道要维护两种数据的持久化,一个是access_token,一个是聊天记录。按eggjs的规范,需要创建两个对象模型定义文件。定义好模型后,项目运行起来就可以自动建表。我们在app下创建model目录,新增access_token.js和chat_record.js两个文件。
代码如下:
- 'use strict';
-
- module.exports = app => {
- const Sequelize = app.Sequelize;
- const moment = require('moment');
-
- const AccessToken = app.model.define('access_token', {
- id: {
- type: Sequelize.INTEGER,
- primaryKey: true,
- autoIncrement: true
- },
- access_token: {
- type: Sequelize.STRING(200),
- comment: ''
- },
- created_at: {
- type: Sequelize.DATE,
- allowNull: true,
- defaultValue: Sequelize.NOW,
- get() {
- return moment(this.getDataValue('created_at')).format('YYYY-MM-DD HH:mm:ss');
- },
- },
- updated_at: {
- type: Sequelize.DATE,
- allowNull: true,
- defaultValue: Sequelize.NOW,
- get() {
- return moment(this.getDataValue('updated_at')).format('YYYY-MM-DD HH:mm:ss');
- },
- },
- }, {
- comment: 'AccessToken表'
- });
-
- return AccessToken;
- };
这里简单介绍一下access_token模型的字段。
access_token字段用于记录access_token数据。
updated_at字段用于记录access_token更新时间。
后续我们可以根据updated_at字段值来判断是否超过有效期。
代码如下:
- 'use strict';
-
- module.exports = app => {
- const Sequelize = app.Sequelize;
- const moment = require('moment');
-
- const ChatRecord = app.model.define('chat_record', {
- id: {
- type: Sequelize.INTEGER,
- primaryKey: true,
- autoIncrement: true
- },
- room_id: {
- type: Sequelize.STRING(100),
- comment: '群聊id'
- },
- talker_id: {
- type: Sequelize.STRING(100),
- comment: '对话者id'
- },
- talker_name: {
- type: Sequelize.STRING(100),
- comment: '对话者昵称'
- },
- send_content: {
- type: Sequelize.TEXT,
- comment: '发送文字'
- },
- receive_content: {
- type: Sequelize.TEXT,
- comment: '接收文字'
- },
- status: {
- type: Sequelize.BOOLEAN,
- comment: '是否可用',
- defaultValue: true
- },
- created_at: {
- type: Sequelize.DATE,
- allowNull: true,
- defaultValue: Sequelize.NOW,
- get() {
- return moment(this.getDataValue('created_at')).format('YYYY-MM-DD HH:mm:ss');
- },
- },
- updated_at: {
- type: Sequelize.DATE,
- allowNull: true,
- defaultValue: Sequelize.NOW,
- get() {
- return moment(this.getDataValue('updated_at')).format('YYYY-MM-DD HH:mm:ss');
- },
- },
- deleted_at: {
- type: Sequelize.DATE,
- allowNull: true,
- get() {
- return moment(this.getDataValue('deleted_at')).format('YYYY-MM-DD HH:mm:ss');
- },
- },
- }, {
- comment: '对话记录表'
- });
-
- return ChatRecord;
- };
这里简单介绍一下聊天记录模型的字段。微信聊天有两种场景:
1、私聊
我们需要记录用户的对话者id,以便区分聊天记录。
2、群聊
我们需要同时记录群聊id和对话者id,以便区分聊天记录。
我们需要有个方法app.model.sync来维护model对应的表
- module.exports = app => {
- app.beforeStart(async () => {
- // 应用会等待这个函数执行完成才启动
- console.log("==app beforeStart==");
- await app.model.sync({ //Sequelize 模型配置
- // force: true, // 默认false 为不覆盖 true会删除表再创建(如果有生产业务数据非常危险,不建议打开);
- alter: true // 默认true可以 添加或删除字段;
- });
- });
-
- app.ready(async () => {
- console.log("==app ready==");
- let ctx = app.createAnonymousContext();
- await ctx.service.ernie.checkAccessToken(); //检查AccessToken
- await ctx.service.wechat.startBot(); //初始化BOT
- })
-
- app.beforeClose(async () => {
- console.log("==app beforeClose==");
- })
- };
我们将access_token的维护交给一个定时任务,每天执行一次,当到了29天的时候,刷新一下数据。定时任务需要在app下新增目录schedule,新增task.js文件。
task.js代码如下:
- module.exports = {
- schedule: {
- interval: '1d', // 1 天间隔
- type: 'all', // 指定所有的 worker 都需要执行
- immediate: false, //是否项目启动就执行一次定时任务
- },
- async task(ctx) { //函数名task不能改
- await ctx.service.ernie.checkAccessToken(); //检查AccessToken
- },
- };
ernie.js需要一个方法checkAccessToken,用于检查access_token的有效性,全部代码如下:
- const {
- Service
- } = require('egg');
-
- const moment = require('moment');
-
- class ErnieService extends Service {
- async getAccessToken() {
- console.log('===================ErnieService getAccessToken=====================');
- let ctx = this.ctx;
-
- try {
- const res = await ctx.curl(
- `https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=${ctx.app.config.ernie.client_id}&client_secret=${ctx.app.config.ernie.client_secret}`, {
- method: 'GET',
- rejectUnauthorized: false,
- data: {},
- headers: {},
- timeout: 30000,
- contentType: 'json',
- dataType: 'json',
- })
- console.log(res)
-
- if (res.data.access_token) {
- console.log('access_token', ctx.app.config.ernie.access_token)
- return res.data.access_token;
- }
-
- } catch (error) {
- console.log(error)
- }
- return null;
- }
-
- async sendMsg(msg) {
- console.log('===================ErnieService sendMsg=====================');
- console.log(JSON.stringify(msg));
- let ctx = this.ctx;
- try {
- const res = await ctx.curl(
- `https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=${ctx.app.config.ernie.access_token}`, {
- method: 'POST',
- rejectUnauthorized: false,
- data: {
- "messages": msg
- },
- timeout: 30000,
- contentType: 'json',
- dataType: 'json',
- })
- console.log(res)
-
- if (res.data) {
- return res.data;
- }
- return null;
- } catch (error) {
- console.log(error)
- return null;
- }
- }
-
- async checkAccessToken() {
- console.log('===================ErnieService checkAccessToken=====================');
- let ctx = this.ctx;
- const query = {};
- const accessToken = await ctx.model.AccessToken.findOne(query);
- if (accessToken) {
- console.log('accessToken', accessToken.access_token, accessToken.updated_at);
-
- if (moment(new Date()).diff(moment(accessToken.updated_at), 'days') >= ctx.app.config.ernie.expire_day -
- 1) { //提前一天刷新
- console.log('accessToken已失效,重新获取中')
- const accessTokenStr = await ctx.service.ernie.getAccessToken();
- if (accessTokenStr) {
- await accessToken.update({
- accessToken: accessTokenStr,
- updated_at: moment().format('YYYY-MM-DD HH:mm:ss')
- });
- ctx.app.config.ernie.access_token = accessTokenStr;
- console.log('accessToken更新成功');
- } else {
- console.log('accessToken获取失败');
- }
- } else {
- ctx.app.config.ernie.access_token = accessToken.access_token;
- console.log('accessToken在有效期内');
- }
- } else {
- console.log('accessToken不存在');
- const accessTokenStr = await ctx.service.ernie.getAccessToken();
- if (accessTokenStr) {
- const queryData = await ctx.model.AccessToken.create({
- access_token: accessTokenStr
- });
- ctx.app.config.ernie.access_token = accessTokenStr;
- console.log('accessToken更新成功');
- } else {
- console.log('accessToken获取失败');
- }
- }
- }
- }
-
- module.exports = ErnieService;
wechat.js需要对聊天记录进行读取和存储,用于保持上下文,全部代码如下:
- const {
- Service
- } = require('egg');
-
-
- const {
- WechatyBuilder,
- ScanStatus
- } = require("wechaty");
- const qrcode = require("qrcode-terminal");
- const Sequelize = require('sequelize');
- const Op = Sequelize.Op;
-
- let ctx;
- let wechaty;
- let startStatus = false;
-
- const onMessage = async (message) => {
- console.log(`收到消息: ${message}`);
- const room = message.room() // 是否是群消息
- if (!room || await message.mentionSelf()) { //如果是私聊或者群聊@我
- if (message.type() === wechaty.Message.Type.Text) {
- const sendContent = await message.text();
- if (sendContent) {
- try {
- let room_id = '';
- if (room) { //如果是群聊
- room_id = room.id;
- console.log(`room_id: ${room_id}`);
- }
-
- const talker = message.talker();
- const talker_id = talker.id;
- const talker_name = talker.name();
- console.log(`talker_id: ${talker_id}`);
- console.log(`${talker_name}: ${sendContent}`);
-
- let msgRecord = [];
-
- if (ctx.app.config.wechat.relaContextStatus) { //需要联系上下文
- let query = {
- where: {
- created_at: {
- [Op.gte]: Sequelize.literal(
- `NOW() - (INTERVAL '${ctx.app.config.wechat.relaContextHours} HOUR')`
- ),
- },
- status: true
- },
- attributes: {
- exclude: ['deleted_at', 'status']
- }
- };
- if (room_id) {
- query.where.room_id = room_id;
- } else {
- query.where.talker_id = talker_id;
- }
- const queryData = await ctx.model.ChatRecord.findAll(query);
-
- for (let i of queryData) {
- msgRecord.push({
- "role": "user",
- "content": i.send_content
- });
- msgRecord.push({
- "role": "assistant",
- "content": i.receive_content
- });
- }
- }
-
- msgRecord.push({
- "role": "user",
- "content": sendContent
- });
- let res = await ctx.service.ernie.sendMsg(msgRecord);
- if (res) {
- if (res.error_code) {
- message.say(JSON.stringify(res));
- console.log(`报错: ${JSON.stringify(res)}`);
- } else {
- if (res.result) {
- message.say(res.result);
- console.log(`回复: ${res.result}`);
- const chatRecord = await ctx.model.ChatRecord.create({
- room_id,
- talker_id,
- talker_name,
- send_content: sendContent,
- receive_content: res.result
- });
- }
- }
- }
- } catch (error) {
- console.log(error);
- message.say(JSON.stringify(error));
- }
- }
- }
- }
- };
-
- const onLogout = (user) => {
- console.log(`用户 ${user} 退出成功`);
- };
- const onLogin = async (user) => {
- console.log(`用户 ${user} 登录成功`);
-
- const contact = await wechaty.Contact.find({
- name: ctx.app.config.wechat.manageName
- })
- if (contact) {
- await contact.say('微信机器人服务初始化完毕!')
- } else {
- console.error('未找到管理员微信')
- }
- };
- const onError = console.error;
- const onScan = (code, status) => {
- // status: 2代表链接等待调用,3代表链接已打开,这个链接实际上是提供一个登录的二维码供扫描
- if (status === ScanStatus.Waiting) {
- // status: 2代表等待,3代表扫码完成
- qrcode.generate(code, {
- small: true
- }, console.log)
- }
- };
-
- class WechatService extends Service {
- async startBot() {
- console.log('===================WechatService startBot=====================');
- ctx = this.ctx;
- if (startStatus && wechaty) {
- if (wechaty.isLoggedIn) {
- await wechaty.logout();
- }
- await wechaty.stop();
- startStatus = false;
- wechaty = null;
- }
- wechaty = await WechatyBuilder.build();
- wechaty
- .on("scan", onScan)
- .on("login", onLogin)
- .on("logout", onLogout)
- .on("error", onError)
- .on("message", onMessage);
- await wechaty.start();
- startStatus = true;
- }
- }
-
- module.exports = WechatService;
最后,我们要安装一下组件
- npm i -s egg-sequelize
- npm i -s pg
- npm i -s pg-hstore
然后我们启动项目,微信扫码登录后,管理员会立即收到机器人发的消息(前提是已经加过机器人为好友,并且昵称为config.wechat.manageName中设置的内容)。然后就可以连续对话了。
至此,本系列主体功能都已经结束了。看看后续如果还有什么内容,我会继续补充。
本章完整代码在这里下载。运行前请配置好config/config.default.js里面config.ernie下的client_id和client_secret配置项。 微信登录二维码如有显示问题请参看第二章-微信登录二维码问题处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。