Skip to content

Commit 8e0913c

Browse files
bharathvBharath Vissapragada
authored andcommitted
HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371)
Signed-off-by: Viraj Jasani [email protected] (cherry picked from commit 5a19bcf)
1 parent a40f458 commit 8e0913c

5 files changed

Lines changed: 235 additions & 21 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,7 @@ public class FSHLog implements WAL {
202202
*/
203203
private final RingBufferEventHandler ringBufferEventHandler;
204204

205-
/**
206-
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
207-
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
208-
* <p>
209-
*/
210-
private final ThreadLocal<SyncFuture> cachedSyncFutures;
205+
private final SyncFutureCache syncFutureCache;
211206

212207
/**
213208
* The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
@@ -597,12 +592,7 @@ public boolean accept(final Path fileName) {
597592
this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
598593
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
599594
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
600-
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
601-
@Override
602-
protected SyncFuture initialValue() {
603-
return new SyncFuture();
604-
}
605-
};
595+
this.syncFutureCache = new SyncFutureCache(conf);
606596
// Starting up threads in constructor is a no no; Interface should have an init call.
607597
this.disruptor.start();
608598
}
@@ -1126,6 +1116,10 @@ public void shutdown() throws IOException {
11261116
// With disruptor down, this is safe to let go.
11271117
if (this.appendExecutor != null) this.appendExecutor.shutdown();
11281118

1119+
if (syncFutureCache != null) {
1120+
syncFutureCache.clear();
1121+
}
1122+
11291123
// Tell our listeners that the log is closing
11301124
if (!this.listeners.isEmpty()) {
11311125
for (WALActionsListener i : this.listeners) {
@@ -1496,7 +1490,8 @@ private long getSequenceOnRingBuffer() {
14961490
return this.disruptor.getRingBuffer().next();
14971491
}
14981492

1499-
private SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
1493+
@InterfaceAudience.Private
1494+
public SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
15001495
long sequence = this.disruptor.getRingBuffer().next();
15011496
return publishSyncOnRingBuffer(sequence, span, forceSync);
15021497
}
@@ -1523,10 +1518,6 @@ private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
15231518
syncFuture.get(walSyncTimeout);
15241519
return syncFuture.getSpan();
15251520
} catch (TimeoutIOException tioe) {
1526-
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
1527-
// still refer to it, so if this thread use it next time may get a wrong
1528-
// result.
1529-
this.cachedSyncFutures.remove();
15301521
throw tioe;
15311522
} catch (InterruptedException ie) {
15321523
LOG.warn("Interrupted", ie);
@@ -1544,7 +1535,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
15441535
}
15451536

15461537
private SyncFuture getSyncFuture(final long sequence, Span span) {
1547-
return cachedSyncFutures.get().reset(sequence);
1538+
return syncFutureCache.getIfPresentOrNew().reset(sequence);
15481539
}
15491540

15501541
private void postSync(final long timeInNanos, final int handlerSyncs) {
@@ -1815,6 +1806,10 @@ SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
18151806
return syncFuture;
18161807
}
18171808

1809+
boolean isSafePointAttained() {
1810+
return safePointAttainedLatch.getCount() == 0;
1811+
}
1812+
18181813
/**
18191814
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
18201815
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1902,7 +1897,7 @@ private void cleanupOutstandingSyncsOnException(final long sequence, final Excep
19021897
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
19031898
this.syncFutures[i].done(sequence, e);
19041899
}
1905-
this.syncFuturesCount.set(0);
1900+
offerDoneSyncsBackToCache();
19061901
}
19071902

19081903
/**
@@ -2018,12 +2013,25 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
20182013
new DamagedWALException("On sync", this.exception));
20192014
}
20202015
attainSafePoint(sequence);
2021-
this.syncFuturesCount.set(0);
2016+
// It is critical that we offer the futures back to the cache for reuse here after the
2017+
// safe point is attained and all the clean up has been done. There have been
2018+
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
2019+
offerDoneSyncsBackToCache();
20222020
} catch (Throwable t) {
20232021
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
20242022
}
20252023
}
20262024

2025+
/**
2026+
* Offers the finished syncs back to the cache for reuse.
2027+
*/
2028+
private void offerDoneSyncsBackToCache() {
2029+
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
2030+
syncFutureCache.offer(syncFutures[i]);
2031+
}
2032+
this.syncFuturesCount.set(0);
2033+
}
2034+
20272035
SafePointZigZagLatch attainSafePoint() {
20282036
this.zigzagLatch = new SafePointZigZagLatch();
20292037
return this.zigzagLatch;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ synchronized SyncFuture reset(final long sequence, Span span) {
115115

116116
@Override
117117
public synchronized String toString() {
118-
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
118+
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence +
119+
" threadID=" + t.getId() + " threadName=" + t.getName();
119120
}
120121

121122
synchronized long getRingBufferSequence() {
@@ -191,6 +192,15 @@ public synchronized long get(long timeout) throws InterruptedException,
191192
return this.doneSequence;
192193
}
193194

195+
/**
196+
* Returns the thread that owned this sync future, use with caution as we return the reference to
197+
* the actual thread object.
198+
* @return the associated thread instance.
199+
*/
200+
public Thread getThread() {
201+
return t;
202+
}
203+
194204
public Long get(long timeout, TimeUnit unit)
195205
throws InterruptedException, ExecutionException {
196206
throw new UnsupportedOperationException();
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package org.apache.hadoop.hbase.regionserver.wal;
2+
3+
import com.google.common.cache.Cache;
4+
import com.google.common.cache.CacheBuilder;
5+
import java.util.concurrent.TimeUnit;
6+
import org.apache.hadoop.conf.Configuration;
7+
import org.apache.hadoop.hbase.HConstants;
8+
9+
/**
10+
* A cache of {@link SyncFuture}s. This class supports two methods
11+
* {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer(
12+
* org.apache.hadoop.hbase.regionserver.wal.SyncFuture)}.
13+
*
14+
* Usage pattern:
15+
* SyncFuture sf = syncFutureCache.getIfPresentOrNew();
16+
* sf.reset(...);
17+
* // Use the sync future
18+
* finally: syncFutureCache.offer(sf);
19+
*
20+
* Offering the sync future back to the cache makes it eligible for reuse within the same thread
21+
* context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
22+
* unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
23+
*/
24+
public final class SyncFutureCache {
25+
26+
private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
27+
28+
private final Cache<Thread, SyncFuture> syncFutureCache;
29+
30+
public SyncFutureCache(final Configuration conf) {
31+
final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
32+
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
33+
syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
34+
.expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
35+
}
36+
37+
public SyncFuture getIfPresentOrNew() {
38+
// Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
39+
SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
40+
return (future == null) ? new SyncFuture() : future;
41+
}
42+
43+
/**
44+
* Offers the sync future back to the cache for reuse.
45+
*/
46+
public void offer(SyncFuture syncFuture) {
47+
// It is ok to overwrite an existing mapping.
48+
syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
49+
}
50+
51+
public void clear() {
52+
if (syncFutureCache != null) {
53+
syncFutureCache.invalidateAll();
54+
}
55+
}
56+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.hbase.regionserver.wal;
2020

2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertNotEquals;
2324
import static org.junit.Assert.assertNotNull;
2425
import static org.junit.Assert.assertTrue;
@@ -34,6 +35,7 @@
3435
import java.util.concurrent.CountDownLatch;
3536
import java.util.concurrent.ExecutorService;
3637
import java.util.concurrent.Executors;
38+
import java.util.concurrent.TimeUnit;
3739
import java.util.concurrent.atomic.AtomicBoolean;
3840

3941
import org.apache.commons.lang.mutable.MutableBoolean;
@@ -53,6 +55,7 @@
5355
import org.apache.hadoop.hbase.HTableDescriptor;
5456
import org.apache.hadoop.hbase.KeyValue;
5557
import org.apache.hadoop.hbase.TableName;
58+
import org.apache.hadoop.hbase.Waiter;
5659
import org.apache.hadoop.hbase.client.Get;
5760
import org.apache.hadoop.hbase.client.Put;
5861
import org.apache.hadoop.hbase.client.Result;
@@ -61,6 +64,7 @@
6164
import org.apache.hadoop.hbase.regionserver.HRegion;
6265
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
6366
import org.apache.hadoop.hbase.regionserver.Region;
67+
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint;
6468
import org.apache.hadoop.hbase.testclassification.MediumTests;
6569
import org.apache.hadoop.hbase.util.Bytes;
6670
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -69,6 +73,7 @@
6973
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
7074
import org.apache.hadoop.hbase.wal.WAL;
7175
import org.apache.hadoop.hbase.wal.WALKey;
76+
import org.apache.hadoop.hbase.wal.WALProvider;
7277
import org.junit.After;
7378
import org.junit.AfterClass;
7479
import org.junit.Before;
@@ -85,6 +90,8 @@
8590
public class TestFSHLog {
8691
private static final Log LOG = LogFactory.getLog(TestFSHLog.class);
8792

93+
private static final long TEST_TIMEOUT_MS = 10000;
94+
8895
protected static Configuration conf;
8996
protected static FileSystem fs;
9097
protected static Path dir;
@@ -162,6 +169,87 @@ public void testWALCoprocessorLoaded() throws Exception {
162169
}
163170
}
164171

172+
/**
173+
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
174+
*/
175+
@Test
176+
public void testDeadlockWithSyncOverwrites() throws Exception {
177+
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
178+
179+
class FailingWriter implements WALProvider.Writer {
180+
@Override public void sync(boolean forceSync) throws IOException {
181+
throw new IOException("Injected failure..");
182+
}
183+
184+
@Override public void append(WAL.Entry entry) throws IOException {
185+
}
186+
187+
@Override public long getLength() throws IOException {
188+
return 0;
189+
}
190+
@Override public void close() throws IOException {
191+
}
192+
}
193+
194+
/*
195+
* Custom FSHLog implementation with a conditional wait before attaining safe point.
196+
*/
197+
class CustomFSHLog extends FSHLog {
198+
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
199+
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
200+
String prefix, String suffix) throws IOException {
201+
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
202+
}
203+
204+
@Override
205+
protected void beforeWaitOnSafePoint() {
206+
try {
207+
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
208+
} catch (InterruptedException e) {
209+
throw new RuntimeException(e);
210+
}
211+
}
212+
}
213+
214+
try (FSHLog log = new CustomFSHLog(fs, walRootDir, dir.toString(),
215+
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
216+
log.setWriter(new FailingWriter());
217+
Field ringBufferEventHandlerField =
218+
FSHLog.class.getDeclaredField("ringBufferEventHandler");
219+
ringBufferEventHandlerField.setAccessible(true);
220+
FSHLog.RingBufferEventHandler ringBufferEventHandler =
221+
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
222+
// Force a safe point
223+
final FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
224+
try {
225+
final SyncFuture future0 = log.publishSyncOnRingBuffer(null, false);
226+
// Wait for the sync to be done.
227+
Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
228+
@Override
229+
public boolean evaluate() throws Exception {
230+
return future0.isDone();
231+
}
232+
});
233+
// Publish another sync from the same thread, this should not overwrite the done sync.
234+
SyncFuture future1 = log.publishSyncOnRingBuffer(null, false);
235+
assertFalse(future1.isDone());
236+
// Unblock the safe point trigger..
237+
blockBeforeSafePoint.countDown();
238+
// Wait for the safe point to be reached.
239+
// With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
240+
Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
241+
@Override
242+
public boolean evaluate() throws Exception {
243+
return latch.isSafePointAttained();
244+
}
245+
});
246+
} finally {
247+
// Force release the safe point, for the clean up.
248+
latch.releaseSafePoint();
249+
}
250+
}
251+
}
252+
165253
protected void addEdits(WAL log,
166254
HRegionInfo hri,
167255
HTableDescriptor htd,
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package org.apache.hadoop.hbase.regionserver.wal;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertNotNull;
5+
import static org.junit.Assert.assertNotSame;
6+
import java.util.concurrent.CompletableFuture;
7+
import org.apache.hadoop.conf.Configuration;
8+
import org.apache.hadoop.hbase.HBaseConfiguration;
9+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
10+
import org.apache.hadoop.hbase.testclassification.SmallTests;
11+
import org.junit.Test;
12+
import org.junit.experimental.categories.Category;
13+
14+
@Category({ RegionServerTests.class, SmallTests.class })
15+
public class TestSyncFutureCache {
16+
17+
@Test
18+
public void testSyncFutureCacheLifeCycle() throws Exception {
19+
final Configuration conf = HBaseConfiguration.create();
20+
final SyncFutureCache cache = new SyncFutureCache(conf);
21+
try {
22+
SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
23+
assertNotNull(future0);
24+
// Get another future from the same thread, should be different one.
25+
SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
26+
assertNotNull(future1);
27+
assertNotSame(future0, future1);
28+
cache.offer(future1);
29+
// Should override.
30+
cache.offer(future0);
31+
SyncFuture future3 = cache.getIfPresentOrNew();
32+
// Should return the cached entry that was first offered back.
33+
assertEquals(future3, future0);
34+
final SyncFuture[] future4 = new SyncFuture[1];
35+
// From a different thread
36+
Thread t = new Thread(new Runnable() {
37+
@Override public void run() {
38+
future4[0] = cache.getIfPresentOrNew().reset(4);
39+
}
40+
});
41+
t.start();
42+
t.join();
43+
assertNotNull(future4[0]);
44+
assertNotSame(future3, future4[0]);
45+
// Clean up
46+
cache.offer(future3);
47+
cache.offer(future4[0]);
48+
} finally {
49+
cache.clear();
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)