赞
踩
- 日志分析
- 大屏看板统计
- 公交实时数据
- 实时文章分值计算
对单词个数进行统计
1. 生产者
- /**
- * 生产者
- */
- public class ProducerQuickStart {
-
- public static void main(String[] args) {
- //1.kafka的配置信息
- Properties properties = new Properties();
- //kafka的连接地址
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.128:9092");
- //发送失败,失败的重试次数
- properties.put(ProducerConfig.RETRIES_CONFIG,5);
- //消息key的序列化器
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
- //消息value的序列化器
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
- //数据压缩
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
- //2.生产者对象
- KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
-
- //封装发送的消息
- ProducerRecord<String,String> record = new ProducerRecord<String, String>("itcast-topic-input","key","hello 1");
- RecordMetadata recordMetadata = null;
- //3.发送消息
- //同步
- // try {
- // recordMetadata = producer.send(record).get();
- // } catch (InterruptedException e) {
- // throw new RuntimeException(e);
- // } catch (ExecutionException e) {
- // throw new RuntimeException(e);
- // }
- //异步
- //异步消息发送
- for (int i = 0; i < 10; i++) {
- producer.send(record, (recordMetadata1, e) -> {
- if(e != null){
- System.out.println("记录异常信息到日志表中");
- }
- System.out.println(recordMetadata1.offset());
- });
- }
-
- // System.out.println(recordMetadata.offset());
-
-
- //4.关闭消息通道,必须关闭,否则消息发送不成功
- producer.close();
- }
-
- }
2. Stream
- public class KafkaStreamQuickStart {
-
- public static void main(String[] args) {
-
- //kafka的配置信息
- Properties prop = new Properties();
- prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.128:9092");
- // 下方是Stream相关的序列化器
- prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
-
- //stream 构建器
- StreamsBuilder streamsBuilder = new StreamsBuilder();
-
- //流式计算
- streamProcessor(streamsBuilder);
-
-
- //创建kafkaStream对象
- KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
- //开启流式计算
- kafkaStreams.start();
- }
-
- /**
- * 流式计算
- * 消息的内容:hello kafka hello itcast
- * @param streamsBuilder
- */
- private static void streamProcessor(StreamsBuilder streamsBuilder) {
- //创建kstream对象,同时指定从那个topic中接收消息
- KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
-
- /**
- * 处理消息的value
- */
- stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
- //按照value进行聚合处理
- .groupBy((key,value)->value)
- //时间窗口 每十秒执行一次
- .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
- //统计单词的个数
- .count()
- //转换为kStream
- .toStream()
- .map((key,value)->{
- System.out.println("key:"+key+",value:"+value);
- return new KeyValue<>(key.key().toString(),value.toString());
- })
- //发送消息
- .to("itcast-topic-out");
- }
- }
3. 消费者
- public class ConsumerQuickStart {
-
- public static void main(String[] args) {
- //1.添加kafka的配置信息
- Properties properties = new Properties();
- //kafka的连接地址
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.128:9092");
- //消费者组
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
- //消息的反序列化器
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
- //2.消费者对象
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
-
- //3.订阅主题
- consumer.subscribe(Collections.singletonList("itcast-topic-input"));
-
- //当前线程一直处于监听状态
-
- try{
- while (true){
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<String, String> record : records) {
- System.out.println(record.value());
- System.out.println(record.key());
- System.out.println(record.offset());
- //同步提交偏移量
- // try {
- // consumer.commitSync();//同步提交当前最新的偏移量
- // }catch (CommitFailedException e){
- // System.out.println("记录提交失败的异常:"+e);
- // }
-
- //异步提交偏移量
- // consumer.commitAsync((map, e) -> {
- // if(e!=null){
- // System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
- // }
- // });
-
- }
-
- consumer.commitAsync();
- }
- }catch (RuntimeException e){
- e.printStackTrace();
- }finally {
- try{
- //默认提交拉取的这些数据中最大的偏移量
- consumer.commitSync();
- }finally {
- consumer.close();
- }
-
- }
-
-
-
-
- }
-
- }
kafka流的配置主要包含四项
1. key 的默认序列化/反序列化器(Serde)
2. value 的默认序列化/反序列化器(Serde)
3. kafka地址(bootstrap.servers)
4. 应用ID(application.id)
- Properties prop = new Properties();
- prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.128:9092");
- prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
需要使用配置类对kafka stream进行配置
- @Setter
- @Getter
- @Configuration
- @EnableKafkaStreams
- @ConfigurationProperties(prefix="kafka")
- public class KafkaStreamConfig {
- private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
- private String hosts;
- private String group;
- @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
- public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
- Map<String, Object> props = new HashMap<>();
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
- props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
- props.put(StreamsConfig.RETRIES_CONFIG, 10);
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- return new KafkaStreamsConfiguration(props);
- }
- }
同时需要指定要进行流式处理的topic,以及相应的处理逻辑
- @Configuration
- @Slf4j
- public class KafkaStreamHelloListener {
-
- @Bean
- public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
- //创建kstream对象,同时指定从那个topic中接收消息
- KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
- stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(String value) {
- return Arrays.asList(value.split(" "));
- }
- })
- //根据value进行聚合分组
- .groupBy((key,value)->value)
- //聚合计算时间间隔
- .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
- //求单词的个数
- .count()
- .toStream()
- //处理后的结果转换为string字符串
- .map((key,value)->{
- System.out.println("key:"+key+",value:"+value);
- return new KeyValue<>(key.key().toString(),value.toString());
- })
- //发送消息
- .to("itcast-topic-out");
- return stream;
- }
- }
# Application settings for the kafka-demo service.
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  # Spring Kafka producer/consumer settings
  kafka:
    bootstrap-servers: 192.168.200.128:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compressionType: lz4
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# Custom properties under the "kafka" prefix (hosts, group)
kafka:
  hosts: 192.168.200.128:9092
  group: ${spring.application.name}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。