
Spring Boot + Kafka: sending messages to a specified partition

Spring Boot version: 2.0.6

1. Configuration file

spring:
  kafka:
    bootstrap-servers: kafka-address   # your Kafka broker address(es), e.g. host1:9092,host2:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      buffer-memory: 40960
      batch-size: 4096
    consumer:
      group-id: gps_mileage_second
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
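As an alternative to registering the partitioner in Java configuration (step 2 below), the class can also be passed straight through the producer properties map. This is only a sketch: it assumes your Spring Boot version exposes the spring.kafka.producer.properties map, and the package name is a placeholder.

spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.example.kafka.VehiclePartitioner   # package name is an assumption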

2. Add the Java classes

import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * Custom partitioner: messages with the same key always go to the same partition.
 */
public class VehiclePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // No key: pick a random partition
        if (key == null) {
            Random rand = new Random();
            return rand.nextInt(numPartitions);
        }
        // With a key: map the key's hash code onto [0, numPartitions)
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
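To see what the hashing step does, here is a small standalone sketch (an illustration added here, not part of the original project) that applies the same Math.floorMod mapping to a few sample keys, assuming a topic with 3 partitions:

public class PartitionMappingDemo {

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, for illustration only
        String[] keys = {"vehicle-001", "vehicle-002", "vehicle-003"};
        for (String key : keys) {
            // Same mapping VehiclePartitioner applies to non-null keys
            int partition = Math.floorMod(key.hashCode(), numPartitions);
            System.out.println(key + " -> partition " + partition);
        }
    }
}

Because the mapping depends only on the key's hash code, every message with the same key lands on the same partition, which preserves per-key ordering.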

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * Number of retries when sending a message fails
     */
    @Value("${spring.kafka.producer.retries}")
    private int retries;

    /**
     * Batch size for sending messages
     */
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;

    /**
     * Producer buffer memory
     */
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    /**
     * Consumer configuration
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        logger.info("Kafka consumer configuration");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Producer configuration, including the custom partitioner
     */
    public Map<String, Object> producerConfigs() {
        logger.info("Kafka producer configuration");
        Map<String, Object> props = new HashMap<>(8);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VehiclePartitioner.class);
        return props;
    }

    /**
     * Producer factory
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * KafkaTemplate bean, overriding the auto-configured default so the custom partitioner is used
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
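The class above builds consumerConfigs() but never wires it into a listener container. If you also want to consume with @KafkaListener, the missing beans could look like the following sketch, added inside KafkaConfiguration (the bean names and the concurrency value of 3 are assumptions; required imports: org.springframework.kafka.core.ConsumerFactory, org.springframework.kafka.core.DefaultKafkaConsumerFactory, org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory):

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Reuse the consumerConfigs() map defined above
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // assumed: roughly one consumer thread per partition
        return factory;
    }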

3. Create the producer

Messages are routed to different Kafka partitions based on their key.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) {
        // The key is required: VehiclePartitioner uses it to choose the partition
        if (key != null) {
            kafkaTemplate.send(topic, key, message);
        } else {
            logger.warn("Dropping message without key for topic {}", topic);
        }
    }
}
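If a caller ever needs to bypass VehiclePartitioner and pin a message to an explicit partition number, KafkaTemplate also offers a send(topic, partition, key, data) overload. Below is a minimal sketch of such a method for KafkaProducer, with callback logging added for illustration (the method name sendToPartition is an assumption, not part of the original article):

    public void sendToPartition(String topic, int partition, String key, String message) {
        // Explicit partition: the configured partitioner is not consulted
        kafkaTemplate.send(topic, partition, key, message)
                .addCallback(
                        result -> logger.info("Sent key {} to {}-{}", key, topic, partition),
                        ex -> logger.error("Failed to send key " + key, ex));
    }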

4. Usage

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Test {

    @Autowired
    private KafkaProducer kafkaProducer;

    public void test() {
        // Replace the topic, key and message with your own values
        kafkaProducer.sendMessage("your-topic", "your-key", "your-message");
    }
}
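To check which partition each message actually landed on, a consumer sketch like the one below can log it (the class name PartitionLoggingConsumer and the topic name are illustrative additions; it relies on the listener container factory sketched at the end of step 2):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PartitionLoggingConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Logs the key, partition and payload of every record it receives
    @KafkaListener(topics = "your-topic")
    public void listen(ConsumerRecord<String, String> record) {
        logger.info("key={} partition={} value={}", record.key(), record.partition(), record.value());
    }
}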