Skip to content

Commit 1596842

Browse files
committed
HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation
1 parent 7d0a687 commit 1596842

4 files changed

Lines changed: 39 additions & 97 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 10 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
2323

2424
import java.io.IOException;
25-
import java.net.InetSocketAddress;
2625
import java.net.SocketAddress;
27-
import java.net.UnknownHostException;
2826
import java.util.Collection;
2927
import java.util.concurrent.Executors;
3028
import java.util.concurrent.ScheduledExecutorService;
@@ -320,7 +318,7 @@ private int nextCallId() {
320318
* @return A pair with the Message response and the Cell data (if any).
321319
*/
322320
private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
323-
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
321+
Message param, Message returnType, final User ticket, final Address isa)
324322
throws ServiceException {
325323
BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
326324
callMethod(md, hrc, param, returnType, ticket, isa, done);
@@ -392,7 +390,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
392390

393391
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
394392
final Message param, Message returnType, final User ticket,
395-
final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
393+
final Address addr, final RpcCallback<Message> callback) {
396394
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
397395
cs.setStartTime(EnvironmentEdgeManager.currentTime());
398396

@@ -406,7 +404,6 @@ Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController
406404
cs.setNumActionsPerServer(numActions);
407405
}
408406

409-
final Address addr = Address.fromSocketAddress(inetAddr);
410407
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
411408
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
412409
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@@ -520,13 +517,6 @@ private static class AbstractRpcChannel {
520517

521518
protected final Address addr;
522519

523-
// We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
524-
// per method call on the channel. If the remote target is removed or reprovisioned and
525-
// its identity changes a new channel with a newly resolved InetSocketAddress will be
526-
// created as part of retry, so caching here is fine.
527-
// Normally, caching an InetSocketAddress is an anti-pattern.
528-
protected InetSocketAddress isa;
529-
530520
protected final AbstractRpcClient<?> rpcClient;
531521

532522
protected final User ticket;
@@ -576,23 +566,9 @@ protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
576566

577567
@Override
578568
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
579-
Message param, Message returnType) throws ServiceException {
580-
// Look up remote address upon first call
581-
if (isa == null) {
582-
if (this.rpcClient.metrics != null) {
583-
this.rpcClient.metrics.incrNsLookups();
584-
}
585-
isa = Address.toSocketAddress(addr);
586-
if (isa.isUnresolved()) {
587-
if (this.rpcClient.metrics != null) {
588-
this.rpcClient.metrics.incrNsLookupsFailed();
589-
}
590-
isa = null;
591-
throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
592-
}
593-
}
594-
return rpcClient.callBlockingMethod(md, configureRpcController(controller),
595-
param, returnType, ticket, isa);
569+
Message param, Message returnType) throws ServiceException {
570+
return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
571+
ticket, addr);
596572
}
597573
}
598574

@@ -608,29 +584,13 @@ protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
608584
}
609585

