赞
踩
周末的两天没有写文章,因为项目分离出来了一个权限管理平台,花了一点时间整理项目,同时完成了一些功能的开发。
今天这篇文章介绍一下RabbitMQ这个消息中间件,以及如何Java中使用RabbitMQ。
RabbitMQ是开发中常用的消息中间件之一,市面上使用较多的同类型产品还有Kafka和RocketMQ
三种消息队列的对比
消息中间件 | Kafka | RabbitMQ | RocketMQ |
优点 | kafka以其高吞吐著称,是三种消息队列中吞吐量最高的,同时性能也较高,集群高可用 | 消息可靠性高;功能全面;支持多种语言,跨语言; | 综合了Kafka和RabbitMQ的优点;高吞吐、高性能、高可用、功能全面 |
缺点 | 可能会丢失数据,功能比较简单 | 吞吐量低;因为是erlang语言开发的,底层代码修改困难;消息容易积累,影响性能; | 官方文档比较简单;客户端只支持Java |
适用场景 | 主要用于日志分析和大数据采集 | 主要用于并发量较低的场景 | 几乎所有场景都适用 |
目录
浏览器访问RabbitMQ的官网RabbitMQ官网
我们点击上方的菜单Features查看RabbitMQ的主要特性
上面介绍了一下RabbitMQ的特性:
1、异步消息传递:
支持多种消息传递协议、消息队列、传递确认、灵活的队列路由、多种交换类型。
2、开发人员经验:
使用 Kubernetes、BOSH、Chef、Docker 和 Puppet 进行部署。使用最喜欢的编程语言开发跨语言消息传递,如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等: Java、.NET、PHP、Python、JavaScript、Ruby、Go 等。
3、分布式部署:
以集群形式部署,实现高可用性和高吞吐量;跨多个可用区和区域联合部署。
4、企业和云就绪:
可插入式身份验证和授权,支持 TLS 和 LDAP。轻量级,易于在公共云和私有云中部署。
5、工具和插件:
各种工具和插件支持持续集成、运营指标以及与其他企业系统的集成。灵活的插件方法可扩展 RabbitMQ 功能。
6、管理和监控
用于管理和监控 RabbitMQ 的 HTTPAPI、命令行工具和用户界面。
官网的英文看不懂?推荐全世界最准确的翻译:deepl,作为一个程序员,你不会还连这个都不知道吧?赶紧收藏一下网址~
简单的了解RabbitMQ的特性后,点击右上角的菜单Docs开始学习如何使用RabbitMQ吧。
点击红框内的红色字体下载链接 Downloads and Installation下载RabbitMQ;
点击下图中蓝框内的链接分别下载RabbitMQ和erlang,安装RabbitMQ之前需要先完成erlang的安装。
安装过程就不介绍了,一直下一步、下一步就行了。
安装完成后,可以点开我们的安装目录RabbitMQ Server看一下
有个sbin目录,存放的是一些命令工具,etc点开看一下,有一个readme
In this directory you can find an example configuration file for RabbitMQ.
Note that this directory is *not* where the real RabbitMQ configuration lives.
The default location for the real configuration file is %APPDATA%\RabbitMQ\rabbitmq.config.
%APPDATA% usually expands to C:\Users\%USERNAME%\AppData\Roaming or similar.
这个文件内容告诉了我们RabbitMQ的配置文件的位置
C:\Users\%USERNAME%\AppData\Roaming\RabbitMQ\rabbitmq.config
我们访问这个路径C:\Users\%USERNAME%\AppData\Roaming,发现这个目录下确实存放了一个RabbitMQ目录,目录下有一个advanced.config文件。
通过命令启动RabbitMQ,在这之前,需要把erlang的安装目录添加到系统的Path环境变量中。
然后进到RabbitMQ的安装目录下的sbin目录下,在地址栏输入cmd,打开命令窗口
rabbitmq-plugins enable rabbitmq_management
如图,就是启动成功了,注意,启动这个插件的同时rabbitmq也已经启动了
在经过上一个步骤之后,通过http://localhost:15672 访问RabbitMQ的控制台管理工具RabbitMQ Management,这里的用户名/密码都是guest
登陆进去之后,就能看到这样一个页面
接下来,详细介绍页面各个菜单的功能。
在RabbitMQ Management提供的页面中,可以进行一些操作,首先点击上方导航栏的Queues,来对基本的队列进行操作。
点击红框内的Add a new queue展开添加队列的表单
作为初学者,只需要填写队列名称即可,其他使用默认值即可。填完名称之后,点击【Add queue】按钮。如图创建了一个test队列。
添加之后,刚刚添加的test会显示在上面的Queues-All queues的表格中。
然后点击表格中的队列名,这是一个链接,点击之后会进入队列的详情页面。
如图,这就是点击队列名之后进入的队列详情页面,图片中已经标记了页面各个地方的功能。
点击左边的箭头展开,因为现在还没有消费者,所以没有显示。
如图,在这个部分可以绑定一个交换机,这里的交换机和现实的交换机类似,功能都是转发,只不过这里是转发消息到绑定的队列。
上面的图片中,可以输入交换机名称绑定,但是因为现在还没有创建交换机,所以暂时不做操作。
在这个部分,可以发布消息,第一个下拉框选择转发的消息类型,是否持久化,Headers和Properties可以点击右边的?查看具体的说明,在Payload后面的textarea中输入要发送的消息内容,最后点击Publish message。
如图,博主在上面的界面发送了一条Hello world!消息,点击Get Message(s)就会查询到发送的消息的详情,显示在下方。
在这个部分,可以把消息移动到指定的队列,输入目标队列名称,点击Move messages。
比如,我们创建一个队列hello,把test队列的消息移动到hello队列里。
移动消息
如图所示,消息已经从test队列移动到了hello队列
原来的test队列已经没有消息了
这个功能非常简单,点击删除按钮就可以删除当前的test队列了。
这个操作会清空队列中所有消息。
为了测试这个功能,在test队列中发布了一条消息hello
然后清空一下消息
刷新页面,然后再查询一下,发现当前队列已经没有消息了
点击上方导航栏的Exchanges,对交换机进行操作。
输入交换机名称,点击Add exchange
如图,添加了一个交换机test_exchange
点击表格中的test_exchange进到交换机详情界面
接下来也介绍一下这个页面的功能。
如图,当前交换机又可以绑定其他交换机或者队列,由此可见,这里的交换机和网络设备的交换机功能一样,queue相当于网络中的计算机,多台计算机通过交换机exchange连接形成一个网络。
同样的,这里的queue通过exchange连接起来,形成一个复杂queue的网络架构。
同样的,也可以绑定路由键,这不就是对应网络设备的路由器吗。
总结,消息队列采用了现实生活中的网络模型,多个queue通过exchange和route相互连接,形成一个复杂的队列结构。
这里发布消息和发布消息到队列中是一样的
如图,在交换机test_exchange上绑定了队列test,并发送了一条消息,test队列中成功获取到了这条消息。
简单的删除功能,不介绍了。
springboot整合RabbitMQ
在idea中创建一个maven项目,命名为rabbitmq-maven。
在pom.xml中添加RabbitMQ的依赖
- <dependencies>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>3.0.8</version>
- </dependency>
- </dependencies>
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- public class RabbitExample {
-
- private static Connection getConn() throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
-
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setVirtualHost("/");
- factory.setHost("localhost");
- factory.setPort(5672);
-
- return factory.newConnection();
- }
-
- public static void main(String[] args) {
- testGet();
- }
-
- static void test() {
- String queue = "test_queue";
- String exchange = "test_exchange";
-
- try (Connection conn = getConn(); Channel channel = conn.createChannel()) {
- // 创建队列
- AMQP.Queue.DeclareOk testQueue = channel.queueDeclare(queue, true, false, false, null);
- System.out.println(testQueue.getQueue());
-
- // 队列和交换机绑定
- channel.queueBind(queue, exchange, "");
-
- // 删除队列
- channel.queueDelete(queue);
-
- // 清空队列
- channel.queuePurge(queue);
-
- // 创建交换机
- channel.exchangeDeclare(exchange, "direct");
-
- // 取消交换机和队列的绑定关系
- channel.queueUnbind(queue, exchange, "");
-
- // 发布消息
- channel.basicPublish(exchange, "", null, "send message from exchange test_exchange.".getBytes());
-
- // 交换机和交换机/队列绑定
- channel.exchangeBind(queue, exchange, "");
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- }
-
- static void testGet() {
- Connection conn = null;
- Channel channel = null;
-
- try {
- conn = getConn();
- channel = conn.createChannel();
-
- // 获取队列信息
- AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("test");
-
- // 队列名称
- String queue = declareOk.getQueue();
- // 消息数量
- int messageCount = declareOk.getMessageCount();
- // 消费者数量
- int consumerCount = declareOk.getConsumerCount();
-
- System.out.println(queue);
- System.out.println(messageCount);
- System.out.println(consumerCount);
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- } finally {
- try {
- if (channel != null) {
- channel.close();
- }
- if (conn != null) {
- conn.close();
- }
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
-
- }
创建一个springboot项目,命名为rabbitmq-springboot
添加整合了RabbitMQ的starter依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
完整的pom.xml如下
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.6</version>
- <relativePath/>
- </parent>
-
- <groupId>com.example</groupId>
- <artifactId>rabbitmq</artifactId>
- <version>0.0.1-SNAPSHOT</version>
-
- <properties>
- <java.version>1.8</java.version>
- <lombok.version>1.18.22</lombok.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>${lombok.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
修改application.yml,添加以下内容。
spring: application: name: rabbitmq rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / server: port: 8088 logging: level: com.example.rabbitmq: debug
项目根目录下创建config包,在config包下创建一个类RabbitMQConfig.java
- package com.example.rabbitmq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- @Configuration
- public class RabbitMQConfig {
- // 声明需要使用的交换机/路由Key/队列的名称
- public static final String DEFAULT_EXCHANGE = "exchange";
- public static final String DEFAULT_ROUTE = "route";
- public static final String DEFAULT_QUEUE = "queue";
-
- // 声明交换机,需要几个声明几个,这里就一个
- @Bean
- public DirectExchange exchange(){
- return new DirectExchange(DEFAULT_EXCHANGE);
- }
-
- // 声明队列,需要几个声明几个,这里就一个
- @Bean
- public Queue queue(){
- return new Queue(DEFAULT_QUEUE);
- }
-
- // 声明路由Key(交换机和队列的关系),需要几个声明几个,这里就一个
- @Bean
- public Binding binding(){
- return BindingBuilder.bind(queue()).to(exchange())
- .with(DEFAULT_ROUTE);
- }
-
- }
项目根目录下创建producer包,在producer包下创建一个类RabbitProducer.java
- package com.example.rabbitmq.producer;
-
- import com.example.rabbitmq.config.RabbitMQConfig;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- @Component
- public class RabbitProducer {
- private final RabbitTemplate rabbitTemplate;
-
- @Autowired
- public RabbitProducer(RabbitTemplate rabbitTemplate) {
- this.rabbitTemplate = rabbitTemplate;
- }
-
- /**
- * 发送消息
- * @param message 消息内容
- */
- public void sendMessage(Object message) {
- rabbitTemplate.convertAndSend(RabbitMQConfig.DEFAULT_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTE, message);
- }
-
- }
项目根目录下创建consumer包,在producer包下创建一个类RabbitConsumer.java
- package com.example.rabbitmq.consumer;
-
- import com.example.rabbitmq.config.RabbitMQConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- @Slf4j
- @Component
- @RabbitListener(queues = RabbitMQConfig.DEFAULT_QUEUE)
- public class RabbitConsumer {
-
- @RabbitHandler
- public void receive(Object message) {
- log.debug("收到一条消息:{}", message);
- }
-
- }
项目根目录下创建controller包,在controller包下创建一个类MessageController.java,把我们刚刚创建的消息生产者依赖进来。
- package com.example.rabbitmq.controller;
-
- import com.example.rabbitmq.producer.RabbitProducer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * @author heyunlin
- * @version 1.0
- */
- @RestController
- @RequestMapping(path = "/message", produces = "application/json;charset=utf-8")
- public class MessageController {
- private final RabbitProducer producer;
-
- @Autowired
- public MessageController(RabbitProducer producer) {
- this.producer = producer;
- }
-
- @RequestMapping(value = "/send", method = RequestMethod.POST)
- public void sendMessage(Object message) {
- producer.sendMessage(message);
- }
-
- }
这里可以使用多种方式测试,为了方便起见,使用postman
一测试居然报错了,查看后台报错信息,说是简单的消息转换器只支持String类型和字节数组的消息
我们回到上面,把消息类型的参数都改成String的,改完再测试一下
这一次控制台打印出来了收到消息的日志,至此,springboot整合RabbitMQ完美完成。
文章涉及的代码已同步到gitee,可按需下载
https://gitee.com/he-yunlin/rabbitmq.githttps://gitee.com/he-yunlin/rabbitmq.git
好了,文章就分享到到这里了,创作不易,如果觉得这篇文章对你有所帮助,不要忘了动动你发财的拇指点赞收藏一下哦~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。