当前位置:   article > 正文

rabbitmq入门(一)简单队列(Hello World!、Work queues)_rabbitmq 简单队列

rabbitmq 简单队列


docker搭建rabbitm

mq官网

“Hello World!”:

官网教程
点对点,一个生产者,一个消费者,一个队列。
特点:

  • 没有交换机概念,生产者和消费者直接通过队列进行交流

在这里插入图片描述

1. mq创建一个队列
  • 安装完rabbitm直接访问 118.25.188.37:15672
  • 进入登入界面:默认密码都guest
  • 这里不创建队列也行,java中绑定队列如果队列没有会创建
    在这里插入图片描述
    在这里插入图片描述
    完成后点击add queue
    在这里插入图片描述
2. 创建生产者消费者
  1. 引入依赖
     dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>5.73</version>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 获取mq连接工具类(类似jdbc连接)
  • MQConnectionUtils
public class MQConnectionUtils {
    private static final String IP = "118.25.188.37";
    private static final Integer PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    public static Connection newConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost(IP);
        //设置端口号
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //创建连接
        Connection connection = factory.newConnection();
        return connection;

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  1. Producer 生产者
  • Producer
public class Producer {
	private static final String QUEUE_NAME = "mq";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.获取连接
		Connection newConnection = MQConnectionUtils.newConnection();
		// 2.创建通道
		Channel channel = newConnection.createChannel();
		// 3.创建队列声明
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String msg = "直接模式消息发送";
		System.out.println("生产者发送消息:" + msg);
		// 4.发送消息
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		channel.close();
		newConnection.close();
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

发送消息后mq会出现待消费的消息
在这里插入图片描述

  1. Customer 消费者
  • Customer
public class Customer {
	private static final String QUEUE_NAME = "mq";

	public static void main(String[] args) throws IOException, TimeoutException {
		// 1.获取连接
		Connection newConnection = MQConnectionUtils.newConnection();
		// 2.获取通道
		Channel channel = newConnection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msgString = new String(body, "UTF-8");
				System.out.println("消费者获取消息:" + msgString);
			}
		};
		// 3.监听队列  true表示自动应答,false表示手动应答
		channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

消费完后mq消息就没有了

工作队列 Work queues

在这里插入图片描述
与点对点不同的是,消费者由1个变成了两个,消费者集群了
我们这里启动两个消费者
在这里插入图片描述
在这里插入图片描述
然后发送10条消息
看看结果:
在这里插入图片描述

在这里插入图片描述
可以看到实现的是均摊消费

应答模式

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
这里第二个参数表示应答模式为true,表示自动签收

  • 自动应答:不会在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息,如果消息获取失败的情况,实现自动补偿
  • 手动应答:消费者处理完业务逻辑,手动返回一个ack(通知)告诉队列服务器是否删除该消息

这里我们将 应答模式设置为false
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
然后向消费者发送10个消息
在这里插入图片描述
可以看到消费者接收到了10个消息,但是我现在如果停止消费者
在这里插入图片描述
发现队列中还是有10个消息未消费,原因我我们没有手动返回ask
这里我们需要加上这个channel.basicAck(envelope.getDeliveryTag(), false);

public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws  Exception {
        System.out.println("消费者2启动");
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        /* 2.获取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

这样就表示消费者接受消息成功了
实现:添加如下代码channel.basicQos(1);

public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws  Exception {
        System.out.println("消费者2启动");
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        /* 2.获取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //表示一次只消费一个消息
        channel.basicQos(1);
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

公平队列

在上面我们消费者如果集群,消费者接受采用的均摊消费,但每个消费者处理业务时间不同,这样就不能让性能更好的消费者消费更多的消息(能者多劳)

  • 解决方案:消费者都采用应答模式实现公平队列,即谁消费快,消费的消息多
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/639630
推荐阅读
相关标签
  

闽ICP备14008679号