Skip to content

Commit c990899

Browse files
authored
HDDS-9913. Reduce number of times configuration is loaded in Ozone client (#5789)
1 parent c926ec9 commit c990899

16 files changed

Lines changed: 269 additions & 58 deletions

File tree

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hdds.scm;
1919

20+
import com.google.common.annotations.VisibleForTesting;
2021
import org.apache.hadoop.hdds.conf.Config;
2122
import org.apache.hadoop.hdds.conf.ConfigGroup;
2223
import org.apache.hadoop.hdds.conf.ConfigTag;
@@ -254,6 +255,7 @@ public long getStreamBufferFlushSize() {
254255
return streamBufferFlushSize;
255256
}
256257

258+
@VisibleForTesting
257259
public void setStreamBufferFlushSize(long streamBufferFlushSize) {
258260
this.streamBufferFlushSize = streamBufferFlushSize;
259261
}
@@ -262,6 +264,7 @@ public int getStreamBufferSize() {
262264
return streamBufferSize;
263265
}
264266

267+
@VisibleForTesting
265268
public void setStreamBufferSize(int streamBufferSize) {
266269
this.streamBufferSize = streamBufferSize;
267270
}
@@ -270,6 +273,7 @@ public boolean isStreamBufferFlushDelay() {
270273
return streamBufferFlushDelay;
271274
}
272275

276+
@VisibleForTesting
273277
public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
274278
this.streamBufferFlushDelay = streamBufferFlushDelay;
275279
}
@@ -278,6 +282,7 @@ public long getStreamBufferMaxSize() {
278282
return streamBufferMaxSize;
279283
}
280284

