Skip to content

Commit 1a0a1d4

Browse files
authored
Add Helper to Create NumPy Files (#1645)
This PR introduces `create_numpy_file` in the file_io.hpp header. It creates NumPy files with pre-allocated space and writes the appropriate header based on the specified shape and data type. This removes duplicated code in CAGRA ACE in `cagra_build.cuh`. It addresses one follow-up item of #1486. CC @tfeher. Authors: Julian Miller (https://github.com/julianmi). Approvers: Tamas Bela Feher (https://github.com/tfeher). URL: #1645
1 parent 27ca73c commit 1a0a1d4

2 files changed

Lines changed: 64 additions & 124 deletions

File tree

cpp/include/cuvs/util/file_io.hpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
/*
2-
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
2+
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION.
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55
#pragma once
66

77
#include <raft/core/error.hpp>
8+
#include <raft/core/serialize.hpp>
89

910
#include <algorithm>
1011
#include <cstring>
1112
#include <istream>
1213
#include <limits.h>
1314
#include <memory>
1415
#include <ostream>
16+
#include <sstream>
1517
#include <streambuf>
1618
#include <string>
1719
#include <utility>
@@ -166,6 +168,58 @@ class file_descriptor {
166168
std::string path_;
167169
};
168170

171+
/**
172+
* @brief Create a numpy file with pre-allocated space and write the header.
173+
*
174+
* Opens a file, writes a numpy header for the given shape/dtype, and pre-allocates
175+
* space for the data. This is useful for memory-mapped or streaming writes.
176+
*
177+
* @tparam T Data type for the numpy array
178+
* @param path File path to create
179+
* @param shape Shape of the numpy array (e.g., {rows, cols} for 2D)
180+
* @return Pair of (file_descriptor, header_size)
181+
*/
182+
template <typename T>
183+
std::pair<file_descriptor, size_t> create_numpy_file(const std::string& path,
184+
const std::vector<size_t>& shape)
185+
{
186+
// Open file
187+
file_descriptor fd(path, O_CREAT | O_RDWR | O_TRUNC, 0644);
188+
189+
// Build header
190+
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<T>();
191+
const bool fortran_order = false;
192+
const raft::detail::numpy_serializer::header_t header = {dtype, fortran_order, shape};
193+
194+
std::stringstream ss;
195+
raft::detail::numpy_serializer::write_header(ss, header);
196+
std::string header_str = ss.str();
197+
size_t header_size = header_str.size();
198+
199+
// Calculate data size from shape
200+
size_t data_bytes = sizeof(T);
201+
for (auto dim : shape) {
202+
data_bytes *= dim;
203+
}
204+
205+
// Pre-allocate file space
206+
if (posix_fallocate(fd.get(), 0, header_size + data_bytes) != 0) {
207+
RAFT_FAIL("Failed to pre-allocate space for file: %s", path.c_str());
208+
}
209+
210+
// Seek to beginning and write header
211+
if (lseek(fd.get(), 0, SEEK_SET) == -1) {
212+
RAFT_FAIL("Failed to seek to beginning of file: %s", path.c_str());
213+
}
214+
215+
ssize_t written = write(fd.get(), header_str.data(), header_str.size());
216+
if (written < 0 || static_cast<size_t>(written) != header_str.size()) {
217+
RAFT_FAIL("Failed to write numpy header to file: %s", path.c_str());
218+
}
219+
220+
return {std::move(fd), header_size};
221+
}
222+
169223
/**
170224
* @brief Read large file in chunks using pread
171225
*

cpp/src/neighbors/detail/cagra/cagra_build.cuh

Lines changed: 9 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -952,132 +952,18 @@ index<T, IdxT> build_ace(raft::resources const& res,
952952
// Mark for cleanup if we fail after creating the directory
953953
cleanup_on_failure = true;
954954

955-
// Helper lambda to write numpy header to file descriptor
956-
auto write_numpy_header = [](int fd,
957-
const std::vector<size_t>& shape,
958-
const raft::detail::numpy_serializer::dtype_t& dtype) {
959-
std::stringstream ss;
955+
// Create numpy files with pre-allocated space
956+
std::tie(reordered_fd, reordered_header_size) = cuvs::util::create_numpy_file<T>(
957+
build_dir + "/reordered_dataset.npy", {dataset_size, dataset_dim});
960958

961-
const bool fortran_order = false;
962-
const raft::detail::numpy_serializer::header_t header = {dtype, fortran_order, shape};
959+
std::tie(augmented_fd, augmented_header_size) = cuvs::util::create_numpy_file<T>(
960+
build_dir + "/augmented_dataset.npy", {dataset_size, dataset_dim});
963961

964-
raft::detail::numpy_serializer::write_header(ss, header);
962+
std::tie(mapping_fd, mapping_header_size) =
963+
cuvs::util::create_numpy_file<IdxT>(build_dir + "/dataset_mapping.npy", {dataset_size});
965964

966-
std::string header_str = ss.str();
967-
ssize_t written = write(fd, header_str.data(), header_str.size());
968-
if (written < 0 || static_cast<size_t>(written) != header_str.size()) {
969-
RAFT_FAIL("Failed to write numpy header to file descriptor");
970-
}
971-
return header_str.size();
972-
};
973-
974-
// Create and allocate dataset file
975-
reordered_fd = cuvs::util::file_descriptor(
976-
build_dir + "/reordered_dataset.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
977-
{
978-
std::stringstream ss;
979-
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<T>();
980-
const bool fortran_order = false;
981-
const raft::detail::numpy_serializer::header_t header = {
982-
dtype, fortran_order, {dataset_size, dataset_dim}};
983-
raft::detail::numpy_serializer::write_header(ss, header);
984-
reordered_header_size = ss.str().size();
985-
}
986-
if (posix_fallocate(reordered_fd.get(),
987-
0,
988-
reordered_header_size + dataset_size * dataset_dim * sizeof(T)) != 0) {
989-
RAFT_FAIL("Failed to pre-allocate space for reordered dataset file");
990-
}
991-
{
992-
auto dtype_for_dataset = raft::detail::numpy_serializer::get_numpy_dtype<T>();
993-
RAFT_LOG_DEBUG("Writing reordered_dataset.npy header: shape=[%zu,%zu], dtype=%c",
994-
dataset_size,
995-
dataset_dim,
996-
dtype_for_dataset.kind);
997-
if (lseek(reordered_fd.get(), 0, SEEK_SET) == -1) {
998-
RAFT_FAIL("Failed to seek to beginning of reordered dataset file");
999-
}
1000-
write_numpy_header(reordered_fd.get(), {dataset_size, dataset_dim}, dtype_for_dataset);
1001-
}
1002-
1003-
// Create and allocate augmented dataset file
1004-
augmented_fd = cuvs::util::file_descriptor(
1005-
build_dir + "/augmented_dataset.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
1006-
{
1007-
std::stringstream ss;
1008-
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<T>();
1009-
const bool fortran_order = false;
1010-
const raft::detail::numpy_serializer::header_t header = {
1011-
dtype, fortran_order, {dataset_size, dataset_dim}};
1012-
raft::detail::numpy_serializer::write_header(ss, header);
1013-
augmented_header_size = ss.str().size();
1014-
}
1015-
if (posix_fallocate(augmented_fd.get(),
1016-
0,
1017-
augmented_header_size + dataset_size * dataset_dim * sizeof(T)) != 0) {
1018-
RAFT_FAIL("Failed to pre-allocate space for augmented dataset file");
1019-
}
1020-
// Seek to beginning before writing header
1021-
if (lseek(augmented_fd.get(), 0, SEEK_SET) == -1) {
1022-
RAFT_FAIL("Failed to seek to beginning of augmented dataset file");
1023-
}
1024-
write_numpy_header(augmented_fd.get(),
1025-
{dataset_size, dataset_dim},
1026-
raft::detail::numpy_serializer::get_numpy_dtype<T>());
1027-
1028-
// Create and allocate mapping file
1029-
mapping_fd = cuvs::util::file_descriptor(
1030-
build_dir + "/dataset_mapping.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
1031-
{
1032-
std::stringstream ss;
1033-
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
1034-
const bool fortran_order = false;
1035-
const raft::detail::numpy_serializer::header_t header = {
1036-
dtype, fortran_order, {dataset_size}};
1037-
raft::detail::numpy_serializer::write_header(ss, header);
1038-
mapping_header_size = ss.str().size();
1039-
}
1040-
if (posix_fallocate(mapping_fd.get(), 0, mapping_header_size + dataset_size * sizeof(IdxT)) !=
1041-
0) {
1042-
RAFT_FAIL("Failed to pre-allocate space for dataset mapping file");
1043-
}
1044-
{
1045-
auto dtype_for_mapping = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
1046-
RAFT_LOG_DEBUG("Writing dataset_mapping.npy header: shape=[%zu], dtype=%c",
1047-
dataset_size,
1048-
dtype_for_mapping.kind);
1049-
if (lseek(mapping_fd.get(), 0, SEEK_SET) == -1) {
1050-
RAFT_FAIL("Failed to seek to beginning of mapping file");
1051-
}
1052-
write_numpy_header(mapping_fd.get(), {dataset_size}, dtype_for_mapping);
1053-
}
1054-
1055-
// Create and allocate graph file
1056-
graph_fd = cuvs::util::file_descriptor(
1057-
build_dir + "/cagra_graph.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
1058-
{
1059-
std::stringstream ss;
1060-
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
1061-
const bool fortran_order = false;
1062-
const raft::detail::numpy_serializer::header_t header = {
1063-
dtype, fortran_order, {dataset_size, graph_degree}};
1064-
raft::detail::numpy_serializer::write_header(ss, header);
1065-
graph_header_size = ss.str().size();
1066-
}
1067-
if (posix_fallocate(graph_fd.get(), 0, graph_header_size + cagra_graph_size) != 0) {
1068-
RAFT_FAIL("Failed to pre-allocate space for graph file");
1069-
}
1070-
{
1071-
auto dtype_for_graph = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
1072-
RAFT_LOG_DEBUG("Writing cagra_graph.npy header: shape=[%zu,%zu], dtype=%c",
1073-
dataset_size,
1074-
graph_degree,
1075-
dtype_for_graph.kind);
1076-
if (lseek(graph_fd.get(), 0, SEEK_SET) == -1) {
1077-
RAFT_FAIL("Failed to seek to beginning of graph file");
1078-
}
1079-
write_numpy_header(graph_fd.get(), {dataset_size, graph_degree}, dtype_for_graph);
1080-
}
965+
std::tie(graph_fd, graph_header_size) = cuvs::util::create_numpy_file<IdxT>(
966+
build_dir + "/cagra_graph.npy", {dataset_size, graph_degree});
1081967

1082968
RAFT_LOG_DEBUG(
1083969
"ACE: Wrote numpy headers (reordered: %zu, augmented: %zu, mapping: %zu, graph: %zu bytes)",

0 commit comments

Comments (0)