Re: creating CHECK constraints as NOT VALID
| From | Alvaro Herrera |
|---|---|
| Subject | Re: creating CHECK constraints as NOT VALID |
| Date | |
| Msg-id | 1308085945-sup-6351@alvh.no-ip.org |
| In reply to | Re: creating CHECK constraints as NOT VALID (Alvaro Herrera <alvherre@commandprompt.com>) |
| Responses | Re: creating CHECK constraints as NOT VALID; Re: creating CHECK constraints as NOT VALID |
| List | pgsql-hackers |
Excerpts from Alvaro Herrera's message of Mon Jun 13 18:08:12 -0400 2011:
> Excerpts from Dean Rasheed's message of Sat Jun 11 09:32:15 -0400 2011:
> > I think that you also need to update the constraint exclusion code
> > (get_relation_constraints() or nearby), otherwise the planner might
> > exclude a relation on the basis of a CHECK constraint that is not
> > currently VALID.
>
> Ouch, yeah, thanks for pointing that out. Fortunately the patch to fix
> this is quite simple. I don't have it handy right now but I'll post it
> soon.
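
To make the concern above concrete, here is a minimal standalone sketch of the filtering the planner needs. It is not PostgreSQL code: `ConstrCheckSketch` and `collect_valid_checks` are hypothetical names invented for illustration, and only the `ccvalid` flag mirrors a field the patch adds. The idea is that a constraint which has not been validated proves nothing about rows already in the table, so it must not be used to exclude a relation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a relation's cached CHECK constraint. */
typedef struct ConstrCheckSketch
{
    const char *ccname;   /* constraint name */
    const char *ccexpr;   /* constraint expression, as text */
    bool        ccvalid;  /* has every existing row been verified? */
} ConstrCheckSketch;

/*
 * Copy into "out" pointers to only those constraints that are marked
 * valid, preserving their order.  Returns the number of entries
 * written; "out" must have room for n pointers.
 */
static size_t
collect_valid_checks(const ConstrCheckSketch *checks, size_t n,
                     const ConstrCheckSketch **out)
{
    size_t nout = 0;
    size_t i;

    assert(checks != NULL || n == 0);

    for (i = 0; i < n; i++)
    {
        /* A NOT VALID constraint says nothing about existing rows. */
        if (!checks[i].ccvalid)
            continue;
        out[nout++] = &checks[i];
    }
    return nout;
}
```

Called on a list in which one constraint was added as NOT VALID, the function returns only the others, which are the ones that would be safe to hand to constraint exclusion.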
Here's the complete patch.
*** a/doc/src/sgml/catalogs.sgml
--- b/doc/src/sgml/catalogs.sgml
***************
*** 1898,1904 **** <entry><structfield>convalidated</structfield></entry> <entry><type>bool</type></entry>
<entry></entry>
! <entry>Has the constraint been validated? Can only be false for foreign keys</entry> </row> <row>
--- 1898,1904 ---- <entry><structfield>convalidated</structfield></entry> <entry><type>bool</type></entry>
<entry></entry>
! <entry>Has the constraint been validated? Can only be false for foreign keys and CHECK constraints</entry>
</row> <row>
*** a/doc/src/sgml/ref/alter_domain.sgml
--- b/doc/src/sgml/ref/alter_domain.sgml
***************
*** 28,37 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable
class="PARAMETER">name</replaceable> { SET | DROP } NOT NULL ALTER DOMAIN <replaceable
class="PARAMETER">name</replaceable>
! ADD <replaceable class="PARAMETER">domain_constraint</replaceable> ALTER DOMAIN <replaceable
class="PARAMETER">name</replaceable> DROP CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> [
RESTRICT | CASCADE ] ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> OWNER TO <replaceable
class="PARAMETER">new_owner</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> SET SCHEMA
<replaceable class="PARAMETER">new_schema</replaceable>
--- 28,39 ---- ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> { SET | DROP } NOT NULL ALTER DOMAIN
<replaceable class="PARAMETER">name</replaceable>
! ADD <replaceable class="PARAMETER">domain_constraint</replaceable> [ NOT VALID ] ALTER DOMAIN <replaceable
class="PARAMETER">name</replaceable> DROP CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> [
RESTRICT | CASCADE ] ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
+ VALIDATE CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable>
+ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> OWNER TO <replaceable
class="PARAMETER">new_owner</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> SET SCHEMA
<replaceable class="PARAMETER">new_schema</replaceable>
***************
*** 70,82 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> </varlistentry> <varlistentry>
! <term>ADD <replaceable class="PARAMETER">domain_constraint</replaceable></term> <listitem> <para>
This form adds a new constraint to a domain using the same syntax as <xref linkend="SQL-CREATEDOMAIN">.
! This will only succeed if all columns using the domain satisfy the
! new constraint. </para> </listitem> </varlistentry>
--- 72,88 ---- </varlistentry> <varlistentry>
! <term>ADD <replaceable class="PARAMETER">domain_constraint</replaceable> [ NOT VALID ]</term> <listitem>
<para> This form adds a new constraint to a domain using the same syntax as <xref
linkend="SQL-CREATEDOMAIN">.
! If NOT VALID is not specified,
! the command will only succeed if all columns using the
! domain satisfy the new constraint.
! The constraint is going to be enforced on new data inserted into columns
! using the domain in all cases, regardless of <literal>NOT VALID</>.
! <literal>NOT VALID</> is only accepted for <literal>CHECK</> constraints. </para> </listitem>
</varlistentry>
***************
*** 91,96 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
--- 97,113 ---- </varlistentry> <varlistentry>
+ <term>VALIDATE CONSTRAINT</term>
+ <listitem>
+ <para>
+ This form validates a previously added constraint, that is, it verifies that
+ all data in columns using the domain satisfy the specified constraint.
+ </para>
+ </listitem>
+ </varlistentry>
+
+
+ <varlistentry> <term>OWNER</term> <listitem> <para>
***************
*** 156,161 **** ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
--- 173,188 ---- </varlistentry> <varlistentry>
+ <term><replaceable class="PARAMETER">NOT VALID</replaceable></term>
+ <listitem>
+ <para>
+ Do not verify existing column data for constraint validity.
+ </para>
+ </listitem>
+ </varlistentry>
+
+
+ <varlistentry> <term><literal>CASCADE</literal></term> <listitem> <para>
***************
*** 251,257 **** ALTER DOMAIN zipcode SET SCHEMA customers; <para> <command>ALTER DOMAIN</command> conforms to the
<acronym>SQL</acronym> standard,
! except for the <literal>OWNER</> and <literal>SET SCHEMA</> variants, which are
<productname>PostgreSQL</productname> extensions. </para> </refsect1>
--- 278,286 ---- <para> <command>ALTER DOMAIN</command> conforms to the <acronym>SQL</acronym> standard,
! except for the <literal>OWNER</>, <literal>SET SCHEMA</> and
! <literal>VALIDATE CONSTRAINT</> variants,
! as well as the <literal>NOT VALID</> clause of the <literal>ADD CONSTRAINT</> variant, which are
<productname>PostgreSQL</productname> extensions. </para> </refsect1>
*** a/doc/src/sgml/ref/alter_table.sgml
--- b/doc/src/sgml/ref/alter_table.sgml
***************
*** 240,246 **** ALTER TABLE <replaceable class="PARAMETER">name</replaceable> <listitem> <para> This
form adds a new constraint to a table using the same syntax as
! <xref linkend="SQL-CREATETABLE">. Newly added foreign key constraints can also be defined as <literal>NOT
VALID</literal> to avoid the potentially lengthy initial check that must otherwise be performed. Constraint
checks are skipped at create table time, so
--- 240,246 ---- <listitem> <para> This form adds a new constraint to a table using the same syntax as
! <xref linkend="SQL-CREATETABLE">. Newly added foreign key and CHECK constraints can also be defined as
<literal>NOT VALID</literal> to avoid the potentially lengthy initial check that must otherwise be performed.
Constraint checks are skipped at create table time, so
***************
*** 253,259 **** ALTER TABLE <replaceable class="PARAMETER">name</replaceable> <term><literal>VALIDATE
CONSTRAINT</literal></term> <listitem> <para>
! This form validates a foreign key constraint that was previously created as <literal>NOT VALID</literal>.
Constraints already marked valid do not cause an error response. </para>
--- 253,259 ---- <term><literal>VALIDATE CONSTRAINT</literal></term> <listitem> <para>
! This form validates a foreign key or CHECK constraint that was previously created as <literal>NOT
VALID</literal>. Constraints already marked valid do not cause an error response. </para>
*** a/src/backend/access/common/tupdesc.c
--- b/src/backend/access/common/tupdesc.c
***************
*** 200,205 **** CreateTupleDescCopyConstr(TupleDesc tupdesc)
--- 200,206 ---- cpy->check[i].ccname = pstrdup(constr->check[i].ccname); if
(constr->check[i].ccbin) cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
+ cpy->check[i].ccvalid = constr->check[i].ccvalid; } }
*** a/src/backend/catalog/heap.c
--- b/src/backend/catalog/heap.c
***************
*** 98,104 **** static Oid AddNewRelationType(const char *typeName, Oid new_array_type); static void
RelationRemoveInheritance(Oid relid); static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
! bool is_local, int inhcount); static void StoreConstraints(Relation rel, List *cooked_constraints);
static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, bool
allow_merge, bool is_local);
--- 98,104 ---- Oid new_array_type); static void RelationRemoveInheritance(Oid relid); static void
StoreRelCheck(Relation rel, char *ccname, Node *expr,
! bool is_validated, bool is_local, int inhcount); static void StoreConstraints(Relation rel, List
*cooked_constraints); static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
bool allow_merge, bool is_local);
***************
*** 1845,1851 **** StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr) */ static void StoreRelCheck(Relation
rel, char *ccname, Node *expr,
! bool is_local, int inhcount) { char *ccbin; char *ccsrc;
--- 1845,1851 ---- */ static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
! bool is_validated, bool is_local, int inhcount) { char *ccbin; char *ccsrc;
***************
*** 1906,1912 **** StoreRelCheck(Relation rel, char *ccname, Node *expr, CONSTRAINT_CHECK,
/* Constraint Type */ false, /* Is Deferrable */ false, /*
Is Deferred */
! true, /* Is Validated */ RelationGetRelid(rel), /*
relation */ attNos, /* attrs in the constraint */ keycount,
/* # attrs in the constraint */
--- 1906,1912 ---- CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */ false, /* Is Deferred */
! is_validated, RelationGetRelid(rel), /* relation */
attNos, /* attrs in the constraint */ keycount, /* # attrs in
the constraint */
***************
*** 1966,1972 **** StoreConstraints(Relation rel, List *cooked_constraints) StoreAttrDefault(rel,
con->attnum, con->expr); break; case CONSTR_CHECK:
! StoreRelCheck(rel, con->name, con->expr, con->is_local, con->inhcount);
numchecks++; break;
--- 1966,1972 ---- StoreAttrDefault(rel, con->attnum, con->expr); break;
case CONSTR_CHECK:
! StoreRelCheck(rel, con->name, con->expr, !con->skip_validation,
con->is_local, con->inhcount); numchecks++; break;
***************
*** 2080,2085 **** AddRelationNewConstraints(Relation rel,
--- 2080,2086 ---- cooked->name = NULL; cooked->attnum = colDef->attnum; cooked->expr = expr;
+ cooked->skip_validation = false; cooked->is_local = is_local; cooked->inhcount = is_local ? 0
: 1; cookedConstraints = lappend(cookedConstraints, cooked);
***************
*** 2193,2199 **** AddRelationNewConstraints(Relation rel, /* * OK, store it. */
! StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1); numchecks++;
--- 2194,2201 ---- /* * OK, store it. */
! StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local,
! is_local ? 0 : 1); numchecks++;
***************
*** 2202,2207 **** AddRelationNewConstraints(Relation rel,
--- 2204,2210 ---- cooked->name = ccname; cooked->attnum = 0; cooked->expr = expr;
+ cooked->skip_validation = cdef->skip_validation; cooked->is_local = is_local;
cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked);
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 258,264 **** static void AlterIndexNamespaces(Relation classRel, Relation rel, static void
AlterSeqNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode);
! static void ATExecValidateConstraint(Relation rel, const char *constrName); static int transformColumnNameList(Oid
relId, List *colList, int16 *attnums, Oid *atttypids); static int
transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
--- 258,265 ---- static void AlterSeqNamespaces(Relation classRel, Relation rel, Oid oldNspOid, Oid
newNspOid, const char *newNspName, LOCKMODE lockmode);
! static void ATExecValidateConstraint(Relation rel, const char *constrName,
! bool recurse, bool recursing, LOCKMODE lockmode); static int transformColumnNameList(Oid
relId, List *colList, int16 *attnums, Oid *atttypids); static int
transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
***************
*** 269,274 **** static Oid transformFkeyCheckAttrs(Relation pkrel,
--- 270,277 ---- int numattrs, int16 *attnums, Oid *opclasses); static
void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
+ static void validateCheckConstraint(char *conname, Relation rel,
+ HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid);
***************
*** 560,565 **** DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
--- 563,569 ---- cooked->name = NULL; cooked->attnum = attnum; cooked->expr =
colDef->cooked_default;
+ cooked->skip_validation = false; cooked->is_local = true; /* not used for defaults */
cooked->inhcount = 0; /* ditto */ cookedDefaults = lappend(cookedDefaults, cooked);
***************
*** 1567,1572 **** MergeAttributes(List *schema, List *supers, char relpersistence,
--- 1571,1577 ---- cooked->name = pstrdup(name); cooked->attnum = 0; /* not
used for constraints */ cooked->expr = expr;
+ cooked->skip_validation = false; cooked->is_local = false;
cooked->inhcount = 1; constraints = lappend(constraints, cooked);
***************
*** 2932,2938 **** ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepAddInherit(rel);
pass = AT_PASS_MISC; break;
! case AT_ValidateConstraint: case AT_EnableTrig: /* ENABLE TRIGGER variants */ case
AT_EnableAlwaysTrig: case AT_EnableReplicaTrig:
--- 2937,2950 ---- ATPrepAddInherit(rel); pass = AT_PASS_MISC; break;
! case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
! ATSimplePermissions(rel, ATT_TABLE);
! /* Recursion occurs during execution phase */
! /* No command-specific prep needed except saving recurse flag */
! if (recurse)
! cmd->subtype = AT_ValidateConstraintRecurse;
! pass = AT_PASS_MISC;
! break; case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
***************
*** 3097,3104 **** ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_AddIndexConstraint:
/* ADD CONSTRAINT USING INDEX */ ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break;
! case AT_ValidateConstraint:
! ATExecValidateConstraint(rel, cmd->name); break; case AT_DropConstraint: /* DROP
CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior,
--- 3109,3120 ---- case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); break;
! case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
! ATExecValidateConstraint(rel, cmd->name, false, false, lockmode);
! break;
! case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
! * recursion */
! ATExecValidateConstraint(rel, cmd->name, true, false, lockmode); break; case
AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior,
***************
*** 5382,5400 **** ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
list_make1(copyObject(constr)), recursing, !recursing);
! /* Add each constraint to Phase 3's queue */ foreach(lcon, newcons) { CookedConstraint *ccon =
(CookedConstraint *) lfirst(lcon);
- NewConstraint *newcon;
! newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
! newcon->name = ccon->name;
! newcon->contype = ccon->contype;
! /* ExecQual wants implicit-AND format */
! newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
! tab->constraints = lappend(tab->constraints, newcon); /* Save the actually assigned name if it was
defaulted */ if (constr->conname == NULL)
--- 5398,5420 ---- list_make1(copyObject(constr)),
recursing, !recursing);
! /* Add each to-be-validated constraint to Phase 3's queue */ foreach(lcon, newcons) {
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
! if (!ccon->skip_validation)
! {
! NewConstraint *newcon;
! newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
! newcon->name = ccon->name;
! newcon->contype = ccon->contype;
! /* ExecQual wants implicit-AND format */
! newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
!
! tab->constraints = lappend(tab->constraints, newcon);
! } /* Save the actually assigned name if it was defaulted */ if (constr->conname == NULL)
***************
*** 5753,5761 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * ALTER TABLE VALIDATE
CONSTRAINT */ static void
! ATExecValidateConstraint(Relation rel, const char *constrName) { Relation conrel; SysScanDesc scan;
--- 5773,5787 ---- /* * ALTER TABLE VALIDATE CONSTRAINT
+ *
+ * XXX The reason we handle recursion here rather than at Phase 1 is because
+ * there's no good way to skip recursing when handling foreign keys: there is
+ * no need to lock children in that case, yet we wouldn't be able to avoid
+ * doing so at that level. */ static void
! ATExecValidateConstraint(Relation rel, const char *constrName, bool recurse,
! bool recursing, LOCKMODE lockmode) { Relation conrel; SysScanDesc scan;
***************
*** 5779,5786 **** ATExecValidateConstraint(Relation rel, const char *constrName) while (HeapTupleIsValid(tuple =
systable_getnext(scan))) { con = (Form_pg_constraint) GETSTRUCT(tuple);
! if (con->contype == CONSTRAINT_FOREIGN &&
! strcmp(NameStr(con->conname), constrName) == 0) { found = true; break;
--- 5805,5811 ---- while (HeapTupleIsValid(tuple = systable_getnext(scan))) { con =
(Form_pg_constraint) GETSTRUCT(tuple);
! if (strcmp(NameStr(con->conname), constrName) == 0) { found = true; break;
***************
*** 5790,5828 **** ATExecValidateConstraint(Relation rel, const char *constrName) if (!found)
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
! errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist",
constrName, RelationGetRelationName(rel)))); if (!con->convalidated) {
! Oid conid = HeapTupleGetOid(tuple);
! HeapTuple copyTuple = heap_copytuple(tuple);
! Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
! Relation refrel;
! /*
! * Triggers are already in place on both tables, so a concurrent write
! * that alters the result here is not possible. Normally we can run a
! * query here to do the validation, which would only require
! * AccessShareLock. In some cases, it is possible that we might need
! * to fire triggers to perform the check, so we take a lock at
! * RowShareLock level just in case.
! */
! refrel = heap_open(con->confrelid, RowShareLock);
! validateForeignKeyConstraint((char *) constrName, rel, refrel,
! con->conindid,
! conid); /* * Now update the catalog, while we have the door
open. */ copy_con->convalidated = true; simple_heap_update(conrel, &copyTuple->t_self,
copyTuple); CatalogUpdateIndexes(conrel, copyTuple); heap_freetuple(copyTuple);
-
- heap_close(refrel, NoLock); } systable_endscan(scan);
--- 5815,5918 ---- if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
! errmsg("constraint \"%s\" of relation \"%s\" does not exist",
! constrName, RelationGetRelationName(rel))));
!
! if (con->contype != CONSTRAINT_FOREIGN &&
! con->contype != CONSTRAINT_CHECK)
! ereport(ERROR,
! (errcode(ERRCODE_WRONG_OBJECT_TYPE),
! errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
constrName, RelationGetRelationName(rel)))); if (!con->convalidated) {
! HeapTuple copyTuple;
! Form_pg_constraint copy_con;
! if (con->contype == CONSTRAINT_FOREIGN)
! {
! Oid conid = HeapTupleGetOid(tuple);
! Relation refrel;
! /*
! * Triggers are already in place on both tables, so a concurrent write
! * that alters the result here is not possible. Normally we can run a
! * query here to do the validation, which would only require
! * AccessShareLock. In some cases, it is possible that we might need
! * to fire triggers to perform the check, so we take a lock at
! * RowShareLock level just in case.
! */
! refrel = heap_open(con->confrelid, RowShareLock);
!
! validateForeignKeyConstraint((char *) constrName, rel, refrel,
! con->conindid,
! conid);
! heap_close(refrel, NoLock);
!
! /*
! * Foreign keys do not inherit, so we purposely ignore the
! * recursion bit here
! */
! }
! else if (con->contype == CONSTRAINT_CHECK)
! {
! List *children = NIL;
! ListCell *child;
!
! /*
! * If we're recursing, the parent has already done this, so skip
! * it.
! */
! if (!recursing)
! children = find_all_inheritors(RelationGetRelid(rel),
! lockmode, NULL);
!
! /*
! * For CHECK constraints, we must ensure that we only mark the
! * constraint as validated on the parent if it's already validated
! * on the children.
! *
! * We recurse before validating on the parent, to reduce risk of
! * deadlocks.
! */
! foreach(child, children)
! {
! Oid childoid = lfirst_oid(child);
! Relation childrel;
!
! if (childoid == RelationGetRelid(rel))
! continue;
!
! /*
! * If we are told not to recurse, there had better not be any
! * child tables; else the addition would put them out of step.
! */
! if (!recurse)
! ereport(ERROR,
! (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
! errmsg("constraint must be validated on child tables too")));
!
! /* find_all_inheritors already got lock */
! childrel = heap_open(childoid, NoLock);
!
! ATExecValidateConstraint(childrel, constrName, false,
! true, lockmode);
! heap_close(childrel, NoLock);
! }
!
! validateCheckConstraint((char *) constrName, rel, tuple);
! } /* * Now update the catalog, while we have the door open. */
+ copyTuple = heap_copytuple(tuple);
+ copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple); CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple); } systable_endscan(scan);
***************
*** 6128,6133 **** checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
--- 6218,6292 ---- } /*
+ * Scan the existing rows in a table to verify they meet a proposed
+ * CHECK constraint.
+ *
+ * The caller must have opened and locked the relation appropriately.
+ */
+ static void
+ validateCheckConstraint(char *conname, Relation rel, HeapTuple constrtup)
+ {
+ EState *estate;
+ Datum val;
+ char *conbin;
+ Expr *origexpr;
+ List *exprstate;
+ TupleDesc tupdesc;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ ExprContext *econtext;
+ MemoryContext oldcxt;
+ TupleTableSlot *slot;
+ bool isnull;
+
+ estate = CreateExecutorState();
+ /*
+ * XXX this tuple doesn't really come from a syscache, but this doesn't
+ * matter to SysCacheGetAttr, because it only wants to be able to fetch the
+ * tupdesc
+ */
+ val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
+ &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u",
+ HeapTupleGetOid(constrtup));
+ conbin = TextDatumGetCString(val);
+ origexpr = (Expr *) stringToNode(conbin);
+ exprstate = (List *) ExecPrepareExpr((Expr *) make_ands_implicit((Expr *) origexpr), estate);
+
+ econtext = GetPerTupleExprContext(estate);
+ tupdesc = RelationGetDescr(rel);
+ slot = MakeSingleTupleTableSlot(tupdesc);
+ econtext->ecxt_scantuple = slot;
+
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+
+ /*
+ * Switch to per-tuple memory context and reset it for each tuple
+ * produced, so we don't leak memory.
+ */
+ oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+ if (!ExecQual(exprstate, econtext, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("check constraint \"%s\" is violated by some row",
+ conname)));
+
+ ResetExprContext(econtext);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+ heap_endscan(scan);
+ ExecDropSingleTupleTableSlot(slot);
+ FreeExecutorState(estate);
+ }
+
+ /* * Scan the existing rows in a table to verify they meet a proposed FK * constraint. *
*** a/src/backend/commands/typecmds.c
--- b/src/backend/commands/typecmds.c
***************
*** 86,91 **** static Oid findTypeSendFunction(List *procname, Oid typeOid);
--- 86,92 ---- static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodoutFunction(List
*procname); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid);
+ static void validateDomainConstraint(Oid domainoid, char *ccbin); static List *get_rels_with_domain(Oid domainOid,
LOCKMODE lockmode); static void checkDomainOwner(HeapTuple tup); static void checkEnumOwner(HeapTuple tup);
***************
*** 1941,1954 **** AlterDomainAddConstraint(List *names, Node *newConstraint) Relation typrel; HeapTuple
tup; Form_pg_type typTup;
- List *rels;
- ListCell *rt;
- EState *estate;
- ExprContext *econtext;
- char *ccbin;
- Expr *expr;
- ExprState *exprstate; Constraint *constr; /* Make a TypeName so we can use standard type lookup
machinery */ typename = makeTypeNameFromNameList(names);
--- 1942,1949 ---- Relation typrel; HeapTuple tup; Form_pg_type typTup; Constraint *constr;
+ char *ccbin; /* Make a TypeName so we can use standard type lookup machinery */ typename =
makeTypeNameFromNameList(names);
***************
*** 2027,2036 **** AlterDomainAddConstraint(List *names, Node *newConstraint) constr,
NameStr(typTup->typname)); /*
! * Test all values stored in the attributes based on the domain the
! * constraint is being added to. */
! expr = (Expr *) stringToNode(ccbin); /* Need an EState to run ExecEvalExpr */ estate =
CreateExecutorState();
--- 2022,2150 ---- constr, NameStr(typTup->typname)); /*
! * If requested to validate the constraint, test all values stored in the
! * attributes based on the domain the constraint is being added to. */
! if (!constr->skip_validation)
! validateDomainConstraint(domainoid, ccbin);
!
! /* Clean up */
! heap_close(typrel, RowExclusiveLock);
! }
!
! /*
! * AlterDomainValidateConstraint
! *
! * Implements the ALTER DOMAIN .. VALIDATE CONSTRAINT statement.
! */
! void
! AlterDomainValidateConstraint(List *names, char *constrName)
! {
! TypeName *typename;
! Oid domainoid;
! Relation typrel;
! Relation conrel;
! HeapTuple tup;
! Form_pg_type typTup;
! Form_pg_constraint con;
! Form_pg_constraint copy_con;
! char *conbin;
! SysScanDesc scan;
! Datum val;
! bool found = false;
! bool isnull;
! HeapTuple tuple;
! HeapTuple copyTuple;
! ScanKeyData key;
!
! /* Make a TypeName so we can use standard type lookup machinery */
! typename = makeTypeNameFromNameList(names);
! domainoid = typenameTypeId(NULL, typename);
!
! /* Look up the domain in the type table */
! typrel = heap_open(TypeRelationId, AccessShareLock);
!
! tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainoid));
! if (!HeapTupleIsValid(tup))
! elog(ERROR, "cache lookup failed for type %u", domainoid);
! typTup = (Form_pg_type) GETSTRUCT(tup);
!
! /* Check it's a domain and check user has permission for ALTER DOMAIN */
! checkDomainOwner(tup);
!
! /*
! * Find and check the target constraint
! */
! conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
! ScanKeyInit(&key,
! Anum_pg_constraint_contypid,
! BTEqualStrategyNumber, F_OIDEQ,
! ObjectIdGetDatum(domainoid));
! scan = systable_beginscan(conrel, ConstraintTypidIndexId,
! true, SnapshotNow, 1, &key);
!
! while (HeapTupleIsValid(tuple = systable_getnext(scan)))
! {
! con = (Form_pg_constraint) GETSTRUCT(tuple);
! if (strcmp(NameStr(con->conname), constrName) == 0)
! {
! found = true;
! break;
! }
! }
!
! if (!found)
! {
! con = NULL; /* keep compiler quiet */
! ereport(ERROR,
! (errcode(ERRCODE_UNDEFINED_OBJECT),
! errmsg("constraint \"%s\" of domain \"%s\" does not exist",
! constrName, NameStr(typTup->typname))));
! }
!
! if (con->contype != CONSTRAINT_CHECK)
! ereport(ERROR,
! (errcode(ERRCODE_WRONG_OBJECT_TYPE),
! errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint",
! constrName, NameStr(typTup->typname))));
!
! val = SysCacheGetAttr(CONSTROID, tuple,
! Anum_pg_constraint_conbin,
! &isnull);
! if (isnull)
! elog(ERROR, "null conbin for constraint %u",
! HeapTupleGetOid(tuple));
! conbin = TextDatumGetCString(val);
!
! validateDomainConstraint(domainoid, conbin);
!
! /*
! * Now update the catalog, while we have the door open.
! */
! copyTuple = heap_copytuple(tuple);
! copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
! copy_con->convalidated = true;
! simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
! CatalogUpdateIndexes(conrel, copyTuple);
! heap_freetuple(copyTuple);
!
! systable_endscan(scan);
!
! heap_close(typrel, AccessShareLock);
! heap_close(conrel, RowExclusiveLock);
!
! ReleaseSysCache(tup);
! }
!
! static void
! validateDomainConstraint(Oid domainoid, char *ccbin)
! {
! Expr *expr = (Expr *) stringToNode(ccbin);
! List *rels;
! ListCell *rt;
! EState *estate;
! ExprContext *econtext;
! ExprState *exprstate; /* Need an EState to run ExecEvalExpr */ estate = CreateExecutorState();
***************
*** 2092,2102 **** AlterDomainAddConstraint(List *names, Node *newConstraint) } FreeExecutorState(estate);
-
- /* Clean up */
- heap_close(typrel, RowExclusiveLock); }
- /* * get_rels_with_domain *
--- 2206,2212 ----
***************
*** 2416,2422 **** domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
CONSTRAINT_CHECK, /* Constraint Type */ false, /* Is Deferrable */
false, /* Is Deferred */
! true, /* Is Validated */ InvalidOid, /* not a relation
constraint */ NULL, 0,
--- 2526,2532 ---- CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */ false, /* Is Deferred */
! !constr->skip_validation, /* Is Validated */ InvalidOid, /*
not a relation constraint */ NULL, 0,
*** a/src/backend/optimizer/util/plancat.c
--- b/src/backend/optimizer/util/plancat.c
***************
*** 552,558 **** get_relation_data_width(Oid relid, int32 *attr_widths) /* * get_relation_constraints *
! * Retrieve the CHECK constraint expressions of the given relation. * * Returns a List (possibly empty) of
constraint expressions. Each one * has been canonicalized, and its Vars are changed to have the varno
--- 552,558 ---- /* * get_relation_constraints *
! * Retrieve the validated CHECK constraint expressions of the given relation. * * Returns a List (possibly empty)
of constraint expressions. Each one * has been canonicalized, and its Vars are changed to have the varno
***************
*** 591,596 **** get_relation_constraints(PlannerInfo *root,
--- 591,603 ---- { Node *cexpr;
+ /*
+ * If this constraint hasn't been fully validated yet, we must
+ * ignore it here.
+ */
+ if (!constr->check[i].ccvalid)
+ continue;
+ cexpr = stringToNode(constr->check[i].ccbin); /*
***************
*** 663,669 **** get_relation_constraints(PlannerInfo *root, * * Detect whether the relation need not be scanned
because it has either * self-inconsistent restrictions, or restrictions inconsistent with the
! * relation's CHECK constraints. * * Note: this examines only rel->relid, rel->reloptkind, and *
rel->baserestrictinfo; therefore it can be called before filling in
--- 670,676 ---- * * Detect whether the relation need not be scanned because it has either * self-inconsistent
restrictions, or restrictions inconsistent with the
! * relation's validated CHECK constraints. * * Note: this examines only rel->relid, rel->reloptkind, and *
rel->baserestrictinfo; therefore it can be called before filling in
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 2746,2751 **** ConstraintElem:
--- 2746,2753 ---- n->location = @1; n->raw_expr = $3;
n->cooked_expr = NULL;
+ n->skip_validation = false;
+ n->initially_valid = true; if ($5 != 0)
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
***************
*** 2753,2758 **** ConstraintElem:
--- 2755,2771 ---- parser_errposition(@5))); $$ = (Node *)n;
}
+ | CHECK '(' a_expr ')' NOT VALID
+ {
+ Constraint *n = makeNode(Constraint);
+ n->contype = CONSTR_CHECK;
+ n->location = @1;
+ n->raw_expr = $3;
+ n->cooked_expr = NULL;
+ n->skip_validation = true;
+ n->initially_valid = false;
+ $$ = (Node *)n;
+ } | UNIQUE '(' columnList ')' opt_definition OptConsTableSpace
ConstraintAttributeSpec {
***************
*** 7563,7568 **** AlterDomainStmt:
--- 7576,7590 ---- n->behavior = $7; $$ = (Node *)n; }
+ /* ALTER DOMAIN <domain> VALIDATE CONSTRAINT <name> */
+ | ALTER DOMAIN_P any_name VALIDATE CONSTRAINT name
+ {
+ AlterDomainStmt *n = makeNode(AlterDomainStmt);
+ n->subtype = 'V';
+ n->typeName = $3;
+ n->name = $6;
+ $$ = (Node *)n;
+ } ; opt_as: AS {}
*** a/src/backend/tcop/utility.c
--- b/src/backend/tcop/utility.c
***************
*** 820,825 **** standard_ProcessUtility(Node *parsetree,
--- 820,829 ---- stmt->name,
stmt->behavior); break;
+ case 'V': /* VALIDATE CONSTRAINT */
+ AlterDomainValidateConstraint(stmt->typeName,
+ stmt->name);
+ break; default: /* oops */ elog(ERROR,
"unrecognized alter domain type: %d", (int) stmt->subtype);
*** a/src/backend/utils/cache/relcache.c
--- b/src/backend/utils/cache/relcache.c
***************
*** 3251,3256 **** CheckConstraintFetch(Relation relation)
--- 3251,3257 ---- elog(ERROR, "unexpected constraint record found for rel %s",
RelationGetRelationName(relation));
+ check[found].ccvalid = conform->convalidated; check[found].ccname =
MemoryContextStrdup(CacheMemoryContext, NameStr(conform->conname));
*** a/src/include/access/tupdesc.h
--- b/src/include/access/tupdesc.h
***************
*** 29,34 **** typedef struct constrCheck
--- 29,35 ---- { char *ccname; char *ccbin; /* nodeToString representation of expr */
+ bool ccvalid; } ConstrCheck; /* This structure contains constraints of a tuple */
*** a/src/include/catalog/heap.h
--- b/src/include/catalog/heap.h
***************
*** 30,35 **** typedef struct CookedConstraint
--- 30,36 ---- char *name; /* name, or NULL if none */ AttrNumber attnum; /*
which attr (only for DEFAULT) */ Node *expr; /* transformed default or check expr */
+ bool skip_validation; /* skip validation? (only for CHECK) */ bool is_local; /*
constraint has local (non-inherited) def */ int inhcount; /* number of times constraint is
inherited */ } CookedConstraint;
*** a/src/include/commands/typecmds.h
--- b/src/include/commands/typecmds.h
***************
*** 31,36 **** extern Oid AssignTypeArrayOid(void);
--- 31,37 ---- extern void AlterDomainDefault(List *names, Node *defaultRaw); extern void AlterDomainNotNull(List
*names, bool notNull); extern void AlterDomainAddConstraint(List *names, Node *constr);
+ extern void AlterDomainValidateConstraint(List *names, char *constrName); extern void AlterDomainDropConstraint(List
*names, const char *constrName, DropBehavior behavior);
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
***************
*** 1190,1195 **** typedef enum AlterTableType
--- 1190,1196 ---- AT_AddConstraint, /* add constraint */ AT_AddConstraintRecurse, /* internal to
commands/tablecmds.c */ AT_ValidateConstraint, /* validate constraint */
+ AT_ValidateConstraintRecurse, /* internal to commands/tablecmds.c */ AT_ProcessedConstraint, /*
pre-processed add constraint (local in * parser/parse_utilcmd.c) */
AT_AddIndexConstraint, /* add constraint using existing index */
***************
*** 1543,1548 **** typedef struct Constraint
--- 1544,1551 ---- char fk_matchtype; /* FULL, PARTIAL, UNSPECIFIED */ char fk_upd_action;
/* ON UPDATE action */ char fk_del_action; /* ON DELETE action */
+
+ /* Fields used for constraints that allow a NOT VALID specification */ bool skip_validation; /*
skip validation of existing rows? */ bool initially_valid; /* start the new constraint as valid */ }
Constraint;
*** a/src/test/regress/expected/alter_table.out
--- b/src/test/regress/expected/alter_table.out
***************
*** 196,205 **** DELETE FROM tmp3 where a=5;
--- 196,241 ---- -- Try (and succeed) and repeat to show it works on already valid constraint ALTER TABLE tmp3 validate
constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr;
+ -- Try a non-verified CHECK constraint
+ ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail
+ ERROR: check constraint "b_greater_than_ten" is violated by some row
+ ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails
+ ERROR: check constraint "b_greater_than_ten" is violated by some row
+ DELETE FROM tmp3 WHERE NOT b > 10;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
+ -- Test inherited NOT VALID CHECK constraints
+ select * from tmp3;
+ a | b
+ ---+----
+ 1 | 20
+ (1 row)
+
+ CREATE TABLE tmp6 () INHERITS (tmp3);
+ CREATE TABLE tmp7 () INHERITS (tmp3);
+ INSERT INTO tmp6 VALUES (6, 30), (7, 16);
+ ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails
+ ERROR: check constraint "b_le_20" is violated by some row
+ DELETE FROM tmp6 WHERE b > 20;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds
+ -- An already validated constraint must not be revalidated
+ CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN
$1; END; $$;
+ INSERT INTO tmp7 VALUES (8, 18);
+ ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b));
+ NOTICE: boo: 18
+ ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID;
+ NOTICE: merging constraint "identity" with inherited definition
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT identity;
+ NOTICE: boo: 16
+ NOTICE: boo: 20 -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- tmp4 is a,b
ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; ERROR: there is no unique
constraint matching given keys for referenced table "tmp4"
+ DROP TABLE tmp7;
+ DROP TABLE tmp6; DROP TABLE tmp5; DROP TABLE tmp4; DROP TABLE tmp3;
*** a/src/test/regress/expected/domain.out
--- b/src/test/regress/expected/domain.out
***************
*** 352,357 **** alter domain con drop constraint t;
--- 352,368 ---- insert into domcontest values (-5); --fails ERROR: value for domain con violates check constraint
"con_check" insert into domcontest values (42);
+ -- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID
+ create domain things AS INT;
+ CREATE TABLE thethings (stuff things);
+ INSERT INTO thethings (stuff) VALUES (55);
+ ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11);
+ ERROR: column "stuff" of table "thethings" contains values that violate the new constraint
+ ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID;
+ ALTER DOMAIN things VALIDATE CONSTRAINT meow;
+ ERROR: column "stuff" of table "thethings" contains values that violate the new constraint
+ UPDATE thethings SET stuff = 10;
+ ALTER DOMAIN things VALIDATE CONSTRAINT meow; -- Confirm ALTER DOMAIN with RULES. create table domtab (col1 integer);
create domain dom as integer;
*** a/src/test/regress/sql/alter_table.sql
--- b/src/test/regress/sql/alter_table.sql
***************
*** 236,247 **** DELETE FROM tmp3 where a=5;
--- 236,276 ---- ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr;
+ -- Try a non-verified CHECK constraint
+ ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail
+ ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails
+ DELETE FROM tmp3 WHERE NOT b > 10;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
+
+ -- Test inherited NOT VALID CHECK constraints
+ select * from tmp3;
+ CREATE TABLE tmp6 () INHERITS (tmp3);
+ CREATE TABLE tmp7 () INHERITS (tmp3);
+
+ INSERT INTO tmp6 VALUES (6, 30), (7, 16);
+ ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails
+ DELETE FROM tmp6 WHERE b > 20;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds
+
+ -- An already validated constraint must not be revalidated
+ CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN
$1; END; $$;
+ INSERT INTO tmp7 VALUES (8, 18);
+ ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b));
+ ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID;
+ ALTER TABLE tmp3 VALIDATE CONSTRAINT identity; -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) -
unique constraint on -- tmp4 is a,b ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match
full;
+ DROP TABLE tmp7;
+
+ DROP TABLE tmp6;
+ DROP TABLE tmp5; DROP TABLE tmp4;
*** a/src/test/regress/sql/domain.sql
--- b/src/test/regress/sql/domain.sql
***************
*** 259,264 **** alter domain con drop constraint t;
--- 259,274 ---- insert into domcontest values (-5); --fails insert into domcontest values (42);
+ -- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID
+ create domain things AS INT;
+ CREATE TABLE thethings (stuff things);
+ INSERT INTO thethings (stuff) VALUES (55);
+ ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11);
+ ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID;
+ ALTER DOMAIN things VALIDATE CONSTRAINT meow;
+ UPDATE thethings SET stuff = 10;
+ ALTER DOMAIN things VALIDATE CONSTRAINT meow;
+ -- Confirm ALTER DOMAIN with RULES. create table domtab (col1 integer); create domain dom as integer;
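
For readers skimming the planner part of the patch: the idea behind the get_relation_constraints() hunk is that constraint exclusion may only rely on CHECK constraints whose ccvalid flag is set. The following is only a standalone sketch of that filtering step, not backend code. SketchConstrCheck and collect_validated_checks() are hypothetical names invented for illustration; only the field names ccname, ccbin and ccvalid come from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical, simplified stand-in for ConstrCheck in tupdesc.h.
 * It carries only the three fields that appear in the patch.
 */
typedef struct SketchConstrCheck
{
	const char *ccname;		/* constraint name */
	const char *ccbin;		/* serialized expression, opaque in this sketch */
	bool		ccvalid;	/* has the constraint been validated? */
} SketchConstrCheck;

/*
 * Store pointers to the validated entries of check[] into out[], which
 * must have room for ncheck pointers.  Returns the number of entries kept.
 * A constraint that is still NOT VALID is skipped, mirroring the
 * "if (!constr->check[i].ccvalid) continue;" test added by the patch.
 */
static size_t
collect_validated_checks(const SketchConstrCheck *check, size_t ncheck,
						 const SketchConstrCheck **out)
{
	size_t		nkept = 0;

	assert(check != NULL || ncheck == 0);
	assert(out != NULL || ncheck == 0);

	for (size_t i = 0; i < ncheck; i++)
	{
		/* Existing rows may violate this one, so it proves nothing. */
		if (!check[i].ccvalid)
			continue;
		out[nkept++] = &check[i];
	}
	return nkept;
}
```

With one validated and one NOT VALID constraint in the input, only the validated one would be handed on, so a query contradicting the NOT VALID constraint still has to scan the table.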
--
Álvaro Herrera <alvherre@commandprompt.com>
The PostgreSQL Company - Command Prompt, Inc.
PostgreSQL Replication, Consulting, Custom Development, 24x7 support