diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufHttpEntity.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufHttpEntity.java new file mode 100644 index 000000000000..f3e9278a9ed2 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufHttpEntity.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.rest; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.message.BasicHeader; +import org.apache.http.protocol.HTTP; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * For sending a Protobuf encoded object via Apache HttpClient efficiently without an interim byte + * array. This exposes the underlying Apache HttpClient types, but so do the other client classes. + */ +@InterfaceAudience.Public +public class ProtobufHttpEntity implements HttpEntity { + + private ProtobufMessageHandler handler; + + public ProtobufHttpEntity(ProtobufMessageHandler handler) { + this.handler = handler; + } + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public long getContentLength() { + return -1; + } + + @Override + public Header getContentType() { + return new BasicHeader(HTTP.CONTENT_TYPE, Constants.MIMETYPE_PROTOBUF_IETF); + } + + @Override + public Header getContentEncoding() { + return null; + } + + @Override + public InputStream getContent() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException("only writeTo is supported"); + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + handler.writeProtobufOutput(outStream); + } + + @Override + public boolean isStreaming() { + return true; + } + + @Override + public void consumeContent() throws IOException { + } + +} diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java index 74d3a11d3f2a..13d0804a321b 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java @@ -56,6 +56,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpHead; @@ -96,7 +97,7 @@ public class Client { private 
boolean sticky = false; private Configuration conf; private boolean sslEnabled; - private HttpResponse resp; + private CloseableHttpResponse resp; private HttpGet httpGet = null; private HttpClientContext stickyContext = null; private BasicCredentialsProvider provider; @@ -108,6 +109,16 @@ public class Client { private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "="; private static final String COOKIE = "Cookie"; + public static final Header ACCEPT_PROTOBUF_HEADER = + new BasicHeader("Accept", Constants.MIMETYPE_PROTOBUF); + public static final Header ACCEPT_XML_HEADER = new BasicHeader("Accept", Constants.MIMETYPE_XML); + public static final Header ACCEPT_JSON_HEADER = + new BasicHeader("Accept", Constants.MIMETYPE_JSON); + + public static final Header[] ACCEPT_PROTOBUF_HEADER_ARR = new Header[] { ACCEPT_PROTOBUF_HEADER }; + public static final Header[] ACCEPT_XML_HEADER_ARR = new Header[] { ACCEPT_XML_HEADER }; + public static final Header[] ACCEPT_JSON_HEADER_ARR = new Header[] { ACCEPT_JSON_HEADER }; + /** * Default Constructor */ @@ -319,6 +330,7 @@ public static KeyStore loadTruststore(String trustStorePath, Optional tr * Shut down the client. Close any open persistent connections. */ public void shutdown() { + close(); } /** Returns the wrapped HttpClient */ @@ -368,8 +380,8 @@ public void removeExtraHeader(final String name) { * @param path the properly urlencoded path * @return the HTTP response code */ - public HttpResponse executePathOnly(Cluster cluster, HttpUriRequest method, Header[] headers, - String path) throws IOException { + public CloseableHttpResponse executePathOnly(Cluster cluster, HttpUriRequest method, + Header[] headers, String path) throws IOException { IOException lastException; if (cluster.nodes.size() < 1) { throw new IOException("Cluster is empty"); @@ -428,7 +440,7 @@ public HttpResponse executePathOnly(Cluster cluster, HttpUriRequest method, Head * @param uri a properly urlencoded URI * @return the HTTP response code */ - public HttpResponse executeURI(HttpUriRequest method, Header[] headers, String uri) + public CloseableHttpResponse executeURI(HttpUriRequest method, Header[] headers, String uri) throws IOException { // method.setURI(new URI(uri, true)); for (Map.Entry e : extraHeaders.entrySet()) { @@ -473,14 +485,29 @@ public HttpResponse executeURI(HttpUriRequest method, Header[] headers, String u * Execute a transaction method. Will call either executePathOnly or executeURI * depending on whether a path only is supplied in 'path', or if a complete URI is passed instead, * respectively. - * @param cluster the cluster definition * @param method the HTTP method * @param headers HTTP header values to send * @param path the properly urlencoded path or URI * @return the HTTP response code */ - public HttpResponse execute(Cluster cluster, HttpUriRequest method, Header[] headers, String path) - throws IOException { + public CloseableHttpResponse execute(Cluster cluster, HttpUriRequest method, Header[] headers, + String path) throws IOException { + if (path.startsWith("/")) { + return executePathOnly(cluster, method, headers, path); + } + return executeURI(method, headers, path); + } + + /** + * Execute a transaction method. Will call either executePathOnly or executeURI + * depending on whether a path only is supplied in 'path', or if a complete URI is passed instead, + * respectively. 
+ * @param method the HTTP method + * @param headers HTTP header values to send + * @return the CloseableHttpResponse object + */ + public CloseableHttpResponse execute(HttpUriRequest method, Header[] headers) throws IOException { + String path = method.getURI().toASCIIString(); if (path.startsWith("/")) { return executePathOnly(cluster, method, headers, path); } @@ -623,7 +650,7 @@ public Response head(String path) throws IOException { public Response head(Cluster cluster, String path, Header[] headers) throws IOException { HttpHead method = new HttpHead(path); try { - HttpResponse resp = execute(cluster, method, null, path); + CloseableHttpResponse resp = execute(cluster, method, null, path); return new Response(resp.getStatusLine().getStatusCode(), resp.getAllHeaders(), null); } finally { method.releaseConnection(); @@ -730,7 +757,7 @@ public Response get(Cluster c, String path, Header[] headers) throws IOException httpGet.releaseConnection(); } httpGet = new HttpGet(path); - HttpResponse resp = execute(c, httpGet, headers, path); + CloseableHttpResponse resp = execute(c, httpGet, headers, path); return new Response(resp.getStatusLine().getStatusCode(), resp.getAllHeaders(), resp, resp.getEntity() == null ? null : resp.getEntity().getContent()); } @@ -820,7 +847,7 @@ public Response put(Cluster cluster, String path, Header[] headers, byte[] conte HttpPut method = new HttpPut(path); try { method.setEntity(new ByteArrayEntity(content)); - HttpResponse resp = execute(cluster, method, headers, path); + CloseableHttpResponse resp = execute(cluster, method, headers, path); headers = resp.getAllHeaders(); content = getResponseBody(resp); return new Response(resp.getStatusLine().getStatusCode(), headers, content); @@ -914,7 +941,7 @@ public Response post(Cluster cluster, String path, Header[] headers, byte[] cont HttpPost method = new HttpPost(path); try { method.setEntity(new ByteArrayEntity(content)); - HttpResponse resp = execute(cluster, method, headers, path); + CloseableHttpResponse resp = execute(cluster, method, headers, path); headers = resp.getAllHeaders(); content = getResponseBody(resp); return new Response(resp.getStatusLine().getStatusCode(), headers, content); @@ -952,7 +979,7 @@ public Response delete(String path, Header extraHdr) throws IOException { public Response delete(Cluster cluster, String path) throws IOException { HttpDelete method = new HttpDelete(path); try { - HttpResponse resp = execute(cluster, method, null, path); + CloseableHttpResponse resp = execute(cluster, method, null, path); Header[] headers = resp.getAllHeaders(); byte[] content = getResponseBody(resp); return new Response(resp.getStatusLine().getStatusCode(), headers, content); @@ -972,7 +999,7 @@ public Response delete(Cluster cluster, String path, Header extraHdr) throws IOE HttpDelete method = new HttpDelete(path); try { Header[] headers = { extraHdr }; - HttpResponse resp = execute(cluster, method, headers, path); + CloseableHttpResponse resp = execute(cluster, method, headers, path); headers = resp.getAllHeaders(); byte[] content = getResponseBody(resp); return new Response(resp.getStatusLine().getStatusCode(), headers, content); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/RowResourceBase.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/RowResourceBase.java index 004458d99e6b..144e625f1bbf 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/RowResourceBase.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/RowResourceBase.java @@ -237,7 
+237,7 @@ protected static void checkValuePB(String table, String row, String column, Stri assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); CellSetModel cellSet = new CellSetModel(); - cellSet.getObjectFromMessage(response.getBody()); + cellSet.getObjectFromMessage(response.getStream()); RowModel rowModel = cellSet.getRows().get(0); CellModel cell = rowModel.getCells().get(0); assertEquals(Bytes.toString(cell.getColumn()), column); @@ -250,7 +250,7 @@ protected static void checkIncrementValuePB(String table, String row, String col assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); CellSetModel cellSet = new CellSetModel(); - cellSet.getObjectFromMessage(response.getBody()); + cellSet.getObjectFromMessage(response.getStream()); RowModel rowModel = cellSet.getRows().get(0); CellModel cell = rowModel.getCells().get(0); assertEquals(Bytes.toString(cell.getColumn()), column); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java index b20baea9df8c..a831782a9176 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CompatibilityFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.rest.client.Client; import org.apache.hadoop.hbase.rest.client.Response; import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; @@ -41,6 +42,8 @@ import org.apache.hadoop.hbase.testclassification.RestTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.http.Header; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; import org.apache.http.message.BasicHeader; import org.junit.ClassRule; import org.junit.Test; @@ -167,6 +170,39 @@ public void testMultipleCellCheckPutPB() throws IOException { assertEquals(200, response.getCode()); } + @Test + public void testMultipleCellPutPBEntity() throws IOException { + Response response = getValuePB(TABLE, ROW_1, COLUMN_1); + assertEquals(404, response.getCode()); + + CellSetModel model = new CellSetModel(); + RowModel row1 = new RowModel(ROW_1); + row1.addCell(new CellModel(Bytes.toBytes(COLUMN_1), Bytes.toBytes(VALUE_1))); + row1.addCell(new CellModel(Bytes.toBytes(COLUMN_2), Bytes.toBytes(VALUE_2))); + row1.addCell(new CellModel(Bytes.toBytes(COLUMN_3), Bytes.toBytes(VALUE_3))); + model.addRow(row1); + + // This illustrates how to use the lower level API to more efficiently marshall cells + // to Protobuf for mutations + StringBuilder path = new StringBuilder(); + path.append('/'); + path.append(TABLE); + path.append('/'); + path.append("dummy_row"); + HttpPut httpPut = new HttpPut(path.toString()); + httpPut.setEntity(new ProtobufHttpEntity(model)); + try (CloseableHttpResponse closableResponse = + client.execute(httpPut, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + assertEquals(200, closableResponse.getStatusLine().getStatusCode()); + } + checkValuePB(TABLE, ROW_1, COLUMN_1, VALUE_1); + checkValuePB(TABLE, ROW_1, COLUMN_2, VALUE_2); + checkValuePB(TABLE, ROW_1, COLUMN_3, VALUE_3); + + response = deleteRow(TABLE, ROW_1); + assertEquals(200, 
response.getCode()); + } + @Test public void testMultipleCellCheckPutXML() throws IOException, JAXBException { Response response = getValuePB(TABLE, ROW_1, COLUMN_1); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesInstanceResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesInstanceResource.java index ca013390a7b1..f5106d35d36b 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesInstanceResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesInstanceResource.java @@ -217,7 +217,7 @@ public void testGetNamespaceTablesAndCannotDeleteNamespace() throws IOException, response = client.get(namespacePath, Constants.MIMETYPE_PROTOBUF); assertEquals(200, response.getCode()); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); checkNamespaceProperties(model.getProperties(), nsProperties); // Check get namespace tables as XML, JSON and Protobuf. @@ -238,7 +238,7 @@ public void testGetNamespaceTablesAndCannotDeleteNamespace() throws IOException, response = client.get(namespacePath, Constants.MIMETYPE_PROTOBUF); assertEquals(200, response.getCode()); tablemodel.setTables(new ArrayList<>()); - tablemodel.getObjectFromMessage(response.getBody()); + tablemodel.getObjectFromMessage(response.getStream()); checkNamespaceTables(tablemodel.getTables(), nsTables); // Check cannot delete namespace via REST because it contains tables. diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesResource.java index 9508c95a1eb4..55cb8002da67 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestNamespacesResource.java @@ -169,7 +169,7 @@ public void testNamespaceListPBandDefault() throws IOException { // Check that REST GET finds only default namespaces via PB and default Accept header. 
response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF); assertEquals(200, response.getCode()); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); testNamespacesModel.checkModel(model, "hbase", "default"); response = client.get(schemaPath); assertEquals(200, response.getCode()); @@ -178,7 +178,7 @@ public void testNamespaceListPBandDefault() throws IOException { createNamespaceViaAdmin(admin, NAMESPACE1); response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF); assertEquals(200, response.getCode()); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); testNamespacesModel.checkModel(model, NAMESPACE1, "hbase", "default"); response = client.get(schemaPath); assertEquals(200, response.getCode()); @@ -187,7 +187,7 @@ public void testNamespaceListPBandDefault() throws IOException { createNamespaceViaAdmin(admin, NAMESPACE2); response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF); assertEquals(200, response.getCode()); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); testNamespacesModel.checkModel(model, NAMESPACE1, NAMESPACE2, "hbase", "default"); response = client.get(schemaPath); assertEquals(200, response.getCode()); @@ -196,7 +196,7 @@ public void testNamespaceListPBandDefault() throws IOException { admin.deleteNamespace(NAMESPACE1); response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF); assertEquals(200, response.getCode()); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); testNamespacesModel.checkModel(model, NAMESPACE2, "hbase", "default"); response = client.get(schemaPath); assertEquals(200, response.getCode()); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java index ebe49d53da11..8041c39f7fa0 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java @@ -149,7 +149,7 @@ private static int fullTableScan(ScannerModel model) throws IOException { if (response.getCode() == 200) { assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); CellSetModel cellSet = new CellSetModel(); - cellSet.getObjectFromMessage(response.getBody()); + cellSet.getObjectFromMessage(response.getStream()); Iterator rows = cellSet.getRows().iterator(); while (rows.hasNext()) { RowModel row = rows.next(); @@ -283,7 +283,7 @@ public void testSimpleScannerPB() throws IOException { assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); CellSetModel cellSet = new CellSetModel(); - cellSet.getObjectFromMessage(response.getBody()); + cellSet.getObjectFromMessage(response.getStream()); // confirm batch size conformance assertEquals(BATCH_SIZE, countCellSet(cellSet)); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSchemaResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSchemaResource.java index 45787e419861..8ad36fc9d10c 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSchemaResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSchemaResource.java @@ -225,7 +225,7 @@ public void testTableCreateAndDeletePB() throws IOException { assertEquals(200, response.getCode()); 
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); model = new TableSchemaModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); testTableSchemaModel.checkModel(model, TABLE2); // retrieve the schema and validate it with alternate pbuf type @@ -233,7 +233,7 @@ public void testTableCreateAndDeletePB() throws IOException { assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF_IETF, response.getHeader("content-type")); model = new TableSchemaModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); testTableSchemaModel.checkModel(model, TABLE2); if (csrfEnabled) { diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestStatusResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestStatusResource.java index a115fd17af3f..7ab230b849c3 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestStatusResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestStatusResource.java @@ -126,13 +126,13 @@ public void testGetClusterStatusPB() throws IOException { assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); StorageClusterStatusModel model = new StorageClusterStatusModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); validate(model); response = client.get("/status/cluster", Constants.MIMETYPE_PROTOBUF_IETF); assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF_IETF, response.getHeader("content-type")); model = new StorageClusterStatusModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); validate(model); } } diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestTableResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestTableResource.java index c27d8ee2347a..b9b386bd1c53 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestTableResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestTableResource.java @@ -203,13 +203,13 @@ public void testTableListPB() throws IOException, JAXBException { assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); TableListModel model = new TableListModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); checkTableList(model); response = client.get("/", Constants.MIMETYPE_PROTOBUF_IETF); assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF_IETF, response.getHeader("content-type")); model = new TableListModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); checkTableList(model); } @@ -243,13 +243,13 @@ public void testTableInfoPB() throws IOException, JAXBException { assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); TableInfoModel model = new TableInfoModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); checkTableInfo(model); response = client.get("/" + TABLE + "/regions", Constants.MIMETYPE_PROTOBUF_IETF); assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF_IETF, response.getHeader("content-type")); model = new 
TableInfoModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); checkTableInfo(model); } diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestVersionResource.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestVersionResource.java index 158d412971b1..1176ee86dea2 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestVersionResource.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestVersionResource.java @@ -152,13 +152,13 @@ public void testGetStargateVersionPB() throws IOException { assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type")); VersionModel model = new VersionModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); validate(model); response = client.get("/version", Constants.MIMETYPE_PROTOBUF_IETF); assertEquals(200, response.getCode()); assertEquals(Constants.MIMETYPE_PROTOBUF_IETF, response.getHeader("content-type")); model = new VersionModel(); - model.getObjectFromMessage(response.getBody()); + model.getObjectFromMessage(response.getStream()); validate(model); } diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java index cddbf1c2c46f..5ef828e6353f 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hbase.rest.client; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; @@ -28,13 +28,17 @@ import javax.xml.stream.XMLStreamReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.rest.Constants; +import org.apache.hadoop.hbase.rest.ProtobufHttpEntity; import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel; import org.apache.hadoop.hbase.rest.model.StorageClusterVersionModel; import org.apache.hadoop.hbase.rest.model.TableListModel; import org.apache.hadoop.hbase.rest.model.TableSchemaModel; import org.apache.hadoop.hbase.rest.model.VersionModel; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -42,6 +45,7 @@ public class RemoteAdmin { final Client client; final Configuration conf; + // This is never used by the server; it may have been intended for a proxy setup.
final String accessToken; final int maxRetries; final long sleepTime; @@ -114,24 +118,27 @@ public VersionModel getRestVersion() throws IOException { int code = 0; for (int i = 0; i < maxRetries; i++) { - Response response = client.get(path.toString(), Constants.MIMETYPE_PROTOBUF); - code = response.getCode(); - switch (code) { - case 200: - - VersionModel v = new VersionModel(); - return (VersionModel) v.getObjectFromMessage(response.getBody()); - case 404: - throw new IOException("REST version not found"); - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("get request to " + path.toString() + " returned " + code); + HttpGet get = new HttpGet(path.toString()); + try ( + CloseableHttpResponse response = client.execute(get, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + + VersionModel v = new VersionModel(); + return (VersionModel) v.getObjectFromMessage(response.getEntity().getContent()); + case 404: + throw new IOException("REST version not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request to " + path.toString() + " returned " + code); + } } } throw new IOException("get request to " + path.toString() + " timed out"); @@ -155,23 +162,27 @@ public StorageClusterStatusModel getClusterStatus() throws IOException { int code = 0; for (int i = 0; i < maxRetries; i++) { - Response response = client.get(path.toString(), Constants.MIMETYPE_PROTOBUF); - code = response.getCode(); - switch (code) { - case 200: - StorageClusterStatusModel s = new StorageClusterStatusModel(); - return (StorageClusterStatusModel) s.getObjectFromMessage(response.getBody()); - case 404: - throw new IOException("Cluster version not found"); - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("get request to " + path + " returned " + code); + HttpGet get = new HttpGet(path.toString()); + try ( + CloseableHttpResponse response = client.execute(get, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + StorageClusterStatusModel s = new StorageClusterStatusModel(); + return (StorageClusterStatusModel) s + .getObjectFromMessage(response.getEntity().getContent()); + case 404: + throw new IOException("Cluster version not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request to " + path + " returned " + code); + } } } throw new IOException("get request to " + path + " timed out"); @@ -194,30 +205,32 @@ public StorageClusterVersionModel getClusterVersion() throws IOException { int code = 0; for (int i = 0; i < maxRetries; i++) { - Response response = client.get(path.toString(), Constants.MIMETYPE_XML); - code = response.getCode(); - switch (code) { - case 200: - try { - - return (StorageClusterVersionModel) getUnmarsheller() - .unmarshal(getInputStream(response)); - } catch (JAXBException jaxbe) { - - throw new IOException("Issue 
parsing StorageClusterVersionModel object in XML form: " - + jaxbe.getLocalizedMessage(), jaxbe); - } - case 404: - throw new IOException("Cluster version not found"); - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException(path.toString() + " request returned " + code); + HttpGet get = new HttpGet(path.toString()); + try (CloseableHttpResponse response = client.execute(get, Client.ACCEPT_XML_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + try { + + return (StorageClusterVersionModel) getUnmarsheller() + .unmarshal(getInputStream(response.getEntity().getContent())); + } catch (JAXBException jaxbe) { + + throw new IOException("Issue parsing StorageClusterVersionModel object in XML form: " + + jaxbe.getLocalizedMessage(), jaxbe); + } + case 404: + throw new IOException("Cluster version not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException(path.toString() + " request returned " + code); + } } } throw new IOException("get request to " + path.toString() + " request timed out"); @@ -240,22 +253,25 @@ public boolean isTableAvailable(byte[] tableName) throws IOException { path.append("exists"); int code = 0; for (int i = 0; i < maxRetries; i++) { - Response response = client.get(path.toString(), Constants.MIMETYPE_PROTOBUF); - code = response.getCode(); - switch (code) { - case 200: - return true; - case 404: - return false; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("get request to " + path.toString() + " returned " + code); + HttpGet get = new HttpGet(path.toString()); + try ( + CloseableHttpResponse response = client.execute(get, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return true; + case 404: + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request to " + path.toString() + " returned " + code); + } } } throw new IOException("get request to " + path.toString() + " timed out"); @@ -279,21 +295,24 @@ public void createTable(TableDescriptor desc) throws IOException { path.append("schema"); int code = 0; for (int i = 0; i < maxRetries; i++) { - Response response = - client.put(path.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - code = response.getCode(); - switch (code) { - case 201: - return; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("create request to " + path.toString() + " returned " + code); + HttpPut put = new HttpPut(path.toString()); + put.setEntity(new ProtobufHttpEntity(model)); + try ( + CloseableHttpResponse response = client.execute(put, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 201: + return; + case 509: + try { + Thread.sleep(sleepTime); + } 
catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("create request to " + path.toString() + " returned " + code); + } } } throw new IOException("create request to " + path.toString() + " timed out"); @@ -325,20 +344,23 @@ public void deleteTable(final byte[] tableName) throws IOException { path.append("schema"); int code = 0; for (int i = 0; i < maxRetries; i++) { - Response response = client.delete(path.toString()); - code = response.getCode(); - switch (code) { - case 200: - return; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("delete request to " + path.toString() + " returned " + code); + HttpDelete delete = new HttpDelete(path.toString()); + try ( + CloseableHttpResponse response = client.execute(delete, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("delete request to " + path.toString() + " returned " + code); + } } } throw new IOException("delete request to " + path.toString() + " timed out"); @@ -361,23 +383,27 @@ public TableListModel getTableList() throws IOException { for (int i = 0; i < maxRetries; i++) { // Response response = client.get(path.toString(), // Constants.MIMETYPE_XML); - Response response = client.get(path.toString(), Constants.MIMETYPE_PROTOBUF); - code = response.getCode(); - switch (code) { - case 200: - TableListModel t = new TableListModel(); - return (TableListModel) t.getObjectFromMessage(response.getBody()); - case 404: - throw new IOException("Table list not found"); - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("get request to " + path.toString() + " request returned " + code); + HttpGet get = new HttpGet(path.toString()); + try ( + CloseableHttpResponse response = client.execute(get, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + TableListModel t = new TableListModel(); + return (TableListModel) t.getObjectFromMessage(response.getEntity().getContent()); + case 404: + throw new IOException("Table list not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException( + "get request to " + path.toString() + " request returned " + code); + } } } throw new IOException("get request to " + path.toString() + " request timed out"); @@ -385,17 +411,17 @@ public TableListModel getTableList() throws IOException { /** * Convert the REST server's response to an XML reader. - * @param response The REST server's response. + * @param is the InputStream over the response entity content * @return A reader over the parsed XML document.
* @throws IOException If the document fails to parse */ - private XMLStreamReader getInputStream(Response response) throws IOException { + private XMLStreamReader getInputStream(InputStream is) throws IOException { try { // Prevent the parser from reading XMl with external entities defined XMLInputFactory xif = XMLInputFactory.newFactory(); xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false); xif.setProperty(XMLInputFactory.SUPPORT_DTD, false); - return xif.createXMLStreamReader(new ByteArrayInputStream(response.getBody())); + return xif.createXMLStreamReader(is); } catch (XMLStreamException e) { throw new IOException("Failed to parse XML", e); } diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index c5db9a294926..21796e711df6 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -62,7 +62,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.rest.Constants; +import org.apache.hadoop.hbase.rest.ProtobufHttpEntity; import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.RowModel; @@ -70,6 +70,11 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -316,29 +321,32 @@ public Result[] get(List gets) throws IOException { private Result[] getResults(String spec) throws IOException { for (int i = 0; i < maxRetries; i++) { - Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF); - int code = response.getCode(); - switch (code) { - case 200: - CellSetModel model = new CellSetModel(); - model.getObjectFromMessage(response.getBody()); - Result[] results = buildResultFromModel(model); - if (results.length > 0) { - return results; - } - // fall through - case 404: - return new Result[0]; - - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("get request returned " + code); + HttpGet httpGet = new HttpGet(spec); + try (CloseableHttpResponse response = + client.execute(httpGet, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + CellSetModel model = new CellSetModel(); + model.getObjectFromMessage(response.getEntity().getContent()); + Result[] results = buildResultFromModel(model); + if (results.length > 0) { + return results; + } + // fall through + case 404: + return new Result[0]; + + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request returned " + code); + 
} } } throw new IOException("get request timed out"); @@ -348,7 +356,7 @@ private Result[] getResults(String spec) throws IOException { public boolean exists(Get get) throws IOException { LOG.warn("exists() is really get(), just use get()"); Result result = get(get); - return (result != null && !(result.isEmpty())); + return result != null && !result.isEmpty(); } @Override @@ -370,21 +378,24 @@ public void put(Put put) throws IOException { sb.append('/'); sb.append(toURLEncodedBytes(put.getRow())); for (int i = 0; i < maxRetries; i++) { - Response response = - client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - int code = response.getCode(); - switch (code) { - case 200: - return; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("put request failed with " + code); + HttpPut httpPut = new HttpPut(sb.toString()); + httpPut.setEntity(new ProtobufHttpEntity(model)); + try (CloseableHttpResponse response = + client.execute(httpPut, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("put request failed with " + code); + } } } throw new IOException("put request timed out"); @@ -425,21 +436,24 @@ public void put(List puts) throws IOException { sb.append(Bytes.toString(name)); sb.append("/$multiput"); // can be any nonexistent row for (int i = 0; i < maxRetries; i++) { - Response response = - client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - int code = response.getCode(); - switch (code) { - case 200: - return; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("multiput request failed with " + code); + HttpPut httpPut = new HttpPut(sb.toString()); + httpPut.setEntity(new ProtobufHttpEntity(model)); + try (CloseableHttpResponse response = + client.execute(httpPut, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("multiput request failed with " + code); + } } } throw new IOException("multiput request timed out"); @@ -450,20 +464,23 @@ public void delete(Delete delete) throws IOException { String spec = buildRowSpec(delete.getRow(), delete.getFamilyCellMap(), delete.getTimestamp(), delete.getTimestamp(), 1); for (int i = 0; i < maxRetries; i++) { - Response response = client.delete(spec); - int code = response.getCode(); - switch (code) { - case 200: - return; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("delete request failed with " + code); + HttpDelete httpDelete = new HttpDelete(spec); + try (CloseableHttpResponse response = + client.execute(httpDelete, 
Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("delete request failed with " + code); + } } } throw new IOException("delete request timed out"); @@ -488,22 +505,25 @@ public TableDescriptor getDescriptor() throws IOException { sb.append('/'); sb.append("schema"); for (int i = 0; i < maxRetries; i++) { - Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF); - int code = response.getCode(); - switch (code) { - case 200: - TableSchemaModel schema = new TableSchemaModel(); - schema.getObjectFromMessage(response.getBody()); - return schema.getTableDescriptor(); - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("schema request returned " + code); + HttpGet httpGet = new HttpGet(sb.toString()); + try (CloseableHttpResponse response = + client.execute(httpGet, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + TableSchemaModel schema = new TableSchemaModel(); + schema.getObjectFromMessage(response.getEntity().getContent()); + return schema.getTableDescriptor(); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("schema request returned " + code); + } } } throw new IOException("schema request timed out"); @@ -528,22 +548,25 @@ public Scanner(Scan scan) throws IOException { sb.append('/'); sb.append("scanner"); for (int i = 0; i < maxRetries; i++) { - Response response = - client.post(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - int code = response.getCode(); - switch (code) { - case 201: - uri = response.getLocation(); - return; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("scan request failed with " + code); + HttpPost httpPost = new HttpPost(sb.toString()); + httpPost.setEntity(new ProtobufHttpEntity(model)); + try (CloseableHttpResponse response = + client.execute(httpPost, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 201: + uri = response.getFirstHeader("Location").getValue(); + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("scan request failed with " + code); + } } } throw new IOException("scan request timed out"); @@ -552,25 +575,28 @@ public Scanner(Scan scan) throws IOException { public Result[] nextBatch() throws IOException { StringBuilder sb = new StringBuilder(uri); for (int i = 0; i < maxRetries; i++) { - Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF); - int code = response.getCode(); - switch (code) { - case 200: - CellSetModel model = new CellSetModel(); - model.getObjectFromMessage(response.getBody()); - return 
buildResultFromModel(model); - case 204: - case 206: - return null; - case 509: - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("scanner.next request failed with " + code); + HttpGet httpGet = new HttpGet(sb.toString()); + try (CloseableHttpResponse response = + client.execute(httpGet, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + CellSetModel model = new CellSetModel(); + model.getObjectFromMessage(response.getEntity().getContent()); + return buildResultFromModel(model); + case 204: + case 206: + return null; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("scanner.next request failed with " + code); + } } } throw new IOException("scanner.next request timed out"); @@ -701,23 +727,26 @@ private boolean doCheckAndPut(byte[] row, byte[] family, byte[] qualifier, byte[ sb.append("?check=put"); for (int i = 0; i < maxRetries; i++) { - Response response = - client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - int code = response.getCode(); - switch (code) { - case 200: - return true; - case 304: // NOT-MODIFIED - return false; - case 509: - try { - Thread.sleep(sleepTime); - } catch (final InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("checkAndPut request failed with " + code); + HttpPut httpPut = new HttpPut(sb.toString()); + httpPut.setEntity(new ProtobufHttpEntity(model)); + try (CloseableHttpResponse response = + client.execute(httpPut, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return true; + case 304: // NOT-MODIFIED + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("checkAndPut request failed with " + code); + } } } throw new IOException("checkAndPut request timed out"); @@ -737,23 +766,26 @@ private boolean doCheckAndDelete(byte[] row, byte[] family, byte[] qualifier, by sb.append("?check=delete"); for (int i = 0; i < maxRetries; i++) { - Response response = - client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - int code = response.getCode(); - switch (code) { - case 200: - return true; - case 304: // NOT-MODIFIED - return false; - case 509: - try { - Thread.sleep(sleepTime); - } catch (final InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - break; - default: - throw new IOException("checkAndDelete request failed with " + code); + HttpPut httpPut = new HttpPut(sb.toString()); + httpPut.setEntity(new ProtobufHttpEntity(model)); + try (CloseableHttpResponse response = + client.execute(httpPut, Client.ACCEPT_PROTOBUF_HEADER_ARR)) { + int code = response.getStatusLine().getStatusCode(); + switch (code) { + case 200: + return true; + case 304: // NOT-MODIFIED + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + throw (InterruptedIOException) new 
InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("checkAndDelete request failed with " + code); + } } } throw new IOException("checkAndDelete request timed out"); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteAdminRetries.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteAdminRetries.java index 0db1b4afa1dc..857b6d747b25 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteAdminRetries.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteAdminRetries.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -37,6 +36,9 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.http.ProtocolVersion; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.message.BasicStatusLine; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -64,11 +66,11 @@ public class TestRemoteAdminRetries { @Before public void setup() throws Exception { client = mock(Client.class); - Response response = new Response(509); - when(client.get(anyString(), anyString())).thenReturn(response); - when(client.delete(anyString())).thenReturn(response); - when(client.put(anyString(), anyString(), any())).thenReturn(response); - when(client.post(anyString(), anyString(), any())).thenReturn(response); + CloseableHttpResponse response = mock(CloseableHttpResponse.class); + when(response.getStatusLine()) + .thenReturn(new BasicStatusLine(new ProtocolVersion("Http", 1, 0), 509, "test")); + + when(client.execute(any(), any())).thenReturn(response); Configuration configuration = TEST_UTIL.getConfiguration(); configuration.setInt("hbase.rest.client.max.retries", RETRIES); @@ -126,7 +128,7 @@ public void run() throws Exception { .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("TestTable")).build()); } }); - verify(client, times(RETRIES)).put(anyString(), anyString(), any()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -137,7 +139,7 @@ public void run() throws Exception { remoteAdmin.deleteTable("TestTable"); } }); - verify(client, times(RETRIES)).delete(anyString()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -152,7 +154,7 @@ public void run() throws Exception { private void testTimedOutGetCall(CallExecutor callExecutor) throws Exception { testTimedOutCall(callExecutor); - verify(client, times(RETRIES)).get(anyString(), anyString()); + verify(client, times(RETRIES)).execute(any(), any()); } private void testTimedOutCall(CallExecutor callExecutor) throws Exception { diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteHTableRetries.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteHTableRetries.java index 65c8a4f6c7fe..852bcb216baf 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteHTableRetries.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteHTableRetries.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertTrue; import static 
org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -40,6 +39,9 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.http.ProtocolVersion; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.message.BasicStatusLine; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -72,12 +74,11 @@ public class TestRemoteHTableRetries { @Before public void setup() throws Exception { client = mock(Client.class); - Response response = new Response(509); - when(client.get(anyString(), anyString())).thenReturn(response); - when(client.delete(anyString())).thenReturn(response); - when(client.put(anyString(), anyString(), any())).thenReturn(response); - when(client.post(anyString(), anyString(), any())).thenReturn(response); + CloseableHttpResponse response = mock(CloseableHttpResponse.class); + when(response.getStatusLine()) + .thenReturn(new BasicStatusLine(new ProtocolVersion("Http", 1, 0), 509, "test")); + when(client.execute(any(), any())).thenReturn(response); Configuration configuration = TEST_UTIL.getConfiguration(); configuration.setInt("hbase.rest.client.max.retries", RETRIES); configuration.setInt("hbase.rest.client.sleep", SLEEP_TIME); @@ -99,7 +100,7 @@ public void run() throws Exception { remoteTable.delete(delete); } }); - verify(client, times(RETRIES)).delete(anyString()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -120,7 +121,7 @@ public void run() throws Exception { remoteTable.put(new Put(Bytes.toBytes("Row"))); } }); - verify(client, times(RETRIES)).put(anyString(), anyString(), any()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -132,7 +133,7 @@ public void run() throws Exception { remoteTable.put(Arrays.asList(puts)); } }); - verify(client, times(RETRIES)).put(anyString(), anyString(), any()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -143,7 +144,7 @@ public void run() throws Exception { remoteTable.getScanner(new Scan()); } }); - verify(client, times(RETRIES)).post(anyString(), anyString(), any()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -157,7 +158,7 @@ public void run() throws Exception { .thenPut(put); } }); - verify(client, times(RETRIES)).put(anyString(), anyString(), any()); + verify(client, times(RETRIES)).execute(any(), any()); } @Test @@ -176,7 +177,7 @@ public void run() throws Exception { private void testTimedOutGetCall(CallExecutor callExecutor) throws Exception { testTimedOutCall(callExecutor); - verify(client, times(RETRIES)).get(anyString(), anyString()); + verify(client, times(RETRIES)).execute(any(), any()); } private void testTimedOutCall(CallExecutor callExecutor) throws Exception { diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java index fcbe7c7ca02e..1ce85460dc7c 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java @@ -629,7 +629,7 @@ public void testLongLivedScan() throws Exception { REST_TEST_UTIL.shutdownServletContainer(); 
// Set the ConnectionCache timeout to trigger halfway through the trials - TEST_UTIL.getConfiguration().setLong(RESTServlet.MAX_IDLETIME, (numTrials / 2) * trialPause); + TEST_UTIL.getConfiguration().setLong(RESTServlet.MAX_IDLETIME, (numTrials / 2L) * trialPause); TEST_UTIL.getConfiguration().setLong(RESTServlet.CLEANUP_INTERVAL, cleanUpInterval); // Start the Rest Servlet container diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestXmlParsing.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestXmlParsing.java index 2618ff54180b..a39712739352 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestXmlParsing.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestXmlParsing.java @@ -20,18 +20,26 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import javax.xml.bind.UnmarshalException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.rest.Constants; import org.apache.hadoop.hbase.rest.model.StorageClusterVersionModel; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.ProtocolVersion; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.message.BasicHeader; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.protocol.HTTP; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,10 +63,21 @@ public void testParsingClusterVersion() throws Exception { final String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>" + "<ClusterVersion Version=\"2.0.0\"/>"; Client client = mock(Client.class); - RemoteAdmin admin = new RemoteAdmin(client, HBaseConfiguration.create(), null); - Response resp = new Response(200, null, Bytes.toBytes(xml)); - when(client.get("/version/cluster", Constants.MIMETYPE_XML)).thenReturn(resp); + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()) + .thenReturn(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))); + when(entity.getContentType()) + .thenReturn(new BasicHeader(HTTP.CONTENT_TYPE, Constants.MIMETYPE_XML)); + + CloseableHttpResponse response = mock(CloseableHttpResponse.class); + when(response.getStatusLine()) + .thenReturn(new BasicStatusLine(new ProtocolVersion("Http", 1, 0), 200, "OK")); + when(response.getEntity()).thenReturn(entity); + + when(client.execute(any(), any())).thenReturn(response); + + RemoteAdmin admin = new RemoteAdmin(client, HBaseConfiguration.create(), null); StorageClusterVersionModel cv = admin.getClusterVersion(); assertEquals("2.0.0", cv.getVersion()); @@ -70,10 +89,20 @@ public void testFailOnExternalEntities() throws Exception { + " <!DOCTYPE foo [ <!ENTITY xee SYSTEM \"http://example.com\"> ] >" + " <ClusterVersion>&xee;</ClusterVersion>"; Client client = mock(Client.class); - RemoteAdmin admin = new RemoteAdmin(client, HBaseConfiguration.create(), null); - Response resp = new Response(200, null, Bytes.toBytes(externalEntitiesXml)); - when(client.get("/version/cluster", Constants.MIMETYPE_XML)).thenReturn(resp); + HttpEntity entity = mock(HttpEntity.class); + when(entity.getContent()).thenReturn(new
ByteArrayInputStream(externalEntitiesXml.getBytes())); + when(entity.getContentType()) + .thenReturn(new BasicHeader(HTTP.CONTENT_TYPE, Constants.MIMETYPE_XML)); + + CloseableHttpResponse response = mock(CloseableHttpResponse.class); + when(response.getStatusLine()) + .thenReturn(new BasicStatusLine(new ProtocolVersion("Http", 1, 0), 200, "OK")); + when(response.getEntity()).thenReturn(entity); + + when(client.execute(any(), any())).thenReturn(response); + + RemoteAdmin admin = new RemoteAdmin(client, HBaseConfiguration.create(), null); try { admin.getClusterVersion();
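The snippet below is a minimal usage sketch of the streaming client API this patch introduces (Client.execute(HttpUriRequest, Header[]), ProtobufHttpEntity, and the ACCEPT_*_HEADER_ARR constants), mirroring testMultipleCellPutPBEntity above and the reworked RemoteHTable. It is illustrative only: the host and port, the table name "example", the column family "cf", and the class name are assumptions for the example, not anything defined by this change.

import java.io.IOException;
import org.apache.hadoop.hbase.rest.ProtobufHttpEntity;
import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;

public class ProtobufStreamingExample {
  public static void main(String[] args) throws IOException {
    // Assumed setup: a REST server on localhost:8080 and a table "example" with family "cf".
    Client client = new Client(new Cluster().add("localhost", 8080));
    try {
      // Build the cells to write.
      CellSetModel cells = new CellSetModel();
      RowModel row = new RowModel("row1");
      row.addCell(new CellModel(Bytes.toBytes("cf:a"), Bytes.toBytes("value1")));
      cells.addRow(row);

      // Streaming PUT: the model is marshalled straight onto the connection as a chunked
      // protobuf entity, with no interim byte array. The row in the URL is a placeholder,
      // as in testMultipleCellPutPBEntity; the server takes row keys from the cell set.
      HttpPut put = new HttpPut("/example/any_row");
      put.setEntity(new ProtobufHttpEntity(cells));
      try (CloseableHttpResponse resp = client.execute(put, Client.ACCEPT_PROTOBUF_HEADER_ARR)) {
        if (resp.getStatusLine().getStatusCode() != 200) {
          throw new IOException("put failed: " + resp.getStatusLine());
        }
      }

      // Streaming GET: parse the protobuf response directly from the entity stream.
      HttpGet get = new HttpGet("/example/row1");
      try (CloseableHttpResponse resp = client.execute(get, Client.ACCEPT_PROTOBUF_HEADER_ARR)) {
        if (resp.getStatusLine().getStatusCode() == 200) {
          CellSetModel result = new CellSetModel();
          result.getObjectFromMessage(resp.getEntity().getContent());
          System.out.println("cells read: " + result.getRows().get(0).getCells().size());
        }
      }
    } finally {
      client.shutdown();
    }
  }
}

Because ProtobufHttpEntity reports isRepeatable() as false and streams with chunked transfer encoding, a request body cannot be replayed automatically after a connection failure; this is why the retry loops in RemoteAdmin and RemoteHTable build a fresh request and entity on every attempt.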