Skip to content

Commit 4a66167

Browse files
committed
[HUDI-4487] support to create ro/rt table by spark sql
1 parent c72d895 commit 4a66167

File tree

4 files changed

+169
-11
lines changed

4 files changed

+169
-11
lines changed

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ import scala.collection.JavaConverters._
3939
import scala.collection.mutable
4040

4141
/**
42-
* Table definition for SQL funcitonalities. Depending on the way of data generation,
42+
* Table definition for SQL functionalities. Depending on the way of data generation,
4343
* meta of Hudi table can be from Spark catalog or meta directory on filesystem.
4444
* [[HoodieCatalogTable]] takes both meta sources into consideration when handling
4545
* EXTERNAL and MANAGED tables.
46+
*
47+
* NOTE: all the meta should be retrieved from meta directory on filesystem first.
4648
*/
4749
class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {
4850

@@ -53,7 +55,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
5355
/**
5456
* database.table in catalog
5557
*/
56-
val catalogTableName = table.qualifiedName
58+
val catalogTableName: String = table.qualifiedName
5759

5860
/**
5961
* properties defined in catalog.

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
package org.apache.spark.sql.hudi.command
1919

2020
import org.apache.hadoop.fs.Path
21-
import org.apache.hudi.common.model.HoodieFileFormat
21+
22+
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType}
2223
import org.apache.hudi.common.table.HoodieTableConfig
2324
import org.apache.hudi.hadoop.HoodieParquetInputFormat
2425
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
2526
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
27+
import org.apache.hudi.sync.common.util.ConfigUtils
2628
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
29+
2730
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
2831
import org.apache.spark.sql.catalyst.catalog._
2932
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
@@ -33,7 +36,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
3336
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
3437
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
3538
import org.apache.spark.sql.types.StructType
36-
import org.apache.spark.sql.{Row, SparkSession}
39+
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
3740
import org.apache.spark.{SPARK_VERSION, SparkConf}
3841

3942
import scala.collection.JavaConverters._
@@ -62,12 +65,22 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
6265
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
6366
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
6467

65-
// init hoodie table
66-
hoodieCatalogTable.initHoodieTable()
68+
val queryAsProp = hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
69+
if (queryAsProp.isEmpty) {
70+
// init hoodie table for a normal table (not a ro/rt table)
71+
hoodieCatalogTable.initHoodieTable()
72+
} else {
73+
if (!hoodieCatalogTable.hoodieTableExists) {
74+
throw new AnalysisException("Creating ro/rt table need the existence of the base table.")
75+
}
76+
if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) {
77+
throw new AnalysisException("Creating ro/rt table should only apply to a mor table.")
78+
}
79+
}
6780

6881
try {
6982
// create catalog table for this hoodie table
70-
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists)
83+
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp)
7184
} catch {
7285
case NonFatal(e) =>
7386
logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}")
@@ -92,8 +105,11 @@ object CreateHoodieTableCommand {
92105
}
93106
}
94107

95-
def createTableInCatalog(sparkSession: SparkSession,
96-
hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = {
108+
def createTableInCatalog(
109+
sparkSession: SparkSession,
110+
hoodieCatalogTable: HoodieCatalogTable,
111+
ignoreIfExists: Boolean,
112+
queryAsProp: Option[String] = None): Unit = {
97113
val table = hoodieCatalogTable.table
98114
assert(table.tableType != CatalogTableType.VIEW)
99115

@@ -121,7 +137,8 @@ object CreateHoodieTableCommand {
121137
Some(outputFormat),
122138
Some(serdeFormat),
123139
table.storage.compressed,
124-
storageProperties + ("path" -> path))
140+
storageProperties + ("path" -> path) ++ queryAsProp.map(ConfigUtils.IS_QUERY_AS_RO_TABLE -> _)
141+
)
125142

126143
val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
127144
val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command
1919

2020
import org.apache.hadoop.conf.Configuration
2121
import org.apache.hadoop.fs.Path
22+
2223
import org.apache.hudi.DataSourceWriteOptions
2324
import org.apache.hudi.hive.HiveSyncConfigHolder
2425
import org.apache.hudi.sql.InsertMode
2526
import org.apache.hudi.sync.common.util.ConfigUtils
27+
2628
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
2729
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
2830
import org.apache.spark.sql.catalyst.plans.QueryPlan
2931
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
3032
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
31-
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
33+
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
3234

3335
import scala.collection.JavaConverters._
3436

@@ -45,6 +47,11 @@ case class CreateHoodieTableAsSelectCommand(
4547
assert(table.tableType != CatalogTableType.VIEW)
4648
assert(table.provider.isDefined)
4749

50+
val hasQueryAsProp = (table.storage.properties ++ table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
51+
if (hasQueryAsProp) {
52+
throw new AnalysisException("Not support CTAS for the ro/rt table")
53+
}
54+
4855
val sessionState = sparkSession.sessionState
4956
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
5057
val tableIdentWithDB = table.identifier.copy(database = Some(db))

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,138 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
382382
}
383383
}
384384

385+
test("Test Create ro/rt Table In The Right Way") {
386+
withTempDir { tmp =>
387+
val parentPath = tmp.getCanonicalPath
388+
val tableName1 = generateTableName
389+
spark.sql(
390+
s"""
391+
|create table $tableName1 (
392+
| id int,
393+
| name string,
394+
| ts long
395+
|) using hudi
396+
| tblproperties (
397+
| primaryKey = 'id',
398+
| preCombineField = 'ts',
399+
| type = 'mor'
400+
| )
401+
| location '$parentPath/$tableName1'
402+
""".stripMargin)
403+
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
404+
spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
405+
406+
// drop ro and rt table, and recreate them
407+
val roTableName1 = tableName1 + "_ro"
408+
val rtTableName1 = tableName1 + "_rt"
409+
spark.sql(
410+
s"""
411+
|create table $roTableName1
412+
|using hudi
413+
|tblproperties (
414+
| 'hoodie.query.as.ro.table' = 'true'
415+
|)
416+
|location '$parentPath/$tableName1'
417+
|""".stripMargin
418+
)
419+
val roCatalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(roTableName1))
420+
assertResult(roCatalogTable.properties("type"))("mor")
421+
assertResult(roCatalogTable.properties("primaryKey"))("id")
422+
assertResult(roCatalogTable.properties("preCombineField"))("ts")
423+
assertResult(roCatalogTable.storage.properties("hoodie.query.as.ro.table"))("true")
424+
checkAnswer(s"select id, name, ts from $roTableName1")(
425+
Seq(1, "a1", 1000)
426+
)
427+
428+
spark.sql(
429+
s"""
430+
|create table $rtTableName1
431+
|using hudi
432+
|tblproperties (
433+
| 'hoodie.query.as.ro.table' = 'false'
434+
|)
435+
|location '$parentPath/$tableName1'
436+
|""".stripMargin
437+
)
438+
val rtCatalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(rtTableName1))
439+
assertResult(rtCatalogTable.properties("type"))("mor")
440+
assertResult(rtCatalogTable.properties("primaryKey"))("id")
441+
assertResult(rtCatalogTable.properties("preCombineField"))("ts")
442+
assertResult(rtCatalogTable.storage.properties("hoodie.query.as.ro.table"))("false")
443+
checkAnswer(s"select id, name, ts from $rtTableName1")(
444+
Seq(1, "a2", 1100)
445+
)
446+
}
447+
}
448+
449+
test("Test Create ro/rt Table In The Wrong Way") {
450+
withTempDir { tmp =>
451+
val parentPath = tmp.getCanonicalPath
452+
453+
// test the case of creating a ro/rt table on a cow table
454+
val tableName1 = generateTableName
455+
spark.sql(
456+
s"""
457+
|create table $tableName1 (
458+
| id int,
459+
| name string,
460+
| ts long
461+
|) using hudi
462+
| tblproperties (
463+
| primaryKey = 'id',
464+
| preCombineField = 'ts',
465+
| type = 'cow'
466+
| )
467+
| location '$parentPath/$tableName1'
468+
""".stripMargin)
469+
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
470+
spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
471+
472+
val roTableName1 = tableName1 + "_ro"
473+
checkExceptionContain(
474+
s"""
475+
|create table $roTableName1
476+
|using hudi
477+
|tblproperties (
478+
| 'hoodie.query.as.ro.table' = 'true'
479+
|)
480+
|location '$parentPath/$tableName1'
481+
|""".stripMargin
482+
)("Creating ro/rt table should only apply to a mor table.")
483+
484+
// test the case of creating a ro/rt table on a nonexistent table
485+
val tableName2 = generateTableName
486+
val rtTableName2 = tableName2 + "_rt"
487+
checkExceptionContain(
488+
s"""
489+
|create table $rtTableName2
490+
|using hudi
491+
|tblproperties (
492+
| 'hoodie.query.as.ro.table' = 'true'
493+
|)
494+
|location '$parentPath/$tableName2'
495+
|""".stripMargin
496+
)("Creating ro/rt table need the existence of the base table.")
497+
498+
// test the case that CTAS
499+
val tableName3 = generateTableName
500+
checkExceptionContain(
501+
s"""
502+
| create table $tableName3 using hudi
503+
| tblproperties(
504+
| primaryKey = 'id',
505+
| preCombineField = 'ts',
506+
| type = 'mor',
507+
| 'hoodie.query.as.ro.table' = 'true'
508+
| )
509+
| location '$parentPath/$tableName3'
510+
| AS
511+
| select 1 as id, 'a1' as name, 1000 as ts
512+
| """.stripMargin
513+
)("Not support CTAS for the ro/rt table")
514+
}
515+
}
516+
385517
test("Test Create Table As Select With Tblproperties For Filter Props") {
386518
Seq("cow", "mor").foreach { tableType =>
387519
val tableName = generateTableName

0 commit comments

Comments
 (0)