4 changes: 0 additions & 4 deletions hbase-common/pom.xml
@@ -110,10 +110,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
<dependency>
Reviewer comment: Thanks for fixing this.

<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
ByteBuffDecompressionCodec.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
@@ -26,4 +27,7 @@ public interface ByteBuffDecompressionCodec {

ByteBuffDecompressor createByteBuffDecompressor();

Compression.HFileDecompressionContext
getDecompressionContextFromConfiguration(Configuration conf);

}
ByteBuffDecompressor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.compress;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -45,4 +46,11 @@ public interface ByteBuffDecompressor extends Closeable {
*/
boolean canDecompress(ByteBuff output, ByteBuff input);

/**
* Call before every use of {@link #canDecompress(ByteBuff, ByteBuff)} and
* {@link #decompress(ByteBuff, ByteBuff, int)} to reinitialize the decompressor with settings
* from the HFileInfo. This can matter because ByteBuffDecompressors are reused many times.
*/
void reinit(@Nullable Compression.HFileDecompressionContext newHFileDecompressionContext);

}
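A minimal sketch of the calling pattern this contract implies, assuming an `Algorithm` (`algorithm`), an `HFileContext` (`fileContext`), and input/output `ByteBuff`s plus a `compressedSize` are already in scope; only the API calls come from this patch, the variable names are placeholders:

```java
ByteBuffDecompressor decompressor = algorithm.getByteBuffDecompressor();
try {
  // Pooled decompressors are reused across files, so reinitialize with the
  // per-file context before every use.
  decompressor.reinit(fileContext.getDecompressionContext());
  if (decompressor.canDecompress(output, input)) {
    // Third argument is the size of the compressed input to consume.
    decompressor.decompress(output, input, compressedSize);
  }
} finally {
  algorithm.returnByteBuffDecompressor(decompressor);
}
```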
CodecPool.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.io.compress;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Comparator;
import java.util.NavigableSet;
@@ -37,6 +35,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
* A global compressor/decompressor pool used to save and reuse (possibly native)
* compression/decompression codecs. Copied from the class of the same name in hadoop-common and
@@ -56,7 +58,12 @@ public class CodecPool {
NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>();

private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
return Caffeine.newBuilder().build(key -> new AtomicInteger());
return CacheBuilder.newBuilder().build(new CacheLoader<>() {
@Override
public AtomicInteger load(Class<T> key) throws Exception {
return new AtomicInteger();
}
});
}

/**
@@ -108,26 +115,19 @@ private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool
/**
* Copied from hadoop-common without significant modification.
*/
@SuppressWarnings("unchecked")
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
justification = "LoadingCache will compute value if absent")
private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
Class<? extends T> codecClass) {
return usageCounts.get((Class<T>) codecClass).get();
return usageCounts.getUnchecked((Class<T>) codecClass).get();
}

/**
* Copied from hadoop-common without significant modification.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
justification = "LoadingCache will compute value if absent")
private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
T codec, int delta) {
if (codec != null && usageCounts != null) {
Class<T> codecClass = ReflectionUtils.getClass(codec);
usageCounts.get(codecClass).addAndGet(delta);
usageCounts.getUnchecked(codecClass).addAndGet(delta);
}
}
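For context, a standalone sketch of the lease-count pattern above, using the shaded Guava cache from hbase-thirdparty that this change switches to. `getUnchecked` computes the counter on first access, which is why the FindBugs suppressions for null returns could be dropped; the class and variable names below are illustrative, not part of the patch:

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

public class LeaseCountSketch {

  private static final LoadingCache<Class<?>, AtomicInteger> USAGE_COUNTS =
    CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, AtomicInteger>() {
      @Override
      public AtomicInteger load(Class<?> key) {
        return new AtomicInteger();
      }
    });

  public static void main(String[] args) {
    // The loader creates a counter on demand, so getUnchecked never returns null.
    USAGE_COUNTS.getUnchecked(String.class).addAndGet(1);
    USAGE_COUNTS.getUnchecked(String.class).addAndGet(1);
    System.out.println(USAGE_COUNTS.getUnchecked(String.class).get()); // prints 2
  }
}
```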

Compression.java
@@ -17,15 +17,18 @@
*/
package org.apache.hadoop.hbase.io.compress;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -548,11 +551,36 @@ public void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
}
}

/**
* Get an object that holds settings used by ByteBuffDecompressor. It's expensive to pull these
* from a Configuration object every time we decompress a block, so pull them here when, for
* example, opening an HFile, and reuse the returned HFileDecompressionContext as much as
* possible. The concrete class of this object will be one that is specific to the codec
* implementation in use. You don't need to inspect it yourself, just pass it along to
* {@link ByteBuffDecompressor#reinit(HFileDecompressionContext)}.
*/
@Nullable
public HFileDecompressionContext
getHFileDecompressionContextForConfiguration(Configuration conf) {
if (supportsByteBuffDecompression()) {
return ((ByteBuffDecompressionCodec) getCodec(conf))
.getDecompressionContextFromConfiguration(conf);
} else {
return null;
}
}

public String getName() {
return compressName;
}
}

/**
* See {@link Algorithm#getHFileDecompressionContextForConfiguration(Configuration)}.
*/
public static abstract class HFileDecompressionContext implements Closeable, HeapSize {
}
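A rough sketch of how these pieces are meant to fit together when opening a file. Only the calls shown elsewhere in this diff (`getHFileDecompressionContextForConfiguration`, `withDecompressionContext`, `getDecompressionContext`, `reinit`) are real; the wrapping method and variable names are illustrative:

```java
// Resolve codec-specific decompression settings once, at open time, and attach
// them to the HFileContext so per-block decompression never touches Configuration.
static HFileContext buildReadContext(Configuration conf, Compression.Algorithm algo) {
  Compression.HFileDecompressionContext ctx =
    algo.getHFileDecompressionContextForConfiguration(conf); // null if the codec has no ByteBuff support
  return new HFileContextBuilder()
    .withCompression(algo)
    .withDecompressionContext(ctx)
    .build();
}

// Later, per block: decompressor.reinit(fileContext.getDecompressionContext());
```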

public static Algorithm getCompressionAlgorithmByName(String compressName) {
Algorithm[] algos = Algorithm.class.getEnumConstants();

DictionaryCache.java
@@ -59,11 +59,11 @@ private DictionaryCache() {
* @param path the hadoop Path where the dictionary is located, as a String
* @return the dictionary bytes if successful, null otherwise
*/
public static byte[] getDictionary(final Configuration conf, final String path)
throws IOException {
public static byte[] getDictionary(final Configuration conf, final String path) {
if (path == null || path.isEmpty()) {
return null;
}

// Create the dictionary loading cache if we haven't already
if (CACHE == null) {
synchronized (DictionaryCache.class) {
@@ -91,7 +91,7 @@ public byte[] load(String s) throws Exception {
try {
return CACHE.get(path);
} catch (ExecutionException e) {
throw new IOException(e);
throw new RuntimeException("Unable to load dictionary at " + path, e);
}
}
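A small caller-side sketch of the effect of this signature change, assuming a `conf` and a `dictionaryPath` are in hand: the checked exception no longer propagates, so callers drop their try/catch and a load failure surfaces as an unchecked exception instead.

```java
// No `throws IOException` to handle anymore; a failed load now throws RuntimeException.
byte[] dictionary = DictionaryCache.getDictionary(conf, dictionaryPath);
if (dictionary != null) {
  // hand the dictionary bytes to the codec / decompression context
}
```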

@@ -139,9 +139,7 @@ private void decompressViaByteBuff(ByteBuff blockBufferWithoutHeader, ByteBuff o
Compression.Algorithm compression = fileContext.getCompression();
ByteBuffDecompressor decompressor = compression.getByteBuffDecompressor();
try {
if (decompressor instanceof CanReinit) {
((CanReinit) decompressor).reinit(conf);
}
decompressor.reinit(fileContext.getDecompressionContext());
decompressor.decompress(blockBufferWithoutHeader, onDiskBlock, onDiskSizeWithoutHeader);
} finally {
compression.returnByteBuffDecompressor(decompressor);
@@ -160,9 +158,7 @@ private boolean canDecompressViaByteBuff(ByteBuff blockBufferWithoutHeader,
} else {
ByteBuffDecompressor decompressor = fileContext.getCompression().getByteBuffDecompressor();
try {
if (decompressor instanceof CanReinit) {
((CanReinit) decompressor).reinit(conf);
}
decompressor.reinit(fileContext.getDecompressionContext());
// Even if we have a ByteBuffDecompressor, we still need to check if it can decompress
// our particular ByteBuffs
return decompressor.canDecompress(blockBufferWithoutHeader, onDiskBlock);
HFileContext.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InnerStoreCellComparator;
@@ -50,6 +52,11 @@ public class HFileContext implements HeapSize, Cloneable {
private boolean includesTags;
/** Compression algorithm used **/
private Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
/**
* Details used by compression algorithm that are more efficiently loaded once and then reused
**/
@Nullable
private Compression.HFileDecompressionContext decompressionContext = null;
/** Whether tags to be compressed or not **/
private boolean compressTags;
/** the checksum type **/
@@ -80,6 +87,7 @@ public HFileContext(HFileContext context) {
this.includesMvcc = context.includesMvcc;
this.includesTags = context.includesTags;
this.compressAlgo = context.compressAlgo;
this.decompressionContext = context.decompressionContext;
this.compressTags = context.compressTags;
this.checksumType = context.checksumType;
this.bytesPerChecksum = context.bytesPerChecksum;
@@ -95,14 +103,16 @@ public HFileContext(HFileContext context) {
}

HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
Encryption.Context cryptoContext, long fileCreateTime, String hfileName, byte[] columnFamily,
byte[] tableName, CellComparator cellComparator, IndexBlockEncoding indexBlockEncoding) {
Compression.Algorithm compressAlgo, Compression.HFileDecompressionContext decompressionContext,
boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize,
DataBlockEncoding encoding, Encryption.Context cryptoContext, long fileCreateTime,
String hfileName, byte[] columnFamily, byte[] tableName, CellComparator cellComparator,
IndexBlockEncoding indexBlockEncoding) {
this.usesHBaseChecksum = useHBaseChecksum;
this.includesMvcc = includesMvcc;
this.includesTags = includesTags;
this.compressAlgo = compressAlgo;
this.decompressionContext = decompressionContext;
this.compressTags = compressTags;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
@@ -141,6 +151,20 @@ public Compression.Algorithm getCompression() {
return compressAlgo;
}

/**
* Get an object that, if non-null, may be cast into a codec-specific type that exposes some
* information from the store-file-specific Configuration that is relevant to decompression. For
* example, ZSTD tables can have "hbase.io.compress.zstd.dictionary" on their table descriptor,
* and decompressions of blocks in that table must use that dictionary. It's cheaper for HBase to
* load these settings into an object of their own once and check this upon each block
* decompression, than it is to call into {@link Configuration#get(String)} on each block
* decompression.
*/
@Nullable
public Compression.HFileDecompressionContext getDecompressionContext() {
return decompressionContext;
}
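To make the shape of a concrete context easier to picture, here is a purely illustrative subclass along the lines of what a dictionary-capable codec might provide; the class name, field, and heap accounting below are invented for the example and are not part of this change:

```java
public class ExampleDictionaryDecompressionContext
    extends Compression.HFileDecompressionContext {

  // Loaded once from the store file's Configuration (e.g. a ZSTD dictionary),
  // then reused for every block decompressed from that file.
  private final byte[] dictionary;

  public ExampleDictionaryDecompressionContext(byte[] dictionary) {
    this.dictionary = dictionary;
  }

  public byte[] getDictionary() {
    return dictionary;
  }

  @Override
  public long heapSize() {
    // Reported so HFileContext.heapSize() accounts for the attached context.
    return dictionary == null ? 0 : dictionary.length;
  }

  @Override
  public void close() {
    // Nothing to release for plain in-heap dictionary bytes.
  }
}
```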

public boolean isUseHBaseChecksum() {
return usesHBaseChecksum;
}
@@ -238,6 +262,9 @@ public long heapSize() {
if (this.tableName != null) {
size += ClassSize.sizeOfByteArray(this.tableName.length);
}
if (this.decompressionContext != null) {
size += this.decompressionContext.heapSize();
}
return size;
}

@@ -274,6 +301,8 @@ public String toString() {
sb.append(compressAlgo);
sb.append(", compressTags=");
sb.append(compressTags);
sb.append(", decompressionContext=");
sb.append(decompressionContext);
sb.append(", cryptoContext=[");
sb.append(cryptoContext);
sb.append("]");
HFileContextBuilder.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -42,6 +44,8 @@ public class HFileContextBuilder {
private boolean includesTags = false;
/** Compression algorithm used **/
private Algorithm compression = Algorithm.NONE;
@Nullable
private Compression.HFileDecompressionContext decompressionContext = null;
/** Whether tags to be compressed or not **/
private boolean compressTags = false;
/** the checksum type **/
@@ -73,6 +77,7 @@ public HFileContextBuilder(final HFileContext hfc) {
this.includesMvcc = hfc.isIncludesMvcc();
this.includesTags = hfc.isIncludesTags();
this.compression = hfc.getCompression();
this.decompressionContext = hfc.getDecompressionContext();
this.compressTags = hfc.isCompressTags();
this.checkSumType = hfc.getChecksumType();
this.bytesPerChecksum = hfc.getBytesPerChecksum();
@@ -107,6 +112,12 @@ public HFileContextBuilder withCompression(Algorithm compression) {
return this;
}

public HFileContextBuilder
withDecompressionContext(@Nullable Compression.HFileDecompressionContext decompressionContext) {
this.decompressionContext = decompressionContext;
return this;
}

public HFileContextBuilder withCompressTags(boolean compressTags) {
this.compressTags = compressTags;
return this;
@@ -169,7 +180,8 @@ public HFileContextBuilder withCellComparator(CellComparator cellComparator) {

public HFileContext build() {
return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression,
compressTags, checkSumType, bytesPerChecksum, blockSize, encoding, cryptoContext,
fileCreateTime, hfileName, columnFamily, tableName, cellComparator, indexBlockEncoding);
decompressionContext, compressTags, checkSumType, bytesPerChecksum, blockSize, encoding,
cryptoContext, fileCreateTime, hfileName, columnFamily, tableName, cellComparator,
indexBlockEncoding);
}
}