赞
踩
RabbitMQ是一个消息中间件,常见的消息中间件还有ActiveMQ、RocketMQ、Kafka等、其中ActiveMQ算是比较老的技术了,RocketMQ为阿里团队研发的其特点和RabbitMQ差不多、而Kafka则侧重于吞吐量。消息中间件的主要功能,流量削峰、解耦合、异步通知。本文主要介绍RabbitMQ的安装、MQ工作原理的简单介绍和springboot项目整合MQ案例。
推荐使用docker安装。前提条件先安装docker,关于docker的讲解本文就不详细介绍,在"/"根目录下创建doker目录,再创建mq目录,结构如下图:
mq-start.sh启动脚本,data为挂载数据的目录,启动后自动生成。
mq-start.sh详情
- #!/bin/bash
- docker rm -f rabbitmq
- docker run -d \
- -v /docker/mq/data:/var/lib/rabbitmq \
- -p 5672:5672 -p 15672:15672 --name rabbitmq --restart=always \
- --hostname myRabbit rabbitmq:3-management
创建 mq-start.sh文件后使用 chmod +x mq-start.sh 增加可执行权限。
启动mq直接执行命令:"./mq-start.sh",docker会自动删除原来的镜像再拉去指定的镜像启动。
5672为RabbitMQ默认端口15672为网页控制台端口,都需要在云服务器控制台的安全组策略添加这两个端口使得外部可以访问。
启动成功后打开浏览器输入服务器地址:15672,弹出控制台登录页面则安装启动成功!
默认用户名:guest,默认密码:guest
登录成功后主页面如下,然后我们创建一个admin用户,角色设置为administrator,以后就通过admin用户进行操作。
至此RabbitMQ安装启动完毕!
创建成功后刷新maven依赖。
此外还需要的依赖如下:
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.9.5</version>
- </dependency>
项目结构如下:
config 配置,consumer 消息消费者,producer 消息生产方
看了我之前的文章的朋友都知道spring 提供了自动配置类,里面规定了该配什么或者一些默认值,如图mq的自动配置类为 RabbitAutoConfiguration
个人喜欢用yml配置,详情如下:
- spring:
- rabbitmq:
- host: 127.0.0.1 ---改为服务器地址
- port: 5672
- username: admin
- password: admin
- virtual-host: /
- package com.example.mq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class MQConfig {
- //将数据自动的转为json发送出去
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
- return rabbitTemplate;
- }
-
- /**
- * 申明一个FanoutExchange
- * 名字:hot_send_fanout_exchange
- * 开启持久化
- * 自动删除关闭
- *
- * @return
- */
- @Bean
- public FanoutExchange e1() {
- return new FanoutExchange("hot_send_fanout_exchange", true, false);
- }
-
- /**
- * 申明一个队列
- * 名字:q1
- *
- * @return
- */
- @Bean
- public Queue q1() {
- return new Queue("q1", true);
- }
-
- /**
- * 申明一个队列
- * 名字:q2
- *
- * @return
- */
- @Bean
- public Queue q2() {
- return new Queue("q2", true);
- }
-
- /**
- * 将q1 绑定到 hot_send_fanout_exchange
- *
- * @return
- */
- @Bean
- public Binding bin1() {
- return BindingBuilder.bind(q1()).to(e1());
- }
-
- /**
- * 将q2 绑定到 hot_send_fanout_exchange
- *
- * @return
- */
- @Bean
- public Binding bin2() {
- return BindingBuilder.bind(q2()).to(e1());
- }
-
- }
启动项目后能在mq控制台看到自己创建的交换机和消息队列就成功了,若失败请仔细检查配置文件地址和maven依赖是否正确结合控制台错误信息排查错误。
- @Component
- public class MsgProducer {
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- public void send(String exchangeName, String routingKey, Object msg) {
- rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
- }
- }
发送消息主要靠 RabbitTemplate的convertAndSend方法,这个方法很多重载但参数大多如下:String exchange:交换机名称
String routingKey:路由键
Object object:数据 上面配置了以json形式发送
- @Component
- @RabbitListener(queues = {"q1"})
- public class MsgReceive {
- @RabbitHandler
- public void getMsg(byte[] msg) {
- System.out.println(new String(msg));
- }
- }
只需要2个注解@RabbitListener和@RabbitHandler其中RabbitListener配置监听队列的名称,RabbitHandler标记方法为处理信息的方法。
- @SpringBootTest
- class DemoMqApplicationTests {
- @Autowired
- private MsgProducer msgProducer;
-
- class User {
- private String name;
- private int age;
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public int getAge() {
- return age;
- }
-
- public void setAge(int age) {
- this.age = age;
- }
- }
-
- @Test
- void contextLoads() {
- User user = new User();
- user.setAge(18);
- user.setName("zhangsan");
- msgProducer.send("hot_send_fanout_exchange", "", user);
- }
-
- }
就这样消息的发送和接收都完成了,入门案例就到此为止了。朋友们可以尝试定义其它类型的交换机比如topic exchange 来通过routing key 指定路由到特定的消息队列里。
关于RabbitMQ的高级用法如延时队列和死信队列实现分布式事务等在后续文章为大家分享,如果对本文有任何疑惑欢迎留言,大家一起学习、共同进步!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。