Skip to content

Commit 816f2fd

Browse files
committed
HBASE-26353 Support loadable dictionaries in hbase-compression-zstd
ZStandard supports initialization of compressors and decompressors with a precomputed dictionary, which can dramatically improve and speed up compression of tables with small values. For more details, please see "The Case For Small Data Compression": https://github.com/facebook/zstd#the-case-for-small-data-compression
1 parent ad7d698 commit 816f2fd

48 files changed

Lines changed: 354 additions & 122 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
package org.apache.hadoop.hbase.io.compress;
18+
19+
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
/**
23+
* This is a marker interface that indicates if a compressor or decompressor
24+
* type can support reinitialization via reinit(Configuration conf).
25+
*/
26+
@InterfaceAudience.Private
27+
public interface CanReinit {
28+
29+
void reinit(Configuration conf);
30+
31+
}

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/AbstractDataBlockEncoder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
21+
22+
import org.apache.hadoop.conf.Configuration;
2123
import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
2224
import org.apache.hadoop.hbase.Cell;
2325
import org.apache.hadoop.hbase.KeyValue;
@@ -35,8 +37,8 @@ public HFileBlockEncodingContext newDataBlockEncodingContext(
3537
}
3638

3739
@Override
38-
public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
39-
return new HFileBlockDefaultDecodingContext(meta);
40+
public HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf, HFileContext meta) {
41+
return new HFileBlockDefaultDecodingContext(conf, meta);
4042
}
4143

4244
protected void postEncoding(HFileBlockEncodingContext encodingCtx)

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.io.DataOutputStream;
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
23+
24+
import org.apache.hadoop.conf.Configuration;
2325
import org.apache.hadoop.hbase.Cell;
2426
import org.apache.hadoop.hbase.CellComparator;
2527
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -107,11 +109,13 @@ HFileBlockEncodingContext newDataBlockEncodingContext(
107109
* Creates an encoder specific decoding context, which will prepare the data
108110
* before actual decoding
109111
*
112+
* @param conf
113+
* store configuration
110114
* @param meta
111115
* HFile meta data
112116
* @return a newly created decoding context
113117
*/
114-
HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta);
118+
HFileBlockDecodingContext newDataBlockDecodingContext(Configuration conf, HFileContext meta);
115119

116120
/**
117121
* An interface which enable to seek while underlying data is encoded.

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Iterator;
2828
import java.util.List;
2929
import org.apache.commons.lang3.NotImplementedException;
30+
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.hbase.Cell;
3132
import org.apache.hadoop.hbase.HConstants;
3233
import org.apache.hadoop.hbase.KeyValue;
@@ -57,6 +58,7 @@ public class EncodedDataBlock {
5758
private HFileContext meta;
5859

5960
private final DataBlockEncoding encoding;
61+
private final Configuration conf;
6062

6163
// The is for one situation that there are some cells includes tags and others are not.
6264
// isTagsLenZero stores if cell tags length is zero before doing encoding since we need
@@ -70,11 +72,12 @@ public class EncodedDataBlock {
7072
* Create a buffer which will be encoded using dataBlockEncoder.
7173
* @param dataBlockEncoder Algorithm used for compression.
7274
* @param encoding encoding type used
73-
* @param rawKVs
74-
* @param meta
75+
* @param rawKVs raw KVs
76+
* @param meta hfile context
77+
* @param conf store configuration
7578
*/
7679
public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
77-
byte[] rawKVs, HFileContext meta) {
80+
byte[] rawKVs, HFileContext meta, Configuration conf) {
7881
Preconditions.checkNotNull(encoding,
7982
"Cannot create encoded data block with null encoder");
8083
this.dataBlockEncoder = dataBlockEncoder;
@@ -83,6 +86,7 @@ public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding enc
8386
HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
8487
this.rawKVs = rawKVs;
8588
this.meta = meta;
89+
this.conf = conf;
8690
}
8791

