当前位置:   article > 正文

RabbitMQ实现即时通讯

RabbitMQ实现即时通讯

有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。最近发现RabbitMQ可以很方便的实现即时通讯功能,如果你没有特殊的业务需求,甚至可以不写后端代码,今天给大家讲讲如何使用RabbitMQ来实现即时通讯

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。

  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。

  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。

  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。

  • Payload(负载);可以理解为发送消息的内容。

  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0QoS 1QoS 2三个等级,下面分别介绍下:

    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;

    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;

    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。

rabbitmq-plugins enable rabbitmq_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。

MQTT客户端

我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTBox这个客户端工具。

  • 首先下载并安装好MQTTBox,下载地址:http://workswithweb.com/mqttbox.html

  • 点击Create MQTT Client按钮来创建一个MQTT客户端;

  • 接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;

  • 再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;

  • 发布者向主题中发布消息,订阅者可以实时接收到。

前端直接实现即时通讯

既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;

rabbitmq-plugins enable rabbitmq_web_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;

  • WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js

  • 实现的功能非常简单,一个单聊功能,需要注意的是配置好MQTT服务的访问地址为:ws://localhost:15675/ws

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4.     <meta charset="UTF-8">
  5.     <title>Title</title>
  6. </head>
  7. <body>
  8. <div>
  9.     <label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
  10.     <label>发送消息:<input id="messageInput" type="text"></label><br>
  11.     <button onclick="sendMessage()">发送</button>
  12.     <button onclick="clearMessage()">清空</button>
  13.     <div id="messageDiv"></div>
  14. </div>
  15. </body>
  16. <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
  17. <script>
  18.     //RabbitMQ的web-mqtt连接地址
  19.     const url = 'ws://localhost:15675/ws';
  20.     //获取订阅的topic
  21.     const topic = getQueryString("topic");
  22.     //连接到消息队列
  23.     let client = mqtt.connect(url);
  24.     client.on('connect'function () {
  25.         //连接成功后订阅topic
  26.         client.subscribe(topic, function (err) {
  27.             if (!err) {
  28.                 showMessage("订阅topic:" + topic + "成功!");
  29.             }
  30.         });
  31.     });
  32.     //获取订阅topic中的消息
  33.     client.on('message'function (topic, message) {
  34.         showMessage("收到消息:" + message.toString());
  35.     });
  36.     //发送消息
  37.     function sendMessage() {
  38.         let targetTopic = document.getElementById("targetTopicInput").value;
  39.         let message = document.getElementById("messageInput").value;
  40.         //向目标topic中发送消息
  41.         client.publish(targetTopic, message);
  42.         showMessage("发送消息给" + targetTopic + "的消息:" + message);
  43.     }
  44.     //从URL中获取参数
  45.     function getQueryString(name) {
  46.         let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)""i");
  47.         let r = window.location.search.substr(1).match(reg);
  48.         if (r != null) {
  49.             return decodeURIComponent(r[2]);
  50.         }
  51.         return null;
  52.     }
  53.     //在消息列表中展示消息
  54.     function showMessage(message) {
  55.         let messageDiv = document.getElementById("messageDiv");
  56.         let messageEle = document.createElement("div");
  57.         messageEle.innerText = message;
  58.         messageDiv.appendChild(messageEle);
  59.     }
  60.     //清空消息列表
  61.     function clearMessage() {
  62.         let messageDiv = document.getElementById("messageDiv");
  63.         messageDiv.innerHTML = "";
  64.     }
  65. </script>
  66. </html>
  • 接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目录下了,需要先启动应用再访问):

    • 第一个订阅主题testTopicA,访问地址:http://localhost:8088/page/index?topic=testTopicA

    • 第二个订阅主题testTopicB,访问地址:http://localhost:8088/page/index?topic=testTopicB

  • 之后互相发送消息,让我们来看看效果吧!

在SpringBoot中使用

