Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.Table;
Expand All @@ -79,7 +80,11 @@
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.recon.api.handlers.BucketHandler;
import org.apache.hadoop.ozone.recon.api.handlers.EntityHandler;
import org.apache.hadoop.ozone.recon.api.ServiceNotReadyException;
import org.apache.hadoop.ozone.recon.api.types.NSSummary;
import org.apache.hadoop.ozone.recon.api.types.DUResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
Expand Down Expand Up @@ -179,102 +185,133 @@ public OMDBInsightEndpoint(OzoneStorageContainerManager reconSCM,
@Path("/open")
public Response getOpenKeyInfo(
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
int limit,
int limit,
@DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_PREVKEY)
String prevKey,
@DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_FSO)
@QueryParam(RECON_OPEN_KEY_INCLUDE_FSO)
boolean includeFso,
@DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_NON_FSO)
@QueryParam(RECON_OPEN_KEY_INCLUDE_NON_FSO)
boolean includeNonFso) {
String prevKey,
@DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_START_PREFIX)
String startPrefix,
@DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_FSO) @QueryParam(RECON_OPEN_KEY_INCLUDE_FSO)
boolean includeFso,
@DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_NON_FSO) @QueryParam(RECON_OPEN_KEY_INCLUDE_NON_FSO)
boolean includeNonFso) {

KeyInsightInfoResponse openKeyInsightInfo = new KeyInsightInfoResponse();
List<KeyEntityInfo> nonFSOKeyInfoList =
openKeyInsightInfo.getNonFSOKeyInfoList();

boolean skipPrevKeyDone = false;
boolean isLegacyBucketLayout = true;
boolean recordsFetchedLimitReached = false;

String lastKey = "";
List<KeyEntityInfo> fsoKeyInfoList = openKeyInsightInfo.getFsoKeyInfoList();
for (BucketLayout layout : Arrays.asList(
BucketLayout.LEGACY, BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
isLegacyBucketLayout = (layout == BucketLayout.LEGACY);
// Skip bucket iteration based on parameters includeFso and includeNonFso
if ((!includeFso && !isLegacyBucketLayout) ||
(!includeNonFso && isLegacyBucketLayout)) {
continue;

try {
long replicatedTotal = 0;
long unreplicatedTotal = 0;
boolean skipPrevKeyDone = false; // Tracks if prevKey was used earlier
boolean keysFound = false; // Flag to track if any keys are found
String lastKey = null;
Map<String, OmKeyInfo> obsKeys = Collections.emptyMap();
Map<String, OmKeyInfo> fsoKeys = Collections.emptyMap();

// Validate startPrefix if it's provided
if (isNotBlank(startPrefix) && !validateStartPrefix(startPrefix)) {
return createBadRequestResponse("Invalid startPrefix: Path must be at the bucket level or deeper.");
}

Table<String, OmKeyInfo> openKeyTable =
omMetadataManager.getOpenKeyTable(layout);
try (
TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = openKeyTable.iterator()) {
boolean skipPrevKey = false;
String seekKey = prevKey;
if (!skipPrevKeyDone && isNotBlank(prevKey)) {
skipPrevKey = true;
Table.KeyValue<String, OmKeyInfo> seekKeyValue =
keyIter.seek(seekKey);
// check if RocksDB was able to seek correctly to the given key prefix
// if not, then return empty result
// In case of an empty prevKeyPrefix, all the keys are returned
if (seekKeyValue == null ||
(isNotBlank(prevKey) &&
!seekKeyValue.getKey().equals(prevKey))) {
continue;
}
// Use searchOpenKeys logic with adjustments for FSO and Non-FSO filtering
if (includeNonFso) {
// Search for non-FSO keys in KeyTable
Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY);
obsKeys = extractKeysFromTable(openKeyTable, startPrefix, limit, prevKey);
for (Map.Entry<String, OmKeyInfo> entry : obsKeys.entrySet()) {
keysFound = true;
skipPrevKeyDone = true; // Don't use the prevKey for the file table
KeyEntityInfo keyEntityInfo = createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue());
openKeyInsightInfo.getNonFSOKeyInfoList().add(keyEntityInfo); // Add to non-FSO list
replicatedTotal += entry.getValue().getReplicatedSize();
unreplicatedTotal += entry.getValue().getDataSize();
lastKey = entry.getKey(); // Update lastKey
}
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
String key = kv.getKey();
lastKey = key;
OmKeyInfo omKeyInfo = kv.getValue();
// skip the prev key if prev key is present
if (skipPrevKey && key.equals(prevKey)) {
skipPrevKeyDone = true;
continue;
}
KeyEntityInfo keyEntityInfo = new KeyEntityInfo();
keyEntityInfo.setIsKey(omKeyInfo.isFile());
keyEntityInfo.setKey(key);
keyEntityInfo.setPath(omKeyInfo.getKeyName());
keyEntityInfo.setInStateSince(omKeyInfo.getCreationTime());
keyEntityInfo.setSize(omKeyInfo.getDataSize());
keyEntityInfo.setReplicatedSize(omKeyInfo.getReplicatedSize());
keyEntityInfo.setReplicationConfig(omKeyInfo.getReplicationConfig());
openKeyInsightInfo.setUnreplicatedDataSize(
openKeyInsightInfo.getUnreplicatedDataSize() +
keyEntityInfo.getSize());
openKeyInsightInfo.setReplicatedDataSize(
openKeyInsightInfo.getReplicatedDataSize() +
keyEntityInfo.getReplicatedSize());
boolean added =
isLegacyBucketLayout ? nonFSOKeyInfoList.add(keyEntityInfo) :
fsoKeyInfoList.add(keyEntityInfo);
if ((nonFSOKeyInfoList.size() + fsoKeyInfoList.size()) == limit) {
recordsFetchedLimitReached = true;
break;
}
}

if (includeFso) {
// Search for FSO keys in FileTable
// If prevKey was used for non-FSO keys, skip it for FSO keys.
String effectivePrevKey = skipPrevKeyDone ? "" : prevKey;
// If limit = -1 then we need to fetch all keys without limit
int effectiveLimit = limit == -1 ? limit : limit - obsKeys.size();
fsoKeys = searchOpenKeysInFSO(startPrefix, effectiveLimit, effectivePrevKey);
for (Map.Entry<String, OmKeyInfo> entry : fsoKeys.entrySet()) {
keysFound = true;
KeyEntityInfo keyEntityInfo = createKeyEntityInfoFromOmKeyInfo(entry.getKey(), entry.getValue());
openKeyInsightInfo.getFsoKeyInfoList().add(keyEntityInfo); // Add to FSO list
replicatedTotal += entry.getValue().getReplicatedSize();
unreplicatedTotal += entry.getValue().getDataSize();
lastKey = entry.getKey(); // Update lastKey
}
} catch (IOException ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
} catch (IllegalArgumentException e) {
throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
} catch (Exception ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
}
if (recordsFetchedLimitReached) {
break;

// If no keys were found, return a response indicating that no keys matched
if (!keysFound) {
return noMatchedKeysResponse(startPrefix);
}

// Set the aggregated totals in the response
openKeyInsightInfo.setReplicatedDataSize(replicatedTotal);
openKeyInsightInfo.setUnreplicatedDataSize(unreplicatedTotal);
openKeyInsightInfo.setLastKey(lastKey);

// Return the response with the matched keys and their data sizes
return Response.ok(openKeyInsightInfo).build();
} catch (IOException e) {
// Handle IO exceptions and return an internal server error response
return createInternalServerErrorResponse("Error searching open keys in OM DB: " + e.getMessage());
} catch (IllegalArgumentException e) {
// Handle illegal argument exceptions and return a bad request response
return createBadRequestResponse("Invalid argument: " + e.getMessage());
}
}

openKeyInsightInfo.setLastKey(lastKey);
return Response.ok(openKeyInsightInfo).build();
public Map<String, OmKeyInfo> searchOpenKeysInFSO(String startPrefix,
int limit, String prevKey)
throws IOException, IllegalArgumentException {
Map<String, OmKeyInfo> matchedKeys = new LinkedHashMap<>();
// Convert the search prefix to an object path for FSO buckets
String startPrefixObjectPath =
convertToObjectPath(startPrefix, omMetadataManager, reconNamespaceSummaryManager, reconSCM);
String[] names = parseRequestPath(startPrefixObjectPath);
Table<String, OmKeyInfo> openFileTable =
omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED);

// If names.length <= 2, then the search prefix is at the volume or bucket level hence
// no need to find parent or extract id's or find subpaths as the openFileTable is
// suitable for volume and bucket level search
if (names.length > 2 && startPrefixObjectPath.endsWith(OM_KEY_PREFIX)) {
// Fetch the parent ID to search for
long parentId = Long.parseLong(names[names.length - 1]);

// Fetch the nameSpaceSummary for the parent ID
NSSummary parentSummary = reconNamespaceSummaryManager.getNSSummary(parentId);
if (parentSummary == null) {
return matchedKeys;
}
List<String> subPaths = new ArrayList<>();
// Add the initial search prefix object path because it can have both openFiles
// and subdirectories with openFiles
subPaths.add(startPrefixObjectPath);

// Recursively gather all subpaths
gatherSubPaths(parentId, subPaths, Long.parseLong(names[0]), Long.parseLong(names[1]),
reconNamespaceSummaryManager);

// Iterate over the subpaths and retrieve the open files
for (String subPath : subPaths) {
matchedKeys.putAll(
extractKeysFromTable(openFileTable, subPath, limit - matchedKeys.size(), prevKey));
if (matchedKeys.size() >= limit) {
break;
}
}
return matchedKeys;
}

// If the search level is at the volume, bucket or key level, directly search the openFileTable
matchedKeys.putAll(
extractKeysFromTable(openFileTable, startPrefixObjectPath, limit, prevKey));
return matchedKeys;
}

/**
Expand Down Expand Up @@ -1265,8 +1302,7 @@ private KeyEntityInfo createKeyEntityInfoFromOmKeyInfo(String dbKey,
KeyEntityInfo keyEntityInfo = new KeyEntityInfo();
keyEntityInfo.setKey(dbKey); // Set the DB key
keyEntityInfo.setIsKey(keyInfo.isFile());
keyEntityInfo.setPath(ReconUtils.constructFullPath(keyInfo, reconNamespaceSummaryManager,
omMetadataManager));
keyEntityInfo.setPath(ReconUtils.constructFullPath(keyInfo, reconNamespaceSummaryManager, omMetadataManager));
keyEntityInfo.setSize(keyInfo.getDataSize());
keyEntityInfo.setCreationTime(keyInfo.getCreationTime());
keyEntityInfo.setModificationTime(keyInfo.getModificationTime());
Expand All @@ -1284,6 +1320,20 @@ private void createSummaryForDeletedDirectories(
dirSummary.put("totalDeletedDirectories", deletedDirCount);
}

private boolean validateStartPrefix(String startPrefix) {

// Ensure startPrefix starts with '/' for non-empty values
startPrefix = startPrefix.startsWith("/") ? startPrefix : "/" + startPrefix;

// Split the path to ensure it's at least at the bucket level (volume/bucket).
String[] pathComponents = startPrefix.split("/");
if (pathComponents.length < 3 || pathComponents[2].isEmpty()) {
return false; // Invalid if not at bucket level or deeper
}

return true;
}

private String createPath(OmKeyInfo omKeyInfo) {
return omKeyInfo.getVolumeName() + OM_KEY_PREFIX +
omKeyInfo.getBucketName() + OM_KEY_PREFIX + omKeyInfo.getKeyName();
Expand Down
Loading