Re: In-placre persistance change of a relation

Поиск
Список
Период
Сортировка
От Kyotaro Horiguchi
Тема Re: In-placre persistance change of a relation
Дата
Msg-id 20211220.152823.1386247767464518843.horikyota.ntt@gmail.com
обсуждение исходный текст
Ответ на RE: In-placre persistance change of a relation  (Jakub Wartak <Jakub.Wartak@tomtom.com>)
Ответы Re: In-placre persistance change of a relation  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
RE: In-placre persistance change of a relation  (Jakub Wartak <Jakub.Wartak@tomtom.com>)
Список pgsql-hackers
Hello, Jakub.

At Fri, 17 Dec 2021 09:10:30 +0000, Jakub Wartak <Jakub.Wartak@tomtom.com> wrote in 
> the patch didn't apply clean (it's from March; some hunks were failing), so I've fixed it and the combined git
format-patchis attached. It did conflict with the following:
 

Thanks for looking this. Also thanks for Justin for finding the silly
use-after-free bug. (Now I see the regression test fails and I'm not
sure how come I didn't find this before.)

>     b0483263dda - Add support for SET ACCESS METHOD in ALTER TABLE
>     7b565843a94 - Add call to object access hook at the end of table rewrite in ALTER TABLE
>     9ce346eabf3 - Report progress of startup operations that take a long time.
>     f10f0ae420 - Replace RelationOpenSmgr() with RelationGetSmgr().
> 
> I'm especially worried if I didn't screw up something/forgot something related to the last one (rd->rd_smgr changes),
butI'm getting "All 210 tests passed".
 

About the last one, all rel->rd_smgr acesses need to be repalced with
RelationGetSmgr(). On the other hand we can simply remove
RelationOpenSmgr() calls since the target smgrrelation is guaranteed
to be loaded by RelationGetSmgr().

The fix you made for RelationCreate/DropInitFork is correct and
changes you made would work, but I prefer that the code not being too
permissive for unknown (or unexpected) states.

> Basic demonstration of this patch (with wal_level=minimal):
>     create unlogged table t6 (id bigint, t text);
>     -- produces 110GB table, takes ~5mins
>     insert into t6 select nextval('s1'), repeat('A', 1000) from generate_series(1, 100000000);
>     alter table t6 set logged;
>         on baseline SET LOGGED takes: ~7min10s
>         on patched SET LOGGED takes: 25s
> 
> So basically one can - thanks to this patch - use his application (performing classic INSERTs/UPDATEs/DELETEs, so
withoutthe need to rewrite to use COPY) and perform literally batch upload and then just switch the tables to LOGGED. 
 

This result is significant. That operation finally requires WAL writes
but I was not sure how much gain FPIs (or bulk WAL logging) gives in
comparison to operational WALs.

> Some more intensive testing also looks good, assuming table prepared to put pressure on WAL:
>     create unlogged table t_unlogged (id bigint, t text) partition by hash (id);
>     create unlogged table t_unlogged_h0 partition of t_unlogged  FOR VALUES WITH (modulus 4, remainder 0);
>     [..]
>     create unlogged table t_unlogged_h3 partition of t_unlogged  FOR VALUES WITH (modulus 4, remainder 3);
> 
> Workload would still be pretty heavy on LWLock/BufferContent,WALInsert and Lock/extend .
>     t_logged.sql = insert into t_logged select nextval('s1'), repeat('A', 1000) from generate_series(1, 1000); #
accordingto pg_wal_stats.wal_bytes generates ~1MB of WAL
 
>     t_unlogged.sql = insert into t_unlogged select nextval('s1'), repeat('A', 1000) from generate_series(1, 1000); #
accordingto pg_wal_stats.wal_bytes generates ~3kB of WAL
 
> 
> so using: pgbench -f <tabletypetest>.sql -T 30 -P 1 -c 32 -j 3 t 
> with synchronous_commit =ON(default):
>     with t_logged.sql:   tps = 229 (lat avg = 138ms)
>     with t_unlogged.sql  tps = 283 (lat avg = 112ms) # almost all on LWLock/WALWrite
> with synchronous_commit =OFF:
>     with t_logged.sql:   tps = 413 (lat avg = 77ms)
>     with t_unloged.sql:  tps = 782 (lat avg = 40ms)
> Afterwards switching the unlogged ~16GB partitions takes 5s per partition. 
> 
> As the thread didn't get a lot of traction, I've registered it into current commitfest
https://commitfest.postgresql.org/36/3461/with You as the author and in 'Ready for review' state.
 
> 
> I think it behaves as almost finished one and apparently after reading all those discussions that go back over
10years+time span about this feature, and lot of failed effort towards wal_level=noWAL I think it would be nice to
finallystart getting some of that of it into the core.
 

Thanks for taking the performance benchmark.

I didn't register for some reasons.

1. I'm not sure that we want to have the new mark files.

2. Aside of possible bugs, I'm not confident that the crash-safety of
  this patch is actually water-tight. At least we need tests for some
  failure cases.

3. As mentioned in transam/README, failure in removing smgr mark files
   leads to immediate shut down.  I'm not sure this behavior is acceptable.

4. Including the reasons above, this is not fully functionally.
   For example, if we execute the following commands on primary,
   replica dones't work correctly. (boom!)

   =# CREATE UNLOGGED TABLE t (a int);
   =# ALTER TABLE t SET LOGGED;


The following fixes are done in the attched v8.

- Rebased. Referring to Jakub and Justin's work, I replaced direct
  access to ->rd_smgr with RelationGetSmgr() and removed calls to
  RelationOpenSmgr(). I still separate the "ALTER TABLE ALL IN
  TABLESPACE SET LOGGED/UNLOGGED" statement part.

- Fixed RelationCreate/DropInitFork's behavior for non-target
  relations. (From Jakub's work).

- Fixed wording of some comments.

- As revisited, I found a bug around recovery. If the logged-ness of a
  relation gets flipped repeatedly in a transaction, duplicate
  pending-delete entries are accumulated during recovery and work in a
  wrong way. sgmr_redo now adds up to one entry for a action.

- The issue 4 above is not fixed (yet).

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From c665734c9e056e80a0d56281011b95e55ea14507 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v8 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, currently it runs heap rewrite which causes large amount of
file I/O.  This patch makes the command run without heap rewrite.
Addition to that, SET LOGGED while wal_level > minimal emits WAL using
XLOG_FPI instead of massive number of HEAP_INSERT's, which should be
smaller.

Also this allows for the cleanup of files left behind in the crash of
the transaction that created it.
---
 src/backend/access/rmgrdesc/smgrdesc.c |  52 +++
 src/backend/access/transam/README      |   8 +
 src/backend/access/transam/xlog.c      |  17 +
 src/backend/catalog/storage.c          | 539 +++++++++++++++++++++++--
 src/backend/commands/tablecmds.c       | 245 +++++++++--
 src/backend/replication/basebackup.c   |   3 +-
 src/backend/storage/buffer/bufmgr.c    |  88 ++++
 src/backend/storage/file/fd.c          |   4 +-
 src/backend/storage/file/reinit.c      | 345 +++++++++++-----
 src/backend/storage/smgr/md.c          |  93 ++++-
 src/backend/storage/smgr/smgr.c        |  32 ++
 src/backend/storage/sync/sync.c        |  20 +-
 src/bin/pg_rewind/parsexlog.c          |  24 ++
 src/common/relpath.c                   |  47 ++-
 src/include/catalog/storage.h          |   2 +
 src/include/catalog/storage_xlog.h     |  42 +-
 src/include/common/relpath.h           |   9 +-
 src/include/storage/bufmgr.h           |   2 +
 src/include/storage/fd.h               |   1 +
 src/include/storage/md.h               |   8 +-
 src/include/storage/reinit.h           |  10 +-
 src/include/storage/smgr.h             |  17 +
 22 files changed, 1401 insertions(+), 207 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index 7755553d57..d251f22207 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,49 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
                          xlrec->blkno, xlrec->flags);
         pfree(path);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+        char       *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+        appendStringInfoString(buf, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+        char       *path = GetRelationPath(xlrec->rnode.dbNode,
+                                           xlrec->rnode.spcNode,
+                                           xlrec->rnode.relNode,
+                                           InvalidBackendId,
+                                           xlrec->forkNum, xlrec->mark);
+        char       *action;
+
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                action = "CREATE";
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                action = "DELETE";
+                break;
+            default:
+                action = "<unknown action>";
+                break;
+        }
+
+        appendStringInfo(buf, "%s %s", action, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+        char       *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+        appendStringInfoString(buf, path);
+        appendStringInfo(buf, " persistence %d", xlrec->persistence);
+        pfree(path);
+    }
 }
 
 const char *
