diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index f029bbe33b2a3..08e95bb57ba42 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -66,10 +66,19 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -94,6 +103,56 @@ public void testTableOperations(boolean reuseReaders) throws Exception {
     verifyBaseMetadataTable(reuseReaders);
   }
 
+  /**
+   * Creates a COW table and calls the getAllFilesInPartition API in parallel, which reads data files from the MDT.
+   * This UT guards that concurrent readers of MDT#getAllFilesInPartition are safe.
+   * @param reuse whether metadata table readers are reused across calls
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) throws Exception {
+    final int taskNumber = 20;
+    HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+    init(tableType);
+    testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
+    HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse);
+    assertTrue(tableMetadata.enabled());
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+    String partition = metadataPartitions.get(0);
+    String finalPartition = basePath + "/" + partition;
+    ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
+    AtomicBoolean flag = new AtomicBoolean(false);
+    CountDownLatch downLatch = new CountDownLatch(taskNumber);
+    AtomicInteger filesNumber = new AtomicInteger(0);
+
+    // call the getAllFilesInPartition API of the metadata table in parallel
+    for (int i = 0; i < taskNumber; i++) {
+      executors.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            downLatch.countDown();
+            downLatch.await();
+            FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path(finalPartition));
+            if (files.length != 1) {
+              LOG.warn("Mismatched data file number.");
+              throw new RuntimeException("Mismatched data file number.");
+            }
+            filesNumber.addAndGet(files.length);
+          } catch (Exception e) {
+            LOG.warn("Caught exception while reading data files from the MDT.", e);
+            flag.compareAndSet(false, true);
+          }
+        }
+      });
+    }
+    executors.shutdown();
+    executors.awaitTermination(5, TimeUnit.MINUTES);
+    assertFalse(flag.get());
+    assertEquals(taskNumber, filesNumber.get());
+  }
+
   private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
     doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
   }
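A note on the test above: the CountDownLatch acts as a start barrier, so all twenty tasks hit getAllFilesInPartition at roughly the same moment, maximizing contention on the metadata table readers. A minimal standalone sketch of that idiom, for reference (the class name StartBarrierSketch and the placeholder comment are illustrative, not part of the patch):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StartBarrierSketch {
  public static void main(String[] args) throws InterruptedException {
    final int tasks = 20;
    CountDownLatch startGate = new CountDownLatch(tasks);
    ExecutorService pool = Executors.newFixedThreadPool(tasks);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        try {
          // Each task announces it is ready, then blocks until every task
          // is ready, so all of them enter the contended call together.
          startGate.countDown();
          startGate.await();
          // ... the concurrent call under test would go here ...
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}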
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f8a0389da3d4c..ef37ac8fa557a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -54,6 +51,10 @@
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -231,7 +232,7 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(
       throw new HoodieIOException("Error merging records from metadata table for " + sortedKeys.size() + " key : ", ioe);
     } finally {
       if (!reuse) {
-        close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId()));
+        closeReader(readers);
       }
     }
   });
@@ -399,7 +400,12 @@ private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSliceToKeysMapping(
    * @return File reader and the record scanner pair for the requested file slice
    */
   private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
-    return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
+    if (reuse) {
+      return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()),
+          k -> openReaders(partitionName, slice));
+    } else {
+      return openReaders(partitionName, slice);
+    }
   }
 
   private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
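The getOrCreateReaders change is the heart of the fix: when reuse is disabled, readers were previously still placed into the shared partitionReaders cache and then closed in the finally block of getRecordsByKeys, so one thread could close a reader that another thread had just fetched from the cache. Caching only when reuse is enabled confines the close() to reader instances no other thread can see. A minimal standalone sketch of that pattern (Reader and ReaderReuseSketch are hypothetical stand-ins, not Hudi classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReaderReuseSketch {

  // A stand-in for a file/log reader that must not be used after close().
  static class Reader implements AutoCloseable {
    private volatile boolean closed = false;

    String read() {
      if (closed) {
        throw new IllegalStateException("reader already closed");
      }
      return "data";
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  private final Map<String, Reader> cache = new ConcurrentHashMap<>();
  private final boolean reuse;

  ReaderReuseSketch(boolean reuse) {
    this.reuse = reuse;
  }

  // Mirrors the patched getOrCreateReaders: cache only when reuse is on;
  // otherwise each caller gets a private instance nobody else can close.
  Reader getOrCreateReader(String key) {
    if (reuse) {
      return cache.computeIfAbsent(key, k -> new Reader());
    }
    return new Reader();
  }

  String readOnce(String key) {
    Reader reader = getOrCreateReader(key);
    try {
      return reader.read();
    } finally {
      if (!reuse) {
        reader.close(); // safe: no other thread holds this instance
      }
    }
  }

  public static void main(String[] args) {
    ReaderReuseSketch perCall = new ReaderReuseSketch(false);
    System.out.println(perCall.readOnce("files")); // fresh reader, closed after use
  }
}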