当前位置:   article > 正文

乐优商城(二十四)——RabbitMQ及数据同步_乐优商城使用的rabbitmq下载

乐优商城使用的rabbitmq下载

目录

一、RabbitMQ

1.1 搜索与商品服务的问题

1.2 消息队列(MQ)

1.2.1 什么是消息队列

1.2.2 AMQP和JMS

1.2.3 常见MQ产品

1.2.4 RabbitMQ

1.3 下载和安装

1.3.1 下载

1.3.2 安装

二、五种消息模型

2.1 基本消息模型

2.1.1 生产者发送消息

2.1.2 管理工具中查看消息

2.1.3 消费者获取消息

2.1.4 消息确认机制(ACK)

2.2 work消息模型

2.2.1 生产者

2.2.2 消费者1

2.2.3 消费者2

2.2.4 能者多劳

2.3 订阅模型分类

2.4 订阅模型-Fanout

2.4.1 生产者

2.4.2 消费者1

2.4.3 消费者2

2.4.4 测试

2.5 订阅模型-Direct

2.5.1 生产者

2.5.2 消费者1

2.5.3 消费者2

2.5.4 测试

2.6 订阅模型-Topic

2.6.2 消费者1

2.6.3 消费者2

2.6.4 测试

2.7 持久化

2.7.1 交换机持久化

2.7.2 队列持久化

2.7.3 消息持久化

2.8 总结

2.8.1 生产者代码

2.8.2 消费者代码


一、RabbitMQ

1.1 搜索与商品服务的问题

目前已经完成了商品详情和搜索系统的开发。思考一下,是否存在问题?

  • 商品的原始数据保存在数据库中,增删改查都在数据库中完成。

  • 搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新。

  • 商品详情做了页面静态化,静态页面数据也不会随着数据库商品发生变化。

如果在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?

这里有两种解决方案:

  • 方案1:每当后台对商品做增删改操作,同时要修改索引库数据及静态页面

  • 方案2:搜索服务和商品页面服务对外提供操作接口,后台在商品增删改后,调用接口

以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。

所以,会通过另外一种方式来解决这个问题:消息队列

1.2 消息队列(MQ)

1.2.1 什么是消息队列

消息队列,即MQ,Message Queue。

消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

结合前面所说的问题:

  • 商品服务对商品增删改以后,无需去操作索引库或静态页面,只是发送一条消息,也不关心消息被谁接收。

  • 搜索服务和静态页面服务接收消息,分别去处理索引库和静态页面。

如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。

1.2.2 AMQP和JMS

MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

1.2.3 常见MQ产品

  • ActiveMQ:基于JMS

  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

  • Kafka:分布式消息系统,高吞吐量

1.2.4 RabbitMQ

RabbitMQ是基于AMQP的一款消息管理系统

官网: http://www.rabbitmq.com/

官方教程:http://www.rabbitmq.com/getstarted.html

  • RabbitMQ是一个开源的、在AMQP基础上完整的、可复用的企业消息系统
  • 支持主流的操作系统,Linux、Windows、MacOS等
  • 多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等

1.3 下载和安装

1.3.1 下载

官网下载地址:http://www.rabbitmq.com/download.html

1.3.2 安装

Centos6安装RabbitMQ

二、五种消息模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,不做介绍。那么也就剩下5种。

但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

导入一个demo工程,来了解RabbitMQ的工作方式

项目结构

依赖

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>cn.itcast.rabbitmq</groupId>
  5. <artifactId>itcast-rabbitmq</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <parent>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-parent</artifactId>
  10. <version>2.0.2.RELEASE</version>
  11. </parent>
  12. <properties>
  13. <java.version>1.8</java.version>
  14. </properties>
  15. <dependencies>
  16. <dependency>
  17. <groupId>org.apache.commons</groupId>
  18. <artifactId>commons-lang3</artifactId>
  19. <version>3.3.2</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-amqp</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-test</artifactId>
  28. </dependency>
  29. </dependencies>
  30. </project>

工具类

方便其他程序获取连接

2.1 基本消息模型

P(producer/ publisher):生产者,一个发送消息的用户应用程序。

C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:

生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

用Java编写两个程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。

调用消息发布者(发送者)Send和消息消费者(接收者)Recv。发布者将连接到RabbitMQ,发送一条消息,然后退出。

2.1.1 生产者发送消息

  1. public class Send {
  2. private final static String QUEUE_NAME = "simple_queue";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. // 从连接中创建通道,这是完成大部分API的地方。
  7. Channel channel = connection.createChannel();
  8. // 声明(创建)队列,必须声明队列才能够发送消息,我们可以把消息发送到队列中。
  9. // 声明一个队列是幂等的 - 只有当它不存在时才会被创建
  10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  11. // 消息内容
  12. String message = "Hello World!";
  13. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  14. System.out.println(" [x] Sent '" + message + "'");
  15. //关闭通道和连接
  16. channel.close();
  17. connection.close();
  18. }
  19. }

输出:

2.1.2 管理工具中查看消息

登录/leyou,然后查看消息。

进入队列页面,可以看到新建了一个队列:simple_queue

点击队列名称,进入详情页,可以查看消息:

在控制台查看消息并不会将消息消费,所以消息还在。

2.1.3 消费者获取消息

  1. package cn.itcast.rabbitmq.simple;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. /**
  10. * 消费者
  11. */
  12. public class Recv {
  13. private final static String QUEUE_NAME = "simple_queue";
  14. public static void main(String[] argv) throws Exception {
  15. // 获取到连接
  16. Connection connection = ConnectionUtil.getConnection();
  17. // 创建通道
  18. Channel channel = connection.createChannel();
  19. // 声明队列
  20. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  21. // 定义队列的消费者
  22. DefaultConsumer consumer = new DefaultConsumer(channel) {
  23. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  26. byte[] body) throws IOException {
  27. // body 即消息体
  28. String msg = new String(body);
  29. System.out.println(" [x] received : " + msg + "!");
  30. }
  31. };
  32. // 监听队列,第二个参数:是否自动进行消息确认。
  33. channel.basicConsume(QUEUE_NAME, true, consumer);
  34. }
  35. }

输出:

这个时候,队列中的消息就没了:

消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。再次运行Send,一旦有新的消息进入队列,Recv就会立即打印:

2.1.4 消息确认机制(ACK)

通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ怎么知道消息被接收了呢?

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  • 自动ACK:消息一旦被接收,消费者自动发送ACK

  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

如何选择呢?

这需要看消息的重要性:

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

之前的测试都是自动ACK的,如果要手动ACK,需要改动代码:

  1. public class Recv2 {
  2. private final static String QUEUE_NAME = "simple_queue";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接
  5. Connection connection = ConnectionUtil.getConnection();
  6. // 创建通道
  7. final Channel channel = connection.createChannel();
  8. // 声明队列
  9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10. // 定义队列的消费者
  11. DefaultConsumer consumer = new DefaultConsumer(channel) {
  12. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  13. @Override
  14. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  15. byte[] body) throws IOException {
  16. // body 即消息体
  17. String msg = new String(body);
  18. System.out.println(" [x] received : " + msg + "!");
  19. // 手动进行ACK
  20. channel.basicAck(envelope.getDeliveryTag(), false);
  21. }
  22. };
  23. // 监听队列,第二个参数false,手动进行ACK
  24. channel.basicConsume(QUEUE_NAME, false, consumer);
  25. }
  26. }

 注意到最后一行代码:

channel.basicConsume(QUEUE_NAME, false, consumer);

如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:

自动ACK存在的问题

修改消费者,添加异常,如下:

生产者不做任何修改,直接运行,消息发送成功:

运行消费者,程序抛出异常。但是消息依然被消费:

演示手动ACK

修改消费者,把自动改成手动(去掉之前制造的异常)

生产者不变,再次运行:

运行消费者:

查看管理界面,发现有一条信息没有确认

停掉消费者的程序,发现:

这是因为虽然设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。

当关掉这个消费者,消息的状态再次称为Ready。

修改代码,手动ACK:

执行:

消息消费成功。

2.2 work消息模型

工作队列或者竞争消费者模式