@@ -55,6 +98,15 @@ smgr_identify(uint8 info)
         case XLOG_SMGR_TRUNCATE:
             id = "TRUNCATE";
             break;
+        case XLOG_SMGR_UNLINK:
+            id = "UNLINK";
+            break;
+        case XLOG_SMGR_MARK:
+            id = "MARK";
+            break;
+        case XLOG_SMGR_BUFPERSISTENCE:
+            id = "BUFPERSISTENCE";
+            break;
     }
 
     return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..b344bbe511 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -724,6 +724,14 @@ we must panic and abort recovery.  The DBA will have to manually clean up
 then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
+The Smgr MARK files
+--------------------------------
+
+An smgr mark file is created when a new relation file is created to
+mark the relfilenode needs to be cleaned up at recovery time.  In
+contrast to the four actions above, failure to remove smgr mark files
+will lead to data loss, in which case the server will shut down.
+
 
 Skipping WAL for New RelFileNode
 --------------------------------
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1e1fbe957f..59f4c2eacf 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "catalog/storage.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
 #include "common/controldata_utils.h"
@@ -4564,6 +4565,14 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
             {
                 ereport(DEBUG1,
                         (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+                /* cleanup garbage files left during crash recovery */
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+                /* run rollback cleanup if any */
+                smgrDoPendingDeletes(false);
+
                 InArchiveRecovery = true;
                 if (StandbyModeRequested)
                     StandbyMode = true;
@@ -7824,6 +7833,14 @@ StartupXLOG(void)
                 }
             }
 
+            /* cleanup garbage files left during crash recovery */
+            if (!InArchiveRecovery)
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+            /* run rollback cleanup if any */
+            smgrDoPendingDeletes(false);
+
             /* Allow resource managers to do any required cleanup. */
             for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
             {
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index c5ad28d71f..f2bcc12958 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -27,6 +28,7 @@
 #include "access/xlogutils.h"
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
+#include "common/hashfn.h"
 #include "miscadmin.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
@@ -57,9 +59,18 @@ int            wal_skip_threshold = 2048;    /* in kilobytes */
  * but I'm being paranoid.
  */
 
+#define    PDOP_DELETE                (1 << 0)
+#define    PDOP_UNLINK_FORK        (1 << 1)
+#define    PDOP_UNLINK_MARK        (1 << 2)
+#define    PDOP_SET_PERSISTENCE    (1 << 3)
+
 typedef struct PendingRelDelete
 {
     RelFileNode relnode;        /* relation that may need to be deleted */
+    int            op;                /* operation mask */
+    bool        bufpersistence;    /* buffer persistence to set */
+    int            unlink_forknum;    /* forknum to unlink */
+    StorageMarks unlink_mark;    /* mark to unlink */
     BackendId    backend;        /* InvalidBackendId if not a temp rel */
     bool        atCommit;        /* T=delete at commit; F=delete at abort */
     int            nestLevel;        /* xact nesting level of request */
@@ -75,6 +86,24 @@ typedef struct PendingRelSync
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 HTAB       *pendingSyncHash = NULL;
 
+typedef struct SRelHashEntry
+{
+    SMgrRelation  srel;
+    char          status;    /* for simplehash use */
+} SRelHashEntry;
+
+/* define hashtable for workarea for pending deletes */
+#define SH_PREFIX srelhash
+#define SH_ELEMENT_TYPE SRelHashEntry
+#define SH_KEY_TYPE SMgrRelation
+#define SH_KEY srel
+#define SH_HASH_KEY(tb, key) \
+    hash_bytes((unsigned char *)&key, sizeof(SMgrRelation))
+#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(SMgrRelation)) == 0)
+#define SH_SCOPE static inline
+#define SH_DEFINE
+#define SH_DECLARE
+#include "lib/simplehash.h"
 
 /*
  * AddPendingSync
@@ -143,22 +172,48 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
             return NULL;        /* placate compiler */
     }
 
+    /*
+     * We are going to create a new storage file. If server crashes before the
+     * current transaction ends the file needs to be cleaned up but there's no
+     * clue to the orphan files. The SMGR_MARK_UNCOMMITED mark file works as
+     * the signal of that situation.
+     */
     srel = smgropen(rnode, backend);
+    log_smgrcreatemark(&rnode, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
     smgrcreate(srel, MAIN_FORKNUM, false);
 
     if (needs_wal)
         log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
 
-    /* Add the relation to the list of stuff to delete at abort */
+    /*
+     * Add the relation to the list of stuff to delete at abort. We don't
+     * remove the mark file at commit. It needs to persists until the main fork
+     * file is actually deleted.  See SyncPostCheckpoint.
+     */
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rnode;
+    pending->op = PDOP_DELETE;
     pending->backend = backend;
     pending->atCommit = false;    /* delete if abort */
     pending->nestLevel = GetCurrentTransactionNestLevel();
     pending->next = pendingDeletes;
     pendingDeletes = pending;
 
+    /* drop cleanup fork at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_MARK;
+    pending->unlink_forknum = MAIN_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->backend = backend;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
     if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
     {
         Assert(backend == InvalidBackendId);
@@ -168,6 +223,226 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *        Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    SMgrRelation srel;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              create = true;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+    /*
+     * If we have entries for init-fork operations on this relation, that means
+     * that we have already registered pending delete entries to drop
+     * preexisting init-fork since before the current transaction started. This
+     * function reverts that change just by removing the entries.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+
+        /*
+         * We don't touch unrelated entries. Although init-fork related entries
+         * are not useful if the relation is created or dropped in this
+         * transaction, we don't bother to avoid registering entries for such
+         * relations here.
+         */
+        if (!RelFileNodeEquals(rnode, pending->relnode) ||
+            (pending->op & PDOP_DELETE) != 0 ||
+            pending->unlink_forknum != INIT_FORKNUM)
+        {
+            prev = pending;
+            continue;
+        }
+
+        /* make sure the entry is what we're expecting here */
+        Assert(((pending->op & (PDOP_UNLINK_FORK|PDOP_UNLINK_MARK)) != 0 &&
+                pending->unlink_forknum == INIT_FORKNUM) ||
+               (pending->op & PDOP_SET_PERSISTENCE) != 0);
+
+        /* unlink list entry */
+        if (prev)
+            prev->next = next;
+        else
+            pendingDeletes = next;
+        pfree(pending);
+
+        create = false;
+    }
+
+    if (!create)
+        return;
+
+    /*
+     * We are going to create an init fork. If server crashes before the
+     * current transaction ends the init fork left alone corrupts data while
+     * recovery.  The mark file works as the sentinel to identify that
+     * situation.
+     */
+    srel = smgropen(rnode, InvalidBackendId);
+    log_smgrcreatemark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+    /* We don't have existing init fork, create it. */
+    smgrcreate(srel, INIT_FORKNUM, false);
+
+    /*
+     * index-init fork needs further initialization. ambuildempty shoud do
+     * WAL-log and file sync by itself but otherwise we do that by ourselves.
+     */
+    if (rel->rd_rel->relkind == RELKIND_INDEX)
+        rel->rd_indam->ambuildempty(rel);
+    else
+    {
+        log_smgrcreate(&rnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    /* drop the init fork, mark file and revert persistence at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK | PDOP_UNLINK_MARK | PDOP_SET_PERSISTENCE;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->bufpersistence = true;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* drop mark file at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_MARK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *        Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingDeletes at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion doesn't happen.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingRelDelete *pending;
+    PendingRelDelete *prev;
+    PendingRelDelete *next;
+    bool              inxact_created = false;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+    /*
+     * If we have entries for init-fork operations of this relation, that means
+     * that we have created the init fork in the current transaction.  We
+     * remove the init fork and mark file immediately in that case.  Otherwise
+     * just register pending-delete for the existing init fork.
+     */
+    prev = NULL;
+    for (pending = pendingDeletes; pending != NULL; pending = next)
+    {
+        next = pending->next;
+
+        /*
+         * We don't touch unrelated entries. Although init-fork related entries
+         * are not useful if the relation is created or dropped in this
+         * transaction, we don't bother to avoid registering entries for such
+         * relations here.
+         */
+        if (!RelFileNodeEquals(rnode, pending->relnode) ||
+            (pending->op & PDOP_DELETE) != 0 ||
+            pending->unlink_forknum != INIT_FORKNUM))
+        {
+            prev = pending;
+            continue;
+        }
+
+        /* make sure the entry is what we're expecting here */
+        Assert(((pending->op & (PDOP_UNLINK_FORK|PDOP_UNLINK_MARK)) != 0 &&
+                pending->unlink_forknum == INIT_FORKNUM) ||
+               (pending->op & PDOP_SET_PERSISTENCE) != 0);
+
+        /* unlink list entry */
+        if (prev)
+            prev->next = next;
+        else
+            pendingDeletes = next;
+        pfree(pending);
+
+        inxact_created = true;
+    }
+
+    if (inxact_created)
+    {
+        SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+
+        /*
+         * INIT forks never be loaded to shared buffer so no point in dropping
+         * buffers for such files.
+         */
+        log_smgrunlinkmark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+        smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+        log_smgrunlink(&rnode, INIT_FORKNUM);
+        smgrunlink(srel, INIT_FORKNUM, false);
+        return;
+    }
+
+    /* register drop of this init fork file at commit */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_UNLINK_FORK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+
+    /* revert buffer-persistence changes at abort */
+    pending = (PendingRelDelete *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+    pending->relnode = rnode;
+    pending->op = PDOP_SET_PERSISTENCE;
+    pending->bufpersistence = false;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingDeletes;
+    pendingDeletes = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +462,88 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
     XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+    xl_smgr_unlink xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file unlink.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_CREATEMARK record to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file creation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINKMARK record to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file creation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+    xl_smgr_bufpersistence xlrec;
+
+    /*
+     * Make an XLOG entry reporting the change of buffer persistence.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.persistence = persistence;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *        Schedule unlinking of physical storage at transaction commit.
@@ -200,6 +557,7 @@ RelationDropStorage(Relation rel)
     pending = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
     pending->relnode = rel->rd_node;
+    pending->op = PDOP_DELETE;
     pending->backend = rel->rd_backend;
     pending->atCommit = true;    /* delete if commit */
     pending->nestLevel = GetCurrentTransactionNestLevel();
@@ -618,59 +976,104 @@ smgrDoPendingDeletes(bool isCommit)
     int            nrels = 0,
                 maxrels = 0;
     SMgrRelation *srels = NULL;
+    srelhash_hash  *close_srels = NULL;
+    bool            found;
 
     prev = NULL;
     for (pending = pendingDeletes; pending != NULL; pending = next)
     {
+        SMgrRelation srel;
+
         next = pending->next;
         if (pending->nestLevel < nestLevel)
         {
             /* outer-level entries should not be processed yet */
             prev = pending;
+            continue;
         }
+
+        /* unlink list entry first, so we don't retry on failure */
+        if (prev)
+            prev->next = next;
         else
+            pendingDeletes = next;
+
+        if (pending->atCommit != isCommit)
         {
-            /* unlink list entry first, so we don't retry on failure */
-            if (prev)
-                prev->next = next;
-            else
-                pendingDeletes = next;
-            /* do deletion if called for */
-            if (pending->atCommit == isCommit)
-            {
-                SMgrRelation srel;
-
-                srel = smgropen(pending->relnode, pending->backend);
-
-                /* allocate the initial array, or extend it, if needed */
-                if (maxrels == 0)
-                {
-                    maxrels = 8;
-                    srels = palloc(sizeof(SMgrRelation) * maxrels);
-                }
-                else if (maxrels <= nrels)
-                {
-                    maxrels *= 2;
-                    srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
-                }
-
-                srels[nrels++] = srel;
-            }
             /* must explicitly free the list entry */
             pfree(pending);
             /* prev does not change */
+            continue;
         }
+
+        if (close_srels == NULL)
+            close_srels = srelhash_create(CurrentMemoryContext, 32, NULL);
+
+        srel = smgropen(pending->relnode, pending->backend);
+
+        /* Uniquify the smgr relations */
+        srelhash_insert(close_srels, srel, &found);
+
+        if (pending->op & PDOP_DELETE)
+        {
+            /* allocate the initial array, or extend it, if needed */
+            if (maxrels == 0)
+            {
+                maxrels = 8;
+                srels = palloc(sizeof(SMgrRelation) * maxrels);
+            }
+            else if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            srels[nrels++] = srel;
+        }
+
+        if (pending->op & PDOP_UNLINK_FORK)
+        {
+            /* other forks needs to drop buffers */
+            Assert(pending->unlink_forknum == INIT_FORKNUM);
+
+            /* Don't emit wal while recovery. */
+            if (!InRecovery)
+                log_smgrunlink(&pending->relnode, pending->unlink_forknum);
+            smgrunlink(srel, pending->unlink_forknum, false);
+        }
+
+        if (pending->op & PDOP_UNLINK_MARK)
+        {
+            if (!InRecovery)
+                log_smgrunlinkmark(&pending->relnode,
+                                   pending->unlink_forknum,
+                                   pending->unlink_mark);
+            smgrunlinkmark(srel, pending->unlink_forknum,
+                           pending->unlink_mark, InRecovery);
+        }
+
+        if (pending->op & PDOP_SET_PERSISTENCE)
+            SetRelationBuffersPersistence(srel, pending->bufpersistence,
+                                          InRecovery);
     }
 
     if (nrels > 0)
     {
         smgrdounlinkall(srels, nrels, false);
-
-        for (int i = 0; i < nrels; i++)
-            smgrclose(srels[i]);
-
         pfree(srels);
     }
+
+    if (close_srels)
+    {
+        srelhash_iterator i;
+        SRelHashEntry     *ent;
+
+        /* close smgr relatoins */
+        srelhash_start_iterate(close_srels, &i);
+        while ((ent = srelhash_iterate(close_srels, &i)) != NULL)
+            smgrclose(ent->srel);
+        srelhash_destroy(close_srels);
+    }
 }
 
 /*
@@ -840,7 +1243,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId
+            && pending->op == PDOP_DELETE)
             nrels++;
     }
     if (nrels == 0)
@@ -853,7 +1257,8 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     for (pending = pendingDeletes; pending != NULL; pending = pending->next)
     {
         if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
-            && pending->backend == InvalidBackendId)
+            && pending->backend == InvalidBackendId &&
+            pending->op == PDOP_DELETE)
         {
             *rptr = pending->relnode;
             rptr++;
@@ -933,6 +1338,15 @@ smgr_redo(XLogReaderState *record)
         reln = smgropen(xlrec->rnode, InvalidBackendId);
         smgrcreate(reln, xlrec->forkNum, true);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        smgrunlink(reln, xlrec->forkNum, true);
+        smgrclose(reln);
+    }
     else if (info == XLOG_SMGR_TRUNCATE)
     {
         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1021,6 +1435,65 @@ smgr_redo(XLogReaderState *record)
 
         FreeFakeRelcacheEntry(rel);
     }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingRelDelete *pending;
+        bool        created = false;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+                created = true;
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+                break;
+            default:
+                elog(ERROR, "unknown smgr_mark action \"%c\"", xlrec->mark);
+        }
+
+        if (created)
+        {
+            /* revert mark file operation at abort */
+            pending = (PendingRelDelete *)
+                MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+            pending->relnode = xlrec->rnode;
+            pending->op = PDOP_UNLINK_MARK;
+            pending->unlink_forknum = xlrec->forkNum;
+            pending->unlink_mark = xlrec->mark;
+            pending->backend = InvalidBackendId;
+            pending->atCommit = false;
+            pending->nestLevel = GetCurrentTransactionNestLevel();
+            pending->next = pendingDeletes;
+            pendingDeletes = pending;
+        }
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec =
+            (xl_smgr_bufpersistence *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingRelDelete *pending;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+        /* revert buffer-persistence changes at abort */
+        pending = (PendingRelDelete *)
+            MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
+        pending->relnode = xlrec->rnode;
+        pending->op = PDOP_SET_PERSISTENCE;
+        pending->bufpersistence = !xlrec->persistence;
+        pending->backend = InvalidBackendId;
+        pending->atCommit = false;
+        pending->nestLevel = GetCurrentTransactionNestLevel();
+        pending->next = pendingDeletes;
+        pendingDeletes = pending;
+    }
     else
         elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index bf42587e38..afc77f0d98 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -52,6 +52,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5329,6 +5330,166 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
     return newcmd;
 }
 
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+                          LOCKMODE lockmode)
+{
+    Relation     rel;
+    Relation    classRel;
+    HeapTuple    tuple,
+                newtuple;
+    Datum        new_val[Natts_pg_class];
+    bool        new_null[Natts_pg_class],
+                new_repl[Natts_pg_class];
+    int            i;
+    List       *relids;
+    ListCell   *lc_oid;
+
+    Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+    Assert(lockmode == AccessExclusiveLock);
+
+    /*
+     * Under the following condition, we need to call ATRewriteTable, which
+     * cannot be false in the AT_REWRITE_ALTER_PERSISTENCE case.
+     */
+    Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+           tab->newvals == NULL && !tab->verify_new_notnull);
+
+    rel = table_open(tab->relid, lockmode);
+
+    Assert(rel->rd_rel->relpersistence != persistence);
+
+    elog(DEBUG1, "perform in-place persistnce change");
+
+    /*
+     * First we collect all relations that we need to change persistence.
+     */
+
+    /* Collect OIDs of indexes and toast relations */
+    relids = RelationGetIndexList(rel);
+    relids = lcons_oid(rel->rd_id, relids);
+
+    /* Add toast relation if any */
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        List    *toastidx;
+        Relation toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+        relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+        toastidx = RelationGetIndexList(toastrel);
+        relids = list_concat(relids, toastidx);
+        pfree(toastidx);
+        table_close(toastrel, NoLock);
+    }
+
+    table_close(rel, NoLock);
+
+    /* Make changes in storage */
+    classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+    foreach (lc_oid, relids)
+    {
+        Oid reloid = lfirst_oid(lc_oid);
+        Relation r = relation_open(reloid, lockmode);
+
+        /*
+         * Some access methods do not accept in-place persistence change. For
+         * example, GiST uses page LSNs to figure out whether a block has
+         * changed, where UNLOGGED GiST indexes use fake LSNs that are
+         * incompatible with real LSNs used for LOGGED ones.
+         *
+         * XXXX: We don't bother to allow in-place persistence change for index
+         * methods other than btree for now.
+         */
+        if (r->rd_rel->relkind == RELKIND_INDEX &&
+            r->rd_rel->relam != BTREE_AM_OID)
+        {
+            int            reindex_flags;
+
+            /* reindex doesn't allow concurrent use of the index */
+            table_close(r, NoLock);
+
+            reindex_flags =
+                REINDEX_REL_SUPPRESS_INDEX_USE |
+                REINDEX_REL_CHECK_CONSTRAINTS;
+
+            /* Set the same persistence with the parent relation. */
+            if (persistence == RELPERSISTENCE_UNLOGGED)
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+            else
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+            reindex_index(reloid, reindex_flags, persistence, 0);
+
+            continue;
+        }
+
+        /* Create or drop init fork */
+        if (persistence == RELPERSISTENCE_UNLOGGED)
+            RelationCreateInitFork(r);
+        else
+            RelationDropInitFork(r);
+
+        /*
+         * When this relation gets WAL-logged, immediately sync all files but
+         * initfork to establish the initial state on storage.  Buffers have
+         * already flushed out by RelationCreate(Drop)InitFork called just
+         * above. Initfork should have been synced as needed.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT)
+        {
+            for (i = 0 ; i < INIT_FORKNUM ; i++)
+            {
+                if (smgrexists(RelationGetSmgr(r), i))
+                    smgrimmedsync(RelationGetSmgr(r), i);
+            }
+        }
+
+        /* Update catalog */
+        tuple = SearchSysCacheCopy1(RELOID,    ObjectIdGetDatum(reloid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+        memset(new_val, 0, sizeof(new_val));
+        memset(new_null, false, sizeof(new_null));
+        memset(new_repl, false, sizeof(new_repl));
+
+        new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+        new_null[Anum_pg_class_relpersistence - 1] = false;
+        new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+                                     new_val, new_null, new_repl);
+
+        CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+        heap_freetuple(newtuple);
+
+        /*
+         * While wal_level >= replica, switching to LOGGED requires the
+         * relation content to be WAL-logged to recover the table.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+        {
+            ForkNumber fork;
+
+            for (fork = 0; fork < INIT_FORKNUM ; fork++)
+            {
+                if (smgrexists(RelationGetSmgr(r), fork))
+                    log_newpage_range(r, fork, 0,
+                                      smgrnblocks(RelationGetSmgr(r), fork),
+                                      false);
+            }
+        }
+
+        table_close(r, NoLock);
+    }
+
+    table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5459,47 +5620,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
                                          tab->relid,
                                          tab->rewrite);
 
-            /*
-             * Create transient table that will receive the modified data.
-             *
-             * Ensure it is marked correctly as logged or unlogged.  We have
-             * to do this here so that buffers for the new relfilenode will
-             * have the right persistence set, and at the same time ensure
-             * that the original filenode's buffers will get read in with the
-             * correct setting (i.e. the original one).  Otherwise a rollback
-             * after the rewrite would possibly result with buffers for the
-             * original filenode having the wrong persistence setting.
-             *
-             * NB: This relies on swap_relation_files() also swapping the
-             * persistence. That wouldn't work for pg_class, but that can't be
-             * unlogged anyway.
-             */
-            OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-                                       persistence, lockmode);
+            if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+                RelationChangePersistence(tab, persistence, lockmode);
+            else
+            {
+                /*
+                 * Create transient table that will receive the modified data.
+                 *
+                 * Ensure it is marked correctly as logged or unlogged.  We
+                 * have to do this here so that buffers for the new relfilenode
+                 * will have the right persistence set, and at the same time
+                 * ensure that the original filenode's buffers will get read in
+                 * with the correct setting (i.e. the original one).  Otherwise
+                 * a rollback after the rewrite would possibly result with
+                 * buffers for the original filenode having the wrong
+                 * persistence setting.
+                 *
+                 * NB: This relies on swap_relation_files() also swapping the
+                 * persistence. That wouldn't work for pg_class, but that can't
+                 * be unlogged anyway.
+                 */
+                OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+                                           NewAccessMethod,
+                                           persistence, lockmode);
 
-            /*
-             * Copy the heap data into the new table with the desired
-             * modifications, and test the current data within the table
-             * against new constraints generated by ALTER TABLE commands.
-             */
-            ATRewriteTable(tab, OIDNewHeap, lockmode);
+                /*
+                 * Copy the heap data into the new table with the desired
+                 * modifications, and test the current data within the table
+                 * against new constraints generated by ALTER TABLE commands.
+                 */
+                ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-            /*
-             * Swap the physical files of the old and new heaps, then rebuild
-             * indexes and discard the old heap.  We can use RecentXmin for
-             * the table's new relfrozenxid because we rewrote all the tuples
-             * in ATRewriteTable, so no older Xid remains in the table.  Also,
-             * we never try to swap toast tables by content, since we have no
-             * interest in letting this code work on system catalogs.
-             */
-            finish_heap_swap(tab->relid, OIDNewHeap,
-                             false, false, true,
-                             !OidIsValid(tab->newTableSpace),
-                             RecentXmin,
-                             ReadNextMultiXactId(),
-                             persistence);
+                /*
+                 * Swap the physical files of the old and new heaps, then
+                 * rebuild indexes and discard the old heap.  We can use
+                 * RecentXmin for the table's new relfrozenxid because we
+                 * rewrote all the tuples in ATRewriteTable, so no older Xid
+                 * remains in the table.  Also, we never try to swap toast
+                 * tables by content, since we have no interest in letting this
+                 * code work on system catalogs.
+                 */
+                finish_heap_swap(tab->relid, OIDNewHeap,
+                                 false, false, true,
+                                 !OidIsValid(tab->newTableSpace),
+                                 RecentXmin,
+                                 ReadNextMultiXactId(),
+                                 persistence);
 
-            InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+                InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+            }
         }
         else
         {
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ec0485705d..45e1a5d817 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1070,6 +1070,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
         bool        excludeFound;
         ForkNumber    relForkNum; /* Type of fork if file is a relation */
         int            relOidChars;    /* Chars in filename that are the rel oid */
+        StorageMarks mark;
 
         /* Skip special stuff */
         if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1120,7 +1121,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
         /* Exclude all forks for unlogged tables except the init fork */
         if (isDbDir &&
             parse_filename_for_nontemp_relation(de->d_name, &relOidChars,
-                                                &relForkNum))
+                                                &relForkNum, &mark))
         {
             /* Never exclude init forks */
             if (relForkNum != INIT_FORKNUM)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index b4532948d3..dab74bf99a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3154,6 +3155,93 @@ DropRelFileNodeBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
     }
 }
 
