diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java index d3f858439ce69..8b6d47b9a9aaf 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java @@ -23,20 +23,11 @@ import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; -import org.apache.hudi.client.SparkRDDWriteClient; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieIndexConfig; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.index.HoodieIndex; - -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; @@ -162,13 +153,4 @@ public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a save } return String.format("Savepoint \"%s\" deleted.", instantTime); } - - private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); - return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config); - } - } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala index b8777eddb2392..876bb503ee196 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala @@ -111,7 +111,7 @@ abstract class BaseProcedure extends Procedure { } } - protected def getBasePath(tableName: Option[Any], tablePath: Option[Any]): String = { + protected def getBasePath(tableName: Option[Any], tablePath: Option[Any] = Option.empty): String = { tableName.map( t => HoodieCatalogTable(sparkSession, new TableIdentifier(t.asInstanceOf[String])).tableLocation) .getOrElse( diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointsProcedure.scala new file mode 100644 index 0000000000000..ed4905ed047a7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointsProcedure.scala @@ -0,0 +1,88 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.exception.{HoodieException, HoodieSavepointException} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util.function.Supplier + +class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None), + ProcedureParameter.optional(2, "user", DataTypes.StringType, ""), + ProcedureParameter.optional(3, "comments", DataTypes.StringType, "") + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("create_savepoint_result", DataTypes.BooleanType, nullable = true, Metadata.empty)) + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val commitTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] + val user = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String] + val comments = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String] + + val basePath: String = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + + val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline + if (!activeTimeline.getCommitsTimeline.filterCompletedInstants.containsInstant(commitTime)) { + throw new HoodieException("Commit " + commitTime + " not found in Commits " + activeTimeline) + } + + val client = createHoodieClient(jsc, basePath) + var result = false + + try { + client.savepoint(commitTime, user, comments) + logInfo(s"The commit $commitTime has been savepointed.") + result = true + } catch { + case _: HoodieSavepointException => + logWarning(s"Failed: Could not create savepoint $commitTime.") + } + + Seq(Row(result)) + } + + override def build: Procedure = new CreateSavepointsProcedure() +} + +object CreateSavepointsProcedure { + val NAME: String = "create_savepoints" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): CreateSavepointsProcedure = new CreateSavepointsProcedure() + } +} + + +
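A quick usage sketch for the procedure above (the table name and instant time are hypothetical; user and comments are optional and default to empty strings). Both positional and named-argument forms work:

    spark.sql("call create_savepoints('hudi_tbl', '20220510101030')")
    spark.sql("call create_savepoints(table => 'hudi_tbl', commit_time => '20220510101030', user => 'admin', comments => 'nightly backup')")

The call returns one row whose create_savepoint_result column is true on success, or false if the underlying savepoint attempt fails with a HoodieSavepointException.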
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointsProcedure.scala new file mode 100644 index 0000000000000..11416ac22c56f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointsProcedure.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.exception.{HoodieException, HoodieSavepointException} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util.function.Supplier + +class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("delete_savepoint_result", DataTypes.BooleanType, nullable = true, Metadata.empty)) + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] + + val basePath: String = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + + val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants + if (completedInstants.empty) throw new HoodieException("There is no completed savepoint to delete") + val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, instantTime) + + if (!completedInstants.containsInstant(savePoint)) { + throw new HoodieException("Savepoint " + instantTime + " not found in savepoint timeline " + completedInstants) + } + + val client = createHoodieClient(jsc, basePath) + var result = false + + try { + client.deleteSavepoint(instantTime) + logInfo(s"Savepoint $instantTime has been deleted.") + result = true + } catch { + case _: HoodieSavepointException => + logWarning(s"Failed: Could not delete savepoint $instantTime.") + } + + Seq(Row(result)) + } + + override def 
build: Procedure = new DeleteSavepointsProcedure() +} + +object DeleteSavepointsProcedure { + val NAME: String = "delete_savepoints" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): DeleteSavepointsProcedure = new DeleteSavepointsProcedure() + } +} + + + + diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index 2993bcae7e2ff..9c05773531322 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -33,11 +33,15 @@ object HoodieProcedures { private def initProcedureBuilders: util.Map[String, Supplier[ProcedureBuilder]] = { val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder() - mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder) - mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder) + mapBuilder.put(CreateSavepointsProcedure.NAME, CreateSavepointsProcedure.builder) + mapBuilder.put(DeleteSavepointsProcedure.NAME, DeleteSavepointsProcedure.builder) + mapBuilder.put(RollbackSavepointsProcedure.NAME, RollbackSavepointsProcedure.builder) mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder) mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder) mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder) + mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder) + mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder) + mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder) mapBuilder.build } }
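Once a builder is registered in this map, the procedure can be resolved by name from a CALL statement; the parser accepts both positional and named arguments. A minimal sketch against a hypothetical table:

    spark.sql("call delete_savepoints('hudi_tbl', '20220510101030')")
    spark.sql("call delete_savepoints(table => 'hudi_tbl', instant_time => '20220510101030')")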
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackSavepointsProcedure.scala new file mode 100644 index 0000000000000..a11e614176f57 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackSavepointsProcedure.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.exception.{HoodieException, HoodieSavepointException} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util.function.Supplier + +class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("rollback_savepoint_result", DataTypes.BooleanType, nullable = true, Metadata.empty)) + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] + + val basePath: String = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + + val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants + if (completedInstants.empty) throw new HoodieException("There is no completed savepoint to roll back to") + val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, instantTime) + + if (!completedInstants.containsInstant(savePoint)) { + throw new HoodieException("Savepoint " + instantTime + " not found in savepoint timeline " + completedInstants) + } + + val client = createHoodieClient(jsc, basePath) + var result = false + + try { + client.restoreToSavepoint(instantTime) + logInfo(s"The commit $instantTime has been rolled back.") + result = true + } catch { + case _: HoodieSavepointException => + logWarning(s"The commit $instantTime failed to roll back.") + } + + Seq(Row(result)) + } + + override def build: Procedure = new RollbackSavepointsProcedure() +} + +object RollbackSavepointsProcedure { + val NAME: String = "rollback_savepoints" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): RollbackSavepointsProcedure = new RollbackSavepointsProcedure() + } +} + + + + 
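Worth noting: rollback_savepoints delegates to client.restoreToSavepoint, which restores the table state to the savepointed instant, rolling back the commits made after it. A usage sketch with a hypothetical instant:

    spark.sql("call rollback_savepoints(table => 'hudi_tbl', instant_time => '20220510101030')")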
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala new file mode 100644 index 0000000000000..e866e21555baf --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline} +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.function.Supplier +import java.util.stream.Collectors + +class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("savepoint_time", DataTypes.StringType, nullable = true, Metadata.empty)) + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + + val basePath: String = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + + val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline + val timeline: HoodieTimeline = activeTimeline.getSavePointTimeline.filterCompletedInstants + val commits: util.List[HoodieInstant] = timeline.getReverseOrderedInstants.collect(Collectors.toList[HoodieInstant]) + + if (commits.isEmpty) Seq.empty[Row] else { + commits.toArray.map(instant => Row(instant.asInstanceOf[HoodieInstant].getTimestamp)).toSeq + } + } + + override def build: Procedure = new ShowSavepointsProcedure() +} + +object ShowSavepointsProcedure { + val NAME: String = "show_savepoints" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get(): ShowSavepointsProcedure = new ShowSavepointsProcedure() + } +} + 
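A usage sketch for listing savepoints (hypothetical table name); each completed savepoint is returned as one savepoint_time row, newest first, per getReverseOrderedInstants above:

    spark.sql("call show_savepoints(table => 'hudi_tbl')").show()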
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallCommandParser.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallCommandParser.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala index 9d1c02ad99faa..e26e6617f1871 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallCommandParser.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallCommandParser.scala @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.spark.sql.hudi +package org.apache.spark.sql.hudi.procedure import com.google.common.collect.ImmutableList import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.{CallCommand, NamedArgument, PositionalArgument} +import org.apache.spark.sql.hudi.TestHoodieSqlBase import org.apache.spark.sql.types.{DataType, DataTypes} import java.math.BigDecimal diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala similarity index 97% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala index eb2c614df201b..bdf4cbe7ba0ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCallProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala @@ -15,7 +15,9 @@ * limitations under the License. */ -package org.apache.spark.sql.hudi +package org.apache.spark.sql.hudi.procedure + +import org.apache.spark.sql.hudi.TestHoodieSqlBase class TestCallProcedure extends TestHoodieSqlBase { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunClusteringProcedure.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunClusteringProcedure.scala index 5ad0b2583402e..068cd65387057 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunClusteringProcedure.scala @@ -17,12 +17,13 @@ * under the License. */ -package org.apache.spark.sql.hudi +package org.apache.spark.sql.hudi.procedure import org.apache.hadoop.fs.Path import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers} +import org.apache.spark.sql.hudi.TestHoodieSqlBase import scala.collection.JavaConverters.asScalaIteratorConverter diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala new file mode 100644 index 0000000000000..7d60ca018d32a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.procedure + +import org.apache.spark.sql.hudi.TestHoodieSqlBase + +class TestSavepointsProcedure extends TestHoodieSqlBase { + + test("Test Call create_savepoints Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data into the table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + + val commits = spark.sql(s"""call show_commits(table => '$tableName')""").limit(1).collect() + assertResult(1) { + commits.length + } + + val commitTime = commits.apply(0).getString(0) + checkAnswer(s"""call create_savepoints('$tableName', '$commitTime', 'admin', '1')""")(Seq(true)) + } + } + + test("Test Call show_savepoints Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data into the table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000") + + val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect() + assertResult(3) { + commits.length + } + + val commitTime = commits.apply(1).getString(0) + checkAnswer(s"""call create_savepoints('$tableName', '$commitTime')""")(Seq(true)) + + // show savepoints + val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect() + assertResult(1) { + savepoints.length + } + } + } + + test("Test Call delete_savepoints Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data into the table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000") + + val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect() + assertResult(3) { + commits.length + } + + // create 3 savepoints + commits.foreach(r => { + checkAnswer(s"""call create_savepoints('$tableName', '${r.getString(0)}')""")(Seq(true)) + }) + + // delete one savepoint + checkAnswer(s"""call delete_savepoints('$tableName', '${commits.apply(1).getString(0)}')""")(Seq(true)) + + // show savepoints; only 2 should remain + val savepoints = spark.sql(s"""call show_savepoints(table => 
'$tableName')""").collect() + assertResult(2) { + savepoints.length + } + } + } + + test("Test Call rollback_savepoints Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data into the table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect() + assertResult(2) { + commits.length + } + + // create 2 savepoints + commits.foreach(r => { + checkAnswer(s"""call create_savepoints('$tableName', '${r.getString(0)}')""")(Seq(true)) + }) + + // roll back to a savepoint + checkAnswer(s"""call rollback_savepoints('$tableName', '${commits.apply(0).getString(0)}')""")(Seq(true)) + } + } +}
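Taken together, the tests above exercise the full savepoint lifecycle. A condensed end-to-end sketch (table name and instant time are illustrative; the instant must be a completed commit returned by show_commits):

    spark.sql("call create_savepoints('hudi_tbl', '20220510101030')")
    spark.sql("call show_savepoints(table => 'hudi_tbl')")
    spark.sql("call rollback_savepoints('hudi_tbl', '20220510101030')")
    spark.sql("call delete_savepoints('hudi_tbl', '20220510101030')")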