Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen
}

protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
}

protected boolean isLogRollRequested() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
Expand Down Expand Up @@ -131,10 +130,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
int c = Long.compare(o1.getTxid(), o2.getTxid());
return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
};
private static final Comparator<SyncFuture> SEQ_COMPARATOR = Comparator.comparingLong(
SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
Expand Down Expand Up @@ -371,7 +368,8 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim
// sync futures then just use the default one.
private boolean isHsync(long beginTxid, long endTxid) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to note that the methods above are changed from getTxid to getTxId, but here we still have beginTxid... super nit, but you might want them to be consistent? Or just leave it as getTxid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole Txid change mess was unintentional, undone.

SortedSet<SyncFuture> futures =
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
new SyncFuture().reset(endTxid + 1, false));
if (futures.isEmpty()) {
return useHsync;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,9 @@ static class SafePointZigZagLatch {
private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);

private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
if (syncFuture.isThrowable()) {
throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
Throwable t = syncFuture.getThrowable();
if (t != null) {
throw new FailedSyncBeforeLogCloseException(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -19,7 +19,8 @@

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -44,15 +45,23 @@
*/
@InterfaceAudience.Private
class SyncFuture {
// Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
// to coordinate on but it did not give any obvious advantage and some issues with order in which
// events happen.

private static final long NOT_DONE = -1L;
private Thread t;

/**
* The transaction id of this operation, monotonically increases.
* Lock protecting the thread-safe fields.
*/
private final ReentrantLock doneLock;

/**
* Condition to wait on for client threads.
*/
private final Condition doneCondition;

/*
* Fields below are protected by {@link SyncFuture#doneLock}.
*/
private long txid;

/**
* The transaction id that was set in here when we were marked done. Should be equal or > txnId.
Expand All @@ -65,16 +74,30 @@ class SyncFuture {
*/
private Throwable throwable;

private Thread t;
/*
* Fields below are created once at reset() and accessed without any lock. Should be ok as they
* are immutable for this instance of sync future until it is reset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth it doing all this handling just so we reuse SyncFuture instances? Maybe a later experiment would be trying to create a SyncFuture every time? Just a thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the whole SyncFuture caching seems unnecessary; I was discussing this with my colleagues at work. cc: @apurtell @d-c-manning

I don't have the full historical context here, but if I were to guess, I think it is because we don't want the GC to be overworked by millions of these small sync future objects, since one is created per mutation. Maybe the latest Java garbage collectors don't need this kind of optimization; I agree that this needs more performance analysis. Let me create a Jira to perf-analyze the removal of this.

*/

/**
* The transaction id of this operation, monotonically increases.
*/
private long txid;

private boolean forceSync;

SyncFuture() {
this.doneLock = new ReentrantLock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two fields could be marked as final if we never change them after construction.

this.doneCondition = doneLock.newCondition();
}

/**
* Call this method to clear old usage and get it ready for new deploy.
*
* @param txid the new transaction id
* @return this
*/
synchronized SyncFuture reset(long txid) {
SyncFuture reset(long txid, boolean forceSync) {
if (t != null && t != Thread.currentThread()) {
throw new IllegalStateException();
}
Expand All @@ -83,30 +106,26 @@ synchronized SyncFuture reset(long txid) {
throw new IllegalStateException("" + txid + " " + Thread.currentThread());
}
this.doneTxid = NOT_DONE;
this.forceSync = forceSync;
this.txid = txid;
this.throwable = null;
return this;
}

@Override
public synchronized String toString() {
public String toString() {
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
" threadName=" + t.getName();
}

synchronized long getTxid() {
long getTxid() {
return this.txid;
}

synchronized boolean isForceSync() {
boolean isForceSync() {
return forceSync;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be covered by the lock? The comment at the head of the class says this data member should be covered by the lock ("all below").

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also this comment..

 /*
   * Fields below are created once at reset() and accessed without any lock. Should be ok as they
   * are immutable for this instance of sync future until it is reset.

}

synchronized SyncFuture setForceSync(boolean forceSync) {
this.forceSync = forceSync;
return this;
}

/**
* Returns the thread that owned this sync future, use with caution as we return the reference to
* the actual thread object.
Expand All @@ -122,55 +141,67 @@ Thread getThread() {
* @return True if we successfully marked this outstanding future as completed/done. Returns false
* if this future is already 'done' when this method called.
*/
synchronized boolean done(final long txid, final Throwable t) {
if (isDone()) {
return false;
}
this.throwable = t;
if (txid < this.txid) {
// Something badly wrong.
if (throwable == null) {
this.throwable =
new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid);
boolean done(final long txid, final Throwable t) {
doneLock.lock();
try {
if (doneTxid != NOT_DONE) {
return false;
}
this.throwable = t;
if (txid < this.txid) {
// Something badly wrong.
if (throwable == null) {
this.throwable =
new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid);
}
}
// Mark done.
this.doneTxid = txid;
doneCondition.signalAll();
return true;
} finally {
doneLock.unlock();
}
// Mark done.
this.doneTxid = txid;
// Wake up waiting threads.
notify();
return true;
}

boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}

synchronized long get(long timeoutNs) throws InterruptedException,
long get(long timeoutNs) throws InterruptedException,
ExecutionException, TimeoutIOException {
final long done = System.nanoTime() + timeoutNs;
while (!isDone()) {
wait(1000);
if (System.nanoTime() >= done) {
throw new TimeoutIOException(
"Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
+ " ms for txid=" + this.txid + ", WAL system stuck?");
doneLock.lock();
try {
while (doneTxid == NOT_DONE) {
if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) {
throw new TimeoutIOException("Failed to get sync result after "
+ TimeUnit.NANOSECONDS.toMillis(timeoutNs) + " ms for txid=" + this.txid
+ ", WAL system stuck?");
}
}
if (this.throwable != null) {
throw new ExecutionException(this.throwable);
}
return this.doneTxid;
} finally {
doneLock.unlock();
}
if (this.throwable != null) {
throw new ExecutionException(this.throwable);
}
return this.doneTxid;
}

synchronized boolean isDone() {
return this.doneTxid != NOT_DONE;
}

synchronized boolean isThrowable() {
return isDone() && getThrowable() != null;
boolean isDone() {
doneLock.lock();
try {
return this.doneTxid != NOT_DONE;
} finally {
doneLock.unlock();
}
}

synchronized Throwable getThrowable() {
return this.throwable;
Throwable getThrowable() {
doneLock.lock();
try {
if (doneTxid == NOT_DONE) {
return null;
}
return this.throwable;
} finally {
doneLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class TestSyncFuture {
public void testGet() throws Exception {
long timeout = 5000;
long txid = 100000;
SyncFuture syncFulture = new SyncFuture().reset(txid);
SyncFuture syncFulture = new SyncFuture().reset(txid, false);
syncFulture.done(txid, null);
assertEquals(txid, syncFulture.get(timeout));

syncFulture.reset(txid).get(timeout);
syncFulture.reset(txid, false).get(timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public void testSyncFutureCacheLifeCycle() throws Exception {
final Configuration conf = HBaseConfiguration.create();
SyncFutureCache cache = new SyncFutureCache(conf);
try {
SyncFuture future0 = cache.getIfPresentOrNew().reset(0);
SyncFuture future0 = cache.getIfPresentOrNew().reset(0, false);
assertNotNull(future0);
// Get another future from the same thread, should be different one.
SyncFuture future1 = cache.getIfPresentOrNew().reset(1);
SyncFuture future1 = cache.getIfPresentOrNew().reset(1, false);
assertNotNull(future1);
assertNotSame(future0, future1);
cache.offer(future1);
Expand All @@ -55,7 +55,8 @@ public void testSyncFutureCacheLifeCycle() throws Exception {
assertEquals(future3, future0);
final SyncFuture[] future4 = new SyncFuture[1];
// From a different thread
CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get();
CompletableFuture.runAsync(() ->
future4[0] = cache.getIfPresentOrNew().reset(4, false)).get();
assertNotNull(future4[0]);
assertNotSame(future3, future4[0]);
// Clean up
Expand Down