Discussion: Updating FSM on recovery

Updating FSM on recovery

From
Heikki Linnakangas
Date:
The one remaining issue I'd like to address in the new FSM
implementation is the fact that the FSM is currently not updated at all
in WAL recovery. The old FSM wasn't updated on WAL recovery either, and
was in fact completely thrown away if the system wasn't shut down
cleanly. The difference is that after recovery, we used to start with no
FSM information at all, and all inserts would have to extend the
relations until the next vacuum, while now the inserts use the old data
in the FSM. In the case of a PITR recovery or warm standby, the FSM
information would come from the last base backup, which could be *very* old.

The first inserter after recovery might have to visit a lot of pages
that the FSM claimed had free space, but didn't in reality, before
finding a suitable target. In the absolute worst case, where the table
was almost empty when the base backup was taken, but is now full, it
might have to visit every single heap page. That's not good.

So we should try to update the FSM during recovery as well. It doesn't
need to be very accurate, as the FSM information isn't accurate anyway,
but we should try to avoid the worst case scenarios.

The attached patch is my first attempt at that. Arbitrarily, if after a
heap insert/update there's less than 20% of free space on the page, the
FSM is updated. Compared to updating it every time, that saves a lot of
overhead, while doing a pretty good job at marking full pages as full in
the FSM. My first thought was to update the FSM if there isn't enough
room on the page for a new tuple of the same size as the one just
inserted; that would be pretty close to the logic we have during normal
operation, where the FSM is updated when the tuple that we're about to
insert doesn't fit on the page. But because we don't know the fillfactor
during recovery, I don't think we can do that reliably.
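
For reference, the normal-operation logic being approximated is roughly
this (a simplified sketch of RelationGetBufferForTuple in hio.c; it needs
the Relation to get at the fillfactor, which we don't have during recovery):

saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                               HEAP_DEFAULT_FILLFACTOR);
if (len + saveFreeSpace > PageGetHeapFreeSpace(page))
{
    /* page is effectively full: update the FSM and look elsewhere */
}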

One issue with this patch is that it doesn't update the FSM at all when
pages are restored from full page images. It would require fetching the
page and checking the free space on it, or peeking into the size of the
backup block data, and I'm not sure if it's worth the extra code to do that.

Thoughts?

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1d43b0b..a9bc17a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -54,6 +54,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
+#include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
@@ -4029,6 +4030,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
     int            nredirected;
     int            ndead;
     int            nunused;
+    Size        freespace;

     if (record->xl_info & XLR_BKP_BLOCK_1)
         return;
@@ -4068,6 +4070,15 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
     PageSetLSN(page, lsn);
     PageSetTLI(page, ThisTimeLineID);
     MarkBufferDirty(buffer);
+
+    /*
+     * update the FSM as well
+     *
+     * XXX: We don't get here if the page was restored from full page image
+     */
+    freespace = PageGetHeapFreeSpace(page);
+    XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
+
     UnlockReleaseBuffer(buffer);
 }

@@ -4212,6 +4223,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
     HeapTupleHeader htup;
     xl_heap_header xlhdr;
     uint32        newlen;
+    Size        freespace;

     if (record->xl_info & XLR_BKP_BLOCK_1)
         return;
@@ -4271,6 +4283,19 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
     PageSetLSN(page, lsn);
     PageSetTLI(page, ThisTimeLineID);
     MarkBufferDirty(buffer);
+
+    /*
+     * If the page is running low on free space, update the FSM as well.
+     * Pretty arbitrarily, our definition of low is less than 20%. We can't
+     * do much better than that without knowing the fill-factor for the table.
+     *
+     * XXX: We don't get here if the page was restored from full page image
+     */
+    freespace = PageGetHeapFreeSpace(page);
+    if (freespace < BLCKSZ / 5)
+        XLogRecordPageWithFreeSpace(xlrec->target.node,
+                                    BufferGetBlockNumber(buffer), freespace);
+
     UnlockReleaseBuffer(buffer);
 }

@@ -4296,6 +4321,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
     xl_heap_header xlhdr;
     int            hsize;
     uint32        newlen;
+    Size        freespace;

     if (record->xl_info & XLR_BKP_BLOCK_1)
     {
@@ -4456,6 +4482,16 @@ newsame:;
     PageSetLSN(page, lsn);
     PageSetTLI(page, ThisTimeLineID);
     MarkBufferDirty(buffer);
+
+    /*
+     * If the page is running low on free space, update the FSM as well.
+     * XXX: We don't get here if the page was restored from full page image
+     */
+    freespace = PageGetHeapFreeSpace(page);
+    if (freespace < BLCKSZ / 5)
+        XLogRecordPageWithFreeSpace(xlrec->target.node,
+                                    BufferGetBlockNumber(buffer), freespace);
+
     UnlockReleaseBuffer(buffer);
 }

diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 17f733f..7aa72c9 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -203,6 +203,51 @@ RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
 }

 /*
+ * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
+ *        WAL replay
+ */
+void
+XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
+                            Size spaceAvail)
+{
+    int            new_cat = fsm_space_avail_to_cat(spaceAvail);
+    FSMAddress    addr;
+    uint16        slot;
+    BlockNumber blkno;
+    Buffer        buf;
+
+    /* Get the location of the FSM byte representing the heap block */
+    addr = fsm_get_location(heapBlk, &slot);
+    blkno = fsm_logical_to_physical(addr);
+
+    /* If the page doesn't exist already, extend */
+    buf = XLogReadBufferWithFork(rnode, FSM_FORKNUM, blkno, false);
+    if (!BufferIsValid(buf))
+    {
+        /*
+         * There's no direct way to tell XLogReadBuffer() that it's OK
+         * if the page doesn't exist. It will log it as an invalid page,
+         * and error at the end of WAL replay. To avoid that, lie to
+         * xlogutils.c that the file was in fact truncated, and initialize
+         * the page.
+         *
+         * XXX: Perhaps we should change XLogReadBufferWithFork() so that
+         * instead of the 'init' boolean argument, make it an enum so
+         * that the third state means "silently extend the relation if the
+         * page doesn't exist".
+         */
+        XLogTruncateRelation(rnode, FSM_FORKNUM, blkno);
+        buf = XLogReadBufferWithFork(rnode, FSM_FORKNUM, blkno, true);
+        PageInit(BufferGetPage(buf), BLCKSZ, 0);
+    }
+    Assert(BufferIsValid(buf));
+
+    if (fsm_set_avail(BufferGetPage(buf), slot, new_cat))
+        MarkBufferDirty(buf);
+    UnlockReleaseBuffer(buf);
+}
+
+/*
  * GetRecordedFreePage - return the amount of free space on a particular page,
  *        according to the FSM.
  */
diff --git a/src/include/storage/freespace.h b/src/include/storage/freespace.h
index 7a1664f..e17a8d5 100644
--- a/src/include/storage/freespace.h
+++ b/src/include/storage/freespace.h
@@ -27,6 +27,8 @@ extern BlockNumber RecordAndGetPageWithFreeSpace(Relation rel,
                               Size spaceNeeded);
 extern void RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
                                     Size spaceAvail);
+extern void XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
+                                        Size spaceAvail);

 extern void FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks);
 extern void FreeSpaceMapVacuum(Relation rel);

Re: Updating FSM on recovery

From
Tom Lane
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> So we should try to update the FSM during recovery as well. It doesn't 
> need to be very accurate, as the FSM information isn't accurate anyway, 
> but we should try to avoid the worst case scenarios.

Agreed.

> One issue with this patch is that it doesn't update the FSM at all when 
> pages are restored from full page images. It would require fetching the 
> page and checking the free space on it, or peeking into the size of the 
> backup block data, and I'm not sure if it's worth the extra code to do that.

I'd vote not to bother, at least not in the first cut.  As you say, 100%
accuracy isn't required, and I think that in typical scenarios an
insert/update that causes a page to become full would be relatively less
likely to have a full-page image.

As far as the ugliness in XLogRecordPageWithFreeSpace goes: couldn't you
just call XLogReadBufferWithFork with init = true, and then initialize
the page if PageIsNew?
        regards, tom lane


Re: Updating FSM on recovery

From
Simon Riggs
Date:
On Tue, 2008-10-28 at 16:22 +0200, Heikki Linnakangas wrote:

> Arbitrarily, if after a 
> heap insert/update there's less than 20% of free space on the page,
> the FSM is updated. Compared to updating it every time, that saves a
> lot of overhead, while doing a pretty good job at marking full pages
> as full in  the FSM. My first thought was to update the FSM if there
> isn't enough room on the page for a new tuple of the same size as the
> one just 
> inserted; that would be pretty close to the logic we have during
> normal 
> operation, where the FSM is updated when the tuple that we're about
> to 
> insert doesn't fit on the page. But because we don't know the
> fillfactor 
> during recovery, I don't think we can do that reliably.

With HOT, we tend to hover around the nearly-full state, so this seems
like it will trigger repeatedly.

Is it possible that we could put an extra field onto a heap_clean record
to show remaining space? We would use it only for VACUUMs, not HOT, just
as we do now.
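
As a purely hypothetical sketch of the idea (the existing fields are from
8.3's xl_heap_clean in htup.h; the new field name is invented):

typedef struct xl_heap_clean
{
    RelFileNode node;
    BlockNumber block;
    uint16      nredirected;
    uint16      ndead;
    uint16      freespace;      /* NEW: free space left on the page,
                                 * filled in only by VACUUM */
    /* OFFSET NUMBERS FOLLOW */
} xl_heap_clean;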

It would probably be a good idea to make a list of use cases and say what
we do in each case, e.g. COPY, other bulk ops, HOT, etc.

I wonder if there is merit in having an XLogInsertMulti() which inserts
multiple records in a batch as a way of reducing WALInsertLock traffic.
It might be possible to piggyback FSM records onto the main heap
changes.

--
   Simon Riggs
   www.2ndQuadrant.com
   PostgreSQL Training, Services and Support



Re: Updating FSM on recovery

From
Simon Riggs
Date:
On Tue, 2008-10-28 at 15:35 +0000, Simon Riggs wrote:

> I wonder if there is merit in having an XLogInsertMulti() which inserts
> multiple records in a batch as a way of reducing WALInsertLock traffic.
> It might be possible to piggyback FSM records onto the main heap
> changes.

Or possibly an XLogInsertDeferred() which just queues up some work so
the next time we call XLogInsert() it will insert the deferred work as
well as the main work all in one lock cycle. It would only be usable for
low priority info like FSM stuff that isn't needed for recovery. Maybe
we could do that with hints also.
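
Sketched purely as a strawman (no such function exists; the signature is
borrowed from XLogInsert itself):

extern void XLogInsertDeferred(RmgrId rmid, uint8 info, XLogRecData *rdata);

The record data would be queued in backend-local memory and written under
the same WALInsertLock acquisition as the next real XLogInsert() call.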

--
   Simon Riggs
   www.2ndQuadrant.com
   PostgreSQL Training, Services and Support



Re: Updating FSM on recovery

From
Heikki Linnakangas
Date:
Simon Riggs wrote:
> On Tue, 2008-10-28 at 16:22 +0200, Heikki Linnakangas wrote:
> 
>> Arbitrarily, if after a 
>> heap insert/update there's less than 20% of free space on the page,
>> the FSM is updated. Compared to updating it every time, that saves a
>> lot of overhead, while doing a pretty good job at marking full pages
>> as full in  the FSM. My first thought was to update the FSM if there
>> isn't enough room on the page for a new tuple of the same size as the
>> one just 
>> inserted; that would be pretty close to the logic we have during
>> normal 
>> operation, where the FSM is updated when the tuple that we're about
>> to 
>> insert doesn't fit on the page. But because we don't know the
>> fillfactor 
>> during recovery, I don't think we can do that reliably.
> 
> With HOT, we tend to hover around the nearly-full state, so this seems
> like it will trigger repeatedly.

Hmm, true. Perhaps we should skip updating the FSM on HOT updates. After 
recovery, the new HOT-updated tuples are prunable anyway, so for 
inserting a new tuple, the page is almost as good as it was before the 
HOT update.

> Is it possible that we could put an extra field onto a heap_clean record
> to show remaining space. We would use it only for VACUUMs, not HOT, just
> as we do now.

Sure, we could do that. I'm more worried about "killing" full pages in the
FSM, though, than about accurately tracking pages with plenty of free space.

> I wonder if there is merit in having an XLogInsertMulti() which inserts
> multiple records in a batch as a way of reducing WALInsertLock traffic.
> It might be possible to piggyback FSM records onto the main heap
> changes.

Umm, in the version that was finally committed, FSM doesn't generate any 
extra WAL records (except for truncation).

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com


Re: Updating FSM on recovery

From
Tom Lane
Date:
Simon Riggs <simon@2ndQuadrant.com> writes:
> Or possibly an XLogInsertDeferred() which just queues up some work so
> the next time we call XLogInsert() it will insert the deferred work as
> well as the main work all in one lock cycle. It would only be usable for
> low priority info like FSM stuff that isn't needed for recovery. Maybe
> we could do that with hints also.

If it isn't needed for recovery, why would we be logging it at all?
        regards, tom lane


Re: Updating FSM on recovery

From
Simon Riggs
Date:
On Tue, 2008-10-28 at 12:16 -0400, Tom Lane wrote:
> Simon Riggs <simon@2ndQuadrant.com> writes:
> > Or possibly an XLogInsertDeferred() which just queues up some work so
> > the next time we call XLogInsert() it will insert the deferred work as
> > well as the main work all in one lock cycle. It would only be usable for
> > low priority info like FSM stuff that isn't needed for recovery. Maybe
> > we could do that with hints also.
> 
> If it isn't needed for recovery, why would we be logging it at all?

You just agreed that the info didn't need to be very accurate. There are a
few things on the server that aren't needed for recovery, but it might
be useful if they were logged occasionally to give roughly correct
values.

Contention on WALInsertLock seems to be a problem, yet writing WAL to
disk is not a bottleneck. Deferring writing it slightly to allow things
to be batched might be one way of smoothing the impact of that type of
operation. That might be better than a heuristic method of reducing the
number of inserts.

--
   Simon Riggs
   www.2ndQuadrant.com
   PostgreSQL Training, Services and Support



Re: Updating FSM on recovery

From
Heikki Linnakangas
Date:
Tom Lane wrote:
> As far as the ugliness in XLogRecordPageWithFreeSpace goes: couldn't you
> just call XLogReadBufferWithFork with init = true, and then initialize
> the page if PageIsNew?

With init=true, we don't even try to read the page from the disk (since 
8.3), so all FSM pages accessed during recovery would be zeroed out. I 
don't think that's what you intended.
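
Roughly, the bit of the buffer manager being described is (a simplified
sketch; variable names illustrative):

if (zeroPage)
    MemSet((char *) bufBlock, 0, BLCKSZ);    /* never touches the disk */
else
    smgrread(smgr, forkNum, blockNum, (char *) bufBlock);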

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com


Re: Updating FSM on recovery

From
Tom Lane
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> Tom Lane wrote:
>> As far as the ugliness in XLogRecordPageWithFreeSpace goes: couldn't you
>> just call XLogReadBufferWithFork with init = true, and then initialize
>> the page if PageIsNew?

> With init=true, we don't even try to read the page from the disk (since 
> 8.3), so all FSM pages accessed during recovery would be zeroed out. I 
> don't think that's what you intended.

Ah, right.  Maybe the API change you suggested in the comment is the
way to go.
        regards, tom lane


Re: Updating FSM on recovery

From
Heikki Linnakangas
Date:
Tom Lane wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
>> Tom Lane wrote:
>>> As far as the ugliness in XLogRecordPageWithFreeSpace goes: couldn't you
>>> just call XLogReadBufferWithFork with init = true, and then initialize
>>> the page if PageIsNew?
>
>> With init=true, we don't even try to read the page from the disk (since
>> 8.3), so all FSM pages accessed during recovery would be zeroed out. I
>> don't think that's what you intended.
>
> Ah, right.  Maybe the API change you suggested in the comment is the
> way to go.

Done, patch attached. But while I was hacking that, I realized another
problem:

Because changes to FSM pages are not WAL-logged, they can be "torn": at a
crash, one part of the page may be flushed to disk while another is not.
The FSM code will recover from internally inconsistent pages, caused by
torn pages or other errors, but we still have a problem if the FSM file
is extended, and the new page is torn. It can happen that the first part
of the page, containing the page header, doesn't make it to disk, but
other parts of the page do. ReadBuffer() checks that the page header is
valid, so it will throw an error on a torn page like that. ReadBuffer()
doesn't complain about a page that is all-zeros, but it's not in this
scenario.

The FSM would be perfectly happy to just initialize torn or otherwise
damaged pages, so I think we should add yet another mode to ReadBuffer()
to allow that. We could also treat read() errors as merely warnings in
that mode, effectively the same as with zero_damaged_pages=on.
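
In ReadBuffer()'s page-validation path that could look roughly like this
(a sketch only, using the mode name proposed below):

if (!PageHeaderIsValid((PageHeader) bufBlock))
{
    if (mode == RBM_INIT_ON_ERROR || zero_damaged_pages)
    {
        ereport(WARNING,
                (errcode(ERRCODE_DATA_CORRUPTED),
                 errmsg("invalid page header in block %u; zeroing out page",
                        blockNum)));
        MemSet((char *) bufBlock, 0, BLCKSZ);
    }
    else
        ereport(ERROR,
                (errcode(ERRCODE_DATA_CORRUPTED),
                 errmsg("invalid page header in block %u", blockNum)));
}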

The ReadBuffer() interface is already pretty complex, with all the
different variants. We should probably keep the good old ReadBuffer()
the same, for the sake of simplicity in the callers, but try to reduce
the number of other variants.

The current API is this:

Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
Buffer ReadBufferWithFork(Relation reln, ForkNumber forkNum,
                          BlockNumber blockNum);
Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                              BufferAccessStrategy strategy);
Buffer ReadOrZeroBuffer(Relation reln, ForkNumber forkNum,
                        BlockNumber blockNum);
Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
                                 ForkNumber forkNum, BlockNumber blockNum,
                                 bool zeroPage);

Here's my proposal for the new API:

typedef enum
{
    RBM_NORMAL,          /* checks header, ereport(ERROR) on errors */
    RBM_INIT,            /* just allocate a buffer, don't read from disk.
                          * Caller must initialize the page */
    RBM_INIT_ON_ERROR    /* read, but instead of ERRORing, return an
                          * all-zeros page */
} ReadBufferMode;

Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
Buffer ReadBufferExt(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
                     BufferAccessStrategy strategy, ReadBufferMode mode);
Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
                                 ForkNumber forkNum, BlockNumber blockNum,
                                 ReadBufferMode mode);
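
To illustrate, the existing variants would map onto the new API like this
(a sketch; NULL stands for the default strategy):

/* ReadBufferWithFork(rel, FSM_FORKNUM, blkno) becomes */
buf = ReadBufferExt(rel, FSM_FORKNUM, blkno, NULL, RBM_NORMAL);

/* ReadBufferWithStrategy(rel, blkno, strategy) becomes */
buf = ReadBufferExt(rel, MAIN_FORKNUM, blkno, strategy, RBM_NORMAL);

/* ReadOrZeroBuffer(rel, MAIN_FORKNUM, blkno) becomes */
buf = ReadBufferExt(rel, MAIN_FORKNUM, blkno, NULL, RBM_INIT);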

Thoughts?

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "miscadmin.h"
  #include "pgstat.h"
  #include "storage/bufmgr.h"
+ #include "storage/freespace.h"
  #include "storage/lmgr.h"
  #include "storage/procarray.h"
  #include "storage/smgr.h"
***************
*** 4029,4034 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4030,4036 ----
      int            nredirected;
      int            ndead;
      int            nunused;
+     Size        freespace;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;
***************
*** 4060,4065 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4062,4069 ----
                              nowunused, nunused,
                              clean_move);

+     freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
      /*
       * Note: we don't worry about updating the page's prunability hints.
       * At worst this will cause an extra prune cycle to occur soon.
***************
*** 4069,4074 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4073,4087 ----
      PageSetTLI(page, ThisTimeLineID);
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);
+
+     /*
+      * Update the FSM as well.
+      *
+      * XXX: We don't get here if the page was restored from full page image.
+      * We don't bother to update the FSM in that case, it doesn't need to be
+      * totally accurate anyway.
+      */
+     XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
  }

  static void
***************
*** 4212,4226 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      HeapTupleHeader htup;
      xl_heap_header xlhdr;
      uint32        newlen;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
!                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
!                                 true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 4225,4241 ----
      HeapTupleHeader htup;
      xl_heap_header xlhdr;
      uint32        newlen;
+     Size        freespace;
+     BlockNumber    blkno;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

+     blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
!         buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

***************
*** 4228,4236 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      }
      else
      {
!         buffer = XLogReadBuffer(xlrec->target.node,
!                              ItemPointerGetBlockNumber(&(xlrec->target.tid)),
!                                 false);
          if (!BufferIsValid(buffer))
              return;
          page = (Page) BufferGetPage(buffer);
--- 4243,4249 ----
      }
      else
      {
!         buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
          if (!BufferIsValid(buffer))
              return;
          page = (Page) BufferGetPage(buffer);
***************
*** 4268,4277 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4281,4305 ----
      offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
      if (offnum == InvalidOffsetNumber)
          elog(PANIC, "heap_insert_redo: failed to add tuple");
+
+     freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
      PageSetLSN(page, lsn);
      PageSetTLI(page, ThisTimeLineID);
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);
+
+     /*
+      * If the page is running low on free space, update the FSM as well.
+      * Arbitrarily, our definition of "low" is less than 20%. We can't do
+      * much better than that without knowing the fill-factor for the table.
+      *
+      * XXX: We don't get here if the page was restored from full page image.
+      * We don't bother to update the FSM in that case, it doesn't need to be
+      * totally accurate anyway.
+      */
+     if (freespace < BLCKSZ / 5)
+         XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
  }

  /*
***************
*** 4296,4301 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4324,4330 ----
      xl_heap_header xlhdr;
      int            hsize;
      uint32        newlen;
+     Size        freespace;

      if (record->xl_info & XLR_BKP_BLOCK_1)
      {
***************
*** 4453,4462 **** newsame:;
--- 4482,4513 ----
      offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
      if (offnum == InvalidOffsetNumber)
          elog(PANIC, "heap_update_redo: failed to add tuple");
+
+     freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
      PageSetLSN(page, lsn);
      PageSetTLI(page, ThisTimeLineID);
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);
+
+     /*
+      * If the page is running low on free space, update the FSM as well.
+      * Arbitrarily, our definition of "low" is less than 20%. We can't do
+      * much better than that without knowing the fill-factor for the table.
+      *
+      * However, don't update the FSM on HOT updates, because after crash
+      * recovery, either the old or the new tuple will certainly be dead and
+      * prunable. After pruning, the page will have roughly as much free space
+      * as it did before the update, assuming the new tuple is about the same
+      * size as the old one.
+      *
+      * XXX: We don't get here if the page was restored from full page image.
+      * We don't bother to update the FSM in that case, it doesn't need to be
+      * totally accurate anyway.
+      */
+     if (!hot_update && freespace < BLCKSZ / 5)
+         XLogRecordPageWithFreeSpace(xlrec->target.node,
+                     ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
  }

  static void
*** src/backend/access/transam/xlog.c
--- src/backend/access/transam/xlog.c
***************
*** 2898,2904 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
          blk += sizeof(BkpBlock);

          buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
!                                         true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 2898,2904 ----
          blk += sizeof(BkpBlock);

          buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
!                                         XLRBB_ZERO);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 228,234 **** XLogCheckInvalidPages(void)
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
!     return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init);
  }

  /*
--- 228,235 ----
  Buffer
  XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
  {
!     return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno,
!                                   init ? XLRBB_ZERO : XLRBB_EXISTS);
  }

  /*
***************
*** 238,244 **** XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
   */
  Buffer
  XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
!                        BlockNumber blkno, bool init)
  {
      BlockNumber lastblock;
      Buffer        buffer;
--- 239,245 ----
   */
  Buffer
  XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
!                        BlockNumber blkno, XLogReadBufferBehavior behavior)
  {
      BlockNumber lastblock;
      Buffer        buffer;
***************
*** 264,275 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
      if (blkno < lastblock)
      {
          /* page exists in file */
!         buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno, init);
      }
      else
      {
          /* hm, page doesn't exist in file */
!         if (!init)
          {
              log_invalid_page(rnode, forknum, blkno, false);
              return InvalidBuffer;
--- 265,277 ----
      if (blkno < lastblock)
      {
          /* page exists in file */
!         buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno,
!                                            behavior == XLRBB_ZERO);
      }
      else
      {
          /* hm, page doesn't exist in file */
!         if (behavior == XLRBB_EXISTS)
          {
              log_invalid_page(rnode, forknum, blkno, false);
              return InvalidBuffer;
***************
*** 291,297 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,

      LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

!     if (!init)
      {
          /* check that page has been initialized */
          Page        page = (Page) BufferGetPage(buffer);
--- 293,299 ----

      LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

!     if (behavior == XLRBB_EXISTS)
      {
          /* check that page has been initialized */
          Page        page = (Page) BufferGetPage(buffer);
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 203,208 **** RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
--- 203,238 ----
  }

  /*
+  * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
+  *        WAL replay
+  */
+ void
+ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
+                             Size spaceAvail)
+ {
+     int            new_cat = fsm_space_avail_to_cat(spaceAvail);
+     FSMAddress    addr;
+     uint16        slot;
+     BlockNumber blkno;
+     Buffer        buf;
+     Page        page;
+
+     /* Get the location of the FSM byte representing the heap block */
+     addr = fsm_get_location(heapBlk, &slot);
+     blkno = fsm_logical_to_physical(addr);
+
+     /* If the page doesn't exist already, extend */
+     buf = XLogReadBufferWithFork(rnode, FSM_FORKNUM, blkno, XLRBB_CAN_EXTEND);
+     page = BufferGetPage(buf);
+     if (PageIsNew(page))
+         PageInit(page, BLCKSZ, 0);
+
+     if (fsm_set_avail(page, slot, new_cat))
+         MarkBufferDirty(buf);
+     UnlockReleaseBuffer(buf);
+ }
+
+ /*
   * GetRecordedFreePage - return the amount of free space on a particular page,
   *        according to the FSM.
   */
***************
*** 504,509 **** static Buffer
--- 534,540 ----
  fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
  {
      BlockNumber blkno = fsm_logical_to_physical(addr);
+     Buffer buf;

      RelationOpenSmgr(rel);

***************
*** 518,524 **** fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
          else
              return InvalidBuffer;
      }
!     return ReadBufferWithFork(rel, FSM_FORKNUM, blkno);
  }

  /*
--- 549,564 ----
          else
              return InvalidBuffer;
      }
!     buf = ReadBufferWithFork(rel, FSM_FORKNUM, blkno);
!
!     /*
!      * An all-zeroes page could be left over if a backend extends the
!      * relation but crashes before initializing the page.
!      */
!     if (PageIsNew(BufferGetPage(buf)))
!         PageInit(BufferGetPage(buf), BLCKSZ, 0);
!
!     return buf;
  }

  /*
***************
*** 768,773 **** fsm_redo_truncate(xl_fsm_truncate *xlrec)
--- 808,814 ----
      uint16        first_removed_slot;
      BlockNumber fsmblk;
      Buffer        buf;
+     Page        page;

      /* Get the location in the FSM of the first removed heap block */
      first_removed_address = fsm_get_location(xlrec->nheapblocks,
***************
*** 779,801 **** fsm_redo_truncate(xl_fsm_truncate *xlrec)
       * replay of the smgr truncation record to remove completely unused
       * pages.
       */
!     buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk, false);
!     if (BufferIsValid(buf))
!     {
!         fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
!         MarkBufferDirty(buf);
!         UnlockReleaseBuffer(buf);
!     }
!     else
!     {
!         /*
!          * The page doesn't exist. Because FSM extensions are not WAL-logged,
!          * it's normal to have a truncation record for a page that doesn't
!          * exist. Tell xlogutils.c not to PANIC at the end of recovery
!          * because of the missing page
!          */
!         XLogTruncateRelation(xlrec->node, FSM_FORKNUM, fsmblk);
!     }
  }

  void
--- 820,834 ----
       * replay of the smgr truncation record to remove completely unused
       * pages.
       */
!     buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk,
!                                  XLRBB_CAN_EXTEND);
!     page = BufferGetPage(buf);
!     if (PageIsNew(page))
!         PageInit(page, BLCKSZ, 0);
!
!     fsm_truncate_avail(page, first_removed_slot);
!     MarkBufferDirty(buf);
!     UnlockReleaseBuffer(buf);
  }

  void
*** src/include/access/xlogutils.h
--- src/include/access/xlogutils.h
***************
*** 24,32 **** extern void XLogDropDatabase(Oid dbid);
  extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
                                   BlockNumber nblocks);

  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
  extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
!                                      BlockNumber blkno, bool init);

  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
--- 24,44 ----
  extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
                                   BlockNumber nblocks);

+ /*
+  * XLogReadBuffer() behavior in case the requested page doesn't exist.
+  */
+ typedef enum XLogReadBufferBehavior
+ {
+     XLRBB_EXISTS,        /* complain if page doesn't exist */
+     XLRBB_ZERO,            /* never read, just allocate a buffer, the caller
+                          * will initialize the page */
+     XLRBB_CAN_EXTEND    /* extend relation if page doesn't exist */
+ } XLogReadBufferBehavior;
+
  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
  extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
!                                      BlockNumber blkno,
!                                      XLogReadBufferBehavior behavior);

  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
*** src/include/storage/freespace.h
--- src/include/storage/freespace.h
***************
*** 27,32 **** extern BlockNumber RecordAndGetPageWithFreeSpace(Relation rel,
--- 27,34 ----
                                Size spaceNeeded);
  extern void RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
                                      Size spaceAvail);
+ extern void XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
+                                         Size spaceAvail);

  extern void FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks);
  extern void FreeSpaceMapVacuum(Relation rel);

Re: Updating FSM on recovery

From
Tom Lane
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> The FSM would be perfectly happy to just initialize torn or otherwise 
> damaged pages, so I think we should add yet another mode to ReadBuffer() 
> to allow that. We could also treat read() errors as merely warnings in 
> that mode, effectively the same as with zero_damaged_pages=on.

> The ReadBuffer() interface is already pretty complex, with all the 
> different variants. We should probably keep the good old ReadBuffer() 
> the same, for the sake of simplicity in the callers, but try to reduce 
> the number of other variants.

Indeed.  Did you see the discussion about the similarly-too-complex
heap_insert API a couple days ago in connection with bulk-write
scenarios?  The conclusion there was to try to shift stuff into a
bitmask options argument, in hopes that future additions might not
require touching every caller.  Can we do it similarly here?
        regards, tom lane


Re: Updating FSM on recovery

From
"Robert Haas"
Date:
> Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
> Buffer ReadBufferExt(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
>                      BufferAccessStrategy strategy, ReadBufferMode mode);
> Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
>                                  ForkNumber forkNum, BlockNumber blockNum,
>                                  ReadBufferMode mode);
>
> Thoughts?

I'm not sure why we would abbreviate Extended to Ext when nothing else
in here is abbreviated.  Seems needlessly inconsistent.

We may also want to rethink our approach to BufferAccessStrategy a
bit.  Right now, we don't admit that
GetBufferAccessStrategy(BAS_NORMAL) just returns a NULL pointer - we
expect the caller to get that strategy and later call
FreeBufferAccessStrategy on it, just as if it were a real object.
Particularly in light of this API change, I think we should just give
up on that.  Otherwise, a caller that wants to specify a fork number
or ReadBufferMode has to get and free an access strategy that doesn't
amount to anything.  Perhaps it would be sufficient to do this:

#define NormalBufferAccessStrategy NULL

That way, it would be easy to grep for any place where we used this to
get around a useless pair of get/free calls if we ever need to go back
and make a normal buffer access strategy into a real object.
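
A caller that only wants to specify a fork or mode could then simply write
(a sketch, using the names above):

buf = ReadBufferExt(rel, MAIN_FORKNUM, blkno,
                    NormalBufferAccessStrategy, RBM_NORMAL);

instead of wrapping the call in a GetBufferAccessStrategy(BAS_NORMAL) /
FreeBufferAccessStrategy pair.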

...Robert


Re: Updating FSM on recovery

From
Heikki Linnakangas
Date:
Tom Lane wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
>> The ReadBuffer() interface is already pretty complex, with all the
>> different variants. We should probably keep the good old ReadBuffer()
>> the same, for the sake of simplicity in the callers, but try to reduce
>> the number of other variants.
>
> Indeed.  Did you see the discussion about the similarly-too-complex
> heap_insert API a couple days ago in connection with bulk-write
> scenarios?  The conclusion there was to try to shift stuff into a
> bitmask options argument, in hopes that future additions might not
> require touching every caller.  Can we do it similarly here?

Hmm. I think an enum is better than a bitmask here. At the moment, we
need three different modes of operation:
1. Read the page as usual, throw an error on a corrupted page (ReadBuffer())
2. Read the page, zero the page on corruption (this is new)
3. Don't read the page from disk, just allocate a buffer (ReadOrZeroBuffer())

If we turned this into a bitmask, what would the bits be? Perhaps:

DONT_READ /* don't read the page from disk, just allocate buffer */
NO_ERROR_ON_CORRUPTION /* don't throw an error if page is corrupt */

With two bits, there are four different combinations. I don't think the
DONT_READ | NO_ERROR_ON_CORRUPTION combination makes much sense. Also,
negative arguments like that can be confusing, but if we inverted the
meanings, most callers would have to pass both flags to get the normal
behavior.

Looking into the crystal ball, there are two forthcoming features for this
interface that I can see:
1. Pin the buffer if the page is in buffer cache. If it's not, do
nothing. This is what Simon proposed for the B-tree vacuum interlocking,
and I can see that it might be useful elsewhere as well.
2. The posix_fadvise() thing. Or async I/O. It looks like it's going to
be a separate function you call before ReadBuffer(), but it could also
be implemented as a new mode to ReadBuffer() that just allocates a
buffer, issues a posix_fadvise(), and returns. You would then pass the
Buffer to another function to finish the read and make the contents of
the buffer valid.

Neither of these fits too well with the bitmask. Neither would make
sense with DONT_READ or NO_ERROR_ON_CORRUPTION.

So, attached is a patch using an enum. Barring objections, I'll commit this.

There is a conflict with Simon's hot standby patch, though. Simon's
patch adds yet another argument to XLogReadBufferWithFork(), to indicate
whether a normal exclusive lock or a cleanup lock is taken on the
buffer. I'm inclined to change the interface of XLogReadBufferExtended
(as it's now called, after this patch) so that it only pins the page,
and leave the locking to the caller.

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/gin/ginvacuum.c
--- src/backend/access/gin/ginvacuum.c
***************
*** 155,164 **** xlogVacuumPage(Relation index, Buffer buffer)
  static bool
  ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
  {
!     Buffer        buffer = ReadBufferWithStrategy(gvs->index, blkno, gvs->strategy);
!     Page        page = BufferGetPage(buffer);
      bool        hasVoidPage = FALSE;

      /*
       * We should be sure that we don't concurrent with inserts, insert process
       * never release root page until end (but it can unlock it and lock
--- 155,168 ----
  static bool
  ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer)
  {
!     Buffer        buffer;
!     Page        page;
      bool        hasVoidPage = FALSE;

+     buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
+                                 RBM_NORMAL, gvs->strategy);
+     page = BufferGetPage(buffer);
+
      /*
       * We should be sure that we don't concurrent with inserts, insert process
       * never release root page until end (but it can unlock it and lock
***************
*** 241,253 **** static void
  ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
                BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
  {
!     Buffer        dBuffer = ReadBufferWithStrategy(gvs->index, deleteBlkno, gvs->strategy);
!     Buffer        lBuffer = (leftBlkno == InvalidBlockNumber) ?
!     InvalidBuffer : ReadBufferWithStrategy(gvs->index, leftBlkno, gvs->strategy);
!     Buffer        pBuffer = ReadBufferWithStrategy(gvs->index, parentBlkno, gvs->strategy);
      Page        page,
                  parentPage;

      LockBuffer(dBuffer, GIN_EXCLUSIVE);
      if (!isParentRoot)            /* parent is already locked by
                                   * LockBufferForCleanup() */
--- 245,268 ----
  ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkno,
                BlockNumber parentBlkno, OffsetNumber myoff, bool isParentRoot)
  {
!     Buffer        dBuffer;
!     Buffer        lBuffer;
!     Buffer        pBuffer;
      Page        page,
                  parentPage;

+     dBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, deleteBlkno,
+                                  RBM_NORMAL, gvs->strategy);
+
+     if (leftBlkno != InvalidBlockNumber)
+         lBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, leftBlkno,
+                                      RBM_NORMAL, gvs->strategy);
+     else
+         lBuffer = InvalidBuffer;
+
+     pBuffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, parentBlkno,
+                                  RBM_NORMAL, gvs->strategy);
+
      LockBuffer(dBuffer, GIN_EXCLUSIVE);
      if (!isParentRoot)            /* parent is already locked by
                                   * LockBufferForCleanup() */
***************
*** 401,407 **** ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, DataPageDel
              me = parent->child;
      }

!     buffer = ReadBufferWithStrategy(gvs->index, blkno, gvs->strategy);
      page = BufferGetPage(buffer);

      Assert(GinPageIsData(page));
--- 416,423 ----
              me = parent->child;
      }

!     buffer = ReadBufferExtended(gvs->index, MAIN_FORKNUM, blkno,
!                                 RBM_NORMAL, gvs->strategy);
      page = BufferGetPage(buffer);

      Assert(GinPageIsData(page));
***************
*** 589,595 **** ginbulkdelete(PG_FUNCTION_ARGS)
      gvs.strategy = info->strategy;
      initGinState(&gvs.ginstate, index);

!     buffer = ReadBufferWithStrategy(index, blkno, info->strategy);

      /* find leaf page */
      for (;;)
--- 605,612 ----
      gvs.strategy = info->strategy;
      initGinState(&gvs.ginstate, index);

!     buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
!                                 RBM_NORMAL, info->strategy);

      /* find leaf page */
      for (;;)
***************
*** 621,627 **** ginbulkdelete(PG_FUNCTION_ARGS)
          Assert(blkno != InvalidBlockNumber);

          UnlockReleaseBuffer(buffer);
!         buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
      }

      /* right now we found leftmost page in entry's BTree */
--- 638,645 ----
          Assert(blkno != InvalidBlockNumber);

          UnlockReleaseBuffer(buffer);
!         buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
!                                     RBM_NORMAL, info->strategy);
      }

      /* right now we found leftmost page in entry's BTree */
***************
*** 663,669 **** ginbulkdelete(PG_FUNCTION_ARGS)
          if (blkno == InvalidBlockNumber)        /* rightmost page */
              break;

!         buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
          LockBuffer(buffer, GIN_EXCLUSIVE);
      }

--- 681,688 ----
          if (blkno == InvalidBlockNumber)        /* rightmost page */
              break;

!         buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
!                                     RBM_NORMAL, info->strategy);
          LockBuffer(buffer, GIN_EXCLUSIVE);
      }

***************
*** 718,724 **** ginvacuumcleanup(PG_FUNCTION_ARGS)

          vacuum_delay_point();

!         buffer = ReadBufferWithStrategy(index, blkno, info->strategy);
          LockBuffer(buffer, GIN_SHARE);
          page = (Page) BufferGetPage(buffer);

--- 737,744 ----

          vacuum_delay_point();

!         buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
!                                     RBM_NORMAL, info->strategy);
          LockBuffer(buffer, GIN_SHARE);
          page = (Page) BufferGetPage(buffer);

*** src/backend/access/gist/gistvacuum.c
--- src/backend/access/gist/gistvacuum.c
***************
*** 86,92 **** gistDeleteSubtree(GistVacuum *gv, BlockNumber blkno)
      Buffer        buffer;
      Page        page;

!     buffer = ReadBufferWithStrategy(gv->index, blkno, gv->strategy);
      LockBuffer(buffer, GIST_EXCLUSIVE);
      page = (Page) BufferGetPage(buffer);

--- 86,93 ----
      Buffer        buffer;
      Page        page;

!     buffer = ReadBufferExtended(gv->index, MAIN_FORKNUM, blkno, RBM_NORMAL,
!                                 gv->strategy);
      LockBuffer(buffer, GIST_EXCLUSIVE);
      page = (Page) BufferGetPage(buffer);

***************
*** 306,312 **** gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)

      vacuum_delay_point();

!     buffer = ReadBufferWithStrategy(gv->index, blkno, gv->strategy);
      LockBuffer(buffer, GIST_EXCLUSIVE);
      gistcheckpage(gv->index, buffer);
      page = (Page) BufferGetPage(buffer);
--- 307,314 ----

      vacuum_delay_point();

!     buffer = ReadBufferExtended(gv->index, MAIN_FORKNUM, blkno, RBM_NORMAL,
!                                 gv->strategy);
      LockBuffer(buffer, GIST_EXCLUSIVE);
      gistcheckpage(gv->index, buffer);
      page = (Page) BufferGetPage(buffer);
***************
*** 595,601 **** gistvacuumcleanup(PG_FUNCTION_ARGS)

          vacuum_delay_point();

!         buffer = ReadBufferWithStrategy(rel, blkno, info->strategy);
          LockBuffer(buffer, GIST_SHARE);
          page = (Page) BufferGetPage(buffer);

--- 597,604 ----

          vacuum_delay_point();

!         buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
!                                     info->strategy);
          LockBuffer(buffer, GIST_SHARE);
          page = (Page) BufferGetPage(buffer);

***************
*** 691,703 **** gistbulkdelete(PG_FUNCTION_ARGS)

      while (stack)
      {
!         Buffer        buffer = ReadBufferWithStrategy(rel, stack->blkno, info->strategy);
          Page        page;
          OffsetNumber i,
                      maxoff;
          IndexTuple    idxtuple;
          ItemId        iid;

          LockBuffer(buffer, GIST_SHARE);
          gistcheckpage(rel, buffer);
          page = (Page) BufferGetPage(buffer);
--- 694,708 ----

      while (stack)
      {
!         Buffer        buffer;
          Page        page;
          OffsetNumber i,
                      maxoff;
          IndexTuple    idxtuple;
          ItemId        iid;

+         buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno,
+                                     RBM_NORMAL, info->strategy);
          LockBuffer(buffer, GIST_SHARE);
          gistcheckpage(rel, buffer);
          page = (Page) BufferGetPage(buffer);
*** src/backend/access/hash/hashpage.c
--- src/backend/access/hash/hashpage.c
***************
*** 158,164 **** _hash_getinitbuf(Relation rel, BlockNumber blkno)
      if (blkno == P_NEW)
          elog(ERROR, "hash AM does not use P_NEW");

!     buf = ReadOrZeroBuffer(rel, MAIN_FORKNUM, blkno);

      LockBuffer(buf, HASH_WRITE);

--- 158,164 ----
      if (blkno == P_NEW)
          elog(ERROR, "hash AM does not use P_NEW");

!     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO, NULL);

      LockBuffer(buf, HASH_WRITE);

***************
*** 203,209 **** _hash_getnewbuf(Relation rel, BlockNumber blkno)
                   BufferGetBlockNumber(buf), blkno);
      }
      else
!         buf = ReadOrZeroBuffer(rel, MAIN_FORKNUM, blkno);

      LockBuffer(buf, HASH_WRITE);

--- 203,209 ----
                   BufferGetBlockNumber(buf), blkno);
      }
      else
!         buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO, NULL);

      LockBuffer(buf, HASH_WRITE);

***************
*** 231,237 **** _hash_getbuf_with_strategy(Relation rel, BlockNumber blkno,
      if (blkno == P_NEW)
          elog(ERROR, "hash AM does not use P_NEW");

!     buf = ReadBufferWithStrategy(rel, blkno, bstrategy);

      if (access != HASH_NOLOCK)
          LockBuffer(buf, access);
--- 231,237 ----
      if (blkno == P_NEW)
          elog(ERROR, "hash AM does not use P_NEW");

!     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, bstrategy);

      if (access != HASH_NOLOCK)
          LockBuffer(buf, access);
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 205,213 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
      }

      /* read page using selected strategy */
!     scan->rs_cbuf = ReadBufferWithStrategy(scan->rs_rd,
!                                            page,
!                                            scan->rs_strategy);
      scan->rs_cblock = page;

      if (!scan->rs_pageatatime)
--- 205,212 ----
      }

      /* read page using selected strategy */
!     scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
!                                        RBM_NORMAL, scan->rs_strategy);
      scan->rs_cblock = page;

      if (!scan->rs_pageatatime)
*** src/backend/access/nbtree/nbtree.c
--- src/backend/access/nbtree/nbtree.c
***************
*** 750,756 **** restart:
       * recycle all-zero pages, not fail.  Also, we want to use a nondefault
       * buffer access strategy.
       */
!     buf = ReadBufferWithStrategy(rel, blkno, info->strategy);
      LockBuffer(buf, BT_READ);
      page = BufferGetPage(buf);
      opaque = (BTPageOpaque) PageGetSpecialPointer(page);
--- 750,757 ----
       * recycle all-zero pages, not fail.  Also, we want to use a nondefault
       * buffer access strategy.
       */
!     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
!                              info->strategy);
      LockBuffer(buf, BT_READ);
      page = BufferGetPage(buf);
      opaque = (BTPageOpaque) PageGetSpecialPointer(page);
*** src/backend/access/transam/xlog.c
--- src/backend/access/transam/xlog.c
***************
*** 2897,2904 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
          memcpy(&bkpb, blk, sizeof(BkpBlock));
          blk += sizeof(BkpBlock);

!         buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
!                                         true);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

--- 2897,2904 ----
          memcpy(&bkpb, blk, sizeof(BkpBlock));
          blk += sizeof(BkpBlock);

!         buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
!                                         RBM_ZERO);
          Assert(BufferIsValid(buffer));
          page = (Page) BufferGetPage(buffer);

*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 200,205 **** XLogCheckInvalidPages(void)
--- 200,219 ----
      invalid_page_tab = NULL;
  }

+ /*
+  * XLogReadBuffer
+  *        A shorthand of XLogReadBufferExtended(), for reading from the main
+  *        fork.
+  *
+  * For legacy reasons, instead of a ReadBufferMode argument, this only
+  * supports RBM_ZERO (init == true) and RBM_NORMAL (init == false) modes.
+  */
+ Buffer
+ XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
+ {
+     return XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+                                   init ? RBM_ZERO : RBM_NORMAL);
+ }

  /*
   * XLogReadBuffer
***************
*** 211,244 **** XLogCheckInvalidPages(void)
   * expect that this is only used during single-process XLOG replay, but
   * some subroutines such as MarkBufferDirty will complain if we don't.)
   *
!  * If "init" is true then the caller intends to rewrite the page fully
!  * using the info in the XLOG record.  In this case we will extend the
!  * relation if needed to make the page exist, and we will not complain about
!  * the page being "new" (all zeroes); in fact, we usually will supply a
!  * zeroed buffer without reading the page at all, so as to avoid unnecessary
!  * failure if the page is present on disk but has corrupt headers.
   *
!  * If "init" is false then the caller needs the page to be valid already.
!  * If the page doesn't exist or contains zeroes, we return InvalidBuffer.
   * In this case the caller should silently skip the update on this page.
   * (In this situation, we expect that the page was later dropped or truncated.
   * If we don't see evidence of that later in the WAL sequence, we'll complain
   * at the end of WAL replay.)
   */
  Buffer
! XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
! {
!     return XLogReadBufferWithFork(rnode, MAIN_FORKNUM, blkno, init);
! }
!
! /*
!  * XLogReadBufferWithFork
!  *        Like XLogReadBuffer, but for reading other relation forks than
!  *        the main one.
!  */
! Buffer
! XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
!                        BlockNumber blkno, bool init)
  {
      BlockNumber lastblock;
      Buffer        buffer;
--- 225,246 ----
   * expect that this is only used during single-process XLOG replay, but
   * some subroutines such as MarkBufferDirty will complain if we don't.)
   *
!  * There are a couple of differences in behavior wrt. the "mode" argument,
!  * compared to ReadBufferExtended():
   *
!  * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
!  * return InvalidBuffer.
   * In this case the caller should silently skip the update on this page.
   * (In this situation, we expect that the page was later dropped or truncated.
   * If we don't see evidence of that later in the WAL sequence, we'll complain
   * at the end of WAL replay.)
+  *
+  * In RBM_ZERO and RBM_ZERO_ON_ERROR modes, if the page doesn't exist, the
+  * relation is extended with all-zeroes pages up to the given block number.
   */
  Buffer
! XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
!                        BlockNumber blkno, ReadBufferMode mode)
  {
      BlockNumber lastblock;
      Buffer        buffer;
***************
*** 264,275 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
      if (blkno < lastblock)
      {
          /* page exists in file */
!         buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno, init);
      }
      else
      {
          /* hm, page doesn't exist in file */
!         if (!init)
          {
              log_invalid_page(rnode, forknum, blkno, false);
              return InvalidBuffer;
--- 266,278 ----
      if (blkno < lastblock)
      {
          /* page exists in file */
!         buffer = ReadBufferWithoutRelcache(rnode, false, forknum, blkno,
!                                            mode, NULL);
      }
      else
      {
          /* hm, page doesn't exist in file */
!         if (mode == RBM_NORMAL)
          {
              log_invalid_page(rnode, forknum, blkno, false);
              return InvalidBuffer;
***************
*** 283,289 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
              if (buffer != InvalidBuffer)
                  ReleaseBuffer(buffer);
              buffer = ReadBufferWithoutRelcache(rnode, false, forknum,
!                                                P_NEW, false);
              lastblock++;
          }
          Assert(BufferGetBlockNumber(buffer) == blkno);
--- 286,292 ----
              if (buffer != InvalidBuffer)
                  ReleaseBuffer(buffer);
              buffer = ReadBufferWithoutRelcache(rnode, false, forknum,
!                                                P_NEW, mode, NULL);
              lastblock++;
          }
          Assert(BufferGetBlockNumber(buffer) == blkno);
***************
*** 291,297 **** XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,

      LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

!     if (!init)
      {
          /* check that page has been initialized */
          Page        page = (Page) BufferGetPage(buffer);
--- 294,300 ----

      LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

!     if (mode == RBM_NORMAL)
      {
          /* check that page has been initialized */
          Page        page = (Page) BufferGetPage(buffer);
*** src/backend/commands/analyze.c
--- src/backend/commands/analyze.c
***************
*** 911,917 **** acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
           * each tuple, but since we aren't doing much work per tuple, the
           * extra lock traffic is probably better avoided.
           */
!         targbuffer = ReadBufferWithStrategy(onerel, targblock, vac_strategy);
          LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
          targpage = BufferGetPage(targbuffer);
          maxoffset = PageGetMaxOffsetNumber(targpage);
--- 911,918 ----
           * each tuple, but since we aren't doing much work per tuple, the
           * extra lock traffic is probably better avoided.
           */
!         targbuffer = ReadBufferExtended(onerel, MAIN_FORKNUM, targblock,
!                                         RBM_NORMAL, vac_strategy);
          LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
          targpage = BufferGetPage(targbuffer);
          maxoffset = PageGetMaxOffsetNumber(targpage);
*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 1348,1354 **** scan_heap(VRelStats *vacrelstats, Relation onerel,

          vacuum_delay_point();

!         buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
          page = BufferGetPage(buf);

          /*
--- 1348,1355 ----

          vacuum_delay_point();

!         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
!                                  vac_strategy);
          page = BufferGetPage(buf);

          /*
***************
*** 1919,1925 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
          /*
           * Process this page of relation.
           */
!         buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);
          page = BufferGetPage(buf);

          vacpage->offsets_free = 0;
--- 1920,1927 ----
          /*
           * Process this page of relation.
           */
!         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno, RBM_NORMAL,
!                                  vac_strategy);
          page = BufferGetPage(buf);

          vacpage->offsets_free = 0;
***************
*** 2173,2181 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
                      nextTid = tp.t_data->t_ctid;
                      priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
                      /* assume block# is OK (see heap_fetch comments) */
!                     nextBuf = ReadBufferWithStrategy(onerel,
                                           ItemPointerGetBlockNumber(&nextTid),
!                                                      vac_strategy);
                      nextPage = BufferGetPage(nextBuf);
                      /* If bogus or unused slot, assume tp is end of chain */
                      nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
--- 2175,2183 ----
                      nextTid = tp.t_data->t_ctid;
                      priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
                      /* assume block# is OK (see heap_fetch comments) */
!                     nextBuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
                                           ItemPointerGetBlockNumber(&nextTid),
!                                          RBM_NORMAL, vac_strategy);
                      nextPage = BufferGetPage(nextBuf);
                      /* If bogus or unused slot, assume tp is end of chain */
                      nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
***************
*** 2318,2326 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
                          break;    /* out of check-all-items loop */
                      }
                      tp.t_self = vtlp->this_tid;
!                     Pbuf = ReadBufferWithStrategy(onerel,
                                       ItemPointerGetBlockNumber(&(tp.t_self)),
!                                                   vac_strategy);
                      Ppage = BufferGetPage(Pbuf);
                      Pitemid = PageGetItemId(Ppage,
                                     ItemPointerGetOffsetNumber(&(tp.t_self)));
--- 2320,2328 ----
                          break;    /* out of check-all-items loop */
                      }
                      tp.t_self = vtlp->this_tid;
!                     Pbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
                                       ItemPointerGetBlockNumber(&(tp.t_self)),
!                                      RBM_NORMAL, vac_strategy);
                      Ppage = BufferGetPage(Pbuf);
                      Pitemid = PageGetItemId(Ppage,
                                     ItemPointerGetOffsetNumber(&(tp.t_self)));
***************
*** 2402,2415 **** repair_frag(VRelStats *vacrelstats, Relation onerel,

                      /* Get page to move from */
                      tuple.t_self = vtmove[ti].tid;
!                     Cbuf = ReadBufferWithStrategy(onerel,
                                    ItemPointerGetBlockNumber(&(tuple.t_self)),
!                                                   vac_strategy);

                      /* Get page to move to */
!                     dst_buffer = ReadBufferWithStrategy(onerel,
!                                                         destvacpage->blkno,
!                                                         vac_strategy);

                      LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
                      if (dst_buffer != Cbuf)
--- 2404,2417 ----

                      /* Get page to move from */
                      tuple.t_self = vtmove[ti].tid;
!                     Cbuf = ReadBufferExtended(onerel, MAIN_FORKNUM,
                                    ItemPointerGetBlockNumber(&(tuple.t_self)),
!                                   RBM_NORMAL, vac_strategy);

                      /* Get page to move to */
!                     dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
!                                                     destvacpage->blkno,
!                                                     RBM_NORMAL, vac_strategy);

                      LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
                      if (dst_buffer != Cbuf)
***************
*** 2502,2510 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
                  if (i == num_fraged_pages)
                      break;        /* can't move item anywhere */
                  dst_vacpage = fraged_pages->pagedesc[i];
!                 dst_buffer = ReadBufferWithStrategy(onerel,
!                                                     dst_vacpage->blkno,
!                                                     vac_strategy);
                  LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
                  dst_page = BufferGetPage(dst_buffer);
                  /* if this page was not used before - clean it */
--- 2504,2512 ----
                  if (i == num_fraged_pages)
                      break;        /* can't move item anywhere */
                  dst_vacpage = fraged_pages->pagedesc[i];
!                 dst_buffer = ReadBufferExtended(onerel, MAIN_FORKNUM,
!                                                 dst_vacpage->blkno,
!                                                 RBM_NORMAL, vac_strategy);
                  LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
                  dst_page = BufferGetPage(dst_buffer);
                  /* if this page was not used before - clean it */
***************
*** 2681,2689 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
              Page        page;

              /* this page was not used as a move target, so must clean it */
!             buf = ReadBufferWithStrategy(onerel,
!                                          (*curpage)->blkno,
!                                          vac_strategy);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              page = BufferGetPage(buf);
              if (!PageIsEmpty(page))
--- 2683,2690 ----
              Page        page;

              /* this page was not used as a move target, so must clean it */
!             buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*curpage)->blkno,
!                                      RBM_NORMAL, vac_strategy);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              page = BufferGetPage(buf);
              if (!PageIsEmpty(page))
***************
*** 2770,2776 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
              int            uncnt = 0;
              int            num_tuples = 0;

!             buf = ReadBufferWithStrategy(onerel, vacpage->blkno, vac_strategy);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              page = BufferGetPage(buf);
              maxoff = PageGetMaxOffsetNumber(page);
--- 2771,2778 ----
              int            uncnt = 0;
              int            num_tuples = 0;

!             buf = ReadBufferExtended(onerel, MAIN_FORKNUM, vacpage->blkno,
!                                      RBM_NORMAL, vac_strategy);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              page = BufferGetPage(buf);
              maxoff = PageGetMaxOffsetNumber(page);
***************
*** 3150,3156 **** update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
              break;                /* no need to scan any further */
          if ((*curpage)->offsets_used == 0)
              continue;            /* this page was never used as a move dest */
!         buf = ReadBufferWithStrategy(rel, (*curpage)->blkno, vac_strategy);
          LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
          page = BufferGetPage(buf);
          max_offset = PageGetMaxOffsetNumber(page);
--- 3152,3159 ----
              break;                /* no need to scan any further */
          if ((*curpage)->offsets_used == 0)
              continue;            /* this page was never used as a move dest */
!         buf = ReadBufferExtended(rel, MAIN_FORKNUM, (*curpage)->blkno,
!                                  RBM_NORMAL, vac_strategy);
          LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
          page = BufferGetPage(buf);
          max_offset = PageGetMaxOffsetNumber(page);
***************
*** 3219,3227 **** vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)

          if ((*vacpage)->offsets_free > 0)
          {
!             buf = ReadBufferWithStrategy(onerel,
!                                          (*vacpage)->blkno,
!                                          vac_strategy);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              vacuum_page(onerel, buf, *vacpage);
              UnlockReleaseBuffer(buf);
--- 3222,3229 ----

          if ((*vacpage)->offsets_free > 0)
          {
!             buf = ReadBufferExtended(onerel, MAIN_FORKNUM, (*vacpage)->blkno,
!                                      RBM_NORMAL, vac_strategy);
              LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
              vacuum_page(onerel, buf, *vacpage);
              UnlockReleaseBuffer(buf);
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 301,307 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
              vacrelstats->num_index_scans++;
          }

!         buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);

          /* We need buffer cleanup lock so that we can prune HOT chains. */
          LockBufferForCleanup(buf);
--- 301,308 ----
              vacrelstats->num_index_scans++;
          }

!         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
!                                  RBM_NORMAL, vac_strategy);

          /* We need buffer cleanup lock so that we can prune HOT chains. */
          LockBufferForCleanup(buf);
***************
*** 618,624 **** lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
          vacuum_delay_point();

          tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
!         buf = ReadBufferWithStrategy(onerel, tblk, vac_strategy);
          LockBufferForCleanup(buf);
          tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);

--- 619,626 ----
          vacuum_delay_point();

          tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
!         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
!                                  vac_strategy);
          LockBufferForCleanup(buf);
          tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);

***************
*** 880,886 **** count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)

          blkno--;

!         buf = ReadBufferWithStrategy(onerel, blkno, vac_strategy);

          /* In this phase we only need shared access to the buffer */
          LockBuffer(buf, BUFFER_LOCK_SHARE);
--- 882,889 ----

          blkno--;

!         buf = ReadBufferExtended(onerel, MAIN_FORKNUM, blkno,
!                                  RBM_NORMAL, vac_strategy);

          /* In this phase we only need shared access to the buffer */
          LockBuffer(buf, BUFFER_LOCK_SHARE);
*** src/backend/storage/buffer/bufmgr.c
--- src/backend/storage/buffer/bufmgr.c
***************
*** 72,82 **** static bool IsForInput;
  static volatile BufferDesc *PinCountWaitBuf = NULL;


- static Buffer ReadBuffer_relcache(Relation reln, ForkNumber forkNum,
-         BlockNumber blockNum, bool zeroPage, BufferAccessStrategy strategy);
  static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
!                   ForkNumber forkNum, BlockNumber blockNum,
!                   bool zeroPage, BufferAccessStrategy strategy, bool *hit);
  static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
--- 72,81 ----
  static volatile BufferDesc *PinCountWaitBuf = NULL;


  static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf,
!                     ForkNumber forkNum, BlockNumber blockNum,
!                     ReadBufferMode mode, BufferAccessStrategy strategy,
!                     bool *hit);
  static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
  static void PinBuffer_Locked(volatile BufferDesc *buf);
  static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
***************
*** 96,101 **** static void AtProcExit_Buffers(int code, Datum arg);
--- 95,111 ----


  /*
+  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from the
+  *        main fork with RBM_NORMAL mode and default strategy.
+  */
+ Buffer
+ ReadBuffer(Relation reln, BlockNumber blockNum)
+ {
+     return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
+ }
+
+
+ /*
   * ReadBuffer -- returns a buffer containing the requested
   *        block of the requested relation.  If the blknum
   *        requested is P_NEW, extend the relation file and
***************
*** 107,181 **** static void AtProcExit_Buffers(int code, Datum arg);
   *        the block read.  The returned buffer has been pinned.
   *        Does not return on error --- elog's instead.
   *
!  * Assume when this function is called, that reln has been
!  *        opened already.
!  */
! Buffer
! ReadBuffer(Relation reln, BlockNumber blockNum)
! {
!     return ReadBuffer_relcache(reln, MAIN_FORKNUM, blockNum, false, NULL);
! }
!
! /*
!  * ReadBufferWithFork -- same as ReadBuffer, but for accessing relation
!  *        forks other than MAIN_FORKNUM.
!  */
! Buffer
! ReadBufferWithFork(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
! {
!     return ReadBuffer_relcache(reln, forkNum, blockNum, false, NULL);
! }
!
! /*
!  * ReadBufferWithStrategy -- same as ReadBuffer, except caller can specify
!  *        a nondefault buffer access strategy.  See buffer/README for details.
!  */
! Buffer
! ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
!                        BufferAccessStrategy strategy)
! {
!     return ReadBuffer_relcache(reln, MAIN_FORKNUM, blockNum, false, strategy);
! }
!
! /*
!  * ReadOrZeroBuffer -- like ReadBuffer, but if the page isn't in buffer
!  *        cache already, it's filled with zeros instead of reading it from
!  *        disk.  Useful when the caller intends to fill the page from scratch,
!  *        since this saves I/O and avoids unnecessary failure if the
!  *        page-on-disk has corrupt page headers.
!  *
!  *        Caution: do not use this to read a page that is beyond the relation's
!  *        current physical EOF; that is likely to cause problems in md.c when
!  *        the page is modified and written out.  P_NEW is OK, though.
!  */
! Buffer
! ReadOrZeroBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
! {
!     return ReadBuffer_relcache(reln, forkNum, blockNum, true, NULL);
! }
!
! /*
!  * ReadBufferWithoutRelcache -- like ReadBuffer, but doesn't require a
!  *        relcache entry for the relation. If zeroPage is true, this behaves
!  *        like ReadOrZeroBuffer rather than ReadBuffer.
   */
  Buffer
! ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
!                       ForkNumber forkNum, BlockNumber blockNum, bool zeroPage)
! {
!     bool hit;
!
!     SMgrRelation smgr = smgropen(rnode);
!     return ReadBuffer_common(smgr, isTemp, forkNum, blockNum, zeroPage, NULL, &hit);
! }
!
! /*
!  * ReadBuffer_relcache -- common logic for ReadBuffer-variants that
!  *        operate on a Relation.
!  */
! static Buffer
! ReadBuffer_relcache(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
!                     bool zeroPage, BufferAccessStrategy strategy)
  {
      bool hit;
      Buffer buf;
--- 117,146 ----
   *        the block read.  The returned buffer has been pinned.
   *        Does not return on error --- elog's instead.
   *
!  * Assume when this function is called, that reln has been opened already.
!  *
!  * In RBM_NORMAL mode, the page is read from disk, and the page header is
!  * validated. An error is thrown if the page header is not valid.
!  *
!  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
!  * valid, the page is zeroed instead of throwing an error. This is intended
!  * for non-critical data, where the caller is prepared to repair errors.
!  *
!  * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
!  * with zeros instead of reading it from disk.  Useful when the caller is
!  * going to fill the page from scratch, since this saves I/O and avoids
!  * unnecessary failure if the page-on-disk has corrupt page headers.
!  * Caution: do not use this mode to read a page that is beyond the relation's
!  * current physical EOF; that is likely to cause problems in md.c when
!  * the page is modified and written out. P_NEW is OK, though.
!  *
!  * If strategy is not NULL, a nondefault buffer access strategy is used.
!  * See buffer/README for details.
   */
  Buffer
! ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
!                    ReadBufferMode mode, BufferAccessStrategy strategy)
  {
      bool hit;
      Buffer buf;
***************
*** 189,200 **** ReadBuffer_relcache(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
       */
      pgstat_count_buffer_read(reln);
      buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, forkNum, blockNum,
!                             zeroPage, strategy, &hit);
      if (hit)
          pgstat_count_buffer_hit(reln);
      return buf;
  }

  /*
   * ReadBuffer_common -- common logic for all ReadBuffer variants
   *
--- 154,183 ----
       */
      pgstat_count_buffer_read(reln);
      buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, forkNum, blockNum,
!                             mode, strategy, &hit);
      if (hit)
          pgstat_count_buffer_hit(reln);
      return buf;
  }

+
+ /*
+  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
+  *        a relcache entry for the relation.
+  */
+ Buffer
+ ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
+                           ForkNumber forkNum, BlockNumber blockNum,
+                           ReadBufferMode mode, BufferAccessStrategy strategy)
+ {
+     bool hit;
+
+     SMgrRelation smgr = smgropen(rnode);
+     return ReadBuffer_common(smgr, isTemp, forkNum, blockNum, mode, strategy,
+                              &hit);
+ }
+
+
  /*
   * ReadBuffer_common -- common logic for all ReadBuffer variants
   *
***************
*** 202,208 **** ReadBuffer_relcache(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
   */
  static Buffer
  ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
!                   BlockNumber blockNum, bool zeroPage,
                    BufferAccessStrategy strategy, bool *hit)
  {
      volatile BufferDesc *bufHdr;
--- 185,191 ----
   */
  static Buffer
  ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
!                   BlockNumber blockNum, ReadBufferMode mode,
                    BufferAccessStrategy strategy, bool *hit)
  {
      volatile BufferDesc *bufHdr;
***************
*** 295,302 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
          bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
          if (!PageIsNew((Page) bufBlock))
              ereport(ERROR,
!                     (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u",
!                             blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode),
                       errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

          /*
--- 278,285 ----
          bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
          if (!PageIsNew((Page) bufBlock))
              ereport(ERROR,
!                     (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u/%u",
!                             blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode, forkNum),
                       errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

          /*
***************
*** 356,362 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
           * Read in the page, unless the caller intends to overwrite it and
           * just wants us to allocate a buffer.
           */
!         if (zeroPage)
              MemSet((char *) bufBlock, 0, BLCKSZ);
          else
          {
--- 339,345 ----
           * Read in the page, unless the caller intends to overwrite it and
           * just wants us to allocate a buffer.
           */
!         if (mode == RBM_ZERO)
              MemSet((char *) bufBlock, 0, BLCKSZ);
          else
          {
***************
*** 365,388 **** ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, ForkNumber forkNum,
              /* check for garbage data */
              if (!PageHeaderIsValid((PageHeader) bufBlock))
              {
!                 if (zero_damaged_pages)
                  {
                      ereport(WARNING,
                              (errcode(ERRCODE_DATA_CORRUPTED),
!                              errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page",
                                      blockNum,
                                      smgr->smgr_rnode.spcNode,
                                      smgr->smgr_rnode.dbNode,
!                                     smgr->smgr_rnode.relNode)));
                      MemSet((char *) bufBlock, 0, BLCKSZ);
                  }
                  else
                      ereport(ERROR,
                              (errcode(ERRCODE_DATA_CORRUPTED),
!                              errmsg("invalid page header in block %u of relation %u/%u/%u",
                                      blockNum, smgr->smgr_rnode.spcNode,
                                      smgr->smgr_rnode.dbNode,
!                                     smgr->smgr_rnode.relNode)));
              }
          }
      }
--- 348,372 ----
              /* check for garbage data */
              if (!PageHeaderIsValid((PageHeader) bufBlock))
              {
!                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
                  {
                      ereport(WARNING,
                              (errcode(ERRCODE_DATA_CORRUPTED),
!                              errmsg("invalid page header in block %u of relation %u/%u/%u/%u; zeroing out page",
                                      blockNum,
                                      smgr->smgr_rnode.spcNode,
                                      smgr->smgr_rnode.dbNode,
!                                     smgr->smgr_rnode.relNode,
!                                     forkNum)));
                      MemSet((char *) bufBlock, 0, BLCKSZ);
                  }
                  else
                      ereport(ERROR,
                              (errcode(ERRCODE_DATA_CORRUPTED),
!                              errmsg("invalid page header in block %u of relation %u/%u/%u/%u",
                                      blockNum, smgr->smgr_rnode.spcNode,
                                      smgr->smgr_rnode.dbNode,
!                                     smgr->smgr_rnode.relNode, forkNum)));
              }
          }
      }
***************
*** 1679,1688 **** PrintBufferLeakWarning(Buffer buffer)
      /* theoretically we should lock the bufhdr here */
      elog(WARNING,
           "buffer refcount leak: [%03d] "
!          "(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
           buffer,
           buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!          buf->tag.rnode.relNode,
           buf->tag.blockNum, buf->flags,
           buf->refcount, loccount);
  }
--- 1663,1672 ----
      /* theoretically we should lock the bufhdr here */
      elog(WARNING,
           "buffer refcount leak: [%03d] "
!          "(rel=%u/%u/%u, forkNum=%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
           buffer,
           buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!          buf->tag.rnode.relNode, buf->tag.forkNum,
           buf->tag.blockNum, buf->flags,
           buf->refcount, loccount);
  }
***************
*** 1991,2001 **** PrintBufferDescs(void)
      {
          /* theoretically we should lock the bufhdr here */
          elog(LOG,
!              "[%02d] (freeNext=%d, rel=%u/%u/%u, "
               "blockNum=%u, flags=0x%x, refcount=%u %d)",
               i, buf->freeNext,
               buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!              buf->tag.rnode.relNode,
               buf->tag.blockNum, buf->flags,
               buf->refcount, PrivateRefCount[i]);
      }
--- 1975,1985 ----
      {
          /* theoretically we should lock the bufhdr here */
          elog(LOG,
!              "[%02d] (freeNext=%d, rel=%u/%u/%u, forkNum=%u, "
               "blockNum=%u, flags=0x%x, refcount=%u %d)",
               i, buf->freeNext,
               buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!              buf->tag.rnode.relNode, buf->tag.forkNum,
               buf->tag.blockNum, buf->flags,
               buf->refcount, PrivateRefCount[i]);
      }
***************
*** 2015,2025 **** PrintPinnedBufs(void)
          {
              /* theoretically we should lock the bufhdr here */
              elog(LOG,
!                  "[%02d] (freeNext=%d, rel=%u/%u/%u, "
                   "blockNum=%u, flags=0x%x, refcount=%u %d)",
                   i, buf->freeNext,
                   buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!                  buf->tag.rnode.relNode,
                   buf->tag.blockNum, buf->flags,
                   buf->refcount, PrivateRefCount[i]);
          }
--- 1999,2009 ----
          {
              /* theoretically we should lock the bufhdr here */
              elog(LOG,
!                  "[%02d] (freeNext=%d, rel=%u/%u/%u, forkNum=%u, "
                   "blockNum=%u, flags=0x%x, refcount=%u %d)",
                   i, buf->freeNext,
                   buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
!                  buf->tag.rnode.relNode, buf->tag.forkNum,
                   buf->tag.blockNum, buf->flags,
                   buf->refcount, PrivateRefCount[i]);
          }
***************
*** 2654,2664 **** AbortBufferIO(void)
                  /* Buffer is pinned, so we can read tag without spinlock */
                  ereport(WARNING,
                          (errcode(ERRCODE_IO_ERROR),
!                          errmsg("could not write block %u of %u/%u/%u",
                                  buf->tag.blockNum,
                                  buf->tag.rnode.spcNode,
                                  buf->tag.rnode.dbNode,
!                                 buf->tag.rnode.relNode),
                           errdetail("Multiple failures --- write error might be permanent.")));
              }
          }
--- 2638,2648 ----
                  /* Buffer is pinned, so we can read tag without spinlock */
                  ereport(WARNING,
                          (errcode(ERRCODE_IO_ERROR),
!                          errmsg("could not write block %u of %u/%u/%u/%u",
                                  buf->tag.blockNum,
                                  buf->tag.rnode.spcNode,
                                  buf->tag.rnode.dbNode,
!                                 buf->tag.rnode.relNode, buf->tag.forkNum),
                           errdetail("Multiple failures --- write error might be permanent.")));
              }
          }
***************
*** 2676,2684 **** buffer_write_error_callback(void *arg)

      /* Buffer is pinned, so we can read the tag without locking the spinlock */
      if (bufHdr != NULL)
!         errcontext("writing block %u of relation %u/%u/%u",
                     bufHdr->tag.blockNum,
                     bufHdr->tag.rnode.spcNode,
                     bufHdr->tag.rnode.dbNode,
!                    bufHdr->tag.rnode.relNode);
  }
--- 2660,2669 ----

      /* Buffer is pinned, so we can read the tag without locking the spinlock */
      if (bufHdr != NULL)
!         errcontext("writing block %u of relation %u/%u/%u/%u",
                     bufHdr->tag.blockNum,
                     bufHdr->tag.rnode.spcNode,
                     bufHdr->tag.rnode.dbNode,
!                    bufHdr->tag.rnode.relNode,
!                    bufHdr->tag.forkNum);
  }
*** src/backend/storage/freespace/freespace.c
--- src/backend/storage/freespace/freespace.c
***************
*** 504,509 **** static Buffer
--- 504,510 ----
  fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
  {
      BlockNumber blkno = fsm_logical_to_physical(addr);
+     Buffer buf;

      RelationOpenSmgr(rel);

***************
*** 518,524 **** fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
          else
              return InvalidBuffer;
      }
!     return ReadBufferWithFork(rel, FSM_FORKNUM, blkno);
  }

  /*
--- 519,536 ----
          else
              return InvalidBuffer;
      }
!
!     /*
!      * Use ZERO_ON_ERROR mode, and initialize the page if necessary. The FSM
!      * information is not accurate anyway, so it's better to clear corrupt
!      * pages than error out. Since the FSM changes are not WAL-logged, the
!      * so-called torn page problem on crash can lead to pages with corrupt
!      * headers, for example.
!      */
!     buf = ReadBufferExtended(rel, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR, NULL);
!     if (PageIsNew(BufferGetPage(buf)))
!         PageInit(BufferGetPage(buf), BLCKSZ, 0);
!     return buf;
  }

  /*
***************
*** 779,801 **** fsm_redo_truncate(xl_fsm_truncate *xlrec)
       * replay of the smgr truncation record to remove completely unused
       * pages.
       */
!     buf = XLogReadBufferWithFork(xlrec->node, FSM_FORKNUM, fsmblk, false);
      if (BufferIsValid(buf))
      {
!         fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);
          MarkBufferDirty(buf);
          UnlockReleaseBuffer(buf);
      }
-     else
-     {
-         /*
-          * The page doesn't exist. Because FSM extensions are not WAL-logged,
-          * it's normal to have a truncation record for a page that doesn't
-          * exist. Tell xlogutils.c not to PANIC at the end of recovery
-          * because of the missing page
-          */
-         XLogTruncateRelation(xlrec->node, FSM_FORKNUM, fsmblk);
-     }
  }

  void
--- 791,808 ----
       * replay of the smgr truncation record to remove completely unused
       * pages.
       */
!     buf = XLogReadBufferExtended(xlrec->node, FSM_FORKNUM, fsmblk,
!                                  RBM_ZERO_ON_ERROR);
      if (BufferIsValid(buf))
      {
!         Page page = BufferGetPage(buf);
!
!         if (PageIsNew(page))
!             PageInit(page, BLCKSZ, 0);
!         fsm_truncate_avail(page, first_removed_slot);
          MarkBufferDirty(buf);
          UnlockReleaseBuffer(buf);
      }
  }

  void
*** src/include/access/xlogutils.h
--- src/include/access/xlogutils.h
***************
*** 12,17 ****
--- 12,18 ----
  #define XLOG_UTILS_H

  #include "storage/buf.h"
+ #include "storage/bufmgr.h"
  #include "storage/relfilenode.h"
  #include "storage/block.h"
  #include "utils/relcache.h"
***************
*** 25,32 **** extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
                                   BlockNumber nblocks);

  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
! extern Buffer XLogReadBufferWithFork(RelFileNode rnode, ForkNumber forknum,
!                                      BlockNumber blkno, bool init);

  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
--- 26,33 ----
                                   BlockNumber nblocks);

  extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
! extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
!                                      BlockNumber blkno, ReadBufferMode mode);

  extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
  extern void FreeFakeRelcacheEntry(Relation fakerel);
*** src/include/storage/bufmgr.h
--- src/include/storage/bufmgr.h
***************
*** 31,36 **** typedef enum BufferAccessStrategyType
--- 31,46 ----
      BAS_VACUUM                    /* VACUUM */
  } BufferAccessStrategyType;

+ /*
+  * Possible modes for ReadBufferExtended()
+  */
+ typedef enum
+ {
+     RBM_NORMAL,            /* Normal read */
+     RBM_ZERO,            /* Don't read from disk, caller will initialize */
+     RBM_ZERO_ON_ERROR    /* Read, but return an all-zeros page on error */
+ } ReadBufferMode;
+
  /* in globals.c ... this duplicates miscadmin.h */
  extern PGDLLIMPORT int NBuffers;

***************
*** 144,156 **** extern PGDLLIMPORT int32 *LocalRefCount;
   * prototypes for functions in bufmgr.c
   */
  extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
! extern Buffer ReadBufferWithFork(Relation reln, ForkNumber forkNum, BlockNumber blockNum);
! extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
!                        BufferAccessStrategy strategy);
! extern Buffer ReadOrZeroBuffer(Relation reln, ForkNumber forkNum,
!                                BlockNumber blockNum);
  extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
!                     ForkNumber forkNum, BlockNumber blockNum, bool zeroPage);
  extern void ReleaseBuffer(Buffer buffer);
  extern void UnlockReleaseBuffer(Buffer buffer);
  extern void MarkBufferDirty(Buffer buffer);
--- 154,165 ----
   * prototypes for functions in bufmgr.c
   */
  extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
! extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
!                                  BlockNumber blockNum, ReadBufferMode mode,
!                                  BufferAccessStrategy strategy);
  extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
!                         ForkNumber forkNum, BlockNumber blockNum,
!                         ReadBufferMode mode, BufferAccessStrategy strategy);
  extern void ReleaseBuffer(Buffer buffer);
  extern void UnlockReleaseBuffer(Buffer buffer);
  extern void MarkBufferDirty(Buffer buffer);
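
For reference, a minimal sketch of how the old ReadBuffer variants map onto
the consolidated API (the "rel", "blkno" and "vac_strategy" names below are
placeholders for illustration, not identifiers taken from the patch):

    /* old: buf = ReadBufferWithStrategy(rel, blkno, vac_strategy); */
    buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
                             vac_strategy);

    /* old: buf = ReadBufferWithFork(rel, FSM_FORKNUM, blkno); */
    buf = ReadBufferExtended(rel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL);

    /* old: buf = ReadOrZeroBuffer(rel, MAIN_FORKNUM, blkno); */
    buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO, NULL);

Plain ReadBuffer(rel, blkno) keeps its old behavior; it is now just a wrapper
around ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, NULL).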

Re: Updating FSM on recovery

From
Simon Riggs
Date:
On Thu, 2008-10-30 at 10:40 +0200, Heikki Linnakangas wrote:

> So, attached is a patch using an enum. Barring objections, I'll commit
> this.

I probably agree with the changes from reading your post, but I'd ask
that you hang fire on committing this for a few days.

It's just going to prevent Koichi and me from submitting clean
patches on F-Day, or it will cause us to spend time on rework before
we've even submitted the patch. I'd like to avoid the pileup for now,
though I don't have any problem with the rework after that point.

Thanks,

--
Simon Riggs           www.2ndQuadrant.com
PostgreSQL Training, Services and Support



Re: Updating FSM on recovery

From
Simon Riggs
Date:
On Thu, 2008-10-30 at 09:57 +0000, Simon Riggs wrote:
> On Thu, 2008-10-30 at 10:40 +0200, Heikki Linnakangas wrote:
> 
> > So, attached is a patch using an enum. Barring objections, I'll commit
> > this.
> 
> I probably agree with the changes from reading your post, but I'd ask
> that you hang fire on committing this for a few days.

The best thing from here is for me to just freeze my tree for the next few days.
It will make my submission a few days out of date, but we can fix that
up fairly quickly.

--
Simon Riggs           www.2ndQuadrant.com
PostgreSQL Training, Services and Support



Re: Updating FSM on recovery

From
Gregory Stark
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:

> Hmm. I think an enum is better than a bitmask here. At the moment, we need
> three different modes of operation:
> 1. Read the page as usual, throw an error on corrupted page (ReadBuffer())
> 2. Read the page, zero page on corruption (this is new)

Is this new? Would it make sense for zero_damaged_pages to use this? Perhaps
the enum should have an option to error on damaged pages, warn and zero
damaged pages, or just zero damaged pages.

We might also want different behaviour for pages for which the crc doesn't
match versus pages that have nonsensical page headers.

> 3. Don't read the page from disk, just allocate a buffer. (ReadOrZeroBuffer())

--
Gregory Stark
EnterpriseDB          http://www.enterprisedb.com
Ask me about EnterpriseDB's Slony Replication support!
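
A hedged sketch of the richer mode enum suggested above (the two extra RBM_*
names are invented for this sketch and do not appear in Heikki's patch):

    /*
     * Hypothetical extension of ReadBufferMode with separate modes for
     * erroring out, warning-and-zeroing, and silently zeroing pages with
     * invalid headers.  RBM_NORMAL and RBM_ZERO are from the patch; the
     * patch's RBM_ZERO_ON_ERROR is split into the two variants below.
     */
    typedef enum
    {
        RBM_NORMAL,                 /* read page, ERROR on invalid header */
        RBM_ZERO,                   /* don't read; caller initializes page */
        RBM_WARN_ZERO_ON_ERROR,     /* read, WARNING and zero invalid header */
        RBM_SILENT_ZERO_ON_ERROR    /* read, silently zero invalid header */
    } ReadBufferMode;

Under such a scheme, zero_damaged_pages could presumably be handled by
promoting RBM_NORMAL to RBM_WARN_ZERO_ON_ERROR inside ReadBuffer_common(),
much as the patch already folds zero_damaged_pages into its
RBM_ZERO_ON_ERROR check.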