赞
踩
最近在开发Flink任务过程当中发现了一个比较奇怪的问题,在Flink上多次提交任务后会出现一个OOM的异常,意思是Flink的内存已经满了,无法再提交新的任务。如下:
其中有这么一句:
If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed.
该异常确实是多次提交任务后出现的,不禁让人怀疑:
是不是每次提交任务Flink都会加载一些新的类,而在任务被取消时类并没有被顺利卸载,从而导致Flink的内存空间占用越来越大,最终出现OOM?
PS: Flink使用的版本是1.13.1,部署模式是Standalone-cluster,一个JobManager多个TaskManager。
为了验证这个怀疑,就不得不先了解Flink的类加载机制。
在Flink中,有两种类加载机制:
因为Flink是基于JVM的,所以先回顾下JVM的类加载机制
在JVM中,一个类加载的过程大致分为加载、链接(验证、准备、解析)、初始化5个阶段。而我们通常提到类的加载,就是指利用类加载器(ClassLoader)通过类的全限定名来获取定义此类的二进制字节码流,进而构造出类的定义。
如果一个类加载器要加载一个类,它首先不会自己尝试加载这个类,而是把加载的请求委托给父加载器完成,所有的类加载请求最终都应该传递给最顶层的启动类加载器。只有当父加载器无法加载到这个类时,子加载器才会尝试自己加载。
关于JVM提供的这几个类加载器,只用于加载Java指定核心目录下的类,这样做的好处就是避免了类被重复加载和核心类被篡改,因为父类加载器加载过的类 子类加载器就没有必要再加载一遍了。
相对于JVM的类加载机制来说,Flink的类加载“打破”了双亲委派一机制,而是允许用户自己定义类加载的规则。前面说到了,Flink对于类的加载提供了两种策略:child-first、parent-first,其实也对应了两种类加载器:ChildFirstClassLoader、ParentFirstClassLoader。
从Flink的类加载器继承关系图可以看到,Flink主要是使用FlinkUserCodeClassLoader来加载用户类,因为用户应用程序中的类是动态的,所以FlinkUserCodeClassLoader又继承了URLClassLoader,用来通过URL在用户Jar包内寻找全限定名对应的类来加载。
对于ParentFirstClassLoader,查看源码可以知道,这是一个继承了FlinkUserCodeClassLoader的空类:
/**
* Regular URLClassLoader that first loads from the parent and only after that from the URLs.
*/
public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {
ParentFirstClassLoader(
URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) {
super(urls, parent, classLoadingExceptionHandler);
}
static {
ClassLoader.registerAsParallelCapable();
}
}
也就是说,对于ParentFirstClassLoader,他的类加载机制与JVM的双亲委派模型是一致的,用户代码的类优先交给Flink的类加载器来加载,如果加载不到再由用户的类加载器来加载。但其实对于Flink来说,默认是使用child-first类加载机制,也就是ChildFirstClassLoader。
前面已经提到,JVM的双亲委派模型最大的好处就是保证了类不会被重复加载,并且保障了JVM运行环境的安全。但是对于Flink来说,Flink更像是一个平台,运行起来后支持创建多个任务,不同任务之间有很大可能会依赖不同的jar包甚至同一个jar包的不同版本,那么在如此错综复杂的jar包依赖情况下,如果把所有的jars都放在flink/lib目录,是极有可能出现jar包冲突等各种莫名其妙的问题。那么为了解决这一问题,Flink便使用child-first作为默认的类加载机制,这一机制不同于双亲委派模型,他允许用户应用程序中的类优先加载,也就是说,当用户提交应用程序时,Flink的ChildFirstClassLoader会优先加载应用程序中的类,这样保证了在多jars依赖的环境下用户的应用程序可以加载正确的类文件,但弊端也显而易见,有可能导致类被重复加载。其中核心的加载逻辑在loadClassWithoutExceptionHandling
方法中:
@Override protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve) throws ClassNotFoundException { // First, check if the class has already been loaded Class<?> c = findLoadedClass(name); if (c == null) { // check whether the class should go parent-first for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) { if (name.startsWith(alwaysParentFirstPattern)) { return super.loadClassWithoutExceptionHandling(name, resolve); } } try { // check the URLs c = findClass(name); } catch (ClassNotFoundException e) { // let URLClassLoader do it, which will eventually call the parent c = super.loadClassWithoutExceptionHandling(name, resolve); } } else if (resolve) { resolveClass(c); } return c; }
findLoadedClass(name)
方法检查类是否已经被加载过alwaysParentFirstPatterns
集合中的元素开头,如果是,则交给父类加载器来加载findClass(name)
方法尝试在用户代码中获取该类,如果出现异常,意味着类在用户代码中不存在,则尝试使用父类加载器来加载该类那了解到这里,大概清楚了Flink对于类加载的策略,但是又有了一个新的问题:
ChildFirstClassLoader明明在加载类之前会判断类是否已经被加载过,为什么还会出现疑似类被重复加载的问题呢?
其实,在ChildFirstClassLoader中,检查类是否已经加载的方法findLoadedClass(name)
是属于抽象类ClassLoader中的方法,而不管是实现抽象类ClassLoader,还是继承URLClassLoader类,它的父加载器都是AppClassLoader,因为不管调用哪个父类加载器,创建的对象都必须最终调用getSystemClassLoader()
作为父加载器,getSystemClassLoader()
方法获取到的正是AppClassLoader。而前面也提到过,AppClassLoader加载$CLASSPATH下的目录和Jar,也就是flink/lib目录下的Jars。那如果此时ChildFirstClassLoader正在加载的类所在的Jar只存在于应用程序中而不在flink/lib目录下,那么ChildFirstClassLoader在检查类是否已经存在时,也就相当于AppClassLoader在flink/lib目录下查找该类,结果肯定是查找不到的,所以对于只存在于用户应用程序Jar中的类来说,不管前面是否已经加载过该类,ChildFirstClassLoader总是会在应用程序提交时重新加载该类。
那么问题又来了:
既然每次提交任务都会重新加载一次类,那在任务结束的时候为什么类没有被卸载?
这是因为,类在ClassLoader存在的情况下不会被卸载,因为所有的类默认是ClassLoader加载的,默认情况下ClassLoader的生命周期与进程一致。所以当一个进程结束时,该进程中加载的类也会被卸载。也就是说在默认情况下,只有当Flink停止时那些重复加载的类才能被卸载。
其实Flink在官网就已经提到了,像JDBC这种驱动类的类,应该优先放到flink/lib目录下,这样能保证类只会被加载一次。官网地址
好,那理论基础已经有了,就来实际验证下⬇️
验证的思路也很简单,在本机上启动一个Flink,每一次提任务后分析当前Flink内存,分析内存可以使用jdk自带的jmap生成当前进程的Heap dump,然后使用Eclipse Memory Analyser来进行可视化分析。因为任务由TaskManager负责执行,所以分析TaskManager的进程就好了。
在flink目录下执行命令:./bin/start-cluster.sh
,看到如图后Flink便在本机启动好了
在浏览器访问http://localhost:8081/,提交任务
首先使用ps aux | grep taskmanager
命令得到正在运行的TaskManager的进程ID
其中48501便是TaskManager的进程ID,再使用jmap将堆内存的情况通过bin文件存储下来,使用命令:jmap -dump:live,format=b,file=flink_taskmanager_heap_1.bin 48501
,该命令会在当前路径下生成bin文件
下载Eclipse Memory Analyzer(也叫mat),官网地址,如果使用的是JDK11,那么直接下载最新发布的版本1.11.0(该版本只支持JDK11),如果是JDK1.8,那么下载1.10.0及以后的版本就可以。如果是Mac M1用户,只能下载1.10.0版本,在这版本之后的mat打开后就是一白板…
下载并启动后选择“File -> Open File…”,找到刚才生成bin文件的路径,打开文件
打开文件后选择第一项(泄漏嫌疑报告),点“Finish”
点击“Overview”,选择“Duplicate classes”
因为是第一次提交任务,所以用户代码中的类都是第一次加载,这里看不到任何重复的用户代码中的类
其中的java.lang.invoke.LambdaForm
是Java8中使用Lambda表达式用到的内部类,这可以不用管。
在第二次重新提交任务,并重新生成堆内存bin文件后,再使用mat查看,可以发现,有JDBC相关类已经被重复加载了。因为提交的任务中有使用到druid连接池来连接数据库,所以连接池相关类被重复加载。
再来重复一次
确实是重复提交n次任务,JDBC相关类就被重复加载n次,那如果把druid的Jar包放到flink/lib目录下是否就能保证JDBC相关类只被加载一次?来验证下
第一次提交任务,没有重复类出现,正常
第二次提交任务
奇怪的现象出现了,JDBC相关类确实没有被重复加载了,但是出来了一堆其他的类…其中的KafkaConnector相关类也被加载了2次,按理来说,Flink是推荐将这种Connector依赖包打到应用程序中,避免多任务的环境下产生版本冲突,那既然这样不还是会导致类被重复加载的问题?看来还是Flink研究的不细致。不过可以确定的是,相关Jars放到flink/lib目录下后确实能解决类被重复加载的问题。
Mark:第三方Connector的类重复加载问题?(unresolved)
有哪位大神知道的烦请指点指点
除了将Jar包放在flink/lib目录下之外,还需要确保对应的Jar包在用户的应用程序中不被打包进来,保证Flink只能从flink/lib目录下加载到类。但是这种方式存在局限性。
在这次使用到的Flink是使用Standalone集群模式部署的,这种部署模式优点就是独立运行,不依赖任何外部资源管理平台,但缺点也很明显,多个任务共享同一个Flink资源,且任务与任务之间可能会争夺资源。当Flink出现资源不足时,没法自动扩容,需要手动处理。就如本次遇到的问题,其中一个任务导致的OOM,使得后面其他任务无法再提交到Flink,需要重启Flink,而重启又影响到了正在运行的任务…
其实,Flink支持多种部署模式,其中比较常用的有三种:
显然,如果将部署模式改为Flink on YARN的Per-Job Mode或者Application Mode,那也不存在上述的OOM了,因为每次提交都是一个新的Flink集群。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。