当前位置:   article > 正文

rabbitmq五种模式详解(含实现详情代码)_rabbitmq多个项目如何区分

rabbitmq多个项目如何区分

rabbitMQ的结构以及每个组件的作用

在这里插入图片描述

简介:

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

rabbitMQ的模式

下面介绍常用的五大模式:

首先建工程引入相关的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aaa</groupId>
    <artifactId>rabbit-parent</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <!--子模块-->
    <modules>
        <module>consumer</module>
        <module>product</module>
    </modules>
    <dependencies>
        <!--引入rabbitMQ的依赖-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

工程图浏览:

consumer和product都是子项目。

在这里插入图片描述

1.简单模式

在这里插入图片描述

从上图可以看到一共三个角色
p(product):代表的是生产者,用来生产消息的。
红色的中间部分(queue):代表的是一个队列,是用来存储消息和发送消息的。
c(consumer):代表的是消费者,用来消费消息的。
应用场景:    
    手机短信,邮件单发
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生产者的代码:

package com.aaa.myf;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Product {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。
         * boolean exclusive, 是否独占 false
         * boolean autoDelete, 是否自动删除  如果长时间没有发生消息 则自动删除
         * Map<String, Object> arguments 额外参数  先给null
         *//*"qy129_shuaifei",true,false,false,null*/

            channel.queueDeclare("qy129_shuaifei",true,false,false,null);
            String msg="毛云飞不是最帅的么!!!!!!!";
         /**
             * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认
             * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称
             * BasicProperties props,消息的一些额外配置,目前先不加null
             * byte[] body 消息的内容
             */
            channel.basicPublish("","qy129_shuaifei",null,msg.getBytes());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

消费者代码

package com.aaa.myf;
import com.rabbitmq.client.*;

import java.io.IOException;
public class Consumer {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //body接受的消息
                System.out.println("消息的内容:"+new String(body));

            }
        };
       channel.basicConsume("qy129_shuaifei",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2.工作者模式

在这里插入图片描述

特点:
一个生产者
一个中间队列
多个消费者
是一对多的关系
消费者之间的关系是竞争关系
用处及应用场景:
   比如批量处理上,rabbitmq里面挤压了大量的信息
   抢红包
   资源分配系统
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

生产者:

package com.aaa.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 工作者模式:
 * 特点:一个生产者
 * 多个消费者
 * 统一一个队列
 * 这些消费者之间存在竞争关系
 */
public class Product {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。
         * boolean exclusive, 是否独占 false
         * boolean autoDelete, 是否自动删除  如果长时间没有发生消息 则自动删除
         * Map<String, Object> arguments 额外参数  先给null
         *//*"qy129_shuaifei",true,false,false,null*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        channel.queueDeclare("qy129_work",true,false,false,null);
        for (int i = 0; i < 10; i++) {

            String msg="今天天气不错,加油!!!!!!!";
            /**
             * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认
             * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称
             * BasicProperties props,消息的一些额外配置,目前先不加null
             * byte[] body 消息的内容
             */
            channel.basicPublish("","qy129_work",null,msg.getBytes());
        }
        //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态
        channel.close();
        connection.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

消费者1号:

package com.aaa.work;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                try {
                    //加上休眠时间是为了区分两个消费者的区别
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //body接受的消息
                System.out.println("消息者01:"+new String(body));

            }
        };
       channel.basicConsume("qy129_work",true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

消费者2号

package com.aaa.work;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer02 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //body接受的消息
                System.out.println("消息者02:"+new String(body));

            }
        };
       channel.basicConsume("qy129_work",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

3.发布订阅模式

在这里插入图片描述

特点:
一个生产者
交换机转发消息
多个个队列
多个消费者
应用场景:
	消息推送,广告
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

生产者:

package com.aaa.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 订阅者模式:
 * 一个生产者
 * 多个消费者
 * 多个订阅
 * 交换机转发消息
 */
public class Product {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。
         * boolean exclusive, 是否独占 false
         * boolean autoDelete, 是否自动删除  如果长时间没有发生消息 则自动删除
         * Map<String, Object> arguments 额外参数  先给null
         *//*"qy129_shuaifei",true,false,false,null*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        channel.queueDeclare("qy129_fanout01",true,false,false,null);
        channel.queueDeclare("qy129_fanout02",true,false,false,null);
        /**
         * 创建交换机
         * String exchange,交换机的名称
         * BuiltinExchangeType type,交换机的类型
         * boolean durable是否持久化
         */
        channel.exchangeDeclare("qy129_angermao", BuiltinExchangeType.FANOUT,true);
        /**
         * String queue,队列名
         * String exchange,交换机的名称
         * String routingKey;路由key,如果交换机为fanout模式则不需要路由key
         */
        channel.queueBind("qy129_fanout01","qy129_angermao","");
        channel.queueBind("qy129_fanout02","qy129_angermao","");
        for (int i = 0; i < 10; i++) {
            String msg="今天天气不错,加油!!!!!!!"+i;
            /**
             * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认
             * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称
             * BasicProperties props,消息的一些额外配置,目前先不加null
             * byte[] body 消息的内容
             */
            channel.basicPublish("qy129_angermao","",null,msg.getBytes());
        }
        //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态
        channel.close();
        connection.close();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

消费者01:

package com.aaa.fanout;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                try {
                    //加上休眠时间是为了区分两个消费者的区别
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //body接受的消息
                System.out.println("消息者01:"+new String(body));

            }
        };
       channel.basicConsume("qy129_fanout01",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

消费者02:

package com.aaa.fanout;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer02 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //body接受的消息
                System.out.println("消息者02:"+new String(body));

            }
        };
       channel.basicConsume("qy129_fanout02",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

4.路由模式

在这里插入图片描述

特点:
一个生产者
交换机 转发消息
有多个队列
routekey:路由key routekey匹配的消息可以达到对应的序列
应用场景:
   短信,聊天工具,邮箱。。
   手机号/邮箱地址,都可以是路由key
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

生产者:

package com.aaa.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 路由模式:
 * 一个生产者
 * 多个消费者
 * 多个队列
 * 交换机转发消息
 * routekey:路由key只要routekey匹配的消息可以
 */
public class Product {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。
         * boolean exclusive, 是否独占 false
         * boolean autoDelete, 是否自动删除  如果长时间没有发生消息 则自动删除
         * Map<String, Object> arguments 额外参数  先给null
         *//*"qy129_shuaifei",true,false,false,null*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        channel.queueDeclare("qy129_direct01",true,false,false,null);
        channel.queueDeclare("qy129_direct02",true,false,false,null);
        /**
         * 创建交换机
         * String exchange,交换机的名称
         * BuiltinExchangeType type,交换机的类型
         * boolean durable是否持久化
         */
        channel.exchangeDeclare("qy129_direct", BuiltinExchangeType.FANOUT,true);
        /**
         * String queue,队列名
         * String exchange,交换机的名称
         * String routingKey;路由key,如果交换机为fanout模式则不需要路由key
         */
        channel.queueBind("qy129_direct01","qy129_direct","error");

        channel.queueBind("qy129_direct02","qy129_direct","info");
        channel.queueBind("qy129_direct02","qy129_direct","error");
        channel.queueBind("qy129_direct02","qy129_direct","warning");
        for (int i = 0; i < 10; i++) {
            String msg="今天天气不错,加油!!!!!!!"+i;
            /**
             * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认
             * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称
             * BasicProperties props,消息的一些额外配置,目前先不加null
             * byte[] body 消息的内容
             */
            channel.basicPublish("qy129_direct","error",null,msg.getBytes());
        }
        //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态
        channel.close();
        connection.close();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

消费者01:

package com.aaa.direct;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                try {
                    //加上休眠时间是为了区分两个消费者的区别
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //body接受的消息
                System.out.println("消息者01:"+new String(body));

            }
        };
       channel.basicConsume("qy129_direct01",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

消费者02:

package com.aaa.direct;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer02 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //body接受的消息
                System.out.println("消息者02:"+new String(body));

            }
        };
       channel.basicConsume("qy129_direct02",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

5.topic主体模式

在这里插入图片描述

绑定按照统配符的模式:
 *:匹配一个单词
 #:统配n个字符
应用场景:
	做物流分拣的多级传递.
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生产者:

package com.aaa.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * topic主体模式:
 * 一个生产者
 * 多个消费者
 * 绑定按照通配符的模式
 * *:统配一个单词
 * #:统配n个单词
 */
public class Product {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 是否该队列持久化 rabbitMQ服务重启后该存放是否存在。
         * boolean exclusive, 是否独占 false
         * boolean autoDelete, 是否自动删除  如果长时间没有发生消息 则自动删除
         * Map<String, Object> arguments 额外参数  先给null
         *//*"qy129_shuaifei",true,false,false,null*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        /*channel.queueDeclare("qy129_shuaifei",true,false,false,null);*/
        channel.queueDeclare("qy129_topic01",true,false,false,null);
        channel.queueDeclare("qy129_topic02",true,false,false,null);
        /**
         * 创建交换机
         * String exchange,交换机的名称
         * BuiltinExchangeType type,交换机的类型
         * boolean durable是否持久化
         */
        channel.exchangeDeclare("qy129_exchange_topic", BuiltinExchangeType.TOPIC,true);
        /**
         * String queue,队列名
         * String exchange,交换机的名称
         * String routingKey;路由key,如果交换机为fanout模式则不需要路由key
         */
        channel.queueBind("qy129_topic01","qy129_exchange_topic","*.orange.*");

        channel.queueBind("qy129_topic02","qy129_exchange_topic","*.*.rabbit");
        channel.queueBind("qy129_topic02","qy129_exchange_topic","lazy.#");


        for (int i = 0; i < 10; i++) {
            String msg="今天很生气,啥也不想说,明天加油,依旧光芒万丈!!!!!!!"+i;
            /**
             * String exchange,交换机的名称,如果没有则使用"",它会自动采用默认
             * String routingKey,路由key,如果没有交换机的绑定,使用队列的名称
             * BasicProperties props,消息的一些额外配置,目前先不加null
             * byte[] body 消息的内容
             */
            channel.basicPublish("qy129_exchange_topic","lazy.orange.rabbit",null,msg.getBytes());
        }
        //生产者这里可以管理资源,消费者不能关闭资源,他一直处于一个监听的状态
        channel.close();
        connection.close();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

消费者01:

package com.aaa.topic;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                try {
                    //加上休眠时间是为了区分两个消费者的区别
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //body接受的消息
                System.out.println("消息者01:"+new String(body));

            }
        };
       channel.basicConsume("qy129_topic01",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

消费者02:

package com.aaa.topic;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer02 {
    public static void main(String[] args) throws Exception{
        //创建连接对象-配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.49.197");
        //创建连接对象Connection
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //接受信息
        /**
         * (String queue, 队列的名称
         *  boolean autoAck, 是否自动确认
         *  Consumer callback: 回调方法 当队列中存在信息后 会自动触发回调函数。
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //body接受的消息
                System.out.println("消息者02:"+new String(body));

            }
        };
       channel.basicConsume("qy129_topic02",true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/594678
推荐阅读
相关标签
  

闽ICP备14008679号