+/* ---------------------------------------------------------------------
+ *        SetRelFileNodeBuffersPersistence
+ *
+ *        This function changes the persistence of all buffer pages of a relation
+ *        then writes all dirty pages of the relation out to disk when switching
+ *        to PERMANENT. (or more accurately, out to kernel disk buffers),
+ *        ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *        Generally, the caller should be holding AccessExclusiveLock on the
+ *        target relation to ensure that no other backend is busy dirtying
+ *        more blocks of the relation; the effects can't be expected to last
+ *        after the lock is released.
+ *
+ *        XXX currently it sequentially searches the buffer pool, should be
+ *        changed to more clever ways of searching.  This routine is not
+ *        used in any performance-critical code paths, so it's not worth
+ *        adding additional overhead to normal paths to make it go faster;
+ *        but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+    int            i;
+    RelFileNodeBackend rnode = srel->smgr_rnode;
+
+    Assert (!RelFileNodeBackendIsTemp(rnode));
+
+    if (!isRedo)
+        log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+    for (i = 0; i < NBuffers; i++)
+    {
+        BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32        buf_state;
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+            continue;
+
+        ReservePrivateRefCountEntry();
+
+        buf_state = LockBufHdr(bufHdr);
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+        {
+            UnlockBufHdr(bufHdr, buf_state);
+            continue;
+        }
+
+        if (permanent)
+        {
+            /* Init fork is being dropped, drop buffers for it. */
+            if (bufHdr->tag.forkNum == INIT_FORKNUM)
+            {
+                InvalidateBuffer(bufHdr);
+                continue;
+            }
+
+            buf_state |= BM_PERMANENT;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+            /* we flush this buffer when switching to PERMANENT */
+            if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+            {
+                PinBuffer_Locked(bufHdr);
+                LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                              LW_SHARED);
+                FlushBuffer(bufHdr, srel);
+                LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+                UnpinBuffer(bufHdr, true);
+            }
+            else
+                UnlockBufHdr(bufHdr, buf_state);
+        }
+        else
+        {
+            /* init fork is always BM_PERMANENT. See BufferAlloc */
+            if (bufHdr->tag.forkNum != INIT_FORKNUM)
+                buf_state &= ~BM_PERMANENT;
+
+            UnlockBufHdr(bufHdr, buf_state);
+        }
+    }
+}
+
 /* ---------------------------------------------------------------------
  *        DropRelFileNodesAllBuffers
  *
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 263057841d..8487ae1f02 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -349,8 +349,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int    fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3759,7 +3757,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
     char        parentpath[MAXPGPATH];
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 0ae3fb6902..f8458a1e1e 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,49 @@
 
 #include <unistd.h>
 
+#include "access/xlog.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
 #include "postmaster/startup.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-                                                  int op);
+                                                  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-                                               int op);
+                                               Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
     Oid            reloid;            /* hash key */
