Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
}

@VisibleForTesting
Map<WAL, Boolean> getWalNeedsRoll() {
return this.walNeedsRoll;
Map<WAL, RollController> getWalNeedsRoll() {
return this.wals;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -58,31 +58,31 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread

protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

walRolls? instead of wals?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollWals? more suitable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will have all wal entries once the wal instance is created. Not just when it needs a roll. So ya these were wals which needed roll at some point. So wals name also fine IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

protected final T abortable;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private final long checkLowReplicationInterval;

private volatile boolean running = true;

public void addWAL(WAL wal) {
// check without lock first
if (walNeedsRoll.containsKey(wal)) {
if (wals.containsKey(wal)) {
return;
}
// this is to avoid race between addWAL and requestRollAll.
synchronized (this) {
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this talking about what this PR is trying to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the todo is pre-existing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ramkrish86 the purpose of this PR is to make each wal separate roll when using multiwal. thanks review.

synchronized (AbstractWALRoller.this) {
walNeedsRoll.put(wal, Boolean.TRUE);
RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have already done wals.putIfAbsent(wal, new RollController(wal)) above.
Hence, wals.computeIfAbsent() is needed here? Should we not directly get the value with RollController controller = wals.get(wal) and expect non-null object?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Q. In fact I also thought when reviewed this. Ideally speaking we should get the addWAL call 1st which will add the instance to the Map. When we get call here the wal should be in the map already. But if u see the cur impl, there is no such contract enforcing. It just add the WAL with True value. So believe while making patch, @WenFeiYi went with similar lines.
We can consider this.. Need to see any chance we get a rollReq before adding.. While RS start, we do some rollReq on WALs.. This introduced some bug in the past. We need to see that closely.. If we can confirm that we can add that contract enforcing and so what u suggested. I would say add a TODO here and raise another issue. This went through multiple cycles of changes. :-) U ok Viraj?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is my idea, if there is a change, the impact will be greater, it will be far away from the purpose of the issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this is critical code anyways, we can live with bit extra atomic calls on Concurrent Map. Sounds good, TODO should make this better :)

Nice work @WenFeiYi .

controller.requestRoll();
AbstractWALRoller.this.notifyAll();
}
}
Expand All @@ -93,9 +93,8 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) {

public void requestRollAll() {
synchronized (this) {
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
for (WAL wal : wals) {
walNeedsRoll.put(wal, Boolean.TRUE);
for (RollController controller : wals.values()) {
controller.requestRoll();
}
notifyAll();
}
Expand All @@ -115,9 +114,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
*/
private void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
for (Entry<WAL, RollController> entry : wals.entrySet()) {
WAL wal = entry.getKey();
boolean needRollAlready = entry.getValue();
boolean needRollAlready = entry.getValue().needsRoll(now);
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
Expand All @@ -133,7 +132,7 @@ private void abort(String reason, Throwable cause) {
// This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
// failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
// is already broken.
for (WAL wal : walNeedsRoll.keySet()) {
for (WAL wal : wals.keySet()) {
// shutdown rather than close here since we are going to abort the RS and the wals need to be
// split when recovery
try {
Expand All @@ -148,53 +147,49 @@ private void abort(String reason, Throwable cause) {
@Override
public void run() {
while (running) {
boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
// Time for periodic roll, fall through
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
// WAL roll requested, fall through
LOG.debug("WAL roll requested");
} else {
try {
wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// restore the interrupt state
Thread.currentThread().interrupt();
}
// goto the beginning to check whether again whether we should fall through to roll
// several WALs, and also check whether we should quit.
continue;
synchronized (this) {
if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
try {
wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// restore the interrupt state
Thread.currentThread().interrupt();
}
// goto the beginning to check whether again whether we should fall through to roll
// several WALs, and also check whether we should quit.
continue;
}
}
try {
this.lastRollTime = System.currentTimeMillis();
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
.hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator();
iter.hasNext();) {
Entry<WAL, RollController> entry = iter.next();
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
Map<byte[], List<byte[]>> regionsToFlush = null;
RollController controller = entry.getValue();
if (controller.isRollRequested()) {
// WAL roll requested, fall through
LOG.debug("WAL {} roll requested", wal);
} else if (controller.needsPeriodicRoll(now)){
// Time for periodic roll, fall through
LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
} else {
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is they key. We ensure we only roll if one of the condition is met (if either size reach caused a log roll ) or the time elapsed. That is also tracked per wal.

}
try {
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an collection of actual region and family names.
regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
if (regionsToFlush != null) {
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
}
}
} catch (WALClosedException e) {
LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
iter.remove();
}
if (regionsToFlush != null) {
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
}
}
afterRoll(wal);
}
} catch (FailedLogCloseException | ConnectException e) {
Expand Down Expand Up @@ -232,7 +227,9 @@ private boolean isWaiting() {
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking its usage, I think this API impl is already buggy. This just checks the status of the boolean. Once we start a roll on a WAL, we reset the boolean (Even before this patch). So it is not clearly telling anything abt the roll status. This can return true even while an active wal roll is going on. We can keep it as an another jira and fix (if required).. Just add some TODO comments here.
We might need another boolean in Controller which clearly tracks whether we are ongoing a roll. So this really need to check that status as well as a requested roll status.

// TODO add a status field of roll in RollController
return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis()))
&& isWaiting();
}

/**
Expand All @@ -249,4 +246,43 @@ public void close() {
running = false;
interrupt();
}

/**
* Independently control the roll of each wal. When use multiwal,
* can avoid all wal roll together. see HBASE-24665 for detail
*/
protected class RollController {
private final WAL wal;
private final AtomicBoolean rollRequest;
private long lastRollTime;

RollController(WAL wal) {
this.wal = wal;
this.rollRequest = new AtomicBoolean(false);
this.lastRollTime = System.currentTimeMillis();
}

public void requestRoll() {
this.rollRequest.set(true);
}

public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
this.lastRollTime = now;
// reset the flag in front to avoid missing roll request before we return from rollWriter.
this.rollRequest.set(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey this is where we need to reset it even before we do actual roll work.. We were doing that. We had a comment also.
// reset the flag in front to avoid missing roll request before we return from rollWriter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you very much! I also noticed this, but in order to avoid repeated roll, so fix to that. if use boolean, roll at most once, the impact can be ignored. Thanks for review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry did not get. How AtomicBoolean is different compared to boolean wrt repeated roll possibility. In the past we used to keep the Boolean state in a Map. Now in this object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after reset flag, before roll is complete, if have a roll request, that will also cause an extra roll.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That is happening now also. We intentionally reset the boolean when we start roll itself.
// reset the flag in front to avoid missing roll request before we return from rollWriter.
So there is no change for that right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I go it

return wal.rollWriter(true);
}

public boolean isRollRequested() {
return rollRequest.get();
}

public boolean needsPeriodicRoll(long now) {
return (now - this.lastRollTime) > rollPeriod;
}

public boolean needsRoll(long now) {
return isRollRequested() || needsPeriodicRoll(now);
}
}
}
Loading