Re: Map forks (WIP)

Поиск
Список
Период
Сортировка
От Heikki Linnakangas
Тема Re: Map forks (WIP)
Дата
Msg-id 4843F1A3.7040808@enterprisedb.com
обсуждение исходный текст
Ответ на Re: Map forks (WIP)  (Tom Lane <tgl@sss.pgh.pa.us>)
Список pgsql-patches
Tom Lane wrote:
> The XLogOpenRelationWithFork stuff needs to be re-thought also,
> as again this is blurring the question of what's a logical and
> what's a physical relation --- and if forknum isn't part of the
> relation ID, that API is wrong either way.  I'm not sure about
> a good solution in this area, but I wonder if the right answer
> might be to make the XLOG replay stuff use SMgrRelations instead
> of bogus Relations.  IIRC the replay code design predates the
> existence of SMgrRelation, so maybe we need a fresh look there.

I joggled this around for a while, and settled on removing
XLogOpenRelation altogether, and modifying XLogReadBuffer so that it
takes RelFileNode directly as argument. XLogReadBuffer takes care of
calling smgropen and creating missing files, like XLogOpenRelation used
to. This fits nicely with the changes required for relation forks: the
blurring of logical and physical goes away with XLogOpenRelation, as
XLogReadBuffer clearly works at the physical level.

To support functions that really do need a Relation object, like the b-tree
cleanup routine, which needs to insert a new index pointer into the parent
page using the same code that's used during normal operation,
there's a function XLogFakeRelcacheEntry that lets you create a fake
relcache entry just like XLogOpenRelation used to.

There's one ugly modularity violation in the patch: in XLogReadBuffer,
I'm checking "if(smgr->md_fd == NULL)", to avoid calling smgrcreate if a
relation has already been accessed. I'm seeing two possible solutions to
it: push the functionality to smgr.c by adding a new function
smgropenandcreate that opens a relation and creates it if it doesn't
exist, or just document the wart in comments. I'm leaning towards just
documenting it.

Attached is a patch with just these refactorings, no relation fork or
FSM stuff included. I'm planning to commit this first, followed by the
patch to introduce relation forks, followed by FSM rewrite patch. I
think we're almost there with this refactoring patch and relation forks.
FSM still needs a fair amount of tying up of loose ends, cleanup, and
testing.

PS. The ReadBuffer interface is getting out of hand. This patch
introduces one more ReadBuffer variant, ReadBufferWithoutRelcache, and
the upcoming relation forks patch will again introduce at least one new
variant.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
***************
*** 70,81 **** static void
  ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

