Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rest;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.yetus.audience.InterfaceAudience;

/**
* For sending a Protobuf encoded object via Apache HttpClient efficiently without an interim byte
* array. This exposes the underlying Apache HttpClient types, but so do the other client classes.
*/
@InterfaceAudience.Public
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be IA.Public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we expect the user to explicitly create these entities.

i have not updated the tests to use them, but the intentended usage is similar to how
org.apache.hadoop.hbase.rest.client.RemoteHTable.put(Put) is implemented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least have a section in the javadoc to show how to use this class?

And since we have not reach an agreement on whether to move RemoteAdmin and RemoteHTable to src/main, I'm not sure whether we should make this class IA.Public...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only mentined RemoHTable as example where this is used, but this
is not limited to RemotHTable, it should also be used directly in user code.

I will add a standalone test case that uses this class as an illustration.

Copy link
Contributor Author

@stoty stoty Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least have a section in the javadoc to show how to use this class?

And since we have not reach an agreement on whether to move RemoteAdmin and RemoteHTable to src/main, I'm not sure whether we should make this class IA.Public...

This is not tied to RemoteAdmin / RemoteHTable.
This is just a more efficient way to use the existing public Client class (or any custom client based on Apache HttpClient)

The RemoteAdmin / RemoteHTable API does not even change, this new class only used internally in the RemoteHTable implementation.

public class ProtobufHttpEntity implements HttpEntity {

private ProtobufMessageHandler handler;

public ProtobufHttpEntity(ProtobufMessageHandler handler) {
this.handler = handler;
}

@Override
public boolean isRepeatable() {
return false;
}

@Override
public boolean isChunked() {
return true;
}

@Override
public long getContentLength() {
return -1;
}

@Override
public Header getContentType() {
return new BasicHeader(HTTP.CONTENT_TYPE, Constants.MIMETYPE_PROTOBUF_IETF);
}

@Override
public Header getContentEncoding() {
return null;
}

@Override
public InputStream getContent() throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException("only writeTo is supported");
}

@Override
public void writeTo(OutputStream outStream) throws IOException {
handler.writeProtobufOutput(outStream);
}

@Override
public boolean isStreaming() {
return true;
}

