This article explains the principles behind Kafka Streams, which helps us use it more effectively, and includes a hands-on example of setting up a Kafka Streams application so we can better master it in practice.
Kafka Streams is a client library for processing and analyzing data stored in Kafka and for writing the results back to Kafka or sending them to external systems. It builds on important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state. The entry barrier for Kafka Streams is low: you can quickly write and run a small-scale proof-of-concept on a single machine, and you only need to deploy additional instances of your application on more machines to scale up to high-volume production workloads. Kafka Streams leverages Kafka's parallelism model to transparently load-balance instances of the same application.
Highlights of Kafka Streams:
- Designed as a simple and lightweight client library that can be embedded in any Java application.
- No external dependencies on systems other than Apache Kafka itself as the internal messaging layer.
- Fault-tolerant local state, enabling fast and efficient stateful operations such as windowed joins and aggregations.
- Support for exactly-once processing semantics.
- One-record-at-a-time processing to achieve millisecond latency, with support for event-time-based windowing.
- Both a high-level Streams DSL and a low-level Processor API.
We first summarize the key concepts of Kafka Streams.
Stream Processing Topology
There are two special processors in a topology:
- Source processor: a stream processor that has no upstream processors. It produces an input stream for its topology by consuming records from one or more Kafka topics and forwarding them to downstream processors.
- Sink processor: a stream processor that has no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.
Kafka Streams offers two ways to define a stream processing topology: the Kafka Streams DSL provides the most common data transformation operations, such as map and filter, while the low-level Processor API lets developers define and connect custom processors and interact with state stores.
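As a minimal sketch of the low-level Processor API (the topic names "in" and "out" and the upper-casing logic here are just placeholders for illustration), a custom processor can be wired between a source and a sink like this:

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class ProcessorApiSketch {
    public static void main(String[] args) {
        Topology topology = new Topology();
        // Source processor: consumes records from the hypothetical topic "in"
        topology.addSource("Source", "in");
        // Custom processor: upper-cases each value and forwards it downstream
        topology.addProcessor("Upper", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                context().forward(key, value.toUpperCase());
            }
        }, "Source");
        // Sink processor: writes the transformed records to the hypothetical topic "out"
        topology.addSink("Sink", "out", "Upper");
        // Print the logical topology; this does not require a running broker
        System.out.println(topology.describe());
    }
}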
A processor topology is merely a logical abstraction of your stream processing code.
Note: if you are running a cluster, every broker must be started.
Start ZooKeeper, the brokers, and the producer.
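On a standard Kafka distribution these are started with the bundled scripts, for example bin/zookeeper-server-start.sh config/zookeeper.properties, followed by bin/kafka-server-start.sh config/server.properties on each broker.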
Create the required topics (the example below uses countin and countout).
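Topics can be created with the kafka-topics.sh command-line tool, or programmatically. Here is a minimal sketch using Kafka's AdminClient, assuming a single local broker on localhost:9092 (one partition and replication factor 1, matching the single-broker setup used in this example):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1 -- adjust for a real cluster
            admin.createTopics(Arrays.asList(
                    new NewTopic("countin", 1, (short) 1),
                    new NewTopic("countout", 1, (short) 1)
            )).all().get(); // block until both topics have been created
        }
    }
}

With the environment ready, create a Spring Boot project with the following pom.xml: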
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com</groupId>
    <artifactId>KafkaTest02</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>KafkaTest02</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
3.3 Create a test class
This class takes the records fed into the input topic, aggregates them, and writes the results to another topic:
package com.kafkatest02;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StreamSample {

    // Source and destination topics; note that both must be created in advance
    public static final String INPUT_TOPIC = "countin";
    public static final String OUTPUT_TOPIC = "countout";

    // Build the Streams configuration
    static Properties getStreamsConfig() {
        final Properties props = new Properties();
        // Application id; it is also used as the underlying consumer group id
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        // Address of the broker to connect to
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Disable record caching so results are forwarded downstream immediately
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: to re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    static void createWordCountStream(final StreamsBuilder builder) {
        final KStream<String, String> source = builder.stream(INPUT_TOPIC);

        final KTable<String, Long> counts = source
                // Split each line into lower-cased words
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                // Re-key each word by itself so identical words land in the same group
                .groupBy((key, value) -> value)
                // Count the occurrences of each word
                .count();

        // Need to override the value serde to Long type
        counts.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }

    public static void main(final String[] args) {
        final Properties props = getStreamsConfig();

        final StreamsBuilder builder = new StreamsBuilder();
        createWordCountStream(builder);
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        // Attach a shutdown handler to catch Ctrl-C
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
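Note that the counts written to countout are serialized as Longs (via Produced.with(Serdes.String(), Serdes.Long())), so any consumer reading that topic must deserialize values with a LongDeserializer, as the consumer class below does.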
The following class consumes the aggregated counts from the output topic and prints them:
package com.kafkatest02;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class UserLogConsumer {
    public void consumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Consumer group id, used here for offset commits
        props.setProperty("group.id", "group01");
        props.setProperty("enable.auto.commit", "false");
        // Ignored while auto-commit is disabled
        props.setProperty("auto.commit.interval.ms", "3000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The Streams application serializes counts as Long,
        // so values must be read with a LongDeserializer
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props);
        // Manually assign partition 0 of "countout" and rewind to the beginning
        consumer.assign(Arrays.asList(new TopicPartition("countout", 0)));
        consumer.seek(new TopicPartition("countout", 0), 0);
        while (true) {
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " -> " + record.value());
            }
            consumer.commitAsync();
        }
    }

    public static void main(String[] args) {
        new UserLogConsumer().consumer();
    }
}
The producer class below reads lines from the console and sends them to the input topic:

package com.kafkatest02.kafka_Stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Scanner;

public class UserLogProducer {
    /**
     * Send a message to the given topic.
     * Note: a new producer is created and closed on every call, which is
     * fine for a demo but inefficient in production.
     *
     * @param topic the destination topic
     * @param msg   the message to send
     */
    public void sendLog(String topic, String msg) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>(topic, msg));
        producer.close();
    }

    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        UserLogProducer producer = new UserLogProducer();
        // Read lines from the console and send each to "countin"; type "n" to quit
        String s;
        while (!(s = sc.nextLine()).equals("n")) {
            producer.sendLog("countin", s);
        }
    }
}
Type messages into the UserLogProducer console, then check the results in a Kafka management tool (or in the UserLogConsumer output).