diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 9d64f35efcc6..9ea5b4cb3af7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -73,6 +73,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
     }
   }
 
-  override lazy val statistics =
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
     Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 6d7799151d93..68622b8dbfe0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -87,11 +87,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    *
    * [[LeafNode]]s must override this.
    */
-  def statistics: Statistics = {
+  def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     if (children.isEmpty) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }
-    Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
+    Statistics(sizeInBytes = children.map(
+      _.statistics(Option(this +: parents.getOrElse(Seq.empty[LogicalPlan]))).sizeInBytes).product)
   }
 
   /**
@@ -306,20 +308,24 @@ abstract class UnaryNode extends LogicalPlan {
 
   override protected def validConstraints: Set[Expression] = child.constraints
 
-  override def statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     // There should be some overhead in Row object, the size should not be zero when there is
     // no columns, this help to prevent divide-by-zero error.
     val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
     val outputRowSize = output.map(_.dataType.defaultSize).sum + 8
+    val childStats = child.statistics(
+      Option(this +: parents.getOrElse(Seq.empty[LogicalPlan]))
+    )
+    val childNumRows = childStats.sizeInBytes / childRowSize
     // Assume there will be the same number of rows as child has.
-    var sizeInBytes = (child.statistics.sizeInBytes * outputRowSize) / childRowSize
+    var sizeInBytes = childNumRows * outputRowSize
     if (sizeInBytes == 0) {
       // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
       // (product of children).
sizeInBytes = 1 } - child.statistics.copy(sizeInBytes = sizeInBytes) + childStats.copy(sizeInBytes = sizeInBytes) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2917d8d2a97a..0c990e608806 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -159,7 +159,7 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation } } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val leftSize = left.statistics.sizeInBytes val rightSize = right.statistics.sizeInBytes val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize @@ -184,7 +184,7 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } && duplicateResolved - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { left.statistics.copy() } } @@ -224,7 +224,7 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan { children.length > 1 && childrenResolved && allChildrenCompatible } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val sizeInBytes = children.map(_.statistics.sizeInBytes).sum Statistics(sizeInBytes = sizeInBytes) } @@ -333,15 +333,16 @@ case class Join( case _ => resolvedExceptNatural } - override lazy val statistics: Statistics = joinType match { - case LeftAnti | LeftSemi => - // LeftSemi and LeftAnti won't ever be bigger than left - left.statistics.copy() - case _ => - // make sure we don't propagate isBroadcastable in other joins, because - // they could explode the size. - super.statistics.copy(isBroadcastable = false) - } + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + joinType match { + case LeftAnti | LeftSemi => + // LeftSemi and LeftAnti won't ever be bigger than left + left.statistics.copy() + case _ => + // make sure we don't propagate isBroadcastable in other joins, because + // they could explode the size. 
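+        // Note: super.statistics prepends `this` to the implicit parents chain before visiting
+        // the children, so a leaf relation that sits under a Filter further down the plan still
+        // receives that Filter as parents.head.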
+ super.statistics.copy(isBroadcastable = false) + } } /** @@ -351,7 +352,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output // set isBroadcastable to true so the child will be broadcasted - override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true) + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + super.statistics.copy(isBroadcastable = true) } case class InsertIntoTable( @@ -450,7 +452,7 @@ case class Range( override def newInstance(): Range = copy(output = output.map(_.newInstance())) - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val sizeInBytes = LongType.defaultSize * numElements Statistics( sizeInBytes = sizeInBytes ) } @@ -487,7 +489,7 @@ case class Aggregate( child.constraints.union(getAliasedConstraints(nonAgg)) } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { if (groupingExpressions.isEmpty) { super.statistics.copy(sizeInBytes = 1) } else { @@ -587,7 +589,7 @@ case class Expand( override def references: AttributeSet = AttributeSet(projections.flatten.flatMap(_.references)) - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val sizeInBytes = super.statistics.sizeInBytes * projections.length Statistics(sizeInBytes = sizeInBytes) } @@ -659,7 +661,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN case _ => None } } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero @@ -680,7 +682,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo case _ => None } } - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] val sizeInBytes = if (limit == 0) { // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero @@ -719,7 +721,7 @@ case class Sample( override def output: Seq[Attribute] = child.output - override lazy val statistics: Statistics = { + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = { val ratio = upperBound - lowerBound // BigInt can't multiply with Double var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100 @@ -766,5 +768,6 @@ case object OneRowRelation extends LeafNode { * * [[LeafNode]]s must override this. 
*/ - override lazy val statistics: Statistics = Statistics(sizeInBytes = 1) + override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = + Statistics(sizeInBytes = 1) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 6c4248c60e89..2c256c67ab4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -95,7 +95,11 @@ case class ExternalRDD[T]( override protected def stringArgs: Iterator[Any] = Iterator(output) - @transient override lazy val statistics: Statistics = Statistics( + override def producedAttributes: AttributeSet = outputSet + + @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): + Statistics = + Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes) @@ -147,7 +151,8 @@ case class LogicalRDD( override protected def stringArgs: Iterator[Any] = Iterator(output) - @transient override lazy val statistics: Statistics = Statistics( + @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): + Statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4aaf454285f4..f6a7b6b5f722 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -114,8 +114,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Matches a plan whose output should be small enough to be used in broadcast join. 
 */
   private def canBroadcast(plan: LogicalPlan): Boolean = {
-    plan.statistics.isBroadcastable ||
-      plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold
+    val stats = plan.statistics
+    stats.isBroadcastable || stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a7afc7..68290039dfc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.CollectionAccumulator
@@ -73,7 +73,7 @@ case class InMemoryRelation(
 
   @transient val partitionStatistics = new PartitionStatistics(output)
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 90711f2b1dde..de06641af46b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -72,9 +72,8 @@ case class LogicalRelation(
   // expId can be different but the relation is still the same.
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
 
-  @transient override lazy val statistics: Statistics = Statistics(
-    sizeInBytes = BigInt(relation.sizeInBytes)
-  )
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
+    Statistics(sizeInBytes = BigInt(relation.sizeInBytes))
 
   /** Used to lookup original attribute capitalization */
   val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f2b1afd71adc..6ed57caac10b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -123,6 +123,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val ENABLE_PRUNED_PARTITION_STATS =
+    SQLConfigBuilder("spark.sql.statistics.prunedPartitionStats")
+      .doc("When enabled, Spark SQL estimates statistics from the sizes of only those " +
+        "partitions that are selected by the query's filter condition. For example, for " +
+        "`select * from src s, dest d where s.partition = 'partition1' and s.key = d.key`, " +
+        "only the size of partition1 of table src is considered, instead of the table-level " +
+        "totalSize/rawDataSize stats. Currently the optimization only applies to equality " +
+        "and IN checks; it is skipped if a partition column appears in any other predicate " +
+        "or under operators other than AND and OR, in which case the estimate falls back " +
+        "to totalSize/rawDataSize.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
@@ -629,6 +642,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
 
+  def prunedPartitionStatsEnabled: Boolean = getConf(ENABLE_PRUNED_PARTITION_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 195fce835413..c2a2fec5f3ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import scala.collection.JavaConverters._
 
 import com.google.common.base.Objects
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -32,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.types.StructField
@@ -109,40 +109,202 @@ private[hive] case class MetastoreRelation(
     new HiveTable(tTable)
   }
 
-  @transient override lazy val statistics: Statistics = Statistics(
+  @transient def getSize(): Long = {
+    val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+    val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+
+    // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+    // relatively cheap if parameters for the table are populated into the metastore.
+    // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+    // (see StatsSetupConst in Hive) that we can look at in the future.
+
+    // When the table is external, `totalSize` is always zero, which would influence the join
+    // strategy, so when `totalSize` is zero we use `rawDataSize` instead. If the size is still
+    // not positive, we try to get the file size from HDFS. Given this is only needed for
+    // optimization, if the HDFS call fails we return the default.
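+    // getSize() is also the table-level fallback used by statistics() below whenever the
+    // partition-pruned estimate cannot be applied (no Filter parent, unsupported predicates,
+    // or an HDFS failure).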
+    if (totalSize != null && totalSize.toLong > 0L) {
+      totalSize.toLong
+    } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+      rawDataSize.toLong
+    } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = sparkSession.sessionState.newHadoopConf()
+        val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+        fs.getContentSummary(hiveQlTable.getPath).getLength
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          sparkSession.sessionState.conf.defaultSizeInBytes
+      }
+    } else {
+      sparkSession.sessionState.conf.defaultSizeInBytes
+    }
+  }
+
+  @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None):
+  Statistics = Statistics(
     sizeInBytes = {
-      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-      // TODO: check if this estimate is valid for tables after partition pruning.
-      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore.
-      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-      // (see StatsSetupConst in Hive) that we can look at in the future.
       BigInt(
-        // When table is external,`totalSize` is always zero, which will influence join strategy
-        // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we try to get the file size from HDFS.
-        // given this is only needed for optimization, if the HDFS call fails we return the default.
-        if (totalSize != null && totalSize.toLong > 0L) {
-          totalSize.toLong
-        } else if (rawDataSize != null && rawDataSize.toLong > 0) {
-          rawDataSize.toLong
-        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+        if (sparkSession.sessionState.conf.prunedPartitionStatsEnabled) {
          try {
-            val hadoopConf = sparkSession.sessionState.newHadoopConf()
-            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-            fs.getContentSummary(hiveQlTable.getPath).getLength
+            // Look at the immediate parent only; if it is not a Filter we skip the optimization.
+            // This is rigid for now: we could traverse further up the tree and collect all
+            // Filter nodes, but that still needs to be verified for correctness.
+            val parentNodes = parents.getOrElse(Seq.empty[LogicalPlan])
+            val filterNode: Option[Filter] =
+              if (parentNodes.isEmpty || !parentNodes.head.isInstanceOf[Filter]) {
+                None
+              } else {
+                Some(parentNodes.head.asInstanceOf[Filter])
+              }
+            filterNode match {
+              case Some(filter) =>
+                val partitionPaths = getPartitionPathFromFilter(filter)
+                if (partitionPaths.nonEmpty) {
+                  val hadoopConf = sparkSession.sessionState.newHadoopConf()
+                  val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+                  partitionPaths.foldLeft(0L)(
+                    (sum, path) => sum + fs.getContentSummary(path).getLength
+                  )
+                } else {
+                  getSize
+                }
+              case None => getSize
+            }
           } catch {
            case e: IOException =>
              logWarning("Failed to get table size from hdfs.", e)
-              sparkSession.sessionState.conf.defaultSizeInBytes
+              getSize
          }
        } else {
-          sparkSession.sessionState.conf.defaultSizeInBytes
+          getSize
        })
    }
  )
+  // Returns the list of partition paths that are relevant for this operation, or an empty list
+  // if no partition pruning is possible.
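+  // The returned paths are rooted at the table location, e.g. a predicate like
+  // `p1 = a AND p2 = b` yields [<tableLocation>/p1=a/p2=b].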
+  private def getPartitionPathFromFilter(filter: Filter): Seq[Path] = {
+
+    // TODO: For now we only consider EqualTo and In; more operators could be added later.
+
+    // Following is the expected output for different cases:
+    // (p1 = b and p2 = c) or (p1 = d) should yield [p1=b/p2=c, p1=d]
+    // (p1 = b or p2 = c) should yield [p1=b, p2=c]
+    // (p1 = b or p2 = c) and (p3 = d) should yield [p1=b/p3=d, p2=c/p3=d]
+    // (p1 = b or p2 = c) or (p1 = d) should yield [p1=b, p2=c, p1=d]
+    // p1 IN (a,b,c) should yield [p1=a,p1=b,p1=c]
+    // If a column appears in any unsupported expression, we discard it from the
+    // size estimate calculation.
+    // TODO: We may still return incorrect size estimates when an operator that can widen the
+    // partition space sits higher up in the plan, so its columns never get blacklisted.
+    // e.g. for 'not (partition=1 or partition=2)' the current code will still estimate based
+    // on the sizes of partitions 1 and 2.
+    var blackListedPartitionColumns: Set[AttributeReference] = Set.empty[AttributeReference]
+
+    def getPartitionInfo(expression: Expression): Seq[Map[AttributeReference, String]] = {
+
+      def getPartitionTuple(attribute: AttributeReference, literal: Literal) = {
+        if (partitionKeys.contains(attribute)) {
+          Seq(Map(attribute -> literal.value.toString))
+        } else {
+          Seq.empty[Map[AttributeReference, String]]
+        }
+      }
+
+      expression match {
+        // Joins add IsNotNull checks for all filter columns; return an empty result here so
+        // the referenced columns do not get blacklisted.
+        case e: IsNotNull => Seq.empty[Map[AttributeReference, String]]
+
+        // If a partition column appears under a Not expression, we blacklist it.
+        case e: Not =>
+          blackListedPartitionColumns ++= getPartitionInfo(e.child).flatMap(_.keySet)
+          Seq.empty[Map[AttributeReference, String]]
+
+        case e: EqualTo => (e.left, e.right) match {
+          case (attr: AttributeReference, l: Literal) => getPartitionTuple(attr, l)
+          case (l: Literal, attr: AttributeReference) => getPartitionTuple(attr, l)
+          case _ => getPartitionInfo(e.left) ++ getPartitionInfo(e.right)
+        }
+
+        // In: only handled when a partition column is compared against a list of literal values.
+        case e: In =>
+          if (e.value.isInstanceOf[AttributeReference] &&
+            e.list.forall(_.isInstanceOf[Literal])) {
+            e.list.map(literal => getPartitionTuple(e.value.asInstanceOf[AttributeReference],
+              literal.asInstanceOf[Literal])).flatten
+          } else {
+            getPartitionInfo(e.value) ++ e.list.flatMap(getPartitionInfo(_))
+          }
+
+        case e: And =>
+          val right = getPartitionInfo(e.right)
+          val left = getPartitionInfo(e.left)
+
+          if (left.isEmpty) {
+            right
+          } else if (right.isEmpty) {
+            left
+          } else {
+            left.map(lMap => right.map(rMap => if (lMap.keySet.intersect(rMap.keySet).isEmpty) {
+              lMap ++ rMap
+            } else {
+              Map.empty[AttributeReference, String]
+            })).flatten.filter(!_.isEmpty)
+          }
+
+        case e: Or => getPartitionInfo(e.left) ++ getPartitionInfo(e.right)
+
+        case e: AttributeReference =>
+          // If any of our partition keys appear in a filter predicate that we do not handle,
+          // we must ignore those columns during size estimation
+          // for correctness.
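+          // e.g. a predicate like `partCol > 5` reaches this case through the generic
+          // Expression case below, so `partCol` gets blacklisted and its partitions are
+          // not used for the pruned estimate.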
+ if (partitionKeys.contains(e)) { + blackListedPartitionColumns = blackListedPartitionColumns + e + } + Seq.empty[Map[AttributeReference, String]] + + case ex: Expression => + if (expression.children.nonEmpty) { + expression.children.foldLeft(Seq.empty[Map[AttributeReference, String]]) ( + (r, e) => r ++ getPartitionInfo(e) + ) + } else { + Seq.empty[Map[AttributeReference, String]] + } + } + } + + val partitionsWithEqualityCheck = getPartitionInfo(filter.condition).filter(!_.isEmpty) + + // Build partition Paths in the same order as the storage layer, as soon as first + // missing partition is found we have to stop. + partitionsWithEqualityCheck.map( + m => { + var missingPartitionCol = false + var blacklistedCol = false + var partitionPath = hiveQlTable.getPath + + for(partitionKey <- partitionKeys if !missingPartitionCol && !blacklistedCol) { + if (m.keySet.contains(partitionKey)) { + if (blackListedPartitionColumns.contains(partitionKey)) { + blacklistedCol = true + partitionPath = hiveQlTable.getPath + } else { + val path = partitionKey.name + "=" + m.get(partitionKey).get + partitionPath = new Path(partitionPath, path) + } + } else { + missingPartitionCol = true + } + } + partitionPath + }).filter(!_.equals(hiveQlTable.getPath)) + } + // When metastore partition pruning is turned off, we cache the list of all partitions to // mimic the behavior of Spark < 1.5 private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b275ab17a93c..80bd37767fc8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -18,16 +18,22 @@ package org.apache.spark.sql.hive import java.io.{File, PrintWriter} +import java.util.UUID import scala.reflect.ClassTag +import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, GreaterThan, In, IsNotNull, Literal, Not, Or} +import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.DataTypes class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { @@ -106,7 +112,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0") assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0") - val sizeInBytes = relation.statistics.sizeInBytes + val sizeInBytes = relation.statistics().sizeInBytes assert(sizeInBytes === BigInt(file1.length() + file2.length())) } } finally { @@ -115,6 +121,132 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } } + test("MetastoreRelations partition pruning enabled for stats estimation.") { + val enablePartitionPruning = spark.sessionState.conf.prunedPartitionStatsEnabled + val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled + try { + withTempDir 
{ tempDir => + + def writeNRecords(n: Int): File = { + val file1 = new File(tempDir, UUID.randomUUID.toString) + val writer1 = new PrintWriter(file1) + for(i <- 1 to n) { + writer1.write("1,2") + } + writer1.close() + file1 + } + + val largeFile = writeNRecords(1000) + val smallFile = writeNRecords(10) + + spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true) + + sql( + s"""CREATE TABLE csv_table(page_id INT, impressions INT) + PARTITIONED BY (dateint INT, hour INT) + ROW FORMAT delimited fields terminated by ',' + """) + + sql( + s"""LOAD DATA LOCAL INPATH '${largeFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160717, hour=10) + """) + sql( + s"""LOAD DATA LOCAL INPATH '${largeFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160717, hour=11) + """) + sql( + s"""LOAD DATA LOCAL INPATH '${smallFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160718, hour=10) + """) + sql( + s"""LOAD DATA LOCAL INPATH '${smallFile.getAbsolutePath}' INTO TABLE csv_table partition + (dateint=20160718, hour=11) + """) + + val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table")) + .asInstanceOf[MetastoreRelation] + + val hadoopConf = spark.sessionState.newHadoopConf + val fs: FileSystem = relation.hiveQlTable.getPath.getFileSystem(hadoopConf) + + val totalSize = relation.getSize + val lLen = fs.getContentSummary(new Path(relation.hiveQlTable.getPath, + "dateint=20160717")).getLength/2 + val sLen = fs.getContentSummary(new Path(relation.hiveQlTable.getPath, + "dateint=20160718")).getLength/2 + + def assertion(filter: Filter, partitionPrunedSize: Long, message: String): Unit = { + assert(relation.statistics(Some(Seq(filter))).sizeInBytes == BigInt(totalSize), message) + + spark.conf.set(SQLConf.ENABLE_PRUNED_PARTITION_STATS.key, true) + val sizeInBytes = relation.statistics(Some(Seq(filter))).sizeInBytes + assert(sizeInBytes == BigInt(partitionPrunedSize), message) + spark.conf.set(SQLConf.ENABLE_PRUNED_PARTITION_STATS.key, false) + } + + val dateInt = relation.partitionKeys.find(_.name.equals("dateint")).get + val hour = relation.partitionKeys.find(_.name.equals("hour")).get + val literalDate17 = Literal.create(20160717, DataTypes.IntegerType) + val literalDate18 = Literal.create(20160718, DataTypes.IntegerType) + val literalHour10 = Literal.create(10, DataTypes.IntegerType) + val literalHour11 = Literal.create(11, DataTypes.IntegerType) + + val equalToDate17 = EqualTo(dateInt, literalDate17) + val equalToDate18 = EqualTo(dateInt, literalDate18) + val equalToHour10 = EqualTo(hour, literalHour10) + val equalToHour11 = EqualTo(hour, literalHour11) + + val hour10Or11 = Or(equalToHour10, equalToHour11) + val date17AndHour10Or11 = And(equalToDate17, hour10Or11) + val date18AndHour11 = And(equalToDate18, equalToHour11) + val hour10orDate18AndHour11 = Or(equalToHour10, date18AndHour11) + + val notNull = IsNotNull(literalHour10) + val notNullAndDate18AndHour11 = And(notNull, date18AndHour11) + + val not = Not(hour10orDate18AndHour11) + val greaterThanHour10 = GreaterThan(hour, literalHour10) + val date18AndGreaterThanHour10 = And(equalToDate18, greaterThanHour10) + + val date18andHourIn1011 = And(equalToDate18, In(hour, List(literalHour10, literalHour11))) + + val testFilters = Seq( + // equality check + (new Filter(equalToDate17, null), lLen * 2, "equalToDate17"), + + // And and Or check + (new Filter(date17AndHour10Or11, null), lLen * 2, "date17AndHour10Or11"), + + // And and Or check, but or has only hour so it should 
be ignored. + (new Filter(hour10orDate18AndHour11, null), sLen, "hour10orDate18AndHour11"), + + // not Null should not blacklist partition column + (new Filter(notNullAndDate18AndHour11, null), sLen, "notNullAndDate18AndHour11"), + + // Not should blacklist partition col, falling back to total size. + (new Filter(not, null), totalSize, "not"), + + // unsupported operator should fall back to total size. + (new Filter(greaterThanHour10, null), totalSize, "greaterThanHour10"), + + // unsupported operator should only blacklist col used in that operator. + (new Filter(date18AndGreaterThanHour10, null), sLen * 2, "date18AndGreaterThanHour10"), + + // In Check with And + (new Filter(date18andHourIn1011, null), sLen * 2, "date18andHourIn1011") + ) + + testFilters.foreach(tuple => assertion(tuple._1, tuple._2, tuple._3)) + } + } finally { + spark.conf.set(SQLConf.ENABLE_PRUNED_PARTITION_STATS.key, enablePartitionPruning) + spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, enableFallBackToHdfsForStats) + sql("DROP TABLE csv_table ") + } + } + test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes