68 changes: 68 additions & 0 deletions docs/sql-data-sources-binaryFile.md
@@ -0,0 +1,68 @@
---
layout: global
title: Binary File Data Source
displayTitle: Binary File Data Source
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

Since Spark 3.0, Spark supports a binary file data source,
which reads binary files and converts each file into a single record that contains the raw content
and metadata of the file.
It produces a DataFrame with the following columns and possibly partition columns:
* `path`: StringType
* `modificationTime`: TimestampType
* `length`: LongType
* `content`: BinaryType

To read whole binary files, you need to specify the data source `format` as `binaryFile`.
To load files with paths matching a given glob pattern while preserving the behavior of partition discovery,
you can use the general data source option `pathGlobFilter`.
For example, the following code reads all PNG files from the input directory:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

spark.read().format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data")

{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}

read.df("/path/to/data", source = "binaryFile", pathGlobFilter = "*.png")

{% endhighlight %}
</div>
</div>
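
As an illustration of the resulting DataFrame, here is a minimal Scala sketch (the directory path is hypothetical and column nullability is omitted); the loaded data can be inspected like any other DataFrame:

{% highlight scala %}
// Load all PNG files and inspect the columns produced by the binaryFile source.
val df = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/path/to/data")

// Columns: path, modificationTime, length, content (plus any partition columns).
df.printSchema()

// The raw bytes of each file are available in the `content` column.
df.select("path", "length").show(truncate = false)
{% endhighlight %}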

The binary file data source does not support writing a DataFrame back to the original files.
21 changes: 21 additions & 0 deletions docs/sql-data-sources-load-save-functions.md
@@ -82,6 +82,27 @@ To load a CSV file you can use:
</div>
</div>

To load files with paths matching a given glob pattern while preserving the behavior of partition discovery,
you can use:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_path_glob_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example load_with_path_glob_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example load_with_path_glob_filter python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %}
</div>
</div>
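
For reference, the Scala variant of the `load_with_path_glob_filter` example included above corresponds to the snippet added to `SQLDataSourceExample.scala` later in this diff:

{% highlight scala %}
val partitionedUsersDF = spark.read.format("orc")
  .option("pathGlobFilter", "*.orc")
  .load("examples/src/main/resources/partitioned_users.orc")
{% endhighlight %}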

The extra options are also used during the write operation.
For example, you can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create a bloom filter and use dictionary encoding only for `favorite_color`.
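
The referenced ORC example is pulled in via `include_example`; as a sketch, the Scala form looks roughly like the following, based on the option names visible in the example sources further down this diff (`usersDF` and the output path are illustrative):

{% highlight scala %}
usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")
{% endhighlight %}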
@@ -123,6 +123,11 @@ private static void runBasicDataSourceExample(SparkSession spark) {
.option("header", "true")
.load("examples/src/main/resources/people.csv");
// $example off:manual_load_options_csv$
// $example on:load_with_path_glob_filter$
Dataset<Row> partitionedUsersDF = spark.read().format("orc")
.option("pathGlobFilter", "*.orc")
.load("examples/src/main/resources/partitioned_users.orc");
// $example off:load_with_path_glob_filter$
// $example on:manual_save_options_orc$
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
5 changes: 5 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -57,6 +57,11 @@ def basic_datasource_example(spark):
format="csv", sep=":", inferSchema="true", header="true")
# $example off:manual_load_options_csv$

# $example on:load_with_path_glob_filter$
df = spark.read.load("examples/src/main/resources/partitioned_users.orc",
format="orc", pathGlobFilter="*.orc")
# $example off:load_with_path_glob_filter$

# $example on:manual_save_options_orc$
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
4 changes: 4 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -118,6 +118,10 @@ df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferS
namesAndAges <- select(df, "name", "age")
# $example off:manual_load_options_csv$

# $example on:load_with_path_glob_filter$
df <- read.df("examples/src/main/resources/partitioned_users.orc", "orc", pathGlobFilter = "*.orc")
# $example off:load_with_path_glob_filter$

# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
@@ -0,0 +1 @@
do not read this
Binary file not shown.
Binary file not shown.
@@ -56,6 +56,11 @@ object SQLDataSourceExample {
.option("header", "true")
.load("examples/src/main/resources/people.csv")
// $example off:manual_load_options_csv$
// $example on:load_with_path_glob_filter$
val partitionedUsersDF = spark.read.format("orc")
.option("pathGlobFilter", "*.orc")
.load("examples/src/main/resources/partitioned_users.orc")
// $example off:load_with_path_glob_filter$
// $example on:manual_save_options_orc$
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
@@ -57,6 +57,10 @@ private[avro] class AvroFileFormat extends FileFormat
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val conf = spark.sessionState.newHadoopConf()
if (options.contains("ignoreExtension")) {
logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
"general data source option pathGlobFilter for filtering file names.")
}
val parsedOptions = new AvroOptions(options, conf)

// User can specify an optional avro json schema.
@@ -59,14 +59,15 @@ class AvroOptions(
* If the option is not set, the Hadoop config `avro.mapred.ignore.inputs.without.extension`
* is taken into account. If that is not set either, file extensions are ignored.
*/
@deprecated("Use the general data source option pathGlobFilter for filtering file names", "3.0")
val ignoreExtension: Boolean = {
val ignoreFilesWithoutExtensionByDefault = false
val ignoreFilesWithoutExtension = conf.getBoolean(
AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
ignoreFilesWithoutExtensionByDefault)

parameters
.get("ignoreExtension")
.get(AvroOptions.ignoreExtensionKey)
.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
}
@@ -93,4 +94,6 @@ object AvroOptions {
.getOrElse(new Configuration())
new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
}

val ignoreExtensionKey = "ignoreExtension"
}
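
Given the deprecation of `ignoreExtension` above, user code would migrate to the general option roughly as follows (a hedged sketch; the paths are illustrative and the two calls are approximately, not exactly, equivalent):

```scala
// Deprecated: Avro-specific option that keys off the .avro file extension.
val oldDf = spark.read.format("avro")
  .option("ignoreExtension", "false")
  .load("/path/to/avro/data")

// Preferred: the general pathGlobFilter option, which also keeps partition discovery intact.
val newDf = spark.read.format("avro")
  .option("pathGlobFilter", "*.avro")
  .load("/path/to/avro/data")
```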
@@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {

// Single column of images named "image"
private lazy val imagePath = "../data/mllib/images/partitioned"
private lazy val recursiveImagePath = "../data/mllib/images"

test("image datasource count test") {
val df1 = spark.read.format("image").load(imagePath)
6 changes: 6 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -120,6 +120,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -132,6 +135,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
for k in options:
self._jreader = self._jreader.option(k, to_str(options[k]))
6 changes: 6 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -341,6 +341,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

@@ -357,6 +360,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

@@ -1673,6 +1673,14 @@ object SQLConf {
"a SparkConf entry.")
.booleanConf
.createWithDefault(true)

val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
.doc("The max length of a file that can be read by the binary file data source. " +
"Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
"The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
.internal()
.intConf
.createWithDefault(Int.MaxValue)
}

/**
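
As a hedged sketch of how this internal limit might be consulted (not code from this PR; the helper name, exception type, and message are assumptions), a file-based reader could fail fast like this:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.SparkException
import org.apache.spark.sql.internal.SQLConf

// Hypothetical guard: reject files longer than spark.sql.sources.binaryFile.maxLength.
def checkBinaryFileLength(status: FileStatus): Unit = {
  val maxLength = SQLConf.get.getConf(SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH)
  if (status.getLen > maxLength) {
    throw new SparkException(
      s"Can't read file ${status.getPath} because its length ${status.getLen} " +
      s"exceeds the configured limit of $maxLength bytes.")
  }
}
```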
@@ -8,3 +8,4 @@ org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
@@ -96,6 +96,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -133,6 +136,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -149,6 +155,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -436,7 +436,7 @@ case class FileSourceScanExec(
session.sessionState.newHadoopConf())

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.filter(_.getLen > 0).flatMap { file =>
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
@@ -350,7 +350,8 @@ case class DataSource(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
caseInsensitiveOptions, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -38,7 +38,7 @@ import org.apache.spark.util.NextIterator
* that need to be prepended to each row.
*
* @param partitionValues value of partition columns to be prepended to each row.
* @param filePath path of the file to read
* @param filePath URI of the file to read
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @param locations locality information (list of nodes that have the data).