Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,9 @@ private long getPrimaryTimeoutNs() {
private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
addListener(
timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
(resp, error) -> {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -103,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection {

private volatile boolean closed = false;

private final Optional<MetricsConnection> metrics;

public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
User user) {
this.conf = conf;
Expand All @@ -112,7 +116,12 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
Expand Down Expand Up @@ -148,6 +157,7 @@ public void close() {
if (authService != null) {
authService.shutdown();
}
metrics.ifPresent(MetricsConnection::shutdown);
closed = true;
}

Expand Down Expand Up @@ -312,4 +322,8 @@ public Hbck getHbck(ServerName masterServer) throws IOException {
public void clearRegionLocationCache() {
locator.clearCache();
}

Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down Expand Up @@ -106,7 +107,7 @@ private void removeLocationFromCache(HRegionLocation loc) {

void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
this::addLocationToCache, this::removeLocationFromCache);
this::addLocationToCache, this::removeLocationFromCache, Optional.empty());
}

void clearCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,25 @@ private boolean onScanNext(TableName tableName, LocateRequest req, Result result
return true;
}

private void recordCacheHit() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
}

private void recordCacheMiss() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
}

private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId) {
Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
Expand All @@ -355,8 +365,10 @@ private RegionLocations locateRowInCache(TableCache tableCache, TableName tableN
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
}
}
Expand All @@ -367,11 +379,13 @@ private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
Expand All @@ -380,8 +394,10 @@ private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
}
}
Expand Down Expand Up @@ -529,6 +545,10 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
}

private void recordClearRegionCache() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
}

private void removeLocationFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
Expand All @@ -544,10 +564,12 @@ private void removeLocationFromCache(HRegionLocation loc) {
RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (newLocs == null) {
if (tableCache.cache.remove(startKey, oldLocs)) {
recordClearRegionCache();
return;
}
} else {
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
recordClearRegionCache();
return;
}
}
Expand All @@ -569,7 +591,7 @@ private HRegionLocation getCachedLocation(HRegionLocation loc) {

void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
this::addLocationToCache, this::removeLocationFromCache);
this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
}

void clearCache(TableName tableName) {
Expand All @@ -583,6 +605,8 @@ void clearCache(TableName tableName) {
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
}
}
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}

void clearCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
Expand Down Expand Up @@ -51,7 +52,8 @@ static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {

static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
Optional<MetricsConnection> metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
Expand All @@ -78,6 +80,7 @@ static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
removeFromCache.accept(loc);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {

// thread executor shared by all Table instances created
// by this connection
private volatile ExecutorService batchPool = null;
private volatile ThreadPoolExecutor batchPool = null;
// meta thread executor shared by all Table instances created
// by this connection
private volatile ExecutorService metaLookupPool = null;
private volatile ThreadPoolExecutor metaLookupPool = null;
private volatile boolean cleanupPool = false;

private final Configuration conf;
Expand Down Expand Up @@ -238,14 +238,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* constructor
* @param conf Configuration object
*/
ConnectionImplementation(Configuration conf,
ExecutorService pool, User user) throws IOException {
ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
this.conf = conf;
this.user = user;
if (user != null && user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
this.batchPool = pool;
this.batchPool = (ThreadPoolExecutor) pool;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
Expand Down Expand Up @@ -286,7 +285,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
this.metrics =
new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
} else {
this.metrics = null;
}
Expand Down Expand Up @@ -461,7 +461,7 @@ public MetricsConnection getConnectionMetrics() {
return this.metrics;
}

private ExecutorService getBatchPool() {
private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
if (batchPool == null) {
Expand All @@ -474,7 +474,7 @@ private ExecutorService getBatchPool() {
return this.batchPool;
}

private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
BlockingQueue<Runnable> passedWorkQueue) {
// shared HTable thread executor not yet initialized
if (maxThreads == 0) {
Expand Down Expand Up @@ -503,7 +503,7 @@ private ExecutorService getThreadPool(int maxThreads, int coreThreads, String na
return tpe;
}

private ExecutorService getMetaLookupPool() {
private ThreadPoolExecutor getMetaLookupPool() {
if (this.metaLookupPool == null) {
synchronized (this) {
if (this.metaLookupPool == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -500,13 +501,19 @@ static void incRegionCountMetrics(ScanMetrics scanMetrics) {
/**
* Connect the two futures, if the src future is done, then mark the dst future as done. And if
* the dst future is done, then cancel the src future. This is used for timeline consistent read.
* <p/>
* Pass empty metrics if you want to link the primary future and the dst future so we will not
* increase the hedge read related metrics.
*/
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture,
Optional<MetricsConnection> metrics) {
addListener(srcFuture, (r, e) -> {
if (e != null) {
dstFuture.completeExceptionally(e);
} else {
dstFuture.complete(r);
if (dstFuture.complete(r)) {
metrics.ifPresent(MetricsConnection::incrHedgedReadWin);
}
}
});
// The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
Expand All @@ -519,22 +526,23 @@ private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFutur

private static <T> void sendRequestsToSecondaryReplicas(
Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
CompletableFuture<T> future) {
CompletableFuture<T> future, Optional<MetricsConnection> metrics) {
if (future.isDone()) {
// do not send requests to secondary replicas if the future is done, i.e, the primary request
// has already been finished.
return;
}
for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
connect(secondaryFuture, future);
metrics.ifPresent(MetricsConnection::incrHedgedReadOps);
connect(secondaryFuture, future, metrics);
}
}

static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
TableName tableName, Query query, byte[] row, RegionLocateType locateType,
Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
long primaryCallTimeoutNs, Timer retryTimer) {
long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> metrics) {
if (query.getConsistency() == Consistency.STRONG) {
return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
Expand All @@ -545,7 +553,7 @@ static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locato
// Timeline consistent read, where we may send requests to other region replicas
CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
CompletableFuture<T> future = new CompletableFuture<>();
connect(primaryFuture, future);
connect(primaryFuture, future, Optional.empty());
long startNs = System.nanoTime();
// after the getRegionLocations, all the locations for the replicas of this region should have
// been cached, so it is not big deal to locate them again when actually sending requests to
Expand All @@ -567,11 +575,11 @@ static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locato
}
long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
if (delayNs <= 0) {
sendRequestsToSecondaryReplicas(requestReplica, locs, future);
sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics);
} else {
retryTimer.newTimeout(
timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs,
TimeUnit.NANOSECONDS);
timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future, metrics),
delayNs, TimeUnit.NANOSECONDS);
}
});
return future;
Expand Down
Loading