I've recently been working on a data-platform-style project that has to connect to a lot of middleware, including two RabbitMQ clusters and two Kafka clusters, so here are some quick notes.
Let's start with the configuration file.
The settings are split into two sections, push and receive, each with its own host, port, username, password, and vhost, as sketched below.
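The post does not include the file itself, so here is a minimal sketch of what it would look like, assuming property names that match the @Value placeholders in the configuration class below; the ${...} environment placeholders are illustrative, not from the original:

```yaml
spring:
  rabbitmq:
    active: true                        # gates RabbitConfiguration via @ConditionalOnProperty
    push:                               # cluster used for publishing
      host: ${RABBIT_PUSH_HOST}
      port: 5672
      username: ${RABBIT_PUSH_USER}
      password: ${RABBIT_PUSH_PASSWORD}
      virtual-host: /
    receive:                            # cluster used for consuming
      host: ${RABBIT_RECEIVE_HOST}
      port: 5672
      username: ${RABBIT_RECEIVE_USER}
      password: ${RABBIT_RECEIVE_PASSWORD}
      virtual-host: /
```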
Next, the configuration class:
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq", value = "active", havingValue = "true")
public class RabbitConfiguration {

    public static final int MAX_RETRY = 3;

    @Bean(name = "pushConnectionFactory")
    @Primary
    public ConnectionFactory pushConnectionFactory(
            @Value("${spring.rabbitmq.push.host}") String host,
            @Value("${spring.rabbitmq.push.port}") int port,
            @Value("${spring.rabbitmq.push.username}") String username,
            @Value("${spring.rabbitmq.push.password}") String password,
            @Value("${spring.rabbitmq.push.virtual-host}") String vHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        // Required so the confirm/returns callbacks configured below actually fire
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "receiveConnectionFactory")
    public ConnectionFactory receiveConnectionFactory(
            @Value("${spring.rabbitmq.receive.host}") String host,
            @Value("${spring.rabbitmq.receive.port}") int port,
            @Value("${spring.rabbitmq.receive.username}") String username,
            @Value("${spring.rabbitmq.receive.password}") String password,
            @Value("${spring.rabbitmq.receive.virtual-host}") String vHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "pushRabbitTemplate")
    @Primary
    public RabbitTemplate pushRabbitTemplate(
            @Qualifier("pushConnectionFactory") ConnectionFactory connectionFactory) {
        return rabbitTemplate(connectionFactory);
    }

    @Bean(name = "receiveRabbitTemplate")
    public RabbitTemplate receiveRabbitTemplate(
            @Qualifier("receiveConnectionFactory") ConnectionFactory connectionFactory) {
        return rabbitTemplate(connectionFactory);
    }

    @Bean(name = "pushFactory")
    public SimpleRabbitListenerContainerFactory pushFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("pushConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "receiveFactory")
    public SimpleRabbitListenerContainerFactory receiveFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("receiveConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    // Shared template setup. RabbitMessageCorrelation is the author's custom
    // CorrelationData subclass carrying exchange, routing key, payload and retry
    // count; its source is not shown in the post.
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        // Publisher confirm: on a negative ack, resend up to MAX_RETRY times
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("RabbitMQ Message-[{}] Send {}. {}",
                    correlationData != null ? correlationData.getId() : "undefined",
                    ack ? "SUCCESS" : "FAILURE",
                    cause != null ? cause : "");
            if (!ack && correlationData instanceof RabbitMessageCorrelation) {
                RabbitMessageCorrelation<?> rabbitMessageCorrelation = (RabbitMessageCorrelation<?>) correlationData;
                log.warn("Message-[{}] ACK Failure Round-{}. Because of {}",
                        correlationData.getId(), rabbitMessageCorrelation.getRetryNumber() + 1, cause);
                if (rabbitMessageCorrelation.getRetryNumber() < MAX_RETRY) {
                    Object message = rabbitMessageCorrelation.getMessage();
                    RabbitMessageCorrelation<?> correlation = new RabbitMessageCorrelation<>(rabbitMessageCorrelation);
                    rabbitTemplate.convertAndSend(rabbitMessageCorrelation.getExchange(),
                            rabbitMessageCorrelation.getRoutingKey(), message, correlation);
                    log.info("Resend Message-[{}] to exchange: {}, routingKey: {}",
                            rabbitMessageCorrelation.getId(), rabbitMessageCorrelation.getExchange(),
                            rabbitMessageCorrelation.getRoutingKey());
                } else {
                    log.error("Message-[{}] failed to send. exchange: {}, routingKey: {}",
                            rabbitMessageCorrelation.getId(), rabbitMessageCorrelation.getExchange(),
                            rabbitMessageCorrelation.getRoutingKey());
                }
            }
        });
        // Returned (unroutable) messages; requires mandatory = true above
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            Message message = returnedMessage.getMessage();
            String content = new String(message.getBody());
            log.error("Message-[{}] Callback, Exchange: {}, RoutingKey: {}, replyCode: {}, replyText: {}.",
                    content, returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
                    returnedMessage.getReplyCode(), returnedMessage.getReplyText());
        });
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
```
When you need a specific instance, just inject it with @Qualifier, and point listeners at the matching container factory.
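A minimal usage sketch; the exchange, routing key, and queue names (demo.exchange, demo.key, demo.queue) are hypothetical examples, not from the original post:

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class DemoMessagingService {

    private final RabbitTemplate pushRabbitTemplate;

    // Select the template bound to the "push" cluster by bean name
    public DemoMessagingService(@Qualifier("pushRabbitTemplate") RabbitTemplate pushRabbitTemplate) {
        this.pushRabbitTemplate = pushRabbitTemplate;
    }

    public void send(Object payload) {
        // Exchange and routing key here are illustrative
        pushRabbitTemplate.convertAndSend("demo.exchange", "demo.key", payload);
    }

    // The listener container is built from the "receive" cluster's factory
    @RabbitListener(queues = "demo.queue", containerFactory = "receiveFactory")
    public void onMessage(String body) {
        System.out.println("received: " + body);
    }
}
```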
Dual Kafka is much simpler: if one cluster is only consumed from and the other only produced to, the configuration file alone does the job.
```yaml
spring:
  kafka:
    active: true  # custom on/off switch, same pattern as spring.rabbitmq.active
    # Default cluster addresses (overridden per side below)
    bootstrap-servers: ${KAFKA_SERVERS}
    listener:
      # Consumption mode: single (one record at a time, the default) or batch
      type: single
      concurrency: 1
      missing-topics-fatal: false
    producer:
      # Producer talks to its own cluster, overriding the top-level bootstrap-servers
      bootstrap-servers: ${KAFKA_PRODUCE_SERVERS}
      # Client ID
      client-id: ${KAFKA_CLIENT_ID}
      # Number of retries
      retries: 3
      # Acknowledgement level:
      #   acks=0   the send counts as successful once the message is handed to Kafka
      #   acks=1   successful once the leader partition has written it to disk
      #   acks=all successful only after the leader's follower replicas have synced it
      acks: all
      # Maximum batch size, in bytes
      batch-size: 4096
      # Total memory (bytes) the producer may use to buffer records awaiting send
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy; default none
      compression-type: gzip
    consumer:
      # Consumer talks to its own cluster, overriding the top-level bootstrap-servers
      bootstrap-servers: ${KAFKA_CONSUME_SERVERS}
      # Default consumer group
      group-id: ${KAFKA_GROUP_ID}
      # Auto-commit offsets (default true)
      enable-auto-commit: true
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum number of records per poll
      max-poll-records: 100
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Offset reset policy when Kafka has no initial offset or it is out of range:
      #   earliest: reset to the smallest offset in the partition
      #   latest:   reset to the newest offset (consume only newly produced data)
      #   none:     throw an exception if any partition has no committed offset
      auto-offset-reset: latest
```
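Because producer.bootstrap-servers and consumer.bootstrap-servers each override the top-level spring.kafka.bootstrap-servers, the auto-configured KafkaTemplate talks to one cluster while the listener containers talk to the other, with no extra Java configuration. A minimal usage sketch; the topic name demo-topic is illustrative, not from the original post:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoKafkaService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Auto-configured template, wired to the producer cluster
    public DemoKafkaService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // Topic name is a hypothetical example
        kafkaTemplate.send("demo-topic", message);
    }

    // Listener container is auto-configured against the consumer cluster
    @KafkaListener(topics = "demo-topic")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}
```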