Skip to content

Commit 8b813db

Browse files
committed
HBASE-26353 Support loadable dictionaries in hbase-compression-zstd
ZStandard supports initialization of compressors and decompressors with a precomputed dictionary, which can dramatically improve compression ratio and speed for tables with small values. For more details, please see The Case For Small Data Compression https://github.com/facebook/zstd#the-case-for-small-data-compression
1 parent ce44e16 commit 8b813db

3 files changed

Lines changed: 122 additions & 16 deletions

File tree

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,32 @@
1616
*/
1717
package org.apache.hadoop.hbase.io.compress.zstd;
1818

19+
import java.io.ByteArrayOutputStream;
1920
import java.io.IOException;
2021
import java.io.InputStream;
2122
import java.io.OutputStream;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.TimeUnit;
2225
import org.apache.hadoop.conf.Configurable;
2326
import org.apache.hadoop.conf.Configuration;
2427
import org.apache.hadoop.fs.CommonConfigurationKeys;
28+
import org.apache.hadoop.fs.FSDataInputStream;
29+
import org.apache.hadoop.fs.FileStatus;
30+
import org.apache.hadoop.fs.FileSystem;
31+
import org.apache.hadoop.fs.Path;
2532
import org.apache.hadoop.io.compress.BlockCompressorStream;
2633
import org.apache.hadoop.io.compress.BlockDecompressorStream;
2734
import org.apache.hadoop.io.compress.CompressionCodec;
2835
import org.apache.hadoop.io.compress.CompressionInputStream;
2936
import org.apache.hadoop.io.compress.CompressionOutputStream;
3037
import org.apache.hadoop.io.compress.Compressor;
3138
import org.apache.hadoop.io.compress.Decompressor;
39+
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
40+
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
41+
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
3242
import org.apache.yetus.audience.InterfaceAudience;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
3345

