当前位置:   article > 正文

关于在Flink上多次提交任务导致OOM_flink oom

flink oom

问题描述

最近在开发Flink任务过程当中发现了一个比较奇怪的问题,在Flink上多次提交任务后会出现一个OOM的异常,意思是Flink的内存已经满了,无法再提交新的任务。如下:
在这里插入图片描述
其中有这么一句:

If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed.
  • 1

该异常确实是多次提交任务后出现的,不禁让人怀疑:

是不是每次提交任务Flink都会加载一些新的类,而在任务被取消时类并没有被顺利卸载,从而导致Flink的内存空间占用越来越大,最终出现OOM?

PS: Flink使用的版本是1.13.1,部署模式是Standalone-cluster,一个JobManager多个TaskManager。
  • 1

为了验证这个怀疑,就不得不先了解Flink的类加载机制

Flink的类加载机制

在Flink中,有两种类加载机制:

  • child-first:Flink优先加载应用程序jar包中的类,如果没有则会尝试从Flink的lib目录下加载类
  • parent-first:Flink优先加载flink/lib目录下的类,如果没有则会尝试加载应用程序jar包中的类

因为Flink是基于JVM的,所以先回顾下JVM的类加载机制

JVM的类加载机制 - 双亲委派模型

在JVM中,一个类加载的过程大致分为加载、链接(验证、准备、解析)、初始化5个阶段。而我们通常提到类的加载,就是指利用类加载器(ClassLoader)通过类的全限定名来获取定义此类的二进制字节码流,进而构造出类的定义。
如果一个类加载器要加载一个类,它首先不会自己尝试加载这个类,而是把加载的请求委托给父加载器完成,所有的类加载请求最终都应该传递给最顶层的启动类加载器。只有当父加载器无法加载到这个类时,子加载器才会尝试自己加载。
在这里插入图片描述

  • BootStrap ClassLoader:主要加载JVM自身工作需要的类,如java.lang.*、java.uti.*等
  • Extension ClassLoader:加载位于$JAVA_HOME/jre/lib/ext目录下的扩展jar
  • App ClassLoader:父类是Extension ClassLoader,加载$CLASSPATH下的目录和jar

关于JVM提供的这几个类加载器,只用于加载Java指定核心目录下的类,这样做的好处就是避免了类被重复加载和核心类被篡改,因为父类加载器加载过的类 子类加载器就没有必要再加载一遍了。

Flink的类加载机制 - 自定义策略

相对于JVM的类加载机制来说,Flink的类加载“打破”了双亲委派一机制,而是允许用户自己定义类加载的规则。前面说到了,Flink对于类的加载提供了两种策略:child-firstparent-first,其实也对应了两种类加载器:ChildFirstClassLoaderParentFirstClassLoader
在这里插入图片描述
从Flink的类加载器继承关系图可以看到,Flink主要是使用FlinkUserCodeClassLoader来加载用户类,因为用户应用程序中的类是动态的,所以FlinkUserCodeClassLoader又继承了URLClassLoader,用来通过URL在用户Jar包内寻找全限定名对应的类来加载。

ParentFirstClassLoader

对于ParentFirstClassLoader,查看源码可以知道,这是一个继承了FlinkUserCodeClassLoader的空类:

/**
 * Regular URLClassLoader that first loads from the parent and only after that from the URLs.
 */
