[REVIEW] Enable writing to s3 storage in chunked parquet writer
#10769
Changes from 3 commits
@@ -1,12 +1,19 @@
 # Copyright (c) 2019-2022, NVIDIA CORPORATION.
 
+import shutil
+import tempfile
 import warnings
 from collections import defaultdict
 from contextlib import ExitStack
 from typing import Dict, List, Tuple
 from uuid import uuid4
 
 import numpy as np
+
+try:
+    import s3fs
+except (ImportError, ModuleNotFoundError):
+    s3fs = None
 from pyarrow import dataset as ds, parquet as pq
 
 import cudf
@@ -206,12 +213,26 @@ def _process_dataset(
         filters = pq._filters_to_expression(filters)
 
     # Initialize ds.FilesystemDataset
-    dataset = ds.dataset(
-        paths,
-        filesystem=fs,
-        format="parquet",
-        partitioning="hive",
-    )
+    if (
+        s3fs is not None
+        and isinstance(fs, s3fs.S3FileSystem)
+        and len(paths) == 1
+        and fs.isdir(paths[0])
+    ):
+        # TODO: Remove this workaround after following bug is fixed:
+        # https://issues.apache.org/jira/browse/ARROW-16438
+        dataset = ds.dataset(
+            "s3://" + paths[0],
+            format="parquet",
+            partitioning="hive",
+        )
+    else:
+        dataset = ds.dataset(
+            paths,
+            filesystem=fs,
+            format="parquet",
+            partitioning="hive",
+        )
     file_list = dataset.files
     if len(file_list) == 0:
         raise FileNotFoundError(f"{paths} could not be resolved to any files")
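For context, a minimal read-side sketch of what this branch enables (the bucket and prefix below are made up; s3fs must be installed and AWS credentials available). A single directory-like S3 path now goes through the URI branch above instead of handing the `s3fs` filesystem object to pyarrow, which avoids ARROW-16438:

```python
import cudf

# Hypothetical hive-partitioned dataset directory on S3.
# The single "directory" path is resolved to its parquet files via
# ds.dataset("s3://...") rather than ds.dataset(paths, filesystem=fs).
df = cudf.read_parquet("s3://my-bucket/partitioned_table")
print(df.head())
```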

@@ -724,6 +745,7 @@ def __init__(
         index=None,
         compression=None,
         statistics="ROWGROUP",
+        **kwargs,
     ) -> None:
         """
         Write a parquet file or dataset incrementally

@@ -776,7 +798,12 @@ def __init__(
             <filename>.parquet
 
         """
-        self.path = path
+        if isinstance(path, str) and path.startswith("s3://"):
+            self.fs_meta = {"is_s3": True, "actual_path": path}
+            self.path = tempfile.TemporaryDirectory().name
+        else:
+            self.fs_meta = {}
+            self.path = path
 
         self.common_args = {
             "index": index,
             "compression": compression,

@@ -792,6 +819,7 @@ def __init__(
         # in self._chunked_writers for reverse lookup
         self.path_cw_map: Dict[str, int] = {}
         self.filename = None
+        self.kwargs = kwargs
 
     @_cudf_nvtx_annotate
     def write_table(self, df):

@@ -837,18 +865,19 @@ def write_table(self, df):
             ]
             cw.write_table(grouped_df, this_cw_part_info)
 
-        # Create new cw for unhandled paths encountered in this write_table
-        new_paths, part_info, meta_paths = zip(*new_cw_paths)
-        self._chunked_writers.append(
-            (
-                ParquetWriter(new_paths, **self.common_args),
-                new_paths,
-                meta_paths,
-            )
-        )
-        new_cw_idx = len(self._chunked_writers) - 1
-        self.path_cw_map.update({k: new_cw_idx for k in new_paths})
-        self._chunked_writers[-1][0].write_table(grouped_df, part_info)
+        if new_cw_paths:
+            # Create new cw for unhandled paths encountered in this write_table
+            new_paths, part_info, meta_paths = zip(*new_cw_paths)
+            self._chunked_writers.append(
+                (
+                    ParquetWriter(new_paths, **self.common_args),
+                    new_paths,
+                    meta_paths,
+                )
+            )
+            new_cw_idx = len(self._chunked_writers) - 1
+            self.path_cw_map.update({k: new_cw_idx for k in new_paths})
+            self._chunked_writers[-1][0].write_table(grouped_df, part_info)
 
     @_cudf_nvtx_annotate
     def close(self, return_metadata=False):

Comment on lines -1054 to +1076

Member:
Do you think it would be possible to wrap these …? If I understand correctly (which I may not), we should be able to write directly to s3 without the file-copy workaround.

Contributor (Author):
So this was my initial go-to approach (same as …

Member:
Okay - I still have a suspicion that we can make this work, but I do think the approach you are using here is reasonable, so I can experiment and follow up separately. My question here is definitely not a blocker :)

Contributor (Author):
Sure, will be happy to chat about the findings.
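A small sketch of the case the new `if new_cw_paths:` guard covers (local path and data are illustrative): when a later `write_table()` call only touches partitions that already have chunked writers, `new_cw_paths` is empty, so no new `ParquetWriter` should be created and `zip(*new_cw_paths)` must not run:

```python
import cudf
from cudf.io.parquet import ParquetDatasetWriter

cw = ParquetDatasetWriter("dataset_dir", partition_cols=["a"])
cw.write_table(cudf.DataFrame({"a": [1, 2], "b": [10, 20]}))  # writers created for a=1, a=2
cw.write_table(cudf.DataFrame({"a": [1, 2], "b": [30, 40]}))  # same partitions: nothing new to create
cw.close()
```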

@@ -862,6 +891,15 @@ def close(self, return_metadata=False):
             for cw, _, meta_path in self._chunked_writers
         ]
 
+        if self.fs_meta.get("is_s3", False):
+            local_path = self.path
+            s3_path = self.fs_meta["actual_path"]
+            s3_file, _ = ioutils._get_filesystem_and_paths(
+                s3_path, **self.kwargs
+            )
+            s3_file.put(local_path, s3_path, recursive=True)
+            shutil.rmtree(self.path)
+
         if return_metadata:
             return (
                 merge_parquet_filemetadata(metadata)
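For reference, a rough standalone sketch of the upload step performed in `close()`, written with plain fsspec calls instead of cudf's internal `ioutils._get_filesystem_and_paths` helper (bucket and local paths are made up):

```python
import shutil

import fsspec

local_path = "/tmp/staged_dataset"           # directory the writer staged into
s3_path = "s3://my-bucket/my_dataset"        # destination originally passed by the user
fs = fsspec.filesystem("s3")                 # s3fs-backed filesystem
fs.put(local_path, s3_path, recursive=True)  # copy the whole directory tree to S3
shutil.rmtree(local_path)                    # clean up the local staging directory
```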