
Commit e4e42c3

Merge branch 'master' into SPARK-16323
2 parents 067b788 + 54b27c1

20 files changed: +567 −157 lines


core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 2 additions & 1 deletion
@@ -376,7 +376,8 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
     // for tests
     assert(inMemSorter != null);
     if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
-      logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
+      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
+        numElementsForSpillThreshold);
       spill();
     }

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 6 additions & 6 deletions
@@ -27,7 +27,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
@@ -99,8 +98,8 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
       long numElementsForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
-      serializerManager, taskContext, recordComparator, prefixComparator, initialSize, numElementsForSpillThreshold,
-      pageSizeBytes, inMemorySorter, false /* ignored */);
+      serializerManager, taskContext, recordComparator, prefixComparator, initialSize,
+      numElementsForSpillThreshold, pageSizeBytes, inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
     // The external sorter will be used to insert records, in-memory sorter is not needed.
     sorter.inMemSorter = null;
@@ -119,8 +118,8 @@ public static UnsafeExternalSorter create(
       long numElementsForSpillThreshold,
       boolean canUseRadixSort) {
     return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
-      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null,
-      canUseRadixSort);
+      taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes,
+      numElementsForSpillThreshold, null, canUseRadixSort);
   }

   private UnsafeExternalSorter(
@@ -387,7 +386,8 @@ public void insertRecord(

     assert(inMemSorter != null);
     if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
-      logger.info("Spilling data because number of spilledRecords crossed the threshold " + numElementsForSpillThreshold);
+      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
+        numElementsForSpillThreshold);
       spill();
     }
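
Both files guard insertRecord the same way: once the in-memory sorter holds
numElementsForSpillThreshold records, the data is logged and spilled to disk before the
insert proceeds. A minimal sketch of that pattern, with a hypothetical SpillableBuffer
standing in for the real sorter classes (not Spark API):

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the sorters patched above; not Spark API.
class SpillableBuffer[T](numElementsForSpillThreshold: Long) {
  private val records = ArrayBuffer.empty[T]

  def insert(record: T): Unit = {
    if (records.length >= numElementsForSpillThreshold) {
      // Same log-then-spill sequence as insertRecord in the diffs above.
      println("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold)
      spill()
    }
    records += record
  }

  private def spill(): Unit = {
    // Write the buffered records out and free the memory (details elided).
    records.clear()
  }
}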

docs/graphx-programming-guide.md

Lines changed: 6 additions & 127 deletions
@@ -603,29 +603,7 @@ slightly unreliable and instead opted for more explicit user control.
 In the following example we use the [`aggregateMessages`][Graph.aggregateMessages] operator to
 compute the average age of the more senior followers of each user.

-{% highlight scala %}
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
-val graph: Graph[Double, Int] =
-  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
-// Compute the number of older followers and their total age
-val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
-  triplet => { // Map Function
-    if (triplet.srcAttr > triplet.dstAttr) {
-      // Send message to destination vertex containing counter and age
-      triplet.sendToDst(1, triplet.srcAttr)
-    }
-  },
-  // Add counter and age
-  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
-)
-// Divide total age by number of older followers to get average age of older followers
-val avgAgeOfOlderFollowers: VertexRDD[Double] =
-  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
-// Display the results
-avgAgeOfOlderFollowers.collect.foreach(println(_))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala %}

 > The `aggregateMessages` operation performs optimally when the messages (and the sums of
 > messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
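
To make that note concrete, here is a hedged sketch (not part of this commit) contrasting
a constant-sized message with a list-valued one, assuming an active SparkContext `sc` as in
the examples in this guide:

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)

// Constant-sized messages: a (count, sum) pair merged with addition stays the
// same size no matter how many messages are combined.
val countAndSum: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => triplet.sendToDst((1, triplet.srcAttr)),
  (a, b) => (a._1 + b._1, a._2 + b._2))

// Variable-sized messages: lists grow with every merge, so aggregation cost and
// memory use rise with vertex degree. Prefer the constant-sized form above.
val allAges: VertexRDD[List[Double]] = graph.aggregateMessages[List[Double]](
  triplet => triplet.sendToDst(List(triplet.srcAttr)),
  (a, b) => a ++ b)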
@@ -793,29 +771,7 @@ second argument list contains the user defined functions for receiving messages
 We can use the Pregel operator to express computation such as single source
 shortest path in the following example.

-{% highlight scala %}
-import org.apache.spark.graphx._
-// Import random graph generation library
-import org.apache.spark.graphx.util.GraphGenerators
-// A graph with edge attributes containing distances
-val graph: Graph[Long, Double] =
-  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexId = 42 // The ultimate source
-// Initialize the graph such that all vertices except the root have distance infinity.
-val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
-val sssp = initialGraph.pregel(Double.PositiveInfinity)(
-  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
-  triplet => { // Send Message
-    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
-      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
-    } else {
-      Iterator.empty
-    }
-  },
-  (a,b) => math.min(a,b) // Merge Message
-)
-println(sssp.vertices.collect.mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/SSSPExample.scala %}

 <a name="graph_builders"></a>
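SSSPExample.scala itself is not among the files shown in this excerpt. A sketch of what it
plausibly contains, wrapping the deleted snippet above in the same $example on$ / $example off$
scaffold used by AggregateMessagesExample below (the scaffold details are an assumption):

// scalastyle:off println
package org.apache.spark.examples.graphx

// $example on$
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// $example off$
import org.apache.spark.sql.SparkSession

object SSSPExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // $example on$
    // A graph with edge attributes containing distances
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
    val sourceId: VertexId = 42 // The ultimate source
    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
      triplet => { // Send Message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b) // Merge Message
    )
    println(sssp.vertices.collect.mkString("\n"))
    // $example off$

    spark.stop()
  }
}
// scalastyle:on println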
@@ -1009,64 +965,19 @@ GraphX comes with static and dynamic implementations of PageRank as methods on t

 GraphX also includes an example social network dataset that we can run PageRank on. A set of users is given in `data/graphx/users.txt`, and a set of relationships between users is given in `data/graphx/followers.txt`. We compute the PageRank of each user as follows:

-{% highlight scala %}
-// Load the edges as a graph
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Run PageRank
-val ranks = graph.pageRank(0.0001).vertices
-// Join the ranks with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val ranksByUsername = users.join(ranks).map {
-  case (id, (username, rank)) => (username, rank)
-}
-// Print the result
-println(ranksByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/PageRankExample.scala %}

 ## Connected Components

 The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a social network, connected components can approximate clusters. GraphX contains an implementation of the algorithm in the [`ConnectedComponents` object][ConnectedComponents], and we compute the connected components of the example social network dataset from the [PageRank section](#pagerank) as follows:

-{% highlight scala %}
-// Load the graph as in the PageRank example
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-// Find the connected components
-val cc = graph.connectedComponents().vertices
-// Join the connected components with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val ccByUsername = users.join(cc).map {
-  case (id, (username, cc)) => (username, cc)
-}
-// Print the result
-println(ccByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala %}

 ## Triangle Counting

 A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the [`TriangleCount` object][TriangleCount] that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the [PageRank section](#pagerank). *Note that `TriangleCount` requires the edges to be in canonical orientation (`srcId < dstId`) and the graph to be partitioned using [`Graph.partitionBy`][Graph.partitionBy].*

-{% highlight scala %}
-// Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
-// Find the triangle count for each vertex
-val triCounts = graph.triangleCount().vertices
-// Join the triangle counts with the usernames
-val users = sc.textFile("data/graphx/users.txt").map { line =>
-  val fields = line.split(",")
-  (fields(0).toLong, fields(1))
-}
-val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
-  (username, tc)
-}
-// Print the result
-println(triCountByUsername.collect().mkString("\n"))
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala %}


 # Examples
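
The PageRank, connected components, and triangle counting example files are likewise
referenced but not shown in this excerpt. Here is a sketch of PageRankExample.scala under
the same assumed scaffold; ConnectedComponentsExample and TriangleCountingExample would
wrap the other two deleted snippets the same way:

// scalastyle:off println
package org.apache.spark.examples.graphx

// $example on$
import org.apache.spark.graphx.GraphLoader
// $example off$
import org.apache.spark.sql.SparkSession

object PageRankExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // $example on$
    // Load the edges as a graph
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
    // Run PageRank
    val ranks = graph.pageRank(0.0001).vertices
    // Join the ranks with the usernames
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    val ranksByUsername = users.join(ranks).map {
      case (id, (username, rank)) => (username, rank)
    }
    // Print the result
    println(ranksByUsername.collect().mkString("\n"))
    // $example off$

    spark.stop()
  }
}
// scalastyle:on println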
@@ -1076,36 +987,4 @@ to important relationships and users, run page-rank on the sub-graph, and
 then finally return attributes associated with the top users. I can do
 all of this in just a few lines with GraphX:

-{% highlight scala %}
-// Connect to the Spark cluster
-val sc = new SparkContext("spark://master.amplab.org", "research")
-
-// Load my user data and parse into tuples of user id and attribute list
-val users = (sc.textFile("data/graphx/users.txt")
-  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
-
-// Parse the edge data which is already in userId -> userId format
-val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
-
-// Attach the user attributes
-val graph = followerGraph.outerJoinVertices(users) {
-  case (uid, deg, Some(attrList)) => attrList
-  // Some users may not have attributes so we set them as empty
-  case (uid, deg, None) => Array.empty[String]
-}
-
-// Restrict the graph to users with usernames and names
-val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
-
-// Compute the PageRank
-val pagerankGraph = subgraph.pageRank(0.001)
-
-// Get the attributes of the top pagerank users
-val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
-  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
-  case (uid, attrList, None) => (0.0, attrList.toList)
-}
-
-println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala %}
examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala
(new file; path inferred from the package declaration below)

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.{Graph, VertexRDD}
+import org.apache.spark.graphx.util.GraphGenerators
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An example that uses the `aggregateMessages` operator to
+ * compute the average age of the more senior followers of each user.
+ * Run with
+ * {{{
+ * bin/run-example graphx.AggregateMessagesExample
+ * }}}
+ */
+object AggregateMessagesExample {
+
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Create a graph with "age" as the vertex property.
+    // Here we use a random graph for simplicity.
+    val graph: Graph[Double, Int] =
+      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
+    // Compute the number of older followers and their total age
+    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
+      triplet => { // Map Function
+        if (triplet.srcAttr > triplet.dstAttr) {
+          // Send message to destination vertex containing counter and age
+          triplet.sendToDst(1, triplet.srcAttr)
+        }
+      },
+      // Add counter and age
+      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
+    )
+    // Divide total age by number of older followers to get average age of older followers
+    val avgAgeOfOlderFollowers: VertexRDD[Double] =
+      olderFollowers.mapValues( (id, value) =>
+        value match { case (count, totalAge) => totalAge / count } )
+    // Display the results
+    avgAgeOfOlderFollowers.collect.foreach(println(_))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala
(new file; path inferred from the package declaration below)

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+// $example on$
+import org.apache.spark.graphx.GraphLoader
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Suppose I want to build a graph from some text files, restrict the graph
+ * to important relationships and users, run page-rank on the sub-graph, and
+ * then finally return attributes associated with the top users.
+ * This example does all of this in just a few lines with GraphX.
+ *
+ * Run with
+ * {{{
+ * bin/run-example graphx.ComprehensiveExample
+ * }}}
+ */
+object ComprehensiveExample {
+
+  def main(args: Array[String]): Unit = {
+    // Creates a SparkSession.
+    val spark = SparkSession
+      .builder
+      .appName(s"${this.getClass.getSimpleName}")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // $example on$
+    // Load my user data and parse into tuples of user id and attribute list
+    val users = (sc.textFile("data/graphx/users.txt")
+      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
+
+    // Parse the edge data which is already in userId -> userId format
+    val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
+
+    // Attach the user attributes
+    val graph = followerGraph.outerJoinVertices(users) {
+      case (uid, deg, Some(attrList)) => attrList
+      // Some users may not have attributes so we set them as empty
+      case (uid, deg, None) => Array.empty[String]
+    }
+
+    // Restrict the graph to users with usernames and names
+    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
+
+    // Compute the PageRank
+    val pagerankGraph = subgraph.pageRank(0.001)
+
+    // Get the attributes of the top pagerank users
+    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
+      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
+      case (uid, attrList, None) => (0.0, attrList.toList)
+    }
+
+    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
+    // $example off$
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
