From d31eaaa32f0829cc77dd37a543e7914eefc33cb2 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 11 Oct 2021 13:02:31 -0700
Subject: [PATCH 1/8] Add support for GPU table creation in dask / location plugins

---
 dask_sql/input_utils/dask.py       | 21 ++++++++++++++++++++-
 dask_sql/input_utils/intake.py     |  8 ++++----
 dask_sql/input_utils/location.py   | 28 +++++++++++++++++++++++-----
 dask_sql/input_utils/pandaslike.py | 14 +++++++++-----
 dask_sql/input_utils/sqlalchemy.py | 10 +++++++++-
 tests/integration/test_create.py   | 14 +-------------
 tests/unit/test_context.py         | 24 ++----------------------
 7 files changed, 68 insertions(+), 51 deletions(-)

diff --git a/dask_sql/input_utils/dask.py b/dask_sql/input_utils/dask.py
index 3cfc33996..2da11e701 100644
--- a/dask_sql/input_utils/dask.py
+++ b/dask_sql/input_utils/dask.py
@@ -4,6 +4,11 @@
 
 from dask_sql.input_utils.base import BaseInputPlugin
 
+try:
+    import dask_cudf
+except ImportError:
+    dask_cudf = None
+
 
 class DaskInputPlugin(BaseInputPlugin):
     """Input Plugin for Dask DataFrames, just keeping them"""
@@ -13,5 +18,19 @@ def is_correct_input(
     ):
         return isinstance(input_item, dd.DataFrame) or format == "dask"
 
-    def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs):
+    def to_dc(
+        self,
+        input_item: Any,
+        table_name: str,
+        format: str = None,
+        gpu: bool = False,
+        **kwargs
+    ):
+        if gpu:  # pragma: no cover
+            if not dask_cudf:
+                raise ModuleNotFoundError(
+                    "Setting `gpu=True` for table creation requires dask_cudf"
+                )
+            if not isinstance(input_item, dask_cudf.DataFrame):
+                input_item = dask_cudf.from_dask_dataframe(input_item, **kwargs)
         return input_item
diff --git a/dask_sql/input_utils/intake.py b/dask_sql/input_utils/intake.py
index 709072d67..830f02409 100644
--- a/dask_sql/input_utils/intake.py
+++ b/dask_sql/input_utils/intake.py
@@ -26,13 +26,13 @@ def to_dc(
         gpu: bool = False,
         **kwargs,
     ):
+        if gpu:  # pragma: no cover
+            raise Exception("Intake does not support gpu")
+
         table_name = kwargs.pop("intake_table_name", table_name)
         catalog_kwargs = kwargs.pop("catalog_kwargs", {})
 
         if isinstance(input_item, str):
             input_item = intake.open_catalog(input_item, **catalog_kwargs)
 
-        if gpu:  # pragma: no cover
-            raise Exception("Intake does not support gpu")
-        else:
-            return input_item[table_name].to_dask(**kwargs)
+        return input_item[table_name].to_dask(**kwargs)
diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py
index a7f8a036f..b69c143a3 100644
--- a/dask_sql/input_utils/location.py
+++ b/dask_sql/input_utils/location.py
@@ -2,10 +2,17 @@
 from typing import Any
 
 import dask.dataframe as dd
+import pandas as pd
 from distributed.client import default_client
 
 from dask_sql.input_utils.base import BaseInputPlugin
 
+try:
+    import cudf
+    import dask_cudf
+except ImportError:
+    dask_cudf = None
+
 
 class LocationInputPlugin(BaseInputPlugin):
     """Input Plugin for everything, which can be read in from a file (on disk, remote etc.)"""
@@ -23,20 +30,31 @@ def to_dc(
         gpu: bool = False,
         **kwargs,
     ):
-
+        if gpu and not dask_cudf:  # pragma: no cover
+            raise ModuleNotFoundError(
+                "Setting `gpu=True` for table creation requires dask_cudf"
+            )
         if format == "memory":
             client = default_client()
-            return client.get_dataset(input_item, **kwargs)
+            df = client.get_dataset(input_item, **kwargs)
+            if gpu:  # pragma: no cover
+                if isinstance(df, pd.DataFrame):
+                    npartitions = kwargs.pop("npartitions", 1)
+                    df = dask_cudf.from_dask_dataframe(
+                        cudf.from_pandas(df), npartitions=npartitions, **kwargs
+                    )
+                elif isinstance(df, dd.DataFrame) and not isinstance(
+                    df, dask_cudf.DataFrame
+                ):
+                    df = dask_cudf.from_dask_dataframe(df, **kwargs)
+            return df
 
         if not format:
             _, extension = os.path.splitext(input_item)
             format = extension.lstrip(".")
-
         try:
             if gpu:  # pragma: no cover
-                import dask_cudf
-
                 read_function = getattr(dask_cudf, f"read_{format}")
             else:
                 read_function = getattr(dd, f"read_{format}")
 
diff --git a/dask_sql/input_utils/pandaslike.py b/dask_sql/input_utils/pandaslike.py
index 681c2d459..64676335c 100644
--- a/dask_sql/input_utils/pandaslike.py
+++ b/dask_sql/input_utils/pandaslike.py
@@ -1,12 +1,13 @@
 import dask.dataframe as dd
 import pandas as pd
 
+from dask_sql.input_utils.base import BaseInputPlugin
+
 try:
     import cudf
-except ImportError:  # pragma: no cover
-    cudf = None
-
-from dask_sql.input_utils.base import BaseInputPlugin
+    import dask_cudf
+except ImportError:
+    dask_cudf = None
 
 
 class PandasLikeInputPlugin(BaseInputPlugin):
@@ -28,7 +29,10 @@ def to_dc(
     ):
         npartitions = kwargs.pop("npartitions", 1)
         if gpu:  # pragma: no cover
-            import dask_cudf
+            if not dask_cudf:
+                raise ModuleNotFoundError(
+                    "Setting `gpu=True` for table creation requires dask_cudf"
+                )
 
             if isinstance(input_item, pd.DataFrame):
                 return dask_cudf.from_cudf(
diff --git a/dask_sql/input_utils/sqlalchemy.py b/dask_sql/input_utils/sqlalchemy.py
index 9a8199bb8..e4f6e4794 100644
--- a/dask_sql/input_utils/sqlalchemy.py
+++ b/dask_sql/input_utils/sqlalchemy.py
@@ -16,8 +16,16 @@ def is_correct_input(
         return correct_prefix
 
     def to_dc(
-        self, input_item: Any, table_name: str, format: str = None, **kwargs
+        self,
+        input_item: Any,
+        table_name: str,
+        format: str = None,
+        gpu: bool = False,
+        **kwargs
     ):  # pragma: no cover
+        if gpu:
+            raise Exception("Hive does not support gpu")
+
         import sqlalchemy
 
         engine_kwargs = {}
diff --git a/tests/integration/test_create.py b/tests/integration/test_create.py
index 990c3beab..5c10a400a 100644
--- a/tests/integration/test_create.py
+++ b/tests/integration/test_create.py
@@ -37,19 +37,7 @@ def test_create_from_csv(c, df, temporary_data_file, gpu):
 
 
 @pytest.mark.parametrize(
-    "gpu",
-    [
-        False,
-        pytest.param(
-            True,
-            marks=[
-                pytest.mark.gpu,
-                pytest.mark.xfail(
-                    reason="dataframes on memory currently aren't being converted to dask-cudf"
-                ),
-            ],
-        ),
-    ],
+    "gpu", [False, pytest.param(True, marks=pytest.mark.gpu),],
 )
 def test_cluster_memory(client, c, df, gpu):
     client.publish_dataset(df=dd.from_pandas(df, npartitions=1))
diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py
index e2bd0f471..732aa4286 100644
--- a/tests/unit/test_context.py
+++ b/tests/unit/test_context.py
@@ -86,17 +86,7 @@ def test_explain(gpu):
 
 
 @pytest.mark.parametrize(
-    "gpu",
-    [
-        False,
-        pytest.param(
-            True,
-            marks=(
-                pytest.mark.gpu,
-                pytest.mark.xfail(reason="create_table(gpu=True) doesn't work"),
-            ),
-        ),
-    ],
+    "gpu", [False, pytest.param(True, marks=pytest.mark.gpu,),],
 )
 def test_sql(gpu):
     c = Context()
@@ -120,17 +110,7 @@ def test_sql(gpu):
 
 
 @pytest.mark.parametrize(
-    "gpu",
-    [
-        False,
-        pytest.param(
-            True,
-            marks=(
-                pytest.mark.gpu,
-                pytest.mark.xfail(reason="create_table(gpu=True) doesn't work"),
-            ),
-        ),
-    ],
+    "gpu", [False, pytest.param(True, marks=pytest.mark.gpu,),],
 )
 def test_input_types(temporary_data_file, gpu):
     c = Context()
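Note: PATCH 1/8 threads a `gpu` keyword through the input plugins; the Context-level wiring that exposes it is not shown in this diff. A minimal usage sketch, assuming `Context.create_table` simply forwards the new flag down to these plugins:

    import dask.dataframe as dd
    import pandas as pd

    from dask_sql import Context

    c = Context()
    ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)

    # CPU-backed table, as before
    c.create_table("cpu_table", ddf)

    # GPU-backed table: DaskInputPlugin.to_dc would convert the collection with
    # dask_cudf.from_dask_dataframe, or raise ModuleNotFoundError if dask_cudf
    # is not installed
    c.create_table("gpu_table", ddf, gpu=True)
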
From 1718a76b57a6ad2e097f98034ce261e98dffe760 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 11 Oct 2021 15:01:16 -0700
Subject: [PATCH 2/8] Make sure to define cudf in case of ImportErrors

---
 dask_sql/input_utils/location.py   | 1 +
 dask_sql/input_utils/pandaslike.py | 1 +
 dask_sql/physical/utils/sort.py    | 1 +
 3 files changed, 3 insertions(+)

diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py
index b69c143a3..e4ccf1186 100644
--- a/dask_sql/input_utils/location.py
+++ b/dask_sql/input_utils/location.py
@@ -11,6 +11,7 @@
     import cudf
     import dask_cudf
 except ImportError:
+    cudf = None
     dask_cudf = None
 
 
diff --git a/dask_sql/input_utils/pandaslike.py b/dask_sql/input_utils/pandaslike.py
index 64676335c..17d45d015 100644
--- a/dask_sql/input_utils/pandaslike.py
+++ b/dask_sql/input_utils/pandaslike.py
@@ -7,6 +7,7 @@
     import cudf
     import dask_cudf
 except ImportError:
+    cudf = None
     dask_cudf = None
 
 
diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py
index 5857a3321..39527d8aa 100644
--- a/dask_sql/physical/utils/sort.py
+++ b/dask_sql/physical/utils/sort.py
@@ -18,6 +18,7 @@ def apply_sort(
     sort_ascending: List[bool],
     sort_null_first: List[bool],
 ) -> dd.DataFrame:
+    breakpoint()
     # if we have a single partition, we can sometimes sort with map_partitions
     if df.npartitions == 1:
         if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame):

From 69ec13a0a91a05aea068a332ee352f24d1816118 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 11 Oct 2021 18:45:02 -0700
Subject: [PATCH 3/8] Remove breakpoints

---
 dask_sql/physical/utils/sort.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py
index 39527d8aa..5857a3321 100644
--- a/dask_sql/physical/utils/sort.py
+++ b/dask_sql/physical/utils/sort.py
@@ -18,7 +18,6 @@ def apply_sort(
     sort_ascending: List[bool],
     sort_null_first: List[bool],
 ) -> dd.DataFrame:
-    breakpoint()
     # if we have a single partition, we can sometimes sort with map_partitions
     if df.npartitions == 1:
         if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame):

From 76c4d5e1dc05ce3baaceac20d162ed4eb4d3c1d8 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 4 Nov 2021 09:23:01 -0700
Subject: [PATCH 4/8] Use more specific errors for not implemented GPU support

---
 dask_sql/input_utils/intake.py     | 2 +-
 dask_sql/input_utils/sqlalchemy.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dask_sql/input_utils/intake.py b/dask_sql/input_utils/intake.py
index 830f02409..cd88ff951 100644
--- a/dask_sql/input_utils/intake.py
+++ b/dask_sql/input_utils/intake.py
@@ -27,7 +27,7 @@ def to_dc(
         **kwargs,
     ):
         if gpu:  # pragma: no cover
-            raise Exception("Intake does not support gpu")
+            raise NotImplementedError("Intake does not support gpu")
 
         table_name = kwargs.pop("intake_table_name", table_name)
         catalog_kwargs = kwargs.pop("catalog_kwargs", {})
diff --git a/dask_sql/input_utils/sqlalchemy.py b/dask_sql/input_utils/sqlalchemy.py
index e4f6e4794..c45120317 100644
--- a/dask_sql/input_utils/sqlalchemy.py
+++ b/dask_sql/input_utils/sqlalchemy.py
@@ -24,7 +24,7 @@ def to_dc(
         **kwargs
     ):  # pragma: no cover
         if gpu:
-            raise Exception("Hive does not support gpu")
+            raise NotImplementedError("Hive does not support gpu")
 
         import sqlalchemy
 
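Note: PATCH 1/8 through PATCH 4/8 converge on a single optional-dependency pattern: import the GPU libraries at module load, fall back to None, and only fail when the GPU path is actually requested. A standalone sketch of that pattern (the helper name here is illustrative, not part of the diff):

    import dask.dataframe as dd

    try:
        import dask_cudf
    except ImportError:
        dask_cudf = None


    def to_gpu(ddf, **kwargs):
        # Fail loudly only when GPU support is actually needed
        if dask_cudf is None:
            raise ModuleNotFoundError(
                "Setting `gpu=True` for table creation requires dask_cudf"
            )
        return dask_cudf.from_dask_dataframe(ddf, **kwargs)

Backends that cannot honour the flag at all (intake, Hive) instead raise NotImplementedError up front, as PATCH 4/8 makes explicit.
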
From 4885635e7faaddc55f758de634e50665f96ea097 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 4 Nov 2021 10:57:15 -0700
Subject: [PATCH 5/8] Simplify location/pandas input utils

---
 dask_sql/input_utils/location.py   | 26 +++++++++++-----------------
 dask_sql/input_utils/pandaslike.py | 18 +++++-------------
 2 files changed, 14 insertions(+), 30 deletions(-)

diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py
index e4ccf1186..984f0d2d7 100644
--- a/dask_sql/input_utils/location.py
+++ b/dask_sql/input_utils/location.py
@@ -6,12 +6,12 @@
 from distributed.client import default_client
 
 from dask_sql.input_utils.base import BaseInputPlugin
+from dask_sql.input_utils.dask import DaskInputPlugin
+from dask_sql.input_utils.pandaslike import PandasLikeInputPlugin
 
 try:
-    import cudf
     import dask_cudf
 except ImportError:
-    cudf = None
     dask_cudf = None
 
 
@@ -31,23 +31,23 @@ def to_dc(
         gpu: bool = False,
         **kwargs,
     ):
-        if gpu and not dask_cudf:  # pragma: no cover
-            raise ModuleNotFoundError(
-                "Setting `gpu=True` for table creation requires dask_cudf"
-            )
         if format == "memory":
             client = default_client()
             df = client.get_dataset(input_item, **kwargs)
-            if gpu:  # pragma: no cover
-                if isinstance(df, pd.DataFrame):
-                    npartitions = kwargs.pop("npartitions", 1)
-                    df = dask_cudf.from_dask_dataframe(
-                        cudf.from_pandas(df), npartitions=npartitions, **kwargs
-                    )
-                elif isinstance(df, dd.DataFrame) and not isinstance(
-                    df, dask_cudf.DataFrame
-                ):
-                    df = dask_cudf.from_dask_dataframe(df, **kwargs)
-            return df
+            for plugin in [DaskInputPlugin(), PandasLikeInputPlugin()]:
+                if plugin.is_correct_input(df, table_name, format, **kwargs):
+                    return plugin.to_dc(df, table_name, format, gpu, **kwargs)
 
         if not format:
             _, extension = os.path.splitext(input_item)
             format = extension.lstrip(".")
         try:
             if gpu:  # pragma: no cover
+                if not dask_cudf:
+                    raise ModuleNotFoundError(
+                        "Setting `gpu=True` for table creation requires dask-cudf"
+                    )
                 read_function = getattr(dask_cudf, f"read_{format}")
             else:
                 read_function = getattr(dd, f"read_{format}")
diff --git a/dask_sql/input_utils/pandaslike.py b/dask_sql/input_utils/pandaslike.py
index 17d45d015..06e06b9cd 100644
--- a/dask_sql/input_utils/pandaslike.py
+++ b/dask_sql/input_utils/pandaslike.py
@@ -5,10 +5,8 @@
 
 try:
     import cudf
-    import dask_cudf
 except ImportError:
     cudf = None
-    dask_cudf = None
 
 
 class PandasLikeInputPlugin(BaseInputPlugin):
@@ -30,18 +28,12 @@ def to_dc(
     ):
         npartitions = kwargs.pop("npartitions", 1)
         if gpu:  # pragma: no cover
-            if not dask_cudf:
+            if not cudf:
                 raise ModuleNotFoundError(
-                    "Setting `gpu=True` for table creation requires dask_cudf"
+                    "Setting `gpu=True` for table creation requires cudf"
                 )
 
             if isinstance(input_item, pd.DataFrame):
-                return dask_cudf.from_cudf(
-                    cudf.from_pandas(input_item), npartitions=npartitions, **kwargs,
-                )
-            else:
-                return dask_cudf.from_cudf(
-                    input_item, npartitions=npartitions, **kwargs,
-                )
-        else:
-            return dd.from_pandas(input_item, npartitions=npartitions, **kwargs)
+            input_item = cudf.from_pandas(input_item)
+
+        return dd.from_pandas(input_item, npartitions=npartitions, **kwargs)

From e45115754438adca731d179400bc99b279799a42 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Thu, 4 Nov 2021 11:04:42 -0700
Subject: [PATCH 6/8] Fix flake8 issues

---
 dask_sql/input_utils/location.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py
index 984f0d2d7..c367a4f54 100644
--- a/dask_sql/input_utils/location.py
+++ b/dask_sql/input_utils/location.py
@@ -2,7 +2,6 @@
 from typing import Any
 
 import dask.dataframe as dd
-import pandas as pd
 from distributed.client import default_client
 
 from dask_sql.input_utils.base import BaseInputPlugin
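Note: the simplification in PATCH 5/8 leans on the fact that `cudf.from_pandas` and `dd.from_pandas` compose: a pandas frame can be moved to the GPU first and then wrapped into a Dask collection with the same call used for the CPU path. A small illustrative sketch of that chain (the `npartitions=1` default mirrors the plugin):

    import dask.dataframe as dd
    import pandas as pd

    try:
        import cudf
    except ImportError:
        cudf = None

    pdf = pd.DataFrame({"a": [1, 2, 3]})

    if cudf is not None:  # GPU path: pandas -> cudf -> Dask collection
        ddf = dd.from_pandas(cudf.from_pandas(pdf), npartitions=1)
    else:                 # CPU path
        ddf = dd.from_pandas(pdf, npartitions=1)
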
From 42ed94bf7382f077b17e07cdb8cf51384500d225 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 29 Nov 2021 15:13:13 -0500
Subject: [PATCH 7/8] Generalize check for pandas like input

---
 dask_sql/input_utils/pandaslike.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dask_sql/input_utils/pandaslike.py b/dask_sql/input_utils/pandaslike.py
index 06e06b9cd..32d7ff5ea 100644
--- a/dask_sql/input_utils/pandaslike.py
+++ b/dask_sql/input_utils/pandaslike.py
@@ -15,8 +15,10 @@ class PandasLikeInputPlugin(BaseInputPlugin):
     def is_correct_input(
         self, input_item, table_name: str, format: str = None, **kwargs
     ):
-        is_cudf_type = cudf and isinstance(input_item, cudf.DataFrame)
-        return is_cudf_type or isinstance(input_item, pd.DataFrame) or format == "dask"
+        return (
+            dd.utils.is_dataframe_like(input_item)
+            and not isinstance(input_item, dd.DataFrame)
+        ) or format == "dask"
 
     def to_dc(
         self,

From cefdfe47e24e050fb47a6a412b2cc6c9367f9d0d Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 29 Nov 2021 15:49:41 -0500
Subject: [PATCH 8/8] Use input util to get plugins list for location

---
 dask_sql/input_utils/location.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/dask_sql/input_utils/location.py b/dask_sql/input_utils/location.py
index c367a4f54..8cb23a444 100644
--- a/dask_sql/input_utils/location.py
+++ b/dask_sql/input_utils/location.py
@@ -5,8 +5,7 @@
 from distributed.client import default_client
 
 from dask_sql.input_utils.base import BaseInputPlugin
-from dask_sql.input_utils.dask import DaskInputPlugin
-from dask_sql.input_utils.pandaslike import PandasLikeInputPlugin
+from dask_sql.input_utils.convert import InputUtil
 
 try:
     import dask_cudf
@@ -34,7 +33,9 @@ def to_dc(
             client = default_client()
             df = client.get_dataset(input_item, **kwargs)
-            for plugin in [DaskInputPlugin(), PandasLikeInputPlugin()]:
+            plugin_list = InputUtil.get_plugins()
+
+            for plugin in plugin_list:
                 if plugin.is_correct_input(df, table_name, format, **kwargs):
                     return plugin.to_dc(df, table_name, format, gpu, **kwargs)
 
         if not format: