@@ -79,30 +79,43 @@ object PageRank extends Logging {
7979 def run [VD : ClassTag , ED : ClassTag ](
8080 graph : Graph [VD , ED ], numIter : Int , resetProb : Double = 0.15 ): Graph [Double , Double ] =
8181 {
82- // Initialize the pagerankGraph with each edge attribute having
82+ // Initialize the PageRank graph with each edge attribute having
8383 // weight 1/outDegree and each vertex with attribute 1.0.
84- val pagerankGraph : Graph [Double , Double ] = graph
84+ var rankGraph : Graph [Double , Double ] = graph
8585 // Associate the degree with each vertex
8686 .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0 ) }
8787 // Set the weight on the edges based on the degree
8888 .mapTriplets( e => 1.0 / e.srcAttr )
8989 // Set the vertex attributes to the initial pagerank values
90- .mapVertices( (id, attr) => 1.0 )
91- .cache()
90+ .mapVertices( (id, attr) => resetProb )
9291
93- // Define the three functions needed to implement PageRank in the GraphX
94- // version of Pregel
95- def vertexProgram (id : VertexId , attr : Double , msgSum : Double ): Double =
96- resetProb + (1.0 - resetProb) * msgSum
97- def sendMessage (edge : EdgeTriplet [Double , Double ]) =
98- Iterator ((edge.dstId, edge.srcAttr * edge.attr))
99- def messageCombiner (a : Double , b : Double ): Double = a + b
100- // The initial message received by all vertices in PageRank
101- val initialMessage = 0.0
92+ var iteration = 0
93+ var prevRankGraph : Graph [Double , Double ] = null
94+ while (iteration < numIter) {
95+ rankGraph.cache()
10296
103- // Execute pregel for a fixed number of iterations.
104- Pregel (pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection .Out )(
105- vertexProgram, sendMessage, messageCombiner)
97+ // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
98+ // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
99+ val rankUpdates = rankGraph.mapReduceTriplets[Double ](
100+ e => Iterator ((e.dstId, e.srcAttr * e.attr)), _ + _)
101+
102+ // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
103+ // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
104+ // edge partitions.
105+ prevRankGraph = rankGraph
106+ rankGraph = rankGraph.joinVertices(rankUpdates) {
107+ (id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
108+ }.cache()
109+
110+ rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
111+ logInfo(s " PageRank finished iteration $iteration. " )
112+ prevRankGraph.vertices.unpersist(false )
113+ prevRankGraph.edges.unpersist(false )
114+
115+ iteration += 1
116+ }
117+
118+ rankGraph
106119 }
107120
108121 /**
0 commit comments