当前位置:   article > 正文

Spring整合RabbitMQ_spring rabbitmq

spring rabbitmq

一、RabbitMQ简介

        RabbitMQ是一个消息中间件,常见的消息中间件还有ActiveMQRocketMQKafka等、其中ActiveMQ算是比较老的技术了,RocketMQ为阿里团队研发的其特点和RabbitMQ差不多、而Kafka则侧重于吞吐量。消息中间件的主要功能,流量削峰、解耦合、异步通知。本文主要介绍RabbitMQ的安装、MQ工作原理的简单介绍和springboot项目整合MQ案例。

二、RabbitMQ安装

         推荐使用docker安装。前提条件先安装docker,关于docker的讲解本文就不详细介绍,在"/"根目录下创建doker目录,再创建mq目录,结构如下图:

mq-start.sh启动脚本,data为挂载数据的目录,启动后自动生成。 

mq-start.sh详情

  1. #!/bin/bash
  2. docker rm -f rabbitmq
  3. docker run -d \
  4. -v /docker/mq/data:/var/lib/rabbitmq \
  5. -p 5672:5672 -p 15672:15672 --name rabbitmq --restart=always \
  6. --hostname myRabbit rabbitmq:3-management

创建 mq-start.sh文件后使用 chmod +x mq-start.sh 增加可执行权限。

启动mq直接执行命令:"./mq-start.sh",docker会自动删除原来的镜像再拉去指定的镜像启动。

 5672为RabbitMQ默认端口15672为网页控制台端口,都需要在云服务器控制台的安全组策略添加这两个端口使得外部可以访问。

启动成功后打开浏览器输入服务器地址:15672,弹出控制台登录页面则安装启动成功!

默认用户名:guest,默认密码:guest

登录成功后主页面如下,然后我们创建一个admin用户,角色设置为administrator,以后就通过admin用户进行操作。

 至此RabbitMQ安装启动完毕!

三、SpringBoot整合RabbitMQ

1.创建项目填写信息和选择rabbitmq依赖

创建成功后刷新maven依赖。

 此外还需要的依赖如下:

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. <version>2.9.5</version>
  5. </dependency>

项目结构如下:

 config 配置,consumer 消息消费者,producer 消息生产方

 2.配置连接和操作对象

        看了我之前的文章的朋友都知道spring 提供了自动配置类,里面规定了该配什么或者一些默认值,如图mq的自动配置类为 RabbitAutoConfiguration

 个人喜欢用yml配置,详情如下:

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1 ---改为服务器地址
  4. port: 5672
  5. username: admin
  6. password: admin
  7. virtual-host: /

3.配置交换机和消息队列(注解方式)

  1. package com.example.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. @Configuration
  12. public class MQConfig {
  13. //将数据自动的转为json发送出去
  14. @Bean
  15. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  16. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  17. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  18. return rabbitTemplate;
  19. }
  20. /**
  21. * 申明一个FanoutExchange
  22. * 名字:hot_send_fanout_exchange
  23. * 开启持久化
  24. * 自动删除关闭
  25. *
  26. * @return
  27. */
  28. @Bean
  29. public FanoutExchange e1() {
  30. return new FanoutExchange("hot_send_fanout_exchange", true, false);
  31. }
  32. /**
  33. * 申明一个队列
  34. * 名字:q1
  35. *
  36. * @return
  37. */
  38. @Bean
  39. public Queue q1() {
  40. return new Queue("q1", true);
  41. }
  42. /**
  43. * 申明一个队列
  44. * 名字:q2
  45. *
  46. * @return
  47. */
  48. @Bean
  49. public Queue q2() {
  50. return new Queue("q2", true);
  51. }
  52. /**
  53. * 将q1 绑定到 hot_send_fanout_exchange
  54. *
  55. * @return
  56. */
  57. @Bean
  58. public Binding bin1() {
  59. return BindingBuilder.bind(q1()).to(e1());
  60. }
  61. /**
  62. * 将q2 绑定到 hot_send_fanout_exchange
  63. *
  64. * @return
  65. */
  66. @Bean
  67. public Binding bin2() {
  68. return BindingBuilder.bind(q2()).to(e1());
  69. }
  70. }

        启动项目后能在mq控制台看到自己创建的交换机和消息队列就成功了,若失败请仔细检查配置文件地址和maven依赖是否正确结合控制台错误信息排查错误。 

4.发送消息

  1. @Component
  2. public class MsgProducer {
  3. @Resource
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(String exchangeName, String routingKey, Object msg) {
  6. rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
  7. }
  8. }

发送消息主要靠 RabbitTemplateconvertAndSend方法,这个方法很多重载但参数大多如下:String exchange:交换机名称
String routingKey:路由键
Object object:数据 上面配置了以json形式发送

4.接收消息

  1. @Component
  2. @RabbitListener(queues = {"q1"})
  3. public class MsgReceive {
  4. @RabbitHandler
  5. public void getMsg(byte[] msg) {
  6. System.out.println(new String(msg));
  7. }
  8. }

只需要2个注解@RabbitListener@RabbitHandler其中RabbitListener配置监听队列的名称,RabbitHandler标记方法为处理信息的方法。

6.测试

  1. @SpringBootTest
  2. class DemoMqApplicationTests {
  3. @Autowired
  4. private MsgProducer msgProducer;
  5. class User {
  6. private String name;
  7. private int age;
  8. public void setName(String name) {
  9. this.name = name;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public int getAge() {
  15. return age;
  16. }
  17. public void setAge(int age) {
  18. this.age = age;
  19. }
  20. }
  21. @Test
  22. void contextLoads() {
  23. User user = new User();
  24. user.setAge(18);
  25. user.setName("zhangsan");
  26. msgProducer.send("hot_send_fanout_exchange", "", user);
  27. }
  28. }

就这样消息的发送和接收都完成了,入门案例就到此为止了。朋友们可以尝试定义其它类型的交换机比如topic exchange 来通过routing key 指定路由到特定的消息队列里。

关于RabbitMQ的高级用法如延时队列和死信队列实现分布式事务等在后续文章为大家分享,如果对本文有任何疑惑欢迎留言,大家一起学习、共同进步!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/933779
推荐阅读
相关标签
  

闽ICP备14008679号