当前位置:   article > 正文

java kafkastream_Kafka 实战:(五)Kafka Stream API 实现

kafkastreams实战

案例一:实现topic之间的流传输

一、Kafka Java代码

创建maven工程,导入以下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>

代码部分

public class MyStream {

    /**
     * Pipes every record unchanged from topic "mystreamin" to topic
     * "mystreamout". Blocks until the JVM receives a shutdown signal,
     * then closes the streams client and exits.
     */
    public static void main(String[] args) {

        // Streams client configuration: application id, broker address,
        // and String serdes for both keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "mystream");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build the topology: read from "mystreamin", forward to "mystreamout".
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("mystreamin").to("mystreamout");
        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, config);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        // On Ctrl-C / SIGTERM: close the client cleanly, then release main.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            shutdownLatch.countDown();
        }, "stream"));

        try {
            streams.start();
            shutdownLatch.await(); // park main until the shutdown hook fires
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

}

二、Kafka Shell 命令

1、创建Topic:

`kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic mystreamin --partitions 1 --replication-factor 1`

`kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic mystreamout --partitions 1 --replication-factor 1`

* 1

* 2

8ba7f1f0f49a03e26546d4087f179a94.png

7589590ef022cec1beaf029379dee540.png

查看Topic:`kafka-topics.sh --zookeeper 192.168.247.201:2181 --list`

ff60f293b213cde0e2227b73bfc484f1.png

2、运行Java代码,执行以下步骤:

生产消息:`kafka-console-producer.sh --topic mystreamin --broker-list 127.0.0.1:9092`

ec592c93584bd646e4003f1ecf6f7739.png

消费消息:`kafka-console-consumer.sh --topic mystreamout --bootstrap-server 127.0.0.1:9092 --from-beginning`

2f9db86e52599e9e8ed63098c0d2a5e7.png

案例二:WordCount Stream API

一、Kafka Java代码

代码部分

public class WordCountStream {

    /**
     * Word-count topology: reads lines from topic "wordcount-input", splits
     * each line on single spaces, counts occurrences per word, prints every
     * count update, and writes (word, count) pairs to topic "wordcount-out".
     */
    public static void main(String[] args) {

        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        // BUG FIX: the original passed ConsumerConfig.AUTO_OFFSET_RESET_DOC (the
        // javadoc text of the setting) as the property key, so "earliest" was
        // silently ignored. The real key is AUTO_OFFSET_RESET_CONFIG
        // ("auto.offset.reset").
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest | latest
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build the topology. Example input on wordcount-input:
        //   "hello world"
        //   "hello java"
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, Long> count = builder.<String, String>stream("wordcount-input") // one record per line
                // split each line on spaces and emit one record per word:
                // (null,"hello") (null,"world") (null,"hello") (null,"java")
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
                // re-key each record by the word itself so groupByKey() groups per word
                .map((k, v) -> new KeyValue<>(v, "1"))
                .groupByKey()
                .count();

        // Log every count update to stdout.
        count.toStream().foreach((k, v) -> System.out.println("key:" + k + " value:" + v));

        // count() yields Long values; convert them to String so they match the
        // default String serde before producing to the output topic.
        count.toStream()
                .map((k, v) -> new KeyValue<>(k, v.toString()))
                .to("wordcount-out");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams client on JVM shutdown, then unblock main.
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

}

358fc00270cdc60018b9789bb6ff101b.png

二、Kafka Shell 命令

1、创建Topic:`kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic wordcount-input --partitions 1 --replication-factor 1`

kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic wordcount-out --partitions 1 --replication-factor 1

c6dab4f1ac21e31abe5935bf896be343.png

bd9de6b4b5138cb4e1f68ed87ce8bd39.png

**2、运行Java代码,执行以下步骤:**

生产消息:`kafka-console-producer.sh --topic wordcount-input --broker-list 127.0.0.1:9092`

4a4c995963a356e3a667ad81a28cd637.png

消费消息:`kafka-console-consumer.sh --topic wordcount-out --bootstrap-server 127.0.0.1:9092 --from-beginning`

e9c7e817b8e0bf796998d9efc96ca3c5.png

显示key消费消息:`kafka-console-consumer.sh --topic wordcount-out --bootstrap-server 127.0.0.1:9092 --property print.key=true --from-beginning`

a0a3e3a94c8f206a1c2e3aa16c067882.png

案例三:利用Kafka流实现对输入数字的求和

一、Kafka Java代码

public class SumStream {

