diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 6efa9dbd2e9e..61ffe32e4930 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BufferedBucketCache;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -151,6 +152,11 @@ public class CacheConfig {
   private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
   public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
+  /**
+   * Parameter to turn on the bucket cache RAM buffer.
+   */
+  static final String RAM_BUFFER_ENABLE = "hbase.bucketcache.rambuffer.enable";
+  static final boolean RAM_BUFFER_ENABLE_DEFAULT = false;
 
   /**
    * @deprecated use {@link CacheConfig#BLOCKCACHE_BLOCKSIZE_KEY} instead.
    */
@@ -738,9 +744,11 @@ private static BlockCache getBucketCache(Configuration c) {
         "hbase.bucketcache.ioengine.errors.tolerated.duration",
         BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
       // Bucket cache logs its stats on creation internal to the constructor.
-      bucketCache = new BucketCache(bucketCacheIOEngineName,
-        bucketCacheSize, blockSize, bucketSizes, writerThreads, writerQueueLen, persistentPath,
-        ioErrorsTolerationDuration, c);
+      bucketCache = c.getBoolean(RAM_BUFFER_ENABLE, RAM_BUFFER_ENABLE_DEFAULT) ?
+        new BufferedBucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize, bucketSizes,
+          writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c) :
+        new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize, bucketSizes,
+          writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c);
     } catch (IOException ioex) {
       LOG.error("Can't instantiate bucket cache", ioex);
       throw new RuntimeException(ioex);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
new file mode 100644
index 000000000000..bcc985038112
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+
+/**
+ * A {@link BucketCache} fronted by an on-heap RAM buffer of recently read blocks.
+ * Serving a hot block from the buffer avoids re-reading and re-allocating it from
+ * the IO engine on every access, which reduces GC pressure.
+ */
+@InterfaceAudience.Private
+public class BufferedBucketCache extends BucketCache {
+  private static final Log LOG = LogFactory.getLog(BufferedBucketCache.class);
+
+  static final String RAM_BUFFER_SIZE_RATIO = "hbase.bucketcache.rambuffer.ratio";
+  static final double RAM_BUFFER_SIZE_RATIO_DEFAULT = 0.1;
+  static final String RAM_BUFFER_TIMEOUT = "hbase.bucketcache.rambuffer.timeout"; // in seconds
+  static final int RAM_BUFFER_TIMEOUT_DEFAULT = 60;
+
+  private final Cache<BlockCacheKey, Cacheable> ramBuffer;
+  // Maximum number of entries the buffer may hold, not a byte size.
+  private final long maxBufferSize;
+
+  private final AtomicLong ramBufferEvictCount = new AtomicLong(0);
+
+  // Fraction of maxBufferSize below which new blocks are admitted. Starts fully
+  // open; the adjuster below shrinks it under eviction pressure.
+  private volatile float ramBufferThreshold = 1.0f;
+
+  private transient final ScheduledExecutorService scheduleThreadPool =
+    Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("RAMBufferAdjustExecutor").setDaemon(true).build());
+
+  public BufferedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
+      Configuration conf) throws IOException {
+    super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+      persistencePath, ioErrorsTolerationDuration, conf);
+
+    // Cap the buffer at a configurable fraction of the approximate number of
+    // blocks the bucket cache can hold.
+    maxBufferSize = (long) ((capacity / (double) blockSize) * conf.getDouble(RAM_BUFFER_SIZE_RATIO,
+      RAM_BUFFER_SIZE_RATIO_DEFAULT));
+    int timeout = conf.getInt(RAM_BUFFER_TIMEOUT, RAM_BUFFER_TIMEOUT_DEFAULT);
+    ramBuffer = CacheBuilder.newBuilder()
+      .expireAfterAccess(timeout, TimeUnit.SECONDS)
+      .maximumSize(maxBufferSize)
+      .removalListener(new RemovalListener<BlockCacheKey, Cacheable>() {
+        @Override
+        public void onRemoval(RemovalNotification<BlockCacheKey, Cacheable> notification) {
+          ramBufferEvictCount.incrementAndGet();
+        }
+      }).build();
+
+    // Adjust the admission threshold every minute.
+    scheduleThreadPool.scheduleAtFixedRate(
+      new RAMBufferAdjustThread(this), 60, 60, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics) {
+    // Serve from the RAM buffer first to skip the IO engine read entirely.
+    Cacheable block = ramBuffer.getIfPresent(key);
+    if (block != null) {
+      if (updateCacheMetrics) {
+        this.getStats().hit(caching, key.isPrimary(), key.getBlockType());
+      }
+      return block;
+    }
+    block = super.getBlock(key, caching, repeat, updateCacheMetrics);
+    // Only admit new blocks while the buffer is below its current threshold.
+    if (block != null && ramBuffer.size() < maxBufferSize * ramBufferThreshold) {
+      ramBuffer.put(key, block);
+    }
+    return block;
+  }
+
+  private void updateRAMBufferThreshold(final float newThreshold) {
+    // Clamp to [0.01, 1.0] so the buffer can neither vanish nor exceed its cap.
+    this.ramBufferThreshold = Math.max(Math.min(newThreshold, 1.0f), 0.01f);
+  }
+
+  /**
+   * Samples the RAM buffer eviction rate over a ten-second window and adapts the
+   * admission threshold: heavy churn shrinks the buffer, headroom grows it.
+   */
+  static class RAMBufferAdjustThread implements Runnable {
+    private final BufferedBucketCache bucketCache;
+
+    RAMBufferAdjustThread(BufferedBucketCache bucketCache) {
+      this.bucketCache = bucketCache;
+    }
+
+    @Override
+    public void run() {
+      long currentEvictCount = bucketCache.ramBufferEvictCount.get();
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        LOG.info(e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+      // Evictions per second over the sample window.
+      long delta = (bucketCache.ramBufferEvictCount.get() - currentEvictCount) / 10;
+      if (delta > 100) {
+        bucketCache.updateRAMBufferThreshold((float) (bucketCache.ramBufferThreshold * 0.9));
+      } else if (delta < 10) {
+        bucketCache.updateRAMBufferThreshold((float) (bucketCache.ramBufferThreshold * 1.1));
+      }
+    }
+  }
+}
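
A usage sketch for reviewers, not part of the patch. It assumes the stock
CacheConfig(Configuration) constructor and a standard off-heap bucket cache
setup; the sizes and ratios below are illustrative, not recommendations:

    Configuration conf = HBaseConfiguration.create();
    // The RAM buffer only applies when a bucket cache is configured.
    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 1024); // MB
    // New flag from this patch; off by default, so existing deployments
    // keep getting a plain BucketCache.
    conf.setBoolean("hbase.bucketcache.rambuffer.enable", true);
    // Optional tuning: max buffered entries as a fraction of
    // (cache capacity / block size), and idle expiry in seconds.
    conf.setDouble("hbase.bucketcache.rambuffer.ratio", 0.1);
    conf.setInt("hbase.bucketcache.rambuffer.timeout", 60);
    // getBucketCache(conf) now yields a BufferedBucketCache as the L2 cache.
    CacheConfig cacheConfig = new CacheConfig(conf);

After startup the buffer tunes itself: once a minute the adjuster samples the
eviction rate for ten seconds, shrinks the admission threshold by 10% above
100 evictions/sec, and grows it by 10% below 10 evictions/sec, clamped to
[0.01, 1.0].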