Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,7 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
lastChunkBuffer.capacity() - lastChunkBuffer.position();
appendLastChunkBuffer(chunk, 0, remainingBufferSize);
updateBlockDataWithLastChunkBuffer();
checksum.clearChecksumCache(); // New chunk, clear the checksum cache
appendLastChunkBuffer(chunk, remainingBufferSize,
chunk.remaining() - remainingBufferSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to compute and verify checksums for chunks.
*
* This class is not thread safe.
*/
public class Checksum {
public static final Logger LOG = LoggerFactory.getLogger(Checksum.class);

private static Function<ByteBuffer, ByteString> newMessageDigestFunction(
String algorithm) {
final MessageDigest md;
Expand All @@ -63,7 +67,7 @@ public static ByteString int2ByteString(int n) {
private static Function<ByteBuffer, ByteString> newChecksumByteBufferFunction(
Supplier<ChecksumByteBuffer> constructor) {
final ChecksumByteBuffer algorithm = constructor.get();
return data -> {
return data -> {
algorithm.reset();
algorithm.update(data);
return int2ByteString((int)algorithm.getValue());
Expand Down Expand Up @@ -97,6 +101,22 @@ Function<ByteBuffer, ByteString> newChecksumFunction() {

private final ChecksumType checksumType;
private final int bytesPerChecksum;
/**
* TODO: Make sure to clear this cache when a new block chunk is started.
*/
private final ChecksumCache checksumCache;

/**
* BlockOutputStream needs to call this method to clear the checksum cache
* whenever a block chunk has been established.
*/
public boolean clearChecksumCache() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return value not used.

if (checksumCache != null) {
checksumCache.clear();
return true;
}
return false;
}

/**
* Constructs a Checksum object.
Expand All @@ -106,6 +126,24 @@ Function<ByteBuffer, ByteString> newChecksumFunction() {
public Checksum(ChecksumType type, int bytesPerChecksum) {
this.checksumType = type;
this.bytesPerChecksum = bytesPerChecksum;
this.checksumCache = null;
}

/**
 * Constructs a Checksum object, optionally enabling the checksum cache.
 *
 * @param type type of Checksum
 * @param bytesPerChecksum number of bytes of data per checksum
 * @param useChecksumCache true to enable the checksum cache; intended for
 *     BlockOutputStream, which must then clear the cache on every new chunk
 *     via {@code clearChecksumCache()}
 */
public Checksum(ChecksumType type, int bytesPerChecksum, boolean useChecksumCache) {
  this.checksumType = type;
  this.bytesPerChecksum = bytesPerChecksum;
  LOG.debug("useChecksumCache = {}", useChecksumCache);
  if (useChecksumCache) {
    this.checksumCache = new ChecksumCache(bytesPerChecksum);
  } else {
    this.checksumCache = null;
  }
}

/**
Expand Down Expand Up @@ -168,12 +206,20 @@ public ChecksumData computeChecksum(ChunkBuffer data)
throw new OzoneChecksumException(checksumType);
}

// Checksum is computed for each bytesPerChecksum number of bytes of data
// starting at offset 0. The last checksum might be computed for the
// remaining data with length less than bytesPerChecksum.
final List<ByteString> checksumList = new ArrayList<>();
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
checksumList.add(computeChecksum(b, function, bytesPerChecksum));
final List<ByteString> checksumList;
if (checksumCache == null) {
// When checksumCache is not enabled:
// Checksum is computed for each bytesPerChecksum number of bytes of data
// starting at offset 0. The last checksum might be computed for the
// remaining data with length less than bytesPerChecksum.
checksumList = new ArrayList<>();
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
checksumList.add(computeChecksum(b, function, bytesPerChecksum)); // merge this?
}
} else {
// When checksumCache is enabled:
// We only need to update the last checksum in the cache, then pass it along.
checksumList = checksumCache.computeChecksum(data, function);
}
return new ChecksumData(checksumType, bytesPerChecksum, checksumList);
}
Expand All @@ -185,7 +231,7 @@ public ChecksumData computeChecksum(ChunkBuffer data)
* @param maxLength the max length of data
* @return computed checksum ByteString
*/
private static ByteString computeChecksum(ByteBuffer data,
protected static ByteString computeChecksum(ByteBuffer data,
Function<ByteBuffer, ByteString> function, int maxLength) {
final int limit = data.limit();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.common;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
* Cache previous checksums to avoid recomputing them.
* This is a stop-gap solution to reduce checksum calc overhead inside critical section
* without having to do a major refactoring/overhaul over protobuf and interfaces.
* This is only supposed to be used by BlockOutputStream, for now.
* <p>
* Each BlockOutputStream has its own Checksum instance.
* Each block chunk (4 MB default) is divided into 16 KB (default) each for checksum calculation.
* For CRC32/CRC32C, each checksum takes 4 bytes. Thus each block chunk has 4 MB / 16 KB * 4 B = 1 KB of checksum data.
*/
public class ChecksumCache {
public static final Logger LOG = LoggerFactory.getLogger(ChecksumCache.class);

private final int bytesPerChecksum;
private final List<ByteString> checksums;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a plain simple byte string in the future. I don't know why BlockData data structure is designed so complex.

// Chunk length last time the checksum is computed
private int prevChunkLength;
// This only serves as a hint for array list initial allocation. The array list will still grow as needed.
private static final int BLOCK_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB

public ChecksumCache(int bytesPerChecksum) {
this.prevChunkLength = 0;
this.bytesPerChecksum = bytesPerChecksum;
// Set initialCapacity to avoid costly resizes
this.checksums = new ArrayList<>(BLOCK_CHUNK_SIZE / bytesPerChecksum);
}

/**
* Clear cached checksums. And reset the written index.
*/
public void clear() {
prevChunkLength = 0;
checksums.clear();
}

public List<ByteString> getChecksums() {
return checksums;
}

public List<ByteString> computeChecksum(ChunkBuffer data, Function<ByteBuffer, ByteString> function) {
// Indicates how much data the current chunk buffer holds
final int currChunkLength = data.limit();

// Sanity check
if (currChunkLength <= prevChunkLength) {
// If currChunkLength <= lastChunkLength, it indicates a bug that needs to be addressed.
// It means BOS has not properly clear()ed the cache when a new chunk is started in that code path.
throw new IllegalArgumentException("ChunkBuffer data limit must be larger than the last time");
}

// One or more checksums need to be computed

// Start of the checksum index that need to be (re)computed
final int ciStart = prevChunkLength / bytesPerChecksum;
final int ciEnd = currChunkLength / bytesPerChecksum;
int i = 0;
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
if (i < ciStart) {
i++;
continue;
}

// i can either point to:
// 1. the last element in the list -- in which case the checksum needs to be updated
// 2. one after the last element -- in which case a new checksum needs to be added
assert i == checksums.size() - 1 || i == checksums.size();

// TODO: Furthermore for CRC32/CRC32C, it can be even more efficient by updating the last checksum byte-by-byte.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checksum functions don't allow updating checksum, so this is going to require a refactoring.

final ByteString checksum = Checksum.computeChecksum(b, function, bytesPerChecksum);
if (i == checksums.size()) {
checksums.add(checksum);
} else {
checksums.set(i, checksum);
}

i++;
}

// Sanity check
if (i - 1 != ciEnd) {
throw new IllegalStateException("Checksum index end does not match expectation");
}

// Update last written index
prevChunkLength = currChunkLength;
return checksums;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private Checksum getChecksum(ContainerProtos.ChecksumType type) {
if (type == null) {
type = CHECKSUM_TYPE_DEFAULT;
}
return new Checksum(type, BYTES_PER_CHECKSUM);
return new Checksum(type, BYTES_PER_CHECKSUM, true);
}

/**
Expand Down