赞
踩
- @Configuration
- @Slf4j
- public class DynamicQueueExchangeConfig implements ApplicationContextAware {
-
- private ApplicationContext applicationContext;
- private static Map<String, String> msgCodeQueue = null;
-
- @Value("${operate.rabbitmq.commonExchange}")
- private String commonExchange;
-
-
- @PostConstruct
- public void init() {
- msgCodeQueue = MsgCodeEnum.allMsgCodeSet.stream().collect(Collectors.toMap(k -> k, k -> String.format("%s%s", MQ_MSG_CODE, k)));
- log.info("DynamicQueueExchangeConfig 所有的队列 {}", msgCodeQueue);
- //todo
- createQueue();
- createExchange();
- createBinding();
- }
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
-
- // 这个方法在QueueEnum
- public static String queuesNames(String msgCode) throws Throwable {
- return Optional.ofNullable(msgCodeQueue.get(msgCode)).orElseThrow(Throwable::new);
- }
-
-
- public void createQueue() {
- // 遍历队列 将队列注册到spring bean工厂 让spring实现队列的管理
- msgCodeQueue.forEach((key, value) -> {
- this.registerBean(key, QueueBuilder.durable(value).build());
- });
- }
-
- /**
- * @description 动态创建交换机
- */
- public void createExchange() {
- this.registerBean(commonExchange, ExchangeBuilder.directExchange(commonExchange).durable().build());
- }
-
- /**
- * @description 动态将交换机及队列绑定
- */
- public void createBinding() {
- // 遍历队列枚举 将队列绑定到指定交换机
- DirectExchange directExchange = applicationContext.getBean(commonExchange, DirectExchange.class);
- msgCodeQueue.keySet().forEach(queueEnum -> {
- Queue queue = applicationContext.getBean(queueEnum, Queue.class);
- // 声明绑定关系
- Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queue.getName());
- // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
- this.registerBean(queue.getName() + "_binding", binding);
- }
- );
- }
-
- public <T> void registerBean(String beanName, T bean) {
- ConfigurableApplicationContext context = (ConfigurableApplicationContext) applicationContext;
- context.getBeanFactory().registerSingleton(beanName, bean);
- }
- }
动态指定队列
- #{T(com.xxxx.operate.task.config.rabbitmq.config.DynamicQueueExchangeConfig).queuesNames('REMOTE_AUTHORIZE_COMMON')}
-
- T(com.xxxx.operate.task.config.rabbitmq.config.DynamicQueueExchangeConfig) 是 SpEL 的类型引用,表示要调用其静态方法的类(需写全限定类名)
- queuesNames 表示该类中被调用的静态方法,参数为消息编码,返回对应的队列名
- @RabbitHandler
- @RabbitListener(containerFactory = "operateRabbitSimpleFactory",
- queues = "#{T(com.xxxx.operate.task.config.rabbitmq.config.DynamicQueueExchangeConfig).queuesNames('REMOTE_AUTHORIZE_COMMON')}")
- public void receivePosAuth(Message message, Channel channel, @Headers Map<String, Object> headMap) {
- processMqData("REMOTE_AUTHORIZE_COMMON", message, channel, headMap);
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。