当前位置:   article > 正文

kafka个人基础教程(三)java demo kafkaStream篇_kafka streams demo

kafka streams demo

需要启动kafka服务和zookeeper服务以下为入口
kafka入门
zookeeper入门
参考地址: https://kafka.apache.org/23/documentation/streams/tutorial#tutorial_code_pipe

  1. 导入Maven包
			<!-- kafka 所需jar包 start -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.12</artifactId>
			<version>2.3.0</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
			<version>2.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.3.0</version>
		</dependency>
		<!-- kafka 所需jar包 end -->
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 编写第一个Streams应用程序:Pipe
package com.skindow.kafka;

/**
 * Created by Administrator on 2019/8/12.
 */
public class Pipe {
    public static void main(String[] args)
    {
        
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

编写第一个Streams应用程序:Pipe

编写Streams应用程序的第一步是创建一个java.util.Properties映射,以指定不同的Streams执行配置值StreamsConfig。您需要设置的几个重要配置值是:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,它指定用于建立与Kafka集群的初始连接的主机/端口对的列表,并且StreamsConfig.APPLICATION_ID_CONFIG它提供Streams应用程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // 假设与此应用程序对话的Kafka代理运行在端口为9092的本地机器上
  • 1
  • 2
  • 3

此外,您可以在同一映射中自定义其他配置,例如,记录键值对的默认序列化和反序列化库:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  • 1
  • 2

接下来,我们将定义Streams应用程序的计算逻辑。在Kafka Streams中,该计算逻辑被定义为topology连接的处理器节点之一。我们可以使用拓扑构建器来构建这样的拓扑

final StreamsBuilder builder = new StreamsBuilder();
  • 1

然后从my-replicated-topic(就是toptic)使用此拓扑构建器命名的Kafka主题创建源流
在这里插入图片描述

KStream<String, String> source = builder.stream("my-replicated-topic");
  • 1

现在我们得到一个KStream从源Kafka主题不断生成记录my-replicated-topic。记录被组织为String键入的键值对。我们可以用这个流做的最简单的事情是将它写入另一个Kafka 的 toptic中,比如它的名字skindow-toptic:

source.to("skindow-toptic");
  • 1

也可以将上面的两行连接成一行:

builder.stream("my-replicated-topic").to("skindow-toptic");
  • 1

我们可以topology通过执行以下操作来检查从此构建器创建的类型:

final Topology topology = builder.build();
  • 1

并将其描述打印到标准输出为

System.out.println(topology.describe());
  • 1

如果我们停在这里,编译并运行程序,它将输出以下信息:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: skindow-toptic)
      <-- KSTREAM-SOURCE-0000000000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如上所示,它说明构造的拓扑具有两个处理器节点,源节点KSTREAM-SOURCE-0000000000和汇聚节点KSTREAM-SINK-0000000001。 KSTREAM-SOURCE-0000000000连续读取Kafka主题的记录my-replicated-topic并将它们传送到下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001将写入每个接收到的记录以便另一个Kafka主题skindow-toptic (–>和<–箭头指示该节点的下游和上游处理器节点,即拓扑图中的“子节点”和“父节点”)。它还说明了这个简单的拓扑没有与之关联的全局状态存储

java.util.Properties实例中指定的配置映射和的Topology对象。

final KafkaStreams streams = new KafkaStreams(topology, props);
  • 1

通过调用它的start()函数,我们可以触发该客户端的执行。close()在此客户端上调用之前,执行不会停止。例如,我们可以添加带倒计时锁存器的关闭钩子来捕获用户中断并在终止此程序时关闭客户端:

final CountDownLatch latch = new CountDownLatch(1);
 
//附加关闭处理程序来捕获control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});
 
try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

完整的代码如下所示:

package com.skindow.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //用于建立与Kafka集群的初始连接的主机/端口对的列表
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //记录键值对的默认序列化和反序列化库
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一
        final StreamsBuilder builder = new StreamsBuilder();
        //将"my-replicated-topic写入另一个Kafka toptic(skindow-toptic)
        builder.stream("my-replicated-topic").to("skindow-toptic");
        //构建Topology对象
        final Topology topology = builder.build();
        //构建 kafka流 API实例
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // 附加关闭处理程序来捕获control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);//是非正常退出,就是说无论程序正在执行与否,都退出
        }
        System.exit(0);//正常退出,程序正常执行结束退出
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

编写第二个Streams应用程序:Line Split

接着上面讲,复制pipe类并将其改名为LineSplit
在这里插入图片描述
由于每个源流的记录都是一个String键入的键值对,让我们将值字符串视为文本行,并将其拆分为带有FlatMapValues运算符的单词:

KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split("\\W+"));
            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

运算符将source流作为其输入,并生成一个新流words ,该流通过按顺序处理源流中的每个记录而命名,并将其值字符串分解为单词列表,并将每个单词作为新记录生成到输出words流。这是一个无状态运算符,无需跟踪任何先前接收的记录或处理结果。
如果我们现在将此增强拓扑描述为System.out.println(topology.describe()),我们将得到以下内容:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

