16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -26,8 +26,10 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag

import org.apache.hadoop.mapred.FileSplit
import org.apache.spark.{Partition, SparkEnv, TaskContext}


/**
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
@@ -59,6 +61,20 @@ class PipedRDD[T: ClassTag](
val currentEnvVars = pb.environment()
envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }

// for compatibility with Hadoop which sets these env variables
// so the user code can access the input filename
if (split.isInstanceOf[HadoopPartition]) {
val hadoopSplit = split.asInstanceOf[HadoopPartition]

if (hadoopSplit.inputSplit.value.isInstanceOf[FileSplit]) {
val is: FileSplit = hadoopSplit.inputSplit.value.asInstanceOf[FileSplit]
// map.input.file is deprecated in favor of mapreduce.map.input.file but set both
Contributor:

tiny nit: mind saying map_input_file etc. here (using underscores and not periods)? Otherwise it's sort of confusing because it looks like these are Hadoop conf options.

// since it's not removed yet
currentEnvVars.put("map_input_file", is.getPath().toString())
currentEnvVars.put("mapreduce_map_input_file", is.getPath().toString())
}
}

val proc = pb.start()
val env = SparkEnv.get

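Not part of the diff, but a minimal usage sketch of what this change enables: piping a command over an RDD read from a file and picking up the split's path from the environment. The path is illustrative, and it assumes printenv is available and that the piped RDD's partitions are HadoopPartitions backed by FileSplits (as they are for sc.textFile).

import org.apache.spark.SparkContext

object PipeInputFileExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "pipe-input-file-example")
    // Illustrative path only; the file just needs to exist locally.
    val lines = sc.textFile("/tmp/pipe-input.txt")
    // pipe() starts the command once per partition; printenv prints the value
    // that PipedRDD exported for that partition's FileSplit.
    val inputFiles = lines.pipe("printenv map_input_file").collect()
    inputFiles.foreach(println)
    sc.stop()
  }
}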
41 changes: 41 additions & 0 deletions core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -19,6 +19,14 @@ package org.apache.spark

import org.scalatest.FunSuite


import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
import org.apache.hadoop.fs.Path

import scala.collection.Map
import org.apache.hadoop.io.{Text, LongWritable}

class PipedRDDSuite extends FunSuite with SharedSparkContext {

test("basic pipe") {
@@ -89,4 +97,37 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
}

test("test pipe exports map_input_file") {
testExportInputFile("map_input_file")
}

test("test pipe exports mapreduce_map_input_file") {
testExportInputFile("mapreduce_map_input_file")
}

def testExportInputFile(varName: String) {
val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
Contributor:

Just wondering - any reason to create this whole fake HadoopRDD rather than just making a temporary file and calling sc.textFile()? Seems okay either way, I just couldn't tell if there was something else gained by doing it that way.

Contributor Author:

No reason, I was just trying to avoid a temporary file.

classOf[Text], 2) {
override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
override val getDependencies = List[Dependency[_]]()
override def compute(theSplit: Partition, context: TaskContext) = {
new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
new Text("b"))))
}
}
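For comparison, a rough sketch of the alternative the reviewer raises above: writing a real temporary file and letting sc.textFile() build the HadoopRDD. The file name and contents are made up for illustration, and the assertion at the end of the test would then have to check against the temp file's path rather than "/some/path".

import java.io.{File, PrintWriter}

// Create a throwaway input file instead of hand-building a HadoopRDD.
val inputFile = File.createTempFile("piped-rdd-input", ".txt")
inputFile.deleteOnExit()
val writer = new PrintWriter(inputFile)
writer.println("b")
writer.close()

// textFile() yields an RDD whose partitions are HadoopPartitions over FileSplits,
// so PipedRDD would export the temp file's path to the piped command.
val nums = sc.textFile(inputFile.getAbsolutePath, 2)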
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
Contributor:

It might be good if this test first checked whether printenv is a valid command and just passed if it's not, since some distros or operating systems won't have it.

Contributor Author:

Is there a convention, or perhaps existing utilities, for doing this already? I didn't see one on a quick look, but I might have missed it.

Note that I copied the printenv command from a test above, so I'll change both.

Contributor:

Ah, I see - well, since you're just doing what's there already, I guess it's fine. But if you want to be a true hero, I think something like this would work:

import scala.util.Try
import scala.sys.process._
val hasPrintEnv = Try(Process("printenv")!!).isSuccess

Contributor Author:

I'm definitely fine with fixing it; I was just wondering if we had some generic utilities that already handled this for various platforms, or perhaps a class that handled calling the correct function depending on the OS.
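Taking up the suggestion above, a hedged sketch of how the guard might be wired into this suite; the commandAvailable helper is illustrative and not existing Spark code.

// At the top of the test file:
import scala.sys.process._
import scala.util.Try

// Inside PipedRDDSuite: returns true if the command runs successfully on this machine.
def commandAvailable(command: String): Boolean = Try(Process(command).!!).isSuccess

test("test pipe exports map_input_file") {
  if (commandAvailable("printenv")) {
    testExportInputFile("map_input_file")
  }
  // Otherwise the test is a no-op on platforms without printenv.
}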

val tContext = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false,
taskMetrics = null)
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")
}

def generateFakeHadoopPartition(): HadoopPartition = {
val split = new FileSplit(new Path("/some/path"), 0, 1,
Array[String]("loc1", "loc2", "loc3", "loc4", "loc5"))
new HadoopPartition(sc.newRddId(), 1, split)
}

}