赞
踩
在进行序列化前,flink已经把每个数据的类型存到了 field 中。进行序列化时,对于复合类型(例如 tuple),会依次获取每个元素的数据类型,然后进行相应的序列化。
flink把类型分为
BasicType
BasicArrayType
WritableType
TupleType
RowTypeInfo
MapType
PojoType
ListType
GenericType
/**
 * Hands back the serializer instance held in the {@code serializer} field.
 *
 * @param executionConfig not consulted by this implementation
 * @return the serializer stored on this type information object
 */
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
    return serializer;
}
然后根据数据类型的不同,进入相对应的 Serializer 中
2.BasicArrayType
在BasicArrayTypeInfo进入 createSerializer
/**
 * Picks the serializer for an array type based on its component type.
 *
 * @param executionConfig forwarded to the component type when its serializer is created
 * @return the shared {@code StringArraySerializer} for {@code String} components, otherwise a
 *         {@code GenericArraySerializer} wrapping the component serializer
 */
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
    // String arrays are special-cased to a shared serializer instance.
    if (componentInfo.getTypeClass().equals(String.class)) {
        return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
    }

    // Any other component type: generic array serializer built around the component's serializer.
    return (TypeSerializer<T>) new GenericArraySerializer<>(
            this.componentInfo.getTypeClass(),
            this.componentInfo.createSerializer(executionConfig));
}
如果不是String类型的数组,就进入 GenericArraySerializer ,其中的序列化程序为
/**
 * Writes the array length, then for every element a boolean presence marker
 * ({@code true} = non-null) followed by the element itself when it is present.
 *
 * @param value  the array to write
 * @param target the output view receiving the bytes
 * @throws IOException if writing to {@code target} fails
 */
public void serialize(C[] value, DataOutputView target) throws IOException {
    target.writeInt(value.length);
    for (C element : value) {
        final boolean present = element != null;
        target.writeBoolean(present);
        if (present) {
            componentSerializer.serialize(element, target);
        }
    }
}
反序列化
/**
 * Reads the array length, then for every slot a boolean presence marker followed by the
 * element when the marker is {@code true}; absent elements are stored as {@code null}.
 *
 * @param source the input view to read from
 * @return a newly created array of the recorded length
 * @throws IOException if reading from {@code source} fails
 */
public C[] deserialize(DataInputView source) throws IOException {
    final int length = source.readInt();
    final C[] result = create(length);
    for (int idx = 0; idx < length; idx++) {
        result[idx] = source.readBoolean() ? componentSerializer.deserialize(source) : null;
    }
    return result;
}
3.WritableType
WritableTypeInfo 中的CreateSerializer
/**
 * Builds a fresh {@code WritableSerializer} for this type's class.
 *
 * @param executionConfig not consulted by this implementation
 * @return a new serializer bound to {@code typeClass}
 */
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
    final WritableSerializer<T> writableSerializer = new WritableSerializer<T>(typeClass);
    return writableSerializer;
}
序列化与反序列化
/**
 * Serializes the record by letting it write itself to the output view.
 *
 * @param record the value to write; its own {@code write} method produces the bytes
 * @param target the output view handed to {@code record.write}
 * @throws IOException if the record fails to write
 */
@Override
public void serialize(T record, DataOutputView target) throws IOException {
    record.write(target);
}
/**
 * Deserializes into a newly created instance by delegating to the
 * {@code deserialize(reuse, source)} overload.
 *
 * @param source the input view to read from
 * @return whatever the reuse-based overload returns for the fresh instance
 * @throws IOException if reading fails
 */
@Override
public T deserialize(DataInputView source) throws IOException {
    final T fresh = createInstance();
    return deserialize(fresh, source);
}
4.TupleType
TupleTypeInfo 中的createSerializer ,然后调用对应的TupleSerializer
/**
 * Creates the tuple serializer by first creating one serializer per field type.
 *
 * @param executionConfig forwarded to every field type's {@code createSerializer}
 * @return the shared {@code Tuple0Serializer} for {@code Tuple0}, otherwise a new
 *         {@code TupleSerializer} holding the per-field serializers
 */
