diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index 50798ed60a0a..99cab625b85c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,25 +17,24 @@ * limitations under the License. */ package org.apache.hadoop.hbase.master; - import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** * Handles everything on master-side related to master election. @@ -57,12 +56,18 @@ public class ActiveMasterManager extends ZKListener { final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false); final AtomicBoolean clusterShutDown = new AtomicBoolean(false); + // This server's information. private final ServerName sn; private int infoPort; private final Server master; + // Active master's server name. Invalidated anytime active master changes (based on ZK + // notifications) and lazily fetched on-demand. + // ServerName is immutable, so we don't need heavy synchronization around it. + private volatile ServerName activeMasterServerName; + /** - * @param watcher + * @param watcher ZK watcher * @param sn ServerName * @param master In an instance of a Master. */ @@ -106,6 +111,30 @@ void handle(final String path) { } } + /** + * Fetches the active master's ServerName from zookeeper. + */ + private void fetchAndSetActiveMasterServerName() { + LOG.debug("Attempting to fetch active master sn from zk"); + try { + activeMasterServerName = MasterAddressTracker.getMasterAddress(watcher); + } catch (IOException | KeeperException e) { + // Log and ignore for now and re-fetch later if needed. + LOG.error("Error fetching active master information", e); + } + } + + public Optional getActiveMasterServerName() { + if (!clusterHasActiveMaster.get()) { + return Optional.empty(); + } + if (activeMasterServerName == null) { + fetchAndSetActiveMasterServerName(); + } + // It could still be null, but return whatever we have. + return Optional.ofNullable(activeMasterServerName); + } + /** * Handle a change in the master node. Doesn't matter whether this was called * from a nodeCreated or nodeDeleted event because there are no guarantees @@ -134,6 +163,9 @@ private void handleMasterNodeChange() { // Notify any thread waiting to become the active master clusterHasActiveMaster.notifyAll(); } + // Reset the active master sn. Will be re-fetched later if needed. + // We don't want to make a synchronous RPC under a monitor. + activeMasterServerName = null; } } catch (KeeperException ke) { master.abort("Received an unexpected KeeperException, aborting", ke); @@ -151,8 +183,8 @@ private void handleMasterNodeChange() { * @param checkInterval the interval to check if the master is stopped * @param startupStatus the monitor status to track the progress * @return True if no issue becoming active master else false if another - * master was running or if some other problem (zookeeper, stop flag has been - * set on this Master) + * master was running or if some other problem (zookeeper, stop flag has been + * set on this Master) */ boolean blockUntilBecomingActiveMaster( int checkInterval, MonitoredTask startupStatus) { @@ -178,10 +210,14 @@ boolean blockUntilBecomingActiveMaster( // We are the master, return startupStatus.setStatus("Successfully registered as active master."); this.clusterHasActiveMaster.set(true); + activeMasterServerName = sn; LOG.info("Registered as active master=" + this.sn); return true; } + // Invalidate the active master name so that subsequent requests do not get any stale + // master information. Will be re-fetched if needed. + activeMasterServerName = null; // There is another active master running elsewhere or this is a restart // and the master ephemeral node has not expired yet. this.clusterHasActiveMaster.set(true); @@ -208,7 +244,8 @@ boolean blockUntilBecomingActiveMaster( ZKUtil.deleteNode(this.watcher, this.watcher.getZNodePaths().masterAddressZNode); // We may have failed to delete the znode at the previous step, but - // we delete the file anyway: a second attempt to delete the znode is likely to fail again. + // we delete the file anyway: a second attempt to delete the znode is likely to fail + // again. ZNodeClearer.deleteMyEphemeralNodeOnDisk(); } else { msg = "Another master is the active master, " + currentMaster + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CachedClusterId.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CachedClusterId.java new file mode 100644 index 000000000000..9ca739987618 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CachedClusterId.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * Caches the cluster ID of the cluster. For standby masters, this is used to serve the client + * RPCs that fetch the cluster ID. ClusterID is only created by an active master if one does not + * already exist. Standby masters just read the information from the file system. This class is + * thread-safe. + * + * TODO: Make it a singleton without affecting concurrent junit tests. + */ +@InterfaceAudience.Private +public class CachedClusterId { + + public static final Logger LOG = LoggerFactory.getLogger(CachedClusterId.class); + private static final int MAX_FETCH_TIMEOUT_MS = 10000; + + private Path rootDir; + private FileSystem fs; + + // When true, indicates that a FileSystem fetch of ClusterID is in progress. This is used to + // avoid multiple fetches from FS and let only one thread fetch the information. + AtomicBoolean fetchInProgress = new AtomicBoolean(false); + + // When true, it means that the cluster ID has been fetched successfully from fs. + private AtomicBoolean isClusterIdSet = new AtomicBoolean(false); + // Immutable once set and read multiple times. + private ClusterId clusterId; + + // cache stats for testing. + private AtomicInteger cacheMisses = new AtomicInteger(0); + + public CachedClusterId(Configuration conf) throws IOException { + rootDir = FSUtils.getRootDir(conf); + fs = rootDir.getFileSystem(conf); + } + + /** + * Succeeds only once, when setting to a non-null value. Overwrites are not allowed. + */ + private void setClusterId(ClusterId id) { + if (id == null || isClusterIdSet.get()) { + return; + } + clusterId = id; + isClusterIdSet.set(true); + } + + /** + * Returns a cached copy of the cluster ID. null if the cache is not populated. + */ + private String getClusterId() { + if (!isClusterIdSet.get()) { + return null; + } + // It is ok to read without a lock since clusterId is immutable once set. + return clusterId.toString(); + } + + /** + * Attempts to fetch the cluster ID from the file system. If no attempt is already in progress, + * synchronously fetches the cluster ID and sets it. If an attempt is already in progress, + * returns right away and the caller is expected to wait for the fetch to finish. + * @return true if the attempt is done, false if another thread is already fetching it. + */ + private boolean attemptFetch() { + if (fetchInProgress.compareAndSet(false, true)) { + // A fetch is not in progress, so try fetching the cluster ID synchronously and then notify + // the waiting threads. + try { + cacheMisses.incrementAndGet(); + setClusterId(FSUtils.getClusterId(fs, rootDir)); + } catch (IOException e) { + LOG.warn("Error fetching cluster ID", e); + } finally { + Preconditions.checkState(fetchInProgress.compareAndSet(true, false)); + synchronized (fetchInProgress) { + fetchInProgress.notifyAll(); + } + } + return true; + } + return false; + } + + private void waitForFetchToFinish() throws InterruptedException { + synchronized (fetchInProgress) { + while (fetchInProgress.get()) { + // We don't want the fetches to block forever, for example if there are bugs + // of missing notifications. + fetchInProgress.wait(MAX_FETCH_TIMEOUT_MS); + } + } + } + + /** + * Fetches the ClusterId from FS if it is not cached locally. Atomically updates the cached + * copy and is thread-safe. Optimized to do a single fetch when there are multiple threads are + * trying get from a clean cache. + * + * @return ClusterId by reading from FileSystem or null in any error case or cluster ID does + * not exist on the file system. + */ + public String getFromCacheOrFetch() { + String id = getClusterId(); + if (id != null) { + return id; + } + if (!attemptFetch()) { + // A fetch is in progress. + try { + waitForFetchToFinish(); + } catch (InterruptedException e) { + // pass and return whatever is in the cache. + } + } + return getClusterId(); + } + + @VisibleForTesting + public int getCacheStats() { + return cacheMisses.get(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0b78bc4d2e0a..d11a4ab46d6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -444,6 +444,9 @@ public void run() { private final boolean maintenanceMode; static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode"; + // Cached clusterId on stand by masters to serve clusterID requests from clients. + private final CachedClusterId cachedClusterId; + public static class RedirectServlet extends HttpServlet { private static final long serialVersionUID = 2894774810058302473L; private final int regionServerInfoPort; @@ -564,6 +567,7 @@ public HMaster(final Configuration conf) } else { this.activeMasterManager = null; } + cachedClusterId = new CachedClusterId(conf); } catch (Throwable t) { // Make sure we log the exception. HMaster is often started via reflection and the // cause of failed startup is lost. @@ -3765,7 +3769,10 @@ public HbckChore getHbckChore() { return this.hbckChore; } - @Override + public Optional getActiveMaster() { + return activeMasterManager.getActiveMasterServerName(); + } + public void runReplicationBarrierCleaner() { ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner; if (rbc != null) { @@ -3776,4 +3783,11 @@ public void runReplicationBarrierCleaner() { public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() { return this.snapshotQuotaChore; } + + public String getClusterId() { + if (activeMaster) { + return super.getClusterId(); + } + return cachedClusterId.getFromCacheOrFetch(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCachedClusterId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCachedClusterId.java new file mode 100644 index 000000000000..932cb3bb7e15 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestCachedClusterId.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.master.CachedClusterId; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestCachedClusterId { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCachedClusterId.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static String clusterId; + private static HMaster activeMaster; + private static HMaster standByMaster; + + private static class GetClusterIdThread extends TestThread { + CachedClusterId cachedClusterId; + public GetClusterIdThread(TestContext ctx, CachedClusterId clusterId) { + super(ctx); + cachedClusterId = clusterId; + } + + @Override + public void doWork() throws Exception { + assertEquals(clusterId, cachedClusterId.getFromCacheOrFetch()); + } + } + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + clusterId = activeMaster.getClusterId(); + standByMaster = TEST_UTIL.getHBaseCluster().startMaster().getMaster(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testClusterIdMatch() { + assertEquals(clusterId, standByMaster.getClusterId()); + } + + @Test + public void testMultiThreadedGetClusterId() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + CachedClusterId cachedClusterId = new CachedClusterId(conf); + TestContext context = new TestContext(conf); + int numThreads = 100; + for (int i = 0; i < numThreads; i++) { + context.addThread(new GetClusterIdThread(context, cachedClusterId)); + } + context.startThreads(); + context.stop(); + int cacheMisses = cachedClusterId.getCacheStats(); + assertEquals(cacheMisses, 1); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java index 55d00f0bead8..ad766421a361 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -91,6 +92,7 @@ public static void tearDownAfterClass() throws Exception { ActiveMasterManager activeMasterManager = dummyMaster.getActiveMasterManager(); assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + assertFalse(activeMasterManager.getActiveMasterServerName().isPresent()); // First test becoming the active master uninterrupted MonitoredTask status = Mockito.mock(MonitoredTask.class); @@ -99,6 +101,7 @@ public static void tearDownAfterClass() throws Exception { activeMasterManager.blockUntilBecomingActiveMaster(100, status); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, master); + assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); // Now pretend master restart DummyMaster secondDummyMaster = new DummyMaster(zk,master); @@ -108,6 +111,8 @@ public static void tearDownAfterClass() throws Exception { activeMasterManager.blockUntilBecomingActiveMaster(100, status); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, master); + assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); + assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get()); } /** @@ -135,6 +140,7 @@ public void testActiveMasterManagerFromZK() throws Exception { ActiveMasterManager activeMasterManager = ms1.getActiveMasterManager(); assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + assertFalse(activeMasterManager.getActiveMasterServerName().isPresent()); // First test becoming the active master uninterrupted ClusterStatusTracker clusterStatusTracker = @@ -144,6 +150,7 @@ public void testActiveMasterManagerFromZK() throws Exception { Mockito.mock(MonitoredTask.class)); assertTrue(activeMasterManager.clusterHasActiveMaster.get()); assertMaster(zk, firstMasterAddress); + assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); // New manager will now try to become the active master in another thread WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress); @@ -161,6 +168,8 @@ public void testActiveMasterManagerFromZK() throws Exception { assertTrue(t.manager.clusterHasActiveMaster.get()); // But secondary one should not be the active master assertFalse(t.isActiveMaster); + // Verify the active master ServerName is populated in standby master. + assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get()); // Close the first server and delete it's master node ms1.stop("stopping first server"); @@ -189,6 +198,7 @@ public void testActiveMasterManagerFromZK() throws Exception { assertTrue(t.manager.clusterHasActiveMaster.get()); assertTrue(t.isActiveMaster); + assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get()); LOG.info("Deleting master node");