赞
踩
我们在开发的时候经常会用到kafka作为消息订阅模式,里面会涉及到很多参数的配置,通过参数配置取优化业务处理的过程。其中,我们最常用的参数如下:
kafka: consumer: enable-auto-commit: true group-id: groupid auto-commit-interval: 1000 auto-offset-reset: latest bootstrap-servers: 192.168.10.10:4320,192.168.10.11:4321,192.168.10.12:4322 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer concurrency: 3 max-poll-records: 50 poll-timeout: 1500 batch-listener: false producer: servers: 192.168.10.10:4320,192.168.10.11:4321,192.168.10.12:4322 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer switch-enable: true retries: 3 batch: size: 16384 linger: 0 buffer: memory: 33554432 topic: topic_008,topic_001
消费者consumer中enable-auto-commit、auto-commit-interval参数代表开启自动提交,配置自动提交offset 位置之后,我们不必关心消息消费到了什么位置,当程序重启后,消息也不会重复消费。
auto-commit-interval 的默认值是 5000,单位是毫秒。
消费者consumer中group-id,topic到group之间是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。
在一个消费者组当中可以有一个或者多个消费者实例,它们共享一个公共的group ID,组ID是一个字符串,用来唯一标志一个消费者组,组内的所有消费者协调在一起来消费订阅主题的所有分区,但是同一个topic下的某个分区只能被消费者组中的一个消费者消费,不同消费者组中的消费者可以消费相同的分区。
消费者consumer中auto-offset-reset有三个值,earliest 、latest、none含义如下:
值 | 描述 |
---|---|
earliest | 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 |
latest | 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 |
none | topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 |
消费者中bootstrap-servers用来连接Kafka集群的入口参数,这个参数对应的值通常是Kafka集群中broker的地址。
消费者中concurrency参数用来开启消费者线程数。应用在单机部署环境下,这个参数很好理解,你想要开几个相应设置几个就行,concurrency数不能大于partition数量,因为partition会尽量平均分配给消费者,多出的会再重新分配给某些消费者,即消费者消费的partition数量会不等。
消费者max-poll-records指定每次最大消费消息数量。
消费者poll-timeout指定消费的超时时间。
消费者 batch-listener是否开启批量消费,true 表示批量消费 。
kafka消费者配置
/**kafka 集群,broker-list*/ @Value("${kafka.consumer.bootstrap-servers}") private String servers; /**开启自动提交*/ @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; /**自动提交延迟*/ @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; /**消费者组*/ @Value("${kafka.consumer.group-id}") private String groupId; /**重置消费者的offset*/ @Value("${kafka.consumer.auto-offset-reset}") private String autoOffsetReset; /**最多并发数*/ @Value("${kafka.consumer.concurrency}") private int concurrency; /**是否批量拉取*/ @Value("${kafka.consumer.batch-listener}") private boolean batchListener; /**批量拉取个数*/ @Value("${kafka.consumer.max-poll-records}") private int maxPollRecords; /**拉取超时时间*/ @Value("${kafka.consumer.poll-timeout}") private long pollTimeout; /**否启用权限认证*/ @Value("${kafka.consumer.kafkaSecurityStatus}") private int kafkaSecurityStatus; @Value("${kafka.securityConfig}") private String password; @Bean @Primary public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); /*并发数量*/ factory.setConcurrency(concurrency); /*批量获取开关*/ factory.setBatchListener(batchListener); /*设置拉取时间超时间隔*/ factory.getContainerProperties().setPollTimeout(pollTimeout); return factory; } private ConsumerFactory<String, byte[]> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); /* 批量拉取数量*/ propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); /*灵活配置是否启用权限认证开关*/ if (kafkaSecurityStatus == 1) { propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); propsMap.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); } return propsMap; } @Bean public KafkaProperties.Listener listener() { return new KafkaProperties.Listener(); } @Bean public void configureSaslConsumer() { //如果用-D或者其它方式设置过,这里不再设置。 if (null == System.getProperty("java.security.auth.login.config")) { //这个路径必须是一个文件系统可读的路径,不能被打包到JAR中。 System.setProperty("java.security.auth.login.config",password); } }
2.生产者
retries:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。
batch.size:当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送,批次的大小可以通过batch.size 参数设置.默认是102416(16KB),一个非常大的批次大小可能会浪费内存。
linger:延时发送,比如设置batch size为102416,但是有的时刻消息比较少,过了很久也没有凑够32KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到32KB,也将这个批次发送出去. 比如设置5ms,就是到了5ms,大小没到32KB,也会发出去。
同时设置batch.size和 linger,优先将满足条件的消息发送出去(为互斥),Kafka需要考虑高吞吐量与延时的平衡。
生产者初始化
/**kafka 集群,broker-list*/ @Value("${kafka.producer.servers}") private String servers; /**重试次数*/ @Value("${kafka.producer.retries}") private int retries; /**批次大小*/ @Value("${kafka.producer.batch.size}") private int batchSize; /**等待时间*/ @Value("${kafka.producer.linger}") private int linger; /**RecordAccumulator 缓冲区大小*/ @Value("${kafka.producer.buffer.memory}") private int bufferMemory; /**否启用权限认证*/ @Value("${kafka.producer.kafkaSecurityStatus}") private int kafkaSecurityStatus; /**否启用权限认证*/ @Value("${kafka.securityConfig}") private String password; private Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); /*灵活配置开关是否启用权限认证*/ if (kafkaSecurityStatus == 1) { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); } return props; } private ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * 创建KafkaTemplate模板,并注入spring中 * @return */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } @Bean public void configureSaslProducer() { //如果用-D或者其它方式设置过,这里不再设置。 if (null == System.getProperty("java.security.auth.login.config")) { //这个路径必须是一个文件系统可读的路径,不能被打包到JAR中。 System.setProperty("java.security.auth.login.config",password); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。