当前位置:   article > 正文

【RabbitMQ】RabbbitMQ的六种工作模式以及代码实现_rabbmitmq工作模式java 代码

rabbmitmq工作模式java 代码

目录

一、交换机类型

二、简单模式

1、介绍

2、代码实现 

三、Work Queues工作队列模式

1、介绍

2、代码实现

四、Pub/Sub订阅模式

1、介绍

2、代码实现

五、Routing路由模式

1、介绍

2、代码实现

六、Topics通配符模式

1、介绍

2、代码实现


一、交换机类型

在之前的文章里我们了解了什么是交换机,当有生产者把消息发给MQ服务器时,消息会通过交换机分发给不同的消息队列里,那么交换机是如何分发消息的,我们就需要了解交换机的几种类型

类型说明
fanout广播类型,交换机会把收到的消息分发给每一与他绑定的队列
direct定向类型,交换机会根据路由将消息分发给与它绑定的指定的匹配队列
topic通配符类型,交换机会根据通配符匹配后分发给与他绑定的指定的匹配队列

二、简单模式

1、介绍

RabbitMQ的简单模式是一对一即,一个生产者生产消息后不经交换机直接给指定的队列供消费者消费

2、代码实现 

他的不需要交换机进行分发,所以我们代码实现整体过程为

生产者:建立连接-》创建队列-》构造发送消息-》关闭连接

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Producer {
  7. private static final String QUEUE = "easyQueue";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 1.创建连接工厂 配置连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("127.0.0.1");
  12. factory.setPort(5672);
  13. factory.setVirtualHost("/DemoVirtualHost");
  14. factory.setUsername("guest");
  15. factory.setPassword("guest");
  16. // 2.建立连接 获取连接通道
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 3.创建队列
  20. channel.queueDeclare(QUEUE,true,false,false,null);
  21. // 4.发送消息
  22. String body = "hello mq";
  23. channel.basicPublish("",QUEUE,null,body.getBytes());
  24. // 5.关闭连接
  25. channel.close();
  26. connection.close();
  27. }
  28. }

运行代码即可发送消息到服务器 

消费者:建立连接监听队列获取消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5. private static final String QUEUE = "easyQueue";
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. // 1.创建连接工厂 配置连接工厂
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("127.0.0.1");
  10. factory.setPort(5672);
  11. factory.setVirtualHost("/DemoVirtualHost");
  12. factory.setUsername("guest");
  13. factory.setPassword("guest");
  14. // 2.建立连接
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel();
  17. // 3.监听队列获取消息
  18. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("消费者消费消息:" + new String(body));
  22. }
  23. };
  24. channel.basicConsume(QUEUE,true,consumer);
  25. }
  26. }

 运行代码

完成消费,这就是RabbitMQ工作模式的简单模式,他不需要交换机来去对消息进行分发 

三、Work Queues工作队列模式

1、介绍

工作队列模式相比简单模式,他的处理任务速度在一定情况下会更快,因为他相比简单模式单一消费者而言它增加了消费者个数

2、代码实现

它同样也不需要交换机对收到的消息进行分发,代码实现与简单模式基本相同,唯一不同点是他相比简单模式多了一个消费者

生产者

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Producer {
  7. private static final String QUEUE = "workQueue";
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. // 1.创建连接工厂 配置连接工厂
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("127.0.0.1");
  12. factory.setPort(5672);
  13. factory.setVirtualHost("/DemoVirtualHost");
  14. factory.setUsername("guest");
  15. factory.setPassword("guest");
  16. // 2.建立连接 获取连接通道
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 3.创建队列
  20. channel.queueDeclare(QUEUE,true,false,false,null);
  21. // 4.发送消息
  22. String body = "hello mq";
  23. channel.basicPublish("",QUEUE,null,body.getBytes());
  24. // 5.关闭连接
  25. channel.close();
  26. connection.close();
  27. }
  28. }

消费者1

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5. private static final String QUEUE = "workQueue";
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. // 1.创建连接工厂 配置连接工厂
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("127.0.0.1");
  10. factory.setPort(5672);
  11. factory.setVirtualHost("/DemoVirtualHost");
  12. factory.setUsername("guest");
  13. factory.setPassword("guest");
  14. // 2.建立连接
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel();
  17. // 3.监听队列获取消息
  18. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("消费者消费消息:" + new String(body));
  22. }
  23. };
  24. channel.basicConsume(QUEUE,true,consumer);
  25. }
  26. }

 消费者2:

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5. private static final String QUEUE = "workQueue";
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. // 1.创建连接工厂 配置连接工厂
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("127.0.0.1");
  10. factory.setPort(5672);
  11. factory.setVirtualHost("/DemoVirtualHost");
  12. factory.setUsername("guest");
  13. factory.setPassword("guest");
  14. // 2.建立连接
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel();
  17. // 3.监听队列获取消息
  18. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("消费者消费消息:" + new String(body));
  22. }
  23. };
  24. channel.basicConsume(QUEUE,true,consumer);
  25. }
  26. }

四、Pub/Sub订阅模式

1、介绍

他相比之前两种模式引入了交换机,当MQ服务器收到消息后,交换机会把收到的消息分发给每一个绑定了该交换机的队列给不同的消费者进行消费,之前的文章里我们是通过控制台进行队列与交换机的绑定,这里我们会使用Java代码来进行绑定

 比如当用户下单后我们需要去给用户通过邮箱以及短信发送回购优惠券信息时,我们就可以通过该模式,分别创建邮箱队列以及短信队列,然后生产者将用户信息发给MQ服务器经过交换机进入邮箱与短信队列让不同的消费者去消费通知。

2、代码实现

与前两种模式不同的时,我们在实现该模式的生产者时建立连接后需要去创建交换机,由于交换机不同的类型其工作模式不同,所以我们创建交换机时 需要去指定他的类型,创建完成后我们还需要去将邮箱队列与短信队列与该交换机进行绑定

创建交换机需要使用该方法exchangeDeclare

参数说明
exchange交换机名
type交换机类型
durable是否持久化到内存,一般为true
autoDelete是否自动删除,一般为false
internal内部使用参数,一般为false
arguments参数信息,一般为null

队列绑定交换机需要用到这个queueBind(队列名,交换机名,路由名--如果交换机类型为fanout则为空字符串)方法

生产者:

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. private static final String EXCHANGE_NAME = "pubSubExchange";
  9. private static final String EMAIL_QUEUE = "email";
  10. private static final String SMS_QUEUE = "sms";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. // 1.创建连接工厂 配置连接工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. factory.setHost("127.0.0.1");
  15. factory.setPort(5672);
  16. factory.setVirtualHost("/DemoVirtualHost");
  17. factory.setUsername("guest");
  18. factory.setPassword("guest");
  19. // 2.建立连接
  20. Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel();
  22. // 3.创建交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true,false,false,null);
  24. // 4.创建队列
  25. channel.queueDeclare(EMAIL_QUEUE,true,false,false,null);
  26. channel.queueDeclare(SMS_QUEUE,true,false,false,null);
  27. // 5.绑定交换机
  28. channel.queueBind(EMAIL_QUEUE,EXCHANGE_NAME,"");
  29. channel.queueBind(SMS_QUEUE,EXCHANGE_NAME,"");
  30. // 6.发送信息
  31. channel.basicPublish(EXCHANGE_NAME,"",null,"hello".getBytes());
  32. // 7.关闭连接
  33. channel.close();
  34. connection.close();
  35. }
  36. }

运行查看服务器 

 此时两个消费者只需修改监听的队列即可

email消费者:

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class EmailConsumer {
  5. private static final String EMAIL_QUEUE = "email";
  6. private static final String SMS_QUEUE = "sms";
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. // 1.创建连接工厂 配置工厂
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("127.0.0.1");
  11. factory.setPort(5672);
  12. factory.setVirtualHost("/DemoVirtualHost");
  13. factory.setUsername("guest");
  14. factory.setPassword("guest");
  15. // 2.建立连接
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. // 3.监听队列获取消息
  19. Consumer consumer = new DefaultConsumer(channel) {
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. System.out.println("邮件已发送" + new String(body));
  23. }
  24. };
  25. channel.basicConsume(EMAIL_QUEUE,true,consumer);
  26. }
  27. }

运行代码 

sms消费者:

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class EmailConsumer {
  5. private static final String EMAIL_QUEUE = "email";
  6. private static final String SMS_QUEUE = "sms";
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. // 1.创建连接工厂 配置工厂
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("127.0.0.1");
  11. factory.setPort(5672);
  12. factory.setVirtualHost("/DemoVirtualHost");
  13. factory.setUsername("guest");
  14. factory.setPassword("guest");
  15. // 2.建立连接
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. // 3.监听队列获取消息
  19. Consumer consumer = new DefaultConsumer(channel) {
  20. @Override
  21. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22. System.out.println("已发送" + new String(body));
  23. }
  24. };
  25. channel.basicConsume(SMS_QUEUE,true,consumer);
  26. }
  27. }

五、Routing路由模式

1、介绍

上述用户下单发送回购优惠券需求时,如果有的用户他注册时没有设置邮箱或者电话,我们不需要去给没有的邮箱发送,此时使用订阅模式是不可取的,使用路由模式是可行的,那么为什么路由模式可以呢,让我们来看。路由模式与订阅模式相同的是都需要使用交换机,不同的是当收到生产者的消息时,订阅模式会分发给每一个与之绑定的队列,而路由模式则不同,他会按照消息指定的路由去匹配到相应的队列从而发送给指定的队列

2、代码实现

由上述路由模式图,我们在发送消息前,需要先创建一个交换机且路由模式交换机类型不在是fanout而是direct路由模式,还需要去创建队列,然后绑定交换机,在绑定交换机的时候我们需要去给队列设置对应的路由,且在发送消息时指定路由

