Skip to content

Commit 7c58e79

Browse files
committed
Spread to all LruBC
1 parent 3787212 commit 7c58e79

6 files changed

Lines changed: 115 additions & 154 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,42 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile;
1919

20+
import java.util.concurrent.Executors;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.TimeUnit;
2023
import org.apache.hadoop.hbase.io.HeapSize;
24+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
2125
import org.apache.yetus.audience.InterfaceAudience;
2226

2327
/**
2428
* In-memory BlockCache that may be backed by secondary layer(s).
2529
*/
2630
@InterfaceAudience.Private
27-
public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {
31+
public abstract class FirstLevelBlockCache implements ResizableBlockCache, HeapSize {
32+
33+
/* Statistics thread */
34+
protected static String STAT_THREAD_ENABLE_KEY = "hbase.lru.stat.enable";
35+
protected static boolean STAT_THREAD_ENABLE_DEFAULT = false;
36+
protected static final int STAT_THREAD_PERIOD = 60 * 5;
37+
38+
protected transient ScheduledExecutorService statsThreadPool;
39+
40+
FirstLevelBlockCache(boolean statEnabled) {
41+
if (statEnabled) {
42+
this.statsThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
43+
.setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
44+
this.statsThreadPool.scheduleAtFixedRate(new LruBlockCache.StatisticsThread(this),
45+
STAT_THREAD_PERIOD, STAT_THREAD_PERIOD, TimeUnit.SECONDS);
46+
}
47+
}
2848

2949
/**
3050
* Whether the cache contains the block with specified cacheKey
3151
*
3252
* @param cacheKey cache key for the block
3353
* @return true if it contains the block
3454
*/
35-
boolean containsBlock(BlockCacheKey cacheKey);
55+
abstract boolean containsBlock(BlockCacheKey cacheKey);
3656

3757
/**
3858
* Specifies the secondary cache. An entry that is evicted from this cache due to a size
@@ -41,5 +61,42 @@ public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {
4161
* @param victimCache the second level cache
4262
* @throws IllegalArgumentException if the victim cache had already been set
4363
*/
44-
void setVictimCache(BlockCache victimCache);
64+
abstract void setVictimCache(BlockCache victimCache);
65+
66+
public void shutdown() {
67+
if (statsThreadPool != null) {
68+
this.statsThreadPool.shutdown();
69+
for (int i = 0; i < 10; i++) {
70+
if (!this.statsThreadPool.isShutdown()) {
71+
try {
72+
Thread.sleep(10);
73+
} catch (InterruptedException e) {
74+
Thread.currentThread().interrupt();
75+
break;
76+
}
77+
}
78+
}
79+
}
80+
}
81+
82+
/*
83+
* Statistics thread. Periodically prints the cache statistics to the log.
84+
*/
85+
static class StatisticsThread extends Thread {
86+
87+
private final FirstLevelBlockCache l1;
88+
89+
public StatisticsThread(FirstLevelBlockCache l1) {
90+
super("LruBlockCacheStats");
91+
setDaemon(true);
92+
this.l1 = l1;
93+
}
94+
95+
@Override
96+
public void run() {
97+
l1.logStats();
98+
}
99+
}
100+
101+
protected abstract void logStats();
45102
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java

Lines changed: 15 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,9 @@
2222
import java.lang.ref.WeakReference;
2323
import java.util.EnumMap;
2424
import java.util.Iterator;
25-
import java.util.List;
2625
import java.util.Map;
2726
import java.util.PriorityQueue;
28-
import java.util.SortedSet;
29-
import java.util.TreeSet;
3027
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.Executors;
32-
import java.util.concurrent.ScheduledExecutorService;
33-
import java.util.concurrent.TimeUnit;
3428
import java.util.concurrent.atomic.AtomicLong;
3529
import java.util.concurrent.atomic.LongAdder;
3630
import java.util.concurrent.locks.ReentrantLock;
@@ -46,7 +40,6 @@
4640

4741
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
4842
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
49-
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
5043

5144
/**
5245
* <b>This realisation improve performance of classical LRU
@@ -146,7 +139,7 @@
146139
* Find more information about improvement: https://issues.apache.org/jira/browse/HBASE-23887
147140
*/
148141
@InterfaceAudience.Private
149-
public class LruAdaptiveBlockCache implements FirstLevelBlockCache {
142+
public class LruAdaptiveBlockCache extends FirstLevelBlockCache {
150143

151144
private static final Logger LOG = LoggerFactory.getLogger(LruAdaptiveBlockCache.class);
152145

@@ -243,11 +236,6 @@ public class LruAdaptiveBlockCache implements FirstLevelBlockCache {
243236
/** Eviction thread */
244237
private transient final EvictionThread evictionThread;
245238

246-
/** Statistics thread schedule pool (for heavy debugging, could remove) */
247-
private transient final ScheduledExecutorService scheduleThreadPool =
248-
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
249-
.setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build());
250-
251239
/** Current size of cache */
252240
private final AtomicLong size;
253241

@@ -345,7 +333,8 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThrea
345333
DEFAULT_MAX_BLOCK_SIZE,
346334
DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
347335
DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
348-
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT);
336+
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
337+
STAT_THREAD_ENABLE_DEFAULT);
349338
}
350339

351340
public LruAdaptiveBlockCache(long maxSize, long blockSize,
@@ -368,7 +357,8 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize,
368357
conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
369358
DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT),
370359
conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
371-
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT));
360+
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT),
361+
conf.getBoolean(STAT_THREAD_ENABLE_KEY, STAT_THREAD_ENABLE_DEFAULT));
372362
}
373363

