
From a State-Induced Performance Problem to Flink State Serialization


Preface

Long time no see (bows)

I've been in a transition period lately, insanely busy every day, so there's naturally little energy left for concrete technical details (and yes, I broke the weekly-update promise from the previous post = =). Last week I helped someone quickly resolve a performance problem caused by misusing Flink state types. Here are some quick notes on it, along with a brief introduction to the basics of Flink state serialization.

The Problem and Troubleshooting

A colleague from an upstream department reported that a multi-stream-join DataStream API job with fairly simple computation logic kept falling behind on consumption and failing its checkpoints (the original screenshots have been lost). The job topology is shown below.

Shrunk very small for data-masking purposes = =

Tuning the cluster parameters following the usual pattern for large-state jobs did not help.

Via the Flink Web UI, the problem was localized to the second-to-last operator in the topology: some of its sub-tasks could never complete their checkpoints. The Metrics panel showed slight data skew, while all upstream and downstream backpressure metrics were 0.

After continued observation, the skewed sub-tasks processed at most 10%~15% more data than the others, which by common sense should not cause such a severe performance problem. So we went to the corresponding TaskManager pod and captured a flame graph, shown below.

RocksDB state reads and writes were taking extremely long, with most of the time spent in Kryo serialization, which means the state stored objects that Flink's serialization framework does not natively support. A "show me the code" session with the responsible developer revealed the truth:

```java
private transient MapState<String, HashSet<String>> state1;
private transient MapState<String, HashSet<String>> state2;
private transient ValueState<Map<String, String>> state3;
```

Flink's serialization framework has no built-in serializer for HashSet, so it naturally falls back to Kryo. Even though these sets are not particularly large, the cost of every state access rises sharply. The ValueState<Map<String, String>> usage is also wrong, of course; it should be a MapState<String, String>.

The fastest temporary fix is simple: replace every HashSet used inside state with a Map<String, Boolean>, which deduplicates just as well. Not elegant, but with native MapSerializer support behind it, efficiency improves dramatically. The following sections briefly introduce Flink's state serialization.
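Outside of Flink, the semantics of that workaround can be sketched in plain Java, with an ordinary HashMap standing in for the MapState handle (the class and method names below are illustrative, not part of the actual job's code):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the workaround: a Map<String, Boolean> used purely for its key set,
// mirroring how MapState<String, Boolean> replaces MapState<String, HashSet<String>>.
// In the real job, `seen` would be a Flink MapState handle; here it is a plain HashMap.
public class MapDedupSketch {

    public static Set<String> dedup(List<String> input) {
        Map<String, Boolean> seen = new HashMap<>();
        for (String key : input) {
            // Only the key matters; the Boolean value is a placeholder that is never read.
            seen.putIfAbsent(key, Boolean.TRUE);
        }
        return seen.keySet();
    }

    public static void main(String[] args) {
        // Duplicates collapse, exactly as they would with a HashSet (order not guaranteed).
        System.out.println(dedup(List.of("a", "b", "a", "c")));
    }
}
```

The point of the trade is that each element becomes an individual map entry, so RocksDB-backed MapState can read/write one key at a time through the native MapSerializer, instead of Kryo-deserializing a whole HashSet on every access.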

TypeSerializer

When we create a StateDescriptor, the descriptor required to obtain a state handle, we must specify the type of the state data, e.g.:

```java
ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("myState", Integer.class);
ValueState<Integer> state = this.getRuntimeContext().getState(stateDesc);
```

This simultaneously determines the Serializer for that data type. As we know, TypeSerializer is the low-level abstraction of the Flink Runtime serialization mechanism, and state serialization is no exception. Take MapSerializer, which handles Map types, as an example; the code below is fairly self-explanatory.

```java
@Internal
public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {

    private static final long serialVersionUID = -6885593032367050078L;

    /** The serializer for the keys in the map */
    private final TypeSerializer<K> keySerializer;

    /** The serializer for the values in the map */
    private final TypeSerializer<V> valueSerializer;

    /**
     * Creates a map serializer that uses the given serializers to serialize the key-value pairs in
     * the map.
     *
     * @param keySerializer The serializer for the keys in the map
     * @param valueSerializer The serializer for the values in the map
     */
    public MapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
        this.keySerializer =
                Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
        this.valueSerializer =
                Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
    }

    // ------------------------------------------------------------------------
    //  MapSerializer specific properties
    // ------------------------------------------------------------------------

    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    public TypeSerializer<V> getValueSerializer() {
        return valueSerializer;
    }

    // ------------------------------------------------------------------------
    //  Type Serializer implementation
    // ------------------------------------------------------------------------

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public TypeSerializer<Map<K, V>> duplicate() {
        TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
        TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
        return (duplicateKeySerializer == keySerializer)
                        && (duplicateValueSerializer == valueSerializer)
                ? this
                : new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
    }

    @Override
    public Map<K, V> createInstance() {
        return new HashMap<>();
    }

    @Override
    public Map<K, V> copy(Map<K, V> from) {
        Map<K, V> newMap = new HashMap<>(from.size());
        for (Map.Entry<K, V> entry : from.entrySet()) {
            K newKey = keySerializer.copy(entry.getKey());
            V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue());
            newMap.put(newKey, newValue);
        }
        return newMap;
    }

    @Override
    public Map<K, V> copy(Map<K, V> from, Map<K, V> reuse) {
        return copy(from);
    }

    @Override
    public int getLength() {
        return -1; // var length
    }

    @Override
    public void serialize(Map<K, V> map, DataOutputView target) throws IOException {
        final int size = map.size();
        target.writeInt(size);
        for (Map.Entry<K, V> entry : map.entrySet()) {
            keySerializer.serialize(entry.getKey(), target);
            if (entry.getValue() == null) {
                target.writeBoolean(true);
            } else {
                target.writeBoolean(false);
                valueSerializer.serialize(entry.getValue(), target);
            }
        }
    }

    @Override
    public Map<K, V> deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();
        final Map<K, V> map = new HashMap<>(size);
        for (int i = 0; i < size; ++i) {
            K key = keySerializer.deserialize(source);
            boolean isNull = source.readBoolean();
            V value = isNull ? null : valueSerializer.deserialize(source);
            map.put(key, value);
        }
        return map;
    }

    @Override
    public Map<K, V> deserialize(Map<K, V> reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        final int size = source.readInt();
        target.writeInt(size);
        for (int i = 0; i < size; ++i) {
            keySerializer.copy(source, target);
            boolean isNull = source.readBoolean();
            target.writeBoolean(isNull);
            if (!isNull) {
                valueSerializer.copy(source, target);
            }
        }
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this
                || (obj != null
                        && obj.getClass() == getClass()
                        && keySerializer.equals(((MapSerializer<?, ?>) obj).getKeySerializer())
                        && valueSerializer.equals(
                                ((MapSerializer<?, ?>) obj).getValueSerializer()));
    }

    @Override
    public int hashCode() {
        return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
    }

    // --------------------------------------------------------------------------------------------
    // Serializer configuration snapshotting
    // --------------------------------------------------------------------------------------------

    @Override
    public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
        return new MapSerializerSnapshot<>(this);
    }
}
```

To summarize:

  • Serialization and deserialization are essentially operations on MemorySegments: binary data is written out through a DataOutputView and read in through a DataInputView;
  • For composite data types, the TypeSerializers of the nested element types are likewise defined and invoked recursively;
  • A matching TypeSerializerSnapshot is mandatory. This component defines how the TypeSerializer itself and the metadata it carries (i.e., the state schema) are serialized, and this information is stored in snapshots. Through the TypeSerializerSnapshot, Flink can judge the compatibility of the data on state restore, which makes it the key to Flink's state schema evolution feature.
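To make the byte layout above concrete, here is a small self-contained sketch of the same wire format (a size prefix, then per entry the key, a null flag, and, if present, the value), using the JDK's DataOutputStream/DataInputStream in place of Flink's DataOutputView/DataInputView, which share the same DataOutput/DataInput contract. The class and method names are illustrative, and fixed string serialization (writeUTF) stands in for the nested key/value TypeSerializers:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-alone sketch of MapSerializer's wire format for a Map<String, String>:
// writeInt(size), then per entry: key, null flag, value (only if non-null).
public class MapWireFormatSketch {

    public static byte[] serialize(Map<String, String> map) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream target = new DataOutputStream(bytes);
            target.writeInt(map.size());
            for (Map.Entry<String, String> entry : map.entrySet()) {
                target.writeUTF(entry.getKey());   // nested key serializer would go here
                if (entry.getValue() == null) {
                    target.writeBoolean(true);     // null flag, exactly as in MapSerializer
                } else {
                    target.writeBoolean(false);
                    target.writeUTF(entry.getValue());
                }
            }
            return bytes.toByteArray();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static Map<String, String> deserialize(byte[] data) {
        try {
            DataInputStream source = new DataInputStream(new ByteArrayInputStream(data));
            int size = source.readInt();
            Map<String, String> map = new LinkedHashMap<>(size);
            for (int i = 0; i < size; i++) {
                String key = source.readUTF();
                boolean isNull = source.readBoolean();
                map.put(key, isNull ? null : source.readUTF());
            }
            return map;
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static void main(String[] args) {
        Map<String, String> original = new LinkedHashMap<>();
        original.put("k1", "v1");
        original.put("k2", null);   // null values survive the round trip via the flag
        System.out.println(deserialize(serialize(original)).equals(original)); // true
    }
}
```

Note that getLength() in MapSerializer returns -1 precisely because this layout is variable-length: the record's size depends on how many entries and how many null flags it contains.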

TypeSerializerSnapshot

The TypeSerializerSnapshot interface has the following important methods. The Javadoc is clear enough that I won't repeat it (really because I'm lazy and tired = =

```java
/**
 * Returns the version of the current snapshot's written binary format.
 *
 * @return the version of the current snapshot's written binary format.
 */
int getCurrentVersion();

/**
 * Writes the serializer snapshot to the provided {@link DataOutputView}. The current version of
 * the written serializer snapshot's binary format is specified by the {@link
 * #getCurrentVersion()} method.
 *
 * @param out the {@link DataOutputView} to write the snapshot to.
 * @throws IOException Thrown if the snapshot data could not be written.
 * @see #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)
 */
void writeSnapshot(DataOutputView out) throws IOException;

/**
 * Reads the serializer snapshot from the provided {@link DataInputView}. The version of the
 * binary format that the serializer snapshot was written with is provided. This version can be
 * used to determine how the serializer snapshot should be read.
 *
 * @param readVersion version of the serializer snapshot's written binary format
 * @param in the {@link DataInputView} to read the snapshot from.
 * @param userCodeClassLoader the user code classloader
 * @throws IOException Thrown if the snapshot data could not be read or parsed.
 * @see #readVersionedSnapshot(DataInputView, ClassLoader)
 */
void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
        throws IOException;

/**
 * Recreates a serializer instance from this snapshot. The returned serializer can be safely
 * used to read data written by the prior serializer (i.e., the serializer that created this
 * snapshot).
 *
 * @return a serializer instance restored from this serializer snapshot.
 */
TypeSerializer<T> restoreSerializer();

/**
 * Checks a new serializer's compatibility to read data written by the prior serializer.
 *
 * <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
 * format of the data in the checkpoint/savepoint is compatible for the format of the serializer
 * used by the program that restores the checkpoint/savepoint. The outcome can be that the
 * serialization format is compatible, that the program's serializer needs to reconfigure itself
 * (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
 * that the format is outright incompatible, or that a migration is needed. In the latter case,
 * the TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
 * program's serializer re-serializes the data, thus converting the format during the restore
 * operation.
 *
 * @param newSerializer the new serializer to check.
 * @return the serializer compatibility result.
 */
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
        TypeSerializer<T> newSerializer);
```

Note in particular that on state restore, the schema compatibility check can yield four kinds of TypeSerializerSchemaCompatibility results:

  • COMPATIBLE_AS_IS: compatible; the new Serializer can be used directly;
  • COMPATIBLE_AFTER_MIGRATION: compatible, but the data must first be deserialized with the old Serializer from the snapshot and then re-serialized with the new Serializer. The most common scenario is adding or removing a field in a state POJO; see the relevant code in the PojoSerializerSnapshot class for details;
  • COMPATIBLE_WITH_RECONFIGURED_SERIALIZER: compatible, but the new Serializer must be reconfigured before use. This is less common; one example is a change in the class inheritance hierarchy of a state POJO;
  • INCOMPATIBLE: incompatible; the state cannot be restored. For example, changing the type of a simple-typed field in a POJO (e.g. String → Integer): SimpleTypeSerializerSnapshot, which handles simple data types, does not support such changes and reports the incompatibility:
```java
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
        TypeSerializer<T> newSerializer) {
    return newSerializer.getClass() == serializerSupplier.get().getClass()
            ? TypeSerializerSchemaCompatibility.compatibleAsIs()
            : TypeSerializerSchemaCompatibility.incompatible();
}
```

Naturally, for composite types (such as List or Map), the compatibility of the outer container's Serializer is checked first, followed by that of the nested Serializers. For details, see the CompositeTypeSerializerSnapshot abstract class that Flink defines for exactly this purpose; it is rather involved, so I won't expand on it here.
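Without reproducing that class, the "outer first, then nested" resolution logic can be sketched with a plain enum. Everything below (the enum, the resolveComposite helper) is an illustrative stand-in for Flink's actual CompositeTypeSerializerSnapshot machinery, not its API:

```java
// Illustrative sketch (NOT Flink's real API): a composite serializer snapshot first
// checks the outer serializer class itself, then combines the compatibility results
// of its nested serializers, with the overall result being the "worst" outcome found.
public class CompositeCompatibilitySketch {

    // The four outcomes, ordered here from best to worst
    // (mirrors the values of TypeSerializerSchemaCompatibility).
    public enum Compatibility {
        COMPATIBLE_AS_IS,
        COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
        COMPATIBLE_AFTER_MIGRATION,
        INCOMPATIBLE
    }

    public static Compatibility resolveComposite(
            boolean outerClassCompatible, Compatibility... nested) {
        if (!outerClassCompatible) {
            // e.g. the state switched from MapSerializer to an entirely different serializer
            return Compatibility.INCOMPATIBLE;
        }
        Compatibility worst = Compatibility.COMPATIBLE_AS_IS;
        for (Compatibility c : nested) {
            if (c.ordinal() > worst.ordinal()) {
                worst = c; // any nested INCOMPATIBLE poisons the whole result
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        // A Map<String, Pojo> whose value POJO gained a field: the nested value
        // serializer needs migration, so the composite result is migration too.
        System.out.println(resolveComposite(
                true,
                Compatibility.COMPATIBLE_AS_IS,
                Compatibility.COMPATIBLE_AFTER_MIGRATION));
    }
}
```

The real implementation does essentially this, with the extra machinery of snapshotting and restoring the nested serializers so that migration can actually be performed.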

The End

In some special scenarios we need custom Serializers for better state serialization (for example, using a RoaringBitmap instead of a Set for efficient deduplication in state). It's already very late today, so I won't give a concrete implementation for now. For more details on custom state serializers, see the chapter "Custom Serialization for Managed State" in the official documentation.

Good night, good night.
