Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE

From Tom Lane
Subject Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE
Date
Msg-id 1727369.1597951104@sss.pgh.pa.us
In response to Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE  (Michael Paquier <michael@paquier.xyz>)
List pgsql-bugs
I wrote:
> I did some more investigation of this today.  There are at least three
> related bugs in this vicinity:
> * Vars in CHECK constraints coming from LIKE ... INCLUDING CONSTRAINTS
> are not renumbered when they need to be.
> * Vars in GENERATED expressions coming from LIKE ... INCLUDING GENERATED
> are not renumbered when they need to be.
> * Vars in GENERATED expressions coming from inheritance parents
> are not renumbered when they need to be.
> The attached proposed patch fixes those three things.  But it fails
> to fix the initially-complained-of problem with indexes, because the
> indexes are not part of the CreateStmt data structure received by
> DefineRelation.  Still, this is progress towards that, because at
> least we are now building an AttrMap that would be appropriate to use
> for that purpose.  The remaining problem is to be able to apply that
> AttrMap to the IndexStmt(s).
> One idea is to pass in the list of statements generated by
> transformCreateStmt to DefineRelation and let it hack on them.
> That seems like an enormous violation of modularity, though.
> Another, perhaps marginally cleaner, answer is to pass back
> the AttrMap and let ProcessUtility hack up the IndexStmts
> as it loops over the list.

I thought of a somewhat less messy answer, which is to divide the
existing transformTableLikeClause processing into two phases, one of
which is delayed until after we've performed DefineRelation.  At that
point it's no problem to convert everything from the LIKE parent
table's attnums to the new child's.  MergeAttributes still has to
fix up GENERATED expressions passed down from traditional-inheritance
parents, but that does not require abusing its API like the previous
patch did.
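
To make the renumbering step concrete, here is a minimal standalone sketch of
mapping a LIKE source table's column numbers to a new table's by column name.
It is not the patch's code and uses none of the backend's types: DemoColumn,
demo_build_map_by_name and demo_map_attno are hypothetical names invented for
this illustration, and a plain int array stands in for an AttrMap.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical, simplified column descriptor (not Form_pg_attribute). */
typedef struct
{
    const char *name;
    int         is_dropped;
} DemoColumn;

/*
 * Fill map[i] with the 1-based child column number whose name matches
 * parent column i+1.  Dropped or unmatched parent columns get 0.
 */
static void
demo_build_map_by_name(const DemoColumn *parent, int nparent,
                       const DemoColumn *child, int nchild,
                       int *map)
{
    for (int i = 0; i < nparent; i++)
    {
        map[i] = 0;
        if (parent[i].is_dropped)
            continue;
        for (int j = 0; j < nchild; j++)
        {
            if (!child[j].is_dropped &&
                strcmp(parent[i].name, child[j].name) == 0)
            {
                map[i] = j + 1;
                break;
            }
        }
    }
}

/* Translate one 1-based parent column number into the child's numbering. */
static int
demo_map_attno(const int *map, int nparent, int parent_attno)
{
    assert(parent_attno >= 1 && parent_attno <= nparent);
    return map[parent_attno - 1];
}
```

With a source table whose columns are (a, <dropped>, b) and a new table
(x, a, b), the map comes out as {2, 0, 3}: an expression that referred to
source column 3 has to refer to column 3 of the new table, while a reference
to source column 1 has to become column 2.  Such a map can only be built
reliably once the new table's final column layout is known, which is the
reason for delaying this part of the work until after DefineRelation.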

So attached is a complete fix for the issues discussed in this thread.

Although I originally felt that we weren't going to be able to create
a back-patchable fix, this seems much less invasive than my prior
attempt, so I think it might be reasonable to back-patch.  The only
external API break is the new AT_CookedColumnDefault subcommand,
which we could stick at the end of the enum in stable branches.
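
As a small illustration of why the position of a new enum member matters for
already-compiled code, the sketch below compares appending a value with
inserting it in the middle.  The enum and member names are hypothetical
stand-ins, not the real AlterTableType definition, and where the patch itself
places the new member is not shown in this message.

```c
#include <assert.h>

/* Hypothetical enum as shipped in an existing stable release. */
typedef enum DemoOld
{
    DEMO_OLD_ADD_COLUMN,
    DEMO_OLD_COLUMN_DEFAULT,
    DEMO_OLD_DROP_COLUMN
} DemoOld;

/* New member appended at the end: existing values keep their numbers. */
typedef enum DemoAppended
{
    DEMO_APP_ADD_COLUMN,
    DEMO_APP_COLUMN_DEFAULT,
    DEMO_APP_DROP_COLUMN,
    DEMO_APP_COOKED_COLUMN_DEFAULT
} DemoAppended;

/* New member inserted in the middle: every later value shifts by one. */
typedef enum DemoInserted
{
    DEMO_INS_ADD_COLUMN,
    DEMO_INS_COLUMN_DEFAULT,
    DEMO_INS_COOKED_COLUMN_DEFAULT,
    DEMO_INS_DROP_COLUMN
} DemoInserted;

/* Returns 1 if the "drop column" value is unchanged after appending. */
static int
demo_append_keeps_values(void)
{
    return (int) DEMO_APP_DROP_COLUMN == (int) DEMO_OLD_DROP_COLUMN;
}

/* Returns 1 if the "drop column" value is unchanged after mid-insertion. */
static int
demo_insert_keeps_values(void)
{
    return (int) DEMO_INS_DROP_COLUMN == (int) DEMO_OLD_DROP_COLUMN;
}
```

Code compiled against the old header would still pass the old integer for
"drop column"; with the appended layout that integer means the same thing,
whereas with the mid-inserted layout it would be read as the new subcommand.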

Thoughts?

            regards, tom lane

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index cd989c95e5..790c09c522 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -405,6 +405,8 @@ static bool ConstraintImpliedByRelConstraint(Relation scanrel,
                                              List *testConstraint, List *provenConstraint);
 static ObjectAddress ATExecColumnDefault(Relation rel, const char *colName,
                                          Node *newDefault, LOCKMODE lockmode);
+static ObjectAddress ATExecCookedColumnDefault(Relation rel, AttrNumber attnum,
+                                               Node *newDefault);
 static ObjectAddress ATExecAddIdentity(Relation rel, const char *colName,
                                        Node *def, LOCKMODE lockmode);
 static ObjectAddress ATExecSetIdentity(Relation rel, const char *colName,
@@ -2054,8 +2056,8 @@ storage_name(char c)
  * 'schema' is the column/attribute definition for the table. (It's a list
  *        of ColumnDef's.) It is destructively changed.
  * 'supers' is a list of OIDs of parent relations, already locked by caller.
- * 'relpersistence' is a persistence type of the table.
- * 'is_partition' tells if the table is a partition
+ * 'relpersistence' is the persistence type of the table.
+ * 'is_partition' tells if the table is a partition.
  *
  * Output arguments:
  * 'supconstr' receives a list of constraints belonging to the parents,
@@ -2218,7 +2220,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
         TupleDesc    tupleDesc;
         TupleConstr *constr;
         AttrMap    *newattmap;
+        List       *inherited_defaults;
+        List       *cols_with_defaults;
         AttrNumber    parent_attno;
+        ListCell   *lc1;
+        ListCell   *lc2;

         /* caller already got lock */
         relation = table_open(parent, NoLock);
@@ -2304,6 +2310,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
          */
         newattmap = make_attrmap(tupleDesc->natts);

+        /* We can't process inherited defaults until newattmap is complete. */
+        inherited_defaults = cols_with_defaults = NIL;
+
         for (parent_attno = 1; parent_attno <= tupleDesc->natts;
              parent_attno++)
         {
@@ -2359,7 +2368,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                                        get_collation_name(defCollId),
                                        get_collation_name(attribute->attcollation))));

-                /* Copy storage parameter */
+                /* Copy/check storage parameter */
                 if (def->storage == 0)
                     def->storage = attribute->attstorage;
                 else if (def->storage != attribute->attstorage)
@@ -2410,7 +2419,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
             }

             /*
-             * Copy default if any
+             * Locate default if any
              */
             if (attribute->atthasdef)
             {
@@ -2432,23 +2441,59 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                 Assert(this_default != NULL);

                 /*
-                 * If default expr could contain any vars, we'd need to fix
-                 * 'em, but it can't; so default is ready to apply to child.
-                 *
-                 * If we already had a default from some prior parent, check
-                 * to see if they are the same.  If so, no problem; if not,
-                 * mark the column as having a bogus default. Below, we will
-                 * complain if the bogus default isn't overridden by the child
-                 * schema.
+                 * If it's a GENERATED default, it might contain Vars that
+                 * need to be mapped to the inherited column(s)' new numbers.
+                 * We can't do that till newattmap is ready, so just remember
+                 * all the inherited default expressions for the moment.
                  */
-                Assert(def->raw_default == NULL);
-                if (def->cooked_default == NULL)
-                    def->cooked_default = this_default;
-                else if (!equal(def->cooked_default, this_default))
-                {
-                    def->cooked_default = &bogus_marker;
-                    have_bogus_defaults = true;
-                }
+                inherited_defaults = lappend(inherited_defaults, this_default);
+                cols_with_defaults = lappend(cols_with_defaults, def);
+            }
+        }
+
+        /*
+         * Now process any inherited default expressions, adjusting attnos
+         * using the completed newattmap map.
+         */
+        forboth(lc1, inherited_defaults, lc2, cols_with_defaults)
+        {
+            Node       *this_default = (Node *) lfirst(lc1);
+            ColumnDef  *def = (ColumnDef *) lfirst(lc2);
+            bool        found_whole_row;
+
+            /* Adjust Vars to match new table's column numbering */
+            this_default = map_variable_attnos(this_default,
+                                               1, 0,
+                                               newattmap,
+                                               InvalidOid, &found_whole_row);
+
+            /*
+             * For the moment we have to reject whole-row variables.  We could
+             * convert them, if we knew the new table's rowtype OID, but that
+             * hasn't been assigned yet.  (A variable could only appear in a
+             * generation expression, so the error message is correct.)
+             */
+            if (found_whole_row)
+                ereport(ERROR,
+                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                         errmsg("cannot convert whole-row table reference"),
+                         errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".",
+                                   def->colname,
+                                   RelationGetRelationName(relation))));
+
+            /*
+             * If we already had a default from some prior parent, check to
+             * see if they are the same.  If so, no problem; if not, mark the
+             * column as having a bogus default.  Below, we will complain if
+             * the bogus default isn't overridden by the child schema.
+             */
+            Assert(def->raw_default == NULL);
+            if (def->cooked_default == NULL)
+                def->cooked_default = this_default;
+            else if (!equal(def->cooked_default, this_default))
+            {
+                def->cooked_default = &bogus_marker;
+                have_bogus_defaults = true;
             }
         }

@@ -2667,7 +2712,6 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                     def->raw_default = newdef->raw_default;
                     def->cooked_default = newdef->cooked_default;
                 }
-
             }
             else
             {
@@ -3781,6 +3825,7 @@ AlterTableGetLockLevel(List *cmds)
                  * Theoretically, these could be ShareRowExclusiveLock.
                  */
             case AT_ColumnDefault:
+            case AT_CookedColumnDefault:
             case AT_AlterConstraint:
             case AT_AddIndex:    /* from ADD CONSTRAINT */
             case AT_AddIndexConstraint:
@@ -4040,6 +4085,13 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
             /* No command-specific prep needed */
             pass = cmd->def ? AT_PASS_ADD_OTHERCONSTR : AT_PASS_DROP;
             break;
+        case AT_CookedColumnDefault:    /* add a pre-cooked default */
+            /* This is currently used only in CREATE TABLE */
+            /* (so the permission check really isn't necessary) */
+            ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
+            /* This command never recurses */
+            pass = AT_PASS_ADD_OTHERCONSTR;
+            break;
         case AT_AddIdentity:
             ATSimplePermissions(rel, ATT_TABLE | ATT_VIEW | ATT_FOREIGN_TABLE);
             /* This command never recurses */
@@ -4398,6 +4450,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
         case AT_ColumnDefault:    /* ALTER COLUMN DEFAULT */
             address = ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode);
             break;
+        case AT_CookedColumnDefault:    /* add a pre-cooked default */
+            address = ATExecCookedColumnDefault(rel, cmd->num, cmd->def);
+            break;
         case AT_AddIdentity:
             cmd = ATParseTransformCmd(wqueue, tab, rel, cmd, false, lockmode,
                                       cur_pass, context);
@@ -6859,6 +6914,35 @@ ATExecColumnDefault(Relation rel, const char *colName,
     return address;
 }

+/*
+ * Add a pre-cooked default expression.
+ *
+ * Return the address of the affected column.
+ */
+static ObjectAddress
+ATExecCookedColumnDefault(Relation rel, AttrNumber attnum,
+                          Node *newDefault)
+{
+    ObjectAddress address;
+
+    /* We assume no checking is required */
+
+    /*
+     * Remove any old default for the column.  We use RESTRICT here for
+     * safety, but at present we do not expect anything to depend on the
+     * default.  (In ordinary cases, there could not be a default in place
+     * anyway, but it's possible when combining LIKE with inheritance.)
+     */
+    RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false,
+                      true);
+
+    (void) StoreAttrDefault(rel, attnum, newDefault, true, false);
+
+    ObjectAddressSubSet(address, RelationRelationId,
+                        RelationGetRelid(rel), attnum);
+    return address;
+}
+
 /*
  * ALTER TABLE ALTER COLUMN ADD IDENTITY
  *
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 25abc544fc..6a73cc4702 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -86,7 +86,6 @@ typedef struct
     List       *ckconstraints;    /* CHECK constraints */
     List       *fkconstraints;    /* FOREIGN KEY constraints */
     List       *ixconstraints;    /* index-creating constraints */
-    List       *inh_indexes;    /* cloned indexes from INCLUDING INDEXES */
     List       *extstats;        /* cloned extended statistics */
     List       *blist;            /* "before list" of things to do before
                                  * creating the table */
@@ -241,7 +240,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
     cxt.ckconstraints = NIL;
     cxt.fkconstraints = NIL;
     cxt.ixconstraints = NIL;
-    cxt.inh_indexes = NIL;
     cxt.extstats = NIL;
     cxt.blist = NIL;
     cxt.alist = NIL;
@@ -917,18 +915,18 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
  * transformTableLikeClause
  *
  * Change the LIKE <srctable> portion of a CREATE TABLE statement into
- * column definitions which recreate the user defined column portions of
- * <srctable>.
+ * column definitions that recreate the user defined column portions of
+ * <srctable>.  Also, if there are any LIKE options that we can't fully
+ * process at this point, add the TableLikeClause to cxt->alist, which
+ * will cause utility.c to call expandTableLikeClause() after the new
+ * table has been created.
  */
 static void
 transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_clause)
 {
     AttrNumber    parent_attno;
-    AttrNumber    new_attno;
     Relation    relation;
     TupleDesc    tupleDesc;
-    TupleConstr *constr;
-    AttrMap    *attmap;
     AclResult    aclresult;
     char       *comment;
     ParseCallbackState pcbstate;
@@ -942,6 +940,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                  errmsg("LIKE is not supported for creating foreign tables")));

+    /* Open the relation referenced by the LIKE clause */
     relation = relation_openrv(table_like_clause->relation, AccessShareLock);

     if (relation->rd_rel->relkind != RELKIND_RELATION &&
@@ -978,37 +977,11 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
     }

     tupleDesc = RelationGetDescr(relation);
-    constr = tupleDesc->constr;
-
-    /*
-     * Initialize column number map for map_variable_attnos().  We need this
-     * since dropped columns in the source table aren't copied, so the new
-     * table can have different column numbers.
-     */
-    attmap = make_attrmap(tupleDesc->natts);
-
-    /*
-     * We must fill the attmap now so that it can be used to process generated
-     * column default expressions in the per-column loop below.
-     */
-    new_attno = 1;
-    for (parent_attno = 1; parent_attno <= tupleDesc->natts;
-         parent_attno++)
-    {
-        Form_pg_attribute attribute = TupleDescAttr(tupleDesc,
-                                                    parent_attno - 1);
-
-        /*
-         * Ignore dropped columns in the parent.  attmap entry is left zero.
-         */
-        if (attribute->attisdropped)
-            continue;
-
-        attmap->attnums[parent_attno - 1] = list_length(cxt->columns) + (new_attno++);
-    }

     /*
      * Insert the copied attributes into the cxt for the new table definition.
+     * We must do this now so that they appear in the table in the relative
+     * position where the LIKE clause is, as required by SQL99.
      */
     for (parent_attno = 1; parent_attno <= tupleDesc->natts;
          parent_attno++)
@@ -1052,52 +1025,12 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
         cxt->columns = lappend(cxt->columns, def);

         /*
-         * Copy default, if present and it should be copied.  We have separate
-         * options for plain default expressions and GENERATED defaults.
+         * Although we don't transfer the column's default/generation
+         * expression now, we need to mark it GENERATED if appropriate.
          */
-        if (attribute->atthasdef &&
-            (attribute->attgenerated ?
-             (table_like_clause->options & CREATE_TABLE_LIKE_GENERATED) :
-             (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS)))
-        {
-            Node       *this_default = NULL;
-            AttrDefault *attrdef;
-            int            i;
-            bool        found_whole_row;
-
-            /* Find default in constraint structure */
-            Assert(constr != NULL);
-            attrdef = constr->defval;
-            for (i = 0; i < constr->num_defval; i++)
-            {
-                if (attrdef[i].adnum == parent_attno)
-                {
-                    this_default = stringToNode(attrdef[i].adbin);
-                    break;
-                }
-            }
-            Assert(this_default != NULL);
-
-            def->cooked_default = map_variable_attnos(this_default,
-                                                      1, 0,
-                                                      attmap,
-                                                      InvalidOid, &found_whole_row);
-
-            /*
-             * Prevent this for the same reason as for constraints below. Note
-             * that defaults cannot contain any vars, so it's OK that the
-             * error message refers to generated columns.
-             */
-            if (found_whole_row)
-                ereport(ERROR,
-                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                         errmsg("cannot convert whole-row table reference"),
-                         errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".",
-                                   attributeName,
-                                   RelationGetRelationName(relation))));
-
+        if (attribute->atthasdef && attribute->attgenerated &&
+            (table_like_clause->options & CREATE_TABLE_LIKE_GENERATED))
             def->generated = attribute->attgenerated;
-        }

         /*
          * Copy identity if requested
@@ -1145,14 +1078,191 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
         }
     }

+    /*
+     * We cannot yet deal with defaults, CHECK constraints, or indexes, since
+     * we don't yet know what column numbers the copied columns will have in
+     * the finished table.  If any of those options are specified, add the
+     * LIKE clause to cxt->alist so that expandTableLikeClause will be called
+     * after we do know that.
+     */
+    if (table_like_clause->options &
+        (CREATE_TABLE_LIKE_DEFAULTS |
+         CREATE_TABLE_LIKE_GENERATED |
+         CREATE_TABLE_LIKE_CONSTRAINTS |
+         CREATE_TABLE_LIKE_INDEXES))
+        cxt->alist = lappend(cxt->alist, table_like_clause);
+
+    /*
+     * We may copy extended statistics if requested, since the representation
+     * of CreateStatsStmt doesn't depend on column numbers.
+     */
+    if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS)
+    {
+        List       *parent_extstats;
+        ListCell   *l;
+
+        parent_extstats = RelationGetStatExtList(relation);
+
+        foreach(l, parent_extstats)
+        {
+            Oid            parent_stat_oid = lfirst_oid(l);
+            CreateStatsStmt *stats_stmt;
+
+            stats_stmt = generateClonedExtStatsStmt(cxt->relation,
+                                                    RelationGetRelid(relation),
+                                                    parent_stat_oid);
+
+            /* Copy comment on statistics object, if requested */
+            if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
+            {
+                comment = GetComment(parent_stat_oid, StatisticExtRelationId, 0);
+
+                /*
+                 * We make use of CreateStatsStmt's stxcomment option, so as
+                 * not to need to know now what name the statistics will have.
+                 */
+                stats_stmt->stxcomment = comment;
+            }
+
+            cxt->extstats = lappend(cxt->extstats, stats_stmt);
+        }
+
+        list_free(parent_extstats);
+    }
+
+    /*
+     * Close the parent rel, but keep our AccessShareLock on it until xact
+     * commit.  That will prevent someone else from deleting or ALTERing the
+     * parent before we can run expandTableLikeClause.
+     */
+    table_close(relation, NoLock);
+}
+
+/*
+ * expandTableLikeClause
+ *
+ * Process LIKE options that require knowing the final column numbers
+ * assigned to the new table's columns.  This executes after we have
+ * run DefineRelation for the new table.  It returns a list of utility
+ * commands that should be run to generate indexes etc.
+ */
+List *
+expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
+{
+    List       *result = NIL;
+    List       *atsubcmds = NIL;
+    AttrNumber    parent_attno;
+    Relation    relation;
+    Relation    childrel;
+    TupleDesc    tupleDesc;
+    TupleConstr *constr;
+    AttrMap    *attmap;
+    char       *comment;
+
+    /*
+     * Open the relation referenced by the LIKE clause.  We should still have
+     * the table lock obtained by transformTableLikeClause (and this'll throw
+     * an assertion failure if not).  Hence, no need to recheck privileges
+     * etc.
+     */
+    relation = relation_openrv(table_like_clause->relation, NoLock);
+
+    tupleDesc = RelationGetDescr(relation);
+    constr = tupleDesc->constr;
+
+    /*
+     * Open the newly-created child relation; we have lock on that too.
+     */
+    childrel = relation_openrv(heapRel, NoLock);
+
+    /*
+     * Construct a map from the LIKE relation's attnos to the child rel's.
+     * This re-checks type match etc, although it shouldn't be possible to
+     * have a failure since both tables are locked.
+     */
+    attmap = build_attrmap_by_name(RelationGetDescr(childrel),
+                                   tupleDesc);
+
+    /*
+     * Process defaults, if required.
+     */
+    if ((table_like_clause->options &
+         (CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED)) &&
+        constr != NULL)
+    {
+        AttrDefault *attrdef = constr->defval;
+
+        for (parent_attno = 1; parent_attno <= tupleDesc->natts;
+             parent_attno++)
+        {
+            Form_pg_attribute attribute = TupleDescAttr(tupleDesc,
+                                                        parent_attno - 1);
+
+            /*
+             * Ignore dropped columns in the parent.
+             */
+            if (attribute->attisdropped)
+                continue;
+
+            /*
+             * Copy default, if present and it should be copied.  We have
+             * separate options for plain default expressions and GENERATED
+             * defaults.
+             */
+            if (attribute->atthasdef &&
+                (attribute->attgenerated ?
+                 (table_like_clause->options & CREATE_TABLE_LIKE_GENERATED) :
+                 (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS)))
+            {
+                Node       *this_default = NULL;
+                AlterTableCmd *atsubcmd;
+                bool        found_whole_row;
+
+                /* Find default in constraint structure */
+                for (int i = 0; i < constr->num_defval; i++)
+                {
+                    if (attrdef[i].adnum == parent_attno)
+                    {
+                        this_default = stringToNode(attrdef[i].adbin);
+                        break;
+                    }
+                }
+                Assert(this_default != NULL);
+
+                atsubcmd = makeNode(AlterTableCmd);
+                atsubcmd->subtype = AT_CookedColumnDefault;
+                atsubcmd->num = attmap->attnums[parent_attno - 1];
+                atsubcmd->def = map_variable_attnos(this_default,
+                                                    1, 0,
+                                                    attmap,
+                                                    InvalidOid,
+                                                    &found_whole_row);
+
+                /*
+                 * Prevent this for the same reason as for constraints below.
+                 * Note that defaults cannot contain any vars, so it's OK that
+                 * the error message refers to generated columns.
+                 */
+                if (found_whole_row)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                             errmsg("cannot convert whole-row table reference"),
+                             errdetail("Generation expression for column \"%s\" contains a whole-row reference to table \"%s\".",
+                                       NameStr(attribute->attname),
+                                       RelationGetRelationName(relation))));
+
+                atsubcmds = lappend(atsubcmds, atsubcmd);
+            }
+        }
+    }
+
     /*
      * Copy CHECK constraints if requested, being careful to adjust attribute
      * numbers so they match the child.
      */
     if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
-        tupleDesc->constr)
+        constr != NULL)
     {
-        TupleConstr *constr = tupleDesc->constr;
         int            ccnum;

         for (ccnum = 0; ccnum < constr->num_check; ccnum++)
@@ -1160,9 +1270,10 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
             char       *ccname = constr->check[ccnum].ccname;
             char       *ccbin = constr->check[ccnum].ccbin;
             bool        ccnoinherit = constr->check[ccnum].ccnoinherit;
-            Constraint *n = makeNode(Constraint);
             Node       *ccbin_node;
             bool        found_whole_row;
+            Constraint *n;
+            AlterTableCmd *atsubcmd;

             ccbin_node = map_variable_attnos(stringToNode(ccbin),
                                              1, 0,
@@ -1183,13 +1294,22 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
                                    ccname,
                                    RelationGetRelationName(relation))));

+            n = makeNode(Constraint);
             n->contype = CONSTR_CHECK;
             n->conname = pstrdup(ccname);
             n->location = -1;
             n->is_no_inherit = ccnoinherit;
             n->raw_expr = NULL;
             n->cooked_expr = nodeToString(ccbin_node);
-            cxt->ckconstraints = lappend(cxt->ckconstraints, n);
+
+            /* We can skip validation, since the new table should be empty. */
+            n->skip_validation = true;
+            n->initially_valid = true;
+
+            atsubcmd = makeNode(AlterTableCmd);
+            atsubcmd->subtype = AT_AddConstraint;
+            atsubcmd->def = (Node *) n;
+            atsubcmds = lappend(atsubcmds, atsubcmd);

             /* Copy comment on constraint */
             if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) &&
@@ -1201,18 +1321,34 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
                 CommentStmt *stmt = makeNode(CommentStmt);

                 stmt->objtype = OBJECT_TABCONSTRAINT;
-                stmt->object = (Node *) list_make3(makeString(cxt->relation->schemaname),
-                                                   makeString(cxt->relation->relname),
+                stmt->object = (Node *) list_make3(makeString(heapRel->schemaname),
+                                                   makeString(heapRel->relname),
                                                    makeString(n->conname));
                 stmt->comment = comment;

-                cxt->alist = lappend(cxt->alist, stmt);
+                result = lappend(result, stmt);
             }
         }
     }

     /*
-     * Likewise, copy indexes if requested
+     * If we generated any ALTER TABLE actions above, wrap them into a single
+     * ALTER TABLE command.  Stick it at the front of the result, so it runs
+     * before any CommentStmts we made above.
+     */
+    if (atsubcmds)
+    {
+        AlterTableStmt *atcmd = makeNode(AlterTableStmt);
+
+        atcmd->relation = copyObject(heapRel);
+        atcmd->cmds = atsubcmds;
+        atcmd->objtype = OBJECT_TABLE;
+        atcmd->missing_ok = false;
+        result = lcons(atcmd, result);
+    }
+
+    /*
+     * Process indexes if required.
      */
     if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) &&
         relation->rd_rel->relhasindex)
@@ -1231,7 +1367,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
             parent_index = index_open(parent_index_oid, AccessShareLock);

             /* Build CREATE INDEX statement to recreate the parent_index */
-            index_stmt = generateClonedIndexStmt(cxt->relation,
+            index_stmt = generateClonedIndexStmt(heapRel,
                                                  parent_index,
                                                  attmap,
                                                  NULL);
@@ -1248,49 +1384,14 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
                 index_stmt->idxcomment = comment;
             }

-            /* Save it in the inh_indexes list for the time being */
-            cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
+            result = lappend(result, index_stmt);

             index_close(parent_index, AccessShareLock);
         }
     }

-    /*
-     * Likewise, copy extended statistics if requested
-     */
-    if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS)
-    {
-        List       *parent_extstats;
-        ListCell   *l;
-
-        parent_extstats = RelationGetStatExtList(relation);
-
-        foreach(l, parent_extstats)
-        {
-            Oid            parent_stat_oid = lfirst_oid(l);
-            CreateStatsStmt *stats_stmt;
-
-            stats_stmt = generateClonedExtStatsStmt(cxt->relation,
-                                                    RelationGetRelid(relation),
-                                                    parent_stat_oid);
-
-            /* Copy comment on statistics object, if requested */
-            if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
-            {
-                comment = GetComment(parent_stat_oid, StatisticExtRelationId, 0);
-
-                /*
-                 * We make use of CreateStatsStmt's stxcomment option, so as
-                 * not to need to know now what name the statistics will have.
-                 */
-                stats_stmt->stxcomment = comment;
-            }
-
-            cxt->extstats = lappend(cxt->extstats, stats_stmt);
-        }
-
-        list_free(parent_extstats);
-    }
+    /* Done with child rel */
+    table_close(childrel, NoLock);

     /*
      * Close the parent rel, but keep our AccessShareLock on it until xact
@@ -1298,6 +1399,8 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
      * parent before the child is committed.
      */
     table_close(relation, NoLock);
+
+    return result;
 }

 static void
@@ -1897,24 +2000,6 @@ transformIndexConstraints(CreateStmtContext *cxt)
         indexlist = lappend(indexlist, index);
     }

-    /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */
-    foreach(lc, cxt->inh_indexes)
-    {
-        index = (IndexStmt *) lfirst(lc);
-
-        if (index->primary)
-        {
-            if (cxt->pkey != NULL)
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                         errmsg("multiple primary keys for table \"%s\" are not allowed",
-                                cxt->relation->relname)));
-            cxt->pkey = index;
-        }
-
-        indexlist = lappend(indexlist, index);
-    }
-
     /*
      * Scan the index list and remove any redundant index specifications. This
      * can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A
@@ -3115,7 +3200,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
     cxt.ckconstraints = NIL;
     cxt.fkconstraints = NIL;
     cxt.ixconstraints = NIL;
-    cxt.inh_indexes = NIL;
     cxt.extstats = NIL;
     cxt.blist = NIL;
     cxt.alist = NIL;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 9b0c376c8c..6154d2c8c6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1197,6 +1197,28 @@ ProcessUtilitySlow(ParseState *pstate,
                                                              secondaryObject,
                                                              stmt);
                         }
+                        else if (IsA(stmt, TableLikeClause))
+                        {
+                            /*
+                             * Do delayed processing of LIKE options.  This
+                             * will result in additional sub-statements for us
+                             * to process.  We can just tack those onto the
+                             * to-do list.
+                             */
+                            TableLikeClause *like = (TableLikeClause *) stmt;
+                            RangeVar   *rv = ((CreateStmt *) parsetree)->relation;
+                            List       *morestmts;
+
+                            morestmts = expandTableLikeClause(rv, like);
+                            stmts = list_concat(stmts, morestmts);
+
+                            /*
+                             * We don't need a CCI now, besides which the "l"
+                             * list pointer is now possibly invalid, so just
+                             * skip the CCI test below.
+                             */
+                            continue;
+                        }
                         else
                         {
                             /*
@@ -1405,6 +1427,7 @@ ProcessUtilitySlow(ParseState *pstate,
                     IndexStmt  *stmt = (IndexStmt *) parsetree;
                     Oid            relid;
                     LOCKMODE    lockmode;
+                    bool        is_alter_table;

                     if (stmt->concurrent)
                         PreventInTransactionBlock(isTopLevel,
@@ -1466,6 +1489,17 @@ ProcessUtilitySlow(ParseState *pstate,
                         list_free(inheritors);
                     }

+                    /*
+                     * If the IndexStmt is already transformed, it must have
+                     * come from generateClonedIndexStmt, which in current
+                     * usage means it came from expandTableLikeClause rather
+                     * than from original parse analysis.  And that means we
+                     * must treat it like ALTER TABLE ADD INDEX, not CREATE.
+                     * (This is a bit grotty, but currently it doesn't seem
+                     * worth adding a separate bool field for the purpose.)
+                     */
+                    is_alter_table = stmt->transformed;
+
                     /* Run parse analysis ... */
                     stmt = transformIndexStmt(relid, stmt, queryString);

@@ -1477,7 +1511,7 @@ ProcessUtilitySlow(ParseState *pstate,
                                     InvalidOid, /* no predefined OID */
                                     InvalidOid, /* no parent index */
                                     InvalidOid, /* no parent constraint */
-                                    false,    /* is_alter_table */
+                                    is_alter_table,
                                     true,    /* check_rights */
                                     true,    /* check_not_in_use */
                                     false,    /* skip_build */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 151bcdb7ef..47d4c07306 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1786,6 +1786,7 @@ typedef enum AlterTableType
     AT_AddColumnRecurse,        /* internal to commands/tablecmds.c */
     AT_AddColumnToView,            /* implicitly via CREATE OR REPLACE VIEW */
     AT_ColumnDefault,            /* alter column default */
+    AT_CookedColumnDefault,        /* add a pre-cooked column default */
     AT_DropNotNull,                /* alter column drop not null */
     AT_SetNotNull,                /* alter column set not null */
     AT_DropExpression,            /* alter column drop expression */
diff --git a/src/include/parser/parse_utilcmd.h b/src/include/parser/parse_utilcmd.h
index 1a5e0b83a7..bc3d66ed88 100644
--- a/src/include/parser/parse_utilcmd.h
+++ b/src/include/parser/parse_utilcmd.h
@@ -31,6 +31,8 @@ extern void transformRuleStmt(RuleStmt *stmt, const char *queryString,
 extern List *transformCreateSchemaStmt(CreateSchemaStmt *stmt);
 extern PartitionBoundSpec *transformPartitionBound(ParseState *pstate, Relation parent,
                                                    PartitionBoundSpec *spec);
+extern List *expandTableLikeClause(RangeVar *heapRel,
+                                   TableLikeClause *table_like_clause);
 extern IndexStmt *generateClonedIndexStmt(RangeVar *heapRel,
                                           Relation source_idx,
                                           const struct AttrMap *attmap,
diff --git a/src/test/modules/test_ddl_deparse/expected/create_table.out b/src/test/modules/test_ddl_deparse/expected/create_table.out
index c7c9bf8971..0f2a2c164e 100644
--- a/src/test/modules/test_ddl_deparse/expected/create_table.out
+++ b/src/test/modules/test_ddl_deparse/expected/create_table.out
@@ -135,6 +135,8 @@ CREATE TABLE like_fkey_table (
   INCLUDING STORAGE
 );
 NOTICE:  DDL test: type simple, tag CREATE TABLE
+NOTICE:  DDL test: type alter table, tag ALTER TABLE
+NOTICE:    subcommand: ALTER COLUMN SET DEFAULT (precooked)
 NOTICE:  DDL test: type simple, tag CREATE INDEX
 NOTICE:  DDL test: type simple, tag CREATE INDEX
 -- Volatile table types
diff --git a/src/test/modules/test_ddl_deparse/test_ddl_deparse.c b/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
index b7bdb88ce7..def4e39f19 100644
--- a/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
+++ b/src/test/modules/test_ddl_deparse/test_ddl_deparse.c
@@ -111,6 +111,9 @@ get_altertable_subcmdtypes(PG_FUNCTION_ARGS)
             case AT_ColumnDefault:
                 strtype = "ALTER COLUMN SET DEFAULT";
                 break;
+            case AT_CookedColumnDefault:
+                strtype = "ALTER COLUMN SET DEFAULT (precooked)";
+                break;
             case AT_DropNotNull:
                 strtype = "DROP NOT NULL";
                 break;
diff --git a/src/test/regress/expected/create_table_like.out b/src/test/regress/expected/create_table_like.out
index 655e8e41dd..e3edbd8b51 100644
--- a/src/test/regress/expected/create_table_like.out
+++ b/src/test/regress/expected/create_table_like.out
@@ -160,7 +160,9 @@ SELECT * FROM test_like_gen_3;

 DROP TABLE test_like_gen_1, test_like_gen_2, test_like_gen_3;
 -- also test generated column with a "forward" reference (bug #16342)
-CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) STORED, a int);
+CREATE TABLE test_like_4 (b int DEFAULT 42,
+  c int GENERATED ALWAYS AS (a * 2) STORED,
+  a int CHECK (a > 0));
 \d test_like_4
                           Table "public.test_like_4"
  Column |  Type   | Collation | Nullable |              Default
@@ -168,6 +170,8 @@ CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) ST
  b      | integer |           |          | 42
  c      | integer |           |          | generated always as (a * 2) stored
  a      | integer |           |          |
+Check constraints:
+    "test_like_4_a_check" CHECK (a > 0)

 CREATE TABLE test_like_4a (LIKE test_like_4);
 CREATE TABLE test_like_4b (LIKE test_like_4 INCLUDING DEFAULTS);
@@ -233,7 +237,32 @@ SELECT a, b, c FROM test_like_4d;
  11 | 42 | 22
 (1 row)

+-- Test renumbering of Vars when combining LIKE with inheritance
+CREATE TABLE test_like_5 (x point, y point, z point);
+CREATE TABLE test_like_5x (p int CHECK (p > 0),
+   q int GENERATED ALWAYS AS (p * 2) STORED);
+CREATE TABLE test_like_5c (LIKE test_like_4 INCLUDING ALL)
+  INHERITS (test_like_5, test_like_5x);
+\d test_like_5c
+                         Table "public.test_like_5c"
+ Column |  Type   | Collation | Nullable |              Default
+--------+---------+-----------+----------+------------------------------------
+ x      | point   |           |          |
+ y      | point   |           |          |
+ z      | point   |           |          |
+ p      | integer |           |          |
+ q      | integer |           |          | generated always as (p * 2) stored
+ b      | integer |           |          | 42
+ c      | integer |           |          | generated always as (a * 2) stored
+ a      | integer |           |          |
+Check constraints:
+    "test_like_4_a_check" CHECK (a > 0)
+    "test_like_5x_p_check" CHECK (p > 0)
+Inherits: test_like_5,
+          test_like_5x
+
 DROP TABLE test_like_4, test_like_4a, test_like_4b, test_like_4c, test_like_4d;
+DROP TABLE test_like_5, test_like_5x, test_like_5c;
 CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
 INSERT INTO inhg VALUES (5, 10);
 INSERT INTO inhg VALUES (20, 10); -- should fail
@@ -269,9 +298,10 @@ ALTER TABLE ctlt1 ALTER COLUMN a SET STORAGE MAIN;
 CREATE TABLE ctlt2 (c text);
 ALTER TABLE ctlt2 ALTER COLUMN c SET STORAGE EXTERNAL;
 COMMENT ON COLUMN ctlt2.c IS 'C';
-CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text);
+CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text CHECK (length(c) < 7));
 ALTER TABLE ctlt3 ALTER COLUMN c SET STORAGE EXTERNAL;
 ALTER TABLE ctlt3 ALTER COLUMN a SET STORAGE MAIN;
+CREATE INDEX ctlt3_fnidx ON ctlt3 ((a || c));
 COMMENT ON COLUMN ctlt3.a IS 'A3';
 COMMENT ON COLUMN ctlt3.c IS 'C';
 COMMENT ON CONSTRAINT ctlt3_a_check ON ctlt3 IS 't3_a_check';
@@ -327,10 +357,11 @@ NOTICE:  merging multiple inherited definitions of column "a"
 Check constraints:
     "ctlt1_a_check" CHECK (length(a) > 2)
     "ctlt3_a_check" CHECK (length(a) < 5)
+    "ctlt3_c_check" CHECK (length(c) < 7)
 Inherits: ctlt1,
           ctlt3

-CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
+CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING INDEXES INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
 NOTICE:  merging column "a" with inherited definition
 \d+ ctlt13_like
                                Table "public.ctlt13_like"
@@ -339,9 +370,12 @@ NOTICE:  merging column "a" with inherited definition
  a      | text |           | not null |         | main     |              | A3
  b      | text |           |          |         | extended |              |
  c      | text |           |          |         | external |              | C
+Indexes:
+    "ctlt13_like_expr_idx" btree ((a || c))
 Check constraints:
     "ctlt1_a_check" CHECK (length(a) > 2)
     "ctlt3_a_check" CHECK (length(a) < 5)
+    "ctlt3_c_check" CHECK (length(c) < 7)
 Inherits: ctlt1

 SELECT description FROM pg_description, pg_constraint c WHERE classoid = 'pg_constraint'::regclass AND objoid = c.oid AND c.conrelid = 'ctlt13_like'::regclass;
diff --git a/src/test/regress/sql/create_table_like.sql b/src/test/regress/sql/create_table_like.sql
index 6981ac0cbe..f0a8a56b76 100644
--- a/src/test/regress/sql/create_table_like.sql
+++ b/src/test/regress/sql/create_table_like.sql
@@ -66,7 +66,9 @@ SELECT * FROM test_like_gen_3;
 DROP TABLE test_like_gen_1, test_like_gen_2, test_like_gen_3;

 -- also test generated column with a "forward" reference (bug #16342)
-CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) STORED, a int);
+CREATE TABLE test_like_4 (b int DEFAULT 42,
+  c int GENERATED ALWAYS AS (a * 2) STORED,
+  a int CHECK (a > 0));
 \d test_like_4
 CREATE TABLE test_like_4a (LIKE test_like_4);
 CREATE TABLE test_like_4b (LIKE test_like_4 INCLUDING DEFAULTS);
@@ -84,7 +86,17 @@ SELECT a, b, c FROM test_like_4c;
 \d test_like_4d
 INSERT INTO test_like_4d (a) VALUES(11);
 SELECT a, b, c FROM test_like_4d;
+
+-- Test renumbering of Vars when combining LIKE with inheritance
+CREATE TABLE test_like_5 (x point, y point, z point);
+CREATE TABLE test_like_5x (p int CHECK (p > 0),
+   q int GENERATED ALWAYS AS (p * 2) STORED);
+CREATE TABLE test_like_5c (LIKE test_like_4 INCLUDING ALL)
+  INHERITS (test_like_5, test_like_5x);
+\d test_like_5c
+
 DROP TABLE test_like_4, test_like_4a, test_like_4b, test_like_4c, test_like_4d;
+DROP TABLE test_like_5, test_like_5x, test_like_5c;

 CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
 INSERT INTO inhg VALUES (5, 10);
@@ -119,9 +131,10 @@ CREATE TABLE ctlt2 (c text);
 ALTER TABLE ctlt2 ALTER COLUMN c SET STORAGE EXTERNAL;
 COMMENT ON COLUMN ctlt2.c IS 'C';

-CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text);
+CREATE TABLE ctlt3 (a text CHECK (length(a) < 5), c text CHECK (length(c) < 7));
 ALTER TABLE ctlt3 ALTER COLUMN c SET STORAGE EXTERNAL;
 ALTER TABLE ctlt3 ALTER COLUMN a SET STORAGE MAIN;
+CREATE INDEX ctlt3_fnidx ON ctlt3 ((a || c));
 COMMENT ON COLUMN ctlt3.a IS 'A3';
 COMMENT ON COLUMN ctlt3.c IS 'C';
 COMMENT ON CONSTRAINT ctlt3_a_check ON ctlt3 IS 't3_a_check';
@@ -138,7 +151,7 @@ CREATE TABLE ctlt1_inh (LIKE ctlt1 INCLUDING CONSTRAINTS INCLUDING COMMENTS) INH
 SELECT description FROM pg_description, pg_constraint c WHERE classoid = 'pg_constraint'::regclass AND objoid = c.oid AND c.conrelid = 'ctlt1_inh'::regclass;
 CREATE TABLE ctlt13_inh () INHERITS (ctlt1, ctlt3);
 \d+ ctlt13_inh
-CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
+CREATE TABLE ctlt13_like (LIKE ctlt3 INCLUDING CONSTRAINTS INCLUDING INDEXES INCLUDING COMMENTS INCLUDING STORAGE) INHERITS (ctlt1);
 \d+ ctlt13_like
 SELECT description FROM pg_description, pg_constraint c WHERE classoid = 'pg_constraint'::regclass AND objoid = c.oid AND c.conrelid = 'ctlt13_like'::regclass;
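
The ProcessUtilitySlow hunk in the patch above appends the statements returned by expandTableLikeClause to the list it is currently looping over, and its comment notes that the "l" list pointer may be invalid afterwards. A minimal standalone sketch of that general pattern follows. It is not PostgreSQL code: WorkList, worklist_append, process_all, demo and the two item kinds are hypothetical names invented for illustration, and a plain growable int array stands in for a List. The loop walks the array by index and copies each item out by value, so a reallocation triggered by an append cannot leave it holding a stale pointer.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical item kinds: EXPANDABLE stands in for an item that produces
 * further work when reached; ORDINARY is processed directly. */
enum { ORDINARY = 0, EXPANDABLE = 1 };

/* Hypothetical growable array used as a to-do list. */
typedef struct WorkList
{
    int *items;
    int  len;
    int  cap;
} WorkList;

/* Append one item, growing the backing array when it is full.
 * realloc may move the array, invalidating pointers into it. */
void
worklist_append(WorkList *wl, int item)
{
    assert(wl != NULL);
    if (wl->len == wl->cap)
    {
        int  newcap = wl->cap ? wl->cap * 2 : 4;
        int *p = realloc(wl->items, (size_t) newcap * sizeof(int));

        if (p == NULL)
            abort();
        wl->items = p;
        wl->cap = newcap;
    }
    wl->items[wl->len++] = item;
}

/* Process every item, including items appended during the loop.
 * Returns the number of ORDINARY items handled. */
int
process_all(WorkList *wl)
{
    int processed = 0;

    /* Re-read wl->len and wl->items on every iteration. */
    for (int i = 0; i < wl->len; i++)
    {
        int item = wl->items[i];    /* copy by value, keep no pointer */

        if (item == EXPANDABLE)
        {
            /* Tack the extra work onto the end of the same list. */
            worklist_append(wl, ORDINARY);
            worklist_append(wl, ORDINARY);
            continue;               /* nothing else to do for this item */
        }
        processed++;
    }
    return processed;
}

/* Build a three-item list whose middle item expands into two more. */
int
demo(void)
{
    WorkList wl = {NULL, 0, 0};
    int      n;

    worklist_append(&wl, ORDINARY);
    worklist_append(&wl, EXPANDABLE);
    worklist_append(&wl, ORDINARY);
    n = process_all(&wl);
    free(wl.items);
    return n;
}
```

In this sketch demo() handles the two original ordinary items plus the two appended ones. The design point is only that the loop state is an index rather than a pointer into storage that may move.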

