当前位置:   article > 正文

(二十)springboot实战——springboot使用redis的订阅发布机制结合SSE实现站内信的功能_redis实现站内信

redis实现站内信

前言

在前面的章节中,我们介绍了如何在springboot项目中基于redis的订阅发布机制实现消息的收发,同时也介绍了基于SSE机制的单通道消息推送案例。本节内容结合redis和SSE实现一个常用的实战案例——站内信,实现系统消息的实时推送。

正文

①引入项目的pom依赖,并在application.yml中配置redis连接

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.projectlombok</groupId>
  11. <artifactId>lombok</artifactId>
  12. </dependency>

 ②创建一个SSE服务器,用于连接用户和收发消息

  1. package com.yundi.atp.server;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.io.IOException;
  4. import java.util.ArrayList;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. @Slf4j
  9. public class SseServer {
  10. /**
  11. * 存储用户的连接
  12. */
  13. public static Map<String, SseEmitterUTF8> sseMap = new HashMap<>();
  14. /**
  15. * 建立连接
  16. *
  17. * @param username
  18. * @throws IOException
  19. */
  20. public static SseEmitterUTF8 connect(String username) throws IOException {
  21. if (!sseMap.containsKey(username)) {
  22. //设置超时时间(和token有效期一致,超时后不再推送消息),0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
  23. SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
  24. sseEmitter.send(String.format("%s号用户,连接成功!", username));
  25. sseEmitter.onCompletion(() -> sseMap.remove(username));
  26. sseEmitter.onTimeout(() -> sseMap.remove(username));
  27. sseEmitter.onError(throwable -> sseMap.remove(username));
  28. sseMap.put(username, sseEmitter);
  29. return sseEmitter;
  30. } else {
  31. SseEmitterUTF8 sseEmitterUTF8 = sseMap.get(username);
  32. sseEmitterUTF8.send(String.format("%s,用户连接成功!", username));
  33. return sseEmitterUTF8;
  34. }
  35. }
  36. /**
  37. * 发送消息
  38. *
  39. * @param message
  40. */
  41. public static synchronized void sendMessage(String message) {
  42. List<String> removeList = new ArrayList<>();
  43. for (Map.Entry<String, SseEmitterUTF8> entry : sseMap.entrySet()) {
  44. String username = entry.getKey();
  45. try {
  46. SseEmitterUTF8 sseEmitterUTF8 = entry.getValue();
  47. sseEmitterUTF8.onCompletion(() -> sseMap.remove(username));
  48. sseEmitterUTF8.onTimeout(() -> sseMap.remove(username));
  49. sseEmitterUTF8.onError(throwable -> sseMap.remove(username));
  50. sseEmitterUTF8.send(message);
  51. } catch (IOException e) {
  52. //发送不成功,将该用户加入移除列表
  53. removeList.add(username);
  54. }
  55. }
  56. //移除连接异常的用户
  57. removeList.forEach(item -> sseMap.remove(item));
  58. }
  59. }

 ③创建一个redis消息的监听器,将监听到的消息通过sse服务推送给连接的用户

  1. package com.yundi.atp.listen;
  2. import com.yundi.atp.constant.ChannelConstant;
  3. import com.yundi.atp.server.SseServer;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.data.redis.connection.Message;
  7. import org.springframework.data.redis.connection.MessageListener;
  8. import org.springframework.data.redis.listener.ChannelTopic;
  9. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  10. import org.springframework.stereotype.Component;
  11. import javax.annotation.PostConstruct;
  12. import java.nio.charset.StandardCharsets;
  13. @Slf4j
  14. @Component
  15. public class RedisMessageSubscriber implements MessageListener {
  16. @Autowired
  17. private RedisMessageListenerContainer redisMessageListenerContainer;
  18. /**
  19. * 订阅消息:将订阅者添加到指定的频道
  20. */
  21. @PostConstruct
  22. public void subscribeToChannel() {
  23. //广播消息
  24. redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(ChannelConstant.CHANNEL_GLOBAL_NAME));
  25. }
  26. @Override
  27. public void onMessage(Message message, byte[] bytes) {
  28. String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
  29. String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
  30. log.info("Received message: " + messageBody + " from channel: " + channel);
  31. SseServer.sendMessage(messageBody);
  32. }
  33. }

 ④创建SseEmitterUTF8并继承SseEmitter,重写extendResponse方法,解决中文消息发送乱码问题

  1. package com.yundi.atp.server;
  2. import org.springframework.http.HttpHeaders;
  3. import org.springframework.http.MediaType;
  4. import org.springframework.http.server.ServerHttpResponse;
  5. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  6. import java.nio.charset.StandardCharsets;
  7. public class SseEmitterUTF8 extends SseEmitter {
  8. public SseEmitterUTF8(Long timeout) {
  9. super(timeout);
  10. }
  11. @Override
  12. protected void extendResponse(ServerHttpResponse outputMessage) {
  13. super.extendResponse(outputMessage);
  14. HttpHeaders headers = outputMessage.getHeaders();
  15. headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
  16. }
  17. }

