-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-34638][SQL] Single field nested column prune on generator output #31966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5221be3
4758f96
7908788
fe286df
72e3e13
df5c44d
6c9d839
ad3d191
8d4309a
a719409
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -231,6 +231,27 @@ object NestedColumnAliasing { | |
| * of it. | ||
| */ | ||
| object GeneratorNestedColumnAliasing { | ||
| // Partitions `attrToAliases` based on whether the attribute is in Generator's output. | ||
| private def aliasesOnGeneratorOutput( | ||
| attrToAliases: Map[ExprId, Seq[Alias]], | ||
| generatorOutput: Seq[Attribute]) = { | ||
| // Collect the generator output ids into a Set once, so that each membership test below | ||
| // is a hash lookup instead of a linear scan over the output sequence. | ||
| val generatorOutputExprIds = generatorOutput.map(_.exprId).toSet | ||
| // Result: (entries keyed by a generator output attribute, all remaining entries). | ||
| attrToAliases.partition { case (exprId, _) => | ||
| generatorOutputExprIds.contains(exprId) | ||
| } | ||
| } | ||
|
|
||
| // Partitions `nestedFieldToAlias` based on whether the attribute of nested field extractor | ||
| // is in Generator's output. | ||
| private def nestedFieldOnGeneratorOutput( | ||
| nestedFieldToAlias: Map[ExtractValue, Alias], | ||
| generatorOutput: Seq[Attribute]) = { | ||
| val outputAttrs = AttributeSet(generatorOutput) | ||
| // An extractor goes into the first map only when every attribute it references is | ||
| // part of the generator output; all other extractors go into the second map. | ||
| nestedFieldToAlias.partition { case (extractor, _) => | ||
| extractor.references.subsetOf(outputAttrs) | ||
| } | ||
| } | ||
|
|
||
| def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match { | ||
| // When either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we | ||
| // need to prune nested columns through Project and under Generate. The difference is | ||
|
|
@@ -241,12 +262,69 @@ object GeneratorNestedColumnAliasing { | |
| // On top of `Generate`, a `Project` that might have nested column accessors. | ||
| // We try to get alias maps for both project list and generator's children expressions. | ||
| val exprsToPrune = projectList ++ g.generator.children | ||
| NestedColumnAliasing.getAliasSubMap(exprsToPrune, g.qualifiedGeneratorOutput).map { | ||
| NestedColumnAliasing.getAliasSubMap(exprsToPrune).map { | ||
| case (nestedFieldToAlias, attrToAliases) => | ||
| // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`. | ||
|
||
| val newChild = | ||
| NestedColumnAliasing.replaceWithAliases(g, nestedFieldToAlias, attrToAliases) | ||
| Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild) | ||
|
|
||
| val (nestedFieldsOnGenerator, nestedFieldsNotOnGenerator) = | ||
| nestedFieldOnGeneratorOutput(nestedFieldToAlias, g.qualifiedGeneratorOutput) | ||
| val (attrToAliasesOnGenerator, attrToAliasesNotOnGenerator) = | ||
| aliasesOnGeneratorOutput(attrToAliases, g.qualifiedGeneratorOutput) | ||
|
|
||
| // Push nested column accessors through `Generator`. We cannot prune on `Generator`'s | ||
| // output. | ||
| val newChild = NestedColumnAliasing.replaceWithAliases(g, | ||
| nestedFieldsNotOnGenerator, attrToAliasesNotOnGenerator) | ||
| val pushedThrough = Project(NestedColumnAliasing | ||
| .getNewProjectList(projectList, nestedFieldsNotOnGenerator), newChild) | ||
|
|
||
| // Pruning on `Generator`'s output. We only process single field case. | ||
| // For multiple field case, we cannot directly move field extractor into | ||
| // the generator expression. A workaround is to re-construct array of struct | ||
| // from multiple fields. But it will be more complicated and may not be worth it. | ||
| if (nestedFieldsOnGenerator.size == 1) { | ||
dongjoon-hyun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Only one nested column accessor. | ||
| // E.g., df.select(explode($"items").as("item")).select($"item.a") | ||
| pushedThrough match { | ||
| case p @ Project(_, newG: Generate) => | ||
| // Replace the child expression of `ExplodeBase` generator with | ||
| // nested column accessor. | ||
| // E.g., df.select(explode($"items").as("item")) => | ||
| // df.select(explode($"items.a").as("item")) | ||
dongjoon-hyun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val rewrittenG = newG.transformExpressions { | ||
| case e: ExplodeBase => | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val extractor = nestedFieldsOnGenerator.head._1.transformUp { | ||
| case _: Attribute => | ||
| e.child | ||
| case g: GetStructField => | ||
| ExtractValue(g.child, Literal(g.extractFieldName), SQLConf.get.resolver) | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| e.withNewChildren(Seq(extractor)) | ||
| } | ||
|
|
||
| // As we change the child of the generator, its output data type must be updated. | ||
| val updatedGeneratorOutput = rewrittenG.generatorOutput | ||
| .zip(rewrittenG.generator.elementSchema.toAttributes) | ||
|
||
| .map { case (oldAttr, newAttr) => | ||
| newAttr.withExprId(oldAttr.exprId).withName(oldAttr.name) | ||
| } | ||
| assert(updatedGeneratorOutput.length == rewrittenG.generatorOutput.length, | ||
dongjoon-hyun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "Updated generator output must have same length as original generator output.") | ||
|
||
| val updatedGenerate = rewrittenG.copy(generatorOutput = updatedGeneratorOutput) | ||
|
|
||
| // Replace nested column accessor with generator output. | ||
| p.withNewChildren(Seq(updatedGenerate)).transformExpressions { | ||
| case f: ExtractValue if nestedFieldsOnGenerator.contains(f) => | ||
| updatedGenerate.output | ||
| .find(a => attrToAliasesOnGenerator.contains(a.exprId)) | ||
| .getOrElse(f) | ||
| } | ||
|
|
||
| case _ => pushedThrough | ||
dongjoon-hyun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } else { | ||
dongjoon-hyun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pushedThrough | ||
| } | ||
| } | ||
|
|
||
| case g: Generate if SQLConf.get.nestedSchemaPruningEnabled && | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.