Status: Closed

Commits (27 total; the diff below shows changes from 14 commits)
4dcea38 Move withSystemProperty to TestUtils class. (JoshRosen, Dec 19, 2014)
9e3e0dd Add ResetSystemProperties test fixture mixin; use it in SparkSubmitSu… (JoshRosen, Dec 19, 2014)
628f46c Use ResetSystemProperties in DistributedSuite (JoshRosen, Dec 19, 2014)
14a92e4 Use withSystemProperty in FileServerSuite (JoshRosen, Dec 19, 2014)
60a63a1 Use ResetSystemProperties in JobCancellationSuite (JoshRosen, Dec 19, 2014)
51aa870 Use withSystemProperty in ShuffleSuite (JoshRosen, Dec 19, 2014)
c83ded8 Use ResetSystemProperties in SparkConfSuite (JoshRosen, Dec 19, 2014)
0995c4b Use ResetSystemProperties in SparkContextSchedulerCreationSuite (JoshRosen, Dec 19, 2014)
5b3cb54 Use ResetSystemProperties in SparkListenerSuite (JoshRosen, Dec 19, 2014)
e9ded62 Use ResetSystemProperties in TaskSchedulerImplSuite (JoshRosen, Dec 19, 2014)
b0daff2 Use ResetSystemProperties in BlockManagerSuite (JoshRosen, Dec 19, 2014)
dd9492b Use ResetSystemProperties in AkkaUtilsSuite (JoshRosen, Dec 19, 2014)
1d1aa5a Use ResetSystemProperties in SizeEstimatorSuite (JoshRosen, Dec 19, 2014)
25bfce2 Use ResetSystemProperties in UtilsSuite (JoshRosen, Dec 19, 2014)
633a84a Remove use of system properties in FileServerSuite (JoshRosen, Dec 24, 2014)
8783ab0 Remove TestUtils.setSystemProperty, since it is subsumed by the Reset… (JoshRosen, Dec 24, 2014)
cfe9cce Remove use of system properties in SparkContextSuite (JoshRosen, Dec 24, 2014)
3f2f955 Remove System.setProperty calls in DistributedSuite (JoshRosen, Dec 24, 2014)
655587c Remove setProperty calls in JobCancellationSuite (JoshRosen, Dec 24, 2014)
bee20df Remove setProperty calls in SparkContextSchedulerCreationSuite (JoshRosen, Dec 24, 2014)
3fdb554 Remove setProperty call in TaskSchedulerImplSuite (JoshRosen, Dec 24, 2014)
7a3d224 Fix trait ordering (JoshRosen, Dec 24, 2014)
0eaf0b6 Remove setProperty call in TaskResultGetterSuite. (JoshRosen, Dec 24, 2014)
4742a5b Clarify ResetSystemProperties trait inheritance ordering. (JoshRosen, Dec 25, 2014)
4f4031d Add note on why SparkSubmitSuite needs ResetSystemProperties (JoshRosen, Dec 25, 2014)
3888fe3 Remove setProperty use in LocalJavaStreamingContext (JoshRosen, Dec 25, 2014)
0236d66 Replace setProperty uses in two example programs / tools (JoshRosen, Dec 25, 2014)
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -107,4 +107,19 @@ private[spark] object TestUtils {
assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
out
}

/** Allows system properties to be changed in tests */
def withSystemProperty[T](property: String, value: String)(block: => T): T = {
[Review comment from JoshRosen (contributor, author): This was some old code that was originally located in another file. I suppose I could remove it now, since it's probably subsumed by ResetSystemProperties in most cases.]
val originalValue = System.getProperty(property)
try {
System.setProperty(property, value)
block
} finally {
if (originalValue == null) {
System.clearProperty(property)
} else {
System.setProperty(property, originalValue)
}
}
}
}
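
A quick usage sketch of the helper above (a hypothetical demo object, not part of this PR; it sits in package org.apache.spark only because TestUtils is private[spark]). The point of the helper is that the prior state is restored even when the block throws, and a property that was unset beforehand ends up cleared rather than left behind:

package org.apache.spark

// Hypothetical demo: withSystemProperty restores the previous value on exit
// from the block, whether the block completes normally or throws.
object WithSystemPropertyDemo {
  def main(args: Array[String]): Unit = {
    System.clearProperty("spark.authenticate")
    try {
      TestUtils.withSystemProperty("spark.authenticate", "false") {
        assert(System.getProperty("spark.authenticate") == "false")
        throw new RuntimeException("simulated test failure")
      }
    } catch {
      case _: RuntimeException => // expected; cleanup ran in the finally block
    }
    // The property was unset before the block, so it is cleared again here.
    assert(System.getProperty("spark.authenticate") == null)
  }
}
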
13 changes: 2 additions & 11 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,28 +17,23 @@

package org.apache.spark

import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.{Millis, Span}

import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.ResetSystemProperties

class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}


class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
class DistributedSuite extends FunSuite with Matchers with ResetSystemProperties
with LocalSparkContext {

val clusterUrl = "local-cluster[2,1,512]"

after {
System.clearProperty("spark.reducer.maxMbInFlight")
System.clearProperty("spark.storage.memoryFraction")
}

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
// this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -92,7 +87,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
assert(groups.length === 16)
assert(groups.map(_._2).sum === 2000)
// Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
}

test("accumulators") {
@@ -210,15 +204,13 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}

test("compute without caching when no partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.0001")
sc = new SparkContext(clusterUrl, "test")
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("compute when only some partitions fit in memory") {
@@ -231,7 +223,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("passing environment variables to cluster") {
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -31,10 +31,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
@transient var tmpJarUrl: String = _

override def beforeEach() {
super.beforeEach()
override def withFixture(test: NoArgTest) = {
resetSparkContext()
System.setProperty("spark.authenticate", "false")
TestUtils.withSystemProperty("spark.authenticate", "false") {
super.withFixture(test)
}
}

override def beforeAll() {
@@ -52,7 +53,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarFile = new File(testTempDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")

val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
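
The FileServerSuite change above leans on ScalaTest's withFixture hook, which wraps each individual test invocation. A minimal standalone sketch of that pattern (hypothetical suite, assuming ScalaTest 2.x, where withFixture returns an Outcome):

import org.scalatest.FunSuite

// Hypothetical suite sketching the withFixture pattern: setup runs before each
// test body and teardown runs afterwards, even when the test fails or throws.
class WithFixtureDemo extends FunSuite {
  override def withFixture(test: NoArgTest) = {
    println(s"setup for ${test.name}")
    try super.withFixture(test) // executes the actual test body
    finally println(s"teardown for ${test.name}")
  }

  test("example") {
    assert(1 + 1 == 2)
  }
}
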
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -28,19 +28,19 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.Matchers

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.util.ResetSystemProperties

/**
* Test suite for cancelling running jobs. We run the cancellation tasks for single job action
* (e.g. count) as well as multi-job action (e.g. take). We test the local and cluster schedulers
* in both FIFO and fair scheduling modes.
*/
class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
with LocalSparkContext {
with ResetSystemProperties with LocalSparkContext {

override def afterEach() {
super.afterEach()
resetSparkContext()
System.clearProperty("spark.scheduler.mode")
}

test("local mode, FIFO scheduler") {
5 changes: 1 addition & 4 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -35,8 +35,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
conf.set("spark.test.noStageRetry", "true")

test("groupByKey without compression") {
try {
System.setProperty("spark.shuffle.compress", "false")
TestUtils.withSystemProperty("spark.shuffle.compress", "false") {
sc = new SparkContext("local", "test", conf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
@@ -45,8 +44,6 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
} finally {
System.setProperty("spark.shuffle.compress", "true")
}
}

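
One nicety of the ShuffleSuite change worth calling out: the removed finally block set spark.shuffle.compress back to a hard-coded "true" regardless of what it was before the test, while withSystemProperty restores the exact prior state, including "not set at all".
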
51 changes: 19 additions & 32 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,27 +19,20 @@ package org.apache.spark

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext {
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("loading from system properties") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
}

test("initializing without loading defaults") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
}

test("named set methods") {
@@ -117,23 +110,17 @@

test("nested property names") {
// This wasn't supported by some external conf parsing libraries
try {
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
} finally {
System.clearProperty("spark.test.a")
System.clearProperty("spark.test.a.b")
System.clearProperty("spark.test.a.b.c")
}
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}

test("register kryo classes through registerKryoClasses") {
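
The ResetSystemProperties trait these suites mix in is presumably defined in core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala, which is not among the files that loaded here. A minimal sketch of what such a fixture looks like (my reconstruction, not the PR's exact code): snapshot the JVM's system properties before each test and restore the snapshot afterwards, which is why the SparkConfSuite tests above can now call System.setProperty with no try/finally cleanup at all.

package org.apache.spark.util

import java.util.Properties

import org.scalatest.{BeforeAndAfterEach, Suite}

// Reconstruction sketch: save a copy of System.getProperties before each test
// and restore it afterwards, so tests may mutate properties freely.
private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
  private var oldProperties: Properties = null

  override def beforeEach(): Unit = {
    // Copy into a fresh Properties object; keeping a reference to the live
    // object would alias it and make the restore a no-op.
    val snapshot = new Properties()
    snapshot.putAll(System.getProperties)
    oldProperties = snapshot
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      System.setProperties(oldProperties)
      oldProperties = null
    }
  }
}
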
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -23,9 +23,11 @@ import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.util.ResetSystemProperties

class SparkContextSchedulerCreationSuite
extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging
with ResetSystemProperties {

def createTaskScheduler(master: String): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
@@ -102,19 +104,13 @@ class SparkContextSchedulerCreationSuite
}

test("local-default-parallelism") {
val defaultParallelism = System.getProperty("spark.default.parallelism")
System.setProperty("spark.default.parallelism", "16")
val sched = createTaskScheduler("local")

sched.backend match {
case s: LocalBackend => assert(s.defaultParallelism() === 16)
case _ => fail()
}

Option(defaultParallelism) match {
case Some(v) => System.setProperty("spark.default.parallelism", v)
case _ => System.clearProperty("spark.default.parallelism")
}
}

test("simr") {
21 changes: 3 additions & 18 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,24 +23,9 @@ import org.apache.hadoop.io.BytesWritable

class SparkContextSuite extends FunSuite with LocalSparkContext {

/** Allows system properties to be changed in tests */
private def withSystemProperty[T](property: String, value: String)(block: => T): T = {
val originalValue = System.getProperty(property)
try {
System.setProperty(property, value)
block
} finally {
if (originalValue == null) {
System.clearProperty(property)
} else {
System.setProperty(property, originalValue)
}
}
}

test("Only one SparkContext may be active at a time") {
// Regression test for SPARK-4180
withSystemProperty("spark.driver.allowMultipleContexts", "false") {
TestUtils.withSystemProperty("spark.driver.allowMultipleContexts", "false") {
val conf = new SparkConf().setAppName("test").setMaster("local")
sc = new SparkContext(conf)
// A SparkContext is already running, so we shouldn't be able to create a second one
@@ -52,7 +37,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
}

test("Can still construct a new SparkContext after failing to construct a previous one") {
withSystemProperty("spark.driver.allowMultipleContexts", "false") {
TestUtils.withSystemProperty("spark.driver.allowMultipleContexts", "false") {
// This is an invalid configuration (no app name or master URL)
intercept[SparkException] {
new SparkContext(new SparkConf())
@@ -63,7 +48,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
}

test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
withSystemProperty("spark.driver.allowMultipleContexts", "true") {
TestUtils.withSystemProperty("spark.driver.allowMultipleContexts", "true") {
var secondSparkContext: SparkContext = null
try {
val conf = new SparkConf().setAppName("test").setMaster("local")
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,11 +23,11 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark._
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.util.Utils
import org.apache.spark.util.{ResetSystemProperties, Utils}
import org.scalatest.FunSuite
import org.scalatest.Matchers

class SparkSubmitSuite extends FunSuite with Matchers {
class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
def beforeAll() {
System.setProperty("spark.testing", "true")
}
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,9 +26,10 @@ import org.scalatest.Matchers

import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ResetSystemProperties

class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
with BeforeAndAfter with BeforeAndAfterAll {
class SparkListenerSuite extends FunSuite with ResetSystemProperties with LocalSparkContext
with Matchers with BeforeAndAfter with BeforeAndAfterAll {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
@@ -37,10 +38,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
sc = new SparkContext("local", "SparkListenerSuite")
}

override def afterAll() {
System.clearProperty("spark.akka.frameSize")
}

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
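
Two of the commits above ("Fix trait ordering" and "Clarify ResetSystemProperties trait inheritance ordering") hinge on how Scala linearizes stackable ScalaTest fixtures, which is why ResetSystemProperties moved around in the with-lists. A self-contained sketch of the mechanics (hypothetical traits, not from this PR):

import org.scalatest.{BeforeAndAfterEach, FunSuite, Suite}

trait Fixture1 extends BeforeAndAfterEach { this: Suite =>
  override def beforeEach(): Unit = { println("Fixture1 before"); super.beforeEach() }
  override def afterEach(): Unit =
    try super.afterEach() finally println("Fixture1 after")
}

trait Fixture2 extends BeforeAndAfterEach { this: Suite =>
  override def beforeEach(): Unit = { println("Fixture2 before"); super.beforeEach() }
  override def afterEach(): Unit =
    try super.afterEach() finally println("Fixture2 after")
}

// Linearization is OrderingDemo -> Fixture2 -> Fixture1 -> FunSuite, so each test
// prints: Fixture2 before, Fixture1 before, test body, Fixture1 after, Fixture2 after.
// A property-restoring fixture therefore wants a position where its afterEach runs
// after any other fixture whose teardown might still touch system properties.
class OrderingDemo extends FunSuite with Fixture1 with Fixture2 {
  test("ordering") { println("test body") }
}
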
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler

import java.util.Properties

import org.apache.spark.util.ResetSystemProperties
import org.scalatest.FunSuite

import org.apache.spark._
@@ -109,7 +110,8 @@ class FakeTaskSetManager(
}
}

class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
class TaskSchedulerImplSuite extends FunSuite with ResetSystemProperties with LocalSparkContext
with Logging {

def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl,
taskSet: TaskSet): FakeTaskSetManager = {
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -44,18 +44,17 @@ import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
import org.apache.spark.util._


class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
with PrivateMethodTester {
with PrivateMethodTester with ResetSystemProperties {

private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
conf.set("spark.authenticate", "false")
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -85,7 +84,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
this.actorSystem = actorSystem

// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
System.setProperty("os.arch", "amd64")
conf.set("os.arch", "amd64")
conf.set("spark.test.useCompressedOops", "true")
conf.set("spark.driver.port", boundPort.toString)
@@ -113,14 +112,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
actorSystem.awaitTermination()
actorSystem = null
master = null

if (oldArch != null) {
conf.set("os.arch", oldArch)
} else {
System.clearProperty("os.arch")
}

System.clearProperty("spark.test.useCompressedOops")
}

test("StorageLevel object caching") {
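
One detail the removed BlockManagerSuite bookkeeping relied on: java.lang.System.setProperty returns the property's previous value (or null if it was unset), which is what oldArch = System.setProperty("os.arch", "amd64") captured. With ResetSystemProperties mixed in, that hand-rolled save/restore idiom, sketched below, is no longer needed:

// The manual pattern that ResetSystemProperties replaces. System.setProperty
// returns the previous value, or null if the property was not set.
val previous = System.setProperty("os.arch", "amd64")
try {
  // ... run code that consults os.arch, e.g. size estimation ...
} finally {
  if (previous == null) System.clearProperty("os.arch")
  else System.setProperty("os.arch", previous)
}
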