Skip to content

Commit 98a80ca

Browse files
HADOOP-19307 Add option to add parent directory of source directories to target directories
1 parent a3b9c37 commit 98a80ca

File tree

9 files changed

+122
-6
lines changed

9 files changed

+122
-6
lines changed

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.hadoop.tools;
2020

2121
import java.io.IOException;
22+
import java.util.List;
2223
import java.util.Random;
2324

2425
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
26+
import org.apache.hadoop.util.StringUtils;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
2729
import org.apache.hadoop.classification.InterfaceAudience;
@@ -203,6 +205,7 @@ public Job createAndSubmitJob() throws Exception {
203205
job = createJob();
204206
}
205207
prepareFileListing(job);
208+
addSourcePathsToConf(job);
206209
job.submit();
207210
submitted = true;
208211
} finally {
@@ -221,6 +224,8 @@ public Job createAndSubmitJob() throws Exception {
221224
return job;
222225
}
223226

227+
228+
224229
/**
225230
* Wait for the given job to complete.
226231
* @param job the given mapreduce job that has already been submitted
@@ -233,6 +238,27 @@ public void waitForJobCompletion(Job job) throws Exception {
233238
}
234239
}
235240

241+
/**
* Publishes the source paths of this copy into the job configuration.
*
* Every path returned by the context is escaped with
* StringUtils.escapeString, the escaped values are joined with commas and
* the result is stored under DistCpConstants.CONF_LABEL_SOURCE_PATHS.
* When the context has no source paths (null or empty list) the method
* returns early and the key is left unset.
*
* NOTE(review): consumers of this key must tolerate a missing value, and
* presumably need to undo the escaping applied here; confirm against the
* code that reads CONF_LABEL_SOURCE_PATHS.
*
* @param job the job whose configuration receives the source paths
*/
private void addSourcePathsToConf(Job job){
242+
List<Path> sourcePaths = context.getSourcePaths();
243+
244+
if(sourcePaths == null || sourcePaths.size() == 0){
245+
return;
246+
}
247+
248+
StringBuilder sb = new StringBuilder();
249+
for(Path source : sourcePaths){
250+
sb.append(StringUtils.escapeString(source.toString()));
251+
sb.append(",");
252+
}
253+
if(sb.length() > 0){
254+
sb.deleteCharAt(sb.length() -1);
255+
}
256+
257+
258+
job.getConfiguration().set(DistCpConstants.CONF_LABEL_SOURCE_PATHS,
259+
sb.toString());
260+
}
261+
236262
/**
237263
* Set targetPathExists in both inputOptions and job config,
238264
* for the benefit of CopyCommitter

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ private DistCpConstants() {
6464
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
6565
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
6666
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
67+
68+
public static final String CONF_LABEL_SOURCE_PATHS = "distcp.source.paths";
6769
public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
70+
public static final String CONF_LABEL_PRESERVE_PARENT_DIRS = "distcp.preserve.parent.dirs";
6871
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
6972
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
7073
public static final String CONF_LABEL_APPEND = "distcp.copy.append";

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ public void setPreserveRawXattrs(boolean preserveRawXattrs) {
155155
this.preserveRawXattrs = preserveRawXattrs;
156156
}
157157

158+
/**
* Reports whether source-directory preservation is enabled for this copy.
* The answer is taken directly from the options held by this context.
*
* @return the value of options.shouldPreserveParentDirs()
*/
public boolean shouldPreserveParentDirs(){
159+
return options.shouldPreserveParentDirs();
160+
}
161+
158162
public Path getAtomicWorkPath() {
159163
return options.getAtomicWorkPath();
160164
}

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public enum DistCpOptionSwitch {
5656
"preservation is independent of the -p flag. " +
5757
"Refer to the DistCp documentation for more details.")),
5858

