当前位置:   article > 正文

kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量_o.s.kafka.listener.kafkamessagelistenercontainer -

o.s.kafka.listener.kafkamessagelistenercontainer - partitions assigned

往期精选

    这一篇我们将叙述,我是怎么在项目中进行对kafka优化的我们将从三方面进行考虑,一是代码;二是    配置;三是集群。项目背景,做数据迁移工作后面我将写几篇文章讲诉我们是怎么对百万数据进行迁移的工作)。主要场景利用kafka做读写分离,一直请求源数据写入到kafka生产者,然后kafka消费者进行写入数据到新数据。

一、配置优化《报错》节选:

  1. [2018-09-25 11:23:59.370] ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] LoggingErrorHandler.java:37 - Error while processing: null
  2. org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  3. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
  4. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
  5. at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)
  6. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1324)
  7. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1185)
  8. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688)
  9. at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
  10. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
  11. at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
  12. at java.util.concurrent.FutureTask.run(FutureTask.java)
  13. at java.lang.Thread.run(Thread.java:748)

这是由于kafka一直生产数据,导致kafka消费太慢了。我们主要优化也是对消费者进行优化。根据上面的报错,我们可以看到一个参数:max-poll-records,所以我们首先将对提交数,进行调大。具体的需要根据项目进行测试,我们把数进行调大到100,同时对下面的参数进行:

#自动提交offset到zookeeper的时间间隔

auto-commit-interval: 1000

#earliest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 

#latest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 

#none 

#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

#提交方式改为false,是否自动周期性提交已经拉取到消费端的消息offset

enable-auto-commit: false

由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略:

  然后我修改了kafka的配置(spring-kafka),需要到安装的文件(config)下进行修改,分别是生产文件和配置文件。

1.session.timeout.ms=100000(增大session超时时间)。

2.request.timeout.ms=110000(socket握手超时时间,默认是3000 但是kafka配置要求大于session.timeout.ms时间).

同时Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

二、代码优化:《日志》节选

如果进行了的配置调优,差不多会提高kafka的消费能力,但是写入过大,控制台还是打印下面日志信息:

  1. 2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-4, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0]
  2. [2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0]
  3. [2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-3, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXXX-0]
  4. [2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXX-0]
  5. [2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0]
  6. [2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0]
  7. [2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:336 - [Consumer clientId=consumer-4, groupId=test-consumer-group] 

也是就说机制一直打印这些信息,但是又不报错,但是又不写入数据,我们就想,除了配置优化之后,能不能像数据库一样,批量提交或者说是批量消费呢?看了官网资料,发现确实可以,以下是我们对代码的优化,由单一的消费,改为批量消费:

一:增加一个config类。

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfig {
  4.     @Bean
  5.     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  6.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  7.         factory.setConsumerFactory(consumerFactory());
  8.         factory.setConcurrency(10);
  9.         factory.getContainerProperties().setPollTimeout(1500);
  10.         factory.setBatchListener(true);
  11.         //@KafkaListener 批量消费  每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
  12.         factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
  13.         //设置提交偏移量的方式
  14.         return factory;
  15.     }
  16.     public ConsumerFactory<String, String> consumerFactory() {
  17.         return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  18.     }
  19.     public Map<String, Object> consumerConfigs() {
  20.         Map<String, Object> propsMap = new HashMap<>(16);
  21.         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP地址需要修改");
  22.         propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  23.         propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  24.         propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);
  25.         propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,110000);
  26.         propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  27.         propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  28.         propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
  29.         propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  30.         propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);
  31.         //每个批次获取数
  32.         return propsMap;
  33.     }
  34. }

二、更改消费接受代码。

  1.  @KafkaListener(topics = {"消费名称需要改"})
  2.     public void listen(List<ConsumerRecord> records, Acknowledgment ack) {
  3.         try {
  4.             for (ConsumerRecord record : records) {
  5.                 Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  6.                 if (kafkaMessage.isPresent()) {
  7.                     Object message = kafkaMessage.get();
  8.                     log.info("----------------- record =" + record);
  9.                     log.info("------------------ message =" + message);
  10.                 }
  11.             }
  12.         } catch (Exception e) {
  13.             log.error("kafka失败,当前失败的批次。data:{}", records);
  14.             e.printStackTrace();
  15.         } finally {
  16.             ack.acknowledge();
  17.         }
  18.     }

集群搭建

前面虽然优化配置和代码,但是代码执行还是不够快,网上寻找资料(提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力),说可以提高分区数量,如果单机怎么提高还是一样的(我们试过了),后来搭建了一个集群。注意我们是使用docker搭建kafka集群的,搭建过程如下。docker-compose.yml内容:

  1. version: '2'
  2. services:
  3.   zookeeper:
  4.     image: wurstmeister/zookeeper
  5.     ports:
  6.       - "2181:2181"
  7.   kafka:
  8.     image: wurstmeister/kafka
  9.     ports:
  10.       - "9095:9095"
  11.     environment:
  12.       KAFKA_ADVERTISED_HOST_NAME: IP地址
  13.       KAFKA_ADVERTISED_PORT: 9095
  14.       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://XXXXXXXX:9095
  15.       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9095
  16.       KAFKA_DELETE_TOPIC_ENABLE: "true"
  17.       KAFKA_LOG_RETENTION_HOURS: 1
  18.       KAFKA_MESSAGE_MAX_BYTES: 10000000
  19.       KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
  20.       KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 100000
  21.       KAFKA_NUM_PARTITIONS: 2
  22.       KAFKA_DELETE_RETENTION_MS: 1000
  23.       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  24.       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  25.     volumes:
  26.       - /var/run/docker.sock:/var/run/docker.sock
  27.   kafka-manager:
  28.     image: sheepkiller/kafka-manager
  29.     links:
  30.       - kafka
  31.       - zookeeper
  32.     environment:
  33.       ZK_HOSTS: zookeeper:2181
  34.       APPLICATION_SECRET: letmein
  35.       KM_ARGS: -Djava.net.preferIPv4Stack=true
  36.     ports:
  37.       - "9000:9000"

1.启动的命令:

docker-compose up -d

2.先去修改配置文件的端口,然后再启动相关的命令:

docker-compose scale kafka=2

3.再次修改文件袋的端口,然后再启动相关的命令:

docker-compose scale kafka=3

以上就是我所总结的kafka优化,欢迎有更好的方案进行交流,欢迎关注微信号:繁荣Aaron和转发。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/125502
推荐阅读
相关标签
  

闽ICP备14008679号