Обсуждение: Refactoring xlogutils.c

Поиск
Список
Период
Сортировка

Refactoring xlogutils.c

От
"Heikki Linnakangas"
Дата:
Attached is an updated version of my patch to refactor the
XLogOpenRelation/XLogReadBuffer interface, in preparation for the
relation forks patch, and subsequently the FSM rewrite patch.

I'm satisfied enough with it that I plan to commit it in a few days,
barring objections.

Summary of changes:

- XLogOpenRelation is gone. XLogReadBuffer now takes a RelFileNode as
argument, instead of Relation. Fix all callers.
- For the routines that need a fake, or lightweight, Relation struct
anyway, there's a new function called
XLogFakeRelcacheEntry(RelFileNode) that returns a palloc'd Relation struct.
- Add a ReadBufferWithoutRelcache variant of ReadBuffer that takes a
RelFileNode instead of a Relation as argument. This is what XLogReadBuffer
uses internally.

The only user-visible changes are these error message changes:

1. "invalid page header in block %u of relation %s" and "unexpected data
beyond EOF in block %u of relation \"%s\"" messages, emitted from
ReadBuffer, no longer print the relation name, but only its relfilenode.
(This is because the relation name is no longer conveniently available where
that message is emitted. It could be passed there if necessary, but it
doesn't seem worth the extra complexity.)

2. elog("failed to add item to index page in \"%s\""), in gistutil.c. For
similar reasons, it no longer prints the relation name. Unfortunately,
gistfillbuffer() doesn't know the relfilenode either. Again we could
pass it, but it doesn't seem worth the extra complexity, given that this
is just a "should never happen" elog, and the user should usually know
which index is at fault from the context information.


--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/gin/ginxlog.c
--- b/src/backend/access/gin/ginxlog.c
***************
*** 71,82 **** static void
  ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

!     reln = XLogOpenRelation(*node);
!     buffer = XLogReadBuffer(reln, GIN_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 71,80 ----
  ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;

!     buffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 94,105 **** ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
      ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
-     Relation    reln;
      Buffer        buffer;
      Page        page;

!     reln = XLogOpenRelation(data->node);
!     buffer = XLogReadBuffer(reln, data->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 92,101 ----
  {
      ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
      ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
      Buffer        buffer;
      Page        page;

!     buffer = XLogReadBuffer(data->node, data->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 118,124 **** static void
  ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 114,119 ----
***************
*** 126,133 **** ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(data->node);
!     buffer = XLogReadBuffer(reln, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 121,127 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(data->node, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 228,253 **** static void
  ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        lbuffer,
                  rbuffer;
      Page        lpage,
                  rpage;
      uint32        flags = 0;

-     reln = XLogOpenRelation(data->node);
-
      if (data->isLeaf)
          flags |= GIN_LEAF;
      if (data->isData)
          flags |= GIN_DATA;

!     lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
      Assert(BufferIsValid(lbuffer));
      lpage = (Page) BufferGetPage(lbuffer);
      GinInitBuffer(lbuffer, flags);

!     rbuffer = XLogReadBuffer(reln, data->rblkno, true);
      Assert(BufferIsValid(rbuffer));
      rpage = (Page) BufferGetPage(rbuffer);
      GinInitBuffer(rbuffer, flags);
--- 222,244 ----
  ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
      Buffer        lbuffer,
                  rbuffer;
      Page        lpage,
                  rpage;
      uint32        flags = 0;

      if (data->isLeaf)
          flags |= GIN_LEAF;
      if (data->isData)
          flags |= GIN_DATA;

!     lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
      Assert(BufferIsValid(lbuffer));
      lpage = (Page) BufferGetPage(lbuffer);
      GinInitBuffer(lbuffer, flags);

!     rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
      Assert(BufferIsValid(rbuffer));
      rpage = (Page) BufferGetPage(rbuffer);
      GinInitBuffer(rbuffer, flags);
***************
*** 319,325 **** ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)

      if (data->isRootSplit)
      {
!         Buffer        rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
          Page        rootPage = BufferGetPage(rootBuf);

          GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
--- 310,316 ----

      if (data->isRootSplit)
      {
!         Buffer        rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
          Page        rootPage = BufferGetPage(rootBuf);

          GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
***************
*** 352,358 **** static void
  ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 343,348 ----
***************
*** 360,367 **** ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(data->node);
!     buffer = XLogReadBuffer(reln, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 350,356 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(data->node, data->blkno, false);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 403,417 **** static void
  ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

-     reln = XLogOpenRelation(data->node);
-
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(reln, data->blkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->flags = GIN_DELETED;
--- 392,403 ----
  ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
  {
      ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;

      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(data->node, data->blkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->flags = GIN_DELETED;
***************
*** 423,429 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)

      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(reln, data->parentBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          Assert(!GinPageIsLeaf(page));
--- 409,415 ----

      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          Assert(!GinPageIsLeaf(page));
***************
*** 436,442 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)

      if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
      {
!         buffer = XLogReadBuffer(reln, data->leftBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->rightlink = data->rightLink;
--- 422,428 ----

      if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
      {
!         buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
          page = BufferGetPage(buffer);
          Assert(GinPageIsData(page));
          GinPageGetOpaque(page)->rightlink = data->rightLink;
***************
*** 557,565 **** ginContinueSplit(ginIncompleteSplit *split)
       * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
       * split->leftBlkno, split->rightBlkno);
       */
!     reln = XLogOpenRelation(split->node);

!     buffer = XLogReadBuffer(reln, split->leftBlkno, false);

      if (split->rootBlkno == GIN_ROOT_BLKNO)
      {
--- 543,551 ----
       * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
       * split->leftBlkno, split->rightBlkno);
       */
!     buffer = XLogReadBuffer(split->node, split->leftBlkno, false);

!     reln = XLogFakeRelcacheEntry(split->node);

      if (split->rootBlkno == GIN_ROOT_BLKNO)
      {
***************
*** 581,586 **** ginContinueSplit(ginIncompleteSplit *split)
--- 567,574 ----
                                         GinPageGetOpaque(page)->maxoff))->key;
      }

+     pfree(reln);
+
      btree.rightblkno = split->rightBlkno;

      stack.blkno = split->leftBlkno;
*** a/src/backend/access/gist/gist.c
--- b/src/backend/access/gist/gist.c
***************
*** 462,468 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)

          if (!is_leaf)
              PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
!         gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);

          MarkBufferDirty(state->stack->buffer);

--- 462,468 ----

          if (!is_leaf)
              PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
!         gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);

          MarkBufferDirty(state->stack->buffer);

***************
*** 1008,1014 **** gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
      START_CRIT_SECTION();

      GISTInitBuffer(buffer, 0);
!     gistfillbuffer(r, page, itup, len, FirstOffsetNumber);

      MarkBufferDirty(buffer);

--- 1008,1014 ----
      START_CRIT_SECTION();

      GISTInitBuffer(buffer, 0);
!     gistfillbuffer(page, itup, len, FirstOffsetNumber);

      MarkBufferDirty(buffer);

*** a/src/backend/access/gist/gistutil.c
--- b/src/backend/access/gist/gistutil.c
***************
*** 27,37 **** static Datum attrS[INDEX_MAX_KEYS];
  static bool isnullS[INDEX_MAX_KEYS];

  /*
!  * Write itup vector to page, has no control of free space
   */
! OffsetNumber
! gistfillbuffer(Relation r, Page page, IndexTuple *itup,
!                int len, OffsetNumber off)
  {
      OffsetNumber l = InvalidOffsetNumber;
      int            i;
--- 27,36 ----
  static bool isnullS[INDEX_MAX_KEYS];

  /*
!  * Write itup vector to page, has no control of free space.
   */
! void
! gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
  {
      OffsetNumber l = InvalidOffsetNumber;
      int            i;
***************
*** 42,55 **** gistfillbuffer(Relation r, Page page, IndexTuple *itup,

      for (i = 0; i < len; i++)
      {
!         l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
!                         off, false, false);
          if (l == InvalidOffsetNumber)
!             elog(ERROR, "failed to add item to index page in \"%s\"",
!                  RelationGetRelationName(r));
          off++;
      }
-     return l;
  }

  /*
--- 41,53 ----

      for (i = 0; i < len; i++)
      {
!         Size sz = IndexTupleSize(itup[i]);
!         l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
          if (l == InvalidOffsetNumber)
!             elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
!                  i, len, sz);
          off++;
      }
  }

  /*
*** a/src/backend/access/gist/gistvacuum.c
--- b/src/backend/access/gist/gistvacuum.c
***************
*** 403,409 **** gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
              }
              else
                  /* enough free space */
!                 gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber);
          }
      }

--- 403,409 ----
              }
              else
                  /* enough free space */
!                 gistfillbuffer(tempPage, addon, curlenaddon, InvalidOffsetNumber);
          }
      }

*** a/src/backend/access/gist/gistxlog.c
--- b/src/backend/access/gist/gistxlog.c
***************
*** 189,195 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
  {
      gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
      PageUpdateRecord xlrec;
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 189,194 ----
***************
*** 208,215 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)

      decodePageUpdateRecord(&xlrec, record);

!     reln = XLogOpenRelation(xlrec.data->node);
!     buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 207,213 ----

      decodePageUpdateRecord(&xlrec, record);

!     buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 234,240 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)

      /* add tuples */
      if (xlrec.len > 0)
!         gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);

      /*
       * special case: leafpage, nothing to insert, nothing to delete, then
--- 232,238 ----

      /* add tuples */
      if (xlrec.len > 0)
!         gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);

      /*
       * special case: leafpage, nothing to insert, nothing to delete, then
***************
*** 262,268 **** static void
  gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
  {
      gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 260,265 ----
***************
*** 270,277 **** gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xldata->node);
!     buffer = XLogReadBuffer(reln, xldata->blkno, false);
      if (!BufferIsValid(buffer))
          return;

--- 267,273 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
      if (!BufferIsValid(buffer))
          return;

***************
*** 319,332 **** static void
  gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
  {
      PageSplitRecord xlrec;
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      int            i;
      int            flags;

      decodePageSplitRecord(&xlrec, record);
-     reln = XLogOpenRelation(xlrec.data->node);
      flags = xlrec.data->origleaf ? F_LEAF : 0;

      /* loop around all pages */
