Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -667,39 +667,30 @@ public void rename2(final String src, final String dst,
public void concat(String trg, String[] src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

// See if the src and target files are all in the same namespace
LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
if (targetBlocks == null) {
throw new IOException("Cannot locate blocks for target file - " + trg);
}
LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
for (String source : src) {
LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
if (sourceBlocks == null) {
throw new IOException(
"Cannot located blocks for source file " + source);
}
String sourceBlockPoolId =
sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
throw new IOException("Cannot concatenate source file " + source
+ " because it is located in a different namespace"
+ " with block pool id " + sourceBlockPoolId
+ " from the target file with block pool id "
+ targetBlockPoolId);
}
// Concat only effects when all files in the same namespace.
RemoteResult<RemoteLocation, HdfsFileStatus> trgResult = getFileRemoteResult(trg);
if (trgResult.getResult() == null) {
throw new IOException("Cannot find target file - " + trg);
}
RemoteLocation targetDestination = trgResult.getLocation();
String targetNameService = trgResult.getLocation().getNameserviceId();

// Find locations in the matching namespace.
final RemoteLocation targetDestination =
rpcServer.getLocationForPath(trg, true, targetBlockPoolId);
String[] sourceDestinations = new String[src.length];
for (int i = 0; i < src.length; i++) {
String sourceFile = src[i];
RemoteLocation location =
rpcServer.getLocationForPath(sourceFile, true, targetBlockPoolId);
sourceDestinations[i] = location.getDest();
RemoteResult<RemoteLocation, HdfsFileStatus> srcResult = getFileRemoteResult(sourceFile);
if (srcResult.getResult() == null) {
throw new IOException("Cannot find source file - " + sourceFile);
}
RemoteLocation srcLocation = srcResult.getLocation();
sourceDestinations[i] = srcLocation.getDest();

if (!targetNameService.equals(srcLocation.getNameserviceId())) {
throw new IOException("Cannot concatenate source file " + sourceFile
+ " because it is located in a different namespace" + " with nameservice "
+ srcLocation.getNameserviceId() + " from the target file with nameservice "
+ targetNameService);
}
}
// Invoke
RemoteMethod method = new RemoteMethod("concat",
Expand Down Expand Up @@ -1009,6 +1000,20 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
return ret;
}

public RemoteResult<RemoteLocation, HdfsFileStatus> getFileRemoteResult(String path)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);

final List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false, false);
RemoteMethod method =
new RemoteMethod("getFileInfo", new Class<?>[] {String.class}, new RemoteParam());
// Check for file information sequentially
RemoteResult<RemoteLocation, HdfsFileStatus> result =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If locations only contains one namespace, we can returns this namespace directly instead of getting the namespace through getFileInfo, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be true. Even locations only contains one namespace, it still cannot decide whether the file exists or not. So getFileInfo is better to execute at least once. Or it will send to namenode and namenode throw file is not found.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RBF is simply responsible for locating the downstream namespace and then proxying the request.
So if the input path is only mounted to one namespace, RBF only needs to proxy it directly. RBF does not need to check if the file exists in this only one downstream namespace, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

rpcClient.invokeSequential(method, locations, HdfsFileStatus.class, null);

return result;
}

@Override
public boolean isFileClosed(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,42 +1697,6 @@ public Long getNextSPSPath() throws IOException {
return nnProto.getNextSPSPath();
}

/**
* Locate the location with the matching block pool id.
*
* @param path Path to check.
* @param failIfLocked Fail the request if locked (top mount point).
* @param blockPoolId The block pool ID of the namespace to search for.
* @return Prioritized list of locations in the federated cluster.
* @throws IOException if the location for this path cannot be determined.
*/
protected RemoteLocation getLocationForPath(
String path, boolean failIfLocked, String blockPoolId)
throws IOException {

final List<RemoteLocation> locations =
getLocationsForPath(path, failIfLocked);

String nameserviceId = null;
Set<FederationNamespaceInfo> namespaces =
this.namenodeResolver.getNamespaces();
for (FederationNamespaceInfo namespace : namespaces) {
if (namespace.getBlockPoolId().equals(blockPoolId)) {
nameserviceId = namespace.getNameserviceId();
break;
}
}
if (nameserviceId != null) {
for (RemoteLocation location : locations) {
if (location.getNameserviceId().equals(nameserviceId)) {
return location;
}
}
}
throw new IOException(
"Cannot locate a nameservice for block pool " + blockPoolId);
}

/**
* Get the possible locations of a path in the federated cluster.
* During the get operation, it will do the quota verification.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,17 @@ public void testProxyConcatFile() throws Exception {
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, new String[] {routerFile}});

// Test when concat trg is a empty file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to check the empty source file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there is a empty source file, it will throw exception in namenode. This behaiver is as same as with dfsrouter.
And when trg is a empty file, it is diffrent . Without dfsrouter ,it success. And with dfsrouter, it will throw Exception.
So I think there is no need to check the empty source file. Or implement it in another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the namenode and rbf throw the same Exception?

Maybe RBF throws NPE, but NN throws org.apache.hadoop.HadoopIllegalArgumentException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you modify the UT to cover the case that one or more source files are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test for a empty src file

createFile(routerFS, existingFile, existingFileSize);
String sameRouterEmptyFile =
cluster.getFederatedTestDirectoryForNS(sameNameservice) +
"_newemptyfile";
createFile(routerFS, sameRouterEmptyFile, 0);
// Concat in same namespaces, succeeds
testConcat(existingFile, sameRouterEmptyFile, false);
FileStatus mergedStatus = getFileStatus(routerFS, sameRouterEmptyFile);
assertEquals(existingFileSize, mergedStatus.getLen());
}

@Test
Expand Down