Producer code (sends a test sentence to the my-producer topic once per second):
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.232.144:9092");
        prop.put("acks", "all");
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("buffer.memory", 33554432);
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        while (true) {
            // a random number is handy for testing the numeric filter examples further below:
            // String v = new Random().nextInt(1000) + "";
            String v = "this is word count test,this is kafka test";
            // send one record per second to the my-producer topic
            producer.send(new ProducerRecord<>("my-producer", v));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
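The send() call above is fire-and-forget, so delivery failures surface nowhere. Below is a small sketch of the callback variant; it is not from the original post, the class name CallbackProducer and the log wording are mine, while the broker address and topic are the ones used above.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CallbackProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.232.144:9092");
        prop.put("acks", "all");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            producer.send(new ProducerRecord<>("my-producer", "callback test"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(); // delivery failed after the configured retries
                        } else {
                            System.out.println("delivered to partition " + metadata.partition()
                                    + " at offset " + metadata.offset());
                        }
                    });
        }
    }
}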
KStream code (a word-count topology that reads my-producer and writes per-word counts to the stream topic):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class KafStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.232.144:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // disable record caching so every count update is forwarded downstream immediately
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();
        try {
            Serde<String> string = Serdes.String();
            KStream<String, String> source = builder.stream("my-producer", Consumed.with(string, string));

            // foreach is a terminal (consuming) operation: it produces no new stream,
            // so a downstream Kafka subscriber would receive no data
            // source.foreach((k, v) -> System.out.println(v + " ****************** "));

            // filter keeps the records for which the (k, v) -> boolean predicate returns true
            // KStream<String, String> rdd = source.filter((k, v) -> Integer.parseInt(v) > 890);

            // map can rewrite both the key and the value
            // KStream<String, String> rdd = source.map((k, v) -> {
            //     System.out.println(v);
            //     return new KeyValue<>(k, "1 :" + v);
            // });
            // KStream<String, String> rdd = source.map((k, v) -> new KeyValue<>(k, v + "-->"));

            // branch splits the stream; each argument is a Predicate, i.e. a (k, v) -> boolean lambda
            // KStream<String, String>[] branch = source.branch((k, v) -> Integer.parseInt(v) < 100);
            // KStream<String, String> rdd = branch[0];

            // another filter example with a (k, v) -> boolean predicate
            // KStream<String, String> rdd = source.filter((k, v) -> v.startsWith("1"));

            // filterNot is the inverted filter: it keeps the records the predicate rejects

            // flatMap turns one record into any number of records
            // KStream<String, String> rdd = source.flatMap((k, v) -> {
            //     List<KeyValue<String, String>> kvs = new ArrayList<>();
            //     for (String sv : v.split("1")) {
            //         kvs.add(new KeyValue<>(k, sv));
            //     }
            //     return kvs;
            // });

            // flatMapValues reads like a streamlined flatMap that only touches the value
            // KStream<String, String> rdd = source.flatMapValues((k, v) -> Arrays.asList(v.split("1")));

            // mapValues rewrites only the value
            // KStream<String, String> rdd = source.mapValues((k, v) -> v + 1);
            // rdd.to("stream", Produced.with(string, string));

            // word count: split each line into words, re-key by word, group, and count
            KTable<String, Long> count = source
                    .flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W")))
                    .selectKey((key, word) -> word)
                    .groupByKey()
                    .count();
            count.toStream().to("stream", Produced.with(Serdes.String(), Serdes.Long()));

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
            kafkaStreams.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
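Before wiring this topology to a live broker, it can be exercised in-process. Here is a minimal sketch using TopologyTestDriver from the kafka-streams-test-utils artifact; this is an addition of mine, not part of the original post, it assumes Kafka 2.4+ for the createInputTopic/createOutputTopic API, and the class name WordCountTopologyTest and the sample sentence are my own.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountTopologyTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // the test driver never connects
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // forward every count update

        // the same word-count topology as in KafStream above
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-producer", Consumed.with(Serdes.String(), Serdes.String()))
                .flatMapValues(v -> Arrays.asList(v.toLowerCase().split("\\W")))
                .selectKey((key, word) -> word)
                .groupByKey()
                .count()
                .toStream()
                .to("stream", Produced.with(Serdes.String(), Serdes.Long()));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "my-producer", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                    "stream", Serdes.String().deserializer(), Serdes.Long().deserializer());

            input.pipeInput(null, "this is kafka test");

            // drain the count updates produced for this single record
            while (!output.isEmpty()) {
                System.out.println(output.readKeyValue()); // e.g. KeyValue(kafka, 1)
            }
        }
    }
}

The driver processes each piped record synchronously, so with caching disabled the per-word counts are readable right after pipeInput returns.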
Consumer code (reads the per-word counts from the stream topic):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.144:9092");
        props.put("group.id", "test"); // consumer group id
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the word-count output values are Longs, so the value deserializer must match
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic the Streams application writes to
        consumer.subscribe(Arrays.asList("stream"));
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(100);
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " : " + record.value());
            }
        }
    }
}
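With enable.auto.commit set to true, offsets may be committed before the records just polled are fully handled; if that matters, a common variant is to commit manually after processing each batch. A minimal sketch of that variant, not from the original post (the class name ManualCommitConsumer is mine, and poll(Duration) requires kafka-clients 2.0+):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.144:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // commit only after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("stream"));
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " : " + record.value());
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // acknowledge the batch only after it was printed
            }
        }
    }
}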