当前位置:   article > 正文

kafka-stream流式处理示例_kafka stream流式处理

kafka stream流式处理

   一 首先是 kafka-stream 版本号问题,然后是 springboot 1.5.6 兼容问题,发现 springboot 2.0 不支持 kafka-stream 1.0.2 包

  1. 第一个是窗口聚合的初始值,第二个是进行聚合的聚合器,第三个就是窗口时间,第四个是类似于序列化的东西
  2. KTable aggregate = groupedStream.aggregate(initializer, aggregator, timeWindows, resultSerde);

下面是直接的依赖包,坑了很久,各种找版本

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <!-- 1.5.6版本 能配置 kafka-stream 1.0.2 最新版本的流依赖版,简化很多步骤 不支持kafak的 springboot 2.0-->
  5. <version>1.5.6.RELEASE</version>
  6. <relativePath/>
  7. </parent>
  8. <dependency>
  9. <groupId>org.apache.kafka</groupId>
  10. <artifactId>kafka_2.12</artifactId>
  11. <version>1.1.0</version>
  12. <exclusions>
  13. <exclusion>
  14. <groupId>com.101tec</groupId>
  15. <artifactId>zkclient</artifactId>
  16. </exclusion>
  17. <exclusion>
  18. <groupId>org.slf4j</groupId>
  19. <artifactId>slf4j-log4j12</artifactId>
  20. </exclusion>
  21. </exclusions>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.kafka</groupId>
  25. <artifactId>kafka-streams</artifactId>
  26. <version>1.0.2</version>
  27. </dependency>
  28. <dependency>
  29. <groupId>com.101tec</groupId>
  30. <artifactId>zkclient</artifactId>
  31. <version>0.10</version>
  32. </dependency>
  33. <dependency>
  34. <groupId>commons-io</groupId>
  35. <artifactId>commons-io</artifactId>
  36. <version>2.5</version>
  37. </dependency>

二 最后计算是成功了, 但是因为序列化原因一直显示不出结果,所有结果都是0

下面是能用的序列化方法

GenericDeserializer.java

  1. package cloud.stream.serdes;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.kafka.common.errors.SerializationException;
  4. import org.apache.kafka.common.serialization.Deserializer;
  5. import java.util.Map;
  6. //反序列化实现
  7. public class GenericDeserializer<T> implements Deserializer<T> {
  8. private ObjectMapper objectMapper = new ObjectMapper();
  9. private Class<T> type;
  10. /**
  11. * Default constructor needed by Kafka
  12. */
  13. public GenericDeserializer() {
  14. }
  15. @SuppressWarnings("unchecked")
  16. @Override
  17. public void configure(Map<String, ?> props, boolean isKey) {
  18. type = (Class<T>) props.get("JsonPOJOClass");
  19. }
  20. @Override
  21. public T deserialize(String topic, byte[] bytes) {
  22. if (bytes == null)
  23. return null;
  24. T data;
  25. try {
  26. data = objectMapper.readValue(bytes, type);
  27. } catch (Exception e) {
  28. throw new SerializationException(e);
  29. }
  30. return data;
  31. }
  32. @Override
  33. public void close() {
  34. }
  35. }

GenericSerializer.java

 

  1. package cloud.stream.serdes;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.kafka.common.errors.SerializationException;
  4. import org.apache.kafka.common.serialization.Serializer;
  5. import java.util.Map;
  6. //序列化实现
  7. public class GenericSerializer<T> implements Serializer<T> {
  8. private ObjectMapper objectMapper = new ObjectMapper();
  9. // public GenericSerializer(Class<T> pojoClass) {
  10. public GenericSerializer() {
  11. }
  12. @Override
  13. public void configure(Map<String, ?> props, boolean isKey) {
  14. }
  15. @Override
  16. public byte[] serialize(String topic, T data) {
  17. if (data == null)
  18. return null;
  19. try {
  20. return objectMapper.writeValueAsBytes(data);
  21. } catch (Exception e) {
  22. throw new SerializationException("Error serializing JSON message", e);
  23. }
  24. }
  25. @Override
  26. public void close() {
  27. }
  28. }

