@@ -21,7 +21,6 @@ import java.net.URI

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
@dongjoon-hyun (Member), Oct 29, 2018:
If possible, let's not touch this because we didn't change anything in this file. It would be helpful for backporting. SPARK-25271 is reported as a regression in 2.3.x. I assume that we need to backport this for 2.4.1 and 2.3.3 at least.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._
@@ -820,6 +820,14 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
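Since `readHiveTable` now lives in `DDLUtils` instead of being private to the `FindDataSourceTable` rule below, any caller with a resolved `CatalogTable` can build the relation the same way. A minimal sketch of the nullability guarantee noted in the code comment above; the helper shown here is hypothetical and not part of the diff:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.DDLUtils

// Hypothetical helper, for illustration only: every output attribute of the
// relation built by DDLUtils.readHiveTable is nullable, regardless of the
// catalog schema, because Hive cannot enforce NOT NULL on its columns.
def allColumnsNullable(tableMeta: CatalogTable): Boolean = {
  val relation = DDLUtils.readHiveTable(tableMeta)
  relation.output.forall(_.nullable) // always true by construction
}
```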
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -132,11 +131,11 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
case CreateTable(tableDesc, mode, None, _) if DDLUtils.isDatasourceTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc)
CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)

case CreateTable(tableDesc, mode, Some(query))
case CreateTable(tableDesc, mode, Some(query), false)
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
@@ -244,27 +243,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
})
}

private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
i.copy(table = readHiveTable(tableMeta))
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta)

case UnresolvedCatalogRelation(tableMeta) =>
readHiveTable(tableMeta)
DDLUtils.readHiveTable(tableMeta)
}
}

@@ -34,11 +34,16 @@ import org.apache.spark.sql.types._
* @param tableDesc the metadata of the table to be created.
* @param mode the data writing mode
* @param query an optional logical plan representing data to write into the created table.
* @param useExternalSerde whether to use an external serde (e.g., the Hive SerDe) to write data. Currently
Contributor:
This is too hacky. We should not leak Hive-specific knowledge into general logical plans.

@viirya (Member Author), Oct 15, 2018:
This is because all the rules related to converting to a data source are located in RelationConversions. So for now I need to set a flag on this logical plan and pass it down to CreateHiveTableAsSelectCommand.

If we loosen this requirement, we can avoid this flag and let CreateHiveTableAsSelectCommand itself decide whether or not to convert to a data source write.
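A rough sketch of the alternative mentioned above, where the CTAS command would decide the write path itself at execution time instead of carrying a flag on the logical plan; the type and helper names below are placeholders, not code from this PR:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Placeholder result type for the two possible write paths.
sealed trait HiveCtasWritePath
case object WriteWithHiveSerde extends HiveCtasWritePath
case object WriteWithDataSource extends HiveCtasWritePath

// Hypothetical decision the command could make inside run(): if the target
// table is convertible (e.g., a Parquet/ORC serde with the corresponding
// convert-metastore conf enabled), use the data source writer; otherwise
// fall back to the Hive SerDe write path.
def chooseWritePath(
    tableDesc: CatalogTable,
    isConvertible: CatalogTable => Boolean): HiveCtasWritePath = {
  if (isConvertible(tableDesc)) WriteWithDataSource else WriteWithHiveSerde
}
```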

@viirya (Member Author):
Do you think it would be better to move all of this Hive CTAS conversion logic into CreateHiveTableAsSelectCommand?

Contributor:
I don't have a clear idea right now, but CreateTable is a general logical plan for CREATE TABLE; we may even make it public in the data source/catalog APIs in the future, so we should not put Hive-specific concepts here.

* this is only used by Hive. When we are planning `CreateTable` and the Hive
* table to be created can be converted to a data source table, we set this
* to false so that later we know we can use the data source writer to write the data.
*/
case class CreateTable(
tableDesc: CatalogTable,
mode: SaveMode,
query: Option[LogicalPlan]) extends LogicalPlan {
query: Option[LogicalPlan],
useExternalSerde: Boolean = false) extends LogicalPlan {
assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")

if (query.isEmpty) {
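To make the new `useExternalSerde` field concrete, here is a minimal sketch of the kind of flag-setting rule described in the review thread above: when a Hive CTAS target is convertible (using a predicate such as the `isConvertible` helper added further down in this diff), the rule rewrites `CreateTable` with `useExternalSerde = false` so downstream planning knows the data source writer can be used. The rule name and wiring are assumptions for illustration, not the PR's actual code.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CreateTable

// Hypothetical rule sketch: mark convertible Hive CTAS plans so that the
// command later created from them knows it can use the data source writer.
class MarkConvertibleHiveCtas(isConvertible: CatalogTable => Boolean)
  extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case ct @ CreateTable(tableDesc, _, Some(_), _)
        if DDLUtils.isHiveTable(tableDesc) && isConvertible(tableDesc) =>
      // useExternalSerde = false signals "write with the data source writer".
      ct.copy(useExternalSerde = false)
  }
}
```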
@@ -78,7 +78,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
// bucketing information is specified, as we can't infer bucketing from data files currently.
// Since the runtime inferred partition columns could be different from what user specified,
// we fail the query if the partitioning information is specified.
case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
case c @ CreateTable(tableDesc, _, None, _) if tableDesc.schema.isEmpty =>
if (tableDesc.bucketSpec.isDefined) {
failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
"when creating and will be inferred at runtime")
@@ -93,7 +93,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
// When we append data to an existing table, check if the given provider, partition columns,
// bucket spec, etc. match the existing table, and adjust the columns order of the given query
// if necessary.
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query), _)
if query.resolved && catalog.tableExists(tableDesc.identifier) =>
// This is guaranteed by the parser and `DataFrameWriter`
assert(tableDesc.provider.isDefined)
@@ -198,7 +198,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
// * partition columns' type must be AtomicType.
// * sort columns' type must be orderable.
// * reorder table schema or output of query plan, to put partition columns at the end.
case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) =>
case c @ CreateTable(tableDesc, _, query, _) if query.forall(_.resolved) =>
if (query.isDefined) {
assert(tableDesc.schema.isEmpty,
"Schema may not be specified in a Create Table As Select (CTAS) statement")
@@ -388,7 +388,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
object HiveOnlyCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
case CreateTable(tableDesc, _, _, _) if DDLUtils.isHiveTable(tableDesc) =>
throw new AnalysisException("Hive support is required to CREATE Hive TABLE (AS SELECT)")
case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) =>
throw new AnalysisException(
@@ -2648,7 +2648,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
"transform_values(" +
"z,(k, v) -> map_from_arrays(ARRAY(1, 2, 3), " +
"ARRAY('one', 'two', 'three'))[k] || '_' || CAST(v AS String))"),
Seq(Row(Map(1 -> "one_1.0", 2 -> "two_1.4", 3 ->"three_1.7"))))
Seq(Row(Map(1 -> "one_1.0", 2 -> "two_1.4", 3 -> "three_1.7"))))
Member:
This one, too. Let's not touch this because we didn't change anything in this file. It would be helpful for backporting.

@viirya (Member Author):
OK. I will put those changes into a minor PR instead.


checkAnswer(
dfExample4.selectExpr("transform_values(z, (k, v) -> k-v)"),
@@ -46,7 +46,7 @@ class SparkSqlParserSuite extends AnalysisTest {
*/
override def normalizePlan(plan: LogicalPlan): LogicalPlan = {
plan match {
case CreateTable(tableDesc, mode, query) =>
case CreateTable(tableDesc, mode, query, _) =>
val newTableDesc = tableDesc.copy(createTime = -1L)
CreateTable(newTableDesc, mode, query)
case _ => plan // Don't transform
@@ -77,7 +77,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {

private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, _, _) => (tableDesc, mode == SaveMode.Ignore)
}.head
}

@@ -431,7 +431,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
)

parser.parsePlan(query) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -453,7 +453,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
)

parser.parsePlan(query) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -473,7 +473,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
comment = Some("abc"))

parser.parsePlan(sql) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -493,7 +493,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
properties = Map("test" -> "test"))

parser.parsePlan(sql) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -543,7 +543,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
provider = Some("parquet"))

parser.parsePlan(v1) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -574,7 +574,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
provider = Some("parquet"))

parser.parsePlan(sql) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -1183,7 +1183,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
)

parser.parsePlan(sql) match {
case CreateTable(tableDesc, _, None) =>
case CreateTable(tableDesc, _, None, _) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import java.util.Locale

import scala.util.control.NonFatal

import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

@@ -111,7 +115,54 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

def convertToLogicalRelation(
def isConvertible(relation: HiveTableRelation): Boolean = {
isConvertible(relation.tableMeta)
}

def isConvertible(tableMeta: CatalogTable): Boolean = {
val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}

private def convertToLogicalRelation(
relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
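As a usage note, these now-public `isConvertible`/`convert` helpers are what a Hive-side analyzer rule (a RelationConversions-style rule) can call to swap a `HiveTableRelation` for a `LogicalRelation` backed by the Parquet or ORC file formats. A minimal sketch, assuming the rule lives in the `org.apache.spark.sql.hive` package and is handed the session's `HiveMetastoreCatalog`; the rule name is illustrative:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch of a conversion rule built on the helpers above: when the table's
// serde is Parquet/ORC and the matching convert-metastore conf is enabled,
// replace the Hive relation with a data source LogicalRelation so reads go
// through ParquetFileFormat / OrcFileFormat.
class ConvertHiveRelationsSketch(metastoreCatalog: HiveMetastoreCatalog)
  extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case relation: HiveTableRelation if metastoreCatalog.isConvertible(relation) =>
      metastoreCatalog.convert(relation)
  }
}
```

This keeps the format-specific option handling (Parquet schema merging, choice of native vs. Hive ORC implementation) inside `convert`, while the rule itself only decides when conversion applies.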