Re: Visibility map, partial vacuums

Поиск
Список
Период
Сортировка
От Heikki Linnakangas
Тема Re: Visibility map, partial vacuums
Дата
Msg-id 492F051C.40403@enterprisedb.com
обсуждение исходный текст
Ответ на Re: Visibility map, partial vacuums  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Список pgsql-hackers
Heikki Linnakangas wrote:
> Here's an updated version, ...

And here it is, for real...

--
   Heikki Linnakangas
   EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/heap/Makefile
--- src/backend/access/heap/Makefile
***************
*** 12,17 **** subdir = src/backend/access/heap
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global

! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o

  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global

! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o

  include $(top_srcdir)/src/backend/common.mk
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 47,52 ****
--- 47,53 ----
  #include "access/transam.h"
  #include "access/tuptoaster.h"
  #include "access/valid.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 195,200 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 196,202 ----
      int            ntup;
      OffsetNumber lineoff;
      ItemId        lpp;
+     bool        all_visible;

      Assert(page < scan->rs_nblocks);

***************
*** 233,252 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
      lines = PageGetMaxOffsetNumber(dp);
      ntup = 0;

      for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
           lineoff <= lines;
           lineoff++, lpp++)
      {
          if (ItemIdIsNormal(lpp))
          {
-             HeapTupleData loctup;
              bool        valid;

!             loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
!             loctup.t_len = ItemIdGetLength(lpp);
!             ItemPointerSet(&(loctup.t_self), page, lineoff);

!             valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
              if (valid)
                  scan->rs_vistuples[ntup++] = lineoff;
          }
--- 235,266 ----
      lines = PageGetMaxOffsetNumber(dp);
      ntup = 0;

+     /*
+      * If the all-visible flag indicates that all tuples on the page are
+      * visible to everyone, we can skip the per-tuple visibility tests.
+      */
+     all_visible = PageIsAllVisible(dp);
+
      for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
           lineoff <= lines;
           lineoff++, lpp++)
      {
          if (ItemIdIsNormal(lpp))
          {
              bool        valid;

!             if (all_visible)
!                 valid = true;
!             else
!             {
!                 HeapTupleData loctup;
!
!                 loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
!                 loctup.t_len = ItemIdGetLength(lpp);
!                 ItemPointerSet(&(loctup.t_self), page, lineoff);

!                 valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
!             }
              if (valid)
                  scan->rs_vistuples[ntup++] = lineoff;
          }
***************
*** 1860,1865 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1874,1880 ----
      TransactionId xid = GetCurrentTransactionId();
      HeapTuple    heaptup;
      Buffer        buffer;
+     bool        all_visible_cleared = false;

      if (relation->rd_rel->relhasoids)
      {
***************
*** 1920,1925 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1935,1946 ----

      RelationPutHeapTuple(relation, buffer, heaptup);

+     if (PageIsAllVisible(BufferGetPage(buffer)))
+     {
+         all_visible_cleared = true;
+         PageClearAllVisible(BufferGetPage(buffer));
+     }
+
      /*
       * XXX Should we set PageSetPrunable on this page ?
       *
***************
*** 1943,1948 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1964,1970 ----
          Page        page = BufferGetPage(buffer);
          uint8        info = XLOG_HEAP_INSERT;

+         xlrec.all_visible_cleared = all_visible_cleared;
          xlrec.target.node = relation->rd_node;
          xlrec.target.tid = heaptup->t_self;
          rdata[0].data = (char *) &xlrec;
***************
*** 1994,1999 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 2016,2026 ----

      UnlockReleaseBuffer(buffer);

+     /* Clear the bit in the visibility map if necessary */
+     if (all_visible_cleared)
+         visibilitymap_clear(relation,
+                             ItemPointerGetBlockNumber(&(heaptup->t_self)));
+
      /*
       * If tuple is cachable, mark it for invalidation from the caches in case
       * we abort.  Note it is OK to do this after releasing the buffer, because
***************
*** 2070,2075 **** heap_delete(Relation relation, ItemPointer tid,
--- 2097,2103 ----
      Buffer        buffer;
      bool        have_tuple_lock = false;
      bool        iscombo;
+     bool        all_visible_cleared = false;

      Assert(ItemPointerIsValid(tid));

***************
*** 2216,2221 **** l1:
--- 2244,2255 ----
       */
      PageSetPrunable(page, xid);

+     if (PageIsAllVisible(page))
+     {
+         all_visible_cleared = true;
+         PageClearAllVisible(page);
+     }
+
      /* store transaction information of xact deleting the tuple */
      tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                                 HEAP_XMAX_INVALID |
***************
*** 2237,2242 **** l1:
--- 2271,2277 ----
          XLogRecPtr    recptr;
          XLogRecData rdata[2];

+         xlrec.all_visible_cleared = all_visible_cleared;
          xlrec.target.node = relation->rd_node;
          xlrec.target.tid = tp.t_self;
          rdata[0].data = (char *) &xlrec;
***************
*** 2281,2286 **** l1:
--- 2316,2325 ----
       */
      CacheInvalidateHeapTuple(relation, &tp);

+     /* Clear the bit in the visibility map if necessary */
+     if (all_visible_cleared)
+         visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+
      /* Now we can release the buffer */
      ReleaseBuffer(buffer);

***************
*** 2388,2393 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
--- 2427,2434 ----
      bool        have_tuple_lock = false;
      bool        iscombo;
      bool        use_hot_update = false;
+     bool        all_visible_cleared = false;
+     bool        all_visible_cleared_new = false;

      Assert(ItemPointerIsValid(otid));

***************
*** 2763,2768 **** l2:
--- 2804,2815 ----
          MarkBufferDirty(newbuf);
      MarkBufferDirty(buffer);

+     /*
+      * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL
+      * record, because log_heap_update looks at those flags to set the
+      * corresponding flags in the WAL record.
+      */
+
      /* XLOG stuff */
      if (!relation->rd_istemp)
      {
***************
*** 2778,2783 **** l2:
--- 2825,2842 ----
          PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
      }

+     /* Clear PD_ALL_VISIBLE flags */
+     if (PageIsAllVisible(BufferGetPage(buffer)))
+     {
+         all_visible_cleared = true;
+         PageClearAllVisible(BufferGetPage(buffer));
+     }
+     if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
+     {
+         all_visible_cleared_new = true;
+         PageClearAllVisible(BufferGetPage(newbuf));
+     }
+
      END_CRIT_SECTION();

      if (newbuf != buffer)
***************
*** 2791,2796 **** l2:
--- 2850,2861 ----
       */
      CacheInvalidateHeapTuple(relation, &oldtup);

+     /* Clear bits in visibility map */
+     if (all_visible_cleared)
+         visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+     if (all_visible_cleared_new)
+         visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+
      /* Now we can release the buffer(s) */
      if (newbuf != buffer)
          ReleaseBuffer(newbuf);
***************
*** 3412,3417 **** l3:
--- 3477,3487 ----
      LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

      /*
+      * Don't update the visibility map here. Locking a tuple doesn't
+      * change visibility info.
+      */
+
+     /*
       * Now that we have successfully marked the tuple as locked, we can
       * release the lmgr tuple lock, if we had it.
       */
***************
*** 3916,3922 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
--- 3986,3994 ----

      xlrec.target.node = reln->rd_node;
      xlrec.target.tid = from;
+     xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf));
      xlrec.newtid = newtup->t_self;
