当前位置:   article > 正文

RabbitMQ的使用_rabbitmq x q r

rabbitmq x q r

1、MQ是什么

MQ: Message Quene	消息队列
  • 1
队列: 一种数据结构,先进先出
  • 1
消息队列: 本质是个队列,只不过存放的是消息而已
  • 1
消息中间件:	简单来说就是用来传输消息的中间载体
  • 1

MQ的作用:

  • 流量消峰(解决高并发)
  • 模块之间的异步通信

MQ的框架很多,比如ActiveMQ,RabbitMQ,RocketMQ,kafka,而我们使用的就是RabbitMQ

2、RabbitMQ的安装(Linux)

安装环境

yum install epel-release
yum install erlang
  • 1
  • 2

安装RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm

yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm
  • 1
  • 2
  • 3

设置为开机启动

systemctl enable rabbitmq-server.service
  • 1

查看服务状态

systemctl status rabbitmq-server.service
  • 1

启动服务

systemctl start rabbitmq-server.service
  • 1

停止服务

systemctl stop rabbitmq-server.service
  • 1

重启服务

systemctl restart rabbitmq-server.service
  • 1

查看当前所有的用户

 rabbitmqctl list_users
  • 1

查看guest用户所有拥有的权限

rabbitmqctl list_user_permissions guest
  • 1

删除guest用户

rabbitmqctl delete_user guest
  • 1

添加一个新的用户

rabbitmqctl add_user xbb 123456
  • 1

给用户设置角色

rabbitmqctl set_user_tags xbb administrator
  • 1

给用户赋予权限

  rabbitmqctl set_permissions -p / xbb ".*" ".*" ".*"
  • 1

开启web的管理界面

rabbitmq-plugins enable rabbitmq_management
  • 1

3、RabbitMQ中的五种消息模型

1、helloworld模型

在这里插入图片描述
P:生产者
C:消费者
红色代表队列
生产者将消息发送到队列,消费者从队列获取消息
1.导入依赖

 <!--导入RabbitMQ的相关的包-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.5.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.获取MQ的连接

public class ConnectionUtils {

    /**
     * 获取连接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {

        //申明连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接主机
        connectionFactory.setHost("114.55.219.117");
        //设置虚拟机
        connectionFactory.setVirtualHost("/");
        //设置访问的用户名
        connectionFactory.setUsername("xbb");
        //设置密码
        connectionFactory.setPassword("123456");
        //设置请求的端口
        connectionFactory.setPort(5672);
        
        return connectionFactory.newConnection();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3.生产者发送消息到队列

public class Producer {

    private static final String QUENE_NAME="helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建数据传输的通道
        Channel channel = connection.createChannel();
        //申明队列
        /**
         * 第一个参数:队列的名字
         * 第二个参数:是否持久化,发送到队列的消息,如果没有持久化重启会丢失
         * 第三个参数:是否排外
         *      1:连接关闭之后,这个队列是否自动删除
         *      2:是否允许其他通道来进行访问
         *  第四个参数:是否允许自动删除
         *  第五个参数:申明队列的时候附带的一些参数
         */
        channel.queueDeclare(QUENE_NAME,false,false,false,null);
        //发送数据到队列
        /**
         * 第一个参数:exchange   交换机
         * 第二个参数:路由的key,没有就使用队列的名字
         * 第三个参数:发送数据到队列时携带的参数
         * 第四个参数:向队列发送的数据
         */
        channel.basicPublish("",QUENE_NAME,null,"helloworld".getBytes());
        //释放资源
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

在这里插入图片描述
4.消费者从队列获取消息

public class Consumer {

