赞
踩
注意:
Kafka中提供了内置的性能测试工具
生产者:测试生产每秒传输的数据量(多少条数据、多少M的数据)
5000000 records sent, 11825.446943 records/sec (11.28 MB/sec), 2757.61 ms avg latency
消费者:测试消费每条拉取的数据量
对比生产者和消费者:消费者的速度更快
public class KafkaProducerTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "node1.itcast.cn:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); // 3. 发送1-100的消息到指定的topic中 for(int i = 0; i < 100; ++i) { // 构建一条消息,直接new ProducerRecord ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + ""); Future<RecordMetadata> future = kafkaProducer.send(producerRecord); // 调用Future的get方法等待响应 future.get(); System.out.println("第" + i + "条消息写入成功!"); } // 4.关闭生产者 kafkaProducer.close(); } }
/** * 消费者程序 * * 1.创建Kafka消费者配置 * Properties props = new Properties(); * props.setProperty("bootstrap.servers", "node1.itcast.cn:9092"); * props.setProperty("group.id", "test"); * props.setProperty("enable.auto.commit", "true"); * props.setProperty("auto.commit.interval.ms", "1000"); * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * * 2.创建Kafka消费者 * 3.订阅要消费的主题 * 4.使用一个while循环,不断从Kafka的topic中拉取消息 * 5.将将记录(record)的offset、key、value都打印出来 */ public class KafkaConsumerTest { public static void main(String[] args) { // 1.创建Kafka消费者配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1.itcast.cn:9092"); // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据 // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的 props.setProperty("group.id", "test"); // 自动提交offset props.setProperty("enable.auto.commit", "true"); // 自动提交offset的时间间隔 props.setProperty("auto.commit.interval.ms", "1000"); // 拉取的key、value数据的 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2.创建Kafka消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); // 3. 订阅要消费的主题 // 指定消费者从哪个topic中拉取数据 kafkaConsumer.subscribe(Arrays.asList("test")); // 4.使用一个while循环,不断从Kafka的topic中拉取消息 while(true) { // Kafka的消费者一次拉取一批的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5)); // 5.将将记录(record)的offset、key、value都打印出来 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { // 主题 String topic = consumerRecord.topic(); // offset:这条消息处于Kafka分区中的哪个位置 long offset = consumerRecord.offset(); // key\value String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value); } } } }
// 二、使用异步回调的方式发送消息 ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + ""); kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { // 1. 判断发送消息是否成功 if(exception == null) { // 发送成功 // 主题 String topic = metadata.topic(); // 分区id int partition = metadata.partition(); // 偏移量 long offset = metadata.offset(); System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset); } else { // 发送出现错误 System.out.println("生产消息出现异常!"); // 打印异常消息 System.out.println(exception.getMessage()); // 打印调用栈 System.out.println(exception.getStackTrace()); } } });
生产者消息重复问题
在Kafka中可以开启幂等性
开启事务的条件
生产者
// 开启事务必须要配置事务的ID
props.put("transactional.id", "dwd_user");
消费者
// 配置事务的隔离级别
props.put("isolation.level","read_committed");
// 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
props.setProperty("enable.auto.commit", "false");
生产者
如果使用了事务,不要使用异步发送
public class TransactionProgram { public static void main(String[] args) { // 1. 调用之前实现的方法,创建消费者、生产者对象 KafkaConsumer<String, String> consumer = createConsumer(); KafkaProducer<String, String> producer = createProducer(); // 2. 生产者调用initTransactions初始化事务 producer.initTransactions(); // 3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic while(true) { try { // (1) 生产者开启事务 producer.beginTransaction(); // 这个Map保存了topic对应的partition的偏移量 Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(); // 从topic中拉取一批的数据 // (2) 消费者拉取消息 ConsumerRecords<String, String> concumserRecordArray = consumer.poll(Duration.ofSeconds(5)); // (3) 遍历拉取到的消息,并进行预处理 for (ConsumerRecord<String, String> cr : concumserRecordArray) { // 将1转换为男,0转换为女 String msg = cr.value(); String[] fieldArray = msg.split(","); // 将消息的偏移量保存 // 消费的是ods_user中的数据 String topic = cr.topic(); int partition = cr.partition(); long offset = cr.offset(); int i = 1 / 0; // offset + 1:offset是当前消费的记录(消息)对应在partition中的offset,而我们希望下一次能继续从下一个消息消息 // 必须要+1,从能消费下一条消息 offsetMap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1)); // 将字段进行替换 if(fieldArray != null && fieldArray.length > 2) { String sexField = fieldArray[1]; if(sexField.equals("1")) { fieldArray[1] = "男"; } else if(sexField.equals("0")){ fieldArray[1] = "女"; } } // 重新拼接字段 msg = fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2]; // (4) 生产消息到dwd_user topic中 ProducerRecord<String, String> dwdMsg = new ProducerRecord<>("dwd_user", msg); // 发送消息 Future<RecordMetadata> future = producer.send(dwdMsg); try { future.get(); } catch (Exception e) { e.printStackTrace(); producer.abortTransaction(); } // new Callback() // { // @Override // public void onCompletion(RecordMetadata metadata, Exception exception) { // // 生产消息没有问题 // if(exception == null) { // System.out.println("发送成功:" + dwdMsg); // } // else { // System.out.println("生产消息失败:"); // System.out.println(exception.getMessage()); // System.out.println(exception.getStackTrace()); // } // } // }); } producer.sendOffsetsToTransaction(offsetMap, "ods_user"); // (6) 提交事务 producer.commitTransaction(); }catch (Exception e) { e.printStackTrace(); // (7) 捕获异常,如果出现异常,则取消事务 producer.abortTransaction(); } } } // 一、创建一个消费者来消费ods_user中的数据 private static KafkaConsumer<String, String> createConsumer() { // 1. 配置消费者的属性(添加对事务的支持) Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1.itcast.cn:9092"); props.setProperty("group.id", "ods_user"); // 配置事务的隔离级别 props.put("isolation.level","read_committed"); // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset props.setProperty("enable.auto.commit", "false"); // 反序列化器 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 2. 构建消费者对象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); // 3. 订阅一个topic kafkaConsumer.subscribe(Arrays.asList("ods_user")); return kafkaConsumer; } // 二、编写createProducer方法,用来创建一个带有事务配置的生产者 private static KafkaProducer<String, String> createProducer() { // 1. 配置生产者带有事务配置的属性 Properties props = new Properties(); props.put("bootstrap.servers", "node1.itcast.cn:9092"); // 开启事务必须要配置事务的ID props.put("transactional.id", "dwd_user"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 构建生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); return kafkaProducer; } }
// 开启事务必须要配置事务的ID
props.put("transactional.id", "dwd_user");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 构建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
return kafkaProducer;
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。