From 6d6d1a0b6d04d3e85dfaf6280facd66b52509de1 Mon Sep 17 00:00:00 2001 From: changgyoopark-db Date: Fri, 17 Jan 2025 09:35:10 +0100 Subject: [PATCH 1/3] Add the timezone when casting to timestamp in V2ScanRelationPushDown --- .../datasources/v2/V2ScanRelationPushDown.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 23b2647b62a1..30ead26a6aa9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -324,12 +324,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } } - private def addCastIfNeeded(expression: Expression, expectedDataType: DataType) = - if (expression.dataType == expectedDataType) { - expression + private def addCastIfNeeded(expression: Expression, expectedDataType: DataType) = { + val cast = Cast(expression, expectedDataType) + if (cast.needsTimeZone) { + cast.withTimeZone(conf.sessionLocalTimeZone) } else { - Cast(expression, expectedDataType) + cast } + } def buildScanWithPushedAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { case holder: ScanBuilderHolder if holder.pushedAggregate.isDefined => From 48de8a6821a1fe071d4978708d4a0292e972601c Mon Sep 17 00:00:00 2001 From: changgyoopark-db Date: Mon, 20 Jan 2025 10:15:31 +0100 Subject: [PATCH 2/3] Safeguard to avoid copying the cast if timeZoneId is present --- .../sql/execution/datasources/v2/V2ScanRelationPushDown.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 30ead26a6aa9..ad59723a4437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -326,7 +326,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { private def addCastIfNeeded(expression: Expression, expectedDataType: DataType) = { val cast = Cast(expression, expectedDataType) - if (cast.needsTimeZone) { + if (cast.timeZoneId.isEmpty && cast.needsTimeZone) { cast.withTimeZone(conf.sessionLocalTimeZone) } else { cast From 377c4e3022b483c75df539dc9f9b4dd9622531ee Mon Sep 17 00:00:00 2001 From: changgyoopark-db Date: Mon, 20 Jan 2025 14:58:19 +0100 Subject: [PATCH 3/3] Restore the same-type short-circuit in addCastIfNeeded --- .../datasources/v2/V2ScanRelationPushDown.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index ad59723a4437..5f7e86cab524 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -324,14 +324,17 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { } } - private def addCastIfNeeded(expression: Expression, expectedDataType: DataType) = { - val cast = Cast(expression, expectedDataType) - if (cast.timeZoneId.isEmpty && cast.needsTimeZone) { - cast.withTimeZone(conf.sessionLocalTimeZone) + private def addCastIfNeeded(expression: Expression, expectedDataType: DataType) = + if (expression.dataType == expectedDataType) { + expression } else { - cast + val cast = Cast(expression, 
expectedDataType) + if (cast.timeZoneId.isEmpty && cast.needsTimeZone) { + cast.withTimeZone(conf.sessionLocalTimeZone) + } else { + cast + } } - } def buildScanWithPushedAggregate(plan: LogicalPlan): LogicalPlan = plan.transform { case holder: ScanBuilderHolder if holder.pushedAggregate.isDefined =>