当前位置:   article > 正文

spark Graph 的PregelAPI 理解和使用

认识pregel api

spark Graph 的PregelAPI 理解和使用

图本质上是一种递归的数据结构,可以使用Spark GraphX 的PregelAPI接口对图数据进行批量计算,
之前一直不怎么理解Pregel计算模型,因此花点时间整理一下,该api的理解以及使用方法等。

1、Pregel的计算模型

Pregel接口的官方定义:

  1. /**
  2. * Execute a Pregel-like iterative vertex-parallel abstraction. The
  3. * user-defined vertex-program `vprog` is executed in parallel on
  4. * each vertex receiving any inbound messages and computing a new
  5. * value for the vertex. The `sendMsg` function is then invoked on
  6. * all out-edges and is used to compute an optional message to the
  7. * destination vertex. The `mergeMsg` function is a commutative
  8. * associative function used to combine messages destined to the
  9. * same vertex.
  10. *
  11. * On the first iteration all vertices receive the `initialMsg` and
  12. * on subsequent iterations if a vertex does not receive a message
  13. * then the vertex-program is not invoked.
  14. *
  15. * This function iterates until there are no remaining messages, or
  16. * for `maxIterations` iterations.
  17. *
  18. * @param A the Pregel message type
  19. *
  20. * @param initialMsg the message each vertex will receive at the on
  21. * the first iteration
  22. *
  23. * @param maxIterations the maximum number of iterations to run for
  24. *
  25. * @param activeDirection the direction of edges incident to a vertex that received a message in
  26. * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
  27. * out-edges of vertices that received a message in the previous round will run.
  28. *
  29. * @param vprog the user-defined vertex program which runs on each
  30. * vertex and receives the inbound message and computes a new vertex
  31. * value. On the first iteration the vertex program is invoked on
  32. * all vertices and is passed the default message. On subsequent
  33. * iterations the vertex program is only invoked on those vertices
  34. * that receive messages.
  35. *
  36. * @param sendMsg a user supplied function that is applied to out
  37. * edges of vertices that received messages in the current
  38. * iteration
  39. *
  40. * @param mergeMsg a user supplied function that takes two incoming
  41. * messages of type A and merges them into a single message of type
  42. * A. ''This function must be commutative and associative and
  43. * ideally the size of A should not increase.''
  44. *
  45. * @return the resulting graph at the end of the computation
  46. *
  47. */
  48. def pregel[A: ClassTag](
  49. initialMsg: A,
  50. maxIterations: Int = Int.MaxValue,
  51. activeDirection: EdgeDirection = EdgeDirection.Either)(
  52. vprog: (VertexId, VD, A) => VD,
  53. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  54. mergeMsg: (A, A) => A)
  55. : Graph[VD, ED] = {
  56. Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  57. }

方法的注释根据自己的实验理解如下:

执行类似Pregel的迭代顶点并行抽象。
在一次迭代计算中,图的各个顶点收到默认消息或者上一轮迭代发送的消息后;
首先调用mergeMsg函数将具有相同目的地的消息合并成一个消息;
然后调用vprog顶点函数计算出新的顶点属性值;
然后再调用sendMsg 函数向出边顶点发送下一轮迭代的消息;
迭代计算直到没有消息剩余或者达到最大迭代次数退出。

在首轮迭代的时候,所有的顶点都会接收到initialMsg消息,在次轮迭代的时候,如果顶点没有接收到消息,verteProgram则不会被调用。

这些函数迭代会一直持续到没有剩余消息或者达到最大迭代次数maxIterations

VD : 顶点的属性的数据类型。
ED : 边的属性的数据类型
VertexId : 顶点ID的类型
A : Pregel message的类型。
graph:计算的输入的图
initialMsg : 图的每个顶点在首轮迭代时收到的初始化消息
maxIterations:最大迭代的次数
vprog
vprog是用户定义的顶点程序,会运行在每一个顶点上,该vprog函数的功能是负责接收入站的message,
并计算出的顶点的新属性值。
在首轮迭代时,在所有的顶点上都会调用程序vprog函数,传人默认的defaultMessage;在次轮迭代时,只有接收到message消息的顶点才会调用vprog函数。

  1. vprog: (VertexId, VD, A) => VD
  2. 输入参数: 顶点ID ,该顶点对应的顶点属性值,本轮迭代收到的message
  3. 输出结果: 新的顶点属性值

