From 6991297a2ec66dfee4fe3bc1b5ae049053d30c38 Mon Sep 17 00:00:00 2001 From: Jirka Kremser Date: Thu, 23 Nov 2017 17:49:55 +0100 Subject: [PATCH 1/2] [SPARK-22594][CORE] Handling spark-submit and master version mismatch --- .../spark/network/client/TransportResponseHandler.java | 2 +- .../java/org/apache/spark/network/protocol/RpcFailure.java | 1 + .../spark/network/server/TransportRequestHandler.java | 6 ++++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 7a3d96ceaef0..a5c5d0317eef 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -199,7 +199,7 @@ public void handle(ResponseMessage message) throws Exception { } else if (message instanceof RpcFailure) { RpcFailure resp = (RpcFailure) message; RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); - if (listener == null) { + if (listener == null && resp.requestId != RpcFailure.EMPTY_REQUEST_ID) { logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding", resp.requestId, getRemoteAddress(channel), resp.errorString); } else { diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java index a76624ef5dc9..4dd645e97f1b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java @@ -24,6 +24,7 @@ public final class RpcFailure extends AbstractMessage implements ResponseMessage { public final long requestId; public final String errorString; + public static final long EMPTY_REQUEST_ID = Long.MIN_VALUE; public RpcFailure(long requestId, String errorString) { this.requestId = requestId; diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index e94453578e6b..b9ac89c5a14d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -17,6 +17,7 @@ package org.apache.spark.network.server; +import java.io.InvalidClassException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -206,6 +207,11 @@ public void onFailure(Throwable e) { private void processOneWayMessage(OneWayMessage req) { try { rpcHandler.receive(reverseClient, req.body().nioByteBuffer()); + } catch (InvalidClassException ice) { + final String msg = "There is probably a version mismatch between client and server: "; + respond(new RpcFailure(RpcFailure.EMPTY_REQUEST_ID, msg + + Throwables.getStackTraceAsString(ice))); + logger.error(msg, ice); } catch (Exception e) { logger.error("Error while invoking RpcHandler#receive() for one-way message.", e); } finally { From 4f79632d22b67128a6be8a285f4fc1fec0d5f12f Mon Sep 17 00:00:00 2001 From: Jirka Kremser Date: Thu, 21 Dec 2017 16:39:40 +0100 Subject: [PATCH 2/2] [SPARK-22594][CORE] Adding tests for RPC communication when called from different versions --- .../spark/network/protocol/RpcFailure.java | 3 +- .../network/TransportRequestHandlerSuite.java | 38 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java index 4dd645e97f1b..65d913e75f5e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java @@ -22,9 +22,10 @@ /** Response to {@link RpcRequest} for a failed RPC. */ public final class RpcFailure extends AbstractMessage implements ResponseMessage { + public static final long EMPTY_REQUEST_ID = Long.MIN_VALUE; + public final long requestId; public final String errorString; - public static final long EMPTY_REQUEST_ID = Long.MIN_VALUE; public RpcFailure(long requestId, String errorString) { this.requestId = requestId; diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java index 2656cbee95a2..14cea9e6083c 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.network; +import java.io.InvalidClassException; import java.util.ArrayList; import java.util.List; @@ -27,6 +28,8 @@ import io.netty.util.concurrent.GenericFutureListener; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -100,6 +103,41 @@ public void handleFetchRequestAndStreamRequest() throws Exception { assert responseAndPromisePairs.size() == 3; } + @Test + public void handleOneWayMessageWithWrongSerialVersionUID() throws Exception { + RpcHandler rpcHandler = new NoOpRpcHandler(); + Channel channel = mock(Channel.class); + List> responseAndPromisePairs = + new ArrayList<>(); + + when(channel.writeAndFlush(any())) + .thenAnswer(invocationOnMock -> { + Object response = invocationOnMock.getArguments()[0]; + ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel); + responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture)); + return channelFuture; + }); + + TransportClient reverseClient = mock(TransportClient.class); + TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient, + rpcHandler, 2L); + + // req.body().nioByteBuffer() is the method that throws the InvalidClassException + // with wrong svUID, so let's mock it + ManagedBuffer body = mock(ManagedBuffer.class); + when(body.nioByteBuffer()).thenThrow(new InvalidClassException("test - wrong version")); + RequestMessage msg = new OneWayMessage(body); + + requestHandler.handle(msg); + + assertEquals(responseAndPromisePairs.size(), 1); + assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof RpcFailure); + assertEquals(((RpcFailure) responseAndPromisePairs.get(0).getLeft()).requestId, + RpcFailure.EMPTY_REQUEST_ID); + + responseAndPromisePairs.get(0).getRight().finish(true); + } + private class ExtendedChannelPromise extends DefaultChannelPromise { private List>> listeners = new ArrayList<>();