当前位置:   article > 正文

一、Flink数据类型_flink 数据类型

flink 数据类型

 

  1. - 1.原生数据类型
  2. * Flink通过实现BasicTypeInfo数据类型,能够支持任意Java原生基本类型(装箱)或String类型,例如IntegerStringDouble
  1. DataSource<Integer> inputStream= environment.fromElements(1, 2, 3, 4, 5, 6);
  2. DataSource<String> inputStream= environment.fromElements("1", "2", "3", "4", "5", "6");
  1. - 2.Java Tuples类型
  2. * Flink通过定义TupleTypeInfo来标书Tuple类型数据
DataSource<Tuple2> inputStreamTuple = environment.fromElements(new Tuple2("fangpc", 1), new Tuple2("fangpengcheng", 2));
  1. - 3.POJOs类型
  2. * Flink通过PojoTypeInfo来描述任意的POJOs,包括Java和Scala类
  3. * POJOs类必须是Public修饰且必须独立定义,不能是内部类
  4. * POJOs类中必须含有默认构造器
  5. * POJOs类中所有的Fields必须是Public或者具有普Public修饰的getter和setter方法
  6. * POJOs类中的字段必须是Flink支持的
var personStream = environment.fromElements(new Person("fangpc", 24), new Person("fangpengcheng", 25));
  1. - 4. Flink Value类型
  2. * Value数据类型实现了org.apache.flink.types.Value,其中包括read()和write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。Flink提供的内建Value类型有IntValue、DoubleValue、StringValue等
  3. - 5. 特殊数据类型
  4. * Scala中的List、Map、Either、Option、Try数据类型
  5. * Java中Either
  6. * Hadoop的Writable数据类型

二、TypeInfomation信息获取

  1. - 1.Java API类型信息
  2. * 由于Java泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息
  3. * 如果Kryo序列化工具无法对POJOs类序列化时,可以使用Avro对POJOs类进行序列化
  4. *
environment.getConfig().enableForceAvro();

第四章 DataStream API介绍与使用

  1. . DataStream接口编程中的基本操作,包括定义数据源、数据转换、数据输出、操作拓展
  2. . Flink流式计算过程,对时间概念的区分和使用包括事件时间(Event Time)、注入时间(Ingestion Time)、处理时间(Process Time)
  3. . 流式计算中常见的窗口计算类型,如滚动窗口、滑动窗口、会话窗口
  4. . Flink任务优化

一、DataStream编程模型

  1. - DataStream API 主要分为三个部分,DataSource模块、Transformationmok、DataSink模块
  2. - DataSource模块主要定义了数据接入功能
  3. - Transformation模块定义了对DataStream数据集的各种转换操作,例如进行map、filter、windows等操作
  4. - DataSink模块将数据写出到外部存储介质,例如将数据输出到文件或Kafka消息中间件
  5. - Flink将数据源主要分为内置数据源、第三方数据源
  6. * 内置数据源包含文件、Socket网络端口、集合类型数据
  7. * Flink中定义了非常多的第三方数据源连接器,例如Apache kafa Connector、Elatic Search Connector等
  8. * 用户也可以自定义实现Flink中数据接入函数SourceFunction,并封装成第三方数据源Connector
  9. - 内置数据源
  10. * 文件数据源
  11. 1. 使用readTextFile直接读取文本文件
  12. 2. 使用readFile方法通过指定文件的InputFormat来读取指定类型的文件,比如CsvInputFormat,用户可以自定义InputFormat接口类
  1. var csvStream = environment.readFile(new CsvInputFormat<String>(new Path(inputPath)) {
  2. @Override
  3. protected String fillRecord(String reuse, Object[] parsedValues) {
  4. return null;
  5. }
  6. }, inputPath);
  1. - Socket数据源
  2. * StreamExecutionEnvironment调用socket-TextStream方法(参数为IP地址、端口号、字符串切割符delimiter、最大尝试次数maxRetry)
var socketDataStream = streamExecutionEnvironment.socketTextStream("localhost", 8080);
  1. - 集合数据源
  2. * Flink可以直接将Java集合类转换成DataStream数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中
  1. // 通过fromElements从元素集合中穿件创建DataStream数据集
  2. var dataStream = environment.fromElements(new Tuple2(1L, 2L), new Tuple2(2L, 3L));
  3. // 通过fromCollection从数组中创建DataStream数据集
  4. var collectionStream = environment.fromCollection(Arrays.asList("fangpc", "fang"));
  1. - 外部数据源
  2. * 数据源连接器
  3. 1. 部分连接器仅支持读取数据:如Twitter Streaming API、Netty等
  4. 2. 既支持数据输入也支持数据输出:Apache Kafka、Amazon Kinesis、RabbitMQ等连接器
  5. * Flink内部提供了常用的序列化协议的Schema,例如TypeInfomationSerializationSchema、JsonDeserializationSchema和AvroDeserializationSchema等
  6. * 以Kafka为例进行数据接入
  1. //maven 依赖
  2. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
  3. <dependency>
  4. <groupId>org.apache.flink</groupId>
  5. <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
  6. <version>1.9.0</version>
  7. </dependency>
  8. // 创建和使用Kafka的Connector
  9. // Properties参数定义
  10. Properties properties = new Properties();
  11. properties.setProperty("bootstrap.servers", "localhost:9092");
  12. properties.setProperty("zookeeper.connect", "localhost:2181");
  13. properties.setProperty("group.id", "test");
  14. DataStream<String> input = streamExecutionEnvironment
  15. .addSource(new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), properties));

二、DataStream转换操作

  1. * 从一个或多个DataStream生成新的DataStream的过程被称为Transformation操作
  2. * 将每种操作类型被定义为不同的Operator,Flink程序能够将多个Transformation组成一个DataFlow的拓扑。
  3. * DataStream的转换操作可以分为单Single-DataStream、Multi-DataStream、物理分区三类类型
  4. - Single-DataStream操作
  5. * (1). Map[DataStream -> DataStream],调用用户定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/828719
推荐阅读
相关标签
  

闽ICP备14008679号