Download and prepare as Parquet for cloud storage #4724
Conversation
The documentation is not available anymore as the PR was closed or merged.
```py
from fsspec.implementations.local import AbstractFileSystem, LocalFileSystem, stringify_path

...

class MockFileSystem(AbstractFileSystem):
```
I created this mockfs fixture that supports not only reading but also writing, so we can base all our fsspec tests on it :) @albertvillanova
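For reference, here is a minimal sketch of what such a read/write mock filesystem fixture can look like. It is an illustration built on fsspec's in-memory filesystem, not necessarily the exact fixture added in this PR; the fixture and test names are made up.

```py
import fsspec
import pytest
from fsspec.implementations.memory import MemoryFileSystem


class MockFileSystem(MemoryFileSystem):
    """In-memory filesystem registered under a dedicated protocol for tests."""

    protocol = "mock"


@pytest.fixture
def mockfs():
    # Make fsspec.filesystem("mock") resolve to the class above.
    fsspec.register_implementation("mock", MockFileSystem, clobber=True)
    fs = fsspec.filesystem("mock")
    yield fs
    fs.store.clear()  # reset the shared in-memory store between tests


def test_roundtrip(mockfs):
    # The fixture supports writing as well as reading.
    mockfs.mkdirs("bucket", exist_ok=True)
    with mockfs.open("bucket/train.txt", "wb") as f:
        f.write(b"hello")
    assert mockfs.cat("bucket/train.txt") == b"hello"
```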
mariosasko left a comment:
Good job! Some nits:
Co-authored-by: Mario Šaško <[email protected]>
philschmid left a comment:
I haven't reviewed the code yet, but I left some comments regarding the DX. To increase the usage of datasets within DS/research teams (who currently use other libraries because of their better cloud storage integrations), we should provide an API similar to what they are used to and to what datasets offers for Hub datasets; e.g. with pandas, loading a dataset from cloud storage looks the same as loading it from a local disk.
docs/source/filesystems.mdx (Outdated)
```py
>>> builder = load_dataset_builder("csv", data_files=data_files, cache_dir=cache_dir, storage_options=storage_options)
>>> builder.download_and_prepare(file_format="parquet")
```
From a UX perspective it would be cool if I could do

```py
# single file
ds = load_dataset("s3://my-bucket/datasets-cache/train.csv", storage_options=storage_options)
# multiple files
ds = load_dataset("s3://my-bucket/datasets-cache", data_files={"train": ["path/to/train.csv"]}, storage_options=storage_options)
```

But I'm not sure if this is possible to implement. That's, for example, also how pandas does it:

```py
data = pd.read_csv('s3://bucket....csv')
```
Yup this is definitely doable :)
It sounds super intuitive and practical to use, thanks!
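For context, `storage_options` in these examples is the dictionary of options and credentials forwarded to the underlying fsspec filesystem; which keys are accepted depends on the implementation. The values below are placeholders:

```py
# Placeholders only; the accepted keys depend on the fsspec implementation in use.
# S3 via s3fs:
storage_options = {"key": "MY_ACCESS_KEY_ID", "secret": "MY_SECRET_ACCESS_KEY"}
# or, for a public bucket:
storage_options = {"anon": True}
# GCS via gcsfs:
storage_options = {"token": "path/to/service-account.json"}
```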
```py
>>> dataset = load_from_disk('gcs://my-private-datasets/imdb/train', fs=gcs)
# saves encoded_dataset to amazon s3
>>> encoded_dataset.save_to_disk("s3://my-private-datasets/imdb/train", fs=fs)
```
Similar to my comment above, it would be nice to use the more "native" save methods, like `to_csv`, `to_parquet`, etc., e.g.

```py
encoded_dataset.to_csv("s3://my-bucket/dataset.csv", storage_options=storage_options)
```

similar to pandas as well.
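For comparison, this is the pattern pandas already exposes: a cloud URI plus a `storage_options` dict that is forwarded to fsspec. The bucket and credentials below are placeholders:

```py
import pandas as pd

df = pd.DataFrame({"text": ["hello", "world"], "label": [0, 1]})

# pandas hands the URI and storage_options to fsspec, so writing to S3 looks
# the same as writing to a local file (placeholder bucket and credentials).
df.to_csv("s3://my-bucket/dataset.csv", index=False,
          storage_options={"key": "MY_ACCESS_KEY_ID", "secret": "MY_SECRET_ACCESS_KEY"})
df.to_parquet("s3://my-bucket/dataset.parquet",
              storage_options={"key": "MY_ACCESS_KEY_ID", "secret": "MY_SECRET_ACCESS_KEY"})
```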
Just noticed that it would be more convenient to pass the output dir to `download_and_prepare` directly, to bypass the caching logic which prepares the dataset in the cache directory:

```py
builder = load_dataset_builder("squad")
# or with a custom cache
builder = load_dataset_builder("squad", cache_dir="path/to/local/cache/for/downloaded/files")
# download and prepare to s3
builder.download_and_prepare("s3://my_bucket/squad")
```
Having thought about it a bit more, I also agree with @philschmid that it's important to follow the existing APIs (pandas/dask), which means we should support the following at some point:
IMO these are the two main issues with the current approach:
Alright, I did the last change I wanted to do. Here is the final API:

```py
builder = load_dataset_builder(...)
builder.download_and_prepare("s3://...", storage_options={"token": ...})
```

and it creates the arrow files directly in the specified directory, not in a nested subdirectory structure as we do in the cache!
Yup this can be explored in some future work I think. Though to keep things simple and clear, I would keep the streaming behaviors only when you load a dataset in streaming mode, and not include it here. For example:

```py
ds = load_dataset(..., streaming=True)
ds.to_parquet("s3://...")
```
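For illustration, here is a rough sketch of what that streaming-to-Parquet flow can look like today with plain fsspec and pyarrow. This is not the API added by this PR; the dataset name is just an example, and the bucket and credentials are placeholders.

```py
from itertools import islice

import fsspec
import pyarrow as pa
import pyarrow.parquet as pq
from datasets import load_dataset

# Stream the dataset: no full local download.
ds = load_dataset("squad", split="train", streaming=True)

# Placeholder bucket and credentials; any fsspec-compatible filesystem works.
fs = fsspec.filesystem("s3", key="MY_ACCESS_KEY_ID", secret="MY_SECRET_ACCESS_KEY")

# Take a first chunk of streamed examples and write it as one Parquet shard on S3.
rows = list(islice(iter(ds), 1000))
table = pa.Table.from_pylist(rows)
with fs.open("my-bucket/squad/train-00000.parquet", "wb") as f:
    pq.write_table(table, f)
```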
albertvillanova left a comment:
Thanks @lhoestq, awesome work!!! It is really important that we support multiple cloud storage providers.
Just a general comment about the documentation. Feel free to tell me if you don't agree.
I think the use of the word "loading" can be misleading: we are using it with different meanings:
- until now, "loading" meant "get" (load into memory or memory-map), as opposed to "save" or "share"
- now we are also using "loading" to mean "saving": "loading into a cloud storage", which is different from "loading a dataset from the HF Hub"

I think we should be clear and make the difference between:
- loading a dataset FROM a cloud storage, with an API as suggested by @philschmid:
  ```py
  ds = load_dataset("s3://my-bucket/datasets-cache/train.csv", storage_options=storage_options)
  ```
- and saving it TO a cloud storage, with your proposal
albertvillanova left a comment:
Just some additional comments below.
```diff
-# cache_dir can be a remote bucket on GCS or S3 (when using BeamBasedBuilder for distributed data processing)
-self._cache_dir_root = str(cache_dir or config.HF_DATASETS_CACHE)
-self._cache_dir_root = (
-    self._cache_dir_root if is_remote_url(self._cache_dir_root) else os.path.expanduser(self._cache_dir_root)
-)
-path_join = posixpath.join if is_remote_url(self._cache_dir_root) else os.path.join
+self._cache_dir_root = str(cache_dir) or os.path.expanduser(config.HF_DATASETS_CACHE)
+self._cache_dir = self._build_cache_dir()
 self._cache_downloaded_dir = (
-    path_join(self._cache_dir_root, config.DOWNLOADED_DATASETS_DIR)
+    os.path.join(self._cache_dir_root, config.DOWNLOADED_DATASETS_DIR)
     if cache_dir
-    else str(config.DOWNLOADED_DATASETS_PATH)
+    else os.path.expanduser(config.DOWNLOADED_DATASETS_PATH)
 )
-self._cache_downloaded_dir = (
-    self._cache_downloaded_dir
-    if is_remote_url(self._cache_downloaded_dir)
-    else os.path.expanduser(self._cache_downloaded_dir)
-)
-self._cache_dir = self._build_cache_dir()
-if not is_remote_url(self._cache_dir_root):
 os.makedirs(self._cache_dir_root, exist_ok=True)
 lock_path = os.path.join(self._cache_dir_root, self._cache_dir.replace(os.sep, "_") + ".lock")
 with FileLock(lock_path):
     if os.path.exists(self._cache_dir):  # check if data exist
         if len(os.listdir(self._cache_dir)) > 0:
             logger.info("Overwrite dataset info from restored data version.")
             self.info = DatasetInfo.from_directory(self._cache_dir)
         else:  # dir exists but no data, remove the empty dir as data aren't available anymore
             logger.warning(
                 f"Old caching folder {self._cache_dir} for dataset {self.name} exists but not data were found. Removing it. "
             )
             os.rmdir(self._cache_dir)
```
I see you are reverting the changes introduced by:
Some concerns:
- What about if the user sets a remote `config.HF_DATASETS_CACHE` or `config.DOWNLOADED_DATASETS_DIR`?
- The `FileLock` was raising an error when working with a remote cache dir (`BeamBasedBuilder`)
Good catch, thanks!
The `FileLock` should indeed only be applied when the output directory is a local directory.
Totally agree with your comment on the meaning of "loading", I'll update the docs.
Co-authored-by: Albert Villanova del Moral <[email protected]>
I took your comments into account and reverted all the changes related to
mariosasko left a comment:
Looks all good now! Thanks!
Thanks @lhoestq, awesome job!
Just a few cosmetic comments on the docs...

Downloading a dataset as Parquet to a cloud storage can be useful for streaming mode and for use with Spark/Dask/Ray.
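For example, once the shards are prepared as Parquet on the bucket, they can be read directly with Dask. This is an illustrative sketch; the bucket path and credentials are placeholders:

```py
import dask.dataframe as dd

# Read the Parquet shards written by download_and_prepare straight from S3.
ddf = dd.read_parquet(
    "s3://my-bucket/squad/*.parquet",
    storage_options={"key": "MY_ACCESS_KEY_ID", "secret": "MY_SECRET_ACCESS_KEY"},
)
print(ddf.head())
```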
This PR adds support for `fsspec` URIs like `s3://...`, `gcs://...`, etc. and adds the `file_format` argument to save as Parquet instead of Arrow:
EDIT: actually changed the API to
Credentials to cloud storage can be passed using the `storage_options` argument.
For consistency with the `BeamBasedBuilder`, I name the parquet files `{builder.name}-{split}-xxxxx-of-xxxxx.parquet`. I think this is fine since we'll need to implement parquet sharding after this PR, so that a dataset can be used efficiently with Dask for example.
Note that image/audio files are not embedded yet in the parquet files; this will be added in a subsequent PR.
TODO: