当前位置:   article > 正文

RabbitMQ - RabbitMQ交换机;事务;发送、接收消息_rabbitmq 事务消息 接收方

rabbitmq 事务消息 接收方

消息队列(Message Queue)MQ 产品从模型抽象上来说都是一样的过程:

消费者(consumer)订阅某个队列,生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者

AMQP协议

AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列

AMQP的机制如下图所示:

消息(message)被发布者(publisher)发送给交换机(exchange),然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。 

一、RabbitMQ架构

RabbitMQ系统最核心的组件是Exchange(交换机)和Queue(队列),Exchange和Queue是在rabbitmq server(又叫做Broker)端,Producer(生产者)和Consumer(消费者)在应用端

1、RabbitMQ架构

RabbitMQ架构图如下:

1、Message

消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 

2、Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3、Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

4、Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

5、Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6、Connection

网络连接,比如一个TCP连接。

7、Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8、Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9、Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

10、Broker

表示消息队列服务器实体。

2、交换机(Exchange)和交换机类型

队列,交换机和绑定统称为AMQP实体(AMQP entities)

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct(直连交换机)、fanout(扇型交换机)、topic(主题交换机)、headers(头交换机)

headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎不用,主要使用direct、fanout、topic

direct、fanout、topic

1、direct(直连交换机)

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“hello”,则只转发 routing key 标记为“hello”的消息。它是完全匹配、单播的模式。

2、fanout(扇型交换机)

发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的

3、topic(主题交换机)

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。

它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”

#匹配0个或多个单词

“*”匹配不多不少一个单词

 3、Java发送、接收消息(不使用交换机)

分别创建两个maven Java普通工程,一个命名java-send,一个命名java-receive

