赞
踩
目录
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k。
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。
应答acks:
0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader收到数据后应答。
-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。
参数名称 | 描述 |
bootstrap.servers | 生产者连接集群所需的broker地址清单。例如:hadoop01:9092,hadoop02:9092,hadoop03:9092,可以设置1个或者多个,中间用逗号隔开。注意,这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。 |
key.serializer和value.serializer | 指定发送消息的key和value的序列化类型。(注:要写全列名) |
buffer.memory | RecordAccumulator缓冲区总大小,默认32M。 |
batch.size | 缓冲区一批数据的最大值,默认16k。适当增加该值可以提高吞吐量,但是如果该值设置太大会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到batch.size,sender等待linger.time设置的时间到了就会发送数据。单位ms,默认值是0ms:表示没有延迟。 生产环境建议该值大小为5-100ms之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是1-5的数字。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
enable.idempotence | 是否开启幂等性,默认true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值:2147483647。 如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
(1)需求:创建kafka生产者,采用异步发送的方式发送到kafka broker。
(2)代码编写:
1)创建maven工程
2)导入依赖
- <dependencies>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.4.1</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
3)创建包
4)编写API代码
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- public class CustomProducer {
- public static void main(String[] args) {
- /**
- 编写不带回调函数的API代码
- */
- //1.创建kafka生产者的配置对象
- Properties properties = new Properties();
- //2.给kafka配置对象添加配置信息:bootstrap.server
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化:key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
- //3.创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //4.调用send方法,发送信息
- for (int i = 0;i < 5;i++) {
- kafkaProducer.send(new ProducerRecord<>("first","hello kafka" + i));
- }
- //5.关闭资源
- kafkaProducer.close();
- }
- }
(3)测试
1)开启kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
2)执行idea的代码,查看kafka是否接收到消息
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null说明消息发送成功,如果Exception不为null说明消息发送失败。
注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
(1)代码编写
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CustomProducerCallBack {
- public static void main(String[] args) throws InterruptedException {
- /**
- * 编写回调函数异步发送API代码
- */
- //1.创建kafka生产者环境
- Properties properties = new Properties();
- //2.给kafka配置对象添加配置信息:bootstrap.server
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key.value序列化:key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- //3.创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //4.调用send方法,发送信息
- for (int a = 0;a < 5; a++) {
- kafkaProducer.send(new ProducerRecord<>("first", "hello mykafka" + a), new Callback() {
- //在kafkaProducer接收到ack时调用,异步调用
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e == null) {
- System.out.println("主题:"+ recordMetadata.topic() + "->"+"分区:"+recordMetadata.partition());
- } else {
- //出现异常打印
- e.printStackTrace();
- }
- }
- });
- //延迟一会会看到数据发往不同的分区
- Thread.sleep(2);
- }
- //5.关闭环境
- kafkaProducer.close();
- }
- }
(2)测试
1)开启消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
2)执行idea的代码,查看控制台中是否有接收到回调消息
3)查看kafka是否接收到消息
只需在异步发送的基础上再调用一下get()方法即可。
(1)代码编写
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
-
- public class CustomProducerS {
- public static void main(String[] args) throws ExecutionException,InterruptedException {
- /**
- * 同步发送API
- */
- //1.创建kafka生产者的配置环境
- Properties properties = new Properties();
- //2.给kafka配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化:key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- //3.创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //4.调用send方法,发送消息
- for (int i = 0; i < 8; i++) {
- //异步发送(默认)
- //kafkaProducer.send(new ProducerRecord<>("first","hello!"+i));
- //同步发送
- kafkaProducer.send(new ProducerRecord<>("first","kafka"+i)).get();
- }
- //5.关闭资源
- kafkaProducer.close();
- }
- }
(2)测试
1)开启消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
2)执行idea的代码,查看是否有接收到消息
(1)便于合理使用存储资源:每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度:生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。
(1)默认分区器:DefaultPartitioner
1)指明partition的情况下,直接将指明的值作为partition值;
例如partition=0,所有数据写入分区0。
2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那
么key1对应的value1写入1号分区,key2对应的value2写入0号分区。
3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。
(2)案例一
将数据发送指定partition,如:将数据指定发送分区0中。
1)代码
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
-
- public class CustomProducerPartitions {
- public static void main(String[] args) throws ExecutionException,InterruptedException{
- /**
- * 将数据发送到指定分区
- * 将所有数据发送到分区0中
- */
- //1.创建kafka生产者的配置对象
- Properties properties = new Properties();
- //2.给kafka配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化:key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- //3.创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //4.调用send方式,方式消息
- for (int i = 0; i < 5; i++) {
- //指定数据发送到1分区
- kafkaProducer.send(new ProducerRecord<>("first", 0, "", "hi spark" + i), new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e == null) {
- System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
- } else {
- e.printStackTrace();
- }
- }
- });
- }
- //5.关闭环境
- kafkaProducer.close();
- }
- }
2)测试
开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
执行idea中代码,观察控制台和kafka的消息
控制台
kafka
(3)案例二
没有指定partition值但是有key的情况,将key的hash值与topic的partition数进行取余得到partition值
1)代码
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CustomProducerPartitions2 {
- public static void main(String[] args) {
- /**
- * 没有指定partition值但是有key的情况
- * 将key的hash值与topic的partition数进行取余得到partition值
- */
- //1.创建kafka生产者的配置对象
- Properties properties = new Properties();
- //2.给kafka配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化:key.serializer,value.serializer
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- //3.创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //4.调用send方式,方式消息
- for (int i = 0; i < 5; i++) {
- //
- kafkaProducer.send(new ProducerRecord<>("first", "a","world" + i), new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e == null) {
- System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
- } else {
- e.printStackTrace();
- }
- }
- });
- }
- //5.关闭环境
- kafkaProducer.close();
- }
- }
2)测试
开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
key=a 查看控制台结果
key=b 查看控制台结果
key=c 查看控制台结果
key=f 查看控制台结果
根据需求可以自己重新实现分区器
(1)需求:实现一个分区器,发送的数据出现jeffry就发送到分区1,否则发送到分区2.
(2)实现:
1)代码
partition:
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.Cluster;
-
- import java.util.Map;
- /**
- * 1.实现接口partitioner
- * 2.实现三个方法:partition,close,configure
- * 3.编写partition方法,返回分区号
- */
- public class MyPartition implements Partitioner {
- /**
- *
- * @param topic 主题
- * @param key 消息的key
- * @param keyBytes 消息的key序列化后的字节数组
- * @param value 消息的value
- * @param valueBytes 消息的value序列化后的字节数组
- * @param cluster 集群元数据可以查看分区信息
- * @return
- */
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- //获取消息
- String msgValue = value.toString();
- //创建partition
- int partition;
- //判断消息是否包含jeffry
- if (msgValue.contains("jeffry")) {
- partition = 1;
- } else {
- partition = 2;
- }
-
- //返回分区号
- return partition;
- }
-
- //关闭资源
- @Override
- public void close() {
-
- }
-
- //配置方法
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
- }
测试自定义partition:
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CustomProducerMyPartitions {
- public static void main(String[] args) {
- /**
- * 测试自定义partition
- */
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
-
- properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zj.kafka.MyPartition");
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- for (int i = 0;i < 5;i++) {
- kafkaProducer.send(new ProducerRecord<>("first", "jeffry" + i), new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e == null) {
- System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
- } else {
- e.printStackTrace();
- }
- }
- });
- }
- kafkaProducer.close();
- }
- }
2)测试
开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
在idea启动执行程序
查看控制台的信息
查看kafka的信息
batch.size:批次大小,默认16k;
linger.ms:等待时间,修改为5-100ms;
compression.type:压缩形式为snappy;
RecordAccumulator:缓冲区大小,修改为64m;
(1)代码
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class Parameters {
- public static void main(String[] args) {
- //创建kafka生产者配置对象
- Properties properties = new Properties();
- //给kafka配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
-
- //batch.size:批次大小,默认16K
- properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
- //linger.ms:等待时间,默认0
- properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
- //RecordAccumulator:缓冲区大小,默认32M buffer。memory
- properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
- //compression.type:压缩,默认none,可以配置gzip、snappy、lzo、zstd
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
-
- //创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //调用send方法,发送信息
- for (int i = 0;i < 5;i++) {
- kafkaProducer.send(new ProducerRecord<>("first","jeffryOne" + i));
- }
- //关闭环境
- kafkaProducer.close();
- }
- }
(2)测试
1)开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
2)在idea中启动执行程序
观察kafka是否接收到消息
(1)ack应答原理
数据可靠性分析:
如果分区副本设置为1个,或者ISR里应答的最小副本数量设置为1( min.insync.replicas默认为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。
数据完全可靠条件 = ACK级别为-1+分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
数据重复分析:
acks: -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。
(2)代码编写
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CustomProducerAck {
- public static void main(String[] args) {
- //创建kafka生产者的配置对象
- Properties properties = new Properties();
- //给kafka配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
-
- //设置acks
- properties.put(ProducerConfig.ACKS_CONFIG,"all");
- //重试次数retries。默认是int最大的值:2147483647
- properties.put(ProducerConfig.RETRIES_CONFIG,5);
-
- //创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //调用send方法,发送信息
- for (int i = 0;i < 5;i++) {
- kafkaProducer.send(new ProducerRecord<>("first","jeffry" + i));
- }
- //关闭环境
- kafkaProducer.close();
- }
- }
(3)测试
1)开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first
2)在idea中启动执行程序,查看kafka情况
(1)至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2;
At Least Once可以保证数据不丢失,但是不能保证数据不重复。
(2)最多一次(At Most Once)= ACK级别设置为0;
At Most Once可以保证数据不重复,但是不能保证数据不丢失。
(3)精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
(1)幂等性原理
幂等性:指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次 = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数>=2)。
判断重复数据的标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。
PID是Kafka每次重启都会分配一个新的;
Partition表示分区号;
Sequence Number是单调自增的。
幂等性只能保证在单分区单会话内不重复。
(2)如何使用幂等性
开启enable.idempotence默认为true,false关闭。
(1)kafka事务原理
注:开启事务必须开启幂等性。
(2)kafka事务的5个API
- //1初始化事务
- void initTransactions();
- //2开启事务
- void beginTransaction() throws ProducerFencedException;
- //3在事务内提交已经消费的偏移量
- void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) throws ProducerFencedException;
- //4提交事务
- void commitTransaction() throws ProducerFencedException;
- //5放弃事务
- void abortTransaction() throws ProducerFencedException;
(3)单个producer,使用事务保证消息的仅一次发送
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class Transaction {
- public static void main(String[] args) throws InterruptedException {
- //1.创建kafka生产者的配置对象
- Properties properties = new Properties();
- //2.给kafka配置对象添加配置信息
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
- //key,value序列化
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
-
- //设置事务id(必须),事务id可以任意起名
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionis-0");
-
- //3.创建kafka生产者对象
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
- //初始化事务
- kafkaProducer.initTransactions();
- //开启事务
- kafkaProducer.beginTransaction();
- try {
- //4.调用send方法,发送信息
- for (int i = 0;i < 5;i++) {
- //发送消息
- kafkaProducer.send(new ProducerRecord<>("first","tom" + i));
- }
- int i = 1/0;
- //提交事务
- kafkaProducer.commitTransaction();
- } catch (Exception e) {
- //中止事务
- kafkaProducer.abortTransaction();
- } finally {
- //5.关闭环境
- kafkaProducer.close();
- }
- }
- }
单分区内数据是有序的;
多发区内数据是无序的;
(1)kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
(2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
1)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
2)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
如果开启了幂等性且缓存的请求个数小于5个。会在服务端重新排序。
本文为学习笔记!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。