当前位置:   article > 正文

KafkaJavaApi操作:练习题(四)_java api练习

java api练习

题目

kafka集群中创建title主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为2
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为指定分区2,把数据发送到指定的分区中

消费者设置:
消费者组id为test
设置自动提交偏移量
设置自动提交偏移量的时间间隔
设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
设置key的序列化为org.apache.kafka.common.serialization. StringDeserializer
设置value的序列化为org.apache.kafka.common.serialization. StringDeserializer

模拟生产者,请写出代码向title主题中生产数据test0-test99
模拟消费者,请写出代码把title主题中的数据test0-test99消费掉 ,打印输出到控制台

代码

Producer:

public class Producer {
    public static void main(String[] args) {
        //1、配置kafka集群
        Properties props = new Properties();

        //kafka服务器地址
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 0);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);

        //kafka数据中key  value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord record = new ProducerRecord("title",2,"keys", "Test" + i);
            kafkaProducer.send(record);

        }
        kafkaProducer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

Consumer:

public class Consumer {
    public static void main(String[] args) {
        //1、添加配置
        Properties props = new Properties();
        //指定kafka服务器
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //消费组
        props.put("group.id", "test");
        //以下两行代码 ---消费者自动提交offset值
        props.put("enable.auto.commit", "true");
        //设置offset提交
        props.put("auto.offset.reset","latest");
        //自动提交的周期
        props.put("auto.commit.interval.ms",  "1000");
        //设置key value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        kafkaConsumer.subscribe(Arrays.asList("title"));
        while (true){
            ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : poll) {
                System.out.println("消费的数据:"+consumerRecord.value()+"所属的分区:"+consumerRecord.partition());
            }
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/1017797
推荐阅读
相关标签
  

闽ICP备14008679号