Skip to content

Commit 54808ec

Browse files
XuQianJin-Stars authored and stayrascal committed
[HUDI-3501] Support savepoints command based on Call Produce Command (apache#5025)
1 parent ff16cdc commit 54808ec

11 files changed

Lines changed: 525 additions & 24 deletions

File tree

hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,11 @@
2323
import org.apache.hudi.cli.HoodieTableHeaderFields;
2424
import org.apache.hudi.cli.utils.InputStreamConsumer;
2525
import org.apache.hudi.cli.utils.SparkUtil;
26-
import org.apache.hudi.client.SparkRDDWriteClient;
27-
import org.apache.hudi.client.common.HoodieSparkEngineContext;
28-
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
2926
import org.apache.hudi.common.table.HoodieTableMetaClient;
3027
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
3128
import org.apache.hudi.common.table.timeline.HoodieInstant;
3229
import org.apache.hudi.common.table.timeline.HoodieTimeline;
33-
import org.apache.hudi.config.HoodieCompactionConfig;
34-
import org.apache.hudi.config.HoodieIndexConfig;
35-
import org.apache.hudi.config.HoodieWriteConfig;
3630
import org.apache.hudi.exception.HoodieException;
37-
import org.apache.hudi.index.HoodieIndex;
38-
39-
import org.apache.spark.api.java.JavaSparkContext;
4031
import org.apache.spark.launcher.SparkLauncher;
4132
import org.springframework.shell.core.CommandMarker;
4233
import org.springframework.shell.core.annotation.CliCommand;
@@ -162,13 +153,4 @@ public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a save
162153
}
163154
return String.format("Savepoint \"%s\" deleted.", instantTime);
164155
}
165-
166-
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
167-
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
168-
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
169-
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
170-
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
171-
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
172-
}
173-
174156
}

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ abstract class BaseProcedure extends Procedure {
111111
}
112112
}
113113

114-
protected def getBasePath(tableName: Option[Any], tablePath: Option[Any]): String = {
114+
protected def getBasePath(tableName: Option[Any], tablePath: Option[Any] = Option.empty): String = {
115115
tableName.map(
116116
t => HoodieCatalogTable(sparkSession, new TableIdentifier(t.asInstanceOf[String])).tableLocation)
117117
.getOrElse(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hudi.command.procedures
19+
20+
import org.apache.hudi.common.table.HoodieTableMetaClient
21+
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
22+
import org.apache.hudi.exception.{HoodieException, HoodieSavepointException}
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.sql.Row
25+
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
26+
27+
import java.util.function.Supplier
28+
29+
class CreateSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  // Call arguments: the target table, the commit to savepoint, plus optional audit metadata.
  // NOTE(review): "commit_Time" mixes casing styles vs. the snake_case used by the sibling
  // procedures (e.g. "instant_time"); kept verbatim because the name is part of the public
  // call interface — confirm before renaming.
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "commit_Time", DataTypes.StringType, None),
    ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
    ProcedureParameter.optional(3, "comments", DataTypes.StringType, "")
  )

  // One boolean column reporting whether the savepoint was created.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("create_savepoint_result", DataTypes.BooleanType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Creates a savepoint on the given completed commit of the table.
   *
   * Throws HoodieException when the commit is not among the completed commits;
   * returns Row(false) when the write client rejects the savepoint with a
   * HoodieSavepointException, Row(true) on success.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val commitTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val user = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
    val comments = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]

    val basePath: String = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    // Only a completed commit may be savepointed.
    val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline
    if (!activeTimeline.getCommitsTimeline.filterCompletedInstants.containsInstant(commitTime)) {
      throw new HoodieException("Commit " + commitTime + " not found in Commits " + activeTimeline)
    }

    val writeClient = createHoodieClient(jsc, basePath)
    val succeeded =
      try {
        writeClient.savepoint(commitTime, user, comments)
        logInfo(s"The commit $commitTime has been savepointed.")
        true
      } catch {
        case _: HoodieSavepointException =>
          logWarning(s"Failed: Could not create savepoint $commitTime.")
          false
      }

    Seq(Row(succeeded))
  }

  override def build: Procedure = new CreateSavepointsProcedure()
}
78+
79+
object CreateSavepointsProcedure {
  // Name under which the procedure is registered (see HoodieProcedures).
  val NAME: String = "create_savepoints"