发送消息类Send

  1. /**
  2. * 普通发送消息,无交换机
  3. * 消费者(consumer)订阅某个队列,生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者
  4. */
  5. public class Send {
  6. public static void main(String[] args) {
  7. //创建连接工厂
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. /**
  10. * 配置RabbitMQ连接信息
  11. */
  12. //指定IP
  13. connectionFactory.setHost("192.168.133.129");
  14. //指定端口(控制台端口15672)
  15. connectionFactory.setPort(5672);
  16. //指定账号
  17. connectionFactory.setUsername("root");
  18. //指定密码
  19. connectionFactory.setPassword("root");
  20. //定义连接
  21. Connection connection = null;
  22. //定义通道
  23. Channel channel = null;
  24. try {
  25. //获取连接
  26. connection = connectionFactory.newConnection();
  27. //获取通道
  28. channel = connection.createChannel();
  29. /**
  30. * 声明一个队列
  31. * 参数1 队列名称取值任意
  32. * 参数2 是否为持久化的队列
  33. * 参数3 是否排外,如果排外这个队列只允许一个消费者监听
  34. * 参数4 是否自动删除队列,如果为true表示当前队列中没有消息,也没有消费者连接时就会自动删除这个队列
  35. * 参数5 队列的一些属性设置,通常为null
  36. *
  37. * 注:
  38. * 1、声明队列时,这个队列名称如果已经存在则放弃声明,如果队列不存在则会声明一个新的队列
  39. * 2、队列名可以取值任意,但是要与消息接收时的队列名称完全一致
  40. * 3、一定要在发送消息前确认队列名已经存在在RabbitMQ中,否则就会出现问题(如下代码可有可无)
  41. */
  42. String queueName = "myQueue";
  43. channel.queueDeclare(queueName,true,false,false,null);
  44. String message = "我的RabbitMQ消息,无交换机";
  45. /**
  46. * 发送消息到RabbitMQ
  47. * 参数1 交换机名称,空字符串表示不使用交换机
  48. * 参数2 队列名或RoutingKey(当指定了交换机名称,这个值就是RoutingKey)
  49. * 参数3 消息属性信息,通常为null
  50. * 参数4 具体的消息数据的字节数组
  51. *
  52. * 注:发送消息的队列名与接收消息的队列名要保持完全一致
  53. */
  54. channel.basicPublish("",queueName,null,message.getBytes("UTF-8"));
  55. System.out.println("消息发送成功!(无交换机)");
  56. } catch (UnsupportedEncodingException e) {
  57. e.printStackTrace();
  58. } catch (IOException e) {
  59. e.printStackTrace();
  60. } catch (TimeoutException e) {
  61. e.printStackTrace();
  62. } finally {
  63. if(channel != null){
  64. try {
  65. channel.close();
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. } catch (TimeoutException e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. if(connection != null){
  73. try {
  74. connection.close();
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. }
  80. }
  81. }

 运行程序,从RabbitMQ管理控制台查看消息,如下

接收消息类Receive

  1. /**
  2. * 普通接收消息,无交换机
  3. * 消费者(consumer)订阅某个队列,生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者
  4. */
  5. public class Receive {
  6. public static void main(String[] args) {
  7. //创建连接工厂
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. /**
  10. * 配置RabbitMQ连接信息
  11. */
  12. //指定IP
  13. connectionFactory.setHost("192.168.133.129");
  14. //指定端口(控制台端口15672)
  15. connectionFactory.setPort(5672);
  16. //指定账号
  17. connectionFactory.setUsername("root");
  18. //指定密码
  19. connectionFactory.setPassword("root");
  20. //定义连接
  21. Connection connection = null;
  22. //定义通道
  23. Channel channel = null;
  24. try {
  25. //获取连接
  26. connection = connectionFactory.newConnection();
  27. //获取通道
  28. channel = connection.createChannel();
  29. /**
  30. * 声明一个队列
  31. * 参数1 队列名称取值任意
  32. * 参数2 是否为持久化的队列
  33. * 参数3 是否排外,如果排外这个队列只允许一个消费者监听
  34. * 参数4 是否自动删除队列,如果为true表示当前队列中没有消息,也没有消费者连接时就会自动删除这个队列
  35. * 参数5 队列的一些属性设置,通常为null
  36. *
  37. * 注:
  38. * 1、声明队列时,这个队列名称如果已经存在则放弃声明,如果队列不存在则会声明一个新的队列
  39. * 2、队列名可以取值任意,但是要与消息接收时的队列名称完全一致
  40. * 3、一定要在发送消息前确认队列名已经存在在RabbitMQ中,否则就会出现问题(如下代码可有可无)
  41. */
  42. String queueName = "myQueue";
  43. channel.queueDeclare(queueName,true,false,false,null);
  44. String message = "我的RabbitMQ消息,无交换机";
  45. /**
  46. * 接收消息
  47. * 参数1 当前消费者监听的队列名(接收消息的队列名必须要和发送消息的队列名完全保持一致,否则接收不到消息)
  48. * 参数2 消息是否自动确认;true表示自动确认接收,且接收完消息以后会自动将消息从队列中移除
  49. * 参数3 消息接收者标签,用于多个消费者同时监听一个队列时,用于确认不同的消费者,通常为空字符串
  50. * 参数4 消息接收的回调方法,完成对消息的处理
  51. *
  52. * 注:使用了basiccnsume()方法后,会自动开启一个线程持续监听队列,如果队列中有消息数据进入会自动接收消息
  53. * 因此不能关闭连接和通道对象
  54. */
  55. channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
  56. //消息具体接收方法和处理方法
  57. public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
  58. String message = new String(body,"UTF-8");
  59. System.out.println("消费者----" + message);
  60. }
  61. });
  62. } catch (UnsupportedEncodingException e) {
  63. e.printStackTrace();
  64. } catch (IOException e) {
  65. e.printStackTrace();
  66. } catch (TimeoutException e) {
  67. e.printStackTrace();
  68. } finally {
  69. //不能关闭通道和链接,如果一旦关闭可能会造成接收时抛出异常,或无法接收到消息
  70. // if(channel != null){
  71. // try {
  72. // channel.close();
  73. // } catch (IOException e) {
  74. // e.printStackTrace();
  75. // } catch (TimeoutException e) {
  76. // e.printStackTrace();
  77. // }
  78. // }
  79. // if(connection != null){
  80. // try {
  81. // connection.close();
  82. // } catch (IOException e) {
  83. // e.printStackTrace();
  84. // }
  85. // }
  86. }
  87. }
  88. }

  运行程序,从RabbitMQ管理控制台查看消息,如下,之后再刷新查看,消息就为0条

注:

1、Queue的消息只能被同一个消费者消费,如果没有消费监听队列那么消息会存放到队列中持久化保存,直到有消费者来消费这个消息,如果已有消费者监听队列则立即消费发送到队列中的消息

2、Queue的消息可以保证每个消息都一定能被消费

4、Java发送、接收消息(使用交换机) 

1、direct交换机

发送消息

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * 发送消息,direct交换机(完全匹配、单播的模式)
  8. *
  9. * 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,
  10. * 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,
  11. * 如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等
  12. */
  13. public class DirectSend {
  14. public static void main(String[] args) {
  15. //创建连接工厂
  16. ConnectionFactory connectionFactory = new ConnectionFactory();
  17. /**
  18. * 配置RabbitMQ连接信息
  19. */
  20. //指定IP
  21. connectionFactory.setHost("192.168.133.129");
  22. //指定端口(控制台端口15672)
  23. connectionFactory.setPort(5672);
  24. //指定账号
  25. connectionFactory.setUsername("root");
  26. //指定密码
  27. connectionFactory.setPassword("root");
  28. //定义连接
  29. Connection connection = null;
  30. //定义通道
  31. Channel channel = null;
  32. try {
  33. //获取连接
  34. connection = connectionFactory.newConnection();
  35. //获取通道
  36. channel = connection.createChannel();
  37. /**
  38. * 声明一个队列
  39. * 参数1 队列名称取值任意
  40. * 参数2 是否为持久化的队列
  41. * 参数3 是否排外,如果排外这个队列只允许一个消费者监听
  42. * 参数4 是否自动删除队列,如果为true表示当前队列中没有消息,也没有消费者连接时就会自动删除这个队列
  43. * 参数5 队列的一些属性设置,通常为null
  44. *
  45. * 注:
  46. * 1、声明队列时,这个队列名称如果已经存在则放弃声明,如果队列不存在则会声明一个新的队列
  47. * 2、队列名可以取值任意,但是要与消息接收时的队列名称完全一致
  48. * 3、一定要在发送消息前确认队列名已经存在在RabbitMQ中,否则就会出现问题(如下代码可有可无)
  49. */
  50. String queueName = "myDirectQueue";
  51. channel.queueDeclare(queueName,true,false,false,null);
  52. /**
  53. * 声明一个交换机
  54. * 参数1 交换机名称
  55. * 参数2 交换机类型,取值:direct,fanout,topic,headers
  56. * 参数3 为是否为持久化交换机
  57. *
  58. * 注:
  59. * 1、声明交换机时如果这个交换机应存在则会放弃声明,如果交换机不存在则声明交换机
  60. * 2、在使用前必须要确保这个交换机被声明(如下代码可有可无)
  61. */
  62. String exchageName = "directExchange";
  63. String exchageType = "direct";
  64. channel.exchangeDeclare(exchageName,exchageType,true);
  65. /**
  66. * 将队列绑定到交换机
  67. * 参数1 队列名称
  68. * 参数2 交换机名称
  69. * 参数3 消息的RoutingKey(也是BindingKey)
  70. *
  71. * 注:在进行队列和交换机绑定时必须确保队列和交换机已经成功声明
  72. */
  73. String routingkey = "directRoutingKey";
  74. channel.queueBind(queueName,exchageName,routingkey);
  75. String message="direct交换机的消息!";
  76. /**
  77. * 发送消息到指定队列
  78. * 参数1 交换机名称
  79. * 参数2 消息的RoutingKey,如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致,
  80. * 那么这个消息就会发送的指定的队列中
  81. * 参数3 消息属性信息,通常为null
  82. * 参数4 具体的消息数据的字节数组
  83. *
  84. * 注:发送消息时必须确保交换机已经创建,并且已经绑定到了某个队列
  85. */
  86. channel.basicPublish(exchageName,routingkey,null,message.getBytes("UTF-8"));
  87. System.out.println("direct交换机消息发送成功!");
  88. } catch (IOException e) {
  89. e.printStackTrace();
  90. } catch (TimeoutException e) {
  91. e.printStackTrace();
  92. } finally {
  93. if(channel!=null){
  94. try {
  95. channel.close();
  96. } catch (IOException e) {
  97. e.printStackTrace();
  98. } catch (TimeoutException e) {
  99. e.printStackTrace();
  100. }
  101. }
  102. if(connection!=null){
  103. try {
  104. connection.close();
  105. } catch (IOException e) {
  106. e.printStackTrace();
  107. }
  108. }
  109. }
  110. }
  111. }

  运行程序,从RabbitMQ管理控制台查看消息,如下

 接收消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 接收消息,direct交换机
  6. */
  7. public class DirectReceive {
  8. public static void main(String[] args) {
  9. //创建连接工厂
  10. ConnectionFactory connectionFactory = new ConnectionFactory();
  11. /**
  12. * 配置RabbitMQ连接信息
  13. */
  14. //指定IP
  15. connectionFactory.setHost("192.168.133.129");
  16. //指定端口(控制台端口15672)
  17. connectionFactory.setPort(5672);
  18. //指定账号
  19. connectionFactory.setUsername("root");
  20. //指定密码
  21. connectionFactory.setPassword("root");
  22. //定义连接
  23. Connection connection = null;
  24. //定义通道
  25. Channel channel = null;
  26. try {
  27. //获取连接
  28. connection = connectionFactory.newConnection();
  29. //获取通道
  30. channel = connection.createChannel();
  31. /**
  32. * 声明一个队列
  33. * 参数1 队列名称取值任意
  34. * 参数2 是否为持久化的队列
  35. * 参数3 是否排外,如果排外这个队列只允许一个消费者监听
  36. * 参数4 是否自动删除队列,如果为true表示当前队列中没有消息,也没有消费者连接时就会自动删除这个队列
  37. * 参数5 队列的一些属性设置,通常为null
  38. *
  39. * 注:
  40. * 1、声明队列时,这个队列名称如果已经存在则放弃声明,如果队列不存在则会声明一个新的队列
  41. * 2、队列名可以取值任意,但是要与消息接收时的队列名称完全一致
  42. * 3、一定要在发送消息前确认队列名已经存在在RabbitMQ中,否则就会出现问题(如下代码可有可无)
  43. */
  44. String queueName = "myDirectQueue";
  45. channel.queueDeclare(queueName,true,false,false,null);
  46. /**
  47. * 声明一个交换机
  48. * 参数1 交换机名称
  49. * 参数2 交换机类型,取值:direct,fanout,topic,headers
  50. * 参数3 为是否为持久化交换机
  51. *
  52. * 注:
  53. * 1、声明交换机时如果这个交换机应存在则会放弃声明,如果交换机不存在则声明交换机
  54. * 2、在使用前必须要确保这个交换机被声明(如下代码可有可无)
  55. */
  56. String exchageName = "directExchange";
  57. String exchageType = "direct";
  58. channel.exchangeDeclare(exchageName,exchageType,true);
  59. /**
  60. * 将队列绑定到交换机
  61. * 参数1 队列名称
  62. * 参数2 交换机名称
  63. * 参数3 消息的RoutingKey(也是BindingKey)
  64. *
  65. * 注:在进行队列和交换机绑定时必须确保队列和交换机已经成功声明
  66. */
  67. String routingkey = "directRoutingKey";
  68. channel.queueBind(queueName,exchageName,routingkey);
  69. /**
  70. * 监听某个队列并获取队列中的数据
  71. * 参数1 当前消费者监听的队列名(接收消息的队列名必须要和发送消息的队列名完全保持一致,否则接收不到消息)
  72. * 参数2 消息是否自动确认;true表示自动确认接收,且接收完消息以后会自动将消息从队列中移除
  73. * 参数3 消息接收者标签,用于多个消费者同时监听一个队列时,用于确认不同的消费者,通常为空字符串
  74. * 参数4 消息接收的回调方法,完成对消息的处理
  75. *
  76. * 注:当前被监听的队列必须已经存在,并绑定到了某个交换机中
  77. */
  78. channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
  79. @Override
  80. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  81. String message = new String(body);
  82. System.out.println("direct消费者----" + message);
  83. }
  84. });
  85. } catch (IOException e) {
  86. e.printStackTrace();
  87. } catch (TimeoutException e) {
  88. e.printStackTrace();
  89. } finally {
  90. }
  91. }
  92. }

 