8892
/**
@@ -115,7 +119,7 @@ public Cell next() {
115119
if (decompressedData == null) {
116120
try {
117121
decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
118-
.newDataBlockDecodingContext(meta));
122+
.newDataBlockDecodingContext(conf, meta));
119123
} catch (IOException e) {
120124
throw new RuntimeException("Problem with data block encoder, " +
121125
"most likely it requested more bytes than are available.", e);

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import java.io.IOException;
2121
import java.io.InputStream;
2222
import org.apache.commons.io.IOUtils;
23+
import org.apache.hadoop.conf.Configuration;
2324
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
2425
import org.apache.hadoop.hbase.io.TagCompressionContext;
26+
import org.apache.hadoop.hbase.io.compress.CanReinit;
2527
import org.apache.hadoop.hbase.io.compress.Compression;
2628
import org.apache.hadoop.hbase.io.crypto.Cipher;
2729
import org.apache.hadoop.hbase.io.crypto.Decryptor;
@@ -30,6 +32,7 @@
3032
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
3133
import org.apache.hadoop.hbase.nio.ByteBuff;
3234
import org.apache.hadoop.hbase.util.Bytes;
35+
import org.apache.hadoop.io.compress.Decompressor;
3336
import org.apache.yetus.audience.InterfaceAudience;
3437

3538
/**
@@ -41,10 +44,12 @@
4144
*/
4245
@InterfaceAudience.Private
4346
public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingContext {
47+
private final Configuration conf;
4448
private final HFileContext fileContext;
4549
private TagCompressionContext tagCompressionContext;
4650

47-
public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
51+
public HFileBlockDefaultDecodingContext(Configuration conf, HFileContext fileContext) {
52+
this.conf = conf;
4853
this.fileContext = fileContext;
4954
}
5055

@@ -87,8 +92,24 @@ public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWit
8792

8893
Compression.Algorithm compression = fileContext.getCompression();
8994
if (compression != Compression.Algorithm.NONE) {
90-
Compression.decompress(blockBufferWithoutHeader, dataInputStream,
91-
uncompressedSizeWithoutHeader, compression);
95+
Decompressor decompressor = null;
96+
try {
97+
decompressor = compression.getDecompressor();
98+
// Some algorithms don't return decompressors and accept null as a valid parameter for
99+
// same when creating decompression streams. We can ignore these cases wrt reinit.
100+
if (decompressor != null && decompressor instanceof CanReinit) {
101+
((CanReinit)decompressor).reinit(conf);
102+
}
103+
try (InputStream is =
104+
compression.createDecompressionStream(dataInputStream, decompressor, 0)) {
105+
BlockIOUtils.readFullyWithHeapBuffer(is, blockBufferWithoutHeader,
106+
uncompressedSizeWithoutHeader);
107+
}
108+
} finally {
109+
if (decompressor != null) {
110+
compression.returnDecompressor(decompressor);
111+
}
112+
}
92113
} else {
93114
BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
94115
onDiskSizeWithoutHeader);

hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.io.compress.CanReinit;
2223
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2324
import org.apache.yetus.audience.InterfaceAudience;
2425
import org.slf4j.Logger;
@@ -30,7 +31,7 @@
3031
*/
3132
@InterfaceAudience.Private
3233
public abstract class HadoopCompressor<T extends Compressor>
33-
implements org.apache.hadoop.io.compress.Compressor {
34+
implements CanReinit, org.apache.hadoop.io.compress.Compressor {
3435

3536
protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
3637
protected T compressor;

hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.io.compress.CanReinit;
2223
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2324
import org.apache.hadoop.io.compress.Compressor;
2425
import org.apache.yetus.audience.InterfaceAudience;
@@ -31,7 +32,7 @@
3132
* Hadoop compressor glue for lz4-java.
3233
*/
3334
@InterfaceAudience.Private
34-
public class Lz4Compressor implements Compressor {
35+
public class Lz4Compressor implements CanReinit, Compressor {
3536

3637
protected static final Logger LOG = LoggerFactory.getLogger(Lz4Compressor.class);
3738
protected LZ4Compressor compressor;

hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.io.compress.CanReinit;
2223
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2324
import org.apache.hadoop.io.compress.Compressor;
2425
import org.apache.yetus.audience.InterfaceAudience;
@@ -30,7 +31,7 @@
3031
* Hadoop compressor glue for Xerial Snappy.
3132
*/
3233
@InterfaceAudience.Private
33-
public class SnappyCompressor implements Compressor {
34+
public class SnappyCompressor implements CanReinit, Compressor {
3435

3536
protected static final Logger LOG = LoggerFactory.getLogger(SnappyCompressor.class);
3637
protected ByteBuffer inBuf, outBuf;

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,32 @@
1616
*/
1717
package org.apache.hadoop.hbase.io.compress.zstd;
1818

19+
import java.io.ByteArrayOutputStream;
1920
import java.io.IOException;
2021
import java.io.InputStream;
2122
import java.io.OutputStream;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.TimeUnit;
2225
import org.apache.hadoop.conf.Configurable;
2326
import org.apache.hadoop.conf.Configuration;
2427
import org.apache.hadoop.fs.CommonConfigurationKeys;
28+
import org.apache.hadoop.fs.FSDataInputStream;
29+
import org.apache.hadoop.fs.FileStatus;
30+
import org.apache.hadoop.fs.FileSystem;
31+
import org.apache.hadoop.fs.Path;
2532
import org.apache.hadoop.io.compress.BlockCompressorStream;
2633
import org.apache.hadoop.io.compress.BlockDecompressorStream;
2734
import org.apache.hadoop.io.compress.CompressionCodec;
2835
import org.apache.hadoop.io.compress.CompressionInputStream;
2936
import org.apache.hadoop.io.compress.CompressionOutputStream;
3037
import org.apache.hadoop.io.compress.Compressor;
3138
import org.apache.hadoop.io.compress.Decompressor;
39+
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
40+
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
41+
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
3242
import org.apache.yetus.audience.InterfaceAudience;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
3345

3446
/**
3547
* Hadoop ZStandard codec implemented with zstd-jni.
@@ -41,7 +53,13 @@ public class ZstdCodec implements Configurable, CompressionCodec {
4153

4254
public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
4355
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
56+
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
57+
public static final String ZSTD_DICTIONARY_MAX_SIZE_KEY =
58+
"hbase.io.compress.zstd.dictionary.max.size";
59+
public static final int DEFAULT_ZSTD_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
4460

61+
private static final Logger LOG = LoggerFactory.getLogger(ZstdCodec.class);
62+
private static volatile LoadingCache<String, byte[]> DICTIONARY_CACHE;
4563
private Configuration conf;
4664

4765
public ZstdCodec() {
@@ -60,12 +78,12 @@ public void setConf(Configuration conf) {
6078

6179
@Override
6280
public Compressor createCompressor() {
63-
return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
81+
return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
6482
}
6583

6684
@Override
6785
public Decompressor createDecompressor() {
68-
return new ZstdDecompressor(getBufferSize(conf));
86+
return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
6987
}
7088

7189
@Override
@@ -123,4 +141,59 @@ static int getBufferSize(Configuration conf) {
123141
return size > 0 ? size : 256 * 1024; // Don't change this default
124142
}
125143

144+
static byte[] getDictionary(final Configuration conf) {
145+
// Get the dictionary path, if set
146+
final String s = conf.get(ZSTD_DICTIONARY_KEY);
147+
if (s == null || s.isEmpty()) {
148+
return null;
149+
}
150+
151+
// Create the dictionary loading cache if we haven't already
152+
if (DICTIONARY_CACHE == null) {
153+
synchronized (ZstdCodec.class) {
154+
if (DICTIONARY_CACHE == null) {
155+
DICTIONARY_CACHE = CacheBuilder.newBuilder()
156+
.expireAfterAccess(1, TimeUnit.HOURS)
157+
.build(
158+
new CacheLoader<String, byte[]>() {
159+
public byte[] load(String s) throws Exception {
160+
final Path path = new Path(s);
161+
final FileSystem fs = FileSystem.get(path.toUri(), conf);
162+
final FileStatus stat = fs.getFileStatus(path);
163+
if (!stat.isFile()) {
164+
throw new IllegalArgumentException(s + " is not a file");
165+
}
166+
final int limit = conf.getInt(ZSTD_DICTIONARY_MAX_SIZE_KEY,
167+
DEFAULT_ZSTD_DICTIONARY_MAX_SIZE);
168+
if (stat.getLen() > limit) {
169+
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
170+
", size=" + stat.getLen() + ", limit=" + limit);
171+
}
172+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
173+
final byte[] buffer = new byte[8192];
174+
try (final FSDataInputStream in = fs.open(path)) {
175+
int n;
176+
do {
177+
n = in.read(buffer);
178+
if (n > 0) {
179+
baos.write(buffer, 0, n);
180+
}
181+
} while (n > 0);
182+
}
183+
LOG.info("Loaded {} from {} (size {})", ZSTD_DICTIONARY_KEY, s, stat.getLen());
184+
return baos.toByteArray();
185+
}
186+
});
187+
}
188+
}
189+
}
190+
191+
// Get or load the dictionary for the given path
192+
try {
193+
return DICTIONARY_CACHE.get(s);
194+
} catch (ExecutionException e) {
195+
throw new RuntimeException(e);
196+
}
197+
}
198+
126199
}

0 commit comments

Comments (0)