diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md
index 6bcf48235bced..e6afc25de77d4 100644
--- a/docs/sql-data-sources-generic-options.md
+++ b/docs/sql-data-sources-generic-options.md
@@ -119,3 +119,38 @@ To load all files recursively, you can use:
{% include_example recursive_file_lookup r/RSparkSQLExample.R %}
+
+### Modification Time Path Filters
+`modifiedBefore` and `modifiedAfter` are options that can be
+applied together or separately to achieve finer-grained control
+over which files are loaded during a Spark batch query.
+
+* `modifiedBefore`: an optional timestamp to only include files with
+modification times occurring before the specified time. The provided timestamp
+must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+* `modifiedAfter`: an optional timestamp to only include files with
+modification times occurring after the specified time. The provided timestamp
+must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
+When a timezone option is not provided, the timestamps will be interpreted according
+to the Spark session timezone (`spark.sql.session.timeZone`).
+
+To load files with paths matching a given modified time range, you can use:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example load_with_modified_time_filter python/sql/datasource.py %}
+</div>
+
+<div data-lang="r"  markdown="1">
+{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %}
+</div>
+</div>
\ No newline at end of file
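For illustration, here is a minimal sketch (not part of the patch; it assumes a `SparkSession` named `spark`) that combines both options with an explicit `timeZone`, so the thresholds are interpreted in that zone rather than `spark.sql.session.timeZone`:

```scala
// Sketch only: keep files whose modification time falls inside a window,
// interpreting both timestamps as UTC instead of the session time zone.
val windowedDF = spark.read.format("parquet")
  .option("modifiedAfter", "2020-06-01T05:30:00")
  .option("modifiedBefore", "2020-07-01T05:30:00")
  .option("timeZone", "UTC")
  .load("examples/src/main/resources/dir1")
```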
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 2295225387a33..46e740d78bffb 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -147,6 +147,22 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
+ // $example on:load_with_modified_time_filter$
+    Dataset<Row> beforeFilterDF = spark.read().format("parquet")
+ // Only load files modified before 7/1/2020 at 05:30
+ .option("modifiedBefore", "2020-07-01T05:30:00")
+ // Only load files modified after 6/1/2020 at 05:30
+ .option("modifiedAfter", "2020-06-01T05:30:00")
+ // Interpret both times above relative to CST timezone
+ .option("timeZone", "CST")
+ .load("examples/src/main/resources/dir1");
+ beforeFilterDF.show();
+ // +-------------+
+    // |         file|
+ // +-------------+
+ // |file1.parquet|
+ // +-------------+
+ // $example off:load_with_modified_time_filter$
}
private static void runBasicDataSourceExample(SparkSession spark) {
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index eecd8c2d84788..8c146ba0c9455 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -67,6 +67,26 @@ def generic_file_source_options_example(spark):
# +-------------+
# $example off:load_with_path_glob_filter$
+ # $example on:load_with_modified_time_filter$
+    # Only load files modified before 07/01/2050 @ 08:30:00
+ df = spark.read.load("examples/src/main/resources/dir1",
+ format="parquet", modifiedBefore="2050-07-01T08:30:00")
+ df.show()
+ # +-------------+
+    # |         file|
+ # +-------------+
+ # |file1.parquet|
+ # +-------------+
+ # Only load files modified after 06/01/2050 @ 08:30:00
+ df = spark.read.load("examples/src/main/resources/dir1",
+ format="parquet", modifiedAfter="2050-06-01T08:30:00")
+ df.show()
+ # +-------------+
+    # |         file|
+ # +-------------+
+ # +-------------+
+ # $example off:load_with_modified_time_filter$
+
def basic_datasource_example(spark):
# $example on:generic_load_save_functions$
diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R
index 8685cfb5c05f2..86ad5334248bc 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -144,6 +144,14 @@ df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*
# 1 file1.parquet
# $example off:load_with_path_glob_filter$
+# $example on:load_with_modified_time_filter$
+beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore = "2020-07-01T05:30:00")
+# file
+# 1 file1.parquet
+afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
+# file
+# $example off:load_with_modified_time_filter$
+
# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 2c7abfcd335d1..feb7611e1c2bc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -81,6 +81,27 @@ object SQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
+ // $example on:load_with_modified_time_filter$
+ val beforeFilterDF = spark.read.format("parquet")
+ // Files modified before 07/01/2020 at 05:30 are allowed
+ .option("modifiedBefore", "2020-07-01T05:30:00")
+      .load("examples/src/main/resources/dir1")
+    beforeFilterDF.show()
+    // +-------------+
+    // |         file|
+ // +-------------+
+ // |file1.parquet|
+ // +-------------+
+ val afterFilterDF = spark.read.format("parquet")
+ // Files modified after 06/01/2020 at 05:30 are allowed
+ .option("modifiedAfter", "2020-06-01T05:30:00")
+      .load("examples/src/main/resources/dir1")
+    afterFilterDF.show()
+    // +-------------+
+    // |         file|
+ // +-------------+
+ // +-------------+
+ // $example off:load_with_modified_time_filter$
}
private def runBasicDataSourceExample(spark: SparkSession): Unit = {
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2ed991c87f506..ec0f9578ea904 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -125,6 +125,12 @@ def option(self, key, value):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
+ * ``modifiedBefore``: an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * ``modifiedAfter``: an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
"""
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -149,6 +155,12 @@ def options(self, **options):
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
+ * ``modifiedBefore``: an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * ``modifiedAfter``: an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
"""
for k in options:
self._jreader = self._jreader.option(k, to_str(options[k]))
@@ -203,7 +215,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
- recursiveFileLookup=None, allowNonNumericNumbers=None):
+ recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
+ allowNonNumericNumbers=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -309,6 +322,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of
`partition discovery `_. # noqa
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
recursiveFileLookup : str or bool, optional
recursively scan a directory for files. Using this option
disables
@@ -344,6 +363,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
allowNonNumericNumbers=allowNonNumericNumbers)
if isinstance(path, str):
path = [path]
@@ -410,6 +430,15 @@ def parquet(self, *paths, **options):
disables
`partition discovery `_. # noqa
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
@@ -418,13 +447,18 @@ def parquet(self, *paths, **options):
"""
mergeSchema = options.get('mergeSchema', None)
pathGlobFilter = options.get('pathGlobFilter', None)
+ modifiedBefore = options.get('modifiedBefore', None)
+ modifiedAfter = options.get('modifiedAfter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
- recursiveFileLookup=recursiveFileLookup)
+ recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
+ modifiedAfter=modifiedAfter)
+
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
- recursiveFileLookup=None):
+ recursiveFileLookup=None, modifiedBefore=None,
+ modifiedAfter=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -453,6 +487,15 @@ def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
recursively scan a directory for files. Using this option disables
`partition discovery `_. # noqa
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
@@ -464,7 +507,9 @@ def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
"""
self._set_opts(
wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
- recursiveFileLookup=recursiveFileLookup)
+ recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
+ modifiedAfter=modifiedAfter)
+
if isinstance(paths, str):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -476,7 +521,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
- pathGlobFilter=None, recursiveFileLookup=None):
+ pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -631,6 +676,15 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
recursively scan a directory for files. Using this option disables
`partition discovery `_. # noqa
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
@@ -652,7 +706,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
- pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+ pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
if isinstance(path, str):
path = [path]
if type(path) == list:
@@ -679,7 +734,8 @@ def func(iterator):
else:
raise TypeError("path can be only string, list or RDD")
- def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
+ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
+ modifiedBefore=None, modifiedAfter=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.
.. versionadded:: 1.5.0
@@ -701,6 +757,15 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
disables
`partition discovery `_. # noqa
+ modifiedBefore : an optional timestamp to only include files with
+ modification times occurring before the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ modifiedAfter : an optional timestamp to only include files with
+ modification times occurring after the specified time. The provided timestamp
+ must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+
Examples
--------
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
@@ -708,6 +773,7 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, str):
path = [path]
@@ -765,7 +831,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
"""
if properties is None:
properties = dict()
- jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
+ jprop = JavaClass("java.util.Properties",
+ self._spark._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
if column is not None:
@@ -777,7 +844,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
int(numPartitions), jprop))
if predicates is not None:
gateway = self._spark._sc._gateway
- jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
+ jpredicates = utils.toJArray(
+ gateway, gateway.jvm.java.lang.String, predicates)
return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
return self._df(self._jreader.jdbc(url, table, jprop))
@@ -790,6 +858,7 @@ class DataFrameWriter(OptionUtils):
.. versionadded:: 1.4
"""
+
def __init__(self, df):
self._df = df
self._spark = df.sql_ctx
@@ -931,18 +1000,21 @@ def bucketBy(self, numBuckets, col, *cols):
... .saveAsTable('bucketed_table'))
"""
if not isinstance(numBuckets, int):
- raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))
+ raise TypeError(
+ "numBuckets should be an int, got {0}.".format(type(numBuckets)))
if isinstance(col, (list, tuple)):
if cols:
- raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+ raise ValueError(
+ "col is a {0} but cols are not empty".format(type(col)))
col, cols = col[0], col[1:]
if not all(isinstance(c, str) for c in cols) or not(isinstance(col, str)):
raise TypeError("all names should be `str`")
- self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
+ self._jwrite = self._jwrite.bucketBy(
+ numBuckets, col, _to_seq(self._spark._sc, cols))
return self
def sortBy(self, col, *cols):
@@ -967,7 +1039,8 @@ def sortBy(self, col, *cols):
"""
if isinstance(col, (list, tuple)):
if cols:
- raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+ raise ValueError(
+ "col is a {0} but cols are not empty".format(type(col)))
col, cols = col[0], col[1:]
@@ -1349,7 +1422,8 @@ def jdbc(self, url, table, mode=None, properties=None):
"""
if properties is None:
properties = dict()
- jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
+ jprop = JavaClass("java.util.Properties",
+ self._spark._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
self.mode(mode)._jwrite.jdbc(url, table, jprop)
@@ -1515,7 +1589,8 @@ def _test():
globs['os'] = os
globs['sc'] = sc
globs['spark'] = spark
- globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
+ globs['df'] = spark.read.parquet(
+ 'python/test_support/sql/parquet_partitioned')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index bd986d0138256..48d667f5ea11d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -493,6 +493,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore`: an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter`: an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
* `allowNonNumericNumbers` (default `true`): allows JSON parser to recognize set of
@@ -750,6 +756,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore`: an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter`: an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
@@ -781,6 +793,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore`: an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter`: an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
@@ -814,6 +832,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore`: an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter`: an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
@@ -872,6 +896,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
* It does not change the behavior of partition discovery.
+ * `modifiedBefore`: an optional timestamp to only include files with
+ * modification times occurring before the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ * `modifiedAfter`: an optional timestamp to only include files with
+ * modification times occurring after the specified time. The provided timestamp
+ * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery
*
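Since `modifiedBefore` and `modifiedAfter` are ordinary read options, they can also be supplied in bulk through `options`; a small sketch (assuming a `SparkSession` named `spark` and a placeholder path):

```scala
// Sketch only: pass both modification-time filters as a plain options map.
val logs = spark.read
  .options(Map(
    "modifiedAfter" -> "2020-06-01T05:30:00",
    "modifiedBefore" -> "2020-07-01T05:30:00"))
  .format("csv")
  .load("/tmp/path/to/logs")  // placeholder path
```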
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 5341e22f5e670..2f85efdc94e89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -57,13 +57,10 @@ abstract class PartitioningAwareFileIndex(
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
+ protected val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
- protected lazy val pathGlobFilter: Option[GlobFilter] =
- caseInsensitiveMap.get("pathGlobFilter").map(new GlobFilter(_))
-
- protected def matchGlobPattern(file: FileStatus): Boolean = {
- pathGlobFilter.forall(_.accept(file.getPath))
- }
+ protected def matchPathPattern(file: FileStatus): Boolean =
+ pathFilters.forall(_.accept(file))
protected lazy val recursiveFileLookup: Boolean = {
caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean
@@ -86,7 +83,7 @@ abstract class PartitioningAwareFileIndex(
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
- existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f))
+ existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
case None =>
// Directory does not exist, or has no children files
@@ -135,7 +132,7 @@ abstract class PartitioningAwareFileIndex(
} else {
leafFiles.values.toSeq
}
- files.filter(matchGlobPattern)
+ files.filter(matchPathPattern)
}
protected def inferPartitioning(): PartitionSpec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
new file mode 100644
index 0000000000000..d6028653410cb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/pathFilters.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.hadoop.fs.{FileStatus, GlobFilter}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
+
+trait PathFilterStrategy extends Serializable {
+ def accept(fileStatus: FileStatus): Boolean
+}
+
+trait StrategyBuilder {
+ def strategy: String
+ def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy
+}
+
+class PathGlobFilter(filePattern: String) extends PathFilterStrategy {
+
+  private val globFilter = new GlobFilter(filePattern)
+
+ override def accept(fileStatus: FileStatus): Boolean =
+ globFilter.accept(fileStatus.getPath)
+}
+
+object PathGlobFilter extends StrategyBuilder {
+
+ override def strategy: String = "pathglobfilter"
+
+ override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = {
+ new PathGlobFilter(parameters(strategy))
+ }
+}
+
+/**
+ * Provide modifiedAfter and modifiedBefore options when
+ * filtering from a batch-based file data source.
+ *
+ * Example Usages
+ * Load all CSV files modified after date:
+ * {{{
+ * spark.read.format("csv").option("modifiedAfter","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified before date:
+ * {{{
+ * spark.read.format("csv").option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ *
+ * Load all CSV files modified between two dates:
+ * {{{
+ * spark.read.format("csv").option("modifiedAfter","2019-01-15T05:00:00")
+ * .option("modifiedBefore","2020-06-15T05:00:00").load()
+ * }}}
+ */
+abstract class ModifiedDateFilter extends PathFilterStrategy {
+
+ def timeZoneId: String
+
+ protected def localTime(micros: Long): Long =
+ DateTimeUtils.fromUTCTime(micros, timeZoneId)
+}
+
+object ModifiedDateFilter {
+
+ def getTimeZoneId(options: CaseInsensitiveMap[String]): String = {
+ options.getOrElse(
+ DateTimeUtils.TIMEZONE_OPTION.toLowerCase(Locale.ROOT),
+ SQLConf.get.sessionLocalTimeZone)
+ }
+
+ def toThreshold(timeString: String, timeZoneId: String, strategy: String): Long = {
+ val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId)
+ val ts = UTF8String.fromString(timeString)
+ DateTimeUtils.stringToTimestamp(ts, timeZone.toZoneId).getOrElse {
+ throw new AnalysisException(
+ s"The timestamp provided for the '$strategy' option is invalid. The expected format " +
+          s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp was: $timeString")
+ }
+ }
+}
+
+/**
+ * Filter used to determine whether file was modified before the provided timestamp.
+ */
+class ModifiedBeforeFilter(thresholdTime: Long, val timeZoneId: String)
+ extends ModifiedDateFilter {
+
+ override def accept(fileStatus: FileStatus): Boolean =
+ // We standardize on microseconds wherever possible
+ // getModificationTime returns in milliseconds
+ thresholdTime - localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) > 0
+}
+
+object ModifiedBeforeFilter extends StrategyBuilder {
+ import ModifiedDateFilter._
+
+ override val strategy: String = "modifiedbefore"
+
+ override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = {
+ val timeZoneId = getTimeZoneId(parameters)
+ val thresholdTime = toThreshold(parameters(strategy), timeZoneId, strategy)
+ new ModifiedBeforeFilter(thresholdTime, timeZoneId)
+ }
+}
+
+/**
+ * Filter used to determine whether file was modified after the provided timestamp.
+ */
+class ModifiedAfterFilter(thresholdTime: Long, val timeZoneId: String)
+ extends ModifiedDateFilter {
+
+ override def accept(fileStatus: FileStatus): Boolean =
+ // getModificationTime returns in milliseconds
+ // We standardize on microseconds wherever possible
+ localTime(DateTimeUtils.millisToMicros(fileStatus.getModificationTime)) - thresholdTime > 0
+}
+
+object ModifiedAfterFilter extends StrategyBuilder {
+ import ModifiedDateFilter._
+
+ override val strategy: String = "modifiedafter"
+
+ override def create(parameters: CaseInsensitiveMap[String]): PathFilterStrategy = {
+ val timeZoneId = getTimeZoneId(parameters)
+ val thresholdTime = toThreshold(parameters(strategy), timeZoneId, strategy)
+ new ModifiedAfterFilter(thresholdTime, timeZoneId)
+ }
+}
+
+object PathFilterFactory {
+
+ private val strategies =
+ Seq(PathGlobFilter, ModifiedBeforeFilter, ModifiedAfterFilter)
+
+ def create(parameters: CaseInsensitiveMap[String]): Seq[PathFilterStrategy] = {
+ strategies.flatMap { s =>
+ parameters.get(s.strategy).map { _ =>
+ s.create(parameters)
+ }
+ }
+ }
+}
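Roughly, `PathFilterFactory.create` instantiates one strategy per recognized option, and `PartitioningAwareFileIndex.matchPathPattern` keeps a file only if every strategy accepts it. A small sketch of exercising the composed filters directly (these are internal APIs, so the snippet assumes it lives in the same `org.apache.spark.sql.execution.datasources` package and that a `FileStatus` named `status` is available):

```scala
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Sketch only: build the filters a reader configured with these options would use.
val params = CaseInsensitiveMap(Map(
  "pathGlobFilter" -> "*.csv",
  "modifiedAfter" -> "2020-06-01T05:30:00"))
val filters = PathFilterFactory.create(params)  // a PathGlobFilter and a ModifiedAfterFilter

// Mirrors PartitioningAwareFileIndex.matchPathPattern: all filters must accept the file.
def keep(status: FileStatus): Boolean = filters.forall(_.accept(status))
```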
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index b27c1145181bd..876f62803dc7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -577,38 +577,6 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
- test("Option pathGlobFilter: filter files correctly") {
- withTempPath { path =>
- val dataDir = path.getCanonicalPath
- Seq("foo").toDS().write.text(dataDir)
- Seq("bar").toDS().write.mode("append").orc(dataDir)
- val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
- checkAnswer(df, Row("foo"))
-
- // Both glob pattern in option and path should be effective to filter files.
- val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
- checkAnswer(df2, Seq.empty)
-
- val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
- checkAnswer(df3, Row("foo"))
- }
- }
-
- test("Option pathGlobFilter: simple extension filtering should contains partition info") {
- withTempPath { path =>
- val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b")
- input.write.partitionBy("b").text(path.getCanonicalPath)
- Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1")
-
- // If we use glob pattern in the path, the partition column won't be shown in the result.
- val df = spark.read.text(path.getCanonicalPath + "/*/*.txt")
- checkAnswer(df, input.select("a"))
-
- val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath)
- checkAnswer(df2, input)
- }
- }
-
test("Option recursiveFileLookup: recursive loading correctly") {
val expectedFileList = mutable.ListBuffer[String]()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala
new file mode 100644
index 0000000000000..b965a78c9eec0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterStrategySuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterStrategySuite extends QueryTest with SharedSparkSession {
+
+ test("SPARK-31962: PathFilterStrategies - modifiedAfter option") {
+ val options =
+ CaseInsensitiveMap[String](Map("modifiedAfter" -> "2010-10-01T01:01:00"))
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.head.isInstanceOf[ModifiedAfterFilter])
+ assert(strategy.size == 1)
+ }
+
+ test("SPARK-31962: PathFilterStrategies - modifiedBefore option") {
+ val options =
+ CaseInsensitiveMap[String](Map("modifiedBefore" -> "2020-10-01T01:01:00"))
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.head.isInstanceOf[ModifiedBeforeFilter])
+ assert(strategy.size == 1)
+ }
+
+ test("SPARK-31962: PathFilterStrategies - pathGlobFilter option") {
+ val options = CaseInsensitiveMap[String](Map("pathGlobFilter" -> "*.txt"))
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.head.isInstanceOf[PathGlobFilter])
+ assert(strategy.size == 1)
+ }
+
+ test("SPARK-31962: PathFilterStrategies - no options") {
+ val options = CaseInsensitiveMap[String](Map.empty)
+ val strategy = PathFilterFactory.create(options)
+ assert(strategy.isEmpty)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
new file mode 100644
index 0000000000000..7b24b4ea75667
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PathFilterSuite.scala
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.time.{LocalDateTime, ZoneOffset}
+import java.time.format.DateTimeFormatter
+
+import scala.util.Random
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class PathFilterSuite extends QueryTest with SharedSparkSession {
+ import testImplicits._
+
+ def createSingleFile(dir: File): File = {
+ val file = new File(dir, "temp" + Random.nextInt(1000) + ".csv")
+ stringToFile(file, "text")
+ }
+
+ def setFileTime(time: LocalDateTime, file: File): Boolean = {
+ val sameTime = time.toEpochSecond(ZoneOffset.UTC)
+ file.setLastModified(sameTime * 1000)
+ }
+
+ def setPlusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+ val sameTime = time.plusDays(interval).toEpochSecond(ZoneOffset.UTC)
+ file.setLastModified(sameTime * 1000)
+ }
+
+ def setMinusFileTime(time: LocalDateTime, file: File, interval: Long): Boolean = {
+ val sameTime = time.minusDays(interval).toEpochSecond(ZoneOffset.UTC)
+ file.setLastModified(sameTime * 1000)
+ }
+
+ def formatTime(time: LocalDateTime): String = {
+ time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore specified" +
+ " and sharing same timestamp with file last modified time.") {
+ withTempDir { dir =>
+ val file = createSingleFile(dir)
+ val time = LocalDateTime.now(ZoneOffset.UTC)
+ setFileTime(time, file)
+ val formattedTime = formatTime(time)
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedBefore", formattedTime)
+ .option("timeZone", "UTC")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedAfter specified" +
+ " and sharing same timestamp with file last modified time.") {
+ withTempDir { dir =>
+ val file = createSingleFile(dir)
+ val time = LocalDateTime.now()
+ setFileTime(time, file)
+ val formattedTime = formatTime(time)
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", formattedTime)
+ .option("timeZone", "UTC")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+ " share same timestamp with file last modified time.") {
+ withTempDir { dir =>
+ val file = createSingleFile(dir)
+ val time = LocalDateTime.now()
+ setFileTime(time, file)
+ val formattedTime = formatTime(time)
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", formattedTime)
+ .option("modifiedBefore", formattedTime)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+ " share same timestamp with earlier file last modified time.") {
+ withTempDir { dir =>
+ val file = createSingleFile(dir)
+ val time = LocalDateTime.now()
+ setMinusFileTime(time, file, 3)
+
+ val formattedTime = formatTime(time)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", formattedTime)
+ .option("modifiedBefore", formattedTime)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore and modifiedAfter option" +
+ " share same timestamp with later file last modified time.") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val time = LocalDateTime.now()
+ val formattedTime = formatTime(time)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", formattedTime)
+ .option("modifiedBefore", formattedTime)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test("SPARK-31962: when modifiedAfter specified with a past date") {
+ withTempDir { dir =>
+ val file = createSingleFile(dir)
+ file.setLastModified(DateTimeUtils.currentTimestamp())
+ val df = spark.read
+ .option("modifiedAfter", "2019-05-10T01:11:00")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 1)
+ }
+ }
+
+ test("SPARK-31962: when modifiedBefore specified with a future date") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val afterTime = LocalDateTime.now().plusDays(25)
+ val formattedTime = formatTime(afterTime)
+ val df = spark.read
+ .option("modifiedBefore", formattedTime)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 1)
+ }
+ }
+
+ test("SPARK-31962: with modifiedBefore option provided using a past date") {
+ withTempDir { dir =>
+ val file = createSingleFile(dir)
+ file.setLastModified(DateTimeUtils.currentTimestamp())
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedBefore", "1984-05-01T01:00:00")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, one valid") {
+ withTempDir { dir =>
+ val file1 = createSingleFile(dir)
+ val file2 = createSingleFile(dir)
+ file1.setLastModified(DateTimeUtils.currentTimestamp())
+ file2.setLastModified(0)
+
+ val df = spark.read
+ .option("modifiedAfter", "2019-05-10T01:11:00")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 1)
+ }
+ }
+
+ test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, both valid") {
+ withTempDir { dir =>
+ val file1 = createSingleFile(dir)
+ val file2 = createSingleFile(dir)
+ file1.setLastModified(DateTimeUtils.currentTimestamp())
+ file2.setLastModified(DateTimeUtils.currentTimestamp())
+ val df = spark.read
+ .option("modifiedAfter", "2019-05-10T01:11:00")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 2)
+ }
+ }
+
+ test("SPARK-31962: when modifiedAfter specified with a past date, multiple files, none valid") {
+ withTempDir { dir =>
+ val file1 = createSingleFile(dir)
+ val file2 = createSingleFile(dir)
+ file1.setLastModified(0)
+ file2.setLastModified(0)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", "1984-05-01T01:00:00")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore specified with a future date, " +
+ "multiple files, both valid") {
+ withTempDir { dir =>
+ val file1 = createSingleFile(dir)
+ val file2 = createSingleFile(dir)
+ file1.setLastModified(0)
+ file2.setLastModified(0)
+
+ val time = LocalDateTime.now().plusDays(3)
+ val formattedTime = formatTime(time)
+
+ val df = spark.read
+ .option("modifiedBefore", formattedTime)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 2)
+ }
+ }
+
+  test(
+    "SPARK-31962: when modifiedBefore specified with a future date, " +
+      "multiple files, one valid") {
+ withTempDir { dir =>
+ val file1 = createSingleFile(dir)
+ val file2 = createSingleFile(dir)
+ file1.setLastModified(0)
+ val time = LocalDateTime.now()
+ setPlusFileTime(time, file2, 3)
+
+ val formattedTime = formatTime(time)
+ val df = spark.read
+ .option("modifiedBefore", formattedTime)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 1)
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore specified with a future date, " +
+ "multiple files, none valid") {
+ withTempDir { dir =>
+ val file1 = createSingleFile(dir)
+ val file2 = createSingleFile(dir)
+
+ val time = LocalDateTime
+ .now()
+ .minusDays(1)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+ file1.setLastModified(DateTimeUtils.currentTimestamp())
+ file2.setLastModified(DateTimeUtils.currentTimestamp())
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedBefore", time)
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedAfter specified with a past date and " +
+ "pathGlobalFilter returning results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val df = spark.read
+ .option("modifiedAfter", "1984-05-10T01:11:00")
+ .option("pathGlobFilter", "*.csv")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 1)
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedAfter specified with past date " +
+ "and pathGlobFilter filtering results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", "1984-05-01T01:00:00")
+ .option("pathGlobFilter", "*.txt")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedAfter specified with future date and " +
+ "pathGlobFilter returning results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+
+ val time = LocalDateTime
+ .now()
+ .plusDays(10)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", time)
+ .option("pathGlobFilter", "*.csv")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedAfter specified with future date and " +
+ "pathGlobFilter filtering results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+
+ val time = LocalDateTime
+ .now()
+ .plusDays(10)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", time)
+ .option("pathGlobFilter", "*.txt")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore and modifiedAfter are specified out of range and " +
+ "pathGlobFilter returning results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+
+ val time = LocalDateTime.now().plusDays(10)
+ val formattedTime = formatTime(time)
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", formattedTime)
+ .option("modifiedBefore", formattedTime)
+ .option("pathGlobFilter", "*.csv")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range and " +
+ "pathGlobFilter returning results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+
+ val beforeTime = LocalDateTime
+ .now()
+ .minusDays(25)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+ val afterTime = LocalDateTime
+ .now()
+ .plusDays(25)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+ val df = spark.read
+ .option("modifiedAfter", beforeTime)
+ .option("modifiedBefore", afterTime)
+ .option("pathGlobFilter", "*.csv")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ assert(df.count() == 1)
+ }
+ }
+
+ test(
+ "SPARK-31962: when modifiedBefore and modifiedAfter are specified in range and " +
+ "pathGlobFilter filtering results") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+
+ val beforeTime = LocalDateTime
+ .now()
+ .minusDays(25)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+ val afterTime = LocalDateTime
+ .now()
+ .plusDays(25)
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
+
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", beforeTime)
+ .option("modifiedBefore", afterTime)
+ .option("pathGlobFilter", "*.txt")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(msg.contains("Unable to infer schema for CSV"))
+ }
+ }
+
+ test("SPARK-31962: when modifiedAfter is specified with an invalid date") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", "2024-05+1 01:00:00")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ Seq("The timestamp provided", "modifiedafter", "2024-05+1 01:00:00").foreach {
+ expectedMsg =>
+ assert(msg.contains(expectedMsg))
+ }
+ }
+ }
+
+ test("SPARK-31962: modifiedBefore - empty option") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedBefore", "")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ assert(
+ msg.contains("The timestamp provided for")
+ && msg.contains("modifiedbefore"))
+ }
+ }
+
+ test("SPARK-31962: modifiedAfter - empty option") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val msg = intercept[AnalysisException] {
+ spark.read
+ .option("modifiedAfter", "")
+ .format("csv")
+ .load(dir.getCanonicalPath)
+ }.getMessage
+ Seq("The timestamp provided", "modifiedafter").foreach { expectedMsg =>
+ assert(msg.contains(expectedMsg))
+ }
+ }
+ }
+
+ test(
+ "SPARK-31962: modifiedAfter filter takes into account local timezone " +
+ "when specified as an option. After UTC.") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val timeZone = DateTimeUtils.getTimeZone("UTC")
+ val strategyTime =
+ ModifiedDateFilter.toThreshold(
+ LocalDateTime.now(timeZone.toZoneId).toString,
+ "HST",
+ "modifiedafter")
+
+ assert(
+ strategyTime - DateTimeUtils
+ .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) > 0)
+ }
+ }
+
+ test(
+ "SPARK-31962: modifiedAfter filter takes into account local timezone " +
+ "when specified as an option. Before UTC.") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+
+ val timeZone = DateTimeUtils.getTimeZone("UTC")
+ val strategyTime =
+ ModifiedDateFilter.toThreshold(
+ LocalDateTime.now(timeZone.toZoneId).toString,
+ "HST",
+ "modifiedafter")
+ assert(
+ DateTimeUtils
+ .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0)
+ }
+ }
+
+ test(
+ "SPARK-31962: modifiedBefore filter takes into account local timezone " +
+ "when specified as an option. After UTC.") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val timeZone = DateTimeUtils.getTimeZone("UTC")
+ val strategyTime =
+ ModifiedDateFilter.toThreshold(
+ LocalDateTime.now(timeZone.toZoneId).toString,
+ "CET",
+ "modifiedbefore")
+ assert(
+ DateTimeUtils
+ .getMicroseconds(DateTimeUtils.currentTimestamp(), ZoneOffset.UTC) - strategyTime < 0)
+ }
+ }
+
+ test(
+ "SPARK-31962: modifiedBefore filter takes into account local timezone " +
+ "when specified as an option. Before UTC.") {
+ withTempDir { dir =>
+ createSingleFile(dir)
+ val timeZone = DateTimeUtils.getTimeZone("UTC")
+ val strategyTime =
+ ModifiedDateFilter.toThreshold(
+ LocalDateTime.now(timeZone.toZoneId).toString,
+ "HST",
+ "modifiedbefore")
+ assert(
+ strategyTime - DateTimeUtils.fromUTCTime(DateTimeUtils.currentTimestamp(), "UTC") > 0)
+ }
+ }
+
+ test("Option pathGlobFilter: filter files correctly") {
+ withTempPath { path =>
+ val dataDir = path.getCanonicalPath
+ Seq("foo").toDS().write.text(dataDir)
+ Seq("bar").toDS().write.mode("append").orc(dataDir)
+ val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir)
+ checkAnswer(df, Row("foo"))
+
+ // Both glob pattern in option and path should be effective to filter files.
+ val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc")
+ checkAnswer(df2, Seq.empty)
+
+ val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt")
+ checkAnswer(df3, Row("foo"))
+ }
+ }
+
+ test("Option pathGlobFilter: simple extension filtering should contains partition info") {
+ withTempPath { path =>
+ val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b")
+ input.write.partitionBy("b").text(path.getCanonicalPath)
+ Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1")
+
+ // If we use glob pattern in the path, the partition column won't be shown in the result.
+ val df = spark.read.text(path.getCanonicalPath + "/*/*.txt")
+ checkAnswer(df, input.select("a"))
+
+ val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath)
+ checkAnswer(df2, input)
+ }
+ }
+}