当前位置:   article > 正文

RabbitMQ-Java-01-简单队列_rabbitmq java 简单队列

rabbitmq java 简单队列

说明

  • RabbitMQ-Java-01-简单队列
  • 本案例是一个Maven项目
  • 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/

操作步骤

》安装RabbitMQ

  • 可自行安装,也可参考我的相关教程(CentOS7离线安装RabbitMq),本章假设你已经安装好了RabbitMQ。

》搭建环境

  • idea创建一个空项目
  • 创建一个Maven管理的module
  • pom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)
    <!-- 指定JDK编译版本 -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>8</source>
            <target>8</target>
        </configuration>
    </plugin>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • pom.xml添加依赖:RabbitMQ相关
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.1</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.11.0</version>
    </dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

》简单案例

  • 说明
    • 我将代码分成三部分:初始化、消费者、生产者。多一层拆分思路更清晰明朗便于理解。
  • 代码组成
    • 初始化类:Initialization
      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      /*
       * 初始化
       *  作用:
       *      创建交换机
       *      创建队列
       *      绑定队列、交换机、路由key
       */
      public class Initialization {
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 获取连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
      
              connection.close();
      
              System.out.println("初始化成功。。。");
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
    • 消费者类:Consumer
      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.*;
      
      /*
       * 消费者
       *  消费者消费消息流程
       *      建立连接(connection)
       *      获取信道(channel)
       *      消费队列中的消息,自动应答
       *          接收消息回调方法
       *          拒绝消息回调方法
       */
      public class Consumer {
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 创建连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 消费队列中的消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          // 接收消息回调方法
          public static DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
              System.out.println(" [*] 成功处理消息:" + new String(message.getBody()));
          };
      
          // 拒绝消息回调方法
          public static CancelCallback cancelCallback = (String consumerTag) -> {
              System.out.println("消费消息失败");
          };
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
    • 生产者类:Producer
      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      /*
       * 生产者
       *  发送消息流程:
       *      建立连接(connection)
       *      获取信道(channel)
       *      通过信道将消息发送到指定交换机(exchange),并绑定路由key(routingKey),路由key可以是多个
       *  注意:
       *      生产者不需要关心队列(queue)
       *      生产者发送消息前需要准备好:
       *          创建相关交换机
       *          创建相关队列
       *          绑定队列、交换机、路由key
       */
      public class Producer {
      
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置信息
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 创建连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 发送消息
              channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello.".getBytes());
      
              connection.close();
      
              System.out.println("消息发送成功。。。");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
  • 运行初始化:Initialization -> main -> run
  • 运行消费者:Consumer -> main -> run
  • 运行生产者:Producer -> main -> run

备注

  • 该教程部分内容收集自网络,感谢原作者。

附录

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/906977
推荐阅读
相关标签
  

闽ICP备14008679号