当前位置:   article > 正文

springboot集成kafka指定分区发送_springboot kafka生产者 指定分区

springboot kafka生产者 指定分区

Spring Boot 版本为 2.0.6。

1.配置文件

  # Kafka settings. KafkaConfiguration reads these through ${spring.kafka.*}
  # placeholders, so the block must live under the top-level `spring` key.
  spring:
    kafka:
      # Placeholder: replace with the real broker address list.
      bootstrap-servers: kafka地址
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        buffer-memory: 40960
        batch-size: 4096
      consumer:
        group-id: gps_mileage_second
        auto-offset-reset: earliest
        enable-auto-commit: false
        auto-commit-interval: 1000
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

2.添加Java文件

  1. public class VehiclePartitioner implements Partitioner {
  2. @Override
  3. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  4. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  5. int numPartitions = partitions.size();
  6. if (key == null) {
  7. Random rand = new Random();
  8. return rand.nextInt(numPartitions);
  9. }
  10. int floorMod = Math.floorMod(key.hashCode(), numPartitions);
  11. return floorMod;
  12. }
  13. @Override
  14. public void close() {
  15. }
  16. @Override
  17. public void configure(Map<String, ?> configs) {
  18. }
  19. }

 

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfiguration {
  4. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  5. @Value("${spring.kafka.bootstrap-servers}")
  6. private String bootstrapServers;
  7. @Value("${spring.kafka.consumer.enable-auto-commit}")
  8. private Boolean autoCommit;
  9. @Value("${spring.kafka.consumer.auto-commit-interval}")
  10. private Integer autoCommitInterval;
  11. @Value("${spring.kafka.consumer.group-id}")
  12. private String groupId;
  13. @Value("${spring.kafka.consumer.auto-offset-reset}")
  14. private String autoOffsetReset;
  15. /**
  16. * 消息发送失败重试次数
  17. */
  18. @Value("${spring.kafka.producer.retries}")
  19. private int retries;
  20. /**
  21. * 消息批量发送容量
  22. */
  23. @Value("${spring.kafka.producer.batch-size}")
  24. private int batchSize;
  25. /**
  26. * 缓存容量
  27. */
  28. @Value("${spring.kafka.producer.buffer-memory}")
  29. private int bufferMemory;
  30. /**
  31. * 消费者配置信息
  32. */
  33. @Bean
  34. public Map<String, Object> consumerConfigs() {
  35. logger.info("Kafka消费者配置");
  36. Map<String, Object> props = new HashMap<>();
  37. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  38. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  39. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  40. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
  41. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
  42. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  43. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  44. return props;
  45. }
  46. /**
  47. * 生产者相关配置
  48. * @return
  49. */
  50. public Map<String, Object> producerConfigs() {
  51. logger.info("Kafka生产者配置");
  52. Map<String, Object> props = new HashMap<>(6);
  53. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  54. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  55. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  56. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  57. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  58. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  59. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VehiclePartitioner.class);
  60. return props;
  61. }
  62. /**
  63. * 生产者创建工厂
  64. * @return
  65. */
  66. public ProducerFactory<String, String> producerFactory() {
  67. return new DefaultKafkaProducerFactory<>(producerConfigs());
  68. }
  69. /**
  70. * kafkaTemplate 覆盖默认配置类中的kafkaTemplate
  71. * @return
  72. */
  73. @Bean
  74. public KafkaTemplate<String, String> kafkaTemplate() {
  75. return new KafkaTemplate<String, String>(producerFactory());
  76. }
  77. }

3.创建生产者

根据 key 的不同发往不同的 Kafka 分区:分区号由 key 的 hashCode 对分区数取模得到,因此相同 key 的消息会进入同一分区;key 为 null 时 sendMessage 不会发送该消息。

  1. @Component
  2. public class KafkaProducer {
  3. @Autowired
  4. private KafkaTemplate kafkaTemplate;
  5. protected final Logger logger = Logger.getLogger(this.getClass());
  6. public void sendMessage(String topic,String key, String message) {
  7. if(key !=null){
  8. kafkaTemplate.send(topic,key,message);
  9. }
  10. }
  11. }

4.使用

  1. public class Test{
  2. @Autowired
  3. private KafkaProducer kafkaProducer;
  4. public void test(){
  5. kafkaProducer.sendMessage(kafka主题,特定的key,消息);
  6. }
  7. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/794061
推荐阅读
相关标签
  

闽ICP备14008679号