    private static final String QUEUE_NAME="helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag   消息的唯一标识
             * @param envelope  请求消息属性的封装
             * @param properties    前面队列携带过来的值
             * @param body  接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是:"+new String(body));
                //进行手动应答
                /**
                 * 第一个参数:自动应答
                 * 第二个参数:false表示收到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //绑定消费者
        /**
         * 第一个参数:队列名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
2、work模型

在这里插入图片描述
多个消费者消费的数据之和才是原来队列中的所有数据 适用于流量的消峰
消费者1:

public class Consumer1 {

    private static final String QUEUE_NAME="work";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag   消息的唯一标识
             * @param envelope  请求消息属性的封装
             * @param properties    前面队列携带过来的值
             * @param body  接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接受到的消息是:"+new String(body));
                //进行手动应答
                /**
                 * 第一个参数:自动应答的标记
                 * 第二个参数:false表示收到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //绑定消费者
        /**
         * 第一个参数:队列名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

消费者2:

public class Consumer2 {

    private static final String QUEUE_NAME="work";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag   消息的唯一标识
             * @param envelope  请求消息属性的封装
             * @param properties    前面队列携带过来的值
             * @param body  接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接受到的消息是:"+new String(body));
                //进行手动应答
                /**
                 * 第一个参数:自动应答
                 * 第二个参数:false表示收到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //绑定消费者
        /**
         * 第一个参数:队列名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

生产者:

public class Producer {

    private static final String QUEUE_NAME="work";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        for (int i=0;i<100;i++){
            channel.basicPublish("",QUEUE_NAME,null,(""+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
3、发布订阅模式

在这里插入图片描述
生产者没有将消息发送到队列而是发送到交换机,每个消费者都有自己的队列,每个队列都要绑定到交换机,消费者获取到生产者发送的信息是完整的
消费者1:

public class Consumer1 {

    private static final String QUEUE_NAME="work1";

    private static final String EXCHANGE_NAME="fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        /**将队列绑定到交换机
         * 第一个参数:队列的名字
         * 第二个参数:交换机的名字
         * 第三个参数:路由的key
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接受到的消息是:"+new String(body));
                //进行手动应答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

消费者2:

public class Consumer2 {

    private static final String QUEUE_NAME="work2";

    private static final String EXCHANGE_NAME="fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        /**将队列绑定到交换机
         * 第一个参数:队列的名字
         * 第二个参数:交换机的名字
         * 第三个参数:路由的key
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接受到的消息是:"+new String(body));
                //进行手动应答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

生产者:

public class publish {

    //交换机的名字
    private static final String EXCHANGE_NAME="fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //申明交换机
        /**
         * 第一个参数:交换机的名字
         * 第二个参数:交换机的类型
         *      如果使用的是发布订阅模式:只能写fanout
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //发送消息到交换机
        for (int i = 0; i<100;i++){
            channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模式的值"+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
4、路由模型

在这里插入图片描述
生产者:

public class Producer {

    private static final String EXCHANGE_NAME="test_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //如果是路由模型,第二个参数必须为direct
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //发送消息到交换机
        for (int i = 0; i < 100; i++) {
            if(i%2==0){
                //将偶数项的消息发送到路由为hello的队列
                channel.basicPublish(EXCHANGE_NAME,"hello",null,(EXCHANGE_NAME+i).getBytes());
            }else {
                //将奇数项的消息发送到路由为world的队列
                channel.basicPublish(EXCHANGE_NAME,"world",null,(EXCHANGE_NAME+i).getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

消费者1:

public class Consumer1 {

    private static final String QUEUE_NAME="direct_queue_1";

    private static final String EXCHANGE_NAME="test_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //绑定队列到交换机
        //第二个参数为路由的key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"hello");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由hello的队列接受的消息为"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

消费者2:

public class Consumer2 {

    private static final String QUEUE_NAME="direct_queue_2";

    private static final String EXCHANGE_NAME="test_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //绑定队列到交换机
        //第二个参数为路由的key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"world");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由world的队列接受的消息为"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5、主题模式(topic)

在这里插入图片描述
topic模式相当于是对 路由模式的一个升级 topic模式主要就是在匹配的规则上可以实现模糊匹配

*:只能匹配一个单词	  如:user.*	user.username
#:可以匹配一个到多个单词	如:user.#	user.username.xbb
  • 1
  • 2

生产者:

public class Producer {

    private static final String QUEUE_NAME="topic_queue1";

    private static final String EXCHANE_NAME="test_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //交换机
        channel.exchangeDeclare(EXCHANE_NAME,"topic");
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANE_NAME,"hello.world",null,("topic模型"+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费者1:

public class Consumer1 {
    private static final String QUEUE_NAME="topic_hello";

    private static final String EXCHANGE_NAME="test_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"hello.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由hello接受的数据:"+new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

消费者2:

public class Consumer2 {
    private static final String QUEUE_NAME="topic_world";

    private static final String EXCHANGE_NAME="test_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.world");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由world接受的数据:"+new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

4、RabbitMQ中的高级属性

1、confirm机制

放到队列中的消息,怎么保证一定成功的放入队列了呢?
confirm机制:只要消息放入队列成功,那么队列就一定会给反馈
生产者:

public class Producer {
    private static final String QUEUE_NAME="test_confirm";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //开启confirm消息机制
        channel.confirmSelect();
        //对消息实施监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                System.out.println("消息发送成功的监听");
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("消息发送失败的监听");
            }
        });

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicPublish("",QUEUE_NAME,null,"hello world".getBytes());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
2、return机制

生产者发送消息的时候,如果交换机不存在,或者路由的key不存在,这时候就需要监听这种到达不了的消息。
注意:当前的队列中必须要有消费者存在
生产者:

public class Producer {

    private static final String EXCHANGE_NAME="test_return";
    //正确的路由的key
    private static final String ROUTING_KEY="return.hello";
    //错误的路由的key
    private static final String ROUTING_ERROR_KEY="world";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.addReturnListener(new ReturnListener() {
            /**
             *
             * @param i 队列相应给浏览器的状态码
             * @param s 状态码对于的文本信息
             * @param s1    交换机的名字
             * @param s2    路由的key
             * @param basicProperties   消息的属性
             * @param bytes 消息体的内容
             * @throws IOException
             */
            @Override
            public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("监听到不可达的消息");
                System.out.println("状态码"+i);
                System.out.println("文本信息"+s);
                System.out.println("交换机名字"+s1);
                System.out.println("路由的key"+s2);

            }
        });
        //这里的第三个参数如果设置为true,表示要监听不可达的消息进行处理
        //如果设置为false,那么队列会直接删除这个消息
        channel.basicPublish(EXCHANGE_NAME,ROUTING_ERROR_KEY,true,null,"厕所return机制".getBytes());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

