当前位置:   article > 正文

Websocket Stomp+RabbitMQ实现消息推送_stomp websocket rabbitmq

stomp websocket rabbitmq

系列导读:

1.springboot+websocket构建在线聊天室(群聊+单聊)

2.Spring Boot WebSocket:单聊(实现思路)

3.Websocket Stomp+RabbitMQ实现消息推送

目录

1、技术栈:

2、依赖:

3、修改配置文件

4、RabbitConfig 

5、消息包装类

6、利用STOMP实现前后端长连接

7、编写前端页面:

8、编写消息生产者和消费者

9、测试


1、技术栈:

后端:springboot2.0.6

前端:html  js

2、依赖:

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.0.6.RELEASE</version>
  5. <relativePath/> <!-- lookup parent from repository -->
  6. </parent>
  7. <groupId>com.zj</groupId>
  8. <artifactId>online-test</artifactId>
  9. <version>0.0.1-SNAPSHOT</version>
  10. <name>online-test</name>
  11. <description>Demo project for Spring Boot</description>
  12. <properties>
  13. <java.version>1.8</java.version>
  14. </properties>
  15. <dependencies>
  16. <dependency>
  17. <groupId>org.springframework.boot</groupId>
  18. <artifactId>spring-boot-starter-web</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>mysql</groupId>
  22. <artifactId>mysql-connector-java</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-data-jpa</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>commons-lang</groupId>
  30. <artifactId>commons-lang</artifactId>
  31. <version>2.6</version>
  32. </dependency>
  33. <!--websocket 相关依赖-->
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-websocket</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.webjars</groupId>
  40. <artifactId>webjars-locator-core</artifactId>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.webjars</groupId>
  44. <artifactId>sockjs-client</artifactId>
  45. <version>1.0.2</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.webjars</groupId>
  49. <artifactId>stomp-websocket</artifactId>
  50. <version>2.3.3</version>
  51. </dependency>
  52. <!--rabbitmq-->
  53. <dependency>
  54. <groupId>org.springframework.boot</groupId>
  55. <artifactId>spring-boot-starter-amqp</artifactId>
  56. </dependency>

3、修改配置文件

#rabbitmq

spring.rabbitmq.host=192.168.XXX

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

 本文省略RabbitMq的安装,这边将你安装的rabbitmq的相关信息填入即可

参考安装地址: docker 安装 RabbitMq

几分钟就搞定安装

4、RabbitConfig 

先创一个rabbitmq 的配置类,由于我们这边业务逻辑比较简单,就简单使用rabbitmq一下

这边创一个hello的消息队列。

  1. @Configuration
  2. public class RabbitConfig {
  3. @Bean
  4. public Queue helloQueue() {
  5. return new Queue("hello");
  6. }
  7. }

5、消息包装类

先讲一下消息包装类,前后端的消息都以这种格式来传递,大家可以根据自己的需求自定义。

  1. public class RequestMessage {
  2. private String room;//频道号
  3. private String type;//消息类型('1':客户端到服务端 '2':客户端到服务端)
  4. private String content;//消息内容(即答案)
  5. private String userId;//用户id
  6. private String questionId;//题目id
  7. private String createTime;//时间
  8. public RequestMessage() {
  9. }
  10. public RequestMessage(String room, String type, String content, String userId, String questionId, String createTime) {
  11. this.room = room;
  12. this.type = type;
  13. this.content = content;
  14. this.userId = userId;
  15. this.questionId = questionId;
  16. this.createTime = createTime;
  17. }
  18. public String getRoom() {
  19. return room;
  20. }
  21. public String getType() {
  22. return type;
  23. }
  24. public String getContent() {
  25. return content;
  26. }
  27. public void setRoom(String room) {
  28. this.room = room;
  29. }
  30. public String getUserId() {
  31. return userId;
  32. }
  33. public void setUserId(String userId) {
  34. this.userId = userId;
  35. }
  36. public String getQuestionId() {
  37. return questionId;
  38. }
  39. public void setQuestionId(String questionId) {
  40. this.questionId = questionId;
  41. }
  42. public String getCreateTime() {
  43. return createTime;
  44. }
  45. public void setCreateTime(String createTime) {
  46. this.createTime = createTime;
  47. }
  48. public void setType(String type) {
  49. this.type = type;
  50. }
  51. public void setContent(String content) {
  52. this.content = content;
  53. }
  54. }

