@@ -68,7 +68,7 @@ private[spark] class PythonRDD(
6868 // Start a thread to feed the process input from our parent's iterator
6969 val writerThread = new WriterThread (env, worker, split, context)
7070
71- context.addOnCompleteCallback { () =>
71+ context.addTaskCompletionListener { context =>
7272 writerThread.shutdownOnTaskCompletion()
7373
7474 // Cleanup the worker socket. This will also cause the Python worker to exit.
@@ -137,7 +137,7 @@ private[spark] class PythonRDD(
137137 }
138138 } catch {
139139
140- case e : Exception if context.interrupted =>
140+ case e : Exception if context.isInterrupted =>
141141 logDebug(" Exception thrown after task interruption" , e)
142142 throw new TaskKilledException
143143
@@ -176,7 +176,7 @@ private[spark] class PythonRDD(
176176
/**
 * Stops the writer thread by interrupting it. Exceptions that surface as a
 * result of this cleanup are expected and are ignored by the thread.
 *
 * Must only be invoked once the owning task has been marked completed; this
 * precondition is enforced with an assertion.
 */
def shutdownOnTaskCompletion(): Unit = {
  // Guard: shutting down before task completion would be a programming error.
  assert(context.isCompleted)
  interrupt()
}
182182
@@ -209,7 +209,7 @@ private[spark] class PythonRDD(
209209 PythonRDD .writeIteratorToStream(parent.iterator(split, context), dataOut)
210210 dataOut.flush()
211211 } catch {
212- case e : Exception if context.completed || context.interrupted =>
212+ case e : Exception if context.isCompleted || context.isInterrupted =>
213213 logDebug(" Exception thrown after task completion (likely due to cleanup)" , e)
214214
215215 case e : Exception =>
@@ -235,10 +235,10 @@ private[spark] class PythonRDD(
235235 override def run () {
236236 // Kill the worker if it is interrupted, checking until task completion.
237237 // TODO: This has a race condition if interruption occurs, as completed may still become true.
238- while (! context.interrupted && ! context.completed ) {
238+ while (! context.isInterrupted && ! context.isCompleted ) {
239239 Thread .sleep(2000 )
240240 }
241- if (! context.completed ) {
241+ if (! context.isCompleted ) {
242242 try {
243243 logWarning(" Incomplete task interrupted: Attempting to kill Python Worker" )
244244 env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
315315 JavaRDD .fromRDD(sc.sc.parallelize(objs, parallelism))
316316 }
317317
/**
 * Reads a length-prefixed byte array from a local file and broadcasts it.
 *
 * The file is expected to start with a 4-byte integer (as read by
 * `DataInputStream.readInt`) giving the payload size, followed by exactly
 * that many bytes of payload.
 *
 * @param sc       context used to create the broadcast variable
 * @param filename path of the file holding the length-prefixed payload
 * @return a broadcast variable wrapping the payload bytes
 * @throws java.io.FileNotFoundException if `filename` cannot be opened
 * @throws java.io.EOFException if the file ends before the length prefix or
 *                              the full payload has been read
 */
def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
  val file = new DataInputStream(new FileInputStream(filename))
  val obj =
    try {
      val length = file.readInt()
      val bytes = new Array[Byte](length)
      file.readFully(bytes)
      bytes
    } finally {
      // Always release the file handle, even if the read fails part-way;
      // previously the stream was never closed.
      file.close()
    }
  sc.broadcast(obj)
}
325+
318326 def writeIteratorToStream [T ](iter : Iterator [T ], dataOut : DataOutputStream ) {
319327 // The right way to implement this would be to use TypeTags to get the full
320328 // type of T. Since I don't want to introduce breaking changes throughout the
@@ -372,8 +380,8 @@ private[spark] object PythonRDD extends Logging {
372380 batchSize : Int ) = {
373381 val keyClass = Option (keyClassMaybeNull).getOrElse(" org.apache.hadoop.io.Text" )
374382 val valueClass = Option (valueClassMaybeNull).getOrElse(" org.apache.hadoop.io.Text" )
375- val kc = Class .forName (keyClass).asInstanceOf [Class [K ]]
376- val vc = Class .forName (valueClass).asInstanceOf [Class [V ]]
383+ val kc = Utils .classForName (keyClass).asInstanceOf [Class [K ]]
384+ val vc = Utils .classForName (valueClass).asInstanceOf [Class [V ]]
377385 val rdd = sc.sc.sequenceFile[K , V ](path, kc, vc, minSplits)
378386 val confBroadcasted = sc.sc.broadcast(new SerializableWritable (sc.hadoopConfiguration()))
379387 val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +448,9 @@ private[spark] object PythonRDD extends Logging {
440448 keyClass : String ,
441449 valueClass : String ,
442450 conf : Configuration ) = {
443- val kc = Class .forName (keyClass).asInstanceOf [Class [K ]]
444- val vc = Class .forName (valueClass).asInstanceOf [Class [V ]]
445- val fc = Class .forName (inputFormatClass).asInstanceOf [Class [F ]]
451+ val kc = Utils .classForName (keyClass).asInstanceOf [Class [K ]]
452+ val vc = Utils .classForName (valueClass).asInstanceOf [Class [V ]]
453+ val fc = Utils .classForName (inputFormatClass).asInstanceOf [Class [F ]]
446454 if (path.isDefined) {
447455 sc.sc.newAPIHadoopFile[K , V , F ](path.get, fc, kc, vc, conf)
448456 } else {
@@ -509,9 +517,9 @@ private[spark] object PythonRDD extends Logging {
509517 keyClass : String ,
510518 valueClass : String ,
511519 conf : Configuration ) = {
512- val kc = Class .forName (keyClass).asInstanceOf [Class [K ]]
513- val vc = Class .forName (valueClass).asInstanceOf [Class [V ]]
514- val fc = Class .forName (inputFormatClass).asInstanceOf [Class [F ]]
520+ val kc = Utils .classForName (keyClass).asInstanceOf [Class [K ]]
521+ val vc = Utils .classForName (valueClass).asInstanceOf [Class [V ]]
522+ val fc = Utils .classForName (inputFormatClass).asInstanceOf [Class [F ]]
515523 if (path.isDefined) {
516524 sc.sc.hadoopFile(path.get, fc, kc, vc)
517525 } else {
@@ -558,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
558566 for {
559567 k <- Option (keyClass)
560568 v <- Option (valueClass)
561- } yield (Class .forName (k), Class .forName (v))
569+ } yield (Utils .classForName (k), Utils .classForName (v))
562570 }
563571
564572 private def getKeyValueConverters (keyConverterClass : String , valueConverterClass : String ,
@@ -621,10 +629,10 @@ private[spark] object PythonRDD extends Logging {
621629 val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
622630 inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
623631 val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
624- val codec = Option (compressionCodecClass).map(Class .forName (_).asInstanceOf [Class [C ]])
632+ val codec = Option (compressionCodecClass).map(Utils .classForName (_).asInstanceOf [Class [C ]])
625633 val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
626634 new JavaToWritableConverter )
627- val fc = Class .forName (outputFormatClass).asInstanceOf [Class [F ]]
635+ val fc = Utils .classForName (outputFormatClass).asInstanceOf [Class [F ]]
628636 converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf (mergedConf), codec= codec)
629637 }
630638
@@ -653,7 +661,7 @@ private[spark] object PythonRDD extends Logging {
653661 val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
654662 val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
655663 new JavaToWritableConverter )
656- val fc = Class .forName (outputFormatClass).asInstanceOf [Class [F ]]
664+ val fc = Utils .classForName (outputFormatClass).asInstanceOf [Class [F ]]
657665 converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
658666 }
659667
0 commit comments