From b7571ab4b0d9b67b767c55db400b4ffac0f752f1 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 5 Oct 2023 17:20:11 +0200 Subject: [PATCH 01/21] Fix `array.values` handling in array cast/embed --- src/datasets/table.py | 48 +++++++++++++++++++++++++++++-------------- tests/test_table.py | 5 +++++ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index e021dea1092..c00e0261019 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1877,7 +1877,7 @@ def array_concat(arrays: List[pa.Array]): array_type = arrays[0].type arrays = [chunk for arr in arrays for chunk in (arr.chunks if isinstance(arr, pa.ChunkedArray) else (arr,))] - if not _is_extension_type(array_type): + if config.PYARROW_VERSION.major >= 12 or not _is_extension_type(array_type): return pa.concat_arrays(arrays) def _offsets_concat(offsets): @@ -1916,15 +1916,24 @@ def _concat_arrays(arrays): _concat_arrays([array.values for array in arrays]), ) elif pa.types.is_fixed_size_list(array_type): - if config.PYARROW_VERSION.major < 14: + if config.PYARROW_VERSION.major < 10: # PyArrow bug: https://github.com/apache/arrow/issues/35360 return pa.FixedSizeListArray.from_arrays( - _concat_arrays([array.values[array.offset * array.type.list_size :] for array in arrays]), + _concat_arrays( + [ + array.values[ + array.offset + * array.type.list_size : (array.offset + len(array)) + * array.type.list_size + ] + for array in arrays + ] + ), array_type.list_size, ) else: return pa.FixedSizeListArray.from_arrays( - _concat_arrays([array.values for array in arrays]), + _concat_arrays([array.flatten() for array in arrays]), array_type.value_type, array_type.list_size, ) @@ -1992,10 +2001,13 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): ) return pa.ListArray.from_arrays(array.offsets, _c(array.values, pa_type.value_type)) elif pa.types.is_fixed_size_list(array.type): - array_values = array.values - 
if config.PYARROW_VERSION.major < 14: + if config.PYARROW_VERSION.major < 10: # PyArrow bug: https://github.com/apache/arrow/issues/35360 - array_values = array.values[array.offset * array.type.list_size :] + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] + else: + array_values = array.flatten() if pa.types.is_fixed_size_list(pa_type): return pa.FixedSizeListArray.from_arrays( _c(array_values, pa_type.value_type), @@ -2109,10 +2121,13 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ return pa.ListArray.from_arrays(array.offsets, _c(array.values, feature.feature)) elif pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - array_values = array.values - if config.PYARROW_VERSION.major < 14: + if config.PYARROW_VERSION.major < 10: # PyArrow bug: https://github.com/apache/arrow/issues/35360 - array_values = array.values[array.offset * array.type.list_size :] + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] + else: + array_values = array.flatten() if isinstance(feature, list): if array.null_count > 0: if config.PYARROW_VERSION.major < 10: @@ -2200,8 +2215,8 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): if feature.length * len(array) == len(array.values): return pa.FixedSizeListArray.from_arrays(_e(array.values, feature.feature), feature.length) else: - casted_values = _e(array.values, feature.feature) - if casted_values.type == array.values.type: + embedded_values = _e(array.values, feature.feature) + if embedded_values.type == array.values.type: return array else: if array.null_count > 0: @@ -2216,10 +2231,13 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature.feature)) elif 
pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - array_values = array.values - if config.PYARROW_VERSION.major < 14: + if config.PYARROW_VERSION.major < 10: # PyArrow bug: https://github.com/apache/arrow/issues/35360 - array_values = array.values[array.offset * array.type.list_size :] + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] + else: + array_values = array.flatten() if isinstance(feature, list): if array.null_count > 0: if config.PYARROW_VERSION.major < 10: diff --git a/tests/test_table.py b/tests/test_table.py index b20e509d1b8..13e235dfe9f 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -1203,6 +1203,11 @@ def test_cast_fixed_size_array_to_features_sequence(): def test_cast_sliced_fixed_size_array_to_features(): arr = pa.array([[0, 1, 2], [3, 4, 5], [6, 7, 8]], pa.list_(pa.int32(), 3)) + # arr.offset not set + casted_array = cast_array_to_feature(arr[:2], Sequence(Value("int64"), length=3)) + assert casted_array.type == pa.list_(pa.int64(), 3) + assert casted_array.to_pylist() == arr[:2].to_pylist() + # arr.offset set casted_array = cast_array_to_feature(arr[1:], Sequence(Value("int64"), length=3)) assert casted_array.type == pa.list_(pa.int64(), 3) assert casted_array.to_pylist() == arr[1:].to_pylist() From feb1c1a06aa1bc1a512912518eab40e14051714b Mon Sep 17 00:00:00 2001 From: mariosasko Date: Fri, 6 Oct 2023 02:14:57 +0200 Subject: [PATCH 02/21] Fix fixed size array with nulls cast --- src/datasets/table.py | 142 ++++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 69 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index c00e0261019..d4f9b8ecb8d 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -276,11 +276,7 @@ def to_pylist(self, *args, **kwargs): Returns: `list` """ - try: - return self.table.to_pylist(*args, **kwargs) - 
except AttributeError: # pyarrow <7 does not have to_pylist, so we use to_pydict - pydict = self.table.to_pydict(*args, **kwargs) - return [{k: pydict[k][i] for k in pydict} for i in range(len(self.table))] + return self.table.to_pylist(*args, **kwargs) def to_pandas(self, *args, **kwargs): """ @@ -1916,27 +1912,16 @@ def _concat_arrays(arrays): _concat_arrays([array.values for array in arrays]), ) elif pa.types.is_fixed_size_list(array_type): - if config.PYARROW_VERSION.major < 10: - # PyArrow bug: https://github.com/apache/arrow/issues/35360 - return pa.FixedSizeListArray.from_arrays( - _concat_arrays( - [ - array.values[ - array.offset - * array.type.list_size : (array.offset + len(array)) - * array.type.list_size - ] - for array in arrays - ] - ), - array_type.list_size, - ) - else: - return pa.FixedSizeListArray.from_arrays( - _concat_arrays([array.flatten() for array in arrays]), - array_type.value_type, - array_type.list_size, - ) + array_values = [ + array.values[array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size] + for array in arrays + ] + return pa.Array.from_buffers( + array_type, + sum(len(array) for array in arrays), + [_concat_arrays([array.is_valid() for array in arrays]).buffers()[1]], + children=[_concat_arrays(array_values)], + ) return pa.concat_arrays(arrays) return _concat_arrays(arrays) @@ -1985,9 +1970,12 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): elif pa.types.is_list(array.type): if pa.types.is_fixed_size_list(pa_type): if pa_type.list_size * len(array) == len(array.values): - return pa.FixedSizeListArray.from_arrays( - _c(array.values, pa_type.value_type), - pa_type.list_size, + # FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 + return pa.Array.from_buffers( + pa_type, + len(array), + [array.is_valid().buffers()[1]], + children=[_c(array_values, pa_type.value_type)], ) elif 
pa.types.is_list(pa_type): if array.null_count > 0: @@ -2001,24 +1989,20 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): ) return pa.ListArray.from_arrays(array.offsets, _c(array.values, pa_type.value_type)) elif pa.types.is_fixed_size_list(array.type): - if config.PYARROW_VERSION.major < 10: - # PyArrow bug: https://github.com/apache/arrow/issues/35360 - array_values = array.values[ - array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] - else: - array_values = array.flatten() + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] + offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if pa.types.is_fixed_size_list(pa_type): - return pa.FixedSizeListArray.from_arrays( - _c(array_values, pa_type.value_type), - pa_type.list_size, + # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 + return pa.Array.from_buffers( + pa_type, len(array), [array.is_valid().buffers()[1]], children=[_c(array_values, pa_type.value_type)] ) elif pa.types.is_list(pa_type): - offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if array.null_count > 0: if config.PYARROW_VERSION.major < 10: warnings.warn( - f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {pa_type}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." + f"None values are converted to empty lists of size {array.type.list_size} in `pyarrow<10.0.0` when converting array to {pa_type}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." 
) else: return pa.ListArray.from_arrays( @@ -2103,7 +2087,14 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ elif isinstance(feature, Sequence): if feature.length > -1: if feature.length * len(array) == len(array.values): - return pa.FixedSizeListArray.from_arrays(_c(array.values, feature.feature), feature.length) + # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 + casted_values = _c(array_values, feature.feature) + return pa.Array.from_buffers( + pa.list_(casted_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[casted_values], + ) else: casted_values = _c(array.values, feature.feature) if casted_values.type == array.values.type: @@ -2121,32 +2112,35 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ return pa.ListArray.from_arrays(array.offsets, _c(array.values, feature.feature)) elif pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - if config.PYARROW_VERSION.major < 10: - # PyArrow bug: https://github.com/apache/arrow/issues/35360 - array_values = array.values[ - array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] - else: - array_values = array.flatten() + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] + offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if isinstance(feature, list): if array.null_count > 0: if config.PYARROW_VERSION.major < 10: warnings.warn( - f"None values are converted to empty lists when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" + f"None values are converted to empty lists of size {array.type.list_size} when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`" ) else: - return pa.ListArray.from_arrays(array.offsets, _c(array_values, feature[0]), mask=array.is_null()) - return pa.ListArray.from_arrays(array.offsets, _c(array_values, feature[0])) + return pa.ListArray.from_arrays(offsets_arr, _c(array_values, feature[0]), mask=array.is_null()) + return pa.ListArray.from_arrays(offsets_arr, _c(array_values, feature[0])) elif isinstance(feature, Sequence): if feature.length > -1: if feature.length * len(array) == len(array_values): - return pa.FixedSizeListArray.from_arrays(_c(array_values, feature.feature), feature.length) + # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 + casted_values = _c(array_values, feature.feature) + return pa.Array.from_buffers( + pa.list_(casted_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[casted_values], + ) else: - offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if array.null_count > 0: if config.PYARROW_VERSION.major < 10: warnings.warn( - f"None values are converted to empty lists when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`" + f"None values are converted to empty lists of size {array.type.list_size} when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" ) else: return pa.ListArray.from_arrays( @@ -2213,7 +2207,14 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): elif isinstance(feature, Sequence): if feature.length > -1: if feature.length * len(array) == len(array.values): - return pa.FixedSizeListArray.from_arrays(_e(array.values, feature.feature), feature.length) + # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 + embedded_values = _e(array_values, feature.feature) + return pa.Array.from_buffers( + pa.list_(embedded_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[embedded_values], + ) else: embedded_values = _e(array.values, feature.feature) if embedded_values.type == array.values.type: @@ -2231,32 +2232,35 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature.feature)) elif pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - if config.PYARROW_VERSION.major < 10: - # PyArrow bug: https://github.com/apache/arrow/issues/35360 - array_values = array.values[ - array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] - else: - array_values = array.flatten() + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] + offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if isinstance(feature, list): if array.null_count > 0: if config.PYARROW_VERSION.major < 10: warnings.warn( - f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" + f"None values are converted to empty lists of size {array.type.list_size} when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`" ) else: - return pa.ListArray.from_arrays(array.offsets, _e(array_values, feature[0]), mask=array.is_null()) - return pa.ListArray.from_arrays(array.offsets, _e(array_values, feature[0])) + return pa.ListArray.from_arrays(offsets_arr, _e(array_values, feature[0]), mask=array.is_null()) + return pa.ListArray.from_arrays(offsets_arr, _e(array_values, feature[0])) elif isinstance(feature, Sequence): if feature.length > -1: if feature.length * len(array) == len(array_values): - return pa.FixedSizeListArray.from_arrays(_e(array_values, feature.feature), feature.length) + # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 + embedded_values = _e(array_values, feature.feature) + return pa.Array.from_buffers( + pa.list_(embedded_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[embedded_values], + ) else: - offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if array.null_count > 0: if config.PYARROW_VERSION.major < 10: warnings.warn( - f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`" + f"None values are converted to empty lists of size {array.type.list_size} when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" ) else: return pa.ListArray.from_arrays( From 0b2ad109a46d4013c54791d7966a414f1fe3dee5 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 5 Dec 2023 19:46:19 +0100 Subject: [PATCH 03/21] Bump PyArrow to version 12.0.0 --- .github/workflows/ci.yml | 2 +- setup.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 298b306d308..311f44a9259 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,7 +64,7 @@ jobs: run: pip install --upgrade pyarrow huggingface-hub dill - name: Install depencencies (minimum versions) if: ${{ matrix.deps_versions != 'deps-latest' }} - run: pip install pyarrow==8.0.0 huggingface-hub==0.14.0 transformers dill==0.3.1.1 + run: pip install pyarrow==12.0.0 huggingface-hub==0.14.0 transformers dill==0.3.1.1 - name: Test with pytest run: | python -m pytest -rfExX -m ${{ matrix.test }} -n 2 --dist loadfile -sv ./tests/ diff --git a/setup.py b/setup.py index c2c1cb9b29e..cce3abec3c0 100644 --- a/setup.py +++ b/setup.py @@ -110,8 +110,8 @@ # We use numpy>=1.17 to have np.random.Generator (Dataset shuffling) "numpy>=1.17", # Backend and serialization. 
- # Minimum 8.0.0 to be able to use .to_reader() - "pyarrow>=8.0.0", + # Minimum 12.0.0 to be able to concatenate extension arrays + "pyarrow>=12.0.0", # For smart caching dataset processing "dill>=0.3.0,<0.3.8", # tmp pin until dill has official support for determinism see https://github.com/uqfoundation/dill/issues/19 # For performance gains with apache arrow From fda0d314b0606f12f3c0f488bd671d46ae1660ab Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 5 Dec 2023 19:46:47 +0100 Subject: [PATCH 04/21] Fix cast/embed --- src/datasets/arrow_writer.py | 9 +- src/datasets/table.py | 401 +++++++++++++---------------------- tests/test_table.py | 133 +++++------- 3 files changed, 204 insertions(+), 339 deletions(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 87aeb9124c2..4fca73a499f 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -38,7 +38,7 @@ from .filesystems import is_remote_filesystem from .info import DatasetInfo from .keyhash import DuplicatedKeysError, KeyHasher -from .table import array_cast, array_concat, cast_array_to_feature, embed_table_storage, table_cast +from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast from .utils import logging from .utils.file_utils import hash_url_to_filename from .utils.py_utils import asdict, first_non_null_value @@ -439,7 +439,12 @@ def write_examples_on_file(self): # This can happen in `.map()` when we want to re-write the same Arrow data if all(isinstance(row[0][col], (pa.Array, pa.ChunkedArray)) for row in self.current_examples): arrays = [row[0][col] for row in self.current_examples] - batch_examples[col] = array_concat(arrays) + arrays = [ + chunk + for array in arrays + for chunk in (array.chunks if isinstance(array, pa.ChunkedArray) else [array]) + ] + batch_examples[col] = pa.concat_arrays(arrays) else: batch_examples[col] = [ row[0][col].to_pylist()[0] if isinstance(row[0][col], (pa.Array, 
pa.ChunkedArray)) else row[0][col] diff --git a/src/datasets/table.py b/src/datasets/table.py index d4f9b8ecb8d..430c387d1e8 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1,7 +1,6 @@ import copy import os import tempfile -import warnings from functools import partial from itertools import groupby from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Tuple, TypeVar, Union @@ -21,6 +20,30 @@ logger = get_logger(__name__) +_IS_FLATTEN_WITH_NULL_ARRAY_SUPPORTED = config.PYARROW_VERSION.major > 12 +_IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED = config.PYARROW_VERSION.major > 13 + + +def _list_array_flatten(array: Union[pa.ListArray, pa.FixedSizeListArray]) -> pa.Array: + if _IS_FLATTEN_WITH_NULL_ARRAY_SUPPORTED and pc.all(array.is_null()).as_py(): + return array.values[:0] + else: + return array.flatten() + + +def _list_array_to_fixed_size_list_values(array: pa.ListArray, list_size: int) -> pa.Array: + array_offsets = np.array(array.offsets) + offsets_pair_iter = iter(zip(array_offsets[:-1], array_offsets[1:])) + array_values = array.values[:0] + for is_value_valid in np.array(array.is_valid()): + start, end = next(offsets_pair_iter) + if is_value_valid: + array_values = pa.concat_arrays([array_values, array.values[start:end]]) + else: + array_values = pa.concat_arrays([array_values, pa.nulls(list_size, array_values.type)]) + return array_values + + def inject_arrow_table_documentation(arrow_table_method): def wrapper(fn): fn.__doc__ = arrow_table_method.__doc__ + (fn.__doc__ if fn.__doc__ is not None else "") @@ -1831,102 +1854,6 @@ def wrapper(array, *args, **kwargs): return wrapper -def _is_extension_type(pa_type: pa.DataType) -> bool: - """ - Check (recursively) if a pyarrow type is an extension type. 
- """ - if isinstance(pa_type, pa.StructType): - return any(_is_extension_type(field.type) for field in pa_type) - elif isinstance(pa_type, (pa.ListType, pa.FixedSizeListType, pa.LargeListType)): - return _is_extension_type(pa_type.value_type) - elif isinstance(pa_type, pa.ExtensionType): - return True - else: - return False - - -def array_concat(arrays: List[pa.Array]): - """Improved version of pa.concat_arrays - - It supports concatenating pa.ExtensionArray objects by concatenating the underlying storages. - - Args: - arrays (List[pa.Array]): List of arrays to contatenate - - Raises: - pa.ArrowInvalid: if the arrow array concatenation fails - ValueError: if the list of arrays is empty - TypeError: if the arrays to be concatenated have different types - - Returns: - array (:obj:`pyarrow.Array`): the concatenated array - """ - arrays = list(arrays) - array_types = {array.type for array in arrays} - - if not array_types: - raise ValueError("Couldn't concatenate empty list of arrays") - if len(array_types) > 1: - array_types = list(array_types) - raise TypeError(f"Couldn't concatenate arrays with different types {array_types[0]} and {array_types[1]}") - - array_type = arrays[0].type - arrays = [chunk for arr in arrays for chunk in (arr.chunks if isinstance(arr, pa.ChunkedArray) else (arr,))] - - if config.PYARROW_VERSION.major >= 12 or not _is_extension_type(array_type): - return pa.concat_arrays(arrays) - - def _offsets_concat(offsets): - offset = offsets[0] - concatenated_offsets = offset - for offset in offsets[1:]: - offset = pc.subtract(offset, offset[0]) - offset = pc.add(offset[1:], concatenated_offsets[-1]) - concatenated_offsets = pa.concat_arrays([concatenated_offsets, offset]) - return concatenated_offsets - - def _concat_arrays(arrays): - array_type = arrays[0].type - if isinstance(array_type, pa.PyExtensionType): - return array_type.wrap_array(_concat_arrays([array.storage for array in arrays])) - elif pa.types.is_struct(array_type): - return 
pa.StructArray.from_arrays( - [_concat_arrays([array.field(field.name) for array in arrays]) for field in array_type], - fields=list(array_type), - mask=pa.concat_arrays([array.is_null() for array in arrays]), - ) - elif pa.types.is_list(array_type): - if any(array.null_count > 0 for array in arrays): - if config.PYARROW_VERSION.major < 10: - warnings.warn( - "None values are converted to empty lists in `pyarrow<10.0.0` when concatenating list arrays with None values. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." - ) - else: - return pa.ListArray.from_arrays( - _offsets_concat([array.offsets for array in arrays]), - _concat_arrays([array.values for array in arrays]), - mask=pa.concat_arrays([array.is_null() for array in arrays]), - ) - return pa.ListArray.from_arrays( - _offsets_concat([array.offsets for array in arrays]), - _concat_arrays([array.values for array in arrays]), - ) - elif pa.types.is_fixed_size_list(array_type): - array_values = [ - array.values[array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size] - for array in arrays - ] - return pa.Array.from_buffers( - array_type, - sum(len(array) for array in arrays), - [_concat_arrays([array.is_valid() for array in arrays]).buffers()[1]], - children=[_concat_arrays(array_values)], - ) - return pa.concat_arrays(arrays) - - return _concat_arrays(arrays) - - @_wrap_for_chunked_arrays def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): """Improved version of `pa.Array.cast` @@ -1969,8 +1896,40 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): return pa.StructArray.from_arrays(arrays, fields=list(pa_type), mask=array.is_null()) elif pa.types.is_list(array.type): if pa.types.is_fixed_size_list(pa_type): - if pa_type.list_size * len(array) == len(array.values): - # FixedSizeListArray does not support mask: 
https://github.com/apache/arrow/issues/34316 + if pc.all(pc.equal(array.value_lengths(), pa_type.list_size)).as_py(): + if array.null_count > 0: + # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 + if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: + # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) + array_values = array.cast(pa.list_(array.type.value_type, pa_type.list_size)).values + else: + array_values = _list_array_to_fixed_size_list_values(array, pa_type.list_size) + return pa.Array.from_buffers( + pa_type, + len(array), + [array.is_valid().buffers()[1]], + children=[_c(array_values, pa_type.value_type)], + ) + else: + array_values = _list_array_flatten(array) + return pa.FixedSizeListArray.from_arrays(_c(array_values, pa_type.value_type), pa_type.list_size) + elif pa.types.is_list(pa_type): + # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.offset: + array_offsets = np.array(array.offsets) + array_values = _list_array_flatten(array) + array_offsets -= array_offsets[0] + else: + array_offsets = array.offsets + array_values = array.values + return pa.ListArray.from_arrays(array_offsets, _c(array_values, pa_type.value_type), mask=array.is_null()) + elif pa.types.is_fixed_size_list(array.type): + if pa.types.is_fixed_size_list(pa_type): + if pa_type.list_size == array.type.list_size: + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] # `array.values` preserves the null arrays unlike `array.flatten()` (we cannot use offsets here to define valid members) + # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 return pa.Array.from_buffers( pa_type, len(array), @@ -1978,37 +1937,10 @@ def array_cast(array: pa.Array, pa_type: 
pa.DataType, allow_number_to_str=True): children=[_c(array_values, pa_type.value_type)], ) elif pa.types.is_list(pa_type): - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {pa_type}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." - ) - else: - return pa.ListArray.from_arrays( - array.offsets, _c(array.values, pa_type.value_type), mask=array.is_null() - ) - return pa.ListArray.from_arrays(array.offsets, _c(array.values, pa_type.value_type)) - elif pa.types.is_fixed_size_list(array.type): - array_values = array.values[ - array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] - offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) - if pa.types.is_fixed_size_list(pa_type): - # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 - return pa.Array.from_buffers( - pa_type, len(array), [array.is_valid().buffers()[1]], children=[_c(array_values, pa_type.value_type)] - ) - elif pa.types.is_list(pa_type): - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists of size {array.type.list_size} in `pyarrow<10.0.0` when converting array to {pa_type}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." 
- ) - else: - return pa.ListArray.from_arrays( - offsets_arr, _c(array_values, pa_type.value_type), mask=array.is_null() - ) - return pa.ListArray.from_arrays(offsets_arr, _c(array_values, pa_type.value_type)) + array_values = _list_array_flatten(array) + array_offsets = np.concatenate([[0], array.is_valid()]) * array.type.list_size + array_offsets = np.cumsum(array_offsets, dtype=np.int32) + return pa.ListArray.from_arrays(array_offsets, _c(array_values, pa_type.value_type), mask=array.is_null()) else: if ( not allow_number_to_str @@ -2072,81 +2004,80 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ elif pa.types.is_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) if isinstance(feature, list): - casted_values = _c(array.values, feature[0]) - if casted_values.type == array.values.type: + # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.offset: + array_offsets = np.array(array.offsets) + array_values = _list_array_flatten(array) + array_offsets -= array_offsets[0] + else: + array_offsets = array.offsets + array_values = array.values + c_array_values = _c(array_values, feature[0]) + if c_array_values.type == array_values.type: return array else: - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." 
- ) - else: - return pa.ListArray.from_arrays(array.offsets, casted_values, mask=array.is_null()) - return pa.ListArray.from_arrays(array.offsets, casted_values) + return pa.ListArray.from_arrays(array_offsets, c_array_values, mask=array.is_null()) elif isinstance(feature, Sequence): if feature.length > -1: - if feature.length * len(array) == len(array.values): - # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 - casted_values = _c(array_values, feature.feature) - return pa.Array.from_buffers( - pa.list_(casted_values.type, feature.length), - len(array), - [array.is_valid().buffers()[1]], - children=[casted_values], - ) + if pc.all(pc.equal(array.value_lengths(), feature.length)).as_py(): + if array.null_count > 0: + # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 + if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: + # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) + array_values = array.cast(pa.list_(array.type.value_type, feature.length)).values + else: + # import pdb; pdb.set_trace() + array_values = _list_array_to_fixed_size_list_values(array, feature.length) + c_array_values = _c(array_values, feature.feature) + return pa.Array.from_buffers( + pa.list_(c_array_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[c_array_values], + ) + else: + array_values = _list_array_flatten(array) + return pa.FixedSizeListArray.from_arrays(_c(array_values, feature.feature), feature.length) else: - casted_values = _c(array.values, feature.feature) - if casted_values.type == array.values.type: + # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.offset: + array_offsets = np.array(array.offsets) + array_values = _list_array_flatten(array) + array_offsets -= array_offsets[0] + 
else: + array_offsets = array.offsets + array_values = array.values + c_array_values = _c(array_values, feature.feature) + if c_array_values.type == array_values.type: return array else: - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676." - ) - else: - return pa.ListArray.from_arrays( - array.offsets, _c(array.values, feature.feature), mask=array.is_null() - ) - return pa.ListArray.from_arrays(array.offsets, _c(array.values, feature.feature)) + return pa.ListArray.from_arrays(array_offsets, c_array_values, mask=array.is_null()) elif pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - array_values = array.values[ - array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] - offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) if isinstance(feature, list): - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists of size {array.type.list_size} when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" - ) - else: - return pa.ListArray.from_arrays(offsets_arr, _c(array_values, feature[0]), mask=array.is_null()) - return pa.ListArray.from_arrays(offsets_arr, _c(array_values, feature[0])) + array_values = _list_array_flatten(array) + array_offsets = np.concatenate([[0], array.is_valid()]) * array.type.list_size + array_offsets = np.cumsum(array_offsets, dtype=np.int32) + return pa.ListArray.from_arrays(array_offsets, _c(array_values, feature[0]), mask=array.is_null()) elif isinstance(feature, Sequence): if feature.length > -1: - if feature.length * len(array) == len(array_values): - # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 - casted_values = _c(array_values, feature.feature) + if feature.length == array.type.list_size: + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] # `array.values` preserves the null arrays unlike `array.flatten()` (we cannot use offsets here to define valid members) + # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 + c_array_values = _c(array_values, feature.feature) return pa.Array.from_buffers( - pa.list_(casted_values.type, feature.length), + pa.list_(c_array_values.type, feature.length), len(array), [array.is_valid().buffers()[1]], - children=[casted_values], + children=[c_array_values], ) else: - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists of size {array.type.list_size} when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" - ) - else: - return pa.ListArray.from_arrays( - offsets_arr, _c(array_values, feature.feature), mask=array.is_null() - ) - return pa.ListArray.from_arrays(offsets_arr, _c(array_values, feature.feature)) + array_values = _list_array_flatten(array) + array_offsets = np.concatenate([[0], array.is_valid()]) * array.type.list_size + array_offsets = np.cumsum(array_offsets, dtype=np.int32) + return pa.ListArray.from_arrays(array_offsets, _c(array_values, feature.feature), mask=array.is_null()) if pa.types.is_null(array.type): return array_cast(array, get_nested_type(feature), allow_number_to_str=allow_number_to_str) elif not isinstance(feature, (Sequence, dict, list, tuple)): @@ -2195,78 +2126,32 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): return pa.StructArray.from_arrays(arrays, names=list(feature), mask=array.is_null()) elif pa.types.is_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) + # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.offset: + array_offsets = np.array(array.offsets) + array_values = _list_array_flatten(array) + array_offsets -= array_offsets[0] + else: + array_offsets = array.offsets + array_values = array.values if isinstance(feature, list): - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" - ) - else: - return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature[0]), mask=array.is_null()) - return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature[0])) - elif isinstance(feature, Sequence): - if feature.length > -1: - if feature.length * len(array) == len(array.values): - # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 - embedded_values = _e(array_values, feature.feature) - return pa.Array.from_buffers( - pa.list_(embedded_values.type, feature.length), - len(array), - [array.is_valid().buffers()[1]], - children=[embedded_values], - ) - else: - embedded_values = _e(array.values, feature.feature) - if embedded_values.type == array.values.type: - return array - else: - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" - ) - else: - return pa.ListArray.from_arrays( - array.offsets, _e(array.values, feature.feature), mask=array.is_null() - ) - return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature.feature)) + return pa.ListArray.from_arrays(array_offsets, _e(array_values, feature[0]), mask=array.is_null()) + if isinstance(feature, Sequence) and feature.length == -1: + return pa.ListArray.from_arrays(array_offsets, _e(array_values, feature.feature), mask=array.is_null()) elif pa.types.is_fixed_size_list(array.type): - # feature must be either [subfeature] or Sequence(subfeature) - array_values = array.values[ - array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] - offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32()) - if isinstance(feature, list): - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists of size {array.type.list_size} when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" - ) - else: - return pa.ListArray.from_arrays(offsets_arr, _e(array_values, feature[0]), mask=array.is_null()) - return pa.ListArray.from_arrays(offsets_arr, _e(array_values, feature[0])) - elif isinstance(feature, Sequence): - if feature.length > -1: - if feature.length * len(array) == len(array_values): - # FixedSizeListArray still does not support mask: https://github.com/apache/arrow/issues/34316 - embedded_values = _e(array_values, feature.feature) - return pa.Array.from_buffers( - pa.list_(embedded_values.type, feature.length), - len(array), - [array.is_valid().buffers()[1]], - children=[embedded_values], - ) - else: - if array.null_count > 0: - if config.PYARROW_VERSION.major < 10: - warnings.warn( - f"None values are converted to empty lists of size {array.type.list_size} when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. 
This will raise an error in a future major version of `datasets`" - ) - else: - return pa.ListArray.from_arrays( - offsets_arr, _e(array_values, feature.feature), mask=array.is_null() - ) - return pa.ListArray.from_arrays(offsets_arr, _e(array_values, feature.feature)) + # feature must be Sequence(subfeature) + if isinstance(feature, Sequence) and feature.length > -1: + array_values = array.values[ + array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size + ] # `array.values` preserves the null arrays unlike `array.flatten()` (we cannot use offsets here to define valid members) + # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 + e_array_values = _e(array_values, feature.feature) + return pa.Array.from_buffers( + pa.list_(array_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[e_array_values], + ) if not isinstance(feature, (Sequence, dict, list, tuple)): return array raise TypeError(f"Couldn't embed array of type\n{array.type}\nwith\n{feature}") diff --git a/tests/test_table.py b/tests/test_table.py index 13e235dfe9f..5a06c57b3d6 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -1,15 +1,13 @@ import copy import pickle -import warnings from typing import List, Union import numpy as np import pyarrow as pa import pytest -import datasets from datasets import Sequence, Value -from datasets.features.features import Array2DExtensionType, ClassLabel, Features, Image +from datasets.features.features import ClassLabel, Features, Image, get_nested_type from datasets.table import ( ConcatenationTable, InMemoryTable, @@ -19,9 +17,7 @@ _in_memory_arrow_table_from_buffer, _in_memory_arrow_table_from_file, _interpolation_search, - _is_extension_type, _memory_mapped_arrow_table_from_file, - array_concat, cast_array_to_feature, concat_tables, embed_array_storage, @@ -1081,35 +1077,6 @@ def test_indexed_table_mixin(): assert 
table.fast_slice(2, 13) == pa_table.slice(2, 13) -@pytest.mark.parametrize( - "arrays", - [ - [pa.array([[1, 2, 3, 4]]), pa.array([[10, 2]])], - [ - pa.array([[[1, 2], [3]]], pa.list_(pa.list_(pa.int32()), 2)), - pa.array([[[10, 2, 3], [2]]], pa.list_(pa.list_(pa.int32()), 2)), - ], - [pa.array([[[1, 2, 3]], [[2, 3], [20, 21]], [[4]]]).slice(1), pa.array([[[1, 2, 3]]])], - ], -) -def test_concat_arrays(arrays): - assert array_concat(arrays) == pa.concat_arrays(arrays) - - -def test_concat_arrays_nested_with_nulls(): - arrays = [pa.array([{"a": 21, "b": [[1, 2], [3]]}]), pa.array([{"a": 100, "b": [[1], None]}])] - concatenated_arrays = array_concat(arrays) - assert concatenated_arrays == pa.array([{"a": 21, "b": [[1, 2], [3]]}, {"a": 100, "b": [[1], None]}]) - - -def test_concat_extension_arrays(): - arrays = [pa.array([[[1, 2], [3, 4]]]), pa.array([[[10, 2], [3, 4]]])] - extension_type = Array2DExtensionType((2, 2), "int64") - assert array_concat([extension_type.wrap_array(array) for array in arrays]) == extension_type.wrap_array( - pa.concat_arrays(arrays) - ) - - def test_cast_array_to_features(): arr = pa.array([[0, 1]]) assert cast_array_to_feature(arr, Sequence(Value("string"))).type == pa.list_(pa.string()) @@ -1130,28 +1097,17 @@ def test_cast_array_to_features_to_nested_with_no_fields(): assert cast_array_to_feature(arr, {}).to_pylist() == arr.to_pylist() -def test_cast_array_to_features_nested_with_null_values(): +def test_cast_array_to_features_nested_with_nulls(): # same type arr = pa.array([{"foo": [None, [0]]}], pa.struct({"foo": pa.list_(pa.list_(pa.int64()))})) casted_array = cast_array_to_feature(arr, {"foo": [[Value("int64")]]}) assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int64()))}) assert casted_array.to_pylist() == arr.to_pylist() - # different type arr = pa.array([{"foo": [None, [0]]}], pa.struct({"foo": pa.list_(pa.list_(pa.int64()))})) - if datasets.config.PYARROW_VERSION.major < 10: - with pytest.warns(UserWarning, 
match="None values are converted to empty lists.+"): - casted_array = cast_array_to_feature(arr, {"foo": [[Value("int32")]]}) - assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int32()))}) - assert casted_array.to_pylist() == [ - {"foo": [[], [0]]} - ] # empty list because of https://github.com/huggingface/datasets/issues/3676 - else: - with warnings.catch_warnings(): - warnings.simplefilter("error") - casted_array = cast_array_to_feature(arr, {"foo": [[Value("int32")]]}) - assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int32()))}) - assert casted_array.to_pylist() == [{"foo": [None, [0]]}] + casted_array = cast_array_to_feature(arr, {"foo": [[Value("int32")]]}) + assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int32()))}) + assert casted_array.to_pylist() == [{"foo": [None, [0]]}] def test_cast_array_to_features_to_null_type(): @@ -1189,28 +1145,61 @@ def test_cast_array_to_features_sequence_classlabel(): assert cast_array_to_feature(arr, Sequence(ClassLabel(names=["foo", "bar"]))) -def test_cast_fixed_size_array_to_features_sequence(): - arr = pa.array([[0, 1, 2], [3, 4, 5], [6, 7, 8]], pa.list_(pa.int32(), 3)) +@pytest.mark.parametrize( + "arr", + [ + pa.array([[0, 1, 2], [3, None, 5], None, [6, 7, 8], None], pa.list_(pa.int32(), 3)), + ], +) +@pytest.mark.parametrize("slice", [None, slice(1, None), slice(-1), slice(1, 3), slice(2, 3), slice(1, 1)]) +@pytest.mark.parametrize("target_value_feature", [Value("int64")]) +def test_cast_fixed_size_list_array_to_features_sequence(arr, slice, target_value_feature): + arr = arr if slice is None else arr[slice] # Fixed size list - casted_array = cast_array_to_feature(arr, Sequence(Value("int64"), length=3)) - assert casted_array.type == pa.list_(pa.int64(), 3) + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature, 
length=arr.type.list_size)) assert casted_array.to_pylist() == arr.to_pylist() # Variable size list - casted_array = cast_array_to_feature(arr, Sequence(Value("int64"))) - assert casted_array.type == pa.list_(pa.int64()) + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature)) assert casted_array.to_pylist() == arr.to_pylist() + casted_array = cast_array_to_feature(arr, [target_value_feature]) + assert casted_array.type == get_nested_type([target_value_feature]) + assert casted_array.to_pylist() == arr.to_pylist() + + with pytest.raises(TypeError): + cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size + 1)) -def test_cast_sliced_fixed_size_array_to_features(): - arr = pa.array([[0, 1, 2], [3, 4, 5], [6, 7, 8]], pa.list_(pa.int32(), 3)) - # arr.offset not set - casted_array = cast_array_to_feature(arr[:2], Sequence(Value("int64"), length=3)) - assert casted_array.type == pa.list_(pa.int64(), 3) - assert casted_array.to_pylist() == arr[:2].to_pylist() - # arr.offset set - casted_array = cast_array_to_feature(arr[1:], Sequence(Value("int64"), length=3)) - assert casted_array.type == pa.list_(pa.int64(), 3) - assert casted_array.to_pylist() == arr[1:].to_pylist() +@pytest.mark.parametrize( + "arr", + [ + pa.array([[0, 1, 2], [3, None, 5], None, [6, 7, 8], None], pa.list_(pa.int32())), + ], +) +@pytest.mark.parametrize("slice", [None, slice(1, None), slice(-1), slice(1, 3), slice(2, 3), slice(1, 1)]) +@pytest.mark.parametrize("target_value_feature", [Value("int64")]) +def test_cast_list_array_to_features_sequence(arr, slice, target_value_feature): + arr = arr if slice is None else arr[slice] + # Variable size list + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature)) + assert casted_array.to_pylist() == arr.to_pylist() + casted_array = 
cast_array_to_feature(arr, [target_value_feature]) + assert casted_array.type == get_nested_type([target_value_feature]) + assert casted_array.to_pylist() == arr.to_pylist() + # Fixed size list + value_lengths = arr.value_lengths().to_pylist() + value_lengths = [i for i in value_lengths if i is not None] + if len(set(value_lengths)) == 1: + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=value_lengths[0])) + assert casted_array.type == get_nested_type(Sequence(target_value_feature, length=value_lengths[0])) + assert casted_array.to_pylist() == arr.to_pylist() + else: + with pytest.raises(TypeError): + cast_array_to_feature( + arr, Sequence(target_value_feature, length=value_lengths[0] if value_lengths else len(arr)) + ) def test_embed_array_storage(image_file): @@ -1263,17 +1252,3 @@ def test_table_iter(table, batch_size, drop_last_batch): if num_rows > 0: reloaded = pa.concat_tables(subtables) assert table.slice(0, num_rows).to_pydict() == reloaded.to_pydict() - - -@pytest.mark.parametrize( - "pa_type, expected", - [ - (pa.int8(), False), - (pa.struct({"col1": pa.int8(), "col2": pa.int64()}), False), - (pa.struct({"col1": pa.list_(pa.int8()), "col2": Array2DExtensionType((1, 3), "int64")}), True), - (pa.list_(pa.int8()), False), - (pa.list_(Array2DExtensionType((1, 3), "int64"), 4), True), - ], -) -def test_is_extension_type(pa_type, expected): - assert _is_extension_type(pa_type) == expected From 024c02945d111d50db3dcf0c8f76d6304203ac76 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 5 Dec 2023 20:07:13 +0100 Subject: [PATCH 05/21] Remove pdb comment --- src/datasets/table.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index 3911464ec3b..a3d344a89b5 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1992,7 +1992,6 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ # `array.cast` to the fixed type so that the null arrays' 
`.values` are preserved (the `pc.fill_null` kernel fails on extension types) array_values = array.cast(pa.list_(array.type.value_type, feature.length)).values else: - # import pdb; pdb.set_trace() array_values = _list_array_to_fixed_size_list_values(array, feature.length) c_array_values = _c(array_values, feature.feature) return pa.Array.from_buffers( From 09b5e154c0d59139c4245ba5e8f0003a4f179b0c Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 19 Dec 2023 19:28:16 +0100 Subject: [PATCH 06/21] Add warnings and some comments --- src/datasets/table.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index a3d344a89b5..948f11ca079 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1,5 +1,6 @@ import copy import os +import warnings from functools import partial from itertools import groupby from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Tuple, TypeVar, Union @@ -19,18 +20,18 @@ logger = get_logger(__name__) -_IS_FLATTEN_WITH_NULL_ARRAY_SUPPORTED = config.PYARROW_VERSION.major > 12 +_IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED = config.PYARROW_VERSION.major > 12 _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED = config.PYARROW_VERSION.major > 13 def _list_array_flatten(array: Union[pa.ListArray, pa.FixedSizeListArray]) -> pa.Array: - if _IS_FLATTEN_WITH_NULL_ARRAY_SUPPORTED and pc.all(array.is_null()).as_py(): - return array.values[:0] + if _IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED and pc.all(array.is_null()).as_py(): + return array.values[:0] # Preserve the type else: return array.flatten() -def _list_array_to_fixed_size_list_values(array: pa.ListArray, list_size: int) -> pa.Array: +def _list_array_to_fixed_size_list_array_values(array: pa.ListArray, list_size: int) -> pa.Array: array_offsets = np.array(array.offsets) offsets_pair_iter = iter(zip(array_offsets[:-1], array_offsets[1:])) array_values = array.values[:0] @@ -1862,14 
+1863,19 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): return pa.StructArray.from_arrays(arrays, fields=list(pa_type), mask=array.is_null()) elif pa.types.is_list(array.type): if pa.types.is_fixed_size_list(pa_type): - if pc.all(pc.equal(array.value_lengths(), pa_type.list_size)).as_py(): + if pc.all( + pc.equal(array.value_lengths(), pa_type.list_size) + ).as_py(): # Make sure all the sublists have the same length (non-valid members are ignored) if array.null_count > 0: # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) array_values = array.cast(pa.list_(array.type.value_type, pa_type.list_size)).values else: - array_values = _list_array_to_fixed_size_list_values(array, pa_type.list_size) + warnings.warn( + "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make it fast." 
+                    )
+                    array_values = _list_array_to_fixed_size_list_array_values(array, pa_type.list_size)
             return pa.Array.from_buffers(
                 pa_type,
                 len(array),
@@ -1985,14 +1991,19 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_
             return pa.ListArray.from_arrays(array_offsets, c_array_values, mask=array.is_null())
     elif isinstance(feature, Sequence):
         if feature.length > -1:
-            if pc.all(pc.equal(array.value_lengths(), feature.length)).as_py():
+            if pc.all(
+                pc.equal(array.value_lengths(), feature.length)
+            ).as_py():  # Make sure all the sublists have the same length (non-valid members are ignored)
                 if array.null_count > 0:
                     # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316
                     if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED:
                         # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types)
                         array_values = array.cast(pa.list_(array.type.value_type, feature.length)).values
                     else:
-                        array_values = _list_array_to_fixed_size_list_values(array, feature.length)
+                        warnings.warn(
+                            "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make it fast."
+ ) + array_values = _list_array_to_fixed_size_list_array_values(array, feature.length) c_array_values = _c(array_values, feature.feature) return pa.Array.from_buffers( pa.list_(c_array_values.type, feature.length), From 19ee42bc5352257da38763a54b96bda5bc1c3e6c Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 19 Dec 2023 19:28:29 +0100 Subject: [PATCH 07/21] CI fix --- setup.py | 4 ++-- src/datasets/arrow_writer.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 46cf61142c1..0688e2057eb 100644 --- a/setup.py +++ b/setup.py @@ -166,7 +166,7 @@ "pytest-datadir", "pytest-xdist", # optional dependencies - "apache-beam>=2.26.0,<2.44.0;python_version<'3.10'", # doesn't support recent dill versions for recent python versions + "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions "elasticsearch<8.0.0", # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch() "faiss-cpu>=1.6.4", "jax>=0.3.14; sys_platform != 'win32'", @@ -233,7 +233,7 @@ EXTRAS_REQUIRE = { "audio": AUDIO_REQUIRE, "vision": VISION_REQUIRE, - "apache-beam": ["apache-beam>=2.26.0,<2.44.0"], + "apache-beam": ["apache-beam>=2.26.0"], "tensorflow": [ "tensorflow>=2.2.0,!=2.6.0,!=2.6.1; sys_platform != 'darwin' or platform_machine != 'arm64'", "tensorflow-macos; sys_platform == 'darwin' and platform_machine == 'arm64'", diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index ce19c815b72..0f1a786e1af 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -740,8 +740,8 @@ def get_parquet_lengths(sources) -> List[int]: def parquet_to_arrow(source, destination) -> List[int]: """Convert parquet file to arrow file. 
Inputs can be str paths or file-like objects"""
     stream = None if isinstance(destination, str) else destination
-    with ArrowWriter(path=destination, stream=stream) as writer:
-        parquet_file = pa.parquet.ParquetFile(source)
+    parquet_file = pa.parquet.ParquetFile(source)
+    with ArrowWriter(schema=parquet_file.schema_arrow, path=destination, stream=stream) as writer:
         for record_batch in parquet_file.iter_batches():
             pa_table = pa.Table.from_batches([record_batch])
             writer.write_table(pa_table)

From 1505ce6250ce272015bee9967da1223038f5627f Mon Sep 17 00:00:00 2001
From: mariosasko
Date: Tue, 19 Dec 2023 19:30:18 +0100
Subject: [PATCH 08/21] One more comment

---
 src/datasets/arrow_writer.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py
index 0f1a786e1af..587c64076bf 100644
--- a/src/datasets/arrow_writer.py
+++ b/src/datasets/arrow_writer.py
@@ -741,6 +741,7 @@ def parquet_to_arrow(source, destination) -> List[int]:
     """Convert parquet file to arrow file. 