sendMsg
用户提供的函数,应用于以当前迭代计算收到消息的顶点为源顶点的边edges;sendMsg函数的功能
是发送消息,消息的发送方向默认是沿着出边反向(向边的目的顶点发送消息)。

  1. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  2. 输入参数是 EdgeTriplet :当前迭代计算收到消息的顶点为源顶点的边edges的EdgeTriplet对象。
  3. 输出结果: 下一迭代的消息。

mergeMsg
用户提供定义的函数,将具有相同目的地的消息合并成一个;如果一个顶点,收到两个以上的A类型的消息message,该函数将他们合并成一个A类型消息。 这个函数必须是可交换的和关联的。理想情况下,A类型的message的size大小不应增加。

  1. mergeMsg: (A, A) => A)
  2. 输入参数:当前迭代中,一个顶点收到的2个A类型的message。
  3. 输出结果:A类型的消息

下面的例子是使用Pregel计算单源最短路径,在图中节点间查找最短的路径是非常常见的图算法,所谓“单源最短路径”,就是指给定初始节点StartV,
计算图中其他任意节点到该节点的最短距离。我简化了官方的示例,使我们可以更简单的理解pregel计算模型。

  1. package graphxTest
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.SparkSession
  4. import org.apache.spark.graphx.{Edge, Graph, VertexId}
  5. /**
  6. * Created by Mtime on 2018/1/25.
  7. */
  8. object GraphxPregelTest {
  9. val spark = SparkSession
  10. .builder
  11. .appName(s"${this.getClass.getSimpleName}").master("local[2]")
  12. .getOrCreate()
  13. val sc = spark.sparkContext
  14. /**
  15. * 计算最短路径
  16. **/
  17. def shortestPath(): Unit = {
  18. //生成一个图对象
  19. val graph: Graph[Long, Double] = genGraph
  20. //打印出图的值
  21. graph.triplets.foreach(t => {
  22. println(s"t.srcId=${t.srcId} t.dstId=${t.dstId} t.srcAttr=${t.srcAttr} t.dstAttr=${t.dstAttr}")
  23. })
  24. val sourceId: VertexId = 1 // 计算顶点1到图各个顶点的最短路径
  25. // Initialize the graph such that all vertices except the root have distance infinity.
  26. val initialGraph = graph.mapVertices((id, att) =>
  27. if (id == sourceId) 0.0 else Double.PositiveInfinity)
  28. println("------------------------------")
  29. //打印出图的值
  30. initialGraph.triplets.foreach(t => {
  31. println(s"t.srcId=${t.srcId} t.dstId=${t.dstId} t.srcAttr=${t.srcAttr} t.dstAttr=${t.dstAttr}")
  32. })
  33. val sssp:Graph[Double,Double] = initialGraph.pregel(Double.PositiveInfinity)(
  34. (vid, vidAttr, message) => math.min(vidAttr, message), // Vertex Program
  35. triplet => {
  36. // Send Message
  37. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  38. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  39. } else {
  40. Iterator.empty
  41. }
  42. },
  43. (message_a, message_b) => math.min(message_a, message_b) // Merge Message
  44. )
  45. println("------------------------------")
  46. //打印出计算结果
  47. println(sssp.vertices.collect.mkString("\n"))
  48. }
  49. /**
  50. * 初始化图对象
  51. *
  52. * @return
  53. */
  54. private def genGraph(): Graph[Long, Double] = {
  55. val vertices: RDD[(VertexId, Long)] =
  56. sc.parallelize(Array(
  57. (1L, 0L),
  58. (2L, 0L),
  59. (3L, 0L),
  60. (4L, 0L),
  61. (5L, 0L),
  62. (6L, 0L))
  63. )
  64. // Create an RDD for edges
  65. val edges: RDD[Edge[Double]] =
  66. sc.parallelize(Array(
  67. Edge(1L, 2L, 1.0),
  68. Edge(1L, 4L, 1.0),
  69. Edge(1L, 5L, 1.0),
  70. Edge(2L, 3L, 1.0),
  71. Edge(4L, 3L, 1.0),
  72. Edge(5L, 4L, 1.0),
  73. Edge(3L, 6L, 1.0)
  74. )
  75. )
  76. val graph: Graph[Long, Double] = Graph(vertices, edges, 0)
  77. graph
  78. }
  79. def main(args: Array[String]) {
  80. shortestPath
  81. }
  82. }

转载于:https://www.cnblogs.com/honeybee/p/8422338.html

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号