Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
b1900cf
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
1e48597
Updates for test_filter
jdye64 Mar 31, 2022
fd41a8c
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
682c009
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
ab69dd8
Include setuptools-rust in conda build recipie, in host and run
jdye64 Apr 13, 2022
ce4c31e
Remove PyArrow dependency
jdye64 Apr 20, 2022
8785b8c
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
3e45ab8
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
1734b89
timestamp with loglca time zone
jdye64 Apr 21, 2022
ac7d9f6
Bump DataFusion version (#494)
andygrove Apr 21, 2022
cbf5db0
Include RelDataType work
jdye64 Apr 21, 2022
d9380a6
Include RelDataType work
jdye64 Apr 21, 2022
ad56fc2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
7b20e66
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
7dd2017
add getRowType() for logical.rs
jdye64 Apr 22, 2022
984f523
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
1405fea
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
789aaad
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
652205e
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
5127f87
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
cf568dc
removed a few print statements
jdye64 Apr 24, 2022
4fb640e
commit to share with colleague
jdye64 Apr 24, 2022
32127e5
updates
jdye64 Apr 25, 2022
f5e24fe
checkpoint
jdye64 Apr 25, 2022
11cf212
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
46dfb0a
formatting after upstream merge
jdye64 Apr 25, 2022
fa71674
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
f6e86ca
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
3d1a5ad
accept INT as INTEGER
jdye64 Apr 25, 2022
384e446
tests update
jdye64 Apr 25, 2022
199b9d2
checkpoint
jdye64 Apr 25, 2022
c9dffae
checkpoint
jdye64 Apr 27, 2022
c9aad43
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
11100fa
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
643e85d
Basic DataFusion Select Functionality (#489)
jdye64 Apr 28, 2022
b36ef16
updates for expression
jdye64 Apr 28, 2022
5c94fbc
uncommented pytests
jdye64 Apr 28, 2022
bb461c8
uncommented pytests
jdye64 Apr 28, 2022
f65b1ab
code cleanup for review
jdye64 Apr 28, 2022
dc7553f
code cleanup for review
jdye64 Apr 28, 2022
f1dc0b2
Enabled more pytest that work now
jdye64 Apr 28, 2022
940e867
Enabled more pytest that work now
jdye64 Apr 28, 2022
6769ca0
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
c4ed9bd
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
05c5788
Disable 2 pytest that are causing gpuCI issues. They will be address …
jdye64 Apr 29, 2022
a33aa63
Handle Between operation for case-when
jdye64 Apr 29, 2022
20efd5c
adjust timestamp casting
jdye64 May 2, 2022
281baf7
merge with upstream
jdye64 May 6, 2022
d666bdd
merge with upstream/datafusion-sql-planner
jdye64 May 9, 2022
533f50a
Refactor projection _column_name() logic to the _column_name logic in…
jdye64 May 9, 2022
a42a133
removed println! statements
jdye64 May 9, 2022
10cd463
merge with upstream
jdye64 May 10, 2022
a1841c3
Updates from review
jdye64 May 11, 2022
a1bf8dc
refactor String::from() to .to_string()
jdye64 May 12, 2022
fb29855
When no ELSE statement is present in CASE/WHEN statement default to None
jdye64 May 12, 2022
ef4409e
Remove println
jdye64 May 12, 2022
c1c3773
Re-enable rex test that previously could not be ran
jdye64 May 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@ dask-worker-space/
node_modules/
docs/source/_build/
dask_planner/Cargo.lock

# Ignore development specific local testing files
dev_tests
130 changes: 79 additions & 51 deletions dask_planner/src/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::sql::logical;
use crate::sql::types::RexType;

use pyo3::prelude::*;
use std::convert::{From, Into};
use std::convert::From;

use datafusion::error::{DataFusionError, Result};

Expand Down Expand Up @@ -64,10 +64,38 @@ impl PyExpr {
}
}

fn _column_name(&self, plan: LogicalPlan) -> Result<String> {
/// Determines the name of the `Expr` instance by examining the LogicalPlan
pub fn _column_name(&self, plan: &LogicalPlan) -> Result<String> {
let field = expr_to_field(&self.expr, &plan)?;
Ok(field.unqualified_column().name.clone())
}

fn _rex_type(&self, expr: &Expr) -> RexType {
match expr {
Expr::Alias(..) => RexType::Reference,
Expr::Column(..) => RexType::Reference,
Expr::ScalarVariable(..) => RexType::Literal,
Expr::Literal(..) => RexType::Literal,
Expr::BinaryExpr { .. } => RexType::Call,
Expr::Not(..) => RexType::Call,
Expr::IsNotNull(..) => RexType::Call,
Expr::Negative(..) => RexType::Call,
Expr::GetIndexedField { .. } => RexType::Reference,
Expr::IsNull(..) => RexType::Call,
Expr::Between { .. } => RexType::Call,
Expr::Case { .. } => RexType::Call,
Expr::Cast { .. } => RexType::Call,
Expr::TryCast { .. } => RexType::Call,
Expr::Sort { .. } => RexType::Call,
Expr::ScalarFunction { .. } => RexType::Call,
Expr::AggregateFunction { .. } => RexType::Call,
Expr::WindowFunction { .. } => RexType::Call,
Expr::AggregateUDF { .. } => RexType::Call,
Expr::InList { .. } => RexType::Call,
Expr::Wildcard => RexType::Call,
_ => RexType::Other,
}
}
}

#[pymethods]
Expand Down Expand Up @@ -147,53 +175,24 @@ impl PyExpr {

/// Determines the type of this Expr based on its variant
#[pyo3(name = "getRexType")]
pub fn rex_type(&self) -> RexType {
match &self.expr {
Expr::Alias(expr, name) => RexType::Reference,
Expr::Column(..) => RexType::Reference,
Expr::ScalarVariable(..) => RexType::Literal,
Expr::Literal(..) => RexType::Literal,
Expr::BinaryExpr { .. } => RexType::Call,
Expr::Not(..) => RexType::Call,
Expr::IsNotNull(..) => RexType::Call,
Expr::Negative(..) => RexType::Call,
Expr::GetIndexedField { .. } => RexType::Reference,
Expr::IsNull(..) => RexType::Call,
Expr::Between { .. } => RexType::Call,
Expr::Case { .. } => RexType::Call,
Expr::Cast { .. } => RexType::Call,
Expr::TryCast { .. } => RexType::Call,
Expr::Sort { .. } => RexType::Call,
Expr::ScalarFunction { .. } => RexType::Call,
Expr::AggregateFunction { .. } => RexType::Call,
Expr::WindowFunction { .. } => RexType::Call,
Expr::AggregateUDF { .. } => RexType::Call,
Expr::InList { .. } => RexType::Call,
Expr::Wildcard => RexType::Call,
_ => RexType::Other,
}
pub fn rex_type(&self) -> PyResult<RexType> {
Ok(self._rex_type(&self.expr))
}

/// Python friendly shim code to get the name of a column referenced by an expression
pub fn column_name(&self, mut plan: logical::PyLogicalPlan) -> PyResult<String> {
self._column_name(plan.current_node())
self._column_name(&plan.current_node())
.map_err(|e| py_runtime_err(e))
}

/// Gets the operands for a BinaryExpr call
#[pyo3(name = "getOperands")]
pub fn get_operands(&self) -> PyResult<Vec<PyExpr>> {
match &self.expr {
Expr::BinaryExpr { left, op: _, right } => {
let mut operands: Vec<PyExpr> = Vec::new();
let left_desc: Expr = *left.clone();
let py_left: PyExpr = PyExpr::from(left_desc, self.input_plan.clone());
operands.push(py_left);
let right_desc: Expr = *right.clone();
let py_right: PyExpr = PyExpr::from(right_desc, self.input_plan.clone());
operands.push(py_right);
Ok(operands)
}
Expr::BinaryExpr { left, right, .. } => Ok(vec![
PyExpr::from(*left.clone(), self.input_plan.clone()),
PyExpr::from(*right.clone(), self.input_plan.clone()),
]),
Expr::ScalarFunction { fun: _, args } => {
let mut operands: Vec<PyExpr> = Vec::new();
for arg in args {
Expand All @@ -203,15 +202,44 @@ impl PyExpr {
Ok(operands)
}
Expr::Cast { expr, data_type: _ } => {
Ok(vec![PyExpr::from(*expr.clone(), self.input_plan.clone())])
}
Expr::Case {
expr,
when_then_expr,
else_expr,
} => {
let mut operands: Vec<PyExpr> = Vec::new();
let ex: Expr = *expr.clone();
let py_ex: PyExpr = PyExpr::from(ex, self.input_plan.clone());
operands.push(py_ex);

if let Some(e) = expr {
operands.push(PyExpr::from(*e.clone(), self.input_plan.clone()));
};

for (when, then) in when_then_expr {
operands.push(PyExpr::from(*when.clone(), self.input_plan.clone()));
operands.push(PyExpr::from(*then.clone(), self.input_plan.clone()));
}

if let Some(e) = else_expr {
operands.push(PyExpr::from(*e.clone(), self.input_plan.clone()));
};

Ok(operands)
}
_ => Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
"unknown Expr type encountered",
)),
Expr::Between {
expr,
negated: _,
low,
high,
} => Ok(vec![
PyExpr::from(*expr.clone(), self.input_plan.clone()),
PyExpr::from(*low.clone(), self.input_plan.clone()),
PyExpr::from(*high.clone(), self.input_plan.clone()),
]),
_ => Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(format!(
"unknown Expr type {:?} encountered",
&self.expr
))),
}
}

Expand All @@ -224,13 +252,13 @@ impl PyExpr {
right: _,
} => Ok(format!("{}", op)),
Expr::ScalarFunction { fun, args: _ } => Ok(format!("{}", fun)),
Expr::Cast {
expr: _,
data_type: _,
} => Ok(String::from("cast")),
_ => Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
"Catch all triggered ....",
)),
Expr::Cast { .. } => Ok("cast".to_string()),
Expr::Between { .. } => Ok("between".to_string()),
Expr::Case { .. } => Ok("case".to_string()),
_ => Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(format!(
"Catch all triggered for get_operator_name: {:?}",
&self.expr
))),
}
}

