Commit 0a18435

Support old tables that don't store schema in table properties
1 parent 39e044e commit 0a18435

17 files changed

Lines changed: 90 additions & 83 deletions
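
In outline: `SessionCatalog.lookupRelation` no longer builds a relation's attributes eagerly from the catalog schema. It now returns a placeholder node, `UnresolvedCatalogRelation`, which the analyzer rule `FindDataSourceTable` later replaces with a `LogicalRelation` for data source tables or with `HiveTableRelation` (the renamed `CatalogRelation`) for Hive serde tables. Because the concrete relation is only built during analysis, an old table whose schema is not stored in its table properties can still be resolved at that point, e.g. via runtime schema inference in `DataSource.resolveRelation`.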

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 1 addition & 6 deletions
@@ -689,12 +689,7 @@ class SessionCatalog(
           child = parser.parsePlan(viewText))
         SubqueryAlias(table, child)
       } else {
-        val tableRelation = CatalogRelation(
-          metadata,
-          // we assume all the columns are nullable.
-          metadata.dataSchema.asNullable.toAttributes,
-          metadata.partitionSchema.asNullable.toAttributes)
-        SubqueryAlias(table, tableRelation)
+        SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
       }
     } else {
       SubqueryAlias(table, tempTables(table))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 16 additions & 9 deletions
@@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -400,11 +399,22 @@ object CatalogTypes {
   type TablePartitionSpec = Map[String, String]
 }
 
+/**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+  assert(tableMeta.identifier.database.isDefined)
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
 
 /**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
  */
-case class CatalogRelation(
+case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -418,15 +428,15 @@ case class CatalogRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
     case _ => false
   }
 
   override def hashCode(): Int = {
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override lazy val canonicalized: LogicalPlan = copy(
+  override lazy val canonicalized: HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -439,15 +449,12 @@ case class CatalogRelation(
     })
 
   override def computeStats(): Statistics = {
-    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
-    // hive serde tables, we will always generate a statistics.
-    // TODO: unify the table stats generation.
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
       throw new IllegalStateException("table stats must be specified.")
     }
   }
 
-  override def newInstance(): LogicalPlan = copy(
+  override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }

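A note on the `resolved` override in `UnresolvedCatalogRelation`: the analyzer rejects any plan that still contains unresolved operators, so if no rule rewrites the placeholder, analysis fails loudly rather than silently scanning an empty schema. A self-contained sketch of the placeholder-then-resolve pattern in plain Scala (made-up names, no Spark dependency; a minimal illustration, not the actual implementation):

case class TableMeta(name: String, provider: Option[String])

sealed trait Plan { def resolved: Boolean }

// The placeholder: a leaf that always reports unresolved, mirroring
// UnresolvedCatalogRelation above.
case class UnresolvedRelationNode(meta: TableMeta) extends Plan {
  val resolved = false
}
case class DataSourceRelationNode(meta: TableMeta) extends Plan { val resolved = true }
case class HiveRelationNode(meta: TableMeta) extends Plan { val resolved = true }

// Mirrors the split in FindDataSourceTable: the provider picks the concrete
// relation, with Hive as the fallback.
def resolveRelation(plan: Plan): Plan = plan match {
  case UnresolvedRelationNode(m) if m.provider.exists(_ != "hive") => DataSourceRelationNode(m)
  case UnresolvedRelationNode(m) => HiveRelationNode(m)
  case other => other
}

assert(resolveRelation(UnresolvedRelationNode(TableMeta("t", Some("parquet")))).resolved)
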
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -529,14 +528,14 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     catalog.setCurrentDatabase("db2")
     // If we explicitly specify the database, we'll look up the relation in that database
     assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+      .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
     // Otherwise, we'll first look up a temporary table with the same name
     assert(catalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", tempTable1))
     // Then, if that does not exist, look up the relation in the current database
     catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
-      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+      .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 5 additions & 6 deletions
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       // Get all input data source or hive relations of the query.
       val srcRelations = df.logicalPlan.collect {
         case LogicalRelation(src: BaseRelation, _, _) => src
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
-          relation.tableMeta.identifier
+        case relation: HiveTableRelation => relation.tableMeta.identifier
       }
 
       val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(
           s"Cannot overwrite table $tableName that is also being read from")
       // check hive table relation when overwrite mode
-      case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
-          && srcRelations.contains(relation.tableMeta.identifier) =>
+      case relation: HiveTableRelation
+          if srcRelations.contains(relation.tableMeta.identifier) =>
         throw new AnalysisException(
           s"Cannot overwrite table $tableName that is also being read from")
       case _ => // OK

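The behavior guarded here is unchanged by the simpler match. A hypothetical session that would trip the check, assuming a Hive-enabled SparkSession `spark` and a made-up table name:

// Reading a Hive table and overwriting it in the same job is rejected
// during analysis by the check above.
spark.sql("CREATE TABLE hive_src (id INT) STORED AS PARQUET")
spark.table("hive_src")
  .write
  .mode("overwrite")
  .saveAsTable("hive_src")
// throws AnalysisException: Cannot overwrite table ... that is also being read from
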
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
@@ -2965,7 +2965,7 @@ class Dataset[T] private[sql](
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
-      case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+      case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
     }.flatten
     files.toSet.toArray

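A hedged usage sketch, again assuming a Hive-enabled SparkSession and a made-up table name:

// For a Hive serde table, inputFiles now flows through the HiveTableRelation
// branch above and reports the table's storage location, when one is set.
val files: Array[String] = spark.table("hive_src").inputFiles
files.foreach(println) // e.g. the table's warehouse directory URI
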
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
           val partitionData = fsRelation.location.listFiles(Nil, Nil)
           LocalRelation(partAttrs, partitionData.map(_.values))
 
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
           val caseInsensitiveProperties =
             CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some((AttributeSet(partAttrs), l))
 
-      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
        Some((AttributeSet(partAttrs), relation))

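For context, a hypothetical query that this rule can answer from catalog metadata alone now that it matches `HiveTableRelation` directly (table and partition column names are made up):

// With the metadata-only optimization enabled, a DISTINCT over a partition
// column is served from the catalog's partition values; the rule replaces
// the scan with a LocalRelation built from those values.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
spark.sql("SELECT DISTINCT dt FROM partitioned_hive_table").show()
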
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 28 additions & 21 deletions
@@ -25,12 +25,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
 
 /**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
-    val table = r.tableMeta
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val catalogProxy = sparkSession.sessionState.catalog
-
-    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
         LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
-    }).asInstanceOf[LogicalRelation]
+    })
+  }
 
-    if (r.output.isEmpty) {
-      // It's possible that the table schema is empty and need to be inferred at runtime. For this
-      // case, we don't need to change the output of the cached plan.
-      plan
-    } else {
-      plan.copy(output = r.output)
-    }
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      i.copy(table = readDataSourceTable(r))
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      i.copy(table = readDataSourceTable(tableMeta))
+
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+      i.copy(table = readHiveTable(tableMeta))
+
+    case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+      readDataSourceTable(tableMeta)
 
-    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      readDataSourceTable(r)
+    case UnresolvedCatalogRelation(tableMeta) =>
+      readHiveTable(tableMeta)
   }
 }

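One detail worth noting in `apply` above: `transform` takes a partial function whose cases are tried in order, so the `isDatasourceTable`-guarded cases must come before the unguarded Hive fallbacks, or every table would be read as a Hive table. A tiny standalone illustration of that ordering rule:

// Guarded case first, catch-all last; swapping them would shadow the guard.
val classify: PartialFunction[String, String] = {
  case provider if provider != "hive" => "data source table"
  case _ => "hive table (fallback)"
}
assert(classify("parquet") == "data source table")
assert(classify("hive") == "hive table (fallback)")
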
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 2 additions & 2 deletions
@@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
@@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
 
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
-      case _: CatalogRelation => 1
+      case _: HiveTableRelation => 1
       case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
     sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
     val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
       case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
     val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 3 additions & 3 deletions
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   def convertToLogicalRelation(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
@@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    // The inferred schema may have different filed names as the table schema, we should respect
+    // The inferred schema may have different field names as the table schema, we should respect
     // it, but also respect the exprId in table relation output.
     assert(result.output.length == relation.output.length &&
       result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
@@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def inferIfNeeded(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormat: FileFormat,
       fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
