当前位置:   article > 正文

RabbitMQ消息模型及springboot整合RabbitMQ_springboot rabbitmq 自定义消息类型

springboot rabbitmq 自定义消息类型

什么场景下会用到RabbitMQ  数据库与ES搜索库 同步时

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同

 

常用为五种 

1、simple-简单消息模型

 (发送者,上游)producer------------->queue----------------->|多consumer|(消费,下游)

手动ack:
            channel.basicConsume(, false, );//不自动akc

            try{
                .......
                channel.basicAck(envelope.getDeliveryTag(),false);//手动akc
            }catch(){
            }
        好处:保证消费者把消息成功消费(理论)

2、work-工作消息模型(当有一个消息中间件 建议用)

 producer------------->queue----------------->|多consumer|

  能者多劳:
            channel.basicQos(1);
        如何防止消息堆积:多consumer + 能者多劳
 

3、fanout-广播消息模型

producer--------->exchange-----(绑定)------>|多queue|----------------->|多consumer|

注意:exchange只负责分发消息,若没有queue绑定则会把消息丢弃

4、direct-定向消息模型

                             routingkey
        producer--------->exchange------(绑定)----->|多queue|----------------->|多consumer|

routingkey:灵活分发消息

5、topic-定向消息模型(当有多个消息中间件  建议用(含有模糊匹配))

                          *.routingkey.#
        producer--------->exchange------(绑定)----->|多queue|----------------->|多consumer|

        *:匹配一个字符
        #:匹配多个字符

持久化


    1、队列
        channel.queueDeclare(, true, , , );
    2、交换器
        channel.exchangeDeclare(, , true);
    3、消息
        channel.basicPublish(, , MessageProperties.PERSISTENT_TEXT_PLAIN, );

    如何保证消息成功消费?
        1、rabbitmq会宕机?
            持久化
        2、consumer业务会处理失败?
            手动akc:只有ack后消息才会删除

springboot整合rabbitmq

 1、pom.xml
        spring-boot-starter-amqp
    2、application.yml
        spring:
          rabbitmq:
            host: 192.168.145.136  (rebbitmq 虚拟机地址)
            port: 5672
            username: admin
            password: 1111 
            virtual-host: / (库名)
    3、接收者
        @Component
        public class Recver{

            @RabbitListener(bindings = {@QueueBinding(
                   value = @Queue(name = "springboot_queue",durable = "true"),
                   exchange = @Exchange(name = "spring_exchange", type = ExchangeTypes.TOPIC),
                   key={"*.*"}
            )})
            public void listen(String msg){

            }
        }
    4、发送者
        @Autowired
        private AmqpTemplate amqpTemplate;

        amqpTemplate.convertAndSend(EXCHANGE_NAME, ROUTINGKEY, msg);

RebbitMQ 工具类

抽取一个建立RabbitMQ连接的工具类,方便其他程序获取连接

  1. package com.bjpowernode.util;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. public class ConnectionUtil {
  5. /**
  6. * 建立与RabbitMQ的连接
  7. * @return
  8. * @throws Exception
  9. */
  10. public static Connection getConnection() throws Exception {
  11. //定义连接工厂
  12. ConnectionFactory factory = new ConnectionFactory();
  13. //设置服务地址
  14. factory.setHost("192.168.145.136 ");
  15. //端口
  16. factory.setPort(5672);
  17. //设置账号信息,用户名、密码、vhost
  18. factory.setUsername("admin");
  19. factory.setPassword("1111");
  20. factory.setVirtualHost("/");
  21. // 通过工程获取连接
  22. Connection connection = factory.newConnection();
  23. return connection;
  24. }
  25. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/430680
推荐阅读
相关标签
  

闽ICP备14008679号