--- 315,326 ----
***************
*** 334,340 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
      {
          NewPage    *newpage = xlrec.page + i;

!         buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 328,334 ----
      {
          NewPage    *newpage = xlrec.page + i;

!         buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

***************
*** 342,348 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
          GISTInitBuffer(buffer, flags);

          /* and fill it */
!         gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);

          PageSetLSN(page, lsn);
          PageSetTLI(page, ThisTimeLineID);
--- 336,342 ----
          GISTInitBuffer(buffer, flags);

          /* and fill it */
!         gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);

          PageSetLSN(page, lsn);
          PageSetTLI(page, ThisTimeLineID);
***************
*** 361,372 **** static void
  gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

!     reln = XLogOpenRelation(*node);
!     buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 355,364 ----
  gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
  {
      RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;

!     buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 602,608 **** gistContinueInsert(gistIncompleteInsert *insert)
                  lenitup;
      Relation    index;

!     index = XLogOpenRelation(insert->node);

      /*
       * needed vector itup never will be more than initial lenblkno+2, because
--- 594,600 ----
                  lenitup;
      Relation    index;

!     index = XLogFakeRelcacheEntry(insert->node);

      /*
       * needed vector itup never will be more than initial lenblkno+2, because
***************
*** 624,630 **** gistContinueInsert(gistIncompleteInsert *insert)
           * it was split root, so we should only make new root. it can't be
           * simple insert into root, we should replace all content of root.
           */
!         Buffer        buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);

          gistnewroot(index, buffer, itup, lenitup, NULL);
          UnlockReleaseBuffer(buffer);
--- 616,622 ----
           * it was split root, so we should only make new root. it can't be
           * simple insert into root, we should replace all content of root.
           */
!         Buffer        buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);

          gistnewroot(index, buffer, itup, lenitup, NULL);
          UnlockReleaseBuffer(buffer);
***************
*** 703,709 **** gistContinueInsert(gistIncompleteInsert *insert)
                  LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                  GISTInitBuffer(buffers[numbuffer], 0);
                  pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
!                 gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
                  numbuffer++;

                  if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
--- 695,701 ----
                  LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                  GISTInitBuffer(buffers[numbuffer], 0);
                  pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
!                 gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
                  numbuffer++;

                  if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
***************
*** 749,755 **** gistContinueInsert(gistIncompleteInsert *insert)

                  for (j = 0; j < ntodelete; j++)
                      PageIndexTupleDelete(pages[0], todelete[j]);
!                 gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);

                  rdata = formUpdateRdata(index->rd_node, buffers[0],
                                          todelete, ntodelete,
--- 741,747 ----

                  for (j = 0; j < ntodelete; j++)
                      PageIndexTupleDelete(pages[0], todelete[j]);
!                 gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);

                  rdata = formUpdateRdata(index->rd_node, buffers[0],
                                          todelete, ntodelete,
***************
*** 794,799 **** gistContinueInsert(gistIncompleteInsert *insert)
--- 786,793 ----
          }
      }

+     pfree(index);
+
      ereport(LOG,
              (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
              insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 3944,3950 **** static void
  heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
  {
      xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber *end;
--- 3944,3949 ----
***************
*** 3958,3965 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 3957,3963 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 3980,3986 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
      Assert(nunused >= 0);

      /* Update all item pointers per the record, and repair fragmentation */
!     heap_page_prune_execute(reln, buffer,
                              redirected, nredirected,
                              nowdead, ndead,
                              nowunused, nunused,
--- 3978,3984 ----
      Assert(nunused >= 0);

      /* Update all item pointers per the record, and repair fragmentation */
!     heap_page_prune_execute(buffer,
                              redirected, nredirected,
                              nowdead, ndead,
                              nowunused, nunused,
***************
*** 4002,4016 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
      TransactionId cutoff_xid = xlrec->cutoff_xid;
-     Relation    reln;
      Buffer        buffer;
      Page        page;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 4000,4012 ----
  {
      xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
      TransactionId cutoff_xid = xlrec->cutoff_xid;
      Buffer        buffer;
      Page        page;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 4050,4056 **** static void
  heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;

--- 4046,4051 ----
***************
*** 4058,4065 **** heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
       * Note: the NEWPAGE log record is used for both heaps and indexes, so do
       * not do anything that assumes we are touching a heap.
       */
!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 4053,4059 ----
       * Note: the NEWPAGE log record is used for both heaps and indexes, so do
       * not do anything that assumes we are touching a heap.
       */
!     buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 4076,4082 **** static void
  heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4070,4075 ----
***************
*** 4086,4093 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->target.node);
!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4079,4085 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
***************
*** 4133,4139 **** static void
  heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4125,4130 ----
***************
*** 4149,4159 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

-     reln = XLogOpenRelation(xlrec->target.node);
-
      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(reln,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  true);
          Assert(BufferIsValid(buffer));
--- 4140,4148 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  true);
          Assert(BufferIsValid(buffer));
***************
*** 4163,4169 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      }
      else
      {
!         buffer = XLogReadBuffer(reln,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (!BufferIsValid(buffer))
--- 4152,4158 ----
      }
      else
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (!BufferIsValid(buffer))
***************
*** 4216,4222 **** static void
  heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
  {
      xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-     Relation    reln = XLogOpenRelation(xlrec->target.node);
      Buffer        buffer;
      bool        samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)));
--- 4205,4210 ----
***************
*** 4242,4248 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)

      /* Deal with old tuple version */

!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4230,4236 ----

      /* Deal with old tuple version */

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
***************
*** 4317,4323 **** newt:;

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(reln,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  true);
          Assert(BufferIsValid(buffer));
--- 4305,4311 ----

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  true);
          Assert(BufferIsValid(buffer));
***************
*** 4327,4333 **** newt:;
      }
      else
      {
!         buffer = XLogReadBuffer(reln,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  false);
          if (!BufferIsValid(buffer))
--- 4315,4321 ----
      }
      else
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                                  ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                  false);
          if (!BufferIsValid(buffer))
***************
*** 4399,4405 **** static void
  heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4387,4392 ----
***************
*** 4409,4416 **** heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     reln = XLogOpenRelation(xlrec->target.node);
!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4396,4402 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
***************
*** 4458,4464 **** static void
  heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
-     Relation    reln = XLogOpenRelation(xlrec->target.node);
      Buffer        buffer;
      Page        page;
      OffsetNumber offnum;
--- 4444,4449 ----
***************
*** 4470,4476 **** heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(reln,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
--- 4455,4461 ----
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                              false);
      if (!BufferIsValid(buffer))
