Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE

From Tom Lane
Subject Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE
Msg-id 8840.1587948815@sss.pgh.pa.us
In response to Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: BUG #16272: Index expression can refer to wrong attributes if index is created via CREATE TABLE LIKE  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-bugs
I wrote:
> PG Bug reporting form <noreply@postgresql.org> writes:
>> The index expression in the index created via LIKE ... INCLUDING INDEXES
>> still refers to the first two attributes of the table, although an attribute
>> has been put in place before the columns the expression referred to in the
>> original index.

> Ugh.  So the problem here is that transformTableLikeClause carefully
> renumbers the Vars in the index expression to match the new column
> numbers ... as they stand when it runs, which is before any account
> has been taken of inheritance.  It looks like Vars in check constraints
> are likewise misprocessed, and probably GENERATED expressions as well.

I did some more investigation of this today.  There are at least three
related bugs in this vicinity:

* Vars in CHECK constraints coming from LIKE ... INCLUDING CONSTRAINTS
are not renumbered when they need to be.

* Vars in GENERATED expressions coming from LIKE ... INCLUDING GENERATED
are not renumbered when they need to be.

* Vars in GENERATED expressions coming from inheritance parents
are not renumbered when they need to be.
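
To make "renumbering" concrete, here is a minimal, self-contained C sketch. It is a toy model, not PostgreSQL code: `ToyVar`, `ToyAttrMap`, and `toy_map_attnos` are hypothetical stand-ins for Var, AttrMap, and map_variable_attnos, and an expression is flattened to a plain array of column references.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a Var node: a reference to column attno (1-based). */
typedef struct ToyVar
{
    int         attno;
} ToyVar;

/* Hypothetical stand-in for AttrMap: attnums[old_attno - 1] is the new attno. */
typedef struct ToyAttrMap
{
    const int  *attnums;
    int         maplen;
} ToyAttrMap;

/*
 * Rewrite every column reference in a flattened "expression" from the
 * source table's numbering to the new table's numbering.
 */
static void
toy_map_attnos(ToyVar *vars, size_t nvars, const ToyAttrMap *map)
{
    for (size_t i = 0; i < nvars; i++)
    {
        int         old_attno = vars[i].attno;

        /* Every reference must point at a column the map knows about. */
        assert(old_attno >= 1 && old_attno <= map->maplen);
        vars[i].attno = map->attnums[old_attno - 1];
    }
}
```

In the regression test added by the patch, test_like_4 has columns (b, c, a), and in test_like_5c they land behind five inherited columns, so the map would be {6, 7, 8}. The generated expression (a * 2) refers to column 3 of test_like_4 and has to refer to column 8 in the new table; skipping that step leaves it pointing at an unrelated column.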

The attached proposed patch fixes those three things.  But it fails
to fix the initially-complained-of problem with indexes, because the
indexes are not part of the CreateStmt data structure received by
DefineRelation.  Still, this is progress towards that, because at
least we are now building an AttrMap that would be appropriate to use
for that purpose.  The remaining problem is to be able to apply that
AttrMap to the IndexStmt(s).
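
As a rough picture of how such a map comes about, the sketch below models the merge of declared columns into the inherited column list. It is a simplification under stated assumptions: `toy_build_attmap` is a hypothetical name, columns are plain strings rather than ColumnDef nodes, and only name matching against inherited columns is modelled (no type, collation, or storage checks).

```c
#include <assert.h>
#include <string.h>

/*
 * Merge declared column names into an inherited column list, recording the
 * final 1-based position of each declared column in attnums[].
 *
 * merged[] initially holds the ninherited inherited names and must have room
 * for ninherited + ndeclared entries.  Returns the total column count.
 */
static int
toy_build_attmap(const char **merged, int ninherited,
                 const char *const *declared, int ndeclared,
                 int *attnums)
{
    int         nmerged = ninherited;

    assert(ninherited >= 0 && ndeclared >= 0);

    for (int i = 0; i < ndeclared; i++)
    {
        int         exist_attno = 0;

        /* Does an inherited column with this name already exist? */
        for (int j = 0; j < ninherited; j++)
        {
            if (strcmp(merged[j], declared[i]) == 0)
            {
                exist_attno = j + 1;
                break;
            }
        }

        if (exist_attno == 0)
        {
            /* No: append the declared column to the merged schema. */
            merged[nmerged++] = declared[i];
            exist_attno = nmerged;
        }

        attnums[i] = exist_attno;
    }

    return nmerged;
}
```

The point of the model is that the map is only complete after every declared column has been placed, which is why expressions can be adjusted only after the merge loop has finished.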

One idea is to pass in the list of statements generated by
transformCreateStmt to DefineRelation and let it hack on them.
That seems like an enormous violation of modularity, though.
Another, perhaps marginally cleaner, answer is to pass back
the AttrMap and let ProcessUtility hack up the IndexStmts
as it loops over the list.

I can't avoid the impression that we ought to rewrite all this logic
... DefineRelation was rather ugly even when we got it from Berkeley,
and we've layered more ugliness on it over the years.  But I'm not
sure offhand what a nicer design would look like.

            regards, tom lane

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 5745cd6..7aa1a18 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -310,8 +310,10 @@ static void truncate_check_perms(Oid relid, Form_pg_class reltuple);
 static void truncate_check_activity(Relation rel);
 static void RangeVarCallbackForTruncate(const RangeVar *relation,
                                         Oid relId, Oid oldRelId, void *arg);
-static List *MergeAttributes(List *schema, List *supers, char relpersistence,
-                             bool is_partition, List **supconstr);
+static List *MergeAttributes(List *schema, List *supers,
+                             List *schema_constraints,
+                             char relpersistence, bool is_partition,
+                             List **supconstr);
 static bool MergeCheckConstraint(List *constraints, char *name, Node *expr);
 static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
 static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
@@ -774,10 +776,14 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
     /*
      * Look up inheritance ancestors and generate relation schema, including
      * inherited attributes.  (Note that stmt->tableElts is destructively
-     * modified by MergeAttributes.)
+     * modified by MergeAttributes.)  This step is also responsible for
+     * adjusting default and constraint expressions as needed to reflect any
+     * column renumbering that happens due to inheritance.  (While "raw"
+     * expressions don't need adjustment, pre-"cooked" ones do.)
      */
     stmt->tableElts =
         MergeAttributes(stmt->tableElts, inheritOids,
+                        stmt->constraints,
                         stmt->relation->relpersistence,
                         stmt->partbound != NULL,
                         &old_constraints);
@@ -2054,8 +2060,11 @@ storage_name(char c)
  * 'schema' is the column/attribute definition for the table. (It's a list
  *        of ColumnDef's.) It is destructively changed.
  * 'supers' is a list of OIDs of parent relations, already locked by caller.
- * 'relpersistence' is a persistence type of the table.
- * 'is_partition' tells if the table is a partition
+ * 'schema_constraints' is the list of constraints for the table.  If there
+ *        are any pre-cooked expressions in that, we'll update them as needed
+ *        to account for possible column renumbering.
+ * 'relpersistence' is the persistence type of the table.
+ * 'is_partition' tells if the table is a partition.
  *
  * Output arguments:
  * 'supconstr' receives a list of constraints belonging to the parents,
@@ -2104,8 +2113,9 @@ storage_name(char c)
  *----------
  */
 static List *
-MergeAttributes(List *schema, List *supers, char relpersistence,
-                bool is_partition, List **supconstr)
+MergeAttributes(List *schema, List *supers, List *schema_constraints,
+                char relpersistence, bool is_partition,
+                List **supconstr)
 {
     List       *inhSchema = NIL;
     List       *constraints = NIL;
@@ -2218,7 +2228,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
         TupleDesc    tupleDesc;
         TupleConstr *constr;
         AttrMap    *newattmap;
+        List       *inherited_defaults;
+        List       *cols_with_defaults;
         AttrNumber    parent_attno;
+        ListCell   *lc1;
+        ListCell   *lc2;

         /* caller already got lock */
         relation = table_open(parent, NoLock);
@@ -2304,6 +2318,9 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
          */
         newattmap = make_attrmap(tupleDesc->natts);

+        /* We can't process inherited defaults until newattmap is complete. */
+        inherited_defaults = cols_with_defaults = NIL;
+
         for (parent_attno = 1; parent_attno <= tupleDesc->natts;
              parent_attno++)
         {
@@ -2359,7 +2376,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                                        get_collation_name(defCollId),
                                        get_collation_name(attribute->attcollation))));

-                /* Copy storage parameter */
+                /* Copy/check storage parameter */
                 if (def->storage == 0)
                     def->storage = attribute->attstorage;
                 else if (def->storage != attribute->attstorage)
@@ -2371,18 +2388,18 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                                        storage_name(def->storage),
                                        storage_name(attribute->attstorage))));

-                def->inhcount++;
-                /* Merge of NOT NULL constraints = OR 'em together */
-                def->is_not_null |= attribute->attnotnull;
-                /* Default and other constraints are handled below */
-                newattmap->attnums[parent_attno - 1] = exist_attno;
-
                 /* Check for GENERATED conflicts */
                 if (def->generated != attribute->attgenerated)
                     ereport(ERROR,
                             (errcode(ERRCODE_DATATYPE_MISMATCH),
                              errmsg("inherited column \"%s\" has a generation conflict",
                                     attributeName)));
+
+                def->inhcount++;
+                /* Merge of NOT NULL constraints = OR 'em together */
+                def->is_not_null |= attribute->attnotnull;
+                /* Default and other constraints are handled below */
+                newattmap->attnums[parent_attno - 1] = exist_attno;
             }
             else
             {
@@ -2410,7 +2427,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
             }

             /*
-             * Copy default if any
+             * Locate default if any
              */
             if (attribute->atthasdef)
             {
@@ -2432,23 +2449,58 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                 Assert(this_default != NULL);

                 /*
-                 * If default expr could contain any vars, we'd need to fix
-                 * 'em, but it can't; so default is ready to apply to child.
-                 *
-                 * If we already had a default from some prior parent, check
-                 * to see if they are the same.  If so, no problem; if not,
-                 * mark the column as having a bogus default. Below, we will
-                 * complain if the bogus default isn't overridden by the child
-                 * schema.
+                 * If it's a GENERATED default, it might contain Vars that
+                 * need to be mapped to the inherited column(s)' new numbers.
+                 * We can't do that till newattmap is ready, so just remember
+                 * all the inherited default expressions for the moment.
                  */
-                Assert(def->raw_default == NULL);
-                if (def->cooked_default == NULL)
-                    def->cooked_default = this_default;
-                else if (!equal(def->cooked_default, this_default))
-                {
-                    def->cooked_default = &bogus_marker;
-                    have_bogus_defaults = true;
-                }
+                inherited_defaults = lappend(inherited_defaults, this_default);
+                cols_with_defaults = lappend(cols_with_defaults, def);
+            }
+        }
+
+        /*
+         * Now process any inherited default expressions, adjusting attnos
+         * using the completed newattmap map.
+         */
+        forboth(lc1, inherited_defaults, lc2, cols_with_defaults)
+        {
+            Node       *this_default = (Node *) lfirst(lc1);
+            ColumnDef  *def = (ColumnDef *) lfirst(lc2);
+            bool        found_whole_row;
+
+            /* Adjust Vars to match new table's column numbering */
+            this_default = map_variable_attnos(this_default,
+                                               1, 0,
+                                               newattmap,
+                                               InvalidOid, &found_whole_row);
+
+            /*
+             * For the moment we have to reject whole-row variables.  We could
+             * convert them, if we knew the new table's rowtype OID, but that
+             * hasn't been assigned yet.
+             */
+            if (found_whole_row)
+                ereport(ERROR,
+                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                         errmsg("cannot convert whole-row table reference"),
+                         errdetail("Default for column \"%s\" contains a whole-row reference to table \"%s\".",
+                                   def->colname,
+                                   RelationGetRelationName(relation))));
+
+            /*
+             * If we already had a default from some prior parent, check to
+             * see if they are the same.  If so, no problem; if not, mark the
+             * column as having a bogus default.  Below, we will complain if
+             * the bogus default isn't overridden by the child schema.
+             */
+            Assert(def->raw_default == NULL);
+            if (def->cooked_default == NULL)
+                def->cooked_default = this_default;
+            else if (!equal(def->cooked_default, this_default))
+            {
+                def->cooked_default = &bogus_marker;
+                have_bogus_defaults = true;
             }
         }

@@ -2525,12 +2577,24 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
     /*
      * If we had no inherited attributes, the result schema is just the
      * explicitly declared columns.  Otherwise, we need to merge the declared
-     * columns into the inherited schema list.  Although, we never have any
-     * explicitly declared columns if the table is a partition.
+     * columns into the inherited schema list.  That also means that we might
+     * have to renumber Vars in any pre-cooked expressions in the schema.
      */
     if (inhSchema != NIL)
     {
         int            schema_attno = 0;
+        AttrMap    *newattmap;
+        List       *cols_with_defaults;
+        ListCell   *lc1;
+
+        /*
+         * newattmap will contain the possibly-adjusted column numbers for the
+         * explicitly declared columns.
+         */
+        newattmap = make_attrmap(list_length(schema));
+
+        /* We can't process cooked defaults until newattmap is complete. */
+        cols_with_defaults = NIL;

         foreach(entry, schema)
         {
@@ -2614,15 +2678,21 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                                        storage_name(def->storage),
                                        storage_name(newdef->storage))));

+                /* OK, build the attmap */
+                newattmap->attnums[schema_attno - 1] = exist_attno;
+
                 /* Mark the column as locally defined */
                 def->is_local = true;
                 /* Merge of NOT NULL constraints = OR 'em together */
                 def->is_not_null |= newdef->is_not_null;
                 /* If new def has a default, override previous default */
-                if (newdef->raw_default != NULL)
+                if (newdef->raw_default || newdef->cooked_default)
                 {
                     def->raw_default = newdef->raw_default;
                     def->cooked_default = newdef->cooked_default;
+                    /* We'll need to reprocess default, if it's cooked */
+                    if (def->cooked_default)
+                        cols_with_defaults = lappend(cols_with_defaults, def);
                 }
             }
             else
@@ -2631,6 +2701,13 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                  * No, attach new column to result schema
                  */
                 inhSchema = lappend(inhSchema, newdef);
+
+                /* Build the attmap */
+                newattmap->attnums[schema_attno - 1] = list_length(inhSchema);
+
+                /* We'll need to reprocess default, if it's cooked */
+                if (newdef->cooked_default)
+                    cols_with_defaults = lappend(cols_with_defaults, newdef);
             }
         }

@@ -2645,6 +2722,80 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
                     (errcode(ERRCODE_TOO_MANY_COLUMNS),
                      errmsg("tables can have at most %d columns",
                             MaxHeapAttributeNumber)));
+
+        /*
+         * If any columns have cooked defaults that came from the declared
+         * schema, update Var numbers in those expressions.  (This can only
+         * happen with defaults coming from LIKE clauses, since only those
+         * could be pre-cooked at this stage.)
+         */
+        foreach(lc1, cols_with_defaults)
+        {
+            ColumnDef  *def = (ColumnDef *) lfirst(lc1);
+            bool        found_whole_row;
+
+            def->cooked_default = map_variable_attnos(def->cooked_default,
+                                                      1, 0,
+                                                      newattmap,
+                                                      InvalidOid,
+                                                      &found_whole_row);
+
+            /*
+             * For the moment we have to reject whole-row variables.  We could
+             * convert them, if we knew the new table's rowtype OID, but that
+             * hasn't been assigned yet.
+             */
+            if (found_whole_row)
+                ereport(ERROR,
+                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                         errmsg("cannot convert whole-row table reference"),
+                         errdetail("Default for column \"%s\" contains a whole-row reference.",
+                                   def->colname)));
+        }
+
+        /*
+         * Likewise, any pre-cooked expressions in CHECK constraints in the
+         * declared schema need to be adjusted.  (Again, LIKE is to blame for
+         * needing to do this.)
+         */
+        foreach(lc1, schema_constraints)
+        {
+            Constraint *cdef = (Constraint *) lfirst(lc1);
+            Node       *expr;
+            bool        found_whole_row;
+
+            if (cdef->contype != CONSTR_CHECK)
+                continue;
+
+            if (cdef->cooked_expr == NULL)
+                continue;
+
+            expr = stringToNode(cdef->cooked_expr);
+
+            expr = map_variable_attnos(expr,
+                                       1, 0,
+                                       newattmap,
+                                       InvalidOid,
+                                       &found_whole_row);
+
+            /*
+             * For the moment we have to reject whole-row variables.  We could
+             * convert them, if we knew the new table's rowtype OID, but that
+             * hasn't been assigned yet.  (Note: there should probably always
+             * be a constraint name available at this point, so we don't try
+             * too hard to deal with the case where there isn't.)
+             */
+            if (found_whole_row)
+                ereport(ERROR,
+                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                         errmsg("cannot convert whole-row table reference"),
+                         errdetail("Constraint \"%s\" contains a whole-row reference.",
+                                   cdef->conname ? cdef->conname : "(unnamed)")));
+
+            cdef->cooked_expr = nodeToString(expr);
+        }
+
+        free_attrmap(newattmap);
     }

     /*
diff --git a/src/test/regress/expected/create_table_like.out b/src/test/regress/expected/create_table_like.out
index 655e8e4..6989dd7 100644
--- a/src/test/regress/expected/create_table_like.out
+++ b/src/test/regress/expected/create_table_like.out
@@ -160,7 +160,9 @@ SELECT * FROM test_like_gen_3;

 DROP TABLE test_like_gen_1, test_like_gen_2, test_like_gen_3;
 -- also test generated column with a "forward" reference (bug #16342)
-CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) STORED, a int);
+CREATE TABLE test_like_4 (b int DEFAULT 42,
+  c int GENERATED ALWAYS AS (a * 2) STORED,
+  a int CHECK (a > 0));
 \d test_like_4
                           Table "public.test_like_4"
  Column |  Type   | Collation | Nullable |              Default
@@ -168,6 +170,8 @@ CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) ST
  b      | integer |           |          | 42
  c      | integer |           |          | generated always as (a * 2) stored
  a      | integer |           |          |
+Check constraints:
+    "test_like_4_a_check" CHECK (a > 0)

 CREATE TABLE test_like_4a (LIKE test_like_4);
 CREATE TABLE test_like_4b (LIKE test_like_4 INCLUDING DEFAULTS);
@@ -233,7 +237,32 @@ SELECT a, b, c FROM test_like_4d;
  11 | 42 | 22
 (1 row)

+-- Test renumbering of Vars when combining LIKE with inheritance
+CREATE TABLE test_like_5 (x point, y point, z point);
+CREATE TABLE test_like_5x (p int CHECK (p > 0),
+   q int GENERATED ALWAYS AS (p * 2) STORED);
+CREATE TABLE test_like_5c (LIKE test_like_4 INCLUDING ALL)
+  INHERITS (test_like_5, test_like_5x);
+\d test_like_5c
+                         Table "public.test_like_5c"
+ Column |  Type   | Collation | Nullable |              Default
+--------+---------+-----------+----------+------------------------------------
+ x      | point   |           |          |
+ y      | point   |           |          |
+ z      | point   |           |          |
+ p      | integer |           |          |
+ q      | integer |           |          | generated always as (p * 2) stored
+ b      | integer |           |          | 42
+ c      | integer |           |          | generated always as (a * 2) stored
+ a      | integer |           |          |
+Check constraints:
+    "test_like_4_a_check" CHECK (a > 0)
+    "test_like_5x_p_check" CHECK (p > 0)
+Inherits: test_like_5,
+          test_like_5x
+
 DROP TABLE test_like_4, test_like_4a, test_like_4b, test_like_4c, test_like_4d;
+DROP TABLE test_like_5, test_like_5x, test_like_5c;
 CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
 INSERT INTO inhg VALUES (5, 10);
 INSERT INTO inhg VALUES (20, 10); -- should fail
diff --git a/src/test/regress/sql/create_table_like.sql b/src/test/regress/sql/create_table_like.sql
index 6981ac0..708948b 100644
--- a/src/test/regress/sql/create_table_like.sql
+++ b/src/test/regress/sql/create_table_like.sql
@@ -66,7 +66,9 @@ SELECT * FROM test_like_gen_3;
 DROP TABLE test_like_gen_1, test_like_gen_2, test_like_gen_3;

 -- also test generated column with a "forward" reference (bug #16342)
-CREATE TABLE test_like_4 (b int DEFAULT 42, c int GENERATED ALWAYS AS (a * 2) STORED, a int);
+CREATE TABLE test_like_4 (b int DEFAULT 42,
+  c int GENERATED ALWAYS AS (a * 2) STORED,
+  a int CHECK (a > 0));
 \d test_like_4
 CREATE TABLE test_like_4a (LIKE test_like_4);
 CREATE TABLE test_like_4b (LIKE test_like_4 INCLUDING DEFAULTS);
@@ -84,7 +86,17 @@ SELECT a, b, c FROM test_like_4c;
 \d test_like_4d
 INSERT INTO test_like_4d (a) VALUES(11);
 SELECT a, b, c FROM test_like_4d;
+
+-- Test renumbering of Vars when combining LIKE with inheritance
+CREATE TABLE test_like_5 (x point, y point, z point);
+CREATE TABLE test_like_5x (p int CHECK (p > 0),
+   q int GENERATED ALWAYS AS (p * 2) STORED);
+CREATE TABLE test_like_5c (LIKE test_like_4 INCLUDING ALL)
+  INHERITS (test_like_5, test_like_5x);
+\d test_like_5c
+
 DROP TABLE test_like_4, test_like_4a, test_like_4b, test_like_4c, test_like_4d;
+DROP TABLE test_like_5, test_like_5x, test_like_5c;

 CREATE TABLE inhg (x text, LIKE inhx INCLUDING INDEXES, y text); /* copies indexes */
 INSERT INTO inhg VALUES (5, 10);