6、利用STOMP实现前后端长连接

这部分与我之前的文章类似:SpringBoot+STOMP 实现聊天室(单聊+多聊)

本文这边 省略了消息群发

首先编写websocket配置类:

  1. @Configuration
  2. @EnableWebSocketMessageBroker
  3. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  4. @Override
  5. public void configureMessageBroker(MessageBrokerRegistry config) {
  6. /**
  7. * 订阅来自"/topic""/user"的消息,
  8. * /topic 单聊
  9. * /all 群聊
  10. */
  11. config.enableSimpleBroker("/topic","/all");
  12. /**
  13. * 客户端发送过来的消息,需要以"/app"为前缀,再经过Broker转发给响应的Controller
  14. */
  15. config.setApplicationDestinationPrefixes("/app");
  16. }
  17. @Override
  18. public void registerStompEndpoints(StompEndpointRegistry registry) {
  19. /**
  20. * 路径"/websocket"被注册为STOMP端点,对外暴露,客户端通过该路径接入WebSocket服务
  21. */
  22. registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
  23. }
  24. }

然后编写消息转发器(前端的消息通过这边,进行转发)

  1. @RestController
  2. public class WebSocketTestController {
  3. @Autowired
  4. private SimpMessagingTemplate messagingTemplate;
  5. @Autowired
  6. Sender senderMQ;
  7. /**聊天室(单聊+多聊)&&消息转发
  8. * @param requestMessage
  9. * @throws Exception
  10. */
  11. @CrossOrigin
  12. @MessageMapping("/chat")
  13. public void messageHandling(RequestMessage requestMessage) throws Exception {
  14. String destination = "/topic/" + HtmlUtils.htmlEscape(requestMessage.getRoom());
  15. String room = HtmlUtils.htmlEscape(requestMessage.getRoom());//htmlEscape 转换为HTML转义字符表示
  16. String type = HtmlUtils.htmlEscape(requestMessage.getType());
  17. String content = HtmlUtils.htmlEscape(requestMessage.getContent());
  18. String userId = HtmlUtils.htmlEscape(requestMessage.getUserId());
  19. String questionId = HtmlUtils.htmlEscape(requestMessage.getQuestionId());
  20. String createTime = HtmlUtils.htmlEscape(requestMessage.getCreateTime());
  21. System.out.println( requestMessage.getRoom() );
  22. System.out.println( content );
  23. messagingTemplate.convertAndSend(destination, requestMessage);
  24. }
  25. }

注意:

1、使用@MessageMapping注解来标识所有发送到“/chat”这个destination的消息,都会被路由到这个方法进行处理

2、使用@SendTo注解来标识这个方法返回的结果,都会被发送到它指定的destination,“/topic” 

3、传入的参数RequestMessage requestMessage为客户端发送过来的消息,是自动绑定的。

将前端传过来的消息解析,然后再通过messagingTemplate.convertAndSend(“目的地”,“消息内容”)转发出去。

7、编写前端页面:

这边的前端页面仅仅用了html+js

  1. <!DOCTYPE HTML>
  2. <html>
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>My WebSocket</title>
  6. <script src="js/sockjs.min.js"></script>
  7. <script src="js/jquery.min.js"></script>
  8. <script src="js/stomp.min.js"></script>
  9. <!--<script type="text/javascript"></script>-->
  10. <style>
  11. #message22{
  12. margin-top:40px;
  13. border:1px solid gray;
  14. padding:20px;
  15. }
  16. </style>
  17. <style>
  18. #message{
  19. margin-top:40px;
  20. border:1px solid gray;
  21. padding:20px;
  22. }
  23. </style>
  24. </head>
  25. <body>
  26. 频道号:<input id="room" type="text"/>
  27. <button onclick="conectWebSocket()">连接WebSocket</button>
  28. <button onclick="disconnect()">断开连接</button>
  29. <hr />
  30. <div id="message22"></div>
  31. <br />
  32. 做题区:<input id="text" type="text" />
  33. <!-- 频道号:<input id="toUser" type="text" /> -->
  34. <button onclick="sendMessage()">发送消息</button>
  35. <div id="message"></div>
  36. </body>
  37. <script type="text/javascript">
  38. var stompClient;
  39. var serverUrl = "http://localhost:8080/websocket";
  40. var room;//频道号
  41. var websocket = null;
  42. //websocket连接
  43. function conectWebSocket(){
  44. this.room = document.getElementById('room').value;//频道号
  45. console.log(this.room);
  46. console.log(this.serverUrl);
  47. var socket = new SockJS(this.serverUrl);
  48. this.stompClient = Stomp.over(socket);
  49. var that = this;
  50. this.stompClient.connect({}, function (frame) {
  51. that.stompClient.subscribe('/topic/'+that.room ,function(txt) {
  52. // console.log("websocket连接成功");
  53. // console.log(txt);
  54. document.getElementById('message').innerHTML += JSON.parse(txt.body)['content']+ '<br/>';
  55. // const sender = JSON.parse(message.body)['sender'];
  56. const language = JSON.parse(message.body)['language'];
  57. const content = JSON.parse(message.body)['content'];
  58. const type = JSON.parse(message.body)['type'];
  59. });
  60. });
  61. }
  62. //发送消息
  63. function sendMessage() {
  64. //获取输入的文本信息进行发送
  65. var message = document.getElementById('text').value;
  66. // var room = document.getElementById('toUser').value;
  67. var socketMsg = {msg:message,toUser:room};
  68. var that = this
  69. this.stompClient.send(
  70. '/app/chat',
  71. {},
  72. JSON.stringify({
  73. 'room': that.room,
  74. 'type': "1",//1,2
  75. 'content': message,
  76. 'userId':"566072523",//小明
  77. 'questionId':"222299023",//题目1
  78. 'createTime':"",
  79. })
  80. );
  81. }
  82. function disconnect() {
  83. //断开连接的方法
  84. if (this.stompClient !== undefined) {
  85. this.stompClient.disconnect();
  86. alert("Disconnected");
  87. }else{
  88. alert("当前没有连接websocket")
  89. }
  90. this.stompClient = undefined;
  91. }
  92. </script>
  93. </html>

页面都很简单,就不细讲了,但要引入sockjs.min.js   jquery.min.js   stomp.min.js这3个js

这样就前后端长连接就完成了,赶紧来试试~

 可以看出长连接成功

  下面我们要来讲讲 消息推送如何实现,当我们前端发一个http请求,在这个请求结束前,后端会新开一个线程。当原本的请求结束时,后端新开的那个线程还在运行,当它结束时,后端应该要返回消息给前端,但之前的请求已经结束,http握手已经早结束了,消息怎么传递。所以我们需要用到rabbitmq(用来消息的解耦)  websocket(用来保证前后端通信通道不关闭),来让后端主动推送的消息,能到前端页面显示。

8、编写消息生产者和消费者

  现在我们讲讲 rabbitmq 消息2个重要的组成部分:消息生产者和消息消费者

sender:

  1. @Component
  2. public class Sender {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5. public void send(String context) { //注意因为是AmqpTemplate,所有这里只接受String,byte[],Seriz..
  6. System.out.println("Sender : " + context);
  7. this.rabbitTemplate.convertAndSend("hello", context);
  8. }
  9. }

这里注意一下,发送消息采用了AmqpTemplate 模板 。AmqpTemplate接口已经定义了发送和接收消息的基本操作。我们直接使用即可,但要注意的是 必须符合它的类型,这里只支持   String,byte[],Seriz..类型的消息

convertAndSend("hello", context);      hello就是我们前面创的消息队列,context是消息的内容

但我们有时候消息特别复杂,一般用对象来储存消息。这个怎么办呢?,这边我们先不讲,到我们下面再讲。我们继续来讲消息消费者

receiver:

  1. @Component
  2. @RabbitListener(queues = "hello")
  3. public class Receiver {
  4. @Autowired
  5. private SimpMessagingTemplate messagingTemplate;
  6. @RabbitHandler
  7. public void process(String context) throws IOException {
  8. System.out.println("Receiver : " + context);
  9. RequestMessage mqTask = new RequestMessage( );
  10. BeanUtils.copyProperties( JsonUtils.jsonToObject( context,RequestMessage.class ),mqTask );
  11. if (Objects.equals( mqTask.getType(), "2" )) {
  12. String destination = "/topic/" +mqTask.getRoom();
  13. messagingTemplate.convertAndSend( destination, mqTask);
  14. }
  15. }
  16. }

这里注意:我们前面说过,由于消息一般比较复杂,所以发送过来的消息一般是对象类型,但AmqpTemplate不支持,所以我们需要在消息生产者这边把它先转化成String型,传到消息消费者这边,再转化成对象。然后对消息进行处理。

最后通过            

messagingTemplate.convertAndSend( destination, mqTask);

将消息发送到前端页面上。

我们这边用到的工具类:

  1. public class JsonUtils {
  2. private static final ObjectMapper MAPPER = new ObjectMapper();
  3. /**
  4. * 对象-->Json字符串
  5. * @version 创建时间:2018年4月17日 下午3:39:35
  6. */
  7. public static String objectToJson(Object data) {
  8. try {
  9. return MAPPER.writeValueAsString(data);
  10. } catch (JsonProcessingException e) {
  11. e.printStackTrace();
  12. }
  13. return null;
  14. }
  15. /**
  16. * Json字符串-->对象
  17. * @version 创建时间:2018年4月17日 下午3:39:45
  18. */
  19. public static <T> T jsonToObject(String jsonData, Class<T> beanType) {
  20. try {
  21. return MAPPER.readValue(jsonData, beanType);
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. return null;
  26. }
  27. /**
  28. * Json字符串--> List<对象>
  29. * @version 创建时间:2018年4月17日 下午3:40:09
  30. */
  31. public static <T> List<T> jsonToList(String jsonData, Class<T> beanType) {
  32. JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
  33. try {
  34. return MAPPER.readValue(jsonData, javaType);
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. return null;
  39. }
  40. public static <T> Map<String, T> jsonToMap(String jsonData) {
  41. ObjectMapper mapper = new ObjectMapper();
  42. try {
  43. return mapper.readValue(jsonData, Map.class);
  44. } catch (IOException e) {
  45. e.printStackTrace();
  46. }
  47. return null;
  48. }
  49. public static String mapToJson(Map map) {
  50. try {
  51. return MAPPER.writeValueAsString(map);
  52. } catch (JsonProcessingException e) {
  53. e.printStackTrace();
  54. }
  55. return null;
  56. }
  57. public static <T> Set<T> jsonToSet(String jsonData) {
  58. ObjectMapper mapper = new ObjectMapper();
  59. try {
  60. return mapper.readValue(jsonData, Set.class);
  61. } catch (IOException e) {
  62. e.printStackTrace();
  63. }
  64. return null;
  65. }
  66. public static String setToJson(Set set) {
  67. try {
  68. return MAPPER.writeValueAsString(set);
  69. } catch (JsonProcessingException e) {
  70. e.printStackTrace();
  71. }
  72. return null;
  73. }
  74. }

9、测试

我们先定义一个sevice方法,来实现异步。

  1. public interface CommonService {
  2. void testAsync();
  3. }
  1. @Service
  2. public class CommonServiceImpl implements CommonService {
  3. @Autowired
  4. Sender sender;
  5. @Async
  6. @Override
  7. public void testAsync() {
  8. RequestMessage mqTask = new RequestMessage( );
  9. for(int i=0;i<6;i++) {
  10. mqTask.setRoom( "123");
  11. mqTask.setUserId("000");
  12. mqTask.setType( "2" );
  13. mqTask.setQuestionId( "0000");
  14. mqTask.setCreateTime( "0000");
  15. mqTask.setContent("this:"+i);
  16. sender.send( JsonUtils.objectToJson( mqTask ) );
  17. try {
  18. Thread.sleep( 1000 );
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }

 这里实现异步,为了简单就只用了@Async

再写一个测试controller

  1. @RestController
  2. public class TestController {
  3. @Autowired
  4. CommonService commonService;
  5. @GetMapping("/test/testAsync")
  6. public String testAsync() {
  7. commonService.testAsync();
  8. return "http请求已结束";
  9. }
  10. }

 打开浏览器,先连接websocket,然后再开一个窗口,输入请求

localhost:8080/test/testAsync

发现如下:

 表明成功,本文暂且就到这了 = - =

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号