赞
踩
Kafka Stream定位是轻量级的流计算类库,所有功能放在Lib中实现,实现的程序不依赖单独执行环境。
一、测试环境搭建
1、下载kafka_2.11-1.1.1.tgz包解压后修改config目录下server.properties文件,主要是修改zk地址为本集群的例如:zookeeper.connect=xxx.xxx.xxx.xxx:2181
2、启动kafka,命令如下:bin
/zookeeper-server-start
.sh config
/zookeeper
.properties
3、创建topic,命令如下:bin
/kafka-topics
.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
test
4、查看topic是否创建成功,命令如下bin
/kafka-topics
.sh --list --zookeeper localhost:2181
5、启动命令行生产者生产消息,命令如下:bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic
test
This is a message
This is another message
6、启动命令行消费者消费消息,命令如下:bin/kafka-console-consumer
.sh --bootstrap-server localhost:9092 --topic
test
--from-beginning
This is a message
This is another message
二、正式开始demo过程,直接贴例子:
public class WordCountProcessorDemo { private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { @Override public Processor<String, String> get() { return new Processor<String, String>() { private ProcessorContext context; private KeyValueStore<String, Integer> kvStore; @Override @SuppressWarnings("unchecked") public void init(final ProcessorContext context) { this.context = context; //这里1000在PunctuationType.STREAM_TIME没有作用,只有在PunctuationType.WALL_CLOCK_TIME才有作用 this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() { @Override public void punctuate(long timestamp) { try (KeyValueIterator<String, Integer> iter = kvStore.all()) { while (iter.hasNext()) { KeyValue<String, Integer> entry = iter.next(); //将统计结果sink到下一个topic中 context.forward(entry.key,entry.key+":"+ entry.value.toString()); } } } }); this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); } @Override public void process(String dummy, String line) { String[] words = line.toLowerCase(Locale.getDefault()).split(" "); for (String word : words) { Integer oldValue = this.kvStore.get(word); if (oldValue == null) { this.kvStore.put(word, 1); } else { this.kvStore.put(word, oldValue + 1); } } context.commit(); } @Override @Deprecated public void punctuate(long timestamp) {} @Override public void close() {} }; } } public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); Topology builder = new Topology(); builder.addSource("Source", "test"); builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); builder.addStateStore(Stores.keyValueStoreBuilder( //这里存储有内存、持久化形式 Stores.inMemoryKeyValueStore("Counts"), Serdes.String(), Serdes.Integer()), "Process"); builder.addProcessor("MyProcessorA", MyProcessorA::new, "Process"); builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); builder.addSink("SINK1", "topicA", "MyProcessorA"); final KafkaStreams streams = new KafkaStreams(builder, props); //这个门栓厉害了,帮助我们释放资源 final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() { streams.close(); //释放完资源后释放门栓,然程序正常退出 latch.countDown(); } }); try { streams.start(); //程序启动时阻塞这这里,等接收到control -c 信号后执行jvm钩子,释放门栓然后执行退出 latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
public class MyProcessorA implements Processor<String, String> { private ProcessorContext context; @Override public void init(ProcessorContext processorContext) { this.context = processorContext; this.context.schedule(1000); } /** * @param key 消息的key,一般为null * @param value 消息的value */ @Override public void process(String key, String value) { String line = key + " ---- MyProcessor A ---- " + value; System.out.println(line); // 将处理完成的数据转发到downstream processor,比如当前是processor1处理器,通过forward流向到processor2处理器 context.forward(key, line); } @Override public void punctuate(long timestamp) { } @Override public void close() { } }
一下是我测试的结果截图
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。