当前位置:   article > 正文

rabbitMQ介绍及使用方法_mq的使用

mq的使用

目录

一、MQ概述

二、RabbitMQ简介

三、RabbitMQ的五种工作模式

1、简单模式

2、work queues工作队列模式

3、Pub/Sub 订阅模式

4、Routing 路由模式

5、Topics 通配符模式


一、MQ概述

MQ全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统

之间进行通信。

总结:

MQ,消息队列,存储消息的中间件

分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信

发送方称为生产者,接收方称为消费者

MQ的优点:

应用解耦:提高系统容错性和可维护性

异步提速:提升用户体验和系统吞吐量

削峰填谷:提高系统稳定性

MQ的缺点:

1、系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

2、系统复杂度提高

MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调

用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

3、一致性问题

A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?

使用MQ的条件

1、生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为

空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

2、容许短暂的不一致性。

3、确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

常见的MQ

二、RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用

层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消

息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。

类比HTTP。

2007年,Rabbit 技术公司基于AMQP标准开发的RabbitMQ1.0发布。RabbitMQ采用Erlang语言开

发。Erlang 语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广

泛。

RabbitMQ中的相关概念:

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue 等

Connection:publisher / consumer和broker之间的TCP连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候TCPConnection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别 channel,所以 channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection 的开销。

Exchange:message到达 broker的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待consumer取走

Binding:exchange和queue之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到exchange 中的查询表中,用于message 的分发依据

RabbitMQ提供了6种工作模式:

简单模式、work queues、Publish/Subscribe发布与订阅模式、

Routing路由模式、Topics主题模式、RPC远程调用模式((远程调用,不太算MQ;暂不作介绍)。

官网对应模式介绍: https://www.rabbitmq.com


JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间

件的API

JMS是JavaEE规范中的一种,类比JDBC

很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是

开源社区有

三、RabbitMQ的五种工作模式

首先创建一个maven工程,里面创建两个模块(一个生产者,一个消费者)

2、添加依赖(生产者消费者都要依赖)

  1. <dependencies>
  2. <!--rabbitmq java 客户端-->
  3. <dependency>
  4. <groupId>com.rabbitmq</groupId>
  5. <artifactId>amqp-client</artifactId>
  6. <version>5.6.0</version>
  7. </dependency>
  8. </dependencies>
  9. <build>
  10. <plugins>
  11. <plugin>
  12. <groupId>org.apache.maven.plugins</groupId>
  13. <artifactId>maven-compiler-plugin</artifactId>
  14. <version>3.8.0</version>
  15. <configuration>
  16. <source>1.8</source>
  17. <target>1.8</target>
  18. </configuration>
  19. </plugin>
  20. </plugins>
  21. </build>

1、简单模式

P:生产者,也就是要发送消息的程序
C:消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从甚中取出消息

1、创建生产者

  1. public class HelloRabbitmq {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");//ip 默认值 localhost
  7. factory.setPort(5672); //端口 默认值 5672
  8. //factory.setVirtualHost("/itcast");//虚拟机 默认值/
  9. factory.setUsername("admin");//用户名 默认 guest
  10. factory.setPassword("admin");//密码 默认值 guest
  11. //3. 创建连接 Connection
  12. Connection connection = factory.newConnection();
  13. //4. 创建Channel
  14. Channel channel = connection.createChannel();
  15. //5. 创建队列Queue
  16. /*
  17. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  18. 参数:
  19. 1. queue:队列名称
  20. 2. durable:是否持久化,当mq重启之后,还在
  21. 3. exclusive:
  22. * 是否独占。只能有一个消费者监听这队列
  23. * 当Connection关闭时,是否删除队列
  24. *
  25. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  26. 5. arguments:参数。
  27. */
  28. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  29. channel.queueDeclare("hello_world",true,false,false,null);
  30. String body = "Hello rabbitmq....";
  31. /*
  32. * basicPublish(String exchange,String routingKey, BasicProperties props, byte[ ] body)参数:
  33. 1. exchange:交换机名称。简单模式下交换机会使用黑认的“"
  34. *2. routingKey:路由名称,默认交换机和队列名一样就可以绑定
  35. 3. props :配置信息
  36. 4. body:发送消息数据
  37. * */
  38. //发送消息
  39. channel.basicPublish("","hello_world",null,body.getBytes());
  40. channel.close();
  41. connection.close();
  42. }
  43. }

2、创建消费者

  1. public class ConsumerHelloWorld {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");
  7. factory.setPort(5672);
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. //3. 创建连接 Connection
  11. Connection connection = factory.newConnection();
  12. //4. 创建Channel
  13. Channel channel = connection.createChannel();
  14. //5. 创建队列Queue
  15. /*
  16. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  17. 参数:
  18. 1. queue:队列名称
  19. 2. durable:是否持久化,当mq重启之后,还在
  20. 3. exclusive:
  21. * 是否独占。只能有一个消费者监听这队列
  22. * 当Connection关闭时,是否删除队列
  23. *
  24. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  25. 5. arguments:参数。
  26. */
  27. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  28. channel.queueDeclare("hello_world",true,false,false,null);
  29. /*
  30. basicConsume(String queue, boolean autoAck, Consumer callback)
  31. 参数:
  32. 1. queue:队列名称
  33. 2. autoAck:是否自动确认
  34. 3. callback:回调对象
  35. */
  36. // 接收消息
  37. Consumer consumer = new DefaultConsumer(channel){
  38. /*
  39. 回调方法,当收到消息后,会自动执行该方法
  40. 1. consumerTag:标识
  41. 2. envelope:获取一些信息,交换机,路由key...
  42. 3. properties:配置信息
  43. 4. body:数据
  44. */
  45. @Override
  46. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  47. System.out.println("consumerTag:" + consumerTag);
  48. System.out.println("Exchange:" + envelope.getExchange());
  49. System.out.println("RoutingKey:" + envelope.getRoutingKey());
  50. System.out.println("properties:" + properties);
  51. System.out.println("body:" + new String(body));
  52. }
  53. };
  54. channel.basicConsume("hello_world",true,consumer);
  55. //关闭资源?不要
  56. }
  57. }

2、work queues工作队列模式

Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个

队列中的消息。应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

1、创建生产者

  1. public class ProducerWorkQueues {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");//ip 默认值 localhost
  7. factory.setPort(5672); //端口 默认值 5672
  8. //factory.setVirtualHost("/itcast");//虚拟机 默认值/
  9. factory.setUsername("admin");//用户名 默认 guest
  10. factory.setPassword("admin");//密码 默认值 guest
  11. //3. 创建连接 Connection
  12. Connection connection = factory.newConnection();
  13. //4. 创建Channel
  14. Channel channel = connection.createChannel();
  15. //5. 创建队列Queue
  16. /*
  17. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  18. 参数:
  19. 1. queue:队列名称
  20. 2. durable:是否持久化,当mq重启之后,还在
  21. 3. exclusive:
  22. * 是否独占。只能有一个消费者监听这队列
  23. * 当Connection关闭时,是否删除队列
  24. *
  25. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  26. 5. arguments:参数。
  27. */
  28. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  29. channel.queueDeclare("hello_workQueues",true,false,false,null);
  30. //创建消息
  31. for (int i = 1; i <= 10; i++) {
  32. String body = i+"hello rabbitmq~~~";
  33. //6. 发送消息
  34. channel.basicPublish("","hello_workQueues",null,body.getBytes());
  35. }
  36. /*
  37. * basicPublish(String exchange,String routingKey, BasicProperties props, byte[ ] body)参数:
  38. 1. exchange:交换机名称。简单模式下交换机会使用黑认的“"
  39. 2. routingKey:路由名称
  40. 3. props :配置信息
  41. 4. body:发送消息数据
  42. * */
  43. //7.释放资源
  44. channel.close();
  45. connection.close();
  46. }
  47. }

