Skip to content
Merged
18 changes: 18 additions & 0 deletions tests/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dask_cuda import LocalCUDACluster # noqa: F401
except ImportError:
cudf = None
LocalCUDACluster = None


@pytest.fixture()
Expand Down Expand Up @@ -255,6 +256,23 @@ def _assert_query_gives_same_result(query, sort_columns=None, **kwargs):
return _assert_query_gives_same_result


@pytest.fixture(scope="module")
def cluster():
if LocalCUDACluster is None:
pytest.skip("dask_cuda not installed")

cluster = LocalCUDACluster(protocol="tcp", scheduler_port=0)
yield cluster
cluster.close()


@pytest.fixture(scope="module")
def client(cluster):
client = Client(cluster)
yield client
client.close()


@pytest.fixture(scope="session", autouse=True)
def setup_dask_client():
"""Setup a dask client if requested"""
Expand Down
65 changes: 64 additions & 1 deletion tests/integration/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ def training_df(c):
df = timeseries(freq="1d").reset_index(drop=True)
c.create_table("timeseries", df, persist=True)

return training_df
return None


@pytest.fixture()
def gpu_training_df(c):
cudf = pytest.importorskip("cudf")
df = timeseries(freq="1d")
df = df.map_partitions(cudf.from_pandas).reset_index(drop=True)
c.create_table("timeseries", input_table=df)
return None


def test_training_and_prediction(c, training_df):
Expand All @@ -58,6 +67,60 @@ def test_training_and_prediction(c, training_df):
check_trained_model(c)


def test_cuml_training_and_prediction(c, gpu_training_df):
cuml = pytest.importorskip("cuml", reason="cuml not installed")
model_query = """
CREATE OR REPLACE MODEL my_model WITH (
model_class = 'cuml.linear_model.LogisticRegression',
wrap_predict = True,
wrap_fit = False,
target_column = 'target'
) AS (
SELECT x, y, x*y > 0 AS target
FROM timeseries
)
"""
c.sql(model_query)
check_trained_model(c)


def test_dask_cuml_training_and_prediction(c, gpu_training_df, client):
cuml = pytest.importorskip("cuml", reason="cuml not installed")

model_query = """
CREATE OR REPLACE MODEL my_model WITH (
model_class = 'cuml.dask.linear_model.LinearRegression',
target_column = 'target'
) AS (
SELECT x, y, x*y AS target
FROM timeseries
)
"""
c.sql(model_query)
# TODO:
# currently failing due to lack of deep-copy in
# cuml.linear_model.linear_regression_mg.LinearRegressionMG
# Still triaging
# check_trained_model(c)


def test_xgboost_training_prediction(c, gpu_training_df, client):
xgboost = pytest.importorskip("xgboost", reason="xgboost not installed")

model_query = """
CREATE OR REPLACE MODEL my_model WITH (
model_class = 'xgboost.dask.DaskXGBRegressor',
target_column = 'target',
tree_method= 'gpu_hist'
) AS (
SELECT x, y, x*y AS target
FROM timeseries
)
"""
c.sql(model_query)
check_trained_model(c)


def test_clustering_and_prediction(c, training_df):
c.sql(
"""
Expand Down