diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java new file mode 100644 index 000000000000..beb87513c408 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a subclass of {@link FastPathRWQueueRpcExecutor}, which has a better utility under various + * kinds of workloads. 
+ */ +@InterfaceAudience.Private +public class AdaptiveFastPathRWQueueRpcExecutor extends FastPathRWQueueRpcExecutor { + private static final Logger LOG = LoggerFactory.getLogger(AdaptiveFastPathRWQueueRpcExecutor.class); + + private static final String FASTPATH_ADAPTIVE_RATIO = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio"; + private static final float FASTPATH_ADAPTIVE_DEFAULT = 0; + private static final String FASTPATH_ADAPTIVE_RATIO_WRITE = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.write"; + private static final String FASTPATH_ADAPTIVE_RATIO_READ = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.read"; + private static final String FASTPATH_ADAPTIVE_RATIO_SCAN = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.scan"; + + private final int writeSharedHandlers; + private final int readSharedHandlers; + private final int scanSharedHandlers; + + protected final Deque<FastPathRpcHandler> sharedHandlerStack = new ConcurrentLinkedDeque<>(); + + public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, + PriorityFunction priority, Configuration conf, Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + float adaptiveRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + if (!checkAdaptiveRatioRationality(conf)) { + LOG.warn("The adaptive ratio should be in (0.0, 1.0) but get " + adaptiveRatio + + " using the default ratio: " + FASTPATH_ADAPTIVE_DEFAULT); + adaptiveRatio = FASTPATH_ADAPTIVE_DEFAULT; + } + + float writeRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO_WRITE, FASTPATH_ADAPTIVE_DEFAULT); + float readRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO_READ, FASTPATH_ADAPTIVE_DEFAULT); + float scanRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO_SCAN, FASTPATH_ADAPTIVE_DEFAULT); + + writeSharedHandlers = checkRatioRationality(writeRatio) + ? 
(int) (writeRatio * writeHandlersCount) + : (int) (adaptiveRatio * writeHandlersCount); + readSharedHandlers = checkRatioRationality(readRatio) + ? (int) (readRatio * readHandlersCount) + : (int) (adaptiveRatio * readHandlersCount); + scanSharedHandlers = checkRatioRationality(scanRatio) + ? (int) (scanRatio * scanHandlersCount) + : (int) (adaptiveRatio * scanHandlersCount); + } + + @Override + public boolean dispatch(CallRunner callTask) { + RpcCall call = callTask.getRpcCall(); + boolean isWriteRequest = isWriteRequest(call.getHeader(), call.getParam()); + boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); + FastPathRpcHandler handler = isWriteRequest ? writeHandlerStack.poll() + : shouldDispatchToScanQueue ? scanHandlerStack.poll() + : readHandlerStack.poll(); + if (handler == null) { + handler = sharedHandlerStack.poll(); + } + + return handler != null + ? handler.loadCallRunner(callTask) + : dispatchTo(isWriteRequest, shouldDispatchToScanQueue, callTask); + } + + @Override + protected void startHandlers(final int port) { + startHandlers(".shared_write", writeSharedHandlers, queues, 0, numWriteQueues, port, + activeWriteHandlerCount); + + startHandlers(".write", writeHandlersCount - writeSharedHandlers, queues, 0, numWriteQueues, + port, activeWriteHandlerCount); + + startHandlers(".shared_read", readSharedHandlers, queues, numWriteQueues, numReadQueues, port, + activeReadHandlerCount); + startHandlers(".read", readHandlersCount - readSharedHandlers, queues, numWriteQueues, + numReadQueues, port, activeReadHandlerCount); + + if (numScanQueues > 0) { + startHandlers(".shared_scan", scanSharedHandlers, queues, numWriteQueues + numReadQueues, + numScanQueues, port, activeScanHandlerCount); + startHandlers(".scan", scanHandlersCount - scanSharedHandlers, queues, + numWriteQueues + numReadQueues, numScanQueues, port, activeScanHandlerCount); + } + } + + @Override + protected RpcHandler getHandler(final String name, final double 
handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + Deque handlerStack = name.contains("shared") ? sharedHandlerStack + : name.contains("read") ? readHandlerStack + : name.contains("write") ? writeHandlerStack + : scanHandlerStack; + return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, + activeHandlerCount, failedHandlerCount, abortable, handlerStack); + } + + int getWriteStackLength() { + return writeHandlerStack.size(); + } + + int getReadStackLength() { + return readHandlerStack.size(); + } + + int getScanStackLength() { + return scanHandlerStack.size(); + } + + int getSharedStackLength() { + return sharedHandlerStack.size(); + } + + static boolean checkAdaptiveRatioRationality(Configuration conf) { + float ratio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + return checkRatioRationality(ratio); + } + + private static boolean checkRatioRationality(float ratio) { + return !(ratio <= 0) && !(ratio >= 1.0f); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java index 63436e1dd4ab..05b2219faae7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -35,9 +35,9 @@ @InterfaceStability.Evolving public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { - private final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); - private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); - private final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); + protected final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); + protected final Deque writeHandlerStack = new 
ConcurrentLinkedDeque<>(); + protected final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 298a9fc3aeb2..d2be4375157b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -55,16 +55,16 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final QueueBalancer writeBalancer; private final QueueBalancer readBalancer; private final QueueBalancer scanBalancer; - private final int writeHandlersCount; - private final int readHandlersCount; - private final int scanHandlersCount; - private final int numWriteQueues; - private final int numReadQueues; - private final int numScanQueues; - - private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0); - private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); - private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0); + protected final int writeHandlersCount; + protected final int readHandlersCount; + protected final int scanHandlersCount; + protected final int numWriteQueues; + protected final int numReadQueues; + protected final int numScanQueues; + + protected final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0); + protected final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); + protected final AtomicInteger activeScanHandlerCount = new AtomicInteger(0); public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 92b387570317..8d7826423f9f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -87,8 +87,11 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand if (callqReadShare > 0) { // at least 1 read handler and 1 write handler - callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server); + callExecutor = AdaptiveFastPathRWQueueRpcExecutor.checkAdaptiveRatioRationality(conf) + ? new AdaptiveFastPathRWQueueRpcExecutor("default.AFPRWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server) + : new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), maxQueueLength, + priority, conf, server); } else { if ( RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java new file mode 100644 index 000000000000..1e9807bd2433 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.Semaphore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +@Category(SmallTests.class) +public class TestAdaptiveFastPathRWQueueRpcExecutor { + + private static final Semaphore blocker = new Semaphore(0); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAdaptiveFastPathRWQueueRpcExecutor.class); + + @Test + public void testInitialization() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.1f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testInitialization", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + // When the adaptive ratio is 0.1, then the shared stack should contain + // 25 * 0.1 + 25 * 0.1 + 50 * 0.1 = 9 handlers. 
+ Assert.assertEquals(23, executor.getReadStackLength()); + Assert.assertEquals(23, executor.getScanStackLength()); + Assert.assertEquals(45, executor.getWriteStackLength()); + Assert.assertEquals(9, executor.getSharedStackLength()); + } + + @Test + public void testInvalidRatio() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", -0.5f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testInvalidRatio", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + // If the adaptive ratio is invalid, we just use the default ratio 0, with which the shared + // stack should be empty. + Assert.assertEquals(25, executor.getReadStackLength()); + Assert.assertEquals(25, executor.getScanStackLength()); + Assert.assertEquals(50, executor.getWriteStackLength()); + Assert.assertEquals(0, executor.getSharedStackLength()); + } + + @Test + public void testCustomRatio() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.2f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testCustomRatio", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + // When the adaptive ratio is 0.2, then the shared stack should contain + // 25 * 0.2 + 25 * 0.2 + 50 * 0.2 = 20 handlers. 
+ Assert.assertEquals(20, executor.getReadStackLength()); + Assert.assertEquals(20, executor.getScanStackLength()); + Assert.assertEquals(40, executor.getWriteStackLength()); + Assert.assertEquals(20, executor.getSharedStackLength()); + } + + @Test + public void testActiveHandlerCount() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.2f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testInitialization", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + RpcCall call = Mockito.mock(RpcCall.class); + ClientProtos.ScanRequest scanRequest = + ClientProtos.ScanRequest.newBuilder().getDefaultInstanceForType(); + + Mockito.when(call.getParam()).thenReturn(scanRequest); + + // When the adaptive ratio is 0.2, then the shared stack should contain + // 25 * 0.2 + 25 * 0.2 + 50 * 0.2 = 20 handlers. + // We send 30 CallRunner with mocked scan requests here. + + for (int i = 0; i < 30; i++) { + CallRunner temp = new DummyCallRunner(null, call); + executor.dispatch(temp); + } + // Wait for the dummy CallRunner being executed. 
+ Thread.sleep(2000); + + Assert.assertEquals(0, executor.getScanStackLength()); + Assert.assertEquals(10, executor.getSharedStackLength()); + Assert.assertEquals(40, executor.getWriteStackLength()); + Assert.assertEquals(20, executor.getReadStackLength()); + Assert.assertEquals(30, executor.getActiveScanHandlerCount() + + executor.getActiveWriteHandlerCount() + executor.getActiveReadHandlerCount()); + } + + static class DummyCallRunner extends CallRunner { + + DummyCallRunner(RpcServerInterface rpcServer, RpcCall call) { + super(rpcServer, call); + } + + @Override + public void run() { + try { + blocker.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +}