赞
踩
一 首生是kafka -stream 版本号问题,然后是springboot1.5.6兼容问题,发现springboot2.0不支持kafka -stream1.0.2包
- 第一个是窗口聚合的初始值,第二个是进行聚合的聚合器,第三个就是窗口时间,第四个是类似于序列化的东西
-
- KTable aggregate = groupedStream.aggregate(initializer, aggregator, timeWindows, resultSerde);
下面直接依赖包 坑了很久,各种找版本
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <!-- 1.5.6版本 能配置 kafka-stream 1.0.2 最新版本的流依赖版,简化很多步骤 不支持kafak的 springboot 2.0-->
- <version>1.5.6.RELEASE</version>
- <relativePath/>
- </parent>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>1.1.0</version>
- <exclusions>
- <exclusion>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-streams</artifactId>
- <version>1.0.2</version>
- </dependency>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.10</version>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.5</version>
- </dependency>
二 最后计算是成功了, 但是因为序列化原因一直显示不出结果,所有结果都是0
下面是能用序列化方法
GenericDeserializer.java
- package cloud.stream.serdes;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.apache.kafka.common.errors.SerializationException;
- import org.apache.kafka.common.serialization.Deserializer;
-
- import java.util.Map;
-
- //反序列化实现
-
- public class GenericDeserializer<T> implements Deserializer<T> {
-
- private ObjectMapper objectMapper = new ObjectMapper();
- private Class<T> type;
- /**
- * Default constructor needed by Kafka
- */
- public GenericDeserializer() {
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void configure(Map<String, ?> props, boolean isKey) {
- type = (Class<T>) props.get("JsonPOJOClass");
- }
-
- @Override
- public T deserialize(String topic, byte[] bytes) {
- if (bytes == null)
- return null;
-
- T data;
- try {
- data = objectMapper.readValue(bytes, type);
- } catch (Exception e) {
- throw new SerializationException(e);
- }
-
- return data;
- }
-
- @Override
- public void close() {
-
- }
-
-
- }
GenericSerializer.java
- package cloud.stream.serdes;
-
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.apache.kafka.common.errors.SerializationException;
- import org.apache.kafka.common.serialization.Serializer;
-
- import java.util.Map;
-
- //序列化实现
-
- public class GenericSerializer<T> implements Serializer<T> {
-
- private ObjectMapper objectMapper = new ObjectMapper();
-
- // public GenericSerializer(Class<T> pojoClass) {
- public GenericSerializer() {
- }
-
- @Override
- public void configure(Map<String, ?> props, boolean isKey) {
- }
-
- @Override
- public byte[] serialize(String topic, T data) {
- if (data == null)
- return null;
-
- try {
- return objectMapper.writeValueAsBytes(data);
- } catch (Exception e) {
- throw new SerializationException("Error serializing JSON message", e);
- }
- }
-
- @Override
- public void close() {
- }
-
-
- }
-
SerdesFactory.java
- package cloud.stream.serdes;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import org.apache.kafka.common.serialization.Deserializer;
- import org.apache.kafka.common.serialization.Serde;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.common.serialization.Serializer;
-
- import cloud.stream.model.Statistics;
-
- public class SerdesFactory {
- /**
- * @param <T> The class should have a constructor without any
- * arguments and have setter and getter for every member variable
- * @param pojoClass POJO class.
- * @return Instance of {@link Serde}
- *
- * 序列化和反序列化能用方法,
- */
- public static <T> Serde<T> serdFrom(Class<T> pojoClass) {
- Map<String, Object> serdeProps = new HashMap<>();
- final Serializer<Statistics> statisticsSerializer = new GenericSerializer<>();
- serdeProps.put("JsonPOJOClass", pojoClass);
- statisticsSerializer.configure(serdeProps, false);
-
- final Deserializer<Statistics> statisticsDeserializer = new GenericDeserializer<>();
- serdeProps.put("JsonPOJOClass", pojoClass);
- statisticsDeserializer.configure(serdeProps, false);
-
- // return Serdes.serdeFrom(new GenericSerializer<T>(pojoClass), new GenericDeserializer<T>(pojoClass));
- return (Serde<T>) Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);
- }
- }
下面是主程序代码
-
- * 统计60秒内,温度值的最大值 topic中的消息格式为数字,30, 21或者{"temp":19, "humidity": 25}
- */
- public class TemperatureAvgDemo {
- private static final int TEMPERATURE_WINDOW_SIZE = 60;
-
- public static void main(String[] args) throws Exception {
-
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temp-avg");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "itcast:9092");
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-
- //序列化和反序列化
- final Serde<Statistics> statisticsSerde =SerdesFactory.serdFrom( Statistics.class);
-
- StreamsBuilder builder = new StreamsBuilder();
-
- KStream<String, String> source = builder.stream("snmp-temp1");
- KTable<Windowed<String>, Statistics> max = source
- .selectKey(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return "stat";
- }
- })
- .groupByKey()
- .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
- .aggregate(
- new Initializer<Statistics>() {
- @Override
- public Statistics apply() {
- Statistics avgAndSum = new Statistics(0L,0L,0L);
- return avgAndSum;
- }
- },
- new Aggregator<String, String, Statistics>() {
- @Override
- public Statistics apply(String aggKey, String newValue, Statistics aggValue) {
- //topic中的消息格式为{"temp":19, "humidity": 25}
- System.out.println("aggKey:" + aggKey + ", newValue:" + newValue + ", aggKey:" + aggValue);
- Long newValueLong = null;
- try {
- JSONObject json = JSON.parseObject(newValue);
- newValueLong = json.getLong("temp");
- }
- catch (ClassCastException ex) {
- newValueLong = Long.valueOf(newValue);
- }
-
- aggValue.setCount(aggValue.getCount() + 1);
- aggValue.setSum(aggValue.getSum() + newValueLong);
- aggValue.setAvg(aggValue.getSum() / aggValue.getCount());
-
- return aggValue;
- }
- },
- Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
- .withValueSerde(statisticsSerde)
- );
-
- WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
- WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
- Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
- max.toStream().to("reulst-temp-stat", Produced.with(windowedSerde, statisticsSerde));
-
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- final CountDownLatch latch = new CountDownLatch(1);
-
-
- Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
- @Override
- public void run() {
- streams.close();
- latch.countDown();
- }
- });
-
- try {
- streams.start();
- latch.await();
- } catch (Throwable e) {
- System.exit(1);
- }
- System.exit(0);
- }
-
- }
最终输出结果
结果输出情况
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。