[SPARK-20318][SQL] Use Catalyst type for min/max in ColumnStat for ease of estimation #17630
Changes from 3 commits
```diff
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
```
```diff
@@ -74,10 +75,11 @@ case class Statistics(
  * Statistics collected for a column.
  *
  * 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
- *    corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
- *    TimestampType we store java.sql.Timestamp.
- * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ *    Catalyst data type. For example, the internal type of DateType is Int, and the internal
+ *    type of TimestampType is Long.
+ * 3. For integral types, they are all upcasted to Longs, i.e. Shorts are stored as Longs.
+ *    For FloatType, Float is upcasted to Double.
  * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
  *    (sketches) might have been used, and the data collected can also be stale.
  *
```
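To make the new invariant concrete, here is a minimal sketch of the internal/external correspondence for DateType and TimestampType, using Spark's `DateTimeUtils`; the specific values are illustrative:

```scala
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// DateType: external java.sql.Date <-> internal Int (days since 1970-01-01)
val date = Date.valueOf("2017-04-14")
val internalDate: Int = DateTimeUtils.fromJavaDate(date)   // 17270

// TimestampType: external java.sql.Timestamp <-> internal Long (microseconds since epoch)
val ts = Timestamp.valueOf("2017-04-14 00:00:00")
val internalTs: Long = DateTimeUtils.fromJavaTimestamp(ts)

// Round-trips back to the external representations used during serialization
assert(DateTimeUtils.toJavaDate(internalDate) == date)
assert(DateTimeUtils.toJavaTimestamp(internalTs) == ts)
```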
```diff
@@ -104,22 +106,45 @@ case class ColumnStat(
   /**
    * Returns a map from string to string that can be used to serialize the column stats.
    * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
-   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+   * representation for the value. min/max values are converted to the external data type. For
+   * example, for DateType we store java.sql.Date, and for TimestampType we store
+   * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
    *
    * As part of the protocol, the returned map always contains a key called "version".
    * In the case min/max values are null (None), they won't appear in the map.
    */
-  def toMap: Map[String, String] = {
+  def toMap(colName: String, dataType: DataType): Map[String, String] = {
     val map = new scala.collection.mutable.HashMap[String, String]
     map.put(ColumnStat.KEY_VERSION, "1")
     map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
     map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
     map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
     map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
-    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
-    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
+    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
     map.toMap
   }
 
+  /**
+   * Converts the given value from Catalyst data type to string representation of external
+   * data type.
+   */
+  private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+    val externalValue = dataType match {
+      case BooleanType => v.asInstanceOf[Boolean]
+      case _: IntegralType => v.toString.toLong
+      case DateType => DateTimeUtils.toJavaDate(v.toString.toInt)
+      case TimestampType => DateTimeUtils.toJavaTimestamp(v.toString.toLong)
+      case FloatType | DoubleType => v.toString.toDouble
+      case _: DecimalType => Decimal.fromDecimal(v).toJavaBigDecimal
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case _ =>
+        throw new AnalysisException("Column statistics serialization is not supported for " +
+          s"column $colName of data type: $dataType.")
+    }
+    externalValue.toString
+  }
+
 }
```
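As a usage sketch (the column name and values are hypothetical, and the import assumes `ColumnStat`'s `catalyst.plans.logical` location), a `ColumnStat` for a DateType column now carries the internal day count in min/max, and `toMap` renders the external, human-readable form:

```scala
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.DateType

// Hypothetical stats for a DateType column "event_date": min/max hold the
// Catalyst-internal encoding (days since epoch), not java.sql.Date.
val stat = ColumnStat(
  distinctCount = BigInt(2),
  min = Some(17270),   // 2017-04-14
  max = Some(17271),   // 2017-04-15
  nullCount = BigInt(0),
  avgLen = 4L,
  maxLen = 4L)

// toExternalString converts before serializing, so the resulting map contains
// e.g. "min" -> "2017-04-14" and "max" -> "2017-04-15".
val serialized: Map[String, String] = stat.toMap("event_date", DateType)
```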
```diff
@@ -150,28 +175,15 @@ object ColumnStat extends Logging {
   /**
    * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
    * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
    */
-  def fromMap(table: String, field: StructField, map: Map[String, String])
-    : Option[ColumnStat] = {
-    val str2val: (String => Any) = field.dataType match {
-      case _: IntegralType => _.toLong
-      case _: DecimalType => new java.math.BigDecimal(_)
-      case DoubleType | FloatType => _.toDouble
-      case BooleanType => _.toBoolean
-      case DateType => java.sql.Date.valueOf
-      case TimestampType => java.sql.Timestamp.valueOf
-      // This version of Spark does not use min/max for binary/string types so we ignore it.
-      case BinaryType | StringType => _ => null
-      case _ =>
-        throw new AnalysisException("Column statistics deserialization is not supported for " +
-          s"column ${field.name} of data type: ${field.dataType}.")
-    }
-
+  def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
     try {
       Some(ColumnStat(
         distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
         // Note that flatMap(Option.apply) turns Option(null) into None.
-        min = map.get(KEY_MIN_VALUE).map(str2val).flatMap(Option.apply),
-        max = map.get(KEY_MAX_VALUE).map(str2val).flatMap(Option.apply),
+        min = map.get(KEY_MIN_VALUE)
+          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
+        max = map.get(KEY_MAX_VALUE)
+          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
         nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
         avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
         maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong
```
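The `flatMap(Option.apply)` idiom is what keeps string/binary columns safe here: `fromExternalString` returns `null` for them, and the preceding `map` would otherwise wrap that as `Some(null)`. A tiny self-contained demonstration:

```scala
// map(...) yields Some(null) when the converter returns null;
// flatMap(Option.apply) collapses Some(null) down to None.
val converted: Option[Any] = Some("ignored").map(_ => null: Any)
assert(converted == Some(null))                  // Some(null), not None
assert(converted.flatMap(Option.apply).isEmpty)  // None after the idiom
```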
```diff
@@ -183,6 +195,26 @@ object ColumnStat extends Logging {
     }
   }
 
+  /**
+   * Converts from string representation of external data type to the corresponding Catalyst data
+   * type.
+   */
+  private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+    dataType match {
+      case BooleanType => s.toBoolean
+      case _: IntegralType => s.toLong
+      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s)).toLong
+      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case FloatType | DoubleType => s.toDouble
+      case _: DecimalType => Decimal(s)
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case BinaryType | StringType => null
+      case _ =>
+        throw new AnalysisException("Column statistics deserialization is not supported for " +
+          s"column $name of data type: $dataType.")
+    }
+  }
+
   /**
    * Constructs an expression to compute column statistics for a given column.
    *
```
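Putting the two sides together (map keys follow the doc comment on `toMap`; the table and column names are hypothetical), deserialization re-internalizes min/max, so a date comes back as a Catalyst day count rather than a string. Note the DateType case goes through `.toLong`, so the value is a `Long`:

```scala
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.{DateType, StructField}

val restored = ColumnStat.fromMap(
  table = "events",
  field = StructField("event_date", DateType),
  map = Map(
    "version" -> "1",
    "distinctCount" -> "2",
    "min" -> "2017-04-14",
    "max" -> "2017-04-15",
    "nullCount" -> "0",
    "avgLen" -> "4",
    "maxLen" -> "4"))

// DateType deserializes via fromJavaDate(...).toLong, giving the internal
// day count as a Long: Some(17270L)
assert(restored.get.min == Some(17270L))
```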
```diff
@@ -232,11 +264,14 @@ object ColumnStat extends Logging {
   }
 
   /** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
-  def rowToColumnStat(row: Row): ColumnStat = {
+  def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
     ColumnStat(
       distinctCount = BigInt(row.getLong(0)),
-      min = Option(row.get(1)), // for string/binary min/max, get should return null
-      max = Option(row.get(2)),
+      // for string/binary min/max, get should return null
+      min = Option(row.get(1))
+        .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
+      max = Option(row.get(2))
+        .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
       nullCount = BigInt(row.getLong(3)),
       avgLen = row.getLong(4),
       maxLen = row.getLong(5)
```
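A hypothetical caller's view (names like `statsRow` and `attrsToAnalyze` are illustrative, loosely following what an ANALYZE command would do): each column's result struct is converted together with its attribute, so min/max can be parsed back into Catalyst form by name and data type:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat

// statsRow holds one struct per analyzed column; pairing each struct with its
// attribute lets rowToColumnStat re-internalize min/max.
def collectColumnStats(statsRow: Row, attrsToAnalyze: Seq[Attribute]): Map[String, ColumnStat] = {
  attrsToAnalyze.zipWithIndex.map { case (attr, i) =>
    attr.name -> ColumnStat.rowToColumnStat(statsRow.getStruct(i), attr)
  }.toMap
}
```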
Review comments on `toExternalString`:

**Reviewer:** Why do we get the `externalValue` first and then call `toString`? This means for a Long we will do `l.toString.toLong.toString`.

**Author:** Yea, good point. I should use `asInstanceOf` to replace all these `toString`/`toLong`, then call `toString` after the conversion.

**Reviewer:** We should just return a string in each case of this pattern match.
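For illustration only, a sketch of what the suggested cleanup might look like (not necessarily the code that was eventually merged): each case produces the final string directly, using `asInstanceOf` on the known internal type instead of round-tripping through `toString`:

```scala
private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
  dataType match {
    case BooleanType => v.asInstanceOf[Boolean].toString
    // Internal date is an Int day count; internal timestamp is a Long in microseconds.
    case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int]).toString
    case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long]).toString
    case _: IntegralType => v.asInstanceOf[Long].toString
    case FloatType | DoubleType => v.asInstanceOf[Double].toString
    case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal.toString
    case _ =>
      throw new AnalysisException("Column statistics serialization is not supported for " +
        s"column $colName of data type: $dataType.")
  }
}
```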