消费者:

public class Consumer {
    //交换机的名字
    private static final String EXCHANE_NAME="test_return";
    //队列的名字
    private static final String QUEUE_NAME="queue_return";
    //路由的key
    private static final String ROUTING_NAME="*.hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANE_NAME,"topic");
        //绑定
        channel.queueBind(QUEUE_NAME,EXCHANE_NAME,ROUTING_NAME);
        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息了");
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

在这里插入图片描述

3、消费端的限流问题

假设消费者挂了,消息全部堆积到队列里面,然后当消费者重新启动时,队列里的消息就全部发送过来,但是客户端没办法同时去处理那么多的消息
这种场景下就需要对消费者进行限流
生产者:

public class Producer {
    private static final String QUEUE_NAME="test_limit";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish("",QUEUE_NAME,null,(i+"元钱").getBytes());
        }
        channel.close();
        connection.close();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

限流的消费者:

public class Consumer1 {
    private static final String QUEUE_NAME="test_limit";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        /**
         * 开启限流
         * 第一个参数:消息本身的大小,如果设置为0,那么表示对消息大小不限制
         * 第二个参数:一次性推送消息的最大数量,前提消息必须手动应答完成
         * 第三个参数:true:将设置应用到通道  false:只是当前消费者的策略
         */
        channel.basicQos(0,5,false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到了"+new String(body)+"元钱");
                try {
                    Thread.sleep(100);
                } catch (Exception e){

                }
                //进行手动应答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //使用限流必须手动应答
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

消费者2:

public class Consumer2 {
    private static final String QUEUE_NAME="test_limit";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到了"+new String(body)+"元钱");
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
4、ttl队列

给队列中的消息添加时间限制,如果超时这个消息会被删除
生产者:

public class Producer {
    private static final String QUEUE_NAME="test_ttl";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //ttl队列
        Map<String,Object> map = new HashMap<>();
        //设置5秒的过期时间
        map.put("x-message-ttl",5000);

        channel.queueDeclare(QUEUE_NAME,false,false,false,map);
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish("",QUEUE_NAME,null,(i+"元钱").getBytes());
        }
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
5、死信队列

什么是死信队列

当消息在队列中变成死信之后、可以定义它重新push 到另外一个交换机上、这个交换机 也有自己对应的队列 这个队列就称为死信队列
满足死信的条件:

  • 发送到队列中的消息被拒绝了
  • 消息的ttl时间过期
  • 队列达到了最大长度 再往里面放信息

当这个队列中如果有这个死信的时候、rabbitmq就会将这个消息自动发送到我们提前定义好的死信队列中去

​生产者:

public class Producer {
    private static final String EXCHANGE_NAME="ttl-dlx-exchange";

    private static final String ROUTING_KEY="dlx.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,false,null,"nihaoa".getBytes());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

​ 消费者:

public class Consumer {
    //交换机名字
    private static final String EXCHANGE_NAME="ttl-dlx-exchange";
    //队列名
    private static final String QUEUE_NAME="ttl-dlx-queue";
    //死信队列
    private static final String DLX_QUEUE_NAME="dlx-queue";
    //死信交换机
    private static final String DLX_EXCHANGE_NAME="dlx-exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //申明队列
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000);
        map.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        channel.queueDeclare(QUEUE_NAME, false, false, false, map);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dlx.#");


        //绑定死信队列
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, "topic");
        channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
        channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, "#");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到:" + new String(body));
            }
        };
        channel.basicConsume(DLX_QUEUE_NAME, true, defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
6、消费者端的手动签收和消息重回队列

消费者除了手动签收应答还可以拒绝接受消息,让消息重回队列
生产者:

public class Producer {

    private static final String QUEUE_NAME="helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建数据传输的通道
        Channel channel = connection.createChannel();
        //申明队列
        /**
         * 第一个参数:队列的名字
         * 第二个参数:是否持久化,发送到队列的消息,如果没有持久化重启会丢失
         * 第三个参数:是否排外
         *      1:连接关闭之后,这个队列是否自动删除
         *      2:是否允许其他通道来进行访问
         *  第四个参数:是否允许自动删除
         *  第五个参数:申明队列的时候附带的一些参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送数据到队列
        /**
         * 第一个参数:exchange   交换机
         * 第二个参数:路由的key,没有就使用队列的名字
         * 第三个参数:发送数据到队列时携带的参数
         * 第四个参数:向队列发送的数据
         */
        channel.basicPublish("",QUEUE_NAME,null,"helloworld".getBytes());
        //释放资源
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

消费者:

public class Consumer {

    private static final String QUEUE_NAME="helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag   消息的唯一标识
             * @param envelope  请求消息属性的封装
             * @param properties    前面队列携带过来的值
             * @param body  接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是:"+new String(body));

                /**
                 * 第一个参数:自动应答
                 * 第二个参数:false表示收到消息了
                 * 第三个参数:表示决绝签收之后这个消息是否要重回队列?
                 */
                channel.basicNack(envelope.getDeliveryTag(),false,true);
            }
        };

        //绑定消费者
        /**
         * 第一个参数:队列名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
7、保证消息的投递是成功的

1.消息的延迟投递来解决传递的可靠性
在这里插入图片描述
2.日志消息表实现可靠消息的传输
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/808708
推荐阅读
相关标签
  

闽ICP备14008679号