当前位置:   article > 正文

MongoDb + RabbitMQ 搭建系统通知模块_netty + rabbitmq +mongodb

netty + rabbitmq +mongodb

MongoDb + RabbitMQ 搭建系统通知模块

使用springboot实现

一、选择MongoDB和RabbitMQ理由

  1. 对于公告消息,本设计是设计为 为每个用户创建一条公告信息(原因是:方便记录用户对于消息的已读和未读状态,这样设计会更符合用户需求),因此设计两个表,message(存储消息及其发送者消息) 和 message_ref(存储接收者,及已读和新接收状态)
  2. 数据库选择为MongoDB,原因是MongoDB对于海量数据以及高并发情况下的读写数据很有优势,同时他的数据存储是以文档结构存储,简单来说就是用JSON格式,他会更加类似关系型数据库的存储,同时MongoDB从3.X开始支持集合的连接查询,就可以实现message和message_ref连接查询,查询出某个用户拥有的信息。
  3. 由于存在海量数据,高并发情况下,MongoDB也支持不了瞬时写入百万数据,因此引入消息队列MQ来进行削峰填谷。选择RabbitMQ的原因是,它的可靠性和稳定性比较好,而且不仅支持消息的异步收发,还支持消息的同步收发

二、简单逻辑

1.发送系统通知时,先把消息数据插入message表中,然后把消息推送到相关的消息队列(以用户id作为routing-key)中去

3.消费者消费消息时,取消自动应答,每次从消息队列中取数据的时候,把数据插入message_ref(lastFlag为true【新信息】,readFlag为false【未读】)中,插入成功后再发送消息的Ack应答,让消息队列删除消息

