[REVIEW] Enable writing to s3 storage in chunked parquet writer #10769
Changes from 10 commits
@@ -1,5 +1,7 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import shutil
import tempfile
import warnings
from collections import defaultdict
from contextlib import ExitStack
@@ -206,12 +208,15 @@ def _process_dataset(
    filters = pq._filters_to_expression(filters)

    # Initialize ds.FilesystemDataset
    # TODO: Remove the if len(paths) workaround after following bug is fixed:
    # https://issues.apache.org/jira/browse/ARROW-16438
    dataset = ds.dataset(
        paths,
        source=paths[0] if len(paths) == 1 else paths,
        filesystem=fs,
        format="parquet",
        partitioning="hive",
    )

    file_list = dataset.files
    if len(file_list) == 0:
        raise FileNotFoundError(f"{paths} could not be resolved to any files")
@@ -716,6 +721,58 @@ def _get_partitioned(


class ParquetDatasetWriter:
    """
    Write a parquet file or dataset incrementally

    Parameters
    ----------
    path : str or s3 and URL
        A Local directory path/s3 URL. Will be used as Root Directory
        path while writing a partitioned dataset.
    partition_cols : list
        Column names by which to partition the dataset
        Columns are partitioned in the order they are given
    index : bool, default None
        If ``True``, include the dataframe’s index(es) in the file output.
        If ``False``, they will not be written to the file. If ``None``,
        index(es) other than RangeIndex will be saved as columns.
    compression : {'snappy', None}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
        Level at which column statistics should be included in file.

Comment on lines +860 to +863

Contributor: Why does compression accept …

Contributor (author): Yea, I'll open a separate PR to standardize this.
    Examples
    --------
    Using a context

    >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
    >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
    >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
    ...     cw.write_table(df1)
    ...     cw.write_table(df2)

    By manually calling ``close()``

    >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
    >>> cw.write_table(df1)
    >>> cw.write_table(df2)
    >>> cw.close()

    Both the methods will generate the same directory structure

    .. code-block:: bash

        dataset/
            a=1
                <filename>.parquet
            a=2
                <filename>.parquet
            a=3
                <filename>.parquet

    """

    @_cudf_nvtx_annotate
    def __init__(
        self,
@@ -724,59 +781,14 @@ def __init__(
        index=None,
        compression=None,
        statistics="ROWGROUP",
        **kwargs,
    ) -> None:
        """
        Write a parquet file or dataset incrementally

        Parameters
        ----------
        path : str
            File path or Root Directory path. Will be used as Root Directory
            path while writing a partitioned dataset.
        partition_cols : list
            Column names by which to partition the dataset
            Columns are partitioned in the order they are given
        index : bool, default None
            If ``True``, include the dataframe’s index(es) in the file output.
            If ``False``, they will not be written to the file. If ``None``,
            index(es) other than RangeIndex will be saved as columns.
        compression : {'snappy', None}, default 'snappy'
            Name of the compression to use. Use ``None`` for no compression.
        statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
            Level at which column statistics should be included in file.

        Examples
        ________
        Using a context

        >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
        >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
        >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
        ...     cw.write_table(df1)
        ...     cw.write_table(df2)

        By manually calling ``close()``

        >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"])
        >>> cw.write_table(df1)
        >>> cw.write_table(df2)
        >>> cw.close()

        Both the methods will generate the same directory structure

        .. code-block:: bash

            dataset/
                a=1
                    <filename>.parquet
                a=2
                    <filename>.parquet
                a=3
                    <filename>.parquet

        """
        self.path = path
        if isinstance(path, str) and path.startswith("s3://"):
            self.fs_meta = {"is_s3": True, "actual_path": path}
            self.path = tempfile.TemporaryDirectory().name
        else:
            self.fs_meta = {}
            self.path = path

        self.common_args = {
            "index": index,
            "compression": compression,
@@ -792,6 +804,7 @@ def __init__(
        # in self._chunked_writers for reverse lookup
        self.path_cw_map: Dict[str, int] = {}
        self.filename = None
        self.kwargs = kwargs

    @_cudf_nvtx_annotate
    def write_table(self, df):
@@ -837,18 +850,19 @@ def write_table(self, df):
                ]
                cw.write_table(grouped_df, this_cw_part_info)

        # Create new cw for unhandled paths encountered in this write_table
        new_paths, part_info, meta_paths = zip(*new_cw_paths)
        self._chunked_writers.append(
            (
                ParquetWriter(new_paths, **self.common_args),
                new_paths,
                meta_paths,
        if new_cw_paths:
            # Create new cw for unhandled paths encountered in this write_table
            new_paths, part_info, meta_paths = zip(*new_cw_paths)
            self._chunked_writers.append(
                (
                    ParquetWriter(new_paths, **self.common_args),
                    new_paths,
                    meta_paths,
                )
Comment on lines -1054 to +1076

Member: Do you think it would be possible to wrap these … If I understand correctly (which I may not), we should be able to write directly to s3 without the file-copy workaround.

Contributor (author): So this was my initial go-to approach (same as …

Member: Okay - I still have a suspicion that we can make this work, but I do think the approach you are using here is reasonable - so I can experiment and follow up separately. My question here is definitely not a blocker :)

Contributor (author): Sure, will be happy to chat about the findings.
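For reference, a rough sketch of the direct-write idea discussed above: open the remote partition paths as writable fsspec file objects instead of staging the dataset in a local temporary directory. This is untested, the helper name and example path are hypothetical, and it assumes cudf's ParquetWriter can consume fsspec file objects wherever it currently takes local paths.

import fsspec

def open_remote_sinks(paths, **storage_options):
    # Hypothetical helper: one writable, buffered file object per partition
    # path, e.g. "s3://bucket/dataset/a=1/part.parquet" (illustrative path).
    # Each object uploads its contents to the remote store when it is closed,
    # which would happen in ParquetDatasetWriter.close().
    return [fsspec.open(p, mode="wb", **storage_options).open() for p in paths]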
            )
        )
        new_cw_idx = len(self._chunked_writers) - 1
        self.path_cw_map.update({k: new_cw_idx for k in new_paths})
        self._chunked_writers[-1][0].write_table(grouped_df, part_info)
            new_cw_idx = len(self._chunked_writers) - 1
            self.path_cw_map.update({k: new_cw_idx for k in new_paths})
            self._chunked_writers[-1][0].write_table(grouped_df, part_info)

    @_cudf_nvtx_annotate
    def close(self, return_metadata=False):
@@ -862,6 +876,15 @@ def close(self, return_metadata=False):
            for cw, _, meta_path in self._chunked_writers
        ]

        if self.fs_meta.get("is_s3", False):
            local_path = self.path
            s3_path = self.fs_meta["actual_path"]
            s3_file, _ = ioutils._get_filesystem_and_paths(
                s3_path, **self.kwargs
            )
            s3_file.put(local_path, s3_path, recursive=True)
            shutil.rmtree(self.path)

        if return_metadata:
            return (
                merge_parquet_filemetadata(metadata)
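As context for the change above, a minimal usage sketch of writing a partitioned dataset directly to an S3 prefix: the bucket name is a placeholder, S3 credentials are assumed to be configured for s3fs, and the import path assumes the module where the class is defined.

import cudf
from cudf.io.parquet import ParquetDatasetWriter

df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})

# Files are staged under a local temporary directory and uploaded to the
# (placeholder) S3 prefix when the context exits, i.e. when close() runs.
with ParquetDatasetWriter("s3://my-bucket/dataset", partition_cols=["a"]) as cw:
    cw.write_table(df1)
    cw.write_table(df2)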
python/cudf/cudf/tests/test_s3.py:

@@ -2,6 +2,7 @@

import os
import shlex
import socket
import subprocess
import time
from contextlib import contextmanager

@@ -18,10 +19,20 @@

import cudf
from cudf.testing._utils import assert_eq

moto = pytest.importorskip("moto", minversion="1.3.14")
moto = pytest.importorskip("moto", minversion="3.1.6")
boto3 = pytest.importorskip("boto3")
requests = pytest.importorskip("requests")
s3fs = pytest.importorskip("s3fs")
flask = pytest.importorskip("flask")
flask_cors = pytest.importorskip("flask_cors")
@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("", 0))
    return sock.getsockname()[1]
Suggested change (current → suggested):

@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("", 0))
    return sock.getsockname()[1]

@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    with socket.socket() as sock:
        sock.bind(("127.0.0.1", 0))
        yield sock.getsockname()[1]
I did make the above change locally but it seems to error consistently, did it work for you?:
During handling of the above exception, another exception occurred:
s3_base = 'http://127.0.0.1:47913/', s3so = {'client_kwargs': {'endpoint_url': 'http://127.0.0.1:47913/'}}
pdf = Integer Float Integer2 String Boolean
0 2345 9.001 2345 Alpha True
1 11987 8.343 106 Beta False
2 9027 6.000 2088 Gamma True
3 9027 2.781 789277 Delta False
bytes_per_thread = 32
@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
fname = "test_csv_reader.csv"
bname = "csv"
buffer = pdf.to_csv(index=False)
# Use fsspec file object
> with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
python/cudf/cudf/tests/test_s3.py:150:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../envs/cudfdev/lib/python3.8/contextlib.py:113: in __enter__
return next(self.gen)
python/cudf/cudf/tests/test_s3.py:105: in s3_context
client.create_bucket(Bucket=bucket, ACL="public-read-write")
../envs/cudfdev/lib/python3.8/site-packages/botocore/client.py:395: in _api_call
return self._make_api_call(operation_name, kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/client.py:711: in _make_api_call
http, parsed_response = self._make_request(
../envs/cudfdev/lib/python3.8/site-packages/botocore/client.py:731: in _make_request
return self._endpoint.make_request(operation_model, request_dict)
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:107: in make_request
return self._send_request(request_dict, operation_model)
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:183: in _send_request
while self._needs_retry(attempts, operation_model, request_dict,
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:305: in _needs_retry
responses = self._event_emitter.emit(
../envs/cudfdev/lib/python3.8/site-packages/botocore/hooks.py:357: in emit
return self._emitter.emit(aliased_event_name, **kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/hooks.py:228: in emit
return self._emit(event_name, kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/hooks.py:211: in _emit
response = handler(**kwargs)
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:192: in __call__
if self._checker(**checker_kwargs):
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:265: in __call__
should_retry = self._should_retry(attempt_number, response,
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:292: in _should_retry
return self._checker(attempt_number, response, caught_exception)
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:331: in __call__
checker_response = checker(attempt_number, response,
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:231: in __call__
return self._check_caught_exception(
../envs/cudfdev/lib/python3.8/site-packages/botocore/retryhandler.py:374: in _check_caught_exception
raise caught_exception
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:249: in _do_get_response
http_response = self._send(request)
../envs/cudfdev/lib/python3.8/site-packages/botocore/endpoint.py:321: in _send
return self.http_session.send(request)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <botocore.httpsession.URLLib3Session object at 0x7efe37194bb0>
request = <AWSPreparedRequest stream_output=False, method=PUT, url=http://127.0.0.1:47913/csv, headers={'x-amz-acl': b'public-re...nvocation-id': b'23e9f9cc-531f-4d50-ad98-8ca7ab3f4a60', 'amz-sdk-request': b'attempt=5; max=5', 'Content-Length': '0'}>
def send(self, request):
try:
proxy_url = self._proxy_config.proxy_url_for(request.url)
manager = self._get_connection_manager(request.url, proxy_url)
conn = manager.connection_from_url(request.url)
self._setup_ssl_cert(conn, request.url, self._verify)
if ensure_boolean(
os.environ.get('BOTO_EXPERIMENTAL__ADD_PROXY_HOST_HEADER', '')
):
# This is currently an "experimental" feature which provides
# no guarantees of backwards compatibility. It may be subject
# to change or removal in any patch version. Anyone opting in
# to this feature should strictly pin botocore.
host = urlparse(request.url).hostname
conn.proxy_headers['host'] = host
request_target = self._get_request_target(request.url, proxy_url)
urllib_response = conn.urlopen(
method=request.method,
url=request_target,
body=request.body,
headers=request.headers,
retries=Retry(False),
assert_same_host=False,
preload_content=False,
decode_content=False,
chunked=self._chunked(request.headers),
)
http_response = botocore.awsrequest.AWSResponse(
request.url,
urllib_response.status,
urllib_response.headers,
urllib_response,
)
if not request.stream_output:
# Cause the raw stream to be exhausted immediately. We do it
# this way instead of using preload_content because
# preload_content will never buffer chunked responses
http_response.content
return http_response
except URLLib3SSLError as e:
raise SSLError(endpoint_url=request.url, error=e)
except (NewConnectionError, socket.gaierror) as e:
> raise EndpointConnectionError(endpoint_url=request.url, error=e)
E botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "http://127.0.0.1:47913/csv"
../envs/cudfdev/lib/python3.8/site-packages/botocore/httpsession.py:434: EndpointConnectionError
---------------------------------------------------- Captured stderr setup --------------------------------
FWIW, I've seen it close connections to the ports very consistently with the existing approach too.
I didn't test this before suggesting, but I'm not sure what's going wrong. I wonder if the context manager is yielding the port and immediately exiting the context / closing the resource? Might need to refresh myself on how pytest fixtures interact with context managers, or add some print statements to check what's happening. Alternatively, you can try this instead.
Suggested change (current → suggested):

@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("", 0))
    return sock.getsockname()[1]

@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    yield sock.getsockname()[1]
    sock.close()
@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    yield sock.getsockname()[1]
    sock.close()

⬆️ This too gives the same error, but this works:

@pytest.fixture(scope="session")
def endpoint_port():
    # Return a free port per worker session.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port

I've pushed this change, let me know if that looks good to you.
It looks like we're using moto in Stand-alone server mode. Is it possible to use the Python API for this server instead of calling subprocess? http://docs.getmoto.org/en/latest/docs/server_mode.html#start-within-python

I think it'd look like this:

from moto.server import ThreadedMotoServer

server = ThreadedMotoServer(ip_address="127.0.0.1", port=endpoint_port)
server.start()
yield endpoint_uri
# run tests
server.stop()

We're hard-coding the endpoint URI in a few locations in this file. Can we re-use the host/port/endpoint values instead? (Possibly with a fixture.)
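A sketch of how the last two suggestions could be combined so that the host, port, and endpoint URI are defined once and shared across tests; the fixture wiring below is illustrative rather than the exact fixtures in this PR, and it assumes moto's ThreadedMotoServer is available.

import socket

import pytest
from moto.server import ThreadedMotoServer


@pytest.fixture(scope="session")
def endpoint_port():
    # Pick a free port once per worker session, closing the socket before
    # returning so the moto server can bind to the same port.
    sock = socket.socket()
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port


@pytest.fixture(scope="session")
def endpoint_uri(endpoint_port):
    # Single source of truth for the endpoint URL used by the tests.
    return f"http://127.0.0.1:{endpoint_port}/"


@pytest.fixture(scope="session")
def s3_base(endpoint_port, endpoint_uri):
    # Run moto in-process instead of via subprocess; tests receive the shared
    # endpoint URI and the server is stopped at the end of the session.
    server = ThreadedMotoServer(ip_address="127.0.0.1", port=endpoint_port)
    server.start()
    yield endpoint_uri
    server.stop()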