当前位置:   article > 正文

Java Web项目系统通知模块的设计_java 信息通知 系统通知设计

java 信息通知 系统通知设计

在互联网项目中,系统通知模块是必备的。主要功能是将系统的通知消息发送给用户,在不同项目中系统通知模块的实现也多有不同,以下我来举几个例子说明,不同项目下不同系统通知功能的实现。

1.前端pull类型:在一些对消息实时性要求不高的项目中,例如toB项目,系统通知中不涉及私信等用户间通信功能,主要是系统端向用户推送数据,此时对消息的实时性的要求并不高,我们可以让前端设置一个定时任务,按时向服务端发送请求来拉取消息通知。这样对于系统设计是最为简单的,当然占用资源也是最大的。

2.后端push类型,对于一些对于消息实时性要求搞的项目中,类似于IM场景,我们可以通过类似webSocket的双向通信协议,由后端向前端主动推送消息,使得消息实时性高,不会过多占用网络IO资源。当然这样对与系统的设计会出现更多挑战。

在Love中我使用的是pull+hold综合实现系统通知功能。主要借鉴WebQQ中系统通知的设计。前端像后端发起拉取通知的请求。后端不立即返回结果,而是将这个请求hold住,在此期间有消息推送的时候就会异步的将结果返回,这样能极大的减少网络IO资源的开销。这个方案中我使用了Spring提供的ReferredResult类,将请求阻塞在服务端中,设置超时返回并异步监听通知队列中的消息。

下面我来具体展示代码

  1. 拉取消息Controller层

new出一个DeferredResult对象,设置超时时间后返回暂无消息通知的response,将这个DeferredResult缓存到本地。

  1. @Override
  2. public DeferredResult<Result<?>> pullNotify(Long userId, PullNotifyRequest pullNotifyRequest, Long timeOut) {
  3. //使用DeferredResult来执行异步操作
  4. DeferredResult<Result<?>> deferredResult = new DeferredResult<>(timeOut);
  5. //超时情况下
  6. deferredResult.onTimeout(()->{
  7. //从MAP中删除
  8. userNotifyDeferredCache.remove(userId);
  9. Result result = new Result();
  10. result.setCode(206);
  11. result.setMsg("暂无新消息通知");
  12. deferredResult.setResult(result);
  13. });
  14. userNotifyDeferredCache.save(userId,deferredResult::setResult);
  15. return deferredResult;
  16. }
  1. 发送系统消息类

这个类主要就是将消息发送到redis消息队列中。//TODO:使用Redis Stream消息队列来代替。在这里我简单的使用Redis List数据结构来做消息队列,使用一个监听器来监听这个消息队列

  1. @Component
  2. @RequiredArgsConstructor
  3. @AutoConfiguration
  4. public class UserNotifyProduction {
  5. private final StringRedisTemplate redisTemplate;
  6. /**
  7. * 系统通知消息生产者
  8. * @param noticeDTO
  9. */
  10. public void sendNotifyMessage(NoticeDTO noticeDTO){
  11. //判断一下目标用户是不是在线
  12. // Long userId = noticeDTO.getUserId();
  13. // if (userNotifyDeferredCache.isExist(userId)){
  14. redisTemplate.opsForList().rightPush(UserNotifyConstants.USER_NOTIFY_QUEUE, JSON.toJSONString(noticeDTO));
  15. // }else {
  16. // //存入数据库中
  17. // redisTemplate.opsForList().rightPush(UserNotifyConstants.USER_NOTIFY_TO_DB_QUEUE,JSON.toJSONString(noticeDTO));
  18. // }
  19. }
  20. }
  1. 系统通知消息队列:系统每100ms到消息队列中拉取一次消息,得到消息后判断消息userId在DeferredResult的缓存中是否可以取到,如果可以取到则说明是在线用户可以直接将消息返回。如果取不到则存入数据库中,等待用户下次登入后则一并拉取所有目标通知数据。
  1. package blossom.project.towelove.server.redisMQ;
  2. import blossom.project.towelove.common.entity.notification.NoticeDTO;
  3. import blossom.project.towelove.framework.redis.core.UserNotifyConstants;
  4. import blossom.project.towelove.server.entity.Notice;
  5. import blossom.project.towelove.server.mapper.NoticeMapper;
  6. import blossom.project.towelove.server.service.impl.NotificationServiceImpl;
  7. import com.alibaba.fastjson2.JSON;
  8. import lombok.Getter;
  9. import lombok.RequiredArgsConstructor;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.beans.factory.InitializingBean;
  12. import org.springframework.data.redis.core.StringRedisTemplate;
  13. import org.springframework.stereotype.Component;
  14. import java.util.Objects;
  15. import java.util.concurrent.*;
  16. /**
  17. * @projectName: Towelove
  18. * @package: blossom.project.towelove.server.redisMQ
  19. * @className: RedisListener
  20. * @author: Link Ji
  21. * @description: GOGO
  22. * @date: 2024/1/19 18:38
  23. * @version: 1.0
  24. */
  25. @Component
  26. @RequiredArgsConstructor
  27. @Slf4j
  28. public class RedisListener implements InitializingBean {
  29. @Getter
  30. private final StringRedisTemplate redisTemplate;
  31. private final NotificationServiceImpl notificationService;
  32. private final NoticeMapper noticeMapper;
  33. private final UserNotifyDeferredCache userNotifyDeferredCache;
  34. ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> new Thread(r,"notifyListener"));
  35. @Override
  36. public void afterPropertiesSet() {
  37. /**
  38. * 100毫秒到队列中读取一次
  39. */
  40. scheduler.scheduleAtFixedRate(() ->{
  41. String message = redisTemplate.opsForList().leftPop(UserNotifyConstants.USER_NOTIFY_QUEUE);
  42. NoticeDTO noticeDTO = JSON.parseObject(message, NoticeDTO.class);
  43. if (Objects.isNull(noticeDTO)) return;
  44. if (userNotifyDeferredCache.isExist(noticeDTO.getUserId())) {
  45. log.info("用户:{}在线收到一条通知消息:{}",noticeDTO.getUserId(),noticeDTO.getContent());
  46. notificationService.setDeferredResult(noticeDTO.getUserId(), noticeDTO.getContent());
  47. }else{
  48. log.info("用户:{}离线收到一条通知消息:{}",noticeDTO.getUserId(),noticeDTO.getContent());
  49. //存入DB中
  50. Notice notice = new Notice();
  51. notice.setType(1);
  52. notice.setUserId(noticeDTO.getUserId());
  53. notice.setContent(noticeDTO.getContent());
  54. noticeMapper.insert(notice);
  55. }
  56. },5,100, TimeUnit.MILLISECONDS);
  57. // scheduler.scheduleAtFixedRate(() ->{
  58. // String message = redisTemplate.opsForList().leftPop(UserNotifyConstants.USER_NOTIFY_TO_DB_QUEUE);
  59. // NoticeDTO noticeDTO = JSON.parseObject(message, NoticeDTO.class);
  60. // if (Objects.nonNull(noticeDTO)){
  61. // log.info("用户:{}离线收到一条通知消息:{}",noticeDTO.getUserId(),noticeDTO.getContent());
  62. // //存入DB中
  63. // Notice notice = new Notice();
  64. // notice.setType(1);
  65. // notice.setUserId(noticeDTO.getUserId());
  66. // notice.setContent(noticeDTO.getContent());
  67. // noticeMapper.insert(notice);
  68. }
  69. // },5,100,TimeUnit.MILLISECONDS);
  70. log.info("notifyListener start successfully");
  71. }
  72. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/849443
推荐阅读
相关标签
  

闽ICP备14008679号