Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundBuffer;

/**
 * Wraps some usages of netty's unsafe API, for ease of maintainability.
 * <p>
 * NOTE(review): netty documents most {@code Channel.Unsafe} operations as intended for the
 * channel's I/O thread; confirm that each caller of these helpers is comfortable with that.
 */
@InterfaceAudience.Private
public final class NettyUnsafeUtils {

  private NettyUnsafeUtils() {
  }

  /**
   * Directly closes the channel, skipping any handlers in the pipeline.
   * @param channel the channel to close
   */
  public static void closeDirect(Channel channel) {
    channel.unsafe().close(channel.voidPromise());
  }

  /**
   * Get total bytes pending write to socket.
   * @param channel the channel to inspect
   * @return the number of bytes queued in the channel's outbound buffer, or 0 when the channel no
   *         longer has an outbound buffer
   */
  public static long getTotalPendingOutboundBytes(Channel channel) {
    // Read the buffer reference exactly once: netty drops it (returns null) while the channel is
    // closing, and callers such as metrics collection may still be iterating over that channel.
    ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();
    if (outboundBuffer == null) {
      return 0L;
    }
    return outboundBuffer.totalPendingWriteBytes();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String PROCESS_CALL_TIME_DESC = "Processing call time.";
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";

// Name/description of the metric tracking how long channels stay unwritable.
String UNWRITABLE_TIME_NAME = "unwritableTime";
String UNWRITABLE_TIME_DESC =
  "Time where a channel was unwritable due to having too many outbound bytes";
// Name/description of the metric counting connections closed for exceeding the outbound max.
String MAX_OUTBOUND_BYTES_EXCEEDED_NAME = "maxOutboundBytesExceeded";
String MAX_OUTBOUND_BYTES_EXCEEDED_DESC =
  "Number of times a connection was closed because the channel outbound "
    + "bytes exceeded the configured max.";
String QUEUE_SIZE_NAME = "queueSize";
String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and "
+ "parsed and is waiting to run or is currently being executed.";
Expand Down Expand Up @@ -97,6 +105,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";

String NETTY_DM_USAGE_DESC = "Current Netty direct memory usage.";
// Gauge: sum of the bytes pending write over every channel.
String NETTY_TOTAL_PENDING_OUTBOUND_NAME = "nettyTotalPendingOutboundBytes";
String NETTY_TOTAL_PENDING_OUTBOUND_DESC = "Current total bytes pending write to all channels";
// Gauge: largest number of bytes pending write on any single channel.
String NETTY_MAX_PENDING_OUTBOUND_NAME = "nettyMaxPendingOutboundBytes";
String NETTY_MAX_PENDING_OUTBOUND_DESC = "Current maximum bytes pending write to any channel";

void authorizationSuccess();

Expand All @@ -121,4 +133,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
void processedCall(int processingTime);

void queuedAndProcessedCall(int totalTime);

/**
* Records a period during which a channel was unwritable due to having too many outbound bytes.
* @param unwritableTime the duration to record. NOTE(review): the time unit is not visible here;
*                       presumably milliseconds, confirm against the caller.
*/
void unwritableTime(long unwritableTime);

/**
* Records that a connection was closed because its channel's outbound bytes exceeded the
* configured max.
*/
void maxOutboundBytesExceeded();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
Expand All @@ -36,10 +37,12 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
private final MutableFastCounter authenticationFallbacks;
private final MutableFastCounter sentBytes;
private final MutableFastCounter receivedBytes;
// Counter registered under MAX_OUTBOUND_BYTES_EXCEEDED_NAME; bumped by maxOutboundBytesExceeded().
private final MutableFastCounter maxOutboundBytesExceeded;

private MetricHistogram queueCallTime;
private MetricHistogram processCallTime;
private MetricHistogram totalCallTime;
// Time histogram registered under UNWRITABLE_TIME_NAME; fed by unwritableTime(long).
private MetricHistogram unwritableTime;
private MetricHistogram requestSize;
private MetricHistogram responseSize;

Expand Down Expand Up @@ -67,6 +70,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, String metricsDescriptio
this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC);
this.totalCallTime =
this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC);
this.unwritableTime =
this.getMetricsRegistry().newTimeHistogram(UNWRITABLE_TIME_NAME, UNWRITABLE_TIME_DESC);
this.maxOutboundBytesExceeded = this.getMetricsRegistry()
.newCounter(MAX_OUTBOUND_BYTES_EXCEEDED_NAME, MAX_OUTBOUND_BYTES_EXCEEDED_DESC, 0);
this.requestSize =
this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC);
this.responseSize =
Expand Down Expand Up @@ -133,6 +140,16 @@ public void queuedAndProcessedCall(int totalTime) {
totalCallTime.add(totalTime);
}