没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。

  • 首先我们需要在pom.xml中添加MQTT相关依赖;

  1. <!--Spring集成MQTT-->
  2. <dependency>
  3.     <groupId>org.springframework.integration</groupId>
  4.     <artifactId>spring-integration-mqtt</artifactId>
  5. </dependency>
  • application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;

  1. rabbitmq:
  2.   mqtt:
  3.     url: tcp://localhost:1883
  4.     username: guest
  5.     password: guest
  6.     defaultTopic: testTopic
  • 编写一个Java配置类从配置文件中读取配置便于使用;

  1. /**
  2.  * MQTT相关配置
  3.  * Created by macro on 2020/9/15.
  4.  */
  5. @Data
  6. @EqualsAndHashCode(callSuper = false)
  7. @Component
  8. @ConfigurationProperties(prefix = "rabbitmq.mqtt")
  9. public class MqttConfig {
  10.     /**
  11.      * RabbitMQ连接用户名
  12.      */
  13.     private String username;
  14.     /**
  15.      * RabbitMQ连接密码
  16.      */
  17.     private String password;
  18.     /**
  19.      * RabbitMQ的MQTT默认topic
  20.      */
  21.     private String defaultTopic;
  22.     /**
  23.      * RabbitMQ的MQTT连接地址
  24.      */
  25.     private String url;
  26. }
  • 添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;

  1. /**
  2.  * MQTT消息订阅者相关配置
  3.  * Created by macro on 2020/9/15.
  4.  */
  5. @Slf4j
  6. @Configuration
  7. public class MqttInboundConfig {
  8.     @Autowired
  9.     private MqttConfig mqttConfig;
  10.     @Bean
  11.     public MessageChannel mqttInputChannel() {
  12.         return new DirectChannel();
  13.     }
  14.     @Bean
  15.     public MessageProducer inbound() {
  16.         MqttPahoMessageDrivenChannelAdapter adapter =
  17.                 new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
  18.                         mqttConfig.getDefaultTopic());
  19.         adapter.setCompletionTimeout(5000);
  20.         adapter.setConverter(new DefaultPahoMessageConverter());
  21.         //设置消息质量:0->至多一次;1->至少一次;2->只有一次
  22.         adapter.setQos(1);
  23.         adapter.setOutputChannel(mqttInputChannel());
  24.         return adapter;
  25.     }
  26.     @Bean
  27.     @ServiceActivator(inputChannel = "mqttInputChannel")
  28.     public MessageHandler handler() {
  29.         return new MessageHandler() {
  30.             @Override
  31.             public void handleMessage(Message<?> message) throws MessagingException {
  32.                 //处理订阅消息
  33.                 log.info("handleMessage : {}",message.getPayload());
  34.             }
  35.         };
  36.     }
  37. }
  • 添加MQTT消息发布者相关配置;

  1. /**
  2.  * MQTT消息发布者相关配置
  3.  * Created by macro on 2020/9/15.
  4.  */
  5. @Configuration
  6. public class MqttOutboundConfig {
  7.     @Autowired
  8.     private MqttConfig mqttConfig;
  9.     @Bean
  10.     public MqttPahoClientFactory mqttClientFactory() {
  11.         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  12.         MqttConnectOptions options = new MqttConnectOptions();
  13.         options.setServerURIs(new String[] { mqttConfig.getUrl()});
  14.         options.setUserName(mqttConfig.getUsername());
  15.         options.setPassword(mqttConfig.getPassword().toCharArray());
  16.         factory.setConnectionOptions(options);
  17.         return factory;
  18.     }
  19.     @Bean
  20.     @ServiceActivator(inputChannel = "mqttOutboundChannel")
  21.     public MessageHandler mqttOutbound() {
  22.         MqttPahoMessageHandler messageHandler =
  23.                 new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
  24.         messageHandler.setAsync(true);
  25.         messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
  26.         return messageHandler;
  27.     }
  28.     @Bean
  29.     public MessageChannel mqttOutboundChannel() {
  30.         return new DirectChannel();
  31.     }
  32. }
  • 添加MQTT网关,用于向主题中发送消息;

  1. /**
  2.  * MQTT网关,通过接口将数据传递到集成流
  3.  * Created by macro on 2020/9/15.
  4.  */
  5. @Component
  6. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  7. public interface MqttGateway {
  8.     /**
  9.      * 发送消息到默认topic
  10.      */
  11.     void sendToMqtt(String payload);
  12.     /**
  13.      * 发送消息到指定topic
  14.      */
  15.     void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
  16.     /**
  17.      * 发送消息到指定topic并设置QOS
  18.      */
  19.     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
  20. }
  • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;

  1. /**
  2.  * MQTT测试接口
  3.  * Created by macro on 2020/9/15.
  4.  */
  5. @Api(tags = "MqttController", description = "MQTT测试接口")
  6. @RestController
  7. @RequestMapping("/mqtt")
  8. public class MqttController {
  9.     @Autowired
  10.     private MqttGateway mqttGateway;
  11.     @PostMapping("/sendToDefaultTopic")
  12.     @ApiOperation("向默认主题发送消息")
  13.     public CommonResult sendToDefaultTopic(String payload) {
  14.         mqttGateway.sendToMqtt(payload);
  15.         return CommonResult.success(null);
  16.     }
  17.     @PostMapping("/sendToTopic")
  18.     @ApiOperation("向指定主题发送消息")
  19.     public CommonResult sendToTopic(String payload, String topic) {
  20.         mqttGateway.sendToMqtt(payload, topic);
  21.         return CommonResult.success(null);
  22.     }
  23. }
  • 调用接口向主题中发送消息进行测试;

  • 后台成功接收到消息并进行打印。

  1. 2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
  2. 2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
  3. 2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息

总结

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!

项目源码地址

https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/484227?site
推荐阅读
相关标签
  

闽ICP备14008679号