Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -77,17 +79,19 @@ public class SnapshotFileCache implements Stoppable {
interface SnapshotFileInspector {
/**
* Returns a collection of file names needed by the snapshot.
* @param fs {@link FileSystem} where snapshot mainifest files are stored
* @param snapshotDir {@link Path} to the snapshot directory to scan.
* @return the collection of file names needed by the snapshot.
*/
Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException;
Collection<String> filesUnderSnapshot(final FileSystem fs, final Path snapshotDir)
throws IOException;
}

private static final Logger LOG = LoggerFactory.getLogger(SnapshotFileCache.class);
private volatile boolean stop = false;
private final FileSystem fs;
private final FileSystem fs, workingFs;
private final SnapshotFileInspector fileInspector;
private final Path snapshotDir;
private final Path snapshotDir, workingSnapshotDir;
private final Set<String> cache = new HashSet<>();
/**
* This is a helper map of information about the snapshot directories so we don't need to rescan
Expand All @@ -104,30 +108,38 @@ interface SnapshotFileInspector {
* @param conf to extract the configured {@link FileSystem} where the snapshots are stored and
* hbase root directory
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
* @throws IOException if the {@link FileSystem} or root directory cannot be loaded
*/
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf), 0,
cacheRefreshPeriod, refreshThreadName, inspectSnapshotFiles);
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, long cacheRefreshDelay,
String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(CommonFSUtils.getCurrentFileSystem(conf), CommonFSUtils.getRootDir(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf).
getFileSystem(conf),
SnapshotDescriptionUtils.getWorkingSnapshotDir(CommonFSUtils.getRootDir(conf), conf),
cacheRefreshPeriod, cacheRefreshDelay, refreshThreadName, inspectSnapshotFiles);
}

/**
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
* filesystem
* @param fs {@link FileSystem} where the snapshots are stored
* @param rootDir hbase root directory
* @param workingFs {@link FileSystem} where ongoing snapshot mainifest files are stored
* @param workingDir Location to store ongoing snapshot manifest files
* @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
* @param refreshThreadName name of the cache refresh thread
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
*/
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
public SnapshotFileCache(FileSystem fs, Path rootDir, FileSystem workingFs, Path workingDir,
long cacheRefreshPeriod, long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
this.fs = fs;
this.workingFs = workingFs;
this.workingSnapshotDir = workingDir;
this.fileInspector = inspectSnapshotFiles;
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
// periodically refresh the file cache to make sure we aren't superfluously saving files.
Expand Down Expand Up @@ -176,6 +188,7 @@ public synchronized void triggerCacheRefreshForTesting() {
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager) throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList();
List<String> snapshotsInProgress = null;
boolean refreshed = false;
Lock lock = null;
if (snapshotManager != null) {
Expand All @@ -197,6 +210,12 @@ public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatu
if (cache.contains(fileName)) {
continue;
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file);
}
} finally {
Expand Down Expand Up @@ -239,7 +258,8 @@ private void refreshCache() throws IOException {
// that new snapshot, even though it has the same name as the files referenced have
// probably changed.
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(fs,
snapshotDir.getPath());
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
}
// add all the files to cache
Expand All @@ -251,6 +271,26 @@ private void refreshCache() throws IOException {
this.snapshots.putAll(newSnapshots);
}

@VisibleForTesting
List<String> getSnapshotsInProgress() throws IOException {
List<String> snapshotInProgress = Lists.newArrayList();
// only add those files to the cache, but not to the known snapshots

FileStatus[] snapshotsInProgress = CommonFSUtils.listStatus(this.workingFs, this.workingSnapshotDir);

if (!ArrayUtils.isEmpty(snapshotsInProgress)) {
for (FileStatus snapshot : snapshotsInProgress) {
try {
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(workingFs,
snapshot.getPath()));
} catch (CorruptedSnapshotException cse) {
LOG.info("Corrupted in-progress snapshot file exception, ignored.", cse);
}
}
}
return snapshotInProgress;
}