public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) {
    // Tuple0 has no fields, so a singleton serializer is used.
    if (getTypeClass() == Tuple0.class) {
        return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
    }

    final TypeSerializer<?>[] perField = new TypeSerializer<?>[getArity()];
    int idx = 0;
    while (idx < types.length) {
        perField[idx] = types[idx].createSerializer(executionConfig);
        idx++;
    }

    return new TupleSerializer<T>(getTypeClass(), perField);
}
序列化,反序列化
/**
 * Writes every tuple field in positional order with its field serializer.
 *
 * @param value  the tuple to write
 * @param target the output view receiving the bytes
 * @throws IOException if writing fails
 * @throws NullFieldException wrapping a {@code NullPointerException} raised while a field is
 *         serialized, carrying the field position
 */
public void serialize(T value, DataOutputView target) throws IOException {
    for (int pos = 0; pos < arity; pos++) {
        final Object fieldValue = value.getField(pos);
        try {
            fieldSerializers[pos].serialize(fieldValue, target);
        } catch (NullPointerException npex) {
            throw new NullFieldException(pos, npex);
        }
    }
}

/**
 * Instantiates a raw tuple and fills each position by reading it with the matching
 * field serializer.
 *
 * @param source the input view to read from
 * @return the populated tuple
 * @throws IOException if reading fails
 */
public T deserialize(DataInputView source) throws IOException {
    final T result = instantiateRaw();
    for (int pos = 0; pos < arity; pos++) {
        final Object restored = fieldSerializers[pos].deserialize(source);
        result.setField(restored, pos);
    }
    return result;
}
/**
 * Creates a {@code RowSerializer} from one serializer per field type.
 *
 * @param config forwarded to every field type's {@code createSerializer}
 * @return a new row serializer holding the per-field serializers
 */