+     xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf));

      rdata[0].data = (char *) &xlrec;
      rdata[0].len = SizeOfHeapUpdate;
***************
*** 4185,4197 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
      OffsetNumber offnum;
      ItemId        lp = NULL;
      HeapTupleHeader htup;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node,
!                             ItemPointerGetBlockNumber(&(xlrec->target.tid)),
!                             false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
--- 4257,4281 ----
      OffsetNumber offnum;
      ItemId        lp = NULL;
      HeapTupleHeader htup;
+     BlockNumber    blkno;
+
+     blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+     /*
+      * The visibility map always needs to be updated, even if the heap page
+      * is already up-to-date.
+      */
+     if (xlrec->all_visible_cleared)
+     {
+         Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+         visibilitymap_clear(reln, blkno);
+         FreeFakeRelcacheEntry(reln);
+     }

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

!     buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
      if (!BufferIsValid(buffer))
          return;
      page = (Page) BufferGetPage(buffer);
***************
*** 4223,4228 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
--- 4307,4315 ----
      /* Mark the page as a candidate for pruning */
      PageSetPrunable(page, record->xl_xid);

+     if (xlrec->all_visible_cleared)
+         PageClearAllVisible(page);
+
      /* Make sure there is no forward chain link in t_ctid */
      htup->t_ctid = xlrec->target.tid;
      PageSetLSN(page, lsn);
***************
*** 4249,4259 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
      Size        freespace;
      BlockNumber    blkno;

      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

-     blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
-
      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
          buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
--- 4336,4357 ----
      Size        freespace;
      BlockNumber    blkno;

