-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Appending to zarr store #2706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Appending to zarr store #2706
Changes from 28 commits
f231393
f14f3b7
928440d
442e938
389ba43
6097da2
390a792
da9a962
6756b8f
95d5782
a750a92
295084b
cc353e1
e56a210
519b398
c85aa98
3adfd49
608813b
2078838
7a90ce8
b8af5bd
5bee0dc
7ed77ad
b4ff1c7
5b3f8ea
7564329
93be790
58c4b78
5316593
ad08c73
4d2122b
105ed39
af4a5a5
62d4f52
9558811
9d70e02
a6ff494
34b700f
3e54cb9
97ed25b
beb12e5
41a6ca3
321aec1
2b130ff
58de86d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -188,6 +188,7 @@ Other enhancements | |
| report showing what exactly differs between the two objects (dimensions / | ||
| coordinates / variables / attributes) (:issue:`1507`). | ||
| By `Benoit Bovy <https://github.com/benbovy>`_. | ||
| - Added append capability to the zarr store. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to credit all the contributors to this PR. |
||
| - Add ``tolerance`` option to ``resample()`` methods ``bfill``, ``pad``, | ||
| ``nearest``. (:issue:`2695`) | ||
| By `Hauke Schulz <https://github.com/observingClouds>`_. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,16 +4,21 @@ | |
| from io import BytesIO | ||
| from numbers import Number | ||
| from pathlib import Path | ||
| import re | ||
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
|
|
||
| from .. import Dataset, backends, conventions | ||
| from ..core import indexing | ||
| from ..core.combine import ( | ||
| _CONCAT_DIM_DEFAULT, _auto_combine, _infer_concat_order_from_positions) | ||
| from ..core.utils import close_on_error, is_grib_path, is_remote_uri | ||
| from ..core.variable import Variable | ||
| from .common import ArrayWriter | ||
| from .locks import _get_scheduler | ||
| from ..coding.variables import safe_setitem, unpack_for_encoding | ||
| from ..coding.strings import encode_string_array | ||
|
|
||
| DATAARRAY_NAME = '__xarray_dataarray_name__' | ||
| DATAARRAY_VARIABLE = '__xarray_dataarray_variable__' | ||
|
|
@@ -1003,8 +1008,30 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, | |
| for w, s in zip(writes, stores)]) | ||
|
|
||
|
|
||
| def encode_utf8(var, string_max_length): | ||
|
||
| dims, data, attrs, encoding = unpack_for_encoding(var) | ||
| missing = pd.isnull(data) | ||
| data[missing] = "" | ||
| data = encode_string_array(data, 'utf-8') | ||
| data = data.astype(np.dtype("S{}".format(string_max_length * 2))) | ||
| return Variable(dims, data, attrs, encoding) | ||
|
|
||
|
|
||
| def _validate_datatypes_for_zarr_append(dataset): | ||
| """DataArray.name and Dataset keys must be a string or None""" | ||
| def check_dtype(var): | ||
| if (not np.issubdtype(var.dtype, np.number) | ||
| and not np.issubdtype(var.dtype, np.string_)): | ||
| # and not re.match('^bytes[1-9]+$', var.dtype.name)): | ||
| raise ValueError('Invalid dtype for DataVariable: {} ' | ||
shikharsg marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| 'dtype must be a subtype of number or ' | ||
| 'a fixed sized string'.format(var)) | ||
| for k in dataset.data_vars.values(): | ||
| check_dtype(k) | ||
|
|
||
|
|
||
| def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, | ||
| encoding=None, compute=True, consolidated=False): | ||
| encoding=None, compute=True, consolidated=False, append_dim=None): | ||
| """This function creates an appropriate datastore for writing a dataset to | ||
| a zarr ztore | ||
|
|
||
|
|
@@ -1019,11 +1046,14 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, | |
| _validate_dataset_names(dataset) | ||
| _validate_attrs(dataset) | ||
|
|
||
| if mode == "a": | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we raise an error if |
||
| _validate_datatypes_for_zarr_append(dataset) | ||
|
|
||
| zstore = backends.ZarrStore.open_group(store=store, mode=mode, | ||
| synchronizer=synchronizer, | ||
| group=group, | ||
| consolidate_on_close=consolidated) | ||
|
|
||
| zstore.append_dim = append_dim | ||
| writer = ArrayWriter() | ||
| # TODO: figure out how to properly handle unlimited_dims | ||
| dump_to_store(dataset, zstore, writer, encoding=encoding) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -158,24 +158,35 @@ class ArrayWriter: | |||||
| def __init__(self, lock=None): | ||||||
| self.sources = [] | ||||||
| self.targets = [] | ||||||
| self.regions = [] | ||||||
| self.lock = lock | ||||||
|
|
||||||
| def add(self, source, target): | ||||||
| def add(self, source, target, region=None): | ||||||
| if isinstance(source, dask_array_type): | ||||||
| self.sources.append(source) | ||||||
| self.targets.append(target) | ||||||
| if region: | ||||||
| self.regions.append(region) | ||||||
|
||||||
| else: | ||||||
| target[...] = source | ||||||
| if region: | ||||||
| target[region] = source | ||||||
| else: | ||||||
| target[...] = source | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 This seems like a perfect (and rather general) way to handle appends at the dask level. |
||||||
|
|
||||||
| def sync(self, compute=True): | ||||||
| if self.sources: | ||||||
| import dask.array as da | ||||||
| # TODO: consider wrapping targets with dask.delayed, if this makes | ||||||
| # for any discernable difference in perforance, e.g., | ||||||
| # targets = [dask.delayed(t) for t in self.targets] | ||||||
|
|
||||||
| if not self.regions: | ||||||
| regions = None | ||||||
| else: | ||||||
| regions = self.regions | ||||||
| delayed_store = da.store(self.sources, self.targets, | ||||||
| lock=self.lock, compute=compute, | ||||||
| flush=True) | ||||||
| flush=True, regions=regions) | ||||||
|
||||||
| flush=True, regions=regions) | |
| flush=True, regions=self.regions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we reset self.regions = [] for consistency?
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,8 @@ | |
| from ..core import indexing | ||
| from ..core.pycompat import integer_types | ||
| from ..core.utils import FrozenOrderedDict, HiddenKeyDict | ||
| from .common import AbstractWritableDataStore, BackendArray | ||
| from .common import AbstractWritableDataStore, BackendArray, \ | ||
| _encode_variable_name | ||
|
|
||
| # need some special secret attributes to tell us the dimensions | ||
| _DIMENSION_KEY = '_ARRAY_DIMENSIONS' | ||
|
|
@@ -257,6 +258,7 @@ def __init__(self, zarr_group, consolidate_on_close=False): | |
| self._synchronizer = self.ds.synchronizer | ||
| self._group = self.ds.path | ||
| self._consolidate_on_close = consolidate_on_close | ||
| self.append_dim = None | ||
|
|
||
| def open_store_variable(self, name, zarr_array): | ||
| data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self)) | ||
|
|
@@ -313,40 +315,125 @@ def encode_variable(self, variable): | |
| def encode_attribute(self, a): | ||
| return _encode_zarr_attr_value(a) | ||
|
|
||
| def prepare_variable(self, name, variable, check_encoding=False, | ||
| unlimited_dims=None): | ||
|
|
||
| attrs = variable.attrs.copy() | ||
| dims = variable.dims | ||
| dtype = variable.dtype | ||
| shape = variable.shape | ||
|
|
||
| fill_value = attrs.pop('_FillValue', None) | ||
| if variable.encoding == {'_FillValue': None} and fill_value is None: | ||
| variable.encoding = {} | ||
|
|
||
| encoding = _extract_zarr_variable_encoding( | ||
| variable, raise_on_invalid=check_encoding) | ||
|
|
||
| encoded_attrs = OrderedDict() | ||
| # the magic for storing the hidden dimension data | ||
| encoded_attrs[_DIMENSION_KEY] = dims | ||
| for k, v in attrs.items(): | ||
| encoded_attrs[k] = self.encode_attribute(v) | ||
|
|
||
| zarr_array = self.ds.create(name, shape=shape, dtype=dtype, | ||
| fill_value=fill_value, **encoding) | ||
| zarr_array.attrs.put(encoded_attrs) | ||
|
|
||
| return zarr_array, variable.data | ||
|
|
||
| def store(self, variables, attributes, *args, **kwargs): | ||
| AbstractWritableDataStore.store(self, variables, attributes, | ||
| *args, **kwargs) | ||
| def store(self, variables, attributes, check_encoding_set=frozenset(), | ||
| writer=None, unlimited_dims=None): | ||
| """ | ||
| Top level method for putting data on this store, this method: | ||
| - encodes variables/attributes | ||
| - sets dimensions | ||
| - sets variables | ||
|
|
||
| Parameters | ||
| ---------- | ||
| variables : dict-like | ||
| Dictionary of key/value (variable name / xr.Variable) pairs | ||
| attributes : dict-like | ||
| Dictionary of key/value (attribute name / attribute) pairs | ||
| check_encoding_set : list-like | ||
| List of variables that should be checked for invalid encoding | ||
| values | ||
| writer : ArrayWriter | ||
| unlimited_dims : list-like | ||
| List of dimension names that should be treated as unlimited | ||
| dimensions. | ||
| dimension on which the zarray will be appended | ||
| only needed in append mode | ||
| """ | ||
|
|
||
| existing_variables = set([vn for vn in variables | ||
| if _encode_variable_name(vn) in self.ds]) | ||
| new_variables = set(variables) - existing_variables | ||
| variables_without_encoding = OrderedDict([(vn, variables[vn]) | ||
| for vn in new_variables]) | ||
| variables_encoded, attributes = self.encode( | ||
| variables_without_encoding, attributes) | ||
|
|
||
| if len(existing_variables) > 0: | ||
| # there are variables to append | ||
| # their encoding must be the same as in the store | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any unit tests that verify that encoding is kept consistent? This would be nice to add, if not. Probably a good example would be dataset saved with scale/offset encoding, where the new dataset to be appended does not have any encoding provided. We could verify that probably scaled values are read back from disk. |
||
| ds = open_zarr(self.ds.store, auto_chunk=False) | ||
| variables_with_encoding = OrderedDict() | ||
| for vn in existing_variables: | ||
| variables_with_encoding[vn] = variables[vn] | ||
| variables_with_encoding[vn].encoding = ds[vn].encoding | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This modifies an argument that was passed into the function in-place, which in general should be avoided due to unexpected side effects in other parts of the code. It would be better to shallow copy the |
||
| variables_with_encoding, _ = self.encode(variables_with_encoding, | ||
| {}) | ||
| variables_encoded.update(variables_with_encoding) | ||
|
|
||
| self.set_attributes(attributes) | ||
| self.set_dimensions(variables_encoded, unlimited_dims=unlimited_dims) | ||
| self.set_variables(variables_encoded, check_encoding_set, writer, | ||
| unlimited_dims=unlimited_dims) | ||
|
|
||
| def sync(self): | ||
| pass | ||
|
|
||
| def set_variables(self, variables, check_encoding_set, writer, | ||
| unlimited_dims=None): | ||
| """ | ||
| This provides a centralized method to set the variables on the data | ||
| store. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| variables : dict-like | ||
| Dictionary of key/value (variable name / xr.Variable) pairs | ||
| check_encoding_set : list-like | ||
| List of variables that should be checked for invalid encoding | ||
| values | ||
| writer : | ||
| unlimited_dims : list-like | ||
| List of dimension names that should be treated as unlimited | ||
| dimensions. | ||
| """ | ||
| for vn, v in variables.items(): | ||
| name = _encode_variable_name(vn) | ||
| check = vn in check_encoding_set | ||
| attrs = v.attrs.copy() | ||
| dims = v.dims | ||
| dtype = v.dtype | ||
| shape = v.shape | ||
|
|
||
| fill_value = attrs.pop('_FillValue', None) | ||
| if v.encoding == {'_FillValue': None} and fill_value is None: | ||
| v.encoding = {} | ||
| if name in self.ds: | ||
| # append to existing variable | ||
| zarr_array = self.ds[name] | ||
| if self.append_dim is None: | ||
| raise ValueError( | ||
| 'dimension being appended is unknown; ' | ||
| 'did you forget to call to_zarr with append_dim ' | ||
| 'argument?') | ||
|
||
| if self.append_dim in dims: | ||
| # this is the DataArray that has append_dim as a | ||
| # dimension | ||
| append_axis = dims.index(self.append_dim) | ||
| new_shape = list(zarr_array.shape) | ||
| new_shape[append_axis] += v.shape[append_axis] | ||
| new_region = [slice(None)] * len(new_shape) | ||
| new_region[append_axis] = slice( | ||
| zarr_array.shape[append_axis], | ||
| None | ||
| ) | ||
| zarr_array.resize(new_shape) | ||
| writer.add(v.data, zarr_array, | ||
| region=tuple(new_region)) | ||
| else: | ||
| # new variable | ||
| encoding = _extract_zarr_variable_encoding( | ||
| v, raise_on_invalid=check) | ||
| encoded_attrs = OrderedDict() | ||
| # the magic for storing the hidden dimension data | ||
| encoded_attrs[_DIMENSION_KEY] = dims | ||
| for k2, v2 in attrs.items(): | ||
| encoded_attrs[k2] = self.encode_attribute(v2) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we pulled this attribute encoding out before the |
||
|
|
||
| zarr_array = self.ds.create(name, shape=shape, dtype=dtype, | ||
| fill_value=fill_value, **encoding) | ||
| zarr_array.attrs.put(encoded_attrs) | ||
| writer.add(v.data, zarr_array) | ||
|
|
||
| def close(self): | ||
| if self._consolidate_on_close: | ||
| import zarr | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1322,7 +1322,8 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None, | |
| compute=compute) | ||
|
|
||
| def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, | ||
| encoding=None, compute=True, consolidated=False): | ||
| encoding=None, compute=True, consolidated=False, | ||
| append_dim=None): | ||
| """Write dataset contents to a zarr group. | ||
|
|
||
| .. note:: Experimental | ||
|
|
@@ -1333,9 +1334,10 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, | |
| ---------- | ||
| store : MutableMapping or str, optional | ||
| Store or path to directory in file system. | ||
| mode : {'w', 'w-'} | ||
| mode : {'w', 'w-', 'a'} | ||
| Persistence mode: 'w' means create (overwrite if exists); | ||
| 'w-' means create (fail if exists). | ||
| 'w-' means create (fail if exists); | ||
| 'a' means append (create if does not exist). | ||
| synchronizer : object, optional | ||
| Array synchronizer | ||
| group : str, obtional | ||
|
|
@@ -1350,21 +1352,23 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, | |
| consolidated: bool, optional | ||
| If True, apply zarr's `consolidate_metadata` function to the store | ||
| after writing. | ||
| append_dim: str, optional | ||
| If mode='a', the dimension on which the data will be appended. | ||
|
|
||
| References | ||
| ---------- | ||
| https://zarr.readthedocs.io/ | ||
| """ | ||
| if encoding is None: | ||
| encoding = {} | ||
| if mode not in ['w', 'w-']: | ||
| # TODO: figure out how to handle 'r+' and 'a' | ||
| if mode not in ['w', 'w-', 'a']: | ||
| # TODO: figure out how to handle 'r+' | ||
| raise ValueError("The only supported options for mode are 'w' " | ||
| "and 'w-'.") | ||
|
||
| from ..backends.api import to_zarr | ||
| return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer, | ||
| group=group, encoding=encoding, compute=compute, | ||
| consolidated=consolidated) | ||
| consolidated=consolidated, append_dim=append_dim) | ||
|
|
||
| def __repr__(self): | ||
| return formatting.dataset_repr(self) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we possibly get an example of using append here?