|
67 | 67 | import java.util.concurrent.locks.ReentrantReadWriteLock; |
68 | 68 | import java.util.function.Consumer; |
69 | 69 | import java.util.function.IntBinaryOperator; |
| 70 | +import java.util.function.IntConsumer; |
70 | 71 | import org.apache.hadoop.conf.Configuration; |
71 | 72 | import org.apache.hadoop.fs.FSDataOutputStream; |
72 | 73 | import org.apache.hadoop.fs.FileStatus; |
@@ -1277,6 +1278,12 @@ private ExtendedCell createCell(byte[] row, byte[] qualifier, long ts, long sequ |
1277 | 1278 | .setValue(value).setSequenceId(sequenceId).build(); |
1278 | 1279 | } |
1279 | 1280 |
|
| 1281 | + private ExtendedCell createDeleteCell(byte[] row, byte[] qualifier, long ts, long sequenceId) { |
| 1282 | + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row) |
| 1283 | + .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn) |
| 1284 | + .setSequenceId(sequenceId).build(); |
| 1285 | + } |
| 1286 | + |
1280 | 1287 | @Test |
1281 | 1288 | public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException { |
1282 | 1289 | final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false); |
@@ -1421,6 +1428,74 @@ public long getSmallestReadPoint(HStore store) { |
1421 | 1428 | } |
1422 | 1429 | } |
1423 | 1430 |
|
| 1431 | + @Test |
| 1432 | + public void testFlushBeforeCompletingScanWithDeleteCell() throws IOException { |
| 1433 | + final Configuration conf = HBaseConfiguration.create(); |
| 1434 | + |
| 1435 | + byte[] r1 = Bytes.toBytes("row1"); |
| 1436 | + byte[] r2 = Bytes.toBytes("row2"); |
| 1437 | + |
| 1438 | + byte[] value1 = Bytes.toBytes("value1"); |
| 1439 | + byte[] value2 = Bytes.toBytes("value2"); |
| 1440 | + |
| 1441 | + final MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); |
| 1442 | + final long ts = EnvironmentEdgeManager.currentTime(); |
| 1443 | + final long seqId = 100; |
| 1444 | + |
| 1445 | + init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), |
| 1446 | + ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(1).build(), |
| 1447 | + new MyStoreHook() { |
| 1448 | + @Override |
| 1449 | + long getSmallestReadPoint(HStore store) { |
| 1450 | + return seqId + 3; |
| 1451 | + } |
| 1452 | + }); |
| 1453 | + |
| 1454 | + store.add(createCell(r1, qf1, ts + 1, seqId + 1, value2), memStoreSizing); |
| 1455 | + store.add(createCell(r1, qf2, ts + 1, seqId + 1, value2), memStoreSizing); |
| 1456 | + store.add(createCell(r1, qf3, ts + 1, seqId + 1, value2), memStoreSizing); |
| 1457 | + |
| 1458 | + store.add(createDeleteCell(r1, qf1, ts + 2, seqId + 2), memStoreSizing); |
| 1459 | + store.add(createDeleteCell(r1, qf2, ts + 2, seqId + 2), memStoreSizing); |
| 1460 | + store.add(createDeleteCell(r1, qf3, ts + 2, seqId + 2), memStoreSizing); |
| 1461 | + |
| 1462 | + store.add(createCell(r2, qf1, ts + 3, seqId + 3, value1), memStoreSizing); |
| 1463 | + store.add(createCell(r2, qf2, ts + 3, seqId + 3, value1), memStoreSizing); |
| 1464 | + store.add(createCell(r2, qf3, ts + 3, seqId + 3, value1), memStoreSizing); |
| 1465 | + |
| 1466 | + Scan scan = new Scan().withStartRow(r1); |
| 1467 | + |
| 1468 | + try (final InternalScanner scanner = |
| 1469 | + new StoreScanner(store, store.getScanInfo(), scan, null, seqId + 3) { |
| 1470 | + @Override |
| 1471 | + protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners, |
| 1472 | + CellComparator comparator) throws IOException { |
| 1473 | + return new MyKeyValueHeap(scanners, comparator, recordBlockSizeCallCount -> { |
| 1474 | + if (recordBlockSizeCallCount == 6) { |
| 1475 | + try { |
| 1476 | + flushStore(store, id++); |
| 1477 | + } catch (IOException e) { |
| 1478 | + throw new RuntimeException(e); |
| 1479 | + } |
| 1480 | + } |
| 1481 | + }); |
| 1482 | + } |
| 1483 | + }) { |
| 1484 | + List<Cell> cellResult = new ArrayList<>(); |
| 1485 | + |
| 1486 | + scanner.next(cellResult); |
| 1487 | + assertEquals(0, cellResult.size()); |
| 1488 | + |
| 1489 | + cellResult.clear(); |
| 1490 | + |
| 1491 | + scanner.next(cellResult); |
| 1492 | + assertEquals(3, cellResult.size()); |
| 1493 | + for (Cell cell : cellResult) { |
| 1494 | + assertArrayEquals(r2, CellUtil.cloneRow(cell)); |
| 1495 | + } |
| 1496 | + } |
| 1497 | + } |
| 1498 | + |
1424 | 1499 | @Test |
1425 | 1500 | public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { |
1426 | 1501 | Configuration conf = HBaseConfiguration.create(); |
@@ -3137,6 +3212,28 @@ public List<T> subList(int fromIndex, int toIndex) { |
3137 | 3212 | } |
3138 | 3213 | } |
3139 | 3214 |
|
| 3215 | + private interface MyKeyValueHeapHook { |
| 3216 | + void onRecordBlockSize(int recordBlockSizeCallCount); |
| 3217 | + } |
| 3218 | + |
| 3219 | + private static class MyKeyValueHeap extends KeyValueHeap { |
| 3220 | + private final MyKeyValueHeapHook hook; |
| 3221 | + private int recordBlockSizeCallCount; |
| 3222 | + |
| 3223 | + public MyKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator, |
| 3224 | + MyKeyValueHeapHook hook) throws IOException { |
| 3225 | + super(scanners, comparator); |
| 3226 | + this.hook = hook; |
| 3227 | + } |
| 3228 | + |
| 3229 | + @Override |
| 3230 | + public void recordBlockSize(IntConsumer blockSizeConsumer) { |
| 3231 | + recordBlockSizeCallCount++; |
| 3232 | + hook.onRecordBlockSize(recordBlockSizeCallCount); |
| 3233 | + super.recordBlockSize(blockSizeConsumer); |
| 3234 | + } |
| 3235 | + } |
| 3236 | + |
3140 | 3237 | public static class MyCompactingMemStore2 extends CompactingMemStore { |
3141 | 3238 | private static final String LARGE_CELL_THREAD_NAME = "largeCellThread"; |
3142 | 3239 | private static final String SMALL_CELL_THREAD_NAME = "smallCellThread"; |
|
0 commit comments