@@ -31,6 +31,8 @@ import scala.util.control.NonFatal
3131import akka .pattern .ask
3232import akka .util .Timeout
3333
34+ import org .apache .commons .lang3 .SerializationUtils
35+
3436import org .apache .spark ._
3537import org .apache .spark .broadcast .Broadcast
3638import org .apache .spark .executor .TaskMetrics
@@ -493,7 +495,8 @@ class DAGScheduler(
493495 val func2 = func.asInstanceOf [(TaskContext , Iterator [_]) => _]
494496 val waiter = new JobWaiter (this , jobId, partitions.size, resultHandler)
495497 eventProcessLoop.post(JobSubmitted (
496- jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
498+ jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
499+ SerializationUtils .clone(properties)))
497500 waiter
498501 }
499502
@@ -534,7 +537,8 @@ class DAGScheduler(
534537 val partitions = (0 until rdd.partitions.size).toArray
535538 val jobId = nextJobId.getAndIncrement()
536539 eventProcessLoop.post(JobSubmitted (
537- jobId, rdd, func2, partitions, allowLocal = false , callSite, listener, properties))
540+ jobId, rdd, func2, partitions, allowLocal = false , callSite, listener,
541+ SerializationUtils .clone(properties)))
538542 listener.awaitResult() // Will throw an exception if the job fails
539543 }
540544
0 commit comments