HDDS-4119. Improve performance of the BufferPool management of Ozone client #1336
Changes from 14 commits
**BlockOutputStream.java** (org.apache.hadoop.hdds.scm.storage)
```diff
@@ -17,46 +17,44 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.hdds.client.BlockID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .putBlockAsync;
-import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .writeChunkAsync;
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
  * SCMClient to write the value of a key to a sequence
```
```diff
@@ -120,6 +118,13 @@ public class BlockOutputStream extends OutputStream {
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
 
+  //number of buffers used before doing a flush/putBlock.
+  private int flushPeriod;
+  //bytes remaining to write in the current buffer.
+  private int currentBufferRemaining;
+  //current buffer allocated to write
+  private ChunkBuffer currentBuffer;
+
   /**
    * Creates a new BlockOutputStream.
    *
```
```diff
@@ -154,6 +159,14 @@ public BlockOutputStream(BlockID blockID,
     this.bufferPool = bufferPool;
     this.bytesPerChecksum = bytesPerChecksum;
 
+    //number of buffers used before doing a flush
+    refreshCurrentBuffer(bufferPool);
+    flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);
+
+    Preconditions
+        .checkArgument(
+            (long) flushPeriod * streamBufferSize == streamBufferFlushSize);
+
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
     commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
```
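For context on the arithmetic above: `flushPeriod` is just the ratio of the flush threshold to the size of one pooled buffer, and the `Preconditions` check rejects configurations where the flush size is not an exact multiple of the buffer size, which the buffer-count trigger later in the diff relies on. A minimal standalone sketch of the same computation; the sizes below are illustrative values, not Ozone defaults:

```java
public class FlushPeriodDemo {
  public static void main(String[] args) {
    // Illustrative sizes; real values come from the client configuration.
    long streamBufferSize = 4L * 1024 * 1024;       // size of one pooled buffer
    long streamBufferFlushSize = 16L * 1024 * 1024; // data per putBlock

    // Same computation as the constructor above.
    int flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);

    // The guard from the diff: the flush size must be an exact multiple of
    // the buffer size, otherwise "every flushPeriod buffers" would never
    // line up with "every streamBufferFlushSize bytes".
    if ((long) flushPeriod * streamBufferSize != streamBufferFlushSize) {
      throw new IllegalArgumentException(
          "flush size must be a multiple of buffer size");
    }
    System.out.println("putBlock every " + flushPeriod + " buffers"); // 4
  }
}
```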
```diff
@@ -165,6 +178,11 @@ public BlockOutputStream(BlockID blockID,
     checksum = new Checksum(checksumType, bytesPerChecksum);
   }
 
+  private void refreshCurrentBuffer(BufferPool pool) {
+    currentBuffer = pool.getCurrentBuffer();
+    currentBufferRemaining =
+        currentBuffer != null ? currentBuffer.remaining() : 0;
+  }
+
   public BlockID getBlockID() {
     return blockID.get();
```
```diff
@@ -209,9 +227,18 @@ public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
   @Override
   public void write(int b) throws IOException {
     checkOpen();
-    byte[] buf = new byte[1];
-    buf[0] = (byte) b;
-    write(buf, 0, 1);
+    allocateNewBufferIfNeeded();
+    currentBuffer.put((byte) b);
+    currentBufferRemaining--;
+    writeChunkIfNeeded();
+    writtenDataLength++;
+    doFlushOrWatchIfNeeded();
   }
 
+  private void writeChunkIfNeeded() throws IOException {
+    if (currentBufferRemaining == 0) {
+      writeChunk(currentBuffer);
+    }
+  }
+
   @Override
```
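The rewritten single-byte path no longer allocates a temporary `byte[1]` per call, and it consults the cached `currentBufferRemaining` counter instead of asking the buffer for its remaining capacity on every byte. A self-contained sketch of the same caching pattern over a plain `ByteBuffer`; the class and field names are illustrative stand-ins, not the Ozone types:

```java
import java.nio.ByteBuffer;

public class CachedRemainingDemo {
  private ByteBuffer current;    // stands in for the pooled ChunkBuffer
  private int currentRemaining;  // cached copy of current.remaining()

  void write(int b) {
    if (currentRemaining == 0) {           // allocateNewBufferIfNeeded()
      current = ByteBuffer.allocate(4096); // a pool would hand out the buffer
      currentRemaining = current.remaining();
    }
    current.put((byte) b);                 // no temporary byte[1] per call
    currentRemaining--;                    // plain int decrement, no method call
    if (currentRemaining == 0) {           // writeChunkIfNeeded()
      current.clear();                     // a real stream would writeChunk();
    }                                      // here the data is just dropped
  }

  public static void main(String[] args) {
    CachedRemainingDemo demo = new CachedRemainingDemo();
    for (int i = 0; i < 8192; i++) {
      demo.write(i & 0xFF);
    }
  }
}
```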
```diff
@@ -229,32 +256,36 @@ public void write(byte[] b, int off, int len) throws IOException {
     }
 
     while (len > 0) {
-      // Allocate a buffer if needed. The buffer will be allocated only
-      // once as needed and will be reused again for multiple blockOutputStream
-      // entries.
-      final ChunkBuffer currentBuffer = bufferPool.allocateBufferIfNeeded(
-          bytesPerChecksum);
-      final int writeLen = Math.min(currentBuffer.remaining(), len);
+      allocateNewBufferIfNeeded();
+      final int writeLen = Math.min(currentBufferRemaining, len);
       currentBuffer.put(b, off, writeLen);
-      if (!currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
+      currentBufferRemaining -= writeLen;
+      writeChunkIfNeeded();
       off += writeLen;
       len -= writeLen;
       writtenDataLength += writeLen;
-      if (shouldFlush()) {
+      doFlushOrWatchIfNeeded();
+    }
+  }
+
+  private void doFlushOrWatchIfNeeded() throws IOException {
+    if (currentBufferRemaining == 0) {
+      if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
         updateFlushLength();
         executePutBlock(false, false);
       }
       // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (isBufferPoolFull()) {
+      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
         handleFullBuffer();
       }
     }
   }
 
-  private boolean shouldFlush() {
-    return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
+  private void allocateNewBufferIfNeeded() {
+    if (currentBufferRemaining == 0) {
+      currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum);
+      currentBufferRemaining = currentBuffer.remaining();
+    }
   }
 
   private void updateFlushLength() {
```
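The old trigger, `bufferPool.computeBufferData() % streamBufferFlushSize == 0`, re-summed the data held in every pooled buffer on each write; the new trigger only inspects the used-buffer count, an O(1) check. Because the constructor guarantees `flushPeriod * streamBufferSize == streamBufferFlushSize`, the two conditions agree at the only moments they are evaluated, when a buffer has just filled up. A small sketch of that equivalence, with illustrative sizes:

```java
public class FlushTriggerDemo {
  public static void main(String[] args) {
    final long bufferSize = 4;  // illustrative units
    final long flushSize = 8;   // exact multiple of bufferSize, as checked above
    final int flushPeriod = (int) (flushSize / bufferSize);

    // When the n-th buffer has just filled up, the pool holds exactly
    // n * bufferSize bytes, so the byte-based and count-based triggers agree.
    for (int usedBuffers = 1; usedBuffers <= 6; usedBuffers++) {
      long bufferData = usedBuffers * bufferSize;   // old: computeBufferData()
      boolean oldTrigger = bufferData % flushSize == 0;
      boolean newTrigger = usedBuffers % flushPeriod == 0;
      if (oldTrigger != newTrigger) {
        throw new AssertionError("triggers diverged at " + usedBuffers);
      }
      System.out.println(usedBuffers + " full buffers -> flush = " + newTrigger);
    }
  }
}
```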
```diff
@@ -264,6 +295,7 @@ private void updateFlushLength() {
   private boolean isBufferPoolFull() {
     return bufferPool.computeBufferData() == streamBufferMaxSize;
   }
+
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
```
```diff
@@ -282,11 +314,9 @@ public void writeOnRetry(long len) throws IOException {
     Preconditions.checkArgument(len <= streamBufferMaxSize);
     int count = 0;
     while (len > 0) {
-      ChunkBuffer buffer = bufferPool.getBuffer(count);
-      long writeLen = Math.min(buffer.position(), len);
-      if (!buffer.hasRemaining()) {
-        writeChunk(buffer);
-      }
+      refreshCurrentBuffer(bufferPool);
+      long writeLen = Math.min(currentBuffer.position(), len);
+      writeChunkIfNeeded();
       len -= writeLen;
       count++;
       writtenDataLength += writeLen;
```
```diff
@@ -334,6 +364,7 @@ private void handleFullBuffer() throws IOException {
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
     commitWatcher.releaseBuffersOnException();
+    refreshCurrentBuffer(bufferPool);
   }
 
   /**
```
```diff
@@ -363,6 +394,8 @@ private void watchForCommit(boolean bufferFull) throws IOException {
       setIoException(ioe);
       throw getIoException();
     }
+    refreshCurrentBuffer(bufferPool);
+
   }
 
   /**
```
```diff
@@ -481,11 +514,9 @@ private void handleFlush(boolean close)
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      final ChunkBuffer currentBuffer = bufferPool.getCurrentBuffer();
+      refreshCurrentBuffer(bufferPool);
       Preconditions.checkArgument(currentBuffer.position() > 0);
-      if (currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
```
> **Contributor:** Note the condition is different here, might not be safe to replace with

> **Member (Author):** Thanks for the help. I fully reverted these lines in #23ba2d1 and the build is green again.
```diff
+      writeChunkIfNeeded();
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
```
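The reviewer's concern can be reproduced with a plain `ByteBuffer`: the removed code flushed the trailing chunk when the buffer still had space remaining, i.e. when it was only partially filled, while `writeChunkIfNeeded` fires only when the buffer is completely full, so a final partial chunk would be skipped. An illustrative standalone sketch, not the Ozone code path:

```java
import java.nio.ByteBuffer;

public class FlushConditionDemo {
  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(8);
    buffer.put(new byte[5]); // trailing partial chunk: 5 of 8 bytes used

    // Removed condition in handleFlush(): flush the partial chunk.
    boolean oldCondition = buffer.hasRemaining();   // true  -> writeChunk runs
    // writeChunkIfNeeded() only fires when the buffer is completely full.
    boolean newCondition = buffer.remaining() == 0; // false -> chunk skipped

    System.out.println("old condition flushes: " + oldCondition);
    System.out.println("new condition flushes: " + newCondition);
  }
}
```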