FirstLevelBlockCache.java
@@ -17,22 +17,42 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;

/**
* In-memory BlockCache that may be backed by secondary layer(s).
*/
@InterfaceAudience.Private
public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {
public abstract class FirstLevelBlockCache implements ResizableBlockCache, HeapSize {

/* Statistics thread */
protected static final String STAT_THREAD_ENABLE_KEY = "hbase.lru.stat.enable";
protected static final boolean STAT_THREAD_ENABLE_DEFAULT = false;
Reviewer comment (Contributor): As said in #3956, we'd better set it to true in this PR, and file another PR to change it to true on the master branch.

protected static final int STAT_THREAD_PERIOD = 60 * 5;

protected transient ScheduledExecutorService statsThreadPool;

FirstLevelBlockCache(boolean statEnabled) {
if (statEnabled) {
this.statsThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
this.statsThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
STAT_THREAD_PERIOD, STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}
}

/**
* Whether the cache contains the block with specified cacheKey
*
* @param cacheKey cache key for the block
* @return true if it contains the block
*/
boolean containsBlock(BlockCacheKey cacheKey);
abstract boolean containsBlock(BlockCacheKey cacheKey);

/**
* Specifies the secondary cache. An entry that is evicted from this cache due to a size
@@ -41,5 +61,42 @@ public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {
* @param victimCache the second level cache
* @throws IllegalArgumentException if the victim cache had already been set
*/
void setVictimCache(BlockCache victimCache);
abstract void setVictimCache(BlockCache victimCache);

public void shutdown() {
if (statsThreadPool != null) {
this.statsThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.statsThreadPool.isShutdown()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
static class StatisticsThread extends Thread {

private final FirstLevelBlockCache l1;

public StatisticsThread(FirstLevelBlockCache l1) {
super("LruBlockCacheStats");
setDaemon(true);
this.l1 = l1;
}

@Override
public void run() {
l1.logStats();
}
}

protected abstract void logStats();
}
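For context, here is a self-contained sketch (outside HBase; every class, field, and method name below is hypothetical) of the pattern this diff introduces in FirstLevelBlockCache: the abstract base owns the optional scheduled statistics task and delegates the actual logging to subclasses through an abstract logStats().

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

abstract class StatsLoggingCache {
  private final ScheduledExecutorService statsThreadPool;

  StatsLoggingCache(boolean statEnabled) {
    if (statEnabled) {
      statsThreadPool = Executors.newScheduledThreadPool(1, runnable -> {
        Thread t = new Thread(runnable, "CacheStatsExecutor"); // daemon thread, as in the PR
        t.setDaemon(true);
        return t;
      });
      // One-second period so the sketch prints quickly; the PR uses STAT_THREAD_PERIOD (5 minutes).
      statsThreadPool.scheduleAtFixedRate(this::logStats, 1, 1, TimeUnit.SECONDS);
    } else {
      statsThreadPool = null;
    }
  }

  /** Subclasses decide what their statistics look like, mirroring the abstract logStats() above. */
  protected abstract void logStats();

  public void shutdown() {
    if (statsThreadPool != null) {
      statsThreadPool.shutdownNow();
    }
  }
}

public class StatsLoggingCacheDemo extends StatsLoggingCache {
  StatsLoggingCacheDemo() {
    super(true); // stats enabled, the equivalent of hbase.lru.stat.enable=true
  }

  @Override
  protected void logStats() {
    System.out.println("cache stats: size=..., hits=..., misses=...");
  }

  public static void main(String[] args) throws InterruptedException {
    StatsLoggingCacheDemo cache = new StatsLoggingCacheDemo();
    Thread.sleep(3500); // let a few stats lines print
    cache.shutdown();
  }
}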
LruAdaptiveBlockCache.java
@@ -22,15 +22,9 @@
import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,7 +40,6 @@

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* <b>This realisation improve performance of classical LRU
@@ -146,7 +139,7 @@
* Find more information about improvement: https://issues.apache.org/jira/browse/HBASE-23887
*/
@InterfaceAudience.Private
public class LruAdaptiveBlockCache implements FirstLevelBlockCache {
public class LruAdaptiveBlockCache extends FirstLevelBlockCache {

private static final Logger LOG = LoggerFactory.getLogger(LruAdaptiveBlockCache.class);

@@ -243,11 +236,6 @@ public class LruAdaptiveBlockCache implements FirstLevelBlockCache {
/** Eviction thread */
private transient final EvictionThread evictionThread;

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build());

/** Current size of cache */
private final AtomicLong size;

@@ -345,7 +333,8 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThrea
DEFAULT_MAX_BLOCK_SIZE,
DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT);
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
STAT_THREAD_ENABLE_DEFAULT);
}

public LruAdaptiveBlockCache(long maxSize, long blockSize,
@@ -368,7 +357,8 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize,
conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT),
conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT));
DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT),
conf.getBoolean(STAT_THREAD_ENABLE_KEY, STAT_THREAD_ENABLE_DEFAULT));
}

public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
@@ -397,12 +387,13 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
* @param heavyEvictionOverheadCoefficient how aggressive AdaptiveLRU will reduce GC
*/
public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
float minFactor, float acceptableFactor, float singleFactor,
float multiFactor, float memoryFactor, float hardLimitFactor,
boolean forceInMemory, long maxBlockSize,
int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
float heavyEvictionOverheadCoefficient) {
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
float minFactor, float acceptableFactor, float singleFactor,
float multiFactor, float memoryFactor, float hardLimitFactor,
boolean forceInMemory, long maxBlockSize,
int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
float heavyEvictionOverheadCoefficient, boolean statEnabled) {
super(statEnabled);
this.maxBlockSize = maxBlockSize;
if(singleFactor + multiFactor + memoryFactor != 1 ||
singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
@@ -446,11 +437,6 @@ public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThrea
heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f);
heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f);
this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient;

// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
// every five minutes.
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}

@Override
@@ -1198,26 +1184,8 @@ boolean isEnteringRun() {
}
}

/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
static class StatisticsThread extends Thread {

private final LruAdaptiveBlockCache lru;

public StatisticsThread(LruAdaptiveBlockCache lru) {
super("LruAdaptiveBlockCacheStats");
setDaemon(true);
this.lru = lru;
}

@Override
public void run() {
lru.logStats();
}
}

public void logStats() {
@Override
protected void logStats() {
// Log size
long totalSize = heapSize();
long freeSize = maxSize - totalSize;
@@ -1375,26 +1343,10 @@ private long memorySize() {

@Override
public void shutdown() {
super.shutdown();
if (victimHandler != null) {
victimHandler.shutdown();
}
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
Thread.currentThread().interrupt();
break;
}
}
}

if (!this.scheduleThreadPool.isShutdown()) {
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
LOG.debug("Still running " + runnables);
}
this.evictionThread.shutdown();
}

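To show how the new hbase.lru.stat.enable key is meant to be used (its default is false in this PR; see the reviewer comment above), the following illustrative snippet enables the periodic statistics logging through the Configuration-based constructor shown in this file. The sizes are arbitrary example values, not recommendations, and the class name of the snippet itself is made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.LruAdaptiveBlockCache;

public class StatsEnabledCacheExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Key added by this change; the stats thread only starts when this is true.
    conf.setBoolean("hbase.lru.stat.enable", true);
    long maxSize = 64L * 1024 * 1024; // 64 MB cache capacity, illustrative
    long blockSize = 64L * 1024;      // 64 KB block size, illustrative
    LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, conf);
    // ... cache and read blocks; statistics are logged every STAT_THREAD_PERIOD (5 minutes) ...
    cache.shutdown();
  }
}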
LruBlockCache.java
@@ -22,15 +22,11 @@
import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,7 +41,6 @@

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
@@ -85,7 +80,7 @@
* sizes and usage.
*/
@InterfaceAudience.Private
public class LruBlockCache implements FirstLevelBlockCache {
public class LruBlockCache extends FirstLevelBlockCache {

private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);

@@ -140,8 +135,6 @@ public class LruBlockCache implements FirstLevelBlockCache {

private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

/* Statistics thread */
private static final int STAT_THREAD_PERIOD = 60 * 5;
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

@@ -165,11 +158,6 @@ public class LruBlockCache implements FirstLevelBlockCache {
/** Eviction thread */
private transient final EvictionThread evictionThread;

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());

/** Current size of cache */
private final AtomicLong size;

@@ -252,7 +240,8 @@ public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
DEFAULT_MEMORY_FACTOR,
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
false,
DEFAULT_MAX_BLOCK_SIZE);
DEFAULT_MAX_BLOCK_SIZE,
true);
}

public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
@@ -268,7 +257,8 @@ public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Confi
conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE),
conf.getBoolean(STAT_THREAD_ENABLE_KEY, STAT_THREAD_ENABLE_DEFAULT));
}

public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
@@ -294,7 +284,8 @@ public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
float minFactor, float acceptableFactor, float singleFactor,
float multiFactor, float memoryFactor, float hardLimitFactor,
boolean forceInMemory, long maxBlockSize) {
boolean forceInMemory, long maxBlockSize, boolean statEnabled) {
super(statEnabled);
this.maxBlockSize = maxBlockSize;
if(singleFactor + multiFactor + memoryFactor != 1 ||
singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
@@ -330,10 +321,6 @@ public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
} else {
this.evictionThread = null;
}
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
// every five minutes.
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}

@Override
@@ -974,26 +961,8 @@ boolean isEnteringRun() {
}
}

/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
static class StatisticsThread extends Thread {

private final LruBlockCache lru;

public StatisticsThread(LruBlockCache lru) {
super("LruBlockCacheStats");
setDaemon(true);
this.lru = lru;
}

@Override
public void run() {
lru.logStats();
}
}

public void logStats() {
@Override
protected void logStats() {
// Log size
long totalSize = heapSize();
long freeSize = maxSize - totalSize;
@@ -1151,26 +1120,10 @@ private long memorySize() {

@Override
public void shutdown() {
super.shutdown();
if (victimHandler != null) {
victimHandler.shutdown();
}
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
Thread.currentThread().interrupt();
break;
}
}
}

if (!this.scheduleThreadPool.isShutdown()) {
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
LOG.debug("Still running " + runnables);
}
this.evictionThread.shutdown();
}

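One design note on the shutdown path: FirstLevelBlockCache.shutdown() keeps the bounded sleep loop of the code it replaces. A common alternative, not what this PR implements, is ExecutorService.awaitTermination, sketched in isolation below with made-up names.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatsPoolShutdownSketch {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService statsThreadPool = Executors.newScheduledThreadPool(1);
    statsThreadPool.scheduleAtFixedRate(
        () -> System.out.println("stats tick"), 1, 1, TimeUnit.SECONDS);

    Thread.sleep(3000); // let a few ticks run

    statsThreadPool.shutdown(); // stop scheduling further runs
    if (!statsThreadPool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
      statsThreadPool.shutdownNow(); // cancel anything still pending
    }
  }
}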