赞
踩
// 顶点集合
V = {v1, v2, v3}
// 边集合(每条边是一对顶点)
E = {(v1, v2), (v2, v3), (v1, v3)}
// 图 G 由顶点集合 V 和边集合 E 构成
G = (V, E)
// Build all vertices: each element is (vertexId, (String, String)), apparently (name, role).
// Long literals use an uppercase `L`; a lowercase `l` is deprecated and easily misread as `1`.
// NOTE(review): `sc` is not defined in this snippet; presumably the SparkContext that
// spark-shell provides. Confirm before running standalone.
val vects = sc.makeRDD(Seq(
  (3L, ("rxin", "stu")),
  (5L, ("张三", "prof")),
  (2L, ("lisi", "prof")),
  (7L, ("xx", "prodoc"))
))
// Build all edges: Edge(srcId, dstId, attr), where attr is a String label.
val edges = sc.makeRDD(Seq(
  Edge(2L, 5L, "ts"),
  Edge(5L, 3L, "zd"),
  Edge(5L, 7L, "pi"),
  Edge(3L, 7L, "collab")
))
// Build the property graph from the vertex RDD and the edge RDD.
val graph = Graph(vects, edges)
// Local Spark context for the examples (getOrCreate reuses an already-active context).
val conf = new SparkConf().setAppName("mygraphx").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
// Build all vertices: (vertexId, (String, String)), apparently (name, role).
val vects = sc.makeRDD(Seq(
  (3L, ("rxin", "stu")),
  (5L, ("张三", "prof")),
  (2L, ("lisi", "prof")),
  (7L, ("xx", "prodoc"))
))
// Build all edges: Edge(srcId, dstId, attr).
val edges = sc.makeRDD(Seq(
  Edge(2L, 5L, "ts"),
  Edge(5L, 3L, "zd"),
  Edge(5L, 7L, "pi"),
  Edge(3L, 7L, "collab")
))
// Build the graph.
val graph = Graph(vects, edges)
// Print (number of edges, number of vertices), in that order.
println((graph.numEdges, graph.numVertices))
// Every RDD below is collected before printing so the output lands on the driver console;
// RDD.foreach(println) would run on the executors instead.
// All vertices as (vertexId, attribute).
graph.vertices.collect().foreach(println)
// All edges as (srcId, dstId, attr).
graph.edges.collect().foreach(e => println((e.srcId, e.dstId, e.attr)))
// All triplets: unlike plain edges, each one exposes the source and destination vertex
// attributes (srcAttr / dstAttr) rather than only their ids.
graph.triplets.collect().foreach(t => println((t.srcAttr, t.dstAttr, t.attr)))
// In-degree of each vertex.
graph.inDegrees.collect().foreach(println)
// Out-degree of each vertex.
graph.outDegrees.collect().foreach(println)
// Total degree (in + out) of each vertex.
graph.degrees.collect().foreach(println)
// Build all vertices: (vertexId, (String, String)), apparently (name, role).
val vects = sc.makeRDD(Seq(
  (3L, ("rxin", "stu")),
  (5L, ("张三", "prof")),
  (2L, ("lisi", "prof")),
  (7L, ("xx", "prodoc"))
))
// Build all edges: Edge(srcId, dstId, attr).
val edges = sc.makeRDD(Seq(
  Edge(2L, 5L, "ts"),
  Edge(5L, 3L, "zd"),
  Edge(5L, 7L, "pi"),
  Edge(3L, 7L, "collab")
))
// Build the graph.
val graph = Graph(vects, edges)
// Append ",hello" to every edge attribute. The function passed to mapEdges must return only
// the NEW ATTRIBUTE: returning Edge(...) here would make each attribute a nested Edge object
// (printing "Edge(2,5,ts,hello)" instead of "ts,hello").
// Collected first so the output is printed on the driver rather than on the executors.
graph.mapEdges(e => e.attr + ",hello").edges.collect().foreach(e => println(e.attr))
// Same graph with the direction of every edge flipped.
val t1_graph = tweeter_graph.reverse
// Subgraph filtered by a vertex predicate: keep vertices whose attribute's second field is
// below 65. The attribute is (name, age) according to the original note, so this keeps users
// younger than 65.
val t2_graph = tweeter_graph.subgraph(vpred = (_, attr) => attr._2 < 65)
// External (vertexId, value) RDD to join against the graph's vertices.
// Ids 3 and 5 exist in the graph built above; id 4 does not.
val newPoint = sc.parallelize(Array((3L, "hehe"), (5L, "xixi"), (4L, "cici")))
// joinVertices: the function receives the matching value from newPoint and rewrites the vertex
// attribute as (name + "@" + value, role).
// Results are collected so they print on the driver rather than on the executors.
graph.joinVertices(newPoint)((_, src, newVal) => (src._1 + "@" + newVal, src._2))
  .vertices.collect().foreach(f => println(f._2))
