Skip to content

Commit c733079

Browse files
authored
[HUDI-4924] Auto-tune dedup parallelism (#6802)
1 parent 9966b2c commit c733079

7 files changed

Lines changed: 77 additions & 6 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
5656
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
5757
boolean isIndexingGlobal = index.isGlobal();
5858
final SerializableSchema schema = new SerializableSchema(schemaStr);
59+
// Auto-tunes the parallelism for reduce transformation based on the number of data partitions
60+
// in engine-specific representation
61+
int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
5962
return records.mapToPair(record -> {
6063
HoodieKey hoodieKey = record.getKey();
6164
// If index used is global, then records are expected to differ in their partitionPath
@@ -67,7 +70,7 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
6770
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
6871

6972
return new HoodieAvroRecord<>(reducedKey, reducedData);
70-
}, parallelism).map(Pair::getRight);
73+
}, reduceParallelism).map(Pair::getRight);
7174
}
7275

7376
}

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public long count() {
102102
return rddData.count();
103103
}
104104

105+
@Override
106+
public int getNumPartitions() {
107+
return rddData.getNumPartitions();
108+
}
109+
105110
@Override
106111
public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
107112
return HoodieJavaRDD.of(rddData.map(func::apply));

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,22 +461,27 @@ private void testDeduplication(
461461
HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
462462
jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
463463
HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
464-
.combineInput(true, true);
464+
.combineInput(true, true);
465465
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
466466
HoodieWriteConfig writeConfig = configBuilder.build();
467467

468468
// Global dedup should be done based on recordKey only
469469
HoodieIndex index = mock(HoodieIndex.class);
470470
when(index.isGlobal()).thenReturn(true);
471-
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
471+
int dedupParallelism = records.getNumPartitions() + 100;
472+
HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema());
473+
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
474+
assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
472475
assertEquals(1, dedupedRecs.size());
473476
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
474477
assertNodupesWithinPartition(dedupedRecs);
475478

476479
// non-Global dedup should be done based on both recordKey and partitionPath
477480
index = mock(HoodieIndex.class);
478481
when(index.isGlobal()).thenReturn(false);
479-
dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList();
482+
dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema());
483+
dedupedRecs = dedupedRecsRdd.collectAsList();
484+
assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
480485
assertEquals(2, dedupedRecs.size());
481486
assertNodupesWithinPartition(dedupedRecs);
482487

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.data;
21+
22+
import org.apache.hudi.common.data.HoodieData;
23+
import org.apache.hudi.testutils.HoodieClientTestBase;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.stream.Collectors;
28+
import java.util.stream.IntStream;
29+
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
32+
public class TestHoodieJavaRDD extends HoodieClientTestBase {
33+
@Test
34+
public void testGetNumPartitions() {
35+
int numPartitions = 6;
36+
HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
37+
IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
38+
assertEquals(numPartitions, rddData.getNumPartitions());
39+
}
40+
}

hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,19 @@ public interface HoodieData<T> extends Serializable {
6767

6868
/**
6969
* Returns number of objects held in the collection
70-
*
70+
* <p>
7171
* NOTE: This is a terminal operation
7272
*/
7373
long count();
7474

75+
/**
76+
* @return the number of data partitions in the engine-specific representation.
77+
*/
78+
int getNumPartitions();
79+
7580
/**
7681
* Maps every element in the collection using provided mapping {@code func}.
77-
*
82+
* <p>
7883
* This is an intermediate operation
7984
*
8085
* @param func serializable map function

hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ public long count() {
175175
return super.count();
176176
}
177177

178+
@Override
179+
public int getNumPartitions() {
180+
return 1;
181+
}
182+
178183
@Override
179184
public List<T> collectAsList() {
180185
return super.collectAsList();

hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Arrays;
3030
import java.util.List;
3131
import java.util.stream.Collectors;
32+
import java.util.stream.IntStream;
3233
import java.util.stream.Stream;
3334

3435
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -64,4 +65,11 @@ void testEagerSemantic() {
6465
assertEquals(3, originalListData.count());
6566
assertEquals(sourceList, originalListData.collectAsList());
6667
}
68+
69+
@Test
70+
public void testGetNumPartitions() {
71+
HoodieData<Integer> listData = HoodieListData.eager(
72+
IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()));
73+
assertEquals(1, listData.getNumPartitions());
74+
}
6775
}

0 commit comments

Comments
 (0)