赞
踩
理解图计算的编程,有两个要素:
(1)构图
(2)消息发送
下面以spark的GraphX为例演示如何构图以及依托图结构进行消息发送操作以及编写简单的图算法
GraphX是什么:GraphX是spark的一个图计算框架。spark的核心概念是RDD,RDD是一个不可变的,分区的,可以并行操作的数据集。RDD和一般的scala和java上的数据集的区别是一般数据集只是存在于单机,而RDD可以分布在集群中的多个机器。用parallelize方法可以很容易地将scala中的数据集转换成RDD:
//假设我们有一个Seqval list=Seq(1,2,3)val rdd=sc.parallelize(list)
其中sc是环境中构建的spark Context。这里list是一个scala上的数据集,位于单机。rdd就是一个RDD,他是一个分布式的数据集,可以分布在集群中的多个节点并行地进行操作。
在实际用GraphX构图的时候,我们往往不是用parallelize方法把一个scala数据集转换成RDD,而是直接从hdfs上的文件或者其他数据源读到RDD来构图。
利用GraphX构图:构造GraphX的图需要提供节点RDD和边RDD。节点以Long作索引,边通过这个Long的索引指定这个边关联哪些节点。
下面以一个简单的图为例讲下怎么构图:
这个图可以由下面的方法构造:
// 假设spark context已经创建好。这里不再赘述val sc: SparkContext// 创建节点RDDval users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("张三", "A大学")), (7L, ("李四", "B研究院")), (5L, ("王五", "C公司")), (2L, ("赵六", "C公司"))))// 创建边RDDval relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "指导"), Edge(5L, 3L, "资助"), Edge(2L, 5L, "领导"), Edge(5L, 7L, "资助")))// 定义缺省节点val defaultUser = ("人名未知", "单位未知")// 构图val graph = Graph(users, relationships, defaultUser)
在实际中,我们一般不会直接用Graph的构造函数,而是用Graph的fromEdges方法先从Edge RDD构图, 然后再用outerJoinVertices把节点属性插到图上,构造新的图。比如上述例子中可以这么构图:
val graph=Graph.fromEdges(relationships,defaultValue=1)//通过outerJoinVertice将节点属性插上去,v来自于graph的节点属性,d来自于users,有可能有值有可能没有val final_graph=graph.outerJoinVertices(users){ (vid,v,d)=>d.getOrElse(("人名未知", "单位未知")) }
注意这里outerJoinVertices实际上就是一个Join操作,Join的key是节点的id,也就是上面括号里的vid。另外需要注意的是边是有向的,如果要表达无向关系,一般是通过正向反向两条边表示。
很多时候我们并不关心节点的属性,只关心节点之间的关系,那么outerJoinVertices这一步也省略了,直接调用graph的mapVertices方法,对节点rdd进行并行操作,比如说计算连通分支的时候将节点属性初始化为节点id,代码如下:
/*lambda的输入是两个,一个是节点id,一个是原先的节点属性,这里通过mapVertice操作将节点属性变成节点id*/graph.mapVertice(case (vid,v)=>vid)
这样节点的属性就是节点的序号。
利用GraphX进行消息发送:发送消息的函数是aggregateMessages。这个方法的参数如下(以下代码段摘自文档[1]):
class Graph[VD, ED] { def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[Msg]}
需要提供三个参数:发送消息的函数sendMsg,指定多条消息如何合并的函数mergeMsg。参数tripletFields指定我们需要EdgeContext的哪些属性(作为对GraphX的提示,减少join的负载),设置成All就是选取全部属性。
sendMsg传入的参数是一个EdgeContext,指定当前这条边的信息,他是包含边的起点id,起点属性,终点id,终点属性,边属性的一个view。这些信息都是用来构造要发送的信息的。
另外,EdgeContext还有两个发送消息的方法sendToSrc和sendToDst,将构造的消息发送到这条边的起点和终点。这两个方法都调用,那么就是把消息发送到边的两个节点。如果只调用sendToSrc,就是只发送消息到这条边的起点,sendToDst同理。
mergeMsg就是一个合并函数,指定如何把两个Msg类型的对象合并成一个Msg类型的对象的方法,如果是两个double类型的数,这个方法可以是加减乘除等等,如果两个对象是集合类型,这个方法可以是取交集或者取并集。
整个aggregateMessages调用后得到的是一个节点RDD,把这个RDD用outerJoinVertices插到原图中,就得到发送消息之后的新图。
一个简单的例子,假设我们需要计算图中节点的度,可以通过aggregateMessages实现:
//使用aggregateMessages方法,沿着边传播1然后在节点处相加,就得到节点的度val vertexDegree=g.aggregateMessages( triplet => { triplet.sendToDst(1) triplet.sendToSrc(1) }, (a, b) => a+b // 合并消息的函数)
vertexDegree就是节点的度
举例:
HITS算法的实现(以sparkling graph的实现为例)
代码路径:
https://github.com/sparkling-graph/sparkling-graph/blob/master/operators/src/main/scala/ml/sparkling/graph/operators/measures/vertex/hits/Hits.scala
初始化:首先,使用mapVertices初始化节点的hub值和authority值为1除以节点的个数
Authority值的计算:使用aggregateMessages方法,每个节点把自己的hub值发送到自己指向的节点(context.sendToDst(sourceHub)),并在收到hub值的节点进行汇总( (a,b)=>a+b ) ,作为节点的authority值。注意这里也向指向自己的点发一个0,目的是确保起点也出现在aggregateMessages方法的返回值中(如果某个节点没有收到消息是不会出现的)
归一化使用的分母是所有authority值里面最大的那个(normAuths)。用outerJoinVertices把authority值插回图中,如果节点没收到消息就置0
Hub值的计算:同样的,使用aggregateMessages方法,每个节点把自己的authority值发送到指向自己的节点(context.sendToSrc(destinationAuth)),并在收到hub值的节点进行汇总( (a,b)=>a+b ) ,作为节点的hub值。和前一步同理,也往自己指向的节点发一个0.
hub值的归一化的方法与authority一样
外层用while循环,每个迭代就像上面所述更新hub值和authority值
利用GraphX Pregel进行迭代图计算
Pregel是一种图计算模型。这种模型描述的图算法有以下两个特点:
(1)Pregel是一种同步的,迭代的计算模型:他将计算过程分成一系列的超步(superstep),每个超步末尾所有节点通过消息传递同步节点状态
(2)Pregel以节点为中心:在每个超步中,所有节点执行同样的自定义函数处理消息以及修改状态
Pregel在很多图计算框架上都有实现。GraphX上也有Pregel的接口,我们来看下(以下代码段摘自[1]):
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)], mergeMsg: (A, A) => A) : Graph[VD, ED]
我们来看这个接口各个参数什么意思:这个方法一开始执行的时候,图中所有节点会同时收到initialMsg消息,并同时执行vprog函数进行进行初始化(这个vprog就是pregel定义里面那个“同样的自定义函数”)。注意我们一般把initialMsg设置成一个很特殊的值,并在vprog里面判断传入的消息是不是这个特殊值,如果是特殊值,就进行节点状态的初始化。
vprog有三个参数,VertexId是当前节点的节点id,VD是当前节点的属性,A是收到的消息(这是经过合并之后的),在vprog里面进行一些操作更新节点的状态。
执行完vprog之后在边上执行sendMsg函数,他和aggregateMessages里面的sendMsg其实是同一个东西,输入的参数其实都是包含边的起点id,起点属性,终点id,终点属性,边属性的一个view。通过这些我们可以构造和发送消息。
执行完sendMsg之后执行mergeMsg操作,这个操作和aggregateMessages里面的mergeMsg也是一个东西,对每一个节点收到的消息进行合并。
执行完mergeMsg之后就执行完一个迭代,回到vprog函数,vprog输入的A就是mergeMsg合并之后的消息。如此循环往复,直到达到预设的最大迭代次数maxIterations。
注意activeDirection控制的是哪些边上面会执行sendMsg函数。含义如下:
EdgeDirection.Out:所有上一轮迭代起点收到消息的边就会执行sendMsg
EdgeDirection.In:所有上一轮迭代终点收到消息的边就会执行sendMsg
EdgeDirection.Either:上一轮迭代起点或者终点任一个收到消息的边就会执行sendMsg
EdgeDirection.Both:上一轮迭代起点和终点都收到消息的边才会执行sendMsg
如果所有节点在上一轮迭代都没有收到任何消息,那么pregel迭代自动停止
下面以一个连通子图求解的问题举例。连通子图的含义是:在不考虑方向的图中,任意两点都有通路的子图为一个连通子图。我们需要知道哪些节点属于同一个连通子图,算法如下:
(1)初始情况下,每个节点的连通子图id都是他自己的节点id,第一个迭代,把自己的id发给自己邻居
(2)每个迭代:节点看邻居发来的id,如果发来的id的最小值都比自己现在的id小,就换成这个最小的id,并把这个最小的id发给自己的邻居;否则维持原来的id不变什么消息都不发
(3)当所有的节点从自己的邻居收到的消息都不比自己的id小的时候,大家都不发消息,算法结束。结束时同一个连通子图的节点的id是一样的。
我们看下GraphX内置的连通分支求解是怎么实现的,版本是Spark2.3:
代码路径为:
https://github.com/apache/spark/blob/branch-2.3/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala#L37
第一步是42行,不管什么样的图,都把节点属性换成节点的id,作为初始的连通分支id
sendMsg的定义在43行,他很简单,就是看起点和终点谁的属性(也就是潜在的连通分支id)小,就把小的属性发到另一边。45行的意思是把edge.srcAttr发送到edge.dstId对应的节点,47行同理,49行意思是不发送任何消息。mergeMsg函数是任何一对消息总是取小的,vprog是自己节点id和收到的id中取小的,这样就能实现上述算法第(2)步
我们要得到正确的连通分支结果,一般设置maxIterations为正无穷,算法收敛的时候所有边上sendMsg都不执行,Pregel方法的执行就自动结束了。
引用
[1]https://spark.apache.org/docs/latest/graphx-programming-guide.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。