Support for LIMIT clause with DataFusion #529
Changes from 63 commits
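For orientation, this is the kind of query the PR wires through the new DataFusion-based planner. The table name and data below are made up, but `Context`, `create_table`, and `sql` are dask-sql's public API:

```python
import pandas as pd
from dask_sql import Context

c = Context()
c.create_table("my_table", pd.DataFrame({"a": range(100)}))

# The LIMIT clause is what this PR adds support for in the DataFusion planner.
result = c.sql("SELECT * FROM my_table LIMIT 10")
print(result.compute())  # a 10-row pandas DataFrame
```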
The first new file adds a Rust wrapper, `PyLimit`, that exposes DataFusion's `Limit` plan node to the Python planner:

@@ -0,0 +1,32 @@
use crate::expression::PyExpr;

use datafusion::scalar::ScalarValue;
use pyo3::prelude::*;

use datafusion::logical_expr::{logical_plan::Limit, Expr, LogicalPlan};

#[pyclass(name = "Limit", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyLimit {
    limit: Limit,
}

#[pymethods]
impl PyLimit {
    /// Return the LIMIT row count `n` as a literal UInt64 expression.
    #[pyo3(name = "getLimitN")]
    pub fn limit_n(&self) -> PyResult<PyExpr> {
        Ok(PyExpr::from(
            Expr::Literal(ScalarValue::UInt64(Some(self.limit.n.try_into().unwrap()))),
            Some(self.limit.input.clone()),
        ))
    }
}

impl From<LogicalPlan> for PyLimit {
    fn from(logical_plan: LogicalPlan) -> PyLimit {
        match logical_plan {
            LogicalPlan::Limit(limit) => PyLimit { limit },
            _ => panic!("expected LogicalPlan::Limit when constructing PyLimit"),
        }
    }
}
The second new file adds the matching `PyOffset` wrapper for DataFusion's `Offset` plan node:

@@ -0,0 +1,42 @@
use crate::expression::PyExpr;

use datafusion::scalar::ScalarValue;
use pyo3::prelude::*;

use datafusion::logical_expr::{logical_plan::Offset, Expr, LogicalPlan};

#[pyclass(name = "Offset", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyOffset {
    offset: Offset,
}

#[pymethods]
impl PyOffset {
    /// Return the OFFSET row count as a literal UInt64 expression.
    #[pyo3(name = "getOffset")]
    pub fn offset(&self) -> PyResult<PyExpr> {
        // TODO: Waiting on DataFusion issue: https://github.com/apache/arrow-datafusion/issues/2377
        Ok(PyExpr::from(
            Expr::Literal(ScalarValue::UInt64(Some(self.offset.offset as u64))),
            Some(self.offset.input.clone()),
        ))
    }

    /// Return the fetch size as a literal UInt64 expression.
    #[pyo3(name = "getFetch")]
    pub fn offset_fetch(&self) -> PyResult<PyExpr> {
        // TODO: Fetch size is not implemented yet; for now return 0, meaning
        // "take everything from the offset onwards".
        Ok(PyExpr::from(
            Expr::Literal(ScalarValue::UInt64(Some(0))),
            Some(self.offset.input.clone()),
        ))
    }
}

impl From<LogicalPlan> for PyOffset {
    fn from(logical_plan: LogicalPlan) -> PyOffset {
        match logical_plan {
            LogicalPlan::Offset(offset) => PyOffset { offset },
            _ => panic!("expected LogicalPlan::Offset when constructing PyOffset (see issue #501)"),
        }
    }
}
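Since `getFetch()` currently returns a literal 0, the Python side sees a falsy value and treats the window as "everything from the offset onwards". A minimal sketch of that offset/fetch-to-window translation, using plain integers in place of the converted literal expressions (the function name is illustrative, not part of the PR):

```python
def window_from_plan(offset: int, fetch: int):
    """Translate DataFusion's offset/fetch pair into a [start, end) row window.

    A fetch of 0 is treated as "no fetch limit", matching the temporary
    behaviour of PyOffset.getFetch() above.
    """
    start = offset or 0
    end = (start + fetch) if fetch else None  # None means "read to the end"
    return start, end

# window_from_plan(10, 0) -> (10, None)   OFFSET 10, fetch not implemented yet
# window_from_plan(10, 5) -> (10, 15)     LIMIT 5 OFFSET 10
```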
Finally, the Python limit plugin is reworked. The import hunk shrinks from 11 lines to 8:

@@ -1,11 +1,8 @@
from typing import TYPE_CHECKING

import dask.dataframe as dd

from dask_sql.datacontainer import DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex import RexConverter
from dask_sql.physical.utils.map import map_on_partition_index

if TYPE_CHECKING:
    import dask_sql

In convert, the old offset/fetch handling (and the _apply_offset helper shown further below) is removed; the row count now comes from the new getLimitN() binding and a single head() call:

@@ -25,82 +22,12 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
        df = dc.df
        cc = dc.column_container

        offset = rel.getOffset()
        if offset:
            offset = RexConverter.convert(offset, df, context=context)

        end = rel.getFetch()
        if end:
            end = RexConverter.convert(end, df, context=context)

        if offset:
            end += offset
        limit = RexConverter.convert(rel, rel.limit().getLimitN(), df, context=context)

        df = self._apply_offset(df, offset, end)
        # If an offset was present it would have already been processed at this point.
        # Therefore it is always safe to start at 0 when applying the limit.
        df = df.head(limit, npartitions=-1, compute=False)
Collaborator comment on the head() call above: "My personal opinion here is to still check if the first partition has enough elements and, if not, call head with `npartitions=-1`. @charlesbluca Do you think this is worth a broader issue/discussion to see how this can be optimized? `SELECT * from really_large_dataset LIMIT 100` is going to read every single partition if `npartitions=-1` is used."
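A rough sketch of the check the reviewer describes, mirroring the fast path that existed in the removed `_apply_offset` helper below; the function name and placement are illustrative only, not part of the PR:

```python
import dask.dataframe as dd

def head_with_fast_path(df: dd.DataFrame, limit: int) -> dd.DataFrame:
    """Take the first `limit` rows, scanning every partition only when needed."""
    # len() of a single-partition slice computes only that partition.
    if len(df.partitions[0]) >= limit:
        # The first partition already has enough rows, so head() can stop there.
        return df.head(limit, compute=False)
    # Otherwise fall back to reading across all partitions.
    return df.head(limit, npartitions=-1, compute=False)
```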
The hunk then continues:

        cc = self.fix_column_to_row_type(cc, rel.getRowType())
        # No column type has changed, so no need to cast again
        return DataContainer(df, cc)

The removed _apply_offset helper (with its nested select_from_to) made up the rest of the old code:

    def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
        """
        Limit the dataframe to the window [offset, end].
        That is, unfortunately, not as simple as it sounds, since we do not know
        how many items we have in each partition. We therefore have no other way
        than to calculate (!!!) the sizes of each partition.

        After that, we can create a new dataframe from the old
        dataframe by calculating for each partition if and how much
        of it should be used.
        We do this by generating our own dask computation graph, as
        we need to pass the partition number to the selection
        function, which is not possible with a normal "map_partitions".
        """
        if not offset:
            # We do a (hopefully) very quick check: if the first partition
            # is already enough, we will just use it.
            first_partition_length = len(df.partitions[0])
            if first_partition_length >= end:
                return df.head(end, compute=False)

        # First, we need to find out which partitions we want to use.
        # Therefore we count the number of entries per partition.
        partition_borders = df.map_partitions(lambda x: len(x))

        # Now we let each of the partitions figure out how much it needs to return,
        # using these partition borders.
        # For this, we generate our own dask computation graph (as it does not really
        # fit well with any of the already present methods).

        # (a) We define a method to be calculated on each partition.
        # This method returns the part of the partition which falls between [offset, fetch].
        # Please note that the dask object "partition_borders" will be turned into
        # its pandas representation at this point, so we can calculate the cumsum
        # (which is not possible on the dask object). Recalculating it should not cost
        # us much, as we assume the number of partitions is rather small.
        def select_from_to(df, partition_index, partition_borders):
            partition_borders = partition_borders.cumsum().to_dict()
            this_partition_border_left = (
                partition_borders[partition_index - 1] if partition_index > 0 else 0
            )
            this_partition_border_right = partition_borders[partition_index]

            if (end and end < this_partition_border_left) or (
                offset and offset >= this_partition_border_right
            ):
                return df.iloc[0:0]

            from_index = max(offset - this_partition_border_left, 0) if offset else 0
            to_index = (
                min(end, this_partition_border_right)
                if end
                else this_partition_border_right
            ) - this_partition_border_left

            return df.iloc[from_index:to_index]

        # (b) Now we just need to apply the function on every partition.
        # We do this via the delayed interface, which seems the easiest one.
        return map_on_partition_index(
            df, select_from_to, partition_borders, meta=df._meta
        )
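For illustration only, the slicing arithmetic in select_from_to can be checked with plain pandas; the partition lengths and the [offset, end) window below are made-up numbers, not values from the PR:

```python
import pandas as pd

# Pretend the dataframe has three partitions of 3, 4 and 5 rows and we want
# the global row window [offset, end) = [4, 9).
partition_borders = pd.Series([3, 4, 5]).cumsum().to_dict()  # {0: 3, 1: 7, 2: 12}
offset, end = 4, 9

for partition_index in range(3):
    left = partition_borders[partition_index - 1] if partition_index > 0 else 0
    right = partition_borders[partition_index]
    if (end and end < left) or (offset and offset >= right):
        print(partition_index, "-> skipped")
        continue
    from_index = max(offset - left, 0) if offset else 0
    to_index = (min(end, right) if end else right) - left
    print(partition_index, f"-> iloc[{from_index}:{to_index}]")

# Prints:
# 0 -> skipped        (rows 0-2 all lie before the offset)
# 1 -> iloc[1:4]      (global rows 4-6)
# 2 -> iloc[0:2]      (global rows 7-8)
```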