Re: Visibility map, partial vacuums

Поиск
Список
Период
Сортировка
От Heikki Linnakangas
Тема Re: Visibility map, partial vacuums
Дата
Msg-id 491D7F52.6070908@enterprisedb.com
обсуждение исходный текст
Ответ на Re: Visibility map, partial vacuums  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Ответы Re: Visibility map, partial vacuums  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Список pgsql-hackers
Heikki Linnakangas wrote:
> So, I think I'll take this one step forward, and move RelationTruncate()
> to a new higher level file, e.g. src/backend/catalog/storage.c, and also
> create a new RelationCreateStorage() function that calls smgrcreate(),
> and move the WAL-logging from smgrcreate() to RelationCreateStorage().
>
> So, we'll have two functions in a new file:
>
> /* Create physical storage for a relation. If 'fsm' is true, an FSM fork
> is also created */
> RelationCreateStorage(Relation rel, bool fsm)
> /* Truncate the relation to 'nblocks' blocks. If 'fsm' is true, the FSM
> is also truncated */
> RelationTruncate(Relation rel, BlockNumber nblocks, bool fsm)
>
> The next question is whether the "pending rel deletion" stuff in smgr.c
> should be moved to the new file too. It seems like it would belong there
> better. That would leave smgr.c as a very thin wrapper around md.c

This new approach feels pretty good to me; attached is a patch to do
just that. Many of the functions formerly in smgr.c are now in
src/backend/catalog/storage.c, including all the WAL-logging and pending
rel deletion stuff. I kept their old names for now, though perhaps they
should be renamed now that they're above the smgr level.

I also implemented Tom's idea of delaying creation of the FSM until it's
needed, not because of performance, but because it started to get quite
hairy to keep track of which relations should have an FSM and which
shouldn't. Creation of the FSM fork is now treated more like extending a
relation, as a non-WAL-logged operation, and it's up to freespace.c to
create the file when it's needed. There's no operation to explicitly
delete an individual fork of a relation: RelationCreateStorage only
creates the main fork, RelationDropStorage drops all forks, and
RelationTruncate truncates the FSM if and only if the FSM fork exists.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/gin/gininsert.c
--- src/backend/access/gin/gininsert.c
***************
*** 284,292 **** ginbuild(PG_FUNCTION_ARGS)
          elog(ERROR, "index \"%s\" already contains data",
               RelationGetRelationName(index));

-     /* Initialize FSM */
-     InitIndexFreeSpaceMap(index);
-
      initGinState(&buildstate.ginstate, index);

      /* initialize the root page */
--- 284,289 ----
*** src/backend/access/gin/ginvacuum.c
--- src/backend/access/gin/ginvacuum.c
***************
*** 16,21 ****
--- 16,22 ----

  #include "access/genam.h"
  #include "access/gin.h"
+ #include "catalog/storage.h"
  #include "commands/vacuum.h"
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
***************
*** 757,763 **** ginvacuumcleanup(PG_FUNCTION_ARGS)
      if (info->vacuum_full && lastBlock > lastFilledBlock)
      {
          /* try to truncate index */
-         FreeSpaceMapTruncateRel(index, lastFilledBlock + 1);
          RelationTruncate(index, lastFilledBlock + 1);

          stats->pages_removed = lastBlock - lastFilledBlock;
--- 758,763 ----
*** src/backend/access/gist/gist.c
--- src/backend/access/gist/gist.c
***************
*** 103,111 **** gistbuild(PG_FUNCTION_ARGS)
          elog(ERROR, "index \"%s\" already contains data",
               RelationGetRelationName(index));

-     /* Initialize FSM */
-     InitIndexFreeSpaceMap(index);
-
      /* no locking is needed */
      initGISTstate(&buildstate.giststate, index);

--- 103,108 ----
*** src/backend/access/gist/gistvacuum.c
--- src/backend/access/gist/gistvacuum.c
***************
*** 16,21 ****
--- 16,22 ----

  #include "access/genam.h"
  #include "access/gist_private.h"
+ #include "catalog/storage.h"
  #include "commands/vacuum.h"
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
***************
*** 603,609 **** gistvacuumcleanup(PG_FUNCTION_ARGS)

      if (info->vacuum_full && lastFilledBlock < lastBlock)
      {                            /* try to truncate index */
-         FreeSpaceMapTruncateRel(rel, lastFilledBlock + 1);
          RelationTruncate(rel, lastFilledBlock + 1);

          stats->std.pages_removed = lastBlock - lastFilledBlock;
--- 604,609 ----
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 4863,4870 **** heap_sync(Relation rel)
      /* FlushRelationBuffers will have opened rd_smgr */
      smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);

!     /* sync FSM as well */
!     smgrimmedsync(rel->rd_smgr, FSM_FORKNUM);

      /* toast heap, if any */
      if (OidIsValid(rel->rd_rel->reltoastrelid))
--- 4863,4869 ----
      /* FlushRelationBuffers will have opened rd_smgr */
      smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);

!     /* FSM is not critical, don't bother syncing it */

      /* toast heap, if any */
      if (OidIsValid(rel->rd_rel->reltoastrelid))
***************
*** 4874,4880 **** heap_sync(Relation rel)
          toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
          FlushRelationBuffers(toastrel);
          smgrimmedsync(toastrel->rd_smgr, MAIN_FORKNUM);
-         smgrimmedsync(toastrel->rd_smgr, FSM_FORKNUM);
          heap_close(toastrel, AccessShareLock);
      }
  }
--- 4873,4878 ----
*** src/backend/access/nbtree/nbtree.c
--- src/backend/access/nbtree/nbtree.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "access/nbtree.h"
  #include "access/relscan.h"
  #include "catalog/index.h"
+ #include "catalog/storage.h"
  #include "commands/vacuum.h"
  #include "miscadmin.h"
  #include "storage/bufmgr.h"
***************
*** 109,117 **** btbuild(PG_FUNCTION_ARGS)
          elog(ERROR, "index \"%s\" already contains data",
               RelationGetRelationName(index));

-     /* Initialize FSM */
-     InitIndexFreeSpaceMap(index);
-
      buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique, false);

      /*
--- 110,115 ----
***************
*** 696,702 **** btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
          /*
           * Okay to truncate.
           */
-         FreeSpaceMapTruncateRel(rel, new_pages);
          RelationTruncate(rel, new_pages);

          /* update statistics */
--- 694,699 ----
*** src/backend/access/transam/rmgr.c
--- src/backend/access/transam/rmgr.c
***************
*** 31,37 **** const RmgrData RmgrTable[RM_MAX_ID + 1] = {
      {"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
      {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
      {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
!     {"FreeSpaceMap", fsm_redo, fsm_desc, NULL, NULL, NULL},
      {"Reserved 8", NULL, NULL, NULL, NULL, NULL},
      {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
      {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
--- 31,37 ----
      {"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
      {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
      {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
!     {"Reserved 7", NULL, NULL, NULL, NULL, NULL},
      {"Reserved 8", NULL, NULL, NULL, NULL, NULL},
      {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
      {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
*** src/backend/access/transam/twophase.c
--- src/backend/access/transam/twophase.c
***************
*** 48,54 ****
--- 48,56 ----
  #include "access/twophase.h"
  #include "access/twophase_rmgr.h"
  #include "access/xact.h"
+ #include "access/xlogutils.h"
  #include "catalog/pg_type.h"
+ #include "catalog/storage.h"
  #include "funcapi.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
***************
*** 141,152 **** static void RecordTransactionCommitPrepared(TransactionId xid,
                                  int nchildren,
                                  TransactionId *children,
                                  int nrels,
!                                 RelFileFork *rels);
  static void RecordTransactionAbortPrepared(TransactionId xid,
                                 int nchildren,
                                 TransactionId *children,
                                 int nrels,
!                                RelFileFork *rels);
  static void ProcessRecords(char *bufptr, TransactionId xid,
                 const TwoPhaseCallback callbacks[]);

--- 143,154 ----
                                  int nchildren,
                                  TransactionId *children,
                                  int nrels,
!                                 RelFileNode *rels);
  static void RecordTransactionAbortPrepared(TransactionId xid,
                                 int nchildren,
                                 TransactionId *children,
                                 int nrels,
!                                RelFileNode *rels);
  static void ProcessRecords(char *bufptr, TransactionId xid,
                 const TwoPhaseCallback callbacks[]);

***************
*** 793,800 **** StartPrepare(GlobalTransaction gxact)
      TransactionId xid = gxact->proc.xid;
      TwoPhaseFileHeader hdr;
      TransactionId *children;
!     RelFileFork *commitrels;
!     RelFileFork *abortrels;

      /* Initialize linked list */
      records.head = palloc0(sizeof(XLogRecData));
--- 795,802 ----
      TransactionId xid = gxact->proc.xid;
      TwoPhaseFileHeader hdr;
      TransactionId *children;
!     RelFileNode *commitrels;
!     RelFileNode *abortrels;

      /* Initialize linked list */
      records.head = palloc0(sizeof(XLogRecData));
***************
*** 832,843 **** StartPrepare(GlobalTransaction gxact)
      }
      if (hdr.ncommitrels > 0)
      {
!         save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileFork));
          pfree(commitrels);
      }
      if (hdr.nabortrels > 0)
      {
!         save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileFork));
          pfree(abortrels);
      }
  }
--- 834,845 ----
      }
      if (hdr.ncommitrels > 0)
      {
!         save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
          pfree(commitrels);
      }
      if (hdr.nabortrels > 0)
      {
!         save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
          pfree(abortrels);
      }
  }
***************
*** 1140,1147 **** FinishPreparedTransaction(const char *gid, bool isCommit)
      TwoPhaseFileHeader *hdr;
      TransactionId latestXid;
      TransactionId *children;
!     RelFileFork *commitrels;
!     RelFileFork *abortrels;
      int            i;

      /*
--- 1142,1151 ----
      TwoPhaseFileHeader *hdr;
      TransactionId latestXid;
      TransactionId *children;
!     RelFileNode *commitrels;
!     RelFileNode *abortrels;
!     RelFileNode *delrels;
!     int            ndelrels;
      int            i;

      /*
***************
*** 1169,1178 **** FinishPreparedTransaction(const char *gid, bool isCommit)
      bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
      children = (TransactionId *) bufptr;
      bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
!     commitrels = (RelFileFork *) bufptr;
!     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileFork));
!     abortrels = (RelFileFork *) bufptr;
!     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileFork));

      /* compute latestXid among all children */
      latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
--- 1173,1182 ----
      bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
      children = (TransactionId *) bufptr;
      bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
!     commitrels = (RelFileNode *) bufptr;
!     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
!     abortrels = (RelFileNode *) bufptr;
!     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));

      /* compute latestXid among all children */
      latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
