From ec90764329232705225220b989de8d3c8d8a64a8 Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Tue, 15 Aug 2023 12:22:26 +0800 Subject: [PATCH] Implement Parallel-aware Hash Left Anti Semi (Not-In) Join For parallel-aware hash join, we need to sync between parallel workers to tell the right results when there are NULL values. If we are LASJ and found NULL value by ourself or sibling processes had found NULL values, quit and tell siblings to quit if possible. It's safe to fetch and set phs_lasj_has_null without lock here and at other places. As it's a boolean and we don't need to have the most recent value from CPU or Mem cache. And we should avoid more locks in HashJion Impl. If we miss it here and some others set it at the same time, just bypass and we may get it at the next Hash batch. If we missed it across all batches, we will know it when PHJ_BUILD_HASHING_INNER ends with the help of build_barrier. If we never participated in building hash table, check it when hash table creation job is finished. explain(costs off) select c1 from ao1 where c1 not in(select c2 from ao2); QUERY PLAN ---------------------------------------------------------------------- Gather Motion 12:1 (slice1; segments: 12) -> Parallel Hash Left Anti Semi (Not-In) Join Hash Cond: (ao1.c1 = ao2.c2) -> Parallel Seq Scan on ao1 -> Parallel Hash -> Parallel Broadcast Motion 12:12 (slice2; segments:12) -> Parallel Seq Scan on ao2 Optimizer: Postgres query optimizer (8 rows) Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/cdb/cdbpath.c | 7 -- src/backend/executor/nodeHash.c | 59 ++++++++++++++- src/backend/executor/nodeHashjoin.c | 1 + src/backend/optimizer/path/joinpath.c | 1 - src/include/executor/hashjoin.h | 1 + src/test/regress/expected/gp_parallel.out | 88 +++++++++++++++++++++-- src/test/regress/sql/gp_parallel.sql | 45 ++++++++++-- 7 files changed, 181 insertions(+), 21 deletions(-) diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index a9a2b8ee1dd..93b249f2b95 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -2980,13 +2980,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, outer.ok_to_replicate = !outer.has_wts; inner.ok_to_replicate = true; - /* - * For parallel mode, join is executed by each batches. - * It is hard to tell whether null exists in the whole table. - */ - if (parallel_aware && jointype == JOIN_LASJ_NOTIN) - goto fail; - switch (jointype) { case JOIN_INNER: diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 9c17ecbdfe3..0b1213e589e 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -324,6 +324,14 @@ MultiExecParallelHash(HashState *node) { bool hashkeys_null = false; + /* CBDB_PARALLEL: Siblings must have found null value. */ + if (pstate->phs_lasj_has_null) + { + node->hs_hashkeys_null = true; + ExecSquelchNode(outerNode); + break; + } + slot = ExecProcNode(outerNode); if (TupIsNull(slot)) break; @@ -333,14 +341,40 @@ MultiExecParallelHash(HashState *node) &hashvalue, &hashkeys_null)) ExecParallelHashTableInsert(hashtable, slot, hashvalue); hashtable->partialTuples++; + + if (node->hs_quit_if_hashkeys_null && hashkeys_null) + { + /* CBDB_PARALLEL: + * If we are LASJ and found NULL value by ourself or sibling processes had + * found NULL values, quit and tell siblings to quit if possible. + * + * It's safe to fetch and set phs_lasj_has_null without lock here and at + * other places. As it's a atomic boolean value. And we should avoid more locks in HashJion Impl. + * If other processes miss it here and some others set it at the same time, just bypass + * and we may get it at the next Hash batch. + * If we missed it across all batches, we will know it when PHJ_BUILD_HASHING_INNER + * ends with the help of build_barrier. + * If we never participated in building hash table, check it when hash table + * creation job is finished. + */ + pstate->phs_lasj_has_null = true; + pg_write_barrier(); + node->hs_hashkeys_null = true; + ExecSquelchNode(outerNode); + break; + } } + /* CBDB_PARALLEL: No need to flush tuples if phs_lasj_has_null. */ /* * Make sure that any tuples we wrote to disk are visible to * others before anyone tries to load them. */ - for (i = 0; i < hashtable->nbatch; ++i) - sts_end_write(hashtable->batches[i].inner_tuples); + if (!pstate->phs_lasj_has_null) + { + for (i = 0; i < hashtable->nbatch; ++i) + sts_end_write(hashtable->batches[i].inner_tuples); + } /* * Update shared counters. We need an accurate total tuple count @@ -366,9 +400,23 @@ MultiExecParallelHash(HashState *node) * skew). */ pstate->growth = PHJ_GROWTH_DISABLED; + /* In case we didn't find null values ourself. */ + if (pstate->phs_lasj_has_null) + { + node->hs_hashkeys_null = true; + return; + } } } + /* In case we didn't participate in PHJ_BUILD_HASHING_INNER */ + pg_memory_barrier(); + if (pstate->phs_lasj_has_null) + { + node->hs_hashkeys_null = true; + return; + } + /* * We're not yet attached to a batch. We all agree on the dimensions and * number of inner tuples (for the empty table optimization). @@ -3779,7 +3827,12 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); /* Detach from the batch we were last working on. */ - if (BarrierArriveAndDetach(&batch->batch_barrier)) + /* + * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join(parallel-aware) + * If phs_lasj_has_null is true, that means we have found null when building hash table, + * there were no batches to detach. + */ + if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier)) { /* * Technically we shouldn't access the barrier because we're no diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index e49a857becf..67255b717d4 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -2065,6 +2065,7 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt) BarrierInit(&pstate->sync_barrier, pcxt->nworkers); BarrierInit(&pstate->batch0_barrier, pcxt->nworkers); + pstate->phs_lasj_has_null = false; /* Set up the space we'll use for shared temporary files. */ SharedFileSetInit(&pstate->fileset, pcxt->seg); diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index c12317b2a89..d3220af0b48 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -2318,7 +2318,6 @@ hash_inner_and_outer(PlannerInfo *root, */ if (innerrel->partial_pathlist != NIL && save_jointype != JOIN_UNIQUE_INNER && - save_jointype != JOIN_LASJ_NOTIN && enable_parallel_hash) { cheapest_partial_inner = diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 0ecbcc63da0..05500c34526 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -286,6 +286,7 @@ typedef struct ParallelHashJoinState Barrier grow_buckets_barrier; Barrier sync_barrier; Barrier batch0_barrier; + volatile bool phs_lasj_has_null; /* LASJ has found null value, identify early quit */ pg_atomic_uint32 distributor; /* counter for load balancing */ SharedFileSet fileset; /* space for shared temporary files */ diff --git a/src/test/regress/expected/gp_parallel.out b/src/test/regress/expected/gp_parallel.out index 0ead2658f4c..daa16156db8 100644 --- a/src/test/regress/expected/gp_parallel.out +++ b/src/test/regress/expected/gp_parallel.out @@ -1728,10 +1728,12 @@ abort; create table t1(c1 int, c2 int) using ao_row distributed by (c1); create table t2(c1 int, c2 int) using ao_row distributed by (c1); create table t3_null(c1 int, c2 int) using ao_row distributed by (c1); -set enable_parallel = on; -set gp_appendonly_insert_files = 2; -set gp_appendonly_insert_files_tuples_range = 100; -set max_parallel_workers_per_gather = 2; +begin; +set local enable_parallel = on; +set local gp_appendonly_insert_files = 2; +set local gp_appendonly_insert_files_tuples_range = 100; +set local max_parallel_workers_per_gather = 2; +set local enable_parallel_hash = off; insert into t1 select i, i from generate_series(1, 5000000) i; insert into t2 select i+1, i from generate_series(1, 1200) i; insert into t3_null select i+1, i from generate_series(1, 1200) i; @@ -1779,7 +1781,7 @@ select * from t1 where c1 not in (select c1 from t3_null); (0 rows) -- non-parallel results. -set enable_parallel = off; +set local enable_parallel = off; select sum(t1.c1) from t1 where c1 not in (select c1 from t2); sum ---------------- @@ -1791,6 +1793,7 @@ select * from t1 where c1 not in (select c1 from t3_null); ----+---- (0 rows) +end; drop table t1; drop table t2; drop table t3_null; @@ -1798,6 +1801,81 @@ drop table t3_null; -- End of Test Parallel Hash Left Anti Semi (Not-In) Join. -- -- +-- Test Parallel-aware Hash Left Anti Semi (Not-In) Join. +-- +begin; +create table t1(c1 int, c2 int) with(parallel_workers=2) distributed by (c1); +create table t2(c1 int, c2 int) with(parallel_workers=2) distributed by (c1); +create table t3_null(c1 int, c2 int) with(parallel_workers=2) distributed by (c1); +set local enable_parallel = on; +set local max_parallel_workers_per_gather = 2; +insert into t1 select i, i from generate_series(1, 500000) i; +insert into t2 select i, i+1 from generate_series(1, 500000) i; +insert into t3_null select i, i+1 from generate_series(1, 500000) i; +insert into t3_null values(NULL, NULL); +analyze t1; +analyze t2; +analyze t3_null; +explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2); + QUERY PLAN +------------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 6:1 (slice1; segments: 6) + -> Partial Aggregate + -> Parallel Hash Left Anti Semi (Not-In) Join + Hash Cond: (t1.c1 = t2.c2) + -> Parallel Seq Scan on t1 + -> Parallel Hash + -> Parallel Broadcast Motion 6:6 (slice2; segments: 6) + -> Parallel Seq Scan on t2 + Optimizer: Postgres query optimizer +(10 rows) + +select sum(t1.c1) from t1 where c1 not in (select c2 from t2); + sum +----- + 1 +(1 row) + +explain(costs off) select * from t1 where c1 not in (select c2 from t3_null); + QUERY PLAN +------------------------------------------------------------------------ + Gather Motion 6:1 (slice1; segments: 6) + -> Parallel Hash Left Anti Semi (Not-In) Join + Hash Cond: (t1.c1 = t3_null.c2) + -> Parallel Seq Scan on t1 + -> Parallel Hash + -> Parallel Broadcast Motion 6:6 (slice2; segments: 6) + -> Parallel Seq Scan on t3_null + Optimizer: Postgres query optimizer +(8 rows) + +select * from t1 where c1 not in (select c2 from t3_null); + c1 | c2 +----+---- +(0 rows) + +-- non-parallel results. +set local enable_parallel = off; +select sum(t1.c1) from t1 where c1 not in (select c2 from t2); + sum +----- + 1 +(1 row) + +select * from t1 where c1 not in (select c2 from t3_null); + c1 | c2 +----+---- +(0 rows) + +drop table t1; +drop table t2; +drop table t3_null; +end; +-- +-- End of Test Parallel-aware Hash Left Anti Semi (Not-In) Join. +-- +-- -- Test alter ao/aocs table parallel_workers options -- begin; diff --git a/src/test/regress/sql/gp_parallel.sql b/src/test/regress/sql/gp_parallel.sql index 16d2ef34afa..0e9c794bb34 100644 --- a/src/test/regress/sql/gp_parallel.sql +++ b/src/test/regress/sql/gp_parallel.sql @@ -491,10 +491,12 @@ abort; create table t1(c1 int, c2 int) using ao_row distributed by (c1); create table t2(c1 int, c2 int) using ao_row distributed by (c1); create table t3_null(c1 int, c2 int) using ao_row distributed by (c1); -set enable_parallel = on; -set gp_appendonly_insert_files = 2; -set gp_appendonly_insert_files_tuples_range = 100; -set max_parallel_workers_per_gather = 2; +begin; +set local enable_parallel = on; +set local gp_appendonly_insert_files = 2; +set local gp_appendonly_insert_files_tuples_range = 100; +set local max_parallel_workers_per_gather = 2; +set local enable_parallel_hash = off; insert into t1 select i, i from generate_series(1, 5000000) i; insert into t2 select i+1, i from generate_series(1, 1200) i; insert into t3_null select i+1, i from generate_series(1, 1200) i; @@ -507,9 +509,10 @@ select sum(t1.c1) from t1 where c1 not in (select c1 from t2); explain(costs off) select * from t1 where c1 not in (select c1 from t3_null); select * from t1 where c1 not in (select c1 from t3_null); -- non-parallel results. -set enable_parallel = off; +set local enable_parallel = off; select sum(t1.c1) from t1 where c1 not in (select c1 from t2); select * from t1 where c1 not in (select c1 from t3_null); +end; drop table t1; drop table t2; drop table t3_null; @@ -517,6 +520,38 @@ drop table t3_null; -- End of Test Parallel Hash Left Anti Semi (Not-In) Join. -- +-- +-- Test Parallel-aware Hash Left Anti Semi (Not-In) Join. +-- +begin; +create table t1(c1 int, c2 int) with(parallel_workers=2) distributed by (c1); +create table t2(c1 int, c2 int) with(parallel_workers=2) distributed by (c1); +create table t3_null(c1 int, c2 int) with(parallel_workers=2) distributed by (c1); +set local enable_parallel = on; +set local max_parallel_workers_per_gather = 2; +insert into t1 select i, i from generate_series(1, 500000) i; +insert into t2 select i, i+1 from generate_series(1, 500000) i; +insert into t3_null select i, i+1 from generate_series(1, 500000) i; +insert into t3_null values(NULL, NULL); +analyze t1; +analyze t2; +analyze t3_null; +explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2); +select sum(t1.c1) from t1 where c1 not in (select c2 from t2); +explain(costs off) select * from t1 where c1 not in (select c2 from t3_null); +select * from t1 where c1 not in (select c2 from t3_null); +-- non-parallel results. +set local enable_parallel = off; +select sum(t1.c1) from t1 where c1 not in (select c2 from t2); +select * from t1 where c1 not in (select c2 from t3_null); +drop table t1; +drop table t2; +drop table t3_null; +end; +-- +-- End of Test Parallel-aware Hash Left Anti Semi (Not-In) Join. +-- + -- -- Test alter ao/aocs table parallel_workers options --