赞
踩
springboot 版本为 2.0.6
1.配置文件
- kafka:
- bootstrap-servers: kafka地址
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- retries: 3
- buffer-memory: 40960
- batch-size: 4096
- consumer:
- group-id: gps_mileage_second
- auto-offset-reset: earliest
- enable-auto-commit: false
- auto-commit-interval: 1000
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.添加Java文件
- public class VehiclePartitioner implements Partitioner {
-
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- if (key == null) {
- Random rand = new Random();
- return rand.nextInt(numPartitions);
- }
- int floorMod = Math.floorMod(key.hashCode(), numPartitions);
- return floorMod;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
- }
- @Configuration
- @EnableKafka
- public class KafkaConfiguration {
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${spring.kafka.consumer.enable-auto-commit}")
- private Boolean autoCommit;
-
- @Value("${spring.kafka.consumer.auto-commit-interval}")
- private Integer autoCommitInterval;
-
- @Value("${spring.kafka.consumer.group-id}")
- private String groupId;
-
-
- @Value("${spring.kafka.consumer.auto-offset-reset}")
- private String autoOffsetReset;
-
- /**
- * 消息发送失败重试次数
- */
- @Value("${spring.kafka.producer.retries}")
- private int retries;
- /**
- * 消息批量发送容量
- */
- @Value("${spring.kafka.producer.batch-size}")
- private int batchSize;
- /**
- * 缓存容量
- */
- @Value("${spring.kafka.producer.buffer-memory}")
- private int bufferMemory;
-
- /**
- * 消费者配置信息
- */
- @Bean
- public Map<String, Object> consumerConfigs() {
- logger.info("Kafka消费者配置");
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
- props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return props;
- }
-
-
-
- /**
- * 生产者相关配置
- * @return
- */
- public Map<String, Object> producerConfigs() {
- logger.info("Kafka生产者配置");
- Map<String, Object> props = new HashMap<>(6);
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.RETRIES_CONFIG, retries);
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VehiclePartitioner.class);
- return props;
- }
- /**
- * 生产者创建工厂
- * @return
- */
- public ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- /**
- * kafkaTemplate 覆盖默认配置类中的kafkaTemplate
- * @return
- */
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- return new KafkaTemplate<String, String>(producerFactory());
- }
- }
3.创建生产者
根据key不同,发往不同的kafka分区
- @Component
- public class KafkaProducer {
-
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- protected final Logger logger = Logger.getLogger(this.getClass());
-
-
-
- public void sendMessage(String topic,String key, String message) {
- if(key !=null){
- kafkaTemplate.send(topic,key,message);
- }
-
- }
-
- }
4.使用
-
- public class Test{
-
-
- @Autowired
- private KafkaProducer kafkaProducer;
-
- public void test(){
- kafkaProducer.sendMessage(kafka主题,特定的key,消息);
- }
-
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。