docs/structured-streaming-programming-guide.md (7 additions, 0 deletions)
@@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
"s3://a/dataset.txt"<br/>
"s3n://a/b/dataset.txt"<br/>
Contributor: I'd drop the s3n & s3 refs as they have gone from deprecated to deceased.

Contributor Author: This looks beyond the scope of this PR; we can address it in a separate PR. Could you raise another one?
"s3a://a/b/c/dataset.txt"<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.<br/>
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
(<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming

import java.util.Locale

import scala.util.Try

import org.apache.spark.internal.Logging
@@ -74,6 +76,30 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

/**
* The archive directory to which completed files are moved. This option is only effective
* when "cleanSource" is set to "archive".
*
* Note that the completed file will be moved to this archive directory respecting
* its own path.
*
* For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive
* directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
*/
val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")

/**
* Defines how to clean up completed files. Available options are "archive", "delete", "off".
*/
val cleanSource: CleanSourceMode.Value = {
val matchedMode = CleanSourceMode.fromString(parameters.get("cleanSource"))
if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
"option.")
}
matchedMode
}

private def withBooleanParameter(name: String, default: Boolean) = {
parameters.get(name).map { str =>
try {
@@ -86,3 +112,14 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}.getOrElse(default)
}
}

object CleanSourceMode extends Enumeration {
val ARCHIVE, DELETE, OFF = Value

def fromString(value: Option[String]): CleanSourceMode.Value = value.map { v =>
CleanSourceMode.values.find(_.toString == v.toUpperCase(Locale.ROOT))
.getOrElse(throw new IllegalArgumentException(
s"Invalid mode for clean source option $value." +
s" Must be one of ${CleanSourceMode.values.mkString(",")}"))
}.getOrElse(OFF)
}
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.streaming
import java.net.URI
import java.util.concurrent.TimeUnit._

import org.apache.hadoop.fs.{FileStatus, Path}
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -53,6 +55,18 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contain glob patterns
}

private val sourceCleaner: FileStreamSourceCleaner = {
Contributor: I wasn't going to ask, but since I have more comments... I think it's better if this were an Option and set to None if the cleaner is off.

Similarly, below, you'll resolve the sourceArchiveDir even if the cleaner is not set to archive, which is not necessary.

(I'm almost suggesting that there should be a separate implementation for delete and for archive to make this, and the code calling it, a bit cleaner.)

Contributor Author: Yeah, that's a good suggestion. Will address.

val (archiveFs, qualifiedArchivePath) = sourceOptions.sourceArchiveDir match {
case Some(dir) =>
val path = new Path(dir)
val fs = path.getFileSystem(hadoopConf)
(Some(fs), Some(fs.makeQualified(path)))

case None => (None, None)
}
new FileStreamSourceCleaner(fs, qualifiedBasePath, archiveFs, qualifiedArchivePath)
}
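A rough sketch (not part of this patch) of the shape the reviewer suggests above: keep the cleaner as an Option that stays None when cleaning is off, and resolve the archive directory only in archive mode. The val name is illustrative; the other identifiers come from the surrounding class.

```scala
// Illustrative only: Option-based cleaner, resolving the archive directory lazily.
private val sourceCleanerOpt: Option[FileStreamSourceCleaner] =
  sourceOptions.cleanSource match {
    case CleanSourceMode.ARCHIVE =>
      val archivePath = new Path(sourceOptions.sourceArchiveDir.get)
      val archiveFs = archivePath.getFileSystem(hadoopConf)
      Some(new FileStreamSourceCleaner(
        fs, qualifiedBasePath, Some(archiveFs), Some(archiveFs.makeQualified(archivePath))))
    case CleanSourceMode.DELETE =>
      Some(new FileStreamSourceCleaner(fs, qualifiedBasePath, None, None))
    case CleanSourceMode.OFF =>
      None
  }
```

Call sites would then use `sourceCleanerOpt.foreach(...)`, so the "off" mode does no extra work.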

private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
Map("basePath" -> path)
@@ -237,6 +251,7 @@ class FileStreamSource(
val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}

val endTime = System.nanoTime
val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
if (listingTimeMs > 2000) {
@@ -258,16 +273,33 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
// No-op for now; FileStreamSource currently garbage-collects files based on timestamp
// and the value of the maxFileAge parameter.
val logOffset = FileStreamSourceOffset(end).logOffset

if (sourceOptions.cleanSource != CleanSourceMode.OFF) {
val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
val validFileEntities = files.filter(_.batchId == logOffset)
logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
sourceOptions.cleanSource match {
case CleanSourceMode.ARCHIVE =>
validFileEntities.foreach(sourceCleaner.archive)

case CleanSourceMode.DELETE =>

validFileEntities.foreach(sourceCleaner.delete)

Contributor: Should the delete operation update the commit to remove the file as well, if possible? Asking because if a file gets deleted and then another file with the same name is uploaded, it won't get processed.

Contributor Author: Hmm... not sure how it works with the current file stream source. I'd rather keep compatibility with the current behavior, so I need to check it first.

Contributor: Without this change, a file which is uploaded with the same name does not get processed again. It may cause issues with exactly-once...

Contributor Author: I guess I don't touch the mechanism of how the file source deals with files, which means it doesn't change the behavior. (Please correct me if I'm missing something here.) That's why I need to check how the current file stream source deals with a new file with the same name. If the behavior of current master and this patch is the same and it doesn't work as we expect/want, addressing it is beyond this PR and better filed and addressed separately.

Another thing to note: we also need to consider that some options, like fileNameOnly, make things more complicated.

Contributor Author: Now I'm seeing #23782 to address this: if #23782 doesn't need to remove the completed files from commit, logically it should be the same here. So I'd rather rely on #23782 and have the discussion there.

Contributor: @HeartSaVioR reading back my comment from yesterday, it is a bit... well, not really clean :)

"It may cause issues with exactly-once..." references the same thing you've mentioned on #23782, and I agree with you.

Reprocessing files with the same name may cause several issues. What if a file is being archived and some producer is trying to append to it, etc. I've left a more detailed comment on the mentioned PR, where I've also described how streaming deals with incoming files.

case _ =>
}
} else {
// No-op for now; FileStreamSource currently garbage-collects files based on timestamp
// and the value of the maxFileAge parameter.
}

Contributor Author: Maybe we can also purge metadata here, to prevent it from growing infinitely, given that Spark guarantees it will never request an offset equal to or less than logOffset (I guess it would also apply to getBatch).
}

override def stop(): Unit = {}
}


object FileStreamSource {

/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

@@ -330,4 +362,77 @@ object FileStreamSource {

def size: Int = map.size()
}

private[sql] class FileStreamSourceCleaner(
fileSystem: FileSystem,
sourcePath: Path,
baseArchiveFileSystem: Option[FileSystem],
baseArchivePath: Option[Path]) extends Logging {
assertParameters()

private def assertParameters(): Unit = {
require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)

baseArchiveFileSystem.foreach { fs =>
require(fileSystem.getUri == fs.getUri, "Base archive path is located on a different " +
s"file system than the source files. source path: $sourcePath" +
s" / base archive path: ${baseArchivePath.get}")
}

baseArchivePath.foreach { path =>

Contributor: remove
/**
* FileStreamSource reads files for which one of the conditions below is met:
* 1) the file itself matches the source path
* 2) the parent directory matches the source path
*
* Checking against the glob pattern is costly, so set this requirement to eliminate the
* cases where the archive path could match the source path. For example, when a file is
* moved to the archive directory, the destination path retains the input file's path as a
* suffix, so the destination path can't match the source path if the archive directory's
* depth is greater than 2, as neither the file nor the parent directory of the destination
* path can match the source path.
*/
require(path.depth() > 2, "Base archive path must have a depth of at least 2 " +
"subdirectories. e.g. '/data/archive'")

Contributor: The check says "greater than 2" but the error says "at least 2". Which one is right?

Contributor Author: So the explanation talks about 2 "subdirectories", not a depth of 2; "/" denotes its own depth. I don't think "depth" is a term end users are familiar with - I'll remove the "a depth of" part.
}
}

def archive(entry: FileEntry): Unit = {
require(baseArchivePath.isDefined)

val curPath = new Path(new URI(entry.path))
val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))
Contributor Author: I'm revisiting the two issues and not sure there's a viable workaround. It looks like the issue pointed out was that ":" isn't a valid char for HDFS but might be a valid char for other filesystems, so the Path API doesn't restrict it, which leads to the problem. Even HDFS-14762 is closed as "Won't fix".

Would this only occur with Path(parent, child), and is Path(pathstr) safe? Would it work if we manually concat the two paths as a string and pass it to Path's constructor?

Member: It only occurs in Path(parent, child). I think we can manually concat the two paths.

Contributor Author: Thanks for the quick feedback! I'll reflect it.

Contributor: If it makes people any happier, know that "." isn't allowed as the last char in an ABFS filename. Every store has surprises.
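// Sketch of the workaround discussed above (illustrative, not part of this patch): build the
// destination by concatenating the two paths as strings instead of using Path(parent, child),
// so a child segment containing characters such as ':' is not re-parsed as a URI, e.g.:
// val newPath = new Path(s"${baseArchivePath.get.toString}/${curPath.toUri.getPath.stripPrefix("/")}")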


try {
logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
if (!fileSystem.exists(newPath.getParent)) {
fileSystem.mkdirs(newPath.getParent)
}

logDebug(s"Archiving completed file $curPath to $newPath")
if (!fileSystem.rename(curPath, newPath)) {
logWarning(s"Fail to move $curPath to $newPath / skip moving file.")
}
} catch {
case NonFatal(e) =>
logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
}
}

def delete(entry: FileEntry): Unit = {
val curPath = new Path(new URI(entry.path))
try {
logDebug(s"Removing completed file $curPath")

if (!fileSystem.delete(curPath, false)) {
logWarning(s"Failed to remove $curPath / skip removing file.")
}
} catch {
case NonFatal(e) =>
// Log the error but swallow the exception to avoid the query being stopped
logWarning(s"Failed to remove $curPath / skip removing file.", e)
}
}
}
}