在基本消息模型中,从一个命名队列中发送并接受消息。在这里,将创建一个工作队列,在多个工作者之间分配耗时任务。

工作队列,又称任务队列。主要思想就是:尽可能的减少执行资源密集型任务时的等待时间。我们将任务封装为消息并发送到队列,在后台的工作进程将弹出任务并进行作业。当你运行很多worker时,任务将在他们之间共享。

这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。

接下来模拟这个流程:

​ P:生产者:任务的发布者

​ C1:消费者,领取任务并且完成任务,假设完成速度较快

​ C2:消费者2:领取任务并完成任务,假设完成速度慢

面试题:如何避免消息堆积?

1) 采用workqueue,多个消费者监听同一队列。

2)接收到消息以后,通过线程池,异步消费。

2.2.1 生产者

循环发送50条信息

  1. public class Send {
  2. private final static String QUEUE_NAME = "test_work_queue";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接
  5. Connection connection = ConnectionUtil.getConnection();
  6. // 获取通道
  7. Channel channel = connection.createChannel();
  8. // 声明队列
  9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10. // 循环发布任务
  11. for (int i = 0; i < 50; i++) {
  12. // 消息内容
  13. String message = "task .. " + i;
  14. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  15. System.out.println(" [x] Sent '" + message + "'");
  16. Thread.sleep(i * 2);
  17. }
  18. // 关闭通道和连接
  19. channel.close();
  20. connection.close();
  21. }
  22. }

2.2.2 消费者1

2.2.3 消费者2

与消费者1基本类似,就是没有设置消费耗时间。

这里是模拟有些消费者快,有些比较慢。

接下来,两个消费者一同启动,然后发送50条消息:

启动生产者,然后发送50条信息:

消费者:

                       

2.2.4 能者多劳

刚才的实现有问题吗?

  • 消费者1比消费者2的效率要低,一次任务的耗时较长

  • 然而两人最终消费的消息数量是一样的

  • 消费者2大量时间处于空闲状态,消费者1一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

怎么实现呢?

可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

两个消费者都加一句:channel.basicQos(1)

再次测试:

2.3 订阅模型分类

在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。

订阅模型示意图:

解读:

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange类型有以下几种:

​ Fanout:广播,将消息交给所有绑定到交换机的队列

​ Direct:定向,把消息交给符合指定routing key 的队列

​ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

2.4 订阅模型-Fanout

Fanout,也称为广播。

流程图:

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者

  • 2) 每个消费者有自己的queue(队列)

  • 3) 每个队列都要绑定到Exchange(交换机)

  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

  • 5) 交换机把消息发送给绑定过的所有队列

  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

2.4.1 生产者

两个变化:

  • 1) 声明Exchange,不再声明Queue

  • 2) 发送消息到Exchange,不再发送到Queue

  1. package cn.itcast.rabbitmq.fanout;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "fanout_exchange_test";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接
  9. Connection connection = ConnectionUtil.getConnection();
  10. // 获取通道
  11. Channel channel = connection.createChannel();
  12. // 声明exchange,指定类型为fanout
  13. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  14. // 消息内容
  15. String message = "Hello everyone";
  16. // 发布消息到Exchange
  17. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  18. System.out.println(" [生产者] Sent '" + message + "'");
  19. channel.close();
  20. connection.close();
  21. }
  22. }

2.4.2 消费者1

  1. package cn.itcast.rabbitmq.fanout;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. //消费者1
  10. public class Recv {
  11. private final static String QUEUE_NAME = "fanout_exchange_queue_1";
  12. private final static String EXCHANGE_NAME = "fanout_exchange_test";
  13. public static void main(String[] argv) throws Exception {
  14. // 获取到连接
  15. Connection connection = ConnectionUtil.getConnection();
  16. // 获取通道
  17. Channel channel = connection.createChannel();
  18. // 声明队列
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. // 绑定队列到交换机
  21. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  22. // 定义队列的消费者
  23. DefaultConsumer consumer = new DefaultConsumer(channel) {
  24. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  27. byte[] body) throws IOException {
  28. // body 即消息体
  29. String msg = new String(body);
  30. System.out.println(" [消费者1] received : " + msg + "!");
  31. }
  32. };
  33. // 监听队列,自动返回完成
  34. channel.basicConsume(QUEUE_NAME, true, consumer);
  35. }
  36. }

要注意代码中:队列需要和交换机绑定

2.4.3 消费者2

  1. package cn.itcast.rabbitmq.fanout;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. // 消费者2
  10. public class Recv2 {
  11. private final static String QUEUE_NAME = "fanout_exchange_queue_2";
  12. private final static String EXCHANGE_NAME = "fanout_exchange_test";
  13. public static void main(String[] argv) throws Exception {
  14. // 获取到连接
  15. Connection connection = ConnectionUtil.getConnection();
  16. // 获取通道
  17. Channel channel = connection.createChannel();
  18. // 声明队列
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. // 绑定队列到交换机
  21. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  22. // 定义队列的消费者
  23. DefaultConsumer consumer = new DefaultConsumer(channel) {
  24. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  27. byte[] body) throws IOException {
  28. // body 即消息体
  29. String msg = new String(body);
  30. System.out.println(" [消费者2] received : " + msg + "!");
  31. }
  32. };
  33. // 监听队列,手动返回完成
  34. channel.basicConsume(QUEUE_NAME, true, consumer);
  35. }
  36. }

2.4.4 测试

运行两个消费者,然后发送1条消息:

2.5 订阅模型-Direct

有选择性的接收消息

在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。

在路由模式中,我们将添加一个功能:只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

2.5.1 生产者

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

  1. package cn.itcast.rabbitmq.direct;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. /**
  6. * 生产者,模拟为商品服务
  7. */
  8. class Send {
  9. private final static String EXCHANGE_NAME = "direct_exchange_test";
  10. public static void main(String[] argv) throws Exception {
  11. // 获取到连接
  12. Connection connection = ConnectionUtil.getConnection();
  13. // 获取通道
  14. Channel channel = connection.createChannel();
  15. // 声明exchange,指定类型为direct
  16. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  17. // 消息内容
  18. String message = "商品删除了, id = 1001";
  19. // 发送消息,并且指定routing key 为:insert ,代表新增商品
  20. channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
  21. System.out.println(" [商品服务:] Sent '" + message + "'");
  22. channel.close();
  23. connection.close();
  24. }
  25. }

2.5.2 消费者1

此处假设消费者1只接收两种类型的消息:更新商品和删除商品。

  1. package cn.itcast.rabbitmq.direct;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. /**
  10. * 消费者1
  11. */
  12. class Recv {
  13. private final static String QUEUE_NAME = "direct_exchange_queue_1";
  14. private final static String EXCHANGE_NAME = "direct_exchange_test";
  15. public static void main(String[] argv) throws Exception {
  16. // 获取到连接
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 获取通道
  19. Channel channel = connection.createChannel();
  20. // 声明队列
  21. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  22. // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
  23. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  25. // 定义队列的消费者
  26. DefaultConsumer consumer = new DefaultConsumer(channel) {
  27. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  28. @Override
  29. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  30. byte[] body) throws IOException {
  31. // body 即消息体
  32. String msg = new String(body);
  33. System.out.println(" [消费者1] received : " + msg + "!");
  34. }
  35. };
  36. // 监听队列,自动ACK
  37. channel.basicConsume(QUEUE_NAME, true, consumer);
  38. }
  39. }

2.5.3 消费者2

此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。

  1. package cn.itcast.rabbitmq.direct;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. /**
  10. * 消费者2
  11. */
  12. public class Recv2 {
  13. private final static String QUEUE_NAME = "direct_exchange_queue_2";
  14. private final static String EXCHANGE_NAME = "direct_exchange_test";
  15. public static void main(String[] argv) throws Exception {
  16. // 获取到连接
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 获取通道
  19. Channel channel = connection.createChannel();
  20. // 声明队列
  21. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  22. // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
  23. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
  24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  25. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  26. // 定义队列的消费者
  27. DefaultConsumer consumer = new DefaultConsumer(channel) {
  28. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  29. @Override
  30. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  31. byte[] body) throws IOException {
  32. // body 即消息体
  33. String msg = new String(body);
  34. System.out.println(" [消费者2] received : " + msg + "!");
  35. }
  36. };
  37. // 监听队列,自动ACK
  38. channel.basicConsume(QUEUE_NAME, true, consumer);
  39. }
  40. }

