赞
踩
通过socket数据源,去请求一个socket服务(9999),得到数据流然后统计数据流中出现的单词及其个数
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 批处理入口环境
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); //流批一体的入口环境
streamEnv.setParallelism(1);
本地运行模式时,程序的默认并行度为CPU的逻辑核数
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
- SingleOutputStreamOperator<Tuple2<String, Integer>> words = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
- //切单词
- String[] split = s.split("//s+");
- for (String word : split) {
- //返回每一对(单词,1)
- collector.collect(Tuple2.of(word, 1));
- }
- }
- });
输入数据流由文本行组成,flatMap 函数将每行拆分为单独的单词,并将它们作为单独的输出元素发出。生成的数据流包含所有输入行中的所有单词。
- KeyedStream<Tuple2<String, Integer>, String> Keyed = words.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
- @Override
- public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
-
- return tuple2.f0;
- }
- });
- SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = Keyed.sum("f1");
将输入数据流 words 转换成一个键控流 (KeyedStream),使用 .keyBy() 方法指定按照元组中的第一个元素 f0 进行键控,也就是相同的 f0 值将被分到同一个分区中。这里用了匿名类实现了 KeySelector 接口,用于从元组中提取出键值。然后,调用 .sum() 方法对键控流进行求和操作。由于之前已经根据元组中的第一个元素进行了分区,所以对每个分区内所有元组的第二个元素 f1 求和得到的结果是每个不同键的数量总和。最后,将结果流 resultStream 作为输出。
resultStream.print();
env.execute();
通过netcat来创建一个socket连接
监听端口9999,当有实体连接时,就可以相互发送socket信息
当连接后,我们输入词以后,程序会对这些词进行分区统计
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。