赞
踩
本文档参看的视频是:
本文档参看的文档是:
在这之前大家可以看我以下几篇文章,循序渐进:
❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程
Topic主题已经创建好了,接下来我们就可以向该主题生产消息了,这里我们采用Java代码通过
Kafka Producer API
的方式生产数据。
(一)创建Map类型的配置对象,根据场景增加相应的配置属性。
(二)创建待发送数据
在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型:
下面的代码,我们很熟悉吧~ 包括topic、key、value
但是! 其实对于表述一份完整的Record 是不完整的!那完整的是什么样的呢~~
like this~
相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而Key可以用于数据的分区定位。
(三)创建生产者对象,发送生产的数据:
根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。
(1) 数据生产者(KafkaProducer):生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者。这里我们简单介绍一下内部的数据转换处理:
(2) 数据收集器(RecordAccumulator):用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。
(3) 数据发送器(Sender):线程对象,用于从收集器对象中获取数据,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中
因为数据真正发送的地方是Broker节点
,不是分区
。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。
将组合后的<节点,List<批次>>
的数据封装成客户端请求(请求键为:Produce)发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。
Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。
// TODO 配置属性集合 Map<String, Object> configMap = new HashMap<>(); // TODO 配置属性:Kafka服务器集群地址 configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作 configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // TODO 创建Kafka生产者对象,建立Kafka连接 // 构造对象时,需要传递配置参数 KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // TODO 准备数据,定义泛型 // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数 ProducerRecord<String, String> record = new ProducerRecord<String, String>( "test", "key1", "value1" ); // TODO 生产(发送)数据 producer.send(record); // TODO 关闭生产者连接 producer.close();
两个线程
在生产者客户端中,主要通过两个线程协调运行:主线程和Sender线程
主线程
主线程负责创建消息,然后将消息传递给拦截器、序列化器和分区器,最后将消息缓存到消息累加器
Sender线程
sender线程负责从消息累加器中获取消息,然后进行组装成<Node, List< ProducerBatch>>
, -> <Node,Request>
->Map<Nodeld, Deque<Request>>
这样的形式发送到对应的broker中
ProducerBatch
ProducerBatch包含多个ProducerRecord,目的是为了消息更加紧凑,提高吞吐量
BufferPool
因为kafka内部是通过ByteBuffer创建和释放内存,ByteBuffer的创建 是很消耗资源的,为了解决这个问题,Kafka封装了BufferPool实现ByteBuffer重复使用,当然并不是所有大小的ByteBuffer都可以缓存 到BufferPoll中的,可以通过配置batch.size,如果小于等于这个大小则可以被缓存到pool中,默认16kb.ProducerBatch 也和这个参数关联。
生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以大家可以根据实际的情况自行选择使用。
但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。
接下来,我们来演示一下拦截器的操作
(1) 实现生产者拦截器接口ProducerInterceptor
package com.atguigu.test; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * TODO 自定义数据拦截器 * 1. 实现Kafka提供的生产者接口ProducerInterceptor * 2. 定义数据泛型 <K, V> * 3. 重写方法 * onSend * onAcknowledgement * close * configure */ public class KafkaInterceptorMock implements ProducerInterceptor<String, String> { @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
(2) 实现接口中的方法,根据业务功能重写具体的方法
方法名 | 作用 |
---|---|
onSend | 数据发送前,会执行此方法,进行数据发送前的预处理 |
onAcknowledgement | 数据发送后,获取应答时,会执行此方法 |
close | 生产者关闭时,会执行此方法,完成一些资源回收和释放的操作 |
configure | 创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换。 |
package com.atguigu.test; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; public class ProducerInterceptorTest { public static void main(String[] args) { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName()); KafkaProducer<String, String> producer = null; try { producer = new KafkaProducer<>(configMap); for ( int i = 0; i < 1; i++ ) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); final Future<RecordMetadata> send = producer.send(record); } } catch ( Exception e ) { e.printStackTrace(); } finally { if ( producer != null ) { producer.close(); } } } }
Kafka发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理,具体代码实现采用匿名类或Lambda表达式都可以。
package com.atguigu.kafka.test; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; public class KafkaProducerASynTest { public static void main(String[] args) { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // TODO 循环生产数据 for ( int i = 0; i < 1; i++ ) { // TODO 创建数据 ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); // TODO 发送数据 producer.send(record, new Callback() { // TODO 回调对象 public void onCompletion(RecordMetadata recordMetadata, Exception e) { // TODO 当数据发送成功后,会回调此方法 System.out.println("数据发送成功:" + recordMetadata.timestamp()); } }); } producer.close(); } }
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。
如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送一下条数据的场合,我们就称之为异步发送。
package com.atguigu.kafka.test; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; public class KafkaProducerASynTest { public static void main(String[] args) { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // TODO 循环生产数据 for ( int i = 0; i < 10; i++ ) { // TODO 创建数据 ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); // TODO 发送数据 producer.send(record, new Callback() { // TODO 回调对象 public void onCompletion(RecordMetadata recordMetadata, Exception e) { // TODO 当数据发送成功后,会回调此方法 System.out.println("数据发送成功:" + recordMetadata.timestamp()); } }); // TODO 发送当前数据 System.out.println("发送数据"); } producer.close(); } }
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程代码作为生产者向缓冲区中放数据,而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。
如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。
代码实现上,采用的是JDK1.5增加的JUC并发编程的Future接口的get方法实现。
package com.atguigu.kafka.test; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; public class KafkaProducerASynTest { public static void main(String[] args) throws Exception { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // TODO 循环生产数据 for ( int i = 0; i < 10; i++ ) { // TODO 创建数据 ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); // TODO 发送数据 producer.send(record, new Callback() { // TODO 回调对象 public void onCompletion(RecordMetadata recordMetadata, Exception e) { // TODO 当数据发送成功后,会回调此方法 System.out.println("数据发送成功:" + recordMetadata.timestamp()); } }).get(); // TODO 发送当前数据 System.out.println("发送数据"); } producer.close(); } }
Kafka中Topic是对数据逻辑上的分类,而Partition才是数据真正存储的物理位置。所以在生产数据时,如果只是指定Topic的名称,其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时,也可以指定数据存储的分区编号。
for ( int i = 0; i < 1; i++ ) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", 0, "key" + i, "value" + i);
final Future<RecordMetadata> send = producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if ( e != null ) {
e.printStackTrace();
} else {
System.out.println("数据发送成功:" + record.key() + "," + record.value());
}
}
});
}
指定分区传递数据是没有任何问题的。Kafka会进行基本简单的校验,比如是否为空,是否小于0之类的,但是你的分区是否存在就无法判断了,所以需要从Kafka中获取集群元数据信息,此时会因为长时间获取不到元数据信息而出现超时异常。所以如果不能确定分区编号范围的情况,不指定分区还是一个不错的选择。
如果不指定分区,Kafka会根据集群元数据中的主题分区来通过算法来计算分区编号并设定:
murmur2非加密散列算法
(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号粘性分区策略
进行分区选择:
在某些场合中,指定的数据我们是需要根据自身的业务逻辑发往指定的分区的。所以需要自己定义分区编号规则,而不是采用Kafka自动设置就显得尤其必要了。Kafka早期版本中提供了两个分区器,不过在当前kafka版本中已经不推荐使用了。
接下来我们就说一下当前版本Kafka中如何定义我们自己的分区规则:分区器
首先我们需要创建一个类,然后实现Kafka提供的分区类接口Partitioner,接下来重写方法。这里我们只关注partition方法即可,因为此方法的返回结果就是需要的分区编号。
package com.atguigu.test; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * TODO 自定义分区器实现步骤: * 1. 实现Partitioner接口 * 2. 重写方法 * partition : 返回分区编号,从0开始 * close * configure */ public class KafkaPartitionerMock implements Partitioner { /** * 分区算法 - 根据业务自行定义即可 * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata * @return 分区编号,从0开始 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
package com.atguigu.test; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; public class ProducerPartitionTest { public static void main(String[] args) { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName()); KafkaProducer<String, String> producer = null; try { producer = new KafkaProducer<>(configMap); for ( int i = 0; i < 1; i++ ) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); final Future<RecordMetadata> send = producer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if ( e != null ) { e.printStackTrace(); } else { System.out.println("数据发送成功:" + record.key() + "," + record.value()); } } }); } } catch ( Exception e ) { e.printStackTrace(); } finally { if ( producer != null ) { producer.close(); } } } }
对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。
但是在更多的场景中,我们是需要确定数据是否已经发送成功了且Kafka正确接收到数据的,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。
而这个确定的过程一般是通过Kafka给我们返回的响应确认结果(Acknowledgement)来决定的,这里的响应确认结果我们也可以简称为ACK应答。根据场景,Kafka提供了3种应答处理,可以通过配置对象进行配置
效率最高,可靠性低
当生产数据时,生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,明显可以看出,这种应答方式,数据已经走网络给Kafka发送了,但这其实并不能保证Kafka能正确地接收到数据,在传输过程中如果网络出现了问题,那么数据就丢失了。也就是说这种应答确认的方式,数据的可靠性是无法保证的。不过相反,因为无需等待Kafka服务节点的确认,通信效率倒是比较高的,也就是系统吞吐量会非常高。
当生产数据时,Kafka Leader副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,可以看出,这种应答方式,数据已经存储到了分区Leader副本中,那么数据相对来讲就比较安全了,也就是可靠性比较高。之所以说相对来讲比较安全,就是因为现在只有一个节点存储了数据,而数据并没有来得及进行备份到follower副本,那么一旦当前存储数据的broker节点出现了故障,数据也依然会丢失。
效率最低,可靠性最高,最安全
当生产数据时,Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。
通过图形,可以看出,这种应答方式,数据已经同时存储到了分区Leader副本和follower副本中,那么数据已经非常安全了,可靠性也是最高的。此时,如果Leader副本出现了故障,那么follower副本能够开始起作用,因为数据已经存储了,所以数据不会丢失。
不过这里需要注意,如果假设我们的分区有5个follower副本,编号为1,2,3,4,5
但是此时只有3个副本处于和Leader副本之间处于数据同步状态,那么此时分区就存在一个同步副本列表,我们称之为In Syn Replica,简称为ISR。此时,Kafka只要保证ISR中所有的4个副本接收到了数据,就可以对数据请求进行响应了。无需5个副本全部收到数据。
由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。但其实在某些场景中,数据的丢失并不是真正地丢失,而是“虚假丢失”,比如咱们将ACK应答设置为1,也就是说一旦Leader副本将数据写入文件后,Kafka就可以对请求进行响应了。
此时,如果假设由于网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer眼里,数据已经丢了
所以在这种情况下,kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。
如果此时发送成功,那么Kafka就又收到了数据,而这两条数据是一样的,也就是说,导致了数据的重复。
可能导致数据重复
数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。假设我们需要将编号为1,2,3的三条连续数据发送给Kafka。每条数据会对应于一个连接请求
此时,如果第一个数据的请求出现了故障,而第二个数据和第三个数据的请求正常,那么Broker就收到了第二个数据和第三个数据,并进行了应答。
为了保证数据的可靠性,此时,Kafka Producer会将第一条数据重新放回到缓冲区的第一个。进行重试操作
如果重试成功,Broker收到第一条数据,你会发现。数据的顺序已经被打乱了。
数据乱序
为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。
默认幂等性是不起作用的,所以如果想要使用幂等性操作,只需要在生产者对象的配置中开启幂等性配置即可
配置项 | 配置值 | 说明 |
---|---|---|
enable.idempotence | true | 开启幂等性 |
max.in.flight.requests.per.connection | 小于等于5 | 每个连接的在途请求数,不能大于5,取值范围为[1,5] |
acks | all(-1) | 确认应答,固定值,不能修改 |
retries | >0 | 重试次数,推荐使用Int最大值 |
kafka是如何实现数据的幂等性操作呢,我们这里简单说一下流程:
(1) 开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,kafka中,这个标识采用的是连续的序列号数字sequencenum
,但是不同的生产者Producer可能序列号是一样的,所以仅仅靠seqnum
还无法唯一标记数据,所以还需要同时对生产者进行区分,所以Kafka采用申请生产者ID(producerid)
的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请producerid
以及序列号sequencenum
(2) Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum
进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。
(3) 如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。
(4) 如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱。那么继续,如果不是,那么说明数据已经乱了,发生异常。
(5) Broker根据异常返回响应,通知Producer进行重试
。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试即可。
(6) 如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。
从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是通过原理,咱们也能明白,这种幂等性还是有缺陷的:
对于幂等性的缺陷,kafka可以采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID,保证事务开启后,生产者对象总能获取一致的生产者ID。
为了实现事务,Kafka引入了**事务协调器(TransactionCoodinator)**负责事务的处理,所有的事务逻辑包括分派PID
等都是由TransactionCoodinator
负责实施的。TransactionCoodinator
会将事务状态持久化到该主题中。
事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题 __transaction_state
中,而内部主题的操作是由 TransactionCoodinator
对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。其实这种设计是很巧妙的,因为kafka将事务ID和生产者ID看成了消息数据,然后将数据发送到一个内部主题中。这样,使用事务处理的流程和咱们自己发送数据的流程是很像的。接下来,我们就把这两个流程简单做一个对比。
通过两张图大家可以看到,基本的事务操作和数据操作是很像的,不过要注意,我们这里只是简单对比了数据发送的过程,其实它们的区别还在于数据发送后的提交过程。普通的数据操作,只要数据写入了日志,那么对于消费者来讲。数据就可以读取到了,但是事务操作中,如果数据写入了日志,但是没有提交的话,其实数据默认情况下也是不能被消费者看到的。只有提交后才能看见数据。
Kafka中的事务是分布式事务,所以采用的也是二阶段提交
第一个阶段提交事务协调器会告诉生产者事务已经提交了,所以也称之预提交操作,事务协调器会修改事务为预提交状态
第二个阶段提交事务协调器会向分区Leader节点中发送数据标记,通知Broker事务已经提交,然后事务协调器会修改事务为完成提交状态
特殊情况下,事务已经提交成功,但还是读取不到数据,那是因为当前提交成功只是一阶段提交成功,事务协调器会继续向各个Partition发送marker信息,此操作会无限重试,直至成功。
但是不同的Broker可能无法全部同时接收到marker信息,此时有的Broker上的数据还是无法访问,这也是正常的,因为kafka的事务不能保证强一致性,只能保证最终数据的一致性,无法保证中间的数据是一致的。不过对于常规的场景这里已经够用了,事务协调器会不遗余力的重试,直至成功。
package com.atguigu.test; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; public class ProducerTransactionTest { public static void main(String[] args) { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // TODO 配置幂等性 configMap.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // TODO 配置事务ID configMap.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id"); // TODO 配置事务超时时间 configMap.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5); // TODO 创建生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // TODO 初始化事务 producer.initTransactions(); try { // TODO 启动事务 producer.beginTransaction(); // TODO 生产数据 for ( int i = 0; i < 10; i++ ) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); final Future<RecordMetadata> send = producer.send(record); } // TODO 提交事务 producer.commitTransaction(); } catch ( Exception e ) { e.printStackTrace(); // TODO 终止事务 producer.abortTransaction(); } // TODO 关闭生产者对象 producer.close(); } }
传输语义 | 说明 | 例子 |
---|---|---|
at most once | 最多一次:不管是否能接收到,数据最多只传一次。这样数据可能会丢失, | Socket, ACK=0 |
at least once | 最少一次:消息不会丢失,如果接收不到,那么就继续发,所以会发送多次,直到收到为止,有可能出现数据重复 | ACK=1 |
Exactly once | 精准一次:消息只会一次,不会丢,也不会重复。 | 幂等+事务+ACK=-1 |
首先,我们从上图中可以看出,RecordAccumulator
会由业务线程写入、Sender 线程读取,这是一个非常明显的生产者-消费者模式,所以我们需要保证 RecordAccumulator
是线程安全的。
RecordAccumulator
中维护了一个 ConcurrentMap<TopicPartition, Deque<ProducerBatch>>
类型的集合,其中的 Key 是 TopicPartition
用来表示目标 partition
,Value 是 ArrayDeque<ProducerBatch>
队列,用来缓冲发往目标 partition 的消息。 这里的 ArrayDeque 并不是线程安全的集合,后面我们会看到加锁的相关操作。
在每个 ProducerBatch
中都维护了一个 MemoryRecordsBuilder
对象,MemoryRecordsBuilder
才是真正存储 message 的地方。
RecordAccumulator 、ProducerBatch、MemoryRecordsBuilder 这三个核心类要明确
其中需要关注的是,所有标识长度的字段都是 varint(或 varlong),也就是变长字段,timestamp 和 offset 都是 delta 值,也就是偏移量。另外,就是 attribute 字段中的所有位都废弃了,并添加 header 扩展。
除了基础的 Record 格式之外,V2 版本中还定义了一个 Record Batch 的结构.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。