Inputs can be str paths or file-like objects""" stream = None if isinstance(destination, str) else destination parquet_file = pa.parquet.ParquetFile(source) + # Beam can create empty Parquet files, so we need to pass the source Parquet file's schema with ArrowWriter(schema=parquet_file.schema_arrow, path=destination, stream=stream) as writer: for record_batch in parquet_file.iter_batches(): pa_table = pa.Table.from_batches([record_batch]) From da085d8731309a8f44251e71f42d047ce5b76ac8 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 21 Dec 2023 16:51:49 +0100 Subject: [PATCH 09/21] Don't install beam --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0688e2057eb..8707642aafd 100644 --- a/setup.py +++ b/setup.py @@ -166,7 +166,7 @@ "pytest-datadir", "pytest-xdist", # optional dependencies - "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions + # "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions "elasticsearch<8.0.0", # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch() "faiss-cpu>=1.6.4", "jax>=0.3.14; sys_platform != 'win32'", From 2aec0f73f0f7a7b099b90e5a186085eca4b2811b Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 21 Dec 2023 17:20:20 +0100 Subject: [PATCH 10/21] Fix tests --- src/datasets/table.py | 52 +++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index 33dd82eeff8..2ed9896cac3 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -20,30 +20,6 @@ logger = get_logger(__name__) -_IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED = config.PYARROW_VERSION.major > 12 
-_IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED = config.PYARROW_VERSION.major > 13 - - -def _list_array_flatten(array: Union[pa.ListArray, pa.FixedSizeListArray]) -> pa.Array: - if _IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED and pc.all(array.is_null()).as_py(): - return array.values[:0] # Preserve the type - else: - return array.flatten() - - -def _list_array_to_fixed_size_list_array_values(array: pa.ListArray, list_size: int) -> pa.Array: - array_offsets = np.array(array.offsets) - offsets_pair_iter = iter(zip(array_offsets[:-1], array_offsets[1:])) - array_values = array.values[:0] - for is_value_valid in np.array(array.is_valid()): - start, end = next(offsets_pair_iter) - if is_value_valid: - array_values = pa.concat_arrays([array_values, array.values[start:end]]) - else: - array_values = pa.concat_arrays([array_values, pa.nulls(list_size, array_values.type)]) - return array_values - - def inject_arrow_table_documentation(arrow_table_method): def wrapper(fn): fn.__doc__ = arrow_table_method.__doc__ + (fn.__doc__ if fn.__doc__ is not None else "") @@ -1821,6 +1797,30 @@ def wrapper(array, *args, **kwargs): return wrapper +_IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED = config.PYARROW_VERSION.major > 12 +_IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED = config.PYARROW_VERSION.major > 13 + + +def _list_array_flatten(array: Union[pa.ListArray, pa.FixedSizeListArray]) -> pa.Array: + if not _IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED and pc.all(array.is_null()).as_py(): + return array.values[:0] # Preserve the type + else: + return array.flatten() + + +def _list_array_to_fixed_size_list_array_values(array: pa.ListArray, list_size: int) -> pa.Array: + array_offsets = np.array(array.offsets) + offsets_pair_iter = iter(zip(array_offsets[:-1], array_offsets[1:])) + array_values = array.values[:0] + for is_value_valid in np.array(array.is_valid()): + start, end = next(offsets_pair_iter) + if is_value_valid: + array_values = pa.concat_arrays([array_values, 
array.values[start:end]]) + else: + array_values = pa.concat_arrays([array_values, pa.nulls(list_size, array_values.type)]) + return array_values + + @_wrap_for_chunked_arrays def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): """Improved version of `pa.Array.cast` @@ -1873,7 +1873,7 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): array_values = array.cast(pa.list_(array.type.value_type, pa_type.list_size)).values else: warnings.warn( - "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make it fast." + "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make this cast fast." ) array_values = _list_array_to_fixed_size_list_array_values(array, pa_type.list_size) return pa.Array.from_buffers( @@ -2001,7 +2001,7 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ array_values = array.cast(pa.list_(array.type.value_type, feature.length)).values else: warnings.warn( - "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make it fast." + "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make this cast fast." ) array_values = _list_array_to_fixed_size_list_array_values(array, feature.length) c_array_values = _c(array_values, feature.feature) From 12c4c57df577a21e0816bd6229049f141cadcb83 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 21 Dec 2023 20:47:28 +0100 Subject: [PATCH 11/21] Still run beam tests? 
--- additional-tests-requirements.txt | 1 + setup.py | 3 +-- src/datasets/arrow_writer.py | 6 +----- tests/test_beam.py | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/additional-tests-requirements.txt b/additional-tests-requirements.txt index 00b5b8d62a3..f68d7e6da27 100644 --- a/additional-tests-requirements.txt +++ b/additional-tests-requirements.txt @@ -2,3 +2,4 @@ unbabel-comet>=1.0.0 git+https://github.com/google-research/bleurt.git git+https://github.com/ns-moosavi/coval.git git+https://github.com/hendrycks/math.git +apache-beam>=2.52.0; sys_platform != 'win32' and python_version<'3.10' # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions diff --git a/setup.py b/setup.py index 8707642aafd..47e5539b452 100644 --- a/setup.py +++ b/setup.py @@ -166,7 +166,6 @@ "pytest-datadir", "pytest-xdist", # optional dependencies - # "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions "elasticsearch<8.0.0", # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch() "faiss-cpu>=1.6.4", "jax>=0.3.14; sys_platform != 'win32'", @@ -233,7 +232,7 @@ EXTRAS_REQUIRE = { "audio": AUDIO_REQUIRE, "vision": VISION_REQUIRE, - "apache-beam": ["apache-beam>=2.26.0"], + "apache-beam": ["apache-beam>=2.52.0"], "tensorflow": [ "tensorflow>=2.2.0,!=2.6.0,!=2.6.1; sys_platform != 'darwin' or platform_machine != 'arm64'", "tensorflow-macos; sys_platform == 'darwin' and platform_machine == 'arm64'", diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 587c64076bf..96d5fe52b9e 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -683,17 +683,13 @@ def finalize(self, metrics_query_result: dict): parquet_path = str(Path(parquet_path)) if not 
is_remote_filesystem(fs) else fs.unstrip_protocol(parquet_path) shards_metadata = list(beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list) - shards = [metadata.path for metadata in shards_metadata] + shards = sorted([metadata.path for metadata in shards_metadata]) # Our shard logic assumes sorted shards num_bytes = sum([metadata.size_in_bytes for metadata in shards_metadata]) shard_lengths = get_parquet_lengths(shards) # Convert to arrow if self._path.endswith(".arrow"): logger.info(f"Converting parquet files {self._parquet_path} to arrow {self._path}") - shards = [ - metadata.path - for metadata in beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list - ] try: # stream conversion num_bytes = 0 for shard in hf_tqdm(shards, unit="shards"): diff --git a/tests/test_beam.py b/tests/test_beam.py index 3cc91d4e452..aecfed5560b 100644 --- a/tests/test_beam.py +++ b/tests/test_beam.py @@ -105,7 +105,7 @@ def test_download_and_prepare_sharded(self): self.assertTrue( os.path.exists( os.path.join( - tmp_cache_dir, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-00002.arrow" + tmp_cache_dir, builder.name, "default", "0.0.0", f"{builder.name}-train-00001-of-00002.arrow" ) ) ) From 00e78560e16cf202962cd9456def73fb8667bea1 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 21 Dec 2023 20:56:01 +0100 Subject: [PATCH 12/21] Revert "Still run beam tests?" This reverts commit 12c4c57df577a21e0816bd6229049f141cadcb83. 
--- additional-tests-requirements.txt | 1 - setup.py | 3 ++- src/datasets/arrow_writer.py | 6 +++++- tests/test_beam.py | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/additional-tests-requirements.txt b/additional-tests-requirements.txt index f68d7e6da27..00b5b8d62a3 100644 --- a/additional-tests-requirements.txt +++ b/additional-tests-requirements.txt @@ -2,4 +2,3 @@ unbabel-comet>=1.0.0 git+https://github.com/google-research/bleurt.git git+https://github.com/ns-moosavi/coval.git git+https://github.com/hendrycks/math.git -apache-beam>=2.52.0; sys_platform != 'win32' and python_version<'3.10' # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions diff --git a/setup.py b/setup.py index 47e5539b452..8707642aafd 100644 --- a/setup.py +++ b/setup.py @@ -166,6 +166,7 @@ "pytest-datadir", "pytest-xdist", # optional dependencies + # "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions "elasticsearch<8.0.0", # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch() "faiss-cpu>=1.6.4", "jax>=0.3.14; sys_platform != 'win32'", @@ -232,7 +233,7 @@ EXTRAS_REQUIRE = { "audio": AUDIO_REQUIRE, "vision": VISION_REQUIRE, - "apache-beam": ["apache-beam>=2.52.0"], + "apache-beam": ["apache-beam>=2.26.0"], "tensorflow": [ "tensorflow>=2.2.0,!=2.6.0,!=2.6.1; sys_platform != 'darwin' or platform_machine != 'arm64'", "tensorflow-macos; sys_platform == 'darwin' and platform_machine == 'arm64'", diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 96d5fe52b9e..587c64076bf 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -683,13 +683,17 @@ def finalize(self, metrics_query_result: dict): parquet_path = str(Path(parquet_path)) if not 
is_remote_filesystem(fs) else fs.unstrip_protocol(parquet_path) shards_metadata = list(beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list) - shards = sorted([metadata.path for metadata in shards_metadata]) # Our shard logic assumes sorted shards + shards = [metadata.path for metadata in shards_metadata] num_bytes = sum([metadata.size_in_bytes for metadata in shards_metadata]) shard_lengths = get_parquet_lengths(shards) # Convert to arrow if self._path.endswith(".arrow"): logger.info(f"Converting parquet files {self._parquet_path} to arrow {self._path}") + shards = [ + metadata.path + for metadata in beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list + ] try: # stream conversion num_bytes = 0 for shard in hf_tqdm(shards, unit="shards"): diff --git a/tests/test_beam.py b/tests/test_beam.py index aecfed5560b..3cc91d4e452 100644 --- a/tests/test_beam.py +++ b/tests/test_beam.py @@ -105,7 +105,7 @@ def test_download_and_prepare_sharded(self): self.assertTrue( os.path.exists( os.path.join( - tmp_cache_dir, builder.name, "default", "0.0.0", f"{builder.name}-train-00001-of-00002.arrow" + tmp_cache_dir, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-00002.arrow" ) ) ) From 9a694d81ff8b47e1784051a4c45ed3a44c0b80f7 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 23 Jan 2024 19:21:45 +0100 Subject: [PATCH 13/21] Nit --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8707642aafd..1469c57bfa0 100644 --- a/setup.py +++ b/setup.py @@ -166,7 +166,7 @@ "pytest-datadir", "pytest-xdist", # optional dependencies - # "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python versions and windows releases require older PyArrow versions + "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'", # doesn't support recent dill versions for recent python 
versions and on windows requires pyarrow<12.0.0 "elasticsearch<8.0.0", # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch() "faiss-cpu>=1.6.4", "jax>=0.3.14; sys_platform != 'win32'", From 4828edfe57ce566231bffbd5ded3054d6835e8e2 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Fri, 26 Jan 2024 16:19:14 +0100 Subject: [PATCH 14/21] Cleaner implementation --- src/datasets/table.py | 248 +++++++++++++++++++++++------------------- 1 file changed, 137 insertions(+), 111 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index 2ed9896cac3..a1a04294339 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1797,17 +1797,9 @@ def wrapper(array, *args, **kwargs): return wrapper -_IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED = config.PYARROW_VERSION.major > 12 _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED = config.PYARROW_VERSION.major > 13 -def _list_array_flatten(array: Union[pa.ListArray, pa.FixedSizeListArray]) -> pa.Array: - if not _IS_FLATTEN_ON_NULL_LIST_ARRAY_SUPPORTED and pc.all(array.is_null()).as_py(): - return array.values[:0] # Preserve the type - else: - return array.flatten() - - def _list_array_to_fixed_size_list_array_values(array: pa.ListArray, list_size: int) -> pa.Array: array_offsets = np.array(array.offsets) offsets_pair_iter = iter(zip(array_offsets[:-1], array_offsets[1:])) @@ -1821,6 +1813,21 @@ def _list_array_to_fixed_size_list_array_values(array: pa.ListArray, list_size: return array_values +def _are_list_values_of_length(array: pa.ListArray, length: int) -> bool: + """Check if all the sub-lists of a `pa.ListArray` have the specified length.""" + return pc.all(pc.equal(array.value_lengths(), length)).as_py() + + +# def _combine_list_array_offsets_with_mask(array: pa.ListArray) -> np.ndarray: +# """Combine the offsets of a `pa.ListArray` with its mask to get the offsets of the non-null values.""" +# if array.null_count == 0: +# return np.array(array.offsets) +# else: +# offsets = 
np.array(array.offsets) +# offsets[1:] = np.cumsum(np.concatenate([[0], array.is_valid()[:-1]])) +# return offsets + + @_wrap_for_chunked_arrays def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): """Improved version of `pa.Array.cast` @@ -1863,11 +1870,8 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): return pa.StructArray.from_arrays(arrays, fields=list(pa_type), mask=array.is_null()) elif pa.types.is_list(array.type): if pa.types.is_fixed_size_list(pa_type): - if pc.all( - pc.equal(array.value_lengths(), pa_type.list_size) - ).as_py(): # Make sure all the sublists have the same length (non-valid members are ignored) + if _are_list_values_of_length(array, pa_type.list_size): if array.null_count > 0: - # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) array_values = array.cast(pa.list_(array.type.value_type, pa_type.list_size)).values @@ -1876,43 +1880,48 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make this cast fast." 
) array_values = _list_array_to_fixed_size_list_array_values(array, pa_type.list_size) - return pa.Array.from_buffers( - pa_type, - len(array), - [array.is_valid().buffers()[1]], - children=[_c(array_values, pa_type.value_type)], - ) + if config.PYARROW_VERSION.major < 15: + return pa.Array.from_buffers( + pa_type, + len(array), + [array.is_valid().buffers()[1]], + children=[_c(array_values, pa_type.value_type)], + ) + else: + return pa.FixedSizeListArray.from_arrays(array_values, pa_type.list_size, mask=array.is_null()) else: - array_values = _list_array_flatten(array) + array_values = array.values[ + array.offset * pa_type.length : (array.offset + len(array)) * pa_type.length + ] return pa.FixedSizeListArray.from_arrays(_c(array_values, pa_type.value_type), pa_type.list_size) elif pa.types.is_list(pa_type): - # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.offset: - array_offsets = np.array(array.offsets) - array_values = _list_array_flatten(array) - array_offsets -= array_offsets[0] - else: - array_offsets = array.offsets - array_values = array.values - return pa.ListArray.from_arrays(array_offsets, _c(array_values, pa_type.value_type), mask=array.is_null()) + # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.null_count > 0: + array_offsets = pa.concat_arrays( + [ + pc.replace_with_mask(array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), + array_offsets[-1:], + ] + ) + return pa.ListArray.from_arrays(array_offsets, _c(array.values, pa_type.value_type)) elif pa.types.is_fixed_size_list(array.type): if pa.types.is_fixed_size_list(pa_type): if pa_type.list_size == array.type.list_size: array_values = array.values[ array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] # `array.values` preserves the null arrays unlike `array.flatten()` (we cannot use offsets here to define valid 
members) - # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 - return pa.Array.from_buffers( - pa_type, - len(array), - [array.is_valid().buffers()[1]], - children=[_c(array_values, pa_type.value_type)], - ) + ] # Don't use `array.flatten()` to preserve null lists + if config.PYARROW_VERSION.major < 15: + return pa.Array.from_buffers( + pa_type, + len(array), + [array.is_valid().buffers()[1]], + children=[_c(array_values, pa_type.value_type)], + ) + else: + return pa.FixedSizeListArray.from_arrays(array_values, pa_type.list_size, mask=array.is_null()) elif pa.types.is_list(pa_type): - array_values = _list_array_flatten(array) - array_offsets = np.concatenate([[0], array.is_valid()]) * array.type.list_size - array_offsets = np.cumsum(array_offsets, dtype=np.int32) - return pa.ListArray.from_arrays(array_offsets, _c(array_values, pa_type.value_type), mask=array.is_null()) + array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size + return pa.ListArray.from_arrays(array_offsets, _c(array.values, pa_type.value_type), mask=array.is_null()) else: if ( not allow_number_to_str @@ -1955,6 +1964,9 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ """ from .features.features import Sequence, get_nested_type + # if isinstance(feature, Sequence): + # print("array off", array.offset) + _c = partial(cast_array_to_feature, allow_number_to_str=allow_number_to_str) if isinstance(array, pa.ExtensionArray): @@ -1976,26 +1988,25 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ elif pa.types.is_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) if isinstance(feature, list): - # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.offset: - array_offsets = np.array(array.offsets) - array_values = _list_array_flatten(array) - array_offsets -= array_offsets[0] - else: - 
array_offsets = array.offsets - array_values = array.values - c_array_values = _c(array_values, feature[0]) - if c_array_values.type == array_values.type: + array_offsets = array.offsets + # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.null_count > 0: + array_offsets = pa.concat_arrays( + [ + pc.replace_with_mask(array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), + array_offsets[-1:], + ] + ) + + casted_array_values = _c(array.values, feature[0]) + if casted_array_values.type == array.values.type: return array else: - return pa.ListArray.from_arrays(array_offsets, c_array_values, mask=array.is_null()) + return pa.ListArray.from_arrays(array_offsets, casted_array_values) elif isinstance(feature, Sequence): if feature.length > -1: - if pc.all( - pc.equal(array.value_lengths(), feature.length) - ).as_py(): # # Make sure all the sublists have the same length (non-valid members are ignored) + if _are_list_values_of_length(array, feature.length): if array.null_count > 0: - # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) array_values = array.cast(pa.list_(array.type.value_type, feature.length)).values @@ -2004,56 +2015,67 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make this cast fast." 
) array_values = _list_array_to_fixed_size_list_array_values(array, feature.length) - c_array_values = _c(array_values, feature.feature) - return pa.Array.from_buffers( - pa.list_(c_array_values.type, feature.length), - len(array), - [array.is_valid().buffers()[1]], - children=[c_array_values], - ) + casted_array_values = _c(array_values, feature.feature) + if config.PYARROW_VERSION.major < 15: + return pa.Array.from_buffers( + pa.list_(casted_array_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[casted_array_values], + ) + else: + return pa.FixedSizeListArray.from_arrays( + casted_array_values, feature.length, mask=array.is_null() + ) else: - array_values = _list_array_flatten(array) + array_values = array.values[ + array.offset * feature.length : (array.offset + len(array)) * feature.length + ] return pa.FixedSizeListArray.from_arrays(_c(array_values, feature.feature), feature.length) else: - # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.offset: - array_offsets = np.array(array.offsets) - array_values = _list_array_flatten(array) - array_offsets -= array_offsets[0] - else: - array_offsets = array.offsets - array_values = array.values - c_array_values = _c(array_values, feature.feature) - if c_array_values.type == array_values.type: + array_offsets = array.offsets + # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.null_count > 0: + array_offsets = pa.concat_arrays( + [ + pc.replace_with_mask( + array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32()) + ), + array_offsets[-1:], + ] + ) + + casted_array_values = _c(array.values, feature.feature) + if casted_array_values.type == array.values.type: return array else: - return pa.ListArray.from_arrays(array_offsets, c_array_values, mask=array.is_null()) + return pa.ListArray.from_arrays(array_offsets, casted_array_values) elif 
pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) if isinstance(feature, list): - array_values = _list_array_flatten(array) - array_offsets = np.concatenate([[0], array.is_valid()]) * array.type.list_size - array_offsets = np.cumsum(array_offsets, dtype=np.int32) - return pa.ListArray.from_arrays(array_offsets, _c(array_values, feature[0]), mask=array.is_null()) + array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size + return pa.ListArray.from_arrays(array_offsets, _c(array.values, feature[0]), mask=array.is_null()) elif isinstance(feature, Sequence): if feature.length > -1: if feature.length == array.type.list_size: array_values = array.values[ array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] # `array.values` preserves the null arrays unlike `array.flatten()` (we cannot use offsets here to define valid members) - # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 - c_array_values = _c(array_values, feature.feature) - return pa.Array.from_buffers( - pa.list_(c_array_values.type, feature.length), - len(array), - [array.is_valid().buffers()[1]], - children=[c_array_values], - ) + ] # Don't use `array.flatten()` to preserve null lists + casted_array_values = _c(array_values, feature.feature) + if config.PYARROW_VERSION.major < 15: + return pa.Array.from_buffers( + pa.list_(casted_array_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[casted_array_values], + ) + else: + return pa.FixedSizeListArray.from_arrays( + casted_array_values, feature.length, mask=array.is_null() + ) else: - array_values = _list_array_flatten(array) - array_offsets = np.concatenate([[0], array.is_valid()]) * array.type.list_size - array_offsets = np.cumsum(array_offsets, dtype=np.int32) - return pa.ListArray.from_arrays(array_offsets, _c(array_values, feature.feature), 
mask=array.is_null()) + array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size + return pa.ListArray.from_arrays(array_offsets, _c(array.values, feature.feature), mask=array.is_null()) if pa.types.is_null(array.type): return array_cast(array, get_nested_type(feature), allow_number_to_str=allow_number_to_str) elif not isinstance(feature, (Sequence, dict, list, tuple)): @@ -2065,7 +2087,7 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ def embed_array_storage(array: pa.Array, feature: "FeatureType"): """Embed data into an arrays's storage. For custom features like Audio or Image, it takes into account the "embed_storage" methods - they defined to enable embedding external data (e.g. an image file) into an other arrow types. + they define to embed external data (e.g. an image file) into an array. @@ -2102,32 +2124,36 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): return pa.StructArray.from_arrays(arrays, names=list(feature), mask=array.is_null()) elif pa.types.is_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - # Avoids the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.offset: - array_offsets = np.array(array.offsets) - array_values = _list_array_flatten(array) - array_offsets -= array_offsets[0] - else: - array_offsets = array.offsets - array_values = array.values + array_offsets = array.offsets + # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + if array.null_count > 0: + array_offsets = pa.concat_arrays( + [ + pc.replace_with_mask(array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), + array_offsets[-1:], + ] + ) + if isinstance(feature, list): - return pa.ListArray.from_arrays(array_offsets, _e(array_values, feature[0]), mask=array.is_null()) + return pa.ListArray.from_arrays(array_offsets, _e(array.values, feature[0])) 
if isinstance(feature, Sequence) and feature.length == -1: - return pa.ListArray.from_arrays(array_offsets, _e(array_values, feature.feature), mask=array.is_null()) + return pa.ListArray.from_arrays(array_offsets, _e(array.values, feature.feature)) elif pa.types.is_fixed_size_list(array.type): # feature must be Sequence(subfeature) if isinstance(feature, Sequence) and feature.length > -1: array_values = array.values[ array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] # `array.values` preserves the null arrays unlike `array.flatten()` (we cannot use offsets here to define valid members) - # pa.FixedSizeListArray does not support mask: https://github.com/apache/arrow/issues/34316 - e_array_values = _e(array_values, feature.feature) - return pa.Array.from_buffers( - pa.list_(array_values.type, feature.length), - len(array), - [array.is_valid().buffers()[1]], - children=[e_array_values], - ) + ] # Don't use `array.flatten()` to preserve null lists + embedded_array_values = _e(array_values, feature.feature) + if config.PYARROW_VERSION.major < 15: + return pa.Array.from_buffers( + pa.list_(array_values.type, feature.length), + len(array), + [array.is_valid().buffers()[1]], + children=[embedded_array_values], + ) + else: + return pa.FixedSizeListArray.from_arrays(embedded_array_values, feature.length, mask=array.is_null()) if not isinstance(feature, (Sequence, dict, list, tuple)): return array raise TypeError(f"Couldn't embed array of type\n{array.type}\nwith\n{feature}") From 087140eb1ab8c8047354b8e47fc39454eaeeec5a Mon Sep 17 00:00:00 2001 From: mariosasko Date: Wed, 31 Jan 2024 03:36:54 +0100 Subject: [PATCH 15/21] Cleaner impl part 2 --- src/datasets/table.py | 140 +++++++++++++++++------------------------- tests/test_table.py | 34 +++++----- 2 files changed, 75 insertions(+), 99 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index a1a04294339..a5ad2e0c16a 100644 --- 
a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1,6 +1,5 @@ import copy import os -import warnings from functools import partial from itertools import groupby from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Tuple, TypeVar, Union @@ -8,6 +7,7 @@ import numpy as np import pyarrow as pa import pyarrow.compute as pc +import pyarrow.types from . import config from .utils.logging import get_logger @@ -1797,35 +1797,35 @@ def wrapper(array, *args, **kwargs): return wrapper -_IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED = config.PYARROW_VERSION.major > 13 - - -def _list_array_to_fixed_size_list_array_values(array: pa.ListArray, list_size: int) -> pa.Array: - array_offsets = np.array(array.offsets) - offsets_pair_iter = iter(zip(array_offsets[:-1], array_offsets[1:])) - array_values = array.values[:0] - for is_value_valid in np.array(array.is_valid()): - start, end = next(offsets_pair_iter) - if is_value_valid: - array_values = pa.concat_arrays([array_values, array.values[start:end]]) - else: - array_values = pa.concat_arrays([array_values, pa.nulls(list_size, array_values.type)]) - return array_values - - def _are_list_values_of_length(array: pa.ListArray, length: int) -> bool: """Check if all the sub-lists of a `pa.ListArray` have the specified length.""" - return pc.all(pc.equal(array.value_lengths(), length)).as_py() + return pc.all(pc.equal(array.value_lengths(), length)).as_py() or array.null_count == len(array) + + +def _combine_list_array_offsets_with_mask(array: pa.ListArray) -> pa.Array: + """Add the null bitmap to the offsets of a `pa.ListArray`.""" + offsets = array.offsets + if array.null_count > 0: + offsets = pa.concat_arrays( + [ + pc.replace_with_mask(offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), + offsets[-1:], + ] + ) + return offsets -# def _combine_list_array_offsets_with_mask(array: pa.ListArray) -> np.ndarray: -# """Combine the offsets of a `pa.ListArray` with its mask to get the offsets of 
the non-null values.""" -# if array.null_count == 0: -# return np.array(array.offsets) -# else: -# offsets = np.array(array.offsets) -# offsets[1:] = np.cumsum(np.concatenate([[0], array.is_valid()[:-1]])) -# return offsets +def _storage_type(type: pa.DataType) -> pa.DataType: + """Convert a (possibly nested) `pa.ExtensionType` to its storage type.""" + if isinstance(type, pa.ExtensionType): + return _storage_type(type.storage_type) + elif isinstance(type, pa.StructType): + return pa.struct([pa.field(field.name, _storage_type(field.type)) for field in type]) + elif isinstance(type, pa.ListType): + return pa.list_(_storage_type(type.value_type)) + elif isinstance(type, pa.FixedSizeListType): + return pa.list_(_storage_type(type.value_type), type.list_size) + return type @_wrap_for_chunked_arrays @@ -1872,14 +1872,13 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): if pa.types.is_fixed_size_list(pa_type): if _are_list_values_of_length(array, pa_type.list_size): if array.null_count > 0: - if _IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: - # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) - array_values = array.cast(pa.list_(array.type.value_type, pa_type.list_size)).values - else: - warnings.warn( - "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make this cast fast." 
- ) - array_values = _list_array_to_fixed_size_list_array_values(array, pa_type.list_size) + # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array + array_type = array.type + # `pc.list_slice` does not support extension types, so we cast to the storage type, and re-cast to the original type after the slice operation + array = array_cast(array, _storage_type(array_type)) + array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True) + array = array_cast(array, array_type) + array_values = array.values if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( pa_type, @@ -1888,7 +1887,9 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): children=[_c(array_values, pa_type.value_type)], ) else: - return pa.FixedSizeListArray.from_arrays(array_values, pa_type.list_size, mask=array.is_null()) + return pa.FixedSizeListArray.from_arrays( + _c(array_values, pa_type.value_type), pa_type.list_size, mask=array.is_null() + ) else: array_values = array.values[ array.offset * pa_type.length : (array.offset + len(array)) * pa_type.length @@ -1896,20 +1897,14 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): return pa.FixedSizeListArray.from_arrays(_c(array_values, pa_type.value_type), pa_type.list_size) elif pa.types.is_list(pa_type): # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.null_count > 0: - array_offsets = pa.concat_arrays( - [ - pc.replace_with_mask(array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), - array_offsets[-1:], - ] - ) + array_offsets = _combine_list_array_offsets_with_mask(array) return pa.ListArray.from_arrays(array_offsets, _c(array.values, pa_type.value_type)) elif pa.types.is_fixed_size_list(array.type): if pa.types.is_fixed_size_list(pa_type): if pa_type.list_size == array.type.list_size: array_values = 
array.values[ array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] # Don't use `array.flatten()` to preserve null lists + ] if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( pa_type, @@ -1918,7 +1913,9 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): children=[_c(array_values, pa_type.value_type)], ) else: - return pa.FixedSizeListArray.from_arrays(array_values, pa_type.list_size, mask=array.is_null()) + return pa.FixedSizeListArray.from_arrays( + _c(array_values, pa_type.value_type), pa_type.list_size, mask=array.is_null() + ) elif pa.types.is_list(pa_type): array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size return pa.ListArray.from_arrays(array_offsets, _c(array.values, pa_type.value_type), mask=array.is_null()) @@ -1988,33 +1985,24 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ elif pa.types.is_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) if isinstance(feature, list): - array_offsets = array.offsets - # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.null_count > 0: - array_offsets = pa.concat_arrays( - [ - pc.replace_with_mask(array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), - array_offsets[-1:], - ] - ) - casted_array_values = _c(array.values, feature[0]) if casted_array_values.type == array.values.type: return array else: + # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + array_offsets = _combine_list_array_offsets_with_mask(array) return pa.ListArray.from_arrays(array_offsets, casted_array_values) elif isinstance(feature, Sequence): if feature.length > -1: if _are_list_values_of_length(array, feature.length): if array.null_count > 0: - if 
_IS_LIST_ARRAY_TO_FIXED_SIZE_LIST_ARRAY_CAST_SUPPORTED: - # `array.cast` to the fixed type so that the null arrays' `.values` are preserved (the `pc.fill_null` kernel fails on extension types) - array_values = array.cast(pa.list_(array.type.value_type, feature.length)).values - else: - warnings.warn( - "Casting from ListArray to FixedSizeListArray is not natively supported in `pyarrow<14`. Consider upgrading to `pyarrow>=14` to make this cast fast." - ) - array_values = _list_array_to_fixed_size_list_array_values(array, feature.length) + # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array + array_type = array.type + # `pc.list_slice` does not support extension types, so we cast to the storage type, and re-cast to the original type after the slice operation + array = array_cast(array, _storage_type(array_type)) + array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True) + array = array_cast(array, array_type) + array_values = array.values casted_array_values = _c(array_values, feature.feature) if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( @@ -2033,22 +2021,12 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ ] return pa.FixedSizeListArray.from_arrays(_c(array_values, feature.feature), feature.length) else: - array_offsets = array.offsets - # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.null_count > 0: - array_offsets = pa.concat_arrays( - [ - pc.replace_with_mask( - array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32()) - ), - array_offsets[-1:], - ] - ) - casted_array_values = _c(array.values, feature.feature) if casted_array_values.type == array.values.type: return array else: + # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError + array_offsets = 
_combine_list_array_offsets_with_mask(array) return pa.ListArray.from_arrays(array_offsets, casted_array_values) elif pa.types.is_fixed_size_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) @@ -2060,7 +2038,7 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ if feature.length == array.type.list_size: array_values = array.values[ array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] # Don't use `array.flatten()` to preserve null lists + ] casted_array_values = _c(array_values, feature.feature) if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( @@ -2124,16 +2102,8 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): return pa.StructArray.from_arrays(arrays, names=list(feature), mask=array.is_null()) elif pa.types.is_list(array.type): # feature must be either [subfeature] or Sequence(subfeature) - array_offsets = array.offsets # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError - if array.null_count > 0: - array_offsets = pa.concat_arrays( - [ - pc.replace_with_mask(array_offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())), - array_offsets[-1:], - ] - ) - + array_offsets = _combine_list_array_offsets_with_mask(array) if isinstance(feature, list): return pa.ListArray.from_arrays(array_offsets, _e(array.values, feature[0])) if isinstance(feature, Sequence) and feature.length == -1: @@ -2143,7 +2113,7 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"): if isinstance(feature, Sequence) and feature.length > -1: array_values = array.values[ array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size - ] # Don't use `array.flatten()` to preserve null lists + ] embedded_array_values = _e(array_values, feature.feature) if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( diff --git 
a/tests/test_table.py b/tests/test_table.py index 30964a2c2a9..67c36f8727b 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -1169,6 +1169,8 @@ def test_cast_fixed_size_list_array_to_features_sequence(arr, slice, target_valu casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size)) assert casted_array.type == get_nested_type(Sequence(target_value_feature, length=arr.type.list_size)) assert casted_array.to_pylist() == arr.to_pylist() + with pytest.raises(TypeError): + cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size + 1)) # Variable size list casted_array = cast_array_to_feature(arr, Sequence(target_value_feature)) assert casted_array.type == get_nested_type(Sequence(target_value_feature)) @@ -1177,9 +1179,6 @@ def test_cast_fixed_size_list_array_to_features_sequence(arr, slice, target_valu assert casted_array.type == get_nested_type([target_value_feature]) assert casted_array.to_pylist() == arr.to_pylist() - with pytest.raises(TypeError): - cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size + 1)) - @pytest.mark.parametrize( "arr", @@ -1199,17 +1198,24 @@ def test_cast_list_array_to_features_sequence(arr, slice, target_value_feature): assert casted_array.type == get_nested_type([target_value_feature]) assert casted_array.to_pylist() == arr.to_pylist() # Fixed size list - value_lengths = arr.value_lengths().to_pylist() - value_lengths = [i for i in value_lengths if i is not None] - if len(set(value_lengths)) == 1: - casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=value_lengths[0])) - assert casted_array.type == get_nested_type(Sequence(target_value_feature, length=value_lengths[0])) - assert casted_array.to_pylist() == arr.to_pylist() - else: - with pytest.raises(TypeError): - cast_array_to_feature( - arr, Sequence(target_value_feature, length=value_lengths[0] if value_lengths else len(arr)) - ) + list_size = 
arr.value_lengths().drop_null()[0].as_py() if arr.value_lengths().drop_null() else 2 + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=list_size)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature, length=list_size)) + assert casted_array.to_pylist() == arr.to_pylist() + + +def test_cast_list_extension_array_to_features_sequence(): + arr = np.random.randint(0, 10, size=(8, 2, 3)).tolist() + arr = Array2DExtensionType(shape=(2, 3), dtype="int64").wrap_array(pa.array(arr, pa.list_(pa.list_(pa.int64())))) + arr = pa.ListArray.from_arrays([0, None, 4, 8], arr) + # Variable size list + casted_array = cast_array_to_feature(arr, Sequence(Array2D(shape=(2, 3), dtype="int32"))) + assert casted_array.type == get_nested_type(Sequence(Array2D(shape=(2, 3), dtype="int32"))) + assert casted_array.to_pylist() == arr.to_pylist() + # Fixed size list + casted_array = cast_array_to_feature(arr, Sequence(Array2D(shape=(2, 3), dtype="int32"), length=4)) + assert casted_array.type == get_nested_type(Sequence(Array2D(shape=(2, 3), dtype="int32"), length=4)) + assert casted_array.to_pylist() == arr.to_pylist() def test_embed_array_storage(image_file): From 2881a1a852a5119780f9b1a5f855b94debe4be8b Mon Sep 17 00:00:00 2001 From: mariosasko Date: Wed, 31 Jan 2024 15:46:02 +0100 Subject: [PATCH 16/21] Nit --- src/datasets/table.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index a5ad2e0c16a..027bf732ebb 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1874,10 +1874,10 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): if array.null_count > 0: # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array array_type = array.type - # `pc.list_slice` does not support extension types, so we cast to the storage type, and re-cast to the original type after the slice 
operation - array = array_cast(array, _storage_type(array_type)) + # Temporarily convert to the storage type to support extension types in the slice operation + array = _c(array, _storage_type(array_type)) array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True) - array = array_cast(array, array_type) + array = _c(array, array_type) array_values = array.values if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( @@ -1998,10 +1998,10 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ if array.null_count > 0: # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array array_type = array.type - # `pc.list_slice` does not support extension types, so we cast to the storage type, and re-cast to the original type after the slice operation - array = array_cast(array, _storage_type(array_type)) + # Temporarily convert to the storage type to support extension types in the slice operation + array = array_cast(array, _storage_type(array_type), allow_number_to_str=allow_number_to_str) array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True) - array = array_cast(array, array_type) + array = array_cast(array, array_type, allow_number_to_str=allow_number_to_str) array_values = array.values casted_array_values = _c(array_values, feature.feature) if config.PYARROW_VERSION.major < 15: From 86c8ac289875e34d4e7bb341ea5939b6079d6b8d Mon Sep 17 00:00:00 2001 From: mariosasko Date: Wed, 31 Jan 2024 19:27:25 +0100 Subject: [PATCH 17/21] Fix CI --- src/datasets/arrow_writer.py | 22 ++++++---------------- tests/test_beam.py | 2 +- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 587c64076bf..b550d45d55f 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -674,33 +674,23 @@ def finalize(self, metrics_query_result: dict): metrics_query_result: `dict` 
obtained from pipeline_results.metrics().query(m_filter). Make sure that the filter keeps only the metrics for the considered split, under the namespace `split_name`. """ - import apache_beam as beam - - from .utils import beam_utils # Beam FileSystems require the system's path separator in the older versions fs, _, [parquet_path] = fsspec.get_fs_token_paths(self._parquet_path) parquet_path = str(Path(parquet_path)) if not is_remote_filesystem(fs) else fs.unstrip_protocol(parquet_path) - shards_metadata = list(beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list) - shards = [metadata.path for metadata in shards_metadata] - num_bytes = sum([metadata.size_in_bytes for metadata in shards_metadata]) + shards = fs.glob(parquet_path + "*.parquet") + num_bytes = sum(fs.sizes(shards)) shard_lengths = get_parquet_lengths(shards) # Convert to arrow if self._path.endswith(".arrow"): logger.info(f"Converting parquet files {self._parquet_path} to arrow {self._path}") - shards = [ - metadata.path - for metadata in beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list - ] try: # stream conversion num_bytes = 0 for shard in hf_tqdm(shards, unit="shards"): - with beam.io.filesystems.FileSystems.open(shard) as source: - with beam.io.filesystems.FileSystems.create( - shard.replace(".parquet", ".arrow") - ) as destination: + with fs.open(shard, "rb") as source: + with fs.open(shard.replace(".parquet", ".arrow"), "wb") as destination: shard_num_bytes, _ = parquet_to_arrow(source, destination) num_bytes += shard_num_bytes except OSError as e: # broken pipe can happen if the connection is unstable, do local conversion instead @@ -714,12 +704,12 @@ def finalize(self, metrics_query_result: dict): num_bytes = 0 for shard in hf_tqdm(shards, unit="shards"): local_parquet_path = os.path.join(local_convert_dir, hash_url_to_filename(shard) + ".parquet") - beam_utils.download_remote_to_local(shard, local_parquet_path) + 
fs.download(shard, local_parquet_path) local_arrow_path = local_parquet_path.replace(".parquet", ".arrow") shard_num_bytes, _ = parquet_to_arrow(local_parquet_path, local_arrow_path) num_bytes += shard_num_bytes remote_arrow_path = shard.replace(".parquet", ".arrow") - beam_utils.upload_local_to_remote(local_arrow_path, remote_arrow_path) + fs.upload(local_arrow_path, remote_arrow_path) # Save metrics counters_dict = {metric.key.metric.name: metric.result for metric in metrics_query_result["counters"]} diff --git a/tests/test_beam.py b/tests/test_beam.py index 3cc91d4e452..aecfed5560b 100644 --- a/tests/test_beam.py +++ b/tests/test_beam.py @@ -105,7 +105,7 @@ def test_download_and_prepare_sharded(self): self.assertTrue( os.path.exists( os.path.join( - tmp_cache_dir, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-00002.arrow" + tmp_cache_dir, builder.name, "default", "0.0.0", f"{builder.name}-train-00001-of-00002.arrow" ) ) ) From 79ee0dffb5a79c3dc8a330896c671cd9f304f83c Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 1 Feb 2024 19:30:18 +0100 Subject: [PATCH 18/21] Optimization --- src/datasets/table.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index 027bf732ebb..174b5789993 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -1874,10 +1874,14 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True): if array.null_count > 0: # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array array_type = array.type - # Temporarily convert to the storage type to support extension types in the slice operation - array = _c(array, _storage_type(array_type)) - array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True) - array = _c(array, array_type) + storage_type = _storage_type(array_type) + if array_type != storage_type: + # Temporarily convert to the 
storage type to support extension types in the slice operation + array = _c(array, storage_type) + array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True) + array = _c(array, array_type) + else: + array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True) array_values = array.values if config.PYARROW_VERSION.major < 15: return pa.Array.from_buffers( @@ -1998,10 +2002,14 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ if array.null_count > 0: # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array array_type = array.type + storage_type = _storage_type(array_type) # Temporarily convert to the storage type to support extension types in the slice operation - array = array_cast(array, _storage_type(array_type), allow_number_to_str=allow_number_to_str) - array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True) - array = array_cast(array, array_type, allow_number_to_str=allow_number_to_str) + if array_type != storage_type: + array = array_cast(array, storage_type, allow_number_to_str=allow_number_to_str) + array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True) + array = array_cast(array, array_type, allow_number_to_str=allow_number_to_str) + else: + array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True) array_values = array.values casted_array_values = _c(array_values, feature.feature) if config.PYARROW_VERSION.major < 15: From d088db4dca2664200ee793c5ab7dd85e306051f9 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Thu, 1 Feb 2024 19:30:53 +0100 Subject: [PATCH 19/21] Nit --- src/datasets/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index 174b5789993..ddb5abb9f4c 100644 --- a/src/datasets/table.py +++ b/src/datasets/table.py @@ -2003,8 +2003,8 @@ def cast_array_to_feature(array: pa.Array, feature: 
"FeatureType", allow_number_ # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array array_type = array.type storage_type = _storage_type(array_type) - # Temporarily convert to the storage type to support extension types in the slice operation if array_type != storage_type: + # Temporarily convert to the storage type to support extension types in the slice operation array = array_cast(array, storage_type, allow_number_to_str=allow_number_to_str) array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True) array = array_cast(array, array_type, allow_number_to_str=allow_number_to_str) From c9343c06c96380ebae9183b990df7cd50a3ca454 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Sun, 4 Feb 2024 19:40:15 +0100 Subject: [PATCH 20/21] Nit --- tests/test_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_table.py b/tests/test_table.py index 67c36f8727b..7db58037245 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -1204,7 +1204,7 @@ def test_cast_list_array_to_features_sequence(arr, slice, target_value_feature): assert casted_array.to_pylist() == arr.to_pylist() -def test_cast_list_extension_array_to_features_sequence(): +def test_cast_array_xd_to_features_sequence(): arr = np.random.randint(0, 10, size=(8, 2, 3)).tolist() arr = Array2DExtensionType(shape=(2, 3), dtype="int64").wrap_array(pa.array(arr, pa.list_(pa.list_(pa.int64())))) arr = pa.ListArray.from_arrays([0, None, 4, 8], arr) From 3a68113f4674a4e2c3a779244b6bb02d58e2a174 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20=C5=A0a=C5=A1ko?= Date: Tue, 6 Feb 2024 17:31:42 +0100 Subject: [PATCH 21/21] Update src/datasets/table.py Co-authored-by: Quentin Lhoest <42851186+lhoestq@users.noreply.github.com> --- src/datasets/table.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/datasets/table.py b/src/datasets/table.py index ddb5abb9f4c..43aa228278f 100644 --- a/src/datasets/table.py +++ 
b/src/datasets/table.py @@ -1965,9 +1965,6 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_ """ from .features.features import Sequence, get_nested_type - # if isinstance(feature, Sequence): - # print("array off", array.offset) - _c = partial(cast_array_to_feature, allow_number_to_str=allow_number_to_str) if isinstance(array, pa.ExtensionArray):