*** a/src/backend/access/heap/pruneheap.c
--- b/src/backend/access/heap/pruneheap.c
***************
*** 225,231 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
           * and update the page's hint bit about whether it has free line
           * pointers.
           */
!         heap_page_prune_execute(relation, buffer,
                                  prstate.redirected, prstate.nredirected,
                                  prstate.nowdead, prstate.ndead,
                                  prstate.nowunused, prstate.nunused,
--- 225,231 ----
           * and update the page's hint bit about whether it has free line
           * pointers.
           */
!         heap_page_prune_execute(buffer,
                                  prstate.redirected, prstate.nredirected,
                                  prstate.nowdead, prstate.ndead,
                                  prstate.nowunused, prstate.nunused,
***************
*** 696,702 **** heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
   * arguments are identical to those of log_heap_clean().
   */
  void
! heap_page_prune_execute(Relation reln, Buffer buffer,
                          OffsetNumber *redirected, int nredirected,
                          OffsetNumber *nowdead, int ndead,
                          OffsetNumber *nowunused, int nunused,
--- 696,702 ----
   * arguments are identical to those of log_heap_clean().
   */
  void
! heap_page_prune_execute(Buffer buffer,
                          OffsetNumber *redirected, int nredirected,
                          OffsetNumber *nowdead, int ndead,
                          OffsetNumber *nowunused, int nunused,
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 150,156 **** _bt_restore_page(Page page, char *from, int len)
  }

  static void
! _bt_restore_meta(Relation reln, XLogRecPtr lsn,
                   BlockNumber root, uint32 level,
                   BlockNumber fastroot, uint32 fastlevel)
  {
--- 150,156 ----
  }

  static void
! _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
                   BlockNumber root, uint32 level,
                   BlockNumber fastroot, uint32 fastlevel)
  {
***************
*** 159,165 **** _bt_restore_meta(Relation reln, XLogRecPtr lsn,
      BTMetaPageData *md;
      BTPageOpaque pageop;

!     metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true);
      Assert(BufferIsValid(metabuf));
      metapg = BufferGetPage(metabuf);

--- 159,165 ----
      BTMetaPageData *md;
      BTPageOpaque pageop;

!     metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
      Assert(BufferIsValid(metabuf));
      metapg = BufferGetPage(metabuf);

***************
*** 194,200 **** btree_xlog_insert(bool isleaf, bool ismeta,
                    XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      char       *datapos;
--- 194,199 ----
***************
*** 220,230 **** btree_xlog_insert(bool isleaf, bool ismeta,
      if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
          return;                    /* nothing to do */

-     reln = XLogOpenRelation(xlrec->target.node);
-
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(reln,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (BufferIsValid(buffer))
--- 219,227 ----
      if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
          return;                    /* nothing to do */

      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
                               ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                  false);
          if (BufferIsValid(buffer))
***************
*** 251,257 **** btree_xlog_insert(bool isleaf, bool ismeta,
      }

      if (ismeta)
!         _bt_restore_meta(reln, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);

--- 248,254 ----
      }

      if (ismeta)
!         _bt_restore_meta(xlrec->target.node, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);

***************
*** 265,271 **** btree_xlog_split(bool onleft, bool isroot,
                   XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        rbuf;
      Page        rpage;
      BTPageOpaque ropaque;
--- 262,267 ----
***************
*** 277,284 **** btree_xlog_split(bool onleft, bool isroot,
      Item        left_hikey = NULL;
      Size        left_hikeysz = 0;

-     reln = XLogOpenRelation(xlrec->node);
-
      datapos = (char *) xlrec + SizeOfBtreeSplit;
      datalen = record->xl_len - SizeOfBtreeSplit;

--- 273,278 ----
***************
*** 328,334 **** btree_xlog_split(bool onleft, bool isroot,
      }

      /* Reconstruct right (new) sibling from scratch */
!     rbuf = XLogReadBuffer(reln, xlrec->rightsib, true);
      Assert(BufferIsValid(rbuf));
      rpage = (Page) BufferGetPage(rbuf);

--- 322,328 ----
      }

      /* Reconstruct right (new) sibling from scratch */
!     rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
      Assert(BufferIsValid(rbuf));
      rpage = (Page) BufferGetPage(rbuf);

***************
*** 369,375 **** btree_xlog_split(bool onleft, bool isroot,
       */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         Buffer        lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);

          if (BufferIsValid(lbuf))
          {
--- 363,369 ----
       */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         Buffer        lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);

          if (BufferIsValid(lbuf))
          {
***************
*** 439,445 **** btree_xlog_split(bool onleft, bool isroot,
      /* Fix left-link of the page to the right of the new right sibling */
      if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         Buffer        buffer = XLogReadBuffer(reln, xlrec->rnext, false);

          if (BufferIsValid(buffer))
          {
--- 433,439 ----
      /* Fix left-link of the page to the right of the new right sibling */
      if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         Buffer        buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);

          if (BufferIsValid(buffer))
          {
***************
*** 468,474 **** static void
  btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_delete *xlrec;
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      BTPageOpaque opaque;
--- 462,467 ----
***************
*** 477,484 **** btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
          return;

      xlrec = (xl_btree_delete *) XLogRecGetData(record);
!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 470,476 ----
          return;

      xlrec = (xl_btree_delete *) XLogRecGetData(record);
!     buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 517,523 **** static void
  btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
-     Relation    reln;
      BlockNumber parent;
      BlockNumber target;
      BlockNumber leftsib;
--- 509,514 ----
***************
*** 526,532 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      Page        page;
      BTPageOpaque pageop;

-     reln = XLogOpenRelation(xlrec->target.node);
      parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
      target = xlrec->deadblk;
      leftsib = xlrec->leftblk;
--- 517,522 ----
***************
*** 535,541 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      /* parent page */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(reln, parent, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
--- 525,531 ----
      /* parent page */
      if (!(record->xl_info & XLR_BKP_BLOCK_1))
      {
!         buffer = XLogReadBuffer(xlrec->target.node, parent, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
***************
*** 581,587 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      /* Fix left-link of right sibling */
      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(reln, rightsib, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
--- 571,577 ----
      /* Fix left-link of right sibling */
      if (!(record->xl_info & XLR_BKP_BLOCK_2))
      {
!         buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
          if (BufferIsValid(buffer))
          {
              page = (Page) BufferGetPage(buffer);
***************
*** 607,613 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      {
          if (leftsib != P_NONE)
          {
!             buffer = XLogReadBuffer(reln, leftsib, false);
              if (BufferIsValid(buffer))
              {
                  page = (Page) BufferGetPage(buffer);
--- 597,603 ----
      {
          if (leftsib != P_NONE)
          {
!             buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
              if (BufferIsValid(buffer))
              {
                  page = (Page) BufferGetPage(buffer);
***************
*** 630,636 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
      }

      /* Rewrite target page as empty deleted page */
!     buffer = XLogReadBuffer(reln, target, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 620,626 ----
      }

      /* Rewrite target page as empty deleted page */
!     buffer = XLogReadBuffer(xlrec->target.node, target, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 655,661 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)

          memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
                 sizeof(xl_btree_metadata));
!         _bt_restore_meta(reln, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);
      }
--- 645,651 ----

          memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
                 sizeof(xl_btree_metadata));
!         _bt_restore_meta(xlrec->target.node, lsn,
                           md.root, md.level,
                           md.fastroot, md.fastlevel);
      }
***************
*** 672,685 **** static void
  btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      BTPageOpaque pageop;
      BlockNumber downlink = 0;

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 662,673 ----
  btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  {
      xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
      Buffer        buffer;
      Page        page;
      BTPageOpaque pageop;
      BlockNumber downlink = 0;

!     buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

***************
*** 711,717 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);

!     _bt_restore_meta(reln, lsn,
                       xlrec->rootblk, xlrec->level,
                       xlrec->rootblk, xlrec->level);

--- 699,705 ----
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);

!     _bt_restore_meta(xlrec->node, lsn,
                       xlrec->rootblk, xlrec->level,
                       xlrec->rootblk, xlrec->level);

***************
*** 904,912 **** btree_xlog_cleanup(void)
      foreach(l, incomplete_actions)
      {
          bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
-         Relation    reln;

-         reln = XLogOpenRelation(action->node);
          if (action->is_split)
          {
              /* finish an incomplete split */
--- 892,898 ----
***************
*** 917,930 **** btree_xlog_cleanup(void)
              BTPageOpaque lpageop,
                          rpageop;
              bool        is_only;

!             lbuf = XLogReadBuffer(reln, action->leftblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(lbuf))
                  elog(PANIC, "btree_xlog_cleanup: left block unfound");
              lpage = (Page) BufferGetPage(lbuf);
              lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
!             rbuf = XLogReadBuffer(reln, action->rightblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(rbuf))
                  elog(PANIC, "btree_xlog_cleanup: right block unfound");
--- 903,917 ----
              BTPageOpaque lpageop,
                          rpageop;
              bool        is_only;
+             Relation    reln;

!             lbuf = XLogReadBuffer(action->node, action->leftblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(lbuf))
                  elog(PANIC, "btree_xlog_cleanup: left block unfound");
              lpage = (Page) BufferGetPage(lbuf);
              lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
!             rbuf = XLogReadBuffer(action->node, action->rightblk, false);
              /* failure is impossible because we wrote this page earlier */
              if (!BufferIsValid(rbuf))
                  elog(PANIC, "btree_xlog_cleanup: right block unfound");
***************
*** 934,951 **** btree_xlog_cleanup(void)
              /* if the pages are all of their level, it's a only-page split */
              is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);

              _bt_insert_parent(reln, lbuf, rbuf, NULL,
                                action->is_root, is_only);
          }
          else
          {
              /* finish an incomplete deletion (of a half-dead page) */
              Buffer        buf;

!             buf = XLogReadBuffer(reln, action->delblk, false);
              if (BufferIsValid(buf))
                  if (_bt_pagedel(reln, buf, NULL, true) == 0)
                      elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
          }
      }
      incomplete_actions = NIL;
--- 921,946 ----
              /* if the pages are all of their level, it's a only-page split */
              is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);

+             reln = XLogFakeRelcacheEntry(action->node);
              _bt_insert_parent(reln, lbuf, rbuf, NULL,
                                action->is_root, is_only);
+             pfree(reln);
          }
          else
          {
              /* finish an incomplete deletion (of a half-dead page) */
              Buffer        buf;

!             buf = XLogReadBuffer(action->node, action->delblk, false);
              if (BufferIsValid(buf))
+             {
+                 Relation reln;
+
+                 reln = XLogFakeRelcacheEntry(action->node);
                  if (_bt_pagedel(reln, buf, NULL, true) == 0)
                      elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
+                 pfree(reln);
+             }
          }
      }
      incomplete_actions = NIL;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2840,2846 **** CleanupBackupHistory(void)
  static void
  RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
  {
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      BkpBlock    bkpb;
--- 2840,2845 ----
***************
*** 2856,2863 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
          memcpy(&bkpb, blk, sizeof(BkpBlock));
          blk += sizeof(BkpBlock);

!         reln = XLogOpenRelation(bkpb.node);
!         buffer = XLogReadBuffer(reln, bkpb.block, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 2855,2861 ----
          memcpy(&bkpb, blk, sizeof(BkpBlock));
          blk += sizeof(BkpBlock);

!         buffer = XLogReadBuffer(bkpb.node, bkpb.block, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

***************
*** 5064,5072 **** StartupXLOG(void)
                                  BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
          }

!         /* Start up the recovery environment */
!         XLogInitRelationCache();
!
          for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
          {
              if (RmgrTable[rmid].rm_startup != NULL)
--- 5062,5068 ----
                                  BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
          }

!         /* Initialize resource managers */
          for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
          {
              if (RmgrTable[rmid].rm_startup != NULL)
***************
*** 5330,5340 **** StartupXLOG(void)
           * allows some extra error checking in xlog_redo.
           */
          CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
-
-         /*
-          * Close down recovery environment
-          */
-         XLogCloseRelationCache();
      }

      /*
--- 5326,5331 ----
*** a/src/backend/access/transam/xlogutils.c
--- b/src/backend/access/transam/xlogutils.c
***************
*** 190,195 **** XLogCheckInvalidPages(void)
--- 190,198 ----

      if (foundone)
          elog(PANIC, "WAL contains references to invalid pages");
+
+     hash_destroy(invalid_page_tab);
+     invalid_page_tab = NULL;
  }


***************
*** 218,244 **** XLogCheckInvalidPages(void)
   * at the end of WAL replay.)
   */
  Buffer
! XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
  {
!     BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
      Buffer        buffer;

      Assert(blkno != P_NEW);

      if (blkno < lastblock)
      {
          /* page exists in file */
!         if (init)
!             buffer = ReadOrZeroBuffer(reln, blkno);
!         else
!             buffer = ReadBuffer(reln, blkno);
      }
      else
      {
          /* hm, page doesn't exist in file */
          if (!init)
          {
!             log_invalid_page(reln->rd_node, blkno, false);
              return InvalidBuffer;
          }
          /* OK to extend the file */
--- 221,260 ----
   * at the end of WAL replay.)
   */
  Buffer
! XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
!     BlockNumber lastblock;
      Buffer        buffer;
+     SMgrRelation smgr;

      Assert(blkno != P_NEW);

+     /* Open the relation at smgr level */
+     smgr = smgropen(rnode);
+
+     /*
+      * Create the target file if it doesn't already exist.  This lets us cope
+      * if the replay sequence contains writes to a relation that is later
+      * deleted.  (The original coding of this routine would instead suppress
+      * the writes, but that seems like it risks losing valuable data if the
+      * filesystem loses an inode during a crash.  Better to write the data
+      * until we are actually told to delete the file.)
+      */
+     smgrcreate(smgr, false, true);
+
+     lastblock = smgrnblocks(smgr);
+
      if (blkno < lastblock)
      {
          /* page exists in file */
!         buffer = ReadBufferWithoutRelcache(rnode, false, blkno, init);
      }
      else
      {
          /* hm, page doesn't exist in file */
          if (!init)
          {
!             log_invalid_page(rnode, blkno, false);
              return InvalidBuffer;
          }
          /* OK to extend the file */
***************
*** 249,255 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
          {
              if (buffer != InvalidBuffer)
                  ReleaseBuffer(buffer);
!             buffer = ReadBuffer(reln, P_NEW);
              lastblock++;
          }
          Assert(BufferGetBlockNumber(buffer) == blkno);
--- 265,271 ----
          {
              if (buffer != InvalidBuffer)
                  ReleaseBuffer(buffer);
!             buffer = ReadBufferWithoutRelcache(rnode, false, P_NEW, false);
              lastblock++;
          }
          Assert(BufferGetBlockNumber(buffer) == blkno);
***************
*** 265,271 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
          if (PageIsNew((PageHeader) page))
          {
              UnlockReleaseBuffer(buffer);
!             log_invalid_page(reln->rd_node, blkno, true);
              return InvalidBuffer;
          }
      }
--- 281,287 ----
          if (PageIsNew((PageHeader) page))
          {
              UnlockReleaseBuffer(buffer);
!             log_invalid_page(rnode, blkno, true);
              return InvalidBuffer;
          }
      }
***************
*** 275,500 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)


  /*
!  * Lightweight "Relation" cache --- this substitutes for the normal relcache
!  * during XLOG replay.
   */
!
! typedef struct XLogRelDesc
! {
!     RelationData reldata;
!     struct XLogRelDesc *lessRecently;
!     struct XLogRelDesc *moreRecently;
! } XLogRelDesc;
!
! typedef struct XLogRelCacheEntry
  {
!     RelFileNode rnode;
!     XLogRelDesc *rdesc;
! } XLogRelCacheEntry;

! static HTAB *_xlrelcache;
! static XLogRelDesc *_xlrelarr = NULL;
! static Form_pg_class _xlpgcarr = NULL;
! static int    _xlast = 0;
! static int    _xlcnt = 0;
!
! #define _XLOG_RELCACHESIZE    512
!
! static void
! _xl_init_rel_cache(void)
! {
!     HASHCTL        ctl;
!
!     _xlcnt = _XLOG_RELCACHESIZE;
!     _xlast = 0;
!     _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
!     memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
!     _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
!     memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
!
!     _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
!     _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
!
!     memset(&ctl, 0, sizeof(ctl));
!     ctl.keysize = sizeof(RelFileNode);
!     ctl.entrysize = sizeof(XLogRelCacheEntry);
!     ctl.hash = tag_hash;
!
!     _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
!                               &ctl, HASH_ELEM | HASH_FUNCTION);
! }
!
! static void
! _xl_remove_hash_entry(XLogRelDesc *rdesc)
! {
!     Form_pg_class tpgc = rdesc->reldata.rd_rel;
!     XLogRelCacheEntry *hentry;
!
!     rdesc->lessRecently->moreRecently = rdesc->moreRecently;
!     rdesc->moreRecently->lessRecently = rdesc->lessRecently;
!
!     hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
!                       (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
!     if (hentry == NULL)
!         elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
!
!     RelationCloseSmgr(&(rdesc->reldata));
!
!     memset(rdesc, 0, sizeof(XLogRelDesc));
!     memset(tpgc, 0, sizeof(FormData_pg_class));
!     rdesc->reldata.rd_rel = tpgc;
! }
!
! static XLogRelDesc *
! _xl_new_reldesc(void)
! {
!     XLogRelDesc *res;
!
!     _xlast++;
!     if (_xlast < _xlcnt)
!     {
!         _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
!         return &(_xlrelarr[_xlast]);
!     }
!
!     /* reuse */
!     res = _xlrelarr[0].moreRecently;
!
!     _xl_remove_hash_entry(res);
!
!     _xlast--;
!     return res;
! }
!
!
! void
! XLogInitRelationCache(void)
! {
!     _xl_init_rel_cache();
!     invalid_page_tab = NULL;
! }
!
! void
! XLogCloseRelationCache(void)
! {
!     HASH_SEQ_STATUS status;
!     XLogRelCacheEntry *hentry;
!
!     if (!_xlrelarr)
!         return;
!
!     hash_seq_init(&status, _xlrelcache);
!
!     while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
!         _xl_remove_hash_entry(hentry->rdesc);
!
!     hash_destroy(_xlrelcache);
!
!     free(_xlrelarr);
!     free(_xlpgcarr);
!
!     _xlrelarr = NULL;
! }

  /*
!  * Open a relation during XLOG replay
   *
!  * Note: this once had an API that allowed NULL return on failure, but it
!  * no longer does; any failure results in elog().
   */
  Relation
! XLogOpenRelation(RelFileNode rnode)
  {
!     XLogRelDesc *res;
!     XLogRelCacheEntry *hentry;
!     bool        found;

!     hentry = (XLogRelCacheEntry *)
!         hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);

!     if (hentry)
!     {
!         res = hentry->rdesc;

!         res->lessRecently->moreRecently = res->moreRecently;
!         res->moreRecently->lessRecently = res->lessRecently;
!     }
!     else
!     {
!         res = _xl_new_reldesc();
!
!         sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
!
!         res->reldata.rd_node = rnode;
!
!         /*
!          * We set up the lockRelId in case anything tries to lock the dummy
!          * relation.  Note that this is fairly bogus since relNode may be
!          * different from the relation's OID.  It shouldn't really matter
!          * though, since we are presumably running by ourselves and can't have
!          * any lock conflicts ...
!          */
!         res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
!         res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
!
!         hentry = (XLogRelCacheEntry *)
!             hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
!
!         if (found)
!             elog(PANIC, "xlog relation already present on insert into cache");
!
!         hentry->rdesc = res;
!
!         res->reldata.rd_targblock = InvalidBlockNumber;
!         res->reldata.rd_smgr = NULL;
!         RelationOpenSmgr(&(res->reldata));
!
!         /*
!          * Create the target file if it doesn't already exist.  This lets us
!          * cope if the replay sequence contains writes to a relation that is
!          * later deleted.  (The original coding of this routine would instead
!          * return NULL, causing the writes to be suppressed. But that seems
!          * like it risks losing valuable data if the filesystem loses an inode
!          * during a crash.    Better to write the data until we are actually
!          * told to delete the file.)
!          */
!         smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
!     }

!     res->moreRecently = &(_xlrelarr[0]);
!     res->lessRecently = _xlrelarr[0].lessRecently;
!     _xlrelarr[0].lessRecently = res;
!     res->lessRecently->moreRecently = res;

!     return &(res->reldata);
  }

  /*
   * Drop a relation during XLOG replay
   *
!  * This is called when the relation is about to be deleted; we need to ensure
!  * that there is no dangling smgr reference in the xlog relation cache.
!  *
!  * Currently, we don't bother to physically remove the relation from the
!  * cache, we just let it age out normally.
!  *
!  * This also takes care of removing any open "invalid-page" records for
!  * the relation.
   */
  void
  XLogDropRelation(RelFileNode rnode)
  {
!     XLogRelCacheEntry *hentry;
!
!     hentry = (XLogRelCacheEntry *)
!         hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
!
!     if (hentry)
!     {
!         XLogRelDesc *rdesc = hentry->rdesc;
!
!         RelationCloseSmgr(&(rdesc->reldata));
!     }

      forget_invalid_pages(rnode, 0);
  }
--- 291,365 ----


  /*
!  * Struct actually returned by XLogFakeRelcacheEntry, though the declared
!  * return type is Relation.
   */
! typedef struct
  {
!     RelationData        reldata;    /* Note: this must be first */
!     FormData_pg_class    pgc;
! } FakeRelCacheEntryData;

! typedef FakeRelCacheEntryData *FakeRelCacheEntry;

  /*
!  * Create a fake relation cache entry for a physical relation
   *
!  * It's often convenient to use the same functions in XLOG replay as in the
!  * main codepath, but those functions typically work with a relcache entry.
!  * We don't have a working relation cache during XLOG replay, but this
!  * function can be used to create a fake relcache entry instead. Only the
!  * fields related to physical storage, like rd_rel, are initialized, so the
!  * fake entry is only usable in low-level operations.
!  *
!  * Caller must pfree() the returned entry.
   */
  Relation
! XLogFakeRelcacheEntry(RelFileNode rnode)
  {
!     FakeRelCacheEntry fakeentry;
!     Relation rel;

!     /*
!      * We allocate the RelationData struct and all related space in one
!      * block, so that the caller can free it with one simple pfree call.
!      */
!     fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
!     rel = (Relation) fakeentry;

!     rel->rd_rel = &fakeentry->pgc;
!     rel->rd_node = rnode;

!     /* We don't know the name of the relation; use relfilenode instead */
!     sprintf(RelationGetRelationName(rel), "%u", rnode.relNode);
!
!     /*
!      * We set up the lockRelId in case anything tries to lock the dummy
!      * relation.  Note that this is fairly bogus since relNode may be
!      * different from the relation's OID.  It shouldn't really matter
!      * though, since we are presumably running by ourselves and can't have
!      * any lock conflicts ...
!      */
!     rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode;
!     rel->rd_lockInfo.lockRelId.relId = rnode.relNode;

!     rel->rd_targblock = InvalidBlockNumber;
!     rel->rd_smgr = NULL;

!     return rel;
  }

  /*
   * Drop a relation during XLOG replay
   *
!  * This is called when the relation is about to be deleted; we need to make
!  * smgr forget about it and remove any open "invalid-page" records for it.
   */
  void
  XLogDropRelation(RelFileNode rnode)
  {
!     /* Tell smgr to forget about this relation as well */
!     smgrclosenode(rnode);

      forget_invalid_pages(rnode, 0);
  }
***************
*** 507,524 **** XLogDropRelation(RelFileNode rnode)
  void
  XLogDropDatabase(Oid dbid)
  {
!     HASH_SEQ_STATUS status;
!     XLogRelCacheEntry *hentry;
!
!     hash_seq_init(&status, _xlrelcache);
!
!     while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
!     {
!         XLogRelDesc *rdesc = hentry->rdesc;
!
!         if (hentry->rnode.dbNode == dbid)
!             RelationCloseSmgr(&(rdesc->reldata));
!     }

      forget_invalid_pages_db(dbid);
  }
--- 372,385 ----
  void
  XLogDropDatabase(Oid dbid)
  {
!     /*
!      * This is unnecessarily heavy-handed, as it will close SMgrRelation
!      * objects for other databases as well. DROP DATABASE occurs seldom
!      * enough that it's not worth introducing a variant of smgrclose for
!      * just this purpose. XXX: Or should we rather leave the smgr entries
!      * dangling?
!      */
!     smgrcloseall();

      forget_invalid_pages_db(dbid);
  }
***************
*** 526,533 **** XLogDropDatabase(Oid dbid)
  /*
   * Truncate a relation during XLOG replay
   *
!  * We don't need to do anything to the fake relcache, but we do need to
!  * clean up any open "invalid-page" records for the dropped pages.
   */
  void
  XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks)
--- 387,393 ----
  /*
   * Truncate a relation during XLOG replay
   *
!  * We need to clean up any open "invalid-page" records for the dropped pages.
   */
  void
  XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks)
*** a/src/backend/commands/sequence.c
--- b/src/backend/commands/sequence.c
***************
*** 1332,1338 **** void
  seq_redo(XLogRecPtr lsn, XLogRecord *record)
  {
      uint8        info = record->xl_info & ~XLR_INFO_MASK;
-     Relation    reln;
      Buffer        buffer;
      Page        page;
      char       *item;
--- 1332,1337 ----
***************
*** 1343,1350 **** seq_redo(XLogRecPtr lsn, XLogRecord *record)
      if (info != XLOG_SEQ_LOG)
          elog(PANIC, "seq_redo: unknown op code %u", info);

!     reln = XLogOpenRelation(xlrec->node);
!     buffer = XLogReadBuffer(reln, 0, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

--- 1342,1348 ----
      if (info != XLOG_SEQ_LOG)
          elog(PANIC, "seq_redo: unknown op code %u", info);

!     buffer = XLogReadBuffer(xlrec->node, 0, true);
      Assert(BufferIsValid(buffer));
      page = (Page) BufferGetPage(buffer);

*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
***************
*** 76,84 **** static bool IsForInput;
  static volatile BufferDesc *PinCountWaitBuf = NULL;


! static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum,
!                   bool zeroPage,
!                   BufferAccessStrategy strategy);
  static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
--- 76,85 ----
  static volatile BufferDesc *PinCountWaitBuf = NULL;


! static Buffer ReadBuffer_relcache(Relation reln, BlockNumber blockNum,
!                   bool zeroPage, BufferAccessStrategy strategy);
! static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf, BlockNumber blockNum,
!                   bool zeroPage, BufferAccessStrategy strategy, bool *hit);
  static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
***************
*** 89,95 **** static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
  static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
                    int set_flag_bits);
  static void buffer_write_error_callback(void *arg);
! static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr);
  static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
--- 90,96 ----
  static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
                    int set_flag_bits);
  static void buffer_write_error_callback(void *arg);
! static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr);
  static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
***************
*** 114,120 **** static void AtProcExit_Buffers(int code, Datum arg);
  Buffer
  ReadBuffer(Relation reln, BlockNumber blockNum)
  {
!     return ReadBuffer_common(reln, blockNum, false, NULL);
  }

  /*
--- 115,121 ----
  Buffer
  ReadBuffer(Relation reln, BlockNumber blockNum)
  {
!     return ReadBuffer_relcache(reln, blockNum, false, NULL);
  }

  /*
***************
*** 125,131 **** Buffer
  ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                         BufferAccessStrategy strategy)
  {
!     return ReadBuffer_common(reln, blockNum, false, strategy);
  }

  /*
--- 126,132 ----
  ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                         BufferAccessStrategy strategy)
  {
!     return ReadBuffer_relcache(reln, blockNum, false, strategy);
  }

  /*
***************
*** 142,182 **** ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
  Buffer
  ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
  {
!     return ReadBuffer_common(reln, blockNum, true, NULL);
  }

  /*
!  * ReadBuffer_common -- common logic for ReadBuffer variants
   */
  static Buffer
! ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
!                   BufferAccessStrategy strategy)
  {
      volatile BufferDesc *bufHdr;
      Block        bufBlock;
      bool        found;
      bool        isExtend;
!     bool        isLocalBuf;

      /* Make sure we will have room to remember the buffer pin */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      isExtend = (blockNum == P_NEW);
-     isLocalBuf = reln->rd_istemp;
-
-     /* Open it at the smgr level if not already done */
-     RelationOpenSmgr(reln);

      /* Substitute proper block number if caller asked for P_NEW */
      if (isExtend)
!         blockNum = smgrnblocks(reln->rd_smgr);
!
!     pgstat_count_buffer_read(reln);

      if (isLocalBuf)
      {
          ReadLocalBufferCount++;
!         bufHdr = LocalBufferAlloc(reln, blockNum, &found);
          if (found)
              LocalBufferHitCount++;
      }
--- 143,221 ----
  Buffer
  ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
  {
!     return ReadBuffer_relcache(reln, blockNum, true, NULL);
  }

  /*
!  * ReadBufferWithoutRelcache -- like ReadBuffer, but can be used to read
!  *        a page without having a relcache entry. If zeroPage is true,
!  *        this behaves like ReadOrZeroBuffer rather than ReadBuffer.
!  */
! Buffer
! ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
!                           BlockNumber blockNum, bool zeroPage)
! {
!     bool hit;
!
!     SMgrRelation smgr = smgropen(rnode);
!     return ReadBuffer_common(smgr, isTemp, blockNum, zeroPage, NULL, &hit);
! }
!
! /*
!  * ReadBuffer_relcache -- common logic for ReadBuffer-variants that
!  *        operate on a Relation.
!  */
! static Buffer
! ReadBuffer_relcache(Relation reln, BlockNumber blockNum,
!                     bool zeroPage, BufferAccessStrategy strategy)
! {
!     bool hit;
!     Buffer buf;
!
!     /* Open it at the smgr level if not already done */
!     RelationOpenSmgr(reln);
!
!     /*
!      * Read the buffer, and update pgstat counters to reflect a cache
!      * hit or miss.
!      */
!     pgstat_count_buffer_read(reln);
!     buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum,
!                             zeroPage, strategy, &hit);
!     if (hit)
!         pgstat_count_buffer_hit(reln);
!     return buf;
! }
!
! /*
!  * ReadBuffer_common -- common logic for all ReadBuffer variants
!  *
!  * *hit is set to true if the request was satisfied from shared buffer cache.
   */
  static Buffer
! ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, BlockNumber blockNum,
!                   bool zeroPage, BufferAccessStrategy strategy, bool *hit)
  {
      volatile BufferDesc *bufHdr;
      Block        bufBlock;
      bool        found;
      bool        isExtend;
!
!     *hit = false;

      /* Make sure we will have room to remember the buffer pin */
      ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

      isExtend = (blockNum == P_NEW);

      /* Substitute proper block number if caller asked for P_NEW */
      if (isExtend)
!         blockNum = smgrnblocks(smgr);

      if (isLocalBuf)
      {
          ReadLocalBufferCount++;
!         bufHdr = LocalBufferAlloc(smgr, blockNum, &found);
          if (found)
              LocalBufferHitCount++;
      }
***************
*** 188,194 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
           * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
           * not currently in memory.
           */
!         bufHdr = BufferAlloc(reln, blockNum, strategy, &found);
          if (found)
              BufferHitCount++;
      }
--- 227,233 ----
           * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
           * not currently in memory.
           */
!         bufHdr = BufferAlloc(smgr, blockNum, strategy, &found);
          if (found)
              BufferHitCount++;
      }