/**
* Adds the given value to the unwritable-time histogram.
* @param unwritableTime the duration to record. NOTE(review): unit not visible here; presumably
*                       milliseconds, confirm against the caller.
*/
@Override
public void unwritableTime(long unwritableTime) {
// "this." is required: the parameter shadows the histogram field of the same name.
this.unwritableTime.add(unwritableTime);
}

/**
* Increments the counter of connections closed for exceeding the configured outbound-bytes max.
*/
@Override
public void maxOutboundBytesExceeded() {
maxOutboundBytesExceeded.incr();
}

@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
Expand Down Expand Up @@ -177,6 +194,13 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
wrapper.getActiveScanRpcHandlerCount())
.addGauge(Interns.info(NETTY_DM_USAGE_NAME, NETTY_DM_USAGE_DESC),
wrapper.getNettyDmUsage());

Pair<Long, Long> totalAndMax = wrapper.getTotalAndMaxNettyOutboundBytes();
mrb.addGauge(
Interns.info(NETTY_TOTAL_PENDING_OUTBOUND_NAME, NETTY_TOTAL_PENDING_OUTBOUND_DESC),
totalAndMax.getFirst());
mrb.addGauge(Interns.info(NETTY_MAX_PENDING_OUTBOUND_NAME, NETTY_MAX_PENDING_OUTBOUND_DESC),
totalAndMax.getSecond());
}

