赞
踩
- KafkaProducer<String, String> producer= new KafkaProducer<>(props)
- KafkaProducer<String, String> producer= new KafkaProducer<>(props, new StringSerializer() , new StringSerializer())
- public Future<RecordMetadata> send(ProducerRecord<K, V> record)
- public Future<RecordMetadata> send(ProducerRecord<K , V> record , Callback callback)
- Future<RecordMetadata> future = producer.send(record);
- RecordMetadata metadata = future.get();
- public ProducerRecord(String topic, Integer partition, Long timestamp,K key, V value, Iterable<Header> headers)
-
- public ProducerRecord(String topic, Integer partition, Long timestamp,K key , V value)
-
- public ProducerRecord(String topic , Integer partition, k key, V value,Iterable<Header> headers)
-
- public ProducerRecord(String topic, Integer partition, K key, V value)public ProducerRecord(String topic, K key, V value)
-
- public ProducerRecord(String topic, V value)
- public void configure (Map<String , ?> configs , boolean isKey) //配置当前类
- public byte[] serialize(String topic , T data) //执行序列化操作
- public void close() //关闭当前的序列化器,
- //计算分区号,返回值为int类型
- public int partition(String topic , Object key, byte[] keyBytes , Object value , byte[] valueBytes , Cluster cluster);
- //关闭分区器时回收资源的方法
- public void close();
- //KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作
- public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record);
-
- //在消息被应答之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,该方法会优先于用户设定的Callback之前执行
- public void onAcknowledgement(RecordMetadata metadata, Exception exception );
-
- //主要用于在关闭拦截器时执行一些资源的清理工作
- public void close() ;
- //生产者的interceptor.classes配置
- properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerinterceptorV1.class.getName() + ”,” + ProducerinterceptorV2.class.getName());
线程:整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)
消息累加器(RecordAccumulator)
Sender线程
本小节主要讲生产者客户端的参数
生产者客户端中一个非常重要的参数,可靠性和吞吐量之前的权衡策略,默认值acks = 1
分析
生产者客户端能发送的消息的最大值,默认1MB
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。