***************
*** 201,207 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
          if (!isExtend)
          {
              /* Just need to update stats before we exit */
!             pgstat_count_buffer_hit(reln);

              if (VacuumCostActive)
                  VacuumCostBalance += VacuumCostPageHit;
--- 240,246 ----
          if (!isExtend)
          {
              /* Just need to update stats before we exit */
!             *hit = true;

              if (VacuumCostActive)
                  VacuumCostBalance += VacuumCostPageHit;
***************
*** 225,232 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
          bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
          if (!PageIsNew((PageHeader) bufBlock))
              ereport(ERROR,
!                     (errmsg("unexpected data beyond EOF in block %u of relation \"%s\"",
!                             blockNum, RelationGetRelationName(reln)),
                       errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

          /*
--- 264,271 ----
          bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
          if (!PageIsNew((PageHeader) bufBlock))
              ereport(ERROR,
!                     (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u",
!                             blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode),
                       errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

          /*
***************
*** 278,285 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
      {
          /* new buffers are zero-filled */
          MemSet((char *) bufBlock, 0, BLCKSZ);
!         smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
!                    reln->rd_istemp);
      }
      else
      {
--- 317,324 ----
      {
          /* new buffers are zero-filled */
          MemSet((char *) bufBlock, 0, BLCKSZ);
!         smgrextend(smgr, blockNum, (char *) bufBlock,
!                    isLocalBuf);
      }
      else
      {
***************
*** 290,296 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
          if (zeroPage)
              MemSet((char *) bufBlock, 0, BLCKSZ);
          else
!             smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
          /* check for garbage data */
          if (!PageHeaderIsValid((PageHeader) bufBlock))
          {
--- 329,335 ----
          if (zeroPage)
              MemSet((char *) bufBlock, 0, BLCKSZ);
          else
!             smgrread(smgr, blockNum, (char *) bufBlock);
          /* check for garbage data */
          if (!PageHeaderIsValid((PageHeader) bufBlock))
          {
***************
*** 298,312 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
              {
                  ereport(WARNING,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                          errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
!                                 blockNum, RelationGetRelationName(reln))));
                  MemSet((char *) bufBlock, 0, BLCKSZ);
              }
              else
                  ereport(ERROR,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                  errmsg("invalid page header in block %u of relation \"%s\"",
!                         blockNum, RelationGetRelationName(reln))));
          }
      }

--- 337,356 ----
              {
                  ereport(WARNING,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                          errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page",
!                                 blockNum,
!                                 smgr->smgr_rnode.spcNode,
!                                 smgr->smgr_rnode.dbNode,
!                                 smgr->smgr_rnode.relNode)));
                  MemSet((char *) bufBlock, 0, BLCKSZ);
              }
              else
                  ereport(ERROR,
                          (errcode(ERRCODE_DATA_CORRUPTED),
!                  errmsg("invalid page header in block %u of relation %u/%u/%u",
!                         blockNum, smgr->smgr_rnode.spcNode,
!                         smgr->smgr_rnode.dbNode,
!                         smgr->smgr_rnode.relNode)));
          }
      }

