CachingBlockManager.java
@@ -129,6 +129,7 @@ public CachingBlockManager(
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.conf = requireNonNull(conf);

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
@@ -138,7 +139,6 @@ public CachingBlockManager(

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}

@@ -558,7 +558,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
}

protected BlockCache createCache() {
return new SingleFilePerBlockCache(prefetchingStatistics);
return new SingleFilePerBlockCache(prefetchingStatistics, conf);
}

protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
SingleFilePerBlockCache.java
@@ -47,6 +47,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Preconditions;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@@ -61,7 +62,27 @@ public class SingleFilePerBlockCache implements BlockCache {
/**
* Blocks stored in this cache.
*/
private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
private final Map<Integer, Entry> blocks;

/**
* Total max blocks count, to be considered as baseline for LRU cache.
*/
private final int maxBlocksCount;

/**
* The lock to be shared by LRU based linked list updates.
*/
private final ReentrantReadWriteLock blocksLock;

/**
* Head of the linked list.
*/
private Entry head;

/**
* Tail of the lined list.
*/
private Entry tail;

Contributor:
typo: "linked"

/**
* Number of times a block was read from this cache.
@@ -89,6 +110,16 @@ public class SingleFilePerBlockCache implements BlockCache {
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);

/**
* Prefetch max blocks count config.
*/
public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";
Contributor:
Is there a Constants class where we can move this to?


/**
* Default value for max blocks count config.
*/
private static final int DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT = 20;

/**
* Cache entry.
* Each block is stored as a separate file.
@@ -103,13 +134,17 @@ private enum LockType {
READ,
WRITE
}
private Entry previous;
private Entry next;

Entry(int blockNumber, Path path, int size, long checksum) {
this.blockNumber = blockNumber;
this.path = path;
this.size = size;
this.checksum = checksum;
this.lock = new ReentrantReadWriteLock();
this.previous = null;
this.next = null;
}

@Override
@@ -166,16 +201,39 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
}
return false;
}

private Entry getPrevious() {
return previous;
}

private void setPrevious(Entry previous) {
this.previous = previous;
}

private Entry getNext() {
return next;
}

private void setNext(Entry next) {
this.next = next;
}
}

/**
* Constructs an instance of a {@code SingleFilePerBlockCache}.
*
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration object.
*/
public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, Configuration conf) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
this.maxBlocksCount =
conf.getInt(FS_PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT);
Preconditions.checkArgument(this.maxBlocksCount > 0,
"prefetch blocks total capacity should be more than 0");
Contributor:
Include the property name in the error message, so that we know which property to set to a valid value.

blocks = new ConcurrentHashMap<>();
blocksLock = new ReentrantReadWriteLock();
}

/**
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
addToHeadOfLinkedList(entry);
return entry;
}

/**
* Add the given entry to the head of the linked list.
*
* @param entry Block entry to add.
*/
private void addToHeadOfLinkedList(Entry entry) {
blocksLock.writeLock().lock();
try {
if (head == null) {
head = entry;
tail = entry;
}
if (entry != head) {
Entry prev = entry.getPrevious();
Entry nxt = entry.getNext();
if (prev != null) {
prev.setNext(nxt);
}
if (nxt != null) {
nxt.setPrevious(prev);
}
entry.setPrevious(null);
entry.setNext(head);
head.setPrevious(entry);
head = entry;
}
if (tail != null) {
while (tail.getNext() != null) {
tail = tail.getNext();
}
}
Contributor:
Can you explain a bit about this part? Not able to get why this is needed.

Contributor Author:
sure, let's say:

head -> 1, tail -> 2

new block: 3
so, we need to make: 3 -> 1 -> 2
i.e. new head -> 3, tail -> 2

new block: 4
updated list: 4 -> 3 -> 1 -> 2

now let's say the input stream accesses block 2, hence block 2 needs to be put at the head.
the new list should be: 2 -> 4 -> 3 -> 1

we change head to 2 and we also update the next pointer of block 1.
however, if we don't update tail (L322-L326), we will not be able to move tail to block 1.

Contributor:
Okay, but when we are adding the node to the head, doesn't it make more sense to check if the current node is the tail, get the previous of this node, and set that as the tail? This would work, I was just interested in whether we can avoid traversing the list 🤔

Contributor Author:
yes, nice optimization for sure, let me double check this again. i ran through multiple test iterations and found that the current version works, but let me check whether the optimization works too (i think it should, i am just wondering if i am missing some cases).

Thank you btw @mehakmeet !!

Contributor Author:
i applied this patch temporarily to debug further but somehow head and tail are getting screwed up:

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index ef685b54d30..1aad82ff9c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -306,6 +306,7 @@ private void addToHeadOfLinkedList(Entry entry) {
           "Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
           entry.blockNumber, head.blockNumber, tail.blockNumber);
       if (entry != head) {
+        boolean isEntryTail = entry == tail;
         Entry prev = entry.getPrevious();
         Entry nxt = entry.getNext();
         if (prev != null) {
@@ -318,10 +319,8 @@ private void addToHeadOfLinkedList(Entry entry) {
         entry.setNext(head);
         head.setPrevious(entry);
         head = entry;
-      }
-      if (tail != null) {
-        while (tail.getNext() != null) {
-          tail = tail.getNext();
+        if (isEntryTail) {
+          tail = prev;
         }
       }
     } finally {

however, after eviction the head and tail are getting screwed up; still trying to understand what is going wrong and why this patch would not work.
but i hope you were suggesting a change like this one, correct?

Contributor Author:
it works with slight modification, let me push the change
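
One plausible shape of that O(1) tail update is sketched below, purely as an illustration (an assumption, not necessarily the exact change that was pushed). It relies on the list being in a consistent state, so that a non-head entry which is also the tail always has a non-null previous:

      if (entry != head) {
        Entry prev = entry.getPrevious();
        Entry nxt = entry.getNext();
        if (entry == tail) {
          // the entry moving to the head was the tail, so its predecessor
          // becomes the new tail (prev is non-null here because entry != head)
          tail = prev;
        }
        if (prev != null) {
          prev.setNext(nxt);
        }
        if (nxt != null) {
          nxt.setPrevious(prev);
        }
        entry.setPrevious(null);
        entry.setNext(head);
        head.setPrevious(entry);
        head = entry;
      }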

Contributor:
Yes, happy with the new change I think. Would be good to explicitly test certain tail-changing scenarios in the IT like you mentioned above, if we are not already doing so.

Contributor Author:
thanks, i was actually thinking about it but there is just no clean way of doing so. what i have been able to do so far was by "logging" the head and tail nodes (as you also mentioned earlier) along with all the other nodes, so that i could track the exact node sequence. that's the best way i could find so far, but extracting that info in an IT is really difficult (if we were to do it in a clean way).

Contributor:
True, I was thinking, would it be possible via a simple UT as well, where we pass in the entries as we desire and access them in whatever order we prefer, to test the functionality? It might be easier if we directly test the LRU logic rather than going via the stream.

Contributor Author:
nice idea, it might be more beneficial to test this with a UT.

i am also planning to refactor the Entry class into its own class, rather than keeping it as an inner class of SingleFilePerBlockCache, as part of the next follow-up sub-task. once we do that, it might be even easier to write a UT that directly accesses the head and tail pointers.

sorry, i was thinking of this as a sub-task, so maybe adding the UT can also be done with that sub-task. does that sound good?

} finally {
blocksLock.writeLock().unlock();
}
}

/**
* Puts the given block in this cache.
*
Expand Down Expand Up @@ -278,6 +373,7 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
} finally {
entry.releaseLock(Entry.LockType.READ);
}
addToHeadOfLinkedList(entry);
return;
}

@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
// the input stream can lead to the removal of the cache file even before blocks is added with
// the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
// the input stream can lead to the removal of the cache file even before blocks is added
// with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
addToLinkedListAndEvictIfRequired(entry);
}

/**
* Add the given entry to the head of the linked list and if the LRU cache size
* exceeds the max limit, evict tail of the LRU linked list.
*
* @param entry Block entry to add.
*/
private void addToLinkedListAndEvictIfRequired(Entry entry) {
addToHeadOfLinkedList(entry);
blocksLock.writeLock().lock();
try {
if (blocks.size() > maxBlocksCount && !closed.get()) {
Entry elementToPurge = tail;
tail = tail.getPrevious();
if (tail == null) {
tail = head;
}
tail.setNext(null);
elementToPurge.setPrevious(null);
deleteBlockFileAndEvictCache(elementToPurge);
}
} finally {
blocksLock.writeLock().unlock();
}
}

/**
* Delete cache file as part of the block cache LRU eviction.
*
* @param elementToPurge Block entry to evict.
*/
private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
boolean lockAcquired =
elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", elementToPurge.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);

Contributor:
So, there can be a scenario where the current cache exceeds its normal capacity? Is 5 seconds enough time? Or are we okay with this?

Contributor Author:
since we are already using 5s elsewhere (PREFETCH_WRITE_LOCK_TIMEOUT), i used it here as well, but happy to change it in the future as/if we encounter some problem with this. does that sound good?

Contributor:
It seems like we are okay with things not blowing up if eviction is not successful, are we okay with it? Can this hurt in the long run?

Contributor Author:
it should be okay, in fact we have the same logic for input stream close as well: if eviction or removal of the disk block is unsuccessful, we leave it behind with a fat warning.

Contributor Author:
if eviction misses it, stream close would be able to clean it up.
if stream close misses it, then it stays on disk and we might eventually also come up with some "file last accessed" based check and maybe some cron removing such files. not a bad idea IMO.

Contributor:
Okay, sounds good
} else {
try {
if (Files.deleteIfExists(elementToPurge.path)) {
prefetchingStatistics.blockRemovedFromFileCache();
blocks.remove(elementToPurge.blockNumber);
}
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
} finally {
elementToPurge.releaseLock(Entry.LockType.WRITE);
}
}
}

private static final Set<? extends OpenOption> CREATE_OPTIONS =
@@ -337,30 +486,36 @@ protected Path getCacheFilePath(final Configuration conf,
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
LOG.debug(getStats());
int numFilesDeleted = 0;
deleteCacheFiles();
}
}

for (Entry entry : blocks.values()) {
boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
/**
* Delete cache files as part of the close call.
*/
private void deleteCacheFiles() {
int numFilesDeleted = 0;
for (Entry entry : blocks.values()) {
boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
continue;
}
try {
Files.deleteIfExists(entry.path);
continue;
}
try {
if (Files.deleteIfExists(entry.path)) {
prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++;
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", entry.path, e);
} finally {
entry.releaseLock(Entry.LockType.WRITE);
}
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", entry.path, e);
} finally {
entry.releaseLock(Entry.LockType.WRITE);
}

LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
}
LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
}

@Override
TestBlockCache.java
@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
public void testArgChecks() throws Exception {
// Should not throw.
BlockCache cache =
new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), CONF);

ByteBuffer buffer = ByteBuffer.allocate(16);

@@ -55,15 +55,15 @@ public void testArgChecks() throws Exception {


intercept(NullPointerException.class, null,
() -> new SingleFilePerBlockCache(null));
() -> new SingleFilePerBlockCache(null, CONF));

}


@Test
public void testPutAndGet() throws Exception {
BlockCache cache =
new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), CONF);

ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
for (byte i = 0; i < BUFFER_SIZE; i++) {
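
Following up on the review thread above about testing the LRU logic directly, a rough sketch of such a unit test is below. It is an illustration only: the test name is made up, the LocalDirAllocator setup over hadoop.tmp.dir is an assumption, and it relies on the put(int, ByteBuffer, Configuration, LocalDirAllocator), get(int, ByteBuffer) and containsBlock(int) signatures used elsewhere in this patch, plus the imports and JUnit asserts already present in TestBlockCache.

  @Test
  public void testLruEvictionSketch() throws Exception {
    Configuration conf = new Configuration();
    // keep only 2 blocks so that the third put forces eviction of the LRU tail
    conf.setInt(SingleFilePerBlockCache.FS_PREFETCH_MAX_BLOCKS_COUNT, 2);
    BlockCache cache =
        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), conf);
    // assumption: hadoop.tmp.dir points at a writable local directory
    LocalDirAllocator allocator = new LocalDirAllocator("hadoop.tmp.dir");
    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

    cache.put(0, buffer, conf, allocator);
    cache.put(1, buffer, conf, allocator);
    // touch block 0 so that block 1 becomes the LRU tail
    cache.get(0, ByteBuffer.allocate(BUFFER_SIZE));
    // exceeds the capacity of 2, so the tail (block 1) should be evicted
    cache.put(2, buffer, conf, allocator);

    assertFalse("block 1 should have been evicted", cache.containsBlock(1));
    assertTrue("block 0 should still be cached", cache.containsBlock(0));
    assertTrue("block 2 should be cached", cache.containsBlock(2));

    cache.close();
  }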