Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ case class BucketSpec(
* sensitive schema was unable to be read from the table properties.
* Used to trigger case-sensitive schema inference at query time, when
* configured.
* @param ignoredProperties is a list of table properties that are used by the underlying table
* but ignored by Spark SQL yet.
*/
case class CatalogTable(
identifier: TableIdentifier,
Expand All @@ -221,7 +223,8 @@ case class CatalogTable(
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true) {
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty) {

import CatalogTable._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ class TreeNodeSuite extends SparkFunSuite {
"tracksPartitionsInCatalog" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String],
"schemaPreservesCase" -> JBool(true)))
"schemaPreservesCase" -> JBool(true),
"ignoredProperties" -> JNull))

// For unknown case class, returns JNull.
val bigValue = new Array[Int](10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
case relation: CatalogRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore.
// Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
// (see StatsSetupConst in Hive) that we can look at in the future.
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead.
val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) {
totalSize.get
} else if (rawDataSize.isDefined && rawDataSize.get > 0) {
rawDataSize.get
} else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
Expand Down Expand Up @@ -414,6 +415,50 @@ private[hive] class HiveClientImpl(

val properties = Option(h.getParameters).map(_.asScala.toMap).orNull

// Hive-generated Statistics are also recorded in ignoredProperties
val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
ignoredProperties += key -> value
}

val excludedTableProperties = HiveStatisticsProperties ++ Set(
// The property value of "comment" is moved to the dedicated field "comment"
"comment",
// For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
// in the function toHiveTable.
"EXTERNAL"
)

val filteredProperties = properties.filterNot {
case (key, _) => excludedTableProperties.contains(key)
}
val comment = properties.get("comment")

val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
def rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use def?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only used once. We also can use lazy val

case Some(c) if c >= 0 => Some(c)
case _ => None
}
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore.
// Currently, only totalSize, rawDataSize, and row_count are used to build the field `stats`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rowCount

// TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
// (see StatsSetupConst in Hive)
val stats =
// When table is external, `totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
// return None. Later, we will use the other ways to estimate the statistics.
if (totalSize.isDefined && totalSize.get > 0L) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indention is wrong

Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount))
} else if (rawDataSize.isDefined && rawDataSize.get > 0) {
Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount))
} else {
// TODO: still fill the rowCount even if sizeInBytes is empty. Break some assumptions?
None
}

CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
Expand Down Expand Up @@ -451,13 +496,15 @@ private[hive] class HiveClientImpl(
),
// For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
// in the function toHiveTable.
properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
comment = properties.get("comment"),
properties = filteredProperties,
stats = stats,
comment = comment,
// In older versions of Spark(before 2.2.0), we expand the view original text and store
// that into `viewExpandedText`, and that should be used in view resolution. So we get
// `viewExpandedText` instead of `viewOriginalText` for viewText here.
viewText = Option(h.getViewExpandedText),
unsupportedFeatures = unsupportedFeatures)
unsupportedFeatures = unsupportedFeatures,
ignoredProperties = ignoredProperties.toMap)
}
}

Expand All @@ -474,7 +521,12 @@ private[hive] class HiveClientImpl(
}

override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
// getTableOption removes all the Hive-specific properties. Here, we fill them back to ensure
// these properties are still available to the others that share the same Hive metastore.
// If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
// these user-specified values.
val hiveTable = toHiveTable(
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
shim.alterTable(client, qualifiedTableName, hiveTable)
Expand Down Expand Up @@ -956,4 +1008,14 @@ private[hive] object HiveClientImpl {
parameters =
if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
}

// Below is the key of table properties for storing Hive-generated statistics
private val HiveStatisticsProperties = Set(
StatsSetupConst.COLUMN_STATS_ACCURATE,
StatsSetupConst.NUM_FILES,
StatsSetupConst.NUM_PARTITIONS,
StatsSetupConst.ROW_COUNT,
StatsSetupConst.RAW_DATA_SIZE,
StatsSetupConst.TOTAL_SIZE
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,26 +325,24 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
"last_modified_by",
"last_modified_time",
"Owner:",
"COLUMN_STATS_ACCURATE",
// The following are hive specific schema parameters which we do not need to match exactly.
"numFiles",
"numRows",
"rawDataSize",
"totalSize",
"totalNumberFiles",
"maxFileSize",
"minFileSize",
// EXTERNAL is not non-deterministic, but it is filtered out for external tables.
"EXTERNAL"
"minFileSize"
)

table.copy(
createTime = 0L,
lastAccessTime = 0L,
properties = table.properties.filterKeys(!nondeterministicProps.contains(_))
properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
stats = None,
ignoredProperties = Map.empty
)
}

val e = normalize(actual)
val m = normalize(expected)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?


assert(normalize(actual) == normalize(expected))
}
}
Loading