1919package org .apache .hadoop .hbase .snapshot ;
2020
2121import java .io .BufferedInputStream ;
22+ import java .io .ByteArrayInputStream ;
2223import java .io .DataInput ;
2324import java .io .DataOutput ;
2425import java .io .FileNotFoundException ;
2526import java .io .IOException ;
2627import java .io .InputStream ;
28+ import java .net .InetSocketAddress ;
2729import java .util .ArrayList ;
30+ import java .util .Arrays ;
2831import java .util .Collections ;
2932import java .util .Comparator ;
3033import java .util .LinkedList ;
@@ -110,6 +113,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
110113 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify" ;
111114 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root" ;
112115 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root" ;
116+ private static final String CONF_TARGET_ZK = "snapshot.export.target.zk" ;
117+ private static final String CONF_TARGET_RSGROUP = "snapshot.export.target.rsgroup" ;
113118 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size" ;
114119 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group" ;
115120 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb" ;
@@ -152,6 +157,10 @@ static final class Options {
152157 "Number of mappers to use during the copy (mapreduce.job.maps)." );
153158 static final Option BANDWIDTH = new Option (null , "bandwidth" , true ,
154159 "Limit bandwidth to this value in MB/second." );
160+ static final Option TARGET_ZK = new Option (null , "targetZK" , true ,
161+ "Target hbase zookeeper string of format - zk1,zk2,zk3:port:/znode." );
162+ static final Option TARGET_RSGROUP = new Option (null , "targetRSGroup" , true ,
163+ "Rsgroup of the target cluster to supply as favored nodes" );
155164 }
156165
157166 // Export Map-Reduce Counters, to keep track of the progress
@@ -172,14 +181,18 @@ private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
172181 private short filesMode ;
173182 private int bufferSize ;
174183
175- private FileSystem outputFs ;
184+ private HFileSystem outputFs ;
176185 private Path outputArchive ;
177186 private Path outputRoot ;
178187
179188 private FileSystem inputFs ;
180189 private Path inputArchive ;
181190 private Path inputRoot ;
182191
192+ private String targetZK ;
193+ private String targetRSGroup ;
194+ private List <Address > rsgroupServers = new ArrayList <>();
195+
183196 private static Testing testing = new Testing ();
184197
185198 @ Override
@@ -196,6 +209,8 @@ public void setup(Context context) throws IOException {
196209 filesMode = (short )conf .getInt (CONF_FILES_MODE , 0 );
197210 outputRoot = new Path (conf .get (CONF_OUTPUT_ROOT ));
198211 inputRoot = new Path (conf .get (CONF_INPUT_ROOT ));
212+ targetZK = conf .get (CONF_TARGET_ZK );
213+ targetRSGroup = conf .get (CONF_TARGET_RSGROUP );
199214
200215 inputArchive = new Path (inputRoot , HConstants .HFILE_ARCHIVE_DIRECTORY );
201216 outputArchive = new Path (outputRoot , HConstants .HFILE_ARCHIVE_DIRECTORY );
@@ -209,7 +224,7 @@ public void setup(Context context) throws IOException {
209224
210225 try {
211226 destConf .setBoolean ("fs." + outputRoot .toUri ().getScheme () + ".impl.disable.cache" , true );
212- outputFs = FileSystem .get (outputRoot .toUri (), destConf );
227+ outputFs = new HFileSystem ( FileSystem .get (outputRoot .toUri (), destConf ) );
213228 } catch (IOException e ) {
214229 throw new IOException ("Could not get the output FileSystem with root=" + outputRoot , e );
215230 }
@@ -228,6 +243,41 @@ public void setup(Context context) throws IOException {
228243 // task.
229244 testing .injectedFailureCount = context .getTaskAttemptID ().getId ();
230245 }
246+
247+
248+ if (targetZK != null && targetRSGroup != null ) {
249+ Configuration targetConf = HBaseConfiguration .createClusterConf (new Configuration (), targetZK );
250+ ZKWatcher watcher = new ZKWatcher (targetConf , NAME , null );
251+ String rsGroupBasedPath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , RSGroupZNodePaths .rsGroupZNode );
252+ try {
253+ byte [] data = ZKUtil .getData (watcher , ZNodePaths .joinZNode (rsGroupBasedPath , targetRSGroup ));
254+
255+ if (data .length > 0 ) {
256+ ProtobufUtil .expectPBMagicPrefix (data );
257+ ByteArrayInputStream bis = new ByteArrayInputStream (data , ProtobufUtil .lengthOfPBMagic (), data .length );
258+ RSGroupProtos .RSGroupInfo proto = RSGroupProtos .RSGroupInfo .parseFrom (bis );
259+ RSGroupInfo RSGroupInfo = new RSGroupInfo (proto .getName ());
260+ for (HBaseProtos .ServerName el : proto .getServersList ()) {
261+ RSGroupInfo .addServer (Address .fromParts (el .getHostName (), el .getPort ()));
262+ }
263+ rsgroupServers = new ArrayList <>(RSGroupInfo .getServers ());
264+ }
265+ } catch (KeeperException e ) {
266+ throw new IOException (
267+ "Failed to setup zookeeper watcher for target zk cluster " + " and target rsgroup + " + targetRSGroup
268+ + targetZK + ", with exception " + e .getMessage (), e );
269+ } catch (InterruptedException e ) {
270+ throw new IOException (
271+ "Failed to setup zookeeper watcher for target zk cluster " + " and target rsgroup + " + targetRSGroup
272+ + targetZK + ", with exception " + e .getMessage (), e );
273+ } catch (DeserializationException e ) {
274+ throw new IOException (
275+ "Failed to deseralize rsgroup information from zookeeper for target zk cluster " + targetZK
276+ + " and target rsgroup + " + targetRSGroup + ", with exception " + e .getMessage (), e );
277+ }
278+ } else {
279+ LOG .warn ("No targetRSGroup or targetZK passed, exporting without any rsgroup awareness to destination" );
280+ }
231281 }
232282
233283 @ Override
@@ -309,10 +359,21 @@ private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
309359
310360 try {
311361 context .getCounter (Counter .BYTES_EXPECTED ).increment (inputStat .getLen ());
312-
313362 // Ensure that the output folder is there and copy the file
314363 createOutputPath (outputPath .getParent ());
315- FSDataOutputStream out = outputFs .create (outputPath , true );
364+
365+ Collections .shuffle (rsgroupServers );
366+ int size = rsgroupServers .size (), index = 0 ;
367+ InetSocketAddress [] favoredNodes = new InetSocketAddress [size > 2 ? 3 : size ];
368+ for (Address address : this .rsgroupServers ) {
369+ if (index == 3 ) {
370+ break ;
371+ }
372+ favoredNodes [index ++] = new InetSocketAddress (address .getHostname (), address .getPort ());
373+ }
374+
375+ LOG .info ("FavoredNodes selected are " + Arrays .asList (favoredNodes ));
376+ FSDataOutputStream out = FSUtils .create (outputFs .getConf (), outputFs , outputPath , null , favoredNodes );
316377 try {
317378 copyData (context , inputStat .getPath (), in , outputPath , out , inputStat .getLen ());
318379 } finally {
@@ -801,7 +862,7 @@ public boolean nextKeyValue() {
801862 private void runCopyJob (final Path inputRoot , final Path outputRoot ,
802863 final String snapshotName , final Path snapshotDir , final boolean verifyChecksum ,
803864 final String filesUser , final String filesGroup , final int filesMode ,
804- final int mappers , final int bandwidthMB )
865+ final int mappers , final int bandwidthMB , final String targetZK , final String targetRSGroup )
805866 throws IOException , InterruptedException , ClassNotFoundException {
806867 Configuration conf = getConf ();
807868 if (filesGroup != null ) conf .set (CONF_FILES_GROUP , filesGroup );
@@ -817,6 +878,9 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot,
817878 conf .setInt (CONF_BANDWIDTH_MB , bandwidthMB );
818879 conf .set (CONF_SNAPSHOT_NAME , snapshotName );
819880 conf .set (CONF_SNAPSHOT_DIR , snapshotDir .toString ());
881+ conf .set (CONF_TARGET_ZK , targetZK );
882+ conf .set (CONF_TARGET_RSGROUP , targetRSGroup );
883+
820884
821885 String jobname = conf .get (CONF_MR_JOB_NAME , "ExportSnapshot-" + snapshotName );
822886 Job job = new Job (conf );
@@ -913,6 +977,8 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
913977 private int bandwidthMB = Integer .MAX_VALUE ;
914978 private int filesMode = 0 ;
915979 private int mappers = 0 ;
980+ private String targetZK = null ;
981+ private String targetRSGroup = null ;
916982
917983 @ Override
918984 protected void processOptions (CommandLine cmd ) {
@@ -924,6 +990,12 @@ protected void processOptions(CommandLine cmd) {
924990 if (cmd .hasOption (Options .COPY_FROM .getLongOpt ())) {
925991 inputRoot = new Path (cmd .getOptionValue (Options .COPY_FROM .getLongOpt ()));
926992 }
993+ if (cmd .hasOption (Options .TARGET_ZK .getLongOpt ())) {
994+ targetZK = cmd .getOptionValue (Options .TARGET_ZK .getLongOpt ());
995+ }
996+ if (cmd .hasOption (Options .TARGET_RSGROUP .getLongOpt ())) {
997+ targetRSGroup = cmd .getOptionValue (Options .TARGET_RSGROUP .getLongOpt ());
998+ }
927999 mappers = getOptionAsInt (cmd , Options .MAPPERS .getLongOpt (), mappers );
9281000 filesUser = cmd .getOptionValue (Options .CHUSER .getLongOpt (), filesUser );
9291001 filesGroup = cmd .getOptionValue (Options .CHGROUP .getLongOpt (), filesGroup );
@@ -1080,7 +1152,7 @@ public int doWork() throws IOException {
10801152 // by the HFileArchiver, since they have no references.
10811153 try {
10821154 runCopyJob (inputRoot , outputRoot , snapshotName , snapshotDir , verifyChecksum ,
1083- filesUser , filesGroup , filesMode , mappers , bandwidthMB );
1155+ filesUser , filesGroup , filesMode , mappers , bandwidthMB , targetZK , targetRSGroup );
10841156
10851157 LOG .info ("Finalize the Snapshot Export" );
10861158 if (!skipTmp ) {
@@ -1139,6 +1211,8 @@ protected void printUsage() {
11391211 addOption (Options .CHMOD );
11401212 addOption (Options .MAPPERS );
11411213 addOption (Options .BANDWIDTH );
1214+ addOption (Options .TARGET_ZK );
1215+ addOption (Options .TARGET_RSGROUP );
11421216 }
11431217
11441218 public static void main (String [] args ) {
0 commit comments