赞
踩
Kafka的HighLevel API使用是非常简单的,所以梳理模型时也要尽量简单化,主线清晰,细节慢慢扩展。
Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。
Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.13</artifactId>
- <version>3.4.0</version>
- </dependency>
然后可以使用Kafka提供的Producer类,快速发送消息。
- public class MyProducer {
- private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
- private static final String TOPIC = "disTopic";
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- //PART1:设置发送者相关属性
- Properties props = new Properties();
- // 此处配置的是kafka的端口
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- // 配置key的序列化类
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
- // 配置value的序列化类
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String,String> producer = new KafkaProducer<>(props);
- CountDownLatch latch = new CountDownLatch(5);
- for(int i = 0; i < 5; i++) {
- //Part2:构建消息
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
- //Part3:发送消息
- //单向发送:不关心服务端的应答。
- producer.send(record);
- System.out.println("message "+i+" sended");
- //同步发送:获取服务端应答消息前,会阻塞当前线程。
- RecordMetadata recordMetadata = producer.send(record).get();
- String topic = recordMetadata.topic();
- int partition = recordMetadata.partition();
- long offset = recordMetadata.offset();
- String message = recordMetadata.toString();
- System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);
- //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if(null != e){
- System.out.println("消息发送失败,"+e.getMessage());
- e.printStackTrace();
- }else{
- String topic = recordMetadata.topic();
- long offset = recordMetadata.offset();
- String message = recordMetadata.toString();
- System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
- }
- latch.countDown();
- }
- });
- }
- //消息处理完才停止发送者。
- latch.await();
- producer.close();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
整体来说,构建Producer分为三个步骤:
接下来可以使用Kafka提供的Consumer类,快速消费消息。
- public class MyConsumer {
- private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
- private static final String TOPIC = "disTopic";
-
- public static void main(String[] args) {
- //PART1:设置发送者相关属性
- Properties props = new Properties();
- //kafka地址
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- //每个消费者要指定一个group
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
- //key序列化类
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- //value序列化类
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(TOPIC));
- while (true) {
- //PART2:拉取消息
- // 100毫秒超时时间
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
- //PART3:处理消息
- for (ConsumerRecord<String, String> record : records) {
- System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
- }
- //提交offset,消息就不会重复推送。
- consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
- // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
整体来说,Consumer同样是分为三个步骤:
Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中,最大的变数基本上就是给生产者和消费者的设定合适的属性。这些属性极大的影响了客户端程序的执行方式。
改改配置就学会Kafka了?kafka官方配置: https://kafka.apache.org/documentation/#configuration。看看你晕不晕。
**渔与鱼:**Kafka的客户端API的重要目的就是想要简化客户端的使用方式,所以对于API的使用,尽量熟练就可以了。对于其他重要的属性,都可以通过源码中的描述去学习,并且可以设计一些场景去进行验证。其重点,是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。
其实Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐,那才是Kafka最为精妙的地方。但是要理解那些复杂的问题,都是需要建立在这个基础模型基础上的。
这是我们在使用kafka时,最为重要的一个机制,因此最先进行梳理。
在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。他的描述是这样的:
- public static final String GROUP_ID_CONFIG = "group.id";
- public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
既然这里提到了kafka-based offset management strategy,那是不是也有非Kafka管理Offset的策略呢?
另外,还有一个相关的参数GROUP_INSTANCE_ID_CONFIG,可以给组成员设置一个固定的instanceId,这个参数通常可以用来减少Kafka不必要的rebalance。
从这段描述中看到,对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的:
生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。这就是消费者组的作用。
与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。
[oper@worker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test
这个Offset偏移量,需要消费者处理完成后主动向Kafka的Broker提交。提交完成后,Broker就会更新消费进度,表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交Offset,Broker就会认为这个消息还没有被处理过,就会重新往对应的消费者组进行推送,不过这次,一般会尽量推送给同一个消费者组当中的其他消费者实例。
在示例当中,是通过业务端主动调用Consumer的commitAsync方法或者commitSync方法主动提交的,Kafka中自然也提供了自动提交Offset的方式。使用自动提交,只需要在Comsumer中配置ENABLE_AUTO_COMMIT_CONFIG属性即可。
- public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
- private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";
渔与鱼: 从这里可以看到,Offset是Kafka进行消息推送控制的关键之处。这里需要思考两个问题:
一、Offset是根据Group、Partition分开记录的。消费者如果一个Partition对应多个Consumer消费者实例,那么每个Consumer实例都会往Broker提交同一个Partition的不同Offset,这时候Broker要听谁的?所以一个Partition最多只能同时被一个Consumer消费。也就是说,示例中四个Partition的Topic,那么同一个消费者组中最多就只能配置四个消费者实例。
二、这么关键的Offset数据,保存在Broker端,但是却是由"不靠谱"的消费者主导推进,这显然是不够安全的。那么应该如何提高Offset数据的安全性呢?如果你有兴趣自己观察,会发现在Consumer中,实际上也提供了AUTO_OFFSET_RESET_CONFIG参数,来指定消费者组在服务端的Offset不存在时如何进行后续消费。(有可能服务端初始化Consumer Group的Offset失败,也有可能Consumer Group当前的Offset对应的数据文件被过期删除了。)这就相当于服务端做的兜底保障。
ConsumerConfig.AUTO_OFFSET_RESEWT_CONFIG :当Server端没有对应的Offset时,要如何处理。 可选项:
- earliest: 自动设置为当前最早的offset
- latest:自动设置为当前最晚的offset
- none: 如果消费者组对应的offset找不到,就向Consumer抛异常。
- 其他选项: 向Consumer抛异常。
有了服务端兜底后,消费者应该要如何保证offset的安全性呢?有两种方式:一种是异步提交。就是消费者在处理业务的同时,异步向Broker提交Offset。这样好处是消费者的效率会比较高,但是如果消费者的消息处理失败了,而offset又成功提交了。这就会造成消息丢失。另一种方式是同步提交。消费者保证处理完所有业务后,再提交Offset。这样的好处自然是消息不会因为offset丢失了。因为如果业务处理失败,消费者就可以不去提交Offset,这样消息还可以重试。但是坏处是消费者处理信息自然就慢了。另外还会产生消息重复。因为Broker端不可能一直等待消费者提交。如果消费者的业务处理时间比较长,这时在消费者正常处理消息的过程中,Broker端就已经等不下去了,认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息,这就造成了消息重复处理。
这时,如果采取头疼医头,脚疼医脚的方式,当然都有对应的办法。但是都会显得过于笨重。其实这类问题的根源在于Offset反映的是消息的处理进度。而消息处理进度跟业务的处理进度又是不同步的。所有我们可以换一种思路,将Offset从Broker端抽取出来,放到第三方存储比如Redis里自行管理。这样就可以自己控制用业务的处理进度推进Offset往前更新。
生产者拦截机制允许客户端在生产者在消息发送到Kafka集群之前,对消息进行拦截,甚至可以修改消息内容。
这涉及到Producer中指定的一个参数:INTERCEPTOR_CLASSES_CONFIG
public static final Strin
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。