Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,7 @@ public class FSHLog implements WAL {
*/
private final RingBufferEventHandler ringBufferEventHandler;

/**
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
* <p>
*/
private final ThreadLocal<SyncFuture> cachedSyncFutures;
private final SyncFutureCache syncFutureCache;

/**
* The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
Expand Down Expand Up @@ -597,12 +592,7 @@ public boolean accept(final Path fileName) {
this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
@Override
protected SyncFuture initialValue() {
return new SyncFuture();
}
};
this.syncFutureCache = new SyncFutureCache(conf);
// Starting threads in a constructor is discouraged; the interface should have an init call.
this.disruptor.start();
}
Expand Down Expand Up @@ -1126,6 +1116,10 @@ public void shutdown() throws IOException {
// With disruptor down, this is safe to let go.
if (this.appendExecutor != null) this.appendExecutor.shutdown();

if (syncFutureCache != null) {
syncFutureCache.clear();
}

// Tell our listeners that the log is closing
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
Expand Down Expand Up @@ -1496,7 +1490,8 @@ private long getSequenceOnRingBuffer() {
return this.disruptor.getRingBuffer().next();
}

private SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
@InterfaceAudience.Private
public SyncFuture publishSyncOnRingBuffer(Span span, boolean forceSync) {
long sequence = this.disruptor.getRingBuffer().next();
return publishSyncOnRingBuffer(sequence, span, forceSync);
}
Expand All @@ -1523,10 +1518,6 @@ private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
syncFuture.get(walSyncTimeout);
return syncFuture.getSpan();
} catch (TimeoutIOException tioe) {
// SyncFutures are reused per thread. If a TimeoutIOException happens, the ring buffer
// still refers to this future, so reusing it from this thread next time may yield a
// wrong result.
this.cachedSyncFutures.remove();
throw tioe;
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
Expand All @@ -1544,7 +1535,7 @@ private IOException convertInterruptedExceptionToIOException(final InterruptedEx
}

private SyncFuture getSyncFuture(final long sequence, Span span) {
return cachedSyncFutures.get().reset(sequence);
return syncFutureCache.getIfPresentOrNew().reset(sequence);
}

private void postSync(final long timeInNanos, final int handlerSyncs) {
Expand Down Expand Up @@ -1815,6 +1806,10 @@ SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
return syncFuture;
}

/**
 * Reports whether {@code safePointAttainedLatch} has been fully counted down.
 * @return {@code true} when the latch count is zero, {@code false} otherwise.
 */
boolean isSafePointAttained() {
  final long remaining = safePointAttainedLatch.getCount();
  return remaining == 0;
}

/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
* Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
Expand Down Expand Up @@ -1902,7 +1897,7 @@ private void cleanupOutstandingSyncsOnException(final long sequence, final Excep
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
this.syncFutures[i].done(sequence, e);
}
this.syncFuturesCount.set(0);
offerDoneSyncsBackToCache();
}

/**
Expand Down Expand Up @@ -2018,12 +2013,25 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
new DamagedWALException("On sync", this.exception));
}
attainSafePoint(sequence);
this.syncFuturesCount.set(0);
// It is critical that we offer the futures back to the cache for reuse here after the
// safe point is attained and all the clean up has been done. There have been
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
offerDoneSyncsBackToCache();
} catch (Throwable t) {
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
}
}

/**
 * Hands every sync future currently tracked in {@code syncFutures} (the first
 * {@code syncFuturesCount} slots) back to {@code syncFutureCache}, then resets the
 * tracked count to zero.
 */
private void offerDoneSyncsBackToCache() {
  int idx = 0;
  // The count is re-read on every pass, exactly as a plain indexed loop would do.
  while (idx < this.syncFuturesCount.get()) {
    syncFutureCache.offer(syncFutures[idx]);
    idx++;
  }
  this.syncFuturesCount.set(0);
}

SafePointZigZagLatch attainSafePoint() {
this.zigzagLatch = new SafePointZigZagLatch();
return this.zigzagLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ synchronized SyncFuture reset(final long sequence, Span span) {

@Override
public synchronized String toString() {
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence +
" threadID=" + t.getId() + " threadName=" + t.getName();
}

synchronized long getRingBufferSequence() {
Expand Down Expand Up @@ -191,6 +192,15 @@ public synchronized long get(long timeout) throws InterruptedException,
return this.doneSequence;
}

/**
 * Exposes the thread that owned this sync future. The value handed out is the live
 * {@link Thread} reference held by this object, not a copy, so treat it with caution.
 * @return the associated thread instance.
 */
public Thread getThread() {
  return this.t;
}

public Long get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

/**
 * A cache of {@link SyncFuture}s. Callers borrow a future with
 * {@link SyncFutureCache#getIfPresentOrNew()} and hand it back with
 * {@link SyncFutureCache#offer(SyncFuture)}.
 *
 * <p>Usage pattern:
 * <pre>
 *   SyncFuture sf = syncFutureCache.getIfPresentOrNew();
 *   sf.reset(...);
 *   // Use the sync future
 *   finally: syncFutureCache.offer(sf);
 * </pre>
 *
 * <p>Offering a sync future back to the cache makes it eligible for reuse within the same thread
 * context. Entries are stored under the thread reported by {@link SyncFuture#getThread()} and
 * looked up by the calling thread. An entry expires
 * {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes after it was last written.
 */
public final class SyncFutureCache {

  private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;

  /** Cached futures, one slot per thread. */
  private final Cache<Thread, SyncFuture> futuresByThread;

  /**
   * Builds the cache, using the configured region server handler count as its initial capacity.
   * @param conf configuration from which the handler count is read.
   */
  public SyncFutureCache(final Configuration conf) {
    final int expectedThreads = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
    final CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
      .initialCapacity(expectedThreads)
      .expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES);
    this.futuresByThread = builder.build();
  }

  /**
   * Returns the future cached for the calling thread, or a brand new one if none is cached.
   * Any cached entry is removed from the cache so that it cannot be handed out twice.
   * @return a sync future for the caller's exclusive use.
   */
  public SyncFuture getIfPresentOrNew() {
    final SyncFuture cached = futuresByThread.asMap().remove(Thread.currentThread());
    if (cached != null) {
      return cached;
    }
    return new SyncFuture();
  }

  /**
   * Offers the sync future back to the cache for reuse. An existing mapping for the same thread
   * is simply overwritten.
   * @param syncFuture the future to return; stored under {@link SyncFuture#getThread()}.
   */
  public void offer(SyncFuture syncFuture) {
    futuresByThread.put(syncFuture.getThread(), syncFuture);
  }

  /**
   * Drops every cached entry.
   */
  public void clear() {
    // The cache field is final and always assigned by the constructor, so no null guard is needed.
    futuresByThread.invalidateAll();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang.mutable.MutableBoolean;
Expand All @@ -53,6 +55,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -69,6 +72,7 @@
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -85,6 +89,8 @@
public class TestFSHLog {
private static final Log LOG = LogFactory.getLog(TestFSHLog.class);

private static final long TEST_TIMEOUT_MS = 10000;

protected static Configuration conf;
protected static FileSystem fs;
protected static Path dir;
Expand Down Expand Up @@ -162,6 +168,87 @@ public void testWALCoprocessorLoaded() throws Exception {
}
}

/**
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
* <p>
* Outline: a safe point is requested while the handler is held in
* {@code beforeWaitOnSafePoint()}, one sync is published and awaited until done, a second sync
* is published from the same thread, then the handler is released and the test waits for the
* safe point to be attained within {@code TEST_TIMEOUT_MS}.
*/
@Test
public void testDeadlockWithSyncOverwrites() throws Exception {
// Gate that keeps beforeWaitOnSafePoint() blocked until the test counts it down.
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);

// Writer stub: sync() always throws, append()/close() do nothing, getLength() reports 0.
class FailingWriter implements WALProvider.Writer {
@Override public void sync(boolean forceSync) throws IOException {
throw new IOException("Injected failure..");
}

@Override public void append(WAL.Entry entry) throws IOException {
}

@Override public long getLength() throws IOException {
return 0;
}
@Override public void close() throws IOException {
}
}

/*
* Custom FSHLog implementation with a conditional wait before attaining safe point.
*/
class CustomFSHLog extends FSHLog {
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}

@Override
protected void beforeWaitOnSafePoint() {
try {
// Wait (bounded by TEST_TIMEOUT_MS) for the test to open the gate; fail if it never does.
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

try (FSHLog log = new CustomFSHLog(fs, walRootDir, dir.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
log.setWriter(new FailingWriter());
// The handler is held in a private field of FSHLog, so it is fetched reflectively.
Field ringBufferEventHandlerField =
FSHLog.class.getDeclaredField("ringBufferEventHandler");
ringBufferEventHandlerField.setAccessible(true);
FSHLog.RingBufferEventHandler ringBufferEventHandler =
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
// Force a safe point
final FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
try {
final SyncFuture future0 = log.publishSyncOnRingBuffer(null, false);
// Wait for the sync to be done.
// NOTE(review): presumably it completes because FailingWriter.sync() throws — confirm.
Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return future0.isDone();
}
});
// Publish another sync from the same thread, this should not overwrite the done sync.
SyncFuture future1 = log.publishSyncOnRingBuffer(null, false);
assertFalse(future1.isDone());
// Unblock the safe point trigger..
blockBeforeSafePoint.countDown();
// Wait for the safe point to be reached. With the deadlock in HBASE-25984, this is never
// possible, thus blocking the sync pipeline.
Waiter.waitFor(conf, TEST_TIMEOUT_MS, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return latch.isSafePointAttained();
}
});
} finally {
// Force release the safe point, for the clean up.
latch.releaseSafePoint();
}
}
}

protected void addEdits(WAL log,
HRegionInfo hri,
HTableDescriptor htd,
Expand Down
Loading