Skip to content

Commit 8931a00

Browse files
committed
Extend TestZstdCodec with a dictionary compression test
This will cause a small merge conflict between apache#3730 and apache#3748 because we need CanReinit here too.
1 parent ddaf259 commit 8931a00

8 files changed

Lines changed: 214 additions & 51 deletions

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
package org.apache.hadoop.hbase.io.compress;
18+
19+
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
/**
23+
* This is a marker interface that indicates if a compressor or decompressor
24+
* type can support reinitialization via reinit(Configuration conf).
25+
*/
26+
@InterfaceAudience.Private
27+
public interface CanReinit {
28+
29+
void reinit(Configuration conf);
30+
31+
}

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.apache.hadoop.hbase.io.compress;
1818

1919
import java.io.ByteArrayOutputStream;
20+
import java.io.FileNotFoundException;
2021
import java.io.IOException;
22+
import java.io.InputStream;
2123
import java.util.concurrent.ExecutionException;
2224
import java.util.concurrent.TimeUnit;
2325
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,7 @@ public class DictionaryCache {
4244

4345
public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
4446
public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
47+
public static final String RESOURCE_SCHEME = "resource://";
4548

4649
private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
4750
private static volatile LoadingCache<String, byte[]> CACHE;
@@ -61,37 +64,21 @@ public static byte[] getDictionary(final Configuration conf, final String path)
6164
if (CACHE == null) {
6265
synchronized (DictionaryCache.class) {
6366
if (CACHE == null) {
67+
final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
6468
CACHE = CacheBuilder.newBuilder()
6569
.maximumSize(100)
6670
.expireAfterAccess(10, TimeUnit.MINUTES)
6771
.build(
6872
new CacheLoader<String, byte[]>() {
6973
public byte[] load(String s) throws Exception {
70-
final Path path = new Path(s);
71-
final FileSystem fs = FileSystem.get(path.toUri(), conf);
72-
final FileStatus stat = fs.getFileStatus(path);
73-
if (!stat.isFile()) {
74-
throw new IllegalArgumentException(s + " is not a file");
74+
byte[] bytes;
75+
if (path.startsWith(RESOURCE_SCHEME)) {
76+
bytes = loadFromResource(conf, path, maxSize);
77+
} else {
78+
bytes = loadFromHadoopFs(conf, path, maxSize);
7579
}
76-
final int limit = conf.getInt(DICTIONARY_MAX_SIZE_KEY,
77-
DEFAULT_DICTIONARY_MAX_SIZE);
78-
if (stat.getLen() > limit) {
79-
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
80-
", size=" + stat.getLen() + ", limit=" + limit);
81-
}
82-
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
83-
final byte[] buffer = new byte[8192];
84-
try (final FSDataInputStream in = fs.open(path)) {
85-
int n;
86-
do {
87-
n = in.read(buffer);
88-
if (n > 0) {
89-
baos.write(buffer, 0, n);
90-
}
91-
} while (n > 0);
92-
}
93-
LOG.info("Loaded dictionary from {} (size {})", s, stat.getLen());
94-
return baos.toByteArray();
80+
LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
81+
return bytes;
9582
}
9683
});
9784
}
@@ -106,4 +93,70 @@ public byte[] load(String s) throws Exception {
10693
}
10794
}
10895

96+
// Visible for testing
97+
public static byte[] loadFromResource(final Configuration conf, final String s,
98+
final int maxSize) throws IOException {
99+
if (!s.startsWith(RESOURCE_SCHEME)) {
100+
throw new IllegalArgumentException("Path does not start with " + RESOURCE_SCHEME);
101+
}
102+
final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
103+
final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
104+
if (in == null) {
105+
throw new FileNotFoundException("Resource " + path + " not found");
106+
}
107+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
108+
final byte[] buffer = new byte[8192];
109+
try {
110+
int n, len = 0;
111+
do {
112+
n = in.read(buffer);
113+
if (n > 0) {
114+
len += n;
115+
if (len > maxSize) {
116+
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
117+
", limit=" + maxSize);
118+
}
119+
baos.write(buffer, 0, n);
120+
}
121+
} while (n > 0);
122+
} finally {
123+
in.close();
124+
}
125+
return baos.toByteArray();
126+
}
127+
128+
private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
129+
final int maxSize) throws IOException {
130+
final Path path = new Path(s);
131+
final FileSystem fs = FileSystem.get(path.toUri(), conf);
132+
final FileStatus stat = fs.getFileStatus(path);
133+
if (!stat.isFile()) {
134+
throw new IllegalArgumentException(s + " is not a file");
135+
}
136+
if (stat.getLen() > maxSize) {
137+
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
138+
", size=" + stat.getLen() + ", limit=" + maxSize);
139+
}
140+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
141+
final byte[] buffer = new byte[8192];
142+
try (final FSDataInputStream in = fs.open(path)) {
143+
int n;
144+
do {
145+
n = in.read(buffer);
146+
if (n > 0) {
147+
baos.write(buffer, 0, n);
148+
}
149+
} while (n > 0);
150+
}
151+
return baos.toByteArray();
152+
}
153+
154+
// Visible for testing
155+
public static boolean contains(String dictionaryPath) {
156+
if (CACHE != null) {
157+
return CACHE.asMap().containsKey(dictionaryPath);
158+
}
159+
return false;
160+
}
161+
109162
}

hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,31 @@
2424
import java.util.Random;
2525

2626
import org.apache.hadoop.conf.Configurable;
27+
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.hbase.HBaseConfiguration;
2729
import org.apache.hadoop.hbase.util.Bytes;
2830
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
2931
import org.apache.hadoop.hbase.util.RandomDistribution;
3032
import org.apache.hadoop.io.IOUtils;
3133
import org.apache.hadoop.io.compress.CompressionCodec;
3234
import org.apache.hadoop.io.compress.CompressionInputStream;
3335
import org.apache.hadoop.io.compress.CompressionOutputStream;
36+
import org.apache.hadoop.io.compress.Compressor;
37+
import org.apache.hadoop.io.compress.Decompressor;
3438
import org.slf4j.Logger;
3539
import org.slf4j.LoggerFactory;
3640

3741
@SuppressWarnings("checkstyle:innerassignment")
3842
public class CompressionTestBase {
3943

4044
protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);
45+
protected static final Configuration conf = HBaseConfiguration.create();
4146

42-
static final int LARGE_SIZE = 10 * 1024 * 1024;
43-
static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
44-
static final int BLOCK_SIZE = 4096;
47+
protected static final int LARGE_SIZE = 10 * 1024 * 1024;
48+
protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
49+
protected static final int BLOCK_SIZE = 4096;
4550

