
Commit 8977daf

TJX2014 authored and xiaoxingstack committed
[HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in lo… (apache#6630)
* [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in log file issue

Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>
1 parent 52c5588 commit 8977daf

5 files changed

Lines changed: 62 additions & 26 deletions
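In short: HoodieSimpleBucketIndex builds its per-partition bucket-to-file-group mapping by listing the partition's files, and before this change it listed only the latest base files. A file group whose data so far lives only in log files (for example a Merge-on-Read file group that has been appended to but not yet compacted, which is exactly what the new log-append test case below produces) was therefore invisible to the index, and its bucket number was not taken into account. The change switches the listing to file slices, which cover base files and log files alike. The stand-alone Java sketch below illustrates that gap only; it is not part of the commit, and the Slice class and file-id naming are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative model only (not Hudi's API): a "slice" pairs an optional base
// file with its log files; scanning base files alone misses log-only slices.
public class BucketVisibilitySketch {
  static class Slice {
    final String fileId;                  // assumed layout: bucket number as zero-padded prefix
    final Optional<String> baseFile;
    final List<String> logFiles;
    Slice(String fileId, Optional<String> baseFile, List<String> logFiles) {
      this.fileId = fileId;
      this.baseFile = baseFile;
      this.logFiles = logFiles;
    }
  }

  public static void main(String[] args) {
    List<Slice> partition = List.of(
        new Slice("00000000-a", Optional.of("00000000-a_1-0-1_001.parquet"), List.of()),
        // this file group only has a log file so far (appended, not yet compacted)
        new Slice("00000001-b", Optional.empty(), List.of(".00000001-b_001.log.1")));

    Map<String, String> viaBaseFiles = new HashMap<>();
    Map<String, String> viaFileSlices = new HashMap<>();
    for (Slice slice : partition) {
      if (slice.baseFile.isPresent()) {
        viaBaseFiles.put(slice.fileId, "001");  // only file groups with a base file show up here
      }
      viaFileSlices.put(slice.fileId, "001");   // every file group shows up here
    }
    System.out.println("base files only: " + viaBaseFiles.keySet());  // [00000000-a] -> one bucket missing
    System.out.println("file slices:     " + viaFileSlices.keySet()); // both file groups visible
  }
}
```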


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 21 additions & 0 deletions

@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -72,6 +73,26 @@ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
     return Collections.emptyList();
   }

+  /**
+   * Fetches Pair of partition path and {@link FileSlice}s for interested partitions.
+   *
+   * @param partition Partition of interest
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link FileSlice}
+   */
+  public static List<FileSlice> getLatestFileSlicesForPartition(
+      final String partition,
+      final HoodieTable hoodieTable) {
+    Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant();
+    if (latestCommitTime.isPresent()) {
+      return hoodieTable.getHoodieView()
+          .getLatestFileSlicesBeforeOrOn(partition, latestCommitTime.get().getTimestamp(), true)
+          .collect(toList());
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
    *
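The new helper mirrors getLatestBaseFilesForPartition but returns whole file slices. Its selection rule is: take the last completed instant on the commits timeline, then collect the latest file slices on or before that instant. The stand-alone sketch below models only that rule and is not part of the commit; Instant and FileSlice here are simplified stand-ins rather than the Hudi types, and the per-file-group slice resolution done by Hudi's file system view is omitted.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class LatestSlicesSketch {
  static class Instant {
    final String time; final boolean completed;
    Instant(String time, boolean completed) { this.time = time; this.completed = completed; }
  }
  static class FileSlice {
    final String fileId; final String baseInstantTime;
    FileSlice(String fileId, String baseInstantTime) { this.fileId = fileId; this.baseInstantTime = baseInstantTime; }
  }

  static List<FileSlice> latestFileSlicesForPartition(List<Instant> timeline, List<FileSlice> slices) {
    Optional<String> lastCompleted = timeline.stream()
        .filter(i -> i.completed)                // filterCompletedInstants()
        .map(i -> i.time)
        .max(String::compareTo);                 // lastInstant()
    return lastCompleted
        .map(t -> slices.stream()
            .filter(s -> s.baseInstantTime.compareTo(t) <= 0)  // "before or on" that instant
            .collect(Collectors.toList()))
        .orElse(List.of());                      // no completed commit yet -> empty list
  }

  public static void main(String[] args) {
    List<Instant> timeline = List.of(new Instant("001", true), new Instant("002", true), new Instant("003", false));
    List<FileSlice> slices = List.of(new FileSlice("00000000-a", "001"), new FileSlice("00000001-b", "002"));
    System.out.println(latestFileSlicesForPartition(timeline, slices).size()); // 2
  }
}
```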

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java

Lines changed: 5 additions & 5 deletions

@@ -25,7 +25,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -52,10 +51,11 @@ private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
     Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
     hoodieTable.getMetaClient().reloadActiveTimeline();
     HoodieIndexUtils
-        .getLatestBaseFilesForPartition(partition, hoodieTable)
-        .forEach(file -> {
-          String fileId = file.getFileId();
-          String commitTime = file.getCommitTime();
+        .getLatestFileSlicesForPartition(partition, hoodieTable)
+        .forEach(fileSlice -> {
+          String fileId = fileSlice.getFileId();
+          String commitTime = fileSlice.getBaseInstantTime();
+
           int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
           if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
             bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
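The mapping is keyed by the bucket id that BucketIdentifier.bucketIdFromFileId extracts from each file id, and the containsKey guard keeps the first location seen per bucket. The stand-alone sketch below is illustrative only and not part of the commit: it assumes, for the example, that the bucket id is the zero-padded numeric prefix of the file id, and it uses a plain String in place of HoodieRecordLocation.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class BucketMappingSketch {
  // Assumption for illustration: the bucket id is the numeric prefix in front
  // of the first '-' of the file id, e.g. "00000003-aaa" -> bucket 3.
  static int bucketIdFromFileId(String fileId) {
    return Integer.parseInt(fileId.substring(0, fileId.indexOf('-')));
  }

  public static void main(String[] args) {
    // file id -> commit time of its latest file slice (base file or log-only)
    Map<String, String> latestSlices = new LinkedHashMap<>();
    latestSlices.put("00000003-aaa", "001");
    latestSlices.put("00000005-bbb", "002"); // log-only file group, now visible via file slices

    Map<Integer, String> bucketIdToLocation = new HashMap<>();
    latestSlices.forEach((fileId, commitTime) -> {
      int bucketId = bucketIdFromFileId(fileId);
      if (!bucketIdToLocation.containsKey(bucketId)) {   // first slice wins, as in the diff above
        bucketIdToLocation.put(bucketId, commitTime + "@" + fileId);
      }
    });
    System.out.println(bucketIdToLocation); // {3=001@00000003-aaa, 5=002@00000005-bbb}
  }
}
```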

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java

Lines changed: 9 additions & 16 deletions

@@ -32,7 +32,6 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -45,9 +44,9 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -152,27 +151,21 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
     return baseFilePath;
   }

-  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
-    for (List<HoodieRecord> groupedRecords : records.stream()
-        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
-      partitionToLogfilesMap.computeIfAbsent(
-          appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
-    }
+    final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(partition, fileId, records);
+    partitionToLogfilesMap.computeIfAbsent(appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
     return partitionToLogfilesMap;
   }

-  private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
-    String partitionPath = groupedRecords.get(0).getPartitionPath();
-    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(String partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
     try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(currentInstantTime).withFs(fs).build()) {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, currentInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+      logWriter.appendBlock(new HoodieAvroDataBlock(records.stream().map(r -> {
         try {
           GenericRecord val = (GenericRecord) ((HoodieRecordPayload) r.getData()).getInsertValue(schema).get();
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
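Context for the signature change: the previous withLogAppends grouped records by HoodieRecord::getCurrentLocation, which presumes every record has already been tagged with a location, while the bucket-index test needs to append log files for records that have no location yet, so the helper now takes the partition and file id explicitly. The stand-alone sketch below (a simplified Rec class, not HoodieRecord) only illustrates why grouping on an unset location breaks down; it is not part of the commit.

```java
import java.util.List;
import java.util.stream.Collectors;

public class GroupingSketch {
  static class Rec {
    final String key; final String currentLocation;
    Rec(String key, String currentLocation) { this.key = key; this.currentLocation = currentLocation; }
  }

  public static void main(String[] args) {
    List<Rec> untagged = List.of(new Rec("a", null), new Rec("b", null));
    try {
      // Collectors.groupingBy rejects null keys, so grouping records by a
      // location that was never assigned cannot work.
      untagged.stream().collect(Collectors.groupingBy(r -> r.currentLocation));
    } catch (NullPointerException expected) {
      System.out.println("cannot group untagged records by location: " + expected.getMessage());
    }
  }
}
```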

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java

Lines changed: 16 additions & 5 deletions

@@ -42,6 +42,8 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 import java.util.Arrays;
 import java.util.Properties;
@@ -89,8 +91,9 @@ public void testBucketIndexValidityCheck() {
         .withBucketNum("8").build();
   }

-  @Test
-  public void testTagLocation() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testTagLocation(boolean isInsert) throws Exception {
     String rowKey1 = UUID.randomUUID().toString();
     String rowKey2 = UUID.randomUUID().toString();
     String rowKey3 = UUID.randomUUID().toString();
@@ -119,9 +122,17 @@
     assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown()));

     HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(table, SCHEMA);
-    testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
-    testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
-    testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+
+    if (isInsert) {
+      testTable.addCommit("001").withInserts("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withInserts("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withInserts("2016/01/31", getRecordFileId(record3), record3);
+    } else {
+      testTable.addCommit("001").withLogAppends("2016/01/31", getRecordFileId(record1), record1);
+      testTable.addCommit("002").withLogAppends("2016/01/31", getRecordFileId(record2), record2);
+      testTable.addCommit("003").withLogAppends("2016/01/31", getRecordFileId(record3), record3);
+    }
+
     taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context,
         HoodieSparkTable.create(config, context, metaClient));
     assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> r.isCurrentLocationKnown())
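For reference, the shape the test takes after the change: a single body run once per boolean value, so the same tagging assertions cover both the base-file path (withInserts) and the log-file path (withLogAppends). The minimal self-contained class below only demonstrates that JUnit 5 pattern (it needs the junit-jupiter-params artifact); the class and method names are placeholders and it is not part of the commit.

```java
import static org.junit.jupiter.api.Assertions.assertFalse;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TagLocationParamSketch {
  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void coversBothWritePaths(boolean isInsert) {
    // In the real test above, true exercises withInserts (base files) and
    // false exercises withLogAppends (log files); the assertions that follow
    // are identical for both. This stand-in only shows the shape.
    String writePath = isInsert ? "base file via withInserts" : "log file via withLogAppends";
    assertFalse(writePath.isEmpty());
  }
}
```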

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java

Lines changed: 11 additions & 0 deletions

@@ -23,6 +23,7 @@
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -36,6 +37,7 @@

 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;

 public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
@@ -116,4 +118,13 @@ public HoodieSparkWriteableTestTable withInserts(String partition, String fileId
   public Path withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
   }
+
+  public HoodieSparkWriteableTestTable withLogAppends(String partition, String fileId, HoodieRecord... records) throws Exception {
+    withLogAppends(partition, fileId, Arrays.asList(records));
+    return this;
+  }
+
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, String fileId, List<HoodieRecord> records) throws Exception {
+    return super.withLogAppends(partition, fileId, records);
+  }
 }
