@@ -40,7 +40,7 @@ object ThreadingSuiteState {
4040}
4141
4242class ThreadingSuite extends FunSuite with LocalSparkContext {
43-
43+
4444 test(" accessing SparkContext form a different thread" ) {
4545 sc = new SparkContext (" local" , " test" )
4646 val nums = sc.parallelize(1 to 10 , 2 )
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
149149 fail(" One or more threads didn't see runningThreads = 4" )
150150 }
151151 }
152+
153+ test(" set local properties in different thread" ) {
154+ sc = new SparkContext (" local" , " test" )
155+ val sem = new Semaphore (0 )
156+
157+ val threads = (1 to 5 ).map { i =>
158+ new Thread () {
159+ override def run () {
160+ sc.setLocalProperty(" test" , i.toString)
161+ assert(sc.getLocalProperty(" test" ) === i.toString)
162+ sem.release()
163+ }
164+ }
165+ }
166+
167+ threads.foreach(_.start())
168+
169+ sem.acquire(5 )
170+ assert(sc.getLocalProperty(" test" ) === null )
171+ }
172+
173+ test(" set and get local properties in parent-children thread" ) {
174+ sc = new SparkContext (" local" , " test" )
175+ sc.setLocalProperty(" test" , " parent" )
176+ val sem = new Semaphore (0 )
177+
178+ val threads = (1 to 5 ).map { i =>
179+ new Thread () {
180+ override def run () {
181+ assert(sc.getLocalProperty(" test" ) === " parent" )
182+ sc.setLocalProperty(" test" , i.toString)
183+ assert(sc.getLocalProperty(" test" ) === i.toString)
184+ sem.release()
185+ }
186+ }
187+ }
188+
189+ threads.foreach(_.start())
190+
191+ sem.acquire(5 )
192+ assert(sc.getLocalProperty(" test" ) === " parent" )
193+ assert(sc.getLocalProperty(" Foo" ) === null )
194+ }
152195}
0 commit comments