59+
PRESERVE_PARENT_DIRS(DistCpConstants.CONF_LABEL_PRESERVE_PARENT_DIRS,
60+
new Option("preserveParentDirs", false, "Preserve source directories," +
61+
" not the contents of the source-directories")),
62+
5963
/**
6064
* Update target location by copying only files that are missing
6165
* in the target. This can be used to periodically sync two folders

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ public final class DistCpOptions {
149149
/** File attributes that need to be preserved. */
150150
private final EnumSet<FileAttribute> preserveStatus;
151151

152+
/** Whether the source directories themselves are preserved under the target. */
153+
private final boolean preserveParentDirs;
154+
152155
// Size of chunk in number of blocks when splitting large file into chunks
153156
// to copy in parallel. Default is 0 and file are not splitted.
154157
private final int blocksPerChunk;
@@ -216,6 +219,8 @@ private DistCpOptions(Builder builder) {
216219

217220
this.preserveStatus = builder.preserveStatus;
218221

222+
this.preserveParentDirs = builder.preserveParentDirs;
223+
219224
this.blocksPerChunk = builder.blocksPerChunk;
220225

221226
this.copyBufferSize = builder.copyBufferSize;
@@ -336,6 +341,15 @@ public boolean shouldPreserve(FileAttribute attribute) {
336341
return preserveStatus.contains(attribute);
337342
}
338343

344+
/**
345+
* Checks whether the source directories themselves should be preserved under the target.
346+
*
347+
* @return true if the source directories should be preserved, false otherwise
348+
*/
349+
public boolean shouldPreserveParentDirs(){
350+
return preserveParentDirs;
351+
}
352+
339353
public int getBlocksPerChunk() {
340354
return blocksPerChunk;
341355
}
@@ -394,6 +408,8 @@ public void appendToConf(Configuration conf) {
394408
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
395409
filtersFile);
396410
}
411+
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_PARENT_DIRS,
412+
String.valueOf(preserveParentDirs));
397413
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
398414
String.valueOf(blocksPerChunk));
399415
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.COPY_BUFFER_SIZE,
@@ -451,6 +467,7 @@ public String toString() {
451467
", verboseLog=" + verboseLog +
452468
", directWrite=" + directWrite +
453469
", useiterator=" + useIterator +
470+
", preserveParentDirs=" + preserveParentDirs +
454471
'}';
455472
}
456473

