diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e3c56b4df8f8..969c3e953134 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -168,6 +168,7 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure; @@ -260,6 +261,7 @@ import org.apache.hadoop.hbase.util.JsonMapper; import org.apache.hadoop.hbase.util.ModifyRegionUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.TableDescriptorChecker; @@ -489,6 +491,15 @@ public class HMaster extends HBaseServerBase implements Maste public static final String WARMUP_BEFORE_MOVE = "hbase.master.warmup.before.move"; private static final boolean DEFAULT_WARMUP_BEFORE_MOVE = true; + /** + * Use RSProcedureDispatcher instance to initiate master -> rs remote procedure execution. Use + * this config to extend RSProcedureDispatcher (mainly for testing purpose). + */ + public static final String HBASE_MASTER_RSPROC_DISPATCHER_CLASS = + "hbase.master.rsproc.dispatcher.class"; + private static final String DEFAULT_HBASE_MASTER_RSPROC_DISPATCHER_CLASS = + RSProcedureDispatcher.class.getName(); + private TaskGroup startupTaskGroup; /** @@ -1833,7 +1844,11 @@ protected void stopServiceThreads() { } private void createProcedureExecutor() throws IOException { - MasterProcedureEnv procEnv = new MasterProcedureEnv(this); + final String procedureDispatcherClassName = + conf.get(HBASE_MASTER_RSPROC_DISPATCHER_CLASS, DEFAULT_HBASE_MASTER_RSPROC_DISPATCHER_CLASS); + final RSProcedureDispatcher procedureDispatcher = ReflectionUtils.instantiateWithCustomCtor( + procedureDispatcherClassName, new Class[] { MasterServices.class }, new Object[] { this }); + final MasterProcedureEnv procEnv = new MasterProcedureEnv(this, procedureDispatcher); procedureStore = new RegionProcedureStore(this, masterRegion, new MasterProcedureEnv.FsUtilsLeaseRecovery(this)); procedureStore.registerListener(new ProcedureStoreListener() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 218d3096d8df..3ef7fcdd6928 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -75,10 +75,6 @@ private boolean isRunning() { private final MasterProcedureScheduler procSched; private final MasterServices master; - public MasterProcedureEnv(final MasterServices master) { - this(master, new RSProcedureDispatcher(master)); - } - public MasterProcedureEnv(final MasterServices master, final RSProcedureDispatcher remoteDispatcher) { this.master = master; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index ef025757c58e..9841af4fdf33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; import org.apache.hadoop.hbase.ipc.RpcConnectionConstants; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterServices; @@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; -import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; @@ -249,6 +249,22 @@ protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, "hbase.regionserver.rpc.retry.interval"; private static final int DEFAULT_RS_RPC_RETRY_INTERVAL = 100; + /** + * Config to determine the retry limit while executing remote regionserver procedure. This retry + * limit applies to only specific errors. These errors could potentially get the remote + * procedure stuck for several minutes unless the retry limit is applied. + */ + private static final String RS_REMOTE_PROC_FAIL_FAST_LIMIT = + "hbase.master.rs.remote.proc.fail.fast.limit"; + /** + * The default retry limit. Waiting for more than {@value} attempts is not going to help much + * for genuine connectivity errors. Therefore, consider fail-fast after {@value} retries. Value + * = {@value} + */ + private static final int DEFAULT_RS_REMOTE_PROC_RETRY_LIMIT = 5; + + private final int failFastRetryLimit; + private ExecuteProceduresRequest.Builder request = null; public ExecuteProceduresRemoteCall(final ServerName serverName, @@ -257,6 +273,8 @@ public ExecuteProceduresRemoteCall(final ServerName serverName, this.remoteProcedures = remoteProcedures; this.rsRpcRetryInterval = master.getConfiguration().getLong(RS_RPC_RETRY_INTERVAL_CONF_KEY, DEFAULT_RS_RPC_RETRY_INTERVAL); + this.failFastRetryLimit = master.getConfiguration().getInt(RS_REMOTE_PROC_FAIL_FAST_LIMIT, + DEFAULT_RS_REMOTE_PROC_RETRY_LIMIT); } private AsyncRegionServerAdmin getRsAdmin() throws IOException { @@ -300,13 +318,28 @@ private boolean scheduleForRetry(IOException e) { if (numberOfAttemptsSoFar == 0 && unableToConnectToServer(e)) { return false; } + + // Check if the num of attempts have crossed the retry limit, and if the error type can + // fail-fast. + if (numberOfAttemptsSoFar >= failFastRetryLimit - 1 && isErrorTypeFailFast(e)) { + LOG + .warn("Number of retries {} exceeded limit {} for the given error type. Scheduling server" + + " crash for {}", numberOfAttemptsSoFar + 1, failFastRetryLimit, serverName, e); + // Expiring the server will schedule SCP and also reject the regionserver report from the + // regionserver if regionserver is somehow able to send the regionserver report to master. + // The master rejects the report by throwing YouAreDeadException, which would eventually + // result in the regionserver abort. + // This will also remove "serverName" from the ServerManager's onlineServers map. + master.getServerManager().expireServer(serverName); + return false; + } // Always retry for other exception types if the region server is not dead yet. if (!master.getServerManager().isServerOnline(serverName)) { LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up", serverName, e.toString(), numberOfAttemptsSoFar); return false; } - if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) { + if (e instanceof RegionServerStoppedException) { // A better way is to return true here to let the upper layer quit, and then schedule a // background task to check whether the region server is dead. And if it is dead, call // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect @@ -324,7 +357,8 @@ private boolean scheduleForRetry(IOException e) { // retry^2 on each try // up to max of 10 seconds (don't want to back off too much in case of situation change). submitTask(this, - Math.min(rsRpcRetryInterval * (this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar), + Math.min( + rsRpcRetryInterval * ((long) this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar), 10 * 1000), TimeUnit.MILLISECONDS); return true; @@ -371,6 +405,39 @@ private boolean isSaslError(IOException e) { } } + /** + * Returns true if the error or its cause is of type ConnectionClosedException. + * @param e IOException thrown by the underlying rpc framework. + * @return True if the error or its cause is of type ConnectionClosedException. + */ + private boolean isConnectionClosedError(IOException e) { + if (e instanceof ConnectionClosedException) { + return true; + } + Throwable cause = e; + while (true) { + if (cause instanceof IOException) { + IOException unwrappedCause = unwrapException((IOException) cause); + if (unwrappedCause instanceof ConnectionClosedException) { + return true; + } + } + cause = cause.getCause(); + if (cause == null) { + return false; + } + } + } + + /** + * Returns true if the error type can allow fail-fast. + * @param e IOException thrown by the underlying rpc framework. + * @return True if the error type can allow fail-fast. + */ + private boolean isErrorTypeFailFast(IOException e) { + return e instanceof CallQueueTooBigException || isSaslError(e) || isConnectionClosedError(e); + } + private long getMaxWaitTime() { if (this.maxWaitTime < 0) { // This is the max attempts, not retries, so it should be at least 1. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java index c6170bd0c604..5f4555342726 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -146,6 +146,7 @@ protected void setupConfiguration(Configuration conf) throws Exception { // make retry for TRSP more frequent conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10); conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100); + conf.setInt("hbase.master.rs.remote.proc.fail.fast.limit", Integer.MAX_VALUE); } @Before diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RSProcDispatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RSProcDispatcher.java new file mode 100644 index 000000000000..43f6b0aceeaf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RSProcDispatcher.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; + +/** + * Test implementation of RSProcedureDispatcher that throws desired errors for testing purpose. + */ +public class RSProcDispatcher extends RSProcedureDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(RSProcDispatcher.class); + + private static final AtomicInteger i = new AtomicInteger(); + + public RSProcDispatcher(MasterServices master) { + super(master); + } + + @Override + protected void remoteDispatch(final ServerName serverName, + final Set remoteProcedures) { + if (!master.getServerManager().isServerOnline(serverName)) { + // fail fast + submitTask(new DeadRSRemoteCall(serverName, remoteProcedures)); + } else { + submitTask(new TestExecuteProceduresRemoteCall(serverName, remoteProcedures)); + } + } + + class TestExecuteProceduresRemoteCall extends ExecuteProceduresRemoteCall { + + public TestExecuteProceduresRemoteCall(ServerName serverName, + Set remoteProcedures) { + super(serverName, remoteProcedures); + } + + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, + final AdminProtos.ExecuteProceduresRequest request) throws IOException { + int j = i.addAndGet(1); + LOG.info("sendRequest() req: {} , j: {}", request, j); + if (j == 12 || j == 22) { + // Execute the remote close and open region requests in the last (5th) retry before + // throwing ConnectionClosedException. This is to ensure even if the region open/close + // is successfully completed by regionserver, master still schedules SCP because + // sendRequest() throws error which has retry-limit exhausted. + FutureUtils.get(getRsAdmin().executeProcedures(request)); + } + // For one of the close region requests and one of the open region requests, + // throw ConnectionClosedException until retry limit is exhausted and master + // schedules recoveries for the server. + // We will have ABNORMALLY_CLOSED regions, and they are expected to recover on their own. + if (j >= 8 && j <= 13 || j >= 18 && j <= 23) { + throw new ConnectionClosedException("test connection closed error..."); + } + return FutureUtils.get(getRsAdmin().executeProcedures(request)); + } + + private AsyncRegionServerAdmin getRsAdmin() { + return master.getAsyncClusterConnection().getRegionServerAdmin(getServerName()); + } + } + + private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall { + + public DeadRSRemoteCall(ServerName serverName, Set remoteProcedures) { + super(serverName, remoteProcedures); + } + + @Override + public void run() { + remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(), + new RegionServerStoppedException("Server " + getServerName() + " is not online")); + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestProcDispatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestProcDispatcher.java new file mode 100644 index 000000000000..99414d44488c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestProcDispatcher.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hbase.master.HMaster.HBASE_MASTER_RSPROC_DISPATCHER_CLASS; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.hbck.HbckChore; +import org.apache.hadoop.hbase.master.hbck.HbckReport; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * Testing custom RSProcedureDispatcher to ensure retry limit can be imposed on certain errors. + */ +@Category({ MiscTests.class, LargeTests.class }) +public class TestProcDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(TestProcDispatcher.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestProcDispatcher.class); + + @Rule + public TestName name = new TestName(); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static ServerName rs0; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().set(HBASE_MASTER_RSPROC_DISPATCHER_CLASS, + RSProcDispatcher.class.getName()); + TEST_UTIL.startMiniCluster(3); + SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + rs0 = cluster.getRegionServer(0).getServerName(); + TEST_UTIL.getAdmin().balancerSwitch(false, true); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("fam1")).build(); + int startKey = 0; + int endKey = 80000; + TEST_UTIL.getAdmin().createTable(tableDesc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), 9); + } + + @Test + public void testRetryLimitOnConnClosedErrors() throws Exception { + HbckChore hbckChore = new HbckChore(TEST_UTIL.getHBaseCluster().getMaster()); + final TableName tableName = TableName.valueOf(name.getMethodName()); + SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + Admin admin = TEST_UTIL.getAdmin(); + Table table = TEST_UTIL.getConnection().getTable(tableName); + List puts = IntStream.range(10, 50000).mapToObj(i -> new Put(Bytes.toBytes(i)) + .addColumn(Bytes.toBytes("fam1"), Bytes.toBytes("q1"), Bytes.toBytes("val_" + i))) + .collect(Collectors.toList()); + table.put(puts); + admin.flush(tableName); + admin.compact(tableName); + Thread.sleep(3000); + HRegionServer hRegionServer0 = cluster.getRegionServer(0); + HRegionServer hRegionServer1 = cluster.getRegionServer(1); + HRegionServer hRegionServer2 = cluster.getRegionServer(2); + int numRegions0 = hRegionServer0.getNumberOfOnlineRegions(); + int numRegions1 = hRegionServer1.getNumberOfOnlineRegions(); + int numRegions2 = hRegionServer2.getNumberOfOnlineRegions(); + + hbckChore.choreForTesting(); + HbckReport hbckReport = hbckChore.getLastReport(); + Assert.assertEquals(0, hbckReport.getInconsistentRegions().size()); + Assert.assertEquals(0, hbckReport.getOrphanRegionsOnFS().size()); + Assert.assertEquals(0, hbckReport.getOrphanRegionsOnRS().size()); + + HRegion region0 = hRegionServer0.getRegions().get(0); + // move all regions from server1 to server0 + for (HRegion region : hRegionServer1.getRegions()) { + TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(), rs0); + } + TEST_UTIL.getAdmin().move(region0.getRegionInfo().getEncodedNameAsBytes()); + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + + // Ensure: + // 1. num of regions before and after scheduling SCP remain same + // 2. all procedures including SCPs are successfully completed + // 3. two servers have SCPs scheduled + TEST_UTIL.waitFor(5000, 1000, () -> { + LOG.info("numRegions0: {} , numRegions1: {} , numRegions2: {}", numRegions0, numRegions1, + numRegions2); + LOG.info("Online regions - server0 : {} , server1: {} , server2: {}", + cluster.getRegionServer(0).getNumberOfOnlineRegions(), + cluster.getRegionServer(1).getNumberOfOnlineRegions(), + cluster.getRegionServer(2).getNumberOfOnlineRegions()); + LOG.info("Num of successfully completed procedures: {} , num of all procedures: {}", + master.getMasterProcedureExecutor().getProcedures().stream() + .filter(masterProcedureEnvProcedure -> masterProcedureEnvProcedure.getState() + == ProcedureProtos.ProcedureState.SUCCESS) + .count(), + master.getMasterProcedureExecutor().getProcedures().size()); + LOG.info("Num of SCPs: " + master.getMasterProcedureExecutor().getProcedures().stream() + .filter(proc -> proc instanceof ServerCrashProcedure).count()); + return (numRegions0 + numRegions1 + numRegions2) + == (cluster.getRegionServer(0).getNumberOfOnlineRegions() + + cluster.getRegionServer(1).getNumberOfOnlineRegions() + + cluster.getRegionServer(2).getNumberOfOnlineRegions()) + && master.getMasterProcedureExecutor().getProcedures().stream() + .filter(masterProcedureEnvProcedure -> masterProcedureEnvProcedure.getState() + == ProcedureProtos.ProcedureState.SUCCESS) + .count() == master.getMasterProcedureExecutor().getProcedures().size() + && master.getMasterProcedureExecutor().getProcedures().stream() + .filter(proc -> proc instanceof ServerCrashProcedure).count() > 0; + }); + + // Ensure we have no inconsistent regions + TEST_UTIL.waitFor(5000, 1000, () -> { + hbckChore.choreForTesting(); + HbckReport report = hbckChore.getLastReport(); + return report.getInconsistentRegions().isEmpty() && report.getOrphanRegionsOnFS().isEmpty() + && report.getOrphanRegionsOnRS().isEmpty(); + }); + + } + +}