当前位置:   article > 正文

Rabbitmq的几种工作模式

Rabbitmq的几种工作模式

工具类

  1. public class RabbitMQConnection {
  2. public static Connection getConnection() throws Exception{
  3. //1.创建connectionFactory
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. //2.配置Host
  6. connectionFactory.setHost("127.0.0.1");
  7. //3.设置Port
  8. connectionFactory.setPort(5672);
  9. //4.设置账户和密码
  10. connectionFactory.setUsername("guest");
  11. connectionFactory.setPassword("guest");
  12. //5.设置VirtualHost
  13. connectionFactory.setVirtualHost("0517");
  14. return connectionFactory.newConnection();
  15. }
  16. }

点对点(简单)的队列

图解:

        

生产者代码
  1. public class Producer {
  2. private final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. //创建一个连接工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("127.0.0.1");
  7. factory.setUsername("guest");
  8. factory.setPassword("guest");
  9. //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
  10. try(Connection connection = factory.newConnection(); Channel channel =
  11. connection.createChannel()) {
  12. /**
  13. * 生成一个队列
  14. * 1.队列名称
  15. * 2.队列里面的消息是否持久化 默认消息存储在内存中
  16. * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
  17. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
  18. * 5.其他参数
  19. */
  20. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  21. String message="hello world";
  22. /**
  23. * 发送一个消息
  24. * 1.发送到那个交换机
  25. * 2.路由的 key 是哪个
  26. * 3.其他的参数信息
  27. * 4.发送消息的消息体
  28. */
  29. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  30. System.out.println("消息发送完毕");
  31. }
  32. }
  33. }
消费者代码
  1. public class Consumer {
  2. private final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("127.0.0.1");
  6. factory.setUsername("guest");
  7. factory.setPassword("guest");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. System.out.println("等待接收消息....");
  11. //推送的消息如何进行消费的接口回调
  12. DeliverCallback deliverCallback=(consumerTag, delivery)->{
  13. String message= new String(delivery.getBody());
  14. System.out.println(message);
  15. };
  16. //取消消费的一个回调接口 如在消费的时候队列被删除掉了
  17. CancelCallback cancelCallback=(consumerTag)->{
  18. System.out.println("消息消费被中断");
  19. };
  20. /**
  21. * 消费者消费消息
  22. * 1.消费哪个队列
  23. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
  24. * 3.消费者未成功消费的回调
  25. */
  26. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  27. }
  28. }
重点解析:点对点没什么可说的,就是生产者产生消息给到消息队列,第一次已推送的形式给到消费者,之后就是消费者端主动的拉取,需要在生产端创建好队列或在图形化页面创建好队列

工作(公平性)队列模式

图解:

       

生产者代码
  1. public class Task01 {
  2. private static final String QUEUE_NAME="hello";
  3. public static void main(String[] args) throws Exception {
  4. try(Channel channel=RabbitMqUtils.getChannel();) {
  5. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  6. //从控制台当中接受信息
  7. Scanner scanner = new Scanner(System.in);
  8. while (scanner.hasNext()){
  9. String message = scanner.next();
  10. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  11. System.out.println("发送消息完成:"+message);
  12. }
  13. }
  14. }
  15. }
消费者代码

消费者1:

  1. public class Consumer1 {
  2. private final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. //推送的消息如何进行消费的接口回调
  6. DeliverCallback deliverCallback=(consumerTag, delivery)->{
  7. String receivedMessage = new String(delivery.getBody());
  8. System.out.println("接收到消息:"+receivedMessage);
  9. };
  10. CancelCallback cancelCallback=(consumerTag)->{
  11. System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
  12. };
  13. System.out.println("C1 消费者启动等待消费......");
  14. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  15. }
  16. }

消费者2:

  1. public class Consumer2 {
  2. private final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. //推送的消息如何进行消费的接口回调
  6. DeliverCallback deliverCallback=(consumerTag, delivery)->{
  7. String receivedMessage = new String(delivery.getBody());
  8. System.out.println("接收到消息:"+receivedMessage);
  9. };
  10. CancelCallback cancelCallback=(consumerTag)->{
  11. System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
  12. };
  13. System.out.println("C2 消费者启动等待消费......");
  14. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  15. }
  16. }