***************
*** 1214,1234 **** FinishPreparedTransaction(const char *gid, bool isCommit)
       */
      if (isCommit)
      {
!         for (i = 0; i < hdr->ncommitrels; i++)
!         {
!             SMgrRelation srel = smgropen(commitrels[i].rnode);
!             smgrdounlink(srel, commitrels[i].forknum, false, false);
!             smgrclose(srel);
!         }
      }
      else
      {
!         for (i = 0; i < hdr->nabortrels; i++)
          {
!             SMgrRelation srel = smgropen(abortrels[i].rnode);
!             smgrdounlink(srel, abortrels[i].forknum, false, false);
!             smgrclose(srel);
          }
      }

      /* And now do the callbacks */
--- 1218,1245 ----
       */
      if (isCommit)
      {
!         delrels = commitrels;
!         ndelrels = hdr->ncommitrels;
      }
      else
      {
!         delrels = abortrels;
!         ndelrels = hdr->nabortrels;
!     }
!     for (i = 0; i < ndelrels; i++)
!     {
!         SMgrRelation srel = smgropen(delrels[i]);
!         ForkNumber    fork;
!
!         for (fork = 0; fork <= MAX_FORKNUM; fork++)
          {
!             if (smgrexists(srel, fork))
!             {
!                 XLogDropRelation(delrels[i], fork);
!                 smgrdounlink(srel, fork, false, true);
!             }
          }
+         smgrclose(srel);
      }

      /* And now do the callbacks */
***************
*** 1639,1646 **** RecoverPreparedTransactions(void)
              bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
              subxids = (TransactionId *) bufptr;
              bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
!             bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileFork));
!             bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileFork));

              /*
               * Reconstruct subtrans state for the transaction --- needed
--- 1650,1657 ----
              bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
              subxids = (TransactionId *) bufptr;
              bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
!             bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
!             bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));

              /*
               * Reconstruct subtrans state for the transaction --- needed
***************
*** 1693,1699 **** RecordTransactionCommitPrepared(TransactionId xid,
                                  int nchildren,
                                  TransactionId *children,
                                  int nrels,
!                                 RelFileFork *rels)
  {
      XLogRecData rdata[3];
      int            lastrdata = 0;
--- 1704,1710 ----
                                  int nchildren,
                                  TransactionId *children,
                                  int nrels,
!                                 RelFileNode *rels)
  {
      XLogRecData rdata[3];
      int            lastrdata = 0;
***************
*** 1718,1724 **** RecordTransactionCommitPrepared(TransactionId xid,
      {
          rdata[0].next = &(rdata[1]);
          rdata[1].data = (char *) rels;
!         rdata[1].len = nrels * sizeof(RelFileFork);
          rdata[1].buffer = InvalidBuffer;
          lastrdata = 1;
      }
--- 1729,1735 ----
      {
          rdata[0].next = &(rdata[1]);
          rdata[1].data = (char *) rels;
!         rdata[1].len = nrels * sizeof(RelFileNode);
          rdata[1].buffer = InvalidBuffer;
          lastrdata = 1;
      }
***************
*** 1766,1772 **** RecordTransactionAbortPrepared(TransactionId xid,
                                 int nchildren,
                                 TransactionId *children,
                                 int nrels,
!                                RelFileFork *rels)
  {
      XLogRecData rdata[3];
      int            lastrdata = 0;
--- 1777,1783 ----
                                 int nchildren,
                                 TransactionId *children,
                                 int nrels,
!                                RelFileNode *rels)
  {
      XLogRecData rdata[3];
      int            lastrdata = 0;
***************
*** 1796,1802 **** RecordTransactionAbortPrepared(TransactionId xid,
      {
          rdata[0].next = &(rdata[1]);
          rdata[1].data = (char *) rels;
!         rdata[1].len = nrels * sizeof(RelFileFork);
          rdata[1].buffer = InvalidBuffer;
          lastrdata = 1;
      }
--- 1807,1813 ----
      {
          rdata[0].next = &(rdata[1]);
          rdata[1].data = (char *) rels;
!         rdata[1].len = nrels * sizeof(RelFileNode);
          rdata[1].buffer = InvalidBuffer;
          lastrdata = 1;
      }
*** src/backend/access/transam/xact.c
--- src/backend/access/transam/xact.c
***************
*** 28,33 ****
--- 28,34 ----
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
  #include "catalog/namespace.h"
+ #include "catalog/storage.h"
  #include "commands/async.h"
  #include "commands/tablecmds.h"
  #include "commands/trigger.h"
***************
*** 819,825 **** RecordTransactionCommit(void)
      bool        markXidCommitted = TransactionIdIsValid(xid);
      TransactionId latestXid = InvalidTransactionId;
      int            nrels;
!     RelFileFork *rels;
      bool        haveNonTemp;
      int            nchildren;
      TransactionId *children;
--- 820,826 ----
      bool        markXidCommitted = TransactionIdIsValid(xid);
      TransactionId latestXid = InvalidTransactionId;
      int            nrels;
!     RelFileNode *rels;
      bool        haveNonTemp;
      int            nchildren;
      TransactionId *children;
***************
*** 900,906 **** RecordTransactionCommit(void)
          {
              rdata[0].next = &(rdata[1]);
              rdata[1].data = (char *) rels;
!             rdata[1].len = nrels * sizeof(RelFileFork);
              rdata[1].buffer = InvalidBuffer;
              lastrdata = 1;
          }
--- 901,907 ----
          {
              rdata[0].next = &(rdata[1]);
              rdata[1].data = (char *) rels;
!             rdata[1].len = nrels * sizeof(RelFileNode);
              rdata[1].buffer = InvalidBuffer;
              lastrdata = 1;
          }
***************
*** 1165,1171 **** RecordTransactionAbort(bool isSubXact)
      TransactionId xid = GetCurrentTransactionIdIfAny();
      TransactionId latestXid;
      int            nrels;
!     RelFileFork *rels;
      int            nchildren;
      TransactionId *children;
      XLogRecData rdata[3];
--- 1166,1172 ----
      TransactionId xid = GetCurrentTransactionIdIfAny();
      TransactionId latestXid;
      int            nrels;
!     RelFileNode *rels;
      int            nchildren;
      TransactionId *children;
      XLogRecData rdata[3];
***************
*** 1226,1232 **** RecordTransactionAbort(bool isSubXact)
      {
          rdata[0].next = &(rdata[1]);
          rdata[1].data = (char *) rels;
!         rdata[1].len = nrels * sizeof(RelFileFork);
          rdata[1].buffer = InvalidBuffer;
          lastrdata = 1;
      }
--- 1227,1233 ----
      {
          rdata[0].next = &(rdata[1]);
          rdata[1].data = (char *) rels;
!         rdata[1].len = nrels * sizeof(RelFileNode);
          rdata[1].buffer = InvalidBuffer;
          lastrdata = 1;
      }
***************
*** 2078,2084 **** AbortTransaction(void)
      AtEOXact_xml();
      AtEOXact_on_commit_actions(false);
      AtEOXact_Namespace(false);
-     smgrabort();
      AtEOXact_Files();
      AtEOXact_ComboCid();
      AtEOXact_HashTables(false);
--- 2079,2084 ----
***************
*** 4239,4250 **** xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
      /* Make sure files supposed to be dropped are dropped */
      for (i = 0; i < xlrec->nrels; i++)
      {
!         SMgrRelation srel;

!         XLogDropRelation(xlrec->xnodes[i].rnode, xlrec->xnodes[i].forknum);
!
!         srel = smgropen(xlrec->xnodes[i].rnode);
!         smgrdounlink(srel, xlrec->xnodes[i].forknum, false, true);
          smgrclose(srel);
      }
  }
--- 4239,4255 ----
      /* Make sure files supposed to be dropped are dropped */
      for (i = 0; i < xlrec->nrels; i++)
      {
!         SMgrRelation srel = smgropen(xlrec->xnodes[i]);
!         ForkNumber fork;

!         for (fork = 0; fork <= MAX_FORKNUM; fork++)
!         {
!             if (smgrexists(srel, fork))
!             {
!                 XLogDropRelation(xlrec->xnodes[i], fork);
!                 smgrdounlink(srel, fork, false, true);
!             }
!         }
          smgrclose(srel);
      }
  }
***************
*** 4277,4288 **** xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
      /* Make sure files supposed to be dropped are dropped */
      for (i = 0; i < xlrec->nrels; i++)
      {
!         SMgrRelation srel;

!         XLogDropRelation(xlrec->xnodes[i].rnode, xlrec->xnodes[i].forknum);
!
!         srel = smgropen(xlrec->xnodes[i].rnode);
!         smgrdounlink(srel, xlrec->xnodes[i].forknum, false, true);
          smgrclose(srel);
      }
  }
--- 4282,4298 ----
      /* Make sure files supposed to be dropped are dropped */
      for (i = 0; i < xlrec->nrels; i++)
      {
!         SMgrRelation srel = smgropen(xlrec->xnodes[i]);
!         ForkNumber fork;

!         for (fork = 0; fork <= MAX_FORKNUM; fork++)
!         {
!             if (smgrexists(srel, fork))
!             {
!                 XLogDropRelation(xlrec->xnodes[i], fork);
!                 smgrdounlink(srel, fork, false, true);
!             }
!         }
          smgrclose(srel);
      }
  }
***************
*** 4339,4346 **** xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
          appendStringInfo(buf, "; rels:");
          for (i = 0; i < xlrec->nrels; i++)
          {
!             char *path = relpath(xlrec->xnodes[i].rnode,
!                                  xlrec->xnodes[i].forknum);
              appendStringInfo(buf, " %s", path);
              pfree(path);
          }
--- 4349,4355 ----
          appendStringInfo(buf, "; rels:");
          for (i = 0; i < xlrec->nrels; i++)
          {
!             char *path = relpath(xlrec->xnodes[i], MAIN_FORKNUM);
              appendStringInfo(buf, " %s", path);
              pfree(path);
          }
***************
*** 4367,4374 **** xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
          appendStringInfo(buf, "; rels:");
          for (i = 0; i < xlrec->nrels; i++)
          {
!             char *path = relpath(xlrec->xnodes[i].rnode,
!                                  xlrec->xnodes[i].forknum);
              appendStringInfo(buf, " %s", path);
              pfree(path);
          }
--- 4376,4382 ----
          appendStringInfo(buf, "; rels:");
          for (i = 0; i < xlrec->nrels; i++)
          {
!             char *path = relpath(xlrec->xnodes[i], MAIN_FORKNUM);
              appendStringInfo(buf, " %s", path);
              pfree(path);
          }
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 273,279 **** XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
       * filesystem loses an inode during a crash.  Better to write the data
       * until we are actually told to delete the file.)
       */
!     smgrcreate(smgr, forknum, false, true);

      lastblock = smgrnblocks(smgr, forknum);

--- 273,279 ----
       * filesystem loses an inode during a crash.  Better to write the data
       * until we are actually told to delete the file.)
       */
!     smgrcreate(smgr, forknum, true);

      lastblock = smgrnblocks(smgr, forknum);

