Skip to content
188 changes: 156 additions & 32 deletions python/cuvs_bench/cuvs_bench/generate_groundtruth/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,206 @@
# limitations under the License.
#
import argparse
import importlib
import os
import sys
import warnings

import cupy as cp
import numpy as np
import rmm
from pylibraft.common import DeviceResources
from rmm.allocators.cupy import rmm_cupy_allocator
from .utils import memmap_bin_file, suffix_from_dtype, write_bin

from cuvs.neighbors.brute_force import build, search

from .utils import memmap_bin_file, suffix_from_dtype, write_bin
def import_with_fallback(primary_lib, secondary_lib=None, alias=None):
"""
Attempt to import a primary library, with an optional fallback to a
secondary library.
Optionally assigns the imported module to a global alias.

Parameters
----------
primary_lib : str
Name of the primary library to import.
secondary_lib : str, optional
Name of the secondary library to use as a fallback. If `None`,
no fallback is attempted.
alias : str, optional
Alias to assign the imported module globally.

Returns
-------
module or None
The imported module if successful; otherwise, `None`.

Examples
--------
>>> xp = import_with_fallback('cupy', 'numpy')
>>> mod = import_with_fallback('nonexistent_lib')
>>> if mod is None:
... print("Library not found.")
"""
try:
module = importlib.import_module(primary_lib)
except ImportError:
if secondary_lib is not None:
try:
module = importlib.import_module(secondary_lib)
except ImportError:
module = None
else:
module = None
if alias and module is not None:
globals()[alias] = module
return module


xp = import_with_fallback("cupy", "numpy")
rmm = import_with_fallback("rmm")

if rmm is not None:
gpu_system = True
from pylibraft.common import DeviceResources
from rmm.allocators.cupy import rmm_cupy_allocator
else:
gpu_system = False
warnings.warn(
"Consider using a GPU-based system to greatly accelerate "
" generating groundtruths using cuVS."
)