!     reln = XLogOpenRelation(*node);
!     buffer = XLogReadBuffer(reln, GIN_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 70,79 ----
  ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;

!     buffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 93,104 **** ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
      ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
-     Relation    reln;
      Buffer        buffer;
      Page        page;

!     reln = XLogOpenRelation(data->node);
!     buffer = XLogReadBuffer(reln, data->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 91,100 ----
  {
      ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
      ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
      Buffer        buffer;
      Page        page;

!     buffer = XLogReadBuffer(data->node, data->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 117,123 **** static void
  ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 113,118 ----
***************
*** 125,132 **** ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(data->node);
!     buffer = XLogReadBuffer(reln, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 120,126 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(data->node, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 227,252 **** static void
  ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        lbuffer,
                  rbuffer;
      Page        lpage,
                  rpage;
      uint32        flags = 0;

-     reln = XLogOpenRelation(data->node);
-
      if (data->isLeaf)
          flags |= GIN_LEAF;
      if (data->isData)
          flags |= GIN_DATA;

!     lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
      Assert(BufferIsValid(lbuffer));
      lpage = (Page) BufferGetPage(lbuffer);
      GinInitBuffer(lbuffer, flags);

!     rbuffer = XLogReadBuffer(reln, data->rblkno, true);
      Assert(BufferIsValid(rbuffer));
      rpage = (Page) BufferGetPage(rbuffer);
      GinInitBuffer(rbuffer, flags);
--- 221,243 ----
  ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
      Buffer        lbuffer,
                  rbuffer;
      Page        lpage,
                  rpage;
      uint32        flags = 0;

      if (data->isLeaf)
          flags |= GIN_LEAF;
      if (data->isData)
          flags |= GIN_DATA;

!     lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
      Assert(BufferIsValid(lbuffer));
      lpage = (Page) BufferGetPage(lbuffer);
      GinInitBuffer(lbuffer, flags);

!     rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
      Assert(BufferIsValid(rbuffer));
      rpage = (Page) BufferGetPage(rbuffer);
      GinInitBuffer(rbuffer, flags);
***************
*** 318,324 **** ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)

      if (data->isRootSplit)
      {
!         Buffer        rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
          Page        rootPage = BufferGetPage(rootBuf);

          GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
--- 309,315 ----

      if (data->isRootSplit)
      {
!         Buffer        rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
          Page        rootPage = BufferGetPage(rootBuf);

          GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
***************
*** 351,357 **** static void
  ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 342,347 ----
***************
*** 359,366 **** ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(data->node);
!     buffer = XLogReadBuffer(reln, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 349,355 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(data->node, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 402,416 **** static void
  ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

-     reln = XLogOpenRelation(data->node);
-
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(reln, data->blkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->flags = GIN_DELETED;
--- 391,402 ----
  ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;

      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(data->node, data->blkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->flags = GIN_DELETED;
***************
*** 422,428 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)

      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(reln, data->parentBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          Assert(!GinPageIsLeaf(page));
--- 408,414 ----

      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          Assert(!GinPageIsLeaf(page));
***************
*** 435,441 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)

      if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
      {
!         buffer = XLogReadBuffer(reln, data->leftBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->rightlink = data->rightLink;
--- 421,427 ----

      if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
      {
!         buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->rightlink = data->rightLink;
***************
*** 556,564 **** ginContinueSplit(ginIncompleteSplit *split)
       * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
       * split->leftBlkno, split->rightBlkno);
       */
!     reln = XLogOpenRelation(split->node);

!     buffer = XLogReadBuffer(reln, split->leftBlkno, false);

      if (split->rootBlkno == GIN_ROOT_BLKNO)
      {
--- 542,550 ----
       * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
       * split->leftBlkno, split->rightBlkno);
       */
!     buffer = XLogReadBuffer(split->node, split->leftBlkno, false);

!     reln = XLogFakeRelcacheEntry(split->node);

      if (split->rootBlkno == GIN_ROOT_BLKNO)
      {
***************
*** 580,585 **** ginContinueSplit(ginIncompleteSplit *split)
--- 566,573 ----
                                         GinPageGetOpaque(page)->maxoff))->key;
      }

+     pfree(reln);
+
      btree.rightblkno = split->rightBlkno;

      stack.blkno = split->leftBlkno;
*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 188,194 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
  {
      gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
      PageUpdateRecord xlrec;
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 188,193 ----
***************
*** 207,214 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)

      decodePageUpdateRecord(&xlrec, record);

!     reln = XLogOpenRelation(xlrec.data->node);
!     buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 206,212 ----

      decodePageUpdateRecord(&xlrec, record);

!     buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 233,239 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
--- 231,241 ----

      /* add tuples */
      if (xlrec.len > 0)
+     {
+         Relation reln = XLogFakeRelcacheEntry(xlrec.data->node);
          gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
+         pfree(reln);
+     }

      /*
       * special case: leafpage, nothing to insert, nothing to delete, then
***************
*** 261,267 **** static void
  gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
  {
      gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 263,268 ----
***************
*** 269,276 **** gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xldata->node);
!     buffer = XLogReadBuffer(reln, xldata->blkno, false);
      if (!BufferIsValid(buffer))
          return;

--- 270,276 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
      if (!BufferIsValid(buffer))
          return;

***************
*** 325,339 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
      int            flags;

      decodePageSplitRecord(&xlrec, record);
-     reln = XLogOpenRelation(xlrec.data->node);
      flags = xlrec.data->origleaf ? F_LEAF : 0;

      /* loop around all pages */
      for (i = 0; i < xlrec.data->npage; i++)
      {
          NewPage    *newpage = xlrec.page + i;

!         buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 325,340 ----
      int            flags;

      decodePageSplitRecord(&xlrec, record);
      flags = xlrec.data->origleaf ? F_LEAF : 0;

+     reln = XLogFakeRelcacheEntry(xlrec.data->node);
+
      /* loop around all pages */
      for (i = 0; i < xlrec.data->npage; i++)
      {
          NewPage    *newpage = xlrec.page + i;

!         buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

***************
*** 348,353 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
--- 349,355 ----
          MarkBufferDirty(buffer);
          UnlockReleaseBuffer(buffer);
      }
+     pfree(reln);

      forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);

***************
*** 360,371 **** static void
  gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

!     reln = XLogOpenRelation(*node);
!     buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 362,371 ----
  gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;

!     buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 601,607 **** gistContinueInsert(gistIncompleteInsert *insert)
                  lenitup;
      Relation    index;

!     index = XLogOpenRelation(insert->node);

      /*
       * needed vector itup never will be more than initial lenblkno+2, because
--- 601,607 ----
                  lenitup;
      Relation    index;

!     index = XLogFakeRelcacheEntry(insert->node);

      /*
       * needed vector itup never will be more than initial lenblkno+2, because
***************
*** 623,629 **** gistContinueInsert(gistIncompleteInsert *insert)
           * it was split root, so we should only make new root. it can't be
           * simple insert into root, we should replace all content of root.
           */
!         Buffer        buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);

          gistnewroot(index, buffer, itup, lenitup, NULL);
          UnlockReleaseBuffer(buffer);
--- 623,629 ----
           * it was split root, so we should only make new root. it can't be
           * simple insert into root, we should replace all content of root.
           */
!         Buffer        buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);

          gistnewroot(index, buffer, itup, lenitup, NULL);
          UnlockReleaseBuffer(buffer);
***************
*** 793,798 **** gistContinueInsert(gistIncompleteInsert *insert)
--- 793,800 ----
          }
      }

+     pfree(index);
+
      ereport(LOG,
              (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
              insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 3954,3961 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 3954,3960 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 3976,3986 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 3975,3987 ----
      Assert(nunused >= 0);

      /* Update all item pointers per the record, and repair fragmentation */
+     reln = XLogFakeRelcacheEntry(xlrec->node);
      heap_page_prune_execute(reln, buffer,
                              redirected, nredirected,
                              nowdead, ndead,
                              nowunused, nunused,
                              clean_move);
+     pfree(reln);

      /*
       * Note: we don't worry about updating the page's prunability hints.
***************
*** 3998,4012 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
      TransactionId cutoff_xid = xlrec->cutoff_xid;
-     Relation    reln;
      Buffer        buffer;
      Page        page;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 3999,4011 ----
  {
      xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
      TransactionId cutoff_xid = xlrec->cutoff_xid;
      Buffer        buffer;
      Page        page;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 4046,4052 **** static void
  heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 4045,4050 ----
***************
*** 4054,4061 **** heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
       * Note: the NEWPAGE log record is used for both heaps and indexes, so do
       * not do anything that assumes we are touching a heap.
       */
!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 4052,4058 ----
       * Note: the NEWPAGE log record is used for both heaps and indexes, so do
       * not do anything that assumes we are touching a heap.
       */
!     buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 4072,4078 **** static void
  heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4069,4074 ----
***************
*** 4082,4089 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->target.node);
!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4078,4084 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
***************
*** 4129,4135 **** static void
  heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4124,4129 ----
***************
*** 4145,4155 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

-     reln = XLogOpenRelation(xlrec->target.node);
-
      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(reln,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  true);
          Assert(BufferIsValid(buffer));
--- 4139,4147 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  true);
          Assert(BufferIsValid(buffer));
***************
*** 4159,4165 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      }
      else
      {
!         buffer = XLogReadBuffer(reln,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (!BufferIsValid(buffer))
--- 4151,4157 ----
      }
      else
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (!BufferIsValid(buffer))
***************
*** 4212,4218 **** static void
  heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
  {
      xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-     Relation    reln = XLogOpenRelation(xlrec->target.node);
      Buffer        buffer;
      bool        samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)));
--- 4204,4209 ----
***************
*** 4238,4244 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)

      /* Deal with old tuple version */

!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4229,4235 ----

      /* Deal with old tuple version */

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
***************
*** 4313,4319 **** newt:;

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(reln,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  true);
          Assert(BufferIsValid(buffer));
--- 4304,4310 ----

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  true);
          Assert(BufferIsValid(buffer));
***************
*** 4323,4329 **** newt:;
      }
      else
      {
!         buffer = XLogReadBuffer(reln,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  false);
          if (!BufferIsValid(buffer))
--- 4314,4320 ----
      }
      else
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  false);
          if (!BufferIsValid(buffer))
***************
*** 4395,4401 **** static void
  heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4386,4391 ----
***************
*** 4405,4412 **** heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->target.node);
!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4395,4401 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
***************
*** 4454,4460 **** static void
  heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
-     Relation    reln = XLogOpenRelation(xlrec->target.node);
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4443,4448 ----
***************
*** 4466,4472 **** heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4454,4460 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 149,155 **** _bt_restore_page(Page page, char *from, int len)
  }

  static void