2.5.4 测试

 

 

2.6 订阅模型-Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“<speed>.<color>.<species>”。

我们创建了三个绑定:Q1绑定了绑定键“* .orange.”,Q2绑定了“.*.rabbit”和“lazy.#”。

Q1匹配所有的橙色动物。

Q2匹配关于兔子以及懒惰动物的消息。

练习,生产者发送如下消息,会进入那个队列:

quick.orange.rabbit  Q1 Q2

lazy.orange.elephant  Q1 Q2

quick.orange.fox  Q1

lazy.pink.rabbit  Q2

quick.brown.fox  不匹配任意队列,被丢弃

2.6.1 生产者

使用topic类型的Exchange,发送消息的routing key有3种: item.isnertitem.updateitem.delete

  1. package cn.itcast.rabbitmq.topic;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. /**
  6. * 生产者,模拟为商品服务
  7. */
  8. public class Send {
  9. private final static String EXCHANGE_NAME = "topic_exchange_test";
  10. public static void main(String[] argv) throws Exception {
  11. // 获取到连接
  12. Connection connection = ConnectionUtil.getConnection();
  13. // 获取通道
  14. Channel channel = connection.createChannel();
  15. // 声明exchange,指定类型为topic
  16. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  17. // 消息内容
  18. String message = "新增商品 : id = 1001";
  19. // 发送消息,并且指定routing key 为:insert ,代表新增商品
  20. channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
  21. System.out.println(" [商品服务:] Sent '" + message + "'");
  22. channel.close();
  23. connection.close();
  24. }
  25. }

2.6.2 消费者1

此处假设消费者1只接收两种类型的消息:更新商品和删除商品。

  1. package cn.itcast.rabbitmq.topic;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. /**
  10. * 消费者1
  11. */
  12. public class Recv {
  13. private final static String QUEUE_NAME = "topic_exchange_queue_1";
  14. private final static String EXCHANGE_NAME = "topic_exchange_test";
  15. public static void main(String[] argv) throws Exception {
  16. // 获取到连接
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 获取通道
  19. Channel channel = connection.createChannel();
  20. // 声明队列
  21. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  22. // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
  23. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
  24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
  25. // 定义队列的消费者
  26. DefaultConsumer consumer = new DefaultConsumer(channel) {
  27. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  28. @Override
  29. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  30. byte[] body) throws IOException {
  31. // body 即消息体
  32. String msg = new String(body);
  33. System.out.println(" [消费者1] received : " + msg + "!");
  34. }
  35. };
  36. // 监听队列,自动ACK
  37. channel.basicConsume(QUEUE_NAME, true, consumer);
  38. }
  39. }

2.6.3 消费者2

此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。

  1. package cn.itcast.rabbitmq.topic;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.AMQP.BasicProperties;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import cn.itcast.rabbitmq.util.ConnectionUtil;
  9. /**
  10. * 消费者2
  11. */
  12. public class Recv2 {
  13. private final static String QUEUE_NAME = "topic_exchange_queue_2";
  14. private final static String EXCHANGE_NAME = "topic_exchange_test";
  15. public static void main(String[] argv) throws Exception {
  16. // 获取到连接
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 获取通道
  19. Channel channel = connection.createChannel();
  20. // 声明队列
  21. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  22. // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
  23. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
  24. // 定义队列的消费者
  25. DefaultConsumer consumer = new DefaultConsumer(channel) {
  26. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  29. byte[] body) throws IOException {
  30. // body 即消息体
  31. String msg = new String(body);
  32. System.out.println(" [消费者2] received : " + msg + "!");
  33. }
  34. };
  35. // 监听队列,自动ACK
  36. channel.basicConsume(QUEUE_NAME, true, consumer);
  37. }
  38. }

2.6.4 测试

发送三种消息

2.7 持久化

如何避免消息丢失?

1) 消费者的ACK机制。可以防止消费者丢失消息。

2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。

