-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-17428. ABFS: Implementation for getContentSummary #2549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 36 commits
8708856
01aba06
2876a8b
a9960da
30cf195
95d1396
03d342c
bb55b14
a9e94a9
06609da
1433c85
27b6007
d747f06
be2daf0
96cd2b9
e3eaca7
94a95df
48d0607
a10be00
636b434
bc276b2
744f8c4
9070413
657d7ea
041d9bc
9c92338
4be7b19
7a2e218
d21b58a
9b2723b
2378431
fe71af1
fa34b57
aa48086
f320785
be0e94c
2104268
a718cbd
c9d65aa
4003aff
3039f7f
8259a2e
b64b492
16d9436
137627d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,7 @@ | |
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.BlockLocation; | ||
| import org.apache.hadoop.fs.CommonPathCapabilities; | ||
| import org.apache.hadoop.fs.ContentSummary; | ||
| import org.apache.hadoop.fs.CreateFlag; | ||
| import org.apache.hadoop.fs.FSDataInputStream; | ||
| import org.apache.hadoop.fs.FSDataOutputStream; | ||
|
|
@@ -79,6 +80,7 @@ | |
| import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; | ||
| import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; | ||
| import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; | ||
| import org.apache.hadoop.fs.azurebfs.services.ContentSummaryProcessor; | ||
| import org.apache.hadoop.fs.azurebfs.utils.Listener; | ||
| import org.apache.hadoop.fs.azurebfs.utils.TracingContext; | ||
| import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; | ||
|
|
@@ -433,6 +435,31 @@ public boolean delete(final Path f, final boolean recursive) throws IOException | |
|
|
||
| } | ||
|
|
||
| /** | ||
| * Returns a ContentSummary instance containing the count of directories, | ||
| * files and total number of bytes under a given path | ||
| * @param path The given path | ||
| * @return ContentSummary | ||
| * @throws IOException if an error is encountered during listStatus calls | ||
| * or if there is any issue with the thread pool used while processing | ||
| */ | ||
| @Override | ||
| public ContentSummary getContentSummary(Path path) throws IOException { | ||
| try { | ||
| TracingContext tracingContext = new TracingContext(clientCorrelationId, | ||
| fileSystemId, FSOperationType.GET_CONTENT_SUMMARY, true, | ||
| tracingHeaderFormat, listener); | ||
| return (new ContentSummaryProcessor(abfsStore)).getContentSummary(path, | ||
| tracingContext); | ||
| } catch (InterruptedException e) { | ||
| LOG.debug("Thread interrupted"); | ||
| throw new IOException(e); | ||
| } catch(ExecutionException ex) { | ||
| LOG.debug("GetContentSummary failed with error: {}", ex.getMessage()); | ||
| throw new IOException(ex); | ||
|
||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public FileStatus[] listStatus(final Path f) throws IOException { | ||
| LOG.debug( | ||
|
|
@@ -1435,7 +1462,7 @@ Map<String, Long> getInstrumentationMap() { | |
| } | ||
|
|
||
| @VisibleForTesting | ||
| String getFileSystemId() { | ||
| public String getFileSystemId() { | ||
| return fileSystemId; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * <p> | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * <p> | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.azurebfs.services; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.CompletionService; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorCompletionService; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.SynchronousQueue; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import org.apache.hadoop.fs.azurebfs.utils.TracingContext; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.hadoop.fs.ContentSummary; | ||
| import org.apache.hadoop.fs.FileStatus; | ||
| import org.apache.hadoop.fs.Path; | ||
|
|
||
| public class ContentSummaryProcessor { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: javadocs
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added description |
||
| private static final int CORE_POOL_SIZE = 1; | ||
| private static final int MAX_THREAD_COUNT = 16; | ||
| private static final int KEEP_ALIVE_TIME = 5; | ||
| private static final int POLL_TIMEOUT = 100; | ||
| private static final Logger LOG = LoggerFactory.getLogger(ContentSummaryProcessor.class); | ||
| private final AtomicLong fileCount = new AtomicLong(0L); | ||
| private final AtomicLong directoryCount = new AtomicLong(0L); | ||
| private final AtomicLong totalBytes = new AtomicLong(0L); | ||
| private final AtomicInteger numTasks = new AtomicInteger(0); | ||
| private final ListingSupport abfsStore; | ||
| private final ExecutorService executorService = new ThreadPoolExecutor( | ||
| CORE_POOL_SIZE, MAX_THREAD_COUNT, KEEP_ALIVE_TIME, TimeUnit.SECONDS, | ||
| new SynchronousQueue<>()); | ||
| private final CompletionService<Void> completionService = | ||
|
||
| new ExecutorCompletionService<>(executorService); | ||
| private final LinkedBlockingQueue<FileStatus> queue = new LinkedBlockingQueue<>(); | ||
|
|
||
| /** | ||
| * Processes a given path for count of subdirectories, files and total number | ||
| * of bytes | ||
| * @param abfsStore Instance of AzureBlobFileSystemStore, used to make | ||
| * listStatus calls to server | ||
| */ | ||
| public ContentSummaryProcessor(ListingSupport abfsStore) { | ||
| this.abfsStore = abfsStore; | ||
| } | ||
|
|
||
| public ContentSummary getContentSummary(Path path, TracingContext tracingContext) | ||
| throws IOException, ExecutionException, InterruptedException { | ||
| try { | ||
| processDirectoryTree(path, tracingContext); | ||
| while (!queue.isEmpty() || numTasks.get() > 0) { | ||
| try { | ||
| completionService.take().get(); | ||
| } finally { | ||
| numTasks.decrementAndGet(); | ||
| LOG.debug("FileStatus queue size = {}, number of submitted unfinished tasks = {}, active thread count = {}", | ||
| queue.size(), numTasks, ((ThreadPoolExecutor) executorService).getActiveCount()); | ||
| } | ||
| } | ||
| } finally { | ||
| executorService.shutdownNow(); | ||
| LOG.debug("Executor shutdown"); | ||
| } | ||
| LOG.debug("Processed content summary of subtree under given path"); | ||
| ContentSummary.Builder builder = new ContentSummary.Builder() | ||
| .directoryCount(directoryCount.get()).fileCount(fileCount.get()) | ||
| .length(totalBytes.get()).spaceConsumed(totalBytes.get()); | ||
| return builder.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Calls listStatus on given path and populated fileStatus queue with | ||
| * subdirectories. Is called by new tasks to process the complete subtree | ||
| * under a given path | ||
| * @param path: Path to a file or directory | ||
| * @throws IOException: listStatus error | ||
| * @throws InterruptedException: error while inserting into queue | ||
| */ | ||
| private void processDirectoryTree(Path path, TracingContext tracingContext) | ||
| throws IOException, InterruptedException { | ||
| FileStatus[] fileStatuses = abfsStore.listStatus(path, tracingContext); | ||
|
|
||
| for (FileStatus fileStatus : fileStatuses) { | ||
|
||
| if (fileStatus.isDirectory()) { | ||
| queue.put(fileStatus); | ||
| processDirectory(); | ||
| conditionalSubmitTaskToExecutor(tracingContext); | ||
| } else { | ||
| processFile(fileStatus); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void processDirectory() { | ||
| directoryCount.incrementAndGet(); | ||
| } | ||
|
|
||
| /** | ||
| * Increments file count and byte count | ||
| * @param fileStatus: Provides file size to update byte count | ||
| */ | ||
| private void processFile(FileStatus fileStatus) { | ||
| fileCount.incrementAndGet(); | ||
| totalBytes.addAndGet(fileStatus.getLen()); | ||
| } | ||
|
|
||
| /** | ||
| * Submit task for processing a subdirectory based on current size of | ||
| * filestatus queue and number of already submitted tasks | ||
| */ | ||
| private synchronized void conditionalSubmitTaskToExecutor(TracingContext tracingContext) { | ||
| if (!queue.isEmpty() && numTasks.get() < MAX_THREAD_COUNT) { | ||
| numTasks.incrementAndGet(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See if there is a method for simple increment, since you are not using the return value
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't find any op without returning, will have to stick to this |
||
| completionService.submit(() -> { | ||
| FileStatus fileStatus1; | ||
| while ((fileStatus1 = queue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)) | ||
| != null) { | ||
| processDirectoryTree(fileStatus1.getPath(), new TracingContext(tracingContext)); | ||
| } | ||
| return null; | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InterruptedIOException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed