当前位置:   article > 正文

rabbitMQ_mq virture host 绑定一个队列两个虚拟机都有

mq virture host 绑定一个队列两个虚拟机都有

rabbitMQ

安装

1.手动安装

官网下载-需要手动安装Erlang环境

2.docker安装

brew install --cask --appdir=/Applications docker

$ docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3
  • 1

rabbitMq管理界面

在这里插入图片描述

什么是AMQP

全称:高级消息队列协议,由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。

消息分发策略的机制对比

ActieveMQRabbitMQkafkaRockeetMQ
发布订阅支持支持支持支持
轮询分发支持支持支持/
公平分发/支持支持/
重发支持支持/支持
消息拉取/支持支持支持

RabbitMQ核心组件

在这里插入图片描述

Connection :连接,应用程序与Broker的网络连接TCP/IP三次握手和四次握手

Channel :网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立多个Channel。每个Channel代表一个会话任务

Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机里有若干个Exchange和queue,同一个虚拟主机里面不能有相同名字的Exchange

bindings:Exchange和Queue之间的虚拟连接,bingding中可以保护多个routing key

routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息

RabbitMq支持消息的工作模式

fanout模式

发布与订阅

在这里插入图片描述

原生代码

交换机和队列在可视化界面手动绑定情况下

public class Producer {


