当前位置:   article > 正文

Java RabbitMQ消息队列简单使用_java mq消息队列

java mq消息队列

一、消息队列

什么是消息队列

消息队列,即MQ,Message Queue。

消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

二、RabbitMQ

RabbitMQ是基于AMQP的一款消息管理系统。

支持主流的操作系统,Linux、Windows、MacOX等。

支持多种开发语言,Java、Python、Ruby、.NET、PHP、C/C++、node.js等。

官网: Messaging that just works — RabbitMQ

官方教程:RabbitMQ Tutorials — RabbitMQ

RabbitMQ 基本概念

Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Connection
网络连接,比如一个TCP连接。无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费。

Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker
表示消息队列服务器实体。

 三、简单消息模型使用

RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

P(producer/ publisher):生产者,一个发送消息的用户应用程序。

C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

代码演示

引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>2.0.6.RELEASE</version>
  5. </dependency>

 获取连接

  1. package com.util;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class ConnectionUtil {
  5. /**
  6. * 建立与RabbitMQ的连接
  7. * @return
  8. * @throws Exception
  9. */
  10. public static Connection getConnection() throws Exception {
  11. //定义连接工厂
  12. ConnectionFactory factory = new ConnectionFactory();
  13. //设置服务地址
  14. factory.setHost("192.168.33.88");
  15. //TCP端口
  16. factory.setPort(5672);
  17. //设置账号信息,用户名、密码、vhost
  18. factory.setVirtualHost("vhost_NetDataGather");
  19. factory.setUsername("admin");
  20. factory.setPassword("123456");
  21. // 通过工程获取连接
  22. Connection connection = factory.newConnection();
  23. return connection;
  24. }
  25. }

生产者

  1. import com.util.ConnectionUtil;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. /**
  5. * 生产者
  6. */
  7. public class Send {
  8. //声明队列名称
  9. private final static String QUEUE_NAME = "simple_queue";
  10. public static void main(String[] argv) throws Exception {
  11. // 生产者和Broker建立TCP连接。
  12. Connection connection = ConnectionUtil.getConnection();
  13. // 生产者和Broker建立通道。
  14. Channel channel = connection.createChannel();
  15. // 声明(创建)队列
  16. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17. // 消息内容
  18. String message = "Hello World!";
  19. for (int i = 0; i < 10; i++) {
  20. // 向指定的队列中发送消息
  21. message=message+i;
  22. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  23. }
  24. //关闭通道和连接
  25. channel.close();
  26. connection.close();
  27. }
  28. }

消费者

  1. import java.io.IOException;
  2. import com.rabbitmq.client.AMQP.BasicProperties;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.DefaultConsumer;
  6. import com.rabbitmq.client.Envelope;
  7. import com.util.ConnectionUtil;
  8. /**
  9. * 消费者
  10. */
  11. public class Recv {
  12. private final static String QUEUE_NAME = "simple_queue";
  13. public static void main(String[] argv) throws Exception {
  14. // 获取到连接
  15. Connection connection = ConnectionUtil.getConnection();
  16. // 创建通道
  17. Channel channel = connection.createChannel();
  18. // 声明队列
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. // 定义队列的消费者
  21. DefaultConsumer consumer = new DefaultConsumer(channel) {
  22. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  23. @Override
  24. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  25. byte[] body) throws IOException {
  26. // body 即消息体
  27. String msg = new String(body);
  28. }
  29. };
  30. // 监听队列,第二个参数:是否自动进行消息确认。
  31. channel.basicConsume(QUEUE_NAME, true, consumer);
  32. }
  33. }

上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

自动ACK:消息一旦被接收,消费者自动发送ACK。

手动ACK:消息接收后,不会发送ACK,需要手动调用。

手动ACK

  1. import java.io.IOException;
  2. import com.rabbitmq.client.AMQP.BasicProperties;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.DefaultConsumer;
  6. import com.rabbitmq.client.Envelope;
  7. import com.util.ConnectionUtil;
  8. /**
  9. * 消费者,手动进行ACK
  10. */
  11. public class Recv2 {
  12. private final static String QUEUE_NAME = "simple_queue";
  13. public static void main(String[] argv) throws Exception {
  14. // 获取到连接
  15. Connection connection = ConnectionUtil.getConnection();
  16. // 创建通道
  17. final Channel channel = connection.createChannel();
  18. // 声明队列
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. // 定义队列的消费者
  21. DefaultConsumer consumer = new DefaultConsumer(channel) {
  22. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  23. @Override
  24. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  25. byte[] body) throws IOException {
  26. // body 即消息体
  27. String msg = new String(body);
  28. // 手动进行ACK
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. };
  32. // 监听队列,第二个参数false,手动进行ACK
  33. // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
  34. channel.basicConsume(QUEUE_NAME, false, consumer);
  35. }
  36. }

以上就是RabbitMQ的简单使用讲解,更多详细使用内容,请再多多学习。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/438974
推荐阅读
相关标签
  

闽ICP备14008679号