Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
.build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
long shippedCallSizeLimit = Math.min(compactScannerSizeLimit,
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize());

ExtendedCell mobCell = null;
List<String> committedMobWriterFileNames = new ArrayList<>();
Expand Down Expand Up @@ -557,6 +557,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
((ShipperListener) writer).beforeShipped();
kvs.shipped();
scannerContext.clearBlockSizeProgress();
bytesWrittenProgressForShippedCall = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
Expand Down Expand Up @@ -266,6 +267,17 @@ void clearProgress() {
progress.setFields(0, 0, 0, getBlockSizeProgress());
}

/**
* Clear away the block size progress. Mainly used in compaction, as we will use a single
* ScannerContext across all the compaction lifetime, and we will call Shipper.shipped to clear
* the block reference, so it is safe to clear the block size progress in compaction.
*/
@RestrictedApi(explanation = "Should only be called in Compactor", link = "",
allowedOnPath = ".*/org/apache/hadoop/hbase/.*/*Compactor.java|.*/src/test/.*")
public void clearBlockSizeProgress() {
progress.setBlockSize(0);
}

/**
* Note that this is not a typical setter. This setter returns the {@link NextState} that was
* passed in so that methods can be invoked against the new state. Furthermore, this pattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,14 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel

throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
long shippedCallSizeLimit =
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
// when hitting the block size limit, i.e, compactScannerSizeLimit, we need to reset the block
// size progress otherwise the scanner will only return 1 cell in the next method because all
// the block scanned are still referenced, so we need to call shipped to release them.
// Usually compactScannerSizeLimit will be much greater than blockSize * fileSize, but anyway,
// let's use Math.min for safety.
// See HBASE-29742
long shippedCallSizeLimit = Math.min(compactScannerSizeLimit,
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize());
try {
do {
// InternalScanner is for CPs so we do not want to leak ExtendedCell to the interface, but
Expand Down Expand Up @@ -488,25 +494,32 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
}
}
writer.appendAll(cells);
if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
// ShipperListener will do a clone of the last cells it refer, so need to set back
// sequence id before ShipperListener.beforeShipped
PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
if (bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
if (shipper != null) {
if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
// ShipperListener will do a clone of the last cells it refer, so need to set back
// sequence id before ShipperListener.beforeShipped
PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
}
// Clone the cells that are in the writer so that they are freed of references,
// if they are holding any.
((ShipperListener) writer).beforeShipped();
// The SHARED block references, being read for compaction, will be kept in prevBlocks
// list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells
// being returned to client, we will call shipped() which can clear this list. Here by
// we are doing the similar thing. In between the compaction (after every N cells
// written with collective size of 'shippedCallSizeLimit') we will call shipped which
// may clear prevBlocks list.
shipper.shipped();
}
// Clone the cells that are in the writer so that they are freed of references,
// if they are holding any.
((ShipperListener) writer).beforeShipped();
// The SHARED block references, being read for compaction, will be kept in prevBlocks
// list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells
// being returned to client, we will call shipped() which can clear this list. Here by
// we are doing the similar thing. In between the compaction (after every N cells
// written with collective size of 'shippedCallSizeLimit') we will call shipped which
// may clear prevBlocks list.
shipper.shipped();
// clear the block progress in ScannerContext, so we can reuse it. In normal rpc call,
// ScannerContext will be dropped after shipping, so we do not need to clear the block
// progress there
scannerContext.clearBlockSizeProgress();
bytesWrittenProgressForShippedCall = 0;
}

if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
((ShipperListener) writer).beforeShipped();
kvs.shipped();
scannerContext.clearBlockSizeProgress();
bytesWrittenProgressForShippedCall = 0;
}
}
Expand Down