Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,20 @@ object NestedColumnAliasing {
plan: LogicalPlan,
nestedFieldToAlias: Map[Expression, Alias],
attrToAliases: AttributeMap[Seq[Alias]]): LogicalPlan = {
plan.withNewChildren(plan.children.map { plan =>
Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan)
}).transformExpressions {
val newChildPlan = plan match {
case g: Generate =>
g.withNewChildren(g.children.map { childPlan =>
val origOutput = childPlan.output
val fromAlias = childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Nil))
Comment on lines +190 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, we intend to replace some attributes with its nested fields if they are accessed on top of the plan. So we can prune unused fields later.

If we keep original outputs, I think pruning will not work actually.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to add a pruning test case to make sure pruning still works.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, this change does not address the real issue. The real issue is, that Generate contains a list unrequiredChildIndex of child output indices, that are not needed in the Generate output. This list has to be adjusted to fit the inserted Project node of NestedColumnAliasing. Here it just fits accidentally, because the original list is included completely at the beginning of the new Project node. But this may include unnecessary outputs, that ColumnPruning is trying to avoid. I have a different proposal, that adjust the list of indices to point to the new positions after attribute aliasing: #49061

Project(origOutput ++ fromAlias, childPlan)
})
case p =>
p.withNewChildren(p.children.map { childPlan =>
Project(childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), childPlan)
})
}

newChildPlan.transformExpressions {
case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) =>
nestedFieldToAlias(f.canonicalized).toAttribute
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,48 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
}
}
}

test("SPARK-39854: replaceWithAliases should keep the order of Generate children") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for providing the end-to-end test. Can we have a test case in NestedColumnAliasingSuite.scala because we are touching NestedColumnAliasing?

Copy link
Author

@jiaji-wu jiaji-wu Sep 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @dongjoon-hyun , thanks for taking time looking into this!

The test is put in the core module (instead of the catalyst module) because it has dependency on things like SparkSession, sql.functions, which is only available there -- not sure where is the best place to put it, or are there better ways to re-write the test (my knowledge on that part is limited 😓).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add an end-to-end test, the Apache Spark test time increases prohibitively. We prefer to narrow down the issue and have isolated unit tests. So, in this PR, NestedColumnAliasingSuite.scala is the best place to have a test coverage. In short, I don't think we need a heavy end-to-end test case like this. We need to have a test case similar to the other in NestedColumnAliasingSuite.scala.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. will try to create a test case in NestedColumnAliasingSuite.scala instead.

import org.apache.spark.sql.functions.{explode, struct}
import org.apache.spark.sql.SparkSession
Comment on lines +148 to +149
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually put imports at the beginning.

val ss: SparkSession = spark
import ss.implicits._
val testJson =
"""{
| "b": {
| "id": "id00",
| "data": [{
| "b1": "vb1",
| "b2": 101,
| "ex2": [
| { "fb1": false, "fb2": 11, "fb3": "t1" },
| { "fb1": true, "fb2": 12, "fb3": "t2" }
| ]}, {
| "b1": "vb2",
| "b2": 102,
| "ex2": [
| { "fb1": false, "fb2": 13, "fb3": "t3" },
| { "fb1": true, "fb2": 14, "fb3": "t4" }
| ]}
| ],
| "fa": "tes",
| "v": "1.5"
| }
|}
|""".stripMargin
val df = spark.read.json((testJson :: Nil).toDS())
.withColumn("ex_b", explode($"b.data.ex2"))
.withColumn("ex_b2", explode($"ex_b"))
val df1 = df
.withColumn("rt", struct(
$"b.fa".alias("rt_fa"),
$"b.v".alias("rt_v")
))
.drop("b", "ex_b")

val result = df1.collect()
assert(result.length == 4)
}
}

case class ColumnarOp(child: SparkPlan) extends UnaryExecNode {
Expand Down