Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from dask_sql.input_utils import InputType, InputUtil
from dask_sql.integrations.ipython import ipython_integration
from dask_sql.java import com, get_java_class, org
from dask_sql.java import com, get_java_class, java, org
from dask_sql.mappings import python_to_sql_type
from dask_sql.physical.rel import RelConverter, custom, logical
from dask_sql.physical.rex import RexConverter, core
Expand Down Expand Up @@ -853,7 +853,7 @@ def _get_ral(self, sql):
)

generator_builder = RelationalAlgebraGeneratorBuilder(
self.schema_name, case_sensitive
self.schema_name, case_sensitive, java.util.ArrayList()
)
for schema in schemas:
generator_builder = generator_builder.addSchema(schema)
Expand Down
124 changes: 85 additions & 39 deletions planner/src/main/java/com/dask/sql/application/DaskPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.rules.DateRangeRules;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.PruneEmptyRules;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* DaskPlanner is a cost-based optimizer based on the Calcite VolcanoPlanner.
*
Expand All @@ -33,44 +39,60 @@ public class DaskPlanner extends VolcanoPlanner {

private final Context defaultContext;

public DaskPlanner() {
// Allow transformation between logical and dask nodes
addRule(DaskAggregateRule.INSTANCE);
addRule(DaskFilterRule.INSTANCE);
addRule(DaskJoinRule.INSTANCE);
addRule(DaskProjectRule.INSTANCE);
addRule(DaskSampleRule.INSTANCE);
addRule(DaskSortLimitRule.INSTANCE);
addRule(DaskTableScanRule.INSTANCE);
addRule(DaskUnionRule.INSTANCE);
addRule(DaskValuesRule.INSTANCE);
addRule(DaskWindowRule.INSTANCE);

// Set of core rules
addRule(PruneEmptyRules.UNION_INSTANCE);
addRule(PruneEmptyRules.INTERSECT_INSTANCE);
addRule(PruneEmptyRules.MINUS_INSTANCE);
addRule(PruneEmptyRules.PROJECT_INSTANCE);
addRule(PruneEmptyRules.FILTER_INSTANCE);
addRule(PruneEmptyRules.SORT_INSTANCE);
addRule(PruneEmptyRules.AGGREGATE_INSTANCE);
addRule(PruneEmptyRules.JOIN_LEFT_INSTANCE);
addRule(PruneEmptyRules.JOIN_RIGHT_INSTANCE);
addRule(PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE);
addRule(DateRangeRules.FILTER_INSTANCE);
addRule(CoreRules.INTERSECT_TO_DISTINCT);
addRule(CoreRules.PROJECT_FILTER_TRANSPOSE);
addRule(CoreRules.FILTER_PROJECT_TRANSPOSE);
addRule(CoreRules.FILTER_INTO_JOIN);
addRule(CoreRules.JOIN_PUSH_EXPRESSIONS);
addRule(CoreRules.FILTER_AGGREGATE_TRANSPOSE);
addRule(CoreRules.PROJECT_WINDOW_TRANSPOSE);
addRule(CoreRules.JOIN_COMMUTE);
addRule(CoreRules.FILTER_INTO_JOIN);
addRule(CoreRules.PROJECT_JOIN_TRANSPOSE);
addRule(CoreRules.SORT_PROJECT_TRANSPOSE);
addRule(CoreRules.SORT_JOIN_TRANSPOSE);
addRule(CoreRules.SORT_UNION_TRANSPOSE);
/**
 * Master list of every optimizer rule this planner knows about.
 *
 * The first group converts logical nodes into their Dask counterparts; the
 * second group holds the Calcite core rules. The constructor registers each
 * entry of this list that is not contained in the disabled-rules list passed
 * to it.
 *
 * NOTE: this is a public, mutable ArrayList held in a static field, so it is
 * shared by all planner instances. Treat it as read-only.
 */
public static final ArrayList<RelOptRule> ALL_RULES = new ArrayList<>(
Arrays.asList(
// Allow transformation between logical and dask nodes
DaskAggregateRule.INSTANCE,
DaskFilterRule.INSTANCE,
DaskJoinRule.INSTANCE,
DaskProjectRule.INSTANCE,
DaskSampleRule.INSTANCE,
DaskSortLimitRule.INSTANCE,
DaskTableScanRule.INSTANCE,
DaskUnionRule.INSTANCE,
DaskValuesRule.INSTANCE,
DaskWindowRule.INSTANCE,

// Set of core rules
PruneEmptyRules.UNION_INSTANCE,
PruneEmptyRules.INTERSECT_INSTANCE,
PruneEmptyRules.MINUS_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
PruneEmptyRules.JOIN_RIGHT_INSTANCE,
PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE,
DateRangeRules.FILTER_INSTANCE,
CoreRules.INTERSECT_TO_DISTINCT,
CoreRules.PROJECT_FILTER_TRANSPOSE,
CoreRules.FILTER_PROJECT_TRANSPOSE,
CoreRules.FILTER_INTO_JOIN,
CoreRules.JOIN_PUSH_EXPRESSIONS,
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
CoreRules.PROJECT_WINDOW_TRANSPOSE,
CoreRules.JOIN_COMMUTE,
CoreRules.PROJECT_JOIN_TRANSPOSE,
CoreRules.SORT_PROJECT_TRANSPOSE,
CoreRules.SORT_JOIN_TRANSPOSE,
CoreRules.SORT_UNION_TRANSPOSE)
);

private ArrayList<RelOptRule> disabledRules = new ArrayList<>();

public DaskPlanner(ArrayList<RelOptRule> disabledRules) {

if (disabledRules != null) {
this.disabledRules = disabledRules;

// Iterate through all rules and only add the ones not disabled
for (RelOptRule rule : ALL_RULES) {
if (!disabledRules.contains(rule)) {
addRule(rule);
}
}
}

// Enable conventions to turn from logical to dask
addRelTraitDef(ConventionTraitDef.INSTANCE);
Expand All @@ -86,4 +108,28 @@ public DaskPlanner() {
public Context getContext() {
return defaultContext;
}

/**
 * Lists the string form of every rule in {@code ALL_RULES}, in list order.
 *
 * @return a new list with one entry per rule; a {@code null} rule maps to
 *         {@code null}
 */
public List<String> getAllRules() {
    final List<String> ruleNames = new ArrayList<>();
    for (RelOptRule rule : ALL_RULES) {
        ruleNames.add(Objects.toString(rule, null));
    }
    return ruleNames;
}

/**
 * Lists the string form of every rule disabled for this planner.
 *
 * @return a new list with one entry per disabled rule, or an empty list when
 *         no disabled-rules list is set
 */
public List<String> getDisabledRules() {
    if (this.disabledRules == null) {
        return new ArrayList<String>();
    }

    final List<String> ruleNames = new ArrayList<>();
    for (RelOptRule rule : this.disabledRules) {
        ruleNames.add(Objects.toString(rule, null));
    }
    return ruleNames;
}

/**
 * Lists the string form of every rule that is in {@code ALL_RULES} but not in
 * this planner's disabled-rules list.
 *
 * @return a new list with one entry per enabled rule, in {@code ALL_RULES}
 *         order
 */
public List<String> getEnabledRules() {
    // Work on a private copy. ALL_RULES is a shared static list, so removing
    // from it directly would permanently drop the disabled rules for every
    // planner and for every later call to getAllRules().
    final List<RelOptRule> enabledRules = new ArrayList<>(ALL_RULES);
    if (this.disabledRules != null) {
        enabledRules.removeAll(this.disabledRules);
    }
    return enabledRules.stream()
        .map(rule -> Objects.toString(rule, null))
        .collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.dask.sql.application;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import com.dask.sql.schema.DaskSchema;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.SqlDialect;
Expand All @@ -28,8 +31,23 @@ public class RelationalAlgebraGenerator {

public RelationalAlgebraGenerator(final String rootSchemaName,
final List<DaskSchema> schemas,
final boolean case_sensitive) throws SQLException {
this.planner = new DaskPlanner();
final boolean case_sensitive,
ArrayList<String> disabledRulesPython) throws SQLException {
ArrayList<RelOptRule> disabledRules = new ArrayList<>();
if (disabledRulesPython != null) {
// Operationally complex but fine for object instantiation and seen
// as a trade-off since this structure is more efficient in other areas
for (String pythonRuleString : disabledRulesPython) {
for (RelOptRule rule : DaskPlanner.ALL_RULES) {
if (rule.toString().equalsIgnoreCase(pythonRuleString)) {
disabledRules.add(rule);
break;
}
}
}
}

this.planner = new DaskPlanner(disabledRules);
this.sqlToRelConverter = new DaskSqlToRelConverter(this.planner, rootSchemaName, schemas, case_sensitive);
this.program = new DaskProgram(this.planner);
this.parser = new DaskSqlParser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ public class RelationalAlgebraGeneratorBuilder {
private final String rootSchemaName;
private final List<DaskSchema> schemas;
private final boolean case_sensitive; // False if case should be ignored when comparing SQLNode(s)
private final ArrayList<String> disabled_rules; // CBO rules that should be disabled for this context

public RelationalAlgebraGeneratorBuilder(final String rootSchemaName, final boolean case_sensitive) {
/**
 * Creates a builder that starts with an empty schema list.
 *
 * @param rootSchemaName name of the root schema
 * @param case_sensitive false if case should be ignored when comparing SQL nodes
 * @param disabled_rules names of the CBO rules to disable for this context;
 *                       the list is stored by reference, not copied
 */
public RelationalAlgebraGeneratorBuilder(
        final String rootSchemaName,
        final boolean case_sensitive,
        final ArrayList<String> disabled_rules) {
    this.schemas = new ArrayList<>();
    this.disabled_rules = disabled_rules;
    this.case_sensitive = case_sensitive;
    this.rootSchemaName = rootSchemaName;
}

public RelationalAlgebraGeneratorBuilder addSchema(final DaskSchema schema) {
Expand All @@ -27,6 +29,6 @@ public RelationalAlgebraGeneratorBuilder addSchema(final DaskSchema schema) {
}

public RelationalAlgebraGenerator build() throws ClassNotFoundException, SQLException {
return new RelationalAlgebraGenerator(rootSchemaName, schemas, this.case_sensitive);
return new RelationalAlgebraGenerator(rootSchemaName, schemas, this.case_sensitive, this.disabled_rules);
}
}