赞
踩
输入new Properties().var 回车
- //创建属性
-
- Properties properties = new Properties();
-
- //连接集群
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
-
- //反序列化
-
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
-
- //指定消费者组id
-
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称
- //创建消费者
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
输入new ArrayList<String,String>().var 回车修改变量名为topics
- //创建一个数组列表变量接收topics值
- ArrayList<String> topics = new ArrayList<>();
- //指定要订阅的主题
- topics.add("customers");
- //订阅主题
- kafkaConsumer.subscribe(topics);
输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions
- //消费数据
- while (true){
- //if (flag == true) flag 标志位置
- //break;
- //}生产中退出循环的位置;
- ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
- //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
- for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
- System.out.println(consumerRecord);
- }
- }
输出台上可以看到输出的都是订阅的主题/分区的信息
- package com.ljr.kafka.replay;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Properties;
-
- public class MyConsumer {
- public static void main(String[] args) {
- //创建属性
- Properties properties = new Properties();
- //连接集群
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
- //反序列化
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
- //指定消费者组id
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
-
- //创建消费者
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
-
- /*//订阅主题
- //创建一个数组列表变量接收topics值
- ArrayList<String> topics = new ArrayList<>();
- //指定要订阅的主题
- topics.add("customers");
- //订阅主题
- kafkaConsumer.subscribe(topics);*/
-
- //订阅分区
- //创建一个数组列表变量接收主题分区值
- ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
- //指定要订阅的分区
- topicPartitions.add(new TopicPartition("customers",2));
- //订阅分区
- kafkaConsumer.assign(topicPartitions);
-
- //消费数据
- while (true){
- //if (flag == true) flag 标志位置
- //break;
- //}生产中退出循环的位置;
- ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
- //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历
- for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
- System.out.println(consumerRecord);
- }
- }
-
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。