Commit 345dd81

[HUDI-4520] Support qualified table 'db.table' in call procedures (#6274)
1 parent 7307ff3 commit 345dd81

29 files changed: +77, -53 lines

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala

Lines changed: 15 additions & 0 deletions
@@ -22,8 +22,11 @@ package org.apache.hudi
 import org.apache.hudi.avro.model.HoodieClusteringGroup
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.spark.SparkException
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf

 import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}
@@ -57,4 +60,16 @@ object HoodieCLIUtils {

     partitionPaths.sorted.mkString(",")
   }
+
+  def getHoodieCatalogTable(sparkSession: SparkSession, table: String): HoodieCatalogTable = {
+    val seq: Seq[String] = table.split('.')
+    seq match {
+      case Seq(tableName) =>
+        HoodieCatalogTable(sparkSession, TableIdentifier(tableName))
+      case Seq(database, tableName) =>
+        HoodieCatalogTable(sparkSession, TableIdentifier(tableName, Some(database)))
+      case _ =>
+        throw new SparkException(s"Unsupported identifier $table")
+    }
+  }
 }
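The new helper relies on Scala's Char overload of split, which splits on the literal dot; the String overload is regex-based and would behave very differently here. A minimal standalone sketch (not part of the commit; "hudi_db" and "trips" are illustrative names) of the split semantics the match above depends on:

object IdentifierSplitDemo extends App {
  // StringOps.split(Char) splits on the literal character.
  assert("trips".split('.').toSeq == Seq("trips"))                    // unqualified -> one part
  assert("hudi_db.trips".split('.').toSeq == Seq("hudi_db", "trips")) // db-qualified -> two parts
  // The String overload split(".") is regex-based: "." matches every
  // character, and after trailing-empty removal the result is empty.
  assert("hudi_db.trips".split(".").isEmpty)
  // A three-part name falls through to the default case in the helper
  // and raises the SparkException shown above.
  assert("a.b.c".split('.').length == 3)
}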

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala

Lines changed: 3 additions & 1 deletion
@@ -17,12 +17,14 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.index.HoodieIndex.IndexType
+import org.apache.spark.SparkException
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -112,7 +114,7 @@ abstract class BaseProcedure extends Procedure {

   protected def getBasePath(tableName: Option[Any], tablePath: Option[Any] = Option.empty): String = {
     tableName.map(
-      t => HoodieCatalogTable(sparkSession, new TableIdentifier(t.asInstanceOf[String])).tableLocation)
+      t => HoodieCLIUtils.getHoodieCatalogTable(sparkSession, t.asInstanceOf[String]).tableLocation)
       .getOrElse(
         tablePath.map(p => p.asInstanceOf[String]).getOrElse(
           throw new HoodieClusteringException("Table name or table path must be given one"))
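Since every procedure below resolves its table through this one code path, both identifier forms now reach the same table location. A hedged spark-shell sketch (illustrative names; a Hudi table "trips" in database "hudi_db" is assumed to exist):

import org.apache.hudi.HoodieCLIUtils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
// Bare name resolves against the current database, as before.
val byName      = HoodieCLIUtils.getHoodieCatalogTable(spark, "trips").tableLocation
// Qualified name is parsed into TableIdentifier("trips", Some("hudi_db")).
val byQualified = HoodieCLIUtils.getHoodieCatalogTable(spark, "hudi_db.trips").tableLocation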

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.spark.sql.Row
@@ -47,7 +48,7 @@ class CommitsCompareProcedure() extends BaseProcedure with ProcedureBuilder {
     val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
     val path = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val source = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     val target = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(path).build

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
 import org.apache.hudi.common.fs.FSUtils
@@ -72,7 +73,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
     val actions: String = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
     val desc = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     val archivePath = new Path(basePath + "/.hoodie/.commits_.archive*")

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
@@ -49,7 +50,7 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder
     val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
     val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val client = createHoodieClient(jsc, basePath)
     client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieDefaultTimeline, HoodieInstant}
@@ -82,7 +83,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
     var startTs = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
     var endTs = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata, HoodieWriteStat}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -61,7 +62,7 @@ class ShowCommitFilesProcedure() extends BaseProcedure with ProcedureBuilder {
     val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
     val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     val activeTimeline = metaClient.getActiveTimeline

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata, HoodieWriteStat}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -60,7 +61,7 @@ class ShowCommitPartitionsProcedure() extends BaseProcedure with ProcedureBuilde
     val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
     val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     val activeTimeline = metaClient.getActiveTimeline

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -55,7 +56,7 @@ class ShowCommitWriteStatsProcedure() extends BaseProcedure with ProcedureBuilde
     val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
     val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
     val activeTimeline = metaClient.getActiveTimeline

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala

Lines changed: 2 additions & 1 deletion
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.model.HoodieCommitMetadata
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant}
@@ -78,7 +79,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
     val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
     val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]

-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
     val basePath = hoodieCatalogTable.tableLocation
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

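End to end, the change lets a call procedure name its target table with a database qualifier. A hedged spark-shell sketch (database and table names are illustrative; the extension class is Hudi's documented SQL extension):

// spark-shell \
//   --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
//   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

// Previously only the bare name was accepted; the qualified form now
// resolves through HoodieCLIUtils.getHoodieCatalogTable.
spark.sql("CALL show_commits(table => 'hudi_db.trips', limit => 10)").show(false)

// A bare name still works when the table lives in the current database.
spark.sql("CALL show_commits(table => 'trips', limit => 10)").show(false)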
