赞
踩
使用springboot实现
1.发送系统通知时,先把消息数据插入message表中,然后把消息推送到相关的消息队列(以用户id作为routing-key)中去
3.消费者消费消息时,取消自动应答,每次从消息队列中取数据的时候,把数据插入message_ref(lastFlag为true【新信息】,readFlag为false【未读】)中,插入成功后再发送消息的Ack应答,让消息队列删除消息
3.前端设计定时器轮训:每次读取消息的时候,先从消息队列中接收消息(步骤3;然后把message_ref中的lastFlag改为false,更新的数量即为新消息的数量;查询readFlag为false的数据(查询未读数据),然后把这些数据传给前端
有些地方引用了hutool工具,可自行引入依赖
<!--Hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.0</version>
</dependency>
集合相当于MySQL中的数据表,但是没有固定的表结构。集合有什么字段,取决于保存在其中的数据。下面这张表格是Message集合中JSON数据的结构要求。
字段 | 类型 | 备注 |
---|---|---|
_id | UUID | 自动生成的主键值 |
uuid | UUID | UUID值,并且设置有唯一性索引,防止消息被重复消费 |
senderId | Integer | 发送者ID,就是用户ID。如果是系统自动发出,这个ID值是0 |
senderPhoto | String | 发送者的头像URL。在消息页面要显示发送人的头像 |
senderName | String | 发送者名称,也就是用户姓名。在消息页面要显示发送人的名字 |
msg | String | 消息正文 |
sendTime | Date | 发送时间 |
uuid : 防止重复消费
消息积压过多的情况下,如果第一次轮询还没结束,第二次轮询就开始了,那就可能出现把重复的数据写入数据库中,因此如果每条MQ消息都有唯一的UUID,第一个消费者把消息保存到数据库,那么第二个消费者就无法再把这条消息保存到数据库,解决了消息的重复消费问题。
虽然message集合记录的是消息内容及其发送者,message_ref集合来记录接收人和已读状态。
字段 | 类型 | 备注 |
---|---|---|
_id | UUID | 主键 |
messageId | UUID | message记录的_id |
receiverId | String | 接收人ID |
readFlag | Boolean | 是否已读 |
lastFlag | Boolean | 是否为新接收的消息 |
执行两个集合的联合查询,根据接收人来查询消息,并且按照消息发送时间降序排列,查询前50条记录
db.message.aggregate([ { $set: { "id": { $toString: "$_id" } } }, { $lookup:{ from:"message_ref", localField:"id", foreignField:"messageId", as:"ref" }, }, { $match:{"ref.receiverId": 1} }, { $sort: {sendTime : -1} }, { $skip: 0 }, { $limit: 50 } ])
解析1.$set: { "id": { $toString: "$_id" } }添加一个字段,字段名为id,值为"_id"的值,格式为string类型3.$lookup:{ from:"message_ref", localField:"id", foreignField:"messageId", as:"ref" },连接另一个表from:连接哪个表localField :以自己的那个字段与另一个表连接foreignField:另一个表的连接字段as:给连接的那个表起别名3.$match:{"ref.receiverId": 1}查询条件(相当于where),该例子表示where ref.receiverId = 14.$sort: {sendTime : -1}排序,该例子为根据sendTime按降序排5.$skip: 0 从那条数据开始(数据是从0开始算的,即0为第一条数据),相当于分页查询的start ,即sql语句limit start,length中的start6.$limit: 50查询几条数据,相当于分页查询的length,即sql语句limit start,length中的length
依赖导入
<!--mongodb-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
mongoDB配置
spring: #MongoDB data: mongodb: #ip host: localhost #端口 port: 37017 #数据库 database: pbms authentication-database: admin username: admin password: abc133456
@Data @Document(collection = "message") public class MessageEntity implements Serializable { @Id private String _id; @Indexed(unique = true) private String uuid; @Indexed private Integer senderId; private String senderPhoto=""; //默认系统图片 private String senderName; @Indexed private Date sendTime; private String msg; }
@Document(collection = "message_ref") @Data public class MessageRefEntity implements Serializable { @Id private String _id; @Indexed private String messageId; @Indexed private Integer receiverId; @Indexed private Boolean readFlag; @Indexed private Boolean lastFlag; }
解析1.@Document(collection = "message") 表示映射到Mongodb文档上的领域对象3.@Id 表示某个域为ID域3.@Indexed 表示某个字段为Mongodb的索引字段
messageDao
@Repository public class MessageDao { @Autowired private MongoTemplate mongoTemplate; //插入消息数据 public String insert(MessageEntity entity) { //把北京时间转换成格林尼治时间 Date sendTime = entity.getSendTime(); sendTime = DateUtil.offset(sendTime, DateField.HOUR, -8); entity.setSendTime(sendTime); entity = mongoTemplate.save(entity); return entity.get_id(); } //分页查询某个人的信息 public List<HashMap> searchMessageByPage(int userId,long start,long length){ JSONObject json = new JSONObject(); json.set("$toString","$_id"); //类比上面所说的连接查询,以那个为逻辑 Aggregation aggregation = Aggregation.newAggregation( Aggregation.addFields().addField("id").withValue(json).build(), Aggregation.lookup("message_ref","id","messageId","ref"), Aggregation.match(Criteria.where("ref.receiverId").is(userId)), Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")), Aggregation.skip(start), Aggregation.limit(length) ); AggregationResults<HashMap> message = mongoTemplate.aggregate(aggregation, "message", HashMap.class); List<HashMap> list = message.getMappedResults(); list.forEach(one -> { List<MessageRefEntity> refList = (List<MessageRefEntity>) one.get("ref"); MessageRefEntity entity = refList.get(0); Boolean readFlag = entity.getReadFlag(); String id = entity.get_id(); one.remove("ref"); one.put("readFlag",readFlag); one.put("refId",id); one.remove("_id"); //把格林尼治时间转换成北京时间 Date sendTime = (Date) one.get("sendTime"); sendTime = DateUtil.offset(sendTime, DateField.HOUR, 8); String today = DateUtil.today(); //如果是今天的消息,只显示发送时间,不需要显示日期 if (today.equals(DateUtil.date(sendTime).toDateStr())) { one.put("sendTime", DateUtil.format(sendTime, "HH:mm")); } //如果是以往的消息,只显示日期,不显示发送时间 else { one.put("sendTime", DateUtil.format(sendTime, "yyyy/MM/dd")); } }); return list; } //根据id查询某条消息详细内容 public HashMap searchMessageById(String id){ HashMap message = mongoTemplate.findById(id, HashMap.class, "message"); Date sendTime = (Date) message.get("sendTime"); //把格林尼治时间转换成北京时间 sendTime = DateUtil.date(sendTime).offset(DateField.HOUR, 8); message.replace("sendTime", DateUtil.format(sendTime, "yyyy-MM-dd HH:mm")); return message; } }
MessageRefDao
@Repository public class MessageRefDao { @Autowired private MongoTemplate mongoTemplate; public String insert(MessageRefEntity entity){ entity = mongoTemplate.save(entity); return entity.get_id(); } /** * 查询未读消息数量 * * @param userId * @return */ public long searchUnreadCount(int userId){ Query query = new Query(); query.addCriteria(Criteria.where("readFlag").is(false).and("receiverId").is(userId)); long count = mongoTemplate.count(query, MessageRefEntity.class); return count; } /** * 查询新接收消息数量 * 更新lastFlag为false,修改的数据数量即为新接收的消息数量 * @param userId * @return */ public long searchLastCount(int userId){ Query query = new Query(); query.addCriteria(Criteria.where("lastFlag").is(true).and("receiverId").is(userId)); Update update = new Update(); update.set("lastFlag",false); UpdateResult result = mongoTemplate.updateMulti(query, update, "message_ref"); long count = result.getModifiedCount(); return count; } /** * 把未读消息变更为已读消息 * * @param id * @return */ public long updateUnreadMessage(String id){ Query query = new Query(); query.addCriteria(Criteria.where("_id").is(id)); Update update = new Update(); update.set("readFlag",true); UpdateResult result = mongoTemplate.updateFirst(query, update, "message_ref"); long count = result.getModifiedCount(); return count; } /** * 根据ID删除ref消息 * * @param id * @return */ public long deleteMessageRefById(String id){ Query query = new Query(); query.addCriteria(Criteria.where("_id").is(id)); DeleteResult messageRef = mongoTemplate.remove(query, "message_ref"); long deletedCount = messageRef.getDeletedCount(); return deletedCount; } /** * 删除某个用户全部消息 * * @param userId * @return */ public long deleteUserMessageRef(int userId){ Query query = new Query(); query.addCriteria(Criteria.where("receiverId").is(userId)); DeleteResult ref = mongoTemplate.remove(query, "message_ref"); long deletedCount = ref.getDeletedCount(); return deletedCount; } }
基本上是引用DAO层
public interface MessageService { public String insertMessage(MessageEntity entity); public String insertRef(MessageRefEntity entity); public long searchUnreadCount(int userId); public long searchLastCount(int userId); public List<HashMap> searchMessageByPage(int userId, long start, int length) ; public HashMap searchMessageById(String id); public long updateUnreadMessage(String id) ; public long deleteMessageRefById(String id); public long deleteUserMessageRef(int userId); }
@Service public class MessageServiceImpl implements MessageService { @Autowired private MessageDao messageDao; @Autowired private MessageRefDao messageRefDao; @Override public String insertMessage(MessageEntity entity) { String id = messageDao.insert(entity); return id; } @Override public String insertRef(MessageRefEntity entity) { String id = messageRefDao.insert(entity); return id; } @Override public long searchUnreadCount(int userId) { long count = messageRefDao.searchUnreadCount(userId); return count; } @Override public long searchLastCount(int userId) { long count = messageRefDao.searchLastCount(userId); return count; } @Override public List<HashMap> searchMessageByPage(int userId, long start, int length) { List<HashMap> list = messageDao.searchMessageByPage(userId, start, length); return list; } @Override public HashMap searchMessageById(String id) { HashMap map = messageDao.searchMessageById(id); return map; } @Override public long updateUnreadMessage(String id) { long count = messageRefDao.updateUnreadMessage(id); return count; } @Override public long deleteMessageRefById(String id) { long count = messageRefDao.deleteMessageRefById(id); return count; } @Override public long deleteUserMessageRef(int userId) { long count = messageRefDao.deleteUserMessageRef(userId); return count; } }
@ApiModel @Data public class SearchMessageByPageForm { @NotNull @Min(1) private Integer page; @NotNull @Range(min = 1,max = 40) private Integer length; } @RestController @RequestMapping("/message") @Api("消息模块网络接口") public class MessageController { @Autowired private JwtUtil jwtUtil; @Autowired private MessageService messageService; @PostMapping("/searchMessageByPage") @ApiOperation("获取分页消息列表") public R searchMessageByPage(@Valid @RequestBody SearchMessageByPageForm form, @RequestHeader("token") String token) { int userId = jwtUtil.getUserId(token); int page = form.getPage(); int length = form.getLength(); long start = (page - 1) * length; List<HashMap> list = messageService.searchMessageByPage(userId, start, length); return R.ok().put("result", list); } }
@ApiModel @Data public class SearchMessageByIdForm { @NotBlank private String id; } public class MessageController { …… @PostMapping("/searchMessageById") @ApiOperation("根据ID查询消息") public R searchMessageById(@Valid @RequestBody SearchMessageByIdForm form) { HashMap map = messageService.searchMessageById(form.getId()); return R.ok().put("result", map); } }
@ApiModel @Data public class UpdateUnreadMessageForm { @NotBlank private String id; } public class MessageController { …… @PostMapping("/updateUnreadMessage") @ApiOperation("未读消息更新成已读消息") public R updateUnreadMessage(@Valid @RequestBody UpdateUnreadMessageForm form) { long rows = messageService.updateUnreadMessage(form.getId()); return R.ok().put("result", rows == 1 ? true : false); } }
@Data @ApiModel public class DeleteMessageRefByIdForm { @NotBlank private String id; } public class MessageController { …… @PostMapping("/deleteMessageRefById") @ApiOperation("删除消息") public R deleteMessageRefById(@Valid @RequestBody DeleteMessageRefByIdForm form){ long rows=messageService.deleteMessageRefById(form.getId()); return R.ok().put("result", rows == 1 ? true : false); } }
public class MessageController { …… @GetMapping("/refreshMessage") @ApiOperation("刷新用户的消息") public R refreshMessage(@RequestHeader("token") String token) { int userId = jwtUtil.getUserId(token); //异步接收消息 messageTask.receiveAysnc(userId + ""); //查询接收了多少条消息 long lastRows=messageService.searchLastCount(userId); //查询未读数据 long unreadRows = messageService.searchUnreadCount(userId); return R.ok().put("lastRows", lastRows).put("unreadRows", unreadRows); } }
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
注意这里可能存在rabbitMQ连接不上的问题:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
原因是不支持使用默认用户进行非本地连接,应新建用户
@Configuration
public class RabbitMQConfig {
@Bean
public ConnectionFactory getFactory(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip"); //rabbitMQ所处服务器ip
factory.setPort(5672); //端口
factory.setUsername("admin"); //用户名
factory.setPassword("admin"); //密码
return factory;
}
}
这里只是使用的rabbitMQ的简单模式,即一个消费者对应一个队列
如有其他需求可自行学习rabbitMQ的其他模式
@Slf4j @Component public class MessageTask { @Autowired private ConnectionFactory factory; @Autowired private MessageService messageService; /** * 同步发送消息 * * @param topic 主题 * @param entity 消息对象 */ public void send(String topic, MessageEntity entity) { String id = messageService.insertMessage(entity); //向MongoDB保存消息数据,返回消息ID //向RabbitMQ发送消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //连接到某个Topic /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(topic, true, false, false, null); HashMap header = new HashMap(); //存放属性数据 header.put("messageId", id); //创建AMQP协议参数对象,添加附加属性 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build(); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("", topic, properties, entity.getMsg().getBytes()); log.debug("消息发送成功"); } catch (Exception e) { log.error("执行异常", e); throw new PBMSException("向MQ发送消息失败"); } } /** * 异步发送消息 * * @param topic 主题 * @param entity */ @Async public void sendAsync(String topic, MessageEntity entity) { send(topic, entity); } /** * 同步接收数据 * * @param topic 主题 * @return 接收消息数量 */ public int receive(String topic) { int i = 0; try (//接收消息数据 Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 从队列中获取消息,不自动确认 channel.queueDeclare(topic, true, false, false, null); //Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环 while (true) { //创建响应接收数据,禁止自动发送Ack应答 GetResponse response = channel.basicGet(topic, false); if (response != null) { AMQP.BasicProperties properties = response.getProps(); Map<String, Object> header = properties.getHeaders(); //获取附加属性对象 String messageId = header.get("messageId").toString(); byte[] body = response.getBody();//获取消息正文 String message = new String(body); log.debug("从RabbitMQ接收的消息:" + message); MessageRefEntity entity = new MessageRefEntity(); entity.setMessageId(messageId); entity.setReceiverId(Integer.parseInt(topic)); entity.setReadFlag(false); entity.setLastFlag(true); messageService.insertRef(entity); //把消息存储在MongoDB中 //数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息 long deliveryTag = response.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); i++; } else { break; //接收不到消息,则退出死循环 } } } catch (Exception e) { log.error("执行异常", e); } return i; } /** * 异步接收数据 * * @param topic * @return */ @Async public int receiveAysnc(String topic) { return receive(topic); } /** * 同步删除消息队列 * * @param topic 主题 */ public void deleteQueue(String topic) { try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDelete(topic); log.debug("消息队列成功删除"); } catch (Exception e) { log.error("删除队列失败", e); throw new PBMSException("删除队列失败"); } } /** * 异步删除消息队列 * * @param topic 主题 */ @Async public void deleteQueueAsync(String topic) { deleteQueue(topic); } }
1.发送消息
在要发送信息的地方:创建MessageEntity,调用messageService.insertMessage
,插入message表,然后调用messageTask.send发送消息到消息队列
2.接收消息
调用messageController.refreshMessage()接收消息,同时可获取新消息数量和未读消息数量
测试数据
@Test void contextLoads() { for (int i = 1; i <= 100; i++) { MessageEntity message = new MessageEntity(); message.setUuid(IdUtil.simpleUUID()); message.setSenderId(0); message.setSenderName("系统消息"); message.setMsg("这是第" + i + "条测试消息"); message.setSendTime(new Date()); String id=messageService.insertMessage(message); MessageRefEntity ref=new MessageRefEntity(); ref.setMessageId(id); ref.setReceiverId(11); //注意:这是接收人ID ref.setLastFlag(true); ref.setReadFlag(false); messageService.insertRef(ref); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。