From df8a2ea3346876a3a80234b48fe19c2d782b5ec7 Mon Sep 17 00:00:00 2001 From: Sean Xiao Yutong Date: Wed, 5 Jul 2023 17:30:28 +0800 Subject: [PATCH 1/5] HBASE-27962 Introduce an AdaptiveFastPathRWRpcExecutor to make the W/R/S separations fit various workloads --- .../AdaptiveFastPathRWQueueRpcExecutor.java | 132 +++++++++++++++ .../hbase/ipc/FastPathRWQueueRpcExecutor.java | 6 +- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 20 +-- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 7 +- ...estAdaptiveFastPathRWQueueRpcExecutor.java | 157 ++++++++++++++++++ 5 files changed, 307 insertions(+), 15 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java new file mode 100644 index 000000000000..301580909d00 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is subclass of {@link FastPathRWQueueRpcExecutor}, which has a better utility under various + * kinds of workloads. + */ +@InterfaceAudience.Private +public class AdaptiveFastPathRWQueueRpcExecutor extends FastPathRWQueueRpcExecutor { + private static final Logger LOG = LoggerFactory.getLogger(FastPathRWQueueRpcExecutor.class); + + private static final String FASTPATH_ADAPTIVE_RATIO = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio"; + private static final float FASTPATH_ADAPTIVE_DEFAULT = 0; + + private final int writeSharedHandlers; + private final int readSharedHandlers; + private final int scanSharedHandlers; + + protected final Deque sharedHandlerStack = new ConcurrentLinkedDeque<>(); + + public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, + PriorityFunction priority, Configuration conf, Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + float slackRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + if (!checkAdaptiveRatioRationality(conf)) { + LOG.warn("The slackRatio should be in (0.0, 1.0) but get " + slackRatio + + " using the default ratio: " + FASTPATH_ADAPTIVE_DEFAULT); + slackRatio = FASTPATH_ADAPTIVE_DEFAULT; + } + + writeSharedHandlers = (int) (slackRatio * writeHandlersCount); + readSharedHandlers = (int) (slackRatio * readHandlersCount); + scanSharedHandlers = (int) (slackRatio * scanHandlersCount); + } + + @Override + public boolean dispatch(CallRunner callTask) { + RpcCall call = callTask.getRpcCall(); + boolean isWriteRequest = isWriteRequest(call.getHeader(), call.getParam()); + boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); + FastPathRpcHandler handler = isWriteRequest ? writeHandlerStack.poll() : + shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll(); + if (handler == null) { + handler = sharedHandlerStack.poll(); + } + + return handler != null ? handler.loadCallRunner(callTask) : + dispatchTo(isWriteRequest, shouldDispatchToScanQueue, callTask); + } + + @Override + protected void startHandlers(final int port) { + startHandlers(".shared_write", writeSharedHandlers, queues, 0, numWriteQueues, port, + activeWriteHandlerCount); + + startHandlers(".write", writeHandlersCount - writeSharedHandlers, queues, 0, numWriteQueues, + port, activeWriteHandlerCount); + + startHandlers(".shared_read", readSharedHandlers, queues, numWriteQueues, numReadQueues, port, + activeReadHandlerCount); + startHandlers(".read", readHandlersCount - readSharedHandlers, queues, numWriteQueues, + numReadQueues, port, activeReadHandlerCount); + + if (numScanQueues > 0) { + startHandlers(".shared_scan", scanSharedHandlers, queues, numWriteQueues + numReadQueues, + numScanQueues, port, activeScanHandlerCount); + startHandlers(".scan", scanHandlersCount - scanSharedHandlers, queues, + numWriteQueues + numReadQueues, numScanQueues, port, activeScanHandlerCount); + } + } + + @Override + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + Deque handlerStack = + name.contains("shared") ? sharedHandlerStack : + name.contains("read") ? readHandlerStack : + name.contains("write") ? writeHandlerStack : scanHandlerStack; + return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, + activeHandlerCount, failedHandlerCount, abortable, handlerStack); + } + + int getWriteStackLength() { + return writeHandlerStack.size(); + } + + int getReadStackLength() { + return readHandlerStack.size(); + } + + int getScanStackLength() { + return scanHandlerStack.size(); + } + + int getSharedStackLength() { + return sharedHandlerStack.size(); + } + + static boolean checkAdaptiveRatioRationality(Configuration conf) { + float ratio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + return !(ratio <= 0) && !(ratio >= 1.0f); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java index 63436e1dd4ab..05b2219faae7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -35,9 +35,9 @@ @InterfaceStability.Evolving public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { - private final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); - private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); - private final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); + protected final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); + protected final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); + protected final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 298a9fc3aeb2..d2be4375157b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -55,16 +55,16 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final QueueBalancer writeBalancer; private final QueueBalancer readBalancer; private final QueueBalancer scanBalancer; - private final int writeHandlersCount; - private final int readHandlersCount; - private final int scanHandlersCount; - private final int numWriteQueues; - private final int numReadQueues; - private final int numScanQueues; - - private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0); - private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); - private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0); + protected final int writeHandlersCount; + protected final int readHandlersCount; + protected final int scanHandlersCount; + protected final int numWriteQueues; + protected final int numReadQueues; + protected final int numScanQueues; + + protected final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0); + protected final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); + protected final AtomicInteger activeScanHandlerCount = new AtomicInteger(0); public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 92b387570317..453ba31a41d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -87,8 +87,11 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand if (callqReadShare > 0) { // at least 1 read handler and 1 write handler - callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server); + callExecutor = AdaptiveFastPathRWQueueRpcExecutor.checkAdaptiveRatioRationality(conf) ? + new AdaptiveFastPathRWQueueRpcExecutor("default.AFPRWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server) : + new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server); } else { if ( RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java new file mode 100644 index 000000000000..1e9807bd2433 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAdaptiveFastPathRWQueueRpcExecutor.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.Semaphore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +@Category(SmallTests.class) +public class TestAdaptiveFastPathRWQueueRpcExecutor { + + private static final Semaphore blocker = new Semaphore(0); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAdaptiveFastPathRWQueueRpcExecutor.class); + + @Test + public void testInitialization() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.1f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testInitialization", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + // When the adaptive ratio is 0.1, then the shared stack should contain + // 25 * 0.1 + 25 * 0.1 + 50 * 0.1 = 9 handlers. + Assert.assertEquals(23, executor.getReadStackLength()); + Assert.assertEquals(23, executor.getScanStackLength()); + Assert.assertEquals(45, executor.getWriteStackLength()); + Assert.assertEquals(9, executor.getSharedStackLength()); + } + + @Test + public void testInvalidRatio() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", -0.5f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testInvalidRatio", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + // If the adaptive ratio is invalid, we just use the default ratio 0, with which the shared + // stack should be empty. + Assert.assertEquals(25, executor.getReadStackLength()); + Assert.assertEquals(25, executor.getScanStackLength()); + Assert.assertEquals(50, executor.getWriteStackLength()); + Assert.assertEquals(0, executor.getSharedStackLength()); + } + + @Test + public void testCustomRatio() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.2f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testCustomRatio", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + // When the adaptive ratio is 0.2, then the shared stack should contain + // 25 * 0.2 + 25 * 0.2 + 50 * 0.2 = 20 handlers. + Assert.assertEquals(20, executor.getReadStackLength()); + Assert.assertEquals(20, executor.getScanStackLength()); + Assert.assertEquals(40, executor.getWriteStackLength()); + Assert.assertEquals(20, executor.getSharedStackLength()); + } + + @Test + public void testActiveHandlerCount() throws InterruptedException { + Configuration conf = new Configuration(); + conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.5f); + conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f); + conf.setFloat("hbase.ipc.server.callqueue.fastpath.adaptive.ratio", 0.2f); + + AdaptiveFastPathRWQueueRpcExecutor executor = + new AdaptiveFastPathRWQueueRpcExecutor("testInitialization", 100, 250, null, conf, null); + executor.start(0); + Thread.sleep(1000); + + RpcCall call = Mockito.mock(RpcCall.class); + ClientProtos.ScanRequest scanRequest = + ClientProtos.ScanRequest.newBuilder().getDefaultInstanceForType(); + + Mockito.when(call.getParam()).thenReturn(scanRequest); + + // When the adaptive ratio is 0.2, then the shared stack should contain + // 25 * 0.2 + 25 * 0.2 + 50 * 0.2 = 20 handlers. + // We send 30 CallRunner with mocked scan requests here. + + for (int i = 0; i < 30; i++) { + CallRunner temp = new DummyCallRunner(null, call); + executor.dispatch(temp); + } + // Wait for the dummy CallRunner being executed. + Thread.sleep(2000); + + Assert.assertEquals(0, executor.getScanStackLength()); + Assert.assertEquals(10, executor.getSharedStackLength()); + Assert.assertEquals(40, executor.getWriteStackLength()); + Assert.assertEquals(20, executor.getReadStackLength()); + Assert.assertEquals(30, executor.getActiveScanHandlerCount() + + executor.getActiveWriteHandlerCount() + executor.getActiveReadHandlerCount()); + } + + static class DummyCallRunner extends CallRunner { + + DummyCallRunner(RpcServerInterface rpcServer, RpcCall call) { + super(rpcServer, call); + } + + @Override + public void run() { + try { + blocker.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} From 8e74efc5c44d2c64d0d37273b187eac82168e967 Mon Sep 17 00:00:00 2001 From: Sean Xiao Yutong Date: Thu, 6 Jul 2023 10:36:26 +0800 Subject: [PATCH 2/5] Run spotless --- .../AdaptiveFastPathRWQueueRpcExecutor.java | 22 ++++++++++--------- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 10 ++++----- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java index 301580909d00..f2f729db153f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java @@ -50,8 +50,8 @@ public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int max super(name, handlerCount, maxQueueLength, priority, conf, abortable); float slackRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); if (!checkAdaptiveRatioRationality(conf)) { - LOG.warn("The slackRatio should be in (0.0, 1.0) but get " + slackRatio + - " using the default ratio: " + FASTPATH_ADAPTIVE_DEFAULT); + LOG.warn("The slackRatio should be in (0.0, 1.0) but get " + slackRatio + + " using the default ratio: " + FASTPATH_ADAPTIVE_DEFAULT); slackRatio = FASTPATH_ADAPTIVE_DEFAULT; } @@ -65,14 +65,16 @@ public boolean dispatch(CallRunner callTask) { RpcCall call = callTask.getRpcCall(); boolean isWriteRequest = isWriteRequest(call.getHeader(), call.getParam()); boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); - FastPathRpcHandler handler = isWriteRequest ? writeHandlerStack.poll() : - shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll(); + FastPathRpcHandler handler = isWriteRequest ? writeHandlerStack.poll() + : shouldDispatchToScanQueue ? scanHandlerStack.poll() + : readHandlerStack.poll(); if (handler == null) { handler = sharedHandlerStack.poll(); } - return handler != null ? handler.loadCallRunner(callTask) : - dispatchTo(isWriteRequest, shouldDispatchToScanQueue, callTask); + return handler != null + ? handler.loadCallRunner(callTask) + : dispatchTo(isWriteRequest, shouldDispatchToScanQueue, callTask); } @Override @@ -101,10 +103,10 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh final int handlerCount, final BlockingQueue q, final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, final Abortable abortable) { - Deque handlerStack = - name.contains("shared") ? sharedHandlerStack : - name.contains("read") ? readHandlerStack : - name.contains("write") ? writeHandlerStack : scanHandlerStack; + Deque handlerStack = name.contains("shared") ? sharedHandlerStack + : name.contains("read") ? readHandlerStack + : name.contains("write") ? writeHandlerStack + : scanHandlerStack; return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount, abortable, handlerStack); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 453ba31a41d5..8d7826423f9f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -87,11 +87,11 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand if (callqReadShare > 0) { // at least 1 read handler and 1 write handler - callExecutor = AdaptiveFastPathRWQueueRpcExecutor.checkAdaptiveRatioRationality(conf) ? - new AdaptiveFastPathRWQueueRpcExecutor("default.AFPRWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server) : - new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server); + callExecutor = AdaptiveFastPathRWQueueRpcExecutor.checkAdaptiveRatioRationality(conf) + ? new AdaptiveFastPathRWQueueRpcExecutor("default.AFPRWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server) + : new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), maxQueueLength, + priority, conf, server); } else { if ( RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) From f325a23f3aca3d0548fef76d0c05ef692a0a7493 Mon Sep 17 00:00:00 2001 From: Sean Xiao Yutong Date: Thu, 6 Jul 2023 10:41:53 +0800 Subject: [PATCH 3/5] Refine the variable name --- .../ipc/AdaptiveFastPathRWQueueRpcExecutor.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java index f2f729db153f..e6f967d80f13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java @@ -48,16 +48,16 @@ public class AdaptiveFastPathRWQueueRpcExecutor extends FastPathRWQueueRpcExecut public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) { super(name, handlerCount, maxQueueLength, priority, conf, abortable); - float slackRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + float adaptiveRatio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); if (!checkAdaptiveRatioRationality(conf)) { - LOG.warn("The slackRatio should be in (0.0, 1.0) but get " + slackRatio + LOG.warn("The adaptive ratio should be in (0.0, 1.0) but get " + adaptiveRatio + " using the default ratio: " + FASTPATH_ADAPTIVE_DEFAULT); - slackRatio = FASTPATH_ADAPTIVE_DEFAULT; + adaptiveRatio = FASTPATH_ADAPTIVE_DEFAULT; } - writeSharedHandlers = (int) (slackRatio * writeHandlersCount); - readSharedHandlers = (int) (slackRatio * readHandlersCount); - scanSharedHandlers = (int) (slackRatio * scanHandlersCount); + writeSharedHandlers = (int) (adaptiveRatio * writeHandlersCount); + readSharedHandlers = (int) (adaptiveRatio * readHandlersCount); + scanSharedHandlers = (int) (adaptiveRatio * scanHandlersCount); } @Override From e5980f37274e05314675bcdb62d1dff35d477760 Mon Sep 17 00:00:00 2001 From: Sean Xiao Yutong Date: Mon, 17 Jul 2023 09:59:34 +0800 Subject: [PATCH 4/5] Added three sub properties to make the adaptive ratio setting more flexible --- .../AdaptiveFastPathRWQueueRpcExecutor.java | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java index e6f967d80f13..0e4e9fe730a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java @@ -38,6 +38,12 @@ public class AdaptiveFastPathRWQueueRpcExecutor extends FastPathRWQueueRpcExecut private static final String FASTPATH_ADAPTIVE_RATIO = "hbase.ipc.server.callqueue.fastpath.adaptive.ratio"; private static final float FASTPATH_ADAPTIVE_DEFAULT = 0; + private static final String FASTPATH_ADAPTIVE_RADIO_WRITE = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.write"; + private static final String FASTPATH_ADAPTIVE_RADIO_READ = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.read"; + private static final String FASTPATH_ADAPTIVE_RADIO_SCAN = + "hbase.ipc.server.callqueue.fastpath.adaptive.ratio.scan"; private final int writeSharedHandlers; private final int readSharedHandlers; @@ -55,9 +61,19 @@ public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int max adaptiveRatio = FASTPATH_ADAPTIVE_DEFAULT; } - writeSharedHandlers = (int) (adaptiveRatio * writeHandlersCount); - readSharedHandlers = (int) (adaptiveRatio * readHandlersCount); - scanSharedHandlers = (int) (adaptiveRatio * scanHandlersCount); + float writeRatio = conf.getFloat(FASTPATH_ADAPTIVE_RADIO_WRITE, FASTPATH_ADAPTIVE_DEFAULT); + float readRatio = conf.getFloat(FASTPATH_ADAPTIVE_RADIO_READ, FASTPATH_ADAPTIVE_DEFAULT); + float scanRatio = conf.getFloat(FASTPATH_ADAPTIVE_RADIO_SCAN, FASTPATH_ADAPTIVE_DEFAULT); + + writeSharedHandlers = checkRatioRationality(conf, FASTPATH_ADAPTIVE_RADIO_WRITE) ? + (int) (writeRatio * writeHandlersCount) : + (int) (adaptiveRatio * writeHandlersCount); + readSharedHandlers = checkRatioRationality(conf, FASTPATH_ADAPTIVE_RADIO_READ) ? + (int) (readRatio * readHandlersCount) : + (int) (adaptiveRatio * readHandlersCount); + scanSharedHandlers = checkRatioRationality(conf, FASTPATH_ADAPTIVE_RADIO_SCAN) ? + (int) (scanRatio * scanHandlersCount) : + (int) (adaptiveRatio * scanHandlersCount); } @Override @@ -128,7 +144,11 @@ int getSharedStackLength() { } static boolean checkAdaptiveRatioRationality(Configuration conf) { - float ratio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + return checkRatioRationality(conf, FASTPATH_ADAPTIVE_RATIO); + } + + private static boolean checkRatioRationality(Configuration conf, String propertyName) { + float ratio = conf.getFloat(propertyName, FASTPATH_ADAPTIVE_DEFAULT); return !(ratio <= 0) && !(ratio >= 1.0f); } } From a14fa48f77428a1e9df547951be29dd5d5cc859f Mon Sep 17 00:00:00 2001 From: Sean Xiao Yutong Date: Mon, 17 Jul 2023 11:37:49 +0800 Subject: [PATCH 5/5] Run spotless apply --- .../AdaptiveFastPathRWQueueRpcExecutor.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java index 0e4e9fe730a5..beb87513c408 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveFastPathRWQueueRpcExecutor.java @@ -65,15 +65,15 @@ public AdaptiveFastPathRWQueueRpcExecutor(String name, int handlerCount, int max float readRatio = conf.getFloat(FASTPATH_ADAPTIVE_RADIO_READ, FASTPATH_ADAPTIVE_DEFAULT); float scanRatio = conf.getFloat(FASTPATH_ADAPTIVE_RADIO_SCAN, FASTPATH_ADAPTIVE_DEFAULT); - writeSharedHandlers = checkRatioRationality(conf, FASTPATH_ADAPTIVE_RADIO_WRITE) ? - (int) (writeRatio * writeHandlersCount) : - (int) (adaptiveRatio * writeHandlersCount); - readSharedHandlers = checkRatioRationality(conf, FASTPATH_ADAPTIVE_RADIO_READ) ? - (int) (readRatio * readHandlersCount) : - (int) (adaptiveRatio * readHandlersCount); - scanSharedHandlers = checkRatioRationality(conf, FASTPATH_ADAPTIVE_RADIO_SCAN) ? - (int) (scanRatio * scanHandlersCount) : - (int) (adaptiveRatio * scanHandlersCount); + writeSharedHandlers = checkRatioRationality(writeRatio) + ? (int) (writeRatio * writeHandlersCount) + : (int) (adaptiveRatio * writeHandlersCount); + readSharedHandlers = checkRatioRationality(readRatio) + ? (int) (readRatio * readHandlersCount) + : (int) (adaptiveRatio * readHandlersCount); + scanSharedHandlers = checkRatioRationality(scanRatio) + ? (int) (scanRatio * scanHandlersCount) + : (int) (adaptiveRatio * scanHandlersCount); } @Override @@ -144,11 +144,11 @@ int getSharedStackLength() { } static boolean checkAdaptiveRatioRationality(Configuration conf) { - return checkRatioRationality(conf, FASTPATH_ADAPTIVE_RATIO); + float ratio = conf.getFloat(FASTPATH_ADAPTIVE_RATIO, FASTPATH_ADAPTIVE_DEFAULT); + return checkRatioRationality(ratio); } - private static boolean checkRatioRationality(Configuration conf, String propertyName) { - float ratio = conf.getFloat(propertyName, FASTPATH_ADAPTIVE_DEFAULT); + private static boolean checkRatioRationality(float ratio) { return !(ratio <= 0) && !(ratio >= 1.0f); } }