当前位置:   article > 正文

SpringBoot集成MQTT,WebSocket返回前端信息_springboot mqtt websocket

springboot mqtt websocket

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况而设计的发布/订阅型消息协议,为此,它需要一个消息中间件(即MQTT Broker)。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议轻量、简单、开放且易于实现,这些特点使它的适用范围非常广泛,包括各种受限的环境,例如机器与机器(M2M)通信和物联网(IoT)。它已广泛应用于通过卫星链路通信的传感器、偶尔拨号的医疗设备、智能家居以及一些小型化设备中。

直接上代码

pom文件

  <!-- WebSocket support -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
      <version>2.1.3.RELEASE</version>
  </dependency>

  <!-- MQTT support through Spring Integration -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
  </dependency>

application.yml配置文件

  # MQTT settings. The keys must be nested under com -> mqtt so that they
  # resolve to the "com.mqtt" prefix used by MqttConfiguration.
  com:
    mqtt:
      # Broker address; several addresses may be listed, separated by commas
      url: tcp://127.0.0.1:7788
      # Client id (must not be shared between clients)
      clientId: mqtt_test1234
      # Topic filter(s) to subscribe to; several may be listed, separated by commas
      topics: test/#
      # MQTT user name
      username: admin
      # MQTT password
      password: 123456
      # Timeout
      timeout: 30
      # Keep-alive interval (a time interval, not a number of connections)
      keepalive: 20

mqtt的java配置文件

  1. import org.springframework.boot.context.properties.ConfigurationProperties;
  2. import org.springframework.stereotype.Component;
  3. /**
  4. * @Author majinzhong
  5. * @Date 2023/2/7 15:08
  6. * @Version 1.0
  7. * 读取yml
  8. */
  9. @Component
  10. @ConfigurationProperties(prefix = "com.mqtt") //对应yml文件中的com下的mqtt文件配置
  11. public class MqttConfiguration {
  12. private String url;
  13. private String clientId;
  14. private String topics;
  15. private String username;
  16. private String password;
  17. private String timeout;
  18. private String keepalive;
  19. public String getUrl() {
  20. return url;
  21. }
  22. public void setUrl(String url) {
  23. this.url = url;
  24. }
  25. public String getUsername() {
  26. return username;
  27. }
  28. public void setUsername(String username) {
  29. this.username = username;
  30. }
  31. public String getPassword() {
  32. return password;
  33. }
  34. public void setPassword(String password) {
  35. this.password = password;
  36. }
  37. public String getClientId() {
  38. return clientId;
  39. }
  40. public void setClientId(String clientId) {
  41. this.clientId = clientId;
  42. }
  43. public String getTopics() {
  44. return topics;
  45. }
  46. public void setTopics(String topics) {
  47. this.topics = topics;
  48. }
  49. public String getTimeout() {
  50. return timeout;
  51. }
  52. public void setTimeout(String timeout) {
  53. this.timeout = timeout;
  54. }
  55. public String getKeepalive() {
  56. return keepalive;
  57. }
  58. public void setKeepalive(String keepalive) {
  59. this.keepalive = keepalive;
  60. }
  61. }
  1. import com.shangfei.service.SocketService;
  2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.integration.annotation.IntegrationComponentScan;
  7. import org.springframework.integration.annotation.ServiceActivator;
  8. import org.springframework.integration.channel.DirectChannel;
  9. import org.springframework.integration.core.MessageProducer;
  10. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  11. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  12. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  13. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  14. import org.springframework.integration.mqtt.support.MqttHeaders;
  15. import org.springframework.messaging.Message;
  16. import org.springframework.messaging.MessageChannel;
  17. import org.springframework.messaging.MessageHandler;
  18. import org.springframework.messaging.MessagingException;
  19. /**
  20. * @Author majinzhong
  21. * @Date 2023/2/7 15:09
  22. * @Version 1.0
  23. * MQTT消费端
  24. */
  25. @Configuration
  26. @IntegrationComponentScan
  27. public class MqttInboundConfiguration {
  28. @Autowired
  29. private MqttConfiguration mqttProperties;
  30. @Bean
  31. public MessageChannel mqttInputChannel() {
  32. return new DirectChannel();
  33. }
  34. @Bean
  35. public MqttPahoClientFactory mqttInClientFactory() {
  36. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  37. String[] array = mqttProperties.getUrl().split(",");
  38. MqttConnectOptions options = new MqttConnectOptions();
  39. options.setServerURIs(array);
  40. options.setUserName(mqttProperties.getUsername());
  41. options.setPassword(mqttProperties.getPassword().toCharArray());
  42. options.setKeepAliveInterval(2);
  43. //接受离线消息
  44. options.setCleanSession(false);
  45. factory.setConnectionOptions(options);
  46. return factory;
  47. }
  48. //配置client,监听的topic
  49. @Bean
  50. public MessageProducer inbound() {
  51. String[] inboundTopics = mqttProperties.getTopics().split(",");
  52. MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
  53. mqttProperties.getClientId()+"_inbound",mqttInClientFactory(), inboundTopics); //对inboundTopics主题进行监听
  54. adapter.setCompletionTimeout(5000);
  55. adapter.setQos(1);
  56. adapter.setConverter(new DefaultPahoMessageConverter());
  57. adapter.setOutputChannel(mqttInputChannel());
  58. return adapter;
  59. }
  60. //通过通道获取数据
  61. @Bean
  62. @ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
  63. public MessageHandler handler() {
  64. return new MessageHandler() {
  65. @Override
  66. public void handleMessage(Message<?> message) throws MessagingException {
  67. System.out.println("----------------------");
  68. //获取mqtt的topic
  69. String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
  70. //使用webSocket返回给前端
  71. SocketService socketService = new SocketService();
  72. socketService.onMessage(message.getPayload().toString(),null,topic);
  73. System.out.println("message:"+message.getPayload());
  74. System.out.println("PacketId:"+message.getHeaders().getId());
  75. System.out.println("Qos:"+message.getHeaders().get(MqttHeaders.QOS));
  76. System.out.println("topic:"+topic);
  77. }
  78. };
  79. }
  80. }
  1. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.integration.annotation.ServiceActivator;
  6. import org.springframework.integration.channel.DirectChannel;
  7. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  8. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  9. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  10. import org.springframework.messaging.MessageChannel;
  11. import org.springframework.messaging.MessageHandler;
  12. /**
  13. * @Author majinzhong
  14. * @Date 2023/2/7 15:09
  15. * @Version 1.0
  16. * MQTT生产端
  17. */
  18. @Configuration
  19. public class MqttOutboundConfiguration {
  20. @Autowired
  21. private MqttConfiguration mqttProperties;
  22. @Bean
  23. public MessageChannel mqttOutboundChannel() {
  24. return new DirectChannel();
  25. }
  26. @Bean
  27. public MqttPahoClientFactory mqttOutClientFactory() {
  28. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  29. String[] array = mqttProperties.getUrl().split(",");
  30. MqttConnectOptions options = new MqttConnectOptions();
  31. options.setServerURIs(array);
  32. options.setUserName(mqttProperties.getUsername());
  33. options.setPassword(mqttProperties.getPassword().toCharArray());
  34. // 接受离线消息
  35. options.setCleanSession(false); //告诉代理客户端是否要建立持久会话 false为建立持久会话
  36. factory.setConnectionOptions(options);
  37. return factory;
  38. }
  39. @Bean
  40. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  41. public MessageHandler mqttOutbound() {
  42. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
  43. mqttProperties.getClientId()+"outbound", mqttOutClientFactory());
  44. messageHandler.setAsync(true);
  45. return messageHandler;
  46. }
  47. }

