
Implementing a Kafka Producer and Consumer in Java (and having the consumer print the producer's IP)

Required dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>
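
If the project uses Gradle rather than Maven, the same artifact can be declared as:

implementation 'org.apache.kafka:kafka-clients:2.7.0'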

Producer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties properties = new Properties();
// Address the producer uses to connect to the Kafka broker; for a cluster, separate the IPs with commas
properties.put("bootstrap.servers", "127.0.0.1:9092");
// Besides "all", the options are "0" and "1". "all" means the producer receives a success response from the server only after every replica node has received the message
properties.put("acks", "all");
// Class used to serialize the message key
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Class used to serialize the message value. In the example below the value is a String, so StringSerializer works; for other value types, write a custom serializer class
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Create the producer instance
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// The key is optional; messages with the same key go to the same partition of a topic
String topic = "test";
//String key = "userName";
// The value can be a JSON string, parsed back into JSON on the consumer side
String value = "cccc";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
// Send the record
producer.send(record);
producer.close();
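
The snippet above does not yet show how the consumer can print the producer's IP, as the title promises. One way to carry that information is a record header set on the producer side. A minimal sketch, assuming a hypothetical header name "producer-ip" (any name works; record headers require kafka-clients 0.11+):

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Attach the producer host's IP as a record header ("producer-ip" is an arbitrary name chosen for this example).
// InetAddress.getLocalHost() throws the checked UnknownHostException; declare or catch it in real code.
String producerIp = InetAddress.getLocalHost().getHostAddress();
ProducerRecord<String, String> ipRecord = new ProducerRecord<>(topic, value);
ipRecord.headers().add("producer-ip", producerIp.getBytes(StandardCharsets.UTF_8));
producer.send(ipRecord);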

Consumer:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("group.id", "jd-group1");
// Offsets are committed manually via commitAsync() below, so disable auto-commit
properties.put("enable.auto.commit", "false");
// Where to start when the group has no committed offset: "earliest" reads from the beginning of the partition
properties.put("auto.offset.reset", "earliest");
// If the broker receives no heartbeat from this consumer within this window, it considers the consumer dead and rebalances its partitions to other group members
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Maximum number of records returned per poll; defaults to 500 if unset
//properties.put("max.poll.records", 100);

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("test"));
while (true) {
    Duration timeout = Duration.ofMillis(500);
    ConsumerRecords<String, String> records = kafkaConsumer.poll(timeout);

    records.forEach(record -> {
        System.out.println("Record Key: " + record.key());
        System.out.println("Record value: " + record.value());
        System.out.println("Record partition: " + record.partition());
        System.out.println("Record offset: " + record.offset());
        System.out.println("------------------------------");
    });
    // Asynchronously commit the consumed offsets to the broker
    kafkaConsumer.commitAsync();
}
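
If the producer attached a "producer-ip" header as sketched earlier, the consumer can print it inside the records.forEach loop above. A minimal sketch, assuming that same hypothetical header name:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;

// Inside records.forEach(record -> { ... }):
Header ipHeader = record.headers().lastHeader("producer-ip");
if (ipHeader != null) {
    System.out.println("Producer IP: " + new String(ipHeader.value(), StandardCharsets.UTF_8));
}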

Re-consuming messages that have already been consumed:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092"); // for a cluster, list the server IPs separated by commas
properties.put("group.id", "jd-group1");
properties.put("enable.auto.commit", "false");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("test"));

// poll() joins the consumer group and triggers partition assignment; with poll(Duration)
// a 0ms timeout can return before assignment finishes, so loop until partitions are assigned
while (kafkaConsumer.assignment().isEmpty()) {
    kafkaConsumer.poll(Duration.ofMillis(100));
}
long offset = 0;
// TopicPartition identifies one partition of a topic; reset every assigned partition to the chosen offset
for (TopicPartition partition : kafkaConsumer.assignment()) {
    kafkaConsumer.seek(partition, offset);
}

// Consume again from the reset offsets; poll() must be called inside the loop to keep fetching
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
    records.forEach(record -> {
        System.out.println("Record Key: " + record.key());
        System.out.println("Record value: " + record.value());
        System.out.println("Record partition: " + record.partition());
        System.out.println("Record offset: " + record.offset());
        System.out.println("------------------------------");
    });
    // Asynchronously commit the consumed offsets to the broker
    kafkaConsumer.commitAsync();
}
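
Seeking each partition to offset 0 by hand works; the client also provides a convenience method that rewinds all assigned partitions in one call, which reads a bit clearer:

// Equivalent to the seek loop above: rewind every assigned partition to its beginning
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());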