+     blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+     /*
+      * The visibility map always needs to be updated, even if the heap page
+      * is already up-to-date.
+      */
+     if (xlrec->all_visible_cleared)
+     {
+         Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+         visibilitymap_clear(reln, blkno);
+         FreeFakeRelcacheEntry(reln);
+     }
+
      if (record->xl_info & XLR_BKP_BLOCK_1)
          return;

      if (record->xl_info & XLOG_HEAP_INIT_PAGE)
      {
          buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
***************
*** 4307,4312 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4405,4414 ----

      PageSetLSN(page, lsn);
      PageSetTLI(page, ThisTimeLineID);
+
+     if (xlrec->all_visible_cleared)
+         PageClearAllVisible(page);
+
      MarkBufferDirty(buffer);
      UnlockReleaseBuffer(buffer);

***************
*** 4347,4352 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4449,4466 ----
      uint32        newlen;
      Size        freespace;

+     /*
+      * The visibility map always needs to be updated, even if the heap page
+      * is already up-to-date.
+      */
+     if (xlrec->all_visible_cleared)
+     {
+         Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+         visibilitymap_clear(reln,
+                             ItemPointerGetBlockNumber(&xlrec->target.tid));
+         FreeFakeRelcacheEntry(reln);
+     }
+
      if (record->xl_info & XLR_BKP_BLOCK_1)
      {
          if (samepage)
***************
*** 4411,4416 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4525,4533 ----
      /* Mark the page as a candidate for pruning */
      PageSetPrunable(page, record->xl_xid);

+     if (xlrec->all_visible_cleared)
+         PageClearAllVisible(page);
+
      /*
       * this test is ugly, but necessary to avoid thinking that insert change
       * is already applied
***************
*** 4426,4431 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4543,4559 ----

  newt:;

+     /*
+      * The visibility map always needs to be updated, even if the heap page
+      * is already up-to-date.
+      */
+     if (xlrec->new_all_visible_cleared)
+     {
+         Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+         visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+         FreeFakeRelcacheEntry(reln);
+     }
+
      if (record->xl_info & XLR_BKP_BLOCK_2)
          return;

***************
*** 4504,4509 **** newsame:;
--- 4632,4640 ----
      if (offnum == InvalidOffsetNumber)
          elog(PANIC, "heap_update_redo: failed to add tuple");

+     if (xlrec->new_all_visible_cleared)
+         PageClearAllVisible(page);
+
      freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */

      PageSetLSN(page, lsn);
*** /dev/null
--- src/backend/access/heap/visibilitymap.c
***************
*** 0 ****
--- 1,478 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.c
+  *      bitmap for tracking visibility of heap tuples
+  *
+  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *      $PostgreSQL$
+  *
+  * INTERFACE ROUTINES
+  *        visibilitymap_clear    - clear a bit in the visibility map
+  *        visibilitymap_pin    - pin a map page for setting a bit
+  *        visibilitymap_set    - set a bit in a previously pinned page
+  *        visibilitymap_test    - test if a bit is set
+  *
+  * NOTES
+  *
+  * The visibility map is a bitmap with one bit per heap page. A set bit means
+  * that all tuples on the page are visible to all transactions, and doesn't
+  * therefore need to be vacuumed. The map is conservative in the sense that we
+  * make sure that whenever a bit is set, we know the condition is true, but if
+  * a bit is not set, it might or might not be.
+  *
+  * There's no explicit WAL logging in the functions in this file. The callers
+  * must make sure that whenever a bit is cleared, the bit is cleared on WAL
+  * replay of the updating operation as well. Setting bits during recovery
+  * isn't necessary for correctness.
+  *
+  * Currently, the visibility map is only used as a hint, to speed up VACUUM.
+  * A corrupted visibility map won't cause data corruption, although it can
+  * make VACUUM skip pages that need vacuuming, until the next anti-wraparound
+  * vacuum. The visibility map is not used for anti-wraparound vacuums, because
+  * an anti-wraparound vacuum needs to freeze tuples and observe the latest xid
+  * present in the table, also on pages that don't have any dead tuples.
+  *
+  * Although the visibility map is just a hint at the moment, the PD_ALL_VISIBLE
+  * flag on heap pages *must* be correct.
+  *
+  * LOCKING
+  *
+  * In heapam.c, whenever a page is modified so that not all tuples on the
+  * page are visible to everyone anymore, the corresponding bit in the
+  * visibility map is cleared. The bit in the visibility map is cleared
+  * after releasing the lock on the heap page, to avoid holding the lock
+  * over possible I/O to read in the visibility map page.
+  *
+  * To set a bit, you need to hold a lock on the heap page. That prevents
+  * the race condition where VACUUM sees that all tuples on the page are
+  * visible to everyone, but another backend modifies the page before VACUUM
+  * sets the bit in the visibility map.
+  *
+  * When a bit is set, the LSN of the visibility map page is updated to make
+  * sure that the visibility map update doesn't get written to disk before the
+  * WAL record of the changes that made it possible to set the bit is flushed.
+  * But when a bit is cleared, we don't have to do that because it's always OK
+  * to clear a bit in the map from correctness point of view.
+  *
+  * TODO
+  *
+  * It would be nice to use the visibility map to skip visibility checkes in
+  * index scans.
+  *
+  * Currently, the visibility map is not 100% correct all the time.
+  * During updates, the bit in the visibility map is cleared after releasing
+  * the lock on the heap page. During the window after releasing the lock
+  * and clearing the bit in the visibility map, the bit in the visibility map
+  * is set, but the new insertion or deletion is not yet visible to other
+  * backends.
+  *
+  * That might actually be OK for the index scans, though. The newly inserted
+  * tuple wouldn't have an index pointer yet, so all tuples reachable from an
+  * index would still be visible to all other backends, and deletions wouldn't
+  * be visible to other backends yet.
+  *
+  * There's another hole in the way the PD_ALL_VISIBLE flag is set. When
+  * vacuum observes that all tuples are visible to all, it sets the flag on
+  * the heap page, and also sets the bit in the visibility map. If we then
+  * crash, and only the visibility map page was flushed to disk, we'll have
+  * a bit set in the visibility map, but the corresponding flag on the heap
+  * page is not set. If the heap page is then updated, the updater won't
+  * know to clear the bit in the visibility map.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+
+ #include "access/visibilitymap.h"
+ #include "storage/bufmgr.h"
+ #include "storage/bufpage.h"
+ #include "storage/lmgr.h"
+ #include "storage/smgr.h"
+ #include "utils/inval.h"
+
+ /*#define TRACE_VISIBILITYMAP */
+
+ /*
+  * Size of the bitmap on each visibility map page, in bytes. There's no
+  * extra headers, so the whole page minus except for the standard page header
+  * is used for the bitmap.
+  */
+ #define MAPSIZE (BLCKSZ - SizeOfPageHeaderData)
+
+ /* Number of bits allocated for each heap block. */
+ #define BITS_PER_HEAPBLOCK 1
+
+ /* Number of heap blocks we can represent in one byte. */
+ #define HEAPBLOCKS_PER_BYTE 8
+
+ /* Number of heap blocks we can represent in one visibility map page. */
+ #define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
+
+ /* Mapping from heap block number to the right bit in the visibility map */
+ #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+ #define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
+ #define HEAPBLK_TO_MAPBIT(x) ((x) % HEAPBLOCKS_PER_BYTE)
+
+ /* prototypes for internal routines */
+ static Buffer vm_readbuf(Relation rel, BlockNumber blkno, bool extend);
+ static void vm_extend(Relation rel, BlockNumber nvmblocks);
+
+
+ /*
+  *    visibilitymap_clear - clear a bit in visibility map
+  *
+  * Clear a bit in the visibility map, marking that not all tuples are
+  * visible to all transactions anymore.
+  */
+ void
+ visibilitymap_clear(Relation rel, BlockNumber heapBlk)
+ {
+     BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+     int            mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+     int            mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+     uint8        mask = 1 << mapBit;
+     Buffer        mapBuffer;
+     char       *map;
+
+ #ifdef TRACE_VISIBILITYMAP
+     elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+
+     mapBuffer = vm_readbuf(rel, mapBlock, false);
+     if (!BufferIsValid(mapBuffer))
+         return; /* nothing to do */
+
+     LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+     map = PageGetContents(BufferGetPage(mapBuffer));
+
+     if (map[mapByte] & mask)
+     {
+         map[mapByte] &= ~mask;
+
+         MarkBufferDirty(mapBuffer);
+     }
+
+     UnlockReleaseBuffer(mapBuffer);
+ }
+
+ /*
+  *    visibilitymap_pin - pin a map page for setting a bit
+  *
+  * Setting a bit in the visibility map is a two-phase operation. First, call
+  * visibilitymap_pin, to pin the visibility map page containing the bit for
+  * the heap page. Because that can require I/O to read the map page, you
+  * shouldn't hold a lock on the heap page while doing that. Then, call
+  * visibilitymap_set to actually set the bit.
+  *
+  * On entry, *buf should be InvalidBuffer or a valid buffer returned by
+  * an earlier call to visibilitymap_pin or visibilitymap_test on the same
+  * relation. On return, *buf is a valid buffer with the map page containing
+  * the the bit for heapBlk.
+  *
+  * If the page doesn't exist in the map file yet, it is extended.
+  */
+ void
+ visibilitymap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+     BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+
+     /* Reuse the old pinned buffer if possible */
+     if (BufferIsValid(*buf))
+     {
+         if (BufferGetBlockNumber(*buf) == mapBlock)
+             return;
+
+         ReleaseBuffer(*buf);
+     }
+     *buf = vm_readbuf(rel, mapBlock, true);
+ }
+
+ /*
+  *    visibilitymap_set - set a bit on a previously pinned page
+  *
+  * recptr is the LSN of the heap page. The LSN of the visibility map page is
+  * advanced to that, to make sure that the visibility map doesn't get flushed
+  * to disk before the update to the heap page that made all tuples visible.
+  *
+  * This is an opportunistic function. It does nothing, unless *buf
+  * contains the bit for heapBlk. Call visibilitymap_pin first to pin
+  * the right map page. This function doesn't do any I/O.
+  */
+ void
+ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
+                   Buffer *buf)
+ {
+     BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+     uint32        mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+     uint8        mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+     Page        page;
+     char       *map;
+
+ #ifdef TRACE_VISIBILITYMAP
+     elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+
+     /* Check that we have the right page pinned */
+     if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != mapBlock)
+         return;
+
+     page = BufferGetPage(*buf);
+     map = PageGetContents(page);
+     LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+
+     if (!(map[mapByte] & (1 << mapBit)))
+     {
+         map[mapByte] |= (1 << mapBit);
+
+         if (XLByteLT(PageGetLSN(page), recptr))
+             PageSetLSN(page, recptr);
+         PageSetTLI(page, ThisTimeLineID);
+         MarkBufferDirty(*buf);
+     }
+
+     LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+ }
+
+ /*
+  *    visibilitymap_test - test if a bit is set
+  *
+  * Are all tuples on heapBlk visible to all, according to the visibility map?
+  *
+  * On entry, *buf should be InvalidBuffer or a valid buffer returned by an
+  * earlier call to visibilitymap_pin or visibilitymap_test on the same
+  * relation. On return, *buf is a valid buffer with the map page containing
+  * the the bit for heapBlk, or InvalidBuffer. The caller is responsible for
+  * releasing *buf after it's done testing and setting bits.
+  */
+ bool
+ visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+     BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+     uint32        mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+     uint8        mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+     bool        result;
+     char       *map;
+
+ #ifdef TRACE_VISIBILITYMAP
+     elog(DEBUG1, "vm_test %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+
+     /* Reuse the old pinned buffer if possible */
+     if (BufferIsValid(*buf))
+     {
+         if (BufferGetBlockNumber(*buf) != mapBlock)
+         {
+             ReleaseBuffer(*buf);
+             *buf = InvalidBuffer;
+         }
+     }
+
+     if (!BufferIsValid(*buf))
+     {
+         *buf = vm_readbuf(rel, mapBlock, false);
+         if (!BufferIsValid(*buf))
+             return false;
+     }
+
+     map = PageGetContents(BufferGetPage(*buf));
+
+     /*
+      * We don't need to lock the page, as we're only looking at a single bit.
+      */
+     result = (map[mapByte] & (1 << mapBit)) ? true : false;
+
+     return result;
+ }
+
+ /*
+  *    visibilitymap_test - truncate the visibility map
+  */
+ void
+ visibilitymap_truncate(Relation rel, BlockNumber nheapblocks)
+ {
+     BlockNumber newnblocks;
+     /* last remaining block, byte, and bit */
+     BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+     uint32        truncByte  = HEAPBLK_TO_MAPBYTE(nheapblocks);
+     uint8        truncBit   = HEAPBLK_TO_MAPBIT(nheapblocks);
+
+ #ifdef TRACE_VISIBILITYMAP
+     elog(DEBUG1, "vm_truncate %s %d", RelationGetRelationName(rel), nheapblocks);
+ #endif
+
+     /*
+      * If no visibility map has been created yet for this relation, there's
+      * nothing to truncate.
+      */
+     if (!smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+         return;
+
+     /*
+      * Unless the new size is exactly at a visibility map page boundary, the
+      * tail bits in the last remaining map page, representing truncated heap
+      * blocks, need to be cleared. This is not only tidy, but also necessary
+      * because we don't get a chance to clear the bits if the heap is
+      * extended again.
+      */
+     if (truncByte != 0 || truncBit != 0)
+     {
+         Buffer mapBuffer;
+         Page page;
+         char *map;
+
+         newnblocks = truncBlock + 1;
+
+         mapBuffer = vm_readbuf(rel, truncBlock, false);
+         if (!BufferIsValid(mapBuffer))
+         {
+             /* nothing to do, the file was already smaller */
+             return;
+         }
+
+         page = BufferGetPage(mapBuffer);
+         map = PageGetContents(page);
+
+         LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+         /* Clear out the unwanted bytes. */
+         MemSet(&map[truncByte + 1], 0, MAPSIZE - (truncByte + 1));
+
+         /*
+          * Mask out the unwanted bits of the last remaining byte.
+          *
+          * ((1 << 0) - 1) = 00000000
+          * ((1 << 1) - 1) = 00000001
+          * ...
+          * ((1 << 6) - 1) = 00111111
+          * ((1 << 7) - 1) = 01111111
+          */
+         map[truncByte] &= (1 << truncBit) - 1;
+
+         MarkBufferDirty(mapBuffer);
+         UnlockReleaseBuffer(mapBuffer);
+     }
+     else
+         newnblocks = truncBlock;
+
+     if (smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM) < newnblocks)
+     {
+         /* nothing to do, the file was already smaller than requested size */
+         return;
+     }
+
+     smgrtruncate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, newnblocks,
+                  rel->rd_istemp);
+
+     /*
+      * Need to invalidate the relcache entry, because rd_vm_nblocks
+      * seen by other backends is no longer valid.
+      */
+     if (!InRecovery)
+         CacheInvalidateRelcache(rel);
+
+     rel->rd_vm_nblocks = newnblocks;
+ }
+
+ /*
+  * Read a visibility map page.
+  *
+  * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
+  * true, the visibility map file is extended.
+  */
+ static Buffer
+ vm_readbuf(Relation rel, BlockNumber blkno, bool extend)
+ {
+     Buffer buf;
+
+     RelationOpenSmgr(rel);
+
+     /*
+      * The current size of the visibility map fork is kept in relcache, to
+      * avoid reading beyond EOF. If we haven't cached the size of the map yet,
+      * do that first.
+      */
+     if (rel->rd_vm_nblocks == InvalidBlockNumber)
+     {
+         if (smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+             rel->rd_vm_nblocks = smgrnblocks(rel->rd_smgr,
+                                              VISIBILITYMAP_FORKNUM);
+         else
+             rel->rd_vm_nblocks = 0;
+     }
+
+     /* Handle requests beyond EOF */
+     if (blkno >= rel->rd_vm_nblocks)
+     {
+         if (extend)
+             vm_extend(rel, blkno + 1);
+         else
+             return InvalidBuffer;
+     }
+
+     /*
+      * Use ZERO_ON_ERROR mode, and initialize the page if necessary. It's
+      * always safe to clear bits, so it's better to clear corrupt pages than
+      * error out.
+      */
+     buf = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, blkno,
+                              RBM_ZERO_ON_ERROR, NULL);
+     if (PageIsNew(BufferGetPage(buf)))
+         PageInit(BufferGetPage(buf), BLCKSZ, 0);
+     return buf;
+ }
+
+ /*
+  * Ensure that the visibility map fork is at least vm_nblocks long, extending
+  * it if necessary with zeroed pages.
+  */
+ static void
+ vm_extend(Relation rel, BlockNumber vm_nblocks)
+ {
+     BlockNumber vm_nblocks_now;
+     Page pg;
+
+     pg = (Page) palloc(BLCKSZ);
+     PageInit(pg, BLCKSZ, 0);
+
+     /*
+      * We use the relation extension lock to lock out other backends trying
+      * to extend the visibility map at the same time. It also locks out
+      * extension of the main fork, unnecessarily, but extending the
+      * visibility map happens seldom enough that it doesn't seem worthwhile to
+      * have a separate lock tag type for it.
+      *
+      * Note that another backend might have extended or created the
+      * relation before we get the lock.
+      */
+     LockRelationForExtension(rel, ExclusiveLock);
+
+     /* Create the file first if it doesn't exist */
+     if ((rel->rd_vm_nblocks == 0 || rel->rd_vm_nblocks == InvalidBlockNumber)
+         && !smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+     {
+         smgrcreate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, false);
+         vm_nblocks_now = 0;
+     }
+     else
+         vm_nblocks_now = smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+
+     while (vm_nblocks_now < vm_nblocks)
+     {
+         smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
+                    (char *) pg, rel->rd_istemp);
+         vm_nblocks_now++;
+     }
+
+     UnlockRelationForExtension(rel, ExclusiveLock);
+
+     pfree(pg);
+
+     /* Update the relcache with the up-to-date size */
+     if (!InRecovery)
+         CacheInvalidateRelcache(rel);
+     rel->rd_vm_nblocks = vm_nblocks_now;
+ }
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 377,382 **** CreateFakeRelcacheEntry(RelFileNode rnode)
--- 377,383 ----

      rel->rd_targblock = InvalidBlockNumber;
      rel->rd_fsm_nblocks = InvalidBlockNumber;
