赞
踩
现有这样一个csv文件:
现在要通过Kafka Streams把它变成这样:
也就是说,原来的数据有"event,yes,maybe,invited,no"这五列,而且这五列的数据都是长度相当的id形式的数据。而"event"这一列只有一个数据,而其他的列里都有用空格隔开的多个数据。
需求是要把它变成event所对应的每一个列名中的每一个数据。
这个可能说的不是很清楚
举个例子来说
比如第一行的数据是这样的:
现在要把它变成:
这个应该比较清楚了:
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.KStream; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class MyEventsAttendees { public static void main(String[] args) { Properties prop=new Properties(); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoopt:9092"); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"kb07_eventatt"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); //todo 我们将定义Streams应用程序的计算逻辑。 // todo 在卡夫卡的流中,这种计算逻辑被定义为连接处理器节点的拓扑。 // todo 我们可以使用拓扑构建器来构造这样的拓扑 StreamsBuilder builder=new StreamsBuilder(); //todo 然后设置一个输入流的主题: KStream<Object, Object> event_attendees = builder.stream("eventatt") .filter((k, v) -> (!v.toString().startsWith("event,") && (v.toString().split(",").length == 5))); //todo 现在,就可以根据业务逻辑需要,从输入流主题中源源不断的获取数据,处理完成后发送到输出流中: event_attendees.flatMap((k,v)->{ System.out.println(k+" "+v); List<KeyValue<String,String>> keyValues=new ArrayList<>(); String[] split = v.toString().split(","); String eventId = split[0]; String[] yess = split[1].split(" "); String[] maybes = split[2].split(" "); String[] inviteds = split[3].split(" "); String[] nos = split[4].split(" "); for (String yes:yess){ KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+yes+",yes"); keyValues.add(keyValue); } for (String maybe:maybes){ KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+maybe+",maybe"); keyValues.add(keyValue); } for (String invited:inviteds){ KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+invited+",invited"); keyValues.add(keyValue); } for (String no:nos){ KeyValue<String,String> keyValue=new KeyValue<>(null,eventId+","+no+",no"); keyValues.add(keyValue); } return keyValues; }).to("event_attendees"); //todo 我们可以通过以下步骤来检查构建器所创建的拓扑结构: Topology topo=builder.build(); //System.out.println(topo.describe()); //todo 有了拓扑结构和属性配置后,就可以构造一个KafkaStream对象: KafkaStreams streams = new KafkaStreams(topo, prop); //todo 一旦调用 start()方法,流处理就会一直执行直到调用close(). //todo 我们可以注册一个JVM钩子来在程序关闭的时候终止流处理程序: CountDownLatch countDownLatch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("kb07_eventatt"){ @Override public void run() { streams.close(); countDownLatch.countDown(); } }); try{ streams.start(); countDownLatch.await(); }catch(InterruptedException e){ e.printStackTrace(); } System.exit(0); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。