*** src/backend/catalog/Makefile
--- src/backend/catalog/Makefile
***************
*** 13,19 **** include $(top_builddir)/src/Makefile.global
  OBJS = catalog.o dependency.o heap.o index.o indexing.o namespace.o aclchk.o \
         pg_aggregate.o pg_constraint.o pg_conversion.o pg_depend.o pg_enum.o \
         pg_largeobject.o pg_namespace.o pg_operator.o pg_proc.o pg_shdepend.o \
!        pg_type.o toasting.o

  BKIFILES = postgres.bki postgres.description postgres.shdescription

--- 13,19 ----
  OBJS = catalog.o dependency.o heap.o index.o indexing.o namespace.o aclchk.o \
         pg_aggregate.o pg_constraint.o pg_conversion.o pg_depend.o pg_enum.o \
         pg_largeobject.o pg_namespace.o pg_operator.o pg_proc.o pg_shdepend.o \
!        pg_type.o storage.o toasting.o

  BKIFILES = postgres.bki postgres.description postgres.shdescription

*** src/backend/catalog/heap.c
--- src/backend/catalog/heap.c
***************
*** 47,52 ****
--- 47,53 ----
  #include "catalog/pg_tablespace.h"
  #include "catalog/pg_type.h"
  #include "catalog/pg_type_fn.h"
+ #include "catalog/storage.h"
  #include "commands/tablecmds.h"
  #include "commands/typecmds.h"
  #include "miscadmin.h"
***************
*** 295,317 **** heap_create(const char *relname,
      /*
       * Have the storage manager create the relation's disk file, if needed.
       *
!      * We create storage for the main fork here, and also for the FSM for a
!      * heap or toast relation. The caller is responsible for creating any
!      * additional forks if needed.
       */
      if (create_storage)
!     {
!         Assert(rel->rd_smgr == NULL);
!         RelationOpenSmgr(rel);
!         smgrcreate(rel->rd_smgr, MAIN_FORKNUM, rel->rd_istemp, false);
!
!         /*
!          * For a real heap, create FSM fork as well. Indexams are
!          * responsible for creating any extra forks themselves.
!          */
!         if (relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE)
!             smgrcreate(rel->rd_smgr, FSM_FORKNUM, rel->rd_istemp, false);
!     }

      return rel;
  }
--- 296,306 ----
      /*
       * Have the storage manager create the relation's disk file, if needed.
       *
!      * We only create the main fork here, the other forks will be created
!      * on-demand.
       */
      if (create_storage)
!         RelationCreateStorage(rel->rd_node, rel->rd_istemp);

      return rel;
  }
***************
*** 1426,1438 **** heap_drop_with_catalog(Oid relid)
      if (rel->rd_rel->relkind != RELKIND_VIEW &&
          rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
      {
!         ForkNumber forknum;
!
!         RelationOpenSmgr(rel);
!         for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
!             if (smgrexists(rel->rd_smgr, forknum))
!                 smgrscheduleunlink(rel->rd_smgr, forknum, rel->rd_istemp);
!         RelationCloseSmgr(rel);
      }

      /*
--- 1415,1421 ----
      if (rel->rd_rel->relkind != RELKIND_VIEW &&
          rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
      {
!         RelationDropStorage(rel);
      }

      /*
***************
*** 2348,2354 **** heap_truncate(List *relids)
          Relation    rel = lfirst(cell);

          /* Truncate the FSM and actual file (and discard buffers) */
-         FreeSpaceMapTruncateRel(rel, 0);
          RelationTruncate(rel, 0);

          /* If this relation has indexes, truncate the indexes too */
--- 2331,2336 ----
*** src/backend/catalog/index.c
--- src/backend/catalog/index.c
***************
*** 41,46 ****
--- 41,47 ----
  #include "catalog/pg_opclass.h"
  #include "catalog/pg_tablespace.h"
  #include "catalog/pg_type.h"
+ #include "catalog/storage.h"
  #include "commands/tablecmds.h"
  #include "executor/executor.h"
  #include "miscadmin.h"
***************
*** 897,903 **** index_drop(Oid indexId)
      Relation    indexRelation;
      HeapTuple    tuple;
      bool        hasexprs;
-     ForkNumber    forknum;

      /*
       * To drop an index safely, we must grab exclusive lock on its parent
--- 898,903 ----
***************
*** 918,929 **** index_drop(Oid indexId)
      /*
       * Schedule physical removal of the files
       */
!     RelationOpenSmgr(userIndexRelation);
!     for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
!         if (smgrexists(userIndexRelation->rd_smgr, forknum))
!             smgrscheduleunlink(userIndexRelation->rd_smgr, forknum,
!                                userIndexRelation->rd_istemp);
!     RelationCloseSmgr(userIndexRelation);

      /*
       * Close and flush the index's relcache entry, to ensure relcache doesn't
--- 918,924 ----
      /*
       * Schedule physical removal of the files
       */
!     RelationDropStorage(userIndexRelation);

      /*
       * Close and flush the index's relcache entry, to ensure relcache doesn't
***************
*** 1283,1293 **** setNewRelfilenode(Relation relation, TransactionId freezeXid)
  {
      Oid            newrelfilenode;
      RelFileNode newrnode;
-     SMgrRelation srel;
      Relation    pg_class;
      HeapTuple    tuple;
      Form_pg_class rd_rel;
-     ForkNumber    i;

      /* Can't change relfilenode for nailed tables (indexes ok though) */
      Assert(!relation->rd_isnailed ||
--- 1278,1286 ----
***************
*** 1318,1325 **** setNewRelfilenode(Relation relation, TransactionId freezeXid)
               RelationGetRelid(relation));
      rd_rel = (Form_pg_class) GETSTRUCT(tuple);

-     RelationOpenSmgr(relation);
-
      /*
       * ... and create storage for corresponding forks in the new relfilenode.
       *
--- 1311,1316 ----
***************
*** 1327,1354 **** setNewRelfilenode(Relation relation, TransactionId freezeXid)
       */
      newrnode = relation->rd_node;
      newrnode.relNode = newrelfilenode;
-     srel = smgropen(newrnode);
-
-     /* Create the main fork, like heap_create() does */
-     smgrcreate(srel, MAIN_FORKNUM, relation->rd_istemp, false);

      /*
!      * For a heap, create FSM fork as well. Indexams are responsible for
!      * creating any extra forks themselves.
       */
!     if (relation->rd_rel->relkind == RELKIND_RELATION ||
!         relation->rd_rel->relkind == RELKIND_TOASTVALUE)
!         smgrcreate(srel, FSM_FORKNUM, relation->rd_istemp, false);
!
!     /* schedule unlinking old files */
!     for (i = 0; i <= MAX_FORKNUM; i++)
!     {
!         if (smgrexists(relation->rd_smgr, i))
!             smgrscheduleunlink(relation->rd_smgr, i, relation->rd_istemp);
!     }
!
!     smgrclose(srel);
!     RelationCloseSmgr(relation);

      /* update the pg_class row */
      rd_rel->relfilenode = newrelfilenode;
--- 1318,1330 ----
       */
      newrnode = relation->rd_node;
      newrnode.relNode = newrelfilenode;

      /*
!      * Create the main fork, like heap_create() does, and drop the old
!      * storage.
       */
!     RelationCreateStorage(newrnode, relation->rd_istemp);
!     RelationDropStorage(relation);

      /* update the pg_class row */
      rd_rel->relfilenode = newrelfilenode;
***************
*** 2326,2333 **** reindex_index(Oid indexId)
          if (inplace)
          {
              /*
!              * Truncate the actual file (and discard buffers). The indexam
!              * is responsible for truncating the FSM, if applicable
               */
              RelationTruncate(iRel, 0);
          }
--- 2302,2308 ----
          if (inplace)
          {
              /*
!              * Truncate the actual file (and discard buffers).
               */
              RelationTruncate(iRel, 0);
          }
*** /dev/null
--- src/backend/catalog/storage.c
***************
*** 0 ****
--- 1,460 ----
+ /*-------------------------------------------------------------------------
+  *
+  * storage.c
+  *      code to create and destroy physical storage for relations
+  *
+  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *      $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+
+ #include "postgres.h"
+
+ #include "access/xact.h"
+ #include "access/xlogutils.h"
+ #include "catalog/catalog.h"
+ #include "catalog/storage.h"
+ #include "storage/freespace.h"
+ #include "storage/smgr.h"
+ #include "utils/memutils.h"
+ #include "utils/rel.h"
+
+ /*
+  * We keep a list of all relations (represented as RelFileNode values)
+  * that have been created or deleted in the current transaction.  When
+  * a relation is created, we create the physical file immediately, but
+  * remember it so that we can delete the file again if the current
+  * transaction is aborted.    Conversely, a deletion request is NOT
+  * executed immediately, but is just entered in the list.  When and if
+  * the transaction commits, we can delete the physical file.
+  *
+  * To handle subtransactions, every entry is marked with its transaction
+  * nesting level.  At subtransaction commit, we reassign the subtransaction's
+  * entries to the parent nesting level.  At subtransaction abort, we can
+  * immediately execute the abort-time actions for all entries of the current
+  * nesting level.
+  *
+  * NOTE: the list is kept in TopMemoryContext to be sure it won't disappear
+  * unbetimes.  It'd probably be OK to keep it in TopTransactionContext,
+  * but I'm being paranoid.
+  */
+
+ typedef struct PendingRelDelete
+ {
+     RelFileNode relnode;        /* relation whose files may need to be deleted */
+     bool        isTemp;            /* is it a temporary relation? */
+     bool        atCommit;        /* T=delete at commit; F=delete at abort */
+     int            nestLevel;        /* xact nesting level when request was made */
+     struct PendingRelDelete *next;        /* next list entry, NULL at the tail */
+ } PendingRelDelete;
+
+ static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+
+ /*
+  * Declarations for smgr-related XLOG records
+  *
+  * Note: we log file creation and truncation here, but logging of deletion
+  * actions is handled by xact.c, because it is part of transaction commit.
+  */
+
+ /* XLOG gives us high 4 bits */
+ #define XLOG_SMGR_CREATE    0x10
+ #define XLOG_SMGR_TRUNCATE    0x20
+
+ typedef struct xl_smgr_create
+ {
+     RelFileNode rnode;            /* relation whose main fork was created */
+ } xl_smgr_create;
+
+ typedef struct xl_smgr_truncate
+ {
+     BlockNumber blkno;            /* number of blocks left after truncation */
+     RelFileNode rnode;            /* relation that was truncated */
+ } xl_smgr_truncate;
+
+
+ /*
+  * RelationCreateStorage
+  *        Create physical storage for a relation.
+  *
+  * Create the underlying disk file storage for the relation. This only
+  * creates the main fork; additional forks are created lazily by the
+  * modules that need them.
+  *
+  * This function is transactional. The creation is WAL-logged (except for
+  * temporary relations), and if the transaction aborts later on, the
+  * storage will be destroyed.
+  */
+ void
+ RelationCreateStorage(RelFileNode rnode, bool istemp)
+ {
+     PendingRelDelete *pending;
+     XLogRecPtr    lsn;
+     XLogRecData rdata;
+     xl_smgr_create xlrec;
+     SMgrRelation srel;
+
+     srel = smgropen(rnode);
+     smgrcreate(srel, MAIN_FORKNUM, false);
+
+     smgrclose(srel);
+
+     if (!istemp)
+     {
+         /*
+          * Make an XLOG entry showing the file creation (temp relations are
+          * not WAL-logged).  If we abort, the file is dropped at abort time.
+          */
+         xlrec.rnode = rnode;
+
+         rdata.data = (char *) &xlrec;
+         rdata.len = sizeof(xlrec);
+         rdata.buffer = InvalidBuffer;
+         rdata.next = NULL;
+
+         lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE, &rdata);
+     }
+
+     /* Add the relation to the list of stuff to delete at abort */
+     pending = (PendingRelDelete *)
+         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+     pending->relnode = rnode;
+     pending->isTemp = istemp;
+     pending->atCommit = false;    /* delete if abort */
+     pending->nestLevel = GetCurrentTransactionNestLevel();
+     pending->next = pendingDeletes;
+     pendingDeletes = pending;
+ }
+
+ /*
+  * RelationDropStorage
+  *        Arrange for a relation's physical storage to be removed at commit.
+  *
+  * NOTE: if the relation was created in this transaction, it will now be
+  * present in the pending-delete list twice, once with atCommit true and
+  * once with atCommit false.  Hence, it will be physically deleted at end
+  * of xact in either case (and the other entry will be ignored by
+  * smgrDoPendingDeletes, so no error will occur).  We could instead remove
+  * the existing list entry and delete the physical file immediately, but
+  * for now the logic is kept simple.
+  */
+ void
+ RelationDropStorage(Relation rel)
+ {
+     PendingRelDelete *entry;
+
+     /* Queue a delete-at-commit request; the unlink itself happens later */
+     entry = (PendingRelDelete *)
+         MemoryContextAlloc(TopMemoryContext, sizeof(*entry));
+     entry->nestLevel = GetCurrentTransactionNestLevel();
+     entry->atCommit = true;        /* act on commit, not on abort */
+     entry->isTemp = rel->rd_istemp;
+     entry->relnode = rel->rd_node;
+
+     /* Link the new entry in at the head of the list */
+     entry->next = pendingDeletes;
+     pendingDeletes = entry;
+
+     RelationCloseSmgr(rel);
+ }
+
+ /*
+  * RelationTruncate
+  *        Physically truncate a relation to the specified number of blocks.
+  *
+  * This includes getting rid of any buffers for the blocks that are to be
+  * dropped. If the relation has an FSM fork, it is truncated as well.
+  */
+ void
+ RelationTruncate(Relation rel, BlockNumber nblocks)
+ {
+     bool fsm;
+
+     /* Open it at the smgr level if not already done */
+     RelationOpenSmgr(rel);
+
+     /* Make sure rd_targblock isn't pointing somewhere past end */
+     rel->rd_targblock = InvalidBlockNumber;
+
+     /* Truncate the FSM too if it exists. */
+     fsm = smgrexists(rel->rd_smgr, FSM_FORKNUM);
+     if (fsm)
+         FreeSpaceMapTruncateRel(rel, nblocks);
+
+     /*
+      * We WAL-log the truncation before actually truncating, which
+      * means trouble if the truncation fails. If we then crash, the WAL
+      * replay likely isn't going to succeed in the truncation either, and
+      * cause a PANIC. It's tempting to put a critical section here, but
+      * that cure would be worse than the disease. It would turn a usually
+      * harmless failure to truncate, that could spell trouble at WAL replay,
+      * into a certain PANIC.
+      */
+     if (!rel->rd_istemp)
+     {
+         /*
+          * Make an XLOG entry showing the file truncation (not for temp rels).
+          */
+         XLogRecPtr    lsn;
+         XLogRecData rdata;
+         xl_smgr_truncate xlrec;
+
+         xlrec.blkno = nblocks;
+         xlrec.rnode = rel->rd_node;
+
+         rdata.data = (char *) &xlrec;
+         rdata.len = sizeof(xlrec);
+         rdata.buffer = InvalidBuffer;
+         rdata.next = NULL;
+
+         lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE, &rdata);
+
+         /*
+          * Flush, because otherwise the truncation of the main relation
+          * might hit the disk before the WAL record of truncating the
+          * FSM is flushed. If we crashed during that window, we'd be
+          * left with a truncated heap, without a truncated FSM.
+          */
+         if (fsm)
+             XLogFlush(lsn);
+     }
+
+     /* Do the real work */
+     smgrtruncate(rel->rd_smgr, MAIN_FORKNUM, nblocks, rel->rd_istemp);
+ }
+
+ /*
+  *    smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
+  *
+  * Drops list entries at the current nest level or deeper, unlinking files of
+  * those with atCommit == isCommit.  Also used to clean up aborted subxacts.
+  */
+ void
+ smgrDoPendingDeletes(bool isCommit)
+ {
+     int            nestLevel = GetCurrentTransactionNestLevel();
+     PendingRelDelete *pending;
+     PendingRelDelete *prev;
+     PendingRelDelete *next;
+
+     prev = NULL;
+     for (pending = pendingDeletes; pending != NULL; pending = next)
+     {
+         next = pending->next;
+         if (pending->nestLevel < nestLevel)
+         {
+             /* outer-level entries should not be processed yet; keep in list */
+             prev = pending;
+         }
+         else
+         {
+             /* unlink list entry first, so we don't retry on failure */
+             if (prev)
+                 prev->next = next;
+             else
+                 pendingDeletes = next;
+             /* do deletion if called for; otherwise the entry is just dropped */
+             if (pending->atCommit == isCommit)
+             {
+                 int i;
+
+                 /* unlink every fork that exists for this relation */
+                 SMgrRelation srel;
+
+                 srel = smgropen(pending->relnode);
+                 for (i = 0; i <= MAX_FORKNUM; i++)
+                 {
+                     if (smgrexists(srel, i))
+                         smgrdounlink(srel,
+                                      i,
+                                      pending->isTemp,
+                                      false);
+                 }
+                 smgrclose(srel);
+             }
+             /* must explicitly free the list entry */
+             pfree(pending);
+             /* prev does not change, since this entry left the list */
+         }
+     }
+ }
+
+ /*
+  * smgrGetPendingDeletes() -- Get a list of relations to be deleted.
+  *
+  * Returns how many pending-delete entries at the current transaction nesting
+  * level (or deeper) have atCommit equal to forCommit.  *ptr receives a
+  * freshly palloc'd array holding their RelFileNodes, or NULL when the count
+  * is zero.  Entries belonging to upper-level transactions are not reported.
+  *
+  * If haveNonTemp isn't NULL, *haveNonTemp is first reset to false.  When at
+  * least one entry is reported, it is then set to true if any entry in the
+  * whole list (matching or not) belongs to a non-temp relation.
+  */
+ int
+ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr, bool *haveNonTemp)
+ {
+     int            curLevel = GetCurrentTransactionNestLevel();
+     int            count = 0;
+     int            filled = 0;
+     RelFileNode *result;
+     PendingRelDelete *item;
+
+     if (haveNonTemp != NULL)
+         *haveNonTemp = false;
+
+     /* First pass: count the qualifying entries */
+     for (item = pendingDeletes; item != NULL; item = item->next)
+     {
+         if (item->atCommit == forCommit && item->nestLevel >= curLevel)
+             count++;
+     }
+     if (count == 0)
+     {
+         *ptr = NULL;
+         return 0;
+     }
+
+     /* Second pass: copy them out, noting any non-temp entry on the way */
+     result = (RelFileNode *) palloc(count * sizeof(RelFileNode));
+     *ptr = result;
+     for (item = pendingDeletes; item != NULL; item = item->next)
+     {
+         if (item->atCommit == forCommit && item->nestLevel >= curLevel)
+             result[filled++] = item->relnode;
+         if (haveNonTemp != NULL && !item->isTemp)
+             *haveNonTemp = true;
+     }
+     return count;
+ }
+
+ /*
+  *    PostPrepare_smgr -- Clean up after a successful PREPARE
+  *
+  * All that is needed here is to discard the in-memory list of pending
+  * relation deletes: every entry is freed without being acted upon, and
+  * pendingDeletes ends up NULL.  The requests themselves have already been
+  * recorded in the 2PC state file, so they are no longer smgr's job to
+  * worry about.
+  */
+ void
+ PostPrepare_smgr(void)
+ {
+     /* Pop entries off the head one at a time; each needs an explicit pfree */
+     while (pendingDeletes != NULL)
+     {
+         PendingRelDelete *head = pendingDeletes;
+
+         pendingDeletes = head->next;
+         pfree(head);
+     }
+ }
+
+
+ /*
+  * AtSubCommit_smgr() --- Take care of subtransaction commit.
+  *
+  * Hands this subxact's pending-delete entries over to its parent level.
+  */
+ void
+ AtSubCommit_smgr(void)
+ {
+     int            parentLevel = GetCurrentTransactionNestLevel() - 1;
+     PendingRelDelete *entry;
+
+     for (entry = pendingDeletes; entry != NULL; entry = entry->next)
+     {
+         if (entry->nestLevel > parentLevel)
+             entry->nestLevel = parentLevel;
+     }
+ }
+
+ /*
+  * AtSubAbort_smgr() --- Take care of subtransaction abort.
+  *
+  * Calls smgrDoPendingDeletes(false) to delete created relations and forget
+  * about deleted relations.  We can execute these operations immediately
+  * because we know this subtransaction will not commit.
+  */
+ void
+ AtSubAbort_smgr(void)
+ {
+     smgrDoPendingDeletes(false);
+ }
+
+ /* smgr_redo -- replay an smgr WAL record: file creation or truncation. */
+ void
+ smgr_redo(XLogRecPtr lsn, XLogRecord *record)
+ {
+     uint8        info = record->xl_info & ~XLR_INFO_MASK;
+
+     if (info == XLOG_SMGR_CREATE)
+     {
+         xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(record);
+         SMgrRelation reln;
+
+         reln = smgropen(xlrec->rnode);
+         smgrcreate(reln, MAIN_FORKNUM, true);
+     }
+     else if (info == XLOG_SMGR_TRUNCATE)
+     {
+         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
+         SMgrRelation reln;
+
+         reln = smgropen(xlrec->rnode);
+
+         /*
+          * Forcibly create relation if it doesn't exist (which suggests that
+          * it was dropped somewhere later in the WAL sequence).  As in
+          * XLogOpenRelation, we prefer to recreate the rel and replay the log
+          * as best we can until the drop is seen.
+          */
+         smgrcreate(reln, MAIN_FORKNUM, true);
+
+         smgrtruncate(reln, MAIN_FORKNUM, xlrec->blkno, false);
+
+         /* Also tell xlogutils.c about it */
+         XLogTruncateRelation(xlrec->rnode, MAIN_FORKNUM, xlrec->blkno);
+
+         /* Truncate the FSM too, but only if the relation has an FSM fork */
+         if (smgrexists(reln, FSM_FORKNUM))
+         {
+             Relation rel = CreateFakeRelcacheEntry(xlrec->rnode);
+             FreeSpaceMapTruncateRel(rel, xlrec->blkno);
+             FreeFakeRelcacheEntry(rel);
+         }
+     }
+     else
+         elog(PANIC, "smgr_redo: unknown op code %u", info);
+ }
+
+ /* smgr_desc -- append a text description of an smgr WAL record to 'buf'. */
+ void
+ smgr_desc(StringInfo buf, uint8 xl_info, char *rec)
+ {
+     uint8        opcode = xl_info & ~XLR_INFO_MASK;
+     char       *fname;
+
+     switch (opcode)
+     {
+         case XLOG_SMGR_CREATE:
+             fname = relpath(((xl_smgr_create *) rec)->rnode, MAIN_FORKNUM);
+             appendStringInfo(buf, "file create: %s", fname);
+             pfree(fname);
+             break;
+         case XLOG_SMGR_TRUNCATE:
+             fname = relpath(((xl_smgr_truncate *) rec)->rnode, MAIN_FORKNUM);
+             appendStringInfo(buf, "file truncate: %s to %u blocks", fname,
+                              ((xl_smgr_truncate *) rec)->blkno);
+             pfree(fname);
+             break;
+         default:
+             appendStringInfo(buf, "UNKNOWN");
+             break;
+     }
+ }
*** src/backend/commands/tablecmds.c
--- src/backend/commands/tablecmds.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "catalog/pg_trigger.h"
  #include "catalog/pg_type.h"
  #include "catalog/pg_type_fn.h"
