赞
踩
package com.shinho.wc; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class BoundryWordCount { public static void main(String[] args) throws Exception { //1创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lineDS = env.readTextFile("input/words.txt"); SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //分组 KeyedStream<Tuple2<String, Long>, Tuple> keyBy = wordAndOne.keyBy(0); //求和 SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1); // sum.print(); //启动执行 env.execute(); } }
控制台输出结果
1> (xx,1)
7> (kaikai,1)
3> (hello,1)
6> (word,1)
2> (gez,1)
7> (count,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)
6> (word,2)
每行前面的数字是并行子任务的编号,子任务个数取决于并行度(默认为电脑CPU核数)。相同的单词经过 keyBy 后会被分配到同一个子任务上,因此词频才能在该子任务内累加。
yum install -y nc
nc -lk 7777
package com.shinho.wc; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class NoBoundryWordCount { public static void main(String[] args) throws Exception { // 创建流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.读取文本流 DataStreamSource<String> lineDS = env.socketTextStream("192.168.10.132", 7777); SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //分组 KeyedStream<Tuple2<String, Long>, Tuple> keyBy = wordAndOne.keyBy(0); //求和 SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1); // sum.print(); //启动执行 env.execute(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。