赞
踩
目录
编辑4.3、发布/订阅模式(Publish/Subscribe)
4.6、发布确认模式(Publisher Confirms)
从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
以12306为例,假设平时可能买票的人不多,所以订单系统的QPS( 每秒查询率 )也不是很高,每秒也就处理1000个请求,但是一到节假日、春运期间可能抢票的人就非常多,并发量远远大于平时,这个时候,订单系统明显扛不住了。怎么办呢,当然我们可以设计弹性伸缩的集群,进行机器扩容,保证高可用。但是我们依然可以采用MQ来解决这个问题。
MQ的吞吐能力还是还强大的,所以我们可以设计高可用的MQ,让所有的请求都到MQ,缓存起来。这样一来高峰期的流量和数据都将积压在MQ中,流量高峰就被削弱了(削峰),然后我们的订单系统就避免了高并发的请求,它可以慢慢的从MQ中拉取自己能力范围内的消息就行处理。这样一来,高峰期积压的消息也终将被消费完,可以叫做填谷。
1.2.2、应用解耦
产品经理提需求,好多人关注了我们12306微信客户端,我们需要买票成功后在通知微信小程序。那么我们又需要修改订单系统的代码。一次还好,如果隔一段时间发生一件这样的事,那谁能忍受?
某一天,短信系统挂了,然后客户成功买到一张票,然后呢是短信也没收到,邮件也没收到,库存也没扣,这还得了。你短信系统坏了,我邮件系统好好的,凭什么影响我,让客户收不到邮件,这就不合理。 所以呢,还是各个系统之间的耦合太高了,我们应该解耦。不是有人说互联网的任何问题都可以通过一个中间件来解决吗,那么我们看MQ如何帮我们解决这件棘手的事情。
那么我们发现其实短信系统、邮件系统等都只依赖订单系统产生的一条数据那就是订单,因此我们在订单系统产生数据后,将订单这条数据发送给MQ,就返回成功,然后让短信、邮件等系统都订阅MQ,一旦发现MQ有消息,他们主动拉取消息,然后解析,进行业务处理。这样一来,就算你短信系统挂了,丝毫不会影响其他系统,而且如果后来想加一个新的系统,你也不用改订单系统的代码了,你只要订阅我们的MQ提供的消息就行了
还以上面12306为例,假设我们不用MQ,那么我们的代码必然耦合在一起,下单成功后,依次要通过RPC远程调用这几个系统,然后同步等到他们的响应才能返回给用户是否成功的结果。假设每个系统耗时200ms,那么就得花费600ms
但是其实有时候我们发现,下单是个核心业务,可能压力本来就大,客户也着急知道下单是否成功,但是短信邮件等通知,可能大多数人根本不急或者压根不关心,那么我们为什么要让这些细枝末节的业务影响核心业务的效率呢,是不是有点舍本逐末。所以这个逻辑我们可以设计成异步的。我们可以当下单成功后,只需要将订单消息发给MQ,然后立即将结果返回通知客户。这才是正确的打开姿势。这样一来,我订单系统只需要告诉你MQ,我下单成功了,其他模块收到消息后,该发短信的发短信,发邮件的发邮件。因为本来MQ的性能就很好,所以这个效率一下就提升了。
Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。
优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性,较低的概率丢失数据。
缺点:维护越来越少,高吞吐量场景较少使用。
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ,本章就以RabbitMQ为例。
RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:
Messaging that just works — RabbitMQhttp://www.rabbitmq.com/
RabbitMQ的架构图
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP是协议,类比HTTP。
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JSM是API接口规范,类比JDBC。
RabbitMQ提供了7种模式:
1、简单模式
2、work-queue工作队列模式
3、Publish/Subscribe发布与订阅模式
4、Routing路由模式
5、Topics主题模式
6、RPC远程调用模式(远程调用,不太算MQ,不作介绍)
7、Publisher Confirms发布确认
官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ
官方windows安装文档,Installing on Windows — RabbitMQ
安装注意事项: 1、 推荐使用默认的安装路径 2、 系统用户名必须是英文
百度网盘资源
链接:https://pan.baidu.com/s/1C87Piy7co6BWf9v8N-4WrQ
提取码:c17d
RabbitMQ是由erlang语言开发,所以我在安装 RabbitMQ 一定要先安装Erlang环境,注意版本匹配
右击 otp_win64_25.0.exe 以管理员身份运行 进行安装
安装Erlang只需要下一步下一步即可
安装RabbitMQ只需要下一步下一步即可
右击 rabbitmq-server-3.10.10.exe以管理员身份运行 进行安装
安装完后,cmd输入services.msc打开服务,开启RabbitMQ服务
通过windows快捷键直接找到sbin命令,如果没有直接找到安装目录下找到sbin目录,以管理员身份运行。
输入下面命令,启动管理页面。
rabbitmq-plugins.bat enable rabbitmq_management
RabbitMQ在安装好后,可以访问 http://localhost:15672 ,其自带了guest/guest 的用户名和密码。
如果以上操作都进行后,仍然访问不到页面,请重启电脑再次测试。
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
给用户添加虚拟主机权限
首先创建一个Maven项目
导入依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.14.2</version>
- </dependency>
只有一个生产者,一个消费者;生产者将消息发送到队列,消费者从队列中获取消息。
创建一个工具类,用来连接RabbitMQ
- public class ConnectionUtil {
- public static Connection getConnection() throws Exception {
- //创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- //主机地址;默认为 localhost
- connectionFactory.setHost("localhost");
- //连接端口;默认为 5672
- connectionFactory.setPort(5672);
- //虚拟主机名称;默认为 /
- connectionFactory.setVirtualHost("/yh");
- //连接用户名;默认为guest
- connectionFactory.setUsername("guest");
- //连接密码;默认为guest
- connectionFactory.setPassword("guest");
- //创建连接
- return connectionFactory.newConnection();
- }
- }
- public class Producer {
- static final String QUEUE_NAME="simple_queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
-
- //声明队列
- /**
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接。只能有一个消费者监听到这队列
- * 参数4:是否在不使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
-
- //发送信息
- String message="hello RabbitMQ";
- /**
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println("已发送信息:"+message);
-
- //关闭资源
- channel.close();
- connection.close();
- }
- }
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
-
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //路由的key
- System.out.println("路由key为:" + envelope.getRoutingKey());
- //交换机
- System.out.println("交换机:" + envelope.getExchange());
- //消息id
- System.out.println("消息id为:" + envelope.getDeliveryTag());
- //收到的消息
- System.out.println("接收到的消息:" + new String(body, "utf-8"));
- }
- };
-
- //监听消息
- channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
先启动消费者,再启动生产者
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
- public class Producer {
- static final String QUEUE_NAME="simple_queue";
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
-
- //声明队列
- /**
- * 参数1:队列名称
- * 参数2:是否定义持久化队列
- * 参数3:是否独占本次连接。只能有一个消费者监听到这队列
- * 参数4:是否在不
- * 使用的时候自动删除队列
- * 参数5:队列其它参数
- */
- channel.queueDeclare(QUEUE_NAME,true,false,false,null);
-
- //发送信息
- for (int i = 1; i <=10 ; i++) {
- String message="hello RabbitMQ"+i;
- /**
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println("已发送信息:"+message);
- }
-
- //关闭资源
- channel.close();
- connection.close();
- }
- }
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
-
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
- //收到的消息
- System.out.println("消费者1--接收到的消息:" + new String(body, "utf-8"));
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- //监听消息
- channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
-
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
- //收到的消息
- System.out.println("消费者2--接收到的消息:" + new String(body, "utf-8"));
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
-
- //监听消息
- channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看消费者消费的消息。
一个生产者发送的消息会被多个消费者获取。发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(binding)的所有的Queue上。这种模式不需要任何Routekey,需要提前将Exchange 与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以和多个Exchange绑定。如果接收到消息的Exchange没有与任何Queue绑定,则消息会丢失。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- public class Producer {
- //交换机名称
- static final String FANOUT_EXCHANGE="fanout_exchange";
- //队列名称
- static final String FANOUT_QUEUE_1="fanout_queue_1";
- static final String FANOUT_QUEUE_2="fanout_queue_2";
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
-
- //发送信息
- for (int i = 1; i <=10 ; i++) {
- String message="发布/订阅模式--"+i;
- /**
- * 参数1:交换机名称,如果没有指定则使用默认Default Exchange
- * 参数2:路由key,简单模式可以传递队列名称
- * 参数3:消息其它属性
- * 参数4:消息内容
- */
- channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
- System.out.println("已发送信息:"+message);
- }
-
- //关闭资源
- channel.close();
- connection.close();
- }
- }
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
- //声明队列
- channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
- //队列绑定交换机
- channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
-
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //收到的消息
- System.out.println("消费者1--接收到的消息:" + new String(body, "utf-8"));
- }
- };
- //监听消息
- channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
- //声明队列
- channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
- //队列绑定交换机
- channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE,"");
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //收到的消息
- System.out.println("消费者2--接收到的消息:" + new String(body, "utf-8"));
- }
- };
-
- //监听消息
- channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 fanout_exchange
的交换机,可以查看到如下的绑定:
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue,这种模式下不需要将Exchange进行任何绑定(binding)操作,消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey
(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
在编码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
- public class Producer {
- //交换机名称
- static final String DIRECT_EXCHANGE = "direct_exchange";
- //队列名称
- static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
- static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
-
- //声明交换机
- // 参数1:交换机名称
- // 参数2:交换机类型,fanout、topic、direct、headers
- channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
-
- // 发送信息到消费者1
- String message = "新增了商品。路由模式;routing key 为 insert ";
- channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- // 发送信息到消费者2
- message = "修改了商品。路由模式;routing key 为 update";
- channel.basicPublish(DIRECT_EXCHANGE, "update", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- //关闭资源
- channel.close();
- connection.close();
- }
- }
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
- //声明队列
- channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT,true,false,false,null);
- //队列绑定交换机
- channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE,"insert");
-
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //路由key
- System.out.println("路由key为:" + envelope.getRoutingKey());
- //交换机
- System.out.println("交换机为:" + envelope.getExchange());
- //消息id
- System.out.println("消息id为:" + envelope.getDeliveryTag());
- //收到的消息
- System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
- }
- };
- //监听消息
- channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
- //声明队列
- channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
- //队列绑定交换机
- channel.queueBind(Producer.DIRECT_QUEUE_UPDATE,Producer.DIRECT_EXCHANGE,"update");
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //路由key
- System.out.println("路由key为:" + envelope.getRoutingKey());
- //交换机
- System.out.println("交换机为:" + envelope.getExchange());
- //消息id
- System.out.println("消息id为:" + envelope.getDeliveryTag());
- //收到的消息
- System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
- }
- };
-
- //监听消息
- channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 direct_exchange
的交换机,可以查看到如下的绑定:
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey指定主题的Queue中。就是每个队列都有其关心的主题,所有的消息都带有一个标题(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配队列。这种模式需要Routekey并且提前绑定Exchange与Queue。在进行绑定时要提供一个该队列对应的主题。‘ # ’表示0个或若干个关键字,‘ * ’表示一个关键字。如果Exchange没有发现能够与RouteKey匹配的Queue,消息会丢失。
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
- public class Producer {
- //交换机名称
- static final String TOPIC_EXCHANGE = "topic_exchange";
- //队列名称
- static final String TOPIC_QUEUE_1 = "topic_queue_1";
- static final String TOPIC_QUEUE_2 = "topic_queue_2";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- // 参数1:交换机名称
- // 参数2:交换机类型,fanout、topic、direct、headers
- channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
- // 发送信息
- String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
- channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- // 发送信息
- message = "修改了商品。Topic模式;routing key 为 item.update" ;
- channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- // 发送信息
- message = "删除了商品。Topic模式;routing key 为 item.delete" ;
- channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes());
- System.out.println("已发送消息:" + message);
-
- //关闭资源
- channel.close();
- connection.close();
- }
- }
- public class Consumer1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
- //声明队列
- channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);
- //队列绑定交换机
- channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.insert");
- channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.update");
-
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //收到的消息
- System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
- }
- };
- //监听消息
- channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- //声明交换机
- channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
- //声明队列
- channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
- //队列绑定交换机
- channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"item.delete");
- //接收信息
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //收到的消息
- System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
- }
- };
-
- //监听消息
- channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
- //不关闭资源,应该一直监听消息
- }
- }
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 topic_exchange
的交换机,可以查看到如下的绑定:
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式
和 Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。
发布确认模式有三种策略: 1、单次确认 2、批量确认 3、异步确认
- public class Single {
- //单个确认 267ms
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //获取连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- String queueName = UUID.randomUUID().toString();
- //声明队列
- channel.queueDeclare(queueName,true,false,false,null);
- //开启消息确认发布应答模式
- channel.confirmSelect();
- //记录开始时间
- long start = System.currentTimeMillis();
- //发送1000条信息
- for (int i = 1; i <=1000 ; i++) {
- //模拟信息
- String message=i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
- //单个确认
- boolean flag = channel.waitForConfirms();
- if(flag){
- System.out.println("--------第"+(i)+"条信息发送成功!");
- }else{
- System.out.println("=========第"+(i)+"条消息发送失败!");
- }
- }
- //记录结束时间
- long end = System.currentTimeMillis();
- System.out.println("共耗时:"+(end-start)+"ms");
- }
- }
- //批量确认 72ms
- public class More {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //获取连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- String queueName = UUID.randomUUID().toString();
- //声明队列
- channel.queueDeclare(queueName,true,false,false,null);
- //开启消息确认发布应答模式
- channel.confirmSelect();
- //记录开始时间
- long start = System.currentTimeMillis();
- //发送1000条信息
- for (int i = 1; i <=1000 ; i++) {
- //模拟信息
- String message=i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
- //批量确认
- if(i%100==0){
- boolean flag = channel.waitForConfirms();
- if(flag){
- System.out.println("--------第"+(i)+"条信息发送成功!");
- }else{
- System.out.println("该批消息有确认失败的,需要重新发送整批失败的消息");
- }
- }
-
- }
- //记录结束时间
- long end = System.currentTimeMillis();
- System.out.println("共耗时:"+(end-start)+"ms");
- }
- }
- //异步确认 46ms
- public class Asny {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection = ConnectionUtil.getConnection();
- //创建通道
- Channel channel = connection.createChannel();
- String queueName = UUID.randomUUID().toString();
- //声明队列
- channel.queueDeclare(queueName,true,false,false,null);
- //开启消息确认发布应答模式
- channel.confirmSelect();
- //消息发送成功回调函数
- ConfirmCallback ackCallback=(deliveryTag, multiple)->{
- System.out.println("消息发送成功");
- };
- //消息法送失败回调函数
- ConfirmCallback nackCallback=(deliveryTag, multiple)->{
- System.out.println("消息发送失败");
- };
- //注册监听器监听,异步通知
- channel.addConfirmListener(ackCallback,nackCallback);
- //记录开始时间
- long start = System.currentTimeMillis();
- //发送消息
- int message_num =1000;
- for (int i = 1; i <= message_num; i++) {
- String message=i+"";
- channel.basicPublish("",queueName,null,message.getBytes());
- }
- //记录发消息后时间
- long end=System.currentTimeMillis();
- System.out.println("该模式为异步批量确认模式:"+message_num+",耗时:"+(end-start)+"ms");
- }
- }
同步-单独发布消息:同步等待确认,简单,但吞吐量非常有限。
同步-批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难推断出是那条消息出现了问题。
异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。
不直接Exchange交换机(默认交换机)
simple简单模式: 一个生产者、一个消费者,生产者生产消息到一个队列被一个消费者接收
work Queue工作队列模式: 一个生产者、多个消费者(竞争关系),生产者发送消息到一个队列中,可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系
使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)
发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列
路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息
通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消息
发布确认模式(Publisher Confirms)
1、单次确认
2、批量确认
3、同步处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。