-} unlogged_relation_entry;
+    bool        has_init;        /* has INIT fork */
+    bool        dirty_init;        /* needs to remove INIT fork */
+    bool        dirty_all;        /* needs to remove all forks */
+} relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of the "cleanup" forks.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for init fork is present, we remove the
+ * init fork along with the mark file.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for main fork is present we remove the
+ * whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found.  we remove all forks of any relation
+ * with the "init" fork, except for the "init" fork itself.
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that have the "cleanup" and/or the "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -72,7 +92,7 @@ ResetUnloggedRelations(int op)
     /*
      * First process unlogged files in pg_default ($PGDATA/base)
      */
-    ResetUnloggedRelationsInTablespaceDir("base", op);
+    ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
     /*
      * Cycle through directories for all non-default tablespaces.
@@ -81,13 +101,19 @@ ResetUnloggedRelations(int op)
 
     while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
     {
+        Oid tspid;
+
         if (strcmp(spc_de->d_name, ".") == 0 ||
             strcmp(spc_de->d_name, "..") == 0)
             continue;
 
         snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
                  spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-        ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+        tspid = atooid(spc_de->d_name);
+
+        Assert(tspid != 0);
+        ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
     }
 
     FreeDir(spc_dir);
@@ -103,7 +129,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+                                      Oid tspid, int op)
 {
     DIR           *ts_dir;
     struct dirent *de;
@@ -130,6 +157,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
     while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
     {
+        Oid dbid;
+
         /*
          * We're only interested in the per-database directories, which have
          * numeric names.  Note that this code will also (properly) ignore "."
@@ -148,7 +177,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
             ereport_startup_progress("resetting unlogged relations (cleanup), elapsed time: %ld.%02d s, current path:
%s",
                                      dbspace_path);
 
-        ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+        dbid = atooid(de->d_name);
+        Assert(dbid != 0);
+
+        ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
     }
 
     FreeDir(ts_dir);
@@ -158,125 +190,228 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+                                   Oid tspid, Oid dbid, int op)
 {
     DIR           *dbspace_dir;
     struct dirent *de;
     char        rm_path[MAXPGPATH * 2];
+    HTAB       *hash;
+    HASHCTL        ctl;
 
     /* Caller must specify at least one operation. */