610586
@Override
611-
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
612-
Message param, Message returnType, RpcCallback<Message> done) {
613-
HBaseRpcController configuredController =
614-
configureRpcController(Preconditions.checkNotNull(controller,
615-
"RpcController can not be null for async rpc call"));
616-
// Look up remote address upon first call
617-
if (isa == null || isa.isUnresolved()) {
618-
if (this.rpcClient.metrics != null) {
619-
this.rpcClient.metrics.incrNsLookups();
620-
}
621-
isa = Address.toSocketAddress(addr);
622-
if (isa.isUnresolved()) {
623-
if (this.rpcClient.metrics != null) {
624-
this.rpcClient.metrics.incrNsLookupsFailed();
625-
}
626-
isa = null;
627-
controller.setFailed(addr + " could not be resolved");
628-
return;
629-
}
630-
}
587+
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
588+
Message returnType, RpcCallback<Message> done) {
589+
HBaseRpcController configuredController = configureRpcController(
590+
Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
631591
// This method does not throw any exceptions, so the caller must provide a
632592
// HBaseRpcController which is used to pass the exceptions.
633-
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
593+
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
634594
}
635595
}
636596
}

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.net.InetSocketAddress;
3636
import java.net.Socket;
3737
import java.net.SocketTimeoutException;
38-
import java.net.UnknownHostException;
3938
import java.security.PrivilegedExceptionAction;
4039
import java.util.ArrayDeque;
4140
import java.util.Locale;
@@ -44,15 +43,13 @@
4443
import java.util.concurrent.ConcurrentMap;
4544
import java.util.concurrent.ThreadLocalRandom;
4645
import javax.security.sasl.SaslException;
47-
4846
import org.apache.hadoop.conf.Configuration;
4947
import org.apache.hadoop.hbase.CellScanner;
5048
import org.apache.hadoop.hbase.DoNotRetryIOException;
5149
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
5250
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
5351
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
5452
import org.apache.hadoop.hbase.log.HBaseMarkers;
55-
import org.apache.hadoop.hbase.net.Address;
5653
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
5754
import org.apache.hadoop.hbase.security.SaslUtil;
5855
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -69,11 +66,13 @@
6966
import org.apache.yetus.audience.InterfaceAudience;
7067
import org.slf4j.Logger;
7168
import org.slf4j.LoggerFactory;
69+
7270
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
7371
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
7472
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
7573
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
7674
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
75+
7776
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
7877
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
7978
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -256,16 +255,7 @@ protected void setupConnection() throws IOException {
256255
if (this.rpcClient.localAddr != null) {
257256
this.socket.bind(this.rpcClient.localAddr);
258257
}
259-
if (this.rpcClient.metrics != null) {
260-
this.rpcClient.metrics.incrNsLookups();
261-
}
262-
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
263-
if (remoteAddr.isUnresolved()) {
264-
if (this.rpcClient.metrics != null) {
265-
this.rpcClient.metrics.incrNsLookupsFailed();
266-
}
267-
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
268-
}
258+
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
269259
NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
270260
this.socket.setSoTimeout(this.rpcClient.readTO);
271261
return;
@@ -374,15 +364,8 @@ private boolean setupSaslConnection(final InputStream in2, final OutputStream ou
374364
if (this.metrics != null) {
375365
this.metrics.incrNsLookups();
376366
}
377-
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
378-
if (serverAddr.isUnresolved()) {
379-
if (this.metrics != null) {
380-
this.metrics.incrNsLookupsFailed();
381-
}
382-
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
383-
}
384367
saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
385-
serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
368+
socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
386369
this.rpcClient.conf.get("hbase.rpc.protection",
387370
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
388371
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,16 @@
3232
import java.util.concurrent.TimeUnit;
3333
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
3434
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
35-
import org.apache.hadoop.hbase.net.Address;
3635
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
3736
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
3837
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
3938
import org.apache.hadoop.hbase.util.Threads;
4039
import org.apache.hadoop.security.UserGroupInformation;
41-
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4240
import org.apache.yetus.audience.InterfaceAudience;
4341
import org.slf4j.Logger;
4442
import org.slf4j.LoggerFactory;
4543

44+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4645
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
4746
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
4847
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -210,18 +209,9 @@ private void saslNegotiate(final Channel ch) {
210209
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
211210
final NettyHBaseSaslRpcClientHandler saslHandler;
212211
try {
213-
if (this.metrics != null) {
214-
this.metrics.incrNsLookups();
215-
}
216-
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
217-
if (serverAddr.isUnresolved()) {
218-
if (this.metrics != null) {
219-
this.metrics.incrNsLookupsFailed();
220-
}
221-
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
222-
}
223212
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
224-
serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
213+
((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
214+
rpcClient.fallbackAllowed, this.rpcClient.conf);
225215
} catch (IOException e) {
226216
failInit(ch, e);
227217
return;
@@ -282,16 +272,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
282272
private void connect() throws UnknownHostException {
283273
assert eventLoop.inEventLoop();
284274
LOG.trace("Connecting to {}", remoteId.getAddress());
285-
if (this.rpcClient.metrics != null) {
286-
this.rpcClient.metrics.incrNsLookups();
287-
}
288-
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
289-
if (remoteAddr.isUnresolved()) {
290-
if (this.rpcClient.metrics != null) {
291-
this.rpcClient.metrics.incrNsLookupsFailed();
292-
}
293-
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
294-
}
275+
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
295276
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
296277
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
297278
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import java.io.IOException;
21+
import java.net.InetSocketAddress;
22+
import java.net.UnknownHostException;
2123
import java.util.concurrent.TimeUnit;
2224

2325
import org.apache.hadoop.conf.Configuration;
2426
import org.apache.hadoop.hbase.HConstants;
2527
import org.apache.hadoop.hbase.client.MetricsConnection;
2628
import org.apache.hadoop.hbase.codec.Codec;
29+
import org.apache.hadoop.hbase.net.Address;
2730
import org.apache.hadoop.hbase.security.SecurityInfo;
2831
import org.apache.hadoop.hbase.security.User;
2932
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
@@ -122,7 +125,7 @@ protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, Conne
122125
this.remoteId = remoteId;
123126
}
124127

125-
protected void scheduleTimeoutTask(final Call call) {
128+
protected final void scheduleTimeoutTask(final Call call) {
126129
if (call.timeout > 0) {
127130
call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
128131

@@ -137,7 +140,7 @@ public void run(Timeout timeout) throws Exception {
137140
}
138141
}
139142

140-
protected byte[] getConnectionHeaderPreamble() {
143+
protected final byte[] getConnectionHeaderPreamble() {
141144
// Assemble the preamble up in a buffer first and then send it. Writing individual elements,
142145
// they are getting sent across piecemeal according to wireshark and then server is messing
143146
// up the reading on occasion (the passed in stream is not buffered yet).
@@ -153,7 +156,7 @@ protected byte[] getConnectionHeaderPreamble() {
153156
return preamble;
154157
}
155158

156-
protected ConnectionHeader getConnectionHeader() {
159+
protected final ConnectionHeader getConnectionHeader() {
157160
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
158161
builder.setServiceName(remoteId.getServiceName());
159162
final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
@@ -176,6 +179,21 @@ protected ConnectionHeader getConnectionHeader() {
176179
return builder.build();
177180
}
178181

182+
protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
183+
throws UnknownHostException {
184+
if (metrics != null) {
185+
metrics.incrNsLookups();
186+
}
187+
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
188+
if (remoteAddr.isUnresolved()) {
189+
if (metrics != null) {
190+
metrics.incrNsLookupsFailed();
191+
}
192+
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
193+
}
194+
return remoteAddr;
195+
}
196+
179197
protected abstract void callTimeout(Call call);
180198

181199
public ConnectionId remoteId() {

0 commit comments

Comments
 (0)