当前位置:   article > 正文

大数据基础之SparkGraphX——SparkGraphX基本简介及寻找网络红人项目实战分析_spark graphx实战

spark graphx实战

图(Graph)的基本概念

  • 图是有顶点集合(vertex)及顶点之间的关系结合(边egde)组成的一种网状数据结构
      - 通常表示为二元祖:Graph=(V,E)
      - 可以对事物之间的关系建模
  • 应用场景
      - 在地图应用中寻找最短路径
      - 社交网络关系
      - 网页间超链接关系
    在这里插入图片描述
  • 顶点(Vertex)
  • 边(Edge)
//顶点集合
V={v1,v2,v3}
//边集合
E={(v1,v2),(v2,v3),(v1,v3)}
Graph=(V,E)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 度(一个顶点所有边的数量)
      - 出度:指从当前顶点指向其他顶点的边的数量
      - 入度:其他顶点指向当前顶点的边的数量

图的经典表示法:邻接矩阵

在这里插入图片描述

  • 对于每条边,矩阵中相对应的单元格值为1
  • 对于每个循环,矩阵中相对应的单元格值为2,方便在行或列上求得顶点度

Spark GraphX

  • GraphX是Spark提供分布式图计算API
  • GraphX特点
      - 基于内存实现了数据的复用与快速读取
      - 通过弹性分布式属性图(Property Graph)统一了图视图与表视图
      - 与Spark Streaming、Spark SQL和Spark MLlib等无缝衔接

GraphX API

  • Graph[VD,ED]
  • VertexRDD[VD]
  • EdgeRDD[ED]
  • EdgeTriplet[VD,ED]
  • Edge:样例类
  • VertexId:Long的别名
//    建立所有的点
    val vects = sc.makeRDD(Seq((3l,("rxin","stu")),(5l,("张三","prof")),(2l,("lisi","prof")),(7l,("xx","prodoc"))))

//    建立所有的边
    val edges = sc.makeRDD(Seq(Edge(2l,5l,"ts"),Edge(5l,3l,"zd"),Edge(5l,7l,"pi"),Edge(3l,7l,"collab")))

//    建立图
    val graph = Graph(vects,edges)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

图属性值

val conf = new SparkConf().setAppName("mygraphx").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)

//    建立所有的点
    val vects = sc.makeRDD(Seq((3l,("rxin","stu")),(5l,("张三","prof")),(2l,("lisi","prof")),(7l,("xx","prodoc"))))

//    建立所有的边
    val edges = sc.makeRDD(Seq(Edge(2l,5l,"ts"),Edge(5l,3l,"zd"),Edge(5l,7l,"pi"),Edge(3l,7l,"collab")))

//    建立图
    val graph = Graph(vects,edges)
//    打印图的点和边的个数
    println(graph.numEdges,graph.numVertices)
//	   遍历图的所有点信息
    graph.vertices.foreach(x=>println(x._1,x._2))
//	   遍历图的所有边信息
    graph.edges.foreach(x=>println(x.srcId,x.dstId,x.attr))
//	   遍历图的所有边信息(边中的点信息是点的值的信息,不是点的别名)
    graph.triplets.foreach(x=>println(x.srcAttr,x.dstAttr,x.attr))
//	   遍历所有边的入度
    graph.inDegrees.foreach(x=>println(x))
//	   遍历所有边的出度
    graph.outDegrees.foreach(x=>println(x))
//	   遍历所有边的出入度的和
    graph.degrees.foreach(println)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

图的算子:

属性算子

  • 类似于RDD的map操作
//    建立所有的点
val vects = sc.makeRDD(Seq((3l,("rxin","stu")),(5l,("张三","prof")),(2l,("lisi","prof")),(7l,("xx","prodoc"))))

//    建立所有的边
    val edges = sc.makeRDD(Seq(Edge(2l,5l,"ts"),Edge(5l,3l,"zd"),Edge(5l,7l,"pi"),Edge(3l,7l,"collab")))

//    建立图
    val graph = Graph(vects,edges)
  graph.mapEdges(e=>Edge(e.srcId,e.dstId,e.attr+",hello")).edges.foreach(x=>println(x.attr))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

结构算子

val t1_graph = tweeter_graph.reverse             //改变边的方向
//生成满足顶点和边的条件的子图
val t2_graph = tweeter_graph.subgraph(vpred=(id,attr)=>attr._2<65)  //attr:(name,age)
  • 1
  • 2
  • 3

Join算子

  • 从外部的RDD加载数据,修改顶点属性,(join的后边必须为RDD)
//外部RDD数据
val newPoint = sc.parallelize(Array((3l,"hehe"),(5l,"xixi"),(4l,"cici")))
//关联点,修改点的值
graph.joinVertices(newPoint)((id,src,newVal)=>(src._1+"@"+newVal,src._2)).vertices.foreach(f=>println(f._2))

//当与RDD中的顶点不匹配时,值为None
graph.outerJoinVertices(newPoint)((id,src,newVal)=>(src._1+"@"+newVal,src._2)).vertices.foreach(f=>println(f._2))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

网络红人项目实战

  • 数据:(这里只是一部分,如果需要更多请自己仿造生成,这里不便上传太多)
((User47,86566510),(User83,15647839))
((User47,86566510),(User42,197134784))
((User89,74286565),(User49,19315174))
((User16,22679419),(User69,45705189))
((User37,14559570),(User64,24742040))
((User31,63644892),(User10,123004655))
((User10,123004655),(User50,17613979))
((User37,14559570),(User11,14269220))
((User78,3365291),(User30,93905958))
((User14,199097645),(User60,16547411))
((User3,14874480),(User42,197134784))
((User40,813286),(User9,15434432))
((User10,123004655),(User34,10211502))
((User90,34870269),(User53,25566593))
((User12,24741956),(User60,16547411))
((User12,24741956),(User5,18927441))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 格式:((user* , ),(user , ))
      - (user
    , *)=(用户名,用户ID)
      - 前一个用户表示被跟随者(followee)
      - 后一个用户表示跟随者(follower)
  • 创建图并计算每个用户的粉丝数量
  • 找出网络红人

代码实现:

object TestMyGraphx {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("MyFans")
    val sc = SparkContext.getOrCreate(conf)
//    1.读文件
    var rdd = sc.textFile("file:///d:/twitter_graph_data.txt").cache()
//    2.分割数据,获得所有的点并去重
    val partten = """([a-zA-Z0-9]+),([0-9]+)""".r

    val vects = rdd.flatMap(str=>{
      partten.findAllIn(str).map(e=>{
        val ff = e.split(",")
        (ff(1).toLong,ff(0))
      })
    }).distinct()


//    3.分割数据,获得所有的边,权重设定为1
        val partten1 = """\(\([a-zA-Z0-9]+,([0-9]+)\),\([a-zA-Z0-9]+,([0-9]+)\)\)""".r
        val edgs =rdd.flatMap(str=>{
          partten1.findAllMatchIn(str).map(x=>(Edge(x.group(1).toLong,x.group(2).toLong,1)))
        })

//    4.构成图
    var graphx = Graph(vects,edgs)
//    5.获取所有点的入度,按入度排序,前n名即为网络红人
    val hong = graphx.inDegrees.repartition(1).sortBy(-_._2).take(3)
    sc.makeRDD(hong).join(vects).foreach(println)
  }
}

//结果展示:
(15913,(67,User94))
(59804598,(58,User36))
(24742040,(58,User64))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/542105
推荐阅读
相关标签
  

闽ICP备14008679号