当前位置:   article > 正文

记录rabbitmq工作模式之工作队列模式(work queues)_rabbitmq应用工作队列work queues

rabbitmq应用工作队列work queues

1、应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2、实现过程
1)首先建立消息的发布者

 public class producer {
//队列名称
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
    //通过连接工程创建新连接和mq连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    //设置mq端口
    factory.setPort(5672);
    //设置用户名
    factory.setUsername("guest");
    //设置密码
    factory.setPassword("guest");
    //设置虚拟机  一个mq服务可以设置多个虚拟机 一个虚拟机相当于一个mq服务
    factory.setVirtualHost("/");
    Connection connection = null;
    Channel channel = null;
    try {
        //建立连接
        connection = factory.newConnection();
        //设置创建会话通道  所有通信都在通道中完成
         channel = connection.createChannel();
        /**
         * 声明队列 如果没有就创建
         * 队列参数 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * 1、queue 队列名称
         * 2、durable 是否持久化  如果是  mq关闭后重启队列还在
         * 3、exclusive 是否独占 只能在该链接中访问 链接关闭自动删除  设为true可以实现临时队列
         * 4、autoDelete 自动删除 和exclusive一样  设为true可以实现临时队列
         * 5、arguments 队列参数  可以设置扩展参数  比如可以设置存活时间
         *
         */
        channel.queueDeclare(QUEUE,true,false,false,null);
        /**
         * 发送消息
         * 参数 String exchange, String routingKey, BasicProperties props, byte[] body
         * 1、exchange 交换机 如果不指定则使用默认交换机(设置为 "")
         * 2、routingKey 路由key  交换机使用路由kye将消息转发给指定消息队列 如果使用默认交换机 则路由key设置为交换机名称
         * 3、props 消息属性
         *
         */
        String message = "测试生成发送消息--hello";
        channel.basicPublish("",QUEUE,null,message.getBytes());
        System.out.print(message);
    }catch (Exception e){
        e.printStackTrace();
    }finally {
       //先关闭通道 再关连接
       try {
           channel.close();
       }catch (Exception e){
           e.printStackTrace();
       }
       try {
           connection.close();
       }catch (Exception e){
           e.printStackTrace();
       }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

}

2)设置消息的消费者

public class Consumer {
     private static final String QUEUE = "helloWorld";
     public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setVirtualHost("/");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    /**
     * 消费者接收消息调用此方法
     */
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        /**
         *
         * @param consumerTag  消费者标签标签 可以不设 可以在监听队列时设置
         * @param envelope     信封
         * @param properties   消息属性
         * @param body         消息内容
         * @throws IOException
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //交换机
            envelope.getExchange();
            //消息id mq在通道中(channel)标识消息id 可用于消息已接收
           long deliveryTag =  envelope.getDeliveryTag();
             //消息内容   编码格式设置utf-8
           String message = new String(body, "UTF-8");
           System.out.print(message);
        }
    };
    //监听队列
    //声明队列
    channel.queueDeclare(QUEUE,true,false,false,null);
    /**
     *参数
     * 1、string queue
     * 2、boolean autoAck
     * 3、Consumer callback
     */
    channel.basicConsume(QUEUE,true,defaultConsumer);
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

3、测试工作队列模式
在这里插入图片描述
程序测试:
1、启动多个消费者。
2、生产者发送多个消息。
总结:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/931809
推荐阅读
相关标签
  

闽ICP备14008679号