+ #include "catalog/storage.h"
  #include "catalog/toasting.h"
  #include "commands/cluster.h"
  #include "commands/defrem.h"
***************
*** 6482,6488 **** ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
      Relation    pg_class;
      HeapTuple    tuple;
      Form_pg_class rd_rel;
!     ForkNumber    forkNum;

      /*
       * Need lock here in case we are recursing to toast table or index
--- 6483,6489 ----
      Relation    pg_class;
      HeapTuple    tuple;
      Form_pg_class rd_rel;
!     ForkNumber      forkNum;

      /*
       * Need lock here in case we are recursing to toast table or index
***************
*** 6558,6564 **** ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
      newrnode = rel->rd_node;
      newrnode.relNode = newrelfilenode;
      newrnode.spcNode = newTableSpace;
-     dstrel = smgropen(newrnode);

      RelationOpenSmgr(rel);

--- 6559,6564 ----
***************
*** 6567,6588 **** ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
       * of old physical files.
       *
       * NOTE: any conflict in relfilenode value will be caught in
!      *         smgrcreate() below.
       */
!     for (forkNum = 0; forkNum <= MAX_FORKNUM; forkNum++)
      {
          if (smgrexists(rel->rd_smgr, forkNum))
          {
!             smgrcreate(dstrel, forkNum, rel->rd_istemp, false);
              copy_relation_data(rel->rd_smgr, dstrel, forkNum, rel->rd_istemp);
-
-             smgrscheduleunlink(rel->rd_smgr, forkNum, rel->rd_istemp);
          }
      }

      /* Close old and new relation */
      smgrclose(dstrel);
-     RelationCloseSmgr(rel);

      /* update the pg_class row */
      rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
--- 6567,6592 ----
       * of old physical files.
       *
       * NOTE: any conflict in relfilenode value will be caught in
!      *         RelationCreateStorage().
       */
!     RelationCreateStorage(newrnode, rel->rd_istemp);
!
!     dstrel = smgropen(newrnode);
!
!     copy_relation_data(rel->rd_smgr, dstrel, MAIN_FORKNUM, rel->rd_istemp);
!     for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++)
      {
          if (smgrexists(rel->rd_smgr, forkNum))
          {
!             smgrcreate(dstrel, forkNum, false);
              copy_relation_data(rel->rd_smgr, dstrel, forkNum, rel->rd_istemp);
          }
      }

+     RelationDropStorage(rel);
+
      /* Close old and new relation */
      smgrclose(dstrel);

      /* update the pg_class row */
      rd_rel->reltablespace = (newTableSpace == MyDatabaseTableSpace) ? InvalidOid : newTableSpace;
*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 31,36 ****
--- 31,37 ----
  #include "catalog/namespace.h"
  #include "catalog/pg_database.h"
  #include "catalog/pg_namespace.h"
+ #include "catalog/storage.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
  #include "executor/executor.h"
***************
*** 2863,2869 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
      /* Truncate relation, if needed */
      if (blkno < nblocks)
      {
-         FreeSpaceMapTruncateRel(onerel, blkno);
          RelationTruncate(onerel, blkno);
          vacrelstats->rel_pages = blkno; /* set new number of blocks */
      }
--- 2864,2869 ----
***************
*** 3258,3264 **** vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
                  (errmsg("\"%s\": truncated %u to %u pages",
                          RelationGetRelationName(onerel),
                          vacrelstats->rel_pages, relblocks)));
-         FreeSpaceMapTruncateRel(onerel, relblocks);
          RelationTruncate(onerel, relblocks);
          vacrelstats->rel_pages = relblocks;        /* set new number of blocks */
      }
--- 3258,3263 ----
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 40,45 ****
--- 40,46 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "catalog/storage.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
  #include "miscadmin.h"
***************
*** 827,833 **** lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
      /*
       * Okay to truncate.
       */
-     FreeSpaceMapTruncateRel(onerel, new_rel_pages);
      RelationTruncate(onerel, new_rel_pages);

      /*
--- 828,833 ----
*** src/backend/rewrite/rewriteDefine.c
--- src/backend/rewrite/rewriteDefine.c
***************
*** 19,31 ****
  #include "catalog/indexing.h"
  #include "catalog/namespace.h"
  #include "catalog/pg_rewrite.h"
  #include "miscadmin.h"
  #include "nodes/nodeFuncs.h"
  #include "parser/parse_utilcmd.h"
  #include "rewrite/rewriteDefine.h"
  #include "rewrite/rewriteManip.h"
  #include "rewrite/rewriteSupport.h"
- #include "storage/smgr.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
  #include "utils/inval.h"
--- 19,31 ----
  #include "catalog/indexing.h"
  #include "catalog/namespace.h"
  #include "catalog/pg_rewrite.h"
+ #include "catalog/storage.h"
  #include "miscadmin.h"
  #include "nodes/nodeFuncs.h"
  #include "parser/parse_utilcmd.h"
  #include "rewrite/rewriteDefine.h"
  #include "rewrite/rewriteManip.h"
  #include "rewrite/rewriteSupport.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
  #include "utils/inval.h"
***************
*** 484,499 **** DefineQueryRewrite(char *rulename,
       * XXX what about getting rid of its TOAST table?  For now, we don't.
       */
      if (RelisBecomingView)
!     {
!         ForkNumber forknum;
!
!         RelationOpenSmgr(event_relation);
!         for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
!             if (smgrexists(event_relation->rd_smgr, forknum))
!                 smgrscheduleunlink(event_relation->rd_smgr, forknum,
!                                    event_relation->rd_istemp);
!         RelationCloseSmgr(event_relation);
!     }

      /* Close rel, but keep lock till commit... */
      heap_close(event_relation, NoLock);
--- 484,490 ----
       * XXX what about getting rid of its TOAST table?  For now, we don't.
       */
      if (RelisBecomingView)
!         RelationDropStorage(event_relation);

      /* Close rel, but keep lock till commit... */
      heap_close(event_relation, NoLock);
*** src/backend/storage/buffer/bufmgr.c
--- src/backend/storage/buffer/bufmgr.c
***************
*** 1695,1702 **** void
  BufmgrCommit(void)
  {
      /* Nothing to do in bufmgr anymore... */
-
-     smgrcommit();
  }

  /*
--- 1695,1700 ----
***************
*** 1848,1873 **** RelationGetNumberOfBlocks(Relation relation)
      return smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
  }

- /*
-  * RelationTruncate
-  *        Physically truncate a relation to the specified number of blocks.
-  *
-  * As of Postgres 8.1, this includes getting rid of any buffers for the
-  * blocks that are to be dropped; previously, callers had to do that.
-  */
- void
- RelationTruncate(Relation rel, BlockNumber nblocks)
- {
-     /* Open it at the smgr level if not already done */
-     RelationOpenSmgr(rel);
-
-     /* Make sure rd_targblock isn't pointing somewhere past end */
-     rel->rd_targblock = InvalidBlockNumber;
-
-     /* Do the real work */
-     smgrtruncate(rel->rd_smgr, MAIN_FORKNUM, nblocks, rel->rd_istemp);
- }
-
  /* ---------------------------------------------------------------------
   *        DropRelFileNodeBuffers
   *
--- 1846,1851 ----
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 47,53 ****
   * MaxFSMRequestSize depends on the architecture and BLCKSZ, but assuming
   * default 8k BLCKSZ, and that MaxFSMRequestSize is 24 bytes, the categories
   * look like this
!  *
   *
   * Range     Category
   * 0    - 31   0
--- 47,53 ----
   * MaxFSMRequestSize depends on the architecture and BLCKSZ, but assuming
   * default 8k BLCKSZ, and that MaxFSMRequestSize is 24 bytes, the categories
   * look like this
!  *
   *
   * Range     Category
   * 0    - 31   0
***************
*** 93,107 **** typedef struct
  /* Address of the root page. */
  static const FSMAddress FSM_ROOT_ADDRESS = { FSM_ROOT_LEVEL, 0 };

- /* XLOG record types */
- #define XLOG_FSM_TRUNCATE     0x00    /* truncate */
-
- typedef struct
- {
-     RelFileNode node;            /* truncated relation */
-     BlockNumber nheapblocks;    /* new number of blocks in the heap */
- } xl_fsm_truncate;
-
  /* functions to navigate the tree */
  static FSMAddress fsm_get_child(FSMAddress parent, uint16 slot);
  static FSMAddress fsm_get_parent(FSMAddress child, uint16 *slot);
--- 93,98 ----
***************
*** 110,116 **** static BlockNumber fsm_get_heap_blk(FSMAddress addr, uint16 slot);
  static BlockNumber fsm_logical_to_physical(FSMAddress addr);

  static Buffer fsm_readbuf(Relation rel, FSMAddress addr, bool extend);
! static void fsm_extend(Relation rel, BlockNumber nfsmblocks);

  /* functions to convert amount of free space to a FSM category */
  static uint8 fsm_space_avail_to_cat(Size avail);
--- 101,107 ----
  static BlockNumber fsm_logical_to_physical(FSMAddress addr);

  static Buffer fsm_readbuf(Relation rel, FSMAddress addr, bool extend);
! static void fsm_extend(Relation rel, BlockNumber nfsmblocks, bool createstorage);

  /* functions to convert amount of free space to a FSM category */
  static uint8 fsm_space_avail_to_cat(Size avail);
