From 0570a1a1c62df6de97f6e89dc30d8f80e04cfeae Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 28 May 2016 10:04:22 +0800 Subject: [PATCH 01/19] init commit --- .../apache/spark/sql/internal/SQLConf.scala | 8 +++ .../apache/spark/sql/hive/HiveOptimizer.scala | 59 ++++++++++++++++++ .../spark/sql/hive/HiveSessionState.scala | 8 +++ .../spark/sql/hive/MetastoreRelation.scala | 62 ++++++++++++++++--- 4 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f3064eb6ac6d..6b2361af77b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -251,6 +251,12 @@ object SQLConf { .booleanConf .createWithDefault(false) + val HIVE_PARTITION_PRUNER_FOR_STATS = SQLConfigBuilder("spark.sql.hive.partitionPrunerForStats") + .doc("When true, some predicates will be pushed down into MetastoreRelation so that " + + "determining if partitions that are involved are small enough to use auto broadcast joins.") + .booleanConf + .createWithDefault(false) + val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView") .internal() .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " + @@ -577,6 +583,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) + def hivePartitionPrunerForStats: Boolean = getConf(HIVE_PARTITION_PRUNER_FOR_STATS) + def nativeView: Boolean = getConf(NATIVE_VIEW) def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala new file mode 100644 index 000000000000..a5b94a07c7a3 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.{ExperimentalMethods, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, PredicateHelper} +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf + +class HiveOptimizer ( + sparkSession: SparkSession, + catalog: HiveSessionCatalog, + conf: SQLConf, + experimentalMethods: ExperimentalMethods) + extends Optimizer(catalog, conf) { + + override def batches: Seq[Batch] = super.batches :+ + Batch("Partition Pruner", Once, PartitionPruner(conf)) :+ + Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) +} + +case class PartitionPruner(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { + + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!conf.hivePartitionPrunerForStats) { + return plan + } + plan.transform { + case filter@Filter(condition, relation: MetastoreRelation) + if relation.partitionKeys.nonEmpty && condition.deterministic => + val partitionKeyIds = AttributeSet(relation.partitionKeys) + val predicates = splitConjunctivePredicates(condition) + val pruningPredicates = predicates.filter { predicate => + !predicate.references.isEmpty && + predicate.references.subsetOf(partitionKeyIds) + } + relation.partitionPruningPred = pruningPredicates + filter + //filter.withNewChildren(relation.newInstance(pruningPredicates) :: Nil) + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 46579ecd85ca..5a9bbac78fdd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.client.HiveClient @@ -74,6 +75,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) } } + /** + * Logical query plan optimizer that takes into account Hive. + */ + override lazy val optimizer: Optimizer = + new HiveOptimizer(sparkSession, catalog, conf, experimentalMethods) + + /** * Planner that takes into account Hive-specific strategies. 
*/ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 1671228fd9b4..a0c5a8dc08a5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -29,17 +29,19 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.hive.client.HiveClient - +import org.apache.spark.sql.types.{DataType, BooleanType} private[hive] case class MetastoreRelation( databaseName: String, tableName: String, - alias: Option[String]) + alias: Option[String], + var partitionPruningPred: Seq[Expression] = Seq.empty[Expression]) (val catalogTable: CatalogTable, @transient private val client: HiveClient, @transient private val sparkSession: SparkSession) @@ -50,7 +52,9 @@ private[hive] case class MetastoreRelation( databaseName == relation.databaseName && tableName == relation.tableName && alias == relation.alias && - output == relation.output + output == relation.output && + partitionPruningPred.size == relation.partitionPruningPred.size && + (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) case _ => false } @@ -124,7 +128,7 @@ private[hive] case class MetastoreRelation( // if the size is still less than zero, we use default size Option(totalSize).map(_.toLong).filter(_ > 0) .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) - .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes))) + .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes))) } ) @@ -166,11 +170,45 @@ private[hive] case class MetastoreRelation( } } + private[this] def castFromString(value: String, dataType: DataType) = { + Cast(Literal(value), dataType).eval(null) + } + + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private[hive] def prunePartitions(partitions: Seq[Partition]) = { + val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + require( + pred.dataType == BooleanType, + s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") + BindReferences.bindReference(pred, partitionKeys) + } + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = partitionKeys.map(_.dataType) + val castedValues = part.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => castFromString(value, dataType) } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. + val row = InternalRow.fromSeq(castedValues) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + /** Only compare database and tablename, not alias. 
*/ override def sameResult(plan: LogicalPlan): Boolean = { plan match { case mr: MetastoreRelation => - mr.databaseName == databaseName && mr.tableName == tableName + mr.databaseName == databaseName && mr.tableName == tableName && + partitionPruningPred.size == mr.partitionPruningPred.size && + (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) case _ => false } } @@ -211,6 +249,10 @@ private[hive] case class MetastoreRelation( /** An attribute map for determining the ordinal for non-partition columns. */ val columnOrdinals = AttributeMap(attributes.zipWithIndex) + lazy val partitions: Seq[Partition] = { + prunePartitions(getHiveQlPartitions(partitionPruningPred)) + } + override def inputFiles: Array[String] = { val partLocations = client .getPartitionsByFilter(catalogTable, Nil) @@ -226,6 +268,12 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sparkSession) + MetastoreRelation(databaseName, tableName, alias, partitionPruningPred)( + catalogTable, client, sparkSession) + } + + def newInstance(pruningPredicates: Seq[Expression]): MetastoreRelation = { + MetastoreRelation(databaseName, tableName, alias, pruningPredicates)( + catalogTable, client, sparkSession) } } From 77737f13063f9d8cdd924e16f7f543bd7fd34598 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 28 May 2016 13:38:51 +0800 Subject: [PATCH 02/19] update --- .../apache/spark/sql/hive/HiveOptimizer.scala | 3 +- .../spark/sql/hive/MetastoreRelation.scala | 59 ++++++++++++++++--- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index a5b94a07c7a3..50a4e647d985 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -52,8 +52,7 @@ case class PartitionPruner(conf: SQLConf) extends Rule[LogicalPlan] with Predica predicate.references.subsetOf(partitionKeyIds) } relation.partitionPruningPred = pruningPredicates - filter - //filter.withNewChildren(relation.newInstance(pruningPredicates) :: Nil) + filter.withNewChildren(relation :: Nil) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 9c820144aee1..b39e5b93cc5d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -32,17 +32,19 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.hive.client.HiveClient - +import org.apache.spark.sql.types.{DataType, BooleanType} private[hive] case class MetastoreRelation( databaseName: String, tableName: String, - alias: 
Option[String]) + alias: Option[String], + var partitionPruningPred: Seq[Expression] = Seq.empty[Expression]) (val catalogTable: CatalogTable, @transient private val client: HiveClient, @transient private val sparkSession: SparkSession) @@ -53,7 +55,9 @@ private[hive] case class MetastoreRelation( databaseName == relation.databaseName && tableName == relation.tableName && alias == relation.alias && - output == relation.output + output == relation.output && + partitionPruningPred.size == relation.partitionPruningPred.size && + (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) case _ => false } @@ -132,8 +136,16 @@ private[hive] case class MetastoreRelation( } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { try { val hadoopConf = sparkSession.sessionState.newHadoopConf() - val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) - fs.getContentSummary(hiveQlTable.getPath).getLength + if (partitionPruningPred.isEmpty) { + val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) + fs.getContentSummary(hiveQlTable.getPath).getLength + } else { + val partitions = prunePartitions(getHiveQlPartitions(partitionPruningPred)) + partitions.map { partition => + val fs: FileSystem = partition.getDataLocation.getFileSystem(hadoopConf) + fs.getContentSummary(partition.getDataLocation).getLength + }.sum + } } catch { case e: IOException => logWarning("Failed to get table size from hdfs.", e) @@ -183,11 +195,41 @@ private[hive] case class MetastoreRelation( } } + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private[hive] def prunePartitions(partitions: Seq[Partition]) = { + val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + require( + pred.dataType == BooleanType, + s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") + BindReferences.bindReference(pred, partitionKeys) + } + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = partitionKeys.map(_.dataType) + val castedValues = part.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => Cast(Literal(value), dataType).eval(null) } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. + val row = InternalRow.fromSeq(castedValues) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + /** Only compare database and tablename, not alias. 
*/ override def sameResult(plan: LogicalPlan): Boolean = { plan match { case mr: MetastoreRelation => - mr.databaseName == databaseName && mr.tableName == tableName + mr.databaseName == databaseName && mr.tableName == tableName && + partitionPruningPred.size == mr.partitionPruningPred.size && + (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) case _ => false } } @@ -243,6 +285,7 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sparkSession) + MetastoreRelation(databaseName, tableName, alias, partitionPruningPred)( + catalogTable, client, sparkSession) } } From ca78723d5418aa754d468e461c97592725bc97ed Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 28 May 2016 15:28:33 +0800 Subject: [PATCH 03/19] fix style --- .../scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index b39e5b93cc5d..696e4f6ef19d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.types.{DataType, BooleanType} +import org.apache.spark.sql.types.BooleanType private[hive] case class MetastoreRelation( databaseName: String, From dd6bdf05b1156b6e1471ceadc817c3f8a54270b2 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 6 Jun 2016 10:16:36 +0800 Subject: [PATCH 04/19] update --- .../apache/spark/sql/hive/HiveOptimizer.scala | 45 ++++++++++++++----- .../spark/sql/hive/HiveStrategies.scala | 20 +++------ .../spark/sql/hive/MetastoreRelation.scala | 16 ++++--- 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index 50a4e647d985..f83265c34414 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{ExperimentalMethods, SparkSession} -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, PredicateHelper} +import org.apache.spark.sql.{catalyst, ExperimentalMethods, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper} import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -32,27 +32,48 @@ class HiveOptimizer ( extends Optimizer(catalog, conf) { override def batches: Seq[Batch] = super.batches :+ - Batch("Partition Pruner", Once, PartitionPruner(conf)) :+ - Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) + Batch("Push filter into relation", Once, 
PushFilterIntoRelation(conf)) :+ + Batch("Push Project into relation", Once, PushProjectIntoRelation(conf)) :+ + Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) } -case class PartitionPruner(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { +case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { override def apply(plan: LogicalPlan): LogicalPlan = { - if (!conf.hivePartitionPrunerForStats) { - return plan - } plan.transform { case filter@Filter(condition, relation: MetastoreRelation) if relation.partitionKeys.nonEmpty && condition.deterministic => val partitionKeyIds = AttributeSet(relation.partitionKeys) val predicates = splitConjunctivePredicates(condition) - val pruningPredicates = predicates.filter { predicate => + val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) } - relation.partitionPruningPred = pruningPredicates - filter.withNewChildren(relation :: Nil) + val filterCondition: Option[Expression] = + otherPredicates.reduceLeftOption(catalyst.expressions.And) + if (pruningPredicates.nonEmpty) { + relation.partitionPruningPred = pruningPredicates + } + + filterCondition.map(Filter(_, relation)).getOrElse(relation) + } + } +} + +case class PushProjectIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transform { + case p @Project(projectList, relation: MetastoreRelation) => + val projectSet = AttributeSet(projectList.flatMap(_.references)) + relation.requiredAttributes = projectSet.toSeq + relation + + case p @Project(projectList, filter@Filter(condition, relation: MetastoreRelation)) => + val projectSet = AttributeSet(projectList.flatMap(_.references)) + val filterSet = AttributeSet(condition.flatMap(_.references)) + relation.requiredAttributes = (projectSet ++ filterSet).toSeq + filter } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 71b180e55b58..259d0dc2b862 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -60,20 +60,14 @@ private[hive] trait HiveStrategies { */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => - // Filter out all predicates that only deal with partition keys, these are given to the - // hive table scan operator to be used for partition pruning. 
- val partitionKeyIds = AttributeSet(relation.partitionKeys) - val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && - predicate.references.subsetOf(partitionKeyIds) + case relation: MetastoreRelation => + val requiredAttributes = if (relation.requiredAttributes.isEmpty) { + relation.output + } else { + relation.requiredAttributes } - - pruneFilterProject( - projectList, - otherPredicates, - identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil + HiveTableScanExec(requiredAttributes, relation, relation.partitionPruningPred)( + sparkSession) :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 696e4f6ef19d..0f3215a59521 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -44,7 +44,8 @@ private[hive] case class MetastoreRelation( databaseName: String, tableName: String, alias: Option[String], - var partitionPruningPred: Seq[Expression] = Seq.empty[Expression]) + var partitionPruningPred: Seq[Expression] = Seq.empty[Expression], + var requiredAttributes: Seq[Attribute] = Seq.empty[Attribute]) (val catalogTable: CatalogTable, @transient private val client: HiveClient, @transient private val sparkSession: SparkSession) @@ -57,7 +58,9 @@ private[hive] case class MetastoreRelation( alias == relation.alias && output == relation.output && partitionPruningPred.size == relation.partitionPruningPred.size && - (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) + (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) && + requiredAttributes.size == relation.requiredAttributes.size && + (requiredAttributes, relation.requiredAttributes).zipped.forall(_ semanticEquals _) case _ => false } @@ -136,7 +139,8 @@ private[hive] case class MetastoreRelation( } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { try { val hadoopConf = sparkSession.sessionState.newHadoopConf() - if (partitionPruningPred.isEmpty) { + if (partitionPruningPred.isEmpty || + !sparkSession.sessionState.conf.hivePartitionPrunerForStats) { val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) fs.getContentSummary(hiveQlTable.getPath).getLength } else { @@ -229,7 +233,9 @@ private[hive] case class MetastoreRelation( case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName && partitionPruningPred.size == mr.partitionPruningPred.size && - (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) + (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) && + requiredAttributes.size == relation.requiredAttributes.size && + (requiredAttributes, relation.requiredAttributes).zipped.forall(_ semanticEquals _) case _ => false } } @@ -285,7 +291,7 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias, partitionPruningPred)( + MetastoreRelation(databaseName, tableName, alias, partitionPruningPred, requiredAttributes)( catalogTable, client, sparkSession) } } From 8b9b07d8ced030563c2485fa3ac271cb69aa4ed0 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 6 Jun 2016 10:29:23 +0800 Subject: [PATCH 05/19] fix errors 
--- .../scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 0f3215a59521..7cbb3ecc3510 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -234,8 +234,8 @@ private[hive] case class MetastoreRelation( mr.databaseName == databaseName && mr.tableName == tableName && partitionPruningPred.size == mr.partitionPruningPred.size && (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) && - requiredAttributes.size == relation.requiredAttributes.size && - (requiredAttributes, relation.requiredAttributes).zipped.forall(_ semanticEquals _) + requiredAttributes.size == mr.requiredAttributes.size && + (requiredAttributes, mr.requiredAttributes).zipped.forall(_ semanticEquals _) case _ => false } } From ab86f14939eda76c6940fd19609d8b058a042976 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 6 Jun 2016 21:26:05 +0800 Subject: [PATCH 06/19] fix unit test --- .../org/apache/spark/sql/hive/HiveOptimizer.scala | 13 +++++++++++-- .../org/apache/spark/sql/hive/HiveStrategies.scala | 2 -- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index f83265c34414..7adaa564a463 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -67,13 +67,22 @@ case class PushProjectIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] { case p @Project(projectList, relation: MetastoreRelation) => val projectSet = AttributeSet(projectList.flatMap(_.references)) relation.requiredAttributes = projectSet.toSeq - relation + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet) { + relation + } else { + Project(projectList, relation) + } case p @Project(projectList, filter@Filter(condition, relation: MetastoreRelation)) => val projectSet = AttributeSet(projectList.flatMap(_.references)) val filterSet = AttributeSet(condition.flatMap(_.references)) relation.requiredAttributes = (projectSet ++ filterSet).toSeq - filter + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && + filterSet.subsetOf(projectSet)) { + filter + } else { + Project(projectList, filter) + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 259d0dc2b862..51af6ce7d65b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ From fc25f7269adb554ba2e01a92c19eb2ff3bf1ee93 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 7 Jun 2016 10:16:10 +0800 Subject: [PATCH 07/19] update --- .../org/apache/spark/sql/hive/MetastoreRelation.scala | 8 ++------ 1 file changed, 2 
insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 7cbb3ecc3510..9ea9be8ee678 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -58,9 +58,7 @@ private[hive] case class MetastoreRelation( alias == relation.alias && output == relation.output && partitionPruningPred.size == relation.partitionPruningPred.size && - (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) && - requiredAttributes.size == relation.requiredAttributes.size && - (requiredAttributes, relation.requiredAttributes).zipped.forall(_ semanticEquals _) + (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) case _ => false } @@ -233,9 +231,7 @@ private[hive] case class MetastoreRelation( case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName && partitionPruningPred.size == mr.partitionPruningPred.size && - (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) && - requiredAttributes.size == mr.requiredAttributes.size && - (requiredAttributes, mr.requiredAttributes).zipped.forall(_ semanticEquals _) + (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _) case _ => false } } From c4f9bc6c10d9f20e454dd8be1f95100ca7826ea4 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 7 Jun 2016 14:12:36 +0800 Subject: [PATCH 08/19] fix unit test --- .../apache/spark/sql/hive/HiveOptimizer.scala | 32 ++----------------- .../spark/sql/hive/HiveStrategies.scala | 29 +++++++++++++---- .../spark/sql/hive/MetastoreRelation.scala | 5 ++- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index 7adaa564a463..645a726839f3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{catalyst, ExperimentalMethods, SparkSession} import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper} import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -33,8 +33,7 @@ class HiveOptimizer ( override def batches: Seq[Batch] = super.batches :+ Batch("Push filter into relation", Once, PushFilterIntoRelation(conf)) :+ - Batch("Push Project into relation", Once, PushProjectIntoRelation(conf)) :+ - Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) + Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) } case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { @@ -59,30 +58,3 @@ case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with } } } - -case class PushProjectIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] { - - override def apply(plan: LogicalPlan): LogicalPlan = { - plan.transform { - case 
p @Project(projectList, relation: MetastoreRelation) => - val projectSet = AttributeSet(projectList.flatMap(_.references)) - relation.requiredAttributes = projectSet.toSeq - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet) { - relation - } else { - Project(projectList, relation) - } - - case p @Project(projectList, filter@Filter(condition, relation: MetastoreRelation)) => - val projectSet = AttributeSet(projectList.flatMap(_.references)) - val filterSet = AttributeSet(condition.flatMap(_.references)) - relation.requiredAttributes = (projectSet ++ filterSet).toSeq - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && - filterSet.subsetOf(projectSet)) { - filter - } else { - Project(projectList, filter) - } - } - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 51af6ce7d65b..6f0fada75eb2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,8 +18,9 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution._ @@ -58,13 +59,29 @@ private[hive] trait HiveStrategies { */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case relation: MetastoreRelation => - val requiredAttributes = if (relation.requiredAttributes.isEmpty) { - relation.output + case p @Project(projectList, relation: MetastoreRelation) => + val projectSet = AttributeSet(projectList.flatMap(_.references)) + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet) { + HiveTableScanExec(projectList.asInstanceOf[Seq[Attribute]], relation, + relation.partitionPruningPred)(sparkSession) :: Nil + } else { + ProjectExec(projectList, HiveTableScanExec(projectSet.toSeq, relation, + relation.partitionPruningPred)(sparkSession)) :: Nil + } + case p @Project(projectList, filter@Filter(condition, relation: MetastoreRelation)) => + val projectSet = AttributeSet(projectList.flatMap(_.references)) + val filterSet = AttributeSet(condition.flatMap(_.references)) + if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && + filterSet.subsetOf(projectSet)) { + FilterExec(condition, HiveTableScanExec(projectList.asInstanceOf[Seq[Attribute]], + relation, relation.partitionPruningPred)(sparkSession)) :: Nil } else { - relation.requiredAttributes + val scan = HiveTableScanExec((projectSet ++ filterSet).toSeq, relation, + relation.partitionPruningPred)(sparkSession) + ProjectExec(projectList, FilterExec(condition, scan)) :: Nil } - HiveTableScanExec(requiredAttributes, relation, relation.partitionPruningPred)( + case relation: MetastoreRelation => + HiveTableScanExec(relation.output, relation, relation.partitionPruningPred)( sparkSession) :: Nil case _ => Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 9ea9be8ee678..b6a94569c8e9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -44,8 +44,7 @@ private[hive] case class MetastoreRelation( databaseName: String, tableName: String, alias: Option[String], - var partitionPruningPred: Seq[Expression] = Seq.empty[Expression], - var requiredAttributes: Seq[Attribute] = Seq.empty[Attribute]) + var partitionPruningPred: Seq[Expression] = Seq.empty[Expression]) (val catalogTable: CatalogTable, @transient private val client: HiveClient, @transient private val sparkSession: SparkSession) @@ -287,7 +286,7 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias, partitionPruningPred, requiredAttributes)( + MetastoreRelation(databaseName, tableName, alias, partitionPruningPred)( catalogTable, client, sparkSession) } } From 4a3e72e6321fcbc449eb2fad3c903bd790cea2cb Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 22 Jul 2016 11:16:28 +0800 Subject: [PATCH 09/19] update --- .../spark/sql/hive/HiveStrategies.scala | 45 +++++++------------ 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 6f0fada75eb2..59e4278353fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution._ @@ -53,36 +54,22 @@ private[hive] trait HiveStrategies { } } - /** - * Retrieves data using a HiveTableScan. Partition pruning predicates are also detected and - * applied. - */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case p @Project(projectList, relation: MetastoreRelation) => - val projectSet = AttributeSet(projectList.flatMap(_.references)) - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet) { - HiveTableScanExec(projectList.asInstanceOf[Seq[Attribute]], relation, - relation.partitionPruningPred)(sparkSession) :: Nil - } else { - ProjectExec(projectList, HiveTableScanExec(projectSet.toSeq, relation, - relation.partitionPruningPred)(sparkSession)) :: Nil + case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => + // Filter out all predicates that only deal with partition keys, these are given to the + // hive table scan operator to be used for partition pruning. 
+ val partitionKeyIds = AttributeSet(relation.partitionKeys) + val (pruningPredicates, otherPredicates) = predicates.partition { predicate => + !predicate.references.isEmpty && + predicate.references.subsetOf(partitionKeyIds) } - case p @Project(projectList, filter@Filter(condition, relation: MetastoreRelation)) => - val projectSet = AttributeSet(projectList.flatMap(_.references)) - val filterSet = AttributeSet(condition.flatMap(_.references)) - if (AttributeSet(projectList.map(_.toAttribute)) == projectSet && - filterSet.subsetOf(projectSet)) { - FilterExec(condition, HiveTableScanExec(projectList.asInstanceOf[Seq[Attribute]], - relation, relation.partitionPruningPred)(sparkSession)) :: Nil - } else { - val scan = HiveTableScanExec((projectSet ++ filterSet).toSeq, relation, - relation.partitionPruningPred)(sparkSession) - ProjectExec(projectList, FilterExec(condition, scan)) :: Nil - } - case relation: MetastoreRelation => - HiveTableScanExec(relation.output, relation, relation.partitionPruningPred)( - sparkSession) :: Nil + + pruneFilterProject( + projectList, + otherPredicates, + identity[Seq[Expression]], + HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil case _ => Nil } From 1999eb41df7dd1cffd076d881143001f03d44fac Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 22 Jul 2016 11:19:42 +0800 Subject: [PATCH 10/19] update HiveStrategies --- .../scala/org/apache/spark/sql/hive/HiveStrategies.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 59e4278353fd..71b180e55b58 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -54,6 +54,10 @@ private[hive] trait HiveStrategies { } } + /** + * Retrieves data using a HiveTableScan. Partition pruning predicates are also detected and + * applied. 
+ */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => @@ -62,7 +66,7 @@ private[hive] trait HiveStrategies { val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && - predicate.references.subsetOf(partitionKeyIds) + predicate.references.subsetOf(partitionKeyIds) } pruneFilterProject( From 2d9e32142748ebb92d27f23609607d6395b512b1 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 22 Jul 2016 11:50:33 +0800 Subject: [PATCH 11/19] update with master --- .../apache/spark/sql/internal/SQLConf.scala | 58 ++++++++----------- .../apache/spark/sql/hive/HiveOptimizer.scala | 9 ++- .../spark/sql/hive/MetastoreRelation.scala | 12 ++-- 3 files changed, 34 insertions(+), 45 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2bf151d109b3..f0df35b27b52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -114,7 +114,7 @@ object SQLConf { .createWithDefault(10L * 1024 * 1024) val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = - SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats") + SQLConfigBuilder("spark.sql.statistics.fallBackToHdfs") .doc("If the table statistics are not available from table metadata enable fall back to hdfs." + " This is useful in determining if a table is small enough to use auto broadcast joins.") .booleanConf @@ -258,31 +258,18 @@ object SQLConf { .booleanConf .createWithDefault(false) + val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly") + .doc("When true, enable the metadata-only query optimization that use the table's metadata " + + "to produce the partition columns instead of table scans. It applies when all the columns " + + "scanned are partition columns and the query has an aggregate operator that satisfies " + + "distinct semantics.") + val HIVE_PARTITION_PRUNER_FOR_STATS = SQLConfigBuilder("spark.sql.hive.partitionPrunerForStats") .doc("When true, some predicates will be pushed down into MetastoreRelation so that " + "determining if partitions that are involved are small enough to use auto broadcast joins.") .booleanConf .createWithDefault(false) - val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView") - .internal() - .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " + - "Note that this function is experimental and should ony be used when you are using " + - "non-hive-compatible tables written by Spark SQL. The SQL string used to create " + - "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " + - "possible, or you may get wrong result.") - .booleanConf - .createWithDefault(true) - - val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical") - .internal() - .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " + - "CREATE VIEW statement using SQL query string generated from view definition logical " + - "plan. 
If the logical plan doesn't have a SQL representation, we fallback to the " + - "original native view implementation.") - .booleanConf - .createWithDefault(true) - val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord") .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.") .stringConf @@ -434,12 +421,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields") + val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback") .internal() - .doc("The maximum number of fields (including nested fields) that will be supported before" + - " deactivating whole-stage codegen.") - .intConf - .createWithDefault(200) + .doc("When true, whole stage codegen could be temporary disabled for the part of query that" + + " fail to compile generated code") + .booleanConf + .createWithDefault(true) val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches") .internal() @@ -483,14 +470,14 @@ object SQLConf { .createWithDefault(2) val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation") - .doc("The default location for storing checkpoint data for continuously executing queries.") + .doc("The default location for storing checkpoint data for streaming queries.") .stringConf .createOptional val UNSUPPORTED_OPERATION_CHECK_ENABLED = SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck") .internal() - .doc("When true, the logical plan for continuous query will be checked for unsupported" + + .doc("When true, the logical plan for streaming query will be checked for unsupported" + " operations.") .booleanConf .createWithDefault(true) @@ -533,7 +520,7 @@ object SQLConf { val FILE_SINK_LOG_CLEANUP_DELAY = SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay") .internal() - .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") + .doc("How long that a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(60 * 1000L) // 10 minutes @@ -544,6 +531,13 @@ object SQLConf { .booleanConf .createWithDefault(false) + val STREAMING_POLLING_DELAY = + SQLConfigBuilder("spark.sql.streaming.pollingDelay") + .internal() + .doc("How long to delay polling new data when no data is available") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(10L) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -605,20 +599,18 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) - def hivePartitionPrunerForStats: Boolean = getConf(HIVE_PARTITION_PRUNER_FOR_STATS) + def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY) - def nativeView: Boolean = getConf(NATIVE_VIEW) + def hivePartitionPrunerForStats: Boolean = getConf(HIVE_PARTITION_PRUNER_FOR_STATS) def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED) - def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) + def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED) - def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW) - def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) def subexpressionEliminationEnabled: Boolean = diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index 645a726839f3..49a9a5ad304a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{catalyst, ExperimentalMethods, SparkSession} +import org.apache.spark.sql.execution.SparkOptimizer import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper} -import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -29,18 +29,17 @@ class HiveOptimizer ( catalog: HiveSessionCatalog, conf: SQLConf, experimentalMethods: ExperimentalMethods) - extends Optimizer(catalog, conf) { + extends SparkOptimizer(catalog, conf, experimentalMethods) { override def batches: Seq[Batch] = super.batches :+ - Batch("Push filter into relation", Once, PushFilterIntoRelation(conf)) :+ - Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) + Batch("Push filter into relation", Once, PushFilterIntoRelation(conf)) } case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { override def apply(plan: LogicalPlan): LogicalPlan = { plan.transform { - case filter@Filter(condition, relation: MetastoreRelation) + case filter @ Filter(condition, relation: MetastoreRelation) if relation.partitionKeys.nonEmpty && condition.deterministic => val partitionKeyIds = AttributeSet(relation.partitionKeys) val predicates = splitConjunctivePredicates(condition) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index b6a94569c8e9..c6531851aeb0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -43,7 +43,6 @@ import org.apache.spark.sql.types.BooleanType private[hive] case class MetastoreRelation( databaseName: String, tableName: String, - alias: Option[String], var partitionPruningPred: Seq[Expression] = Seq.empty[Expression]) (val catalogTable: CatalogTable, @transient private val client: HiveClient, @@ -54,7 +53,6 @@ private[hive] case class MetastoreRelation( case relation: MetastoreRelation => databaseName == relation.databaseName && tableName == relation.tableName && - alias == relation.alias && output == relation.output && partitionPruningPred.size == relation.partitionPruningPred.size && (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _) @@ -62,7 +60,7 @@ private[hive] case class MetastoreRelation( } override def hashCode(): Int = { - Objects.hashCode(databaseName, tableName, alias, output) + Objects.hashCode(databaseName, tableName, output) } override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil @@ -173,7 +171,7 @@ private[hive] case class MetastoreRelation( val tPartition = new org.apache.hadoop.hive.metastore.api.Partition tPartition.setDbName(databaseName) tPartition.setTableName(tableName) - tPartition.setValues(p.spec.values.toList.asJava) + tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava) val sd = new 
org.apache.hadoop.hive.metastore.api.StorageDescriptor() tPartition.setSd(sd) @@ -226,7 +224,7 @@ private[hive] case class MetastoreRelation( /** Only compare database and tablename, not alias. */ override def sameResult(plan: LogicalPlan): Boolean = { - plan match { + plan.canonicalized match { case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName && partitionPruningPred.size == mr.partitionPruningPred.size && @@ -251,7 +249,7 @@ private[hive] case class MetastoreRelation( CatalystSqlParser.parseDataType(f.dataType), // Since data can be dumped in randomly with no validation, everything is nullable. nullable = true - )(qualifier = Some(alias.getOrElse(tableName))) + )(qualifier = Some(tableName)) } /** PartitionKey attributes */ @@ -286,7 +284,7 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias, partitionPruningPred)( + MetastoreRelation(databaseName, tableName, partitionPruningPred)( catalogTable, client, sparkSession) } } From 1e0a6f296a73822b250ab4d4e9a35d4bec129765 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 22 Jul 2016 11:57:15 +0800 Subject: [PATCH 12/19] update name --- .../apache/spark/sql/internal/SQLConf.scala | 23 ++++++++++++++----- .../spark/sql/hive/MetastoreRelation.scala | 2 +- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f0df35b27b52..ebd3742007f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -120,6 +120,12 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ENABLE_PARTITION_PRUNER_FOR_STATS = SQLConfigBuilder("spark.sql.statistics.partitionPruner") + .doc("When true, some predicates will be pushed down into MetastoreRelation so that " + + "determining if partitions that are involved are small enough to use auto broadcast joins.") + .booleanConf + .createWithDefault(false) + val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes") .internal() .doc("The default table size used in query planning. By default, it is set to a larger " + @@ -263,10 +269,6 @@ object SQLConf { "to produce the partition columns instead of table scans. 
It applies when all the columns " + "scanned are partition columns and the query has an aggregate operator that satisfies " + "distinct semantics.") - - val HIVE_PARTITION_PRUNER_FOR_STATS = SQLConfigBuilder("spark.sql.hive.partitionPrunerForStats") - .doc("When true, some predicates will be pushed down into MetastoreRelation so that " + - "determining if partitions that are involved are small enough to use auto broadcast joins.") .booleanConf .createWithDefault(false) @@ -421,6 +423,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields") + .internal() + .doc("The maximum number of fields (including nested fields) that will be supported before" + + " deactivating whole-stage codegen.") + .intConf + .createWithDefault(100) + val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback") .internal() .doc("When true, whole stage codegen could be temporary disabled for the part of query that" + @@ -601,10 +610,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY) - def hivePartitionPrunerForStats: Boolean = getConf(HIVE_PARTITION_PRUNER_FOR_STATS) - def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED) + def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) + def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) @@ -620,6 +629,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS) + def partitionPrunerForStatsEnabled: Boolean = getConf(ENABLE_PARTITION_PRUNER_FOR_STATS) + def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index c6531851aeb0..b5c544cfb1bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -135,7 +135,7 @@ private[hive] case class MetastoreRelation( try { val hadoopConf = sparkSession.sessionState.newHadoopConf() if (partitionPruningPred.isEmpty || - !sparkSession.sessionState.conf.hivePartitionPrunerForStats) { + !sparkSession.sessionState.conf.partitionPrunerForStatsEnabled) { val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) fs.getContentSummary(hiveQlTable.getPath).getLength } else { From b5739193ecb4c50aacc9d450f37e7be0d49939c7 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 22 Jul 2016 12:52:29 +0800 Subject: [PATCH 13/19] add ut --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../apache/spark/sql/hive/HiveOptimizer.scala | 17 ++++----- .../spark/sql/hive/StatisticsSuite.scala | 36 +++++++++++++++++++ 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ebd3742007f6..c533818ba6c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -270,7 +270,7 @@ object SQLConf { "scanned are partition columns and the query has an 
aggregate operator that satisfies " + "distinct semantics.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord") .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index 49a9a5ad304a..e0852f23b745 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{catalyst, ExperimentalMethods, SparkSession} -import org.apache.spark.sql.execution.SparkOptimizer -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper} +import org.apache.spark.sql.{ExperimentalMethods, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, PredicateHelper} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkOptimizer import org.apache.spark.sql.internal.SQLConf class HiveOptimizer ( @@ -38,22 +38,23 @@ class HiveOptimizer ( case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { override def apply(plan: LogicalPlan): LogicalPlan = { + if (!conf.partitionPrunerForStatsEnabled) { + return plan + } + plan.transform { case filter @ Filter(condition, relation: MetastoreRelation) if relation.partitionKeys.nonEmpty && condition.deterministic => val partitionKeyIds = AttributeSet(relation.partitionKeys) val predicates = splitConjunctivePredicates(condition) - val (pruningPredicates, otherPredicates) = predicates.partition { predicate => + val pruningPredicates = predicates.filter { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) } - val filterCondition: Option[Expression] = - otherPredicates.reduceLeftOption(catalyst.expressions.And) if (pruningPredicates.nonEmpty) { relation.partitionPruningPred = pruningPredicates } - - filterCondition.map(Filter(_, relation)).getOrElse(relation) + filter } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b275ab17a93c..a66eb670462f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -115,6 +115,42 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } } + test("MetastoreRelations fallback to hdfs of scanned partitions for size estimation") { + withTempTable("tempTbl", "largeTbl", "partTbl") { + spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2") + .createOrReplaceTempView("tempTbl") + spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2"). 
+ createOrReplaceTempView("largeTbl") + sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " + + "PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile") + for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) { + sql( + s""" + |INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2') + |select col1, col2 from tempTbl + """.stripMargin) + } + val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " + + "and partTbl.part1 = 'a' and partTbl.part2 = 1)" + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true", + SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") { + val df = sql(query) + val broadcastJoins = + df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } + assert(broadcastJoins.nonEmpty) + } + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true", + SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") { + val df = sql(query) + val broadcastJoins = + df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } + assert(broadcastJoins.isEmpty) + } + } + } + test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes From bf74b0e7c76eb2462183e76c3e3c4f8405ff82f1 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 22 Jul 2016 13:04:16 +0800 Subject: [PATCH 14/19] refactor ut --- .../spark/sql/hive/StatisticsSuite.scala | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a66eb670462f..4a4f71f374ad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -133,20 +133,19 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " + "and partTbl.part1 = 'a' and partTbl.part2 = 1)" withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true", - SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") { - val df = sql(query) - val broadcastJoins = - df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } - assert(broadcastJoins.nonEmpty) - } - withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true", - SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "false", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") { - val df = sql(query) - val broadcastJoins = - df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } - assert(broadcastJoins.isEmpty) + + withSQLConf(SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "true") { + val broadcastJoins = + sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } + assert(broadcastJoins.nonEmpty) + } + + withSQLConf(SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "false") { + val broadcastJoins = + sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } + assert(broadcastJoins.isEmpty) + } } } } From c7b181e8d588d07c9da86f7985c700da01b4d290 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 11 Aug 2016 22:50:17 +0800 Subject: [PATCH 15/19] 
update --- .../main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala | 5 +++-- .../scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index e0852f23b745..755e086bce67 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -52,9 +52,10 @@ case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with predicate.references.subsetOf(partitionKeyIds) } if (pruningPredicates.nonEmpty) { - relation.partitionPruningPred = pruningPredicates + filter.withNewChildren(Seq(relation.copy(partitionPruningPred = pruningPredicates)())) + } else { + filter } - filter } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index b5c544cfb1bd..b11b1205fc15 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.types.BooleanType private[hive] case class MetastoreRelation( databaseName: String, tableName: String, - var partitionPruningPred: Seq[Expression] = Seq.empty[Expression]) + partitionPruningPred: Seq[Expression] = Seq.empty) (val catalogTable: CatalogTable, @transient private val client: HiveClient, @transient private val sparkSession: SparkSession) From 7d5371a7e11f0b9372941b4a025e5aaa71ce35b4 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 11 Aug 2016 23:12:34 +0800 Subject: [PATCH 16/19] update --- .../main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala | 3 ++- .../scala/org/apache/spark/sql/hive/MetastoreRelation.scala | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index 755e086bce67..25bf458e82b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -52,7 +52,8 @@ case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with predicate.references.subsetOf(partitionKeyIds) } if (pruningPredicates.nonEmpty) { - filter.withNewChildren(Seq(relation.copy(partitionPruningPred = pruningPredicates)())) + filter.withNewChildren(Seq(relation.copy(partitionPruningPred = pruningPredicates)( + relation.catalogTable, relation.client, relation.sparkSession))) } else { filter } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index b11b1205fc15..ad362cca032b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -45,8 +45,8 @@ private[hive] case class MetastoreRelation( tableName: String, partitionPruningPred: Seq[Expression] = Seq.empty) (val catalogTable: CatalogTable, - @transient private val client: HiveClient, - @transient private val sparkSession: SparkSession) + @transient val client: HiveClient, + @transient val sparkSession: SparkSession) extends LeafNode with MultiInstanceRelation 
with FileRelation with CatalogRelation { override def equals(other: Any): Boolean = other match { From 5d909742883c30a38a297c42fd486198dacdaa16 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 29 Oct 2016 23:37:26 +0800 Subject: [PATCH 17/19] fix style --- .../org/apache/spark/sql/hive/MetastoreRelation.scala | 9 +++++---- .../org/apache/spark/sql/hive/StatisticsSuite.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 85c59332b2c7..92db07782025 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -30,19 +30,20 @@ import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.BooleanType - +import org.apache.spark.sql.types.StructField private[hive] case class MetastoreRelation( databaseName: String, - tableName: String) + tableName: String, + partitionPruningPred: Seq[Expression] = Seq.empty) (val catalogTable: CatalogTable, @transient val client: HiveClient, @transient val sparkSession: SparkSession) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 600b070b882d..b4433f2e4425 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -119,7 +119,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } test("MetastoreRelations fallback to hdfs of scanned partitions for size estimation") { - withTempTable("tempTbl", "largeTbl", "partTbl") { + withTempView("tempTbl", "largeTbl", "partTbl") { spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2") .createOrReplaceTempView("tempTbl") spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2"). 
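The rule introduced in these patches keeps only the conjuncts of a Filter condition whose referenced columns are all partition keys; those conjuncts are then used to prune partitions before estimating the relation's size. Below is a minimal, self-contained Scala sketch of that selection step. The names Pred, EqualTo, And and splitConjuncts are illustrative stand-ins, not Catalyst classes; the real rule uses PredicateHelper.splitConjunctivePredicates and AttributeSet as shown in the diff above.

// Minimal model of the pruning-predicate selection performed by the rule.
// Pred, EqualTo, And and splitConjuncts are stand-ins, not Spark Catalyst APIs.
object PruningSketch {
  sealed trait Pred { def references: Set[String] }
  case class EqualTo(column: String, value: String) extends Pred {
    def references: Set[String] = Set(column)
  }
  case class And(left: Pred, right: Pred) extends Pred {
    def references: Set[String] = left.references ++ right.references
  }

  // Split a condition into its top-level conjuncts (mirrors splitConjunctivePredicates).
  def splitConjuncts(p: Pred): Seq[Pred] = p match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Keep only conjuncts that reference partition columns and nothing else;
  // these are the predicates that are safe to push into the relation for pruning.
  def pruningPredicates(condition: Pred, partitionKeys: Set[String]): Seq[Pred] =
    splitConjuncts(condition).filter { pred =>
      pred.references.nonEmpty && pred.references.subsetOf(partitionKeys)
    }

  def main(args: Array[String]): Unit = {
    val cond = And(EqualTo("part1", "a"), And(EqualTo("part2", "1"), EqualTo("col1", "x")))
    // Only the part1/part2 conjuncts survive; the col1 predicate is left in the Filter.
    println(pruningPredicates(cond, Set("part1", "part2")))
  }
}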
From 1a63649b1008777b62dfb4a21ca82075f8aa9ddc Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 30 Oct 2016 01:35:39 +0800 Subject: [PATCH 18/19] fix some bugs --- .../scala/org/apache/spark/sql/hive/HiveOptimizer.scala | 4 ++-- .../org/apache/spark/sql/hive/MetastoreRelation.scala | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index 25bf458e82b1..f6207c18f678 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -52,8 +52,8 @@ case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with predicate.references.subsetOf(partitionKeyIds) } if (pruningPredicates.nonEmpty) { - filter.withNewChildren(Seq(relation.copy(partitionPruningPred = pruningPredicates)( - relation.catalogTable, relation.client, relation.sparkSession))) + relation.partitionPruningPred = pruningPredicates + filter.withNewChildren(Seq(relation)) } else { filter } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 92db07782025..feb340172cc4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -36,16 +36,14 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.types.BooleanType import org.apache.spark.sql.types.StructField private[hive] case class MetastoreRelation( databaseName: String, tableName: String, - partitionPruningPred: Seq[Expression] = Seq.empty) + var partitionPruningPred: Seq[Expression] = Seq.empty) (val catalogTable: CatalogTable, - @transient val client: HiveClient, @transient val sparkSession: SparkSession) extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation { @@ -295,7 +293,6 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, partitionPruningPred)( - catalogTable, client, sparkSession) + MetastoreRelation(databaseName, tableName, partitionPruningPred)(catalogTable, sparkSession) } } From a6e2c57019f93963ca5b4ba7607d68884ebd7c27 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 30 Oct 2016 11:45:06 +0800 Subject: [PATCH 19/19] fix some uts --- .../main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala | 4 +--- .../org/apache/spark/sql/hive/MetastoreRelationSuite.scala | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala index f6207c18f678..e0852f23b745 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala @@ -53,10 +53,8 @@ case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with } if (pruningPredicates.nonEmpty) { relation.partitionPruningPred = pruningPredicates - filter.withNewChildren(Seq(relation)) - } else { - 
filter } + filter } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala index 91ff711445e8..70d497b666f5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala @@ -34,7 +34,7 @@ class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSi val relation = MetastoreRelation("db", "test")(table, null) // No exception should be thrown - relation.makeCopy(Array("db", "test")) + relation.makeCopy(Array("db", "test", Seq.empty)) // No exception should be thrown relation.toJSON }
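A note on why the MetastoreRelationSuite call changes from makeCopy(Array("db", "test")) to makeCopy(Array("db", "test", Seq.empty)): once partitionPruningPred joins the first parameter list of the case class, the relation has three product fields, while the curried (catalogTable, sparkSession) list stays outside the case-class machinery and has to be re-supplied explicitly, as the copy(...)(...) calls in the earlier patches do. The sketch below illustrates that Scala behaviour with throwaway names (Rel, RelDemo); it is not Spark code.

// Self-contained illustration: for a case class with two parameter lists,
// only the first list participates in productArity, equals/hashCode and copy;
// the second list must always be passed explicitly.
case class Rel(db: String, table: String, pruningPred: Seq[String] = Seq.empty)
              (val session: AnyRef)

object RelDemo {
  def main(args: Array[String]): Unit = {
    val r = Rel("db", "test")(session = new Object)
    println(r.productArity)  // 3 -- db, table, pruningPred; session is not a product field

    // copy covers the first parameter list only; the curried argument is re-passed by hand.
    val pruned = r.copy(pruningPred = Seq("part1 = 'a'"))(r.session)

    // equals ignores the second parameter list, so two relations with different
    // session objects but the same first-list fields compare equal.
    println(pruned == Rel("db", "test", Seq("part1 = 'a'"))(new Object))  // true
  }
}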