赞
踩
本文展示了 Kafka Streams WordCount 例子的两种写法。
Kafka Streams 版本:0.10.1.0。
此例子使用高层流 DSL 创建 KStream 的多个实例(instances1、instances2 为两个实例),并行计算处理从 topic1 中读取的数据。
package com.us.kafka.Stream;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import com.us.kafka.KafkaConfig;
import java.util.Properties;
import static org.apache.kafka.common.serialization.Serdes.String;
/**
 * WordCount demo written two ways with the Kafka Streams high-level DSL
 * (Kafka Streams 0.10.1.0 API: KStreamBuilder, KTable#count(String)).
 *
 * <p>Two independent topologies ("instances1" and "instances2") consume the
 * same input topic {@code topic1} in parallel under one application id.
 */
public class MyKstream {

    /**
     * Builds the two topologies and runs BOTH of them.
     *
     * <p>Bug fix: the original code built {@code instances1} but only ever
     * started a {@code KafkaStreams} for {@code instances2}, so the first
     * "instance" never ran. Both are started now, and a shutdown hook closes
     * them so the application leaves the consumer group cleanly on exit.
     */
    public static void main(String[] args) {
        // two instances
        KStreamBuilder instances1 = new KStreamBuilder();
        // filterWordCount(instances1); // anonymous-class variant, kept for reference
        lambdaFilter(instances1);
        KStreamBuilder instances2 = new KStreamBuilder();
        lambdaFilter(instances2);

        KafkaStreams ks1 = new KafkaStreams(instances1, init());
        KafkaStreams ks2 = new KafkaStreams(instances2, init());
        ks1.start();
        ks2.start();

        // Close both instances on JVM exit so state and offsets are flushed.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            ks1.close();
            ks2.close();
        }));
    }

    /**
     * Builds the Streams configuration.
     *
     * @return properties carrying the application id, broker and ZooKeeper
     *         addresses (ZK is still a required setting in the 0.10.1.0
     *         client), String serdes for keys and values, and
     *         {@code auto.offset.reset=earliest} so a fresh group reads the
     *         topic from the beginning.
     */
    public static Properties init() {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "MyKstream");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.metadata_broker_list);
        properties.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, KafkaConfig.zookeeper);
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, String().getClass().getName());
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, String().getClass().getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }

    /**
     * WordCount topology written with pre-lambda anonymous classes:
     * split each line on spaces, keep only words containing "abel",
     * re-key each word (key = word + "--read"), count per key into the
     * "us" state store and print the running counts.
     *
     * <p>NOTE(review): currently unused — only the lambda variant below is
     * wired up in {@link #main}. Also note it filters for "abel" while the
     * lambda variant filters out "message"; the two variants are not
     * behaviorally identical.
     */
    private static void filterWordCount(KStreamBuilder builder) {
        KStream<String, String> source = builder.stream("topic1");
        KTable<String, Long> count = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        }).filter(new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                // keep only words containing "abel"
                return value.contains("abel");
            }
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                return new KeyValue<String, String>(value + "--read", value);
            }
        }).groupByKey().count("us");
        count.print();
        // count.to("topic2");
    }

    /**
     * Same WordCount idea written with Java 8 lambdas:
     * split each line on spaces, key the stream by word, drop words that
     * contain "message", count per key into the "us" state store and print.
     */
    private static void lambdaFilter(KStreamBuilder builder) {
        KStream<String, String> textLines = builder.stream("topic1");
        textLines
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
                .map((key, word) -> new KeyValue<>(word, word))
                .filter((k, v) -> (!k.contains("message")))
                // .through("RekeyedIntermediateTopic")
                .groupByKey().count("us").print();
        System.out.println("-----------2-----------");
    }
}

pom.xml 文件如下
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<!-- kafka Stream -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.1.0</version>
</dependency>
</dependencies>

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。