Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public FSDataOutputStream create(final Path f,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withBlockFactory(blockFactory)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withPosition(position)
Expand Down Expand Up @@ -1940,6 +1940,11 @@ void setClient(AbfsClient client) {
this.client = client;
}

/**
 * Accessor for the block factory held in the {@code blockFactory} field.
 * Package-private and annotated for test visibility so that tests can
 * intercept the factory (for example by stubbing this method on a spy).
 *
 * @return the current value of the {@code blockFactory} field.
 */
@VisibleForTesting
DataBlocks.BlockFactory getBlockFactory() {
  return this.blockFactory;
}

@VisibleForTesting
void setNamespaceEnabled(Trilean isNamespaceEnabled){
this.isNamespaceEnabled = isNamespaceEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
return null;
} finally {
IOUtils.close(blockUploadData);
blockToUpload.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, because the line above calls BlockUploadData.close(), a lot of the cleanup should take place already; that .startUpload() call on L315 sets the blockToUpload.buffer ref to null, so that reference doesn't retain a hold on the bytebuffer.

this is why we haven't seen problems yet like OOM/disk storage...cleanup was happening.

can you review the code to make sure the sequence of L348 always does the right thing and not fail due to some attempted double delete of the file...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for the comment.

can you review the code to make sure the sequence of L348 always does the right thing and not fail due to some attempted double delete of the file...:

  1. disk:
    1. BlockUploadData.close() -> deletes the file:
      LOG.debug("File deleted in BlockUploadData close: {}", file.delete());
    2. blockToUpload.close() -> innerClose() -> closeBlock() -> tries to delete the file again (java.io.File#delete returns true if the path held by the object was deleted successfully, false otherwise; ref: https://docs.oracle.com/javase/8/docs/api/java/io/File.html#delete--):
  2. bytebuffer:
    1. What BlockUploadData contains: ByteBufferInputStream of blockBuffer :
    2. BlockUploadData.close() -> closes the ByteBufferInputStream: -> unsets the bytebuffer variable in the inputStream:
    3. BlockToUpload.close() -> releases the buffer : -> adds back the buffer in pool:
  3. ByteArrayBlock:
    1. BlockUploadData.close() -> closes the ByteArrayInputStream (it is a wrapper around the buffer array): does nothing.
    2. BlockToUpload.close() -> does nothing:
      protected void innerClose() {
      buffer = null;
      blockReleased();
      }

To check that this flow works correctly for all types of data buffers, I have made a change in testCloseOfDataBlockOnAppendComplete. This will also help keep a sanity check in place for any future change in the data-buffer code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is safe in the case of the file delete, as java.io.File#delete will return false if the file is not there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add blockToUpload on L347 alongside blockUploadData, for consistency and so that it is also checked for null.

}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,28 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.store.BlockUploadStatistics;
import org.apache.hadoop.fs.store.DataBlocks;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;

/**
* Test append operations.
Expand Down Expand Up @@ -90,4 +103,38 @@ public void testTracingForAppend() throws IOException {
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, 10);
}

@Test
public void testCloseOfDataBlockOnAppendComplete() throws Exception {
Set<String> blockBufferTypes = new HashSet<>();
blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
for (String blockBufferType : blockBufferTypes) {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
Mockito.doAnswer(getBlobFactoryInvocation -> {
DataBlocks.BlockFactory factory = Mockito.spy(
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
Mockito.doAnswer(factoryCreateInvocation -> {
dataBlock[0] = Mockito.spy(
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
return dataBlock[0];
})
.when(factory)
.create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
BlockUploadStatistics.class));
return factory;
}).when(store).getBlockFactory();
OutputStream os = fs.create(new Path("/file_" + blockBufferType));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use try-with-resources or try-finally to close the stream (on the off chance that we see a failure after creating it, we still clean up).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use "getMethodName()" instead of hardcoding "/file"

os.write(new byte[1]);
os.close();
Mockito.verify(dataBlock[0], Mockito.times(1)).close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an assertion here that dataBlock[0] is closed.
*Suggestion: getState() can be used to verify that the block is in the Closed state (the getter may need to be made public). We can even add an assertion after L135 to assert that it is in the Writing state as well.

}
}
}