@@ -50,7 +50,7 @@ public NettyRpcFrameDecoder(int maxFrameLength) {
     this.maxFrameLength = maxFrameLength;
   }
 
-  private NettyServerRpcConnection connection;
+  NettyServerRpcConnection connection;
 
   void setConnection(NettyServerRpcConnection connection) {
     this.connection = connection;
@@ -88,8 +88,7 @@ public class NettyRpcServer extends RpcServer {
 
   private final CountDownLatch closed = new CountDownLatch(1);
   private final Channel serverChannel;
-  private final ChannelGroup allChannels =
-    new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
+  final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
  private final ByteBufAllocator channelAllocator;
 
  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
@@ -49,6 +49,11 @@ class NettyServerRpcConnection extends ServerRpcConnection {
   NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
     super(rpcServer);
     this.channel = channel;
+    // register close hook to release resources
+    channel.closeFuture().addListener(f -> {
+      disposeSasl();
+      callCleanupIfNeeded();
+    });
     InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
     this.addr = inetSocketAddress.getAddress();
     if (addr == null) {
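The close hook above relies on Netty's closeFuture() semantics: the future completes exactly once, whether the server closes the channel itself or the peer drops the connection, so the listener is a single place to release per-connection resources. A minimal standalone sketch of that behaviour (an assumption for illustration, not part of this PR), using stock io.netty imports rather than HBase's shaded ones and an EmbeddedChannel as a stand-in for a real server-side channel; it only needs netty-transport on the classpath:

import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;

public class CloseHookSketch {
  public static void main(String[] args) {
    Channel channel = new EmbeddedChannel();
    // The listener fires once the channel is fully closed, regardless of which
    // side initiated the close; this is where the PR releases SASL state and
    // pending call resources.
    channel.closeFuture().addListener(f -> System.out.println("connection resources released"));
    channel.close();
  }
}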
@@ -101,9 +106,7 @@ void process(ByteBuff buf) throws IOException, InterruptedException {
 
   @Override
   public synchronized void close() {
-    disposeSasl();
     channel.close();
-    callCleanupIfNeeded();
Comment on lines -104 to -106

Contributor:

Hmm, not calling this is also a problem, but that's fine, we can track it later.

Contributor (Author):

We register a close hook, so closing the channel will trigger the hook and call these two methods.

Contributor:

No, I didn't mean the calls to disposeSasl() and callCleanupIfNeeded(). I meant that this whole NettyServerRpcConnection#close not getting called is a problem.

Having the channel close hook dispose the sasl state is the right thing to do, but I think we can simplify this a bit, and maybe remove close() from NettyServerRpcConnection or do some better refactoring, so that first-time readers don't get confused and add more closable stuff in NettyServerRpcConnection#close.
   }
 
   @Override
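For context, a rough sketch of the simplification the reviewer suggests above; it is hypothetical and not part of this change. The idea is to keep all per-connection cleanup in the channel close hook and drop the close() override from NettyServerRpcConnection entirely, so callers only ever close the Netty channel. The class and method names come from the diff above; whether close() can really be removed depends on how other ServerRpcConnection callers use it.

// Hypothetical follow-up sketch, not part of this PR: rely solely on the close hook.
class NettyServerRpcConnection extends ServerRpcConnection {

  final Channel channel;

  NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
    super(rpcServer);
    this.channel = channel;
    // single place where per-connection resources are released, for both
    // local channel.close() calls and remote disconnects
    channel.closeFuture().addListener(f -> {
      disposeSasl();
      callCleanupIfNeeded();
    });
  }

  // no close() override here: code that used to call connection.close()
  // would call channel.close() instead and let the hook above run
  // (other members such as doRespond() and the read path are omitted)
}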
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;

/**
 * Confirm that we truly close the NettyRpcConnection when the netty channel is closed.
 */
@Category({ RPCTests.class, MediumTests.class })
public class TestNettyIPCCloseConnection {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestNettyIPCCloseConnection.class);

  private static Configuration CONF = HBaseConfiguration.create();

  private NioEventLoopGroup group;

  private NettyRpcServer server;

  private NettyRpcClient client;

  private TestProtobufRpcProto.BlockingInterface stub;

  @Before
  public void setUp() throws IOException {
    group = new NioEventLoopGroup();
    server = new NettyRpcServer(null, getClass().getSimpleName(),
      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1), true);
    NettyRpcClientConfigHelper.setEventLoopConfig(CONF, group, NioSocketChannel.class);
    client = new NettyRpcClient(CONF);
    server.start();
    stub = TestProtobufRpcServiceImpl.newBlockingStub(client, server.getListenerAddress());
  }

  @After
  public void tearDown() throws Exception {
    Closeables.close(client, true);
    server.stop();
    group.shutdownGracefully().sync();
  }

  @Test
  public void test() throws Exception {
    assertEquals("test",
      stub.echo(null, EchoRequestProto.newBuilder().setMessage("test").build()).getMessage());
    Channel channel = Iterators.getOnlyElement(server.allChannels.iterator());
    assertNotNull(channel);
    NettyRpcFrameDecoder decoder = channel.pipeline().get(NettyRpcFrameDecoder.class);
    // set a mock saslServer to verify that it will call the dispose method of this instance
    HBaseSaslRpcServer saslServer = mock(HBaseSaslRpcServer.class);
    decoder.connection.saslServer = saslServer;
    client.close();
    // the channel should have been closed
    channel.closeFuture().await(5, TimeUnit.SECONDS);
    // verify that we have called the dispose method and set saslServer to null
    Waiter.waitFor(CONF, 5000, () -> decoder.connection.saslServer == null);
    verify(saslServer, times(1)).dispose();
  }
}