@Override
public void consumeContent() throws IOException {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
Expand Down Expand Up @@ -96,7 +97,7 @@ public class Client {
private boolean sticky = false;
private Configuration conf;
private boolean sslEnabled;
private HttpResponse resp;
private CloseableHttpResponse resp;
private HttpGet httpGet = null;
private HttpClientContext stickyContext = null;
private BasicCredentialsProvider provider;
Expand All @@ -108,6 +109,16 @@ public class Client {
private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
private static final String COOKIE = "Cookie";

public static final Header ACCEPT_PROTOBUF_HEADER =
new BasicHeader("Accept", Constants.MIMETYPE_PROTOBUF);
public static final Header ACCEPT_XML_HEADER = new BasicHeader("Accept", Constants.MIMETYPE_XML);
public static final Header ACCEPT_JSON_HEADER =
new BasicHeader("Accept", Constants.MIMETYPE_JSON);

public static final Header[] ACCEPT_PROTOBUF_HEADER_ARR = new Header[] { ACCEPT_PROTOBUF_HEADER };
public static final Header[] ACCEPT_XML_HEADER_ARR = new Header[] { ACCEPT_XML_HEADER };
public static final Header[] ACCEPT_JSON_HEADER_ARR = new Header[] { ACCEPT_JSON_HEADER };

/**
* Default Constructor
*/
Expand Down Expand Up @@ -319,6 +330,7 @@ public static KeyStore loadTruststore(String trustStorePath, Optional<String> tr
* Shut down the client. Close any open persistent connections.
*/
public void shutdown() {
close();
}

/** Returns the wrapped HttpClient */
Expand Down Expand Up @@ -368,8 +380,8 @@ public void removeExtraHeader(final String name) {
* @param path the properly urlencoded path
* @return the HTTP response code
*/
public HttpResponse executePathOnly(Cluster cluster, HttpUriRequest method, Header[] headers,
String path) throws IOException {
public CloseableHttpResponse executePathOnly(Cluster cluster, HttpUriRequest method,
Header[] headers, String path) throws IOException {
IOException lastException;
if (cluster.nodes.size() < 1) {
throw new IOException("Cluster is empty");
Expand Down Expand Up @@ -428,7 +440,7 @@ public HttpResponse executePathOnly(Cluster cluster, HttpUriRequest method, Head
* @param uri a properly urlencoded URI
* @return the HTTP response code
*/
public HttpResponse executeURI(HttpUriRequest method, Header[] headers, String uri)
public CloseableHttpResponse executeURI(HttpUriRequest method, Header[] headers, String uri)
throws IOException {
// method.setURI(new URI(uri, true));
for (Map.Entry<String, String> e : extraHeaders.entrySet()) {
Expand Down Expand Up @@ -473,14 +485,29 @@ public HttpResponse executeURI(HttpUriRequest method, Header[] headers, String u
* Execute a transaction method. Will call either <tt>executePathOnly</tt> or <tt>executeURI</tt>
* depending on whether a path only is supplied in 'path', or if a complete URI is passed instead,
* respectively.
* @param cluster the cluster definition
* @param method the HTTP method
* @param headers HTTP header values to send
* @param path the properly urlencoded path or URI
* @return the HTTP response code
*/
public HttpResponse execute(Cluster cluster, HttpUriRequest method, Header[] headers, String path)
throws IOException {
public CloseableHttpResponse execute(Cluster cluster, HttpUriRequest method, Header[] headers,
String path) throws IOException {
if (path.startsWith("/")) {
return executePathOnly(cluster, method, headers, path);
}
return executeURI(method, headers, path);
}

/**
* Execute a transaction method. Will call either <tt>executePathOnly</tt> or <tt>executeURI</tt>
* depending on whether a path only is supplied in 'path', or if a complete URI is passed instead,
* respectively.
* @param method the HTTP method
* @param headers HTTP header values to send
* @return the CloseableHttpResponse object
*/
public CloseableHttpResponse execute(HttpUriRequest method, Header[] headers) throws IOException {
String path = method.getURI().toASCIIString();
if (path.startsWith("/")) {
return executePathOnly(cluster, method, headers, path);
}
Expand Down Expand Up @@ -623,7 +650,7 @@ public Response head(String path) throws IOException {
public Response head(Cluster cluster, String path, Header[] headers) throws IOException {
HttpHead method = new HttpHead(path);
try {
HttpResponse resp = execute(cluster, method, null, path);
CloseableHttpResponse resp = execute(cluster, method, null, path);
return new Response(resp.getStatusLine().getStatusCode(), resp.getAllHeaders(), null);
} finally {
method.releaseConnection();
Expand Down Expand Up @@ -730,7 +757,7 @@ public Response get(Cluster c, String path, Header[] headers) throws IOException
httpGet.releaseConnection();
}
httpGet = new HttpGet(path);
HttpResponse resp = execute(c, httpGet, headers, path);
CloseableHttpResponse resp = execute(c, httpGet, headers, path);
return new Response(resp.getStatusLine().getStatusCode(), resp.getAllHeaders(), resp,
resp.getEntity() == null ? null : resp.getEntity().getContent());
}
Expand Down Expand Up @@ -820,7 +847,7 @@ public Response put(Cluster cluster, String path, Header[] headers, byte[] conte
HttpPut method = new HttpPut(path);
try {
method.setEntity(new ByteArrayEntity(content));
HttpResponse resp = execute(cluster, method, headers, path);
CloseableHttpResponse resp = execute(cluster, method, headers, path);
headers = resp.getAllHeaders();
content = getResponseBody(resp);
return new Response(resp.getStatusLine().getStatusCode(), headers, content);
Expand Down Expand Up @@ -914,7 +941,7 @@ public Response post(Cluster cluster, String path, Header[] headers, byte[] cont
HttpPost method = new HttpPost(path);
try {
method.setEntity(new ByteArrayEntity(content));
HttpResponse resp = execute(cluster, method, headers, path);
CloseableHttpResponse resp = execute(cluster, method, headers, path);
headers = resp.getAllHeaders();
content = getResponseBody(resp);
return new Response(resp.getStatusLine().getStatusCode(), headers, content);
Expand Down Expand Up @@ -952,7 +979,7 @@ public Response delete(String path, Header extraHdr) throws IOException {
public Response delete(Cluster cluster, String path) throws IOException {
HttpDelete method = new HttpDelete(path);
try {
HttpResponse resp = execute(cluster, method, null, path);
CloseableHttpResponse resp = execute(cluster, method, null, path);
Header[] headers = resp.getAllHeaders();
byte[] content = getResponseBody(resp);
return new Response(resp.getStatusLine().getStatusCode(), headers, content);
Expand All @@ -972,7 +999,7 @@ public Response delete(Cluster cluster, String path, Header extraHdr) throws IOE
HttpDelete method = new HttpDelete(path);
try {
Header[] headers = { extraHdr };
HttpResponse resp = execute(cluster, method, headers, path);
CloseableHttpResponse resp = execute(cluster, method, headers, path);
headers = resp.getAllHeaders();
byte[] content = getResponseBody(resp);
return new Response(resp.getStatusLine().getStatusCode(), headers, content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ protected static void checkValuePB(String table, String row, String column, Stri
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
CellSetModel cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
cellSet.getObjectFromMessage(response.getStream());
RowModel rowModel = cellSet.getRows().get(0);
CellModel cell = rowModel.getCells().get(0);
assertEquals(Bytes.toString(cell.getColumn()), column);
Expand All @@ -250,7 +250,7 @@ protected static void checkIncrementValuePB(String table, String row, String col
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
CellSetModel cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
cellSet.getObjectFromMessage(response.getStream());
RowModel rowModel = cellSet.getRows().get(0);
CellModel cell = rowModel.getCells().get(0);
assertEquals(Bytes.toString(cell.getColumn()), column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Response;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
Expand All @@ -41,6 +42,8 @@
import org.apache.hadoop.hbase.testclassification.RestTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.message.BasicHeader;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -167,6 +170,39 @@ public void testMultipleCellCheckPutPB() throws IOException {
assertEquals(200, response.getCode());
}

@Test
public void testMultipleCellPutPBEntity() throws IOException {
Response response = getValuePB(TABLE, ROW_1, COLUMN_1);
assertEquals(404, response.getCode());

CellSetModel model = new CellSetModel();
RowModel row1 = new RowModel(ROW_1);
row1.addCell(new CellModel(Bytes.toBytes(COLUMN_1), Bytes.toBytes(VALUE_1)));
row1.addCell(new CellModel(Bytes.toBytes(COLUMN_2), Bytes.toBytes(VALUE_2)));
row1.addCell(new CellModel(Bytes.toBytes(COLUMN_3), Bytes.toBytes(VALUE_3)));
model.addRow(row1);

// This illustrates how to use the lower level API to more efficiently marshall cells
// to Protobuf for mutations
StringBuilder path = new StringBuilder();
path.append('/');
path.append(TABLE);
path.append('/');
path.append("dummy_row");
HttpPut httpPut = new HttpPut(path.toString());
httpPut.setEntity(new ProtobufHttpEntity(model));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how a normal client can use ProtobufHttpEntity, @Apache9 .

try (CloseableHttpResponse closableResponse =
client.execute(httpPut, Client.ACCEPT_PROTOBUF_HEADER_ARR)) {
assertEquals(200, closableResponse.getStatusLine().getStatusCode());
}
checkValuePB(TABLE, ROW_1, COLUMN_1, VALUE_1);
checkValuePB(TABLE, ROW_1, COLUMN_2, VALUE_2);
checkValuePB(TABLE, ROW_1, COLUMN_3, VALUE_3);

response = deleteRow(TABLE, ROW_1);
assertEquals(200, response.getCode());
}

@Test
public void testMultipleCellCheckPutXML() throws IOException, JAXBException {
Response response = getValuePB(TABLE, ROW_1, COLUMN_1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void testGetNamespaceTablesAndCannotDeleteNamespace() throws IOException,

response = client.get(namespacePath, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
model.getObjectFromMessage(response.getBody());
model.getObjectFromMessage(response.getStream());
checkNamespaceProperties(model.getProperties(), nsProperties);

// Check get namespace tables as XML, JSON and Protobuf.
Expand All @@ -238,7 +238,7 @@ public void testGetNamespaceTablesAndCannotDeleteNamespace() throws IOException,
response = client.get(namespacePath, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
tablemodel.setTables(new ArrayList<>());
tablemodel.getObjectFromMessage(response.getBody());
tablemodel.getObjectFromMessage(response.getStream());
checkNamespaceTables(tablemodel.getTables(), nsTables);

// Check cannot delete namespace via REST because it contains tables.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testNamespaceListPBandDefault() throws IOException {
// Check that REST GET finds only default namespaces via PB and default Accept header.
response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
model.getObjectFromMessage(response.getBody());
model.getObjectFromMessage(response.getStream());
testNamespacesModel.checkModel(model, "hbase", "default");
response = client.get(schemaPath);
assertEquals(200, response.getCode());
Expand All @@ -178,7 +178,7 @@ public void testNamespaceListPBandDefault() throws IOException {
createNamespaceViaAdmin(admin, NAMESPACE1);
response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
model.getObjectFromMessage(response.getBody());
model.getObjectFromMessage(response.getStream());
testNamespacesModel.checkModel(model, NAMESPACE1, "hbase", "default");
response = client.get(schemaPath);
assertEquals(200, response.getCode());
Expand All @@ -187,7 +187,7 @@ public void testNamespaceListPBandDefault() throws IOException {
createNamespaceViaAdmin(admin, NAMESPACE2);
response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
model.getObjectFromMessage(response.getBody());
model.getObjectFromMessage(response.getStream());
testNamespacesModel.checkModel(model, NAMESPACE1, NAMESPACE2, "hbase", "default");
response = client.get(schemaPath);
assertEquals(200, response.getCode());
Expand All @@ -196,7 +196,7 @@ public void testNamespaceListPBandDefault() throws IOException {
admin.deleteNamespace(NAMESPACE1);
response = client.get(schemaPath, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
model.getObjectFromMessage(response.getBody());
model.getObjectFromMessage(response.getStream());
testNamespacesModel.checkModel(model, NAMESPACE2, "hbase", "default");
response = client.get(schemaPath);
assertEquals(200, response.getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private static int fullTableScan(ScannerModel model) throws IOException {
if (response.getCode() == 200) {
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
CellSetModel cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
cellSet.getObjectFromMessage(response.getStream());
Iterator<RowModel> rows = cellSet.getRows().iterator();
while (rows.hasNext()) {
RowModel row = rows.next();
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testSimpleScannerPB() throws IOException {
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
CellSetModel cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
cellSet.getObjectFromMessage(response.getStream());
// confirm batch size conformance
assertEquals(BATCH_SIZE, countCellSet(cellSet));

Expand Down
Loading