Closed
Commits
30 commits
81c5f12
Merge pull request #1 from apache/master
kmader Jul 30, 2014
84035f1
adding binary and byte file support spark
kmader Jul 30, 2014
1cfa38a
added apache headers, added datainputstream directly as an output opt…
kmader Jul 31, 2014
1622935
changing the line lengths to make jenkins happy
kmader Jul 31, 2014
eacfaa6
Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab …
kmader Aug 13, 2014
f4841dc
un-optimizing imports, silly intellij
kmader Aug 13, 2014
edf5829
fixing line lengths, adding new lines
kmader Aug 13, 2014
9a313d5
making classes that needn't be public private, adding automatic file …
kmader Aug 14, 2014
df8e528
fixed line lengths and changed java test
kmader Aug 14, 2014
bc5c0b9
made minor stylistic adjustments from mateiz
kmader Aug 14, 2014
f032bc0
fixed bug in path name, renamed tests
kmader Aug 14, 2014
5deb79e
added new portabledatastream to code so that it can be serialized cor…
kmader Sep 7, 2014
12e7be1
removing imglib from maven (definitely not ready yet)
kmader Sep 7, 2014
441f79a
fixed a few small comments and dependency
kmader Sep 7, 2014
a01c9cf
Update RawFileInput.scala
kmader Sep 16, 2014
932a206
Update RawFileInput.scala
kmader Sep 16, 2014
238c83c
fixed several scala-style issues, changed structure of binaryFiles, r…
kmader Oct 1, 2014
19812a8
Fixed the serialization issue with PortableDataStream since neither C…
kmader Oct 1, 2014
4163e38
fixing line length and output from FSDataInputStream to DataInputStre…
kmader Oct 1, 2014
0588737
filename check in "binary file input as byte array" test now ignores …
kmader Oct 1, 2014
b348ce1
fixed order in check (prefix only appears on jenkins not when I run u…
kmader Oct 1, 2014
c27a8f1
jenkins crashed before running anything last time, so making minor ch…
kmader Oct 1, 2014
49174d9
removed unneeded classes added DeveloperApi note to portabledatastrea…
kmader Oct 2, 2014
a32fef7
removed unneeded classes added DeveloperApi note to portabledatastrea…
kmader Oct 2, 2014
92bda0d
added new tests, renamed files, fixed several of the javaapi function…
kmader Oct 20, 2014
8ac288b
fixed a single slightly over 100 character line
kmader Oct 21, 2014
7b9d181
removing developer API, cleaning up imports
kmader Oct 21, 2014
6379be4
reorganizing code
kmader Oct 21, 2014
359a096
making the final corrections suggested by @mateiz and renaming a few …
kmader Oct 30, 2014
3c49a30
fixing wholetextfileinput to it has the same setMinPartitions functio…
kmader Oct 30, 2014
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.input.{WholeTextFileInputFormat, ByteInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -510,6 +510,26 @@ class SparkContext(config: SparkConf) extends Logging {
minPartitions).setName(path)
}

/**
* Get an RDD for a Hadoop-readable dataset as a (file path, byte array) pair for each file (useful for binary data)
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Small files are preferred; large files are also allowed, but may cause poor performance.
*/
def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
Contributor comment:

I'd call this binaryFiles.

Also, please add it to JavaSparkContext, and ideally we'd have a way to add it to Python as well. That one will be trickier -- we probably need to read the file in chunks and pass them to Python. But I think it's important to design the API as part of this change.

val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[ByteInputFormat],
classOf[String],
classOf[Array[Byte]],
updateConf,
minPartitions).setName(path)
}

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
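As a rough usage sketch of the new method (based on the signature in the hunk above; the input path and partition count are placeholders, not from the PR):

```scala
// Sketch: load every file under a directory as (path, bytes) pairs.
val files = sc.byteFiles("hdfs:///data/binary-blobs", minPartitions = 8)

// Compute per-file sizes without pulling the raw bytes back to the driver.
val sizes = files.map { case (path, bytes) => (path, bytes.length) }
sizes.take(5).foreach { case (path, n) => println(s"$path: $n bytes") }
```
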
102 changes: 102 additions & 0 deletions core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala
@@ -0,0 +1,102 @@
package org.apache.spark.input

import scala.collection.JavaConversions._
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader


/**
* The new (Hadoop 2.0) InputFormat for whole binary files (not to be confused with the RecordReader itself)
*/
@serializable abstract class BinaryFileInputFormat[T]
extends CombineFileInputFormat[String,T] {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
/**
* Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
*/
def setMaxSplitSize(context: JobContext, minPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
}.sum

/** val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong **/
val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
super.setMaxSplitSize(maxSplitSize)
}

def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String,T]

}
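
To make the split-size computation above concrete, here is a small worked example (the numbers are illustrative only; the commented-out lines in setMaxSplitSize are the minPartitions-based variant):

```scala
// Illustrative numbers: 10 non-directory input files totaling 1 GB.
val totalLen = 1000000000L
val numFiles = 10

// As written, each CombineFileSplit targets roughly one file's worth of bytes,
// and the minPartitions argument is effectively unused.
val maxSplitSize = Math.ceil(totalLen * 1.0 / numFiles).toLong        // 100000000

// The commented-out variant would honor minPartitions instead:
val minPartitions = 4
val altSplitSize = Math.ceil(totalLen * 1.0 / minPartitions).toLong   // 250000000
```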

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole binary file
* out as a key-value pair, where the key is the file path and the value is the entire content of
* the file, parsed into T by parseByteArray (so the size information is preserved).
*/
@serializable abstract class BinaryRecordReader[T](
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends RecordReader[String, T] {

private val path = split.getPath(index)
private val fs = path.getFileSystem(context.getConfiguration)

// True means the current file has been processed, then skip it.
private var processed = false

private val key = path.toString
private var value: T = null.asInstanceOf[T]
override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
override def close() = {}

override def getProgress = if (processed) 1.0f else 0.0f

override def getCurrentKey = key

override def getCurrentValue = value

override def nextKeyValue = {
if (!processed) {
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)
value = parseByteArray(innerBuffer)
Closeables.close(fileIn, false)

processed = true
true
} else {
false
}
}
def parseByteArray(inArray: Array[Byte]): T
}

/**
* A demo class for extracting just the byte array itself
*/

@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] {
override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) = {
new CombineFileRecordReader[String, Array[Byte]](
split.asInstanceOf[CombineFileSplit], taContext, classOf[ByteRecordReader])
}
}

@serializable class ByteRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends BinaryRecordReader[Array[Byte]](split,context,index) {

def parseByteArray(inArray: Array[Byte]) = inArray
}
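
To illustrate the parseByteArray extension point, here is a hypothetical subclass (not part of this PR) that decodes each file as UTF-8 text instead of returning the raw bytes:

```scala
// Hypothetical example only: decode the whole file as a UTF-8 string.
@serializable class StringRecordReader(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends BinaryRecordReader[String](split, context, index) {

  def parseByteArray(inArray: Array[Byte]) = new String(inArray, "UTF-8")
}
```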
48 changes: 48 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -0,0 +1,48 @@
package org.apache.spark.rdd

/** Allows better control of the partitioning
*
Contributor comment:

This comment seems unrelated, why is it up here?

*/
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.InterruptibleIterator
import org.apache.spark.Logging
import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}

import org.apache.spark.input.BinaryFileInputFormat

private[spark] class BinaryFileRDD[T](
sc : SparkContext,
inputFormatClass: Class[_ <: BinaryFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
Contributor comment:

Format is slightly wrong here, the constructor args should only be indented with 4 spaces

extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMaxSplitSize(jobContext, minPartitions)
Contributor comment:

Is this actually a max split size? It seems you're passing an int that means something else, but I might've misunderstood

Contributor comment:

BTW this comment was important too, what is the meaning of this parameter?

Author reply:

Sorry this function was named incorrectly, it ultimately calls setMaxSplitSize after calculating the maximum size based on the number of partitions, I have now renamed it accordingly

val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
result
}
}
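
Following up on the split-size discussion above, here is a minimal sketch of the renamed helper in BinaryFileInputFormat, assuming the rename described in the author's reply (the setMinPartitions name follows the later commit messages; the exact body is an assumption):

```scala
// Sketch: derive a maximum split size from a requested minimum number of partitions.
def setMinPartitions(context: JobContext, minPartitions: Int) {
  val files = listStatus(context)
  val totalLen = files.map { file => if (file.isDir) 0L else file.getLen }.sum
  val maxSplitSize = Math.ceil(totalLen * 1.0 / math.max(minPartitions, 1)).toLong
  super.setMaxSplitSize(maxSplitSize)
}
```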