+     rel->rd_vm_nblocks = InvalidBlockNumber;
      rel->rd_smgr = NULL;

      return rel;
*** src/backend/catalog/catalog.c
--- src/backend/catalog/catalog.c
***************
*** 54,60 ****
   */
  const char *forkNames[] = {
      "main", /* MAIN_FORKNUM */
!     "fsm"   /* FSM_FORKNUM */
  };

  /*
--- 54,61 ----
   */
  const char *forkNames[] = {
      "main", /* MAIN_FORKNUM */
!     "fsm",   /* FSM_FORKNUM */
!     "vm"   /* VISIBILITYMAP_FORKNUM */
  };

  /*
*** src/backend/catalog/storage.c
--- src/backend/catalog/storage.c
***************
*** 19,24 ****
--- 19,25 ----

  #include "postgres.h"

+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 175,180 **** void
--- 176,182 ----
  RelationTruncate(Relation rel, BlockNumber nblocks)
  {
      bool fsm;
+     bool vm;

      /* Open it at the smgr level if not already done */
      RelationOpenSmgr(rel);
***************
*** 187,192 **** RelationTruncate(Relation rel, BlockNumber nblocks)
--- 189,199 ----
      if (fsm)
          FreeSpaceMapTruncateRel(rel, nblocks);

