From 2332fe0498d59d7205830ee4ffabad260cf30345 Mon Sep 17 00:00:00 2001 From: chenglei Date: Thu, 19 Aug 2021 22:28:35 +0800 Subject: [PATCH 1/4] HBASE-26210 HBase Write should be doomed to hang when cell size exceeds InmemoryFlushSize for CompactingMemStore --- .../regionserver/CompactingMemStore.java | 93 +++---- .../regionserver/CompactionPipeline.java | 2 +- .../TestCompactingToCellFlatMapMemStore.java | 6 +- .../hadoop/hbase/regionserver/TestHStore.java | 236 ++++++++++++++++-- 4 files changed, 273 insertions(+), 64 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 6434460a5d18..6d1d21d95e31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -208,7 +208,7 @@ public MemStoreSnapshot snapshot() { stopCompaction(); // region level lock ensures pushing active to pipeline is done in isolation // no concurrent update operations trying to flush the active segment - pushActiveToPipeline(getActive()); + pushActiveToPipeline(getActive(), true); resetTimeOfOldestEdit(); snapshotId = EnvironmentEdgeManager.currentTime(); // in both cases whatever is pushed to snapshot is cleared from the pipeline @@ -413,34 +413,61 @@ protected List createList(int capacity) { } /** - * Check whether anything need to be done based on the current active set size. - * The method is invoked upon every addition to the active set. - * For CompactingMemStore, flush the active set to the read-only memory if it's - * size is above threshold + * Check whether anything need to be done based on the current active set size. The method is + * invoked upon every addition to the active set. For CompactingMemStore, flush the active set to + * the read-only memory if it's size is above threshold * @param currActive intended segment to update * @param cellToAdd cell to be added to the segment * @param memstoreSizing object to accumulate changed size - * @return true if the cell can be added to the + * @return true if the cell can be added to the currActive */ - private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, + protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, MemStoreSizing memstoreSizing) { - if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) { - if (currActive.setInMemoryFlushed()) { - flushInMemory(currActive); - if (setInMemoryCompactionFlag()) { - // The thread is dispatched to do in-memory compaction in the background - InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); - if (LOG.isTraceEnabled()) { - LOG.trace("Dispatching the MemStore in-memory flush for store " + store - .getColumnFamilyName()); - } - getPool().execute(runnable); + long cellSize = MutableSegment.getCellLength(cellToAdd); + boolean successAdd = false; + while (true) { + long segmentDataSize = currActive.getDataSize(); + if (!inWalReplay && segmentDataSize > inmemoryFlushSize) { + // when replaying edits from WAL there is no need in in-memory flush regardless the size + // otherwise size below flush threshold try to update atomically + break; + } + if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) { + if (memstoreSizing != null) { + memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0); } + successAdd = true; + break; } - return false; } - return true; - } + + if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) { + // size above flush threshold so we flush in memory + this.tryFlushInMemoryAndCompactingAsync(currActive); + } + return successAdd; + } + + /** + * Try to flush the currActive in memory and submit the in memory compact task to + * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual + * flushing in memory. + * @param currActive + */ + private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) { + if (currActive.setInMemoryFlushed()) { + flushInMemory(currActive); + if (setInMemoryCompactionFlag()) { + // The thread is dispatched to do in-memory compaction in the background + InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); + if (LOG.isTraceEnabled()) { + LOG.trace( + "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName()); + } + getPool().execute(runnable); + } + } + } // externally visible only for tests // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, @@ -497,26 +524,6 @@ private ThreadPoolExecutor getPool() { return getRegionServices().getInMemoryCompactionPool(); } - protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd, - MemStoreSizing memstoreSizing) { - long cellSize = MutableSegment.getCellLength(cellToAdd); - long segmentDataSize = currActive.getDataSize(); - while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) { - // when replaying edits from WAL there is no need in in-memory flush regardless the size - // otherwise size below flush threshold try to update atomically - if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) { - if (memstoreSizing != null) { - memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0); - } - // enough space for cell - no need to flush - return false; - } - segmentDataSize = currActive.getDataSize(); - } - // size above flush threshold - return true; - } - /** * The request to cancel the compaction asynchronous task (caused by in-memory flush) * The compaction may still happen if the request was sent too late @@ -528,10 +535,6 @@ private void stopCompaction() { } } - protected void pushActiveToPipeline(MutableSegment currActive) { - pushActiveToPipeline(currActive, true); - } - /** * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to * concurrent writes and because we first add cell size to currActive.getDataSize and then diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 28e439efd4e6..965529739ed9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -229,7 +229,7 @@ public boolean flattenOneSegment(long requesterVersion, if ( s.canBeFlattened() ) { s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed if (s.isEmpty()) { - // after s.waitForUpdates() is called, there is no updates preceding,if no cells in s, + // after s.waitForUpdates() is called, there is no updates pending,if no cells in s, // we can skip it. continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java index 4e861da3a5cd..56a46eacff6d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java @@ -821,11 +821,11 @@ public void testFlatteningToJumboCellChunkMap() throws IOException { // The in-memory flush size is bigger than the size of a single cell, // but smaller than the size of two cells. - // Therefore, the two created cells are flattened together. + // Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and + // flattened. totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM - + 1 * oneCellOnCSLMHeapSize - + 1 * oneCellOnCCMHeapSize; + + 2 * oneCellOnCCMHeapSize; assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 798ddae6f265..f75d2c7d2f0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -50,6 +50,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -1713,8 +1716,11 @@ public void testHFileContextSetWithCFAndTable() throws Exception { assertArrayEquals(table, hFileContext.getTableName()); } + // This test is for HBASE-26026, HBase Write be stuck when active segment has no cell + // but its dataSize exceeds inmemoryFlushSize @Test - public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException { + public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() + throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); byte[] smallValue = new byte[3]; @@ -1738,12 +1744,15 @@ public void testCompactingMemStoreStuckBug26026() throws IOException, Interrupte MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore); assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); myCompactingMemStore.smallCellPreUpdateCounter.set(0); - myCompactingMemStore.smallCellPostUpdateCounter.set(0); myCompactingMemStore.largeCellPreUpdateCounter.set(0); - myCompactingMemStore.largeCellPostUpdateCounter.set(0); + final AtomicReference exceptionRef = new AtomicReference(); Thread smallCellThread = new Thread(() -> { + try { store.add(smallCell, new NonThreadSafeMemStoreSizing()); + } catch (Throwable exception) { + exceptionRef.set(exception); + } }); smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME); smallCellThread.start(); @@ -1751,9 +1760,9 @@ public void testCompactingMemStoreStuckBug26026() throws IOException, Interrupte String oldThreadName = Thread.currentThread().getName(); try { /** - * 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory first, when largeCellThread - * enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize could - * not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory return true. + * 1.smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then + * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread + * invokes flushInMemory. *

* 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread * can add cell to currentActive . That is to say when largeCellThread called flushInMemory @@ -1772,6 +1781,131 @@ public void testCompactingMemStoreStuckBug26026() throws IOException, Interrupte Thread.currentThread().setName(oldThreadName); } + assertTrue(exceptionRef.get() == null); + + } + + // This test is for HBASE-26210, HBase Write be stuck when there is cell which size exceeds + // InmemoryFlushSize + @Test(timeout = 60000) + public void testCompactingMemStoreCellExceedInmemoryFlushSize() + throws IOException, InterruptedException { + Configuration conf = HBaseConfiguration.create(); + conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName()); + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) + .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); + + int size = (int) ((CompactingMemStore) store.memstore).getInmemoryFlushSize(); + byte[] value = new byte[size + 1]; + + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); + long timestamp = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + Cell cell = createCell(qf1, timestamp, seqId, value); + int cellByteSize = MutableSegment.getCellLength(cell); + store.add(cell, memStoreSizing); + assertTrue(memStoreSizing.getCellsCount() == 1); + assertTrue(memStoreSizing.getDataSize() == cellByteSize); + } + + @Test + public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() + throws IOException, InterruptedException { + doTestLargeCellAndSmallCellConcurrently(-1); + doTestLargeCellAndSmallCellConcurrently(0); + doTestLargeCellAndSmallCellConcurrently(1); + + } + + private void doTestLargeCellAndSmallCellConcurrently(int flag) + throws IOException, InterruptedException { + + Configuration conf = HBaseConfiguration.create(); + + byte[] smallValue = new byte[3]; + byte[] largeValue = new byte[100]; + final long timestamp = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue); + final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); + int smallCellByteSize = MutableSegment.getCellLength(smallCell); + int largeCellByteSize = MutableSegment.getCellLength(largeCell); + int flushByteSize = 0; + switch (flag) { + case -1: + flushByteSize = largeCellByteSize - 1; + break; + case 0: + flushByteSize = largeCellByteSize; + break; + case 1: + flushByteSize = smallCellByteSize + largeCellByteSize - 1; + break; + default: + throw new IllegalArgumentException(); + + } + + conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); + conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200)); + + + init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family) + .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build()); + + MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); + assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); + myCompactingMemStore.disableCompaction(); + + final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); + final AtomicLong totalCellByteSize = new AtomicLong(0); + final AtomicReference exceptionRef = new AtomicReference(); + Thread smallCellThread = new Thread(() -> { + try { + for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { + long currentTimestamp = timestamp + i; + Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue); + totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); + store.add(cell, memStoreSizing); + } + } catch (Throwable exception) { + exceptionRef.set(exception); + + } + }); + smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME); + smallCellThread.start(); + + String oldThreadName = Thread.currentThread().getName(); + try { + /** + * 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then + * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then + * largeCellThread invokes flushInMemory. + *

+ * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread + * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. + */ + Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); + for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { + long currentTimestamp = timestamp + i; + Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue); + totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell)); + store.add(cell, memStoreSizing); + } + smallCellThread.join(); + + } finally { + Thread.currentThread().setName(oldThreadName); + } + + assertTrue(exceptionRef.get() == null); + assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); + assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); + assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); + } private HStoreFile mockStoreFileWithLength(long length) { @@ -1875,7 +2009,7 @@ protected List createList(int capacity) { return new ArrayList<>(capacity); } @Override - protected void pushActiveToPipeline(MutableSegment active) { + protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) { if (START_TEST.get()) { try { getScannerLatch.await(); @@ -1884,7 +2018,7 @@ protected void pushActiveToPipeline(MutableSegment active) { } } - super.pushActiveToPipeline(active); + super.pushActiveToPipeline(active, checkEmpty); if (START_TEST.get()) { snapshotLatch.countDown(); } @@ -1981,8 +2115,6 @@ public static class MyCompactingMemStore2 extends CompactingMemStore { private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0); private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0); - private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0); - private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0); public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator, HStore store, RegionServicesForStores regionServices, @@ -1990,16 +2122,17 @@ public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparat super(conf, cellComparator, store, regionServices, compactionPolicy); } - protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd, + @Override + protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, MemStoreSizing memstoreSizing) { if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { int currentCount = largeCellPreUpdateCounter.incrementAndGet(); if (currentCount <= 1) { try { /** - * smallCellThread enters super.shouldFlushInMemory first, when largeCellThread enters - * super.shouldFlushInMemory, currActive.getDataSize could not accommodate cellToAdd and - * super.shouldFlushInMemory return true. + * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then + * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then + * largeCellThread invokes flushInMemory. */ preCyclicBarrier.await(); } catch (Throwable e) { @@ -2008,7 +2141,7 @@ protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd, } } - boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing); + boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { try { preCyclicBarrier.await(); @@ -2051,4 +2184,77 @@ protected void flushInMemory(MutableSegment currentActiveMutableSegment) { } } + + public static class MyCompactingMemStore3 extends CompactingMemStore { + private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; + private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; + + private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2); + private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); + private final AtomicInteger flushCounter = new AtomicInteger(0); + private static final int CELL_COUNT = 5; + + public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator, + HStore store, RegionServicesForStores regionServices, + MemoryCompactionPolicy compactionPolicy) throws IOException { + super(conf, cellComparator, store, regionServices, compactionPolicy); + } + + @Override + protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, + MemStoreSizing memstoreSizing) { + if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { + try { + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); + if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { + try { + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + return returnValue; + } + + @Override + protected void postUpdate(MutableSegment currentActiveMutableSegment) { + super.postUpdate(currentActiveMutableSegment); + if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { + try { + postCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + @Override + protected void flushInMemory(MutableSegment currentActiveMutableSegment) { + super.flushInMemory(currentActiveMutableSegment); + assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); + flushCounter.incrementAndGet(); + if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { + try { + postCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + void disableCompaction() { + allowCompaction.set(false); + } + + void enableCompaction() { + allowCompaction.set(true); + } + + } } From 79965a922d2f9feea4a0d43af1043fa79b8c33e2 Mon Sep 17 00:00:00 2001 From: chenglei Date: Tue, 24 Aug 2021 15:38:02 +0800 Subject: [PATCH 2/4] add more tests --- .../regionserver/CompactingMemStore.java | 5 +- .../hadoop/hbase/regionserver/TestHStore.java | 90 ++++++++++++++----- 2 files changed, 70 insertions(+), 25 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 6d1d21d95e31..5da0de9a3045 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -449,10 +449,11 @@ protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellTo } /** - * Try to flush the currActive in memory and submit the in memory compact task to + * Try to flush the currActive in memory and submit the background + * {@link InMemoryCompactionRunnable} to * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual * flushing in memory. - * @param currActive + * @param currActive current Active Segment to be flush in memory. */ private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) { if (currActive.setInMemoryFlushed()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index f75d2c7d2f0f..920d9ed967ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1809,16 +1809,19 @@ public void testCompactingMemStoreCellExceedInmemoryFlushSize() assertTrue(memStoreSizing.getDataSize() == cellByteSize); } + // This test is for HBASE-26210 also, test write large cell and small cell concurrently when + // InmemoryFlushSize is smaller,equal with and larger than cell size. @Test public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() throws IOException, InterruptedException { - doTestLargeCellAndSmallCellConcurrently(-1); - doTestLargeCellAndSmallCellConcurrently(0); - doTestLargeCellAndSmallCellConcurrently(1); - + doWriteTestLargeCellAndSmallCellConcurrently(-1); + doWriteTestLargeCellAndSmallCellConcurrently(0); + doWriteTestLargeCellAndSmallCellConcurrently(1); + doWriteTestLargeCellAndSmallCellConcurrently(2); + doWriteTestLargeCellAndSmallCellConcurrently(3); } - private void doTestLargeCellAndSmallCellConcurrently(int flag) + private void doWriteTestLargeCellAndSmallCellConcurrently(int flag) throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); @@ -1832,6 +1835,7 @@ private void doTestLargeCellAndSmallCellConcurrently(int flag) int smallCellByteSize = MutableSegment.getCellLength(smallCell); int largeCellByteSize = MutableSegment.getCellLength(largeCell); int flushByteSize = 0; + boolean flushByteSizeLessThanSmallAndLargeCellSize = true; switch (flag) { case -1: flushByteSize = largeCellByteSize - 1; @@ -1842,6 +1846,14 @@ private void doTestLargeCellAndSmallCellConcurrently(int flag) case 1: flushByteSize = smallCellByteSize + largeCellByteSize - 1; break; + case 2: + flushByteSize = smallCellByteSize + largeCellByteSize; + flushByteSizeLessThanSmallAndLargeCellSize = false; + break; + case 3: + flushByteSize = smallCellByteSize + largeCellByteSize + 1; + flushByteSizeLessThanSmallAndLargeCellSize = false; + break; default: throw new IllegalArgumentException(); @@ -1858,6 +1870,12 @@ private void doTestLargeCellAndSmallCellConcurrently(int flag) MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore); assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize); myCompactingMemStore.disableCompaction(); + if (flushByteSizeLessThanSmallAndLargeCellSize) { + myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true; + } else { + myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false; + } + final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); final AtomicLong totalCellByteSize = new AtomicLong(0); @@ -1881,12 +1899,18 @@ private void doTestLargeCellAndSmallCellConcurrently(int flag) String oldThreadName = Thread.currentThread().getName(); try { /** + * When flushByteSizeLessThanSmallAndLargeCellSize is true: + *

* 1.smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then * largeCellThread invokes flushInMemory. *

* 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread * can run into MyCompactingMemStore3.checkAndAddToActiveSize again. + *

+ * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and + * largeCellThread concurrently write one cell and wait each other, and then write another + * cell etc. */ Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME); for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) { @@ -1897,15 +1921,18 @@ private void doTestLargeCellAndSmallCellConcurrently(int flag) } smallCellThread.join(); + assertTrue(exceptionRef.get() == null); + assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); + assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); + if (flushByteSizeLessThanSmallAndLargeCellSize) { + assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); + } else { + assertTrue( + myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1)); + } } finally { Thread.currentThread().setName(oldThreadName); } - - assertTrue(exceptionRef.get() == null); - assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2)); - assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get()); - assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT); - } private HStoreFile mockStoreFileWithLength(long length) { @@ -2193,6 +2220,7 @@ public static class MyCompactingMemStore3 extends CompactingMemStore { private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2); private final AtomicInteger flushCounter = new AtomicInteger(0); private static final int CELL_COUNT = 5; + private boolean flushByteSizeLessThanSmallAndLargeCellSize = true; public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator, HStore store, RegionServicesForStores regionServices, @@ -2203,6 +2231,9 @@ public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparat @Override protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, MemStoreSizing memstoreSizing) { + if (!flushByteSizeLessThanSmallAndLargeCellSize) { + return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); + } if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { try { preCyclicBarrier.await(); @@ -2213,11 +2244,12 @@ protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellTo boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { - try { - preCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } + try { + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } return returnValue; } @@ -2225,6 +2257,15 @@ protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellTo @Override protected void postUpdate(MutableSegment currentActiveMutableSegment) { super.postUpdate(currentActiveMutableSegment); + if (!flushByteSizeLessThanSmallAndLargeCellSize) { + try { + postCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + return; + } + if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { try { postCyclicBarrier.await(); @@ -2237,15 +2278,18 @@ protected void postUpdate(MutableSegment currentActiveMutableSegment) { @Override protected void flushInMemory(MutableSegment currentActiveMutableSegment) { super.flushInMemory(currentActiveMutableSegment); - assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); flushCounter.incrementAndGet(); - if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { - try { - postCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } + if (!flushByteSizeLessThanSmallAndLargeCellSize) { + return; } + + assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)); + try { + postCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } void disableCompaction() { From b5b7f31f07534bf547898965ef59deddb96c6c87 Mon Sep 17 00:00:00 2001 From: chenglei Date: Tue, 24 Aug 2021 17:05:20 +0800 Subject: [PATCH 3/4] fix code style --- .../hadoop/hbase/regionserver/TestHStore.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 920d9ed967ad..127f5456aa10 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1749,7 +1749,7 @@ public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize() final AtomicReference exceptionRef = new AtomicReference(); Thread smallCellThread = new Thread(() -> { try { - store.add(smallCell, new NonThreadSafeMemStoreSizing()); + store.add(smallCell, new NonThreadSafeMemStoreSizing()); } catch (Throwable exception) { exceptionRef.set(exception); } @@ -2235,21 +2235,20 @@ protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellTo return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); } if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) { - try { - preCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } + try { + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } } boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing); if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) { - try { - preCyclicBarrier.await(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - + try { + preCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } } return returnValue; } From 16ea4b856cbcd4fe0a2543d88e2b4649489ed853 Mon Sep 17 00:00:00 2001 From: chenglei Date: Thu, 26 Aug 2021 15:56:09 +0800 Subject: [PATCH 4/4] simplify tests --- .../hadoop/hbase/regionserver/TestHStore.java | 50 +++++++------------ 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 127f5456aa10..b64f52192740 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntBinaryOperator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -1814,14 +1815,20 @@ public void testCompactingMemStoreCellExceedInmemoryFlushSize() @Test public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently() throws IOException, InterruptedException { - doWriteTestLargeCellAndSmallCellConcurrently(-1); - doWriteTestLargeCellAndSmallCellConcurrently(0); - doWriteTestLargeCellAndSmallCellConcurrently(1); - doWriteTestLargeCellAndSmallCellConcurrently(2); - doWriteTestLargeCellAndSmallCellConcurrently(3); - } - - private void doWriteTestLargeCellAndSmallCellConcurrently(int flag) + doWriteTestLargeCellAndSmallCellConcurrently( + (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1); + doWriteTestLargeCellAndSmallCellConcurrently( + (smallCellByteSize, largeCellByteSize) -> largeCellByteSize); + doWriteTestLargeCellAndSmallCellConcurrently( + (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1); + doWriteTestLargeCellAndSmallCellConcurrently( + (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize); + doWriteTestLargeCellAndSmallCellConcurrently( + (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1); + } + + private void doWriteTestLargeCellAndSmallCellConcurrently( + IntBinaryOperator getFlushByteSize) throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); @@ -1834,30 +1841,9 @@ private void doWriteTestLargeCellAndSmallCellConcurrently(int flag) final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue); int smallCellByteSize = MutableSegment.getCellLength(smallCell); int largeCellByteSize = MutableSegment.getCellLength(largeCell); - int flushByteSize = 0; - boolean flushByteSizeLessThanSmallAndLargeCellSize = true; - switch (flag) { - case -1: - flushByteSize = largeCellByteSize - 1; - break; - case 0: - flushByteSize = largeCellByteSize; - break; - case 1: - flushByteSize = smallCellByteSize + largeCellByteSize - 1; - break; - case 2: - flushByteSize = smallCellByteSize + largeCellByteSize; - flushByteSizeLessThanSmallAndLargeCellSize = false; - break; - case 3: - flushByteSize = smallCellByteSize + largeCellByteSize + 1; - flushByteSizeLessThanSmallAndLargeCellSize = false; - break; - default: - throw new IllegalArgumentException(); - - } + int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize); + boolean flushByteSizeLessThanSmallAndLargeCellSize = + flushByteSize < (smallCellByteSize + largeCellByteSize); conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName()); conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);