赞
踩
- - 1.原生数据类型
- * Flink通过实现BasicTypeInfo数据类型,能够支持任意Java原生基本类型(装箱)或String类型,例如Integer、String、Double等
- DataSource<Integer> inputStream= environment.fromElements(1, 2, 3, 4, 5, 6);
-
- DataSource<String> inputStream= environment.fromElements("1", "2", "3", "4", "5", "6");
- - 2.Java Tuples类型
- * Flink通过定义TupleTypeInfo来标书Tuple类型数据
DataSource<Tuple2> inputStreamTuple = environment.fromElements(new Tuple2("fangpc", 1), new Tuple2("fangpengcheng", 2));
- - 3.POJOs类型
- * Flink通过PojoTypeInfo来描述任意的POJOs,包括Java和Scala类
- * POJOs类必须是Public修饰且必须独立定义,不能是内部类
- * POJOs类中必须含有默认构造器
- * POJOs类中所有的Fields必须是Public或者具有普Public修饰的getter和setter方法
- * POJOs类中的字段必须是Flink支持的
var personStream = environment.fromElements(new Person("fangpc", 24), new Person("fangpengcheng", 25));
- - 4. Flink Value类型
- * Value数据类型实现了org.apache.flink.types.Value,其中包括read()和write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。Flink提供的内建Value类型有IntValue、DoubleValue、StringValue等
-
- - 5. 特殊数据类型
- * Scala中的List、Map、Either、Option、Try数据类型
- * Java中Either
- * Hadoop的Writable数据类型
- - 1.Java API类型信息
- * 由于Java泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息
- * 如果Kryo序列化工具无法对POJOs类序列化时,可以使用Avro对POJOs类进行序列化
- *
environment.getConfig().enableForceAvro();
- . DataStream接口编程中的基本操作,包括定义数据源、数据转换、数据输出、操作拓展
- . Flink流式计算过程,对时间概念的区分和使用包括事件时间(Event Time)、注入时间(Ingestion Time)、处理时间(Process Time)
- . 流式计算中常见的窗口计算类型,如滚动窗口、滑动窗口、会话窗口
- . Flink任务优化
- - DataStream API 主要分为三个部分,DataSource模块、Transformationmok、DataSink模块
- - DataSource模块主要定义了数据接入功能
- - Transformation模块定义了对DataStream数据集的各种转换操作,例如进行map、filter、windows等操作
- - DataSink模块将数据写出到外部存储介质,例如将数据输出到文件或Kafka消息中间件
-
- - Flink将数据源主要分为内置数据源、第三方数据源
- * 内置数据源包含文件、Socket网络端口、集合类型数据
- * Flink中定义了非常多的第三方数据源连接器,例如Apache kafa Connector、Elatic Search Connector等
- * 用户也可以自定义实现Flink中数据接入函数SourceFunction,并封装成第三方数据源Connector
-
- - 内置数据源
- * 文件数据源
- 1. 使用readTextFile直接读取文本文件
- 2. 使用readFile方法通过指定文件的InputFormat来读取指定类型的文件,比如CsvInputFormat,用户可以自定义InputFormat接口类
- var csvStream = environment.readFile(new CsvInputFormat<String>(new Path(inputPath)) {
- @Override
- protected String fillRecord(String reuse, Object[] parsedValues) {
- return null;
- }
- }, inputPath);
- - Socket数据源
- * StreamExecutionEnvironment调用socket-TextStream方法(参数为IP地址、端口号、字符串切割符delimiter、最大尝试次数maxRetry)
var socketDataStream = streamExecutionEnvironment.socketTextStream("localhost", 8080);
- - 集合数据源
- * Flink可以直接将Java集合类转换成DataStream数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中
- // 通过fromElements从元素集合中穿件创建DataStream数据集
- var dataStream = environment.fromElements(new Tuple2(1L, 2L), new Tuple2(2L, 3L));
-
- // 通过fromCollection从数组中创建DataStream数据集
- var collectionStream = environment.fromCollection(Arrays.asList("fangpc", "fang"));
- - 外部数据源
- * 数据源连接器
- 1. 部分连接器仅支持读取数据:如Twitter Streaming API、Netty等
- 2. 既支持数据输入也支持数据输出:Apache Kafka、Amazon Kinesis、RabbitMQ等连接器
- * Flink内部提供了常用的序列化协议的Schema,例如TypeInfomationSerializationSchema、JsonDeserializationSchema和AvroDeserializationSchema等
- * 以Kafka为例进行数据接入
-
- //maven 依赖
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
- <version>1.9.0</version>
- </dependency>
-
- // 创建和使用Kafka的Connector
- // Properties参数定义
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", "localhost:9092");
- properties.setProperty("zookeeper.connect", "localhost:2181");
- properties.setProperty("group.id", "test");
- DataStream<String> input = streamExecutionEnvironment
- .addSource(new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), properties));
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- * 从一个或多个DataStream生成新的DataStream的过程被称为Transformation操作
- * 将每种操作类型被定义为不同的Operator,Flink程序能够将多个Transformation组成一个DataFlow的拓扑。
- * DataStream的转换操作可以分为单Single-DataStream、Multi-DataStream、物理分区三类类型
-
- - Single-DataStream操作
- * (1). Map[DataStream -> DataStream],调用用户定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。