Skip to content

Commit 1069753

Browse files
committed
HBASE-28312 The bad auth exception can not be passed to client rpc calls properly (#5629)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org> (cherry picked from commit 6017937)
1 parent 13bbb2b commit 1069753

14 files changed

Lines changed: 251 additions & 50 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
21-
import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
2221
import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
23-
import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
2422
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
2523
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
2624

@@ -69,6 +67,7 @@
6967

7068
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
7169
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
70+
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
7271
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
7372
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
7473

@@ -704,6 +703,25 @@ private void readResponse() {
704703
// Read the header
705704
ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
706705
int id = responseHeader.getCallId();
706+
if (LOG.isTraceEnabled()) {
707+
LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
708+
+ ", totalSize: " + totalSize + " bytes");
709+
}
710+
RemoteException remoteExc;
711+
if (responseHeader.hasException()) {
712+
ExceptionResponse exceptionResponse = responseHeader.getException();
713+
remoteExc = IPCUtil.createRemoteException(exceptionResponse);
714+
if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
715+
// Here we will cleanup all calls so do not need to fall back, just return.
716+
synchronized (this) {
717+
closeConn(remoteExc);
718+
}
719+
return;
720+
}
721+
} else {
722+
remoteExc = null;
723+
}
724+
707725
call = calls.remove(id); // call.done have to be set before leaving this method
708726
expectedCall = (call != null && !call.isDone());
709727
if (!expectedCall) {
@@ -714,46 +732,34 @@ private void readResponse() {
714732
// this connection.
715733
int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
716734
int whatIsLeftToRead = totalSize - readSoFar;
735+
LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
736+
+ " bytes");
717737
IOUtils.skipFully(in, whatIsLeftToRead);
718738
if (call != null) {
719739
call.callStats.setResponseSizeBytes(totalSize);
720-
call.callStats
721-
.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
722740
}
723741
return;
724742
}
725-
if (responseHeader.hasException()) {
726-
ExceptionResponse exceptionResponse = responseHeader.getException();
727-
RemoteException re = createRemoteException(exceptionResponse);
728-
call.setException(re);
729-
call.callStats.setResponseSizeBytes(totalSize);
730-
call.callStats
731-
.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
732-
if (isFatalConnectionException(exceptionResponse)) {
733-
synchronized (this) {
734-
closeConn(re);
735-
}
736-
}
737-
} else {
738-
Message value = null;
739-
if (call.responseDefaultType != null) {
740-
Message.Builder builder = call.responseDefaultType.newBuilderForType();
741-
ProtobufUtil.mergeDelimitedFrom(builder, in);
742-
value = builder.build();
743-
}
744-
CellScanner cellBlockScanner = null;
745-
if (responseHeader.hasCellBlockMeta()) {
746-
int size = responseHeader.getCellBlockMeta().getLength();
747-
byte[] cellBlock = new byte[size];
748-
IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
749-
cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
750-
this.compressor, cellBlock);
751-
}
752-
call.setResponse(value, cellBlockScanner);
753-
call.callStats.setResponseSizeBytes(totalSize);
754-
call.callStats
755-
.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
743+
call.callStats.setResponseSizeBytes(totalSize);
744+
if (remoteExc != null) {
745+
call.setException(remoteExc);
746+
return;
747+
}
748+
Message value = null;
749+
if (call.responseDefaultType != null) {
750+
Message.Builder builder = call.responseDefaultType.newBuilderForType();
751+
ProtobufUtil.mergeDelimitedFrom(builder, in);
752+
value = builder.build();
753+
}
754+
CellScanner cellBlockScanner = null;
755+
if (responseHeader.hasCellBlockMeta()) {
756+
int size = responseHeader.getCellBlockMeta().getLength();
757+
byte[] cellBlock = new byte[size];
758+
IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
759+
cellBlockScanner =
760+
this.rpcClient.cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
756761
}
762+
call.setResponse(value, cellBlockScanner);
757763
} catch (IOException e) {
758764
if (expectedCall) {
759765
call.setException(e);

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4242
import org.apache.hadoop.ipc.RemoteException;
4343
import org.apache.yetus.audience.InterfaceAudience;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
4446

4547
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
4648
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
@@ -62,6 +64,8 @@
6264
@InterfaceAudience.Private
6365
class IPCUtil {
6466

67+
private static final Logger LOG = LoggerFactory.getLogger(IPCUtil.class);
68+
6569
/**
6670
* Write out header, param, and cell block if there is one.
6771
* @param dos Stream to write into
@@ -159,8 +163,19 @@ static RemoteException createRemoteException(final ExceptionResponse e) {
159163
}
160164

161165
/** Returns True if the exception is a fatal connection exception. */
162-
static boolean isFatalConnectionException(final ExceptionResponse e) {
163-
return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
166+
static boolean isFatalConnectionException(ExceptionResponse e) {
167+
if (e.getExceptionClassName().equals(FatalConnectionException.class.getName())) {
168+
return true;
169+
}
170+
// try our best to check for sub classes of FatalConnectionException
171+
try {
172+
return e.getExceptionClassName() != null && FatalConnectionException.class.isAssignableFrom(
173+
Class.forName(e.getExceptionClassName(), false, IPCUtil.class.getClassLoader()));
174+
// Class.forName may throw ExceptionInInitializerError so we have to catch Throwable here
175+
} catch (Throwable t) {
176+
LOG.debug("Can not get class object for {}", e.getExceptionClassName(), t);
177+
return false;
178+
}
164179
}
165180

166181
static IOException toIOE(Throwable t) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ public void run(Timeout timeout) throws Exception {
145145
}
146146
}
147147

148-
protected final byte[] getConnectionHeaderPreamble() {
148+
// will be overridden in tests
149+
protected byte[] getConnectionHeaderPreamble() {
149150
// Assemble the preamble up in a buffer first and then send it. Writing individual elements,
150151
// they are getting sent across piecemeal according to wireshark and then server is messing
151152
// up the reading on occasion (the passed in stream is not buffered yet).
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
/**
21+
* Just a dummy exception for testing IPCUtil.isFatalConnectionException.
22+
*/
23+
public class DummyException extends Exception {
24+
25+
private static final long serialVersionUID = 215191975455115118L;
26+
27+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
/**
21+
* Just a dummy exception for testing IPCUtil.isFatalConnectionException.
22+
*/
23+
public class DummyFatalConnectionException extends FatalConnectionException {
24+
25+
private static final long serialVersionUID = -1966815615846798490L;
26+
27+
}

hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.hamcrest.CoreMatchers.instanceOf;
2121
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertTrue;
2324

2425
import java.io.IOException;
@@ -44,6 +45,8 @@
4445
import org.apache.hbase.thirdparty.io.netty.channel.DefaultEventLoop;
4546
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
4647

48+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
49+
4750
@Category({ ClientTests.class, SmallTests.class })
4851
public class TestIPCUtil {
4952

@@ -159,4 +162,23 @@ public void run() {
159162
eventLoop.shutdownGracefully().get();
160163
}
161164
}
165+
166+
@Test
167+
public void testIsFatalConnectionException() {
168+
// intentionally not reference the class object directly, so here we will not load the class, to
169+
// make sure that in isFatalConnectionException, we can use initialized = false when calling
170+
// Class.forName
171+
ExceptionResponse resp = ExceptionResponse.newBuilder()
172+
.setExceptionClassName("org.apache.hadoop.hbase.ipc.DummyFatalConnectionException").build();
173+
assertTrue(IPCUtil.isFatalConnectionException(resp));
174+
175+
resp = ExceptionResponse.newBuilder()
176+
.setExceptionClassName("org.apache.hadoop.hbase.ipc.DummyException").build();
177+
assertFalse(IPCUtil.isFatalConnectionException(resp));
178+
179+
// class not found
180+
resp = ExceptionResponse.newBuilder()
181+
.setExceptionClassName("org.apache.hadoop.hbase.ipc.WhatEver").build();
182+
assertFalse(IPCUtil.isFatalConnectionException(resp));
183+
}
162184
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler<ByteBuf>
3838

3939
private final NettyRpcServer rpcServer;
4040
private final NettyServerRpcConnection conn;
41+
private boolean processPreambleError;
4142

4243
public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) {
4344
this.rpcServer = rpcServer;
@@ -46,10 +47,18 @@ public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer, NettyServerRpcCon
4647

4748
@Override
4849
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
50+
if (processPreambleError) {
51+
// if we failed to process preamble, we will close the connection immediately, but it is
52+
// possible that we have already received some bytes after the 'preamble' so when closing, the
53+
// netty framework will still pass them here. So we set a flag here to just skip processing
54+
// these broken messages.
55+
return;
56+
}
4957
ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
5058
msg.readBytes(buf);
5159
buf.flip();
5260
if (!conn.processPreamble(buf)) {
61+
processPreambleError = true;
5362
conn.close();
5463
return;
5564
}

hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
3030
import static org.hamcrest.MatcherAssert.assertThat;
3131
import static org.hamcrest.Matchers.allOf;
32+
import static org.hamcrest.Matchers.containsString;
3233
import static org.hamcrest.Matchers.everyItem;
3334
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3435
import static org.hamcrest.Matchers.hasItem;
36+
import static org.hamcrest.Matchers.instanceOf;
3537
import static org.junit.Assert.assertEquals;
3638
import static org.junit.Assert.assertFalse;
3739
import static org.junit.Assert.assertNotNull;
@@ -53,6 +55,7 @@
5355
import java.net.InetSocketAddress;
5456
import java.time.Duration;
5557
import java.util.ArrayList;
58+
import java.util.Collections;
5659
import java.util.List;
5760
import org.apache.hadoop.conf.Configuration;
5861
import org.apache.hadoop.hbase.Cell;
@@ -545,4 +548,24 @@ public void testTracingErrorIpc() throws IOException {
545548
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
546549
}
547550
}
551+
552+
protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);
553+
554+
@Test
555+
public void testBadPreambleHeader() throws IOException, ServiceException {
556+
Configuration clientConf = new Configuration(CONF);
557+
RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
558+
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
559+
try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
560+
rpcServer.start();
561+
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
562+
ServiceException se = assertThrows(ServiceException.class,
563+
() -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
564+
IOException ioe = ProtobufUtil.handleRemoteException(se);
565+
assertThat(ioe, instanceOf(BadAuthException.class));
566+
assertThat(ioe.getMessage(), containsString("authName=unknown"));
567+
} finally {
568+
rpcServer.stop();
569+
}
570+
}
548571
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.io.IOException;
21+
22+
public class BadAuthNettyRpcConnection extends NettyRpcConnection {
23+
24+
public BadAuthNettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId)
25+
throws IOException {
26+
super(rpcClient, remoteId);
27+
}
28+
29+
@Override
30+
protected byte[] getConnectionHeaderPreamble() {
31+
byte[] header = super.getConnectionHeaderPreamble();
32+
// set an invalid auth code
33+
header[header.length - 1] = -10;
34+
return header;
35+
}
36+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,24 @@ protected RpcServer createTestFailingRpcServer(String name,
107107
Configuration conf, RpcScheduler scheduler) throws IOException {
108108
return new TestFailingRpcServer(null, name, services, bindAddress, conf, scheduler);
109109
}
110+
111+
@Override
112+
protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
113+
return new BlockingRpcClient(conf) {
114+
115+
@Override
116+
protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
117+
return new BlockingRpcConnection(this, remoteId) {
118+
@Override
119+
protected byte[] getConnectionHeaderPreamble() {
120+
byte[] header = super.getConnectionHeaderPreamble();
121+
// set an invalid auth code
122+
header[header.length - 1] = -10;
123+
return header;
124+
}
125+
};
126+
}
127+
128+
};
129+
}
110130
}

0 commit comments

Comments
 (0)