Merged
@@ -45,6 +45,7 @@
 import com.nvidia.cuvs.CuVSMatrix;
 import com.nvidia.cuvs.CuVSResources;
 import com.nvidia.cuvs.SearchResults;
+import com.nvidia.cuvs.internal.common.CloseableRMMAllocation;
 import com.nvidia.cuvs.internal.panama.cuvsFilter;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -158,25 +159,28 @@ private IndexReference build(

     try (var resourcesAccessor = resources.access()) {
       long cuvsResources = resourcesAccessor.handle();
-      MemorySegment datasetMemorySegmentP = allocateRMMSegment(cuvsResources, datasetBytes);
+      try (var closeableDataMemorySegmentP = allocateRMMSegment(cuvsResources, datasetBytes)) {
+        MemorySegment datasetMemorySegmentP = closeableDataMemorySegmentP.handle();

-      cudaMemcpy(datasetMemorySegmentP, datasetMemSegment, datasetBytes, INFER_DIRECTION);
+        cudaMemcpy(datasetMemorySegmentP, datasetMemSegment, datasetBytes, INFER_DIRECTION);

-      long[] datasetShape = {rows, cols};
-      var tensorDataArena = Arena.ofShared();
-      MemorySegment datasetTensor =
-          prepareTensor(tensorDataArena, datasetMemorySegmentP, datasetShape, 2, 32, 2, 1);
+        long[] datasetShape = {rows, cols};
+        var tensorDataArena = Arena.ofShared();
+        MemorySegment datasetTensor =
+            prepareTensor(tensorDataArena, datasetMemorySegmentP, datasetShape, 2, 32, 2, 1);

-      var returnValue = cuvsStreamSync(cuvsResources);
-      checkCuVSError(returnValue, "cuvsStreamSync");
+        var returnValue = cuvsStreamSync(cuvsResources);
+        checkCuVSError(returnValue, "cuvsStreamSync");

-      returnValue = cuvsBruteForceBuild(cuvsResources, datasetTensor, 0, 0.0f, index);
-      checkCuVSError(returnValue, "cuvsBruteForceBuild");
+        returnValue = cuvsBruteForceBuild(cuvsResources, datasetTensor, 0, 0.0f, index);
+        checkCuVSError(returnValue, "cuvsBruteForceBuild");

-      returnValue = cuvsStreamSync(cuvsResources);
-      checkCuVSError(returnValue, "cuvsStreamSync");
+        returnValue = cuvsStreamSync(cuvsResources);
+        checkCuVSError(returnValue, "cuvsStreamSync");

-      return new IndexReference(datasetMemorySegmentP, datasetBytes, tensorDataArena, index);
+        closeableDataMemorySegmentP.release();
+        return new IndexReference(datasetMemorySegmentP, datasetBytes, tensorDataArena, index);
Contributor

I see a problem here: we pass the dataset "pointer" to IndexReference so that it can hold it and clean it up when we are done with the index (see destroyIndex()), so this will lead to a double free.
Your change is good only if we can determine that we don't need the dataset device memory after the index is built; however, I was not able to determine whether we need it or not. I think we might, as it might not be copied over again.

Contributor

Ah, I see now why you need "release": it's a way to work around that.
I think it is better to avoid it, and simply avoid using try-with-resources here.

Contributor Author
@mythrocks Aug 5, 2025

I think I haven't conveyed the utility of CloseableRMMAllocation properly.

Simple case (CagraIndexImpl, etc.)

Consider the following simplified (if contrived) case, representative of how allocateRMMSegment() is used in CagraIndexImpl, TieredIndexImpl, etc.:

// search()
{
  var queriesDP = allocateRMMSegment(...);
  // ...
  cudaMemcpy(...); // Can throw.
  checkCuVSError( cuvsCagraSearch(...) ); // Can also throw.
  // ...
  // And finally.
  cuvsRmmFree( queriesDP );
}

There are several throwable points between the alloc() and the free(). If any of them fire, queriesDP is leaked in __device__ memory. This is the simple case that CloseableRMMAllocation addresses.

The case for .release() (BruteForceIndexImpl)

Similar example as above, except that the Index adopts the allocation, and holds it until destroyIndex() is called.

// build()
{
  var datasetMemorySegmentP = allocateRMMSegment(...);
  // ...
  cudaMemcpy(...); // Can throw.
  checkCuVSError( cuvsBruteForceBuild(...) ); // Can also throw.
  // ...
  // And finally, commit.  No free().
  return new IndexReference( datasetMemorySegmentP, ... );
}

All the perils of the first example appear here as well; there are many throwable points between the allocation and the creation of the IndexReference. We need a way to clean up the memory allocation if there's any throw before the final commit (to IndexReference).
This is why we release() right before the return.

Not using try-with-resources would mean that we're still open to __device__ memory leaks.
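The ownership pattern argued for above can be shown end-to-end with a small, self-contained sketch. The real class is CloseableRMMAllocation in cuvs-java; the FakeDevice allocator, the CloseableAllocation name, and the byte counts below are stand-ins invented for illustration, so the close()/release() interaction can be run without a GPU:

```java
import java.util.concurrent.atomic.AtomicInteger;

// FakeDevice stands in for the RMM alloc/free calls: it just counts live allocations.
class FakeDevice {
    static final AtomicInteger live = new AtomicInteger();

    static long alloc(long bytes) {
        live.incrementAndGet();
        return bytes; // a fake "device pointer"
    }

    static void free(long ptr) {
        live.decrementAndGet();
    }
}

// Sketch of the CloseableRMMAllocation idea: close() frees the allocation
// unless release() was called first to transfer ownership elsewhere.
class CloseableAllocation implements AutoCloseable {
    private final long ptr;
    private boolean released;

    CloseableAllocation(long bytes) {
        this.ptr = FakeDevice.alloc(bytes);
    }

    long handle() {
        return ptr;
    }

    /** Relinquish ownership: close() becomes a no-op; returns the raw "pointer". */
    long release() {
        released = true;
        return ptr;
    }

    @Override
    public void close() {
        if (!released) {
            FakeDevice.free(ptr);
        }
    }
}

public class OwnershipDemo {
    public static void main(String[] args) {
        // Failure path: an exception between alloc and "commit" must not leak.
        try (var a = new CloseableAllocation(128)) {
            throw new RuntimeException("simulated cudaMemcpy failure");
        } catch (RuntimeException e) {
            // expected; try-with-resources already freed the allocation
        }
        System.out.println("live after failure: " + FakeDevice.live.get()); // 0

        // Success path: release() transfers ownership, so close() is a no-op.
        long adopted;
        try (var b = new CloseableAllocation(128)) {
            adopted = b.release(); // the "index" now owns the allocation
        }
        System.out.println("live after release: " + FakeDevice.live.get()); // 1
        FakeDevice.free(adopted); // the adopting owner frees it later, cf. destroyIndex()
        System.out.println("live at end: " + FakeDevice.live.get()); // 0
    }
}
```

On the failure path the wrapper frees the allocation; on the success path release() disarms close(), and the adopting owner (the index, in the real code) frees it later.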

Contributor
@ldematte Aug 6, 2025

Ah, I see. Still, I feel like release() is not clear; a more explicit way would be an old, simple try/catch:

var closeableDataMemorySegment = allocateRMMSegment(cuvsResources, datasetBytes);
try {
   MemorySegment datasetMemorySegment = closeableDataMemorySegment.handle();
   // use handle
   // here ownership is "transferred"
   return new IndexReference(closeableDataMemorySegment, ... );
} catch (Throwable t) {
   closeableDataMemorySegment.close();
   throw t;
}

let me see if I can think of a different pattern.

Contributor
@ldematte Aug 6, 2025

I think what you are trying can be seen as similar to C++ unique_ptr; the difference is that we don't have move semantics in Java (as a simple way to transfer ownership).
An alternative could be to implement release() closer to what C++ has, e.g. returning the enclosed object:

try (var closeableDataMemorySegment = allocateRMMSegment(cuvsResources, datasetBytes)) {
   MemorySegment datasetMemorySegment = closeableDataMemorySegment.handle();
   // use handle
   // here ownership is "transferred" more explicitly
   return new IndexReference(
         // release() relinquishes ownership and returns the "raw pointer"; pass it
         // immediately to a new CloseableRMMAllocation that will manage its lifetime
         new CloseableRMMAllocation(closeableDataMemorySegment.release()),
         ... );
}

While I think this models the "I am transferring ownership" idea more clearly, there is one drawback: what if the IndexReference or CloseableRMMAllocation constructors throw? That is not a danger here, since they are both no-throw constructors (just simple assignments), but it is still a little less robust.

I think I still like the explicit try/catch more.

Contributor

I want to highlight this though:

If any of them fire, queriesDP is leaked in __device__ memory

Leaking device memory seems particularly bad; but how bad is it actually? Would we be leaking it even after the process is gone, or will the OS/device driver be able to reclaim that memory (as it does for host memory)?

In any case, I'm calling in @ChrisHegarty to see if we can protect further against this possibility (using Cleaners, maybe?)
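A minimal sketch of the Cleaner idea floated above might look as follows. This is an assumption about how such a backstop could be wired up, not how cuvs-java currently works; fakeAlloc/fakeFree and GuardedAllocation are invented stand-ins for the RMM calls. Explicit close() stays the primary, deterministic path; the Cleaner only catches owners that become unreachable without being closed:

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a Cleaner-backed safety net that frees a (fake) device allocation
// if its owner is garbage-collected without close() having been called.
public class CleanerDemo {
    static final Cleaner CLEANER = Cleaner.create();
    static final AtomicInteger live = new AtomicInteger();

    // Stand-ins for the RMM alloc/free calls; they only count live allocations.
    static long fakeAlloc(long bytes) { live.incrementAndGet(); return bytes; }
    static void fakeFree(long ptr) { live.decrementAndGet(); }

    static final class GuardedAllocation implements AutoCloseable {
        // The cleanup action must NOT capture `this`: capturing the owner would
        // keep it reachable, and the cleaner would never fire.
        private record FreeAction(long ptr) implements Runnable {
            public void run() { fakeFree(ptr); }
        }

        private final long ptr;
        private final Cleaner.Cleanable cleanable;

        GuardedAllocation(long bytes) {
            this.ptr = fakeAlloc(bytes);
            this.cleanable = CLEANER.register(this, new FreeAction(ptr));
        }

        long handle() { return ptr; }

        @Override
        public void close() {
            cleanable.clean(); // idempotent: runs the action at most once
        }
    }

    public static void main(String[] args) {
        try (var a = new GuardedAllocation(256)) {
            // use a.handle() ...
        } // deterministic free via close(); the cleaner now has nothing to do
        System.out.println("live after close: " + live.get()); // 0
    }
}
```

The separate record for the cleanup action (rather than a lambda over instance state) is the key design point: Cleaner only helps if the registered action holds no reference back to the object it guards.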

Contributor Author
@mythrocks Aug 6, 2025

Similar to C++ unique_ptr...

Exactly right. That's what this is modeled around. This will turn into an RAII wrapper after I have moved the allocation into the constructor.

That's also the pattern we use in https://github.com/rapidsai/cudf, when we transfer ownership of the underlying __device__ memory in a CUDF column.

My initial version also had the pointer returned from release.

Edit: Looks like it's not just my initial version; release() does currently return the old pointer when relinquishing the memory. I didn't use it in BruteForceIndexImpl because the original pointer was already at hand in the same scope. This will get tighter once the RAII change is made.

Contributor Author

would be leaking it even after the process is gone, or the OS/device driver will be able to reclaim that memory (like for host memory)?

No, the leak does not persist beyond the lifetime of the process; the memory should be reclaimed after the process exits, yes.

But I would prefer not to assume that users of cuvs-java are short-running processes, and to require them to be tolerant of leaked memory. Even if current users might be alright with a leak in exceptional events, a future user might be a long-running application.

Contributor

I would like not to make an assumption that the users of cuvs-java are short-running processes

Definitely not; I was just trying to understand the worst-case scenario here, and whether we need anything extra beyond taking care of exceptions.

+      }
     } finally {
       omp_set_num_threads(1);
     }
@@ -205,15 +209,19 @@ public SearchResults search(BruteForceQuery cuvsQuery) throws Throwable {

     // prepare the prefiltering data
     final long prefilterDataLength;
+    final long prefilterBytes;
     final MemorySegment prefilterDataMemorySegment;
     BitSet[] prefilters = cuvsQuery.getPrefilters();
     if (prefilters != null && prefilters.length > 0) {
       BitSet concatenatedFilters = concatenate(prefilters, cuvsQuery.getNumDocs());
       long[] filters = concatenatedFilters.toLongArray();
       prefilterDataMemorySegment = buildMemorySegment(localArena, filters);
       prefilterDataLength = (long) cuvsQuery.getNumDocs() * prefilters.length;
+      long[] prefilterShape = {(prefilterDataLength + 31) / 32};
+      prefilterBytes = C_INT_BYTE_SIZE * prefilterShape[0];
     } else {
       prefilterDataLength = 0;
+      prefilterBytes = 0;
       prefilterDataMemorySegment = MemorySegment.NULL;
     }

@@ -223,77 +231,66 @@ public SearchResults search(BruteForceQuery cuvsQuery) throws Throwable {
     try (var resourcesAccessor = cuvsQuery.getResources().access()) {
       long cuvsResources = resourcesAccessor.handle();

-      long queriesBytes = C_FLOAT_BYTE_SIZE * numQueries * vectorDimension;
-      long neighborsBytes = C_LONG_BYTE_SIZE * numQueries * topk;
-      long distanceBytes = C_FLOAT_BYTE_SIZE * numQueries * topk;
-      long prefilterBytes = 0; // size assigned later
-
-      MemorySegment queriesDP = allocateRMMSegment(cuvsResources, queriesBytes);
-      MemorySegment neighborsDP = allocateRMMSegment(cuvsResources, neighborsBytes);
-      MemorySegment distancesDP = allocateRMMSegment(cuvsResources, distanceBytes);
-      MemorySegment prefilterDP = MemorySegment.NULL;
-
-      cudaMemcpy(queriesDP, querySeg, queriesBytes, INFER_DIRECTION);
-
-      long[] queriesShape = {numQueries, vectorDimension};
-      MemorySegment queriesTensor =
-          prepareTensor(localArena, queriesDP, queriesShape, 2, 32, 2, 1);
-      long[] neighborsShape = {numQueries, topk};
-      MemorySegment neighborsTensor =
-          prepareTensor(localArena, neighborsDP, neighborsShape, 0, 64, 2, 1);
-      long[] distancesShape = {numQueries, topk};
-      MemorySegment distancesTensor =
-          prepareTensor(localArena, distancesDP, distancesShape, 2, 32, 2, 1);
-
-      MemorySegment prefilter = cuvsFilter.allocate(localArena);
-      MemorySegment prefilterTensor;
-
-      if (prefilterDataMemorySegment == MemorySegment.NULL) {
-        cuvsFilter.type(prefilter, 0); // NO_FILTER
-        cuvsFilter.addr(prefilter, 0);
-      } else {
-        long[] prefilterShape = {(prefilterDataLength + 31) / 32};
-        long prefilterLen = prefilterShape[0];
-        prefilterBytes = C_INT_BYTE_SIZE * prefilterLen;
-
-        prefilterDP = allocateRMMSegment(cuvsResources, prefilterBytes);
-
-        cudaMemcpy(prefilterDP, prefilterDataMemorySegment, prefilterBytes, HOST_TO_DEVICE);
-
-        prefilterTensor = prepareTensor(localArena, prefilterDP, prefilterShape, 1, 32, 2, 1);
-
-        cuvsFilter.type(prefilter, 2);
-        cuvsFilter.addr(prefilter, prefilterTensor.address());
-      }
-
-      var returnValue = cuvsStreamSync(cuvsResources);
-      checkCuVSError(returnValue, "cuvsStreamSync");
-
-      returnValue =
-          cuvsBruteForceSearch(
-              cuvsResources,
-              bruteForceIndexReference.indexPtr,
-              queriesTensor,
-              neighborsTensor,
-              distancesTensor,
-              prefilter);
-      checkCuVSError(returnValue, "cuvsBruteForceSearch");
-
-      returnValue = cuvsStreamSync(cuvsResources);
-      checkCuVSError(returnValue, "cuvsStreamSync");
-
-      cudaMemcpy(neighborsMemorySegment, neighborsDP, neighborsBytes, INFER_DIRECTION);
-      cudaMemcpy(distancesMemorySegment, distancesDP, distanceBytes, INFER_DIRECTION);
-
-      returnValue = cuvsRMMFree(cuvsResources, neighborsDP, neighborsBytes);
-      checkCuVSError(returnValue, "cuvsRMMFree");
-      returnValue = cuvsRMMFree(cuvsResources, distancesDP, distanceBytes);
-      checkCuVSError(returnValue, "cuvsRMMFree");
-      returnValue = cuvsRMMFree(cuvsResources, queriesDP, queriesBytes);
-      checkCuVSError(returnValue, "cuvsRMMFree");
-      if (prefilterBytes > 0) {
-        returnValue = cuvsRMMFree(cuvsResources, prefilterDP, prefilterBytes);
-        checkCuVSError(returnValue, "cuvsRMMFree");
-      }
+      final long queriesBytes = C_FLOAT_BYTE_SIZE * numQueries * vectorDimension;
+      final long neighborsBytes = C_LONG_BYTE_SIZE * numQueries * topk;
+      final long distanceBytes = C_FLOAT_BYTE_SIZE * numQueries * topk;
+
+      try (var queriesDP = allocateRMMSegment(cuvsResources, queriesBytes);
+          var neighborsDP = allocateRMMSegment(cuvsResources, neighborsBytes);
+          var distancesDP = allocateRMMSegment(cuvsResources, distanceBytes);
+          var prefilterDP =
+              prefilterBytes > 0
+                  ? allocateRMMSegment(cuvsResources, prefilterBytes)
+                  : CloseableRMMAllocation.EMPTY) {
+
+        cudaMemcpy(queriesDP.handle(), querySeg, queriesBytes, INFER_DIRECTION);
+
+        long[] queriesShape = {numQueries, vectorDimension};
+        MemorySegment queriesTensor =
+            prepareTensor(localArena, queriesDP.handle(), queriesShape, 2, 32, 2, 1);
+        long[] neighborsShape = {numQueries, topk};
+        MemorySegment neighborsTensor =
+            prepareTensor(localArena, neighborsDP.handle(), neighborsShape, 0, 64, 2, 1);
+        long[] distancesShape = {numQueries, topk};
+        MemorySegment distancesTensor =
+            prepareTensor(localArena, distancesDP.handle(), distancesShape, 2, 32, 2, 1);
+
+        MemorySegment prefilter = cuvsFilter.allocate(localArena);
+        MemorySegment prefilterTensor;
+
+        if (prefilterDataMemorySegment == MemorySegment.NULL) {
+          cuvsFilter.type(prefilter, 0); // NO_FILTER
+          cuvsFilter.addr(prefilter, 0);
+        } else {
+          long[] prefilterShape = {(prefilterDataLength + 31) / 32};
+          cudaMemcpy(
+              prefilterDP.handle(), prefilterDataMemorySegment, prefilterBytes, HOST_TO_DEVICE);
+
+          prefilterTensor =
+              prepareTensor(localArena, prefilterDP.handle(), prefilterShape, 1, 32, 2, 1);
+
+          cuvsFilter.type(prefilter, 2);
+          cuvsFilter.addr(prefilter, prefilterTensor.address());
+        }
+
+        var returnValue = cuvsStreamSync(cuvsResources);
+        checkCuVSError(returnValue, "cuvsStreamSync");
+
+        returnValue =
+            cuvsBruteForceSearch(
+                cuvsResources,
+                bruteForceIndexReference.indexPtr,
+                queriesTensor,
+                neighborsTensor,
+                distancesTensor,
+                prefilter);
+        checkCuVSError(returnValue, "cuvsBruteForceSearch");
+
+        returnValue = cuvsStreamSync(cuvsResources);
+        checkCuVSError(returnValue, "cuvsStreamSync");
+
+        cudaMemcpy(neighborsMemorySegment, neighborsDP.handle(), neighborsBytes, INFER_DIRECTION);
+        cudaMemcpy(distancesMemorySegment, distancesDP.handle(), distanceBytes, INFER_DIRECTION);
+      }
     }
     return BruteForceSearchResults.create(