赞
踩
之前提到一直在写一个数据挖掘工具包的小项目dami(该项目不再维护,并在未来删除),由于忙碌等各种原因进度很慢甚至停滞不前,索性把项目调整,以提高技术为目的,重新做成一个解决方案的平台,也就是这里要介绍了feluca;之后会将dami算法部分调整移动到这里。
feluca是JAVA实现的,目标就是搭建一个简易使用,简易编程的数据挖掘工具平台,并且包含分布式计算, 目标有点类似于weka和老hadoop。当然“简易“这两字只是我自己的愿望,未必符合大众标准。然而这个目标确实太大了,尤其只有我一个人而且是业余时间慢慢做,因此这个项目我也把他预期为一个精致的玩具就满足了。有兴趣的朋友欢迎一起来练手,项目在慢慢进行中,代码放在github上:https://github.com/lgnlgn/feluca。
架构
架构
接着之前说到的架构设计。架构设计其实很早就在心里画好了,当然草稿纸上也画过,之后为了巩固记忆在processon上画了几幅图。而在实现架构的编码过程却比想象中的难,想象太美好实际上难以在想象的时间内完成,可以说是功力不够或者经验不足。现在终于体会到一个项目最初要想做成一个完整可用的东西,是多么的难(这估计也是很多老板创业失败的一个直接原因,高估自己低估困难),在写博文的时间点,我已经决定不再继续把这个项目做下去了,只是做些收尾工作,但是文档会慢慢记录下来,作为一个过程的见证。
feluca包含4个部分:feluca-core [功能与分布式管理]、feluca-cargo [数据处理]、 feluca-sail [单机算法]、feluca-paddle [分布式模型与算法]。sail和paddle分别代表帆和桨,单机和分布式上取名上选择:"风越大越快;有桨就能划" VS "船帆只有一块;桨越多越快"。最后选了后者。feluca要交互可用,所以选择HTTP的REST的方式,这在core中实现。如果只想用单机算法(依赖cargo),那也可以支持嵌入式开发,也就是其中的数据处理和单机算法可以以jar的方式使用,这两个很大程度继承dami的代码。
feluca节点分为leader和worker,任务提交在leader,可以有单机任务和分布式任务。我只实现了基本的功能,而且限制一次只能进行一个单机和分布式任务。job管理也十分简单,主job生成之后,提交到后台线程,它会不断的与子任务探测结果。这里我代码级抽象了一下导致做了很多应该没什么意义的工作。leader和worker都持有一样的jobmanager, 当leader接受的是单机任务时候,需要转换成一到多个子任务在新线程或者process中完成;当leader接受的job是分布式时候,要首先生成很多子任务,这些子任务在worker那就变成"主"任务,worker也做一遍单机子任务那样的事。虽然看起来没问题,但是分布式任务leader的子任务和worker的主任务是一样的,而两种节点jobmanager也是一样,那么中间就得协调和判断。
feluca的分布式文件系统其实只有简单的逻辑,即数据再leader节点的硬盘上切割好,需要做分布式计算的时候,分发到worker节点硬盘上。分发方式是用ftp,即在leader开了ftp,分发其实是worker的拉取。feluca节点管理通过zookeeper来协调,在这只是简简单单的可见而已,当然zookeeper功能还很多,用来做跨进程协调很合适,我在分布式算法中用到了。
起初feluca设计成可以在及其普通的机器上能用的平台,支持单机和分布式。那么单机算法最少也是半边天了。但是单机算法其实好做,除了数据格式有自己的定义之外,算法只要公开了那就是一定能实现的,最多性能上有差异。而算法分布式化,对我来说才新鲜,需要在很多点上充分考虑计算机部件性能。想做得通用一点就要有一定抽象,我懂的算法不多,当然也抽不出太高层的,借鉴改进是最好的。一个算法能分布式或者并行化首先得算法本身支持,或者近似支持,只有在算法细节上分析出并行的地方,和必须同步交换的地方才能最大化进行分布式。mapreduce当然是一个很基础的范式,另外hadoop的计算本地化的思路也很有借鉴意义,计算是跟随数据一起的。
数据挖掘算法就是要从数据中抽取出能反映或表达数据的模型,模型通常远远小于数据,比如121212121212121212 的模型就可以表示成(12)^9 。数据挖掘侧面看就是压缩,当然压缩也没有统一标准,之前有博文提过。从空间角度看算法过程可以简单划分为 数据---(通过计算)-->模型。分布式的情况下,数据必然是需要切分的,模型也可能根据需要需要被切分到多个节点,计算通常空间需要不大则可以选择是依附在数据还是模型节点。mapreduce中 mapper和reducer中的计算是独立可并行的,map完成到reduce才实现了数据一次交换。对于很多算法来说粒度太大。对很多算法来说一次迭代可能需要多次或者不同粒度的数据交换,要想让算法飞起来,就得自己控制数据同步的方式和节奏。
因为切分了数据和模型,它俩肯定不在一个进程中。以LR为例,公式是:y= 1/(1+ e^[ w0x0 + w1x1 +... w9x9...]) 需要把x向量学习出来,学习保准是预测y和真实偏差最少,问题正好可以用SGD方式求解。算法模型是一个向量,基本过程是:每一个或者多个样本获取当前模型,根据样本特征和模型计算出error之后根据error计算出需要特征调整的偏移量从而更新模型。如果按计算在数据还是模型这边分:
计算依附数据方式,一次数据迭代:1.本地读取出数据,2.本地跨进程获取数据所需要的模型,3.本地计算出模型error和偏移量,4.本地跨进程更新模型偏移量。
耗时的部分只有4个,这样的方式简单直接;需要一次本地IO、一次CPU计算、两次跨进程通信,即RPC;但如果考虑模型占空间就会引出几个问题,1如果太大,每次迭代需要的通信的数据量是否可能过大,反而比数据还多?2 模型是否切分?如果不切分就是每个计算节点都一样大的模型只是每次同步不一样的数据,模型太大就不行了;3切分的话那么每次传输回来的小模型读取起来就要耗时了,例如特征的权重都是拿id作为下标,一旦需要压缩到小模型,id就需要进行hash等操作,而且需要进行不断垃圾回收;
这几个问题其实在实现上已经遇到了;首先我是按切分-压缩的方式来做;在SGDLR中每次迭代消耗看起来还行,还远比不上单机效率。但是这个方式在处理推荐的SVD时候就很慢了,估计是因为模型很大每次都要传输几十倍于向量的数据量导致的。这里严重挫败了我的积极性,思考之后,分析只能靠取消压缩来提高,但对于传输量没有任何办法,只能考虑另一种方式:计算依附在模型这边。
计算依附模型方式,一次数据迭代:1. 数据节点读取数据,2. 按照特征RPC分发到不同模型切片节点,3. 模型节点计算出部分结果RPC回原来的数据节点,4.数据节点收回所有结果合并得出error,并将偏差通知模型节点,5.模型节点根据error计算出模型需要更新的偏移量并更新。
这种方式与上一种最大不同就是,计算可以保证与模型在一起并且不需要hash等其他方式转换,因此计算效率保证了,还有个好处就是过程其实和流计算非常接近。这种方式就是RPC传输主要是数据,传数据恐怕更不科学了。这两种方式从原则上就没有优劣而是取决于数据与模型的关系,通常模型远小于数据但是对于特定不同计算来说未必通信的模型量未必会小。而且实现起来肯定比第一种要难,所以我一开始就考虑如何优化,后来想到了其实1 2 两步可以合并到一起,一开始就按特征切分好,以本地读取的方式进行,那么本地其实也可以和模型合在一起了。第3步 变成只需要合并所有切分集上的模型数据w0x0这些,也就是 任意计算节点在第4第5步必要的error需要知道全局的w * x 才算完整。 所以整个过程变得更简单了: 1. 计算节点读取本地只有部分特征的数据, 2 根据这些部分特征获取部分模型发送到中介节点,等待其合并好,还原完整结果回来。 3. 根据这个结果计算error 并计算自己的模型上的偏移量。
整个过程变得步骤更少,而实际上是利用了数据已经按特征切分的这点优势。所以这个方式比第一种占了便宜,也是一个重要区分,即计算依附于数据的,数据是水平切分;计算依附于模型的,数据是要纵向切分。纵向切分数据也有自己要考虑的地方,比如水平切分可以起任意的节点,而纵向的则不行,一旦切好就意味着模型也得这么切。不管哪种方式都没逃出现有计算框架的范畴,只是粒度和控制上交给自己了而已,所以流计算现在很火很成功,未来估计会更加。
我在实现了第二种方式的SGDLR看到比第一种方式效率更高一些,也思考过如何切分数据,但已经不打算继续做了,最多把SVD部分补上比较一下,以及在多台机器上感受一下有限内存下的并行化的算法是否真能如愿提升效率。项目没完成是一个遗憾,未来我将把时间用来进行一些代码整理和文档记录。在代码过程中因为不熟悉别人的轮子而自己造,感觉只有辛苦和挫败。比如cargo中数据读取本来可以简单用一些开源的序列化方式,但我为了讲究效率希望IO能与CPU计算叠加起来,就自己折腾了一套数据格式和生产者消费者实现。前者只有一两天的工作量,而后者得数十倍;再比如期间我还想用akka来实现这套系统,本来能用的zkclient想用curator替换;就跑去看了些文档浪费了时间缺没得到有价值的结果;还比如上面所说的Job管理。只是后来慢慢发现这样会拖死自己才开始抛弃太好的设想,分布式算法的实现部分开发效率就明显高了,这也是年轻经验不足的表现,必然要挖的坑吧。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。