Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -57,6 +59,9 @@ class AsyncConnectionConfiguration {
// timeout for each read rpc request
private final long readRpcTimeoutNs;

// timeout for each read rpc request against system tables
private final long metaReadRpcTimeoutNs;

// timeout for each write rpc request
private final long writeRpcTimeoutNs;

Expand All @@ -74,6 +79,7 @@ class AsyncConnectionConfiguration {
// client that it is still alive. The scan timeout is used as operation timeout for every
// operations in a scan, such as openScanner or next.
private final long scanTimeoutNs;
private final long metaScanTimeoutNs;

private final int scannerCaching;

Expand Down Expand Up @@ -111,8 +117,11 @@ class AsyncConnectionConfiguration {
TimeUnit.MILLISECONDS.toNanos(connectionConf.getMetaOperationTimeout());
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout());
this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout());
this.readRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getReadRpcTimeout()));
long readRpcTimeoutMillis =
conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getRpcTimeout());
this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, connectionConf.getWriteRpcTimeout()));
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis());
Expand All @@ -124,8 +133,11 @@ class AsyncConnectionConfiguration {
TimeUnit.MICROSECONDS.toNanos(connectionConf.getReplicaCallTimeoutMicroSecondScan());
this.primaryMetaScanTimeoutNs =
TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan());
this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));

// fields not in connection configuration
this.startLogErrorsCnt =
Expand All @@ -150,6 +162,10 @@ long getReadRpcTimeoutNs() {
return readRpcTimeoutNs;
}

long getMetaReadRpcTimeoutNs() {
return metaReadRpcTimeoutNs;
}

long getWriteRpcTimeoutNs() {
return writeRpcTimeoutNs;
}
Expand All @@ -174,6 +190,10 @@ long getScanTimeoutNs() {
return scanTimeoutNs;
}

long getMetaScanTimeoutNs() {
return metaScanTimeoutNs;
}

