[SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource #24830
Changes from 5 commits: 1003e5d · 784e63b · d5090b0 · 5251ac7 · 0072083 · 6f2e75a
@@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex(
     pathGlobFilter.forall(_.accept(file.getPath))
   }

+  protected lazy val recursiveFileLookup = {
+    parameters.getOrElse("recursiveFileLookup", "false").toBoolean
Contributor: shall we document the option in

Member: @Ngone51 Could you submit a follow-up PR to document this? This affects all the built-in file sources. We need to update the documentation of both PySpark and Scala APIs.

Contributor: FYI, there is a Jira about adding this documentation which you will want to reference: SPARK-29903

Member: @nchammas Could you submit a PR to fix readwriter.py for supporting this new option?

Contributor: Sure, will do. I suppose we'll do that separately from adding the docs, which will get their own PR, correct?

Member: Guys, we should also update

Member: Ok, I'll submit a PR to document it. @gatorsmile
+  }
+
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
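For context, a minimal usage sketch of the new option from the DataFrame reader API. The format, path, and session setup are illustrative only and are not part of this diff:

```scala
import org.apache.spark.sql.SparkSession

object RecursiveLoadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("recursiveFileLookup sketch")
      .master("local[*]")                      // local session, just for illustration
      .getOrCreate()

    // Enable the option added by this PR; it defaults to "false".
    val df = spark.read
      .format("json")                          // any built-in file source
      .option("recursiveFileLookup", "true")   // load files from nested directories
      .load("/tmp/data/nested")                // hypothetical root directory

    df.show()
    spark.stop()
  }
}
```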
@@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex(
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
+      if (recursiveFileLookup) {
Member: This branch seems not reachable. Should we simply use

Contributor (author): Oh, it is reachable I think.
+        throw new IllegalArgumentException(
+          "Datasource with partition do not allow recursive file loading.")
+      }
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
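The guard above fails fast when recursive loading is combined with a layout whose partition columns are known. A standalone sketch of the same decision, factored out purely for illustration (the helper name and flags below are not from the PR):

```scala
// Illustration only: the check the guard encodes, outside of Spark.
// `partitionColumnsKnown` stands in for partitionSpec().partitionColumns.nonEmpty.
def validateRecursiveLookup(recursiveFileLookup: Boolean, partitionColumnsKnown: Boolean): Unit = {
  if (partitionColumnsKnown && recursiveFileLookup) {
    throw new IllegalArgumentException(
      "Datasource with partition do not allow recursive file loading.")
  }
}

// This combination is rejected:
// validateRecursiveLookup(recursiveFileLookup = true, partitionColumnsKnown = true)
```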
@@ -95,7 +103,7 @@ abstract class PartitioningAwareFileIndex(
   override def sizeInBytes: Long = allFiles().map(_.getLen).sum

   def allFiles(): Seq[FileStatus] = {
-    val files = if (partitionSpec().partitionColumns.isEmpty) {
+    val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) {
       // For each of the root input paths, get the list of files inside them
       rootPaths.flatMap { path =>
         // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
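With the option enabled, allFiles() no longer takes the "direct children of each root path" branch. As a rough analogy only (this is not the code path the index uses, which works from its pre-listed leafDirToChildrenFiles map), the difference between a flat listing and a recursive walk with plain Hadoop FileSystem calls looks like this:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}

import scala.collection.mutable.ArrayBuffer

// Illustration only: flat listing versus recursive walk of a root directory.
def directChildren(root: Path, conf: Configuration): Seq[FileStatus] = {
  val fs = root.getFileSystem(conf)
  // One level only; nested directories come back as entries and are not descended into.
  fs.listStatus(root).toSeq
}

def recursiveLeafFiles(root: Path, conf: Configuration): Seq[LocatedFileStatus] = {
  val fs = root.getFileSystem(conf)
  // recursive = true walks every nested directory and yields only files.
  val it = fs.listFiles(root, true)
  val buf = ArrayBuffer.empty[LocatedFileStatus]
  while (it.hasNext) buf += it.next()
  buf.toSeq
}
```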
@@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex(
   }

   protected def inferPartitioning(): PartitionSpec = {
-    // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
-      files.exists(f => isDataPath(f.getPath))
-    }.keys.toSeq
-
-    val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
-    val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
-      .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    PartitioningUtils.parsePartitions(
-      leafDirs,
-      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-      basePaths = basePaths,
-      userSpecifiedSchema = userSpecifiedSchema,
-      caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
-      validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
-      timeZoneId = timeZoneId)
+    if (recursiveFileLookup) {
+      PartitionSpec.emptySpec
+    } else {
+      // We use leaf dirs containing data files to discover the schema.
+      val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+        files.exists(f => isDataPath(f.getPath))
+      }.keys.toSeq
+
+      val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
+      val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
+        .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
+
+      PartitioningUtils.parsePartitions(
+        leafDirs,
+        typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+        basePaths = basePaths,
+        userSpecifiedSchema = userSpecifiedSchema,
+        caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
+        validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
+        timeZoneId = timeZoneId)
+    }
   }

   private def prunePartitions(
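In other words, when recursiveFileLookup is enabled, inferPartitioning() short-circuits to an empty PartitionSpec, so Hive-style directory names (e.g. date=2019-06-01) are no longer turned into partition columns. A hedged sketch of the observable difference, with hypothetical paths, assuming the behavior implemented in this diff and a SparkSession named spark as in the earlier sketch:

```scala
// Hypothetical layout:
//   /tmp/events/date=2019-06-01/part-0.json
//   /tmp/events/date=2019-06-02/part-0.json

// Default behavior: partition discovery adds a `date` column from the directory names.
val partitioned = spark.read.json("/tmp/events")
partitioned.printSchema()   // includes `date`

// With recursive lookup: no partition inference; only the columns found in the files themselves.
val recursive = spark.read
  .option("recursiveFileLookup", "true")
  .json("/tmp/events")
recursive.printSchema()     // no `date` column
```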