
Spring Boot: Configuring Dual RabbitMQ and Dual Kafka


Preface

I've recently been working on a data-middle-platform style project that connects to a lot of middleware, including two RabbitMQ brokers and two Kafka clusters, so I'm jotting down how that's wired up.

RabbitMQ

First, the application configuration. The properties are split into two groups, push and receive, each with its own host, port, username, password, and vhost. A minimal sketch of what that looks like (the values are placeholders; the property names follow the @Value expressions in the class below):
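spring:
  rabbitmq:
    active: true            # gates RabbitConfiguration via @ConditionalOnProperty
    push:
      host: 192.168.0.1     # placeholder values
      port: 5672
      username: push_user
      password: push_pass
      virtual-host: /push
    receive:
      host: 192.168.0.2
      port: 5672
      username: receive_user
      password: receive_pass
      virtual-host: /receive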
Then the configuration class:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq", value = "active", havingValue = "true")
public class RabbitConfiguration {

    // Maximum number of resend attempts made by the publisher-confirm callback
    public static final int MAX_RETRY = 3;

    @Bean(name="pushConnectionFactory")
    @Primary
    public ConnectionFactory pushConnectionFactory(
            @Value("${spring.rabbitmq.push.host}") String host,
            @Value("${spring.rabbitmq.push.port}") int port,
            @Value("${spring.rabbitmq.push.username}") String username,
            @Value("${spring.rabbitmq.push.password}") String password,
            @Value("${spring.rabbitmq.push.virtual-host}") String vHost
    ){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        return connectionFactory;
    }

    @Bean(name="receiveConnectionFactory")
    public ConnectionFactory receiveConnectionFactory(
            @Value("${spring.rabbitmq.receive.host}") String host,
            @Value("${spring.rabbitmq.receive.port}") int port,
            @Value("${spring.rabbitmq.receive.username}") String username,
            @Value("${spring.rabbitmq.receive.password}") String password,
            @Value("${spring.rabbitmq.receive.virtual-host}") String vHost
    ){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        return connectionFactory;
    }

    @Bean(name="pushRabbitTemplate")
    @Primary
    public RabbitTemplate mergeRabbitTemplate(
            @Qualifier("pushConnectionFactory") ConnectionFactory connectionFactory
    ){
        return rabbitTemplate(connectionFactory);
    }

    @Bean(name="receiveRabbitTemplate")
    public RabbitTemplate receiveRabbitTemplate(
            @Qualifier("receiveConnectionFactory") ConnectionFactory connectionFactory
    ){
        return rabbitTemplate(connectionFactory);
    }

    @Bean(name="pushFactory")
    public SimpleRabbitListenerContainerFactory mergeFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("pushConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name="receiveFactory")
    public SimpleRabbitListenerContainerFactory receiveFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("receiveConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        configurer.configure(factory, connectionFactory);
        return factory;
    }



    /**
     * Shared template setup: JSON converter, mandatory returns, and a confirm
     * callback that resends unacknowledged messages up to MAX_RETRY times.
     */
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("RabbitMQ Message-[{}] Send {}. {}", correlationData != null ? correlationData.getId() : "undefined", ack ? "SUCCESS" : "FAILURE", cause != null ? cause : "");
            if (!ack && correlationData instanceof RabbitMessageCorrelation) {
                RabbitMessageCorrelation<?> rabbitMessageCorrelation = (RabbitMessageCorrelation<?>) correlationData;
                log.warn("Message-[{}] ACK Failure Round-{}. Because of {}", correlationData.getId(), rabbitMessageCorrelation.getRetryNumber() + 1, cause);
                if (rabbitMessageCorrelation.getRetryNumber() < MAX_RETRY) {
                    Object message = ((RabbitMessageCorrelation<?>) correlationData).getMessage();
                    RabbitMessageCorrelation<?> correlation = new RabbitMessageCorrelation<>(rabbitMessageCorrelation);
                    rabbitTemplate.convertAndSend(rabbitMessageCorrelation.getExchange(), rabbitMessageCorrelation.getRoutingKey(), message, correlation);
                    log.info("Resend Message-[{}] to exchange: {}, routingKey: {}", rabbitMessageCorrelation.getId(), rabbitMessageCorrelation.getExchange(), rabbitMessageCorrelation.getRoutingKey());
                } else {
                    log.error("Message-[{}] failed to send. exchange: {}, routingKey: {}", rabbitMessageCorrelation.getId(), rabbitMessageCorrelation.getExchange(), rabbitMessageCorrelation.getRoutingKey());
                }
            }
        });
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            // Fires when a message cannot be routed to any queue (mandatory = true)
            Message message = returnedMessage.getMessage();
            String content = new String(message.getBody());
            log.error("Message-[{}] Returned. Exchange: {}, RoutingKey: {}, replyCode: {}, replyText: {}.", content, returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(), returnedMessage.getReplyText());
        });
        return rabbitTemplate;
    }

    // JSON converter shared by both templates
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
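The RabbitMessageCorrelation used by the confirm callback is a custom CorrelationData subclass that isn't shown above. From the way it is used, a minimal sketch might look like this (the field and constructor details are assumptions inferred from the callback code):

import java.util.UUID;

import org.springframework.amqp.rabbit.connection.CorrelationData;

// Sketch of a CorrelationData that carries the payload, its destination,
// and a retry counter, so a failed confirm can trigger a resend.
public class RabbitMessageCorrelation<T> extends CorrelationData {

    private final T message;
    private final String exchange;
    private final String routingKey;
    private final int retryNumber;

    public RabbitMessageCorrelation(T message, String exchange, String routingKey) {
        super(UUID.randomUUID().toString());
        this.message = message;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.retryNumber = 0;
    }

    // Copy constructor used on resend: keeps the message id, bumps the retry counter
    public RabbitMessageCorrelation(RabbitMessageCorrelation<T> other) {
        super(other.getId());
        this.message = other.message;
        this.exchange = other.exchange;
        this.routingKey = other.routingKey;
        this.retryNumber = other.retryNumber + 1;
    }

    public T getMessage() { return message; }
    public String getExchange() { return exchange; }
    public String getRoutingKey() { return routingKey; }
    public int getRetryNumber() { return retryNumber; }
}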

When you need a specific broker, just inject the bean with @Qualifier, or point @RabbitListener at the matching container factory. A minimal usage sketch, with made-up queue, exchange, and routing-key names:
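@Slf4j
@Component
public class OrderMessaging {

    private final RabbitTemplate pushRabbitTemplate;

    public OrderMessaging(@Qualifier("pushRabbitTemplate") RabbitTemplate pushRabbitTemplate) {
        this.pushRabbitTemplate = pushRabbitTemplate;
    }

    // Publishes through the "push" broker
    public void send(Object payload) {
        pushRabbitTemplate.convertAndSend("demo.exchange", "demo.key", payload);
    }

    // Consumes from the "receive" broker via its container factory
    @RabbitListener(queues = "demo.queue", containerFactory = "receiveFactory")
    public void onMessage(String body) {
        log.info("received: {}", body);
    }
}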

Kafka

Dual Kafka is much simpler. When one cluster is only produced to and the other only consumed from, plain configuration is enough: the producer.bootstrap-servers and consumer.bootstrap-servers entries each override the top-level bootstrap-servers for their own side.

spring:
  kafka:
    active: true
    # Default cluster address (overridden per side below)
    bootstrap-servers: ${KAFKA_SERVERS}
    listener:
      # Listener type: single (one record per call, the default) or batch
      type: single
      concurrency: 1
      missing-topics-fatal: false
    producer:
      # Producer-side cluster; overrides the top-level bootstrap-servers
      bootstrap-servers: ${KAFKA_PRODUCE_SERVERS}
      # Client ID
      client-id: ${KAFKA_CLIENT_ID}
      # Number of retries
      retries: 3
      # Acknowledgement level:
      # acks=0   considered sent once handed to the client
      # acks=1   considered sent once the leader partition has written it to disk
      # acks=all considered sent once the leader's follower replicas have synced it
      acks: all
      # Maximum batch size, in bytes
      batch-size: 4096
      # Total memory (bytes) the producer may use to buffer records awaiting send
      buffer-memory: 33554432
      # Key serializer class
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none (default), lz4, gzip, snappy
      compression-type: gzip
    consumer:
      # Consumer-side cluster; overrides the top-level bootstrap-servers
      bootstrap-servers: ${KAFKA_CONSUME_SERVERS}
      # Default consumer group
      group-id: ${KAFKA_GROUP_ID}
      # Auto-commit offsets (default true)
      enable-auto-commit: true
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum records per poll
      max-poll-records: 100
      # Key deserializer class
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer class
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Where to start when there is no committed offset, or it is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest:   reset to the newest offset (only consume newly produced records)
      # none:     throw an exception if any partition has no committed offset
      auto-offset-reset: latest
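With that in place, the auto-configured KafkaTemplate sends to the producer cluster while @KafkaListener methods poll the consumer cluster, with no extra Java code. A minimal sketch (the topic name is made up):

@Slf4j
@Component
public class KafkaDemo {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaDemo(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Goes to KAFKA_PRODUCE_SERVERS (spring.kafka.producer.bootstrap-servers)
    public void send(String message) {
        kafkaTemplate.send("demo-topic", message);
    }

    // Polls KAFKA_CONSUME_SERVERS (spring.kafka.consumer.bootstrap-servers)
    @KafkaListener(topics = "demo-topic")
    public void onMessage(String message) {
        log.info("received: {}", message);
    }
}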