Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -52,7 +54,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -82,10 +84,9 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -132,21 +133,24 @@ private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface>

protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);

@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@RegisterExtension
private static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();

@Parameter(0)
public Class<? extends RpcServer> rpcServerImpl;
private Class<? extends RpcServer> rpcServerImpl;

@Before
protected AbstractTestIPC(Class<? extends RpcServer> rpcServerImpl) {
this.rpcServerImpl = rpcServerImpl;
}

@BeforeEach
public void setUpBeforeTest() {
CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
}

/**
* Ensure we do not HAVE TO HAVE a codec.
*/
@Test
@TestTemplate
public void testNoCodec() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand All @@ -172,7 +176,7 @@ public void testNoCodec() throws IOException, ServiceException {
* unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
* confirm that compression is happening down in the client and server).
*/
@Test
@TestTemplate
public void testCompressCellBlock() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
Expand Down Expand Up @@ -209,7 +213,7 @@ public void testCompressCellBlock() throws IOException, ServiceException {
protected abstract AbstractRpcClient<?>
createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException;

@Test
@TestTemplate
public void testRTEDuringConnectionSetup() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand All @@ -222,7 +226,7 @@ public void testRTEDuringConnectionSetup() throws Exception {
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"), e.toString());
} finally {
rpcServer.stop();
}
Expand All @@ -231,7 +235,7 @@ public void testRTEDuringConnectionSetup() throws Exception {
/**
* Tests that the rpc scheduler is called when requests arrive.
*/
@Test
@TestTemplate
public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
Configuration clientConf = new Configuration(CONF);
RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
Expand All @@ -255,7 +259,7 @@ public void testRpcScheduler() throws IOException, ServiceException, Interrupted
}

/** Tests that the rpc scheduler is called when requests arrive. */
@Test
@TestTemplate
public void testRpcMaxRequestSize() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
Expand All @@ -276,8 +280,8 @@ public void testRpcMaxRequestSize() throws IOException, ServiceException {
fail("RPC should have failed because it exceeds max request size");
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e);
assertTrue(e.toString(),
StringUtils.stringifyException(e).contains("RequestTooBigException"));
assertTrue(StringUtils.stringifyException(e).contains("RequestTooBigException"),
e.toString());
} finally {
rpcServer.stop();
}
Expand All @@ -287,7 +291,7 @@ public void testRpcMaxRequestSize() throws IOException, ServiceException {
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
*/
@Test
@TestTemplate
public void testRpcServerForNotNullRemoteAddressInCallObject()
throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
Expand All @@ -305,7 +309,7 @@ public void testRpcServerForNotNullRemoteAddressInCallObject()
}
}

@Test
@TestTemplate
public void testRemoteError() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand All @@ -314,18 +318,18 @@ public void testRemoteError() throws IOException, ServiceException {
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.error(null, EmptyRequestProto.getDefaultInstance());
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e);
IOException ioe = ProtobufUtil.handleRemoteException(e);
assertTrue(ioe instanceof DoNotRetryIOException);
assertTrue(ioe.getMessage().contains("server error!"));
ServiceException se = assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
LOG.info("Caught expected exception: " + se);
IOException ioe = ProtobufUtil.handleRemoteException(se);
assertThat(ioe, instanceOf(DoNotRetryIOException.class));
assertThat(ioe.getMessage(), containsString("server error!"));
} finally {
rpcServer.stop();
}
}

@Test
@TestTemplate
public void testTimeout() throws IOException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand All @@ -341,23 +345,25 @@ public void testTimeout() throws IOException {
pcrc.reset();
pcrc.setCallTimeout(timeout);
long startTime = System.nanoTime();
try {
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
} catch (ServiceException e) {
long waitTime = (System.nanoTime() - startTime) / 1000000;
// expected
LOG.info("Caught expected exception: " + e);
IOException ioe = ProtobufUtil.handleRemoteException(e);
assertTrue(ioe.getCause() instanceof CallTimeoutException);
// confirm that we got exception before the actual pause.
assertTrue(waitTime < ms);
}
ServiceException se = assertThrows(ServiceException.class,
() -> stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()));
long waitTime = (System.nanoTime() - startTime) / 1000000;
// expected
LOG.info("Caught expected exception: " + se);
IOException ioe = ProtobufUtil.handleRemoteException(se);
assertThat(ioe.getCause(), instanceOf(CallTimeoutException.class));
// confirm that we got exception before the actual pause.
assertThat(waitTime, lessThan((long) ms));
}
} finally {
// wait until all active calls quit, otherwise it may mess up the tracing spans
await().atMost(Duration.ofSeconds(2))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate to bring it up, but would 5 sec be more reliable in our CI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout value is 1 secs, so I think 2 secs is enough?

.untilAsserted(() -> assertEquals(0, rpcServer.getScheduler().getActiveRpcHandlerCount()));
rpcServer.stop();
}
}

@SuppressWarnings("deprecation")
private static class FailingSimpleRpcServer extends SimpleRpcServer {

FailingSimpleRpcServer(Server server, String name,
Expand Down Expand Up @@ -397,27 +403,25 @@ protected RpcServer createTestFailingRpcServer(final String name,
}

/** Tests that the connection closing is handled by the client with outstanding RPC calls */
@Test
@TestTemplate
public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));

