赞
踩
Kafka一直被认为是一个强大的消息中间件,它实现了高吞吐、高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源。目前通用的一些流式处理框架如Apache Spark、Apache Flink、Apache Storm等都可以将Kafka作为可靠的数据来源。但遗憾的是,在0.l0.x版本之前,Kafka还并不具备任何数据处理的能力,但在此之后,Kafka Streams应运而生。
Kafka Streams是一个用于处理和分析数据的客户端库。它先把存储在Kafka中的数据进行处理和分析,然后将最终所得的数据结果回写到Kafka或发送到外部系统。它建立在一些非常重要的流式处理概念之上,例如适当区分事件时间和处理时间、窗口支持,以及应用程序状态的简单(高效)管理。同时,它也基于Kafka中的许多概念,例如通过划分主题进行扩展。此外,由于这个原因,它作为一个轻量级的库可以集成到应用程序中。这个应用程序可以根据需要独立运行、在应用程序服务器中运行、作为Docker容器,或者通过资源管理器(如Mesos)进行操作。
Kafka Streams直接解决了流式处理中的很多问题:
单词统计是流式处理领域中最常见的示例,这里我们同样使用它来演示一下Kafka Streams的用法。在Kafka的代码中就包含了一个单词统计的示例程序,即org.apache.kafka.streams. examples.wordcount.WordCountDemo
,这个示例中以硬编码的形式用到了两个主题:
streams-plaintext-input
和streams-wordcount-output
。为了能够使示例程序正常运行,我们需要预先准备好这两个主题:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-plaintext-input --replication-factor 1 --partitions 1
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-wordcount-output --replication-factor 1 --partitions 1
这两个主题的详细信息如下:
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic streams-plaintext-input,streams-wordcount-output
之后就可以运行WordCountDemo这个示例:
./kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
这个示例程序将从主题streams-plaintext-input中读取消息,然后对读取的消息执行单词统计,并将结果持续写入主题streams-wordcount-output。
使用Kafka自带的console producer来生产一些输入数据供WordCount程序消费:
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
再运行console consumer脚本来验证WordCount程序的计算结果:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
输入一些单词:
统计结果:输出结果中的第一列是消息的key,这里表示被计数的单词,第二列是消息的value,这里表示该单词的最新计数。
下面我们通过WordCountDemo程序来了解一下Kafka Streams的开发方式,WordCountDemo程序如下所示:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.3.0</version>
</dependency>
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; /** * @Author: acton_zhang * @Date: 2023/8/17 11:41 下午 * @Version 1.0 * 单词统计 */ public class WordCountDemo { public static void main(String[] args) { //构建Kafka Streams的配置 Properties props = new Properties(); //每个Kafka Streams应用程序必须要有一个application.id,这个applicationID用于协调应用实例 //也用于命名内部的本地存储和相关主题。在整个Kafka集群中,applicationId必须是唯一的 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); //bootstrap.servers配置的是Kafka集群的地址,必填 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //最大缓冲字节数 props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); //default.key.serde设置key的序列化器 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); //default.value.serde设置value的序列化器 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); //创建StreamBuilder实例 StreamsBuilder builder = new StreamsBuilder(); //创建一个KStream实例,并设定了输入主题streams-plaintext-input KStream<String, String> source = builder.stream("streams-plaintext-input"); //具体的单词统计逻辑 //KStream是一个由键值对构成的抽象记录流,每个键值对是一个独立单元,即使相同的key也不会被覆盖,类似数据库的插入操作 //KTable可以理解成一个基于表主键的日志更新流,相同key的每条记录只保存最新的一条记录,类似数据库中基于主键的更新 //无论记录流(用KStream定义),还是更新日志流(用KTable定义),都可以从一个或多个Kafka主题数据源来创建。 //一个KStream可以与另一个KStream或KTable进行Join操作,或者聚合成一个KTable。同样,一个KTable也可以转换成一个KStream。 //KStream和KTable都提供了一系列转换操作,每个转换操作都可以转化为一个KStream或KTable对象,将这些转换操作连接在一起就构成了一个处理器拓扑。 KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList( value.toLowerCase(Locale.getDefault()).split(" "))) .groupBy((key, value) -> value) .count(); //toStream().to()将单词统计结果写入输出主题streams-wordcount-output,key是String类型,value是Long类型 counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); //基于拓扑和配置来订阅一个KafkaStreams对象 final KafkaStreams streams = new KafkaStreams(builder.build(), props); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); //启动KafkaStreams引擎 try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。