@@ -32,6 +32,7 @@ message BucketCacheEntry {
map<int32, string> deserializers = 4;
required BackingMap backing_map = 5;
optional bytes checksum = 6;
map<string, bool> prefetched_files = 7;
}

message BackingMap {
@@ -71,6 +72,7 @@ message BucketEntry {
required int64 access_counter = 3;
required int32 deserialiser_index = 4;
required BlockPriority priority = 5;
required int64 cachedTime = 6;
}

enum BlockPriority {

@@ -93,8 +93,6 @@ public class CacheConfig {
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";

public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";

/**
* Configuration key to set interval for persisting bucket cache to disk.
*/

@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,14 @@ public void run() {
block.release();
}
}
cacheConf.getBlockCache().ifPresent(bc -> {
if (bc instanceof CombinedBlockCache) {
BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
if (l2 instanceof BucketCache) {
((BucketCache) l2).fileCacheCompleted(path.getName());
}
}
Review comment (Contributor):
IIUC, persistent cache is only for the L2 cache. I think we need to get the instance of the L2 cache only if the block cache is an instance of CombinedBlockCache.
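A minimal sketch of the guard this comment describes, using only the types and calls visible in the diff above (CombinedBlockCache, getSecondLevelCache, BucketCache, fileCacheCompleted); the wrapper class and method names are hypothetical and added purely for illustration:

import java.util.Optional;

import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

// Hypothetical helper (not part of this patch) showing the pattern: the persistent
// cache only exists at L2, so the BucketCache instance is looked up only when the
// configured block cache is a CombinedBlockCache.
final class PrefetchCompletionNotifier {

  static void notifyFileCached(Optional<BlockCache> blockCache, String hfileName) {
    blockCache.ifPresent(bc -> {
      if (bc instanceof CombinedBlockCache) {
        BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
        if (l2 instanceof BucketCache) {
          // Record that every block of this HFile has been prefetched into the bucket cache.
          ((BucketCache) l2).fileCacheCompleted(hfileName);
        }
      }
    });
  }

  private PrefetchCompletionNotifier() {
  }
}

The added lines in this hunk apply the same guard once the prefetch loop finishes.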

});
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {

@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
@@ -42,25 +37,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

@InterfaceAudience.Private
public final class PrefetchExecutor {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
/** Set of files for which prefetch is completed */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
static String prefetchedFileListPath;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
@@ -90,13 +79,6 @@ public Thread newThread(Runnable r) {
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");

public static void request(Path path, Runnable runnable) {
if (prefetchCompleted != null) {
if (isFilePrefetched(path.getName())) {
LOG.info(
"File has already been prefetched before the restart, so skipping prefetch : " + path);
return;
}
}
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
@@ -122,8 +104,9 @@ public static void request(Path path, Runnable runnable) {

public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchCompleted.put(path.getName(), true);
LOG.debug("Prefetch completed for {}", path.getName());
if (LOG.isDebugEnabled()) {
LOG.debug("Prefetch completed for {}", path.getName());
}
}

public static void cancel(Path path) {
@@ -134,8 +117,6 @@ public static void cancel(Path path) {
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName());
removePrefetchedFileWhileEvict(path.getName());
}

public static boolean isCompleted(Path path) {
@@ -146,70 +127,6 @@ public static boolean isCompleted(Path path) {
return true;
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
justification = "false positive, try-with-resources ensures close is called.")
public static void persistToFile(String path) throws IOException {
prefetchedFileListPath = path;
if (prefetchedFileListPath == null) {
LOG.info("Exception while persisting prefetch!");
throw new IOException("Error persisting prefetched HFiles set!");
}
if (!prefetchCompleted.isEmpty()) {
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
}
}
}

public static void retrieveFromFile(String path) throws IOException {
prefetchedFileListPath = path;
File prefetchPersistenceFile = new File(prefetchedFileListPath);
if (!prefetchPersistenceFile.exists()) {
LOG.warn("Prefetch persistence file does not exist!");
return;
}
LOG.info("Retrieving from prefetch persistence file " + path);
assert (prefetchedFileListPath != null);
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
PersistentPrefetchProtos.PrefetchedHfileName proto =
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
prefetchCompleted.putAll(protoPrefetchedFilesMap);
}
}

private static FileInputStream deleteFileOnClose(final File file) throws IOException {
return new FileInputStream(file) {
private File myFile;

private FileInputStream init(File file) {
myFile = file;
return this;
}

@Override
public void close() throws IOException {
if (myFile == null) {
return;
}

super.close();
if (!myFile.delete()) {
throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
}
myFile = null;
}
}.init(file);
}

public static void removePrefetchedFileWhileEvict(String hfileName) {
prefetchCompleted.remove(hfileName);
}

public static boolean isFilePrefetched(String hfileName) {
return prefetchCompleted.containsKey(hfileName);
}

private PrefetchExecutor() {
}
}