// outerJoinVertices: the value is an Option. It is Some(value) for a matching vertex and None
// when the vertex has no entry in newPoint, so the printed names read like "张三@Some(xixi)"
// or "lisi@None".
graph.outerJoinVertices(newPoint)((_, src, newVal) => (src._1 + "@" + newVal, src._2))
  .vertices.collect().foreach(f => println(f._2))
((User47,86566510),(User83,15647839)) ((User47,86566510),(User42,197134784)) ((User89,74286565),(User49,19315174)) ((User16,22679419),(User69,45705189)) ((User37,14559570),(User64,24742040)) ((User31,63644892),(User10,123004655)) ((User10,123004655),(User50,17613979)) ((User37,14559570),(User11,14269220)) ((User78,3365291),(User30,93905958)) ((User14,199097645),(User60,16547411)) ((User3,14874480),(User42,197134784)) ((User40,813286),(User9,15434432)) ((User10,123004655),(User34,10211502)) ((User90,34870269),(User53,25566593)) ((User12,24741956),(User60,16547411)) ((User12,24741956),(User5,18927441))
代码实现(读取上述格式的数据构建图,按入度从高到低排序,前 3 名即为“网络红人”):
object TestMyGraphx {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath = "file:///d:/twitter_graph_data.txt"

  /** How many of the highest in-degree vertices ("internet celebrities") to report. */
  private val TopN = 3

  /**
   * Finds the most-followed users in a Twitter-style edge list.
   *
   * Each input line contains pairs such as `((User47,86566510),(User83,15647839))`.
   * Every `(name,id)` occurrence becomes a vertex `id -> name`, and every pair of pairs
   * becomes a directed edge from the first id to the second with weight 1.
   * The `TopN` vertices with the highest in-degree are printed as `(id,(inDegree,name))`.
   *
   * @param args optional; `args(0)` overrides the input path (defaults to `DefaultInputPath`)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("MyFans")
    val sc = SparkContext.getOrCreate(conf)
    try {
      // 1. Read the file; it is scanned twice (vertices and edges), hence the cache.
      val rdd = sc.textFile(args.headOption.getOrElse(DefaultInputPath)).cache()

      // 2. Every "(name,id)" occurrence yields a vertex (id -> name); duplicates are removed.
      val vertexPattern = """([a-zA-Z0-9]+),([0-9]+)""".r
      val vects = rdd.flatMap { line =>
        vertexPattern.findAllMatchIn(line).map(m => (m.group(2).toLong, m.group(1)))
      }.distinct()

      // 3. Every "((name,id),(name,id))" occurrence yields an edge srcId -> dstId with weight 1.
      val edgePattern = """\(\([a-zA-Z0-9]+,([0-9]+)\),\([a-zA-Z0-9]+,([0-9]+)\)\)""".r
      val edgs = rdd.flatMap { line =>
        edgePattern.findAllMatchIn(line).map(m => Edge(m.group(1).toLong, m.group(2).toLong, 1))
      }

      // 4. Build the graph.
      val graphx = Graph(vects, edgs)

      // 5. Rank vertices by in-degree, highest first. sortBy already produces a globally
      //    ordered RDD, so take() returns the top entries without a prior repartition(1).
      val hong = graphx.inDegrees.sortBy(-_._2).take(TopN)

      // Attach user names. join does not preserve the ranking order, so the small result is
      // collected and re-sorted by in-degree (descending) on the driver before printing.
      sc.makeRDD(hong).join(vects).collect().sortBy(-_._2._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
// Sample output:
// (15913,(67,User94))
// (59804598,(58,User36))
// (24742040,(58,User64))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。