⑤ 创建redis的配置类,用于初始化redis的容器监听器和工具类

  1. package com.yundi.atp.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  7. import org.springframework.data.redis.serializer.StringRedisSerializer;
  8. @Configuration
  9. public class RedisConfig {
  10. /**
  11. * 初始化一个Redis消息监听容器
  12. * @param connectionFactory
  13. * @return
  14. */
  15. @Bean
  16. public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
  17. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  18. container.setConnectionFactory(connectionFactory);
  19. // 添加其他配置,如线程池大小等
  20. return container;
  21. }
  22. @Bean
  23. public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
  24. RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
  25. redisTemplate.setConnectionFactory(connectionFactory);
  26. redisTemplate.setDefaultSerializer(new StringRedisSerializer());
  27. return redisTemplate;
  28. }
  29. }

⑦ 创建用于站内信发送的频道Channel

  1. package com.yundi.atp.constant;
  /**
   * Names of the Redis pub/sub channels used for in-site messages.
   *
   * <p>Pure constants holder: it is {@code final} and cannot be instantiated.
   */
  public final class ChannelConstant {

      /**
       * Broadcast channel.
       */
      public static final String CHANNEL_GLOBAL_NAME = "channel-global";

      /**
       * Unicast (single-recipient) channel.
       */
      public static final String CHANNEL_SINGLE_NAME = "channel-single";

      private ChannelConstant() {
          // Constants only; no instances.
      }
  }

 ⑧创建一个消息发布接口和一个sse用户消息推送连接接口

  1. package com.yundi.atp.controller;
  2. import com.yundi.atp.constant.ChannelConstant;
  3. import com.yundi.atp.server.SseEmitterUTF8;
  4. import com.yundi.atp.server.SseServer;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.http.MediaType;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import javax.annotation.Resource;
  12. import java.io.IOException;
  13. @RequestMapping(value = "base")
  14. @RestController
  15. public class BaseController {
  16. @Resource
  17. private RedisTemplate redisTemplate;
  18. /**
  19. * 发布广播消息
  20. *
  21. * @param msg
  22. */
  23. @GetMapping(value = "/publish/{msg}")
  24. public void sendMsg(@PathVariable(value = "msg") String msg) {
  25. redisTemplate.convertAndSend(ChannelConstant.CHANNEL_GLOBAL_NAME, msg);
  26. }
  27. /**
  28. * 接收消息
  29. *
  30. * @return
  31. * @throws IOException
  32. */
  33. @GetMapping(path = "/connect/{username}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  34. public SseEmitterUTF8 connect(@PathVariable(value = "username") String username) throws IOException {
  35. SseEmitterUTF8 connect = SseServer.connect(username);
  36. return connect;
  37. }
  38. }

 ⑨启动服务,验证站内信功能是否可以正常使用

结语

关于springboot使用redis的订阅发布机制结合SSE实现站内信的功能到这里就结束了,我们下期见……

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/798414
推荐阅读
相关标签
  

闽ICP备14008679号