-    Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+    Assert((op & (UNLOGGED_RELATION_CLEANUP |
+                  UNLOGGED_RELATION_DROP_BUFFER |
+                  UNLOGGED_RELATION_INIT)) != 0);
 
     /*
      * Cleanup is a two-pass operation.  First, we go through and identify all
      * the files with init forks.  Then, we go through again and nuke
      * everything with the same OID except the init fork.
      */
+
+    /*
+     * It's possible that someone could create a ton of unlogged relations
+     * in the same database & tablespace, so we'd better use a hash table
+     * rather than an array or linked list to keep track of which files
+     * need to be reset.  Otherwise, this cleanup operation would be
+     * O(n^2).
+     */
+    memset(&ctl, 0, sizeof(ctl));
+    ctl.keysize = sizeof(Oid);
+    ctl.entrysize = sizeof(relfile_entry);
+    hash = hash_create("relfilenode cleanup hash",
+                       32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+    /* Collect INIT fork and mark files in the directory. */
+    dbspace_dir = AllocateDir(dbspacedirname);
+    while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+    {
+        int            oidchars;
+        ForkNumber    forkNum;
+        StorageMarks mark;
+
+        /* Skip anything that doesn't look like a relation data file. */
+        if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+                                                 &forkNum, &mark))
+            continue;
+
+        if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
+        {
+            Oid                key;
+            relfile_entry  *ent;
+            bool            found;
+
+            /*
+             * Record the relfilenode information. If it has
+             * SMGR_MARK_UNCOMMITTED mark files, the relfilenode is in dirty
+             * state, where clean up is needed.
+             */
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+            if (!found)
+            {
+                ent->has_init = false;
+                ent->dirty_init = false;
+                ent->dirty_all = false;
+            }
+
+            if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_init = true;
+            else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_all = true;
+            else
+            {
+                Assert(forkNum == INIT_FORKNUM);
+                ent->has_init = true;
+            }
+        }
+    }
+
+    /* Done with the first pass. */
+    FreeDir(dbspace_dir);
+
+    /* nothing to do if we don't have init nor cleanup forks */
+    if (hash_get_num_entries(hash) < 1)
+    {
+        hash_destroy(hash);
+        return;
+    }
+
+    if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+    {
+        /*
+         * When we come here after recovery, smgr object for this file might
+         * have been created. In that case we need to drop all buffers then the
+         * smgr object before initializing the unlogged relation.  This is safe
+         * as far as no other backends have accessed the relation before
+         * starting archive recovery.
+         */
+        HASH_SEQ_STATUS status;
+        relfile_entry *ent;
+        SMgrRelation   *srels = palloc(sizeof(SMgrRelation) * 8);
+        int               maxrels = 8;
+        int               nrels = 0;
+        int i;
+
+        Assert(!HotStandbyActive());
+
+        hash_seq_init(&status, hash);
+        while((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+        {
+            RelFileNodeBackend rel;
+
+            /*
+             * The relation is persistent and stays remain persistent. Don't
+             * drop the buffers for this relation.
+             */
+            if (ent->has_init && ent->dirty_init)
+                continue;
+
+            if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = ent->reloid;
+
+            srels[nrels++] = smgropen(rel.node, InvalidBackendId);
+        }
+
+        DropRelFileNodesAllBuffers(srels, nrels);
+
+        for (i = 0 ; i < nrels ; i++)
+            smgrclose(srels[i]);
+    }
+
+    /*
+     * Now, make a second pass and remove anything that matches.
+     */
     if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
     {
-        HTAB       *hash;
-        HASHCTL        ctl;
-
-        /*
-         * It's possible that someone could create a ton of unlogged relations
-         * in the same database & tablespace, so we'd better use a hash table
-         * rather than an array or linked list to keep track of which files
-         * need to be reset.  Otherwise, this cleanup operation would be
-         * O(n^2).
-         */
-        ctl.keysize = sizeof(Oid);
-        ctl.entrysize = sizeof(unlogged_relation_entry);
-        ctl.hcxt = CurrentMemoryContext;
-        hash = hash_create("unlogged relation OIDs", 32, &ctl,
-                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-        /* Scan the directory. */
         dbspace_dir = AllocateDir(dbspacedirname);
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
+            ForkNumber        forkNum;
+            StorageMarks    mark;
+            int                oidchars;
+            Oid                key;
+            relfile_entry  *ent;
+            RelFileNodeBackend rel;
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* Also skip it unless this is the init fork. */
-            if (forkNum != INIT_FORKNUM)
-                continue;
-
-            /*
-             * Put the OID portion of the name into the hash table, if it
-             * isn't already.
-             */
-            ent.reloid = atooid(de->d_name);
-            (void) hash_search(hash, &ent, HASH_ENTER, NULL);
-        }
-
-        /* Done with the first pass. */
-        FreeDir(dbspace_dir);
-
-        /*
-         * If we didn't find any init forks, there's no point in continuing;
-         * we can bail out now.
-         */
-        if (hash_get_num_entries(hash) == 0)
-        {
-            hash_destroy(hash);
-            return;
-        }
-
-        /*
-         * Now, make a second pass and remove anything that matches.
-         */
-        dbspace_dir = AllocateDir(dbspacedirname);
-        while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-        {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
-
-            /* Skip anything that doesn't look like a relation data file. */
-            if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* We never remove the init fork. */
-            if (forkNum == INIT_FORKNUM)
+                                                     &forkNum, &mark))
                 continue;
 
             /*
              * See whether the OID portion of the name shows up in the hash
              * table.  If so, nuke it!
              */
-            ent.reloid = atooid(de->d_name);
-            if (hash_search(hash, &ent, HASH_FIND, NULL))
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+            if (!ent)
+                continue;
+
+            if (!ent->dirty_all)
             {
-                snprintf(rm_path, sizeof(rm_path), "%s/%s",
-                         dbspacedirname, de->d_name);
-                if (unlink(rm_path) < 0)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not remove file \"%s\": %m",
-                                    rm_path)));
+                /* clean permanent relations don't need cleanup */
+                if (!ent->has_init)
+                    continue;
+
+                if (ent->dirty_init)
+                {
+                    /*
+                     * The crashed trasaction did SET UNLOGGED. This relation
+                     * is restored to a LOGGED relation.
+                     */
+                    if (forkNum != INIT_FORKNUM)
+                        continue;
+                }
                 else
