diff --git a/.github/docker-compose.yaml b/.github/docker-compose.yaml
index 01cd9e3ec..21997b505 100644
--- a/.github/docker-compose.yaml
+++ b/.github/docker-compose.yaml
@@ -3,15 +3,19 @@ version: '3'
 services:
   dask-scheduler:
     container_name: dask-scheduler
-    image: daskdev/dask:latest
+    image: daskdev/dask:dev
     command: dask-scheduler
+    environment:
+      USE_MAMBA: "true"
+      EXTRA_CONDA_PACKAGES: "cloudpickle>=1.5.0" # match client cloudpickle version
     ports:
       - "8786:8786"
   dask-worker:
     container_name: dask-worker
-    image: daskdev/dask:latest
+    image: daskdev/dask:dev
     command: dask-worker dask-scheduler:8786
     environment:
+      USE_MAMBA: "true"
       EXTRA_CONDA_PACKAGES: "pyarrow>=4.0.0" # required for parquet IO
     volumes:
       - /tmp:/tmp
diff --git a/dask_planner/Cargo.toml b/dask_planner/Cargo.toml
index 4fddbdeba..20ad0ab2a 100644
--- a/dask_planner/Cargo.toml
+++ b/dask_planner/Cargo.toml
@@ -12,7 +12,7 @@ rust-version = "1.59"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
 rand = "0.7"
 pyo3 = { version = "0.16", features = ["extension-module", "abi3", "abi3-py38"] }
-datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "8.0.0" }
+datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "78207f5092fc5204ecd791278d403dcb6f0ae683" }
 uuid = { version = "0.8", features = ["v4"] }
 mimalloc = { version = "*", default-features = false }
 parking_lot = "0.12"
diff --git a/dask_planner/src/expression.rs b/dask_planner/src/expression.rs
index 81c91b86a..617eaa2c0 100644
--- a/dask_planner/src/expression.rs
+++ b/dask_planner/src/expression.rs
@@ -11,7 +11,7 @@
 use datafusion::logical_expr::{lit, BuiltinScalarFunction, Expr};
 
 use datafusion::scalar::ScalarValue;
 
-pub use datafusion::logical_expr::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 
 use datafusion::prelude::Column;
diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs
index 7112ead55..532fb4fef 100644
--- a/dask_planner/src/sql.rs
+++ b/dask_planner/src/sql.rs
@@ -11,6 +11,7 @@
 use crate::sql::exceptions::ParsingException;
 use datafusion::arrow::datatypes::{Field, Schema};
 use datafusion::catalog::{ResolvedTableReference, TableReference};
+use datafusion::datasource::TableProvider;
 use datafusion::error::DataFusionError;
 use datafusion::logical_expr::ScalarFunctionImplementation;
 use datafusion::physical_plan::udaf::AggregateUDF;