如果可以将消息进行持久化呢?

要将消息持久化,前提是:队列、Exchange都持久化

2.7.1 交换机持久化

2.7.2 队列持久化

2.7.3 消息持久化

2.8 总结

2.8.1 生产者代码

获取连接

Connection connection = ConnectionUtil.getConnection();

获取通道

Channel channel = connection.createChannel();

模型选择

  • 如果是基本消息模型和work消息模型,那么就需要声明一个队列,然后将信息发布到队列当中,因为消费者是直接在队列里面拿信息。
 channel.queueDeclare("队列名字", false, false, false, null);
  1. public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
  2. com.rabbitmq.client.AMQP.Queue.DeclareOk ok = this.delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
  3. RecordedQueue q = (new RecordedQueue(this, ok.getQueue())).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments);
  4. if (queue.equals("")) {
  5. q.serverNamed(true);
  6. }
  7. this.recordQueue((com.rabbitmq.client.AMQP.Queue.DeclareOk)ok, q);
  8. return ok;
  9. }

参数详解:

  • queue: 队列名称
  • durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
  • exclusive:是否排外的,有两个作用:
  1. 当连接关闭时connection.close()该队列是否会自动删除;
  2. 该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景
  • autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
  • arguments: 队列中的消息什么时候会自动被删除?
  1. Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间
    1. AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”); 
    2. channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

    Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

  2. Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
  3. Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
  4. Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
  5. Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
  6. Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
  7. Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
  8. Master locator(x-queue-master-locator)
  • 如果是订阅模型(Fanout、Direct、Topic),那么就需要声明一个交换机,信息发送到交换机中,交换机负责将信息发送到队列里面,消费者再从队列里面获取消息。
channel.exchangeDeclare("交换机名字", "交换机类型");

信息发送

  • 基本模型

信息直接发送到队列中:

channel.basicPublish("", "队列名字", null, "信息字节流");
  • 订阅模型

信息发送到交换机中:

Fanout

channel.basicPublish("交换机名字", "", null, "信息字节流");

Direct、Topic

channel.basicPublish("交换机名字", "RoutingKey", null, message.getBytes());

参数详解: 

  1. public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException {
  2. if (this.nextPublishSeqNo > 0L) {
  3. this.unconfirmedSet.add(this.getNextPublishSeqNo());
  4. ++this.nextPublishSeqNo;
  5. }
  6. BasicProperties useProps = props;
  7. if (props == null) {
  8. useProps = MessageProperties.MINIMAL_BASIC;
  9. }
  10. this.transmit(new AMQCommand((new Builder()).exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), useProps, body));
  11. this.metricsCollector.basicPublish(this);
  12. }
  • exchange:交换机名字
  • routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
  • mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
  • immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
  • BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留

简单来说:

mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;

immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

关闭通道、关闭连接

  1. channel.close();
  2. connection.close();

2.8.2 消费者代码

获取连接

 Connection connection = ConnectionUtil.getConnection();

获取通道

Channel channel = connection.createChannel();

声明队列

channel.queueDeclare("队列名字", false, false, false, null);

绑定队列到交换机

如果是订阅类型(Fanout、Direct和Topic),则需要绑定队列到交换机 ;如果是基本消息消息模型和work消息模型,则不需要此操作

Fanout

 channel.queueBind("队列名字", "交换机名字", "");

 Direct和Topic

channel.queueBind("队列名字", "交换机名字", "routing key");

定义队列的消费者

  1. DefaultConsumer consumer = new DefaultConsumer(channel) {
  2. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  3. @Override
  4. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  5. byte[] body) throws IOException {
  6. // body 即消息体
  7. String msg = new String(body);
  8. System.out.println(" [消费者1] received : " + msg + "!");
  9. }
  10. };

监听队列,自动或者手动ACK

自动ACK:

channel.basicConsume("队列名字", true, consumer);

手动ACK:

channel.basicConsume("队列名字", false, consumer);

参数解析:

 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答

 

channel.basicAck(envelope.getDeliveryTag(), false);
  1. deliveryTag:该消息的index
  2. multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/597547
推荐阅读
相关标签
  

闽ICP备14008679号