metricsRegistry.snapshot(mrb, all);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
Expand Down Expand Up @@ -64,4 +65,6 @@ public interface MetricsHBaseServerWrapper {
int getActiveScanRpcHandlerCount();

long getNettyDmUsage();

/**
* Returns the bytes currently pending write on the server's netty channels.
* @return a pair whose first element is the total across all channels and whose second element
*         is the maximum pending on any single channel
*/
Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ void totalCall(int totalTime) {
source.queuedAndProcessedCall(totalTime);
}

/**
* Forwards an unwritable-time observation to the metrics source.
* @param unwritableTime the duration to record. NOTE(review): unit not visible here; presumably
*                       milliseconds, confirm against the caller.
*/
void unwritableTime(long unwritableTime) {
source.unwritableTime(unwritableTime);
}

/**
* Forwards a "max outbound bytes exceeded" event to the metrics source.
*/
void maxOutboundBytesExceeded() {
source.maxOutboundBytesExceeded();
}

public void exception(Throwable throwable) {
source.exception();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
Expand Down Expand Up @@ -209,4 +210,16 @@ public long getNettyDmUsage() {

return DirectMemoryUtils.getNettyDirectMemoryUsage();
}

/**
 * Reports the total and per-channel maximum of bytes pending write, delegating to the
 * {@link NettyRpcServer}. Yields a pair of zeros when the server has not started, has no
 * scheduler, or is not a netty based server.
 */
@Override
public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
  final boolean nettyServerAvailable = isServerStarted() && this.server.getScheduler() != null
    && this.server instanceof NettyRpcServer;
  if (nettyServerAvailable) {
    return ((NettyRpcServer) server).getTotalAndMaxNettyOutboundBytes();
  }
  return Pair.newPair(0L, 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -53,6 +55,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
Expand Down Expand Up @@ -84,6 +87,38 @@ public class NettyRpcServer extends RpcServer {
static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
static final String HEAP_ALLOCATOR_TYPE = "heap";

/**
* Low watermark for pending outbound bytes of a single netty channel. After the high watermark has
* been exceeded, the channel's pending outbound bytes must drop below this value before the
* channel counts as writable again; at that point setAutoRead is set back to true and the server
* starts reading incoming bytes (requests) from the client channel again.
* <p>
* Defaults to 0. When both the low and high watermarks are 0, netty's default watermark is used
* and autoRead is not managed in response to writability changes.
*/
private static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
"hbase.server.netty.writable.watermark.low";
private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;

/**
* High watermark for pending outbound bytes of a single netty channel. If the number of pending
* outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
* will stop reading incoming requests from the client channel.
* <p>
* Note: any requests already in the call queue will still be processed.
* <p>
* Defaults to 0. When both the low and high watermarks are 0, netty's default watermark is used
* and autoRead is not managed in response to writability changes.
*/
private static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
"hbase.server.netty.writable.watermark.high";
private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;

/**
* Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
* outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
* can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
* requests.
* <p>
* Note: when the low/high watermarks are enabled this must be higher than the high watermark,
* otherwise it's ignored (reset to 0 with a warning). The check is not applied while the low/high
* watermarks are disabled. Defaults to 0.
*/
private static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
"hbase.server.netty.writable.watermark.fatal";
private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;

private final InetSocketAddress bindAddress;

private final CountDownLatch closed = new CountDownLatch(1);
Expand All @@ -94,6 +129,10 @@ public class NettyRpcServer extends RpcServer {
private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();

// Still null during the first configureNettyWatermarks call made from the constructor;
// configureNettyWatermarks uses that to skip updating existing channels at that point.
private final ServerBootstrap bootstrap;
// Both values are rewritten by configureNettyWatermarks, which also runs from
// onConfigurationChange, and are read elsewhere (e.g. the channel initializer), hence volatile.
private volatile int writeBufferFatalThreshold;
private volatile WriteBufferWaterMark writeBufferWaterMark;

public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
boolean reservoirEnabled) throws IOException {
Expand All @@ -108,15 +147,20 @@ public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterfa
if (config == null) {
config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
}

// call before creating bootstrap below so that the necessary configs can be set
configureNettyWatermarks(conf);

EventLoopGroup eventLoopGroup = config.group();
Class<? extends ServerChannel> channelClass = config.serverChannelClass();
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
ch.config().setAllocator(channelAllocator);
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
Expand All @@ -129,7 +173,7 @@ protected void initChannel(Channel ch) throws Exception {
// We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
// send RpcResponse to client.
.addLast(NettyRpcServerResponseEncoder.NAME,
new NettyRpcServerResponseEncoder(metrics));
new NettyRpcServerResponseEncoder(NettyRpcServer.this, metrics));
}
});
try {
Expand All @@ -142,6 +186,71 @@ protected void initChannel(Channel ch) throws Exception {
this.scheduler.init(new RpcSchedulerContext(this));
}

/**
* Applies a configuration change: lets the superclass process it first, then re-reads the netty
* outbound watermark settings from the new configuration.
* @param newConf the updated configuration
*/
@Override
public void onConfigurationChange(Configuration newConf) {
super.onConfigurationChange(newConf);
configureNettyWatermarks(newConf);
}

/**
 * Reads the low, high and fatal outbound watermark settings from {@code conf}, stores the result
 * in {@code writeBufferWaterMark} / {@code writeBufferFatalThreshold}, logs when the effective
 * values differ from the previous ones, and pushes the watermark to all existing channels.
 * <p>
 * A low and high value of 0 selects netty's default watermark, which is treated as "disabled";
 * in that case autoRead is switched back on for every existing channel.
 * @param conf configuration to read the watermark settings from
 */
private void configureNettyWatermarks(Configuration conf) {
  final int lowMark =
    conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
  final int highMark =
    conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
  int fatalMark =
    conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);

  // Remember what was in effect before, so a change can be reported below.
  final WriteBufferWaterMark previousWaterMark = writeBufferWaterMark;
  final int previousFatalMark = writeBufferFatalThreshold;

  // Both 0 means: fall back to the netty default and do not manage autoRead on writability
  // changes.
  final boolean disabled = highMark == 0 && lowMark == 0;
  if (disabled) {
    writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
  } else {
    writeBufferWaterMark = new WriteBufferWaterMark(lowMark, highMark);

    // The fatal threshold is only validated against the high watermark while watermarks are
    // enabled, so operators can still use the fatal threshold without backpressure.
    final boolean fatalNotAboveHigh = fatalMark > 0 && fatalMark <= highMark;
    if (fatalNotAboveHigh) {
      LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.",
        CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalMark, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
        highMark);
      fatalMark = 0;
    }
  }

  writeBufferFatalThreshold = fatalMark;

  // No previous watermark means this is the initial setup, which is not logged as an update.
  if (previousWaterMark != null) {
    final boolean changed = previousWaterMark.low() != writeBufferWaterMark.low()
      || previousWaterMark.high() != writeBufferWaterMark.high()
      || previousFatalMark != writeBufferFatalThreshold;
    if (changed) {
      final Object lowForLog = disabled ? "disabled" : writeBufferWaterMark.low();
      final Object highForLog = disabled ? "disabled" : writeBufferWaterMark.high();
      LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}",
        lowForLog, highForLog, writeBufferFatalThreshold);
    }
  }

  // The bootstrap does not exist yet on the first call from the constructor; nothing to update.
  if (bootstrap == null) {
    return;
  }

  for (Channel existing : allChannels) {
    existing.config().setWriteBufferWaterMark(writeBufferWaterMark);
    // When disabling, re-enable reads in case the channel had been exceeding the old watermark.
    if (disabled) {
      existing.config().setAutoRead(true);
    }
  }
}

