
Commit e8f9f96

Switch back to Guava cache
1 parent 81585cf commit e8f9f96

7 files changed

Lines changed: 71 additions & 49 deletions


hbase-common/pom.xml

Lines changed: 0 additions & 4 deletions
@@ -110,10 +110,6 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.github.ben-manes.caffeine</groupId>
-      <artifactId>caffeine</artifactId>
-    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ByteBuffDecompressionCodec.java

Lines changed: 3 additions & 2 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -27,7 +28,7 @@ public interface ByteBuffDecompressionCodec {
 
   ByteBuffDecompressor createByteBuffDecompressor();
 
-  Compression.HFileDecompressionContext
-    getDecompressionContextFromConfiguration(Configuration conf);
+  Compression.HFileDecompressionContext getDecompressionContextFromConfiguration(Configuration conf)
+    throws IOException;
 
 }
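
The interface method now declares a checked IOException, so callers must account for it. A minimal caller sketch (the DecompressionContextCaller class and contextFor helper are hypothetical, not part of this commit):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.ByteBuffDecompressionCodec;
import org.apache.hadoop.hbase.io.compress.Compression;

final class DecompressionContextCaller {
  // Propagate the new checked exception rather than wrapping it, mirroring
  // what Compression.getHFileDecompressionContextForConfiguration does below.
  static Compression.HFileDecompressionContext contextFor(ByteBuffDecompressionCodec codec,
    Configuration conf) throws IOException {
    return codec.getDecompressionContextFromConfiguration(conf);
  }
}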

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CodecPool.java

Lines changed: 12 additions & 12 deletions
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.util.Comparator;
 import java.util.NavigableSet;
@@ -37,6 +35,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
 /**
  * A global compressor/decompressor pool used to save and reuse (possibly native)
  * compression/decompression codecs. Copied from the class of the same name in hadoop-common and
@@ -56,7 +58,12 @@ public class CodecPool {
     NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
 
   private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
-    return Caffeine.newBuilder().build(key -> new AtomicInteger());
+    return CacheBuilder.newBuilder().build(new CacheLoader<>() {
+      @Override
+      public AtomicInteger load(Class<T> key) throws Exception {
+        return new AtomicInteger();
+      }
+    });
   }
 
   /**
@@ -108,26 +115,19 @@ private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool
   /**
    * Copied from hadoop-common without significant modification.
    */
-  @SuppressWarnings("unchecked")
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-    value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
-    justification = "LoadingCache will compute value if absent")
   private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
     Class<? extends T> codecClass) {
-    return usageCounts.get((Class<T>) codecClass).get();
+    return usageCounts.getUnchecked((Class<T>) codecClass).get();
   }
 
   /**
    * Copied from hadoop-common without significant modification.
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-    value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
-    justification = "LoadingCache will compute value if absent")
   private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
     T codec, int delta) {
     if (codec != null && usageCounts != null) {
       Class<T> codecClass = ReflectionUtils.getClass(codec);
-      usageCounts.get(codecClass).addAndGet(delta);
+      usageCounts.getUnchecked(codecClass).addAndGet(delta);
     }
   }
 
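
Two API differences from Caffeine drive this change: Guava builds a LoadingCache from a CacheLoader subclass rather than a lambda, and reads use getUnchecked(key) because Guava's plain get(key) throws a checked ExecutionException. A self-contained sketch of the Guava idiom, written against plain com.google.common coordinates rather than the hbase-thirdparty relocation used above:

import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class GuavaLoaderIdiom {
  // The Caffeine equivalent was: Caffeine.newBuilder().build(key -> new AtomicInteger())
  static LoadingCache<String, AtomicInteger> createCache() {
    return CacheBuilder.newBuilder().build(new CacheLoader<String, AtomicInteger>() {
      @Override
      public AtomicInteger load(String key) {
        return new AtomicInteger(); // cannot throw, so getUnchecked is safe
      }
    });
  }

  public static void main(String[] args) {
    LoadingCache<String, AtomicInteger> leases = createCache();
    leases.getUnchecked("gz").incrementAndGet();
    System.out.println(leases.getUnchecked("gz").get()); // prints 1
  }
}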

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java

Lines changed: 1 addition & 1 deletion
@@ -561,7 +561,7 @@ public void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
    */
   @Nullable
   public HFileDecompressionContext
-    getHFileDecompressionContextForConfiguration(Configuration conf) {
+    getHFileDecompressionContextForConfiguration(Configuration conf) throws IOException {
     if (supportsByteBuffDecompression()) {
       return ((ByteBuffDecompressionCodec) getCodec(conf))
         .getDecompressionContextFromConfiguration(conf);

hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java

Lines changed: 38 additions & 20 deletions
@@ -17,12 +17,11 @@
  */
 package org.apache.hadoop.hbase.io.compress;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,6 +31,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
 /**
  * A utility class for managing compressor/decompressor dictionary loading and caching of load
  * results. Useful for any codec that can support changing dictionaries at runtime, such as
@@ -45,8 +48,7 @@ public final class DictionaryCache {
   public static final String RESOURCE_SCHEME = "resource://";
 
   private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
-  private static final Cache<String, byte[]> BYTE_ARRAY_CACHE =
-    Caffeine.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();
+  private static volatile LoadingCache<String, byte[]> CACHE;
 
   private DictionaryCache() {
   }
@@ -62,22 +64,35 @@ public static byte[] getDictionary(final Configuration conf, final String path)
       return null;
     }
 
-    // Get or load the dictionary for the given path
-    return BYTE_ARRAY_CACHE.get(path, s -> {
-      final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
-      byte[] bytes;
-      try {
-        if (path.startsWith(RESOURCE_SCHEME)) {
-          bytes = loadFromResource(conf, path, maxSize);
-        } else {
-          bytes = loadFromHadoopFs(conf, path, maxSize);
+    // Create the dictionary loading cache if we haven't already
+    if (CACHE == null) {
+      synchronized (DictionaryCache.class) {
+        if (CACHE == null) {
+          final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
+          CACHE = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES)
+            .build(new CacheLoader<String, byte[]>() {
+              @Override
+              public byte[] load(String s) throws Exception {
+                byte[] bytes;
+                if (path.startsWith(RESOURCE_SCHEME)) {
+                  bytes = loadFromResource(conf, path, maxSize);
+                } else {
+                  bytes = loadFromHadoopFs(conf, path, maxSize);
+                }
+                LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
+                return bytes;
+              }
+            });
         }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
       }
-      LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
-      return bytes;
-    });
+    }
+
+    // Get or load the dictionary for the given path
+    try {
+      return CACHE.get(path);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Unable to load dictionary at " + path, e);
+    }
   }
 
   // Visible for testing
@@ -86,7 +101,7 @@ public static byte[] loadFromResource(final Configuration conf, final String s,
     if (!s.startsWith(RESOURCE_SCHEME)) {
       throw new IOException("Path does not start with " + RESOURCE_SCHEME);
    }
-    final String path = s.substring(RESOURCE_SCHEME.length());
+    final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
     LOG.info("Loading resource {}", path);
     final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
     if (in == null) {
@@ -140,7 +155,10 @@ private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
 
   // Visible for testing
   public static boolean contains(String dictionaryPath) {
-    return BYTE_ARRAY_CACHE.asMap().containsKey(dictionaryPath);
+    if (CACHE != null) {
+      return CACHE.asMap().containsKey(dictionaryPath);
+    }
+    return false;
   }
 
 }
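
Unlike the removed Caffeine field initializer, the Guava cache here needs a Configuration value (the dictionary size limit) before it can be built, so creation moves into getDictionary behind double-checked locking on a volatile field. A standalone sketch of that pattern, again against plain Guava coordinates; loadBytes is a hypothetical stand-in for the resource/HDFS loading above:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public final class LazyCacheSketch {
  // volatile ensures a fully constructed cache is visible to threads
  // that take the unsynchronized fast path on the first check.
  private static volatile LoadingCache<String, byte[]> CACHE;

  static byte[] get(String key) {
    if (CACHE == null) {                      // first check, no lock
      synchronized (LazyCacheSketch.class) {
        if (CACHE == null) {                  // second check, under lock
          CACHE = CacheBuilder.newBuilder().maximumSize(100)
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build(new CacheLoader<String, byte[]>() {
              @Override
              public byte[] load(String s) {
                return loadBytes(s);          // hypothetical loader
              }
            });
        }
      }
    }
    try {
      return CACHE.get(key);                  // Guava's checked get
    } catch (ExecutionException e) {
      throw new RuntimeException("Unable to load " + key, e);
    }
  }

  private static byte[] loadBytes(String s) {
    return s.getBytes();                      // stand-in for real I/O
  }
}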

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java

Lines changed: 15 additions & 9 deletions
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.compress.zstd;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.luben.zstd.Zstd;
 import com.github.luben.zstd.ZstdDictDecompress;
 import edu.umd.cs.findbugs.annotations.Nullable;
@@ -27,6 +25,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +44,9 @@
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
 /**
  * Hadoop ZStandard codec implemented with zstd-jni.
  * <p>
@@ -59,7 +61,7 @@ public class ZstdCodec implements Configurable, CompressionCodec, ByteBuffDecomp
   public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
 
   private static final Cache<String, Pair<ZstdDictDecompress, Integer>> DECOMPRESS_DICT_CACHE =
-    Caffeine.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();
+    CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();
 
   private Configuration conf;
   private int bufferSize;
@@ -137,7 +139,7 @@ public Class<? extends ByteBuffDecompressor> getByteBuffDecompressorType() {
 
   @Override
   public Compression.HFileDecompressionContext
-    getDecompressionContextFromConfiguration(Configuration conf) {
+    getDecompressionContextFromConfiguration(Configuration conf) throws IOException {
     return ZstdHFileDecompressionContext.fromConfiguration(conf);
   }
 
@@ -177,11 +179,15 @@ static Pair<ZstdDictDecompress, Integer> getDecompressDictionary(final Configura
       return null;
     }
 
-    return DECOMPRESS_DICT_CACHE.get(path, (s) -> {
-      byte[] dictBytes = DictionaryCache.getDictionary(conf, path);
-      int dictId = getDictionaryId(dictBytes);
-      return new Pair<>(new ZstdDictDecompress(dictBytes), dictId);
-    });
+    try {
+      return DECOMPRESS_DICT_CACHE.get(path, () -> {
+        byte[] dictBytes = DictionaryCache.getDictionary(conf, path);
+        int dictId = getDictionaryId(dictBytes);
+        return new Pair<>(new ZstdDictDecompress(dictBytes), dictId);
+      });
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Unable to load ZSTD dictionary", e);
+    }
  }
 
  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
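
DECOMPRESS_DICT_CACHE is a plain (non-loading) cache, so per-lookup loading uses Guava's Cache.get(key, Callable), which replaces Caffeine's get(key, Function) and adds a checked ExecutionException to unwrap. A sketch against plain Guava coordinates, with loadDictionary as a hypothetical stand-in for DictionaryCache.getDictionary plus dictionary-ID extraction:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CallableGetSketch {
  private static final Cache<String, byte[]> CACHE =
    CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES).build();

  // Cache.get(key, Callable) computes and caches the value on a miss.
  // The Callable may throw, so the checked ExecutionException must be handled.
  static byte[] dictionary(String path) {
    try {
      return CACHE.get(path, () -> loadDictionary(path));
    } catch (ExecutionException e) {
      throw new RuntimeException("Unable to load dictionary at " + path, e);
    }
  }

  private static byte[] loadDictionary(String path) {
    return path.getBytes(); // hypothetical stand-in for real dictionary I/O
  }
}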

hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdHFileDecompressionContext.java

Lines changed: 2 additions & 1 deletion
@@ -63,7 +63,8 @@ public boolean isAllowByteBuffDecompression() {
     return allowByteBuffDecompression;
   }
 
-  public static ZstdHFileDecompressionContext fromConfiguration(Configuration conf) {
+  public static ZstdHFileDecompressionContext fromConfiguration(Configuration conf)
+    throws IOException {
     boolean allowByteBuffDecompression =
       conf.getBoolean("hbase.io.compress.zstd.allowByteBuffDecompression", true);
     Pair<ZstdDictDecompress, Integer> dictAndId = ZstdCodec.getDecompressDictionary(conf);