+     /* Truncate the visibility map too if it exists. */
+     vm = smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+     if (vm)
+         visibilitymap_truncate(rel, nblocks);
+
      /*
       * We WAL-log the truncation before actually truncating, which
       * means trouble if the truncation fails. If we then crash, the WAL
***************
*** 217,228 **** RelationTruncate(Relation rel, BlockNumber nblocks)

          /*
           * Flush, because otherwise the truncation of the main relation
!          * might hit the disk before the WAL record of truncating the
!          * FSM is flushed. If we crashed during that window, we'd be
!          * left with a truncated heap, but the FSM would still contain
!          * entries for the non-existent heap pages.
           */
!         if (fsm)
              XLogFlush(lsn);
      }

--- 224,235 ----

          /*
           * Flush, because otherwise the truncation of the main relation
!          * might hit the disk before the WAL record, and the truncation of
!          * the FSM or visibility map. If we crashed during that window, we'd
!          * be left with a truncated heap, but the FSM or visibility map would
!          * still contain entries for the non-existent heap pages.
           */
!         if (fsm || vm)
              XLogFlush(lsn);
      }

*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 26,31 ****
--- 26,32 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
  #include "catalog/namespace.h"
***************
*** 2902,2907 **** move_chain_tuple(Relation rel,
--- 2903,2914 ----
      Size        tuple_len = old_tup->t_len;

      /*
+      * Clear the bits in the visibility map.
+      */
+     visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+     visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+
+     /*
       * make a modifiable copy of the source tuple.
       */
      heap_copytuple_with_tuple(old_tup, &newtup);
***************
*** 3005,3010 **** move_chain_tuple(Relation rel,
--- 3012,3021 ----

      END_CRIT_SECTION();

+     PageClearAllVisible(BufferGetPage(old_buf));
+     if (dst_buf != old_buf)
+         PageClearAllVisible(BufferGetPage(dst_buf));
+
      LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
      if (dst_buf != old_buf)
          LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
***************
*** 3107,3112 **** move_plain_tuple(Relation rel,
--- 3118,3140 ----

      END_CRIT_SECTION();

+     /*
+      * Clear the visible-to-all hint bits on the page, and bits in the
+      * visibility map. Normally we'd release the locks on the heap pages
+      * before updating the visibility map, but doesn't really matter here
+      * because we're holding an AccessExclusiveLock on the relation anyway.
+      */
+     if (PageIsAllVisible(dst_page))
+     {
+         PageClearAllVisible(dst_page);
+         visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+     }
+     if (PageIsAllVisible(old_page))
+     {
+         PageClearAllVisible(old_page);
+         visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+     }
+
      dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
      LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
      LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 40,45 ****
--- 40,46 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "catalog/storage.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
***************
*** 88,93 **** typedef struct LVRelStats
--- 89,95 ----
      int            max_dead_tuples;    /* # slots allocated in array */
      ItemPointer dead_tuples;    /* array of ItemPointerData */
      int            num_index_scans;
+     bool        scanned_all;    /* have we scanned all pages (this far)? */
  } LVRelStats;


***************
*** 102,108 **** static BufferAccessStrategy vac_strategy;

  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
!                Relation *Irel, int nindexes);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
                    IndexBulkDeleteResult **stats,
--- 104,110 ----

  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
!                Relation *Irel, int nindexes, bool scan_all);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
                    IndexBulkDeleteResult **stats,
***************
*** 141,146 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
--- 143,149 ----
      BlockNumber possibly_freeable;
      PGRUsage    ru0;
      TimestampTz starttime = 0;
+     bool        scan_all;

      pg_rusage_init(&ru0);

***************
*** 161,173 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
      vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));

      vacrelstats->num_index_scans = 0;

      /* Open all indexes of the relation */
      vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
      vacrelstats->hasindex = (nindexes > 0);

      /* Do the vacuuming */
!     lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);

      /* Done with indexes */
      vac_close_indexes(nindexes, Irel, NoLock);
--- 164,183 ----
      vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));

      vacrelstats->num_index_scans = 0;
+     vacrelstats->scanned_all = true;

      /* Open all indexes of the relation */
      vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
      vacrelstats->hasindex = (nindexes > 0);

+     /* Should we use the visibility map or scan all pages? */
+     if (vacstmt->freeze_min_age != -1)
+         scan_all = true;
+     else
+         scan_all = false;
+
      /* Do the vacuuming */
!     lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);

      /* Done with indexes */
      vac_close_indexes(nindexes, Irel, NoLock);
***************
*** 186,195 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
      /* Vacuum the Free Space Map */
      FreeSpaceMapVacuum(onerel);

