diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java index 4ff4a5b95b91..fccf986fb633 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java @@ -90,7 +90,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) // The file info must be loaded before the scanner can be used. // This seems like a bug in HBase, but it's easily worked around. - this.scanner = in.getScanner(conf, false, false); + this.scanner = in.getScanner(conf, false, false, false); } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 51e9e1e7755f..81e4e1fb234e 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -470,7 +470,7 @@ public void testWritingPEData() throws Exception { LocatedFileStatus keyFileStatus = iterator.next(); HFile.Reader reader = HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false, false); kvCount += reader.getEntries(); scanner.seekTo(); @@ -516,7 +516,7 @@ public void test_WritingTagData() throws Exception { LocatedFileStatus keyFileStatus = iterator.next(); HFile.Reader reader = HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false, false); scanner.seekTo(); Cell cell = scanner.getCell(); List tagsFromCell = PrivateCellUtil.getTags(cell); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java index e15181e9c94d..3de7e2c865b2 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java @@ -466,7 +466,7 @@ private static void validateTable(Configuration conf, TableName tableName, Strin private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { Configuration conf = util.getConfiguration(); HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false); scanner.seekTo(); int count = 0; do { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 9316b09b8c93..385dbf8e3596 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -545,7 +545,7 @@ private static void validateHFiles(FileSystem fs, String outputPath, String fami 
private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { Configuration conf = util.getConfiguration(); HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false); scanner.seekTo(); int count = 0; do { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index cc680173a4e3..18018eab15f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -92,8 +92,8 @@ protected boolean isTop() { @Override public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { - final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction); + final boolean isCompaction, boolean checkpointingEnabled) { + final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction, checkpointingEnabled); return new HFileScanner() { final HFileScanner delegate = s; public boolean atEnd = false; @@ -277,6 +277,16 @@ public void close() { public void shipped() throws IOException { this.delegate.shipped(); } + + @Override + public void checkpoint(State state) { + this.delegate.checkpoint(state); + } + + @Override + public void retainBlock() { + this.delegate.retainBlock(); + } }; } @@ -315,7 +325,7 @@ public Optional midKey() throws IOException { @Override public Optional getFirstKey() { if (!firstKeySeeked) { - HFileScanner scanner = getScanner(true, true, false); + HFileScanner scanner = getScanner(true, true, false, false); try { if (scanner.seekTo()) { this.firstKey = Optional.ofNullable(scanner.getKey()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 73346e8ae4ac..09d41c71bf40 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -390,7 +390,7 @@ public interface Reader extends Closeable, CachingBlockReader { CellComparator getComparator(); HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread, - boolean isCompaction); + boolean isCompaction, boolean checkpointingEnabled); HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; @@ -420,7 +420,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread, HFileBlockIndex.ByteArrayKeyBlockIndexReader getMetaBlockIndexReader(); - HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread); + HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread, + boolean checkpointingEnabled); /** * Retrieves general Bloom filter metadata as appropriate for each {@link HFile} version. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index c84836bcd532..851a6607340c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -2070,7 +2070,8 @@ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) { return createBuilder(blk, newBuf).build(); } - static HFileBlock deepCloneOnHeap(HFileBlock blk) { + // Publicly visible for access in tests + public static HFileBlock deepCloneOnHeap(HFileBlock blk) { ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit()))); return createBuilder(blk, deepCloned).build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java index 24db92b4de1c..b1e09c769dd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java @@ -310,7 +310,7 @@ public int processFile(Path file, boolean checkRootDir) throws IOException { if (verbose || printKey || checkRow || checkFamily || printStats || checkMobIntegrity) { // scan over file and read key/value's and check if requested - HFileScanner scanner = reader.getScanner(getConf(), false, false, false); + HFileScanner scanner = reader.getScanner(getConf(), false, false, false, false); fileStats = new KeyValueStatsCollector(); boolean shouldScanKeysValues; if (this.isSeekToRow && !Bytes.equals(row, reader.getFirstRowKey().orElse(null))) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index af3a9b960aa3..71b378dc6d1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -311,6 +312,7 @@ protected static class HFileScannerImpl implements HFileScanner { protected final boolean cacheBlocks; protected final boolean pread; protected final boolean isCompaction; + private final boolean checkpointingEnabled; private int currKeyLen; private int currValueLen; private int currMemstoreTSLen; @@ -336,38 +338,68 @@ protected static class HFileScannerImpl implements HFileScanner { // RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the // unreferenced block please. protected HFileBlock curBlock; - // Previous blocks that were used in the course of the read + + // Updated to the current prevBlocks size when checkpoint is called. Used to eagerly release + // any blocks accumulated in the fetching of a row, if that row is thrown away due to filterRow. + private int lastCheckpointIndex = 0; + + // Updated by retainBlock(), when a cell is included from the current block. Is reset whenever + // curBlock gets updated.
Only honored when lastCheckpointIndex >= 0, meaning a checkpoint + // has occurred. + private boolean shouldRetainBlock = false; + + // Previous blocks that were used in the course of the read, to be released at close, + // checkpoint, or shipped protected final ArrayList prevBlocks = new ArrayList<>(); public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks, - final boolean pread, final boolean isCompaction) { + final boolean pread, final boolean isCompaction, boolean checkpointingEnabled) { this.reader = reader; this.cacheBlocks = cacheBlocks; this.pread = pread; this.isCompaction = isCompaction; + this.checkpointingEnabled = checkpointingEnabled; } void updateCurrBlockRef(HFileBlock block) { if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) { return; } - if (this.curBlock != null && this.curBlock.isSharedMem()) { - prevBlocks.add(this.curBlock); - } + handlePrevBlock(); this.curBlock = block; } void reset() { + handlePrevBlock(); + this.curBlock = null; + } + + /** + * Add curBlock to prevBlocks or release it immediately, depending on whether a checkpoint has + * occurred and we've been instructed to retain the block. If no checkpoint has occurred, we use + * original logic to always add to prevBlocks. If checkpoint occurred, release the block unless + * {@link #retainBlock()} has been called. + */ + private void handlePrevBlock() { // We don't have to keep ref to heap block if (this.curBlock != null && this.curBlock.isSharedMem()) { - this.prevBlocks.add(this.curBlock); + if (checkpointingEnabled && !shouldRetainBlock) { + this.curBlock.release(); + } else { + prevBlocks.add(this.curBlock); + } } - this.curBlock = null; + shouldRetainBlock = false; } private void returnBlocks(boolean returnAll) { - this.prevBlocks.forEach(HFileBlock::release); + this.prevBlocks.forEach((block) -> { + if (block != null) { + block.release(); + } + }); this.prevBlocks.clear(); + this.lastCheckpointIndex = 0; if (returnAll && this.curBlock != null) { this.curBlock.release(); this.curBlock = null; @@ -1047,6 +1079,39 @@ public int compareKey(CellComparator comparator, Cell key) { public void shipped() throws IOException { this.returnBlocks(false); } + + /** + * Sets the last checkpoint index to the current prevBlocks size. If called with State.FILTERED, + * releases and nulls out any prevBlocks entries which were added since the last checkpoint. + * Nulls out instead of removing to avoid unnecessary resizing of the list. + */ + @Override + public void checkpoint(State state) { + if (!checkpointingEnabled) { + return; + } + + if (state == State.FILTERED) { + assert lastCheckpointIndex >= 0; + for (int i = lastCheckpointIndex; i < prevBlocks.size(); i++) { + prevBlocks.get(i).release(); + prevBlocks.set(i, null); + } + } + lastCheckpointIndex = prevBlocks.size(); + } + + /** + * Sets state so that when curBlock is finished, it gets added onto prevBlocks. Otherwise, we + * eagerly release the block when checkpointing is enabled. 
+ */ + @Override + public void retainBlock() { + if (!checkpointingEnabled) { + return; + } + shouldRetainBlock = true; + } } @Override @@ -1459,8 +1524,8 @@ protected static class EncodedScanner extends HFileScannerImpl { private final DataBlockEncoder dataBlockEncoder; public EncodedScanner(HFile.Reader reader, boolean cacheBlocks, boolean pread, - boolean isCompaction, HFileContext meta, Configuration conf) { - super(reader, cacheBlocks, pread, isCompaction); + boolean isCompaction, boolean checkpointingEnabled, Configuration conf, HFileContext meta) { + super(reader, cacheBlocks, pread, isCompaction, checkpointingEnabled); DataBlockEncoding encoding = reader.getDataBlockEncoding(); dataBlockEncoder = encoding.getEncoder(); decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(conf, meta); @@ -1650,36 +1715,46 @@ public boolean prefetchComplete() { * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up * in a Scanner. Letting go of your references to the scanner is sufficient. NOTE: Do not use this * overload of getScanner for compactions. See - * {@link #getScanner(Configuration, boolean, boolean, boolean)} - * @param conf Store configuration. - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @param pread Use positional read rather than seek+read if true (pread is better for - * random reads, seek+read is better scanning). + * {@link HFile.Reader#getScanner(Configuration, boolean, boolean, boolean, boolean)} + * @param conf Store configuration. + * @param cacheBlocks True if we should cache blocks read in by this scanner. + * @param pread Use positional read rather than seek+read if true (pread is better + * for random reads, seek+read is better scanning). + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated if + * {@link Shipper#retainBlock()} is called. Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks can + * be released early at safe checkpoints. * @return Scanner on this file. */ @Override - public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread) { - return getScanner(conf, cacheBlocks, pread, false); + public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread, + boolean checkpointingEnabled) { + return getScanner(conf, cacheBlocks, pread, false, checkpointingEnabled); } /** * Create a Scanner on this file. No seeks or reads are done on creation. Call * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up * in a Scanner. Letting go of your references to the scanner is sufficient. - * @param conf Store configuration. - * @param cacheBlocks True if we should cache blocks read in by this scanner. - * @param pread Use positional read rather than seek+read if true (pread is better for - * random reads, seek+read is better scanning). - * @param isCompaction is scanner being used for a compaction? + * @param conf Store configuration. + * @param cacheBlocks True if we should cache blocks read in by this scanner. + * @param pread Use positional read rather than seek+read if true (pread is better + * for random reads, seek+read is better scanning). + * @param isCompaction is scanner being used for a compaction? + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated if + * {@link Shipper#retainBlock()} is called. 
Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks can + * be released early at safe checkpoints. * @return Scanner on this file. */ @Override public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { + final boolean isCompaction, boolean checkpointingEnabled) { if (dataBlockEncoder.useEncodedScanner()) { - return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext, conf); + return new EncodedScanner(this, cacheBlocks, pread, isCompaction, checkpointingEnabled, conf, + this.hfileContext); } - return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction); + return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction, checkpointingEnabled); } public int getMajorVersion() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java index c254201db9ba..052195a3b891 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/HFileProcedurePrettyPrinter.java @@ -137,7 +137,7 @@ private void processFile(Path file) throws IOException { out.println("Scanning -> " + file); FileSystem fs = file.getFileSystem(conf); try (HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, conf); - HFileScanner scanner = reader.getScanner(conf, false, false, false)) { + HFileScanner scanner = reader.getScanner(conf, false, false, false, false)) { if (procId != null) { if ( scanner.seekTo(PrivateCellUtil.createFirstOnRow(Bytes.toBytes(procId.longValue()))) != -1 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 5a2a74a61a46..2b36ce5e016c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -634,7 +634,7 @@ public void assertBulkLoadHFileOk(Path srcPath) throws IOException { long verificationStartTime = EnvironmentEdgeManager.currentTime(); LOG.info("Full verification started for bulk load hfile: {}", srcPath); Cell prevCell = null; - HFileScanner scanner = reader.getScanner(conf, false, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false, false); scanner.seekTo(); do { Cell cell = scanner.getCell(); @@ -949,25 +949,30 @@ public List getScanners(boolean cacheBlocks, boolean isGet, boo boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) throws IOException { return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, - readPt); + readPt, false); } /** * Get all scanners with no filtering based on TTL (that happens further down the line). 
- * @param cacheBlocks cache the blocks or not - * @param usePread true to use pread, false if not - * @param isCompaction true if the scanner is created for compaction - * @param matcher the scan query matcher - * @param startRow the start row - * @param includeStartRow true to include start row, false if not - * @param stopRow the stop row - * @param includeStopRow true to include stop row, false if not - * @param readPt the read point of the current scan + * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param startRow the start row + * @param includeStartRow true to include start row, false if not + * @param stopRow the stop row + * @param includeStopRow true to include stop row, false if not + * @param readPt the read point of the current scan + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated if + * {@link Shipper#retainBlock()} is called. Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks can + * be released early at safe checkpoints. * @return all scanners for this store */ public List getScanners(boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, - byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { + byte[] stopRow, boolean includeStopRow, long readPt, boolean checkpointingEnabled) + throws IOException { Collection storeFilesToScan; List memStoreScanners; this.storeEngine.readLock(); @@ -991,8 +996,9 @@ public List getScanners(boolean cacheBlocks, boolean usePread, // TODO this used to get the store files in descending order, // but now we get them in ascending order, which I think is // actually more correct, since memstore get put at the end. - List sfScanners = StoreFileScanner.getScannersForStoreFiles( - storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt); + List sfScanners = + StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, + isCompaction, false, matcher, readPt, checkpointingEnabled); List scanners = new ArrayList<>(sfScanners.size() + 1); scanners.addAll(sfScanners); // Then the memstore scanners @@ -1028,14 +1034,18 @@ private static void clearAndClose(List scanners) { * @param stopRow the stop row * @param readPt the read point of the current scan * @param includeMemstoreScanner true if memstore has to be included + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated if + * {@link Shipper#retainBlock()} is called. Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks + * can be released early at safe checkpoints. 
* @return scanners on the given files and on the memstore if specified */ public List getScanners(List files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, - byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) - throws IOException { + byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner, + boolean checkpointingEnabled) throws IOException { return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, - false, readPt, includeMemstoreScanner); + false, readPt, includeMemstoreScanner, checkpointingEnabled); } /** @@ -1052,12 +1062,16 @@ public List getScanners(List files, boolean cacheBl * @param includeStopRow true to include stop row, false if not * @param readPt the read point of the current scan * @param includeMemstoreScanner true if memstore has to be included + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated if + * {@link Shipper#retainBlock()} is called. Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks + * can be released early at safe checkpoints. * @return scanners on the given files and on the memstore if specified */ public List getScanners(List files, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException { + boolean includeMemstoreScanner, boolean checkpointingEnabled) throws IOException { List memStoreScanners = null; if (includeMemstoreScanner) { this.storeEngine.readLock(); @@ -1069,7 +1083,7 @@ public List getScanners(List files, boolean cacheBl } try { List sfScanners = StoreFileScanner.getScannersForStoreFiles(files, - cacheBlocks, usePread, isCompaction, false, matcher, readPt); + cacheBlocks, usePread, isCompaction, false, matcher, readPt, checkpointingEnabled); List scanners = new ArrayList<>(sfScanners.size() + 1); scanners.addAll(sfScanners); // Then the memstore scanners @@ -1723,12 +1737,16 @@ protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, * @param includeStopRow should the scan include the stop row * @param readPt the read point of the current scane * @param includeMemstoreScanner whether the current scanner should include memstorescanner + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated if + * {@link Shipper#retainBlock()} is called. Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks + * can be released early at safe checkpoints. 
* @return list of scanners recreated on the current Scanners */ public List recreateScanners(List currentFileScanners, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException { + boolean includeMemstoreScanner, boolean checkpointingEnabled) throws IOException { this.storeEngine.readLock(); try { Map name2File = @@ -1752,7 +1770,7 @@ public List recreateScanners(List currentFileS return null; } return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, - includeStartRow, stopRow, includeStopRow, readPt, false); + includeStartRow, stopRow, includeStopRow, readPt, false, checkpointingEnabled); } finally { this.storeEngine.readUnlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index ae514f0aef8d..0b9579251e3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -543,9 +543,9 @@ private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOEx * Must be called after initReader. */ public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn) { + boolean canOptimizeForNonNullColumn, boolean checkpointingEnabled) { return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, - canOptimizeForNonNullColumn); + canOptimizeForNonNullColumn, checkpointingEnabled); } /** @@ -554,10 +554,10 @@ public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long s * Must be called after initReader. */ public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) - throws IOException { + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn, + boolean checkpointingEnabled) throws IOException { return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false, - isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); + isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn, checkpointingEnabled); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 1fe80bc58b01..fb8f40569631 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -420,4 +420,32 @@ public void shipped() throws IOException { } } } + + @Override + public void checkpoint(State state) { + if (current != null) { + current.checkpoint(state); + } + if (this.heap != null) { + for (KeyValueScanner scanner : this.heap) { + scanner.checkpoint(state); + } + } + // Also checkpoint any scanners for delayed close. These would be exhausted scanners, + // which may contain blocks that were totally filtered during a request. If so, the checkpoint + // will release them. 
+ if (scannersForDelayedClose != null) { + for (KeyValueScanner scanner : scannersForDelayedClose) { + scanner.checkpoint(state); + } + } + } + + @Override + public void retainBlock() { + if (current != null) { + current.retainBlock(); + } + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 02d4d85d7e13..48b71015d194 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -78,4 +78,14 @@ public Cell getNextIndexedKey() { public void shipped() throws IOException { // do nothing } + + @Override + public void checkpoint(State state) { + // do nothing + } + + @Override + public void retainBlock() { + // do nothing + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index 11d4c20f581b..16baad4ab879 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -426,6 +426,10 @@ private boolean nextInternal(List results, ScannerContext scannerContext) // Used to check time limit LimitScope limitScope = LimitScope.BETWEEN_CELLS; + // reset checkpoint at start of each row, this way if row is filtered we only release blocks + // exhausted since this row began. + checkpointIfFiltering(State.START); + // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. // Then we loop and try again. Otherwise, we must get out on the first iteration via return, @@ -501,6 +505,7 @@ private boolean nextInternal(List results, ScannerContext scannerContext) return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } results.clear(); + checkpointIfFiltering(State.FILTERED); // Read nothing as the rowkey was filtered, but still need to check time limit if (scannerContext.checkTimeLimit(limitScope)) { @@ -553,6 +558,7 @@ private boolean nextInternal(List results, ScannerContext scannerContext) if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { incrementCountOfRowsFilteredMetric(scannerContext); results.clear(); + checkpointIfFiltering(State.FILTERED); boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -602,6 +608,7 @@ private boolean nextInternal(List results, ScannerContext scannerContext) // Double check to prevent empty rows from appearing in result. It could be // the case when SingleColumnValueExcludeFilter is used. if (results.isEmpty()) { + checkpointIfFiltering(State.FILTERED); incrementCountOfRowsFilteredMetric(scannerContext); boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { @@ -783,6 +790,32 @@ public void shipped() throws IOException { } } + /** + * Calls checkpoint with the given state, but only if this scanner has filters. It's unnecessary + * to do checkpointing at this level if there are no filters. 
+ */ + private void checkpointIfFiltering(State state) { + if (filter == null) { + return; + } + checkpoint(state); + } + + @Override + public void checkpoint(State state) { + if (storeHeap != null) { + storeHeap.checkpoint(state); + } + if (joinedHeap != null) { + joinedHeap.checkpoint(state); + } + } + + @Override + public void retainBlock() { + // do nothing. this is really only called in StoreScanner + } + @Override public void run() throws IOException { // This is the RPC callback method executed. We do the close in of the scanner in this diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index c5dbca6b6e2b..c48480e3f3e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -309,6 +309,16 @@ public void shipped() throws IOException { // do nothing } + @Override + public void checkpoint(State state) { + // do nothing + } + + @Override + public void retainBlock() { + // do nothing + } + // debug method @Override public String toString() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java index a7e60c07d298..2d36b4b50de1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java @@ -23,7 +23,11 @@ /** * This interface denotes a scanner as one which can ship cells. Scan operation do many RPC requests * to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch - * {@link #shipped()} will get called. + * {@link #shipped()} will get called.
+ * Scans of large numbers of fully filtered blocks (due to Filter, or sparse columns, etc) can cause + * excess memory to be held while waiting for {@link #shipped()} to be called. Therefore, there's a + * checkpoint mechanism via {@link #checkpoint(State)}. These enable fully filtered blocks to be + * eagerly released, since they are not referenced by cells being returned to clients. */ @InterfaceAudience.Private public interface Shipper { @@ -33,4 +37,27 @@ public interface Shipper { * can be done here. */ void shipped() throws IOException; + + enum State { + START, + FILTERED + } + + /** + * Called during processing of a batch of scanned rows, before returning to the client. Allows + * releasing of blocks which have been totally skipped in the result set due to filters.
+ * Should be called with {@link State#START} at the beginning of a request for a row. This will + * set state necessary to handle {@link State#FILTERED}. Calling with {@link State#FILTERED} will + * release any blocks which have been fully processed since the last call to + * {@link #checkpoint(State)}. Calling again with {@link State#START} will reset the pointers. + */ + void checkpoint(State state); + + /** + * Used by upstream callers to notify the shipper that the current block should be retained for + * shipping when {@link #shipped()} or {@link #checkpoint(State)} are called. Otherwise, the block + * will be released immediately once it's no longer needed. Only has an effect after + * {@link #checkpoint(State)} has been called at least once. + */ + void retainBlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index a2778e54a725..cba2fc316bbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -139,11 +139,17 @@ public CellComparator getComparator() { * {@link KeyValueScanner#getScannerOrder()}. * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, * otherwise {@code false}. This is a hint for optimization. + * @param checkpointingEnabled if true, blocks will only be retained as they are iterated + * if {@link Shipper#retainBlock()} is called. Further, + * {@link Shipper#checkpoint(Shipper.State)} is enabled so + * blocks can be released early at safe checkpoints. * @return a scanner */ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, - boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { - return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn, + boolean checkpointingEnabled) { + return new StoreFileScanner(this, + getScanner(cacheBlocks, pread, isCompaction, checkpointingEnabled), !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); } @@ -190,7 +196,7 @@ void readCompleted() { */ @Deprecated public HFileScanner getScanner(boolean cacheBlocks, boolean pread) { - return getScanner(cacheBlocks, pread, false); + return getScanner(cacheBlocks, pread, false, false); } /** @@ -203,8 +209,9 @@ public HFileScanner getScanner(boolean cacheBlocks, boolean pread) { * @see HBASE-15296 */ @Deprecated - public HFileScanner getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction) { - return reader.getScanner(conf, cacheBlocks, pread, isCompaction); + public HFileScanner getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, + boolean checkpointingEnabled) { + return reader.getScanner(conf, cacheBlocks, pread, isCompaction, checkpointingEnabled); } public void close(boolean evictOnClose) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 74147f8ec059..93f344ebd2c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ 
-75,9 +75,9 @@ public class StoreFileScanner implements KeyValueScanner { * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * @param useMVCC If true, scanner will filter out updates with MVCC larger * than {@code readPt}. + * @param hasMVCC Set to true if underlying store file reader has MVCC info. * @param readPt MVCC value to use to filter out the updates newer than this * scanner. - * @param hasMVCC Set to true if underlying store file reader has MVCC info. * @param scannerOrder Order of the scanner relative to other scanners. See * {@link KeyValueScanner#getScannerOrder()}. * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column, @@ -102,7 +102,7 @@ public static List getScannersForStoreFiles(Collection getScannersForStoreFiles(Collection getScannersForStoreFiles(Collection files, boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, - ScanQueryMatcher matcher, long readPt) throws IOException { + ScanQueryMatcher matcher, long readPt, boolean checkpointingEnabled) throws IOException { if (files.isEmpty()) { return Collections.emptyList(); } @@ -130,10 +130,11 @@ public static List getScannersForStoreFiles(Collection getScannersForCompaction(Collection columns, + List scanners) throws IOException { + this(null, scan, scanInfo, columns, scanners); + } + + // Used to instantiate a scanner for user scan in test + StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, NavigableSet columns, List scanners) throws IOException { // 0 is passed as readpoint because the test bypasses Store - this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), + this(store, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), ScanType.USER_SCAN); this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); @@ -630,6 +636,10 @@ public boolean next(List outResult, ScannerContext scannerContext) throws // also update metric accordingly if (this.countPerRow > storeOffset) { outResult.add(cell); + // call retainBlock since we are including this cell in the result + // we only want to retain blocks that are backing cells that we include. + // if the cell is filtered upstream, it will be released by checkpoint. + this.heap.retainBlock(); // Update local tracking information count++; @@ -972,8 +982,9 @@ public void updateReaders(List sfs, List memStoreSc // Eagerly creating scanners so that we have the ref counting ticking on the newly created // store files. In case of stream scanners this eager creation does not induce performance // penalty because in scans (that uses stream scanners) the next() call is bound to happen. 
+ // Set checkpointingEnabled to true here, because we handle checkpointing and retainBlock List scanners = store.getScanners(sfs, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false); + isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false, true); flushedstoreFileScanners.addAll(scanners); if (!CollectionUtils.isEmpty(memStoreScanners)) { clearAndClose(memStoreScannersAfterFlush); @@ -1111,9 +1122,10 @@ void trySwitchToStreamRead() { try { // We must have a store instance here so no null check // recreate the scanners on the current file scanners + // Set checkpointingEnabled to true here, because we handle checkpointing and retainBlock fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), - readPt, false); + readPt, false, true); if (fileScanners == null) { return; } @@ -1241,4 +1253,17 @@ public void shipped() throws IOException { trySwitchToStreamRead(); } } + + @Override + public void checkpoint(State state) { + if (this.heap != null) { + this.heap.checkpoint(state); + } + // Also checkpoint any scanners for delayed close. These would be exhausted scanners, + // which may contain blocks that were totally filtered during a request. If so, the checkpoint + // will release them. + for (KeyValueScanner scanner : scannersForDelayedClose) { + scanner.checkpoint(state); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index e1bea90f49d9..5e13f03314b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -770,7 +770,7 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, .build(); halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); - HFileScanner scanner = halfReader.getScanner(false, false, false); + HFileScanner scanner = halfReader.getScanner(false, false, false, false); scanner.seekTo(); do { halfWriter.append(scanner.getCell()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java index 0870dbe6f9bc..df2943952ca2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java @@ -132,7 +132,7 @@ public static void doSmokeTest(FileSystem fs, Path path, String codec) throws Ex Cell cc = null; HFile.Reader reader = HFile.createReader(fs, path, CacheConfig.DISABLED, true, conf); try { - HFileScanner scanner = reader.getScanner(conf, false, true); + HFileScanner scanner = reader.getScanner(conf, false, true, false); scanner.seekTo(); // position to the start of file // Scanner does not do Cells yet. Do below for now till fixed. 
cc = scanner.getCell(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java index e60d23d12065..c34144920be1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java @@ -408,7 +408,7 @@ public SequentialReadBenchmark(Configuration conf, FileSystem fs, Path mf, int t @Override void setUp() throws Exception { super.setUp(); - this.scanner = this.reader.getScanner(conf, false, false); + this.scanner = this.reader.getScanner(conf, false, false, false); this.scanner.seekTo(); } @@ -437,7 +437,7 @@ public UniformRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, in @Override void doRow(int i) throws Exception { - HFileScanner scanner = this.reader.getScanner(conf, false, true); + HFileScanner scanner = this.reader.getScanner(conf, false, true, false); byte[] b = getRandomRow(); if (scanner.seekTo(createCell(b)) < 0) { LOG.info("Not able to seekTo " + new String(b)); @@ -462,7 +462,7 @@ public UniformRandomSmallScan(Configuration conf, FileSystem fs, Path mf, int to @Override void doRow(int i) throws Exception { - HFileScanner scanner = this.reader.getScanner(conf, false, false); + HFileScanner scanner = this.reader.getScanner(conf, false, false, false); byte[] b = getRandomRow(); // System.out.println("Random row: " + new String(b)); Cell c = createCell(b); @@ -500,7 +500,7 @@ public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, i @Override void doRow(int i) throws Exception { - HFileScanner scanner = this.reader.getScanner(conf, false, true); + HFileScanner scanner = this.reader.getScanner(conf, false, true, false); byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes(); scanner.seekTo(createCell(gaussianRandomRowBytes)); for (int ii = 0; ii < 30; ii++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java index 86df2bab8d6a..31ef8760989c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -20,11 +20,16 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -33,6 +38,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; @@ -44,11 +51,18 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import 
org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -91,6 +105,7 @@ public class TestBlockEvictionFromClient { private static byte[] ROW2 = Bytes.toBytes("testRow2"); private static byte[] ROW3 = Bytes.toBytes("testRow3"); private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[] FAMILY2 = Bytes.toBytes("testFamily1"); private static byte[][] FAMILIES_1 = new byte[1][0]; private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); @@ -179,7 +194,7 @@ public void tearDown() throws Exception { } @Test - public void testBlockEvictionWithParallelScans() throws Exception { + public void testBlockEvictionWithParallelScans() throws Throwable { Table table = null; try { latch = new CountDownLatch(1); @@ -215,7 +230,7 @@ public void testBlockEvictionWithParallelScans() throws Exception { Thread.sleep(100); checkForBlockEviction(cache, false, false); for (ScanThread thread : scanThreads) { - thread.join(); + thread.joinAndRethrow(); } // CustomInnerRegionObserver.sleepTime.set(0); Iterator iterator = cache.iterator(); @@ -265,7 +280,7 @@ public void testBlockEvictionWithParallelScans() throws Exception { } @Test - public void testParallelGetsAndScans() throws IOException, InterruptedException { + public void testParallelGetsAndScans() throws Throwable { Table table = null; try { latch = new CountDownLatch(2); @@ -300,7 +315,7 @@ public void testParallelGetsAndScans() throws IOException, InterruptedException CustomInnerRegionObserver.waitForGets.set(false); checkForBlockEviction(cache, false, false); for (GetThread thread : getThreads) { - thread.join(); + thread.joinAndRethrow(); } // Verify whether the gets have returned the blocks that it had CustomInnerRegionObserver.waitForGets.set(true); @@ -308,7 +323,7 @@ public void testParallelGetsAndScans() throws IOException, InterruptedException checkForBlockEviction(cache, true, false); getLatch.countDown(); for (ScanThread thread : scanThreads) { - thread.join(); + thread.joinAndRethrow(); } System.out.println("Scans should have returned the bloks"); // Check with either true or false @@ -323,7 +338,7 @@ public void testParallelGetsAndScans() throws IOException, InterruptedException } @Test - public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { + public void testGetWithCellsInDifferentFiles() throws Throwable { Table table = null; try { latch = new CountDownLatch(1); @@ -366,7 +381,7 @@ public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedEx Thread.sleep(200); CustomInnerRegionObserver.getCdl().get().countDown(); for (GetThread thread : getThreads) { - thread.join(); + thread.joinAndRethrow(); } // Verify whether the gets have returned 
the blocks that it had CustomInnerRegionObserver.waitForGets.set(true); @@ -383,8 +398,7 @@ public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedEx @Test // TODO : check how block index works here - public void testGetsWithMultiColumnsAndExplicitTracker() - throws IOException, InterruptedException { + public void testGetsWithMultiColumnsAndExplicitTracker() throws Throwable { Table table = null; try { latch = new CountDownLatch(1); @@ -428,34 +442,13 @@ public void testGetsWithMultiColumnsAndExplicitTracker() // Create three sets of gets GetThread[] getThreads = initiateGet(table, true, false); Thread.sleep(200); - Iterator iterator = cache.iterator(); - boolean usedBlocksFound = false; - int refCount = 0; - int noOfBlocksWithRef = 0; - while (iterator.hasNext()) { - CachedBlock next = iterator.next(); - BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - if (cache instanceof BucketCache) { - refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); - } else if (cache instanceof CombinedBlockCache) { - refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); - } else { - continue; - } - if (refCount != 0) { - // Blocks will be with count 3 - System.out.println("The refCount is " + refCount); - assertEquals(NO_OF_THREADS, refCount); - usedBlocksFound = true; - noOfBlocksWithRef++; - } - } - assertTrue(usedBlocksFound); - // the number of blocks referred - assertEquals(10, noOfBlocksWithRef); + int noOfBlocksWithRef = countReferences(cache); + // 3 blocks for the 3 returned cells, plus 1 extra because we don't fully exhaust one of the + // storefiles so one remains in curBlock after we SEEK_NEXT_ROW + assertEquals(4, noOfBlocksWithRef); CustomInnerRegionObserver.getCdl().get().countDown(); for (GetThread thread : getThreads) { - thread.join(); + thread.joinAndRethrow(); } // Verify whether the gets have returned the blocks that it had CustomInnerRegionObserver.waitForGets.set(true); @@ -471,7 +464,7 @@ public void testGetsWithMultiColumnsAndExplicitTracker() } @Test - public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { + public void testGetWithMultipleColumnFamilies() throws Throwable { Table table = null; try { latch = new CountDownLatch(1); @@ -522,34 +515,12 @@ public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedE // Create three sets of gets GetThread[] getThreads = initiateGet(table, true, true); Thread.sleep(200); - Iterator iterator = cache.iterator(); - boolean usedBlocksFound = false; - int refCount = 0; - int noOfBlocksWithRef = 0; - while (iterator.hasNext()) { - CachedBlock next = iterator.next(); - BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - if (cache instanceof BucketCache) { - refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); - } else if (cache instanceof CombinedBlockCache) { - refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); - } else { - continue; - } - if (refCount != 0) { - // Blocks will be with count 3 - System.out.println("The refCount is " + refCount); - assertEquals(NO_OF_THREADS, refCount); - usedBlocksFound = true; - noOfBlocksWithRef++; - } - } - assertTrue(usedBlocksFound); + int noOfBlocksWithRef = countReferences(cache); // the number of blocks referred assertEquals(3, noOfBlocksWithRef); CustomInnerRegionObserver.getCdl().get().countDown(); for (GetThread thread : getThreads) { - thread.join(); + thread.joinAndRethrow(); } // Verify whether the gets have returned 
the blocks that it had CustomInnerRegionObserver.waitForGets.set(true); @@ -629,7 +600,7 @@ public void testBlockRefCountAfterSplits() throws IOException, InterruptedExcept } @Test - public void testMultiGets() throws IOException, InterruptedException { + public void testMultiGets() throws Throwable { Table table = null; try { latch = new CountDownLatch(2); @@ -670,29 +641,13 @@ public void testMultiGets() throws IOException, InterruptedException { // Create three sets of gets MultiGetThread[] getThreads = initiateMultiGet(table); Thread.sleep(200); - int refCount; Iterator iterator = cache.iterator(); - boolean foundNonZeroBlock = false; - while (iterator.hasNext()) { - CachedBlock next = iterator.next(); - BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - if (cache instanceof BucketCache) { - refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); - } else if (cache instanceof CombinedBlockCache) { - refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); - } else { - continue; - } - if (refCount != 0) { - assertEquals(NO_OF_THREADS, refCount); - foundNonZeroBlock = true; - } - } - assertTrue("Should have found nonzero ref count block", foundNonZeroBlock); + int noOfBlocksWithRef = countReferences(cache); + assertTrue("Should have found nonzero ref count block", noOfBlocksWithRef > 0); CustomInnerRegionObserver.getCdl().get().countDown(); CustomInnerRegionObserver.getCdl().get().countDown(); for (MultiGetThread thread : getThreads) { - thread.join(); + thread.joinAndRethrow(); } // Verify whether the gets have returned the blocks that it had CustomInnerRegionObserver.waitForGets.set(true); @@ -708,7 +663,7 @@ public void testMultiGets() throws IOException, InterruptedException { } @Test - public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { + public void testScanWithMultipleColumnFamilies() throws Throwable { Table table = null; try { latch = new CountDownLatch(1); @@ -757,34 +712,12 @@ public void testScanWithMultipleColumnFamilies() throws IOException, Interrupted // Create three sets of gets ScanThread[] scanThreads = initiateScan(table, true); Thread.sleep(200); - Iterator iterator = cache.iterator(); - boolean usedBlocksFound = false; - int refCount = 0; - int noOfBlocksWithRef = 0; - while (iterator.hasNext()) { - CachedBlock next = iterator.next(); - BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - if (cache instanceof BucketCache) { - refCount = ((BucketCache) cache).getRpcRefCount(cacheKey); - } else if (cache instanceof CombinedBlockCache) { - refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey); - } else { - continue; - } - if (refCount != 0) { - // Blocks will be with count 3 - System.out.println("The refCount is " + refCount); - assertEquals(NO_OF_THREADS, refCount); - usedBlocksFound = true; - noOfBlocksWithRef++; - } - } - assertTrue(usedBlocksFound); + int noOfBlocksWithRef = countReferences(cache); // the number of blocks referred assertEquals(12, noOfBlocksWithRef); CustomInnerRegionObserver.getCdl().get().countDown(); for (ScanThread thread : scanThreads) { - thread.join(); + thread.joinAndRethrow(); } // giving some time for the block to be decremented checkForBlockEviction(cache, true, false); @@ -795,6 +728,347 @@ public void testScanWithMultipleColumnFamilies() throws IOException, Interrupted } } + /** + * This test is a baseline for the below filtered tests. 
It proves that a full unfiltered scan + * should retain 12 blocks based on the test data in {@link #setupStreamScanTest()}. Prior to + * HBASE-27227, a filtered scan would retain the same number of blocks. The further tests below + * show that with HBASE-27227 the filtered scans retain far fewer blocks.
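+ * Every cell written by {@link #setupStreamScanTest()} is large enough to fill its own block, so + * the block counts asserted in these tests map one-to-one to the cells each scan must keep referenced.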
+ * We use a stream scan to avoid switching to stream mid-request; such a switch throws off the + * counting due to how the test coordinates with a countdown latch.
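+ * (Without forcing STREAM, a pread scan can switch to stream mode partway through the request, + * re-opening the store file scanners and changing which blocks are held when the latch releases.)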
+ * In addition to verifying the actual data returned by every scan, + * {@link #countReferences(BlockCache)} also corrupts the byte buffs allocated to any blocks we + * eagerly release due to checkpointing. This validates that our checkpointing does not release + * any blocks that are necessary to serve the scan request. If we did, it'd blow up the test + * trying to create a cell block response from corrupted cells. + */ + @Test + public void testStreamScan() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // setupStreamScanTest writes 12 cells, each large enough to consume an entire block. + // A "full table scan" will retain all 12 blocks. + assertNoBlocksWithRef(testCase.table, testCase.baseScan, testCase.cache, 12, + new ExpectedResult(ROW, FAMILY, QUALIFIER, data2), + new ExpectedResult(ROW, FAMILY, QUALIFIER2, data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier1"), data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier2"), data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier3"), data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier4"), data2), + new ExpectedResult(ROW1, FAMILY2, Bytes.toBytes("testQualifier1"), data2), + new ExpectedResult(ROW1, FAMILY2, Bytes.toBytes("testQualifier2"), data2), + new ExpectedResult(ROW1, FAMILY2, Bytes.toBytes("testQualifier3"), data2), + new ExpectedResult(ROW1, FAMILY2, Bytes.toBytes("testQualifier4"), data2), + new ExpectedResult(ROW3, FAMILY, QUALIFIER, data), + new ExpectedResult(ROW3, FAMILY2, QUALIFIER2, data2)); + } + } + + @Test + public void testStreamScanWithOneColumn() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // We expect 2 blocks. Everything is filtered out at the StoreScanner layer, so we can + // just not retain the blocks for excluded cells right away. + byte[] qualifier = Bytes.toBytes("testQualifier1"); + assertNoBlocksWithRef(testCase.table, + testCase.baseScan + .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(qualifier))), + testCase.cache, 2, new ExpectedResult(ROW1, FAMILY, qualifier, data2), + new ExpectedResult(ROW1, FAMILY2, qualifier, data2)); + } + } + + @Test + public void testStreamScanWithOneColumnQualifierFilter() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // This is the same as testStreamScanWithOneColumn but using a filter instead. Same reasoning + // as that test. 
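+ // QUALIFIER only exists in FAMILY, on ROW and ROW3, so exactly those 2 cells (and thus 2 blocks) + // are retained.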
+ assertNoBlocksWithRef(testCase.table, + testCase.baseScan + .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))), + testCase.cache, 2, new ExpectedResult(ROW, FAMILY, QUALIFIER, data2), + new ExpectedResult(ROW3, FAMILY, QUALIFIER, data)); + } + } + + @Test + public void testStreamScanWithRowFilter() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // we expect 2 because the "ROW" row has 2 cells and each cell is 1 block + assertNoBlocksWithRef(testCase.table, + testCase.baseScan + .setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW))), + testCase.cache, 2, new ExpectedResult(ROW, FAMILY, QUALIFIER, data2), + new ExpectedResult(ROW, FAMILY, QUALIFIER2, data2)); + } + } + + @Test + public void testStreamScanWithRowFilterMixedRows() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // we expect 2 because each of "ROW" and "ROW3" has 1 cell that matches the filter + assertNoBlocksWithRef(testCase.table, + testCase.baseScan.addColumn(FAMILY, QUALIFIER) + .setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, + new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW)), + new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW3)))), + testCase.cache, 2, new ExpectedResult(ROW, FAMILY, QUALIFIER, data2), + new ExpectedResult(ROW3, FAMILY, QUALIFIER, data)); + } + } + + @Test + public void testStreamScanWithRowOffset() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // we expect 4, because 2 rows (ROW and ROW1) have enough columns in FAMILY to exceed the offset. + // After the offset, ROW has 1 column left and ROW1 has 3. Each retains 1 block. + assertNoBlocksWithRef(testCase.table, + testCase.baseScan.addFamily(FAMILY).setRowOffsetPerColumnFamily(1), testCase.cache, 4, + new ExpectedResult(ROW, FAMILY, QUALIFIER2, data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier2"), data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier3"), data2), + new ExpectedResult(ROW1, FAMILY, Bytes.toBytes("testQualifier4"), data2)); + } + } + + @Test + public void testStreamScanWithRowOffsetAndRowFilter() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // we expect 1 because while both ROW and ROW1 have enough columns to exceed the offset, we + // drop ROW1 due to filterRow in RegionScannerImpl. + assertNoBlocksWithRef(testCase.table, + testCase.baseScan.addFamily(FAMILY).setRowOffsetPerColumnFamily(1) + .setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW))), + testCase.cache, 1, new ExpectedResult(ROW, FAMILY, QUALIFIER2, data2)); + } + } + + @Test + public void testStreamScanWithSingleColumnValueExcludeFilter() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // We expect 5 blocks. Initially, all 7 storefiles are opened. But FAMILY is essential due to + // setFilterIfMissing, so only storefiles within that family are iterated unless a match is + // found. 4 storefiles are for FAMILY, the remaining 3 are for FAMILY2. The row we match on (ROW) + // doesn't have any cells in FAMILY2, so we don't end up iterating those storefiles. So the + // initial blocks remain open and untouched -- this counts for 3 of the 5. + // Multiple rows match FAMILY, two rows match FAMILY/QUALIFIER, but only one row also matches + // the value. That row has 2 blocks, one for each cell. That's the remaining 2.
+ // SingleColumnValueExcludeFilter does not return the matched column, so we're only returning + // 1 of those cells (thus 1 of those blocks) but we can't release the other one since + // checkpointing based on filterRow happens at the row level and the row itself is returned. + SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(FAMILY, QUALIFIER, + CompareOperator.EQUAL, new BinaryComparator(data2)); + filter.setFilterIfMissing(true); + assertNoBlocksWithRef(testCase.table, testCase.baseScan.setFilter(filter), testCase.cache, 5, + new ExpectedResult(ROW, FAMILY, QUALIFIER2, data2)); + } + } + + @Test + public void testStreamScanWithSingleColumnValueExcludeFilterJoinedHeap() throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // We expect 2 blocks. Unlike the last test, this one actually iterates the joined heap. So + // we end up exhausting more of the storefiles and being able to release more blocks. We end up + // retaining 1 block for the match on the SingleColumnValueExcludeFilter, then 1 block from + // the joined heap on that same row (ROW3). We can eagerly release all of the other blocks. We + // only return 1 cell, but still retain 2 blocks because checkpointing happens at the row boundary. + // Since at least 1 cell is returned for the row, we keep both blocks. + SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(FAMILY, QUALIFIER, + CompareOperator.EQUAL, new BinaryComparator(data)); + filter.setFilterIfMissing(true); + assertNoBlocksWithRef(testCase.table, testCase.baseScan.setFilter(filter), testCase.cache, 2, + new ExpectedResult(ROW3, FAMILY2, QUALIFIER2, data2)); + } + } + + @Test + public void testStreamScanWithSingleColumnValueExcludeFilterJoinedHeapColumnExcluded() + throws Throwable { + try (TestCase testCase = setupStreamScanTest()) { + // We expect 0 blocks. Another permutation of joined heap tests, this one uses a filter list + // to cause the joined heap to return nothing. ROW3 is matched by + // SingleColumnValueExcludeFilter. That filter excludes the matched column, and causes joined + // heap to be checked. The joined heap returns nothing, because FAMILY2 only contains + // QUALIFIER2 while we want QUALIFIER. The final result is an empty list, so we can release + // all blocks accumulated.
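+ // Note that no ExpectedResults are passed below: the scan should return no rows at all, and + // all accumulated blocks can be released before the response is built.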
+ SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(FAMILY, QUALIFIER, + CompareOperator.EQUAL, new BinaryComparator(data)); + filter.setFilterIfMissing(true); + assertNoBlocksWithRef(testCase.table, + testCase.baseScan.setFilter(new FilterList(filter, + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER)))), + testCase.cache, 0); + } + } + + private static final class TestCase implements AutoCloseable { + private final Table table; + private final BlockCache cache; + private final Scan baseScan; + + private TestCase(Table table, BlockCache cache, Scan baseScan) { + this.table = table; + this.cache = cache; + this.baseScan = baseScan; + } + + @Override + public void close() throws Exception { + if (table != null) { + table.close(); + } + } + } + + private TestCase setupStreamScanTest() throws IOException, InterruptedException { + latch = new CountDownLatch(1); + final TableName tableName = TableName.valueOf(name.getMethodName()); + byte[][] fams = new byte[][] { FAMILY, FAMILY2 }; + Table table = + TEST_UTIL.createTable(tableName, fams, 1, 1024, CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); + HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); + BlockCache cache = setCacheProperties(region); + + // the writes below create 12 cells total, with each cell being an entire block. + // so 12 blocks total as well. + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, data2); + put.addColumn(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + put = new Put(ROW3); + put.addColumn(FAMILY, QUALIFIER, data); + put.addColumn(FAMILY2, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // below creates 8 of the 12 cells + for (int i = 1; i < 5; i++) { + byte[] qualifier = Bytes.toBytes("testQualifier" + i); + put = new Put(ROW1); + put.addColumn(FAMILY, qualifier, data2); + put.addColumn(FAMILY2, qualifier, data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + return new TestCase(table, cache, new Scan().setReadType(Scan.ReadType.STREAM)); + } + + private void assertNoBlocksWithRef(Table table, Scan scan, BlockCache cache, int expectedBlocks, + ExpectedResult...
expectedResults) throws Throwable { + ScanThread[] scanThreads = initiateScan(table, scan, expectedResults); + Thread.sleep(500); + + int noOfBlocksWithRef = countReferences(cache); + // the number of blocks referred + assertEquals(expectedBlocks, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (ScanThread thread : scanThreads) { + thread.joinAndRethrow(); + } + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false); + } + + private static final class ExpectedResult { + private final byte[] row; + private final byte[] family; + private final byte[] qualifier; + private final byte[] value; + + private ExpectedResult(byte[] row, byte[] family, byte[] qualifier, byte[] value) { + this.row = row; + this.family = family; + this.qualifier = qualifier; + this.value = value; + } + + public void assertEquals(Cell cell, int index) { + assertTrue(getAssertMessage(index, "row", row, cell), CellUtil.matchingRows(cell, row)); + assertTrue(getAssertMessage(index, "family", family, cell), + CellUtil.matchingFamily(cell, family)); + assertTrue(getAssertMessage(index, "qualifier", qualifier, cell), + CellUtil.matchingQualifier(cell, qualifier)); + assertTrue(getAssertMessage(index, "value", value, cell), + CellUtil.matchingValue(cell, value)); + } + + private String getAssertMessage(int index, String component, byte[] value, Cell cell) { + return "Expected cell " + (index + 1) + " to have " + component + " " + + Bytes.toStringBinary(value) + ": " + CellUtil.toString(cell, true); + } + } + + /** + * Counts how many blocks still have references, expecting each of those blocks to have 1 + * reference per NO_OF_THREADS.
+ * Additionally, manipulates the bucket cache to "corrupt" any cells still referencing blocks that + * should not have any references. It does this by evicting those blocks and re-caching them in a + * different order. This causes the content of the buffers backing those cells to be the wrong + * size/position/data. As a result, if any cells do still reference blocks that they shouldn't, + * the requests will fail loudly at the RPC serialization step, failing the tests. + */ + private int countReferences(BlockCache cache) { + BucketCache bucketCache; + if (cache instanceof CombinedBlockCache) { + bucketCache = (BucketCache) ((CombinedBlockCache) cache).getSecondLevelCache(); + } else if (cache instanceof BucketCache) { + bucketCache = (BucketCache) cache; + } else { + throw new RuntimeException("Expected bucket cache but got " + cache); + } + + Iterator iterator = bucketCache.iterator(); + int refCount; + int noOfBlocksWithRef = 0; + + Map unreferencedBlocks = new HashMap<>(); + List cacheKeys = new ArrayList<>(); + + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + refCount = bucketCache.getRpcRefCount(cacheKey); + + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + noOfBlocksWithRef++; + } else if (cacheKey.getBlockType().isData()) { + System.out.println("Corrupting block " + cacheKey); + HFileBlock block = (HFileBlock) bucketCache.getBlock(cacheKey, false, false, false); + + // Clone to heap, then release and evict the block. This will cause the bucket cache + // to reclaim memory that is currently referenced by these blocks. + HFileBlock clonedBlock = HFileBlock.deepCloneOnHeap(block); + block.release(); + bucketCache.evictBlock(cacheKey); + + cacheKeys.add(cacheKey); + unreferencedBlocks.put(cacheKey, clonedBlock); + } + } + + // Write the blocks back to the bucket cache in a random order so they end up + // in the wrong offsets. 
This causes the ByteBufferExtendedCell in our results to be + // referencing the wrong spots if we erroneously released blocks that matter for the scanner + Collections.shuffle(cacheKeys); + + for (BlockCacheKey cacheKey : cacheKeys) { + bucketCache.cacheBlock(cacheKey, unreferencedBlocks.get(cacheKey)); + } + + System.out.println("Done corrupting blocks"); + + return noOfBlocksWithRef; + } + private BlockCache setCacheProperties(HRegion region) { Iterator strItr = region.getStores().iterator(); BlockCache cache = null; @@ -810,8 +1084,7 @@ private BlockCache setCacheProperties(HRegion region) { } @Test - public void testParallelGetsAndScanWithWrappedRegionScanner() - throws IOException, InterruptedException { + public void testParallelGetsAndScanWithWrappedRegionScanner() throws Throwable { Table table = null; try { latch = new CountDownLatch(2); @@ -854,11 +1127,11 @@ public void testParallelGetsAndScanWithWrappedRegionScanner() // countdown the latch CustomInnerRegionObserver.getCdl().get().countDown(); for (GetThread thread : getThreads) { - thread.join(); + thread.joinAndRethrow(); } getLatch.countDown(); for (ScanThread thread : scanThreads) { - thread.join(); + thread.joinAndRethrow(); } } finally { if (table != null) { @@ -868,17 +1141,17 @@ public void testParallelGetsAndScanWithWrappedRegionScanner() } @Test - public void testScanWithCompaction() throws IOException, InterruptedException { + public void testScanWithCompaction() throws Throwable { testScanWithCompactionInternals(name.getMethodName(), false); } @Test - public void testReverseScanWithCompaction() throws IOException, InterruptedException { + public void testReverseScanWithCompaction() throws Throwable { testScanWithCompactionInternals(name.getMethodName(), true); } private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) - throws IOException, InterruptedException { + throws Throwable { Table table = null; try { latch = new CountDownLatch(1); @@ -974,7 +1247,7 @@ private void testScanWithCompactionInternals(String tableNameStr, boolean revers compactionLatch.countDown(); latch.countDown(); for (ScanThread thread : scanThreads) { - thread.join(); + thread.joinAndRethrow(); } // by this time all blocks should have been evicted iterator = cache.iterator(); @@ -993,8 +1266,7 @@ private void testScanWithCompactionInternals(String tableNameStr, boolean revers } @Test - public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() - throws IOException, InterruptedException { + public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() throws Throwable { // do flush and scan in parallel Table table = null; try { @@ -1100,7 +1372,7 @@ public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush() compactionLatch.countDown(); latch.countDown(); for (ScanThread thread : scanThreads) { - thread.join(); + thread.joinAndRethrow(); } // by this time all blocks should have been evicted iterator = cache.iterator(); @@ -1179,7 +1451,13 @@ public void testScanWithException() throws IOException, InterruptedException { // countdown the latch CustomInnerRegionObserver.getCdl().get().countDown(); for (ScanThread thread : scanThreads) { - thread.join(); + // expect it to fail + try { + thread.joinAndRethrow(); + fail("Expected failure"); + } catch (Throwable t) { + assertTrue(t instanceof UncheckedIOException); + } } iterator = cache.iterator(); usedBlocksFound = false; @@ -1241,11 +1519,10 @@ private void insertData(Table table) throws IOException { table.put(put); } - private ScanThread[] 
initiateScan(Table table, boolean reverse) - throws IOException, InterruptedException { + private ScanThread[] initiateScan(Table table, Scan scan, ExpectedResult... expectedResults) { ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; for (int i = 0; i < NO_OF_THREADS; i++) { - scanThreads[i] = new ScanThread(table, reverse); + scanThreads[i] = new ScanThread(table, scan, expectedResults); } for (ScanThread thread : scanThreads) { thread.start(); @@ -1253,6 +1530,15 @@ private ScanThread[] initiateScan(Table table, boolean reverse) return scanThreads; } + private ScanThread[] initiateScan(Table table, boolean reverse) + throws IOException, InterruptedException { + Scan scan = new Scan(); + if (reverse) { + scan.setReversed(true); + } + return initiateScan(table, scan); + } + private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs) throws IOException, InterruptedException { GetThread[] getThreads = new GetThread[NO_OF_THREADS]; @@ -1341,6 +1627,7 @@ private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean private static class MultiGetThread extends Thread { private final Table table; private final List gets = new ArrayList<>(); + private volatile Throwable throwable = null; public MultiGetThread(Table table) { this.table = table; @@ -1355,7 +1642,20 @@ public void run() { Result[] r = table.get(gets); assertTrue(Bytes.equals(r[0].getRow(), ROW)); assertTrue(Bytes.equals(r[1].getRow(), ROW1)); - } catch (IOException e) { + } catch (Throwable t) { + throwable = t; + } + } + + /** + * Joins the thread and re-throws any throwable that was thrown by the runnable method. The + * thread runnable itself has assertions in it. Without this rethrow, if those other assertions + * failed we would never actually know because they don't bubble up to the main thread. + */ + public void joinAndRethrow() throws Throwable { + join(); + if (throwable != null) { + throw throwable; } } } @@ -1365,6 +1665,8 @@ private static class GetThread extends Thread { private final boolean tracker; private final boolean multipleCFs; + private volatile Throwable throwable = null; + public GetThread(Table table, boolean tracker, boolean multipleCFs) { this.table = table; this.tracker = tracker; @@ -1375,8 +1677,20 @@ public GetThread(Table table, boolean tracker, boolean multipleCFs) { public void run() { try { initiateGet(table); - } catch (IOException e) { - // do nothing + } catch (Throwable t) { + throwable = t; + } + } + + /** + * Joins the thread and re-throws any throwable that was thrown by the runnable method. The + * thread runnable itself has assertions in it. Without this rethrow, if those other assertions + * failed we would never actually know because they don't bubble up to the main thread. + */ + public void joinAndRethrow() throws Throwable { + join(); + if (throwable != null) { + throw throwable; } } @@ -1426,35 +1740,66 @@ private void initiateGet(Table table) throws IOException { private static class ScanThread extends Thread { private final Table table; - private final boolean reverse; + private final Scan scan; + private final ExpectedResult[] expectedResults; + + private volatile Throwable throwable = null; - public ScanThread(Table table, boolean reverse) { + public ScanThread(Table table, Scan scan, ExpectedResult... 
expectedResults) { this.table = table; - this.reverse = reverse; + this.scan = scan; + this.expectedResults = expectedResults; } @Override public void run() { try { initiateScan(table); - } catch (IOException e) { - // do nothing + } catch (Throwable t) { + throwable = t; } } - private void initiateScan(Table table) throws IOException { - Scan scan = new Scan(); - if (reverse) { - scan.setReversed(true); + /** + * Joins the thread and re-throws any throwable that was thrown by the runnable method. The + * thread runnable itself has assertions in it. Without this rethrow, if those other assertions + * failed we would never actually know because they don't bubble up to the main thread. + */ + public void joinAndRethrow() throws Throwable { + join(); + if (throwable != null) { + throw throwable; } + } + + private void initiateScan(Table table) throws IOException { CustomInnerRegionObserver.getCdl().set(latch); ResultScanner resScanner = table.getScanner(scan); - int i = (reverse ? ROWS.length - 1 : 0); + if (expectedResults != null && expectedResults.length > 0) { + assertExpectedRows(resScanner); + } else { + assertRowsMatch(resScanner); + } + } + + private void assertExpectedRows(ResultScanner scanner) { + int i = 0; + for (Result result : scanner) { + for (Cell cell : result.listCells()) { + expectedResults[i].assertEquals(cell, i++); + } + } + // verify we covered the full expected results + assertEquals(i, expectedResults.length); + } + + private void assertRowsMatch(ResultScanner scanner) { + int i = (scan.isReversed() ? ROWS.length - 1 : 0); boolean resultFound = false; - for (Result result : resScanner) { + for (Result result : scanner) { resultFound = true; - System.out.println(result); - if (!reverse) { + System.out.println("result: " + result); + if (!scan.isReversed()) { assertTrue(Bytes.equals(result.getRow(), ROWS[i])); i++; } else { @@ -1462,7 +1807,7 @@ private void initiateScan(Table table) throws IOException { i--; } } - assertTrue(resultFound); + assertEquals(!scan.hasFilter(), resultFound); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java index 024fb04c6873..3ef3977415f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java @@ -85,7 +85,7 @@ public void doTest(Configuration conf, Path path, Compression.Algorithm compress HFileScanner scanner = null; HFile.Reader reader = HFile.createReader(FS, path, cacheConf, true, conf); try { - scanner = reader.getScanner(conf, false, false); + scanner = reader.getScanner(conf, false, false, false); assertTrue("Initial seekTo failed", scanner.seekTo()); do { Cell kv = scanner.getCell(); @@ -105,7 +105,7 @@ public void doTest(Configuration conf, Path path, Compression.Algorithm compress LOG.info("Random seeking with " + fileContext); reader = HFile.createReader(FS, path, cacheConf, true, conf); try { - scanner = reader.getScanner(conf, false, true); + scanner = reader.getScanner(conf, false, true, false); assertTrue("Initial seekTo failed", scanner.seekTo()); for (i = 0; i < 100; i++) { KeyValue kv = testKvs.get(rand.nextInt(testKvs.size())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 6a03cfcad3e7..87b2397b052e 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -270,7 +270,7 @@ private void readStoreFile(boolean useTags) throws IOException { .withIncludesTags(useTags).build(); final boolean cacheBlocks = false; final boolean pread = false; - HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread); + HFileScanner scanner = reader.getScanner(conf, cacheBlocks, pread, false); assertTrue(testDescription, scanner.seekTo()); long offset = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index c1114770786e..c57a0230ff1f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -655,7 +655,7 @@ void basicWithSomeCodec(String codec, boolean useTags) throws IOException { System.out.println(cacheConf.toString()); // Load up the index. // Get a scanner that caches and that does not use pread. - HFileScanner scanner = reader.getScanner(conf, true, false); + HFileScanner scanner = reader.getScanner(conf, true, false, false); // Align scanner at start of the file. scanner.seekTo(); readAllRecords(scanner); @@ -742,7 +742,7 @@ private void metablocks(final String compress) throws Exception { ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, mFile).build(); Reader reader = createReaderFromStream(context, cacheConf, conf); // No data -- this should return false. - assertFalse(reader.getScanner(conf, false, false).seekTo()); + assertFalse(reader.getScanner(conf, false, false, false).seekTo()); someReadingWithMetaBlock(reader); fs.delete(mFile, true); reader.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 5b8cfadfde78..5577a90de3dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -612,7 +612,7 @@ public void testHFileWriterAndReader() throws IOException { LOG.info("Last key: " + Bytes.toStringBinary(keys[NUM_KV - 1])); for (boolean pread : new boolean[] { false, true }) { - HFileScanner scanner = reader.getScanner(conf, true, pread); + HFileScanner scanner = reader.getScanner(conf, true, pread, false); for (int i = 0; i < NUM_KV; ++i) { checkSeekTo(keys, scanner, i); checkKeyValue("i=" + i, keys[i], values[i], @@ -731,7 +731,7 @@ public void testIntermediateLevelIndicesWithLargeKeys(int minNumEntries) throws HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, true, conf); // Scanner doesn't do Cells yet. Fix. 
- HFileScanner scanner = reader.getScanner(conf, true, true); + HFileScanner scanner = reader.getScanner(conf, true, true, false); for (int i = 0; i < keys.size(); ++i) { scanner.seekTo(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) .setRow(keys.get(i)).setFamily(HConstants.EMPTY_BYTE_ARRAY) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index f3711428ce53..c5a63180646b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -241,7 +241,7 @@ public void testHFileEncryption() throws Exception { try { FixedFileTrailer trailer = reader.getTrailer(); assertNotNull(trailer.getEncryptionKey()); - scanner = reader.getScanner(conf, false, false); + scanner = reader.getScanner(conf, false, false, false); assertTrue("Initial seekTo failed", scanner.seekTo()); do { Cell kv = scanner.getCell(); @@ -261,7 +261,7 @@ public void testHFileEncryption() throws Exception { Random rand = ThreadLocalRandom.current(); reader = HFile.createReader(fs, path, cacheConf, true, conf); try { - scanner = reader.getScanner(conf, false, true); + scanner = reader.getScanner(conf, false, true, false); assertTrue("Initial seekTo failed", scanner.seekTo()); for (i = 0; i < 100; i++) { KeyValue kv = testKvs.get(rand.nextInt(testKvs.size())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java index f031a96d15f4..08d9ebebc786 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java @@ -91,7 +91,7 @@ public void testWriteHFile() throws Exception { HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, true, conf); // Scanner doesn't do Cells yet. Fix. 
- HFileScanner scanner = reader.getScanner(conf, true, true); + HFileScanner scanner = reader.getScanner(conf, true, true, false); for (int i = 0; i < keys.size(); ++i) { scanner.seekTo(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) .setRow(keys.get(i)).setFamily(HConstants.EMPTY_BYTE_ARRAY) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java index f16008f29db1..671e6c593944 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -28,7 +32,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -59,10 +65,9 @@ static String toRowStr(Cell c) { return Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); } - Path makeNewFile() throws IOException { + Path makeNewFile(int blocksize) throws IOException { Path ncTFile = new Path(TEST_UTIL.getDataTestDir(), "basic.hfile"); FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile); - int blocksize = toKV("a").getLength() * 3; HFileContext context = new HFileContextBuilder().withBlockSize(blocksize).withIncludesTags(true).build(); Configuration conf = TEST_UTIL.getConfiguration(); @@ -81,9 +86,166 @@ Path makeNewFile() throws IOException { return ncTFile; } + @SuppressWarnings("checkstyle:EmptyBlock") + @Test + public void testRetainBlock() throws IOException { + // use very small blocksize to force every cell to be a different block. this gives us + // more room to work below in testing checkpointing between blocks. + Path p = makeNewFile(1); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); + + ByteBuffAllocator allocator = ByteBuffAllocator.create(conf, true); + CacheConfig cacheConfig = new CacheConfig(conf, null, null, allocator); + HFile.Reader reader = HFile.createReader(fs, p, cacheConfig, true, conf); + + int blocksRetained; + try (HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, false)) { + scanner.seekTo(); + + // checkpointing is disabled, so we should retain all of these blocks + // no need to call retainBlock + while (scanner.next()) { + // pass + } + + blocksRetained = scanner.prevBlocks.size(); + assertTrue(blocksRetained > 0); + } + + try (HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, true)) { + scanner.seekTo(); + + // checkpointing is enabled, but we are not calling retainBlock. 
so we expect no blocks + // to be retained below + + while (scanner.next()) { + // pass + } + + assertEquals(0, scanner.prevBlocks.size()); + } + + try (HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, true)) { + scanner.seekTo(); + + // checkpointing is enabled, and we will call retainBlock on just the first one. we + // expect just 1 block to be retained + scanner.retainBlock(); + + while (scanner.next()) { + // pass + } + + // expect the same number of blocks as the first time with no checkpoint + assertEquals(1, scanner.prevBlocks.size()); + } + + try (HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, true)) { + scanner.seekTo(); + + // checkpointing is enabled, and we will call retainBlock on every next. + // we expect the same number of blocks to be retained as in the original case where + // checkpointing was disabled above. + scanner.retainBlock(); + + while (scanner.next()) { + scanner.retainBlock(); + } + + // expect the same number of blocks as the first time with no checkpoint + assertEquals(blocksRetained, scanner.prevBlocks.size()); + } + } + + @Test + public void testCheckpoint() throws IOException { + // use very small blocksize to force every cell to be a different block. this gives us + // more room to work below in testing checkpointing between blocks. + Path p = makeNewFile(1); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); + + ByteBuffAllocator allocator = ByteBuffAllocator.create(conf, true); + CacheConfig cacheConfig = new CacheConfig(conf, null, null, allocator); + HFile.Reader reader = HFile.createReader(fs, p, cacheConfig, true, conf); + + HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, true); + + // we do an initial checkpoint. but we'll override it below, which is to prove that + // checkpoints can supersede each other (by updating index). if that didn't work, we'd see + // the first prevBlock entry get released early which would fail the assertions below. + scanner.checkpoint(Shipper.State.START); + + scanner.seekTo(); + + boolean started = false; + boolean finished = false; + + // our goal is to prove that we can clear out a slice of prevBlocks + // skip the first prevBlock entry by calling checkpoint START at that point + // once we get another prevBlocks entry we finish up with FILTERED + while (scanner.next()) { + // retainBlock on all of these so we can test releasing by checkpoint. + scanner.retainBlock(); + if (scanner.prevBlocks.size() > 0) { + if (started) { + finished = true; + scanner.checkpoint(Shipper.State.FILTERED); + break; + } else { + started = true; + scanner.checkpoint(Shipper.State.START); + } + } + } + + assertTrue(started); + assertTrue(finished); + assertNotEquals(0, allocator.getUsedBufferCount()); + + // checkpointing doesn't clear out prevBlocks, just releases and sets them all to null + // make sure there are still entries + assertNotEquals(0, scanner.prevBlocks.size()); + + // we expect to find 1 block at the head of the list which is non-null. this is the one we + // skipped above.
after that any others should be null + boolean foundNonNull = false; + for (HFileBlock block : scanner.prevBlocks) { + if (!foundNonNull) { + assertNotNull(block); + foundNonNull = true; + } else { + assertNull(block); + } + } + + // we loaded at least 3 blocks -- 1 was skipped (still in prevBlocks) and 1 is held in curBlock. + // so skip two buffers in our check here + assertEquals(allocator.getUsedBufferCount() - 2, allocator.getFreeBufferCount()); + + scanner.shipped(); + + // shipped cleans up prevBlocks and releases anything left over + assertTrue(scanner.prevBlocks.isEmpty()); + // now just curBlock holds a buffer + assertEquals(allocator.getUsedBufferCount() - 1, allocator.getFreeBufferCount()); + + // close and validate that all buffers are returned + scanner.close(); + assertEquals(allocator.getUsedBufferCount(), allocator.getFreeBufferCount()); + } + @Test public void testSeekBefore() throws Exception { - Path p = makeNewFile(); + Path p = makeNewFile(toKV("a").getLength() * 3); FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); int[] bucketSizes = { 512, 2048, 4096, 64 * 1024, 128 * 1024 }; @@ -93,7 +255,7 @@ public void testSeekBefore() throws Exception { HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf, bucketcache), true, conf); // warm cache - HFileScanner scanner = reader.getScanner(conf, true, true); + HFileScanner scanner = reader.getScanner(conf, true, true, false); scanner.seekTo(toKV("i")); assertEquals("i", toRowStr(scanner.getCell())); scanner.close(); @@ -103,7 +265,7 @@ public void testSeekBefore() throws Exception { } // reopen again. - scanner = reader.getScanner(conf, true, true); + scanner = reader.getScanner(conf, true, true, false); scanner.seekTo(toKV("i")); assertEquals("i", toRowStr(scanner.getCell())); scanner.seekBefore(toKV("i")); @@ -118,7 +280,7 @@ public void testSeekBefore() throws Exception { } // case 2 - scanner = reader.getScanner(conf, true, true); + scanner = reader.getScanner(conf, true, true, false); scanner.seekTo(toKV("i")); assertEquals("i", toRowStr(scanner.getCell())); scanner.seekBefore(toKV("c")); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java index 8ee46c17c4ad..e8d094bd30f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java @@ -196,7 +196,7 @@ private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding) // We've build a HFile tree with index = 16. Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels()); - HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false); + HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false, false); HFileBlock block1 = reader.getDataBlockIndexReader() .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader) .getHFileBlock(); @@ -279,7 +279,7 @@ public void testSeekBefore() throws Exception { // We've build a HFile tree with index = 16. 
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels()); - HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false); + HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false, false); HFileBlock block1 = reader.getDataBlockIndexReader() .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader) .getHFileBlock(); @@ -408,7 +408,7 @@ public void testWithLruBlockCache() throws Exception { // We've build a HFile tree with index = 16. Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels()); - HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false); + HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(conf, true, true, false, false); HFileBlock block1 = reader.getDataBlockIndexReader() .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE, reader) .getHFileBlock(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java index 3018d321480d..58732f2cd77d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java @@ -176,7 +176,7 @@ public void seekTFile() throws IOException { Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf); KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(), ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen); - HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD); + HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD, false); BytesWritable key = new BytesWritable(); timer.reset(); timer.start(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java index c757798a3940..bf397b7ee2a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java @@ -109,7 +109,7 @@ private void testReseekToInternals(TagUsage tagUsage) throws IOException { HFile.Reader reader = HFile.createReader(TEST_UTIL.getTestFileSystem(), ncTFile, cacheConf, true, TEST_UTIL.getConfiguration()); - HFileScanner scanner = reader.getScanner(TEST_UTIL.getConfiguration(), false, true); + HFileScanner scanner = reader.getScanner(TEST_UTIL.getConfiguration(), false, true, false); scanner.seekTo(); for (int i = 0; i < keyList.size(); i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java index 731e7ab79ac5..35716f06f30c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekBeforeWithInlineBlocks.java @@ -142,7 +142,7 @@ public void testMultiIndexLevelRandomHFileWithBlooms() throws IOException { // Check that we can seekBefore in either direction and with both pread // enabled and disabled for (boolean pread : new boolean[] { false, true }) { - HFileScanner scanner = reader.getScanner(conf, true, pread); + HFileScanner scanner = reader.getScanner(conf, true, pread, false); 
checkNoSeekBefore(cells, scanner, 0); for (int i = 1; i < NUM_KV; i++) { checkSeekBefore(cells, scanner, i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java index bf26b019e2a1..0ebad62b444a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java @@ -149,7 +149,7 @@ protected void testSeekBeforeInternals(TagUsage tagUsage) throws IOException { FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, true); + HFileScanner scanner = reader.getScanner(conf, false, true, false); assertFalse(scanner.seekBefore(toKV("a", tagUsage))); assertFalse(scanner.seekBefore(toKV("c", tagUsage))); @@ -207,7 +207,7 @@ protected void testSeekBeforeWithReSeekToInternals(TagUsage tagUsage) throws IOE FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, true); + HFileScanner scanner = reader.getScanner(conf, false, true, false); assertFalse(scanner.seekBefore(toKV("a", tagUsage))); assertFalse(scanner.seekBefore(toKV("b", tagUsage))); assertFalse(scanner.seekBefore(toKV("c", tagUsage))); @@ -301,7 +301,7 @@ protected void testSeekToInternals(TagUsage tagUsage) throws IOException { Configuration conf = TEST_UTIL.getConfiguration(); HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); assertEquals(2, reader.getDataBlockIndexReader().getRootBlockCount()); - HFileScanner scanner = reader.getScanner(conf, false, true); + HFileScanner scanner = reader.getScanner(conf, false, true, false); // lies before the start of the file. 
assertEquals(-1, scanner.seekTo(toKV("a", tagUsage))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java index ec9de92e9f25..a0602f1e9575 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java @@ -587,7 +587,7 @@ public static void testCodecs(Configuration conf, int kvLimit, String hfilePath, StoreFileReader reader = hsf.getReader(); reader.loadFileInfo(); KeyValueScanner scanner = - reader.getStoreFileScanner(true, true, false, hsf.getMaxMemStoreTS(), 0, false); + reader.getStoreFileScanner(true, true, false, hsf.getMaxMemStoreTS(), 0, false, false); USE_TAG = reader.getHFileReader().getFileContext().isIncludesTags(); // run the utilities DataBlockEncodingTool comp = new DataBlockEncodingTool(conf, compressionName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java index 373e138a764b..bd795d9c7d07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java @@ -34,6 +34,16 @@ public void shipped() throws IOException { delegate.shipped(); } + @Override + public void checkpoint(State state) { + delegate.checkpoint(state); + } + + @Override + public void retainBlock() { + delegate.retainBlock(); + } + @Override public Cell peek() { return delegate.peek(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java index c274d3c3129a..c2736f14bc74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java @@ -62,7 +62,7 @@ private List prepareListOfTestSeeks(Path path) throws IOException { cacheConf, BloomType.NONE, true); storeFile.initReader(); StoreFileReader reader = storeFile.getReader(); - StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false); + StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false, false); Cell current; scanner.seek(KeyValue.LOWESTKEY); @@ -93,7 +93,7 @@ private void runTest(Path path, DataBlockEncoding blockEncoding, List seek long totalSize = 0; StoreFileReader reader = storeFile.getReader(); - StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false); + StoreFileScanner scanner = reader.getStoreFileScanner(true, false, false, 0, 0, false, false); long startReadingTime = System.nanoTime(); Cell current; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java index 9a3c5d2e218b..149785df7fb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import 
java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; @@ -30,14 +31,35 @@ * file scanner. */ public class KeyValueScanFixture extends CollectionBackedScanner { + private final AtomicInteger retainBlockCount; + public KeyValueScanFixture(CellComparator comparator, Cell... cells) { + this(comparator, null, cells); + } + + public KeyValueScanFixture(CellComparator comparator, AtomicInteger retainBlockCount, + Cell... cells) { super(comparator, cells); + this.retainBlockCount = retainBlockCount; + } + + @Override + public void retainBlock() { + if (retainBlockCount != null) { + retainBlockCount.incrementAndGet(); + } + super.retainBlock(); } public static List scanFixture(KeyValue[]... kvArrays) { + return scanFixture(null, kvArrays); + } + + public static List scanFixture(AtomicInteger retainBlockCount, + KeyValue[]... kvArrays) { ArrayList scanners = new ArrayList<>(); for (KeyValue[] kvs : kvArrays) { - scanners.add(new KeyValueScanFixture(CellComparator.getInstance(), kvs)); + scanners.add(new KeyValueScanFixture(CellComparator.getInstance(), retainBlockCount, kvs)); } return scanners; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockHStoreFile.java index bd26c5474e08..49f3825b8fba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockHStoreFile.java @@ -152,17 +152,17 @@ public void initReader() throws IOException { @Override public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn) { + boolean canOptimizeForNonNullColumn, boolean checkpointingEnabled) { return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, - canOptimizeForNonNullColumn); + canOptimizeForNonNullColumn, false); } @Override public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) - throws IOException { + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn, + boolean checkpointingEnabled) throws IOException { return getReader().getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder, - canOptimizeForNonNullColumn); + canOptimizeForNonNullColumn, false); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index 037952035fdf..83d87e5c8e9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -233,7 +233,7 @@ private void readStoreFile(Path path) throws IOException { HFile.Reader reader = sf.getReader().getHFileReader(); try { // Open a scanner with (on read) caching disabled - HFileScanner scanner = reader.getScanner(conf, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false); assertTrue(testDescription, scanner.seekTo()); // Cribbed from io.hfile.TestCacheOnWrite long offset = 0; diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java index 52de7b326488..523caa8770cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java @@ -201,7 +201,7 @@ private void readStoreFile(int t, BloomType bt, List kvs, Path sfPath) sf.initReader(); StoreFileReader r = sf.getReader(); final boolean pread = true; // does not really matter - StoreFileScanner scanner = r.getStoreFileScanner(true, pread, false, 0, 0, false); + StoreFileScanner scanner = r.getStoreFileScanner(true, pread, false, 0, 0, false, false); { // Test for false negatives (not allowed). diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 897152f8b6dd..a04021d91a24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -133,7 +133,7 @@ public void testCanSplitJustAfterASplit() throws Exception { .filter(s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName())) .forEach(sf -> { StoreFileReader reader = ((HStoreFile) sf).getReader(); - reader.getStoreFileScanner(true, false, false, 0, 0, false); + reader.getStoreFileScanner(true, false, false, 0, 0, false, false); scanner.put(r.getRegionInfo().getEncodedName(), reader); LOG.info("Got reference to file = " + sf.getPath() + ",for region = " + r.getRegionInfo().getEncodedName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 2e999dfaa455..aa2851f20d8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1799,10 +1799,10 @@ private static class MyStore extends HStore { public List getScanners(List files, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException { + boolean includeMemstoreScanner, boolean checkpointingEnabled) throws IOException { hook.getScanners(this); return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, - stopRow, false, readPt, includeMemstoreScanner); + stopRow, false, readPt, includeMemstoreScanner, checkpointingEnabled); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index a0c23af5ef0d..5d94520050a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -578,7 +578,7 @@ private void checkHalfHFile(final HRegionFileSystem regionFs, final HStoreFile f private static StoreFileScanner getStoreFileScanner(StoreFileReader reader, boolean cacheBlocks, boolean pread) { - return 
reader.getStoreFileScanner(cacheBlocks, pread, false, 0, 0, false); + return reader.getStoreFileScanner(cacheBlocks, pread, false, 0, 0, false, false); } private static final String localFormatter = "%010d"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java index 27da15d59150..ad1cba3c0be1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowPrefixBloomFilter.java @@ -131,7 +131,7 @@ public void tearDown() throws Exception { } private static StoreFileScanner getStoreFileScanner(StoreFileReader reader) { - return reader.getStoreFileScanner(false, false, false, 0, 0, false); + return reader.getStoreFileScanner(false, false, false, 0, 0, false, false); } private void writeStoreFile(final Path f, BloomType bt, int expKeys) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java index 67671fe12fef..9525e6d65fd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileScannerWithTagCompression.java @@ -88,7 +88,7 @@ public void testReseek() throws Exception { storeFileInfo.initHFileInfo(context); StoreFileReader reader = storeFileInfo.createReader(context, cacheConf); storeFileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader()); - StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false); + StoreFileScanner s = reader.getStoreFileScanner(false, false, false, 0, 0, false, false); try { // Now do reseek with empty KV to position to the beginning of the file KeyValue k = KeyValueUtil.createFirstOnRow(Bytes.toBytes("k2")); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 91717060d998..53d3eff88122 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -1092,4 +1092,30 @@ public void close() { assertFalse(memStoreScanner.closed); } } + + /** + * Test that we call heap.retainBlock() when adding a cell to the return list + */ + @Test + public void testRetainBlock() throws IOException { + AtomicInteger retainBlockCount = new AtomicInteger(); + List scanners = scanFixture(retainBlockCount, kvs); + try (StoreScanner scan = new StoreScanner(new Scan(), scanInfo, getCols("a", "d"), scanners)) { + List results = new ArrayList<>(); + assertEquals(true, scan.next(results)); + assertEquals(2, results.size()); + assertEquals(2, retainBlockCount.getAndSet(0)); + assertEquals(kvs[0], results.get(0)); + assertEquals(kvs[3], results.get(1)); + results.clear(); + + assertEquals(true, scan.next(results)); + assertEquals(1, results.size()); + assertEquals(1, retainBlockCount.get()); + assertEquals(kvs[kvs.length - 1], results.get(0)); + + results.clear(); + assertEquals(false, scan.next(results)); + } + } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java index 9c03f368cb81..faaa0b4fd983 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java @@ -65,7 +65,7 @@ public static HStoreFile createDummyStoreFile(long maxSequenceId) throws Excepti when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), - anyBoolean())).thenReturn(mock(StoreFileScanner.class)); + anyBoolean(), anyBoolean())).thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.getMaxSequenceId()).thenReturn(maxSequenceId); return sf; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index c4f98f4d94ad..aa1789cceabc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -874,7 +874,7 @@ private static HStoreFile createFile(long size) throws Exception { when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong(), - anyBoolean())).thenReturn(mock(StoreFileScanner.class)); + anyBoolean(), anyBoolean())).thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); when(r.getMaxTimestamp()).thenReturn(TimeRange.INITIAL_MAX_TIMESTAMP); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index 591d807c0da4..16b4abc20dfe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -612,7 +612,7 @@ private int verifyHFile(Path p) throws IOException { Configuration conf = util.getConfiguration(); HFile.Reader reader = HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); - HFileScanner scanner = reader.getScanner(conf, false, false); + HFileScanner scanner = reader.getScanner(conf, false, false, false); scanner.seekTo(); int count = 0; do {