24 changes: 5 additions & 19 deletions docs/sql-data-sources-binaryFile.md
@@ -28,50 +28,36 @@ It produces a DataFrame with the following columns and possibly partition column
* `length`: LongType
* `content`: BinaryType

It supports the following read option:
<table class="table">
<tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
<tr>
<td><code>pathGlobFilter</code></td>
<td>none (accepts all)</td>
<td>
An optional glob pattern to only include files with paths matching the pattern.
The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
It does not change the behavior of partition discovery.
</td>
</tr>
</table>

To read whole binary files, you need to specify the data source `format` as `binaryFile`.
For example, the following code reads all PNG files from the input directory:
Contributor

Can we keep the pathGlobFilter option in the example? It is actually important for the use case. Just mention pathGlobFilter is a global option.

Member Author

Sure, I will revert this.
For example, the following code reads all the files from the input directory:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")
spark.read.format("binaryFile").load("/path/to/data")

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

spark.read().format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data");
spark.read().format("binaryFile").load("/path/to/data");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")
spark.read.format("binaryFile").load("/path/to/data")

{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}

read.df("/path/to/data", source = "binaryFile", pathGlobFilter = "*.png")
read.df("/path/to/data", source = "binaryFile")

{% endhighlight %}
</div>
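
As the review comment above notes, `pathGlobFilter` remains available as a general option for file-based sources, so the variant removed in this diff still works. A minimal sketch, mirroring the deleted Scala line above:

    // Sketch: read only PNG files as whole binary files; pathGlobFilter is a
    // general option for file-based sources and composes with binaryFile.
    val pngFiles = spark.read
      .format("binaryFile")
      .option("pathGlobFilter", "*.png")
      .load("/path/to/data")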
22 changes: 22 additions & 0 deletions docs/sql-data-sources-load-save-functions.md
@@ -102,6 +102,28 @@ To load a CSV file you can use:
</div>
</div>

To load files with paths matching a given glob pattern recursively while keeping the behavior of partition discovery,
you can use:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_path_glob_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example load_with_path_glob_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example load_with_path_glob_filter python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %}

</div>
</div>
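
The `include_example` snippets referenced here are added later in this diff; the Scala variant, for instance, boils down to:

    val partitionedUsersDF = spark.read.format("orc")
      .option("pathGlobFilter", "*.orc")
      .load("examples/src/main/resources/partitioned_users.orc")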

The extra options are also used during the write operation.
For example, you can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create a bloom filter and use dictionary encoding only for `favorite_color`.
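
The referenced ORC example is collapsed in this diff view; based on the example sources shown further down (Scala, Java, and R variants), it is roughly:

    usersDF.write.format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .save("users_with_options.orc")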
@@ -123,6 +123,11 @@ private static void runBasicDataSourceExample(SparkSession spark) {
.option("header", "true")
.load("examples/src/main/resources/people.csv");
// $example off:manual_load_options_csv$
// $example on:load_with_path_glob_filter$
Dataset<Row> partitionedUsersDF = spark.read().format("orc")
.option("pathGlobFilter", "*.orc")
.load("examples/src/main/resources/partitioned_users.orc");
// $example off:load_with_path_glob_filter$
// $example on:manual_save_options_orc$
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
5 changes: 5 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -57,6 +57,11 @@ def basic_datasource_example(spark):
format="csv", sep=":", inferSchema="true", header="true")
# $example off:manual_load_options_csv$

# $example on:load_with_path_glob_filter$
df = spark.read.load("examples/src/main/resources/partitioned_users.orc",
format="orc", pathGlobFilter="*.orc")
# $example off:load_with_path_glob_filter$

# $example on:manual_save_options_orc$
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
4 changes: 4 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -118,6 +118,10 @@ df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferS
namesAndAges <- select(df, "name", "age")
# $example off:manual_load_options_csv$

# $example on:load_with_path_glob_filter$
df <- read.df("examples/src/main/resources/partitioned_users.orc", "orc", pathGlobFilter = "*.orc")
# $example off:load_with_path_glob_filter$

# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
@@ -0,0 +1 @@
do not read this
Binary file not shown.
Binary file not shown.
@@ -56,6 +56,11 @@ object SQLDataSourceExample {
.option("header", "true")
.load("examples/src/main/resources/people.csv")
// $example off:manual_load_options_csv$
// $example on:load_with_path_glob_filter$
val partitionedUsersDF = spark.read.format("orc")
.option("pathGlobFilter", "*.orc")
.load("examples/src/main/resources/partitioned_users.orc")
// $example off:load_with_path_glob_filter$
// $example on:manual_save_options_orc$
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
@@ -57,6 +57,10 @@ private[avro] class AvroFileFormat extends FileFormat
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val conf = spark.sessionState.newHadoopConf()
if (options.contains("ignoreExtension")) {
Member

"ignoreExtension" -> AvroOptions.ignoreExtensionKey

logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
"general data source option pathGlobFilter for filtering file names.")
}
val parsedOptions = new AvroOptions(options, conf)

// User can specify an optional avro json schema.
@@ -59,14 +59,15 @@ class AvroOptions(
* If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension`
* is taken into account. If the former one is not set too, file extensions are ignored.
*/
@deprecated("Use the general data source option pathGlobFilter for filtering file names", "3.0")
Member

I am wondering who this deprecation warning is for. Spark users don't use ignoreExtension directly. I do think we should print a warning when we read and detect that AvroFileFormat.IgnoreFilesWithoutExtensionProperty and/or AvroOptions.ignoreExtensionKey are set; otherwise users will never see the deprecation.

Member

It is used only in one case, at schema inference. I would remove this annotation and print the warning in the initialization of AvroOptions. The deprecation warning is printed only during Spark compilation, which is useless for users.

Member

I think we should remove @deprecated.

It would be great if we could put that logic into AvroOptions, e.g.:

    parameters
      .get(AvroOptions.ignoreExtensionKey)
      .map { v =>
        logWarning(...)
        v.toBoolean
      }.getOrElse(!ignoreFilesWithoutExtension)

However, can you make it not show the logs too many times? If we put it there, it seems like it will show the same log multiple times.

Member

If you can find a better way, please go ahead and open a PR (there are also some nits I picked below).

val ignoreExtension: Boolean = {
val ignoreFilesWithoutExtensionByDefault = false
val ignoreFilesWithoutExtension = conf.getBoolean(
AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
ignoreFilesWithoutExtensionByDefault)

parameters
.get("ignoreExtension")
.get(AvroOptions.ignoreExtensionKey)
Member

ignoreExtensionKey -> IGNORE_EXTENSION_KEY to be consistent with other XXXOptions

.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
}
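
A minimal sketch of what the reviewers suggest above — parsing and warning inside AvroOptions so the deprecation is visible at runtime — might look like the following. This assumes AvroOptions mixes in Spark's Logging trait and leaves aside the concern about emitting the same warning repeatedly:

    val ignoreExtension: Boolean = {
      // Fall back to the legacy Hadoop property when the option is absent.
      val ignoreFilesWithoutExtension = conf.getBoolean(
        AvroFileFormat.IgnoreFilesWithoutExtensionProperty, false)
      parameters
        .get(AvroOptions.ignoreExtensionKey)
        .map { v =>
          // Warn at runtime, where users can actually see the deprecation.
          logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
            "general data source option pathGlobFilter for filtering file names.")
          v.toBoolean
        }
        .getOrElse(!ignoreFilesWithoutExtension)
    }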
@@ -93,4 +94,6 @@ object AvroOptions {
.getOrElse(new Configuration())
new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
}

val ignoreExtensionKey = "ignoreExtension"
}
12 changes: 12 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -120,6 +120,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
Member

Sorry, actually can we move this documentation to each implementation of CSV, Parquet, ORC, text? It will only work with such internal file-based sources.

the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -132,6 +135,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
for k in options:
self._jreader = self._jreader.option(k, to_str(options[k]))
@@ -624,6 +630,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to format
timestamps in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
self._jwrite = self._jwrite.option(key, to_str(value))
return self
@@ -636,6 +645,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to format
timestamps in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
for k in options:
self._jwrite = self._jwrite.option(k, to_str(options[k]))
12 changes: 12 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -341,6 +341,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

@@ -357,6 +360,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

@@ -769,6 +775,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to format
timestamps in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.
"""
@@ -783,6 +792,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to format
timestamps in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.
"""
@@ -98,6 +98,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -135,6 +138,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -151,6 +157,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -348,7 +348,8 @@ case class DataSource(
sparkSession.sessionState.newHadoopConf(),
sparkSession.sessionState.conf) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
caseInsensitiveOptions, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
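
Passing `caseInsensitiveOptions` through to `MetadataLogFileIndex` means the new option also applies when reading back the output directory of a file-sink streaming query. A hedged usage sketch (the path is hypothetical):

    // Sketch: the glob filter now also applies to directories tracked by a
    // streaming metadata log (_spark_metadata), not just plain directories.
    val sinkOutput = spark.read
      .option("pathGlobFilter", "*.parquet")
      .parquet("/path/to/streaming/sink/output")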
@@ -56,6 +56,12 @@ abstract class PartitioningAwareFileIndex(

protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

protected lazy val pathGlobFilter = parameters.get("pathGlobFilter").map(new GlobFilter(_))

protected def matchGlobPattern(file: FileStatus): Boolean = {
pathGlobFilter.forall(_.accept(file.getPath))
}

override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -69,7 +75,7 @@ abstract class PartitioningAwareFileIndex(
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(isNonEmptyFile)
existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f))

case None =>
// Directory does not exist, or has no children files
@@ -89,7 +95,7 @@ abstract class PartitioningAwareFileIndex(
override def sizeInBytes: Long = allFiles().map(_.getLen).sum

def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
val files = if (partitionSpec().partitionColumns.isEmpty) {
// For each of the root input paths, get the list of files inside them
rootPaths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
@@ -118,6 +124,7 @@ abstract class PartitioningAwareFileIndex(
} else {
leafFiles.values.toSeq
}
files.filter(matchGlobPattern)
}

protected def inferPartitioning(): PartitionSpec = {
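
For context, the `pathGlobFilter` handling added above relies on Hadoop's `org.apache.hadoop.fs.GlobFilter`, which (in the usual Hadoop implementation) matches the glob against the final name component of the path. An illustrative sketch, not part of this diff:

    import org.apache.hadoop.fs.{GlobFilter, Path}

    val filter = new GlobFilter("*.orc")
    filter.accept(new Path("/data/part-00000.orc"))  // true: name matches *.orc
    filter.accept(new Path("/data/_SUCCESS"))        // false: filtered out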