当前位置:   article > 正文

Kafka 实战 - Kafka Streams 示例 TemperatureDemo_kafka stream demo

kafka stream demo

Kafka Streams提供了丰富的API和操作符,使开发者能够轻松构建实时流处理应用。以下是一个名为TemperatureDemo的示例,展示了如何使用Kafka Streams处理温度传感器数据流,实现简单的温度监控和报警功能。假设我们有一个名为sensor-readings的Topic,其中包含传感器ID(key)和温度值(value)。

TemperatureDemo.java

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

import java.util.Properties;

public class TemperatureDemo {

    public static void main(String[] args) {
        Properties streamsConfig = new Properties();
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-demo");
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 1. 从 `sensor-readings` Topic 中读取数据流
        KStream<String, Double> sensorReadings = builder.stream("sensor-readings");

        // 2. 过滤出温度高于阈值(例如 30°C)的读数
        double threshold = 30.0;
        KStream<String, Double> highTemperatures = sensorReadings.filter((sensorId, temperature) -> temperature > threshold);

        // 3. 将过滤后的高温读数输出到 `high-temperatures` Topic
        highTemperatures.to("high-temperatures");

        // 4. 创建一个打印流,将所有高温读数打印到控制台
        highTemperatures.foreach((sensorId, temperature) -> {
            System.out.printf("Sensor ID: %s, Temperature: %.1f°C (above threshold %.1f°C)%n", sensorId, temperature, threshold);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        streams.start();

        // 在程序退出前,优雅地关闭Kafka Streams应用
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

示例解析:

  1. 配置:设置Kafka Streams应用的基本配置,包括应用程序ID、Kafka Bootstrap服务器地址、键值的默认序列化器等。

  2. 创建流处理拓扑:使用StreamsBuilder构建流处理拓扑。从名为sensor-readings的Topic创建一个名为sensorReadings的KStream,表示传感器读数的无界数据流。

  3. 过滤操作:定义一个Predicate(谓词)函数,用于筛选温度值大于阈值(此处为30°C)的读数。应用此谓词到sensorReadings流,得到一个新的KStreamhighTemperatures,仅包含高温读数。

  4. 输出结果:将过滤后的highTemperatures流写入到名为high-temperatures的Topic,供其他应用程序进一步处理或存储。同时,使用foreach操作将高温读数打印到控制台,便于实时监控。

  5. 启动与关闭:创建KafkaStreams实例,传入构建好的拓扑和配置,然后启动流处理应用。为了确保在程序退出时优雅地关闭Kafka Streams,注册一个shutdown hook。

这个TemperatureDemo示例展示了如何使用Kafka Streams处理实时温度数据流,实现高温读数的筛选、输出到新Topic以及实时监控打印。实际应用中,可以根据需要扩展此示例,添加更复杂的流处理逻辑,如窗口聚合、JOIN操作、状态管理等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/639903
推荐阅读
相关标签
  

闽ICP备14008679号