try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
stub.echo(null, param);
fail("RPC should have failed because connection closed");
} catch (ServiceException e) {
LOG.info("Caught expected exception: " + e.toString());
ServiceException se = assertThrows(ServiceException.class, () -> stub.echo(null, param),
"RPC should have failed because connection closed");
LOG.info("Caught expected exception: " + se);
} finally {
rpcServer.stop();
}
}

@Test
@TestTemplate
public void testAsyncEcho() throws IOException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand Down Expand Up @@ -448,14 +452,13 @@ public void testAsyncEcho() throws IOException {
}
}

@Test
@TestTemplate
public void testAsyncRemoteError() throws IOException {
Configuration clientConf = new Configuration(CONF);
AbstractRpcClient<?> client = createRpcClient(clientConf);
RpcServer rpcServer = createRpcServer("testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
try {
try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
rpcServer.start();
Interface stub = newStub(client, rpcServer.getListenerAddress());
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
Expand All @@ -465,15 +468,14 @@ public void testAsyncRemoteError() throws IOException {
assertTrue(pcrc.failed());
LOG.info("Caught expected exception: " + pcrc.getFailed());
IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
assertTrue(ioe instanceof DoNotRetryIOException);
assertTrue(ioe.getMessage().contains("server error!"));
assertThat(ioe, instanceOf(DoNotRetryIOException.class));
assertThat(ioe.getMessage(), containsString("server error!"));
} finally {
client.close();
rpcServer.stop();
}
}

@Test
@TestTemplate
public void testAsyncTimeout() throws IOException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand Down Expand Up @@ -503,19 +505,21 @@ public void testAsyncTimeout() throws IOException {
assertTrue(pcrc.failed());
LOG.info("Caught expected exception: " + pcrc.getFailed());
IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
assertTrue(ioe.getCause() instanceof CallTimeoutException);
assertThat(ioe.getCause(), instanceOf(CallTimeoutException.class));
}
// confirm that we got exception before the actual pause.
assertTrue(waitTime < ms);
assertThat(waitTime, lessThan((long) ms));
} finally {
// wait until all active calls quit, otherwise it may mess up the tracing spans
await().atMost(Duration.ofSeconds(2))
.untilAsserted(() -> assertEquals(0, rpcServer.getScheduler().getActiveRpcHandlerCount()));
rpcServer.stop();
}
}

private SpanData waitSpan(Matcher<SpanData> matcher) {
Waiter.waitFor(CONF, 1000,
new MatcherPredicate<>(() -> traceRule.getSpans(), hasItem(matcher)));
return traceRule.getSpans().stream().filter(matcher::matches).findFirst()
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(() -> OTEL_EXT.getSpans(), hasItem(matcher)));
return OTEL_EXT.getSpans().stream().filter(matcher::matches).findFirst()
.orElseThrow(AssertionError::new);
}

Expand Down Expand Up @@ -555,7 +559,7 @@ private void assertRemoteSpan() {
assertEquals(SpanKind.SERVER, data.getKind());
}

@Test
@TestTemplate
public void testTracingSuccessIpc() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand All @@ -576,17 +580,17 @@ public void testTracingSuccessIpc() throws IOException, ServiceException {
assertThat(pauseServerSpan,
buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
assertRemoteSpan();
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
assertThat(traceRule.getSpans(),
assertFalse(OTEL_EXT.getSpans().isEmpty(), "no spans provided");
assertThat(OTEL_EXT.getSpans(),
everyItem(allOf(hasStatusWithCode(StatusCode.OK),
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
hasTraceId(OTEL_EXT.getSpans().iterator().next().getTraceId()),
hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
} finally {
rpcServer.stop();
}
}

@Test
@TestTemplate
public void testTracingErrorIpc() throws IOException {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer",
Expand All @@ -608,9 +612,9 @@ public void testTracingErrorIpc() throws IOException {
assertThat(errorServerSpan,
buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
assertRemoteSpan();
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
assertFalse(OTEL_EXT.getSpans().isEmpty(), "no spans provided");
assertThat(OTEL_EXT.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),
hasTraceId(OTEL_EXT.getSpans().iterator().next().getTraceId()))));
} finally {
rpcServer.stop();
}
Expand All @@ -624,7 +628,7 @@ private IOException doBadPreableHeaderCall(BlockingInterface stub) {
return ProtobufUtil.handleRemoteException(se);
}

@Test
@TestTemplate
public void testBadPreambleHeader() throws Exception {
Configuration clientConf = new Configuration(CONF);
RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
Expand All @@ -643,7 +647,7 @@ public void testBadPreambleHeader() throws Exception {
}
Thread.sleep(100);
}
assertNotNull("Can not get expected BadAuthException", error);
assertNotNull(error, "Can not get expected BadAuthException");
assertThat(error.getMessage(), containsString("authName=unknown"));
} finally {
rpcServer.stop();
Expand All @@ -654,7 +658,7 @@ public void testBadPreambleHeader() throws Exception {
* Testcase for getting connection registry information through connection preamble header, see
* HBASE-25051 for more details.
*/
@Test
@TestTemplate
public void testGetConnectionRegistry() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
String clusterId = "test_cluster_id";
Expand Down Expand Up @@ -684,7 +688,7 @@ public void testGetConnectionRegistry() throws IOException, ServiceException {
* preamble header, i.e, a new client connecting to an old server. We simulate this by using a
* Server without implementing the ConnectionRegistryEndpoint interface.
*/
@Test
@TestTemplate
public void testGetConnectionRegistryError() throws IOException, ServiceException {
Configuration clientConf = new Configuration(CONF);
// do not need any services
Expand Down
Loading