@@ -291,8 +291,12 @@ class HDFSFileCatalog(
refresh()

override def listFiles(filters: Seq[Expression]): Seq[Partition] = {

if (partitionSpec().partitionColumns.isEmpty) {
Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
Partition(
InternalRow.empty,
unpartitionedDataFiles().filterNot(_.getPath.getName startsWith "_")
Contributor:
Can we call allFiles here?

Contributor Author (@tdas, May 3, 2016):
I don't know for sure. If there is a partitioning scheme, listFiles applies an additional filter on files that start with "_", which does not seem to be present in allFiles. So I am not sure where it's best to merge.

Also, I think this way is slightly cleaner than having listFiles conditionally depend on allFiles.

) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
case PartitionDirectory(values, path) =>
@@ -337,7 +341,13 @@ class HDFSFileCatalog(
}
}
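To make the reviewer's suggestion above concrete, here is a rough sketch (not the PR's code) of what listFiles could look like if it reused allFiles(), keeping the "_"-prefix filter local. This works only because allFiles(), as changed below, also delegates to unpartitionedDataFiles() in the unpartitioned case:

override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
  if (partitionSpec().partitionColumns.isEmpty) {
    // allFiles() already returns unpartitionedDataFiles() in this case, so the only thing
    // listFiles adds on top is the "_"-prefix filter.
    Partition(
      InternalRow.empty,
      allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
  } else {
    // partitioned branch unchanged (collapsed in this diff view)
    prunePartitions(filters, partitionSpec()).map {
      case PartitionDirectory(values, path) => ...
    }
  }
}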

def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
unpartitionedDataFiles()
Contributor:
Maybe add some comments here?

} else {
leafFiles.values.toSeq
}
}
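In response to the comment above, one possible wording for those comments (my suggestion, not text from the PR):

def allFiles(): Seq[FileStatus] = {
  if (partitionSpec().partitionColumns.isEmpty) {
    // No partitioning was inferred: only files directly under the given input paths count
    // as data files; files in nested sub-directories are deliberately ignored.
    unpartitionedDataFiles()
  } else {
    // A partitioning scheme was inferred: every discovered leaf file belongs to some
    // partition directory, so all of them are data files.
    leafFiles.values.toSeq
  }
}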

def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path)

@@ -387,7 +397,7 @@ class HDFSFileCatalog(
}
}

def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
private def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.keys.toSeq
schema match {
@@ -443,6 +453,22 @@ class HDFSFileCatalog(
}
}

/** List of files to consider when there is no inferred partitioning scheme */
private def unpartitionedDataFiles(): Seq[FileStatus] = {
// For each of the input paths, get the list of files inside them
paths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = path.getFileSystem(hadoopConf)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

// If it is a directory (i.e. exists in leafDirToChildrenFiles), return its children files
// Or if it is a file (i.e. exists in leafFiles), return the path itself
leafDirToChildrenFiles.get(qualifiedPath).orElse {
Contributor (@yhuai, May 3, 2016):
If we have multiple layers of sub-directories, it seems leafDirToChildrenFiles.get(qualifiedPath) will always come up empty?

Contributor:
nvm. This logic is correct.

leafFiles.get(qualifiedPath).map(Array(_))
}.getOrElse(Array.empty)
}
}
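For the question above about multiple layers of sub-directories, a minimal self-contained sketch (plain Strings stand in for Hadoop Paths and FileStatuses; the names and values are made up) of why the lookup behaves correctly: only the input paths themselves are used as keys, so leaf dirs nested below them never match and their files are left out, which is the intended behaviour.

object UnpartitionedLookupSketch {
  // Pretend listing results for an input path "/data" that has a nested sub-directory.
  val leafDirToChildrenFiles = Map(
    "/data"        -> Seq("/data/a.parquet"),
    "/data/nested" -> Seq("/data/nested/b.parquet"))
  val leafFiles = Map(
    "/data/a.parquet"        -> "a",
    "/data/nested/b.parquet" -> "b")

  def unpartitionedDataFiles(paths: Seq[String]): Seq[String] =
    paths.flatMap { p =>
      // Directory input path: take its direct children. File input path: take the file itself.
      leafDirToChildrenFiles.get(p)
        .orElse(leafFiles.get(p).map(_ => Seq(p)))
        .getOrElse(Seq.empty)
    }

  def main(args: Array[String]): Unit = {
    // Only the file directly under "/data" is returned; the nested one is skipped.
    assert(unpartitionedDataFiles(Seq("/data")) == Seq("/data/a.parquet"))
    assert(unpartitionedDataFiles(Seq("/data/a.parquet")) == Seq("/data/a.parquet"))
  }
}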

def refresh(): Unit = {
val files = listLeafFiles(paths)

@@ -17,6 +17,8 @@

package org.apache.spark.sql.sources

import java.io.File

import scala.util.Random

import org.apache.hadoop.fs.Path
@@ -486,7 +488,133 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
}

test("Hadoop style globbing") {
test("load() - with directory of unpartitioned data in nested subdirs") {
Contributor:
Should we put it in the partition discovery suite? If we put it here, we will run it with every data source, right?

Contributor Author:
Well, it probably should run with all data sources to make sure no data source violates this rule. This is not really partition discovery, but more about what to do when there is no partition to discover.

withTempPath { file =>
val dir = file.getCanonicalPath
val subdir = new File(dir, "subdir").getCanonicalPath

val dataInDir = Seq(1, 2, 3).toDF("value")
val dataInSubdir = Seq(4, 5, 6).toDF("value")

/*
  Directory structure to be generated

  dir
    |
    |___ [ files of dataInDir ]
    |
    |___ subdir
          |
          |___ [ files of dataInSubdir ]
 */

// Generate dataInSubdir, not dataInDir
dataInSubdir.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
.save(subdir)

// Inferring the schema should throw an error, as it should not find any files to infer from
val e = intercept[AnalysisException] {
sqlContext.read.format(dataSourceName).load(dir)
}
assert(e.getMessage.contains("infer"))

/** Test whether the data read with the given path matches the expected answer */
def testWithPath(path: String, expectedAnswer: Seq[Row]): Unit = {
val df = sqlContext.read
.format(dataSourceName)
.schema(dataInDir.schema) // avoid schema inference for any format
.load(path)
checkAnswer(df, expectedAnswer)
}

// Reading by the path 'file/' (not by 'file/subdir') should give empty results
// as there are no files in 'file' and it should not pick up files in 'file/subdir'
testWithPath(dir, Seq.empty)

dataInDir.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
Contributor:
If we use Overwrite, we will delete subdir. Is that what we want?

Contributor Author:
Aah... I should fix that, and later test whether reading both dir and subdir works as expected.

.save(dir)

// Should give only rows from dataInDir
testWithPath(dir, dataInDir.collect())
}
}
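Following the Overwrite discussion above, one possible follow-up inside this test (a sketch of a direction, not necessarily the final fix in the PR) is to re-create the sub-directory data after overwriting dir and verify both levels again:

// Re-create subdir, since Overwrite on dir deleted it.
dataInSubdir.write
  .format(dataSourceName)
  .mode(SaveMode.Overwrite)
  .save(subdir)

// Reading dir should still return only the top-level rows,
// while reading dir/subdir returns the nested rows.
testWithPath(dir, dataInDir.collect())
testWithPath(subdir, dataInSubdir.collect())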

test("Hadoop style globbing - unpartitioned data") {
withTempPath { file =>

val dir = file.getCanonicalPath
val subdir = new File(dir, "subdir").getCanonicalPath
val subsubdir = new File(subdir, "subsubdir").getCanonicalPath
val anotherSubsubdir =
new File(new File(dir, "another-subdir"), "another-subsubdir").getCanonicalPath

val dataInSubdir = Seq(1, 2, 3).toDF("value")
val dataInSubsubdir = Seq(4, 5, 6).toDF("value")
val dataInAnotherSubsubdir = Seq(7, 8, 9).toDF("value")

dataInSubdir.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
.save(subdir)

dataInSubsubdir.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
.save(subsubdir)

dataInAnotherSubsubdir.write
.format(dataSourceName)
.mode(SaveMode.Overwrite)
.save(anotherSubsubdir)
Contributor:
Maybe manually check if we are generating the desired dir structure? (I think we will. But, it is good to double check.)

Contributor Author (@tdas, May 4, 2016):
Yeah, I get the concern. I didn't realize Overwrite might delete subdirs. Better to add a check of the dir structures.
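To make that check concrete, one possible form for it (a sketch; the helper name is illustrative, not from the PR), placed right after the three writes:

// Assert that each directory actually contains data files directly under it.
def assertHasDataFiles(path: String): Unit = {
  val files = new File(path).listFiles()
  assert(files != null && files.exists(f => f.isFile && !f.getName.startsWith("_")),
    s"expected data files directly under $path")
}
assertHasDataFiles(subdir)
assertHasDataFiles(subsubdir)
assertHasDataFiles(anotherSubsubdir)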


/*

Contributor Author:
remove empty lines

Directory structure generated

dir
  |
  |___ subdir
  |     |
  |     |___ [ files of dataInSubdir ]
  |     |
  |     |___ subsubdir
  |           |
  |           |___ [ files of dataInSubsubdir ]
  |
  |___ another-subdir
        |
        |___ another-subsubdir
              |
              |___ [ files of dataInAnotherSubsubdir ]
 */

val schema = dataInSubdir.schema

/** Test whether data is read with the given path matches the expected answer */
def testWithPath(path: String, expectedDf: DataFrame): Unit = {
val df = sqlContext.read
.format(dataSourceName)
.schema(schema) // avoid schema inference for any format
.load(path)
checkAnswer(df, expectedDf)
}

testWithPath(s"$dir/*/", dataInSubdir)
testWithPath(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir))
testWithPath(s"$dir/another*/*", dataInAnotherSubsubdir)
testWithPath(s"$dir/*/another*", dataInAnotherSubsubdir)
testWithPath(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir))

Contributor Author:
remove empty lines.

}
}

test("Hadoop style globbing - partitioned data") {
withTempPath { file =>
partitionedTestDF.write
.format(dataSourceName)