374364
public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
@@ -397,12 +387,13 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
397387
* @param heavyEvictionOverheadCoefficient how aggressive AdaptiveLRU will reduce GC
398388
*/
399389
public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
400-
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
401-
float minFactor, float acceptableFactor, float singleFactor,
402-
float multiFactor, float memoryFactor, float hardLimitFactor,
403-
boolean forceInMemory, long maxBlockSize,
404-
int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
405-
float heavyEvictionOverheadCoefficient) {
390+
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
391+
float minFactor, float acceptableFactor, float singleFactor,
392+
float multiFactor, float memoryFactor, float hardLimitFactor,
393+
boolean forceInMemory, long maxBlockSize,
394+
int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
395+
float heavyEvictionOverheadCoefficient, boolean statEnabled) {
396+
super(statEnabled);
406397
this.maxBlockSize = maxBlockSize;
407398
if(singleFactor + multiFactor + memoryFactor != 1 ||
408399
singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
@@ -446,11 +437,6 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThrea
446437
heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f);
447438
heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f);
448439
this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient;
449-
450-
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
451-
// every five minutes.
452-
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
453-
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
454440
}
455441

456442
@Override
@@ -1198,26 +1184,8 @@ boolean isEnteringRun() {
11981184
}
11991185
}
12001186

