当前位置:   article > 正文

kafka本地查看数据_在命令行查看本地kafka数据

在命令行查看本地kafka数据

kafka本地代码展示

package com.kafkaSimple;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileWriter;
import java.util.Arrays;
import java.util.Properties;

/**
 * kafka简单消费者实例
 *
 * 消费者组:当多个消费者的group.id相同的时候那么他们就是属于同一个
 * 消费者组。同一个消费者组中消费主题中位于不同分区的消息。每一个分区
 * 只能分配给消费者组中的一个消费者。
 * 如果创建主题的时候只制定了一个分区那么,将只有一个消费者消费所有消息。
 * 如果主题有多个分区,那么消息将分布到不同的分区上,那么消费者组中的
 * 所有消费者分摊所有消息。
 *
 * 当本例单个使用的时候是单个消费者。
 * 如果本实例代码作为另外一个新类运行的话,那么根本类就是同属于test组的消费者。
 *
 * 消费者组通过指定group.id来确定。
 */
public class SimpleConsumer {
    public static void main(String[] args) throws Exception{

        String topicName = "ecarx_log_analysis";
        Properties props = new Properties();
        // 制定要连接的代理
//        props.put("bootstrap.servers","pro01.cdh.ecarx.local:9092,pro02.cdh.ecarx.local:9092,pro03.cdh.ecarx.local:9092,pro04.cdh.ecarx.local:9092,pro05.cdh.ecarx.local:9092 ");
//        props.put("bootstrap.servers","10.160.25.137:9092,10.160.25.138:9092,10.160.25.139:9092");
        props.put("bootstrap.servers","10.161.31.65:9092,10.161.31.83:9092,10.161.31.234:9092,10.160.26.85:9092");
        // 将单个消费者分配给组
        props.put("group.id","test1111");
        // 如果值为true,则为偏移启动自动落实,否则不提交
        props.put("enable.auto.commit","true");
//        props.put("auto.offset.reset","earliest");
        props.put("auto.offset.reset","latest");
        // 更新偏移量的频率
        props.put("auto.commit.interval.ms","1000");
        // 超时时间
        props.put("session.timeout.ms","30000");
        // 键值序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建kafkaconsumer实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // 指定主题
        consumer.subscribe(Arrays.asList(topicName));

        System.out.println("Subscribed to topic" + topicName);

        while(true){
            // 拉取主题中的消息
            ConsumerRecords<String,String> records = consumer.poll(1000);
            for(ConsumerRecord<String,String> record : records){
                System.out.printf("offset=%d,key=%s,value=%s\n",record.offset(),record.key(),record.value());


            }






        }



    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/621238
推荐阅读
相关标签
  

闽ICP备14008679号