赞
踩
楔子
笔者曾在数据平台的数据同步传输组工作过,其中一项重要工作就是将来自于不同数据源(包括但不限于binlog/oplog/用户行为日志) 采集处理传输到不同的数据汇(包括但不限于Kafka/Es/Mysql/HBase) 打造一个性能良好,运行稳定,用户友好的通用传输系统是完成这项任务的基础,以Binlog->Kafka为例:原有(2017.10)的系统是以阿里开源的Canal为基础进行改造
系统的稳定性、性能、易用性都有待提升
当时的笔者是个本番真鰯(现在也是,对改造多线程代码提升性能有些没有信心
Why Akka笔者是个Scala/Akka吹
对Actor模型有了解的同学一定知道Actor是通过消息传递来驱动的,这与数据传输系统具有着相似之处
数据传输系统的数据流向基本上来说是“单向”的
在合适的场景下,利用Akka编程以更低的心智负担获得更好的性能
Akka自带监督(Supervior)机制,在稳定性这方面有一定优势
基于此,笔者决定利用Akka来重写一套数据传输系统
设计思路与架构
设计的核心原则有几条:分治:尽可能地将任务拆分,粒度尽可能细,不同类型子任务尽量不要产生依赖,一种Actor尽量只持有一种职责
尽可能避免竞态条件(race condition): 使用Akka本身就可以很好地避免的同时,在设计上也要尽可能少地使用锁/信号量一类同步操作
尽量使消息流是单向的
资源生命周期和Actor生命周期的统一
谨慎对待阻塞操作
依据不同情况采取合理的失败处理策略,控制失败边界,"let it crash"后使重启尽可能优雅干净
micro batch思路
Actor设计
对于参与同步任务的每个组件,都被定义做worker,每个worker基本上持有一个职责
trait worker {
implicit val workerType: WorkerType
/*** 错位次数阈值*/
def errorCountThreshold: Int
/*** 错位次数*/
var errorCount: Int
/*** 错误次数超过重试次数时,返回true*/
def isCrashed: Boolean = errorCount >= errorCountThreshold
/*** 错误处理*/
def processError(e: Throwable, message: WorkerMessage)
}
每个Worker实际上是由Actor来“扮演”的
trait ActorPrototype extends Actor with ActorLogging {
/*** 任务信息管理器*/
def taskManager: TaskManager
/*** 同步任务id*/
def syncTaskId:String
}
一个真正的Worker实现需要混入ActorPrototype和Worker 两种trait
trait SourceDataSinkerPrototype[S <: sinkfunc extends actorprototype with sourcedatasinker>
/*** 任务信息管理器*/
def taskManager: TaskManager
/*** 资源管理器*/
def sinkManger: SinkManager[S]
/*** sink*/
protected lazy val sink = sinkManger.sink
/*** 同步任务id*/
def syncTaskId: String
/*** sinkFunc*/
def sinkFunc: S
/*** 编号**/
def num: Int
protected def handleSinkTask[I <: r i try>
protected def handleBatchSinkTask[I<:r>
override def preStart(): Unit = {
log.debug(s"init sinker$num,id:$syncTaskId")
sinkFunc.start
}
override def postStop(): Unit = {
log.debug(s"sinker$numprocessing postStop,id:$syncTaskId")
if (sinkFunc.isTerminated) sinkFunc.close
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
log.debug(s"sinker$numprocessing preRestart,id:$syncTaskId")
}
override def postRestart(reason: Throwable): Unit = {
log.debug(s"sinker$numprocessing preRestart,id:$syncTaskId")
super.postRestart(reason)
}
}
下面梳理整个处理流程,笔者借鉴了Flink的思路分为三个阶段: source->transformation->sink
对应3大类Actor:SourceFetcher => source
Mapper => transformation
Sinker => sink
此外还有:SyncController => 整个传输同步任务的控制单元,负责整个任务的生命周期,资源管理,Worker管理
Listener => 监听数据源
SnapshotCoordinator => 快照/恢复
BackPressureHandler => 反压
MetricsHandler => 监控相关
消息、命令与事件
Actor 内部的receive方法的特点(case 偏函数),笔者针对不同类型的Worker,对应的定义了不同类型的消息体
package object lifecycle {
//部分消息定义 sealed trait WorkerMessage{def msg: Any}
final case class SyncControllerMessage(override val msg: Any) extends WorkerMessage
final case class ListenerMessage(override val msg: Any) extends WorkerMessage
final case class SinkerMessage(override val msg: Any) extends WorkerMessage
final case class FetcherMessage(override val msg: Any) extends WorkerMessage
}
提前规范Actor之间的通信,Command尽可能预先集中定义好,每个command都有对应的Event,这个在开发量大的时候很有帮助,大大降低心智负担
sealed trait MysqlBinlogInOrderFetcherCommand
object MysqlBinlogInOrderFetcherCommand {
//部分command 定义 final case object MysqlBinlogInOrderFetcherStart extends MysqlBinlogInOrderFetcherCommand
final case object MysqlBinlogInOrderFetcherRestart extends MysqlBinlogInOrderFetcherCommand
final case object MysqlBinlogInOrderFetcherSuspend extends MysqlBinlogInOrderFetcherCommand
final case object MysqlBinlogInOrderFetcherBusy extends MysqlBinlogInOrderFetcherCommand
}
与外界交互
提供一层rest API 包含任务的生命周期操作,监控指标的查看等
开发、踩坑与优化
SourceFetcher的开发
开发SourceFetcher的朴素思路就是想把Canal从获取二进制流到LogEvent(转换成entry的动作交给Mapper来做), 研究过Canal代码后发现,初始化链接后的fetch()动作,事实上是个阻塞操作(时间点2017.10),如果只是简单的写成无限循环调用Fetch的话(我发誓我没黑Flink)
@tailrec
private def doFetch(): Unit = {
beforeFetch()
val messege = fetch()
downStream ! message
afterFetch()
doFetch()
}
这个Actor将一直循环下去,无法继续处理邮箱的消息,这与一开始的设计原则4和5相悖,而且极为危险,可能带来很多意外的情况(不如重启不能)
所以我转换了一下实现思路: 1. 将Fetcher拆分为FetcherManager 和DirectFetcher两部分,DirectFetcher只负责Fetch数据,而FetcherManager会处理剩下所有的消息 2. DirectFetcher初始化完链接,每次接到Fetch命令后出触发一次fetch,并在fetch前会向fetchManager 发送一条消息告知自己准备fetch并进入阻塞,完成后会发送一条Free
override def receive: Receive = {
case FetcherMessage(MysqlBinlogInOrderFetcherFetch) => Try(handleFetchTask(true))
.failed
.foreach(processError(_, FetcherMessage(MysqlBinlogInOrderFetcherFetch)))
}
//fetch的部分逻辑 //告訴上級處於拉取狀態 context.parent ! FetcherMessage(MysqlBinlogInOrderFetcherBusy)
doFetch()
context.parent ! FetcherMessage(MysqlBinlogInOrderFetcherFree)
对于数据量比较大的Source,可以一条Fetch命令,发起多次fetch操作(micro batch 思路)
Mapper的开发
Mapper 比较好理解,开辟尽可能多Mapper来处理fetcher 发送来的数据,压榨多核。对于Mysql Binlog数据来说,有多种分发策略: 1. RoundRobin 顾名思义 2. ByPrimaryKey 保证相同key的binlog可以按照binlog顺序进行处理 3. ByDatabaseAndTable 保证相同库表可以按照binlog顺序进行处理 4. Transactional 严格按照Transaction来处理
Sink的开发
对于每个Sinker,由三部分组成: Sinker,DirectSinker, BatchHolder 1. Sinker负责初始化自己的DirectSinker和BatchHolder 2. Sinker接收到数据后直接发送给BatchHolder 3. BatchHolder触发到时间阈值或者BatchSize大小后会触发Flush动作 4. 通过Batch 提升吞吐,通过时间阈值控制延迟不要过高 5. 针对不同类型的Sink介质,还可以做Sink特定的优化
SyncController的开发
Actor模型有一个缺点:消息的发送者和接收者其实是耦合的,消息的发送者要么知道接收者的地址(ActorSelection) 要么需要持有消息接收者的Ref Handler(ActorRef)。笔者通过SyncController来初始化其他所有的Actor,并根据消息传递的路径为每个Actor分配其需要的ActorRef,同时其他的Actor很容易地和Controller相互传递消息,Controller作为其他Actor的监督者,天然处理其他Actor的失败。
从资源和任务配置信息信息的角度看,Controller 负责所有资源和信息的生命周期管理。
监控与反压的设计开发
对于主流程的各个Actor,会定期向MetricHandler上报metric信息,其中对于处理条数,会一直计算出各个部件的处理速度(条/s)和处理总量,BackpressureHandler会根据以上指标计算并给予SourceFetcher一定的延迟拉取数据的信号,当压力下来时,BackpressureHandler会取消掉之前的延迟信号,达到自动处理反压的效果
快照/恢复的设计开发
在设计这部分的时候,实现思路主要是Chandy-Lamport算法,也就是Flink的异步快照思路,实现了Coordinator和CheckpointedFunction和CheckpointListner类似的东西,使用Actor处理消息,天然满足算法要求的FIFO,SnapshotCoordinator会定期下发Signal,从Source到Sink下推Barrier完成Snapshot,快照完成后把对应的位点保存到外部介质中(笔者使用的是ZK)。保证传输系统内部的Exactly-Once并不困难,但是其实考虑到外部世界(Source和Sink)实现全流程的Exactly-Once语义是件特别艰深的事情,不光对处理系统内部有要求,对Source和Sink都有要求,Source至少要能时空回溯;而Sink,幂等写和事务写至少支持一个,而当时(2017.10)使用的Kafka版本并不支持二者,所以只能退而求其次的实现At Least-Once了
结论总体来说选型Akka在这个场景是非常成功的,笔者大约花费了两周来完成第一个MVP版本,拉到线上实测性能是原来的10倍左右。对于一个20核的服务器,算上HyperThreading一共40核,很轻松地跑到了3300%CPU,而且sys的消耗并没有特别高,多核被充分压榨。
编程是没有银弹可言的,借用 @neo lin 的话来说 :绝大部分的程序,并不需要这个组合,但如果需要这个组合的场景,却又会无比契合,而且绝顶强大在编写Akka的时候其实很多时候并不需要操心多线程编程的很多问题,可以把大量的精力放在逻辑本身。可是毕竟Akka是架设在JVM上的,必要的时候可能还是要和其他的编程模型和手段进行组合。
Akka, worth a try!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。