赞
踩
Flink 以其独特的方式来处理数据类型及序列化,包括它自身的类型描述符、泛型类型提取以及类型序列化框架。
支持的数据类型
Java Tuples and Scala Case Classes
Java POJOs
Primitive Types
Regular Classes
Values
Hadoop Writables
Special Types
Tuples and Case Classes
元组是包含固定数量的具有各种类型的字段的复合类型,Java API 提供从 Tuple1 到 Tuple25 的类,元组的每个字段都可以是任意的 Flink 类型,包括元组,从而产生嵌套的元组;
元组的字段可以使用字段名称 tuple.f4 直接访问,也可以使用通用的 getter 方法 tuple.getField(int位置) 字段索引从0开始。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(value -> value.f0);
POJOs
如果 Java 和 Scala 类满足以下要求,Flink 会将它们视为特殊的 POJO 数据类型:
类必须是公开的
它必须有一个不带参数的公共构造函数(默认构造函数)
所有字段要么是公共的,要么必须可以通过 getter 和 setter 函数访问;对于名为 foo 的字段,getter 和 setter 方法必须命名为 getFoo() 和 setFoo()
字段的类型必须被已注册的序列化器支持
POJO 通常用 PojoTypeInfo 表示,并用 PojoSerializer 进行序列化(可以配置回退使用 Kryo 序列化器);除了 POJO 是 Avro 类型(Avro 特定记录)或作为 “Avro 反射类型” ,此时 POJO 由 AvroTypeInfo 表示,并使用 AvroSerializer 进行序列化;如果需要,还可以注册自定义的序列化程序。
Flink 可以分析 POJO 类型的结构,例如了解 POJO 的字段;POJO 类型比一般类型更易于使用,Flink 可以更有效地处理 POJO 类型。
可以通过 org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo() 测试[类是否符合 POJO 要求],如果想确保 POJO 的任何字段都不会使用 Kryo 进行序列化,可以使用用 assertSerializedAsPojoWithoutKryo。
测试所编写的类是否为 POJO 类型
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.19.0</version>
</dependency>
// 测试所编写的类是否为 POJO 类型
PojoTestUtils.assertSerializedAsPojo(WordWithCount.class);
// 确保 POJO 的任何字段都不会使用 Kryo 进行序列化
PojoTestUtils.assertSerializedAsPojoWithoutKryo(WordWithCount.class);
示例:带有两个公共字段的简单POJO。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy(value -> value.word);
Primitive Types
Flink 支持所有 Java 和 Scala 基本类型,如 Integer、String 和 Double。
General Class Types
Flink 支持大多数 Java 和 Scala 类(API和自定义),对于包含无法序列化字段的类,如文件指针、I/O流或其它本机资源,会受限制;遵循 JavaBeans 约定的类可以正常使用。
Flink 将所有未标识为 POJO 类型的类,作为通用类的类型进行处理,Flink 将这些数据类型视为黑匣子,无法访问其内容;一般类型使用序列化框架 Kryo 进行序列化和反序列化。
Values
值类型需要手动描述它们的序列化和反序列化器,它们不是通过通用的序列化框架,而是通过实现 org.apache.flink.type 自定义读取和写入的方法;
当通用序列化器效率很低时,使用 Value 类型是合理的;示例将元素的稀疏向量用数组实现,其中数组元素大部分为零,就可以对非零元素使用特殊编码,而通用序列化器只需写入所有数组元素。
org.apache.flink.CopyableValue 接口以类似的方式在内部支持手动克隆的逻辑。
Flink 提供了与基本数据类型相对应的预定义值类型(ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue)这些值类型充当基本数据类型的变体,它们的值可以更改,允许重用对象并减轻 GC 的压力。
Hadoop Writables
使用实现了 org.apache.hadoop 接口的类型,使用 write() 和 readFields() 方法定义序列化和反序列化逻辑。
Special Types
特殊的类型,包括 Scala 的 Either,Option 和 Try;Java API 有 Either 的自定义实现,表示两种可能类型的值。
类型擦除:Java 编译器在编译后会丢弃许多泛型的类型信息,因此在运行时,对象的实例不再知道其泛型类型,例如 DataStream<String> 和 DataStream<Long> 的实例在 JVM 中看起来是相同的。
Flink 在调用程序的 main 方法时需要知道类型信息,Flink Java API 试图重建丢弃的类型信息,并将其显式存储在数据集和 operator 中;可以通过 DataStream.getType() 检索类型,该方法返回 TypeInformation 的一个实例,这是 Flink 表示类型的内部方式。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 8888);
// String
System.out.println(source.getType());
类型推断有其局限性,有时需要手动指定数据的类型,例如从集合创建数据集 StreamExecutionEnvironment.fromCollection(),可以在其中传递描述类型的参数;像 MapFunction<I,O> 这样的通用函数有时也需要额外的类型信息。
Flink 试图推断出在分布式计算过程中交换和存储的数据类型信息,在大多数情况下,Flink 会推断出所有必要的信息,使用户无需关心序列化框架和无需注册数据类型。
Flink 对数据类型了解得越多,序列化方案就越好;这对于 Flink 中的内存使用模式非常重要(尽可能在堆内/外处理序列化数据,使序列化成本非常低);
当程序对 DataStream 进行调用时以及在调用 execute()、print()、count() 或 collect() 之前,需要有关数据类型的信息。
注册子类型:如果函数签名只描述超类型,但在执行过程中实际使用了超类型的子类型,那让 Flink 知道这些子类型会大大提高性能,需要在 StreamExecutionEnvironment 上为每个子类型调用.registerType(clazz)。
// 注册子类型[废弃]-PipelineOptions.SERIALIZATION_CONFIG[替代]
StreamExecutionEnvironment.registerType(String.class);
注册自定义序列化程序:Flink 会为无法处理的类型使用 Kryo 序列化器,如果 kryo 序列化器也不能处理,需要在StreamExecutionEnvironment 上调用 .getConfig().addDefaultKryoSerializer(clazz,serializer) 注册自定义的序列化器。
添加类型提示(TypeHints):当 Flink 无法推断出通用类型时,必须传递类型提示,通常只有在 Java API 中需要。
手动创建类型信息(TypeInformation):Java 的通用类型擦除会导致 Flink 无法推断数据类型。
TypeInformation 类是所有类型描述符的基类,它揭示了类型的一些基本属性,并可以为类型生成序列化器和比较器(Flink 中的比较器不仅定义顺序,还用于处理 keys)。
在内部,Flink 对类型区分如下:
基本类型:Java 基本类型及其装箱形式以及 void、String、Date、BigDecimal 和 BigInteger;
基本数组和对象数组
复合类型:
Flink 的 Java 元组(Flink Java API的一部分):最多25个字段,不支持空字段;
Scala case classes (包括 Scala 元组):不支持 null 字段;
Row:具有任意数量的字段和支持空字段的元组;
POJO:遵循特定 bean 模式的类;
辅助类型:Option,Either,Lists,Maps,…;
泛型类型:Flink 本身不会序列化这些类型,而是由 Kryo 进行序列化;
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许 “按名称” 字段引用):
该类是公共的和独立的(没有非静态内部类);
该类有一个公共的无参数构造函数;
类(和所有超类)中的所有非静态、非 transient 字段要么是公共的,要么有一个公共的 getter 和 setter 方法;
注意:当用户定义的数据类型无法识别为 POJO 类型时,必须将其处理为 GenericType 并使用 Kryo 进行序列化。
由于 Java 会擦除泛型类型信息,因此需要将类型传递给 TypeInformation:
对于非泛型类型,可以传递类:
TypeInformation<String> info = TypeInformation.of(String.class);
对于泛型类型,需要通过 TypeHint “捕获” 泛型类型信息:
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
在内部,创建了 TypeHint 的一个匿名子类,该子类捕获泛型信息以将其保留到运行时。
有两种方法可以创建 TypeSerializer:
对 TypeInformation 对象调用 typeInfo.createSerializer(config),config 参数的类型为 ExecutionConfig,包含有关程序注册的自定义序列化器的信息;尽量将正确的 ExecutionConfig 传递给程序,可以通过调用 getExecutionConfig() 从 DataStream 中获取它。
在函数(如RichMapFunction)内部使用 getRuntimeContext().createSerializer(typeInfo) 来获取它。
Java 会擦除通用类型信息;Flink 试图通过反射重建尽可能多的类型信息,使用 Java 保留的少数比特(主要是函数签名和子类信息);
函数的返回类型取决于其输入类型的情况以及一些简单的类型推断:
public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {
public Tuple2<T, Long> map(T value) {
return new Tuple2<T, Long>(value, 1L);
}
}
在某些情况下,Flink 无法重建所有泛型类型信息,此时用户必须提供类型提示。
Java API中的类型提示
在 Flink 无法重建擦除的通用类型信息时,Java API 提供所谓的类型提示,类型提示告诉系统函数生成的数据流或数据集的类型:
DataStream<SomeType> result = stream
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
return 语句指定生成的类型。
Java 8 Lambda的类型提取
Java 8 lambdas 的类型提取与非 lambdas 不同,因为 lambdas 与扩展函数接口的实现类无关。
Flink 试图使用 Java 的泛型签名来确定参数类型和返回类型;但并非所有编译器都会为 Lambda 生成这些签名,有时也需要手动指定数据类型。
POJO类型的序列化
PojoTypeInfo 为 POJO 中的所有字段创建序列化程序;int、long、String 等标准类型由 Flink 自带的序列化程序处理,对于其它类型使用 Kryo,如果 Kryo 无法处理该类型,可以要求 PojoTypeInfo 使用 Avro 序列化 POJO,调用方法如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();
注意:Flink 会使用 Avro 序列化器自动序列化 Avro 生成的 POJO,如果希望 Kryo 序列化器处理整个 POJO 类型,配置如下
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();
如果 Kryo 无法序列化 POJO,可以向 Kryo 添加自定义序列化程序。
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
禁用回退 Kryo
如果程序需要避免使用 Kryo 作为泛型类型的回退,确保通过 Flink 自己的序列化程序或通过用户自定义的自定义序列化程序有效地序列化所有类型。
注意:遇到要使用 Kryo 的数据类型时,以下设置会引发异常:
env.getConfig().disableGenericTypes();
类型信息工厂允许将用户定义的类型信息插入 Flink 类型系统,需要实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory 以返回自定义的类型信息。
如果相应的类型已经用 @org.apache.flink.api.common.typeinfo.TypeInfo 注解,则在类型提取阶段会调用工厂。
类型信息工厂可以在 Java 和 Scala API 中使用,在类型层次结构中,向上遍历时将选择最近的工厂,内置工厂具有最高优先级;工厂的优先级也高于 Flink 的内置类型。
示例:使用 Java 中的工厂对自定义类型 MyTuple 进行注解并为其提供自定义类型信息
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
public T0 myfield0;
public T1 myfield1;
}
自定义类型信息的工厂
public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
@Override
public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
}
}
方法 createTypeInfo(Type,Map<String,TypeInformation<?>) 为工厂的目标类型创建类型信息;这些参数提供了有关类型本身的附加信息,以及类型的泛型类型参数。
如果类型包含需要从 Flink 函数的输入类型派生的泛型参数,请确保还实现了org.apache.Flink.api.common.typeinfo.TypeInformation#getGenericParameters 用于泛型参数到类型信息的双向映射。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。