! _bt_restore_meta(Relation reln, XLogRecPtr lsn,
                   BlockNumber root, uint32 level,
                   BlockNumber fastroot, uint32 fastlevel)
  {
--- 149,155 ----
  }

  static void
! _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
                   BlockNumber root, uint32 level,
                   BlockNumber fastroot, uint32 fastlevel)
  {
***************
*** 158,164 **** _bt_restore_meta(Relation reln, XLogRecPtr lsn,
      BTMetaPageData *md;
      BTPageOpaque pageop;

!     metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true);
      Assert(BufferIsValid(metabuf));
      metapg = BufferGetPage(metabuf);

--- 158,164 ----
      BTMetaPageData *md;
      BTPageOpaque pageop;

!     metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
      Assert(BufferIsValid(metabuf));
      metapg = BufferGetPage(metabuf);

***************
*** 193,199 **** btree_xlog_insert(bool isleaf, bool ismeta,
                    XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      char       *datapos;
--- 193,198 ----
***************
*** 219,229 **** btree_xlog_insert(bool isleaf, bool ismeta,
      if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
          return;                    /* nothing to do */

-     reln = XLogOpenRelation(xlrec->target.node);
-
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(reln,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (BufferIsValid(buffer))
--- 218,226 ----
      if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
          return;                    /* nothing to do */

      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (BufferIsValid(buffer))
***************
*** 250,256 **** btree_xlog_insert(bool isleaf, bool ismeta,
      }

      if (ismeta)
!         _bt_restore_meta(reln, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);

--- 247,253 ----
      }

      if (ismeta)
!         _bt_restore_meta(xlrec->target.node, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);

***************
*** 264,270 **** btree_xlog_split(bool onleft, bool isroot,
                   XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        rbuf;
      Page        rpage;
      BTPageOpaque ropaque;
--- 261,266 ----
***************
*** 276,283 **** btree_xlog_split(bool onleft, bool isroot,
      Item        left_hikey = NULL;
      Size        left_hikeysz = 0;

-     reln = XLogOpenRelation(xlrec->node);
-
      datapos = (char *) xlrec + SizeOfBtreeSplit;
      datalen = record->xl_len - SizeOfBtreeSplit;

--- 272,277 ----
***************
*** 327,333 **** btree_xlog_split(bool onleft, bool isroot,
      }

      /* Reconstruct right (new) sibling from scratch */
!     rbuf = XLogReadBuffer(reln, xlrec->rightsib, true);
      Assert(BufferIsValid(rbuf));
      rpage = (Page) BufferGetPage(rbuf);

--- 321,327 ----
      }

      /* Reconstruct right (new) sibling from scratch */
!     rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
      Assert(BufferIsValid(rbuf));
      rpage = (Page) BufferGetPage(rbuf);

***************
*** 368,374 **** btree_xlog_split(bool onleft, bool isroot,
       */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         Buffer        lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);

          if (BufferIsValid(lbuf))
          {
--- 362,368 ----
       */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         Buffer        lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);

          if (BufferIsValid(lbuf))
          {
***************
*** 438,444 **** btree_xlog_split(bool onleft, bool isroot,
      /* Fix left-link of the page to the right of the new right sibling */
      if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         Buffer        buffer = XLogReadBuffer(reln, xlrec->rnext, false);

          if (BufferIsValid(buffer))
          {
--- 432,438 ----
      /* Fix left-link of the page to the right of the new right sibling */
      if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         Buffer        buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);

          if (BufferIsValid(buffer))
          {
***************
*** 467,473 **** static void
  btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_delete *xlrec;
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      BTPageOpaque opaque;
--- 461,466 ----
***************
*** 476,483 **** btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
          return;

      xlrec = (xl_btree_delete *) XLogRecGetData(record);
!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 469,475 ----
          return;

      xlrec = (xl_btree_delete *) XLogRecGetData(record);
!     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 516,522 **** static void
  btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
-     Relation    reln;
      BlockNumber parent;
      BlockNumber target;
      BlockNumber leftsib;
--- 508,513 ----
***************
*** 525,531 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      Page        page;
      BTPageOpaque pageop;

-     reln = XLogOpenRelation(xlrec->target.node);
      parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
      target = xlrec->deadblk;
      leftsib = xlrec->leftblk;
--- 516,521 ----
***************
*** 534,540 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      /* parent page */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(reln, parent, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
--- 524,530 ----
      /* parent page */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(xlrec->target.node, parent, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
***************
*** 580,586 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      /* Fix left-link of right sibling */
      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(reln, rightsib, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
--- 570,576 ----
      /* Fix left-link of right sibling */
      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
***************
*** 606,612 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      {
          if (leftsib != P_NONE)
          {
!             buffer = XLogReadBuffer(reln, leftsib, false);
              if (BufferIsValid(buffer))
              {
                  page = (Page) BufferGetPage(buffer);
--- 596,602 ----
      {
          if (leftsib != P_NONE)
          {
!             buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
              if (BufferIsValid(buffer))
              {
                  page = (Page) BufferGetPage(buffer);
***************
*** 629,635 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      }

      /* Rewrite target page as empty deleted page */
!     buffer = XLogReadBuffer(reln, target, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 619,625 ----
      }

      /* Rewrite target page as empty deleted page */
!     buffer = XLogReadBuffer(xlrec->target.node, target, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 654,660 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)

          memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
                 sizeof(xl_btree_metadata));
!         _bt_restore_meta(reln, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);
      }
--- 644,650 ----

          memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
                 sizeof(xl_btree_metadata));
!         _bt_restore_meta(xlrec->target.node, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);
      }
***************
*** 671,684 **** static void
  btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      BTPageOpaque pageop;
      BlockNumber downlink = 0;

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 661,672 ----
  btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;
      BTPageOpaque pageop;
      BlockNumber downlink = 0;

!     buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 710,716 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);

!     _bt_restore_meta(reln, lsn,
                       xlrec->rootblk, xlrec->level,
                       xlrec->rootblk, xlrec->level);

--- 698,704 ----
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);

!     _bt_restore_meta(xlrec->node, lsn,
                       xlrec->rootblk, xlrec->level,
                       xlrec->rootblk, xlrec->level);

***************
*** 903,911 **** btree_xlog_cleanup(void)
      foreach(l, incomplete_actions)
      {
          bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
-         Relation    reln;

-         reln = XLogOpenRelation(action->node);
          if (action->is_split)
          {
              /* finish an incomplete split */
--- 891,897 ----
***************
*** 916,929 **** btree_xlog_cleanup(void)
              BTPageOpaque lpageop,
                          rpageop;
              bool        is_only;

!             lbuf = XLogReadBuffer(reln, action->leftblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(lbuf))
                  elog(PANIC, "btree_xlog_cleanup: left block unfound");
              lpage = (Page) BufferGetPage(lbuf);
              lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
!             rbuf = XLogReadBuffer(reln, action->rightblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(rbuf))
                  elog(PANIC, "btree_xlog_cleanup: right block unfound");
--- 902,916 ----
              BTPageOpaque lpageop,
                          rpageop;
              bool        is_only;
+             Relation    reln;

!             lbuf = XLogReadBuffer(action->node, action->leftblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(lbuf))
                  elog(PANIC, "btree_xlog_cleanup: left block unfound");
              lpage = (Page) BufferGetPage(lbuf);
              lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
!             rbuf = XLogReadBuffer(action->node, action->rightblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(rbuf))
                  elog(PANIC, "btree_xlog_cleanup: right block unfound");
***************
*** 933,950 **** btree_xlog_cleanup(void)
              /* if the pages are all of their level, it's a only-page split */
              is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);

              _bt_insert_parent(reln, lbuf, rbuf, NULL,
                                action->is_root, is_only);
          }
          else
          {
              /* finish an incomplete deletion (of a half-dead page) */
              Buffer        buf;

!             buf = XLogReadBuffer(reln, action->delblk, false);
              if (BufferIsValid(buf))
                  if (_bt_pagedel(reln, buf, NULL, true) == 0)
                      elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
          }
      }
      incomplete_actions = NIL;
--- 920,945 ----
              /* if the pages are all of their level, it's a only-page split */
              is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);

+             reln = XLogFakeRelcacheEntry(action->node);
              _bt_insert_parent(reln, lbuf, rbuf, NULL,
                                action->is_root, is_only);
+             pfree(reln);
          }
          else
          {
              /* finish an incomplete deletion (of a half-dead page) */
              Buffer        buf;

!             buf = XLogReadBuffer(action->node, action->delblk, false);
              if (BufferIsValid(buf))
+             {
+                 Relation reln;
+
+                 reln = XLogFakeRelcacheEntry(action->node);
                  if (_bt_pagedel(reln, buf, NULL, true) == 0)
                      elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
+                 pfree(reln);
+             }
          }
      }
      incomplete_actions = NIL;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2806,2812 **** CleanupBackupHistory(void)
  static void
  RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
  {
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      BkpBlock    bkpb;
--- 2806,2811 ----
***************
*** 2822,2829 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
          memcpy(&bkpb, blk, sizeof(BkpBlock));
          blk += sizeof(BkpBlock);

!         reln = XLogOpenRelation(bkpb.node);
!         buffer = XLogReadBuffer(reln, bkpb.block, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 2821,2827 ----
          memcpy(&bkpb, blk, sizeof(BkpBlock));
          blk += sizeof(BkpBlock);

!         buffer = XLogReadBuffer(bkpb.node, bkpb.block, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

***************
*** 5027,5035 **** StartupXLOG(void)
                                  BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
          }

!         /* Start up the recovery environment */
!         XLogInitRelationCache();
!
          for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
          {
              if (RmgrTable[rmid].rm_startup != NULL)
--- 5025,5031 ----
                                  BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
          }

!         /* Initialize resource managers */
          for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
          {
              if (RmgrTable[rmid].rm_startup != NULL)
***************
*** 5293,5303 **** StartupXLOG(void)
           * allows some extra error checking in xlog_redo.
           */
          CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
-
-         /*
-          * Close down recovery environment
-          */
-         XLogCloseRelationCache();
      }

      /*
--- 5289,5294 ----
*** a/src/backend/access/transam/xlogutils.c
--- b/src/backend/access/transam/xlogutils.c
***************
*** 190,195 **** XLogCheckInvalidPages(void)
--- 190,198 ----

      if (foundone)
          elog(PANIC, "WAL contains references to invalid pages");
+
+     hash_destroy(invalid_page_tab);
+     invalid_page_tab = NULL;
  }


***************
*** 218,244 **** XLogCheckInvalidPages(void)
   * at the end of WAL replay.)
   */
  Buffer
! XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
  {
!     BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
      Buffer        buffer;

      Assert(blkno != P_NEW);

      if (blkno < lastblock)
      {
          /* page exists in file */
!         if (init)
!             buffer = ReadOrZeroBuffer(reln, blkno);
!         else
!             buffer = ReadBuffer(reln, blkno);
      }
      else
      {
          /* hm, page doesn't exist in file */
          if (!init)
          {
!             log_invalid_page(reln->rd_node, blkno, false);
              return InvalidBuffer;
          }
          /* OK to extend the file */
--- 221,261 ----
   * at the end of WAL replay.)
   */
  Buffer
! XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
!     BlockNumber lastblock;
      Buffer        buffer;
+     SMgrRelation smgr;

      Assert(blkno != P_NEW);

+     /* Open the relation at smgr level */
+     smgr = smgropen(rnode);
+
+     /*
+      * Create the target file if it doesn't already exist.  This lets us cope
+      * if the replay sequence contains writes to a relation that is later
+      * deleted.  (The original coding of this routine would instead suppress
+      * the writes, but that seems like it risks losing valuable data if the
+      * filesystem loses an inode during a crash.  Better to write the data
+      * until we are actually told to delete the file.)
+      */
+     if (smgr->md_fd == NULL)    /* XXX: Modularity violation! */
+         smgrcreate(smgr, false, true);
+
+     lastblock = smgrnblocks(smgr);
+
      if (blkno < lastblock)
      {
          /* page exists in file */
!         buffer = ReadBufferWithoutRelcache(rnode, false, blkno, init);
      }
      else
      {
          /* hm, page doesn't exist in file */
          if (!init)
          {
!             log_invalid_page(rnode, blkno, false);
              return InvalidBuffer;
          }
          /* OK to extend the file */
***************
*** 249,255 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
          {
              if (buffer != InvalidBuffer)
                  ReleaseBuffer(buffer);
!             buffer = ReadBuffer(reln, P_NEW);
              lastblock++;
          }
          Assert(BufferGetBlockNumber(buffer) == blkno);
--- 266,272 ----
          {
              if (buffer != InvalidBuffer)
                  ReleaseBuffer(buffer);
!             buffer = ReadBufferWithoutRelcache(rnode, false, P_NEW, false);
              lastblock++;
          }
          Assert(BufferGetBlockNumber(buffer) == blkno);
***************
*** 265,271 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
          if (PageIsNew((PageHeader) page))
          {
              UnlockReleaseBuffer(buffer);
!             log_invalid_page(reln->rd_node, blkno, true);
              return InvalidBuffer;
          }
      }
--- 282,288 ----
          if (PageIsNew((PageHeader) page))
          {
              UnlockReleaseBuffer(buffer);
!             log_invalid_page(rnode, blkno, true);
              return InvalidBuffer;
          }
      }
***************
*** 273,472 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
      return buffer;
  }

!
! /*
!  * Lightweight "Relation" cache --- this substitutes for the normal relcache
!  * during XLOG replay.
!  */
!
! typedef struct XLogRelDesc
! {
!     RelationData reldata;
!     struct XLogRelDesc *lessRecently;
!     struct XLogRelDesc *moreRecently;
! } XLogRelDesc;
!
! typedef struct XLogRelCacheEntry
! {
!     RelFileNode rnode;
!     XLogRelDesc *rdesc;
! } XLogRelCacheEntry;
!
! static HTAB *_xlrelcache;
! static XLogRelDesc *_xlrelarr = NULL;
! static Form_pg_class _xlpgcarr = NULL;
! static int    _xlast = 0;
! static int    _xlcnt = 0;
!
! #define _XLOG_RELCACHESIZE    512
!
! static void
! _xl_init_rel_cache(void)
! {
!     HASHCTL        ctl;
!
!     _xlcnt = _XLOG_RELCACHESIZE;
!     _xlast = 0;
!     _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
!     memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
!     _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
!     memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
!
!     _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
!     _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
!
!     memset(&ctl, 0, sizeof(ctl));
!     ctl.keysize = sizeof(RelFileNode);
!     ctl.entrysize = sizeof(XLogRelCacheEntry);
!     ctl.hash = tag_hash;
!
!     _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
!                               &ctl, HASH_ELEM | HASH_FUNCTION);
! }
!
! static void
! _xl_remove_hash_entry(XLogRelDesc *rdesc)
! {
!     Form_pg_class tpgc = rdesc->reldata.rd_rel;
!     XLogRelCacheEntry *hentry;
!
!     rdesc->lessRecently->moreRecently = rdesc->moreRecently;
!     rdesc->moreRecently->lessRecently = rdesc->lessRecently;
!
!     hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
!                       (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
!     if (hentry == NULL)
!         elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
!
!     RelationCloseSmgr(&(rdesc->reldata));
!
!     memset(rdesc, 0, sizeof(XLogRelDesc));
!     memset(tpgc, 0, sizeof(FormData_pg_class));
!     rdesc->reldata.rd_rel = tpgc;
! }
!
! static XLogRelDesc *
! _xl_new_reldesc(void)
! {
!     XLogRelDesc *res;
!
!     _xlast++;
!     if (_xlast < _xlcnt)
!     {
!         _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
!         return &(_xlrelarr[_xlast]);
!     }
!
!     /* reuse */
!     res = _xlrelarr[0].moreRecently;
!
!     _xl_remove_hash_entry(res);
!
!     _xlast--;
!     return res;
! }
!
!
! void
! XLogInitRelationCache(void)
  {
!     _xl_init_rel_cache();
!     invalid_page_tab = NULL;
! }
!
! void
! XLogCloseRelationCache(void)
! {
!     HASH_SEQ_STATUS status;
!     XLogRelCacheEntry *hentry;
!
!     if (!_xlrelarr)
!         return;

!     hash_seq_init(&status, _xlrelcache);
!
!     while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
!         _xl_remove_hash_entry(hentry->rdesc);
!
!     hash_destroy(_xlrelcache);
!
!     free(_xlrelarr);
!     free(_xlpgcarr);
!
!     _xlrelarr = NULL;
! }

  /*
!  * Open a relation during XLOG replay
   *
!  * Note: this once had an API that allowed NULL return on failure, but it
!  * no longer does; any failure results in elog().
   */
  Relation
! XLogOpenRelation(RelFileNode rnode)
  {
!     XLogRelDesc *res;
!     XLogRelCacheEntry *hentry;
!     bool        found;

!     hentry = (XLogRelCacheEntry *)
!         hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);

!     if (hentry)
!     {
!         res = hentry->rdesc;

!         res->lessRecently->moreRecently = res->moreRecently;
!         res->moreRecently->lessRecently = res->lessRecently;
!     }
!     else
!     {
!         res = _xl_new_reldesc();
!
!         sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
!
!         res->reldata.rd_node = rnode;
!
!         /*
!          * We set up the lockRelId in case anything tries to lock the dummy
!          * relation.  Note that this is fairly bogus since relNode may be
!          * different from the relation's OID.  It shouldn't really matter
!          * though, since we are presumably running by ourselves and can't have
!          * any lock conflicts ...
!          */
!         res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
!         res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
!
!         hentry = (XLogRelCacheEntry *)
!             hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
!
!         if (found)
!             elog(PANIC, "xlog relation already present on insert into cache");
!
!         hentry->rdesc = res;
!
!         res->reldata.rd_targblock = InvalidBlockNumber;
!         res->reldata.rd_smgr = NULL;
!         RelationOpenSmgr(&(res->reldata));
!
!         /*
!          * Create the target file if it doesn't already exist.  This lets us
!          * cope if the replay sequence contains writes to a relation that is
!          * later deleted.  (The original coding of this routine would instead
!          * return NULL, causing the writes to be suppressed. But that seems
!          * like it risks losing valuable data if the filesystem loses an inode
!          * during a crash.    Better to write the data until we are actually
!          * told to delete the file.)
!          */
!         smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
!     }

!     res->moreRecently = &(_xlrelarr[0]);
!     res->lessRecently = _xlrelarr[0].lessRecently;
!     _xlrelarr[0].lessRecently = res;
!     res->lessRecently->moreRecently = res;

!     return &(res->reldata);
  }

  /*
--- 290,349 ----
      return buffer;
  }

! /* Struct returned by XLogFakeRelcacheEntry */
! typedef struct
  {
!     RelationData        reldata;    /* Note: this must be first */
!     FormData_pg_class    pgc;
! } FakeRelCacheEntryData;

! typedef FakeRelCacheEntryData *FakeRelCacheEntry;

  /*
!  * Create a fake relation cache entry for a physical relation
   *
!  * It's often convenient to use the same functions in XLOG replay as in the
!  * main codepath, but those functions often work with a relcache entry. We
!  * don't have a working relation cache during XLOG replay, but this function
!  * can be used to create a fake relcache entry instead. Only the fields
!  * related to physical storage, like rd_rel, are initialized, so the fake
!  * entry is only usable in low-level operations.
!  *
!  * Caller must pfree() the returned entry.
   */
  Relation
! XLogFakeRelcacheEntry(RelFileNode rnode)
  {
!     FakeRelCacheEntry fakeentry;
!     Relation rel;

!     /*
!      * We allocate the RelationData struct and all related space in one
!      * block, so that the caller can free it with one simple pfree call.
!      */
!     fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
!     rel = (Relation) fakeentry;

!     rel->rd_rel = &fakeentry->pgc;
!     rel->rd_node = rnode;

!     /* We don't know the name of the relation; use relfilenode instead */
!     sprintf(RelationGetRelationName(rel), "%u", rnode.relNode);
!
!     /*
!      * We set up the lockRelId in case anything tries to lock the dummy
!      * relation.  Note that this is fairly bogus since relNode may be
!      * different from the relation's OID.  It shouldn't really matter
!      * though, since we are presumably running by ourselves and can't have
!      * any lock conflicts ...
!      */
!     rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode;
!     rel->rd_lockInfo.lockRelId.relId = rnode.relNode;

!     rel->rd_targblock = InvalidBlockNumber;
!     rel->rd_smgr = NULL;

!     return rel;
  }

  /*
***************
*** 484,500 **** XLogOpenRelation(RelFileNode rnode)
  void
  XLogDropRelation(RelFileNode rnode)
  {
!     XLogRelCacheEntry *hentry;
!
!     hentry = (XLogRelCacheEntry *)
!         hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
!
!     if (hentry)
!     {
!         XLogRelDesc *rdesc = hentry->rdesc;
!
!         RelationCloseSmgr(&(rdesc->reldata));
!     }

      forget_invalid_pages(rnode, 0);
  }
--- 361,367 ----
  void
  XLogDropRelation(RelFileNode rnode)
  {
!     smgrclosenode(rnode);

      forget_invalid_pages(rnode, 0);
  }
***************
*** 507,524 **** XLogDropRelation(RelFileNode rnode)
  void
  XLogDropDatabase(Oid dbid)
  {
!     HASH_SEQ_STATUS status;
!     XLogRelCacheEntry *hentry;
!
!     hash_seq_init(&status, _xlrelcache);
!
!     while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
!     {
!         XLogRelDesc *rdesc = hentry->rdesc;
!
!         if (hentry->rnode.dbNode == dbid)
!             RelationCloseSmgr(&(rdesc->reldata));
!     }

      forget_invalid_pages_db(dbid);
  }
--- 374,386 ----
  void
  XLogDropDatabase(Oid dbid)
  {
!     /*
!      * This is unnecessarily heavy-handed, as it will close SMgrRelation
!      * objects for other databases as well. DROP DATABASE occurs seldom
!      * enough that it's not worth introducing a variant of smgrclose for
!      * just this purpose.
!      */
!     smgrcloseall();

      forget_invalid_pages_db(dbid);
  }
*** a/src/backend/commands/sequence.c
--- b/src/backend/commands/sequence.c
***************
*** 1266,1272 **** void
  seq_redo(XLogRecPtr lsn, XLogRecord *record)
  {
      uint8        info = record->xl_info & ~XLR_INFO_MASK;
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      char       *item;
--- 1266,1271 ----
***************
*** 1277,1284 **** seq_redo(XLogRecPtr lsn, XLogRecord *record)
      if (info != XLOG_SEQ_LOG)
          elog(PANIC, "seq_redo: unknown op code %u", info);

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, 0, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 1276,1282 ----
      if (info != XLOG_SEQ_LOG)
          elog(PANIC, "seq_redo: unknown op code %u", info);

!     buffer = XLogReadBuffer(xlrec->node, 0, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
***************
*** 76,84 **** static bool IsForInput;
  static volatile BufferDesc *PinCountWaitBuf = NULL;


! static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum,
!                   bool zeroPage,
!                   BufferAccessStrategy strategy);
  static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
--- 76,83 ----
  static volatile BufferDesc *PinCountWaitBuf = NULL;


! static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf, BlockNumber blockNum,
!                   bool zeroPage, BufferAccessStrategy strategy, bool *hit);
  static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
***************
*** 89,95 **** static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
  static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
                    int set_flag_bits);
  static void buffer_write_error_callback(void *arg);
! static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr);
  static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
--- 88,94 ----
  static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
                    int set_flag_bits);
  static void buffer_write_error_callback(void *arg);
! static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr);
  static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
