diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c2b72a87e67..7b4864f529d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -39,6 +39,7 @@ #include "catalog/storage_tablespace.h" #include "catalog/storage_database.h" #include "commands/async.h" +#include "commands/matview.h" #include "commands/dbcommands.h" #include "commands/extension.h" #include "commands/resgroupcmds.h" @@ -3565,6 +3566,7 @@ AbortTransaction(void) AtAbort_Notify(); AtEOXact_RelationMap(false, is_parallel_worker); AtAbort_Twophase(); + AtAbort_IVM(); /* * Advertise the fact that we aborted in pg_xact (assuming that we got as @@ -6152,6 +6154,9 @@ AbortSubTransaction(void) AbortBufferIO(); UnlockBuffers(); + /* Clean up hash entries for incremental view maintenance */ + AtAbort_IVM(); + /* Reset WAL record construction state */ XLogResetInsertion(); diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 4b6f11597d1..6ab984a08cf 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1306,6 +1306,7 @@ InsertPgClassTuple(Relation pg_class_desc, values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite); values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid); values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid); + values[Anum_pg_class_relisivm - 1] = BoolGetDatum(rd_rel->relisivm); if (relacl != (Datum) 0) values[Anum_pg_class_relacl - 1] = relacl; else diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 715a3ddf4b7..cdb71d17c8a 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1002,6 +1002,7 @@ index_create(Relation heapRelation, indexRelation->rd_rel->relowner = heapRelation->rd_rel->relowner; indexRelation->rd_rel->relam = accessMethodObjectId; indexRelation->rd_rel->relispartition = OidIsValid(parentIndexRelid); + indexRelation->rd_rel->relisivm = false; /* * store index's pg_class entry diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index c824f7da2a5..e0b54cac8a1 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -32,15 +32,26 @@ #include "access/xact.h" #include "access/xlog.h" #include "catalog/namespace.h" +#include "catalog/index.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_inherits.h" +#include "catalog/pg_trigger.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/defrem.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" +#include "commands/trigger.h" #include "commands/view.h" #include "miscadmin.h" +#include "optimizer/optimizer.h" +#include "optimizer/prep.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "parser/parser.h" +#include "parser/parsetree.h" #include "parser/parse_clause.h" #include "postmaster/autostats.h" #include "rewrite/rewriteHandler.h" @@ -86,6 +97,12 @@ static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self); static void intorel_shutdown(DestReceiver *self); static void intorel_destroy(DestReceiver *self); +static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, + Relids *relids, bool ex_lock); +static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock); +static void check_ivm_restriction(Node *node); +static bool check_ivm_restriction_walker(Node *node, void *context); +static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList); /* * create_ctas_internal @@ -418,6 +435,18 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, if (query_info_collect_hook) (*query_info_collect_hook)(METRICS_QUERY_SUBMIT, queryDesc); + if (is_matview && into->ivm) + { + /* check if the query is supported in IMMV definition */ + if (contain_mutable_functions((Node *) query)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("mutable function is not supported on incrementally maintainable materialized view"), + errhint("functions must be marked IMMUTABLE"))); + + check_ivm_restriction((Node *) query); + } + if (into->skipData) { /* @@ -483,6 +512,27 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, /* Restore userid and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); + + if (into->ivm) + { + Oid matviewOid = address.objectId; + Relation matviewRel = table_open(matviewOid, NoLock); + + /* + * Mark relisivm field, if it's a matview and into->ivm is true. + */ + SetMatViewIVMState(matviewRel, true); + + if (!into->skipData) + { + /* Create an index on incremental maintainable materialized view, if possible */ + //CreateIndexOnIMMV((Query *) into->viewQuery, matviewRel); + + /* Create triggers on incremental maintainable materialized view */ + CreateIvmTriggersOnBaseTables((Query *) into->viewQuery, matviewOid); + } + table_close(matviewRel, NoLock); + } } return address; @@ -809,3 +859,642 @@ GetIntoRelOid(QueryDesc *queryDesc) else return InvalidOid; } + +/* + * CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables + */ +void +CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid) +{ + Relids relids = NULL; + bool ex_lock = false; + RangeTblEntry *rte; + + /* Immediately return if we don't have any base tables. */ + if (list_length(qry->rtable) < 1) + return; + + /* + * If the view has more than one base tables, we need an exclusive lock + * on the view so that the view would be maintained serially to avoid + * the inconsistency that occurs when two base tables are modified in + * concurrent transactions. However, if the view has only one table, + * we can use a weaker lock. + * + * The type of lock should be determined here, because if we check the + * view definition at maintenance time, we need to acquire a weaker lock, + * and upgrading the lock level after this increases probability of + * deadlock. + */ + + rte = list_nth(qry->rtable, 0); + if (list_length(qry->rtable) > 1 || rte->rtekind != RTE_RELATION) + ex_lock = true; + + CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, matviewOid, &relids, ex_lock); + + bms_free(relids); +} + +static void +CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, + Relids *relids, bool ex_lock) +{ + if (node == NULL) + return; + + /* This can recurse, so check for excessive recursion */ + check_stack_depth(); + + switch (nodeTag(node)) + { + case T_Query: + { + Query *query = (Query *) node; + + CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)query->jointree, matviewOid, relids, ex_lock); + } + break; + + case T_RangeTblRef: + { + int rti = ((RangeTblRef *) node)->rtindex; + RangeTblEntry *rte = rt_fetch(rti, qry->rtable); + + if (rte->rtekind == RTE_RELATION && !bms_is_member(rte->relid, *relids)) + { + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_BEFORE, true); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_AFTER, true); + + *relids = bms_add_member(*relids, rte->relid); + } + } + break; + + case T_FromExpr: + { + FromExpr *f = (FromExpr *) node; + ListCell *l; + + foreach(l, f->fromlist) + CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(l), matviewOid, relids, ex_lock); + } + break; + + case T_JoinExpr: + { + JoinExpr *j = (JoinExpr *) node; + + CreateIvmTriggersOnBaseTablesRecurse(qry, j->larg, matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, j->rarg, matviewOid, relids, ex_lock); + } + break; + + default: + elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); + } +} + +/* + * CreateIvmTrigger -- create IVM trigger on a base table + */ +static void +CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock) +{ + ObjectAddress refaddr; + ObjectAddress address; + CreateTrigStmt *ivm_trigger; + List *transitionRels = NIL; + + Assert(timing == TRIGGER_TYPE_BEFORE || timing == TRIGGER_TYPE_AFTER); + + refaddr.classId = RelationRelationId; + refaddr.objectId = viewOid; + refaddr.objectSubId = 0; + + ivm_trigger = makeNode(CreateTrigStmt); + ivm_trigger->relation = makeRangeVar(get_namespace_name(get_rel_namespace(relOid)), get_rel_name(relOid), -1); + // FIXME(yang): use statment-level trigger + ivm_trigger->row = false; + + ivm_trigger->timing = timing; + ivm_trigger->events = type; + + switch (type) + { + case TRIGGER_TYPE_INSERT: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_ins_before" : "IVM_trigger_ins_after"); + break; + case TRIGGER_TYPE_DELETE: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_del_before" : "IVM_trigger_del_after"); + break; + case TRIGGER_TYPE_UPDATE: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_upd_before" : "IVM_trigger_upd_after"); + break; + case TRIGGER_TYPE_TRUNCATE: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_truncate_before" : "IVM_trigger_truncate_after"); + break; + default: + elog(ERROR, "unsupported trigger type"); + } + + if (timing == TRIGGER_TYPE_AFTER) + { + if (type == TRIGGER_TYPE_INSERT || type == TRIGGER_TYPE_UPDATE) + { + TriggerTransition *n = makeNode(TriggerTransition); + n->name = "__ivm_newtable"; + n->isNew = true; + n->isTable = true; + + transitionRels = lappend(transitionRels, n); + } + if (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE) + { + TriggerTransition *n = makeNode(TriggerTransition); + n->name = "__ivm_oldtable"; + n->isNew = false; + n->isTable = true; + + transitionRels = lappend(transitionRels, n); + } + } + + /* + * XXX: When using DELETE or UPDATE, we must use exclusive lock for now + * because apply_old_delta(_with_count) uses ctid to identify the tuple + * to be deleted/deleted, but doesn't work in concurrent situations. + * + * If the view doesn't have aggregate, distinct, or tuple duplicate, + * then it would work even in concurrent situations. However, we don't have + * any way to guarantee the view has a unique key before opening the IMMV + * at the maintenance time because users may drop the unique index. + */ + + if (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE) + ex_lock = true; + + ivm_trigger->funcname = + (timing == TRIGGER_TYPE_BEFORE ? SystemFuncName("ivm_immediate_before") : SystemFuncName("ivm_immediate_maintenance")); + + ivm_trigger->columns = NIL; + ivm_trigger->transitionRels = transitionRels; + ivm_trigger->whenClause = NULL; + ivm_trigger->isconstraint = false; + ivm_trigger->deferrable = false; + ivm_trigger->initdeferred = false; + ivm_trigger->constrrel = NULL; + ivm_trigger->args = list_make2( + makeString(DatumGetPointer(DirectFunctionCall1(oidout, ObjectIdGetDatum(viewOid)))), + makeString(DatumGetPointer(DirectFunctionCall1(boolout, BoolGetDatum(ex_lock)))) + ); + + address = CreateTrigger(ivm_trigger, NULL, relOid, InvalidOid, InvalidOid, + InvalidOid, InvalidOid, InvalidOid, NULL, false, false); + + recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO); + + if (Gp_role == GP_ROLE_DISPATCH && ENABLE_DISPATCH()) + { + CdbDispatchUtilityStatement((Node *) ivm_trigger, + DF_CANCEL_ON_ERROR| + DF_WITH_SNAPSHOT| + DF_NEED_TWO_PHASE, + GetAssignedOidsForDispatch(), + NULL); + } + /* Make changes-so-far visible */ + CommandCounterIncrement(); +} + +/* + * check_ivm_restriction --- look for specify nodes in the query tree + */ +static void +check_ivm_restriction(Node *node) +{ + check_ivm_restriction_walker(node, NULL); +} + +static bool +check_ivm_restriction_walker(Node *node, void *context) +{ + if (node == NULL) + return false; + + /* + * We currently don't support Sub-Query. + */ + if (IsA(node, SubPlan) || IsA(node, SubLink)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("subquery is not supported on incrementally maintainable materialized view"))); + + /* This can recurse, so check for excessive recursion */ + check_stack_depth(); + + switch (nodeTag(node)) + { + case T_Query: + { + Query *qry = (Query *)node; + ListCell *lc; + List *vars; + + /* if contained CTE, return error */ + if (qry->cteList != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CTE is not supported on incrementally maintainable materialized view"))); + if (qry->havingQual != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg(" HAVING clause is not supported on incrementally maintainable materialized view"))); + if (qry->sortClause != NIL) /* There is a possibility that we don't need to return an error */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("ORDER BY clause is not supported on incrementally maintainable materialized view"))); + if (qry->limitOffset != NULL || qry->limitCount != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("LIMIT/OFFSET clause is not supported on incrementally maintainable materialized view"))); + if (qry->distinctClause) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DISTINCT is not supported on incrementally maintainable materialized view"))); + if (qry->hasDistinctOn) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DISTINCT ON is not supported on incrementally maintainable materialized view"))); + if (qry->hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("window functions are not supported on incrementally maintainable materialized view"))); + if (qry->groupingSets != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("GROUPING SETS, ROLLUP, or CUBE clauses is not supported on incrementally maintainable materialized view"))); + if (qry->setOperations != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("UNION/INTERSECT/EXCEPT statements are not supported on incrementally maintainable materialized view"))); + if (list_length(qry->targetList) == 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("empty target list is not supported on incrementally maintainable materialized view"))); + if (qry->rowMarks != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("FOR UPDATE/SHARE clause is not supported on incrementally maintainable materialized view"))); + + /* system column restrictions */ + vars = pull_vars_of_level((Node *) qry, 0); + foreach(lc, vars) + { + if (IsA(lfirst(lc), Var)) + { + Var *var = (Var *) lfirst(lc); + /* if system column, return error */ + if (var->varattno < 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("system column is not supported on incrementally maintainable materialized view"))); + } + } + + /* restrictions for rtable */ + foreach(lc, qry->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); + + if (rte->subquery) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("subquery is not supported on incrementally maintainable materialized view"))); + + if (rte->tablesample != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("TABLESAMPLE clause is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("partitioned table is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_RELATION && has_superclass(rte->relid)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("partitions is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_RELATION && find_inheritance_children(rte->relid, NoLock) != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("inheritance parent is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("foreign table is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_VIEW || + rte->relkind == RELKIND_MATVIEW) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("VIEW or MATERIALIZED VIEW is not supported on incrementally maintainable materialized view"))); + + if (rte->rtekind == RTE_VALUES) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("VALUES is not supported on incrementally maintainable materialized view"))); + + } + + query_tree_walker(qry, check_ivm_restriction_walker, NULL, QTW_IGNORE_RANGE_TABLE); + + break; + } + case T_TargetEntry: + { + TargetEntry *tle = (TargetEntry *)node; + if (isIvmName(tle->resname)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("column name %s is not supported on incrementally maintainable materialized view", tle->resname))); + + expression_tree_walker(node, check_ivm_restriction_walker, NULL); + break; + } + case T_JoinExpr: + { + JoinExpr *joinexpr = (JoinExpr *)node; + + if (joinexpr->jointype > JOIN_INNER) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view"))); + + expression_tree_walker(node, check_ivm_restriction_walker, NULL); + } + break; + case T_Aggref: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function is not supported on incrementally maintainable materialized view"))); + break; + default: + expression_tree_walker(node, check_ivm_restriction_walker, (void *) context); + break; + } + return false; +} + +/* + * CreateIndexOnIMMV + * + * Create a unique index on incremental maintainable materialized view. + * If the view definition query has a GROUP BY clause, the index is created + * on the columns of GROUP BY expressions. Otherwise, if the view contains + * all primary key attritubes of its base tables in the target list, the index + * is created on these attritubes. In other cases, no index is created. + */ +void +CreateIndexOnIMMV(Query *query, Relation matviewRel) +{ + ListCell *lc; + IndexStmt *index; + ObjectAddress address; + List *constraintList = NIL; + char idxname[NAMEDATALEN]; + List *indexoidlist = RelationGetIndexList(matviewRel); + ListCell *indexoidscan; + Bitmapset *key_attnos; + + snprintf(idxname, sizeof(idxname), "%s_index", RelationGetRelationName(matviewRel)); + + index = makeNode(IndexStmt); + + index->unique = true; + index->primary = false; + index->isconstraint = false; + index->deferrable = false; + index->initdeferred = false; + index->idxname = idxname; + index->relation = + makeRangeVar(get_namespace_name(RelationGetNamespace(matviewRel)), + pstrdup(RelationGetRelationName(matviewRel)), + -1); + index->accessMethod = DEFAULT_INDEX_TYPE; + index->options = NIL; + index->tableSpace = get_tablespace_name(matviewRel->rd_rel->reltablespace); + index->whereClause = NULL; + index->indexParams = NIL; + index->indexIncludingParams = NIL; + index->excludeOpNames = NIL; + index->idxcomment = NULL; + index->indexOid = InvalidOid; + index->oldNode = InvalidOid; + index->oldCreateSubid = InvalidSubTransactionId; + index->oldFirstRelfilenodeSubid = InvalidSubTransactionId; + index->transformed = true; + index->concurrent = false; + index->if_not_exists = false; + + /* create index on the base tables' primary key columns */ + key_attnos = get_primary_key_attnos_from_query(query, &constraintList); + if (key_attnos) + { + foreach(lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1); + + if (bms_is_member(tle->resno - FirstLowInvalidHeapAttributeNumber, key_attnos)) + { + IndexElem *iparam; + + iparam = makeNode(IndexElem); + iparam->name = pstrdup(NameStr(attr->attname)); + iparam->expr = NULL; + iparam->indexcolname = NULL; + iparam->collation = NIL; + iparam->opclass = NIL; + iparam->opclassopts = NIL; + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + index->indexParams = lappend(index->indexParams, iparam); + } + } + } + else + { + /* create no index, just notice that an appropriate index is necessary for efficient IVM */ + ereport(NOTICE, + (errmsg("could not create an index on materialized view \"%s\" automatically", + RelationGetRelationName(matviewRel)), + errdetail("This target list does not have all the primary key columns. "), + errhint("Create an index on the materialized view for efficient incremental maintenance."))); + return; + } + + /* If we have a compatible index, we don't need to create another. */ + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + bool hasCompatibleIndex = false; + + indexRel = index_open(indexoid, AccessShareLock); + + if (CheckIndexCompatible(indexRel->rd_id, + index->accessMethod, + index->indexParams, + index->excludeOpNames)) + hasCompatibleIndex = true; + + index_close(indexRel, AccessShareLock); + + if (hasCompatibleIndex) + return; + } + + address = DefineIndex(RelationGetRelid(matviewRel), + index, + InvalidOid, + InvalidOid, + InvalidOid, + false, true, false, false, true); + + ereport(NOTICE, + (errmsg("created index \"%s\" on materialized view \"%s\"", + idxname, RelationGetRelationName(matviewRel)))); + + /* + * Make dependencies so that the index is dropped if any base tables's + * primary key is dropped. + */ + foreach(lc, constraintList) + { + Oid constraintOid = lfirst_oid(lc); + ObjectAddress refaddr; + + refaddr.classId = ConstraintRelationId; + refaddr.objectId = constraintOid; + refaddr.objectSubId = 0; + + recordDependencyOn(&address, &refaddr, DEPENDENCY_NORMAL); + } +} + + +/* + * get_primary_key_attnos_from_query + * + * Identify the columns in base tables' primary keys in the target list. + * + * Returns a Bitmapset of the column attnos of the primary key's columns of + * tables that used in the query. The attnos are offset by + * FirstLowInvalidHeapAttributeNumber as same as get_primary_key_attnos. + * + * If any table has no primary key or any primary key's columns is not in + * the target list, return NULL. We also return NULL if any pkey constraint + * is deferrable. + * + * constraintList is set to a list of the OIDs of the pkey constraints. + */ +static Bitmapset * +get_primary_key_attnos_from_query(Query *query, List **constraintList) +{ + List *key_attnos_list = NIL; + ListCell *lc; + int i; + Bitmapset *keys = NULL; + Relids rels_in_from; + + /* + * Collect primary key attributes from all tables used in query. The key attributes + * sets for each table are stored in key_attnos_list in order by RTE index. + */ + foreach(lc, query->rtable) + { + RangeTblEntry *r = (RangeTblEntry*) lfirst(lc); + Bitmapset *key_attnos; + bool has_pkey = true; + + /* for tables, call get_primary_key_attnos */ + if (r->rtekind == RTE_RELATION) + { + Oid constraintOid; + key_attnos = get_primary_key_attnos(r->relid, false, &constraintOid); + *constraintList = lappend_oid(*constraintList, constraintOid); + has_pkey = (key_attnos != NULL); + } + /* for other RTEs, store NULL into key_attnos_list */ + else + key_attnos = NULL; + + /* + * If any table or subquery has no primary key or its pkey constraint is deferrable, + * we cannot get key attributes for this query, so return NULL. + */ + if (!has_pkey) + return NULL; + + key_attnos_list = lappend(key_attnos_list, key_attnos); + } + + /* Collect key attributes appearing in the target list */ + i = 1; + foreach(lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) flatten_join_alias_vars(query, lfirst(lc)); + + if (IsA(tle->expr, Var)) + { + Var *var = (Var*) tle->expr; + Bitmapset *key_attnos = list_nth(key_attnos_list, var->varno - 1); + + /* check if this attribute is from a base table's primary key */ + if (bms_is_member(var->varattno - FirstLowInvalidHeapAttributeNumber, key_attnos)) + { + /* + * Remove found key attributes from key_attnos_list, and add this + * to the result list. + */ + key_attnos = bms_del_member(key_attnos, var->varattno - FirstLowInvalidHeapAttributeNumber); + if (bms_is_empty(key_attnos)) + { + key_attnos_list = list_delete_nth_cell(key_attnos_list, var->varno - 1); + key_attnos_list = list_insert_nth(key_attnos_list, var->varno - 1, NULL); + } + keys = bms_add_member(keys, i - FirstLowInvalidHeapAttributeNumber); + } + } + i++; + } + + /* Collect RTE indexes of relations appearing in the FROM clause */ + rels_in_from = get_relids_in_jointree((Node *) query->jointree, false); + + /* + * Check if all key attributes of relations in FROM are appearing in the target + * list. If an attribute remains in key_attnos_list in spite of the table is used + * in FROM clause, the target is missing this key attribute, so we return NULL. + */ + i = 1; + foreach(lc, key_attnos_list) + { + Bitmapset *bms = (Bitmapset *)lfirst(lc); + if (!bms_is_empty(bms) && bms_is_member(i, rels_in_from)) + return NULL; + i++; + } + + return keys; +} diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 61a580873b1..1b96bff58a8 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -26,21 +26,30 @@ #include "catalog/namespace.h" #include "catalog/oid_dispatch.h" #include "catalog/pg_am.h" +#include "catalog/pg_depend.h" +#include "catalog/pg_trigger.h" #include "catalog/pg_opclass.h" #include "catalog/pg_operator.h" #include "cdb/cdbaocsam.h" #include "cdb/cdbappendonlyam.h" #include "cdb/cdbvars.h" #include "commands/cluster.h" +#include "commands/createas.h" #include "commands/matview.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "executor/executor.h" #include "executor/spi.h" +#include "executor/tstoreReceiver.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "parser/analyze.h" +#include "parser/parse_clause.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "pgstat.h" #include "rewrite/rewriteHandler.h" +#include "rewrite/rowsecurity.h" #include "storage/lmgr.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" @@ -49,6 +58,7 @@ #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "utils/typcache.h" typedef struct @@ -67,6 +77,52 @@ typedef struct uint64 processed; /* GPDB: number of tuples inserted */ } DR_transientrel; +#define MV_INIT_QUERYHASHSIZE 16 + +/* + * MV_TriggerHashEntry + * + * Hash entry for base tables on which IVM trigger is invoked + */ +typedef struct MV_TriggerHashEntry +{ + Oid matview_id; /* OID of the materialized view */ + int before_trig_count; /* count of before triggers invoked */ + int after_trig_count; /* count of after triggers invoked */ + + Snapshot snapshot; /* Snapshot just before table change */ + + List *tables; /* List of MV_TriggerTable */ + bool has_old; /* tuples are deleted from any table? */ + bool has_new; /* tuples are inserted into any table? */ +} MV_TriggerHashEntry; + +/* + * MV_TriggerTable + * + * IVM related data for tables on which the trigger is invoked. + */ +typedef struct MV_TriggerTable +{ + Oid table_id; /* OID of the modified table */ + List *old_tuplestores; /* tuplestores for deleted tuples */ + List *new_tuplestores; /* tuplestores for inserted tuples */ + + List *rte_indexes; /* List of RTE index of the modified table */ + RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */ + + Relation rel; /* relation of the modified table */ + TupleTableSlot *slot; /* for checking visibility in the pre-state table */ +} MV_TriggerTable; + +static HTAB *mv_trigger_info = NULL; + +static bool in_delta_calculation = false; + +/* ENR name for materialized view delta */ +#define NEW_DELTA_ENRNAME "new_delta" +#define OLD_DELTA_ENRNAME "old_delta" + static int matview_maintenance_depth = 0; static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO); @@ -75,6 +131,8 @@ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); static void transientrel_destroy(DestReceiver *self); static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, + QueryEnvironment *queryEnv, + TupleDesc *resultTupleDesc, const char *queryString, RefreshClause *refreshClause); static char *make_temptable_name_n(char *tempname, int n); static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, @@ -83,6 +141,37 @@ static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersist static bool is_usable_unique_index(Relation indexRel); static void OpenMatViewIncrementalMaintenance(void); static void CloseMatViewIncrementalMaintenance(void); +static Query *get_matview_query(Relation matviewRel); + +static Query *rewrite_query_for_preupdate_state(Query *query, List *tables, + ParseState *pstate, Oid matviewid); +static void register_delta_ENRs(ParseState *pstate, Query *query, List *tables); +static char *make_delta_enr_name(const char *prefix, Oid relid, int count); +static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, + QueryEnvironment *queryEnv, Oid matviewid); +static RangeTblEntry *replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new, + QueryEnvironment *queryEnv); +static Query *rewrite_query_for_counting(Query *query, ParseState *pstate); + +static void calc_delta(Oid matviewOid,MV_TriggerTable *table, int rte_index, Query *query, + DestReceiver *dest_old, DestReceiver *dest_new, + TupleDesc *tupdesc_old, TupleDesc *tupdesc_new, + QueryEnvironment *queryEnv); +static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index); + +static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores, + TupleDesc tupdesc_old, TupleDesc tupdesc_new, + Query *query); +static void apply_old_delta(const char *matviewname, const char *deltaname_old, + List *keys); +static void apply_new_delta(const char *matviewname, const char *deltaname_new, + StringInfo target_list); +static char *get_matching_condition_string(List *keys); +static void generate_equal(StringInfo querybuf, Oid opttype, + const char *leftop, const char *rightop); + +static void mv_InitHashTables(void); +static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort); /* * SetMatViewPopulatedState @@ -138,6 +227,46 @@ MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoA return refreshClause; } +/* + * SetMatViewIVMState + * Mark a materialized view as IVM, or not. + * + * NOTE: caller must be holding an appropriate lock on the relation. + */ +void +SetMatViewIVMState(Relation relation, bool newstate) +{ + Relation pgrel; + HeapTuple tuple; + + Assert(relation->rd_rel->relkind == RELKIND_MATVIEW); + + /* + * Update relation's pg_class entry. Crucial side-effect: other backends + * (and this one too!) are sent SI message to make them rebuild relcache + * entries. + */ + pgrel = table_open(RelationRelationId, RowExclusiveLock); + tuple = SearchSysCacheCopy1(RELOID, + ObjectIdGetDatum(RelationGetRelid(relation))); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", + RelationGetRelid(relation)); + + ((Form_pg_class) GETSTRUCT(tuple))->relisivm = newstate; + + CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); + + heap_freetuple(tuple); + table_close(pgrel, RowExclusiveLock); + + /* + * Advance command counter to make the updated pg_class row locally + * visible. + */ + CommandCounterIncrement(); +} + /* * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command * @@ -180,6 +309,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, int save_nestlevel; ObjectAddress address; RefreshClause *refreshClause; + bool oldPopulated; /* MATERIALIZED_VIEW_FIXME: Refresh MatView is not MPP-fied. */ @@ -205,6 +335,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, SetUserIdAndSecContext(relowner, save_sec_context | SECURITY_RESTRICTED_OPERATION); save_nestlevel = NewGUCNestLevel(); + oldPopulated = RelationIsPopulated(matviewRel); /* Make sure it is a materialized view. */ if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW) @@ -329,6 +460,74 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, relpersistence = matviewRel->rd_rel->relpersistence; } + /* delete IMMV triggers. */ + if (RelationIsIVM(matviewRel) && stmt->skipData) + { + Relation tgRel; + Relation depRel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tup; + ObjectAddresses *immv_triggers; + + immv_triggers = new_object_addresses(); + + tgRel = table_open(TriggerRelationId, RowExclusiveLock); + depRel = table_open(DependRelationId, RowExclusiveLock); + + /* search triggers that depends on IMMV. */ + ScanKeyInit(&key, + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(matviewOid)); + scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, 1, &key); + while ((tup = systable_getnext(scan)) != NULL) + { + ObjectAddress obj; + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); + + if (foundDep->classid == TriggerRelationId) + { + HeapTuple tgtup; + ScanKeyData tgkey[1]; + SysScanDesc tgscan; + Form_pg_trigger tgform; + + /* Find the trigger name. */ + ScanKeyInit(&tgkey[0], + Anum_pg_trigger_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(foundDep->objid)); + + tgscan = systable_beginscan(tgRel, TriggerOidIndexId, true, + NULL, 1, tgkey); + tgtup = systable_getnext(tgscan); + if (!HeapTupleIsValid(tgtup)) + elog(ERROR, "could not find tuple for immv trigger %u", foundDep->objid); + + tgform = (Form_pg_trigger) GETSTRUCT(tgtup); + + /* If trigger is created by IMMV, delete it. */ + if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0) + { + obj.classId = foundDep->classid; + obj.objectId = foundDep->objid; + obj.objectSubId = foundDep->refobjsubid; + add_exact_object_address(&obj, immv_triggers); + } + systable_endscan(tgscan); + } + } + systable_endscan(scan); + + performMultipleDeletions(immv_triggers, DROP_RESTRICT, PERFORM_DELETION_INTERNAL); + + table_close(depRel, RowExclusiveLock); + table_close(tgRel, RowExclusiveLock); + free_object_addresses(immv_triggers); + } + /* * Create the transient table that will receive the regenerated data. Lock * it against access by any other process until commit (by which time it @@ -355,7 +554,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, * In GPDB, we call refresh_matview_datafill() even when WITH NO DATA was * specified, because it will dispatch the operation to the segments. */ - processed = refresh_matview_datafill(dest, dataQuery, queryString, refreshClause); + processed = refresh_matview_datafill(dest, dataQuery, NULL, NULL, queryString, refreshClause); /* Make the matview match the newly generated data. */ if (concurrent) @@ -398,6 +597,12 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, // pgstat_count_heap_insert(matviewRel, processed); } + if (!stmt->skipData && RelationIsIVM(matviewRel) && !oldPopulated) + { + //CreateIndexOnIMMV(dataQuery, matviewRel); + CreateIvmTriggersOnBaseTables(dataQuery, matviewOid); + } + table_close(matviewRel, NoLock); /* Roll back any GUC changes */ @@ -432,6 +637,8 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, + QueryEnvironment *queryEnv, + TupleDesc *resultTupleDesc, const char *queryString, RefreshClause *refreshClause) { List *rewritten; @@ -493,7 +700,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, /* Create a QueryDesc, redirecting output to our tuple receiver */ queryDesc = CreateQueryDesc(plan, queryString, GetActiveSnapshot(), InvalidSnapshot, - dest, NULL, NULL, 0); + dest, NULL, queryEnv ? queryEnv: NULL, 0); RestoreOidAssignments(saved_dispatch_oids); @@ -513,6 +720,9 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, */ processed = queryDesc->estate->es_processed; + if (resultTupleDesc) + *resultTupleDesc = CreateTupleDescCopy(queryDesc->tupDesc); + /* and clean up */ ExecutorFinish(queryDesc); ExecutorEnd(queryDesc); @@ -1133,3 +1343,1246 @@ CloseMatViewIncrementalMaintenance(void) matview_maintenance_depth--; Assert(matview_maintenance_depth >= 0); } + +/* + * get_matview_query - get the Query from a matview's _RETURN rule. + */ +static Query * +get_matview_query(Relation matviewRel) +{ + RewriteRule *rule; + List * actions; + + /* + * Check that everything is correct for a refresh. Problems at this point + * are internal errors, so elog is sufficient. + */ + if (matviewRel->rd_rel->relhasrules == false || + matviewRel->rd_rules->numLocks < 1) + elog(ERROR, + "materialized view \"%s\" is missing rewrite information", + RelationGetRelationName(matviewRel)); + + if (matviewRel->rd_rules->numLocks > 1) + elog(ERROR, + "materialized view \"%s\" has too many rules", + RelationGetRelationName(matviewRel)); + + rule = matviewRel->rd_rules->rules[0]; + if (rule->event != CMD_SELECT || !(rule->isInstead)) + elog(ERROR, + "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule", + RelationGetRelationName(matviewRel)); + + actions = rule->actions; + if (list_length(actions) != 1) + elog(ERROR, + "the rule for materialized view \"%s\" is not a single action", + RelationGetRelationName(matviewRel)); + + /* + * The stored query was rewritten at the time of the MV definition, but + * has not been scribbled on by the planner. + */ + return linitial_node(Query, actions); +} + + +/* ---------------------------------------------------- + * Incremental View Maintenance routines + * --------------------------------------------------- + */ + +/* + * ivm_immediate_before + * + * IVM trigger function invoked before base table is modified. If this is + * invoked firstly in the same statement, we save the transaction id and the + * command id at that time. + */ +Datum +ivm_immediate_before(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; + char *ex_lock_text = trigdata->tg_trigger->tgargs[1]; + Oid matviewOid; + MV_TriggerHashEntry *entry; + bool found; + bool ex_lock; + + matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text))); + ex_lock = DatumGetBool(DirectFunctionCall1(boolin, CStringGetDatum(ex_lock_text))); + + /* If the view has more than one tables, we have to use an exclusive lock. */ + if (ex_lock) + { + /* + * Wait for concurrent transactions which update this materialized view at + * READ COMMITED. This is needed to see changes committed in other + * transactions. No wait and raise an error at REPEATABLE READ or + * SERIALIZABLE to prevent update anomalies of matviews. + * XXX: dead-lock is possible here. + */ + if (!IsolationUsesXactSnapshot()) + LockRelationOid(matviewOid, ExclusiveLock); + else if (!ConditionalLockRelationOid(matviewOid, ExclusiveLock)) + { + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(matviewOid); + + if (!relname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on materialized view during incremental maintenance"))); + + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on materialized view \"%s\" during incremental maintenance", + relname))); + } + } + else + LockRelationOid(matviewOid, RowExclusiveLock); + + /* + * On the first call initialize the hashtable + */ + if (!mv_trigger_info) + mv_InitHashTables(); + + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_ENTER, &found); + + /* On the first BEFORE to update the view, initialize trigger data */ + if (!found) + { + /* + * Get a snapshot just before the table was modified for checking + * tuple visibility in the pre-update state of the table. + */ + Snapshot snapshot = GetActiveSnapshot(); + + entry->matview_id = matviewOid; + entry->before_trig_count = 0; + entry->after_trig_count = 0; + entry->snapshot = RegisterSnapshot(snapshot); + entry->tables = NIL; + entry->has_old = false; + entry->has_new = false; + } + + entry->before_trig_count++; + elog(INFO, "trigger ivm_immediate_before."); + + return PointerGetDatum(NULL); +} + +/* + * ivm_immediate_maintenance + * + * IVM trigger function invoked after base table is modified. + * For each table, tuplestores of transition tables are collected. + * and after the last modification + */ +Datum +ivm_immediate_maintenance(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Relation rel; + Oid relid; + Oid matviewOid; + Query *query; + Query *rewritten = NULL; + char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; + Relation matviewRel; + int old_depth = matview_maintenance_depth; + + Oid relowner; + Tuplestorestate *old_tuplestore = NULL; + Tuplestorestate *new_tuplestore = NULL; + DestReceiver *dest_new = NULL, *dest_old = NULL; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + MV_TriggerHashEntry *entry; + MV_TriggerTable *table; + bool found; + + ParseState *pstate; + QueryEnvironment *queryEnv = create_queryEnv(); + MemoryContext oldcxt; + ListCell *lc; + int i; + + + /* Create a ParseState for rewriting the view definition query */ + pstate = make_parsestate(NULL); + pstate->p_queryEnv = queryEnv; + pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; + + rel = trigdata->tg_relation; + relid = rel->rd_id; + + matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text))); + + /* + * On the first call initialize the hashtable + */ + if (!mv_trigger_info) + mv_InitHashTables(); + + /* get the entry for this materialized view */ + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_FIND, &found); + Assert (found && entry != NULL); + entry->after_trig_count++; + + /* search the entry for the modified table and create new entry if not found */ + found = false; + foreach(lc, entry->tables) + { + table = (MV_TriggerTable *) lfirst(lc); + if (table->table_id == relid) + { + found = true; + break; + } + } + if (!found) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + table = (MV_TriggerTable *) palloc0(sizeof(MV_TriggerTable)); + table->table_id = relid; + table->old_tuplestores = NIL; + table->new_tuplestores = NIL; + table->rte_indexes = NIL; + table->slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel)); + table->rel = table_open(RelationGetRelid(rel), NoLock); + entry->tables = lappend(entry->tables, table); + + MemoryContextSwitchTo(oldcxt); + } + + /* Save the transition tables and make a request to not free immediately */ + if (trigdata->tg_oldtable) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + table->old_tuplestores = lappend(table->old_tuplestores, trigdata->tg_oldtable); + entry->has_old = true; + MemoryContextSwitchTo(oldcxt); + } + if (trigdata->tg_newtable) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + table->new_tuplestores = lappend(table->new_tuplestores, trigdata->tg_newtable); + entry->has_new = true; + MemoryContextSwitchTo(oldcxt); + } + if (entry->has_new || entry->has_old) + { + CmdType cmd; + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + cmd = CMD_INSERT; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + cmd = CMD_DELETE; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + cmd = CMD_UPDATE; + else + elog(ERROR,"unsupported trigger type"); + + /* Prolong lifespan of transition tables to the end of the last AFTER trigger */ + SetTransitionTablePreserved(relid, cmd); + } + + + /* If this is not the last AFTER trigger call, immediately exit. */ + Assert (entry->before_trig_count >= entry->after_trig_count); + if (entry->before_trig_count != entry->after_trig_count) + return PointerGetDatum(NULL); + + /* + * If this is the last AFTER trigger call, continue and update the view. + */ + + /* + * Advance command counter to make the updated base table row locally + * visible. + */ + CommandCounterIncrement(); + + matviewRel = table_open(matviewOid, NoLock); + + /* Make sure it is a materialized view. */ + Assert(matviewRel->rd_rel->relkind == RELKIND_MATVIEW); + + /* + * Get and push the latast snapshot to see any changes which is committed + * during waiting in other transactions at READ COMMITTED level. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * Check for active uses of the relation in the current transaction, such + * as open scans. + * + * NB: We count on this to protect us against problems with refreshing the + * data using TABLE_INSERT_FROZEN. + */ + CheckTableNotInUse(matviewRel, "refresh a materialized view incrementally"); + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also arrange to make GUC variable changes local to this command. + * We will switch modes when we are about to execute user code. + */ + relowner = matviewRel->rd_rel->relowner; + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + /* get view query*/ + query = get_matview_query(matviewRel); + + /* + * When a base table is truncated, the view content will be empty if the + * view definition query does not contain an aggregate without a GROUP clause. + * Therefore, such views can be truncated. + */ + if (TRIGGER_FIRED_BY_TRUNCATE(trigdata->tg_event)) + { + ExecuteTruncateGuts(list_make1(matviewRel), list_make1_oid(matviewOid), + NIL, DROP_RESTRICT, false, false); + + /* Clean up hash entry and delete tuplestores */ + clean_up_IVM_hash_entry(entry, false); + + /* Pop the original snapshot. */ + PopActiveSnapshot(); + + table_close(matviewRel, NoLock); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + return PointerGetDatum(NULL); + } + + /* + * rewrite query for calculating deltas + */ + + rewritten = copyObject(query); + + /* Replace resnames in a target list with materialized view's attnames */ + i = 0; + foreach (lc, rewritten->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + char *resname = NameStr(attr->attname); + + tle->resname = pstrdup(resname); + i++; + } + + /* Set all tables in the query to pre-update state */ + rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables, + pstate, matviewOid); + /* Rewrite for counting duplicated tuples */ + rewritten = rewrite_query_for_counting(rewritten, pstate); + + /* Create tuplestores to store view deltas */ + if (entry->has_old) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + old_tuplestore = tuplestore_begin_heap(false, false, work_mem); + dest_old = CreateDestReceiver(DestTuplestore); + SetTuplestoreDestReceiverParams(dest_old, + old_tuplestore, + TopTransactionContext, + false, + NULL, + NULL); + + MemoryContextSwitchTo(oldcxt); + } + if (entry->has_new) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + new_tuplestore = tuplestore_begin_heap(false, false, work_mem); + dest_new = CreateDestReceiver(DestTuplestore); + SetTuplestoreDestReceiverParams(dest_new, + new_tuplestore, + TopTransactionContext, + false, + NULL, + NULL); + MemoryContextSwitchTo(oldcxt); + } + + // FIXME: This is a hack to avoid error in oid_dispatch.c + Gp_role = GP_ROLE_UTILITY; + /* for all modified tables */ + foreach(lc, entry->tables) + { + ListCell *lc2; + + table = (MV_TriggerTable *) lfirst(lc); + + /* loop for self-join */ + foreach(lc2, table->rte_indexes) + { + int rte_index = lfirst_int(lc2); + TupleDesc tupdesc_old; + TupleDesc tupdesc_new; + + /* calculate delta tables */ + calc_delta(matviewOid, table, rte_index, rewritten, dest_old, dest_new, + &tupdesc_old, &tupdesc_new, queryEnv); + + /* Set the table in the query to post-update state */ + rewritten = rewrite_query_for_postupdate_state(rewritten, table, rte_index); + + PG_TRY(); + { + /* apply the delta tables to the materialized view */ + apply_delta(matviewOid, old_tuplestore, new_tuplestore, + tupdesc_old, tupdesc_new, query); + } + PG_CATCH(); + { + matview_maintenance_depth = old_depth; + PG_RE_THROW(); + } + PG_END_TRY(); + + /* clear view delta tuplestores */ + if (old_tuplestore) + tuplestore_clear(old_tuplestore); + if (new_tuplestore) + tuplestore_clear(new_tuplestore); + } + } + Gp_role = GP_ROLE_EXECUTE; + + /* Clean up hash entry and delete tuplestores */ + clean_up_IVM_hash_entry(entry, false); + if (old_tuplestore) + { + dest_old->rDestroy(dest_old); + tuplestore_end(old_tuplestore); + } + if (new_tuplestore) + { + dest_new->rDestroy(dest_new); + tuplestore_end(new_tuplestore); + } + + /* Pop the original snapshot. */ + PopActiveSnapshot(); + + table_close(matviewRel, NoLock); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + elog(INFO, "trigger ivm_immediate_maintenance."); + return PointerGetDatum(NULL); +} + +/* + * rewrite_query_for_preupdate_state + * + * Rewrite the query so that base tables' RTEs will represent "pre-update" + * state of tables. This is necessary to calculate view delta after multiple + * tables are modified. + */ +static Query* +rewrite_query_for_preupdate_state(Query *query, List *tables, + ParseState *pstate, Oid matviewid) +{ + ListCell *lc; + int num_rte = list_length(query->rtable); + int i; + + + /* register delta ENRs */ + register_delta_ENRs(pstate, query, tables); + + /* XXX: Is necessary? Is this right timing? */ + AcquireRewriteLocks(query, true, false); + + i = 1; + foreach(lc, query->rtable) + { + RangeTblEntry *r = (RangeTblEntry*) lfirst(lc); + + ListCell *lc2; + foreach(lc2, tables) + { + MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc2); + /* + * if the modified table is found then replace the original RTE with + * "pre-state" RTE and append its index to the list. + */ + if (r->relid == table->table_id) + { + List *securityQuals; + List *withCheckOptions; + bool hasRowSecurity; + bool hasSubLinks; + + RangeTblEntry *rte_pre = get_prestate_rte(r, table, pstate->p_queryEnv, matviewid); + + /* + * Set a row security poslicies of the modified table to the subquery RTE which + * represents the pre-update state of the table. + */ + get_row_security_policies(query, table->original_rte, i, + &securityQuals, &withCheckOptions, + &hasRowSecurity, &hasSubLinks); + + if (hasRowSecurity) + { + query->hasRowSecurity = true; + rte_pre->security_barrier = true; + } + if (hasSubLinks) + query->hasSubLinks = true; + + rte_pre->securityQuals = securityQuals; + lfirst(lc) = rte_pre; + + table->rte_indexes = lappend_int(table->rte_indexes, i); + break; + } + } + + /* finish the loop if we processed all RTE included in the original query */ + if (i++ >= num_rte) + break; + } + + return query; +} + +/* + * register_delta_ENRs + * + * For all modified tables, make ENRs for their transition tables + * and register them to the queryEnv. ENR's RTEs are also appended + * into the list in query tree. + */ +static void +register_delta_ENRs(ParseState *pstate, Query *query, List *tables) +{ + QueryEnvironment *queryEnv = pstate->p_queryEnv; + ListCell *lc; + RangeTblEntry *rte; + + foreach(lc, tables) + { + MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc); + ListCell *lc2; + int count; + + count = 0; + foreach(lc2, table->old_tuplestores) + { + Tuplestorestate *oldtable = (Tuplestorestate *) lfirst(lc2); + EphemeralNamedRelation enr = + palloc(sizeof(EphemeralNamedRelationData)); + ParseNamespaceItem *nsitem; + + enr->md.name = make_delta_enr_name("old", table->table_id, count); + enr->md.reliddesc = table->table_id; + enr->md.tupdesc = NULL; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(oldtable); + enr->reldata = oldtable; + register_ENR(queryEnv, enr); + + nsitem = addRangeTableEntryForENR(pstate, makeRangeVar(NULL, enr->md.name, -1), true); + rte = nsitem->p_rte; + + query->rtable = lappend(query->rtable, rte); + + count++; + } + + count = 0; + foreach(lc2, table->new_tuplestores) + { + Tuplestorestate *newtable = (Tuplestorestate *) lfirst(lc2); + EphemeralNamedRelation enr = + palloc(sizeof(EphemeralNamedRelationData)); + ParseNamespaceItem *nsitem; + + enr->md.name = make_delta_enr_name("new", table->table_id, count); + enr->md.reliddesc = table->table_id; + enr->md.tupdesc = NULL; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(newtable); + enr->reldata = newtable; + register_ENR(queryEnv, enr); + + nsitem = addRangeTableEntryForENR(pstate, makeRangeVar(NULL, enr->md.name, -1), true); + rte = nsitem->p_rte; + + query->rtable = lappend(query->rtable, rte); + + count++; + } + } +} + +#define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X)) +#define PG_GETARG_ITEMPOINTER(n) DatumGetItemPointer(PG_GETARG_DATUM(n)) + +/* + * ivm_visible_in_prestate + * + * Check visibility of a tuple specified by the tableoid and item pointer + * using the snapshot taken just before the table was modified. + */ +Datum +ivm_visible_in_prestate(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + ItemPointer itemPtr = PG_GETARG_ITEMPOINTER(1); + Oid matviewOid = PG_GETARG_OID(2); + MV_TriggerHashEntry *entry; + MV_TriggerTable *table = NULL; + ListCell *lc; + bool found; + bool result; + + if (!in_delta_calculation) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("ivm_visible_in_prestate can be called only in delta calculation"))); + + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_FIND, &found); + Assert (found && entry != NULL); + + foreach(lc, entry->tables) + { + table = (MV_TriggerTable *) lfirst(lc); + if (table->table_id == tableoid) + break; + } + + Assert (table != NULL); + + result = table_tuple_fetch_row_version(table->rel, itemPtr, entry->snapshot, table->slot); + + PG_RETURN_BOOL(result); +} + +/* + * get_prestate_rte + * + * Rewrite RTE of the modified table to a subquery which represents + * "pre-state" table. The original RTE is saved in table->rte_original. + */ +static RangeTblEntry* +get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, + QueryEnvironment *queryEnv, Oid matviewid) +{ + StringInfoData str; + RawStmt *raw; + Query *subquery; + Relation rel; + ParseState *pstate; + char *relname; + int i; + + pstate = make_parsestate(NULL); + pstate->p_queryEnv = queryEnv; + pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; + + /* + * We can use NoLock here since AcquireRewriteLocks should + * have locked the relation already. + */ + rel = table_open(table->table_id, NoLock); + relname = quote_qualified_identifier( + get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel)); + table_close(rel, NoLock); + + /* + * Filtering inserted row using the snapshot taken before the table + * is modified. ctid is required for maintaining outer join views. + */ + initStringInfo(&str); + appendStringInfo(&str, + "SELECT t.* FROM %s t" + " WHERE pg_catalog.ivm_visible_in_prestate(t.tableoid, t.ctid ,%d::pg_catalog.oid)", + relname, matviewid); + + /* + * Append deleted rows contained in old transition tables. + */ + for (i = 0; i < list_length(table->old_tuplestores); i++) + { + appendStringInfo(&str, " UNION ALL "); + appendStringInfo(&str," SELECT * FROM %s", + make_delta_enr_name("old", table->table_id, i)); + } + + /* Get a subquery representing pre-state of the table */ + raw = (RawStmt*)linitial(raw_parser(str.data, RAW_PARSE_DEFAULT)); + subquery = transformStmt(pstate, raw->stmt); + + /* save the original RTE */ + table->original_rte = copyObject(rte); + + rte->rtekind = RTE_SUBQUERY; + rte->subquery = subquery; + rte->security_barrier = false; + + /* Clear fields that should not be set in a subquery RTE */ + rte->relid = InvalidOid; + rte->relkind = 0; + rte->rellockmode = 0; + rte->tablesample = NULL; + rte->inh = false; /* must not be set for a subquery */ + + return rte; +} + +/* + * make_delta_enr_name + * + * Make a name for ENR of a transition table from the base table's oid. + * prefix will be "new" or "old" depending on its transition table kind.. + */ +static char* +make_delta_enr_name(const char *prefix, Oid relid, int count) +{ + char buf[NAMEDATALEN]; + char *name; + + snprintf(buf, NAMEDATALEN, "__ivm_%s_%u_%u", prefix, relid, count); + name = pstrdup(buf); + + return name; +} + +/* + * replace_rte_with_delta + * + * Replace RTE of the modified table with a single table delta that combine its + * all transition tables. + */ +static RangeTblEntry* +replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new, + QueryEnvironment *queryEnv) +{ + Oid relid = table->table_id; + StringInfoData str; + ParseState *pstate; + RawStmt *raw; + Query *sub; + int num_tuplestores = list_length(is_new ? table->new_tuplestores : table->old_tuplestores); + int i; + + /* the previous RTE must be a subquery which represents "pre-state" table */ + Assert(rte->rtekind == RTE_SUBQUERY); + + /* Create a ParseState for rewriting the view definition query */ + pstate = make_parsestate(NULL); + pstate->p_queryEnv = queryEnv; + pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; + + initStringInfo(&str); + + for (i = 0; i < num_tuplestores; i++) + { + if (i > 0) + appendStringInfo(&str, " UNION ALL "); + + appendStringInfo(&str, + " SELECT * FROM %s", + make_delta_enr_name(is_new ? "new" : "old", relid, i)); + } + + raw = (RawStmt*)linitial(raw_parser(str.data, RAW_PARSE_DEFAULT)); + sub = transformStmt(pstate, raw->stmt); + + /* + * Update the subquery so that it represent the combined transition + * table. Note that we leave the security_barrier and securityQuals + * fields so that the subquery relation can be protected by the RLS + * policy as same as the modified table. + */ + rte->rtekind = RTE_SUBQUERY; + rte->subquery = sub; + + return rte; +} + +/* + * rewrite_query_for_counting + * + * Rewrite query for counting duplicated tuples. + */ +static Query * +rewrite_query_for_counting(Query *query, ParseState *pstate) +{ + TargetEntry *tle_count; + FuncCall *fn; + Node *node; + + /* Add count(*) for counting distinct tuples in views */ + fn = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1); + fn->agg_star = true; + if (!query->groupClause && !query->hasAggs) + query->groupClause = transformDistinctClause(NULL, &query->targetList, query->sortClause, false); + + node = ParseFuncOrColumn(pstate, fn->funcname, NIL, NULL, fn, false, -1); + + tle_count = makeTargetEntry((Expr *) node, + list_length(query->targetList) + 1, + pstrdup("__ivm_count__"), + false); + query->targetList = lappend(query->targetList, tle_count); + query->hasAggs = true; + + return query; +} + +/* + * calc_delta + * + * Calculate view deltas generated under the modification of a table specified + * by the RTE index. + */ +static void +calc_delta(Oid matviewOid, MV_TriggerTable *table, int rte_index, Query *query, + DestReceiver *dest_old, DestReceiver *dest_new, + TupleDesc *tupdesc_old, TupleDesc *tupdesc_new, + QueryEnvironment *queryEnv) +{ + ListCell *lc = list_nth_cell(query->rtable, rte_index - 1); + RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); + RefreshClause *refreshClause; + in_delta_calculation = true; + + RangeVar *relation = makeRangeVar(get_namespace_name(get_rel_namespace(matviewOid)), get_rel_name(matviewOid), -1); + + refreshClause = MakeRefreshClause(false, false, relation, + RelationIsAppendOptimized(table->rel)); + + /* Generate old delta */ + if (list_length(table->old_tuplestores) > 0) + { + /* Replace the modified table with the old delta table and calculate the old view delta. */ + replace_rte_with_delta(rte, table, false, queryEnv); + refresh_matview_datafill(dest_old, query, queryEnv, tupdesc_old, "", refreshClause); + } + + /* Generate new delta */ + if (list_length(table->new_tuplestores) > 0) + { + /* Replace the modified table with the new delta table and calculate the new view delta*/ + replace_rte_with_delta(rte, table, true, queryEnv); + refresh_matview_datafill(dest_new, query, queryEnv, tupdesc_new, "", refreshClause); + } + + in_delta_calculation = false; +} + +/* + * rewrite_query_for_postupdate_state + * + * Rewrite the query so that the specified base table's RTEs will represent + * "post-update" state of tables. This is called after the view delta + * calculation due to changes on this table finishes. + */ +static Query* +rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index) +{ + ListCell *lc = list_nth_cell(query->rtable, rte_index - 1); + + /* Retore the original RTE */ + lfirst(lc) = table->original_rte; + + return query; +} + +/* + * apply_delta + * + * Apply deltas to the materialized view. In outer join cases, this requires + * the view maintenance graph. + */ +static void +apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores, + TupleDesc tupdesc_old, TupleDesc tupdesc_new, + Query *query) +{ + StringInfoData querybuf; + StringInfoData target_list_buf; + Relation matviewRel; + char *matviewname; + ListCell *lc; + int i; + List *keys = NIL; + + + /* + * get names of the materialized view and delta tables + */ + + matviewRel = table_open(matviewOid, NoLock); + matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel)); + + /* + * Build parts of the maintenance queries + */ + + initStringInfo(&querybuf); + initStringInfo(&target_list_buf); + + /* build string of target list */ + for (i = 0; i < matviewRel->rd_att->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + char *resname = NameStr(attr->attname); + + if (i != 0) + appendStringInfo(&target_list_buf, ", "); + appendStringInfo(&target_list_buf, "%s", quote_qualified_identifier(NULL, resname)); + } + + i = 0; + foreach (lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + + i++; + + if (tle->resjunk) + continue; + + keys = lappend(keys, attr); + } + + /* Start maintaining the materialized view. */ + OpenMatViewIncrementalMaintenance(); + + /* Open SPI context. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* For tuple deletion */ + if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0) + { + EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData)); + int rc; + + /* convert tuplestores to ENR, and register for SPI */ + enr->md.name = pstrdup(OLD_DELTA_ENRNAME); + enr->md.reliddesc = InvalidOid; + enr->md.tupdesc = tupdesc_old; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(old_tuplestores); + enr->reldata = old_tuplestores; + + rc = SPI_register_relation(enr); + if (rc != SPI_OK_REL_REGISTER) + elog(ERROR, "SPI_register failed"); + + apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys); + + } + /* For tuple insertion */ + if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0) + { + EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData)); + int rc; + + /* convert tuplestores to ENR, and register for SPI */ + enr->md.name = pstrdup(NEW_DELTA_ENRNAME); + enr->md.reliddesc = InvalidOid; + enr->md.tupdesc = tupdesc_new;; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(new_tuplestores); + enr->reldata = new_tuplestores; + + rc = SPI_register_relation(enr); + if (rc != SPI_OK_REL_REGISTER) + elog(ERROR, "SPI_register failed"); + + /* apply new delta */ + apply_new_delta(matviewname, NEW_DELTA_ENRNAME, &target_list_buf); + } + + /* We're done maintaining the materialized view. */ + CloseMatViewIncrementalMaintenance(); + + table_close(matviewRel, NoLock); + + /* Close SPI context. */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + +/* + * apply_old_delta + * + * Execute a query for applying a delta table given by deltname_old + * which contains tuples to be deleted from to a materialized view given by + * matviewname. This is used when counting is not required. + */ +static void +apply_old_delta(const char *matviewname, const char *deltaname_old, + List *keys) +{ + StringInfoData querybuf; + StringInfoData keysbuf; + char *match_cond; + ListCell *lc; + + /* build WHERE condition for searching tuples to be deleted */ + match_cond = get_matching_condition_string(keys); + + /* build string of keys list */ + initStringInfo(&keysbuf); + foreach (lc, keys) + { + Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc); + char *resname = NameStr(attr->attname); + appendStringInfo(&keysbuf, "%s", quote_qualified_identifier("mv", resname)); + if (lnext(keys, lc)) + appendStringInfo(&keysbuf, ", "); + } + + /* Search for matching tuples from the view and update or delete if found. */ + initStringInfo(&querybuf); + appendStringInfo(&querybuf, + "DELETE FROM %s WHERE ctid IN (" + "SELECT tid FROM (SELECT pg_catalog.row_number() over (partition by %s) AS \"__ivm_row_number__\"," + "mv.ctid AS tid," + "diff.\"__ivm_count__\"" + "FROM %s AS mv, %s AS diff " + "WHERE %s) v " + "WHERE v.\"__ivm_row_number__\" OPERATOR(pg_catalog.<=) v.\"__ivm_count__\")", + matviewname, + keysbuf.data, + matviewname, deltaname_old, + match_cond); + + if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); +} + +/* + * apply_new_delta + * + * Execute a query for applying a delta table given by deltname_new + * which contains tuples to be inserted into a materialized view given by + * matviewname. This is used when counting is not required. + */ +static void +apply_new_delta(const char *matviewname, const char *deltaname_new, + StringInfo target_list) +{ + StringInfoData querybuf; + + /* Search for matching tuples from the view and update or delete if found. */ + initStringInfo(&querybuf); + appendStringInfo(&querybuf, + "INSERT INTO %s (%s) SELECT %s FROM (" + "SELECT diff.*, pg_catalog.generate_series(1, diff.\"__ivm_count__\")" + " AS __ivm_generate_series__ " + "FROM %s AS diff) AS v", + matviewname, target_list->data, target_list->data, + deltaname_new); + + if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); +} + +/* + * get_matching_condition_string + * + * Build a predicate string for looking for a tuple with given keys. + */ +static char * +get_matching_condition_string(List *keys) +{ + StringInfoData match_cond; + ListCell *lc; + + /* If there is no key columns, the condition is always true. */ + if (keys == NIL) + return "true"; + + initStringInfo(&match_cond); + foreach (lc, keys) + { + Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc); + char *resname = NameStr(attr->attname); + char *mv_resname = quote_qualified_identifier("mv", resname); + char *diff_resname = quote_qualified_identifier("diff", resname); + Oid typid = attr->atttypid; + + /* Considering NULL values, we can not use simple = operator. */ + appendStringInfo(&match_cond, "("); + generate_equal(&match_cond, typid, mv_resname, diff_resname); + appendStringInfo(&match_cond, " OR (%s IS NULL AND %s IS NULL))", + mv_resname, diff_resname); + + if (lnext(keys, lc)) + appendStringInfo(&match_cond, " AND "); + } + + return match_cond.data; +} + +/* + * generate_equals + * + * Generate an equality clause using given operands' default equality + * operator. + */ +static void +generate_equal(StringInfo querybuf, Oid opttype, + const char *leftop, const char *rightop) +{ + TypeCacheEntry *typentry; + + typentry = lookup_type_cache(opttype, TYPECACHE_EQ_OPR); + if (!OidIsValid(typentry->eq_opr)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be_qualified(opttype)))); + + generate_operator_clause(querybuf, + leftop, opttype, + typentry->eq_opr, + rightop, opttype); +} + +/* + * mv_InitHashTables + */ +static void +mv_InitHashTables(void) +{ + HASHCTL ctl; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(MV_TriggerHashEntry); + mv_trigger_info = hash_create("MV trigger info", + MV_INIT_QUERYHASHSIZE, + &ctl, HASH_ELEM | HASH_BLOBS); +} + +/* + * AtAbort_IVM + * + * Clean up hash entries for all materialized views. This is called at + * transaction abort. + */ +void +AtAbort_IVM() +{ + HASH_SEQ_STATUS seq; + MV_TriggerHashEntry *entry; + + if (mv_trigger_info) + { + hash_seq_init(&seq, mv_trigger_info); + while ((entry = hash_seq_search(&seq)) != NULL) + clean_up_IVM_hash_entry(entry, true); + } + in_delta_calculation = false; +} + +/* + * clean_up_IVM_hash_entry + * + * Clean up tuple stores and hash entries for a materialized view after its + * maintenance finished. + */ +static void +clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort) +{ + bool found; + ListCell *lc; + + foreach(lc, entry->tables) + { + MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc); + + list_free(table->old_tuplestores); + list_free(table->new_tuplestores); + if (!is_abort) + { + ExecDropSingleTupleTableSlot(table->slot); + table_close(table->rel, NoLock); + } + } + list_free(entry->tables); + + if (!is_abort) + UnregisterSnapshot(entry->snapshot); + + hash_search(mv_trigger_info, (void *) &entry->matview_id, HASH_REMOVE, &found); +} + +/* + * isIvmName + * + * Check if this is a IVM hidden column from the name. + */ +bool +isIvmName(const char *s) +{ + if (s) + return (strncmp(s, "__ivm_", 6) == 0); + return false; +} + + +Datum +ivm_rule_before(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + bool result = true; + elog(INFO, "trigger ivm_rule_before %d", tableoid); + PG_RETURN_BOOL(result); +} + +Datum +ivm_rule_after(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + bool result = true; + elog(INFO, "trigger ivm_rule_after %d", tableoid); + PG_RETURN_BOOL(result); +} diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 023624dbd22..9007d1544f4 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2272,12 +2272,12 @@ ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo) TriggerDesc *trigdesc; int i; TriggerData LocTriggerData = {0}; - - if (Gp_role == GP_ROLE_EXECUTE) - { - /* Don't fire statement-triggers in executor nodes. */ - return; - } + //FIXME: + // if (Gp_role == GP_ROLE_EXECUTE) + // { + // /* Don't fire statement-triggers in executor nodes. */ + // return; + // } trigdesc = relinfo->ri_TrigDesc; @@ -3552,6 +3552,10 @@ typedef struct AfterTriggerEventList * end of the list, so it is relatively easy to discard them. The event * list chunks themselves are stored in event_cxt. * + * prolonged_tuplestored is a list of transition table tuplestores whose + * life are prolonged to the end of the outmost query instead of each nested + * query. + * * query_depth is the current depth of nested AfterTriggerBeginQuery calls * (-1 when the stack is empty). * @@ -3617,6 +3621,7 @@ typedef struct AfterTriggersData SetConstraintState state; /* the active S C state */ AfterTriggerEventList events; /* deferred-event list */ MemoryContext event_cxt; /* memory context for events, if any */ + List *prolonged_tuplestores; /* list of prolonged tuplestores */ /* per-query-level data: */ AfterTriggersQueryData *query_stack; /* array of structs shown below */ @@ -3652,6 +3657,7 @@ struct AfterTriggersTableData bool closed; /* true when no longer OK to add tuples */ bool before_trig_done; /* did we already queue BS triggers? */ bool after_trig_done; /* did we already queue AS triggers? */ + bool prolonged; /* are transition tables prolonged? */ AfterTriggerEventList after_trig_events; /* if so, saved list pointer */ Tuplestorestate *old_tuplestore; /* "old" transition table, if any */ Tuplestorestate *new_tuplestore; /* "new" transition table, if any */ @@ -3674,6 +3680,7 @@ static AfterTriggersTableData *GetAfterTriggersTableData(Oid relid, static TupleTableSlot *GetAfterTriggersStoreSlot(AfterTriggersTableData *table, TupleDesc tupdesc); static void AfterTriggerFreeQuery(AfterTriggersQueryData *qs); +static void release_or_prolong_tuplestore(Tuplestorestate *ts, bool prolonged); static SetConstraintState SetConstraintStateCreate(int numalloc); static SetConstraintState SetConstraintStateCopy(SetConstraintState state); static SetConstraintState SetConstraintStateAddItem(SetConstraintState state, @@ -4432,6 +4439,45 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events, } +/* + * SetTransitionTablePreserved + * + * Prolong lifespan of transition tables corresponding specified relid and + * command type to the end of the outmost query instead of each nested query. + * This enables to use nested AFTER trigger's transition tables from outer + * query's triggers. Currently, only immediate incremental view maintenance + * uses this. + */ +void +SetTransitionTablePreserved(Oid relid, CmdType cmdType) +{ + AfterTriggersTableData *table; + AfterTriggersQueryData *qs; + bool found = false; + ListCell *lc; + + /* Check state, like AfterTriggerSaveEvent. */ + if (afterTriggers.query_depth < 0) + elog(ERROR, "SetTransitionTablePreserved() called outside of query"); + + qs = &afterTriggers.query_stack[afterTriggers.query_depth]; + + foreach(lc, qs->tables) + { + table = (AfterTriggersTableData *) lfirst(lc); + if (table->relid == relid && table->cmdType == cmdType && + table->closed) + { + table->prolonged = true; + found = true; + } + } + + if (!found) + elog(ERROR,"could not find table with OID %d and command type %d", relid, cmdType); +} + + /* * GetAfterTriggersTableData * @@ -4626,6 +4672,7 @@ AfterTriggerBeginXact(void) */ afterTriggers.firing_counter = (CommandId) 1; /* mustn't be 0 */ afterTriggers.query_depth = -1; + afterTriggers.prolonged_tuplestores = NIL; /* * Verify that there is no leftover state remaining. If these assertions @@ -4786,11 +4833,11 @@ AfterTriggerFreeQuery(AfterTriggersQueryData *qs) ts = table->old_tuplestore; table->old_tuplestore = NULL; if (ts) - tuplestore_end(ts); + release_or_prolong_tuplestore(ts, table->prolonged); ts = table->new_tuplestore; table->new_tuplestore = NULL; if (ts) - tuplestore_end(ts); + release_or_prolong_tuplestore(ts, table->prolonged); if (table->storeslot) ExecDropSingleTupleTableSlot(table->storeslot); } @@ -4802,6 +4849,34 @@ AfterTriggerFreeQuery(AfterTriggersQueryData *qs) */ qs->tables = NIL; list_free_deep(tables); + + /* Release prolonged tuplestores at the end of the outmost query */ + if (afterTriggers.query_depth == 0) + { + foreach(lc, afterTriggers.prolonged_tuplestores) + { + ts = (Tuplestorestate *) lfirst(lc); + if (ts) + tuplestore_end(ts); + } + afterTriggers.prolonged_tuplestores = NIL; + } +} + +/* + * Release the tuplestore, or append it to the prolonged tuplestores list. + */ +static void +release_or_prolong_tuplestore(Tuplestorestate *ts, bool prolonged) +{ + if (prolonged && afterTriggers.query_depth > 0) + { + MemoryContext oldcxt = MemoryContextSwitchTo(CurTransactionContext); + afterTriggers.prolonged_tuplestores = lappend(afterTriggers.prolonged_tuplestores, ts); + MemoryContextSwitchTo(oldcxt); + } + else + tuplestore_end(ts); } @@ -5613,10 +5688,13 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, */ if (afterTriggers.query_depth < 0) elog(ERROR, "AfterTriggerSaveEvent() called outside of query"); - + //FIXME: /* Don't fire statement-triggers in executor nodes. */ - if (!row_trigger && Gp_role == GP_ROLE_EXECUTE) - return; + // if (!row_trigger && Gp_role == GP_ROLE_EXECUTE) + // { + // elog(INFO, "AfterTriggerSaveEvent() Gp_role %d.", Gp_role); + // return; + // } /* Be sure we have enough space to record events at this query depth. */ if (afterTriggers.query_depth >= afterTriggers.maxquerydepth) diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 70cff912d51..0a8ec864028 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -488,6 +488,7 @@ static void check_expressions_in_partition_key(PartitionSpec *spec, core_yyscan_ %type OptTempTableName %type into_clause create_as_target create_mv_target +%type incremental %type createfunc_opt_item common_func_opt_item dostmt_opt_item %type func_arg func_arg_with_default table_func_column aggr_arg @@ -764,7 +765,7 @@ static void check_expressions_in_partition_key(PartitionSpec *spec, core_yyscan_ HANDLER HAVING HEADER_P HOLD HOUR_P IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE - INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P + INCLUDING INCREMENT INCREMENTAL INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION @@ -6694,31 +6695,33 @@ ext_opt_encoding_item: *****************************************************************************/ CreateMatViewStmt: - CREATE OptNoLog MATERIALIZED VIEW create_mv_target AS SelectStmt opt_with_data OptDistributedBy + CREATE OptNoLog incremental MATERIALIZED VIEW create_mv_target AS SelectStmt opt_with_data OptDistributedBy { CreateTableAsStmt *ctas = makeNode(CreateTableAsStmt); - ctas->query = $7; - ctas->into = $5; + ctas->query = $8; + ctas->into = $6; ctas->objtype = OBJECT_MATVIEW; ctas->is_select_into = false; ctas->if_not_exists = false; /* cram additional flags into the IntoClause */ - $5->rel->relpersistence = $2; - $5->skipData = !($8); - ctas->into->distributedBy = $9; + $6->rel->relpersistence = $2; + $6->skipData = !($9); + $6->ivm = $3; + ctas->into->distributedBy = $10; $$ = (Node *) ctas; } - | CREATE OptNoLog MATERIALIZED VIEW IF_P NOT EXISTS create_mv_target AS SelectStmt opt_with_data + | CREATE OptNoLog incremental MATERIALIZED VIEW IF_P NOT EXISTS create_mv_target AS SelectStmt opt_with_data { CreateTableAsStmt *ctas = makeNode(CreateTableAsStmt); - ctas->query = $10; - ctas->into = $8; + ctas->query = $11; + ctas->into = $9; ctas->objtype = OBJECT_MATVIEW; ctas->is_select_into = false; ctas->if_not_exists = true; /* cram additional flags into the IntoClause */ - $8->rel->relpersistence = $2; - $8->skipData = !($11); + $9->rel->relpersistence = $2; + $9->skipData = !($12); + $9->ivm = $3; $$ = (Node *) ctas; } ; @@ -6735,11 +6738,16 @@ create_mv_target: $$->tableSpaceName = $5; $$->viewQuery = NULL; /* filled at analysis time */ $$->skipData = false; /* might get changed later */ + $$->ivm = false; $$->accessMethod = greenplumLegacyAOoptions($$->accessMethod, &$$->options); } ; +incremental: INCREMENTAL { $$ = true; } + | /*EMPTY*/ { $$ = false; } + ; + OptNoLog: UNLOGGED { $$ = RELPERSISTENCE_UNLOGGED; } | /*EMPTY*/ { $$ = RELPERSISTENCE_PERMANENT; } ; @@ -18583,6 +18591,7 @@ unreserved_keyword: | INCLUDING | INCLUSIVE | INCREMENT + | INCREMENTAL | INDEX | INDEXES | INHERIT @@ -19511,6 +19520,7 @@ bare_label_keyword: | INCLUDING | INCLUSIVE | INCREMENT + | INCREMENTAL | INDEX | INDEXES | INHERIT diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 1d63212197a..f5c96d20ac2 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -2044,6 +2044,30 @@ is_agg_partial_capable(Oid aggid) return result; } +/* + * get_rel_relisivm + * + * Returns the relisivm flag associated with a given relation. + */ +bool +get_rel_relisivm(Oid relid) +{ + HeapTuple tp; + + tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + if (HeapTupleIsValid(tp)) + { + Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp); + bool result; + + result = reltup->relisivm; + ReleaseSysCache(tp); + return result; + } + else + return false; +} + /* * get_rel_tablespace * diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index f8dd5c30368..deef5d3887d 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -1966,6 +1966,8 @@ formrdesc(const char *relationName, Oid relationReltype, /* ... and they're always populated, too */ relation->rd_rel->relispopulated = true; + /* ... and they're always no ivm, too */ + relation->rd_rel->relisivm = false; relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING; relation->rd_rel->relpages = 0; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ef2a276c048..be9dd73c109 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -7386,6 +7386,7 @@ getTables(Archive *fout, int *numTables) int i_ispartition; int i_partbound; int i_amname; + int i_isivm; /* * Find all the tables and table-like objects. @@ -7504,7 +7505,8 @@ getTables(Archive *fout, int *numTables) "AS changed_acl, " "%s AS partkeydef, " "%s AS ispartition, " - "%s AS partbound " + "%s AS partbound, " + "c.relisivm AS isivm " "FROM pg_class c " "LEFT JOIN pg_depend d ON " "(c.relkind = '%c' AND " @@ -8073,6 +8075,7 @@ getTables(Archive *fout, int *numTables) i_ispartition = PQfnumber(res, "ispartition"); i_partbound = PQfnumber(res, "partbound"); i_amname = PQfnumber(res, "amname"); + i_isivm = PQfnumber(res, "isivm"); if (dopt->lockWaitTimeout) { @@ -8199,6 +8202,7 @@ getTables(Archive *fout, int *numTables) tblinfo[i].partkeydef = pg_strdup(PQgetvalue(res, i, i_partkeydef)); tblinfo[i].ispartition = (strcmp(PQgetvalue(res, i, i_ispartition), "t") == 0); tblinfo[i].partbound = pg_strdup(PQgetvalue(res, i, i_partbound)); + tblinfo[i].isivm = (strcmp(PQgetvalue(res, i, i_isivm), "t") == 0); /* foreign server */ tblinfo[i].foreign_server = atooid(PQgetvalue(res, i, i_foreignserver)); @@ -18115,9 +18119,11 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo) } } - appendPQExpBuffer(q, "CREATE %s%s %s", + appendPQExpBuffer(q, "CREATE %s%s%s %s", tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED ? "UNLOGGED " : "", + tbinfo->relkind == RELKIND_MATVIEW && tbinfo->isivm ? + "INCREMENTAL " : "", reltypename, qualrelname); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 009762223ac..c56c3439073 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -396,6 +396,7 @@ typedef struct _tableInfo struct _tableDataInfo *dataObj; /* TableDataInfo, if dumping its data */ int numTriggers; /* number of triggers for table */ struct _triggerInfo *triggers; /* array of TriggerInfo structs */ + bool isivm; /* is incrementally maintainable materialized view? */ } TableInfo; typedef struct _tableAttachInfo diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 03d076806d6..a06a994ed3e 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2171,6 +2171,21 @@ { exclude_dump_test_schema => 1, no_toast_compression => 1, }, }, + 'CREATE MATERIALIZED VIEW matview_ivm' => { + create_order => 21, + create_sql => 'CREATE INCREMENTAL MATERIALIZED VIEW dump_test.matview_ivm (col1) AS + SELECT col1 FROM dump_test.test_table;', + regexp => qr/^ + \QCREATE INCREMENTAL MATERIALIZED VIEW dump_test.matview_ivm AS\E + \n\s+\QSELECT test_table.col1\E + \n\s+\QFROM dump_test.test_table\E + \n\s+\QWITH NO DATA;\E + /xm, + like => + { %full_runs, %dump_test_schema_runs, section_pre_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + 'CREATE POLICY p1 ON test_table' => { create_order => 22, create_sql => 'CREATE POLICY p1 ON dump_test.test_table diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index f66e4535b8e..c1c23315ee9 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -1922,6 +1922,7 @@ describeOneTableDetails(const char *schemaname, char relpersistence; char relreplident; char *relam; + bool isivm; char *compressionType; char *compressionLevel; @@ -1953,7 +1954,8 @@ describeOneTableDetails(const char *schemaname, "c.relhastriggers, c.relrowsecurity, c.relforcerowsecurity, " "false AS relhasoids, c.relispartition, %s, c.reltablespace, " "CASE WHEN c.reloftype = 0 THEN '' ELSE c.reloftype::pg_catalog.regtype::pg_catalog.text END, " - "c.relpersistence, c.relreplident, am.amname\n" + "c.relpersistence, c.relreplident, am.amname, " + "c.relisivm\n" "FROM pg_catalog.pg_class c\n " "LEFT JOIN pg_catalog.pg_class tc ON (c.reltoastrelid = tc.oid)\n" "LEFT JOIN pg_catalog.pg_am am ON (c.relam = am.oid)\n" @@ -2148,6 +2150,7 @@ describeOneTableDetails(const char *schemaname, (char *) NULL : pg_strdup(PQgetvalue(res, 0, 14)); else tableinfo.relam = NULL; + tableinfo.isivm = strcmp(PQgetvalue(res, 0, 15), "t") == 0; /* GPDB Only: relstorage */ if (pset.sversion < 120000 && isGPDB()) @@ -4094,6 +4097,12 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, _("Access method: %s"), tableinfo.relam); printTableAddFooter(&cont, buf.data); } + + /* Incremental view maintance info */ + if (verbose && tableinfo.relkind == RELKIND_MATVIEW && tableinfo.isivm) + { + printTableAddFooter(&cont, _("Incremental view maintenance: yes")); + } } /* reloptions, if verbose */ diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 5cae4f6a677..2ba21b7eba9 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1101,6 +1101,7 @@ static const pgsql_thing_t words_after_create[] = { {"FOREIGN TABLE", NULL, NULL, NULL}, {"FUNCTION", NULL, NULL, Query_for_list_of_functions}, {"GROUP", Query_for_list_of_roles}, + {"INCREMENTAL MATERIALIZED VIEW", NULL, NULL, &Query_for_list_of_matviews, THING_NO_DROP | THING_NO_ALTER}, {"INDEX", NULL, NULL, &Query_for_list_of_indexes}, {"LANGUAGE", Query_for_list_of_languages}, {"LARGE OBJECT", NULL, NULL, NULL, THING_NO_CREATE | THING_NO_DROP}, @@ -2756,7 +2757,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("SEQUENCE", "TABLE", "VIEW"); /* Complete "CREATE UNLOGGED" with TABLE or MATVIEW */ else if (TailMatches("CREATE", "UNLOGGED")) - COMPLETE_WITH("TABLE", "MATERIALIZED VIEW"); + COMPLETE_WITH("TABLE", "MATERIALIZED VIEW", "INCREMENTAL MATERIALIZED VIEW"); /* Complete PARTITION BY with RANGE ( or LIST ( or ... */ else if (TailMatches("PARTITION", "BY")) COMPLETE_WITH("RANGE (", "LIST (", "HASH ("); @@ -3077,13 +3078,16 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("SELECT"); /* CREATE MATERIALIZED VIEW */ - else if (Matches("CREATE", "MATERIALIZED")) + else if (Matches("CREATE", "MATERIALIZED") || + Matches("CREATE", "INCREMENTAL", "MATERIALIZED")) COMPLETE_WITH("VIEW"); - /* Complete CREATE MATERIALIZED VIEW with AS */ - else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny)) + /* Complete CREATE MATERIALIZED VIEW with AS */ + else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny) || + Matches("CREATE", "INCREMENTAL", "MATERIALIZED", "VIEW", MatchAny)) COMPLETE_WITH("AS"); /* Complete "CREATE MATERIALIZED VIEW AS with "SELECT" */ - else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny, "AS")) + else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny, "AS") || + Matches("CREATE", "INCREMENTAL", "MATERIALIZED", "VIEW", MatchAny, "AS")) COMPLETE_WITH("SELECT"); /* CREATE EVENT TRIGGER */ diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h index 02e56aed583..813d47fee83 100644 --- a/src/include/catalog/pg_class.h +++ b/src/include/catalog/pg_class.h @@ -119,6 +119,9 @@ CATALOG(pg_class,1259,RelationRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83,Relat /* is relation a partition? */ bool relispartition BKI_DEFAULT(f); + /* is relation a matview with ivm? */ + bool relisivm BKI_DEFAULT(f); + /* link to original rel during table rewrite; otherwise 0 */ Oid relrewrite BKI_DEFAULT(0) BKI_LOOKUP_OPT(pg_class); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d0833282b22..a972261b275 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12475,5 +12475,21 @@ { oid => 7077, descr => 'Update gp_segment_configuration mode and status by dbid', proname => 'gp_update_segment_configuration_mode_status', proisstrict => 'f', provolatile => 'v', proparallel => 'r', prorettype => 'int2', proargtypes => 'int4 char char', prosrc => 'gp_update_segment_configuration_mode_status'}, +# IVM +{ oid => '786', descr => 'ivm trigger (before)', + proname => 'ivm_immediate_before', provolatile => 'v', prorettype => 'trigger', + proargtypes => '', prosrc => 'ivm_immediate_before' }, +{ oid => '787', descr => 'ivm trigger (after)', + proname => 'ivm_immediate_maintenance', provolatile => 'v', prorettype => 'trigger', + proargtypes => '', prosrc => 'ivm_immediate_maintenance' }, +{ oid => '788', descr => 'ivm filetring ', + proname => 'ivm_visible_in_prestate', provolatile => 's', prorettype => 'bool', + proargtypes => 'oid tid oid', prosrc => 'ivm_visible_in_prestate' }, +{ oid => '789', descr => 'ivm rule before ', + proname => 'ivm_rule_before', provolatile => 'v', prorettype => 'bool', + proargtypes => 'oid', prosrc => 'ivm_rule_before' }, +{ oid => '7201', descr => 'ivm rule after ', + proname => 'ivm_rule_after', provolatile => 'v', prorettype => 'bool', + proargtypes => 'oid', prosrc => 'ivm_rule_after' }, ] diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index e346df3c636..ad4b2abf6ca 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -16,6 +16,7 @@ #include "catalog/objectaddress.h" #include "nodes/params.h" +#include "nodes/pathnodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" #include "utils/queryenvironment.h" @@ -25,6 +26,9 @@ extern ObjectAddress ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *st ParamListInfo params, QueryEnvironment *queryEnv, QueryCompletion *qc); +extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid); +extern void CreateIndexOnIMMV(Query *query, Relation matviewRel); + extern int GetIntoRelEFlags(IntoClause *intoClause); extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause); diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h index 767232a6259..ed3ad127681 100644 --- a/src/include/commands/matview.h +++ b/src/include/commands/matview.h @@ -24,6 +24,8 @@ extern void SetMatViewPopulatedState(Relation relation, bool newstate); +extern void SetMatViewIVMState(Relation relation, bool newstate); + extern ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, QueryCompletion *qc); @@ -34,4 +36,12 @@ extern bool MatViewIncrementalMaintenanceIsEnabled(void); extern void transientrel_init(QueryDesc *queryDesc); +extern Datum ivm_immediate_before(PG_FUNCTION_ARGS); +extern Datum ivm_immediate_maintenance(PG_FUNCTION_ARGS); +extern Datum ivm_visible_in_prestate(PG_FUNCTION_ARGS); +extern Datum ivm_rule_before(PG_FUNCTION_ARGS); +extern Datum ivm_rule_after(PG_FUNCTION_ARGS); +extern void AtAbort_IVM(void); +extern bool isIvmName(const char *s); + #endif /* MATVIEW_H */ diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index fc826b80006..5d9aa063c62 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -254,6 +254,8 @@ extern void AfterTriggerEndSubXact(bool isCommit); extern void AfterTriggerSetState(ConstraintsSetStmt *stmt); extern bool AfterTriggerPendingOnRel(Oid relid); +extern void SetTransitionTablePreserved(Oid relid, CmdType cmdType); + /* * in utils/adt/ri_triggers.c diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 57fb269564e..00b5ab952ce 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -122,6 +122,7 @@ typedef struct IntoClause Node *viewQuery; /* materialized view's SELECT query */ bool skipData; /* true for WITH NO DATA */ Node *distributedBy; /* GPDB: columns to distribubte the data on. */ + bool ivm; /* true for WITH IVM */ } IntoClause; typedef struct CopyIntoClause diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index fc275952c71..363fa5e70ba 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -229,6 +229,7 @@ PG_KEYWORD("include", INCLUDE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("including", INCLUDING, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("inclusive", INCLUSIVE, UNRESERVED_KEYWORD, BARE_LABEL) /* GPDB */ PG_KEYWORD("increment", INCREMENT, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("incremental", INCREMENTAL, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("index", INDEX, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("indexes", INDEXES, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("inherit", INHERIT, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 6369d4d36fd..aa2d4014de6 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -171,6 +171,7 @@ extern Oid get_rel_namespace(Oid relid); extern Oid get_rel_type_id(Oid relid); extern char get_rel_relkind(Oid relid); extern bool get_rel_relispartition(Oid relid); +extern bool get_rel_relisivm(Oid relid); extern Oid get_rel_tablespace(Oid relid); extern char get_rel_persistence(Oid relid); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 6c3757dab00..eb1dc64726e 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -728,6 +728,8 @@ typedef struct ViewOptions */ #define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated) +#define RelationIsIVM(relation) ((relation)->rd_rel->relisivm) + /* * RelationIsAccessibleInLogicalDecoding * True if we need to log enough information to have access via