Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface AlignmentContext {
void updateResponseState(RpcResponseHeaderProto.Builder header);

/**
* This is the intended client method call to implement to recieve state info
* This is the intended client method call to implement to receive state info
* during RPC response processing.
*
* @param header The RPC response header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ public static class Call implements Schedulable,
private volatile String detailedMetricsName = "";
final int callId; // the client's call id
final int retryCount; // the retry count of the call
long timestampNanos; // time the call was received
private final long timestampNanos; // time the call was received
long responseTimestampNanos; // time the call was served
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
Expand Down Expand Up @@ -1107,6 +1107,10 @@ public void setDeferredResponse(Writable response) {

public void setDeferredError(Throwable t) {
}

public long getTimestampNanos() {
return timestampNanos;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1188,7 +1192,7 @@ public Void run() throws Exception {

try {
value = call(
rpcKind, connection.protocolName, rpcRequest, timestampNanos);
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
populateResponseParamsOnError(e, responseParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
optional int64 stateId = 8; // The last seen Global State ID
// Alignment context info for use with routers.
// The client should not interpret these bytes, but only forward bytes
// received from RpcResponseHeaderProto.routerFederatedState.
optional bytes routerFederatedState = 9;
}


Expand Down Expand Up @@ -157,6 +161,10 @@ message RpcResponseHeaderProto {
optional bytes clientId = 7; // Globally unique client ID
optional sint32 retryCount = 8 [default = -1];
optional int64 stateId = 9; // The last written Global State ID
// Alignment context info for use with routers.
// The client should not interpret these bytes, but only
// forward them to the router using RpcRequestHeaderProto.routerFederatedState.
optional bytes routerFederatedState = 10;
}

message RpcSaslProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,16 @@ message GetDisabledNameservicesRequestProto {
message GetDisabledNameservicesResponseProto {
repeated string nameServiceIds = 1;
}

/////////////////////////////////////////////////
// Alignment state for namespaces.
/////////////////////////////////////////////////

/**
* Clients should receive this message in RPC responses and forward it
* in RPC requests without interpreting it. It should be encoded
* as an obscure byte array when being sent to clients.
*/
message RouterFederatedStateProto {
map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.util.ProtoUtil;
import org.junit.Test;

import static org.junit.Assert.*;


public class TestRouterFederatedState {

@Test
public void testRpcRouterFederatedState() throws InvalidProtocolBufferException {
byte[] uuid = ClientId.getClientId();
Map<String, Long> expectedStateIds = new HashMap<String, Long>() {{
put("namespace1", 11L );
put("namespace2", 22L);
}};

AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds);

RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0,
RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext);

Map<String, Long> stateIdsFromHeader =
RouterFederatedStateProto.parseFrom(
header.getRouterFederatedState().toByteArray()
).getNamespaceStateIdsMap();

assertEquals(expectedStateIds, stateIdsFromHeader);
}

private static class AlignmentContextWithRouterState implements AlignmentContext {

Map<String, Long> routerFederatedState;

public AlignmentContextWithRouterState(Map<String, Long> namespaceStates) {
this.routerFederatedState = namespaceStates;
}

@Override
public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) {
RouterFederatedStateProto fedState = RouterFederatedStateProto
.newBuilder()
.putAllNamespaceStateIds(routerFederatedState)
.build();

header.setRouterFederatedState(fedState.toByteString());
}

@Override
public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) {}

@Override
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {}

@Override
public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException {
return 0;
}

@Override
public long getLastSeenStateId() {
return 0;
}

@Override
public boolean isCoordinatedCall(String protocolName, String method) {
return false;
}
}
}