diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index abd1267ffc4b..0b892349b80d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -62,15 +62,15 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); - public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, - ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, Scan scanForMetrics, + TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { - super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, - connectionConfiguration, requestAttributes); + super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory, + rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout, + replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes); exceptionsQueue = new ConcurrentLinkedQueue<>(); final Context context = Context.current(); final Runnable runnable = context.wrap(new PrefetchRunnable()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 
ef8e4b0404f6..df7f900830e2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -61,6 +61,12 @@ public abstract class ClientScanner extends AbstractClientScanner { private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class); protected final Scan scan; + // We clone the original client Scan to avoid modifying user object from scan internals. + // The below scanForMetrics is the client's object, which we mutate only for returning + // ScanMetrics. + // See https://issues.apache.org/jira/browse/HBASE-27402. + private final Scan scanForMetrics; + protected boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. @@ -101,12 +107,13 @@ public abstract class ClientScanner extends AbstractClientScanner { * @param tableName The table that we wish to scan * @param connection Connection identifying the cluster */ - public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + public ClientScanner(final Configuration conf, final Scan scan, final Scan scanForMetrics, + final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int primaryOperationTimeout, ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { + this.scanForMetrics = scanForMetrics; if (LOG.isTraceEnabled()) { LOG.trace( "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -294,7 +301,7 @@ protected void writeScanMetrics() { // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not // call ResultScanner.getScanMetrics and reset the ScanMetrics. 
Otherwise the metrics published // to Scan will be messed up. - scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, + scanForMetrics.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray()); scanMetricsPublished = true; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index bde036f88806..b5b7b1926ac2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -35,15 +35,15 @@ */ @InterfaceAudience.Private public class ClientSimpleScanner extends ClientScanner { - public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, - ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + public ClientSimpleScanner(Configuration configuration, Scan scan, Scan scanForMetrics, + TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { - super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, - connectionConfiguration, requestAttributes); + super(configuration, scan, scanForMetrics, name, connection, rpcCallerFactory, + rpcControllerFactory, pool, scanReadRpcTimeout, scannerTimeout, + replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 268a2495d69e..08e24d4d6bff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1049,10 +1049,11 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool final Span span = new TableOperationSpanBuilder(this) .setTableName(TableName.META_TABLE_NAME).setOperation(s).build(); try (Scope ignored = span.makeCurrent(); - ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, - this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), - connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(), - metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) { + ReversedClientScanner rcs = + new ReversedClientScanner(conf, s, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, + rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(), + connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond, + connectionConfig, Collections.emptyMap())) { boolean tableNotFound = true; for (;;) { Result regionInfoRow = rcs.next(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 386a7db3526e..fd3de615cb2f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -300,6 +300,12 @@ private Pair, List> getKeysAndRegionsInRange(final */ @Override public ResultScanner getScanner(Scan scan) throws IOException { + // Clone to avoid modifying user object from scan internals. + // See https://issues.apache.org/jira/browse/HBASE-27402. 
+ return getScannerInternal(new Scan(scan), scan); + } + + private ResultScanner getScannerInternal(Scan scan, Scan scanForMetrics) throws IOException { final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build(); try (Scope ignored = span.makeCurrent()) { @@ -319,18 +325,18 @@ public ResultScanner getScanner(Scan scan) throws IOException { final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan(); if (scan.isReversed()) { - return new ReversedClientScanner(getConfiguration(), scan, getName(), connection, - rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + return new ReversedClientScanner(getConfiguration(), scan, scanForMetrics, getName(), + connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, replicaTimeout, connConfiguration, requestAttributes); } else { if (async) { - return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection, - rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, connConfiguration, requestAttributes); + return new ClientAsyncPrefetchScanner(getConfiguration(), scan, scanForMetrics, getName(), + connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, + scanTimeout, replicaTimeout, connConfiguration, requestAttributes); } else { - return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection, - rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, connConfiguration, requestAttributes); + return new ClientSimpleScanner(getConfiguration(), scan, scanForMetrics, getName(), + connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, + scanTimeout, replicaTimeout, connConfiguration, requestAttributes); } } } @@ -344,7 +350,7 @@ public ResultScanner getScanner(Scan scan) throws IOException { public ResultScanner getScanner(byte[] family) 
throws IOException { Scan scan = new Scan(); scan.addFamily(family); - return getScanner(scan); + return getScannerInternal(scan, scan); } /** @@ -355,7 +361,7 @@ public ResultScanner getScanner(byte[] family) throws IOException { public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { Scan scan = new Scan(); scan.addColumn(family, qualifier); - return getScanner(scan); + return getScannerInternal(scan, scan); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 36bbdb5b60e4..2f363002b14e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -37,13 +37,13 @@ public class ReversedClientScanner extends ClientScanner { * Create a new ReversibleClientScanner for the specified table Note that the passed * {@link Scan}'s start row maybe changed. 
*/ - public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + public ReversedClientScanner(Configuration conf, Scan scan, Scan scanForMetrics, + TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int primaryOperationTimeout, ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + super(conf, scan, scanForMetrics, tableName, connection, rpcFactory, controllerFactory, pool, scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration, requestAttributes); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 9b5eb91bbd5c..e039be8baa83 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -110,7 +110,7 @@ public MockClientScanner(final Configuration conf, final Scan scan, final TableN ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, ConnectionConfiguration connectionConfig) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, + super(conf, scan, scan, tableName, connection, rpcFactory, controllerFactory, pool, HConstants.DEFAULT_HBASE_RPC_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout, connectionConfig, Collections.emptyMap()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 36ecad5276d7..3cc454a8642c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -167,6 +167,25 @@ public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) thro TEST_UTIL.startMiniCluster(builder.build()); } + @Test + public void testScanImmutable() throws IOException { + TableName tableName = name.getTableName(); + Table table = TEST_UTIL.createTable(tableName, FAMILY); + TEST_UTIL.loadRandomRows(table, FAMILY, 100, 100); + + Scan scan = new Scan().setCaching(-1).setMvccReadPoint(-1).setScanMetricsEnabled(true); + + try (ResultScanner scanner = table.getScanner(scan)) { + scanner.next(1000); + } + // these 2 should be unchanged + assertEquals(-1, scan.getCaching()); + assertEquals(-1, scan.getMvccReadPoint()); + // scan metrics should be populated + assertNotNull(scan.getScanMetrics()); + assertEquals(1, scan.getScanMetrics().countOfRegions.get()); + } + /** * Test from client side for batch of scan */