From 10d0fab726c80e32287c2b9018247a2469d73154 Mon Sep 17 00:00:00 2001 From: swaminathan-b Date: Sat, 11 Jul 2020 23:03:01 +0530 Subject: [PATCH 1/6] HADOOP-17122: Preserving Directory Attributes in DistCp with Atomic Copy --- .../hadoop/tools/mapred/CopyCommitter.java | 8 +++- .../tools/mapred/TestCopyCommitter.java | 42 ++++++++++++++++++- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 139bd08fd7abc..278800740aa74 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -318,8 +318,12 @@ private void preserveFileAttributesForDirectories(Configuration conf) SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(sourceListing)); long totalLen = clusterFS.getFileStatus(sourceListing).getLen(); - - Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); + // + // For Atomic Copy the Final & Work Path are different & atomic copy has + // already moved it to final path. + // + Path targetRoot = + new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); long preservedEntries = 0; try { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 11118c1f72400..fbeada09bb02f 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -160,10 +160,10 @@ public void testPreserveStatus() throws IOException { context.setTargetPathExists(false); CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); - Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + Path listingFile = new Path("/tmp1/" + rand.nextLong()); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); checkDirectoryPermissions(fs, targetBase, sourcePerm); @@ -177,6 +177,44 @@ public void testPreserveStatus() throws IOException { conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); } + } + @Test + public void testPreserveStatusWithAtomicCommit() throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + String sourceBase; + String workBase; + String targetBase; + FileSystem fs = null; + try { + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + fs = FileSystem.get(conf); + FsPermission sourcePerm = new FsPermission((short) 511); + FsPermission initialPerm = new FsPermission((short) 448); + sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm); + workBase = TestDistCpUtils.createTestSetup(fs, initialPerm); + targetBase = "/tmp1/" + String.valueOf(rand.nextLong()); + final DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), new Path("/out")) + .preserve(FileAttribute.PERMISSION).build(); + options.appendToConf(conf); + final DistCpContext context = new DistCpContext(options); + context.setTargetPathExists(false); + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + listing.buildListing(listingFile, context); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workBase); + conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); + committer.commitJob(jobContext); + checkDirectoryPermissions(fs, targetBase, sourcePerm); + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); + } + } @Test From 166ec387af72ca70b612154d1fc1e47a65647cad Mon Sep 17 00:00:00 2001 From: swaminathan-b Date: Tue, 11 Aug 2020 10:34:55 +0530 Subject: [PATCH 2/6] HADOOP-17122: Addressing Review Comments for Preserving Directory Attributes --- .../tools/mapred/TestCopyCommitter.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index fbeada09bb02f..c3478db9ac839 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -53,6 +53,8 @@ import java.util.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH; +import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH; import static org.apache.hadoop.tools.util.TestDistCpUtils.*; public class TestCopyCommitter { @@ -163,7 +165,7 @@ public void testPreserveStatus() throws IOException { Path listingFile = new Path("/tmp1/" + rand.nextLong()); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); checkDirectoryPermissions(fs, targetBase, sourcePerm); @@ -178,6 +180,7 @@ public void testPreserveStatus() throws IOException { } } + @Test public void testPreserveStatusWithAtomicCommit() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); @@ -195,7 +198,7 @@ public void testPreserveStatusWithAtomicCommit() throws IOException { FsPermission initialPerm = new FsPermission((short) 448); sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm); workBase = TestDistCpUtils.createTestSetup(fs, initialPerm); - targetBase = "/tmp1/" + String.valueOf(rand.nextLong()); + targetBase = "/tmp1/" + rand.nextLong(); final DistCpOptions options = new DistCpOptions.Builder( Collections.singletonList(new Path(sourceBase)), new Path("/out")) .preserve(FileAttribute.PERMISSION).build(); @@ -203,10 +206,10 @@ public void testPreserveStatusWithAtomicCommit() throws IOException { final DistCpContext context = new DistCpContext(options); context.setTargetPathExists(false); CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); - Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + Path listingFile = new Path("/tmp1/" + rand.nextLong()); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, workBase); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); committer.commitJob(jobContext); checkDirectoryPermissions(fs, targetBase, sourcePerm); @@ -245,8 +248,8 @@ public void testDeleteMissing() throws IOException { Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); verifyFoldersAreInSync(fs, targetBase, sourceBase); @@ -294,8 +297,8 @@ public void testPreserveTimeWithDeleteMiss() throws IOException { Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); Path sourceListing = new Path( conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); @@ -358,8 +361,8 @@ public void testDeleteMissingFlatInterleavedFiles() throws IOException { Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); verifyFoldersAreInSync(fs, targetBase, sourceBase); @@ -391,8 +394,8 @@ public void testAtomicCommitMissingFinal() throws IOException { fs = FileSystem.get(conf); fs.mkdirs(new Path(workPath)); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); + conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); assertPathExists(fs, "Work path", new Path(workPath)); @@ -429,8 +432,8 @@ public void testAtomicCommitExistingFinal() throws IOException { fs.mkdirs(new Path(workPath)); fs.mkdirs(new Path(finalPath)); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); + conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); assertPathExists(fs, "Work path", new Path(workPath)); @@ -501,8 +504,8 @@ private void testCommitWithChecksumMismatch(boolean skipCrc) + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); OutputCommitter committer = new CopyCommitter( null, taskAttemptContext); @@ -650,4 +653,4 @@ public RecordReader createRecordReader(InputSplit split, return null; } } -} +} \ No newline at end of file From f97e6f79a943aa76c174300df698af96793a176f Mon Sep 17 00:00:00 2001 From: swaminathan-b Date: Thu, 13 Aug 2020 11:41:52 +0530 Subject: [PATCH 3/6] HADOOP-17122: Restoring new line at the end --- .../java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index c3478db9ac839..39cd225b44588 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -653,4 +653,4 @@ public RecordReader createRecordReader(InputSplit split, return null; } } -} \ No newline at end of file +} From c4ad2f73de56136b31b739d55f918e20cc15abca Mon Sep 17 00:00:00 2001 From: swaminathan-b Date: Thu, 13 Aug 2020 11:48:48 +0530 Subject: [PATCH 4/6] HADOOP-17122: Removing Whitespace from Added New line --- .../java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 39cd225b44588..c998cbd8debd6 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -180,7 +180,7 @@ public void testPreserveStatus() throws IOException { } } - + @Test public void testPreserveStatusWithAtomicCommit() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); From eeff2e61d2838cdfd2322126964a298c6b74bda2 Mon Sep 17 00:00:00 2001 From: swaminathan-b Date: Wed, 19 Aug 2020 14:30:30 +0530 Subject: [PATCH 5/6] HADOOP-17122: Correcting Whitespace checks --- .../java/org/apache/hadoop/tools/mapred/CopyCommitter.java | 2 -- .../java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 278800740aa74..e346d0b938c93 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -318,10 +318,8 @@ private void preserveFileAttributesForDirectories(Configuration conf) SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(sourceListing)); long totalLen = clusterFS.getFileStatus(sourceListing).getLen(); - // // For Atomic Copy the Final & Work Path are different & atomic copy has // already moved it to final path. - // Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index c998cbd8debd6..92bfae2d5fd82 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -180,7 +180,7 @@ public void testPreserveStatus() throws IOException { } } - + @Test public void testPreserveStatusWithAtomicCommit() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); @@ -217,7 +217,6 @@ public void testPreserveStatusWithAtomicCommit() throws IOException { TestDistCpUtils.delete(fs, "/tmp1"); conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); } - } @Test From 987a9aa1a1a039096803c3eea5a2dae8778442b3 Mon Sep 17 00:00:00 2001 From: swaminathan-b Date: Wed, 19 Aug 2020 14:32:24 +0530 Subject: [PATCH 6/6] HADOOP-17122: Correcting Whitespace checks --- .../java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 92bfae2d5fd82..685f030e15ea0 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -184,7 +184,8 @@ public void testPreserveStatus() throws IOException { @Test public void testPreserveStatusWithAtomicCommit() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); - JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); Configuration conf = jobContext.getConfiguration(); String sourceBase;