Merged

Commits (90)
b1900cf
Condition for BinaryExpr, filter, input_ref, rexcall, and rexliteral
jdye64 Mar 26, 2022
1e48597
Updates for test_filter
jdye64 Mar 31, 2022
fd41a8c
more of test_filter.py working with the exception of some date pytests
jdye64 Mar 31, 2022
682c009
Add workflow to keep datafusion dev branch up to date (#440)
charlesbluca Mar 25, 2022
ab69dd8
Include setuptools-rust in conda build recipe, in host and run
jdye64 Apr 13, 2022
ce4c31e
Remove PyArrow dependency
jdye64 Apr 20, 2022
8785b8c
rebase with datafusion-sql-planner
jdye64 Apr 21, 2022
3e45ab8
refactor changes that were inadvertent during rebase
jdye64 Apr 21, 2022
1734b89
timestamp with local time zone
jdye64 Apr 21, 2022
ac7d9f6
Bump DataFusion version (#494)
andygrove Apr 21, 2022
cbf5db0
Include RelDataType work
jdye64 Apr 21, 2022
d9380a6
Include RelDataType work
jdye64 Apr 21, 2022
ad56fc2
Introduced SqlTypeName Enum in Rust and mappings for Python
jdye64 Apr 22, 2022
7b20e66
impl PyExpr.getIndex()
jdye64 Apr 22, 2022
7dd2017
add getRowType() for logical.rs
jdye64 Apr 22, 2022
984f523
Introduce DaskTypeMap for storing correlating SqlTypeName and DataTypes
jdye64 Apr 23, 2022
1405fea
use str values instead of Rust Enums, Python is unable to Hash the Ru…
jdye64 Apr 23, 2022
789aaad
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
652205e
linter changes, why did that work on my local pre-commit??
jdye64 Apr 23, 2022
5127f87
Convert final strs to SqlTypeName Enum
jdye64 Apr 24, 2022
cf568dc
removed a few print statements
jdye64 Apr 24, 2022
4fb640e
commit to share with colleague
jdye64 Apr 24, 2022
32127e5
updates
jdye64 Apr 25, 2022
f5e24fe
checkpoint
jdye64 Apr 25, 2022
11cf212
Temporarily disable conda run_test.py script since it uses features n…
jdye64 Apr 25, 2022
46dfb0a
formatting after upstream merge
jdye64 Apr 25, 2022
fa71674
expose fromString method for SqlTypeName to use Enums instead of stri…
jdye64 Apr 25, 2022
f6e86ca
expanded SqlTypeName from_string() support
jdye64 Apr 25, 2022
3d1a5ad
accept INT as INTEGER
jdye64 Apr 25, 2022
384e446
tests update
jdye64 Apr 25, 2022
199b9d2
checkpoint
jdye64 Apr 25, 2022
c9dffae
checkpoint
jdye64 Apr 27, 2022
c9aad43
Refactor PyExpr by removing From trait, and using recursion to expand…
jdye64 Apr 28, 2022
11100fa
skip test that uses create statement for gpuci
jdye64 Apr 28, 2022
643e85d
Basic DataFusion Select Functionality (#489)
jdye64 Apr 28, 2022
b36ef16
updates for expression
jdye64 Apr 28, 2022
5c94fbc
uncommented pytests
jdye64 Apr 28, 2022
bb461c8
uncommented pytests
jdye64 Apr 28, 2022
f65b1ab
code cleanup for review
jdye64 Apr 28, 2022
dc7553f
code cleanup for review
jdye64 Apr 28, 2022
f1dc0b2
Enabled more pytest that work now
jdye64 Apr 28, 2022
940e867
Enabled more pytest that work now
jdye64 Apr 28, 2022
6769ca0
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
c4ed9bd
Output Expression as String when BinaryExpr does not contain a named …
jdye64 Apr 29, 2022
05c5788
Disable 2 pytest that are causing gpuCI issues. They will be address …
jdye64 Apr 29, 2022
a33aa63
Handle Between operation for case-when
jdye64 Apr 29, 2022
20efd5c
adjust timestamp casting
jdye64 May 2, 2022
281baf7
merge with upstream
jdye64 May 6, 2022
d666bdd
merge with upstream/datafusion-sql-planner
jdye64 May 9, 2022
533f50a
Refactor projection _column_name() logic to the _column_name logic in…
jdye64 May 9, 2022
a42a133
removed println! statements
jdye64 May 9, 2022
dc12f5d
introduce join getCondition() logic for retrieving the combining Rex …
jdye64 May 10, 2022
9dce68a
merge with upstream
jdye64 May 10, 2022
10cd463
merge with upstream
jdye64 May 10, 2022
a1841c3
Updates from review
jdye64 May 11, 2022
3001943
Add Offset and point to repo with offset in datafusion
jdye64 May 11, 2022
7ec66da
Introduce offset
jdye64 May 12, 2022
b72917b
limit updates
jdye64 May 12, 2022
651c9ab
commit before upstream merge
jdye64 May 15, 2022
4e69813
merged with upstream/datafusion-sql-planner
jdye64 May 16, 2022
3219ad0
Code formatting
jdye64 May 16, 2022
5a88155
Merge with upstream
jdye64 May 16, 2022
23adefa
Merge with upstream
jdye64 May 16, 2022
bd94ccf
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 17, 2022
bf91e8f
update Cargo.toml to use Arrow-DataFusion version with LIMIT logic
jdye64 May 17, 2022
3dc6a89
Bump DataFusion version to get changes around variant_name()
jdye64 May 18, 2022
08b38aa
Use map partitions for determining the offset
jdye64 May 19, 2022
7b52f41
Merge with upstream datafusion-crossjoin merge
jdye64 May 19, 2022
6638930
Added multiple LogicalPlan inputs for join conditions
jdye64 May 20, 2022
e24b97f
Merge with upstream LIMIT PR
jdye64 May 20, 2022
61bd864
Merge remote-tracking branch 'upstream/datafusion-sql-planner' into d…
jdye64 May 22, 2022
e3b0d2b
Merge with upstream
jdye64 May 23, 2022
0407c6f
Rename underlying DataContainer's DataFrame instance to match the col…
jdye64 May 23, 2022
af1c138
Adjust ColumnContainer mapping after join.py logic to entire the bake…
jdye64 May 23, 2022
8853765
Add enumerate to column_{i} generation string to ensure columns exist…
jdye64 May 24, 2022
2adc5ce
Adjust join schema logic to perform merge instead of join on rust sid…
jdye64 May 24, 2022
6005018
Handle DataFusion COUNT(UInt8(1)) as COUNT(*)
jdye64 May 24, 2022
f640e1d
commit before merge
jdye64 May 24, 2022
f0cc07b
merge with upstream datafusion-sql-planner
jdye64 May 24, 2022
3159645
Update function for gathering index of a expression
jdye64 May 24, 2022
ba8cec2
Update for review check
jdye64 May 25, 2022
a8fba46
Adjust RelDataType to retrieve fully qualified column names
jdye64 May 26, 2022
8a1a865
Adjust base.py to get fully qualified column name
jdye64 May 26, 2022
6e966b6
Enable passing pytests in test_join.py
jdye64 May 26, 2022
b9604cc
Adjust keys provided by getting backend column mapping name
jdye64 May 27, 2022
014fe68
Adjust output_col to not use the backend_column name for special rese…
jdye64 May 27, 2022
5b0dba3
uncomment cross join pytest which works now
jdye64 May 27, 2022
d17d859
Uncomment passing pytests in test_select.py
jdye64 May 27, 2022
805ec8a
Review updates
jdye64 May 28, 2022
7728bd4
Add back complex join case condition, not just cross join but 'comple…
jdye64 May 28, 2022
2 changes: 0 additions & 2 deletions continuous_integration/environment-3.9-dev.yaml
@@ -2,8 +2,6 @@ name: dask-sql
channels:
- conda-forge
- nodefaults
- rapidsai-nightly
- nvidia
dependencies:
- adagio>=0.2.3
- antlr4-python3-runtime>=4.9.2, <4.10.0 # Remove max pin after qpd(fugue dependency) updates their conda recipe
67 changes: 54 additions & 13 deletions dask_planner/src/expression.rs
@@ -17,15 +17,16 @@ use datafusion::prelude::Column;

use crate::sql::exceptions::py_runtime_err;
use datafusion::common::DFField;
use datafusion::logical_plan::exprlist_to_fields;
use datafusion::logical_plan::{exprlist_to_fields, DFSchema};
use std::sync::Arc;

/// A PyExpr that can be used on a DataFrame
#[pyclass(name = "Expression", module = "datafusion", subclass)]
#[derive(Debug, Clone)]
pub struct PyExpr {
pub input_plan: Option<Arc<LogicalPlan>>,
pub expr: Expr,
// Why a Vec here? Because BinaryExpr on Join might have multiple LogicalPlans
pub input_plan: Option<Vec<Arc<LogicalPlan>>>,
}
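
A join predicate is the motivating case for the Vec above: its BinaryExpr compares columns that live in two different input plans, so a single Arc<LogicalPlan> cannot resolve both sides. A minimal sketch, assuming a DataFusion Join node named join is in scope (hypothetical names, not part of the diff):

// Hypothetical illustration: a predicate like l.id = r.id pulls one column
// from the join's left input and one from its right, so the PyExpr must
// carry both plans for later name and index resolution.
let condition = Expr::BinaryExpr {
    left: Box::new(Expr::Column(Column::from_qualified_name("l.id"))),
    op: Operator::Eq,
    right: Box::new(Expr::Column(Column::from_qualified_name("r.id"))),
};
let py_expr = PyExpr::from(
    condition,
    Some(vec![join.left.clone(), join.right.clone()]),
);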

impl From<PyExpr> for Expr {
@@ -57,7 +58,7 @@ impl PyExpr {
/// However, in this case Expr does not contain the contextual
/// `LogicalPlan` instance that we need, so we need to make an instance
/// function to take it and create the PyExpr.
pub fn from(expr: Expr, input: Option<Arc<LogicalPlan>>) -> PyExpr {
pub fn from(expr: Expr, input: Option<Vec<Arc<LogicalPlan>>>) -> PyExpr {
PyExpr {
input_plan: input,
expr: expr,
@@ -67,7 +68,7 @@
/// Determines the name of the `Expr` instance by examining the LogicalPlan
pub fn _column_name(&self, plan: &LogicalPlan) -> Result<String> {
let field = expr_to_field(&self.expr, &plan)?;
Ok(field.unqualified_column().name.clone())
Ok(field.qualified_column().flat_name().clone())
}

fn _rex_type(&self, expr: &Expr) -> RexType {
@@ -123,16 +124,56 @@
/// Gets the positional index of the Expr instance from the LogicalPlan DFSchema
#[pyo3(name = "getIndex")]
pub fn index(&self) -> PyResult<usize> {
let input: &Option<Arc<LogicalPlan>> = &self.input_plan;
let input: &Option<Vec<Arc<LogicalPlan>>> = &self.input_plan;
match input {
Some(plan) => {
let name: Result<String> = self.expr.name(plan.schema());
match name {
Ok(fq_name) => Ok(plan
.schema()
.index_of_column(&Column::from_qualified_name(&fq_name))
.unwrap()),
Err(e) => panic!("{:?}", e),
Some(input_plans) => {
if input_plans.len() == 1 {
let name: Result<String> = self.expr.name(input_plans[0].schema());
match name {
Ok(fq_name) => Ok(input_plans[0]
.schema()
.index_of_column(&Column::from_qualified_name(&fq_name))
.unwrap()),
Err(e) => panic!("{:?}", e),
}
} else if input_plans.len() >= 2 {
let mut base_schema: DFSchema = (**input_plans[0].schema()).clone();
for input_idx in 1..input_plans.len() {
let input_schema: DFSchema = (**input_plans[input_idx].schema()).clone();
base_schema.merge(&input_schema);
}
let name: Result<String> = self.expr.name(&base_schema);
match name {
Ok(fq_name) => {
let idx: Result<usize> =
base_schema.index_of_column(&Column::from_qualified_name(&fq_name));
match idx {
Ok(index) => Ok(index),
Err(e) => {
// This logic is encountered when a non-qualified column name is
// provided AND there exists more than one entry with that
// unqualified name. This logic attempts to narrow down to the
// single qualified column name.
let qualified_fields: Vec<&DFField> =
base_schema.fields_with_unqualified_name(&fq_name);
for qf in &qualified_fields {
if qf.name().eq(&fq_name) {
let qualifier: String = qf.qualifier().unwrap().clone();
let qual: Option<&str> = Some(&qualifier);
let index: usize = base_schema
.index_of_column_by_name(qual, &qf.name())
.unwrap();
return Ok(index);
}
}
panic!("Unable to find match for column with name: '{}' in DFSchema", &fq_name);
}
}
}
Err(e) => panic!("{:?}", e),
}
} else {
panic!("Encountered a PyExpr with an empty Vec of input LogicalPlans; unable to determine an index");
}
}
None => {
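
The multi-input branch above amounts to merging every input schema and resolving the fully qualified column name against the combined schema. A condensed sketch of that lookup, written as a hypothetical free function over the same DataFusion APIs the diff already uses (it omits the unqualified-name fallback handled above):

// Sketch: resolve an expression's positional index across several inputs by
// merging their schemas, mirroring the happy path of PyExpr::index() above.
fn index_across_inputs(expr: &Expr, inputs: &[Arc<LogicalPlan>]) -> Result<usize> {
    let mut schema: DFSchema = (**inputs[0].schema()).clone();
    for plan in &inputs[1..] {
        schema.merge(plan.schema());
    }
    let name: String = expr.name(&schema)?;
    schema.index_of_column(&Column::from_qualified_name(&name))
}
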
37 changes: 27 additions & 10 deletions dask_planner/src/sql/logical.rs
@@ -14,7 +14,8 @@ mod sort;

use datafusion::logical_expr::LogicalPlan;

use datafusion::common::Result;
use datafusion::common::{DataFusionError, Result};
use datafusion::logical_plan::DFSchemaRef;
use datafusion::prelude::Column;

use crate::sql::exceptions::py_type_err;
@@ -116,15 +117,6 @@ impl PyLogicalPlan {
Ok(py_inputs)
}

/// Examines the current_node and get the fields associated with it
pub fn get_field_names(&mut self) -> PyResult<Vec<String>> {
let mut field_names: Vec<String> = Vec::new();
for field in self.current_node().schema().fields() {
field_names.push(String::from(field.name()));
}
Ok(field_names)
}

/// If the LogicalPlan represents access to a Table that instance is returned
/// otherwise None is returned
#[pyo3(name = "getTable")]
@@ -137,6 +129,31 @@
}
}

#[pyo3(name = "getCurrentNodeSchemaName")]
pub fn get_current_node_schema_name(&self) -> PyResult<&str> {
match &self.current_node {
Some(e) => {
let _sch: &DFSchemaRef = e.schema();
//TODO: Where can I actually get this in the context of the running query?
Ok("root")
}
None => Err(py_type_err(DataFusionError::Plan(
"Unable to determine the current node schema name".to_string(),
))),
}
}

#[pyo3(name = "getCurrentNodeTableName")]
pub fn get_current_node_table_name(&mut self) -> PyResult<String> {
match self.table() {
Ok(dask_table) => Ok(dask_table.name.clone()),
Err(_e) => Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
"Unable to determine current node table name",
)),
}
}

/// Gets the Relation "type" of the current node. Ex: Projection, TableScan, etc
pub fn get_current_node_type(&mut self) -> PyResult<&str> {
Ok(match self.current_node() {
6 changes: 3 additions & 3 deletions dask_planner/src/sql/logical/aggregate.rs
@@ -21,7 +21,7 @@ impl PyAggregate {
for expr in &self.aggregate.group_expr {
group_exprs.push(PyExpr::from(
expr.clone(),
Some(self.aggregate.input.clone()),
Some(vec![self.aggregate.input.clone()]),
));
}
Ok(group_exprs)
@@ -33,7 +33,7 @@
for expr in &self.aggregate.aggr_expr {
agg_exprs.push(PyExpr::from(
expr.clone(),
Some(self.aggregate.input.clone()),
Some(vec![self.aggregate.input.clone()]),
));
}
Ok(agg_exprs)
@@ -54,7 +54,7 @@
let mut exprs: Vec<PyExpr> = Vec::new();
for expr in args {
exprs.push(PyExpr {
input_plan: Some(self.aggregate.input.clone()),
input_plan: Some(vec![self.aggregate.input.clone()]),
expr: expr,
});
}
2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/filter.rs
@@ -19,7 +19,7 @@
pub fn get_condition(&mut self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
self.filter.predicate.clone(),
Some(self.filter.input.clone()),
Some(vec![self.filter.input.clone()]),
))
}
}
40 changes: 39 additions & 1 deletion dask_planner/src/sql/logical/join.rs
@@ -1,7 +1,11 @@
use crate::expression::PyExpr;
use crate::sql::column;

use datafusion::physical_plan::expressions::Column;

use datafusion::logical_expr::logical_plan::Join;
use datafusion::logical_plan::{JoinType, LogicalPlan};
use datafusion::logical_plan::{JoinType, LogicalPlan, Operator};
use datafusion::prelude::{col, Expr};

use crate::sql::exceptions::py_type_err;
use pyo3::prelude::*;
@@ -14,6 +18,40 @@ pub struct PyJoin {

#[pymethods]
impl PyJoin {
#[pyo3(name = "getCondition")]
pub fn join_condition(&self) -> PyExpr {
// TODO: This logic should be altered once https://github.com/apache/arrow-datafusion/issues/2496 is complete
if self.join.on.len() >= 1 {
let (left_col, right_col) = &self.join.on[0];
let mut root_expr: Expr = Expr::BinaryExpr {
left: Box::new(Expr::Column(left_col.clone())),
op: Operator::Eq,
right: Box::new(Expr::Column(right_col.clone())),
};
for idx in 1..self.join.on.len() {
let (left_col, right_col) = &self.join.on[idx];
let ex: Expr = Expr::BinaryExpr {
left: Box::new(Expr::Column(left_col.clone())),
op: Operator::Eq,
right: Box::new(Expr::Column(right_col.clone())),
};

// Combine successive equijoin predicates with a logical AND
root_expr = Expr::BinaryExpr {
left: Box::new(root_expr),
op: Operator::And,
right: Box::new(ex),
}
}
PyExpr::from(
root_expr,
Some(vec![self.join.left.clone(), self.join.right.clone()]),
)
} else {
panic!("Encountered a Join without any equijoin column pairs (on.len() == {}). Non-equijoin conditions are not currently supported
until DataFusion makes some changes to allow for joining logic other than just Equijoin.", self.join.on.len())
}
}
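
The loop above folds the equijoin pairs into a single conjunctive predicate. The same fold as a standalone sketch, assuming DataFusion's logical Column type (datafusion::prelude::Column) for the pairs (hypothetical helper, not part of the PR):

// Sketch: fold equijoin column pairs into one (l = r) AND (l = r) ... predicate.
fn fold_equijoin(on: &[(Column, Column)]) -> Option<Expr> {
    on.iter()
        .map(|(l, r)| Expr::BinaryExpr {
            left: Box::new(Expr::Column(l.clone())),
            op: Operator::Eq,
            right: Box::new(Expr::Column(r.clone())),
        })
        .reduce(|acc, e| Expr::BinaryExpr {
            left: Box::new(acc),
            op: Operator::And,
            right: Box::new(e),
        })
}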

#[pyo3(name = "getJoinConditions")]
pub fn join_conditions(&mut self) -> PyResult<Vec<(column::PyColumn, column::PyColumn)>> {
let lhs_table_name: String = match &*self.join.left {
2 changes: 1 addition & 1 deletion dask_planner/src/sql/logical/limit.rs
@@ -18,7 +18,7 @@ impl PyLimit {
pub fn limit_n(&self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(self.limit.n.try_into().unwrap()))),
Some(self.limit.input.clone()),
Some(vec![self.limit.input.clone()]),
))
}
}
4 changes: 2 additions & 2 deletions dask_planner/src/sql/logical/offset.rs
@@ -18,7 +18,7 @@ impl PyOffset {
pub fn offset(&self) -> PyResult<PyExpr> {
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(self.offset.offset as u64))),
Some(self.offset.input.clone()),
Some(vec![self.offset.input.clone()]),
))
}

@@ -27,7 +27,7 @@ impl PyOffset {
// TODO: Still need to implement fetch size! For now get everything from offset on with '0'
Ok(PyExpr::from(
Expr::Literal(ScalarValue::UInt64(Some(0))),
Some(self.offset.input.clone()),
Some(vec![self.offset.input.clone()]),
))
}
}
6 changes: 3 additions & 3 deletions dask_planner/src/sql/logical/projection.rs
@@ -19,7 +19,7 @@ impl PyProjection {
match &local_expr.expr {
Expr::Alias(expr, _name) => {
let py_expr: PyExpr =
PyExpr::from(*expr.clone(), Some(self.projection.input.clone()));
PyExpr::from(*expr.clone(), Some(vec![self.projection.input.clone()]));
projs.extend_from_slice(self.projected_expressions(&py_expr).as_slice());
}
_ => projs.push(local_expr.clone()),
@@ -34,8 +34,8 @@
fn named_projects(&mut self) -> PyResult<Vec<(String, PyExpr)>> {
let mut named: Vec<(String, PyExpr)> = Vec::new();
for expression in self.projection.expr.clone() {
let mut py_expr: PyExpr = PyExpr::from(expression, Some(self.projection.input.clone()));
py_expr.input_plan = Some(self.projection.input.clone());
let mut py_expr: PyExpr =
PyExpr::from(expression, Some(vec![self.projection.input.clone()]));
for expr in self.projected_expressions(&py_expr) {
if let Ok(name) = expr._column_name(&*self.projection.input) {
named.push((name, expr.clone()));
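
projected_expressions above recursively unwraps Alias nodes so that named_projects can pair each output name with its underlying expression. The core of that unwrapping as a condensed sketch (hypothetical helper, assuming DataFusion's Expr::Alias(Box<Expr>, String) variant):

// Sketch: recursively strip Alias wrappers from an expression.
fn unalias(expr: Expr) -> Expr {
    match expr {
        Expr::Alias(inner, _name) => unalias(*inner),
        other => other,
    }
}
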
5 changes: 4 additions & 1 deletion dask_planner/src/sql/logical/sort.rs
@@ -39,7 +39,10 @@ impl PySort {
pub fn sort_expressions(&self) -> PyResult<Vec<PyExpr>> {
let mut sort_exprs: Vec<PyExpr> = Vec::new();
for expr in &self.sort.expr {
sort_exprs.push(PyExpr::from(expr.clone(), Some(self.sort.input.clone())));
sort_exprs.push(PyExpr::from(
expr.clone(),
Some(vec![self.sort.input.clone()]),
));
}
Ok(sort_exprs)
}
6 changes: 5 additions & 1 deletion dask_planner/src/sql/table.rs
@@ -177,6 +177,10 @@ pub(crate) fn table_from_logical_plan(plan: &LogicalPlan) -> Option<DaskTable> {
table_from_logical_plan(&join.left)
}
LogicalPlan::Aggregate(agg) => table_from_logical_plan(&agg.input),
_ => todo!("table_from_logical_plan: unimplemented LogicalPlan type encountered"),
LogicalPlan::SubqueryAlias(alias) => table_from_logical_plan(&alias.input),
_ => todo!(
"table_from_logical_plan: unimplemented LogicalPlan type {:?} encountered",
plan
),
}
}
4 changes: 2 additions & 2 deletions dask_planner/src/sql/types/rel_data_type.rs
@@ -8,7 +8,7 @@ const PRECISION_NOT_SPECIFIED: i32 = i32::MIN;
const SCALE_NOT_SPECIFIED: i32 = -1;

/// RelDataType represents the type of a scalar expression or entire row returned from a relational expression.
#[pyclass]
#[pyclass(name = "RelDataType", module = "dask_planner", subclass)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RelDataType {
nullable: bool,
@@ -83,7 +83,7 @@ impl RelDataType {
assert!(!self.field_list.is_empty());
let mut field_names: Vec<String> = Vec::new();
for field in &self.field_list {
field_names.push(String::from(field.name()));
field_names.push(String::from(field.qualified_name()));
}
field_names
}
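
The practical effect of the change above is that RelDataType now reports qualifier-prefixed field names. A small illustration using DataFusion's DFField as a stand-in, since it exposes the same qualified/unqualified split (hypothetical values):

// Sketch: qualified vs. unqualified field names on a DataFusion DFField.
use datafusion::arrow::datatypes::DataType;
use datafusion::common::DFField;

fn qualified_name_demo() {
    let field = DFField::new(Some("df"), "id", DataType::Int64, false);
    assert_eq!(field.name(), "id");              // unqualified name
    assert_eq!(field.qualified_name(), "df.id"); // qualifier-prefixed name
}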