diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index e8498c4c597b..61107f7cd12b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; +import org.apache.hadoop.hbase.exceptions.LockTimeoutException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; @@ -1316,13 +1317,15 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, // Query the meta region long pauseBase = this.pause; - userRegionLock.lock(); + takeUserRegionLock(); try { - if (useCache) {// re-check cache after get lock - RegionLocations locations = getCachedLocation(tableName, row); - if (locations != null && locations.getRegionLocation(replicaId) != null) { - return locations; - } + // We don't need to check if useCache is enabled or not. Even if useCache is false + // we already cleared the cache for this row before acquiring userRegion lock so if this + // row is present in cache that means some other thread has populated it while we were + // waiting to acquire user region lock. + RegionLocations locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } Result regionInfoRow = null; s.resetMvccReadPoint(); @@ -1339,7 +1342,7 @@ rpcControllerFactory, getMetaLookupPool(), } // convert the row result into the HRegionLocation we need! 
- RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow); + locations = MetaTableAccessor.getRegionLocations(regionInfoRow); if (locations == null || locations.getRegionLocation(replicaId) == null) { throw new IOException("HRegionInfo was null in " + tableName + ", row=" + regionInfoRow); @@ -1423,6 +1426,19 @@ rpcControllerFactory, getMetaLookupPool(), } } + void takeUserRegionLock() throws IOException { + try { + long waitTime = connectionConfig.getMetaOperationTimeout(); + if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { + throw new LockTimeoutException("Failed to get user region lock in " + + waitTime + " ms for accessing meta region server."); + } + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for a lock", ie); + throw ExceptionUtil.asInterrupt(ie); + } + } + /** * Put a newly discovered HRegionLocation into the cache. * @param tableName The table name. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java index 7499ab7205a6..62d07386de36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; +import org.apache.hadoop.hbase.exceptions.LockTimeoutException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; @@ -42,6 +43,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static junit.framework.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -57,6 +60,7 @@ public class 
TestMetaCache { private static final byte[] QUALIFIER = Bytes.toBytes("qual"); private static HRegionServer badRS; + private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class); /** * @throws java.lang.Exception @@ -356,4 +360,77 @@ public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest throws ServiceException { } } -} \ No newline at end of file + + @Test + public void testUserRegionLockThrowsException() throws IOException, InterruptedException { + ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector()); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000); + + try (ConnectionManager.HConnectionImplementation conn = + (ConnectionManager.HConnectionImplementation) ConnectionFactory.createConnection(conf)) { + ClientThread client1 = new ClientThread(conn); + ClientThread client2 = new ClientThread(conn); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + // One thread will get the lock but will sleep in LockSleepInjector#throwOnScan and + // eventually fail since the sleep time is more than hbase client scanner timeout period. + // Other thread will wait to acquire userRegionLock. + // Have no idea which thread will be scheduled first. So need to check both threads. + + // Both the threads will throw exception. One thread will throw exception since after + // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period + // is 2 seconds. + // Other thread will throw exception since it was not able to get hold of user region lock + // within meta operation timeout period. 
+ assertNotNull(client1.getException()); + assertNotNull(client2.getException()); + + assertTrue(client1.getException() instanceof LockTimeoutException + ^ client2.getException() instanceof LockTimeoutException); + } + } + + private final class ClientThread extends Thread { + private Exception exception; + private ConnectionManager.HConnectionImplementation connection; + + private ClientThread(ConnectionManager.HConnectionImplementation connection) { + this.connection = connection; + } + @Override + public void run() { + byte[] currentKey = HConstants.EMPTY_START_ROW; + try { + connection.getRegionLocation(TABLE_NAME, currentKey, true); + } catch (IOException e) { + LOG.error("Thread id: " + this.getId() + " exception: ", e); + this.exception = e; + } + } + public Exception getException() { + return exception; + } + } + + public static class LockSleepInjector extends ExceptionInjector { + @Override + public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.info("Interrupted exception", e); + } + } + + @Override + public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { } + + @Override + public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { } + } +}