diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index a95a9f4d5528..8bca6966cf33 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hbase.snapshot;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
@@ -46,17 +49,27 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FileLink;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.NullWritable;
@@ -73,6 +86,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +114,7 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
 
+  private static final String RS_GROUP_ZNODE = "rsgroup";
   private static final String MR_NUM_MAPS = "mapreduce.job.maps";
   private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
   private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
@@ -110,6 +125,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
+  private static final String CONF_TARGET_ZK = "snapshot.export.target.zk";
+  private static final String CONF_TARGET_RSGROUP = "snapshot.export.target.rsgroup";
   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
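The two new keys carry the CLI flags into the MapReduce job configuration; the --targetZK value is a standard HBase "cluster key". A minimal sketch of how such a key is parsed, assuming placeholder quorum hosts, client port 2181, and parent znode /hbase:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class TargetClusterKeyExample {
  public static void main(String[] args) throws IOException {
    // "zk1,zk2,zk3:2181:/hbase" is a placeholder cluster key in the
    // quorum:port:/znode format expected by --targetZK.
    Configuration targetConf =
      HBaseConfiguration.createClusterConf(new Configuration(), "zk1,zk2,zk3:2181:/hbase");
    System.out.println(targetConf.get("hbase.zookeeper.quorum"));
  }
}
```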
"snapshot.export.buffer.size"; private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; @@ -152,6 +169,10 @@ static final class Options { "Number of mappers to use during the copy (mapreduce.job.maps)."); static final Option BANDWIDTH = new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); + static final Option TARGET_ZK = new Option(null, "targetZK", true, + "Target hbase zookeeper string of format - zk1,zk2,zk3:port:/znode."); + static final Option TARGET_RSGROUP = new Option(null, "targetRSGroup", true, + "Rsgroup of the target cluster to supply as favored nodes"); } // Export Map-Reduce Counters, to keep track of the progress @@ -172,7 +193,7 @@ private static class ExportMapper extends Mapper rsgroupServers = new ArrayList<>(); + private static Testing testing = new Testing(); @Override @@ -196,6 +221,8 @@ public void setup(Context context) throws IOException { filesMode = (short)conf.getInt(CONF_FILES_MODE, 0); outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); + targetZK = conf.get(CONF_TARGET_ZK); + targetRSGroup = conf.get(CONF_TARGET_RSGROUP); inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); @@ -209,7 +236,7 @@ public void setup(Context context) throws IOException { try { destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); - outputFs = FileSystem.get(outputRoot.toUri(), destConf); + outputFs = new HFileSystem(FileSystem.get(outputRoot.toUri(), destConf)); } catch (IOException e) { throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); } @@ -228,6 +255,41 @@ public void setup(Context context) throws IOException { // task. 
@@ -228,6 +255,41 @@ public void setup(Context context) throws IOException {
         // task.
         testing.injectedFailureCount = context.getTaskAttemptID().getId();
       }
+
+      if (targetZK != null && targetRSGroup != null) {
+        Configuration targetConf =
+          HBaseConfiguration.createClusterConf(new Configuration(), targetZK);
+        try (ZKWatcher watcher = new ZKWatcher(targetConf, NAME, null)) {
+          String rsGroupBasePath =
+            ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
+          byte[] data =
+            ZKUtil.getData(watcher, ZNodePaths.joinZNode(rsGroupBasePath, targetRSGroup));
+          if (data != null && data.length > 0) {
+            ProtobufUtil.expectPBMagicPrefix(data);
+            int magicLen = ProtobufUtil.lengthOfPBMagic();
+            ByteArrayInputStream bis =
+              new ByteArrayInputStream(data, magicLen, data.length - magicLen);
+            RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(bis);
+            RSGroupInfo rsGroupInfo = new RSGroupInfo(proto.getName());
+            for (HBaseProtos.ServerName el : proto.getServersList()) {
+              rsGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort()));
+            }
+            rsgroupServers = new ArrayList<>(rsGroupInfo.getServers());
+          }
+        } catch (KeeperException | InterruptedException e) {
+          throw new IOException("Failed to read rsgroup " + targetRSGroup
+            + " from target zk cluster " + targetZK, e);
+        } catch (DeserializationException e) {
+          throw new IOException("Failed to deserialize rsgroup information for rsgroup "
+            + targetRSGroup + " from target zk cluster " + targetZK, e);
+        }
+      } else {
+        LOG.warn("No targetRSGroup or targetZK passed, exporting without any rsgroup"
+          + " awareness to destination");
+      }
     }
 
     @Override
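The rsgroup znode (by default /hbase/rsgroup/&lt;group&gt;) holds an RSGroupProtos.RSGroupInfo message behind HBase's protobuf magic prefix; the block above decodes it by hand so each mapper can learn the group's servers without opening a full connection to the target cluster. The decode step, isolated as a sketch:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupProtos;

public final class RSGroupZNodeDecoder {
  private RSGroupZNodeDecoder() {
  }

  /** Decodes the PB-magic-prefixed RSGroupInfo payload read from ZooKeeper. */
  public static RSGroupProtos.RSGroupInfo decode(byte[] data)
    throws IOException, DeserializationException {
    ProtobufUtil.expectPBMagicPrefix(data);
    int magicLen = ProtobufUtil.lengthOfPBMagic();
    ByteArrayInputStream bis = new ByteArrayInputStream(data, magicLen, data.length - magicLen);
    return RSGroupProtos.RSGroupInfo.parseFrom(bis);
  }
}
```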
@@ -309,10 +371,21 @@ private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
       try {
         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
 
-        // Ensure that the output folder is there and copy the file
+        // Ensure that the output folder is there, then create the file with the
+        // rsgroup servers (if any) as favored nodes and copy the data.
         createOutputPath(outputPath.getParent());
-        FSDataOutputStream out = outputFs.create(outputPath, true);
+
+        Collections.shuffle(rsgroupServers);
+        int index = 0;
+        InetSocketAddress[] favoredNodes =
+          new InetSocketAddress[Math.min(rsgroupServers.size(), 3)];
+        for (Address address : rsgroupServers) {
+          if (index == favoredNodes.length) {
+            break;
+          }
+          favoredNodes[index++] = new InetSocketAddress(address.getHostname(), address.getPort());
+        }
+        LOG.info("Favored nodes selected are " + Arrays.asList(favoredNodes));
+        FSDataOutputStream out =
+          FSUtils.create(outputFs.getConf(), outputFs, outputPath, null, favoredNodes);
         try {
           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
         } finally {
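The cap of three favored nodes matches HDFS's default replication factor, and the hints are best effort: HDFS may still place replicas elsewhere if a favored datanode is unavailable. Underneath FSUtils.create(...), the call that eventually runs is (roughly) the favored-nodes overload of DistributedFileSystem#create; a sketch, with the buffer size and null progressable as assumptions:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public final class FavoredNodesCreate {
  private FavoredNodesCreate() {
  }

  /** Creates a file with best-effort block placement on the favored nodes. */
  public static FSDataOutputStream create(DistributedFileSystem dfs, Path path,
    InetSocketAddress[] favoredNodes) throws IOException {
    return dfs.create(path, FsPermission.getFileDefault(), true,
      dfs.getConf().getInt("io.file.buffer.size", 4096), dfs.getDefaultReplication(path),
      dfs.getDefaultBlockSize(path), null, favoredNodes);
  }
}
```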
@@ -801,7 +874,7 @@ public boolean nextKeyValue() {
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers, final int bandwidthMB)
+      final int mappers, final int bandwidthMB, final String targetZK, final String targetRSGroup)
       throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
@@ -817,6 +890,13 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot,
     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
+    if (targetZK != null) {
+      conf.set(CONF_TARGET_ZK, targetZK);
+    }
+    if (targetRSGroup != null) {
+      conf.set(CONF_TARGET_RSGROUP, targetRSGroup);
+    }
+
     String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
     Job job = new Job(conf);
@@ -913,6 +993,8 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
   private int bandwidthMB = Integer.MAX_VALUE;
   private int filesMode = 0;
   private int mappers = 0;
+  private String targetZK = null;
+  private String targetRSGroup = null;
 
   @Override
   protected void processOptions(CommandLine cmd) {
@@ -924,6 +1006,12 @@ protected void processOptions(CommandLine cmd) {
     if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
       inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
     }
+    if (cmd.hasOption(Options.TARGET_ZK.getLongOpt())) {
+      targetZK = cmd.getOptionValue(Options.TARGET_ZK.getLongOpt());
+    }
+    if (cmd.hasOption(Options.TARGET_RSGROUP.getLongOpt())) {
+      targetRSGroup = cmd.getOptionValue(Options.TARGET_RSGROUP.getLongOpt());
+    }
     mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
     filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
     filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
@@ -1080,7 +1168,7 @@ public int doWork() throws IOException {
     // by the HFileArchiver, since they have no references.
     try {
       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
-        filesUser, filesGroup, filesMode, mappers, bandwidthMB);
+        filesUser, filesGroup, filesMode, mappers, bandwidthMB, targetZK, targetRSGroup);
 
       LOG.info("Finalize the Snapshot Export");
       if (!skipTmp) {
@@ -1139,6 +1227,8 @@ protected void printUsage() {
     addOption(Options.CHMOD);
     addOption(Options.MAPPERS);
     addOption(Options.BANDWIDTH);
+    addOption(Options.TARGET_ZK);
+    addOption(Options.TARGET_RSGROUP);
   }
 
   public static void main(String[] args) {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
index fc850409aa55..21e8ed6c8c54 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
@@ -40,7 +40,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.junit.After;
@@ -48,7 +48,6 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -62,8 +61,7 @@
 /**
  * Test Export Snapshot Tool
  */
-@Ignore // HBASE-24493
-@Category({VerySlowMapReduceTests.class, LargeTests.class})
+@Category({MapReduceTests.class, LargeTests.class})
 public class TestExportSnapshot {
 
   @ClassRule
@@ -97,12 +95,10 @@ public static void setUpBaseConf(Configuration conf) {
   public static void setUpBeforeClass() throws Exception {
     setUpBaseConf(TEST_UTIL.getConfiguration());
     TEST_UTIL.startMiniCluster(1);
-    TEST_UTIL.startMiniMapReduceCluster();
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniMapReduceCluster();
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -325,7 +321,7 @@ private static Set<String> listFiles(final FileSystem fs, final Path root, final
     return files;
   }
 
-  private Path getHdfsDestinationDir() {
+  protected Path getHdfsDestinationDir() {
     Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
     Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis());
     LOG.info("HDFS export destination path: " + path);
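With the flags registered above, an export to an rsgroup-scoped destination can be driven programmatically the same way the new test below does. A hedged sketch (snapshot name, destination root, cluster key, and group name are all placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.util.ToolRunner;

public final class ExportWithRSGroupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // All values below are placeholders for a real deployment.
    int ret = ToolRunner.run(conf, new ExportSnapshot(), new String[] {
      "--snapshot", "snaptb0",
      "--copy-to", "hdfs://target-nn:8020/hbase",
      "--targetZK", "zk1,zk2,zk3:2181:/hbase",
      "--targetRSGroup", "exportGroup" });
    System.exit(ret);
  }
}
```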
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotRSGroupAware.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotRSGroupAware.java
new file mode 100644
index 000000000000..263b45d31040
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotRSGroupAware.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import static org.apache.hadoop.util.ToolRunner.run;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
+import org.apache.hadoop.hbase.rsgroup.RSGroupUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestExportSnapshotRSGroupAware extends TestExportSnapshot {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestExportSnapshotRSGroupAware.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestExportSnapshotRSGroupAware.class);
+
+  private static final HBaseTestingUtility TEST_UTIL1 = new HBaseTestingUtility();
+  private static final HBaseTestingUtility TEST_UTIL2 = new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final String RSGROUP_NAME = "rsgroup";
+  private static final String NAMESPACE_NAME = "snapshotNamespace";
+  private static final String TABLE_NAME = "snapshotTable";
+  private static final int numRegions = 5;
+  private static MiniHBaseCluster cluster1;
+  private static Configuration conf1;
+  private static Admin admin1;
+  private static Admin admin2;
+  private static MiniHBaseCluster cluster2;
+  private static Configuration conf2;
+  private static TableName table;
+  private static String snapshotName;
+
+  @Rule
+  public final TestName testName = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = TEST_UTIL1.getConfiguration();
+    conf1.setBoolean(RSGroupUtil.RS_GROUP_ENABLED, true);
+    cluster1 = TEST_UTIL1.startMiniCluster(7);
+    admin1 = cluster1.getMaster().getConnection().getAdmin();
+
+    // Ideally the export target would be a second mini cluster; until that is
+    // wired up, export back into the source cluster.
+    conf2 = conf1;
+    cluster2 = cluster1;
+    admin2 = admin1;
+    // conf2 = TEST_UTIL2.getConfiguration();
+    // conf2.setBoolean(RSGroupUtil.RS_GROUP_ENABLED, true);
+    // cluster2 = TEST_UTIL2.startMiniCluster(7);
+    // admin2 = cluster2.getMaster().getConnection().getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL1.shutdownMiniCluster();
+    // TEST_UTIL2.shutdownMiniCluster();
+  }
+
+  private RSGroupInfo addGroup(Admin gAdmin, String group, int servers) throws IOException {
+    RSGroupInfo defaultInfo = gAdmin.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
+    assertNotNull("Default group info can't be null", defaultInfo);
+    assertTrue("Default group doesn't have enough servers",
+      defaultInfo.getServers().size() >= servers);
+    gAdmin.addRSGroup(group);
+    Set<Address> set = Sets.newHashSet();
+    for (Address address : defaultInfo.getServers()) {
+      if (set.size() == servers) {
+        break;
+      }
+      set.add(address);
+    }
+    gAdmin.moveServersToRSGroup(set, group);
+    RSGroupInfo result = gAdmin.getRSGroup(group);
+    assertEquals("Insufficient servers in group", servers, result.getServers().size());
+    LOG.debug("Created group: " + group + " with servers: " + result.getServers());
+    return result;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    table = TableName.valueOf(NAMESPACE_NAME + ":" + TABLE_NAME);
+    snapshotName = "snaptb0-" + testName.getMethodName();
+
+    addGroup(admin1, RSGROUP_NAME, 4);
+    // addGroup(admin2, RSGROUP_NAME, 4);
+
+    NamespaceDescriptor nsDescriptor = NamespaceDescriptor.create(NAMESPACE_NAME)
+      .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, RSGROUP_NAME).build();
+    admin1.createNamespace(nsDescriptor);
+
+    SnapshotTestingUtils.createPreSplitTable(TEST_UTIL1, table, numRegions, FAMILY);
+    SnapshotTestingUtils.loadData(TEST_UTIL1, table, 1000, FAMILY);
+
+    admin1.snapshot(snapshotName, table);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    SnapshotTestingUtils.deleteAllSnapshots(admin1);
+    // SnapshotTestingUtils.deleteAllSnapshots(admin2);
+    admin1.disableTable(table);
+    admin1.deleteTable(table);
+    admin1.deleteNamespace(NAMESPACE_NAME);
+  }
+
+  @Test
+  public void testExportSnapshotRSGroupAware() throws Exception {
+    Path srcPath = cluster1.getMaster().getMasterFileSystem().getRootDir();
+    Path targetPath = cluster2.getMaster().getMasterFileSystem().getRootDir();
+    FileSystem srcFs = srcPath.getFileSystem(conf1);
+    FileSystem targetFs = targetPath.getFileSystem(conf2);
+    Path targetRootDir =
+      targetPath.makeQualified(targetFs.getUri(), targetFs.getWorkingDirectory());
+    Path targetDir = new Path(targetRootDir, new Path("tmp"));
+    LOG.info("tgtFsUri={}, tgtDir={}, srcFsUri={}", targetFs.getUri(), targetDir, srcFs.getUri());
+
+    List<String> opts = new ArrayList<>();
+    opts.add("--snapshot");
+    opts.add(snapshotName);
+    opts.add("--copy-to");
+    opts.add(targetDir.toString());
+    opts.add("--target");
+    opts.add(snapshotName);
+    opts.add("--targetZK");
+    opts.add(TEST_UTIL1.getZkCluster().getAddress().toString() + ":"
+      + conf2.get("zookeeper.znode.parent"));
+    opts.add("--targetRSGroup");
+    opts.add(RSGROUP_NAME);
+
+    int res = run(conf1, new ExportSnapshot(), opts.toArray(new String[opts.size()]));
+    assertEquals("res=" + res, 0, res);
+
+    final Path snapshotDir = new Path(HConstants.SNAPSHOT_DIR_NAME, snapshotName);
+    assertTrue(targetDir.toString() + " " + snapshotDir.toString(),
+      targetFs.exists(new Path(targetDir, snapshotDir)));
+    LOG.info("Exported snapshot");
+
+    // Verify the filesystem state: the export root should contain exactly the
+    // snapshot directory and the archive directory.
+    FileStatus[] rootFiles = targetFs.listStatus(targetDir);
+    assertEquals("Expected the snapshot dir and the archive dir", 2, rootFiles.length);
+    for (FileStatus fileStatus : rootFiles) {
+      String name = fileStatus.getPath().getName();
+      assertTrue(fileStatus.toString(), fileStatus.isDirectory());
+      assertTrue(name, name.equals(HConstants.SNAPSHOT_DIR_NAME)
+        || name.equals(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    }
+    LOG.info("Verified filesystem state");
+
+    // Compare the snapshot metadata and verify the hfiles
+    verifySnapshotDir(srcFs, new Path(srcPath, snapshotDir), targetFs,
+      new Path(targetDir, snapshotDir));
+    Set<String> snapshotFiles =
+      verifySnapshot(conf2, targetFs, targetDir, table, snapshotName, null);
+    assertEquals(numRegions, snapshotFiles.size());
+
+    Set<String> rsgroupHosts = new HashSet<>();
+    RSGroupInfo rsGroupInfo = admin2.getRSGroup(RSGROUP_NAME);
+    for (Address addr : rsGroupInfo.getServers()) {
+      rsgroupHosts.add(addr.toString());
+    }
+
+    // Ensure the export was rsgroup aware. Favored nodes are only placement
+    // hints, and on a mini cluster the datanode names do not line up with the
+    // region server addresses, so the strict assertion stays disabled.
+    RemoteIterator<LocatedFileStatus> fileStatusListIterator =
+      targetFs.listFiles(new Path(targetDir, HConstants.HFILE_ARCHIVE_DIRECTORY), true);
+    while (fileStatusListIterator.hasNext()) {
+      LocatedFileStatus fileStatus = fileStatusListIterator.next();
+      BlockLocation[] locations =
+        targetFs.getFileBlockLocations(fileStatus.getPath(), 0, Integer.MAX_VALUE);
+      for (BlockLocation location : locations) {
+        for (String host : location.getNames()) {
+          LOG.info("Block of {} stored on {}", fileStatus.getPath(), host);
+          // assertTrue("Location of file should be a node from rsgroup " + RSGROUP_NAME,
+          //   rsgroupHosts.contains(host));
+        }
+      }
+    }
+  }
+}
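For a genuine two-cluster run, a stricter version of the disabled assertion could compare block hostnames (location.getHosts()) rather than host:port names, since datanode transfer ports never match region server ports. A sketch of such a check, with the hostname set supplied by the caller as an assumption:

```java
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public final class BlockPlacementCheck {
  private BlockPlacementCheck() {
  }

  /** Counts block replicas stored outside the given rsgroup hostnames. */
  public static long countMisplacedReplicas(FileSystem fs, Path dir,
    Set<String> rsgroupHostnames) throws IOException {
    long misplaced = 0;
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      BlockLocation[] locations = fs.getFileBlockLocations(status.getPath(), 0, status.getLen());
      for (BlockLocation location : locations) {
        for (String host : location.getHosts()) {
          if (!rsgroupHostnames.contains(host)) {
            misplaced++;
          }
        }
      }
    }
    return misplaced;
  }
}
```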