
Commit d7499ae

imback82 authored and cloud-fan committed
[SPARK-31256][SQL] DataFrameNaFunctions.drop should work for nested columns
### What changes were proposed in this pull request?

#26700 removed the ability to drop a row whose nested column value is null. For example, given the following `df`:

```
val schema = new StructType()
  .add("c1", new StructType()
    .add("c1-1", StringType)
    .add("c1-2", StringType))
val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.show
+--------+
|      c1|
+--------+
|  [, a2]|
|[b1, b2]|
|    null|
+--------+
```

In Spark 2.4.4:

```
df.na.drop("any", Seq("c1.c1-1")).show
+--------+
|      c1|
+--------+
|[b1, b2]|
+--------+
```

In Spark 2.4.5 and Spark 3.0.0-preview2, nested columns are ignored if specified:

```
df.na.drop("any", Seq("c1.c1-1")).show
+--------+
|      c1|
+--------+
|  [, a2]|
|[b1, b2]|
|    null|
+--------+
```

### Why are the changes needed?

This is a regression from Spark 2.4.4.

### Does this PR introduce any user-facing change?

Yes. Nested columns can now be specified:

```
df.na.drop("any", Seq("c1.c1-1")).show
+--------+
|      c1|
+--------+
|[b1, b2]|
+--------+
```

Also, if `*` is specified as a column, an `AnalysisException` is now thrown stating that `*` cannot be resolved, which was the behavior in 2.4.4; in current master, it silently has no effect.

### How was this patch tested?

Updated existing tests.

Closes #28266 from imback82/SPARK-31256.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
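For reference, here is a self-contained version of the reproduction above that can be pasted into `spark-shell` (a sketch; it assumes the shell's ambient `spark` session and the standard Spark SQL type imports):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// Nested schema: c1 is a struct with two string fields.
val schema = new StructType()
  .add("c1", new StructType()
    .add("c1-1", StringType)
    .add("c1-2", StringType))
val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// With this patch (as in 2.4.4), only the row whose c1.c1-1 is non-null survives.
df.na.drop("any", Seq("c1.c1-1")).show()
```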
1 parent bc212df · commit d7499ae

2 files changed: 35 additions & 25 deletions


sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala

Lines changed: 5 additions & 7 deletions
```diff
@@ -89,7 +89,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def drop(how: String, cols: Seq[String]): DataFrame = {
-    drop0(how, toAttributes(cols))
+    drop0(how, cols.map(df.resolve(_)))
   }
 
   /**
@@ -115,7 +115,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
    * @since 1.3.1
    */
   def drop(minNonNulls: Int, cols: Seq[String]): DataFrame = {
-    drop0(minNonNulls, toAttributes(cols))
+    drop0(minNonNulls, cols.map(df.resolve(_)))
   }
 
   /**
@@ -480,20 +480,18 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
     df.queryExecution.analyzed.output
   }
 
-  private def drop0(how: String, cols: Seq[Attribute]): DataFrame = {
+  private def drop0(how: String, cols: Seq[NamedExpression]): DataFrame = {
     how.toLowerCase(Locale.ROOT) match {
       case "any" => drop0(cols.size, cols)
       case "all" => drop0(1, cols)
       case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' or 'all'")
     }
   }
 
-  private def drop0(minNonNulls: Int, cols: Seq[Attribute]): DataFrame = {
+  private def drop0(minNonNulls: Int, cols: Seq[NamedExpression]): DataFrame = {
     // Filtering condition:
     // only keep the row if it has at least `minNonNulls` non-null and non-NaN values.
-    val predicate = AtLeastNNonNulls(
-      minNonNulls,
-      outputAttributes.filter { col => cols.exists(_.semanticEquals(col)) })
+    val predicate = AtLeastNNonNulls(minNonNulls, cols)
     df.filter(Column(predicate))
   }
 
```
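The heart of the fix is in the last hunk: the old code resolved the user-supplied names to top-level attributes (`toAttributes`) and then kept only the output attributes matching via `semanticEquals`, so a nested path like `c1.c1-1` never matched anything and silently vanished from the predicate. The new code resolves each name with `df.resolve(_)`, which can yield a nested-field extraction expression, and feeds those expressions directly to `AtLeastNNonNulls`. Conceptually, `drop("any", cols)` keeps a row only when every listed column is non-null (and non-NaN for float/double columns). A rough public-API equivalent, as a sketch only (it omits the NaN handling and is not the expression the patch actually builds):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Keep rows where every listed column (nested paths included) is non-null.
// Unlike AtLeastNNonNulls, this sketch skips drop()'s NaN check on
// float/double columns.
def dropAnyNull(df: DataFrame, cols: Seq[String]): DataFrame =
  df.where(cols.map(c => col(c).isNotNull).reduce(_ && _))
```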

sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala

Lines changed: 30 additions & 18 deletions
```diff
@@ -45,6 +45,16 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSparkSession {
     ).toDF("int", "long", "short", "byte", "float", "double")
   }
 
+  def createDFWithNestedColumns: DataFrame = {
+    val schema = new StructType()
+      .add("c1", new StructType()
+        .add("c1-1", StringType)
+        .add("c1-2", StringType))
+    val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
+    spark.createDataFrame(
+      spark.sparkContext.parallelize(data), schema)
+  }
+
   test("drop") {
     val input = createDF()
     val rows = input.collect()
@@ -275,33 +285,35 @@
     assert(message.contains("Reference 'f2' is ambiguous"))
   }
 
-  test("fill/drop with col(*)") {
+  test("fill with col(*)") {
     val df = createDF()
     // If columns are specified with "*", they are ignored.
     checkAnswer(df.na.fill("new name", Seq("*")), df.collect())
-    checkAnswer(df.na.drop("any", Seq("*")), df.collect())
   }
 
-  test("fill/drop with nested columns") {
-    val schema = new StructType()
-      .add("c1", new StructType()
-        .add("c1-1", StringType)
-        .add("c1-2", StringType))
+  test("drop with col(*)") {
+    val df = createDF()
+    val exception = intercept[AnalysisException] {
+      df.na.drop("any", Seq("*"))
+    }
+    assert(exception.getMessage.contains("Cannot resolve column name \"*\""))
+  }
 
-    val data = Seq(
-      Row(Row(null, "a2")),
-      Row(Row("b1", "b2")),
-      Row(null))
+  test("fill with nested columns") {
+    val df = createDFWithNestedColumns
 
-    val df = spark.createDataFrame(
-      spark.sparkContext.parallelize(data), schema)
+    // Nested columns are ignored for fill().
+    checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), df)
+  }
 
-    checkAnswer(df.select("c1.c1-1"),
-      Row(null) :: Row("b1") :: Row(null) :: Nil)
+  test("drop with nested columns") {
+    val df = createDFWithNestedColumns
 
-    // Nested columns are ignored for fill() and drop().
-    checkAnswer(df.na.fill("a1", Seq("c1.c1-1")), data)
-    checkAnswer(df.na.drop("any", Seq("c1.c1-1")), data)
+    // Rows with null values in the specified nested columns are dropped.
+    assert(df.count == 3)
+    checkAnswer(
+      df.na.drop("any", Seq("c1.c1-1")),
+      Seq(Row(Row("b1", "b2"))))
   }
 
   test("replace") {
```
