2828import java .util .List ;
2929import java .util .Map ;
3030import java .util .Map .Entry ;
31+ import java .util .Objects ;
3132import java .util .Set ;
3233import java .util .concurrent .ConcurrentNavigableMap ;
3334import java .util .concurrent .ConcurrentSkipListMap ;
5152import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
5253import org .apache .hadoop .hbase .client .AsyncRegionServerAdmin ;
5354import org .apache .hadoop .hbase .client .RegionInfo ;
55+ import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
5456import org .apache .hadoop .hbase .ipc .RemoteWithExtrasException ;
5557import org .apache .hadoop .hbase .master .assignment .RegionStates ;
5658import org .apache .hadoop .hbase .master .procedure .ServerCrashProcedure ;
100102 * only after the handler is fully enabled and has completed the handling.
101103 */
102104@ InterfaceAudience .Private
103- public class ServerManager {
105+ public class ServerManager implements ConfigurationObserver {
104106 public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
105107 "hbase.master.wait.on.regionservers.maxtostart" ;
106108
@@ -172,6 +174,9 @@ public class ServerManager {
172174 /** Listeners that are called on server events. */
173175 private List <ServerListener > listeners = new CopyOnWriteArrayList <>();
174176
177+ /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
178+ private volatile boolean rejectDecommissionedHostsConfig ;
179+
175180 /**
176181 * Constructor.
177182 */
@@ -183,6 +188,35 @@ public ServerManager(final MasterServices master, RegionServerList storage) {
183188 warningSkew = c .getLong ("hbase.master.warningclockskew" , 10000 );
184189 persistFlushedSequenceId =
185190 c .getBoolean (PERSIST_FLUSHEDSEQUENCEID , PERSIST_FLUSHEDSEQUENCEID_DEFAULT );
191+ rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig (c );
192+ }
193+
194+ /**
195+ * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
196+ * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
197+ * @param conf Server configuration instance
198+ */
199+ @ Override
200+ public void onConfigurationChange (Configuration conf ) {
201+ final boolean newValue = getRejectDecommissionedHostsConfig (conf );
202+ if (rejectDecommissionedHostsConfig == newValue ) {
203+ // no-op
204+ return ;
205+ }
206+
207+ LOG .info ("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}" ,
208+ rejectDecommissionedHostsConfig , newValue );
209+
210+ rejectDecommissionedHostsConfig = newValue ;
211+ }
212+
213+ /**
214+ * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
215+ * @param conf Configuration instance of the Master
216+ */
217+ public boolean getRejectDecommissionedHostsConfig (Configuration conf ) {
218+ return conf .getBoolean (HConstants .REJECT_DECOMMISSIONED_HOSTS_KEY ,
219+ HConstants .REJECT_DECOMMISSIONED_HOSTS_DEFAULT );
186220 }
187221
188222 /**
@@ -227,11 +261,14 @@ ServerName regionServerStartup(RegionServerStartupRequest request, int versionNu
227261 final String hostname =
228262 request .hasUseThisHostnameInstead () ? request .getUseThisHostnameInstead () : isaHostName ;
229263 ServerName sn = ServerName .valueOf (hostname , request .getPort (), request .getServerStartCode ());
264+
265+ // Check if the host should be rejected based on its decommissioned status
266+ checkRejectableDecommissionedStatus (sn );
267+
230268 checkClockSkew (sn , request .getServerCurrentTime ());
231269 checkIsDead (sn , "STARTUP" );
232270 if (!checkAndRecordNewServer (sn , ServerMetricsBuilder .of (sn , versionNumber , version ))) {
233- LOG .warn (
234- "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn );
271+ LOG .warn ("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}" , sn );
235272 }
236273 storage .started (sn );
237274 return sn ;
@@ -293,6 +330,42 @@ public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDea
293330 updateLastFlushedSequenceIds (sn , sl );
294331 }
295332
333+ /**
334+ * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
335+ * to do so, any RegionServer trying to join the cluster will have it's host checked against the
336+ * list of hosts of currently decommissioned servers and potentially get prevented from reporting
337+ * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
338+ * details.
339+ * @param sn The ServerName to check for
340+ * @throws DecommissionedHostRejectedException if the Master is configured to reject
341+ * decommissioned hosts and this host exists in the
342+ * list of the decommissioned servers
343+ */
344+ private void checkRejectableDecommissionedStatus (ServerName sn )
345+ throws DecommissionedHostRejectedException {
346+ LOG .info ("Checking decommissioned status of RegionServer {}" , sn .getServerName ());
347+
348+ // If the Master is not configured to reject decommissioned hosts, return early.
349+ if (!rejectDecommissionedHostsConfig ) {
350+ return ;
351+ }
352+
353+ // Look for a match for the hostname in the list of decommissioned servers
354+ for (ServerName server : getDrainingServersList ()) {
355+ if (Objects .equals (server .getHostname (), sn .getHostname ())) {
356+ // Found a match and master is configured to reject decommissioned hosts, throw exception!
357+ LOG .warn (
358+ "Rejecting RegionServer {} from reporting for duty because Master is configured "
359+ + "to reject decommissioned hosts and this host was marked as such in the past." ,
360+ sn .getServerName ());
361+ throw new DecommissionedHostRejectedException (String .format (
362+ "Host %s exists in the list of decommissioned servers and Master is configured to "
363+ + "reject decommissioned hosts" ,
364+ sn .getHostname ()));
365+ }
366+ }
367+ }
368+
296369 /**
297370 * Check is a server of same host and port already exists, if not, or the existed one got a
298371 * smaller start code, record it.
@@ -647,13 +720,8 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
647720 * Remove the server from the drain list.
648721 */
649722 public synchronized boolean removeServerFromDrainList (final ServerName sn ) {
650- // Warn if the server (sn) is not online. ServerName is of the form:
651- // <hostname> , <port> , <startcode>
723+ LOG .info ("Removing server {} from the draining list." , sn );
652724
653- if (!this .isServerOnline (sn )) {
654- LOG .warn ("Server " + sn + " is not currently online. "
655- + "Removing from draining list anyway, as requested." );
656- }
657725 // Remove the server from the draining servers lists.
658726 return this .drainingServers .remove (sn );
659727 }
@@ -663,22 +731,23 @@ public synchronized boolean removeServerFromDrainList(final ServerName sn) {
663731 * @return True if the server is added or the server is already on the drain list.
664732 */
665733 public synchronized boolean addServerToDrainList (final ServerName sn ) {
666- // Warn if the server (sn) is not online. ServerName is of the form:
667- // <hostname> , <port> , <startcode>
668-
669- if (!this .isServerOnline (sn )) {
670- LOG .warn ("Server " + sn + " is not currently online. "
671- + "Ignoring request to add it to draining list." );
734+ // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
735+ // However, we want to add servers even if they're not online if the master is configured
736+ // to reject decommissioned hosts
737+ if (!rejectDecommissionedHostsConfig && ! this .isServerOnline (sn )) {
738+ LOG .warn ("Server {} is not currently online. Ignoring request to add it to draining list." ,
739+ sn );
672740 return false ;
673741 }
674- // Add the server to the draining servers lists, if it's not already in
675- // it.
742+
743+ // Add the server to the draining servers lists, if it's not already in it.
676744 if (this .drainingServers .contains (sn )) {
677- LOG .warn ("Server " + sn + " is already in the draining server list."
678- + " Ignoring request to add it again." );
745+ LOG .warn (
746+ "Server {} is already in the draining server list. Ignoring request to add it again.", sn );
679747 return true ;
680748 }
681- LOG .info ("Server " + sn + " added to draining server list." );
749+
750+ LOG .info ("Server {} added to draining server list." , sn );
682751 return this .drainingServers .add (sn );
683752 }
684753
0 commit comments