Skip to content

Commit 66fa6bd

Browse files
jiangxb1987 and gatorsmile
authored and committed
[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary
## What changes were proposed in this pull request? Long values can be passed to `rangeBetween` as range frame boundaries, but we silently convert it to Int values, this can cause wrong results and we should fix this. Further more, we should accept any legal literal values as range frame boundaries. In this PR, we make it possible for Long values, and make accepting other DataTypes really easy to add. This PR is mostly based on Herman's previous amazing work: hvanhovell@596f53c After this been merged, we can close #16818 . ## How was this patch tested? Add new tests in `DataFrameWindowFunctionsSuite` and `TypeCoercionSuite`. Author: Xingbo Jiang <[email protected]> Closes #18540 from jiangxb1987/rangeFrame. (cherry picked from commit 92d8563) Signed-off-by: gatorsmile <[email protected]>
1 parent 24a9bac commit 66fa6bd

17 files changed

Lines changed: 533 additions & 304 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -106,11 +106,9 @@ trait CheckAnalysis extends PredicateHelper {
106106
case w @ WindowExpression(AggregateExpression(_, _, true, _), _) =>
107107
failAnalysis(s"Distinct window functions are not supported: $w")
108108

109-
case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order,
110-
SpecifiedWindowFrame(frame,
111-
FrameBoundary(l),
112-
FrameBoundary(h))))
113-
if order.isEmpty || frame != RowFrame || l != h =>
109+
case w @ WindowExpression(_: OffsetWindowFunction,
110+
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
111+
if order.isEmpty || !frame.isOffset =>
114112
failAnalysis("An offset window function can only be evaluated in an ordered " +
115113
s"row-based window frame with a single offset: $w")
116114

@@ -119,15 +117,10 @@ trait CheckAnalysis extends PredicateHelper {
119117
// function.
120118
e match {
121119
case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
120+
w
122121
case _ =>
123122
failAnalysis(s"Expression '$e' not supported within a window function.")
124123
}
125-
// Make sure the window specification is valid.
126-
s.validate match {
127-
case Some(m) =>
128-
failAnalysis(s"Window specification $s is not valid because $m")
129-
case None => w
130-
}
131124

132125
case s @ ScalarSubquery(query, conditions, _) =>
133126
checkAnalysis(query)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ object TypeCoercion {
5858
PropagateTypes ::
5959
ImplicitTypeCasts ::
6060
DateTimeOperations ::
61+
WindowFrameCoercion ::
6162
Nil
6263

6364
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
@@ -785,4 +786,26 @@ object TypeCoercion {
785786
Option(ret)
786787
}
787788
}
789+
790+
/**
791+
* Cast WindowFrame boundaries to the type they operate upon.
792+
*/
793+
object WindowFrameCoercion extends Rule[LogicalPlan] {
794+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
795+
case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
796+
if order.resolved =>
797+
s.copy(frameSpecification = SpecifiedWindowFrame(
798+
RangeFrame,
799+
createBoundaryCast(lower, order.dataType),
800+
createBoundaryCast(upper, order.dataType)))
801+
}
802+
803+
private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
804+
boundary match {
805+
case e: SpecialFrameBoundary => e
806+
case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
807+
case _ => boundary
808+
}
809+
}
810+
}
788811
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,13 @@ package object expressions {
7474
def initialize(partitionIndex: Int): Unit = {}
7575
}
7676

77+
/**
78+
* An identity projection. This returns the input row.
79+
*/
80+
object IdentityProjection extends Projection {
81+
override def apply(row: InternalRow): InternalRow = row
82+
}
83+
7784
/**
7885
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
7986
* column of the new row. If the schema of the input row is specified, then the given expression

0 commit comments

Comments (0)