当前位置:   article > 正文

RabbitMQ的四种工作模式_rabbithandler相当于defaultconsuner

rabbithandler相当于defaultconsuner

1. RabbitMQ的工作模式

1.1 简单模式

一个队列只有一个消费者
在这里插入图片描述

1.2 工厂模式

多个消费者监听同一个队列
在这里插入图片描述

1.3 订阅模式

一个交换机绑定多个消息队列,每个消息队列有一个消费者监听
在这里插入图片描述

1.4 路由模式

一个交换机绑定多个消息队列,每个消息队列都由自己唯一的key,每个消息队列有一个消费者监听

在这里插入图片描述

2. Spring整合RabbitMQ

创建RabbitMQ队列结构:在host1中共创建6个队列,两个交换机,其中交换机ex1连接queue3 和queue4,交换机ex2连接queue5和queue6

在这里插入图片描述

2.1 简单模式

在这里插入图片描述
创建MQ连接工具类ConnectionUtil

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException, IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password)
        factory.setHost("192.168.157.130");
        factory.setPort(5672);
        factory.setVirtualHost("host1");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3.通过工厂对象获取与MQ的链接
        Connection connection = factory.newConnection();
        return connection;
    }

    //测试是否连接成功
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println(getConnection());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.1.1 消息生产者

public class SendMsg {

    public static void main(String[] args) throws Exception{

        String msg = "Hello HuangDaoJun!";
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        //定义队列(使用Java代码在MQ中新建一个队列)
        //参数1:定义的队列名称
        //参数2:队列中的数据是否持久化(如果选择了持久化)
        //参数3: 是否排外(当前队列是否为当前连接私有)
        //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
        //参数5:设置当前队列的参数
        //channel.queueDeclare("queue7",false,false,false,null);

        //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""(简单模式和工作模式不需要交换机)
        //参数2:目标队列名称
        //参数3:设置当前这条消息的属性(设置过期时间 10)
        //参数4:消息的内容
        channel.basicPublish("","queue1",null,msg.getBytes());
        System.out.println("发送:" + msg);

        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

2.1.2 消息消费者

public class ReceiveMsg {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("接收:"+msg);
            }
        };
        channel.basicConsume("queue1",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.2 工作模式

2.2.1 消息生产者

public class SendMsg {
    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
			//生产消息,将消息发送到队列
            channel.basicPublish("","queue2",null,msg.getBytes());
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.2.2 消息消费者1

public class ReceiveMsg {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
            }
        };
        channel.basicConsume("queue2",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.2.3 消息消费者2

public class ReceiveMsg {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };
        //消费消息
        channel.basicConsume("queue2",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

最终结果:消息生产者生产消息,消息消费者消费消息,但是具体是哪个消息消费者消费是随机挑选的,也就是说消息生产者生产的消息只能有一个消费者消费,具体是哪个是随机的,不会出现两个消费者消费同一个消息的情况

2.3 订阅模式

2.3.1 消息生产者

public class SendMsg {
    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            
            //消息发送到交换机,而不是队列,因此需要设置交换机名称,将队列名称置为空
            channel.basicPublish("ex1","",null,msg.getBytes());
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2.3.2 消息消费者1

public class ReceiveMsg {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
            }
        };
        //消息消费者1只监听队列queue3
        channel.basicConsume("queue3",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.3.3 消息消费者2

public class ReceiveMsg {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };
        //消息消费者2只监听队列queue4
        channel.basicConsume("queue4",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

结果:消息生产者生产的消息,各个消息消费者都能够接收到。

2.4 路由模式

2.4.1 消息生产者

public class SendMsg {
    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            //指定交换机ex2,后面的a,b代表key
            if(msg.startsWith("a")){
                channel.basicPublish("ex2","a",null,msg.getBytes());
            }else if(msg.startsWith("b")){
                channel.basicPublish("ex2","b",null,msg.getBytes());
            }
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.4.2 消息消费者1

public class ReceiveMsg {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
            }
        };
        channel.basicConsume("queue5",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.4.3 消息消费者2

public class ReceiveMsg {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };
        channel.basicConsume("queue6",true,consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

结果:消息消费者1监听queue5,并且接收key=a的消息,消息消费者2监听queue6,并且接收key=b的消息。

3. SpringBoot整合RabbitMQ

配置文件:

server:
  port: 9001
spring:
  application:
    name: producer
  rabbitmq:
    host: 192.168.157.130
    port: 5672
    virtual-host: host1
    username: admin
    password: admin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3.1 消息生产者

@Service
public class TestService {
    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String msg){

        //1. 发送消息到队列
        amqpTemplate.convertAndSend("queue1",msg);

        //2. 发送消息到交换机(订阅交换机)
        amqpTemplate.convertAndSend("ex1","",msg);

        //3. 发送消息到交换机(路由交换机)
        amqpTemplate.convertAndSend("ex2","a",msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
@RestController
public class TestController {

    @Autowired
    private TestService testService;

    @RequestMapping("test")
    public String test(){
        return "success";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3.2 消息消费者

@Service
//@RabbitListener(queues = {"queue1","queue2"})
@RabbitListener(queues = "queue1")
public class ReceiveMsgService {
    @RabbitHandler
    public void receiveMsg(String msg){
        System.out.println("接收MSG:"+msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/594766
推荐阅读
相关标签
  

闽ICP备14008679号