diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 016d503f5eab..6e52d7360dc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -745,7 +745,7 @@ public boolean next(List outResult, ScannerContext scanner } matcher.clearCurrentRow(); seekOrSkipToNextRow(cell); - NextState stateAfterSeekNextRow = needToReturn(outResult); + NextState stateAfterSeekNextRow = needToReturn(); if (stateAfterSeekNextRow != null) { return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); } @@ -753,7 +753,7 @@ public boolean next(List outResult, ScannerContext scanner case SEEK_NEXT_COL: seekOrSkipToNextColumn(cell); - NextState stateAfterSeekNextColumn = needToReturn(outResult); + NextState stateAfterSeekNextColumn = needToReturn(); if (stateAfterSeekNextColumn != null) { return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); } @@ -771,7 +771,7 @@ public boolean next(List outResult, ScannerContext scanner ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0)) ) { seekAsDirection(nextKV); - NextState stateAfterSeekByHint = needToReturn(outResult); + NextState stateAfterSeekByHint = needToReturn(); if (stateAfterSeekByHint != null) { return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); } @@ -828,11 +828,10 @@ private void updateMetricsStore(boolean memstoreRead) { * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell * is changed, we should return the current cells. Otherwise, we may return the cells across * different rows. - * @param outResult the cells which are visible for user scan * @return null is the top cell doesn't change. Otherwise, the NextState to return */ - private NextState needToReturn(List outResult) { - if (!outResult.isEmpty() && topChanged) { + private NextState needToReturn() { + if (topChanged) { return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES; } return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 06b8695cea2c..92c288539bc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -67,6 +67,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.IntBinaryOperator; +import java.util.function.IntConsumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -1277,6 +1278,12 @@ private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequ .setValue(value).setSequenceId(sequenceId).build(); } + private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) { + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) + .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn) + .setSequenceId(sequenceId).build(); + } + @Test public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); @@ -1421,6 +1428,74 @@ public long getSmallestReadPoint(HStore store) { } } + @Test + public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException { + final Configuration conf = HBaseConfiguration.create(); + + byte[] r1 = Bytes.toBytes("row1"); + byte[] r2 = Bytes.toBytes("row2"); + + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + + final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); + final long ts = EnvironmentEdgeManager.currentTime(); + final long seqId = 100; + + init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), + ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), + new MyStoreHook() { + @Override + long getSmallestReadPoint(HStore store) { + return seqId + 3; + } + }); + + store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing); + store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing); + store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing); + + store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing); + store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing); + store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing); + + store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing); + store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing); + store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing); + + Scan scan = new Scan().withStartRow(r1); + + try (final InternalScanner scanner = + new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) { + @Override + protected KeyValueHeap newKVHeap(List scanners, + CellComparator comparator) throws IOException { + return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> { + if (recordBlockSizeCallCount == 6) { + try { + flushStore(store, id++); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + } + }) { + List cellResult = new ArrayList<>(); + + scanner.next(cellResult); + assertEquals(0, cellResult.size()); + + cellResult.clear(); + + scanner.next(cellResult); + assertEquals(3, cellResult.size()); + for (Cell cell : cellResult) { + assertArrayEquals(r2, CellUtil.cloneRow(cell)); + } + } + } + @Test public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); @@ -3137,6 +3212,28 @@ public List subList(int fromIndex, int toIndex) { } } + private interface MyKeyValueHeapHook { + void onRecordBlockSize(int recordBlockSizeCallCount); + } + + private static class MyKeyValueHeap extends KeyValueHeap { + private final MyKeyValueHeapHook hook; + private int recordBlockSizeCallCount; + + public MyKeyValueHeap(List scanners, CellComparator comparator, + MyKeyValueHeapHook hook) throws IOException { + super(scanners, comparator); + this.hook = hook; + } + + @Override + public void recordBlockSize(IntConsumer blockSizeConsumer) { + recordBlockSizeCallCount++; + hook.onRecordBlockSize(recordBlockSizeCallCount); + super.recordBlockSize(blockSizeConsumer); + } + } + public static class MyCompactingMemStore2 extends CompactingMemStore { private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";