赞
踩
在互联网项目中,系统通知模块是必备的。主要功能是将系统的通知消息发送给用户,在不同项目中系统通知模块的实现也多有不同,以下我来举几个例子说明,不同项目下不同系统通知功能的实现。
1.前端pull类型:在一些对消息实时性要求不高的项目中,例如toB项目,系统通知中不涉及私信等用户间通信功能,主要是系统端向用户推送数据,此时对消息的实时性的要求并不高,我们可以让前端设置一个定时任务,按时向服务端发送请求来拉取消息通知。这样对于系统设计是最为简单的,当然占用资源也是最大的。
2.后端push类型,对于一些对于消息实时性要求搞的项目中,类似于IM场景,我们可以通过类似webSocket的双向通信协议,由后端向前端主动推送消息,使得消息实时性高,不会过多占用网络IO资源。当然这样对与系统的设计会出现更多挑战。
在Love中我使用的是pull+hold综合实现系统通知功能。主要借鉴WebQQ中系统通知的设计。前端像后端发起拉取通知的请求。后端不立即返回结果,而是将这个请求hold住,在此期间有消息推送的时候就会异步的将结果返回,这样能极大的减少网络IO资源的开销。这个方案中我使用了Spring提供的ReferredResult类,将请求阻塞在服务端中,设置超时返回并异步监听通知队列中的消息。
下面我来具体展示代码
new出一个DeferredResult对象,设置超时时间后返回暂无消息通知的response,将这个DeferredResult缓存到本地。
- @Override
- public DeferredResult<Result<?>> pullNotify(Long userId, PullNotifyRequest pullNotifyRequest, Long timeOut) {
- //使用DeferredResult来执行异步操作
- DeferredResult<Result<?>> deferredResult = new DeferredResult<>(timeOut);
- //超时情况下
- deferredResult.onTimeout(()->{
- //从MAP中删除
- userNotifyDeferredCache.remove(userId);
- Result result = new Result();
- result.setCode(206);
- result.setMsg("暂无新消息通知");
- deferredResult.setResult(result);
- });
- userNotifyDeferredCache.save(userId,deferredResult::setResult);
- return deferredResult;
- }
这个类主要就是将消息发送到redis消息队列中。//TODO:使用Redis Stream消息队列来代替。在这里我简单的使用Redis List数据结构来做消息队列,使用一个监听器来监听这个消息队列
- @Component
- @RequiredArgsConstructor
- @AutoConfiguration
- public class UserNotifyProduction {
-
- private final StringRedisTemplate redisTemplate;
-
- /**
- * 系统通知消息生产者
- * @param noticeDTO
- */
- public void sendNotifyMessage(NoticeDTO noticeDTO){
- //判断一下目标用户是不是在线
- // Long userId = noticeDTO.getUserId();
- // if (userNotifyDeferredCache.isExist(userId)){
- redisTemplate.opsForList().rightPush(UserNotifyConstants.USER_NOTIFY_QUEUE, JSON.toJSONString(noticeDTO));
- // }else {
- // //存入数据库中
- // redisTemplate.opsForList().rightPush(UserNotifyConstants.USER_NOTIFY_TO_DB_QUEUE,JSON.toJSONString(noticeDTO));
- // }
- }
- }
- package blossom.project.towelove.server.redisMQ;
-
- import blossom.project.towelove.common.entity.notification.NoticeDTO;
- import blossom.project.towelove.framework.redis.core.UserNotifyConstants;
- import blossom.project.towelove.server.entity.Notice;
- import blossom.project.towelove.server.mapper.NoticeMapper;
- import blossom.project.towelove.server.service.impl.NotificationServiceImpl;
- import com.alibaba.fastjson2.JSON;
- import lombok.Getter;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
-
- import java.util.Objects;
- import java.util.concurrent.*;
-
- /**
- * @projectName: Towelove
- * @package: blossom.project.towelove.server.redisMQ
- * @className: RedisListener
- * @author: Link Ji
- * @description: GOGO
- * @date: 2024/1/19 18:38
- * @version: 1.0
- */
- @Component
- @RequiredArgsConstructor
- @Slf4j
- public class RedisListener implements InitializingBean {
-
- @Getter
- private final StringRedisTemplate redisTemplate;
-
- private final NotificationServiceImpl notificationService;
-
- private final NoticeMapper noticeMapper;
-
- private final UserNotifyDeferredCache userNotifyDeferredCache;
-
- ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> new Thread(r,"notifyListener"));
-
- @Override
- public void afterPropertiesSet() {
- /**
- * 100毫秒到队列中读取一次
- */
- scheduler.scheduleAtFixedRate(() ->{
- String message = redisTemplate.opsForList().leftPop(UserNotifyConstants.USER_NOTIFY_QUEUE);
- NoticeDTO noticeDTO = JSON.parseObject(message, NoticeDTO.class);
- if (Objects.isNull(noticeDTO)) return;
- if (userNotifyDeferredCache.isExist(noticeDTO.getUserId())) {
- log.info("用户:{}在线收到一条通知消息:{}",noticeDTO.getUserId(),noticeDTO.getContent());
- notificationService.setDeferredResult(noticeDTO.getUserId(), noticeDTO.getContent());
- }else{
- log.info("用户:{}离线收到一条通知消息:{}",noticeDTO.getUserId(),noticeDTO.getContent());
- //存入DB中
- Notice notice = new Notice();
- notice.setType(1);
- notice.setUserId(noticeDTO.getUserId());
- notice.setContent(noticeDTO.getContent());
- noticeMapper.insert(notice);
- }
- },5,100, TimeUnit.MILLISECONDS);
-
- // scheduler.scheduleAtFixedRate(() ->{
- // String message = redisTemplate.opsForList().leftPop(UserNotifyConstants.USER_NOTIFY_TO_DB_QUEUE);
- // NoticeDTO noticeDTO = JSON.parseObject(message, NoticeDTO.class);
- // if (Objects.nonNull(noticeDTO)){
- // log.info("用户:{}离线收到一条通知消息:{}",noticeDTO.getUserId(),noticeDTO.getContent());
- // //存入DB中
- // Notice notice = new Notice();
- // notice.setType(1);
- // notice.setUserId(noticeDTO.getUserId());
- // notice.setContent(noticeDTO.getContent());
- // noticeMapper.insert(notice);
- }
- // },5,100,TimeUnit.MILLISECONDS);
- log.info("notifyListener start successfully");
- }
- }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。