赞
踩
在前面的章节内容中,我们介绍了如何使用springboot项目实现基于redis订阅发布机制实现消息的收发,同时也介绍了基于SSE机制的单通道消息推送案例,本节内容结合redis和sse实现一个常用的实战案例——站内信。实现系统消息的实时推送。
①引入项目的pom依赖,并在application.yml中配置redis连接
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
②创建一个SSE服务器,用于连接用户和收发消息
- package com.yundi.atp.server;
-
- import lombok.extern.slf4j.Slf4j;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- @Slf4j
- public class SseServer {
- /**
- * 存储用户的连接
- */
- public static Map<String, SseEmitterUTF8> sseMap = new HashMap<>();
-
- /**
- * 建立连接
- *
- * @param username
- * @throws IOException
- */
- public static SseEmitterUTF8 connect(String username) throws IOException {
- if (!sseMap.containsKey(username)) {
- //设置超时时间(和token有效期一致,超时后不再推送消息),0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
- SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
- sseEmitter.send(String.format("%s号用户,连接成功!", username));
- sseEmitter.onCompletion(() -> sseMap.remove(username));
- sseEmitter.onTimeout(() -> sseMap.remove(username));
- sseEmitter.onError(throwable -> sseMap.remove(username));
- sseMap.put(username, sseEmitter);
- return sseEmitter;
- } else {
- SseEmitterUTF8 sseEmitterUTF8 = sseMap.get(username);
- sseEmitterUTF8.send(String.format("%s,用户连接成功!", username));
- return sseEmitterUTF8;
- }
- }
-
- /**
- * 发送消息
- *
- * @param message
- */
- public static synchronized void sendMessage(String message) {
- List<String> removeList = new ArrayList<>();
- for (Map.Entry<String, SseEmitterUTF8> entry : sseMap.entrySet()) {
- String username = entry.getKey();
- try {
- SseEmitterUTF8 sseEmitterUTF8 = entry.getValue();
- sseEmitterUTF8.onCompletion(() -> sseMap.remove(username));
- sseEmitterUTF8.onTimeout(() -> sseMap.remove(username));
- sseEmitterUTF8.onError(throwable -> sseMap.remove(username));
- sseEmitterUTF8.send(message);
- } catch (IOException e) {
- //发送不成功,将该用户加入移除列表
- removeList.add(username);
- }
- }
- //移除连接异常的用户
- removeList.forEach(item -> sseMap.remove(item));
- }
- }
③创建一个redis消息的监听器,将监听到的消息通过sse服务推送给连接的用户
- package com.yundi.atp.listen;
-
- import com.yundi.atp.constant.ChannelConstant;
- import com.yundi.atp.server.SseServer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.connection.Message;
- import org.springframework.data.redis.connection.MessageListener;
- import org.springframework.data.redis.listener.ChannelTopic;
- import org.springframework.data.redis.listener.RedisMessageListenerContainer;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.nio.charset.StandardCharsets;
-
-
- @Slf4j
- @Component
- public class RedisMessageSubscriber implements MessageListener {
- @Autowired
- private RedisMessageListenerContainer redisMessageListenerContainer;
-
- /**
- * 订阅消息:将订阅者添加到指定的频道
- */
- @PostConstruct
- public void subscribeToChannel() {
- //广播消息
- redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(ChannelConstant.CHANNEL_GLOBAL_NAME));
- }
-
- @Override
- public void onMessage(Message message, byte[] bytes) {
- String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
- String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
- log.info("Received message: " + messageBody + " from channel: " + channel);
- SseServer.sendMessage(messageBody);
- }
- }
④创建SseEmitterUTF8并继承SseEmitter,重写extendResponse方法,解决中文消息发送乱码问题
- package com.yundi.atp.server;
-
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.MediaType;
- import org.springframework.http.server.ServerHttpResponse;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
- import java.nio.charset.StandardCharsets;
-
-
- public class SseEmitterUTF8 extends SseEmitter {
-
- public SseEmitterUTF8(Long timeout) {
- super(timeout);
- }
-
- @Override
- protected void extendResponse(ServerHttpResponse outputMessage) {
- super.extendResponse(outputMessage);
- HttpHeaders headers = outputMessage.getHeaders();
- headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
- }
- }
⑤ 创建redis的配置类,用于初始化redis的容器监听器和工具类
- package com.yundi.atp.config;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.listener.RedisMessageListenerContainer;
- import org.springframework.data.redis.serializer.StringRedisSerializer;
-
-
- @Configuration
- public class RedisConfig {
- /**
- * 初始化一个Redis消息监听容器
- * @param connectionFactory
- * @return
- */
- @Bean
- public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- // 添加其他配置,如线程池大小等
- return container;
- }
-
- @Bean
- public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
- RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
- redisTemplate.setConnectionFactory(connectionFactory);
- redisTemplate.setDefaultSerializer(new StringRedisSerializer());
- return redisTemplate;
- }
- }
⑦ 创建用于站内信发送的频道Channel
- package com.yundi.atp.constant;
-
-
- public class ChannelConstant {
- /**
- * 广播通道
- */
- public static final String CHANNEL_GLOBAL_NAME = "channel-global";
-
- /**
- * 单播通道
- */
- public static final String CHANNEL_SINGLE_NAME = "channel-single";
- }
⑧创建一个消息发布接口和一个sse用户消息推送连接接口
- package com.yundi.atp.controller;
-
- import com.yundi.atp.constant.ChannelConstant;
- import com.yundi.atp.server.SseEmitterUTF8;
- import com.yundi.atp.server.SseServer;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.http.MediaType;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.io.IOException;
-
-
- @RequestMapping(value = "base")
- @RestController
- public class BaseController {
- @Resource
- private RedisTemplate redisTemplate;
-
- /**
- * 发布广播消息
- *
- * @param msg
- */
- @GetMapping(value = "/publish/{msg}")
- public void sendMsg(@PathVariable(value = "msg") String msg) {
- redisTemplate.convertAndSend(ChannelConstant.CHANNEL_GLOBAL_NAME, msg);
- }
-
-
- /**
- * 接收消息
- *
- * @return
- * @throws IOException
- */
- @GetMapping(path = "/connect/{username}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public SseEmitterUTF8 connect(@PathVariable(value = "username") String username) throws IOException {
- SseEmitterUTF8 connect = SseServer.connect(username);
- return connect;
- }
- }
⑨启动服务,验证站内信功能是否可以正常使用
关于springboot使用redis的订阅发布机制结合SSE实现站内信的功能到这里就结束了,我们下期见。。。。。。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。