Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void handle(ResponseMessage message) throws Exception {
} else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
if (listener == null && resp.requestId != RpcFailure.EMPTY_REQUEST_ID) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

/** Response to {@link RpcRequest} for a failed RPC. */
public final class RpcFailure extends AbstractMessage implements ResponseMessage {
public static final long EMPTY_REQUEST_ID = Long.MIN_VALUE;

public final long requestId;
public final String errorString;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network.server;

import java.io.InvalidClassException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;

Expand Down Expand Up @@ -206,6 +207,11 @@ public void onFailure(Throwable e) {
private void processOneWayMessage(OneWayMessage req) {
try {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (InvalidClassException ice) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that this won't catch all or even most errors resulting from version incompatibility. Spark has never supported or contemplated mis-matching versions internally. I don't think we should try to handle this, because it's fundamentally piecemeal and hacky.

Copy link
Contributor Author

@jkremser jkremser Nov 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I should have picked different name for the PR than Handling spark-submit and master version mismatch. It doesn't try to solve the issue in a complex way that two different version could be able to talk to each other, all it does is saying the user. Hey, you have probably different version than spark master. I agree, it's little bit hacky, on the other hand I see no other option than to catch the InvalidClassException, if the version is not part of the message. Perhaps some initial handshake in which the version is sent would be cleaner.

What about re-throwing the exception. This way it wouldn't change the semantics of the code, but the client would be informed. wdyt?

final String msg = "There is probably a version mismatch between client and server: ";
respond(new RpcFailure(RpcFailure.EMPTY_REQUEST_ID, msg
+ Throwables.getStackTraceAsString(ice)));
logger.error(msg, ice);
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.network;

import java.io.InvalidClassException;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -27,6 +28,8 @@
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -100,6 +103,41 @@ public void handleFetchRequestAndStreamRequest() throws Exception {
assert responseAndPromisePairs.size() == 3;
}

@Test
public void handleOneWayMessageWithWrongSerialVersionUID() throws Exception {
RpcHandler rpcHandler = new NoOpRpcHandler();
Channel channel = mock(Channel.class);
List<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs =
new ArrayList<>();

when(channel.writeAndFlush(any()))
.thenAnswer(invocationOnMock -> {
Object response = invocationOnMock.getArguments()[0];
ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
return channelFuture;
});

TransportClient reverseClient = mock(TransportClient.class);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
rpcHandler, 2L);

// req.body().nioByteBuffer() is the method that throws the InvalidClassException
// with wrong svUID, so let's mock it
ManagedBuffer body = mock(ManagedBuffer.class);
when(body.nioByteBuffer()).thenThrow(new InvalidClassException("test - wrong version"));
RequestMessage msg = new OneWayMessage(body);

requestHandler.handle(msg);

assertEquals(responseAndPromisePairs.size(), 1);
assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof RpcFailure);
assertEquals(((RpcFailure) responseAndPromisePairs.get(0).getLeft()).requestId,
RpcFailure.EMPTY_REQUEST_ID);

responseAndPromisePairs.get(0).getRight().finish(true);
}

private class ExtendedChannelPromise extends DefaultChannelPromise {

private List<GenericFutureListener<Future<Void>>> listeners = new ArrayList<>();
Expand Down