
Commit d800a46

Merge pull request #2 from markhamstra/master-csd
Catching up with Apache and reverting the Zookeeper version
2 parents (db11c21 + 2f95938), commit d800a46

File tree: 52 files changed, +2897 -740 lines

(This is a large commit; only a subset of the 52 changed files is shown below.)

README.md

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ described below.
 
 When developing a Spark application, specify the Hadoop version by adding the
 "hadoop-client" artifact to your project's dependencies. For example, if you're
-using Hadoop 1.0.1 and build your application using SBT, add this entry to
+using Hadoop 1.2.1 and build your application using SBT, add this entry to
 `libraryDependencies`:
 
     "org.apache.hadoop" % "hadoop-client" % "1.2.1"

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 17 additions & 11 deletions
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
-import org.apache.spark.deploy.LocalSparkCluster
+import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -248,7 +248,7 @@ class SparkContext(
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
     val env = SparkEnv.get
-    val conf = env.hadoop.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
         System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -258,8 +258,10 @@ class SparkContext(
       conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
     }
     // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
-      conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
+    Utils.getSystemProperties.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        conf.set(key.substring("spark.hadoop.".length), value)
+      }
     }
     val bufferSize = System.getProperty("spark.buffer.size", "65536")
     conf.set("io.file.buffer.size", bufferSize)
@@ -382,7 +384,7 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -592,7 +594,8 @@ class SparkContext(
     val uri = new URI(path)
     val key = uri.getScheme match {
      case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case _ => path
+      case "local" => "file:" + uri.getPath
+      case _ => path
    }
     addedFiles(key) = System.currentTimeMillis
 
@@ -686,7 +689,7 @@ class SparkContext(
   /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.
+   * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
    */
   def addJar(path: String) {
     if (path == null) {
@@ -699,8 +702,9 @@ class SparkContext(
     } else {
       val uri = new URI(path)
       key = uri.getScheme match {
+        // A JAR file which exists only on the driver node
         case null | "file" =>
-          if (env.hadoop.isYarnMode()) {
+          if (SparkHadoopUtil.get.isYarnMode()) {
            // In order for this to work on yarn the user must specify the --addjars option to
            // the client to upload the file into the distributed cache to make it show up in the
            // current working directory.
@@ -716,6 +720,9 @@ class SparkContext(
           } else {
             env.httpFileServer.addJar(new File(uri.getPath))
           }
+        // A JAR file which exists locally on every worker node
+        case "local" =>
+          "file:" + uri.getPath
         case _ =>
           path
       }
@@ -935,9 +942,8 @@ class SparkContext(
   * prevent accidental overriding of checkpoint files in the existing directory.
   */
  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val env = SparkEnv.get
     val path = new Path(dir)
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     if (!useExisting) {
       if (fs.exists(path)) {
         throw new Exception("Checkpoint directory '" + path + "' already exists.")
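
Taken together, these hunks route all Hadoop access through SparkHadoopUtil.get and teach addFile/addJar a local: URI scheme for files already present on every worker. A sketch of how a driver program might exercise the two user-visible changes; the paths, the Hadoop key, and the master URL are illustrative, not from the commit:

    // Illustrative driver-side usage only.
    // "spark.hadoop.*" system properties are copied into sc.hadoopConfiguration
    // with the prefix stripped, so this becomes fs.default.name there.
    System.setProperty("spark.hadoop.fs.default.name", "hdfs://namenode:8020")

    val sc = new org.apache.spark.SparkContext("local", "demo")

    // "local:" means the file already exists at this path on every worker node,
    // so nothing is served from the driver's HTTP file server.
    sc.addJar("local:/opt/libs/my-udfs.jar")
    sc.addFile("local:/opt/data/lookup.csv")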

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 13 deletions
@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider
 
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.serializer.{Serializer, SerializerManager}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.api.python.PythonWorkerFactory
 
+import com.google.common.collect.MapMaker
 
 /**
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
-  val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if(yarnMode) {
-      try {
-        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
-      } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
-      }
-    } else {
-      new SparkHadoopUtil
-    }
-  }
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
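
The hadoopJobMetadata map that replaces the per-env SparkHadoopUtil instance holds its values through soft references, so cached JobConfs and InputFormats can be reclaimed when the JVM runs low on memory. A standalone sketch of the same Guava construct; the key and value below are hypothetical:

    import com.google.common.collect.MapMaker

    // Soft-valued concurrent map: entries survive until the JVM needs the memory back.
    val cache: java.util.concurrent.ConcurrentMap[String, Any] =
      new MapMaker().softValues().makeMap[String, Any]()

    cache.put("jobConf:hdfs://some/path", new Object())           // hypothetical key/value
    val maybeConf = Option(cache.get("jobConf:hdfs://some/path"))  // None if the soft value was collected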

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 11 additions & 0 deletions
@@ -622,4 +622,15 @@ object JavaPairRDD {
     new JavaPairRDD[K, V](rdd)
 
   implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+
+
+  /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
+  def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
+    implicit val cmk: ClassManifest[K] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+    implicit val cmv: ClassManifest[V] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+    new JavaPairRDD[K, V](rdd.rdd)
+  }
+
 }
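
fromJavaRDD is a bridge for Java API users who end up with a JavaRDD of Scala tuples and want the pair-specific operations back. A minimal sketch of a call site; the context setup and data are assumptions, not part of the commit:

    import java.util.Arrays
    import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}

    // Assumed context; "local" master only for illustration.
    val jsc = new JavaSparkContext("local", "fromJavaRDD-demo")

    val tuples: JavaRDD[(String, Int)] = jsc.parallelize(Arrays.asList(("a", 1), ("b", 2)))
    // Pair operations (join, reduceByKey, ...) become available on the result.
    val pairs: JavaPairRDD[String, Int] = JavaPairRDD.fromJavaRDD(tuples)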

core/src/main/scala/org/apache/spark/api/java/function/Function3.java

Lines changed: 0 additions & 2 deletions
@@ -19,7 +19,6 @@
 
 import scala.reflect.ClassManifest;
 import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction3;
 
 import java.io.Serializable;
 
@@ -35,4 +34,3 @@ public ClassManifest<R> returnType() {
     return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
   }
 }
-

core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala

Lines changed: 2 additions & 1 deletion
@@ -24,7 +24,8 @@ import scala.runtime.AbstractFunction3
  * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
  * isn't marked to allow that).
  */
-private[spark] abstract class WrappedFunction3[T1, T2, T3, R] extends AbstractFunction3[T1, T2, T3, R] {
+private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
+  extends AbstractFunction3[T1, T2, T3, R] {
   @throws(classOf[Exception])
   def call(t1: T1, t2: T2, t3: T3): R
 

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 20 additions & 6 deletions
@@ -20,17 +20,13 @@ package org.apache.spark.deploy
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
-import com.google.common.collect.MapMaker
-
+import org.apache.spark.SparkException
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
 private[spark]
 class SparkHadoopUtil {
-  // A general, soft-reference map for metadata needed during HadoopRDD split computation
-  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
-  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   /**
   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
-
+}
+
+object SparkHadoopUtil {
+  private val hadoop = {
+    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if (yarnMode) {
+      try {
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+      } catch {
+        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      }
+    } else {
+      new SparkHadoopUtil
+    }
+  }
+
+  def get: SparkHadoopUtil = {
+    hadoop
+  }
 }
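
The YARN-or-not decision that previously lived in SparkEnv now happens once, in this companion object, and every call site asks for it through SparkHadoopUtil.get. A sketch of the internal call-site pattern used elsewhere in this commit (SparkHadoopUtil is private[spark], so this only compiles inside the Spark codebase; the comments on YARN behavior are an assumption about the YARN subclass, not shown in this diff):

    import org.apache.spark.deploy.SparkHadoopUtil

    val util = SparkHadoopUtil.get            // plain or YARN-aware, decided once from SPARK_YARN_MODE
    val hadoopConf = util.newConfiguration()  // fresh Hadoop Configuration
    if (util.isYarnMode()) {
      // On YARN, addCredentials is expected to attach the current user's Hadoop
      // credentials to a JobConf; the base class above makes it a no-op.
    }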

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

Lines changed: 4 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
   def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {
 
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 4 additions & 3 deletions
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
   * the local process.
   */
-  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
 
   def putCachedMetadata(key: String, value: Any) =
-    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
+    SparkEnv.get.hadoopJobMetadata.put(key, value)
 }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 6 additions & 7 deletions
@@ -420,15 +420,14 @@ class DAGScheduler(
     case ExecutorLost(execId) =>
       handleExecutorLost(execId)
 
-    case begin: BeginEvent =>
-      listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo))
+    case BeginEvent(task, taskInfo) =>
+      listenerBus.post(SparkListenerTaskStart(task, taskInfo))
 
-    case gettingResult: GettingResultEvent =>
-      listenerBus.post(SparkListenerTaskGettingResult(gettingResult.task, gettingResult.taskInfo))
+    case GettingResultEvent(task, taskInfo) =>
+      listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
 
-    case completion: CompletionEvent =>
-      listenerBus.post(SparkListenerTaskEnd(
-        completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))
+    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
+      listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
       handleTaskCompletion(completion)
 
     case TaskSetFailed(taskSet, reason) =>
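
These handlers now destructure the event case classes directly instead of binding by type and reaching into fields; the completion case keeps a binding to the whole event via @ because handleTaskCompletion still needs it. A self-contained illustration of the two styles using a hypothetical, simplified event type (not the real DAGScheduler classes):

    // Hypothetical stand-in for a scheduler event.
    case class Begin(task: String, info: Long)

    def handleOld(event: Any): Unit = event match {
      case begin: Begin =>            // bind by type, then read fields
        println(begin.task + " @ " + begin.info)
      case _ =>
    }

    def handleNew(event: Any): Unit = event match {
      case e @ Begin(task, info) =>   // destructure; keep `e` only if the whole event is still needed
        println(task + " @ " + info)
      case _ =>
    }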
