WIP: fix SET WITHOUT OIDS, add SET WITH OIDS

From: Tom Lane
Subject: WIP: fix SET WITHOUT OIDS, add SET WITH OIDS
Msg-id: 20594.1234109627@sss.pgh.pa.us
Responses: Re: WIP: fix SET WITHOUT OIDS, add SET WITH OIDS  (Andrew Dunstan <andrew@dunslane.net>)
List: pgsql-hackers

We have an open problem with CVS HEAD that ALTER TABLE SET WITHOUT OIDS
causes problems:
http://archives.postgresql.org/pgsql-hackers/2008-11/msg00332.php

I opined at the time that what we really have here is a table whose
tuples do not match its declared rowtype, and that the proper fix is
to make SET WITHOUT OIDS rewrite the table to physically get rid of
the OIDs.  The attached patch (which lacks doc changes or regression
tests as yet) does that, and also adds the inverse SET WITH OIDS
operation to do what you'd expect, i.e., add an OID column if it isn't
there already.
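
To make the distinction above concrete, here is a minimal sketch in C of a
catalog-only change versus a physical rewrite.  It is a toy model, not backend
code: ToyTuple, ToyTable and every toy_* function are hypothetical names
invented for illustration, and the rewrite is done in place, whereas Phase 3
in the patch copies the data into a temporary relation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy model; none of these names exist in PostgreSQL. */
typedef struct ToyTuple
{
    bool        has_oid;    /* does this stored tuple carry an OID? */
    unsigned    oid;        /* meaningful only when has_oid is true */
    int         payload;    /* stand-in for the user columns */
} ToyTuple;

typedef struct ToyTable
{
    bool        relhasoids; /* declared rowtype: does it include an OID? */
    size_t      ntuples;
    ToyTuple   *tuples;
} ToyTable;

/* True when every stored tuple agrees with the declared rowtype. */
static bool
toy_tuples_match_rowtype(const ToyTable *tab)
{
    for (size_t i = 0; i < tab->ntuples; i++)
    {
        if (tab->tuples[i].has_oid != tab->relhasoids)
            return false;
    }
    return true;
}

/* Catalog-only change: flips the flag and leaves the stored tuples alone. */
static void
toy_set_oids_catalog_only(ToyTable *tab, bool with_oids)
{
    tab->relhasoids = with_oids;
}

/*
 * Rewriting change: flips the flag and visits every tuple, stripping the
 * OID or assigning a fresh one taken from *next_oid.
 */
static void
toy_set_oids_rewrite(ToyTable *tab, bool with_oids, unsigned *next_oid)
{
    tab->relhasoids = with_oids;
    for (size_t i = 0; i < tab->ntuples; i++)
    {
        ToyTuple   *tup = &tab->tuples[i];

        if (with_oids && !tup->has_oid)
            tup->oid = (*next_oid)++;
        else if (!with_oids)
            tup->oid = 0;
        tup->has_oid = with_oids;
    }
    /* After a rewrite the tuples always match the declared rowtype. */
    assert(toy_tuples_match_rowtype(tab));
}
```

In this model, calling toy_set_oids_catalog_only() on a table whose tuples
carry OIDs leaves toy_tuples_match_rowtype() returning false, which is the
mismatch described above; toy_set_oids_rewrite() restores agreement at the
price of touching every tuple.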

The major objection to this would probably be that SET WITHOUT OIDS has
historically been a "free" catalog-change operation, and now it will be
expensive on large tables.  But given that we've deprecated OIDs in user
tables since 8.0, I think most people have been through that conversion
already, or have decided to keep their OIDs anyway.  I don't think it's
worth taking a continuing risk of backend bugs in order to make life a
bit easier for any remaining laggards.
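
The cost in question comes from the Phase 3 rewrite test, which the patch
extends with a new flag.  The following is a rough sketch in C of that
decision only.  ToyAlterInfo and the toy_* functions are hypothetical
stand-ins, n_newvals replaces the real newvals list, and the rule that the
flag is raised only when the table's OID state actually changes is a
simplifying assumption that ignores inheritance recursion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the AlteredTableInfo fields used here. */
typedef struct ToyAlterInfo
{
    size_t      n_newvals;      /* number of columns needing recomputation */
    bool        new_changeoids; /* T if the OID column was added/dropped */
} ToyAlterInfo;

/* Mirrors the patched test: newvals != NIL || new_changeoids. */
static bool
toy_needs_rewrite(const ToyAlterInfo *tab)
{
    assert(tab != NULL);
    return tab->n_newvals > 0 || tab->new_changeoids;
}

/*
 * Record the effect of SET WITH OIDS (want_oids = true) or SET WITHOUT OIDS
 * (want_oids = false) on a table whose current state is relhasoids.
 * Assumption: asking for the state the table already has does nothing.
 */
static void
toy_request_oids(ToyAlterInfo *tab, bool relhasoids, bool want_oids)
{
    assert(tab != NULL);
    if (relhasoids != want_oids)
        tab->new_changeoids = true;
}
```

Under these assumptions, SET WITHOUT OIDS on a table that never had OIDs
leaves toy_needs_rewrite() false, so only tables that still carry OIDs pay
for the rewrite.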

A different discussion is whether it's appropriate to put in SET WITH
OIDS now, when we're well past feature freeze.  If we stripped that out
of this patch it'd save a few dozen lines of code, but I'm not really
seeing the point.  The asymmetry of having SET WITHOUT and not SET WITH
has always been an implementation artifact anyway.

Comments?

            regards, tom lane

Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.279
diff -c -r1.279 tablecmds.c
*** src/backend/commands/tablecmds.c    2 Feb 2009 19:31:38 -0000    1.279
--- src/backend/commands/tablecmds.c    8 Feb 2009 15:52:00 -0000
***************
*** 138,143 ****
--- 138,144 ----
      List       *constraints;    /* List of NewConstraint */
      List       *newvals;        /* List of NewColumnValue */
      bool        new_notnull;    /* T if we added new NOT NULL constraints */
+     bool        new_changeoids;    /* T if we added/dropped the OID column */
      Oid            newTableSpace;    /* new tablespace; 0 means no change */
      /* Objects to rebuild after completing ALTER TYPE operations */
      List       *changedConstraintOids;    /* OIDs of constraints to rebuild */
***************
*** 269,276 ****
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
                  AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
!                 ColumnDef *colDef);
  static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
  static void ATExecDropNotNull(Relation rel, const char *colName);
  static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
                   const char *colName);
--- 270,279 ----
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
                  AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
!                 ColumnDef *colDef, bool isOid);
  static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
+ static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse,
+                 AlterTableCmd *cmd);
  static void ATExecDropNotNull(Relation rel, const char *colName);
  static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
                   const char *colName);
***************
*** 282,288 ****
                      Node *newValue);
  static void ATExecSetStorage(Relation rel, const char *colName,
                   Node *newValue);
! static void ATExecDropColumn(Relation rel, const char *colName,
                   DropBehavior behavior,
                   bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
--- 285,291 ----
                      Node *newValue);
  static void ATExecSetStorage(Relation rel, const char *colName,
                   Node *newValue);
! static void ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
                   DropBehavior behavior,
                   bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
***************
*** 2452,2457 ****
--- 2455,2467 ----
              /* No command-specific prep needed */
              pass = AT_PASS_MISC;
              break;
+         case AT_AddOids:        /* SET WITH OIDS */
+             ATSimplePermissions(rel, false);
+             /* Performs own recursion */
+             if (!rel->rd_rel->relhasoids || recursing)
+                 ATPrepAddOids(wqueue, rel, recurse, cmd);
+             pass = AT_PASS_ADD_COL;
+             break;
          case AT_DropOids:        /* SET WITHOUT OIDS */
              ATSimplePermissions(rel, false);
              /* Performs own recursion */
***************
*** 2589,2595 ****
      {
          case AT_AddColumn:        /* ADD COLUMN */
          case AT_AddColumnToView: /* add column via CREATE OR REPLACE VIEW */
!             ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
              break;
          case AT_ColumnDefault:    /* ALTER COLUMN DEFAULT */
              ATExecColumnDefault(rel, cmd->name, cmd->def);
--- 2599,2605 ----
      {
          case AT_AddColumn:        /* ADD COLUMN */
          case AT_AddColumnToView: /* add column via CREATE OR REPLACE VIEW */
!             ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false);
              break;
          case AT_ColumnDefault:    /* ALTER COLUMN DEFAULT */
              ATExecColumnDefault(rel, cmd->name, cmd->def);
***************
*** 2607,2616 ****
              ATExecSetStorage(rel, cmd->name, cmd->def);
              break;
          case AT_DropColumn:        /* DROP COLUMN */
!             ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
              break;
          case AT_DropColumnRecurse:        /* DROP COLUMN with recursion */
!             ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
              break;
          case AT_AddIndex:        /* ADD INDEX */
              ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
--- 2617,2628 ----
              ATExecSetStorage(rel, cmd->name, cmd->def);
              break;
          case AT_DropColumn:        /* DROP COLUMN */
!             ATExecDropColumn(wqueue, rel, cmd->name,
!                              cmd->behavior, false, false);
              break;
          case AT_DropColumnRecurse:        /* DROP COLUMN with recursion */
!             ATExecDropColumn(wqueue, rel, cmd->name,
!                              cmd->behavior, true, false);
              break;
          case AT_AddIndex:        /* ADD INDEX */
              ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
***************
*** 2644,2649 ****
--- 2656,2666 ----
          case AT_DropCluster:    /* SET WITHOUT CLUSTER */
              ATExecDropCluster(rel);
              break;
+         case AT_AddOids:        /* SET WITH OIDS */
+             /* Use the ADD COLUMN code, unless prep decided to do nothing */
+             if (cmd->def != NULL)
+                 ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true);
+             break;
          case AT_DropOids:        /* SET WITHOUT OIDS */

              /*
***************
*** 2748,2756 ****

          /*
           * We only need to rewrite the table if at least one column needs to
!          * be recomputed.
           */
!         if (tab->newvals != NIL)
          {
              /* Build a temporary relation and copy data */
              Oid            OIDNewHeap;
--- 2765,2773 ----

          /*
           * We only need to rewrite the table if at least one column needs to
!          * be recomputed, or we are adding/removing the OID column.
           */
!         if (tab->newvals != NIL || tab->new_changeoids)
          {
              /* Build a temporary relation and copy data */
              Oid            OIDNewHeap;
***************
*** 2976,2983 ****
      {
          NewColumnValue *ex = lfirst(l);

-         needscan = true;
-
          ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
      }

--- 2993,2998 ----
***************
*** 3000,3006 ****
              needscan = true;
      }

!     if (needscan)
      {
          ExprContext *econtext;
          Datum       *values;
--- 3015,3021 ----
              needscan = true;
      }

!     if (newrel || needscan)
      {
          ExprContext *econtext;
          Datum       *values;
***************
*** 3479,3485 ****

  static void
  ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
!                 ColumnDef *colDef)
  {
      Oid            myrelid = RelationGetRelid(rel);
      Relation    pgclass,
--- 3494,3500 ----

  static void
  ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
!                 ColumnDef *colDef, bool isOid)
  {
      Oid            myrelid = RelationGetRelid(rel);
      Relation    pgclass,
***************
*** 3512,3518 ****
              Oid            ctypeId;
              int32        ctypmod;

!             /* Okay if child matches by type */
              ctypeId = typenameTypeId(NULL, colDef->typename, &ctypmod);
              if (ctypeId != childatt->atttypid ||
                  ctypmod != childatt->atttypmod)
--- 3527,3533 ----
              Oid            ctypeId;
              int32        ctypmod;

!             /* Child column must match by type */
              ctypeId = typenameTypeId(NULL, colDef->typename, &ctypmod);
              if (ctypeId != childatt->atttypid ||
                  ctypmod != childatt->atttypmod)
***************
*** 3521,3526 ****
--- 3536,3548 ----
                           errmsg("child table \"%s\" has different type for column \"%s\"",
                              RelationGetRelationName(rel), colDef->colname)));

+             /* If it's OID, child column must actually be OID */
+             if (isOid && childatt->attnum != ObjectIdAttributeNumber)
+                 ereport(ERROR,
+                         (errcode(ERRCODE_DATATYPE_MISMATCH),
+                          errmsg("child table \"%s\" has a conflicting \"%s\" column",
+                             RelationGetRelationName(rel), colDef->colname)));
+
              /* Bump the existing child att's inhcount */
              childatt->attinhcount++;
              simple_heap_update(attrdesc, &tuple->t_self, tuple);
***************
*** 3560,3571 ****
                   errmsg("column \"%s\" of relation \"%s\" already exists",
                          colDef->colname, RelationGetRelationName(rel))));

!     newattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts + 1;
!     if (newattnum > MaxHeapAttributeNumber)
!         ereport(ERROR,
!                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
!                  errmsg("tables can have at most %d columns",
!                         MaxHeapAttributeNumber)));

      typeTuple = typenameType(NULL, colDef->typename, &typmod);
      tform = (Form_pg_type) GETSTRUCT(typeTuple);
--- 3582,3599 ----
                   errmsg("column \"%s\" of relation \"%s\" already exists",
                          colDef->colname, RelationGetRelationName(rel))));

!     /* Determine the new attribute's number */
!     if (isOid)
!         newattnum = ObjectIdAttributeNumber;
!     else
!     {
!         newattnum = ((Form_pg_class) GETSTRUCT(reltup))->relnatts + 1;
!         if (newattnum > MaxHeapAttributeNumber)
!             ereport(ERROR,
!                     (errcode(ERRCODE_TOO_MANY_COLUMNS),
!                      errmsg("tables can have at most %d columns",
!                             MaxHeapAttributeNumber)));
!     }

      typeTuple = typenameType(NULL, colDef->typename, &typmod);
      tform = (Form_pg_type) GETSTRUCT(typeTuple);
***************
*** 3578,3584 ****
      attribute.attrelid = myrelid;
      namestrcpy(&(attribute.attname), colDef->colname);
      attribute.atttypid = typeOid;
!     attribute.attstattarget = -1;
      attribute.attlen = tform->typlen;
      attribute.attcacheoff = -1;
      attribute.atttypmod = typmod;
--- 3606,3612 ----
      attribute.attrelid = myrelid;
      namestrcpy(&(attribute.attname), colDef->colname);
      attribute.atttypid = typeOid;
!     attribute.attstattarget = (newattnum > 0) ? -1 : 0;
      attribute.attlen = tform->typlen;
      attribute.attcacheoff = -1;
      attribute.atttypmod = typmod;
***************
*** 3601,3609 ****
      heap_close(attrdesc, RowExclusiveLock);

      /*
!      * Update number of attributes in pg_class tuple
       */
!     ((Form_pg_class) GETSTRUCT(reltup))->relnatts = newattnum;

      simple_heap_update(pgclass, &reltup->t_self, reltup);

--- 3629,3640 ----
      heap_close(attrdesc, RowExclusiveLock);

      /*
!      * Update pg_class tuple as appropriate
       */
!     if (isOid)
!         ((Form_pg_class) GETSTRUCT(reltup))->relhasoids = true;
!     else
!         ((Form_pg_class) GETSTRUCT(reltup))->relnatts = newattnum;

      simple_heap_update(pgclass, &reltup->t_self, reltup);

***************
*** 3665,3671 ****
       * defaults, not even for domain-typed columns.  And in any case we mustn't
       * invoke Phase 3 on a view, since it has no storage.
       */
!     if (relkind != RELKIND_VIEW)
      {
          defval = (Expr *) build_column_default(rel, attribute.attnum);

--- 3696,3702 ----
       * defaults, not even for domain-typed columns.  And in any case we mustn't
       * invoke Phase 3 on a view, since it has no storage.
       */
!     if (relkind != RELKIND_VIEW && attribute.attnum > 0)
      {
          defval = (Expr *) build_column_default(rel, attribute.attnum);

***************
*** 3702,3712 ****
--- 3733,3753 ----

          /*
           * If the new column is NOT NULL, tell Phase 3 it needs to test that.
+          * (Note we don't do this for an OID column.  OID will be marked not
+          * null, but since it's filled specially, there's no need to test
+          * anything.)
           */
          tab->new_notnull |= colDef->is_not_null;
      }

      /*
+      * If we are adding an OID column, we have to tell Phase 3 to rewrite
+      * the table to fix that.
+      */
+     if (isOid)
+         tab->new_changeoids = true;
+
+     /*
       * Add needed dependency entries for the new column.
       */
      add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid);
***************
*** 3731,3736 ****
--- 3772,3801 ----
  }

  /*
+  * ALTER TABLE SET WITH OIDS
+  *
+  * Basically this is an ADD COLUMN for the special OID column.  We have
+  * to cons up a ColumnDef node because the ADD COLUMN code needs one.
+  */
+ static void
+ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd)
+ {
+     /* If we're recursing to a child table, the ColumnDef is already set up */
+     if (cmd->def == NULL)
+     {
+         ColumnDef  *cdef = makeNode(ColumnDef);
+
+         cdef->colname = pstrdup("oid");
+         cdef->typename = makeTypeNameFromOid(OIDOID, -1);
+         cdef->inhcount = 0;
+         cdef->is_local = true;
+         cdef->is_not_null = true;
+         cmd->def = (Node *) cdef;
+     }
+     ATPrepAddColumn(wqueue, rel, recurse, cmd);
+ }
+
+ /*
   * ALTER TABLE ALTER COLUMN DROP NOT NULL
   */
  static void
***************
*** 4088,4099 ****
   * because we have to decide at runtime whether to recurse or not depending
   * on whether attinhcount goes to zero or not.    (We can't check this in a
   * static pre-pass because it won't handle multiple inheritance situations
!  * correctly.)    Since DROP COLUMN doesn't need to create any work queue
!  * entries for Phase 3, it's okay to recurse internally in this routine
!  * without considering the work queue.
   */
  static void
! ATExecDropColumn(Relation rel, const char *colName,
                   DropBehavior behavior,
                   bool recurse, bool recursing)
  {
--- 4153,4162 ----
   * because we have to decide at runtime whether to recurse or not depending
   * on whether attinhcount goes to zero or not.    (We can't check this in a
   * static pre-pass because it won't handle multiple inheritance situations
!  * correctly.)
   */
  static void
! ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
                   DropBehavior behavior,
                   bool recurse, bool recursing)
  {
***************
*** 4178,4184 ****
                  if (childatt->attinhcount == 1 && !childatt->attislocal)
                  {
                      /* Time to delete this child column, too */
!                     ATExecDropColumn(childrel, colName, behavior, true, true);
                  }
                  else
                  {
--- 4241,4248 ----
                  if (childatt->attinhcount == 1 && !childatt->attislocal)
                  {
                      /* Time to delete this child column, too */
!                     ATExecDropColumn(wqueue, childrel, colName,
!                                      behavior, true, true);
                  }
                  else
                  {
***************
*** 4230,4241 ****
      performDeletion(&object, behavior);

      /*
!      * If we dropped the OID column, must adjust pg_class.relhasoids
       */
      if (attnum == ObjectIdAttributeNumber)
      {
          Relation    class_rel;
          Form_pg_class tuple_class;

          class_rel = heap_open(RelationRelationId, RowExclusiveLock);

--- 4294,4307 ----
      performDeletion(&object, behavior);

      /*
!      * If we dropped the OID column, must adjust pg_class.relhasoids and
!      * tell Phase 3 to physically get rid of the column.
       */
      if (attnum == ObjectIdAttributeNumber)
      {
          Relation    class_rel;
          Form_pg_class tuple_class;
+         AlteredTableInfo *tab;

          class_rel = heap_open(RelationRelationId, RowExclusiveLock);

***************
*** 4254,4259 ****
--- 4320,4331 ----
          CatalogUpdateIndexes(class_rel, tuple);

          heap_close(class_rel, RowExclusiveLock);
+
+         /* Find or create work queue entry for this table */
+         tab = ATGetQueueEntry(wqueue, rel);
+
+         /* Tell Phase 3 to physically remove the OID column */
+         tab->new_changeoids = true;
      }
  }

Index: src/backend/parser/gram.y
===================================================================
RCS file: /cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.657
diff -c -r2.657 gram.y
*** src/backend/parser/gram.y    2 Feb 2009 19:31:39 -0000    2.657
--- src/backend/parser/gram.y    8 Feb 2009 15:52:01 -0000
***************
*** 1636,1641 ****
--- 1636,1648 ----
                      n->behavior = $4;
                      $$ = (Node *)n;
                  }
+             /* ALTER TABLE <name> SET WITH OIDS  */
+             | SET WITH OIDS
+                 {
+                     AlterTableCmd *n = makeNode(AlterTableCmd);
+                     n->subtype = AT_AddOids;
+                     $$ = (Node *)n;
+                 }
              /* ALTER TABLE <name> SET WITHOUT OIDS  */
              | SET WITHOUT OIDS
                  {
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.390
diff -c -r1.390 parsenodes.h
*** src/include/nodes/parsenodes.h    2 Feb 2009 19:31:40 -0000    1.390
--- src/include/nodes/parsenodes.h    8 Feb 2009 15:52:01 -0000
***************
*** 1127,1132 ****
--- 1127,1133 ----
      AT_ChangeOwner,                /* change owner */
      AT_ClusterOn,                /* CLUSTER ON */
      AT_DropCluster,                /* SET WITHOUT CLUSTER */
+     AT_AddOids,                    /* SET WITH OIDS */
      AT_DropOids,                /* SET WITHOUT OIDS */
      AT_SetTableSpace,            /* SET TABLESPACE */
      AT_SetRelOptions,            /* SET (...) -- AM specific parameters */