2、创建消费者:这里要创建两个消费者才能看到效果

  1. public class ConsumerWorkQueues1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");
  7. factory.setPort(5672);
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. //3. 创建连接 Connection
  11. Connection connection = factory.newConnection();
  12. //4. 创建Channel
  13. Channel channel = connection.createChannel();
  14. //5. 创建队列Queue
  15. /*
  16. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  17. 参数:
  18. 1. queue:队列名称
  19. 2. durable:是否持久化,当mq重启之后,还在
  20. 3. exclusive:
  21. * 是否独占。只能有一个消费者监听这队列
  22. * 当Connection关闭时,是否删除队列
  23. *
  24. 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  25. 5. arguments:参数。
  26. */
  27. //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
  28. channel.queueDeclare("hello_workQueues",true,false,false,null);
  29. /*
  30. basicConsume(String queue, boolean autoAck, Consumer callback)
  31. 参数:
  32. 1. queue:队列名称
  33. 2. autoAck:是否自动确认
  34. 3. callback:回调对象
  35. */
  36. // 接收消息
  37. Consumer consumer = new DefaultConsumer(channel){
  38. /*
  39. 回调方法,当收到消息后,会自动执行该方法
  40. 1. consumerTag:标识
  41. 2. envelope:获取一些信息,交换机,路由key...
  42. 3. properties:配置信息
  43. 4. body:数据
  44. */
  45. @Override
  46. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  47. System.out.println("body:" + new String(body));
  48. }
  49. };
  50. channel.basicConsume("hello_workQueues",true,consumer);
  51. //关闭资源?不要
  52. }
  53. }

3、Pub/Sub 订阅模式

在订阅模型中,多了一个Exchange角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机 (X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
>Fanout:广播,将消息交给所有绑定到交换机的队列
>Direct:定向,把消息交给符合指定routing key 的队列
>Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

1、创建生产者

  1. public class ProducerPubSub {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");//ip 默认值 localhost
  7. factory.setPort(5672); //端口 默认值 5672
  8. //factory.setVirtualHost("/itcast");//虚拟机 默认值/
  9. factory.setUsername("admin");//用户名 默认 guest
  10. factory.setPassword("admin");//密码 默认值 guest
  11. //3. 创建连接 Connection
  12. Connection connection = factory.newConnection();
  13. //4. 创建Channel
  14. Channel channel = connection.createChannel();
  15. /*
  16. exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
  17. 参数:
  18. 1. exchange:交换机名称
  19. 2. type:交换机类型
  20. DIRECT("direct"),:定向
  21. FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  22. TOPIC("topic"),通配符的方式
  23. HEADERS("headers");参数匹配
  24. 3. durable:是否持久化
  25. 4. autoDelete:自动删除
  26. 5. internal:内部使用。 一般false
  27. 6. arguments:参数
  28. */
  29. String exchangeName = "test_fanout";
  30. //5. 创建交换机
  31. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
  32. //6、创建队列
  33. String queueName1 = "test_fanout_queue1";
  34. String queueName2 = "test_fanout_queue2";
  35. channel.queueDeclare(queueName1,true,false,false,null);
  36. channel.queueDeclare(queueName2,true,false,false,null);
  37. //7. 绑定队列和交换机
  38. /*
  39. queueBind(String queue, String exchange, String routingKey)
  40. 参数:
  41. 1. queue:队列名称
  42. 2. exchange:交换机名称
  43. 3. routingKey:路由键,绑定规则
  44. 如果交换机的类型为fanout ,routingKey设置为""
  45. */
  46. channel.queueBind(queueName1,exchangeName,"");
  47. channel.queueBind(queueName2,exchangeName,"");
  48. String body = "发送消息。。。。";
  49. //8. 发送消息
  50. channel.basicPublish(exchangeName,"",null,body.getBytes());
  51. //9. 释放资源
  52. channel.close();
  53. connection.close();
  54. }
  55. }

