Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,34 @@ protected void push(final Procedure procedure, final boolean addFront, final boo
* NOTE: this method is called with the sched lock held.
* @return the Procedure to execute, or null if nothing is available.
*/
protected abstract Procedure dequeue();
protected abstract Procedure dequeue(boolean highPriority);

@Override
public Procedure pollHighPriority() {
return poll(-1, true);
}

@Override
public Procedure pollHighPriority(long timeout, TimeUnit unit) {
return poll(unit.toNanos(timeout), true);
}

@Override
public Procedure poll() {
return poll(-1);
return poll(-1, false);
}

@Override
public Procedure poll(long timeout, TimeUnit unit) {
return poll(unit.toNanos(timeout));
return poll(unit.toNanos(timeout), false);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Procedure poll(final long nanos) {
return poll(nanos, false);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
private Procedure poll(final long nanos, final boolean highPriority) {
schedLock();
try {
if (!running) {
Expand All @@ -174,7 +188,7 @@ public Procedure poll(final long nanos) {
return null;
}
}
final Procedure pollResult = dequeue();
final Procedure pollResult = dequeue(highPriority);

pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we use this new functionality? We pass flag once set true for high priority and then later false for all the rest?

IIRC, there is a mechanism for putting procedures at front of the queue already... so we process children before their parent. We could not exploit that mechanism here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a pity we have to add the boolean priority flag as it makes this queue Interface no longer look like the usual queue Interface (they don't usually take a flag like this).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dequeue(highPriority=true) only pull highPriority procedure from the queue.
And dequeue(highPriority=false) will pull procedure from the queue like before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes the new method makes the queue Interface no longer look like the usual.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1967,13 +1967,14 @@ protected WorkerThread(ThreadGroup group, String prefix) {
public void sendStopSignal() {
scheduler.signalAll();
}

@Override
public void run() {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
@SuppressWarnings("unchecked")
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
Procedure<TEnvironment> proc = getProcedure();
if (proc == null) {
continue;
}
Expand Down Expand Up @@ -2025,6 +2026,10 @@ public long getCurrentRunTime() {
protected boolean keepAlive(long lastUpdate) {
return true;
}

protected Procedure<TEnvironment> getProcedure() {
return scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
}
}

// A worker thread which can be added when core workers are stuck. Will timeout after
Expand All @@ -2040,6 +2045,25 @@ protected boolean keepAlive(long lastUpdate) {
}
}

private final class HighPriorityWorkerThread extends WorkerThread {
private Procedure<TEnvironment> procedure;

public HighPriorityWorkerThread(ThreadGroup group, Procedure<TEnvironment> proc) {
super(group, "HighPriorityPEWorker-");
this.procedure = proc;
}

@Override
protected boolean keepAlive(long lastUpdate) {
return false;
}

@Override
protected Procedure<TEnvironment> getProcedure() {
return procedure;
}
}

// ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide a InlineChore to notify the store with the
// full set of procedures pending and completed to write a compacted
Expand All @@ -2051,7 +2075,7 @@ protected boolean keepAlive(long lastUpdate) {
private final class WorkerMonitor extends InlineChore {
public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
"hbase.procedure.worker.monitor.interval.msec";
private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 1000; // 1sec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mean we will get logging from procedure in the Master log more frequently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
"hbase.procedure.worker.stuck.threshold.msec";
Expand All @@ -2071,13 +2095,36 @@ public WorkerMonitor() {

@Override
public void run() {
// accelerate high priority procedure.
accelerateHighPriority();

final int stuckCount = checkForStuckWorkers();
checkThreadCount(stuckCount);

// refresh interval (poor man dynamic conf update)
refreshConfig();
}

private void accelerateHighPriority() {
if (!scheduler.hasRunnables()) {
return;
}
while (true) {
// Poll a high priority procedure and execute it intermediately
Procedure highPriorityProcedure = scheduler.pollHighPriority(1, TimeUnit.NANOSECONDS);
if (highPriorityProcedure != null) {
final HighPriorityWorkerThread worker =
new HighPriorityWorkerThread(threadGroup, highPriorityProcedure);
workerThreads.add(worker);
worker.start();
LOG.info("Added new HighPriority worker thread {} for highPriorityProcedure {}", worker,
highPriorityProcedure);
} else {
return;
}
}
}

private int checkForStuckWorkers() {
// check if any of the worker is stuck
int stuckCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ public interface ProcedureScheduler {
*/
boolean hasRunnables();

/**
* Fetch one high priority Procedure from the queue
* @return the Procedure to execute, or null if nothing present.
*/
Procedure pollHighPriority();

/**
* Fetch one high priority Procedure from the queue
* @param timeout how long to wait before giving up, in units of unit
* @param unit a TimeUnit determining how to interpret the timeout parameter
* @return the Procedure to execute, or null if nothing present.
*/
Procedure pollHighPriority(long timeout, TimeUnit unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is high priority? We have priority on rpcs with a gradient where 200 is high priority. Here there only gradient is high or not high? And only the assign of meta is high priority?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high priority procedure is meta procedure/ServerCrashProcedure(carry meta)


/**
* Fetch one Procedure from the queue
* @return the Procedure to execute, or null if nothing present.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected void enqueue(final Procedure procedure, final boolean addFront) {
}

@Override
protected Procedure dequeue() {
protected Procedure dequeue(boolean highPriority) {
return runnables.poll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
(n, k) -> n.compareKey((String) k);
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
private static final AvlKeyComparator<ServerQueue> SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((ServerName) k);

private final FairQueue<ServerName> serverHighPriorityRunQueue = new FairQueue<>();
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();

private final ServerQueue[] serverHighPriorityBuckets = new ServerQueue[4];
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
Expand All @@ -135,7 +139,11 @@ protected void enqueue(final Procedure proc, final boolean addFront) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
if (spi.hasMetaTableRegion()) {
doAdd(serverHighPriorityRunQueue, getServerQueue(proc), proc, addFront);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make it so all procedures associated with this server are 'high' priority?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, only the server procedure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, there is a problem that, the sub procedures will inherit some resources from the parent procedure, so changing the logic here is easy to introduce dead lock...

} else {
doAdd(serverRunQueue, getServerQueue(proc), proc, addFront);
}
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
Expand Down Expand Up @@ -173,25 +181,30 @@ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,

@Override
protected boolean queueHasRunnables() {
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() || serverRunQueue
.hasRunnables() || peerRunQueue.hasRunnables() || serverHighPriorityRunQueue.hasRunnables();
}

@Override
protected Procedure dequeue() {
protected Procedure dequeue(boolean highPriority) {
// meta procedure is always the first priority
Procedure<?> pollResult = doPoll(metaRunQueue);
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
if (pollResult == null) {
pollResult = doPoll(serverRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
pollResult = doPoll(serverHighPriorityRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
if (!highPriority) {
if (pollResult == null) {
pollResult = doPoll(serverRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
}
return pollResult;
}
Expand Down Expand Up @@ -269,6 +282,11 @@ private void clearQueue() {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
for (int i = 0; i < serverHighPriorityBuckets.length; ++i) {
clear(serverHighPriorityBuckets[i], serverHighPriorityRunQueue,
SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR);
serverHighPriorityBuckets[i] = null;
}

// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
Expand Down Expand Up @@ -307,6 +325,9 @@ protected int queueSize() {
for (ServerQueue serverMap : serverBuckets) {
count += queueSize(serverMap);
}
for (ServerQueue serverMap : serverHighPriorityBuckets) {
count += queueSize(serverMap);
}
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
Expand Down Expand Up @@ -338,7 +359,7 @@ public void completionCleanup(final Procedure proc) {
} else if (proc instanceof PeerProcedureInterface) {
tryCleanupPeerQueue(getPeerId(proc), proc);
} else if (proc instanceof ServerProcedureInterface) {
tryCleanupServerQueue(getServerName(proc), proc);
tryCleanupServerQueue(proc);
} else {
// No cleanup for other procedure types, yet.
return;
Expand Down Expand Up @@ -391,12 +412,28 @@ private static TableName getTableName(Procedure<?> proc) {
return ((TableProcedureInterface)proc).getTableName();
}

private ServerQueue getServerQueue(Procedure<?> proc) {
if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
if (spi.hasMetaTableRegion()) {
return getServerQueue(serverHighPriorityBuckets, SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR,
spi.getServerName(), spi);
} else {
return getServerQueue(serverBuckets, SERVER_QUEUE_KEY_COMPARATOR, spi.getServerName(), spi);
}
} else {
return null;
}
}

// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
private ServerQueue getServerQueue(ServerQueue[] serverBuckets,
AvlKeyComparator<ServerQueue> keyComparator, ServerName serverName,
ServerProcedureInterface proc) {
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, keyComparator);
if (node != null) {
return node;
}
Expand All @@ -411,18 +448,32 @@ private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterfa
return node;
}

private void removeServerQueue(ServerName serverName) {
private void removeServerQueue(ServerQueue[] serverBuckets,
AvlKeyComparator<ServerQueue> keyComparator, ServerName serverName) {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
serverBuckets[index] =
AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[index] = AvlTree.remove(serverBuckets[index], serverName, keyComparator);
locking.removeServerLock(serverName);
}

private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
private void tryCleanupServerQueue(Procedure<?> proc) {
ServerName serverName = getServerName(proc);
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
if (spi.hasMetaTableRegion()) {
tryCleanupServerQueue(this.serverHighPriorityBuckets, this.serverHighPriorityRunQueue,
SERVER_HIGHPRIORITY_QUEUE_KEY_COMPARATOR, serverName, proc);
} else {
tryCleanupServerQueue(this.serverBuckets, this.serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR,
serverName, proc);
}
}

private void tryCleanupServerQueue(ServerQueue[] serverBuckets,
FairQueue<ServerName> serverRunQueue, AvlKeyComparator<ServerQueue> keyComparator,
ServerName serverName, Procedure<?> proc) {
schedLock();
try {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, keyComparator);
if (node == null) {
return;
}
Expand All @@ -431,7 +482,7 @@ private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
removeFromRunQueue(serverRunQueue, node,
() -> "clean up server queue after " + proc + " completed");
removeServerQueue(serverName);
removeServerQueue(serverBuckets, keyComparator, serverName);
}
} finally {
schedUnlock();
Expand Down Expand Up @@ -873,13 +924,14 @@ public boolean waitServerExclusiveLock(final Procedure<?> procedure,
try {
final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) {
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
removeFromRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null),
() -> procedure + " held exclusive lock");
ServerProcedureInterface spi = (ServerProcedureInterface) procedure;
if (spi.hasMetaTableRegion()) {
removeFromRunQueue(serverHighPriorityRunQueue, getServerQueue(procedure),
() -> procedure + " held exclusive lock");
} else {
removeFromRunQueue(serverRunQueue, getServerQueue(procedure),
() -> procedure + " held exclusive lock");
}
return false;
}
waitProcedure(lock, procedure);
Expand All @@ -902,12 +954,14 @@ public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerNa
final LockAndQueue lock = locking.getServerLock(serverName);
// Only SCP will acquire/release server lock so do not need to check the return value here.
lock.releaseExclusiveLock(procedure);
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
addToRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null), () -> procedure + " released exclusive lock");
ServerProcedureInterface spi = (ServerProcedureInterface) procedure;
if (spi.hasMetaTableRegion()) {
addToRunQueue(serverHighPriorityRunQueue, getServerQueue(procedure),
() -> procedure + " released exclusive lock");
} else {
addToRunQueue(serverRunQueue, getServerQueue(procedure),
() -> procedure + " released exclusive lock");
}
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
Expand Down
Loading