!     /* Update statistics in pg_class */
      vac_update_relstats(onerel,
                          vacrelstats->rel_pages, vacrelstats->rel_tuples,
!                         vacrelstats->hasindex, FreezeLimit);

      /* report results to the stats collector, too */
      pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
--- 196,209 ----
      /* Vacuum the Free Space Map */
      FreeSpaceMapVacuum(onerel);

!     /*
!      * Update statistics in pg_class. We can only advance relfrozenxid if we
!      * didn't skip any pages.
!      */
      vac_update_relstats(onerel,
                          vacrelstats->rel_pages, vacrelstats->rel_tuples,
!                         vacrelstats->hasindex,
!                         vacrelstats->scanned_all ? FreezeLimit : InvalidOid);

      /* report results to the stats collector, too */
      pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
***************
*** 230,242 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
!                Relation *Irel, int nindexes)
  {
      BlockNumber nblocks,
                  blkno;
      HeapTupleData tuple;
      char       *relname;
      BlockNumber empty_pages,
                  vacuumed_pages;
      double        num_tuples,
                  tups_vacuumed,
--- 244,257 ----
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
!                Relation *Irel, int nindexes, bool scan_all)
  {
      BlockNumber nblocks,
                  blkno;
      HeapTupleData tuple;
      char       *relname;
      BlockNumber empty_pages,
+                 scanned_pages,
                  vacuumed_pages;
      double        num_tuples,
                  tups_vacuumed,
***************
*** 245,250 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 260,266 ----
      IndexBulkDeleteResult **indstats;
      int            i;
      PGRUsage    ru0;
+     Buffer        vmbuffer = InvalidBuffer;

      pg_rusage_init(&ru0);

***************
*** 254,260 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                      get_namespace_name(RelationGetNamespace(onerel)),
                      relname)));

!     empty_pages = vacuumed_pages = 0;
      num_tuples = tups_vacuumed = nkeep = nunused = 0;

      indstats = (IndexBulkDeleteResult **)
--- 270,276 ----
                      get_namespace_name(RelationGetNamespace(onerel)),
                      relname)));

!     empty_pages = vacuumed_pages = scanned_pages = 0;
      num_tuples = tups_vacuumed = nkeep = nunused = 0;

      indstats = (IndexBulkDeleteResult **)
***************
*** 278,286 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 294,321 ----
          OffsetNumber frozen[MaxOffsetNumber];
          int            nfrozen;
          Size        freespace;
+         bool        all_visible_according_to_vm = false;
+         bool        all_visible;
+
+         /*
+          * Skip pages that don't require vacuuming according to the
+          * visibility map.
+          */
+         if (!scan_all)
+         {
+             all_visible_according_to_vm =
+                 visibilitymap_test(onerel, blkno, &vmbuffer);
+             if (all_visible_according_to_vm)
+             {
+                 vacrelstats->scanned_all = false;
+                 continue;
+             }
+         }

          vacuum_delay_point();

+         scanned_pages++;
+
          /*
           * If we are close to overrunning the available space for dead-tuple
           * TIDs, pause and do a cycle of vacuuming before we tackle this page.
***************
*** 354,360 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
          {
              empty_pages++;
              freespace = PageGetHeapFreeSpace(page);
!             UnlockReleaseBuffer(buf);
              RecordPageWithFreeSpace(onerel, blkno, freespace);
              continue;
          }
--- 389,414 ----
          {
              empty_pages++;
              freespace = PageGetHeapFreeSpace(page);
!
!             if (!PageIsAllVisible(page))
!             {
!                 SetBufferCommitInfoNeedsSave(buf);
!                 PageSetAllVisible(page);
!             }
!
!             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
!
!             /* Update the visibility map */
!             if (!all_visible_according_to_vm)
!             {
!                 visibilitymap_pin(onerel, blkno, &vmbuffer);
!                 LockBuffer(buf, BUFFER_LOCK_SHARE);
!                 if (PageIsAllVisible(page))
!                     visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
!                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
!             }
!
!             ReleaseBuffer(buf);
              RecordPageWithFreeSpace(onerel, blkno, freespace);
              continue;
          }
***************
*** 371,376 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 425,431 ----
           * Now scan the page to collect vacuumable items and check for tuples
           * requiring freezing.
           */
+         all_visible = true;
          nfrozen = 0;
          hastup = false;
          prev_dead_count = vacrelstats->num_dead_tuples;
***************
*** 408,413 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 463,469 ----
              if (ItemIdIsDead(itemid))
              {
                  lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+                 all_visible = false;
                  continue;
              }

***************
*** 442,447 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 498,504 ----
                          nkeep += 1;
                      else
                          tupgone = true; /* we can delete the tuple */
+                     all_visible = false;
                      break;
                  case HEAPTUPLE_LIVE:
                      /* Tuple is good --- but let's do some validity checks */
***************
*** 449,454 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 506,540 ----
                          !OidIsValid(HeapTupleGetOid(&tuple)))
                          elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
                               relname, blkno, offnum);
+
+                     /*
+                      * Is the tuple definitely visible to all transactions?
+                      *
+                      * NB: Like with per-tuple hint bits, we can't set the
+                      * flag if the inserter committed asynchronously. See
+                      * SetHintBits for more info. Check that the
+                      * HEAP_XMIN_COMMITTED hint bit is set because of that.
+                      */
+                     if (all_visible)
+                     {
+                         TransactionId xmin;
+
+                         if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+                         {
+                             all_visible = false;
+                             break;
+                         }
+                         /*
+                          * The inserter definitely committed. But is it
+                          * old enough that everyone sees it as committed?
+                          */
+                         xmin = HeapTupleHeaderGetXmin(tuple.t_data);
+                         if (!TransactionIdPrecedes(xmin, OldestXmin))
+                         {
+                             all_visible = false;
+                             break;
+                         }
+                     }
                      break;
                  case HEAPTUPLE_RECENTLY_DEAD:

***************
*** 457,468 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 543,557 ----
                       * from relation.
                       */
                      nkeep += 1;
+                     all_visible = false;
                      break;
                  case HEAPTUPLE_INSERT_IN_PROGRESS:
                      /* This is an expected case during concurrent vacuum */
+                     all_visible = false;
                      break;
                  case HEAPTUPLE_DELETE_IN_PROGRESS:
                      /* This is an expected case during concurrent vacuum */
+                     all_visible = false;
                      break;
                  default:
                      elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
***************
*** 525,536 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,

          freespace = PageGetHeapFreeSpace(page);

          /* Remember the location of the last page with nonremovable tuples */
          if (hastup)
              vacrelstats->nonempty_pages = blkno + 1;

-         UnlockReleaseBuffer(buf);
-
          /*
           * If we remembered any tuples for deletion, then the page will be
           * visited again by lazy_vacuum_heap, which will compute and record
--- 614,656 ----

          freespace = PageGetHeapFreeSpace(page);

+         /* Update the all-visible flag on the page */
+         if (!PageIsAllVisible(page) && all_visible)
+         {
+             SetBufferCommitInfoNeedsSave(buf);
+             PageSetAllVisible(page);
+         }
+         else if (PageIsAllVisible(page) && !all_visible)
+         {
+             elog(WARNING, "PD_ALL_VISIBLE flag was incorrectly set");
+             SetBufferCommitInfoNeedsSave(buf);
+             PageClearAllVisible(page);
+
+             /*
+              * XXX: Normally, we would drop the lock on the heap page before
+              * updating the visibility map.
+              */
+             visibilitymap_clear(onerel, blkno);
+         }
+
+         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+         /* Update the visibility map */
+         if (!all_visible_according_to_vm && all_visible)
+         {
+             visibilitymap_pin(onerel, blkno, &vmbuffer);
+             LockBuffer(buf, BUFFER_LOCK_SHARE);
+             if (PageIsAllVisible(page))
+                 visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
+             LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+         }
+
+         ReleaseBuffer(buf);
+
          /* Remember the location of the last page with nonremovable tuples */
          if (hastup)
              vacrelstats->nonempty_pages = blkno + 1;

          /*
           * If we remembered any tuples for deletion, then the page will be
           * visited again by lazy_vacuum_heap, which will compute and record
***************
*** 560,565 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 680,692 ----
          vacrelstats->num_index_scans++;
      }

+     /* Release the pin on the visibility map page */
+     if (BufferIsValid(vmbuffer))
+     {
+         ReleaseBuffer(vmbuffer);
+         vmbuffer = InvalidBuffer;
+     }
+
      /* Do post-vacuum cleanup and statistics update for each index */
      for (i = 0; i < nindexes; i++)
          lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
***************
*** 572,580 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
                          tups_vacuumed, vacuumed_pages)));

      ereport(elevel,
!             (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
                      RelationGetRelationName(onerel),
!                     tups_vacuumed, num_tuples, nblocks),
               errdetail("%.0f dead row versions cannot be removed yet.\n"
                         "There were %.0f unused item pointers.\n"
                         "%u pages are entirely empty.\n"
--- 699,707 ----
                          tups_vacuumed, vacuumed_pages)));

      ereport(elevel,
!             (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages",
                      RelationGetRelationName(onerel),
!                     tups_vacuumed, num_tuples, scanned_pages, nblocks),
               errdetail("%.0f dead row versions cannot be removed yet.\n"
                         "There were %.0f unused item pointers.\n"
                         "%u pages are entirely empty.\n"
***************
*** 623,628 **** lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
--- 750,764 ----
          LockBufferForCleanup(buf);
          tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);

+         /*
+          * Before we let the page go, prune it. The primary reason is to
+          * update the visibility map in the common special case that we just
+          * vacuumed away the last tuple on the page that wasn't visible to
+          * everyone.
+          */
+         vacrelstats->tuples_deleted +=
+             heap_page_prune(onerel, buf, OldestXmin, false, false);
+
          /* Now that we've compacted the page, record its available space */
          page = BufferGetPage(buf);
          freespace = PageGetHeapFreeSpace(page);
*** src/backend/utils/cache/relcache.c
--- src/backend/utils/cache/relcache.c
***************
*** 305,310 **** AllocateRelationDesc(Relation relation, Form_pg_class relp)
--- 305,311 ----
      MemSet(relation, 0, sizeof(RelationData));
      relation->rd_targblock = InvalidBlockNumber;
      relation->rd_fsm_nblocks = InvalidBlockNumber;
+     relation->rd_vm_nblocks = InvalidBlockNumber;

      /* make sure relation is marked as having no open file yet */
      relation->rd_smgr = NULL;
***************
*** 1377,1382 **** formrdesc(const char *relationName, Oid relationReltype,
--- 1378,1384 ----
      relation = (Relation) palloc0(sizeof(RelationData));
      relation->rd_targblock = InvalidBlockNumber;
      relation->rd_fsm_nblocks = InvalidBlockNumber;
+     relation->rd_vm_nblocks = InvalidBlockNumber;

      /* make sure relation is marked as having no open file yet */
      relation->rd_smgr = NULL;
***************
*** 1665,1673 **** RelationReloadIndexInfo(Relation relation)
      heap_freetuple(pg_class_tuple);
      /* We must recalculate physical address in case it changed */
      RelationInitPhysicalAddr(relation);
!     /* Must reset targblock and fsm_nblocks in case rel was truncated */
      relation->rd_targblock = InvalidBlockNumber;
      relation->rd_fsm_nblocks = InvalidBlockNumber;
      /* Must free any AM cached data, too */
      if (relation->rd_amcache)
          pfree(relation->rd_amcache);
--- 1667,1679 ----
      heap_freetuple(pg_class_tuple);
      /* We must recalculate physical address in case it changed */
      RelationInitPhysicalAddr(relation);
!     /*
!      * Must reset targblock, fsm_nblocks and vm_nblocks in case rel was
!      * truncated
!      */
      relation->rd_targblock = InvalidBlockNumber;
      relation->rd_fsm_nblocks = InvalidBlockNumber;
+     relation->rd_vm_nblocks = InvalidBlockNumber;
      /* Must free any AM cached data, too */
      if (relation->rd_amcache)
          pfree(relation->rd_amcache);
***************
*** 1751,1756 **** RelationClearRelation(Relation relation, bool rebuild)
--- 1757,1763 ----
      {
          relation->rd_targblock = InvalidBlockNumber;
          relation->rd_fsm_nblocks = InvalidBlockNumber;
+         relation->rd_vm_nblocks = InvalidBlockNumber;
          if (relation->rd_rel->relkind == RELKIND_INDEX)
          {
              relation->rd_isvalid = false;        /* needs to be revalidated */
***************
*** 2346,2351 **** RelationBuildLocalRelation(const char *relname,
--- 2353,2359 ----

      rel->rd_targblock = InvalidBlockNumber;
      rel->rd_fsm_nblocks = InvalidBlockNumber;
+     rel->rd_vm_nblocks = InvalidBlockNumber;

      /* make sure relation is marked as having no open file yet */
      rel->rd_smgr = NULL;
***************
*** 3603,3608 **** load_relcache_init_file(void)
--- 3611,3617 ----
          rel->rd_smgr = NULL;
          rel->rd_targblock = InvalidBlockNumber;
          rel->rd_fsm_nblocks = InvalidBlockNumber;
+         rel->rd_vm_nblocks = InvalidBlockNumber;
          if (rel->rd_isnailed)
              rel->rd_refcnt = 1;
          else
*** src/include/access/heapam.h
--- src/include/access/heapam.h
***************
*** 153,158 **** extern void heap_page_prune_execute(Buffer buffer,
--- 153,159 ----
                          OffsetNumber *nowunused, int nunused,
                          bool redirect_move);
  extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
+ extern void heap_page_update_all_visible(Buffer buffer);

  /* in heap/syncscan.c */
  extern void ss_report_location(Relation rel, BlockNumber location);
*** src/include/access/htup.h
--- src/include/access/htup.h
***************
*** 601,609 **** typedef struct xl_heaptid
  typedef struct xl_heap_delete
  {
      xl_heaptid    target;            /* deleted tuple id */
  } xl_heap_delete;

! #define SizeOfHeapDelete    (offsetof(xl_heap_delete, target) + SizeOfHeapTid)

  /*
   * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
--- 601,610 ----
  typedef struct xl_heap_delete
  {
      xl_heaptid    target;            /* deleted tuple id */
+     bool all_visible_cleared;    /* PD_ALL_VISIBLE was cleared */
  } xl_heap_delete;

! #define SizeOfHeapDelete    (offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))

  /*
   * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
***************
*** 626,646 **** typedef struct xl_heap_header
  typedef struct xl_heap_insert
  {
      xl_heaptid    target;            /* inserted tuple id */
      /* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;

! #define SizeOfHeapInsert    (offsetof(xl_heap_insert, target) + SizeOfHeapTid)

  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
      xl_heaptid    target;            /* deleted tuple id */
      ItemPointerData newtid;        /* new inserted tuple id */
      /* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
      /* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;

! #define SizeOfHeapUpdate    (offsetof(xl_heap_update, newtid) + SizeOfIptrData)

  /*
   * This is what we need to know about vacuum page cleanup/redirect
--- 627,650 ----
  typedef struct xl_heap_insert
  {
      xl_heaptid    target;            /* inserted tuple id */
+     bool all_visible_cleared;    /* PD_ALL_VISIBLE was cleared */
      /* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;

! #define SizeOfHeapInsert    (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))

  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
      xl_heaptid    target;            /* deleted tuple id */
      ItemPointerData newtid;        /* new inserted tuple id */
+     bool all_visible_cleared;    /* PD_ALL_VISIBLE was cleared */
+     bool new_all_visible_cleared; /* same for the page of newtid */
      /* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
      /* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;

! #define SizeOfHeapUpdate    (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))

  /*
   * This is what we need to know about vacuum page cleanup/redirect
*** /dev/null
--- src/include/access/visibilitymap.h
***************
*** 0 ****
--- 1,30 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.h
+  *      visibility map interface
+  *
+  *
+  * Portions Copyright (c) 2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef VISIBILITYMAP_H
+ #define VISIBILITYMAP_H
+
+ #include "utils/rel.h"
+ #include "storage/buf.h"
+ #include "storage/itemptr.h"
+ #include "access/xlogdefs.h"
+
+ extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk);
+ extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
+                               Buffer *vmbuf);
+ extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
+                               XLogRecPtr recptr, Buffer *vmbuf);
+ extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
+ extern void visibilitymap_truncate(Relation rel, BlockNumber heapblk);
+
+ #endif   /* VISIBILITYMAP_H */
*** src/include/storage/bufpage.h
--- src/include/storage/bufpage.h
***************
*** 152,159 **** typedef PageHeaderData *PageHeader;
  #define PD_HAS_FREE_LINES    0x0001        /* are there any unused line pointers? */
  #define PD_PAGE_FULL        0x0002        /* not enough free space for new
                                           * tuple? */

! #define PD_VALID_FLAG_BITS    0x0003        /* OR of all valid pd_flags bits */

  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
--- 152,161 ----
  #define PD_HAS_FREE_LINES    0x0001        /* are there any unused line pointers? */
  #define PD_PAGE_FULL        0x0002        /* not enough free space for new
                                           * tuple? */
+ #define PD_ALL_VISIBLE        0x0004        /* all tuples on page are visible to
+                                          * everyone */

! #define PD_VALID_FLAG_BITS    0x0007        /* OR of all valid pd_flags bits */

  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
***************
*** 336,341 **** typedef PageHeaderData *PageHeader;
--- 338,350 ----
  #define PageClearFull(page) \
      (((PageHeader) (page))->pd_flags &= ~PD_PAGE_FULL)

+ #define PageIsAllVisible(page) \
+     (((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
+ #define PageSetAllVisible(page) \
+     (((PageHeader) (page))->pd_flags |= PD_ALL_VISIBLE)
+ #define PageClearAllVisible(page) \
+     (((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
+
  #define PageIsPrunable(page, oldestxmin) \
  ( \
      AssertMacro(TransactionIdIsNormal(oldestxmin)), \
*** src/include/storage/relfilenode.h
--- src/include/storage/relfilenode.h
***************
*** 24,37 **** typedef enum ForkNumber
  {
      InvalidForkNumber = -1,
      MAIN_FORKNUM = 0,
!     FSM_FORKNUM
      /*
       * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
       * forkNames array in catalog.c
       */
  } ForkNumber;

! #define MAX_FORKNUM        FSM_FORKNUM

  /*
   * RelFileNode must provide all that we need to know to physically access
--- 24,38 ----
  {
      InvalidForkNumber = -1,
      MAIN_FORKNUM = 0,
!     FSM_FORKNUM,
!     VISIBILITYMAP_FORKNUM
      /*
       * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
       * forkNames array in catalog.c
       */
  } ForkNumber;

! #define MAX_FORKNUM        VISIBILITYMAP_FORKNUM

  /*
   * RelFileNode must provide all that we need to know to physically access
*** src/include/utils/rel.h
--- src/include/utils/rel.h
***************
*** 195,202 **** typedef struct RelationData
      List       *rd_indpred;        /* index predicate tree, if any */
      void       *rd_amcache;        /* available for use by index AM */

!     /* size of the FSM, or InvalidBlockNumber if not known yet */
      BlockNumber    rd_fsm_nblocks;

      /* use "struct" here to avoid needing to include pgstat.h: */
      struct PgStat_TableStatus *pgstat_info;        /* statistics collection area */
--- 195,206 ----
      List       *rd_indpred;        /* index predicate tree, if any */
      void       *rd_amcache;        /* available for use by index AM */

!     /*
!      * sizes of the free space and visibility map forks, or InvalidBlockNumber
!      * if not known yet
!      */
      BlockNumber    rd_fsm_nblocks;
+     BlockNumber    rd_vm_nblocks;

      /* use "struct" here to avoid needing to include pgstat.h: */
      struct PgStat_TableStatus *pgstat_info;        /* statistics collection area */

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Heikki Linnakangas
Дата:
Сообщение: Re: [PATCHES] GIN improvements
Следующее
От: Zdenek Kotala
Дата:
Сообщение: Re: Thread safety