当前位置:   article > 正文

RabbitMQ初级篇:生产者与消费者关系、消息确认机制(ACK)、交换器与队列进行消息路由和存储_mq消费者和生产者

mq消费者和生产者

1. 生产者与消费者关系

RabbitMQ中,生产者(Producer)负责发送消息,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则负责处理接收到的这些消息。在RabbitMQ中,生产者和消费者之间使用交换器(Exchange)和队列(Queue)进行消息路由和存储。生产者将消息发送到交换器,交换器根据消息的路由键将其放入相应的队列中,最后消费者从队列中获取并处理这些消息。

2. 交换器与队列进行消息路由和存储

2.1 交换器与队列

交换器(Exchange)负责处理生产者发送的消息,并根据路由键(Routing Key)将消息分发到相应的队列(Queue)中。RabbitMQ中以下四种类型的交换器:

1. 直接交换器(Direct)

2. 分发交换器(Fanout)

3. 主题交换器(Topic)

4. 头部交换器(Headers)

队列是用于存储消息的数据结构,并为消费者准备好消息以进行消息消费。生产者发送的消息将被保存在一个或多个队列中,等待消费者进行消息处理。

2.2Java代码示例

以下是一个简单的Java代码示例,展示了如何在RabbitMQ中创建交换器、队列和绑定,并进行消息的发送与接收。

添加以下依赖到pom.xml文件

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.13.0</version>
  5. </dependency>

RabbitMqUtils代码:

  1. public class RabbitMqUtils {
  2. //得到一个连接的 channel
  3. public static Channel getChannel() throws Exception{
  4. // 创建工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. factory.setUsername("admin");
  8. factory.setPassword("admin");
  9. // 创建链接
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. return channel;
  13. }
  14. }

2.2 Java代码示例与注释

创建一个生产者ProducerDemo

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class ProducerDemo {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. // 创建链接
  9. Channel channel = RabbitMqUtils.getChannel();
  10. // 创建交换器
  11. String exchangeName = "test_exchange";
  12. // 1队列名称 2队列类型 3 是否持久化交换机 true是 false否
  13. channel.exchangeDeclare(exchangeName, "direct", true);
  14. // 创建队列
  15. String queueName = "test_queue";
  16. /**
  17. * 生成一个队列
  18. * 1.队列名称
  19. * 2.队列里面的消息是否持久化 默认消息存储在内存中
  20. * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
  21. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
  22. * 5.其他参数
  23. */
  24. channel.queueDeclare(queueName, true, false, false, null);
  25. // 绑定队列与交换器
  26. String routingKey = "test.routing.key";
  27. // 1 绑定的队列 2交换机名称 3路由key
  28. channel.queueBind(queueName, exchangeName, routingKey);
  29. // 发送消息
  30. String message = "Hello, RabbitMQ!";
  31. /**
  32. * 发送一个消息
  33. * 1.发送到那个交换机
  34. * 2.路由的 key 是哪个
  35. * 3.其他的参数信息
  36. * 4.发送消息的消息体
  37. */
  38. channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));
  39. // 关闭资源
  40. channel.close();
  41. // 关闭连接
  42. connection.close();
  43. }
  44. }

以上是在ProducerDemo类中,我们首先创建了一个交换器和一个队列。然后将队列与交换机进行绑定,并设置一个路由键。接着,我们将消息发送到交换器,并使用相同的路由键。

创建一个消费者ConsumerDemo

  1. import com.rabbitmq.client.AMQP.BasicProperties;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.DefaultConsumer;
  6. import com.rabbitmq.client.Envelope;
  7. import java.io.IOException;
  8. public class ConsumerDemo {
  9. public static void main(String[] args) throws IOException {
  10. // 创建链接
  11. Channel channel = RabbitMqUtils.getChannel();
  12. // 创建队列
  13. String queueName = "test_queue";
  14. /**
  15. * 生成一个队列
  16. * 1.队列名称
  17. * 2.队列里面的消息是否持久化 默认消息存储在内存中
  18. * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
  19. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
  20. * 5.其他参数
  21. */
  22. channel.queueDeclare(queueName, true, false, false, null);
  23. // 创建消费者
  24. DefaultConsumer consumer = new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  27. String message = new String(body, "UTF-8");
  28. System.out.println("Received: " + message);
  29. }
  30. };
  31. /**
  32. * 消费者消费消息
  33. * 1.消费哪个队列
  34. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
  35. * 3.消费者未成功消费的回调
  36. */
  37. channel.basicConsume(queueName, true, consumer);
  38. }
  39. }