如上所述,新的处理器节点KSTREAM-FLATMAPVALUES-0000000001被注入到原始源节点和宿节点之间的拓扑中。它将源节点作为其父节点,将sink节点作为其子节点。换句话说,源节点提取的每个记录将首先遍历到新添加的KSTREAM-FLATMAPVALUES-0000000001节点以进行处理,结果将生成一个或多个新记录。它们将继续遍历到汇聚节点以写回Kafka。请注意,此处理器节点是“无状态”的,因为它与任何存储(即(stores: []))无关。
完整代码如下

package com.skindow.kafka;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

@Slf4j
public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //用于建立与Kafka集群的初始连接的主机/端口对的列表
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //记录键值对的默认序列化和反序列化库
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("my-replicated-topic");
        source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split("\\W+"));
            }
        });
        //构建Topology对象
        final Topology topology = builder.build();
        log.info(topology.describe().toString());
        //构建 kafka流 API实例
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // 附加关闭处理程序来捕获control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);//是非正常退出,就是说无论程序正在执行与否,都退出
        }
        System.exit(0);//正常退出,程序正常执行结束退出
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

编写第三个Streams应用程序:Wordcount

复制LineSplit类并将其改名为Wordcount
为了计算单词,我们可以先修改flatMapValues运算符,将它们全部视为小写(假设使用了lambda表达式):

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

为了进行计数聚合,我们必须首先指定我们想要使用groupBy运算符来键入值字符串上的流,即较低的套接字。此运算符生成新的分组流,然后可由count运算符聚合,该运算符在每个分组键上生成运行计数:

KTable<String, Long> counts =
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        })
      .groupBy(new KeyValueMapper<String, String, String>() {
           @Override
           public String apply(String key, String value) {
               return value;
           }
        })
    //将结果实体化到名为“counts-store”的KeyValueStore中。
//物化存储的类型总是,因为这是最内部存储的格式。
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

请注意,count运算符有一个Materialized参数,指定运行计数应存储在名为的状态存储中counts-store
我们还可以将countsKTable的更改日志流写回另一个Kafka主题skindow-toptic。因为结果是更改日志流,所以skindow-toptic应该配置输出主题并启用日志压缩。请注意,这一次的值类型不再String,而是Long,因此默认的序列化类不再适用于将其写入Kafka。我们需要为Long类型提供重写的序列化方法,否则将抛出运行时异常:

counts.toStream().to("skindow-toptic", Produced.with(Serdes.String(), Serdes.Long()));
  • 1

注意,为了从主题skindow-toptic读取更改日志流,需要将反序列化值设置为org.apache.kafka.common. serialize . longdeserializer。这方面的详细信息可以在Play with a Streams应用程序部分中找到。假设可以使用JDK 8中的lambda表达式,则上述代码可以简化为:

KStream<String, String> source = builder.stream("my-replicated-topic");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
      .toStream()
      .to("skindow-toptic", Produced.with(Serdes.String(), Serdes.Long()));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果我们再次将此增强拓扑描述为System.out.println(topology.describe()),我们将得到以下内容:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [my-replicated-topic])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: counts-store-repartition)
      <-- KSTREAM-FILTER-0000000005

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [counts-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: skindow-toptic)
      <-- KTABLE-TOSTREAM-0000000007
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

如上所述,拓扑现在包含两个断开连接的子拓扑。第一个子拓扑的汇聚节点KSTREAM-SINK-0000000004将写入重新分区主题Counts-repartition,该主题将由第二个子拓扑的源节点读取KSTREAM-SOURCE-0000000006。重新分区主题用于通过其聚合键“混洗”源流,在这种情况下是值字符串。另外,在第一子拓扑内部,KSTREAM-FILTER-0000000005在分组KSTREAM-KEY-SELECT-0000000002节点和汇聚节点之间注入无状态节点,以过滤掉其聚合密钥为空的任何中间记录。

在第二子拓扑中,聚合节点KSTREAM-AGGREGATE-0000000003与名为的状态存储相关联Counts(该名称由count运营商中的用户指定)。在从其即将到来的流源节点接收到每个记录时,聚合处理器将首先查询其关联的Counts存储以获得该密钥的当前计数,增加1,然后将新计数写回到存储。密钥的每个更新计数也将通过管道传输到KTABLE-TOSTREAM-0000000007节点的下游,该节点将此更新流解释为记录流,然后再进一步管道到汇聚节点KSTREAM-SINK-0000000008以写回Kafka。

完整的代码看起来像这样(假设使用了lambda表达式):

package com.skindow.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

@Slf4j
public class Wordcount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        //程序的唯一标识符以区别于其他应用程序与同一Kafka集群通信
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //用于建立与Kafka集群的初始连接的主机/端口对的列表
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //记录键值对的默认序列化和反序列化库
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //定义Streams应用程序的计算逻辑,计算逻辑被定义为topology连接的处理器节点之一
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("my-replicated-topic");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("skindow-toptic", Produced.with(Serdes.String(), Serdes.Long()));
        //构建Topology对象
        final Topology topology = builder.build();
        log.info(topology.describe().toString());
        //构建 kafka流 API实例
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // 附加关闭处理程序来捕获control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);//是非正常退出,就是说无论程序正在执行与否,都退出
        }
        System.exit(0);//正常退出,程序正常执行结束退出
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

项目地址:
https://github.com/skindowSyc/firstProject.git 对应tag kafkaStreamDemo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/749834
推荐阅读
相关标签
  

闽ICP备14008679号