***************
*** 114,120 **** static void AtProcExit_Buffers(int code, Datum arg);
  Buffer
  ReadBuffer(Relation reln, BlockNumber blockNum)
  {
!     return ReadBuffer_common(reln, blockNum, false, NULL);
  }

  /*
--- 113,130 ----
  Buffer
  ReadBuffer(Relation reln, BlockNumber blockNum)
  {
!     bool hit;
!     Buffer buf;
!
!     /* Open it at the smgr level if not already done */
!     RelationOpenSmgr(reln);
!
!     pgstat_count_buffer_read(reln);
!     buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, false, NULL, &hit);
!     if (hit)
!         pgstat_count_buffer_hit(reln);
!
!     return buf;
  }

  /*
***************
*** 125,131 **** Buffer
  ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                         BufferAccessStrategy strategy)
  {
!     return ReadBuffer_common(reln, blockNum, false, strategy);
  }

  /*
--- 135,151 ----
  ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                         BufferAccessStrategy strategy)
  {
!     bool hit;
!     Buffer buf;
!
!     /* Open it at the smgr level if not already done */
!     RelationOpenSmgr(reln);
!
!     pgstat_count_buffer_read(reln);
!     buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, false, strategy, &hit);
!     if (hit)
!         pgstat_count_buffer_hit(reln);
!     return buf;
  }

  /*
***************
*** 142,182 **** ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
  Buffer
  ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
  {
!     return ReadBuffer_common(reln, blockNum, true, NULL);
  }

  /*
   * ReadBuffer_common -- common logic for ReadBuffer variants
   */
  static Buffer
! ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
!                   BufferAccessStrategy strategy)
  {
      volatile BufferDesc *bufHdr;
      Block        bufBlock;
      bool        found;
      bool        isExtend;
!     bool        isLocalBuf;

      /* Make sure we will have room to remember the buffer pin */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      isExtend = (blockNum == P_NEW);
-     isLocalBuf = reln->rd_istemp;
-
-     /* Open it at the smgr level if not already done */
-     RelationOpenSmgr(reln);

      /* Substitute proper block number if caller asked for P_NEW */
      if (isExtend)
!         blockNum = smgrnblocks(reln->rd_smgr);
!
!     pgstat_count_buffer_read(reln);

      if (isLocalBuf)
      {
          ReadLocalBufferCount++;
!         bufHdr = LocalBufferAlloc(reln, blockNum, &found);
          if (found)
              LocalBufferHitCount++;
      }
--- 162,222 ----
  Buffer
  ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
  {
!     bool hit;
!     Buffer buf;
!
!     /* Open it at the smgr level if not already done */
!     RelationOpenSmgr(reln);
!
!     pgstat_count_buffer_read(reln);
!     buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, true, NULL, &hit);
!     if (hit)
!         pgstat_count_buffer_hit(reln);
!     return buf;
! }
!
! /*
!  * ReadBufferWithoutRelcache -- like ReadBuffer, but can be used to
!  *        read a page without having a relcache entry. If zeroPage is true,
!  *        this behaves like ReadOrZeroBuffer rather than ReadBuffer.
!  */
! Buffer
! ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
!                           BlockNumber blockNum, bool zeroPage)
! {
!     bool hit;
!
!     SMgrRelation smgr = smgropen(rnode);
!     return ReadBuffer_common(smgr, isTemp, blockNum, zeroPage, NULL, &hit);
  }

  /*
   * ReadBuffer_common -- common logic for ReadBuffer variants
   */
  static Buffer
! ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, BlockNumber blockNum,
!                   bool zeroPage, BufferAccessStrategy strategy, bool *hit)
  {
      volatile BufferDesc *bufHdr;
      Block        bufBlock;
      bool        found;
      bool        isExtend;
!
!     *hit = false;

      /* Make sure we will have room to remember the buffer pin */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      isExtend = (blockNum == P_NEW);

      /* Substitute proper block number if caller asked for P_NEW */
      if (isExtend)
!         blockNum = smgrnblocks(smgr);

      if (isLocalBuf)
      {
          ReadLocalBufferCount++;
!         bufHdr = LocalBufferAlloc(smgr, blockNum, &found);
          if (found)
              LocalBufferHitCount++;
      }
***************
*** 188,194 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
           * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
           * not currently in memory.
           */
!         bufHdr = BufferAlloc(reln, blockNum, strategy, &found);
          if (found)
              BufferHitCount++;
      }
--- 228,234 ----
           * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
           * not currently in memory.
           */
!         bufHdr = BufferAlloc(smgr, blockNum, strategy, &found);
          if (found)
              BufferHitCount++;
      }
***************
*** 201,207 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
          if (!isExtend)
          {
              /* Just need to update stats before we exit */
!             pgstat_count_buffer_hit(reln);

              if (VacuumCostActive)
                  VacuumCostBalance += VacuumCostPageHit;
--- 241,247 ----
          if (!isExtend)
          {
              /* Just need to update stats before we exit */
!             *hit = true;

              if (VacuumCostActive)
                  VacuumCostBalance += VacuumCostPageHit;
***************
*** 225,232 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
          bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
          if (!PageIsNew((PageHeader) bufBlock))
              ereport(ERROR,
!                     (errmsg("unexpected data beyond EOF in block %u of relation \"%s\"",
!                             blockNum, RelationGetRelationName(reln)),
                       errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

          /*
--- 265,272 ----
          bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
          if (!PageIsNew((PageHeader) bufBlock))
              ereport(ERROR,
!                     (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u",
!                             blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode),
                       errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

          /*
***************
*** 278,285 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
      {
          /* new buffers are zero-filled */
          MemSet((char *) bufBlock, 0, BLCKSZ);
!         smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
!                    reln->rd_istemp);
      }
      else
      {
--- 318,325 ----
      {
          /* new buffers are zero-filled */
          MemSet((char *) bufBlock, 0, BLCKSZ);
!         smgrextend(smgr, blockNum, (char *) bufBlock,
!                    isLocalBuf);
      }
      else
      {
***************
*** 290,296 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
          if (zeroPage)
              MemSet((char *) bufBlock, 0, BLCKSZ);
          else
!             smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
          /* check for garbage data */
          if (!PageHeaderIsValid((PageHeader) bufBlock))
          {
--- 330,336 ----
          if (zeroPage)
              MemSet((char *) bufBlock, 0, BLCKSZ);
          else
!             smgrread(smgr, blockNum, (char *) bufBlock);
          /* check for garbage data */
          if (!PageHeaderIsValid((PageHeader) bufBlock))
          {
***************
*** 298,312 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
              {
                  ereport(WARNING,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                          errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
!                                 blockNum, RelationGetRelationName(reln))));
                  MemSet((char *) bufBlock, 0, BLCKSZ);
              }
              else
                  ereport(ERROR,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                  errmsg("invalid page header in block %u of relation \"%s\"",
!                         blockNum, RelationGetRelationName(reln))));
          }
      }

--- 338,357 ----
              {
                  ereport(WARNING,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                          errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page",
!                                 blockNum,
!                                 smgr->smgr_rnode.spcNode,
!                                 smgr->smgr_rnode.dbNode,
!                                 smgr->smgr_rnode.relNode)));
                  MemSet((char *) bufBlock, 0, BLCKSZ);
              }
              else
                  ereport(ERROR,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                  errmsg("invalid page header in block %u of relation %u/%u/%u",
!                         blockNum, smgr->smgr_rnode.spcNode,
!                         smgr->smgr_rnode.dbNode,
!                         smgr->smgr_rnode.relNode)));
          }
      }

