赞
踩
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
生产者、消费者没有定义exchange,使用同一个QUEUE_NAME
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
配置文件
spring:
rabbitmq:
host: 192.168.99.99
username: leyou
password: leyou
virtual-host: /leyou
获取链接
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { /** * 建立与RabbitMQ的连接 * @return * @throws Exception */ public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("192.168.99.99"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/leyou"); factory.setUsername("leyou"); factory.setPassword("leyou"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } }
生产者
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生产者 */ public class Send { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道,使用通道才能完成消息相关的操作 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello World!"; for (int i = 0; i < 10; i++) { // 向指定的队列中发送消息 message=message+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //关闭通道和连接 channel.close(); connection.close(); } }
消费者
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消费者 */ public class Recv { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); } }; // 监听队列,第二个参数:是否自动进行消息确认。 channel.basicConsume(QUEUE_NAME, true, consumer); } }
上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
手动ACK
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; /** * 消费者,手动进行ACK */ public class Recv2 { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 创建通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 //int i=1/0; String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手动进行ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列,第二个参数false,手动进行ACK // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明: channel.basicConsume(QUEUE_NAME, false, consumer); } }
工作队列或者竞争消费者模式
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者,领取任务并且完成任务,假设完成速度较快
C2:消费者2:领取任务并完成任务,假设完成速度慢
我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
在消费者上加入:
channel.basicQos(1);
生产者消费者都不需要指定exchange、QUEUE_NAME 使用同一个,消费者1,消费者2的QUEUE_NAME 都相同
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; // 生产者 public class Send { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循环发布任务 for (int i = 0; i < 50; i++) { // 消息内容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 2); } // 关闭通道和连接 channel.close(); connection.close(); } }
消费者1和消费者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; // 消费者1 public class Recv { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 设置每个消费者同时只能处理一条消息 //channel.basicQos(1); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者1] received : " + msg + "!"); try { // 模拟完成任务的耗时:1000ms Thread.sleep(1000); } catch (InterruptedException e) { } // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }
创建一个新的消费者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; //消费者2 public class Recv2 { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 设置每个消费者同时只能处理一条消息 //channel.basicQos(1); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者2] received : " + msg + "!"); // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }
面试题:避免消息堆积?
1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。
在广播模式下,消息发送流程是这样的:
生产者
1) 声明Exchange,不再声明Queue
2) 发送消息到Exchange,不再发送到Queue
import cn.itcast.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class Send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true); // 消息内容 String message = "Hello everyone"; // 发布消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); channel.close(); connection.close(); } }
消费者1和消费者2
import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; //消费者1 public class Recv { private final static String QUEUE_NAME = "fanout_exchange_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者1] received : " + msg + "!"); } }; // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } } import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.itcast.rabbitmq.util.ConnectionUtil; // 消费者2 public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者2] received : " + msg + "!"); } }; // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
在广播模式中,生产者发布消息,所有消费者都可以获取所有消息。
在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者
此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
与前面基本相似,在生产者声明交换机时指定类型为:“direct”
channel.exchangeDeclare(EXCHANGE_NAME, “direct”);
发送消息时,指定routing key
channel.basicPublish(EXCHANGE_NAME, “insert”, null, message.getBytes());
消费者绑定交换机时指定routingkey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “insert”);
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
如:
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
使用与Direct模式基本一样,只是生产者在声明交换机的时候指定类型为:‘topic’
channel.exchangeDeclare(EXCHANGE_NAME, “topic”);
转载自
https://promsing.blog.csdn.net/article/details/123254601?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-1-123254601-blog-119463900.pc_relevant_multi_platform_whitelistv3&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-1-123254601-blog-119463900.pc_relevant_multi_platform_whitelistv3&utm_relevant_index=1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。