Expand Down
2 changes: 0 additions & 2 deletions dask_planner/src/sql/logical.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::sql::table;
use crate::sql::types::rel_data_type::RelDataType;
use crate::sql::types::rel_data_type_field::RelDataTypeField;
use datafusion::logical_plan::DFField;

mod aggregate;
mod filter;
Expand All @@ -12,7 +11,6 @@ pub use datafusion_expr::LogicalPlan;

use datafusion::common::Result;
use datafusion::prelude::Column;
use pyo3::ffi::Py_FatalError;

use crate::sql::exceptions::py_type_err;
use pyo3::prelude::*;
Expand Down
1 change: 0 additions & 1 deletion dask_planner/src/sql/logical/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ impl PyJoin {

let mut join_conditions: Vec<(column::PyColumn, column::PyColumn)> = Vec::new();
for (mut lhs, mut rhs) in self.join.on.clone() {
println!("lhs: {:?} rhs: {:?}", lhs, rhs);
lhs.relation = Some(lhs_table_name.clone());
rhs.relation = Some(rhs_table_name.clone());
join_conditions.push((lhs.into(), rhs.into()));
Expand Down
61 changes: 5 additions & 56 deletions dask_planner/src/sql/logical/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ impl PyProjection {
let mut projs: Vec<PyExpr> = Vec::new();
match &local_expr.expr {
Expr::Alias(expr, _name) => {
let ex: Expr = *expr.clone();
let mut py_expr: PyExpr = PyExpr::from(ex, Some(self.projection.input.clone()));
py_expr.input_plan = local_expr.input_plan.clone();
let py_expr: PyExpr =
PyExpr::from(*expr.clone(), Some(self.projection.input.clone()));
projs.extend_from_slice(self.projected_expressions(&py_expr).as_slice());
}
_ => projs.push(local_expr.clone()),
Expand All @@ -30,66 +29,16 @@ impl PyProjection {

#[pymethods]
impl PyProjection {
#[pyo3(name = "getColumnName")]
fn column_name(&mut self, expr: PyExpr) -> PyResult<String> {
let mut val: String = String::from("OK");
match expr.expr {
Expr::Alias(expr, _alias) => match expr.as_ref() {
Expr::Column(col) => {
let index = self.projection.input.schema().index_of_column(col).unwrap();
match self.projection.input.as_ref() {
LogicalPlan::Aggregate(agg) => {
let mut exprs = agg.group_expr.clone();
exprs.extend_from_slice(&agg.aggr_expr);
match &exprs[index] {
Expr::AggregateFunction { args, .. } => match &args[0] {
Expr::Column(col) => {
println!("AGGREGATE COLUMN IS {}", col.name);
val = col.name.clone();
}
_ => unimplemented!("projection.rs column_name is unimplemented for Expr variant: {:?}", &args[0]),
},
_ => unimplemented!("projection.rs column_name is unimplemented for Expr variant: {:?}", &exprs[index]),
}
}
LogicalPlan::TableScan(table_scan) => val = table_scan.table_name.clone(),
_ => unimplemented!("projection.rs column_name is unimplemented for LogicalPlan variant: {:?}", self.projection.input),
}
}
Expr::Cast { expr, data_type: _ } => {
let ex_type: Expr = *expr.clone();
let py_type: PyExpr =
PyExpr::from(ex_type, Some(self.projection.input.clone()));
val = self.column_name(py_type).unwrap();
println!("Setting col name to: {:?}", val);
}
_ => panic!("not supported: {:?}", expr),
},
Expr::Column(col) => val = col.name.clone(),
Expr::Cast { expr, data_type: _ } => {
let ex_type: Expr = *expr;
let py_type: PyExpr = PyExpr::from(ex_type, Some(self.projection.input.clone()));
val = self.column_name(py_type).unwrap()
}
_ => {
panic!(
"column_name is unimplemented for Expr variant: {:?}",
expr.expr
);
}
}
Ok(val)
}

#[pyo3(name = "getNamedProjects")]
fn named_projects(&mut self) -> PyResult<Vec<(String, PyExpr)>> {
let mut named: Vec<(String, PyExpr)> = Vec::new();
for expression in self.projection.expr.clone() {
let mut py_expr: PyExpr = PyExpr::from(expression, Some(self.projection.input.clone()));
py_expr.input_plan = Some(self.projection.input.clone());
for expr in self.projected_expressions(&py_expr) {
let name: String = self.column_name(expr.clone()).unwrap();
named.push((name, expr.clone()));
if let Ok(name) = expr._column_name(&*self.projection.input) {
named.push((name, expr.clone()));
}
}
}
Ok(named)
Expand Down
1 change: 0 additions & 1 deletion dask_planner/src/sql/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ impl DaskTable {
qualified_name.push(table_scan.table_name);
}
_ => {
println!("Nothing matches");
qualified_name.push(self.name.clone());
}
}
Expand Down
5 changes: 1 addition & 4 deletions dask_planner/src/sql/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ impl DaskTypeMap {
#[new]
#[args(sql_type, py_kwargs = "**")]
fn new(sql_type: SqlTypeName, py_kwargs: Option<&PyDict>) -> Self {
// println!("sql_type={:?} - py_kwargs={:?}", sql_type, py_kwargs);

let d_type: DataType = match sql_type {
SqlTypeName::TIMESTAMP_WITH_LOCAL_TIME_ZONE => {
let (unit, tz) = match py_kwargs {
Expand Down Expand Up @@ -206,8 +204,7 @@ impl SqlTypeName {
SqlTypeName::DATE => DataType::Date64,
SqlTypeName::VARCHAR => DataType::Utf8,
_ => {
println!("Type: {:?}", self);
todo!();
todo!("Type: {:?}", self);
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions dask_sql/physical/rel/logical/limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

if TYPE_CHECKING:
import dask_sql
from dask_sql.java import org
from dask_planner.rust import LogicalPlan


class DaskLimitPlugin(BaseRelPlugin):
Expand All @@ -18,11 +18,9 @@ class DaskLimitPlugin(BaseRelPlugin):
(LIMIT).
"""

class_name = "com.dask.sql.nodes.DaskLimit"
class_name = "Limit"

def convert(
self, rel: "org.apache.calcite.rel.RelNode", context: "dask_sql.Context"
) -> DataContainer:
def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
(dc,) = self.assert_inputs(rel, 1, context)
df = dc.df
cc = dc.column_container
Expand Down
9 changes: 9 additions & 0 deletions dask_sql/physical/rex/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
logger = logging.getLogger(__name__)


# _REX_TYPE_TO_PLUGIN = {
# "Alias": "InputRef",
# "Column": "InputRef",
# "BinaryExpr": "RexCall",
# "Literal": "RexLiteral",
# "ScalarFunction": "RexCall",
# "Cast": "RexCall",
# }

_REX_TYPE_TO_PLUGIN = {
"RexType.Reference": "InputRef",
"RexType.Call": "RexCall",
Expand Down
Loading