赞
踩
Flink 支持任意的 Java 或是 Scala 类型。不需要像 Hadoop 一样去实现一个特定的接口(org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。
例如,下图中的 Person 类,是一个复合类型的一个 Pojo,在 Flink 内部是用 PojoTypeInfo 来表示,它继承自 TypeInformation,也即在 Flink 中用 TypeInformation 作为类型描述符来表示每一种要表示的数据类型
在 Flink 中每一个具体的类型都对应了一个具体的 TypeInformation 实现类,例如 BasicTypeInformation 中的 IntegerTypeInformation 和 FractionalTypeInformation 都具体的对应了一个 TypeInformation。然后还有 BasicArrayTypeInformation、CompositeType 以及一些其它类型。
每一个具体的数据类型都对应了一个 TypeInformation 的具体实现。
TypeInformation 是 Flink 类型系统的核心类。
例如,对于一个自定义的 Function 来说,需要一个类型信息来作为该函数的输入输出类型,即 TypeInfomation。该类型信息类作为一个工具来生成对应类型的序列化器 TypeSerializer,并用于执行语义检查,比如当一些字段在作为 joing 或 grouping 的键时,检查这些字段是否在该类型中存在。
每一个具体的数据类型都对应一个 TypeInformation 的具体实现,每一个 TypeInformation 都会为对应的具体数据类型提供一个专属的序列化器。
TypeInformation 提供一个 createSerialize() 方法,通过这个方法就可以得到该类型的序列化器 TypeSerializer,TypeSerializer 可以对该类型进行序列化与反序列化操作。
大多数数据类型 Flink 可以自动生成对应的序列化器。
但对于 GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
在 Flink 中,如果使用 POJO 数据类型需要遵循以下规则:
当用户定义的数据类型无法识别为 POJO 类型时,必须将其作为 GenericType 处理并使用 Kryo 进行序列化。
如果 Flink 内置的数据类型和序列化方式不能满足需求,Flink 的类型信息系统也支持拓展。只需要实现 TypeInformation、TypeSerializer 和 TypeComparator 即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。
下图是一个 Tuple3 的序列化过程。
序列化就是将数据结构或者对象转换成一个二进制串的过程,在 Java 里面可以简单地理解成一个 byte 数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。
上面的 Tuple 3 包含三个层面,一是 int 类型,一是 double 类型,还有一个是 Person。Person 包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。
MemorySegment
通常是用 TypeInformation.of() 方法来创建一个类型信息的对象。
PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
public T0 myfield0;
public T1 myfield1;
}
public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
@Override
public TypeInfomation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParmeters) {
return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
}
}
Flink 认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。
StreamExecutionEnvironment 和 ExecutionEnvironment 提供 registerType() 方法用来向 Flink 注册子类信息。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Env. registerType(typeClass);
在 registerType() 方法内部,会使用 TypeExtractor 来提取类型信息,获取到的类型信息属于 PojoTypeInfo 及其子类,那么需要将其注册到一起,否则统一交给 Kryo 去处理,Flink 并不过问(这种情况下性能会变差)。
对于 Flink 无法序列化的类型(例如用户自定义类型,没有 registerType,也没有自定义 TypeInfo 和 TypeInfoFactory),默认会交给 Kryo 处理,如果 Kryo 仍然无法处理(例如 Guava、Thrift、Protobuf 等第三方库的一些类),有两种解决方案:
env.getConfig().enableForceAvro();
env.getConfig().addDefaultKryoSerializer(clazz, serializer);
注:如果希望完全禁用 Kryo(100% 使用 Flink 的序列化机制),可以通过 Kryo-env.getConfig().disableGenericTypes() 的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。
Flink 的 Task 之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入 NetworkBufferPool,然后下层的 Task 读出之后再进行反序列化操作,最后进行逻辑处理。
为了使得记录以及事件能够被写入 Buffer,随后在消费时再从 Buffer 中读出,Flink 提供了数据记录序列化器(RecordSerializer)与反序列化器(RecordDeserializer)以及事件序列化器(EventSerializer)。
Function 发送的数据被封装成 SerializationDelegate,它将任意元素公开为 IOReadableWritable 以进行序列化,通过 setInstance() 来传入要序列化的数据。
以上内容是对 https://www.bilibili.com/video/av54080907/ 的学习总结。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。