diff --git a/conda/recipes/rapidsmpf/recipe.yaml b/conda/recipes/rapidsmpf/recipe.yaml index cba2b0e67..f35e841de 100644 --- a/conda/recipes/rapidsmpf/recipe.yaml +++ b/conda/recipes/rapidsmpf/recipe.yaml @@ -58,10 +58,12 @@ requirements: - ${{ compiler("c") }} - ${{ compiler("cxx") }} - ${{ compiler("cuda") }} + - cuda-cupti-dev - cuda-version =${{ cuda_version }} - ${{ stdlib("c") }} host: - cuda-cudart-dev + - cuda-cupti-dev - cuda-version =${{ cuda_version }} - cython >=3.0.0 - librapidsmpf =${{ version }} @@ -74,10 +76,11 @@ requirements: - rmm =${{ minor_version }} - scikit-build-core >=0.10.0 run: - - ${{ pin_compatible("cuda-version", upper_bound="x", lower_bound="x") }} + - cuda-cupti - if: cuda_major == "12" then: cuda-python >=12.9.2,<13.0a0 else: cuda-python >=13.0.1,<14.0a0 + - ${{ pin_compatible("cuda-version", upper_bound="x", lower_bound="x") }} - cudf =${{ minor_version }} - cupy >=13.6.0 - librapidsmpf =${{ version }} @@ -98,6 +101,7 @@ requirements: - ${{ stdlib("c") }} by_name: - cuda-cudart + - cuda-cupti - cuda-version - openmpi - mpi4py diff --git a/cpp/examples/example_cupti_monitor.cpp b/cpp/examples/example_cupti_monitor.cpp index 331c8884e..2c2c219ed 100644 --- a/cpp/examples/example_cupti_monitor.cpp +++ b/cpp/examples/example_cupti_monitor.cpp @@ -58,7 +58,7 @@ int main() { // Allocate device memory using rmm::device_buffer rmm::device_buffer buf(allocation_size, rmm::cuda_stream_default); device_buffers.push_back(std::move(buf)); - } catch (rmm::bad_alloc& e) { + } catch (rmm::bad_alloc const& e) { std::cerr << "rmm::device_buffer allocation failed: " << e.what() << std::endl; break; diff --git a/python/rapidsmpf/rapidsmpf/CMakeLists.txt b/python/rapidsmpf/rapidsmpf/CMakeLists.txt index bc9370f2f..3c6e5e993 100644 --- a/python/rapidsmpf/rapidsmpf/CMakeLists.txt +++ b/python/rapidsmpf/rapidsmpf/CMakeLists.txt @@ -7,6 +7,11 @@ set(cython_modules config.pyx cuda_stream.pyx progress_thread.pyx rmm_resource_a shuffler.pyx statistics.pyx ) 
+# Add cupti module conditionally if CUPTI support is enabled +if(RAPIDSMPF_HAVE_CUPTI) + list(APPEND cython_modules cupti.pyx) +endif() + rapids_cython_create_modules( CXX SOURCE_FILES "${cython_modules}" diff --git a/python/rapidsmpf/rapidsmpf/cupti.pxd b/python/rapidsmpf/rapidsmpf/cupti.pxd new file mode 100644 index 000000000..4827de49e --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/cupti.pxd @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from libc.stddef cimport size_t +from libcpp cimport bool as bool_t +from libcpp.memory cimport unique_ptr +from libcpp.string cimport string +from libcpp.unordered_map cimport unordered_map +from libcpp.vector cimport vector + + +cdef extern from "<chrono>" namespace "std::chrono" nogil: + cdef cppclass milliseconds: + milliseconds(long long) except + + +cdef extern from "<cupti.h>" nogil: + ctypedef enum CUpti_CallbackId: + pass + + +cdef extern from "<rapidsmpf/cupti.hpp>" nogil: + cdef struct cpp_MemoryDataPoint "rapidsmpf::MemoryDataPoint": + double timestamp + size_t free_memory + size_t total_memory + size_t used_memory + + cdef cppclass cpp_CuptiMonitor "rapidsmpf::CuptiMonitor": + cpp_CuptiMonitor( + bool_t enable_periodic_sampling, + milliseconds sampling_interval_ms + ) except + + void start_monitoring() except + + void stop_monitoring() except + + bool_t is_monitoring() except + + void capture_memory_sample() except + + const vector[cpp_MemoryDataPoint]& get_memory_samples() except + + void clear_samples() except + + size_t get_sample_count() except + + void write_csv(const string& filename) except + + void set_debug_output(bool_t enabled, size_t threshold_mb) except + + unordered_map[CUpti_CallbackId, size_t] get_callback_counters() except + + void clear_callback_counters() except + + size_t get_total_callback_count() except + + string get_callback_summary() except + + + +cdef class MemoryDataPoint: + cdef cpp_MemoryDataPoint _data + + @staticmethod + cdef
MemoryDataPoint from_cpp(cpp_MemoryDataPoint data) + + +cdef class CuptiMonitor: + cdef unique_ptr[cpp_CuptiMonitor] _handle diff --git a/python/rapidsmpf/rapidsmpf/cupti.pyi b/python/rapidsmpf/rapidsmpf/cupti.pyi new file mode 100644 index 000000000..01aa5f509 --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/cupti.pyi @@ -0,0 +1,33 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +class MemoryDataPoint: + @property + def timestamp(self) -> float: ... + @property + def free_memory(self) -> int: ... + @property + def total_memory(self) -> int: ... + @property + def used_memory(self) -> int: ... + def __repr__(self) -> str: ... + +class CuptiMonitor: + def __init__( + self, enable_periodic_sampling: bool = ..., sampling_interval_ms: int = ... + ) -> None: ... + def start_monitoring(self) -> None: ... + def stop_monitoring(self) -> None: ... + def is_monitoring(self) -> bool: ... + def capture_memory_sample(self) -> None: ... + def get_memory_samples(self) -> list[MemoryDataPoint]: ... + def clear_samples(self) -> None: ... + def get_sample_count(self) -> int: ... + def write_csv(self, filename: str) -> None: ... + def set_debug_output(self, enabled: bool, threshold_mb: int = ...) -> None: ... + def get_callback_counters(self) -> dict[int, int]: ... + def clear_callback_counters(self) -> None: ... + def get_total_callback_count(self) -> int: ... + def get_callback_summary(self) -> str: ... diff --git a/python/rapidsmpf/rapidsmpf/cupti.pyx b/python/rapidsmpf/rapidsmpf/cupti.pyx new file mode 100644 index 000000000..8d4c8b0a9 --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/cupti.pyx @@ -0,0 +1,260 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from cython.operator cimport dereference as deref +from cython.operator cimport postincrement +from libcpp cimport bool as bool_t +from libcpp.memory cimport make_unique +from libcpp.string cimport string +from libcpp.unordered_map cimport unordered_map +from libcpp.vector cimport vector + +from rapidsmpf.cupti cimport milliseconds + + +cdef class MemoryDataPoint: + """A data point representing GPU memory usage at a specific time. + + Attributes + ---------- + timestamp + Time when sample was taken (seconds since epoch). + free_memory + Free GPU memory in bytes. + total_memory + Total GPU memory in bytes. + used_memory + Used GPU memory in bytes. + """ + + def __init__(self): + raise ValueError("Use the `from_cpp` factory method") + + @staticmethod + cdef MemoryDataPoint from_cpp(cpp_MemoryDataPoint data): + """Create a MemoryDataPoint from C++ data.""" + cdef MemoryDataPoint ret = MemoryDataPoint.__new__(MemoryDataPoint) + ret._data = data + return ret + + @property + def timestamp(self): + """Time when sample was taken (seconds since epoch).""" + return self._data.timestamp + + @property + def free_memory(self): + """Free GPU memory in bytes.""" + return self._data.free_memory + + @property + def total_memory(self): + """Total GPU memory in bytes.""" + return self._data.total_memory + + @property + def used_memory(self): + """Used GPU memory in bytes.""" + return self._data.used_memory + + def __repr__(self): + return (f"MemoryDataPoint(timestamp={self.timestamp}, " + f"free_memory={self.free_memory}, " + f"total_memory={self.total_memory}, " + f"used_memory={self.used_memory})") + + +cdef class CuptiMonitor: + """CUDA memory monitoring using CUPTI (CUDA Profiling Tools Interface). + + This class provides memory monitoring capabilities for CUDA applications + by intercepting CUDA runtime and driver API calls related to memory + operations and kernel launches. 
+ """ + + def __cinit__(self, enable_periodic_sampling=False, int sampling_interval_ms=100): + """Initialize a CuptiMonitor instance. + + Parameters + ---------- + enable_periodic_sampling + Enable background thread for periodic memory sampling + sampling_interval_ms + Interval between periodic samples in milliseconds + """ + cdef bool_t enable_sampling = enable_periodic_sampling + + self._handle = make_unique[cpp_CuptiMonitor]( + enable_sampling, + milliseconds(sampling_interval_ms), + ) + + def __dealloc__(self): + """Destructor - automatically stops monitoring and cleans up CUPTI.""" + with nogil: + self._handle.reset() + + def start_monitoring(self): + """Start memory monitoring. + + Initializes CUPTI and begins intercepting CUDA API calls. + + Raises + ------ + RuntimeError + If CUPTI initialization fails + """ + with nogil: + self._handle.get().start_monitoring() + + def stop_monitoring(self): + """Stop memory monitoring. + + Stops CUPTI callbacks and periodic sampling if enabled. + """ + with nogil: + self._handle.get().stop_monitoring() + + def is_monitoring(self) : + """Check if monitoring is currently active. + + Returns + ------- + True if monitoring is active, False otherwise + """ + cdef bool_t ret + with nogil: + ret = self._handle.get().is_monitoring() + return ret + + def capture_memory_sample(self): + """Manually capture current memory usage. + + This can be called at any time to manually record a memory sample, + regardless of whether periodic sampling is enabled. + """ + with nogil: + self._handle.get().capture_memory_sample() + + def get_memory_samples(self): + """Get all collected memory samples. 
+ + Returns + ------- + List of memory data points + """ + cdef const vector[cpp_MemoryDataPoint]* samples + with nogil: + samples = &self._handle.get().get_memory_samples() + + cdef list ret = [] + cdef size_t i + for i in range(samples.size()): + ret.append(MemoryDataPoint.from_cpp(deref(samples)[i])) + return ret + + def clear_samples(self): + """Clear all collected memory samples.""" + with nogil: + self._handle.get().clear_samples() + + def get_sample_count(self): + """Get the number of memory samples collected. + + Returns + ------- + Number of samples + """ + cdef size_t ret + with nogil: + ret = self._handle.get().get_sample_count() + return ret + + def write_csv(self, filename: str): + """Write memory samples to CSV file. + + Parameters + ---------- + filename + Output CSV filename. + + Raises + ------ + RuntimeError + If file cannot be written. + """ + cdef string c_filename = filename.encode('utf-8') + with nogil: + self._handle.get().write_csv(c_filename) + + def set_debug_output(self, enabled: bool, threshold_mb: int = 10): + """Enable or disable debug output for significant memory changes. + + Parameters + ---------- + enabled + If True, prints debug info when memory usage changes significantly + threshold_mb + Threshold in MB for what constitutes a "significant" change (default: 10) + """ + cdef bool_t c_enabled = enabled + cdef size_t c_threshold = threshold_mb + with nogil: + self._handle.get().set_debug_output(c_enabled, c_threshold) + + def get_callback_counters(self): + """Get callback counters for all monitored CUPTI callbacks. + + Returns a dictionary where keys are CUPTI callback IDs and values are the number + of times each callback was triggered during monitoring. 
+ + Returns + ------- + Dictionary from CUPTI callback ID to call count + """ + cdef unordered_map[CUpti_CallbackId, size_t] counters + with nogil: + counters = self._handle.get().get_callback_counters() + + cdef dict ret = {} + cdef unordered_map[CUpti_CallbackId, size_t].iterator it = counters.begin() + while it != counters.end(): + ret[deref(it).first] = deref(it).second + postincrement(it) + return ret + + def clear_callback_counters(self): + """Clear all callback counters. + + Resets all callback counters to zero. + """ + with nogil: + self._handle.get().clear_callback_counters() + + def get_total_callback_count(self): + """Get total number of callbacks triggered across all monitored callback IDs. + + Returns + ------- + Total number of callbacks + """ + cdef size_t ret + with nogil: + ret = self._handle.get().get_total_callback_count() + return ret + + def get_callback_summary(self): + """Get a human-readable summary of callback counters. + + Returns a formatted string showing callback names and their counts. 
+ + Returns + ------- + String containing callback counter summary + """ + cdef string summary + with nogil: + summary = self._handle.get().get_callback_summary() + return summary.decode('utf-8') diff --git a/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py b/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py index ffdfee0cd..a0204643a 100644 --- a/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py +++ b/python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py @@ -32,6 +32,13 @@ from rapidsmpf.testing import pylibcudf_to_cudf_dataframe from rapidsmpf.utils.string import format_bytes, parse_bytes +try: + from rapidsmpf.cupti import CuptiMonitor + + CUPTI_AVAILABLE = True +except ImportError: + CUPTI_AVAILABLE = False + if TYPE_CHECKING: from collections.abc import Callable @@ -297,6 +304,19 @@ def setup_and_run(args: argparse.Namespace) -> None: stats = Statistics(enable=args.statistics, mr=mr) + cupti_monitor = None + if args.monitor_memory is not None: + if not CUPTI_AVAILABLE: + if comm.rank == 0: + comm.logger.print( + "WARNING: --monitor-memory specified but CUPTI support not available. " + "CUPTI monitoring disabled."
+ ) + else: + cupti_monitor = CuptiMonitor(enable_periodic_sampling=False) + if comm.rank == 0: + comm.logger.print("CUPTI memory monitoring enabled") + if comm.rank == 0: spill_device = ( "disabled" if args.spill_device is None else format_bytes(args.spill_device) @@ -316,6 +336,10 @@ def setup_and_run(args: argparse.Namespace) -> None: ) MPI.COMM_WORLD.barrier() + + if cupti_monitor is not None: + cupti_monitor.start_monitoring() + start_time = MPI.Wtime() bulk_mpi_shuffle( paths=sorted(map(str, args.input.glob("**/*"))), @@ -331,6 +355,27 @@ def setup_and_run(args: argparse.Namespace) -> None: elapsed_time = MPI.Wtime() - start_time MPI.COMM_WORLD.barrier() + if cupti_monitor is not None: + cupti_monitor.stop_monitoring() + + csv_filename = f"{args.monitor_memory}_{comm.rank}.csv" + try: + # Write CSV files + cupti_monitor.write_csv(csv_filename) + comm.logger.print( + f"CUPTI memory data written to {csv_filename} " + f"({cupti_monitor.get_sample_count()} samples, " + f"{cupti_monitor.get_total_callback_count()} callbacks)" + ) + + # Print callback summary for rank 0 + if comm.rank == 0: + comm.logger.print( + f"CUPTI Callback Summary:\n{cupti_monitor.get_callback_summary()}" + ) + except Exception as e: + comm.logger.print(f"Failed to write CUPTI CSV file: {e}") + mem_peak = format_bytes(mr.get_main_record().peak()) comm.logger.print( f"elapsed: {elapsed_time:.2f} sec | rmm device memory peak: {mem_peak}" @@ -434,6 +479,16 @@ def dir_path(path: str) -> Path: "be launched with 'mpirun'." ), ) + parser.add_argument( + "--monitor-memory", + type=str, + default=None, + help=( + "Enable memory monitoring with CUPTI and save CSV files with given path " + "prefix. For example, /tmp/test will write files to /tmp/test_<rank>.csv. " + "Requires CUPTI support to be compiled in."
+ ), + ) args = parser.parse_args() args.rmm_pool_size = (args.rmm_pool_size // 256) * 256 # Align to 256 bytes setup_and_run(args) diff --git a/python/rapidsmpf/rapidsmpf/examples/example_cupti_monitor.py b/python/rapidsmpf/rapidsmpf/examples/example_cupti_monitor.py new file mode 100644 index 000000000..858f34f8c --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/examples/example_cupti_monitor.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +""" +Simple example demonstrating the use of CuptiMonitor. + +This example shows how to use RapidsMPF's CuptiMonitor to track +GPU memory usage during CUDA operations. +""" + +from __future__ import annotations + +import rmm + +try: + from rapidsmpf.cupti import CuptiMonitor + + CUPTI_AVAILABLE = True +except ImportError: + CUPTI_AVAILABLE = False + + +def main() -> None: + """Main example function demonstrating CUPTI memory monitoring.""" + print("CUPTI Memory Monitor Example") + print("============================\n") + + if not CUPTI_AVAILABLE: + print( + "CUPTI support is not available. Please ensure CUPTI support is compiled in." + ) + return + + try: + # Create a CuptiMonitor with periodic sampling enabled (every 100ms) + monitor = CuptiMonitor(enable_periodic_sampling=True, sampling_interval_ms=100) + + # Enable debug output for memory changes > 5MB + monitor.set_debug_output(enabled=True, threshold_mb=5) + + print("Starting CUPTI monitoring...") + monitor.start_monitoring() + + # Perform some GPU memory operations to demonstrate monitoring + num_allocations = 3 + allocation_size = 64 * 1024 * 1024 # 64MB each + device_buffers: list[rmm.DeviceBuffer] = [] + + for _ in range(num_allocations): + print( + f"Allocating {allocation_size // (1024 * 1024)} MB on GPU using rmm.DeviceBuffer..." 
+ ) + try: + # Allocate device memory using rmm.DeviceBuffer + buf = rmm.DeviceBuffer(size=allocation_size) + device_buffers.append(buf) + except Exception as e: + print(f"rmm.DeviceBuffer allocation failed: {e}") + break + + # Manually capture a memory sample + monitor.capture_memory_sample() + + print("\nReleasing allocated memory (handled by rmm.DeviceBuffer cleanup)...") + device_buffers.clear() + + print("Stopping monitoring...") + monitor.stop_monitoring() + + # Report results + samples = monitor.get_memory_samples() + print("\nMemory monitoring results:") + print(f"Total samples collected: {len(samples)}") + + if samples: + initial_sample = samples[0] + final_sample = samples[-1] + initial_utilization = ( + initial_sample.used_memory / initial_sample.total_memory + ) * 100.0 + final_utilization = ( + final_sample.used_memory / final_sample.total_memory + ) * 100.0 + + print( + f"Initial memory usage: {initial_sample.used_memory / (1024.0 * 1024.0):.2f} MB " + f"({initial_utilization:.1f}%)" + ) + print( + f"Final memory usage: {final_sample.used_memory / (1024.0 * 1024.0):.2f} MB " + f"({final_utilization:.1f}%)" + ) + + # Find peak memory usage + peak_used = 0 + peak_utilization = 0.0 + for sample in samples: + if sample.used_memory > peak_used: + peak_used = sample.used_memory + peak_utilization = ( + sample.used_memory / sample.total_memory + ) * 100.0 + + print( + f"Peak memory usage: {peak_used / (1024.0 * 1024.0):.2f} MB " + f"({peak_utilization:.1f}%)" + ) + + # Write results to CSV file + csv_filename = "cupti_monitor_example.csv" + monitor.write_csv(csv_filename) + print(f"Memory usage data written to {csv_filename}") + + # Show callback counter summary + print(f"\n{monitor.get_callback_summary()}") + + except Exception as e: + print(f"Error: {e}") + return + + print("\nExample completed successfully!") + + +if __name__ == "__main__": + main() diff --git a/python/rapidsmpf/rapidsmpf/tests/test_cupti.py 
b/python/rapidsmpf/rapidsmpf/tests/test_cupti.py new file mode 100644 index 000000000..f3c845e21 --- /dev/null +++ b/python/rapidsmpf/rapidsmpf/tests/test_cupti.py @@ -0,0 +1,574 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import tempfile +import threading +import time +from pathlib import Path +from typing import TYPE_CHECKING + +import cupy as cp +import cupy.cuda.runtime as runtime +import pytest + +if TYPE_CHECKING: + from collections.abc import Generator + +# Try to import CUPTI module, skip tests if not available +try: + import cupy as cp + + from rapidsmpf.cupti import CuptiMonitor, MemoryDataPoint + + CUPTI_AVAILABLE = True +except ImportError: + CUPTI_AVAILABLE = False + + +def _perform_gpu_operations(size_bytes: int, num_operations: int = 1) -> None: + """Helper function to allocate and free GPU memory using CUDA runtime APIs.""" + if not CUPTI_AVAILABLE: + return + + # Use multiple approaches to trigger CUDA runtime API calls that CUPTI monitors + try: + for _ in range(num_operations): + # Allocate using CUDA runtime API + ptr = runtime.malloc(size_bytes) + # Free using CUDA runtime API + runtime.free(ptr) + + except cp.cuda.memory.OutOfMemoryError as e: + pytest.fail(f"GPU memory allocation failed: {e}") + + +@pytest.fixture +def cuda_context() -> Generator[None, None, None]: + """Fixture to ensure CUDA context is initialized.""" + try: + # Initialize CUDA context + cp.cuda.Device(0).use() + yield + finally: + # Synchronize device to ensure all operations complete + cp.cuda.Device().synchronize() + + +class TestMemoryDataPoint: + """Test cases for MemoryDataPoint class.""" + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_memory_data_point_properties(self) -> None: + """Test that MemoryDataPoint properties work correctly.""" + # Note: We can't directly create MemoryDataPoint instances from 
Python + # They are created internally by CuptiMonitor, so we test them through that + monitor = CuptiMonitor() + monitor.start_monitoring() + monitor.capture_memory_sample() + monitor.stop_monitoring() + + samples = monitor.get_memory_samples() + assert len(samples) > 0 + + sample = samples[0] + assert isinstance(sample, MemoryDataPoint) + assert isinstance(sample.timestamp, float) + assert isinstance(sample.free_memory, int) + assert isinstance(sample.total_memory, int) + assert isinstance(sample.used_memory, int) + + assert sample.timestamp > 0 + assert sample.total_memory > 0 + assert sample.free_memory <= sample.total_memory + assert sample.used_memory == sample.total_memory - sample.free_memory + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_memory_data_point_repr(self) -> None: + """Test MemoryDataPoint string representation.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + monitor.capture_memory_sample() + monitor.stop_monitoring() + + samples = monitor.get_memory_samples() + sample = samples[0] + + repr_str = repr(sample) + assert "MemoryDataPoint" in repr_str + assert "timestamp=" in repr_str + assert "free_memory=" in repr_str + assert "total_memory=" in repr_str + assert "used_memory=" in repr_str + + +class TestCuptiMonitor: + """Test cases for CuptiMonitor class.""" + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_basic_construction(self, cuda_context: None) -> None: + """Test CuptiMonitor construction with different parameters.""" + # Test default construction + monitor1 = CuptiMonitor() + assert not monitor1.is_monitoring() + assert monitor1.get_sample_count() == 0 + + # Test construction with parameters + monitor2 = CuptiMonitor(enable_periodic_sampling=True, sampling_interval_ms=50) + assert not monitor2.is_monitoring() + assert monitor2.get_sample_count() == 0 + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + 
def test_start_stop_monitoring(self, cuda_context: None) -> None: + """Test starting and stopping monitoring.""" + monitor = CuptiMonitor() + + # Initially not monitoring + assert not monitor.is_monitoring() + + # Start monitoring + monitor.start_monitoring() + assert monitor.is_monitoring() + + # Should have captured initial state + assert monitor.get_sample_count() > 0 + + # Stop monitoring + monitor.stop_monitoring() + assert not monitor.is_monitoring() + + # Should have captured final state + final_count = monitor.get_sample_count() + assert final_count >= 2 # At least initial + final + + # Stopping again should be safe + monitor.stop_monitoring() + assert not monitor.is_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_double_start_monitoring(self, cuda_context: None) -> None: + """Test that starting monitoring twice is safe.""" + monitor = CuptiMonitor() + + # Start monitoring twice - should be safe + monitor.start_monitoring() + assert monitor.is_monitoring() + first_count = monitor.get_sample_count() + + monitor.start_monitoring() + assert monitor.is_monitoring() + + # Should not have added extra samples + assert monitor.get_sample_count() == first_count + + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_manual_capture(self, cuda_context: None) -> None: + """Test manual memory sample capture.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + initial_count = monitor.get_sample_count() + + # Manual capture should add a sample + monitor.capture_memory_sample() + assert monitor.get_sample_count() == initial_count + 1 + + # Multiple manual captures + monitor.capture_memory_sample() + monitor.capture_memory_sample() + assert monitor.get_sample_count() == initial_count + 3 + + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def 
test_manual_capture_without_monitoring(self, cuda_context: None) -> None: + """Test that manual capture without monitoring is safe but no-op.""" + monitor = CuptiMonitor() + + # Manual capture without monitoring should be safe but no-op + monitor.capture_memory_sample() + assert monitor.get_sample_count() == 0 + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_memory_operations_detection(self, cuda_context: None) -> None: + """Test that GPU memory operations are detected.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + initial_count = monitor.get_sample_count() + + # Perform GPU memory operations - should trigger CUPTI callbacks + _perform_gpu_operations(1024 * 1024, 3) # 1 MiB, 3 operations + + final_count = monitor.get_sample_count() + # Should have captured memory allocations/deallocations + assert final_count > initial_count + + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_memory_data_points(self, cuda_context: None) -> None: + """Test memory data point collection and validation.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + # Perform some operations + _perform_gpu_operations(2 * 1024 * 1024) # 2 MiB + + monitor.stop_monitoring() + + samples = monitor.get_memory_samples() + assert len(samples) > 0 + + # Check data point structure + for sample in samples: + assert sample.timestamp > 0 + assert sample.total_memory > 0 + assert sample.free_memory <= sample.total_memory + assert sample.used_memory == sample.total_memory - sample.free_memory + + # Timestamps should be in order + for i in range(1, len(samples)): + assert samples[i].timestamp >= samples[i - 1].timestamp + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_clear_samples(self, cuda_context: None) -> None: + """Test clearing collected samples.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + 
_perform_gpu_operations(1024 * 1024) # 1 MiB + + assert monitor.get_sample_count() > 0 + samples_before = monitor.get_memory_samples() + assert len(samples_before) > 0 + + # Clear samples + monitor.clear_samples() + assert monitor.get_sample_count() == 0 + + samples_after = monitor.get_memory_samples() + assert len(samples_after) == 0 + + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_periodic_sampling(self, cuda_context: None) -> None: + """Test periodic sampling functionality.""" + # Monitor with periodic sampling every 50ms + monitor = CuptiMonitor(enable_periodic_sampling=True, sampling_interval_ms=50) + monitor.start_monitoring() + + initial_count = monitor.get_sample_count() + + # Wait for periodic samples to be collected + time.sleep(0.2) + + final_count = monitor.get_sample_count() + + # Should have collected periodic samples + assert final_count > initial_count + + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_no_periodic_sampling(self, cuda_context: None) -> None: + """Test that periodic sampling can be disabled.""" + # Monitor without periodic sampling + monitor = CuptiMonitor(enable_periodic_sampling=False, sampling_interval_ms=50) + monitor.start_monitoring() + + initial_count = monitor.get_sample_count() + + # Wait - should not collect periodic samples + time.sleep(0.2) + + final_count = monitor.get_sample_count() + + # Should only have initial sample (no periodic sampling) + assert final_count == initial_count + + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_csv_export(self, cuda_context: None) -> None: + """Test CSV export functionality.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + _perform_gpu_operations(1024 * 1024, 2) # 1 MiB, 2 operations + + monitor.stop_monitoring() + + # Create temporary file for CSV output 
+ with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f: + filename = f.name + + try: + # Write CSV + monitor.write_csv(filename) + + # Verify file exists and has content + file_path = Path(filename) + assert file_path.exists() + + with file_path.open() as f: + lines = f.readlines() + + # Should have header + data lines + assert len(lines) > 1 + + # Check header + header = lines[0] + assert "timestamp" in header + assert "free_memory_bytes" in header + assert "total_memory_bytes" in header + assert "used_memory_bytes" in header + + # Check data lines have correct number of columns + for line in lines[1:]: + if line.strip(): # Skip empty lines + comma_count = line.count(",") + assert comma_count == 3 # 4 columns = 3 commas + + finally: + # Clean up + file_path = Path(filename) + if file_path.exists(): + file_path.unlink() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_csv_export_invalid_path(self, cuda_context: None) -> None: + """Test CSV export with invalid path raises exception.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + monitor.stop_monitoring() + + # Try to write to invalid path + with pytest.raises(RuntimeError): + monitor.write_csv("/invalid/path/file.csv") + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_debug_output(self, cuda_context: None) -> None: + """Test debug output configuration.""" + monitor = CuptiMonitor() + + # Test setting debug output + monitor.set_debug_output(enabled=True, threshold_mb=5) # 5MB threshold + monitor.set_debug_output(enabled=False, threshold_mb=10) # Disable + + # These calls should be safe regardless of monitoring state + monitor.start_monitoring() + monitor.set_debug_output(enabled=True, threshold_mb=1) # 1MB threshold + monitor.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_thread_safety(self, cuda_context: None) -> None: + 
"""Test thread safety of CuptiMonitor.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + threads = [] + num_threads = 4 + + def thread_worker() -> None: + # Each thread does some GPU operations and manual captures + _perform_gpu_operations(1024 * 1024) # 1 MiB + monitor.capture_memory_sample() + time.sleep(0.01) # 10ms + monitor.capture_memory_sample() + + # Multiple threads performing operations simultaneously + for _ in range(num_threads): + thread = threading.Thread(target=thread_worker) + threads.append(thread) + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + monitor.stop_monitoring() + + # Should have collected samples from all threads + assert monitor.get_sample_count() > num_threads + + # All samples should be valid + samples = monitor.get_memory_samples() + for sample in samples: + assert sample.total_memory > 0 + assert sample.free_memory <= sample.total_memory + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_destructor_cleanup(self, cuda_context: None) -> None: + """Test that destructor properly cleans up monitoring.""" + + # Create monitor in a scope that will destroy it + def create_and_destroy_monitor() -> None: + monitor = CuptiMonitor() + monitor.start_monitoring() + _perform_gpu_operations(1024 * 1024) # 1 MiB + # Monitor should be destroyed here and automatically stop monitoring + + create_and_destroy_monitor() + + # Should be able to create a new monitor after destruction + monitor2 = CuptiMonitor() + monitor2.start_monitoring() + monitor2.stop_monitoring() + + @pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available") + def test_large_number_of_samples(self, cuda_context: None) -> None: + """Test handling of large number of samples.""" + monitor = CuptiMonitor() + monitor.start_monitoring() + + # Generate many samples + for _ in range(100): + monitor.capture_memory_sample() + + assert monitor.get_sample_count() == 101 # 100 manual 
@pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available")
def test_callback_counters(self, cuda_context: None) -> None:
    """Verify callback counters are empty before monitoring and filled after."""
    one_mib = 1024 * 1024
    tracker = CuptiMonitor()

    # Nothing has been monitored yet, so both views must be empty.
    assert tracker.get_total_callback_count() == 0
    assert len(tracker.get_callback_counters()) == 0

    tracker.start_monitoring()

    # Drive some GPU work (1 MiB, 2 operations) ...
    _perform_gpu_operations(one_mib, 2)

    # ... plus one managed and one plain allocation, each freed right away,
    # to make it more likely that callbacks are triggered.
    for allocate in (runtime.mallocManaged, runtime.malloc):
        runtime.free(allocate(one_mib))

    tracker.stop_monitoring()

    # Read all three views after monitoring has stopped.
    total = tracker.get_total_callback_count()
    per_callback = tracker.get_callback_counters()
    report = tracker.get_callback_summary()

    # At least one callback must have been recorded.
    assert total > 0
    assert len(per_callback) > 0

    # The textual summary must be non-empty and carry its expected markers.
    assert len(report) > 0
    for expected_fragment in ("CUPTI Callback Counter Summary", "Total"):
        assert expected_fragment in report
@pytest.mark.skipif(not CUPTI_AVAILABLE, reason="CUPTI support not available")
def test_callback_counters_accumulate(self, cuda_context: None) -> None:
    """Verify the total callback count never decreases across batches of work."""
    one_mib = 1024 * 1024
    tracker = CuptiMonitor()
    tracker.start_monitoring()

    # Run two identical batches (1 MiB, 1 operation each) and record the
    # running total after each one, while monitoring is still active.
    running_totals = []
    for _ in range(2):
        _perform_gpu_operations(one_mib, 1)
        running_totals.append(tracker.get_total_callback_count())

    tracker.stop_monitoring()

    after_first, after_second = running_totals

    # Both readings must be positive, and the second may equal but never be
    # lower than the first.
    assert after_first > 0
    assert after_second > 0
    assert after_second >= after_first
# Tests when CUPTI is not available
class TestCuptiNotAvailable:
    """Placeholder tests for builds without CUPTI support."""

    @pytest.mark.skipif(CUPTI_AVAILABLE, reason="CUPTI support is available")
    def test_cupti_not_available(self) -> None:
        """Report, via an explicit skip, that the CUPTI tests were not run.

        No error is asserted here: when CUPTI is available the decorator
        skips this test, and otherwise the body skips it with a message
        explaining how to enable CUPTI support.
        """
        pytest.skip(
            "CUPTI support not enabled. Build with -DBUILD_CUPTI_SUPPORT=ON to enable tests."
        )