Skip to content

Commit 7101ece

Browse files
committed
delete deprecated clustering plan strategy and add clustering ITTest
1 parent 2ccf74d commit 7101ece

File tree

11 files changed

+115
-155
lines changed

11 files changed

+115
-155
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
7070
String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
7171
String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
7272
String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
73-
String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
74-
String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
75-
String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
7673
String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
7774
String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
7875

@@ -85,14 +82,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
8582
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
8683
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
8784
return sparkSizeBasedClassName;
88-
} else if (flinkRecentDaysClassName.equals(className)) {
89-
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
90-
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
91-
return flinkSizeBasedClassName;
92-
} else if (flinkSelectedPartitionsClassName.equals(className)) {
93-
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
94-
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
95-
return flinkSizeBasedClassName;
9685
} else if (javaSelectedPartitionClassName.equals(className)) {
9786
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
9887
LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
@@ -173,7 +162,7 @@ protected Map<String, Double> buildMetrics(List<FileSlice> fileSlices) {
173162
return metrics;
174163
}
175164

176-
protected HoodieTable<T,I,K, O> getHoodieTable() {
165+
protected HoodieTable<T, I, K, O> getHoodieTable() {
177166
return this.hoodieTable;
178167
}
179168

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.hudi.configuration;
2020

21-
import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
21+
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
2222
import org.apache.hudi.common.config.ConfigClassProperty;
2323
import org.apache.hudi.common.config.ConfigGroups;
2424
import org.apache.hudi.common.config.HoodieConfig;
@@ -45,6 +45,11 @@
4545
import java.util.Map;
4646
import java.util.Set;
4747

48+
import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
49+
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
50+
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION;
51+
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST;
52+
4853
/**
4954
* Hoodie Flink config options.
5055
*
@@ -621,11 +626,22 @@ private FlinkOptions() {
621626
public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
622627
.key("clustering.plan.strategy.class")
623628
.stringType()
624-
.defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
629+
.defaultValue(FlinkSizeBasedClusteringPlanStrategy.class.getName())
625630
.withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
626631
+ "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
627632
+ CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
628633

634+
public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
635+
.key("clustering.plan.partition.filter.mode")
636+
.stringType()
637+
.defaultValue("NONE")
638+
.withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
639+
+ "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
640+
+ "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
641+
+ PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "."
642+
+ "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
643+
+ PARTITION_FILTER_END_PARTITION.key() + "'].");
644+
629645
public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
630646
.key("clustering.plan.strategy.target.file.max.bytes")
631647
.intType()

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
115115
*/
116116
private final boolean asyncClustering;
117117

118+
/**
119+
* Whether the clustering sort is enabled.
120+
*/
121+
private final boolean sortClusteringEnabled;
122+
118123
/**
119124
* Executor service to execute the clustering task.
120125
*/
@@ -124,6 +129,7 @@ public ClusteringOperator(Configuration conf, RowType rowType) {
124129
this.conf = conf;
125130
this.rowType = rowType;
126131
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
132+
this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
127133
}
128134

129135
@Override
@@ -142,7 +148,7 @@ public void open() throws Exception {
142148
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
143149
this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
144150

145-
if (OptionsResolver.sortClusteringEnabled(conf)) {
151+
if (this.sortClusteringEnabled) {
146152
initSorter();
147153
}
148154

@@ -208,7 +214,7 @@ private void doClustering(String instantTime, ClusteringPlanEvent event) throws
208214

209215
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
210216

211-
if (!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS))) {
217+
if (this.sortClusteringEnabled) {
212218
while (iterator.hasNext()) {
213219
RowData rowData = iterator.next();
214220
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,9 @@ private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
102102

103103
// generate clustering plan
104104
// should support configurable commit metadata
105+
HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
105106
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
106-
table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime));
107+
table.getMetaClient(), clusteringInstant);
107108

108109
if (!clusteringPlanOption.isPresent()) {
109110
// do nothing.
@@ -118,13 +119,12 @@ private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
118119
// do nothing.
119120
LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
120121
} else {
121-
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
122122
// Mark instant as clustering inflight
123-
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
123+
table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
124124
table.getMetaClient().reloadActiveTimeline();
125125

126126
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
127-
LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files");
127+
LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
128128
output.collect(new StreamRecord<>(
129129
new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())
130130
));

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void open(Configuration parameters) throws Exception {
7373
@Override
7474
public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
7575
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
76-
LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files");
76+
LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
7777
sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
7878
}
7979
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.sink.clustering;
2020

21+
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
2122
import org.apache.hudi.common.table.HoodieTableConfig;
2223
import org.apache.hudi.configuration.FlinkOptions;
2324
import org.apache.hudi.util.StreamerUtil;
@@ -78,7 +79,10 @@ public class FlinkClusteringConfig extends Configuration {
7879
public Boolean cleanAsyncEnable = false;
7980

8081
@Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
81-
public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
82+
public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
83+
84+
@Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan", required = false)
85+
public String planPartitionFilterMode = "NONE";
8286

8387
@Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
8488
public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
@@ -129,6 +133,7 @@ public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
129133
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
130134
conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
131135
conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
136+
conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
132137
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
133138
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
134139
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.hudi.client.HoodieFlinkWriteClient;
2222
import org.apache.hudi.common.table.timeline.HoodieInstant;
23-
import org.apache.hudi.common.table.timeline.HoodieTimeline;
2423
import org.apache.hudi.common.util.ClusteringUtils;
2524
import org.apache.hudi.common.util.Option;
2625
import org.apache.hudi.configuration.OptionsResolver;

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.hudi.sink.transform.ChainedTransformer;
5454
import org.apache.hudi.sink.transform.Transformer;
5555
import org.apache.hudi.streamer.FlinkStreamerConfig;
56+
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
5657
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
5758

5859
import org.apache.avro.Schema;
@@ -168,6 +169,8 @@ public static HoodieWriteConfig getHoodieClientConfig(
168169
HoodieClusteringConfig.newBuilder()
169170
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
170171
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
172+
.withClusteringPlanPartitionFilterMode(
173+
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
171174
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
172175
.withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
173176
.withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))

0 commit comments

Comments
 (0)