***************
*** 347,353 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
   * No locks are held either at entry or exit.
   */
  static volatile BufferDesc *
! BufferAlloc(Relation reln,
              BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr)
--- 392,398 ----
   * No locks are held either at entry or exit.
   */
  static volatile BufferDesc *
! BufferAlloc(SMgrRelation smgr,
              BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr)
***************
*** 364,370 **** BufferAlloc(Relation reln,
      bool        valid;

      /* create a tag so we can lookup the buffer */
!     INIT_BUFFERTAG(newTag, reln, blockNum);

      /* determine its hash code and partition lock ID */
      newHash = BufTableHashCode(&newTag);
--- 409,415 ----
      bool        valid;

      /* create a tag so we can lookup the buffer */
!     INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);

      /* determine its hash code and partition lock ID */
      newHash = BufTableHashCode(&newTag);
*** a/src/backend/storage/buffer/localbuf.c
--- b/src/backend/storage/buffer/localbuf.c
***************
*** 61,67 **** static Block GetLocalBufferStorage(void);
   * (hence, usage_count is always advanced).
   */
  BufferDesc *
! LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
  {
      BufferTag    newTag;            /* identity of requested block */
      LocalBufferLookupEnt *hresult;
--- 61,67 ----
   * (hence, usage_count is always advanced).
   */
  BufferDesc *
! LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
  {
      BufferTag    newTag;            /* identity of requested block */
      LocalBufferLookupEnt *hresult;
***************
*** 70,76 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
      int            trycounter;
      bool        found;

!     INIT_BUFFERTAG(newTag, reln, blockNum);

      /* Initialize local buffers if first request in this session */
      if (LocalBufHash == NULL)
--- 70,76 ----
      int            trycounter;
      bool        found;

!     INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);

      /* Initialize local buffers if first request in this session */
      if (LocalBufHash == NULL)
***************
*** 87,93 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
          Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
  #ifdef LBDEBUG
          fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
!                 RelationGetRelid(reln), blockNum, -b - 1);
  #endif
          /* this part is equivalent to PinBuffer for a shared buffer */
          if (LocalRefCount[b] == 0)
