当前位置:   article > 正文

Flink内核源码(二)组件通信_多个flink程序通信

多个flink程序通信

最近在学习了尚硅谷的Flink内核源码解析,内容很多,因此想要整理学习一下。Flink的版本是1.12.0

第二章就来从源码层面学习一下Flink的组件通信。

问题整理:
1. Flink组件之间是怎么通信的?
2. Flink中的RPC方法。

Flink 内部节点之间的通信是用 Akka,比如 JobManager 和 TaskManager 之间的通信。而 operator 之间的数据传输是利用 Netty。

RPC:远程方法调用,一个统称,具体实现如下:

  • Akka:组件间通信
  • Netty:网络间通信

1. Akka与Actor模型

Akka 是一个开发并发、容错和可伸缩应用的框架。它是 Actor Model 的一个实现。

一个 Actors 网络如下所示:
在这里插入图片描述

每个 actor 有一个邮箱(mailbox),它收到的消息存储在里面。另外,每一个 actor 维护自身单独的状态

每个 actor 是一个单一的线程,它不断地从其邮箱中 poll(拉取)消息,并且连续不断地处理。

1.1 Actor 系统

一个 Actor 系统包含了所有存活的 actors。它提供的共享服务包括调度、配置和日志等。Actor 系统同时包含一个线程池,所有 actor 从这里获取线程。多个Actor系统可以在一台机器上共存 。

所有 Actors 都是继承来组织的。每个新创建的 actor 将其创建的 actor 视作父 actor。继承被用来监督。每个父 actor 对自己的子 actor 负责监督

1.2 Flink中的Actors

Flink 系统由 3 个分布式组件构成:JobClient,JobManager 和 TaskManager。组件之间需要通信。
在这里插入图片描述

1.3 异步 vs 同步消息

Flink 尝试使用异步消息和通过 Futures用来获取异步的响应)来处理响应。Futures 和很少的几个阻塞调用有一个超时时间,以防操作失败。这是为了防止死锁,当消息丢失或者分布式组件 crash。但是,如果在一个大集群或者慢网络的情况下,超时可能会使得情况更糟。因此,操作的超时时间可以通过“akka.timeout.timeout”来配置。

Akka 的另一个特点是限制发送的最大消息大小。原因是它保留了同样数据大小的序列化 buffer 和不想浪费空间。如果你曾经遇到过传输失败,因为消息超过了最大大小,你可以增加“akka.framesize”配置来增加大小。

2. 使用Akka

Akka 系统的核心 ActorSystem 和 Actor,若需构建一个 Akka 系统,首先需要创建ActorSystem,创建完 ActorSystem 后,可通过其创建 Actor(注意:Akka 不允许直接 new 一 个 Actor,只能通过 Akka 提供的某些 API 才能创建或查找 Actor,一般会通过ActorSystem#actorOf 和 ActorContext#actorOf 来创建 Actor),另外,我们只能通过 ActorRef(Actor 的引用,其对原生的 Actor 实例做了良好的封装,外界不能随意修改其内部状态)来与 Actor 进行通信。

// 1. 构建 ActorSystem
// 使用缺省配置
ActorSystem system = ActorSystem.create("sys");
// 也可显示指定 appsys 配置
// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));
// 2. 构建 Actor,获取该 Actor 的引用,即 ActorRef
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");
// 3. 给 helloActor 发送消息
helloActor.tell("hello helloActor", ActorRef.noSender());
// 4. 关闭 ActorSystem
system.terminate();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
2.1 Actor 路径

在 Akka 中,创建的每个 Actor 都有自己的路径,该路径遵循 ActorSystem 的层级结构,
大致如下:
1)本地路径
在上面代码中,本地 Actor 路径为 akka://sys/user/helloActor
含义如下:

  • sys,创建的 ActorSystem 的名字;
  • user,通过 ActorSystem#actorOf 和ActorContext#actorOf 方法创建的 Actor 都属于/user 下,与/user 对应的是/system,
    其是系统层面创建的,与系统整体行为有关,在开发阶段并不需要对其过多关注
  • helloActor,我们创建的 HelloActor

2)远程路径
在上面代码中,远程 Actor 路径为 akka.tcp://sys@l27.0.0.1:2020/user/remoteActor
含义如下:

  • akka.tcp,远程通信方式为 tcp;
  • sys@127.0.0.1:2020,ActorSystem 名字及远程主机 ip 和端口号。
  • user,与本地的含义一样
  • remoteActor,创建的远程 Actor

若需要与远端 Actor 通信,路径中必须提供 ip:port。

3. 与 Actor 通信

Akka 有两种核心的异步通信方式:tell 和 ask。

3.1 tell方式

当使用 tell 方式时,表示仅仅使用异步方式给某个 Actor 发送消息,无需等待 Actor 的响应结果,并且也不会阻塞后续代码的运行,如:

helloActor.tell("hello helloActor", ActorRef.noSender());
  • 1

其中:第一个参数为消息,它可以是任何可序列化的数据或对象,第二个参数表示发送者,通常来讲是另外一个 Actor 的引用, ActorRef.noSender()表示无发送者(实际上是一个叫做 deadLetters 的 Actor)。

3.2 ask 方式

当我们需要从 Actor 获取响应结果时,可使用 ask 方法,ask 方法会将返回结果包装在scala.concurrent.Future 中,然后通过异步回调获取返回结果。

