Re: In-placre persistance change of a relation

From: Kyotaro Horiguchi
Subject: Re: In-placre persistance change of a relation
Msg-id: 20220114.114310.2172685740667639154.horikyota.ntt@gmail.com
In reply to: Re: In-placre persistance change of a relation (Jakub Wartak <jakub.wartak@tomtom.com>)
Responses: Re: In-placre persistance change of a relation (Julien Rouhaud <rjuju123@gmail.com>)
List: pgsql-hackers
I found a bug.

mdmarkexists() didn't close the tentatively opened fd. This is a silent
leak on Linux and similar systems, and it causes delete failures on Windows.
That was the cause of the CI failure.
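The fix is to make sure the descriptor opened for the existence check is closed on every path. Below is a minimal sketch of that pattern in plain POSIX C. `mark_file_exists()` and `create_empty_file()` are hypothetical helpers for illustration only. They are not the patch's `mdmarkexists()`, which goes through PostgreSQL's own fd.c layer and may handle errors differently.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Hypothetical helper: report whether a mark file exists by trying to open
 * it.  The descriptor is closed before returning, so repeated probes do not
 * accumulate open fds, and the file is not left open when it is later
 * unlinked (the leak caused delete failures on Windows).
 */
static bool
mark_file_exists(const char *path)
{
    int fd;

    assert(path != NULL);

    fd = open(path, O_RDONLY);
    if (fd < 0)
        return false;   /* simplification: any open() failure counts as "absent" */

    close(fd);          /* the step the buggy version was missing */
    return true;
}

/* Hypothetical test helper: create an empty file at 'path'. */
static bool
create_empty_file(const char *path)
{
    FILE *f = fopen(path, "w");

    if (f == NULL)
        return false;
    return fclose(f) == 0;
}
```

Suppose you probe a file thousands of times and then `unlink()` it. That works only because each probe closes its descriptor. Without the `close()`, a long enough loop would eventually fail with EMFILE under a typical per-process fd limit.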

027_persistence_change.pl uses interactive_psql(), which doesn't work on
the Windows VM used by the CI.

In this version the following changes have been made in 0001.

- Properly close file descriptor in mdmarkexists.

- Skip some tests when IO::Pty is not available.
  It might be better to separate that part.

Looking again at the ALTER TABLE ALL IN TABLESPACE SET LOGGED patch, I
noticed that it doesn't implement the OWNED BY part and has no tests
or documentation (it was a PoC). I added all of them to 0002.

At Tue, 11 Jan 2022 09:33:55 +0000, Jakub Wartak <jakub.wartak@tomtom.com> wrote in 
> The following review has been posted through the commitfest application:
> make installcheck-world:  tested, passed
> Implements feature:       tested, passed
> Spec compliant:           tested, passed
> Documentation:            not tested
> 
> I've retested v15 of the patch with everything that came to my mind. The patch passes all my tests (well, there's
> this just windows / cfbot issue). Patch looks good to me. I haven't looked in-depth at the code, so patch might still
> need review.
 

Thanks for checking.

> FYI, about potential usage of this patch: the most advanced test that I did was continually bouncing wal_level - it
> works. So chain of:
>
> 1. wal_level=replica->minimal
> 2. alter table set unlogged and load a lot of data, set logged
> 3. wal_level=minimal->replica
> 4. barman incremental backup # rsync(1) just backups changed files in steps 2 and 3 (not whole DB)
> 5. some other (logged) work
> The idea is that when performing mass-alterations to the DB (think nightly ETL/ELT on TB-sized DBs), one could skip
> backing up most of DB and then just quickly backup only the changed files - during the maintenance window - e.g. thanks
> to local-rsync barman mode. This is the output of barman show-backups after loading data to unlogged table each such
> cycle:
> mydb 20220110T100236 - Mon Jan 10 10:05:14 2022 - Size: 144.1 GiB - WAL Size: 16.0 KiB
> mydb 20220110T094905 - Mon Jan 10 09:50:12 2022 - Size: 128.5 GiB - WAL Size: 80.2 KiB
> mydb 20220110T094016 - Mon Jan 10 09:40:17 2022 - Size: 109.1 GiB - WAL Size: 496.3 KiB
> And dedupe ratio of the last one: Backup size: 144.1 GiB. Actual size on disk: 36.1 GiB (-74.96% deduplication
> ratio).
 

Ah, the patch skips duplicating relation files. This is advantageous in
that it not only eliminates the I/O that the duplication causes but also
reduces the size of incremental backups.  I hadn't noticed the latter
advantage.

> The only thing I've found out that bouncing wal_level also forces max_wal_senders=X -> 0 -> X which in turn requires
> dropping replication slot for pg_receivewal (e.g. barman receive-wal --create-slot/--drop-slot/--reset). I have tested
> the restore using barman recover afterwards to backup 20220110T094905  and indeed it worked OK using this patch too.
 

Yeah, it is unrelated to this patch, but I'm annoyed by the
restriction.  I think it would be okay for max_wal_senders to be
forcibly set to 0 while wal_level=minimal.

> The new status of this patch is: Needs review

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From d6bf0bd0d60391b24d5be7942b546acfffa3d7b1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 21:51:11 +0900
Subject: [PATCH v16 1/2] In-place table persistence change

Even though ALTER TABLE SET LOGGED/UNLOGGED does not require data
rewriting, it currently performs a heap rewrite, which causes a large
amount of file I/O.  This patch makes the command run without a heap
rewrite.  In addition, SET LOGGED while wal_level > minimal emits WAL
using XLOG_FPI records instead of a massive number of HEAP_INSERTs,
which should produce less WAL.

This also allows cleanup of files left behind when the server crashes
during the transaction that created them.
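Below is a simplified model of that crash cleanup. It assumes, hypothetically, that an uncommitted-creation mark is a sibling file named `<file>.u`. Commit removes the mark. So at the end of crash recovery, any file that still has a mark belongs to a transaction that never committed, and it is removed together with its mark. The suffix, the in-memory directory listing and `startup_cleanup()` are illustrative only. The patch's real mark naming lives in its relpath.c/md.c changes. The real scan appears to be driven by `ResetUnloggedRelations(... | UNLOGGED_RELATION_CLEANUP)`. This sketch also treats each fork file independently.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MARK_SUFFIX ".u"    /* hypothetical suffix for an "uncommitted" mark */

/* Does 'name' end with MARK_SUFFIX (i.e. is it a mark file itself)? */
static bool
is_mark_file(const char *name)
{
    size_t len = strlen(name);
    size_t slen = strlen(MARK_SUFFIX);

    return len > slen && strcmp(name + len - slen, MARK_SUFFIX) == 0;
}

/* Is 'name' exactly 'base' followed by MARK_SUFFIX? */
static bool
is_mark_for(const char *name, const char *base)
{
    size_t blen = strlen(base);

    return strncmp(name, base, blen) == 0 &&
        strcmp(name + blen, MARK_SUFFIX) == 0;
}

/*
 * Given one directory listing, collect the indexes of entries to delete:
 * every leftover mark file, and every data file that still has a mark
 * (its creating transaction never committed).  Returns the number of
 * indexes written to 'victims', which must have room for 'nfiles' entries.
 */
static size_t
startup_cleanup(const char *const *files, size_t nfiles, size_t *victims)
{
    size_t nvictims = 0;

    assert(files != NULL && victims != NULL);

    for (size_t i = 0; i < nfiles; i++)
    {
        bool doomed = is_mark_file(files[i]);

        /* a data file is garbage if any sibling entry is its mark */
        for (size_t j = 0; !doomed && j < nfiles; j++)
            doomed = is_mark_for(files[j], files[i]);

        if (doomed)
            victims[nvictims++] = i;
    }
    return nvictims;
}
```

Take the listing `{"16384", "16384_init", "16384_init.u", "16385", "16386", "16386.u"}`. The function selects `16384_init` and its mark, plus `16386` and its mark, and leaves the committed files alone.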
---
 src/backend/access/rmgrdesc/smgrdesc.c        |  52 ++
 src/backend/access/transam/README             |   8 +
 src/backend/access/transam/xact.c             |   7 +
 src/backend/access/transam/xlog.c             |  17 +
 src/backend/catalog/storage.c                 | 545 +++++++++++++++++-
 src/backend/commands/tablecmds.c              | 266 +++++++--
 src/backend/replication/basebackup.c          |   3 +-
 src/backend/storage/buffer/bufmgr.c           |  88 +++
 src/backend/storage/file/fd.c                 |   4 +-
 src/backend/storage/file/reinit.c             | 344 +++++++----
 src/backend/storage/smgr/md.c                 |  94 ++-
 src/backend/storage/smgr/smgr.c               |  32 +
 src/backend/storage/sync/sync.c               |  20 +-
 src/bin/pg_rewind/parsexlog.c                 |  24 +
 src/common/relpath.c                          |  47 +-
 src/include/catalog/storage.h                 |   3 +
 src/include/catalog/storage_xlog.h            |  42 +-
 src/include/common/relpath.h                  |   9 +-
 src/include/storage/bufmgr.h                  |   2 +
 src/include/storage/fd.h                      |   1 +
 src/include/storage/md.h                      |   8 +-
 src/include/storage/reinit.h                  |  10 +-
 src/include/storage/smgr.h                    |  17 +
 src/test/recovery/t/027_persistence_change.pl | 263 +++++++++
 24 files changed, 1724 insertions(+), 182 deletions(-)
 create mode 100644 src/test/recovery/t/027_persistence_change.pl

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index 7755553d57..d251f22207 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,49 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
                          xlrec->blkno, xlrec->flags);
         pfree(path);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+        char       *path = relpathperm(xlrec->rnode, xlrec->forkNum);
+
+        appendStringInfoString(buf, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+        char       *path = GetRelationPath(xlrec->rnode.dbNode,
+                                           xlrec->rnode.spcNode,
+                                           xlrec->rnode.relNode,
+                                           InvalidBackendId,
+                                           xlrec->forkNum, xlrec->mark);
+        const char *action;
+
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                action = "CREATE";
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                action = "DELETE";
+                break;
+            default:
+                action = "<unknown action>";
+                break;
+        }
+
+        appendStringInfo(buf, "%s %s", action, path);
+        pfree(path);
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+        char       *path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+        appendStringInfoString(buf, path);
+        appendStringInfo(buf, " persistence %d", xlrec->persistence);
+        pfree(path);
+    }
 }
 
 const char *
@@ -55,6 +98,15 @@ smgr_identify(uint8 info)
         case XLOG_SMGR_TRUNCATE:
             id = "TRUNCATE";
             break;
+        case XLOG_SMGR_UNLINK:
+            id = "UNLINK";
+            break;
+        case XLOG_SMGR_MARK:
+            id = "MARK";
+            break;
+        case XLOG_SMGR_BUFPERSISTENCE:
+            id = "BUFPERSISTENCE";
+            break;
     }
 
     return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 1edc8180c1..b344bbe511 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -724,6 +724,14 @@ we must panic and abort recovery.  The DBA will have to manually clean up
 then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
+The Smgr MARK files
+-------------------
+
+An smgr mark file is created along with a new relation file to indicate
+that the relfilenode needs to be cleaned up at recovery time.  In
+contrast to the four actions above, failure to remove smgr mark files
+will lead to data loss, in which case the server will shut down.
+
 
 Skipping WAL for New RelFileNode
 --------------------------------
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e7b0bc804d..b41186d6d8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2197,6 +2197,9 @@ CommitTransaction(void)
      */
     smgrDoPendingSyncs(true, is_parallel_worker);
 
+    /* Likewise delete mark files for files created during this transaction. */
+    smgrDoPendingCleanups(true);
+
     /* close large objects before lower-level cleanup */
     AtEOXact_LargeObject(true);
 
@@ -2447,6 +2450,9 @@ PrepareTransaction(void)
      */
     smgrDoPendingSyncs(true, false);
 
+    /* Likewise delete mark files for files created during this transaction. */
+    smgrDoPendingCleanups(true);
+
     /* close large objects before lower-level cleanup */
     AtEOXact_LargeObject(true);
 
@@ -2772,6 +2778,7 @@ AbortTransaction(void)
     AfterTriggerEndXact(false); /* 'false' means it's abort */
     AtAbort_Portals();
     smgrDoPendingSyncs(false, is_parallel_worker);
+    smgrDoPendingCleanups(false);
     AtEOXact_LargeObject(false);
     AtAbort_Notify();
     AtEOXact_RelationMap(false, is_parallel_worker);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 87cd05c945..243860fcb1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "catalog/storage.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
 #include "common/controldata_utils.h"
@@ -4564,6 +4565,14 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
             {
                 ereport(DEBUG1,
                         (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+                /* cleanup garbage files left during crash recovery */
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+                /* run rollback cleanup if any */
+                smgrDoPendingDeletes(false);
+
                 InArchiveRecovery = true;
                 if (StandbyModeRequested)
                     StandbyMode = true;
@@ -7824,6 +7833,14 @@ StartupXLOG(void)
                 }
             }
 
+            /* cleanup garbage files left during crash recovery */
+            if (!InArchiveRecovery)
+                ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+                                       UNLOGGED_RELATION_CLEANUP);
+
+            /* run rollback cleanup if any */
+            smgrDoPendingDeletes(false);
+
             /* Allow resource managers to do any required cleanup. */
             for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
             {
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index c5ad28d71f..d6b30387e9 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -66,6 +67,23 @@ typedef struct PendingRelDelete
     struct PendingRelDelete *next;    /* linked-list link */
 } PendingRelDelete;
 
+#define    PCOP_UNLINK_FORK        (1 << 0)
+#define    PCOP_UNLINK_MARK        (1 << 1)
+#define    PCOP_SET_PERSISTENCE    (1 << 2)
+
+typedef struct PendingCleanup
+{
+    RelFileNode relnode;        /* relation that may need to be deleted */
+    int            op;                /* operation mask */
+    bool        bufpersistence;    /* buffer persistence to set */
+    int            unlink_forknum;    /* forknum to unlink */
+    StorageMarks unlink_mark;    /* mark to unlink */
+    BackendId    backend;        /* InvalidBackendId if not a temp rel */
+    bool        atCommit;        /* T=delete at commit; F=delete at abort */
+    int            nestLevel;        /* xact nesting level of request */
+    struct PendingCleanup *next;    /* linked-list link */
+} PendingCleanup;
+
 typedef struct PendingRelSync
 {
     RelFileNode rnode;
@@ -73,6 +91,7 @@ typedef struct PendingRelSync
 } PendingRelSync;
 
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+static PendingCleanup   *pendingCleanups = NULL; /* head of linked list */
 HTAB       *pendingSyncHash = NULL;
 
 
@@ -117,7 +136,8 @@ AddPendingSync(const RelFileNode *rnode)
 SMgrRelation
 RelationCreateStorage(RelFileNode rnode, char relpersistence)
 {
-    PendingRelDelete *pending;
+    PendingRelDelete *pendingdel;
+    PendingCleanup     *pendingclean;
     SMgrRelation srel;
     BackendId    backend;
     bool        needs_wal;
@@ -143,21 +163,41 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
             return NULL;        /* placate compiler */
     }
 
+    /*
+     * We are going to create a new storage file. If the server crashes before
+     * the current transaction ends, the file needs to be cleaned up. The
+     * SMGR_MARK_UNCOMMITTED mark file prompts that work at the next startup.
+     */
     srel = smgropen(rnode, backend);
+    log_smgrcreatemark(&rnode, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
     smgrcreate(srel, MAIN_FORKNUM, false);
 
     if (needs_wal)
         log_smgrcreate(&srel->smgr_rnode.node, MAIN_FORKNUM);
 
     /* Add the relation to the list of stuff to delete at abort */
-    pending = (PendingRelDelete *)
+    pendingdel = (PendingRelDelete *)
         MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-    pending->relnode = rnode;
-    pending->backend = backend;
-    pending->atCommit = false;    /* delete if abort */
-    pending->nestLevel = GetCurrentTransactionNestLevel();
-    pending->next = pendingDeletes;
-    pendingDeletes = pending;
+    pendingdel->relnode = rnode;
+    pendingdel->backend = backend;
+    pendingdel->atCommit = false;    /* delete if abort */
+    pendingdel->nestLevel = GetCurrentTransactionNestLevel();
+    pendingdel->next = pendingDeletes;
+    pendingDeletes = pendingdel;
+
+    /* drop mark files at commit */
+    pendingclean = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pendingclean->relnode = rnode;
+    pendingclean->op = PCOP_UNLINK_MARK;
+    pendingclean->unlink_forknum = MAIN_FORKNUM;
+    pendingclean->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pendingclean->backend = backend;
+    pendingclean->atCommit = true;
+    pendingclean->nestLevel = GetCurrentTransactionNestLevel();
+    pendingclean->next = pendingCleanups;
+    pendingCleanups = pendingclean;
 
     if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
     {
@@ -168,6 +208,203 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
     return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *        Create physical storage for the init fork of a relation.
+ *
+ * Create the init fork for the relation.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingCleanup *pending;
+    PendingCleanup *prev;
+    PendingCleanup *next;
+    SMgrRelation srel;
+    bool              create = true;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+    /*
+     * If we have entries for init-fork operations on this relation, that means
+     * that we have already registered pending-delete entries to drop an init
+     * fork that existed before the current transaction started. This function
+     * reverts that change just by removing the entries.
+     */
+    prev = NULL;
+    for (pending = pendingCleanups; pending != NULL; pending = next)
+    {
+        next = pending->next;
+
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->unlink_forknum == INIT_FORKNUM)
+        {
+            if (prev)
+                prev->next = next;
+            else
+                pendingCleanups = next;
+
+            pfree(pending);
+            /* prev does not change */
+
+            create = false;
+        }
+        else
+            prev = pending;
+    }
+
+    if (!create)
+        return;
+
+    /*
+     * We are going to create an init fork. If the server crashes before the
+     * current transaction ends, the leftover init fork would corrupt data
+     * during recovery.  The mark file serves as a sentinel to identify that
+     * situation.
+     */
+    srel = smgropen(rnode, InvalidBackendId);
+    log_smgrcreatemark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+    smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+    /* We don't have existing init fork, create it. */
+    smgrcreate(srel, INIT_FORKNUM, false);
+
+    /*
+     * An index init fork needs further initialization. ambuildempty should
+     * WAL-log and sync the file by itself; otherwise we do that ourselves.
+     */
+    if (rel->rd_rel->relkind == RELKIND_INDEX)
+        rel->rd_indam->ambuildempty(rel);
+    else
+    {
+        log_smgrcreate(&rnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    /* drop the init fork, mark file and revert persistence at abort */
+    pending = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_UNLINK_FORK | PCOP_UNLINK_MARK | PCOP_SET_PERSISTENCE;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->bufpersistence = true;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+
+    /* drop mark file at commit */
+    pending = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_UNLINK_MARK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *        Delete physical storage for the init fork of a relation.
+ *
+ * Register pending-delete of the init fork. The real deletion is performed by
+ * smgrDoPendingCleanups at commit.
+ *
+ * This function is transactional. If the transaction aborts later on, the
+ * deletion doesn't happen.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+    RelFileNode rnode = rel->rd_node;
+    PendingCleanup *pending;
+    PendingCleanup *prev;
+    PendingCleanup *next;
+    bool            inxact_created = false;
+
+    /* switch buffer persistence */
+    SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+    /*
+     * If we have entries for init-fork operations of this relation, that means
+     * that we have created the init fork in the current transaction.  We
+     * remove the init fork and mark file immediately in that case.  Otherwise
+     * just register pending-delete for the existing init fork.
+     */
+    prev = NULL;
+    for (pending = pendingCleanups; pending != NULL; pending = next)
+    {
+        next = pending->next;
+
+        if (RelFileNodeEquals(rnode, pending->relnode) &&
+            pending->unlink_forknum == INIT_FORKNUM)
+        {
+            /* unlink list entry */
+            if (prev)
+                prev->next = next;
+            else
+                pendingCleanups = next;
+
+            pfree(pending);
+            /* prev does not change */
+
+            inxact_created = true;
+        }
+        else
+            prev = pending;
+    }
+
+    if (inxact_created)
+    {
+        SMgrRelation srel = smgropen(rnode, InvalidBackendId);
+
+        /*
+         * INIT forks are never loaded into shared buffers, so there is no point
+         * in dropping buffers for such files.
+         */
+        log_smgrunlinkmark(&rnode, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+        smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+        log_smgrunlink(&rnode, INIT_FORKNUM);
+        smgrunlink(srel, INIT_FORKNUM, false);
+        return;
+    }
+
+    /* register drop of this init fork file at commit */
+    pending = (PendingCleanup *)
+        MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_UNLINK_FORK;
+    pending->unlink_forknum = INIT_FORKNUM;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = true;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+
+    /* revert buffer-persistence changes at abort */
+    pending = (PendingCleanup *)
+        MemoryContextAllocZero(TopMemoryContext, sizeof(PendingCleanup));
+    pending->relnode = rnode;
+    pending->op = PCOP_SET_PERSISTENCE;
+    pending->bufpersistence = false;
+    pending->backend = InvalidBackendId;
+    pending->atCommit = false;
+    pending->nestLevel = GetCurrentTransactionNestLevel();
+    pending->next = pendingCleanups;
+    pendingCleanups = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -187,6 +424,88 @@ log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum)
     XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record to WAL.
+ */
+void
+log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum)
+{
+    xl_smgr_unlink xlrec;
+
+    /*
+     * Make an XLOG entry reporting the file unlink.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK (create) record to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the mark file creation.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK (unlink) record to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                   StorageMarks mark)
+{
+    xl_smgr_mark xlrec;
+
+    /*
+     * Make an XLOG entry reporting the mark file removal.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.forkNum = forkNum;
+    xlrec.mark = mark;
+    xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileNode *rnode, bool persistence)
+{
+    xl_smgr_bufpersistence xlrec;
+
+    /*
+     * Make an XLOG entry reporting the change of buffer persistence.
+     */
+    xlrec.rnode = *rnode;
+    xlrec.persistence = persistence;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+    XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *        Schedule unlinking of physical storage at transaction commit.
@@ -255,6 +574,7 @@ RelationPreserveStorage(RelFileNode rnode, bool atCommit)
                 prev->next = next;
             else
                 pendingDeletes = next;
+
             pfree(pending);
             /* prev does not change */
         }
@@ -673,6 +993,88 @@ smgrDoPendingDeletes(bool isCommit)
     }
 }
 
+/*
+ *    smgrDoPendingCleanups() -- Clean up work that emits WAL records
+ *
+ *  The operations handled in this function emit WAL records, which must be
+ *  emitted before the commit record for the current transaction.
+ */
+void
+smgrDoPendingCleanups(bool isCommit)
+{
+    int            nestLevel = GetCurrentTransactionNestLevel();
+    PendingCleanup *pending;
+    PendingCleanup *prev;
+    PendingCleanup *next;
+
+    prev = NULL;
+    for (pending = pendingCleanups; pending != NULL; pending = next)
+    {
+        next = pending->next;
+        if (pending->nestLevel < nestLevel)
+        {
+            /* outer-level entries should not be processed yet */
+            prev = pending;
+        }
+        else
+        {
+            /* unlink list entry first, so we don't retry on failure */
+            if (prev)
+                prev->next = next;
+            else
+                pendingCleanups = next;
+
+            /* do cleanup if called for */
+            if (pending->atCommit == isCommit)
+            {
+                SMgrRelation srel;
+
+                srel = smgropen(pending->relnode, pending->backend);
+
+                Assert ((pending->op &
+                         ~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK |
+                           PCOP_SET_PERSISTENCE)) == 0);
+
+                if (pending->op & PCOP_UNLINK_FORK)
+                {
+                    /* other forks would need their buffers dropped first */
+                    Assert(pending->unlink_forknum == INIT_FORKNUM);
+
+                    /* Don't emit WAL during recovery. */
+                    if (!InRecovery)
+                        log_smgrunlink(&pending->relnode,
+                                       pending->unlink_forknum);
+                    smgrunlink(srel, pending->unlink_forknum, false);
+                }
+
+                if (pending->op & PCOP_UNLINK_MARK)
+                {
+                    /*
+                     * Reuse the srel opened above.  Closing it here would leave
+                     * it dangling for the PCOP_SET_PERSISTENCE step below.
+                     */
+                    if (!InRecovery)
+                        log_smgrunlinkmark(&pending->relnode,
+                                           pending->unlink_forknum,
+                                           pending->unlink_mark);
+                    smgrunlinkmark(srel, pending->unlink_forknum,
+                                   pending->unlink_mark, InRecovery);
+                }
+
+                if (pending->op & PCOP_SET_PERSISTENCE)
+                {
+                    SetRelationBuffersPersistence(srel, pending->bufpersistence,
+                                                  InRecovery);
+                }
+            }
+
+            /* must explicitly free the list entry */
+            pfree(pending);
+            /* prev does not change */
+        }
+    }
+}
+
 /*
  *    smgrDoPendingSyncs() -- Take care of relation syncs at end of xact.
  */
@@ -933,6 +1335,15 @@ smgr_redo(XLogReaderState *record)
         reln = smgropen(xlrec->rnode, InvalidBackendId);
         smgrcreate(reln, xlrec->forkNum, true);
     }
+    else if (info == XLOG_SMGR_UNLINK)
+    {
+        xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+        SMgrRelation reln;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        smgrunlink(reln, xlrec->forkNum, true);
+        smgrclose(reln);
+    }
     else if (info == XLOG_SMGR_TRUNCATE)
     {
         xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1021,6 +1432,124 @@ smgr_redo(XLogReaderState *record)
 
         FreeFakeRelcacheEntry(rel);
     }
+    else if (info == XLOG_SMGR_MARK)
+    {
+        xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingCleanup *pending;
+        bool        created = false;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+
+        switch (xlrec->action)
+        {
+            case XLOG_SMGR_MARK_CREATE:
+                smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+                created = true;
+                break;
+            case XLOG_SMGR_MARK_UNLINK:
+                smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+                break;
+            default:
+                elog(ERROR, "unknown smgr_mark action \"%c\"", xlrec->action);
+        }
+
+        if (created)
+        {
+            /* revert mark file operation at abort */
+            pending = (PendingCleanup *)
+                MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+            pending->relnode = xlrec->rnode;
+            pending->op = PCOP_UNLINK_MARK;
+            pending->unlink_forknum = xlrec->forkNum;
+            pending->unlink_mark = xlrec->mark;
+            pending->backend = InvalidBackendId;
+            pending->atCommit = false;
+            pending->nestLevel = GetCurrentTransactionNestLevel();
+            pending->next = pendingCleanups;
+            pendingCleanups = pending;
+        }
+        else
+        {
+            /*
+             * Delete pending action for this mark file if any. We should have
+             * at most one entry for this action.
+             */
+            PendingCleanup *prev = NULL;
+
+            for (pending = pendingCleanups; pending != NULL;
+                 pending = pending->next)
+            {
+                if (RelFileNodeEquals(xlrec->rnode, pending->relnode) &&
+                    pending->unlink_forknum == xlrec->forkNum &&
+                    (pending->op & PCOP_UNLINK_MARK) != 0)
+                {
+                    if (prev)
+                        prev->next = pending->next;
+                    else
+                        pendingCleanups = pending->next;
+
+                    pfree(pending);
+                    break;
+                }
+
+                prev = pending;
+            }
+        }
+    }
+    else if (info == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        xl_smgr_bufpersistence *xlrec =
+            (xl_smgr_bufpersistence *) XLogRecGetData(record);
+        SMgrRelation reln;
+        PendingCleanup *pending;
+        PendingCleanup *prev = NULL;
+
+        reln = smgropen(xlrec->rnode, InvalidBackendId);
+        SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+        /*
+         * Delete pending action for persistence change if any. We should have
+         * at most one entry for this action.
+         */
+        for (pending = pendingCleanups; pending != NULL;
+             pending = pending->next)
+        {
+            if (RelFileNodeEquals(xlrec->rnode, pending->relnode) &&
+                (pending->op & PCOP_SET_PERSISTENCE) != 0)
+            {
+                Assert (pending->bufpersistence == xlrec->persistence);
+
+                if (prev)
+                    prev->next = pending->next;
+                else
+                    pendingCleanups = pending->next;
+
+                pfree(pending);
+                break;
+            }
+
+            prev = pending;
+        }
+
+        /*
+         * Revert buffer-persistence changes at abort if the relation is going
+         * to a persistence other than the one it had before this transaction.
+         */
+        if (!pending)
+        {
+            pending = (PendingCleanup *)
+                MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+            pending->relnode = xlrec->rnode;
+            pending->op = PCOP_SET_PERSISTENCE;
+            pending->bufpersistence = !xlrec->persistence;
+            pending->backend = InvalidBackendId;
+            pending->atCommit = false;
+            pending->nestLevel = GetCurrentTransactionNestLevel();
+            pending->next = pendingCleanups;
+            pendingCleanups = pending;
+        }
+    }
     else
         elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
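The smgr_redo branches above keep at most one pending entry per relation and action. Replaying a record either cancels an existing entry or registers the inverse action to run at abort. Below is a minimal model of that bookkeeping. It assumes a plain unsigned relnode instead of RelFileNode, uses hypothetical OP_* flags in place of PCOP_*, and uses malloc/free instead of TopMemoryContext allocations. It is a sketch, not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the patch's PCOP_* operation flags. */
enum { OP_UNLINK_MARK = 1 << 0, OP_SET_PERSISTENCE = 1 << 1 };

typedef struct Pending
{
    unsigned        relnode;        /* simplified stand-in for RelFileNode */
    int             op;             /* OP_* bitmask */
    bool            bufpersistence; /* persistence to restore at abort */
    struct Pending *next;
} Pending;

/* Unlink and free the first entry for relnode whose op matches. */
static bool
pending_remove(Pending **head, unsigned relnode, int op)
{
    Pending *prev = NULL;

    for (Pending *p = *head; p != NULL; prev = p, p = p->next)
    {
        if (p->relnode == relnode && (p->op & op) != 0)
        {
            if (prev)
                prev->next = p->next;
            else
                *head = p->next;
            free(p);
            return true;
        }
    }
    return false;
}

/*
 * Replay a buffer-persistence change.  If an entry already exists, this
 * record undoes an earlier change in the same transaction, so the entry is
 * cancelled.  Otherwise, register an entry that restores the old persistence
 * if the transaction aborts.
 */
static void
redo_bufpersistence(Pending **head, unsigned relnode, bool persistence)
{
    Pending *p;

    if (pending_remove(head, relnode, OP_SET_PERSISTENCE))
        return;

    p = malloc(sizeof(Pending));
    assert(p != NULL);
    p->relnode = relnode;
    p->op = OP_SET_PERSISTENCE;
    p->bufpersistence = !persistence;
    p->next = *head;
    *head = p;
}
```

Replaying two opposite changes for the same relation leaves the list empty. This is the "at most one entry" invariant the comments above rely on.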
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 89bc865e28..51fcf9ca5f 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -52,6 +52,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5346,6 +5347,187 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
     return newcmd;
 }
 
+/*
+ * RelationChangePersistence: do in-place persistence change of a relation
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+                          LOCKMODE lockmode)
+{
+    Relation     rel;
+    Relation    classRel;
+    HeapTuple    tuple,
+                newtuple;
+    Datum        new_val[Natts_pg_class];
+    bool        new_null[Natts_pg_class],
+                new_repl[Natts_pg_class];
+    int            i;
+    List       *relids;
+    ListCell   *lc_oid;
+
+    Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+    Assert(lockmode == AccessExclusiveLock);
+
+    /*
+     * Any of the following conditions would require ATRewriteTable; none of
+     * them can hold in the AT_REWRITE_ALTER_PERSISTENCE case.
+     */
+    Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+           tab->newvals == NULL && !tab->verify_new_notnull);
+
+    rel = table_open(tab->relid, lockmode);
+
+    Assert(rel->rd_rel->relpersistence != persistence);
+
+    elog(DEBUG1, "perform in-place persistence change");
+
+    /*
+     * First, collect all relations whose persistence needs to be changed.
+     */
+
+    /* Collect OIDs of the relation and its indexes */
+    relids = RelationGetIndexList(rel);
+    relids = lcons_oid(rel->rd_id, relids);
+
+    /* Add toast relation if any */
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        List    *toastidx;
+        Relation toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+        relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+        toastidx = RelationGetIndexList(toastrel);
+        relids = list_concat(relids, toastidx);
+        list_free(toastidx);
+        table_close(toastrel, NoLock);
+    }
+
+    table_close(rel, NoLock);
+
+    /* Make changes in storage */
+    classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+    foreach (lc_oid, relids)
+    {
+        Oid reloid = lfirst_oid(lc_oid);
+        Relation r = relation_open(reloid, lockmode);
+
+        /*
+         * XXXX: Some access methods cannot tolerate an in-place persistence
+         * change. Specifically, GiST uses page LSNs to figure out whether a
+         * block has changed, whereas UNLOGGED GiST indexes use fake LSNs that
+         * are incompatible with the real LSNs used for LOGGED ones.
+         *
+         * If gistGetFakeLSN behaved the same way for permanent and unlogged
+         * indexes, we could perhaps skip the index rebuild in exchange for
+         * some extra WAL records emitted while the index is unlogged.
+         *
+         * Check relam against a positive list so that unknown AMs also take
+         * the rebuild path.
+         */
+        if (r->rd_rel->relkind == RELKIND_INDEX &&
+            /* GiST is excluded */
+            r->rd_rel->relam != BTREE_AM_OID &&
+            r->rd_rel->relam != HASH_AM_OID &&
+            r->rd_rel->relam != GIN_AM_OID &&
+            r->rd_rel->relam != SPGIST_AM_OID &&
+            r->rd_rel->relam != BRIN_AM_OID)
+        {
+            int            reindex_flags;
+            ReindexParams params = {0};
+
+            /* reindex doesn't allow concurrent use of the index */
+            table_close(r, NoLock);
+
+            reindex_flags =
+                REINDEX_REL_SUPPRESS_INDEX_USE |
+                REINDEX_REL_CHECK_CONSTRAINTS;
+
+            /* Use the same persistence as the parent relation. */
+            if (persistence == RELPERSISTENCE_UNLOGGED)
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+            else
+                reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+            reindex_index(reloid, reindex_flags, persistence, &params);
+
+            continue;
+        }
+
+        /* Create or drop init fork */
+        if (persistence == RELPERSISTENCE_UNLOGGED)
+            RelationCreateInitFork(r);
+        else
+            RelationDropInitFork(r);
+
+        /*
+         * When this relation becomes WAL-logged, immediately sync all forks
+         * except the init fork to establish the initial state on storage.
+         * Buffers have already been flushed by RelationCreate(Drop)InitFork
+         * just above, and the init fork should have been synced as needed.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT)
+        {
+            for (i = 0 ; i < INIT_FORKNUM ; i++)
+            {
+                if (smgrexists(RelationGetSmgr(r), i))
+                    smgrimmedsync(RelationGetSmgr(r), i);
+            }
+        }
+
+        /* Update catalog */
+        tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+        if (!HeapTupleIsValid(tuple))
+            elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+        memset(new_val, 0, sizeof(new_val));
+        memset(new_null, false, sizeof(new_null));
+        memset(new_repl, false, sizeof(new_repl));
+
+        new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+        new_null[Anum_pg_class_relpersistence - 1] = false;
+        new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+        newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+                                     new_val, new_null, new_repl);
+
+        CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+        heap_freetuple(newtuple);
+
+        /*
+         * When wal_level >= replica, switching to LOGGED requires the
+         * relation content to be WAL-logged so that the table can be
+         * recovered.  We don't emit these records when wal_level = minimal.
+         */
+        if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+        {
+            ForkNumber fork;
+            xl_smgr_truncate xlrec;
+
+            xlrec.blkno = 0;
+            xlrec.rnode = r->rd_node;
+            xlrec.flags = SMGR_TRUNCATE_ALL;
+
+            XLogBeginInsert();
+            XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+            XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+            for (fork = 0; fork < INIT_FORKNUM ; fork++)
+            {
+                if (smgrexists(RelationGetSmgr(r), fork))
+                    log_newpage_range(r, fork, 0,
+                                      smgrnblocks(RelationGetSmgr(r), fork),
+                                      false);
+            }
+        }
+
+        table_close(r, NoLock);
+    }
+
+    table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5476,47 +5658,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
                                          tab->relid,
                                          tab->rewrite);
 
-            /*
-             * Create transient table that will receive the modified data.
-             *
-             * Ensure it is marked correctly as logged or unlogged.  We have
-             * to do this here so that buffers for the new relfilenode will
-             * have the right persistence set, and at the same time ensure
-             * that the original filenode's buffers will get read in with the
-             * correct setting (i.e. the original one).  Otherwise a rollback
-             * after the rewrite would possibly result with buffers for the
-             * original filenode having the wrong persistence setting.
-             *
-             * NB: This relies on swap_relation_files() also swapping the
-             * persistence. That wouldn't work for pg_class, but that can't be
-             * unlogged anyway.
-             */
-            OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-                                       persistence, lockmode);
+            if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+                RelationChangePersistence(tab, persistence, lockmode);
+            else
+            {
+                /*
+                 * Create transient table that will receive the modified data.
+                 *
+                 * Ensure it is marked correctly as logged or unlogged.  We
+                 * have to do this here so that buffers for the new relfilenode
+                 * will have the right persistence set, and at the same time
+                 * ensure that the original filenode's buffers will get read in
+                 * with the correct setting (i.e. the original one).  Otherwise
+                 * a rollback after the rewrite would possibly result with
+                 * buffers for the original filenode having the wrong
+                 * persistence setting.
+                 *
+                 * NB: This relies on swap_relation_files() also swapping the
+                 * persistence. That wouldn't work for pg_class, but that can't
+                 * be unlogged anyway.
+                 */
+                OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+                                           NewAccessMethod,
+                                           persistence, lockmode);
 
-            /*
-             * Copy the heap data into the new table with the desired
-             * modifications, and test the current data within the table
-             * against new constraints generated by ALTER TABLE commands.
-             */
-            ATRewriteTable(tab, OIDNewHeap, lockmode);
+                /*
+                 * Copy the heap data into the new table with the desired
+                 * modifications, and test the current data within the table
+                 * against new constraints generated by ALTER TABLE commands.
+                 */
+                ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-            /*
-             * Swap the physical files of the old and new heaps, then rebuild
-             * indexes and discard the old heap.  We can use RecentXmin for
-             * the table's new relfrozenxid because we rewrote all the tuples
-             * in ATRewriteTable, so no older Xid remains in the table.  Also,
-             * we never try to swap toast tables by content, since we have no
-             * interest in letting this code work on system catalogs.
-             */
-            finish_heap_swap(tab->relid, OIDNewHeap,
-                             false, false, true,
-                             !OidIsValid(tab->newTableSpace),
-                             RecentXmin,
-                             ReadNextMultiXactId(),
-                             persistence);
+                /*
+                 * Swap the physical files of the old and new heaps, then
+                 * rebuild indexes and discard the old heap.  We can use
+                 * RecentXmin for the table's new relfrozenxid because we
+                 * rewrote all the tuples in ATRewriteTable, so no older Xid
+                 * remains in the table.  Also, we never try to swap toast
+                 * tables by content, since we have no interest in letting this
+                 * code work on system catalogs.
+                 */
+                finish_heap_swap(tab->relid, OIDNewHeap,
+                                 false, false, true,
+                                 !OidIsValid(tab->newTableSpace),
+                                 RecentXmin,
+                                 ReadNextMultiXactId(),
+                                 persistence);
 
-            InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+                InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+            }
         }
         else
         {
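RelationChangePersistence above handles each relation in one of two ways. Indexes whose AM is not on the positive list are rebuilt by reindex_index. Everything else is switched in place. The sketch below models only that decision. The IndexAm values and ACT_* flags are hypothetical stand-ins for pg_am OIDs and for the calls the patch makes. The order of the steps (init fork, sync, catalog update, WAL) is not represented.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical AM identifiers; the patch compares rd_rel->relam OIDs. */
typedef enum { AM_BTREE, AM_HASH, AM_GIN, AM_SPGIST, AM_BRIN, AM_GIST, AM_OTHER } IndexAm;

/* Hypothetical action flags, one per step taken in the patch. */
enum
{
    ACT_REINDEX        = 1 << 0,  /* rebuild the index with the new persistence */
    ACT_CREATE_INIT    = 1 << 1,  /* SET UNLOGGED: create the init fork */
    ACT_DROP_INIT      = 1 << 2,  /* SET LOGGED: drop the init fork */
    ACT_SYNC_FORKS     = 1 << 3,  /* SET LOGGED: immediately sync non-init forks */
    ACT_WAL_LOG_PAGES  = 1 << 4,  /* SET LOGGED while XLogIsNeeded() */
    ACT_UPDATE_CATALOG = 1 << 5   /* update pg_class.relpersistence */
};

/* Positive list: only these AMs are switched in place. */
static bool
am_supports_inplace(IndexAm am)
{
    switch (am)
    {
        case AM_BTREE:
        case AM_HASH:
        case AM_GIN:
        case AM_SPGIST:
        case AM_BRIN:
            return true;
        default:
            return false;       /* GiST and unknown AMs take the rebuild path */
    }
}

static int
plan_persistence_change(bool is_index, IndexAm am, bool to_permanent,
                        bool xlog_needed)
{
    int actions = 0;

    /* Reindexing sets the new persistence itself; nothing else to do. */
    if (is_index && !am_supports_inplace(am))
        return ACT_REINDEX;

    actions |= to_permanent ? (ACT_DROP_INIT | ACT_SYNC_FORKS) : ACT_CREATE_INIT;
    actions |= ACT_UPDATE_CATALOG;
    if (to_permanent && xlog_needed)
        actions |= ACT_WAL_LOG_PAGES;
    return actions;
}
```

For example, SET LOGGED on a heap with wal_level = replica plans ACT_DROP_INIT | ACT_SYNC_FORKS | ACT_UPDATE_CATALOG | ACT_WAL_LOG_PAGES. With wal_level = minimal, the same change drops ACT_WAL_LOG_PAGES.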
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ec0485705d..45e1a5d817 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1070,6 +1070,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
         bool        excludeFound;
         ForkNumber    relForkNum; /* Type of fork if file is a relation */
         int            relOidChars;    /* Chars in filename that are the rel oid */
+        StorageMarks mark;
 
         /* Skip special stuff */
         if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1120,7 +1121,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
         /* Exclude all forks for unlogged tables except the init fork */
         if (isDbDir &&
             parse_filename_for_nontemp_relation(de->d_name, &relOidChars,
-                                                &relForkNum))
+                                                &relForkNum, &mark))
         {
             /* Never exclude init forks */
             if (relForkNum != INIT_FORKNUM)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index b4532948d3..dab74bf99a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "executor/instrument.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
@@ -3154,6 +3155,93 @@ DropRelFileNodeBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
     }
 }
 
+/* ---------------------------------------------------------------------
+ *        SetRelationBuffersPersistence
+ *
+ *        This function changes the persistence of all buffer pages of a
+ *        relation and, when switching to PERMANENT, writes its dirty pages
+ *        out to disk (or more accurately, out to kernel disk buffers),
+ *        ensuring that the kernel has an up-to-date view of the relation.
+ *
+ *        Generally, the caller should be holding AccessExclusiveLock on the
+ *        target relation to ensure that no other backend is busy dirtying
+ *        more blocks of the relation; the effects can't be expected to last
+ *        after the lock is released.
+ *
+ *        XXX currently it sequentially searches the buffer pool, should be
+ *        changed to more clever ways of searching.  This routine is not
+ *        used in any performance-critical code paths, so it's not worth
+ *        adding additional overhead to normal paths to make it go faster;
+ *        but see also DropRelFileNodeBuffers.
+ * --------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+    int            i;
+    RelFileNodeBackend rnode = srel->smgr_rnode;
+
+    Assert (!RelFileNodeBackendIsTemp(rnode));
+
+    if (!isRedo)
+        log_smgrbufpersistence(&srel->smgr_rnode.node, permanent);
+
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+    for (i = 0; i < NBuffers; i++)
+    {
+        BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32        buf_state;
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+            continue;
+
+        ReservePrivateRefCountEntry();
+
+        buf_state = LockBufHdr(bufHdr);
+
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
+        {
+            UnlockBufHdr(bufHdr, buf_state);
+            continue;
+        }
+
+        if (permanent)
+        {
+            /* Init fork is being dropped, drop buffers for it. */
+            if (bufHdr->tag.forkNum == INIT_FORKNUM)
+            {
+                InvalidateBuffer(bufHdr);
+                continue;
+            }
+
+            buf_state |= BM_PERMANENT;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+            /* we flush this buffer when switching to PERMANENT */
+            if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+            {
+                PinBuffer_Locked(bufHdr);
+                LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+                              LW_SHARED);
+                FlushBuffer(bufHdr, srel);
+                LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+                UnpinBuffer(bufHdr, true);
+            }
+            else
+                UnlockBufHdr(bufHdr, buf_state);
+        }
+        else
+        {
+            /* init fork is always BM_PERMANENT. See BufferAlloc */
+            if (bufHdr->tag.forkNum != INIT_FORKNUM)
+                buf_state &= ~BM_PERMANENT;
+
+            UnlockBufHdr(bufHdr, buf_state);
+        }
+    }
+}
+
 /* ---------------------------------------------------------------------
  *        DropRelFileNodesAllBuffers
  *
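The buffer loop in SetRelationBuffersPersistence above comes down to a few flag transitions per buffer. The model below is simplified and single-threaded. It assumes hypothetical F_* bits and a Buf array in place of BufferDesc and NBuffers. It omits the header spinlock, pinning and content locks that the real code needs, and flush() only clears the dirty bit in place of FlushBuffer().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical flag values; bufmgr's real BM_* bits differ. */
#define F_VALID     (1u << 0)
#define F_DIRTY     (1u << 1)
#define F_PERMANENT (1u << 2)

typedef enum { FORK_MAIN, FORK_FSM, FORK_VM, FORK_INIT } Fork;

typedef struct
{
    unsigned relnode;           /* 0 means the slot is unused */
    Fork     fork;
    uint32_t state;
} Buf;

/* Stand-in for FlushBuffer(): only clears the dirty bit. */
static void
flush(Buf *b)
{
    b->state &= ~F_DIRTY;
}

/* Returns the number of buffers flushed. */
static int
set_buffers_persistence(Buf *pool, int nbuf, unsigned relnode, bool permanent)
{
    int flushed = 0;

    for (int i = 0; i < nbuf; i++)
    {
        Buf *b = &pool[i];

        if (b->relnode != relnode)
            continue;

        if (permanent)
        {
            if (b->fork == FORK_INIT)
            {
                /* init fork is being dropped: invalidate its buffers */
                b->relnode = 0;
                b->state = 0;
                continue;
            }
            b->state |= F_PERMANENT;
            if ((b->state & (F_VALID | F_DIRTY)) == (F_VALID | F_DIRTY))
            {
                flush(b);
                flushed++;
            }
        }
        else if (b->fork != FORK_INIT)
            b->state &= ~F_PERMANENT;   /* init fork buffers stay permanent */
    }
    return flushed;
}
```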
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 263057841d..8487ae1f02 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -349,8 +349,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int    fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3759,7 +3757,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
     char        parentpath[MAXPGPATH];
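The reinit.c changes that follow work in two passes. The first pass classifies each relfilenode from its init fork and its SMGR_MARK_UNCOMMITTED mark files. The second pass decides, file by file, whether to unlink it. Below is a rough model of the second-pass rule. It uses hypothetical RelState and Fork types in place of relfile_entry and ForkNumber. A NULL state means the OID was not recorded in the first pass.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { FORK_MAIN, FORK_FSM, FORK_VM, FORK_INIT } Fork;

/* Hypothetical mirror of the patch's relfile_entry flags. */
typedef struct
{
    bool has_init;      /* an init fork file exists */
    bool dirty_init;    /* uncommitted mark exists for the init fork */
    bool dirty_all;     /* uncommitted mark exists for the main fork */
} RelState;

/* Should this file (fork, is_mark) of a relation in state st be removed? */
static bool
should_remove(const RelState *st, Fork fork, bool is_mark)
{
    if (st == NULL)
        return false;               /* no init fork and no mark: not touched */
    if (st->dirty_all)
        return true;                /* remove every file of the relation */
    if (!st->has_init)
        return false;               /* clean permanent relation */
    if (st->dirty_init)
        return fork == FORK_INIT;   /* crashed SET UNLOGGED: back to LOGGED */
    /* ordinary unlogged relation: keep only the init fork itself */
    return !(fork == FORK_INIT && !is_mark);
}
```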
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 0ae3fb6902..0137902bb2 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,49 @@
 
 #include <unistd.h>
 
+#include "access/xlog.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
 #include "postmaster/startup.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-                                                  int op);
+                                                  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-                                               int op);
+                                               Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
     Oid            reloid;            /* hash key */
-} unlogged_relation_entry;
+    bool        has_init;        /* has INIT fork */
+    bool        dirty_init;        /* needs to remove INIT fork */
+    bool        dirty_all;        /* needs to remove all forks */
+} relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of mark files and "init" forks.
+ *
+ * If an SMGR_MARK_UNCOMMITTED mark file for the init fork is present, we
+ * remove the init fork along with the mark file.
+ *
+ * If an SMGR_MARK_UNCOMMITTED mark file for the main fork is present, we
+ * remove the whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found, we remove all forks of the relation
+ * except for the "init" fork itself.
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that have mark files and/or "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -72,7 +92,7 @@ ResetUnloggedRelations(int op)
     /*
      * First process unlogged files in pg_default ($PGDATA/base)
      */
-    ResetUnloggedRelationsInTablespaceDir("base", op);
+    ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
     /*
      * Cycle through directories for all non-default tablespaces.
@@ -81,13 +101,19 @@ ResetUnloggedRelations(int op)
 
     while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
     {
+        Oid tspid;
+
         if (strcmp(spc_de->d_name, ".") == 0 ||
             strcmp(spc_de->d_name, "..") == 0)
             continue;
 
         snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
                  spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-        ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+        tspid = atooid(spc_de->d_name);
+
+        Assert(tspid != 0);
+        ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
     }
 
     FreeDir(spc_dir);
@@ -103,7 +129,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+                                      Oid tspid, int op)
 {
     DIR           *ts_dir;
     struct dirent *de;
@@ -130,6 +157,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
     while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
     {
+        Oid dbid;
+
         /*
          * We're only interested in the per-database directories, which have
          * numeric names.  Note that this code will also (properly) ignore "."
@@ -148,7 +177,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
             ereport_startup_progress("resetting unlogged relations (cleanup), elapsed time: %ld.%02d s, current path: %s",
                                      dbspace_path);
 
-        ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+        dbid = atooid(de->d_name);
+        Assert(dbid != 0);
+
+        ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
     }
 
     FreeDir(ts_dir);
@@ -158,125 +190,227 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+                                   Oid tspid, Oid dbid, int op)
 {
     DIR           *dbspace_dir;
     struct dirent *de;
     char        rm_path[MAXPGPATH * 2];
+    HTAB       *hash;
+    HASHCTL        ctl;
 
     /* Caller must specify at least one operation. */
-    Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+    Assert((op & (UNLOGGED_RELATION_CLEANUP |
+                  UNLOGGED_RELATION_DROP_BUFFER |
+                  UNLOGGED_RELATION_INIT)) != 0);
 
     /*
      * Cleanup is a two-pass operation.  First, we go through and identify all
      * the files with init forks.  Then, we go through again and nuke
      * everything with the same OID except the init fork.
      */
+
+    /*
+     * It's possible that someone could create tons of unlogged relations in
+     * the same database & tablespace, so we'd better use a hash table rather
+     * than an array or linked list to keep track of which files need to be
+     * reset.  Otherwise, this cleanup operation would be O(n^2).
+     */
+    memset(&ctl, 0, sizeof(ctl));
+    ctl.keysize = sizeof(Oid);
+    ctl.entrysize = sizeof(relfile_entry);
+    hash = hash_create("relfilenode cleanup hash",
+                       32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+    /* Collect INIT fork and mark files in the directory. */
+    dbspace_dir = AllocateDir(dbspacedirname);
+    while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+    {
+        int            oidchars;
+        ForkNumber    forkNum;
+        StorageMarks mark;
+
+        /* Skip anything that doesn't look like a relation data file. */
+        if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
+                                                 &forkNum, &mark))
+            continue;
+
+        if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
+        {
+            Oid                key;
+            relfile_entry  *ent;
+            bool            found;
+
+            /*
+             * Record the relfilenode information. If it has
+             * SMGR_MARK_UNCOMMITTED mark files, the relfilenode is in a dirty
+             * state and needs cleanup.
+             */
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+            if (!found)
+            {
+                ent->has_init = false;
+                ent->dirty_init = false;
+                ent->dirty_all = false;
+            }
+
+            if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_init = true;
+            else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+                ent->dirty_all = true;
+            else
+            {
+                Assert(forkNum == INIT_FORKNUM);
+                ent->has_init = true;
+            }
+        }
+    }
+
+    /* Done with the first pass. */
+    FreeDir(dbspace_dir);
+
+    /* nothing to do if we have neither init forks nor mark files */
+    if (hash_get_num_entries(hash) < 1)
+    {
+        hash_destroy(hash);
+        return;
+    }
+
+    if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+    {
+        /*
+         * When we come here after recovery, smgr objects for these files may
+         * have been created. In that case we need to drop all buffers and then
+         * the smgr objects before initializing the unlogged relations.  This is
+         * safe as long as no other backend has accessed the relations before
+         * archive recovery started.
+         */
+        HASH_SEQ_STATUS status;
+        relfile_entry *ent;
+        SMgrRelation   *srels = palloc(sizeof(SMgrRelation) * 8);
+        int               maxrels = 8;
+        int               nrels = 0;
+        int i;
+
+        Assert(!HotStandbyActive());
+
+        hash_seq_init(&status, hash);
+        while((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+        {
+            RelFileNodeBackend rel;
+
+            /*
+             * The relation was persistent and remains persistent. Don't
+             * drop the buffers for this relation.
+             */
+            if (ent->has_init && ent->dirty_init)
+                continue;
+
+            if (maxrels <= nrels)
+            {
+                maxrels *= 2;
+                srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+            }
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = ent->reloid;
+
+            srels[nrels++] = smgropen(rel.node, InvalidBackendId);
+        }
+
+        DropRelFileNodesAllBuffers(srels, nrels);
+
+        for (i = 0 ; i < nrels ; i++)
+            smgrclose(srels[i]);
+    }
+
+    /*
+     * Now, make a second pass and remove anything that matches.
+     */
     if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
     {
-        HTAB       *hash;
-        HASHCTL        ctl;
-
-        /*
-         * It's possible that someone could create a ton of unlogged relations
-         * in the same database & tablespace, so we'd better use a hash table
-         * rather than an array or linked list to keep track of which files
-         * need to be reset.  Otherwise, this cleanup operation would be
-         * O(n^2).
-         */
-        ctl.keysize = sizeof(Oid);
-        ctl.entrysize = sizeof(unlogged_relation_entry);
-        ctl.hcxt = CurrentMemoryContext;
-        hash = hash_create("unlogged relation OIDs", 32, &ctl,
-                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-        /* Scan the directory. */
         dbspace_dir = AllocateDir(dbspacedirname);
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
+            ForkNumber        forkNum;
+            StorageMarks    mark;
+            int                oidchars;
+            Oid                key;
+            relfile_entry  *ent;
+            RelFileNodeBackend rel;
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* Also skip it unless this is the init fork. */
-            if (forkNum != INIT_FORKNUM)
-                continue;
-
-            /*
-             * Put the OID portion of the name into the hash table, if it
-             * isn't already.
-             */
-            ent.reloid = atooid(de->d_name);
-            (void) hash_search(hash, &ent, HASH_ENTER, NULL);
-        }
-
-        /* Done with the first pass. */
-        FreeDir(dbspace_dir);
-
-        /*
-         * If we didn't find any init forks, there's no point in continuing;
-         * we can bail out now.
-         */
-        if (hash_get_num_entries(hash) == 0)
-        {
-            hash_destroy(hash);
-            return;
-        }
-
-        /*
-         * Now, make a second pass and remove anything that matches.
-         */
-        dbspace_dir = AllocateDir(dbspacedirname);
-        while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-        {
-            ForkNumber    forkNum;
-            int            oidchars;
-            unlogged_relation_entry ent;
-
-            /* Skip anything that doesn't look like a relation data file. */
-            if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
-                continue;
-
-            /* We never remove the init fork. */
-            if (forkNum == INIT_FORKNUM)
+                                                     &forkNum, &mark))
                 continue;
 
             /*
              * See whether the OID portion of the name shows up in the hash
              * table.  If so, nuke it!
              */
-            ent.reloid = atooid(de->d_name);
-            if (hash_search(hash, &ent, HASH_FIND, NULL))
+            key = atooid(de->d_name);
+            ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+            if (!ent)
+                continue;
+
+            if (!ent->dirty_all)
             {
-                snprintf(rm_path, sizeof(rm_path), "%s/%s",
-                         dbspacedirname, de->d_name);
-                if (unlink(rm_path) < 0)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not remove file \"%s\": %m",
-                                    rm_path)));
+                /* clean permanent relations don't need cleanup */
+                if (!ent->has_init)
+                    continue;
+
+                if (ent->dirty_init)
+                {
+                    /*
+                     * The crashed transaction did SET UNLOGGED, so restore this
+                     * relation to a LOGGED relation by removing its init fork.
+                     */
+                    if (forkNum != INIT_FORKNUM)
+                        continue;
+                }
                 else
-                    elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+                {
+                    /*
+                     * we don't remove the INIT fork of a non-dirty
+                     * relfilenode
+                     */
+                    if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+                        continue;
+                }
             }
+
+            /* so, nuke it! */
+            snprintf(rm_path, sizeof(rm_path), "%s/%s",
+                     dbspacedirname, de->d_name);
+            if (unlink(rm_path) < 0)
+                ereport(ERROR,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m",
+                                rm_path)));
+
+            rel.backend = InvalidBackendId;
+            rel.node.spcNode = tspid;
+            rel.node.dbNode = dbid;
+            rel.node.relNode = atooid(de->d_name);
+
+            ForgetRelationForkSyncRequests(rel, forkNum);
         }
 
         /* Cleanup is complete. */
         FreeDir(dbspace_dir);
-        hash_destroy(hash);
     }
 
+    hash_destroy(hash);
+    hash = NULL;
+
     /*
      * Initialization happens after cleanup is complete: we copy each init
-     * fork file to the corresponding main fork file.  Note that if we are
-     * asked to do both cleanup and init, we may never get here: if the
-     * cleanup code determines that there are no init forks in this dbspace,
-     * it will return before we get to this point.
+     * fork file to the corresponding main fork file.
      */
     if ((op & UNLOGGED_RELATION_INIT) != 0)
     {
@@ -285,6 +419,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        srcpath[MAXPGPATH * 2];
@@ -292,9 +427,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -328,15 +465,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
         while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
         {
             ForkNumber    forkNum;
+            StorageMarks mark;
             int            oidchars;
             char        oidbuf[OIDCHARS + 1];
             char        mainpath[MAXPGPATH];
 
             /* Skip anything that doesn't look like a relation data file. */
             if (!parse_filename_for_nontemp_relation(de->d_name, &oidchars,
-                                                     &forkNum))
+                                                     &forkNum, &mark))
                 continue;
 
+            Assert(mark == SMGR_MARK_NONE);
+
             /* Also skip it unless this is the init fork. */
             if (forkNum != INIT_FORKNUM)
                 continue;
@@ -379,7 +519,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *oidchars,
-                                    ForkNumber *fork)
+                                    ForkNumber *fork, StorageMarks *mark)
 {
     int            pos;
 
@@ -410,11 +550,19 @@ parse_filename_for_nontemp_relation(const char *name, int *oidchars,
 
         for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
             ;
-        if (segchar <= 1)
-            return false;
-        pos += segchar;
+        if (segchar > 1)
+            pos += segchar;
     }
 
+    /* mark file? */
+    if (name[pos] == '.' && name[pos + 1] != 0)
+    {
+        *mark = name[pos + 1];
+        pos += 2;
+    }
+    else
+        *mark = SMGR_MARK_NONE;
+
     /* Now we should be at the end. */
     if (name[pos] != '\0')
         return false;
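The reinit.c hunks above extend parse_filename_for_nontemp_relation() so that a trailing ".<mark>" is accepted after the optional segment number. Below is a minimal standalone sketch of that filename grammar. It assumes simplified stand-ins for ForkNumber and StorageMarks: DemoFork, DEMO_MARK_NONE and demo_parse_relfilename() are hypothetical names, not PostgreSQL APIs. The fork names are hard-coded here rather than looked up through forkname_chars().

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-ins for ForkNumber and StorageMarks. */
typedef enum { DEMO_MAIN = 0, DEMO_FSM, DEMO_VM, DEMO_INIT } DemoFork;
#define DEMO_MARK_NONE '\0'

static const char *const demo_fork_names[] = {"main", "fsm", "vm", "init"};

/*
 * Accepts "<oid>[_<fork>][.<segno>][.<mark>]" and rejects anything else.
 * As in the patch, '.' followed by a non-digit is read as a mark character.
 */
static bool
demo_parse_relfilename(const char *name, int *oidchars, DemoFork *fork,
                       char *mark)
{
    int pos = 0;

    /* OID portion: one or more digits */
    while (isdigit((unsigned char) name[pos]))
        pos++;
    if (pos == 0)
        return false;
    *oidchars = pos;

    /* optional "_<fork>" suffix; no suffix means the main fork */
    *fork = DEMO_MAIN;
    if (name[pos] == '_')
    {
        bool found = false;

        for (int f = DEMO_FSM; f <= DEMO_INIT; f++)
        {
            size_t len = strlen(demo_fork_names[f]);

            if (strncmp(name + pos + 1, demo_fork_names[f], len) == 0)
            {
                *fork = (DemoFork) f;
                pos += 1 + (int) len;
                found = true;
                break;
            }
        }
        if (!found)
            return false;
    }

    /* optional segment number: '.' followed by at least one digit */
    if (name[pos] == '.')
    {
        int segchar = 1;

        while (isdigit((unsigned char) name[pos + segchar]))
            segchar++;
        if (segchar > 1)
            pos += segchar;
    }

    /* optional mark: '.' followed by one more character */
    if (name[pos] == '.' && name[pos + 1] != '\0')
    {
        *mark = name[pos + 1];
        pos += 2;
    }
    else
        *mark = DEMO_MARK_NONE;

    /* anything left over means this is not a relation file */
    return name[pos] == '\0';
}
```

Unlike the code it replaces, a '.' that is not followed by digits no longer makes the parse fail immediately. Instead it falls through to the mark check, and the final end-of-string test still rejects anything unexpected.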
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index b4bca7eed6..1f3aac5bcc 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -139,7 +139,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
                              BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
                               MdfdVec *seg);
-
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+                         StorageMarks mark);
 
 /*
  *    mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -169,6 +170,82 @@ mdexists(SMgrRelation reln, ForkNumber forkNum)
     return (mdopenfork(reln, forkNum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * If isRedo is true, it's okay for the file to exist already.
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    /* See mdcreate() for details. */
+    TablespaceCreateDbspace(reln->smgr_rnode.node.spcNode,
+                            reln->smgr_rnode.node.dbNode,
+                            isRedo);
+
+    fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL);
+    if (fd < 0 && (!isRedo || errno != EEXIST))
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not create mark file \"%s\": %m", path)));
+
+    if (fd >= 0)            /* -1 if the mark already existed in redo */
+    {
+        pg_fsync(fd);
+        close(fd);
+    }
+
+    /* fsync the parent directory so that the file creation is durable */
+    fsync_parent_path(path, ERROR);
+
+    pfree(path);
+}
+
+
+/*
+ *  mdunlinkmark()  -- Delete the mark file
+ *
+ * If isRedo is true, it's okay if the file does not exist.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+             bool isRedo)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+
+    if (!isRedo || mdmarkexists(reln, forkNum, mark))
+        durable_unlink(path, ERROR);
+
+    pfree(path);
+}
+
+/*
+ *  mdmarkexists()  -- Check whether the mark file exists.
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+    char   *path = markpath(reln->smgr_rnode, forkNum, mark);
+    int        fd;
+
+    fd = BasicOpenFile(path, O_RDONLY);
+    if (fd < 0 && errno != ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not access mark file \"%s\": %m", path)));
+    pfree(path);
+
+    if (fd < 0)
+        return false;
+
+    close(fd);
+    return true;
+}
+
 /*
  *    mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1025,6 +1102,15 @@ register_forget_request(RelFileNodeBackend rnode, ForkNumber forknum,
     RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileNodeBackend rnode, ForkNumber forknum)
+{
+    register_forget_request(rnode, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1378,12 +1464,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
     char       *p;
 
     /* Compute the path. */
-    p = relpathperm(ftag->rnode, MAIN_FORKNUM);
+    p = GetRelationPath(ftag->rnode.dbNode, ftag->rnode.spcNode,
+                        ftag->rnode.relNode, InvalidBackendId, MAIN_FORKNUM,
+                        mark);
     strlcpy(path, p, MAXPGPATH);
     pfree(p);
 
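The fix to mdmarkexists() in this version is that the probe descriptor has to be closed. On Linux a leaked descriptor goes unnoticed, but on Windows it makes the later delete fail. The following POSIX-only sketch shows the same create/exists/unlink life cycle for a mark file, with every descriptor it opens also closed. The demo_* names are hypothetical. Plain open()/fsync()/close() stand in for BasicOpenFile()/pg_fsync(), and the sketch omits the parent-directory fsync that mdcreatemark() performs through fsync_parent_path().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Returns 1 if the mark exists, 0 if it does not, -1 on other errors. */
static int
demo_mark_exists(const char *path)
{
    int fd = open(path, O_RDONLY);

    if (fd < 0)
        return (errno == ENOENT) ? 0 : -1;
    close(fd);              /* the probe descriptor must not leak */
    return 1;
}

/* Create the mark; during redo an existing mark is not an error. */
static int
demo_create_mark(const char *path, bool is_redo)
{
    int fd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0600);

    if (fd < 0)
        return (is_redo && errno == EEXIST) ? 0 : -1;

    /* only fsync/close a descriptor we actually opened */
    if (fsync(fd) != 0)
    {
        int save_errno = errno;

        close(fd);
        errno = save_errno;
        return -1;
    }
    return close(fd);
}

/* Remove the mark; during redo a missing mark is not an error. */
static int
demo_unlink_mark(const char *path, bool is_redo)
{
    if (is_redo)
    {
        int st = demo_mark_exists(path);

        if (st <= 0)
            return st;      /* 0: nothing to do, -1: probe failed */
    }
    return unlink(path);
}
```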
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 0fcef4994b..110e64b0b2 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -62,6 +62,10 @@ typedef struct f_smgr
     void        (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
                                   BlockNumber nblocks);
     void        (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+    void        (*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
+    void        (*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+                                    StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -82,6 +86,8 @@ static const f_smgr smgrsw[] = {
         .smgr_nblocks = mdnblocks,
         .smgr_truncate = mdtruncate,
         .smgr_immedsync = mdimmedsync,
+        .smgr_createmark = mdcreatemark,
+        .smgr_unlinkmark = mdunlinkmark,
     }
 };
 
@@ -335,6 +341,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
     smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *    smgrcreatemark() -- Create a mark file
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *    smgrunlinkmark() -- Delete a mark file
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+               bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  *    smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -662,6 +688,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
     smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+    smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rnode, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
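smgrcreatemark() and smgrunlinkmark() follow the existing smgr pattern. The public function indexes smgrsw[] by reln->smgr_which and calls through a function pointer, so only md.c deals with actual files. Below is a reduced, self-contained sketch of that dispatch. DemoReln, demo_smgr and the demo_md_* callbacks are hypothetical, and the callbacks only count calls instead of touching the filesystem.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct DemoReln
{
    int which;              /* index into demo_smgrsw[], like smgr_which */
    int marks_created;
    int marks_unlinked;
} DemoReln;

typedef struct demo_smgr
{
    void (*createmark) (DemoReln *reln, char mark, bool is_redo);
    void (*unlinkmark) (DemoReln *reln, char mark, bool is_redo);
} demo_smgr;

/* "md" implementation: just record that it was called */
static void
demo_md_createmark(DemoReln *reln, char mark, bool is_redo)
{
    (void) mark;
    (void) is_redo;
    reln->marks_created++;
}

static void
demo_md_unlinkmark(DemoReln *reln, char mark, bool is_redo)
{
    (void) mark;
    (void) is_redo;
    reln->marks_unlinked++;
}

/* one entry per storage manager, with designated initializers as in smgrsw[] */
static const demo_smgr demo_smgrsw[] = {
    {
        .createmark = demo_md_createmark,
        .unlinkmark = demo_md_unlinkmark,
    }
};

static void
demo_smgrcreatemark(DemoReln *reln, char mark, bool is_redo)
{
    demo_smgrsw[reln->which].createmark(reln, mark, is_redo);
}

static void
demo_smgrunlinkmark(DemoReln *reln, char mark, bool is_redo)
{
    demo_smgrsw[reln->which].unlinkmark(reln, mark, is_redo);
}
```

Adding a member to f_smgr means every entry of smgrsw[] must supply it. In the hunk above, md is the only entry.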
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index d4083e8a56..9563940d45 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -89,7 +89,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
     int            (*sync_syncfiletag) (const FileTag *ftag, char *path);
-    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+    int            (*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+                                       StorageMarks mark);
     bool        (*sync_filetagmatches) (const FileTag *ftag,
                                         const FileTag *candidate);
 } SyncOps;
@@ -222,7 +223,8 @@ SyncPostCheckpoint(void)
 
         /* Unlink the file */
         if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-                                                          path) < 0)
+                                                          path,
+                                                          SMGR_MARK_NONE) < 0)
         {
             /*
              * There's a race condition, when the database is dropped at the
@@ -236,6 +238,20 @@ SyncPostCheckpoint(void)
                         (errcode_for_file_access(),
                          errmsg("could not remove file \"%s\": %m", path)));
         }
+        else if (syncsw[entry->tag.handler].sync_unlinkfiletag(
+                     &entry->tag, path,
+                     SMGR_MARK_UNCOMMITTED) < 0)
+        {
+            /*
+             * There may also be an SMGR_MARK_UNCOMMITTED mark file.  Remove it
+             * once the fork file has been removed; it's fine if the mark file
+             * does not exist.
+             */
+            if (errno != ENOENT)
+                ereport(WARNING,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove file \"%s\": %m", path)));
+        }
 
         /* Mark the list entry as canceled, just in case */
         entry->canceled = true;
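In SyncPostCheckpoint() the patch first removes the relation file. Only if that succeeds does it try to remove the corresponding SMGR_MARK_UNCOMMITTED mark file, treating a missing mark as normal and any other failure as a WARNING. The POSIX sketch below shows that ordering. demo_post_checkpoint_unlink() is a hypothetical helper, and it builds the mark path by appending ".u", the same way GetRelationPath() does for the main fork.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define DEMO_MARK_UNCOMMITTED 'u'

/*
 * Returns 0 on success, -1 if the relation file could not be removed
 * (errno set; the caller would report it), or 1 if the relation file was
 * removed but an existing mark file could not be (the caller would only warn).
 */
static int
demo_post_checkpoint_unlink(const char *relpath)
{
    char markpath[1024];
    int  n;

    if (unlink(relpath) < 0)
        return -1;

    /* the mark file, if any, is "<relpath>.<mark>" */
    n = snprintf(markpath, sizeof(markpath), "%s.%c", relpath,
                 DEMO_MARK_UNCOMMITTED);
    if (n < 0 || (size_t) n >= sizeof(markpath))
        return 1;

    /* a missing mark file is the normal case */
    if (unlink(markpath) < 0 && errno != ENOENT)
        return 1;
    return 0;
}
```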
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 436df54120..dbc0da5da5 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -407,6 +407,30 @@ extractPageInfo(XLogReaderState *record)
          * source system.
          */
     }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
+    else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+    {
+        /*
+         * We can safely ignore these. When we compare the sizes later on,
+         * we'll notice that they differ, and copy the missing tail from
+         * source system.
+         */
+    }
     else if (rmid == RM_XACT_ID &&
              ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
               (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 1f5c426ec0..4945b111cc 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbNode, Oid spcNode)
  */
 char *
 GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                int backendId, ForkNumber forkNumber)
+                int backendId, ForkNumber forkNumber, char mark)
 {
     char       *path;
+    char        markstr[4];
+
+    if (mark == 0)
+        markstr[0] = 0;
+    else
+        snprintf(markstr, sizeof(markstr), ".%c", mark);
 
     if (spcNode == GLOBALTABLESPACE_OID)
     {
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         Assert(dbNode == 0);
         Assert(backendId == InvalidBackendId);
         if (forkNumber != MAIN_FORKNUM)
-            path = psprintf("global/%u_%s",
-                            relNode, forkNames[forkNumber]);
+            path = psprintf("global/%u_%s%s",
+                            relNode, forkNames[forkNumber], markstr);
         else
-            path = psprintf("global/%u", relNode);
+            path = psprintf("global/%u%s", relNode, markstr);
     }
     else if (spcNode == DEFAULTTABLESPACE_OID)
     {
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/%u_%s",
+                path = psprintf("base/%u/%u_%s%s",
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/%u",
-                                dbNode, relNode);
+                path = psprintf("base/%u/%u%s",
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("base/%u/t%d_%u_%s",
+                path = psprintf("base/%u/t%d_%u_%s%s",
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("base/%u/t%d_%u",
-                                dbNode, backendId, relNode);
+                path = psprintf("base/%u/t%d_%u%s",
+                                dbNode, backendId, relNode, markstr);
         }
     }
     else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
         if (backendId == InvalidBackendId)
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, relNode);
+                                dbNode, relNode, markstr);
         }
         else
         {
             if (forkNumber != MAIN_FORKNUM)
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
                                 dbNode, backendId, relNode,
-                                forkNames[forkNumber]);
+                                forkNames[forkNumber], markstr);
             else
-                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+                path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
                                 spcNode, TABLESPACE_VERSION_DIRECTORY,
-                                dbNode, backendId, relNode);
+                                dbNode, backendId, relNode, markstr);
         }
     }
+
     return path;
 }
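GetRelationPath() now takes a mark argument and appends ".<mark>" after the fork suffix. relpathbackend() passes 0, so existing callers get unchanged paths. The reduced sketch below covers only the permanent, default-tablespace case ("base/<db>/<rel>[_<fork>][.<mark>]"). demo_relation_path() is hypothetical, and it writes into a caller-supplied buffer instead of returning a palloc'd string.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * forkname == NULL means the main fork, which has no "_<fork>" suffix.
 * mark == 0 means no mark.  Returns snprintf()'s result.
 */
static int
demo_relation_path(char *buf, size_t bufsize, unsigned int db,
                   unsigned int rel, const char *forkname, char mark)
{
    char markstr[4];

    if (mark == 0)
        markstr[0] = '\0';
    else
        snprintf(markstr, sizeof(markstr), ".%c", mark);

    if (forkname == NULL)
        return snprintf(buf, bufsize, "base/%u/%u%s", db, rel, markstr);
    return snprintf(buf, bufsize, "base/%u/%u_%s%s", db, rel, forkname,
                    markstr);
}
```

Because a mark is a single non-digit character after a '.', the result stays distinguishable from a segment suffix such as ".1".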
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 0ab32b44e9..584ebac391 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,6 +23,8 @@
 extern int    wal_skip_threshold;
 
 extern SMgrRelation RelationCreateStorage(RelFileNode rnode, char relpersistence);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
@@ -41,6 +43,7 @@ extern void RestorePendingSyncs(char *startAddress);
 extern void smgrDoPendingDeletes(bool isCommit);
 extern void smgrDoPendingSyncs(bool isCommit, bool isParallelWorker);
 extern int    smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void smgrDoPendingCleanups(bool isCommit);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index f0814f1458..12346ed7f6 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence changes
+ * here, but logging of deletion actions is handled mainly by xact.c, because
+ * in most cases it is part of transaction commit.  However, init forks may
+ * also be deleted outside of transaction control.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE    0x10
 #define XLOG_SMGR_TRUNCATE    0x20
+#define XLOG_SMGR_UNLINK    0x30
+#define XLOG_SMGR_MARK        0x40
+#define XLOG_SMGR_BUFPERSISTENCE    0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,32 @@ typedef struct xl_smgr_create
     ForkNumber    forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+    RelFileNode rnode;
+    ForkNumber    forkNum;
+} xl_smgr_unlink;
+
+typedef enum smgr_mark_action
+{
+    XLOG_SMGR_MARK_CREATE = 'c',
+    XLOG_SMGR_MARK_UNLINK = 'u'
+} smgr_mark_action;
+
+typedef struct xl_smgr_mark
+{
+    RelFileNode     rnode;
+    ForkNumber        forkNum;
+    StorageMarks    mark;
+    smgr_mark_action action;
+} xl_smgr_mark;
+
+typedef struct xl_smgr_bufpersistence
+{
+    RelFileNode rnode;
+    bool        persistence;
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP        0x0001
 #define SMGR_TRUNCATE_VM        0x0002
@@ -51,6 +83,12 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileNode *rnode, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileNode *rnode, ForkNumber forkNum,
+                               StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileNode *rnode, bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index a44be11ca0..106a5cf508 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -67,7 +67,7 @@ extern int    forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbNode, Oid spcNode);
 
 extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
-                             int backendId, ForkNumber forkNumber);
+                             int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -77,7 +77,7 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 /* First argument is a RelFileNode */
 #define relpathbackend(rnode, backend, forknum) \
     GetRelationPath((rnode).dbNode, (rnode).spcNode, (rnode).relNode, \
-                    backend, forknum)
+                    backend, forknum, 0)
 
 /* First argument is a RelFileNode */
 #define relpathperm(rnode, forknum) \
@@ -87,4 +87,9 @@ extern char *GetRelationPath(Oid dbNode, Oid spcNode, Oid relNode,
 #define relpath(rnode, forknum) \
     relpathbackend((rnode).node, (rnode).backend, forknum)
 
+/* First argument is a RelFileNodeBackend */
+#define markpath(rnode, forknum, mark)                                \
+    GetRelationPath((rnode).node.dbNode, (rnode).node.spcNode, \
+                    (rnode).node.relNode, \
+                    (rnode).backend, forknum, mark)
 #endif                            /* RELPATH_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index cfce23ecbc..f5a7df87a4 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -206,6 +206,8 @@ extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels)
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(struct SMgrRelationData *smgr_reln, ForkNumber *forkNum,
                                    int nforks, BlockNumber *firstDelBlock);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+                                          bool permanent, bool isRedo);
 extern void DropRelFileNodesAllBuffers(struct SMgrRelationData **smgr_reln, int nnodes);
 extern void DropDatabaseBuffers(Oid dbid);
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 34602ae006..2dc0357ad5 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -185,6 +185,7 @@ extern ssize_t pg_pwritev_with_retry(int fd,
 extern int    pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int    fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int    fsync_parent_path(const char *fname, int elevel);
 extern int    durable_rename(const char *oldfile, const char *newfile, int loglevel);
 extern int    durable_unlink(const char *fname, int loglevel);
 extern int    durable_rename_excl(const char *oldfile, const char *newfile, int loglevel);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 752b440864..99620816b5 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -23,6 +23,10 @@
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                         StorageMarks mark, bool isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo);
@@ -41,12 +45,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
                        BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileNodeBackend rnode,
+                                           ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int    mdsyncfiletag(const FileTag *ftag, char *path);
-extern int    mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int    mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif                            /* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index fad1e5c473..e1f97e9b89 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,13 +16,15 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
-extern bool parse_filename_for_nontemp_relation(const char *name,
-                                                int *oidchars, ForkNumber *fork);
+extern bool parse_filename_for_nontemp_relation(const char *name, int *oidchars,
+                                                ForkNumber *fork,
+                                                StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP        0x0001
-#define UNLOGGED_RELATION_INIT            0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER    0x0002
+#define UNLOGGED_RELATION_INIT            0x0004
 
 #endif                            /* REINIT_H */
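The cleanup pass in ResetUnloggedRelationsInDbspaceDir() now decides, file by file, whether to unlink. The decision is based on per-relfilenode flags (dirty_all, has_init, dirty_init) that are collected in a part of the patch not shown here. That branch structure can be transcribed into a pure function for review or unit testing. DemoEntry and demo_should_remove() are hypothetical names, and the flags are taken as given rather than recomputed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of the hash entry fields consulted by the cleanup pass */
typedef struct DemoEntry
{
    bool dirty_all;
    bool has_init;
    bool dirty_init;
} DemoEntry;

#define DEMO_MARK_NONE '\0'

/* Returns true if the cleanup pass would unlink this file. */
static bool
demo_should_remove(const DemoEntry *ent, bool is_init_fork, char mark)
{
    if (ent == NULL)
        return false;           /* OID not in the hash: leave it alone */
    if (ent->dirty_all)
        return true;            /* everything for this relfilenode goes */
    if (!ent->has_init)
        return false;           /* clean permanent relation */
    if (ent->dirty_init)
        return is_init_fork;    /* crashed SET UNLOGGED: drop only init fork */
    /* ordinary unlogged relation: keep only the unmarked init fork */
    return !(is_init_fork && mark == DEMO_MARK_NONE);
}
```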
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index a6fbf7b6a6..201ecace8a 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilenode.h"
 
+/*
+ * A storage mark is a file whose existence indicates something about another
+ * file.  Its name is "<filename>.<mark>", where <mark> is one of the values
+ * of StorageMarks.  Since ".<digit>" denotes a segment file, digits must not
+ * be used as mark characters.
+ */
+typedef enum StorageMarks
+{
+    SMGR_MARK_NONE = 0,
+    SMGR_MARK_UNCOMMITTED = 'u'    /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -85,7 +97,12 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln);
 extern void smgrclose(SMgrRelation reln);
 extern void smgrcloseall(void);
 extern void smgrclosenode(RelFileNodeBackend rnode);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+                           StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
diff --git a/src/test/recovery/t/027_persistence_change.pl b/src/test/recovery/t/027_persistence_change.pl
new file mode 100644
index 0000000000..261c4cf943
--- /dev/null
+++ b/src/test/recovery/t/027_persistence_change.pl
@@ -0,0 +1,263 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test relation persistence change
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Test::More tests => 30;
+use IPC::Run qw(pump finish timer);
+use Config;
+
+my $data_unit = 2000;
+
+# Initialize primary node.
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+# we don't want checkpointing
+$node->append_conf('postgresql.conf', qq(
+checkpoint_timeout = '24h'
+));
+$node->start;
+create($node);
+
+my $relfilenodes1 = relfilenodes();
+
+# correctly recover empty tables
+$node->stop('immediate');
+$node->start;
+insert($node, 0, $data_unit, 0);
+
+# data persists after a crash
+$node->stop('immediate');
+$node->start;
+checkdataloss($data_unit, 'crash logged 1');
+
+set_unlogged($node);
+# SET UNLOGGED shouldn't change relfilenode
+my $relfilenodes2 = relfilenodes();
+checkrelfilenodes($relfilenodes1, $relfilenodes2, 'logged->unlogged');
+
+# data cleanly vanishes after a crash
+$node->stop('immediate');
+$node->start;
+checkdataloss(0, 'crash unlogged');
+
+insert($node, 0, $data_unit, 0);
+set_logged($node);
+
+$node->stop('immediate');
+$node->start;
+# SET LOGGED shouldn't change relfilenode and data should survive the crash
+my $relfilenodes3 = relfilenodes();
+checkrelfilenodes($relfilenodes2, $relfilenodes3, 'unlogged->logged');
+checkdataloss($data_unit, 'crash logged 2');
+
+# unlogged insert -> graceful stop
+set_unlogged($node);
+insert($node, $data_unit, $data_unit, 0);
+$node->stop;
+$node->start;
+checkdataloss($data_unit * 2, 'unlogged graceful restart');
+
+# crash during transaction
+set_logged($node);
+$node->stop('immediate');
+$node->start;
+insert($node, $data_unit * 2, $data_unit, 0);
+
+my $h;
+
+# insert(,,,1) requires IO::Pty. If the module is not available, skip the
+# interactive test, but still do the insert (committed) so that the table
+# is in a comparable state for the later tests.
+eval { require IO::Pty; };
+if ($@)
+{
+    insert($node, $data_unit * 3, $data_unit, 0);
+    ok (1, 'SKIPPED: IO::Pty is needed');
+    ok (1, 'SKIPPED: IO::Pty is needed');
+}
+else
+{
+    $h = insert($node, $data_unit * 3, $data_unit, 1); ## this is aborted
+}
+
+$node->stop('immediate');
+
+# Finishing $h would stall here, so just discard it.
+$h = undef;
+
+# check if indexes are working
+$node->start;
+# drop first half of data to reduce run time
+$node->safe_psql('postgres', 'DELETE FROM t WHERE bt < ' . $data_unit * 2);
+check($node, $data_unit * 2, $data_unit * 3 - 1, 'final check');
+
+sub create
+{
+    my ($node) = @_;
+
+    $node->psql('postgres', qq(
+            CREATE TABLE t (bt int, gin int[], gist point, hash int,
+                brin int, spgist point);
+            CREATE INDEX i_bt ON t USING btree (bt);
+            CREATE INDEX i_gin ON t USING gin (gin);
+            CREATE INDEX i_gist ON t USING gist (gist);
+            CREATE INDEX i_hash ON t USING hash (hash);
+            CREATE INDEX i_brin ON t USING brin (brin);
+            CREATE INDEX i_spgist ON t USING spgist (spgist);));
+}
+
+
+sub insert
+{
+    my ($node, $st, $num, $interactive) = @_;
+    my $ed = $st + $num - 1;
+    my $query = qq(BEGIN;
+INSERT INTO t
+ (SELECT i, ARRAY[i, i * 2], point(i, i * 2), i, i, point(i, i)
+  FROM generate_series($st, $ed) i);
+);
+
+    if ($interactive)
+    {
+        my $in  = '';
+        my $out = '';
+        my $timer = timer(10);
+
+        my $h = $node->interactive_psql('postgres', \$in, \$out, $timer);
+        like($out, qr/psql/, "print startup banner");
+
+        $in .= "$query\n";
+        pump $h until ($out =~ /[\n\r]+INSERT 0 $num[\n\r]+/ ||
+                       $timer->is_expired);
+        ok(($out =~ /[\n\r]+INSERT 0 $num[\n\r]+/), "inserted-$st-$num");
+        # the transaction is intentionally left open
+        return $h;
+    }
+    else
+    {
+        $node->psql('postgres', $query . "COMMIT;");
+        return undef;
+    }
+}
+
+sub check
+{
+    my ($node, $st, $ed, $head) = @_;
+    my $num_data = $ed - $st + 1;
+
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO true;
+            SET enable_indexscan TO false;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE bt = i)),
+       $num_data, "$head: heap is not broken");
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO false;
+            SET enable_indexscan TO true;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE bt = i)),
+       $num_data, "$head: btree is not broken");
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO false;
+            SET enable_indexscan TO true;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE gin = ARRAY[i, i * 2];)),
+       $num_data, "$head: gin is not broken");
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO false;
+            SET enable_indexscan TO true;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE gist <@ box(point(i-0.5, i*2-0.5),point(i+0.5, i*2+0.5));)),
+       $num_data, "$head: gist is not broken");
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO false;
+            SET enable_indexscan TO true;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE hash = i;)),
+       $num_data, "$head: hash is not broken");
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO false;
+            SET enable_indexscan TO true;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE brin = i;)),
+       $num_data, "$head: brin is not broken");
+    is($node->safe_psql('postgres', qq(
+            SET enable_seqscan TO false;
+            SET enable_indexscan TO true;
+            SELECT COUNT(*) FROM t, generate_series($st, $ed) i
+            WHERE spgist <@ box(point(i-0.5,i-0.5),point(i+0.5,i+0.5));)),
+       $num_data, "$head: spgist is not broken");
+}
+
+sub set_unlogged
+{
+    my ($node) = @_;
+
+    $node->psql('postgres', qq(
+            ALTER TABLE t SET UNLOGGED;
+));
+}
+
+sub set_logged
+{
+    my ($node) = @_;
+
+    $node->psql('postgres', qq(
+            ALTER TABLE t SET LOGGED;
+));
+}
+
+sub relfilenodes
+{
+    my $result = $node->safe_psql('postgres', qq{
+        SELECT relname, relfilenode FROM pg_class
+        WHERE relname
+        IN ('t', 'i_bt','i_gin','i_gist','i_hash','i_brin','i_spgist');});
+
+    my %relfilenodes;
+
+    foreach my $l (split(/\n/, $result))
+    {
+        die "unexpected format: $l" if ($l !~ /^([^|]+)\|([0-9]+)$/);
+        $relfilenodes{$1} = $2;
+    }
+
+    # the number must correspond to the IN list above
+    is(scalar(keys %relfilenodes), 7, "number of relations is correct");
+
+    return \%relfilenodes;
+}
+
+sub checkrelfilenodes
+{
+    my ($rnodes1, $rnodes2, $s) = @_;
+
+    foreach my $n (keys %{$rnodes1})
+    {
+        if ($n eq 'i_gist')
+        {
+            # persistence of GiST index is not changed in-place
+            isnt($rnodes1->{$n}, $rnodes2->{$n},
+                 "$s: relfilenode is changed: $n");
+        }
+        else
+        {
+            # otherwise all relations are processed in-place
+            is($rnodes1->{$n}, $rnodes2->{$n},
+                 "$s: relfilenode is not changed: $n");
+        }
+    }
+}
+
+sub checkdataloss
+{
+    my ($expected, $s) = @_;
+
+    is($node->safe_psql('postgres', "SELECT count(*) FROM t;"), $expected,
+       "$s: data in table t is in the expected state");
+}
-- 
2.27.0

From edb09262d0793df84dfcb9138bad0309f84cfe87 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Wed, 11 Nov 2020 23:21:09 +0900
Subject: [PATCH v16 2/2] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To make ALTER TABLE SET LOGGED/UNLOGGED easier to apply in bulk, this
command changes the persistence of all tables in the specified tablespace.
---
 doc/src/sgml/ref/alter_table.sgml        |  15 +++
 src/backend/commands/tablecmds.c         | 140 +++++++++++++++++++++++
 src/backend/nodes/copyfuncs.c            |  16 +++
 src/backend/nodes/equalfuncs.c           |  15 +++
 src/backend/parser/gram.y                |  42 +++++++
 src/backend/tcop/utility.c               |  11 ++
 src/include/commands/tablecmds.h         |   2 +
 src/include/nodes/nodes.h                |   1 +
 src/include/nodes/parsenodes.h           |  10 ++
 src/test/regress/expected/tablespace.out |  76 ++++++++++++
 src/test/regress/sql/tablespace.sql      |  41 +++++++
 11 files changed, 369 insertions(+)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index a76e2e7322..6f108980af 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -33,6 +33,8 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
 ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
     SET TABLESPACE <replaceable class="parameter">new_tablespace</replaceable> [ NOWAIT ]
+ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
+    SET { LOGGED | UNLOGGED } [ NOWAIT ]
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable class="parameter">partition_bound_spec</replaceable> | DEFAULT }
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
@@ -753,6 +755,19 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       (see <xref linkend="sql-createtable-unlogged"/>).  It cannot be applied
       to a temporary table.
      </para>
+
+     <para>
+      All tables in the current database in a tablespace can be changed by using
+      the <literal>ALL IN TABLESPACE</literal> form, which will lock all tables
+      to be changed first and then change each one.  This form also supports
+      <literal>OWNED BY</literal>, which will only change tables owned by the
+      roles specified.  If the <literal>NOWAIT</literal> option is specified
+      then the command will fail if it is unable to acquire all of the locks
+      required immediately.  The <literal>information_schema</literal>
+      relations are not considered part of the system catalogs and will be
+      changed.  See also
+      <link linkend="sql-createtablespace"><command>CREATE TABLESPACE</command></link>.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 51fcf9ca5f..524c9d5c1b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14770,6 +14770,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
     return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change the persistence of all tables in a given tablespace
+ * in the current database.  Tables can also be chosen based on their owner,
+ * to allow users to change the persistence of only their own tables.  The
+ * main permissions handling is done by the lower-level change persistence
+ * function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt)
+{
+    List       *relations = NIL;
+    ListCell   *l;
+    ScanKeyData key[1];
+    Relation    rel;
+    TableScanDesc scan;
+    HeapTuple    tuple;
+    Oid            tablespaceoid;
+    List       *role_oids = roleSpecsToIds(stmt->roles);
+
+    /* Ensure we were not asked to change something we can't */
+    if (stmt->objtype != OBJECT_TABLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("only tables can be specified")));
+
+    /* Get the tablespace OID */
+    tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+    /*
+     * pg_class stores the database's default tablespace as InvalidOid, so
+     * look for InvalidOid if that is the tablespace we were given.
+     */
+    if (tablespaceoid == MyDatabaseTableSpace)
+        tablespaceoid = InvalidOid;
+
+    /*
+     * Walk pg_class to pick up the objects in the tablespace.  This will
+     * only find objects in our database, of course.
+     */
+    ScanKeyInit(&key[0],
+                Anum_pg_class_reltablespace,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(tablespaceoid));
+
+    rel = table_open(RelationRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 1, key);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+        Oid            relOid = relForm->oid;
+
+        /*
+         * Do not pick-up objects in pg_catalog as part of this, if an admin
+         * really wishes to do so, they can issue the individual ALTER
+         * commands directly.
+         *
+         * Also, explicitly avoid any shared tables, temp tables, or TOAST
+         * (TOAST will be changed with the main table).
+         */
+        if (IsCatalogNamespace(relForm->relnamespace) ||
+            relForm->relisshared ||
+            isAnyTempNamespace(relForm->relnamespace) ||
+            IsToastNamespace(relForm->relnamespace))
+            continue;
+
+        /* Only pick up the object type requested */
+        if (relForm->relkind != RELKIND_RELATION)
+            continue;
+
+        /* Check if we are only picking-up objects owned by certain roles */
+        if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+            continue;
+
+        /*
+         * Handle permissions-checking here since we are locking the tables
+         * and also to avoid doing a bunch of work only to fail part-way. Note
+         * that permissions will also be checked by AlterTableInternal().
+         *
+         * The caller must be considered an owner of each table whose
+         * persistence we are going to change.
+         */
+        if (!pg_class_ownercheck(relOid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+                           NameStr(relForm->relname));
+
+        if (stmt->nowait &&
+            !ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_IN_USE),
+                     errmsg("aborting because lock on relation \"%s.%s\" is not available",
+                            get_namespace_name(relForm->relnamespace),
+                            NameStr(relForm->relname))));
+        else
+            LockRelationOid(relOid, AccessExclusiveLock);
+
+        /*
+         * Add to our list of tables whose persistence we're going to
+         * change.
+         */
+        relations = lappend_oid(relations, relOid);
+    }
+
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    if (relations == NIL)
+        ereport(NOTICE,
+                (errcode(ERRCODE_NO_DATA_FOUND),
+                 errmsg("no matching relations in tablespace \"%s\" found",
+                        tablespaceoid == InvalidOid ? "(database default)" :
+                        get_tablespace_name(tablespaceoid))));
+
+    /*
+     * Everything is locked, loop through and change persistence of all of the
+     * relations.
+     */
+    foreach(l, relations)
+    {
+        List       *cmds = NIL;
+        AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+        if (stmt->logged)
+            cmd->subtype = AT_SetLogged;
+        else
+            cmd->subtype = AT_SetUnLogged;
+
+        cmds = lappend(cmds, cmd);
+
+        EventTriggerAlterTableStart((Node *) stmt);
+        /* OID is set by AlterTableInternal */
+        AlterTableInternal(lfirst_oid(l), cmds, false);
+        EventTriggerAlterTableEnd();
+    }
+}
+
 static void
 index_copy_data(Relation rel, RelFileNode newrnode)
 {
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 18e778e856..51b6ad757f 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4270,6 +4270,20 @@ _copyAlterTableMoveAllStmt(const AlterTableMoveAllStmt *from)
     return newnode;
 }
 
+static AlterTableSetLoggedAllStmt *
+_copyAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *from)
+{
+    AlterTableSetLoggedAllStmt *newnode = makeNode(AlterTableSetLoggedAllStmt);
+
+    COPY_STRING_FIELD(tablespacename);
+    COPY_SCALAR_FIELD(objtype);
+    COPY_NODE_FIELD(roles);
+    COPY_SCALAR_FIELD(logged);
+    COPY_SCALAR_FIELD(nowait);
+
+    return newnode;
+}
+
 static CreateExtensionStmt *
 _copyCreateExtensionStmt(const CreateExtensionStmt *from)
 {
@@ -5623,6 +5636,9 @@ copyObjectImpl(const void *from)
         case T_AlterTableMoveAllStmt:
             retval = _copyAlterTableMoveAllStmt(from);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _copyAlterTableSetLoggedAllStmt(from);
+            break;
         case T_CreateExtensionStmt:
             retval = _copyCreateExtensionStmt(from);
             break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..a19b7874d7 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1916,6 +1916,19 @@ _equalAlterTableMoveAllStmt(const AlterTableMoveAllStmt *a,
     return true;
 }
 
+static bool
+_equalAlterTableSetLoggedAllStmt(const AlterTableSetLoggedAllStmt *a,
+                                 const AlterTableSetLoggedAllStmt *b)
+{
+    COMPARE_STRING_FIELD(tablespacename);
+    COMPARE_SCALAR_FIELD(objtype);
+    COMPARE_NODE_FIELD(roles);
+    COMPARE_SCALAR_FIELD(nowait);
+
+    return true;
+}
+
 static bool
 _equalCreateExtensionStmt(const CreateExtensionStmt *a, const CreateExtensionStmt *b)
 {
@@ -3625,6 +3637,9 @@ equal(const void *a, const void *b)
         case T_AlterTableMoveAllStmt:
             retval = _equalAlterTableMoveAllStmt(a, b);
             break;
+        case T_AlterTableSetLoggedAllStmt:
+            retval = _equalAlterTableSetLoggedAllStmt(a, b);
+            break;
         case T_CreateExtensionStmt:
             retval = _equalCreateExtensionStmt(a, b);
             break;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6dddc07947..50bc3190de 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -1984,6 +1984,48 @@ AlterTableStmt:
                     n->nowait = $13;
                     $$ = (Node *)n;
                 }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = true;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET LOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->roles = $9;
+                    n->logged = true;
+                    n->nowait = $12;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->logged = false;
+                    n->nowait = $9;
+                    $$ = (Node *)n;
+                }
+        |    ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET UNLOGGED opt_nowait
+                {
+                    AlterTableSetLoggedAllStmt *n =
+                        makeNode(AlterTableSetLoggedAllStmt);
+                    n->tablespacename = $6;
+                    n->objtype = OBJECT_TABLE;
+                    n->roles = $9;
+                    n->logged = false;
+                    n->nowait = $12;
+                    $$ = (Node *)n;
+                }
         |    ALTER INDEX qualified_name alter_table_cmds
                 {
                     AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1fbc387d47..1483f9a475 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -162,6 +162,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
         case T_AlterTSConfigurationStmt:
         case T_AlterTSDictionaryStmt:
         case T_AlterTableMoveAllStmt:
+        case T_AlterTableSetLoggedAllStmt:
         case T_AlterTableSpaceOptionsStmt:
         case T_AlterTableStmt:
         case T_AlterTypeStmt:
@@ -1747,6 +1748,12 @@ ProcessUtilitySlow(ParseState *pstate,
                 commandCollected = true;
                 break;
 
+            case T_AlterTableSetLoggedAllStmt:
+                AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+                /* commands are stashed in AlterTableSetLoggedAll */
+                commandCollected = true;
+                break;
+
             case T_DropStmt:
                 ExecDropStmt((DropStmt *) parsetree, isTopLevel);
                 /* no commands stashed for DROP */
@@ -2669,6 +2676,10 @@ CreateCommandTag(Node *parsetree)
             tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
             break;
 
+        case T_AlterTableSetLoggedAllStmt:
+            tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+            break;
+
         case T_AlterTableStmt:
             tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
             break;
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 336549cc5f..714077ff4c 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid    AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
                                          Oid *oldschema);
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 7c657c1241..8860b2e548 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -428,6 +428,7 @@ typedef enum NodeTag
     T_AlterCollationStmt,
     T_CallStmt,
     T_AlterStatsStmt,
+    T_AlterTableSetLoggedAllStmt,
 
     /*
      * TAGS FOR PARSE TREE NODES (parsenodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 593e301f7a..01661e9622 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2350,6 +2350,16 @@ typedef struct AlterTableMoveAllStmt
     bool        nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+    NodeTag        type;
+    char       *tablespacename;
+    ObjectType    objtype;        /* Object type to change */
+    List       *roles;            /* Only change objects owned by these roles */
+    bool        logged;            /* true: SET LOGGED, false: SET UNLOGGED */
+    bool        nowait;
+} AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *        Create/Alter Extension Statements
  * ----------------------
diff --git a/src/test/regress/expected/tablespace.out b/src/test/regress/expected/tablespace.out
index 864f4b6e20..420eed0717 100644
--- a/src/test/regress/expected/tablespace.out
+++ b/src/test/regress/expected/tablespace.out
@@ -935,5 +935,81 @@ drop cascades to table testschema.asexecute
 drop cascades to table testschema.part
 drop cascades to table testschema.atable
 drop cascades to table testschema.tablespace_acl
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION :'testtablespace';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+      OWNED BY regress_tablespace_user1 SET LOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | p
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+RESET ROLE;
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | u
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | u
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+NOTICE:  drop cascades to 8 other objects
+DETAIL:  drop cascades to table testschema.lsu
+drop cascades to table testschema.usu
+drop cascades to table testschema._lsu
+drop cascades to table testschema._usu
+drop cascades to table testschema.lu1
+drop cascades to table testschema.uu1
+drop cascades to table testschema._lu1
+drop cascades to table testschema._uu1
+DROP TABLESPACE regress_tablespace;
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
diff --git a/src/test/regress/sql/tablespace.sql b/src/test/regress/sql/tablespace.sql
index 92076db9a1..0025c56401 100644
--- a/src/test/regress/sql/tablespace.sql
+++ b/src/test/regress/sql/tablespace.sql
@@ -412,5 +412,46 @@ DROP TABLESPACE regress_tblspace_renamed;
 
 DROP SCHEMA testschema CASCADE;
 
+
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION :'testtablespace';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+      OWNED BY regress_tablespace_user1 SET LOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+RESET ROLE;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+DROP TABLESPACE regress_tablespace;
+
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
-- 
2.27.0

