Skip to content

Commit 9966b2c

Browse files
hj2016huangjing02nsivabalan
authored
[HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
Co-authored-by: huangjing02 <huangjing02@bilibili.com> Co-authored-by: sivabalan <n.siva.b@gmail.com>
1 parent 96ce4b2 commit 9966b2c

2 files changed

Lines changed: 57 additions & 37 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,21 +150,22 @@ private void addShutDownHook() {
150150
// for max of Integer size
151151
private HoodieLogBlock readBlock() throws IOException {
152152
int blockSize;
153+
long blockStartPos = inputStream.getPos();
153154
try {
154155
// 1 Read the total size of the block
155156
blockSize = (int) inputStream.readLong();
156157
} catch (EOFException | CorruptedLogFileException e) {
157158
// An exception reading any of the above indicates a corrupt block
158159
// Create a corrupt block by finding the next MAGIC marker or EOF
159-
return createCorruptBlock();
160+
return createCorruptBlock(blockStartPos);
160161
}
161162

162163
// We may have had a crash which could have written this block partially
163164
// Skip blockSize in the stream and we should either find a sync marker (start of the next
164165
// block) or EOF. If we did not find either of it, then this block is a corrupted block.
165166
boolean isCorrupted = isBlockCorrupted(blockSize);
166167
if (isCorrupted) {
167-
return createCorruptBlock();
168+
return createCorruptBlock(blockStartPos);
168169
}
169170

170171
// 2. Read the version for this log format
@@ -253,14 +254,14 @@ private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blo
253254
return HoodieLogBlockType.values()[type];
254255
}
255256

256-
private HoodieLogBlock createCorruptBlock() throws IOException {
257-
LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
258-
long currentPos = inputStream.getPos();
257+
private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
258+
LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
259+
inputStream.seek(blockStartPos);
259260
long nextBlockOffset = scanForNextAvailableBlockOffset();
260261
// Rewind to the initial start and read corrupted bytes till the nextBlockOffset
261-
inputStream.seek(currentPos);
262+
inputStream.seek(blockStartPos);
262263
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
263-
int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
264+
int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
264265
long contentPosition = inputStream.getPos();
265266
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
266267
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =

hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -700,20 +700,11 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
700700

701701
@Test
702702
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
703-
Writer writer =
704-
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
705-
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
706-
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
707-
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
708-
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
709-
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
710-
HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
711-
writer.appendBlock(dataBlock);
712-
writer.close();
703+
HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
713704

714705
// Append some arbit byte[] to the end of the log (mimics a partially written commit)
715706
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
716-
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
707+
FSDataOutputStream outputStream = fs.append(logFile.getPath());
717708
// create a block with
718709
outputStream.write(HoodieLogFormat.MAGIC);
719710
// Write out a length that does not confirm with the content
@@ -728,17 +719,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
728719
outputStream.close();
729720

730721
// Append a proper block that is of the missing length of the corrupted block
731-
writer =
732-
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
733-
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
734-
records = SchemaTestUtil.generateTestRecords(0, 10);
735-
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
736-
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
737-
writer.appendBlock(dataBlock);
738-
writer.close();
722+
logFile = addValidBlock("test-fileId1", "100", 10);
739723

740724
// First round of reads - we should be able to read the first block and then EOF
741-
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
725+
Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
742726
assertTrue(reader.hasNext(), "First block should be available");
743727
reader.next();
744728
assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -751,7 +735,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
751735
reader.close();
752736

753737
// Simulate another failure back to back
754-
outputStream = fs.append(writer.getLogFile().getPath());
738+
outputStream = fs.append(logFile.getPath());
755739
// create a block with
756740
outputStream.write(HoodieLogFormat.MAGIC);
757741
// Write out a length that does not confirm with the content
@@ -766,17 +750,10 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
766750
outputStream.close();
767751

768752
// Should be able to append a new block
769-
writer =
770-
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
771-
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
772-
records = SchemaTestUtil.generateTestRecords(0, 100);
773-
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
774-
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
775-
writer.appendBlock(dataBlock);
776-
writer.close();
753+
logFile = addValidBlock("test-fileId1", "100", 100);
777754

778755
// Second round of reads - we should be able to read the first and last block
779-
reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
756+
reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
780757
assertTrue(reader.hasNext(), "First block should be available");
781758
reader.next();
782759
assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -792,6 +769,48 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep
792769
reader.close();
793770
}
794771

772+
@Test
773+
public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
774+
HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
775+
776+
// Append just magic bytes and move onto next block
777+
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
778+
FSDataOutputStream outputStream = fs.append(logFile.getPath());
779+
outputStream.write(HoodieLogFormat.MAGIC);
780+
outputStream.flush();
781+
outputStream.close();
782+
783+
// Append a proper block
784+
logFile = addValidBlock("test-fileId1", "100", 10);
785+
786+
// First round of reads - we should be able to read the first block and then EOF
787+
Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
788+
assertTrue(reader.hasNext(), "First block should be available");
789+
reader.next();
790+
assertTrue(reader.hasNext(), "We should have corrupted block next");
791+
HoodieLogBlock block = reader.next();
792+
assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
793+
assertTrue(reader.hasNext(), "Third block should be available");
794+
reader.next();
795+
assertFalse(reader.hasNext(), "There should be no more block left");
796+
797+
reader.close();
798+
}
799+
800+
private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
801+
Writer writer =
802+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
803+
.withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
804+
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
805+
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
806+
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
807+
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
808+
HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
809+
writer.appendBlock(dataBlock);
810+
writer.close();
811+
return writer.getLogFile();
812+
}
813+
795814
@Test
796815
public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
797816
Writer writer =

0 commit comments

Comments
 (0)