赞
踩
最近在学习了尚硅谷的Flink内核源码解析,内容很多,因此想要整理学习一下。Flink的版本是1.12.0
。
第二章就来从源码层面学习一下Flink的组件通信。
问题整理:
1. Flink组件之间是怎么通信的?
2. Flink中的RPC方法。
Flink 内部节点之间的通信是用 Akka,比如 JobManager 和 TaskManager 之间的通信。而 operator 之间的数据传输是利用 Netty。
RPC
:远程方法调用,一个统称,具体实现如下:
Akka 是一个开发并发、容错和可伸缩应用的框架。它是 Actor Model 的一个实现。
一个 Actors 网络如下所示:
每个 actor 有一个邮箱(mailbox
),它收到的消息存储在里面。另外,每一个 actor 维护自身单独的状态
。
每个 actor 是一个单一的线程,它不断地从其邮箱中 poll(拉取)消息,并且连续不断地处理。
一个 Actor 系统包含了所有存活的 actors。它提供的共享服务包括调度、配置和日志等。Actor 系统同时包含一个线程池,所有 actor 从这里获取线程。多个Actor系统可以在一台机器上共存 。
所有 Actors 都是继承来组织的。每个新创建的 actor 将其创建的 actor 视作父 actor。继承被用来监督。每个父 actor 对自己的子 actor 负责监督。
Flink 系统由 3 个分布式组件构成:JobClient,JobManager 和 TaskManager。组件之间需要通信。
Flink 尝试使用异步消息和通过 Futures
(用来获取异步的响应)来处理响应。Futures 和很少的几个阻塞调用有一个超时时间,以防操作失败。这是为了防止死锁,当消息丢失或者分布式组件 crash。但是,如果在一个大集群或者慢网络的情况下,超时可能会使得情况更糟。因此,操作的超时时间可以通过“akka.timeout.timeout”来配置。
Akka 的另一个特点是限制发送的最大消息大小。原因是它保留了同样数据大小的序列化 buffer 和不想浪费空间。如果你曾经遇到过传输失败,因为消息超过了最大大小,你可以增加“akka.framesize”配置来增加大小。
Akka 系统的核心 ActorSystem 和 Actor,若需构建一个 Akka 系统,首先需要创建ActorSystem,创建完 ActorSystem 后,可通过其创建 Actor(注意:Akka 不允许直接 new 一 个 Actor,只能通过 Akka 提供的某些 API 才能创建或查找 Actor,一般会通过ActorSystem#actorOf 和 ActorContext#actorOf 来创建 Actor),另外,我们只能通过 ActorRef(Actor 的引用,其对原生的 Actor 实例做了良好的封装,外界不能随意修改其内部状态)来与 Actor 进行通信。
// 1. 构建 ActorSystem
// 使用缺省配置
ActorSystem system = ActorSystem.create("sys");
// 也可显示指定 appsys 配置
// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));
// 2. 构建 Actor,获取该 Actor 的引用,即 ActorRef
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");
// 3. 给 helloActor 发送消息
helloActor.tell("hello helloActor", ActorRef.noSender());
// 4. 关闭 ActorSystem
system.terminate();
在 Akka 中,创建的每个 Actor 都有自己的路径,该路径遵循 ActorSystem 的层级结构,
大致如下:
1)本地路径
在上面代码中,本地 Actor 路径为 akka://sys/user/helloActor
含义如下:
2)远程路径
在上面代码中,远程 Actor 路径为 akka.tcp://sys@l27.0.0.1:2020/user/remoteActor
含义如下:
若需要与远端 Actor 通信,路径中必须提供 ip:port。
Akka 有两种核心的异步通信方式:tell 和 ask。
当使用 tell 方式时,表示仅仅使用异步方式给某个 Actor 发送消息,无需等待 Actor 的响应结果,并且也不会阻塞后续代码的运行,如:
helloActor.tell("hello helloActor", ActorRef.noSender());
其中:第一个参数为消息,它可以是任何可序列化的数据或对象,第二个参数表示发送者,通常来讲是另外一个 Actor 的引用, ActorRef.noSender()表示无发送者(实际上是一个叫做 deadLetters 的 Actor)。
当我们需要从 Actor 获取响应结果时,可使用 ask 方法,ask 方法会将返回结果包装在scala.concurrent.Future
中,然后通过异步回调获取返回结果。
Flink组件通讯过程
RPC(本地/远程)调用,底层是通过 Akka 提供的 tell/ask 方法进行通信。
Flink 的 RPC 协议通过 RpcGateway 来定义,主要定义通信行为
;用于远程调用RpcEndpoint 的某些方法,可以理解为对方的客服端代理
。
若想与远端 Actor 通信,则必须提供地址(ip 和 port),如在 Flink-on-Yarn 模式下,JobMaster 会先启动 ActorSystem,此时 TaskExecutor 的 Container 还未分配,后面与TaskExecutor 通信时,必须让其提供对应地址。
RpcEndpoint 是通信终端,提供 RPC 服务组件的生命周期管理(start、stop)
。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了 RpcGateway 接口:
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
// 保存 rpcService 和 endpointId
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
// 通过 RpcService 启动 RpcServer
this.rpcServer = rpcService.startServer(this);
// 主线程执行器,所有调用在主线程中串行执行
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
在 RpcEndpoint 中还定义了一些方法如 runAsync(Runnable)、callAsync(Callable, Time)方法来执行 Rpc 调用,值得注意的是在 Flink 的设计中,对于同一个 Endpoint,所有的调用都运行在主线程,因此不会有并发问题
,当启动 RpcEndpoint/进行 Rpc 调用时,其会委托RcpServer 进行处理。
RpcService 和 RpcServer 是 RpcEndPoint 的成员变量
。
1)RpcService 是 Rpc 服务的接口,其主要作用如下:
在 Flink 中实现类为 AkkaRpcService,是 Akka 的 ActorSystem 的封装
,基本可以理解成 ActorSystem 的一个适配器。在 ClusterEntrypoint(JobMaster)和 TaskManagerRunner(TaskExecutor)启动的过程中初始化并启动。
AkkaRpcService 中封装了ActorSystem,并保存了ActorRef 到 RpcEndpoint的映射关系。RpcService 跟 RpcGateway 类似,也提供了获取地址和端口的方法。
在构造 RpcEndpoint 时会启动指定 rpcEndpoint 上的 RpcServer,其会根据 RpcEndpoint类型(FencedRpcEndpoint 或其他)来创建不同的 AkkaRpcActor(FencedAkkaRpcActor 或AkkaRpcActor),并将RpcEndpoint和AkkaRpcActor对应的ActorRef保存起来,AkkaRpcActor是底层 Akka 调用的实际接收者,RPC 的请求在客户端被封装成 RpcInvocation 对象,以 Akka消息的形式发送。
最终使用动态代理将所有的消息转发到 InvocationHandler
2)RpcServer 负责接收响应远端 RPC 消息请求,自身的代理对象
。有两个实现:
RpcServer 的启动是通知底层的 AkkaRpcActor 切换为 START 状态,开始处理远程调用请求
AkkaRpcActor 是 Akka 的具体实现,主要负责处理如下类型消息:
RPC 通信过程分为请求和响应。
1 RPC 请求发送
在 RpcService 中调用 connect()方法与对端的 RpcEndpoint(RpcServer)建立连接,connect()方法根据给的地址返回InvocationHandler(AkkaInvocationHandler 或 FencedAkkaInvocationHandler,也就是对方的代理)。
2 RPC 请求响应
RPC 消息通过 RpcEndpoint 所绑定的 Actor 的 ActorRef 发送的,AkkaRpcActor 是消息接收的入口,AkkaRpcActor 在 RpcEndpoint 中构造生成,负责将消息交给不同的方法进行处理。
接收的消息有 3 种:
1)握手消息
在客户端构造时会通过 ActorSelection 发送过来。收到消息后检查接口、版本是否匹配。
2)控制消息
在 RpcEndpoint 调用 start 方法后,会向自身发送一条 Processing.START 消息来转换当前 Actor 的状态为 STARTED,STOP 也类似,并且只有在 Actor 状态为 STARTED 时才会处理 RPC 请求。
3)RPC 消息
通过解析 RpcInvocation 获取方法名和参数类型,并从 RpcEndpoint 类中找到 Method 对象,通过反射调用该方法。如果有返回结果,会以 Akka 消息的形式发送回发送者。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。