@@ -21,7 +21,6 @@ import java.net.URI

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
dongjoon-hyun (Member) commented on Oct 29, 2018:

If possible, let's not touch this because we didn't change anything in this file. It would be helpful for backporting. SPARK-25271 is reported as a regression in 2.3.x. I assume that we need to backport this for 2.4.1 and 2.3.3 at least.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._
@@ -820,6 +820,14 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -244,27 +243,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
})
}

private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
i.copy(table = readHiveTable(tableMeta))
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta)

case UnresolvedCatalogRelation(tableMeta) =>
readHiveTable(tableMeta)
DDLUtils.readHiveTable(tableMeta)
}
}

@@ -2648,7 +2648,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
"transform_values(" +
"z,(k, v) -> map_from_arrays(ARRAY(1, 2, 3), " +
"ARRAY('one', 'two', 'three'))[k] || '_' || CAST(v AS String))"),
Seq(Row(Map(1 -> "one_1.0", 2 -> "two_1.4", 3 ->"three_1.7"))))
Seq(Row(Map(1 -> "one_1.0", 2 -> "two_1.4", 3 -> "three_1.7"))))
Member:

This one, too. Let's not touch this because we didn't change anything in this file. It would be helpful for backporting.

Member Author:

Ok. I will put those changes into a minor PR instead.


checkAnswer(
dfExample4.selectExpr("transform_values(z, (k, v) -> k-v)"),
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import java.util.Locale

import scala.util.control.NonFatal

import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

@@ -113,7 +117,54 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

def convertToLogicalRelation(
def isConvertible(relation: HiveTableRelation): Boolean = {
isConvertible(relation.tableMeta)
}

def isConvertible(tableMeta: CatalogTable): Boolean = {
val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}

private def convertToLogicalRelation(
relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive

import java.io.IOException
import java.util.Locale

import org.apache.hadoop.fs.{FileSystem, Path}

@@ -31,8 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

@@ -180,63 +178,23 @@ object HiveAnalysis extends Rule[LogicalPlan] {
case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
private def isConvertible(relation: HiveTableRelation): Boolean = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}
private val metastoreCatalog = sessionCatalog.metastoreCatalog

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
// Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && isConvertible(r) =>
InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
!r.isPartitioned && metastoreCatalog.isConvertible(r) =>
InsertIntoTable(metastoreCatalog.convert(r), partition,
query, overwrite, ifPartitionNotExists)

// Read path
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
if DDLUtils.isHiveTable(relation.tableMeta) && metastoreCatalog.isConvertible(relation) =>
metastoreCatalog.convert(relation)
}
}
}
@@ -21,10 +21,11 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.{HiveMetastoreCatalog, HiveSessionCatalog}


/**
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
Contributor:

Some more thoughts:

CreateHiveTableAsSelectCommand just runs another command, so we will not get any metrics for this plan node. That's OK if we use the Hive writer, as we indeed can't get any metrics (the writing is done by Hive). However, if we can convert and use Spark's native writer, we do have metrics. I think a better fix is to replace the Hive CTAS with a data source CTAS during optimization.
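
To illustrate this suggestion, a rough sketch of what such a rewrite could look like as a rule case (hypothetical, not part of this PR; the metastoreCatalog reference and the exact CreateDataSourceTableAsSelectCommand signature are assumptions):

// Hypothetical sketch only: rewrite a convertible Hive CTAS into the data source CTAS
// command so that the native writer's metrics are reported. Assumes a
// HiveMetastoreCatalog is in scope as metastoreCatalog.
case CreateTable(tableDesc, mode, Some(query))
    if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
      metastoreCatalog.isConvertible(tableDesc) =>
  // (As the reply below notes, the table metadata created this way would differ
  // from that of a Hive CTAS.)
  CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))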

Member Author:

I think the table metadata created by data source CTAS and by Hive CTAS is different?

Contributor:

Then how about we create a special Hive CTAS command that follows the data source CTAS command but creates a Hive table?

Member Author:

I also thought about it. But then we will have two Hive CTAS commands. Is that acceptable to you?

Contributor:

I'm OK with that, since we do have 2 different ways to do Hive CTAS.

Member Author:

I created a Hive CTAS with a data source command.

val catalog = sparkSession.sessionState.catalog
val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog

// Whether this table is convertible to data source relation.
val isConvertible = metastoreCatalog.isConvertible(tableDesc)
Contributor:

Another idea: can we move this logic to the RelationConversions rule? E.g.:

case CreateTable(tbl, mode, Some(query)) if DDLUtils.isHiveTable(tbl) && isConvertible(tbl) =>
  Union(CreateTable(tbl, mode, None), InsertIntoTable ...)

Contributor:

I feel CreateHiveTableAsSelectCommand is not useful. It simply creates the table first and then calls InsertIntoHiveTable.run. Maybe we should just remove it and implement Hive table CTAS as Union(CreateTable, InsertIntoTable).
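
A rough sketch of that Union form, for illustration only (hypothetical; the follow-up below explains why it cannot work as-is):

// Hypothetical sketch only. As explained below, the conversion would run at analysis
// time, before CreateTable has actually created the table.
case CreateTable(tbl, mode, Some(query)) if DDLUtils.isHiveTable(tbl) && isConvertible(tbl) =>
  Union(
    CreateTable(tbl, mode, None),
    InsertIntoTable(
      DDLUtils.readHiveTable(tbl),
      partition = Map.empty,
      query = query,
      overwrite = mode == SaveMode.Overwrite,
      ifPartitionNotExists = false))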

Member Author:

That is an interesting idea. Let me try it.

Member Author:

I gave this idea a try.

There is an issue: convertToLogicalRelation requires that the HiveTableRelation be an existing relation. That works for the InsertIntoTable case.

For CTAS, this relation doesn't exist yet. Even though we use a Union and CreateTable would run first, the conversion happens during the analysis stage, when the table has not been created.

Contributor:

Ah, that makes sense. Thanks for trying!


if (catalog.tableExists(tableIdentifier)) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -57,13 +63,18 @@
return Seq.empty
}

InsertIntoHiveTable(
tableDesc,
Map.empty,
query,
overwrite = false,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
if (!isConvertible) {
InsertIntoHiveTable(
tableDesc,
Map.empty,
query,
overwrite = false,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
} else {
getHadoopFsRelationCommand(sparkSession, metastoreCatalog, tableDesc, mode)
.run(sparkSession, child)
}
} else {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
@@ -75,15 +86,20 @@
try {
// Read back the metadata of the table which was created just now.
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
// For CTAS, there is no static partition values to insert.
val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
createdTableMeta,
partition,
query,
overwrite = true,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
if (!isConvertible) {
// For CTAS, there is no static partition values to insert.
val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
createdTableMeta,
partition,
query,
overwrite = true,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
} else {
getHadoopFsRelationCommand(sparkSession, metastoreCatalog, createdTableMeta,
SaveMode.Overwrite).run(sparkSession, child)
}
} catch {
case NonFatal(e) =>
// drop the created table.
Expand All @@ -95,6 +111,34 @@ case class CreateHiveTableAsSelectCommand(
Seq.empty[Row]
}

// Converts Hive table to data source one and returns an `InsertIntoHadoopFsRelationCommand`
// used to write data into it.
private def getHadoopFsRelationCommand(
sparkSession: SparkSession,
metastoreCatalog: HiveMetastoreCatalog,
tableDesc: CatalogTable,
mode: SaveMode): InsertIntoHadoopFsRelationCommand = {
val hiveTable = DDLUtils.readHiveTable(tableDesc)
val hadoopRelation = metastoreCatalog.convert(hiveTable) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
"HadoopFsRelation.")
}
InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // We don't support to convert partitioned table.
false,
Seq.empty, // We don't support to convert partitioned table.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
mode,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))
}

override def argString: String = {
s"[Database:${tableDesc.database}}, " +
s"TableName: ${tableDesc.identifier.table}, " +
2 changes: 2 additions & 0 deletions sql/hive/src/test/resources/data/files/empty_map.dat
@@ -0,0 +1,2 @@
0,1$abc2$pqr:3$xyz
1,
@@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
}
}
}

test("SPARK-25271: write empty map into hive parquet table") {
Member:

Although this is the test case originally reported in SPARK-25271 ("Creating parquet table with all the column null throws exception"), this PR aims to resolve the root cause of the more general issue, "Hive CTAS commands should use data source if it is convertible". Can we have additional test cases outside HiveParquetSuite?

Contributor:

+1

Member Author:

Agreed. Now that we have two Hive CTAS commands, it is easier to test. I will add tests later.

Member Author:

Added a new test for that.

val testData = hiveContext.getHiveFile("data/files/empty_map.dat").getCanonicalFile()
val sourceTable = "sourceTable"
val targetTable = "targetTable"
withTable(sourceTable, targetTable) {
sql(s"CREATE TABLE $sourceTable (i int,m map<int, string>) ROW FORMAT DELIMITED FIELDS " +
"TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' MAP KEYS TERMINATED BY '$'")
sql(s"LOAD DATA LOCAL INPATH '${testData.toURI}' INTO TABLE $sourceTable")
Contributor:

Can we generate the input data with a temp view? E.g., create a DataFrame with literals and register it as a temp view.
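
For example, a minimal sketch of that approach (hypothetical; the test above still loads empty_map.dat, the view name is illustrative, and the implicits import is assumed to be available in the suite):

// Hypothetical sketch of the suggestion: build the rows with literals instead of
// loading data/files/empty_map.dat, then use a temp view as the CTAS source.
import spark.implicits._
Seq((0, Map(1 -> "abc2$pqr", 3 -> "xyz")), (1, Map.empty[Int, String]))
  .toDF("i", "m")
  .createOrReplaceTempView("sourceView")
sql(s"CREATE TABLE $targetTable STORED AS PARQUET AS SELECT m FROM sourceView")
checkAnswer(sql(s"SELECT m FROM $targetTable"),
  Row(Map(1 -> "abc2$pqr", 3 -> "xyz")) :: Row(Map.empty[Int, String]) :: Nil)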

Member Author:

Ok.

sql(s"CREATE TABLE $targetTable STORED AS PARQUET AS SELECT m FROM $sourceTable")
checkAnswer(sql(s"SELECT m FROM $targetTable"),
Row(Map(1 -> "abc2$pqr", 3 -> "xyz")) :: Row(Map.empty[Int, String]) :: Nil)
}
}
}