赞
踩
需要启动kafka服务和zookeeper服务以下为入口
kafka入门
zookeeper入门
参考地址: https://kafka.apache.org/23/documentation/streams/tutorial#tutorial_code_pipe
<!-- kafka 所需jar包 start --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency> <!-- kafka 所需jar包 end -->
package com.skindow.kafka;
/**
* Created by Administrator on 2019/8/12.
*/
public class Pipe {
public static void main(String[] args)
{
}
}
编写Streams应用程序的第一步是创建一个java.util.Properties映射,以指定不同的Streams执行配置值StreamsConfig。您需要设置的几个重要配置值是:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,它指定用于建立与Kafka集群的初始连接的主机/端口对的列表,并且StreamsConfig.APPLICATION_ID_CONFIG它提供Streams应用程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 假设与此应用程序对话的Kafka代理运行在端口为9092的本地机器上
此外,您可以在同一映射中自定义其他配置,例如,记录键值对的默认序列化和反序列化库:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
接下来,我们将定义Streams应用程序的计算逻辑。在Kafka Streams中,该计算逻辑被定义为topology连接的处理器节点之一。我们可以使用拓扑构建器来构建这样的拓扑
final StreamsBuilder builder = new StreamsBuilder();
然后从my-replicated-topic(就是toptic)使用此拓扑构建器命名的Kafka主题创建源流
KStream<String, String> source = builder.stream("my-replicated-topic");
现在我们得到一个KStream从源Kafka主题不断生成记录my-replicated-topic。记录被组织为String键入的键值对。我们可以用这个流做的最简单的事情是将它写入另一个Kafka 的 toptic中,比如它的名字skindow-toptic:
source.to("skindow-toptic");
也可以将上面的两行连接成一行:
builder.stream("my-replicated-topic").to("skindow-toptic");
我们可以topology通过执行以下操作来检查从此构建器创建的类型:
final Topology topology = builder.build();
并将其描述打印到标准输出为
System.out.println(topology.describe());
如果我们停在这里,编译并运行程序,它将输出以下信息:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic])
--> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001 (topic: skindow-toptic)
<-- KSTREAM-SOURCE-0000000000
如上所示,它说明构造的拓扑具有两个处理器节点,源节点KSTREAM-SOURCE-0000000000和汇聚节点KSTREAM-SINK-0000000001。 KSTREAM-SOURCE-0000000000连续读取Kafka主题的记录my-replicated-topic并将它们传送到下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001将写入每个接收到的记录以便另一个Kafka主题skindow-toptic (–>和<–箭头指示该节点的下游和上游处理器节点,即拓扑图中的“子节点”和“父节点”)。它还说明了这个简单的拓扑没有与之关联的全局状态存储
java.util.Properties实例中指定的配置映射和的Topology对象。
final KafkaStreams streams = new KafkaStreams(topology, props);
通过调用它的start()函数,我们可以触发该客户端的执行。close()在此客户端上调用之前,执行不会停止。例如,我们可以添加带倒计时锁存器的关闭钩子来捕获用户中断并在终止此程序时关闭客户端:
final CountDownLatch latch = new CountDownLatch(1); //附加关闭处理程序来捕获control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0);
完整的代码如下所示:
package com.skindow.kafka; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class Pipe { public static void main(String[] args) throws Exception { Properties props = new Properties(); //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); //用于建立与Kafka集群的初始连接的主机/端口对的列表 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //记录键值对的默认序列化和反序列化库 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一 final StreamsBuilder builder = new StreamsBuilder(); //将"my-replicated-topic写入另一个Kafka toptic(skindow-toptic) builder.stream("my-replicated-topic").to("skindow-toptic"); //构建Topology对象 final Topology topology = builder.build(); //构建 kafka流 API实例 final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // 附加关闭处理程序来捕获control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1);//是非正常退出,就是说无论程序正在执行与否,都退出 } System.exit(0);//正常退出,程序正常执行结束退出 } }
接着上面讲,复制pipe类并将其改名为LineSplit
由于每个源流的记录都是一个String键入的键值对,让我们将值字符串视为文本行,并将其拆分为带有FlatMapValues运算符的单词:
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split("\\W+"));
}
});
运算符将source流作为其输入,并生成一个新流words ,该流通过按顺序处理源流中的每个记录而命名,并将其值字符串分解为单词列表,并将每个单词作为新记录生成到输出words流。这是一个无状态运算符,无需跟踪任何先前接收的记录或处理结果。
如果我们现在将此增强拓扑描述为System.out.println(topology.describe()),我们将得到以下内容:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic])
--> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
--> none
<-- KSTREAM-SOURCE-0000000000
如上所述,新的处理器节点KSTREAM-FLATMAPVALUES-0000000001被注入到原始源节点和宿节点之间的拓扑中。它将源节点作为其父节点,将sink节点作为其子节点。换句话说,源节点提取的每个记录将首先遍历到新添加的KSTREAM-FLATMAPVALUES-0000000001节点以进行处理,结果将生成一个或多个新记录。它们将继续遍历到汇聚节点以写回Kafka。请注意,此处理器节点是“无状态”的,因为它与任何存储(即(stores: []))无关。
完整代码如下
package com.skindow.kafka; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.ValueMapper; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; @Slf4j public class LineSplit { public static void main(String[] args) throws Exception { Properties props = new Properties(); //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); //用于建立与Kafka集群的初始连接的主机/端口对的列表 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //记录键值对的默认序列化和反序列化库 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一 final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("my-replicated-topic"); source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split("\\W+")); } }); //构建Topology对象 final Topology topology = builder.build(); log.info(topology.describe().toString()); //构建 kafka流 API实例 final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // 附加关闭处理程序来捕获control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1);//是非正常退出,就是说无论程序正在执行与否,都退出 } System.exit(0);//正常退出,程序正常执行结束退出 } }
复制LineSplit类并将其改名为Wordcount
为了计算单词,我们可以先修改flatMapValues运算符,将它们全部视为小写(假设使用了lambda表达式):
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
});
为了进行计数聚合,我们必须首先指定我们想要使用groupBy运算符来键入值字符串上的流,即较低的套接字。此运算符生成新的分组流,然后可由count运算符聚合,该运算符在每个分组键上生成运行计数:
KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) .groupBy(new KeyValueMapper<String, String, String>() { @Override public String apply(String key, String value) { return value; } }) //将结果实体化到名为“counts-store”的KeyValueStore中。 //物化存储的类型总是,因为这是最内部存储的格式。 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
请注意,count运算符有一个Materialized参数,指定运行计数应存储在名为的状态存储中counts-store
我们还可以将countsKTable的更改日志流写回另一个Kafka主题skindow-toptic。因为结果是更改日志流,所以skindow-toptic应该配置输出主题并启用日志压缩。请注意,这一次的值类型不再String,而是Long,因此默认的序列化类不再适用于将其写入Kafka。我们需要为Long类型提供重写的序列化方法,否则将抛出运行时异常:
counts.toStream().to("skindow-toptic", Produced.with(Serdes.String(), Serdes.Long()));
注意,为了从主题skindow-toptic读取更改日志流,需要将反序列化值设置为org.apache.kafka.common. serialize . longdeserializer。这方面的详细信息可以在Play with a Streams应用程序部分中找到。假设可以使用JDK 8中的lambda表达式,则上述代码可以简化为:
KStream<String, String> source = builder.stream("my-replicated-topic");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("skindow-toptic", Produced.with(Serdes.String(), Serdes.Long()));
如果我们再次将此增强拓扑描述为System.out.println(topology.describe()),我们将得到以下内容:
Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic]) --> KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KSTREAM-KEY-SELECT-0000000002 (stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FILTER-0000000005 (stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002 Sink: KSTREAM-SINK-0000000004 (topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000006 (topics: [counts-store-repartition]) --> KSTREAM-AGGREGATE-0000000003 Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006 Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003 Sink: KSTREAM-SINK-0000000008 (topic: skindow-toptic) <-- KTABLE-TOSTREAM-0000000007
如上所述,拓扑现在包含两个断开连接的子拓扑。第一个子拓扑的汇聚节点KSTREAM-SINK-0000000004将写入重新分区主题Counts-repartition,该主题将由第二个子拓扑的源节点读取KSTREAM-SOURCE-0000000006。重新分区主题用于通过其聚合键“混洗”源流,在这种情况下是值字符串。另外,在第一子拓扑内部,KSTREAM-FILTER-0000000005在分组KSTREAM-KEY-SELECT-0000000002节点和汇聚节点之间注入无状态节点,以过滤掉其聚合密钥为空的任何中间记录。
在第二子拓扑中,聚合节点KSTREAM-AGGREGATE-0000000003与名为的状态存储相关联Counts(该名称由count运营商中的用户指定)。在从其即将到来的流源节点接收到每个记录时,聚合处理器将首先查询其关联的Counts存储以获得该密钥的当前计数,增加1,然后将新计数写回到存储。密钥的每个更新计数也将通过管道传输到KTABLE-TOSTREAM-0000000007节点的下游,该节点将此更新流解释为记录流,然后再进一步管道到汇聚节点KSTREAM-SINK-0000000008以写回Kafka。
完整的代码看起来像这样(假设使用了lambda表达式):
package com.skindow.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; @Slf4j public class Wordcount { public static void main(String[] args) throws Exception { Properties props = new Properties(); //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); //用于建立与Kafka集群的初始连接的主机/端口对的列表 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //记录键值对的默认序列化和反序列化库 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一 final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("my-replicated-topic"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("skindow-toptic", Produced.with(Serdes.String(), Serdes.Long())); //构建Topology对象 final Topology topology = builder.build(); log.info(topology.describe().toString()); //构建 kafka流 API实例 final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // 附加关闭处理程序来捕获control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1);//是非正常退出,就是说无论程序正在执行与否,都退出 } System.exit(0);//正常退出,程序正常执行结束退出 } }
项目地址:
https://github.com/skindowSyc/firstProject.git 对应tag kafkaStreamDemo
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。