Skip to content

Commit 473ffc2

Browse files
committed
Revert "HBASE-28001: Add request attribute support to BufferedMutator (apache#6076)"
This reverts commit 7d019d9.
1 parent 12e4d4d commit 473ffc2

File tree

12 files changed

+349
-587
lines changed

12 files changed

+349
-587
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.Closeable;
2121
import java.util.Collections;
2222
import java.util.List;
23-
import java.util.Map;
2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.concurrent.TimeUnit;
2625
import org.apache.hadoop.conf.Configuration;
@@ -94,11 +93,4 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
9493
default long getPeriodicalFlushTimeout(TimeUnit unit) {
9594
throw new UnsupportedOperationException("Not implemented");
9695
}
97-
98-
/**
99-
* Returns the rpc request attributes.
100-
*/
101-
default Map<String, byte[]> getRequestAttributes() {
102-
return Collections.emptyMap();
103-
}
10496
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
2121

22-
import java.util.Map;
2322
import java.util.concurrent.TimeUnit;
2423
import org.apache.yetus.audience.InterfaceAudience;
2524

@@ -104,16 +103,6 @@ default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
104103
*/
105104
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
106105

107-
/**
108-
* Set a rpc request attribute.
109-
*/
110-
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);
111-
112-
/**
113-
* Set multiple rpc request attributes.
114-
*/
115-
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);
116-
117106
/**
118107
* Create the {@link AsyncBufferedMutator} instance.
119108
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20-
import java.util.Map;
2120
import java.util.concurrent.TimeUnit;
2221
import org.apache.yetus.audience.InterfaceAudience;
2322

@@ -79,20 +78,6 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
7978
return this;
8079
}
8180

82-
@Override
83-
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
84-
tableBuilder.setRequestAttribute(key, value);
85-
return this;
86-
}
87-
88-
@Override
89-
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
90-
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
91-
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
92-
}
93-
return this;
94-
}
95-
9681
@Override
9782
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
9883
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.ArrayList;
2525
import java.util.Iterator;
2626
import java.util.List;
27-
import java.util.Map;
2827
import java.util.concurrent.CompletableFuture;
2928
import java.util.concurrent.TimeUnit;
3029
import java.util.stream.Collectors;
@@ -171,9 +170,4 @@ public long getWriteBufferSize() {
171170
public long getPeriodicalFlushTimeout(TimeUnit unit) {
172171
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
173172
}
174-
175-
@Override
176-
public Map<String, byte[]> getRequestAttributes() {
177-
return table.getRequestAttributes();
178-
}
179173
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
2323

2424
import com.google.protobuf.RpcChannel;
25-
import java.util.Collections;
2625
import java.util.List;
2726
import java.util.Map;
2827
import java.util.concurrent.CompletableFuture;
2928
import java.util.concurrent.TimeUnit;
3029
import java.util.function.Function;
30+
import org.apache.commons.lang3.NotImplementedException;
3131
import org.apache.hadoop.conf.Configuration;
3232
import org.apache.hadoop.hbase.CompareOperator;
3333
import org.apache.hadoop.hbase.TableName;
@@ -117,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
117117
* @return a map of request attributes supplied by the client
118118
*/
119119
default Map<String, byte[]> getRequestAttributes() {
120-
return Collections.emptyMap();
120+
throw new NotImplementedException("Add an implementation!");
121121
}
122122

