@@ -61,7 +61,31 @@ abstract class PartitioningAwareFileCatalog(
}
}

override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
override def allFiles(): Seq[FileStatus] = {
  if (partitionSpec().partitionColumns.isEmpty) {
    // For each of the input paths, get the list of files inside them
    paths.flatMap { path =>
      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
      val fs = path.getFileSystem(hadoopConf)
      val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

      // There are three cases possible with each path
      // 1. The path is a directory and has children files in it. Then it must be present in
      //    leafDirToChildrenFiles as those children files will have been found as leaf files.
      //    Find its children files from leafDirToChildrenFiles and include them.
      // 2. The path is a file, then it will be present in leafFiles. Include this path.
      // 3. The path is a directory, but has no children files. Do not include this path.

      leafDirToChildrenFiles.get(qualifiedPath)
        .orElse {
          leafFiles.get(path).map(Array(_))
Contributor: Should we use qualifiedPath instead of path?

Contributor: leafFiles contains all qualified paths, right?

Contributor (author): Oh, right. I forgot to address that comment. Sorry!

Contributor: How about we add a test to make sure all paths in leafFiles contain the scheme?

Contributor (author): Added a new FileCatalogSuite.

Contributor: Seems we still need to update this.

Contributor: How about we add a test that will not pass if we use path here?

(A sketch of such a check is included after this hunk.)

        }
        .getOrElse(Array.empty)
    }
  } else {
    leafFiles.values.toSeq
  }
}

protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
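The review thread above asks for a test that every path tracked by the catalog is fully qualified. The change adds a FileCatalogSuite for this (not shown in this diff); below is only a minimal sketch of that kind of check, where the helper name and the way leafFiles is exposed to the test are assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: verify that each path carries a scheme and is already in its
// fully qualified form, which is what leafFiles and leafDirToChildrenFiles keys should be.
def assertAllQualified(paths: Seq[Path], hadoopConf: Configuration): Unit = {
  paths.foreach { p =>
    val fs = p.getFileSystem(hadoopConf)
    val qualified = p.makeQualified(fs.getUri, fs.getWorkingDirectory)
    assert(p.toUri.getScheme != null, s"$p has no scheme (e.g. file or hdfs)")
    assert(p == qualified, s"$p is not fully qualified (expected $qualified)")
  }
}

// Example usage, assuming the suite can reach the catalog's leaf files:
// assertAllQualified(catalog.leafFiles.keys.toSeq, sparkContext.hadoopConfiguration)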
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.io.File
import java.util.UUID

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
    AddParquetFileData(seq.toDS().toDF(), src, tmp)
  }

  /** Write parquet files in a temp dir, and move the individual files to the 'src' dir */
  def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
    val file = Utils.tempFileWith(new File(tmp, "parquet"))
    df.write.parquet(file.getCanonicalPath)
    file.renameTo(new File(src, file.getName))
    val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
    df.write.parquet(tmpDir.getCanonicalPath)
    tmpDir.listFiles().foreach { f =>
      f.renameTo(new File(src, s"${f.getName}"))
    }
  }
}

@@ -210,8 +214,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {

test("FileStreamSource schema: parquet, existing files, no schema") {
withTempDir { src =>
Seq("a", "b", "c").toDS().as("userColumn").toDF()
.write.parquet(new File(src, "1").getCanonicalPath)
Seq("a", "b", "c").toDS().as("userColumn").toDF().write
.mode(org.apache.spark.sql.SaveMode.Overwrite)
.parquet(src.getCanonicalPath)
val schema = createFileStreamSourceAndGetSchema(
format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
assert(schema === new StructType().add("value", StringType))
@@ -17,6 +17,8 @@

package org.apache.spark.sql.sources

import java.io.File

import scala.util.Random

import org.apache.hadoop.fs.Path
@@ -486,7 +488,151 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
}

test("Hadoop style globbing") {
test("load() - with directory of unpartitioned data in nested subdirs") {
Contributor: Should we put it in the partition discovery suite? If we put it here, we will run it with every data source, right?

Contributor (author): It probably should run with all data sources, to make sure no data source violates this rule. This is not really partition discovery; it is more about what to do when there is no partition to discover.

  withTempPath { dir =>
    val subdir = new File(dir, "subdir")

    val dataInDir = Seq(1, 2, 3).toDF("value")
    val dataInSubdir = Seq(4, 5, 6).toDF("value")

    /*
      Directory structure to be generated

      dir
        |
        |___ [ files of dataInDir ]
        |
        |___ subdir
              |
              |___ [ files of dataInSubdir ]
     */

    // Generate dataInSubdir, but not dataInDir
    dataInSubdir.write
      .format(dataSourceName)
      .mode(SaveMode.Overwrite)
      .save(subdir.getCanonicalPath)

    // Inferring the schema should throw an error, as it should not find any file to infer from
    val e = intercept[Exception] {
      sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath)
    }

    e match {
      case _: AnalysisException =>
        assert(e.getMessage.contains("infer"))
yhuai (Contributor, May 5, 2016): Where will this error be thrown (the place where we complain that there is no file)?
      case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
        // ignore, the source format requires schema to be provided by user
Contributor: When will we reach here?

Contributor (author): SimpleTextSource, which is used for testing purposes only, does not support schema inference; it expects the option dataSchema to be present. Without this case, SimpleTextHadoopFsRelationSuite fails this test with:

sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: Expected exception org.apache.spark.sql.AnalysisException to be thrown, but java.util.NoSuchElementException was thrown.
    at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:496)
    at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
    at org.scalatest.Assertions$class.intercept(Assertions.scala:1004)
    at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
    at org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$23$$anonfun$apply$mcV$sp$36.apply(HadoopFsRelationTest.scala:519)
    at org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$23$$anonfun$apply$mcV$sp$36.apply(HadoopFsRelationTest.scala:492)
    at org.apache.spark.sql.test.SQLTestUtils$class.withTempPath(SQLTestUtils.scala:114)
    at org.apache.spark.sql.sources.HadoopFsRelationTest.withTempPath(HadoopFsRelationTest.scala:39)
    at org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$23.apply$mcV$sp(HadoopFsRelationTest.scala:492)
    at org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$23.apply(HadoopFsRelationTest.scala:492)
    at org.apache.spark.sql.sources.HadoopFsRelationTest$$anonfun$23.apply(HadoopFsRelationTest.scala:492)

Contributor: Thanks. How about adding the reason here as a comment?
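A sketch of how that match could carry the reason inline, per the suggestion above; the comment wording paraphrases the author's explanation, and the rest mirrors the code in this diff:

e match {
  case _: org.apache.spark.sql.AnalysisException =>
    assert(e.getMessage.contains("infer"))
  case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") =>
    // Ignore: test-only sources such as SimpleTextSource do not support schema inference
    // and require the user-supplied "dataSchema" option, so they fail with
    // NoSuchElementException before Spark can raise the AnalysisException about
    // failing to infer the schema.
  case _ =>
    fail("Unexpected error trying to infer schema from empty dir", e)
}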


      case _ =>
        fail("Unexpected error trying to infer schema from empty dir", e)
    }

    /** Test whether the data read from the given path matches the expected answer */
    def testWithPath(path: File, expectedAnswer: Seq[Row]): Unit = {
      val df = sqlContext.read
        .format(dataSourceName)
        .schema(dataInDir.schema) // avoid schema inference for any format
        .load(path.getCanonicalPath)
      checkAnswer(df, expectedAnswer)
    }

    // Verify that reading by path 'dir/' gives empty results, as there are no files in 'dir'
    // itself, and it should not pick up files in 'dir/subdir'
    require(subdir.exists)
    require(subdir.listFiles().exists(!_.isDirectory))
    testWithPath(dir, Seq.empty)

    // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir
    dataInDir.write
      .format(dataSourceName)
      .mode(SaveMode.Append) // append to prevent subdir from being deleted
      .save(dir.getCanonicalPath)
    require(dir.listFiles().exists(!_.isDirectory))
    require(subdir.exists())
    require(subdir.listFiles().exists(!_.isDirectory))
    testWithPath(dir, dataInDir.collect())
  }
}

test("Hadoop style globbing - unpartitioned data") {
withTempPath { file =>

val dir = file.getCanonicalPath
val subdir = new File(dir, "subdir")
val subsubdir = new File(subdir, "subsubdir")
val anotherSubsubdir =
new File(new File(dir, "another-subdir"), "another-subsubdir")

val dataInSubdir = Seq(1, 2, 3).toDF("value")
val dataInSubsubdir = Seq(4, 5, 6).toDF("value")
val dataInAnotherSubsubdir = Seq(7, 8, 9).toDF("value")

dataInSubdir.write
.format (dataSourceName)
.mode (SaveMode.Overwrite)
.save (subdir.getCanonicalPath)

dataInSubsubdir.write
.format (dataSourceName)
.mode (SaveMode.Overwrite)
.save (subsubdir.getCanonicalPath)

dataInAnotherSubsubdir.write
.format (dataSourceName)
.mode (SaveMode.Overwrite)
.save (anotherSubsubdir.getCanonicalPath)

require(subdir.exists)
require(subdir.listFiles().exists(!_.isDirectory))
require(subsubdir.exists)
require(subsubdir.listFiles().exists(!_.isDirectory))
require(anotherSubsubdir.exists)
require(anotherSubsubdir.listFiles().exists(!_.isDirectory))

Contributor (author): Remove empty lines.

    /*
      Directory structure generated

      dir
        |
        |___ subdir
        |      |
        |      |___ [ files of dataInSubdir ]
        |      |
        |      |___ subsubdir
        |             |
        |             |___ [ files of dataInSubsubdir ]
        |
        |
        |___ anotherSubdir
               |
               |___ anotherSubsubdir
                      |
                      |___ [ files of dataInAnotherSubsubdir ]
     */

    val schema = dataInSubdir.schema

    /** Test whether the data read from the given path matches the expected answer */
    def testWithPath(path: String, expectedDf: DataFrame): Unit = {
      val df = sqlContext.read
        .format(dataSourceName)
        .schema(schema) // avoid schema inference for any format
        .load(path)
      checkAnswer(df, expectedDf)
    }

    testWithPath(s"$dir/*/", dataInSubdir)
    testWithPath(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir))
    testWithPath(s"$dir/another*/*", dataInAnotherSubsubdir)
    testWithPath(s"$dir/*/another*", dataInAnotherSubsubdir)
    testWithPath(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir))
  }
}
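For reference, the glob patterns exercised above are expanded through Hadoop's FileSystem.globStatus, which is what the data source path resolution relies on under the hood. A minimal standalone sketch of that API (the directory is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Expand a Hadoop-style glob against the local file system and print the matched paths.
val fs = FileSystem.getLocal(new Configuration())
val pattern = new Path("/tmp/globbing-example/*/another*") // placeholder directory
// globStatus can return null when nothing exists at the path, so guard against it.
val matches = Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
matches.foreach(status => println(status.getPath))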

test("Hadoop style globbing - partitioned data") {
withTempPath { file =>
partitionedTestDF.write
.format(dataSourceName)