@@ -335,6 +335,8 @@ abstract class HashExpression[E] extends Expression {
}
}

protected def genHashTimestamp(t: String, result: String): String = genHashLong(t, result)

protected def genHashCalendarInterval(input: String, result: String): String = {
val microsecondsHash = s"$hasherClassName.hashLong($input.microseconds, $result)"
s"$result = $hasherClassName.hashInt($input.months, $microsecondsHash);"
@@ -400,7 +402,8 @@ abstract class HashExpression[E] extends Expression {
case NullType => ""
case BooleanType => genHashBoolean(input, result)
case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
-case LongType | TimestampType => genHashLong(input, result)
+case LongType => genHashLong(input, result)
+case TimestampType => genHashTimestamp(input, result)
case FloatType => genHashFloat(input, result)
case DoubleType => genHashDouble(input, result)
case d: DecimalType => genHashDecimal(ctx, d, input, result)
@@ -580,8 +583,6 @@ object XxHash64Function extends InterpretedHashFunction {
*
* We should use this hash function for both shuffle and bucket of Hive tables, so that
* we can guarantee shuffle and bucketing have the same data distribution
- *
- * TODO: Support date related types
*/
@ExpressionDescription(
usage = "_FUNC_(expr1, expr2, ...) - Returns a hash value of the arguments.")
@@ -648,11 +649,16 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {

override protected def genHashCalendarInterval(input: String, result: String): String = {
s"""
-$result = (31 * $hasherClassName.hashInt($input.months)) +
-  $hasherClassName.hashLong($input.microseconds);
+$result = (int)
+  ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashCalendarInterval($input);
"""
}

override protected def genHashTimestamp(input: String, result: String): String =
s"""
$result = (int) ${HiveHashFunction.getClass.getName.stripSuffix("$")}.hashTimestamp($input);
"""

override protected def genHashString(input: String, result: String): String = {
val baseObject = s"$input.getBaseObject()"
val baseOffset = s"$input.getBaseOffset()"
@@ -781,6 +787,49 @@ object HiveHashFunction extends InterpretedHashFunction {
result
}

/**
* Mimics TimestampWritable.hashCode() in Hive
*/
def hashTimestamp(timestamp: Long): Long = {
val timestampInSeconds = timestamp / 1000000
val nanoSecondsPortion = (timestamp % 1000000) * 1000

var result = timestampInSeconds
result <<= 30 // the nanosecond part fits in 30 bits
result |= nanoSecondsPortion
((result >>> 32) ^ result).toInt
}
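
// Worked example (a sketch; the expected value comes from the Hive 1.2.1 runs
// recorded in the test suite below). "2017-02-24 10:56:29" UTC is
// 1487933789000000 microseconds since epoch, so timestampInSeconds = 1487933789
// and nanoSecondsPortion = 0; packing the seconds into the high bits and folding
// the two 32-bit halves of the packed value gives Hive's hash:
//
//   assert(hashTimestamp(1487933789000000L) == 1445725271L)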

/**
* Hive allows input intervals to be defined using the units below, but the intervals
* have to be from the same category:
* - year, month (stored as HiveIntervalYearMonth)
* - day, hour, minute, second, nanosecond (stored as HiveIntervalDayTime)
*
* e.g. (INTERVAL '30' YEAR + INTERVAL '-23' DAY) fails in Hive
*
* This method mimics HiveIntervalDayTime.hashCode() in Hive.
*
* There are two differences w.r.t. Hive, due to how intervals are stored in Spark vs. Hive:
*
* - If the `INTERVAL` is backed by HiveIntervalYearMonth in Hive, then this method will not
* produce a Hive-compatible result, because Spark's representation of calendar intervals is
* unified and does not have such category-based distinctions.
*
* - Spark's [[CalendarInterval]] has precision up to microseconds, but Hive's
* HiveIntervalDayTime can store data with precision up to nanoseconds. So any input interval
* with nanosecond values will lead to a wrong output hash (i.e. one that does not match
* Hive's output).
*/
def hashCalendarInterval(calendarInterval: CalendarInterval): Long = {
val totalSeconds = calendarInterval.microseconds / CalendarInterval.MICROS_PER_SECOND.toInt
val result: Int = (17 * 37) + (totalSeconds ^ totalSeconds >> 32).toInt

val nanoSeconds =
(calendarInterval.microseconds -
(totalSeconds * CalendarInterval.MICROS_PER_SECOND.toInt)).toInt * 1000
(result * 37) + nanoSeconds
}
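
// Worked example (a sketch; the expected value comes from the Hive 1.2.1 runs
// recorded in the test suite below). For "interval 1 microsecond" (months = 0,
// microseconds = 1): totalSeconds = 1 / 1000000 = 0, so the seed term is
// (17 * 37) + 0 = 629, and nanoSeconds = 1 * 1000 = 1000, giving
// 629 * 37 + 1000 = 24273:
//
//   assert(hashCalendarInterval(CalendarInterval.fromString("interval 1 microsecond")) == 24273L)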

override def hash(value: Any, dataType: DataType, seed: Long): Long = {
value match {
case null => 0
@@ -834,10 +883,10 @@
}
result

-case d: Decimal =>
-  normalizeDecimal(d.toJavaBigDecimal).hashCode()
-
-case _ => super.hash(value, dataType, seed)
+case d: Decimal => normalizeDecimal(d.toJavaBigDecimal).hashCode()
+case timestamp: Long if dataType.isInstanceOf[TimestampType] => hashTimestamp(timestamp)
+case calendarInterval: CalendarInterval => hashCalendarInterval(calendarInterval)
+case _ => super.hash(value, dataType, 0)
}
}
}
@@ -18,18 +18,20 @@
package org.apache.spark.sql.catalyst.expressions

import java.nio.charset.StandardCharsets
import java.util.TimeZone

import scala.collection.mutable.ArrayBuffer

import org.apache.commons.codec.digest.DigestUtils
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types.{ArrayType, StructType, _}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val random = new scala.util.Random
@@ -168,6 +170,208 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:on nonascii
}

test("hive-hash for date type") {
def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
checkHiveHash(
DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
DateType,
expected)
}

// basic case
checkHiveHashForDateType("2017-01-01", 17167)
[Review comment — tejasapatil (Contributor, Author), Feb 27, 2017]
Expected values were computed with Hive 1.2.1. Here are the queries in Hive:

SELECT HASH( CAST( "2017-01-01" AS DATE) );
SELECT HASH( CAST( "0000-01-01" AS DATE) );
SELECT HASH( CAST( "9999-12-31" AS DATE) );
SELECT HASH( CAST( "1970-01-01" AS DATE) );
SELECT HASH( CAST( "1800-01-01" AS DATE) );

// boundary cases
checkHiveHashForDateType("0000-01-01", -719530)
checkHiveHashForDateType("9999-12-31", 2932896)

// epoch
checkHiveHashForDateType("1970-01-01", 0)

// before epoch
checkHiveHashForDateType("1800-01-01", -62091)

// Invalid input: bad date string. Hive returns 0 for such cases
[Review comment — tejasapatil (Contributor, Author)]
Spark does not allow creating a Date that does not fit its spec and throws an exception. Hive will not fail, but falls back to null and returns 0 as the hash value.

intercept[NoSuchElementException](checkHiveHashForDateType("0-0-0", 0))
intercept[NoSuchElementException](checkHiveHashForDateType("-1212-01-01", 0))
intercept[NoSuchElementException](checkHiveHashForDateType("2016-99-99", 0))

// Invalid input: Empty string. Hive returns 0 for this case
intercept[NoSuchElementException](checkHiveHashForDateType("", 0))

// Invalid input: February 30th of a leap year. Hive supports this but Spark doesn't
intercept[NoSuchElementException](checkHiveHashForDateType("2016-02-30", 16861))
}

test("hive-hash for timestamp type") {
def checkHiveHashForTimestampType(
timestamp: String,
expected: Long,
timeZone: TimeZone = TimeZone.getTimeZone("UTC")): Unit = {
checkHiveHash(
DateTimeUtils.stringToTimestamp(UTF8String.fromString(timestamp), timeZone).get,
TimestampType,
expected)
}

// basic case
checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445725271)
[Review comment — tejasapatil (Contributor, Author), Feb 27, 2017]
Corresponding Hive queries:

select HASH(CAST("2017-02-24 10:56:29" AS TIMESTAMP));
select HASH(CAST("2017-02-24 10:56:29.111111" AS TIMESTAMP));
select HASH(CAST("0001-01-01 00:00:00" AS TIMESTAMP));
select HASH(CAST("9999-01-01 00:00:00" AS TIMESTAMP));
select HASH(CAST("1970-01-01 00:00:00" AS TIMESTAMP));
select HASH(CAST("1800-01-01 03:12:45" AS TIMESTAMP));

Note that these were run with the system timezone set to UTC (export TZ=/usr/share/zoneinfo/UTC). One of the tests below used the US/Pacific timezone instead:

select HASH(CAST("2017-02-24 10:56:29" AS TIMESTAMP));

// with higher precision
checkHiveHashForTimestampType("2017-02-24 10:56:29.111111", 1353936655)

// with different timezone
checkHiveHashForTimestampType("2017-02-24 10:56:29", 1445732471,
TimeZone.getTimeZone("US/Pacific"))

// boundary cases
checkHiveHashForTimestampType("0001-01-01 00:00:00", 1645926784)
checkHiveHashForTimestampType("9999-01-01 00:00:00", -1081818240)

// epoch
checkHiveHashForTimestampType("1970-01-01 00:00:00", 0)

// before epoch
checkHiveHashForTimestampType("1800-01-01 03:12:45", -267420885)

// Invalid input: bad timestamp string. Hive returns 0 for such cases
[Review comment — tejasapatil (Contributor, Author)]
Same as Date: invalid timestamp values are not allowed in Spark, and it will fail. Hive will not fail, but falls back to null and returns 0 as the hash value.

intercept[NoSuchElementException](checkHiveHashForTimestampType("0-0-0 0:0:0", 0))
intercept[NoSuchElementException](checkHiveHashForTimestampType("-99-99-99 99:99:45", 0))
intercept[NoSuchElementException](checkHiveHashForTimestampType("555555-55555-5555", 0))

// Invalid input: Empty string. Hive returns 0 for this case
intercept[NoSuchElementException](checkHiveHashForTimestampType("", 0))

// Invalid input: February 30th of a leap year. Hive supports this but Spark doesn't
intercept[NoSuchElementException](checkHiveHashForTimestampType("2016-02-30 00:00:00", 0))

// Invalid input: Hive accepts up to 9 decimal places of precision but Spark uses up to 6
intercept[TestFailedException](checkHiveHashForTimestampType("2017-02-24 10:56:29.11111111", 0))
}

test("hive-hash for CalendarInterval type") {
[Review comment — tejasapatil (Contributor, Author)]
Hive queries for all the tests below. Outputs were generated by running against Hive 1.2.1.

// ----- MICROSEC -----
SELECT HASH(interval_day_time("0 0:0:0.000001") );
SELECT HASH(interval_day_time("-0 0:0:0.000001") );
SELECT HASH(interval_day_time("0 0:0:0.000000") );
SELECT HASH(interval_day_time("0 0:0:0.000999") );
SELECT HASH(interval_day_time("-0 0:0:0.000999") );

// ----- MILLISEC -----
SELECT HASH(interval_day_time("0 0:0:0.001") );
SELECT HASH(interval_day_time("-0 0:0:0.001") );
SELECT HASH(interval_day_time("0 0:0:0.000") );
SELECT HASH(interval_day_time("0 0:0:0.999") );
SELECT HASH(interval_day_time("-0 0:0:0.999") );

// ----- SECOND -----
SELECT HASH( INTERVAL '1' SECOND);
SELECT HASH( INTERVAL '-1' SECOND);
SELECT HASH( INTERVAL '0' SECOND);
SELECT HASH( INTERVAL '2147483647' SECOND);
SELECT HASH( INTERVAL '-2147483648' SECOND);

// ----- MINUTE -----
SELECT HASH( INTERVAL '1' MINUTE);
SELECT HASH( INTERVAL '-1' MINUTE);
SELECT HASH( INTERVAL '0' MINUTE);
SELECT HASH( INTERVAL '2147483647' MINUTE);
SELECT HASH( INTERVAL '-2147483648' MINUTE);

// ----- HOUR -----
SELECT HASH( INTERVAL '1' HOUR);
SELECT HASH( INTERVAL '-1' HOUR);
SELECT HASH( INTERVAL '0' HOUR);
SELECT HASH( INTERVAL '2147483647' HOUR);
SELECT HASH( INTERVAL '-2147483648' HOUR);

// ----- DAY -----
SELECT HASH( INTERVAL '1' DAY);
SELECT HASH( INTERVAL '-1' DAY);
SELECT HASH( INTERVAL '0' DAY);
SELECT HASH( INTERVAL '106751991' DAY);
SELECT HASH( INTERVAL '-106751991' DAY);

// ----- MIX -----
SELECT HASH( INTERVAL '0' DAY );
SELECT HASH( INTERVAL '0' DAY + INTERVAL '0' HOUR );
SELECT HASH( INTERVAL '0' DAY + INTERVAL '0' HOUR + INTERVAL '0' MINUTE);
SELECT HASH( INTERVAL '0' DAY + INTERVAL '0' HOUR + INTERVAL '0' MINUTE + INTERVAL '0' SECOND);
SELECT HASH(interval_day_time("0 0:0:0.000") );
SELECT HASH(interval_day_time("0 0:0:0.000000") );

SELECT HASH( INTERVAL '6' DAY + INTERVAL '15' HOUR );
SELECT HASH( INTERVAL '5' DAY + INTERVAL '4' HOUR + INTERVAL '8' MINUTE);
SELECT HASH ( INTERVAL '-23' DAY + INTERVAL '56' HOUR + INTERVAL '-1111113' MINUTE + INTERVAL '9898989' SECOND );
SELECT HASH(interval_day_time("66 12:39:23.987") );
SELECT HASH(interval_day_time("66 12:39:23.987123") );

def checkHiveHashForIntervalType(interval: String, expected: Long): Unit = {
checkHiveHash(CalendarInterval.fromString(interval), CalendarIntervalType, expected)
}

// ----- MICROSEC -----

// basic case
checkHiveHashForIntervalType("interval 1 microsecond", 24273)

// negative
checkHiveHashForIntervalType("interval -1 microsecond", 22273)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 microsecond", 23273)
checkHiveHashForIntervalType("interval 999 microsecond", 1022273)
checkHiveHashForIntervalType("interval -999 microsecond", -975727)

// ----- MILLISEC -----

// basic case
checkHiveHashForIntervalType("interval 1 millisecond", 1023273)

// negative
checkHiveHashForIntervalType("interval -1 millisecond", -976727)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 millisecond", 23273)
checkHiveHashForIntervalType("interval 999 millisecond", 999023273)
checkHiveHashForIntervalType("interval -999 millisecond", -998976727)

// ----- SECOND -----

// basic case
checkHiveHashForIntervalType("interval 1 second", 23310)

// negative
checkHiveHashForIntervalType("interval -1 second", 23273)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 second", 23273)
checkHiveHashForIntervalType("interval 2147483647 second", -2147460412)
checkHiveHashForIntervalType("interval -2147483648 second", -2147460412)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 day", -4767228)
[Review comment — gatorsmile (Member), Mar 12, 2017]
Should we fix it before merging this PR?

[Reply — tejasapatil (Contributor, Author)]
In Spark SQL, the query fails with an exception (see below). This test case bypasses that check, however, because it creates the raw interval object directly:

scala> hc.sql("SELECT interval 9999999999 day ").show
org.apache.spark.sql.catalyst.parser.ParseException:
Error parsing interval string: day 9999999999 outside range [-106751991, 106751991](line 1, pos 16)

== SQL ==
SELECT interval 9999999999 day

scala> df.select("INTERVAL 9999999999 day").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`INTERVAL 9999999999 day`' given input columns: [key, value];;
'Project ['INTERVAL 9999999999 day]

// ----- MINUTE -----

// basic cases
checkHiveHashForIntervalType("interval 1 minute", 25493)

// negative
checkHiveHashForIntervalType("interval -1 minute", 25456)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 minute", 23273)
checkHiveHashForIntervalType("interval 2147483647 minute", 21830)
checkHiveHashForIntervalType("interval -2147483648 minute", 22163)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 day", -4767228)
[Review comment — gatorsmile (Member), Mar 12, 2017]
The unit sounds incorrect. The same applies to the other cases.

[Reply — tejasapatil (Contributor, Author)]
Fixed.

// ----- HOUR -----

// basic case
checkHiveHashForIntervalType("interval 1 hour", 156473)

// negative
checkHiveHashForIntervalType("interval -1 hour", 156436)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 hour", 23273)
checkHiveHashForIntervalType("interval 2147483647 hour", -62308)
checkHiveHashForIntervalType("interval -2147483648 hour", -43327)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 day", -4767228)

// ----- DAY -----

// basic cases
checkHiveHashForIntervalType("interval 1 day", 3220073)

// negative
checkHiveHashForIntervalType("interval -1 day", 3220036)

// edge / boundary cases
checkHiveHashForIntervalType("interval 0 day", 23273)
checkHiveHashForIntervalType("interval 106751991 day", -451506760)
checkHiveHashForIntervalType("interval -106751991 day", -451514123)

// Hive supports `day` for a longer range but Spark's range is smaller
// The check for range is done at the parser level so this does not fail in Spark
// checkHiveHashForIntervalType("interval -2147483648 day", -1575127)
// checkHiveHashForIntervalType("interval 2147483647 day", -4767228)

// Out of range for both Hive and Spark
// Hive throws an exception. Spark overflows and returns wrong output
// checkHiveHashForIntervalType("interval 9999999999 day", -4767228)

// ----- MIX -----

checkHiveHashForIntervalType("interval 0 day 0 hour", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second", 23273)
checkHiveHashForIntervalType("interval 0 day 0 hour 0 minute 0 second 0 millisecond", 23273)
checkHiveHashForIntervalType(
"interval 0 day 0 hour 0 minute 0 second 0 millisecond 0 microsecond", 23273)

checkHiveHashForIntervalType("interval 6 day 15 hour", 21202073)
checkHiveHashForIntervalType("interval 5 day 4 hour 8 minute", 16557833)
checkHiveHashForIntervalType("interval -23 day 56 hour -1111113 minute 9898989 second",
-2128468593)
checkHiveHashForIntervalType("interval 66 day 12 hour 39 minute 23 second 987 millisecond",
1199697904)
checkHiveHashForIntervalType(
"interval 66 day 12 hour 39 minute 23 second 987 millisecond 123 microsecond", 1199820904)
}

test("hive-hash for array") {
// empty array
checkHiveHash(