-                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                {
+                    /*
+                     * we don't remove the INIT fork of a non-dirty
+                     * relfilenode
+                     */
+                    if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+                        continue;
+                }
             }
+
+            /* so, nuke it! */
+            snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                     dbspacedirname, de->d_name);
+            if (unlink(rm_path) < 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m",
+                                rm_path)));
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = atooid(de->d_name);
+
+            ForgetRelationForkSyncRequests(rel, forkNum);
         }
 
         /* Cleanup is complete. */
         FreeDir(dbspace_dir);
-        hash_destroy(hash);
     }
 
+    hash_destroy(hash);
+    hash = NULL;
+
     /*
      * Initialization happens after cleanup is complete: we copy each init
-     * fork file to the corresponding main fork file.  Note that if we are
-     * asked to do both cleanup and init, we may never get here: if the
-     * cleanup code determines that there are no init forks in this dbspace,
-     * it will return before we get to this point.
+     * fork file to the corresponding main fork file.
      */
     if ((op & UNLOGGED_RELATION_INIT) != 0)
     {
@@ -285,6 +420,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        srcpath[MAXPGPATH * 2];
@@ -292,9 +428,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -328,15 +466,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        mainpath[MAXPGPATH];
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -379,7 +520,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *oidchars,
-                                    ForkNumber *fork)
+                                    ForkNumber *fork, StorageMarks *mark)
 {
     int            pos;
 
@@ -410,11 +551,19 @@ parse_filename_for_nontemp_relation(const char *name, int *oidchars,
 
         for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
             ;
-        if (segchar <= 1)
-            return false;
-        pos += segchar;
+        if (segchar > 1)
+            pos += segchar;
     }
 
+    /* mark file? */
+    if (name[pos] == '.' && name[pos + 1] != 0)
+    {
+        *mark = name[pos + 1];
+        pos += 2;
+    }
+    else
+        *mark = SMGR_MARK_NONE;
+
     /* Now we should be at the end. */
     if (name[pos] != '\0')
         return false;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index b4bca7eed6..580b74839f 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -139,7 +139,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
                              BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
                               MdfdVec *seg);
-
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+                         StorageMarks mark);
 
 /*
  *    mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -169,6 +170,81 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
     return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * If isRedo is true, it's okay for the file to exist already.
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path =markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    /* See mdcreate for details.. */
+    TablespaceCreateDbspace(reln->smgr_rnode.node.spcNode,
+                            reln->smgr_rnode.node.dbNode,
+                            isRedo);
+
+    fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL);
+    if (fd < 0 && (!isRedo || errno != EEXIST))
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not crete mark file \"%s\": %m", path)));
+
+    pg_fsync(fd);
+    close(fd);
+
+    /*
+     * To guarantee that the creation of the file is persistent, fsync its
+     * parent directory.
+     */
+    fsync_parent_path(path, ERROR);
+
+    pfree(path);
+}
+
+
+/*
+ *  mdunlinkmark()  -- Delete the mark file
+ *
+ * If isRedo is true, it's okay for the file being not found.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+
+    if (!isRedo || mdmarkexists(reln, forkNum, mark))
+        durable_unlink(path, ERROR);
+
+    pfree(path);
+}
+
+/*
+ *  mdmarkexists()  -- Check if the file exists.
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    fd = BasicOpenFile(path, O_RDONLY);
+    if (fd < 0 && errno != ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not access mark file \"%s\": %m", path)));
+    pfree(path);
+
+    if (fd < 0)
+        return false;
+
+    return true;
+}
+
 /*
  *    mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1025,6 +1101,15 @@ register_forget_request(RelFileNodeBackend rnode, ForkNumber forknum,
     RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileNodeBackend rnode, ForkNumber forknum)
+{
+    register_forget_request(rnode, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1378,12 +1463,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
     char       *p;
 
     /* Compute the path. */