int getScannerCaching() {
return scannerCaching;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
this.operationTimeoutNs = tableName.isSystemTable()
? connConf.getMetaOperationTimeoutNs()
: connConf.getOperationTimeoutNs();
this.scanTimeoutNs = connConf.getScanTimeoutNs();
this.scanTimeoutNs =
tableName.isSystemTable() ? connConf.getMetaScanTimeoutNs() : connConf.getScanTimeoutNs();
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
this.readRpcTimeoutNs = tableName.isSystemTable()
? connConf.getMetaReadRpcTimeoutNs()
: connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {

public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,
int replicaCallTimeoutMicroSecondScan) throws IOException {
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
exceptionsQueue = new ConcurrentLinkedQueue<>();
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final long maxScannerResultSize;
private final ClusterConnection connection;
protected final TableName tableName;
protected final int readRpcTimeout;
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller;
Expand All @@ -100,8 +101,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
*/
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
Expand All @@ -120,8 +121,8 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.readRpcTimeout = scanReadRpcTimeout;
this.scannerTimeout = scannerTimeout;

// check if application wants to collect scan metrics
initScanMetrics(scan);
Expand Down Expand Up @@ -248,9 +249,9 @@ protected boolean moveToNextRegion() {
// clear the current region, we will set a new value to it after the first call of the new
// callable.
this.currentRegion = null;
this.callable =
new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
scannerTimeout, caching, conf, caller);
this.callable.setCaching(this.caching);
incRegionCountMetrics(scanMetrics);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
public class ClientSimpleScanner extends ClientScanner {
public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool,
int replicaCallTimeoutMicroSecondScan) throws IOException {
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
replicaCallTimeoutMicroSecondScan);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public class ConnectionConfiguration {
HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
}

public static final String HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY =
"hbase.client.meta.read.rpc.timeout";
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
"hbase.client.meta.scanner.timeout.period";

private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
Expand All @@ -85,7 +90,11 @@ public class ConnectionConfiguration {
private final int maxKeyValueSize;
private final int rpcTimeout;
private final int readRpcTimeout;
private final int metaReadRpcTimeout;
private final int writeRpcTimeout;
private final int scanTimeout;
private final int metaScanTimeout;

// toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;
private final long pauseMs;
Expand Down Expand Up @@ -140,9 +149,16 @@ public class ConnectionConfiguration {
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

this.metaReadRpcTimeout = conf.getInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeout);

this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

this.scanTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);

long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
Expand Down Expand Up @@ -178,8 +194,11 @@ protected ConnectionConfiguration() {
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.scanTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
this.metaScanTimeout = scanTimeout;
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
}
Expand All @@ -188,6 +207,10 @@ public int getReadRpcTimeout() {
return readRpcTimeout;
}

public int getMetaReadRpcTimeout() {
return metaReadRpcTimeout;
}

public int getWriteRpcTimeout() {
return writeRpcTimeout;
}
Expand Down Expand Up @@ -248,6 +271,14 @@ public int getRpcTimeout() {
return rpcTimeout;
}

public int getScanTimeout() {
return scanTimeout;
}

public int getMetaScanTimeout() {
return metaScanTimeout;
}

public long getPauseMillis() {
return pauseMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,8 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
try (Scope ignored = span.makeCurrent();
ReversedClientScanner rcs =
new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public class HTable implements Table {
private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
private int readRpcTimeoutMs; // timeout for each read rpc request
private int writeRpcTimeoutMs; // timeout for each write rpc request

private final int scanReadRpcTimeout;
private final int scanTimeout;
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final HRegionLocator locator;

Expand Down Expand Up @@ -191,6 +194,8 @@ protected HTable(final ConnectionImplementation connection, final TableBuilderBa
this.rpcTimeoutMs = builder.rpcTimeout;
this.readRpcTimeoutMs = builder.readRpcTimeout;
this.writeRpcTimeoutMs = builder.writeRpcTimeout;
this.scanReadRpcTimeout = builder.scanReadRpcTimeout;
this.scanTimeout = builder.scanTimeout;
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();

Expand Down Expand Up @@ -312,18 +317,21 @@ public ResultScanner getScanner(Scan scan) throws IOException {
final boolean async = scan.isAsyncPrefetch() != null
? scan.isAsyncPrefetch()
: connConfiguration.isClientScannerAsyncPrefetch();
final int timeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();

if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, timeout);
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout);
} else {
if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, timeout);
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout);
} else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, timeout);
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ class QueueingFuture<T> implements RunnableFuture<T> {
private T result = null;
private ExecutionException exeEx = null;
private volatile boolean cancelled = false;
private final int callTimeout;
private final int operationTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
private final int replicaId; // replica id

public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
int id) {
this.future = future;
this.callTimeout = callTimeout;
this.retryingCaller = retryingCallerFactory.<T> newCaller();
this.operationTimeout = operationTimeout;
this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
this.replicaId = id;
}

Expand All @@ -70,7 +71,7 @@ public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
public void run() {
try {
if (!cancelled) {
result = this.retryingCaller.callWithRetries(future, callTimeout);
result = this.retryingCaller.callWithRetries(future, operationTimeout);
resultObtained = true;
}
} catch (Throwable t) {
Expand Down Expand Up @@ -157,8 +158,8 @@ public ResultBoundedCompletionService(RpcRetryingCallerFactory retryingCallerFac
this.completedTasks = new ArrayList<>(maxTasks);
}

public void submit(RetryingCallable<V> task, int callTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id);
public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) {
QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id);
// remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
executor.execute(newFuture);
tasks[id] = newFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class ReversedClientScanner extends ClientScanner {
*/
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private void addCallsForReplica(ResultBoundedCompletionService<Result> cs, Regio
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
cs.submit(callOnReplica, operationTimeout, id);
cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
}
}

Expand Down
Loading