SqlBase.g4
@@ -84,6 +84,7 @@ statement
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
| CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
@@ -121,6 +122,7 @@ statement
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
tableIdentifier partitionSpec? #loadData
| TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE tableIdentifier #repairTable
Contributor: Nit: outline

| op=(ADD | LIST) identifier .*? #manageResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
@@ -154,7 +156,6 @@ unsupportedHiveNativeCommands
| kw1=UNLOCK kw2=DATABASE
| kw1=CREATE kw2=TEMPORARY kw3=MACRO
| kw1=DROP kw2=TEMPORARY kw3=MACRO
| kw1=MSCK kw2=REPAIR kw3=TABLE
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
| kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED
@@ -652,7 +653,7 @@ nonReserved
| CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
| DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
| STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
| ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
| ASC | DESC | LIMIT | RENAME | SETS
| AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE
@@ -865,6 +866,7 @@ LOCK: 'LOCK';
UNLOCK: 'UNLOCK';
MSCK: 'MSCK';
REPAIR: 'REPAIR';
RECOVER: 'RECOVER';
EXPORT: 'EXPORT';
IMPORT: 'IMPORT';
LOAD: 'LOAD';
SparkSqlParser.scala

@@ -408,6 +408,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Create a [[RepairTableCommand]] command.
*
* For example:
* {{{
* MSCK REPAIR TABLE tablename
* }}}
*/
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier))
Contributor: Are AlterTableRecoverPartitionsCommand and RepairTableCommand the same?

Contributor Author: Yes, Hive supports MSCK REPAIR TABLE; EMR supports ALTER TABLE RECOVER PARTITIONS.

Contributor: I see. How about we use a single command internally?
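A sketch of the single-command idea suggested above (the cmd parameter is hypothetical and only varies the error wording; this is not code from the diff):

    import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.RunnableCommand

    case class RecoverPartitionsCommand(
        tableName: TableIdentifier,
        cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
      override def run(spark: SparkSession): Seq[Row] = {
        val catalog = spark.sessionState.catalog
        if (!catalog.tableExists(tableName)) {
          throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
        }
        // ...the validation and recovery logic shared by both syntaxes,
        // substituting $cmd into each error message...
        Seq.empty[Row]
      }
    }

The parser would then return RecoverPartitionsCommand(ident) for ALTER TABLE ... RECOVER PARTITIONS and RecoverPartitionsCommand(ident, "MSCK REPAIR TABLE") for MSCK REPAIR TABLE.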

}

/**
* Convert a table property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
Expand Down Expand Up @@ -778,6 +790,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.PURGE != null)
}

/**
* Create an [[AlterTableDiscoverPartitionsCommand]] command
*
* For example:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
Contributor: Nit: Update the syntax and the comments here.

* ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
* }}}
*
* ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning
* is associated with physical tables
*/
override def visitRecoverPartitions(
ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
}

/**
* Create an [[AlterTableSetLocationCommand]] command
*
ddl.scala

@@ -17,13 +17,19 @@

package org.apache.spark.sql.execution.command

import java.io.File

import scala.collection.GenSeq
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.types._
@@ -425,6 +431,96 @@ case class AlterTableDropPartitionCommand(

}

/**
* Discover Partitions in ALTER TABLE: discover all the partitions in the directory of a table and
hvanhovell (Contributor), Aug 5, 2016: Discover or recover? Let's update the doc/class name accordingly.

* update the catalog.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DISCOVER PARTITIONS;
* }}}
*/
case class AlterTableRecoverPartitionsCommand(
tableName: TableIdentifier) extends RunnableCommand {
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
throw new AnalysisException(
s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.")
}
val table = catalog.getTableMetadata(tableName)
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName")
}
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on datasource tables: $tableName")
}
if (table.tableType != CatalogTableType.EXTERNAL) {
throw new AnalysisException(
s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " +
s"tables: $tableName")
}
if (DDLUtils.isTablePartitioned(table)) {
Member: Here, the same issue.
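A sketch of the implied fix (negating the guard so it matches its message; not the diff's code):

    if (!DDLUtils.isTablePartitioned(table)) {
      throw new AnalysisException(
        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " +
          s"tables: $tableName")
    }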

throw new AnalysisException(
s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " +
s"tables: $tableName")
}
if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on tables with " +
s"location provided: $tableName")
}

recoverPartitions(spark, table)
Seq.empty[Row]
}

def recoverPartitions(spark: SparkSession, table: CatalogTable): Unit = {
val root = new Path(table.storage.locationUri.get)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
val partitionSpecsAndLocs = scanPartitions(spark, fs, root, Map(), table.partitionSchema.size)
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
}
spark.sessionState.catalog.createPartitions(tableName,
parts.toArray[CatalogTablePartition], ignoreIfExists = true)
Contributor: What will happen if we get thousands or tens of thousands of new partitions?

Contributor Author: Good question, see the implementation in HiveShim:

  // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
  override def createPartitions(
      hive: Hive,
      database: String,
      tableName: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = {
    val table = hive.getTable(database, tableName)
    parts.foreach { s =>
      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
      val spec = s.spec.asJava
      if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
        // Ignore this partition since it already exists and ignoreIfExists == true
      } else {
        if (location == null && table.isView()) {
          throw new HiveException("LOCATION clause illegal for view partition");
        }

        createPartitionMethod.invoke(
          hive,
          table,
          spec,
          location,
          null, // partParams
          null, // inputFormat
          null, // outputFormat
          -1: JInteger, // numBuckets
          null, // cols
          null, // serializationLib
          null, // serdeParams
          null, // bucketCols
          null) // sortCols
      }
    }
  }

All these partitions will be inserted into Hive sequentially, so grouping them into batches will not help here.

Contributor Author: No, that is only true for Hive <= 0.12; for Hive 0.13+, they are sent in a single RPC, so we should verify what the limit is for a single RPC.

Contributor Author: It seems that the Hive Metastore can't handle an RPC with millions of partitions; I will send a patch to do it in batches.
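A minimal sketch of the batching idea (the batch size of 100 is an assumed value, not something this discussion settles on):

    // Hedged sketch: cap the number of partitions per metastore RPC by
    // splitting the discovered partitions into fixed-size batches.
    val batchSize = 100 // assumed value
    parts.seq.grouped(batchSize).foreach { batch =>
      spark.sessionState.catalog.createPartitions(
        tableName, batch, ignoreIfExists = true)
    }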

}

@transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

private def scanPartitions(
spark: SparkSession,
fs: FileSystem,
path: Path,
spec: TablePartitionSpec,
numPartitionsLeft: Int): GenSeq[(TablePartitionSpec, Path)] = {
Contributor: Let's see if we can reuse code in PartitioningUtils. Also, path names can be escaped. We need to handle cases like that (we have unescapePathName in PartitioningUtils).
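A sketch of escape-aware parsing for one directory level, using a hypothetical helper and assuming PartitioningUtils.unescapePathName is accessible from this package:

    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    // Hedged sketch: decode percent-escaped characters (e.g. "a%3Ab" -> "a:b")
    // in both the column name and the value before building the partition spec.
    def parsePartitionDir(name: String): Option[(String, String)] = {
      name.split("=", 2) match {
        case Array(key, value) =>
          Some(PartitioningUtils.unescapePathName(key) ->
            PartitioningUtils.unescapePathName(value))
        case _ => None
      }
    }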

if (numPartitionsLeft == 0) {
return Seq(spec -> path)
}

val statuses = fs.listStatus(path)
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val statusPar: GenSeq[FileStatus] =
if (numPartitionsLeft > 1 && statuses.length > threshold || numPartitionsLeft > 2) {
Member: This condition looks confusing.
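One way to unpack the condition (a restatement via a hypothetical helper, relying on && binding tighter than ||; behavior unchanged):

    // Parallelize when more than one partition level remains and the current
    // directory is wide, or when more than two levels remain regardless of width.
    def shouldParallelize(levelsLeft: Int, width: Int, threshold: Int): Boolean =
      (levelsLeft > 1 && width > threshold) || levelsLeft > 2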

val parArray = statuses.par
Contributor: I didn't look carefully, but if you are using the default exec context, please create a new one; otherwise it'd block.

Contributor: Cool. Can we make it explicit, e.g. statuses.par(evalTaskSupport)?

Contributor Author: This is copied from UnionRDD.

Contributor Author: I did not figure out how that would work; at least statuses.par(evalTaskSupport) does not work.
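For context, a minimal self-contained sketch of how a parallel collection takes a custom pool on Scala 2.11 (there is no .par(taskSupport) overload, so the tasksupport field has to be reassigned after calling .par):

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    val parSeq = (1 to 100).par
    // Route this collection's tasks to a dedicated 8-thread pool instead of
    // the default pool shared by all parallel collections.
    parSeq.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    val doubled = parSeq.map(_ * 2)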

parArray.tasksupport = evalTaskSupport
parArray
} else {
statuses
}
statusPar.flatMap { st =>
val ps = st.getPath.getName.split("=", 2)
if (ps.length != 2) {
throw new AnalysisException(s"Invalid partition path: ${st.getPath}")
}
scanPartitions(spark, fs, st.getPath, spec ++ Map(ps(0) -> ps(1)), numPartitionsLeft - 1)
}
}
}


/**
* A command that sets the location of a table or a partition.
tables.scala

@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -388,6 +388,50 @@ case class TruncateTableCommand(
}
}

/**
* A command to repair a table by discovering all the partitions in the directory.
*
* The syntax of this command is:
* {{{
* MSCK REPAIR TABLE table_name;
* }}}
*
* This command is the same as AlterTableRecoverPartitionsCommand.
*/
case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
Contributor: Why do we have the command? It is exactly the same as AlterTableRecoverPartitionsCommand, except for the wording of the errors.

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
if (!catalog.tableExists(tableName)) {
Member: This is dead code. The previous line already checks whether the table exists or not:

  val table = catalog.getTableMetadata(tableName)
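A sketch of the implied reordering (check existence before fetching metadata, so this branch becomes reachable; not the diff's code):

    if (!catalog.tableExists(tableName)) {
      throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.")
    }
    val table = catalog.getTableMetadata(tableName)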

throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.")
}
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName")
}
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: MSCK REPAIR TABLE on datasource tables: $tableName")
}
if (table.tableType != CatalogTableType.EXTERNAL) {
throw new AnalysisException(
s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName")
}
if (DDLUtils.isTablePartitioned(table)) {
Member: This check conflicts with the error message reported below. Now, if the table is partitioned, we will report the message: MSCK REPAIR TABLE only works on partitioned tables.

throw new AnalysisException(
s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName")
}
if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: MSCK REPAIR TABLE only works on tables with location provided: " +
Member: Nit: we can remove s

s"$tableName")
}

AlterTableRecoverPartitionsCommand(tableName).recoverPartitions(spark, table)
Seq.empty[Row]
}
}

/**
* Command that looks like
* {{{
DDLCommandSuite.scala

@@ -563,6 +563,14 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}

test("alter table: recover partitions") {
val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
val parsed = parser.parsePlan(sql)
val expected = AlterTableRecoverPartitionsCommand(
TableIdentifier("table_name", None))
comparePlans(parsed, expected)
}

test("alter view: add partition (not supported)") {
assertUnsupported(
"""
DDLSuite.scala

@@ -827,6 +827,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = true)
}

test("alter table: recover partitions (sequential)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
testRecoverPartitions()
}
}

test("after table: recover partition (parallel)") {
Member: after -> alter

withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
}
}

private def testRecoverPartitions() {
val catalog = spark.sessionState.catalog
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
}

val tableIdent = TableIdentifier("tab1")
createTable(catalog, tableIdent)
val part1 = Map("a" -> "1", "b" -> "5")
createTablePartition(catalog, part1, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

val part2 = Map("a" -> "2", "b" -> "6")
val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
fs.mkdirs(new Path(new Path(root, "a=2"), "b=6"))
try {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
} finally {
fs.delete(root, true)
}
}
Contributor: Let's add tests to exercise the command more. Here are four examples.

  1. There is a partition dir that has a bad name (not in the format of key=value).
  2. Say that we have two partition columns. We have some files under the first layer (e.g. _SUCCESS, parquet's metadata files, and/or regular data files).
  3. Some dirs do not have the expected number of partition columns. For example, the schema specifies 3 partition columns, but a path only has two.
  4. The partition column names encoded in the path do not match the names specified in the schema. For example, when we create the table, we specify c1 as the first partition column, but the dir in fs has c2 as the first partition column.

Contributor Author: done
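A hedged sketch of what the first case might look like as a test; it reuses the helpers visible above (createTable, Path, intercept) and assumes the current behavior of throwing AnalysisException on a malformed directory name:

    test("alter table: recover partitions with a malformed partition dir name") {
      val catalog = spark.sessionState.catalog
      val tableIdent = TableIdentifier("tab1")
      createTable(catalog, tableIdent)
      val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
      val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
      // No '=' separator, so scanPartitions cannot split this into key=value.
      fs.mkdirs(new Path(root, "not_a_partition_dir"))
      try {
        intercept[AnalysisException] {
          sql("ALTER TABLE tab1 RECOVER PARTITIONS")
        }
      } finally {
        fs.delete(root, true)
      }
    }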


test("alter table: add partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
}
HiveDDLCommandSuite.scala

@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -499,8 +500,12 @@ class HiveDDLCommandSuite extends PlanTest {
}
}

test("MSCK repair table (not supported)") {
assertUnsupported("MSCK REPAIR TABLE tab1")
test("MSCK REPAIR table") {
val sql = "MSCK REPAIR TABLE tab1"
val parsed = parser.parsePlan(sql)
val expected = RepairTableCommand(
TableIdentifier("tab1", None))
comparePlans(parsed, expected)
}

test("create table like") {