@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Cast
-import org.apache.spark.sql.catalyst.expressions.postgreSQL.PostgreCastToBoolean
+import org.apache.spark.sql.catalyst.expressions.postgreSQL.{PostgreCastToBoolean, PostgreCastToTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{BooleanType, StringType}
+import org.apache.spark.sql.types.{BooleanType, TimestampType}

object PostgreSQLDialect {
  val postgreSQLDialectRules: List[Rule[LogicalPlan]] =
-    CastToBoolean ::
+    CastToBoolean :: CastToTimestamp ::
    Nil

  object CastToBoolean extends Rule[LogicalPlan] with Logging {
@@ -46,4 +46,19 @@ object PostgreSQLDialect {
      }
    }
  }

  object CastToTimestamp extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = {
      val conf = SQLConf.get
      if (conf.usePostgreSQLDialect) {
        plan.transformExpressions {
          case Cast(child, dataType, timeZoneId)
              if dataType == TimestampType =>
Review comment (Member): We can leave the cast-timestamp-to-timestamp case to the Optimizer to handle.

Reply (@Ngone51, Member, Nov 22, 2019): Oh, sorry. Here I meant that we should change the if condition to: if child.dataType != TimestampType && dataType == TimestampType =>. The Optimizer currently cannot optimize a Pg cast.

Reply (Contributor, Author): Got it. I will update this.

A sketch of the rule with the suggested guard follows this diff.

            PostgreCastToTimestamp(child, timeZoneId)
        }
      } else {
        plan
      }
    }
  }
}
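Taking the guard change suggested in the review thread above, the rule would skip inputs that are already of TimestampType, since the Optimizer cannot yet eliminate a redundant Pg cast. A minimal sketch of the adjusted rule, reusing this PR's names (a sketch, not the code as committed):

object CastToTimestamp extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (SQLConf.get.usePostgreSQLDialect) {
      plan.transformExpressions {
        // Only rewrite when the input is not already a timestamp; the
        // timestamp-to-timestamp case is a no-op that the Optimizer cannot
        // yet remove for Pg casts.
        case Cast(child, dataType, timeZoneId)
            if child.dataType != TimestampType && dataType == TimestampType =>
          PostgreCastToTimestamp(child, timeZoneId)
      }
    } else {
      plan
    }
  }
}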
@@ -410,7 +410,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
  }

  // TimestampConverter
-  private[this] def castToTimestamp(from: DataType): Any => Any = from match {
+  protected[this] def castToTimestamp(from: DataType): Any => Any = from match {
    case StringType =>
      buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, zoneId).orNull)
    case BooleanType =>
@@ -1159,7 +1159,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
    }
  }

-  private[this] def castToTimestampCode(
+  protected[this] def castToTimestampCode(
      from: DataType,
      ctx: CodegenContext): CastFunction = from match {
    case StringType =>
@@ -16,19 +16,29 @@
 */
package org.apache.spark.sql.catalyst.expressions.postgreSQL

+import java.time.ZoneId
+
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{CastBase, Expression, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, JavaCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.postgreSQL.StringUtils
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{BooleanType, DataType, DateType, IntegerType, NullType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

-case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])
-  extends CastBase {
+abstract class PostgreCastBase extends CastBase {

-  override protected def ansiEnabled =
+  override protected def ansiEnabled: Boolean =
    throw new UnsupportedOperationException("PostgreSQL dialect doesn't support ansi mode")

+  override def nullable: Boolean = child.nullable
+}
+
+case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])
+  extends PostgreCastBase {

  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
    copy(timeZoneId = Option(timeZoneId))

@@ -75,9 +85,57 @@ case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])

  override def dataType: DataType = BooleanType

-  override def nullable: Boolean = child.nullable
-
  override def toString: String = s"PostgreCastToBoolean($child as ${dataType.simpleString})"

  override def sql: String = s"CAST(${child.sql} AS ${dataType.sql})"
}

case class PostgreCastToTimestamp(child: Expression, timeZoneId: Option[String])
Review comment (Member): By the way, do we need to define a new rule and a new cast expression for each Pg cast pattern? I mean, can we not define all the Pg cast patterns in a single rule and a single cast expression? cc: @cloud-fan @Ngone51

Reply (Member): I think we can, and should, combine them into a single one (both rule and expression) when more types get in, just like the original Cast does. But I'm not sure where we should start. Maybe this one?

Reply (Member): Yeah, I personally think so. @cloud-fan

A sketch of such a combined rule follows this file's diff.

  extends PostgreCastBase {

  override def dataType: DataType = TimestampType

  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
    case StringType | DateType =>
      TypeCheckResult.TypeCheckSuccess
    case _ =>
      TypeCheckResult.TypeCheckFailure(s"cannot cast type ${child.dataType} to timestamp")
  }

  /** Returns a copy of this expression with the specified timeZoneId. */
  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
    copy(timeZoneId = Option(timeZoneId))

  override def castToTimestamp(from: DataType): Any => Any = from match {
    case StringType =>
      buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, zoneId)
Review comment (Member): I believe that Postgres can correctly parse the strings 19700101, 1970/01/01, and January 1 04:05:06 1970 PST, while Spark can't. So I think we may also need to support them in PostgreCastToTimestamp.

Reply (Contributor, Author): Thanks for your suggestion. I will check this.

Reply (Contributor, Author):

postgres# select cast('19700101' as timestamp);
01.01.1970 00:00:00
postgres# select cast('1970/01/01' as timestamp);
01.01.1970 00:00:00
postgres# select cast('January 1 04:05:06 1970 PST' as timestamp);
01.01.1970 04:05:06

Spark returns NULL for all of them.

Reply (Contributor, Author): @maropu, kindly review the latest changes and give your feedback on supporting the above queries. Do we need to support them in this PR? If yes, we need to list all the timestamp formats that Postgres supports but Spark doesn't.

Reply (Member): I personally think the format support above is not a main issue of this PR, so it's better to separate the two pieces of work: the timestamp cast support and the timestamp format support for the pg dialect.

A sketch test covering the current behavior for these strings follows the CastSuite diff below.

        .getOrElse(throw new AnalysisException(s"invalid input syntax for type timestamp: $utfs")))
    case DateType =>
      super.castToTimestamp(from)
  }

  override def castToTimestampCode(
      from: DataType,
      ctx: CodegenContext): CastFunction = from match {
    case StringType =>
      val zoneIdClass = classOf[ZoneId]
      val zid = JavaCode.global(
        ctx.addReferenceObj("zoneId", zoneId, zoneIdClass.getName),
        zoneIdClass)
      val longOpt = ctx.freshVariable("longOpt", classOf[Option[Long]])
      (c, evPrim, evNull) =>
        code"""
          scala.Option<Long> $longOpt =
            org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid);
          if ($longOpt.isDefined()) {
            $evPrim = ((Long) $longOpt.get()).longValue();
          } else {
            throw new org.apache.spark.sql.AnalysisException(
              "invalid input syntax for type timestamp: " + $c);
          }
        """
    case DateType =>
      super.castToTimestampCode(from, ctx)
  }

  override def toString: String = s"PostgreCastToTimestamp($child as ${dataType.simpleString})"

  override def sql: String = s"CAST(${child.sql} AS ${dataType.sql})"
}
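As the review thread on PostgreCastToTimestamp suggests, the per-type rules and expressions could eventually be combined into a single rule that dispatches on the target type, much like the built-in Cast. A rough sketch of that direction, reusing this PR's expressions (the combined rule name is hypothetical, not from the PR):

object PostgreCastRules extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (SQLConf.get.usePostgreSQLDialect) {
      plan.transformExpressions {
        // A single rule dispatches each Pg cast pattern to its expression.
        case Cast(child, BooleanType, tz) =>
          PostgreCastToBoolean(child, tz)
        case Cast(child, TimestampType, tz) if child.dataType != TimestampType =>
          PostgreCastToTimestamp(child, tz)
      }
    } else {
      plan
    }
  }
}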
@@ -70,4 +70,14 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
    assert(PostgreCastToBoolean(Literal(1.toDouble), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToBoolean(Literal(1.toFloat), None).checkInputDataTypes().isFailure)
  }

  test("unsupported data types to cast to timestamp") {
    assert(PostgreCastToTimestamp(Literal(1), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToTimestamp(Literal(1.toByte), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToTimestamp(Literal(1.toDouble), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToTimestamp(Literal(1.toFloat), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToTimestamp(Literal(1.toLong), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToTimestamp(Literal(1.toShort), None).checkInputDataTypes().isFailure)
    assert(PostgreCastToTimestamp(Literal(BigDecimal(1.0)), None).checkInputDataTypes().isFailure)
  }
}
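The format-support discussion above was deferred to follow-up work, so PostgreCastToTimestamp still rejects strings that only PostgreSQL can parse. A hypothetical regression test for that current behavior (the test name is mine, and it assumes the suite imports AnalysisException alongside Literal):

  test("pg-only timestamp formats are rejected until format support lands") {
    // PostgreSQL parses all three strings; Spark's stringToTimestamp does not,
    // so the Pg cast raises "invalid input syntax" instead of returning null.
    Seq("19700101", "1970/01/01", "January 1 04:05:06 1970 PST").foreach { s =>
      intercept[AnalysisException](PostgreCastToTimestamp(Literal(s), Some("UTC")).eval())
    }
  }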
@@ -39,4 +39,14 @@ class PostgreSQLDialectQuerySuite extends QueryTest with SharedSparkSession {
      intercept[IllegalArgumentException](sql(s"select cast('$input' as boolean)").collect())
    }
  }

  test("cast unsupported data types to timestamp") {
    Seq(1, 0.1, 1.toDouble, 5.toFloat, true, 3.toByte, 4.toShort).foreach { value =>
      val actualResult = intercept[AnalysisException](
        sql(s"SELECT CAST($value AS timestamp)")
      ).getMessage
      // The analyzer error embeds the message produced by
      // PostgreCastToTimestamp.checkInputDataTypes, so match on its core text
      // rather than comparing the full message for equality.
      assert(actualResult.contains("cannot cast type"))
    }
  }
Review comment (Member): Can you move these tests to SQLQueryTestSuite, e.g., input/postgreSQL/cast.sql?

Reply (Contributor, Author): I have moved these test cases. cast.sql.out needs to be updated.

Reply (Contributor, Author): Need to delete this test case.

}