当前位置:   article > 正文

Kafka Stream流式计算

kafka stream流

用例

- 日志分析

- 大屏看板统计

- 公交实时数据

- 实时文章分值计算

执行流程

对单词个数进行统计

演示

1. 生产者

  1. /**
  2. * 生产者
  3. */
  4. public class ProducerQuickStart {
  5. public static void main(String[] args) {
  6. //1.kafka的配置信息
  7. Properties properties = new Properties();
  8. //kafka的连接地址
  9. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.128:9092");
  10. //发送失败,失败的重试次数
  11. properties.put(ProducerConfig.RETRIES_CONFIG,5);
  12. //消息key的序列化器
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  14. //消息value的序列化器
  15. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  16. //数据压缩
  17. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
  18. //2.生产者对象
  19. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
  20. //封装发送的消息
  21. ProducerRecord<String,String> record = new ProducerRecord<String, String>("itcast-topic-input","key","hello 1");
  22. RecordMetadata recordMetadata = null;
  23. //3.发送消息
  24. //同步
  25. // try {
  26. // recordMetadata = producer.send(record).get();
  27. // } catch (InterruptedException e) {
  28. // throw new RuntimeException(e);
  29. // } catch (ExecutionException e) {
  30. // throw new RuntimeException(e);
  31. // }
  32. //异步
  33. //异步消息发送
  34. for (int i = 0; i < 10; i++) {
  35. producer.send(record, (recordMetadata1, e) -> {
  36. if(e != null){
  37. System.out.println("记录异常信息到日志表中");
  38. }
  39. System.out.println(recordMetadata1.offset());
  40. });
  41. }
  42. // System.out.println(recordMetadata.offset());
  43. //4.关闭消息通道,必须关闭,否则消息发送不成功
  44. producer.close();
  45. }
  46. }

2. Stream

  1. public class KafkaStreamQuickStart {
  2. public static void main(String[] args) {
  3. //kafka的配置信息
  4. Properties prop = new Properties();
  5. prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.128:9092");
  6. // 下方是Stream相关的序列化器
  7. prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  8. prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  9. prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
  10. //stream 构建器
  11. StreamsBuilder streamsBuilder = new StreamsBuilder();
  12. //流式计算
  13. streamProcessor(streamsBuilder);
  14. //创建kafkaStream对象
  15. KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
  16. //开启流式计算
  17. kafkaStreams.start();
  18. }
  19. /**
  20. * 流式计算
  21. * 消息的内容:hello kafka hello itcast
  22. * @param streamsBuilder
  23. */
  24. private static void streamProcessor(StreamsBuilder streamsBuilder) {
  25. //创建kstream对象,同时指定从那个topic中接收消息
  26. KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
  27. /**
  28. * 处理消息的value
  29. */
  30. stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
  31. //按照value进行聚合处理
  32. .groupBy((key,value)->value)
  33. //时间窗口 每十秒执行一次
  34. .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
  35. //统计单词的个数
  36. .count()
  37. //转换为kStream
  38. .toStream()
  39. .map((key,value)->{
  40. System.out.println("key:"+key+",value:"+value);
  41. return new KeyValue<>(key.key().toString(),value.toString());
  42. })
  43. //发送消息
  44. .to("itcast-topic-out");
  45. }
  46. }

3. 消费者

  1. public class ConsumerQuickStart {
  2. public static void main(String[] args) {
  3. //1.添加kafka的配置信息
  4. Properties properties = new Properties();
  5. //kafka的连接地址
  6. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.128:9092");
  7. //消费者组
  8. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
  9. //消息的反序列化器
  10. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  11. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  12. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
  13. //2.消费者对象
  14. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
  15. //3.订阅主题
  16. consumer.subscribe(Collections.singletonList("itcast-topic-input"));
  17. //当前线程一直处于监听状态
  18. try{
  19. while (true){
  20. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  21. for (ConsumerRecord<String, String> record : records) {
  22. System.out.println(record.value());
  23. System.out.println(record.key());
  24. System.out.println(record.offset());
  25. //同步提交偏移量
  26. // try {
  27. // consumer.commitSync();//同步提交当前最新的偏移量
  28. // }catch (CommitFailedException e){
  29. // System.out.println("记录提交失败的异常:"+e);
  30. // }
  31. //异步提交偏移量
  32. // consumer.commitAsync((map, e) -> {
  33. // if(e!=null){
  34. // System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
  35. // }
  36. // });
  37. }
  38. consumer.commitAsync();
  39. }
  40. }catch (RuntimeException e){
  41. e.printStackTrace();
  42. }finally {
  43. try{
  44. //默认提交拉取的这些数据中最大的偏移量
  45. consumer.commitSync();
  46. }finally {
  47. consumer.close();
  48. }
  49. }
  50. }
  51. }

kafka流的配置主要包含四项

1. key 的序列化/反序列化器(Serde)

2. value 的序列化/反序列化器(Serde)

3. kafka 连接地址(bootstrap.servers)

4. 应用 ID(application.id)

// The four essential Kafka Streams settings: broker address,
// key serde, value serde, and the application id.
Properties prop = new Properties();
// Kafka broker connection address
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.128:9092");
// Serde = serializer + deserializer pair for message keys
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Serde for message values
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Unique id of this streams application (also the consumer group id)
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

Kafka Stream 集成SpringBoot

需要使用配置类对kafka stream进行配置

  1. @Setter
  2. @Getter
  3. @Configuration
  4. @EnableKafkaStreams
  5. @ConfigurationProperties(prefix="kafka")
  6. public class KafkaStreamConfig {
  7. private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
  8. private String hosts;
  9. private String group;
  10. @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  11. public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
  12. Map<String, Object> props = new HashMap<>();
  13. props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
  14. props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
  15. props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
  16. props.put(StreamsConfig.RETRIES_CONFIG, 10);
  17. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  18. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  19. return new KafkaStreamsConfiguration(props);
  20. }
  21. }

同时需要指定进行 stream 处理的 topic,以及相关的逻辑操作

  1. @Configuration
  2. @Slf4j
  3. public class KafkaStreamHelloListener {
  4. @Bean
  5. public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
  6. //创建kstream对象,同时指定从那个topic中接收消息
  7. KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
  8. stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
  9. @Override
  10. public Iterable<String> apply(String value) {
  11. return Arrays.asList(value.split(" "));
  12. }
  13. })
  14. //根据value进行聚合分组
  15. .groupBy((key,value)->value)
  16. //聚合计算时间间隔
  17. .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
  18. //求单词的个数
  19. .count()
  20. .toStream()
  21. //处理后的结果转换为string字符串
  22. .map((key,value)->{
  23. System.out.println("key:"+key+",value:"+value);
  24. return new KeyValue<>(key.key().toString(),value.toString());
  25. })
  26. //发送消息
  27. .to("itcast-topic-out");
  28. return stream;
  29. }
  30. }

配置文件(application.yml)

  1. server:
  2. port: 9991
  3. spring:
  4. application:
  5. name: kafka-demo
  6. kafka:
  7. bootstrap-servers: 192.168.200.128:9092
  8. producer:
  9. retries: 10
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. compressionType: lz4
  13. consumer:
  14. group-id: ${spring.application.name}-test
  15. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  16. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  17. kafka:
  18. hosts: 192.168.200.128:9092
  19. group: ${spring.application.name}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/639883
推荐阅读
相关标签
  

闽ICP备14008679号