2.3 交换器与队列总结

交换器与队列用于实现消息的路由和存储。在具体应用中,我们需要创建不同类型的交换器、队列,并使用路由键进行绑定。 Java代码示例展示了在RabbitMQ中如何进行简单的交换器、队列创建、绑定以及消息的发送和接收

3. 消息确认机制(ACK)

RabbitMQ中的消息确认机制,即ACK(Acknowledgement),是为了确保消息成功地从生产者传递到消费者。消费者处理完一个消息后,需要向RabbitMQ服务器发送一个ACK信号,告知服务器该消息已收到且处理完毕,允许服务器删除这个消息。根据确认时机的不同,ACK分为:

3.1 自动确认(Auto Ack)

自动确认是指,当消费者接收到消息后,会立即向服务器发送ACK信号,不管消息是否处理成功。自动确认的优点是速度快,但是可能导致消息丢失。如果在消费者处理消息过程中发生异常或宕机,由于已经发送了ACK信号,服务器将认为消息已被处理,从而导致消息丢失。

3.2 手动确认(Manual Ack)

手动确认是指,当消费者接收到消息后,可以选择在消息处理成功后再向服务器发送ACK信号。如果消费者处理消息过程中发生异常,可以选择不发送ACK信号,服务器将重新分发该消息给其他消费者。这样可以降低消息丢失的风险,但是相对于自动确认,速度较慢。

Java代码示例:

生产者代码:

  1. public class producer {
  2. public static final String TASK_QUEUE_NAME = "ack_queue";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. // 发布确认
  6. channel.confirmSelect();
  7. // 创建队列
  8. // 持久化
  9. boolean durable = true;
  10. /**
  11. * 生成一个队列
  12. * 1.队列名称
  13. * 2.队列里面的消息是否持久化 默认消息存储在内存中
  14. * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
  15. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
  16. * 5.其他参数
  17. */
  18. channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
  19. Scanner scanner = new Scanner(System.in);
  20. while (scanner.hasNext()){
  21. String message = scanner.next();
  22. // 设置生产发送消息为持久化消息(要求保存到磁盘上)
  23. /**
  24. * 发送一个消息
  25. * 1.发送到那个交换机
  26. * 2.路由的 key 是哪个
  27. * 3.其他的参数信息
  28. * 4.发送消息的消息体
  29. */
  30. channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
  31. System.out.println("生产者发送消息:"+message);
  32. }
  33. }
  34. }

消费者代码:

  1. package cn.example.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.DefaultConsumer;
  6. import com.rabbitmq.client.Envelope;
  7. import com.rabbitmq.client.AMQP.BasicProperties;
  8. import java.io.IOException;
  9. public class ConsumerDemo {
  10. public static void main(String[] args) throws IOException {
  11. Channel channel = RabbitMqUtils.getChannel();
  12. String queueName = "ack_queue";
  13. // 创建队列
  14. channel.queueDeclare(queueName, false, false, false, null);
  15. channel.basicQos(1);
  16. // 自动确认
  17. boolean autoAck = false;
  18. // 手动确认
  19. // boolean autoAck = true;
  20. // 创建一个消费者
  21. DefaultConsumer consumer = new DefaultConsumer(channel) {
  22. @Override
  23. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  24. String message = new String(body, "UTF-8");
  25. System.out.println("Received: " + message);
  26. try {
  27. Thread.sleep(2000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. if (!autoAck) {
  32. // 手动确认
  33. channel.basicAck(envelope.getDeliveryTag(), false);
  34. }
  35. }
  36. };
  37. /**
  38. * 消费者消费消息
  39. * 1.消费哪个队列
  40. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
  41. * 3.消费者未成功消费的回调
  42. */
  43. channel.basicConsume(queueName, autoAck, consumer);
  44. }
  45. }

3.3 消息确认机制总结

生产者和消费者通过交换器和队列进行消息的发送与接收。为了确保消息正常传递,RabbitMQ提供了消息确认机制。自动确认的速度较快,但存在消息丢失的风险;手动确认可以降低消息丢失风险,但速度较慢。开发者可根据实际应用场景选择合适的确认机制。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/399185
推荐阅读
相关标签
  

闽ICP备14008679号