From 7380d4a3450b985386b2f59516baa50c896c2659 Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Fri, 15 Jul 2016 16:21:21 -0700
Subject: [PATCH] [SPARK-16669][SQL] Add partition pruning to Metastore statistics for better join selection.

Currently the metastore statistics return the size of the entire table, which results in the join selection strategy not using broadcast joins even when only a single partition of a large table is selected. This PR addresses that issue by applying partition pruning during size estimation, so that only the size of the selected partitions is estimated. Currently this only works for partition columns used in equality checks under AND, OR and IN operators; if a partition column is used in any other operator, the estimate falls back to the total table size. We have also introduced a configuration to enable this optimization, which is off by default. Instead of building the paths in code we could query the metastore for all valid partition paths, but for simplicity we just build the paths in code.
---
 .../plans/logical/LocalRelation.scala | 2 +-
 .../catalyst/plans/logical/LogicalPlan.scala | 16 +-
 .../plans/logical/basicLogicalOperators.scala | 43 ++--
 .../spark/sql/execution/ExistingRDD.scala | 9 +-
 .../spark/sql/execution/SparkStrategies.scala | 4 +-
 .../execution/columnar/InMemoryRelation.scala | 4 +-
 .../datasources/LogicalRelation.scala | 5 +-
 .../apache/spark/sql/internal/SQLConf.scala | 15 ++
 .../spark/sql/hive/MetastoreRelation.scala | 212 +++++++++++++++---
 .../spark/sql/hive/StatisticsSuite.scala | 134 ++++++++++-
 10 files changed, 383 insertions(+), 61 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 9d64f35efcc6..9ea5b4cb3af7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -73,6 +73,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) } } - override lazy val statistics = + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length) }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 6d7799151d93..68622b8dbfe0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -87,11 +87,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { * * [[LeafNode]]s must override this.
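 * For illustration only (not part of this patch): a leaf relation overriding the new
 * signature could consume the parent chain like this. `SampleRelation` and its halving
 * heuristic are invented for the example, not Spark API:
 * {{{
 *   case class SampleRelation(output: Seq[Attribute], rawSize: BigInt) extends LeafNode {
 *     override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
 *       // `parents` carries the operators above this node, nearest first, so the relation
 *       // can check whether it sits directly under a Filter before estimating its size.
 *       val underFilter = parents.getOrElse(Seq.empty).headOption.exists(_.isInstanceOf[Filter])
 *       Statistics(sizeInBytes = if (underFilter) rawSize / 2 else rawSize)
 *     }
 *   }
 * }}}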
*/ - def statistics: Statistics = { + def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { +// def statistics(): Statistics = { if (children.isEmpty) { throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.") } - Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product) + Statistics(sizeInBytes = children.map( + _.statistics(Option(this +: parents.getOrElse(Seq.empty[LogicalPlan]))).sizeInBytes).product) } /** @@ -306,20 +308,24 @@ abstract class UnaryNode extends LogicalPlan { override protected def validConstraints: Set[Expression] = child.constraints - override def statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { // There should be some overhead in Row object, the size should not be zero when there is // no columns, this help to prevent divide-by-zero error. val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8 val outputRowSize = output.map(_.dataType.defaultSize).sum + 8 + val childStats = child.statistics( + Option(this +: parents.getOrElse(Seq.empty[LogicalPlan])) + ) + val childNumRows = childStats.sizeInBytes / childRowSize // Assume there will be the same number of rows as child has. - var sizeInBytes = (child.statistics.sizeInBytes * outputRowSize) / childRowSize + var sizeInBytes = (childNumRows * outputRowSize) if (sizeInBytes == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero // (product of children). sizeInBytes = 1 } - child.statistics.copy(sizeInBytes = sizeInBytes) + childStats.copy(sizeInBytes = sizeInBytes) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2917d8d2a97a..0c990e608806 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -159,7 +159,7 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val leftSize = left.statistics.sizeInBytes val rightSize = right.statistics.sizeInBytes val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize @@ -184,7 +184,7 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } && duplicateResolved - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { left.statistics.copy() } } @@ -224,7 +224,7 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan { children.length > 1 && childrenResolved && allChildrenCompatible } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val sizeInBytes = children.map(_.statistics.sizeInBytes).sum Statistics(sizeInBytes = sizeInBytes) } @@ -333,15 +333,16 @@ case class Join( case _ => resolvedExceptNatural } - override lazy val statistics: Statistics = joinType match { - case LeftAnti | LeftSemi => - // LeftSemi and LeftAnti won't ever be bigger than left - left.statistics.copy() - case _ => - // make 
sure we don't propagate isBroadcastable in other joins, because - // they could explode the size. - super.statistics.copy(isBroadcastable = false) - } + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + joinType match { + case LeftAnti | LeftSemi => + // LeftSemi and LeftAnti won't ever be bigger than left + left.statistics.copy() + case _ => + // make sure we don't propagate isBroadcastable in other joins, because + // they could explode the size. + super.statistics.copy(isBroadcastable = false) + } } /** @@ -351,7 +352,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output // set isBroadcastable to true so the child will be broadcasted - override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true) + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + super.statistics.copy(isBroadcastable = true) } case class InsertIntoTable( @@ -450,7 +452,7 @@ case class Range( override def newInstance(): Range = copy(output = output.map(_.newInstance())) - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val sizeInBytes = LongType.defaultSize * numElements Statistics( sizeInBytes = sizeInBytes ) } @@ -487,7 +489,7 @@ case class Aggregate( child.constraints.union(getAliasedConstraints(nonAgg)) } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { if (groupingExpressions.isEmpty) { super.statistics.copy(sizeInBytes = 1) } else { @@ -587,7 +589,7 @@ case class Expand( override def references: AttributeSet = AttributeSet(projections.flatten.flatMap(_.references)) - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val sizeInBytes = super.statistics.sizeInBytes * projections.length Statistics(sizeInBytes = sizeInBytes) } @@ -659,7 +661,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN case _ => None } } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero @@ -680,7 +682,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo case _ => None } } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero @@ -719,7 +721,7 @@ case class Sample( override def output: Seq[Attribute] = child.output - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val ratio = upperBound - lowerBound // BigInt can't multiply with Double var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100 @@ -766,5 +768,6 @@ case object OneRowRelation extends LeafNode { * * [[LeafNode]]s must override this. 
*/ - override lazy val statistics: Statistics = Statistics(sizeInBytes = 1) + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + Statistics(sizeInBytes = 1) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 6c4248c60e89..2c256c67ab4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -95,7 +95,11 @@ case class ExternalRDD[T]( override protected def stringArgs: Iterator[Any] = Iterator(output) - @transient override lazy val statistics: Statistics = Statistics( + override def producedAttributes: AttributeSet = outputSet + + @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): + Statistics = + Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes) @@ -147,7 +151,8 @@ case class LogicalRDD( override protected def stringArgs: Iterator[Any] = Iterator(output) - @transient override lazy val statistics: Statistics = Statistics( + @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): + Statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4aaf454285f4..f6a7b6b5f722 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -114,8 +114,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Matches a plan whose output should be small enough to be used in broadcast join. 
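 * For example (illustrative only; table and column names are invented), with the new
 * partition-pruned statistics enabled, a query that touches a single small partition of a
 * large table can pass this check even though the table's total size is far above
 * `spark.sql.autoBroadcastJoinThreshold`:
 * {{{
 *   spark.conf.set("spark.sql.statistics.prunedPartitionStats", "true")
 *   spark.sql("SELECT * FROM big_tbl b JOIN dim d ON b.key = d.key WHERE b.dateint = 20160718")
 *     .explain()  // expected to pick a broadcast join once the pruned size is below the threshold
 * }}}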
*/ private def canBroadcast(plan: LogicalPlan): Boolean = { - plan.statistics.isBroadcastable || - plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold + val stats = plan.statistics + stats.isBroadcastable || stats.sizeInBytes <= conf.autoBroadcastJoinThreshold } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 479934a7afc7..68290039dfc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.storage.StorageLevel import org.apache.spark.util.CollectionAccumulator @@ -73,7 +73,7 @@ case class InMemoryRelation( @transient val partitionStatistics = new PartitionStatistics(output) - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { if (batchStats.value.isEmpty) { // Underlying columnar RDD hasn't been materialized, no useful statistics information // available, return the default statistics. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 90711f2b1dde..de06641af46b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -72,9 +72,8 @@ case class LogicalRelation( // expId can be different but the relation is still the same. override lazy val cleanArgs: Seq[Any] = Seq(relation) - @transient override lazy val statistics: Statistics = Statistics( - sizeInBytes = BigInt(relation.sizeInBytes) - ) + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + Statistics(sizeInBytes = BigInt(relation.sizeInBytes)) /** Used to lookup original attribute capitalization */ val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f2b1afd71adc..6ed57caac10b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -123,6 +123,19 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ENABLE_PRUNED_PARTITION_STATS = + SQLConfigBuilder("spark.sql.statistics.prunedPartitionStats") + .doc("When enabled, spark-sql would try and calculate stats based on size of partitions" + + " specified in your filter condition. i.e. if you run a query like " + + "`select * from src s , dest d where s.partition = 'partition1' and s.key = d.key`" + + " instead of using the entire table's totalSize, rawSize stats, spark will consider only " + + " the size of partition 1 for table src. 
Currently the optimization is only available for" + + " equality check and won't be applied if partition column is specified in any other check" + + " or if partition column is used in operators other than And and OR. In those events stat" + + " calculation falls back to totalSize/rawSize.") + .booleanConf + .createWithDefault(false) + val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes") .internal() .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " + @@ -629,6 +642,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) + def prunedPartitionStatsEnabled: Boolean = getConf(ENABLE_PRUNED_PARTITION_STATS) + def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 195fce835413..c2a2fec5f3ab 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -22,7 +22,7 @@ import java.io.IOException import scala.collection.JavaConverters._ import com.google.common.base.Objects -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.FieldSchema @@ -32,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.types.StructField @@ -109,40 +109,202 @@ private[hive] case class MetastoreRelation( new HiveTable(tTable) } - @transient override lazy val statistics: Statistics = Statistics( + @transient def getSize(): Long = { + val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) + val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) + + // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be + // relatively cheap if parameters for the table are populated into the metastore. + // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys + // (see StatsSetupConst in Hive) that we can look at in the future. + + // When table is external,`totalSize` is always zero, which will influence join strategy + // so when `totalSize` is zero, use `rawDataSize` instead + // if the size is still less than zero, we try to get the file size from HDFS. + // given this is only needed for optimization, if the HDFS call fails we return the default. 
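// Note (illustrative; `src` is just an example table name): the `totalSize` and `rawDataSize`
// parameters read below are the ones Hive keeps in the metastore. On an existing table they can
// be (re)populated with, for example,
//   spark.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")
// which fills in `totalSize` without scanning the data, so this method can usually avoid both
// the HDFS fallback and the default size.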
+ if (totalSize != null && totalSize.toLong > 0L) { + totalSize.toLong + } else if (rawDataSize != null && rawDataSize.toLong > 0) { + rawDataSize.toLong + } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) + fs.getContentSummary(hiveQlTable.getPath).getLength + } catch { + case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + sparkSession.sessionState.conf.defaultSizeInBytes + } + } else { + sparkSession.sessionState.conf.defaultSizeInBytes + } + } + + @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): + Statistics = Statistics( sizeInBytes = { - val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) - val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) - // TODO: check if this estimate is valid for tables after partition pruning. - // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be - // relatively cheap if parameters for the table are populated into the metastore. - // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys - // (see StatsSetupConst in Hive) that we can look at in the future. BigInt( - // When table is external,`totalSize` is always zero, which will influence join strategy - // so when `totalSize` is zero, use `rawDataSize` instead - // if the size is still less than zero, we try to get the file size from HDFS. - // given this is only needed for optimization, if the HDFS call fails we return the default. - if (totalSize != null && totalSize.toLong > 0L) { - totalSize.toLong - } else if (rawDataSize != null && rawDataSize.toLong > 0) { - rawDataSize.toLong - } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { + if (sparkSession.sessionState.conf.prunedPartitionStatsEnabled) { try { - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) - fs.getContentSummary(hiveQlTable.getPath).getLength + // Get the immediate parent, if its not a filter we won't do the optimization. + // This is rigid for now but easy to traverse up the tree and find all filter blocks. + // but need to verify that will still lead to correct behavior. + val parentNodes = parents.getOrElse(Seq.empty[LogicalPlan]) + val filterNode: Option[Filter] = + if (parentNodes.isEmpty || !parentNodes.head.isInstanceOf[Filter]) { + None + } else { + Some(parentNodes.head.asInstanceOf[Filter]) + } + filterNode match { + case Some(filter) => + val partitionPaths = getPartitionPathFromFilter(filter) + if (partitionPaths.nonEmpty) { + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) + partitionPaths.foldLeft(0L)( + (sum, path) => sum + fs.getContentSummary(path).getLength + ) + } else { + getSize + } + case None => getSize + } } catch { case e: IOException => logWarning("Failed to get table size from hdfs.", e) - sparkSession.sessionState.conf.defaultSizeInBytes + getSize } } else { - sparkSession.sessionState.conf.defaultSizeInBytes + getSize }) } ) + // Return list of paths for partitions that are valid for this operation, an empty list + // if no partition prunning is possible. 
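// Concrete example (illustrative only; the warehouse location and column names are invented):
// for a table stored at /warehouse/src partitioned by (dateint, hour), a filter such as
//   dateint = 20160718 AND (hour = 10 OR hour = 11)
// is expected to yield the partition paths
//   /warehouse/src/dateint=20160718/hour=10
//   /warehouse/src/dateint=20160718/hour=11
// whose lengths the statistics override above then sums via fs.getContentSummary.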
+ private def getPartitionPathFromFilter(filter: Filter): Seq[Path] = { + + // TODO: For now we only consider EqualTo and In; we could add more operators later. + + // Following is the expected output for different cases: + // (p1 = b and p2 = c) or (p1 = d) should yield [p1=b/p2=c, p1=d] + // (p1 = b or p2 = c) should yield [p1=b, p2=c] + // (p1 = b or p2 = c) and (p3 = d) should yield [p1=b/p3=d, p2=c/p3=d] + // (p1 = b or p2 = c) or (p1 = d) should yield [p1=b, p2=c, p1=d] + // p1 IN (a,b,c) should yield [p1=a,p1=b,p1=c] + // If a column appears in any unsupported expression, we discard it from + // the size estimate calculation. + // TODO: We may still return incorrect size estimates due to operators that can add + // partition space and are higher up in the parse tree, so the columns never get blacklisted, + // e.g. for 'not (partition=1 or partition=2)' the current code will provide an estimate + // based on the sizes of partitions 1 and 2. + var blackListedPartitionColumns: Set[AttributeReference] = Set.empty[AttributeReference] + + def getPartitionInfo(expression: Expression): Seq[Map[AttributeReference, String]] = { + + def getPartitionTuple(attribute: AttributeReference, literal: Literal) = { + if (partitionKeys.contains(attribute)) { + Seq(Map(attribute -> literal.value.toString)) + } else { + Seq.empty[Map[AttributeReference, String]] + } + } + + expression match { + // Join adds not-null checks for all filter columns; + // to avoid blacklisting we handle them by returning an empty result. + case e: IsNotNull => Seq.empty[Map[AttributeReference, String]] + + // If a partition column appears under a Not expression, we blacklist it. + case e: Not => + blackListedPartitionColumns = getPartitionInfo(e.child).foldLeft( + blackListedPartitionColumns)((s, m) => s ++ m.keySet) + Seq.empty[Map[AttributeReference, String]] + + case e: EqualTo => (e.left, e.right) match { + case (attr : AttributeReference, l: Literal) => getPartitionTuple(attr, l) + case (l: Literal, attr : AttributeReference) => getPartitionTuple(attr, l) + case _ => getPartitionInfo(e.left) ++ getPartitionInfo(e.right) + } + + // In: only handled when the value is a partition column and every element of the list + // is a literal. + case e: In => + if (e.value.isInstanceOf[AttributeReference] && + e.list.forall(_.isInstanceOf[Literal])) { + e.list.map(literal => getPartitionTuple(e.value.asInstanceOf[AttributeReference], + literal.asInstanceOf[Literal])).flatten + } else { + getPartitionInfo(e.value) ++ e.list.flatMap(getPartitionInfo(_)) + } + + case e: And => + val right = getPartitionInfo(e.right) + val left = getPartitionInfo(e.left) + + if (left.isEmpty) { + right + } else if (right.isEmpty) { + left + } else { + left.map(lMap => right.map(rMap => if (lMap.keySet.intersect(rMap.keySet).isEmpty) { + lMap ++ rMap + } else { + Map.empty[AttributeReference, String] + })).flatten.filter(!_.isEmpty) + } + + case e: Or => getPartitionInfo(e.left) ++ getPartitionInfo(e.right) + + case e: AttributeReference => + // If any of our partition keys are part of a filter condition that + // we do not handle, we should ignore those columns from size estimation + // for correctness.
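// Illustrative example (invented names): for `dateint = 20160718 AND hour > 10` the
// GreaterThan falls through to the generic Expression case below, `hour` ends up
// blacklisted, and the estimate is taken from .../dateint=20160718 alone (all hours
// included) instead of the full table. Because paths are built left to right over
// partitionKeys and stop at the first missing or blacklisted column, a filter on `hour`
// alone (without `dateint`) prunes nothing and falls back to the table size.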
+ if (partitionKeys.contains(e)) { + blackListedPartitionColumns = blackListedPartitionColumns + e + } + Seq.empty[Map[AttributeReference, String]] + + case ex: Expression => + if (expression.children.nonEmpty) { + expression.children.foldLeft(Seq.empty[Map[AttributeReference, String]]) ( + (r, e) => r ++ getPartitionInfo(e) + ) + } else { + Seq.empty[Map[AttributeReference, String]] + } + } + } + + val partitionsWithEqualityCheck = getPartitionInfo(filter.condition).filter(!_.isEmpty) + + // Build partition Paths in the same order as the storage layer, as soon as first + // missing partition is found we have to stop. + partitionsWithEqualityCheck.map( + m => { + var missingPartitionCol = false + var blacklistedCol = false + var partitionPath = hiveQlTable.getPath + + for(partitionKey <- partitionKeys if !missingPartitionCol && !blacklistedCol) { + if (m.keySet.contains(partitionKey)) { + if (blackListedPartitionColumns.contains(partitionKey)) { + blacklistedCol = true + partitionPath = hiveQlTable.getPath + } else { + val path = partitionKey.name + "=" + m.get(partitionKey).get + partitionPath = new Path(partitionPath, path) + } + } else { + missingPartitionCol = true + } + } + partitionPath + }).filter(!_.equals(hiveQlTable.getPath)) + } + // When metastore partition pruning is turned off, we cache the list of all partitions to // mimic the behavior of Spark < 1.5 private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b275ab17a93c..80bd37767fc8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -18,16 +18,22 @@ package org.apache.spark.sql.hive import java.io.{File, PrintWriter} +import java.util.UUID import scala.reflect.ClassTag +import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, GreaterThan, In, IsNotNull, Literal, Not, Or} +import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.DataTypes class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { @@ -106,7 +112,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0") assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0") - val sizeInBytes = relation.statistics.sizeInBytes + val sizeInBytes = relation.statistics().sizeInBytes assert(sizeInBytes === BigInt(file1.length() + file2.length())) } } finally { @@ -115,6 +121,132 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } } + test("MetastoreRelations partition pruning enabled for stats estimation.") { + val enablePartitionPruning = spark.sessionState.conf.prunedPartitionStatsEnabled + val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled + try { + withTempDir 
{ tempDir => + + def writeNRecords(n: Int): File = { + val file1 = new File(tempDir, UUID.randomUUID.toString) + val writer1 = new PrintWriter(file1) + for(i <- 1 to n) { + writer1.write("1,2") + } + writer1.close() + file1 + } + + val largeFile = writeNRecords(1000) + val smallFile = writeNRecords(10) + + spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true) + + sql( + s"""CREATE TABLE csv_table(page_id INT, impressions INT) + PARTITIONED BY (dateint INT, hour INT) + ROW FORMAT delimited fields terminated by ',' + """) + + sql( + s"""LOAD DATA LOCAL INPATH '${largeFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160717, hour=10) + """) + sql( + s"""LOAD DATA LOCAL INPATH '${largeFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160717, hour=11) + """) + sql( + s"""LOAD DATA LOCAL INPATH '${smallFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160718, hour=10) + """) + sql( + s"""LOAD DATA LOCAL INPATH '${smallFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160718, hour=11) + """) + + val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table")) + .asInstanceOf[MetastoreRelation] + + val hadoopConf = spark.sessionState.newHadoopConf + val fs: FileSystem = relation.hiveQlTable.getPath.getFileSystem(hadoopConf) + + val totalSize = relation.getSize + val lLen = fs.getContentSummary(new Path(relation.hiveQlTable.getPath, + "dateint=20160717")).getLength/2 + val sLen = fs.getContentSummary(new Path(relation.hiveQlTable.getPath, + "dateint=20160718")).getLength/2 + + def assertion(filter: Filter, partitionPrunedSize: Long, message: String): Unit = { + assert(relation.statistics(Some(Seq(filter))).sizeInBytes == BigInt(totalSize), message) + + spark.conf.set(SQLConf.ENABLE_PRUNED_PARTITION_STATS.key, true) + val sizeInBytes = relation.statistics(Some(Seq(filter))).sizeInBytes + assert(sizeInBytes == BigInt(partitionPrunedSize), message) + spark.conf.set(SQLConf.ENABLE_PRUNED_PARTITION_STATS.key, false) + } + + val dateInt = relation.partitionKeys.find(_.name.equals("dateint")).get + val hour = relation.partitionKeys.find(_.name.equals("hour")).get + val literalDate17 = Literal.create(20160717, DataTypes.IntegerType) + val literalDate18 = Literal.create(20160718, DataTypes.IntegerType) + val literalHour10 = Literal.create(10, DataTypes.IntegerType) + val literalHour11 = Literal.create(11, DataTypes.IntegerType) + + val equalToDate17 = EqualTo(dateInt, literalDate17) + val equalToDate18 = EqualTo(dateInt, literalDate18) + val equalToHour10 = EqualTo(hour, literalHour10) + val equalToHour11 = EqualTo(hour, literalHour11) + + val hour10Or11 = Or(equalToHour10, equalToHour11) + val date17AndHour10Or11 = And(equalToDate17, hour10Or11) + val date18AndHour11 = And(equalToDate18, equalToHour11) + val hour10orDate18AndHour11 = Or(equalToHour10, date18AndHour11) + + val notNull = IsNotNull(literalHour10) + val notNullAndDate18AndHour11 = And(notNull, date18AndHour11) + + val not = Not(hour10orDate18AndHour11) + val greaterThanHour10 = GreaterThan(hour, literalHour10) + val date18AndGreaterThanHour10 = And(equalToDate18, greaterThanHour10) + + val date18andHourIn1011 = And(equalToDate18, In(hour, List(literalHour10, literalHour11))) + + val testFilters = Seq( + // equality check + (new Filter(equalToDate17, null), lLen * 2, "equalToDate17"), + + // And and Or check + (new Filter(date17AndHour10Or11, null), lLen * 2, "date17AndHour10Or11"), + + // And and Or check, but or has only hour so it should 
be ignored. + (new Filter(hour10orDate18AndHour11, null), sLen, "hour10orDate18AndHour11"), + + // not Null should not blacklist partition column + (new Filter(notNullAndDate18AndHour11, null), sLen, "notNullAndDate18AndHour11"), + + // Not should blacklist partition col, falling back to total size. + (new Filter(not, null), totalSize, "not"), + + // unsupported operator should fall back to total size. + (new Filter(greaterThanHour10, null), totalSize, "greaterThanHour10"), + + // unsupported operator should only blacklist col used in that operator. + (new Filter(date18AndGreaterThanHour10, null), sLen * 2, "date18AndGreaterThanHour10"), + + // In Check with And + (new Filter(date18andHourIn1011, null), sLen * 2, "date18andHourIn1011") + ) + + testFilters.foreach(tuple => assertion(tuple._1, tuple._2, tuple._3)) + } + } finally { + spark.conf.set(SQLConf.ENABLE_PRUNED_PARTITION_STATS.key, enablePartitionPruning) + spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, enableFallBackToHdfsForStats) + sql("DROP TABLE csv_table ") + } + } + test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes