
Commit e110d70

Merge branch 'master' into hll
Conflicts: project/MimaExcludes.scala
2 parents 354deb8 + b1feb60

56 files changed: +952 additions, -306 deletions


bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
       sc.stop()
       sc = null
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
   }
 
   test("halting by voting") {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 20 additions & 2 deletions
@@ -1015,8 +1015,26 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+
+    def getIterator = {
+      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+      serializer.newInstance().deserializeStream(stream).asIterator
+    }
+
+    if (blockId.isShuffle) {
+      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+      // at the beginning. The wrapping will cost some memory (compression instance
+      // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
+      // wrapping lazily to save memory.
+      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+        lazy val proxy = f
+        override def hasNext: Boolean = proxy.hasNext
+        override def next(): Any = proxy.next()
+      }
+      new LazyProxyIterator(getIterator)
+    } else {
+      getIterator
+    }
   }
 
   def stop()
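
The change above makes deserialization of shuffle blocks lazy: building the decompression and deserialization stream is deferred until the reducer actually pulls from the iterator, so blocks that are merely queued do not pay the setup cost up front. A minimal standalone sketch of the same proxy pattern (the names and the expensive factory below are illustrative, not the Spark API):

  // Defer building an expensive iterator until it is first consumed.
  class LazyProxyIterator[A](make: => Iterator[A]) extends Iterator[A] {
    private lazy val proxy = make                    // evaluated on first hasNext/next
    override def hasNext: Boolean = proxy.hasNext
    override def next(): A = proxy.next()
  }

  def expensiveIterator: Iterator[Int] = {
    println("building underlying iterator")          // shows when the work actually happens
    (1 to 3).iterator
  }

  val it = new LazyProxyIterator(expensiveIterator)  // nothing has been built yet
  println(it.hasNext)                                // the underlying iterator is built here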

core/src/main/scala/org/apache/spark/storage/StorageLevel.scala

Lines changed: 21 additions & 0 deletions
@@ -147,6 +147,27 @@ object StorageLevel {
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
   val OFF_HEAP = new StorageLevel(false, false, true, false)
 
+  /**
+   * :: DeveloperApi ::
+   * Return the StorageLevel object with the specified name.
+   */
+  @DeveloperApi
+  def fromString(s: String): StorageLevel = s match {
+    case "NONE" => NONE
+    case "DISK_ONLY" => DISK_ONLY
+    case "DISK_ONLY_2" => DISK_ONLY_2
+    case "MEMORY_ONLY" => MEMORY_ONLY
+    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
+    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
+    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
+    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
+    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
+    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
+    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
+    case "OFF_HEAP" => OFF_HEAP
+    case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+  }
+
   /**
    * :: DeveloperApi ::
    * Create a new StorageLevel object without setting useOffHeap.
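
The new fromString helper maps a level name back to its singleton instance, which is convenient when the desired storage level arrives as configuration text. A small usage sketch (the config key here is made up for illustration):

  import org.apache.spark.storage.StorageLevel

  // Resolve a storage level from a string, e.g. read from a hypothetical config entry.
  val levelName = sys.props.getOrElse("example.storage.level", "MEMORY_AND_DISK_SER")
  val level = StorageLevel.fromString(levelName)  // throws IllegalArgumentException on unknown names
  // rdd.persist(level)                           // then persist an RDD at that level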

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 0 additions & 3 deletions
@@ -68,9 +68,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port");
-    Utils.deleteRecursively(tempDir);
   }
 
   static class ReverseIntComparator implements Comparator<Integer>, Serializable {

core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala

Lines changed: 0 additions & 4 deletions
@@ -39,7 +39,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
     assert(securityManager.isAuthenticationEnabled() === true)
 
@@ -77,7 +76,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     assert(securityManager.isAuthenticationEnabled() === false)
@@ -129,7 +127,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     assert(securityManager.isAuthenticationEnabled() === true)
@@ -182,7 +179,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     assert(securityManager.isAuthenticationEnabled() === true)
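
Every removed line here hand-set spark.driver.port and leaned on the trailing comment's promise that LocalSparkContext would clear it again. A sketch of how that kind of per-test cleanup can live in one shared ScalaTest trait instead of each suite (the trait name and property list are illustrative, not Spark's actual LocalSparkContext):

  import org.scalatest.{BeforeAndAfterEach, Suite}

  // Illustrative trait: clear JVM-wide properties after every test so suites
  // cannot leak state into each other.
  trait ClearsDriverPort extends BeforeAndAfterEach { self: Suite =>
    override def afterEach(): Unit = {
      try {
        System.clearProperty("spark.driver.port")
        System.clearProperty("spark.hostPort")
      } finally {
        super.afterEach()
      }
    }
  }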

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 0 additions & 3 deletions
@@ -124,9 +124,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
       securityManager = new SecurityManager(conf))
 
-    // Will be cleared by LocalSparkContext
-    System.setProperty("spark.driver.port", boundPort.toString)
-
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
       Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -78,8 +78,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   after {
-    System.clearProperty("spark.driver.port")
-
     if (store != null) {
       store.stop()
       store = null

docs/spark-debugger.md

Lines changed: 0 additions & 121 deletions
This file was deleted.

examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala

Lines changed: 128 additions & 0 deletions

@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs.  The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+  /**
+   * To run this program use the following:
+   *
+   * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+   *
+   * Options:
+   *   -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
+   *   -niters the number of iterations of pagerank to use (Default: 10)
+   *   -numVertices the number of vertices in the graph (Default: 1000000)
+   *   -numEPart the number of edge partitions in the graph (Default: number of cores)
+   *   -partStrategy the graph partitioning strategy to use
+   *   -mu the mean parameter for the log-normal graph (Default: 4.0)
+   *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+   *   -degFile the local file to save the degree information (Default: Empty)
+   */
+  def main(args: Array[String]) {
+    val options = args.map {
+      arg =>
+        arg.dropWhile(_ == '-').split('=') match {
+          case Array(opt, v) => (opt -> v)
+          case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+        }
+    }
+
+    var app = "pagerank"
+    var niter = 10
+    var numVertices = 100000
+    var numEPart: Option[Int] = None
+    var partitionStrategy: Option[PartitionStrategy] = None
+    var mu: Double = 4.0
+    var sigma: Double = 1.3
+    var degFile: String = ""
+
+    options.foreach {
+      case ("app", v) => app = v
+      case ("niter", v) => niter = v.toInt
+      case ("nverts", v) => numVertices = v.toInt
+      case ("numEPart", v) => numEPart = Some(v.toInt)
+      case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
+      case ("mu", v) => mu = v.toDouble
+      case ("sigma", v) => sigma = v.toDouble
+      case ("degFile", v) => degFile = v
+      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+    }
+
+    val conf = new SparkConf()
+      .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+    val sc = new SparkContext(conf)
+
+    // Create the graph
+    println(s"Creating graph...")
+    val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+      numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+    // Repartition the graph
+    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
+
+    var startTime = System.currentTimeMillis()
+    val numEdges = graph.edges.count()
+    println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
+    val loadTime = System.currentTimeMillis() - startTime
+
+    // Collect the degree distribution (if desired)
+    if (!degFile.isEmpty) {
+      val fos = new FileOutputStream(degFile)
+      val pos = new PrintWriter(fos)
+      val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+        .map(p => p._2).countByValue()
+      hist.foreach {
+        case (deg, count) => pos.println(s"$deg \t $count")
+      }
+    }
+
+    // Run PageRank
+    startTime = System.currentTimeMillis()
+    if (app == "pagerank") {
+      println("Running PageRank")
+      val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+      println(s"Total PageRank = $totalPR")
+    } else if (app == "cc") {
+      println("Running Connected Components")
+      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
+      println(s"Number of components = $numComponents")
+    }
+    val runTime = System.currentTimeMillis() - startTime
+
+    println(s"Num Vertices = $numVertices")
+    println(s"Num Edges = $numEdges")
+    println(s"Creation time = ${loadTime/1000.0} seconds")
+    println(s"Run time = ${runTime/1000.0} seconds")
+
+    sc.stop()
+  }
+}
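
A sample invocation, following the form shown in the scaladoc above (the master URL is a placeholder; note that the parser in options.foreach matches the keys app, niter, nverts, numEPart, partStrategy, mu, sigma and degFile, so the -niters and -numVertices spellings listed in the scaladoc would be rejected as invalid options):

  MASTER=spark://host:7077 bin/run-example graphx.SynthBenchmark -app=pagerank -niter=20 -nverts=2000000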

examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ object RDDRelation {
     sql("SELECT * FROM records").collect().foreach(println)
 
     // Aggregation queries are also supported.
-    val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0)
+    val count = sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)
     println(s"COUNT(*): $count")
 
     // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
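
The one-line fix reflects that Spark SQL returns COUNT(*) as a 64-bit value, so reading the column with getInt fails at runtime once the row is materialized. A minimal illustration, reusing the records table and sql context from the surrounding example:

  // COUNT(*) comes back as a Long, so read it with getLong rather than getInt.
  val row = sql("SELECT COUNT(*) FROM records").collect().head
  val count: Long = row.getLong(0)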