public static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {

    ParentFirstClassLoader(
            URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) {
        super(urls, parent, classLoadingExceptionHandler);
    }

    static {
        ClassLoader.registerAsParallelCapable();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

也就是说,对于ParentFirstClassLoader,他的类加载机制与JVM的双亲委派模型是一致的,用户代码的类优先交给Flink的类加载器来加载,如果加载不到再由用户的类加载器来加载。但其实对于Flink来说,默认是使用child-first类加载机制,也就是ChildFirstClassLoader

ChildFirstClassLoader

前面已经提到,JVM的双亲委派模型最大的好处就是保证了类不会被重复加载,并且保障了JVM运行环境的安全。但是对于Flink来说,Flink更像是一个平台,运行起来后支持创建多个任务,不同任务之间有很大可能会依赖不同的jar包甚至同一个jar包的不同版本,那么在如此错综复杂的jar包依赖情况下,如果把所有的jars都放在flink/lib目录,是极有可能出现jar包冲突等各种莫名其妙的问题。那么为了解决这一问题,Flink便使用child-first作为默认的类加载机制,这一机制不同于双亲委派模型,他允许用户应用程序中的类优先加载,也就是说,当用户提交应用程序时,Flink的ChildFirstClassLoader会优先加载应用程序中的类,这样保证了在多jars依赖的环境下用户的应用程序可以加载正确的类文件,但弊端也显而易见,有可能导致类被重复加载。其中核心的加载逻辑在loadClassWithoutExceptionHandling方法中:

@Override
protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)
        throws ClassNotFoundException {

    // First, check if the class has already been loaded
    Class<?> c = findLoadedClass(name);

    if (c == null) {
        // check whether the class should go parent-first
        for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
            if (name.startsWith(alwaysParentFirstPattern)) {
                return super.loadClassWithoutExceptionHandling(name, resolve);
            }
        }

        try {
            // check the URLs
            c = findClass(name);
        } catch (ClassNotFoundException e) {
            // let URLClassLoader do it, which will eventually call the parent
            c = super.loadClassWithoutExceptionHandling(name, resolve);
        }
    } else if (resolve) {
        resolveClass(c);
    }

    return c;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 先调用findLoadedClass(name)方法检查类是否已经被加载过
  • 如果类没有被加载过,那么会先检查类是否以alwaysParentFirstPatterns集合中的元素开头,如果是,则交给父类加载器来加载
  • 如果不是,则调用findClass(name)方法尝试在用户代码中获取该类,如果出现异常,意味着类在用户代码中不存在,则尝试使用父类加载器来加载该类
  • 如果resolve参数为true,则调用resolveClass方法
  • 返回该类

回到问题

那了解到这里,大概清楚了Flink对于类加载的策略,但是又有了一个新的问题:

ChildFirstClassLoader明明在加载类之前会判断类是否已经被加载过,为什么还会出现疑似类被重复加载的问题呢?

其实,在ChildFirstClassLoader中,检查类是否已经加载的方法findLoadedClass(name)是属于抽象类ClassLoader中的方法,而不管是实现抽象类ClassLoader,还是继承URLClassLoader类,它的父加载器都是AppClassLoader,因为不管调用哪个父类加载器,创建的对象都必须最终调用getSystemClassLoader()作为父加载器,getSystemClassLoader()方法获取到的正是AppClassLoader。而前面也提到过,AppClassLoader加载$CLASSPATH下的目录和Jar,也就是flink/lib目录下的Jars。那如果此时ChildFirstClassLoader正在加载的类所在的Jar只存在于应用程序中而不在flink/lib目录下,那么ChildFirstClassLoader在检查类是否已经存在时,也就相当于AppClassLoader在flink/lib目录下查找该类,结果肯定是查找不到的,所以对于只存在于用户应用程序Jar中的类来说,不管前面是否已经加载过该类,ChildFirstClassLoader总是会在应用程序提交时重新加载该类。
那么问题又来了:

既然每次提交任务都会重新加载一次类,那在任务结束的时候为什么类没有被卸载?

这是因为,类在ClassLoader存在的情况下不会被卸载,因为所有的类默认是ClassLoader加载的,默认情况下ClassLoader的生命周期与进程一致。所以当一个进程结束时,该进程中加载的类也会被卸载。也就是说在默认情况下,只有当Flink停止时那些重复加载的类才能被卸载。
其实Flink在官网就已经提到了,像JDBC这种驱动类的类,应该优先放到flink/lib目录下,这样能保证类只会被加载一次。官网地址

好,那理论基础已经有了,就来实际验证下⬇️

验证过程

验证的思路也很简单,在本机上启动一个Flink,每一次提任务后分析当前Flink内存,分析内存可以使用jdk自带的jmap生成当前进程的Heap dump,然后使用Eclipse Memory Analyser来进行可视化分析。因为任务由TaskManager负责执行,所以分析TaskManager的进程就好了。

1. 运行Flink,并提交任务

在flink目录下执行命令:./bin/start-cluster.sh,看到如图后Flink便在本机启动好了
在这里插入图片描述
在浏览器访问http://localhost:8081/,提交任务
在这里插入图片描述

2. 生成Heap dump文件

首先使用ps aux | grep taskmanager命令得到正在运行的TaskManager的进程ID
在这里插入图片描述
其中48501便是TaskManager的进程ID,再使用jmap将堆内存的情况通过bin文件存储下来,使用命令:jmap -dump:live,format=b,file=flink_taskmanager_heap_1.bin 48501,该命令会在当前路径下生成bin文件
在这里插入图片描述

3. 使用Eclipse Memory Analyzer(mat)分析堆内存

下载Eclipse Memory Analyzer(也叫mat),官网地址,如果使用的是JDK11,那么直接下载最新发布的版本1.11.0(该版本只支持JDK11),如果是JDK1.8,那么下载1.10.0及以后的版本就可以。如果是Mac M1用户,只能下载1.10.0版本,在这版本之后的mat打开后就是一白板…
下载并启动后选择“File -> Open File…”,找到刚才生成bin文件的路径,打开文件
在这里插入图片描述
打开文件后选择第一项(泄漏嫌疑报告),点“Finish”
在这里插入图片描述
点击“Overview”,选择“Duplicate classes”
在这里插入图片描述
在这里插入图片描述
因为是第一次提交任务,所以用户代码中的类都是第一次加载,这里看不到任何重复的用户代码中的类
其中的java.lang.invoke.LambdaForm是Java8中使用Lambda表达式用到的内部类,这可以不用管。

4. 取消任务并重新提交任务,重复步骤2、3

在第二次重新提交任务,并重新生成堆内存bin文件后,再使用mat查看,可以发现,有JDBC相关类已经被重复加载了。因为提交的任务中有使用到druid连接池来连接数据库,所以连接池相关类被重复加载。
在这里插入图片描述
再来重复一次
在这里插入图片描述
确实是重复提交n次任务,JDBC相关类就被重复加载n次,那如果把druid的Jar包放到flink/lib目录下是否就能保证JDBC相关类只被加载一次?来验证下

5. 将Druid和JDBC相关依赖包放到flink/lib目录下

在这里插入图片描述

6. 重复步骤1、2、3

第一次提交任务,没有重复类出现,正常在这里插入图片描述
第二次提交任务在这里插入图片描述
奇怪的现象出现了,JDBC相关类确实没有被重复加载了,但是出来了一堆其他的类…其中的KafkaConnector相关类也被加载了2次,按理来说,Flink是推荐将这种Connector依赖包打到应用程序中,避免多任务的环境下产生版本冲突,那既然这样不还是会导致类被重复加载的问题?看来还是Flink研究的不细致。不过可以确定的是,相关Jars放到flink/lib目录下后确实能解决类被重复加载的问题。

Mark:第三方Connector的类重复加载问题?(unresolved)
有哪位大神知道的烦请指点指点

解决办法

1. 将Jars放到flink/lib目录下

除了将Jar包放在flink/lib目录下之外,还需要确保对应的Jar包在用户的应用程序中不被打包进来,保证Flink只能从flink/lib目录下加载到类。但是这种方式存在局限性。

  • 第一,比如前面提到的Jar包的版本问题,如果其他的应用程序也需要用到同一个依赖,但是版本不同呢?那可能就需要用户更改应用程序代码,如果是一个多任务的场景,这可能是个比较头疼的问题。
  • 第二,将Jar包提到flink/lib目录下之后,需要重启整个Flink才能使得Jar包生效,那么随着业务的发展,可能在开发过程中不断的有Jars需要被放到flink/lib目录下,那每添加一次Jars就意味着重启一次Flink,这样对正在运行的线上任务也有很大的影响。
    In fact,这里说到的问题只针对于Flink的Standalone部署模式,或者Session的运行模式下的。如果使用YARN来部署,其实上面的问题都能得到解决。

2. 更改Flink的部署模式和运行模式

在这次使用到的Flink是使用Standalone集群模式部署的,这种部署模式优点就是独立运行,不依赖任何外部资源管理平台,但缺点也很明显,多个任务共享同一个Flink资源,且任务与任务之间可能会争夺资源。当Flink出现资源不足时,没法自动扩容,需要手动处理。就如本次遇到的问题,其中一个任务导致的OOM,使得后面其他任务无法再提交到Flink,需要重启Flink,而重启又影响到了正在运行的任务…
其实,Flink支持多种部署模式,其中比较常用的有三种:

  • Local模式
    这种模式没啥好说的,本地模式,压缩包下载下来解压,执行./bin/start-cluster.sh就在本机拉起来了,通过默认的8081端口可以访问到。
  • Standalone-cluster模式
    这种模式也就是目前正在使用的模式,优点就是部署方便,缺点就是拓展性差,比较适用于开发测试和作业很少的场景下。
  • Flink on YARN模式
    在这种模式下Flink的资源由YARN来进行管理,Flink服务被提交到YARN的ResourceManager后,YARN的NodeManager会为Flink生成对应的容器,Flink再将JobManager和TaskManager实例部署到容器中。在这种情况下Flink可以通过JobManager所需要的slots数量来动态的调整TaskManager的资源,达到了资源的可拓展性。Flink官方也推荐正式的生产环境使用这种部署模式。
    在YARN上,又分为三种部署模式:
    • Session Mode
      共享JobManager和TaskManager,所有提交的任务都在一个集群中运行,集群的生命周期独立于任务,任务的开始、结束不影响集群的生命周期。类似于上面的Standalone-cluster模式,任务与任务之间不隔离,共享同一套资源。
    • Per-Job Mode
      为每个任务创建单独的JobManager和TaskManager集群,每个任务之间互相隔离互不干扰,集群的生命周期随着任务的生命周期结束而结束。这种模式的优点就是任务独占一个集群,资源的隔离性好。
    • Application Mode
      一个Application可以存在多个任务,这时YARN为每个Application创建集群,Application中的任务共享该集群,资源的隔离是Application级别的,集群的生命周期随着Application的生命周期结束。这种模式更像是Session Mode和Pre-Job Mode的折中方案,既做到了资源的隔离,又提高了任务之间资源的利用率。

显然,如果将部署模式改为Flink on YARN的Per-Job Mode或者Application Mode,那也不存在上述的OOM了,因为每次提交都是一个新的Flink集群。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/696295
推荐阅读
相关标签
  

闽ICP备14008679号