重点解析
        工作(公平性)队列模式,和点对点差不多,就是生产者将消息直接存放到队列中,然后队列默认采用轮询的形式选择消费者进行消费

        当然也可以设置channel.basicQos(i)的形式进行公平分发(谁处理快,谁做的多)

        这里公平的意思是谁做的多,谁处理的多,并不是平均分配的意思

发布订阅模式

图解:

       

生产者代码
  1. public class ProducerFanout {
  2. //定义交换机名称
  3. private static final String EXCHANGE_NAME = "fanout_exchange";
  4. public static void main(String[] args) throws Exception{
  5. //创建连接
  6. Connection connection = RabbitMQConnection.getConnection();
  7. //创建通道
  8. Channel channel = connection.createChannel();
  9. //通道关联交换机(创建交换机)(fanout类型会自动创建)
  10. //channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
  11. //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
  12. String msg = "程子强你好";
  13. channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
  14. channel.close();
  15. connection.close();
  16. }
  17. }
消费者代码

消费者1:

  1. public class MailConsumer {
  2. /**
  3. * 定义邮件队列
  4. */
  5. private static final String QUEUE_NAME = "fanout_email_queue";
  6. /**
  7. * 定义交换机的名称
  8. */
  9. private static final String EXCHANGE_NAME = "fanout_exchange";
  10. public static void main(String[] args) throws Exception{
  11. System.out.println("邮件消费者...");
  12. // 创建我们的连接
  13. Connection connection = RabbitMQConnection.getConnection();
  14. // 创建我们通道
  15. final Channel channel = connection.createChannel();
  16. // 关联队列消费者关联队列
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  18. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. String msg = new String(body, "UTF-8");
  22. System.out.println("邮件消费者获取消息:" + msg);
  23. }
  24. };
  25. // 开始监听消息 自动签收
  26. channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
  27. }
  28. }

消费者2:

  1. public class SmsConsumer {
  2. /**
  3. * 定义短信队列
  4. */
  5. private static final String QUEUE_NAME = "fanout_email_sms";
  6. /**
  7. * 定义交换机的名称
  8. */
  9. private static final String EXCHANGE_NAME = "fanout_exchange";
  10. public static void main(String[] args) throws Exception{
  11. System.out.println("短信消费者...");
  12. // 创建我们的连接
  13. Connection connection = RabbitMQConnection.getConnection();
  14. // 创建我们通道
  15. final Channel channel = connection.createChannel();
  16. // 关联队列消费者关联队列
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  18. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. String msg = new String(body, "UTF-8");
  22. System.out.println("短信消费者获取消息:" + msg);
  23. }
  24. };
  25. // 开始监听消息 自动签收
  26. channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
  27. }
  28. }
重点解析
       发布订阅模式,和前两种模式不同这里用到了一个fanout类型的交换机(具体交换机的类型和概念小伙伴们可以自行查阅下,这里主要讲工作模式),生产者将消息发送给这个交换机,这个交换机把消息发送给每一个和其绑定的队列(注意fanout类型的交换机不需要key所以生产者传递直接传""就好)

路由模式Routing

图解:

       

生产者代码
  1. public class ReceiveLogsDirect {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] args) throws Exception {
  4. try (Connection connection = RabbitMQConnection.getConnection(); Channel channel =
  5. connection.createChannel()) {
  6. //创建交换机
  7. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
  8. //创建多个 bindingKey
  9. Map<String, String> bindingKeyMap = new HashMap<>();
  10. bindingKeyMap.put("info","普通 info 信息");
  11. bindingKeyMap.put("warning","警告 warning 信息");
  12. bindingKeyMap.put("error","错误 error 信息");
  13. //debug 没有消费这接收这个消息 所有就丢失了
  14. bindingKeyMap.put("debug","调试 debug 信息");
  15. for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
  16. String bindingKey = bindingKeyEntry.getKey();
  17. String message = bindingKeyEntry.getValue();
  18. channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
  19. message.getBytes("UTF-8"));
  20. System.out.println("生产者发出消息:" + message);
  21. }
  22. }
  23. }
  24. }
消费者代码