3446
/**
3547
* Hadoop ZStandard codec implemented with zstd-jni.
@@ -41,7 +53,13 @@ public class ZstdCodec implements Configurable, CompressionCodec {
4153

4254
public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
4355
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
56+
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
57+
public static final String ZSTD_DICTIONARY_MAX_SIZE_KEY =
58+
"hbase.io.compress.zstd.dictionary.max.size";
59+
public static final int DEFAULT_ZSTD_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
4460

61+
private static final Logger LOG = LoggerFactory.getLogger(ZstdCodec.class);
62+
private static volatile LoadingCache<String, byte[]> DICTIONARY_CACHE;
4563
private Configuration conf;
4664

4765
public ZstdCodec() {
@@ -60,12 +78,12 @@ public void setConf(Configuration conf) {
6078

6179
@Override
6280
public Compressor createCompressor() {
63-
return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
81+
return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
6482
}
6583

6684
@Override
6785
public Decompressor createDecompressor() {
68-
return new ZstdDecompressor(getBufferSize(conf));
86+
return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
6987
}
7088

7189
@Override
@@ -123,4 +141,59 @@ static int getBufferSize(Configuration conf) {
123141
return size > 0 ? size : 256 * 1024; // Don't change this default
124142
}
125143

144+
static byte[] getDictionary(final Configuration conf) {
145+
// Get the dictionary path, if set
146+
final String s = conf.get(ZSTD_DICTIONARY_KEY);
147+
if (s == null) {
148+
return null;
149+
}
150+
151+
// Create the dictionary loading cache if we haven't already
152+
if (DICTIONARY_CACHE == null) {
153+
synchronized (ZstdCodec.class) {
154+
if (DICTIONARY_CACHE == null) {
155+
DICTIONARY_CACHE = CacheBuilder.newBuilder()
156+
.expireAfterAccess(1, TimeUnit.HOURS)
157+
.build(
158+
new CacheLoader<String, byte[]>() {
159+
public byte[] load(String s) throws Exception {
160+
final Path path = new Path(s);
161+
final FileSystem fs = FileSystem.get(path.toUri(), conf);
162+
final FileStatus stat = fs.getFileStatus(path);
163+
if (!stat.isFile()) {
164+
throw new IllegalArgumentException(s + " is not a file");
165+
}
166+
final int limit = conf.getInt(ZSTD_DICTIONARY_MAX_SIZE_KEY,
167+
DEFAULT_ZSTD_DICTIONARY_MAX_SIZE);
168+
if (stat.getLen() > limit) {
169+
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
170+
", size=" + stat.getLen() + ", limit=" + limit);
171+
}
172+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
173+
final byte[] buffer = new byte[8192];
174+
try (final FSDataInputStream in = fs.open(path)) {
175+
int n;
176+
do {
177+
n = in.read(buffer);
178+
if (n > 0) {
179+
baos.write(buffer, 0, n);
180+
}
181+
} while (n > 0);
182+
}
183+
LOG.info("Loaded {} from {} (size {})", ZSTD_DICTIONARY_KEY, s, stat.getLen());
184+
return baos.toByteArray();
185+
}
186+
});
187+
}
188+
}
189+
}
190+
191+
// Get or load the dictionary for the given path
192+
try {
193+
return DICTIONARY_CACHE.get(s);
194+
} catch (ExecutionException e) {
195+
throw new RuntimeException(e);
196+
}
197+
}
198+
126199
}

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727
import com.github.luben.zstd.Zstd;
28+
import com.github.luben.zstd.ZstdDictCompress;
2829

2930
/**
3031
* Hadoop compressor glue for zstd-jni.
@@ -37,17 +38,25 @@ public class ZstdCompressor implements Compressor {
3738
protected ByteBuffer inBuf, outBuf;
3839
protected boolean finish, finished;
3940
protected long bytesRead, bytesWritten;
41+
protected ZstdDictCompress dict;
4042

41-
ZstdCompressor(int level, int bufferSize) {
43+
ZstdCompressor(final int level, final int bufferSize, final byte[] dict) {
4244
this.level = level;
4345
this.bufferSize = bufferSize;
4446
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
4547
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
4648
this.outBuf.position(bufferSize);
49+
if (dict != null) {
50+
this.dict = new ZstdDictCompress(dict, level);
51+
}
52+
}
53+
54+
ZstdCompressor(final int level, final int bufferSize) {
55+
this(level, bufferSize, null);
4756
}
4857

4958
@Override
50-
public int compress(byte[] b, int off, int len) throws IOException {
59+
public int compress(final byte[] b, final int off, final int len) throws IOException {
5160
// If we have previously compressed our input and still have some buffered bytes
5261
// remaining, provide them to the caller.
5362
if (outBuf.hasRemaining()) {
@@ -71,7 +80,12 @@ public int compress(byte[] b, int off, int len) throws IOException {
7180
} else {
7281
outBuf.clear();
7382
}
74-
int written = Zstd.compress(outBuf, inBuf, level, true);
83+
int written;
84+
if (dict != null) {
85+
written = Zstd.compress(outBuf, inBuf, dict);
86+
} else {
87+
written = Zstd.compress(outBuf, inBuf, level);
88+
}
7589
bytesWritten += written;
7690
inBuf.clear();
7791
LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
@@ -125,11 +139,16 @@ public boolean needsInput() {
125139
}
126140

127141
@Override
128-
public void reinit(Configuration conf) {
142+
public void reinit(final Configuration conf) {
129143
LOG.trace("reinit");
130144
if (conf != null) {
131145
// Level might have changed
132146
level = ZstdCodec.getLevel(conf);
147+
// Dictionary may have changed
148+
byte[] b = ZstdCodec.getDictionary(conf);
149+
if (b != null) {
150+
dict = new ZstdDictCompress(b, level);
151+
}
133152
// Buffer size might have changed
134153
int newBufferSize = ZstdCodec.getBufferSize(conf);
135154
if (bufferSize != newBufferSize) {
@@ -154,12 +173,12 @@ public void reset() {
154173
}
155174

156175
@Override
157-
public void setDictionary(byte[] b, int off, int len) {
176+
public void setDictionary(final byte[] b, final int off, final int len) {
158177
throw new UnsupportedOperationException("setDictionary is not supported");
159178
}
160179

161180
@Override
162-
public void setInput(byte[] b, int off, int len) {
181+
public void setInput(final byte[] b, final int off, final int len) {
163182
LOG.trace("setInput: off={} len={}", off, len);
164183
if (inBuf.remaining() < len) {
165184
// Get a new buffer that can accommodate the accumulated input plus the additional
@@ -179,7 +198,7 @@ public void setInput(byte[] b, int off, int len) {
179198

180199
// Package private
181200

182-
int maxCompressedLength(int len) {
201+
int maxCompressedLength(final int len) {
183202
return (int) Zstd.compressBound(len);
184203
}
185204

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626
import com.github.luben.zstd.Zstd;
27+
import com.github.luben.zstd.ZstdDictDecompress;
2728

2829
/**
2930
 * Hadoop decompressor glue for zstd-jni.
@@ -35,15 +36,23 @@ public class ZstdDecompressor implements Decompressor {
3536
protected ByteBuffer inBuf, outBuf;
3637
protected int inLen;
3738
protected boolean finished;
39+
protected ZstdDictDecompress dict;
3840

39-
ZstdDecompressor(int bufferSize) {
41+
ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
4042
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
4143
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
4244
this.outBuf.position(bufferSize);
45+
if (dictionary != null) {
46+
this.dict = new ZstdDictDecompress(dictionary);
47+
}
48+
}
49+
50+
ZstdDecompressor(final int bufferSize) {
51+
this(bufferSize, null);
4352
}
4453

4554
@Override
46-
public int decompress(byte[] b, int off, int len) throws IOException {
55+
public int decompress(final byte[] b, final int off, final int len) throws IOException {
4756
if (outBuf.hasRemaining()) {
4857
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
4958
outBuf.get(b, off, n);
@@ -55,7 +64,12 @@ public int decompress(byte[] b, int off, int len) throws IOException {
5564
int remaining = inBuf.remaining();
5665
inLen -= remaining;
5766
outBuf.clear();
58-
int written = Zstd.decompress(outBuf, inBuf);
67+
int written;
68+
if (dict != null) {
69+
written = Zstd.decompress(outBuf, inBuf, dict);
70+
} else {
71+
written = Zstd.decompress(outBuf, inBuf);
72+
}
5973
inBuf.clear();
6074
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
6175
outBuf.flip();
@@ -104,24 +118,24 @@ public void reset() {
104118

105119
@Override
106120
public boolean needsInput() {
107-
boolean b = (inBuf.position() == 0);
121+
final boolean b = (inBuf.position() == 0);
108122
LOG.trace("needsInput: {}", b);
109123
return b;
110124
}
111125

112126
@Override
113-
public void setDictionary(byte[] b, int off, int len) {
127+
public void setDictionary(final byte[] b, final int off, final int len) {
114128
throw new UnsupportedOperationException("setDictionary is not supported");
115129
}
116130

117131
@Override
118-
public void setInput(byte[] b, int off, int len) {
132+
public void setInput(final byte[] b, final int off, final int len) {
119133
LOG.trace("setInput: off={} len={}", off, len);
120134
if (inBuf.remaining() < len) {
121135
// Get a new buffer that can accommodate the accumulated input plus the additional
122136
// input that would cause a buffer overflow without reallocation.
123137
// This condition should be fortunately rare, because it is expensive.
124-
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
138+
final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
125139
LOG.trace("setInput: resize inBuf {}", needed);
126140
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
127141
inBuf.flip();

0 commit comments

Comments
 (0)