@@ -745,15 +745,15 @@ public boolean next(List<? super ExtendedCell> outResult, ScannerContext scanner
}
matcher.clearCurrentRow();
seekOrSkipToNextRow(cell);
NextState stateAfterSeekNextRow = needToReturn(outResult);
NextState stateAfterSeekNextRow = needToReturn();
if (stateAfterSeekNextRow != null) {
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
}
break;

case SEEK_NEXT_COL:
seekOrSkipToNextColumn(cell);
NextState stateAfterSeekNextColumn = needToReturn(outResult);
NextState stateAfterSeekNextColumn = needToReturn();
if (stateAfterSeekNextColumn != null) {
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
}
@@ -771,7 +771,7 @@ public boolean next(List<? super ExtendedCell> outResult, ScannerContext scanner
((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
) {
seekAsDirection(nextKV);
NextState stateAfterSeekByHint = needToReturn(outResult);
NextState stateAfterSeekByHint = needToReturn();
if (stateAfterSeekByHint != null) {
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
}
@@ -828,11 +828,10 @@ private void updateMetricsStore(boolean memstoreRead) {
* memstore scanner is replaced by the hfile scanner after #reopenAfterFlush. If the row of the top cell
* is changed, we should return the current cells. Otherwise, we may return the cells across
* different rows.
* @param outResult the cells which are visible for user scan
* @return null if the top cell doesn't change. Otherwise, the NextState to return
*/
private NextState needToReturn(List<? super ExtendedCell> outResult) {
if (!outResult.isEmpty() && topChanged) {
private NextState needToReturn() {
if (topChanged) {

Contributor:

The !outResult.isEmpty() check has been there from the beginning. Would you mind explaining why it needs to be removed here?

mwkang (Contributor, Author), Apr 11, 2025:

Let’s assume the data stored in HBase is as follows:

(1) row0/family2:qf1/DeleteColumn
(2) row0/family2:qf1/Put/value2

(3) row1/family1:qf1/Put/value2
(4) row1/family2:qf1/Put/value2

Now, suppose a user starts scanning from row0.

In RegionScannerImpl#nextInternal, when the current cell's row is row0, after reading entry (2) in StoreScanner, if a flush happens, topChanged occurs (StoreScanner.peek() is changed from what it was before), and the value of StoreScanner's heap.peek() becomes (4) row1/family2:qf1/Put/value2.

Since it is the next row, StoreScanner should return at that point — but it fails to recognize that it has moved to the next row because outResult is empty, and ends up including the new row in the result.

Then, in RegionScannerImpl, it sees that nextKv’s row is different from the current cell’s row, and returns (since it has moved to a different row).

As a result, even though (3) and (4) belong to the same row (row1), they are returned to the client as if they were from different rows.

(3) and (4) should be combined into a single Result, but they end up being returned as separate Result instances.
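
For illustration, a minimal client-side sketch of the symptom, assuming an open Connection named connection and a table populated as above (the table name and variable names are hypothetical):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

try (Table table = connection.getTable(TableName.valueOf("tbl"));
  ResultScanner rs = table.getScanner(new Scan().withStartRow(Bytes.toBytes("row0")))) {
  for (Result result : rs) {
    // Expected: a single Result for row1 carrying both family1:qf1 and family2:qf1.
    // Observed before the fix, when a flush races the scan: row1 split into
    // two Results, one per column family.
    System.out.println(Bytes.toString(result.getRow()) + " -> " + result.size() + " cells");
  }
}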

mwkang (Contributor, Author):

This was the root cause of the issue described in this thread.

Contributor:

OK, I think I understand the problem.

Let's see how to reproduce the problem in a UT.

mwkang (Contributor, Author):

Thanks for reviewing the code!
Let me know if there's anything I should take care of.

Contributor:

I can confirm that there is a behavior change, but if we can still get all 3 cells at once, does it actually cause the client to get partial rows?

mwkang (Contributor, Author), Apr 21, 2025:

You're right that at the StoreScanner level, the behavior doesn't seem immediately problematic.
However, the issue becomes visible on the client side when multiple column families are involved — the row ends up getting split.

This happens because the result from row2 is included in the first next() call, which is supposed to return only row1.
As a result, the client receives fragmented results for row2.

To help illustrate the issue, I’ve added a reproducible test case.
If you run TestTopChanged.testTopChanged on the feature/bug-demo-HBASE-29254 branch, you can observe the behavior directly.

Here’s what you’ll see:

  • With the !outResult.isEmpty() condition in needToReturn (current behavior):
    RESULT: rowkey = row2, cf = cf2
    RESULT: rowkey = row2, cf = cf1

  • Without the !outResult.isEmpty() condition (modified behavior):
    RESULT: rowkey = row2, cf = cf1, cf2


It would be great if you could take a look — I know it's a bit of a hassle, but your feedback would be really helpful.

Contributor:

What if there is no data for row1 in cf2? What is the current way to prevent loading row2?

I mean, we have a delete for row1 in cf1, and row2 in cf1 and cf2; how can we make sure that we do not return row2 when calling cf2.next() the first time?

mwkang (Contributor, Author), Apr 22, 2025:

Sorry, I didn’t fully understand the question.

If there is no data for cf2 in row1, loading of row2 is prevented by moreCellsInRow in RegionScannerImpl.
This is because:

  • currentRowCell: row1/cf1:cq1/1745317264666/Put/vlen=10/seqid=4
  • nextKv: row2/cf1:cq1/1745317264674/Put/vlen=10/seqid=5
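
Concretely, RegionScannerImpl makes this decision with a row comparison along these lines (a condensed sketch, not the verbatim source):

  // Scanning continues within the current row only while the heap's next cell
  // still belongs to the same row as the cell being assembled.
  boolean moreCellsInRow = nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);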

By "What if there are no data for row1 in cf2? What is the current way to prevent loading row2?", are you asking about the current mechanism that prevents row2 from being included in the result while the StoreScanner is reading row1 and topChanged has not occurred?

This is handled in ScanQueryMatcher.preCheck using the rowComparator.

next, StoreScanner (org.apache.hadoop.hbase.regionserver)
  match, NormalUserScanQueryMatcher (org.apache.hadoop.hbase.regionserver.querymatcher)
    preCheck, ScanQueryMatcher (org.apache.hadoop.hbase.regionserver.querymatcher)

When the currentRow is row1/cf1:cq1/1745314001315/DeleteColumn/vlen=0/seqid=5 and the cell is row2/cf1:cq1/1745314001320/Put/vlen=10/seqid=7, the MatchCode becomes DONE.
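
For reference, here is a condensed sketch of the row-boundary check inside ScanQueryMatcher.preCheck (simplified; the real method also handles column exhaustion and TTL expiry):

  // If the cell under inspection no longer matches currentRow, the scan has
  // walked past the row being assembled, so the matcher answers DONE and the
  // accumulated cells are returned as one row.
  if (currentRow == null || !CellUtil.matchingRows(cell, currentRow)) {
    return MatchCode.DONE;
  }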

However, when topChanged occurs (which is the problematic condition), the currentRow becomes row2/cf2:cq1/1745314108845/Put/vlen=10/seqid=7 and the cell is row2/cf2:cq1/1745314108845/Put/vlen=10/seqid=7.

In ScanQueryMatcher.setToNewRow, the currentRow is set to row2/cf2:cq1/1745314108845/Put/vlen=10/seqid=7.

next, StoreScanner (org.apache.hadoop.hbase.regionserver)
  seekOrSkipToNextColumn, StoreScanner (org.apache.hadoop.hbase.regionserver)
    seekAsDirection, StoreScanner (org.apache.hadoop.hbase.regionserver)
      reseek, StoreScanner (org.apache.hadoop.hbase.regionserver)
        reopenAfterFlush, StoreScanner (org.apache.hadoop.hbase.regionserver)
          resetQueryMatcher, StoreScanner (org.apache.hadoop.hbase.regionserver)
            setToNewRow, ScanQueryMatcher (org.apache.hadoop.hbase.regionserver.querymatcher)

In this condition, "I mean we have delete row1 in cf1, and row2 in cf1 and cf2, how can we make sure that we do not return row2 when calling cf2.next at the first time?", it is determined as a different row by ScanQueryMatcher.preCheck.

When using a currentRow that hasn't been flushed to disk yet, it may change after a flush when the scanner is reopened (topChanged).
If outResult contains elements, there is no issue. However, if outResult is empty, the return does not occur, and the loop iterates again, which causes a problem.
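
To make the failure mode concrete, here is the relevant branch of StoreScanner.next from the diff above, annotated (the comments are editorial, not from the source):

  case SEEK_NEXT_ROW:
    matcher.clearCurrentRow();
    seekOrSkipToNextRow(cell);
    // The seek may reopen scanners after a flush; if the top cell changed
    // (topChanged), return immediately, even with an empty outResult, so the
    // caller re-evaluates the row boundary instead of this loop pulling the
    // next row's cells into the current batch.
    NextState stateAfterSeekNextRow = needToReturn();
    if (stateAfterSeekNextRow != null) {
      return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
    }
    break;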

mwkang (Contributor, Author):

Thanks again for the detailed review and your follow-up questions — I really appreciate it.
I feel that my earlier explanation may not have been clear enough.

To clarify: the main issue is not with partial results, but rather with a single row being split into two separate results.
This split happens because after a topChanged event, if outResult is empty, StoreScanner doesn't correctly detect that the row has changed.

(When I first encountered this issue, I thought it was related to partial results, which is why I initially described it that way in the thread. After further debugging, the core issue is actually that a single row is being split into two separate Results. Sorry for not making this clearer sooner.)

I’ve already added a reproducible test case (TestTopChanged.testTopChanged in the feature/bug-demo-HBASE-29254 branch) to demonstrate this behavior.

Please let me know if there’s anything else you would like me to work on to help move this forward.

Thanks again for your time and support!

return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
}
return null;
@@ -67,6 +67,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntBinaryOperator;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -1277,6 +1278,12 @@ private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequ
.setValue(value).setSequenceId(sequenceId).build();
}

private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) {
return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row)
.setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn)
.setSequenceId(sequenceId).build();
}

@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
@@ -1421,6 +1428,74 @@ public long getSmallestReadPoint(HStore store) {
}
}

@Test
public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException {
final Configuration conf = HBaseConfiguration.create();

byte[] r1 = Bytes.toBytes("row1");
byte[] r2 = Bytes.toBytes("row2");

byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");

final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
final long ts = EnvironmentEdgeManager.currentTime();
final long seqId = 100;

init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(),
new MyStoreHook() {
@Override
long getSmallestReadPoint(HStore store) {
return seqId + 3;
}
});

store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing);
store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing);
store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing);

store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing);
store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing);
store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing);

store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing);
store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing);

Scan scan = new Scan().withStartRow(r1);

try (final InternalScanner scanner =
new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) {
@Override
protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
CellComparator comparator) throws IOException {
return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> {
// Flush partway through the scan so the memstore scanner is replaced
// and topChanged occurs while outResult is still empty.
if (recordBlockSizeCallCount == 6) {
try {
flushStore(store, id++);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
}) {
List<Cell> cellResult = new ArrayList<>();

scanner.next(cellResult);
assertEquals(0, cellResult.size());

cellResult.clear();

scanner.next(cellResult);
assertEquals(3, cellResult.size());
for (Cell cell : cellResult) {
assertArrayEquals(r2, CellUtil.cloneRow(cell));
}
}
}

@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
@@ -3137,6 +3212,28 @@ public List<T> subList(int fromIndex, int toIndex) {
}
}

private interface MyKeyValueHeapHook {
void onRecordBlockSize(int recordBlockSizeCallCount);
}

/**
 * A KeyValueHeap that invokes a hook on every recordBlockSize call, letting the
 * test above inject a flush at an exact point in the scan.
 */
private static class MyKeyValueHeap extends KeyValueHeap {
private final MyKeyValueHeapHook hook;
private int recordBlockSizeCallCount;

public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator,
MyKeyValueHeapHook hook) throws IOException {
super(scanners, comparator);
this.hook = hook;
}

@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
recordBlockSizeCallCount++;
hook.onRecordBlockSize(recordBlockSizeCallCount);
super.recordBlockSize(blockSizeConsumer);
}
}

public static class MyCompactingMemStore2 extends CompactingMemStore {
private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";