From 87c9cf3cfd5bf4666c0edacd1c97ecaf0cce4540 Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Sun, 6 Sep 2020 18:01:26 +0200 Subject: [PATCH 01/10] Start adding a datacontainer This container splits up the real dataframe from the column names so that we do not need to have that many column renames anymore --- dask_sql/datacontainer.py | 92 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 dask_sql/datacontainer.py diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py new file mode 100644 index 000000000..da972fa6e --- /dev/null +++ b/dask_sql/datacontainer.py @@ -0,0 +1,92 @@ +from typing import List, Dict, Tuple, Union + +import dask.dataframe as dd + + +class ColumnContainer: + pass + + +class ColumnContainer: + def __init__( + self, + frontend_columns: List[str], + frontend_backend_mapping: Union[Dict[str, str], None] = None, + ): + self._frontend_columns = list(frontend_columns) + if frontend_backend_mapping is None: + self._frontend_backend_mapping = { + col: col for col in self._frontend_columns + } + else: + self._frontend_backend_mapping = frontend_backend_mapping + + def _copy(self) -> ColumnContainer: + return ColumnContainer(self._frontend_columns, self._frontend_backend_mapping) + + def limit_to(self, fields: List[str]) -> ColumnContainer: + assert all(f in self._frontend_backend_mapping for f in fields) + cc = self._copy() + cc._frontend_columns = list(fields) + return cc + + def rename(self, columns: Dict[str, str]) -> ColumnContainer: + cc = self._copy() + for column_from, column_to in columns.items(): + backend_column = self._frontend_backend_mapping[column_from] + cc._frontend_backend_mapping[column_to] = backend_column + + cc._frontend_columns = [ + columns[col] if col in columns else col for col in self._frontend_columns + ] + + return cc + + def mapping(self) -> List[Tuple[str, str]]: + return self._frontend_backend_mapping.items() + + def reverse_mapping(self) -> List[Tuple[str, str]]: + return {backend: frontend for frontend, backend in self.mapping()} + + @property + def columns(self) -> List[str]: + return self._frontend_columns + + def add( + self, frontend_column: str, backend_column: Union[str, None] = None + ) -> ColumnContainer: + cc = self._copy() + + cc._frontend_backend_mapping[frontend_column] = ( + backend_column or frontend_column + ) + cc._frontend_columns.append(frontend_column) + + return cc + + def get_backend_by_frontend_index(self, index: int) -> str: + frontend_column = self._frontend_columns[index] + backend_column = self._frontend_backend_mapping[frontend_column] + return backend_column + + def make_unique(self, prefix="col"): + """ + Make sure we have unique column names by calling each column + + prefix_number + + where number is the column index. 
+ """ + return self.rename( + columns={col: f"{prefix}_{i}" for i, col in enumerate(self.columns)} + ) + + +class DataContainer: + def __init__(self, df: dd.DataFrame, column_container: ColumnContainer): + self.df = df + self.column_container = column_container + + def assign(self) -> dd.DataFrame: + df = self.df.rename(columns=self.column_container.reverse_mapping()) + return df[self.column_container.columns] From 90480694ea611c579acbb7899e5d8d3117e44402 Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Sun, 6 Sep 2020 18:03:01 +0200 Subject: [PATCH 02/10] Use the new datatype in the rex classes --- dask_sql/physical/rex/base.py | 4 +++- dask_sql/physical/rex/convert.py | 7 ++++--- dask_sql/physical/rex/core/call.py | 5 +++-- dask_sql/physical/rex/core/input_ref.py | 9 +++++++-- dask_sql/physical/rex/core/literal.py | 4 +++- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/dask_sql/physical/rex/base.py b/dask_sql/physical/rex/base.py index dfe789686..652c46955 100644 --- a/dask_sql/physical/rex/base.py +++ b/dask_sql/physical/rex/base.py @@ -2,6 +2,8 @@ import dask.dataframe as dd +from dask_sql.datacontainer import DataContainer + class BaseRexPlugin: """ @@ -15,7 +17,7 @@ class BaseRexPlugin: class_name = None def convert( - self, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame + self, rex: "org.apache.calcite.rex.RexNode", dc: DataContainer ) -> Union[dd.Series, Any]: """Base method to implement""" raise NotImplementedError # pragma: no cover diff --git a/dask_sql/physical/rex/convert.py b/dask_sql/physical/rex/convert.py index 7e88340cb..bc6af5002 100644 --- a/dask_sql/physical/rex/convert.py +++ b/dask_sql/physical/rex/convert.py @@ -5,6 +5,7 @@ from dask_sql.java import get_java_class from dask_sql.utils import Pluggable from dask_sql.physical.rex.base import BaseRexPlugin +from dask_sql.datacontainer import DataContainer class RexConverter(Pluggable): @@ -30,8 +31,8 @@ def add_plugin_class(cls, plugin_class: BaseRexPlugin, replace=True): @classmethod def convert( - cls, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame - ) -> Union[dd.DataFrame, Any]: + cls, rex: "org.apache.calcite.rex.RexNode", dc: DataContainer + ) -> Union[dd.Series, Any]: """ Convert the given rel (java instance) into a python expression (a dask dataframe) @@ -47,5 +48,5 @@ def convert( f"No conversion for class {class_name} available (yet)." 
) - df = plugin_instance.convert(rex, df=df) + df = plugin_instance.convert(rex, dc=dc) return df diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py index d880b4627..991af635c 100644 --- a/dask_sql/physical/rex/core/call.py +++ b/dask_sql/physical/rex/core/call.py @@ -9,6 +9,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.utils import is_frame +from dask_sql.datacontainer import DataContainer class Operation: @@ -182,10 +183,10 @@ class RexCallPlugin: } def convert( - self, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame + self, rex: "org.apache.calcite.rex.RexNode", dc: DataContainer ) -> Union[dd.Series, Any]: # Prepare the operands by turning the RexNodes into python expressions - operands = [RexConverter.convert(o, df) for o in rex.getOperands()] + operands = [RexConverter.convert(o, dc) for o in rex.getOperands()] # Now use the operator name in the mapping operator_name = str(rex.getOperator().getName()) diff --git a/dask_sql/physical/rex/core/input_ref.py b/dask_sql/physical/rex/core/input_ref.py index 3f5b74f80..08043b5a1 100644 --- a/dask_sql/physical/rex/core/input_ref.py +++ b/dask_sql/physical/rex/core/input_ref.py @@ -1,6 +1,7 @@ import dask.dataframe as dd from dask_sql.physical.rex.base import BaseRexPlugin +from dask_sql.datacontainer import DataContainer class RexInputRefPlugin(BaseRexPlugin): @@ -13,8 +14,12 @@ class RexInputRefPlugin(BaseRexPlugin): class_name = "org.apache.calcite.rex.RexInputRef" def convert( - self, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame + self, rex: "org.apache.calcite.rex.RexNode", dc: DataContainer ) -> dd.Series: + df = dc.df + cc = dc.column_container + # The column is references by index index = rex.getIndex() - return df.iloc[:, index] + backend_column_name = cc.get_backend_by_frontend_index(index) + return df[backend_column_name] diff --git a/dask_sql/physical/rex/core/literal.py b/dask_sql/physical/rex/core/literal.py index ec5ecb302..95d292856 100644 --- a/dask_sql/physical/rex/core/literal.py +++ b/dask_sql/physical/rex/core/literal.py @@ -1,9 +1,11 @@ from typing import Any +import numpy as np import dask.dataframe as dd from dask_sql.physical.rex.base import BaseRexPlugin from dask_sql.mappings import sql_to_python_type, null_python_type +from dask_sql.datacontainer import DataContainer class RexLiteralPlugin(BaseRexPlugin): @@ -18,7 +20,7 @@ class RexLiteralPlugin(BaseRexPlugin): class_name = "org.apache.calcite.rex.RexLiteral" - def convert(self, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame) -> Any: + def convert(self, rex: "org.apache.calcite.rex.RexNode", dc: DataContainer) -> Any: literal_type = str(rex.getType()) python_type = sql_to_python_type(literal_type) From 0473dba2d073bf73b6f1f4cf83c15fd676582ba8 Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Sun, 6 Sep 2020 18:04:13 +0200 Subject: [PATCH 03/10] Use the new datatype in the context --- dask_sql/context.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dask_sql/context.py b/dask_sql/context.py index 40928402d..cb9baee53 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -4,6 +4,7 @@ from dask_sql.mappings import python_to_sql_type from dask_sql.physical.rel import RelConverter, logical from dask_sql.physical.rex import RexConverter, core +from dask_sql.datacontainer import DataContainer, ColumnContainer class Context: @@ -54,13 +55,13 @@ def __init__(self): def register_dask_table(self, df: dd.DataFrame, name: str): """ - Registering a dask table 
makes it usable in SQl queries. + Registering a dask table makes it usable in SQL queries. The name you give here can be used as table name in the SQL later. Please note, that the table is stored as it is now. If you change the table later, you need to re-register. """ - self.tables[name] = df.copy() + self.tables[name] = DataContainer(df.copy(), ColumnContainer(df.columns)) def sql(self, sql: str, debug: bool = False) -> dd.DataFrame: """ @@ -71,8 +72,8 @@ def sql(self, sql: str, debug: bool = False) -> dd.DataFrame: """ # TODO: show a nice error message if something is broken rel = self._get_ral(sql, debug=debug) - df = RelConverter.convert(rel, tables=self.tables) - return df + dc = RelConverter.convert(rel, tables=self.tables) + return dc.assign() def _get_ral(self, sql, debug: bool = False): """Helper function to turn the sql query into a relational algebra""" @@ -80,8 +81,9 @@ def _get_ral(self, sql, debug: bool = False): # currently in our list schema = DaskSchema("schema") - for name, df in self.tables.items(): + for name, dc in self.tables.items(): table = DaskTable(name) + df = dc.df for order, column in enumerate(df.columns): data_type = df[column].dtype sql_data_type = python_to_sql_type(data_type) From 069c5706ec063acadb089d5b4313df7356475b5a Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Sun, 6 Sep 2020 18:05:51 +0200 Subject: [PATCH 04/10] Use the new datatype in the physical plans --- dask_sql/physical/rel/base.py | 12 +++--- dask_sql/physical/rel/logical/aggregate.py | 48 +++++++++++++-------- dask_sql/physical/rel/logical/filter.py | 16 ++++--- dask_sql/physical/rel/logical/join.py | 45 +++++++++++++------ dask_sql/physical/rel/logical/project.py | 21 ++++++--- dask_sql/physical/rel/logical/sort.py | 18 +++++--- dask_sql/physical/rel/logical/table_scan.py | 17 ++++---- dask_sql/physical/rel/logical/union.py | 40 ++++++++++++----- dask_sql/physical/rel/logical/values.py | 9 ++-- 9 files changed, 148 insertions(+), 78 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index b5376d5c2..2d5a92b0d 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -2,6 +2,8 @@ import dask.dataframe as dd +from dask_sql.datacontainer import ColumnContainer + class BaseRelPlugin: """ @@ -22,20 +24,20 @@ def convert( @staticmethod def fix_column_to_row_type( - df: dd.DataFrame, row_type: "org.apache.calcite.rel.type.RelDataType" - ) -> dd.DataFrame: + cc: ColumnContainer, row_type: "org.apache.calcite.rel.type.RelDataType" + ) -> ColumnContainer: """ - Make sure that the given dask dataframe + Make sure that the given column container has the column names specified by the row type. We assume that the column order is already correct and will just "blindly" rename the columns. 
""" field_names = [str(x) for x in row_type.getFieldNames()] - df = df.rename(columns=dict(zip(df.columns, field_names))) + cc = cc.rename(columns=dict(zip(cc.columns, field_names))) # TODO: We can also check for the types here and do any conversions if needed - return df[field_names] + return cc.limit_to(field_names) @staticmethod def check_columns_from_row_type( diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 433791620..74405a040 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -4,6 +4,7 @@ import dask.dataframe as dd from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer, ColumnContainer class LogicalAggregatePlugin(BaseRelPlugin): @@ -33,28 +34,34 @@ class LogicalAggregatePlugin(BaseRelPlugin): } def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: - (df,) = self.assert_inputs(rel, 1, tables) + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: + (dc,) = self.assert_inputs(rel, 1, tables) + + df = dc.df + cc = dc.column_container # We make our life easier with having unique column names - df = self.make_unique(df) + cc = cc.make_unique() # I have no idea what that is, but so far it was always of length 1 assert len(rel.getGroupSets()) == 1, "Do not know how to handle this case!" # Extract the information, which columns we need to group for group_column_indices = [int(i) for i in rel.getGroupSet()] - group_columns = [df.columns[i] for i in group_column_indices] + group_columns = [ + cc.get_backend_by_frontend_index(i) for i in group_column_indices + ] # Always keep an additional column around for empty groups and aggregates - additional_column_name = str(len(df.columns)) - df = df.assign(**{additional_column_name: 1}) + additional_column_name = str(len(cc.columns)) # Collect all aggregates - aggregations, output_column_order = self._collect_aggregations( - rel, df, group_columns, additional_column_name - ) + ( + aggregations, + output_column_order, + additional_column_needed, + ) = self._collect_aggregations(rel, cc, group_columns, additional_column_name) if not group_columns: # There was actually no GROUP BY specified in the SQL @@ -64,6 +71,11 @@ def convert( # It is important to do this after creating the aggregations, # as we do not want this additional column to be used anywhere group_columns = [additional_column_name] + additional_column_needed = True + + if additional_column_needed: + df = df.assign(**{additional_column_name: 1}) + cc.add(additional_column_name) # Now we can perform the aggregates df = df.groupby(by=group_columns).agg(aggregations) @@ -73,21 +85,22 @@ def convert( # Fix the column names and the order of them, as this was messed with during the aggregations df.columns = df.columns.get_level_values(-1) - df = df[output_column_order] + cc = ColumnContainer(df.columns) - df = self.fix_column_to_row_type(df, rel.getRowType()) - - return df + cc = self.fix_column_to_row_type(cc, rel.getRowType()) + return DataContainer(df, cc) def _collect_aggregations( self, rel: "org.apache.calcite.rel.RelNode", - df: dd.DataFrame, + cc: ColumnContainer, group_columns: List[str], additional_column_name: str, ) -> Tuple[Dict[str, Dict[str, str]], List[int]]: + aggregations = defaultdict(dict) output_column_order = [] + additional_column_needed = False # SQL needs to copy the old content also. 
As the values are the same for a single group # anyways, we just use the first row @@ -114,9 +127,10 @@ def _collect_aggregations( inputs = expr.getArgList() if len(inputs) == 1: - input_column_name = df.columns[inputs[0]] + input_column_name = cc.get_backend_by_frontend_index(inputs[0]) elif len(inputs) == 0: input_column_name = additional_column_name + additional_column_needed = True else: raise NotImplementedError( "Can not cope with more than one input" @@ -125,4 +139,4 @@ def _collect_aggregations( aggregations[input_column_name][output_column_name] = aggregation_function output_column_order.append(output_column_name) - return aggregations, output_column_order + return aggregations, output_column_order, additional_column_needed diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index ec7266803..ea7749ab2 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -4,6 +4,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer class LogicalFilterPlugin(BaseRelPlugin): @@ -15,14 +16,15 @@ class LogicalFilterPlugin(BaseRelPlugin): class_name = "org.apache.calcite.rel.logical.LogicalFilter" def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: - (df,) = self.assert_inputs(rel, 1, tables) - self.check_columns_from_row_type(df, rel.getExpectedInputRowType(0)) + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: + (dc,) = self.assert_inputs(rel, 1, tables) + df = dc.df + cc = dc.column_container condition = rel.getCondition() - df_condition = RexConverter.convert(condition, df) + df_condition = RexConverter.convert(condition, dc) df = df[df_condition] - df = self.fix_column_to_row_type(df, rel.getRowType()) - return df + cc = self.fix_column_to_row_type(cc, rel.getRowType()) + return DataContainer(df, cc) diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index a2a4f4cad..d76c658f0 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -8,6 +8,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.java import get_short_java_class from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer, ColumnContainer class LogicalJoinPlugin(BaseRelPlugin): @@ -36,20 +37,28 @@ class LogicalJoinPlugin(BaseRelPlugin): } def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: # Joining is a bit more complicated, so lets do it in steps: # 1. We now have two inputs (from left and right), so we fetch them both - df_lhs, df_rhs = self.assert_inputs(rel, 2, tables) + dc_lhs, dc_rhs = self.assert_inputs(rel, 2, tables) + cc_lhs = dc_lhs.column_container + cc_rhs = dc_rhs.column_container # 2. dask's merge will do some smart things with columns, which have the same name # on lhs an rhs (which also includes reordering). # However, that will confuse our column numbering in SQL. 
# So we make our life easier by converting the column names into unique names # We will convert back in the end - df_lhs_renamed = self.make_unique(df_lhs, "lhs") - df_rhs_renamed = self.make_unique(df_rhs, "rhs") + cc_lhs_renamed = cc_lhs.make_unique("lhs") + cc_rhs_renamed = cc_rhs.make_unique("rhs") + + dc_lhs_renamed = DataContainer(dc_lhs.df, cc_lhs_renamed) + dc_rhs_renamed = DataContainer(dc_rhs.df, cc_rhs_renamed) + + df_lhs_renamed = dc_lhs_renamed.assign() + df_rhs_renamed = dc_rhs_renamed.assign() join_type = rel.getJoinType() join_type = self.JOIN_TYPE_MAPPING[str(join_type)] @@ -70,7 +79,7 @@ def convert( # The given column indices are for the full, merged table which consists # of lhs and rhs put side-by-side (in this order) # We therefore need to normalize the rhs indices relative to the rhs table. - rhs_on = [index - len(df_lhs.columns) for index in rhs_on] + rhs_on = [index - len(df_lhs_renamed.columns) for index in rhs_on] # 4. dask can only merge on the same column names. # We therefore create new columns on purpose, which have a distinct name. @@ -109,21 +118,33 @@ def convert( # 6. So the next step is to make sure # we have the correct column order (and to remove the temporary join columns) - df = df[list(df_lhs_renamed.columns) + list(df_rhs_renamed.columns)] + correct_column_order = list(df_lhs_renamed.columns) + list( + df_rhs_renamed.columns + ) + cc = ColumnContainer(df.columns).limit_to(correct_column_order) + + # and to rename them like the rel specifies + row_type = rel.getRowType() + field_specifications = [str(f) for f in row_type.getFieldNames()] + cc = cc.rename( + { + from_col: to_col + for from_col, to_col in zip(cc.columns, field_specifications) + } + ) + dc = DataContainer(df, cc) # 7. Last but not least we apply any filters by and-chaining together the filters if filter_condition: # This line is a bit of code duplication with RexCallPlugin - but I guess it is worth to keep it separate filter_condition = reduce( operator.and_, - [RexConverter.convert(rex, df) for rex in filter_condition], + [RexConverter.convert(rex, dc) for rex in filter_condition], ) df = df[filter_condition] + dc = DataContainer(df, cc) - # Now we go back to the names requested by the rel - df = self.fix_column_to_row_type(df, rel.getRowType()) - - return df + return dc def _split_join_condition( self, join_condition: "org.apache.calcite.rex.RexCall" diff --git a/dask_sql/physical/rel/logical/project.py b/dask_sql/physical/rel/logical/project.py index a639eb2fe..930feef4f 100644 --- a/dask_sql/physical/rel/logical/project.py +++ b/dask_sql/physical/rel/logical/project.py @@ -4,6 +4,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer class LogicalProjectPlugin(BaseRelPlugin): @@ -19,20 +20,26 @@ def convert( self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] ) -> dd.DataFrame: # Get the input of the previous step - (df,) = self.assert_inputs(rel, 1, tables) + (dc,) = self.assert_inputs(rel, 1, tables) - # It is easiest to just replace all columns with the new ones + df = dc.df + cc = dc.column_container + + # Collect all new columns named_projects = rel.getNamedProjects() + # TODO: we do not need to create a new column if we already have it new_columns = {} for expr, key in named_projects: - new_columns[str(key)] = RexConverter.convert(expr, df) + new_columns[str(key)] = RexConverter.convert(expr, dc=dc) - df = 
df.drop(columns=list(df.columns)).assign(**new_columns) + df = df.assign(**new_columns) + for new_column in new_columns: + cc = cc.add(new_column) # Make sure the order is correct column_names = list(new_columns.keys()) - df = df[column_names] + cc = cc.limit_to(column_names) - df = self.fix_column_to_row_type(df, rel.getRowType()) - return df + cc = self.fix_column_to_row_type(cc, rel.getRowType()) + return DataContainer(df, cc) diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py index ca38a2bfd..8afe80a6e 100644 --- a/dask_sql/physical/rel/logical/sort.py +++ b/dask_sql/physical/rel/logical/sort.py @@ -7,6 +7,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer class LogicalSortPlugin(BaseRelPlugin): @@ -19,13 +20,17 @@ class LogicalSortPlugin(BaseRelPlugin): class_name = "org.apache.calcite.rel.logical.LogicalSort" def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: - (df,) = self.assert_inputs(rel, 1, tables) - self.check_columns_from_row_type(df, rel.getExpectedInputRowType(0)) + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: + (dc,) = self.assert_inputs(rel, 1, tables) + df = dc.df + cc = dc.column_container sort_collation = rel.getCollation().getFieldCollations() - sort_columns = [df.columns[int(x.getFieldIndex())] for x in sort_collation] + sort_columns = [ + cc.get_backend_by_frontend_index(int(x.getFieldIndex())) + for x in sort_collation + ] sort_ascending = [str(x.getDirection()) == "ASCENDING" for x in sort_collation] offset = rel.offset @@ -45,8 +50,7 @@ def convert( if offset is not None or end is not None: df = self._apply_offset(df, offset, end) - df = self.fix_column_to_row_type(df, rel.getRowType()) - return df + return DataContainer(df, cc) def _apply_sort( self, df: dd.DataFrame, sort_columns: List[str], sort_ascending: List[bool] diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 9baf0b20b..4bea7fdbc 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -1,8 +1,7 @@ from typing import Dict -import dask.dataframe as dd - from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer class LogicalTableScanPlugin(BaseRelPlugin): @@ -20,8 +19,8 @@ class LogicalTableScanPlugin(BaseRelPlugin): class_name = "org.apache.calcite.rel.logical.LogicalTableScan" def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: # There should not be any input. This is the first step. 
self.assert_inputs(rel, 0) @@ -37,12 +36,14 @@ def convert( assert len(table_names) == 2 table_name = table_names[1] - df = tables[table_name] + dc = tables[table_name] + df = dc.df + cc = dc.column_container # Make sure we only return the requested columns row_type = table.getRowType() field_specifications = [str(f) for f in row_type.getFieldNames()] - df = df[field_specifications] + cc = cc.limit_to(field_specifications) - df = self.fix_column_to_row_type(df, rel.getRowType()) - return df + cc = self.fix_column_to_row_type(cc, rel.getRowType()) + return DataContainer(df, cc) diff --git a/dask_sql/physical/rel/logical/union.py b/dask_sql/physical/rel/logical/union.py index 2f6aeccdb..dead69334 100644 --- a/dask_sql/physical/rel/logical/union.py +++ b/dask_sql/physical/rel/logical/union.py @@ -4,6 +4,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer import DataContainer, ColumnContainer class LogicalUnionPlugin(BaseRelPlugin): @@ -15,26 +16,42 @@ class LogicalUnionPlugin(BaseRelPlugin): class_name = "org.apache.calcite.rel.logical.LogicalUnion" def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: - first_df, second_df = self.assert_inputs(rel, 2, tables) + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: + first_dc, second_dc = self.assert_inputs(rel, 2, tables) + + first_df = first_dc.df + first_cc = first_dc.column_container + + second_df = second_dc.df + second_cc = second_dc.column_container # For concatenating, they should have exactly the same fields output_field_names = [str(x) for x in rel.getRowType().getFieldNames()] - assert len(first_df.columns) == len(output_field_names) - first_df = first_df.rename( + assert len(first_cc.columns) == len(output_field_names) + first_cc = first_cc.rename( columns={ col: output_col - for col, output_col in zip(first_df.columns, output_field_names) + for col, output_col in zip(first_cc.columns, output_field_names) } ) + first_dc = DataContainer(first_df, first_cc) - assert len(second_df.columns) == len(output_field_names) - second_df = second_df.rename( + assert len(second_cc.columns) == len(output_field_names) + second_cc = second_cc.rename( columns={ col: output_col - for col, output_col in zip(second_df.columns, output_field_names) + for col, output_col in zip(second_cc.columns, output_field_names) } ) + second_dc = DataContainer(second_df, second_cc) + + # To concat the two dataframes, we need to make sure the + # columns actually have the specified names in the + # column containers + # Otherwise the concat won't work + first_df = first_dc.assign() + second_df = second_dc.assign() self.check_columns_from_row_type(first_df, rel.getExpectedInputRowType(0)) self.check_columns_from_row_type(second_df, rel.getExpectedInputRowType(1)) @@ -44,5 +61,6 @@ def convert( if not rel.all: df = df.drop_duplicates() - df = self.fix_column_to_row_type(df, rel.getRowType()) - return df + cc = ColumnContainer(df.columns) + cc = self.fix_column_to_row_type(cc, rel.getRowType()) + return DataContainer(df, cc) diff --git a/dask_sql/physical/rel/logical/values.py b/dask_sql/physical/rel/logical/values.py index 05603613d..a49bd77f1 100644 --- a/dask_sql/physical/rel/logical/values.py +++ b/dask_sql/physical/rel/logical/values.py @@ -5,6 +5,7 @@ from dask_sql.physical.rex import RexConverter from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.datacontainer
import DataContainer, ColumnContainer class LogicalValuesPlugin(BaseRelPlugin): @@ -24,8 +25,8 @@ class LogicalValuesPlugin(BaseRelPlugin): class_name = "org.apache.calcite.rel.logical.LogicalValues" def convert( - self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, dd.DataFrame] - ) -> dd.DataFrame: + self, rel: "org.apache.calcite.rel.RelNode", tables: Dict[str, DataContainer] + ) -> DataContainer: # There should not be any input. This is the first step. self.assert_inputs(rel, 0) @@ -37,6 +38,6 @@ def convert( # We assume here that when using the values plan, the resulting dataframe will be quite small # TODO: we explicitely reference pandas and dask here -> might we worth making this more general df = dd.from_pandas(pd.DataFrame(rows), npartitions=1) - df = self.fix_column_to_row_type(df, rel.getRowType()) + cc = ColumnContainer(df.columns) - return df + return DataContainer(df, cc) From 43854ef694a9ddd2edfb627f6234930e455961da Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Sun, 6 Sep 2020 18:23:42 +0200 Subject: [PATCH 05/10] Remove the now outdated make_unique function --- dask_sql/physical/rel/base.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/dask_sql/physical/rel/base.py b/dask_sql/physical/rel/base.py index 2d5a92b0d..92c134fa9 100644 --- a/dask_sql/physical/rel/base.py +++ b/dask_sql/physical/rel/base.py @@ -75,16 +75,3 @@ def assert_inputs( from dask_sql.physical.rel.convert import RelConverter return [RelConverter.convert(input_rel, tables) for input_rel in input_rels] - - @staticmethod - def make_unique(df, prefix="col"): - """ - Make sure we have unique column names by calling each column - - prefix_number - - where number is the column index. - """ - return df.rename( - columns={col: f"{prefix}_{i}" for i, col in enumerate(df.columns)} - ) From 4c1483ae7cd6db72fb078ec3b629446fce325011 Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Tue, 8 Sep 2020 18:22:03 +0200 Subject: [PATCH 06/10] Make sure to always have a str column --- dask_sql/datacontainer.py | 37 ++++++++++++++++--------- dask_sql/physical/rel/logical/values.py | 19 ++++++++++--- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py index da972fa6e..5ec041bc5 100644 --- a/dask_sql/datacontainer.py +++ b/dask_sql/datacontainer.py @@ -3,6 +3,9 @@ import dask.dataframe as dd +ColumnType = Union[str, int] + + class ColumnContainer: pass @@ -11,8 +14,11 @@ class ColumnContainer: def __init__( self, frontend_columns: List[str], - frontend_backend_mapping: Union[Dict[str, str], None] = None, + frontend_backend_mapping: Union[Dict[str, ColumnType], None] = None, ): + assert all( + isinstance(col, str) for col in frontend_columns + ), "All frontend columns need to be of string type" self._frontend_columns = list(frontend_columns) if frontend_backend_mapping is None: self._frontend_backend_mapping = { @@ -27,26 +33,24 @@ def _copy(self) -> ColumnContainer: def limit_to(self, fields: List[str]) -> ColumnContainer: assert all(f in self._frontend_backend_mapping for f in fields) cc = self._copy() - cc._frontend_columns = list(fields) + cc._frontend_columns = [str(x) for x in fields] return cc - def rename(self, columns: Dict[str, str]) -> ColumnContainer: + def rename(self, columns: Dict[str, ColumnType]) -> ColumnContainer: cc = self._copy() for column_from, column_to in columns.items(): - backend_column = self._frontend_backend_mapping[column_from] - cc._frontend_backend_mapping[column_to] = backend_column + backend_column = 
self._frontend_backend_mapping[str(column_from)] + cc._frontend_backend_mapping[str(column_to)] = backend_column cc._frontend_columns = [ - columns[col] if col in columns else col for col in self._frontend_columns + str(columns[col]) if col in columns else col + for col in self._frontend_columns ] return cc def mapping(self) -> List[Tuple[str, str]]: - return self._frontend_backend_mapping.items() - - def reverse_mapping(self) -> List[Tuple[str, str]]: - return {backend: frontend for frontend, backend in self.mapping()} + return list(self._frontend_backend_mapping.items()) @property def columns(self) -> List[str]: @@ -57,7 +61,9 @@ def add( ) -> ColumnContainer: cc = self._copy() - cc._frontend_backend_mapping[frontend_column] = ( + frontend_column = str(frontend_column) + + cc._frontend_backend_mapping[frontend_column] = str( backend_column or frontend_column ) cc._frontend_columns.append(frontend_column) @@ -78,7 +84,7 @@ def make_unique(self, prefix="col"): where number is the column index. """ return self.rename( - columns={col: f"{prefix}_{i}" for i, col in enumerate(self.columns)} + columns={str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)} ) @@ -88,5 +94,10 @@ def __init__(self, df: dd.DataFrame, column_container: ColumnContainer): self.column_container = column_container def assign(self) -> dd.DataFrame: - df = self.df.rename(columns=self.column_container.reverse_mapping()) + df = self.df.assign( + **{ + col_from: self.df[col_to] + for col_from, col_to in self.column_container.mapping() + } + ) return df[self.column_container.columns] diff --git a/dask_sql/physical/rel/logical/values.py b/dask_sql/physical/rel/logical/values.py index a49bd77f1..0e3a5ecde 100644 --- a/dask_sql/physical/rel/logical/values.py +++ b/dask_sql/physical/rel/logical/values.py @@ -32,12 +32,23 @@ def convert( rex_expression_rows = list(rel.getTuples()) rows = [] - for rex_expressions in rex_expression_rows: - rows.append([RexConverter.convert(rex, None) for rex in rex_expressions]) + for rex_expression_row in rex_expression_rows: + # We convert each of the cells in the row + # using a RexConverter. + # As we do not have any information on the + # column headers, we just name them with + # their index. 
+ rows.append( + { + str(i): RexConverter.convert(rex_cell, None) + for i, rex_cell in enumerate(rex_expression_row) + } + ) - # We assume here that when using the values plan, the resulting dataframe will be quite small # TODO: we explicitely reference pandas and dask here -> might we worth making this more general - df = dd.from_pandas(pd.DataFrame(rows), npartitions=1) + # We assume here that when using the values plan, the resulting dataframe will be quite small + df = pd.DataFrame(rows) + df = dd.from_pandas(df, npartitions=1) cc = ColumnContainer(df.columns) return DataContainer(df, cc) From 503e7a122d90a57d7f98bcdcf747581b22858cd0 Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Tue, 8 Sep 2020 18:25:49 +0200 Subject: [PATCH 07/10] Add a shortcut to not create the same column again and again --- dask_sql/physical/rel/logical/project.py | 28 +++++++++++++++++------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/dask_sql/physical/rel/logical/project.py b/dask_sql/physical/rel/logical/project.py index 930feef4f..d371185f4 100644 --- a/dask_sql/physical/rel/logical/project.py +++ b/dask_sql/physical/rel/logical/project.py @@ -3,8 +3,10 @@ import dask.dataframe as dd from dask_sql.physical.rex import RexConverter +from dask_sql.physical.rex.core.input_ref import RexInputRefPlugin from dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.datacontainer import DataContainer +from dask_sql.java import get_java_class class LogicalProjectPlugin(BaseRelPlugin): @@ -25,20 +27,30 @@ def convert( df = dc.df cc = dc.column_container - # Collect all new columns + # Collect all (new) columns named_projects = rel.getNamedProjects() - # TODO: we do not need to create a new column if we already have it + column_names = [] new_columns = {} for expr, key in named_projects: - new_columns[str(key)] = RexConverter.convert(expr, dc=dc) - - df = df.assign(**new_columns) - for new_column in new_columns: - cc = cc.add(new_column) + key = str(key) + column_names.append(key) + + # shortcut: if we have a column already, there is no need to re-assign it again + # this is only the case if the expr is a RexInputRef + if get_java_class(expr) == RexInputRefPlugin.class_name: + index = expr.getIndex() + backend_column_name = cc.get_backend_by_frontend_index(index) + cc = cc.add(key, backend_column_name) + else: + new_columns[key] = RexConverter.convert(expr, dc=dc) + cc = cc.add(key, key) + + # Actually add the new columns + if new_columns: + df = df.assign(**new_columns) # Make sure the order is correct - column_names = list(new_columns.keys()) cc = cc.limit_to(column_names) cc = self.fix_column_to_row_type(cc, rel.getRowType()) From d7d08a71b7bd0bc07e2259bdff479adb004e738d Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Tue, 8 Sep 2020 18:27:34 +0200 Subject: [PATCH 08/10] Add a test for aliases --- tests/integration/test_select.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integration/test_select.py b/tests/integration/test_select.py index 84f00f335..27414340c 100644 --- a/tests/integration/test_select.py +++ b/tests/integration/test_select.py @@ -13,6 +13,15 @@ def test_select(self): assert_frame_equal(df, self.df) + def test_select_alias(self): + df = self.c.sql("SELECT a as b, b as a FROM df") + df = df.compute() + + expected_df = self.df + expected_df = expected_df.assign(a=self.df.b, b=self.df.a) + + assert_frame_equal(df, expected_df[["b", "a"]]) + def test_select_column(self): df = self.c.sql("SELECT a FROM df") df = df.compute() From 7872f1c9cec22bc9609ab5f231652708cb963ae7 Mon Sep 17
00:00:00 2001 From: Nils Braun Date: Wed, 9 Sep 2020 23:46:28 +0200 Subject: [PATCH 09/10] Re-add the column fixing --- dask_sql/physical/rel/logical/join.py | 1 + dask_sql/physical/rel/logical/sort.py | 1 + dask_sql/physical/rel/logical/values.py | 1 + 3 files changed, 3 insertions(+) diff --git a/dask_sql/physical/rel/logical/join.py b/dask_sql/physical/rel/logical/join.py index d76c658f0..07d391d42 100644 --- a/dask_sql/physical/rel/logical/join.py +++ b/dask_sql/physical/rel/logical/join.py @@ -132,6 +132,7 @@ def convert( for from_col, to_col in zip(cc.columns, field_specifications) } ) + cc = self.fix_column_to_row_type(cc, rel.getRowType()) dc = DataContainer(df, cc) # 7. Last but not least we apply any filters by and-chaining together the filters diff --git a/dask_sql/physical/rel/logical/sort.py b/dask_sql/physical/rel/logical/sort.py index 8afe80a6e..0612ddd3b 100644 --- a/dask_sql/physical/rel/logical/sort.py +++ b/dask_sql/physical/rel/logical/sort.py @@ -50,6 +50,7 @@ def convert( if offset is not None or end is not None: df = self._apply_offset(df, offset, end) + cc = self.fix_column_to_row_type(cc, rel.getRowType()) return DataContainer(df, cc) def _apply_sort( diff --git a/dask_sql/physical/rel/logical/values.py b/dask_sql/physical/rel/logical/values.py index 0e3a5ecde..e3ae59a34 100644 --- a/dask_sql/physical/rel/logical/values.py +++ b/dask_sql/physical/rel/logical/values.py @@ -51,4 +51,5 @@ def convert( df = dd.from_pandas(df, npartitions=1) cc = ColumnContainer(df.columns) + cc = self.fix_column_to_row_type(cc, rel.getRowType()) return DataContainer(df, cc) From 404eb9b7bdd16491b18c34d2925aff3b1eb1ebd0 Mon Sep 17 00:00:00 2001 From: Nils Braun Date: Thu, 10 Sep 2020 22:03:50 +0200 Subject: [PATCH 10/10] Some more comments --- dask_sql/datacontainer.py | 66 ++++++++++++++++++++-- dask_sql/physical/rel/logical/aggregate.py | 4 +- dask_sql/physical/rel/logical/filter.py | 2 + 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py index 5ec041bc5..c5182566c 100644 --- a/dask_sql/datacontainer.py +++ b/dask_sql/datacontainer.py @@ -7,10 +7,20 @@ class ColumnContainer: + # Forward declaration pass class ColumnContainer: + """ + Helper class to store a list of columns, + which are not necessarily the ones of the dask dataframe. + Instead, the container also stores a mapping from "frontend" + columns (columns with the names and order expected by SQL) + to "backend" columns (the real column names used by dask) + to prevent unnecessary renames. + """ + def __init__( self, frontend_columns: List[str], @@ -28,15 +38,29 @@ def __init__( self._frontend_backend_mapping = frontend_backend_mapping def _copy(self) -> ColumnContainer: + """ + Internal function to copy this container + """ return ColumnContainer(self._frontend_columns, self._frontend_backend_mapping) def limit_to(self, fields: List[str]) -> ColumnContainer: + """ + Create a new ColumnContainer, which has frontend columns + limited to only the ones given as parameter. + Also uses the order of these as the new column order. + """ assert all(f in self._frontend_backend_mapping for f in fields) cc = self._copy() cc._frontend_columns = [str(x) for x in fields] return cc - def rename(self, columns: Dict[str, ColumnType]) -> ColumnContainer: + def rename(self, columns: Dict[str, str]) -> ColumnContainer: + """ + Return a new ColumnContainer where the frontend columns + are renamed according to the given mapping. 
+ Columns not present in the mapping are not touched, + the order is preserved. + """ cc = self._copy() for column_from, column_to in columns.items(): backend_column = self._frontend_backend_mapping[str(column_from)] @@ -49,16 +73,27 @@ def rename(self, columns: Dict[str, ColumnType]) -> ColumnContainer: return cc - def mapping(self) -> List[Tuple[str, str]]: + def mapping(self) -> List[Tuple[str, ColumnType]]: + """ + The mapping from frontend columns to backend columns. + """ return list(self._frontend_backend_mapping.items()) @property def columns(self) -> List[str]: + """ + The stored frontend columns in the correct order + """ return self._frontend_columns def add( self, frontend_column: str, backend_column: Union[str, None] = None ) -> ColumnContainer: + """ + Return a new ColumnContainer with the + given column added. + The column is added at the last position in the column list. + """ cc = self._copy() frontend_column = str(frontend_column) @@ -71,6 +106,10 @@ def add( return cc def get_backend_by_frontend_index(self, index: int) -> str: + """ + Get back the dask column, which is referenced by the + frontend (SQL) column with the given index. + """ frontend_column = self._frontend_columns[index] backend_column = self._frontend_backend_mapping[frontend_column] return backend_column @@ -79,9 +118,9 @@ def make_unique(self, prefix="col"): """ Make sure we have unique column names by calling each column - prefix_number + <prefix>_<number> - where number is the column index. + where <number> is the column index. """ return self.rename( columns={str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)} @@ -89,11 +128,30 @@ def make_unique(self, prefix="col"): class DataContainer: + """ + In SQL, every column operation or reference is done via + the column index. Some dask operations, such as grouping, + joining or concatenating return the columns in a different + order than SQL would expect. + However, we do not want to change the column data itself + all the time (because this would lead to computational overhead), + but still would like to keep the columns accessible by name and index. + For this, we add an additional `ColumnContainer` to each dataframe, + which does all the column mapping between "frontend" + (what SQL expects, also in the correct order) + and "backend" (what dask has). + """ + def __init__(self, df: dd.DataFrame, column_container: ColumnContainer): self.df = df self.column_container = column_container def assign(self) -> dd.DataFrame: + """ + Combine the column mapping with the actual data and return + a dataframe which has the columns specified in the + stored ColumnContainer. 
+ """ df = self.df.assign( **{ col_from: self.df[col_to] diff --git a/dask_sql/physical/rel/logical/aggregate.py b/dask_sql/physical/rel/logical/aggregate.py index 74405a040..23ed9b7ab 100644 --- a/dask_sql/physical/rel/logical/aggregate.py +++ b/dask_sql/physical/rel/logical/aggregate.py @@ -73,9 +73,11 @@ def convert( group_columns = [additional_column_name] additional_column_needed = True + # If needed, add a column to group by which is + # the same for all rows if additional_column_needed: df = df.assign(**{additional_column_name: 1}) - cc.add(additional_column_name) + cc = cc.add(additional_column_name) # Now we can perform the aggregates df = df.groupby(by=group_columns).agg(aggregations) diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index ea7749ab2..229dd76c7 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -22,6 +22,8 @@ def convert( df = dc.df cc = dc.column_container + # Every logic is handled in the RexConverter + # we just need to apply it here condition = rel.getCondition() df_condition = RexConverter.convert(condition, dc) df = df[df_condition]