Skip to content

Commit 1281234

Browse files
committed
HDDS-10384. RPC client reusing thread resources. (#6326)
(cherry picked from commit 2f05353)
1 parent 2e8a8b9 commit 1281234

9 files changed

Lines changed: 41 additions & 31 deletions

File tree

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
7373
return commitIndexMap;
7474
}
7575

76-
void updateCommitInfoMap(long index, List<BUFFER> buffers) {
76+
synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
7777
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
7878
.addAll(buffers);
7979
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.CompletionException;
2626
import java.util.concurrent.ExecutionException;
2727
import java.util.concurrent.ExecutorService;
28-
import java.util.concurrent.Executors;
2928
import java.util.concurrent.atomic.AtomicLong;
3029
import java.util.concurrent.atomic.AtomicReference;
3130
import java.util.function.Supplier;
@@ -179,8 +178,7 @@ public BlockOutputStream(
179178
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
180179
.getStreamBufferFlushSize());
181180

182-
// A single thread executor handle the responses of async requests
183-
responseExecutor = Executors.newSingleThreadExecutor();
181+
this.responseExecutor = blockOutputStreamResourceProvider.get();
184182
bufferList = null;
185183
totalDataFlushedLength = 0;
186184
writtenDataLength = 0;
@@ -662,7 +660,6 @@ public void cleanup(boolean invalidateClient) {
662660
bufferList.clear();
663661
}
664662
bufferList = null;
665-
responseExecutor.shutdown();
666663
}
667664

668665
/**

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
*/
2525
package org.apache.hadoop.hdds.scm.storage;
2626

27+
import com.google.common.annotations.VisibleForTesting;
2728
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
2829
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
2930
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -32,6 +33,7 @@
3233
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.ConcurrentHashMap;
3435
import java.util.concurrent.ConcurrentMap;
36+
import java.util.concurrent.ExecutionException;
3537

