Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None

// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new ThreadLocal[Properties]
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}

def initLocalProperties() {
localProperties.set(new Properties())
Expand All @@ -273,6 +275,8 @@ class SparkContext(
}
}

def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure the line does not exceed 100 chars.


/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
Expand Down
38 changes: 37 additions & 1 deletion core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ThreadingSuiteState {
}

class ThreadingSuite extends FunSuite with LocalSparkContext {

test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
Expand Down Expand Up @@ -149,4 +149,40 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}

test("set local properties in different thread") {
sc = new SparkContext("local", "test")

val threads = (1 to 5).map{ i =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space before {

new Thread() {
override def run() {
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
}
}
}

threads.foreach(_.start())

assert(sc.getLocalProperty("test") === null)
}

test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent")

val threads = (1 to 5).map{ i =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a space before {

new Thread() {
override def run() {
assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
}
}
}

threads.foreach(_.start())
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need a barrier here to properly test this?

}
}