Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -176,6 +178,7 @@ public synchronized void triggerCacheRefreshForTesting() {
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager) throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList();
List<String> snapshotsInProgress = null;
boolean refreshed = false;
Lock lock = null;
if (snapshotManager != null) {
Expand All @@ -197,6 +200,12 @@ public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatu
if (cache.contains(fileName)) {
continue;
}
if (snapshotsInProgress == null) {
snapshotsInProgress = getSnapshotsInProgress();
}
if (snapshotsInProgress.contains(fileName)) {
continue;
}
unReferencedFiles.add(file);
}
} finally {
Expand Down Expand Up @@ -251,6 +260,24 @@ private void refreshCache() throws IOException {
this.snapshots.putAll(newSnapshots);
}

@VisibleForTesting
List<String> getSnapshotsInProgress() throws IOException {
List<String> snapshotInProgress = Lists.newArrayList();
// only add those files to the cache, but not to the known snapshots
Path snapshotTmpDir = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
FileStatus[] running = FSUtils.listStatus(fs, snapshotTmpDir);
if (running != null) {
for (FileStatus run : running) {
try {
snapshotInProgress.addAll(fileInspector.filesUnderSnapshot(run.getPath()));
} catch (CorruptedSnapshotException cse) {
LOG.debug("Corrupted in-progress snapshot file exception, ignored.", cse);
}
}
}
return snapshotInProgress;
}

/**
* Simple helper task that just periodically attempts to refresh the cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@
*/
package org.apache.hadoop.hbase.master.snapshot;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
Expand All @@ -46,6 +51,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

/**
* Test that we correctly reload the cache, filter directories, etc.
*/
Expand Down Expand Up @@ -89,8 +99,10 @@ public void testLoadAndDelete() throws IOException {
"test-snapshot-file-cache-refresh", new SnapshotFiles());

createAndTestSnapshotV1(cache, "snapshot1a", false, true, false);
createAndTestSnapshotV1(cache, "snapshot1b", true, true, false);

createAndTestSnapshotV2(cache, "snapshot2a", false, true, false);
createAndTestSnapshotV2(cache, "snapshot1a", true, true, false);
}

@Test
Expand Down Expand Up @@ -133,6 +145,71 @@ public void testCacheUpdatedWhenLastModifiedOfSnapDirNotUpdated() throws IOExcep
createAndTestSnapshotV2(cache, "snapshot2v2", true, false, true);
}

@Test
public void testWeNeverCacheTmpDirAndLoadIt() throws Exception {

final AtomicInteger count = new AtomicInteger(0);
// don't refresh the cache unless we tell it to
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles()) {
@Override
List<String> getSnapshotsInProgress()
throws IOException {
List<String> result = super.getSnapshotsInProgress();
count.incrementAndGet();
return result;
}

@Override public void triggerCacheRefreshForTesting() {
super.triggerCacheRefreshForTesting();
}
};

SnapshotMock.SnapshotBuilder complete =
createAndTestSnapshotV1(cache, "snapshot", false, false, false);

int countBeforeCheck = count.get();

FSUtils.logFileSystemState(fs, rootDir, LOG);

List<FileStatus> allStoreFiles = getStoreFilesForSnapshot(complete);
Iterable<FileStatus> deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
assertTrue(Iterables.isEmpty(deletableFiles));
// no need for tmp dir check as all files are accounted for.
assertEquals(0, count.get() - countBeforeCheck);

// add a random file to make sure we refresh
FileStatus randomFile = mockStoreFile(UTIL.getRandomUUID().toString());
allStoreFiles.add(randomFile);
deletableFiles = cache.getUnreferencedFiles(allStoreFiles, null);
assertEquals(randomFile, Iterables.getOnlyElement(deletableFiles));
assertEquals(1, count.get() - countBeforeCheck); // we check the tmp directory
}

private List<FileStatus> getStoreFilesForSnapshot(SnapshotMock.SnapshotBuilder builder)
throws IOException {
final List<FileStatus> allStoreFiles = Lists.newArrayList();
SnapshotReferenceUtil
.visitReferencedFiles(UTIL.getConfiguration(), fs, builder.getSnapshotsDir(),
new SnapshotReferenceUtil.SnapshotVisitor() {
@Override public void storeFile(RegionInfo regionInfo, String familyName,
SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile) throws IOException {
FileStatus status = mockStoreFile(storeFile.getName());
allStoreFiles.add(status);
}
});
return allStoreFiles;
}

private FileStatus mockStoreFile(String storeFileName) {
FileStatus status = mock(FileStatus.class);
Path path = mock(Path.class);
when(path.getName()).thenReturn(storeFileName);
when(status.getPath()).thenReturn(path);
return status;
}

class SnapshotFiles implements SnapshotFileCache.SnapshotFileInspector {
@Override
public Collection<String> filesUnderSnapshot(final Path snapshotDir) throws IOException {
Expand Down Expand Up @@ -164,12 +241,20 @@ private void createAndTestSnapshot(final SnapshotFileCache cache,
List<Path> files = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
for (Path filePath: builder.addRegion()) {
if (tmp) {
// We should be able to find all the files while the snapshot creation is in-progress
FSUtils.logFileSystemState(fs, rootDir, LOG);
assertFalse("Cache didn't find " + filePath,
contains(getNonSnapshotFiles(cache, filePath), filePath));
}
files.add(filePath);
}
}

// Finalize the snapshot
builder.commit();
if (!tmp) {
builder.commit();
}

if (setFolderTime) {
fs.setTimes(snapshotDir, 0, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.snapshot;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -140,8 +141,15 @@ public void testCorruptedRegionManifest() throws IOException {
builder.addRegionV2();
builder.corruptOneRegionManifest();

fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, TEST_UTIL.getConfiguration()),
true);
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
} finally {
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
TEST_UTIL.getConfiguration()), true);
}
}

/**
Expand All @@ -159,7 +167,29 @@ public void testCorruptedDataManifest() throws IOException {
builder.consolidate();
builder.corruptDataManifest();

fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
try {
cache.getSnapshotsInProgress();
} finally {
fs.delete(SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
TEST_UTIL.getConfiguration()), true);
}
}

@Test
public void testMissedTmpSnapshot() throws IOException {
SnapshotTestingUtils.SnapshotMock snapshotMock =
new SnapshotTestingUtils.SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir);
SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2(
SNAPSHOT_NAME_STR, TABLE_NAME_STR);
builder.addRegionV2();
builder.missOneRegionSnapshotFile();
long period = Long.MAX_VALUE;
SnapshotFileCache cache = new SnapshotFileCache(fs, rootDir, period, 10000000,
"test-snapshot-file-cache-refresh", new SnapshotFiles());
cache.getSnapshotsInProgress();
assertTrue(fs.exists(builder.getSnapshotsDir()));
}
}