Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
b1900cf
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
1e48597
Updates for test_filter
jdye64 Mar 31, 2022
fd41a8c
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
682c009
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
ab69dd8
Include setuptools-rust in conda build recipe, in host and run
jdye64 Apr 13, 2022
ce4c31e
Remove PyArrow dependency
jdye64 Apr 20, 2022
8785b8c
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
3e45ab8
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
1734b89
timestamp with local time zone
jdye64 Apr 21, 2022
ac7d9f6
Bump DataFusion version (#494)
andygrove Apr 21, 2022
cbf5db0
Include RelDataType work
jdye64 Apr 21, 2022
d9380a6
Include RelDataType work
jdye64 Apr 21, 2022
ad56fc2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
7b20e66
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
7dd2017
add getRowType() for logical.rs
jdye64 Apr 22, 2022
984f523
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
1405fea
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
789aaad
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
652205e
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
5127f87
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
cf568dc
removed a few print statements
jdye64 Apr 24, 2022
4fb640e
commit to share with colleague
jdye64 Apr 24, 2022
32127e5
updates
jdye64 Apr 25, 2022
f5e24fe
checkpoint
jdye64 Apr 25, 2022
11cf212
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
46dfb0a
formatting after upstream merge
jdye64 Apr 25, 2022
fa71674
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
f6e86ca
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
3d1a5ad
accept INT as INTEGER
jdye64 Apr 25, 2022
384e446
tests update
jdye64 Apr 25, 2022
199b9d2
checkpoint
jdye64 Apr 25, 2022
c9dffae
checkpoint
jdye64 Apr 27, 2022
c9aad43
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
11100fa
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
643e85d
Basic DataFusion Select Functionality (#489)
jdye64 Apr 28, 2022
b36ef16
updates for expression
jdye64 Apr 28, 2022
5c94fbc
uncommented pytests
jdye64 Apr 28, 2022
bb461c8
uncommented pytests
jdye64 Apr 28, 2022
f65b1ab
code cleanup for review
jdye64 Apr 28, 2022
dc7553f
code cleanup for review
jdye64 Apr 28, 2022
f1dc0b2
Enabled more pytest that work now
jdye64 Apr 28, 2022
940e867
Enabled more pytest that work now
jdye64 Apr 28, 2022
6769ca0
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
c4ed9bd
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
05c5788
Disable 2 pytest that are causing gpuCI issues. They will be address …
jdye64 Apr 29, 2022
a33aa63
Handle Between operation for case-when
jdye64 Apr 29, 2022
20efd5c
adjust timestamp casting
jdye64 May 2, 2022
281baf7
merge with upstream
jdye64 May 6, 2022
d666bdd
merge with upstream/datafusion-sql-planner
jdye64 May 9, 2022
533f50a
Refactor projection _column_name() logic to the _column_name logic in…
jdye64 May 9, 2022
a42a133
removed println! statements
jdye64 May 9, 2022
10cd463
merge with upstream
jdye64 May 10, 2022
a1841c3
Updates from review
jdye64 May 11, 2022
a1bf8dc
refactor String::from() to .to_string()
jdye64 May 12, 2022
81cd046
Fix mappings
jdye64 May 12, 2022
2c0bda1
Add cross_join.py and cross_join.rs
jdye64 May 12, 2022
c3d9b0d
merge with upstream/datafusion-sql-planner
jdye64 May 12, 2022
a514782
Add pytest for cross_join
jdye64 May 12, 2022
406f9ee
Address review comments
jdye64 May 16, 2022
7c57efb
Resolve upstream merge issues, will require a force push due to previ…
jdye64 May 16, 2022
fde54da
Fix module import issue where typo was introduced
jdye64 May 17, 2022
5a757a5
manually supply test fixtures
jdye64 May 19, 2022
e7697dc
Remove request from test method signature
jdye64 May 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions dask_planner/src/sql/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::sql::types::rel_data_type::RelDataType;
use crate::sql::types::rel_data_type_field::RelDataTypeField;

mod aggregate;
mod cross_join;
mod explain;
mod filter;
mod join;
Expand Down Expand Up @@ -63,6 +64,11 @@ impl PyLogicalPlan {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::CrossJoin as PyCrossJoin
///
/// Wraps the plan's `current_node` in a `PyCrossJoin` via `to_py_plan`.
/// NOTE(review): presumably returns `Err` when `current_node` is `None`
/// or is not a `CrossJoin` variant — confirm `to_py_plan` semantics.
pub fn cross_join(&self) -> PyResult<cross_join::PyCrossJoin> {
to_py_plan(self.current_node.as_ref())
}

/// LogicalPlan::Explain as PyExplain
pub fn explain(&self) -> PyResult<explain::PyExplain> {
to_py_plan(self.current_node.as_ref())
Expand All @@ -85,12 +91,7 @@ impl PyLogicalPlan {

/// LogicalPlan::Sort as PySort
pub fn sort(&self) -> PyResult<sort::PySort> {
self.current_node
.as_ref()
.map(|plan| plan.clone().into())
.ok_or(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"current_node was None",
))
to_py_plan(self.current_node.as_ref())
}

/// Gets the "input" for the current LogicalPlan
Expand Down
20 changes: 20 additions & 0 deletions dask_planner/src/sql/logical/cross_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use datafusion::logical_plan::{CrossJoin, LogicalPlan};

use pyo3::prelude::*;

/// Python-facing wrapper around DataFusion's `CrossJoin` logical plan node,
/// exposed to Python as `dask_planner.CrossJoin`.
#[pyclass(name = "CrossJoin", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyCrossJoin {
// The wrapped DataFusion cross-join plan node.
cross_join: CrossJoin,
}

/// Converts a generic `LogicalPlan` into the Python-facing `PyCrossJoin`.
///
/// # Panics
///
/// Panics if `logical_plan` is any variant other than
/// `LogicalPlan::CrossJoin`.
impl From<LogicalPlan> for PyCrossJoin {
    fn from(logical_plan: LogicalPlan) -> PyCrossJoin {
        match logical_plan {
            // Field-init shorthand (clippy::redundant_field_names).
            LogicalPlan::CrossJoin(cross_join) => PyCrossJoin { cross_join },
            // Name the offending variant so the failure is diagnosable,
            // instead of the opaque "something went wrong here".
            other => panic!("expected LogicalPlan::CrossJoin, got: {:?}", other),
        }
    }
}
4 changes: 2 additions & 2 deletions dask_planner/src/sql/logical/join.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::sql::column;

use datafusion::logical_expr::logical_plan::Join;
pub use datafusion::logical_expr::{logical_plan::JoinType, LogicalPlan};
use datafusion::logical_plan::{JoinType, LogicalPlan};

use pyo3::prelude::*;

Expand Down Expand Up @@ -51,7 +51,7 @@ impl PyJoin {
impl From<LogicalPlan> for PyJoin {
fn from(logical_plan: LogicalPlan) -> PyJoin {
match logical_plan {
LogicalPlan::Join(join) => PyJoin { join },
LogicalPlan::Join(join) => PyJoin { join: join },
_ => panic!("something went wrong here"),
}
}
Expand Down
1 change: 1 addition & 0 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(self, logging_level=logging.INFO):
RelConverter.add_plugin_class(logical.DaskAggregatePlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskFilterPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskJoinPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskCrossJoinPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskLimitPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskProjectPlugin, replace=False)
RelConverter.add_plugin_class(logical.DaskSortPlugin, replace=False)
Expand Down
4 changes: 3 additions & 1 deletion dask_sql/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
}

if FLOAT_NAN_IMPLEMENTED: # pragma: no cover
_PYTHON_TO_SQL.update({pd.Float32Dtype(): "FLOAT", pd.Float64Dtype(): "FLOAT"})
_PYTHON_TO_SQL.update(
{pd.Float32Dtype(): SqlTypeName.FLOAT, pd.Float64Dtype(): SqlTypeName.DOUBLE}
)

# Default mapping between SQL types and python types
# for values
Expand Down
2 changes: 2 additions & 0 deletions dask_sql/physical/rel/logical/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .aggregate import DaskAggregatePlugin
from .cross_join import DaskCrossJoinPlugin
from .explain import ExplainPlugin
from .filter import DaskFilterPlugin
from .join import DaskJoinPlugin
Expand All @@ -15,6 +16,7 @@
DaskAggregatePlugin,
DaskFilterPlugin,
DaskJoinPlugin,
DaskCrossJoinPlugin,
DaskLimitPlugin,
DaskProjectPlugin,
DaskSortPlugin,
Expand Down
41 changes: 41 additions & 0 deletions dask_sql/physical/rel/logical/cross_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import logging
from typing import TYPE_CHECKING

import dask.dataframe as dd

import dask_sql.utils as utils
from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin

if TYPE_CHECKING:
import dask_sql
from dask_planner.rust import LogicalPlan

logger = logging.getLogger(__name__)


class DaskCrossJoinPlugin(BaseRelPlugin):
    """
    While similar to `DaskJoinPlugin` a `CrossJoin` has enough of a differing
    structure to justify its own plugin. This in turn limits the number of
    Dask tasks that are generated for `CrossJoin`'s when compared to a
    standard `Join`
    """

    class_name = "CrossJoin"

    def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
        """
        Produce the Cartesian product of the two inputs by merging them on a
        shared, constant-valued temporary key column, then dropping that key.

        Follows the `BaseRelPlugin.convert` contract: takes the logical plan
        node and the context, returns a `DataContainer` with the result.
        """
        # We now have two inputs (from left and right), so we fetch them both
        dc_lhs, dc_rhs = self.assert_inputs(rel, 2, context)

        df_lhs = dc_lhs.df
        df_rhs = dc_rhs.df

        # Create a 'key' column in both DataFrames to join on. Use .assign()
        # so the container-held DataFrames are not mutated in place.
        cross_join_key = utils.new_temporary_column(df_lhs)
        df_lhs = df_lhs.assign(**{cross_join_key: 1})
        df_rhs = df_rhs.assign(**{cross_join_key: 1})

        # drop(columns=...) — the positional `axis` argument was deprecated
        # in pandas 1.0 and removed in pandas 2.0.
        result = dd.merge(df_lhs, df_rhs, on=cross_join_key).drop(
            columns=cross_join_key
        )

        return DataContainer(result, ColumnContainer(result.columns))
7 changes: 7 additions & 0 deletions tests/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def df():
)


@pytest.fixture()
def department_table():
    # Three-row lookup table consumed by the cross-join integration test.
    department_names = ["English", "Math", "Science"]
    return pd.DataFrame({"department_name": department_names})


@pytest.fixture()
def user_table_1():
    # Four users; user_id 2 appears twice so joins yield duplicate matches.
    data = {"user_id": [2, 1, 2, 3], "b": [3, 3, 1, 3]}
    return pd.DataFrame(data)
Expand Down Expand Up @@ -159,6 +164,7 @@ def c(
df_simple,
df_wide,
df,
department_table,
user_table_1,
user_table_2,
long_table,
Expand All @@ -177,6 +183,7 @@ def c(
"df_simple": df_simple,
"df_wide": df_wide,
"df": df,
"department_table": department_table,
"user_table_1": user_table_1,
"user_table_2": user_table_2,
"long_table": long_table,
Expand Down
16 changes: 16 additions & 0 deletions tests/integration/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ def test_join_right(c):
assert_eq(return_df, expected_df, check_index=False)


def test_join_cross(c, user_table_1, department_table):
    """Implicit cross join (comma-separated FROM) matches a keyed Cartesian merge."""
    return_df = c.sql(
        """
    SELECT user_id, b, department_name
    FROM user_table_1, department_table
    """
    )

    # Build the expected Cartesian product with .assign() copies so the
    # fixture DataFrames are not mutated in place.
    lhs = user_table_1.assign(key=1)
    rhs = department_table.assign(key=1)

    # drop(columns=...) — the positional `axis` argument was deprecated in
    # pandas 1.0 and removed in pandas 2.0.
    expected_df = dd.merge(lhs, rhs, on="key").drop(columns="key")

    assert_eq(return_df, expected_df, check_index=False)


@pytest.mark.skip(reason="WIP DataFusion")
def test_join_complex(c):
return_df = c.sql(
Expand Down