@@ -21,7 +21,6 @@ import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -36,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
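The two import hunks mirror the change in `run` below: `scala.concurrent.forkjoin.ForkJoinPool` is no longer constructed directly, since the pool now comes from Spark's `ThreadUtils.newForkJoinPool` and is shut down explicitly when the command finishes.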
@@ -588,8 +587,15 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
-      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+
+    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
+      try {
+        scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+      } finally {
+        evalPool.shutdown()
+      }
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
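The new shape in `run` is a standard scoped-resource pattern: create a bounded pool, hand a `ForkJoinTaskSupport` wrapping it to the traversal, and shut the pool down in `finally` even if listing throws. The trailing `.seq` materializes the possibly-parallel result before the pool goes away. A minimal standalone sketch of the same pattern, assuming Scala 2.11-era parallel collections (the object and workload here are made up for illustration):

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object ScopedPoolSketch {
  def main(args: Array[String]): Unit = {
    val pool = new ForkJoinPool(8) // bounded, like the 8 threads in the diff
    try {
      val work = (1 to 1000).par
      // Route this collection's tasks through our pool rather than the global one.
      work.tasksupport = new ForkJoinTaskSupport(pool)
      println(s"total = ${work.map(_ * 2).sum}")
    } finally {
      pool.shutdown() // release the worker threads once the traversal is done
    }
  }
}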
@@ -610,8 +616,6 @@
     Seq.empty[Row]
   }
 
-  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   private def scanPartitions(
       spark: SparkSession,
       fs: FileSystem,
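The deleted `@transient private lazy val` is the bug being fixed: every `AlterTableRecoverPartitionsCommand` instance that touched `evalTaskSupport` lazily allocated its own 8-thread `ForkJoinPool`, and nothing ever called `shutdown()` on it, so pool threads could linger long after the command finished. Creating the pool in `run` and shutting it down in `finally` (hunk above) ties its lifetime to a single execution.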
@@ -620,7 +624,8 @@
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
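Passing `evalTaskSupport` as a parameter lets every level of the recursion attach the same pool to whatever parallel collection it builds, instead of each level allocating its own. The body of `scanPartitions` is not shown in these hunks, but the attachment point typically looks like this hedged sketch (`ParallelListingSketch`, `listInParallel`, and the gating condition are illustrative, not the file's actual code):

import scala.collection.GenSeq
import scala.collection.parallel.ForkJoinTaskSupport

object ParallelListingSketch {
  // Hypothetical helper: go parallel only past a threshold, reusing one shared pool.
  def listInParallel[A](
      items: Seq[A],
      threshold: Int,
      evalTaskSupport: ForkJoinTaskSupport): GenSeq[A] = {
    if (items.length > threshold) {
      val par = items.par
      par.tasksupport = evalTaskSupport // every recursion level shares the same threads
      par
    } else {
      items // small directories are cheaper to list sequentially
    }
  }
}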
@@ -644,7 +649,7 @@
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")