websocket的java配置文件

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  4. /**
  5. * @authoer:majinzhong
  6. * @Date: 2022/11/7
  7. * @description:
  8. */
  9. @Configuration
  10. public class WebSocketConfig {
  11. @Bean
  12. public ServerEndpointExporter serverEndpointExporter() {
  13. return new ServerEndpointExporter();
  14. }
  15. }

MQTT的controller文件

  1. import com.shangfei.service.MqttGateway;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. /**
  6. * @Author majinzhong
  7. * @Date 2023/2/7 15:10
  8. * @Version 1.0
  9. */
  10. @RestController
  11. public class MqttPubController {
  12. @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
  13. @Autowired
  14. private MqttGateway mqttGateway;
  15. @RequestMapping("/hello")
  16. public String hello() {
  17. return "hello!";
  18. }
  19. @RequestMapping("/sendMqtt")
  20. public String sendMqtt(String sendData){
  21. System.out.println(sendData);
  22. System.out.println("进入sendMqtt-------"+sendData);
  23. mqttGateway.sendToMqtt("topic01",(String) sendData);
  24. return "Test is OK";
  25. }
  26. @RequestMapping("/sendMqttTopic")
  27. public String sendMqtt(String sendData,String topic){
  28. //System.out.println(sendData+" "+topic);
  29. //System.out.println("进入inbound发送:"+sendData);
  30. mqttGateway.sendToMqtt(topic,(String) sendData);
  31. return "Test is OK";
  32. }
  33. }

