赞
踩
一般流式计算会与批量计算相比较
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界
。
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。
流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
应用场景
技术方案选型
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。
Kafka Stream
:提供了对存储于 Kafka内
的数据进行流式处理和分析的功能
Kafka Stream的特点如下:
源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
key-value
键值对KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
package com.heima.kafka.sample; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * 流式处理 */ public class KafkaStreamQuickStart { public static void main(String[] args) { //kafka的配置信心 Properties prop = new Properties(); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart"); //stream 构建器 StreamsBuilder streamsBuilder = new StreamsBuilder(); //流式计算 streamProcessor(streamsBuilder); //创建kafkaStream对象 KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop); //开启流式计算 kafkaStreams.start(); } /** * 流式计算 * 消息的内容:hello kafka hello itcast * @param streamsBuilder */ private static void streamProcessor(StreamsBuilder streamsBuilder) { //创建kstream对象,同时指定从那个topic中接收消息 KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input"); /** * 处理消息的value */ stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split(" ")); } }) //按照value进行聚合处理 .groupBy((key,value)->value) //时间窗口 .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) //统计单词的个数 .count() //转换为kStream .toStream() .map((key,value)->{ System.out.println("key:"+key+",vlaue:"+value); return new KeyValue<>(key.key().toString(),value.toString()); }) //发送消息 .to("itcast-topic-out"); } }
itcast_topic_input
中发送多条消息itcast_topic_input
的数据,进行聚合操作后,将处理结果发送到 itcast_topic_out
itcast_topic_out
结果:
package com.heima.kafka.config; import lombok.Getter; import lombok.Setter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import java.util.HashMap; import java.util.Map; /** * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数 */ @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig { private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; private String hosts; private String group; @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration defaultKafkaStreamsConfig() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid"); props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid"); props.put(StreamsConfig.RETRIES_CONFIG, 10); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return new KafkaStreamsConfiguration(props); } }
application.yml
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
package com.heima.kafka.stream; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Duration; import java.util.Arrays; @Configuration @Slf4j public class KafkaStreamHelloListener { @Bean public KStream<String,String> kStream(StreamsBuilder streamsBuilder){ //创建kstream对象,同时指定从那个topic中接收消息 KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input"); stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split(" ")); } }) //根据value进行聚合分组 .groupBy((key,value)->value) //聚合计算时间间隔 .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) //求单词的个数 .count() .toStream() //处理后的结果转换为string字符串 .map((key,value)->{ System.out.println("key:"+key+",value:"+value); return new KeyValue<>(key.key().toString(),value.toString()); }) //发送消息 .to("itcast-topic-out"); return stream; } }
启动 springboot 项目即可自动监听
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。