    /**
     * Running-sum topology: reads integer strings from topic "suminput",
     * re-keys every record with the same constant key so all records fall
     * into a single group, and reduces by adding each new value to the
     * accumulated sum (printed to stdout on every update).
     */
    public static void main(String[] args) {

        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sumstream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        // BUG FIX: AUTO_OFFSET_RESET_DOC is the setting's documentation text,
        // not its key, so the original line had no effect. Use
        // AUTO_OFFSET_RESET_CONFIG ("auto.offset.reset") instead.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest | latest
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("suminput");

        source
                // constant key -> every record lands in the same group,
                // i.e. one global running total
                .map((key, value) -> new KeyValue<>("sum: ", value))
                .groupByKey()
                // reduce() receives the previous aggregate x and the new value y
                .reduce((x, y) -> {
                    System.out.println("x: " + x + " y: " + y);
                    int sum = Integer.parseInt(x) + Integer.parseInt(y);
                    System.out.println("sum: " + sum);
                    return Integer.toString(sum);
                });

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams client on JVM shutdown, then unblock main.
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

}

9adda2e8a21da4e2c00bdd8d96ce8f3d.png

二、Kafka Shell 命令

1、创建Topic:`kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic suminput --partitions 1 --replication-factor 1`

ea113ac4c0e48208d3dc50ba2795bb46.png

**2、运行Java代码,执行以下步骤:**

生产消息:`kafka-console-producer.sh --topic suminput --broker-list 127.0.0.1:9092`

1adfcf72d7afa26fba0e8b4a779f3c1b.png

案例四:Kafka Stream实现不同窗口的流处理

一、Kafka Java代码

package cn.kgc.kb09;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Qianchun
 * @Date 2020/12/16
 * @Description Windowed word counting over topic "windowdemo". Uncomment one
 * of the windowedBy(...) lines below to switch between tumbling, hopping and
 * session windows; with all of them commented out this is a plain word count.
 */
public class WindowStream {

    public static void main(String[] args) {

        Properties prop = new Properties();
        // Different window demos must NOT share the same application id,
        // otherwise they clash on the same state/changelog topics.
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "SessionWindow");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.247.201:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
        // BUG FIX: the original used ConsumerConfig.AUTO_OFFSET_RESET_DOC (the
        // javadoc string) as the property key, so "earliest" was never applied.
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest | latest
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("windowdemo");

        source
                // BUG FIX: the original split on the literal text "s+" (the
                // backslash was lost); "\\s+" is the regex for runs of whitespace.
                .flatMapValues(value -> Arrays.asList(value.split("\\s+")))
                // re-key by the word itself so counting is per word
                .map((x, y) -> new KeyValue<>(y, "1"))
                .groupByKey()
                // All window sizes below can be tuned via the Duration arguments.
                // Tumbling time window (fixed, non-overlapping 5s windows):
                // .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))
                // Hopping time window (5s window advancing by 2s, so a single
                // input can show up in 5/2+1 = 3 windows):
                // .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis())
                //         .advanceBy(Duration.ofSeconds(2).toMillis()))
                // Session window (the session stays open while inputs keep
                // arriving within 20s of each other; after a 20s gap it closes
                // and counting restarts from 0):
                // .windowedBy(SessionWindows.with(Duration.ofSeconds(20).toMillis()))
                .count()
                .toStream()
                .foreach((x, y) -> System.out.println("x: " + x + " y:" + y));

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams client on JVM shutdown, then unblock main.
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

}

二、Kafka Shell 命令

1、创建Topic:`kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic windowdemo --partitions 1 --replication-factor 1`

054408f86788190a07c18de775d1cca5.png

**2、运行Java代码,执行以下步骤:**

生产消息:`kafka-console-producer.sh --topic windowdemo --broker-list 127.0.0.1:9092`

注意:若出现错误 `Exception in thread "sum-a3bbe4d0-4cc9-4812-a7a0-e650a8a60c9f-StreamThread-1" java.lang.IllegalArgumentException: Window endMs time cannot be smaller than window startMs time.`(或数组越界异常)

解决方案:大概率是应用ID重复(不同的窗口流使用了相同的 APPLICATION_ID),请修改 `prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sessionwindow");` 的参数。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/639896
推荐阅读
相关标签
  

闽ICP备14008679号