4. RPC

Flink组件通讯过程
在这里插入图片描述
RPC(本地/远程)调用,底层是通过 Akka 提供的 tell/ask 方法进行通信。

4.1 RpcGateway

Flink 的 RPC 协议通过 RpcGateway 来定义,主要定义通信行为;用于远程调用RpcEndpoint 的某些方法,可以理解为对方的客服端代理

若想与远端 Actor 通信,则必须提供地址(ip 和 port),如在 Flink-on-Yarn 模式下,JobMaster 会先启动 ActorSystem,此时 TaskExecutor 的 Container 还未分配,后面与TaskExecutor 通信时,必须让其提供对应地址。

在这里插入图片描述

4.2 RpcEndpoint

RpcEndpoint 是通信终端,提供 RPC 服务组件的生命周期管理(start、stop)。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了 RpcGateway 接口:

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
	// 保存 rpcService 和 endpointId
	this.rpcService = checkNotNull(rpcService, "rpcService");
	this.endpointId = checkNotNull(endpointId, "endpointId");
	// 通过 RpcService 启动 RpcServer
	this.rpcServer = rpcService.startServer(this);
	// 主线程执行器,所有调用在主线程中串行执行
	this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在 RpcEndpoint 中还定义了一些方法如 runAsync(Runnable)、callAsync(Callable, Time)方法来执行 Rpc 调用,值得注意的是在 Flink 的设计中,对于同一个 Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动 RpcEndpoint/进行 Rpc 调用时,其会委托RcpServer 进行处理。

4.3 RpcService 和 RpcServer

RpcService 和 RpcServer 是 RpcEndPoint 的成员变量

1)RpcService 是 Rpc 服务的接口,其主要作用如下:

  • 根据提供的 RpcEndpoint 来启动和停止 RpcServer(Actor);
  • 根据提供的地址连接到(对方的)RpcServer,并返回一个 RpcGateway;
  • 延迟/立刻调度Runnable、Callable;

在 Flink 中实现类为 AkkaRpcService,是 Akka 的 ActorSystem 的封装,基本可以理解成 ActorSystem 的一个适配器。在 ClusterEntrypoint(JobMaster)和 TaskManagerRunner(TaskExecutor)启动的过程中初始化并启动。

AkkaRpcService 中封装了ActorSystem,并保存了ActorRef 到 RpcEndpoint的映射关系。RpcService 跟 RpcGateway 类似,也提供了获取地址和端口的方法。

在构造 RpcEndpoint 时会启动指定 rpcEndpoint 上的 RpcServer,其会根据 RpcEndpoint类型(FencedRpcEndpoint 或其他)来创建不同的 AkkaRpcActor(FencedAkkaRpcActor 或AkkaRpcActor),并将RpcEndpoint和AkkaRpcActor对应的ActorRef保存起来,AkkaRpcActor是底层 Akka 调用的实际接收者,RPC 的请求在客户端被封装成 RpcInvocation 对象,以 Akka消息的形式发送。

最终使用动态代理将所有的消息转发到 InvocationHandler

2)RpcServer 负责接收响应远端 RPC 消息请求,自身的代理对象。有两个实现:

  • AkkaInvocationHandler
  • FencedAkkaInvocationHandler

RpcServer 的启动是通知底层的 AkkaRpcActor 切换为 START 状态,开始处理远程调用请求

4.4 AkkaRpcActor

AkkaRpcActor 是 Akka 的具体实现,主要负责处理如下类型消息:

  1. 本地 Rpc 调用 LocalRpcInvocation会指派给 RpcEndpoint
    进行处理,如果有响应结果,则将响应结果返还给 Sender。
  2. RunAsync &CallAsync这类消息带有可执行的代码,直接在 Actor 的线程中执行。
  3. 控制消息 ControlMessages用来控制 Actor 行为,START 启动,STOP 停止,停止后收到的消息会丢弃掉。
4.5 RPC 交互过程

RPC 通信过程分为请求和响应。

1 RPC 请求发送

在 RpcService 中调用 connect()方法与对端的 RpcEndpoint(RpcServer)建立连接,connect()方法根据给的地址返回InvocationHandler(AkkaInvocationHandler 或 FencedAkkaInvocationHandler,也就是对方的代理)。

2 RPC 请求响应

RPC 消息通过 RpcEndpoint 所绑定的 Actor 的 ActorRef 发送的,AkkaRpcActor 是消息接收的入口,AkkaRpcActor 在 RpcEndpoint 中构造生成,负责将消息交给不同的方法进行处理。

接收的消息有 3 种:

1)握手消息
在客户端构造时会通过 ActorSelection 发送过来。收到消息后检查接口、版本是否匹配。

2)控制消息
在 RpcEndpoint 调用 start 方法后,会向自身发送一条 Processing.START 消息来转换当前 Actor 的状态为 STARTED,STOP 也类似,并且只有在 Actor 状态为 STARTED 时才会处理 RPC 请求。

3)RPC 消息
通过解析 RpcInvocation 获取方法名和参数类型,并从 RpcEndpoint 类中找到 Method 对象,通过反射调用该方法。如果有返回结果,会以 Akka 消息的形式发送回发送者。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/633920
推荐阅读
相关标签
  

闽ICP备14008679号