
Commit 578fa6a

HBASE-27519 Another case for FNFE on StoreFileScanner after a flush followed by a compaction (#4912)

Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent e1ad781 commit 578fa6a

File tree: 4 files changed, +158 -6 lines changed


hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java

Lines changed: 7 additions & 1 deletion
@@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
   long getReadPoint();
 
   /**
-   * Notify observers.
+   * Notify observers. <br/>
+   * NOTE: Before we invoke this method, {@link HStoreFile#increaseRefCount} is invoked for every
+   * {@link HStoreFile} in the 'sfs' input parameter to prevent the {@link HStoreFile} from being archived
+   * after a concurrent compaction, and after this method is invoked, {@link HStoreFile#decreaseRefCount}
+   * is invoked. So if you open the {@link StoreFileReader} or {@link StoreFileScanner} asynchronously
+   * in this method, you may need to invoke {@link HStoreFile#increaseRefCount} and
+   * {@link HStoreFile#decreaseRefCount} yourself to prevent the {@link HStoreFile}s from being archived.
    * @param sfs The new files
    * @param memStoreScanners scanner of current memstore
    * @throws IOException e
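
The javadoc added above is effectively a pin/unpin contract: the caller holds a reference count on each new store file only for the duration of updateReaders, so an observer that opens its StoreFileReader or StoreFileScanner on another thread must take its own pin first. A minimal, self-contained sketch of that idea follows; RefCountedFile, AsyncObserverSketch, and the executor are hypothetical stand-ins, not HBase classes.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an HStoreFile: it may only be archived while unreferenced.
class RefCountedFile {
  final String name;
  final AtomicInteger refCount = new AtomicInteger();

  RefCountedFile(String name) {
    this.name = name;
  }

  boolean canBeArchived() {
    return refCount.get() == 0;
  }
}

// Sketch of an observer honouring the documented contract: the caller's pin ends when
// updateReaders returns, so any asynchronous work must hold its own pin.
class AsyncObserverSketch {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  void updateReaders(List<RefCountedFile> sfs) {
    // Take our own pin before returning; the caller releases its pin right after this call.
    sfs.forEach(f -> f.refCount.incrementAndGet());
    pool.execute(() -> {
      try {
        // ... open scanners over sfs here; the files cannot be archived in the meantime ...
      } finally {
        // Release once the scanners are open and tracked elsewhere.
        sfs.forEach(f -> f.refCount.decrementAndGet());
      }
    });
  }
}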

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 25 additions & 5 deletions
@@ -885,15 +885,29 @@ private long getTotalSize(Collection<HStoreFile> sfs) {
     return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
   }
 
-  private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
+  private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
     // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
     // close {@link DefaultMemStore#snapshot}, which may be used by
     // {@link DefaultMemStore#getScanners}.
     storeEngine.addStoreFiles(sfs,
-      snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
+      // NOTE: here we must increase the refCount for storeFiles because we would open the
+      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
+      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
+      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
+      // Because HStore.requestCompaction is under the storeEngine lock, we increase the refCount
+      // under the storeEngine lock here as well. See HBASE-27519 for more details.
+      snapshotId > 0 ? () -> {
+        this.memstore.clearSnapshot(snapshotId);
+        HStoreFile.increaseStoreFilesRefeCount(sfs);
+      } : () -> {
+        HStoreFile.increaseStoreFilesRefeCount(sfs);
       });
     // notify to be called here - only in case of flushes
-    notifyChangedReadersObservers(sfs);
+    try {
+      notifyChangedReadersObservers(sfs);
+    } finally {
+      HStoreFile.decreaseStoreFilesRefeCount(sfs);
+    }
     if (LOG.isTraceEnabled()) {
       long totalSize = getTotalSize(sfs);
       String traceMessage = "FLUSH time,count,size,store size,store files ["
@@ -961,7 +975,13 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
-      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
+      // NOTE: here we must increase the refCount for storeFiles because we would open the
+      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount
+      // here, HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may
+      // archive the storeFiles after a concurrent compaction. Because HStore.requestCompaction
+      // is under the storeEngine lock, we increase the refCount under the storeEngine lock here
+      // as well. See HBASE-27484 for more details.
+      HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
     } finally {
       this.storeEngine.readUnlock();
     }
@@ -982,7 +1002,7 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
         clearAndClose(memStoreScanners);
         throw t instanceof IOException ? (IOException) t : new IOException(t);
       } finally {
-        storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
+        HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
       }
     }
 
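
The heart of the fix in completeFlush is ordering: the refCount pin is taken inside the same storeEngine critical section that publishes the flushed files, and it is released in a finally block only after the observers have been notified, so CompactedHFilesDischarger cannot archive the files in between. A simplified, self-contained model of that ordering follows; the lock, file list, and notifyObservers stub are hypothetical stand-ins for the HBase internals.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified model of the HBASE-27519 ordering in HStore.completeFlush.
class FlushPinSketch {
  static class StoreFile {
    final AtomicInteger refCount = new AtomicInteger();
  }

  private final ReentrantReadWriteLock storeEngineLock = new ReentrantReadWriteLock();
  private final List<StoreFile> liveFiles = new ArrayList<>();

  void completeFlush(List<StoreFile> flushed) {
    storeEngineLock.writeLock().lock();
    try {
      // Publish the new files and pin them in the same critical section; a compaction
      // (which also needs this lock) cannot select and archive them in between.
      liveFiles.addAll(flushed);
      flushed.forEach(f -> f.refCount.incrementAndGet());
    } finally {
      storeEngineLock.writeLock().unlock();
    }
    try {
      // May open scanners on the pinned files.
      notifyObservers(flushed);
    } finally {
      // Unpin only once the observers are done with the files.
      flushed.forEach(f -> f.refCount.decrementAndGet());
    }
  }

  private void notifyObservers(List<StoreFile> flushed) {
    // Stand-in for HStore.notifyChangedReadersObservers.
  }
}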

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java

Lines changed: 24 additions & 0 deletions
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -49,6 +50,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
@@ -643,4 +645,26 @@ public OptionalLong getMaximumTimestamp() {
   Set<String> getCompactedStoreFiles() {
     return Collections.unmodifiableSet(this.compactedStoreFiles);
   }
+
+  long increaseRefCount() {
+    return this.fileInfo.refCount.incrementAndGet();
+  }
+
+  long decreaseRefCount() {
+    return this.fileInfo.refCount.decrementAndGet();
+  }
+
+  static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
+    if (CollectionUtils.isEmpty(storeFiles)) {
+      return;
+    }
+    storeFiles.forEach(HStoreFile::increaseRefCount);
+  }
+
+  static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
+    if (CollectionUtils.isEmpty(storeFiles)) {
+      return;
+    }
+    storeFiles.forEach(HStoreFile::decreaseRefCount);
+  }
 }
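
The two bulk helpers go through CollectionUtils.isEmpty, so they are safe to call with a null or empty collection; that is what lets the finally block in HStore.getScanners release the pins unconditionally. A hedged sketch of the intended pairing from a same-package caller follows; RefCountPairingSketch, selectFilesUnderLock, and openScanners are hypothetical placeholders, not part of this commit.

package org.apache.hadoop.hbase.regionserver;

import java.util.Collection;

// Hypothetical same-package caller sketching how the new helpers are meant to be paired:
// the decrease in the finally block is unconditional and tolerates a null/empty collection.
class RefCountPairingSketch {

  void scanPinned() {
    Collection<HStoreFile> pinned = null;
    try {
      pinned = selectFilesUnderLock();                // may legitimately be null or empty
      HStoreFile.increaseStoreFilesRefeCount(pinned); // no-op on null/empty
      openScanners(pinned);                           // files cannot be archived while pinned
    } finally {
      HStoreFile.decreaseStoreFilesRefeCount(pinned); // matching, unconditional release
    }
  }

  private Collection<HStoreFile> selectFilesUnderLock() {
    return null; // placeholder
  }

  private void openScanners(Collection<HStoreFile> files) {
    // placeholder
  }
}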

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java

Lines changed: 102 additions & 0 deletions
@@ -31,6 +31,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -1530,6 +1532,106 @@ public void getScanners(MyStore store) throws IOException {
     }
   }
 
+  /**
+   * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
+   * compaction execute concurrently, and the compaction compacts and archives the flushed
+   * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
+   * {@link StoreScanner#updateReaders} would throw a {@link FileNotFoundException}.
+   */
+  @Test
+  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] r1 = Bytes.toBytes("row1");
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
+    // Initialize region
+    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
+      @Override
+      public void getScanners(MyStore store) throws IOException {
+        try {
+          // Here this method is called by StoreScanner.updateReaders, which is invoked by the
+          // following TestHStore.flushStore.
+          if (shouldWaitRef.get()) {
+            // Wait for the following compaction task to start.
+            cyclicBarrier.await();
+            // Wait for the following HStore.closeAndArchiveCompactedFiles to end.
+            cyclicBarrier.await();
+          }
+        } catch (BrokenBarrierException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
+    Runnable compactionTask = () -> {
+      try {
+        // Only once the StoreScanner.updateReaders invoked by TestHStore.flushStore is about to
+        // enter MyStore.getScanners may the compactionTask start.
+        cyclicBarrier.await();
+        region.compactStore(family, new NoLimitThroughputController());
+        myStore.closeAndArchiveCompactedFiles();
+        // Notify that StoreScanner.updateReaders may enter MyStore.getScanners.
+        cyclicBarrier.await();
+      } catch (Throwable e) {
+        compactionExceptionRef.set(e);
+      }
+    };
+
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // Older data which shouldn't be "seen" by the client.
+    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
+    flushStore(myStore, id++);
+    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
+    flushStore(myStore, id++);
+    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+
+    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
+    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
+    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
+
+    Thread.currentThread()
+      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
+    Scan scan = new Scan();
+    scan.withStartRow(r0, true);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
+      List<Cell> results = new MyList<>(size -> {
+        switch (size) {
+          case 1:
+            shouldWaitRef.set(true);
+            Thread thread = new Thread(compactionTask);
+            thread.setName("MyCompacting Thread.");
+            thread.start();
+            try {
+              flushStore(myStore, id++);
+              thread.join();
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+            shouldWaitRef.set(false);
+            break;
+          default:
+            break;
+        }
+      });
+      // Before HBASE-27519, this would throw a java.io.FileNotFoundException because the
+      // storeFile used by StoreScanner.updateReaders was deleted by the compactionTask.
+      scanner.next(results);
+      // The results are the cells of row r0.
+      assertEquals(3, results.size());
+      assertTrue(compactionExceptionRef.get() == null);
+    }
+  }
+
   @Test
   public void testReclaimChunkWhenScaning() throws IOException {
     init("testReclaimChunkWhenScaning");