当前位置:   article > 正文

Kafka Streams练习_kafka streams实战 编程题目

kafka streams实战 编程题目

练习需求

现有这样一个csv文件:
在这里插入图片描述
现在要通过Kafka Streams把它变成这样:
在这里插入图片描述
也就是说,原来的数据有"event,yes,maybe,invited,no"这五列,而且这五列的数据都是长度相当的id形式的数据。而"event"这一列只有一个数据,而其他的列里都有用空格隔开的多个数据。
需求是要把它变成event所对应的每一个列名中的每一个数据。
这个可能说的不是很清楚
举个例子来说
比如第一行的数据是这样的:
在这里插入图片描述
现在要把它变成:

在这里插入图片描述
这个应该比较清楚了:

代码编写

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class MyEventsAttendees {
    public static void main(String[] args) {
        Properties prop=new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoopt:9092");
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"kb07_eventatt");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        //todo 我们将定义Streams应用程序的计算逻辑。
        // todo 在卡夫卡的流中,这种计算逻辑被定义为连接处理器节点的拓扑。
        // todo 我们可以使用拓扑构建器来构造这样的拓扑
        StreamsBuilder builder=new StreamsBuilder();

        //todo 然后设置一个输入流的主题:
        KStream<Object, Object> event_attendees = builder.stream("eventatt")
                .filter((k, v) -> (!v.toString().startsWith("event,")
                        && (v.toString().split(",").length == 5)));

        //todo 现在,就可以根据业务逻辑需要,从输入流主题中源源不断的获取数据,处理完成后发送到输出流中:
        event_attendees.flatMap((k,v)->{
            System.out.println(k+"  "+v);
            List<KeyValue<String,String>> keyValues=new ArrayList<>();
            String[] split = v.toString().split(",");
            String eventId = split[0];
            String[] yess = split[1].split(" ");
            String[] maybes = split[2].split(" ");
            String[] inviteds = split[3].split(" ");
            String[] nos = split[4].split(" ");
            for (String yes:yess){
                KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+yes+",yes");
                keyValues.add(keyValue);
            }
            for (String maybe:maybes){
                KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+maybe+",maybe");
                keyValues.add(keyValue);
            }
            for (String invited:inviteds){
                KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+invited+",invited");
                keyValues.add(keyValue);
            }
            for (String no:nos){
                KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+no+",no");
                keyValues.add(keyValue);
            }
            return keyValues;
        }).to("event_attendees");

        //todo 我们可以通过以下步骤来检查构建器所创建的拓扑结构:
        Topology topo=builder.build();
        //System.out.println(topo.describe());

        //todo 有了拓扑结构和属性配置后,就可以构造一个KafkaStream对象:
        KafkaStreams streams = new KafkaStreams(topo, prop);

        //todo 一旦调用 start()方法,流处理就会一直执行直到调用close().
        //todo 我们可以注册一个JVM钩子来在程序关闭的时候终止流处理程序:
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("kb07_eventatt"){
            @Override
            public void run() {
                streams.close();
                countDownLatch.countDown();
            }
        });

        try{
            streams.start();
            countDownLatch.await();
        }catch(InterruptedException e){
            e.printStackTrace();
        }

        System.exit(0);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/1017777
推荐阅读
相关标签
  

闽ICP备14008679号