Skip to content

Commit a3f44da

Browse files
Hexiaoqiao authored and jojochuang committed
HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
1 parent 86e6aa8 commit a3f44da

3 files changed

Lines changed: 163 additions & 17 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.TimeUnit;
3838

3939
import com.google.common.annotations.VisibleForTesting;
40+
import org.apache.hadoop.hdfs.DFSUtilClient;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
4243
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -688,7 +689,7 @@ Result runOneIteration() {
688689
* execute a {@link Balancer} to work through all datanodes once.
689690
*/
690691
static private int doBalance(Collection<URI> namenodes,
691-
final BalancerParameters p, Configuration conf)
692+
Collection<String> nsIds, final BalancerParameters p, Configuration conf)
692693
throws IOException, InterruptedException {
693694
final long sleeptime =
694695
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@@ -707,13 +708,12 @@ static private int doBalance(Collection<URI> namenodes,
707708
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
708709

709710
List<NameNodeConnector> connectors = Collections.emptyList();
710-
try {
711-
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
712-
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
713-
p.getMaxIdleIteration());
714-
715-
boolean done = false;
716-
for(int iteration = 0; !done; iteration++) {
711+
boolean done = false;
712+
for(int iteration = 0; !done; iteration++) {
713+
try {
714+
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
715+
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
716+
p.getMaxIdleIteration());
717717
done = true;
718718
Collections.shuffle(connectors);
719719
for(NameNodeConnector nnc : connectors) {
@@ -741,19 +741,25 @@ static private int doBalance(Collection<URI> namenodes,
741741
if (!done) {
742742
Thread.sleep(sleeptime);
743743
}
744-
}
745-
} finally {
746-
for(NameNodeConnector nnc : connectors) {
747-
IOUtils.cleanupWithLogger(LOG, nnc);
744+
} finally {
745+
for(NameNodeConnector nnc : connectors) {
746+
IOUtils.cleanupWithLogger(LOG, nnc);
747+
}
748748
}
749749
}
750750
return ExitStatus.SUCCESS.getExitCode();
751751
}
752752

753753
static int run(Collection<URI> namenodes, final BalancerParameters p,
754-
Configuration conf) throws IOException, InterruptedException {
754+
Configuration conf) throws IOException, InterruptedException {
755+
return run(namenodes, null, p, conf);
756+
}
757+
758+
static int run(Collection<URI> namenodes, Collection<String> nsIds,
759+
final BalancerParameters p, Configuration conf)
760+
throws IOException, InterruptedException {
755761
if (!p.getRunAsService()) {
756-
return doBalance(namenodes, p, conf);
762+
return doBalance(namenodes, nsIds, p, conf);
757763
}
758764
if (!serviceRunning) {
759765
serviceRunning = true;
@@ -772,7 +778,7 @@ static int run(Collection<URI> namenodes, final BalancerParameters p,
772778

773779
while (serviceRunning) {
774780
try {
775-
int retCode = doBalance(namenodes, p, conf);
781+
int retCode = doBalance(namenodes, nsIds, p, conf);
776782
if (retCode < 0) {
777783
LOG.info("Balance failed, error code: " + retCode);
778784
failedTimesSinceLastSuccessfulBalance++;
@@ -856,7 +862,8 @@ public int run(String[] args) {
856862
checkReplicationPolicyCompatibility(conf);
857863

858864
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
859-
return Balancer.run(namenodes, parse(args), conf);
865+
final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
866+
return Balancer.run(namenodes, nsIds, parse(args), conf);
860867
} catch (IOException e) {
861868
System.out.println(e + ". Exiting ...");
862869
return ExitStatus.IO_EXCEPTION.getExitCode();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,20 @@
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
2727
import java.util.Collection;
28+
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.concurrent.atomic.AtomicLong;
3233

3334
import com.google.common.base.Preconditions;
3435
import com.google.common.util.concurrent.RateLimiter;
36+
import org.apache.hadoop.ha.HAServiceProtocol;
3537
import org.apache.hadoop.hdfs.DFSConfigKeys;
38+
import org.apache.hadoop.hdfs.HAUtil;
39+
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
40+
import org.apache.hadoop.ipc.RPC;
41+
import org.apache.hadoop.security.UserGroupInformation;
3642
import org.slf4j.Logger;
3743
import org.slf4j.LoggerFactory;
3844
import org.apache.hadoop.classification.InterfaceAudience;
@@ -100,6 +106,32 @@ public static List<NameNodeConnector> newNameNodeConnectors(
100106
return connectors;
101107
}
102108

109+
public static List<NameNodeConnector> newNameNodeConnectors(
110+
Collection<URI> namenodes, Collection<String> nsIds, String name,
111+
Path idPath, Configuration conf, int maxIdleIterations)
112+
throws IOException {
113+
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
114+
namenodes.size());
115+
Map<URI, String> uriToNsId = new HashMap<>();
116+
if (nsIds != null) {
117+
for (URI uri : namenodes) {
118+
for (String nsId : nsIds) {
119+
if (uri.getAuthority().equals(nsId)) {
120+
uriToNsId.put(uri, nsId);
121+
}
122+
}
123+
}
124+
}
125+
for (URI uri : namenodes) {
126+
String nsId = uriToNsId.get(uri);
127+
NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
128+
null, conf, maxIdleIterations);
129+
nnc.getKeyManager().startBlockKeyUpdater();
130+
connectors.add(nnc);
131+
}
132+
return connectors;
133+
}
134+
103135
@VisibleForTesting
104136
public static void setWrite2IdFile(boolean write2IdFile) {
105137
NameNodeConnector.write2IdFile = write2IdFile;
@@ -114,6 +146,13 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
114146
private final String blockpoolID;
115147

116148
private final BalancerProtocols namenode;
149+
/**
150+
* If set balancerShouldRequestStandby true, Balancer will getBlocks from
151+
* Standby NameNode only and it can reduce the performance impact of Active
152+
* NameNode, especially in a busy HA mode cluster.
153+
*/
154+
private boolean balancerShouldRequestStandby;
155+
private NamenodeProtocol standbyNameNode;
117156
private final KeyManager keyManager;
118157
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
119158

@@ -149,6 +188,11 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
149188

150189
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
151190
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
191+
this.balancerShouldRequestStandby = conf.getBoolean(
192+
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
193+
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
194+
this.standbyNameNode = null;
195+
152196
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
153197

154198
final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -167,6 +211,31 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
167211
}
168212
}
169213

214+
/**
 * HA-aware constructor. Behaves exactly like the base constructor and, when
 * {@code nsId} names an HA-enabled nameservice, additionally probes each
 * NameNode of that nameservice and keeps a direct (non-HA) proxy to the
 * first one reporting STANDBY state, so that getBlocks requests can be
 * offloaded from the Active NameNode.
 *
 * @param nsId nameservice id of {@code nameNodeUri}; may be null, in which
 *             case no standby proxy is created
 */
public NameNodeConnector(String name, URI nameNodeUri, String nsId,
    Path idPath, List<Path> targetPaths,
    Configuration conf, int maxNotChangedIterations)
    throws IOException {
  this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
  if (nsId == null || !HAUtil.isHAEnabled(conf, nsId)) {
    return;
  }
  for (ClientProtocol proxy
      : HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId)) {
    try {
      boolean isStandby = proxy.getHAServiceState().equals(
          HAServiceProtocol.HAServiceState.STANDBY);
      if (isStandby) {
        // Remember a direct (non-HA) NamenodeProtocol proxy to the standby.
        this.standbyNameNode = NameNodeProxies.createNonHAProxy(
            conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
            UserGroupInformation.getCurrentUser(), false).getProxy();
        break;
      }
    } catch (Exception e) {
      // Best effort: a namenode that cannot be reached is simply skipped.
      LOG.debug("Error while connecting to namenode", e);
    }
  }
}
238+
170239
public DistributedFileSystem getDistributedFileSystem() {
171240
return fs;
172241
}
@@ -186,6 +255,22 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
186255
if (getBlocksRateLimiter != null) {
187256
getBlocksRateLimiter.acquire();
188257
}
258+
boolean isRequestStandby = true;
259+
try {
260+
if (balancerShouldRequestStandby && standbyNameNode != null) {
261+
return standbyNameNode.getBlocks(datanode, size, minBlockSize);
262+
} else {
263+
isRequestStandby = false;
264+
}
265+
} catch (Exception e) {
266+
LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
267+
"will fallback to normal way", e);
268+
isRequestStandby = false;
269+
} finally {
270+
if (isRequestStandby) {
271+
LOG.info("Request #getBlocks to Standby NameNode success.");
272+
}
273+
}
189274
return namenode.getBlocks(datanode, size, minBlockSize);
190275
}
191276

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
*/
1818
package org.apache.hadoop.hdfs.server.balancer;
1919

20+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
21+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
22+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
2023
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertTrue;
2125
import static org.mockito.ArgumentMatchers.any;
2226
import static org.mockito.ArgumentMatchers.anyLong;
2327
import static org.mockito.Mockito.times;
@@ -31,6 +35,7 @@
3135
import org.apache.hadoop.conf.Configuration;
3236
import org.apache.hadoop.fs.FileSystem;
3337
import org.apache.hadoop.hdfs.DFSUtil;
38+
import org.apache.hadoop.hdfs.DFSUtilClient;
3439
import org.apache.hadoop.hdfs.DistributedFileSystem;
3540
import org.apache.hadoop.hdfs.HdfsConfiguration;
3641
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -44,7 +49,9 @@
4449
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
4550
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
4651
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
52+
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
4753
import org.junit.Test;
54+
import org.slf4j.LoggerFactory;
4855

4956
/**
5057
* Test balancer with HA NameNodes
@@ -106,6 +113,12 @@ void doTest(Configuration conf) throws Exception {
106113
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
107114
/ numOfDatanodes, (short) numOfDatanodes, 0);
108115

116+
boolean isRequestStandby = conf.getBoolean(
117+
DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
118+
if (isRequestStandby) {
119+
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
120+
cluster.getNameNode(1));
121+
}
109122
// start up an empty node with the same capacity and on the same rack
110123
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
111124
String newNodeRack = TestBalancer.RACK2; // new node's rack
@@ -115,13 +128,54 @@ void doTest(Configuration conf) throws Exception {
115128
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
116129
cluster);
117130
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
131+
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
118132
assertEquals(1, namenodes.size());
119-
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
133+
final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
134+
conf);
120135
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
121136
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
122137
cluster, BalancerParameters.DEFAULT);
123138
}
124139

140+
/**
141+
* Test Balancer request Standby NameNode when enable this feature.
142+
*/
143+
@Test(timeout = 60000)
144+
public void testBalancerRequestSBNWithHA() throws Exception {
145+
Configuration conf = new HdfsConfiguration();
146+
conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
147+
conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
148+
//conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
149+
TestBalancer.initConf(conf);
150+
assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
151+
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
152+
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
153+
Configuration copiedConf = new Configuration(conf);
154+
cluster = new MiniDFSCluster.Builder(copiedConf)
155+
.nnTopology(MiniDFSNNTopology.simpleHATopology())
156+
.numDataNodes(TEST_CAPACITIES.length)
157+
.racks(TEST_RACKS)
158+
.simulatedCapacities(TEST_CAPACITIES)
159+
.build();
160+
// Try capture NameNodeConnector log.
161+
LogCapturer log =LogCapturer.captureLogs(
162+
LoggerFactory.getLogger(NameNodeConnector.class));
163+
HATestUtil.setFailoverConfigurations(cluster, conf);
164+
try {
165+
cluster.waitActive();
166+
cluster.transitionToActive(0);
167+
Thread.sleep(500);
168+
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
169+
ClientProtocol.class).getProxy();
170+
doTest(conf);
171+
// Check getBlocks request to Standby NameNode.
172+
assertTrue(log.getOutput().contains(
173+
"Request #getBlocks to Standby NameNode success."));
174+
} finally {
175+
cluster.shutdown();
176+
}
177+
}
178+
125179
/**
126180
* Test Balancer with ObserverNodes.
127181
*/

0 commit comments

Comments (0)