Merged. Changes from 4 commits.
CachingBlockManager.java

@@ -129,6 +129,7 @@ public CachingBlockManager(
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
+ this.conf = requireNonNull(conf);

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
Expand All @@ -138,7 +139,6 @@ public CachingBlockManager(

this.ops = new BlockOperations();
this.ops.setDebug(false);
- this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}

@@ -558,7 +558,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
}

protected BlockCache createCache() {
- return new SingleFilePerBlockCache(prefetchingStatistics);
+ return new SingleFilePerBlockCache(prefetchingStatistics, conf);
}

protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
Constants.java (new file)

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.impl.prefetch;

import java.util.concurrent.TimeUnit;

/**
* Constants used by prefetch implementations.
*/
public final class Constants {
Contributor: can we have a different name please; too confusing

private Constants() {
}

/**
* Prefetch max blocks count config.
 */
Contributor: add {@value} here and below so IDE popups show the value
public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";
Contributor: I think we should use fs.s3a here for per bucket settings.
how about just pass in the count as a parameter, rather than a full Configuration object?

/**
* Timeout to be used by close, while acquiring prefetch block write lock.
*/
static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;

/**
* Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
*/
static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;

/**
* Default value for max blocks count config.
*/
static final int DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT = 20;

}
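
Side note for anyone trying the patch out: a minimal sketch of wiring the new key into a configuration. Only FS_PREFETCH_MAX_BLOCKS_COUNT and its default of 20 come from this patch; the class name and the chosen value below are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.prefetch.Constants;

public class PrefetchCacheConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap the on-disk prefetch block cache at 10 blocks; once more than
    // 10 blocks are cached, the least recently used block file is evicted.
    conf.setInt(Constants.FS_PREFETCH_MAX_BLOCKS_COUNT, 10);
  }
}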
SingleFilePerBlockCache.java

@@ -47,6 +47,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Preconditions;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@@ -61,27 +62,37 @@ public class SingleFilePerBlockCache implements BlockCache {
/**
* Blocks stored in this cache.
*/
- private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+ private final Map<Integer, Entry> blocks;

/**
- * Number of times a block was read from this cache.
- * Used for determining cache utilization factor.
+ * Total max blocks count, to be considered as baseline for LRU cache.
*/
- private int numGets = 0;
+ private final int maxBlocksCount;

- private final AtomicBoolean closed;
+ /**
+ * The lock to be shared by LRU based linked list updates.
+ */
+ private final ReentrantReadWriteLock blocksLock;

- private final PrefetchingStatistics prefetchingStatistics;
+ /**
+ * Head of the linked list.
+ */
+ private Entry head;

/**
- * Timeout to be used by close, while acquiring prefetch block write lock.
+ * Tail of the linked list.
*/
- private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
+ private Entry tail;

/**
- * Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
+ * Number of times a block was read from this cache.
+ * Used for determining cache utilization factor.
*/
- private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
+ private int numGets = 0;
+
+ private final AtomicBoolean closed;
+
+ private final PrefetchingStatistics prefetchingStatistics;

/**
* File attributes attached to any intermediate temporary file created during index creation.
@@ -103,13 +114,17 @@ private enum LockType {
READ,
WRITE
}
private Entry previous;
private Entry next;

Entry(int blockNumber, Path path, int size, long checksum) {
this.blockNumber = blockNumber;
this.path = path;
this.size = size;
this.checksum = checksum;
this.lock = new ReentrantReadWriteLock();
this.previous = null;
this.next = null;
}

@Override
@@ -166,16 +181,40 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
}
return false;
}

private Entry getPrevious() {
return previous;
}

private void setPrevious(Entry previous) {
this.previous = previous;
}

private Entry getNext() {
return next;
}

private void setNext(Entry next) {
this.next = next;
}
}

/**
* Constructs an instance of a {@code SingleFilePerBlockCache}.
*
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration object.
*/
- public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, Configuration conf) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
this.maxBlocksCount =
conf.getInt(
Constants.FS_PREFETCH_MAX_BLOCKS_COUNT, Constants.DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT);
Preconditions.checkArgument(this.maxBlocksCount > 0,
Constants.FS_PREFETCH_MAX_BLOCKS_COUNT + " should be more than 0");
blocks = new ConcurrentHashMap<>();
blocksLock = new ReentrantReadWriteLock();
}
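
As a quick aside for reviewers, a construction sketch; it mirrors the test usage further down (EmptyPrefetchingStatistics comes from the existing tests) and the count of 8 is arbitrary:

Configuration conf = new Configuration();
conf.setInt(Constants.FS_PREFETCH_MAX_BLOCKS_COUNT, 8);
// The constructor validates the count (it must be > 0) and falls back
// to the default of 20 when the key is unset.
BlockCache cache =
    new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), conf);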

/**
@@ -247,9 +286,47 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
addToHeadOfLinkedList(entry);
return entry;
}

/**
* Add the given entry to the head of the linked list.
*
* @param entry Block entry to add.
*/
private void addToHeadOfLinkedList(Entry entry) {
blocksLock.writeLock().lock();
try {
if (head == null) {
head = entry;
tail = entry;
}
LOG.debug(
"Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
entry.blockNumber, head.blockNumber, tail.blockNumber);
if (entry != head) {
Entry prev = entry.getPrevious();
Entry nxt = entry.getNext();
if (prev != null) {
prev.setNext(nxt);
}
if (nxt != null) {
nxt.setPrevious(prev);
}
entry.setPrevious(null);
entry.setNext(head);
head.setPrevious(entry);
head = entry;
if (prev != null && prev.getNext() == null) {
tail = prev;
}
}
} finally {
blocksLock.writeLock().unlock();
}
}
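
Since the pointer juggling above is easy to misread, here is an equivalent stand-alone sketch of the move-to-head operation, using hypothetical Node/prev/next fields rather than the patch's Entry accessors; it is illustrative, not the patch code:

final class Node {
  Node prev;
  Node next;
  final int blockNumber;
  Node(int blockNumber) { this.blockNumber = blockNumber; }
}

final class LruList {
  private Node head;
  private Node tail;

  void moveToHead(Node n) {
    if (head == null) {       // empty list: n becomes both head and tail
      head = tail = n;
      return;
    }
    if (n == head) {          // already the most recently used entry
      return;
    }
    if (n.prev != null) {     // unlink n from its current position
      n.prev.next = n.next;
    }
    if (n.next != null) {
      n.next.prev = n.prev;
    } else if (n == tail) {   // n was the tail, so the tail steps back one
      tail = n.prev;
    }
    n.prev = null;            // splice n in at the head
    n.next = head;
    head.prev = n;
    head = n;
  }
}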

/**
* Puts the given block in this cache.
*
@@ -278,6 +355,7 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
} finally {
entry.releaseLock(Entry.LockType.READ);
}
addToHeadOfLinkedList(entry);
return;
}

@@ -299,9 +377,63 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
- // the input stream can lead to the removal of the cache file even before blocks is added with
- // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before blocks is added
+ // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
addToLinkedListAndEvictIfRequired(entry);
}

/**
* Add the given entry to the head of the linked list and if the LRU cache size
* exceeds the max limit, evict tail of the LRU linked list.
*
* @param entry Block entry to add.
*/
private void addToLinkedListAndEvictIfRequired(Entry entry) {
addToHeadOfLinkedList(entry);
blocksLock.writeLock().lock();
try {
if (blocks.size() > maxBlocksCount && !closed.get()) {
Entry elementToPurge = tail;
tail = tail.getPrevious();
if (tail == null) {
tail = head;
}
tail.setNext(null);
elementToPurge.setPrevious(null);
deleteBlockFileAndEvictCache(elementToPurge);
}
} finally {
blocksLock.writeLock().unlock();
}
}
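
To make the eviction behaviour concrete, a hedged usage sketch; the allocator context "hadoop.tmp.dir" and the block numbers are illustrative, and assume this runs in a method that can throw IOException:

Configuration conf = new Configuration();
conf.setInt(Constants.FS_PREFETCH_MAX_BLOCKS_COUNT, 2);
BlockCache cache =
    new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), conf);
LocalDirAllocator allocator = new LocalDirAllocator("hadoop.tmp.dir");

ByteBuffer buffer = ByteBuffer.allocate(16);
cache.put(0, buffer, conf, allocator);  // LRU list: 0
cache.put(1, buffer, conf, allocator);  // LRU list: 1 -> 0
cache.put(2, buffer, conf, allocator);  // size exceeds 2: block 0 is the
                                        // tail, so its cache file is deleted
                                        // and the entry leaves the cache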

/**
* Delete cache file as part of the block cache LRU eviction.
*
* @param elementToPurge Block entry to evict.
*/
private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
boolean lockAcquired =
elementToPurge.takeLock(Entry.LockType.WRITE, Constants.PREFETCH_WRITE_LOCK_TIMEOUT,
Constants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", elementToPurge.path,
Constants.PREFETCH_WRITE_LOCK_TIMEOUT,
Constants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
} else {
try {
if (Files.deleteIfExists(elementToPurge.path)) {
prefetchingStatistics.blockRemovedFromFileCache();
blocks.remove(elementToPurge.blockNumber);
}
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
} finally {
elementToPurge.releaseLock(Entry.LockType.WRITE);
}
}
}

Review thread on the write-lock timeout:

Contributor: So, there can be a scenario where the current cache exceeds its normal capacity? Is 5 seconds enough time? Or are we okay with this?

Contributor (author): since we are already using 5s at the other place (PREFETCH_WRITE_LOCK_TIMEOUT), used it here as well, but happy to change it in future as/if we encounter some problem with this. Does that sound good?

Contributor: It seems like we are okay with things not blowing up if eviction is not successful. Are we okay with that? Can this hurt in the long run?

Contributor (author): it should be okay; in fact we have the same logic for input stream close as well: if eviction or removal of a disk block is unsuccessful, we leave it behind with a fat warning.

Contributor (author): if eviction misses it, stream close would be able to clean it up. If stream close misses it too, then it stays on disk, and we might eventually come up with some "file last accessed" based check and maybe some crons removing them eventually. Not a bad idea IMO.

Contributor: Okay, sounds good

private static final Set<? extends OpenOption> CREATE_OPTIONS =
@@ -337,30 +469,37 @@ protected Path getCacheFilePath(final Configuration conf,
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
LOG.debug(getStats());
- int numFilesDeleted = 0;
-
- for (Entry entry : blocks.values()) {
- boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
- PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
- if (!lockAcquired) {
- LOG.error("Cache file {} deletion would not be attempted as write lock could not"
- + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
- PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
- continue;
- }
- try {
- Files.deleteIfExists(entry.path);
- prefetchingStatistics.blockRemovedFromFileCache();
- numFilesDeleted++;
- } catch (IOException e) {
- LOG.warn("Failed to delete cache file {}", entry.path, e);
- } finally {
- entry.releaseLock(Entry.LockType.WRITE);
- }
- }
-
- LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
+ deleteCacheFiles();
}
}

+ /**
+ * Delete cache files as part of the close call.
+ */
+ private void deleteCacheFiles() {
+ int numFilesDeleted = 0;
+ for (Entry entry : blocks.values()) {
+ boolean lockAcquired =
+ entry.takeLock(Entry.LockType.WRITE, Constants.PREFETCH_WRITE_LOCK_TIMEOUT,
+ Constants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ if (!lockAcquired) {
+ LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ + " be acquired within {} {}", entry.path, Constants.PREFETCH_WRITE_LOCK_TIMEOUT,
+ Constants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ continue;
+ }
+ try {
+ if (Files.deleteIfExists(entry.path)) {
+ prefetchingStatistics.blockRemovedFromFileCache();
+ numFilesDeleted++;
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to delete cache file {}", entry.path, e);
+ } finally {
+ entry.releaseLock(Entry.LockType.WRITE);
+ }
+ }
+ LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
+ }

@Override
TestBlockCache.java

@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
public void testArgChecks() throws Exception {
// Should not throw.
BlockCache cache =
- new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
+ new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), CONF);

ByteBuffer buffer = ByteBuffer.allocate(16);

@@ -55,15 +55,15 @@ public void testArgChecks() throws Exception {


intercept(NullPointerException.class, null,
- () -> new SingleFilePerBlockCache(null));
+ () -> new SingleFilePerBlockCache(null, CONF));

}


@Test
public void testPutAndGet() throws Exception {
BlockCache cache =
- new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
+ new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), CONF);

ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
for (byte i = 0; i < BUFFER_SIZE; i++) {
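
A possible follow-up test, not in this patch, sketching how the eviction limit could be asserted. It builds on BlockCache's existing size()/containsBlock() accessors and the BUFFER_SIZE constant from this test class; the allocator context string and the JUnit assertions are illustrative:

@Test
public void testLruEviction() throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(Constants.FS_PREFETCH_MAX_BLOCKS_COUNT, 1);
  BlockCache cache =
      new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), conf);

  ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
  LocalDirAllocator allocator = new LocalDirAllocator("hadoop.tmp.dir");
  cache.put(0, buffer, conf, allocator);
  cache.put(1, buffer, conf, allocator);

  // With the limit set to 1, caching block 1 should have evicted block 0.
  assertEquals(1, cache.size());
  assertFalse(cache.containsBlock(0));
  assertTrue(cache.containsBlock(1));
}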