diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index ad1747ba3b1f..8e5467478caf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -52,7 +52,7 @@ public BalancedQueueRpcExecutor(final String name, final int handlerCount, } @Override - public boolean dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) { int queueIndex = balancer.getNextQueue(callTask); BlockingQueue queue = queues.get(queueIndex); // that means we can overflow by at most size (5), that's ok diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index 5d9e07a8d117..9e6a0bb103a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,9 +20,7 @@ import java.util.Deque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.yetus.audience.InterfaceAudience; @@ -65,7 +63,7 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh } @Override - public boolean dispatch(CallRunner callTask) throws InterruptedException { + public boolean dispatch(CallRunner callTask) { //FastPathHandlers don't check queue limits, so if we're completely shut down //we have to prevent ourselves from using the handler in the first place if (currentQueueLimit == 0){ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java index c335b024c212..b07f44900fbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -1,5 +1,4 @@ -/** - +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,8 +26,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in @@ -37,7 +34,6 @@ @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { - private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class); private final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); @@ -60,7 +56,7 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh } @Override - public boolean dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) { RpcCall call = callTask.getRpcCall(); boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam()); boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index b2df2254130a..835966847a32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -132,7 +132,7 @@ protected void startHandlers(final int port) { } @Override - public boolean dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) { RpcCall call = callTask.getRpcCall(); return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()), shouldDispatchToScanQueue(callTask), callTask); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 1ef78af15e8d..97570066bc68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -29,20 +29,21 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; -import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; /** * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular @@ -53,14 +54,16 @@ public abstract class RpcExecutor { private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; - public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; + public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = + "hbase.ipc.server.callqueue.handler.factor"; - /** max delay in msec used to bound the deprioritized requests */ - public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay"; + /** max delay in msec used to bound the de-prioritized requests */ + public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = + "hbase.ipc.server.queue.max.call.delay"; /** * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority - * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced + * queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced * throughput. */ public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; @@ -70,14 +73,18 @@ public abstract class RpcExecutor { public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; - public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class"; + public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = + "hbase.ipc.server.callqueue.balancer.class"; public static final Class CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class; // These 3 are only used by Codel executor - public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay"; - public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval"; - public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold"; + public static final String CALL_QUEUE_CODEL_TARGET_DELAY = + "hbase.ipc.server.callqueue.codel.target.delay"; + public static final String CALL_QUEUE_CODEL_INTERVAL = + "hbase.ipc.server.callqueue.codel.interval"; + public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = + "hbase.ipc.server.callqueue.codel.lifo.threshold"; public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100; public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; @@ -88,16 +95,14 @@ public abstract class RpcExecutor { public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED = "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled"; - private LongAdder numGeneralCallsDropped = new LongAdder(); - private LongAdder numLifoModeSwitches = new LongAdder(); + private final LongAdder numGeneralCallsDropped = new LongAdder(); + private final LongAdder numLifoModeSwitches = new LongAdder(); protected final int numCallQueues; protected final List> queues; private final Class queueClass; private final Object[] queueInitArgs; - private final PriorityFunction priority; - protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); @@ -107,8 +112,8 @@ public abstract class RpcExecutor { private String name; - private Configuration conf = null; - private Abortable abortable = null; + private final Configuration conf; + private final Abortable abortable; public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { @@ -144,12 +149,10 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ this.handlerCount = Math.max(handlerCount, this.numCallQueues); this.handlers = new ArrayList<>(this.handlerCount); - this.priority = priority; - if (isDeadlineQueueType(callQueueType)) { this.name += ".Deadline"; this.queueInitArgs = new Object[] { maxQueueLength, - new CallPriorityComparator(conf, this.priority) }; + new CallPriorityComparator(conf, priority) }; this.queueClass = BoundedPriorityBlockingQueue.class; } else if (isCodelQueueType(callQueueType)) { this.name += ".Codel"; @@ -159,16 +162,17 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, - codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; + codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; this.queueClass = AdaptiveLifoCoDelCallQueue.class; } else if (isPluggableQueueType(callQueueType)) { - Optional>> pluggableQueueClass = getPluggableQueueClass(); + Optional>> pluggableQueueClass = + getPluggableQueueClass(); if (!pluggableQueueClass.isPresent()) { throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call" + " queue type required"); } else { - this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf }; + this.queueInitArgs = new Object[] { maxQueueLength, priority, conf }; this.queueClass = pluggableQueueClass.get(); } } else { @@ -186,50 +190,41 @@ protected int computeNumCallQueues(final int handlerCount, final float callQueue return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); } - public Map getCallQueueCountsSummary() { - HashMap callQueueMethodTotalCount = new HashMap<>(); - - for(BlockingQueue queue: queues) { - for (CallRunner cr:queue) { - RpcCall rpcCall = cr.getRpcCall(); - - String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { - method = "Unknown"; - } + /** + * Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown". + */ + private static String getMethodName(final CallRunner callRunner) { + return Optional.ofNullable(callRunner) + .map(CallRunner::getRpcCall) + .map(RpcCall::getMethod) + .map(Descriptors.MethodDescriptor::getName) + .orElse("Unknown"); + } - callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L)); - } - } + /** + * Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L. + */ + private static long getRpcCallSize(final CallRunner callRunner) { + return Optional.ofNullable(callRunner) + .map(CallRunner::getRpcCall) + .map(RpcCall::getSize) + .orElse(0L); + } - return callQueueMethodTotalCount; + public Map getCallQueueCountsSummary() { + return queues.stream() + .flatMap(Collection::stream) + .map(RpcExecutor::getMethodName) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); } public Map getCallQueueSizeSummary() { - HashMap callQueueMethodTotalSize = new HashMap<>(); - - for(BlockingQueue queue: queues) { - for (CallRunner cr:queue) { - RpcCall rpcCall = cr.getRpcCall(); - String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { - method = "Unknown"; - } - - long size = rpcCall.getSize(); - - callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L)); - } - } - - return callQueueMethodTotalSize; + return queues.stream() + .flatMap(Collection::stream) + .map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner))) + .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond))); } - protected void initializeQueues(final int numQueues) { if (queueInitArgs.length > 0) { currentQueueLimit = (int) queueInitArgs[0]; @@ -252,7 +247,7 @@ public void stop() { } /** Add the request to the executor queue */ - public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; + public abstract boolean dispatch(final CallRunner callTask); /** Returns the list of request queues */ protected List> getQueues() { @@ -298,26 +293,26 @@ protected void startHandlers(final String nameSuffix, final int numHandlers, handlers.size(), threadPrefix, qsize, port); } - public static QueueBalancer getBalancer(String executorName, Configuration conf, List> queues) { + /** + * All requests go to the first queue, at index 0 + */ + private static final QueueBalancer ONE_QUEUE = val -> 0; + + public static QueueBalancer getBalancer( + final String executorName, + final Configuration conf, + final List> queues + ) { Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1"); if (queues.size() == 1) { return ONE_QUEUE; } else { - Class balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT); + Class balancerClass = conf.getClass( + CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT); return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues); } } - /** - * All requests go to the first queue, at index 0 - */ - private static QueueBalancer ONE_QUEUE = new QueueBalancer() { - @Override - public int getNextQueue(CallRunner callRunner) { - return 0; - } - }; - /** * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have @@ -455,7 +450,8 @@ public void resizeQueues(Configuration conf) { configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH; } } - currentQueueLimit = conf.getInt(configKey, currentQueueLimit); + final int queueLimit = currentQueueLimit; + currentQueueLimit = conf.getInt(configKey, queueLimit); } public void onConfigurationChange(Configuration conf) {