Skip to content

Commit 9e0c786

Browse files
Alex Behmjenkins
authored andcommitted
IMPALA-1102: Clean up and fix removal of redundant join predicates.
Correct elimination of redundant join predicates relies on slot equivalences being enforced at the lowest possible plan node possibly by generating new predicates. Previously, we only enforced such equivalences at scan and aggregation nodes which is insufficient because join materialize a new tuple combination which may also require construction of new predicates to establish known slot equivalences. This patch generalies the existing helper function for constructing the minimum spanning tree to cover known slot equivalences for each equivalence class. The function is intended to be called during bottom-up plan generation at nodes that change the tuple composition (scans, joins, aggs, etc.) Change-Id: I73880310553c63296486b2f77a51618738005167 Reviewed-on: http://gerrit.sjc.cloudera.com:8080/4781 Reviewed-by: Marcel Kornacker <[email protected]> Tested-by: jenkins
1 parent 89c914f commit 9e0c786

File tree

4 files changed

+213
-113
lines changed

4 files changed

+213
-113
lines changed

fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java

Lines changed: 164 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -826,17 +826,12 @@ public void createAuxEquivPredicate(Expr lhs, Expr rhs) {
826826
/**
827827
* Creates an analyzed equality predicate between the given slots.
828828
*/
829-
public Expr createEqPredicate(SlotId lhsSlotId, SlotId rhsSlotId) {
829+
public BinaryPredicate createEqPredicate(SlotId lhsSlotId, SlotId rhsSlotId) {
830830
BinaryPredicate pred = new BinaryPredicate(BinaryPredicate.Operator.EQ,
831831
new SlotRef(globalState_.descTbl.getSlotDesc(lhsSlotId)),
832832
new SlotRef(globalState_.descTbl.getSlotDesc(rhsSlotId)));
833833
// analyze() creates casts, if needed
834-
try {
835-
pred.analyze(this);
836-
} catch (Exception e) {
837-
throw new IllegalStateException(
838-
"constructed predicate failed analysis: " + pred.toSql(), e);
839-
}
834+
pred.analyzeNoThrow(this);
840835
return pred;
841836
}
842837

@@ -1193,34 +1188,141 @@ public void invertOuterJoinState(TableRef oldRhsTbl, TableRef newRhsTbl) {
11931188

11941189
/**
11951190
* For each equivalence class, adds/removes predicates from conjuncts such that
1196-
* it contains a minimum set of <slot> = <slot> predicates that "cover" the equivalent
1197-
* slots belonging to tid. The returned predicates are a minimum spanning tree of the
1198-
* complete graph formed by connecting all of tid's equivalent slots of that class.
1199-
* Preserves original conjuncts when possible. Should be called by PlanNodes that
1200-
* materialize a new tuple and evaluate conjuncts (scan and aggregation nodes).
1191+
* it contains a minimum set of <lhsSlot> = <rhsSlot> predicates that establish
1192+
* the known equivalences between slots in lhsTids and rhsTid. Preserves original
1193+
* conjuncts when possible. Assumes that predicates for establishing equivalences
1194+
* among slots in only lhsTids and only rhsTid have already been established.
1195+
* This function adds the remaining predicates to "connect" the disjoint equivalent
1196+
* slot sets of lhsTids and rhsTid.
1197+
* The intent of this function is to enable construction of a minimum spanning tree
1198+
* to cover the known slot equivalences. This function should be called for join
1199+
* nodes during plan generation to (1) remove redundant join predicates, and (2)
1200+
* establish equivalences among slots materialized at that join node.
1201+
* TODO: Consider optimizing for the cheapest minimum set of predicates.
1202+
* TODO: Consider caching the DisjointSet during plan generation instead of
1203+
* re-creating it here on every invocation.
1204+
*/
1205+
public <T extends Expr> void createEquivConjuncts(List<TupleId> lhsTids, TupleId rhsTid,
1206+
List<T> conjuncts) {
1207+
Preconditions.checkState(!lhsTids.contains(rhsTid));
1208+
1209+
// Equivalence classes only containing slots belonging to lhsTids.
1210+
Map<EquivalenceClassId, List<SlotId>> planEquivClasses =
1211+
getEquivClasses(lhsTids);
1212+
1213+
// Equivalence classes only containing slots belonging to rhsTid.
1214+
Map<EquivalenceClassId, List<SlotId>> tidEquivClasses =
1215+
getEquivClasses(Lists.newArrayList(rhsTid));
1216+
1217+
// Maps from a slot id to its set of equivalent slots. Used to track equivalences
1218+
// that have been established by predicates assigned/generated to plan nodes
1219+
// materializing lhsTids as well as the given conjuncts.
1220+
DisjointSet<SlotId> partialEquivSlots = new DisjointSet<SlotId>();
1221+
// Add the partial equivalences to the partialEquivSlots map. The equivalent-slot
1222+
// sets of slots from lhsTids are disjoint from those of slots from rhsTid.
1223+
// We need to 'connect' the disjoint slot sets by constructing a new predicate
1224+
// for each equivalence class (unless there is already one in 'conjuncts').
1225+
for (List<SlotId> partialEquivClass: planEquivClasses.values()) {
1226+
partialEquivSlots.bulkUnion(partialEquivClass);
1227+
}
1228+
for (List<SlotId> partialEquivClass: tidEquivClasses.values()) {
1229+
partialEquivSlots.bulkUnion(partialEquivClass);
1230+
}
1231+
1232+
// Set of outer-joined slots referenced by conjuncts.
1233+
Set<SlotId> outerJoinedSlots = Sets.newHashSet();
1234+
1235+
// Update partialEquivSlots based on equality predicates in 'conjuncts'. Removes
1236+
// redundant conjuncts, unless they reference outer-joined slots (see below).
1237+
Iterator<T> conjunctIter = conjuncts.iterator();
1238+
while (conjunctIter.hasNext()) {
1239+
Expr conjunct = conjunctIter.next();
1240+
Pair<SlotId, SlotId> eqSlots = BinaryPredicate.getEqSlots(conjunct);
1241+
if (eqSlots == null) continue;
1242+
EquivalenceClassId firstEqClassId = getEquivClassId(eqSlots.first);
1243+
EquivalenceClassId secondEqClassId = getEquivClassId(eqSlots.second);
1244+
// slots may not be in the same eq class due to outer joins
1245+
if (!firstEqClassId.equals(secondEqClassId)) continue;
1246+
1247+
// Retain an otherwise redundant predicate if it references a slot of an
1248+
// outer-joined tuple that is not already referenced by another join predicate
1249+
// to maintain that the rows must satisfy outer-joined-slot IS NOT NULL
1250+
// (otherwise NULL tuples from outer joins could survive).
1251+
// TODO: Consider better fixes for outer-joined slots: (1) Create IS NOT NULL
1252+
// predicates and place them at the lowest possible plan node. (2) Convert outer
1253+
// joins into inner joins (or full outer joins into left/right outer joins).
1254+
boolean filtersOuterJoinNulls = false;
1255+
if (isOuterJoined(eqSlots.first)
1256+
&& lhsTids.contains(getTupleId(eqSlots.first))
1257+
&& !outerJoinedSlots.contains(eqSlots.first)) {
1258+
outerJoinedSlots.add(eqSlots.first);
1259+
filtersOuterJoinNulls = true;
1260+
}
1261+
if (isOuterJoined(eqSlots.second)
1262+
&& lhsTids.contains(getTupleId(eqSlots.second))
1263+
&& !outerJoinedSlots.contains(eqSlots.second)) {
1264+
outerJoinedSlots.add(eqSlots.second);
1265+
filtersOuterJoinNulls = true;
1266+
}
1267+
// retain conjunct if it connects two formerly unconnected equiv classes or
1268+
// it is required for outer-join semantics
1269+
if (!partialEquivSlots.union(eqSlots.first, eqSlots.second)
1270+
&& !filtersOuterJoinNulls) {
1271+
conjunctIter.remove();
1272+
}
1273+
}
1274+
1275+
// For each equivalence class, construct a new predicate to 'connect' the disjoint
1276+
// slot sets.
1277+
for (Map.Entry<EquivalenceClassId, List<SlotId>> tidEquivClass:
1278+
tidEquivClasses.entrySet()) {
1279+
List<SlotId> lhsSlots = planEquivClasses.get(tidEquivClass.getKey());
1280+
if (lhsSlots == null) continue;
1281+
List<SlotId> rhsSlots = tidEquivClass.getValue();
1282+
Preconditions.checkState(!lhsSlots.isEmpty() && !rhsSlots.isEmpty());
1283+
1284+
if (!partialEquivSlots.union(lhsSlots.get(0), rhsSlots.get(0))) continue;
1285+
T newEqPred = (T) createEqPredicate(lhsSlots.get(0), rhsSlots.get(0));
1286+
newEqPred.analyzeNoThrow(this);
1287+
conjuncts.add(newEqPred);
1288+
}
1289+
}
1290+
1291+
/**
1292+
* For each equivalence class, adds/removes predicates from conjuncts such that
1293+
* it contains a minimum set of <slot> = <slot> predicates that establish
1294+
* the known equivalences between slots belonging to tid. Preserves original
1295+
* conjuncts when possible.
1296+
* The intent of this function is to enable construction of a minimum spanning tree
1297+
* to cover the known slot equivalences. This function should be called to add
1298+
* conjuncts to plan nodes that materialize a new tuple, e.g., scans and aggregations.
12011299
* Does not enforce equivalence between slots in ignoreSlots. Equivalences (if any)
1202-
* among slots in ignoreSlots can be assumed to have already been enforced.
1300+
* among slots in ignoreSlots are assumed to have already been enforced.
12031301
* TODO: Consider optimizing for the cheapest minimum set of predicates.
12041302
*/
1205-
public void createEquivConjuncts(TupleId tid, List<Expr> conjuncts,
1303+
public <T extends Expr> void createEquivConjuncts(TupleId tid, List<T> conjuncts,
12061304
Set<SlotId> ignoreSlots) {
1207-
// Tracks slot equivalences established by the given conjuncts and ignored slots.
1208-
// Maps each slot id to its set of equivalenc slots.
1209-
DisjointSet<SlotId> conjunctsEquivSlots = new DisjointSet<SlotId>();
1305+
// Maps from a slot id to its set of equivalent slots. Used to track equivalences
1306+
// that have been established by 'conjuncts' and the 'ignoredsSlots'.
1307+
DisjointSet<SlotId> partialEquivSlots = new DisjointSet<SlotId>();
1308+
12101309
// Treat ignored slots as already connected. Add the ignored slots at this point
12111310
// such that redundant conjuncts are removed.
1212-
conjunctsEquivSlots.bulkUnion(ignoreSlots);
1213-
conjunctsEquivSlots.checkConsistency();
1214-
Iterator<Expr> conjunctIter = conjuncts.iterator();
1311+
partialEquivSlots.bulkUnion(ignoreSlots);
1312+
partialEquivSlots.checkConsistency();
1313+
1314+
// Update partialEquivSlots based on equality predicates in 'conjuncts'. Removes
1315+
// redundant conjuncts, unless they reference outer-joined slots (see below).
1316+
Iterator<T> conjunctIter = conjuncts.iterator();
12151317
while (conjunctIter.hasNext()) {
12161318
Expr conjunct = conjunctIter.next();
12171319
Pair<SlotId, SlotId> eqSlots = BinaryPredicate.getEqSlots(conjunct);
12181320
if (eqSlots == null) continue;
1219-
EquivalenceClassId lhsEqClassId = getEquivClassId(eqSlots.first);
1220-
EquivalenceClassId rhsEqClassId = getEquivClassId(eqSlots.second);
1321+
EquivalenceClassId firstEqClassId = getEquivClassId(eqSlots.first);
1322+
EquivalenceClassId secondEqClassId = getEquivClassId(eqSlots.second);
12211323
// slots may not be in the same eq class due to outer joins
1222-
if (!lhsEqClassId.equals(rhsEqClassId)) continue;
1223-
if (!conjunctsEquivSlots.union(eqSlots.first, eqSlots.second)) {
1324+
if (!firstEqClassId.equals(secondEqClassId)) continue;
1325+
if (!partialEquivSlots.union(eqSlots.first, eqSlots.second)) {
12241326
// conjunct is redundant
12251327
conjunctIter.remove();
12261328
}
@@ -1239,41 +1341,26 @@ public void createEquivConjuncts(TupleId tid, List<Expr> conjuncts,
12391341
// Assuming e1 = {s1, s2, s3, s4, s5} we need to generate one additional equality
12401342
// predicate to "connect" {s1, s2} and {s3, s4, s5}.
12411343

1242-
// Equivalences among slots belonging to tid by equivalence class. These equivalences
1243-
// are derived from the whole query and must hold for the final query result.
1244-
Map<EquivalenceClassId, List<SlotId>> targetEquivSlots = Maps.newHashMap();
1245-
TupleDescriptor tupleDesc = getTupleDesc(tid);
1246-
for (SlotDescriptor slotDesc: tupleDesc.getSlots()) {
1247-
EquivalenceClassId eqClassId = getEquivClassId(slotDesc.getId());
1248-
// Ignore equivalence classes that are empty or only have a single member.
1249-
if (globalState_.equivClassMembers.get(eqClassId).size() <= 1) continue;
1250-
List<SlotId> slotIds = targetEquivSlots.get(eqClassId);
1251-
if (slotIds == null) {
1252-
slotIds = Lists.newArrayList();
1253-
targetEquivSlots.put(eqClassId, slotIds);
1254-
}
1255-
slotIds.add(slotDesc.getId());
1256-
}
1257-
1258-
// For each equivalence class, add missing predicates to conjuncts to form the
1259-
// minimum spanning tree.
1260-
for (Map.Entry<EquivalenceClassId, List<SlotId>> targetEqClass:
1261-
targetEquivSlots.entrySet()) {
1262-
List<SlotId> equivClassSlots = targetEqClass.getValue();
1263-
if (equivClassSlots.size() < 2) continue;
1264-
1344+
// These are the equivalences that need to be established by constructing conjuncts
1345+
// to form a minimum spanning tree.
1346+
Map<EquivalenceClassId, List<SlotId>> targetEquivClasses =
1347+
getEquivClasses(Lists.newArrayList(tid));
1348+
for (Map.Entry<EquivalenceClassId, List<SlotId>> targetEquivClass:
1349+
targetEquivClasses.entrySet()) {
12651350
// Loop over all pairs of equivalent slots and merge their disjoint slots sets,
12661351
// creating missing equality predicates as necessary.
1352+
List<SlotId> slotIds = targetEquivClass.getValue();
12671353
boolean done = false;
1268-
for (int i = 1; i < equivClassSlots.size(); ++i) {
1269-
SlotId rhs = equivClassSlots.get(i);
1354+
for (int i = 1; i < slotIds.size(); ++i) {
1355+
SlotId rhs = slotIds.get(i);
12701356
for (int j = 0; j < i; ++j) {
1271-
SlotId lhs = equivClassSlots.get(j);
1272-
if (!conjunctsEquivSlots.union(lhs, rhs)) continue;
1273-
Expr newEqPred = createEqPredicate(lhs, rhs);
1357+
SlotId lhs = slotIds.get(j);
1358+
if (!partialEquivSlots.union(lhs, rhs)) continue;
1359+
T newEqPred = (T) createEqPredicate(lhs, rhs);
1360+
newEqPred.analyzeNoThrow(this);
12741361
conjuncts.add(newEqPred);
12751362
// Check for early termination.
1276-
if (conjunctsEquivSlots.get(lhs).size() == equivClassSlots.size()) {
1363+
if (partialEquivSlots.get(lhs).size() == slotIds.size()) {
12771364
done = true;
12781365
break;
12791366
}
@@ -1283,10 +1370,32 @@ public void createEquivConjuncts(TupleId tid, List<Expr> conjuncts,
12831370
}
12841371
}
12851372

1286-
public void createEquivConjuncts(TupleId tid, List<Expr> conjuncts) {
1373+
public <T extends Expr> void createEquivConjuncts(TupleId tid, List<T> conjuncts) {
12871374
createEquivConjuncts(tid, conjuncts, new HashSet<SlotId>());
12881375
}
12891376

1377+
/**
1378+
* Returns a map of partial equivalence classes that only contains slot ids belonging
1379+
* to the given tuple ids. Only contains equivalence classes with more than one member.
1380+
*/
1381+
private Map<EquivalenceClassId, List<SlotId>> getEquivClasses(List<TupleId> tids) {
1382+
Map<EquivalenceClassId, List<SlotId>> result = Maps.newHashMap();
1383+
for (TupleId tid: tids) {
1384+
for (SlotDescriptor slotDesc: getTupleDesc(tid).getSlots()) {
1385+
EquivalenceClassId eqClassId = getEquivClassId(slotDesc.getId());
1386+
// Ignore equivalence classes that are empty or only have a single member.
1387+
if (globalState_.equivClassMembers.get(eqClassId).size() <= 1) continue;
1388+
List<SlotId> slotIds = result.get(eqClassId);
1389+
if (slotIds == null) {
1390+
slotIds = Lists.newArrayList();
1391+
result.put(eqClassId, slotIds);
1392+
}
1393+
slotIds.add(slotDesc.getId());
1394+
}
1395+
}
1396+
return result;
1397+
}
1398+
12901399
/**
12911400
* Returns a list of slot mappings from srcTid to destTid for the purpose of predicate
12921401
* propagation. Each mapping assigns every slot in srcSids to an equivalent slot in
@@ -1597,6 +1706,7 @@ public boolean hasUnassignedConjuncts() {
15971706
if (globalState_.assignedConjuncts.contains(id)) continue;
15981707
Expr e = globalState_.conjuncts.get(id);
15991708
if (e.isAuxExpr()) continue;
1709+
LOG.trace("unassigned: " + e.toSql() + " " + e.debugString());
16001710
return true;
16011711
}
16021712
return false;

0 commit comments

Comments
 (0)