Create a student topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- everything else at the defaults

Consumer settings:
- consumer group id: test
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer
- everything else at the defaults

Task: write a producer that sends the values 0-99 to the student topic, and a consumer that reads the values 0-99 from the student topic and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_01 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // No key and no explicit partition, so the default partitioner spreads records across partitions
            ProducerRecord<String, String> record = new ProducerRecord<>("student", i + "");
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_01 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("student"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
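Note on topic creation: the exercises assume each topic already exists on the cluster. As a sketch (not part of the original answer), the topic could also be created programmatically with the AdminClient API that ships with kafka-clients 0.11 and later; the node01-node03 broker addresses are the same ones the answers assume.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateStudentTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // student topic: 3 partitions, replication factor 2, as the exercise requires
            NewTopic topic = new NewTopic("student", 3, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}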
Create a teacher topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks (message acknowledgement): all
- retries: 2
- batch.size (bytes per batch): 16384
- buffer.memory (buffer size): 33554432
- linger.ms: 1 (hold each record up to 1 ms for batching)
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: the default round-robin distribution

Consumer settings:
- consumer group id: test
- automatic offset commit enabled, with a commit interval
- auto.offset.reset: earliest (if a partition has a committed offset, consume from it; otherwise consume from the beginning). For reference, the other accepted values:
  - latest: if a partition has a committed offset, consume from it; otherwise consume only data produced after the consumer starts
  - none: if every partition of the topic has a committed offset, consume from there; if any partition lacks a committed offset, throw an exception
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer

Task: write a producer that sends bigdata0-bigdata99 to the teacher topic, and a consumer that reads bigdata0-bigdata99 back and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_02 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("teacher", "bigdata" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_02 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("teacher"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
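All of these answers pass configuration keys and values as raw strings, which is easy to mistype. As a sketch of a less error-prone alternative, kafka-clients ships ConsumerConfig (and ProducerConfig) constants for every key used above; the setup below builds the same Properties as Consumer_02:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

public class ConsumerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(props); // identical contents to Consumer_02's Properties
    }
}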
Create a title topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 1
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: give every record the fixed key title, so all records are routed to the same partition

Consumer settings:
- consumer group id: test
- automatic offset commit enabled, with a commit interval
- auto.offset.reset: latest (if a partition has a committed offset, consume from it; otherwise consume only newly produced data)
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer

Task: write a producer that sends kafka0-kafka99 to the title topic, and a consumer that reads kafka0-kafka99 back and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_03 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // Fixed key "title": the default partitioner hashes the key, so every record lands in one partition
            ProducerRecord<String, String> record = new ProducerRecord<>("title", "title", "kafka" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_03 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("title"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
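Why does a fixed key pin every record to one partition? For keyed records the classic default partitioner hashes the serialized key with murmur2 and takes the result modulo the partition count, so equal keys always map to the same partition. A sketch of that computation using the client library's Utils helpers (public, but an internal utility class; 3 partitions assumed as above):

import org.apache.kafka.common.utils.Utils;

public class KeyToPartition {
    public static void main(String[] args) {
        byte[] keyBytes = "title".getBytes();
        int numPartitions = 3;
        // Same formula the default partitioner applies to records that carry a key
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key \"title\" -> partition " + partition);
    }
}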
Create a title topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 2
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: send every record explicitly to partition 2

Consumer settings:
- consumer group id: test
- automatic offset commit enabled, with a commit interval
- auto.offset.reset: none (if every partition of the topic has a committed offset, consume from there; if any partition lacks a committed offset, throw an exception)
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer

Task: write a producer that sends test0-test99 to the title topic, and a consumer that reads test0-test99 back and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_04 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // (topic, partition, key, value) constructor: every record goes to partition 2; the key is left null
            ProducerRecord<String, String> record = new ProducerRecord<>("title", 2, null, "test" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_04 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // "none" throws NoOffsetForPartitionException when a partition has no committed offset
        props.put("auto.offset.reset", "none");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("title"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
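To verify where each record actually landed, send() also accepts a Callback that receives the RecordMetadata of the acknowledged record. A minimal sketch (the class name and the single test record are illustrative, not part of the original answer):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class ProducerWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("title", 2, null, "test0");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace(); // e.g. the requested partition does not exist
                } else {
                    System.out.println("partition=" + metadata.partition() + " offset=" + metadata.offset());
                }
            }
        });
        producer.close();
    }
}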
Create an order topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 1
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: custom; values below 100 go to partition 0, values from 100 to 199 go to partition 1, and values from 200 to 299 go to partition 2

Consumer settings:
- consumer group id: test
- automatic offset commit enabled, with a commit interval
- auto.offset.reset: earliest (if a partition has a committed offset, consume from it; otherwise consume from the beginning)
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer

Task: write a producer that sends 0-299 to the order topic, and a consumer that reads 0-299 back and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_05 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Register the custom partitioner below by its fully qualified class name
        props.put("partitioner.class", "HomeWork.ProducerPartition");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 300; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("order", i + "");
            producer.send(record);
        }
        producer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_05 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("order"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("value: " + consumerRecord.value() + "  partition: " + consumerRecord.partition());
            }
        }
    }
}

Custom partitioner code:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

// Must live in the HomeWork package so the fully qualified name matches
// the "partitioner.class" value registered in Producer_05.
public class ProducerPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int n = Integer.parseInt((String) value);
        if (n < 100) {      // 0-99    -> partition 0
            return 0;
        }
        if (n < 200) {      // 100-199 -> partition 1
            return 1;
        }
        return 2;           // 200-299 -> partition 2
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
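A hypothetical quick check of the boundary logic (not part of the original answer): the partitioner only inspects the value argument, so the key, the byte arrays, and the Cluster can all be null here.

public class PartitionBoundaryCheck {
    public static void main(String[] args) {
        ProducerPartition p = new ProducerPartition();
        System.out.println(p.partition("order", null, null, "99", null, null));  // expected: 0
        System.out.println(p.partition("order", null, null, "100", null, null)); // expected: 1
        System.out.println(p.partition("order", null, null, "200", null, null)); // expected: 2
    }
}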
Create an 18BD-10 topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 2
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: send every record explicitly to partition 2

Consumer settings:
- consumer group id: test
- automatic offset commit enabled, with a commit interval
- auto.offset.reset: none (if every partition of the topic has a committed offset, consume from there; if any partition lacks a committed offset, throw an exception)
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer
- consume only partition 2

Task: write a producer that sends test0-test99 to the 18BD-10 topic, and a consumer that reads partition 2 of 18BD-10 and prints it to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_06 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // (topic, partition, key, value): every record goes to partition 2
            ProducerRecord<String, String> record = new ProducerRecord<>("18BD-10", 2, "test", "test" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_06 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // "none" throws NoOffsetForPartitionException if the assigned partition has no committed offset
        props.put("auto.offset.reset", "none");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // assign() pins the consumer to partition 2 instead of joining a group assignment
        TopicPartition topicPartition = new TopicPartition("18BD-10", 2);
        consumer.assign(Arrays.asList(topicPartition));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("partition: " + consumerRecord.partition() + "  " + consumerRecord.value());
            }
        }
    }
}
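Unlike subscribe(), assign() bypasses the consumer group's partition assignment, so the program has to know the partition numbers up front. When they are not known, they can be listed at runtime with partitionsFor(); a sketch (class name illustrative):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import java.util.Properties;

public class ListPartitions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            for (PartitionInfo info : consumer.partitionsFor("18BD-10")) {
                System.out.println("partition " + info.partition() + ", leader " + info.leader());
            }
        }
    }
}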
Create an 18BD-20 topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 1
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: default round-robin across the partitions

Consumer settings:
- consumer group id: test
- manual offset commit, one commit for every consumed record
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer

Task: write a producer that sends test0-test99 to the 18BD-20 topic, and a consumer that reads partition 2 of 18BD-20 and prints it to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_07 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("18BD-20", "test" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class Consumer_07 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition = new TopicPartition("18BD-20", 2);
        consumer.assign(Arrays.asList(topicPartition));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("partition: " + consumerRecord.partition() + "  " + consumerRecord.value());
                // Commit this record's offset immediately; +1 marks the next record to be read
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                        new OffsetAndMetadata(consumerRecord.offset() + 1)));
            }
        }
    }
}
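commitSync() blocks until the broker acknowledges each commit, which is the safest reading of "commit every record manually" but also the slowest. Where throughput matters more, a common alternative is commitAsync() with a callback that only logs failures; a sketch under that assumption (the helper class and method names are illustrative):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public class AsyncCommit {
    // Call after processing a poll() batch; does not block the poll loop.
    static void commitBatchAsync(KafkaConsumer<String, String> consumer) {
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    System.err.println("offset commit failed for " + offsets + ": " + exception);
                }
            }
        });
    }
}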
Create an 18BD-30 topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 1
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: default round-robin across the partitions

Consumer settings:
- consumer group id: test
- manual offset commit: finish consuming each partition in turn, then commit that partition's offset
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer

Task: write a producer that sends test0-test99 to the 18BD-30 topic, and a consumer that reads partition 2 of 18BD-30 and prints it to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_08 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("18BD-30", "test" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class Consumer_08 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // offsets are committed manually, partition by partition
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition = new TopicPartition("18BD-30", 2);
        consumer.assign(Arrays.asList(topicPartition));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            // Drain the batch one partition at a time, committing after each partition is finished
            for (TopicPartition partition : consumerRecords.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                for (ConsumerRecord<String, String> consumerRecord : partitionRecords) {
                    System.out.println("partition: " + consumerRecord.partition() + "  " + consumerRecord.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
Create an 18BD-40 topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 1
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: default round-robin across the partitions

Consumer settings:
- consumer group id: test
- automatic offset commit enabled
- auto.offset.reset: earliest (if a partition has a committed offset, consume from it; otherwise consume from the beginning)
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer
- consume only partitions 0 and 2

Task: write a producer that sends test0-test99 to the 18BD-40 topic, and a consumer that reads partitions 0 and 2 of 18BD-40 and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_09 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("18BD-40", "test" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_09 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true"); // offsets are committed automatically; no manual commit is needed
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition0 = new TopicPartition("18BD-40", 0);
        TopicPartition topicPartition2 = new TopicPartition("18BD-40", 2);
        consumer.assign(Arrays.asList(topicPartition0, topicPartition2));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("partition: " + consumerRecord.partition() + "  " + consumerRecord.value());
            }
        }
    }
}
Create an 18BD-50 topic in the Kafka cluster with 2 replicas and 3 partitions.

Producer settings:
- acks: all
- retries: 1
- batch.size: 16384
- buffer.memory: 33554432
- linger.ms: 1
- key serializer: org.apache.kafka.common.serialization.StringSerializer
- value serializer: org.apache.kafka.common.serialization.StringSerializer
- partitioning: default round-robin across the partitions

Consumer settings:
- consumer group id: test
- automatic offset commit enabled
- auto.offset.reset: earliest (if a partition has a committed offset, consume from it; otherwise consume from the beginning)
- key deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value deserializer: org.apache.kafka.common.serialization.StringDeserializer
- consume only partitions 0 and 2, starting partition 0 at offset 0 and partition 2 at offset 10

Task: write a producer that sends test0-test99 to the 18BD-50 topic, and a consumer that reads partitions 0 and 2 of 18BD-50 and prints them to the console.

Producer answer code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Producer_10 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("18BD-50", "test" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

Consumer answer code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class Consumer_10 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition0 = new TopicPartition("18BD-50", 0);
        TopicPartition topicPartition2 = new TopicPartition("18BD-50", 2);
        consumer.assign(Arrays.asList(topicPartition0, topicPartition2));
        // seek() may be called right after assign(); it overrides any committed offset
        consumer.seek(topicPartition0, 0);
        consumer.seek(topicPartition2, 10);
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("offset: " + consumerRecord.offset()
                        + "  partition: " + consumerRecord.partition()
                        + "  " + consumerRecord.value());
            }
        }
    }
}
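After assign(), seek() takes effect immediately, and position() reports the offset the next poll() will read from. A small sketch, not part of the original answer, that also shows seekToBeginning(), the shortcut for "start from the earliest retained offset" (the Collection-based signature from kafka-clients 0.10.1+ is assumed):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class SeekPositionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("18BD-50", 2);
        consumer.assign(Arrays.asList(tp));
        consumer.seek(tp, 10);
        System.out.println("next offset: " + consumer.position(tp)); // prints 10
        consumer.seekToBeginning(Arrays.asList(tp));                 // jump back to the earliest offset
        System.out.println("next offset: " + consumer.position(tp));
        consumer.close();
    }
}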