2、创建消费者:这里要创建两个消费者才能看到效果

  1. public class ConsumerPubSub1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");
  7. factory.setPort(5672);
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. //3. 创建连接 Connection
  11. Connection connection = factory.newConnection();
  12. //4. 创建Channel
  13. Channel channel = connection.createChannel();
  14. //队列名称
  15. String queueName1 = "test_fanout_queue1";
  16. String queueName2 = "test_fanout_queue2";
  17. /*
  18. basicConsume(String queue, boolean autoAck, Consumer callback)
  19. 参数:
  20. 1. queue:队列名称
  21. 2. autoAck:是否自动确认
  22. 3. callback:回调对象
  23. */
  24. // 接收消息
  25. Consumer consumer = new DefaultConsumer(channel){
  26. /*
  27. 回调方法,当收到消息后,会自动执行该方法
  28. 1. consumerTag:标识
  29. 2. envelope:获取一些信息,交换机,路由key...
  30. 3. properties:配置信息
  31. 4. body:数据
  32. */
  33. @Override
  34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  35. System.out.println("body:"+new String(body));
  36. System.out.println("将日志信息打印到控制台.....");
  37. }
  38. };
  39. channel.basicConsume(queueName1,true,consumer);
  40. }
  41. }

 执行消费者结果

4、Routing 路由模式

模式说明:

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在向Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

图解:

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
X: Exchange (交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为error 的消息
C2:消费者,其所在队列指定了需要routing key为info、error、warning 的消息
 

1、创建生产者

  1. public class ProducerRouting {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");//ip 默认值 localhost
  7. factory.setPort(5672); //端口 默认值 5672
  8. //factory.setVirtualHost("/itcast");//虚拟机 默认值/
  9. factory.setUsername("admin");//用户名 默认 guest
  10. factory.setPassword("admin");//密码 默认值 guest
  11. //3. 创建连接 Connection
  12. Connection connection = factory.newConnection();
  13. //4. 创建Channel
  14. Channel channel = connection.createChannel();
  15. /*
  16. exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
  17. 参数:
  18. 1. exchange:交换机名称
  19. 2. type:交换机类型
  20. DIRECT("direct"),:定向
  21. FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  22. TOPIC("topic"),通配符的方式
  23. HEADERS("headers");参数匹配
  24. 3. durable:是否持久化
  25. 4. autoDelete:自动删除
  26. 5. internal:内部使用。 一般false
  27. 6. arguments:参数
  28. */
  29. String exchangeName = "test_direct";
  30. //5. 创建交换机
  31. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
  32. //6. 创建队列
  33. String queueName1 = "test_direct_queue1";
  34. String queueName2 = "test_direct_queue2";
  35. channel.queueDeclare(queueName1,true,false,false,null);
  36. channel.queueDeclare(queueName2,true,false,false,null);
  37. //7. 绑定队列和交换机
  38. /*
  39. queueBind(String queue, String exchange, String routingKey)
  40. 参数:
  41. 1. queue:队列名称
  42. 2. exchange:交换机名称
  43. 3. routingKey:路由键,绑定规则
  44. 如果交换机的类型为fanout ,routingKey设置为""
  45. */
  46. //队列1绑定 error
  47. channel.queueBind(queueName1,exchangeName,"error");
  48. //队列2绑定 info error warning
  49. channel.queueBind(queueName2,exchangeName,"info");
  50. channel.queueBind(queueName2,exchangeName,"error");
  51. channel.queueBind(queueName2,exchangeName,"warning");
  52. String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
  53. //8. 发送消息
  54. channel.basicPublish(exchangeName,"info",null,body.getBytes());
  55. //9. 释放资源
  56. channel.close();
  57. connection.close();
  58. }
  59. }

 

2、创建消费者:这里要创建两个消费者才能看到效果

  1. public class ConsumerRouting1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");
  7. factory.setPort(5672);
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. //3. 创建连接 Connection
  11. Connection connection = factory.newConnection();
  12. //4. 创建Channel
  13. Channel channel = connection.createChannel();
  14. //队列名称
  15. String queueName1 = "test_direct_queue1";
  16. String queueName2 = "test_direct_queue2";
  17. /*
  18. basicConsume(String queue, boolean autoAck, Consumer callback)
  19. 参数:
  20. 1. queue:队列名称
  21. 2. autoAck:是否自动确认
  22. 3. callback:回调对象
  23. */
  24. // 接收消息
  25. Consumer consumer = new DefaultConsumer(channel){
  26. /*
  27. 回调方法,当收到消息后,会自动执行该方法
  28. 1. consumerTag:标识
  29. 2. envelope:获取一些信息,交换机,路由key...
  30. 3. properties:配置信息
  31. 4. body:数据
  32. */
  33. @Override
  34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  35. System.out.println("body:"+new String(body));
  36. System.out.println("将日志信息打印到控制台.....");
  37. }
  38. };
  39. channel.basicConsume(queueName1,true,consumer);
  40. //关闭资源?不要
  41. }
  42. }

 