SerdesFactory.java

 

  1. package cloud.stream.serdes;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.apache.kafka.common.serialization.Deserializer;
  5. import org.apache.kafka.common.serialization.Serde;
  6. import org.apache.kafka.common.serialization.Serdes;
  7. import org.apache.kafka.common.serialization.Serializer;
  8. import cloud.stream.model.Statistics;
  9. public class SerdesFactory {
  10. /**
  11. * @param <T> The class should have a constructor without any
  12. * arguments and have setter and getter for every member variable
  13. * @param pojoClass POJO class.
  14. * @return Instance of {@link Serde}
  15. *
  16. * 序列化和反序列化能用方法,
  17. */
  18. public static <T> Serde<T> serdFrom(Class<T> pojoClass) {
  19. Map<String, Object> serdeProps = new HashMap<>();
  20. final Serializer<Statistics> statisticsSerializer = new GenericSerializer<>();
  21. serdeProps.put("JsonPOJOClass", pojoClass);
  22. statisticsSerializer.configure(serdeProps, false);
  23. final Deserializer<Statistics> statisticsDeserializer = new GenericDeserializer<>();
  24. serdeProps.put("JsonPOJOClass", pojoClass);
  25. statisticsDeserializer.configure(serdeProps, false);
  26. // return Serdes.serdeFrom(new GenericSerializer<T>(pojoClass), new GenericDeserializer<T>(pojoClass));
  27. return (Serde<T>) Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);
  28. }
  29. }

下面是主程序代码

 

  1. * 统计60秒内,温度值的最大值 topic中的消息格式为数字,30, 21或者{"temp":19, "humidity": 25}
  2. */
  3. public class TemperatureAvgDemo {
  4. private static final int TEMPERATURE_WINDOW_SIZE = 60;
  5. public static void main(String[] args) throws Exception {
  6. Properties props = new Properties();
  7. props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temp-avg");
  8. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "itcast:9092");
  9. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  10. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  11. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  12. props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
  13. //序列化和反序列化
  14. final Serde<Statistics> statisticsSerde =SerdesFactory.serdFrom( Statistics.class);
  15. StreamsBuilder builder = new StreamsBuilder();
  16. KStream<String, String> source = builder.stream("snmp-temp1");
  17. KTable<Windowed<String>, Statistics> max = source
  18. .selectKey(new KeyValueMapper<String, String, String>() {
  19. @Override
  20. public String apply(String key, String value) {
  21. return "stat";
  22. }
  23. })
  24. .groupByKey()
  25. .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
  26. .aggregate(
  27. new Initializer<Statistics>() {
  28. @Override
  29. public Statistics apply() {
  30. Statistics avgAndSum = new Statistics(0L,0L,0L);
  31. return avgAndSum;
  32. }
  33. },
  34. new Aggregator<String, String, Statistics>() {
  35. @Override
  36. public Statistics apply(String aggKey, String newValue, Statistics aggValue) {
  37. //topic中的消息格式为{"temp":19, "humidity": 25}
  38. System.out.println("aggKey:" + aggKey + ", newValue:" + newValue + ", aggKey:" + aggValue);
  39. Long newValueLong = null;
  40. try {
  41. JSONObject json = JSON.parseObject(newValue);
  42. newValueLong = json.getLong("temp");
  43. }
  44. catch (ClassCastException ex) {
  45. newValueLong = Long.valueOf(newValue);
  46. }
  47. aggValue.setCount(aggValue.getCount() + 1);
  48. aggValue.setSum(aggValue.getSum() + newValueLong);
  49. aggValue.setAvg(aggValue.getSum() / aggValue.getCount());
  50. return aggValue;
  51. }
  52. },
  53. Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
  54. .withValueSerde(statisticsSerde)
  55. );
  56. WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
  57. WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
  58. Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
  59. max.toStream().to("reulst-temp-stat", Produced.with(windowedSerde, statisticsSerde));
  60. final KafkaStreams streams = new KafkaStreams(builder.build(), props);
  61. final CountDownLatch latch = new CountDownLatch(1);
  62. Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
  63. @Override
  64. public void run() {
  65. streams.close();
  66. latch.countDown();
  67. }
  68. });
  69. try {
  70. streams.start();
  71. latch.await();
  72. } catch (Throwable e) {
  73. System.exit(1);
  74. }
  75. System.exit(0);
  76. }
  77. }

最终输出结果

结果输出情况

 

 
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/749815
推荐阅读
相关标签
  

闽ICP备14008679号