Skip to content

Commit ae20c64

Browse files
authored
HADOOP-18797. Support Concurrent Writes With S3A Magic Committer (#6006)
Jobs which commit their work to S3 through the magic committer now use a unique magic path containing the job ID: __magic_job-${jobid} This allows for multiple jobs to write to the same destination simultaneously. Contributed by Syed Shameerur Rahman
1 parent 1708df3 commit ae20c64

19 files changed

Lines changed: 125 additions & 109 deletions

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3476,7 +3476,7 @@ public UserGroupInformation getOwner() {
34763476
* directories. Has the semantics of Unix {@code 'mkdir -p'}.
34773477
* Existence of the directory hierarchy is not an error.
34783478
* Parent elements are scanned to see if any are a file,
3479-
* <i>except under __magic</i> paths.
3479+
* <i>except under "MAGIC PATH"</i> paths.
34803480
* There the FS assumes that the destination directory creation
34813481
* did that scan and that paths in job/task attempts are all
34823482
* "well formed"
@@ -4575,7 +4575,7 @@ public boolean isMagicCommitPath(Path path) {
45754575

45764576
/**
45774577
* Predicate: is a path under a magic commit path?
4578-
* True if magic commit is enabled and the path is under __magic,
4578+
* True if magic commit is enabled and the path is under "MAGIC PATH",
45794579
* irrespective of file type.
45804580
* @param path path to examine
45814581
* @return true if the path is in a magic dir and the FS has magic writes enabled.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ private CommitConstants() {
3939
* {@value}.
4040
*/
4141
public static final String MAGIC = "__magic";
42+
public static final String JOB_ID_PREFIX = "job-";
43+
public static final String MAGIC_PATH_PREFIX = MAGIC + "_" + JOB_ID_PREFIX;
4244

4345
/**
4446
* Marker of the start of a directory tree for calculating
@@ -280,10 +282,12 @@ private CommitConstants() {
280282
/**
281283
* Default configuration value for
282284
* {@link #FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS}.
285+
* It is disabled by default to support concurrent writes on the same
286+
* parent directory but different partition/sub directory.
283287
* Value: {@value}.
284288
*/
285289
public static final boolean DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS =
286-
true;
290+
false;
287291

288292
/**
289293
* The limit to the number of committed objects tracked during

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
2626
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
2727

28-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
2928
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
29+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
3030

3131
/**
3232
* These are internal constants not intended for public use.
@@ -108,7 +108,7 @@ private InternalCommitterConstants() {
108108

109109
/** Error message for a path without a magic element in the list: {@value}. */
110110
public static final String E_NO_MAGIC_PATH_ELEMENT
111-
= "No " + MAGIC + " element in path";
111+
= "No " + MAGIC_PATH_PREFIX + " element in path";
112112

113113
/**
114114
* The UUID for jobs: {@value}.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
import java.util.ArrayList;
2222
import java.util.Collections;
2323
import java.util.List;
24+
import java.util.Optional;
25+
import java.util.stream.IntStream;
2426

2527
import org.apache.hadoop.fs.Path;
2628
import org.apache.hadoop.util.StringUtils;
2729

2830
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
31+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
2932
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
30-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
3133
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT;
3234

3335
/**
@@ -76,7 +78,8 @@ public static List<String> splitPathToElements(Path path) {
7678
* @return true if a path is considered magic
7779
*/
7880
public static boolean isMagicPath(List<String> elements) {
79-
return elements.contains(MAGIC);
81+
return elements.stream()
82+
.anyMatch(element -> element.startsWith(MAGIC_PATH_PREFIX));
8083
}
8184

8285
/**
@@ -96,9 +99,16 @@ public static boolean containsBasePath(List<String> elements) {
9699
* @throws IllegalArgumentException if there is no magic element
97100
*/
98101
public static int magicElementIndex(List<String> elements) {
99-
int index = elements.indexOf(MAGIC);
100-
checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT);
101-
return index;
102+
Optional<Integer> index = IntStream.range(0, elements.size())
103+
.filter(i -> elements.get(i).startsWith(MAGIC_PATH_PREFIX))
104+
.boxed()
105+
.findFirst();
106+
107+
if (index.isPresent()) {
108+
return index.get();
109+
} else {
110+
throw new IllegalArgumentException(E_NO_MAGIC_PATH_ELEMENT);
111+
}
102112
}
103113

104114
/**
@@ -182,18 +192,9 @@ public static String lastElement(List<String> strings) {
182192
return strings.get(strings.size() - 1);
183193
}
184194

185-
/**
186-
* Get the magic subdirectory of a destination directory.
187-
* @param destDir the destination directory
188-
* @return a new path.
189-
*/
190-
public static Path magicSubdir(Path destDir) {
191-
return new Path(destDir, MAGIC);
192-
}
193-
194195
/**
195196
* Calculates the final destination of a file.
196-
* This is the parent of any {@code __magic} element, and the filename
197+
* This is the parent of any "MAGIC PATH" element, and the filename
197198
* of the path. That is: all intermediate child path elements are discarded.
198199
* Why so? paths under the magic path include job attempt and task attempt
199200
* subdirectories, which need to be skipped.
@@ -208,8 +209,8 @@ public static List<String> finalDestination(List<String> elements) {
208209
if (isMagicPath(elements)) {
209210
List<String> destDir = magicPathParents(elements);
210211
List<String> children = magicPathChildren(elements);
211-
checkArgument(!children.isEmpty(), "No path found under " +
212-
MAGIC);
212+
checkArgument(!children.isEmpty(), "No path found under the prefix " +
213+
MAGIC_PATH_PREFIX);
213214
ArrayList<String> dest = new ArrayList<>(destDir);
214215
if (containsBasePath(children)) {
215216
// there's a base marker in the path

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitUtilsWithMR.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import org.apache.hadoop.mapreduce.JobID;
2727
import org.apache.hadoop.mapreduce.MRJobConfig;
2828
import org.apache.hadoop.mapreduce.TaskAttemptContext;
29+
import org.apache.hadoop.util.Preconditions;
2930

3031
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
31-
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
32+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.JOB_ID_PREFIX;
33+
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
3234
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
3335

3436
/**
@@ -49,10 +51,13 @@ private CommitUtilsWithMR() {
4951
/**
5052
* Get the location of magic job attempts.
5153
* @param out the base output directory.
54+
* @param jobUUID unique Job ID.
5255
* @return the location of magic job attempts.
5356
*/
54-
public static Path getMagicJobAttemptsPath(Path out) {
55-
return new Path(out, MAGIC);
57+
public static Path getMagicJobAttemptsPath(Path out, String jobUUID) {
58+
Preconditions.checkArgument(jobUUID != null && !(jobUUID.isEmpty()),
59+
"Invalid job ID: %s", jobUUID);
60+
return new Path(out, MAGIC_PATH_PREFIX + jobUUID);
5661
}
5762

5863
/**
@@ -76,7 +81,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
7681
int appAttemptId,
7782
Path dest) {
7883
return new Path(
79-
getMagicJobAttemptsPath(dest),
84+
getMagicJobAttemptsPath(dest, jobUUID),
8085
formatAppAttemptDir(jobUUID, appAttemptId));
8186
}
8287

@@ -88,9 +93,7 @@ public static Path getMagicJobAttemptPath(String jobUUID,
8893
*/
8994
public static Path getMagicJobPath(String jobUUID,
9095
Path dest) {
91-
return new Path(
92-
getMagicJobAttemptsPath(dest),
93-
formatJobDir(jobUUID));
96+
return getMagicJobAttemptsPath(dest, jobUUID);
9497
}
9598

9699
/**
@@ -102,7 +105,7 @@ public static Path getMagicJobPath(String jobUUID,
102105
*/
103106
public static String formatJobDir(
104107
String jobUUID) {
105-
return String.format("job-%s", jobUUID);
108+
return JOB_ID_PREFIX + jobUUID;
106109
}
107110

108111
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ public void setupJob(JobContext context) throws IOException {
105105
Path jobPath = getJobPath();
106106
final FileSystem destFS = getDestinationFS(jobPath,
107107
context.getConfiguration());
108-
destFS.delete(jobPath, true);
109108
destFS.mkdirs(jobPath);
110109
}
111110
}
@@ -132,7 +131,7 @@ protected ActiveCommit listPendingUploadsToCommit(
132131
*/
133132
public void cleanupStagingDirs() {
134133
final Path out = getOutputPath();
135-
Path path = magicSubdir(out);
134+
Path path = getMagicJobPath(getUUID(), out);
136135
try(DurationInfo ignored = new DurationInfo(LOG, true,
137136
"Deleting magic directory %s", path)) {
138137
Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,12 +1308,12 @@ so returning the special new stream.
13081308

13091309

13101310

1311-
This is done with a "magic" temporary directory name, `__magic`, to indicate that all files
1311+
This is done with a "MAGIC PATH" (where the filesystem knows to remap paths with prefix `__magic_job-${jobId}`) temporary directory name to indicate that all files
13121312
created under this path are not to be completed during the stream write process.
13131313
Directories created under the path will still be created —this allows job- and
13141314
task-specific directories to be created for individual job and task attempts.
13151315

1316-
For example, the pattern `__magic/${jobID}/${taskId}` could be used to
1316+
For example, the pattern `${MAGIC PATH}/${jobID}/${taskId}` could be used to
13171317
store pending commits to the final directory for that specific task. If that
13181318
task is committed, all pending commit files stored in that path will be loaded
13191319
and used to commit the final uploads.
@@ -1322,19 +1322,19 @@ Consider a job with the final directory `/results/latest`
13221322

13231323
The intermediate directory for the task 01 attempt 01 of job `job_400_1` would be
13241324

1325-
/results/latest/__magic/job_400_1/_task_01_01
1325+
/results/latest/__magic_job-400/job_400_1/_task_01_01
13261326

13271327
This would be returned as the temp directory.
13281328

13291329
When a client attempted to create the file
1330-
`/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate
1330+
`/results/latest/__magic_job-400/job_400_1/task_01_01/latest.orc.lzo` , the S3A FS would initiate
13311331
a multipart request with the final destination of `/results/latest/latest.orc.lzo`.
13321332

13331333
As data was written to the output stream, it would be incrementally uploaded as
13341334
individual multipart PUT operations
13351335

13361336
On `close()`, summary data would be written to the file
1337-
`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`.
1337+
`/results/latest/__magic_job-400/job400_1/task_01_01/latest.orc.lzo.pending`.
13381338
This would contain the upload ID and all the parts and etags of uploaded data.
13391339

13401340
A marker file is also created, so that code which verifies that a newly created file
@@ -1358,7 +1358,7 @@ to the job attempt.
13581358
1. These are merged into to a single `Pendingset` structure.
13591359
1. Which is saved to a `.pendingset` file in the job attempt directory.
13601360
1. Finally, the task attempt directory is deleted. In the example, this
1361-
would be to `/results/latest/__magic/job400_1/task_01_01.pendingset`;
1361+
would be to `/results/latest/__magic_job-400/job400_1/task_01_01.pendingset`;
13621362

13631363

13641364
A failure to load any of the single pending upload files (i.e. the file
@@ -1386,9 +1386,9 @@ file.
13861386

13871387
To allow tasks to generate data in subdirectories, a special filename `__base`
13881388
will be used to provide an extra cue as to the final path. When mapping an output
1389-
path `/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending`
1389+
path `/results/latest/__magic_job-400/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending`
13901390
to a final destination path, the path will become `/results/latest/2017/2017-01-01.orc.lzo`.
1391-
That is: all directories between `__magic` and `__base` inclusive will be ignored.
1391+
That is: all directories between `__magic_job-400` and `__base` inclusive will be ignored.
13921392

13931393

13941394
**Issues**
@@ -1479,16 +1479,16 @@ Job drivers themselves may be preempted.
14791479

14801480
One failure case is that the entire execution framework failed; a new process
14811481
must identify outstanding jobs with pending work, and abort them, then delete
1482-
the appropriate `__magic` directories.
1482+
the appropriate `"MAGIC PATH"` directories.
14831483

1484-
This can be done either by scanning the directory tree for `__magic` directories
1484+
This can be done either by scanning the directory tree for `"MAGIC PATH"` directories
14851485
and scanning underneath them, or by using the `listMultipartUploads()` call to
14861486
list multipart uploads under a path, then cancel them. The most efficient solution
14871487
may be to use `listMultipartUploads` to identify all outstanding request, and use that
1488-
to identify which requests to cancel, and where to scan for `__magic` directories.
1488+
to identify which requests to cancel, and where to scan for `"MAGIC PATH"` directories.
14891489
This strategy should address scalability problems when working with repositories
14901490
with many millions of objects —rather than list all keys searching for those
1491-
with `/__magic/**/*.pending` in their name, work backwards from the active uploads to
1491+
with `/${MAGIC PATH}/**/*.pending` in their name, work backwards from the active uploads to
14921492
the directories with the data.
14931493

14941494
We may also want to consider having a cleanup operation in the S3 CLI to
@@ -1569,11 +1569,11 @@ a directory, then it is not going to work: the existing data will not be cleaned
15691569
up. A cleanup operation would need to be included in the job commit, deleting
15701570
all files in the destination directory which where not being overwritten.
15711571

1572-
1. It requires a path element, such as `__magic` which cannot be used
1572+
1. It requires a path element, such as `"MAGIC PATH"` which cannot be used
15731573
for any purpose other than for the storage of pending commit data.
15741574

15751575
1. Unless extra code is added to every FS operation, it will still be possible
1576-
to manipulate files under the `__magic` tree. That's not bad, just potentially
1576+
to manipulate files under the `"MAGIC PATH"` tree. That's not bad, just potentially
15771577
confusing.
15781578

15791579
1. As written data is not materialized until the commit, it will not be possible
@@ -1692,9 +1692,9 @@ must be used, which means: the V2 classes.
16921692
#### Resolved issues
16931693

16941694

1695-
**Magic Committer: Name of directory**
1695+
**Magic Committer: Directory Naming**
16961696

1697-
The design proposes the name `__magic` for the directory. HDFS and
1697+
The design proposes `__magic_job-` as the prefix for the magic paths of different jobs for the directory. HDFS and
16981698
the various scanning routines always treat files and directories starting with `_`
16991699
as temporary/excluded data.
17001700

@@ -1705,14 +1705,14 @@ It is legal to create subdirectories in a task work directory, which
17051705
will then be moved into the destination directory, retaining that directory
17061706
tree.
17071707

1708-
That is, a if the task working dir is `dest/__magic/app1/task1/`, all files
1709-
under `dest/__magic/app1/task1/part-0000/` must end up under the path
1708+
That is, a if the task working dir is `dest/${MAGIC PATH}/app1/task1/`, all files
1709+
under `dest/${MAGIC PATH}/app1/task1/part-0000/` must end up under the path
17101710
`dest/part-0000/`.
17111711

17121712
This behavior is relied upon for the writing of intermediate map data in an MR
17131713
job.
17141714

1715-
This means it is not simply enough to strip off all elements of under `__magic`,
1715+
This means it is not simply enough to strip off all elements of under ``"MAGIC PATH"``,
17161716
it is critical to determine the base path.
17171717

17181718
Proposed: use the special name `__base` as a marker of the base element for
@@ -1918,9 +1918,9 @@ bandwidth and the data upload bandwidth.
19181918

19191919
No use is made of the cluster filesystem; there are no risks there.
19201920

1921-
A malicious user with write access to the `__magic` directory could manipulate
1921+
A malicious user with write access to the ``"MAGIC PATH"`` directory could manipulate
19221922
or delete the metadata of pending uploads, or potentially inject new work int
1923-
the commit. Having access to the `__magic` directory implies write access
1923+
the commit. Having access to the ``"MAGIC PATH"`` directory implies write access
19241924
to the parent destination directory: a malicious user could just as easily
19251925
manipulate the final output, without needing to attack the committer's intermediate
19261926
files.

0 commit comments

Comments
 (0)