
Commit 397c1d3

boneanxs authored and yuzhaojing committed

[HUDI-4363] Support Clustering row writer to improve performance (apache#6046)

1 parent 0d74c4d commit 397c1d3

File tree: 28 files changed, +671 −170 lines changed


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java

Lines changed: 17 additions & 0 deletions
@@ -149,10 +149,27 @@ public void setSuccessRecordKeys(List<String> successRecordKeys) {
     this.successRecordKeys = successRecordKeys;
   }
 
+  public double getFailureFraction() {
+    return failureFraction;
+  }
+
+  public boolean isTrackSuccessRecords() {
+    return trackSuccessRecords;
+  }
+
   @Override
   public String toString() {
     return "PartitionPath " + partitionPath + ", FileID " + fileId + ", Success records "
         + totalRecords + ", errored Rows " + totalErrorRecords
         + ", global error " + (globalError != null);
   }
+
+  public WriteStatus toWriteStatus() {
+    WriteStatus status = new WriteStatus(trackSuccessRecords, failureFraction);
+    status.setFileId(fileId);
+    status.setTotalRecords(totalRecords);
+    status.setPartitionPath(partitionPath);
+    status.setStat(stat);
+    return status;
+  }
 }
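Note: the new toWriteStatus() bridge lets results collected by the row-writer path (as HoodieInternalWriteStatus) flow into APIs that expect the standard WriteStatus. A minimal usage sketch, assuming a caller holds a list of internal statuses (the helper method and variable names here are hypothetical, not part of this commit):

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;

// Hypothetical glue code: convert the row writer's internal statuses into
// the standard WriteStatus objects that downstream commit handling expects.
static List<WriteStatus> toWriteStatuses(List<HoodieInternalWriteStatus> internalStatuses) {
  return internalStatuses.stream()
      .map(HoodieInternalWriteStatus::toWriteStatus)
      .collect(Collectors.toList());
}
```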

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 3 additions & 3 deletions
@@ -325,9 +325,9 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics),"
           + "running on each writer's driver process, accepting requests during the write from executors.");
 
-  public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
+  public static final ConfigProperty<Boolean> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
       .key("hoodie.embed.timeline.server.reuse.enabled")
-      .defaultValue("false")
+      .defaultValue(false)
       .withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)"
           + "to avoid startup costs. This should rarely be changed.");
 
@@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() {
   }
 
   public boolean isEmbeddedTimelineServerReuseEnabled() {
-    return Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED));
+    return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
   }
 
   public int getEmbeddedTimelineServerPort() {
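With the property now typed as ConfigProperty<Boolean>, the getter reads it via getBoolean instead of string parsing. A hedged sketch of setting and reading the flag (the base path is illustrative; overriding a typed property with a String via setValue mirrors how this diff itself sets PARQUET_MAX_FILE_SIZE):

```java
import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative only: build a minimal write config and flip the reuse flag.
static boolean timelineServerReuse() {
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
      .withPath("/tmp/hudi_table") // hypothetical base path
      .build();
  config.setValue(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED, "true");
  // Returns the typed Boolean default (false) unless overridden as above.
  return config.isEmbeddedTimelineServerReuseEnabled();
}
```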

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java

Lines changed: 160 additions & 28 deletions
Large diffs are not rendered by default.
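Although this diff is collapsed, the @Override methods in the subclasses below imply that MultipleSparkJobExecutionStrategy gained a row-based counterpart to performClusteringWithRecordsRDD, along with getRowPartitioner/getRDDPartitioner helpers. A sketch of the shape of that entry point, reconstructed only from the signatures visible in this commit (the abstract declaration itself is an assumption about this unrendered file):

```java
// Assumed declaration in MultipleSparkJobExecutionStrategy, inferred from
// the @Override methods in the subclasses shown later in this commit.
public abstract HoodieData<WriteStatus> performClusteringWithRecordsAsRow(
    Dataset<Row> inputRecords,
    int numOutputGroups,
    String instantTime,
    Map<String, String> strategyParams,
    Schema schema,
    List<HoodieFileGroupId> fileGroupIdList,
    boolean shouldPreserveHoodieMetadata,
    Map<String, String> extraMetadata);
```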

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java

Lines changed: 36 additions & 7 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -35,6 +36,8 @@
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
 import java.util.List;
 import java.util.Map;
@@ -54,14 +57,40 @@ public SparkSingleFileSortExecutionStrategy(HoodieTable table,
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
+                                                                   int numOutputGroups,
+                                                                   String instantTime,
+                                                                   Map<String, String> strategyParams,
+                                                                   Schema schema,
+                                                                   List<HoodieFileGroupId> fileGroupIdList,
+                                                                   boolean shouldPreserveHoodieMetadata,
+                                                                   Map<String, String> extraMetadata) {
+    if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
+      throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
+    }
+    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
+
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withBulkInsertParallelism(numOutputGroups)
+        .withProps(getWriteConfig().getProps()).build();
+
+    // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
+
+    return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
+        getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
+  }
+
   @Override
   public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
-                                                                 int numOutputGroups,
-                                                                 String instantTime,
-                                                                 Map<String, String> strategyParams,
-                                                                 Schema schema,
-                                                                 List<HoodieFileGroupId> fileGroupIdList,
-                                                                 boolean preserveHoodieMetadata) {
+                                                                 int numOutputGroups,
+                                                                 String instantTime,
+                                                                 Map<String, String> strategyParams,
+                                                                 Schema schema,
+                                                                 List<HoodieFileGroupId> fileGroupIdList,
+                                                                 boolean shouldPreserveHoodieMetadata,
+                                                                 Map<String, String> extraMetadata) {
     if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
       throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
     }
@@ -74,6 +103,6 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
 
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
-        false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
+        false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
   }
 }

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java

Lines changed: 32 additions & 5 deletions
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -33,6 +34,8 @@
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 
 import java.util.List;
 import java.util.Map;
@@ -53,16 +56,40 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table,
   }
 
   @Override
-  public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords, final int numOutputGroups,
-                                                                 final String instantTime, final Map<String, String> strategyParams, final Schema schema,
-                                                                 final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
+  public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
+                                                                   int numOutputGroups,
+                                                                   String instantTime, Map<String, String> strategyParams,
+                                                                   Schema schema,
+                                                                   List<HoodieFileGroupId> fileGroupIdList,
+                                                                   boolean shouldPreserveHoodieMetadata,
+                                                                   Map<String, String> extraMetadata) {
+    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withBulkInsertParallelism(numOutputGroups)
+        .withProps(getWriteConfig().getProps()).build();
+
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+
+    return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
+        getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords,
+                                                                 final int numOutputGroups,
+                                                                 final String instantTime,
+                                                                 final Map<String, String> strategyParams,
+                                                                 final Schema schema,
+                                                                 final List<HoodieFileGroupId> fileGroupIdList,
+                                                                 final boolean shouldPreserveHoodieMetadata,
+                                                                 final Map<String, String> extraMetadata) {
     LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
 
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
         .withBulkInsertParallelism(numOutputGroups)
         .withProps(getWriteConfig().getProps()).build();
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
-    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
-        .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
+    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
+        newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
   }
 }
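The two overrides give this strategy parallel entry points: the Dataset<Row> path writes rows directly, skipping the per-record Avro conversion of the RDD path, which is the performance win this commit targets. A hedged sketch of invoking the row path, assuming the strategy instance and its inputs come from an existing clustering plan (this is not the actual call site in the commit; empty maps stand in for strategy params and extra metadata):

```java
import java.util.Collections;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hedged usage sketch: all inputs are assumed to be produced while executing
// a clustering plan for the given instant.
static HoodieData<WriteStatus> clusterAsRows(SparkSortAndSizeExecutionStrategy<?> strategy,
                                             Dataset<Row> rows,
                                             String instantTime,
                                             Schema schema,
                                             List<HoodieFileGroupId> fileGroupIds,
                                             int numOutputGroups) {
  return strategy.performClusteringWithRecordsAsRow(rows, numOutputGroups, instantTime,
      Collections.emptyMap(), schema, fileGroupIds, /* shouldPreserveHoodieMetadata */ false,
      Collections.emptyMap());
}
```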

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java

Lines changed: 2 additions & 35 deletions
@@ -29,41 +29,31 @@
 import org.apache.hudi.common.model.RewriteAvroPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.sort.SpaceCurveSortingHelper;
-import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
  * support z-curve optimization, hilbert will come soon.
  * @param <T> HoodieRecordPayload type
  */
 public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
-    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+    extends SpatialCurveSortPartitionerBase<JavaRDD<HoodieRecord<T>>> {
 
   private final transient HoodieSparkEngineContext sparkEngineContext;
-  private final String[] orderByColumns;
   private final SerializableSchema schema;
-  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
-  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
 
   public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
                                         String[] orderByColumns,
                                         HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
                                         HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
                                         Schema schema) {
+    super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
     this.sparkEngineContext = sparkEngineContext;
-    this.orderByColumns = orderByColumns;
-    this.layoutOptStrategy = layoutOptStrategy;
-    this.curveCompositionStrategyType = curveCompositionStrategyType;
     this.schema = new SerializableSchema(schema);
   }
 
@@ -91,27 +81,4 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
       return hoodieRecord;
     });
   }
-
-  private Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
-    if (orderByColumns.length == 0) {
-      // No-op
-      return dataset;
-    }
-
-    List<String> orderedCols = Arrays.asList(orderByColumns);
-
-    switch (curveCompositionStrategyType) {
-      case DIRECT:
-        return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
-      case SAMPLE:
-        return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
-      default:
-        throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
-    }
-  }
-
-  @Override
-  public boolean arePartitionRecordsSorted() {
-    return true;
-  }
 }
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java

Lines changed: 42 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class RowSpatialCurveSortPartitioner extends SpatialCurveSortPartitionerBase<Dataset<Row>> {

  public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) {
    super(config.getClusteringSortColumns(), config.getLayoutOptimizationStrategy(), config.getLayoutOptimizationCurveBuildMethod());
  }

  public RowSpatialCurveSortPartitioner(String[] orderByColumns,
                                        HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
                                        HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) {
    super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
  }

  @Override
  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
    return reorder(records, outputPartitions);
  }
}
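RowSpatialCurveSortPartitioner is the Dataset<Row> analogue of RDDSpatialCurveSortPartitioner above: repartitionRecords simply delegates to the shared reorder logic in the base class shown next. A hedged sketch of standalone use (the column names and partition count are illustrative):

```java
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hedged sketch: Z-order a DataFrame by two columns before writing.
static Dataset<Row> zOrder(Dataset<Row> df) {
  RowSpatialCurveSortPartitioner partitioner = new RowSpatialCurveSortPartitioner(
      new String[] {"city", "ts"}, // hypothetical sort columns
      HoodieClusteringConfig.LayoutOptimizationStrategy.ZORDER,
      HoodieClusteringConfig.SpatialCurveCompositionStrategyType.DIRECT);
  return partitioner.repartitionRecords(df, 4); // illustrative partition count
}
```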
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java

Lines changed: 83 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;
import java.util.List;

public abstract class SpatialCurveSortPartitionerBase<T> implements BulkInsertPartitioner<T> {

  private final String[] orderByColumns;
  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;

  public SpatialCurveSortPartitionerBase(String orderByColumns,
                                         HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
                                         HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) {
    if (orderByColumns != null) {
      this.orderByColumns = Arrays.stream(orderByColumns.split(","))
          .map(String::trim).toArray(String[]::new);
    } else {
      throw new IllegalArgumentException("The config "
          + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() + " must be provided");
    }
    this.layoutOptStrategy = layoutOptStrategy;
    this.curveCompositionStrategyType = curveCompositionStrategyType;
  }

  public SpatialCurveSortPartitionerBase(String[] orderByColumns,
                                         HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
                                         HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) {
    this.orderByColumns = orderByColumns;
    this.layoutOptStrategy = layoutOptStrategy;
    this.curveCompositionStrategyType = curveCompositionStrategyType;
  }

  /**
   * Mapping specified multi need-to-order columns to one dimension while preserving data locality.
   */
  protected Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
    if (orderByColumns.length == 0) {
      // No-op
      return dataset;
    }

    List<String> orderedCols = Arrays.asList(orderByColumns);

    switch (curveCompositionStrategyType) {
      case DIRECT:
        return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
      case SAMPLE:
        return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
      default:
        throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
    }
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return true;
  }
}
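The String-based constructor parses the raw hoodie.clustering.plan.strategy.sort.columns value, trimming each comma-separated entry; the array constructor takes the columns directly. A hedged sketch of the config-driven route (the builder calls are assumed to follow the standard HoodieWriteConfig/HoodieClusteringConfig API; the base path is hypothetical):

```java
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;

// Hedged sketch: "city, ts" is split and trimmed to {"city", "ts"} by the
// String-based base-class constructor.
static RowSpatialCurveSortPartitioner fromConfig() {
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
      .withPath("/tmp/hudi_table") // hypothetical base path
      .withClusteringConfig(HoodieClusteringConfig.newBuilder()
          .withClusteringSortColumns("city, ts")
          .build())
      .build();
  return new RowSpatialCurveSortPartitioner(config);
}
```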
