赞
踩
学习Kafka Stream,我们需要先了解什么是Kafka Stream ,为什么要使用Kafka Stream,以及我们怎么使用Kafka Stream。
在此推荐Kafka Stream学习博客:https://www.cnblogs.com/warehouse/p/9521382.html
Kafka Streams是一套客户端类库,它可以对存储在Kafka内的数据进行流式处理和分析。
开源流式处理系统有:Spark Streaming和Apache Storm,它们能与SQL处理集成等优点,功能强大,那为何还需要Kafka Stream呢?
1、使用方便。Spark和Storm都是流式处理框架,而Kafka Stream是基于Kafka的流式处理类库。开发者很难了解框架的具体运行方式,调试成本高,使用受限。而类库直接提供具体的类给开发者使用,整个应用的运行方式主要由开发者控制,方便使用和调试。
2、使用成本低。就流式处理系统而言,基本都支持Kafka作为数据源。Kafka基本上是主流的流式处理系统的标准数据源。大部分流式系统中都部署了Kafka,包括Spark和Storm,此时使用Kafka Stream的成本非常低。
3、省资源。使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,框架本身也占资源。
4、Kafka本身也有优点。由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并发度。
KTable和KSteam是Kafka中非常重要的概念,在此分析一下二者区别。
Kafka支持三种时间:
流式数据在时间上无界的,但是聚合操作只能作用在特定(有界)的数据集,咋整???这时候就有了窗口的概念,在时间无界的数据流中定义一个边界来用于计算。Kafka支持的窗口如下:
添加pom依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class MyStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //输入key的类型 prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //输入value的类型 //创建流构造器 StreamsBuilder builder = new StreamsBuilder(); //构建好builder,将myStreamIn topic中的数据写入到myStreamOut topic中 builder.stream("myStreamIn").to("myStreamOut"); final Topology topo=builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.exit(0); } }
在这里说明一下prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");
,我们将TopicA的数据写入到TopicB中,就相当于这个流在消费TopicA的数据,我们知道一个Topic中,一个消费组里只能有一个消费者去消费它。假设我们将TopicA的数据写入到TopicB的过程中,报错了(比如虚拟机内存满了),这时数据只写了一半,我们清理完内存后,想继续写剩下的数据,再次运行我们发现报错,写不了了。这时候我们需要修改这个参数将mystream改成别的名字,因为同一个消费者组里只能有一个消费者去消费它。
开启zookeeper和Kafka
# 开启zookeeper
zkServer.sh start
# 后台启动Kafka
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
创建topic myStreamIn
kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamIn --partitions 1 --replication-factor 1
创建topic myStreamOut
kafka-topics.sh --create --zookeeper 192.168.136.20:2181 --topic myStreamOut --partitions 1 --replication-factor 1
生产消息写入到myStreamIn
kafka-console-producer.sh --topic myStreamIn --broker-list 192.168.136.20:9092
消费myStreamOut里的数据
kafka-console-consumer.sh --topic myStreamOut --bootstrap-server 192.168.136.20:9092 --from-beginning
运行示例代码并在生产者端输入数据,能在消费端看到数据,表明Kafka Stream写入成功。
工作中不可能像案例一一样将一个Topic的数据原封不动存入另一个Topic,一般是要经过处理,这就需要在流中加上逻辑。
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCountStream { public static void main(String[] args) { Properties prop =new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcountstream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092"); //zookeeper的地址 prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,2000); //提交时间设置为2秒 //prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,""earliest ); //earliest latest none 默认latest //prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //true(自动提交) false(手动提交) prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //创建流构造器 //hello world //hello java StreamsBuilder builder = new StreamsBuilder(); KTable<String, Long> count = builder.stream("wordcount-input") //从kafka中一条一条取数据 .flatMapValues( //返回压扁后的数据 (value) -> { //对数据按空格进行切割,返回List集合 String[] split = value.toString().split(" "); List
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。