Skip to content

Commit 98c5c6c

Browse files
author
RexAn
authored
[HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (#5502)
* Along the lines of RDDCustomColumnsSortPartitioner but for Row
1 parent 4e42ed5 commit 98c5c6c

3 files changed

Lines changed: 112 additions & 8 deletions

File tree

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.avro.Schema;
3030
import org.apache.spark.api.java.JavaRDD;
3131

32+
import java.util.Arrays;
33+
3234
/**
3335
* A partitioner that does sorting based on specified column values for each RDD partition.
3436
*
@@ -78,6 +80,7 @@ public boolean arePartitionRecordsSorted() {
7880
}
7981

8082
private String[] getSortColumnName(HoodieWriteConfig config) {
81-
return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
83+
return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
84+
.map(String::trim).toArray(String[]::new);
8285
}
8386
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;

/**
 * A partitioner that does sorting based on specified column values for each spark partitions.
 *
 * <p>Rows are first coalesced into the requested number of Spark partitions, then sorted
 * within each partition by the Hudi partition-path metadata field followed by the
 * user-specified sort columns. Row-based analogue of {@code RDDCustomColumnsSortPartitioner}.
 */
public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {

  private final String[] sortColumnNames;

  /**
   * Creates a partitioner whose sort columns are read from the write config
   * (comma-separated user-defined bulk-insert sort columns).
   *
   * @param config write config carrying the sort-column setting
   */
  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
    this.sortColumnNames = getSortColumnName(config);
  }

  /**
   * Creates a partitioner that sorts by the given columns.
   *
   * @param columnNames column names to sort by, in precedence order
   */
  public RowCustomColumnsSortPartitioner(String[] columnNames) {
    // Defensive copy: storing the caller's array by reference would let later
    // mutation of that array silently change this partitioner's sort order.
    this.sortColumnNames = Arrays.copyOf(columnNames, columnNames.length);
  }

  /**
   * Coalesces the dataset to {@code outputSparkPartitions} partitions and sorts rows
   * within each partition by partition path, then by the configured sort columns.
   *
   * @param records               input rows
   * @param outputSparkPartitions target number of Spark partitions
   * @return the repartitioned, per-partition-sorted dataset
   */
  @Override
  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
    final String[] sortColumns = this.sortColumnNames;
    return records.coalesce(outputSparkPartitions)
        .sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns);
  }

  /** Rows within each partition are sorted by {@link #repartitionRecords}, so always true. */
  @Override
  public boolean arePartitionRecordsSorted() {
    return true;
  }

  /**
   * Splits the comma-separated sort-column config value into column names,
   * trimming surrounding whitespace from each entry.
   */
  private String[] getSortColumnName(HoodieWriteConfig config) {
    return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
        .map(String::trim).toArray(String[]::new);
  }
}

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.apache.hudi.common.model.HoodieRecord;
2222
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
23+
import org.apache.hudi.common.util.Option;
24+
import org.apache.hudi.config.HoodieWriteConfig;
2325
import org.apache.hudi.table.BulkInsertPartitioner;
2426
import org.apache.hudi.testutils.HoodieClientTestHarness;
2527
import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -29,6 +31,7 @@
2931
import org.apache.spark.sql.Row;
3032
import org.junit.jupiter.api.AfterEach;
3133
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
3235
import org.junit.jupiter.params.ParameterizedTest;
3336
import org.junit.jupiter.params.provider.Arguments;
3437
import org.junit.jupiter.params.provider.MethodSource;
@@ -48,6 +51,8 @@
4851
*/
4952
public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness {
5053

54+
private static final Comparator<Row> KEY_COMPARATOR =
55+
Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
5156
@BeforeEach
5257
public void setUp() throws Exception {
5358
initSparkContexts("TestBulkInsertInternalPartitionerForRows");
@@ -77,29 +82,55 @@ public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
7782
Dataset<Row> records1 = generateTestRecords();
7883
Dataset<Row> records2 = generateTestRecords();
7984
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
80-
records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1));
85+
records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty());
8186
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
82-
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
87+
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty());
88+
}
89+
90+
@Test
public void testCustomColumnSortPartitionerWithRows() {
  Dataset<Row> records1 = generateTestRecords();
  Dataset<Row> records2 = generateTestRecords();
  // Use the 6th generated column as the single custom sort column; split(",") keeps
  // the same String[] shape the partitioner expects even for one column.
  String sortColumnString = records1.columns()[5];
  String[] sortColumns = sortColumnString.split(",");
  Comparator<Row> comparator = getCustomColumnComparator(sortColumns);

  // Case 1: partitioner constructed directly from the column-name array.
  // Not globally sorted (false), but sorted within each partition (true).
  testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
      records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
  testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
      records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));

  // Case 2: partitioner constructed from a write config carrying the same
  // comma-separated sort-column setting; behavior must match case 1.
  HoodieWriteConfig config = HoodieWriteConfig
      .newBuilder()
      .withPath("/")
      .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
      .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
      .build();
  testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
      records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
  testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
      records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
}
84114

85115
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
86116
Dataset<Row> rows,
87117
boolean isGloballySorted, boolean isLocallySorted,
88-
Map<String, Long> expectedPartitionNumRecords) {
118+
Map<String, Long> expectedPartitionNumRecords,
119+
Option<Comparator<Row>> comparator) {
89120
int numPartitions = 2;
90121
Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
91122
List<Row> collectedActualRecords = actualRecords.collectAsList();
92123
if (isGloballySorted) {
93124
// Verify global order
94-
verifyRowsAscendingOrder(collectedActualRecords);
125+
verifyRowsAscendingOrder(collectedActualRecords, comparator);
95126
} else if (isLocallySorted) {
96127
// Verify local order
97128
actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> {
98129
List<Row> partitionRows = new ArrayList<>();
99130
while (input.hasNext()) {
100131
partitionRows.add(input.next());
101132
}
102-
verifyRowsAscendingOrder(partitionRows);
133+
verifyRowsAscendingOrder(partitionRows, comparator);
103134
return Collections.emptyList().iterator();
104135
}, SparkDatasetTestUtils.ENCODER);
105136
}
@@ -130,10 +161,20 @@ public Dataset<Row> generateTestRecords() {
130161
return rowsPart1.union(rowsPart2);
131162
}
132163

133-
private void verifyRowsAscendingOrder(List<Row> records) {
164+
private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
134165
List<Row> expectedRecords = new ArrayList<>(records);
135-
Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
166+
Collections.sort(expectedRecords,comparator.orElse(KEY_COMPARATOR));
136167
assertEquals(expectedRecords, records);
137168
}
138169

170+
private Comparator<Row> getCustomColumnComparator(String[] sortColumns) {
171+
Comparator<Row> comparator = Comparator.comparing(row -> {
172+
StringBuilder sb = new StringBuilder();
173+
for (String col : sortColumns) {
174+
sb.append(row.getAs(col).toString());
175+
}
176+
return sb.toString();
177+
});
178+
return comparator;
179+
}
139180
}

0 commit comments

Comments (0)