46-
static final byte[] SMALL_INPUT;
51+
protected static final byte[] SMALL_INPUT;
4752
static {
4853
// 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
4954
SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
@@ -67,15 +72,17 @@ public class CompressionTestBase {
6772
Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
6873
}
6974

70-
protected void codecTest(final CompressionCodec codec, final byte[][] input)
71-
throws Exception {
75+
protected void codecTest(final Configuration conf, final CompressionCodec codec,
76+
final byte[][] input) throws Exception {
7277
// We do this in Compression.java
7378
((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
7479
// Compress
80+
long start = EnvironmentEdgeManager.currentTime();
81+
Compressor compressor = codec.createCompressor();
82+
compressor.reinit(conf);
7583
ByteArrayOutputStream baos = new ByteArrayOutputStream();
76-
CompressionOutputStream out = codec.createOutputStream(baos);
84+
CompressionOutputStream out = codec.createOutputStream(baos, compressor);
7785
int inLen = 0;
78-
long start = EnvironmentEdgeManager.currentTime();
7986
for (int i = 0; i < input.length; i++) {
8087
out.write(input[i]);
8188
inLen += input[i].length;
@@ -87,7 +94,12 @@ protected void codecTest(final CompressionCodec codec, final byte[][] input)
8794
inLen, compressed.length, end - start);
8895
// Decompress
8996
final byte[] plain = new byte[inLen];
90-
CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
97+
Decompressor decompressor = codec.createDecompressor();
98+
if (decompressor instanceof CanReinit) {
99+
((CanReinit)decompressor).reinit(conf);
100+
}
101+
CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed),
102+
decompressor);
91103
start = EnvironmentEdgeManager.currentTime();
92104
IOUtils.readFully(in, plain, 0, plain.length);
93105
in.close();
@@ -107,35 +119,41 @@ protected void codecTest(final CompressionCodec codec, final byte[][] input)
107119
* Test with one smallish input buffer
108120
*/
109121
protected void codecSmallTest(final CompressionCodec codec) throws Exception {
110-
codecTest(codec, new byte[][] { SMALL_INPUT });
122+
codecTest(conf, codec, new byte[][] { SMALL_INPUT });
111123
}
112124

113125
/**
114126
* Test with a large input (1MB) divided into blocks of 4KB.
115127
*/
116128
protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
117-
RandomDistribution.DiscreteRNG zipf =
129+
RandomDistribution.DiscreteRNG rng =
118130
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
119131
final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
120-
for (int i = 0; i < input.length; i++) {
121-
for (int j = 0; j < input[i].length; j++) {
122-
input[i][j] = (byte)zipf.nextInt();
123-
}
124-
}
125-
codecTest(codec, input);
132+
fill(rng, input);
133+
codecTest(conf, codec, input);
126134
}
127135

128136
/**
129137
* Test with a very large input (100MB) as a single input buffer.
130138
*/
131139
protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
132-
RandomDistribution.DiscreteRNG zipf =
140+
RandomDistribution.DiscreteRNG rng =
133141
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
134142
final byte[][] input = new byte[1][VERY_LARGE_SIZE];
135-
for (int i = 0; i < VERY_LARGE_SIZE; i++) {
136-
input[0][i] = (byte)zipf.nextInt();
143+
fill(rng, input);
144+
codecTest(conf, codec, input);
145+
}
146+
147+
protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) {
148+
for (int i = 0; i < input.length; i++) {
149+
fill(rng, input[i]);
150+
}
151+
}
152+
153+
protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) {
154+
for (int i = 0; i < input.length; i++) {
155+
input[i] = (byte) rng.nextInt();
137156
}
138-
codecTest(codec, input);
139157
}
140158

141159
}

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.io.compress.CanReinit;
2223
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2324
import org.apache.hadoop.io.compress.Compressor;
2425
import org.apache.yetus.audience.InterfaceAudience;
@@ -31,7 +32,7 @@
3132
* Hadoop compressor glue for zstd-jni.
3233
*/
3334
@InterfaceAudience.Private
34-
public class ZstdCompressor implements Compressor {
35+
public class ZstdCompressor implements CanReinit, Compressor {
3536

3637
protected static final Logger LOG = LoggerFactory.getLogger(ZstdCompressor.class);
3738
protected int level, bufferSize;

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.io.compress.CanReinit;
2223
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
2324
import org.apache.hadoop.io.compress.Decompressor;
2425
import org.apache.yetus.audience.InterfaceAudience;
@@ -31,7 +32,7 @@
3132
* Hadoop decompressor glue for zstd-jni.
3233
*/
3334
@InterfaceAudience.Private
34-
public class ZstdDecompressor implements Decompressor {
35+
public class ZstdDecompressor implements CanReinit, Decompressor {
3536

3637
protected static final Logger LOG = LoggerFactory.getLogger(ZstdDecompressor.class);
3738
protected ByteBuffer inBuf, outBuf;
@@ -151,6 +152,7 @@ public void setInput(final byte[] b, final int off, final int len) {
151152
finished = false;
152153
}
153154

155+
@Override
154156
public void reinit(final Configuration conf) {
155157
LOG.trace("reinit");
156158
if (conf != null) {

0 commit comments

Comments
 (0)