当前位置:   article > 正文

Kafka,RabbitMQ,RockedMQ实际应用大汇总2.RabbitMQ,RocketMQ_rocketmq、rabbitmq使用

rocketmq、rabbitmq使用
Kafka,RabbitMQ,RockedMQ实际应用大汇总2.RocketMQ&RabbitMQ
  • 本文结合官网用例,记载了三大主流mq的实例以及实际运用。
  • 本文不涉及相关环境的安装与配置,涉及较为全面的代码(包括配置文件及maven)
  • 本文直接上代码及用例,适合对mq已经学过一遍或了解过的同学进行学习&复习,可加入生产环境。
  • 关于各种mq的介绍及对比可以参照我之前的文章
  • 其实mq的设计思想都差不多,可以细细感受一下mq的设计理念以及基本的服务对象。
  • 如果有问题,欢迎留言区或私信进行交流。
  • 虽然已经挑重点代码拿出来了,但是一篇文章里写三个mq还是很长,因此还是分为三篇记录吧,这是第二篇,主讲RabbitMQ,RocketMQ。

二:RocketMQ

推荐阅读官方example,讲的非常非常详细了,按照同步异步订阅等等等类别都分好了,阅读example的体验感也很好。
官方example

三:RabbitMQ

官方example同样给的非常详细,但有些并不常用。但官网上的图绝对是学习与理解的利器。
官方example

  • 路由模式(工作队列)
    • (消费者)监听者
      在这里插入图片描述
      在这里插入图片描述
package net.xdclass.direct;

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class Recv{

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);

        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();

        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");



        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • (生产者)消息的发送端
package net.xdclass.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String error = "我是订单服务的error日志";
            String info = "我是订单服务的info日志";
            String debug = "我是订单服务的debug日志";

            channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));

            System.out.println("发送成功");

        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 订阅发布模式
    在这里插入图片描述
  • 订阅者(消息消费者)
package net.xdclass.pub;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Recv{

   private final static String EXCHANGE_NAME = "exchange_fanout";

   public static void main(String[] argv) throws Exception {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("");
       factory.setUsername("admin");
       factory.setPassword("123456");
       factory.setVirtualHost("/dev");
       factory.setPort(5672);

       //消费者一般不增加自动关闭
       Connection connection = factory.newConnection();
       Channel channel = connection.createChannel();

       //绑定交换机,fanout扇形,即广播类型
       channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT);

       //获取队列(排它队列)
       String queueName = channel.queueDeclare().getQueue();

       //绑定队列和交换机,fanout交换机不用指定routingkey
       channel.queueBind(queueName,EXCHANGE_NAME,"");


       DeliverCallback deliverCallback = (consumerTag, delivery) -> {
           String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
           System.out.println(" [x] Received '" + message + "'");
       };

       //自动确认消息
       channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 发布者(消息生产者)
package net.xdclass.pub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机,fanout扇形,即广播类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = "Hello World pub !";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");

        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 主题模式
    在这里插入图片描述
  • 消费者
package net.xdclass.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");



        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body=" + new String(body, StandardCharsets.UTF_8));

                //手工确认消息消费,不是多条确认
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };

        //消费,关闭消息自动确认
        channel.basicConsume(queueName,false,consumer);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 生产者
package net.xdclass.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {

            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String error = "我是订单服务的error日志";
            String info = "我是订单服务的info日志";
            String debug = "我是订单服务的debug日志";

            channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));

            System.out.println("topic发送成功");



        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词
一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order,
 而 *.order ,只会匹配 info.order, 之间是使用. 点进行分割多个词的; 
 如果是 ., 则info.order、error.order都会匹配
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/729719
推荐阅读
相关标签
  

闽ICP备14008679号