当前位置:   article > 正文

微信小程序--P2P消息收发模式(MQTT)_微信p2p

微信p2p

目录

前言

js demo

参数

new Paho.Client 创建对象

onConnectionLost  连接丢失回调

onMessageArrived  监听数据

disconnect() :关闭链接

connect (connectOptions)将此消息客户端连接到其服务器。

mqtt 频繁断开和重连问题

小程序实践

单例模式 mqtt 封装

initMqtt文件

页面创建链接


前言

P2P,顾名思义,是一对一的消息收发模式,即只有一个消息发送者和一个消息接收者。而Pub/Sub模式通常用于一对多或多对多的消息群发场景,即拥有一个或多个消息发送者和多个消息接收者的场景。

在P2P模式中,发送者发送消息时已经明确该消息预期的接收者信息,并明确该消息只需要被特定的单个客户端消费。发送者发送消息时通过Topic信息直接指定接收者,接收者无需提前订阅即可获取该消息。

P2P模式不仅可以为接收者节省注册订阅关系的成本,此外,由于收发消息的链路有单独的优化,还可以降低推送延迟。

参考文档 :P2P消息收发模式(MQTT

https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.2.13.229f42caHWXK5fhttps://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.2.13.229f42caHWXK5f参考文档 :P2P消息收发模式 .Client.

https://www.eclipse.org/paho/files/jsdoc/Paho.MQTT.Client.htmlhttps://www.eclipse.org/paho/files/jsdoc/Paho.MQTT.Client.html参考文档  paho-mqtt基础库

https://github.com/AwakenCN/InChat/blob/paho-mqtt/wechat-client/utils/paho-mqtt.jshttps://github.com/AwakenCN/InChat/blob/paho-mqtt/wechat-client/utils/paho-mqtt.js

 参考文档  阿里云安全库

https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.9-1/crypto-js.jshttps://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.9-1/crypto-js.js

js demo

  1. <?xml version="1.0" encoding="UTF-8" standalone="no"?>
  2. <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
  3. "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
  4. <html xmlns="http://www.w3.org/1999/xhtml">
  5. <head>
  6. <title>Aliyun Mqtt Websockets</title>
  7. <meta name="viewport" content="width=device-width, initial-scale=1.0">
  8. <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
  9. <script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.9-1/crypto-js.js" type="text/javascript"></script>
  10. <script type="text/javascript">
  11. instanceId = 'XXXX';//实例 ID,购买后从控制台获取
  12. host = 'XXXX.mqtt.aliyuncs.com';// 设置当前用户的接入点域名,接入点获取方法请参考接入准备章节文档,先在控制台创建实例
  13. port = 80;//WebSocket 协议服务端口,如果是走 HTTPS,设置443端口
  14. topic = 'XXXX';//需要操作的 Topic,第一级父级 topic 需要在控制台申请
  15. useTLS = false;//是否走加密 HTTPS,如果走 HTTPS,设置为 true
  16. accessKey = 'XXXXX';//账号的 AccessKey,在阿里云控制台查看
  17. secretKey = 'XXXXX';//账号的的 SecretKey,在阿里云控制台查看
  18. cleansession = true;
  19. groupId = 'GID_XXXX';//MQTT GroupID,创建实例后从 MQTT 控制台创建
  20. clientId = groupId + '@@@00001';//GroupId@@@DeviceId,由控制台创建的 Group ID 和自己指定的 Device ID 组合构成
  21. var mqtt;
  22. var reconnectTimeout = 2000;
  23. var username = 'Signature|' + accessKey + '|' + instanceId;//username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7
  24. var password = CryptoJS.HmacSHA1(clientId, secretKey).toString(CryptoJS.enc.Base64);
  25. function MQTTconnect() {
  26. mqtt = new Paho.MQTT.Client(
  27. host,//MQTT 域名
  28. port,//WebSocket 端口,如果使用 HTTPS 加密则配置为443,否则配置80
  29. clientId//客户端 ClientId
  30. );
  31. var options = {
  32. timeout: 3,
  33. onSuccess: onConnect,
  34. mqttVersion: 4,
  35. cleanSession: cleansession,
  36. onFailure: function (message) {
  37. setTimeout(MQTTconnect, reconnectTimeout);
  38. }
  39. };
  40. mqtt.onConnectionLost = onConnectionLost;
  41. mqtt.onMessageArrived = onMessageArrived;
  42. if (username != null) {
  43. options.userName = username;
  44. options.password = password;
  45. options.useSSL = useTLS;//如果使用 HTTPS 加密则配置为 true
  46. }
  47. mqtt.connect(options);
  48. }
  49. function onConnect() {
  50. // Connection succeeded; subscribe to our topic
  51. //发送 P2P 消息,topic 设置方式参考https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.6.586.694f7cb4oookL7
  52. message = new Paho.MQTT.Message("Hello mqtt P2P Msg!!");//set body
  53. message.destinationName = topic + "/p2p/" + clientId;// set topic
  54. mqtt.send(message);
  55. }
  56. function onConnectionLost(response) {
  57. setTimeout(MQTTconnect, reconnectTimeout);
  58. };
  59. function onMessageArrived(message) {
  60. var topic = message.destinationName;
  61. var payload = message.payloadString;
  62. console.log("recv msg : " + topic + " " + payload);
  63. };
  64. MQTTconnect();
  65. </script>
  66. </head>
  67. </html>

参数

new Paho.Client 创建对象

JavaScript应用程序使用Paho.MQTT.Client对象与服务器通信。

大多数应用程序只创建一个Client对象,然后调用其connect()方法,但是如果需要,应用程序可以创建多个Client对象。在这种情况下,每个客户端对象的主机、端口和clientId属性的组合必须不同。

发送、订阅和取消订阅方法被实现为异步JavaScript方法(即使底层协议交换本质上可能是同步的)。这意味着它们通过调用应用程序(通过应用程序提供的有关方法的成功或失败回调函数)来发出完成的信号。这种回调在每个方法调用中最多调用一次,并且不会持续到调用脚本的生命周期之外。

相比之下,Paho.MQTT.Client对象上定义了一些回调函数,尤其是onMessageArrived。这些方法可能会被多次调用,并且与客户端进行的特定方法调用没有直接关系。

  • host字符串消息服务器的地址,作为完全限定的WebSocket URI,作为DNS名称或虚线十进制IP地址。
  • port number要连接到的端口号-仅当主机不是URI时才需要
  • path字符串要连接到的主机上的路径-仅当主机不是URI时使用。默认值:'/mqt'。
  • clientId字符串消息传递客户端标识符,长度在1到23个字符之间。

onConnectionLost  连接丢失回调

当连接丢失时调用。connect()方法成功后。建立连接丢失时使用的回调。连接可能会丢失,因为客户端启动了断开连接,或者因为服务器或网络导致客户端断开连接。例如,如果客户端无法连接,则可以在不调用connectionComplete回调的情况下调用disconnect回调。将单个响应对象参数传递给onConnectionLost回调,该回调包含以下字段:

  •  errorCode 错误代码
  • errorMessage 错误消息

onMessageArrived  监听数据

当消息到达此Paho.MQTT.客户端时调用。传递给onMessageArrived回调的参数为

  • destinationName强制消息要发送到的目的地的名称(对于即将发送的消息)或接收消息的目的地名称。(用于onMessage功能接收的消息)。
  • payloadString :只读如果有效负载包含有效的UTF-8字符,则将有效负载作为字符串
  • payloadBytes :只读作为ArrayBuffer的有效负载
  • qs:用于传递消息的服务质量。0最大努力(默认)。1至少一次。2正好一次。
  • retained:如果为true,则服务器将保留该消息,并将其传递给当前订阅和未来订阅。如果为false,则服务器仅将消息传递给当前订户,这是新消息的默认值。如果消息发布时保留的布尔值设置为true,并且在消息发布后进行了订阅,则收到的消息将保留的布尔设置为true。
  • duplicate :只读如果为true,则此消息可能与已接收的消息重复。这仅在从服务器接收的消息上设置。

disconnect() :关闭链接

isConnected:(检查链接状态)

获取或设置当前的服务器连接是否成功,定时获取本属性可用于实时更新连接状态信息。

connect (connectOptions)将此消息客户端连接到其服务器。

connectOptions用于连接的属性

  1. timeout : 如果在该秒数内连接未成功,则视为连接失败。默认值为30秒
  2. userName 此连接的身份验证用户名
  3. password:此连接的身份验证密码。
  4. onSuccess:当从服务器接收到连接确认时调用。单个响应对象参数传递给onSuccess回调,该回调包含以下字段: invocationContext传递给connectOptions中的onSuccess方法。
  5. cleanSession:清除会话  cleanif true(默认值)成功连接时删除客户端和服务器的持久状态。
  6. mqttVersion 用于连接到MQTT代理的MQTT版本。 3or4
  7. useSSL 如果存在且为真,请使用SSL Websocket连接。{如果使用 HTTPS 加密则配置为 true}
  8. onFailure:当连接请求失败或超时时调用。单个响应对象参数传递给onFailure回调,该回调包含以下字段: 
  • 传入connectOptions中onFailure方法的invocationContext。
  • errorCode表示错误性质的数字。
  • error描述错误的消息文本。

mqtt 频繁断开和重连问题

mqtt.js底层没有对你调用它方法传入的callback函数做异常捕获(你传入callback函数里面不做异常捕获,会导致mqtt底层代码逻辑异常,导致频繁断连&重连问题发生),所以你所有的callback函数都需要增加try..catch..方法捕获异常(比如发布,订阅,监听等方法调用的时候的第二个callback函数)

小程序实践

  1. const Paho = require('../../utils/mqtt');
  2. const CryptoJS = require('../../utils/crypto-js.js');
  3. let instanceId = 'XXXX';//实例 ID,购买后从控制台获取
  4. let host = 'XXXX.mqtt.aliyuncs.com';// 设置当前用户的接入点域名,接入点获取方法请参考接入准备章节文档,先在控制台创建实例
  5. let port = 80;//WebSocket 协议服务端口,如果是走 HTTPS,设置443端口
  6. let topic = 'XXXX';//需要操作的 Topic,第一级父级 topic 需要在控制台申请
  7. let useTLS = false;//是否走加密 HTTPS,如果走 HTTPS,设置为 true
  8. let accessKey = 'XXXXX';//账号的 AccessKey,在阿里云控制台查看
  9. let secretKey = 'XXXXX';//账号的的 SecretKey,在阿里云控制台查看
  10. let cleansession = true;
  11. let groupId = 'GID_XXXX';//MQTT GroupID,创建实例后从 MQTT 控制台创建
  12. let clientId = groupId + '@@@00001';//GroupId@@@DeviceId,由控制台创建的 Group ID 和自己指定的 Device ID 组合构成
  13. var mqtt;
  14. var reconnectTimeout = 2000;
  15. var username = 'Signature|' + accessKey + '|' + instanceId;//username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7
  16. var password = CryptoJS.HmacSHA1(clientId, secretKey).toString(CryptoJS.enc.Base64);
  17. onLoad(options) {
  18. this.MQTTconnect()
  19. },
  20. // MQTT连接
  21. MQTTconnect() {
  22. try {
  23. let that = this
  24. //创建mqtt对象
  25. mqtt = new Paho.Client(
  26. host,//MQTT 域名
  27. port,//WebSocket 端口,如果使用 HTTPS 加密则配置为443,否则配置80
  28. clientId//客户端 ClientId
  29. );
  30. // 准备链接属性
  31. var options = {
  32. timeout: 3, //超时时间
  33. onSuccess: that.onConnect, //当从服务器接收到连接确认时调用
  34. mqttVersion: 4,//用于连接到MQTT代理的MQTT版本。
  35. cleanSession: cleansession,//清除会话  cleanif true(默认值)成功连接时删除客户端和服务器的持久状态。
  36. onFailure: function (message) {//当连接请求失败或超时时调用。
  37. console.log(message, "连接请求失败或超时。");
  38. setTimeout(that.MQTTconnect, reconnectTimeout);
  39. }
  40. };
  41. mqtt.onConnectionLost = that.onConnectionLost; //连接丢失回调
  42. mqtt.onMessageArrived = that.onMessageArrived; //监听数据回调
  43. if (username != null) {
  44. options.userName = username;//此连接的身份验证密码。
  45. options.password = password;//此连接的身份验证用户名s
  46. options.useSSL = useTLS;//如果使用 HTTPS 加密则配置为 true
  47. }
  48. //链接
  49. mqtt.connect(options);
  50. } catch (error) {
  51. }
  52. },
  53. //当从服务器接收到连接确认时调用
  54. onConnect() {
  55. try {
  56. // Connection succeeded; subscribe to our topic
  57. //发送 P2P 消息,topic 设置方式参考https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.6.586.694f7cb4oookL7
  58. message = new Paho.Message("Hello mqtt P2P Msg!!");//set body
  59. message.destinationName = topic + "/p2p/" + clientId;// set topic
  60. mqtt.send(message);
  61. } catch (error) {
  62. }
  63. },
  64. //连接丢失回调
  65. onConnectionLost(response) {
  66. try {
  67. let that = this
  68. console.log("连接丢失了:" + response.errorCode + response.errorMessage)
  69. setTimeout(that.MQTTconnect, reconnectTimeout);
  70. } catch (error) {
  71. }
  72. },
  73. //监听数据
  74. onMessageArrived(message) {
  75. try {
  76. var topic = message.destinationName;
  77. var payload = message.payloadString;
  78. console.log("recv msg : " + topic + " " + payload);
  79. } catch (error) {
  80. }
  81. },

单例模式 mqtt 封装

保证只有一个链接对象,便于复用,页使用面重写 onConnectionLost & onMessageArrived 

  • getInstance 创建链接
  • onConnect  服务器确认链接回调
  • disconnect 关闭链接
  • onConnectionLost 连接丢失回调
  • onMessageArrived 监听数据
  • onFailure 创建链接失败回调 页面传入callback
  • info 链接信息

initMqtt文件

  1. const single = (() => {
  2. let mqtt, instanceId, host, port, accessKey, secretKey, clientId, username, password, topic, message, infodata;
  3. let i = 0;
  4. const cleanSession = true;
  5. const useTLS = true;
  6. // 创建mqtt链接对象
  7. const createMqttObject = () => {
  8. mqtt = new Paho.Client(host, port, clientId);
  9. };
  10. // 建立mqtt链接
  11. const linkMqttObjectWithProperties = (options) => {
  12. // 链接对象存在&&已连接不在执行
  13. if (mqtt && mqtt.isConnected()) {
  14. console.log('已经连接---error');
  15. return;
  16. }
  17. try {
  18. mqtt.onConnectionLost = onConnectionLost;
  19. mqtt.onMessageArrived = onMessageArrived;
  20. if (username != null) {
  21. options.userName = username;
  22. options.password = password;
  23. options.useSSL = useTLS;
  24. }
  25. mqtt.connect(options);
  26. console.log('建立链接啦-----OK');
  27. } catch (error) {
  28. console.log(error);
  29. }
  30. };
  31. // 创建单例对象,保证只要一个mqtt对象
  32. function getInstance(info) {
  33. // console.log(`getInstance执行创建mqtt,mqtt: ${mqtt}, i: ${i}`);
  34. if (!mqtt) {
  35. //如果没有创建对象
  36. infodata = info;
  37. instanceId = info.instanceId;
  38. host = info.host;
  39. port = 443;
  40. accessKey = info.accessKey;
  41. secretKey = info.secretKey;
  42. clientId = `${info.client_id}${info.shopId}${info.xmid}`;
  43. username = `Signature|${accessKey}|${instanceId}`;
  44. password = CryptoJS.HmacSHA1(clientId, secretKey).toString(CryptoJS.enc.Base64);
  45. topic = info.topic;
  46. createMqttObject();
  47. }
  48. // 准备链接信息
  49. const options = {
  50. timeout: 3,
  51. onSuccess: onConnect,
  52. mqttVersion: 4,
  53. cleanSession: cleanSession,
  54. onFailure: (response) => {
  55. // console.log(response, '当连接请求失败或超时时调用----------');
  56. setTimeout(() => {
  57. if (response.errorCode != 8) {
  58. getInstance(infodata);
  59. }
  60. }, 2000);
  61. }
  62. };
  63. // 建立连接
  64. linkMqttObjectWithProperties(options);
  65. // console.log(mqtt, 'mqttttttttt');
  66. return mqtt;
  67. }
  68. function onConnect() {
  69. try {
  70. // message = new Paho.Message("Hello mqtt P2P Msg!!");
  71. // message.destinationName = `${topic}/p2p/${clientId}`;
  72. // mqtt.send(message);·
  73. console.log('链接成功');
  74. } catch (err) {
  75. console.log(err);
  76. }
  77. }
  78. // 如果是已连接在关闭连接 制空连接对象
  79. function Disconnect() {
  80. if (mqtt && mqtt.isConnected()) {
  81. console.log('通道关闭成功 js');
  82. mqtt.disconnect();
  83. mqtt = undefined;
  84. }
  85. }
  86. // 连接丢失的回调
  87. function onConnectionLost(response) {
  88. // console.log("连接丢失回调 js文件");
  89. // console.log(response);
  90. setTimeout(() => {
  91. if (response.errorCode != 0) {
  92. getInstance(infodata);
  93. }
  94. }, 2000);
  95. }
  96. // 监听数据回调
  97. function onMessageArrived(message) {
  98. // console.log("监听数据js 文件");
  99. PubSub.publish('mqttMessageReceived', message);
  100. }
  101. return {
  102. getInstance,
  103. Disconnect,
  104. };
  105. })();

页面创建链接

  1. onShow() {
  2. PubSub.unsubscribe('mqttMessageReceived',/*你要解绑的事件名*/);
  3. PubSub.subscribe('mqttMessageReceived', (msg, data) => {
  4. this.onMessageArrived(data)
  5. });
  6. },
  7. onMessageArrived(data){
  8. console.log(data)
  9. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/405189
推荐阅读
相关标签
  

闽ICP备14008679号