Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,11 @@ public boolean hasPathCapability(final Path path, final String capability)
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
listener));

// probe for presence of HADOOP-18546 fix.
case "hadoop-18546":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming the probe on a Hadoop Jira makes it a little difficult to understand it from the code directly. Should we have a general name for the probe related to the prefetch inconsistent reads and have the Hadoop jira mentioned in the comments only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was trying to think of one but it is a "is this fix in". that way, if we need to add another we can ask for that as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but having a probe for the fix "fs.azure.readahead.safe" would let us move to a different fix while retaining the same probe...

return true;

default:
return super.hasPathCapability(p, capability);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,9 +828,10 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append("[HADOOP-18546]");
if (streamStatistics != null) {
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append(streamStatistics.toString());
sb.append(", ").append(streamStatistics);
sb.append("}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closing bracket of the log should be outside the statistics if block

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.
append(","), should also move inside if block ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, didn't understand at first, but yes, you are absolutely correct!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mehakmeet's comment is still vaild I guess.
Shouldn't sb.append("}"); L835 go outside of if block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, fixing

}
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private void init() {

// hide instance constructor
private ReadBufferManager() {
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,23 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;

public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {

/**
* Time before the JUnit test times out for eventually() clauses
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: causes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. eventually takes a lambda expression, which is what i was referring to

* to fail. This copes with slow network connections and debugging
* sessions, yet still allows for tests to fail with meaningful
* messages.
*/
public static final int TIMEOUT_OFFSET = 5 * 60_000;

/**
* Interval between eventually preobes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "probes"

*/
public static final int PROBE_INTERVAL_MILLIS = 1_000;

public ITestReadBufferManager() throws Exception {
}

Expand All @@ -61,6 +75,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
// verify that the fs has the capability to validate the fix
Assertions.assertThat(fs.hasPathCapability(new Path("/"), "HADOOP-18546"))
.describedAs("path capability \"HADOOP-18546\" in %s", fs)
.isTrue();

try {
for (int i = 0; i < 4; i++) {
final String fileName = methodName.getMethodName() + i;
Expand All @@ -80,9 +99,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}

ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
// verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
// readahead queue is empty
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
// verify the in progress list eventually empties out.
eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
}

private void assertListEmpty(String listName, List<ReadBuffer> list) {
Expand Down