  // SAM-converted lambda: java.util.function.Supplier is a functional interface.
  def builder: Supplier[ProcedureBuilder] = () => new CreateSavepointsProcedure()
}
86+
87+
88+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hudi.command.procedures
19+
20+
import org.apache.hudi.common.table.HoodieTableMetaClient
21+
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
22+
import org.apache.hudi.exception.{HoodieException, HoodieSavepointException}
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.sql.Row
25+
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
26+
27+
import java.util.function.Supplier
28+
29+
class DeleteSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  // Call arguments: the target table and the savepoint instant to delete.
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
  )

  // One boolean column reporting whether the savepoint was deleted.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("delete_savepoint_result", DataTypes.BooleanType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Deletes the savepoint created at `instant_time`.
   *
   * Throws HoodieException when the table has no completed savepoints or the
   * requested instant is not a completed savepoint; returns Row(false) when the
   * write client fails with a HoodieSavepointException, Row(true) on success.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]

    val basePath: String = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants
    // FIX: error messages previously said "savepoint to run delete" (grammar) and
    // "Commit ... not found in Commits" (copy-paste) — both lookups are over savepoints.
    if (completedInstants.empty) {
      throw new HoodieException("There are no completed savepoints to run delete")
    }
    val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, instantTime)
    if (!completedInstants.containsInstant(savePoint)) {
      throw new HoodieException("Savepoint " + instantTime + " not found in savepoint timeline " + completedInstants)
    }

    val client = createHoodieClient(jsc, basePath)
    var result = false

    try {
      client.deleteSavepoint(instantTime)
      // FIX: was the garbled "The commit ... has been deleted savepoint."
      logInfo(s"Savepoint $instantTime has been deleted.")
      result = true
    } catch {
      case _: HoodieSavepointException =>
        logWarning(s"Failed: Could not delete savepoint $instantTime.")
    }

    Seq(Row(result))
  }

  override def build: Procedure = new DeleteSavepointsProcedure()
}
77+
78+
object DeleteSavepointsProcedure {
  // Name under which the procedure is registered (see HoodieProcedures).
  val NAME: String = "delete_savepoints"

  // SAM-converted lambda: java.util.function.Supplier is a functional interface.
  def builder: Supplier[ProcedureBuilder] = () => new DeleteSavepointsProcedure()
}
85+
86+
87+
88+

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@ object HoodieProcedures {
3333

3434
private def initProcedureBuilders: util.Map[String, Supplier[ProcedureBuilder]] = {
  // Registry of all callable procedures, kept in alphabetical order by name.
  val entries: Seq[(String, Supplier[ProcedureBuilder])] = Seq(
    CreateSavepointsProcedure.NAME -> CreateSavepointsProcedure.builder,
    DeleteSavepointsProcedure.NAME -> DeleteSavepointsProcedure.builder,
    RollbackSavepointsProcedure.NAME -> RollbackSavepointsProcedure.builder,
    RollbackToInstantTimeProcedure.NAME -> RollbackToInstantTimeProcedure.builder,
    RunClusteringProcedure.NAME -> RunClusteringProcedure.builder,
    ShowClusteringProcedure.NAME -> ShowClusteringProcedure.builder,
    ShowCommitsProcedure.NAME -> ShowCommitsProcedure.builder,
    ShowCommitsMetadataProcedure.NAME -> ShowCommitsMetadataProcedure.builder,
    ShowSavepointsProcedure.NAME -> ShowSavepointsProcedure.builder
  )
  val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
  entries.foreach { case (name, builder) => mapBuilder.put(name, builder) }
  mapBuilder.build
}
4347
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hudi.command.procedures
19+
20+
import org.apache.hudi.common.table.HoodieTableMetaClient
21+
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
22+
import org.apache.hudi.exception.{HoodieException, HoodieSavepointException}
23+
import org.apache.spark.internal.Logging
24+
import org.apache.spark.sql.Row
25+
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
26+
27+
import java.util.function.Supplier
28+
29+
class RollbackSavepointsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  // Call arguments: the target table and the savepoint instant to restore to.
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
  )

  // One boolean column reporting whether the restore succeeded.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("rollback_savepoint_result", DataTypes.BooleanType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Restores the table to the savepoint taken at `instant_time`.
   *
   * Throws HoodieException when the table has no completed savepoints or the
   * requested instant is not a completed savepoint; returns Row(false) when the
   * restore fails with a HoodieSavepointException, Row(true) on success.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]

    val basePath: String = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants
    // FIX: message said "run delete" (copy-paste from the delete procedure); this is a rollback.
    if (completedInstants.empty) {
      throw new HoodieException("There are no completed savepoints to run rollback")
    }
    val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, instantTime)

    if (!completedInstants.containsInstant(savePoint)) {
      throw new HoodieException("Commit " + instantTime + " not found in Commits " + completedInstants)
    }

    val client = createHoodieClient(jsc, basePath)
    var result = false

    try {
      client.restoreToSavepoint(instantTime)
      // FIX: the `s` interpolator was missing (plain "..." literal), so the text
      // "$instantTime" was logged verbatim instead of the instant's timestamp.
      logInfo(s"The commit $instantTime rolled back.")
      result = true
    } catch {
      case _: HoodieSavepointException =>
        logWarning(s"The commit $instantTime failed to roll back.")
    }

    Seq(Row(result))
  }

  override def build: Procedure = new RollbackSavepointsProcedure()
}
77+
78+
object RollbackSavepointsProcedure {
  // Name under which the procedure is registered (see HoodieProcedures).
  val NAME: String = "rollback_savepoints"

  // SAM-converted lambda: java.util.function.Supplier is a functional interface.
  def builder: Supplier[ProcedureBuilder] = () => new RollbackSavepointsProcedure()
}
85+
86+
87+
88+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hudi.command.procedures
19+
20+
import org.apache.hudi.common.table.HoodieTableMetaClient
21+
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
22+
import org.apache.spark.sql.Row
23+
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
24+
25+
import java.util
26+
import java.util.function.Supplier
27+
import java.util.stream.Collectors
28+
29+
class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
  // Single argument: the Hudi table whose savepoints should be listed.
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
  )

  // One string column per savepoint: its instant timestamp.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("savepoint_time", DataTypes.StringType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Lists the timestamps of all completed savepoints on the table, newest
   * first. Returns an empty Seq when the table has no completed savepoints.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))

    val basePath: String = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline
    val timeline: HoodieTimeline = activeTimeline.getSavePointTimeline.filterCompletedInstants
    val commits: util.List[HoodieInstant] = timeline.getReverseOrderedInstants.collect(Collectors.toList[HoodieInstant])

    // FIX: previously round-tripped through Array[AnyRef] (`toArray`) with
    // per-element asInstanceOf downcasts; indexing the typed java.util.List
    // directly needs no cast and preserves the reverse ordering.
    if (commits.isEmpty) {
      Seq.empty[Row]
    } else {
      (0 until commits.size()).map(i => Row(commits.get(i).getTimestamp))
    }
  }

  override def build: Procedure = new ShowSavepointsProcedure()
}
61+
62+
object ShowSavepointsProcedure {
  // Name under which the procedure is registered (see HoodieProcedures).
  val NAME: String = "show_savepoints"

  // SAM-converted lambda: java.util.function.Supplier is a functional interface.
  def builder: Supplier[ProcedureBuilder] = () => new ShowSavepointsProcedure()
}
69+

0 commit comments

Comments
 (0)