@@ -24,9 +24,11 @@ import com.google.common.base.Charsets._
 import com.google.common.io.Files

 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.apache.spark.util.Utils
+import org.apache.spark.rdd.{RDD, HadoopRDD, NewHadoopRDD}

 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
@@ -272,4 +274,29 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Passing configuration into methods that create (New)HadoopRDD (SPARK-8398)") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      val conf = new Configuration(sc.hadoopConfiguration)
+      val k = "test"
+      val v = "dummyForTest"
+      conf.set(k, v)
+      def sourceRDD(rdd: RDD[_]): RDD[_] =
+        if (!rdd.dependencies.isEmpty) rdd.dependencies.head.rdd else rdd
+
+      assert(sourceRDD(sc.textFile("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]]
+        .getConf.get(k) == v)
+      assert(sourceRDD(sc.wholeTextFiles("nonexistent", 1, conf)).asInstanceOf[NewHadoopRDD[_, _]]
+        .getConf.get(k) == v)
+      assert(sourceRDD(sc.binaryFiles("nonexistent", 1, conf)).asInstanceOf[NewHadoopRDD[_, _]]
+        .getConf.get(k) == v)
+      assert(sourceRDD(sc.sequenceFile[Int, Int]("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]]
+        .getConf.get(k) == v)
+      assert(sourceRDD(sc.objectFile[Int]("nonexistent", 1, conf)).asInstanceOf [HadoopRDD[_, _]]
+        .getConf.get(k) == v)
+    } finally {
+      sc.stop()
+    }
+  }
 }
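
For context, a minimal usage sketch of the conf-accepting overloads this test exercises, assuming the signatures added in this change; the path and the configuration key/value below are placeholders, not part of the patch:

  // Copy the SparkContext's Hadoop configuration so per-read settings
  // do not leak into other jobs, then override what this read needs.
  val readConf = new Configuration(sc.hadoopConfiguration)
  readConf.set("some.hadoop.key", "some-value")  // placeholder key/value

  // The extra Configuration argument is threaded through to the underlying
  // HadoopRDD, which is what the test above verifies via getConf.
  val lines = sc.textFile("hdfs:///path/to/input", 4, readConf)

The test's sourceRDD helper walks one dependency level up because methods such as textFile typically return a mapped view over the underlying (New)HadoopRDD, and getConf is only available on that source RDD.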