***************
*** 123,130 **** static int fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
  static BlockNumber fsm_search(Relation rel, uint8 min_cat);
  static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof);

- static void fsm_redo_truncate(xl_fsm_truncate *xlrec);
-

  /******** Public API ********/

--- 114,119 ----
***************
*** 275,280 **** FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
--- 264,276 ----

      RelationOpenSmgr(rel);

+     /*
+      * If no FSM has been created yet for this relation, there's nothing to
+      * truncate.
+      */
+     if (!smgrexists(rel->rd_smgr, FSM_FORKNUM))
+         return;
+
      /* Get the location in the FSM of the first removed heap block */
      first_removed_address = fsm_get_location(nblocks, &first_removed_slot);

***************
*** 307,348 **** FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
      smgrtruncate(rel->rd_smgr, FSM_FORKNUM, new_nfsmblocks, rel->rd_istemp);

      /*
-      * FSM truncations are WAL-logged, because we must never return a block
-      * that doesn't exist in the heap, not even if we crash before the FSM
-      * truncation has made it to disk. smgrtruncate() writes its own WAL
-      * record, but that's not enough to zero out the last remaining FSM page.
-      * (if we didn't need to zero out anything above, we can skip this)
-      */
-     if (!rel->rd_istemp && first_removed_slot != 0)
-     {
-         xl_fsm_truncate xlrec;
-         XLogRecData        rdata;
-         XLogRecPtr        recptr;
-
-         xlrec.node = rel->rd_node;
-         xlrec.nheapblocks = nblocks;
-
-         rdata.data = (char *) &xlrec;
-         rdata.len = sizeof(xl_fsm_truncate);
-         rdata.buffer = InvalidBuffer;
-         rdata.next = NULL;
-
-         recptr = XLogInsert(RM_FREESPACE_ID, XLOG_FSM_TRUNCATE, &rdata);
-
-         /*
-          * Flush, because otherwise the truncation of the main relation
-          * might hit the disk before the WAL record of truncating the
-          * FSM is flushed. If we crashed during that window, we'd be
-          * left with a truncated heap, without a truncated FSM.
-          */
-         XLogFlush(recptr);
-     }
-
-     /*
       * Need to invalidate the relcache entry, because rd_fsm_nblocks_cache
       * seen by other backends is no longer valid.
       */
!     CacheInvalidateRelcache(rel);

      rel->rd_fsm_nblocks_cache = new_nfsmblocks;
  }
--- 303,313 ----
      smgrtruncate(rel->rd_smgr, FSM_FORKNUM, new_nfsmblocks, rel->rd_istemp);

      /*
       * Need to invalidate the relcache entry, because rd_fsm_nblocks_cache
       * seen by other backends is no longer valid.
       */
!     if (!InRecovery)
!         CacheInvalidateRelcache(rel);

      rel->rd_fsm_nblocks_cache = new_nfsmblocks;
  }
***************
*** 538,551 **** fsm_readbuf(Relation rel, FSMAddress addr, bool extend)

      RelationOpenSmgr(rel);

!     if (rel->rd_fsm_nblocks_cache == InvalidBlockNumber ||
          rel->rd_fsm_nblocks_cache <= blkno)
!         rel->rd_fsm_nblocks_cache = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);

      if (blkno >= rel->rd_fsm_nblocks_cache)
      {
          if (extend)
!             fsm_extend(rel, blkno + 1);
          else
              return InvalidBuffer;
      }
--- 503,521 ----

      RelationOpenSmgr(rel);

!     if (rel->rd_fsm_nblocks_cache == InvalidBlockNumber ||
          rel->rd_fsm_nblocks_cache <= blkno)
!     {
!         if (!smgrexists(rel->rd_smgr, FSM_FORKNUM))
!             fsm_extend(rel, blkno + 1, true);
!         else
!             rel->rd_fsm_nblocks_cache = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
!     }

      if (blkno >= rel->rd_fsm_nblocks_cache)
      {
          if (extend)
!             fsm_extend(rel, blkno + 1, false);
          else
              return InvalidBuffer;
      }
***************
*** 566,575 **** fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
  /*
   * Ensure that the FSM fork is at least n_fsmblocks long, extending
   * it if necessary with empty pages. And by empty, I mean pages filled
!  * with zeros, meaning there's no free space.
   */
  static void
