Closed
31 commits
6c9acde
Initial version of changes to Source trait
frreiss Aug 9, 2016
dae72ff
Changes to files that depend on the Source trait
frreiss Aug 11, 2016
f78b4d5
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 11, 2016
cf426fa
Added method to garbage-collect the metadata log.
frreiss Aug 15, 2016
c028432
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 15, 2016
f92a9a7
Fixing problems with building from Maven.
frreiss Aug 16, 2016
4cd181d
Various bug fixes.
frreiss Aug 19, 2016
fcc90bd
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 19, 2016
35cdae9
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 22, 2016
9096c56
Merge branch 'master' of https://github.com/apache/spark into fred-16…
frreiss Aug 27, 2016
ecaf732
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
5638281
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
43ffbf3
Removed a few blank lines.
frreiss Aug 29, 2016
f5c15f8
Additional whitespace cleanup.
frreiss Aug 29, 2016
a79c557
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 30, 2016
7c6a30d
Narrowing the size of the diff by moving some changes out to future w…
frreiss Aug 31, 2016
5e340c2
Fixed a regression introduced in an earlier merge.
frreiss Sep 8, 2016
128f7fe
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 28, 2016
6334a4b
Fixed compilation problem from merging someone else's PR.
frreiss Sep 28, 2016
09e4b8e
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 29, 2016
aaf0307
Removed a safety check that was invalidated by SPARK-17643 and fixed …
frreiss Sep 29, 2016
947b510
Updating regression tests after merge.
frreiss Sep 30, 2016
ed887ca
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 15, 2016
ec67429
Changes to address review comments.
frreiss Oct 15, 2016
e7ef7ab
Fix compilation problems.
frreiss Oct 15, 2016
7d98c6b
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 19, 2016
c726549
Changes to address review comments.
frreiss Oct 21, 2016
47eee52
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 21, 2016
46f6411
Commit before merge.
frreiss Oct 26, 2016
d9eaf5a
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 26, 2016
0a56e4a
Addressing review comments.
frreiss Oct 26, 2016
FileStreamSource.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Try

import org.apache.hadoop.fs.Path
@@ -31,7 +33,17 @@ import org.apache.spark.util.collection.OpenHashSet
/**
* A very simple source that reads text files from the given directory as they appear.
*
* TODO Clean up the metadata files periodically
* Special options that this source can take (via `options`):
* <ul>
* <li><b>maxFilesPerTrigger</b>: The maximum number of files to include in a
* microbatch. If more than this number of files appear at once, an arbitrary
* subset of the files of this size will be used for the next batch.
* Default: No limit.
* <li><b>deleteCommittedFiles</b>: If true, the source will delete old files and
* clean up associated internal metadata when Spark has completed processing
* the data in those files.
* Default: true.
* </ul>
*/
class FileStreamSource(
sparkSession: SparkSession,
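Aside (not part of the diff): a minimal sketch of how the two options documented in the scaladoc above might be supplied from user code, assuming the standard DataStreamReader API. The application name and input path are placeholders, and `deleteCommittedFiles` is the new option proposed by this PR.

import org.apache.spark.sql.SparkSession

object FileSourceOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("file-source-options").getOrCreate()

    // Cap each micro-batch at 100 files and (per this PR) delete input files
    // once the data they contain has been committed downstream.
    val lines = spark.readStream
      .format("text")
      .option("maxFilesPerTrigger", "100")
      .option("deleteCommittedFiles", "true") // option added by this PR
      .load("/data/incoming") // hypothetical input directory

    val query = lines.writeStream.format("console").start()
    query.awaitTermination()
  }
}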
@@ -44,42 +56,45 @@ class FileStreamSource(
private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

/**
* ID of the last batch committed and cleaned up, or -1 if no files have been removed.
*/
private var minBatchId = -1L

/**
* ID of the most recent batch that has been entered into the metadata log, or -1 if
* no files have been processed at all.
*/
private var maxBatchId = -1L

/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = getMaxFilesPerBatch()

/**
* Should files whose data has been committed be removed from the directory, along with
* their metadata entries?
*/
private val deleteCommittedFiles = getDeleteCommittedFiles()

private val seenFiles = new OpenHashSet[String]
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
}

/**
* Returns the maximum offset that can be retrieved from the source.
*
* `synchronized` on this method is for solving race conditions in tests. In the normal usage,
* there is no race here, so the cost of `synchronized` should be rare.
* Initialize the state of this source from the on-disk checkpoint, if present.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}
logTrace(s"Number of new files = ${newFiles.size})")
logTrace(s"Number of files selected for batch = ${batchFiles.size}")
logTrace(s"Number of seen files = ${seenFiles.size}")
if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
private def initialize() = {
maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

new LongOffset(maxBatchId)
minBatchId = maxBatchId
metadataLog.get(None, Some(maxBatchId)).foreach {
case (batchId, files) =>
files.foreach(seenFiles.add)
minBatchId = math.min(minBatchId, batchId)
}
}

initialize()

/**
* For test only. Run `func` with the internal lock to make sure when `func` is running,
* the current offset won't be changed and no new batch will be emitted.
@@ -93,9 +108,27 @@ class FileStreamSource(
new LongOffset(maxBatchId)
}

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

//////////////////////////////////
// BEGIN methods from Source trait

/**
* Returns the data that is between the offsets (`start`, `end`].
* Returns the highest offset that this source has <b>removed</b> from its internal buffer
* in response to a call to `commit`.
* Returns `None` if this source has not removed any data.
*/
override def getMinOffset: Option[Offset] = {
if (-1L == minBatchId) {
None
} else {
Some(new LongOffset(minBatchId))
}
}

override def getMaxOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)


override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
val endId = end.asInstanceOf[LongOffset].offset
@@ -104,6 +137,7 @@
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))

val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
val newDataSource =
DataSource(
@@ -115,6 +149,97 @@
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
if (end.isInstanceOf[LongOffset]) {
val lastCommittedBatch = end.asInstanceOf[LongOffset].offset

if (deleteCommittedFiles) {
// Build up a list of batches, then process them one by one
val firstCommittedBatch = math.max(minBatchId, 0)

for (batchId <- firstCommittedBatch to lastCommittedBatch) {
val files = metadataLog.get(batchId)
if (files.isDefined) {

// Files may actually be directories, so use the recursive version
// of the delete method.
// TODO: This delete() should be wrapped in more error handling.
// Examples of problems to catch: Spark does not have permission to delete
// the file; or the filesystem metadata is corrupt.
files.get.foreach(f => fs.delete(new Path(f), true))
}

}

// Clean up metadata for the files we've removed. It's important to do this
// AFTER deleting the files.
metadataLog.trim(minBatchId)
}

minBatchId = lastCommittedBatch

} else {
sys.error(s"FileStreamSource.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
}
}
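On the TODO above about error handling: a minimal sketch (not part of this diff) of how each per-file deletion might be hardened so that a permission problem or transient filesystem error is logged and skipped rather than aborting `commit()`. It uses only the Hadoop `FileSystem.delete(Path, recursive)` call already present in the loop; the helper name and logging callback are hypothetical.

import scala.util.{Failure, Success, Try}

import org.apache.hadoop.fs.{FileSystem, Path}

object CommittedFileCleanup {
  // Delete one committed file (or directory), reporting rather than throwing on failure.
  def deleteQuietly(fs: FileSystem, file: String, warn: String => Unit): Unit = {
    Try(fs.delete(new Path(file), true)) match {
      case Success(true)  => () // deleted
      case Success(false) => warn(s"Committed file $file was already gone or could not be deleted")
      case Failure(e)     => warn(s"Failed to delete committed file $file: ${e.getMessage}")
    }
  }
}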

override def stop(): Unit = {
logTrace(s"Stopping $this")
}

// END methods from Source trait
////////////////////////////////

// All methods that follow are internal to this class.

/**
* Scans the directory and creates batches from any new files found.
*
* Updates and returns the maximum offset that can be retrieved from the source.
* If `maxFilesPerBatch` is set, will consume the first (`maxFilesPerBatch`) files
* in alphabetical order.
*
* Note that this implementation relies on the fact that StreamExecution calls
* getMaxOffset() exactly once per clock tick. Otherwise the logic for
* `maxFilesPerBatch` will be incorrect.
*
* `synchronized` on this method is for solving race conditions in tests. In the normal usage,
* there is no race here, so the cost of `synchronized` should be rare.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
logTrace(s"Number of new files = ${newFiles.size})")

// To make the source's behavior less nondeterministic, we process files in
// alphabetical order.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) {
newFiles.sorted.take(maxFilesPerBatch.get)
} else {
newFiles
}

// Pretend we didn't see files that aren't in the batch.
batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}

if (batchFiles.size > 0) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

new LongOffset(maxBatchId)
}
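A small self-contained illustration (not part of the diff) of the selection rule described above: with a cap of 2, the alphabetically first two unseen files form the batch and the remaining file waits for a later trigger.

object BatchSelectionSketch extends App {
  val newFiles = Seq("hdfs:///in/part-c", "hdfs:///in/part-a", "hdfs:///in/part-b")
  val maxFilesPerBatch: Option[Int] = Some(2)

  // Same rule as fetchMaxOffset(): sort for determinism, then take at most the cap.
  val batchFiles =
    if (maxFilesPerBatch.nonEmpty) newFiles.sorted.take(maxFilesPerBatch.get) else newFiles

  println(batchFiles) // List(hdfs:///in/part-a, hdfs:///in/part-b)
}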

private def fetchAllFiles(): Seq[String] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
@@ -132,6 +257,16 @@
files
}

private def getDeleteCommittedFiles(): Boolean = {
val str = options.getOrElse("deleteCommittedFiles", "true")
try {
str.toBoolean
} catch {
case _ : Throwable => throw new IllegalArgumentException(
s"Invalid value '$str' for option 'deleteCommittedFiles', must be true or false")
}
}

private def getMaxFilesPerBatch(): Option[Int] = {
new CaseInsensitiveMap(options)
.get("maxFilesPerTrigger")
@@ -142,10 +277,4 @@
}
}
}

override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

override def stop() {}
}
HDFSMetadataLog.scala
@@ -227,6 +227,17 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
None
}

override def trim(batchId: Long) : Unit = {
val idsToDelete =
fileManager
.list(metadataPath, batchFilesFilter)
.map(f => pathToBatchId(f.getPath))
.filter( _ <= batchId )

// TODO: Log failed deletions (fileManager.delete currently fails silently)
idsToDelete.foreach(id => fileManager.delete(batchIdToPath(id)))
}
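For context (not part of the diff), a sketch of the intended effect of `trim` in terms of the HDFSMetadataLog API already used elsewhere in this PR. The checkpoint path is a placeholder, and the assertions reflect the semantics implemented above: batches with id less than or equal to the argument become unreadable, later batches survive.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

object TrimSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("trim-sketch").getOrCreate()
    val log = new HDFSMetadataLog[Seq[String]](spark, "/tmp/trim-sketch-log") // placeholder path

    log.add(0L, Seq("file-0"))
    log.add(1L, Seq("file-1"))
    log.add(2L, Seq("file-2"))

    log.trim(1L) // discard metadata for batches 0 and 1

    assert(log.get(0L).isEmpty && log.get(1L).isEmpty)
    assert(log.get(2L).contains(Seq("file-2")))
  }
}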

private def createFileManager(): FileManager = {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
MetadataLog.scala
@@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming
* - Allow the user to query the latest batch id.
* - Allow the user to query the metadata object of a specified batch id.
* - Allow the user to query metadata objects in a range of batch ids.
* - Inform the log that it is safe to garbage-collect metadata from a batch
*/
trait MetadataLog[T] {

Expand All @@ -48,4 +49,13 @@ trait MetadataLog[T] {
* Return the latest batch Id and its metadata if they exist.
*/
def getLatest(): Option[(Long, T)]


Contributor: Extra blank line

frreiss (Contributor Author): Fixed in my local copy.

/**
* Inform the log that it may discard metadata about all batches up to and including the
* batch with the indicated ID.
*
* @param batchId ID of the highest batch to discard
*/
def trim(batchId: Long)
}
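An in-memory analogue (illustrative only, deliberately not extending the real trait, whose full signature is elided above) showing the contract that `trim` adds: everything up to and including the given batch id is forgotten, while later batches stay queryable.

import scala.collection.mutable

// Toy metadata log keyed by batch id, used only to illustrate the trim contract.
class ToyMetadataLog[T] {
  private val batches = mutable.HashMap.empty[Long, T]

  def add(batchId: Long, metadata: T): Unit = batches(batchId) = metadata

  def get(batchId: Long): Option[T] = batches.get(batchId)

  def getLatest(): Option[(Long, T)] =
    if (batches.isEmpty) None else Some(batches.maxBy(_._1))

  // Discard metadata for all batches up to and including batchId.
  def trim(batchId: Long): Unit =
    batches.keys.filter(_ <= batchId).toList.foreach(batches.remove)
}

object ToyMetadataLogDemo extends App {
  val log = new ToyMetadataLog[Seq[String]]
  (0L to 3L).foreach(i => log.add(i, Seq(s"file-$i")))
  log.trim(2L)
  assert(log.get(2L).isEmpty && log.get(3L).nonEmpty)
  assert(log.getLatest().map(_._1).contains(3L))
}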
Source.scala
@@ -25,21 +25,42 @@ import org.apache.spark.sql.types.StructType
* monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
* will regularly query each [[Source]] to see if any more data is available.
*/
trait Source {

/** Returns the schema of the data from this source */
def schema: StructType

/** Returns the maximum available offset for this source. */
def getOffset: Option[Offset]
/**
* Returns the highest offset that this source has <b>removed</b> from its internal buffer
* in response to a call to `commit`.
* Returns `None` if this source has not removed any data.
*/
def getMinOffset: Option[Offset]

/**
* Returns the maximum available offset for this source.
* Returns `None` if this source has never received any data.
*/
def getMaxOffset: Option[Offset]

/**
* Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
* the batch should begin with the first available record. This method must always return the
* same data for a particular `start` and `end` pair.
* Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
* then the batch should begin with the first record. This method must always return the
* same data for a particular `start` and `end` pair, even after the Source has been restarted
* on a different node.
* <p>
Member: nit: no need for <p> in scaladoc

frreiss (Contributor Author): Fixed in my local copy.

* Higher layers will always call this method with a value of `start` greater than or equal
* to the last value passed to `commit` and a value of `end` less than or equal to the
* last value returned by `getMaxOffset`
Member: nit: please fix the comment about getMaxOffset as well

frreiss (Contributor Author): Fixed in my local copy.

*/
def getBatch(start: Option[Offset], end: Offset): DataFrame

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
def commit(end: Offset)

/** Stop this source and free any resources it has allocated. */
def stop(): Unit
}
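To make the revised contract concrete, a hedged sketch (not part of this diff) of one step of a driver-side micro-batch loop against this trait. The call order, getMaxOffset then getBatch then commit, is the assumption the FileStreamSource changes above rely on; `process` stands in for whatever the engine does with the batch.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Offset, Source}

object MicroBatchSketch {
  // Run at most one batch and return the new "last committed" offset.
  def runOneBatch(source: Source,
                  lastCommitted: Option[Offset],
                  process: DataFrame => Unit): Option[Offset] = {
    source.getMaxOffset match {
      case Some(end) if !lastCommitted.contains(end) =>
        // Batch covers (lastCommitted, end]; start is None for the very first batch.
        val batch = source.getBatch(lastCommitted, end)
        process(batch)
        // Tell the source the data is fully processed; a FileStreamSource may now
        // delete the committed input files and trim its metadata log.
        source.commit(end)
        Some(end)
      case _ =>
        lastCommitted // nothing new this tick
    }
  }
}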