消费者1:

  1. public class ReceiveLogsDirect01 {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] args) throws Exception{
  4. Connection connection = RabbitMQConnection.getConnection();
  5. Channel channel = connection.createChannel();
  6. //写不写都可以,如果代码创建在生产端写,如果是浏览器创建,就不需要写这段代码
  7. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
  8. String queueName = "disk";
  9. channel.queueBind(queueName, EXCHANGE_NAME, "error");
  10. System.out.println("等待接收消息.....");
  11. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  12. String message = new String(delivery.getBody(), "UTF-8");
  13. message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
  14. File file = new File("E:\\xxx\\rabbitmq_info.txt");//路径任意写
  15. FileUtils.writeStringToFile(file,message,"UTF-8");
  16. System.out.println("错误日志已经接收");
  17. };
  18. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
  19. });
  20. }
  21. }

消费者2:

  1. public class ReceiveLogsDirect02 {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] argv) throws Exception {
  4. Connection connection = RabbitMQConnection.getConnection();
  5. Channel channel = connection.createChannel();
  6. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
  7. String queueName = "console";
  8. channel.queueBind(queueName, EXCHANGE_NAME, "info");
  9. channel.queueBind(queueName, EXCHANGE_NAME, "warning");
  10. System.out.println("等待接收消息.....");
  11. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  12. String message = new String(delivery.getBody(), "UTF-8");
  13. System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
  14. };
  15. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
  16. });
  17. }
  18. }
重点解析
       路由模式,用到了direct类型的交换机,简单讲就是队列通过key和交换机进行绑定,生产者那边传入的key和消息给交换机,如果该队列绑定的key与其传入的key相同则,交换机讲该消息传给对应的队列,一个队列可以绑定多个key

通配符模式Topics(主题)

图解:

       

生产者代码
  1. public class ProducerTopic {
  2. /**
  3. * 定义交换机的名称
  4. */
  5. private static final String EXCHANGE_NAME = "topic_exchange";
  6. public static void main(String[] args) throws Exception{
  7. // 创建Connection
  8. Connection connection = RabbitMQConnection.getConnection();
  9. // 创建Channel
  10. Channel channel = connection.createChannel();
  11. // 通道关联交换机
  12. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  13. String msg = "我是xxx";
  14. channel.basicPublish(EXCHANGE_NAME, "czq.hhh.aa", null, msg.getBytes());
  15. channel.close();
  16. connection.close();
  17. }
  18. }
消费者代码

消费者1:

  1. public class SmsConsumer {
  2. /**
  3. * 定义短信队列
  4. */
  5. private static final String QUEUE_NAME = "topic_sms_queue";
  6. /**
  7. * 定义交换机的名称
  8. */
  9. private static final String EXCHANGE_NAME = "topic_exchange";
  10. public static void main(String[] args) throws Exception{
  11. System.out.println("短信消费者...");
  12. // 创建Connection
  13. Connection connection = RabbitMQConnection.getConnection();
  14. // 创建Channel
  15. Channel channel = connection.createChannel();
  16. // 关联队列消费者关联队列
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "czq.#");
  18. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. String msg = new String(body, "UTF-8");
  22. System.out.println("短信消费者获取消息:" + msg);
  23. }
  24. };
  25. // 开始监听消息 自动签收
  26. channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
  27. }
  28. }

消费者2:

  1. public class MailConsumer {
  2. /**
  3. * 定义邮件队列
  4. */
  5. private static final String QUEUE_NAME = "topic_email_queue";
  6. /**
  7. * 定义交换机的名称
  8. */
  9. private static final String EXCHANGE_NAME = "topic_exchange";
  10. public static void main(String[] args) throws Exception{
  11. System.out.println("邮件消费者...");
  12. // 创建Connection
  13. Connection connection = RabbitMQConnection.getConnection();
  14. // 创建Channel
  15. Channel channel = connection.createChannel();
  16. // 关联队列消费者关联队列
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.boyatop.#");
  18. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. String msg = new String(body, "UTF-8");
  22. System.out.println("邮件消费者获取消息:" + msg);
  23. }
  24. };
  25. // 开始监听消息 自动签收
  26. channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
  27. }
  28. }
重点解析
       通配符模式,用到了topic类型的交换机,简单讲与通配符模式原理大致,差别在于根据队列绑定的路由建模糊转发到具体的队列中存放。其中#号表示支持匹配多个词;*号表示只能匹配一个词,假如同一个队列与交换机直接设置的多个模糊的key都符合传入的,那么也只传送一次
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/928297
推荐阅读
相关标签
  

闽ICP备14008679号