! fsm_extend(Relation rel, BlockNumber n_fsmblocks)
  {
      BlockNumber n_fsmblocks_now;
      Page pg;
--- 536,546 ----
  /*
   * Ensure that the FSM fork is at least n_fsmblocks long, extending
   * it if necessary with empty pages. And by empty, I mean pages filled
!  * with zeros, meaning there's no free space. If createstorage is true,
!  * the FSM file might need to be created first.
   */
  static void
! fsm_extend(Relation rel, BlockNumber n_fsmblocks, bool createstorage)
  {
      BlockNumber n_fsmblocks_now;
      Page pg;
***************
*** 584,595 **** fsm_extend(Relation rel, BlockNumber n_fsmblocks)
       * FSM happens seldom enough that it doesn't seem worthwhile to
       * have a separate lock tag type for it.
       *
!      * Note that another backend might have extended the relation
!      * before we get the lock.
       */
      LockRelationForExtension(rel, ExclusiveLock);

!     n_fsmblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
      while (n_fsmblocks_now < n_fsmblocks)
      {
          smgrextend(rel->rd_smgr, FSM_FORKNUM, n_fsmblocks_now,
--- 555,574 ----
       * FSM happens seldom enough that it doesn't seem worthwhile to
       * have a separate lock tag type for it.
       *
!      * Note that another backend might have extended or created the
!      * relation before we get the lock.
       */
      LockRelationForExtension(rel, ExclusiveLock);

!     /* Create the FSM file first if it doesn't exist */
!     if (createstorage && !smgrexists(rel->rd_smgr, FSM_FORKNUM))
!     {
!         smgrcreate(rel->rd_smgr, FSM_FORKNUM, false);
!         n_fsmblocks_now = 0;
!     }
!     else
!         n_fsmblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);
!
      while (n_fsmblocks_now < n_fsmblocks)
      {
          smgrextend(rel->rd_smgr, FSM_FORKNUM, n_fsmblocks_now,
***************
*** 799,873 **** fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)

      return max_avail;
  }
-
-
- /****** WAL-logging ******/
-
- static void
- fsm_redo_truncate(xl_fsm_truncate *xlrec)
- {
-     FSMAddress    first_removed_address;
-     uint16        first_removed_slot;
-     BlockNumber fsmblk;
-     Buffer        buf;
-
-     /* Get the location in the FSM of the first removed heap block */
-     first_removed_address = fsm_get_location(xlrec->nheapblocks,
-                                              &first_removed_slot);
-     fsmblk = fsm_logical_to_physical(first_removed_address);
-
-     /*
-      * Zero out the tail of the last remaining FSM page. We rely on the
-      * replay of the smgr truncation record to remove completely unused
-      * pages.
-      */
-     buf = XLogReadBufferExtended(xlrec->node, FSM_FORKNUM, fsmblk,
-                                  RBM_ZERO_ON_ERROR);
-     if (BufferIsValid(buf))
-     {
-         Page page = BufferGetPage(buf);
-
-         if (PageIsNew(page))
-             PageInit(page, BLCKSZ, 0);
-         fsm_truncate_avail(page, first_removed_slot);
-         MarkBufferDirty(buf);
-         UnlockReleaseBuffer(buf);
-     }
- }
-
- void
- fsm_redo(XLogRecPtr lsn, XLogRecord *record)
- {
-     uint8        info = record->xl_info & ~XLR_INFO_MASK;
-
-     switch (info)
-     {
-         case XLOG_FSM_TRUNCATE:
-             fsm_redo_truncate((xl_fsm_truncate *) XLogRecGetData(record));
-             break;
-         default:
-             elog(PANIC, "fsm_redo: unknown op code %u", info);
-     }
- }
-
- void
- fsm_desc(StringInfo buf, uint8 xl_info, char *rec)
- {
-     uint8           info = xl_info & ~XLR_INFO_MASK;
-
-     switch (info)
-     {
-         case XLOG_FSM_TRUNCATE:
-         {
-             xl_fsm_truncate *xlrec = (xl_fsm_truncate *) rec;
-
-             appendStringInfo(buf, "truncate: rel %u/%u/%u; nheapblocks %u;",
-                              xlrec->node.spcNode, xlrec->node.dbNode,
-                              xlrec->node.relNode, xlrec->nheapblocks);
-             break;
-         }
-         default:
-             appendStringInfo(buf, "UNKNOWN");
-             break;
-     }
- }
--- 778,780 ----
*** src/backend/storage/freespace/indexfsm.c
--- src/backend/storage/freespace/indexfsm.c
***************
*** 31,50 ****
   */

  /*
-  * InitIndexFreeSpaceMap - Create or reset the FSM fork for relation.
-  */
- void
- InitIndexFreeSpaceMap(Relation rel)
- {
-     /* Create FSM fork if it doesn't exist yet, or truncate it if it does */
-     RelationOpenSmgr(rel);
-     if (!smgrexists(rel->rd_smgr, FSM_FORKNUM))
-         smgrcreate(rel->rd_smgr, FSM_FORKNUM, rel->rd_istemp, false);
-     else
-         smgrtruncate(rel->rd_smgr, FSM_FORKNUM, 0, rel->rd_istemp);
- }
-
- /*
   * GetFreeIndexPage - return a free page from the FSM
   *
   * As a side effect, the page is marked as used in the FSM.
--- 31,36 ----
*** src/backend/storage/smgr/smgr.c
--- src/backend/storage/smgr/smgr.c
***************
*** 17,31 ****
   */
  #include "postgres.h"

- #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
  #include "commands/tablespace.h"
  #include "storage/bufmgr.h"
  #include "storage/ipc.h"
  #include "storage/smgr.h"
  #include "utils/hsearch.h"
- #include "utils/memutils.h"


  /*
--- 17,30 ----
   */
  #include "postgres.h"

  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
  #include "commands/tablespace.h"
  #include "storage/bufmgr.h"
+ #include "storage/freespace.h"
  #include "storage/ipc.h"
  #include "storage/smgr.h"
  #include "utils/hsearch.h"


  /*
***************
*** 58,65 **** typedef struct f_smgr
      void        (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
                                    BlockNumber nblocks, bool isTemp);
      void        (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
-     void        (*smgr_commit) (void);    /* may be NULL */
-     void        (*smgr_abort) (void);    /* may be NULL */
      void        (*smgr_pre_ckpt) (void);        /* may be NULL */
      void        (*smgr_sync) (void);    /* may be NULL */
      void        (*smgr_post_ckpt) (void);        /* may be NULL */
--- 57,62 ----
***************
*** 70,76 **** static const f_smgr smgrsw[] = {
      /* magnetic disk */
      {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
          mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
!         NULL, NULL, mdpreckpt, mdsync, mdpostckpt
      }
  };

--- 67,73 ----
      /* magnetic disk */
      {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
          mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
!         mdpreckpt, mdsync, mdpostckpt
      }
  };

***************
*** 82,146 **** static const int NSmgr = lengthof(smgrsw);
   */
  static HTAB *SMgrRelationHash = NULL;

- /*
-  * We keep a list of all relations (represented as RelFileNode values)
-  * that have been created or deleted in the current transaction.  When
-  * a relation is created, we create the physical file immediately, but
-  * remember it so that we can delete the file again if the current
-  * transaction is aborted.    Conversely, a deletion request is NOT
-  * executed immediately, but is just entered in the list.  When and if
-  * the transaction commits, we can delete the physical file.
-  *
-  * To handle subtransactions, every entry is marked with its transaction
-  * nesting level.  At subtransaction commit, we reassign the subtransaction's
-  * entries to the parent nesting level.  At subtransaction abort, we can
-  * immediately execute the abort-time actions for all entries of the current
-  * nesting level.
-  *
-  * NOTE: the list is kept in TopMemoryContext to be sure it won't disappear
-  * unbetimes.  It'd probably be OK to keep it in TopTransactionContext,
-  * but I'm being paranoid.
-  */
-
- typedef struct PendingRelDelete
- {
-     RelFileNode relnode;        /* relation that may need to be deleted */
-     ForkNumber    forknum;        /* fork number that may need to be deleted */
-     int            which;            /* which storage manager? */
-     bool        isTemp;            /* is it a temporary relation? */
-     bool        atCommit;        /* T=delete at commit; F=delete at abort */
-     int            nestLevel;        /* xact nesting level of request */
-     struct PendingRelDelete *next;        /* linked-list link */
- } PendingRelDelete;
-
- static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
-
-
- /*
-  * Declarations for smgr-related XLOG records
-  *
-  * Note: we log file creation and truncation here, but logging of deletion
-  * actions is handled by xact.c, because it is part of transaction commit.
-  */
-
- /* XLOG gives us high 4 bits */
- #define XLOG_SMGR_CREATE    0x10
- #define XLOG_SMGR_TRUNCATE    0x20
-
- typedef struct xl_smgr_create
- {
-     RelFileNode rnode;
-     ForkNumber    forknum;
- } xl_smgr_create;
-
- typedef struct xl_smgr_truncate
- {
-     BlockNumber blkno;
-     RelFileNode rnode;
-     ForkNumber forknum;
- } xl_smgr_truncate;
-
-
  /* local function prototypes */
  static void smgrshutdown(int code, Datum arg);
  static void smgr_internal_unlink(RelFileNode rnode, ForkNumber forknum,
--- 79,84 ----
***************
*** 341,358 **** smgrclosenode(RelFileNode rnode)
   *        to be created.
   *
   *        If isRedo is true, it is okay for the underlying file to exist
!  *        already because we are in a WAL replay sequence.  In this case
!  *        we should make no PendingRelDelete entry; the WAL sequence will
!  *        tell whether to drop the file.
   */
  void
! smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isTemp, bool isRedo)
  {
-     XLogRecPtr    lsn;
-     XLogRecData rdata;
-     xl_smgr_create xlrec;
-     PendingRelDelete *pending;
-
      /*
       * Exit quickly in WAL replay mode if we've already opened the file.
       * If it's open, it surely must exist.
--- 279,289 ----
   *        to be created.
   *
   *        If isRedo is true, it is okay for the underlying file to exist
!  *        already because we are in a WAL replay sequence.
   */
  void
! smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
  {
      /*
       * Exit quickly in WAL replay mode if we've already opened the file.
       * If it's open, it surely must exist.
***************
*** 374,442 **** smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isTemp, bool isRedo)
                              isRedo);

      (*(smgrsw[reln->smgr_which].smgr_create)) (reln, forknum, isRedo);
-
-     if (isRedo)
-         return;
-
-     /*
-      * Make an XLOG entry showing the file creation.  If we abort, the file
-      * will be dropped at abort time.
-      */
-     xlrec.rnode = reln->smgr_rnode;
-     xlrec.forknum = forknum;
-
-     rdata.data = (char *) &xlrec;
-     rdata.len = sizeof(xlrec);
-     rdata.buffer = InvalidBuffer;
-     rdata.next = NULL;
-
-     lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE, &rdata);
-
-     /* Add the relation to the list of stuff to delete at abort */
-     pending = (PendingRelDelete *)
-         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-     pending->relnode = reln->smgr_rnode;
-     pending->forknum = forknum;
-     pending->which = reln->smgr_which;
-     pending->isTemp = isTemp;
-     pending->atCommit = false;    /* delete if abort */
-     pending->nestLevel = GetCurrentTransactionNestLevel();
-     pending->next = pendingDeletes;
-     pendingDeletes = pending;
- }
-
- /*
-  *    smgrscheduleunlink() -- Schedule unlinking a relation at xact commit.
-  *
-  *        The fork is marked to be removed from the store if we successfully
-  *        commit the current transaction.
-  */
- void
- smgrscheduleunlink(SMgrRelation reln, ForkNumber forknum, bool isTemp)
- {
-     PendingRelDelete *pending;
-
-     /* Add the relation to the list of stuff to delete at commit */
-     pending = (PendingRelDelete *)
-         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-     pending->relnode = reln->smgr_rnode;
-     pending->forknum = forknum;
-     pending->which = reln->smgr_which;
-     pending->isTemp = isTemp;
-     pending->atCommit = true;    /* delete if commit */
-     pending->nestLevel = GetCurrentTransactionNestLevel();
-     pending->next = pendingDeletes;
-     pendingDeletes = pending;
-
-     /*
-      * NOTE: if the relation was created in this transaction, it will now be
-      * present in the pending-delete list twice, once with atCommit true and
-      * once with atCommit false.  Hence, it will be physically deleted at end
-      * of xact in either case (and the other entry will be ignored by
-      * smgrDoPendingDeletes, so no error will occur).  We could instead remove
-      * the existing list entry and delete the physical file immediately, but
-      * for now I'll keep the logic simple.
-      */
  }

  /*
--- 305,310 ----
***************
*** 573,599 **** smgrtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks,
      /* Do the truncation */
      (*(smgrsw[reln->smgr_which].smgr_truncate)) (reln, forknum, nblocks,
                                                   isTemp);
-
-     if (!isTemp)
-     {
-         /*
-          * Make an XLOG entry showing the file truncation.
-          */
-         XLogRecPtr    lsn;
-         XLogRecData rdata;
-         xl_smgr_truncate xlrec;
-
-         xlrec.blkno = nblocks;
-         xlrec.rnode = reln->smgr_rnode;
-         xlrec.forknum = forknum;
-
-         rdata.data = (char *) &xlrec;
-         rdata.len = sizeof(xlrec);
-         rdata.buffer = InvalidBuffer;
-         rdata.next = NULL;
-
-         lsn = XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE, &rdata);
-     }
  }

  /*
--- 441,446 ----
***************
*** 627,813 **** smgrimmedsync(SMgrRelation reln, ForkNumber forknum)


  /*
-  *    PostPrepare_smgr -- Clean up after a successful PREPARE
-  *
-  * What we have to do here is throw away the in-memory state about pending
-  * relation deletes.  It's all been recorded in the 2PC state file and
-  * it's no longer smgr's job to worry about it.
-  */
- void
- PostPrepare_smgr(void)
- {
-     PendingRelDelete *pending;
-     PendingRelDelete *next;
-
-     for (pending = pendingDeletes; pending != NULL; pending = next)
-     {
-         next = pending->next;
-         pendingDeletes = next;
-         /* must explicitly free the list entry */
-         pfree(pending);
-     }
- }
-
-
- /*
-  *    smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
-  *
-  * This also runs when aborting a subxact; we want to clean up a failed
-  * subxact immediately.
-  */
- void
- smgrDoPendingDeletes(bool isCommit)
- {
-     int            nestLevel = GetCurrentTransactionNestLevel();
-     PendingRelDelete *pending;
-     PendingRelDelete *prev;
-     PendingRelDelete *next;
-
-     prev = NULL;
-     for (pending = pendingDeletes; pending != NULL; pending = next)
-     {
-         next = pending->next;
-         if (pending->nestLevel < nestLevel)
-         {
-             /* outer-level entries should not be processed yet */
-             prev = pending;
-         }
-         else
-         {
-             /* unlink list entry first, so we don't retry on failure */
-             if (prev)
-                 prev->next = next;
-             else
-                 pendingDeletes = next;
-             /* do deletion if called for */
-             if (pending->atCommit == isCommit)
-                 smgr_internal_unlink(pending->relnode,
-                                      pending->forknum,
-                                      pending->which,
-                                      pending->isTemp,
-                                      false);
-             /* must explicitly free the list entry */
-             pfree(pending);
-             /* prev does not change */
-         }
-     }
- }
-
- /*
-  * smgrGetPendingDeletes() -- Get a list of relations to be deleted.
-  *
-  * The return value is the number of relations scheduled for termination.
-  * *ptr is set to point to a freshly-palloc'd array of RelFileForks.
-  * If there are no relations to be deleted, *ptr is set to NULL.
-  *
-  * If haveNonTemp isn't NULL, the bool it points to gets set to true if
-  * there is any non-temp table pending to be deleted; false if not.
-  *
-  * Note that the list does not include anything scheduled for termination
-  * by upper-level transactions.
-  */
- int
- smgrGetPendingDeletes(bool forCommit, RelFileFork **ptr, bool *haveNonTemp)
- {
-     int            nestLevel = GetCurrentTransactionNestLevel();
-     int            nrels;
-     RelFileFork *rptr;
-     PendingRelDelete *pending;
-
-     nrels = 0;
-     if (haveNonTemp)
-         *haveNonTemp = false;
-     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
-     {
-         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit)
-             nrels++;
-     }
-     if (nrels == 0)
-     {
-         *ptr = NULL;
-         return 0;
-     }
-     rptr = (RelFileFork *) palloc(nrels * sizeof(RelFileFork));
-     *ptr = rptr;
-     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
-     {
-         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit)
-         {
-             rptr->rnode = pending->relnode;
-             rptr->forknum = pending->forknum;
-             rptr++;
-         }
-         if (haveNonTemp && !pending->isTemp)
-             *haveNonTemp = true;
-     }
-     return nrels;
- }
-
- /*
-  * AtSubCommit_smgr() --- Take care of subtransaction commit.
-  *
-  * Reassign all items in the pending-deletes list to the parent transaction.
-  */
- void
- AtSubCommit_smgr(void)
- {
-     int            nestLevel = GetCurrentTransactionNestLevel();
-     PendingRelDelete *pending;
-
-     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
-     {
-         if (pending->nestLevel >= nestLevel)
-             pending->nestLevel = nestLevel - 1;
-     }
- }
-
- /*
-  * AtSubAbort_smgr() --- Take care of subtransaction abort.
-  *
-  * Delete created relations and forget about deleted relations.
-  * We can execute these operations immediately because we know this
-  * subtransaction will not commit.
-  */
- void
- AtSubAbort_smgr(void)
- {
-     smgrDoPendingDeletes(false);
- }
-
- /*
-  *    smgrcommit() -- Prepare to commit changes made during the current
-  *                    transaction.
-  *
-  *        This is called before we actually commit.
-  */
- void
- smgrcommit(void)
- {
-     int            i;
-
-     for (i = 0; i < NSmgr; i++)
-     {
-         if (smgrsw[i].smgr_commit)
-             (*(smgrsw[i].smgr_commit)) ();
-     }
- }
-
- /*
-  *    smgrabort() -- Clean up after transaction abort.
-  */
- void
- smgrabort(void)
- {
-     int            i;
-
-     for (i = 0; i < NSmgr; i++)
-     {
-         if (smgrsw[i].smgr_abort)
-             (*(smgrsw[i].smgr_abort)) ();
-     }
- }
-
- /*
   *    smgrpreckpt() -- Prepare for checkpoint.
   */
  void
