Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,10 @@ default ChunkBuffer put(ByteString b) {
List<ByteBuffer> asByteBufferList();

/**
* Write the contents of the buffer from the current position to the limit
* Write all the contents of the buffer from the current position to the limit
* to {@code channel}.
*
* @return The number of bytes written, possibly zero
*/
long writeTo(GatheringByteChannel channel) throws IOException;
void writeFully(GatheringByteChannel channel) throws IOException;

/**
* Convert this buffer to a {@link ByteString}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Objects;
import java.util.function.Function;

import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.UncheckedAutoCloseable;

Expand Down Expand Up @@ -101,8 +102,8 @@ public List<ByteBuffer> asByteBufferList() {
}

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
public void writeFully(GatheringByteChannel channel) throws IOException {
BufferUtils.writeFully(channel, buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -212,10 +214,11 @@ public List<ByteBuffer> asByteBufferList() {
}

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
public void writeFully(GatheringByteChannel channel) throws IOException {
for (ByteBuffer buf : buffers) {
BufferUtils.writeFully(channel, buf);
}
findCurrent();
return bytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -276,10 +277,11 @@ public Iterable<ByteBuffer> iterate(int bufferSize) {
public List<ByteBuffer> asByteBufferList() {
return Collections.unmodifiableList(buffers);
}

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffers.toArray(new ByteBuffer[0]));
public void writeFully(GatheringByteChannel channel) throws IOException {
for (ByteBuffer buf : buffers) {
BufferUtils.writeFully(channel, buf);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.hadoop.ozone.common.utils;

import com.google.common.base.Preconditions;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -136,4 +139,16 @@ public static int getNumberOfBins(long numElements, int maxElementsPerBin) {
}
return Math.toIntExact(n);
}

/**
* Write all remaining bytes in buffer to the given channel.
*/
public static void writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOException {
while (bb.remaining() > 0) {
int n = ch.write(bb);
if (n <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the javadoc, n == 0 is a valid case.

BTW, let's have another method to handle array? The GatheringByteChannel works more efficient in that way.

  public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException {
    long written = 0;
    for(int i = 0; i < buffers.length; i++) {
      while (buffers[i].remaining() > 0) {
        final long n = ch.write(buffers, i, buffers.length - i);
        if (n < 0) {
          throw new IllegalStateException("GatheringByteChannel.write returns " + n + " for " + ch);
        }
        written += n;
      }
    }
    return written;
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @szetszwo for the suggestion. Can we address it in follow-up?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let's do it in a follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @szetszwo. If n == 0 is valid, I guess we should change the condition to n < 0 in this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about merging this first? We can change it immediately in the follow-up pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new IllegalStateException("no bytes written");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private static void assertWrite(byte[] expected, ChunkBuffer impl) throws IOExce
assertEquals(0, impl.position());

ByteArrayOutputStream output = new ByteArrayOutputStream(expected.length);
impl.writeTo(new MockGatheringChannel(Channels.newChannel(output)));
impl.writeFully(new MockGatheringChannel(Channels.newChannel(output)));
assertArrayEquals(expected, output.toByteArray());
assertFalse(impl.hasRemaining());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -124,14 +124,13 @@ public static void writeData(FileChannel file, String filename,

private static void writeData(ChunkBuffer data, String filename,
long offset, long len, HddsVolume volume,
ToLongFunction<ChunkBuffer> writer) throws StorageContainerException {
Consumer<ChunkBuffer> writer) throws StorageContainerException {

validateBufferSize(len, data.remaining());

final long startTime = Time.monotonicNow();
final long bytesWritten;
try {
bytesWritten = writer.applyAsLong(data);
writer.accept(data);
} catch (UncheckedIOException e) {
if (!(e.getCause() instanceof InterruptedIOException)) {
onFailure(volume);
Expand All @@ -144,13 +143,11 @@ private static void writeData(ChunkBuffer data, String filename,
if (volume != null) {
volume.getVolumeIOStats().incWriteTime(elapsed);
volume.getVolumeIOStats().incWriteOpCount();
volume.getVolumeIOStats().incWriteBytes(bytesWritten);
volume.getVolumeIOStats().incWriteBytes(len);
}

LOG.debug("Written {} bytes at offset {} to {} in {} ms",
bytesWritten, offset, filename, elapsed);

validateWriteSize(len, bytesWritten);
len, offset, filename, elapsed);
}

private static long writeDataToFile(File file, ChunkBuffer data,
Expand All @@ -163,7 +160,9 @@ private static long writeDataToFile(File file, ChunkBuffer data,
channel = open(path, WRITE_OPTIONS, NO_ATTRIBUTES);

try (FileLock ignored = channel.lock()) {
return writeDataToChannel(channel, data, offset);
int len = data.remaining();
writeDataToChannel(channel, data, offset);
return len;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -177,11 +176,10 @@ private static long writeDataToFile(File file, ChunkBuffer data,
}
}

private static long writeDataToChannel(FileChannel channel, ChunkBuffer data,
long offset) {
private static void writeDataToChannel(FileChannel channel, ChunkBuffer data, long offset) {
try {
channel.position(offset);
return data.writeTo(channel);
data.writeFully(channel);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -433,11 +431,6 @@ private static void validateReadSize(long expected, long actual)
checkSize("read", expected, actual, CONTAINER_INTERNAL_ERROR);
}

private static void validateWriteSize(long expected, long actual)
throws StorageContainerException {
checkSize("write", expected, actual, INVALID_WRITE_SIZE);
}

public static void validateBufferSize(long expected, long actual)
throws StorageContainerException {
checkSize("buffer", expected, actual, INVALID_WRITE_SIZE);
Expand Down