@@ -55,7 +56,7 @@ impl ContextProvider for DaskSQLContext {
     fn get_table_provider(
         &self,
         name: TableReference,
-    ) -> Result, DataFusionError> {
+    ) -> Result, DataFusionError> {
         let reference: ResolvedTableReference =
             name.resolve(&self.default_catalog_name, &self.default_schema_name);
         match self.schemas.get(&self.default_schema_name) {
diff --git a/dask_planner/src/sql/logical.rs b/dask_planner/src/sql/logical.rs
index 282c12323..59928dc95 100644
--- a/dask_planner/src/sql/logical.rs
+++ b/dask_planner/src/sql/logical.rs
@@ -7,10 +7,12 @@
 mod cross_join;
 mod explain;
 mod filter;
 mod join;
+mod limit;
+mod offset;
 pub mod projection;
 mod sort;
 
-pub use datafusion::logical_expr::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::common::Result;
 use datafusion::prelude::Column;
@@ -85,6 +87,16 @@ impl PyLogicalPlan {
         to_py_plan(self.current_node.as_ref())
     }
 
+    /// LogicalPlan::Limit as PyLimit
+    pub fn limit(&self) -> PyResult<limit::PyLimit> {
+        to_py_plan(self.current_node.as_ref())
+    }
+
+    /// LogicalPlan::Offset as PyOffset
+    pub fn offset(&self) -> PyResult<offset::PyOffset> {
+        to_py_plan(self.current_node.as_ref())
+    }
+
     /// LogicalPlan::Projection as PyProjection
     pub fn projection(&self) -> PyResult<projection::PyProjection> {
         to_py_plan(self.current_node.as_ref())
@@ -140,6 +152,7 @@ impl PyLogicalPlan {
             LogicalPlan::TableScan(_table_scan) => "TableScan",
             LogicalPlan::EmptyRelation(_empty_relation) => "EmptyRelation",
             LogicalPlan::Limit(_limit) => "Limit",
+            LogicalPlan::Offset(_offset) => "Offset",
             LogicalPlan::CreateExternalTable(_create_external_table) => "CreateExternalTable",
             LogicalPlan::CreateMemoryTable(_create_memory_table) => "CreateMemoryTable",
             LogicalPlan::DropTable(_drop_table) => "DropTable",
@@ -151,7 +164,7 @@ impl PyLogicalPlan {
             LogicalPlan::SubqueryAlias(_sqalias) => "SubqueryAlias",
             LogicalPlan::CreateCatalogSchema(_create) => "CreateCatalogSchema",
             LogicalPlan::CreateCatalog(_create_catalog) => "CreateCatalog",
-            LogicalPlan::CreateView(_) => "CreateView",
+            LogicalPlan::CreateView(_create_view) => "CreateView",
         })
     }
 
diff --git a/dask_planner/src/sql/logical/aggregate.rs b/dask_planner/src/sql/logical/aggregate.rs
index a50b5c832..1fba45b20 100644
--- a/dask_planner/src/sql/logical/aggregate.rs
+++ b/dask_planner/src/sql/logical/aggregate.rs
@@ -1,7 +1,7 @@
 use crate::expression::PyExpr;
 
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::logical_expr::{logical_plan::Aggregate, Expr};
-pub use datafusion::logical_expr::{logical_plan::JoinType, LogicalPlan};
 
 use crate::sql::exceptions::py_type_err;
 use pyo3::prelude::*;
diff --git a/dask_planner/src/sql/logical/filter.rs b/dask_planner/src/sql/logical/filter.rs
index c90211613..df1a01955 100644
--- a/dask_planner/src/sql/logical/filter.rs
+++ b/dask_planner/src/sql/logical/filter.rs
@@ -1,7 +1,7 @@
 use crate::expression::PyExpr;
 
 use datafusion::logical_expr::logical_plan::Filter;
-pub use datafusion::logical_expr::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 
 use crate::sql::exceptions::py_type_err;
 use pyo3::prelude::*;
diff --git a/dask_planner/src/sql/logical/limit.rs b/dask_planner/src/sql/logical/limit.rs
new file mode 100644
index 000000000..a57ba24b1
--- /dev/null
+++ b/dask_planner/src/sql/logical/limit.rs
@@ -0,0 +1,35 @@
+use crate::expression::PyExpr;
+use crate::sql::exceptions::py_type_err;
+
+use datafusion::scalar::ScalarValue;
+use pyo3::prelude::*;
+
+use datafusion::logical_expr::{logical_plan::Limit, Expr, LogicalPlan};
+
+#[pyclass(name = "Limit", module = "dask_planner", subclass)]
+#[derive(Clone)]
+pub struct PyLimit {
+    limit: Limit,
+}
+
+#[pymethods]
+impl PyLimit {
+    #[pyo3(name = "getLimitN")]
+    pub fn limit_n(&self) -> PyResult<PyExpr> {
+        Ok(PyExpr::from(
+            Expr::Literal(ScalarValue::UInt64(Some(self.limit.n.try_into().unwrap()))),
+            Some(self.limit.input.clone()),
+        ))
+    }
+}
+
+impl TryFrom<LogicalPlan> for PyLimit {
+    type Error = PyErr;
+
+    fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
+        match logical_plan {
+            LogicalPlan::Limit(limit) => Ok(PyLimit { limit: limit }),
+            _ => Err(py_type_err("unexpected plan")),
+        }
+    }
+}
diff --git a/dask_planner/src/sql/logical/offset.rs b/dask_planner/src/sql/logical/offset.rs
new file mode 100644
index 000000000..a6074dc81
--- /dev/null
+++ b/dask_planner/src/sql/logical/offset.rs
@@ -0,0 +1,44 @@
+use crate::expression::PyExpr;
+use crate::sql::exceptions::py_type_err;
+
+use datafusion::scalar::ScalarValue;
+use pyo3::prelude::*;
+
+use datafusion::logical_expr::{logical_plan::Offset, Expr, LogicalPlan};
+
+#[pyclass(name = "Offset", module = "dask_planner", subclass)]
+#[derive(Clone)]
+pub struct PyOffset {
+    offset: Offset,
+}
+
+#[pymethods]
+impl PyOffset {
+    #[pyo3(name = "getOffset")]
+    pub fn offset(&self) -> PyResult<PyExpr> {
+        Ok(PyExpr::from(
+            Expr::Literal(ScalarValue::UInt64(Some(self.offset.offset as u64))),
+            Some(self.offset.input.clone()),
+        ))
+    }
+
+    #[pyo3(name = "getFetch")]
+    pub fn offset_fetch(&self) -> PyResult<PyExpr> {
+        // TODO: Still need to implement fetch size! For now get everything from offset on with '0'
+        Ok(PyExpr::from(
+            Expr::Literal(ScalarValue::UInt64(Some(0))),
+            Some(self.offset.input.clone()),
+        ))
+    }
+}
+
+impl TryFrom<LogicalPlan> for PyOffset {
+    type Error = PyErr;
+
+    fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
+        match logical_plan {
+            LogicalPlan::Offset(offset) => Ok(PyOffset { offset: offset }),
+            _ => Err(py_type_err("unexpected plan")),
+        }
+    }
+}
diff --git a/dask_planner/src/sql/logical/projection.rs b/dask_planner/src/sql/logical/projection.rs
index a47533d06..6ee720c6e 100644
--- a/dask_planner/src/sql/logical/projection.rs
+++ b/dask_planner/src/sql/logical/projection.rs
@@ -1,6 +1,6 @@
 use crate::expression::PyExpr;
 
-pub use datafusion::logical_expr::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::logical_expr::{logical_plan::Projection, Expr};
 
 use crate::sql::exceptions::py_type_err;
diff --git a/dask_planner/src/sql/table.rs b/dask_planner/src/sql/table.rs
index 44a5c28a5..159d441c1 100644
--- a/dask_planner/src/sql/table.rs
+++ b/dask_planner/src/sql/table.rs
@@ -7,14 +7,13 @@ use crate::sql::types::SqlTypeName;
 
 use async_trait::async_trait;
 
 use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
-pub use datafusion::datasource::TableProvider;
+use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::DataFusionError;
 use datafusion::logical_expr::{Expr, LogicalPlan, TableSource};
 use datafusion::physical_plan::{empty::EmptyExec, project_schema, ExecutionPlan};
 
 use pyo3::prelude::*;
 
-use datafusion::datasource::TableType;
 use std::any::Any;
 use std::sync::Arc;
diff --git a/dask_sql/context.py b/dask_sql/context.py
index 55f3e7f04..a3204a20b 100644
--- a/dask_sql/context.py
+++ b/dask_sql/context.py
@@ -99,6 +99,7 @@ def __init__(self, logging_level=logging.INFO):
         RelConverter.add_plugin_class(logical.DaskJoinPlugin, replace=False)
         RelConverter.add_plugin_class(logical.DaskCrossJoinPlugin, replace=False)
         RelConverter.add_plugin_class(logical.DaskLimitPlugin, replace=False)
+        RelConverter.add_plugin_class(logical.DaskOffsetPlugin, replace=False)
         RelConverter.add_plugin_class(logical.DaskProjectPlugin, replace=False)
         RelConverter.add_plugin_class(logical.DaskSortPlugin, replace=False)
         RelConverter.add_plugin_class(logical.DaskTableScanPlugin, replace=False)
diff --git a/dask_sql/physical/rel/logical/__init__.py b/dask_sql/physical/rel/logical/__init__.py
index 10fbb2fba..9e6160699 100644
--- a/dask_sql/physical/rel/logical/__init__.py
+++ b/dask_sql/physical/rel/logical/__init__.py
@@ -4,6 +4,7 @@
 from .filter import DaskFilterPlugin
 from .join import DaskJoinPlugin
 from .limit import DaskLimitPlugin
+from .offset import DaskOffsetPlugin
 from .project import DaskProjectPlugin
 from .sample import SamplePlugin
 from .sort import DaskSortPlugin
@@ -18,6 +19,7 @@
     DaskJoinPlugin,
     DaskCrossJoinPlugin,
     DaskLimitPlugin,
+    DaskOffsetPlugin,
     DaskProjectPlugin,
     DaskSortPlugin,
     DaskTableScanPlugin,
diff --git a/dask_sql/physical/rel/logical/limit.py b/dask_sql/physical/rel/logical/limit.py
index 76773e37e..04af385d8 100644
--- a/dask_sql/physical/rel/logical/limit.py
+++ b/dask_sql/physical/rel/logical/limit.py
@@ -1,11 +1,8 @@
 from typing import TYPE_CHECKING
 
-import dask.dataframe as dd
-
 from dask_sql.datacontainer import DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 from dask_sql.physical.rex import RexConverter
-from dask_sql.physical.utils.map import map_on_partition_index
 
 if TYPE_CHECKING:
     import dask_sql
@@ -25,82 +22,12 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         df = dc.df
         cc = dc.column_container
 
-        offset = rel.getOffset()
-        if offset:
-            offset = RexConverter.convert(offset, df, context=context)
-
-        end = rel.getFetch()
-        if end:
-            end = RexConverter.convert(end, df, context=context)
-
-        if offset:
-            end += offset
+        limit = RexConverter.convert(rel, rel.limit().getLimitN(), df, context=context)
 
-        df = self._apply_offset(df, offset, end)
+        # If an offset was present it would have already been processed at this point.
+        # Therefore it is always safe to start at 0 when applying the limit
+        df = df.head(limit, npartitions=-1, compute=False)
 
         cc = self.fix_column_to_row_type(cc, rel.getRowType())
         # No column type has changed, so no need to cast again
         return DataContainer(df, cc)
-
-    def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
-        """
-        Limit the dataframe to the window [offset, end].
-        That is unfortunately, not so simple as we do not know how many
-        items we have in each partition. We have therefore no other way than to
-        calculate (!!!) the sizes of each partition.
-
-        After that, we can create a new dataframe from the old
-        dataframe by calculating for each partition if and how much
-        it should be used.
-        We do this via generating our own dask computation graph as
-        we need to pass the partition number to the selection
-        function, which is not possible with normal "map_partitions".
-        """
-        if not offset:
-            # We do a (hopefully) very quick check: if the first partition
-            # is already enough, we will just use this
-            first_partition_length = len(df.partitions[0])
-            if first_partition_length >= end:
-                return df.head(end, compute=False)
-
-        # First, we need to find out which partitions we want to use.
-        # Therefore we count the total number of entries
-        partition_borders = df.map_partitions(lambda x: len(x))
-
-        # Now we let each of the partitions figure out, how much it needs to return
-        # using these partition borders
-        # For this, we generate out own dask computation graph (as it does not really
-        # fit well with one of the already present methods).
-
-        # (a) we define a method to be calculated on each partition
-        # This method returns the part of the partition, which falls between [offset, fetch]
-        # Please note that the dask object "partition_borders", will be turned into
-        # its pandas representation at this point and we can calculate the cumsum
-        # (which is not possible on the dask object). Recalculating it should not cost
-        # us much, as we assume the number of partitions is rather small.
-        def select_from_to(df, partition_index, partition_borders):
-            partition_borders = partition_borders.cumsum().to_dict()
-            this_partition_border_left = (
-                partition_borders[partition_index - 1] if partition_index > 0 else 0
-            )
-            this_partition_border_right = partition_borders[partition_index]
-
-            if (end and end < this_partition_border_left) or (
-                offset and offset >= this_partition_border_right
-            ):
-                return df.iloc[0:0]
-
-            from_index = max(offset - this_partition_border_left, 0) if offset else 0
-            to_index = (
-                min(end, this_partition_border_right)
-                if end
-                else this_partition_border_right
-            ) - this_partition_border_left
-
-            return df.iloc[from_index:to_index]
-
-        # (b) Now we just need to apply the function on every partition
-        # We do this via the delayed interface, which seems the easiest one.
-        return map_on_partition_index(
-            df, select_from_to, partition_borders, meta=df._meta
-        )
diff --git a/dask_sql/physical/rel/logical/offset.py b/dask_sql/physical/rel/logical/offset.py
new file mode 100644
index 000000000..1eda6a71c
--- /dev/null
+++ b/dask_sql/physical/rel/logical/offset.py
@@ -0,0 +1,72 @@
+from typing import TYPE_CHECKING
+
+import dask.dataframe as dd
+
+from dask_sql.datacontainer import DataContainer
+from dask_sql.physical.rel.base import BaseRelPlugin
+from dask_sql.physical.rex import RexConverter
+
+if TYPE_CHECKING:
+    import dask_sql
+    from dask_planner.rust import LogicalPlan
+
+
+class DaskOffsetPlugin(BaseRelPlugin):
+    """
+    Offset is used to modify the effective expression bounds in a larger table
+    (OFFSET).
+    """
+
+    class_name = "Offset"
+
+    def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
+        (dc,) = self.assert_inputs(rel, 1, context)
+        df = dc.df
+        cc = dc.column_container
+
+        offset = RexConverter.convert(
+            rel, rel.offset().getOffset(), df, context=context
+        )
+
+        df = self._apply_offset(df, offset)
+
+        cc = self.fix_column_to_row_type(cc, rel.getRowType())
+        # No column type has changed, so no need to cast again
+        return DataContainer(df, cc)
+
+    def _apply_offset(self, df: dd.DataFrame, offset: int) -> dd.DataFrame:
+        """
+        Limit the dataframe to the window [offset, end].
+
+        Unfortunately, Dask does not currently support row selection through `iloc`, so this must be done using a custom partition function.
+        However, it is sometimes possible to compute this window using `head` when an `offset` is not specified.
+        """
+        # compute the size of each partition
+        # TODO: compute `cumsum` here when dask#9067 is resolved
+        partition_borders = df.map_partitions(lambda x: len(x))
+
+        def offset_partition_func(df, partition_borders, partition_info=None):
+            """Limit the partition to values contained within the specified window, returning an empty dataframe if there are none"""
+
+            # TODO: remove the `cumsum` call here when dask#9067 is resolved
+            partition_borders = partition_borders.cumsum().to_dict()
+            partition_index = (
+                partition_info["number"] if partition_info is not None else 0
+            )
+
+            partition_border_left = (
+                partition_borders[partition_index - 1] if partition_index > 0 else 0
+            )
+            partition_border_right = partition_borders[partition_index]
+
+            if offset >= partition_border_right:
+                return df.iloc[0:0]
+
+            from_index = max(offset - partition_border_left, 0)
+
+            return df.iloc[from_index:]
+
+        return df.map_partitions(
+            offset_partition_func,
+            partition_borders=partition_borders,
+        )
diff --git a/dask_sql/physical/rex/convert.py b/dask_sql/physical/rex/convert.py
index bbbeda1db..35bc50ec1 100644
--- a/dask_sql/physical/rex/convert.py
+++ b/dask_sql/physical/rex/convert.py
@@ -13,16 +13,6 @@
 
 logger = logging.getLogger(__name__)
 
-
-# _REX_TYPE_TO_PLUGIN = {
-#     "Alias": "InputRef",
-#     "Column": "InputRef",
-#     "BinaryExpr": "RexCall",
-#     "Literal": "RexLiteral",
-#     "ScalarFunction": "RexCall",
-#     "Cast": "RexCall",
-# }
-
 _REX_TYPE_TO_PLUGIN = {
     "RexType.Reference": "InputRef",
    "RexType.Call": "RexCall",
diff --git a/tests/integration/test_select.py b/tests/integration/test_select.py
index f96969e15..eba5e3608 100644
--- a/tests/integration/test_select.py
+++ b/tests/integration/test_select.py
@@ -76,7 +76,6 @@ def test_select_of_select(c, df):
         ) AS "inner"
         """
     )
-
     expected_df = pd.DataFrame({"e": 2 * (df["a"] - 1), "f": 2 * df["b"] - 1})
     assert_eq(result_df, expected_df)
 
@@ -119,7 +118,6 @@ def test_timezones(c, datetime_table):
     assert_eq(result_df, datetime_table)
 
 
-@pytest.mark.skip(reason="WIP DataFusion")
 @pytest.mark.parametrize(
     "input_table",
     [