当前位置:   article > 正文

动态创建RabbitMQ队列和交换机_rabbitmq动态创建队列和交换机

rabbitmq动态创建队列和交换机
  1. @Configuration
  2. @Slf4j
  3. public class DynamicQueueExchangeConfig implements ApplicationContextAware {
  4. private ApplicationContext applicationContext;
  5. private static Map<String, String> msgCodeQueue = null;
  6. @Value("${operate.rabbitmq.commonExchange}")
  7. private String commonExchange;
  8. @PostConstruct
  9. public void init() {
  10. msgCodeQueue = MsgCodeEnum.allMsgCodeSet.stream().collect(Collectors.toMap(k -> k, k -> String.format("%s%s", MQ_MSG_CODE, k)));
  11. log.info("DynamicQueueExchangeConfig 所有的队列 {}", msgCodeQueue);
  12. //todo
  13. createQueue();
  14. createExchange();
  15. createBinding();
  16. }
  17. @Override
  18. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  19. this.applicationContext = applicationContext;
  20. }
  21. // 这个方法在QueueEnum
  22. public static String queuesNames(String msgCode) throws Throwable {
  23. return Optional.ofNullable(msgCodeQueue.get(msgCode)).orElseThrow(Throwable::new);
  24. }
  25. public void createQueue() {
  26. // 遍历队列 将队列注册到spring bean工厂 让spring实现队列的管理
  27. msgCodeQueue.forEach((key, value) -> {
  28. this.registerBean(key, QueueBuilder.durable(value).build());
  29. });
  30. }
  31. /**
  32. * @description 动态创建交换机
  33. */
  34. public void createExchange() {
  35. this.registerBean(commonExchange, ExchangeBuilder.directExchange(commonExchange).durable().build());
  36. }
  37. /**
  38. * @description 动态将交换机及队列绑定
  39. */
  40. public void createBinding() {
  41. // 遍历队列枚举 将队列绑定到指定交换机
  42. DirectExchange directExchange = applicationContext.getBean(commonExchange, DirectExchange.class);
  43. msgCodeQueue.keySet().forEach(queueEnum -> {
  44. Queue queue = applicationContext.getBean(queueEnum, Queue.class);
  45. // 声明绑定关系
  46. Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queue.getName());
  47. // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
  48. this.registerBean(queue.getName() + "_binding", binding);
  49. }
  50. );
  51. }
  52. public <T> void registerBean(String beanName, T bean) {
  53. ConfigurableApplicationContext context = (ConfigurableApplicationContext) applicationContext;
  54. context.getBeanFactory().registerSingleton(beanName, bean);
  55. }
  56. }

动态指定队列

  1. #{T(com.xxxx.operate.task.config.rabbitmq.config.DynamicQueueExchangeConfig).queuesNames('REMOTE_AUTHORIZE_COMMON')}
  2. T(com.xxxx.operate.task.config.rabbitmq.config.DynamicQueueExchangeConfig) 表示类(SpEL 类型引用,包名需与上面表达式中的保持一致)
  3. queuesNames 表示该类上被调用的静态方法:传入消息编码,返回对应的队列名
  1. @RabbitHandler
  2. @RabbitListener(containerFactory = "operateRabbitSimpleFactory",
  3. queues = "#{T(com.xxxx.operate.task.config.rabbitmq.config.DynamicQueueExchangeConfig).queuesNames('REMOTE_AUTHORIZE_COMMON')}")
  4. public void receivePosAuth(Message message, Channel channel, @Headers Map<String, Object> headMap) {
  5. processMqData("REMOTE_AUTHORIZE_COMMON", message, channel, headMap);
  6. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/688504
推荐阅读
相关标签
  

闽ICP备14008679号