当前位置:   article > 正文

消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

目录

0.交换机种类和区别

1.声明队列和交换机以及RountKey

2.初始化循环绑定

3.声明交换机

4.监听队列

4.1 监听普通队列

4.2监听死信队列

 5.削峰填谷的实现


0.交换机种类和区别

  1. Direct Exchange(直连交换机)

    • 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。
    • 当一个队列使用某个直连交换机绑定时,它需要指定一个绑定键(binding key),当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。
  2. Fanout Exchange(扇出交换机)

    • 扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。
    • 当一个队列使用扇出交换机绑定时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。
  3. Topic Exchange(主题交换机)

    • 主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。
    • 绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。
  4. Headers Exchange(标头交换机)

    • 标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。
    • 在将队列绑定到标头交换机时,可以指定一组标头键值对,只有当消息的标头中包含与绑定相匹配的所有键值对时,消息才会被路由到该队列。

如果满足key的前提下,绑定同一个交换机的队列都会分配到相同数量的信息

比如此时交换机有20条信息,a,b队列都会分配到20条信息

默认情况下,会轮询分配给消费者,也可以设置最多获取多少条未被消费的信息,根据消费者的消费能力来设置

1.声明队列和交换机以及RountKey

  1. package com.example.config;
  2. import lombok.Getter;
  3. @Getter
  4. public enum RabbitmqBind {
  5. DATA_CLEAN_PROCESS_DEAD(
  6. RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
  7. RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
  8. RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
  9. false,
  10. false,
  11. null,
  12. null
  13. ),
  14. DATA_CLEAN_PROCESS(
  15. RabbitMqExchangeEnum.E_DIRECT_RCP,
  16. RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
  17. RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
  18. true,
  19. true,
  20. RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
  21. RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),
  22. SMS_CLEAN_DEAD(
  23. RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
  24. RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
  25. RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
  26. true,
  27. false,
  28. null,
  29. null
  30. ),
  31. SMS_CLEAN(
  32. RabbitMqExchangeEnum.E_TOPIC_RCP,
  33. RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
  34. RabbitmqRoutingKey.K_API_TO_DCN_SMS,
  35. true,
  36. true,
  37. RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
  38. RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
  39. ),
  40. ;
  41. /**
  42. * 交换机
  43. */
  44. private RabbitMqExchangeEnum exchange;
  45. /**
  46. * 队列名称
  47. */
  48. private String queueName;
  49. /**
  50. * 路由Key
  51. */
  52. private RabbitmqRoutingKey routingKey;
  53. /**
  54. * 绑定标识
  55. * 是否启用
  56. */
  57. private Boolean isBind;
  58. /**
  59. * 是否绑定死信
  60. */
  61. private Boolean isDeathBelief;
  62. /**
  63. * 绑定的死信交换机
  64. */
  65. private RabbitMqExchangeEnum boundDeadExchange;
  66. /**
  67. * 死信key
  68. */
  69. private RabbitmqRoutingKey deadRoutingKey;
  70. RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
  71. Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
  72. ) {
  73. this.exchange = exchange;
  74. this.queueName = queueName;
  75. this.routingKey = routingKey;
  76. this.isBind = isBind;
  77. this.isDeathBelief = isDeathBelief;
  78. this.boundDeadExchange = boundDeadExchange;
  79. this.deadRoutingKey = deadRoutingKey;
  80. }
  81. /**
  82. * 交换机
  83. */
  84. @Getter
  85. public enum RabbitMqExchangeEnum {
  86. /**
  87. * 交换机定义,类型 - 名称
  88. */
  89. E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
  90. DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),
  91. E_TOPIC_RCP("topic", "E_TOPIC_RCP"),
  92. E_TOPIC_PAY("topic", "E_TOPIC_PAY");
  93. private String exchangeType;
  94. private String exchangeName;
  95. RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
  96. this.exchangeType = exchangeType;
  97. this.exchangeName = exchangeName;
  98. }
  99. }
  100. /**
  101. * 队列名定义
  102. */
  103. public interface RabbitMqQueueConstants {
  104. /**
  105. * 接收清洗数据
  106. */
  107. String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";
  108. /**
  109. * 清洗结束通知
  110. */
  111. String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";
  112. /**
  113. * 死信队列
  114. */
  115. String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";
  116. /**
  117. * 清洗结束通知死信队列
  118. */
  119. String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
  120. }
  121. /**
  122. * routingKey
  123. */
  124. @Getter
  125. public enum RabbitmqRoutingKey {
  126. /**
  127. * 路由
  128. */
  129. K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
  130. K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),
  131. // 路由绑定死信路由
  132. DEAD("DEAD"),
  133. //死信路由
  134. K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
  135. K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
  136. ;
  137. private String keyName;
  138. RabbitmqRoutingKey(String keyName) {
  139. this.keyName = keyName;
  140. }
  141. }
  142. }

