Commit 146f2f9
Support call procedure for build action
1 parent 4e70dff commit 146f2f9

8 files changed: 881 additions & 29 deletions


hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java

Lines changed: 26 additions & 0 deletions
@@ -18,12 +18,14 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.avro.model.HoodieBuildPlan;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.BuildUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -97,4 +99,28 @@ public static Option<HoodieClusteringPlan> getClusteringPlan(FileSystem fs, Stri
       return Option.empty();
     }
   }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public static HoodieTimeline allBuildCommits(FileSystem fs, String basePath) {
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(fs.getConf())
+        .setBasePath(basePath)
+        .setLoadActiveTimelineOnLoad(true)
+        .build();
+    return metaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(HoodieActiveTimeline.BUILD_ACTION));
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public static Option<HoodieBuildPlan> getBuildPlan(FileSystem fs, String basePath, String instantTime) {
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf())
+        .setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
+    HoodieInstant hoodieInstant = HoodieTimeline.getBuildRequestedInstant(instantTime);
+    Option<Pair<HoodieInstant, HoodieBuildPlan>> buildPlan = BuildUtils.getBuildPlan(metaClient, hoodieInstant);
+    if (buildPlan.isPresent()) {
+      return Option.of(buildPlan.get().getValue());
+    } else {
+      return Option.empty();
+    }
+  }
 }
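For orientation, a minimal sketch of how the two new helpers might be called from a Spark shell. The table path and instant time below are hypothetical; `getTasks` on the build plan is taken from its use in RunBuildProcedure further down.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.HoodieDataSourceHelpers

val basePath = "/tmp/hudi/trips" // hypothetical table location
val fs = FileSystem.get(new Path(basePath).toUri, new Configuration())

// All instants on the timeline whose action is BUILD_ACTION.
val buildTimeline = HoodieDataSourceHelpers.allBuildCommits(fs, basePath)
println(s"build instants: ${buildTimeline.countInstants()}")

// The build plan behind one requested instant, if present (instant time hypothetical).
val plan = HoodieDataSourceHelpers.getBuildPlan(fs, basePath, "20230101000000000")
if (plan.isPresent) println(s"build tasks: ${plan.get().getTasks.size()}")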

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala

Lines changed: 2 additions & 0 deletions
@@ -79,6 +79,8 @@ object HoodieProcedures {
       ,(RunCleanProcedure.NAME, RunCleanProcedure.builder)
       ,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
       ,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
+      ,(RunBuildProcedure.NAME, RunBuildProcedure.builder)
+      ,(ShowBuildProcedure.NAME, ShowBuildProcedure.builder)
     )
   }
 }
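Once registered here, both procedures become reachable through Spark SQL's CALL syntax (assuming the Hudi Spark SQL extensions are enabled). A hedged sketch: the table name is hypothetical, and the arguments of show_build are assumed, since ShowBuildProcedure's source is not part of the excerpt on this page.

// run_build's NAME is defined in RunBuildProcedure below; arguments align by name.
spark.sql("CALL run_build(table => 'hudi_tbl', show_involved_partition => true)").show(false)
// Assumed invocation of the companion procedure added in this commit.
spark.sql("CALL show_build(table => 'hudi_tbl')").show(false)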
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedurePredicateHelper.scala (new file; path inferred from package and trait name)

Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, splitPartitionAndDataPredicates}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+
+trait ProcedurePredicateHelper extends PredicateHelper {
+
+  def prunePartition(
+      sparkSession: SparkSession,
+      metaClient: HoodieTableMetaClient,
+      predicate: String): Seq[String] = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
+    val condition = resolveExpr(sparkSession, predicate, tableSchema)
+    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
+    val (partitionPredicates, dataPredicates) = splitPartitionAndDataPredicates(
+      sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
+    checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
+
+    // Get all partitions and prune partition by predicates
+    val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates)
+    prunedPartitions.map(partitionPath => partitionPath.getPath)
+  }
+}
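A usage sketch for the shared trait (table path and partition column are hypothetical): the checkArgument guard accepts only predicates over partition columns, so a predicate touching a data column fails fast.

// Inside a procedure that mixes in ProcedurePredicateHelper:
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sessionState.newHadoopConf())
  .setBasePath("/tmp/hudi/trips") // hypothetical table location
  .build()
// Accepted: predicate over the partition column dt.
val partitions: Seq[String] = prunePartition(spark, metaClient, "dt >= '2023-01-01'")
// Rejected with "Only partition predicates are allowed" (fare is a data column):
// prunePartition(spark, metaClient, "fare > 10.0")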
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBuildProcedure.scala (new file; path inferred from package and class name)

Lines changed: 151 additions & 0 deletions

@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.config.HoodieBuildConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.util.{BuildUtils, HoodieTimer, Option => HOption}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorConverter}
+
+class RunBuildProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with ProcedurePredicateHelper
+  with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "show_involved_partition", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("task_num", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  /**
+   * Returns the input parameters of this procedure.
+   */
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the type of rows produced by this procedure.
+   */
+  override def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Executes this procedure.
+   * <p>
+   * Spark will align the provided arguments according to the input parameters
+   * defined in {@link #parameters()} either by position or by name before execution.
+   * <p>
+   * Implementations may provide a summary of execution by returning one or many rows
+   * as a result. The schema of output rows must match the defined output type
+   * in {@link #outputType()}.
+   *
+   * @param args input arguments
+   * @return the result of executing this procedure with the given arguments
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val predicate = getArgValueOrDefault(args, PARAMETERS(2))
+    val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build()
+    var conf: Map[String, String] = Map.empty
+    predicate match {
+      case Some(p) =>
+        val prunedPartitions = prunePartition(spark, metaClient, p.asInstanceOf[String])
+        conf = conf ++ Map(
+          HoodieBuildConfig.PARTITION_SELECTED.key() -> prunedPartitions.mkString(",")
+        )
+        logInfo(s"Partition predicates: ${p}, partition selected: ${prunedPartitions}")
+      case _ =>
+        logInfo("No partition predicates")
+    }
+
+    var pendingBuild = BuildUtils.getAllPendingBuildPlans(metaClient)
+      .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
+    logInfo(s"Pending build instants: ${pendingBuild.mkString(",")}")
+
+    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+    val instantTime = HoodieActiveTimeline.createNewInstantTime()
+    if (client.scheduleBuildAtInstant(instantTime, HOption.empty())) {
+      pendingBuild ++= Seq(instantTime)
+    }
+    logInfo(s"Build instants to run: ${pendingBuild.mkString(",")}")
+
+    val timer = new HoodieTimer
+    timer.startTimer()
+    pendingBuild.foreach(instant => {
+      timer.startTimer()
+      client.build(instant, true)
+      logInfo(s"Finish build for instant: $instant, time cost: ${timer.endTimer()}ms")
+    })
+    client.close()
+    logInfo(s"Finish build all instants: ${pendingBuild.mkString(",")}, time cost: ${timer.endTimer()}ms")
+
+    val buildInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+      .filter(p => p.getAction == HoodieTimeline.BUILD_ACTION && pendingBuild.contains(p.getTimestamp))
+      .toSeq
+      .sortBy(f => f.getTimestamp)
+      .reverse
+
+    val buildPlans = buildInstants.map(instant =>
+      BuildUtils.getBuildPlan(metaClient, instant)
+    )
+
+    if (showInvolvedPartitions) {
+      buildPlans.map { p =>
+        Row(p.get().getLeft.getTimestamp, p.get().getRight.getTasks.size(),
+          p.get().getLeft.getState.name(),
+          BuildUtils.extractPartitions(p.get().getRight.getTasks).asScala.mkString(","))
+      }
+    } else {
+      buildPlans.map { p =>
+        Row(p.get().getLeft.getTimestamp, p.get().getRight.getTasks.size(), p.get().getLeft.getState.name(), "*")
+      }
+    }
+  }
+
+  override def build: Procedure = new RunBuildProcedure
+}
+
+
+object RunBuildProcedure {
+  val NAME = "run_build"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RunBuildProcedure
+  }
+}
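The rows returned by call mirror OUTPUT_TYPE above: one row per executed build instant with its timestamp, task count, state, and involved partitions. A hedged invocation sketch, addressing the table by path rather than name (path and partition values hypothetical):

// Runs any pending build plans plus one newly scheduled instant.
spark.sql(
  """CALL run_build(path => '/tmp/hudi/trips', predicate => 'dt = "2023-01-01"', show_involved_partition => true)"""
).show(false)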

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala

Lines changed: 6 additions & 29 deletions
@@ -17,27 +17,23 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieClusteringConfig
 import org.apache.hudi.exception.HoodieClusteringException
-import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, splitPartitionAndDataPredicates}
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.types._
 
 import java.util.function.Supplier
+
 import scala.collection.JavaConverters._
 
 class RunClusteringProcedure extends BaseProcedure
   with ProcedureBuilder
-  with PredicateHelper
+  with ProcedurePredicateHelper
   with Logging {
 
   /**
@@ -77,10 +73,10 @@ class RunClusteringProcedure extends BaseProcedure
     var conf: Map[String, String] = Map.empty
     predicate match {
      case Some(p) =>
-        val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
+        val prunedPartitions = prunePartition(spark, metaClient, p.asInstanceOf[String])
         conf = conf ++ Map(
           HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
-          HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions
+          HoodieClusteringConfig.PARTITION_SELECTED.key() -> prunedPartitions.mkString(",")
         )
         logInfo(s"Partition predicates: $p, partition selected: $prunedPartitions")
       case _ =>
@@ -113,6 +109,7 @@ class RunClusteringProcedure extends BaseProcedure
 
     val startTs = System.currentTimeMillis()
     pendingClustering.foreach(client.cluster(_, true))
+    client.close()
     logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
       s" time cost: ${System.currentTimeMillis() - startTs}ms.")
 
@@ -140,25 +137,6 @@ class RunClusteringProcedure extends BaseProcedure
 
   override def build: Procedure = new RunClusteringProcedure()
 
-  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
-    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
-    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
-      FileStatusCache.getOrCreate(sparkSession))
-
-    // Resolve partition predicates
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    val tableSchema = AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
-    val condition = resolveExpr(sparkSession, predicate, tableSchema)
-    val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-    val (partitionPredicates, dataPredicates) = splitPartitionAndDataPredicates(
-      sparkSession, splitConjunctivePredicates(condition).toArray, partitionColumns)
-    checkArgument(dataPredicates.isEmpty, "Only partition predicates are allowed")
-
-    // Get all partitions and prune partition by predicates
-    val prunedPartitions = hoodieFileIndex.getPartitionPaths(partitionPredicates)
-    prunedPartitions.map(partitionPath => partitionPath.getPath).toSet.mkString(",")
-  }
-
   private def validateOrderColumns(orderColumns: String, metaClient: HoodieTableMetaClient): Unit = {
     if (orderColumns == null) {
       throw new HoodieClusteringException("Order columns is null")
@@ -173,7 +151,6 @@ class RunClusteringProcedure extends BaseProcedure
       }
     })
   }
-
 }
 
 object RunClusteringProcedure {
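For reference, after this refactor the pruned partitions reach the clustering planner through two writer configs; a sketch with illustrative values only:

val conf = Map(
  HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
  HoodieClusteringConfig.PARTITION_SELECTED.key() -> "dt=2023-01-01,dt=2023-01-02" // illustrative
)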

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala

Lines changed: 1 addition & 0 deletions
@@ -119,6 +119,7 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
         }
       case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
     }
+    client.close()
 
     val compactionInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
       .filter(instant => willCompactionInstants.contains(instant.getTimestamp))
