diff --git a/python/examples/zarr_cupy_nvcomp.py b/python/examples/zarr_cupy_nvcomp.py new file mode 100644 index 0000000000..03d96b21ef --- /dev/null +++ b/python/examples/zarr_cupy_nvcomp.py @@ -0,0 +1,58 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +import cupy +import numpy +import zarr + +import kvikio +import kvikio.zarr + + +def main(path): + a = cupy.arange(20) + + # Let's use KvikIO's convenience function `open_cupy_array()` to create + # a new Zarr file on disk. Its semantics are the same as `zarr.open_array()` + # but it uses a GDS file store, nvCOMP compression, and CuPy arrays. + z = kvikio.zarr.open_cupy_array(store=path, mode="w", shape=(20,), chunks=(5,)) + + # `z` is a regular Zarr Array that we can write to as usual + z[0:10] = numpy.arange(0, 10) + # but it also supports direct reads and writes of CuPy arrays + z[10:20] = cupy.arange(10, 20) + + # Reading `z` returns a CuPy array + assert isinstance(z[:], cupy.ndarray) + assert (a == z[:]).all() + + # Normally, we cannot assume that GPU and CPU compressors are compatible. + # E.g., `open_cupy_array()` uses nvCOMP's Snappy GPU compression by default, + # which, as far as we know, isn't compatible with any CPU compressor. Thus, + # let's re-write our Zarr array using a CPU and GPU compatible compressor. + z = kvikio.zarr.open_cupy_array( + store=path, + mode="w", + shape=(20,), + chunks=(5,), + compressor=kvikio.zarr.CompatCompressor.lz4(), + ) + z[:] = a + + # Because we are using a CompatCompressor, it is now possible to open the file + # using Zarr's built-in LZ4 decompressor that uses the CPU. + z = zarr.open_array(path) + # `z` is now read as a regular NumPy array + assert isinstance(z[:], numpy.ndarray) + assert (a.get() == z[:]).all() + # and we can write to it as usual + z[:] = numpy.arange(20, 40) + + # And we can read the Zarr file back into a CuPy array. 
+ z = kvikio.zarr.open_cupy_array(store=path, mode="r") + assert isinstance(z[:], cupy.ndarray) + assert (cupy.arange(20, 40) == z[:]).all() + + +if __name__ == "__main__": + main("/tmp/zarr-cupy-nvcomp") diff --git a/python/kvikio/nvcomp_codec.py b/python/kvikio/nvcomp_codec.py index 872c93b9e6..af36494bad 100644 --- a/python/kvikio/nvcomp_codec.py +++ b/python/kvikio/nvcomp_codec.py @@ -8,7 +8,7 @@ from numcodecs.abc import Codec from numcodecs.compat import ensure_contiguous_ndarray_like -import kvikio._lib.libnvcomp_ll as _ll +from kvikio._lib.libnvcomp_ll import SUPPORTED_ALGORITHMS class NvCompBatchCodec(Codec): @@ -34,11 +34,11 @@ def __init__( stream: Optional[cp.cuda.Stream] = None, ) -> None: algo_id = algorithm.lower() - algo_t = _ll.SUPPORTED_ALGORITHMS.get(algo_id, None) + algo_t = SUPPORTED_ALGORITHMS.get(algo_id, None) if algo_t is None: raise ValueError( f"{algorithm} is not supported. " - f"Must be one of: {list(_ll.SUPPORTED_ALGORITHMS.keys())}" + f"Must be one of: {list(SUPPORTED_ALGORITHMS.keys())}" ) self.algorithm = algo_id diff --git a/python/kvikio/zarr.py b/python/kvikio/zarr.py index 50a6756db8..1c030f96ae 100644 --- a/python/kvikio/zarr.py +++ b/python/kvikio/zarr.py @@ -1,13 +1,15 @@ # Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. +from __future__ import annotations import contextlib import os import os.path from abc import abstractmethod -from typing import Any, Mapping, Sequence +from typing import Any, Literal, Mapping, Optional, Sequence, Union import cupy +import numcodecs import numpy import numpy as np import zarr @@ -20,6 +22,9 @@ import kvikio import kvikio.nvcomp +import kvikio.nvcomp_codec +import kvikio.zarr +from kvikio.nvcomp_codec import NvCompBatchCodec MINIMUM_ZARR_VERSION = "2.15" @@ -37,22 +42,60 @@ class GDSStore(zarr.storage.DirectoryStore): It uses KvikIO for reads and writes, which in turn will use GDS when applicable. 
+ Parameters + ---------- + path : string + Location of directory to use as the root of the storage hierarchy. + normalize_keys : bool, optional + If True, all store keys will be normalized to use lower case characters + (e.g. 'foo' and 'FOO' will be treated as equivalent). This can be + useful to avoid potential discrepancies between case-sensitive and + case-insensitive file system. Default value is False. + dimension_separator : {'.', '/'}, optional + Separator placed between the dimensions of a chunk. + compressor_config_overwrite + If not None, use this `Mapping` to specify what is written to the Zarr metadata + file on disk (`.zarray`). Normally, Zarr writes the configuration[1] given by + the `compressor` argument to the `.zarray` file. Use this argument to overwrite + the normal configuration and use the specified `Mapping` instead. + decompressor_config_overwrite + If not None, use this `Mapping` to specify what compressor configuration[1] is + used for decompressing no matter the configuration found in the Zarr metadata + on disk (the `.zarray` file). + + [1] https://github.com/zarr-developers/numcodecs/blob/cb155432/numcodecs/abc.py#L79 + Notes ----- - GDSStore doesn't implement `_fromfile()` thus non-array data such as - meta data is always read into host memory. - This is because only zarr.Array use getitems() to retrieve data. + Atomic writes are used, which means that data are first written to a + temporary file, then moved into place when the write is successfully + completed. Files are only held open while they are being read or written and are + closed immediately afterwards, so there is no need to manually close any files. + + Safe to write in multiple threads or processes. """ # The default output array type used by getitems(). 
default_meta_array = numpy.empty(()) - def __init__(self, *args, **kwargs) -> None: + def __init__( + self, + path, + normalize_keys=False, + dimension_separator=None, + *, + compressor_config_overwrite: Optional[Mapping] = None, + decompressor_config_overwrite: Optional[Mapping] = None, + ) -> None: if not kvikio.zarr.supported: raise RuntimeError( f"GDSStore requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}" ) - super().__init__(*args, **kwargs) + super().__init__( + path, normalize_keys=normalize_keys, dimension_separator=dimension_separator + ) + self.compressor_config_overwrite = compressor_config_overwrite + self.decompressor_config_overwrite = decompressor_config_overwrite def __eq__(self, other): return isinstance(other, GDSStore) and self.path == other.path @@ -62,6 +105,23 @@ def _tofile(self, a, fn): written = f.write(a) assert written == a.nbytes + def __getitem__(self, key): + ret = super().__getitem__(key) + if self.decompressor_config_overwrite and key == ".zarray": + meta = self._metadata_class.decode_array_metadata(ret) + if meta["compressor"]: + meta["compressor"] = self.decompressor_config_overwrite + ret = self._metadata_class.encode_array_metadata(meta) + return ret + + def __setitem__(self, key, value): + if self.compressor_config_overwrite and key == ".zarray": + meta = self._metadata_class.decode_array_metadata(value) + if meta["compressor"]: + meta["compressor"] = self.compressor_config_overwrite + value = self._metadata_class.encode_array_metadata(meta) + super().__setitem__(key, value) + def getitems( self, keys: Sequence[str], @@ -237,3 +297,107 @@ def get_nvcomp_manager(self): nvcomp_compressors = [ANS, Bitcomp, Cascaded, Gdeflate, LZ4, Snappy] for c in nvcomp_compressors: register_codec(c) + + +class CompatCompressor: + """A pair of compatible compressors, one using the CPU and one using the GPU""" + + def __init__(self, cpu: Codec, gpu: Codec) -> None: + self.cpu = cpu + self.gpu = gpu + + @classmethod + def lz4(cls) -> 
CompatCompressor: + """A compatible pair of LZ4 compressors""" + return cls(cpu=numcodecs.LZ4(), gpu=NvCompBatchCodec("lz4")) + + +def open_cupy_array( + store: Union[os.PathLike, str], + mode: Literal["r", "r+", "a", "w", "w-"] = "a", + compressor: Codec | CompatCompressor = Snappy(device_ordinal=0), + meta_array=cupy.empty(()), + **kwargs, +) -> zarr.Array: + """Open a Zarr array as a CuPy-like array using file-mode-like semantics. + + This function is a CUDA friendly version of `zarr.open_array` that reads + and writes to CuPy arrays. Besides the arguments listed below, the arguments + have the same semantics as in `zarr.open_array`. + + Parameters + ---------- + store + Path to directory in file system. As opposed to `zarr.open_array`, + Store objects and paths to zip files aren't supported. + mode + Persistence mode: 'r' means read only (must exist); 'r+' means + read/write (must exist); 'a' means read/write (create if doesn't + exist); 'w' means create (overwrite if exists); 'w-' means create + (fail if exists). + compressor + The compressor used when creating a Zarr file or None if no compressor + is to be used. If a `CompatCompressor` is given, `CompatCompressor.gpu` + is used for compression and decompression; and `CompatCompressor.cpu` + is written as the compressor in the Zarr file metadata on disk. + This argument is ignored in "r" and "r+" mode. By default the + Snappy compressor by nvCOMP is used. + meta_array : array-like, optional + A CuPy-like array instance to use for determining arrays to create and + return to users. It must implement `__cuda_array_interface__`. + **kwargs + The rest of the arguments are forwarded to `zarr.open_array` as-is. + + Returns + ------- + Zarr array backed by a GDS file store, nvCOMP compression, and CuPy arrays. 
+ """ + + if not isinstance(store, (str, os.PathLike)): + raise ValueError("store must be a path") + store = str(os.fspath(store)) + if not hasattr(meta_array, "__cuda_array_interface__"): + raise ValueError("meta_array must implement __cuda_array_interface__") + + if mode in ("r", "r+"): + ret = zarr.open_array( + store=kvikio.zarr.GDSStore(path=store), + mode=mode, + meta_array=meta_array, + **kwargs, + ) + # If we are reading an LZ4-CPU compressed file, we overwrite the metadata + # on-the-fly to make Zarr use LZ4-GPU for both compression and decompression. + compat_lz4 = CompatCompressor.lz4() + if ret.compressor == compat_lz4.cpu: + ret = zarr.open_array( + store=kvikio.zarr.GDSStore( + path=store, + compressor_config_overwrite=compat_lz4.cpu.get_config(), + decompressor_config_overwrite=compat_lz4.gpu.get_config(), + ), + mode=mode, + meta_array=meta_array, + **kwargs, + ) + return ret + + if isinstance(compressor, CompatCompressor): + compressor_config_overwrite = compressor.cpu.get_config() + decompressor_config_overwrite = compressor.gpu.get_config() + compressor = compressor.gpu + else: + compressor_config_overwrite = None + decompressor_config_overwrite = None + + return zarr.open_array( + store=kvikio.zarr.GDSStore( + path=store, + compressor_config_overwrite=compressor_config_overwrite, + decompressor_config_overwrite=decompressor_config_overwrite, + ), + mode=mode, + meta_array=meta_array, + compressor=compressor, + **kwargs, + ) diff --git a/python/tests/test_examples.py b/python/tests/test_examples.py index 3343216c6f..e9e1f83d08 100644 --- a/python/tests/test_examples.py +++ b/python/tests/test_examples.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import os @@ -16,3 +16,13 @@ def test_hello_world(tmp_path, monkeypatch): monkeypatch.syspath_prepend(str(examples_path)) import_module("hello_world").main(tmp_path / "test-file") + + +def test_zarr_cupy_nvcomp(tmp_path, monkeypatch): + """Test examples/zarr_cupy_nvcomp.py""" + + # `examples/zarr_cupy_nvcomp.py` requires the Zarr submodule + pytest.importorskip("kvikio.zarr") + + monkeypatch.syspath_prepend(str(examples_path)) + import_module("zarr_cupy_nvcomp").main(tmp_path / "test-file") diff --git a/python/tests/test_zarr.py b/python/tests/test_zarr.py index 296c5f1ee6..f909559eea 100644 --- a/python/tests/test_zarr.py +++ b/python/tests/test_zarr.py @@ -4,12 +4,14 @@ import math +import numpy import pytest cupy = pytest.importorskip("cupy") zarr = pytest.importorskip("zarr") kvikio_zarr = pytest.importorskip("kvikio.zarr") - +kvikio_nvcomp_codec = pytest.importorskip("kvikio.nvcomp_codec") +numcodecs = pytest.importorskip("numcodecs") if not kvikio_zarr.supported: pytest.skip( @@ -156,3 +158,90 @@ def test_compressor(store, xp_write, xp_read, compressor): b = z[:] assert isinstance(b, xp_read.ndarray) cupy.testing.assert_array_equal(b, a) + + +@pytest.mark.parametrize("algo", ["lz4", "zstd"]) +def test_decompressor_config_overwrite(tmp_path, xp, algo): + cpu_codec = numcodecs.registry.get_codec({"id": algo}) + gpu_codec = kvikio_nvcomp_codec.NvCompBatchCodec(algo) + + # Write using Zarr's default file store and the `cpu_codec` compressor + z = zarr.open_array(tmp_path, mode="w", shape=(10,), compressor=cpu_codec) + z[:] = range(10) + assert z.compressor == cpu_codec + + # Open file using GDSStore and use `gpu_codec` as decompressor. 
+ z = zarr.open_array( + kvikio_zarr.GDSStore( + tmp_path, + decompressor_config_overwrite=gpu_codec.get_config(), + ), + mode="r", + meta_array=xp.empty(()), + ) + assert z.compressor == gpu_codec + assert isinstance(z[:], xp.ndarray) + xp.testing.assert_array_equal(z[:], range(10)) + + +@pytest.mark.parametrize("algo", ["lz4"]) +def test_compressor_config_overwrite(tmp_path, xp, algo): + cpu_codec = numcodecs.registry.get_codec({"id": algo}) + gpu_codec = kvikio_nvcomp_codec.NvCompBatchCodec(algo) + + # Write file using GDSStore and the `gpu_codec` compressor. In order + # to make the file compatible with Zarr's builtin CPU decompressor, + # we set `cpu_codec` as the compressor in the meta file on disk. + z = zarr.open_array( + kvikio_zarr.GDSStore( + tmp_path, + compressor_config_overwrite=cpu_codec.get_config(), + decompressor_config_overwrite=gpu_codec.get_config(), + ), + mode="w", + shape=10, + compressor=gpu_codec, + meta_array=xp.empty(()), + ) + assert z.compressor == gpu_codec + z[:] = xp.arange(10) + + # We can now open the file using Zarr's builtin CPU decompressor + z = zarr.open_array(tmp_path, mode="r") + assert isinstance(z[:], numpy.ndarray) + numpy.testing.assert_array_equal(z[:], range(10)) + + +def test_open_cupy_array(tmp_path): + a = cupy.arange(10) + z = kvikio_zarr.open_cupy_array( + tmp_path, + mode="w", + shape=a.shape, + dtype=a.dtype, + chunks=(2,), + compressor=kvikio_zarr.CompatCompressor.lz4(), + ) + z[:] = a + assert a.shape == z.shape + assert a.dtype == z.dtype + assert isinstance(z[:], type(a)) + assert z.compressor == kvikio_nvcomp_codec.NvCompBatchCodec("lz4") + cupy.testing.assert_array_equal(a, z[:]) + + z = kvikio_zarr.open_cupy_array( + tmp_path, + mode="r", + ) + assert a.shape == z.shape + assert a.dtype == z.dtype + assert isinstance(z[:], type(a)) + assert z.compressor == kvikio_nvcomp_codec.NvCompBatchCodec("lz4") + cupy.testing.assert_array_equal(a, z[:]) + + z = zarr.open_array(tmp_path, mode="r") + assert a.shape == 
z.shape + assert a.dtype == z.dtype + assert isinstance(z[:], numpy.ndarray) + assert z.compressor == kvikio_zarr.CompatCompressor.lz4().cpu + numpy.testing.assert_array_equal(a.get(), z[:])