当前位置:   article > 正文

SpringBoot集成RabbitMQ_spring-boot-starter-amqp 持久化

spring-boot-starter-amqp 持久化

消息队列 -- 流量削峰

MQ的六种工作模式:

1.简单模式

2.工作模式

3.发布订阅模式

4.路由模式

5.主题模式

6.RPC模式

RabbitMQ 的事务消息执行流程

第一步发送“半消息”
第二步执行本地事务
第三步对事务消息进行提交或回滚:

使用RabbitMQ

在RabbitMQ网站上查看Queues

 指定查看消息队列的条数

处理生产者发送消息,消费者在没处理完手里消息的情况继续接收消息

  1. autoAck = false ,手动确认模式,让服务器可以知道消费者有没有处理完消息
  2. Qos = 1,设置每次只收1条消息,处理完之前的不收下一条

Ready  未读的消息     Unacked  没有确认的消息  Total  服务器端的消息(如图)

持久化设置

      1.队列持久化(第二个参数设置为true)

        2.消息持久化 

 SrpingBoot集成RabbitMQ

一:配置pom包,主要是添加spring-boot-starter-amqp的支持

  1. <dependency>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

二:yml文件中配置rabbitMQ的配置信息:

  1. spring:
  2. application:
  3. name:
  4.   rabbitmq:
  5.     host: 127.0.0.1
  6.     username: guest
  7.     password: guest
  8.     port: 5672
  9. virtual-host: /ems

注解      //默认持久化   不自动删除

  1. @RabbitListener(queuesToDeclare = @Queue( value = “队列名”,durable= “true”[表示是否实现持久化],autoDelete=”true”))  --- 消费者上 写在serviceimpl 类上
  2. @RabbitHandler  代表从队列取出消息时的回调方法,写在方法上 配合上面注解一起使用 .
  3. 注入rabbitTemplate

查看交换机

Demo

 主题模式

路由模式 

订阅模式 

#表示1个或多个   *代表1个

MQ的应用场景:异步处理(串行方式,并行方式,消息队列)、应用解耦、流量削峰

RabbitMQ的集群

1.普通集群

核心解决问题:当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份。

主备架构--  主从复制集群

2.镜像集群

Dos

Rabbitmqctl cluster_status  --查看rabbitMQ集群状态

3.代码实现