-    p = relpathperm(ftag->rnode, MAIN_FORKNUM);
+    p = GetRelationPath(ftag->rnode.dbNode, ftag->rnode.spcNode,
+                        ftag->rnode.relNode, InvalidBackendId, MAIN_FORKNUM,
+                        mark);
     strlcpy(path, p, MAXPGPATH);
     pfree(p);
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 0fcef4994b..110e64b0b2 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -62,6 +62,10 @@ typedef struct f_smgr
     void        (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
                                   BlockNumber nblocks);
     void        (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+    void        (*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
+    void        (*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -82,6 +86,8 @@ static const f_smgr smgrsw[] = {
         .smgr_nblocks = mdnblocks,
         .smgr_truncate = mdtruncate,
         .smgr_immedsync = mdimmedsync,
+        .smgr_createmark = mdcreatemark,
+        .smgr_unlinkmark = mdunlinkmark,
     }
 };
 
@@ -335,6 +341,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
     smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *    smgrcreatemark() -- Create a mark file
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *    smgrunlinkmark() -- Delete a mark file
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  *    smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -662,6 +688,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
     smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index d4083e8a56..9563940d45 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -89,7 +89,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
     int            (*sync_syncfiletag) (const FileTag *ftag, char *path);
-    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+                                       StorageMarks mark);
     bool        (*sync_filetagmatches) (const FileTag *ftag,
                                         const FileTag *candidate);
 } SyncOps;
@@ -222,7 +223,8 @@ SyncPostCheckpoint(void)
 
         /* Unlink the file */
         if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-                                                          path) < 0)
+                                                          path,
+                                                          SMGR_MARK_NONE) < 0)
         {
             /*
              * There's a race condition, when the database is dropped at the
@@ -236,6 +238,20 @@ SyncPostCheckpoint(void)
                         (errcode_for_file_access(),
                          errmsg("could not remove file \"%s\": %m", path)));
         }
+        else if (syncsw[entry->tag.handler].sync_unlinkfiletag(
+                     &entry->tag, path,
+                     SMGR_MARK_UNCOMMITTED) < 0)
+        {
+            /*
+             * And we may have SMGR_MARK_UNCOMMITTED file.  Remove it if the
+             * fork files has been successfully removed. It's ok if the file
+             * does not exist.
+             */
+            if (errno != ENOENT)
+                ereport(WARNING,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m", path)));
+        }
 
         /* Mark the list entry as canceled, just in case */
         entry->canceled = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 436df54120..dbc0da5da5 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -407,6 +407,30 @@ extractPageInfo(XLogReaderState *record)
          * source system.
          */
     }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
     else if (rmid == RM_XACT_ID &&
              ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
               (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 1f5c426ec0..67f24890d6 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbNode, Oid spcNode)
  */
 char *
 GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                int backendId, ForkNumber forkNumber)
+                int backendId, ForkNumber forkNumber, char mark)
 {
     char       *path;
+    char        markstr[10];
+
+    if (mark == 0)
+        markstr[0] = 0;
+    else
+        snprintf(markstr, 10, ".%c", mark);
 
     if (spcNode == GLOBALTABLESPACE_OID)
     {
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         Assert(dbNode == 0);
         Assert(backendId == InvalidBackendId);
         if (forkNumber != MAIN_FORKNUM)
-            path = psprintf("global/%u_%s",
-                            relNode, forkNames[forkNumber]);
+            path = psprintf("global/%u_%s%s",
+                            relNode, forkNames[forkNumber], markstr);
         else
-            path = psprintf("global/%u", relNode);
+            path = psprintf("global/%u%s", relNode, markstr);
     }
     else if (spcNode == DEFAULTTABLESPACE_OID)
     {
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/%u_%s",
+                path = psprintf("base/%u/%u_%s%s",
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/%u",
-                                dbNode, relNode);
+                path = psprintf("base/%u/%u%s",
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/t%d_%u_%s",
+                path = psprintf("base/%u/t%d_%u_%s%s",
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/t%d_%u",
-                                dbNode, backendId, relNode);
+                path = psprintf("base/%u/t%d_%u%s",
+                                dbNode, backendId, relNode, markstr);
         }
     }
     else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, relNode);
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, backendId, relNode);
+                                dbNode, backendId, relNode, markstr);
         }
     }
+
     return path;
 }
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 0ab32b44e9..382623159c 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int    wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index f0814f1458..12346ed7f6 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence change here,
+ * but logging of deletion actions is handled mainly by xact.c, because it is
+ * part of transaction commit in most cases.  However, there's a case where
+ * init forks are deleted outside control of transaction.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE    0x10
 #define XLOG_SMGR_TRUNCATE    0x20
+#define XLOG_SMGR_UNLINK    0x30
+#define XLOG_SMGR_MARK        0x40
+#define XLOG_SMGR_BUFPERSISTENCE    0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,32 @@ typedef struct xl_smgr_create
     ForkNumber    forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+    RelFileNode rnode;
+    ForkNumber    forkNum;
+} xl_smgr_unlink;
+
+typedef enum smgr_mark_action
+{
+    XLOG_SMGR_MARK_CREATE = 'c',
+    XLOG_SMGR_MARK_UNLINK = 'u'
+} smgr_mark_action;
+
+typedef struct xl_smgr_mark
+{
+    RelFileNode     rnode;
+    ForkNumber        forkNum;
+    StorageMarks    mark;
+    smgr_mark_action action;
+} xl_smgr_mark;
+
+typedef struct xl_smgr_bufpersistence
+{
+    RelFileNode rnode;
+    bool        persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP        0x0001
 #define SMGR_TRUNCATE_VM        0x0002
@@ -51,6 +83,12 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a44be11ca0..106a5cf508 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -67,7 +67,7 @@ extern int    forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbNode, Oid spcNode);
 
 extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                             int backendId, ForkNumber forkNumber);
+                             int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -77,7 +77,7 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 /* First argument is a RelFileNode */
 #define relpathbackend(rnode, backend, forknum) \
     GetRelationPath((rnode).dbNode, (rnode).spcNode, (rnode).relNode, \
-                    backend, forknum)
+                    backend, forknum, 0)
 
 /* First argument is a RelFileNode */
 #define relpathperm(rnode, forknum) \
@@ -87,4 +87,9 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 #define relpath(rnode, forknum) \
     relpathbackend((rnode).node, (rnode).backend, forknum)
 