--- 87,93 ----
          Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
  #ifdef LBDEBUG
          fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
!                 smgr->smgr_rnode.relNode, blockNum, -b - 1);
  #endif
          /* this part is equivalent to PinBuffer for a shared buffer */
          if (LocalRefCount[b] == 0)
*** a/src/backend/utils/init/flatfiles.c
--- b/src/backend/utils/init/flatfiles.c
***************
*** 701,712 **** BuildFlatFiles(bool database_only)
                  rel_authid,
                  rel_authmem;

-     /*
-      * We don't have any hope of running a real relcache, but we can use the
-      * same fake-relcache facility that WAL replay uses.
-      */
-     XLogInitRelationCache();
-
      /* Need a resowner to keep the heapam and buffer code happy */
      owner = ResourceOwnerCreate(NULL, "BuildFlatFiles");
      CurrentResourceOwner = owner;
--- 701,706 ----
***************
*** 716,724 **** BuildFlatFiles(bool database_only)
      rnode.dbNode = 0;
      rnode.relNode = DatabaseRelationId;

!     /* No locking is needed because no one else is alive yet */
!     rel_db = XLogOpenRelation(rnode);
      write_database_file(rel_db, true);

      if (!database_only)
      {
--- 710,724 ----
      rnode.dbNode = 0;
      rnode.relNode = DatabaseRelationId;

!     /*
!      * We don't have any hope of running a real relcache, but we can use the
!      * same fake-relcache facility that WAL replay uses.
!      *
!      * No locking is needed because no one else is alive yet.
!      */
!     rel_db = XLogFakeRelcacheEntry(rnode);
      write_database_file(rel_db, true);
+     pfree(rel_db);

      if (!database_only)
      {
***************
*** 726,746 **** BuildFlatFiles(bool database_only)
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthIdRelationId;
!         rel_authid = XLogOpenRelation(rnode);

          /* hard-wired path to pg_auth_members */
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthMemRelationId;
!         rel_authmem = XLogOpenRelation(rnode);

          write_auth_file(rel_authid, rel_authmem);
      }

      CurrentResourceOwner = NULL;
      ResourceOwnerDelete(owner);
-
-     XLogCloseRelationCache();
  }


--- 726,746 ----
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthIdRelationId;
!         rel_authid = XLogFakeRelcacheEntry(rnode);

          /* hard-wired path to pg_auth_members */
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthMemRelationId;
!         rel_authmem = XLogFakeRelcacheEntry(rnode);

          write_auth_file(rel_authid, rel_authmem);
+         pfree(rel_authid);
+         pfree(rel_authmem);
      }

      CurrentResourceOwner = NULL;
      ResourceOwnerDelete(owner);
  }


*** a/src/include/access/xlogutils.h
--- b/src/include/access/xlogutils.h
***************
*** 12,29 ****
  #define XLOG_UTILS_H

  #include "storage/buf.h"
  #include "utils/rel.h"


- extern void XLogInitRelationCache(void);
  extern void XLogCheckInvalidPages(void);
- extern void XLogCloseRelationCache(void);

! extern Relation XLogOpenRelation(RelFileNode rnode);
  extern void XLogDropRelation(RelFileNode rnode);
  extern void XLogDropDatabase(Oid dbid);
  extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);

! extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);

  #endif
--- 12,29 ----
  #define XLOG_UTILS_H

  #include "storage/buf.h"
+ #include "storage/smgr.h"
  #include "utils/rel.h"


  extern void XLogCheckInvalidPages(void);

! extern Relation XLogFakeRelcacheEntry(RelFileNode rnode);
!
  extern void XLogDropRelation(RelFileNode rnode);
  extern void XLogDropDatabase(Oid dbid);
  extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);

! extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);

  #endif
*** a/src/include/storage/buf_internals.h
--- b/src/include/storage/buf_internals.h
***************
*** 18,23 ****
--- 18,24 ----
  #include "storage/buf.h"
  #include "storage/lwlock.h"
  #include "storage/shmem.h"
+ #include "storage/smgr.h"
  #include "storage/spin.h"
  #include "utils/rel.h"

***************
*** 75,83 **** typedef struct buftag
      (a).blockNum = InvalidBlockNumber \
  )

! #define INIT_BUFFERTAG(a,xx_reln,xx_blockNum) \
  ( \
!     (a).rnode = (xx_reln)->rd_node, \
      (a).blockNum = (xx_blockNum) \
  )

--- 76,84 ----
      (a).blockNum = InvalidBlockNumber \
  )

! #define INIT_BUFFERTAG(a,xx_rnode,xx_blockNum) \
  ( \
!     (a).rnode = (xx_rnode), \
      (a).blockNum = (xx_blockNum) \
  )

***************
*** 201,207 **** extern int    BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id);
  extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);

  /* localbuf.c */
! extern BufferDesc *LocalBufferAlloc(Relation reln, BlockNumber blockNum,
                   bool *foundPtr);
  extern void MarkLocalBufferDirty(Buffer buffer);
  extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
--- 202,208 ----
  extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);

  /* localbuf.c */
! extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, BlockNumber blockNum,
                   bool *foundPtr);
  extern void MarkLocalBufferDirty(Buffer buffer);
  extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
*** a/src/include/storage/bufmgr.h
--- b/src/include/storage/bufmgr.h
***************
*** 121,126 **** extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
--- 121,128 ----
  extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                         BufferAccessStrategy strategy);
  extern Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum);
+ extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
+                              BlockNumber blockNum, bool zeroPage);
  extern void ReleaseBuffer(Buffer buffer);
  extern void UnlockReleaseBuffer(Buffer buffer);
  extern void MarkBufferDirty(Buffer buffer);

В списке pgsql-patches по дате отправления:

Предыдущее
От: Gregory Stark
Дата:
Сообщение: Re: Feature: give pg_dump a WHERE clause expression
Следующее
От: Tom Lane
Дата:
Сообщение: Re: Feature: give pg_dump a WHERE clause expression