diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d76ae7209ce..4a65aefc759 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -64,7 +64,7 @@ jobs:
         run: pip install --upgrade pyarrow huggingface-hub dill
       - name: Install dependencies (minimum versions)
         if: ${{ matrix.deps_versions != 'deps-latest' }}
-        run: pip install pyarrow==8.0.0 huggingface-hub==0.19.4 transformers dill==0.3.1.1
+        run: pip install pyarrow==12.0.0 huggingface-hub==0.19.4 transformers dill==0.3.1.1
       - name: Test with pytest
         run: |
           python -m pytest -rfExX -m ${{ matrix.test }} -n 2 --dist loadfile -sv ./tests/
diff --git a/setup.py b/setup.py
index c6448b8b5f3..dafb0bd719c 100644
--- a/setup.py
+++ b/setup.py
@@ -113,8 +113,8 @@
     # We use numpy>=1.17 to have np.random.Generator (Dataset shuffling)
     "numpy>=1.17",
     # Backend and serialization.
-    # Minimum 8.0.0 to be able to use .to_reader()
-    "pyarrow>=8.0.0",
+    # Minimum 12.0.0 to be able to concatenate extension arrays
+    "pyarrow>=12.0.0",
     # As long as we allow pyarrow < 14.0.1, to fix vulnerability CVE-2023-47248
     "pyarrow-hotfix",
     # For smart caching dataset processing
@@ -166,7 +166,7 @@
     "pytest-datadir",
     "pytest-xdist",
     # optional dependencies
-    "apache-beam>=2.26.0,<2.44.0;python_version<'3.10'",  # doesn't support recent dill versions for recent python versions
+    "apache-beam>=2.26.0; sys_platform != 'win32' and python_version<'3.10'",  # doesn't support recent dill versions for recent python versions and on windows requires pyarrow<12.0.0
     "elasticsearch<8.0.0",  # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch()
     "faiss-cpu>=1.6.4",
     "jax>=0.3.14; sys_platform != 'win32'",
@@ -233,7 +233,7 @@
 EXTRAS_REQUIRE = {
     "audio": AUDIO_REQUIRE,
     "vision": VISION_REQUIRE,
-    "apache-beam": ["apache-beam>=2.26.0,<2.44.0"],
+    "apache-beam": ["apache-beam>=2.26.0"],
     "tensorflow": [
         "tensorflow>=2.2.0,!=2.6.0,!=2.6.1; sys_platform != 'darwin' or platform_machine != 'arm64'",
         "tensorflow-macos; sys_platform == 'darwin' and platform_machine == 'arm64'",
diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py
index 5d6d8141f6d..b550d45d55f 100644
--- a/src/datasets/arrow_writer.py
+++ b/src/datasets/arrow_writer.py
@@ -39,7 +39,7 @@
 from .filesystems import is_remote_filesystem
 from .info import DatasetInfo
 from .keyhash import DuplicatedKeysError, KeyHasher
-from .table import array_cast, array_concat, cast_array_to_feature, embed_table_storage, table_cast
+from .table import array_cast, cast_array_to_feature, embed_table_storage, table_cast
 from .utils import logging
 from .utils import tqdm as hf_tqdm
 from .utils.file_utils import hash_url_to_filename
@@ -441,7 +441,12 @@ def write_examples_on_file(self):
                 # This can happen in `.map()` when we want to re-write the same Arrow data
                 if all(isinstance(row[0][col], (pa.Array, pa.ChunkedArray)) for row in self.current_examples):
                     arrays = [row[0][col] for row in self.current_examples]
-                    batch_examples[col] = array_concat(arrays)
+                    arrays = [
+                        chunk
+                        for array in arrays
+                        for chunk in (array.chunks if isinstance(array, pa.ChunkedArray) else [array])
+                    ]
+                    batch_examples[col] = pa.concat_arrays(arrays)
                 else:
                     batch_examples[col] = [
                         row[0][col].to_pylist()[0] if isinstance(row[0][col], (pa.Array, pa.ChunkedArray)) else row[0][col]
@@ -669,33 +674,23 @@ def finalize(self, metrics_query_result: dict):
             metrics_query_result: `dict` obtained from pipeline_results.metrics().query(m_filter).
                 Make sure that the filter keeps only the metrics for the considered split, under the namespace `split_name`.
         """
-        import apache_beam as beam
-
-        from .utils import beam_utils
-
         # Beam FileSystems require the system's path separator in the older versions
         fs, _, [parquet_path] = fsspec.get_fs_token_paths(self._parquet_path)
         parquet_path = str(Path(parquet_path)) if not is_remote_filesystem(fs) else fs.unstrip_protocol(parquet_path)
-        shards_metadata = list(beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list)
-        shards = [metadata.path for metadata in shards_metadata]
-        num_bytes = sum([metadata.size_in_bytes for metadata in shards_metadata])
+        shards = fs.glob(parquet_path + "*.parquet")
+        num_bytes = sum(fs.sizes(shards))
         shard_lengths = get_parquet_lengths(shards)

         # Convert to arrow
         if self._path.endswith(".arrow"):
             logger.info(f"Converting parquet files {self._parquet_path} to arrow {self._path}")
-            shards = [
-                metadata.path
-                for metadata in beam.io.filesystems.FileSystems.match([parquet_path + "*.parquet"])[0].metadata_list
-            ]
             try:  # stream conversion
                 num_bytes = 0
                 for shard in hf_tqdm(shards, unit="shards"):
-                    with beam.io.filesystems.FileSystems.open(shard) as source:
-                        with beam.io.filesystems.FileSystems.create(
-                            shard.replace(".parquet", ".arrow")
-                        ) as destination:
+                    with fs.open(shard, "rb") as source:
+                        with fs.open(shard.replace(".parquet", ".arrow"), "wb") as destination:
                             shard_num_bytes, _ = parquet_to_arrow(source, destination)
                             num_bytes += shard_num_bytes
             except OSError as e:  # broken pipe can happen if the connection is unstable, do local conversion instead
@@ -709,12 +704,12 @@ def finalize(self, metrics_query_result: dict):
                 num_bytes = 0
                 for shard in hf_tqdm(shards, unit="shards"):
                     local_parquet_path = os.path.join(local_convert_dir, hash_url_to_filename(shard) + ".parquet")
-                    beam_utils.download_remote_to_local(shard, local_parquet_path)
+                    fs.download(shard, local_parquet_path)
                     local_arrow_path = local_parquet_path.replace(".parquet", ".arrow")
                     shard_num_bytes, _ = parquet_to_arrow(local_parquet_path, local_arrow_path)
                     num_bytes += shard_num_bytes
                     remote_arrow_path = shard.replace(".parquet", ".arrow")
-                    beam_utils.upload_local_to_remote(local_arrow_path, remote_arrow_path)
+                    fs.upload(local_arrow_path, remote_arrow_path)

         # Save metrics
         counters_dict = {metric.key.metric.name: metric.result for metric in metrics_query_result["counters"]}
@@ -735,8 +730,9 @@ def get_parquet_lengths(sources) -> List[int]:

 def parquet_to_arrow(source, destination) -> List[int]:
     """Convert parquet file to arrow file. Inputs can be str paths or file-like objects"""
     stream = None if isinstance(destination, str) else destination
-    with ArrowWriter(path=destination, stream=stream) as writer:
-        parquet_file = pa.parquet.ParquetFile(source)
+    parquet_file = pa.parquet.ParquetFile(source)
+    # Beam can create empty Parquet files, so we need to pass the source Parquet file's schema
+    with ArrowWriter(schema=parquet_file.schema_arrow, path=destination, stream=stream) as writer:
         for record_batch in parquet_file.iter_batches():
             pa_table = pa.Table.from_batches([record_batch])
             writer.write_table(pa_table)
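Reviewer note on the `write_examples_on_file` change above: with the pyarrow floor raised to 12.0.0, `pa.concat_arrays` can concatenate extension arrays directly, so the custom `array_concat` helper is dropped and only the flattening of `ChunkedArray` chunks is kept inline. A minimal sketch of that pattern (the helper name `concat_possibly_chunked` is illustrative, not part of the patch):

    import pyarrow as pa

    def concat_possibly_chunked(arrays):
        # pa.concat_arrays only accepts plain pa.Array objects, so flatten
        # any ChunkedArray inputs into their chunks first.
        flat = [
            chunk
            for array in arrays
            for chunk in (array.chunks if isinstance(array, pa.ChunkedArray) else [array])
        ]
        return pa.concat_arrays(flat)

    print(concat_possibly_chunked([pa.array([1, 2]), pa.chunked_array([[3], [4, 5]])]))
    # [1, 2, 3, 4, 5]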
diff --git a/src/datasets/table.py b/src/datasets/table.py
index de896218191..43aa228278f 100644
--- a/src/datasets/table.py
+++ b/src/datasets/table.py
@@ -1,6 +1,5 @@
 import copy
 import os
-import warnings
 from functools import partial
 from itertools import groupby
 from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Tuple, TypeVar, Union
@@ -8,6 +7,7 @@
 import numpy as np
 import pyarrow as pa
 import pyarrow.compute as pc
+import pyarrow.types

 from . import config
 from .utils.logging import get_logger
@@ -239,11 +239,7 @@ def to_pylist(self, *args, **kwargs):
         Returns:
             `list`
         """
-        try:
-            return self.table.to_pylist(*args, **kwargs)
-        except AttributeError:  # pyarrow <7 does not have to_pylist, so we use to_pydict
-            pydict = self.table.to_pydict(*args, **kwargs)
-            return [{k: pydict[k][i] for k in pydict} for i in range(len(self.table))]
+        return self.table.to_pylist(*args, **kwargs)

     def to_pandas(self, *args, **kwargs):
         """
@@ -1801,102 +1797,35 @@ def wrapper(array, *args, **kwargs):
     return wrapper


-def _is_extension_type(pa_type: pa.DataType) -> bool:
-    """
-    Check (recursively) if a pyarrow type is an extension type.
-    """
-    if isinstance(pa_type, pa.StructType):
-        return any(_is_extension_type(field.type) for field in pa_type)
-    elif isinstance(pa_type, (pa.ListType, pa.FixedSizeListType, pa.LargeListType)):
-        return _is_extension_type(pa_type.value_type)
-    elif isinstance(pa_type, pa.ExtensionType):
-        return True
-    else:
-        return False
-
-
-def array_concat(arrays: List[pa.Array]):
-    """Improved version of pa.concat_arrays
+def _are_list_values_of_length(array: pa.ListArray, length: int) -> bool:
+    """Check if all the sub-lists of a `pa.ListArray` have the specified length."""
+    return pc.all(pc.equal(array.value_lengths(), length)).as_py() or array.null_count == len(array)

-    It supports concatenating pa.ExtensionArray objects by concatenating the underlying storages.

-    Args:
-        arrays (List[pa.Array]): List of arrays to contatenate
-
-    Raises:
-        pa.ArrowInvalid: if the arrow array concatenation fails
-        ValueError: if the list of arrays is empty
-        TypeError: if the arrays to be concatenated have different types
+def _combine_list_array_offsets_with_mask(array: pa.ListArray) -> pa.Array:
+    """Add the null bitmap to the offsets of a `pa.ListArray`."""
+    offsets = array.offsets
+    if array.null_count > 0:
+        offsets = pa.concat_arrays(
+            [
+                pc.replace_with_mask(offsets[:-1], array.is_null(), pa.nulls(len(array), pa.int32())),
+                offsets[-1:],
+            ]
+        )
+    return offsets

-    Returns:
-        array (:obj:`pyarrow.Array`): the concatenated array
-    """
-    arrays = list(arrays)
-    array_types = {array.type for array in arrays}
-
-    if not array_types:
-        raise ValueError("Couldn't concatenate empty list of arrays")
-    if len(array_types) > 1:
-        array_types = list(array_types)
-        raise TypeError(f"Couldn't concatenate arrays with different types {array_types[0]} and {array_types[1]}")
-
-    array_type = arrays[0].type
-    arrays = [chunk for arr in arrays for chunk in (arr.chunks if isinstance(arr, pa.ChunkedArray) else (arr,))]
-
-    if not _is_extension_type(array_type):
-        return pa.concat_arrays(arrays)
-
-    def _offsets_concat(offsets):
-        offset = offsets[0]
-        concatenated_offsets = offset
-        for offset in offsets[1:]:
-            offset = pc.subtract(offset, offset[0])
-            offset = pc.add(offset[1:], concatenated_offsets[-1])
-            concatenated_offsets = pa.concat_arrays([concatenated_offsets, offset])
-        return concatenated_offsets
-
-    def _concat_arrays(arrays):
-        array_type = arrays[0].type
-        if isinstance(array_type, pa.ExtensionType):
-            return array_type.wrap_array(_concat_arrays([array.storage for array in arrays]))
-        elif pa.types.is_struct(array_type):
-            return pa.StructArray.from_arrays(
-                [_concat_arrays([array.field(field.name) for array in arrays]) for field in array_type],
-                fields=list(array_type),
-                mask=pa.concat_arrays([array.is_null() for array in arrays]),
-            )
-        elif pa.types.is_list(array_type):
-            if any(array.null_count > 0 for array in arrays):
-                if config.PYARROW_VERSION.major < 10:
-                    warnings.warn(
-                        "None values are converted to empty lists in `pyarrow<10.0.0` when concatenating list arrays with None values. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676."
-                    )
-                else:
-                    return pa.ListArray.from_arrays(
-                        _offsets_concat([array.offsets for array in arrays]),
-                        _concat_arrays([array.values for array in arrays]),
-                        mask=pa.concat_arrays([array.is_null() for array in arrays]),
-                    )
-            return pa.ListArray.from_arrays(
-                _offsets_concat([array.offsets for array in arrays]),
-                _concat_arrays([array.values for array in arrays]),
-            )
-        elif pa.types.is_fixed_size_list(array_type):
-            if config.PYARROW_VERSION.major < 15:
-                # PyArrow bug: https://github.com/apache/arrow/issues/35360
-                return pa.FixedSizeListArray.from_arrays(
-                    _concat_arrays([array.values[array.offset * array.type.list_size :] for array in arrays]),
-                    array_type.list_size,
-                )
-            else:
-                return pa.FixedSizeListArray.from_arrays(
-                    _concat_arrays([array.values for array in arrays]),
-                    array_type.value_type,
-                    array_type.list_size,
-                )
-        return pa.concat_arrays(arrays)
-
-    return _concat_arrays(arrays)
+def _storage_type(type: pa.DataType) -> pa.DataType:
+    """Convert a (possibly nested) `pa.ExtensionType` to its storage type."""
+    if isinstance(type, pa.ExtensionType):
+        return _storage_type(type.storage_type)
+    elif isinstance(type, pa.StructType):
+        return pa.struct([pa.field(field.name, _storage_type(field.type)) for field in type])
+    elif isinstance(type, pa.ListType):
+        return pa.list_(_storage_type(type.value_type))
+    elif isinstance(type, pa.FixedSizeListType):
+        return pa.list_(_storage_type(type.value_type), type.list_size)
+    return type


 @_wrap_for_chunked_arrays
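The new `_combine_list_array_offsets_with_mask` helper relies on a pyarrow behavior worth spelling out: `pa.ListArray.from_arrays` treats a null entry in the offsets array as a null list slot, so folding the null bitmap into the offsets lets the callers below rebuild list arrays without passing a separate mask. A standalone illustration (assuming pyarrow>=12.0.0, as pinned above):

    import pyarrow as pa

    offsets = pa.array([0, 2, None, 3], pa.int32())  # null offset -> null list slot
    values = pa.array([1, 2, 3])
    arr = pa.ListArray.from_arrays(offsets, values)
    print(arr.to_pylist())  # [[1, 2], None, [3]]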
@@ -1941,44 +1870,59 @@ def array_cast(array: pa.Array, pa_type: pa.DataType, allow_number_to_str=True):
         return pa.StructArray.from_arrays(arrays, fields=list(pa_type), mask=array.is_null())
     elif pa.types.is_list(array.type):
         if pa.types.is_fixed_size_list(pa_type):
-            if pa_type.list_size * len(array) == len(array.values):
-                return pa.FixedSizeListArray.from_arrays(
-                    _c(array.values, pa_type.value_type),
-                    pa_type.list_size,
-                )
-        elif pa.types.is_list(pa_type):
-            if array.null_count > 0:
-                if config.PYARROW_VERSION.major < 10:
-                    warnings.warn(
-                        f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {pa_type}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676."
-                    )
+            if _are_list_values_of_length(array, pa_type.list_size):
+                if array.null_count > 0:
+                    # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array
+                    array_type = array.type
+                    storage_type = _storage_type(array_type)
+                    if array_type != storage_type:
+                        # Temporarily convert to the storage type to support extension types in the slice operation
+                        array = _c(array, storage_type)
+                        array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True)
+                        array = _c(array, array_type)
+                    else:
+                        array = pc.list_slice(array, 0, pa_type.list_size, return_fixed_size_list=True)
+                    array_values = array.values
+                    if config.PYARROW_VERSION.major < 15:
+                        return pa.Array.from_buffers(
+                            pa_type,
+                            len(array),
+                            [array.is_valid().buffers()[1]],
+                            children=[_c(array_values, pa_type.value_type)],
+                        )
+                    else:
+                        return pa.FixedSizeListArray.from_arrays(
+                            _c(array_values, pa_type.value_type), pa_type.list_size, mask=array.is_null()
+                        )
                 else:
-                return pa.ListArray.from_arrays(
-                    array.offsets, _c(array.values, pa_type.value_type), mask=array.is_null()
-                )
-            return pa.ListArray.from_arrays(array.offsets, _c(array.values, pa_type.value_type))
+                    array_values = array.values[
+                        array.offset * pa_type.list_size : (array.offset + len(array)) * pa_type.list_size
+                    ]
+                    return pa.FixedSizeListArray.from_arrays(_c(array_values, pa_type.value_type), pa_type.list_size)
+        elif pa.types.is_list(pa_type):
+            # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError
+            array_offsets = _combine_list_array_offsets_with_mask(array)
+            return pa.ListArray.from_arrays(array_offsets, _c(array.values, pa_type.value_type))
     elif pa.types.is_fixed_size_list(array.type):
-        array_values = array.values
-        if config.PYARROW_VERSION.major < 15:
-            # PyArrow bug: https://github.com/apache/arrow/issues/35360
-            array_values = array.values[array.offset * array.type.list_size :]
         if pa.types.is_fixed_size_list(pa_type):
-            return pa.FixedSizeListArray.from_arrays(
-                _c(array_values, pa_type.value_type),
-                pa_type.list_size,
-            )
-        elif pa.types.is_list(pa_type):
-            offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32())
-            if array.null_count > 0:
-                if config.PYARROW_VERSION.major < 10:
-                    warnings.warn(
-                        f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {pa_type}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676."
-                    )
+            if pa_type.list_size == array.type.list_size:
+                array_values = array.values[
+                    array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size
+                ]
+                if config.PYARROW_VERSION.major < 15:
+                    return pa.Array.from_buffers(
+                        pa_type,
+                        len(array),
+                        [array.is_valid().buffers()[1]],
+                        children=[_c(array_values, pa_type.value_type)],
+                    )
                 else:
-                    return pa.ListArray.from_arrays(
-                        offsets_arr, _c(array_values, pa_type.value_type), mask=array.is_null()
+                    return pa.FixedSizeListArray.from_arrays(
+                        _c(array_values, pa_type.value_type), pa_type.list_size, mask=array.is_null()
                     )
-            return pa.ListArray.from_arrays(offsets_arr, _c(array_values, pa_type.value_type))
+        elif pa.types.is_list(pa_type):
+            array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size
+            return pa.ListArray.from_arrays(array_offsets, _c(array.values, pa_type.value_type), mask=array.is_null())
     else:
         if (
             not allow_number_to_str
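The rewritten list-to-fixed-size path in `array_cast` is built on `pc.list_slice(..., return_fixed_size_list=True)`, the compute kernel whose availability motivates the pyarrow>=12.0.0 floor. A quick demo of what it returns for input that already has the target length, nulls included:

    import pyarrow as pa
    import pyarrow.compute as pc

    arr = pa.array([[0, 1, 2], None, [3, 4, 5]], pa.list_(pa.int32()))
    fixed = pc.list_slice(arr, 0, 3, return_fixed_size_list=True)
    print(fixed.type)         # fixed_size_list<item: int32>[3]
    print(fixed.to_pylist())  # [[0, 1, 2], None, [3, 4, 5]]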
@@ -2042,69 +1986,79 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_
     elif pa.types.is_list(array.type):
         # feature must be either [subfeature] or Sequence(subfeature)
         if isinstance(feature, list):
-            casted_values = _c(array.values, feature[0])
-            if casted_values.type == array.values.type:
+            casted_array_values = _c(array.values, feature[0])
+            if casted_array_values.type == array.values.type:
                 return array
             else:
-                if array.null_count > 0:
-                    if config.PYARROW_VERSION.major < 10:
-                        warnings.warn(
-                            f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676."
-                        )
-                    else:
-                        return pa.ListArray.from_arrays(array.offsets, casted_values, mask=array.is_null())
-                return pa.ListArray.from_arrays(array.offsets, casted_values)
+                # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError
+                array_offsets = _combine_list_array_offsets_with_mask(array)
+                return pa.ListArray.from_arrays(array_offsets, casted_array_values)
         elif isinstance(feature, Sequence):
             if feature.length > -1:
-                if feature.length * len(array) == len(array.values):
-                    return pa.FixedSizeListArray.from_arrays(_c(array.values, feature.feature), feature.length)
-            else:
-                casted_values = _c(array.values, feature.feature)
-                if casted_values.type == array.values.type:
-                    return array
-                else:
+                if _are_list_values_of_length(array, feature.length):
                     if array.null_count > 0:
-                        if config.PYARROW_VERSION.major < 10:
-                            warnings.warn(
-                                f"None values are converted to empty lists in `pyarrow<10.0.0` when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676."
-                            )
+                        # Ensure each null value in the array translates to [null] * pa_type.list_size in the array's values array
+                        array_type = array.type
+                        storage_type = _storage_type(array_type)
+                        if array_type != storage_type:
+                            # Temporarily convert to the storage type to support extension types in the slice operation
+                            array = array_cast(array, storage_type, allow_number_to_str=allow_number_to_str)
+                            array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True)
+                            array = array_cast(array, array_type, allow_number_to_str=allow_number_to_str)
+                        else:
+                            array = pc.list_slice(array, 0, feature.length, return_fixed_size_list=True)
+                        array_values = array.values
+                        casted_array_values = _c(array_values, feature.feature)
+                        if config.PYARROW_VERSION.major < 15:
+                            return pa.Array.from_buffers(
+                                pa.list_(casted_array_values.type, feature.length),
+                                len(array),
+                                [array.is_valid().buffers()[1]],
+                                children=[casted_array_values],
+                            )
                         else:
-                            return pa.ListArray.from_arrays(
-                                array.offsets, _c(array.values, feature.feature), mask=array.is_null()
+                            return pa.FixedSizeListArray.from_arrays(
+                                casted_array_values, feature.length, mask=array.is_null()
                             )
-                    return pa.ListArray.from_arrays(array.offsets, _c(array.values, feature.feature))
+                    else:
+                        array_values = array.values[
+                            array.offset * feature.length : (array.offset + len(array)) * feature.length
+                        ]
+                        return pa.FixedSizeListArray.from_arrays(_c(array_values, feature.feature), feature.length)
+            else:
+                casted_array_values = _c(array.values, feature.feature)
+                if casted_array_values.type == array.values.type:
+                    return array
+                else:
+                    # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError
+                    array_offsets = _combine_list_array_offsets_with_mask(array)
+                    return pa.ListArray.from_arrays(array_offsets, casted_array_values)
     elif pa.types.is_fixed_size_list(array.type):
         # feature must be either [subfeature] or Sequence(subfeature)
-        array_values = array.values
         if isinstance(feature, list):
-            if array.null_count > 0:
-                if config.PYARROW_VERSION.major < 10:
-                    warnings.warn(
-                        f"None values are converted to empty lists when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`"
-                    )
-                else:
-                    return pa.ListArray.from_arrays(array.offsets, _c(array_values, feature[0]), mask=array.is_null())
-            return pa.ListArray.from_arrays(array.offsets, _c(array_values, feature[0]))
+            array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size
+            return pa.ListArray.from_arrays(array_offsets, _c(array.values, feature[0]), mask=array.is_null())
         elif isinstance(feature, Sequence):
             if feature.length > -1:
-                if array.offset and feature.length * len(array) != len(array_values):
+                if feature.length == array.type.list_size:
                     array_values = array.values[
                         array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size
                     ]
-                if feature.length * len(array) == len(array_values):
-                    return pa.FixedSizeListArray.from_arrays(_c(array_values, feature.feature), feature.length)
-            else:
-                offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32())
-                if array.null_count > 0:
-                    if config.PYARROW_VERSION.major < 10:
-                        warnings.warn(
-                            f"None values are converted to empty lists when converting array to {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`"
-                        )
+                    casted_array_values = _c(array_values, feature.feature)
+                    if config.PYARROW_VERSION.major < 15:
+                        return pa.Array.from_buffers(
+                            pa.list_(casted_array_values.type, feature.length),
+                            len(array),
+                            [array.is_valid().buffers()[1]],
+                            children=[casted_array_values],
+                        )
                     else:
-                        return pa.ListArray.from_arrays(
-                            offsets_arr, _c(array_values, feature.feature), mask=array.is_null()
+                        return pa.FixedSizeListArray.from_arrays(
+                            casted_array_values, feature.length, mask=array.is_null()
                         )
-                return pa.ListArray.from_arrays(offsets_arr, _c(array_values, feature.feature))
+            else:
+                array_offsets = (np.arange(len(array) + 1) + array.offset) * array.type.list_size
+                return pa.ListArray.from_arrays(array_offsets, _c(array.values, feature.feature), mask=array.is_null())
     if pa.types.is_null(array.type):
         return array_cast(array, get_nested_type(feature), allow_number_to_str=allow_number_to_str)
     elif not isinstance(feature, (Sequence, dict, list, tuple)):
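From the caller's side, the observable effect of the `cast_array_to_feature` rewrite is that null entries now survive list casts instead of silently becoming empty lists (https://github.com/huggingface/datasets/issues/3676). A sketch of the expected behavior with this patch applied:

    import pyarrow as pa
    from datasets import Sequence, Value
    from datasets.table import cast_array_to_feature

    arr = pa.array([[0, 1, 2], None, [3, 4, 5]], pa.list_(pa.int32()))
    out = cast_array_to_feature(arr, Sequence(Value("int64"), length=3))
    print(out.type)         # fixed_size_list<item: int64>[3]
    print(out.to_pylist())  # [[0, 1, 2], None, [3, 4, 5]] -- the null survives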
@@ -2116,7 +2070,7 @@ def cast_array_to_feature(array: pa.Array, feature: "FeatureType", allow_number_
 def embed_array_storage(array: pa.Array, feature: "FeatureType"):
     """Embed data into an arrays's storage.
     For custom features like Audio or Image, it takes into account the "embed_storage" methods
-    they defined to enable embedding external data (e.g. an image file) into an other arrow types.
+    they define to embed external data (e.g. an image file) into an array.
@@ -2153,65 +2107,28 @@ def embed_array_storage(array: pa.Array, feature: "FeatureType"):
         return pa.StructArray.from_arrays(arrays, names=list(feature), mask=array.is_null())
     elif pa.types.is_list(array.type):
         # feature must be either [subfeature] or Sequence(subfeature)
+        # Merge offsets with the null bitmap to avoid the "Null bitmap with offsets slice not supported" ArrowNotImplementedError
+        array_offsets = _combine_list_array_offsets_with_mask(array)
         if isinstance(feature, list):
-            if array.null_count > 0:
-                if config.PYARROW_VERSION.major < 10:
-                    warnings.warn(
-                        f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`"
-                    )
-                else:
-                    return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature[0]), mask=array.is_null())
-            return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature[0]))
-        elif isinstance(feature, Sequence):
-            if feature.length > -1:
-                if feature.length * len(array) == len(array.values):
-                    return pa.FixedSizeListArray.from_arrays(_e(array.values, feature.feature), feature.length)
-            else:
-                casted_values = _e(array.values, feature.feature)
-                if casted_values.type == array.values.type:
-                    return array
-                else:
-                    if array.null_count > 0:
-                        if config.PYARROW_VERSION.major < 10:
-                            warnings.warn(
-                                f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`"
-                            )
-                        else:
-                            return pa.ListArray.from_arrays(
-                                array.offsets, _e(array.values, feature.feature), mask=array.is_null()
-                            )
-                    return pa.ListArray.from_arrays(array.offsets, _e(array.values, feature.feature))
+            return pa.ListArray.from_arrays(array_offsets, _e(array.values, feature[0]))
+        if isinstance(feature, Sequence) and feature.length == -1:
+            return pa.ListArray.from_arrays(array_offsets, _e(array.values, feature.feature))
     elif pa.types.is_fixed_size_list(array.type):
-        # feature must be either [subfeature] or Sequence(subfeature)
-        array_values = array.values
-        if config.PYARROW_VERSION.major < 15:
-            # PyArrow bug: https://github.com/apache/arrow/issues/35360
-            array_values = array.values[array.offset * array.type.list_size :]
-        if isinstance(feature, list):
-            if array.null_count > 0:
-                if config.PYARROW_VERSION.major < 10:
-                    warnings.warn(
-                        f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`"
-                    )
-                else:
-                    return pa.ListArray.from_arrays(array.offsets, _e(array_values, feature[0]), mask=array.is_null())
-            return pa.ListArray.from_arrays(array.offsets, _e(array_values, feature[0]))
-        elif isinstance(feature, Sequence):
-            if feature.length > -1:
-                if feature.length * len(array) == len(array_values):
-                    return pa.FixedSizeListArray.from_arrays(_e(array_values, feature.feature), feature.length)
+        # feature must be Sequence(subfeature)
+        if isinstance(feature, Sequence) and feature.length > -1:
+            array_values = array.values[
+                array.offset * array.type.list_size : (array.offset + len(array)) * array.type.list_size
+            ]
+            embedded_array_values = _e(array_values, feature.feature)
+            if config.PYARROW_VERSION.major < 15:
+                return pa.Array.from_buffers(
+                    pa.list_(array_values.type, feature.length),
+                    len(array),
+                    [array.is_valid().buffers()[1]],
+                    children=[embedded_array_values],
+                )
             else:
-                offsets_arr = pa.array(np.arange(len(array) + 1) * array.type.list_size, pa.int32())
-                if array.null_count > 0:
-                    if config.PYARROW_VERSION.major < 10:
-                        warnings.warn(
-                            f"None values are converted to empty lists when embedding array storage with {feature}. Install `pyarrow>=10.0.0` to avoid this behavior. More info: https://github.com/huggingface/datasets/issues/3676. This will raise an error in a future major version of `datasets`"
-                        )
-                    else:
-                        return pa.ListArray.from_arrays(
-                            offsets_arr, _e(array_values, feature.feature), mask=array.is_null()
-                        )
-                return pa.ListArray.from_arrays(offsets_arr, _e(array_values, feature.feature))
+                return pa.FixedSizeListArray.from_arrays(embedded_array_values, feature.length, mask=array.is_null())
     if not isinstance(feature, (Sequence, dict, list, tuple)):
         return array
     raise TypeError(f"Couldn't embed array of type\n{array.type}\nwith\n{feature}")
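The `pa.Array.from_buffers` calls in all three rewritten functions are a workaround for pyarrow<15, where `FixedSizeListArray.from_arrays` has no `mask` argument yet: the validity bitmap is taken from `array.is_valid().buffers()[1]` (the data buffer of a boolean array is exactly a validity bitmap) and re-attached by rebuilding the array from raw buffers. A standalone illustration of the trick:

    import pyarrow as pa

    values = pa.array([0, 1, 2, 3, 4, 5], pa.int64())
    source = pa.array([[0, 1, 2], None], pa.list_(pa.int64(), 3))  # supplies the null bitmap
    rebuilt = pa.Array.from_buffers(
        pa.list_(pa.int64(), 3),
        2,
        [source.is_valid().buffers()[1]],  # validity buffer only; the child array carries the data
        children=[values],
    )
    print(rebuilt.to_pylist())  # [[0, 1, 2], None]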
{"foo": [[Value("int64")]]}) assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int64()))}) assert casted_array.to_pylist() == arr.to_pylist() - # different type arr = pa.array([{"foo": [None, [0]]}], pa.struct({"foo": pa.list_(pa.list_(pa.int64()))})) - if datasets.config.PYARROW_VERSION.major < 10: - with pytest.warns(UserWarning, match="None values are converted to empty lists.+"): - casted_array = cast_array_to_feature(arr, {"foo": [[Value("int32")]]}) - assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int32()))}) - assert casted_array.to_pylist() == [ - {"foo": [[], [0]]} - ] # empty list because of https://github.com/huggingface/datasets/issues/3676 - else: - with warnings.catch_warnings(): - warnings.simplefilter("error") - casted_array = cast_array_to_feature(arr, {"foo": [[Value("int32")]]}) - assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int32()))}) - assert casted_array.to_pylist() == [{"foo": [None, [0]]}] + casted_array = cast_array_to_feature(arr, {"foo": [[Value("int32")]]}) + assert casted_array.type == pa.struct({"foo": pa.list_(pa.list_(pa.int32()))}) + assert casted_array.to_pylist() == [{"foo": [None, [0]]}] def test_cast_array_to_features_to_null_type(): @@ -1199,23 +1155,67 @@ def test_cast_array_to_features_sequence_classlabel(): assert cast_array_to_feature(arr, Sequence(ClassLabel(names=["foo", "bar"]))) -def test_cast_fixed_size_array_to_features_sequence(): - arr = pa.array([[0, 1, 2], [3, 4, 5], [6, 7, 8]], pa.list_(pa.int32(), 3)) +@pytest.mark.parametrize( + "arr", + [ + pa.array([[0, 1, 2], [3, None, 5], None, [6, 7, 8], None], pa.list_(pa.int32(), 3)), + ], +) +@pytest.mark.parametrize("slice", [None, slice(1, None), slice(-1), slice(1, 3), slice(2, 3), slice(1, 1)]) +@pytest.mark.parametrize("target_value_feature", [Value("int64")]) +def test_cast_fixed_size_list_array_to_features_sequence(arr, slice, target_value_feature): + arr = arr if slice is None else arr[slice] # Fixed size list - casted_array = cast_array_to_feature(arr, Sequence(Value("int64"), length=3)) - assert casted_array.type == pa.list_(pa.int64(), 3) + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature, length=arr.type.list_size)) + assert casted_array.to_pylist() == arr.to_pylist() + with pytest.raises(TypeError): + cast_array_to_feature(arr, Sequence(target_value_feature, length=arr.type.list_size + 1)) + # Variable size list + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature)) + assert casted_array.to_pylist() == arr.to_pylist() + casted_array = cast_array_to_feature(arr, [target_value_feature]) + assert casted_array.type == get_nested_type([target_value_feature]) assert casted_array.to_pylist() == arr.to_pylist() + + +@pytest.mark.parametrize( + "arr", + [ + pa.array([[0, 1, 2], [3, None, 5], None, [6, 7, 8], None], pa.list_(pa.int32())), + ], +) +@pytest.mark.parametrize("slice", [None, slice(1, None), slice(-1), slice(1, 3), slice(2, 3), slice(1, 1)]) +@pytest.mark.parametrize("target_value_feature", [Value("int64")]) +def test_cast_list_array_to_features_sequence(arr, slice, target_value_feature): + arr = arr if slice is None else arr[slice] # Variable size list - casted_array = cast_array_to_feature(arr, Sequence(Value("int64"))) - assert casted_array.type == pa.list_(pa.int64()) + 
casted_array = cast_array_to_feature(arr, Sequence(target_value_feature)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature)) + assert casted_array.to_pylist() == arr.to_pylist() + casted_array = cast_array_to_feature(arr, [target_value_feature]) + assert casted_array.type == get_nested_type([target_value_feature]) + assert casted_array.to_pylist() == arr.to_pylist() + # Fixed size list + list_size = arr.value_lengths().drop_null()[0].as_py() if arr.value_lengths().drop_null() else 2 + casted_array = cast_array_to_feature(arr, Sequence(target_value_feature, length=list_size)) + assert casted_array.type == get_nested_type(Sequence(target_value_feature, length=list_size)) assert casted_array.to_pylist() == arr.to_pylist() -def test_cast_sliced_fixed_size_array_to_features(): - arr = pa.array([[0, 1, 2], [3, 4, 5], [6, 7, 8]], pa.list_(pa.int32(), 3)) - casted_array = cast_array_to_feature(arr[1:], Sequence(Value("int64"), length=3)) - assert casted_array.type == pa.list_(pa.int64(), 3) - assert casted_array.to_pylist() == arr[1:].to_pylist() +def test_cast_array_xd_to_features_sequence(): + arr = np.random.randint(0, 10, size=(8, 2, 3)).tolist() + arr = Array2DExtensionType(shape=(2, 3), dtype="int64").wrap_array(pa.array(arr, pa.list_(pa.list_(pa.int64())))) + arr = pa.ListArray.from_arrays([0, None, 4, 8], arr) + # Variable size list + casted_array = cast_array_to_feature(arr, Sequence(Array2D(shape=(2, 3), dtype="int32"))) + assert casted_array.type == get_nested_type(Sequence(Array2D(shape=(2, 3), dtype="int32"))) + assert casted_array.to_pylist() == arr.to_pylist() + # Fixed size list + casted_array = cast_array_to_feature(arr, Sequence(Array2D(shape=(2, 3), dtype="int32"), length=4)) + assert casted_array.type == get_nested_type(Sequence(Array2D(shape=(2, 3), dtype="int32"), length=4)) + assert casted_array.to_pylist() == arr.to_pylist() def test_embed_array_storage(image_file): @@ -1268,17 +1268,3 @@ def test_table_iter(table, batch_size, drop_last_batch): if num_rows > 0: reloaded = pa.concat_tables(subtables) assert table.slice(0, num_rows).to_pydict() == reloaded.to_pydict() - - -@pytest.mark.parametrize( - "pa_type, expected", - [ - (pa.int8(), False), - (pa.struct({"col1": pa.int8(), "col2": pa.int64()}), False), - (pa.struct({"col1": pa.list_(pa.int8()), "col2": Array2DExtensionType((1, 3), "int64")}), True), - (pa.list_(pa.int8()), False), - (pa.list_(Array2DExtensionType((1, 3), "int64"), 4), True), - ], -) -def test_is_extension_type(pa_type, expected): - assert _is_extension_type(pa_type) == expected
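For reviewers, a quick end-to-end check mirroring the new parametrized tests (a sketch, run with this patch applied): non-zero offsets from slicing and null entries should round-trip through the cast unchanged.

    import pyarrow as pa
    from datasets import Sequence, Value
    from datasets.table import cast_array_to_feature

    arr = pa.array([[0, 1, 2], [3, None, 5], None, [6, 7, 8]], pa.list_(pa.int32(), 3))
    sliced = arr[1:]  # non-zero offset exercises the array.offset arithmetic above
    out = cast_array_to_feature(sliced, Sequence(Value("int64")))
    assert out.to_pylist() == sliced.to_pylist()
    print(out.type)  # list<item: int64>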