赞
踩
系列导读:
1.springboot+websocket构建在线聊天室(群聊+单聊)
2.Spring Boot WebSocket:单聊(实现思路)
3.Websocket Stomp+RabbitMQ实现消息推送
目录
后端:springboot2.0.6
前端:html js
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.0.6.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.zj</groupId>
- <artifactId>online-test</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>online-test</name>
- <description>Demo project for Spring Boot</description>
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jpa</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.6</version>
- </dependency>
- <!--websocket 相关依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
- <dependency>
- <groupId>org.webjars</groupId>
- <artifactId>webjars-locator-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.webjars</groupId>
- <artifactId>sockjs-client</artifactId>
- <version>1.0.2</version>
- </dependency>
- <dependency>
- <groupId>org.webjars</groupId>
- <artifactId>stomp-websocket</artifactId>
- <version>2.3.3</version>
- </dependency>
-
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
spring.rabbitmq.host=192.168.XXX
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
本文省略RabbitMq的安装,这边将你安装的rabbitmq的相关信息填入即可
参考安装地址: docker 安装 RabbitMq
几分钟就搞定安装
先创一个rabbitmq 的配置类,由于我们这边业务逻辑比较简单,就简单使用rabbitmq一下
这边创一个hello的消息队列。
- @Configuration
- public class RabbitConfig {
-
- @Bean
- public Queue helloQueue() {
- return new Queue("hello");
- }
-
- }
先讲一下消息包装类,前后端的消息都以这种格式来传递,大家可以根据自己的需求自定义。
- public class RequestMessage {
-
- private String room;//频道号
- private String type;//消息类型('1':客户端到服务端 '2':客户端到服务端)
- private String content;//消息内容(即答案)
- private String userId;//用户id
- private String questionId;//题目id
- private String createTime;//时间
-
- public RequestMessage() {
- }
-
- public RequestMessage(String room, String type, String content, String userId, String questionId, String createTime) {
- this.room = room;
- this.type = type;
- this.content = content;
- this.userId = userId;
- this.questionId = questionId;
- this.createTime = createTime;
- }
-
- public String getRoom() {
- return room;
- }
-
- public String getType() {
- return type;
- }
-
- public String getContent() {
- return content;
- }
-
- public void setRoom(String room) {
- this.room = room;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public String getQuestionId() {
- return questionId;
- }
-
- public void setQuestionId(String questionId) {
- this.questionId = questionId;
- }
-
- public String getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(String createTime) {
- this.createTime = createTime;
- }
-
-
- public void setType(String type) {
- this.type = type;
- }
-
- public void setContent(String content) {
- this.content = content;
- }
- }
这部分与我之前的文章类似:SpringBoot+STOMP 实现聊天室(单聊+多聊)
本文这边 省略了消息群发
首先编写websocket配置类:
- @Configuration
- @EnableWebSocketMessageBroker
- public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
-
- @Override
- public void configureMessageBroker(MessageBrokerRegistry config) {
- /**
- * 订阅来自"/topic"和"/user"的消息,
- * /topic 单聊
- * /all 群聊
- */
- config.enableSimpleBroker("/topic","/all");
-
- /**
- * 客户端发送过来的消息,需要以"/app"为前缀,再经过Broker转发给响应的Controller
- */
- config.setApplicationDestinationPrefixes("/app");
- }
-
- @Override
- public void registerStompEndpoints(StompEndpointRegistry registry) {
-
- /**
- * 路径"/websocket"被注册为STOMP端点,对外暴露,客户端通过该路径接入WebSocket服务
- */
- registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
- }
-
-
- }
然后编写消息转发器(前端的消息通过这边,进行转发)
- @RestController
- public class WebSocketTestController {
-
- @Autowired
- private SimpMessagingTemplate messagingTemplate;
-
- @Autowired
- Sender senderMQ;
-
- /**聊天室(单聊+多聊)&&消息转发
- * @param requestMessage
- * @throws Exception
- */
- @CrossOrigin
- @MessageMapping("/chat")
- public void messageHandling(RequestMessage requestMessage) throws Exception {
- String destination = "/topic/" + HtmlUtils.htmlEscape(requestMessage.getRoom());
-
- String room = HtmlUtils.htmlEscape(requestMessage.getRoom());//htmlEscape 转换为HTML转义字符表示
- String type = HtmlUtils.htmlEscape(requestMessage.getType());
- String content = HtmlUtils.htmlEscape(requestMessage.getContent());
- String userId = HtmlUtils.htmlEscape(requestMessage.getUserId());
- String questionId = HtmlUtils.htmlEscape(requestMessage.getQuestionId());
- String createTime = HtmlUtils.htmlEscape(requestMessage.getCreateTime());
-
- System.out.println( requestMessage.getRoom() );
- System.out.println( content );
-
-
-
-
- messagingTemplate.convertAndSend(destination, requestMessage);
- }
-
- }
注意:
1、使用@MessageMapping注解来标识所有发送到“/chat”这个destination的消息,都会被路由到这个方法进行处理
2、使用@SendTo注解来标识这个方法返回的结果,都会被发送到它指定的destination,“/topic”
3、传入的参数RequestMessage requestMessage为客户端发送过来的消息,是自动绑定的。
将前端传过来的消息解析,然后再通过messagingTemplate.convertAndSend(“目的地”,“消息内容”)转发出去。
这边的前端页面仅仅用了html+js
- <!DOCTYPE HTML>
- <html>
- <head>
- <meta charset="UTF-8">
- <title>My WebSocket</title>
-
- <script src="js/sockjs.min.js"></script>
- <script src="js/jquery.min.js"></script>
- <script src="js/stomp.min.js"></script>
- <!--<script type="text/javascript"></script>-->
-
- <style>
- #message22{
- margin-top:40px;
- border:1px solid gray;
- padding:20px;
- }
- </style>
-
- <style>
- #message{
- margin-top:40px;
- border:1px solid gray;
- padding:20px;
- }
- </style>
-
- </head>
-
- <body>
-
- 频道号:<input id="room" type="text"/>
- <button onclick="conectWebSocket()">连接WebSocket</button>
- <button onclick="disconnect()">断开连接</button>
- <hr />
-
- <div id="message22"></div>
-
-
- <br />
- 做题区:<input id="text" type="text" />
- <!-- 频道号:<input id="toUser" type="text" /> -->
- <button onclick="sendMessage()">发送消息</button>
- <div id="message"></div>
- </body>
-
- <script type="text/javascript">
- var stompClient;
-
- var serverUrl = "http://localhost:8080/websocket";
-
- var room;//频道号
-
-
- var websocket = null;
-
- //websocket连接
- function conectWebSocket(){
-
- this.room = document.getElementById('room').value;//频道号
-
- console.log(this.room);
-
- console.log(this.serverUrl);
-
- var socket = new SockJS(this.serverUrl);
- this.stompClient = Stomp.over(socket);
- var that = this;
- this.stompClient.connect({}, function (frame) {
- that.stompClient.subscribe('/topic/'+that.room ,function(txt) {
- // console.log("websocket连接成功");
- // console.log(txt);
- document.getElementById('message').innerHTML += JSON.parse(txt.body)['content']+ '<br/>';
- // const sender = JSON.parse(message.body)['sender'];
- const language = JSON.parse(message.body)['language'];
- const content = JSON.parse(message.body)['content'];
- const type = JSON.parse(message.body)['type'];
- });
- });
-
-
- }
-
- //发送消息
- function sendMessage() {
- //获取输入的文本信息进行发送
- var message = document.getElementById('text').value;
- // var room = document.getElementById('toUser').value;
- var socketMsg = {msg:message,toUser:room};
- var that = this
- this.stompClient.send(
- '/app/chat',
- {},
- JSON.stringify({
- 'room': that.room,
- 'type': "1",//1,2
- 'content': message,
- 'userId':"566072523",//小明
- 'questionId':"222299023",//题目1
- 'createTime':"",
- })
- );
-
- }
-
-
- function disconnect() {
-
- //断开连接的方法
- if (this.stompClient !== undefined) {
-
- this.stompClient.disconnect();
- alert("Disconnected");
- }else{
- alert("当前没有连接websocket")
- }
- this.stompClient = undefined;
-
- }
-
- </script>
- </html>
页面都很简单,就不细讲了,但要引入sockjs.min.js jquery.min.js stomp.min.js这3个js
这样就前后端长连接就完成了,赶紧来试试~
可以看出长连接成功
下面我们要来讲讲 消息推送如何实现,当我们前端发一个http请求,在这个请求结束前,后端会新开一个线程。当原本的请求结束时,后端新开的那个线程还在运行,当它结束时,后端应该要返回消息给前端,但之前的请求已经结束,http握手已经早结束了,消息怎么传递。所以我们需要用到rabbitmq(用来消息的解耦) websocket(用来保证前后端通信通道不关闭),来让后端主动推送的消息,能到前端页面显示。
现在我们讲讲 rabbitmq 消息2个重要的组成部分:消息生产者和消息消费者
sender:
- @Component
- public class Sender {
-
- @Autowired
- private AmqpTemplate rabbitTemplate;
-
- public void send(String context) { //注意因为是AmqpTemplate,所有这里只接受String,byte[],Seriz..
- System.out.println("Sender : " + context);
- this.rabbitTemplate.convertAndSend("hello", context);
- }
-
- }
这里注意一下,发送消息采用了AmqpTemplate 模板 。AmqpTemplate接口已经定义了发送和接收消息的基本操作。我们直接使用即可,但要注意的是 必须符合它的类型,这里只支持 String,byte[],Seriz..类型的消息
convertAndSend("hello", context); hello就是我们前面创的消息队列,context是消息的内容
但我们有时候消息特别复杂,一般用对象来储存消息。这个怎么办呢?,这边我们先不讲,到我们下面再讲。我们继续来讲消息消费者
receiver:
- @Component
- @RabbitListener(queues = "hello")
- public class Receiver {
-
-
- @Autowired
- private SimpMessagingTemplate messagingTemplate;
-
-
- @RabbitHandler
- public void process(String context) throws IOException {
- System.out.println("Receiver : " + context);
-
- RequestMessage mqTask = new RequestMessage( );
- BeanUtils.copyProperties( JsonUtils.jsonToObject( context,RequestMessage.class ),mqTask );
-
- if (Objects.equals( mqTask.getType(), "2" )) {
- String destination = "/topic/" +mqTask.getRoom();
-
- messagingTemplate.convertAndSend( destination, mqTask);
- }
-
- }
-
- }
这里注意:我们前面说过,由于消息一般比较复杂,所以发送过来的消息一般是对象类型,但AmqpTemplate不支持,所以我们需要在消息生产者这边把它先转化成String型,传到消息消费者这边,再转化成对象。然后对消息进行处理。
最后通过
messagingTemplate.convertAndSend( destination, mqTask);
将消息发送到前端页面上。
我们这边用到的工具类:
- public class JsonUtils {
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- /**
- * 对象-->Json字符串
- * @version 创建时间:2018年4月17日 下午3:39:35
- */
- public static String objectToJson(Object data) {
- try {
- return MAPPER.writeValueAsString(data);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * Json字符串-->对象
- * @version 创建时间:2018年4月17日 下午3:39:45
- */
- public static <T> T jsonToObject(String jsonData, Class<T> beanType) {
- try {
- return MAPPER.readValue(jsonData, beanType);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * Json字符串--> List<对象>
- * @version 创建时间:2018年4月17日 下午3:40:09
- */
- public static <T> List<T> jsonToList(String jsonData, Class<T> beanType) {
- JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
- try {
- return MAPPER.readValue(jsonData, javaType);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return null;
- }
-
- public static <T> Map<String, T> jsonToMap(String jsonData) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.readValue(jsonData, Map.class);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- public static String mapToJson(Map map) {
- try {
- return MAPPER.writeValueAsString(map);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- public static <T> Set<T> jsonToSet(String jsonData) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.readValue(jsonData, Set.class);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- public static String setToJson(Set set) {
- try {
- return MAPPER.writeValueAsString(set);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- return null;
- }
- }
我们先定义一个sevice方法,来实现异步。
- public interface CommonService {
-
- void testAsync();
-
- }
- @Service
- public class CommonServiceImpl implements CommonService {
-
-
- @Autowired
- Sender sender;
-
-
- @Async
- @Override
- public void testAsync() {
-
- RequestMessage mqTask = new RequestMessage( );
- for(int i=0;i<6;i++) {
- mqTask.setRoom( "123");
- mqTask.setUserId("000");
- mqTask.setType( "2" );
- mqTask.setQuestionId( "0000");
- mqTask.setCreateTime( "0000");
- mqTask.setContent("this:"+i);
-
- sender.send( JsonUtils.objectToJson( mqTask ) );
- try {
- Thread.sleep( 1000 );
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
-
-
- }
这里实现异步,为了简单就只用了@Async
再写一个测试controller
- @RestController
- public class TestController {
-
- @Autowired
- CommonService commonService;
-
-
- @GetMapping("/test/testAsync")
- public String testAsync() {
- commonService.testAsync();
- return "http请求已结束";
- }
-
- }
打开浏览器,先连接websocket,然后再开一个窗口,输入请求
localhost:8080/test/testAsync
发现如下:
表明成功,本文暂且就到这了 = - =
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。