当前位置:   article > 正文

RabbitMQ_15671 rabbitmq

15671 rabbitmq

RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛
消息中间件
           1.提升异步通信,扩展解耦
           2.消息代理和目的地
           3.目的地形式:队列(queue),主题(topic)
           4.点对点式:一个发送者一个接收者
           5.发布订阅:发布者发送消息,多个接收者监听,同时收到消息
           6.JMS   java消息服务:ActiveMQ和HornetMQ是这种模式,如果不跨语言可以使用此服务
           7.AMQP  高级消息队列协议:RabbitMQ,可跨语言

 RabbitMQ:
        核心概念:
            Message     消息:由消息头和体组成,消息头由一系列可选属性组成,routing-key(路由键),priority(优先权)等
            Publisher   消息生产者,交换器发布消息的客户端
            Exchange    交换器,接收消息并路由到服务器队列
            Queue       消息队列
            Binding     绑定,用于消息队列和交换器之间的关系
            Connection  网络连接
            Channel     信道,建立一条独立双向数据流通道,建立TCP通信开销大,所有开辟信道,复用一条TCP连接
            Consumer    消费者,取得消息的客户端
            Virtual Host虚拟主机,一批交换器和消息队列和相关对象
            Broker      消息队列服务器实体
        安装:docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
        整合RabbitMQ
            1.引入pom文件spring-boot-starter-amaq
            2.application.yml配置
            3.测试RabbitMQ
                1.AamqpAdmin:管理组件
                2.RabbitTemplate:消息发送处理组件
            4.RabbitMQ消息确认机制-可靠抵达
                1.publisher  confirmCallback  确认模式
                2.publisher  returnCallback   未投递到queue退回模式
                3.consumer   ack机制,手动配置
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

yml配置

RabbitMQ配置
spring.rabbitmq.host=192.168.77.130
spring.rabbitmq.port=5672
 虚拟主机配置
spring.rabbitmq.virtual-host=/
 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
#只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
#手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

config配置

@Configuration
	public class MyRabbitConfig {

    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

   
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }}

	



```java
在这里插入代码片
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
@Configuration
	public class MyRabbitMQConfig {

    /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

    /**
     * 死信队列
     *
     * @return
     */@Bean
    public Queue orderDelayQueue() {
        /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);

        return queue;
    }

    /**
     * 普通队列
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    }


    @Bean
    public Exchange orderEventExchange() {

        return new TopicExchange("order-event-exchange", true, false);

    }


    @Bean
    public Binding orderCreateBinding() {

        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    @Bean
    public Binding orderReleaseBinding() {

        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

    /**
     * 订单释放直接和库存释放进行绑定
     * @return
     */
    @Bean
    public Binding orderReleaseOtherBinding() {

        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    }


 
    @Bean
    public Queue orderSecKillOrrderQueue() {
        Queue queue = new Queue("order.seckill.order.queue", true, false, false);
        return queue;
    }

    @Bean
    public Binding orderSecKillOrrderQueueBinding() {
 
        Binding binding = new Binding(
                "order.seckill.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.seckill.order",
                null);

        return binding;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103

## RabbitMQ延时队列(实现定时任务)
	支付宝未付款,超过一定时间后,系统自动取消并释放占有物品
	常见解决方案:spring的schedule定时任务轮询数据库
	缺点:消耗内存,增加数据库压力     解决:rabbitMQ的消息TTL和死信Exchange结合
	TTL就是消息的过期时间 ,MQ可以对队列和消息设置TTL,超过时间消息就死了称为死信   
            延时队列步骤就是先将消息放进队列设置30分钟过期,过期后消息会被丢进死信路由,死信路由再放进另一个队列,而微服务监听的就是这个队列
            创建一个配置类,类创建两个队列queue,一个交换机exchange,两个绑定关系
            消息丢失,积压,重复解决方案:
                消息丢失:db创建一个表,每个消息做好日志记录,保存消息的详细信息,定期扫描数据库将失败的消息再发送一遍,设法消息重试发送     
                         Broker尚未持久化完成,死机:publisher必须加入确认回调机制,确认成功的消息,修改数据库消息状态
                         自动ACK状态下,消费者收到消息,但没来得及消费:手动ACK,没消费成功就重新入队
                消息重复:消息消费成功,但没返回ack,导致重新发送等各种情况:设计为幂等性,防重表,唯一标识。判断业务修改状态等
                消息积压:消费者死机没消费等:上线更多的消费者或上线专门队列消费服务,将消息都取出来,记录数据库,离线慢慢处理
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/558370
推荐阅读
相关标签
  

闽ICP备14008679号