Skip to content

Commit 4267e1c

Browse files
belieferchenzhx
authored and committed
[SPARK-39159][SQL] Add new Dataset API for Offset
### What changes were proposed in this pull request? Spark recently added the `Offset` operator. This PR adds an `offset` API to `Dataset`. ### Why are the changes needed? The `offset` API is very useful and makes test cases easier to construct. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes apache#36519 from beliefer/SPARK-39159. Authored-by: Jiaan Geng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent cc592b9 commit 4267e1c

File tree

4 files changed

+34
-14
lines changed

4 files changed

+34
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -404,13 +404,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
404404

405405
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)
406406

407-
case o if !o.isInstanceOf[GlobalLimit] && !o.isInstanceOf[LocalLimit]
408-
&& o.children.exists(_.isInstanceOf[Offset]) =>
409-
failAnalysis(
410-
s"""
411-
|The OFFSET clause is allowed in the LIMIT clause or be the outermost node,
412-
|but the OFFSET clause found in: ${o.nodeName}.""".stripMargin.replace("\n", " "))
413-
414407
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
415408

416409
case _: Union | _: SetOperation if operator.children.length > 1 =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -541,13 +541,6 @@ class AnalysisErrorSuite extends AnalysisTest {
541541
"The offset expression must be equal to or greater than 0, but got -1" :: Nil
542542
)
543543

544-
errorTest(
545-
"OFFSET clause in other node",
546-
testRelation2.offset(Literal(10, IntegerType)).where('b > 1),
547-
"The OFFSET clause is allowed in the LIMIT clause or be the outermost node," +
548-
" but the OFFSET clause found in: Filter." :: Nil
549-
)
550-
551544
errorTest(
552545
"the sum of num_rows in limit clause and num_rows in offset clause less than Int.MaxValue",
553546
testRelation.offset(Literal(2000000000, IntegerType)).limit(Literal(1000000000, IntegerType)),

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1955,6 +1955,16 @@ class Dataset[T] private[sql](
19551955
def limitRange(start: Int, end: Int): Dataset[T] = withTypedPlan {
19561956
Limit(Literal(end - start), Offset(Literal(start), logicalPlan))
19571957
}
1958+
/**
1959+
* Returns a new Dataset by skipping the first `n` rows.
1960+
*
1961+
* @group typedrel
1962+
* @since 3.4.0
1963+
*/
1964+
def offset(n: Int): Dataset[T] = withTypedPlan {
1965+
Offset(Literal(n), logicalPlan)
1966+
}
1967+
19581968
/**
19591969
* Returns a new Dataset containing union of rows in this Dataset and another Dataset.
19601970
*

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,30 @@ class DataFrameSuite extends QueryTest
603603
)
604604
}
605605

606+
test("offset") {
607+
checkAnswer(
608+
testData.offset(90),
609+
testData.collect().drop(90).toSeq)
610+
611+
checkAnswer(
612+
arrayData.toDF().offset(99),
613+
arrayData.collect().drop(99).map(r => Row.fromSeq(r.productIterator.toSeq)))
614+
615+
checkAnswer(
616+
mapData.toDF().offset(99),
617+
mapData.collect().drop(99).map(r => Row.fromSeq(r.productIterator.toSeq)))
618+
}
619+
620+
test("limit with offset") {
621+
checkAnswer(
622+
testData.limit(10).offset(5),
623+
testData.take(10).drop(5).toSeq)
624+
625+
checkAnswer(
626+
testData.offset(5).limit(10),
627+
testData.take(15).drop(5).toSeq)
628+
}
629+
606630
test("udf") {
607631
val foo = udf((a: Int, b: String) => a.toString + b)
608632

0 commit comments

Comments
 (0)