From 85596945ba77f639c3248f055cf7e8a0f7228ade Mon Sep 17 00:00:00 2001 From: "zengqiang.xu" Date: Wed, 15 Jun 2022 11:20:40 +0800 Subject: [PATCH] HDFS-13522. IPC changes to support observer reads through routers. --- .../apache/hadoop/ipc/AlignmentContext.java | 2 +- .../federation/router/ConnectionManager.java | 31 +++++++++- .../federation/router/ConnectionPool.java | 21 ++++--- .../federation/router/RBFConfigKeys.java | 4 ++ .../federation/router/RouterGSIContext.java | 56 +++++++++++++++++++ .../federation/router/RouterRpcClient.java | 2 +- .../src/main/resources/hdfs-rbf-default.xml | 7 +++ .../federation/FederationTestUtils.java | 3 +- .../router/TestConnectionManager.java | 20 +++---- 9 files changed, 124 insertions(+), 22 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterGSIContext.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 3d309235fe891..8d43fd74a843c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -46,7 +46,7 @@ public interface AlignmentContext { void updateResponseState(RpcResponseHeaderProto.Builder header); /** - * This is the intended client method call to implement to recieve state info + * This is the intended client method call to implement to receive state info * during RPC response processing. * * @param header The RPC response header. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 5fe797bf5ce2c..9dc4840d67b0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -73,6 +73,8 @@ public class ConnectionManager { /** Queue for creating new connections. */ private final BlockingQueue creatorQueue; + private final Map alignmentContexts; + private volatile boolean enableObserverRead; /** Max size of queue for creating new connections. */ private final int creatorQueueMaxSize; @@ -125,6 +127,12 @@ public ConnectionManager(Configuration config) { RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT); LOG.info("Cleaning connections every {} seconds", TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs)); + + this.alignmentContexts = new HashMap<>(); + + this.enableObserverRead = this.conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_KEY, + RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_DEFAULT); } /** @@ -172,11 +180,13 @@ public void close() { * @param ugi User group information. * @param nnAddress Namenode address for the connection. * @param protocol Protocol for the connection. + * @param nsId Nameservice identifier. * @return Proxy client to connect to nnId as UGI. * @throws IOException If the connection cannot be obtained. 
*/ public ConnectionContext getConnection(UserGroupInformation ugi, - String nnAddress, Class protocol) throws IOException { + String nnAddress, Class protocol, String nsId) + throws IOException { // Check if the manager is shutdown if (!this.running) { @@ -203,9 +213,14 @@ public ConnectionContext getConnection(UserGroupInformation ugi, try { pool = this.pools.get(connectionId); if (pool == null) { + RouterGSIContext gsiContext = this.alignmentContexts.get(nsId); + if (gsiContext == null) { + gsiContext = new RouterGSIContext(this.enableObserverRead); + this.alignmentContexts.put(nsId, gsiContext); + } pool = new ConnectionPool( this.conf, nnAddress, ugi, this.minSize, this.maxSize, - this.minActiveRatio, protocol); + this.minActiveRatio, protocol, gsiContext); this.pools.put(connectionId, pool); } } finally { @@ -231,6 +246,23 @@ public ConnectionContext getConnection(UserGroupInformation ugi, return conn; } + /** + * Dynamically reconfigure the enableObserverRead. + * + * @param enableObserverRead New value, stored on this manager and pushed to every existing RouterGSIContext. + */ + public void reconfEnableObserverRead(boolean enableObserverRead) { + readLock.lock(); + try { + this.enableObserverRead = enableObserverRead; + for (RouterGSIContext routerGSIContext : alignmentContexts.values()) { + routerGSIContext.setEnableObserverRead(enableObserverRead); + } + } finally { + readLock.unlock(); + } + } + /** * Get the number of connection pools. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 293a4b64d2031..825495b7de7b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -47,6 +47,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryUtils; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -105,6 +106,8 @@ public class ConnectionPool { /** The last time a connection was active. */ private volatile long lastActiveTime = 0; + private final AlignmentContext alignmentContext; + /** Map for the protocols and their protobuf implementations. 
*/ private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); static { @@ -134,7 +137,8 @@ private static class ProtoImpl { protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, - float minActiveRatio, Class proto) throws IOException { + float minActiveRatio, Class proto, + AlignmentContext alignmentContext) throws IOException { this.conf = config; @@ -150,6 +154,8 @@ protected ConnectionPool(Configuration config, String address, this.maxSize = maxPoolSize; this.minActiveRatio = minActiveRatio; + this.alignmentContext = alignmentContext; + // Add minimum connections to the pool for (int i=0; i ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi, Class proto) - throws IOException { + String nnAddress, UserGroupInformation ugi, Class proto, + AlignmentContext alignmentContext) throws IOException { if (!PROTO_MAP.containsKey(proto)) { String msg = "Unsupported protocol for connection to NameNode: " + ((proto != null) ? 
proto.getName() : "null"); @@ -438,14 +445,14 @@ protected static ConnectionContext newConnection(Configuration conf, InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); final long version = RPC.getProtocolVersion(classes.protoPb); Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, - conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null, + alignmentContext).getProxy(); T client = newProtoClient(proto, classes, proxy); Text dtService = SecurityUtil.buildTokenService(socket); ProxyAndInfo clientProxy = new ProxyAndInfo(client, dtService, socket); - ConnectionContext connection = new ConnectionContext(clientProxy); - return connection; + return new ConnectionContext(clientProxy); } private static T newProtoClient(Class proto, ProtoImpl classes, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index c0a9e3f294cd8..68e3e878a8dc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -136,6 +136,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(10); + public static final String DFS_ROUTER_ENABLE_OBSERVER_READ_KEY = + FEDERATION_ROUTER_PREFIX + "enable.observer.read"; + public static final boolean DFS_ROUTER_ENABLE_OBSERVER_READ_DEFAULT = false; + // HDFS Router RPC client public static final String DFS_ROUTER_CLIENT_THREADS_SIZE = FEDERATION_ROUTER_PREFIX + "client.thread-size"; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterGSIContext.java new file mode 100644 index 0000000000000..3cda99406c6b0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterGSIContext.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.ClientGSIContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; + +/** + * Global State ID context for the router. + *

+ * This is the router side implementation responsible for receiving + * state alignment info from server(s). + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RouterGSIContext extends ClientGSIContext { + private volatile boolean enableObserverRead = false; + + public RouterGSIContext(boolean enableObserverRead) { + super(); + this.enableObserverRead = enableObserverRead; + } + + public void setEnableObserverRead(boolean enableObserverRead) { + this.enableObserverRead = enableObserverRead; + } + + + /** + * Router side implementation for providing state alignment info in requests. + */ + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + if (enableObserverRead) { + super.updateRequestState(header); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index ff90854ebb7ec..5780c31f12ea7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -368,7 +368,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, ugi.getUserName(), routerUser); } connection = this.connectionManager.getConnection( - connUGI, rpcAddress, proto); + connUGI, rpcAddress, proto, nsId); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index fcf6a28475fbd..3621deea448ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -151,6 +151,13 @@ + + dfs.federation.router.enable.observer.read + false + + Enable observer read for client with router. + + dfs.federation.router.dn-report.time-out diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index e758eee4fda7c..6cc15699a3549 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -398,7 +398,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { throw new IOException("Simulate connectionManager throw IOException"); } }).when(spyConnectionManager).getConnection( - any(UserGroupInformation.class), any(String.class), any(Class.class)); + any(UserGroupInformation.class), any(String.class), any(Class.class), + any(String.class)); Whitebox.setInternalState(rpcClient, "connectionManager", spyConnectionManager); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index e397692e9a86d..0bae068dd7343 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -81,14 +81,14 @@ public void testCleanup() throws Exception { Map poolMap = connManager.getPools(); ConnectionPool pool1 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 
0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool1, 9, 4); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool1); ConnectionPool pool2 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool2, 10, 10); poolMap.put( new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), @@ -111,7 +111,7 @@ public void testCleanup() throws Exception { // Make sure the number of connections doesn't go below minSize ConnectionPool pool3 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool3, 8, 0); poolMap.put( new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), @@ -136,7 +136,7 @@ public void testConnectionCreatorWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. ConnectionPool badPool = new ConnectionPool( conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); BlockingQueue queue = new ArrayBlockingQueue<>(1); queue.add(badPool); ConnectionManager.ConnectionCreator connectionCreator = @@ -162,7 +162,7 @@ public void testGetConnectionWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. 
ConnectionPool badPool = new ConnectionPool( conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); } @Test @@ -172,7 +172,7 @@ public void testGetConnection() throws Exception { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), @@ -197,7 +197,7 @@ public void testGetConnection() throws Exception { @Test public void testValidClientIndex() throws Exception { ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class, null); for(int i = -3; i <= 3; i++) { pool.getClientIndex().set(i); ConnectionContext conn = pool.getConnection(); @@ -213,7 +213,7 @@ public void getGetConnectionNamenodeProtocol() throws Exception { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId( @@ -286,7 +286,7 @@ private void testConnectionCleanup(float ratio, int totalConns, // Create one new connection pool tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, - NamenodeProtocol.class); + NamenodeProtocol.class, "mockNS"); Map poolMap = tmpConnManager.getPools(); ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, @@ -317,6 +317,6 @@ public void testUnsupportedProtoExceptionMsg() throws Exception { "Unsupported protocol for connection to NameNode: " + TestConnectionManager.class.getName(), () -> ConnectionPool.newConnection(conf, 
TEST_NN_ADDRESS, TEST_USER1, - TestConnectionManager.class)); + TestConnectionManager.class, null)); } }