赞
踩
目录
Direct Exchange(直连交换机):
Fanout Exchange(扇出交换机):
Topic Exchange(主题交换机):
Headers Exchange(标头交换机):
如果满足key的前提下,绑定同一个交换机的队列都会分配到相同数量的信息
比如此时交换机有20条信息,a,b队列都会分配到20条信息
默认情况下,会轮询分配给消费者,也可以设置最多获取多少条未被消费的信息,根据消费者的消费能力来设置
- package com.example.config;
-
-
- import lombok.Getter;
-
- @Getter
- public enum RabbitmqBind {
-
-
- DATA_CLEAN_PROCESS_DEAD(
- RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
- RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
- RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
- false,
- false,
- null,
- null
- ),
-
- DATA_CLEAN_PROCESS(
- RabbitMqExchangeEnum.E_DIRECT_RCP,
- RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
- RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
- true,
- true,
- RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
- RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),
-
- SMS_CLEAN_DEAD(
- RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
- RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
- RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
- true,
- false,
- null,
- null
- ),
-
- SMS_CLEAN(
- RabbitMqExchangeEnum.E_TOPIC_RCP,
- RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
- RabbitmqRoutingKey.K_API_TO_DCN_SMS,
- true,
- true,
- RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
- RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
- ),
-
-
- ;
-
- /**
- * 交换机
- */
- private RabbitMqExchangeEnum exchange;
-
- /**
- * 队列名称
- */
- private String queueName;
-
- /**
- * 路由Key
- */
- private RabbitmqRoutingKey routingKey;
-
- /**
- * 绑定标识
- * 是否启用
- */
- private Boolean isBind;
-
- /**
- * 是否绑定死信
- */
- private Boolean isDeathBelief;
-
- /**
- * 绑定的死信交换机
- */
- private RabbitMqExchangeEnum boundDeadExchange;
-
- /**
- * 死信key
- */
- private RabbitmqRoutingKey deadRoutingKey;
-
-
- RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
- Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
- ) {
- this.exchange = exchange;
- this.queueName = queueName;
- this.routingKey = routingKey;
- this.isBind = isBind;
- this.isDeathBelief = isDeathBelief;
- this.boundDeadExchange = boundDeadExchange;
- this.deadRoutingKey = deadRoutingKey;
- }
-
- /**
- * 交换机
- */
- @Getter
- public enum RabbitMqExchangeEnum {
-
- /**
- * 交换机定义,类型 - 名称
- */
- E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
- DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),
-
- E_TOPIC_RCP("topic", "E_TOPIC_RCP"),
-
- E_TOPIC_PAY("topic", "E_TOPIC_PAY");
-
- private String exchangeType;
-
- private String exchangeName;
-
- RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
- this.exchangeType = exchangeType;
- this.exchangeName = exchangeName;
- }
- }
-
- /**
- * 队列名定义
- */
- public interface RabbitMqQueueConstants {
-
- /**
- * 接收清洗数据
- */
- String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";
-
- /**
- * 清洗结束通知
- */
- String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";
-
- /**
- * 死信队列
- */
- String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";
-
- /**
- * 清洗结束通知死信队列
- */
- String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
- }
-
- /**
- * routingKey
- */
- @Getter
- public enum RabbitmqRoutingKey {
-
- /**
- * 路由
- */
- K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
- K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),
-
- // 路由绑定死信路由
- DEAD("DEAD"),
-
- //死信路由
- K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
- K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
- ;
-
- private String keyName;
-
- RabbitmqRoutingKey(String keyName) {
- this.keyName = keyName;
- }
- }
-
- }
package com.example.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Arrays; @Configuration @ConditionalOnClass(EnableRabbit.class) public class MqConfig { @Resource protected RabbitTemplate rabbitTemplate; @Resource ConnectionFactory connectionFactory; // // @Lazy // @Autowired // protected RabbitAdmin rabbitAdmin; // // // public static final int DEFAULT_CONCURRENT = 10; // // @Bean("customContainerFactory") // public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, // ConnectionFactory connectionFactory) { // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // factory.setConcurrentConsumers(DEFAULT_CONCURRENT); // factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT); // configurer.configure(factory, connectionFactory); // return factory; // } // // @Bean // @ConditionalOnMissingBean // public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { // return new RabbitTransactionManager(connectionFactory); // } // // @Bean // @ConditionalOnMissingBean // public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { // return new RabbitAdmin(connectionFactory); // } @PostConstruct protected void init() { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitTemplate.setChannelTransacted(true); //创建exchange Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values()) .forEach(rabbitMqExchangeEnum -> { Exchange exchange = RabbitmqExchange .getInstanceByType(rabbitMqExchangeEnum.getExchangeType()) .createExchange(rabbitMqExchangeEnum.getExchangeName()); rabbitAdmin.declareExchange(exchange); } ); //创建队列并绑定exchange Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> { if (RabbitmqBind.getIsBind()) { if (RabbitmqBind.getIsDeathBelief()) { //需要绑定死信交换机的队列 rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName()) .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName()) .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build()); rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(), Binding.DestinationType.QUEUE, RabbitmqBind.getExchange().getExchangeName(), RabbitmqBind.getRoutingKey().getKeyName(), null)); } else { //不需要绑定死信交换机的队列 rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(), true, false, false, null)); rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(), Binding.DestinationType.QUEUE, RabbitmqBind.getExchange().getExchangeName(), RabbitmqBind.getRoutingKey().getKeyName(), null)); } } }); } }绑定的形式由枚举类中定义
- package com.example.config;
-
- import lombok.Getter;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Exchange;
- import org.springframework.amqp.core.TopicExchange;
-
- import java.util.Arrays;
-
-
- @Getter
- @Slf4j
- public enum RabbitmqExchange {
-
- DIRECT("direct"){
- @Override
- public Exchange createExchange(String exchangeName) {
- return new DirectExchange(exchangeName, true, false);
- }
- },
-
- TOPIC("topic"){
- @Override
- public Exchange createExchange(String exchangeName) {
- return new TopicExchange(exchangeName, true, false);
- }
- };
-
- public static RabbitmqExchange getInstanceByType(String type){
-
- return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))
- .findAny()
- .orElseThrow(() ->
- // new ProcessException("无效的exchange type")
-
- new RuntimeException("无效的exchange type")
- );
- }
-
- private String type;
-
-
- RabbitmqExchange(String type) {
- this.type = type;
- }
-
- public abstract Exchange createExchange(String exchangeName);
-
- }
package com.example.listener; import com.example.config.RabbitmqBind; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component @RabbitListener(queues = { RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5") //, containerFactory = "customContainerFactory" public class MqListener { @RabbitHandler public void processMessage(String message) { log.info("DataClean recive message :{} ", message); process(message); } @RabbitHandler public void processMessage(byte[] message) { String msg = new String(message); log.info("DataClean recive message :{} ", msg); process(msg); } /** * 处理推送消息 * @param message */ private void process(String message) { log.info("process message :{}" , message); if(StringUtils.isBlank(message)) { log.error("process message is blank , message:{}" , message); return; } } }监听并处理任务
- package com.example.listener;
-
- import com.example.config.RabbitmqBind;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- @RabbitListener(queues = {
- RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
- public class DeadListener {
-
- @RabbitHandler
- public void processMessage(String message) {
- log.info("DataClean recive message :{} ", message);
- process(message);
- }
-
- @RabbitHandler
- public void processMessage(byte[] message) {
- String msg = new String(message);
- log.info("DataClean recive message :{} ", msg);
- process(msg);
- }
-
-
- /**
- * 处理推送消息
- * @param message
- */
- private void process(String message) {
- log.info("Dead process message :{}" , message);
- if(StringUtils.isBlank(message)) {
- log.error("Dead process message is blank , message:{}" , message);
- return;
- }
- }
-
- }
把高峰期的消息填进低峰期
可以用拉取的方式来实现
或者用消费者的最大数量和最小数量来实现
channel.basicQos();//设置最大获取未确认消息的数量,实现权重
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。