赞
踩
Kafka Streams提供了丰富的API和操作符,使开发者能够轻松构建实时流处理应用。以下是一个名为TemperatureDemo
的示例,展示了如何使用Kafka Streams处理温度传感器数据流,实现简单的温度监控和报警功能。假设我们有一个名为sensor-readings
的Topic,其中包含传感器ID(key)和温度值(value)。
TemperatureDemo.java
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Predicate; import java.util.Properties; public class TemperatureDemo { public static void main(String[] args) { Properties streamsConfig = new Properties(); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-demo"); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass()); StreamsBuilder builder = new StreamsBuilder(); // 1. 从 `sensor-readings` Topic 中读取数据流 KStream<String, Double> sensorReadings = builder.stream("sensor-readings"); // 2. 过滤出温度高于阈值(例如 30°C)的读数 double threshold = 30.0; KStream<String, Double> highTemperatures = sensorReadings.filter((sensorId, temperature) -> temperature > threshold); // 3. 将过滤后的高温读数输出到 `high-temperatures` Topic highTemperatures.to("high-temperatures"); // 4. 创建一个打印流,将所有高温读数打印到控制台 highTemperatures.foreach((sensorId, temperature) -> { System.out.printf("Sensor ID: %s, Temperature: %.1f°C (above threshold %.1f°C)%n", sensorId, temperature, threshold); }); KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig); streams.start(); // 在程序退出前,优雅地关闭Kafka Streams应用 Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
示例解析:
配置:设置Kafka Streams应用的基本配置,包括应用程序ID、Kafka Bootstrap服务器地址、键值的默认序列化器等。
创建流处理拓扑:使用StreamsBuilder
构建流处理拓扑。从名为sensor-readings
的Topic创建一个名为sensorReadings
的KStream,表示传感器读数的无界数据流。
过滤操作:定义一个Predicate(谓词)函数,用于筛选温度值大于阈值(此处为30°C)的读数。应用此谓词到sensorReadings
流,得到一个新的KStreamhighTemperatures
,仅包含高温读数。
输出结果:将过滤后的highTemperatures
流写入到名为high-temperatures
的Topic,供其他应用程序进一步处理或存储。同时,使用foreach
操作将高温读数打印到控制台,便于实时监控。
启动与关闭:创建KafkaStreams
实例,传入构建好的拓扑和配置,然后启动流处理应用。为了确保在程序退出时优雅地关闭Kafka Streams,注册一个shutdown hook。
这个TemperatureDemo
示例展示了如何使用Kafka Streams处理实时温度数据流,实现高温读数的筛选、输出到新Topic以及实时监控打印。实际应用中,可以根据需要扩展此示例,添加更复杂的流处理逻辑,如窗口聚合、JOIN操作、状态管理等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。