消费者

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.stereotype.Component;
  4. import com.rabbitmq.client.Channel;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. @Component
  8. public class RabbitMqReceiveHandler {
  9. private static final Logger log = LoggerFactory.getLogger(RabbitMqReceiveHandler.class);
  10. private final LogEventSubscribeService logEventSubscribeServiceImpl;
  11. private final LogApiService logApiServiceImpl;
  12. private final LogEmailService logEmailServiceImpl;
  13. public RabbitMqReceiveHandler(LogEventSubscribeService logEventSubscribeServiceImpl, LogApiService logApiServiceImpl, LogEmailService logEmailServiceImpl){
  14. this.logEventSubscribeServiceImpl = logEventSubscribeServiceImpl;
  15. this.logApiServiceImpl = logApiServiceImpl;
  16. this.logEmailServiceImpl = logEmailServiceImpl;
  17. }
  18. //监听队列
  19. @RabbitListener(queues = {RabbitmqConfig.QUEUE_TESTEVENT})
  20. public void receive_lenovo(Message message, Channel channel){
  21. if(logEventSubscribeServiceImpl.handlingSubscribedMessages(message)){
  22. log.info("消息处理完成....");
  23. }
  24. }

RabbitMQ配置类 

  1. import cn.hutool.json.JSONUtil;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.*;
  6. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. @Configuration
  15. public class RabbitmqConfig {
  16. private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
  17. public static final String QUEUE_TESTEVENT = "testEvent";
  18. public static final String TESTEVENT_EXCHANGE="testEvent_Exchange";
  19. public static final String ROUTINGKEY_TESTEVENT="testEvent_routeKey";
  20. public static final boolean DURABLE= true; // 是否持久化
  21. public static final boolean EXCLUSIVE= false; //是否开启当前创建连接
  22. public static final boolean AUTO_DELETE= false; //是否自动删除
  23. @Autowired
  24. private ApolloConfig apolloConfig;
  25. @Value("${rabbit-env}")
  26. private String rabbitEnv;
  27. //声明队列
  28. @Bean(QUEUE_TESTEVENT)
  29. public Queue testEventQueue() {
  30. JSONObject rabitCfg = apolloConfig.getPropertyForObject(ApolloEnum.RABIT_CFG.getValue()
  31. , JSONObject.class);
  32. log.info("raw mq config:{}", rabitCfg.toJSONString());
  33. JSONObject configEnv = rabitCfg.getJSONObject(rabbitEnv);
  34. log.info("Online mq config:{}", configEnv.toJSONString());
  35. Map<String, Object> map = new HashMap<>(10);
  36. map.put("x-message-ttl", Integer.parseInt(configEnv.getString(ApolloEnum.TTL.getValue()))*1000);
  37. return new Queue(QUEUE_TESTEVENT, DURABLE, EXCLUSIVE, AUTO_DELETE, map);
  38. }
  39. //Direct交换机
  40. @Bean(TESTEVENT_EXCHANGE)
  41. public DirectExchange testEventExchange() {
  42. return new DirectExchange(TESTEVENT_EXCHANGE, DURABLE, AUTO_DELETE);
  43. }
  44. //绑定交换机
  45. @Bean
  46. public Binding bindingDirect(@Qualifier(QUEUE_TESTEVENT) Queue queue,
  47. @Qualifier(TESTEVENT_EXCHANGE) Exchange exchange) {
  48. return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_TESTEVENT).noargs();
  49. }
  50. @Bean
  51. public CachingConnectionFactory cachingConnectionFactory(){
  52. CachingConnectionFactory factory = new CachingConnectionFactory();
  53. log.info("开始获取rabbitmq连接信息");
  54. RabbitConfigVm rabbitConfig = this.getRabitCfgFromApollo();
  55. log.info("rabbitmq连接信息:{}", JSONUtil.toJsonStr(rabbitConfig));
  56. factory.setHost(rabbitConfig.getHost().trim());
  57. factory.setPort(rabbitConfig.getPort());
  58. factory.setUsername(rabbitConfig.getUsername().trim());
  59. factory.setPassword(rabbitConfig.getPassword().trim());
  60. factory.setVirtualHost("/");
  61. return factory;
  62. }
  63. private RabbitConfigVm getRabitCfgFromApollo(){
  64. JSONObject rabitCfg = apolloConfig.getPropertyForObject(ApolloEnum.RABIT_CFG.getValue()
  65. , JSONObject.class);
  66. log.info("raw rabbitmq连接信息:{}", rabbitCfg.toJSONString());
  67. RabbitConfigVm rabbitConfigVm = new RabbitConfigVm();
  68. log.info("****rabbitmq env:{}", rabbitEnv);
  69. JSONObject configEnv = rabitCfg.getJSONObject(rabbitEnv);
  70. rabbitConfigVm.setHost(configEnv.getString(ApolloEnum.HOST_NAME.getValue()));
  71. rabbitConfigVm.setPort(Integer.parseInt(configEnv.getString(ApolloEnum.PORT.getValue())));
  72. rabbitConfigVm.setUsername(configEnv.getString(ApolloEnum.USER_NAME.getValue()));
  73. rabbitConfigVm.setPassword(configEnv.getString(ApolloEnum.PASSWORD.getValue()));
  74. log.info("****apollo rabbit config**:{}", rabbitConfigVm.toString());
  75. return rabbitConfigVm;
  76. }

生产者

  1. @Component
  2. public class RabbitMQProducerUtils {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. // public RabbitMQProducerUtils(RabbitTemplate rabbitTemplate){
  6. // this.rabbitTemplate = rabbitTemplate;
  7. // }
  8. //向队列里抛消息
  9. public void sendMessage(String paramsStr){
  10. rabbitTemplate.convertAndSend(RabbitmqConfig.TESTEVENT_EXCHANGE, RabbitmqConfig.ROUTINGKEY_TESTEVENT, paramsStr);
  11. }
  12. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/728239
推荐阅读
相关标签
  

闽ICP备14008679号