
Commit dbb8143

ryne.yang (linehrr) authored and committed
[MINOR][SS][DOC] Added missing config maxFileAge in file streaming source
## What changes were proposed in this pull request?

Added the missing config for Structured Streaming when using the file source. From the code we have:

```scala
/**
 * Maximum age of a file that can be found in this directory, before it is ignored. For the
 * first batch all files will be considered valid. If `latestFirst` is set to `true` and
 * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
 * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
 *
 * The max age is specified with respect to the timestamp of the latest file, and not the
 * timestamp of the current system. That this means if the last file has timestamp 1000, and the
 * current system time is 2000, and max age is 200, the system will purge files older than
 * 800 (rather than 1800) from the internal state.
 *
 * Default to a week.
 */
val maxFileAgeMs: Long = Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))
```

which is not documented. The file processing order was also not mentioned, even though the code explicitly sorts the file list by file mtime:

```scala
private val fileSortOrder = if (sourceOptions.latestFirst) {
    logWarning(
      """'latestFirst' is true. New files will be processed first, which may affect the watermark
        |value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
    implicitly[Ordering[Long]].reverse
  } else {
    implicitly[Ordering[Long]]
  }

val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
  (status.getPath.toUri.toString, status.getModificationTime)
}
```

---

![Screen Shot 2019-05-07 at 5 55 01 PM](https://user-images.githubusercontent.com/1124115/57335683-5a8b0400-70f1-11e9-98c8-99f173872842.png)

---

![Screen Shot 2019-05-07 at 5 54 55 PM](https://user-images.githubusercontent.com/1124115/57335684-5a8b0400-70f1-11e9-996a-4bb1639e3d6b.png)

Closes #24548 from linehrr/master.
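The purge rule in the quoted comment (the age threshold is anchored to the newest file's mtime, not the current system clock) can be sketched in plain Scala. This is a hedged illustration with made-up names and timestamps, not the actual `FileStreamSource` code:

```scala
// Sketch of the maxFileAge purge rule: the cutoff is computed from the
// latest file's modification time, not from the system clock.
// Object and method names here are illustrative, not Spark internals.
object MaxFileAgeSketch {
  // Keep only files whose mtime is within maxAgeMs of the newest file.
  def validFiles(filesByMtime: Map[String, Long], maxAgeMs: Long): Map[String, Long] = {
    if (filesByMtime.isEmpty) filesByMtime
    else {
      val latest = filesByMtime.values.max
      filesByMtime.filter { case (_, mtime) => mtime >= latest - maxAgeMs }
    }
  }

  def main(args: Array[String]): Unit = {
    // Worked example from the source comment: latest file at t=1000 and
    // max age 200 => files older than 800 are purged, even if the system
    // clock currently reads 2000.
    val files = Map("a.txt" -> 700L, "b.txt" -> 850L, "c.txt" -> 1000L)
    val kept = validFiles(files, maxAgeMs = 200L)
    assert(kept == Map("b.txt" -> 850L, "c.txt" -> 1000L))
    println(kept.keys.toList.sorted)
  }
}
```

Note the consequence spelled out in the comment: with timestamps 700/850/1000 and `maxFileAge` of 200, the cutoff is 800, so only the two newest files survive regardless of wall-clock time.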
Lead-authored-by: ryne.yang <[email protected]>
Co-authored-by: Ryne Yang <[email protected]>
Co-authored-by: linehrr <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent bcd3b61 commit dbb8143

1 file changed: docs/structured-streaming-programming-guide.md (3 additions, 2 deletions)
```diff
@@ -510,8 +510,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
 #### Input Sources
 There are a few built-in sources.
 
-- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
-
+- **File source** - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If `latestFirst` is set, order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
 - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-0-10-integration.html) for more details.
 
 - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
@@ -541,6 +540,8 @@ Here are the details of all the sources in Spark.
 <br/>
 <code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
 <br/>
+<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
+<br/>
 "file:///dataset.txt"<br/>
 "s3://a/dataset.txt"<br/>
 "s3n://a/b/dataset.txt"<br/>
```
