diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1651574ad0eff..1c58727b39952 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -18,18 +18,6 @@
 
 package org.apache.hudi.common.functional;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DeleteRecord;
@@ -68,11 +56,19 @@
 import org.apache.hudi.exception.CorruptedLogFileException;
 import org.apache.hudi.exception.HoodieIOException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
-
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -114,39 +110,33 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
   private static final HoodieLogBlockType DEFAULT_DATA_BLOCK_TYPE = HoodieLogBlockType.AVRO_DATA_BLOCK;
+  private static final int BUFFER_SIZE = 4096;
 
-  private static String BASE_OUTPUT_PATH = "/tmp/";
-  private FileSystem fs;
+  private static FileSystem fs;
   private Path partitionPath;
-  private int bufferSize = 4096;
+  private String spillableBasePath;
 
   @BeforeAll
   public static void setUpClass() throws IOException, InterruptedException {
     // Append is not supported in LocalFileSystem. HDFS needs to be setup.
     MiniClusterUtil.setUp();
+    fs = MiniClusterUtil.fileSystem;
   }
 
   @AfterAll
   public static void tearDownClass() {
     MiniClusterUtil.shutdown();
+    fs = null;
   }
 
   @BeforeEach
   public void setUp() throws IOException, InterruptedException {
-    this.fs = MiniClusterUtil.fileSystem;
-
-    assertTrue(fs.mkdirs(new Path(tempDir.toAbsolutePath().toString())));
-    this.partitionPath = new Path(tempDir.toAbsolutePath().toString());
-    this.basePath = tempDir.getParent().toString();
+    this.basePath = tempDir.toUri().getPath();
+    this.partitionPath = new Path(basePath, "partition_path");
+    this.spillableBasePath = new Path(basePath, ".spillable_path").toUri().getPath();
     HoodieTestUtils.init(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ);
   }
 
-  @AfterEach
-  public void tearDown() throws IOException {
-    fs.delete(partitionPath, true);
-    fs.delete(new Path(basePath), true);
-  }
-
   @Test
   public void testEmptyLog() throws IOException {
     Writer writer =
@@ -684,8 +674,8 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -919,8 +909,8 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1004,8 +994,8 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1094,8 +1084,8 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(true)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1175,8 +1165,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
        .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1223,8 +1213,8 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1338,8 +1328,8 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .build();
@@ -1444,8 +1434,8 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1516,8 +1506,8 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1571,8 +1561,8 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1645,8 +1635,8 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1755,8 +1745,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -1935,8 +1925,8 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogB
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(true)
@@ -2023,8 +2013,8 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
         .withMaxMemorySizeInBytes(10240L)
         .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
-        .withBufferSize(bufferSize)
-        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withUseScanV2(useScanv2)
@@ -2122,7 +2112,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
-    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock prevBlock = reader.prev();
@@ -2204,7 +2194,7 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
     try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
+        new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) {
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock block = reader.prev();
@@ -2256,7 +2246,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
     try (HoodieLogFileReader reader =
-        new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
+        new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {
       assertTrue(reader.hasPrev(), "Third block should be available");
       reader.moveToPrev();
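
For reference, the sketch below illustrates the test-fixture pattern this patch converges on, using only plain JUnit 5 and java.nio: shared resources are initialized once per class, fixed settings become constants, per-test paths are derived from the JUnit-managed temp directory, and no @AfterEach cleanup is needed because that directory is deleted automatically. The class name, the sharedResource field, and the direct use of @TempDir are illustrative stand-ins, not part of the change (in the patch, tempDir comes from HoodieCommonTestHarness and the shared resource is the MiniDFS FileSystem from MiniClusterUtil).

```java
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertTrue;

class TempDirFixturePatternTest {

  // Fixed settings become shared constants (the patch does the same with BUFFER_SIZE).
  private static final int BUFFER_SIZE = 4096;

  // A class-wide resource initialized once, standing in for the static FileSystem
  // obtained from MiniClusterUtil in the patch.
  private static String sharedResource;

  // JUnit creates this directory before the tests and deletes it afterwards,
  // which is why the @AfterEach cleanup can be dropped entirely.
  @TempDir
  static Path tempDir;

  private Path partitionPath;
  private Path spillableBasePath;

  @BeforeAll
  static void setUpClass() {
    sharedResource = "initialized-once";
  }

  @AfterAll
  static void tearDownClass() {
    sharedResource = null; // release the shared handle, mirroring fs = null in the patch
  }

  @BeforeEach
  void setUp() throws IOException {
    // Every per-test path lives under the managed temp directory.
    partitionPath = Files.createDirectories(tempDir.resolve("partition_path"));
    spillableBasePath = tempDir.resolve(".spillable_path");
  }

  @Test
  void pathsLiveUnderTempDir() {
    assertTrue(partitionPath.startsWith(tempDir));
    assertTrue(spillableBasePath.startsWith(tempDir));
    assertTrue(BUFFER_SIZE > 0 && sharedResource != null);
  }
}
```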