2.初始化循环绑定

  1. package com.example.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.Exchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.QueueBuilder;
  6. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
  7. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  8. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  9. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11. import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  14. import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
  15. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  16. import org.springframework.context.annotation.Bean;
  17. import org.springframework.context.annotation.Configuration;
  18. import org.springframework.context.annotation.Lazy;
  19. import javax.annotation.PostConstruct;
  20. import javax.annotation.Resource;
  21. import java.util.Arrays;
  22. @Configuration
  23. @ConditionalOnClass(EnableRabbit.class)
  24. public class MqConfig {
  25. @Resource
  26. protected RabbitTemplate rabbitTemplate;
  27. @Resource
  28. ConnectionFactory connectionFactory;
  29. //
  30. // @Lazy
  31. // @Autowired
  32. // protected RabbitAdmin rabbitAdmin;
  33. //
  34. //
  35. // public static final int DEFAULT_CONCURRENT = 10;
  36. //
  37. // @Bean("customContainerFactory")
  38. // public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
  39. // ConnectionFactory connectionFactory) {
  40. // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  41. // factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
  42. // factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
  43. // configurer.configure(factory, connectionFactory);
  44. // return factory;
  45. // }
  46. //
  47. // @Bean
  48. // @ConditionalOnMissingBean
  49. // public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
  50. // return new RabbitTransactionManager(connectionFactory);
  51. // }
  52. //
  53. // @Bean
  54. // @ConditionalOnMissingBean
  55. // public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  56. // return new RabbitAdmin(connectionFactory);
  57. // }
  58. @PostConstruct
  59. protected void init() {
  60. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  61. rabbitTemplate.setChannelTransacted(true);
  62. //创建exchange
  63. Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
  64. .forEach(rabbitMqExchangeEnum -> {
  65. Exchange exchange = RabbitmqExchange
  66. .getInstanceByType(rabbitMqExchangeEnum.getExchangeType())
  67. .createExchange(rabbitMqExchangeEnum.getExchangeName());
  68. rabbitAdmin.declareExchange(exchange);
  69. }
  70. );
  71. //创建队列并绑定exchange
  72. Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {
  73. if (RabbitmqBind.getIsBind()) {
  74. if (RabbitmqBind.getIsDeathBelief()) {
  75. //需要绑定死信交换机的队列
  76. rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName())
  77. .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName())
  78. .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build());
  79. rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
  80. Binding.DestinationType.QUEUE,
  81. RabbitmqBind.getExchange().getExchangeName(),
  82. RabbitmqBind.getRoutingKey().getKeyName(), null));
  83. } else {
  84. //不需要绑定死信交换机的队列
  85. rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),
  86. true, false, false, null));
  87. rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
  88. Binding.DestinationType.QUEUE,
  89. RabbitmqBind.getExchange().getExchangeName(),
  90. RabbitmqBind.getRoutingKey().getKeyName(), null));
  91. }
  92. }
  93. });
  94. }
  95. }

 绑定的形式由枚举类中定义

3.声明交换机

  1. package com.example.config;
  2. import lombok.Getter;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Exchange;
  6. import org.springframework.amqp.core.TopicExchange;
  7. import java.util.Arrays;
  8. @Getter
  9. @Slf4j
  10. public enum RabbitmqExchange {
  11. DIRECT("direct"){
  12. @Override
  13. public Exchange createExchange(String exchangeName) {
  14. return new DirectExchange(exchangeName, true, false);
  15. }
  16. },
  17. TOPIC("topic"){
  18. @Override
  19. public Exchange createExchange(String exchangeName) {
  20. return new TopicExchange(exchangeName, true, false);
  21. }
  22. };
  23. public static RabbitmqExchange getInstanceByType(String type){
  24. return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))
  25. .findAny()
  26. .orElseThrow(() ->
  27. // new ProcessException("无效的exchange type")
  28. new RuntimeException("无效的exchange type")
  29. );
  30. }
  31. private String type;
  32. RabbitmqExchange(String type) {
  33. this.type = type;
  34. }
  35. public abstract Exchange createExchange(String exchangeName);
  36. }

4.监听队列

4.1 监听普通队列

  1. package com.example.listener;
  2. import com.example.config.RabbitmqBind;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang.StringUtils;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. @RabbitListener(queues = {
  11. RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")
  12. //, containerFactory = "customContainerFactory"
  13. public class MqListener {
  14. @RabbitHandler
  15. public void processMessage(String message) {
  16. log.info("DataClean recive message :{} ", message);
  17. process(message);
  18. }
  19. @RabbitHandler
  20. public void processMessage(byte[] message) {
  21. String msg = new String(message);
  22. log.info("DataClean recive message :{} ", msg);
  23. process(msg);
  24. }
  25. /**
  26. * 处理推送消息
  27. * @param message
  28. */
  29. private void process(String message) {
  30. log.info("process message :{}" , message);
  31. if(StringUtils.isBlank(message)) {
  32. log.error("process message is blank , message:{}" , message);
  33. return;
  34. }
  35. }
  36. }

 监听并处理任务

4.2监听死信队列

  1. package com.example.listener;
  2. import com.example.config.RabbitmqBind;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang.StringUtils;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. @RabbitListener(queues = {
  11. RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
  12. public class DeadListener {
  13. @RabbitHandler
  14. public void processMessage(String message) {
  15. log.info("DataClean recive message :{} ", message);
  16. process(message);
  17. }
  18. @RabbitHandler
  19. public void processMessage(byte[] message) {
  20. String msg = new String(message);
  21. log.info("DataClean recive message :{} ", msg);
  22. process(msg);
  23. }
  24. /**
  25. * 处理推送消息
  26. * @param message
  27. */
  28. private void process(String message) {
  29. log.info("Dead process message :{}" , message);
  30. if(StringUtils.isBlank(message)) {
  31. log.error("Dead process message is blank , message:{}" , message);
  32. return;
  33. }
  34. }
  35. }

 5.削峰填谷的实现

把高峰期的消息填进低峰期

可以用拉取的方式来实现

或者用消费者的最大数量和最小数量来实现

channel.basicQos();//设置最大获取未确认消息的数量,实现权重

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/436766
推荐阅读
相关标签
  

闽ICP备14008679号