123123
/**

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22-
import java.util.Collections;
2322
import java.util.List;
24-
import java.util.Map;
2523
import org.apache.hadoop.conf.Configuration;
2624
import org.apache.hadoop.hbase.TableName;
2725
import org.apache.yetus.audience.InterfaceAudience;
@@ -196,13 +194,6 @@ default void setOperationTimeout(int timeout) {
196194
"The BufferedMutator::setOperationTimeout has not been implemented");
197195
}
198196

199-
/**
200-
* Returns the rpc request attributes.
201-
*/
202-
default Map<String, byte[]> getRequestAttributes() {
203-
return Collections.emptyMap();
204-
}
205-
206197
/**
207198
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
208199
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Collections;
2727
import java.util.Iterator;
2828
import java.util.List;
29-
import java.util.Map;
3029
import java.util.NoSuchElementException;
3130
import java.util.Timer;
3231
import java.util.TimerTask;
@@ -90,7 +89,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
9089
private final ExecutorService pool;
9190
private final AtomicInteger rpcTimeout;
9291
private final AtomicInteger operationTimeout;
93-
private final Map<String, byte[]> requestAttributes;
9492
private final boolean cleanupPoolOnClose;
9593
private volatile boolean closed = false;
9694
private final AsyncProcess ap;
@@ -137,9 +135,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
137135
this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != UNSET
138136
? params.getOperationTimeout()
139137
: conn.getConnectionConfiguration().getOperationTimeout());
140-
141-
this.requestAttributes = params.getRequestAttributes();
142-
143138
this.ap = ap;
144139
}
145140

@@ -257,8 +252,7 @@ public synchronized void close() throws IOException {
257252

258253
private AsyncProcessTask createTask(QueueRowAccess access) {
259254
return new AsyncProcessTask(AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
260-
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
261-
.setRequestAttributes(requestAttributes).build()) {
255+
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE).build()) {
262256
@Override
263257
public int getRpcTimeout() {
264258
return rpcTimeout.get();
@@ -397,11 +391,6 @@ public void setOperationTimeout(int operationTimeout) {
397391
this.operationTimeout.set(operationTimeout);
398392
}
399393

400-
@Override
401-
public Map<String, byte[]> getRequestAttributes() {
402-
return requestAttributes;
403-
}
404-
405394
long getCurrentWriteBufferSize() {
406395
return currentWriteBufferSize.get();
407396
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20-
import java.util.Collections;
21-
import java.util.HashMap;
22-
import java.util.Map;
2320
import java.util.concurrent.ExecutorService;
2421
import org.apache.hadoop.hbase.TableName;
2522
import org.apache.yetus.audience.InterfaceAudience;
@@ -41,7 +38,6 @@ public class BufferedMutatorParams implements Cloneable {
4138
private String implementationClassName = null;
4239
private int rpcTimeout = UNSET;
4340
private int operationTimeout = UNSET;
44-
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
4541
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
4642
@Override
4743
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -89,18 +85,6 @@ public int getOperationTimeout() {
8985
return operationTimeout;
9086
}
9187

92-
public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
93-
if (requestAttributes.isEmpty()) {
94-
requestAttributes = new HashMap<>();
95-
}
96-
requestAttributes.put(key, value);
97-
return this;
98-
}
99-
100-
public Map<String, byte[]> getRequestAttributes() {
101-
return requestAttributes;
102-
}
103-
10488
/**
10589
* Override the write buffer size specified by the provided {@link Connection}'s
10690
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key

hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,15 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22-
import java.util.HashMap;
2322
import java.util.List;
24-
import java.util.Map;
2523
import java.util.concurrent.Callable;
2624
import java.util.concurrent.ExecutionException;
2725
import java.util.concurrent.ExecutorService;
2826
import java.util.concurrent.Executors;
2927
import java.util.concurrent.Future;
3028
import java.util.concurrent.TimeUnit;
3129
import java.util.concurrent.TimeoutException;
32-
import org.apache.hadoop.conf.Configuration;
3330
import org.apache.hadoop.conf.Configured;
34-
import org.apache.hadoop.hbase.AuthUtil;
3531
import org.apache.hadoop.hbase.TableName;
3632
import org.apache.hadoop.hbase.client.BufferedMutator;
3733
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
@@ -71,18 +67,12 @@ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator
7167
}
7268
}
7369
};
74-
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener)
75-
.setRequestAttribute("requestInfo", Bytes.toBytes("bar"));
70+
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);
7671

7772
//
7873
// step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
7974
//
80-
Map<String, byte[]> connectionAttributes = new HashMap<>();
81-
connectionAttributes.put("clientId", Bytes.toBytes("foo"));
82-
Configuration conf = getConf();
83-
try (
84-
final Connection conn = ConnectionFactory.createConnection(conf, null,
85-
AuthUtil.loginClient(conf), connectionAttributes);
75+
try (final Connection conn = ConnectionFactory.createConnection(getConf());
8676
final BufferedMutator mutator = conn.getBufferedMutator(params)) {
8777

8878
/** worker pool that operates on BufferedTable instances */

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionAttributes.java

Lines changed: 0 additions & 123 deletions
This file was deleted.

0 commit comments

Comments
 (0)