+/* First argument is a RelFileNodeBackend */
+#define markpath(rnode, forknum, mark)                                \
+    GetRelationPath((rnode).node.dbNode, (rnode).node.spcNode, \
+                    (rnode).node.relNode, \
+                    (rnode).backend, forknum, mark)
 #endif                            /* RELPATH_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index cfce23ecbc..f5a7df87a4 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -206,6 +206,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
                                    int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+                                          bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(struct SMgrRelationData **smgr_reln, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 34602ae006..2dc0357ad5 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -185,6 +185,7 @@ extern ssize_t pg_pwritev_with_retry(int fd,
 extern int    pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int    fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int    fsync_parent_path(const char *fname, int elevel);
 extern int    durable_rename(const char *oldfile, const char *newfile, int loglevel);
 extern int    durable_unlink(const char *fname, int loglevel);
 extern int    durable_rename_excl(const char *oldfile, const char *newfile, int loglevel);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 752b440864..99620816b5 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -23,6 +23,10 @@
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool    isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
@@ -41,12 +45,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
                        BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileNodeBackend rnode,
+                                           ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int    mdsyncfiletag(const FileTag *ftag, char *path);
-extern int    mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int    mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif                            /* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index fad1e5c473..e1f97e9b89 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,13 +16,15 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
-extern bool parse_filename_for_nontemp_relation(const char *name,
-                                                int *oidchars, ForkNumber *fork);
+extern bool parse_filename_for_nontemp_relation(const char *name, int *oidchars,
+                                                ForkNumber *fork,
+                                                StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP        0x0001
-#define UNLOGGED_RELATION_INIT            0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER    0x0002
+#define UNLOGGED_RELATION_INIT            0x0004
 
 #endif                            /* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index a6fbf7b6a6..201ecace8a 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
+/*
+ * Storage marks is a file of which existence suggests something about a
+ * file. The name of such files is "<filename>.<mark>", where the mark is one
+ * of the values of StorageMarks. Since ".<digit>" means segment files so don't
+ * use digits for the mark character.
+ */
+typedef enum StorageMarks
+{
+    SMGR_MARK_NONE = 0,
+    SMGR_MARK_UNCOMMITTED = 'u'    /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -85,7 +97,12 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-- 
2.27.0

From 2d74ca97ae66dff87a883e2efa60f02fb8c883c3 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v8 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
relation persistence of all tables in the specified tablespace.
---
 src/backend/commands/tablecmds.c | 140 +++++++++++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c    |  16 ++++
 src/backend/nodes/equalfuncs.c   |  15 ++++
 src/backend/parser/gram.y        |  20 +++++
 src/backend/tcop/utility.c       |  11 +++
 src/include/commands/tablecmds.h |   2 +
 src/include/nodes/nodes.h        |   1 +
 src/include/nodes/parsenodes.h   |   9 ++
 8 files changed, 214 insertions(+)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index afc77f0d98..211ca3641a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14488,6 +14488,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
     return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change persistence of all objects in a given tablespace in
+ * the current database.  Objects can be chosen based on the owner of the
+ * object also, to allow users to change persistene only their objects. The
+ * main permissions handling is done by the lower-level change persistence
+ * function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+    List       *relations = NIL;
+    ListCell   *l;
+    ScanKeyData key[1];
+    Relation    rel;
+    TableScanDesc scan;
+    HeapTuple    tuple;
+    Oid            tablespaceoid;
+    List       *role_oids = roleSpecsToIds(NIL);
+
+    /* Ensure we were not asked to change something we can't */
+    if (stmt->objtype != OBJECT_TABLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be specified")));
+
+    /* Get the tablespace OID */
+    tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+    /*
+     * Now that the checks are done, check if we should set either to
+     * InvalidOid because it is our database's default tablespace.
+     */
+    if (tablespaceoid == MyDatabaseTableSpace)
+        tablespaceoid = InvalidOid;
+
+    /*
+     * Walk the list of objects in the tablespace to pick up them. This will
+     * only find objects in our database, of course.
+     */
+    ScanKeyInit(&key[0],
+                Anum_pg_class_reltablespace,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(tablespaceoid));
+
+    rel = table_open(RelationRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 1, key);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+        Oid            relOid = relForm->oid;
+
+        /*
+         * Do not pick-up objects in pg_catalog as part of this, if an admin
+         * really wishes to do so, they can issue the individual ALTER
+         * commands directly.
+         *
+         * Also, explicitly avoid any shared tables, temp tables, or TOAST
+         * (TOAST will be changed with the main table).
+         */
+        if (IsCatalogNamespace(relForm->relnamespace) ||
+            relForm->relisshared ||
+            isAnyTempNamespace(relForm->relnamespace) ||
+            IsToastNamespace(relForm->relnamespace))
+            continue;
+
+        /* Only pick up the object type requested */
+        if (relForm->relkind != RELKIND_RELATION)
+            continue;
+
+        /* Check if we are only picking-up objects owned by certain roles */
+        if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+            continue;
+
+        /*
+         * Handle permissions-checking here since we are locking the tables
+         * and also to avoid doing a bunch of work only to fail part-way. Note
+         * that permissions will also be checked by AlterTableInternal().
+         *
+         * Caller must be considered an owner on the table of which we're going
+         * to change persistence.
+         */
+        if (!pg_class_ownercheck(relOid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+                           NameStr(relForm->relname));
+
+        if (stmt->nowait &&
+            !ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_IN_USE),
+                     errmsg("aborting because lock on relation \"%s.%s\" is not available",
+                            get_namespace_name(relForm->relnamespace),
+                            NameStr(relForm->relname))));
+        else
+            LockRelationOid(relOid, AccessExclusiveLock);
+
+        /*
+         * Add to our list of objects of which we're going to change
+         * persistence.
+         */
+        relations = lappend_oid(relations, relOid);
+    }
+
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    if (relations == NIL)
+        ereport(NOTICE,
+                (errcode(ERRCODE_NO_DATA_FOUND),
+                 errmsg("no matching relations in tablespace \"%s\" found",
+                        tablespaceoid == InvalidOid ? "(database default)" :
+                        get_tablespace_name(tablespaceoid))));
+
+    /*
+     * Everything is locked, loop through and change persistence of all of the
+     * relations.
+     */
+    foreach(l, relations)
+    {
+        List       *cmds = NIL;
+        AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+        if (stmt->logged)
+            cmd->subtype = AT_SetLogged;
+        else
+            cmd->subtype = AT_SetUnLogged;
+
+        cmds = lappend(cmds, cmd);
+
+        EventTriggerAlterTableStart((Node *) stmt);
+        /* OID is set by AlterTableInternal */
+        AlterTableInternal(lfirst_oid(l), cmds, false);
+        EventTriggerAlterTableEnd();
+    }
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..55e38cfe3f 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4269,6 +4269,19 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
     return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+    AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+    COPY_STRING_FIELD(tablespacename);
+    COPY_SCALAR_FIELD(objtype);
+    COPY_SCALAR_FIELD(logged);
+    COPY_SCALAR_FIELD(nowait);
+
+    return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -5622,6 +5635,9 @@ copyObjectImpl(const void *from)
         case T_AlterTableMoveAllStmt:
             retval = _copyAlterTableMoveAllStmt(from);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _copyAlterTableSetLoggedAllStmt(from);
+            break;
         case T_CreateExtensionStmt:
             retval = _copyCreateExtensionStmt(from);
             break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..a19b7874d7 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1916,6 +1916,18 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
     return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+                                 const AlterTableSetLoggedAllStmt *b)
+{
+    COMPARE_STRING_FIELD(tablespacename);
+    COMPARE_SCALAR_FIELD(objtype);
+    COMPARE_SCALAR_FIELD(logged);
+    COMPARE_SCALAR_FIELD(nowait);
+
+    return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -3625,6 +3637,9 @@ equal(const void *a, const void *b)
         case T_AlterTableMoveAllStmt:
             retval = _equalAlterTableMoveAllStmt(a, b);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _equalAlterTableSetLoggedAllStmt(a, b);
+            break;
         case T_CreateExtensionStmt:
             retval = _equalCreateExtensionStmt(a, b);
             break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..9823d57a54 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -1984,6 +1984,26 @@ AlterTableStmt:
                     n->nowait = $13;
                     $$ = (Node *)n;
                 }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = true;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = false;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
         |    ALTER INDEX qualified_name alter_table_cmds
                 {
                     AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1fbc387d47..1483f9a475 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -162,6 +162,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
         case T_AlterTSConfigurationStmt:
         case T_AlterTSDictionaryStmt:
         case T_AlterTableMoveAllStmt:
+        case T_AlterTableSetLoggedAllStmt:
         case T_AlterTableSpaceOptionsStmt:
         case T_AlterTableStmt:
         case T_AlterTypeStmt:
@@ -1747,6 +1748,12 @@ ProcessUtilitySlow(ParseState *pstate,
                 commandCollected = true;
                 break;
 
+            case T_AlterTableSetLoggedAllStmt:
+                AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+                /* commands are stashed in AlterTableSetLoggedAll */
+                commandCollected = true;
+                break;
+
             case T_DropStmt:
                 ExecDropStmt((DropStmt *) parsetree, isTopLevel);
                 /* no commands stashed for DROP */
@@ -2669,6 +2676,10 @@ CreateCommandTag(Node *parsetree)
             tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
             break;
 
+        case T_AlterTableSetLoggedAllStmt:
+            tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+            break;
+
         case T_AlterTableStmt:
             tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
             break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 336549cc5f..714077ff4c 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid    AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
                                          Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 7c657c1241..8860b2e548 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -428,6 +428,7 @@ typedef enum NodeTag
     T_AlterCollationStmt,
     T_CallStmt,
     T_AlterStatsStmt,
+    T_AlterTableSetLoggedAllStmt,
 
     /*
      * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..c3e1bc66d1 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2350,6 +2350,15 @@ typedef struct AlterTableMoveAllStmt
     bool        nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+    NodeTag        type;
+    char       *tablespacename;
+    ObjectType    objtype;        /* Object type to move */
+    bool        logged;
+    bool        nowait;
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *        Create/Alter Extension Statements
  * ----------------------
-- 
2.27.0


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Amit Langote
Дата:
Сообщение: Re: simplifying foreign key/RI checks
Следующее
От: "houzj.fnst@fujitsu.com"
Дата:
Сообщение: relcache not invalidated when ADD PRIMARY KEY USING INDEX