当前位置:   article > 正文

4_Java 实现RabbitMQ_java rabbitmq

java rabbitmq

环境搭建

1、新建 java web 项目

在这里插入图片描述

2、改pom
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

同时去掉测试依赖的<scope>test</scope>,不然一会不能进行单元测试


第一种模型(直连)

在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

前提:

  • 创建好虚拟主机
  • 创建好用户
  • 虚拟主机与用户进行绑定

1. 开发生产者

public class Producer {
    @Test
    public void pro() throws IOException, TimeoutException {

        //创建连接工厂
        //创建连接mq的连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接rabbitmq主机
        factory.setHost("192.168.77.138");
        //设置端口号
        factory.setPort(5672);
        //设置访问虚拟主机的用户名和密码
        factory.setUsername("ems");
        factory.setPassword("123");
        //设置连接那个虚拟主机
        factory.setVirtualHost("/ems");

        //获取连接对象
        Connection connection = factory.newConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello", true, false, false, null);

        //发布消息
        //参数1: 交换机名称 参数2:队列名称  参数3:传递息额外设置  参数4:消息的具体内容
        channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitmq".getBytes());

        //关闭连接
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

生产消息到队列:

在这里插入图片描述


2. 开发消费者

消费者端不能使用单元测试,需要一直监听

同时不关闭连接消息通道与连接

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.77.138");
        factory.setPort(5672);
        factory.setVirtualHost("/ems");
        factory.setUsername("ems");
        factory.setPassword("123");
        Connection connection = factory.newConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //通道绑定队列:与生产端一致
        channel.queueDeclare("hello", true, false, false, null);

        //获取消息
        //参数1: 消费那个队列的消息 队列名称
        //参数2: 开始消息的自动确认机制[只要消费就从队列删除消息]
        //参数3: 消费时的回调接口
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("取出消息:===>" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

3. API参数的说明

注意:生产方队列的声明与消费方队列的声明要一致

channel.queueDeclare("hello",true,false,false,null);

# '参数1': 用来声明通道对应的队列 [ 如果不存在队列,会自动创建]
# '参数2': 用来指定是否持久化队列 [D: 指定后 rabbitmq 重启后不会消失 ]
# '参数3': 用来指定是否独占队列 [ 只允许当特定的连接]
# '参数4': 用来指定是否自动删除队列 [A:消息方消费完成后,队列自动删除]
# '参数5': 对队列的额外配置
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

第二种模型(work queue)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

在这里插入图片描述

角色:

  • P:生产者:任务的发布者
  • C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者-2:领取任务并完成任务,假设完成速度快

1. 开发生产者

public class Producer {
    @Test
    public void pro() throws IOException {

        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //绑定队列
        channel.queueDeclare("work", true, false, false, null);

        //发送消息
        for (int i = 1; i <= 20; i++) {
            channel.basicPublish("", "work", null, (i + "号消息").getBytes());
        }
        //关闭连接
        RabbitMQUtils.closeConnection(channel, connection);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.开发消费者-1

public class Consumer1 {
    public static void main(String[] args) throws IOException {

        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        final Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare("work", true, false, false, null);

        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //处理消息比较慢 2秒处理一个消息
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号:==>消费" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3.开发消费者-2

public class Consumer2 {
    public static void main(String[] args) throws IOException {

        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        final Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare("work", true, false, false, null);

        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2号:==>消费" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

4.测试结果

在这里插入图片描述
在这里插入图片描述

总结: 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。


5.消息自动确认机制

如果想要实现消费快的多消费,消费慢的少消费,就需要对rabbitmq的消息确认机制进行配置

同时避免消息丢失

在这里插入图片描述

两点:

  • 设置通道一次只能消费一个消息

  • 关闭消息的自动确认,开启手动确认消息

消费方:

//步骤一:一次只接受一条未确认的消息
channel.basicQos(1);

//步骤二:[参数2:关闭自动确认消费]
channel.basicConsume("work", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者2号:==>消费" + new String(body));
        //消费完成后,手动确认消息 [ 参数1:确认标识,参数2:是否一次确认多条消息]
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注意:消费方都需要进行手动确认,否则队列中不会确认删除

生产方:

//一次只发送一条消息
channel.basicQos(1);
  • 1
  • 2

测试:

消费者1号消费一条消息,2s后确认消息,消费者2号消费一条消息,手动确认,消息从队列删除,继续从队列取得一条进行消费,直到队列不存在消息

在这里插入图片描述
在这里插入图片描述


第三种模型(fanout)

中文翻译:fanout 扇出 ,也称为广播

在这里插入图片描述

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

1. 开发生产者

广播 :一条消息多个消费者同时消费

public class Producer {
    @Test
    public void pro() throws IOException {

        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机 [ 参数一:交换机名字,参数2:交换机类型:fanout(广播模式) 固定]
        channel.exchangeDeclare("register", "fanout");

        //发布消息[参数1:交换机名字,参数2:路由,参数3:消息持久化,参数4:消息内容]
        channel.basicPublish("register", "", null, "fanout...".getBytes());
        //关闭
        RabbitMQUtils.closeConnection(channel, connection);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2. 开发消费者-1

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("register", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //[绑定]=>临时队列和交换机 [参数1:临时队列,参数2:交换机,参数3:路由]
        channel.queueBind(queue, "register", "");

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1号:==>" + new String(body));
            }
        });

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3. 开发消费者-2

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("register", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定临时队列和交换机 [参数1:临时队列,参数2:交换机,参数3:路由]
        channel.queueBind(queue, "register", "");

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2号:==>" + new String(body));
            }
        });

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

4.开发消费者-3

public class Consumer3 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("register", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定临时队列和交换机 [参数1:临时队列,参数2:交换机,参数3:路由]
        channel.queueBind(queue, "register", "");

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者3号:==>" + new String(body));
            }
        });

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

5. 测试结果

启动消费者,再启动生产者往交换机发送消息,三个消费者同时接收到消息

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


第四种模型(Routing)

Routing 之订阅模型-Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

在这里插入图片描述

在Direct模型中,流程如下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的Routingkey与消息的 Routing key 完全一致,才会接收到消息

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

1. 开发生产者

public class Producer {
    @Test
    public void pro() throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机[参数1:交换机名字, 参数2:交换机类型,direct路由模式]=>基于指令的 Routing key 转发
        channel.exchangeDeclare("logs_direct", "direct");

        //发布的路由名称==>根据路由key的不同发送到不同的绑定队列中
        String key = "error";

        //发布消息[参数1:交换机名字,参数2:路由名字,参数3:消息内容]
        channel.basicPublish("logs_direct", key, null, ("发送给指定路由" + key + "的消息").getBytes());

        //关闭连接
        RabbitMQUtils.closeConnection(channel, connection);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.开发消费者-1

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机[参数1:交换机名字, 参数2:交换机类型,direct路由模式]
        channel.exchangeDeclare("logs_direct", "direct");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定临时队列与交换机并设置指定路由名称
        channel.queueBind(queue, "logs_direct", "info");
        channel.queueBind(queue, "logs_direct", "error");
        channel.queueBind(queue, "logs_direct", "warn");

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1号:==>" + new String(body));
            }
        });

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3.开发消费者-2

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机[参数1:交换机名字, 参数2:交换机类型,direct路由模式]
        channel.exchangeDeclare("logs_direct", "direct");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定临时队列与交换机并设置指定路由名称
        channel.queueBind(queue, "logs_direct", "error");

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2号:==>" + new String(body));
            }
        });

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

4.测试

测试生产者发送Route key为error的消息时

在这里插入图片描述
在这里插入图片描述


测试生产者发送Route key为info的消息时

在这里插入图片描述
在这里插入图片描述


第五种模型(Topic)

Routing 之订阅模型-Topic

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列

只不过Topic类型 Exchange 可以让队列在绑定Routing key 的时候使用通配符!

这种模型 Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

在这里插入图片描述

# 统配符

*(star) can substitute for exactly one word.    匹配不多不少恰好1个词
#(hash) can substitute for zero or more words.  匹配一个或多个词
 
# 如:
audit.#    匹配audit.irs.corporate或者 audit.irs 等
audit.*   只能匹配 audit.irs
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.开发生产者

public class Producer {
    @Test
    public void pro() throws IOException {
        //创建连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机[交换机名字+交换机类型]
        channel.exchangeDeclare("topics", "topic");

        //发布消息==>使用动态路由(通配符方式)
        String key = "user.update"; //指定发布的路由key
        channel.basicPublish("topics", key, null, ("发送消息给指定的路由key" + key).getBytes());

        //关闭连接
        RabbitMQUtils.closeConnection(channel, connection);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2.开发消费者-1

Routing Key中使用*通配符方式

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        //创建连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("topics", "topic");

        //声明临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定临时队列与交换机并设置获取交换机中动态路由
        String key = "user.*";//使用通配符指定路由key
        channel.queueBind(queue, "topics", key);

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1号:===>" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3.开发消费者-2

Routing Key中使用*通配符方式

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        //创建连接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("topics", "topic");

        //声明临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定临时队列与交换机
        String key = "user.#";//使用通配符指定路由key
        channel.queueBind(queue, "topics", key);

        //消费消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2号:===>" + new String(body));
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

4.测试结果

生产者发送user.update时

两个消费者都可以消费消息

在这里插入图片描述
在这里插入图片描述

生产者发送user.update.to时

仅消费者2号可以消费消息

在这里插入图片描述
在这里插入图片描述


本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号