/**
* Simple helper task that just periodically attempts to refresh the cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -93,10 +94,15 @@ public void setConf(final Configuration conf) {
DEFAULT_HFILE_CACHE_REFRESH_PERIOD);
final FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf);
cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
"snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, conf);
FileSystem workingFs = workingDir.getFileSystem(conf);

cache = new SnapshotFileCache(fs, rootDir, workingFs, workingDir, cacheRefreshPeriod,
cacheRefreshPeriod, "snapshot-hfile-cleaner-cache-refresher",
new SnapshotFileCache.SnapshotFileInspector() {
@Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir)
public Collection<String> filesUnderSnapshot(final FileSystem fs,
final Path snapshotDir)
throws IOException {
return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ public void process() {
verifier.verifySnapshot(this.workingDir, serverNames);

// complete the snapshot, atomically moving from tmp to .snapshot dir.
completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs, this.workingDirFs);
SnapshotDescriptionUtils.completeSnapshot(this.snapshotDir, this.workingDir, this.rootFs,
this.workingDirFs, this.conf);
finished = true;
msg = "Snapshot " + snapshot.getName() + " of table " + snapshotTable + " completed";
status.markComplete(msg);
LOG.info(msg);
Expand Down Expand Up @@ -258,42 +260,6 @@ public void process() {
}
}

/**
* Reset the manager to allow another snapshot to proceed.
* Commits the snapshot process by moving the working snapshot
* to the finalized filepath
*
* @param snapshotDir The file path of the completed snapshots
* @param workingDir The file path of the in progress snapshots
* @param fs The file system of the completed snapshots
* @param workingDirFs The file system of the in progress snapshots
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException the filesystem could not be reached
*/
public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
FileSystem workingDirFs) throws SnapshotCreationException, IOException {
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ snapshotDir);
// If the working and completed snapshot directory are on the same file system, attempt
// to rename the working snapshot directory to the completed location. If that fails,
// or the file systems differ, attempt to copy the directory over, throwing an exception
// if this fails
URI workingURI = workingDirFs.getUri();
URI rootURI = fs.getUri();
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
workingURI.getAuthority() == null ||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
workingURI.getUserInfo() == null ||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
snapshotDir, true, true, this.conf)) {
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
}
finished = true;
}

/**
* When taking snapshot, first we must acquire the exclusive table lock to confirm that there are
* no ongoing merge/split procedures. But later, we should try our best to release the exclusive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.hadoop.hbase.snapshot;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -383,25 +385,38 @@ public static SnapshotDescription readSnapshotInfo(FileSystem fs, Path snapshotD
}

/**
* Move the finished snapshot to its final, publicly visible directory - this marks the snapshot
* as 'complete'.
* @param snapshot description of the snapshot being tabken
* @param rootdir root directory of the hbase installation
* @param workingDir directory where the in progress snapshot was built
* @param fs {@link FileSystem} where the snapshot was built
* @throws org.apache.hadoop.hbase.snapshot.SnapshotCreationException if the
* snapshot could not be moved
* Commits the snapshot process by moving the working snapshot
* to the finalized filepath
*
* @param snapshotDir The file path of the completed snapshots
* @param workingDir The file path of the in progress snapshots
* @param fs The file system of the completed snapshots
* @param workingDirFs The file system of the in progress snapshots
* @param conf Configuration
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException the filesystem could not be reached
*/
public static void completeSnapshot(SnapshotDescription snapshot, Path rootdir, Path workingDir,
FileSystem fs) throws SnapshotCreationException, IOException {
Path finishedDir = getCompletedSnapshotDir(snapshot, rootdir);
LOG.debug("Snapshot is done, just moving the snapshot from " + workingDir + " to "
+ finishedDir);
if (!fs.rename(workingDir, finishedDir)) {
throw new SnapshotCreationException(
"Failed to move working directory(" + workingDir + ") to completed directory("
+ finishedDir + ").", ProtobufUtil.createSnapshotDesc(snapshot));
public static void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
FileSystem workingDirFs, final Configuration conf)
throws SnapshotCreationException, IOException {
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ snapshotDir);
// If the working and completed snapshot directory are on the same file system, attempt
// to rename the working snapshot directory to the completed location. If that fails,
// or the file systems differ, attempt to copy the directory over, throwing an exception
// if this fails
URI workingURI = workingDirFs.getUri();
URI rootURI = fs.getUri();
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
workingURI.getAuthority() == null ||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
workingURI.getUserInfo() == null ||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
snapshotDir, true, true, conf)) {
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
}
}

Expand Down
Loading