MQTT的service文件

  1. import org.springframework.integration.annotation.MessagingGateway;
  2. import org.springframework.integration.mqtt.support.MqttHeaders;
  3. import org.springframework.messaging.handler.annotation.Header;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * @Author majinzhong
  7. * @Date 2023/2/7 15:21
  8. * @Version 1.0
  9. */
  10. @Service
  11. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  12. public interface MqttGateway {
  13. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData);
  14. }

websocket的service文件

  1. import cn.hutool.json.JSON;
  2. import cn.hutool.json.JSONUtil;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.stereotype.Service;
  5. import org.springframework.web.bind.annotation.CrossOrigin;
  6. import javax.websocket.*;
  7. import javax.websocket.server.PathParam;
  8. import javax.websocket.server.ServerEndpoint;
  9. import java.util.HashMap;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.concurrent.CopyOnWriteArraySet;
  13. /**
  14. * @authoer:majinzhong
  15. * @Date: 2022/11/16
  16. * @description:
  17. */
  18. @Component
  19. @ServerEndpoint(value = "/socket/{nickname}")
  20. @CrossOrigin
  21. @Service
  22. public class SocketService {
  23. /**
  24. * 用来存放每个客户端对应的MyWebSocket对象。
  25. **/
  26. private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>();
  27. /**
  28. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  29. **/
  30. private Session session;
  31. /**
  32. * 用户名称
  33. **/
  34. private String nickname;
  35. /**
  36. * 用来记录sessionId和该session进行绑定
  37. **/
  38. private static Map<String,Session> map = new HashMap<String, Session>();
  39. /**
  40. * 连接建立成功调用的方法
  41. */
  42. @OnOpen
  43. public void onOpen(Session session,@PathParam("nickname") String nickname) {
  44. this.session = session;
  45. this.nickname=nickname;
  46. map.put(nickname, session);
  47. socketSet.add(this);
  48. System.out.println("有新连接加入:"+nickname+",当前在线人数为" + socketSet.size());
  49. }
  50. /**
  51. * 连接关闭调用的方法
  52. */
  53. @OnClose
  54. public void onClose() {
  55. socketSet.remove(this);
  56. List<String> nickname = this.session.getRequestParameterMap().get("nickname");
  57. for(String nick:nickname) {
  58. map.remove(nick);
  59. }
  60. System.out.println("有一连接关闭!当前在线人数为" + socketSet.size());
  61. }
  62. /**
  63. * 收到客户端消息后调用的方法
  64. */
  65. @OnMessage
  66. public void onMessage(String message, Session session,@PathParam("nickname") String nickname) {
  67. System.out.println("来自客户端的消息-->"+nickname+": " + message);
  68. //将mqtt发送过来的数据返回给前端
  69. try {
  70. JSON parse = JSONUtil.parse(message);
  71. Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values");
  72. String tag1 = valuesMap.get("tag1").toString();
  73. String replace = tag1.replace("\r\n", "");
  74. String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":");
  75. String tag2 = valuesMap.get("tag2").toString();
  76. String substring1=substring+"tag2:"+tag2;
  77. String result="{"+substring1+"}";
  78. //发送给前端,用作页面渲染
  79. Session fromSession = map.get(nickname);
  80. fromSession.getAsyncRemote().sendText(result);
  81. } catch (Exception e) {
  82. e.printStackTrace();
  83. }
  84. }
  85. /**
  86. * 发生错误时调用
  87. */
  88. @OnError
  89. public void onError(Session session, Throwable error) {
  90. System.out.println("发生错误");
  91. error.printStackTrace();
  92. }
  93. /**
  94. * 群发自定义消息
  95. */
  96. // public void broadcast(String message) {
  97. // for (SocketService item : socketSet) {
  98. // /**
  99. // * 同步异步说明参考:http://blog.csdn.net/who_is_xiaoming/article/details/53287691
  100. // *
  101. // * this.session.getBasicRemote().sendText(message);
  102. // **/
  103. // item.session.getAsyncRemote().sendText(message);
  104. // }
  105. // }
  106. }

总结:

1.WebSocket和MQTT之间通过MQTT的topic来区分消息由谁消费,所以和前端联调时,前端建立WebSocket连接所用的nickname必须与MQTT的topic一致;消息内容的整理在WebSocket(SocketService的onMessage方法)里面完成

2.在调用MqttPubController里的方法时,需要正确的topic入参,不然在MQTTX里面是接收不到数据的

补充:application.yml文件里面的url可以配置多个,使用逗号分隔;topics也可以配置多个,同样使用逗号分隔。其中test/#里的#是通配符,代表订阅test下的所有主题

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/200743
推荐阅读
相关标签
  

闽ICP备14008679号