赞
踩
目录
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator;main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker
重要参数:
列表:
相关依赖:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> </dependencies>
参数配置:
//kafka配置文件 Properties properties = new Properties(); //添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); //key/value的序列化类型 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //提高集群的吞吐量 //批次大小,默认是16k properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); // linger.ms:等待时间,默认 0,修改为1 properties.put(ProducerConfig.LINGER_MS_CONFIG,10); //compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //RecordAccumulator:缓冲区大小,默认 32M:buffer.memory(注意是以字节为单位) properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //设置ack的类型 properties.put(ProducerConfig.RETRIES_CONFIG,3); //重试次数 properties.put(ProducerConfig.ACKS_CONFIG,"all");//数字也可
创建生产者:
//创建生产者对象 Producer<String, String> firstProducer = new KafkaProducer<String, String>(properties);
异步发送
send方法
参数(new ProducerRecord
)
for (int i = 0; i < 50; i++) { firstProducer.send(new ProducerRecord<String, String>("first","why"+i)); }
带回调函数的异步发送
send方法:
参数:(new ProducerRecord
,new Callback(){}
)
重写onCompletion
方法,Exception
是异常信息,RecordMetadata
是主题的元数据信息
//发送信息 for (int i = 0; i < 500; i++) { firstProducer.send(new ProducerRecord<String, String>("first", "why" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); } else { System.out.println("partition:"+recordMetadata.partition()+"-----"+recordMetadata.topic()); } } }); Thread.sleep(2); }
同步发送
只需在异步发送的基础上,再调用一下 get()方法即可
//发送信息 for (int i = 0; i < 5; i++) { firstProducer.send(new ProducerRecord<String, String>("first","why"+i)).get(); }
分区
分区的好处
分区策略
1.指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0
//Integer partition,直接赋值即可 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null."); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException( String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); if (partition != null && partition < 0) throw new IllegalArgumentException( String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; this.headers = new RecordHeaders(headers); }
2.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
public ProducerRecord(String topic, K key, V value) { this(topic, null, null, key, value, null); }
3.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
/** * Create a record with no key * * @param topic The topic this record should be sent to * @param value The record contents */ public ProducerRecord(String topic, V value) { this(topic, null, null, null, value, null); }
具体如何进行分区:
/** * Compute the partition for the given record. * * @param topic The topic name * @param numPartitions The number of partitions of the given {@code topic} * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) { if (keyBytes == null) { //没有指定key return stickyPartitionCache.partition(topic, cluster); //使用粘性分区器 } // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }
自定义分区器
步骤:
/** * @author why * 自定义的分区器 */ public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int partition = 0; String msg = value.toString(); if (msg.contains("why")) { partition = 1; } //设置该条数据应处的分区 return partition; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
在配置文件中配置即可:
//设置自定义的分区器 (使用全类名) properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.why.kafka.tools.MyPartitioner");
不同ack策略的可靠性分析
replica.lag.time.max.ms
参数设定,默认30s。例如2超时,(leader:0, isr:0,1);这样就不用等长期联系不上或者已经故障的节点总结:在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
数据传递语义
At Least Once可以保证数据不丢失,但是不能保证数据不重复
At Most Once可以保证数据不重复,但是不能保证数据不丢失
Exactly Once:可以保证数据不重复也不丢失,通过幂等性和事务来实现(Kafka 0.11版本以后的新特性)
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复
重复数据的判断标准:具有
<PID, Partition, SeqNumber>
相同主键的消息提交时,Broker只会持久化一条
其中PID是Kafka每次重启都会分配一个新的;
Partition 表示分区号;
Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复
如何开启幂等性:
开启参数 enable.idempotence
默认为 true(开启),false 关闭
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); //从以下源码中可以看到ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG代表的就是enable.idempotence /** <code>enable.idempotence</code> */ public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
生产者事务
开启事务,必须开启幂等性
//设置事务id(必须) properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); //创建生产者对象 Producer<String, String> firstProducer = new KafkaProducer<String, String>(properties); //初始化事务 firstProducer.initTransactions(); //开启事务 firstProducer.beginTransaction(); for (int i = 0; i < 5; i++) { firstProducer.send(new ProducerRecord<String, String>("first", "why" + i)); } //提交事务 firstProducer.commitTransaction(); //关闭资源 firstProducer.close();
单分区内,有序
多分区,分区与分区间无序
单分区有序的条件:
max.in.flight.requests.per.connection=1
max.in.flight.requests.per.connection
需要设置为1max.in.flight.requests.per.connection
需要设置小于等于5原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。