3.前端设计定时器轮训:每次读取消息的时候,先从消息队列中接收消息(步骤3;然后把message_ref中的lastFlag改为false,更新的数量即为新消息的数量;查询readFlag为false的数据(查询未读数据),然后把这些数据传给前端

三、实现

有些地方引用了hutool工具,可自行引入依赖

        <!--Hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.1、MongoDB数据库表的设计

3.1.1.message集合

集合相当于MySQL中的数据表,但是没有固定的表结构。集合有什么字段,取决于保存在其中的数据。下面这张表格是Message集合中JSON数据的结构要求。

字段类型备注
_idUUID自动生成的主键值
uuidUUIDUUID值,并且设置有唯一性索引,防止消息被重复消费
senderIdInteger发送者ID,就是用户ID。如果是系统自动发出,这个ID值是0
senderPhotoString发送者的头像URL。在消息页面要显示发送人的头像
senderNameString发送者名称,也就是用户姓名。在消息页面要显示发送人的名字
msgString消息正文
sendTimeDate发送时间

uuid : 防止重复消费

消息积压过多的情况下,如果第一次轮询还没结束,第二次轮询就开始了,那就可能出现把重复的数据写入数据库中,因此如果每条MQ消息都有唯一的UUID,第一个消费者把消息保存到数据库,那么第二个消费者就无法再把这条消息保存到数据库,解决了消息的重复消费问题。

3.1.2 message_ref集合

虽然message集合记录的是消息内容及其发送者,message_ref集合来记录接收人和已读状态。

字段类型备注
_idUUID主键
messageIdUUIDmessage记录的_id
receiverIdString接收人ID
readFlagBoolean是否已读
lastFlagBoolean是否为新接收的消息
3.1.3 连接查询

执行两个集合的联合查询,根据接收人来查询消息,并且按照消息发送时间降序排列,查询前50条记录

db.message.aggregate([
        {
                $set: {
                        "id": { $toString: "$_id" }
                }
        },
        {
                $lookup:{
                        from:"message_ref",
                        localField:"id",
                        foreignField:"messageId",
                        as:"ref"
                },
        },
        { $match:{"ref.receiverId": 1} },
        { $sort: {sendTime : -1} },
        { $skip: 0 },
        { $limit: 50 }
])


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
解析1.$set: {          "id": { $toString: "$_id" }       }添加一个字段,字段名为id,值为"_id"的值,格式为string类型3.$lookup:{             from:"message_ref",             localField:"id",             foreignField:"messageId",             as:"ref"          },连接另一个表from:连接哪个表localField :以自己的那个字段与另一个表连接foreignField:另一个表的连接字段as:给连接的那个表起别名3.$match:{"ref.receiverId": 1}查询条件(相当于where),该例子表示where ref.receiverId = 14.$sort: {sendTime : -1}排序,该例子为根据sendTime按降序排5.$skip: 0 从那条数据开始(数据是从0开始算的,即0为第一条数据),相当于分页查询的start ,即sql语句limit start,length中的start6.$limit: 50查询几条数据,相当于分页查询的length,即sql语句limit start,length中的length
  • 1

3.2 程序中实现数据表实体设计

依赖导入

        <!--mongodb-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

mongoDB配置

spring:  #MongoDB  data:    mongodb:      #ip      host: localhost           #端口      port: 37017      #数据库      database: pbms      authentication-database: admin      username: admin      password: abc133456
  • 1
3.2.1 pojo实体类
@Data
@Document(collection = "message")
public class MessageEntity implements Serializable {
    @Id
    private String _id;

    @Indexed(unique = true)
    private String uuid;

    @Indexed
    private Integer senderId;

    private String senderPhoto=""; //默认系统图片

    private String senderName;

    @Indexed
    private Date sendTime;

    private String msg;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
@Document(collection = "message_ref")
@Data
public class MessageRefEntity implements Serializable {
    @Id
    private String _id;

    @Indexed
    private String messageId;

    @Indexed
    private Integer receiverId;

    @Indexed
    private Boolean readFlag;

    @Indexed
    private Boolean lastFlag;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
解析1.@Document(collection = "message") 表示映射到Mongodb文档上的领域对象3.@Id 表示某个域为ID域3.@Indexed 表示某个字段为Mongodb的索引字段
  • 1
3.2.2 DAO

messageDao

@Repository
public class MessageDao {
    @Autowired
    private MongoTemplate mongoTemplate;

    //插入消息数据
    public String insert(MessageEntity entity) {
        //把北京时间转换成格林尼治时间
        Date sendTime = entity.getSendTime();
        sendTime = DateUtil.offset(sendTime, DateField.HOUR, -8);

        entity.setSendTime(sendTime);
        entity = mongoTemplate.save(entity);
        return entity.get_id();
    }

    //分页查询某个人的信息
    public List<HashMap> searchMessageByPage(int userId,long start,long length){
        JSONObject json = new JSONObject();
        json.set("$toString","$_id");
        //类比上面所说的连接查询,以那个为逻辑
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.addFields().addField("id").withValue(json).build(),
                Aggregation.lookup("message_ref","id","messageId","ref"),
                Aggregation.match(Criteria.where("ref.receiverId").is(userId)),
                Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")),
                Aggregation.skip(start),
                Aggregation.limit(length)
        );
        AggregationResults<HashMap> message = mongoTemplate.aggregate(aggregation, "message", HashMap.class);
        List<HashMap> list = message.getMappedResults();
        list.forEach(one -> {
            List<MessageRefEntity> refList = (List<MessageRefEntity>) one.get("ref");
            MessageRefEntity entity = refList.get(0);
            Boolean readFlag = entity.getReadFlag();
            String id = entity.get_id();
            one.remove("ref");
            one.put("readFlag",readFlag);
            one.put("refId",id);
            one.remove("_id");
            //把格林尼治时间转换成北京时间
            Date sendTime = (Date) one.get("sendTime");
            sendTime = DateUtil.offset(sendTime, DateField.HOUR, 8);

            String today = DateUtil.today();
            //如果是今天的消息,只显示发送时间,不需要显示日期
            if (today.equals(DateUtil.date(sendTime).toDateStr())) {
                one.put("sendTime", DateUtil.format(sendTime, "HH:mm"));
            }
            //如果是以往的消息,只显示日期,不显示发送时间
            else {
                one.put("sendTime", DateUtil.format(sendTime, "yyyy/MM/dd"));
            }
        });
        return list;
    }

    //根据id查询某条消息详细内容
    public HashMap searchMessageById(String id){
        HashMap message = mongoTemplate.findById(id, HashMap.class, "message");
        Date sendTime = (Date) message.get("sendTime");
        //把格林尼治时间转换成北京时间
        sendTime = DateUtil.date(sendTime).offset(DateField.HOUR, 8);
        message.replace("sendTime", DateUtil.format(sendTime, "yyyy-MM-dd HH:mm"));
        return message;

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

MessageRefDao

@Repository
public class MessageRefDao {
    @Autowired
    private MongoTemplate mongoTemplate;

    public String insert(MessageRefEntity entity){
        entity = mongoTemplate.save(entity);
        return entity.get_id();
    }

    /**
     * 查询未读消息数量
     *
     * @param userId
     * @return
     */
    public long searchUnreadCount(int userId){
        Query query = new Query();
        query.addCriteria(Criteria.where("readFlag").is(false).and("receiverId").is(userId));
        long count = mongoTemplate.count(query, MessageRefEntity.class);
        return count;
    }

    /**
     * 查询新接收消息数量
     * 更新lastFlag为false,修改的数据数量即为新接收的消息数量
     * @param userId
     * @return
     */
    public long searchLastCount(int userId){
        Query query = new Query();
        query.addCriteria(Criteria.where("lastFlag").is(true).and("receiverId").is(userId));
        Update update = new Update();
        update.set("lastFlag",false);
        UpdateResult result = mongoTemplate.updateMulti(query, update, "message_ref");
        long count = result.getModifiedCount();
        return count;
    }

    /**
     * 把未读消息变更为已读消息
     * 
     * @param id
     * @return
     */
    public long updateUnreadMessage(String id){
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        Update update = new Update();
        update.set("readFlag",true);
        UpdateResult result = mongoTemplate.updateFirst(query, update, "message_ref");
        long count = result.getModifiedCount();
        return count;
    }

    /**
     * 根据ID删除ref消息
     *
     * @param id
     * @return
     */
    public long deleteMessageRefById(String id){
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        DeleteResult messageRef = mongoTemplate.remove(query, "message_ref");
        long deletedCount = messageRef.getDeletedCount();
        return deletedCount;
    }

    /**
     * 删除某个用户全部消息
     *
     * @param userId
     * @return
     */
    public long deleteUserMessageRef(int userId){
        Query query = new Query();
        query.addCriteria(Criteria.where("receiverId").is(userId));
        DeleteResult ref = mongoTemplate.remove(query, "message_ref");
        long deletedCount = ref.getDeletedCount();
        return deletedCount;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

3.3 业务层设计

基本上是引用DAO层

public interface MessageService {
    public String insertMessage(MessageEntity entity);

    public String insertRef(MessageRefEntity entity);

    public long searchUnreadCount(int userId);

    public long searchLastCount(int userId);

    public List<HashMap> searchMessageByPage(int userId, long start, int length) ;

    public HashMap searchMessageById(String id);

    public long updateUnreadMessage(String id) ;

    public long deleteMessageRefById(String id);

    public long deleteUserMessageRef(int userId);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
@Service
public class MessageServiceImpl implements MessageService {
    @Autowired
    private MessageDao messageDao;

    @Autowired
    private MessageRefDao messageRefDao;

    @Override
    public String insertMessage(MessageEntity entity) {
        String id = messageDao.insert(entity);
        return id;
    }

    @Override
    public String insertRef(MessageRefEntity entity) {
        String id = messageRefDao.insert(entity);
        return id;
    }

    @Override
    public long searchUnreadCount(int userId) {
        long count = messageRefDao.searchUnreadCount(userId);
        return count;
    }

    @Override
    public long searchLastCount(int userId) {
        long count = messageRefDao.searchLastCount(userId);
        return count;
    }

    @Override
    public List<HashMap> searchMessageByPage(int userId, long start, int length) {
        List<HashMap> list = messageDao.searchMessageByPage(userId, start, length);
        return list;
    }

    @Override
    public HashMap searchMessageById(String id) {
        HashMap map = messageDao.searchMessageById(id);
        return map;
    }

    @Override
    public long updateUnreadMessage(String id) {
        long count = messageRefDao.updateUnreadMessage(id);
        return count;
    }

    @Override
    public long deleteMessageRefById(String id) {
        long count = messageRefDao.deleteMessageRefById(id);
        return count;
    }

    @Override
    public long deleteUserMessageRef(int userId) {
        long count = messageRefDao.deleteUserMessageRef(userId);
        return count;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

3.4 web层

1.获取分页消息列表
@ApiModel
@Data
public class SearchMessageByPageForm {
    @NotNull
    @Min(1)
    private Integer page;

    @NotNull
    @Range(min = 1,max = 40)
    private Integer length;
}

@RestController
@RequestMapping("/message")
@Api("消息模块网络接口")
public class MessageController {
    @Autowired
    private JwtUtil jwtUtil;

    @Autowired
    private MessageService messageService;

    @PostMapping("/searchMessageByPage")
    @ApiOperation("获取分页消息列表")
    public R searchMessageByPage(@Valid @RequestBody SearchMessageByPageForm form, @RequestHeader("token") String token) {
        int userId = jwtUtil.getUserId(token);
        int page = form.getPage();
        int length = form.getLength();
        long start = (page - 1) * length;
        List<HashMap> list = messageService.searchMessageByPage(userId, start, length);
        return R.ok().put("result", list);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
3.根据ID查询消息
@ApiModel
@Data
public class SearchMessageByIdForm {
    @NotBlank
    private String id;
}

public class MessageController {
        ……
    @PostMapping("/searchMessageById")
    @ApiOperation("根据ID查询消息")
    public R searchMessageById(@Valid @RequestBody SearchMessageByIdForm form) {
        HashMap map = messageService.searchMessageById(form.getId());
        return R.ok().put("result", map);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
3.把未读消息更新成已读消息
@ApiModel
@Data
public class UpdateUnreadMessageForm {
    @NotBlank
    private String id;
}

public class MessageController {
        ……
    @PostMapping("/updateUnreadMessage")
    @ApiOperation("未读消息更新成已读消息")
    public R updateUnreadMessage(@Valid @RequestBody UpdateUnreadMessageForm form) {
        long rows = messageService.updateUnreadMessage(form.getId());
        return R.ok().put("result", rows == 1 ? true : false);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
4.删除消息
@Data
@ApiModel
public class DeleteMessageRefByIdForm {
    @NotBlank
    private String id;
}

public class MessageController {
        ……
    @PostMapping("/deleteMessageRefById")
    @ApiOperation("删除消息")
    public R deleteMessageRefById(@Valid @RequestBody DeleteMessageRefByIdForm form){
        long rows=messageService.deleteMessageRefById(form.getId());
        return R.ok().put("result", rows == 1 ? true : false);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
5.轮询接收系统消息
public class MessageController {
    ……

    @GetMapping("/refreshMessage")
    @ApiOperation("刷新用户的消息")
    public R refreshMessage(@RequestHeader("token") String token) {
        int userId = jwtUtil.getUserId(token);
        //异步接收消息
        messageTask.receiveAysnc(userId + "");
        //查询接收了多少条消息
        long lastRows=messageService.searchLastCount(userId);
        //查询未读数据
        long unreadRows = messageService.searchUnreadCount(userId);
        return R.ok().put("lastRows", lastRows).put("unreadRows", unreadRows);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3.5 使用RabbitMQ实现削峰填谷

3.5.1 导入依赖
<dependency>
        <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
        <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
3.5.2 配置类

注意这里可能存在rabbitMQ连接不上的问题:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

原因是不支持使用默认用户进行非本地连接,应新建用户

如何新建用户

@Configuration
public class RabbitMQConfig {
    @Bean
    public ConnectionFactory getFactory(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");  //rabbitMQ所处服务器ip
        factory.setPort(5672); //端口
        factory.setUsername("admin"); //用户名
        factory.setPassword("admin"); //密码
        return factory;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3.5.3 收发消息

这里只是使用的rabbitMQ的简单模式,即一个消费者对应一个队列

如有其他需求可自行学习rabbitMQ的其他模式

rabbitMQ相关笔记


@Slf4j
@Component
public class MessageTask {
    @Autowired
    private ConnectionFactory factory;
    @Autowired
    private MessageService messageService;

    /**
     * 同步发送消息
     *
     * @param topic  主题
     * @param entity 消息对象
     */
    public void send(String topic, MessageEntity entity) {
        String id = messageService.insertMessage(entity); //向MongoDB保存消息数据,返回消息ID
        //向RabbitMQ发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //连接到某个Topic
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare(topic, true, false, false, null);
            HashMap header = new HashMap(); //存放属性数据
            header.put("messageId", id);
            //创建AMQP协议参数对象,添加附加属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
            log.debug("消息发送成功");
        } catch (Exception e) {
            log.error("执行异常", e);
            throw new PBMSException("向MQ发送消息失败");
        }
    }
    /**
     * 异步发送消息
     *
     * @param topic  主题
     * @param entity
     */
    @Async
    public void sendAsync(String topic, MessageEntity entity) {
        send(topic, entity);

    }
    /**
     * 同步接收数据
     *
     * @param topic 主题
     * @return 接收消息数量
     */
    public int receive(String topic) {
        int i = 0;
        try (//接收消息数据
             Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 从队列中获取消息,不自动确认
            channel.queueDeclare(topic, true, false, false, null);
            //Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环
            while (true) {
                //创建响应接收数据,禁止自动发送Ack应答
                GetResponse response = channel.basicGet(topic, false);
                if (response != null) {
                    AMQP.BasicProperties properties = response.getProps();
                    Map<String, Object> header = properties.getHeaders(); //获取附加属性对象
                    String messageId = header.get("messageId").toString();
                    byte[] body = response.getBody();//获取消息正文
                    String message = new String(body);
                    log.debug("从RabbitMQ接收的消息:" + message);
                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insertRef(entity); //把消息存储在MongoDB中
                    //数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    i++;
                } else {
                    break; //接收不到消息,则退出死循环
                }
            }
        } catch (Exception e) {
            log.error("执行异常", e);
        }
        return i;
    }

    /**
     * 异步接收数据
     *
     * @param topic
     * @return
     */
    @Async
    public int receiveAysnc(String topic) {
        return receive(topic);
    }

    /**
     * 同步删除消息队列
     *
     * @param topic 主题
     */
    public void deleteQueue(String topic) {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDelete(topic);
            log.debug("消息队列成功删除");
        } catch (Exception e) {
            log.error("删除队列失败", e);
            throw new PBMSException("删除队列失败");
        }
    }
    /**
     * 异步删除消息队列
     *
     * @param topic 主题
     */
    @Async
    public void deleteQueueAsync(String topic) {
        deleteQueue(topic);
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141

3.6 使用

1.发送消息
在要发送信息的地方:创建MessageEntity,调用messageService.insertMessage,插入message表,然后调用messageTask.send发送消息到消息队列

2.接收消息
调用messageController.refreshMessage()接收消息,同时可获取新消息数量和未读消息数量

附录

测试数据

	@Test
    void contextLoads() {
        for (int i = 1; i <= 100; i++) {
            MessageEntity message = new MessageEntity();
            message.setUuid(IdUtil.simpleUUID());
            message.setSenderId(0);
            message.setSenderName("系统消息");
            message.setMsg("这是第" + i + "条测试消息");
            message.setSendTime(new Date());
            String id=messageService.insertMessage(message);

            MessageRefEntity ref=new MessageRefEntity();
            ref.setMessageId(id);
            ref.setReceiverId(11); //注意:这是接收人ID
            ref.setLastFlag(true);
            ref.setReadFlag(false);
            messageService.insertRef(ref);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/917827
推荐阅读
相关标签
  

闽ICP备14008679号