Merged
42 commits
19a631d
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
4524eaa
Updates for test_filter
jdye64 Mar 31, 2022
5d7d7b3
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
caf079d
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
47e58cd
Include setuptools-rust in conda build recipe, in host and run
jdye64 Apr 13, 2022
1248cf7
Remove PyArrow dependency
jdye64 Apr 20, 2022
9f63722
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
91e5738
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
e0c9c7e
timestamp with local time zone
jdye64 Apr 21, 2022
6f72654
Include RelDataType work
jdye64 Apr 21, 2022
6ab3ad2
Include RelDataType work
jdye64 Apr 21, 2022
fd17ec2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
964a203
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
8293675
add getRowType() for logical.rs
jdye64 Apr 22, 2022
8ae1c3e
Merge branch 'datafusion-sql-planner' into datafusion-select
jdye64 Apr 22, 2022
37b36f1
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
11f2cb4
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
fd3f493
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
31e20e9
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
e89152e
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
759c652
removed a few print statements
jdye64 Apr 24, 2022
76dfc43
commit to share with colleague
jdye64 Apr 24, 2022
b553bc5
updates
jdye64 Apr 25, 2022
0ea254f
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
43e171b
Merge branch 'datafusion-select' into datafusion-cast
jdye64 Apr 25, 2022
0b9558d
formatting after upstream merge
jdye64 Apr 25, 2022
42bf976
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
da8583e
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
7db0d3c
accept INT as INTEGER
jdye64 Apr 25, 2022
5aebcf3
Merge branch 'datafusion-select' into datafusion-cast
jdye64 Apr 25, 2022
e6ff411
checkpoint
jdye64 Apr 27, 2022
d6f890e
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
4d2cb6d
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
9dfec09
Merge with upstream/datafusion-sql-planner
jdye64 Apr 28, 2022
c475c25
uncommented pytests
jdye64 Apr 28, 2022
56367bb
uncommented pytests
jdye64 Apr 28, 2022
60a09d6
code cleanup for review
jdye64 Apr 28, 2022
3b8016e
code cleanup for review
jdye64 Apr 28, 2022
1be9e54
Enabled more pytest that work now
jdye64 Apr 28, 2022
32d2c8b
Enabled more pytest that work now
jdye64 Apr 28, 2022
e757237
Disable 2 pytest that are causing gpuCI issues. They will be address …
jdye64 Apr 29, 2022
6777b13
Mark just the GPU tests as skipped
jdye64 Apr 29, 2022
4 changes: 2 additions & 2 deletions dask_planner/Cargo.toml
@@ -12,8 +12,8 @@ rust-version = "1.59"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
rand = "0.7"
pyo3 = { version = "0.16", features = ["extension-module", "abi3", "abi3-py38"] }
datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "23f1c77569d1f3b0ff42ade56f9b2afb53d44292" }
datafusion-expr = { git="https://github.com/apache/arrow-datafusion/", rev = "23f1c77569d1f3b0ff42ade56f9b2afb53d44292" }
datafusion = { git="https://github.com/apache/arrow-datafusion/", rev = "ef49d2858c2aba1ea7cd5fed3b1e5feb77fc2233" }
datafusion-expr = { git="https://github.com/apache/arrow-datafusion/", rev = "ef49d2858c2aba1ea7cd5fed3b1e5feb77fc2233" }
uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", default-features = false }
sqlparser = "0.14.0"
38 changes: 17 additions & 21 deletions dask_planner/src/expression.rs
@@ -7,7 +7,7 @@ use std::convert::{From, Into};
use datafusion::error::DataFusionError;

use datafusion::arrow::datatypes::DataType;
use datafusion_expr::{col, lit, BuiltinScalarFunction, Expr};
use datafusion_expr::{lit, BuiltinScalarFunction, Expr};

use datafusion::scalar::ScalarValue;

@@ -31,15 +31,6 @@ impl From<PyExpr> for Expr {
}
}

impl From<Expr> for PyExpr {
fn from(expr: Expr) -> PyExpr {
PyExpr {
input_plan: None,
expr: expr,
}
}
}

#[pyclass(name = "ScalarValue", module = "datafusion", subclass)]
#[derive(Debug, Clone)]
pub struct PyScalarValue {
@@ -70,13 +61,9 @@ impl PyExpr {
}
}

fn _column_name(&self, mut plan: LogicalPlan) -> String {
fn _column_name(&self, plan: LogicalPlan) -> String {
match &self.expr {
Expr::Alias(expr, name) => {
println!("Alias encountered with name: {:?}", name);
// let reference: Expr = *expr.as_ref();
// let plan: logical::PyLogicalPlan = reference.input().clone().into();

// Only certain LogicalPlan variants are valid in this nested Alias scenario so we
// extract the valid ones and error on the invalid ones
match expr.as_ref() {
@@ -160,7 +147,7 @@ impl PyExpr {
impl PyExpr {
#[staticmethod]
pub fn literal(value: PyScalarValue) -> PyExpr {
lit(value.scalar_value).into()
PyExpr::from(lit(value.scalar_value), None)
}

/// If this Expression instance references an existing
@@ -173,6 +160,11 @@ impl PyExpr {
}
}

#[pyo3(name = "toString")]
pub fn to_string(&self) -> PyResult<String> {
Ok(format!("{}", &self.expr))
}

/// Gets the positional index of the Expr instance from the LogicalPlan DFSchema
#[pyo3(name = "getIndex")]
pub fn index(&self) -> PyResult<usize> {
@@ -230,7 +222,7 @@ impl PyExpr {
#[pyo3(name = "getRexType")]
pub fn rex_type(&self) -> RexType {
match &self.expr {
Expr::Alias(..) => RexType::Reference,
Expr::Alias(_expr, _name) => RexType::Reference,
Expr::Column(..) => RexType::Reference,
Expr::ScalarVariable(..) => RexType::Literal,
Expr::Literal(..) => RexType::Literal,
@@ -267,22 +259,26 @@ impl PyExpr {
Expr::BinaryExpr { left, op: _, right } => {
let mut operands: Vec<PyExpr> = Vec::new();
let left_desc: Expr = *left.clone();
operands.push(left_desc.into());
let py_left: PyExpr = PyExpr::from(left_desc, self.input_plan.clone());
operands.push(py_left);
let right_desc: Expr = *right.clone();
operands.push(right_desc.into());
let py_right: PyExpr = PyExpr::from(right_desc, self.input_plan.clone());
operands.push(py_right);
Ok(operands)
}
Expr::ScalarFunction { fun: _, args } => {
let mut operands: Vec<PyExpr> = Vec::new();
for arg in args {
operands.push(arg.clone().into());
let py_arg: PyExpr = PyExpr::from(arg.clone(), self.input_plan.clone());
operands.push(py_arg);
}
Ok(operands)
}
Expr::Cast { expr, data_type: _ } => {
let mut operands: Vec<PyExpr> = Vec::new();
let ex: Expr = *expr.clone();
operands.push(ex.into());
let py_ex: PyExpr = PyExpr::from(ex, self.input_plan.clone());
operands.push(py_ex);
Ok(operands)
}
_ => Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
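Taken together, these expression.rs changes drop the blanket `From<Expr> for PyExpr` impl and construct every child wrapper explicitly with `PyExpr::from(expr, input_plan)`, so operands inherit the plan they were resolved against. Below is a rough Python analogue of that ownership pattern, for illustration only (`PyExprSketch` and its `children` attribute are invented names, not part of the actual bindings):

```python
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class PyExprSketch:
    """Illustrative stand-in for the Rust-side PyExpr wrapper."""
    expr: Any                          # wrapped logical expression node
    input_plan: Optional[Any] = None   # LogicalPlan providing schema context

    def operands(self) -> List["PyExprSketch"]:
        # Each child wrapper inherits the parent's input_plan, so a later
        # getIndex() or column lookup still knows which schema to consult.
        children = getattr(self.expr, "children", [])
        return [PyExprSketch(child, self.input_plan) for child in children]
```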
1 change: 0 additions & 1 deletion dask_planner/src/sql.rs
@@ -158,7 +158,6 @@ impl DaskSQLContext {
statement: statement::PyStatement,
) -> PyResult<logical::PyLogicalPlan> {
let planner = SqlToRel::new(self);

match planner.statement_to_plan(statement.statement) {
Ok(k) => {
println!("\nLogicalPlan: {:?}\n\n", k);
1 change: 1 addition & 0 deletions dask_planner/src/sql/logical.rs
@@ -122,6 +122,7 @@ impl PyLogicalPlan {
LogicalPlan::Explain(_explain) => "Explain",
LogicalPlan::Analyze(_analyze) => "Analyze",
LogicalPlan::Extension(_extension) => "Extension",
LogicalPlan::Subquery(_sub_query) => "Subquery",
LogicalPlan::SubqueryAlias(_sqalias) => "SubqueryAlias",
LogicalPlan::CreateCatalogSchema(_create) => "CreateCatalogSchema",
LogicalPlan::CreateCatalog(_create_catalog) => "CreateCatalog",
52 changes: 36 additions & 16 deletions dask_planner/src/sql/logical/projection.rs
@@ -11,6 +11,23 @@ pub struct PyProjection {
pub(crate) projection: Projection,
}

impl PyProjection {
/// Projection: Gets the names of the fields that should be projected
fn projected_expressions(&mut self, local_expr: &PyExpr) -> Vec<PyExpr> {
let mut projs: Vec<PyExpr> = Vec::new();
match &local_expr.expr {
Expr::Alias(expr, _name) => {
let ex: Expr = *expr.clone();
let mut py_expr: PyExpr = PyExpr::from(ex, Some(self.projection.input.clone()));
py_expr.input_plan = local_expr.input_plan.clone();
projs.extend_from_slice(self.projected_expressions(&py_expr).as_slice());
}
_ => projs.push(local_expr.clone()),
}
projs
}
}

#[pymethods]
impl PyProjection {
#[pyo3(name = "getColumnName")]
@@ -39,9 +56,21 @@ impl PyProjection {
_ => unimplemented!("projection.rs column_name is unimplemented for LogicalPlan variant: {:?}", self.projection.input),
}
}
Expr::Cast { expr, data_type: _ } => {
let ex_type: Expr = *expr.clone();
let py_type: PyExpr =
PyExpr::from(ex_type, Some(self.projection.input.clone()));
val = self.column_name(py_type).unwrap();
println!("Setting col name to: {:?}", val);
}
_ => panic!("not supported: {:?}", expr),
},
Expr::Column(col) => val = col.name.clone(),
Expr::Cast { expr, data_type: _ } => {
let ex_type: Expr = *expr;
let py_type: PyExpr = PyExpr::from(ex_type, Some(self.projection.input.clone()));
val = self.column_name(py_type).unwrap()
}
_ => {
panic!(
"column_name is unimplemented for Expr variant: {:?}",
@@ -52,25 +81,16 @@
Ok(val)
}

/// Projection: Gets the names of the fields that should be projected
#[pyo3(name = "getProjectedExpressions")]
fn projected_expressions(&mut self) -> PyResult<Vec<PyExpr>> {
let mut projs: Vec<PyExpr> = Vec::new();
for expr in &self.projection.expr {
projs.push(PyExpr::from(
expr.clone(),
Some(self.projection.input.clone()),
));
}
Ok(projs)
}

#[pyo3(name = "getNamedProjects")]
fn named_projects(&mut self) -> PyResult<Vec<(String, PyExpr)>> {
let mut named: Vec<(String, PyExpr)> = Vec::new();
for expr in &self.projected_expressions().unwrap() {
let name: String = self.column_name(expr.clone()).unwrap();
named.push((name, expr.clone()));
for expression in self.projection.expr.clone() {
let mut py_expr: PyExpr = PyExpr::from(expression, Some(self.projection.input.clone()));
py_expr.input_plan = Some(self.projection.input.clone());
for expr in self.projected_expressions(&py_expr) {
let name: String = self.column_name(expr.clone()).unwrap();
named.push((name, expr.clone()));
}
}
Ok(named)
}
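The new private `projected_expressions` helper recursively unwraps `Alias` nodes so that `getNamedProjects` can pair each output name with the underlying expression. A minimal Python sketch of the same unwrapping, assuming an alias chain always bottoms out at a non-alias expression:

```python
from typing import Any, List

class Alias:
    """Toy alias node: wraps an expression under an output name."""
    def __init__(self, expr: Any, name: str):
        self.expr = expr
        self.name = name

def unwrap_aliases(expr: Any) -> List[Any]:
    # Recurse through nested Alias wrappers and keep the innermost
    # expression, mirroring the Expr::Alias arm above.
    if isinstance(expr, Alias):
        return unwrap_aliases(expr.expr)
    return [expr]

# SELECT CAST(a AS DATE) AS d projects Alias(cast_node, "d"); unwrapping
# yields the cast node, and column_name() resolves the display name.
print(unwrap_aliases(Alias(Alias("cast_node", "inner"), "d")))  # ['cast_node']
```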
1 change: 0 additions & 1 deletion dask_planner/src/sql/types.rs
@@ -4,7 +4,6 @@ pub mod rel_data_type;
pub mod rel_data_type_field;

use pyo3::prelude::*;
use pyo3::types::PyAny;
use pyo3::types::PyDict;

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
Expand Down
3 changes: 0 additions & 3 deletions dask_sql/physical/rel/base.py
@@ -105,9 +105,6 @@ def fix_dtype_to_row_type(dc: DataContainer, row_type: "RelDataType"):
expected_type = sql_to_python_type(field_type.getSqlType())
df_field_name = cc.get_backend_by_frontend_name(field_name)

logger.debug(
f"Before cast df_field_name: {df_field_name}, expected_type: {expected_type}"
)
df = cast_column_type(df, df_field_name, expected_type)

return DataContainer(df, dc.column_container)
2 changes: 1 addition & 1 deletion dask_sql/physical/rel/logical/project.py
@@ -63,7 +63,7 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
else:
random_name = new_temporary_column(df)
new_columns[random_name] = RexConverter.convert(
expr, dc, context=context
rel, expr, dc, context=context
)
logger.debug(f"Adding a new column {key} out of {expr}")
new_mappings[key] = random_name
13 changes: 5 additions & 8 deletions dask_sql/physical/rex/convert.py
@@ -15,12 +15,9 @@


_REX_TYPE_TO_PLUGIN = {
"Alias": "InputRef",
"Column": "InputRef",
"BinaryExpr": "RexCall",
"Literal": "RexLiteral",
"ScalarFunction": "RexCall",
"Cast": "RexCall",
"RexType.Reference": "InputRef",
"RexType.Call": "RexCall",
"RexType.Literal": "RexLiteral",
}


@@ -55,12 +52,12 @@ def convert(
context: "dask_sql.Context",
) -> Union[dd.DataFrame, Any]:
"""
Convert the given rel (java instance)
Convert the given Expression
into a python expression (a dask dataframe)
using the stored plugins and the dictionary of
registered dask tables.
"""
expr_type = _REX_TYPE_TO_PLUGIN[rex.getExprType()]
expr_type = _REX_TYPE_TO_PLUGIN[str(rex.getRexType())]

try:
plugin_instance = cls.get_plugin(expr_type)
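The plugin table is now keyed on the stringified `RexType` rather than per-variant `Expr` names: one entry covers every variant that maps to the same plugin, and string keys avoid hashing the pyo3-exported enum from Python (the issue noted in commit 11f2cb4). A minimal sketch of the dispatch, with a mock enum standing in for the compiled `RexType`:

```python
from enum import Enum

class RexType(Enum):  # mock of the enum exported by dask_planner
    Reference = 1
    Call = 2
    Literal = 3

_REX_TYPE_TO_PLUGIN = {
    "RexType.Reference": "InputRef",
    "RexType.Call": "RexCall",
    "RexType.Literal": "RexLiteral",
}

def plugin_for(rex_type: RexType) -> str:
    # str(RexType.Call) == "RexType.Call", matching the keys above
    return _REX_TYPE_TO_PLUGIN[str(rex_type)]

assert plugin_for(RexType.Call) == "RexCall"
```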
4 changes: 1 addition & 3 deletions dask_sql/physical/rex/core/call.py
@@ -225,9 +225,7 @@ def cast(self, operand, rex=None) -> SeriesOrScalar:
return operand

output_type = str(rex.getType())
python_type = sql_to_python_type(
output_type=sql_to_python_type(output_type.upper())
)
python_type = sql_to_python_type(SqlTypeName.fromString(output_type.upper()))

return_column = cast_column_to_type(operand, python_type)

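The removed lines nested `sql_to_python_type` inside itself and passed the result back in as a keyword argument, which could not have worked; the fix parses the type string into a `SqlTypeName` first. A sketch of the intended flow using mock stand-ins (the real `SqlTypeName.fromString` is implemented in the Rust extension):

```python
from enum import Enum

class SqlTypeName(Enum):  # mock of the Rust-backed enum
    INTEGER = "INTEGER"
    VARCHAR = "VARCHAR"

    @staticmethod
    def fromString(name: str) -> "SqlTypeName":
        # accept INT as an alias for INTEGER, mirroring commit 7db0d3c
        return SqlTypeName("INTEGER" if name == "INT" else name)

_TO_PYTHON = {SqlTypeName.INTEGER: int, SqlTypeName.VARCHAR: str}

def sql_to_python_type(sql_type: SqlTypeName) -> type:  # mock mapping
    return _TO_PYTHON[sql_type]

assert sql_to_python_type(SqlTypeName.fromString("INT")) is int
```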
8 changes: 5 additions & 3 deletions dask_sql/physical/rex/core/input_ref.py
@@ -22,12 +22,14 @@ class RexInputRefPlugin(BaseRexPlugin):
def convert(
self,
rel: "LogicalPlan",
expr: "Expression",
rex: "Expression",
dc: DataContainer,
context: "dask_sql.Context",
) -> dd.Series:
df = dc.df
cc = dc.column_container

# The column is referenced by index
column_name = str(expr.column_name(rel))
return df[column_name]
index = rex.getIndex()
backend_column_name = cc.get_backend_by_frontend_index(index)
return df[backend_column_name]
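Rather than resolving the referenced column by name through the plan, the plugin now takes the expression's positional index and maps it through the `ColumnContainer` to a backend column name. Roughly, with a mock container (the real one lives in `dask_sql.datacontainer`):

```python
from typing import List

class ColumnContainerSketch:
    """Mock: maps frontend (SQL) column positions to backend dask names."""
    def __init__(self, backend_columns: List[str]):
        self._backend = backend_columns

    def get_backend_by_frontend_index(self, index: int) -> str:
        return self._backend[index]

cc = ColumnContainerSketch(["a_0", "b_1"])
# rex.getIndex() -> 1 would select the dask series df["b_1"]
assert cc.get_backend_by_frontend_index(1) == "b_1"
```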
15 changes: 8 additions & 7 deletions tests/integration/test_filter.py
@@ -14,7 +14,6 @@ def test_filter(c, df):
assert_eq(return_df, expected_df)


@pytest.mark.skip(reason="WIP DataFusion")
def test_filter_scalar(c, df):
return_df = c.sql("SELECT * FROM df WHERE True")

@@ -37,7 +36,6 @@ def test_filter_scalar(c, df):
assert_eq(return_df, expected_df, check_index_type=False)


@pytest.mark.skip(reason="WIP DataFusion")
def test_filter_complicated(c, df):
return_df = c.sql("SELECT * FROM df WHERE a < 3 AND (b > 1 AND b < 3)")

@@ -48,7 +46,6 @@
)


@pytest.mark.skip(reason="WIP DataFusion")
def test_filter_with_nan(c):
return_df = c.sql("SELECT * FROM user_table_nan WHERE c = 3")

@@ -62,7 +59,6 @@
)


@pytest.mark.skip(reason="WIP DataFusion")
def test_string_filter(c, string_table):
return_df = c.sql("SELECT * FROM string_table WHERE a = 'a normal string'")

@@ -77,7 +73,10 @@
"input_table",
[
"datetime_table",
pytest.param("gpu_datetime_table", marks=pytest.mark.gpu),
pytest.param(
"gpu_datetime_table",
marks=(pytest.mark.gpu, pytest.mark.skip(reason="WIP DataFusion")),
),
],
)
def test_filter_cast_date(c, input_table, request):
@@ -101,7 +100,10 @@
"input_table",
[
"datetime_table",
pytest.param("gpu_datetime_table", marks=pytest.mark.gpu),
pytest.param(
"gpu_datetime_table",
marks=(pytest.mark.gpu, pytest.mark.skip(reason="WIP DataFusion")),
),
],
)
def test_filter_cast_timestamp(c, input_table, request):
@@ -206,7 +208,6 @@ def test_predicate_pushdown(c, parquet_ddf, query, df_func, filters):
assert_eq(return_df, expected_df, check_divisions=False)


@pytest.mark.skip(reason="WIP DataFusion")
def test_filtered_csv(tmpdir, c):
# Predicate pushdown is NOT supported for CSV data.
# This test just checks that the "attempted"
2 changes: 0 additions & 2 deletions tests/integration/test_select.py
@@ -144,7 +144,6 @@ def test_limit(c, input_table, limit, offset, request):
assert_eq(c.sql(query), long_table.iloc[offset : offset + limit if limit else None])


@pytest.mark.skip(reason="WIP DataFusion")
@pytest.mark.parametrize(
"input_table",
[
@@ -178,7 +177,6 @@ def test_date_casting(c, input_table, request):
assert_eq(result_df, expected_df)


@pytest.mark.skip(reason="WIP DataFusion")
@pytest.mark.parametrize(
"input_table",
[
1 change: 1 addition & 0 deletions tests/unit/test_context.py
@@ -116,6 +116,7 @@ def test_sql(gpu):
assert_eq(result, data_frame)


@pytest.mark.skip(reason="WIP DataFusion - missing create statement logic")
@pytest.mark.parametrize(
"gpu",
[