From 791b4e26df64165219ac1504a23d470e8450ac2b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 24 Mar 2019 18:36:35 +0100
Subject: [PATCH 01/17] Add SQL config spark.sql.legacy.utcTimestampFunc.enabled

---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a4ca1a0a72ae2..b11e1dccdcaaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1717,6 +1717,12 @@ object SQLConf {
       "and java.sql.Date are used for the same purpose.")
     .booleanConf
     .createWithDefault(false)
+
+  val UTC_TIMESTAMP_FUNC_ENABLED = buildConf("spark.sql.legacy.utcTimestampFunc.enabled")
+    .doc("The configuration property enables the to_utc_timestamp() " +
+      "and from_utc_timestamp() functions.")
+    .booleanConf
+    .createWithDefault(false)
 }
 
 /**
@@ -1908,6 +1914,8 @@ class SQLConf extends Serializable with Logging {
 
   def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_EANBLED)
 
+  def utcTimestampFuncEnabled: Boolean = getConf(UTC_TIMESTAMP_FUNC_ENABLED)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
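For reference, the flag added above behaves like any other SQL config. A minimal sketch of toggling it at runtime, assuming a running SparkSession named `spark` (this snippet is illustrative and not part of the patch):

    // Enable the legacy functions for the current session (the default is false).
    spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
    // The same toggle via SQL:
    spark.sql("SET spark.sql.legacy.utcTimestampFunc.enabled=true")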
+ + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) override def dataType: DataType = TimestampType override def prettyName: String = "from_utc_timestamp" @@ -1231,6 +1238,11 @@ case class MonthsBetween( case class ToUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { + if (!SQLConf.get.utcTimestampFuncEnabled) { + throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0." + + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) override def dataType: DataType = TimestampType override def prettyName: String = "to_utc_timestamp" From 9c00cfbc7c4f7164012a0589ea0a480ac8733407 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Mar 2019 18:40:14 +0100 Subject: [PATCH 03/17] Add a gap at the end of the first sentence in the exception. --- .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index c89e084782090..8d4c964663a61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -1239,7 +1239,7 @@ case class ToUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { if (!SQLConf.get.utcTimestampFuncEnabled) { - throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0." + + throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. 
" + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") } From 2f6fcf2a43c97f02f66a463147e2f58af9e1c4fa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Mar 2019 19:02:17 +0100 Subject: [PATCH 04/17] Fix DateExpressionsSuite --- .../expressions/DateExpressionsSuite.scala | 63 ++++++++++++------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 61ee8f0cd46d2..64bf89926b479 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -25,10 +25,12 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit._ import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -816,21 +818,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { NonFoldableLiteral.create(tz, StringType)), if (expected != null) Timestamp.valueOf(expected) else null) } - test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00") - test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00") - test(null, "UTC", null) - test("2015-07-24 00:00:00", null, null) - test(null, null, null) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00") + test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00") + test(null, "UTC", null) + test("2015-07-24 00:00:00", null, null) + test(null, null, null) + } + val msg = intercept[AnalysisException] { + test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00") + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("to_utc_timestamp - invalid time zone id") { - Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => - val msg = intercept[java.time.DateTimeException] { - GenerateUnsafeProjection.generate( - ToUTCTimestamp( - Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil) - }.getMessage - assert(msg.contains(invalidTz)) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => + val msg = intercept[java.time.DateTimeException] { + GenerateUnsafeProjection.generate( + ToUTCTimestamp( + Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil) + }.getMessage + assert(msg.contains(invalidTz)) + } } } @@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { NonFoldableLiteral.create(tz, StringType)), if (expected != null) Timestamp.valueOf(expected) else null) } - test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") - test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") - test(null, "UTC", null) - test("2015-07-24 00:00:00", null, null) - test(null, null, null) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> 
"true") { + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") + test(null, "UTC", null) + test("2015-07-24 00:00:00", null, null) + test(null, null, null) + } + val msg = intercept[AnalysisException] { + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("from_utc_timestamp - invalid time zone id") { - Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => - val msg = intercept[java.time.DateTimeException] { - GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil) - }.getMessage - assert(msg.contains(invalidTz)) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => + val msg = intercept[java.time.DateTimeException] { + GenerateUnsafeProjection.generate( + FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil) + }.getMessage + assert(msg.contains(invalidTz)) + } } } } From d37483e74f976478bff2994d821ba1b40c6ad941 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Mar 2019 20:01:17 +0100 Subject: [PATCH 05/17] Fix DateFunctionsSuite --- .../apache/spark/sql/DateFunctionsSuite.scala | 113 ++++++++++-------- 1 file changed, 65 insertions(+), 48 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index b06d52ddebc79..29cef693ea833 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.unsafe.types.CalendarInterval @@ -672,33 +673,41 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"), (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00") ).toDF("a", "b") - checkAnswer( - df.select(from_utc_timestamp(col("a"), "PST")), - Seq( - Row(Timestamp.valueOf("2015-07-23 17:00:00")), - Row(Timestamp.valueOf("2015-07-24 17:00:00")))) - checkAnswer( - df.select(from_utc_timestamp(col("b"), "PST")), - Seq( - Row(Timestamp.valueOf("2015-07-23 17:00:00")), - Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + checkAnswer( + df.select(from_utc_timestamp(col("a"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-23 17:00:00")), + Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + checkAnswer( + df.select(from_utc_timestamp(col("b"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-23 17:00:00")), + Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + } + val msg = intercept[AnalysisException] { + df.select(from_utc_timestamp(col("a"), "PST")).collect() + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("from_utc_timestamp with column zone") { - val df = Seq( - (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"), - (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST") - ).toDF("a", "b", "c") - checkAnswer( - df.select(from_utc_timestamp(col("a"), col("c"))), - Seq( - Row(Timestamp.valueOf("2015-07-24 
02:00:00")), - Row(Timestamp.valueOf("2015-07-24 17:00:00")))) - checkAnswer( - df.select(from_utc_timestamp(col("b"), col("c"))), - Seq( - Row(Timestamp.valueOf("2015-07-24 02:00:00")), - Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + val df = Seq( + (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"), + (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST") + ).toDF("a", "b", "c") + checkAnswer( + df.select(from_utc_timestamp(col("a"), col("c"))), + Seq( + Row(Timestamp.valueOf("2015-07-24 02:00:00")), + Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + checkAnswer( + df.select(from_utc_timestamp(col("b"), col("c"))), + Seq( + Row(Timestamp.valueOf("2015-07-24 02:00:00")), + Row(Timestamp.valueOf("2015-07-24 17:00:00")))) + } } test("to_utc_timestamp with literal zone") { @@ -706,32 +715,40 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"), (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00") ).toDF("a", "b") - checkAnswer( - df.select(to_utc_timestamp(col("a"), "PST")), - Seq( - Row(Timestamp.valueOf("2015-07-24 07:00:00")), - Row(Timestamp.valueOf("2015-07-25 07:00:00")))) - checkAnswer( - df.select(to_utc_timestamp(col("b"), "PST")), - Seq( - Row(Timestamp.valueOf("2015-07-24 07:00:00")), - Row(Timestamp.valueOf("2015-07-25 07:00:00")))) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + checkAnswer( + df.select(to_utc_timestamp(col("a"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-24 07:00:00")), + Row(Timestamp.valueOf("2015-07-25 07:00:00")))) + checkAnswer( + df.select(to_utc_timestamp(col("b"), "PST")), + Seq( + Row(Timestamp.valueOf("2015-07-24 07:00:00")), + Row(Timestamp.valueOf("2015-07-25 07:00:00")))) + } + val msg = intercept[AnalysisException] { + df.select(to_utc_timestamp(col("a"), "PST")).collect() + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("to_utc_timestamp with column zone") { - val df = Seq( - (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"), - (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET") - ).toDF("a", "b", "c") - checkAnswer( - df.select(to_utc_timestamp(col("a"), col("c"))), - Seq( - Row(Timestamp.valueOf("2015-07-24 07:00:00")), - Row(Timestamp.valueOf("2015-07-24 22:00:00")))) - checkAnswer( - df.select(to_utc_timestamp(col("b"), col("c"))), - Seq( - Row(Timestamp.valueOf("2015-07-24 07:00:00")), - Row(Timestamp.valueOf("2015-07-24 22:00:00")))) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + val df = Seq( + (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"), + (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET") + ).toDF("a", "b", "c") + checkAnswer( + df.select(to_utc_timestamp(col("a"), col("c"))), + Seq( + Row(Timestamp.valueOf("2015-07-24 07:00:00")), + Row(Timestamp.valueOf("2015-07-24 22:00:00")))) + checkAnswer( + df.select(to_utc_timestamp(col("b"), col("c"))), + Seq( + Row(Timestamp.valueOf("2015-07-24 07:00:00")), + Row(Timestamp.valueOf("2015-07-24 22:00:00")))) + } } } From 1720e5bff4566446a649506eee9272e9af0ad0b7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Mar 2019 20:10:26 +0100 Subject: [PATCH 06/17] Fix DateTimeBenchmark --- .../sql/execution/benchmark/DateTimeBenchmark.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git 

From 1720e5bff4566446a649506eee9272e9af0ad0b7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 24 Mar 2019 20:10:26 +0100
Subject: [PATCH 06/17] Fix DateTimeBenchmark

---
 .../sql/execution/benchmark/DateTimeBenchmark.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
index cbd51b490141b..17bdd218dc172 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Synthetic benchmark for date and timestamp functions.
@@ -86,9 +87,11 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')")
     }
     runBenchmark("Convert timestamps") {
-      val timestampExpr = "cast(id as timestamp)"
-      run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
-      run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+        val timestampExpr = "cast(id as timestamp)"
+        run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
+        run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      }
     }
     runBenchmark("Intervals") {
       val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)")

From a4971e1960c7cac3828735a03771d3c231141240 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 24 Mar 2019 20:18:32 +0100
Subject: [PATCH 07/17] Fix StreamingAggregationSuite

---
 .../apache/spark/sql/streaming/StreamingAggregationSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 116fd74352fe2..e9fee7f860d6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -344,11 +344,10 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   testWithAllStateVersions("prune results by current_date, complete mode") {
     import testImplicits._
     val clock = new StreamManualClock
-    val tz = TimeZone.getDefault.getID
     val inputData = MemoryStream[Long]
     val aggregated =
       inputData.toDF()
-        .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+        .select(to_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY)))
         .toDF("value")
         .groupBy($"value")
         .agg(count("*"))

From 9c896b0daf58333f2c8f90098115710e36f3a80a Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 24 Mar 2019 21:29:08 +0100
Subject: [PATCH 08/17] Fix StreamingAggregationSuite

---
 .../spark/sql/streaming/StreamingAggregationSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index e9fee7f860d6b..485eb7bcb2e60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -18,10 +18,10 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
-import java.util.{Locale, TimeZone}
+import java.util.Locale
 
 import org.apache.commons.io.FileUtils
-import org.scalatest.{Assertions, BeforeAndAfterAll}
+import org.scalatest.Assertions
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.BlockRDD
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StreamingAggregationStateManager}
+import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf

From c4a4a933397cbe4fc621e01afe3fc48b167962fd Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 24 Mar 2019 21:47:44 +0100
Subject: [PATCH 09/17] Fix CodeGenerationSuite

---
 .../expressions/CodeGenerationSuite.scala | 61 +++++++++++--------
 1 file changed, 34 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 7d656fc57d753..4e64313da136b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
 
-import org.apache.log4j.{Appender, AppenderSkeleton, Logger}
+import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.SparkFunSuite
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ThreadUtils
@@ -189,36 +190,42 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
-    val length = 5000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 5000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+      }
     }
   }
 
   test("SPARK-22226: group splitted expressions into one method per nested class") {
-    val length = 10000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 10000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+      }
     }
   }

From daa1eb7843a3dc854e02a64c4cf0c6fa8a424e7c Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 24 Mar 2019 23:05:47 +0100
Subject: [PATCH 10/17] Deprecate functions

---
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index f99186cabc26d..bcb57836021e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2988,6 +2988,7 @@ object functions {
    * @group datetime_funcs
    * @since 1.5.0
    */
+  @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
   def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
     FromUTCTimestamp(ts.expr, Literal(tz))
   }
@@ -2999,6 +3000,7 @@ object functions {
    * @group datetime_funcs
    * @since 2.4.0
    */
+  @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
   def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
     FromUTCTimestamp(ts.expr, tz.expr)
   }
@@ -3017,6 +3019,7 @@ object functions {
    * @group datetime_funcs
    * @since 1.5.0
    */
+  @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
   def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
     ToUTCTimestamp(ts.expr, Literal(tz))
   }
@@ -3028,6 +3031,7 @@ object functions {
    * @group datetime_funcs
    * @since 2.4.0
   */
+  @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
   def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
     ToUTCTimestamp(ts.expr, tz.expr)
   }
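The @deprecated annotations above change no behavior by themselves: call sites keep compiling, but scalac reports a deprecation warning (carrying the message and "3.0.0" version from the annotation) when deprecation warnings are enabled. For example, a caller like the following still works while the flag is on, it just warns at compile time (`df` is assumed to be an existing DataFrame with a timestamp column `ts`):

    // Compiles, but now triggers a deprecation warning at the call site.
    val localTime = df.select(from_utc_timestamp($"ts", "PST"))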

From 56e2b3fbbaef738a13980c608872bb60139af65a Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 27 Mar 2019 11:10:03 +0100
Subject: [PATCH 11/17] Deprecate in SparkR

---
 R/pkg/R/functions.R | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d91896a9a9943..3ce733bae53d5 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2459,6 +2459,7 @@ setMethod("schema_of_csv", signature(x = "characterOrColumn"),
 #' @note from_utc_timestamp since 1.5.0
 setMethod("from_utc_timestamp", signature(y = "Column", x = "character"),
           function(y, x) {
+            .Deprecated("from_utc_timestamp")
             jc <- callJStatic("org.apache.spark.sql.functions", "from_utc_timestamp", y@jc, x)
             column(jc)
           })
@@ -2517,6 +2518,7 @@ setMethod("next_day", signature(y = "Column", x = "character"),
 #' @note to_utc_timestamp since 1.5.0
 setMethod("to_utc_timestamp", signature(y = "Column", x = "character"),
           function(y, x) {
+            .Deprecated("to_utc_timestamp")
             jc <- callJStatic("org.apache.spark.sql.functions", "to_utc_timestamp", y@jc, x)
             column(jc)
           })

From eb0fb525695dd333c86a67bb981b60f24b197f55 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 27 Mar 2019 11:52:23 +0100
Subject: [PATCH 12/17] Guard examples by the flag

---
 python/pyspark/sql/functions.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 6ae23576e7bc8..0c40699b16273 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1301,11 +1301,13 @@ def from_utc_timestamp(timestamp, tz):
     .. versionchanged:: 2.4
        `tz` can take a :class:`Column` containing timezone ID strings.
 
+    >>> spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
     >>> df = spark.createDataFrame([('1997-02-28 10:30:00', 'JST')], ['ts', 'tz'])
     >>> df.select(from_utc_timestamp(df.ts, "PST").alias('local_time')).collect()
     [Row(local_time=datetime.datetime(1997, 2, 28, 2, 30))]
     >>> df.select(from_utc_timestamp(df.ts, df.tz).alias('local_time')).collect()
     [Row(local_time=datetime.datetime(1997, 2, 28, 19, 30))]
+    >>> spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled")
     """
     sc = SparkContext._active_spark_context
     if isinstance(tz, Column):
@@ -1335,11 +1337,13 @@ def to_utc_timestamp(timestamp, tz):
     .. versionchanged:: 2.4
        `tz` can take a :class:`Column` containing timezone ID strings.
 
+    >>> spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
     >>> df = spark.createDataFrame([('1997-02-28 10:30:00', 'JST')], ['ts', 'tz'])
     >>> df.select(to_utc_timestamp(df.ts, "PST").alias('utc_time')).collect()
     [Row(utc_time=datetime.datetime(1997, 2, 28, 18, 30))]
     >>> df.select(to_utc_timestamp(df.ts, df.tz).alias('utc_time')).collect()
     [Row(utc_time=datetime.datetime(1997, 2, 28, 1, 30))]
+    >>> spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled")
     """
     sc = SparkContext._active_spark_context
     if isinstance(tz, Column):
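The doctest changes wrap the Python examples in an explicit set/unset of the flag, since they would otherwise throw once the functions are disabled by default. The same session-level toggling, sketched in Scala for comparison (assuming a SparkSession `spark` and a DataFrame `df` with a timestamp column `ts`):

    spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
    val converted = df.select(to_utc_timestamp($"ts", "JST")) // allowed while the flag is on
    spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled")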

From 828da8b2cf6eb873481ea69b69c33d8f7108c784 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 27 Mar 2019 11:56:13 +0100
Subject: [PATCH 13/17] Add from_utc_timestamp/to_utc_timestamp to the deprecation list

---
 python/pyspark/sql/functions.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0c40699b16273..9d8b7dd3d8d8e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -287,6 +287,8 @@ def _():
 
 # Wraps deprecated functions (keys) with the messages (values).
 _functions_deprecated = {
+    'from_utc_timestamp': 'Deprecated in 3.0.',
+    'to_utc_timestamp': 'Deprecated in 3.0.',
 }
 
 for _name, _doc in _functions.items():
See SPARK-25496", DeprecationWarning) sc = SparkContext._active_spark_context if isinstance(tz, Column): tz = _to_java_column(tz) @@ -3197,9 +3197,13 @@ def _test(): globs['sc'] = sc globs['spark'] = spark globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)]) + + spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true") (failure_count, test_count) = doctest.testmod( pyspark.sql.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) + spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled") + spark.stop() if failure_count: sys.exit(-1) From 7d232d8267e9c5623b5b747fbf584a1ab3bdb994 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 28 Mar 2019 19:59:28 +0100 Subject: [PATCH 15/17] Fix SparkR tests --- R/pkg/tests/fulltests/test_sparkSQL.R | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 2394f74142847..7b0c04c34f477 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1905,13 +1905,22 @@ test_that("date functions on a DataFrame", { df2 <- createDataFrame(l2) expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24)) expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34)) - expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1], + conf <- callJMethod(sparkSession, "conf") + isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled") + callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true") + tryCatch({ + expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1], c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC"))) - expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1], + expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1], c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC"))) - expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0) - expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0) - expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) + expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0) + expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0) + expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) + }, + finally = { + # Reverting the conf back + callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled) + }) expect_equal(collect(select(df2, month(date_trunc("yyyy", df2$b))))[, 1], c(1, 1)) l3 <- list(list(a = 1000), list(a = -1000)) From c80be0adc8cefd26ed02be79adb60098dbba61a2 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Fri, 29 Mar 2019 08:45:58 +0900 Subject: [PATCH 16/17] Fix R tests --- R/pkg/tests/fulltests/test_sparkSQL.R | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 7b0c04c34f477..fdc7474820659 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1909,18 +1909,19 @@ test_that("date functions on a DataFrame", { isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled") callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true") tryCatch({ - expect_equal(collect(select(df2, 
from_utc_timestamp(df2$b, "JST")))[, 1], - c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC"))) - expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1], - c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC"))) - expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0) - expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0) - expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) + # Both from_utc_timestamp and to_utc_timestamp are deprecated as of SPARK-25496 + expect_equal(suppressWarnings(collect(select(df2, from_utc_timestamp(df2$b, "JST"))))[, 1], + c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC"))) + expect_equal(suppressWarnings(collect(select(df2, to_utc_timestamp(df2$b, "JST"))))[, 1], + c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC"))) }, finally = { # Reverting the conf back callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled) }) + expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0) + expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0) + expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) expect_equal(collect(select(df2, month(date_trunc("yyyy", df2$b))))[, 1], c(1, 1)) l3 <- list(list(a = 1000), list(a = -1000)) From de77ac23346810bed33bf14a17dc2498db1ef477 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 2 Apr 2019 12:14:44 +0200 Subject: [PATCH 17/17] Revert to_utc_timestamp back and guard it by the flag --- .../streaming/StreamingAggregationSuite.scala | 96 ++++++++++--------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 485eb7bcb2e60..2a9e6b849d897 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming import java.io.File -import java.util.Locale +import java.util.{Locale, TimeZone} import org.apache.commons.io.FileUtils import org.scalatest.Assertions @@ -342,51 +342,55 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } testWithAllStateVersions("prune results by current_date, complete mode") { - import testImplicits._ - val clock = new StreamManualClock - val inputData = MemoryStream[Long] - val aggregated = - inputData.toDF() - .select(to_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY))) - .toDF("value") - .groupBy($"value") - .agg(count("*")) - .where($"value".cast("date") >= date_sub(current_date(), 10)) - .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)") - testStream(aggregated, Complete)( - StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock), - // advance clock to 10 days, should retain all keys - AddData(inputData, 0L, 5L, 5L, 10L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), - CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - // advance clock to 20 days, should retain keys >= 10 - AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), - CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), - // advance clock to 30 days, should retain keys >= 20 - 

From de77ac23346810bed33bf14a17dc2498db1ef477 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 2 Apr 2019 12:14:44 +0200
Subject: [PATCH 17/17] Revert to_utc_timestamp back and guard it by the flag

---
 .../streaming/StreamingAggregationSuite.scala | 96 ++++++++++---------
 1 file changed, 50 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 485eb7bcb2e60..2a9e6b849d897 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import org.apache.commons.io.FileUtils
 import org.scalatest.Assertions
@@ -342,51 +342,55 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    import testImplicits._
-    val clock = new StreamManualClock
-    val inputData = MemoryStream[Long]
-    val aggregated =
-      inputData.toDF()
-        .select(to_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY)))
-        .toDF("value")
-        .groupBy($"value")
-        .agg(count("*"))
-        .where($"value".cast("date") >= date_sub(current_date(), 10))
-        .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-    testStream(aggregated, Complete)(
-      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-      // advance clock to 10 days, should retain all keys
-      AddData(inputData, 0L, 5L, 5L, 10L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-      // advance clock to 20 days, should retain keys >= 10
-      AddData(inputData, 15L, 15L, 20L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-      // advance clock to 30 days, should retain keys >= 20
-      AddData(inputData, 85L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // bounce stream and ensure correct batch timestamp is used
-      // i.e., we don't take it from the clock, which is at 90 days.
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        q.commitLog.purge(3)
-        // advance by 60 days i.e., 90 days total
-        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-        true
-      },
-      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-      // Commit log blown, causing a re-run of the last batch
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // advance clock to 100 days, should retain keys >= 90
-      AddData(inputData, 85L, 90L, 100L, 105L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-    )
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      import testImplicits._
+      val clock = new StreamManualClock
+      val tz = TimeZone.getDefault.getID
+      val inputData = MemoryStream[Long]
+      val aggregated =
+        inputData.toDF()
+          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+          .toDF("value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .where($"value".cast("date") >= date_sub(current_date(), 10))
+          .select(
+            ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+      testStream(aggregated, Complete)(
+        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+        // advance clock to 10 days, should retain all keys
+        AddData(inputData, 0L, 5L, 5L, 10L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+        // advance clock to 20 days, should retain keys >= 10
+        AddData(inputData, 15L, 15L, 20L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+        // advance clock to 30 days, should retain keys >= 20
+        AddData(inputData, 85L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // bounce stream and ensure correct batch timestamp is used
+        // i.e., we don't take it from the clock, which is at 90 days.
+        StopStream,
+        AssertOnQuery { q => // clear the sink
+          q.sink.asInstanceOf[MemorySink].clear()
+          q.commitLog.purge(3)
+          // advance by 60 days i.e., 90 days total
+          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+          true
+        },
+        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+        // Commit log blown, causing a re-run of the last batch
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // advance clock to 100 days, should retain keys >= 90
+        AddData(inputData, 85L, 90L, 100L, 105L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+      )
+    }
   }
 
   testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in streaming query " +