1201-
/*
1202-
* Statistics thread. Periodically prints the cache statistics to the log.
1203-
*/
1204-
static class StatisticsThread extends Thread {
1205-
1206-
private final LruAdaptiveBlockCache lru;
1207-
1208-
public StatisticsThread(LruAdaptiveBlockCache lru) {
1209-
super("LruAdaptiveBlockCacheStats");
1210-
setDaemon(true);
1211-
this.lru = lru;
1212-
}
1213-
1214-
@Override
1215-
public void run() {
1216-
lru.logStats();
1217-
}
1218-
}
1219-
1220-
public void logStats() {
1187+
@Override
1188+
protected void logStats() {
12211189
// Log size
12221190
long totalSize = heapSize();
12231191
long freeSize = maxSize - totalSize;
@@ -1375,26 +1343,10 @@ private long memorySize() {
13751343

13761344
@Override
13771345
public void shutdown() {
1346+
super.shutdown();
13781347
if (victimHandler != null) {
13791348
victimHandler.shutdown();
13801349
}
1381-
this.scheduleThreadPool.shutdown();
1382-
for (int i = 0; i < 10; i++) {
1383-
if (!this.scheduleThreadPool.isShutdown()) {
1384-
try {
1385-
Thread.sleep(10);
1386-
} catch (InterruptedException e) {
1387-
LOG.warn("Interrupted while sleeping");
1388-
Thread.currentThread().interrupt();
1389-
break;
1390-
}
1391-
}
1392-
}
1393-
1394-
if (!this.scheduleThreadPool.isShutdown()) {
1395-
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1396-
LOG.debug("Still running " + runnables);
1397-
}
13981350
this.evictionThread.shutdown();
13991351
}
14001352

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java

Lines changed: 5 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,11 @@
2222
import java.lang.ref.WeakReference;
2323
import java.util.EnumMap;
2424
import java.util.Iterator;
25-
import java.util.List;
2625
import java.util.Map;
2726
import java.util.PriorityQueue;
2827
import java.util.SortedSet;
2928
import java.util.TreeSet;
3029
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.Executors;
32-
import java.util.concurrent.ScheduledExecutorService;
33-
import java.util.concurrent.TimeUnit;
3430
import java.util.concurrent.atomic.AtomicLong;
3531
import java.util.concurrent.atomic.LongAdder;
3632
import java.util.concurrent.locks.ReentrantLock;
@@ -45,7 +41,6 @@
4541

4642
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
4743
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
48-
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4944

5045
/**
5146
* A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
@@ -85,7 +80,7 @@
8580
* sizes and usage.
8681
*/
8782
@InterfaceAudience.Private
88-
public class LruBlockCache implements FirstLevelBlockCache {
83+
public class LruBlockCache extends FirstLevelBlockCache {
8984

9085
private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
9186

@@ -140,10 +135,6 @@ public class LruBlockCache implements FirstLevelBlockCache {
140135

141136
private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
142137

143-
/* Statistics thread */
144-
private static final String STAT_THREAD_ENABLE_KEY = "hbase.lru.stat.enable";
145-
private static final boolean STAT_THREAD_ENABLE_DEFAULT = false;
146-
private static final int STAT_THREAD_PERIOD = 60 * 5;
147138
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
148139
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
149140

@@ -188,9 +179,6 @@ public class LruBlockCache implements FirstLevelBlockCache {
188179
/** Cache statistics */
189180
private final CacheStats stats;
190181

191-
/** Statistics thread schedule pool (for heavy debugging, could remove) */
192-
private transient ScheduledExecutorService scheduleThreadPool;
193-
194182
/** Maximum allowable size of cache (block put if size > max, evict) */
195183
private long maxSize;
196184

@@ -297,6 +285,7 @@ public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
297285
float minFactor, float acceptableFactor, float singleFactor,
298286
float multiFactor, float memoryFactor, float hardLimitFactor,
299287
boolean forceInMemory, long maxBlockSize, boolean statEnable) {
288+
super(statEnable);
300289
this.maxBlockSize = maxBlockSize;
301290
if(singleFactor + multiFactor + memoryFactor != 1 ||
302291
singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
@@ -332,13 +321,6 @@ public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
332321
} else {
333322
this.evictionThread = null;
334323
}
335-
336-
if (statEnable) {
337-
this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
338-
.setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
339-
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
340-
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
341-
}
342324
}
343325

344326
@Override
@@ -979,26 +961,8 @@ boolean isEnteringRun() {
979961
}
980962
}
981963

982-
/*
983-
* Statistics thread. Periodically prints the cache statistics to the log.
984-
*/
985-
static class StatisticsThread extends Thread {
986-
987-
private final LruBlockCache lru;
988-
989-
public StatisticsThread(LruBlockCache lru) {
990-
super("LruBlockCacheStats");
991-
setDaemon(true);
992-
this.lru = lru;
993-
}
994-
995-
@Override
996-
public void run() {
997-
lru.logStats();
998-
}
999-
}
1000-
1001-
public void logStats() {
964+
@Override
965+
protected void logStats() {
1002966
// Log size
1003967
long totalSize = heapSize();
1004968
long freeSize = maxSize - totalSize;
@@ -1156,27 +1120,10 @@ private long memorySize() {
11561120

11571121
@Override
11581122
public void shutdown() {
1123+
super.shutdown();
11591124
if (victimHandler != null) {
11601125
victimHandler.shutdown();
11611126
}
1162-
if (this.scheduleThreadPool != null) {
1163-
this.scheduleThreadPool.shutdown();
1164-
for (int i = 0; i < 10; i++) {
1165-
if (!this.scheduleThreadPool.isShutdown()) {
1166-
try {
1167-
Thread.sleep(10);
1168-
} catch (InterruptedException e) {
1169-
LOG.warn("Interrupted while sleeping");
1170-
Thread.currentThread().interrupt();
1171-
break;
1172-
}
1173-
}
1174-
}
1175-
if (!this.scheduleThreadPool.isShutdown()) {
1176-
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1177-
LOG.debug("Still running " + runnables);
1178-
}
1179-
}
11801127
this.evictionThread.shutdown();
11811128
}
11821129

0 commit comments

Comments
 (0)