Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ services:
ports:
- "8786:8786"
environment:
EXTRA_CONDA_PACKAGES: "pandas>=1.3 numpy=1.20.2 -c conda-forge"
EXTRA_CONDA_PACKAGES: "pandas>=1.0.0"
dask-worker:
container_name: dask-worker
image: daskdev/dask:latest
command: dask-worker dask-scheduler:8786
environment:
EXTRA_CONDA_PACKAGES: "pandas>=1.3 numpy=1.20.2 -c conda-forge"
EXTRA_CONDA_PACKAGES: "pandas>=1.0.0"
volumes:
- /tmp:/tmp
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

pytest_plugins = ["distributed.utils_test", "tests.integration.fixtures"]
pytest_plugins = ["tests.integration.fixtures"]


def pytest_addoption(parser):
Expand Down
15 changes: 14 additions & 1 deletion tests/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import pandas as pd
import pytest
from dask.datasets import timeseries
from dask.distributed import Client
from dask.distributed import Client, LocalCluster
from dask.distributed.utils_test import loop # noqa: F401
from pandas.testing import assert_frame_equal

try:
Expand Down Expand Up @@ -296,3 +297,15 @@ def setup_dask_client():
os.getenv("DASK_SQL_TEST_SCHEDULER", None) is not None,
reason="Can not run with external cluster",
)


@pytest.fixture()
def cluster(loop):  # noqa: F811
    """Yield a local Dask cluster bound to the test event loop; torn down on exit."""
    with LocalCluster(loop=loop) as local_cluster:
        yield local_cluster


@pytest.fixture()
def client(cluster):
    """Yield a Dask client connected to the ``cluster`` fixture; closed on exit."""
    with Client(cluster) as connected_client:
        yield connected_client
6 changes: 4 additions & 2 deletions tests/integration/test_cmd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from dask import config as dask_config
from mock import MagicMock, patch
from prompt_toolkit.application import create_app_session
from prompt_toolkit.input import create_pipe_input
Expand Down Expand Up @@ -103,8 +104,9 @@ def test_meta_commands(c, client, capsys):
match="Timed out during handshake while "
"connecting to tcp://localhost:8787 after 5 s",
):
client = _meta_commands("\\dsc localhost:8787", context=c, client=client)
assert client.scheduler.__dict__["addr"] == "localhost:8787"
with dask_config.set({"distributed.comm.timeouts.connect": 5}):
client = _meta_commands("\\dsc localhost:8787", context=c, client=client)
assert client.scheduler.__dict__["addr"] == "localhost:8787"


def test_connection_info(c, client, capsys):
Expand Down
4 changes: 0 additions & 4 deletions tests/integration/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
from pandas.testing import assert_frame_equal

import dask_sql
from tests.integration.fixtures import skip_if_external_scheduler


@skip_if_external_scheduler
@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
def test_create_from_csv(c, df, temporary_data_file, gpu):
df.to_csv(temporary_data_file, index=False)
Expand Down Expand Up @@ -66,7 +64,6 @@ def test_cluster_memory(client, c, df, gpu):
assert_frame_equal(df, return_df)


@skip_if_external_scheduler
@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
def test_create_from_csv_persist(c, df, temporary_data_file, gpu):
df.to_csv(temporary_data_file, index=False)
Expand Down Expand Up @@ -159,7 +156,6 @@ def test_create_from_query(c, df):
assert_frame_equal(df, return_df)


@skip_if_external_scheduler
@pytest.mark.parametrize(
"gpu",
[
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/test_fugue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pandas.testing import assert_frame_equal

from dask_sql import Context
from tests.integration.fixtures import skip_if_external_scheduler

fugue_sql = pytest.importorskip("fugue_sql")

Expand Down Expand Up @@ -38,6 +39,9 @@ def test_simple_statement():
assert_frame_equal(return_df, pd.DataFrame({"a": [1], "b": ["world"]}))


# TODO: Revisit fixing this on an independent cluster (without dask-sql) based on the
# discussion in https://github.com/dask-contrib/dask-sql/issues/407
@skip_if_external_scheduler
def test_fsql():
def assert_eq(df: pd.DataFrame) -> None:
assert_frame_equal(df, pd.DataFrame({"a": [1]}))
Expand Down