
A Simple Kafka Streams Tutorial: the KStream API

Producer code

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.232.144:9092");
        prop.put("acks", "all");            // wait for all in-sync replicas to acknowledge
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("buffer.memory", 33554432);
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

        // Send the same sentence once per second so the stream job has steady input.
        while (true) {
//            String v = new Random().nextInt(1000) + "";
            String v = "this is word count test,this is kafka test";

            producer.send(new ProducerRecord<>("my-producer", v));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
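
The records above are sent with a null key and no delivery check. As a side note, here is a minimal sketch of a keyed send with a delivery callback, assuming the same producer and topic as in the listing (the key "word-count" is just an illustrative value):

        // A minimal sketch, assuming the `producer` built above.
        // Records with the same key always land in the same partition,
        // which preserves their relative order for consumers.
        producer.send(new ProducerRecord<>("my-producer", "word-count", "this is kafka test"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();   // delivery failed
                    } else {
                        System.out.println("partition " + metadata.partition()
                                + ", offset " + metadata.offset());
                    }
                });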

KStream code

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class KafStream {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.232.144:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Disable record caching so every update is forwarded downstream immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();
        try {
            Serde<String> string = Serdes.String();
            KStream<String, String> source = builder.stream("my-producer", Consumed.with(string, string));

            // foreach is a terminal (consuming) operation: it produces no new
            // stream, so subscribers of the output topic receive nothing.
//            source.foreach((k, v) -> {
//                System.out.println(k);
//                System.out.println(v + " ****************** ");
//            });

            // filter keeps only records for which the predicate returns true
            // (this variant assumes numeric values, e.g. the random producer above).
//            KStream<String, String> filter = source.filter((k, v) -> Integer.parseInt(v) > 890);

            // map produces exactly one new KeyValue per input record.
//            KStream<String, String> rdd = source.map((k, v) -> {
//                System.out.println(v);
//                return new KeyValue<>(k, "1 :" + v);
//            });

            // branch splits one stream into several; each argument is a
            // Predicate ((k, v) -> boolean) lambda and yields one output stream.
//            KStream<String, String>[] branch = source.branch((k, v) -> Integer.parseInt(v) < 100);
//            KStream<String, String> rdd = branch[0];

            // filter again: keep records whose value starts with "1".
//            KStream<String, String> rdd = source.filter((k, v) -> v.startsWith("1"));

            // filterNot is the inverted filter: it keeps the records that do
            // NOT match the predicate (see the sketch after this listing).

            // flatMap emits zero or more KeyValue pairs per input record.
//            KStream<String, String> rdd = source.flatMap((k, v) -> {
//                List<KeyValue<String, String>> kvs = new ArrayList<>();
//                for (String sv : v.split("1")) {
//                    kvs.add(new KeyValue<>(k, sv));
//                }
//                return kvs;
//            });

            // flatMapValues is a shorthand for flatMap when only the value changes.
//            KStream<String, String> rdd = source.flatMapValues((k, v) -> Arrays.asList(v.split("1")));

            // map can change both the key and the value...
//            KStream<String, String> rdd = source.map((k, v) -> new KeyValue<>(k, v + "-->"));

            // ...while mapValues changes only the value.
//            KStream<String, String> rdd = source.mapValues((k, v) -> v + 1);
//            rdd.to("stream", Produced.with(string, string));

            // Word count: split each line into words, make each word the record
            // key, group by key, and maintain a running count per word.
            KTable<String, Long> count = source
                    .flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W+")))
                    .selectKey((key, word) -> word)
                    .groupByKey()
                    .count();

            // The KTable's changelog is written to the "stream" topic as <word, count>.
            count.toStream().to("stream", Produced.with(Serdes.String(), Serdes.Long()));

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
            kafkaStreams.start();

            // Close the streams instance cleanly when the JVM shuts down.
            Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
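
filterNot is mentioned in the comments above but never exercised. A minimal sketch, assuming the `source` stream, the `string` serde, and the "stream" output topic from the listing above:

            // Inverted filter: keep only records whose value does NOT start
            // with "1", the exact complement of the filter example above.
            KStream<String, String> rdd = source.filterNot((k, v) -> v.startsWith("1"));
            rdd.to("stream", Produced.with(string, string));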

Consumer code

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.144:9092");
        props.put("group.id", "test");                    // consumer group id
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The word-count topology writes Long counts, so the value deserializer must match.
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        // Subscribe to the output topic of the streams job.
        consumer.subscribe(Arrays.asList("stream"));

        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " : " + record.value());
            }
        }
    }
}
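
The poll loop above never exits. A common pattern for stopping a consumer cleanly is to call wakeup() from a shutdown hook and catch the resulting WakeupException; a minimal sketch of that loop, assuming the same `consumer` built above:

        // A minimal sketch, assuming the `consumer` built above.
        // consumer.wakeup() may be called from another thread; it makes a
        // blocked poll() throw WakeupException so the loop can exit cleanly.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();                     // wait for the poll loop to drain
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, Long> record : records) {
                    System.out.println(record.key() + " : " + record.value());
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.close();                          // commit offsets and leave the group
        }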
