diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 17c8404f8a79..eec5e7ad39df 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -88,6 +88,13 @@ statement
         (AS? query)?                                                   #createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier locationSpec?                      #createTableLike
+    | createTableHeader
+        LIKE sourceFormat=STRING sourceLocation=STRING
+        (COMMENT comment=STRING)?
+        (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
+        bucketSpec? skewSpec?
+        rowFormat? createFileFormat? locationSpec?
+        (TBLPROPERTIES tablePropertyList)?                             #createTableLikeFile
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq)?                      #analyze
     | ALTER TABLE tableIdentifier
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6de9ea0efd2c..0d17d81c2cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -24,6 +24,10 @@ import scala.collection.JavaConverters._
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -33,6 +37,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructType
 
@@ -1195,6 +1200,101 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
   }
 
+  /**
+   * Create a Hive serde table, returning a [[CreateTable]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name].[table_name]
+   *   LIKE 'parquet' 'parquet-file-location'
+   *   [COMMENT table_comment]
+   *   [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
+   *   [ROW FORMAT row_format]
+   *   [STORED AS file_format]
+   *   [LOCATION path]
+   *   [TBLPROPERTIES (property_name=property_value, ...)]
+   * }}}
+   */
+  override def visitCreateTableLikeFile(ctx: CreateTableLikeFileContext): LogicalPlan =
+    withOrigin(ctx) {
+
+      val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+      // TODO: implement temporary tables
+      if (temp) {
+        throw new ParseException(
+          "CREATE TEMPORARY TABLE is not supported yet. " +
+            "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+      }
+      if (ctx.skewSpec != null) {
+        operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
+      }
+
+      val sourceFileFormat = Option(ctx.sourceFormat.getText).get.toString.replace("'", "")
+      if (!sourceFileFormat.toLowerCase.equals("parquet")) {
+        operationNotAllowed("CREATE TABLE ... LIKE File only supports parquet", ctx)
+      }
+      val sourceFileLocation = Option(ctx.sourceLocation.getText).get.toString.replace("'", "")
+
+      val hadoopConf = new Configuration()
+      val metaData = ParquetFileReader.readFooter(hadoopConf, new Path(sourceFileLocation.toString))
+      val parquetSchema = metaData.getFileMetaData.getSchema
+
+      val dataCols: StructType = ParquetSchemaConverter.
+        SqlParquetSchemaConverter.convert(parquetSchema)
+      val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
+      val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).
+        getOrElse(Map.empty)
+      val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+
+      // Note: Hive requires partition columns to be distinct from the schema, so we need
+      // to include the partition columns here explicitly
+      val schema = StructType(dataCols ++ partitionCols)
+
+      // Storage format
+      val defaultStorage = HiveSerDe.getDefaultStorage(conf)
+      validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
+      val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+        .getOrElse(CatalogStorageFormat.empty)
+      val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+        .getOrElse(CatalogStorageFormat.empty)
+      val location = Option(ctx.locationSpec).map(visitLocationSpec)
+      // If we are creating an EXTERNAL table, then the LOCATION field is required
+      if (external && location.isEmpty) {
+        operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
+      }
+
+      val locUri = location.map(CatalogUtils.stringToURI(_))
+      val storage = CatalogStorageFormat(
+        locationUri = locUri,
+        inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+        outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+        serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+        compressed = false,
+        properties = rowStorage.properties ++ fileStorage.properties)
+      // If location is defined, we'll assume this is an external table.
+      // Otherwise, we may accidentally delete existing data.
+      val tableType = if (external || location.isDefined) {
+        CatalogTableType.EXTERNAL
+      } else {
+        CatalogTableType.MANAGED
+      }
+
+      // TODO support the sql text - have a proper location for this!
+      val tableDesc = CatalogTable(
+        identifier = name,
+        tableType = tableType,
+        storage = storage,
+        schema = schema,
+        bucketSpec = bucketSpec,
+        provider = Some(DDLUtils.HIVE_PROVIDER),
+        partitionColumnNames = partitionCols.map(_.name),
+        properties = properties,
+        comment = Option(ctx.comment).map(string))
+
+      val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+      CreateTable(tableDesc, mode, None)
+    }
+
   /**
    * Create a [[CreateTableLikeCommand]] command.
    *
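For context, a statement in the shape accepted by the new createTableLikeFile rule and handled by visitCreateTableLikeFile above could be exercised roughly as follows. This is only an illustrative sketch, not part of the patch: the database, table name, parquet file path, and warehouse location are placeholders, and a Hive-enabled SparkSession is assumed because the resulting table is registered with the Hive provider.

  // Illustrative sketch only: all names and paths below are placeholders.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("create-table-like-parquet-example")
    .enableHiveSupport()   // the created table uses provider = hive
    .getOrCreate()

  spark.sql("CREATE DATABASE IF NOT EXISTS db1")

  // The table schema is inferred from the footer of an existing parquet file
  // instead of being spelled out column by column.
  spark.sql(
    """
      |CREATE EXTERNAL TABLE IF NOT EXISTS db1.events
      |LIKE 'parquet' '/data/events/part-00000.parquet'
      |STORED AS PARQUET
      |LOCATION '/warehouse/db1/events'
    """.stripMargin)

  spark.sql("DESCRIBE FORMATTED db1.events").show(truncate = false)

Note that the parquet footer is read while the statement is parsed (inside visitCreateTableLikeFile), so the source file has to be readable at parse time, not only when the table is later queried.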
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index b3781cfc4a60..52cfe9e65326 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -600,4 +600,6 @@ private[sql] object ParquetSchemaConverter {
         Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
       .asInstanceOf[Int]
   }
-}
+
+  def SqlParquetSchemaConverter: ParquetSchemaConverter = new ParquetSchemaConverter()
+}
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index eb7c33590b60..0528e14becfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
 import java.net.URI
 import java.util.Locale
 
-import scala.reflect.{classTag, ClassTag}
-
+import scala.reflect.{ClassTag, classTag}
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -1631,6 +1631,38 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     comparePlans(parsed, expected)
   }
 
+  test("create table like parquet") {
+
+    val f = getClass.getClassLoader.
+      getResource("test-data/dec-in-fixed-len.parquet").getPath
+    val v1 =
+      """
+        |create table if not exists db1.table1 like 'parquet'
+      """.stripMargin.concat("'" + f + "'").concat(
+      """
+        |stored as sequencefile
+        |location '/tmp/table1'
+      """.stripMargin
+    )
+
+    val (desc, allowExisting) = extractTableDesc(v1)
+
+    assert(allowExisting)
+    assert(desc.identifier.database == Some("db1"))
+    assert(desc.identifier.table == "table1")
+    assert(desc.tableType == CatalogTableType.EXTERNAL)
+    assert(desc.schema == new StructType()
+      .add("fixed_len_dec", "decimal(10,2)"))
+    assert(desc.bucketSpec.isEmpty)
+    assert(desc.viewText.isEmpty)
+    assert(desc.viewDefaultDatabase.isEmpty)
+    assert(desc.viewQueryColumnNames.isEmpty)
+    assert(desc.storage.locationUri == Some(new URI("/tmp/table1")))
+    assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.SequenceFileInputFormat"))
+    assert(desc.storage.outputFormat == Some("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+  }
+
   test("create table like") {
     val v1 = "CREATE TABLE table1 LIKE table2"
     val (target, source, location, exists) = parser.parsePlan(v1).collect {
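For reference, the schema inference that backs the new syntax (read the parquet footer, then convert its MessageType via the SqlParquetSchemaConverter helper added above) can be sketched in isolation as below. This assumes the code runs with visibility of Spark's private[sql]/private[parquet] internals, for example from a suite in the same package, and it reuses the dec-in-fixed-len.parquet test resource referenced by the new test.

  // Minimal sketch of the footer-based schema inference used by visitCreateTableLikeFile.
  // Assumes access to Spark's parquet-internal classes (e.g. a test in the same package).
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
  import org.apache.spark.sql.types.StructType

  val file = getClass.getClassLoader.getResource("test-data/dec-in-fixed-len.parquet").getPath
  val footer = ParquetFileReader.readFooter(new Configuration(), new Path(file))
  val parquetSchema = footer.getFileMetaData.getSchema

  // Convert the parquet MessageType into a Spark StructType; for this test resource the
  // expected result is a single column, fixed_len_dec: decimal(10,2).
  val inferred: StructType = ParquetSchemaConverter.SqlParquetSchemaConverter.convert(parquetSchema)
  println(inferred.treeString)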