Skip to content

Commit 1df405f

Browse files
chenzhxcloud-fan
authored andcommitted
[SPARK-38899][SQL] DS V2 supports push down datetime functions
### What changes were proposed in this pull request? Currently, Spark have some datetime functions. Please refer https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L577 These functions show below: `DATE_ADD,` `DATEDIFF`, `TRUNC`, `EXTRACT`, `SECOND`, `MINUTE`, `HOUR`, `MONTH`, `QUARTER`, `YEAR`, `DAYOFWEEK`, `DAYOFMONTH`, `DAYOFYEAR` The mainstream databases support these functions show below. Function|PostgreSQL|ClickHouse|H2|MySQL|Oracle|Presto|Teradata|Snowflake|DB2|Vertica|Exasol|Impala|Mariadb|Druid|Singlestore|ElasticSearch -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- `DateAdd`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|No|No|Yes|Yes|No|Yes|Yes `DateDiff`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|Yes|No|Yes|Yes|No|Yes|Yes `DateTrunc`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes| Yes|Yes|Yes|Yes|No|Yes|Yes|Yes `Hour`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes `Minute`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes `Month`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes `Quarter`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes `Second`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes `Year`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes `DayOfMonth`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes `DayOfWeek`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes `DayOfYear`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes `WEEK_OF_YEAR`|Yes|No|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes `YEAR_OF_WEEK`|No|No|Yes|Yes|Yes|Yes|No|Yes|No|No|No|No|Yes|No|No|No DS V2 should supports push down these datetime functions. ### Why are the changes needed? DS V2 supports push down datetime functions. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes #36663 from chenzhx/datetime. Authored-by: chenzhx <chen@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 231d376 commit 1df405f

6 files changed

Lines changed: 296 additions & 24 deletions

File tree

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.expressions;
19+
20+
import org.apache.spark.annotation.Evolving;
21+
22+
import java.io.Serializable;
23+
24+
/**
25+
* Represent an extract function, which extracts and returns the value of a
26+
* specified datetime field from a datetime or interval value expression.
27+
* <p>
28+
* The currently supported fields names following the ISO standard:
29+
* <ol>
30+
* <li> <code>SECOND</code> Since 3.4.0 </li>
31+
* <li> <code>MINUTE</code> Since 3.4.0 </li>
32+
* <li> <code>HOUR</code> Since 3.4.0 </li>
33+
* <li> <code>MONTH</code> Since 3.4.0 </li>
34+
* <li> <code>QUARTER</code> Since 3.4.0 </li>
35+
* <li> <code>YEAR</code> Since 3.4.0 </li>
36+
* <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
37+
* <li> <code>DAY</code> Since 3.4.0 </li>
38+
* <li> <code>DAY_OF_YEAR</code> Since 3.4.0 </li>
39+
* <li> <code>WEEK</code> Since 3.4.0 </li>
40+
* <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
41+
* </ol>
42+
*
43+
* @since 3.4.0
44+
*/
45+
46+
@Evolving
47+
public class Extract implements Expression, Serializable {
48+
49+
private String field;
50+
private Expression source;
51+
52+
public Extract(String field, Expression source) {
53+
this.field = field;
54+
this.source = source;
55+
}
56+
57+
public String field() { return field; }
58+
public Expression source() { return source; }
59+
60+
@Override
61+
public Expression[] children() { return new Expression[]{ source() }; }
62+
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,24 @@
346346
* <li>Since version: 3.4.0</li>
347347
* </ul>
348348
* </li>
349+
* <li>Name: <code>DATE_ADD</code>
350+
* <ul>
351+
* <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
352+
* <li>Since version: 3.4.0</li>
353+
* </ul>
354+
* </li>
355+
* <li>Name: <code>DATE_DIFF</code>
356+
* <ul>
357+
* <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
358+
* <li>Since version: 3.4.0</li>
359+
* </ul>
360+
* </li>
361+
* <li>Name: <code>TRUNC</code>
362+
* <ul>
363+
* <li>SQL semantic: <code>TRUNC(date, format)</code></li>
364+
* <li>Since version: 3.4.0</li>
365+
* </ul>
366+
* </li>
349367
* </ol>
350368
* Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
351369
* including: add, subtract, multiply, divide, remainder, pmod.

sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.spark.sql.connector.expressions.Cast;
2525
import org.apache.spark.sql.connector.expressions.Expression;
26+
import org.apache.spark.sql.connector.expressions.Extract;
2627
import org.apache.spark.sql.connector.expressions.NamedReference;
2728
import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
2829
import org.apache.spark.sql.connector.expressions.Literal;
@@ -46,6 +47,9 @@ public String build(Expression expr) {
4647
} else if (expr instanceof Cast) {
4748
Cast cast = (Cast) expr;
4849
return visitCast(build(cast.expression()), cast.dataType());
50+
} else if (expr instanceof Extract) {
51+
Extract extract = (Extract) expr;
52+
return visitExtract(extract.field(), build(extract.source()));
4953
} else if (expr instanceof GeneralScalarExpression) {
5054
GeneralScalarExpression e = (GeneralScalarExpression) expr;
5155
String name = e.name();
@@ -136,6 +140,9 @@ public String build(Expression expr) {
136140
case "UPPER":
137141
case "LOWER":
138142
case "TRANSLATE":
143+
case "DATE_ADD":
144+
case "DATE_DIFF":
145+
case "TRUNC":
139146
return visitSQLFunction(name,
140147
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
141148
case "CASE_WHEN": {
@@ -327,4 +334,8 @@ protected String visitTrim(String direction, String[] inputs) {
327334
return "TRIM(" + direction + " " + inputs[1] + " FROM " + inputs[0] + ")";
328335
}
329336
}
337+
338+
protected String visitExtract(String field, String source) {
339+
return "EXTRACT(" + field + " FROM " + source + ")";
340+
}
330341
}

sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.spark.sql.catalyst.util
1919

2020
import org.apache.spark.sql.catalyst.expressions._
21-
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
21+
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
2222
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
23-
import org.apache.spark.sql.types.BooleanType
23+
import org.apache.spark.sql.types.{BooleanType, IntegerType}
2424

2525
/**
2626
* The builder to generate V2 expressions from catalyst expressions.
@@ -344,6 +344,59 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
344344
} else {
345345
None
346346
}
347+
case date: DateAdd =>
348+
val childrenExpressions = date.children.flatMap(generateExpression(_))
349+
if (childrenExpressions.length == date.children.length) {
350+
Some(new GeneralScalarExpression("DATE_ADD", childrenExpressions.toArray[V2Expression]))
351+
} else {
352+
None
353+
}
354+
case date: DateDiff =>
355+
val childrenExpressions = date.children.flatMap(generateExpression(_))
356+
if (childrenExpressions.length == date.children.length) {
357+
Some(new GeneralScalarExpression("DATE_DIFF", childrenExpressions.toArray[V2Expression]))
358+
} else {
359+
None
360+
}
361+
case date: TruncDate =>
362+
val childrenExpressions = date.children.flatMap(generateExpression(_))
363+
if (childrenExpressions.length == date.children.length) {
364+
Some(new GeneralScalarExpression("TRUNC", childrenExpressions.toArray[V2Expression]))
365+
} else {
366+
None
367+
}
368+
case Second(child, _) =>
369+
generateExpression(child).map(v => new V2Extract("SECOND", v))
370+
case Minute(child, _) =>
371+
generateExpression(child).map(v => new V2Extract("MINUTE", v))
372+
case Hour(child, _) =>
373+
generateExpression(child).map(v => new V2Extract("HOUR", v))
374+
case Month(child) =>
375+
generateExpression(child).map(v => new V2Extract("MONTH", v))
376+
case Quarter(child) =>
377+
generateExpression(child).map(v => new V2Extract("QUARTER", v))
378+
case Year(child) =>
379+
generateExpression(child).map(v => new V2Extract("YEAR", v))
380+
// DayOfWeek uses Sunday = 1, Monday = 2, ... and ISO standard is Monday = 1, ...,
381+
// so we use the formula ((ISO_standard % 7) + 1) to do translation.
382+
case DayOfWeek(child) =>
383+
generateExpression(child).map(v => new GeneralScalarExpression("+",
384+
Array[V2Expression](new GeneralScalarExpression("%",
385+
Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))),
386+
LiteralValue(1, IntegerType))))
387+
// WeekDay uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ...,
388+
// so we use the formula (ISO_standard - 1) to do translation.
389+
case WeekDay(child) =>
390+
generateExpression(child).map(v => new GeneralScalarExpression("-",
391+
Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType))))
392+
case DayOfMonth(child) =>
393+
generateExpression(child).map(v => new V2Extract("DAY", v))
394+
case DayOfYear(child) =>
395+
generateExpression(child).map(v => new V2Extract("DAY_OF_YEAR", v))
396+
case WeekOfYear(child) =>
397+
generateExpression(child).map(v => new V2Extract("WEEK", v))
398+
case YearOfWeek(child) =>
399+
generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v))
347400
// TODO supports other expressions
348401
case ApplyFunctionExpression(function, children) =>
349402
val childrenExpressions = children.flatMap(generateExpression(_))

sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import java.util.Locale
2222
import java.util.concurrent.ConcurrentHashMap
2323

2424
import scala.collection.JavaConverters._
25+
import scala.util.control.NonFatal
2526

2627
import org.apache.spark.sql.AnalysisException
2728
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
2829
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
30+
import org.apache.spark.sql.connector.expressions.Expression
2931
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
3032
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
3133
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
@@ -132,4 +134,28 @@ private[sql] object H2Dialect extends JdbcDialect {
132134
}
133135
super.classifyException(message, e)
134136
}
137+
138+
override def compileExpression(expr: Expression): Option[String] = {
139+
val jdbcSQLBuilder = new H2JDBCSQLBuilder()
140+
try {
141+
Some(jdbcSQLBuilder.build(expr))
142+
} catch {
143+
case NonFatal(e) =>
144+
logWarning("Error occurs while compiling V2 expression", e)
145+
None
146+
}
147+
}
148+
149+
class H2JDBCSQLBuilder extends JDBCSQLBuilder {
150+
151+
override def visitExtract(field: String, source: String): String = {
152+
val newField = field match {
153+
case "DAY_OF_WEEK" => "ISO_DAY_OF_WEEK"
154+
case "WEEK" => "ISO_WEEK"
155+
case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR"
156+
case _ => field
157+
}
158+
s"EXTRACT($newField FROM $source)"
159+
}
160+
}
135161
}

0 commit comments

Comments
 (0)