注:

1、使用direct消息模式时必须要指定RoutingKey(路由键),将指定的消息绑定到指定的路由键上 

2、使用Exchange的direct模式时接收者的RoutingKey必须要与发送时的RoutingKey完全一致否则无法获取消息

3、接收消息时队列名也必须要发送消息时的完全一致

2、fanout交换机

发送消息

  1. /**
  2. * 发送消息,fanout交换机(一对多,广播)
  3. *
  4. * 注:
  5. * 1、Fanout模式必须先启动消息接收者队列监听消息,否则会丢失消息
  6. * 2、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();
  7. * 获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。
  8. * 3、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchange即可
  9. * 让所有接收者同时接收同一个消息,是一种广播的消息机制
  10. */
  11. public class FanoutSend {
  12. public static void main(String[] args) {
  13. //创建连接工厂
  14. ConnectionFactory connectionFactory = new ConnectionFactory();
  15. /**
  16. * 配置RabbitMQ连接信息
  17. */
  18. //指定IP
  19. connectionFactory.setHost("192.168.133.129");
  20. //指定端口(控制台端口15672)
  21. connectionFactory.setPort(5672);
  22. //指定账号
  23. connectionFactory.setUsername("root");
  24. //指定密码
  25. connectionFactory.setPassword("root");
  26. //定义连接
  27. Connection connection = null;
  28. //定义通道
  29. Channel channel = null;
  30. try {
  31. //获取连接
  32. connection = connectionFactory.newConnection();
  33. //获取通道
  34. channel = connection.createChannel();
  35. /**
  36. * 声明一个交换机
  37. * 参数1 交换机名称
  38. * 参数2 交换机类型,取值:direct,fanout,topic,headers
  39. * 参数3 为是否为持久化交换机
  40. *
  41. * 由于使用Fanout类型的交换机,因此消息的接收方可能会有多个因此不建议在消息发送时来创建队列
  42. * 以及绑定交换机,建议在消费者中创建队列并绑定交换机
  43. * 但是发送消息时至少应该确保交换机存在
  44. */
  45. String exchangeName = "fanoutExchange";
  46. String exchangeType = "fanout";
  47. channel.exchangeDeclare(exchangeName,exchangeType,true);
  48. // channel.queueDeclare("fanoutQueue",true,false,false,null);
  49. // channel.queueBind("fanoutQueue","fanoutExchange","");
  50. String message="fanout的测试消息!";
  51. /**
  52. * 发送消息到指定队列
  53. * 参数1 交换机名称
  54. * 参数2 消息的RoutingKey,如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致,
  55. * 那么这个消息就会发送的指定的队列中
  56. * 参数3 消息属性信息,通常为null
  57. * 参数4 具体的消息数据的字节数组
  58. *
  59. * 注:发送消息时必须确保交换机已经创建,并且已经绑定到了某个队列
  60. */
  61. channel.basicPublish(exchangeName,"",null,message.getBytes("UTF-8"));
  62. System.out.println("fanout交换机消息发送成功!");
  63. } catch (IOException e) {
  64. e.printStackTrace();
  65. } catch (TimeoutException e) {
  66. e.printStackTrace();
  67. } finally {
  68. if(channel!=null){
  69. try {
  70. channel.close();
  71. } catch (IOException e) {
  72. e.printStackTrace();
  73. } catch (TimeoutException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. if(connection!=null){
  78. try {
  79. connection.close();
  80. } catch (IOException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. }
  86. }

接收消息,可以有多个接收消息者,因此可以复制多个 FanoutReceive 类

  1. /**
  2. * 接收消息,fanout交换机
  3. *
  4. * 注:
  5. * 1、Fanout模式必须先启动消息接收者队列监听消息,否则会丢失消息
  6. * 2、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();
  7. * 获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。
  8. * 3、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchange即可
  9. * 让所有接收者同时接收同一个消息,是一种广播的消息机制
  10. */
  11. public class FanoutReceive {
  12. public static void main(String[] args) {
  13. //创建连接工厂
  14. ConnectionFactory connectionFactory = new ConnectionFactory();
  15. /**
  16. * 配置RabbitMQ连接信息
  17. */
  18. //指定IP
  19. connectionFactory.setHost("192.168.133.129");
  20. //指定端口(控制台端口15672)
  21. connectionFactory.setPort(5672);
  22. //指定账号
  23. connectionFactory.setUsername("root");
  24. //指定密码
  25. connectionFactory.setPassword("root");
  26. //定义连接
  27. Connection connection = null;
  28. //定义通道
  29. Channel channel = null;
  30. try {
  31. //获取连接
  32. connection = connectionFactory.newConnection();
  33. //获取通道
  34. channel = connection.createChannel();
  35. /**
  36. * 由于Fanout类型的交换机的消息时类似于广播的模式,它不需要绑定RoutingKey
  37. * 而又可能会有很多个消费来接收这个交换机中的数据,因此创建队列时要创建一个随机的队列
  38. *
  39. * 没有参数的channel.queueDeclare()方法会创建一个名字为随机的一个队列
  40. * 这个队列的数据
  41. * 非持久化
  42. * 排外(同时最多只允许有一个消费者监听当前队列)
  43. * 自动删除(当没有任何消费者监听队列时这个队列会自动删除)
  44. *
  45. * getQueue() 方法用于获取这个随机的队列名
  46. */
  47. String queueName = channel.queueDeclare().getQueue();
  48. String exchangeName = "fanoutExchange";
  49. String exchangeType = "fanout";
  50. channel.exchangeDeclare(exchangeName,exchangeType,true);
  51. //将这个随机的队列绑定到交换机中, 由于是fanout类型的交换机因此不需指定RoutingKey进行绑定
  52. channel.queueBind(queueName,exchangeName,"");
  53. //接收消息
  54. channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
  55. @Override
  56. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  57. String message = new String(body);
  58. System.out.println("fanout消费者----" + message);
  59. }
  60. });
  61. } catch (IOException e) {
  62. e.printStackTrace();
  63. } catch (TimeoutException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. }

 查看RabbitMQ的管理控制台,如下

注:

1、fanout模式的消息需要将一个消息同时绑定到多个队列中因此这里不能创建并指定某个队列

2、Fanout模式必须先启动消息接收者队列监听消息,否则会丢失消息

3、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。

4、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchang即可让所有接收者同时接收同一个消息是一种广播的消息机制

3、topic交换机

发送消息

  1. /**
  2. * 发送消息,topic交换机(一对多)
  3. *
  4. * Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,
  5. * #表示通配任意一个单词 *表示通配任意多个单词,
  6. * 例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息
  7. */
  8. public class TopicSend {
  9. public static void main(String[] args) {
  10. //创建连接工厂
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. /**
  13. * 配置RabbitMQ连接信息
  14. */
  15. //指定IP
  16. connectionFactory.setHost("192.168.133.129");
  17. //指定端口(控制台端口15672)
  18. connectionFactory.setPort(5672);
  19. //指定账号
  20. connectionFactory.setUsername("root");
  21. //指定密码
  22. connectionFactory.setPassword("root");
  23. //定义连接
  24. Connection connection = null;
  25. //定义通道
  26. Channel channel = null;
  27. try {
  28. //获取连接
  29. connection = connectionFactory.newConnection();
  30. //获取通道
  31. channel = connection.createChannel();
  32. /**
  33. * 声明一个交换机
  34. * 参数1 交换机名称
  35. * 参数2 交换机类型,取值:direct,fanout,topic,headers
  36. * 参数3 为是否为持久化交换机
  37. *
  38. * 由于使用topic类型的交换机,因此消息的接收方可能会有多个因此不建议在消息发送时来创建队列
  39. * 以及绑定交换机,建议在消费者中创建队列并绑定交换机
  40. * 但是发送消息时至少应该确保交换机存在
  41. */
  42. String exchangeName = "topicExchange";
  43. String exchangeType = "topic";
  44. channel.exchangeDeclare(exchangeName,exchangeType,true);
  45. String message="topic的测试消息aa.bb.cc!";
  46. /**
  47. * 发送消息到指定队列
  48. * 参数1 交换机名称
  49. * 参数2 消息的RoutingKey,如果这个消息的RoutingKey和某个队列与交换机绑定的RoutingKey一致,
  50. * 那么这个消息就会发送的指定的队列中
  51. * 参数3 消息属性信息,通常为null
  52. * 参数4 具体的消息数据的字节数组
  53. *
  54. * Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,
  55. * #表示通配任意一个单词 *表示通配任意多个单词,
  56. * 例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息
  57. * aa;aa.bb;aa.bb.cc
  58. *
  59. * 注:发送消息时必须确保交换机已经创建,并且已经绑定到了某个队列
  60. */
  61. channel.basicPublish(exchangeName,"aa.bb.cc",null,message.getBytes("UTF-8"));
  62. System.out.println("topic交换机消息发送成功!");
  63. } catch (IOException e) {
  64. e.printStackTrace();
  65. } catch (TimeoutException e) {
  66. e.printStackTrace();
  67. } finally {
  68. if(channel!=null){
  69. try {
  70. channel.close();
  71. } catch (IOException e) {
  72. e.printStackTrace();
  73. } catch (TimeoutException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. if(connection!=null){
  78. try {
  79. connection.close();
  80. } catch (IOException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. }
  86. }

接收消息,接收消息可以有多个类

  1. /**
  2. * 接收消息,topic交换机
  3. */
  4. public class TopicReceive {
  5. public static void main(String[] args) {
  6. //创建连接工厂
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. /**
  9. * 配置RabbitMQ连接信息
  10. */
  11. //指定IP
  12. connectionFactory.setHost("192.168.133.129");
  13. //指定端口(控制台端口15672)
  14. connectionFactory.setPort(5672);
  15. //指定账号
  16. connectionFactory.setUsername("root");
  17. //指定密码
  18. connectionFactory.setPassword("root");
  19. //定义连接
  20. Connection connection = null;
  21. //定义通道
  22. Channel channel = null;
  23. try {
  24. //获取连接
  25. connection = connectionFactory.newConnection();
  26. //获取通道
  27. channel = connection.createChannel();
  28. /**
  29. * Topic 类型的交换机也是消息一对多的一种交换机类型,它和fanout都能实现一个消息同时发送给多个队列
  30. *
  31. * fanout更适合于使用在一个功能不同的进程来获取数据,例如手机App中的消息推送,一个App可能会还有很
  32. * 多个用户来进行安装然后他们都会启动一个随机的队列来接收着自己的数据
  33. *
  34. * Topic更适合不同的功能模块来接收同一个消息,例如商城下单成功以后需要发送消息到队列中。例如RoutingKey
  35. * 为 的order.success,物流系统监听订单order.* 发票系统监听order.*
  36. *
  37. * Topic可以使用随机的队列名也可以使用一个明确的队列名,但是如果应用在和订单有关的功能中,建议是有个
  38. * 名取的队列名并且要求为持久化的队列
  39. */
  40. //声明队列
  41. String queueName = "topicQueue";
  42. channel.queueDeclare(queueName,true,false,false,null);
  43. //声明交换机
  44. String exchageName = "topicExchange";
  45. String exchageType = "topic";
  46. channel.exchangeDeclare(exchageName,exchageType,true);
  47. //绑定队列和交换机
  48. channel.queueBind(queueName,exchageName,"aa");
  49. //接收消息
  50. channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
  51. @Override
  52. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  53. String message=new String(body);
  54. System.out.println("TopicReceive消费者aa ---" + message);
  55. }
  56. });
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. } catch (TimeoutException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. }

注:

1、在topic模式中必须要指定Routingkey,并且可以同时指定多层的RoutingKey,每个层次之间使用 点分隔即可 例如 test.myRoutingKey

2、Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,#表示通配任意一个单词 *表示通配任意多个单词,例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息

3、事务消息

事务消息与数据库的事务类似,MQ中的消息是要保证消息是否会全部发送成功,防止丢失消息的一种策略。

RabbitMQ有两种方式:

1、通过AMQP提供的事务机制实现

2、使用发送者确认模式实现

1、事务

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

channel.txSelect()声明启动事务模式

channel.txCommint()提交事务

channel.txRollback()回滚事务

消息发送类

  1. /**
  2. * 事务发送消息
  3. *
  4. * 事务消息与数据库的事务类似,只是MQ中的消息是要保证消息是否会全部发送成功,防止丢失消息的一种策略
  5. * RabbitMQ有两种方式来解决这个问题:
  6. * 1、通过AMQP提供的事务机制实现;
  7. * 2、使用发送者确认模式实现;
  8. *
  9. * 事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
  10. * 1、channel.txSelect()声明启动事务模式;
  11. * 2、channel.txCommint()提交事务;
  12. * 3、channel.txRollback()回滚事务;
  13. */
  14. public class TransactionSend {
  15. public static void main(String[] args) {
  16. //创建连接工厂
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. /**
  19. * 配置RabbitMQ连接信息
  20. */
  21. //指定IP
  22. connectionFactory.setHost("192.168.133.129");
  23. //指定端口(控制台端口15672)
  24. connectionFactory.setPort(5672);
  25. //指定账号
  26. connectionFactory.setUsername("root");
  27. //指定密码
  28. connectionFactory.setPassword("root");
  29. //定义连接
  30. Connection connection = null;
  31. //定义通道
  32. Channel channel = null;
  33. try {
  34. //获取连接
  35. connection = connectionFactory.newConnection();
  36. //获取通道
  37. channel = connection.createChannel();
  38. //声明队列
  39. String queueName = "transactionQueue";
  40. channel.queueDeclare(queueName,true,false,false,null);
  41. //声明交换机
  42. String exchangeName = "transactionExchange";
  43. String exchangeType = "direct";
  44. channel.exchangeDeclare(exchangeName,exchangeType,true);
  45. //绑定队列和交换机
  46. String routingKey = "transactionRoutingKey";
  47. channel.queueBind(queueName,exchangeName,routingKey);
  48. String message="事务的测试消息的测试消息!";
  49. //启动一个事务,启动事务以后所有消息写入到队列中;必须显示的调用 txCommit()提交事务或txRollback()回滚事务
  50. channel.txSelect();
  51. //发送消息
  52. channel.basicPublish(exchangeName,routingKey,null,message.getBytes("UTF-8"));
  53. //提交事务,如果调用txSelect()方法启动了事务,必须显示调用事务的提交;否则消息不会真正的写入到队列,提交时以后会将内存中的消息写入队列并释放内存
  54. channel.txCommit();
  55. System.out.println("事务消息发送成功");
  56. } catch (IOException e) {
  57. e.printStackTrace();
  58. } catch (TimeoutException e) {
  59. e.printStackTrace();
  60. } finally {
  61. if(channel!=null){
  62. try {
  63. //回滚事务,放弃当前事务中所有没有提交的消息,释放内存
  64. channel.txRollback();
  65. channel.close();
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. } catch (TimeoutException e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. if(connection!=null){
  73. try {
  74. connection.close();
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. }
  80. }
  81. }

消息接收类

  1. /**
  2. * 事务接收消息
  3. */
  4. public class TransactionReceive {
  5. public static void main(String[] args) {
  6. //创建连接工厂
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. /**
  9. * 配置RabbitMQ连接信息
  10. */
  11. //指定IP
  12. connectionFactory.setHost("192.168.133.129");
  13. //指定端口(控制台端口15672)
  14. connectionFactory.setPort(5672);
  15. //指定账号
  16. connectionFactory.setUsername("root");
  17. //指定密码
  18. connectionFactory.setPassword("root");
  19. //定义连接
  20. Connection connection = null;
  21. //定义通道
  22. Channel channel = null;
  23. try {
  24. //获取连接
  25. connection = connectionFactory.newConnection();
  26. //获取通道
  27. channel = connection.createChannel();
  28. //声明队列
  29. String queueName = "transactionQueue";
  30. channel.queueDeclare(queueName,true,false,false,null);
  31. //声明交换机
  32. String exchangeName = "transactionExchange";
  33. String exchangeType = "direct";
  34. channel.exchangeDeclare(exchangeName,exchangeType,true);
  35. //绑定队列和交换机
  36. String routingKey = "transactionRoutingKey";
  37. channel.queueBind(queueName,exchangeName,routingKey);
  38. /**
  39. * 开启事务
  40. * 当消费者开启事务以后,即使不作为事务的提交,那么依然可以获取队列中的消息并且将消息从队列中移除掉
  41. * 注:
  42. * 暂时事务队列接收者没有任何的影响
  43. */
  44. channel.txSelect();
  45. channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
  46. @Override
  47. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  48. String message = new String(body);
  49. System.out.println("事务消费者 ----" + message);
  50. }
  51. });
  52. } catch (IOException e) {
  53. e.printStackTrace();
  54. } catch (TimeoutException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }

2、消息确认

消息确认有三种方式

1、channel.waitForConfirms()普通发送方确认模式

  1. /**
  2. * Confirm的三种实现方式:
  3. * 方式一:channel.waitForConfirms()普通发送方确认模式
  4. */
  5. public class WaitForConfirmsSend {
  6. public static void main(String[] args) {
  7. //创建连接工厂
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. /**
  10. * 配置RabbitMQ连接信息
  11. */
  12. //指定IP
  13. connectionFactory.setHost("192.168.133.129");
  14. //指定端口(控制台端口15672)
  15. connectionFactory.setPort(5672);
  16. //指定账号
  17. connectionFactory.setUsername("root");
  18. //指定密码
  19. connectionFactory.setPassword("root");
  20. //定义连接
  21. Connection connection = null;
  22. //定义通道
  23. Channel channel = null;
  24. try {
  25. //获取连接
  26. connection = connectionFactory.newConnection();
  27. //获取通道
  28. channel = connection.createChannel();
  29. //声明队列
  30. String queueName = "confirmsQueue";
  31. channel.queueDeclare(queueName,true,false,false,null);
  32. //声明交换机
  33. String exchangeName = "confirmsExchange";
  34. String exchangeType = "direct";
  35. channel.exchangeDeclare(exchangeName,exchangeType,true);
  36. //绑定队列和交换机
  37. String routingKey = "confirmsRoutingKey";
  38. channel.queueBind(queueName,exchangeName,routingKey);
  39. String message="普通发送者确认模式waitForConfirms的测试消息!";
  40. //启动普通发送者确认模式
  41. channel.confirmSelect();
  42. channel.basicPublish(exchangeName,routingKey,null,message.getBytes("UTF-8"));
  43. /*
  44. 阻塞线程等待服务返回响应 ,用于是否消费发送成功,如果服务确认消费已经发送完成则返回true 否则返回false
  45. 可以为这个方法指定一个毫秒用于确定我们的需要等待服务确认的超时时间,
  46. 如果超过了指定的时间以后则会抛出异常InterruptedException 表示服务器出现问题了需要补发消息或
  47. 将消息缓存到Redis中稍后利用定时任务补发
  48. 无论是返回false还是抛出异常消息都有可能发送成功有可能没有发送成功
  49. 如果我们要求这个消息一定要发送到队列例如订单数据,那怎么我们可以采用消息补发(重新发送一次消息,可以使用递归或利用Redis+定时任务来完成补发)
  50. * */
  51. boolean flag = channel.waitForConfirms();
  52. System.out.println("消息发送成功" + flag);
  53. } catch (IOException e) {
  54. e.printStackTrace();
  55. } catch (TimeoutException e) {
  56. e.printStackTrace();
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. } finally {
  60. if(channel!=null){
  61. try {
  62. channel.close();
  63. } catch (IOException e) {
  64. e.printStackTrace();
  65. } catch (TimeoutException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. if(connection!=null){
  70. try {
  71. connection.close();
  72. } catch (IOException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. }
  77. }
  78. }

2、channel.waitForConfirmsOrDie()批量确认模式

  1. /**
  2. * Confirm的三种实现方式:
  3. * 方式二:channel.waitForConfirmsOrDie()批量确认模式
  4. */
  5. public class WaitForConfirmsOrDieSend {
  6. public static void main(String[] args) {
  7. //创建连接工厂
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. /**
  10. * 配置RabbitMQ连接信息
  11. */
  12. //指定IP
  13. connectionFactory.setHost("192.168.133.129");
  14. //指定端口(控制台端口15672)
  15. connectionFactory.setPort(5672);
  16. //指定账号
  17. connectionFactory.setUsername("root");
  18. //指定密码
  19. connectionFactory.setPassword("root");
  20. //定义连接
  21. Connection connection = null;
  22. //定义通道
  23. Channel channel = null;
  24. try {
  25. //获取连接
  26. connection = connectionFactory.newConnection();
  27. //获取通道
  28. channel = connection.createChannel();
  29. //声明队列
  30. String queueName = "confirmsQueue";
  31. channel.queueDeclare(queueName,true,false,false,null);
  32. //声明交换机
  33. String exchangeName = "confirmsExchange";
  34. String exchangeType = "direct";
  35. channel.exchangeDeclare(exchangeName,exchangeType,true);
  36. //绑定队列和交换机
  37. String routingKey = "confirmsRoutingKey";
  38. channel.queueBind(queueName,exchangeName,routingKey);
  39. String message="普通发送者确认模式waitForConfirms的测试消息!";
  40. //启动普通发送者确认模式
  41. channel.confirmSelect();
  42. channel.basicPublish(exchangeName,routingKey,null,message.getBytes("UTF-8"));
  43. /**
  44. * waitForConfirmsOrDie 批量消息确认,它会同时向服务中确认之前当前通道中发送的所有的消息是否已经全部成功写入
  45. * 这个方法没有任何的返回值,如果服务器中有一条消息没有能够成功或向服务器发送确认时服务不可访问都被认定为
  46. * 消息确认失败,可能有消息没有发送成功,我们需要进行消费的补发。
  47. * 如果无法向服务器获取确认信息那么方法就会抛出InterruptedException异常,这时就需要补发消息到队列
  48. * waitForConfirmsOrDie方法可以指定一个参数timeout 用于等待服务器的确认时间,如果超过这个时间也会
  49. * 抛出异常,表示确认失败需要补发消息
  50. *
  51. * 注:
  52. * 批量消息确认的速度比普通的消息确认要快,但是如果一旦出现了消息补发的情况,我们不能确定具体
  53. * 是哪条消息没有完成发送,需要将本次的发送的所有消息全部进行补发
  54. *
  55. */
  56. channel.waitForConfirmsOrDie();
  57. System.out.println("消息发送成功");
  58. } catch (IOException e) {
  59. e.printStackTrace();
  60. } catch (TimeoutException e) {
  61. e.printStackTrace();
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. } finally {
  65. if(channel!=null){
  66. try {
  67. channel.close();
  68. } catch (IOException e) {
  69. e.printStackTrace();
  70. } catch (TimeoutException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. if(connection!=null){
  75. try {
  76. connection.close();
  77. } catch (IOException e) {
  78. e.printStackTrace();
  79. }
  80. }
  81. }
  82. }
  83. }

3、channel.addConfirmListener()异步监听发送方确认模式

  1. /**
  2. * Confirm的三种实现方式:
  3. * 方式三:channel.addConfirmListener()异步监听发送方确认模式
  4. */
  5. public class AddConfirmListenerSend {
  6. public static void main(String[] args) {
  7. //创建连接工厂
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. /**
  10. * 配置RabbitMQ连接信息
  11. */
  12. //指定IP
  13. connectionFactory.setHost("192.168.133.129");
  14. //指定端口(控制台端口15672)
  15. connectionFactory.setPort(5672);
  16. //指定账号
  17. connectionFactory.setUsername("root");
  18. //指定密码
  19. connectionFactory.setPassword("root");
  20. //定义连接
  21. Connection connection = null;
  22. //定义通道
  23. Channel channel = null;
  24. try {
  25. //获取连接
  26. connection = connectionFactory.newConnection();
  27. //获取通道
  28. channel = connection.createChannel();
  29. //声明队列
  30. String queueName = "confirmsQueue";
  31. channel.queueDeclare(queueName, true, false, false, null);
  32. //声明交换机
  33. String exchangeName = "confirmsExchange";
  34. String exchangeType = "direct";
  35. channel.exchangeDeclare(exchangeName, exchangeType, true);
  36. //绑定队列和交换机
  37. String routingKey = "confirmsRoutingKey";
  38. channel.queueBind(queueName, exchangeName, routingKey);
  39. String message = "普通发送者确认模式waitForConfirms的测试消息!";
  40. //启动普通发送者确认模式
  41. channel.confirmSelect();
  42. /**
  43. * 异步消息确认监听器,需要在发送消息前启动
  44. */
  45. channel.addConfirmListener(new ConfirmListener() {
  46. /**
  47. 消息确认以后的回调方法
  48. 参数 1 为被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
  49. 参数 2 为当前消息是否同时确认了多个
  50. 注:如果参数 2 为true 则表示本次确认同时确认了多条消息,消息等于当前参数1(消息编号)的所有消息
  51. 全部被确认 如果为false 则表示只确认多了当前编号的消息
  52. * */
  53. public void handleAck(long l, boolean b) throws IOException {
  54. System.out.println("消息被确认了 --- 消息编号:" + l +" 是否确认了多条:" + b);
  55. }
  56. /**
  57. 消息没有确认的回调方法
  58. 如果这个方法被执行表示当前的消息没有被确认 需要进行消息补发
  59. 参数 1 为没有被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
  60. 参数 2 为当前消息是否同时没有确认多个
  61. 注: 如果参数2 为true 则表示小于当前编号的所有的消息可能都没有发送成功需要进行消息的补发
  62. 如果参数2 为false则表示当前编号的消息没法发送成功需要进行补发
  63. */
  64. public void handleNack(long l, boolean b) throws IOException {
  65. System.out.println("消息没有被确认-----消息编号:" + l + " 是否没有确认多条:" + b);
  66. }
  67. });
  68. for(int i = 0 ;i < 1000; i++){
  69. channel.basicPublish(exchangeName,routingKey,null,message.getBytes("UTF-8"));
  70. }
  71. System.out.println("消息发送成功");
  72. } catch (UnsupportedEncodingException e) {
  73. e.printStackTrace();
  74. } catch (IOException e) {
  75. e.printStackTrace();
  76. } catch (TimeoutException e) {
  77. e.printStackTrace();
  78. } finally {
  79. //在发批量多条消息时,关闭会导致消息编号错误
  80. // if(channel!=null){
  81. // try {
  82. // channel.close();
  83. // } catch (IOException e) {
  84. // e.printStackTrace();
  85. // } catch (TimeoutException e) {
  86. // e.printStackTrace();
  87. // }
  88. // }
  89. // if(connection!=null){
  90. // try {
  91. // connection.close();
  92. // } catch (IOException e) {
  93. // e.printStackTrace();
  94. // }
  95. // }
  96. }
  97. }
  98. }

消息接收

  1. /**
  2. * 确认模式接收消息
  3. */
  4. public class ConfirmReceive {
  5. public static void main(String[] args) {
  6. //创建连接工厂
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. /**
  9. * 配置RabbitMQ连接信息
  10. */
  11. //指定IP
  12. connectionFactory.setHost("192.168.133.129");
  13. //指定端口(控制台端口15672)
  14. connectionFactory.setPort(5672);
  15. //指定账号
  16. connectionFactory.setUsername("root");
  17. //指定密码
  18. connectionFactory.setPassword("root");
  19. //定义连接
  20. Connection connection = null;
  21. //定义通道
  22. Channel channel = null;
  23. try {
  24. //获取连接
  25. connection = connectionFactory.newConnection();
  26. //获取通道
  27. channel = connection.createChannel();
  28. //声明队列
  29. String queueName = "confirmsQueue";
  30. channel.queueDeclare(queueName,true,false,false,null);
  31. //声明交换机
  32. String exchangeName = "confirmsExchange";
  33. String exchangeType = "direct";
  34. channel.exchangeDeclare(exchangeName,exchangeType,true);
  35. //绑定队列和交换机
  36. String routingKey = "confirmsRoutingKey";
  37. channel.queueBind(queueName,exchangeName,routingKey);
  38. //启动事务
  39. channel.txSelect();
  40. /**
  41. * 接收消息
  42. * 参数 2 为消息的确认机制,true表示自动消息确认,确认以后消息会从队列中被移除 ,当读取完消息以后就会自动确认
  43. * 如果为false 表示手动确认消息
  44. * 注:
  45. * 如果我们只是接收的消息但是还没有来得处理,当前应用就崩溃或在进行处理的时候例如像数据库中
  46. * 写数据但是数据库这时不可用,那么由于消息是自动确认的那么这个消息就会在接收完成以后自动从队列中
  47. * 被删除,这就会丢失消息
  48. */
  49. channel.basicConsume(queueName,false,"",new DefaultConsumer(channel){
  50. @Override
  51. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  52. //获取当前当前消息是否被接收过一次如果返回值为false表示消息之前没有被接收过,如果返回值为true
  53. //则表示之前这个消息被接收过,可能也处理完成,因此我们要进行消息的防重复处理
  54. Boolean isReceived = envelope.isRedeliver();
  55. //获取当前内部类中的通道
  56. Channel chan = this.getChannel();
  57. if (!isReceived){
  58. String message = new String(body);
  59. System.out.println("确认模式消费者 处理了消息---"+message);
  60. //获取消息的编号,我们需要根据消息的编号来确认消息
  61. long tag = envelope.getDeliveryTag();
  62. /**
  63. 手动确认消息,确认以后表示当前消息已经成功处理了,需要从队列中移除掉
  64. 这个方法应该在当前消息的处理程序全部完成以后执行
  65. 参数 1 为消息的序号
  66. 参数 2 为是否确认多个,如果为true则表示需要确认小等于当前编号的所有消息,false就是单个确认值确认当前消息
  67. * */
  68. chan.basicAck(tag,true);
  69. } else {
  70. /*
  71. 这个消息之前已经被接收过需要进行防重复处理
  72. 如查询数据库中是否已经添加了记录或已经修改过了记录
  73. 如果经过判断这条没有被处理完成则需要重新处理消息然后确认掉这条消息
  74. 如果已经处理过了则直接确认消息即可不需要进行其他处理操作
  75. * */
  76. //获取消息的编号,我们需要根据消息的编号来确认消息
  77. long tag = envelope.getDeliveryTag();
  78. chan.basicAck(tag,false);
  79. }
  80. //注:如果启动了事务,而消息消费者确认模式为手动确认,那么必须要提交事务否则即使调用了确认方法
  81. //那么消息也不会从队列中被移除掉
  82. chan.txCommit();
  83. }
  84. });
  85. } catch (IOException e) {
  86. e.printStackTrace();
  87. } catch (TimeoutException e) {
  88. e.printStackTrace();
  89. }
  90. }
  91. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/431662
推荐阅读
相关标签
  

闽ICP备14008679号