@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Expression, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
@@ -610,7 +610,10 @@ case class HiveTableRelation(
tableMeta: CatalogTable,
dataCols: Seq[AttributeReference],
partitionCols: Seq[AttributeReference],
tableStats: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
tableStats: Option[Statistics] = None,
@transient normalizedFilters: Seq[Expression] = Nil,
@transient prunedPartitions: Seq[CatalogTablePartition] = Nil)
Contributor:
How can we distinguish between 0 partitions left after pruning and a relation that was never partition-pruned?

Contributor (author):
We have another field, normalizedFilters: when it is empty (Nil), prunedPartitions has not been pruned at all; when it is non-empty, prunedPartitions = Nil genuinely means zero partitions remain after pruning.
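
A minimal sketch (not part of this PR) of how a consumer of HiveTableRelation could apply that convention; partitionsIfPruned is a hypothetical helper name:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, HiveTableRelation}

// Hypothetical helper: interprets the two new fields together.
def partitionsIfPruned(relation: HiveTableRelation): Option[Seq[CatalogTablePartition]] = {
  if (relation.normalizedFilters.isEmpty) {
    None                            // pruning never ran; prunedPartitions carries no meaning
  } else {
    Some(relation.prunedPartitions) // pruning ran; Nil here really means zero matching partitions
  }
}
```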

extends LeafNode with MultiInstanceRelation {
assert(tableMeta.identifier.database.isDefined)
assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
@@ -630,7 +633,9 @@
},
partitionCols = partitionCols.zipWithIndex.map {
case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
}
},
normalizedFilters = Nil,
prunedPartitions = Nil
)

override def computeStats(): Statistics = {
@@ -84,7 +84,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
queryRelations.add(alias.identifier)
case LogicalRelation(_, _, Some(catalogTable), _) =>
queryRelations.add(catalogTable.identifier.table)
case HiveTableRelation(tableMeta, _, _, _) =>
case HiveTableRelation(tableMeta, _, _, _, _, _) =>
queryRelations.add(tableMeta.identifier.table)
case _ =>
}
@@ -21,9 +21,10 @@ import org.apache.spark.annotation.Unstable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.{SparkOptimizer, SparkPlanner}
import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck
@@ -93,6 +94,20 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
customCheckRules
}

/**
* Logical query plan optimizer that takes into account Hive.
*/
override protected def optimizer: Optimizer = {
new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
override def postHocOptimizationBatches: Seq[Batch] = Seq(
Batch("Prune Hive Table Partitions", Once, PruneHiveTablePartitions(session))
)

override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
}
}

/**
* Planner that takes into account Hive-specific strategies.
*/
@@ -21,15 +21,16 @@ import java.io.IOException
import java.util.Locale

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.StatsSetupConst

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -231,6 +232,68 @@ case class RelationConversions(
}
}

/**
 * TODO: merge this with PruneFileSourcePartitions once Hive is completely made a data source.
*/
case class PruneHiveTablePartitions(
session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned =>
Contributor:
Can we follow PruneFileSourcePartitions? I think we should also support Filter(Project(HiveScan)).

Contributor (author):
Will do.
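
For reference, a rough sketch of what following PruneFileSourcePartitions might look like, matching through projections with PhysicalOperation; the pruneRelation helper below is a hypothetical stand-in for the pruning logic in this rule, not code from the PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: mirrors PruneFileSourcePartitions by matching Filter(Project(relation))
// as well as Filter(relation).
case class PruneHiveTablePartitionsSketch(session: SparkSession) extends Rule[LogicalPlan] {

  // Hypothetical stand-in for the normalization + listPartitionsByFilter + stats logic above.
  private def pruneRelation(
      relation: HiveTableRelation, filters: Seq[Expression]): HiveTableRelation = relation

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case PhysicalOperation(projections, filters, relation: HiveTableRelation)
        if filters.nonEmpty && relation.isPartitioned =>
      val pruned = pruneRelation(relation, filters)
      // Re-assemble the original operators on top of the pruned relation,
      // as PruneFileSourcePartitions does.
      Project(projections, Filter(filters.reduceLeft(And), pruned))
  }
}
```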

val predicates = splitConjunctivePredicates(condition)
val normalizedFilters = predicates.map { e =>
e transform {
case a: AttributeReference =>
a.withName(relation.output.find(_.semanticEquals(a)).get.name)
}
}
val partitionSet = AttributeSet(relation.partitionCols)
// SPARK-24085: remove scalar subquery in partition expression then get normalized predicates
val pruningPredicates = normalizedFilters
.filterNot(SubqueryExpression.hasSubquery)
.filter { predicate =>
!predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
}
val conf = session.sessionState.conf
if (conf.metastorePartitionPruning && pruningPredicates.nonEmpty) {
val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
relation.tableMeta.database,
relation.tableMeta.identifier.table,
pruningPredicates,
conf.sessionLocalTimeZone)
val sizeInBytes = try {
val sizeOfPartitions = prunedPartitions.map { part =>
val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
if (rawDataSize.isDefined && rawDataSize.get > 0) {
rawDataSize.get
} else if (totalSize.isDefined && totalSize.get > 0L) {
totalSize.get
} else if (conf.fallBackToHdfsForStatsEnabled) {
Contributor:
Per the doc of the conf "spark.sql.statistics.fallBackToHdfs", it is only for non-partitioned Hive tables: "This flag is effective only for non-partitioned Hive tables."

CommandUtils.calculateLocationSize(
session.sessionState, relation.tableMeta.identifier, part.storage.locationUri)
} else { // we cannot get any size statistics here. Use 0 as the default size to sum up.
0L
}
}.sum
// If size of partitions is zero fall back to the default size.
if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
} catch {
case e: IOException =>
logWarning("Failed to get table size from HDFS.", e)
conf.defaultSizeInBytes
}
val withStats = relation.tableMeta.copy(
stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
val prunedHiveTableRelation = relation.copy(tableMeta = withStats,
normalizedFilters = pruningPredicates, prunedPartitions = prunedPartitions)
Contributor:
Why do we need to keep pruningPredicates? IIUC the approach should be very simple:

  1. This rule only changes HiveTableRelation to hold an optional partition list.
  2. HiveTableScanExec then gets the partition list from HiveTableRelation, or calls listPartitionsByFilter itself.

Contributor (author):
Due to SPARK-24085, the pruningPredicates (where we eliminate the subquery) can differ from the filters passed to HiveTableScan. So I keep the pruningPredicates, and only reuse the prunedPartitions when HiveTableScanExec's partition pruning predicates match HiveTableRelation's normalizedFilters exactly.

The simplified solution occurred to me first, but the filters can differ for some reason, and SPARK-24085 is an example, hence the solution proposed here.
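
A minimal sketch (hypothetical helper, not code from this PR) of the reuse decision this implies on the scan side; the explicit length check is an added assumption, since zip alone silently ignores extra elements on either side:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical helper: reuse the partitions cached on the relation only when the scan's
// normalized partition predicates semantically match relation.normalizedFilters;
// otherwise the caller should fall back to listPartitionsByFilter.
def reusablePartitions(
    relation: HiveTableRelation,
    scanFilters: Seq[Expression]): Option[Seq[CatalogTablePartition]] = {
  val sameFilters = scanFilters.size == relation.normalizedFilters.size &&
    scanFilters.zip(relation.normalizedFilters).forall { case (a, b) => a.semanticEquals(b) }
  if (sameFilters) Some(relation.prunedPartitions) else None
}
```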

val filterExpression = predicates.reduceLeft(And)
Filter(filterExpression, prunedHiveTableRelation)
} else {
filter
}
}
}

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
@@ -166,14 +166,20 @@ case class HiveTableScanExec(
@transient lazy val rawPartitions = {
val prunedPartitions =
if (sparkSession.sessionState.conf.metastorePartitionPruning &&
partitionPruningPred.size > 0) {
partitionPruningPred.nonEmpty) {
// Retrieve the original attributes based on expression ID so that capitalization matches.
val normalizedFilters = partitionPruningPred.map(_.transform {
case a: AttributeReference => originalAttributes(a)
})
sparkSession.sessionState.catalog.listPartitionsByFilter(
Contributor:
@cloud-fan @maropu @advancedxy
Since rawPartitions is used by prunePartitions(rawPartitions) in the doExecute method, it seems prunePartitions will already filter out all irrelevant partitions using boundPruningPred. Why do we still need to call listPartitionsByFilter here? Could you please help me understand this? Thanks a lot in advance.

relation.tableMeta.identifier,
normalizedFilters)
val isFiltersEqual = normalizedFilters.zip(relation.normalizedFilters)
.forall { case (e1, e2) => e1.semanticEquals(e2) }
if (isFiltersEqual) {
Contributor:
What are we doing here?

Contributor (author):
Only when the pruning filters match exactly can we simply take the partitions from HiveTableRelation.

relation.prunedPartitions
} else {
sparkSession.sessionState.catalog.listPartitionsByFilter(
relation.tableMeta.identifier,
normalizedFilters)
}
} else {
sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
}
@@ -1514,4 +1514,35 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
}
}
}

test("Broadcast join can by inferred if partitioned table can be pruned under threshold") {
withTempView("tempTbl", "largeTbl") {
withTable("partTbl") {
spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2")
.createOrReplaceTempView("tempTbl")
spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2")
.createOrReplaceTempView("largeTbl")
sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " +
"PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile")
for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) {
sql(
s"""
|INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2')
|select col1, col2 from tempTbl
""".stripMargin)
}
val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " +
"and partTbl.part1 = 'a' and partTbl.part2 = 1)"
Seq(true, false).foreach { partitionPruning =>
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001",
SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> s"$partitionPruning") {
val broadcastJoins =
sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(broadcastJoins.nonEmpty === partitionPruning)
}
}
}
}
}
}
@@ -128,17 +128,23 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestHiveSingleton
// If the pruning predicate is used, getHiveQlPartitions should only return the
// qualified partition; otherwise, it returns all the partitions.
val expectedNumPartitions = if (hivePruning == "true") 1 else 2
checkNumScannedPartitions(
stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'", expectedNumPartitions)
val stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'"
checkNumScannedPartitions(stmt = stmt, expectedNumPartitions)
// prunedPartitions are held in HiveTableRelation
val prunedNumPartitions = if (hivePruning == "true") 1 else 0
assert(
getHiveTableScanExec(stmt).relation.prunedPartitions.size === prunedNumPartitions)
}
}

Seq("true", "false").foreach { hivePruning =>
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> hivePruning) {
// If the pruning predicate does not exist, getHiveQlPartitions should always
// return all the partitions.
checkNumScannedPartitions(
stmt = s"SELECT id, p2 FROM $table WHERE id <= 3", expectedNumParts = 2)
val stmt = s"SELECT id, p2 FROM $table WHERE id <= 3"
checkNumScannedPartitions(stmt = stmt, expectedNumParts = 2)
// no pruning is triggered, no partitions are held in HiveTableRelation
assert(getHiveTableScanExec(stmt).relation.prunedPartitions.isEmpty)
}
}
}