***************
*** 347,353 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
   * No locks are held either at entry or exit.
   */
  static volatile BufferDesc *
! BufferAlloc(Relation reln,
              BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr)
--- 391,397 ----
   * No locks are held either at entry or exit.
   */
  static volatile BufferDesc *
! BufferAlloc(SMgrRelation smgr,
              BlockNumber blockNum,
              BufferAccessStrategy strategy,
              bool *foundPtr)
***************
*** 364,370 **** BufferAlloc(Relation reln,
      bool        valid;

      /* create a tag so we can lookup the buffer */
!     INIT_BUFFERTAG(newTag, reln, blockNum);

      /* determine its hash code and partition lock ID */
      newHash = BufTableHashCode(&newTag);
--- 408,414 ----
      bool        valid;

      /* create a tag so we can lookup the buffer */
!     INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);

      /* determine its hash code and partition lock ID */
      newHash = BufTableHashCode(&newTag);
*** a/src/backend/storage/buffer/localbuf.c
--- b/src/backend/storage/buffer/localbuf.c
***************
*** 61,67 **** static Block GetLocalBufferStorage(void);
   * (hence, usage_count is always advanced).
   */
  BufferDesc *
! LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
  {
      BufferTag    newTag;            /* identity of requested block */
      LocalBufferLookupEnt *hresult;
--- 61,67 ----
   * (hence, usage_count is always advanced).
   */
  BufferDesc *
! LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
  {
      BufferTag    newTag;            /* identity of requested block */
      LocalBufferLookupEnt *hresult;
***************
*** 70,76 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
      int            trycounter;
      bool        found;

!     INIT_BUFFERTAG(newTag, reln, blockNum);

      /* Initialize local buffers if first request in this session */
      if (LocalBufHash == NULL)
--- 70,76 ----
      int            trycounter;
      bool        found;

!     INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);

      /* Initialize local buffers if first request in this session */
      if (LocalBufHash == NULL)
***************
*** 87,93 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
          Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
  #ifdef LBDEBUG
          fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
!                 RelationGetRelid(reln), blockNum, -b - 1);
  #endif
          /* this part is equivalent to PinBuffer for a shared buffer */
          if (LocalRefCount[b] == 0)
