赞
踩
总结:广播状态(Broadcast State)本质上仍然是两个数据集(数据流)的整合操作。
package waterChuli.flinkDetil; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * * * https://www.pianshen.com/article/60011255975/ * nc -lp 9999 */ public class Gongxiang { private static final Logger LOG = LoggerFactory.getLogger(Gongxiang.class); public static void main(String[] args) { try { StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Integer, String>> ds1 = fsEnv.fromElements(Tuple2.of(1, "男"), Tuple2.of(2, "女")); //自定义状态描述器 MapStateDescriptor<Integer, String> genderInfo= new MapStateDescriptor( "genderInfo", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.CHAR_TYPE_INFO ); //通过将ds1将自己进行广播 BroadcastStream<Tuple2<Integer,String>> bcStream=ds1.broadcast(genderInfo); SingleOutputStreamOperator<Tuple5<Integer, String, Integer, String, Double>> ds2 = fsEnv.socketTextStream("localhost", 9999) .map(new MapFunction<String, Tuple5<Integer, String, Integer, String, Double>>() { @Override public Tuple5<Integer, String, Integer, String, Double> map(String s) throws Exception { String[] strings=s.split(","); int id= Integer.valueOf(strings[0]);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。