def generate_random_queries(n_queries, n_features, dtype=np.float32):
def generate_random_queries(n_queries, n_features, dtype=xp.float32):
print("Generating random queries")
if np.issubdtype(dtype, np.integer):
queries = cp.random.randint(
if xp.issubdtype(dtype, xp.integer):
queries = xp.random.randint(
0, 255, size=(n_queries, n_features), dtype=dtype
)
else:
queries = cp.random.uniform(size=(n_queries, n_features)).astype(dtype)
queries = xp.random.uniform(size=(n_queries, n_features)).astype(dtype)
return queries


def choose_random_queries(dataset, n_queries):
print("Choosing random vector from dataset as query vectors")
query_idx = np.random.choice(
query_idx = xp.random.choice(
dataset.shape[0], size=(n_queries,), replace=False
)
return dataset[query_idx, :]


def cpu_search(dataset, queries, k, metric="squeclidean"):
"""
Find the k nearest neighbors for each query point in the dataset using the
specified metric.

Parameters
----------
dataset : numpy.ndarray
An array of shape (n_samples, n_features) representing the dataset.
queries : numpy.ndarray
An array of shape (n_queries, n_features) representing the query
points.
k : int
The number of nearest neighbors to find.
metric : str, optional
The distance metric to use. Can be 'squeclidean' or 'inner_product'.
Default is 'squeclidean'.

Returns
-------
distances : numpy.ndarray
An array of shape (n_queries, k) containing the distances
(for 'squeclidean') or similarities
(for 'inner_product') to the k nearest neighbors for each query.
indices : numpy.ndarray
An array of shape (n_queries, k) containing the indices of the
k nearest neighbors in the dataset for each query.

"""
if metric == "squeclidean":
diff = queries[:, xp.newaxis, :] - dataset[xp.newaxis, :, :]
dist_sq = xp.sum(diff**2, axis=2) # Shape: (n_queries, n_samples)

indices = xp.argpartition(dist_sq, kth=k - 1, axis=1)[:, :k]
distances = xp.take_along_axis(dist_sq, indices, axis=1)

sorted_idx = xp.argsort(distances, axis=1)
distances = xp.take_along_axis(distances, sorted_idx, axis=1)
indices = xp.take_along_axis(indices, sorted_idx, axis=1)

elif metric == "inner_product":
similarities = xp.dot(
queries, dataset.T
) # Shape: (n_queries, n_samples)

neg_similarities = -similarities
indices = xp.argpartition(neg_similarities, kth=k - 1, axis=1)[:, :k]
distances = xp.take_along_axis(similarities, indices, axis=1)

sorted_idx = xp.argsort(-distances, axis=1)
Comment thread
divyegala marked this conversation as resolved.

else:
raise ValueError(
"Unsupported metric in cuvs-bench-cpu. "
"Use 'squeclidean' or 'inner_product' or use the GPU package"
"to use any distance supported by cuVS."
)

distances = xp.take_along_axis(distances, sorted_idx, axis=1)
indices = xp.take_along_axis(indices, sorted_idx, axis=1)

return distances, indices


def calc_truth(dataset, queries, k, metric="sqeuclidean"):
resources = DeviceResources()
n_samples = dataset.shape[0]
n = 500000 # batch size for processing neighbors
i = 0
indices = None
distances = None
queries = cp.asarray(queries, dtype=cp.float32)
queries = xp.asarray(queries, dtype=xp.float32)

if gpu_system:
from cuvs.neighbors.brute_force import build, search

resources = DeviceResources()

while i < n_samples:
print("Step {0}/{1}:".format(i // n, n_samples // n))
n_batch = n if i + n <= n_samples else n_samples - i

X = cp.asarray(dataset[i : i + n_batch, :], cp.float32)
X = xp.asarray(dataset[i : i + n_batch, :], xp.float32)

index = build(X, metric=metric, resources=resources)
D, Ind = search(index, queries, k, resources=resources)
resources.sync()
if gpu_system:
index = build(X, metric=metric, resources=resources)
D, Ind = search(index, queries, k, resources=resources)
resources.sync()
else:
D, Ind = cpu_search(X, queries, metric=metric)

D, Ind = cp.asarray(D), cp.asarray(Ind)
D, Ind = xp.asarray(D), xp.asarray(Ind)
Ind += i # shift neighbor index by offset i

if distances is None:
distances = D
indices = Ind
else:
distances = cp.concatenate([distances, D], axis=1)
indices = cp.concatenate([indices, Ind], axis=1)
idx = cp.argsort(distances, axis=1)[:, :k]
distances = cp.take_along_axis(distances, idx, axis=1)
indices = cp.take_along_axis(indices, idx, axis=1)
distances = xp.concatenate([distances, D], axis=1)
indices = xp.concatenate([indices, Ind], axis=1)
idx = xp.argsort(distances, axis=1)[:, :k]
distances = xp.take_along_axis(distances, idx, axis=1)
indices = xp.take_along_axis(indices, idx, axis=1)

i += n_batch

return distances, indices


def main():
pool = rmm.mr.PoolMemoryResource(
rmm.mr.CudaMemoryResource(), initial_pool_size=2**30
)
rmm.mr.set_current_device_resource(pool)
cp.cuda.set_allocator(rmm_cupy_allocator)
if gpu_system:
pool = rmm.mr.PoolMemoryResource(
rmm.mr.CudaMemoryResource(), initial_pool_size=2**30
)
rmm.mr.set_current_device_resource(pool)
xp.cuda.set_allocator(rmm_cupy_allocator)

parser = argparse.ArgumentParser(
prog="generate_groundtruth",
Expand Down Expand Up @@ -197,7 +321,7 @@ def main():
"Dataset size {:6.1f} GB, shape {}, dtype {}".format(
dataset.size * dataset.dtype.itemsize / 1e9,
dataset.shape,
np.dtype(dtype),
xp.dtype(dtype),
)
)

Expand Down Expand Up @@ -230,11 +354,11 @@ def main():

write_bin(
os.path.join(args.output, "groundtruth.neighbors.ibin"),
indices.astype(np.uint32),
indices.astype(xp.uint32),
)
write_bin(
os.path.join(args.output, "groundtruth.distances.fbin"),
distances.astype(np.float32),
distances.astype(xp.float32),
)


Expand Down