赞
踩
指定了producer端用于缓冲消息的缓冲区大小,单位是字节。
producer是否压缩消息。
发送给Kafka Broker的key/value 值对,producer将待发送的消息封装进ProducerRecord实例类。
当发送时指定了partition就使用该partition。即kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)指定了发送到哪个具体的分区。
如果kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)没有指定发送到哪个具体的分区,即partition=null(并且key也为空时,如果此时key不为空的话就会采用另一种分区策略key哈希分区策略),并且使用了默认的分区器,那么消息将被随机的发送到主题的各个可用分区上,分区器使用轮询的算法将消息均衡的分布到各个分区。
根据消息的key进行哈希计算,并将消息发送到对应的分区。保证相同key的消息始终被发送到同一个分区,确保消息的顺序性。
用户可以根据自己的需求实现自定义的分区策略,通过实现org.apache.kafka.clients.producer.Partitioner接口来自定义分区选择逻辑。
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.ExecutionException; public class CustomProducerSync { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value序列化(必须): properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建kafka生产者对象 KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 10; i++) { // 默认为异步发送 kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)); // 末尾加get为同步发送 kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get(); } // 5. 关闭资源 kafkaProducer.close(); } }
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class CustomProducer { public static void main(String[] args) { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value序列化(必须): properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建kafka生产者对象 KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 10; i++) { kafkaProducer.send(new ProducerRecord<>("first", "wtyy")); } // 5. 关闭资源 kafkaProducer.close(); } }
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CustomProducerCallBack { public static void main(String[] args) { // 1. 创建kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value序列化(必须): // 序列化器的serialization是一个接口,找到他的实现类 // 我们一般都是使用String properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建kafka生产者对象 KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties); // 4. 调用send方法,发送消息 for (int i = 0; i < 10; i++) { kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法 //(2)消息发送失败 exception != null 也会调用该方法 if (exception == null) { System.out.println(metadata);//使用打印演示 }else{ exception.printStackTrace();//打印异常信息 } } }); } // 5. 关闭资源 kafkaProducer.close(); } }
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumer { public static void main(String[] args) { // 1. 创建消费者配置对象 Properties properties = new Properties(); // 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数) // broker的ip地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // 配置 反序列化
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。