diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index a060f14ccf9a..03702e6f64bb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -310,8 +310,14 @@ public TimeoutExecutorThread() { @Override public void run() { while (running.get()) { - final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); + final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, + 20, TimeUnit.SECONDS); if (task == null || task == DelayedUtil.DELAYED_POISON) { + if (task == null && queue.size() > 0) { + LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting" + + " elapsed. If this is repeated consistently, it means no element is getting expired" + + " from the queue and it might freeze the system. Queue: {}", queue); + } // the executor may be shutting down, and the task is just the shutdown request continue; } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 1e796d9ba5a7..fc917b6f36ed 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure2; import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; import org.apache.yetus.audience.InterfaceAudience; @@ -52,7 +53,8 @@ public void sendStopSignal() { @Override public void run() { while (executor.isRunning()) { - final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); + final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 20, + TimeUnit.SECONDS); if (task == null || task == DelayedUtil.DELAYED_POISON) { // the executor may be shutting down, // and the task is just the shutdown request diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java index 4d3ebd91b449..fa796ae97426 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java @@ -77,9 +77,10 @@ public String toString() { /** * @return null (if an interrupt) or an instance of E; resets interrupt on calling thread. */ - public static E takeWithoutInterrupt(final DelayQueue queue) { + public static E takeWithoutInterrupt(final DelayQueue queue, + final long timeout, final TimeUnit timeUnit) { try { - return queue.take(); + return queue.poll(timeout, timeUnit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null;