This post looks at the data sources that Flink supports.
fromCollection: creates a data stream from a collection or an iterator. All elements must be of the same type.
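```java
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// NetConnectEvent is the article's own event class (not shown here)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<NetConnectEvent> eventStream = env.fromCollection(Arrays.asList(
        new NetConnectEvent("192.168.27.63", "192.168.27.1", true),
        new NetConnectEvent("192.168.27.64", "192.168.27.2", false)
));
```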
fromElements: under the hood it simply calls fromCollection, as its implementation in StreamExecutionEnvironment shows:
```java
// Excerpt from StreamExecutionEnvironment (decompiled form): the element type is taken from data[0]
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
    if (data.length == 0) {
        throw new IllegalArgumentException("fromElements needs at least one element as argument");
    } else {
        TypeInformation typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(data[0]);
        } catch (Exception var4) {
            throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", var4);
        }

        // Delegates to fromCollection with the inferred type
        return this.fromCollection((Collection)Arrays.asList(data), (TypeInformation)typeInfo);
    }
}
```
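Because fromElements infers the type from data[0] only, the remaining elements must be compatible with that type. The plain-Java sketch below has no Flink dependency; FromElementsCheckSketch and inferAndCheck are hypothetical names. It rejects an empty argument list, takes the class of the first element, and then, assuming a check similar to the one we believe fromCollection performs internally, rejects any element that is not an instance of that class.

```java
// Flink-free sketch (hypothetical names) of the type handling behind fromElements:
// the type comes from the first element, and every element must match it.
class FromElementsCheckSketch {

    @SafeVarargs
    static <T> Class<?> inferAndCheck(T... data) {
        if (data.length == 0) {
            // Mirrors the IllegalArgumentException thrown by fromElements
            throw new IllegalArgumentException("needs at least one element");
        }
        // Only the first element decides the type, like TypeExtractor.getForObject(data[0])
        Class<?> type = data[0].getClass();
        for (T element : data) {
            if (!type.isInstance(element)) {
                throw new IllegalArgumentException(
                        "element " + element + " is not a " + type.getName());
            }
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(inferAndCheck("a", "b").getSimpleName()); // String
        try {
            inferAndCheck(1, 2L); // Integer first, then a Long: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

When elements only share a common supertype, the way out in Flink is to supply the TypeInformation explicitly, as the error message in the excerpt suggests.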
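generateSequence: generates a sequence of Long numbers in the closed interval [from, to]; the call below emits 1 through 10.
```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> dataStreamSource = environment.generateSequence(1L, 10L);
```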
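generateSequence is a parallel source, so the range is divided among the subtasks. The plain-Java sketch below (no Flink dependency; SequenceSplitSketch and valuesFor are hypothetical names) assumes a strided assignment in which subtask i of p emits from + i, from + i + p, and so on. We believe this resembles what Flink's StatefulSequenceSource does, but check the source of your Flink version for the exact scheme.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch (hypothetical names): spreading the inclusive range [from, to]
// over `parallelism` subtasks with an assumed strided assignment.
class SequenceSplitSketch {

    /** Values that subtask `taskIdx` (0-based) would emit under the assumed scheme. */
    static List<Long> valuesFor(long from, long to, int parallelism, int taskIdx) {
        List<Long> values = new ArrayList<>();
        // Start at from + taskIdx and step by the parallelism; `to` is inclusive
        for (long v = from + taskIdx; v <= to; v += parallelism) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + ": " + valuesFor(1L, 10L, parallelism, i));
        }
        // subtask 0: [1, 4, 7, 10]
        // subtask 1: [2, 5, 8]
        // subtask 2: [3, 6, 9]
    }
}
```

Every value in [1, 10] is emitted by exactly one subtask, which is why the full sequence is only in order when the parallelism is 1.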
readFile: reads a file; the format of its content can be customized by passing your own FileInputFormat.
readTextFile: essentially readFile with a TextInputFormat. It reads the file line by line and emits each line as a String.
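```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.readTextFile("D:\\test.txt"); // DataStreamSource<String>, one element per line
```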
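socketTextStream: reads a data stream from a socket. It is generally used only for testing: the socket source keeps no state, so it cannot replay data after a failure and gives no reliability guarantees, and it is non-parallel (it always runs with parallelism 1).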
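```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.socketTextStream("127.0.0.1", 9999); // one String per '\n'-delimited line
```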
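socketTextStream connects as a client, so something must already be listening on the port; a common choice is nc -lk 9999. As a Java alternative, the hypothetical SocketFeeder class below accepts one connection and writes a few lines, each terminated with '\n', which we understand to be socketTextStream's default delimiter.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical test feeder that plays the role of `nc -lk 9999`
// for socketTextStream("127.0.0.1", 9999).
class SocketFeeder {

    /** Accepts one client and writes each line followed by '\n'. */
    static void feed(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                // Write '\n' explicitly so the output does not depend on the platform line separator
                out.print(line + "\n");
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(9999)) {
            feed(server, List.of("hello flink", "hello world"));
        }
    }
}
```

Start SocketFeeder first and then the Flink job; once the lines are sent, the feeder closes the connection and the socket source reaches end of input.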
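A custom data source is usually written by extending RichParallelSourceFunction, which runs in parallel and gives access to the runtime context and the open/close lifecycle. For a simple non-parallel source it is enough to implement SourceFunction, as the example below does: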
```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

class TestSource implements SourceFunction<String> {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit records until the job is cancelled
        while (isRunning) {
            ctx.collect("some string");
            Thread.sleep(1L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
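The run()/cancel() contract of TestSource can be exercised without a Flink cluster. In the plain-Java sketch below, MiniSource is a hypothetical stand-in for SourceFunction and a Consumer plays the role of SourceContext.collect. A second thread calls cancel(), which is how we understand Flink to stop a source when a job is cancelled, and run() returns as soon as its loop observes the volatile flag.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for SourceFunction: same run()/cancel() shape, no Flink dependency.
interface MiniSource<T> {
    void run(Consumer<T> collector) throws Exception;
    void cancel();
}

class CountingSource implements MiniSource<String> {
    // volatile so the write in cancel() is visible to the thread executing run()
    private volatile boolean isRunning = true;

    @Override
    public void run(Consumer<String> collector) throws Exception {
        while (isRunning) {
            collector.accept("some string");
            Thread.sleep(1L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

class CancelDemo {
    public static void main(String[] args) throws Exception {
        CountingSource source = new CountingSource();
        AtomicLong emitted = new AtomicLong();
        Thread runner = new Thread(() -> {
            try {
                source.run(s -> emitted.incrementAndGet());
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        });
        runner.start();
        Thread.sleep(50L);  // let the source emit for a while
        source.cancel();    // called from another thread, as the framework would
        runner.join();      // run() returns once the loop sees isRunning == false
        System.out.println("emitted " + emitted.get() + " records, runner alive: " + runner.isAlive());
    }
}
```

Without volatile, the running thread is not guaranteed to see the update made by cancel() and could keep looping after the job has been cancelled.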
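addSource: attaches any SourceFunction, such as TestSource above or a connector. The following example consumes the Kafka topic "flink" with FlinkKafkaConsumer from the flink-connector-kafka module (in newer Flink releases FlinkKafkaConsumer is deprecated in favor of KafkaSource):
```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.20.178.243:9092");
DataStreamSource<String> dataStreamSource = environment.addSource(new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties));
dataStreamSource.print();
environment.execute("KafkaDataSourceTest"); // throws Exception; call from a method that declares it
```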
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.