3638
/**
3739
* This class executes watchForCommit on ratis pipeline and releases
@@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
4244
private final BufferPool bufferPool;
4345

4446
// future Map to hold up all putBlock futures
45-
private final ConcurrentMap<Long, CompletableFuture<
46-
ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
47+
private final ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
48+
futureMap = new ConcurrentHashMap<>();
4749

4850
CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
4951
super(xceiverClient);
@@ -67,11 +69,24 @@ void releaseBuffers(long index) {
6769
+ totalLength + ": existing = " + futureMap.keySet());
6870
}
6971

70-
ConcurrentMap<Long, CompletableFuture<
71-
ContainerCommandResponseProto>> getFutureMap() {
72+
@VisibleForTesting
73+
ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
7274
return futureMap;
7375
}
7476

77+
public void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
78+
futureMap.compute(flushPos,
79+
(key, previous) -> previous == null ? flushFuture :
80+
previous.thenCombine(flushFuture, (prev, curr) -> curr));
81+
}
82+
83+
84+
public void waitOnFlushFutures() throws InterruptedException, ExecutionException {
85+
// wait for all the transactions to complete
86+
CompletableFuture.allOf(futureMap.values().toArray(
87+
new CompletableFuture[0])).get();
88+
}
89+
7590
@Override
7691
public void cleanup() {
7792
super.cleanup();

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -117,16 +117,13 @@ void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
117117
}
118118

119119
@Override
120-
void putFlushFuture(long flushPos,
121-
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
122-
commitWatcher.getFutureMap().put(flushPos, flushFuture);
120+
void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
121+
commitWatcher.putFlushFuture(flushPos, flushFuture);
123122
}
124123

125124
@Override
126125
void waitOnFlushFutures() throws InterruptedException, ExecutionException {
127-
// wait for all the transactions to complete
128-
CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
129-
new CompletableFuture[0])).get();
126+
commitWatcher.waitOnFlushFutures();
130127
}
131128

132129
@Override

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable {
101101

102102
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
103103

104-
// TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
105-
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
104+
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
106105

107106
private final ECContainerOperationClient containerOperationClient;
108107

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -43,8 +43,6 @@
4343
import java.util.concurrent.ArrayBlockingQueue;
4444
import java.util.concurrent.BlockingQueue;
4545
import java.util.concurrent.ExecutionException;
46-
import java.util.concurrent.ExecutorService;
47-
import java.util.concurrent.Executors;
4846
import java.util.concurrent.Future;
4947
import java.util.concurrent.TimeUnit;
5048
import java.util.concurrent.atomic.AtomicLong;
@@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
6664
private final int numParityBlks;
6765
private final ByteBufferPool bufferPool;
6866
private final RawErasureEncoder encoder;
69-
private final ExecutorService flushExecutor;
7067
private final Future<Boolean> flushFuture;
7168
private final AtomicLong flushCheckpoint;
7269

@@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) {
119116
this.writeOffset = 0;
120117
this.encoder = CodecUtil.createRawEncoderWithFallback(
121118
builder.getReplicationConfig());
122-
this.flushExecutor = Executors.newSingleThreadExecutor();
123119
S3Auth s3Auth = builder.getS3CredentialsProvider().get();
124120
ThreadLocal<S3Auth> s3CredentialsProvider =
125121
builder.getS3CredentialsProvider();
126-
flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
127-
this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
122+
this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
123+
s3CredentialsProvider.set(s3Auth);
124+
return flushStripeFromQueue();
125+
});
128126
this.flushCheckpoint = new AtomicLong(0);
129127
this.atomicKeyCreation = builder.getAtomicKeyCreation();
130128
}
@@ -495,7 +493,6 @@ public void close() throws IOException {
495493
} catch (InterruptedException e) {
496494
throw new IOException("Flushing thread was interrupted", e);
497495
} finally {
498-
flushExecutor.shutdownNow();
499496
closeCurrentStreamEntry();
500497
blockOutputStreamEntryPool.cleanup();
501498
}

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -197,8 +197,7 @@ public class RpcClient implements ClientProtocol {
197197
// for reconstruction.
198198
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
199199

200-
// TODO: Adjusts to the appropriate value when the writeThreadPool is used.
201-
private static final int WRITE_POOL_MIN_SIZE = 0;
200+
private static final int WRITE_POOL_MIN_SIZE = 1;
202201

203202
private final ConfigurationSource conf;
204203
private final OzoneManagerClientProtocol ozoneManagerClient;

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -214,7 +214,7 @@ public void testReleaseBuffers() throws Exception {
214214
return v;
215215
});
216216
futures.add(future);
217-
watcher.getFutureMap().put(length, future);
217+
watcher.putFlushFuture(length, future);
218218
replies.add(reply);
219219
}
220220

@@ -288,7 +288,7 @@ public void testReleaseBuffersOnException() throws Exception {
288288
return v;
289289
});
290290
futures.add(future);
291-
watcher.getFutureMap().put(length, future);
291+
watcher.putFlushFuture(length, future);
292292
replies.add(reply);
293293
}
294294

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -187,6 +187,14 @@ static void shutdown() throws IOException {
187187
}
188188
}
189189

190+
static void reInitClient() throws IOException {
191+
ozClient = OzoneClientFactory.getRpcClient(conf);
192+
store = ozClient.getObjectStore();
193+
TestOzoneRpcClient.setOzClient(ozClient);
194+
TestOzoneRpcClient.setStore(store);
195+
}
196+
197+
190198
@ParameterizedTest
191199
@EnumSource
192200
void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
@@ -704,9 +712,7 @@ void testGetKeyProvider() throws Exception {
704712

705713
KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
706714
assertNotEquals(kp3, kpSpy);
707-
// Restore ozClient and store
708-
TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
709-
TestOzoneRpcClient.setStore(ozClient.getObjectStore());
715+
reInitClient();
710716
}
711717

712718
private static RepeatedOmKeyInfo getMatchedKeyInfo(

0 commit comments

Comments (0)