285+
@VisibleForTesting
281286
public void setStreamBufferMaxSize(long streamBufferMaxSize) {
282287
this.streamBufferMaxSize = streamBufferMaxSize;
283288
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdds.scm;
20+
21+
import org.apache.hadoop.hdds.client.ECReplicationConfig;
22+
import org.apache.hadoop.hdds.client.ReplicationConfig;
23+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
24+
25+
/**
26+
* This class encapsulates the arguments that are
27+
* required for Ozone client StreamBuffer.
28+
*/
29+
public class StreamBufferArgs {
30+
31+
private int streamBufferSize;
32+
private long streamBufferFlushSize;
33+
private long streamBufferMaxSize;
34+
private boolean streamBufferFlushDelay;
35+
36+
protected StreamBufferArgs(Builder builder) {
37+
this.streamBufferSize = builder.bufferSize;
38+
this.streamBufferFlushSize = builder.bufferFlushSize;
39+
this.streamBufferMaxSize = builder.bufferMaxSize;
40+
this.streamBufferFlushDelay = builder.streamBufferFlushDelay;
41+
}
42+
43+
public int getStreamBufferSize() {
44+
return streamBufferSize;
45+
}
46+
47+
public long getStreamBufferFlushSize() {
48+
return streamBufferFlushSize;
49+
}
50+
51+
public long getStreamBufferMaxSize() {
52+
return streamBufferMaxSize;
53+
}
54+
55+
public boolean isStreamBufferFlushDelay() {
56+
return streamBufferFlushDelay;
57+
}
58+
59+
public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
60+
this.streamBufferFlushDelay = streamBufferFlushDelay;
61+
}
62+
63+
protected void setStreamBufferSize(int streamBufferSize) {
64+
this.streamBufferSize = streamBufferSize;
65+
}
66+
67+
protected void setStreamBufferFlushSize(long streamBufferFlushSize) {
68+
this.streamBufferFlushSize = streamBufferFlushSize;
69+
}
70+
71+
protected void setStreamBufferMaxSize(long streamBufferMaxSize) {
72+
this.streamBufferMaxSize = streamBufferMaxSize;
73+
}
74+
75+
/**
76+
* Builder class for StreamBufferArgs.
77+
*/
78+
public static class Builder {
79+
private int bufferSize;
80+
private long bufferFlushSize;
81+
private long bufferMaxSize;
82+
private boolean streamBufferFlushDelay;
83+
84+
public Builder setBufferSize(int bufferSize) {
85+
this.bufferSize = bufferSize;
86+
return this;
87+
}
88+
89+
public Builder setBufferFlushSize(long bufferFlushSize) {
90+
this.bufferFlushSize = bufferFlushSize;
91+
return this;
92+
}
93+
94+
public Builder setBufferMaxSize(long bufferMaxSize) {
95+
this.bufferMaxSize = bufferMaxSize;
96+
return this;
97+
}
98+
99+
public Builder setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
100+
this.streamBufferFlushDelay = streamBufferFlushDelay;
101+
return this;
102+
}
103+
104+
public StreamBufferArgs build() {
105+
return new StreamBufferArgs(this);
106+
}
107+
108+
public static Builder getNewBuilder() {
109+
return new Builder();
110+
}
111+
}
112+
113+
public static StreamBufferArgs getDefaultStreamBufferArgs(
114+
ReplicationConfig replicationConfig, OzoneClientConfig clientConfig) {
115+
int bufferSize;
116+
long flushSize;
117+
long bufferMaxSize;
118+
boolean streamBufferFlushDelay = clientConfig.isStreamBufferFlushDelay();
119+
if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) {
120+
bufferSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
121+
flushSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
122+
bufferMaxSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
123+
} else {
124+
bufferSize = clientConfig.getStreamBufferSize();
125+
flushSize = clientConfig.getStreamBufferFlushSize();
126+
bufferMaxSize = clientConfig.getStreamBufferMaxSize();
127+
}
128+
129+
return Builder.getNewBuilder()
130+
.setBufferSize(bufferSize)
131+
.setBufferFlushSize(flushSize)
132+
.setBufferMaxSize(bufferMaxSize)
133+
.setStreamBufferFlushDelay(streamBufferFlushDelay)
134+
.build();
135+
}
136+
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
3939
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
4040
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
41+
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
4142
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
4243
import org.apache.hadoop.hdds.scm.XceiverClientReply;
4344
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -88,6 +89,7 @@ public class BlockOutputStream extends OutputStream {
8889
private XceiverClientFactory xceiverClientFactory;
8990
private XceiverClientSpi xceiverClient;
9091
private OzoneClientConfig config;
92+
private StreamBufferArgs streamBufferArgs;
9193

9294
private int chunkIndex;
9395
private final AtomicLong chunkOffset = new AtomicLong();
@@ -134,14 +136,15 @@ public class BlockOutputStream extends OutputStream {
134136
* @param pipeline pipeline where block will be written
135137
* @param bufferPool pool of buffers
136138
*/
139+
@SuppressWarnings("checkstyle:ParameterNumber")
137140
public BlockOutputStream(
138141
BlockID blockID,
139142
XceiverClientFactory xceiverClientManager,
140143
Pipeline pipeline,
141144
BufferPool bufferPool,
142145
OzoneClientConfig config,
143146
Token<? extends TokenIdentifier> token,
144-
ContainerClientMetrics clientMetrics
147+
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
145148
) throws IOException {
146149
this.xceiverClientFactory = xceiverClientManager;
147150
this.config = config;
@@ -166,12 +169,12 @@ public BlockOutputStream(
166169

167170
//number of buffers used before doing a flush
168171
refreshCurrentBuffer();
169-
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
172+
flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs
170173
.getStreamBufferSize());
171174

172175
Preconditions
173176
.checkArgument(
174-
(long) flushPeriod * config.getStreamBufferSize() == config
177+
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
175178
.getStreamBufferFlushSize());
176179

177180
// A single thread executor handle the responses of async requests
@@ -185,6 +188,7 @@ public BlockOutputStream(
185188
config.getBytesPerChecksum());
186189
this.clientMetrics = clientMetrics;
187190
this.pipeline = pipeline;
191+
this.streamBufferArgs = streamBufferArgs;
188192
}
189193

190194
void refreshCurrentBuffer() {
@@ -321,7 +325,7 @@ private void updateFlushLength() {
321325
}
322326

323327
private boolean isBufferPoolFull() {
324-
return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
328+
return bufferPool.computeBufferData() == streamBufferArgs.getStreamBufferMaxSize();
325329
}
326330

327331
/**
@@ -339,7 +343,7 @@ public void writeOnRetry(long len) throws IOException {
339343
if (LOG.isDebugEnabled()) {
340344
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
341345
}
342-
Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
346+
Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize());
343347
int count = 0;
344348
while (len > 0) {
345349
ChunkBuffer buffer = bufferPool.getBuffer(count);
@@ -355,13 +359,13 @@ public void writeOnRetry(long len) throws IOException {
355359
// the buffer. We should just validate
356360
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
357361
// call for handling full buffer/flush buffer condition.
358-
if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
362+
if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) {
359363
// reset the position to zero as now we will be reading the
360364
// next buffer in the list
361365
updateFlushLength();
362366
executePutBlock(false, false);
363367
}
364-
if (writtenDataLength == config.getStreamBufferMaxSize()) {
368+
if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
365369
handleFullBuffer();
366370
}
367371
}
@@ -518,9 +522,9 @@ void putFlushFuture(long flushPos,
518522
public void flush() throws IOException {
519523
if (xceiverClientFactory != null && xceiverClient != null
520524
&& bufferPool != null && bufferPool.getSize() > 0
521-
&& (!config.isStreamBufferFlushDelay() ||
525+
&& (!streamBufferArgs.isStreamBufferFlushDelay() ||
522526
writtenDataLength - totalDataFlushedLength
523-
>= config.getStreamBufferSize())) {
527+
>= streamBufferArgs.getStreamBufferSize())) {
524528
handleFlush(false);
525529
}
526530
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
2626
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
2727
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
28+
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
2829
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
2930
import org.apache.hadoop.hdds.scm.XceiverClientReply;
3031
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -66,17 +67,18 @@ public class ECBlockOutputStream extends BlockOutputStream {
6667
* @param pipeline pipeline where block will be written
6768
* @param bufferPool pool of buffers
6869
*/
70+
@SuppressWarnings("checkstyle:ParameterNumber")
6971
public ECBlockOutputStream(
7072
BlockID blockID,
7173
XceiverClientFactory xceiverClientManager,
7274
Pipeline pipeline,
7375
BufferPool bufferPool,
7476
OzoneClientConfig config,
7577
Token<? extends TokenIdentifier> token,
76-
ContainerClientMetrics clientMetrics
78+
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
7779
) throws IOException {
7880
super(blockID, xceiverClientManager,
79-
pipeline, bufferPool, config, token, clientMetrics);
81+
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
8082
// In EC stream, there will be only one node in pipeline.
8183
this.datanodeDetails = pipeline.getClosestNode();
8284
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
2424
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
2525
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
26+
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
2627
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
2728
import org.apache.hadoop.hdds.scm.XceiverClientReply;
2829
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -71,17 +72,18 @@ public class RatisBlockOutputStream extends BlockOutputStream
7172
* @param blockID block ID
7273
* @param bufferPool pool of buffers
7374
*/
75+
@SuppressWarnings("checkstyle:ParameterNumber")
7476
public RatisBlockOutputStream(
7577
BlockID blockID,
7678
XceiverClientFactory xceiverClientManager,
7779
Pipeline pipeline,
7880
BufferPool bufferPool,
7981
OzoneClientConfig config,
8082
Token<? extends TokenIdentifier> token,
81-
ContainerClientMetrics clientMetrics
83+
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
8284
) throws IOException {
8385
super(blockID, xceiverClientManager, pipeline,
84-
bufferPool, config, token, clientMetrics);
86+
bufferPool, config, token, clientMetrics, streamBufferArgs);
8587
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
8688
}
8789

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
3737
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
3838
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
39+
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
3940
import org.apache.hadoop.hdds.scm.XceiverClientManager;
4041
import org.apache.hadoop.hdds.scm.XceiverClientReply;
4142
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -103,6 +104,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
103104
config.setStreamBufferFlushSize(16 * 1024 * 1024);
104105
config.setChecksumType(ChecksumType.NONE);
105106
config.setBytesPerChecksum(256 * 1024);
107+
StreamBufferArgs streamBufferArgs =
108+
StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config);
106109

107110
return new RatisBlockOutputStream(
108111
new BlockID(1L, 1L),
@@ -111,7 +114,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
111114
bufferPool,
112115
config,
113116
null,
114-
ContainerClientMetrics.acquire());
117+
ContainerClientMetrics.acquire(), streamBufferArgs);
115118
}
116119

117120
/**

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2929
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
3030
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
31+
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
3132
import org.apache.hadoop.hdds.scm.container.ContainerID;
3233
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
3334
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -223,13 +224,15 @@ private ECBlockOutputStream getECBlockOutputStream(
223224
BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
224225
ECReplicationConfig repConfig, int replicaIndex,
225226
OzoneClientConfig configuration) throws IOException {
227+
StreamBufferArgs streamBufferArgs =
228+
StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, configuration);
226229
return new ECBlockOutputStream(
227230
blockLocationInfo.getBlockID(),
228231
containerOperationClient.getXceiverClientManager(),
229232
containerOperationClient.singleNodePipeline(datanodeDetails,
230233
repConfig, replicaIndex),
231234
BufferPool.empty(), configuration,
232-
blockLocationInfo.getToken(), clientMetrics);
235+
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
233236
}
234237

235238
@VisibleForTesting

0 commit comments

Comments
 (0)