Merged
Changes from all commits
69 commits
b1900cf
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
1e48597
Updates for test_filter
jdye64 Mar 31, 2022
fd41a8c
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
682c009
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
ab69dd8
Include setuptools-rust in conda build recipe, in host and run
jdye64 Apr 13, 2022
ce4c31e
Remove PyArrow dependency
jdye64 Apr 20, 2022
8785b8c
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
3e45ab8
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
1734b89
timestamp with local time zone
jdye64 Apr 21, 2022
ac7d9f6
Bump DataFusion version (#494)
andygrove Apr 21, 2022
cbf5db0
Include RelDataType work
jdye64 Apr 21, 2022
d9380a6
Include RelDataType work
jdye64 Apr 21, 2022
ad56fc2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
7b20e66
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
7dd2017
add getRowType() for logical.rs
jdye64 Apr 22, 2022
984f523
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
1405fea
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
789aaad
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
652205e
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
5127f87
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
cf568dc
removed a few print statements
jdye64 Apr 24, 2022
4fb640e
commit to share with colleague
jdye64 Apr 24, 2022
32127e5
updates
jdye64 Apr 25, 2022
f5e24fe
checkpoint
jdye64 Apr 25, 2022
11cf212
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
46dfb0a
formatting after upstream merge
jdye64 Apr 25, 2022
fa71674
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
f6e86ca
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
3d1a5ad
accept INT as INTEGER
jdye64 Apr 25, 2022
384e446
tests update
jdye64 Apr 25, 2022
199b9d2
checkpoint
jdye64 Apr 25, 2022
c9dffae
checkpoint
jdye64 Apr 27, 2022
c9aad43
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
11100fa
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
643e85d
Basic DataFusion Select Functionality (#489)
jdye64 Apr 28, 2022
b36ef16
updates for expression
jdye64 Apr 28, 2022
5c94fbc
uncommented pytests
jdye64 Apr 28, 2022
bb461c8
uncommented pytests
jdye64 Apr 28, 2022
f65b1ab
code cleanup for review
jdye64 Apr 28, 2022
dc7553f
code cleanup for review
jdye64 Apr 28, 2022
f1dc0b2
Enabled more pytest that work now
jdye64 Apr 28, 2022
940e867
Enabled more pytest that work now
jdye64 Apr 28, 2022
6769ca0
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
c4ed9bd
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
05c5788
Disable 2 pytest that are causing gpuCI issues. They will be address …
jdye64 Apr 29, 2022
a33aa63
Handle Between operation for case-when
jdye64 Apr 29, 2022
20efd5c
adjust timestamp casting
jdye64 May 2, 2022
281baf7
merge with upstream
jdye64 May 6, 2022
d666bdd
merge with upstream/datafusion-sql-planner
jdye64 May 9, 2022
533f50a
Refactor projection _column_name() logic to the _column_name logic in…
jdye64 May 9, 2022
a42a133
removed println! statements
jdye64 May 9, 2022
10cd463
merge with upstream
jdye64 May 10, 2022
a1841c3
Updates from review
jdye64 May 11, 2022
3001943
Add Offset and point to repo with offset in datafusion
jdye64 May 11, 2022
7ec66da
Introduce offset
jdye64 May 12, 2022
b72917b
limit updates
jdye64 May 12, 2022
651c9ab
commit before upstream merge
jdye64 May 15, 2022
4e69813
merged with upstream/datafusion-sql-planner
jdye64 May 16, 2022
3219ad0
Code formatting
jdye64 May 16, 2022
5a88155
Merge with upstream
jdye64 May 16, 2022
bd94ccf
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 17, 2022
bf91e8f
update Cargo.toml to use Arrow-DataFusion version with LIMIT logic
jdye64 May 17, 2022
3dc6a89
Bump DataFusion version to get changes around variant_name()
jdye64 May 18, 2022
08b38aa
Use map partitions for determining the offset
jdye64 May 19, 2022
7b52f41
Merge with upstream datafusion-crossjoin merge
jdye64 May 19, 2022
e129068
Refactor offset partition func
charlesbluca May 23, 2022
5e0de03
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 23, 2022
2d11de5
Update to use TryFrom logic
jdye64 May 23, 2022
c993377
Add cloudpickle to independent scheduler requirements
charlesbluca May 23, 2022
8 changes: 6 additions & 2 deletions .github/docker-compose.yaml
@@ -3,15 +3,19 @@ version: '3'
services:
dask-scheduler:
container_name: dask-scheduler
image: daskdev/dask:latest
image: daskdev/dask:dev
command: dask-scheduler
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "cloudpickle>=1.5.0" # match client cloudpickle version
ports:
- "8786:8786"
dask-worker:
container_name: dask-worker
image: daskdev/dask:latest
image: daskdev/dask:dev
command: dask-worker dask-scheduler:8786
environment:
USE_MAMBA: "true"
EXTRA_CONDA_PACKAGES: "pyarrow>=4.0.0" # required for parquet IO
volumes:
- /tmp:/tmp
2 changes: 1 addition & 1 deletion dask_planner/Cargo.toml
@@ -12,7 +12,7 @@ rust-version = "1.59"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
rand = "0.7"
pyo3 = { version = "0.16", features = ["extension-module", "abi3", "abi3-py38"] }
datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "8.0.0" }
datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "78207f5092fc5204ecd791278d403dcb6f0ae683" }
uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", default-features = false }
parking_lot = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion dask_planner/src/expression.rs
@@ -11,7 +11,7 @@ use datafusion::logical_expr::{lit, BuiltinScalarFunction, Expr};

use datafusion::scalar::ScalarValue;

pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;

use datafusion::prelude::Column;

Expand Down
3 changes: 2 additions & 1 deletion dask_planner/src/sql.rs
@@ -11,6 +11,7 @@ use crate::sql::exceptions::ParsingException;

use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::catalog::{ResolvedTableReference, TableReference};
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::ScalarFunctionImplementation;
use datafusion::physical_plan::udaf::AggregateUDF;
@@ -55,7 +56,7 @@ impl ContextProvider for DaskSQLContext {
fn get_table_provider(
&self,
name: TableReference,
) -> Result<Arc<dyn table::TableProvider>, DataFusionError> {
) -> Result<Arc<dyn TableProvider>, DataFusionError> {
let reference: ResolvedTableReference =
name.resolve(&self.default_catalog_name, &self.default_schema_name);
match self.schemas.get(&self.default_schema_name) {
17 changes: 15 additions & 2 deletions dask_planner/src/sql/logical.rs
@@ -7,10 +7,12 @@ mod cross_join;
mod explain;
mod filter;
mod join;
mod limit;
mod offset;
pub mod projection;
mod sort;

pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;

use datafusion::common::Result;
use datafusion::prelude::Column;
@@ -85,6 +87,16 @@ impl PyLogicalPlan {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Limit as PyLimit
pub fn limit(&self) -> PyResult<limit::PyLimit> {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Offset as PyOffset
pub fn offset(&self) -> PyResult<offset::PyOffset> {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Projection as PyProjection
pub fn projection(&self) -> PyResult<projection::PyProjection> {
to_py_plan(self.current_node.as_ref())
@@ -140,6 +152,7 @@ impl PyLogicalPlan {
LogicalPlan::TableScan(_table_scan) => "TableScan",
LogicalPlan::EmptyRelation(_empty_relation) => "EmptyRelation",
LogicalPlan::Limit(_limit) => "Limit",
LogicalPlan::Offset(_offset) => "Offset",
LogicalPlan::CreateExternalTable(_create_external_table) => "CreateExternalTable",
LogicalPlan::CreateMemoryTable(_create_memory_table) => "CreateMemoryTable",
LogicalPlan::DropTable(_drop_table) => "DropTable",
@@ -151,7 +164,7 @@
LogicalPlan::SubqueryAlias(_sqalias) => "SubqueryAlias",
LogicalPlan::CreateCatalogSchema(_create) => "CreateCatalogSchema",
LogicalPlan::CreateCatalog(_create_catalog) => "CreateCatalog",
LogicalPlan::CreateView(_) => "CreateView",
LogicalPlan::CreateView(_create_view) => "CreateView",
})
}

2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/aggregate.rs
@@ -1,7 +1,7 @@
use crate::expression::PyExpr;

use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{logical_plan::Aggregate, Expr};
pub use datafusion::logical_expr::{logical_plan::JoinType, LogicalPlan};

use crate::sql::exceptions::py_type_err;
use pyo3::prelude::*;
2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/filter.rs
@@ -1,7 +1,7 @@
use crate::expression::PyExpr;

use datafusion::logical_expr::logical_plan::Filter;
pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;

use crate::sql::exceptions::py_type_err;
use pyo3::prelude::*;
35 changes: 35 additions & 0 deletions dask_planner/src/sql/logical/limit.rs
@@ -0,0 +1,35 @@
use crate::expression::PyExpr;
use crate::sql::exceptions::py_type_err;

use datafusion::scalar::ScalarValue;
use pyo3::prelude::*;

use datafusion::logical_expr::{logical_plan::Limit, Expr, LogicalPlan};

#[pyclass(name = "Limit", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyLimit {
limit: Limit,
}

#[pymethods]
impl PyLimit {
#[pyo3(name = "getLimitN")]
pub fn limit_n(&self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(self.limit.n.try_into().unwrap()))),
Some(self.limit.input.clone()),
))
}
}

impl TryFrom<LogicalPlan> for PyLimit {
type Error = PyErr;

fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
LogicalPlan::Limit(limit) => Ok(PyLimit { limit: limit }),
_ => Err(py_type_err("unexpected plan")),
}
}
}
44 changes: 44 additions & 0 deletions dask_planner/src/sql/logical/offset.rs
@@ -0,0 +1,44 @@
use crate::expression::PyExpr;
use crate::sql::exceptions::py_type_err;

use datafusion::scalar::ScalarValue;
use pyo3::prelude::*;

use datafusion::logical_expr::{logical_plan::Offset, Expr, LogicalPlan};

#[pyclass(name = "Offset", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyOffset {
offset: Offset,
}

#[pymethods]
impl PyOffset {
#[pyo3(name = "getOffset")]
pub fn offset(&self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(self.offset.offset as u64))),
Some(self.offset.input.clone()),
))
}

#[pyo3(name = "getFetch")]
pub fn offset_fetch(&self) -> PyResult<PyExpr> {
// TODO: Still need to implement fetch size! For now get everything from offset on with '0'
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(0))),
Some(self.offset.input.clone()),
))
}
}

impl TryFrom<LogicalPlan> for PyOffset {
type Error = PyErr;

fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
LogicalPlan::Offset(offset) => Ok(PyOffset { offset: offset }),
_ => Err(py_type_err("unexpected plan")),
}
}
}
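
The Python-side plugin that consumes these new bindings (the DaskOffsetPlugin registered in dask_sql/context.py further down) is collapsed in this view. As a rough sketch only — the commit history mentions using map partitions to determine the offset — the Dask side could skip the first `offset` rows roughly as follows; the helper name and structure are illustrative assumptions, not the code in this PR:

import dask.dataframe as dd

def apply_offset(df: dd.DataFrame, offset: int) -> dd.DataFrame:
    # Hypothetical sketch: drop the first `offset` rows of the whole frame by
    # letting each partition work out how many of its rows precede the boundary.
    partition_sizes = df.map_partitions(len).compute().tolist()

    def skip_rows(part, partition_info=None):
        index = partition_info["number"] if partition_info else 0
        rows_before = sum(partition_sizes[:index])
        # Rows this particular partition still has to skip.
        remaining = max(offset - rows_before, 0)
        return part.iloc[remaining:]

    return df.map_partitions(skip_rows, meta=df._meta)

The offset value itself would come from rel.offset().getOffset() and be converted with RexConverter, analogous to the getLimitN() path shown in dask_sql/physical/rel/logical/limit.py below.
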
2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/projection.rs
@@ -1,6 +1,6 @@
use crate::expression::PyExpr;

pub use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{logical_plan::Projection, Expr};

use crate::sql::exceptions::py_type_err;
3 changes: 1 addition & 2 deletions dask_planner/src/sql/table.rs
@@ -7,14 +7,13 @@ use crate::sql::types::SqlTypeName;
use async_trait::async_trait;

use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
pub use datafusion::datasource::TableProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{Expr, LogicalPlan, TableSource};
use datafusion::physical_plan::{empty::EmptyExec, project_schema, ExecutionPlan};

use pyo3::prelude::*;

use datafusion::datasource::TableType;
use std::any::Any;
use std::sync::Arc;

1 change: 1 addition & 0 deletions dask_sql/context.py
@@ -99,6 +99,7 @@ def __init__(self, logging_level=logging.INFO):
RelConverter.add_plugin_class(logical.DaskJoinPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskCrossJoinPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskLimitPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskOffsetPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskProjectPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskSortPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskTableScanPlugin, replace=False)
2 changes: 2 additions & 0 deletions dask_sql/physical/rel/logical/__init__.py
@@ -4,6 +4,7 @@
from .filter import DaskFilterPlugin
from .join import DaskJoinPlugin
from .limit import DaskLimitPlugin
from .offset import DaskOffsetPlugin
from .project import DaskProjectPlugin
from .sample import SamplePlugin
from .sort import DaskSortPlugin
@@ -18,6 +19,7 @@
DaskJoinPlugin,
DaskCrossJoinPlugin,
DaskLimitPlugin,
DaskOffsetPlugin,
DaskProjectPlugin,
DaskSortPlugin,
DaskTableScanPlugin,
81 changes: 4 additions & 77 deletions dask_sql/physical/rel/logical/limit.py
@@ -1,11 +1,8 @@
from typing import TYPE_CHECKING

import dask.dataframe as dd

from dask_sql.datacontainer import DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex import RexConverter
from dask_sql.physical.utils.map import map_on_partition_index

if TYPE_CHECKING:
import dask_sql
@@ -25,82 +22,12 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
df = dc.df
cc = dc.column_container

offset = rel.getOffset()
if offset:
offset = RexConverter.convert(offset, df, context=context)

end = rel.getFetch()
if end:
end = RexConverter.convert(end, df, context=context)

if offset:
end += offset
limit = RexConverter.convert(rel, rel.limit().getLimitN(), df, context=context)

df = self._apply_offset(df, offset, end)
# If an offset was present it would have already been processed at this point.
# Therefore it is always safe to start at 0 when applying the limit
df = df.head(limit, npartitions=-1, compute=False)
Collaborator comment:
My personal opinion here is to still check if the first partition has enough elements and if not call head with npartitions=-1.

@charlesbluca Do you think this is worth a broader issue/discussion to see how this can be optimized?
For example, a query like this:

SELECT * from really_large_dataset LIMIT 100

is going to read every single partition if npartitions=-1 before returning 100 rows which isn't ideal.
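
A minimal sketch of the check being suggested, assuming a hypothetical helper name; it mirrors the fast path that the removed _apply_offset code below used:

import dask.dataframe as dd

def head_with_partition_check(df: dd.DataFrame, limit: int) -> dd.DataFrame:
    # Hypothetical sketch, not part of this PR: computing the length of the
    # first partition is usually much cheaper than scanning every partition.
    if len(df.partitions[0]) >= limit:
        # The first partition alone can satisfy the LIMIT.
        return df.head(limit, compute=False)
    # Otherwise gather rows across all partitions.
    return df.head(limit, npartitions=-1, compute=False)
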


cc = self.fix_column_to_row_type(cc, rel.getRowType())
# No column type has changed, so no need to cast again
return DataContainer(df, cc)

def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
"""
Limit the dataframe to the window [offset, end].
That is unfortunately not so simple, as we do not know how many
items we have in each partition. We have therefore no other way than to
calculate (!!!) the sizes of each partition.

After that, we can create a new dataframe from the old
dataframe by calculating for each partition if and how much
it should be used.
We do this via generating our own dask computation graph as
we need to pass the partition number to the selection
function, which is not possible with normal "map_partitions".
"""
if not offset:
# We do a (hopefully) very quick check: if the first partition
# is already enough, we will just use this
first_partition_length = len(df.partitions[0])
if first_partition_length >= end:
return df.head(end, compute=False)

# First, we need to find out which partitions we want to use.
# Therefore we count the total number of entries
partition_borders = df.map_partitions(lambda x: len(x))

# Now we let each of the partitions figure out, how much it needs to return
# using these partition borders
# For this, we generate our own dask computation graph (as it does not really
# fit well with one of the already present methods).

# (a) we define a method to be calculated on each partition
# This method returns the part of the partition, which falls between [offset, fetch]
# Please note that the dask object "partition_borders", will be turned into
# its pandas representation at this point and we can calculate the cumsum
# (which is not possible on the dask object). Recalculating it should not cost
# us much, as we assume the number of partitions is rather small.
def select_from_to(df, partition_index, partition_borders):
partition_borders = partition_borders.cumsum().to_dict()
this_partition_border_left = (
partition_borders[partition_index - 1] if partition_index > 0 else 0
)
this_partition_border_right = partition_borders[partition_index]

if (end and end < this_partition_border_left) or (
offset and offset >= this_partition_border_right
):
return df.iloc[0:0]

from_index = max(offset - this_partition_border_left, 0) if offset else 0
to_index = (
min(end, this_partition_border_right)
if end
else this_partition_border_right
) - this_partition_border_left

return df.iloc[from_index:to_index]

# (b) Now we just need to apply the function on every partition
# We do this via the delayed interface, which seems the easiest one.
return map_on_partition_index(
df, select_from_to, partition_borders, meta=df._meta
)