5、Topics 通配符模式

Topic主题模式可以实现 Pub/Sub 发布与订阅模式和Routing 路由模式的功能,只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。

1、创建生产者

  1. public class ProducerTopics {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");//ip 默认值 localhost
  7. factory.setPort(5672); //端口 默认值 5672
  8. //factory.setVirtualHost("/itcast");//虚拟机 默认值/
  9. factory.setUsername("admin");//用户名 默认 guest
  10. factory.setPassword("admin");//密码 默认值 guest
  11. //3. 创建连接 Connection
  12. Connection connection = factory.newConnection();
  13. //4. 创建Channel
  14. Channel channel = connection.createChannel();
  15. /*
  16. exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
  17. 参数:
  18. 1. exchange:交换机名称
  19. 2. type:交换机类型
  20. DIRECT("direct"),:定向
  21. FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
  22. TOPIC("topic"),通配符的方式
  23. HEADERS("headers");参数匹配
  24. 3. durable:是否持久化
  25. 4. autoDelete:自动删除
  26. 5. internal:内部使用。 一般false
  27. 6. arguments:参数
  28. */
  29. String exchangeName = "test_topic";
  30. //5. 创建交换机
  31. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
  32. //6. 创建队列
  33. String queueName1 = "test_topic_queue1";
  34. String queueName2 = "test_topic_queue2";
  35. channel.queueDeclare(queueName1,true,false,false,null);
  36. channel.queueDeclare(queueName2,true,false,false,null);
  37. //7. 绑定队列和交换机
  38. /*
  39. queueBind(String queue, String exchange, String routingKey)
  40. 参数:
  41. 1. queue:队列名称
  42. 2. exchange:交换机名称
  43. 3. routingKey:路由键,绑定规则
  44. 如果交换机的类型为fanout ,routingKey设置为""
  45. */
  46. // routing key 系统的名称.日志的级别。
  47. //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
  48. channel.queueBind(queueName1,exchangeName,"#.error");
  49. channel.queueBind(queueName1,exchangeName,"order.*");
  50. channel.queueBind(queueName2,exchangeName,"*.*");
  51. String body = "日志信息:张三调用了findAll方法...日志级别:info...";
  52. //8. 发送消息
  53. channel.basicPublish(exchangeName,"order.error",null,body.getBytes());
  54. //9. 释放资源
  55. channel.close();
  56. connection.close();
  57. }
  58. }

2、创建消费者:这里要创建两个消费者才能看到效果

  1. public class ConsumerTopic1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. //2. 设置参数
  6. factory.setHost("192.168.138.129");
  7. factory.setPort(5672);
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. //3. 创建连接 Connection
  11. Connection connection = factory.newConnection();
  12. //4. 创建Channel
  13. Channel channel = connection.createChannel();
  14. //队列名称
  15. String queueName1 = "test_topic_queue1";
  16. String queueName2 = "test_topic_queue2";
  17. /*
  18. basicConsume(String queue, boolean autoAck, Consumer callback)
  19. 参数:
  20. 1. queue:队列名称
  21. 2. autoAck:是否自动确认
  22. 3. callback:回调对象
  23. */
  24. // 接收消息
  25. Consumer consumer = new DefaultConsumer(channel){
  26. /*
  27. 回调方法,当收到消息后,会自动执行该方法
  28. 1. consumerTag:标识
  29. 2. envelope:获取一些信息,交换机,路由key...
  30. 3. properties:配置信息
  31. 4. body:数据
  32. */
  33. @Override
  34. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  35. System.out.println("body:"+new String(body));
  36. System.out.println("将日志信息存入数据库.......");
  37. }
  38. };
  39. channel.basicConsume(queueName1,true,consumer);
  40. //关闭资源?不要
  41. }
  42. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/515794
推荐阅读
相关标签
  

闽ICP备14008679号