A graph is a network-style data structure made up of a set of vertices and a set of relationships between vertices (edges)
Application scenarios
Graph terminology
Undirected graph: Graph = (V, E)
V = {v1, v2, v3}
E = {(v1,v2), (v1,v3), (v2,v3)}
Directed graph: G = (V, E)
V = {A, B, C, D, E}
E = {<A,B>, <B,C>, <B,D>, <C,E>, <D,A>, <E,D>}
Undirected graph: G = (V, E)
V = {A, B, C, D, E}
E = {(A,B), (A,D), (B,C), (B,D), (C,E), (D,E)}
Cyclic graph: contains at least one closed loop (cycle) formed by a sequence of connected vertices
Acyclic graph: contains no cycles; a DAG is a directed acyclic graph
Degree: the number of edges incident to a vertex
Out-degree: the number of edges pointing from the current vertex to other vertices
In-degree: the number of edges pointing from other vertices to the current vertex
Classic graph representation: the adjacency matrix
1. For each edge, the corresponding cell of the matrix is set to 1
2. For each self-loop, the corresponding cell is set to 2, so that a vertex's degree can be read off by summing its row or column
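To make the adjacency-matrix idea concrete, here is a minimal sketch in plain Scala (the vertex-to-index mapping is illustrative) for the undirected graph V = {v1, v2, v3}, E = {(v1,v2), (v1,v3), (v2,v3)} above; each vertex's degree falls out as the sum of its row:
val n = 3
val edges = Seq((0, 1), (0, 2), (1, 2)) // v1, v2, v3 mapped to indices 0, 1, 2
val matrix = Array.ofDim[Int](n, n)
for ((i, j) <- edges) {
  if (i == j) matrix(i)(i) = 2 // a self-loop counts twice toward the degree
  else { matrix(i)(j) = 1; matrix(j)(i) = 1 }
}
val degrees = matrix.map(_.sum) // degree of each vertex = sum of its row
println(degrees.mkString(", ")) // prints: 2, 2, 2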
GraphX is the distributed graph-computation API provided by Spark
GraphX features
The GraphX core abstraction
The Resilient Distributed Property Graph
A directed multigraph with properties attached to both vertices and edges
One physical storage, two views (a graph view and a table view)
Every operation on the Graph view is ultimately carried out as RDD operations on its associated table view
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val vertices: RDD[(VertexId, Int)] = sc.makeRDD(Seq((1L, 1), (2L, 2), (3L, 3)))
val edges = sc.makeRDD(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 2)))
val graph = Graph(vertices, edges) // graph: Graph[Int, Int]
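Since a Graph is one physical data set exposed through table views, the same small graph can be inspected directly as RDDs (collect ordering may vary):
graph.vertices.collect // e.g. Array((1,1), (2,2), (3,3))
graph.edges.collect    // e.g. Array(Edge(1,2,1), Edge(2,3,2))
graph.triplets.collect // each triplet bundles ((srcId, srcAttr), (dstId, dstAttr), edgeAttr)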
import org.apache.spark.graphx.GraphLoader
//Load an edge-list file to create a graph. Each line describes one edge in the format: srcId dstId. Vertex and edge attributes are all set to 1
val graph = GraphLoader.edgeListFile(sc,
"file:///opt/spark/data/graphx/followers.txt")
Building a user-collaboration property graph
val userGraph: Graph[(String, String), String]
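A minimal construction sketch for such a graph, assuming the vertex attribute is a (name, occupation) pair and the edge attribute is a string describing the collaboration (all values below are illustrative):
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val users: RDD[(VertexId, (String, String))] = sc.makeRDD(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc"))))
val relationships: RDD[Edge[String]] = sc.makeRDD(Seq(Edge(3L, 7L, "collab")))
val userGraph: Graph[(String, String), String] = Graph(users, relationships)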
Building a user social network
Finding the users older than 30
import org.apache.spark.graphx._
val v1 = sc.makeRDD(Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
val e1 = sc.makeRDD(Array(Edge(4L, 1L, 1L), Edge(2L, 1L, 7L), Edge(2L, 4L, 2L), Edge(5L, 2L, 2L), Edge(5L, 3L, 8L), Edge(5L, 6L, 3L), Edge(3L, 6L, 3L), Edge(3L, 2L, 4L)))
val graph1 = Graph(v1, e1)
// users older than 30
graph1.vertices.filter { case (id, (name, age)) => age > 30 }.collect
// edges whose call count exceeds 5
graph1.edges.filter { case Edge(from, to, call) => call > 5 }.collect
class Graph[VD, ED] {
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
}
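These fields can be checked directly against graph1 built above (collect ordering may vary, and vertices with no in- or out-edges are absent from inDegrees/outDegrees respectively):
graph1.numVertices        // 6
graph1.numEdges           // 8
graph1.inDegrees.collect  // e.g. Array((1,2), (2,2), (3,1), (4,1), (6,2)); vertex 5 has no in-edges
graph1.outDegrees.collect // e.g. Array((2,2), (3,2), (4,1), (5,3))
graph1.degrees.collect    // per-vertex sum of in-degree and out-degree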
Property operators: analogous to the RDD map operation
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
// tweeter_graph refers to the user social graph built as graph1 above
val t1_graph = tweeter_graph.mapVertices { case (vertexId, (name, age)) => (vertexId, name) }
val t2_graph = tweeter_graph.mapVertices { (vertexId, attr) => (vertexId, attr._1) }
val t3_graph = tweeter_graph.mapEdges(e => e.attr * 7.0) // the map function returns the new edge attribute, not an Edge
Structural operators
class Graph[VD, ED] {
def reverse: Graph[VD, ED] // reverse the direction of all edges
//build the subgraph of vertices and edges that satisfy the given predicates
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
}
val t1_graph = tweeter_graph.reverse
val t2_graph = tweeter_graph.subgraph(vpred = (id, attr) => attr._2 < 65) // attr: (name, age)
Join operators: load data from external RDDs and update vertex attributes
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
//when a vertex has no match in the RDD, the value passed to map is None
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
val tweeters_comps: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "kgc.cn"), (2L, "berkeley.edu"), (3L, "apache.org")))
val t_graph = tweeter_graph.joinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2))
t_graph.vertices.collect
// with outerJoinVertices, cmpy is an Option[String]: Some(...) for matched vertices, None otherwise
val s_graph = tweeter_graph.outerJoinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2))
s_graph.vertices.collect
import org.apache.spark.graphx._
val v1 = sc.makeRDD(Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
val e1 = sc.makeRDD(Array(Edge(4L, 1L, 1), Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3), Edge(3L, 6L, 3), Edge(3L, 2L, 4)))
val tweeter_graph = Graph(v1, e1)
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
// rewrite the vertex attributes as User objects
val initialUserGraph: Graph[User, Int] = tweeter_graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
// store each vertex's in-degree and out-degree in its attributes
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
  case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
  case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
}
// a vertex's in-degree is its number of followers
for ((id, property) <- userGraph.vertices.collect)
  println(s"User $id is called ${property.name} and is liked by ${property.inDeg} people.")
User 1 is called Alice and is liked by 2 people.
User 2 is called Bob and is liked by 2 people.
User 3 is called Charlie and is liked by 1 people.
User 4 is called David and is liked by 1 people.
User 5 is called Ed and is liked by 0 people.
User 6 is called Fran and is liked by 2 people.
Requirement
Data: twitter-graph-data.txt
Download: https://pan.baidu.com/s/1OM5DFo3qO_HDDmCS2PIaWQ (extraction code: v8r3)
Format: ((User*, *),(User*,*))
(User*, *) = (username, user ID)
The first user is the one being followed (followee)
The second user is the follower
Create the graph and count each user's number of followers
Find the internet celebrities (the users with the most followers)
import org.apache.spark.graphx._
// regular expression for extracting the two (name, id) pairs from each line
val pattern = """\(\((User[0-9]{1,},[0-9]{1,})\),\((User[0-9]{1,},[0-9]{1,})\)\)""".r
// load the data
val t1 = sc.textFile("file:///data/twitter_graph_data.txt")
val t2 = t1.map(line => line match { case pattern(followee, follower) => (Some(followee), Some(follower)); case _ => (None, None) })
val t3 = t2.filter(x => x._1 != None && x._2 != None).map(x => (x._1.get.split(","), x._2.get.split(","))).map(x => (x._1(0), x._1(1).toLong, x._2(0), x._2(1).toLong))
// build the vertices, edges and graph; defaultUser is the attribute for vertex IDs that appear only in edges
val verts = t3.flatMap(x => Array((x._2, x._1), (x._4, x._3))).distinct
val edges = t3.map(x => Edge(x._2, x._4, "follow"))
val defaultUser = ""
val graph = Graph(verts, edges, defaultUser)
// the internet celebrity is the user with the largest number of followers, i.e. the maximum in-degree
graph.inDegrees.sortBy(_._2, false).take(1)
// find the user with the widest influence: use Pregel to accumulate follower names over two hops
val sub_graph = graph.pregel("", 2, EdgeDirection.In)(
  (_, attr, msg) => attr + "," + msg,
  triplet => Iterator((triplet.srcId, triplet.dstAttr)),
  (a, b) => a + "," + b)
// subtract 2 to discount the vertex's own name and the initial empty message
val lengthRDD = sub_graph.vertices.map(v => (v._1, v._2.split(",").distinct.length - 2)).max()(new Ordering[Tuple2[VertexId, Int]]() { override def compare(x: (VertexId, Int), y: (VertexId, Int)): Int = Ordering[Int].compare(x._2, y._2) })
val userId = graph.vertices.filter(_._1 == lengthRDD._1).map(_._2).collect().head
println(userId + " has maximum influence on network with " + lengthRDD._2 + " influencers.")
The PageRank (PR) algorithm
PageRank measures the importance of each vertex in a graph, on the assumption that an edge from u to v represents u's endorsement of v's importance
class Graph[VD, ED] {
//The first parameter is the tolerance allowed at convergence (smaller means more accurate); it determines when iteration stops
//The second parameter is the random reset probability
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
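For intuition, the ranks returned by pageRank approximately satisfy (to within the tolerance tol) the fixed-point equation of the un-normalized PageRank formulation, where the sum runs over all vertices u with an edge into v:
PR(v) = resetProb + (1 - resetProb) * Σ PR(u) / outDegree(u)
The default resetProb = 0.15 thus corresponds to the classic 0.85 damping factor.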
A PageRank application
//find the most important user in the social network
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val tweeters = Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)))
val vertexRDD: RDD[(Long, (String, Int))] = spark.sparkContext.parallelize(tweeters)
val followRelations = Array(Edge[Int](2L, 1L, 7), Edge[Int](2L, 4L, 2), Edge[Int](3L, 2L, 4), Edge[Int](3L, 6L, 3), Edge[Int](4L, 1L, 1), Edge[Int](5L, 2L, 2), Edge[Int](5L, 3L, 8), Edge[Int](5L, 6L, 3))
val edgeRDD = spark.sparkContext.parallelize(followRelations)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
val ranks = graph.pageRank(0.0001)
ranks.vertices.sortBy(_._2, false).collect
/* Requirement:
   Given followers.txt and users.txt, build a graph from followers.txt, use the PageRank
   algorithm to find the most important user in the graph, and output the user's name
   together with the importance score. */
import org.apache.spark.graphx.GraphLoader
// load the edge data to form the graph
val graph = GraphLoader.edgeListFile(sc, "file:///data/followers.txt")
// run PageRank with the given tolerance to produce an importance score per vertex
val ranks = graph.pageRank(0.0001).vertices
// load the vertex (user) data
val users = sc.textFile("file:///data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
// join to form an RDD of (username, importance score)
val ranksByUsername = users.join(ranks).map { case (id, (username, rank)) => (username, rank) }
// find the user with the highest importance score
ranksByUsername.sortBy(_._2, false).take(1)
Pregel is a framework proposed by Google for large-scale distributed graph computation
A Pregel computation consists of a series of iterations called supersteps
The Pregel iteration process: in each superstep, a vertex receives the messages sent to it in the previous superstep, updates its own attribute, and sends new messages along its edges; the computation ends when no messages remain in flight or the maximum number of iterations is reached
//signature (from the GraphX source)
class Graph[VD, ED] {
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
}
//parameter descriptions
initialMsg: the initial message delivered to every vertex before superstep 0
maxIterations: the maximum number of iterations to run
activeDirection: the direction in which messages are sent (the GraphX default is EdgeDirection.Either)
vprog: user-defined function with which a vertex consumes its incoming message
sendMsg: user-defined function that decides which messages to send, and where, for the next iteration
mergeMsg: user-defined function that merges multiple messages arriving at the same vertex before vprog runs
GraphX Pregel application
//Requirement: propagate the minimum vertex value through the graph
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// create the vertex RDD; each attribute is (current value, value from the previous round)
val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (7, -1)), (2L, (3, -1)), (3L, (2, -1)), (4L, (6, -1))))
// create the edge RDD
val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true), Edge(2L, 4L, true), Edge(3L, 1L, true), Edge(3L, 4L, true)))
// create the graph
val graph = Graph(vertices, relationships)
//Pregel
val initialMsg = 9999 // initial message: a sentinel larger than any vertex value
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  if (message == initialMsg) value else (message min value._1, value._1)
}
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val sourceVertex = triplet.srcAttr
  // stop sending once the source vertex's value no longer changes
  if (sourceVertex._1 == sourceVertex._2) Iterator.empty
  else Iterator((triplet.dstId, sourceVertex._1))
}
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
val minGraph = graph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
// every vertex converges to 2, the minimum value in the graph
minGraph.vertices.collect.foreach { case (vertexId, (value, original_value)) => println(value) }
/* Requirement
Find the shortest path from vertex 0 to every other vertex (single-source shortest path, SSSP)
Approach
Initialize every vertex's message to the maximum value
Set the source vertex (0) message to 0
At each step, every vertex adds the edge weight to its current message and sends the result to its neighbors; each vertex then keeps the minimum over all messages it receives
When a vertex's message value does not change during a step, that vertex stops iterating
*/
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object pregelDemo03 extends App {
  // shortest distance from vertex 0 to every other vertex
  val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
  val vertexRDD = sc.makeRDD(Array((0L, ""), (1L, ""), (2L, ""), (3L, ""), (4L, "")))
  val edgeRDD = sc.makeRDD(Array(Edge(0L, 4L, 10), Edge(4L, 3L, 50), Edge(0L, 1L, 100), Edge(0L, 2L, 30), Edge(3L, 1L, 10), Edge(2L, 1L, 60), Edge(2L, 3L, 60)))
  val graph = Graph(vertexRDD, edgeRDD)
  // source vertex
  val srcVertexId = 0L
  val initialGraph = graph.mapVertices { case (id, property) => if (id == srcVertexId) 0.0 else Double.PositiveInfinity }
  // invoke pregel
  val pregelGraph = initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(
    // vprog: keep the smaller of the current distance and the incoming message
    (vid: VertexId, vd: Double, distMsg: Double) => {
      val minDist = math.min(vd, distMsg)
      println(s"vertex $vid: attr $vd, received message $distMsg, merged attr $minDist")
      minDist
    },
    // sendMsg: relax the edge if it offers a shorter path to the destination
    (edgeTriplet: EdgeTriplet[Double, PartitionID]) => {
      if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
        println(s"vertex ${edgeTriplet.srcId} sends message ${edgeTriplet.srcAttr + edgeTriplet.attr} to vertex ${edgeTriplet.dstId}")
        Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
      } else {
        Iterator.empty
      }
    },
    // mergeMsg: keep the minimum of competing messages
    (msg1: Double, msg2: Double) => math.min(msg1, msg2)
  )
  // pregelGraph.triplets.collect().foreach(println)
  println(pregelGraph.vertices.collect.mkString("\n"))
  sc.stop()
}