--- 474,479 ----
***************
*** 852,931 **** smgrpostckpt(void)
      }
  }

-
- void
- smgr_redo(XLogRecPtr lsn, XLogRecord *record)
- {
-     uint8        info = record->xl_info & ~XLR_INFO_MASK;
-
-     if (info == XLOG_SMGR_CREATE)
-     {
-         xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(record);
-         SMgrRelation reln;
-
-         reln = smgropen(xlrec->rnode);
-         smgrcreate(reln, xlrec->forknum, false, true);
-     }
-     else if (info == XLOG_SMGR_TRUNCATE)
-     {
-         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
-         SMgrRelation reln;
-
-         reln = smgropen(xlrec->rnode);
-
-         /*
-          * Forcibly create relation if it doesn't exist (which suggests that
-          * it was dropped somewhere later in the WAL sequence).  As in
-          * XLogOpenRelation, we prefer to recreate the rel and replay the log
-          * as best we can until the drop is seen.
-          */
-         smgrcreate(reln, xlrec->forknum, false, true);
-
-         /* Can't use smgrtruncate because it would try to xlog */
-
-         /*
-          * First, force bufmgr to drop any buffers it has for the to-be-
-          * truncated blocks.  We must do this, else subsequent XLogReadBuffer
-          * operations will not re-extend the file properly.
-          */
-         DropRelFileNodeBuffers(xlrec->rnode, xlrec->forknum, false,
-                                xlrec->blkno);
-
-         /* Do the truncation */
-         (*(smgrsw[reln->smgr_which].smgr_truncate)) (reln,
-                                                      xlrec->forknum,
-                                                      xlrec->blkno,
-                                                      false);
-
-         /* Also tell xlogutils.c about it */
-         XLogTruncateRelation(xlrec->rnode, xlrec->forknum, xlrec->blkno);
-     }
-     else
-         elog(PANIC, "smgr_redo: unknown op code %u", info);
- }
-
- void
- smgr_desc(StringInfo buf, uint8 xl_info, char *rec)
- {
-     uint8        info = xl_info & ~XLR_INFO_MASK;
-
-     if (info == XLOG_SMGR_CREATE)
-     {
-         xl_smgr_create *xlrec = (xl_smgr_create *) rec;
-         char *path = relpath(xlrec->rnode, xlrec->forknum);
-
-         appendStringInfo(buf, "file create: %s", path);
-         pfree(path);
-     }
-     else if (info == XLOG_SMGR_TRUNCATE)
-     {
-         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) rec;
-         char *path = relpath(xlrec->rnode, xlrec->forknum);
-
-         appendStringInfo(buf, "file truncate: %s to %u blocks", path,
-                          xlrec->blkno);
-         pfree(path);
-     }
-     else
-         appendStringInfo(buf, "UNKNOWN");
- }
--- 518,520 ----
*** src/include/access/rmgr.h
--- src/include/access/rmgr.h
***************
*** 23,29 **** typedef uint8 RmgrId;
  #define RM_DBASE_ID                4
  #define RM_TBLSPC_ID            5
  #define RM_MULTIXACT_ID            6
- #define RM_FREESPACE_ID            7
  #define RM_HEAP2_ID                9
  #define RM_HEAP_ID                10
  #define RM_BTREE_ID                11
--- 23,28 ----
*** src/include/access/xact.h
--- src/include/access/xact.h
***************
*** 90,97 **** typedef struct xl_xact_commit
      TimestampTz xact_time;        /* time of commit */
      int            nrels;            /* number of RelFileForks */
      int            nsubxacts;        /* number of subtransaction XIDs */
!     /* Array of RelFileFork(s) to drop at commit */
!     RelFileFork    xnodes[1];        /* VARIABLE LENGTH ARRAY */
      /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
  } xl_xact_commit;

--- 90,97 ----
      TimestampTz xact_time;        /* time of commit */
      int            nrels;            /* number of RelFileForks */
      int            nsubxacts;        /* number of subtransaction XIDs */
!     /* Array of RelFileNode(s) to drop at commit */
!     RelFileNode    xnodes[1];        /* VARIABLE LENGTH ARRAY */
      /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
  } xl_xact_commit;

***************
*** 102,109 **** typedef struct xl_xact_abort
      TimestampTz xact_time;        /* time of abort */
      int            nrels;            /* number of RelFileForks */
      int            nsubxacts;        /* number of subtransaction XIDs */
!     /* Array of RelFileFork(s) to drop at abort */
!     RelFileFork    xnodes[1];        /* VARIABLE LENGTH ARRAY */
      /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
  } xl_xact_abort;

--- 102,109 ----
      TimestampTz xact_time;        /* time of abort */
      int            nrels;            /* number of RelFileForks */
      int            nsubxacts;        /* number of subtransaction XIDs */
!     /* Array of RelFileNode(s) to drop at abort */
!     RelFileNode    xnodes[1];        /* VARIABLE LENGTH ARRAY */
      /* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
  } xl_xact_abort;

*** /dev/null
--- src/include/catalog/storage.h
***************
*** 0 ****
--- 1,32 ----
+ /*-------------------------------------------------------------------------
+  *
+  * storage.h
+  *      prototypes for functions in backend/catalog/storage.c
+  *
+  *
+  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef STORAGE_H
+ #define STORAGE_H
+
+ #include "storage/block.h"
+ #include "storage/relfilenode.h"
+ #include "utils/rel.h"
+
+ extern void RelationCreateStorage(RelFileNode rnode, bool istemp);
+ extern void RelationDropStorage(Relation rel);
+ extern void RelationTruncate(Relation rel, BlockNumber nblocks);
+
+ extern void smgrDoPendingDeletes(bool isCommit);
+ extern int smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr,
+                       bool *haveNonTemp);
+ extern void AtSubCommit_smgr(void);
+ extern void AtSubAbort_smgr(void);
+ extern void PostPrepare_smgr(void);
+
+ #endif   /* STORAGE_H */
*** src/include/storage/bufmgr.h
--- src/include/storage/bufmgr.h
***************
*** 176,182 **** extern void PrintBufferLeakWarning(Buffer buffer);
  extern void CheckPointBuffers(int flags);
  extern BlockNumber BufferGetBlockNumber(Buffer buffer);
  extern BlockNumber RelationGetNumberOfBlocks(Relation relation);
- extern void RelationTruncate(Relation rel, BlockNumber nblocks);
  extern void FlushRelationBuffers(Relation rel);
  extern void FlushDatabaseBuffers(Oid dbid);
  extern void DropRelFileNodeBuffers(RelFileNode rnode, ForkNumber forkNum,
--- 176,181 ----
*** src/include/storage/freespace.h
--- src/include/storage/freespace.h
***************
*** 33,40 **** extern void XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
  extern void FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks);
  extern void FreeSpaceMapVacuum(Relation rel);

- /* WAL prototypes */
- extern void fsm_desc(StringInfo buf, uint8 xl_info, char *rec);
- extern void fsm_redo(XLogRecPtr lsn, XLogRecord *record);
-
  #endif   /* FREESPACE_H */
--- 33,36 ----
*** src/include/storage/indexfsm.h
--- src/include/storage/indexfsm.h
***************
*** 20,26 **** extern BlockNumber GetFreeIndexPage(Relation rel);
  extern void RecordFreeIndexPage(Relation rel, BlockNumber page);
  extern void RecordUsedIndexPage(Relation rel, BlockNumber page);

- extern void InitIndexFreeSpaceMap(Relation rel);
  extern void IndexFreeSpaceMapTruncate(Relation rel, BlockNumber nblocks);
  extern void IndexFreeSpaceMapVacuum(Relation rel);

--- 20,25 ----
*** src/include/storage/relfilenode.h
--- src/include/storage/relfilenode.h
***************
*** 78,90 **** typedef struct RelFileNode
       (node1).dbNode == (node2).dbNode && \
       (node1).spcNode == (node2).spcNode)

- /*
-  * RelFileFork identifies a particular fork of a relation.
-  */
- typedef struct RelFileFork
- {
-     RelFileNode rnode;
-     ForkNumber forknum;
- } RelFileFork;
-
  #endif   /* RELFILENODE_H */
--- 78,81 ----
*** src/include/storage/smgr.h
--- src/include/storage/smgr.h
***************
*** 65,74 **** extern void smgrsetowner(SMgrRelation *owner, SMgrRelation reln);
  extern void smgrclose(SMgrRelation reln);
  extern void smgrcloseall(void);
  extern void smgrclosenode(RelFileNode rnode);
! extern void smgrcreate(SMgrRelation reln, ForkNumber forknum,
!                        bool isTemp, bool isRedo);
! extern void smgrscheduleunlink(SMgrRelation reln, ForkNumber forknum,
!                                bool isTemp);
  extern void smgrdounlink(SMgrRelation reln, ForkNumber forknum,
                           bool isTemp, bool isRedo);
  extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
--- 65,71 ----
  extern void smgrclose(SMgrRelation reln);
  extern void smgrcloseall(void);
  extern void smgrclosenode(RelFileNode rnode);
! extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
  extern void smgrdounlink(SMgrRelation reln, ForkNumber forknum,
                           bool isTemp, bool isRedo);
  extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
***************
*** 81,94 **** extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
  extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
                           BlockNumber nblocks, bool isTemp);
  extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
- extern void smgrDoPendingDeletes(bool isCommit);
- extern int smgrGetPendingDeletes(bool forCommit, RelFileFork **ptr,
-                       bool *haveNonTemp);
- extern void AtSubCommit_smgr(void);
- extern void AtSubAbort_smgr(void);
- extern void PostPrepare_smgr(void);
- extern void smgrcommit(void);
- extern void smgrabort(void);
  extern void smgrpreckpt(void);
  extern void smgrsync(void);
  extern void smgrpostckpt(void);
--- 78,83 ----

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Alvaro Herrera
Дата:
Сообщение: Re: WIP: Column-level Privileges
Следующее
От: Alvaro Herrera
Дата:
Сообщение: Re: WIP: Column-level Privileges