
[Flink] Part 2. Data Sources: DataStreamSource


Preface

This post walks through the data sources that Flink supports.

1 Collections

  • fromCollection: creates a data stream from a collection or an iterator; all elements in the collection or iterator must have the same type
      // NetConnectEvent is a user-defined event class that is not shown in this post.
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<NetConnectEvent> eventStream = env.fromCollection(Arrays.asList(
              new NetConnectEvent("192.168.27.63", "192.168.27.1", true),
              new NetConnectEvent("192.168.27.64", "192.168.27.2", false)
      ));
  • fromElements: under the hood this is still fromCollection, as the Flink source shows
      public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
          if (data.length == 0) {
              throw new IllegalArgumentException("fromElements needs at least one element as argument");
          }
          // The element type is inferred from the first element only.
          TypeInformation<OUT> typeInfo;
          try {
              typeInfo = TypeExtractor.getForObject(data[0]);
          } catch (Exception e) {
              throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
          }
          // Delegate to fromCollection with the inferred type.
          return this.fromCollection(Arrays.asList(data), typeInfo);
      }
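The delegation above can be mimicked without Flink on the classpath. The following is only a sketch: `MiniEnv` and `TypedSource` are hypothetical stand-ins invented for illustration, and a plain `Class<?>` takes the place of Flink's `TypeInformation`. It shows the two things the real method does: reject an empty argument list, then take the type from the first element and hand everything to the collection-based method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for StreamExecutionEnvironment; this is not Flink code.
final class MiniEnv {

    // Stand-in for a typed source: the elements plus the inferred element class.
    static final class TypedSource<T> {
        final List<T> elements;
        final Class<?> elementType;

        TypedSource(List<T> elements, Class<?> elementType) {
            this.elements = elements;
            this.elementType = elementType;
        }
    }

    // Collection-based entry point, playing the role of fromCollection.
    static <T> TypedSource<T> fromCollection(Collection<T> data, Class<?> elementType) {
        return new TypedSource<>(new ArrayList<>(data), elementType);
    }

    // Varargs entry point, playing the role of fromElements.
    @SafeVarargs
    static <T> TypedSource<T> fromElements(T... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        // As in the real method, only the first element is inspected for the type.
        Class<?> elementType = data[0].getClass();
        return fromCollection(Arrays.asList(data), elementType);
    }
}
```

Because only the first element is inspected, mixing subtypes in one call is a common source of surprises; passing the type explicitly avoids the guesswork.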
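  • generateSequence: generates the sequence of consecutive Long values in a given range, both ends inclusive (the values are not random); newer Flink releases deprecate it in favor of fromSequence
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
      // Emits 1, 2, ..., 10
      DataStreamSource<Long> dataStreamSource = environment.generateSequence(1L, 10L);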
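To make the set of emitted values concrete, here is a plain-Java sketch that lists what a call with bounds 1 and 10 should produce. `SequenceSketch` is a hypothetical helper, not part of Flink, and it says nothing about ordering across parallel subtasks.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical helper; it only enumerates the values, it is not a Flink source.
final class SequenceSketch {

    // All values in the closed range [from, to], in ascending order.
    static List<Long> closedRange(long from, long to) {
        return LongStream.rangeClosed(from, to)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

Calling `SequenceSketch.closedRange(1L, 10L)` yields the ten values from 1 through 10.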

2 Files

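  • readFile: reads a file; the input format used to parse its contents can be customized
  • readTextFile: essentially readFile with the format fixed to TextInputFormat; the file is read line by line and each line is returned as a String
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
      environment.readTextFile("D:\\test.txt");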

3 Sockets

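  • socketTextStream: reads a data stream from a socket; generally used only for testing, because the socket source keeps no state, cannot guarantee reliable delivery, and runs single-threaded (parallelism 1)
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
      environment.socketTextStream("127.0.0.1", 9999);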
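For a quick local test something has to be listening on the port before the job connects. A tool such as netcat is the usual choice; as an alternative, the sketch below is a small plain-Java server that sends newline-terminated lines to the first client that connects. `LineServer`, `serveOnce`, and `roundTrip` are hypothetical names introduced here, and the self-check uses an ordinary client socket where Flink would connect.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper: a one-shot line server for feeding a socket source.
final class LineServer {

    // Accepts one client on a background thread and sends every line followed by '\n'.
    static Thread serveOnce(ServerSocket server, List<String> lines) {
        Thread t = new Thread(() -> {
            try (Socket client = server.accept();
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
                for (String line : lines) {
                    out.print(line + "\n");
                }
                out.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Self-check: connects as a plain client and reads back whatever the server sent.
    static List<String> roundTrip(List<String> lines) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 = any free port
            serveOnce(server, lines);
            List<String> received = new ArrayList<>();
            try (Socket socket = new Socket("127.0.0.1", server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

To pair it with the snippet above, one would open `new ServerSocket(9999)` and call `serveOnce` before starting the Flink job. The server closes the connection after the last line, which is fine for a one-off test.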

4 Custom sources

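  • Implementing SourceFunction

In practice, custom data sources are usually written by extending RichParallelSourceFunction directly. The minimal example below implements the plain SourceFunction interface.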

      import org.apache.flink.streaming.api.functions.source.SourceFunction;

      class TestSource implements SourceFunction<String> {
          // volatile: cancel() is invoked from a different thread than run()
          private volatile boolean isRunning = true;

          @Override
          public void run(SourceContext<String> ctx) throws Exception {
              // The context type must match the source's output type (String).
              while (isRunning) {
                  ctx.collect("some string");
                  Thread.sleep(1L);
              }
          }

          @Override
          public void cancel() {
              isRunning = false;
          }
      }
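The run/cancel contract is easier to see in isolation. The sketch below reproduces the same flag-based loop in plain Java, with no Flink types involved: `CancellableLoop` is a hypothetical stand-in, and a counter replaces the SourceContext. One thread sits in `run()` while another calls `cancel()`, which is why the flag is declared `volatile`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in that mimics the run/cancel shape of a source; not Flink code.
final class CancellableLoop {
    // volatile so that the write in cancel() becomes visible to the thread inside run().
    private volatile boolean isRunning = true;
    private final AtomicInteger emitted = new AtomicInteger();

    void run() throws InterruptedException {
        while (isRunning) {
            emitted.incrementAndGet(); // stands in for ctx.collect(...)
            Thread.sleep(1L);
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the loop on a worker thread, cancels it from the caller,
    // and reports whether the loop emitted something and then stopped.
    static boolean stopsAfterCancel() {
        CancellableLoop source = new CancellableLoop();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            // Wait (up to 2 s) until at least one element has been emitted.
            long deadline = System.currentTimeMillis() + 2000L;
            while (source.emitted.get() == 0 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5L);
            }
            source.cancel();
            worker.join(2000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive() && source.emitted.get() > 0;
    }
}
```

Without `volatile`, the worker thread would not be guaranteed to see the updated flag, and the loop might keep running after `cancel()` returns.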
  • Kafka source: FlinkKafkaConsumer is, at bottom, also an implementation of SourceFunction
      StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "10.20.178.243:9092");
      // Consume the "flink" topic, deserializing each record value as a String.
      DataStreamSource<String> dataStreamSource = environment.addSource(
              new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties));
      dataStreamSource.print();
      environment.execute("KafkaDataSourceTest");
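The snippet above sets only `bootstrap.servers`. In practice a consumer group is commonly configured as well, together with a fallback for partitions that have no committed offset. The sketch below builds such a `Properties` object; `KafkaProps` and the group name used in the usage note are hypothetical, while `group.id` and `auto.offset.reset` are standard Kafka consumer settings.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer properties for the example above.
final class KafkaProps {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group under which offsets are committed.
        props.setProperty("group.id", groupId);
        // Where to start when the group has no committed offset for a partition.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The result of `KafkaProps.consumerProps("10.20.178.243:9092", "demo-group")` can be passed to the FlinkKafkaConsumer constructor in place of the hand-built `properties`.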

 

 

 