public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
    final int arity = getArity();
    final TypeSerializer<?>[] perField = new TypeSerializer[arity];
    int idx = 0;
    while (idx < arity) {
        perField[idx] = types[idx].createSerializer(config);
        idx++;
    }
    return new RowSerializer(perField);
}
序列化与反序列化
@Override public void serialize(Row record, DataOutputView target) throws IOException { int len = fieldSerializers.length; if (record.getArity() != len) { throw new RuntimeException("Row arity of from does not match serializers."); } // write a null mask writeNullMask(len, record, target); // serialize non-null fields for (int i = 0; i < len; i++) { Object o = record.getField(i); if (o != null) { fieldSerializers[i].serialize(o, target); } } } @Override public Row deserialize(DataInputView source) throws IOException { int len = fieldSerializers.length; Row result = new Row(len); // read null mask readIntoNullMask(len, source, nullMask); for (int i = 0; i < len; i++) { if (nullMask[i]) { result.setField(i, null); } else { result.setField(i, fieldSerializers[i].deserialize(source)); } } return result; }
/**
 * Creates a {@code MapSerializer} from the key and value type serializers.
 *
 * @param config forwarded to the key and value type information, key first
 * @return a new map serializer combining both
 */
@Override
public TypeSerializer<Map<K, V>> createSerializer(ExecutionConfig config) {
    return new MapSerializer<>(
            keyTypeInfo.createSerializer(config),
            valueTypeInfo.createSerializer(config));
}
相对应的 map serializer 序列化与反序列化
/**
 * Writes the entry count, then for each entry the key, a boolean null marker for the value
 * ({@code true} = value is null), and the value itself when it is not null.
 *
 * @param map    the map to write
 * @param target the output view receiving the bytes
 * @throws IOException if writing fails
 */
@Override
public void serialize(Map<K, V> map, DataOutputView target) throws IOException {
    target.writeInt(map.size());

    for (Map.Entry<K, V> entry : map.entrySet()) {
        keySerializer.serialize(entry.getKey(), target);

        final V mapped = entry.getValue();
        final boolean valueIsNull = mapped == null;
        target.writeBoolean(valueIsNull);
        if (!valueIsNull) {
            valueSerializer.serialize(mapped, target);
        }
    }
}

/**
 * Reads the entry count, then that many key / null-marker / optional-value triples into a
 * new {@code HashMap}.
 *
 * @param source the input view to read from
 * @return the reconstructed map
 * @throws IOException if reading fails
 */
@Override
public Map<K, V> deserialize(DataInputView source) throws IOException {
    final int entryCount = source.readInt();
    final Map<K, V> restored = new HashMap<>(entryCount);

    for (int n = 0; n < entryCount; ++n) {
        final K key = keySerializer.deserialize(source);
        final V value;
        if (source.readBoolean()) {
            value = null;
        } else {
            value = valueSerializer.deserialize(source);
        }
        restored.put(key, value);
    }

    return restored;
}
7.Pojo 的解析:
获取数据类型的流程不变,在 privateGetForClass 中获取到 type 的类为 pojoType,然后通过调用 analyzePojo 来对 tuple 中的类型进行解析,主要解析 tuple 中类型的代码如下:
// Build one PojoField per declared field. The first invalid field aborts the analysis
// (returns null) so that the class is handled as a GenericType instead.
List<PojoField> pojoFields = new ArrayList<PojoField>();
for (Field field : fields) {
    final Type fieldType = field.getGenericType();

    final boolean validField = isValidPojoField(field, clazz, typeHierarchy);
    if (!validField) {
        LOG.info("Class " + clazz + " cannot be used as a POJO type because not all fields are valid POJO fields, " +
            "and must be processed as GenericType. Please read the Flink documentation " +
            "on \"Data Types & Serialization\" for details of the effect on performance.");
        return null;
    }

    try {
        // Extend a copy of the hierarchy with this field's type before resolving it.
        final ArrayList<Type> hierarchyWithField = new ArrayList<Type>(typeHierarchy);
        hierarchyWithField.add(fieldType);
        final TypeInformation<?> fieldTypeInfo =
            createTypeInfoWithTypeHierarchy(hierarchyWithField, fieldType, in1Type, in2Type);
        pojoFields.add(new PojoField(field, fieldTypeInfo));
    } catch (InvalidTypesException e) {
        // The field type could not be resolved: register it as a generic type, using the
        // field's class when it is a class type and Object otherwise.
        final Class<?> fallbackClass = isClassType(fieldType) ? typeToClass(fieldType) : Object.class;
        pojoFields.add(new PojoField(field, new GenericTypeInfo<OUT>((Class<OUT>) fallbackClass)));
    }
}
最后 可获取到如下的结果,里面包含了每个元素的类型。
analyzePojo 解析 自定义的tuple
进入这个分支:
isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))
然后进入 createSubTypesInfo 函数进行解析里面包含的类型。主要的代码如下
// Resolve the type information of every subtype. In lenient mode an unresolvable subtype
// is recorded as null; otherwise an InvalidTypesException is raised.
TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
    final ArrayList<Type> subTypeHierarchy = new ArrayList<>(typeHierarchy);
    subTypeHierarchy.add(subtypes[i]);
    // sub type could not be determined with materializing
    // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
    if (subtypes[i] instanceof TypeVariable<?>) {
        subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);

        // variable could not be determined
        if (subTypesInfo[i] == null && !lenient) {
            throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
                + ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
                + "' could not be determined. This is most likely a type erasure problem. "
                + "The type extraction currently supports types with generic variables only in cases where "
                + "all variables in the return type can be deduced from the input type(s). "
                + "Otherwise the type has to be specified explicitly using type information.");
        }
    } else {
        // create the type information of the subtype or null/exception
        try {
            subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
        } catch (InvalidTypesException e) {
            if (lenient) {
                subTypesInfo[i] = null;
            } else {
                throw e;
            }
        }
    }
}
tuple 为 pojo 类型的序列化
对于一些flink自身无法序列化的类型,flink会交给kryo处理;如果kryo也不能处理该类型,
则可以调用avro来处理,操作如下:
// Obtain the execution environment, then switch on the force-Avro flag in its config.
ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();
交给 kryo处理
// Obtain the execution environment, then switch on the force-Kryo flag in its config.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();
自定义序列化器
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<?
extends Serializer<?>> serializerClass)
序列化的过程:
当得到 pojo 的类型后,会进到 PojoTypeInfo 下的 createSerializer
来判断用哪种方式进行序列化:强制使用 kryo、强制使用 avro,否则调用 createPojoSerializer 创建 PojoSerializer
if (config.isForceKryoEnabled()) {
return new KryoSerializer<>(getTypeClass(), config);
}
if (config.isForceAvroEnabled()) {
return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
}
return createPojoSerializer(config);
}
创建 PojoSerializer 的代码如下,之后就会调用 PojoSerializer 来进行序列化了
/**
 * Builds the {@code PojoSerializer} from the serializer and the reflective {@code Field}
 * of every POJO field.
 *
 * @param config forwarded to each field's type information and to the serializer constructor
 * @return a new POJO serializer for this type's class
 */
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
    final int fieldCount = fields.length;
    final TypeSerializer<?>[] serializersPerField = new TypeSerializer<?>[fieldCount];
    final Field[] javaFields = new Field[fieldCount];

    int idx = 0;
    while (idx < fieldCount) {
        serializersPerField[idx] = fields[idx].getTypeInformation().createSerializer(config);
        javaFields[idx] = fields[idx].getField();
        idx++;
    }

    return new PojoSerializer<T>(getTypeClass(), serializersPerField, javaFields, config);
}
pojo的序列化核心代码
public void serialize(T value, DataOutputView target) throws IOException { int flags = 0; // handle null values if (value == null) { flags |= IS_NULL; target.writeByte(flags); return; } Integer subclassTag = -1; Class<?> actualClass = value.getClass(); TypeSerializer subclassSerializer = null; if (clazz != actualClass) { subclassTag = registeredClasses.get(actualClass); if (subclassTag != null) { flags |= IS_TAGGED_SUBCLASS; subclassSerializer = registeredSerializers[subclassTag]; } else { flags |= IS_SUBCLASS; subclassSerializer = getSubclassSerializer(actualClass); } } else { flags |= NO_SUBCLASS; } target.writeByte(flags); // if its a registered subclass, write the class tag id, otherwise write the full classname if ((flags & IS_SUBCLASS) != 0) { target.writeUTF(actualClass.getName()); } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { target.writeByte(subclassTag); } // if its a subclass, use the corresponding subclass serializer, // otherwise serialize each field with our field serializers if ((flags & NO_SUBCLASS) != 0) { try { for (int i = 0; i < numFields; i++) { Object o = (fields[i] != null) ? fields[i].get(value) : null; if (o == null) { target.writeBoolean(true); // null field handling } else { target.writeBoolean(false); fieldSerializers[i].serialize(o, target); } } } catch (IllegalAccessException e) { throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { // subclass if (subclassSerializer != null) { subclassSerializer.serialize(value, target); } } }
反序列化的核心代码
public T deserialize(DataInputView source) throws IOException { int flags = source.readByte(); if((flags & IS_NULL) != 0) { return null; } T target; Class<?> actualSubclass = null; TypeSerializer subclassSerializer = null; if ((flags & IS_SUBCLASS) != 0) { String subclassName = source.readUTF(); try { actualSubclass = Class.forName(subclassName, true, cl); } catch (ClassNotFoundException e) { throw new RuntimeException("Cannot instantiate class.", e); } subclassSerializer = getSubclassSerializer(actualSubclass); target = (T) subclassSerializer.createInstance(); // also initialize fields for which the subclass serializer is not responsible initializeFields(target); } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { int subclassTag = source.readByte(); subclassSerializer = registeredSerializers[subclassTag]; target = (T) subclassSerializer.createInstance(); // also initialize fields for which the subclass serializer is not responsible initializeFields(target); } else { target = createInstance(); } if ((flags & NO_SUBCLASS) != 0) { try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); if (fields[i] != null) { if (isNull) { fields[i].set(target, null); } else { Object field = fieldSerializers[i].deserialize(source); fields[i].set(target, field); } } else if (!isNull) { // read and dump a pre-existing field value fieldSerializers[i].deserialize(source); } } } catch (IllegalAccessException e) { throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { if (subclassSerializer != null) { target = (T) subclassSerializer.deserialize(target, source); //调用下面的反序列化代码 } } return target; } public T deserialize(T reuse, DataInputView source) throws IOException { // handle null values int flags = source.readByte(); if((flags & IS_NULL) != 0) { return null; } Class<?> subclass = null; TypeSerializer subclassSerializer = null; if ((flags & IS_SUBCLASS) != 0) { String subclassName = 
source.readUTF(); try { subclass = Class.forName(subclassName, true, cl); } catch (ClassNotFoundException e) { throw new RuntimeException("Cannot instantiate class.", e); } subclassSerializer = getSubclassSerializer(subclass); if (reuse == null || subclass != reuse.getClass()) { // cannot reuse reuse = (T) subclassSerializer.createInstance(); // also initialize fields for which the subclass serializer is not responsible initializeFields(reuse); } } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { int subclassTag = source.readByte(); subclassSerializer = registeredSerializers[subclassTag]; if (reuse == null || ((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) { // cannot reuse reuse = (T) subclassSerializer.createInstance(); // also initialize fields for which the subclass serializer is not responsible initializeFields(reuse); } } else { if (reuse == null || clazz != reuse.getClass()) { reuse = createInstance(); } } if ((flags & NO_SUBCLASS) != 0) { try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); if (fields[i] != null) { if (isNull) { fields[i].set(reuse, null); } else { Object field; Object reuseField = fields[i].get(reuse); if (reuseField != null) { field = fieldSerializers[i].deserialize(reuseField, source); } else { field = fieldSerializers[i].deserialize(source); } fields[i].set(reuse, field); } } else if (!isNull) { // read and dump a pre-existing field value fieldSerializers[i].deserialize(source); } } } catch (IllegalAccessException e) { throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { if (subclassSerializer != null) { reuse = (T) subclassSerializer.deserialize(reuse, source); } } return reuse; }
8.List
在 ListTypeInfo 中
/**
 * Creates a {@code ListSerializer} around the element type's serializer.
 *
 * @param config forwarded to the element type information
 * @return a new list serializer
 */
public TypeSerializer<List<T>> createSerializer(ExecutionConfig config) {
    return new ListSerializer<>(elementTypeInfo.createSerializer(config));
}
ListSerializer 的序列化和反序列化
@Override public void serialize(List<T> list, DataOutputView target) throws IOException { final int size = list.size(); target.writeInt(size); // We iterate here rather than accessing by index, because we cannot be sure that // the given list supports RandomAccess. // The Iterator should be stack allocated on new JVMs (due to escape analysis) for (T element : list) { elementSerializer.serialize(element, target); } } @Override public List<T> deserialize(DataInputView source) throws IOException { final int size = source.readInt(); // create new list with (size + 1) capacity to prevent expensive growth when a single element is added final List<T> list = new ArrayList<>(size + 1); for (int i = 0; i < size; i++) { list.add(elementSerializer.deserialize(source)); } return list; }
/**
 * Creates a Kryo serializer for this generic type, unless generic types are disabled.
 *
 * @param config consulted for the generic-types-disabled flag and passed to the serializer
 * @return a new {@code KryoSerializer} for {@code typeClass}
 * @throws UnsupportedOperationException if generic types are disabled in {@code config}
 */
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
    if (!config.hasGenericTypesDisabled()) {
        return new KryoSerializer<T>(this.typeClass, config);
    }

    throw new UnsupportedOperationException(
            "Generic types have been disabled in the ExecutionConfig and type " + this.typeClass.getName() +
            " is treated as a generic type.");
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。