赞
踩
一个正常的生产逻辑需要具备以下几个步骤:
(1)配置生产者客户端参数,创建生产者实例
(2)构建待发送的消息
(3)发送消息
(4)关闭生产者实例
- public static void main(String[] args) {
- //配置信息
- Properties props = new Properties();
- //kafka服务器地址
- props.put("bootstrap.servers", "10.65.5.76:19092,10.65.5.77:19092,10.65.5.78:19092");
- //设置数据key和value的序列化处理类
- props.put("key.serializer", StringSerializer.class);
- props.put("value.serializer", StringSerializer.class);
- props.put("acks","1");
- props.put("retries", 1);
- props.put("linger.ms", 100);
- props.put("buffer.memory", 33554432);
-
- KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
- //发送消息
- for (int i = 0; i < 100; i++) {
- JSONObject json = new JSONObject();
- json.put("key1","韩联社刚刚消息中国外交部9日就韩联社提出的有关进口车用尿素问题以书面形式回复称中方重视韩方需求正积极与韩方协商解决");
- json.put("key2","韩联社刚刚消息中国外交部9日就韩联社提出的有关进口车用尿素问题以书面形式回复称中方重视韩方需求正积极与韩方协商解决");
- json.put("key3","韩联社刚刚消息中国外交部9日就韩联社提出的有关进口车用尿素问题以书面形式回复称中方重视韩方需求正积极与韩方协商解决");
- // 每条数据都要封装成一个 ProducerRecord 对象,定义主题
- producer.send(new ProducerRecord<>("test_tini",Thread.currentThread().getName()+"-"+Integer.toString(i),json.toJSONString()));
- System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<:"+i);
- }
- // 关闭资源
- producer.close();
- }
这里有必要说明的是消息对象ProducerRecord,他并不是单纯意义上的消息,它包含多个属性,原本要发送的消息体只是其中的一个value属性。ProducerRecord属性定义如下:
- //主题
- private final String topic;
- //分区号
- private final Integer partition;
- //消息头部
- private final Headers headers;
- //键
- private final K key;
- //值
- private final V value;
- //时间戳
- private final Long timestamp;
在kafka生产者 客户端KafkaProducer中有3个参数是必填的。
该参数用来指定生产者客户端连接kafka集群所需的broker地址。格式为host1:port1,host2:port2。此处建议要设置两个以上broker地址,避免其中一个宕机,导致服务无法使用
broker端接收的消息必须以字节数组的形式存在。
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
以上代码中的泛型对应消息中的key和value的类型,生产者客户端必须使用这种方式,代码才具有良好的可读性,不过在发送王broker之前,需要将消息中对应的key和value做相应的序列化操作。key.serializer和value.serializer这两个参数分别用来指定key和value的序列化操作的序列化器,这两个参数没有默认值。
创建生产者实例和构建消息后,就可以发送消息了。发送消息主要有三种模式:发送即忘(fire-adn-forget),同步(sync)及异步(async)。
KafkaProducer的send()方法并非是void类型的,而是Future<RecordMetadata>类型send()方法有两个重载:
- public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- return this.send(record, (Callback)null);
- }
-
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
- return this.doSend(interceptedRecord, callback);
- }
要实现同步的发送方式,可以利用返回的Future对象实现,如下
- try {
- producer.send(new ProducerRecord<>("biubiubiu",Thread.currentThread().getName()+"-"+Integer.toString(i),json.toJSONString())).get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
实际上send()方法本身就是异步的,Future对象可以使调用方稍后获得发送的结果。示例中在执行send()方法之后链式调用get()方法来阻塞等待kafka的响应,直到消息发送成功,或者发生异常。
Future表示一个任务的生命周期,并提供相应的方法来判断任务是否已经完成或取消。
Future的get()方法可以实现可超时的阻塞: get(long timeout, TimeUnit unit)
KafkaProducer中一般会发生两种类型的异常:可重试异常和不可重试异常。常见的可重试异常有:NetworkException,LeaderNotAvailableException,UnknownLeaderEpochException等。比如NetworkException表示网络异常,通常是由于瞬时网络故障导致的,可以通过重试解决;又比如LeaderNotAvailableException表示分区的的leader副本不可用,这个异常通常发生在leader下线而新的leader选举完成之前,重试之后可以重新恢复。不可重试的异常有RecordTooLargeException异常,表示发送消息太大,不会进行重试,直接抛出异常。对于可重试的异常,如果配置了retries参数,那么只要在规定的重试次数以内自行恢复了,就不会抛出异常。retries默认为0.
异步发送的方式,一般是在send()方法里指定Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
- producer.send(new ProducerRecord<>("biubiubiu",Thread.currentThread().getName()+"-"+Integer.toString(i),json.toJSONString()),
- new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e!=null){
- e.printStackTrace();
- }else {
- System.out.println("topic:"+recordMetadata.topic()+"patition:"+recordMetadata.partition()+"offset:"+recordMetadata.offset());
- }
- }
- });
onCompletion()方法的两个参数是互斥的,消息成功时metadata不为null而exception为null;异常时metadata为null而exception不为null。
消息在发送到broker的过程中,需要确定它发往的分区,如果消息ProducerRecord指定了partition字段,就不需要分区器的介入;如果消息ProducerRecord没有指定了partition字段,那么就需要依赖分区器根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器就是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,这个接口定义了3个方法:
- int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
-
- void close();
-
- default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
- }
partition()方法用来计算分区号,方法中的参数分别代表主题,键,序列化后的键,值,序列化后的值,以及集群的元数据。
close()方法是在关闭分区器时,回收一些资源。
onNewBatch()方法是一种粘性分区策略。粘性分区器通过选择单个分区来发送所有非键记录,解决了将没有键的记录分散成较小批次的问题。一旦该分区的批次被填满或以其他方式完成,粘性分区程序会随机选择并“粘”到一个新分区。这样,在更长的时间内,记录大致均匀地分布在所有分区中,同时获得更大批量的额外好处。详情请跳转:Apache Kafka Producer Improvements: Sticky Partitioner
在默认分区器DefaultPartitioner中,close()是空方法,而在partition()方法中定义了主要的分区分配逻辑。如果key不为null,那么默认的分区器会对key进行哈希,最终根据得到的哈希值来计算分区,拥有相同key的消息会被分配到同一个分区中。如果key为null,在2.4以前的版本中,是以轮询的方式发送到可用分区中;2.4以后的版本中,使用粘性分区策略。
生产者拦截器既可以用来在消息发送之前做一些准备工作,也可以用来在发送回调逻辑前做一些订制化需求。
生产者拦截器的实现主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。接口中包含三个方法:
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
-
- void onAcknowledgement(RecordMetadata var1, Exception var2);
-
- void close();
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息做出定制化操作。一般最好不要修改的ProducerRecord的topic,key和partition等消息,如果要修改,则需要确保对其有精准的判断,否则会与预想的效果出现偏差。比如:修改key不仅会影响分区的计算,还会影响broker端日志压缩的功能。
KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前执行。这个方法运行在Producer的I/O线程中,所以这个方法实现逻辑越简单越好,否则会影响消息的发送速度。
close()方法是在关闭拦截器时执行一些资源回收的清理工作。
示例:
- public class ProducerInterceptorPrefix implements ProducerInterceptor {
- private volatile long sendSuccess = 0;
- private volatile long sendFailure = 0;
- @Override
- public ProducerRecord onSend(ProducerRecord producerRecord) {
- String modifyValue = "prefix-" + producerRecord.value();
- return new ProducerRecord(producerRecord.topic(),producerRecord.partition(),producerRecord.timestamp(),producerRecord.key(),modifyValue,producerRecord.headers());
- }
- @Override
- public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
- if (e==null){
- sendSuccess++;
- }else {
- sendFailure++;
- }
- }
- @Override
- public void close() {
- double successRatio = (double) sendSuccess / (sendSuccess+sendFailure);
- System.out.println("[info]发送成功率="+String.format("%f",successRatio*100+"%"));
- }
-
- @Override
- public void configure(Map<String, ?> map) {
-
- }
- }
实现自定义的ProducerInterceptorPrefix后,需要在KafkaProducer的配置参数interceptor.classes中指定这个拦截器,此参数默认值为" "。
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器来形成链式拦截,拦截链会按照interceptor.classes的参数顺序依次执行,配置的时候各拦截器用逗号隔开。
整个生产者客户端由两个线程协调运行,分别为主线程和sender线程。在主线程中由kafkaProducer创建消息,然后通过拦截器,序列化器,分区器的作用之后缓存到消息累加器(RecordAccumulator,也成为消息收集器)中。sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。
RecordAccumulator主要用来缓存消息以便于sender线程可批量发送,进而减少网络传输开销提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置。如果生产者发送消息的速度超过了发送到服务器的速度,则会导致生产者空间不足,这个时候kafkaProducer的send()方法要么阻塞,要么抛出异常,这个取决于max.block.ms的配置。
主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护着一个双端队列,队列的内容就是ProducerBatch,即Deque(ProducerBatch)。消息写入缓存时,追加到队列的尾部;读取消息 时从队列的头部读取。
参数 | 默认值 | 可配置值 | 解释 |
acks | "1" | "1","0","-1"或者“all” | 指定分区必须有多少个副本收到这条消息才会认为消息成功写入。“1”只需要分区leader副本成功即认为写入成功;“0”不需要等待服务端响应;“-1”需要等待ISR中所有的副本都成功 |
max.request.size | 1048576B,即1M。 | 这个参数的用来限制生产者客户端能够发送的消息最大值 | |
retires和retry.backoff.ms | 0 | retires参数是用来配置生产者重试的次数,默认值为0。即不进行重试,即在发送消息的时候不进行任何的重试动作。消息从生产者写入到broker的时候可能会发生一些临时性的异常,比如网络抖动,leader副本选举等,这种异常往往是可以自行恢复的。 | |
compression.type | NONE | 用来指定消息压缩的类型,默认值为:NONE。该参数还可以配置为gzip、snappy、lz4、zstd. | |
connections.max.idle.ms | 540000ms,即9分钟。 | 用来配置多长时间后关闭空闲连接。 | |
linger.ms | 0 | 这个参数用来指定生产者发送ProducerBatch之前等待更多的消息加入的时间,默认值为0。增大这个参数的值,能提升一定的吞吐量,但会增加消息的延迟 | |
receive.buffer.bytes | 默认值为32768,即32KB | 这个参数用来设定Socket接收消息缓冲区的大小,默认值为32768,即32KB。如果设置为-1,则使用操作系统的默认值。如果producer与kafka处于不同的机房,则可以适当的增大这个参数的值。 | |
send.buffer.bytes | 默认值为131072即128K | 这个参数用来设置Socket发送消息的缓冲区大小,默认值为131072即128K。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。 | |
request.timeout.ms | 30000ms | 这个参数用来配置Producer等待响应的最长时间。默认值为30000ms。请求超时之后可以选择进行重试,注意这个参数需要比broker端的参数:replica.lag.time.max.ms的值要大,这个可以减少因客户端重试而引起的消息重复的概率。 | |
bootstrap.servers | 默认值为" " | 默认值为" ",值kafka集群中的broker地址集合。 | |
key.serializer | 默认值 “ ” | 默认值 “ ”,消息中key的序列化方式。需要实现Serializer接口。 | |
value.serializer | 默认值 “ ” | 默认值 “ ”,消息中value的序列化方式。需要实现Serializer接口。 | |
buffer.memory | 默认值:33554432(32M) | 默认值:33554432(32M),生产者客户端用于缓存消息的缓存区的大小。 | |
batch.size | 默认值,16384(16KB) | 默认值,16384(16KB),用于指定ProducerBatch可以复用内存区域大小。 | |
max.in.flight.requests.per.connnection | 默认值 5 | 默认值 5,限制每个连接(也就是客户端与Node节点之间的连接)最多缓存的请求数。 |
《深入理解Kafka核心设计与实践理论》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。