/**
* Tells whether write buffer watermark handling is enabled, i.e. whether the current watermark is
* something other than netty's {@link WriteBufferWaterMark#DEFAULT} instance.
* <p>
* The comparison is by reference on purpose: configureNettyWatermarks installs that exact
* instance when both the low and high settings are 0, and a newly constructed watermark
* otherwise.
* @return true if a configured (non-default) watermark is in effect
*/
public boolean isWriteBufferWaterMarkEnabled() {
return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
}

private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
if (value != null) {
Expand Down Expand Up @@ -296,4 +405,19 @@ SslContext getSslContext() throws X509Exception, IOException {
}
return result;
}

/**
* Returns the fatal threshold for pending outbound bytes currently in effect, as last computed by
* configureNettyWatermarks. The value is 0 when the setting is left at its default or was ignored
* for not being above the high watermark.
* @return the current fatal threshold
*/
public int getWriteBufferFatalThreshold() {
return writeBufferFatalThreshold;
}

/**
 * Walks all channels of this server and sums up the bytes pending write to their sockets.
 * @return a pair of (total pending bytes over all channels, largest pending bytes on a single
 *         channel); both are 0 when there are no channels
 */
public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
  long pendingSum = 0L;
  long pendingPeak = 0L;
  for (Channel ch : allChannels) {
    final long pending = NettyUnsafeUtils.getTotalPendingOutboundBytes(ch);
    pendingSum += pending;
    if (pending > pendingPeak) {
      pendingPeak = pending;
    }
  }
  return Pair.newPair(pendingSum, pendingPeak);
}
}
Loading