
Commit 8ce4693

delete deprecated clustering plan strategy and add clustering ITTest

1 parent 2ccf74d commit 8ce4693

File tree

8 files changed: +102, -148 lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java

Lines changed: 1 addition & 12 deletions
@@ -70,9 +70,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
     String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
     String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
     String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
-    String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
-    String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
-    String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
     String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
     String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;

@@ -85,14 +82,6 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
       LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
       return sparkSizeBasedClassName;
-    } else if (flinkRecentDaysClassName.equals(className)) {
-      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
-      LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
-      return flinkSizeBasedClassName;
-    } else if (flinkSelectedPartitionsClassName.equals(className)) {
-      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
-      LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
-      return flinkSizeBasedClassName;
     } else if (javaSelectedPartitionClassName.equals(className)) {
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
       LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));

@@ -173,7 +162,7 @@ protected Map<String, Double> buildMetrics(List<FileSlice> fileSlices) {
     return metrics;
   }

-  protected HoodieTable<T,I,K, O> getHoodieTable() {
+  protected HoodieTable<T, I, K, O> getHoodieTable() {
     return this.hoodieTable;
   }
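
For reference, the resolution that survives this commit can be condensed into a short sketch: a deprecated plan-strategy class name is rewritten as the size-based strategy plus a partition filter mode, which is the same translation the deleted Flink branches used to perform. This is a simplified illustration assuming only the class and config names visible in the diff above, not the exact method body:

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

final class StrategyResolutionSketch {
  // Condensed shape of checkAndGetClusteringPlanStrategy once the Flink branches are gone.
  static String resolve(String className, HoodieWriteConfig config) {
    String sparkRecentDays =
        "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
    if (sparkRecentDays.equals(className)) {
      // The deprecated strategy becomes: size-based plan + RECENT_DAYS partition filter.
      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME,
          ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
      return HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
    }
    return className; // non-deprecated strategies pass through unchanged
  }
}

The deleted FlinkRecentDaysClusteringPlanStrategy and FlinkSelectedPartitionsClusteringPlanStrategy are thus expressible as FlinkSizeBasedClusteringPlanStrategy plus the RECENT_DAYS or SELECTED_PARTITIONS filter mode, which is what the rest of this commit wires up.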

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkRecentDaysClusteringPlanStrategy.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSelectedPartitionsClusteringPlanStrategy.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 18 additions & 2 deletions
@@ -18,7 +18,7 @@

 package org.apache.hudi.configuration;

-import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
+import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
@@ -45,6 +45,11 @@
 import java.util.Map;
 import java.util.Set;

+import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
+import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
+import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION;
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST;
+
 /**
  * Hoodie Flink config options.
  *
@@ -621,11 +626,22 @@ private FlinkOptions() {
   public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
       .key("clustering.plan.strategy.class")
       .stringType()
-      .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
+      .defaultValue(FlinkSizeBasedClusteringPlanStrategy.class.getName())
       .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
          + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
          + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");

+  public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
+      .key("clustering.plan.partition.filter.mode")
+      .stringType()
+      .defaultValue("NONE")
+      .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
+          + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
+          + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+          + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "."
+          + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+          + PARTITION_FILTER_END_PARTITION.key() + "'].");
+
   public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
       .key("clustering.plan.strategy.target.file.max.bytes")
       .intType()
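
A usage sketch for the option added above: with this commit, choosing which partitions participate in clustering is a matter of setting a filter mode next to the (now size-based) strategy class. The option and class names are the ones introduced here; the surrounding setup is illustrative only.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.configuration.FlinkOptions;

final class ClusteringOptionsExample {
  static Configuration clusteringConf() {
    Configuration conf = new Configuration();
    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
    // The default after this commit; set explicitly here for clarity.
    conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
        FlinkSizeBasedClusteringPlanStrategy.class.getName());
    // New in this commit: partition filtering is a mode, not a dedicated strategy class.
    conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, "RECENT_DAYS");
    return conf;
  }
}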

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/FlinkClusteringConfig.java

Lines changed: 6 additions & 1 deletion
@@ -18,6 +18,7 @@

 package org.apache.hudi.sink.clustering;

+import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.util.StreamerUtil;
@@ -78,7 +79,10 @@ public class FlinkClusteringConfig extends Configuration {
   public Boolean cleanAsyncEnable = false;

   @Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
-  public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
+  public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
+
+  @Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan", required = false)
+  public String planPartitionFilterMode = "NONE";

   @Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
   public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
@@ -129,6 +133,7 @@ public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
     conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
     conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
+    conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
     conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
     conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
     conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
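
To show how the new flag travels, here is a minimal sketch of the conversion path: a FlinkClusteringConfig populated the way --plan-partition-filter-mode would populate it, pushed through toFlinkConfig. Field and option names come from the diff; the base path value is hypothetical and the remaining fields keep their defaults.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;

final class ClusteringCliSketch {
  public static void main(String[] args) {
    FlinkClusteringConfig config = new FlinkClusteringConfig();
    config.path = "/tmp/hudi_table";                // hypothetical; set via --path in the real tool
    config.planPartitionFilterMode = "RECENT_DAYS"; // as if passed via --plan-partition-filter-mode
    Configuration conf = FlinkClusteringConfig.toFlinkConfig(config);
    // The mode is now readable through FlinkOptions, alongside the strategy class.
    System.out.println(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME));
  }
}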

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@

 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 3 additions & 0 deletions
@@ -53,6 +53,7 @@
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;

 import org.apache.avro.Schema;
@@ -168,6 +169,8 @@ public static HoodieWriteConfig getHoodieClientConfig(
         HoodieClusteringConfig.newBuilder()
             .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
             .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
+            .withClusteringPlanPartitionFilterMode(
+                ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
             .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
             .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
             .withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
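
An equivalent standalone form of the wiring above, for readers who assemble HoodieClusteringConfig by hand: the builder methods are the ones the diff invokes, while the chosen values are illustrative.

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

final class ClusteringConfigSketch {
  static HoodieClusteringConfig build() {
    return HoodieClusteringConfig.newBuilder()
        .withClusteringPlanStrategyClass(
            "org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy")
        // The filter mode now carries what the deleted strategy classes used to encode.
        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS)
        .build();
  }
}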

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java

Lines changed: 74 additions & 0 deletions
@@ -152,6 +152,17 @@ public void testWriteMergeOnReadWithCompaction(String indexType) throws Exceptio
     testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED);
   }

+  @Test
+  public void testWriteMergeOnReadWithClustering() throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
+    conf.setString(FlinkOptions.OPERATION, "insert");
+    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+
+    testWriteToHoodieWithCluster(conf, "cow_write_with_clustering", 1, EXPECTED);
+  }
+
   private void testWriteToHoodie(
       Transformer transformer,
       String jobName,
@@ -250,6 +261,69 @@ private void testWriteToHoodie(
     }

     TestData.checkWrittenFullData(tempFile, expected);
+  }
+
+  private void testWriteToHoodieWithCluster(
+      Configuration conf,
+      String jobName,
+      int checkpoints,
+      Map<String, List<String>> expected) throws Exception {
+
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();

+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
+
+    DataStream<RowData> dataStream;
+    if (isMor) {
+      TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+      format.setFilesFilter(FilePathFilter.createDefaultFilter());
+      TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+      format.setCharsetName("UTF-8");
+
+      dataStream = execEnv
+          // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+          .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+          .setParallelism(1);
+    } else {
+      dataStream = execEnv
+          // use continuous file source to trigger checkpoint
+          .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+          .name("continuous_file_source")
+          .setParallelism(1)
+          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+          .setParallelism(4);
+    }
+
+    DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
+    execEnv.addOperator(pipeline.getTransformation());
+
+    Pipelines.cluster(conf, rowType, pipeline);
+    JobClient client = execEnv.executeAsync(jobName);
+
+    // wait for the streaming job to finish
+    client.getJobExecutionResult().get();
+
+    TestData.checkWrittenFullData(tempFile, expected);
   }
 }
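
A hedged aside on verification: beyond checkWrittenFullData, the clustering result could also be confirmed by looking for a completed replacecommit on the table timeline, since that is the instant type clustering produces. The following sketch assumes the standard Hudi meta-client API; the helper and its use here are hypothetical and not part of the test above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;

final class ClusteringTimelineCheck {
  static boolean hasCompletedClustering(String basePath) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())  // Hadoop conf for the local filesystem
        .setBasePath(basePath)         // e.g. tempFile.getAbsolutePath() in the test
        .build();
    // Clustering commits land on the timeline as "replacecommit" instants.
    return !metaClient.getActiveTimeline().getCompletedReplaceTimeline().empty();
  }
}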