@@ -495,6 +512,8 @@ public static class Builder {
495512
private EnumSet<FileAttribute> preserveStatus =
496513
EnumSet.noneOf(FileAttribute.class);
497514

515+
private boolean preserveParentDirs = false;
516+
498517
private int blocksPerChunk = 0;
499518

500519
private int copyBufferSize =
@@ -740,6 +759,11 @@ public Builder preserve(FileAttribute attribute) {
740759
return this;
741760
}
742761

762+
/**
* Sets whether the source directories themselves should be preserved.
* The builder's initial value for this flag is false.
*
* @param newPreserveDirs true to preserve the source directories
* @return this builder, to allow call chaining
*/
public Builder withPreserveParentDirs(boolean newPreserveDirs){
763+
this.preserveParentDirs = newPreserveDirs;
764+
return this;
765+
}
766+
743767
public Builder withBlocksPerChunk(int newBlocksPerChunk) {
744768
this.blocksPerChunk = newBlocksPerChunk;
745769
return this;

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,8 @@ private Path computeSourceRootPath(FileStatus sourceStatus,
465465
} else {
466466
return sourceStatus.getPath().getParent();
467467
}
468+
} else if (context.shouldPreserveParentDirs()) {
469+
return sourceStatus.getPath().getParent();
468470
} else {
469471
boolean specialHandling =
470472
(context.getSourcePaths().size() == 1 &&

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.tools.mapred;
2020

21+
import org.apache.hadoop.util.StringUtils;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

@@ -46,6 +47,7 @@
4647
import java.io.FileNotFoundException;
4748
import java.io.IOException;
4849
import java.util.ArrayList;
50+
import java.util.Arrays;
4951
import java.util.EnumSet;
5052
import java.util.LinkedList;
5153
import java.util.List;
@@ -71,6 +73,9 @@ public class CopyCommitter extends FileOutputCommitter {
7173
private final TaskAttemptContext taskAttemptContext;
7274
private boolean syncFolder = false;
7375
private boolean overwrite = false;
76+
77+
private boolean preserveParentDirs = false;
78+
7479
private boolean targetPathExists = true;
7580
private boolean ignoreFailures = false;
7681
private boolean skipCrc = false;
@@ -104,6 +109,7 @@ public void commitJob(JobContext jobContext) throws IOException {
104109
DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
105110
ignoreFailures = conf.getBoolean(
106111
DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
112+
preserveParentDirs = conf.getBoolean(CONF_LABEL_PRESERVE_PARENT_DIRS, false);
107113

108114
if (blocksPerChunk > 0) {
109115
concatFileChunks(conf);
@@ -451,8 +457,8 @@ private void deleteMissing(Configuration conf) throws IOException {
451457
srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
452458
}
453459
Path targetEntry = trgtFileStatus.getPath();
454-
LOG.debug("Comparing {} and {}",
455-
srcFileStatus.getPath(), targetEntry);
460+
LOG.debug("Comparing {} and {}. Source REL PATH: {}, target REL PATH: {}",
461+
srcFileStatus.getPath(), targetEntry, srcRelPath, trgtRelPath);
456462

457463
if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
458464

@@ -552,7 +558,19 @@ private Path listTargetFiles(final Configuration conf,
552558
Path targetFinalPath = new Path(
553559
conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
554560
List<Path> targets = new ArrayList<>(1);
555-
targets.add(targetFinalPath);
561+
boolean preserveParentDirs = conf.getBoolean(CONF_LABEL_PRESERVE_PARENT_DIRS, false);
562+
if(preserveParentDirs){
563+
String sourcePathsStr = conf.get(CONF_LABEL_SOURCE_PATHS);
564+
String[] sourcePaths = StringUtils.split(sourcePathsStr);
565+
LOG.info("sourcePaths: {}", Arrays.toString(sourcePaths));
566+
for(String sourcePath : sourcePaths){
567+
Path source = new Path(sourcePath);
568+
targets.add(new Path(targetFinalPath, source.getName()));
569+
}
570+
}else{
571+
targets.add(targetFinalPath);
572+
}
573+
556574
Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
557575
.toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
558576
? DistCpConstants.RAW_NONE_PATH
@@ -571,9 +589,11 @@ private Path listTargetFiles(final Configuration conf,
571589
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
572590
.withOverwrite(overwrite)
573591
.withSyncFolder(syncFolder)
592+
.withPreserveParentDirs(preserveParentDirs)
574593
.withNumListstatusThreads(threads)
575594
.withUseIterator(useIterator)
576595
.build();
596+
LOG.info("Source directories: {}, target directories: {}", targets, resultNonePath);
577597
DistCpContext distCpContext = new DistCpContext(options);
578598
distCpContext.setTargetPathExists(targetPathExists);
579599

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public void testToString() {
289289
"atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
290290
"sourcePaths=null, targetPath=xyz, filtersFile='null', " +
291291
"blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
292-
"directWrite=false, useiterator=false}";
292+
"directWrite=false, useiterator=false, preserveParentDirs=false}";
293293
String optionString = option.toString();
294294
Assert.assertEquals(val, optionString);
295295
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,12 +345,12 @@ public void testDeleteMissingInDestination() {
345345

346346
try {
347347
addEntries(listFile, "srcdir");
348-
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
348+
createFiles("srcdir/file1", "srcdir/dir1/file1", "dstdir/file1", "dstdir/file2");
349349

350350
Path target = new Path(root + "/dstdir");
351351
runTest(listFile, target, false, true, true, false);
352352

353-
checkResult(target, 1, "file1");
353+
checkResult(target, 2, "file1","dir1/file1");
354354
} catch (IOException e) {
355355
LOG.error("Exception encountered while running distcp", e);
356356
Assert.fail("distcp failure");
@@ -359,6 +359,32 @@ public void testDeleteMissingInDestination() {
359359
TestDistCpUtils.delete(fs, "target/tmp1");
360360
}
361361
}
362+
363+
364+
/**
* Runs DistCp with sync, delete-missing and source-directory preservation
* all enabled (runTest arguments: targetExists=true, sync=true, delete=true,
* overwrite=false, preserveDirs=true).
*
* The listing names only srcdir/dir1 and srcdir/dir2; srcdir/dir3 exists in
* the source tree but is not listed. The target starts with dir1/file1,
* dir1/file2 and dir3/file1. The test expects three entries afterwards:
* dir1/file1, dir2/file1 and dir3/file1. Presumably this means dir1/file2 is
* deleted as missing from the source while the unlisted dir3 is left alone;
* the exact checkResult semantics are not visible here, so confirm.
*
* NOTE(review): unlike the neighbouring test, which uses
* Test(timeout=100000), this test declares no timeout.
*/
@Test
365+
public void testUpdateAndDeleteMissingInDestination() {
366+
367+
try {
368+
addEntries(listFile, "/srcdir/dir1","/srcdir/dir2");
369+
370+
mkdirs(root + "/srcdir/dir1");
371+
mkdirs(root + "/srcdir/dir2");
372+
mkdirs(root + "/srcdir/dir3");
373+
374+
createFiles("srcdir/dir1/file1", "srcdir/dir2/file1", "srcdir/dir3/file1");
375+
createFiles("dstdir/dir1/file1", "dstdir/dir1/file2", "dstdir/dir3/file1");
376+
377+
Path target = new Path(root + "/dstdir");
378+
runTest(listFile, target, true, true, true, false, true);
379+
380+
checkResult(target, 3, "dir1/file1","dir2/file1","dir3/file1");
381+
} catch (IOException e) {
382+
LOG.error("Exception encountered while running distcp", e);
383+
Assert.fail("distcp failure");
384+
} finally {
385+
TestDistCpUtils.delete(fs, root);
386+
}
387+
}
362388

363389
@Test(timeout=100000)
364390
public void testOverwrite() {
@@ -559,11 +585,18 @@ private void runTest(Path listFile, Path target, boolean targetExists,
559585
/**
* Runs DistCp without source-directory preservation.
* Forwards every argument unchanged to the seven-argument overload and
* passes false for its preserveDirs parameter.
*
* @param listFile passed through to the seven-argument overload
* @param target passed through to the seven-argument overload
* @param targetExists passed through to the seven-argument overload
* @param sync passed through to the seven-argument overload
* @param delete passed through to the seven-argument overload
* @param overwrite passed through to the seven-argument overload
* @throws IOException if the delegated run throws it
*/
private void runTest(Path listFile, Path target, boolean targetExists,
560586
boolean sync, boolean delete,
561587
boolean overwrite) throws IOException {
588+
runTest(listFile, target, targetExists, sync, delete, overwrite, false);
589+
}
590+
591+
private void runTest(Path listFile, Path target, boolean targetExists,
592+
boolean sync, boolean delete,
593+
boolean overwrite, boolean preserveDirs) throws IOException {
562594
final DistCpOptions options = new DistCpOptions.Builder(listFile, target)
563595
.withSyncFolder(sync)
564596
.withDeleteMissing(delete)
565597
.withOverwrite(overwrite)
566598
.withNumListstatusThreads(numListstatusThreads)
599+
.withPreserveParentDirs(preserveDirs)
567600
.build();
568601
try {
569602
final DistCp distCp = new DistCp(getConf(), options);

0 commit comments

Comments
 (0)