生产者:

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. private static final String EXCHANGE_NAME = "rout-ex";
  9. private static final String EMAIL_QUEUE = "email";
  10. private static final String SMS_EMAIL = "sms";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. // 1.创建连接工厂 配置工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. factory.setHost("127.0.0.1");
  15. factory.setPort(5672);
  16. factory.setVirtualHost("/DemoVirtualHost");
  17. factory.setUsername("guest");
  18. factory.setPassword("guest");
  19. // 2.建立连接
  20. Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel();
  22. // 3.创建交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
  24. // 4.创建队列
  25. channel.queueDeclare(EMAIL_QUEUE,true,false,false,null);
  26. channel.queueDeclare(SMS_EMAIL,true,false,false,null);
  27. // 5.绑定交换机
  28. channel.queueBind(EMAIL_QUEUE,EXCHANGE_NAME,"email");
  29. channel.queueBind(SMS_EMAIL,EXCHANGE_NAME,"sms");
  30. // 6.发送消息
  31. channel.basicPublish(EXCHANGE_NAME,"email",null,"routing".getBytes());
  32. // 7.关闭连接
  33. channel.close();
  34. connection.close();
  35. }
  36. }

发送消息指定了路由为email,此时我们运行代码查看服务器

消费者:

修改监听队列名即可

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. // 1.获取连接工厂
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. // 2.设置参数:队列、交换机、虚拟机、端口、IP
  9. connectionFactory.setHost("127.0.0.1");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setVirtualHost("/DemoVirtualHost");
  12. connectionFactory.setUsername("1886");
  13. connectionFactory.setPassword("1886");
  14. // 3.建立连接 Connection ---- channel
  15. Connection connection = connectionFactory.newConnection();
  16. Channel channel = connection.createChannel();
  17. // 4.获取消息
  18. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("email消费");
  22. }
  23. };
  24. channel.basicConsume("email",true,consumer);
  25. }
  26. }

 

六、Topics通配符模式

1、介绍

上述路由模式我们知道了消息可以指定路由进而到达指定的队列,但路由只有一个所以该消息只能进入一个队列,而后续如果有一个消息需要进入几个队列的需求时订阅模式与路由模式就不能完成,此时我们可以使用通配符模式进行匹配,而该模式的交换机类型为通配符类型的topic。

通配符有两种

种类介绍
*(system.*)*指的是匹配一个单词,(system.*)是指路由前缀为system.且后面有一个单词即可
#(#.error)#指的是匹配0个或多个单词,(#.error)是指路由前面没有或有若干单词,后面为.error

如果此时服务器有两个队列一个为存放所有分布式系统的error日志,一个只用于存放A系统的所有等级日志 ,此时前者路由配置为#.error,后者可为A.*

2、代码实现

生产者:

我们实现上述业务时可对队列进行上述路由配置,此时我们将消息路由设置为B系统下的error日志:B.error,该消息就会进入error队列

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. private static final String EXCHANGE_NAME = "topic-ex";
  9. private static final String ERROR_QUEUE = "error";
  10. private static final String SYSTEM_LOG = "a-system-logging";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. // 1.创建连接工厂 配置工厂
  13. ConnectionFactory factory = new ConnectionFactory();
  14. factory.setHost("127.0.0.1");
  15. factory.setPort(5672);
  16. factory.setVirtualHost("/DemoVirtualHost");
  17. factory.setUsername("guest");
  18. factory.setPassword("guest");
  19. // 2.建立连接
  20. Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel();
  22. // 3.创建交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true,false,false,null);
  24. // 4.创建队列
  25. channel.queueDeclare(ERROR_QUEUE,true,false,false,null);
  26. channel.queueDeclare(SYSTEM_LOG,true,false,false,null);
  27. // 5.绑定交换机
  28. channel.queueBind(ERROR_QUEUE,EXCHANGE_NAME,"#.error"); // 所有系统的error日志
  29. channel.queueBind(SYSTEM_LOG,EXCHANGE_NAME,"A.*"); // A系统的所有日志
  30. // 6.发送消息
  31. channel.basicPublish(EXCHANGE_NAME,"B.error",null,"b系统的error日志".getBytes());
  32. // 7.关闭连接
  33. channel.close();
  34. connection.close();
  35. }
  36. }

消费者:

消费者只需要修改监听的队列名即可

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. // 1.创建连接工厂 配置工厂
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("127.0.0.1");
  9. factory.setPort(5672);
  10. factory.setVirtualHost("/DemoVirtualHost");
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. // 2.建立连接
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 3.监听队列获取消息
  17. com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("处理日志" + new String(body));
  21. }
  22. };
  23. channel.basicConsume("error",true,consumer);
  24. }
  25. }

 运行代码即可查看

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/598195
推荐阅读
相关标签
  

闽ICP备14008679号