当前位置:   article > 正文

厚积薄发打卡Day92: MongoDB + RabbitMQ 构建消息推送功能(下)<RabbitMQ简易入门与整合>_rocketmq持久化到mogodb

rocketmq持久化到mogodb

产品选用

消息队列产品有很多,比如说常见的有Kafka、RocketMQ、RabbitMQ和ActiveMQ:

  • 其中了解后Kafka的性能是最好的,并发量比较大,而且消息收发的速度也非常快。但是消息收发的可靠性上,Kafka不如RabbitMQ,而且技术选型的时候执行速度并不是唯一标准,开发效率、易用性也是需要考虑的。
  • RabbitMQ具有支持消息异步收发,又支持同步收发的特点,虽然现在大部分的场景对应的是消息异步收发,但是有的场合要支持消息的同步收发,这时候RabbitMQ能适应各种业务场景的优点就显现出来了。所以在顾及程序健壮性时,选用了RabbitMQ。

RabbitMQ在系统中的作用(削峰填谷

  • 通过把消息发送到消息队列上,等用户登录时再把信息获从消息队列中取出,存入mongoDB中实现

RabbitMQ快速入门

  1. 安装,通过docker即可快速安装

    1. 拉取镜像:

      docker pull rabbitmq:3.7.7-management
      
      • 1
    2. 启动:

      根据下载的镜像创建和启动容器

      docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
      
      • 1

      说明:

      • -d 后台运行容器;
      • –name 指定容器名;
      • -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
      • -v 映射目录或文件;
      • –hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
      • -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
    3. 查看,启动成功:

      在这里插入图片描述

  2. 五种队列模式

    具体例子:RabbitMQ 详解 五种队列-SpiritMark_liu - 云+社区 - 腾讯云 (tencent.com)

    在此次消息模块的功能使用(topic)主题模式实现

    1. 简单模式

      一个生产者(发送方)对应一个消费者(接收方)

      在这里插入图片描述

    2. Work模式

      一个生产者对应多个消费者,但是只能有一个消费者获得消息(排他)

      在这里插入图片描述

    3. 发布/订阅模式

      一个消费者将消息首先发送到fanout交换器,交换器绑定到多个队列,然后与之对应的所有消费者都能接收到消息(不排他)

      在这里插入图片描述

    4. 路由模式

    生产者将消息发送到direct交换器,交换器按照关键字(Key),把消息路由到某个队列

    在这里插入图片描述

    1. 主题模式(√)

      生产者将消息发送到Topic交换器,交换器按照复杂的规则,把消息路由到某个队列

      在这里插入图片描述

  3. 消息持久化

    消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。除了消息持久化之外,甚至交换器和队列都能持久化。

  4. 消息过期时间

    默认情况下,消息是无限期存储在RabbitMQ上面的,但是我们可以设置消息过期时间,到期之后无论该消息是否已经被接收,都会被RabbitMQ删除。

  5. Ack应答

    消费者接收消息之后,必须返回一个Ack应答,那么RabbitMQ才会认为这条消息接收成功。如果想要删除这条消息,消费者发送Ack应答的时候,附带一个deliveryTag标志位就可以了。

功能整合

  1. pom.xml文件中添加RabbitMQ的依赖库

    <dependency>
    	<groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  2. 使用我们采用【 异步线程同步收发消息】实现消息推送功能:

    1. 线程执行分为同步和异步,MQ消息收发的API也分为异步和同步;现在是线程异步执行,调用MQ同步收发API
    2. MQ的异步准确的说是阻塞式的收发消息,没收到消息也不退出,一直在后端运行处于阻塞状态,一直等待有新消息的到来。类比Servlet,它也是阻塞执行的。如果我们采用了异步收发消息,迟迟不退出线程,那么线程就没办法回收到线程池,所以线程池很快被耗光。所以在系统中采用的是RabbitMQ自带的同步收发消息方式。
    3. 使用MQ同步执行,收完消息就退出,没有消息,接收程序也不退出,
  3. 异步收发配置yaml文件,同步收发则需配置ConnectionFactory对象:

    @Configuration
    public class RabbitMQConfig {
        @Bean
        public ConnectionFactory getFactory() {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("***"); //Linux主机的IP地址
            factory.setPort(5672); //RabbitMQ端口号
            return factory;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  4. 创建线程任务类:

    @Slf4j
    @Component
    public class MessageTask {
    
        @Autowired
        private ConnectionFactory factory;
    
        @Autowired
        private MessageService messageService;
    
        /**
         * 同步发送消息
         *
         * @param topic
         * @param entity
         */
        public void send(String topic, MessageEntity entity) {
            //向MongoDb保存消息数据,返回消息主键
            String id = messageService.insertMessage(entity);
            //向rabbitMQ发送消息
            try {
                //类比jdbc:创建连接,创建statement,执行sql
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                //连接到某个Topic
                channel.queueDeclare(topic, true, false, false, null);
                //存放属性数据:
                HashMap header = new HashMap();
                header.put("messageId", id);
                //创建AMQP协议参数对象,添加附加属性
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
                channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
                log.debug("消息发送成功");
            } catch (Exception e) {
                log.error("执行异常", e);
                throw new EmosException("向MQ发送消息失败");
            }
        }
    
    
        /**
         * 异步发送消息
         *
         * @param topic
         * @param entity
         */
        @Async
        //messageTask.sendAsync(userid + "", entity);
        public void sendAsync(String topic, MessageEntity entity) {
            send(topic, entity);
        }
    
    
        /**
         * 同步接收数据
         *
         * @param topic
         * @return
         */
        public int receive(String topic) {
            int i = 0;
            try {
                //创建mq连接
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                //从队列中获取消息,不自动确认
                channel.queueDeclare(topic, true, false, false, null);
                //Topic中有多少条数据位置,所以使用死循环接收数据,直到接收不到数据,退出死循环
                while (true) {
                    //创建响应接收数据,禁止自动发送ack应答
                    GetResponse response = channel.basicGet(topic, false);
                    if (response != null) {
                        AMQP.BasicProperties properties = response.getProps();
                        Map<String, Object> headers = properties.getHeaders();
                        String messageId = (String) headers.get("messageId");
                        byte[] body = response.getBody();
                        String message = new String(body);
                        log.debug("从RabbitMQ接收的消息" + message);
    
                        MessageRefEntity entity = new MessageRefEntity();
                        entity.setMessageId(messageId);
                        entity.setReceiverId(Integer.parseInt(topic));
                        entity.setReadFlag(false);
                        entity.setLastFlag(true);
                        //把消息存储在MongoDB中
                        messageService.insertRef(entity);
                        //数据保存到MongoDB后,才发送给ack应答 让topic删除这条消息
                        long deliveryTag = response.getEnvelope().getDeliveryTag();
                        channel.basicAck(deliveryTag, false);
                        i++;
                    } else {
                        break;//接收不到消息则退出循环
                    }
                }
            } catch (Exception e) {
                log.error("执行异常", e);
                throw new EmosException("获取消息出现异常");
            }
    
            return i;
        }
    
        /**
         * 异步接收数据
         *
         * @param topic
         * @return
         */
        @Async
        //异步接收消息
        //messageTask.receiveAysnc(userId + "");
        public int receiveAysnc(String topic) {
            return receive(topic);
        }
    
        /**
         * 同步删除消息队列
         *
         * @param topic 主题
         */
        public void deleteQueue(String topic) {
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDelete(topic);
                log.debug("消息队列成功删除");
            } catch (Exception e) {
                log.error("删除队列失败", e);
                throw new EmosException("删除队列失败");
            }
        }
    
        /**
         * 异步删除消息队列
         *
         * @param topic 主题
         */
        @Async
        public void deleteQueueAsync(String topic) {
            deleteQueue(topic);
        }
    }
            throw new EmosException("删除队列失败");
            }
        }
    
        /**
         * 异步删除消息队列
         *
         * @param topic 主题
         */
        @Async
        public void deleteQueueAsync(String topic) {
            deleteQueue(topic);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号