--- 87,93 ----
          Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
  #ifdef LBDEBUG
          fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
!                 smgr->smgr_rnode.relNode, blockNum, -b - 1);
  #endif
          /* this part is equivalent to PinBuffer for a shared buffer */
          if (LocalRefCount[b] == 0)
*** a/src/backend/storage/smgr/md.c
--- b/src/backend/storage/smgr/md.c
***************
*** 208,216 **** mdcreate(SMgrRelation reln, bool isRedo)
      char       *path;
      File        fd;

-     if (isRedo && reln->md_fd != NULL)
-         return;                    /* created and opened already... */
-
      Assert(reln->md_fd == NULL);

      path = relpath(reln->smgr_rnode);
--- 208,213 ----
*** a/src/backend/storage/smgr/smgr.c
--- b/src/backend/storage/smgr/smgr.c
***************
*** 330,335 **** smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo)
--- 330,338 ----
      xl_smgr_create xlrec;
      PendingRelDelete *pending;

+     if (isRedo && reln->md_fd != NULL)
+         return;                    /* created and opened already... */
+
      /*
       * We may be using the target table space for the first time in this
       * database, so create a per-database subdirectory if needed.
*** a/src/backend/utils/init/flatfiles.c
--- b/src/backend/utils/init/flatfiles.c
***************
*** 704,715 **** BuildFlatFiles(bool database_only)
                  rel_authid,
                  rel_authmem;

-     /*
-      * We don't have any hope of running a real relcache, but we can use the
-      * same fake-relcache facility that WAL replay uses.
-      */
-     XLogInitRelationCache();
-
      /* Need a resowner to keep the heapam and buffer code happy */
      owner = ResourceOwnerCreate(NULL, "BuildFlatFiles");
      CurrentResourceOwner = owner;
