赞
踩
这篇文章来源于稀土掘金,来源:https://juejin.cn/post/7132268340541653005,主要用来收藏学习。
常见的消息队列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,相关的选型可以看我之前的系列,这篇文章只讲 RabbitMQ,先讲原理,后搞实战。
消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。
一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。
需要额外注意的是,如果消费者处理一个消息失败了,消息系统一般会把这个消息放回队列,这样其他消费者可以继续处理。
单个消息可以被多个订阅者并发的获取和处理。一般来说,订阅有两种类型:
对消息队列进行技术选型时,需要通过以下指标衡量你所选择的消息队列,是否可以满足你的需求:
RabbitMQ 2007 年发布,是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。
提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念:
AMQP 协议模型由三部分组成:生产者、消费者和服务端,执行流程如下:
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种:
我们先看几个基本概念:
集群中有两个节点,每个节点上有一个broker,每个broker负责本机上队列的维护,并且borker之间可以互相通信。集群中有两个队列A和B,每个队列都分为master queue和mirror queue(备份)。那么队列上的生产消费怎么实现的呢?
对于消费队列,如下图有两个consumer消费队列A,这两个consumer连在了集群的不同机器上。RabbitMQ集群中的任何一个节点都拥有集群上所有队列的元信息,所以连接到集群中的任何一个节点都可以,主要区别在于有的consumer连在master queue所在节点,有的连在非master queue节点上。
因为mirror queue要和master queue保持一致,故需要同步机制,正因为一致性的限制,导致所有的读写操作都必须都操作在master queue上(想想,为啥读也要从master queue中读?和数据库读写分离是不一样的),然后由master节点同步操作到mirror queue所在的节点。即使consumer连接到了非master queue节点,该consumer的操作也会被路由到master queue所在的节点上,这样才能进行消费。
对于生成队列,原理和消费一样,如果连接到非 master queue 节点,则路由过去。
所以,到这里小伙伴们就可以看到 RabbitMQ的不足:由于master queue单节点,导致性能瓶颈,吞吐量受限。虽然为了提高性能,内部使用了Erlang这个语言实现,但是终究摆脱不了架构设计上的致命缺陷。
Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,单位是毫秒,下面看看RabbitMQ过期时间特性:
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。
消费者订阅队列的时候,可以指定autoAck参数,当autoAck为true的时候,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。
当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依然存在。如果只设置队列持久化或者消息持久化,重启之后消息都会消失。
当然,也可以将所有的消息都设置为持久化,但是这样做会影响RabbitMQ的性能,因为磁盘的写入速度比内存的写入要慢得多。
对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得,关键在于选择和取舍。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。
当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器成为死信交换器,与该交换器绑定的队列称为死信队列。
消息变成死信有下面几种情况:
消息被拒绝。
消息过期
队列达到最大长度
DLX也是一个正常的交换器,和一般的交换器没有区别,他能在任何的队列上面被指定,实际上就是设置某个队列的属性。当这个队列中有死信的时候,RabbitMQ会自动将这个消息重新发送到设置的交换器上,进而被路由到另一个队列,我们可以监听这个队列中消息做相应的处理。
死信队列有什么用?当发生异常的时候,消息不能够被消费者正常消费,被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常,进而改善和优化系统。
一般的队列,消息一旦进入队列就会被消费者立即消费。延迟队列就是进入该队列的消息会被消费者延迟消费,延迟队列中存储的对象是的延迟消息,“延迟消息”是指当消息被发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。
延迟队列用于需要延迟工作的场景。最常见的使用场景:淘宝或者天猫我们都使用过,用户在下单之后通常有30分钟的时间进行支付,如果这30分钟之内没有支付成功,那么订单就会自动取消。
除了延迟消费,延迟队列的典型应用场景还有延迟重试。比如消费者从队列里面消费消息失败了,可以延迟一段时间以后进行重试。
这里才是内容的重点,不仅需要知道Rabbit的特性,还需要知道支持这些特性的原因:
因为我用的是Mac,所以直接可以参考官网:
https://www.rabbitmq.com/install-homebrew.html
执行:
brew update && brew install rabbitmq
之前没有执行brew update,直接执行brew install rabbitmq时,会报各种各样奇怪的错误,其中“403 Forbidde”居多。
但是在执行“brew install rabbitmq”,会自动安装其它的程序,如果你使用源码安装Rabbitmq,因为启动该服务依赖erlang环境,所以你还需手动安装erlang,但是目前官方已经一键给你搞定,会自动安装Rabbitmq依赖的所有程序,是不是很棒!
执行成功输出:
启动服务:
# 启动方式1:后台启动
brew services start rabbitmq
# 启动方式2:当前窗口启动
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在浏览器输入:
http://localhost:15672/
会出现RabbitMQ后台管理界面(用户名和密码都为guest):
通过brew安装,一行命令搞定,真香!
首先得启动mq
## 添加账号
rabbitmqctl add_user admin admin
## 添加访问权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 设置超级权限
rabbitmqctl set_user_tags admin administrator
因为代码中引入了java 8的特性,pom引入依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins>
测试代码
public class RabbitMqTest { //消息队列名称 private final static String QUEUE_NAME = "hello"; @Test public void send() throws java.io.IOException, TimeoutException { //创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); //生成一个消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 10; i++) { String message = "Hello World RabbitMQ count: " + i; //发布消息,第一个参数表示路由(Exchange名称),为""则表示使用默认消息路由 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //关闭消息通道和连接 channel.close(); connection.close(); } @Test public void consumer() throws java.io.IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); //创建连接 Connection connection = factory.newConnection(); //创建消息信道 final Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("[*] Waiting for message. To exist press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }
执行send()后控制台输出:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'
执行consumer()后:
封装工厂类:
public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
//创建连接工程,下面给出的是默认的case
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}
封装生成者:
public class MsgProducer { public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息为可持久化,不自动删除 channel.exchangeDeclare(exchange, exchangeType, true, false, null); // 发布消息 channel.basicPublish(exchange, toutingKey, null, message.getBytes()); System.out.println("Sent '" + message + "'"); channel.close(); connection.close(); } }
生产者:
public class DirectProducer { private static final String EXCHANGE_NAME = "direct.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectProducer directProducer = new DirectProducer(); String[] routingKey = new String[]{"aaa", "bbb", "ccc"}; String msg = "hello >>> "; for (int i = 0; i < 10; i++) { directProducer.publishMsg(routingKey[i % 3], msg + i); } System.out.println("----over-------"); Thread.sleep(1000 * 60 * 100); } }
执行生产者,往消息队列中放入10条消息,其中key分别为“aaa”、“bbb”和“ccc”,分别放入qa、qb、qc三个队列:
下面是qa队列的信息:
消费者:
public class DirectConsumer { private static final String exchangeName = "direct.exchange"; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer.consumerMsg(exchangeName, queueName, routingKey); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{"aaa", "bbb", "ccc"}; String[] queueNames = new String[]{"qa", "qb", "qc"}; for (int i = 0; i < 3; i++) { consumer.msgConsumer(queueNames[i], routingKey[i]); } Thread.sleep(1000 * 60 * 100); } }
执行后输出
[*] Waiting for message. To exist press CTRL+C [x] Received 'hello >>> 0 [x] Done [x] Received 'hello >>> 3 [x] Done [x] Received 'hello >>> 6 [x] Done [x] Received 'hello >>> 9 [x] Done [*] Waiting for message. To exist press CTRL+C [x] Received 'hello >>> 1 [x] Done [x] Received 'hello >>> 4 [x] Done [x] Received 'hello >>> 7 [x] Done [*] Waiting for message. To exist press CTRL+C [x] Received 'hello >>> 2 [x] Done [x] Received 'hello >>> 5 [x] Done [x] Received 'hello >>> 8 [x] Done
可以看到,分别从qa、qb、qc中将不同的key的数据消费掉。
有个疑问:这个队列的名称qa、qb和qc是RabbitMQ自动生成的么,我们可以指定队列名称么?
我做了个简单的实验,我把消费者代码修改了一下:
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 将qc修改为qc1
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
执行后如下图所示:
我们可以发现,多了一个qc1,所以可以判断这个界面中的queues,是消费者执行时,会将消费者指定的队列名称和direct.exchange绑定,绑定的依据就是key。
当我们把队列中的数据全部消费掉,然后重新执行生成者后,会发现qc和qc1中都有3条待消费的数据,因为绑定的key都是“ccc”,所以两者的数据是一样的:
绑定关系如下:
注意:当没有Queue绑定到Exchange时,往Exchange中写入的消息也不会重新分发到之后绑定的queue上。
思考:不执行消费者,看不到这个Queres中信息,我其实可以把这个界面理解为消费者信息界面。不过感觉还是怪怪的,这个queues如果是消费者信息,就不应该叫queues,我理解queues应该是RabbitMQ中实际存放数据的queues,难道是我理解错了?
生产者封装:
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void publishMsg(String routingKey, String msg) { try { MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String msg = "hello >>> "; for (int i = 0; i < 10; i++) { directProducer.publishMsg("", msg + i); } } }
消费者:
public class FanoutConsumer { private static final String EXCHANGE_NAME = "fanout.exchange"; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutConsumer consumer = new FanoutConsumer(); String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"}; for (int i = 0; i < 3; i++) { consumer.msgConsumer(queueNames[i], ""); } } }
执行生成者,结果如下:
我们发现,生产者生产的10条数据,在每个消费者中都可以消费,这个是和Direct不同的地方,但是使用Fanout方式时,有几个点需要注意一下:
这幅图就画出了Fanout的精髓之处,exchange会和所有的queue进行绑定,不区分路由,消费者需要绑定指定的queue才能发起消费。
注意:往队列塞数据时,可能通过界面看不到消息个数的增加,可能是你之前已经开启了消费进程,导致增加的消息马上被消费了。
上面我们是指定了队列,这个方式其实很不友好,比如对于Fanout,我其实根本无需关心队列的名字,如果还指定对应队列进行消费,感觉这个很冗余,所以我们这里就采用随机获取队列名字的方式,下面代码直接Copy官网。
生成者封装:
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); //创建连接 Connection connection = factory.newConnection(); //创建消息通道 Channel channel = connection.createChannel(); // 声明exchange中的消息 channel.exchangeDeclare(exchange, exchangeType); // 发布消息 channel.basicPublish(exchange, "", null, message.getBytes("UTF-8")); System.out.println("Sent '" + message + "'"); channel.close(); connection.close(); }
消费者封装
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil.getConnectionFactory(); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchange, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }
生产者:
public class FanoutProducer { private static final String EXCHANGE_NAME = "fanout.exchange.v2"; public void publishMsg(String msg) { try { MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String msg = "hello >>> "; for (int i = 0; i < 10000; i++) { directProducer.publishMsg(msg + i); } } }
消费者:
public class FanoutConsumer { private static final String EXCHANGE_NAME = "fanout.exchange.v2"; public void msgConsumer() { try { MsgConsumer.consumerMsgV2(EXCHANGE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) { FanoutConsumer consumer = new FanoutConsumer(); for (int i = 0; i < 3; i++) { consumer.msgConsumer(); } } }
执行后,管理界面如下:
代码详见官网:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,请查看官网:https://www.rabbitmq.com/getstarted.html
在定义Queue时,可以指定这两个参数:
/** * Declare an exchange. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @param autoDelete true if the server should delete the exchange when it is no longer in use * @param arguments other properties (construction arguments) for the exchange * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
持久化,保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
若是将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,会重新读取之前被持久化的queue。
虽然队列可以被持久化,但是里面的消息是否为持久化,还要看消息的持久化设置。即重启queue,但是queue里面还没有发出去的消息,那队列里面还存在该消息么?这个取决于该消息的设置。
自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
当一个Queue被设置为自动删除时,当消费者断掉之后,queue会被删除,这个主要针对的是一些不是特别重要的数据,不希望出现消息积累的情况。
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
因此手动ACK的常见手段:
// 接收消息之后,主动ack/nak Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); try { System.out.println(" [ " + queue + " ] Received '" + message); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(), false, true); } } }; // 取消自动ack channel.basicConsume(queue, false, consumer);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。