diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java index becff7665b05..1ebf124f8ce1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java @@ -21,6 +21,8 @@ @InterfaceAudience.Private public class CompressionUtil { + private CompressionUtil() { } + /** * Round up to the next power of two, unless the value would become negative (ints * are signed), in which case just return Integer.MAX_VALUE. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java new file mode 100644 index 000000000000..5e19e93e0fa4 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.io.compress; + +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; + +/** + * A utility class for managing compressor/decompressor dictionary loading and caching of load + * results. Useful for any codec that can support changing dictionaries at runtime, + * such as ZStandard. + */ +@InterfaceAudience.Private +public class DictionaryCache { + + public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size"; + public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024; + public static final String RESOURCE_SCHEME = "resource://"; + + private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class); + private static volatile LoadingCache<String, byte[]> CACHE; + + private DictionaryCache() { } + + /** + * Load a dictionary or return a previously cached load.
+ * @param conf configuration + * @param path the hadoop Path where the dictionary is located, as a String + * @return the dictionary bytes if successful, null otherwise + */ + public static byte[] getDictionary(final Configuration conf, final String path) + throws IOException { + if (path == null || path.isEmpty()) { + return null; + } + // Create the dictionary loading cache if we haven't already + if (CACHE == null) { + synchronized (DictionaryCache.class) { + if (CACHE == null) { + final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE); + CACHE = CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterAccess(10, TimeUnit.MINUTES) + .build( + new CacheLoader<String, byte[]>() { + public byte[] load(String s) throws Exception { + byte[] bytes; + if (path.startsWith(RESOURCE_SCHEME)) { + bytes = loadFromResource(conf, path, maxSize); + } else { + bytes = loadFromHadoopFs(conf, path, maxSize); + } + LOG.info("Loaded dictionary from {} (size {})", s, bytes.length); + return bytes; + } + }); + } + } + } + + // Get or load the dictionary for the given path + try { + return CACHE.get(path); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + // Visible for testing + public static byte[] loadFromResource(final Configuration conf, final String s, + final int maxSize) throws IOException { + if (!s.startsWith(RESOURCE_SCHEME)) { + throw new IllegalArgumentException("Path does not start with " + RESOURCE_SCHEME); + } + final String path = s.substring(RESOURCE_SCHEME.length(), s.length()); + final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path); + if (in == null) { + throw new FileNotFoundException("Resource " + path + " not found"); + } + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final byte[] buffer = new byte[8192]; + try { + int n, len = 0; + do { + n = in.read(buffer); + if (n > 0) { + len += n; + if (len > maxSize) { + throw new IllegalArgumentException("Dictionary " + s + " is too large" + + ", limit=" + maxSize); + } + baos.write(buffer, 0, n); + } + } while (n > 0); + } finally { + in.close(); + } + return baos.toByteArray(); + } + + private static byte[] loadFromHadoopFs(final Configuration conf, final String s, + final int maxSize) throws IOException { + final Path path = new Path(s); + final FileSystem fs = FileSystem.get(path.toUri(), conf); + final FileStatus stat = fs.getFileStatus(path); + if (!stat.isFile()) { + throw new IllegalArgumentException(s + " is not a file"); + } + if (stat.getLen() > maxSize) { + throw new IllegalArgumentException("Dictionary " + s + " is too large" + + ", size=" + stat.getLen() + ", limit=" + maxSize); + } + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final byte[] buffer = new byte[8192]; + try (final FSDataInputStream in = fs.open(path)) { + int n; + do { + n = in.read(buffer); + if (n > 0) { + baos.write(buffer, 0, n); + } + } while (n > 0); + } + return baos.toByteArray(); + } + + // Visible for testing + public static boolean contains(String dictionaryPath) { + if (CACHE != null) { + return CACHE.asMap().containsKey(dictionaryPath); + } + return false; + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java index 616bf0b25fef..e259e1b832fb 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java +++
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java @@ -17,12 +17,10 @@ package org.apache.hadoop.hbase.io.compress; import static org.junit.Assert.assertTrue; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.Random; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -31,6 +29,8 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +39,11 @@ public class CompressionTestBase { protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class); - static final int LARGE_SIZE = 10 * 1024 * 1024; - static final int VERY_LARGE_SIZE = 100 * 1024 * 1024; - static final int BLOCK_SIZE = 4096; + protected static final int LARGE_SIZE = 10 * 1024 * 1024; + protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024; + protected static final int BLOCK_SIZE = 4096; - static final byte[] SMALL_INPUT; + protected static final byte[] SMALL_INPUT; static { // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597 SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597]; @@ -67,15 +67,21 @@ public class CompressionTestBase { Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q'); } - protected void codecTest(final CompressionCodec codec, final byte[][] input) - throws Exception { + protected void codecTest(final CompressionCodec codec, final byte[][] input) throws Exception { + codecTest(codec, input, null); + } + + protected void codecTest(final CompressionCodec codec, final byte[][] input, + final Integer expectedCompressedSize) throws Exception { // We do this in Compression.java ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024); // Compress + long start = EnvironmentEdgeManager.currentTime(); + Compressor compressor = codec.createCompressor(); + compressor.reinit(((Configurable)codec).getConf()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - CompressionOutputStream out = codec.createOutputStream(baos); + CompressionOutputStream out = codec.createOutputStream(baos, compressor); int inLen = 0; - long start = EnvironmentEdgeManager.currentTime(); for (int i = 0; i < input.length; i++) { out.write(input[i]); inLen += input[i].length; @@ -85,9 +91,18 @@ protected void codecTest(final CompressionCodec codec, final byte[][] input) final byte[] compressed = baos.toByteArray(); LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(), inLen, compressed.length, end - start); + if (expectedCompressedSize != null) { + assertTrue("Expected compressed size does not match: (expected=" + expectedCompressedSize + + ", actual=" + compressed.length + ")", expectedCompressedSize == compressed.length); + } // Decompress final byte[] plain = new byte[inLen]; - CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed)); + Decompressor decompressor = codec.createDecompressor(); + if (decompressor instanceof CanReinit) { + ((CanReinit)decompressor).reinit(((Configurable)codec).getConf()); + } + CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed), 
+ decompressor); start = EnvironmentEdgeManager.currentTime(); IOUtils.readFully(in, plain, 0, plain.length); in.close(); @@ -113,29 +128,37 @@ protected void codecSmallTest(final CompressionCodec codec) throws Exception { /** * Test with a large input (1MB) divided into blocks of 4KB. */ - protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception { - RandomDistribution.DiscreteRNG zipf = + protected void codecLargeTest(final CompressionCodec codec, final double sigma) + throws Exception { + RandomDistribution.DiscreteRNG rng = new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma); final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE]; - for (int i = 0; i < input.length; i++) { - for (int j = 0; j < input[i].length; j++) { - input[i][j] = (byte)zipf.nextInt(); - } - } + fill(rng, input); codecTest(codec, input); } /** * Test with a very large input (100MB) as a single input buffer. */ - protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception { - RandomDistribution.DiscreteRNG zipf = + protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) + throws Exception { + RandomDistribution.DiscreteRNG rng = new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma); final byte[][] input = new byte[1][VERY_LARGE_SIZE]; - for (int i = 0; i < VERY_LARGE_SIZE; i++) { - input[0][i] = (byte)zipf.nextInt(); - } + fill(rng, input); codecTest(codec, input); } + protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) { + for (int i = 0; i < input.length; i++) { + fill(rng, input[i]); + } + } + + protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) { + for (int i = 0; i < input.length; i++) { + input[i] = (byte) rng.nextInt(); + } + } + } diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java index fc5f445d29a2..8f8dae182ab0 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.io.compress.DictionaryCache; import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.CompressionCodec; @@ -41,6 +42,7 @@ public class ZstdCodec implements Configurable, CompressionCodec { public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level"; public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize"; + public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary"; private Configuration conf; @@ -60,12 +62,12 @@ public void setConf(Configuration conf) { @Override public Compressor createCompressor() { - return new ZstdCompressor(getLevel(conf), getBufferSize(conf)); + return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf)); } @Override public Decompressor createDecompressor() { - return new ZstdDecompressor(getBufferSize(conf)); + return new ZstdDecompressor(getBufferSize(conf), 
getDictionary(conf)); } @Override @@ -123,4 +125,13 @@ static int getBufferSize(Configuration conf) { return size > 0 ? size : 256 * 1024; // Don't change this default } + static byte[] getDictionary(final Configuration conf) { + String path = conf.get(ZSTD_DICTIONARY_KEY); + try { + return DictionaryCache.getDictionary(conf, path); + } catch (IOException e) { + throw new RuntimeException("Unable to load dictionary at " + path, e); + } + } + } diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java index b9d9da18f0ab..1be613144762 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdDictCompress; /** * Hadoop compressor glue for zstd-jni. @@ -38,13 +39,21 @@ public class ZstdCompressor implements CanReinit, Compressor { protected ByteBuffer inBuf, outBuf; protected boolean finish, finished; protected long bytesRead, bytesWritten; + protected ZstdDictCompress dict; - ZstdCompressor(final int level, final int bufferSize) { + ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) { this.level = level; this.bufferSize = bufferSize; this.inBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf.position(bufferSize); + if (dictionary != null) { + this.dict = new ZstdDictCompress(dictionary, level); + } + } + + ZstdCompressor(final int level, final int bufferSize) { + this(level, bufferSize, null); } @Override @@ -72,7 +81,12 @@ public int compress(final byte[] b, final int off, final int len) throws IOExcep } else { outBuf.clear(); } - int written = Zstd.compress(outBuf, inBuf, level); + int written; + if (dict != null) { + written = Zstd.compress(outBuf, inBuf, dict); + } else { + written = Zstd.compress(outBuf, inBuf, level); + } bytesWritten += written; inBuf.clear(); LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level); @@ -131,6 +145,11 @@ public void reinit(final Configuration conf) { if (conf != null) { // Level might have changed level = ZstdCodec.getLevel(conf); + // Dictionary may have changed + byte[] b = ZstdCodec.getDictionary(conf); + if (b != null) { + dict = new ZstdDictCompress(b, level); + } // Buffer size might have changed int newBufferSize = ZstdCodec.getBufferSize(conf); if (bufferSize != newBufferSize) { diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java index a3d77f51faf3..473eac7e7af2 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdDictDecompress; /** * Hadoop decompressor glue for zstd-java. 
@@ -38,12 +39,20 @@ public class ZstdDecompressor implements CanReinit, Decompressor { protected int bufferSize; protected int inLen; protected boolean finished; + protected ZstdDictDecompress dict; - ZstdDecompressor(final int bufferSize) { + ZstdDecompressor(final int bufferSize, final byte[] dictionary) { this.bufferSize = bufferSize; this.inBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf.position(bufferSize); + if (dictionary != null) { + this.dict = new ZstdDictDecompress(dictionary); + } + } + + ZstdDecompressor(final int bufferSize) { + this(bufferSize, null); } @Override @@ -60,7 +69,11 @@ public int decompress(final byte[] b, final int off, final int len) throws IOExc inLen -= remaining; outBuf.clear(); int written; - written = Zstd.decompress(outBuf, inBuf); + if (dict != null) { + written = Zstd.decompress(outBuf, inBuf, dict); + } else { + written = Zstd.decompress(outBuf, inBuf); + } inBuf.clear(); LOG.trace("decompress: decompressed {} -> {}", remaining, written); outBuf.flip(); @@ -117,7 +130,7 @@ public boolean needsInput() { @Override public void setDictionary(final byte[] b, final int off, final int len) { LOG.trace("setDictionary: off={} len={}", off, len); - throw new UnsupportedOperationException("setDictionary not supported"); + this.dict = new ZstdDictDecompress(b, off, len); } @Override @@ -143,6 +156,11 @@ public void setInput(final byte[] b, final int off, final int len) { public void reinit(final Configuration conf) { LOG.trace("reinit"); if (conf != null) { + // Dictionary may have changed + byte[] b = ZstdCodec.getDictionary(conf); + if (b != null) { + dict = new ZstdDictDecompress(b); + } // Buffer size might have changed int newBufferSize = ZstdCodec.getBufferSize(conf); if (bufferSize != newBufferSize) { diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java index 6bcb2aa11511..7745c9ab4667 100644 --- a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java +++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java @@ -16,11 +16,20 @@ */ package org.apache.hadoop.hbase.io.compress.zstd; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.compress.CompressionTestBase; +import org.apache.hadoop.hbase.io.compress.DictionaryCache; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.RandomDistribution; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -33,20 +42,20 @@ public class TestZstdCodec extends CompressionTestBase { HBaseClassTestRule.forClass(TestZstdCodec.class); @Test - public void testzstdCodecSmall() throws Exception { + public void testZstdCodecSmall() throws Exception { codecSmallTest(new ZstdCodec()); } @Test - public void testzstdCodecLarge() throws Exception { + public void testZstdCodecLarge() throws 
Exception { codecLargeTest(new ZstdCodec(), 1.1); // poor compressability codecLargeTest(new ZstdCodec(), 2); codecLargeTest(new ZstdCodec(), 10); // very high compressability } @Test - public void testzstdCodecVeryLarge() throws Exception { - Configuration conf = new Configuration(); + public void testZstdCodecVeryLarge() throws Exception { + Configuration conf = HBaseConfiguration.create(); // ZStandard levels range from 1 to 22. // Level 22 might take up to a minute to complete. 3 is the Hadoop default, and will be fast. conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3); @@ -55,4 +64,55 @@ public void testzstdCodecVeryLarge() throws Exception { codecVeryLargeTest(codec, 3); // like text } + @Test + public void testZstdCodecWithDictionary() throws Exception { + // zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of + // 365533 bytes + final int expectedCompressedSize = 365533; + Configuration conf = HBaseConfiguration.create(); + conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3); + // Configure for dictionary available in test resources + final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict"; + conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath); + // Load test data from test resources + // This will throw an IOException if the test data cannot be loaded + final byte[] testData = DictionaryCache.loadFromResource(conf, + DictionaryCache.RESOURCE_SCHEME + "zstd.test.data", /* maxSize */ 1024*1024); + assertNotNull("Failed to load test data", testData); + // Run the test + // This will throw an IOException of some kind if there is a problem loading or using the + // dictionary. + ZstdCodec codec = new ZstdCodec(); + codec.setConf(conf); + codecTest(codec, new byte[][] { testData }, expectedCompressedSize); + // Assert that the dictionary was actually loaded + assertTrue("Dictionary was not loaded by codec", DictionaryCache.contains(dictionaryPath)); + } + + // + // For generating the test data in src/test/resources/ + // + + public static void main(String[] args) throws IOException { + // Write 1000 1k blocks for training to the specified file + // Train with: + // zstd --train-fastcover=k=32,b=8 -B1024 -o <dictionary_file> <input_file> + if (args.length < 1) { + System.err.println("Usage: TestZstdCodec <outFile>"); + System.exit(-1); + } + final RandomDistribution.DiscreteRNG rng = + new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, 2); + final File outFile = new File(args[0]); + final byte[] buffer = new byte[1024]; + System.out.println("Generating " + outFile); + try (FileOutputStream os = new FileOutputStream(outFile)) { + for (int i = 0; i < 1000; i++) { + fill(rng, buffer); + os.write(buffer); + } + } + System.out.println("Done"); + } + } diff --git a/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.data b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.data new file mode 100644 index 000000000000..a497af551fd3 Binary files /dev/null and b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.data differ diff --git a/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.dict b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.dict new file mode 100644 index 000000000000..e5b79d2d7552 Binary files /dev/null and b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.dict differ
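
Usage note. With this patch, enabling a ZStandard dictionary is purely a configuration change: point ZstdCodec.ZSTD_DICTIONARY_KEY ("hbase.io.compress.zstd.dictionary") at a trained dictionary on a Hadoop FileSystem, or at a classpath resource via the resource:// scheme, and the codec loads it through DictionaryCache. Below is a minimal client-side sketch; the example class name and the HDFS path are hypothetical, and the dictionary is assumed to have been trained beforehand (for example with zstd --train-fastcover, as in TestZstdCodec.main above).

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.compress.zstd.ZstdCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;

// Hypothetical example class, for illustration only.
public class ZstdDictionaryExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical HDFS location of a previously trained dictionary.
    // A classpath resource would be referenced as "resource://my.dict" instead.
    conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, "hdfs:///hbase/dictionaries/zstd.dict");
    conf.setInt(ZstdCodec.ZSTD_LEVEL_KEY, 3);
    ZstdCodec codec = new ZstdCodec();
    codec.setConf(conf);
    // The dictionary bytes are fetched (and cached by DictionaryCache) when the codec
    // creates its Compressor.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(baos);
    out.write("some data resembling the training set".getBytes(StandardCharsets.UTF_8));
    out.close();
    System.out.println("Compressed to " + baos.size() + " bytes");
  }
}

Because DictionaryCache keys its loading cache by the configured path, all compressors and decompressors created with the same dictionary setting share a single cached copy of the dictionary bytes rather than re-reading the file for every stream.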