--- 704,709 ----
***************
*** 719,727 **** BuildFlatFiles(bool database_only)
      rnode.dbNode = 0;
      rnode.relNode = DatabaseRelationId;

!     /* No locking is needed because no one else is alive yet */
!     rel_db = XLogOpenRelation(rnode);
      write_database_file(rel_db, true);

      if (!database_only)
      {
--- 713,727 ----
      rnode.dbNode = 0;
      rnode.relNode = DatabaseRelationId;

!     /*
!      * We don't have any hope of running a real relcache, but we can use the
!      * same fake-relcache facility that WAL replay uses.
!      *
!      * No locking is needed because no one else is alive yet.
!      */
!     rel_db = XLogFakeRelcacheEntry(rnode);
      write_database_file(rel_db, true);
+     pfree(rel_db);

      if (!database_only)
      {
***************
*** 729,749 **** BuildFlatFiles(bool database_only)
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthIdRelationId;
!         rel_authid = XLogOpenRelation(rnode);

          /* hard-wired path to pg_auth_members */
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthMemRelationId;
!         rel_authmem = XLogOpenRelation(rnode);

          write_auth_file(rel_authid, rel_authmem);
      }

      CurrentResourceOwner = NULL;
      ResourceOwnerDelete(owner);
-
-     XLogCloseRelationCache();
  }


--- 729,749 ----
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthIdRelationId;
!         rel_authid = XLogFakeRelcacheEntry(rnode);

          /* hard-wired path to pg_auth_members */
          rnode.spcNode = GLOBALTABLESPACE_OID;
          rnode.dbNode = 0;
          rnode.relNode = AuthMemRelationId;
!         rel_authmem = XLogFakeRelcacheEntry(rnode);

          write_auth_file(rel_authid, rel_authmem);
+         pfree(rel_authid);
+         pfree(rel_authmem);
      }

      CurrentResourceOwner = NULL;
      ResourceOwnerDelete(owner);
  }


*** a/src/include/access/gist_private.h
--- b/src/include/access/gist_private.h
***************
*** 284,291 **** extern bool gistfitpage(IndexTuple *itvec, int len);
  extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
  extern void gistcheckpage(Relation rel, Buffer buf);
  extern Buffer gistNewBuffer(Relation r);
! extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
!                int len, OffsetNumber off);
  extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
  extern IndexTuple *gistjoinvector(
                 IndexTuple *itvec, int *len,
--- 284,291 ----
  extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
  extern void gistcheckpage(Relation rel, Buffer buf);
  extern Buffer gistNewBuffer(Relation r);
! extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
!                            OffsetNumber off);
  extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
  extern IndexTuple *gistjoinvector(
                 IndexTuple *itvec, int *len,
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 124,130 **** extern void heap_page_prune_opt(Relation relation, Buffer buffer,
  extern int heap_page_prune(Relation relation, Buffer buffer,
                  TransactionId OldestXmin,
                  bool redirect_move, bool report_stats);
! extern void heap_page_prune_execute(Relation reln, Buffer buffer,
                          OffsetNumber *redirected, int nredirected,
                          OffsetNumber *nowdead, int ndead,
                          OffsetNumber *nowunused, int nunused,
--- 124,130 ----
  extern int heap_page_prune(Relation relation, Buffer buffer,
                  TransactionId OldestXmin,
                  bool redirect_move, bool report_stats);
! extern void heap_page_prune_execute(Buffer buffer,
                          OffsetNumber *redirected, int nredirected,
                          OffsetNumber *nowdead, int ndead,
                          OffsetNumber *nowunused, int nunused,
*** a/src/include/access/xlogutils.h
--- b/src/include/access/xlogutils.h
***************
*** 12,29 ****
  #define XLOG_UTILS_H

  #include "storage/buf.h"
  #include "utils/rel.h"


- extern void XLogInitRelationCache(void);
  extern void XLogCheckInvalidPages(void);
- extern void XLogCloseRelationCache(void);

! extern Relation XLogOpenRelation(RelFileNode rnode);
  extern void XLogDropRelation(RelFileNode rnode);
  extern void XLogDropDatabase(Oid dbid);
  extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);

! extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);

  #endif
--- 12,29 ----
  #define XLOG_UTILS_H

  #include "storage/buf.h"
+ #include "storage/smgr.h"
  #include "utils/rel.h"


  extern void XLogCheckInvalidPages(void);

! extern Relation XLogFakeRelcacheEntry(RelFileNode rnode);
!
  extern void XLogDropRelation(RelFileNode rnode);
  extern void XLogDropDatabase(Oid dbid);
  extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);

! extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);

  #endif
*** a/src/include/storage/buf_internals.h
--- b/src/include/storage/buf_internals.h
***************
*** 18,23 ****
--- 18,24 ----
  #include "storage/buf.h"
  #include "storage/lwlock.h"
  #include "storage/shmem.h"
+ #include "storage/smgr.h"
  #include "storage/spin.h"
  #include "utils/rel.h"

***************
*** 75,83 **** typedef struct buftag
      (a).blockNum = InvalidBlockNumber \
  )

! #define INIT_BUFFERTAG(a,xx_reln,xx_blockNum) \
  ( \
!     (a).rnode = (xx_reln)->rd_node, \
      (a).blockNum = (xx_blockNum) \
  )

--- 76,84 ----
      (a).blockNum = InvalidBlockNumber \
  )

! #define INIT_BUFFERTAG(a,xx_rnode,xx_blockNum) \
  ( \
!     (a).rnode = (xx_rnode), \
      (a).blockNum = (xx_blockNum) \
  )

***************
*** 201,207 **** extern int    BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id);
  extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);

  /* localbuf.c */
! extern BufferDesc *LocalBufferAlloc(Relation reln, BlockNumber blockNum,
                   bool *foundPtr);
  extern void MarkLocalBufferDirty(Buffer buffer);
  extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
--- 202,208 ----
  extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);

  /* localbuf.c */
! extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, BlockNumber blockNum,
                   bool *foundPtr);
  extern void MarkLocalBufferDirty(Buffer buffer);
  extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
*** a/src/include/storage/bufmgr.h
--- b/src/include/storage/bufmgr.h
***************
*** 145,150 **** extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
--- 145,152 ----
  extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                         BufferAccessStrategy strategy);
  extern Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum);
+ extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
+                              BlockNumber blockNum, bool zeroPage);
  extern void ReleaseBuffer(Buffer buffer);
  extern void UnlockReleaseBuffer(Buffer buffer);
  extern void MarkBufferDirty(Buffer buffer);

Re: Refactoring xlogutils.c

От
Tom Lane
Дата:
"Heikki Linnakangas" <heikki@enterprisedb.com> writes:
> Attached is an updated version of my patch to refactor the
> XLogOpenRelation/XLogReadBuffer interface, in preparation for the
> relation forks patch, and subsequently the FSM rewrite patch.

The code motion in md.c looks fairly bogus; was that a copy-and-paste
error?

Otherwise it looks pretty sane, but I have one stylistic gripe:
I'm dubious about your willingness to assume that pfree() is enough for
getting rid of a fake relcache entry.  Relcache entries are complex
beasts and it may not continue to work to do that.  It's also possible
that we'd have additional resources attached to them someday.  So
I think it'd be worth having a FreeFakeRelcacheEntry() routine to
localize the knowledge of how to get rid of them.

            regards, tom lane

Re: Refactoring xlogutils.c

От
Teodor Sigaev
Дата:
> - For the routines that need a fake, or lightweight, Relation struct
> anyway, there's a new function called
> XLogFakeRelcacheEntry(RelFileNode), that returns a palloc'd Relation
> struct.

Is that fake structure applicable for ReadBuffer()?
ginContinueSplit() calls findParents() with a fake Relation struct, and
findParents will call ReadBuffer()...
GiST's function gistContinueInsert() is working by similar way.
--
Teodor Sigaev                                   E-mail: teodor@sigaev.ru
                                                    WWW: http://www.sigaev.ru/

Re: Refactoring xlogutils.c

От
"Heikki Linnakangas"
Дата:
Teodor Sigaev wrote:
>> - For the routines that need a fake, or lightweight, Relation struct
>> anyway, there's a new function called
>> XLogFakeRelcacheEntry(RelFileNode), that returns a palloc'd Relation
>> struct.
>
> Is that fake structure applicable for ReadBuffer()?

Yes.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com

Re: Refactoring xlogutils.c

От
"Heikki Linnakangas"
Дата:
Tom Lane wrote:
> "Heikki Linnakangas" <heikki@enterprisedb.com> writes:
>> Attached is an updated version of my patch to refactor the
>> XLogOpenRelation/XLogReadBuffer interface, in preparation for the
>> relation forks patch, and subsequently the FSM rewrite patch.
>
> The code motion in md.c looks fairly bogus; was that a copy-and-paste
> error?

This one?

> *** a/src/backend/storage/smgr/md.c
> --- b/src/backend/storage/smgr/md.c
> ***************
> *** 208,216 **** mdcreate(SMgrRelation reln, bool isRedo)
>         char       *path;
>         File            fd;
>
> -       if (isRedo && reln->md_fd != NULL)
> -               return;                                 /* created and opened already... */
> -
>         Assert(reln->md_fd == NULL);
>
>         path = relpath(reln->smgr_rnode);
> --- 208,213 ----

That's intentional. That check has been moved to smgrcreate(), so that
it's done before calling TablespaceCreateDbSpace(). The reason for that
is that smgrcreate() is now called for every XLogReadBuffer()
invocation, so we want to make it exit quickly if there's nothing to do.

On second look though, I think we should actually leave that check in
mdcreate(). Even though it'd be dead code since we do the same check in
smgrcreate already, removing it changes the semantics of mdcreate():
it'd no longer be acceptable to call mdcreate() if the file is open already.

> Otherwise it looks pretty sane, but I have one stylistic gripe:
> I'm dubious about your willingness to assume that pfree() is enough for
> getting rid of a fake relcache entry.  Relcache entries are complex
> beasts and it may not continue to work to do that.  It's also possible
> that we'd have additional resources attached to them someday.  So
> I think it'd be worth having a FreeFakeRelcacheEntry() routine to
> localize the knowledge of how to get rid of them.

Yeah, I guess you're right.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com

Re: Refactoring xlogutils.c

От
Tom Lane
Дата:
"Heikki Linnakangas" <heikki@enterprisedb.com> writes:
> Tom Lane wrote:
>> The code motion in md.c looks fairly bogus; was that a copy-and-paste
>> error?

> That's intentional. That check has been moved to smgrcreate(), so that
> it's done before calling TablespaceCreateDbSpace(). The reason for that
> is that smgrcreate() is now called for every XLogReadBuffer()
> invocation, so we want to make it exit quickly if there's nothing to do.

Oh, I see --- I misread the patch the first time. That seems like a
reasonable change.

> On second look though, I think we should actually leave that check in
> mdcreate(). Even though it'd be dead code since we do the same check in
> smgrcreate already, removing it changes the semantics of mdcreate():
> it'd no longer be acceptable to call mdcreate() if the file is open already.

I don't believe there is any way to call mdcreate except through
smgrcreate, so this is probably not a problem.

            regards, tom lane