Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,14 @@ public TimeoutExecutorThread() {
@Override
public void run() {
while (running.get()) {
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
20, TimeUnit.SECONDS);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
if (task == null && queue.size() > 0) {
LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
+ " elapsed. If this is repeated consistently, it means no element is getting expired"
+ " from the queue and it might freeze the system. Queue: {}", queue);
}
// the executor may be shutting down, and the task is just the shutdown request
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure2;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -52,7 +53,8 @@ public void sendStopSignal() {
@Override
public void run() {
while (executor.isRunning()) {
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 20,
TimeUnit.SECONDS);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
// the executor may be shutting down,
// and the task is just the shutdown request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ public String toString() {
/**
* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
*/
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue,
final long timeout, final TimeUnit timeUnit) {
try {
return queue.take();
return queue.poll(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand Down