赞
踩
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
Kafka Stream的特点如下:
因为Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
此处我使用的是linux系统,云服务器,Docker进行的一个安装
Docker安装zookeeper:
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Docker安装kafka:
创建容器
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka
–env KAFKA_ADVERTISED_HOST_NAME=ip地址
–env KAFKA_ZOOKEEPER_CONNECT=ip地址
–env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip地址:9092
–env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
–env KAFKA_HEAP_OPTS=“-Xmx256M -Xms256M”
-p 9092:9092 wurstmeister/kafka:2.12-2.3.1
注:因为此处我使用的是云服务器,所以是-p 9092:9092 wurstmeister/kafka:2.12-2.3.1,如果是本地的虚拟机则改为:–net=host wurstmeister/kafka:2.12-2.3.1
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <exclusions> <exclusion> <artifactId>connect-json</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency>
生产者
package com.wcl.demo; import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) throws ExecutionException, InterruptedException { //2.添加配置 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip地址:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //1.创建生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //3.封装发送的消息 ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic_input","kafka001","hello,world"); //4.发送消息 //4.1 同步发送 使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功 RecordMetadata recordMetadata = producer.send(record).get(); System.out.println(recordMetadata); //5.关闭消息通道,否则发不出去 producer.close(); } }
消费者
package com.wcl.demo; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; /** * 消费者 */ public class ConsumerQuickStart { public static void main(String[] args) { //2.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip地址:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //1.创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("topic_out")); //4.获取消息,while循环,一直处于接收状态 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key()); System.out.println(record.value()); } } } }
流式就是 kafka stream
package com.wcl.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaStreamQuickStart { public static void main(String[] args) { //kafka配置中心 Properties properties = new Properties(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"ip地址:9092"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart"); //stream构造器 StreamsBuilder streamsBuilder = new StreamsBuilder(); //流式计算 //指定从那个topic中接收消息 KStream<String, String> stream = streamsBuilder.stream("topic_input"); // 处理接收到的消息:value stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String s) { return Arrays.asList(s.split(" ")); } }) //按照value进行聚合处理 .groupBy((key, value) -> value) //时间窗口 此处为10秒 .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) //统计单词的个数 .count() //转换为kStream .toStream() .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<?, ?>>() { @Override public KeyValue<?, ?> apply(Windowed<String> stringWindowed, Long aLong) { System.out.println("key:====>"+stringWindowed+", value:=====>"+aLong); System.out.println("key测试:====>"+stringWindowed.key()+", value:=====>"+aLong.toString()); return new KeyValue<>(stringWindowed.key().toString(),aLong.toString()); } }) //发送消息 .to("topic_out"); //创建kafkaStream对象 KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties); //开启流式计算 kafkaStreams.start(); } }
结果:
使用生产者在topic为:topic_input中发送多条消息
使用消费者接收topic为:topic_out
通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
package com.wcl.demo.config; import lombok.Getter; import lombok.Setter; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import java.util.HashMap; import java.util.Map; @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig { // private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; private String hosts; private String group; @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration defaultKafkaStreamsConfig() { Map<String, Object> props = new HashMap<>(); //连接信息 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); //设置一个组 props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid"); //设置一个应用名称 props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid"); //重试测试 props.put(StreamsConfig.RETRIES_CONFIG, 10); //key props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //value props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return new KafkaStreamsConfiguration(props); } }
package com.wcl.demo.config; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Duration; import java.util.Arrays; @Configuration @Slf4j public class KafkaStreamHelloListener { @Bean public KStream<String,String> kStream(StreamsBuilder streamsBuilder){ //创建kstream对象,同时指定从那个topic中接收消息 KStream<String, String> stream = streamsBuilder.stream("topic_input"); stream.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split(" ")); } }) //根据value进行聚合分组 .groupBy((key,value)->value) //聚合计算时间间隔 .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) //求单词的个数 .count() .toStream() //处理后的结果转换为string字符串 .map((key,value)->{ System.out.println("key:"+key+",value:"+value); return new KeyValue<>(key.key().toString(),value.toString()); }) //发送消息 .to("topic_out"); return stream; } }
spring:
application:
name: kafkaStream
kafka:
hosts: 122.112.249.147:9092
group: ${spring.application.name}
测试:启动服务,和消费者,在通过生产者发送消息,完成测试
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。