Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3857,4 +3857,9 @@ public void run() {
public AsyncClusterConnection getAsyncClusterConnection() {
return asyncClusterConnection;
}

@VisibleForTesting
public CompactedHFilesDischarger getCompactedHFilesDischarger() {
return compactedFileDischarger;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,16 @@ public void storeFile(final RegionInfo regionInfo, final String family,
String hfile = storeFile.getName();
if (HFileLink.isHFileLink(hfile)) {
names.add(HFileLink.getReferencedHFileName(hfile));
} else if (StoreFileInfo.isReference(hfile)) {
Path refPath = StoreFileInfo.getReferredToFile(new Path(new Path(
new Path(new Path(regionInfo.getTable().getNamespaceAsString(),
regionInfo.getTable().getQualifierAsString()), regionInfo.getEncodedName()),
family), hfile));
names.add(hfile);
names.add(refPath.getName());
if (HFileLink.isHFileLink(refPath.getName())) {
names.add(HFileLink.getReferencedHFileName(refPath.getName()));
}
} else {
names.add(hfile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -29,13 +32,19 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
Expand Down Expand Up @@ -306,4 +315,133 @@ private static void verifyRow(Result result) throws IOException {
}
}

@Test
public void testMergeRegion() throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testMergeRegion");
String snapshotName = tableName.getNameAsString() + "_snapshot";
Configuration conf = UTIL.getConfiguration();
Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
long timeout = 20000; // 20s
try (Admin admin = UTIL.getAdmin()) {
List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
.collect(Collectors.toList());
// create table with 3 regions
Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
List<RegionInfo> regions = admin.getRegions(tableName);
Assert.assertEquals(3, regions.size());
RegionInfo region0 = regions.get(0);
RegionInfo region1 = regions.get(1);
RegionInfo region2 = regions.get(2);
// put some data in the table
UTIL.loadTable(table, FAMILIES);
admin.flush(tableName);
// wait flush is finished
UTIL.waitFor(timeout, () -> {
try {
Path tableDir = FSUtils.getTableDir(rootDir, tableName);
for (RegionInfo region : regions) {
Path regionDir = new Path(tableDir, region.getEncodedName());
for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
if (fs.listStatus(familyDir).length != 1) {
return false;
}
}
}
return true;
} catch (IOException e) {
LOG.warn("Failed check if flush is finished", e);
return false;
}
});
// merge 2 regions
admin.compactionSwitch(false, serverList);
admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
true);
UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
List<RegionInfo> mergedRegions = admin.getRegions(tableName);
RegionInfo mergedRegion =
mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
? mergedRegions.get(1)
: mergedRegions.get(0);
// snapshot
admin.snapshot(snapshotName, tableName);
Assert.assertEquals(1, admin.listSnapshots().size());
// major compact
admin.compactionSwitch(true, serverList);
admin.majorCompactRegion(mergedRegion.getRegionName());
// wait until merged region has no reference
UTIL.waitFor(timeout, () -> {
try {
for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
.getRegionServerThreads()) {
HRegionServer regionServer = regionServerThread.getRegionServer();
for (HRegion subRegion : regionServer.getRegions(tableName)) {
if (subRegion.getRegionInfo().getEncodedName()
.equals(mergedRegion.getEncodedName())) {
regionServer.getCompactedHFilesDischarger().chore();
}
}
}
Path tableDir = FSUtils.getTableDir(rootDir, tableName);
HRegionFileSystem regionFs = HRegionFileSystem
.openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
return !regionFs.hasReferences(admin.getDescriptor(tableName));
} catch (IOException e) {
LOG.warn("Failed check merged region has no reference", e);
return false;
}
});
// run catalog janitor to clean and wait for parent regions are archived
UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
UTIL.waitFor(timeout, () -> {
try {
Path tableDir = FSUtils.getTableDir(rootDir, tableName);
for (FileStatus fileStatus : fs.listStatus(tableDir)) {
String name = fileStatus.getPath().getName();
if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
return false;
}
}
return true;
} catch (IOException e) {
LOG.warn("Check if parent regions are archived error", e);
return false;
}
});
// set file modify time and then run cleaner
long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
// scan snapshot
try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
verifyScanner(scanner, bbb, yyy);
}
} catch (Exception e) {
LOG.error("scan snapshot error", e);
Assert.fail("Should not throw FileNotFoundException");
Assert.assertTrue(e.getCause() != null);
Assert.assertTrue(e.getCause().getCause() instanceof FileNotFoundException);
} finally {
tearDownCluster();
}
}

private void traverseAndSetFileTime(Path path, long time) throws IOException {
fs.setTimes(path, time, -1);
if (fs.isDirectory(path)) {
List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
List<FileStatus> subDirs =
allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
List<FileStatus> files =
allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
for (FileStatus subDir : subDirs) {
traverseAndSetFileTime(subDir.getPath(), time);
}
for (FileStatus file : files) {
fs.setTimes(file.getPath(), time, -1);
}
}
}
}