Skip to content

Commit e63c710

Browse files
committed
HDFS-17514: RBF: Routers should unset cached stateID when namenode does not set stateID in RPC response header. (apache#6804)
(Cherry-picked from 6a4f0be)
1 parent e3d76d0 commit e63c710

3 files changed

Lines changed: 97 additions & 5 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder h
6464
*/
6565
@Override
6666
public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {
67-
sharedGlobalStateId.accumulate(header.getStateId());
67+
if (header.getStateId() == 0 && sharedGlobalStateId.get() > 0) {
68+
sharedGlobalStateId.reset();
69+
poolLocalStateId.reset();
70+
} else {
71+
sharedGlobalStateId.accumulate(header.getStateId());
72+
}
6873
}
6974

7075
/**

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,7 @@ public void teardown() throws IOException {
9797
public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception {
9898
int numberOfNamenode = 2 + numberOfObserver;
9999
Configuration conf = new Configuration(false);
100-
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
101-
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
102-
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
103-
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
100+
setConfDefaults(conf);
104101
if (confOverrides != null) {
105102
confOverrides
106103
.iterator()
@@ -143,6 +140,13 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th
143140
routerContext = cluster.getRandomRouter();
144141
}
145142

143+
private void setConfDefaults(Configuration conf) {
144+
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
145+
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
146+
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
147+
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
148+
}
149+
146150
public enum ConfigSetting {
147151
USE_NAMENODE_PROXY_FLAG,
148152
USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER
@@ -912,4 +916,55 @@ public void testMsyncWithNoNamespacesEligibleForCRS(ConfigSetting configSetting)
912916
// There should be no calls to any namespace.
913917
assertEquals("No calls to any namespace", 0, rpcCountForActive);
914918
}
919+
920+
@EnumSource(ConfigSetting.class)
921+
@ParameterizedTest
922+
public void testRestartingNamenodeWithStateIDContextDisabled(ConfigSetting configSetting)
923+
throws Exception {
924+
fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting));
925+
Path path = new Path("/testFile1");
926+
// Send Create call to active
927+
fileSystem.create(path).close();
928+
929+
// Send read request
930+
fileSystem.open(path).close();
931+
932+
long observerCount1 = routerContext.getRouter().getRpcServer()
933+
.getRPCMetrics().getObserverProxyOps();
934+
935+
// Restart active namenodes and disable sending state id.
936+
restartActiveWithStateIDContextDisabled();
937+
938+
Configuration conf = getConfToEnableObserverReads(configSetting);
939+
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
940+
FileSystem fileSystem2 = routerContext.getFileSystem(conf);
941+
fileSystem2.msync();
942+
fileSystem2.open(path).close();
943+
944+
long observerCount2 = routerContext.getRouter().getRpcServer()
945+
.getRPCMetrics().getObserverProxyOps();
946+
assertEquals("There should no extra calls to the observer", observerCount1, observerCount2);
947+
948+
fileSystem.open(path).close();
949+
long observerCount3 = routerContext.getRouter().getRpcServer()
950+
.getRPCMetrics().getObserverProxyOps();
951+
assertTrue("Old filesystem will send calls to observer", observerCount3 > observerCount2);
952+
}
953+
954+
void restartActiveWithStateIDContextDisabled() throws Exception {
955+
for (int nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) {
956+
NameNode nameNode = cluster.getCluster().getNameNode(nnIndex);
957+
if (nameNode != null && nameNode.isActiveState()) {
958+
Configuration conf = new Configuration();
959+
setConfDefaults(conf);
960+
cluster.getCluster().getConfiguration(nnIndex)
961+
.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, false);
962+
cluster.getCluster().restartNameNode(nnIndex, true);
963+
cluster.getCluster().getNameNode(nnIndex).isActiveState();
964+
}
965+
}
966+
for (String ns : cluster.getNameservices()) {
967+
cluster.switchToActive(ns, NAMENODES[0]);
968+
}
969+
}
915970
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestPoolAlignmentContext.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hdfs.server.federation.router;
1919

2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
2122
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
2223
import org.junit.jupiter.api.Assertions;
2324
import org.junit.jupiter.api.Test;
@@ -50,4 +51,35 @@ private void assertRequestHeaderStateId(PoolAlignmentContext poolAlignmentContex
5051
poolAlignmentContext.updateRequestState(builder);
5152
Assertions.assertEquals(expectedValue, builder.getStateId());
5253
}
54+
55+
@Test
56+
public void testWhenNamenodeStopsSendingStateId() {
57+
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
58+
String namespaceId = "namespace1";
59+
PoolAlignmentContext poolContext = new PoolAlignmentContext(routerStateIdContext, namespaceId);
60+
61+
poolContext.receiveResponseState(getRpcResponseHeader(10L));
62+
// Last seen value is the one from namenode,
63+
// but request header is the max seen by clients so far.
64+
Assertions.assertEquals(10L, poolContext.getLastSeenStateId());
65+
assertRequestHeaderStateId(poolContext, Long.MIN_VALUE);
66+
67+
poolContext.advanceClientStateId(10L);
68+
assertRequestHeaderStateId(poolContext, 10L);
69+
70+
// When namenode state context is disabled, it returns a stateId of zero
71+
poolContext.receiveResponseState(getRpcResponseHeader(0));
72+
// Routers should reset the cached state Id to not send a stale value to the observer.
73+
Assertions.assertEquals(Long.MIN_VALUE, poolContext.getLastSeenStateId());
74+
assertRequestHeaderStateId(poolContext, Long.MIN_VALUE);
75+
}
76+
77+
private RpcResponseHeaderProto getRpcResponseHeader(long stateID) {
78+
return RpcResponseHeaderProto
79+
.newBuilder()
80+
.setCallId(1)
81+
.setStatus(RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
82+
.setStateId(stateID)
83+
.build();
84+
}
5385
}

0 commit comments

Comments
 (0)