    public static void main(String[] args) {

        //所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp

        //1.创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setPort(5672);
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        //2.创建连接Connection
        try {
            connection = factory.newConnection("生产者");
            //3.通过连接获取通道Channel
            connection.createChannel();
            //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接受消息
            String queueName = "queue1";
            /**
             * @Params1 队列名称
             * @Params2 是否要持久化durable =false 所谓持久化消息是否存盘
             * @Params3 排他性,是否是独占独立
             * @Params4 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除
             * @Params5 携带附属参数
             */
            channel.queueDeclare(queueName, false, false, false, null);
            //5.准备消息内容
            String message = "Hello kun";
            //6.准备交换机
            String exchange ="fanout-exchange";
            //7.定义路由key
            String routingkey = "";
            //8.指定交换机的类型
            String exchangeType ="direct";
            channel.basicPublish(exchange, queueName, null, message.getBytes());

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            //7.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
public class Consumer {

    public static void main(String[] args) {

        //所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp

        //1.创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setPort(5672);
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/");

        Connection connection = null;
        final Channel channel = null;
        //2.创建连接Connection
        try {
            connection = factory.newConnection("生产者");
            //3.通过连接获取通道Channel
            connection.createChannel();
            //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接受消息

            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接受消息失败");
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            //7.关闭连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭通道
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

绑定关系代码实现----原生代码

//声明交换机
channel.exchangeDeclare(exchange,exchangeType,true);
//声明队列
channel.queueDeclare("queue",true,false,false,null);
//绑定队列
channel.queueBind("queue",exchange,exchangeType);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

direct模式

路由模式

在这里插入图片描述

topic模式

主题模式

在这里插入图片描述

lazy.#. #代表0个或者一个或者多个

*.orange. * *代表至少要满足一个

#.order.#

#.user.*

work模式

轮训分发

公平分发

//指标定义出来,qos=1
xxxx.basicQos("")
  • 1
  • 2

在这里插入图片描述

RabbitMq使用场景

解耦、削峰、异步

springboot整合

引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
  • 1
  • 2
  • 3
  • 4

交换机和队列的配置绑定

fanout模式

package com.kun.order.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfiguration {

    //1.声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("direct_order_exchange", true, false);
    }

    //2.声明队列 sms.fanout.queue/email.fanout.queue
    @Bean
    public Queue smsQueue() {
        return new Queue("smsQueue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("emailQueue", true);
    }

    //3.绑定关系
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

direct 模式

//3.绑定关系
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("email");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

注解配置

package com.kun.order.topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.sms.*"
))
public class TopicEmailConsumer {


    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println(message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

生产者

package com.kun.order.service;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 模拟用户下单
     *
     * @param userId
     * @param productId
     * @param num
     */
    public void makeOrder(String userId, String productId, int num) {
        String orderId = UUID.randomUUID().toString();
        String exchangeName = "direct";
        String routingKey = "";

        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

消费者

package com.kun.order.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "email.fanout.queue")
public class FanoutEmailConsumer {


    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println(message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

TTL

过期时间TTL表示可以对消息设置预期的时间,目前有两种方法可以设置

1.通过队列属性设置,队列中所有消息都有相同的过期时间

2.通过消息进行单独设置,每条消息TTL可以不同

如果两种方法同时使用,则消息的过期时间以两者之间TTL较小的为准

TTL队列过期时间

@Bean
    public Queue smsQueue() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",5000);
        return new Queue("smsQueue", true,false,false,map);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

TTL消息过期时间

 public void makeOrder(String userId, String productId, int num) {
        String orderId = UUID.randomUUID().toString();
        String exchangeName = "direct";
        String routingKey = "";

        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };

        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId,processor);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

DLX-死信交换机

@Bean
    public Queue smsQueue() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",5000);
        //x-dead-letter-exchange  -固定写法
      	//x-dead-letter-routing-key
      	map.put("x-dead-letter-exchange","exchangeName");
      	map.put("x-dead-letter-routing-key","routingName");
        return new Queue("smsQueue", true,false,false,map);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

RabbitMq的内存控制

参考帮助文档

RabbitMq分布式事务

具体实现

可靠生产和可靠消费的示意图

在这里插入图片描述

基于MQ的分布式事务消息的可靠生产问题-定时重发

在这里插入图片描述

基于MQ的分布式事务消息的消息重发

在这里插入图片描述

@Service
public class OrderMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private JdbcTemplate jdbcTemplate;

    //@PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。
    //Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,
    // 并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行。
    @PostConstruct
    public void regCallback() {
        // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("cause:"+cause);
                // 如果ack为true代表消息已经收到
                String orderId = correlationData.getId();

                if (!ack) {
                    // 这里可能要进行其他的方式进行存储
                    System.out.println("MQ队列应答失败,orderId是:" + orderId);
                    return;
                }

                try {
                    String updatesql = "update ksd_order_message set status = 1 where order_id = ?";
                    int count = jdbcTemplate.update(updatesql, orderId);
                    if (count == 1) {
                        System.out.println("本地消息状态修改成功,消息成功投递到消息队列中...");
                    }
                } catch (Exception ex) {
                    System.out.println("本地消息状态修改失败,出现异常:" + ex.getMessage());
                }
            }
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

基于MQ分布式事务消息的死信队列消息转移+人工处理

可靠消费问题
在这里插入图片描述

如果死信队列报错就进行人工处理
在这里插入图片描述

@Service
public class DeadMqConsumer {

    @Autowired
    private DispatchService dispatchService;



    // 解决消息重试的集中方案:
    // 1: 控制重发的次数 + 死信队列
    // 2: try+catch+手动ack
    // 3: try+catch+手动ack + 死信队列处理 + 人工干预
    @RabbitListener(queues = {"dead.order.queue"})
    public void messageconsumer(String ordermsg, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            // 1:获取消息队列的消息
            System.out.println("收到MQ的消息是: " + ordermsg );
            //  2: 获取订单服务的信息
            Order order = JsonUtil.string2Obj(ordermsg, Order.class);
            // 3: 获取订单id
            String orderId = order.getOrderId();
            // 幂等性问题
            //int count = countOrderById(orderId);
            // 4:保存运单
            //if(count==0)dispatchService.dispatch(orderId);
            //if(count>0)dispatchService.updateDispatch(orderId);
             dispatchService.dispatch(orderId);
            // 3:手动ack告诉mq消息已经正常消费
            channel.basicAck(tag, false);
        } catch (Exception ex) {
            System.out.println("人工干预");
            System.out.println("发短信预警");
            System.out.println("同时把消息转移别的存储DB");
            channel.basicNack(tag, false,false);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/694334
推荐阅读
相关标签
  

闽ICP备14008679号