赞
踩
package com.lyj.sx.flink.wordCount; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class LocalWithWebUI { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("pxj62", 8889); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String string : s.split(" ")) { collector.collect(Tuple2.of(string, 1)); } } }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> s) throws Exception { return s.f0; } }).sum(1); summed.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.wordCount; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamingWordCount { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); int parallelism = env.getParallelism(); System.out.println("parallelism:" + parallelism); DataStreamSource<String> source = env.socketTextStream("pxj62", 8881); System.out.println("source"+source.getParallelism()); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] strings = s.split(" "); for (String string : strings) { collector.collect(Tuple2.of(string, 1)); } } }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> s) throws Exception { return s.f0; } }).sum(1); summed.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.wordCount; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class StreamingWordCountV3 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("pxj62", 8889); SingleOutputStreamOperator<Tuple2<String, Integer>> data = source.flatMap(new MyFlatMap()); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = data.keyBy(0).sum(1); summed.print(); env.execute("pxj"); } public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String,Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String string : s.split(" ")) { collector.collect(Tuple2.of(string,1)); } } } }
package com.lyj.sx.flink.day02; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ReadTextFileDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.readTextFile("data/a.txt"); source.map(new MapFunction<String, Tuple2<String,Integer>>() { Tuple2<String,Integer> s1; @Override public Tuple2<String, Integer> map(String s) throws Exception { String[] strings = s.split(" "); for (String string : strings) { s1=Tuple2.of(string,1); } return s1; } }).print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day02; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Arrays; import java.util.List; import java.util.UUID; public class CustomNoParSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); System.out.println("环境执行的并行度:"+env.getParallelism()); DataStreamSource<String> source = env.addSource(new Mysource2()); System.out.println("source的并行度为:"+source.getParallelism()); source.print(); // env.execute("pxj"); env.execute(); } private static class Mysource1 implements SourceFunction<String> { //启动,并产生数据,产生的数据用SourceContext输出 @Override public void run(SourceContext<String> cx) throws Exception { List<String> lists = Arrays.asList("a", "b", "c", "pxj", "sx", "lyj"); for (String list : lists) { cx.collect(list); } } //将Source停掉 @Override public void cancel() { } } private static class Mysource2 implements SourceFunction<String>{ private Boolean flag=true; @Override public void run(SourceContext<String> cx) throws Exception { System.out.println("run...."); while (flag){ cx.collect(UUID.randomUUID().toString()); } } @Override public void cancel() { System.out.println("cancel"); flag=false; } } }
作者:pxj_sx(潘陈)
日期:2024-04-11 0:26:20
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。