1919package org .apache .hadoop .hbase .snapshot ;
2020
2121import java .io .BufferedInputStream ;
22+ import java .io .ByteArrayInputStream ;
2223import java .io .DataInput ;
2324import java .io .DataOutput ;
2425import java .io .FileNotFoundException ;
2526import java .io .IOException ;
2627import java .io .InputStream ;
28+ import java .net .InetSocketAddress ;
2729import java .util .ArrayList ;
30+ import java .util .Arrays ;
2831import java .util .Collections ;
2932import java .util .Comparator ;
3033import java .util .LinkedList ;
4649import org .apache .hadoop .hbase .HConstants ;
4750import org .apache .hadoop .hbase .TableName ;
4851import org .apache .hadoop .hbase .client .RegionInfo ;
52+ import org .apache .hadoop .hbase .exceptions .DeserializationException ;
53+ import org .apache .hadoop .hbase .fs .HFileSystem ;
4954import org .apache .hadoop .hbase .io .FileLink ;
5055import org .apache .hadoop .hbase .io .HFileLink ;
5156import org .apache .hadoop .hbase .io .WALLink ;
5257import org .apache .hadoop .hbase .io .hadoopbackport .ThrottledInputStream ;
5358import org .apache .hadoop .hbase .mapreduce .TableMapReduceUtil ;
5459import org .apache .hadoop .hbase .mob .MobUtils ;
60+ import org .apache .hadoop .hbase .net .Address ;
61+ import org .apache .hadoop .hbase .rsgroup .RSGroupInfo ;
62+ import org .apache .hadoop .hbase .shaded .protobuf .ProtobufUtil ;
63+ import org .apache .hadoop .hbase .shaded .protobuf .generated .HBaseProtos ;
64+ import org .apache .hadoop .hbase .shaded .protobuf .generated .RSGroupProtos ;
5565import org .apache .hadoop .hbase .util .AbstractHBaseTool ;
5666import org .apache .hadoop .hbase .util .CommonFSUtils ;
5767import org .apache .hadoop .hbase .util .FSUtils ;
5868import org .apache .hadoop .hbase .util .HFileArchiveUtil ;
5969import org .apache .hadoop .hbase .util .Pair ;
70+ import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
71+ import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
72+ import org .apache .hadoop .hbase .zookeeper .ZNodePaths ;
6073import org .apache .hadoop .io .BytesWritable ;
6174import org .apache .hadoop .io .IOUtils ;
6275import org .apache .hadoop .io .NullWritable ;
7386import org .apache .hadoop .util .StringUtils ;
7487import org .apache .hadoop .util .Tool ;
7588import org .apache .yetus .audience .InterfaceAudience ;
89+ import org .apache .zookeeper .KeeperException ;
7690import org .slf4j .Logger ;
7791import org .slf4j .LoggerFactory ;
7892
@@ -100,6 +114,7 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
100114
101115 private static final Logger LOG = LoggerFactory .getLogger (ExportSnapshot .class );
102116
117+ private static final String RS_GROUP_ZNODE = "rsgroup" ;
103118 private static final String MR_NUM_MAPS = "mapreduce.job.maps" ;
104119 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits" ;
105120 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name" ;
@@ -110,6 +125,8 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
110125 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify" ;
111126 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root" ;
112127 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root" ;
128+ private static final String CONF_TARGET_ZK = "snapshot.export.target.zk" ;
129+ private static final String CONF_TARGET_RSGROUP = "snapshot.export.target.rsgroup" ;
113130 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size" ;
114131 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group" ;
115132 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb" ;
@@ -152,6 +169,10 @@ static final class Options {
152169 "Number of mappers to use during the copy (mapreduce.job.maps)." );
153170 static final Option BANDWIDTH = new Option (null , "bandwidth" , true ,
154171 "Limit bandwidth to this value in MB/second." );
172+ static final Option TARGET_ZK = new Option (null , "targetZK" , true ,
173+ "Target hbase zookeeper string of format - zk1,zk2,zk3:port:/znode." );
174+ static final Option TARGET_RSGROUP = new Option (null , "targetRSGroup" , true ,
175+ "Rsgroup of the target cluster to supply as favored nodes" );
155176 }
156177
157178 // Export Map-Reduce Counters, to keep track of the progress
@@ -172,14 +193,18 @@ private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
172193 private short filesMode ;
173194 private int bufferSize ;
174195
175- private FileSystem outputFs ;
196+ private HFileSystem outputFs ;
176197 private Path outputArchive ;
177198 private Path outputRoot ;
178199
179200 private FileSystem inputFs ;
180201 private Path inputArchive ;
181202 private Path inputRoot ;
182203
204+ private String targetZK ;
205+ private String targetRSGroup ;
206+ private List <Address > rsgroupServers = new ArrayList <>();
207+
183208 private static Testing testing = new Testing ();
184209
185210 @ Override
@@ -196,6 +221,8 @@ public void setup(Context context) throws IOException {
196221 filesMode = (short )conf .getInt (CONF_FILES_MODE , 0 );
197222 outputRoot = new Path (conf .get (CONF_OUTPUT_ROOT ));
198223 inputRoot = new Path (conf .get (CONF_INPUT_ROOT ));
224+ targetZK = conf .get (CONF_TARGET_ZK );
225+ targetRSGroup = conf .get (CONF_TARGET_RSGROUP );
199226
200227 inputArchive = new Path (inputRoot , HConstants .HFILE_ARCHIVE_DIRECTORY );
201228 outputArchive = new Path (outputRoot , HConstants .HFILE_ARCHIVE_DIRECTORY );
@@ -209,7 +236,7 @@ public void setup(Context context) throws IOException {
209236
210237 try {
211238 destConf .setBoolean ("fs." + outputRoot .toUri ().getScheme () + ".impl.disable.cache" , true );
212- outputFs = FileSystem .get (outputRoot .toUri (), destConf );
239+ outputFs = new HFileSystem ( FileSystem .get (outputRoot .toUri (), destConf ) );
213240 } catch (IOException e ) {
214241 throw new IOException ("Could not get the output FileSystem with root=" + outputRoot , e );
215242 }
@@ -228,6 +255,41 @@ public void setup(Context context) throws IOException {
228255 // task.
229256 testing .injectedFailureCount = context .getTaskAttemptID ().getId ();
230257 }
258+
259+
260+ if (targetZK != null && targetRSGroup != null ) {
261+ Configuration targetConf = HBaseConfiguration .createClusterConf (new Configuration (), targetZK );
262+ ZKWatcher watcher = new ZKWatcher (targetConf , NAME , null );
263+ String rsGroupBasedPath = ZNodePaths .joinZNode (watcher .getZNodePaths ().baseZNode , RS_GROUP_ZNODE );
264+ try {
265+ byte [] data = ZKUtil .getData (watcher , ZNodePaths .joinZNode (rsGroupBasedPath , targetRSGroup ));
266+
267+ if (data .length > 0 ) {
268+ ProtobufUtil .expectPBMagicPrefix (data );
269+ ByteArrayInputStream bis = new ByteArrayInputStream (data , ProtobufUtil .lengthOfPBMagic (), data .length );
270+ RSGroupProtos .RSGroupInfo proto = RSGroupProtos .RSGroupInfo .parseFrom (bis );
271+ RSGroupInfo RSGroupInfo = new RSGroupInfo (proto .getName ());
272+ for (HBaseProtos .ServerName el : proto .getServersList ()) {
273+ RSGroupInfo .addServer (Address .fromParts (el .getHostName (), el .getPort ()));
274+ }
275+ rsgroupServers = new ArrayList <>(RSGroupInfo .getServers ());
276+ }
277+ } catch (KeeperException e ) {
278+ throw new IOException (
279+ "Failed to setup zookeeper watcher for target zk cluster " + " and target rsgroup + " + targetRSGroup
280+ + targetZK + ", with exception " + e .getMessage (), e );
281+ } catch (InterruptedException e ) {
282+ throw new IOException (
283+ "Failed to setup zookeeper watcher for target zk cluster " + " and target rsgroup + " + targetRSGroup
284+ + targetZK + ", with exception " + e .getMessage (), e );
285+ } catch (DeserializationException e ) {
286+ throw new IOException (
287+ "Failed to deseralize rsgroup information from zookeeper for target zk cluster " + targetZK
288+ + " and target rsgroup + " + targetRSGroup + ", with exception " + e .getMessage (), e );
289+ }
290+ } else {
291+ LOG .warn ("No targetRSGroup or targetZK passed, exporting without any rsgroup awareness to destination" );
292+ }
231293 }
232294
233295 @ Override
@@ -309,10 +371,21 @@ private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
309371
310372 try {
311373 context .getCounter (Counter .BYTES_EXPECTED ).increment (inputStat .getLen ());
312-
313374 // Ensure that the output folder is there and copy the file
314375 createOutputPath (outputPath .getParent ());
315- FSDataOutputStream out = outputFs .create (outputPath , true );
376+
377+ Collections .shuffle (rsgroupServers );
378+ int size = rsgroupServers .size (), index = 0 ;
379+ InetSocketAddress [] favoredNodes = new InetSocketAddress [size > 2 ? 3 : size ];
380+ for (Address address : this .rsgroupServers ) {
381+ if (index == 3 ) {
382+ break ;
383+ }
384+ favoredNodes [index ++] = new InetSocketAddress (address .getHostname (), address .getPort ());
385+ }
386+
387+ LOG .info ("FavoredNodes selected are " + Arrays .asList (favoredNodes ));
388+ FSDataOutputStream out = FSUtils .create (outputFs .getConf (), outputFs , outputPath , null , favoredNodes );
316389 try {
317390 copyData (context , inputStat .getPath (), in , outputPath , out , inputStat .getLen ());
318391 } finally {
@@ -801,7 +874,7 @@ public boolean nextKeyValue() {
801874 private void runCopyJob (final Path inputRoot , final Path outputRoot ,
802875 final String snapshotName , final Path snapshotDir , final boolean verifyChecksum ,
803876 final String filesUser , final String filesGroup , final int filesMode ,
804- final int mappers , final int bandwidthMB )
877+ final int mappers , final int bandwidthMB , final String targetZK , final String targetRSGroup )
805878 throws IOException , InterruptedException , ClassNotFoundException {
806879 Configuration conf = getConf ();
807880 if (filesGroup != null ) conf .set (CONF_FILES_GROUP , filesGroup );
@@ -817,6 +890,9 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot,
817890 conf .setInt (CONF_BANDWIDTH_MB , bandwidthMB );
818891 conf .set (CONF_SNAPSHOT_NAME , snapshotName );
819892 conf .set (CONF_SNAPSHOT_DIR , snapshotDir .toString ());
893+ conf .set (CONF_TARGET_ZK , targetZK );
894+ conf .set (CONF_TARGET_RSGROUP , targetRSGroup );
895+
820896
821897 String jobname = conf .get (CONF_MR_JOB_NAME , "ExportSnapshot-" + snapshotName );
822898 Job job = new Job (conf );
@@ -913,6 +989,8 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
913989 private int bandwidthMB = Integer .MAX_VALUE ;
914990 private int filesMode = 0 ;
915991 private int mappers = 0 ;
992+ private String targetZK = null ;
993+ private String targetRSGroup = null ;
916994
917995 @ Override
918996 protected void processOptions (CommandLine cmd ) {
@@ -924,6 +1002,12 @@ protected void processOptions(CommandLine cmd) {
9241002 if (cmd .hasOption (Options .COPY_FROM .getLongOpt ())) {
9251003 inputRoot = new Path (cmd .getOptionValue (Options .COPY_FROM .getLongOpt ()));
9261004 }
1005+ if (cmd .hasOption (Options .TARGET_ZK .getLongOpt ())) {
1006+ targetZK = cmd .getOptionValue (Options .TARGET_ZK .getLongOpt ());
1007+ }
1008+ if (cmd .hasOption (Options .TARGET_RSGROUP .getLongOpt ())) {
1009+ targetRSGroup = cmd .getOptionValue (Options .TARGET_RSGROUP .getLongOpt ());
1010+ }
9271011 mappers = getOptionAsInt (cmd , Options .MAPPERS .getLongOpt (), mappers );
9281012 filesUser = cmd .getOptionValue (Options .CHUSER .getLongOpt (), filesUser );
9291013 filesGroup = cmd .getOptionValue (Options .CHGROUP .getLongOpt (), filesGroup );
@@ -1080,7 +1164,7 @@ public int doWork() throws IOException {
10801164 // by the HFileArchiver, since they have no references.
10811165 try {
10821166 runCopyJob (inputRoot , outputRoot , snapshotName , snapshotDir , verifyChecksum ,
1083- filesUser , filesGroup , filesMode , mappers , bandwidthMB );
1167+ filesUser , filesGroup , filesMode , mappers , bandwidthMB , targetZK , targetRSGroup );
10841168
10851169 LOG .info ("Finalize the Snapshot Export" );
10861170 if (!skipTmp ) {
@@ -1139,6 +1223,8 @@ protected void printUsage() {
11391223 addOption (Options .CHMOD );
11401224 addOption (Options .MAPPERS );
11411225 addOption (Options .BANDWIDTH );
1226+ addOption (Options .TARGET_ZK );
1227+ addOption (Options .TARGET_RSGROUP );
11421228 }
11431229
11441230 public static void main (String [] args ) {
0 commit comments