Skip to content

Commit a186792

Browse files
committed
Merge pull request mesos#937 from jerryshao/localProperties-fix
Fix PR926 local properties issues in Spark Streaming like scenarios (cherry picked from commit a2ea069) Signed-off-by: Reynold Xin <[email protected]>
1 parent f3c60c9 commit a186792

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ class SparkContext(
256256
private[spark] var checkpointDir: Option[String] = None
257257

258258
// Thread Local variable that can be used by users to pass information down the stack
259-
private val localProperties = new ThreadLocal[Properties]
259+
private val localProperties = new InheritableThreadLocal[Properties] {
260+
override protected def childValue(parent: Properties): Properties = new Properties(parent)
261+
}
260262

261263
def initLocalProperties() {
262264
localProperties.set(new Properties())
@@ -273,6 +275,9 @@ class SparkContext(
273275
}
274276
}
275277

278+
def getLocalProperty(key: String): String =
279+
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
280+
276281
/** Set a human readable description of the current job. */
277282
def setJobDescription(value: String) {
278283
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
4040
}
4141

4242
class ThreadingSuite extends FunSuite with LocalSparkContext {
43-
43+
4444
test("accessing SparkContext form a different thread") {
4545
sc = new SparkContext("local", "test")
4646
val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
149149
fail("One or more threads didn't see runningThreads = 4")
150150
}
151151
}
152+
153+
test("set local properties in different thread") {
154+
sc = new SparkContext("local", "test")
155+
val sem = new Semaphore(0)
156+
157+
val threads = (1 to 5).map { i =>
158+
new Thread() {
159+
override def run() {
160+
sc.setLocalProperty("test", i.toString)
161+
assert(sc.getLocalProperty("test") === i.toString)
162+
sem.release()
163+
}
164+
}
165+
}
166+
167+
threads.foreach(_.start())
168+
169+
sem.acquire(5)
170+
assert(sc.getLocalProperty("test") === null)
171+
}
172+
173+
test("set and get local properties in parent-children thread") {
174+
sc = new SparkContext("local", "test")
175+
sc.setLocalProperty("test", "parent")
176+
val sem = new Semaphore(0)
177+
178+
val threads = (1 to 5).map { i =>
179+
new Thread() {
180+
override def run() {
181+
assert(sc.getLocalProperty("test") === "parent")
182+
sc.setLocalProperty("test", i.toString)
183+
assert(sc.getLocalProperty("test") === i.toString)
184+
sem.release()
185+
}
186+
}
187+
}
188+
189+
threads.foreach(_.start())
190+
191+
sem.acquire(5)
192+
assert(sc.getLocalProperty("test") === "parent")
193+
assert(sc.getLocalProperty("Foo") === null)
194+
}
152195
}

0 commit comments

Comments
 (0)