diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 99624527bf27..0aecd4a471ec 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -110,10 +110,6 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.github.ben-manes.caffeine</groupId>
-      <artifactId>caffeine</artifactId>
-    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java
index 821f0d825446..233fc0160bd5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
@@ -26,4 +27,7 @@ public interface ByteBuffDecompressionCodec {
 
   ByteBuffDecompressor createByteBuffDecompressor();
 
+  Compression.HFileDecompressionContext
+    getDecompressionContextFromConfiguration(Configuration conf);
+
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java
index 8a0ff71919a9..432b903fe4d6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -45,4 +46,11 @@ public interface ByteBuffDecompressor extends Closeable {
    */
   boolean canDecompress(ByteBuff output, ByteBuff input);
 
+  /**
+   * Call before every use of {@link #canDecompress(ByteBuff, ByteBuff)} and
+   * {@link #decompress(ByteBuff, ByteBuff, int)} to reinitialize the decompressor with settings
+   * from the HFileInfo. This can matter because ByteBuffDecompressors are reused many times.
+   */
+  void reinit(@Nullable Compression.HFileDecompressionContext newHFileDecompressionContext);
+
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java
index 8096af050037..080fc2c2ec55 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.util.Comparator;
 import java.util.NavigableSet;
@@ -37,6 +35,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
 /**
  * A global compressor/decompressor pool used to save and reuse (possibly native)
  * compression/decompression codecs. Copied from the class of the same name in hadoop-common and
@@ -56,7 +58,12 @@ public class CodecPool {
     NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
 
  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
-    return Caffeine.newBuilder().build(key -> new AtomicInteger());
+    return CacheBuilder.newBuilder().build(new CacheLoader<>() {
+      @Override
+      public AtomicInteger load(Class<T> key) throws Exception {
+        return new AtomicInteger();
+      }
+    });
   }
 
   /**
@@ -108,26 +115,19 @@ private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool
   /**
    * Copied from hadoop-common without significant modification.
    */
-  @SuppressWarnings("unchecked")
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
-      justification = "LoadingCache will compute value if absent")
   private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
     Class<? extends T> codecClass) {
-    return usageCounts.get((Class<T>) codecClass).get();
+    return usageCounts.getUnchecked((Class<T>) codecClass).get();
   }
 
   /**
    * Copied from hadoop-common without significant modification.
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
-      justification = "LoadingCache will compute value if absent")
   private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
     T codec, int delta) {
     if (codec != null && usageCounts != null) {
       Class<T> codecClass = ReflectionUtils.getClass(codec);
-      usageCounts.get(codecClass).addAndGet(delta);
+      usageCounts.getUnchecked(codecClass).addAndGet(delta);
     }
   }
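Taken together, the interface and pool changes above define a new lifecycle for pooled decompressors: borrow, reinit with the per-HFile context (possibly null), decompress, return. The following is a minimal illustrative sketch of that cycle using only APIs visible in this patch; the sketch class and its method names are hypothetical and not part of the patch.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;

/** Illustrative sketch only; not part of the patch. */
final class DecompressorLifecycleSketch {
  /**
   * Decompress one block: borrow a pooled decompressor, reinit it with the per-HFile context
   * (which may be null), decompress, and always return it to the pool.
   */
  static void decompressBlock(Compression.Algorithm algo,
    Compression.HFileDecompressionContext perFileContext, ByteBuff output, ByteBuff input,
    int compressedSize) throws IOException {
    ByteBuffDecompressor decompressor = algo.getByteBuffDecompressor();
    try {
      // Pooled decompressors may still carry settings from the previous file they served,
      // so reinit() must run before canDecompress()/decompress().
      decompressor.reinit(perFileContext);
      if (decompressor.canDecompress(output, input)) {
        decompressor.decompress(output, input, compressedSize);
      }
      // else: fall back to the codec's streaming decompression path (not shown here).
    } finally {
      algo.returnByteBuffDecompressor(decompressor);
    }
  }
}
```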
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index c187a96702d0..2697ed152844 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,6 +28,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -548,11 +551,36 @@ public void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
       }
     }
 
+    /**
+     * Get an object that holds settings used by ByteBuffDecompressor. It's expensive to pull these
+     * from a Configuration object every time we decompress a block, so pull them here when, for
+     * example, opening an HFile, and reuse the returned HFileDecompressionContext as much as
+     * possible. The concrete class of this object will be one that is specific to the codec
+     * implementation in use. You don't need to inspect it yourself, just pass it along to
+     * {@link ByteBuffDecompressor#reinit(HFileDecompressionContext)}.
+     */
+    @Nullable
+    public HFileDecompressionContext
+      getHFileDecompressionContextForConfiguration(Configuration conf) {
+      if (supportsByteBuffDecompression()) {
+        return ((ByteBuffDecompressionCodec) getCodec(conf))
+          .getDecompressionContextFromConfiguration(conf);
+      } else {
+        return null;
+      }
+    }
+
     public String getName() {
       return compressName;
     }
   }
 
+  /**
+   * See {@link Algorithm#getHFileDecompressionContextForConfiguration(Configuration)}.
+   */
+  public static abstract class HFileDecompressionContext implements Closeable, HeapSize {
+  }
+
   public static Algorithm getCompressionAlgorithmByName(String compressName) {
     Algorithm[] algos = Algorithm.class.getEnumConstants();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
index 1d6e25675f26..78fa448b63df 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
@@ -59,11 +59,11 @@ private DictionaryCache() {
    * @param path the hadoop Path where the dictionary is located, as a String
    * @return the dictionary bytes if successful, null otherwise
    */
-  public static byte[] getDictionary(final Configuration conf, final String path)
-    throws IOException {
+  public static byte[] getDictionary(final Configuration conf, final String path) {
     if (path == null || path.isEmpty()) {
       return null;
     }
+
     // Create the dictionary loading cache if we haven't already
     if (CACHE == null) {
       synchronized (DictionaryCache.class) {
@@ -91,7 +91,7 @@ public byte[] load(String s) throws Exception {
     try {
       return CACHE.get(path);
     } catch (ExecutionException e) {
-      throw new IOException(e);
+      throw new RuntimeException("Unable to load dictionary at " + path, e);
     }
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index 2cdbdc620e07..81f8e5fa6a24 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -139,9 +139,7 @@ private void decompressViaByteBuff(ByteBuff blockBufferWithoutHeader, ByteBuff o
     Compression.Algorithm compression = fileContext.getCompression();
     ByteBuffDecompressor decompressor = compression.getByteBuffDecompressor();
     try {
-      if (decompressor instanceof CanReinit) {
-        ((CanReinit) decompressor).reinit(conf);
-      }
+      decompressor.reinit(fileContext.getDecompressionContext());
       decompressor.decompress(blockBufferWithoutHeader, onDiskBlock, onDiskSizeWithoutHeader);
     } finally {
       compression.returnByteBuffDecompressor(decompressor);
@@ -160,9 +158,7 @@ private boolean canDecompressViaByteBuff(ByteBuff blockBufferWithoutHeader,
     } else {
       ByteBuffDecompressor decompressor = fileContext.getCompression().getByteBuffDecompressor();
       try {
-        if (decompressor instanceof CanReinit) {
-          ((CanReinit) decompressor).reinit(conf);
-        }
+        decompressor.reinit(fileContext.getDecompressionContext());
         // Even if we have a ByteBuffDecompressor, we still need to check if it can decompress
         // our particular ByteBuffs
         return decompressor.canDecompress(blockBufferWithoutHeader, onDiskBlock);
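The Algorithm-level helper added above is meant to be called once per opened store file, with the result carried alongside the file rather than re-derived per block. A hedged sketch of that call site follows; the sketch class and method names are hypothetical.

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.io.compress.Compression;

/** Illustrative sketch only; not part of the patch. */
final class OpenTimeContextSketch {
  /**
   * Resolve the codec-specific decompression settings once, when a store file is opened, instead
   * of consulting the Configuration again for every block.
   */
  static Compression.HFileDecompressionContext resolve(Compression.Algorithm algo,
    Configuration storeFileConf) {
    // Null is a valid result: codecs without ByteBuff decompression support return null, and
    // ByteBuffDecompressor#reinit(null) is explicitly allowed.
    return algo.getHFileDecompressionContextForConfiguration(storeFileConf);
  }
}
```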
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 98520d949af4..5dbf34304266 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.InnerStoreCellComparator;
@@ -50,6 +52,11 @@ public class HFileContext implements HeapSize, Cloneable {
   private boolean includesTags;
   /** Compression algorithm used **/
   private Compression.Algorithm compressAlgo = Compression.Algorithm.NONE;
+  /**
+   * Details used by compression algorithm that are more efficiently loaded once and then reused
+   **/
+  @Nullable
+  private Compression.HFileDecompressionContext decompressionContext = null;
   /** Whether tags to be compressed or not **/
   private boolean compressTags;
   /** the checksum type **/
@@ -80,6 +87,7 @@ public HFileContext(HFileContext context) {
     this.includesMvcc = context.includesMvcc;
     this.includesTags = context.includesTags;
     this.compressAlgo = context.compressAlgo;
+    this.decompressionContext = context.decompressionContext;
     this.compressTags = context.compressTags;
     this.checksumType = context.checksumType;
     this.bytesPerChecksum = context.bytesPerChecksum;
@@ -95,14 +103,16 @@ public HFileContext(HFileContext context) {
   }
 
   HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
-    Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
-    int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
-    Encryption.Context cryptoContext, long fileCreateTime, String hfileName, byte[] columnFamily,
-    byte[] tableName, CellComparator cellComparator, IndexBlockEncoding indexBlockEncoding) {
+    Compression.Algorithm compressAlgo, Compression.HFileDecompressionContext decompressionContext,
+    boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize,
+    DataBlockEncoding encoding, Encryption.Context cryptoContext, long fileCreateTime,
+    String hfileName, byte[] columnFamily, byte[] tableName, CellComparator cellComparator,
+    IndexBlockEncoding indexBlockEncoding) {
     this.usesHBaseChecksum = useHBaseChecksum;
     this.includesMvcc = includesMvcc;
     this.includesTags = includesTags;
     this.compressAlgo = compressAlgo;
+    this.decompressionContext = decompressionContext;
     this.compressTags = compressTags;
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
@@ -141,6 +151,20 @@ public Compression.Algorithm getCompression() {
     return compressAlgo;
   }
 
+  /**
+   * Get an object that, if non-null, may be cast into a codec-specific type that exposes some
+   * information from the store-file-specific Configuration that is relevant to decompression. For
+   * example, ZSTD tables can have "hbase.io.compress.zstd.dictionary" on their table descriptor,
+   * and decompressions of blocks in that table must use that dictionary. It's cheaper for HBase to
+   * load these settings into an object of their own once and check this upon each block
+   * decompression, than it is to call into {@link Configuration#get(String)} on each block
+   * decompression.
+   */
+  @Nullable
+  public Compression.HFileDecompressionContext getDecompressionContext() {
+    return decompressionContext;
+  }
+
   public boolean isUseHBaseChecksum() {
     return usesHBaseChecksum;
   }
@@ -238,6 +262,9 @@ public long heapSize() {
     if (this.tableName != null) {
       size += ClassSize.sizeOfByteArray(this.tableName.length);
     }
+    if (this.decompressionContext != null) {
+      size += this.decompressionContext.heapSize();
+    }
     return size;
   }
 
@@ -274,6 +301,8 @@ public String toString() {
     sb.append(compressAlgo);
     sb.append(", compressTags=");
     sb.append(compressTags);
+    sb.append(", decompressionContext=");
+    sb.append(decompressionContext);
     sb.append(", cryptoContext=[");
     sb.append(cryptoContext);
     sb.append("]");
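Because HFileDecompressionContext implements HeapSize, a non-null context folds its own footprint into HFileContext.heapSize(), as the hunk above shows. A small illustrative sketch of reading the context back out; the sketch names are hypothetical.

```java
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;

/** Illustrative sketch only; not part of the patch. */
final class ContextHeapSizeSketch {
  /** The nullable context travels with the HFileContext and adds its own heapSize() when set. */
  static long decompressionContextHeapSize(HFileContext fileContext) {
    Compression.HFileDecompressionContext ctx = fileContext.getDecompressionContext();
    return ctx == null ? 0L : ctx.heapSize();
  }
}
```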
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
index 0394f12144e3..341461b26b1f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -42,6 +44,8 @@ public class HFileContextBuilder {
   private boolean includesTags = false;
   /** Compression algorithm used **/
   private Algorithm compression = Algorithm.NONE;
+  @Nullable
+  private Compression.HFileDecompressionContext decompressionContext = null;
   /** Whether tags to be compressed or not **/
   private boolean compressTags = false;
   /** the checksum type **/
@@ -73,6 +77,7 @@ public HFileContextBuilder(final HFileContext hfc) {
     this.includesMvcc = hfc.isIncludesMvcc();
     this.includesTags = hfc.isIncludesTags();
     this.compression = hfc.getCompression();
+    this.decompressionContext = hfc.getDecompressionContext();
     this.compressTags = hfc.isCompressTags();
     this.checkSumType = hfc.getChecksumType();
     this.bytesPerChecksum = hfc.getBytesPerChecksum();
@@ -107,6 +112,12 @@ public HFileContextBuilder withCompression(Algorithm compression) {
     return this;
   }
 
+  public HFileContextBuilder
+    withDecompressionContext(@Nullable Compression.HFileDecompressionContext decompressionContext) {
+    this.decompressionContext = decompressionContext;
+    return this;
+  }
+
   public HFileContextBuilder withCompressTags(boolean compressTags) {
     this.compressTags = compressTags;
     return this;
@@ -169,7 +180,8 @@ public HFileContextBuilder withCellComparator(CellComparator cellComparator) {
 
   public HFileContext build() {
     return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression,
-      compressTags, checkSumType, bytesPerChecksum, blockSize, encoding, cryptoContext,
-      fileCreateTime, hfileName, columnFamily, tableName, cellComparator, indexBlockEncoding);
+      decompressionContext, compressTags, checkSumType, bytesPerChecksum, blockSize, encoding,
+      cryptoContext, fileCreateTime, hfileName, columnFamily, tableName, cellComparator,
+      indexBlockEncoding);
   }
 }
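Because the copy constructor above now carries decompressionContext, deriving one HFileContext from another preserves the codec-specific settings without restating them. An illustrative sketch under that assumption; the sketch class and method names are hypothetical.

```java
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

/** Illustrative sketch only; not part of the patch. */
final class BuilderCopySketch {
  /**
   * HFileContextBuilder(HFileContext) copies decompressionContext along with the other fields,
   * so a derived context keeps the per-file decompression settings automatically.
   */
  static HFileContext withDifferentName(HFileContext existing, String newHFileName) {
    return new HFileContextBuilder(existing).withHFileName(newHFileName).build();
  }
}
```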
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
index ec5315aa4c02..314710a6c662 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
@@ -22,10 +22,9 @@
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.compress.BlockDecompressorHelper;
 import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
-import org.apache.hadoop.hbase.io.compress.CanReinit;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -34,21 +33,18 @@
  * Glue for ByteBuffDecompressor on top of zstd-jni
  */
 @InterfaceAudience.Private
-public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit {
+public class ZstdByteBuffDecompressor implements ByteBuffDecompressor {
 
   protected int dictId;
-  @Nullable
-  protected ZstdDictDecompress dict;
   protected ZstdDecompressCtx ctx;
   // Intended to be set to false by some unit tests
   private boolean allowByteBuffDecompression;
 
-  ZstdByteBuffDecompressor(@Nullable byte[] dictionary) {
+  ZstdByteBuffDecompressor(@Nullable byte[] dictionaryBytes) {
     ctx = new ZstdDecompressCtx();
-    if (dictionary != null) {
-      this.dictId = ZstdCodec.getDictionaryId(dictionary);
-      this.dict = new ZstdDictDecompress(dictionary);
-      this.ctx.loadDict(this.dict);
+    if (dictionaryBytes != null) {
+      this.ctx.loadDict(new ZstdDictDecompress(dictionaryBytes));
+      dictId = ZstdCodec.getDictionaryId(dictionaryBytes);
     }
     allowByteBuffDecompression = true;
   }
@@ -115,44 +111,30 @@ private int decompressHeapByteBuffers(ByteBuffer output, ByteBuffer input, int i
   }
 
   @Override
-  public void close() {
-    ctx.close();
-    if (dict != null) {
-      dict.close();
-    }
-  }
-
-  @Override
-  public void reinit(Configuration conf) {
-    if (conf != null) {
-      // Dictionary may have changed
-      byte[] b = ZstdCodec.getDictionary(conf);
-      if (b != null) {
-        // Don't casually create dictionary objects; they consume native memory
-        int thisDictId = ZstdCodec.getDictionaryId(b);
-        if (dict == null || dictId != thisDictId) {
-          dictId = thisDictId;
-          ZstdDictDecompress oldDict = dict;
-          dict = new ZstdDictDecompress(b);
-          ctx.loadDict(dict);
-          if (oldDict != null) {
-            oldDict.close();
-          }
+  public void reinit(@Nullable Compression.HFileDecompressionContext newHFileDecompressionContext) {
+    if (newHFileDecompressionContext != null) {
+      if (newHFileDecompressionContext instanceof ZstdHFileDecompressionContext) {
+        ZstdHFileDecompressionContext zstdContext =
+          (ZstdHFileDecompressionContext) newHFileDecompressionContext;
+        allowByteBuffDecompression = zstdContext.isAllowByteBuffDecompression();
+        if (zstdContext.getDict() == null && dictId != 0) {
+          ctx.loadDict((byte[]) null);
+          dictId = 0;
+        } else if (zstdContext.getDictId() != dictId) {
+          this.ctx.loadDict(zstdContext.getDict());
+          this.dictId = zstdContext.getDictId();
         }
       } else {
-        ZstdDictDecompress oldDict = dict;
-        dict = null;
-        dictId = 0;
-        // loadDict((byte[]) accepts null to clear the dictionary
-        ctx.loadDict((byte[]) null);
-        if (oldDict != null) {
-          oldDict.close();
-        }
+        throw new IllegalArgumentException(
+          "ZstdByteBuffDecompressor#reinit() was given an HFileDecompressionContext that was not "
+            + "a ZstdHFileDecompressionContext, this should never happen");
       }
-
-      // unit test helper
-      this.allowByteBuffDecompression =
-        conf.getBoolean("hbase.io.compress.zstd.allowByteBuffDecompression", true);
     }
   }
+
+  @Override
+  public void close() {
+    ctx.close();
+  }
+
 }
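reinit() now takes its settings from a ZstdHFileDecompressionContext instead of re-reading the Configuration on every call. A hedged sketch of building such a context the way a unit test might, using only APIs introduced in this patch; the sketch class and method are hypothetical.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.compress.zstd.ZstdHFileDecompressionContext;

/** Illustrative sketch only; not part of the patch. */
final class ZstdReinitSketch {
  /**
   * Build the codec-specific context that ZstdByteBuffDecompressor#reinit(...) consumes. The
   * boolean key mirrors the unit-test flag that used to be read inside reinit(Configuration).
   */
  static ZstdHFileDecompressionContext contextForcingStreamingPath() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.io.compress.zstd.allowByteBuffDecompression", false);
    return ZstdHFileDecompressionContext.fromConfiguration(conf);
  }
}
```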
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
index b06b93e3167b..e934aa12c6cf 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
@@ -18,17 +18,23 @@
 package org.apache.hadoop.hbase.io.compress.zstd;
 
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDictDecompress;
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
 import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressor;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.DictionaryCache;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -38,6 +44,9 @@
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
 /**
  * Hadoop ZStandard codec implemented with zstd-jni.
  * <p>
@@ -51,6 +60,9 @@ public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecomp
   public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
   public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
 
+  private static final Cache<String, Pair<ZstdDictDecompress, Integer>> DECOMPRESS_DICT_CACHE =
+    CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();
+
   private Configuration conf;
   private int bufferSize;
   private int level;
@@ -125,6 +137,12 @@ public Class<? extends ByteBuffDecompressor> getByteBuffDecompressorType() {
     return ZstdByteBuffDecompressor.class;
   }
 
+  @Override
+  public Compression.HFileDecompressionContext
+    getDecompressionContextFromConfiguration(Configuration conf) {
+    return ZstdHFileDecompressionContext.fromConfiguration(conf);
+  }
+
   @Override
   public String getDefaultExtension() {
     return ".zst";
@@ -145,12 +163,30 @@ static int getBufferSize(Configuration conf) {
     return size > 0 ? size : ZSTD_BUFFER_SIZE_DEFAULT;
   }
 
+  @Nullable
   static byte[] getDictionary(final Configuration conf) {
     String path = conf.get(ZSTD_DICTIONARY_KEY);
+    return DictionaryCache.getDictionary(conf, path);
+  }
+
+  /**
+   * Returns dictionary and its ID number, useful for comparing to other dictionaries for equality
+   */
+  @Nullable
+  static Pair<ZstdDictDecompress, Integer> getDecompressDictionary(final Configuration conf) {
+    String path = conf.get(ZSTD_DICTIONARY_KEY);
+    if (path == null) {
+      return null;
+    }
+
     try {
-      return DictionaryCache.getDictionary(conf, path);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to load dictionary at " + path, e);
+      return DECOMPRESS_DICT_CACHE.get(path, () -> {
+        byte[] dictBytes = DictionaryCache.getDictionary(conf, path);
+        int dictId = getDictionaryId(dictBytes);
+        return new Pair<>(new ZstdDictDecompress(dictBytes), dictId);
+      });
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Unable to load ZSTD dictionary", e);
     }
   }
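getDecompressDictionary() memoizes the expensive dictionary load per path in a bounded, expiring cache. The same pattern is shown in isolation below as an illustrative sketch; the sketch class, method, and stand-in loader are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/** Illustrative sketch only; not part of the patch. */
final class DictionaryMemoizationSketch {
  // Same bounds as DECOMPRESS_DICT_CACHE above: at most 100 entries, dropped after 10 idle minutes.
  private static final Cache<String, byte[]> CACHE =
    CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();

  static byte[] loadOnce(String path) {
    try {
      // The Callable runs only on a cache miss; concurrent callers for the same key wait for it.
      return CACHE.get(path, () -> expensiveLoad(path));
    } catch (ExecutionException e) {
      throw new RuntimeException("Unable to load ZSTD dictionary", e);
    }
  }

  private static byte[] expensiveLoad(String path) {
    // Stand-in for DictionaryCache.getDictionary(conf, path) plus dictionary construction.
    return new byte[0];
  }
}
```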
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java
new file mode 100644
index 000000000000..ccca038ac19a
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import com.github.luben.zstd.ZstdDictDecompress;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Holds HFile-level settings used by ZstdByteBuffDecompressor. It's expensive to pull these from a
+ * Configuration object every time we decompress a block, so pull them upon opening an HFile, and
+ * reuse them in every block that gets decompressed.
+ */
+@InterfaceAudience.Private
+public final class ZstdHFileDecompressionContext extends Compression.HFileDecompressionContext {
+
+  public static final long FIXED_OVERHEAD =
+    ClassSize.estimateBase(ZstdHFileDecompressionContext.class, false);
+
+  @Nullable
+  private final ZstdDictDecompress dict;
+  private final int dictId;
+  // Intended to be set to false by some unit tests
+  private final boolean allowByteBuffDecompression;
+
+  private ZstdHFileDecompressionContext(@Nullable ZstdDictDecompress dict, int dictId,
+    boolean allowByteBuffDecompression) {
+    this.dict = dict;
+    this.dictId = dictId;
+    this.allowByteBuffDecompression = allowByteBuffDecompression;
+  }
+
+  @Nullable
+  public ZstdDictDecompress getDict() {
+    return dict;
+  }
+
+  public int getDictId() {
+    return dictId;
+  }
+
+  public boolean isAllowByteBuffDecompression() {
+    return allowByteBuffDecompression;
+  }
+
+  public static ZstdHFileDecompressionContext fromConfiguration(Configuration conf) {
+    boolean allowByteBuffDecompression =
+      conf.getBoolean("hbase.io.compress.zstd.allowByteBuffDecompression", true);
+    Pair<ZstdDictDecompress, Integer> dictAndId = ZstdCodec.getDecompressDictionary(conf);
+    if (dictAndId != null) {
+      return new ZstdHFileDecompressionContext(dictAndId.getFirst(), dictAndId.getSecond(),
+        allowByteBuffDecompression);
+    } else {
+      return new ZstdHFileDecompressionContext(null, 0, allowByteBuffDecompression);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (dict != null) {
+      dict.close();
+    }
+  }
+
+  @Override
+  public long heapSize() {
+    // ZstdDictDecompress objects are cached and shared between ZstdHFileDecompressionContexts, so
+    // don't include ours in our heap size.
+    return FIXED_OVERHEAD;
+  }
+
+  @Override
+  public String toString() {
+    return "ZstdHFileDecompressionContext{dictId=" + dictId + ", allowByteBuffDecompression="
+      + allowByteBuffDecompression + '}';
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
index fd10df1b9a67..0c3b7890f6ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
@@ -407,6 +407,8 @@ private HFileContext createHFileContext(Path path, FixedFileTrailer trailer, Con
     throws IOException {
     HFileContextBuilder builder = new HFileContextBuilder().withHBaseCheckSum(true)
       .withHFileName(path.getName()).withCompression(trailer.getCompressionCodec())
+      .withDecompressionContext(
+        trailer.getCompressionCodec().getHFileDecompressionContextForConfiguration(conf))
       .withCellComparator(FixedFileTrailer.createComparator(trailer.getComparatorClassName()));
     // Check for any key material available
     byte[] keyBytes = trailer.getEncryptionKey();
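The HFileInfo hunk above is where everything is wired together at file-open time. An illustrative sketch of that shape, with the trailer lookup abstracted into a method parameter; the sketch class and method names are hypothetical.

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

/** Illustrative sketch only; not part of the patch. */
final class OpenHFileWiringSketch {
  /**
   * Same shape as the HFileInfo change: resolve the codec recorded in the file trailer, derive
   * its per-file decompression context from the store file's Configuration, and attach both to
   * the HFileContext used for every subsequent block read from that file.
   */
  static HFileContext contextForOpen(Compression.Algorithm codecFromTrailer, Configuration conf,
    String hfileName) {
    return new HFileContextBuilder().withHBaseCheckSum(true).withHFileName(hfileName)
      .withCompression(codecFromTrailer)
      .withDecompressionContext(
        codecFromTrailer.getHFileDecompressionContextForConfiguration(conf))
      .build();
  }
}
```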