赞
踩
在kafka集群中创建title主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为2
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为指定分区2,把数据发送到指定的分区中
消费者设置:
消费者组id为test
设置自动提交偏移量
设置自动提交偏移量的时间间隔
设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
设置key的序列化为org.apache.kafka.common.serialization. StringDeserializer
设置value的序列化为org.apache.kafka.common.serialization. StringDeserializer
模拟生产者,请写出代码向title主题中生产数据test0-test99
模拟消费者,请写出代码把title主题中的数据test0-test99消费掉 ,打印输出到控制台
Producer:
public class Producer { public static void main(String[] args) { //1、配置kafka集群 Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //消息确认机制 props.put("acks", "all"); //重试机制 props.put("retries", 0); //批量发送的大小 props.put("batch.size", 16384); //消息延迟 props.put("linger.ms", 1); //批量的缓冲区大小 props.put("buffer.memory", 33554432); //kafka数据中key value的序列化 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { ProducerRecord record = new ProducerRecord("title",2,"keys", "Test" + i); kafkaProducer.send(record); } kafkaProducer.close(); } }
Consumer:
public class Consumer { public static void main(String[] args) { //1、添加配置 Properties props = new Properties(); //指定kafka服务器 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //消费组 props.put("group.id", "test"); //以下两行代码 ---消费者自动提交offset值 props.put("enable.auto.commit", "true"); //设置offset提交 props.put("auto.offset.reset","latest"); //自动提交的周期 props.put("auto.commit.interval.ms", "1000"); //设置key value的序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); kafkaConsumer.subscribe(Arrays.asList("title")); while (true){ ConsumerRecords<String, String> poll = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> consumerRecord : poll) { System.out.println("消费的数据:"+consumerRecord.value()+"所属的分区:"+consumerRecord.partition()); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。