From 4a6f903897d28a3038918997e692410259a90ae3 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 19 Jun 2020 10:36:52 +0800 Subject: [PATCH 1/4] Reuse completeNextStageWithFetchFailure --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..762b14e170fcc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1796,9 +1796,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 - complete(taskSets(1), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(1, 0, shuffleDep1) scheduler.resubmitFailedStages() // stage 0, attempt 1 should have the properties of job2 @@ -1872,9 +1870,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the second stage complete normally completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC")) // fail the third stage because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // have DAGScheduler try again @@ -1900,9 +1896,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // complete stage 1 completeShuffleMapStageSuccessfully(1, 0, 1) // pretend stage 2 failed because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. From a2280cc446a182b9f57c14f3f174a0b2acffd152 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 26 Nov 2020 10:55:02 +0800 Subject: [PATCH 2/4] Throw an error on window redefinition --- .../sql/catalyst/parser/AstBuilder.scala | 8 +++- .../resources/sql-tests/inputs/window.sql | 14 ++++++- .../sql-tests/results/window.sql.out | 38 ++++++++++++++++++- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 23de8ab09dd0a..909cc02f5f21d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -815,10 +815,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx: WindowClauseContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { // Collect all window specifications defined in the WINDOW clause. - val baseWindowMap = ctx.namedWindow.asScala.map { + val baseWindowTuple = ctx.namedWindow.asScala.map { wCtx => (wCtx.name.getText, typedVisit[WindowSpec](wCtx.windowSpec)) - }.toMap + } + baseWindowTuple.groupBy(_._1).foreach { case (k, v) if v.size > 1 => + throw new ParseException(s"The definition of window '$k' is repetitive", ctx) + } + val baseWindowMap = baseWindowTuple.toMap // Handle cases like // window w1 as (partition by p_mfgr order by p_name diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql index f5223af9125f6..f0336d764bdea 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql @@ -250,4 +250,16 @@ WINDOW w AS ( ORDER BY salary DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) -ORDER BY department; \ No newline at end of file +ORDER BY department; + +SELECT + employee_name, + salary, + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary +FROM + basic_pays +WINDOW + w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING), + w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 2 FOLLOWING) +ORDER BY salary DESC; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index 1304dcf21d0b3..df2ad96649186 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 35 +-- Number of queries: 36 -- !query @@ -739,4 +739,38 @@ Gerard Hernandez SCM 6949 Larry Bott Pamela Castillo George Vanauf Sales 10563 George Vanauf Steve Patterson Steve Patterson Sales 9441 George Vanauf Steve Patterson Julie Firrelli Sales 9181 George Vanauf Steve Patterson -Foon Yue Tseng Sales 6660 George Vanauf Steve Patterson \ No newline at end of file +Foon Yue Tseng Sales 6660 George Vanauf Steve Patterson + + +-- !query +SELECT + employee_name, + salary, + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary +FROM + basic_pays +WINDOW + w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING), + w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 2 FOLLOWING) +ORDER BY salary DESC +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +The definition of window 'w' is repetitive(line 8, pos 0) + +== SQL == +SELECT + employee_name, + salary, + first_value(employee_name) OVER w highest_salary, + nth_value(employee_name, 2) OVER w second_highest_salary +FROM + basic_pays +WINDOW +^^^ + w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING), + w AS (ORDER BY salary DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 2 FOLLOWING) +ORDER BY salary DESC \ No newline at end of file From 43e85422e28ec23ed057d41755d265a65a532dae Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 26 Nov 2020 11:02:43 +0800 Subject: [PATCH 3/4] Rename variable --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 909cc02f5f21d..04f6542cb0ed3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -815,14 +815,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx: WindowClauseContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { // Collect all window specifications defined in the WINDOW clause. - val baseWindowTuple = ctx.namedWindow.asScala.map { + val baseWindowTuples = ctx.namedWindow.asScala.map { wCtx => (wCtx.name.getText, typedVisit[WindowSpec](wCtx.windowSpec)) } - baseWindowTuple.groupBy(_._1).foreach { case (k, v) if v.size > 1 => + baseWindowTuples.groupBy(_._1).foreach { case (k, v) if v.size > 1 => throw new ParseException(s"The definition of window '$k' is repetitive", ctx) } - val baseWindowMap = baseWindowTuple.toMap + val baseWindowMap = baseWindowTuples.toMap // Handle cases like // window w1 as (partition by p_mfgr order by p_name From 9f29766511aed5728707c5cfc3d46176826d4148 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 26 Nov 2020 15:08:51 +0800 Subject: [PATCH 4/4] Fix issue --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 04f6542cb0ed3..1cf2939b62fca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -819,8 +819,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg wCtx => (wCtx.name.getText, typedVisit[WindowSpec](wCtx.windowSpec)) } - baseWindowTuples.groupBy(_._1).foreach { case (k, v) if v.size > 1 => - throw new ParseException(s"The definition of window '$k' is repetitive", ctx) + baseWindowTuples.groupBy(_._1).foreach { kv => + if (kv._2.size > 1) { + throw new ParseException(s"The definition of window '${kv._1}' is repetitive", ctx) + } } val baseWindowMap = baseWindowTuples.toMap