Обсуждение: why there is not VACUUM FULL CONCURRENTLY?

Поиск
Список
Период
Сортировка

why there is not VACUUM FULL CONCURRENTLY?

От
Pavel Stehule
Дата:
Hi

I have one question, what is a block of implementation of some variant of VACUUM FULL like REINDEX CONCURRENTLY? Why similar mechanism of REINDEX CONCURRENTLY cannot be used for VACUUM FULL?

Regards

Pavel

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Michael Paquier
Дата:
On Tue, Jan 30, 2024 at 09:01:57AM +0100, Pavel Stehule wrote:
> I have one question, what is a block of implementation of some variant of
> VACUUM FULL like REINDEX CONCURRENTLY? Why similar mechanism of REINDEX
> CONCURRENTLY cannot be used for VACUUM FULL?

You may be interested in these threads:
https://www.postgresql.org/message-id/CAB7nPqTGmNUFi%2BW6F1iwmf7J-o6sY%2Bxxo6Yb%3DmkUVYT-CG-B5A%40mail.gmail.com
https://www.postgresql.org/message-id/CAB7nPqTys6JUQDxUczbJb0BNW0kPrW8WdZuk11KaxQq6o98PJg%40mail.gmail.com

VACUUM FULL is CLUSTER under the hoods.  One may question whether it
is still a relevant discussion these days if we assume that autovacuum
is able to keep up, because it always keeps up with the house cleanup,
right?  ;)

More seriously, we have a lot more options these days with VACUUM like
PARALLEL, so CONCURRENTLY may still have some uses, but the new toys
available may have changed things.  So, would it be worth the
complexities around heap manipulations that lower locks would require?
--
Michael

Вложения

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Pavel Stehule
Дата:


út 30. 1. 2024 v 9:14 odesílatel Michael Paquier <michael@paquier.xyz> napsal:
On Tue, Jan 30, 2024 at 09:01:57AM +0100, Pavel Stehule wrote:
> I have one question, what is a block of implementation of some variant of
> VACUUM FULL like REINDEX CONCURRENTLY? Why similar mechanism of REINDEX
> CONCURRENTLY cannot be used for VACUUM FULL?

You may be interested in these threads:
https://www.postgresql.org/message-id/CAB7nPqTGmNUFi%2BW6F1iwmf7J-o6sY%2Bxxo6Yb%3DmkUVYT-CG-B5A%40mail.gmail.com
https://www.postgresql.org/message-id/CAB7nPqTys6JUQDxUczbJb0BNW0kPrW8WdZuk11KaxQq6o98PJg%40mail.gmail.com

VACUUM FULL is CLUSTER under the hoods.  One may question whether it
is still a relevant discussion these days if we assume that autovacuum
is able to keep up, because it always keeps up with the house cleanup,
right?  ;)

More seriously, we have a lot more options these days with VACUUM like
PARALLEL, so CONCURRENTLY may still have some uses, but the new toys
available may have changed things.  So, would it be worth the
complexities around heap manipulations that lower locks would require?

One of my customer today is reducing one table from 140GB to 20GB.  Now he is able to run archiving. He should play with pg_repack, and it is working well today, but I ask myself, what pg_repack does not be hard to do internally because it should be done for REINDEX CONCURRENTLY. This is not a common task, and not will be, but on the other hand, it can be nice to have feature, and maybe not too hard to implement today. But I didn't try it

I'll read the threads

Pavel


--
Michael

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Alvaro Herrera
Дата:
On 2024-Jan-30, Pavel Stehule wrote:

> One of my customer today is reducing one table from 140GB to 20GB.  Now he
> is able to run archiving. He should play with pg_repack, and it is working
> well today, but I ask myself, what pg_repack does not be hard to do
> internally because it should be done for REINDEX CONCURRENTLY. This is not
> a common task, and not will be, but on the other hand, it can be nice to
> have feature, and maybe not too hard to implement today. But I didn't try it

FWIW a newer, more modern and more trustworthy alternative to pg_repack
is pg_squeeze, which I discovered almost by random chance, and soon
discovered I liked it much more.

So thinking about your question, I think it might be possible to
integrate a tool that works like pg_squeeze, such that it runs when
VACUUM is invoked -- either under some new option, or just replace the
code under FULL, not sure.  If the Cybertec people allows it, we could
just grab the pg_squeeze code and add it to the things that VACUUM can
run.

Now, pg_squeeze has some additional features, such as periodic
"squeezing" of tables.  In a first attempt, for simplicity, I would
leave that stuff out and just allow it to run from the user invoking it,
and then have the command to do a single run.  (The scheduling features
could be added later, or somehow integrated into autovacuum, or maybe
something else.)

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
"We're here to devour each other alive"            (Hobbes)



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Pavel Stehule
Дата:


út 30. 1. 2024 v 11:31 odesílatel Alvaro Herrera <alvherre@alvh.no-ip.org> napsal:
On 2024-Jan-30, Pavel Stehule wrote:

> One of my customer today is reducing one table from 140GB to 20GB.  Now he
> is able to run archiving. He should play with pg_repack, and it is working
> well today, but I ask myself, what pg_repack does not be hard to do
> internally because it should be done for REINDEX CONCURRENTLY. This is not
> a common task, and not will be, but on the other hand, it can be nice to
> have feature, and maybe not too hard to implement today. But I didn't try it

FWIW a newer, more modern and more trustworthy alternative to pg_repack
is pg_squeeze, which I discovered almost by random chance, and soon
discovered I liked it much more.

So thinking about your question, I think it might be possible to
integrate a tool that works like pg_squeeze, such that it runs when
VACUUM is invoked -- either under some new option, or just replace the
code under FULL, not sure.  If the Cybertec people allows it, we could
just grab the pg_squeeze code and add it to the things that VACUUM can
run.

Now, pg_squeeze has some additional features, such as periodic
"squeezing" of tables.  In a first attempt, for simplicity, I would
leave that stuff out and just allow it to run from the user invoking it,
and then have the command to do a single run.  (The scheduling features
could be added later, or somehow integrated into autovacuum, or maybe
something else.)

some basic variant (without autovacuum support) can be good enough. We have no autovacuum support for REINDEX CONCURRENTLY and I don't see a necessity for it (sure, it can be limited by my perspective) . The necessity of reducing table size is not too common (a lot of use cases are better covered by using partitioning), but sometimes it is, and then buildin simple available solution can be helpful.





--
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
"We're here to devour each other alive"            (Hobbes)

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Alvaro Herrera
Дата:
On 2024-Jan-30, Pavel Stehule wrote:

> some basic variant (without autovacuum support) can be good enough. We have
> no autovacuum support for REINDEX CONCURRENTLY and I don't see a necessity
> for it (sure, it can be limited by my perspective) . The necessity of
> reducing table size is not too common (a lot of use cases are better
> covered by using partitioning), but sometimes it is, and then buildin
> simple available solution can be helpful.

That's my thinking as well.

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Michael Paquier
Дата:
On Tue, Jan 30, 2024 at 12:37:12PM +0100, Alvaro Herrera wrote:
> On 2024-Jan-30, Pavel Stehule wrote:
>
> > some basic variant (without autovacuum support) can be good enough. We have
> > no autovacuum support for REINDEX CONCURRENTLY and I don't see a necessity
> > for it (sure, it can be limited by my perspective) . The necessity of
> > reducing table size is not too common (a lot of use cases are better
> > covered by using partitioning), but sometimes it is, and then buildin
> > simple available solution can be helpful.
>
> That's my thinking as well.

Or, yes, I'd agree about that.  This can make for a much better user
experience.  I'm just not sure how that stuff would be shaped and how
much ground it would need to cover.
--
Michael

Вложения

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:

> On 2024-Jan-30, Pavel Stehule wrote:
>
> > One of my customer today is reducing one table from 140GB to 20GB.  Now he
> > is able to run archiving. He should play with pg_repack, and it is working
> > well today, but I ask myself, what pg_repack does not be hard to do
> > internally because it should be done for REINDEX CONCURRENTLY. This is not
> > a common task, and not will be, but on the other hand, it can be nice to
> > have feature, and maybe not too hard to implement today. But I didn't try it
>
> FWIW a newer, more modern and more trustworthy alternative to pg_repack
> is pg_squeeze, which I discovered almost by random chance, and soon
> discovered I liked it much more.
>
> So thinking about your question, I think it might be possible to
> integrate a tool that works like pg_squeeze, such that it runs when
> VACUUM is invoked -- either under some new option, or just replace the
> code under FULL, not sure.  If the Cybertec people allows it, we could
> just grab the pg_squeeze code and add it to the things that VACUUM can
> run.

There are no objections from Cybertec. Nevertheless, I don't expect much code
to be just copy & pasted. If I started to implement the extension today, I'd
do some things in a different way. (Some things might actually be simpler in
the core, i.e. a few small changes in PG core are easier than the related
workarounds in the extension.)

The core idea is that: 1) a "historic snapshot" is used to get the current
contents of the table, 2) logical decoding is used to capture the changes done
while the data is being copied to new storage, 3) the exclusive lock on the
table is only taken for very short time, to swap the storage (relfilenode) of
the table.

I think it should be coded in a way that allows use by VACUUM FULL, CLUSTER,
and possibly some subcommands of ALTER TABLE. For example, some users of
pg_squeeze requested an enhancement that allows the user to change column data
type w/o service disruption (typically when it appears that integer type is
going to overflow and change bigint is needed).

Online (re)partitioning could be another use case, although I admit that
commands that change the system catalog are a bit harder to implement than
VACUUM FULL / CLUSTER.

One thing that pg_squeeze does not handle is visibility: it uses heap_insert()
to insert the tuples into the new storage, so the problems described in [1]
can appear. The in-core implementation should rather do something like tuple
rewriting (rewriteheap.c).

Is your plan to work on it soon or should I try to write a draft patch? (I
assume this is for PG >= 18.)

[1] https://www.postgresql.org/docs/current/mvcc-caveats.html

--
Antonin Houska
Web: https://www.cybertec-postgresql.com



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Alvaro Herrera
Дата:
This is great to hear.

On 2024-Jan-31, Antonin Houska wrote:

> Is your plan to work on it soon or should I try to write a draft patch? (I
> assume this is for PG >= 18.)

I don't have plans for it, so if you have resources, please go for it.

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:

> This is great to hear.
> 
> On 2024-Jan-31, Antonin Houska wrote:
> 
> > Is your plan to work on it soon or should I try to write a draft patch? (I
> > assume this is for PG >= 18.)
> 
> I don't have plans for it, so if you have resources, please go for it.

ok, I'm thinking how can the feature be integrated into the core.

BTW, I'm failing to understand why cluster_rel() has no argument of the
BufferAccessStrategy type. According to buffer/README, the criterion for using
specific strategy is that page "is unlikely to be needed again
soon". Specifically for cluster_rel(), the page will *definitely* not be used
again (unless the VACCUM FULL/CLUSTER command fails): BufferTag contains the
relatin file number and the old relation file is eventually dropped.

Am I missing anything?

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Alvaro Herrera
Дата:
On 2024-Feb-16, Antonin Houska wrote:

> BTW, I'm failing to understand why cluster_rel() has no argument of the
> BufferAccessStrategy type. According to buffer/README, the criterion for using
> specific strategy is that page "is unlikely to be needed again
> soon". Specifically for cluster_rel(), the page will *definitely* not be used
> again (unless the VACCUM FULL/CLUSTER command fails): BufferTag contains the
> relatin file number and the old relation file is eventually dropped.
> 
> Am I missing anything?

No, that's just an oversight.  Access strategies are newer than that
cluster code.

-- 
Álvaro Herrera               48°01'N 7°57'E  —  https://www.EnterpriseDB.com/
"Most hackers will be perfectly comfortable conceptualizing users as entropy
 sources, so let's move on."                               (Nathaniel Smith)
      https://mail.gnu.org/archive/html/monotone-devel/2007-01/msg00080.html



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:

> > Is your plan to work on it soon or should I try to write a draft patch? (I
> > assume this is for PG >= 18.)
> 
> I don't have plans for it, so if you have resources, please go for it.

The first version is attached. The actual feature is in 0003. 0004 is probably
not necessary now, but I haven't realized until I coded it.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

From f47a98b9b4580a581aacf73c553b87ca6bf16533 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:45:59 +0200
Subject: [PATCH 1/4] Adjust signature of cluster_rel() and its subroutines.

So far cluster_rel() received OID of the relation it should process and it
performed opening and locking of the relation itself. Yet copy_table_data()
received the OID as well and also had to open the relation itself. This patch
tries to eliminate the repeated opening and closing.

One particular reason for this change is that the VACUUM FULL / CLUSTER
command with the CONCURRENTLY option will need to release all locks on the
relation (and possibly on the clustering index) at some point. Since it makes
little sense to keep relation reference w/o lock, the cluster_rel() function
also closes its reference to the relation (and its index). Neither the
function nor its subroutines may open extra references because then it'd be a
bit harder to close them all.
---
 src/backend/commands/cluster.c   | 146 ++++++++++++++++++-------------
 src/backend/commands/matview.c   |   2 +-
 src/backend/commands/tablecmds.c |   2 +-
 src/backend/commands/vacuum.c    |  12 +--
 src/include/commands/cluster.h   |   5 +-
 5 files changed, 99 insertions(+), 68 deletions(-)

diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 78f96789b0..194d143cf4 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -70,8 +70,8 @@ typedef struct
 
 
 static void cluster_multiple_rels(List *rtcs, ClusterParams *params);
-static void rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose);
-static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
+static void rebuild_relation(Relation OldHeap, Relation index, bool verbose);
+static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
                             bool verbose, bool *pSwapToastByContent,
                             TransactionId *pFreezeXid, MultiXactId *pCutoffMulti);
 static List *get_tables_to_cluster(MemoryContext cluster_context);
@@ -194,11 +194,11 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 
         if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
         {
-            /* close relation, keep lock till commit */
-            table_close(rel, NoLock);
-
-            /* Do the job. */
-            cluster_rel(tableOid, indexOid, ¶ms);
+            /*
+             * Do the job. (The function will close the relation, lock is kept
+             * till commit.)
+             */
+            cluster_rel(rel, indexOid, ¶ms);
 
             return;
         }
@@ -275,6 +275,7 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params)
     foreach(lc, rtcs)
     {
         RelToCluster *rtc = (RelToCluster *) lfirst(lc);
+        Relation    rel;
 
         /* Start a new transaction for each relation. */
         StartTransactionCommand();
@@ -282,8 +283,13 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params)
         /* functions in indexes may want a snapshot set */
         PushActiveSnapshot(GetTransactionSnapshot());
 
-        /* Do the job. */
-        cluster_rel(rtc->tableOid, rtc->indexOid, params);
+        rel = table_open(rtc->tableOid, AccessExclusiveLock);
+
+        /*
+         * Do the job. (The function will close the relation, lock is kept
+         * till commit.)
+         */
+        cluster_rel(rel, rtc->indexOid, params);
 
         PopActiveSnapshot();
         CommitTransactionCommand();
@@ -306,16 +312,19 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params)
  * If indexOid is InvalidOid, the table will be rewritten in physical order
  * instead of index order.  This is the new implementation of VACUUM FULL,
  * and error messages should refer to the operation as VACUUM not CLUSTER.
+ *
+ * We expect that OldHeap is already locked in AccessExclusiveLock mode.
  */
 void
-cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
+cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
 {
-    Relation    OldHeap;
+    Oid            tableOid = RelationGetRelid(OldHeap);
     Oid            save_userid;
     int            save_sec_context;
     int            save_nestlevel;
     bool        verbose = ((params->options & CLUOPT_VERBOSE) != 0);
     bool        recheck = ((params->options & CLUOPT_RECHECK) != 0);
+    Relation    index = NULL;
 
     /* Check for user-requested abort. */
     CHECK_FOR_INTERRUPTS();
@@ -328,21 +337,6 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
         pgstat_progress_update_param(PROGRESS_CLUSTER_COMMAND,
                                      PROGRESS_CLUSTER_COMMAND_VACUUM_FULL);
 
-    /*
-     * We grab exclusive access to the target rel and index for the duration
-     * of the transaction.  (This is redundant for the single-transaction
-     * case, since cluster() already did it.)  The index lock is taken inside
-     * check_index_is_clusterable.
-     */
-    OldHeap = try_relation_open(tableOid, AccessExclusiveLock);
-
-    /* If the table has gone away, we can skip processing it */
-    if (!OldHeap)
-    {
-        pgstat_progress_end_command();
-        return;
-    }
-
     /*
      * Switch to the table owner's userid, so that any index functions are run
      * as that user.  Also lock down security-restricted operations and
@@ -445,7 +439,11 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
 
     /* Check heap and index are valid to cluster on */
     if (OidIsValid(indexOid))
+    {
         check_index_is_clusterable(OldHeap, indexOid, AccessExclusiveLock);
+        /* Open the index (It should already be locked.) */
+        index = index_open(indexOid, NoLock);
+    }
 
     /*
      * Quietly ignore the request if this is a materialized view which has not
@@ -474,9 +472,12 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
     TransferPredicateLocksToHeapRelation(OldHeap);
 
     /* rebuild_relation does all the dirty work */
-    rebuild_relation(OldHeap, indexOid, verbose);
+    rebuild_relation(OldHeap, index, verbose);
 
-    /* NB: rebuild_relation does table_close() on OldHeap */
+    /*
+     * NB: rebuild_relation does table_close() on OldHeap, and also on index,
+     * if the pointer is valid.
+     */
 
 out:
     /* Roll back any GUC changes executed by index functions */
@@ -625,22 +626,27 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
  * rebuild_relation: rebuild an existing relation in index or physical order
  *
  * OldHeap: table to rebuild --- must be opened and exclusive-locked!
- * indexOid: index to cluster by, or InvalidOid to rewrite in physical order.
+ * index: index to cluster by, or NULL to rewrite in physical order. Must be
+ * opened and locked.
  *
- * NB: this routine closes OldHeap at the right time; caller should not.
+ * On exit, the heap (and also the index, if one was passed) are closed, but
+ * still locked with AccessExclusiveLock.
  */
 static void
-rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose)
+rebuild_relation(Relation OldHeap, Relation index, bool verbose)
 {
     Oid            tableOid = RelationGetRelid(OldHeap);
+    Oid            indexOid = index ? RelationGetRelid(index) : InvalidOid;
     Oid            accessMethod = OldHeap->rd_rel->relam;
     Oid            tableSpace = OldHeap->rd_rel->reltablespace;
     Oid            OIDNewHeap;
+    Relation    NewHeap;
     char        relpersistence;
     bool        is_system_catalog;
     bool        swap_toast_by_content;
     TransactionId frozenXid;
     MultiXactId cutoffMulti;
+    LOCKMODE    lmode_new;
 
     if (OidIsValid(indexOid))
         /* Mark the correct index as clustered */
@@ -650,19 +656,40 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose)
     relpersistence = OldHeap->rd_rel->relpersistence;
     is_system_catalog = IsSystemRelation(OldHeap);
 
-    /* Close relcache entry, but keep lock until transaction commit */
-    table_close(OldHeap, NoLock);
-
-    /* Create the transient table that will receive the re-ordered data */
+    /*
+     * Create the transient table that will receive the re-ordered data.
+     *
+     * NoLock for the old heap because we already have it locked and want to
+     * keep unlocking straightforward.
+     */
+    lmode_new = AccessExclusiveLock;
     OIDNewHeap = make_new_heap(tableOid, tableSpace,
                                accessMethod,
                                relpersistence,
-                               AccessExclusiveLock);
+                               NoLock, &lmode_new);
+    Assert(lmode_new == AccessExclusiveLock || lmode_new == NoLock);
+    /* Lock iff not done above. */
+    NewHeap = table_open(OIDNewHeap, lmode_new == NoLock ?
+                         AccessExclusiveLock : NoLock);
 
     /* Copy the heap data into the new table in the desired order */
-    copy_table_data(OIDNewHeap, tableOid, indexOid, verbose,
+    copy_table_data(NewHeap, OldHeap, index, verbose,
                     &swap_toast_by_content, &frozenXid, &cutoffMulti);
 
+
+    /* Close relcache entries, but keep lock until transaction commit */
+    table_close(OldHeap, NoLock);
+    if (index)
+        index_close(index, NoLock);
+
+    /*
+     * Close the new relation so it can be dropped as soon as the storage is
+     * swapped. The relation is not visible to others, so we could unlock it
+     * completely, but it's simpler to pass NoLock than to track all the locks
+     * acquired so far.
+     */
+    table_close(NewHeap, NoLock);
+
     /*
      * Swap the physical files of the target and transient tables, then
      * rebuild the target's indexes and throw away the transient table.
@@ -683,10 +710,15 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose)
  *
  * After this, the caller should load the new heap with transferred/modified
  * data, then call finish_heap_swap to complete the operation.
+ *
+ * If a specific lock mode is needed for the new relation, pass it via the
+ * in/out parameter lockmode_new_p. On exit, the output value tells whether
+ * the lock was actually acquired.
  */
 Oid
 make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
-              char relpersistence, LOCKMODE lockmode)
+              char relpersistence, LOCKMODE lockmode_old,
+              LOCKMODE *lockmode_new_p)
 {
     TupleDesc    OldHeapDesc;
     char        NewHeapName[NAMEDATALEN];
@@ -697,8 +729,17 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
     Datum        reloptions;
     bool        isNull;
     Oid            namespaceid;
+    LOCKMODE    lockmode_new;
 
-    OldHeap = table_open(OIDOldHeap, lockmode);
+    if (lockmode_new_p)
+    {
+        lockmode_new = *lockmode_new_p;
+        *lockmode_new_p = NoLock;
+    }
+    else
+        lockmode_new = lockmode_old;
+
+    OldHeap = table_open(OIDOldHeap, lockmode_old);
     OldHeapDesc = RelationGetDescr(OldHeap);
 
     /*
@@ -792,7 +833,9 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
         if (isNull)
             reloptions = (Datum) 0;
 
-        NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode, toastid);
+        NewHeapCreateToastTable(OIDNewHeap, reloptions, lockmode_new, toastid);
+        if (lockmode_new_p)
+            *lockmode_new_p = lockmode_new;
 
         ReleaseSysCache(tuple);
     }
@@ -811,13 +854,13 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
  * *pCutoffMulti receives the MultiXactId used as a cutoff point.
  */
 static void
-copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
+copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose,
                 bool *pSwapToastByContent, TransactionId *pFreezeXid,
                 MultiXactId *pCutoffMulti)
 {
-    Relation    NewHeap,
-                OldHeap,
-                OldIndex;
+    Oid        OIDOldHeap = RelationGetRelid(OldHeap);
+    Oid        OIDOldIndex = OldIndex ? RelationGetRelid(OldIndex) : InvalidOid;
+    Oid        OIDNewHeap = RelationGetRelid(NewHeap);
     Relation    relRelation;
     HeapTuple    reltup;
     Form_pg_class relform;
@@ -836,16 +879,6 @@ copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
 
     pg_rusage_init(&ru0);
 
-    /*
-     * Open the relations we need.
-     */
-    NewHeap = table_open(OIDNewHeap, AccessExclusiveLock);
-    OldHeap = table_open(OIDOldHeap, AccessExclusiveLock);
-    if (OidIsValid(OIDOldIndex))
-        OldIndex = index_open(OIDOldIndex, AccessExclusiveLock);
-    else
-        OldIndex = NULL;
-
     /* Store a copy of the namespace name for logging purposes */
     nspname = get_namespace_name(RelationGetNamespace(OldHeap));
 
@@ -1001,11 +1034,6 @@ copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
                        tups_recently_dead,
                        pg_rusage_show(&ru0))));
 
-    if (OldIndex != NULL)
-        index_close(OldIndex, NoLock);
-    table_close(OldHeap, NoLock);
-    table_close(NewHeap, NoLock);
-
     /* Update pg_class to reflect the correct values of pages and tuples. */
     relRelation = table_open(RelationRelationId, RowExclusiveLock);
 
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index ea05d4b224..488ca950d9 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -296,7 +296,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
      */
     OIDNewHeap = make_new_heap(matviewOid, tableSpace,
                                matviewRel->rd_rel->relam,
-                               relpersistence, ExclusiveLock);
+                               relpersistence, ExclusiveLock, NULL);
     LockRelationOid(OIDNewHeap, AccessExclusiveLock);
     dest = CreateTransientRelDestReceiver(OIDNewHeap);
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index dbfe0d6b1c..5d6151dad1 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5841,7 +5841,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
              * unlogged anyway.
              */
             OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-                                       persistence, lockmode);
+                                       persistence, lockmode, NULL);
 
             /*
              * Copy the heap data into the new table with the desired
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 48f8eab202..0bd000acc5 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -2196,15 +2196,17 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
         {
             ClusterParams cluster_params = {0};
 
-            /* close relation before vacuuming, but hold lock until commit */
-            relation_close(rel, NoLock);
-            rel = NULL;
-
             if ((params->options & VACOPT_VERBOSE) != 0)
                 cluster_params.options |= CLUOPT_VERBOSE;
 
             /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
-            cluster_rel(relid, InvalidOid, &cluster_params);
+            cluster_rel(rel, InvalidOid, &cluster_params);
+
+            /*
+             * cluster_rel() should have closed the relation, lock is kept
+             * till commit.
+             */
+            rel = NULL;
         }
         else
             table_relation_vacuum(rel, params, bstrategy);
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 4e32380417..7492796ea2 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -32,13 +32,14 @@ typedef struct ClusterParams
 } ClusterParams;
 
 extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel);
-extern void cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params);
+extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params);
 extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
                                        LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
 
 extern Oid    make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
-                          char relpersistence, LOCKMODE lockmode);
+                          char relpersistence, LOCKMODE lockmode_old,
+                          LOCKMODE *lockmode_new_p);
 extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
                              bool is_system_catalog,
                              bool swap_toast_by_content,
-- 
2.45.2

From cdf67d933a56323c0e5ca77495f60017d398bbd5 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:45:59 +0200
Subject: [PATCH 2/4] Move progress related fields from PgBackendStatus to
 PgBackendProgress.

VACUUM FULL / CLUSTER CONCURRENTLY will need to save and restore these fields
at some point.
---
 src/backend/utils/activity/backend_progress.c | 18 +++++++++---------
 src/backend/utils/activity/backend_status.c   |  4 ++--
 src/backend/utils/adt/pgstatfuncs.c           |  6 +++---
 src/include/utils/backend_progress.h          | 14 ++++++++++++++
 src/include/utils/backend_status.h            | 14 ++------------
 5 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index bfb9b7704b..e7c8bfba94 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -33,9 +33,9 @@ pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
         return;
 
     PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
-    beentry->st_progress_command = cmdtype;
-    beentry->st_progress_command_target = relid;
-    MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param));
+    beentry->st_progress.command = cmdtype;
+    beentry->st_progress.command_target = relid;
+    MemSet(&beentry->st_progress.param, 0, sizeof(beentry->st_progress.param));
     PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
@@ -56,7 +56,7 @@ pgstat_progress_update_param(int index, int64 val)
         return;
 
     PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
-    beentry->st_progress_param[index] = val;
+    beentry->st_progress.param[index] = val;
     PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
@@ -77,7 +77,7 @@ pgstat_progress_incr_param(int index, int64 incr)
         return;
 
     PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
-    beentry->st_progress_param[index] += incr;
+    beentry->st_progress.param[index] += incr;
     PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
@@ -134,7 +134,7 @@ pgstat_progress_update_multi_param(int nparam, const int *index,
     {
         Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM);
 
-        beentry->st_progress_param[index[i]] = val[i];
+        beentry->st_progress.param[index[i]] = val[i];
     }
 
     PGSTAT_END_WRITE_ACTIVITY(beentry);
@@ -155,11 +155,11 @@ pgstat_progress_end_command(void)
     if (!beentry || !pgstat_track_activities)
         return;
 
-    if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID)
+    if (beentry->st_progress.command == PROGRESS_COMMAND_INVALID)
         return;
 
     PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
-    beentry->st_progress_command = PROGRESS_COMMAND_INVALID;
-    beentry->st_progress_command_target = InvalidOid;
+    beentry->st_progress.command = PROGRESS_COMMAND_INVALID;
+    beentry->st_progress.command_target = InvalidOid;
     PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c
index 1ccf4c6d83..b54a35d91c 100644
--- a/src/backend/utils/activity/backend_status.c
+++ b/src/backend/utils/activity/backend_status.c
@@ -378,8 +378,8 @@ pgstat_bestart(void)
 #endif
 
     lbeentry.st_state = STATE_UNDEFINED;
-    lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID;
-    lbeentry.st_progress_command_target = InvalidOid;
+    lbeentry.st_progress.command = PROGRESS_COMMAND_INVALID;
+    lbeentry.st_progress.command_target = InvalidOid;
     lbeentry.st_query_id = UINT64CONST(0);
 
     /*
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3876339ee1..fe09ae8f63 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -269,7 +269,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
          * Report values for only those backends which are running the given
          * command.
          */
-        if (beentry->st_progress_command != cmdtype)
+        if (beentry->st_progress.command != cmdtype)
             continue;
 
         /* Value available to all callers */
@@ -279,9 +279,9 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
         /* show rest of the values including relid only to role members */
         if (HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
         {
-            values[2] = ObjectIdGetDatum(beentry->st_progress_command_target);
+            values[2] = ObjectIdGetDatum(beentry->st_progress.command_target);
             for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
-                values[i + 3] = Int64GetDatum(beentry->st_progress_param[i]);
+                values[i + 3] = Int64GetDatum(beentry->st_progress.param[i]);
         }
         else
         {
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index 7b63d38f97..e09598eafc 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -30,8 +30,22 @@ typedef enum ProgressCommandType
     PROGRESS_COMMAND_COPY,
 } ProgressCommandType;
 
+
 #define PGSTAT_NUM_PROGRESS_PARAM    20
 
+/*
+ * Any command which wishes can advertise that it is running by setting
+ * command, command_target, and param[].  command_target should be the OID of
+ * the relation which the command targets (we assume there's just one, as this
+ * is meant for utility commands), but the meaning of each element in the
+ * param array is command-specific.
+ */
+typedef struct PgBackendProgress
+{
+    ProgressCommandType command;
+    Oid            command_target;
+    int64        param[PGSTAT_NUM_PROGRESS_PARAM];
+} PgBackendProgress;
 
 extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
                                           Oid relid);
diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h
index 7b7f6f59d0..11cdf7f95a 100644
--- a/src/include/utils/backend_status.h
+++ b/src/include/utils/backend_status.h
@@ -155,18 +155,8 @@ typedef struct PgBackendStatus
      */
     char       *st_activity_raw;
 
-    /*
-     * Command progress reporting.  Any command which wishes can advertise
-     * that it is running by setting st_progress_command,
-     * st_progress_command_target, and st_progress_param[].
-     * st_progress_command_target should be the OID of the relation which the
-     * command targets (we assume there's just one, as this is meant for
-     * utility commands), but the meaning of each element in the
-     * st_progress_param array is command-specific.
-     */
-    ProgressCommandType st_progress_command;
-    Oid            st_progress_command_target;
-    int64        st_progress_param[PGSTAT_NUM_PROGRESS_PARAM];
+    /* Command progress reporting. */
+    PgBackendProgress    st_progress;
 
     /* query identifier, optionally computed using post_parse_analyze_hook */
     uint64        st_query_id;
-- 
2.45.2

From 1cb536663c018d98faf349a680b773364b464026 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:45:59 +0200
Subject: [PATCH 3/4] Add CONCURRENTLY option to both VACUUM FULL and CLUSTER
 commands.

Both VACUUM FULL and CLUSTER commands copy the relation data into a new file,
create new indexes and eventually swap the files. To make sure that the old
file does not change during the copying, the relation is locked in an
exclusive mode, which prevents applications from both reading and writing. (To
keep the data consistent, we'd only need to prevent the applications from
writing, but even reading needs to be blocked before we can swap the files -
otherwise some applications could continue using the old file. Since we cannot
get stronger lock without releasing the weaker one first, we acquire the
exclusive lock in the beginning and keep it till the end of the processing.)

This patch introduces an alternative workflow, which only requires the
exclusive lock when the relation (and index) files are being swapped.
(Supposedly, the swapping should be pretty fast.) On the other hand, when we
copy the data to the new file, we allow applications to read from the relation
and even write into it.

First, we scan the relation using a "historic snapshot", and insert all the
tuples satisfying this snapshot into the new file. Note that, before creating
that snapshot, we need to make sure that all the other backends treat the
relation as a system catalog: in particular, they must log information on new
command IDs (CIDs). We achieve that by adding the relation ID into a shared
hash table and waiting until all the transactions currently writing into the
table (i.e. transactions possibly not aware of the new entry) have finished.

Second, logical decoding is used to capture the data changes done by
applications during the copying (i.e. changes that do not satisfy the historic
snapshot mentioned above), and those are applied to the new file before we
acquire the exclusive lock we need to swap the files. (Of course, more data
changes can take place while we are waiting for the lock - these will be
applied to the new file after we have acquired the lock, before we swap the
files.)

While copying the data into the new file, we hold a lock that prevents
applications from changing the relation tuple descriptor (tuples inserted into
the old file must fit into the new file). However, as we have to release that
lock before getting the exclusive one, it's possible that someone adds or
drops a column, or changes the data type of an existing one. Therefore we have
to check the tuple descriptor before we swap the files. If we find out that
the tuple descriptor changed, ERROR is raised and all the changes are rolled
back. Since a lot of effort can be wasted in such a case, the ALTER TABLE
command also tries to check if VACUUM FULL / CLUSTER with the CONCURRENTLY
option is running on the same relation, and raises an ERROR if it is.

Like the existing implementation of both VACUUM FULL and CLUSTER commands, the
variant with the CONCURRENTLY option also requires an extra space for the new
relation and index files (which coexist with the old files for some time). In
addition, the CONCURRENTLY option might introduce a lag in releasing WAL
segments for archiving / recycling. This is due to the decoding of the data
changes done by application concurrently. However, this lag should not be more
than a single WAL segment.
---
 doc/src/sgml/monitoring.sgml                  |   36 +-
 doc/src/sgml/ref/cluster.sgml                 |  114 +-
 doc/src/sgml/ref/vacuum.sgml                  |   27 +-
 src/Makefile                                  |    1 +
 src/backend/access/common/toast_internals.c   |    3 +-
 src/backend/access/heap/heapam.c              |   80 +-
 src/backend/access/heap/heapam_handler.c      |  155 +-
 src/backend/access/heap/heapam_visibility.c   |   30 +-
 src/backend/access/transam/xact.c             |   52 +
 src/backend/catalog/index.c                   |   43 +-
 src/backend/catalog/system_views.sql          |   17 +-
 src/backend/commands/cluster.c                | 2618 ++++++++++++++++-
 src/backend/commands/matview.c                |    2 +-
 src/backend/commands/tablecmds.c              |   11 +
 src/backend/commands/vacuum.c                 |  137 +-
 src/backend/replication/logical/decode.c      |   58 +-
 src/backend/replication/logical/snapbuild.c   |   87 +-
 .../replication/pgoutput_cluster/Makefile     |   32 +
 .../replication/pgoutput_cluster/meson.build  |   18 +
 .../pgoutput_cluster/pgoutput_cluster.c       |  321 ++
 src/backend/storage/ipc/ipci.c                |    3 +
 src/backend/tcop/utility.c                    |   11 +
 src/backend/utils/activity/backend_progress.c |   16 +
 .../utils/activity/wait_event_names.txt       |    1 +
 src/backend/utils/cache/inval.c               |   22 +
 src/backend/utils/cache/relcache.c            |    5 +
 src/backend/utils/time/snapmgr.c              |    6 +-
 src/bin/psql/tab-complete.c                   |    5 +-
 src/include/access/heapam.h                   |   19 +-
 src/include/access/heapam_xlog.h              |    2 +
 src/include/access/tableam.h                  |   10 +
 src/include/access/xact.h                     |    2 +
 src/include/catalog/index.h                   |    3 +
 src/include/commands/cluster.h                |  117 +-
 src/include/commands/progress.h               |   17 +-
 src/include/commands/vacuum.h                 |   17 +-
 src/include/replication/snapbuild.h           |    2 +
 src/include/storage/lockdefs.h                |    2 +-
 src/include/storage/lwlocklist.h              |    1 +
 src/include/utils/backend_progress.h          |    3 +-
 src/include/utils/inval.h                     |    2 +
 src/include/utils/rel.h                       |    7 +-
 src/include/utils/snapmgr.h                   |    3 +
 src/test/regress/expected/rules.out           |   17 +-
 44 files changed, 3876 insertions(+), 259 deletions(-)
 create mode 100644 src/backend/replication/pgoutput_cluster/Makefile
 create mode 100644 src/backend/replication/pgoutput_cluster/meson.build
 create mode 100644 src/backend/replication/pgoutput_cluster/pgoutput_cluster.c

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 991f629907..fe1ba36f40 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -5567,14 +5567,35 @@ FROM pg_stat_get_backend_idset() AS backendid;
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>heap_tuples_written</structfield> <type>bigint</type>
+       <structfield>heap_tuples_inserted</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of heap tuples written.
+       Number of heap tuples inserted.
        This counter only advances when the phase is
        <literal>seq scanning heap</literal>,
-       <literal>index scanning heap</literal>
-       or <literal>writing new heap</literal>.
+       <literal>index scanning heap</literal>,
+       <literal>writing new heap</literal>
+       or <literal>catch-up</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>heap_tuples_updated</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of heap tuples updated.
+       This counter only advances when the phase is <literal>catch-up</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>heap_tuples_deleted</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of heap tuples deleted.
+       This counter only advances when the phase is <literal>catch-up</literal>.
       </para></entry>
      </row>
 
@@ -5655,6 +5676,13 @@ FROM pg_stat_get_backend_idset() AS backendid;
        <command>CLUSTER</command> is currently writing the new heap.
      </entry>
     </row>
+    <row>
+     <entry><literal>catch-up</literal></entry>
+     <entry>
+       <command>CLUSTER</command> is currently processing the DML commands
+       that other transactions executed during any of the preceding phase.
+     </entry>
+    </row>
     <row>
      <entry><literal>swapping relation files</literal></entry>
      <entry>
diff --git a/doc/src/sgml/ref/cluster.sgml b/doc/src/sgml/ref/cluster.sgml
index c5760244e6..0fe4e9603b 100644
--- a/doc/src/sgml/ref/cluster.sgml
+++ b/doc/src/sgml/ref/cluster.sgml
@@ -26,6 +26,7 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r
 <phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase>
 
     VERBOSE [ <replaceable class="parameter">boolean</replaceable> ]
+    CONCURRENTLY [ <replaceable class="parameter">boolean</replaceable> ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -69,14 +70,18 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r
    <replaceable class="parameter">table_name</replaceable> reclusters all the
    previously-clustered tables in the current database that the calling user
    has privileges for.  This form of <command>CLUSTER</command> cannot be
-   executed inside a transaction block.
+   executed inside a transaction block. Also, if
+   the <literal>CONCURRENTLY</literal> option is used with this form, system
+   catalogs and <acronym>TOAST</acronym> tables are not processed.
   </para>
 
   <para>
-   When a table is being clustered, an <literal>ACCESS
-   EXCLUSIVE</literal> lock is acquired on it. This prevents any other
-   database operations (both reads and writes) from operating on the
-   table until the <command>CLUSTER</command> is finished.
+   When a table is being clustered, an <literal>ACCESS EXCLUSIVE</literal>
+   lock is acquired on it. This prevents any other database operations (both
+   reads and writes) from operating on the table until
+   the <command>CLUSTER</command> is finished. If you want to keep the table
+   accessible during the clustering, consider using
+   the <literal>CONCURRENTLY</literal> option.
   </para>
  </refsect1>
 
@@ -111,6 +116,105 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>CONCURRENTLY</literal></term>
+    <listitem>
+     <para>
+      Allow other transactions to use the table while it is being clustered.
+     </para>
+
+     <para>
+      Internally, <command>CLUSTER</command> copies the contents of the table
+      (ignoring dead tuples) into a new file, sorted by the specified index,
+      and also creates a new file for each index. Then it swaps the old and
+      new files for the table and all the indexes, and deletes the old
+      files. The <literal>ACCESS EXCLUSIVE</literal> lock is needed to make
+      sure that the old files do not change during the processing because the
+      chnages would get lost due to the swap.
+     </para>
+
+     <para>
+      With the <literal>CONCURRENTLY</literal> option, the <literal>ACCESS
+      EXCLUSIVE</literal> lock is only acquired to swap the table and index
+      files. The data changes that took place during the creation of the new
+      table and index files are captured using logical decoding
+      (<xref linkend="logicaldecoding"/>) and applied before
+      the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock
+      is typically held only for the time needed to swap the files, which
+      should be pretty short. However, the time might still be noticeable
+      noticeable if too many data changes have been done to the table
+      while <command>CLUSTER</command> was waiting for the lock: those changes
+      must be processed before the files are swapped.
+     </para>
+
+     <para>
+      Note that <command>CLUSTER</command> with the
+      the <literal>CONCURRENTLY</literal> option does not try to order the
+      rows inserted into the table after the clustering started. Also
+      note <command>CLUSTER</command> might fail to complete due to DDL
+      commands executed on the table by other transactions during the
+      clustering.
+     </para>
+
+     <note>
+      <para>
+       In addition to the temporary space requirements explained below,
+       the <literal>CONCURRENTLY</literal> option can add to the usage of
+       temporary space a bit more. The reason is that other transactions can
+       perform DML operations which cannot be applied to the new file until
+       <command>CLUSTER</command> has copied all the tuples from the old
+       file. Thus the tuples inserted into the old file during the copying are
+       also stored in separately in a temporary file, so they can eventually
+       be applied to the new file.
+      </para>
+
+      <para>
+       Furthermore, the data changes performed during the copying are
+       extracted from <link linkend="wal">write-ahead log</link> (WAL), and
+       this extraction (decoding) only takes place when certain amount of WAL
+       has been written. Therefore, WAL removal can be delayed by this
+       threshold. Currently the threshold is equal to the value of
+       the <link linkend="guc-wal-segment-size"><varname>wal_segment_size</varname></link>
+       configuration parameter.
+      </para>
+     </note>
+
+     <para>
+      The <literal>CONCURRENTLY</literal> option cannot be used in the
+      following cases:
+
+      <itemizedlist>
+       <listitem>
+        <para>
+          The table is a system catalog or a <acronym>TOAST</acronym> table.
+        </para>
+       </listitem>
+
+       <listitem>
+        <para>
+         <command>CLUSTER</command> is executed inside a transaction block.
+        </para>
+       </listitem>
+
+       <listitem>
+        <para>
+          The <link linkend="guc-wal-level"><varname>wal_level</varname></link>
+          configuration parameter is less than <literal>logical</literal>.
+        </para>
+       </listitem>
+
+       <listitem>
+        <para>
+         The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+         configuration parameter does not allow for creation of an additional
+         replication slot.
+        </para>
+       </listitem>
+      </itemizedlist>
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">boolean</replaceable></term>
     <listitem>
diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index 9857b35627..298cf7298d 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -39,6 +39,7 @@ VACUUM [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <re
     SKIP_DATABASE_STATS [ <replaceable class="parameter">boolean</replaceable> ]
     ONLY_DATABASE_STATS [ <replaceable class="parameter">boolean</replaceable> ]
     BUFFER_USAGE_LIMIT <replaceable class="parameter">size</replaceable>
+    CONCURRENTLY [ <replaceable class="parameter">boolean</replaceable> ]
 
 <phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase>
 
@@ -61,8 +62,12 @@ VACUUM [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <re
   <para>
    Without a <replaceable class="parameter">table_and_columns</replaceable>
    list, <command>VACUUM</command> processes every table and materialized view
-   in the current database that the current user has permission to vacuum.
-   With a list, <command>VACUUM</command> processes only those table(s).
+   in the current database that the current user has permission to vacuum. If
+   the <literal>CONCURRENTLY</literal> is specified (see below), tables which
+   have not been clustered yet are silently skipped. With a
+   list, <command>VACUUM</command> processes only those table(s). If
+   the <literal>CONCURRENTLY</literal> is specified, the list may only contain
+   tables which have already been clustered.
   </para>
 
   <para>
@@ -360,6 +365,24 @@ VACUUM [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <re
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>CONCURRENTLY</literal></term>
+    <listitem>
+     <para>
+      Allow other transactions to use the table while it is being vacuumed. If
+      this option is specified, <command>VACUUM</command> can only process
+      tables which have already been clustered. For more information, see the
+      description of the <literal>CONCURRENTLY</literal> of the
+      <xref linkend="sql-cluster"/> command.
+     </para>
+
+     <para>
+      The <literal>CONCURRENTLY</literal> option can only be used
+      if <literal>FULL</literal> is used at the same time.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">boolean</replaceable></term>
     <listitem>
diff --git a/src/Makefile b/src/Makefile
index 2f31a2f20a..8b9d30ff72 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,6 +23,7 @@ SUBDIRS = \
     interfaces \
     backend/replication/libpqwalreceiver \
     backend/replication/pgoutput \
+    backend/replication/pgoutput_cluster \
     fe_utils \
     bin \
     pl \
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index 90d0654e62..183055647b 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -320,7 +320,8 @@ toast_save_datum(Relation rel, Datum value,
         memcpy(VARDATA(&chunk_data), data_p, chunk_size);
         toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
 
-        heap_insert(toastrel, toasttup, mycid, options, NULL);
+        heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid,
+                    options, NULL);
 
         /*
          * Create the index entry.  We cheat a little here by not using
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b20147a0..493c351d7f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -75,7 +75,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                   Buffer newbuf, HeapTuple oldtup,
                                   HeapTuple newtup, HeapTuple old_key_tuple,
-                                  bool all_visible_cleared, bool new_all_visible_cleared);
+                                  bool all_visible_cleared, bool new_all_visible_cleared,
+                                  bool wal_logical);
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                            Bitmapset *interesting_cols,
                                            Bitmapset *external_cols,
@@ -1975,7 +1976,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
 /*
  *    heap_insert        - insert tuple into a heap
  *
- * The new tuple is stamped with current transaction ID and the specified
+ * The new tuple is stamped with specified transaction ID and the specified
  * command ID.
  *
  * See table_tuple_insert for comments about most of the input flags, except
@@ -1991,15 +1992,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * reflected into *tup.
  */
 void
-heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-            int options, BulkInsertState bistate)
+heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+            CommandId cid, int options, BulkInsertState bistate)
 {
-    TransactionId xid = GetCurrentTransactionId();
     HeapTuple    heaptup;
     Buffer        buffer;
     Buffer        vmbuffer = InvalidBuffer;
     bool        all_visible_cleared = false;
 
+    Assert(TransactionIdIsValid(xid));
+
     /* Cheap, simplistic check that the tuple matches the rel's rowtype. */
     Assert(HeapTupleHeaderGetNatts(tup->t_data) <=
            RelationGetNumberOfAttributes(relation));
@@ -2079,8 +2081,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
         /*
          * If this is a catalog, we need to transmit combo CIDs to properly
          * decode, so log that as well.
+         *
+         * Currently we only pass HEAP_INSERT_NO_LOGICAL when doing VACUUM
+         * FULL / CLUSTER, in which case the visibility information does not
+         * change. Therefore, there's no need to update the decoding snapshot.
          */
-        if (RelationIsAccessibleInLogicalDecoding(relation))
+        if ((options & HEAP_INSERT_NO_LOGICAL) == 0 &&
+            RelationIsAccessibleInLogicalDecoding(relation))
             log_heap_new_cid(relation, heaptup);
 
         /*
@@ -2624,7 +2631,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 void
 simple_heap_insert(Relation relation, HeapTuple tup)
 {
-    heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
+    heap_insert(relation, tup, GetCurrentTransactionId(),
+                GetCurrentCommandId(true), 0, NULL);
 }
 
 /*
@@ -2681,11 +2689,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
  */
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
-            CommandId cid, Snapshot crosscheck, bool wait,
-            TM_FailureData *tmfd, bool changingPart)
+            TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait,
+            TM_FailureData *tmfd, bool changingPart,
+            bool wal_logical)
 {
     TM_Result    result;
-    TransactionId xid = GetCurrentTransactionId();
     ItemId        lp;
     HeapTupleData tp;
     Page        page;
@@ -2702,6 +2710,7 @@ heap_delete(Relation relation, ItemPointer tid,
     bool        old_key_copied = false;
 
     Assert(ItemPointerIsValid(tid));
+    Assert(TransactionIdIsValid(xid));
 
     /*
      * Forbid this during a parallel operation, lest it allocate a combo CID.
@@ -2927,7 +2936,8 @@ l1:
      * Compute replica identity tuple before entering the critical section so
      * we don't PANIC upon a memory allocation failure.
      */
-    old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+    old_key_tuple = wal_logical ?
+        ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL;
 
     /*
      * If this is the first possibly-multixact-able operation in the current
@@ -2995,8 +3005,12 @@ l1:
         /*
          * For logical decode we need combo CIDs to properly decode the
          * catalog
+         *
+         * Like in heap_insert(), visibility is unchanged when called from
+         * VACUUM FULL / CLUSTER.
          */
-        if (RelationIsAccessibleInLogicalDecoding(relation))
+        if (wal_logical &&
+            RelationIsAccessibleInLogicalDecoding(relation))
             log_heap_new_cid(relation, &tp);
 
         xlrec.flags = 0;
@@ -3017,6 +3031,15 @@ l1:
                 xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
         }
 
+        /*
+         * Unlike UPDATE, DELETE is decoded even if there is no old key, so it
+         * does not help to clear both XLH_DELETE_CONTAINS_OLD_TUPLE and
+         * XLH_DELETE_CONTAINS_OLD_KEY. Thus we need an extra flag. TODO
+         * Consider not decoding tuples w/o the old tuple/key instead.
+         */
+        if (!wal_logical)
+            xlrec.flags |= XLH_DELETE_NO_LOGICAL;
+
         XLogBeginInsert();
         XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
 
@@ -3106,10 +3129,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
     TM_Result    result;
     TM_FailureData tmfd;
 
-    result = heap_delete(relation, tid,
+    result = heap_delete(relation, tid, GetCurrentTransactionId(),
                          GetCurrentCommandId(true), InvalidSnapshot,
                          true /* wait for commit */ ,
-                         &tmfd, false /* changingPart */ );
+                         &tmfd, false, /* changingPart */
+                         true /* wal_logical */);
     switch (result)
     {
         case TM_SelfModified:
@@ -3148,12 +3172,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  */
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-            CommandId cid, Snapshot crosscheck, bool wait,
-            TM_FailureData *tmfd, LockTupleMode *lockmode,
-            TU_UpdateIndexes *update_indexes)
+            TransactionId xid, CommandId cid, Snapshot crosscheck,
+            bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
+            TU_UpdateIndexes *update_indexes, bool wal_logical)
 {
     TM_Result    result;
-    TransactionId xid = GetCurrentTransactionId();
     Bitmapset  *hot_attrs;
     Bitmapset  *sum_attrs;
     Bitmapset  *key_attrs;
@@ -3193,6 +3216,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                 infomask2_new_tuple;
 
     Assert(ItemPointerIsValid(otid));
+    Assert(TransactionIdIsValid(xid));
 
     /* Cheap, simplistic check that the tuple matches the rel's rowtype. */
     Assert(HeapTupleHeaderGetNatts(newtup->t_data) <=
@@ -3981,8 +4005,12 @@ l2:
         /*
          * For logical decoding we need combo CIDs to properly decode the
          * catalog.
+         *
+         * Like in heap_insert(), visibility is unchanged when called from
+         * VACUUM FULL / CLUSTER.
          */
-        if (RelationIsAccessibleInLogicalDecoding(relation))
+        if (wal_logical &&
+            RelationIsAccessibleInLogicalDecoding(relation))
         {
             log_heap_new_cid(relation, &oldtup);
             log_heap_new_cid(relation, heaptup);
@@ -3992,7 +4020,8 @@ l2:
                                  newbuf, &oldtup, heaptup,
                                  old_key_tuple,
                                  all_visible_cleared,
-                                 all_visible_cleared_new);
+                                 all_visible_cleared_new,
+                                 wal_logical);
         if (newbuf != buffer)
         {
             PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -4225,10 +4254,10 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
     TM_FailureData tmfd;
     LockTupleMode lockmode;
 
-    result = heap_update(relation, otid, tup,
+    result = heap_update(relation, otid, tup, GetCurrentTransactionId(),
                          GetCurrentCommandId(true), InvalidSnapshot,
                          true /* wait for commit */ ,
-                         &tmfd, &lockmode, update_indexes);
+                         &tmfd, &lockmode, update_indexes, true);
     switch (result)
     {
         case TM_SelfModified:
@@ -8357,7 +8386,8 @@ static XLogRecPtr
 log_heap_update(Relation reln, Buffer oldbuf,
                 Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
                 HeapTuple old_key_tuple,
-                bool all_visible_cleared, bool new_all_visible_cleared)
+                bool all_visible_cleared, bool new_all_visible_cleared,
+                bool wal_logical)
 {
     xl_heap_update xlrec;
     xl_heap_header xlhdr;
@@ -8368,10 +8398,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
                 suffixlen = 0;
     XLogRecPtr    recptr;
     Page        page = BufferGetPage(newbuf);
-    bool        need_tuple_data = RelationIsLogicallyLogged(reln);
+    bool        need_tuple_data;
     bool        init;
     int            bufflags;
 
+    need_tuple_data = RelationIsLogicallyLogged(reln) && wal_logical;
+
     /* Caller should not call me on a non-WAL-logged relation */
     Assert(RelationNeedsWAL(reln));
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..02fd6d2983 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -33,6 +33,7 @@
 #include "catalog/index.h"
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
+#include "commands/cluster.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
 #include "miscadmin.h"
@@ -53,6 +54,9 @@ static void reform_and_rewrite_tuple(HeapTuple tuple,
 static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
                                    HeapTuple tuple,
                                    OffsetNumber tupoffset);
+static bool accept_tuple_for_concurrent_copy(HeapTuple tuple,
+                                             Snapshot snapshot,
+                                             Buffer buffer);
 
 static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan);
 
@@ -250,7 +254,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
     tuple->t_tableOid = slot->tts_tableOid;
 
     /* Perform the insertion, and copy the resulting ItemPointer */
-    heap_insert(relation, tuple, cid, options, bistate);
+    heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+                bistate);
     ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
     if (shouldFree)
@@ -273,7 +278,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
     options |= HEAP_INSERT_SPECULATIVE;
 
     /* Perform the insertion, and copy the resulting ItemPointer */
-    heap_insert(relation, tuple, cid, options, bistate);
+    heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+                bistate);
     ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
     if (shouldFree)
@@ -307,7 +313,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
      * the storage itself is cleaning the dead tuples by itself, it is the
      * time to call the index tuple deletion also.
      */
-    return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
+    return heap_delete(relation, tid, GetCurrentTransactionId(), cid,
+                       crosscheck, wait, tmfd, changingPart, true);
 }
 
 
@@ -325,8 +332,9 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
     slot->tts_tableOid = RelationGetRelid(relation);
     tuple->t_tableOid = slot->tts_tableOid;
 
-    result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-                         tmfd, lockmode, update_indexes);
+    result = heap_update(relation, otid, tuple, GetCurrentTransactionId(),
+                         cid, crosscheck, wait,
+                         tmfd, lockmode, update_indexes, true);
     ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
     /*
@@ -686,6 +694,8 @@ static void
 heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                                  Relation OldIndex, bool use_sort,
                                  TransactionId OldestXmin,
+                                 Snapshot snapshot,
+                                 LogicalDecodingContext *decoding_ctx,
                                  TransactionId *xid_cutoff,
                                  MultiXactId *multi_cutoff,
                                  double *num_tuples,
@@ -706,6 +716,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
     bool       *isnull;
     BufferHeapTupleTableSlot *hslot;
     BlockNumber prev_cblock = InvalidBlockNumber;
+    bool    concurrent = snapshot != NULL;
+    XLogRecPtr    end_of_wal_prev = GetFlushRecPtr(NULL);
 
     /* Remember if it's a system catalog */
     is_system_catalog = IsSystemRelation(OldHeap);
@@ -786,6 +798,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
         HeapTuple    tuple;
         Buffer        buf;
         bool        isdead;
+        HTSV_Result    vis;
 
         CHECK_FOR_INTERRUPTS();
 
@@ -840,7 +853,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
         LockBuffer(buf, BUFFER_LOCK_SHARE);
 
-        switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
+        switch ((vis = HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf)))
         {
             case HEAPTUPLE_DEAD:
                 /* Definitely dead */
@@ -856,14 +869,15 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
             case HEAPTUPLE_INSERT_IN_PROGRESS:
 
                 /*
-                 * Since we hold exclusive lock on the relation, normally the
-                 * only way to see this is if it was inserted earlier in our
-                 * own transaction.  However, it can happen in system
+                 * As long as we hold exclusive lock on the relation, normally
+                 * the only way to see this is if it was inserted earlier in
+                 * our own transaction.  However, it can happen in system
                  * catalogs, since we tend to release write lock before commit
-                 * there.  Give a warning if neither case applies; but in any
-                 * case we had better copy it.
+                 * there. Also, there's no exclusive lock during concurrent
+                 * processing. Give a warning if neither case applies; but in
+                 * any case we had better copy it.
                  */
-                if (!is_system_catalog &&
+                if (!is_system_catalog && !concurrent &&
                     !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
                     elog(WARNING, "concurrent insert in progress within table \"%s\"",
                          RelationGetRelationName(OldHeap));
@@ -875,7 +889,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                 /*
                  * Similar situation to INSERT_IN_PROGRESS case.
                  */
-                if (!is_system_catalog &&
+                if (!is_system_catalog && !concurrent &&
                     !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
                     elog(WARNING, "concurrent delete in progress within table \"%s\"",
                          RelationGetRelationName(OldHeap));
@@ -889,8 +903,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                 break;
         }
 
-        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-
         if (isdead)
         {
             *tups_vacuumed += 1;
@@ -901,9 +913,39 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                 *tups_vacuumed += 1;
                 *tups_recently_dead -= 1;
             }
+
+            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
             continue;
         }
 
+        /*
+         * Ignore concurrent changes now, they'll be processed later via
+         * logical decoding. INSERT_IN_PROGRESS is rejected right away because
+         * our snapshot should represent a point in time which should precede
+         * (or be equal to) the state of transactions as it was when the
+         * "SatisfiesVacuum" test was performed. Thus
+         * accept_tuple_for_concurrent_copy() should not consider the tuple
+         * inserted.
+         */
+        if (concurrent &&
+            (vis == HEAPTUPLE_INSERT_IN_PROGRESS ||
+             !accept_tuple_for_concurrent_copy(tuple, snapshot, buf)))
+        {
+            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+            continue;
+        }
+
+        /*
+         * In the concurrent case, we should not unlock the buffer until the
+         * tuple has been copied to the new file: if a concurrent transaction
+         * marked it updated or deleted in between, we'd fail to replay that
+         * transaction's changes because then we'd try to perform the same
+         * UPDATE / DELETE twice. XXX Should we instead create a copy of the
+         * tuple so that the buffer can be unlocked right away?
+         */
+        if (!concurrent)
+            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
         *num_tuples += 1;
         if (tuplesort != NULL)
         {
@@ -920,7 +962,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
         {
             const int    ct_index[] = {
                 PROGRESS_CLUSTER_HEAP_TUPLES_SCANNED,
-                PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN
+                PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED
             };
             int64        ct_val[2];
 
@@ -935,6 +977,35 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
             ct_val[1] = *num_tuples;
             pgstat_progress_update_multi_param(2, ct_index, ct_val);
         }
+
+        /* See the comment on unlocking above. */
+        if (concurrent)
+            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+        /*
+         * Process the WAL produced by the load, as well as by other
+         * transactions, so that the replication slot can advance and WAL does
+         * not pile up. Use wal_segment_size as a threshold so that we do not
+         * introduce the decoding overhead too often.
+         *
+         * Of course, we must not apply the changes until the initial load has
+         * completed.
+         *
+         * Note that our insertions into the new table should not be decoded
+         * as we (intentionally) do not write the logical decoding specific
+         * information to WAL.
+         */
+        if (concurrent)
+        {
+            XLogRecPtr    end_of_wal;
+
+            end_of_wal = GetFlushRecPtr(NULL);
+            if ((end_of_wal - end_of_wal_prev) > wal_segment_size)
+            {
+                cluster_decode_concurrent_changes(decoding_ctx, end_of_wal);
+                end_of_wal_prev = end_of_wal;
+            }
+        }
     }
 
     if (indexScan != NULL)
@@ -978,7 +1049,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                                      values, isnull,
                                      rwstate);
             /* Report n_tuples */
-            pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN,
+            pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED,
                                          n_tuples);
         }
 
@@ -2583,6 +2654,56 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
     }
 }
 
+/*
+ * Check if the tuple was inserted, updated or deleted while
+ * heapam_relation_copy_for_cluster() was copying the data.
+ *
+ * 'snapshot' is used to determine whether xmin/xmax was set by a transaction
+ * that is still in-progress, or one that started in the future from the
+ * snapshot perspective.
+ *
+ * Returns true if the insertion is visible to 'snapshot', but clear xmax if
+ * it was set by a transaction which is in-progress or in the future from the
+ * snapshot perspective. (The xmax will be set later, when we decode the
+ * corresponding UPDATE / DELETE from WAL.)
+ *
+ * Returns false if the insertion is not visible to 'snapshot'.
+ */
+static bool
+accept_tuple_for_concurrent_copy(HeapTuple tuple, Snapshot snapshot,
+                                 Buffer buffer)
+{
+    Assert(snapshot->snapshot_type == SNAPSHOT_MVCC);
+
+    /*
+     * First, check if the tuple should be rejected because it was inserted
+     * concurrently.
+     */
+    if (!HeapTupleMVCCInserted(tuple, snapshot, buffer))
+        return false;
+
+    /*
+     * If the tuple was deleted / updated but our snapshot still sees it, we
+     * need to keep it. In that case, clear the information that indicates the
+     * deletion / update. Otherwise the tuple chain would stay incomplete (as
+     * we will reject the new tuple above), and the delete / update would fail
+     * if executed later during logical decoding.
+     */
+    if (TransactionIdIsNormal(HeapTupleHeaderGetRawXmax(tuple->t_data)) &&
+        HeapTupleMVCCNotDeleted(tuple, snapshot, buffer))
+    {
+        /* TODO More work needed here?*/
+        tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
+        HeapTupleHeaderSetXmax(tuple->t_data, 0);
+    }
+
+    /*
+     * Accept the tuple even if our snapshot considers it deleted - older
+     * snapshots can still see the tuple.
+     */
+    return true;
+}
+
 
 /* ------------------------------------------------------------------------
  * Definition of the heap table access method.
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 9243feed01..d702592469 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -955,16 +955,31 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
  * did TransactionIdIsInProgress in each call --- to no avail, as long as the
  * inserting/deleting transaction was still running --- which was more cycles
  * and more contention on ProcArrayLock.
+ *
+ * The checks are split into two functions, HeapTupleMVCCInserted() and
+ * HeapTupleMVCCNotDeleted(), because they are also useful separately.
  */
 static bool
 HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
                        Buffer buffer)
 {
-    HeapTupleHeader tuple = htup->t_data;
-
     Assert(ItemPointerIsValid(&htup->t_self));
     Assert(htup->t_tableOid != InvalidOid);
 
+    return HeapTupleMVCCInserted(htup, snapshot, buffer) &&
+        HeapTupleMVCCNotDeleted(htup, snapshot, buffer);
+}
+
+/*
+ * HeapTupleMVCCInserted
+ *        True iff heap tuple was successfully inserted for the given MVCC
+ *        snapshot.
+ */
+bool
+HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot, Buffer buffer)
+{
+    HeapTupleHeader tuple = htup->t_data;
+
     if (!HeapTupleHeaderXminCommitted(tuple))
     {
         if (HeapTupleHeaderXminInvalid(tuple))
@@ -1073,6 +1088,17 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
     }
 
     /* by here, the inserting transaction has committed */
+    return true;
+}
+
+/*
+ * HeapTupleMVCCNotDeleted
+ *        True iff heap tuple was not deleted for the given MVCC snapshot.
+ */
+bool
+HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot, Buffer buffer)
+{
+    HeapTupleHeader tuple = htup->t_data;
 
     if (tuple->t_infomask & HEAP_XMAX_INVALID)    /* xid invalid or aborted */
         return true;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d119ab909d..f9b8cb4da7 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -124,6 +124,18 @@ static FullTransactionId XactTopFullTransactionId = {InvalidTransactionId};
 static int    nParallelCurrentXids = 0;
 static TransactionId *ParallelCurrentXids;
 
+/*
+ * Another case that requires TransactionIdIsCurrentTransactionId() to behave
+ * specially is when CLUSTER CONCURRENTLY is processing data changes made in
+ * the old storage of a table by other transactions. When applying the changes
+ * to the new storage, the backend executing the CLUSTER command needs to act
+ * on behalf on those other transactions. The transactions responsible for the
+ * changes in the old storage are stored in this array, sorted by
+ * xidComparator.
+ */
+static int                 nClusterCurrentXids = 0;
+static TransactionId    *ClusterCurrentXids = NULL;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
@@ -970,6 +982,8 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
         int            low,
                     high;
 
+        Assert(nClusterCurrentXids == 0);
+
         low = 0;
         high = nParallelCurrentXids - 1;
         while (low <= high)
@@ -989,6 +1003,21 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
         return false;
     }
 
+    /*
+     * When executing CLUSTER CONCURRENTLY, the array of current transactions
+     * is given.
+     */
+    if (nClusterCurrentXids > 0)
+    {
+        Assert(nParallelCurrentXids == 0);
+
+        return bsearch(&xid,
+                       ClusterCurrentXids,
+                       nClusterCurrentXids,
+                       sizeof(TransactionId),
+                       xidComparator) != NULL;
+    }
+
     /*
      * We will return true for the Xid of the current subtransaction, any of
      * its subcommitted children, any of its parents, or any of their
@@ -5621,6 +5650,29 @@ EndParallelWorkerTransaction(void)
     CurrentTransactionState->blockState = TBLOCK_DEFAULT;
 }
 
+/*
+ * SetClusterCurrentXids
+ *        Set the XID array that TransactionIdIsCurrentTransactionId() should
+ *        use.
+ */
+void
+SetClusterCurrentXids(TransactionId *xip, int xcnt)
+{
+    ClusterCurrentXids = xip;
+    nClusterCurrentXids = xcnt;
+}
+
+/*
+ * ResetClusterCurrentXids
+ *        Undo the effect of SetClusterCurrentXids().
+ */
+void
+ResetClusterCurrentXids(void)
+{
+    ClusterCurrentXids = NULL;
+    nClusterCurrentXids = 0;
+}
+
 /*
  * ShowTransactionState
  *        Debug support
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index a819b4197c..a25c84d7ae 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1415,22 +1415,7 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId,
     for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++)
         opclassOptions[i] = get_attoptions(oldIndexId, i + 1);
 
-    /* Extract statistic targets for each attribute */
-    stattargets = palloc0_array(NullableDatum, newInfo->ii_NumIndexAttrs);
-    for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++)
-    {
-        HeapTuple    tp;
-        Datum        dat;
-
-        tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(oldIndexId), Int16GetDatum(i + 1));
-        if (!HeapTupleIsValid(tp))
-            elog(ERROR, "cache lookup failed for attribute %d of relation %u",
-                 i + 1, oldIndexId);
-        dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull);
-        ReleaseSysCache(tp);
-        stattargets[i].value = dat;
-        stattargets[i].isnull = isnull;
-    }
+    stattargets = get_index_stattargets(oldIndexId, newInfo);
 
     /*
      * Now create the new index.
@@ -1469,6 +1454,32 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId,
     return newIndexId;
 }
 
+NullableDatum *
+get_index_stattargets(Oid indexid, IndexInfo *indInfo)
+{
+    NullableDatum *stattargets;
+
+    /* Extract statistic targets for each attribute */
+    stattargets = palloc0_array(NullableDatum, indInfo->ii_NumIndexAttrs);
+    for (int i = 0; i < indInfo->ii_NumIndexAttrs; i++)
+    {
+        HeapTuple    tp;
+        Datum        dat;
+        bool        isnull;
+
+        tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(indexid), Int16GetDatum(i + 1));
+        if (!HeapTupleIsValid(tp))
+            elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+                 i + 1, indexid);
+        dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull);
+        ReleaseSysCache(tp);
+        stattargets[i].value = dat;
+        stattargets[i].isnull = isnull;
+    }
+
+    return stattargets;
+}
+
 /*
  * index_concurrently_build
  *
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..fddab1cfa9 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1236,16 +1236,19 @@ CREATE VIEW pg_stat_progress_cluster AS
                       WHEN 2 THEN 'index scanning heap'
                       WHEN 3 THEN 'sorting tuples'
                       WHEN 4 THEN 'writing new heap'
-                      WHEN 5 THEN 'swapping relation files'
-                      WHEN 6 THEN 'rebuilding index'
-                      WHEN 7 THEN 'performing final cleanup'
+                      WHEN 5 THEN 'catch-up'
+                      WHEN 6 THEN 'swapping relation files'
+                      WHEN 7 THEN 'rebuilding index'
+                      WHEN 8 THEN 'performing final cleanup'
                       END AS phase,
         CAST(S.param3 AS oid) AS cluster_index_relid,
         S.param4 AS heap_tuples_scanned,
-        S.param5 AS heap_tuples_written,
-        S.param6 AS heap_blks_total,
-        S.param7 AS heap_blks_scanned,
-        S.param8 AS index_rebuild_count
+        S.param5 AS heap_tuples_inserted,
+        S.param6 AS heap_tuples_updated,
+        S.param7 AS heap_tuples_deleted,
+        S.param8 AS heap_blks_total,
+        S.param9 AS heap_blks_scanned,
+        S.param10 AS index_rebuild_count
     FROM pg_stat_get_progress_info('CLUSTER') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 194d143cf4..6397f7f8c4 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -25,6 +25,10 @@
 #include "access/toast_internals.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+#include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
 #include "catalog/heap.h"
@@ -32,6 +36,7 @@
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_am.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/toasting.h"
@@ -40,10 +45,15 @@
 #include "commands/progress.h"
 #include "commands/tablecmds.h"
 #include "commands/vacuum.h"
+#include "executor/executor.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "pgstat.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
+#include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "utils/acl.h"
@@ -57,6 +67,8 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
+typedef struct RewriteStateData *RewriteState;
+
 /*
  * This struct is used to pass around the information on tables to be
  * clustered. We need this so we can make a list of them when invoked without
@@ -68,17 +80,175 @@ typedef struct
     Oid            indexOid;
 } RelToCluster;
 
+/*
+ * The following definitions are used for concurrent processing.
+ */
+
+/*
+ * OID of the table being processed by CLUSTER CONCURRENTLY by this backend.
+ */
+static Oid    clustered_rel    = InvalidOid;
+/* The same for its TOAST relation. */
+static Oid    clustered_rel_toast    = InvalidOid;
+
+/* XXX Do we also need to mention VACUUM FULL CONCURRENTLY? */
+#define CLUSTER_IN_PROGRESS_MESSAGE \
+    "relation \"%s\" is already being processed by CLUSTER CONCURRENTLY"
+
+/*
+ * Everything we need to call ExecInsertIndexTuples().
+ */
+typedef struct IndexInsertState
+{
+    ResultRelInfo *rri;
+    EState       *estate;
+    ExprContext *econtext;
+
+    Relation    ident_index;
+} IndexInsertState;
 
-static void cluster_multiple_rels(List *rtcs, ClusterParams *params);
-static void rebuild_relation(Relation OldHeap, Relation index, bool verbose);
+/*
+ * Catalog information to check if another backend changed the relation in
+ * such a way that makes CLUSTER CONCURRENTLY unable to continue. Such changes
+ * are possible because cluster_rel() has to release its lock on the relation
+ * in order to acquire AccessExclusiveLock that it needs to swap the relation
+ * files.
+ *
+ * The most obvious problem is that the tuple descriptor has changed, since
+ * then the tuples we try to insert into the new storage are not guaranteed to
+ * fit into the storage.
+ *
+ * Another problem is that multiple backends might call cluster_rel(). This is
+ * not necessarily a correctness issue, but it definitely means wasted CPU
+ * time.
+ *
+ * Where possible, commands which might change the relation in an incompatible
+ * way should check if CLUSTER CONCURRENTLY is running, before they start to
+ * do the actual changes (see is_concurrent_cluster_in_progress()). Anything
+ * else must be caught by check_catalog_changes(), which uses this structure.
+ */
+typedef struct CatalogState
+{
+    /* Tuple descriptor of the relation. */
+    TupleDesc    tupdesc;
+
+    /* The number of indexes tracked. */
+    int        ninds;
+    /* The index OIDs. */
+    Oid        *ind_oids;
+    /* The index tuple descriptors. */
+    TupleDesc    *ind_tupdescs;
+
+    /* The following are copies of the corresponding fields of pg_class. */
+    Oid        reltoastrelid;
+    char    relpersistence;
+    char    replident;
+
+    /* rd_replidindex */
+    Oid        replidindex;
+} CatalogState;
+
+/* The WAL segment being decoded. */
+static XLogSegNo    cluster_current_segment = 0;
+
+static void cluster_multiple_rels(List *rtcs, ClusterParams *params,
+                                  LOCKMODE lock_mode, bool isTopLevel);
+static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
+                             bool concurrent);
 static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
+                            Snapshot snapshot, LogicalDecodingContext *decoding_ctx,
                             bool verbose, bool *pSwapToastByContent,
                             TransactionId *pFreezeXid, MultiXactId *pCutoffMulti);
 static List *get_tables_to_cluster(MemoryContext cluster_context);
 static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
                                                Oid indexOid);
 static bool cluster_is_permitted_for_relation(Oid relid, Oid userid);
+static void check_concurrent_cluster_requirements(Relation rel,
+                                                  bool isTopLevel,
+                                                  bool isCluster);
+static void begin_concurrent_cluster(Relation *rel_p, Relation *index_p,
+                                     bool *entered_p);
+static void end_concurrent_cluster(Oid relid, bool error);
+static void cluster_before_shmem_exit_callback(int code, Datum arg);
+static CatalogState *get_catalog_state(Relation rel);
+static void free_catalog_state(CatalogState *state);
+static void check_catalog_changes(Relation rel, CatalogState *cat_state);
+static LogicalDecodingContext *setup_logical_decoding(Oid relid,
+                                                      const char *slotname,
+                                                      TupleDesc tupdesc);
+static HeapTuple get_changed_tuple(ConcurrentChange *change);
+static void apply_concurrent_changes(ClusterDecodingState *dstate,
+                                     Relation rel, ScanKey key, int nkeys,
+                                     IndexInsertState *iistate);
+static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
+                                    HeapTuple tup, IndexInsertState *iistate,
+                                    TupleTableSlot *index_slot);
+static void apply_concurrent_update(Relation rel, HeapTuple tup,
+                                    HeapTuple tup_target,
+                                    ConcurrentChange *change,
+                                    IndexInsertState *iistate,
+                                    TupleTableSlot *index_slot);
+static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
+                                    ConcurrentChange *change);
+static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
+                                   HeapTuple tup_key,
+                                   Snapshot snapshot,
+                                   IndexInsertState *iistate,
+                                   TupleTableSlot *ident_slot,
+                                   IndexScanDesc *scan_p);
+static void process_concurrent_changes(LogicalDecodingContext *ctx,
+                                       XLogRecPtr end_of_wal,
+                                       Relation rel_dst,
+                                       Relation rel_src,
+                                       ScanKey ident_key,
+                                       int ident_key_nentries,
+                                       IndexInsertState *iistate);
+static IndexInsertState *get_index_insert_state(Relation relation,
+                                                Oid ident_index_id);
+static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src,
+                                  int *nentries);
+static void free_index_insert_state(IndexInsertState *iistate);
+static void cleanup_logical_decoding(LogicalDecodingContext *ctx);
+static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
+                                               Relation cl_index,
+                                               CatalogState    *cat_state,
+                                               LogicalDecodingContext *ctx,
+                                               bool swap_toast_by_content,
+                                               TransactionId frozenXid,
+                                               MultiXactId cutoffMulti);
+static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
+
+/*
+ * Use this API when relation needs to be unlocked, closed and re-opened. If
+ * the relation got dropped while being unlocked, raise ERROR that mentions
+ * the relation name rather than OID.
+ */
+typedef struct RelReopenInfo
+{
+    /*
+     * The relation to be closed. Pointer to the value is stored here so that
+     * the user gets his reference updated automatically on re-opening.
+     *
+     * When calling unlock_and_close_relations(), 'relid' can be passed
+     * instead of 'rel_p' when the caller only needs to gather information for
+     * subsequent opening.
+     */
+    Relation    *rel_p;
+    Oid        relid;
+
+    char        relkind;
+    LOCKMODE    lockmode_orig;    /* The existing lock mode */
+    LOCKMODE    lockmode_new;    /* The lock mode after the relation is
+                                 * re-opened */
 
+    char    *relname;            /* Relation name, initialized automatically. */
+} RelReopenInfo;
+
+static void init_rel_reopen_info(RelReopenInfo *rri, Relation *rel_p,
+                                 Oid relid, LOCKMODE lockmode_orig,
+                                 LOCKMODE lockmode_new);
+static void unlock_and_close_relations(RelReopenInfo *rels, int nrel);
+static void reopen_relations(RelReopenInfo *rels, int nrel);
 
 /*---------------------------------------------------------------------------
  * This cluster code allows for clustering multiple tables at once. Because
@@ -110,10 +280,12 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
     ListCell   *lc;
     ClusterParams params = {0};
     bool        verbose = false;
+    bool        concurrent = false;
     Relation    rel = NULL;
     Oid            indexOid = InvalidOid;
     MemoryContext cluster_context;
     List       *rtcs;
+    LOCKMODE lock_mode;
 
     /* Parse option list */
     foreach(lc, stmt->params)
@@ -122,6 +294,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
 
         if (strcmp(opt->defname, "verbose") == 0)
             verbose = defGetBoolean(opt);
+        else if (strcmp(opt->defname, "concurrently") == 0)
+            concurrent = defGetBoolean(opt);
         else
             ereport(ERROR,
                     (errcode(ERRCODE_SYNTAX_ERROR),
@@ -130,20 +304,30 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
                      parser_errposition(pstate, opt->location)));
     }
 
-    params.options = (verbose ? CLUOPT_VERBOSE : 0);
+    params.options =
+        (verbose ? CLUOPT_VERBOSE : 0) |
+        (concurrent ? CLUOPT_CONCURRENT : 0);
+
+    /*
+     * Determine the lock mode expected by cluster_rel().
+     *
+     * In the exclusive case, we obtain AccessExclusiveLock right away to
+     * avoid lock-upgrade hazard in the single-transaction case. In the
+     * CONCURRENT case, the AccessExclusiveLock will only be used at the end
+     * of processing, supposedly for very short time. Until then, we'll have
+     * to unlock the relation temporarily, so there's no lock-upgrade hazard.
+     */
+    lock_mode = (params.options & CLUOPT_CONCURRENT) == 0 ?
+        AccessExclusiveLock : LOCK_CLUSTER_CONCURRENT;
 
     if (stmt->relation != NULL)
     {
         /* This is the single-relation case. */
         Oid            tableOid;
 
-        /*
-         * Find, lock, and check permissions on the table.  We obtain
-         * AccessExclusiveLock right away to avoid lock-upgrade hazard in the
-         * single-transaction case.
-         */
+        /* Find, lock, and check permissions on the table. */
         tableOid = RangeVarGetRelidExtended(stmt->relation,
-                                            AccessExclusiveLock,
+                                            lock_mode,
                                             0,
                                             RangeVarCallbackMaintainsTable,
                                             NULL);
@@ -198,7 +382,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
              * Do the job. (The function will close the relation, lock is kept
              * till commit.)
              */
-            cluster_rel(rel, indexOid, ¶ms);
+            cluster_rel(rel, indexOid, ¶ms, isTopLevel);
 
             return;
         }
@@ -237,7 +421,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
         rtcs = get_tables_to_cluster_partitioned(cluster_context, indexOid);
 
         /* close relation, releasing lock on parent table */
-        table_close(rel, AccessExclusiveLock);
+        table_close(rel, lock_mode);
     }
     else
     {
@@ -246,7 +430,7 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
     }
 
     /* Do the job. */
-    cluster_multiple_rels(rtcs, ¶ms);
+    cluster_multiple_rels(rtcs, ¶ms, lock_mode, isTopLevel);
 
     /* Start a new transaction for the cleanup work. */
     StartTransactionCommand();
@@ -263,7 +447,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
  * return.
  */
 static void
-cluster_multiple_rels(List *rtcs, ClusterParams *params)
+cluster_multiple_rels(List *rtcs, ClusterParams *params, LOCKMODE lock_mode,
+                      bool isTopLevel)
 {
     ListCell   *lc;
 
@@ -283,13 +468,19 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params)
         /* functions in indexes may want a snapshot set */
         PushActiveSnapshot(GetTransactionSnapshot());
 
-        rel = table_open(rtc->tableOid, AccessExclusiveLock);
+        rel = table_open(rtc->tableOid, lock_mode);
 
-        /*
-         * Do the job. (The function will close the relation, lock is kept
-         * till commit.)
-         */
-        cluster_rel(rel, rtc->indexOid, params);
+        /* Not all relations cannot be processed in the concurrent mode. */
+        if ((params->options & CLUOPT_CONCURRENT) == 0 ||
+            check_relation_is_clusterable_concurrently(rel, DEBUG1,
+                                                       "CLUSTER (CONCURRENTLY)"))
+        {
+            /* Do the job. (The function will close the relation, lock is kept
+             * till commit.) */
+            cluster_rel(rel, rtc->indexOid, params, isTopLevel);
+        }
+        else
+            table_close(rel, lock_mode);
 
         PopActiveSnapshot();
         CommitTransactionCommand();
@@ -313,10 +504,21 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params)
  * instead of index order.  This is the new implementation of VACUUM FULL,
  * and error messages should refer to the operation as VACUUM not CLUSTER.
  *
- * We expect that OldHeap is already locked in AccessExclusiveLock mode.
+ * We expect that OldHeap is already locked. The lock mode is
+ * AccessExclusiveLock for normal processing and LOCK_CLUSTER_CONCURRENT for
+ * concurrent processing (so that SELECT, INSERT, UPDATE and DELETE commands
+ * work, but cluster_rel() cannot be called concurrently for the same
+ * relation).
+ *
+ * Note that, in the concurrent case, the function releases the lock at some
+ * point, in order to get AccessExclusiveLock for the final steps (i.e. to
+ * swap the relation files). To make things simpler, the caller should expect
+ * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The
+ * AccessExclusiveLock is kept till the end of the transaction.)
  */
 void
-cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
+cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params,
+            bool isTopLevel)
 {
     Oid            tableOid = RelationGetRelid(OldHeap);
     Oid            save_userid;
@@ -325,6 +527,41 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
     bool        verbose = ((params->options & CLUOPT_VERBOSE) != 0);
     bool        recheck = ((params->options & CLUOPT_RECHECK) != 0);
     Relation    index = NULL;
+    bool        concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
+    LOCKMODE    lmode;
+    bool        entered, success;
+
+    /* Check that the correct lock is held. */
+    lmode = !concurrent ? AccessExclusiveLock : LOCK_CLUSTER_CONCURRENT;
+
+    /*
+     * Skip the relation if it's being processed concurrently. In such a case,
+     * we cannot rely on a lock because the other backend needs to release it
+     * temporarily at some point.
+     *
+     * This check should not take place until we have a lock that prevents
+     * another backend from starting VACUUM FULL / CLUSTER CONCURRENTLY after
+     * our check.
+     */
+    Assert(CheckRelationLockedByMe(OldHeap, lmode, false));
+    if (is_concurrent_cluster_in_progress(tableOid))
+    {
+        ereport(NOTICE,
+                (errmsg(CLUSTER_IN_PROGRESS_MESSAGE,
+                        RelationGetRelationName(OldHeap))));
+        table_close(OldHeap, lmode);
+        return;
+    }
+
+    /* There are specific requirements on concurrent processing. */
+    if (concurrent)
+    {
+        check_concurrent_cluster_requirements(OldHeap, isTopLevel,
+                                              OidIsValid(indexOid));
+
+        check_relation_is_clusterable_concurrently(OldHeap, ERROR,
+                                                   "CLUSTER (CONCURRENTLY)");
+    }
 
     /* Check for user-requested abort. */
     CHECK_FOR_INTERRUPTS();
@@ -361,7 +598,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
         /* Check that the user still has privileges for the relation */
         if (!cluster_is_permitted_for_relation(tableOid, save_userid))
         {
-            relation_close(OldHeap, AccessExclusiveLock);
+            relation_close(OldHeap, lmode);
             goto out;
         }
 
@@ -376,7 +613,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
          */
         if (RELATION_IS_OTHER_TEMP(OldHeap))
         {
-            relation_close(OldHeap, AccessExclusiveLock);
+            relation_close(OldHeap, lmode);
             goto out;
         }
 
@@ -387,7 +624,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
              */
             if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
             {
-                relation_close(OldHeap, AccessExclusiveLock);
+                relation_close(OldHeap, lmode);
                 goto out;
             }
 
@@ -398,7 +635,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
             if ((params->options & CLUOPT_RECHECK_ISCLUSTERED) != 0 &&
                 !get_index_isclustered(indexOid))
             {
-                relation_close(OldHeap, AccessExclusiveLock);
+                relation_close(OldHeap, lmode);
                 goto out;
             }
         }
@@ -414,6 +651,11 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
         ereport(ERROR,
                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                  errmsg("cannot cluster a shared catalog")));
+    /*
+     * The CONCURRENT case should have been rejected earlier because it does
+     * not support system catalogs.
+     */
+    Assert(!(OldHeap->rd_rel->relisshared && concurrent));
 
     /*
      * Don't process temp tables of other backends ... their local buffer
@@ -440,7 +682,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
     /* Check heap and index are valid to cluster on */
     if (OidIsValid(indexOid))
     {
-        check_index_is_clusterable(OldHeap, indexOid, AccessExclusiveLock);
+        check_index_is_clusterable(OldHeap, indexOid, lmode);
         /* Open the index (It should already be locked.) */
         index = index_open(indexOid, NoLock);
     }
@@ -455,7 +697,8 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
     if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW &&
         !RelationIsPopulated(OldHeap))
     {
-        relation_close(OldHeap, AccessExclusiveLock);
+        index_close(index, lmode);
+        relation_close(OldHeap, lmode);
         goto out;
     }
 
@@ -468,11 +711,42 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params)
      * invalid, because we move tuples around.  Promote them to relation
      * locks.  Predicate locks on indexes will be promoted when they are
      * reindexed.
+     *
+     * During concurrent processing, the heap as well as its indexes stay in
+     * operation, so we postpone this step until they are locked using
+     * AccessExclusiveLock near the end of the processing.
      */
-    TransferPredicateLocksToHeapRelation(OldHeap);
+    if (!concurrent)
+        TransferPredicateLocksToHeapRelation(OldHeap);
 
     /* rebuild_relation does all the dirty work */
-    rebuild_relation(OldHeap, index, verbose);
+    entered = false;
+    success = false;
+    PG_TRY();
+    {
+        /*
+         * For concurrent processing, make sure other transactions treat this
+         * table as if it was a system / user catalog, and WAL the relevant
+         * additional information. ERROR is raised if another backend is
+         * processing the same table.
+         */
+        if (concurrent)
+        {
+            Relation    *index_p = index ? &index : NULL;
+
+            begin_concurrent_cluster(&OldHeap, index_p, &entered);
+        }
+
+        rebuild_relation(OldHeap, index, verbose,
+                         (params->options & CLUOPT_CONCURRENT) != 0);
+        success = true;
+    }
+    PG_FINALLY();
+    {
+        if (concurrent && entered)
+            end_concurrent_cluster(tableOid, !success);
+    }
+    PG_END_TRY();
 
     /*
      * NB: rebuild_relation does table_close() on OldHeap, and also on index,
@@ -622,18 +896,99 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal)
     table_close(pg_index, RowExclusiveLock);
 }
 
+/*
+ * Check if the CONCURRENTLY option is legal for the relation.
+ */
+bool
+check_relation_is_clusterable_concurrently(Relation rel, int elevel,
+                                           const char *stmt)
+{
+    char    relpersistence, replident;
+    Oid        ident_idx;
+
+    /* Data changes in system relations are not logically decoded. */
+    if (IsCatalogRelation(rel))
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("cannot process relation \"%s\"",
+                        RelationGetRelationName(rel)),
+                 errhint("%s is not supported for catalog relations", stmt)));
+        return false;
+    }
+
+    if (IsToastRelation(rel))
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("cannot process relation \"%s\"",
+                        RelationGetRelationName(rel)),
+                 errhint("%s is not supported for TOAST relations, unless the main relation is processed too",
+                         stmt)));
+        return false;
+    }
+
+    relpersistence = rel->rd_rel->relpersistence;
+    if (relpersistence != RELPERSISTENCE_PERMANENT)
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot process relation \"%s\"",
+                        RelationGetRelationName(rel)),
+                 errhint("CLUSTER CONCURRENTLY is only allowed for permanent relations")));
+        return false;
+    }
+
+    /* With NOTHING, WAL does not contain the old tuple. */
+    replident = rel->rd_rel->relreplident;
+    if (replident == REPLICA_IDENTITY_NOTHING)
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot process relation \"%s\"",
+                        RelationGetRelationName(rel)),
+                 errhint("relation \"%s\" has insufficient replication identity",
+                         RelationGetRelationName(rel))));
+        return false;
+    }
+
+    /*
+     * Identity index is not set if the replica identity is FULL, but PK might
+     * exist in such a case.
+     */
+    ident_idx = RelationGetReplicaIndex(rel);
+    if (!OidIsValid(ident_idx) && OidIsValid(rel->rd_pkindex))
+        ident_idx = rel->rd_pkindex;
+    if (!OidIsValid(ident_idx))
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot process relation \"%s\"",
+                        RelationGetRelationName(rel)),
+                 (errhint("relation \"%s\" has no identity index",
+                          RelationGetRelationName(rel)))));
+        return false;
+    }
+
+    return true;
+}
+
 /*
  * rebuild_relation: rebuild an existing relation in index or physical order
  *
- * OldHeap: table to rebuild --- must be opened and exclusive-locked!
+ * OldHeap: table to rebuild --- must be opened and locked. See cluster_rel()
+ * for comments on the required lock strength.
+ *
  * index: index to cluster by, or NULL to rewrite in physical order. Must be
  * opened and locked.
  *
  * On exit, the heap (and also the index, if one was passed) are closed, but
- * still locked with AccessExclusiveLock.
+ * still locked with AccessExclusiveLock. (The function handles the lock
+ * upgrade if 'concurrent' is true.)
  */
 static void
-rebuild_relation(Relation OldHeap, Relation index, bool verbose)
+rebuild_relation(Relation OldHeap, Relation index, bool verbose,
+                 bool concurrent)
 {
     Oid            tableOid = RelationGetRelid(OldHeap);
     Oid            indexOid = index ? RelationGetRelid(index) : InvalidOid;
@@ -642,19 +997,83 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose)
     Oid            OIDNewHeap;
     Relation    NewHeap;
     char        relpersistence;
-    bool        is_system_catalog;
     bool        swap_toast_by_content;
     TransactionId frozenXid;
     MultiXactId cutoffMulti;
+    NameData    slotname;
+    LogicalDecodingContext *ctx = NULL;
+    Snapshot    snapshot = NULL;
+    CatalogState    *cat_state = NULL;
     LOCKMODE    lmode_new;
 
+    if (concurrent)
+    {
+        TupleDesc    tupdesc;
+        RelReopenInfo    rri[2];
+        int        nrel;
+
+        /*
+         * CLUSTER CONCURRENTLY is not allowed in a transaction block, so this
+         * should never fire.
+         */
+        Assert(GetTopTransactionIdIfAny() == InvalidTransactionId);
+
+        /*
+         * A single backend should not execute multiple CLUSTER commands at a
+         * time, so use PID to make the slot unique.
+         */
+        snprintf(NameStr(slotname), NAMEDATALEN, "cluster_%d", MyProcPid);
+
+        /*
+         * Gather catalog information so that we can check later if the old
+         * relation has not changed while unlocked.
+         *
+         * Since this function also checks if the relation can be processed,
+         * it's important to call it before we setup the logical decoding,
+         * because that can take some time. Not sure if it's necessary to do
+         * it even earlier.
+         */
+        cat_state = get_catalog_state(OldHeap);
+
+        tupdesc = CreateTupleDescCopy(RelationGetDescr(OldHeap));
+
+        /*
+         * Unlock the relation (and possibly the clustering index) to avoid
+         * deadlock because setup_logical_decoding() will wait for all the
+         * running transactions (with XID assigned) to finish. Some of those
+         * transactions might be waiting for a lock on our relation.
+         */
+        nrel = 0;
+        init_rel_reopen_info(&rri[nrel++], &OldHeap, InvalidOid,
+                             LOCK_CLUSTER_CONCURRENT,
+                             LOCK_CLUSTER_CONCURRENT);
+        if (index)
+            init_rel_reopen_info(&rri[nrel++], &index, InvalidOid,
+                                 LOCK_CLUSTER_CONCURRENT,
+                                 LOCK_CLUSTER_CONCURRENT);
+        unlock_and_close_relations(rri, nrel);
+
+        /* Prepare to capture the concurrent data changes. */
+        ctx = setup_logical_decoding(tableOid, NameStr(slotname), tupdesc);
+
+        /* Lock the table (and index) again. */
+        reopen_relations(rri, nrel);
+
+        /*
+         * Check if a 'tupdesc' could have changed while the relation was
+         * unlocked.
+         */
+        check_catalog_changes(OldHeap, cat_state);
+
+        snapshot = SnapBuildInitialSnapshotForCluster(ctx->snapshot_builder);
+    }
+
     if (OidIsValid(indexOid))
         /* Mark the correct index as clustered */
         mark_index_clustered(OldHeap, indexOid, true);
 
     /* Remember info about rel before closing OldHeap */
     relpersistence = OldHeap->rd_rel->relpersistence;
-    is_system_catalog = IsSystemRelation(OldHeap);
 
     /*
      * Create the transient table that will receive the re-ordered data.
@@ -673,31 +1092,52 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose)
                          AccessExclusiveLock : NoLock);
 
     /* Copy the heap data into the new table in the desired order */
-    copy_table_data(NewHeap, OldHeap, index, verbose,
+    copy_table_data(NewHeap, OldHeap, index, snapshot, ctx, verbose,
                     &swap_toast_by_content, &frozenXid, &cutoffMulti);
 
+    if (concurrent)
+    {
+        rebuild_relation_finish_concurrent(NewHeap, OldHeap, index,
+                                           cat_state, ctx,
+                                           swap_toast_by_content,
+                                           frozenXid, cutoffMulti);
+
+        pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
+                                     PROGRESS_CLUSTER_PHASE_FINAL_CLEANUP);
+
+        /* Done with decoding. */
+        FreeSnapshot(snapshot);
+        free_catalog_state(cat_state);
+        cleanup_logical_decoding(ctx);
+        ReplicationSlotRelease();
+        ReplicationSlotDrop(NameStr(slotname), false);
+    }
+    else
+    {
+        bool        is_system_catalog = IsSystemRelation(OldHeap);
 
-    /* Close relcache entries, but keep lock until transaction commit */
-    table_close(OldHeap, NoLock);
-    if (index)
-        index_close(index, NoLock);
+        /* Close relcache entries, but keep lock until transaction commit */
+        table_close(OldHeap, NoLock);
+        if (index)
+            index_close(index, NoLock);
 
-    /*
-     * Close the new relation so it can be dropped as soon as the storage is
-     * swapped. The relation is not visible to others, so we could unlock it
-     * completely, but it's simpler to pass NoLock than to track all the locks
-     * acquired so far.
-     */
-    table_close(NewHeap, NoLock);
+        /*
+         * Close the new relation so it can be dropped as soon as the storage
+         * is swapped. The relation is not visible to others, so we could
+         * unlock it completely, but it's simpler to pass NoLock than to track
+         * all the lock acquired so far.
+         */
+        table_close(NewHeap, NoLock);
 
-    /*
-     * Swap the physical files of the target and transient tables, then
-     * rebuild the target's indexes and throw away the transient table.
-     */
-    finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
-                     swap_toast_by_content, false, true,
-                     frozenXid, cutoffMulti,
-                     relpersistence);
+        /*
+         * Swap the physical files of the target and transient tables, then
+         * rebuild the target's indexes and throw away the transient table.
+         */
+        finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
+                         swap_toast_by_content, false, true, true,
+                         frozenXid, cutoffMulti,
+                         relpersistence);
+    }
 }
 
 
@@ -848,15 +1288,19 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
 /*
  * Do the physical copying of table data.
  *
+ * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass
+ * iff concurrent processing is required.
+ *
  * There are three output parameters:
  * *pSwapToastByContent is set true if toast tables must be swapped by content.
  * *pFreezeXid receives the TransactionId used as freeze cutoff point.
  * *pCutoffMulti receives the MultiXactId used as a cutoff point.
  */
 static void
-copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose,
-                bool *pSwapToastByContent, TransactionId *pFreezeXid,
-                MultiXactId *pCutoffMulti)
+copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
+                Snapshot snapshot, LogicalDecodingContext *decoding_ctx,
+                bool verbose, bool *pSwapToastByContent,
+                TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
 {
     Oid        OIDOldHeap = RelationGetRelid(OldHeap);
     Oid        OIDOldIndex = OldIndex ? RelationGetRelid(OldIndex) : InvalidOid;
@@ -876,6 +1320,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb
     int            elevel = verbose ? INFO : DEBUG2;
     PGRUsage    ru0;
     char       *nspname;
+    bool        concurrent = snapshot != NULL;
 
     pg_rusage_init(&ru0);
 
@@ -902,8 +1347,12 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb
      *
      * We don't need to open the toast relation here, just lock it.  The lock
      * will be held till end of transaction.
+     *
+     * In the CONCURRENT case, the lock does not help because we need to
+     * release it temporarily at some point. Instead, we expect VACUUM /
+     * CLUSTER to skip tables which are present in ClusteredRelsHash.
      */
-    if (OldHeap->rd_rel->reltoastrelid)
+    if (OldHeap->rd_rel->reltoastrelid && !concurrent)
         LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
 
     /*
@@ -979,7 +1428,45 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb
      * provided, else plain seqscan.
      */
     if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
+    {
+        ResourceOwner    oldowner = CurrentResourceOwner;
+
+        /*
+         * In the CONCURRENT case, do the planning in a subtrensaction so that
+         * we don't leave any additional locks behind us that we cannot
+         * release easily.
+         */
+        if (concurrent)
+        {
+            Assert(CheckRelationLockedByMe(OldHeap, LOCK_CLUSTER_CONCURRENT,
+                                           false));
+            Assert(CheckRelationLockedByMe(OldIndex, LOCK_CLUSTER_CONCURRENT,
+                                           false));
+            BeginInternalSubTransaction("plan_cluster_use_sort");
+        }
+
         use_sort = plan_cluster_use_sort(OIDOldHeap, OIDOldIndex);
+
+        if (concurrent)
+        {
+            PgBackendProgress    progress;
+
+            /*
+             * Command progress reporting gets terminated at subtransaction
+             * end. Save the status so it can be eventually restored.
+             */
+            memcpy(&progress, &MyBEEntry->st_progress,
+                   sizeof(PgBackendProgress));
+
+            /* Release the locks by aborting the subtransaction. */
+            RollbackAndReleaseCurrentSubTransaction();
+
+            /* Restore the progress reporting status. */
+            pgstat_progress_restore_state(&progress);
+
+            CurrentResourceOwner = oldowner;
+        }
+    }
     else
         use_sort = false;
 
@@ -1008,7 +1495,9 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb
      * values (e.g. because the AM doesn't use freezing).
      */
     table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
-                                    cutoffs.OldestXmin, &cutoffs.FreezeLimit,
+                                    cutoffs.OldestXmin, snapshot,
+                                    decoding_ctx,
+                                    &cutoffs.FreezeLimit,
                                     &cutoffs.MultiXactCutoff,
                                     &num_tuples, &tups_vacuumed,
                                     &tups_recently_dead);
@@ -1017,7 +1506,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb
     *pFreezeXid = cutoffs.FreezeLimit;
     *pCutoffMulti = cutoffs.MultiXactCutoff;
 
-    /* Reset rd_toastoid just to be tidy --- it shouldn't be looked at again */
+    /*
+     * Reset rd_toastoid just to be tidy --- it shouldn't be looked at
+     * again. In the CONCURRENT case, we need to set it again before applying
+     * the concurrent changes.
+     */
     NewHeap->rd_toastoid = InvalidOid;
 
     num_pages = RelationGetNumberOfBlocks(NewHeap);
@@ -1468,14 +1961,13 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
                  bool swap_toast_by_content,
                  bool check_constraints,
                  bool is_internal,
+                 bool reindex,
                  TransactionId frozenXid,
                  MultiXactId cutoffMulti,
                  char newrelpersistence)
 {
     ObjectAddress object;
     Oid            mapped_tables[4];
-    int            reindex_flags;
-    ReindexParams reindex_params = {0};
     int            i;
 
     /* Report that we are now swapping relation files */
@@ -1501,39 +1993,46 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
     if (is_system_catalog)
         CacheInvalidateCatalog(OIDOldHeap);
 
-    /*
-     * Rebuild each index on the relation (but not the toast table, which is
-     * all-new at this point).  It is important to do this before the DROP
-     * step because if we are processing a system catalog that will be used
-     * during DROP, we want to have its indexes available.  There is no
-     * advantage to the other order anyway because this is all transactional,
-     * so no chance to reclaim disk space before commit.  We do not need a
-     * final CommandCounterIncrement() because reindex_relation does it.
-     *
-     * Note: because index_build is called via reindex_relation, it will never
-     * set indcheckxmin true for the indexes.  This is OK even though in some
-     * sense we are building new indexes rather than rebuilding existing ones,
-     * because the new heap won't contain any HOT chains at all, let alone
-     * broken ones, so it can't be necessary to set indcheckxmin.
-     */
-    reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
-    if (check_constraints)
-        reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
+    if (reindex)
+    {
+        int            reindex_flags;
+        ReindexParams reindex_params = {0};
 
-    /*
-     * Ensure that the indexes have the same persistence as the parent
-     * relation.
-     */
-    if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
-        reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
-    else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
-        reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+        /*
+         * Rebuild each index on the relation (but not the toast table, which
+         * is all-new at this point).  It is important to do this before the
+         * DROP step because if we are processing a system catalog that will
+         * be used during DROP, we want to have its indexes available.  There
+         * is no advantage to the other order anyway because this is all
+         * transactional, so no chance to reclaim disk space before commit.
+         * We do not need a final CommandCounterIncrement() because
+         * reindex_relation does it.
+         *
+         * Note: because index_build is called via reindex_relation, it will never
+         * set indcheckxmin true for the indexes.  This is OK even though in some
+         * sense we are building new indexes rather than rebuilding existing ones,
+         * because the new heap won't contain any HOT chains at all, let alone
+         * broken ones, so it can't be necessary to set indcheckxmin.
+         */
+        reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
+        if (check_constraints)
+            reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
 
-    /* Report that we are now reindexing relations */
-    pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
-                                 PROGRESS_CLUSTER_PHASE_REBUILD_INDEX);
+        /*
+         * Ensure that the indexes have the same persistence as the parent
+         * relation.
+         */
+        if (newrelpersistence == RELPERSISTENCE_UNLOGGED)
+            reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+        else if (newrelpersistence == RELPERSISTENCE_PERMANENT)
+            reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
 
-    reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
+        /* Report that we are now reindexing relations */
+        pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
+                                     PROGRESS_CLUSTER_PHASE_REBUILD_INDEX);
+
+        reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
+    }
 
     /* Report that we are now doing clean up */
     pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
@@ -1773,3 +2272,1938 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid)
                     get_rel_name(relid))));
     return false;
 }
+
+#define REPL_PLUGIN_NAME    "pgoutput_cluster"
+
+/*
+ * Each relation being processed by CLUSTER CONCURRENTLY must be in the
+ * clusteredRels hashtable.
+ */
+typedef struct ClusteredRel
+{
+    Oid        relid;
+    Oid        dbid;
+} ClusteredRel;
+
+static HTAB *ClusteredRelsHash = NULL;
+
+/* Maximum number of entries in the hashtable. */
+static int maxClusteredRels = 0;
+
+Size
+ClusterShmemSize(void)
+{
+    /*
+     * A replication slot is needed for the processing, so use this GUC to
+     * allocate memory for the hashtable. Reserve also space for TOAST
+     * relations.
+     */
+    maxClusteredRels = max_replication_slots * 2;
+
+    return hash_estimate_size(maxClusteredRels, sizeof(ClusteredRel));
+}
+
+void
+ClusterShmemInit(void)
+{
+    HASHCTL        info;
+
+    info.keysize = sizeof(ClusteredRel);
+    info.entrysize = info.keysize;
+
+    ClusteredRelsHash = ShmemInitHash("Clustered Relations",
+                                      maxClusteredRels,
+                                      maxClusteredRels,
+                                      &info,
+                                      HASH_ELEM | HASH_BLOBS);
+}
+
+/*
+ * Perform a preliminary check whether CLUSTER / VACUUM FULL CONCURRENTLY is
+ * possible. Note that here we only check things that should not change if we
+ * release the relation lock temporarily. The information that can change due
+ * to unlocking is checked in get_catalog_state().
+ */
+static void
+check_concurrent_cluster_requirements(Relation rel, bool isTopLevel,
+                                      bool isCluster)
+{
+    const char    *stmt;
+
+    if (isCluster)
+        stmt = "CLUSTER (CONCURRENTLY)";
+    else
+        stmt = "VACUUM (FULL, CONCURRENTLY)";
+
+    /*
+     * Make sure we have no XID assigned, otherwise call of
+     * setup_logical_decoding() can cause a deadlock.
+     */
+    PreventInTransactionBlock(isTopLevel, stmt);
+
+    CheckSlotPermissions();
+
+    /*
+     * Use an existing function to check if we can use logical
+     * decoding. However note that RecoveryInProgress() should already have
+     * caused error, as it does for the non-concurrent VACUUM FULL / CLUSTER.
+     */
+    CheckLogicalDecodingRequirements();
+
+    /* See ClusterShmemSize() */
+    if (max_replication_slots < 2)
+        ereport(ERROR,
+                errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                (errmsg("%s requires \"max_replication_slots\" to be at least 2",
+                        stmt)));
+}
+
+/*
+ * Call this function before CLUSTER CONCURRENTLY starts to setup logical
+ * decoding. It makes sure that other users of the table put enough
+ * information into WAL.
+ *
+ * The point is that on various places we expect that the table we're
+ * processing is treated like a system catalog. For example, we need to be
+ * able to scan it using a "historic snapshot" anytime during the processing
+ * (as opposed to scanning only at the start point of the decoding, logical
+ * replication does during initial table synchronization), in order to apply
+ * concurrent UPDATE / DELETE commands.
+ *
+ * Since we need to close and reopen the relation here, the 'rel_p' and
+ * 'index_p' arguments are in/out.
+ *
+ * 'enter_p' receives a bool value telling whether relation OID was entered
+ * into the hashtable or not.
+ */
+static void
+begin_concurrent_cluster(Relation *rel_p, Relation *index_p,
+                         bool *entered_p)
+{
+    Relation    rel = *rel_p;
+    Oid        relid, toastrelid;
+    ClusteredRel    key, *entry;
+    bool    found;
+    RelReopenInfo    rri[2];
+    int        nrel;
+    static bool before_shmem_exit_callback_setup = false;
+
+    relid = RelationGetRelid(rel);
+
+    /*
+     * Make sure that we do not leave an entry in ClusteredRelsHash if exiting
+     * due to FATAL.
+     */
+    if (!before_shmem_exit_callback_setup)
+    {
+        before_shmem_exit(cluster_before_shmem_exit_callback, 0);
+        before_shmem_exit_callback_setup = true;
+    }
+
+    memset(&key, 0, sizeof(key));
+    key.relid = relid;
+    key.dbid = MyDatabaseId;
+
+    *entered_p = false;
+    LWLockAcquire(ClusteredRelsLock, LW_EXCLUSIVE);
+    entry = (ClusteredRel *)
+        hash_search(ClusteredRelsHash, &key, HASH_ENTER_NULL, &found);
+    if (found)
+    {
+        /*
+         * Since CLUSTER CONCURRENTLY takes ShareRowExclusiveLock, a conflict
+         * should occur much earlier. However that lock may be released
+         * temporarily, see below.  Anyway, we should complain whatever the
+         * reason of the conflict might be.
+         */
+        ereport(ERROR,
+                (errmsg(CLUSTER_IN_PROGRESS_MESSAGE,
+                        RelationGetRelationName(rel))));
+    }
+    if (entry == NULL)
+        ereport(ERROR,
+                (errmsg("too many requests for CLUSTER CONCURRENTLY at a time")),
+                (errhint("consider increasing the \"max_replication_slots\" configuration parameter")));
+
+    /*
+     * Even if the insertion of TOAST relid should fail below, the caller has
+     * to do cleanup.
+     */
+    *entered_p = true;
+
+    /*
+     * Enable the callback to remove the entry in case of exit. We should not
+     * do this earlier, otherwise an attempt to insert already existing entry
+     * could make us remove that entry (inserted by another backend) during
+     * ERROR handling.
+     */
+    Assert(!OidIsValid(clustered_rel));
+    clustered_rel = relid;
+
+    /*
+     * TOAST relation is not accessed using historic snapshot, but we enter it
+     * here to protect it from being VACUUMed by another backend. (Lock does
+     * not help in the CONCURRENT case because cannot hold it continuously
+     * till the end of the transaction.) See the comments on locking TOAST
+     * relation in copy_table_data().
+     */
+    toastrelid = rel->rd_rel->reltoastrelid;
+    if (OidIsValid(toastrelid))
+    {
+        key.relid = toastrelid;
+        entry = (ClusteredRel *)
+            hash_search(ClusteredRelsHash, &key, HASH_ENTER_NULL, &found);
+        if (found)
+            /*
+             * If we could enter the main fork the TOAST should succeed
+             * too. Nevertheless, check.
+             */
+            ereport(ERROR,
+                    (errmsg("TOAST relation of \"%s\" is already being processed by CLUSTER CONCURRENTLY",
+                            RelationGetRelationName(rel))));
+        if (entry == NULL)
+            ereport(ERROR,
+                    (errmsg("too many requests for CLUSTER CONCURRENT at a time")),
+                    (errhint("consider increasing the \"max_replication_slots\" configuration parameter")));
+
+        Assert(!OidIsValid(clustered_rel_toast));
+        clustered_rel_toast = toastrelid;
+    }
+    LWLockRelease(ClusteredRelsLock);
+
+    /*
+     * Make sure that other backends are aware of the new hash entry.
+     *
+     * Besides sending the invalidation message, we need to force re-opening
+     * of the relation, which includes the actual invalidation (and thus
+     * checking of our hashtable on the next access).
+     */
+    CacheInvalidateRelcacheImmediate(rel);
+    /*
+     * Since the hashtable only needs to be checked by write transactions,
+     * lock the relation in a mode that conflicts with any DML command. (The
+     * reading transactions are supposed to close the relation before opening
+     * it with higher lock.) Once we have the relation (and its index) locked,
+     * we unlock it immediately and then re-lock using the original mode.
+     */
+    nrel = 0;
+    init_rel_reopen_info(&rri[nrel++], rel_p, InvalidOid,
+                         LOCK_CLUSTER_CONCURRENT, ShareLock);
+    if (index_p)
+    {
+        /*
+         * Another transaction might want to open both the relation and the
+         * index. If it already has the relation lock and is waiting for the
+         * index lock, we should release the index lock, otherwise our request
+         * for ShareLock on the relation can end up in a deadlock.
+         */
+        init_rel_reopen_info(&rri[nrel++], index_p, InvalidOid,
+                             LOCK_CLUSTER_CONCURRENT, ShareLock);
+    }
+    unlock_and_close_relations(rri, nrel);
+    /*
+     * XXX It's not strictly necessary to lock the index here, but it's
+     * probably not worth teaching the "reopen API" about this special case.
+     */
+    reopen_relations(rri, nrel);
+
+    /* Switch back to the original lock. */
+    nrel = 0;
+    init_rel_reopen_info(&rri[nrel++], rel_p, InvalidOid,
+                         ShareLock, LOCK_CLUSTER_CONCURRENT);
+    if (index_p)
+        init_rel_reopen_info(&rri[nrel++], index_p, InvalidOid,
+                             ShareLock, LOCK_CLUSTER_CONCURRENT);
+    unlock_and_close_relations(rri, nrel);
+    reopen_relations(rri, nrel);
+}
+
+/*
+ * Call this when done with CLUSTER CONCURRENTLY.
+ *
+ * 'error' tells whether the function is being called in order to handle
+ * error.
+ */
+static void
+end_concurrent_cluster(Oid relid, bool error)
+{
+    ClusteredRel    key, *entry, *entry_toast = NULL;
+
+    /* Remove the relation from the hash. */
+    memset(&key, 0, sizeof(key));
+    key.relid = relid;
+    key.dbid = MyDatabaseId;
+    LWLockAcquire(ClusteredRelsLock, LW_EXCLUSIVE);
+    entry = hash_search(ClusteredRelsHash, &key, HASH_REMOVE, NULL);
+
+    /* Disable end_concurrent_cluster_on_exit_callback(). */
+    if (OidIsValid(clustered_rel))
+        clustered_rel = InvalidOid;
+
+    /* Remove the TOAST relation if there is one. */
+    if (OidIsValid(clustered_rel_toast))
+    {
+        key.relid = clustered_rel_toast;
+        entry_toast = hash_search(ClusteredRelsHash, &key, HASH_REMOVE,
+                                  NULL);
+
+        clustered_rel_toast = InvalidOid;
+    }
+    else
+        key.relid = InvalidOid;
+    LWLockRelease(ClusteredRelsLock);
+
+    /*
+     * On normal completion (!error), we should not really fail to remove the
+     * entry. But if it did for any reason, make sure the transaction is
+     * aborted: if other transactions, while changing the contents of the
+     * relation, didn't know that CLUSTER CONCURRENTLY was in progress, they
+     * could have missed to WAL enough information, and thus we could have
+     * produced an inconsistent table contents.
+     *
+     * On the other hand, if we are already handling an error, there's no
+     * reason to worry about inconsistent contents of the new storage because
+     * the transaction is going to be rolled back anyway. Furthermore, by
+     * raising ERROR here we'd shadow the original error.
+     */
+    if (!error)
+    {
+        char    *relname;
+
+        if (entry == NULL)
+        {
+            relname = get_rel_name(relid);
+            if (!relname)
+                ereport(ERROR,
+                        (errmsg("cache lookup failed for relation %u",
+                                relid)));
+
+            ereport(ERROR,
+                    (errmsg("relation \"%s\" not found among clustered relations",
+                            relname)));
+        }
+
+        /*
+         * Missing TOAST relation indicates that it could have been VACUUMed
+         * or CLUSTERed by another backend while we did not hold a lock on it.
+         */
+        if (entry_toast == NULL && OidIsValid(key.relid))
+        {
+            relname = get_rel_name(key.relid);
+            if (!relname)
+                ereport(ERROR,
+                        (errmsg("cache lookup failed for relation %u",
+                                key.relid)));
+
+            ereport(ERROR,
+                    (errmsg("relation \"%s\" not found among clustered relations",
+                            relname)));
+        }
+    }
+
+    /*
+     * Note: unlike begin_concurrent_cluster(), here we do not lock/unlock the
+     * relation: 1) On normal completion, the caller is already holding
+     * AccessExclusiveLock (till the end of the transaction), 2) on ERROR /
+     * FATAL, we try to do the cleanup asap, but the worst case is that other
+     * backends will write unnecessary information to WAL until they close the
+     * relation.
+     */
+}
+
+/*
+ * A wrapper to call end_concurrent_cluster() as a before_shmem_exit callback.
+ */
+static void
+cluster_before_shmem_exit_callback(int code, Datum arg)
+{
+    if (OidIsValid(clustered_rel) || OidIsValid(clustered_rel_toast))
+        end_concurrent_cluster(clustered_rel, true);
+}
+
+/*
+ * Check if relation is currently being processed by CLUSTER CONCURRENTLY.
+ */
+bool
+is_concurrent_cluster_in_progress(Oid relid)
+{
+    ClusteredRel    key, *entry;
+
+    memset(&key, 0, sizeof(key));
+    key.relid = relid;
+    key.dbid = MyDatabaseId;
+
+    LWLockAcquire(ClusteredRelsLock, LW_SHARED);
+    entry = (ClusteredRel *)
+        hash_search(ClusteredRelsHash, &key, HASH_FIND, NULL);
+    LWLockRelease(ClusteredRelsLock);
+
+    return entry != NULL;
+}
+
+/*
+ * Check if VACUUM FULL / CLUSTER CONCURRENTLY is already running for given
+ * relation, and if so, raise ERROR. The problem is that cluster_rel() needs
+ * to release its lock on the relation temporarily at some point, so our lock
+ * alone does not help. Commands that might break what cluster_rel() is doing
+ * should call this function first.
+ *
+ * Return without checking if lockmode allows for race conditions which would
+ * make the result meaningless. In that case, cluster_rel() itself should
+ * throw ERROR if the relation was changed by us in an incompatible
+ * way. However, if it managed to do most of its work by then, a lot of CPU
+ * time might be wasted.
+ */
+void
+check_for_concurrent_cluster(Oid relid, LOCKMODE lockmode)
+{
+    /*
+     * If the caller does not have a lock that conflicts with
+     * LOCK_CLUSTER_CONCURRENT, the check makes little sense because the
+     * VACUUM FULL / CLUSTER CONCURRENTLY can start anytime after the check.
+     */
+    if (lockmode < LOCK_CLUSTER_CONCURRENT)
+        return;
+
+    if (is_concurrent_cluster_in_progress(relid))
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg(CLUSTER_IN_PROGRESS_MESSAGE,
+                        get_rel_name(relid))));
+
+}
+
+/*
+ * Check if relation is eligible for CLUSTER CONCURRENTLY and retrieve the
+ * catalog state to be passed later to check_catalog_changes.
+ *
+ * Caller is supposed to hold (at least) LOCK_CLUSTER_CONCURRENT on the
+ * relation.
+ */
+static CatalogState *
+get_catalog_state(Relation rel)
+{
+    CatalogState    *result = palloc_object(CatalogState);
+    List    *ind_oids;
+    ListCell    *lc;
+    int        ninds, i;
+    Oid        reltoastrelid = rel->rd_rel->reltoastrelid;
+    char    relpersistence = rel->rd_rel->relpersistence;
+    char    replident = rel->rd_rel->relreplident;
+    Oid        ident_idx = RelationGetReplicaIndex(rel);
+    TupleDesc    td_src = RelationGetDescr(rel);
+
+    /*
+     * While gathering the catalog information, check if there is a reason not
+     * to proceed.
+     */
+    check_relation_is_clusterable_concurrently(rel, ERROR,
+                                               "CLUSTER (CONCURRENTLY)");
+
+    /*
+     * TOAST should not really change, but be careful. If it did, we would be
+     * unable to remove the new one from ClusteredRelsHash.
+     */
+    if (OidIsValid(clustered_rel_toast) &&
+        clustered_rel_toast != reltoastrelid)
+        ereport(ERROR,
+                (errmsg("TOAST relation changed by another transaction")));
+
+    /* No index should be dropped while we are checking it. */
+    Assert(CheckRelationLockedByMe(rel, ShareUpdateExclusiveLock, true));
+
+    ind_oids = RelationGetIndexList(rel);
+    result->ninds = ninds = list_length(ind_oids);
+    result->ind_oids = palloc_array(Oid, ninds);
+    result->ind_tupdescs = palloc_array(TupleDesc, ninds);
+    i = 0;
+    foreach(lc, ind_oids)
+    {
+        Oid    ind_oid = lfirst_oid(lc);
+        Relation    index;
+        TupleDesc    td_src, td_dst;
+
+        /*
+         * Weaker lock should be o.k. for the index, but this one should break
+         * anything either.
+         */
+        index = index_open(ind_oid, ShareUpdateExclusiveLock);
+
+        result->ind_oids[i] = RelationGetRelid(index);
+        td_src = RelationGetDescr(index);
+        td_dst = palloc(TupleDescSize(td_src));
+        TupleDescCopy(td_dst, td_src);
+        result->ind_tupdescs[i] = td_dst;
+        i++;
+
+        index_close(index, ShareUpdateExclusiveLock);
+    }
+
+    /* Fill-in the relation info. */
+    result->tupdesc = palloc(TupleDescSize(td_src));
+    TupleDescCopy(result->tupdesc, td_src);
+    result->reltoastrelid = reltoastrelid;
+    result->relpersistence = relpersistence;
+    result->replident = replident;
+    result->replidindex = ident_idx;
+
+    return    result;
+}
+
+static void
+free_catalog_state(CatalogState *state)
+{
+    /* We are only interested in indexes. */
+    if (state->ninds == 0)
+        return;
+
+    for (int i = 0; i < state->ninds; i++)
+        FreeTupleDesc(state->ind_tupdescs[i]);
+
+    FreeTupleDesc(state->tupdesc);
+    pfree(state->ind_oids);
+    pfree(state->ind_tupdescs);
+    pfree(state);
+}
+
+/*
+ * Raise ERROR if 'rel' changed in a way that does not allow further
+ * processing of CLUSTER CONCURRENTLY.
+ *
+ * Besides the relation's tuple descriptor, it's important to check indexes:
+ * concurrent change of index definition (can it happen in other way than
+ * dropping and re-creating the index, accidentally with the same OID?) can be
+ * a problem because we may already have the new index built. If an index was
+ * created or dropped concurrently, we'd fail to swap the index storage. In
+ * any case, we prefer to check the indexes early to get an explicit error
+ * message about the mismatch. Furthermore, the earlier we detect the change,
+ * the fewer CPU cycles we waste.
+ *
+ * Note that we do not check constraints because the transaction which changed
+ * them must have ensured that the existing tuples satisfy the new
+ * constraints. If any DML commands were necessary for that, we will simply
+ * decode them from WAL and apply them to the new storage.
+ *
+ * Caller is supposed to hold (at least) ShareUpdateExclusiveLock on the
+ * relation.
+ */
+static void
+check_catalog_changes(Relation rel, CatalogState *cat_state)
+{
+    List    *ind_oids;
+    ListCell    *lc;
+    LOCKMODE    lmode;
+    Oid        ident_idx;
+    TupleDesc    td, td_cp;
+
+    /* First, check the relation info. */
+
+    /* TOAST is not easy to change, but check. */
+    if (rel->rd_rel->reltoastrelid != cat_state->reltoastrelid)
+        ereport(ERROR,
+                errmsg("TOAST relation of relation \"%s\" changed by another transaction",
+                       RelationGetRelationName(rel)));
+
+    if (rel->rd_rel->relpersistence != cat_state->relpersistence)
+        ereport(ERROR,
+                errmsg("persistence of relation \"%s\" changed by another transaction",
+                       RelationGetRelationName(rel)));
+
+    if (cat_state->replident != rel->rd_rel->relreplident)
+        ereport(ERROR,
+                errmsg("replica identity of relation \"%s\" changed by another transaction",
+                       RelationGetRelationName(rel)));
+
+    ident_idx = RelationGetReplicaIndex(rel);
+    if (ident_idx == InvalidOid && rel->rd_pkindex != InvalidOid)
+        ident_idx = rel->rd_pkindex;
+    if (cat_state->replidindex != ident_idx)
+        ereport(ERROR,
+                errmsg("identity index of relation \"%s\" changed by another transaction",
+                       RelationGetRelationName(rel)));
+
+    /*
+     * As cat_state contains a copy (which has the constraint info cleared),
+     * create a temporary copy for the comparison.
+     */
+    td = RelationGetDescr(rel);
+    td_cp = palloc(TupleDescSize(td));
+    TupleDescCopy(td_cp, td);
+    if (!equalTupleDescs(cat_state->tupdesc, td_cp))
+        ereport(ERROR,
+                errmsg("definition of relation \"%s\" changed by another transaction",
+                       RelationGetRelationName(rel)));
+    FreeTupleDesc(td_cp);
+
+    /* Now we are only interested in indexes. */
+    if (cat_state->ninds == 0)
+        return;
+
+    /* No index should be dropped while we are checking the relation. */
+    lmode = ShareUpdateExclusiveLock;
+    Assert(CheckRelationLockedByMe(rel, lmode, true));
+
+    ind_oids = RelationGetIndexList(rel);
+    if (list_length(ind_oids) != cat_state->ninds)
+        goto failed_index;
+
+    foreach(lc, ind_oids)
+    {
+        Oid    ind_oid = lfirst_oid(lc);
+        int        i;
+        TupleDesc    tupdesc;
+        Relation    index;
+
+        /* Find the index in cat_state. */
+        for (i = 0; i < cat_state->ninds; i++)
+        {
+            if (cat_state->ind_oids[i] == ind_oid)
+                break;
+        }
+        /*
+         * OID not found, i.e. the index was replaced by another one. XXX
+         * Should we yet try to find if an index having the desired tuple
+         * descriptor exists? Or should we always look for the tuple
+         * descriptor and not use OIDs at all?
+         */
+        if (i == cat_state->ninds)
+            goto failed_index;
+
+        /* Check the tuple descriptor. */
+        index = try_index_open(ind_oid, lmode);
+        if (index == NULL)
+            goto failed_index;
+        tupdesc = RelationGetDescr(index);
+        if (!equalTupleDescs(cat_state->ind_tupdescs[i], tupdesc))
+            goto failed_index;
+        index_close(index, lmode);
+    }
+
+    return;
+
+failed_index:
+    ereport(ERROR,
+            (errmsg("index(es) of relation \"%s\" changed by another transaction",
+                    RelationGetRelationName(rel))));
+}
+
+/*
+ * This function is much like pg_create_logical_replication_slot() except that
+ * the new slot is neither released (if anyone else could read changes from
+ * our slot, we could miss changes other backends do while we copy the
+ * existing data into temporary table), nor persisted (it's easier to handle
+ * crash by restarting all the work from scratch).
+ *
+ * XXX Even though CreateInitDecodingContext() does not set state to
+ * RS_PERSISTENT, it does write the slot to disk. We rely on
+ * RestoreSlotFromDisk() to delete ephemeral slots during startup. (Both ERROR
+ * and FATAL should lead to cleanup even before the cluster goes down.)
+ */
+static LogicalDecodingContext *
+setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc)
+{
+    LogicalDecodingContext *ctx;
+    ClusterDecodingState *dstate;
+
+    /* RS_TEMPORARY so that the slot gets cleaned up on ERROR. */
+    ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, false, false);
+
+    /*
+     * Neither prepare_write nor do_write callback nor update_progress is
+     * useful for us.
+     *
+     * Regarding the value of need_full_snapshot, we pass false because the
+     * table we are processing is present in ClusteredRelsHash and therefore,
+     * regarding logical decoding, treated like a catalog.
+     */
+    ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
+                                    NIL,
+                                    false,
+                                    InvalidXLogRecPtr,
+                                    XL_ROUTINE(.page_read = read_local_xlog_page,
+                                               .segment_open = wal_segment_open,
+                                               .segment_close = wal_segment_close),
+                                    NULL, NULL, NULL);
+
+    /*
+     * We don't have control on setting fast_forward, so at least check it.
+     */
+    Assert(!ctx->fast_forward);
+
+    DecodingContextFindStartpoint(ctx);
+
+    /* Some WAL records should have been read. */
+    Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr);
+
+    XLByteToSeg(ctx->reader->EndRecPtr, cluster_current_segment,
+                wal_segment_size);
+
+    /*
+     * Setup structures to store decoded changes.
+     */
+    dstate = palloc0(sizeof(ClusterDecodingState));
+    dstate->relid = relid;
+    dstate->tstore = tuplestore_begin_heap(false, false,
+                                           maintenance_work_mem);
+    dstate->tupdesc = tupdesc;
+
+    /* Initialize the descriptor to store the changes ... */
+    dstate->tupdesc_change = CreateTemplateTupleDesc(1);
+
+    TupleDescInitEntry(dstate->tupdesc_change, 1, NULL, BYTEAOID, -1, 0);
+    /* ... as well as the corresponding slot. */
+    dstate->tsslot = MakeSingleTupleTableSlot(dstate->tupdesc_change,
+                                              &TTSOpsMinimalTuple);
+
+    dstate->resowner = ResourceOwnerCreate(CurrentResourceOwner,
+                                           "logical decoding");
+
+    ctx->output_writer_private = dstate;
+    return ctx;
+}
+
+/*
+ * Retrieve tuple from a change structure. As for the change, no alignment is
+ * assumed.
+ */
+static HeapTuple
+get_changed_tuple(ConcurrentChange *change)
+{
+    HeapTupleData tup_data;
+    HeapTuple    result;
+    char       *src;
+
+    /*
+     * Ensure alignment before accessing the fields. (This is why we can't use
+     * heap_copytuple() instead of this function.)
+     */
+    memcpy(&tup_data, &change->tup_data, sizeof(HeapTupleData));
+
+    result = (HeapTuple) palloc(HEAPTUPLESIZE + tup_data.t_len);
+    memcpy(result, &tup_data, sizeof(HeapTupleData));
+    result->t_data = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE);
+    src = (char *) change + sizeof(ConcurrentChange);
+    memcpy(result->t_data, src, result->t_len);
+
+    return result;
+}
+
+/*
+ * Decode logical changes from the WAL sequence up to end_of_wal.
+ */
+void
+cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
+                                  XLogRecPtr end_of_wal)
+{
+    ClusterDecodingState *dstate;
+    ResourceOwner resowner_old;
+    PgBackendProgress    progress;
+
+    /*
+     * Invalidate the "present" cache before moving to "(recent) history".
+     */
+    InvalidateSystemCaches();
+
+    dstate = (ClusterDecodingState *) ctx->output_writer_private;
+    resowner_old = CurrentResourceOwner;
+    CurrentResourceOwner = dstate->resowner;
+
+    /*
+     * reorderbuffer.c uses internal subtransaction, whose abort ends the
+     * command progress reporting. Save the status here so we can restore when
+     * done with the decoding.
+     */
+    memcpy(&progress, &MyBEEntry->st_progress, sizeof(PgBackendProgress));
+
+    PG_TRY();
+    {
+        while (ctx->reader->EndRecPtr < end_of_wal)
+        {
+            XLogRecord *record;
+            XLogSegNo    segno_new;
+            char       *errm = NULL;
+            XLogRecPtr    end_lsn;
+
+            record = XLogReadRecord(ctx->reader, &errm);
+            if (errm)
+                elog(ERROR, "%s", errm);
+
+            if (record != NULL)
+                LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+            /*
+             * If WAL segment boundary has been crossed, inform the decoding
+             * system that the catalog_xmin can advance. (We can confirm more
+             * often, but a filling a single WAL segment should not take much
+             * time.)
+             */
+            end_lsn = ctx->reader->EndRecPtr;
+            XLByteToSeg(end_lsn, segno_new, wal_segment_size);
+            if (segno_new != cluster_current_segment)
+            {
+                LogicalConfirmReceivedLocation(end_lsn);
+                elog(DEBUG1, "cluster: confirmed receive location %X/%X",
+                     (uint32) (end_lsn >> 32), (uint32) end_lsn);
+                cluster_current_segment = segno_new;
+            }
+
+            CHECK_FOR_INTERRUPTS();
+        }
+        InvalidateSystemCaches();
+        CurrentResourceOwner = resowner_old;
+    }
+    PG_CATCH();
+    {
+        InvalidateSystemCaches();
+        CurrentResourceOwner = resowner_old;
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    /* Restore the progress reporting status. */
+    pgstat_progress_restore_state(&progress);
+}
+
+/*
+ * Apply changes that happened during the initial load.
+ *
+ * Scan key is passed by caller, so it does not have to be constructed
+ * multiple times. Key entries have all fields initialized, except for
+ * sk_argument.
+ */
+static void
+apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
+                         ScanKey key, int nkeys, IndexInsertState *iistate)
+{
+    TupleTableSlot *index_slot, *ident_slot;
+    HeapTuple    tup_old = NULL;
+
+    if (dstate->nchanges == 0)
+        return;
+
+    /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */
+    index_slot = MakeSingleTupleTableSlot(dstate->tupdesc, &TTSOpsHeapTuple);
+    iistate->econtext->ecxt_scantuple = index_slot;
+
+    /* A slot to fetch tuples from identity index. */
+    ident_slot = table_slot_create(rel, NULL);
+
+    while (tuplestore_gettupleslot(dstate->tstore, true, false,
+                                   dstate->tsslot))
+    {
+        bool        shouldFree;
+        HeapTuple    tup_change,
+                    tup,
+                    tup_exist;
+        char       *change_raw;
+        ConcurrentChange *change;
+        Snapshot    snapshot;
+        bool        isnull[1];
+        Datum        values[1];
+
+        CHECK_FOR_INTERRUPTS();
+
+        /* Get the change from the single-column tuple. */
+        tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
+        heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull);
+        Assert(!isnull[0]);
+
+        /* This is bytea, but char* is easier to work with. */
+        change_raw = (char *) DatumGetByteaP(values[0]);
+
+        change = (ConcurrentChange *) VARDATA(change_raw);
+
+        /* TRUNCATE change contains no tuple, so process it separately. */
+        if (change->kind == CHANGE_TRUNCATE)
+        {
+            /*
+             * All the things that ExecuteTruncateGuts() does (such as firing
+             * triggers or handling the DROP_CASCADE behavior) should have
+             * taken place on the source relation. Thus we only do the actual
+             * truncation of the new relation (and its indexes).
+             */
+            heap_truncate_one_rel(rel);
+
+            pfree(tup_change);
+            continue;
+        }
+
+        /*
+         * Extract the tuple from the change. The tuple is copied here because
+         * it might be assigned to 'tup_old', in which case it needs to
+         * survive into the next iteration.
+         */
+        tup = get_changed_tuple(change);
+
+        if (change->kind == CHANGE_UPDATE_OLD)
+        {
+            Assert(tup_old == NULL);
+            tup_old = tup;
+        }
+        else if (change->kind == CHANGE_INSERT)
+        {
+            Assert(tup_old == NULL);
+
+            apply_concurrent_insert(rel, change, tup, iistate, index_slot);
+
+            pfree(tup);
+        }
+        else if (change->kind == CHANGE_UPDATE_NEW ||
+                 change->kind == CHANGE_DELETE)
+        {
+            IndexScanDesc    ind_scan = NULL;
+            HeapTuple    tup_key;
+
+            if (change->kind == CHANGE_UPDATE_NEW)
+            {
+                tup_key = tup_old != NULL ? tup_old : tup;
+            }
+            else
+            {
+                Assert(tup_old == NULL);
+                tup_key = tup;
+            }
+
+            /*
+             * Find the tuple to be updated or deleted.
+             *
+             * As the table being CLUSTERed concurrently is considered an
+             * "user catalog", new CID is WAL-logged and decoded. And since we
+             * use the same XID that the original DMLs did, the snapshot used
+             * for the logical decoding (by now converted to a non-historic
+             * MVCC snapshot) should see the tuples inserted previously into
+             * the new heap and/or updated there.
+             */
+            snapshot = change->snapshot;
+
+            /*
+             * Set what should be considered current transaction (and
+             * subtransactions) during visibility check.
+             *
+             * Note that this snapshot was created from a historic snapshot
+             * using SnapBuildMVCCFromHistoric(), which does not touch
+             * 'subxip'. Thus, unlike in a regular MVCC snapshot, the array
+             * does not contain (sub)transactions other than the one whose
+             * data changes we are applying.
+             */
+            SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+            tup_exist = find_target_tuple(rel, key, nkeys, tup_key, snapshot,
+                                          iistate, ident_slot, &ind_scan);
+            if (tup_exist == NULL)
+                elog(ERROR, "Failed to find target tuple");
+
+            if (change->kind == CHANGE_UPDATE_NEW)
+                apply_concurrent_update(rel, tup, tup_exist, change, iistate,
+                                        index_slot);
+            else
+                apply_concurrent_delete(rel, tup_exist, change);
+
+            ResetClusterCurrentXids();
+
+            if (tup_old != NULL)
+            {
+                pfree(tup_old);
+                tup_old = NULL;
+            }
+
+            pfree(tup);
+            index_endscan(ind_scan);
+        }
+        else
+            elog(ERROR, "Unrecognized kind of change: %d", change->kind);
+
+        /* Free the snapshot if this is the last change that needed it. */
+        Assert(change->snapshot->active_count > 0);
+        change->snapshot->active_count--;
+        if (change->snapshot->active_count == 0)
+        {
+            if (change->snapshot == dstate->snapshot)
+                dstate->snapshot = NULL;
+            FreeSnapshot(change->snapshot);
+        }
+
+        /* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
+        Assert(shouldFree);
+        pfree(tup_change);
+    }
+
+    tuplestore_clear(dstate->tstore);
+    dstate->nchanges = 0;
+
+    /* Cleanup. */
+    ExecDropSingleTupleTableSlot(index_slot);
+    ExecDropSingleTupleTableSlot(ident_slot);
+}
+
+static void
+apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
+                        IndexInsertState *iistate, TupleTableSlot *index_slot)
+{
+    Snapshot    snapshot = change->snapshot;
+    List       *recheck;
+
+    /*
+     * For INSERT, the visibility information is not important, but we use the
+     * snapshot to get CID. Index functions might need the whole snapshot
+     * anyway.
+     */
+    SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+    /*
+     * Write the tuple into the new heap.
+     *
+     * The snapshot is the one we used to decode the insert (though converted
+     * to "non-historic" MVCC snapshot), i.e. the snapshot's curcid is the
+     * tuple CID incremented by one (due to the "new CID" WAL record that got
+     * written along with the INSERT record). Thus if we want to use the
+     * original CID, we need to subtract 1 from curcid.
+     */
+    Assert(snapshot->curcid != InvalidCommandId &&
+           snapshot->curcid > FirstCommandId);
+
+    heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
+                HEAP_INSERT_NO_LOGICAL, NULL);
+
+    /*
+     * Update indexes.
+     *
+     * In case functions in the index need the active snapshot and caller
+     * hasn't set one.
+     */
+    PushActiveSnapshot(snapshot);
+    ExecStoreHeapTuple(tup, index_slot, false);
+    recheck = ExecInsertIndexTuples(iistate->rri,
+                                    index_slot,
+                                    iistate->estate,
+                                    false,    /* update */
+                                    false,    /* noDupErr */
+                                    NULL,    /* specConflict */
+                                    NIL, /* arbiterIndexes */
+                                    false    /* onlySummarizing */
+        );
+    PopActiveSnapshot();
+    ResetClusterCurrentXids();
+
+    /*
+     * If recheck is required, it must have been preformed on the source
+     * relation by now. (All the logical changes we process here are already
+     * committed.)
+     */
+    list_free(recheck);
+
+    pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED, 1);
+}
+
+static void
+apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
+                        ConcurrentChange *change, IndexInsertState *iistate,
+                        TupleTableSlot *index_slot)
+{
+    List       *recheck;
+    LockTupleMode    lockmode;
+    TU_UpdateIndexes    update_indexes;
+    ItemPointerData        tid_old_new_heap;
+    TM_Result    res;
+    Snapshot snapshot    = change->snapshot;
+    TM_FailureData tmfd;
+
+    /* Location of the existing tuple in the new heap. */
+    ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
+
+    /*
+     * Write the new tuple into the new heap. ('tup' gets the TID assigned
+     * here.)
+     *
+     * Regarding CID, see the comment in apply_concurrent_insert().
+     */
+    Assert(snapshot->curcid != InvalidCommandId &&
+           snapshot->curcid > FirstCommandId);
+
+    res = heap_update(rel, &tid_old_new_heap, tup,
+                      change->xid, snapshot->curcid - 1,
+                      InvalidSnapshot,
+                      false, /* no wait - only we are doing changes */
+                      &tmfd, &lockmode, &update_indexes,
+                      /* wal_logical */
+                      false);
+    if (res != TM_Ok)
+        ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
+
+    ExecStoreHeapTuple(tup, index_slot, false);
+
+    if (update_indexes != TU_None)
+    {
+        PushActiveSnapshot(snapshot);
+        recheck = ExecInsertIndexTuples(iistate->rri,
+                                        index_slot,
+                                        iistate->estate,
+                                        true,    /* update */
+                                        false,    /* noDupErr */
+                                        NULL,    /* specConflict */
+                                        NIL, /* arbiterIndexes */
+                                        /* onlySummarizing */
+                                        update_indexes == TU_Summarizing);
+        PopActiveSnapshot();
+        list_free(recheck);
+    }
+
+    pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_UPDATED, 1);
+}
+
+static void
+apply_concurrent_delete(Relation rel, HeapTuple tup_target,
+                        ConcurrentChange *change)
+{
+    ItemPointerData        tid_old_new_heap;
+    TM_Result    res;
+    TM_FailureData tmfd;
+    Snapshot    snapshot = change->snapshot;
+
+    /* Regarding CID, see the comment in apply_concurrent_insert(). */
+    Assert(snapshot->curcid != InvalidCommandId &&
+           snapshot->curcid > FirstCommandId);
+
+    /* Location of the existing tuple in the new heap. */
+    ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
+
+    res = heap_delete(rel, &tid_old_new_heap, change->xid,
+                      snapshot->curcid - 1, InvalidSnapshot, false,
+                      &tmfd, false,
+                      /* wal_logical */
+                      false);
+
+    if (res != TM_Ok)
+        ereport(ERROR, (errmsg("failed to apply concurrent DELETE")));
+
+    pgstat_progress_incr_param(PROGRESS_CLUSTER_HEAP_TUPLES_DELETED, 1);
+}
+
+/*
+ * Find the tuple to be updated or deleted.
+ *
+ * 'key' is a pre-initialized scan key, into which the function will put the
+ * key values.
+ *
+ * 'tup_key' is a tuple containing the key values for the scan.
+ *
+ * On exit,'*scan_p' contains the scan descriptor used. The caller must close
+ * it when he no longer needs the tuple returned.
+ */
+static HeapTuple
+find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
+                  Snapshot snapshot, IndexInsertState *iistate,
+                  TupleTableSlot *ident_slot, IndexScanDesc *scan_p)
+{
+    IndexScanDesc scan;
+    Form_pg_index ident_form;
+    int2vector *ident_indkey;
+    HeapTuple    result = NULL;
+
+    scan = index_beginscan(rel, iistate->ident_index, snapshot,
+                           nkeys, 0);
+    *scan_p = scan;
+    index_rescan(scan, key, nkeys, NULL, 0);
+
+    /* Info needed to retrieve key values from heap tuple. */
+    ident_form = iistate->ident_index->rd_index;
+    ident_indkey = &ident_form->indkey;
+
+    /* Use the incoming tuple to finalize the scan key. */
+    for (int i = 0; i < scan->numberOfKeys; i++)
+    {
+        ScanKey        entry;
+        bool        isnull;
+        int16        attno_heap;
+
+        entry = &scan->keyData[i];
+        attno_heap = ident_indkey->values[i];
+        entry->sk_argument = heap_getattr(tup_key,
+                                          attno_heap,
+                                          rel->rd_att,
+                                          &isnull);
+        Assert(!isnull);
+    }
+    if (index_getnext_slot(scan, ForwardScanDirection, ident_slot))
+    {
+        bool        shouldFree;
+
+        result = ExecFetchSlotHeapTuple(ident_slot, false, &shouldFree);
+        /* TTSOpsBufferHeapTuple has .get_heap_tuple != NULL. */
+        Assert(!shouldFree);
+    }
+
+    return result;
+}
+
+/*
+ * Decode and apply concurrent changes.
+ *
+ * Pass rel_src iff its reltoastrelid is needed.
+ */
+static void
+process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
+                           Relation rel_dst, Relation rel_src, ScanKey ident_key,
+                           int ident_key_nentries, IndexInsertState *iistate)
+{
+    ClusterDecodingState *dstate;
+
+    pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
+                                 PROGRESS_CLUSTER_PHASE_CATCH_UP);
+
+    dstate = (ClusterDecodingState *) ctx->output_writer_private;
+
+    cluster_decode_concurrent_changes(ctx, end_of_wal);
+
+    if (dstate->nchanges == 0)
+        return;
+
+    PG_TRY();
+    {
+        /*
+         * Make sure that TOAST values can eventually be accessed via the old
+         * relation - see comment in copy_table_data().
+         */
+        if (rel_src)
+            rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
+
+        apply_concurrent_changes(dstate, rel_dst, ident_key,
+                                 ident_key_nentries, iistate);
+    }
+    PG_FINALLY();
+    {
+        ResetClusterCurrentXids();
+
+        if (rel_src)
+            rel_dst->rd_toastoid = InvalidOid;
+    }
+    PG_END_TRY();
+}
+
+static IndexInsertState *
+get_index_insert_state(Relation relation, Oid ident_index_id)
+{
+    EState       *estate;
+    int            i;
+    IndexInsertState *result;
+
+    result = (IndexInsertState *) palloc0(sizeof(IndexInsertState));
+    estate = CreateExecutorState();
+    result->econtext = GetPerTupleExprContext(estate);
+
+    result->rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
+    InitResultRelInfo(result->rri, relation, 0, 0, 0);
+    ExecOpenIndices(result->rri, false);
+
+    /*
+     * Find the relcache entry of the identity index so that we spend no extra
+     * effort to open / close it.
+     */
+    for (i = 0; i < result->rri->ri_NumIndices; i++)
+    {
+        Relation    ind_rel;
+
+        ind_rel = result->rri->ri_IndexRelationDescs[i];
+        if (ind_rel->rd_id == ident_index_id)
+            result->ident_index = ind_rel;
+    }
+    if (result->ident_index == NULL)
+        elog(ERROR, "Failed to open identity index");
+
+    /* Only initialize fields needed by ExecInsertIndexTuples(). */
+    result->estate = estate;
+
+    return result;
+}
+
+/*
+ * Build scan key to process logical changes.
+ */
+static ScanKey
+build_identity_key(Oid ident_idx_oid, Relation rel_src, int *nentries)
+{
+    Relation    ident_idx_rel;
+    Form_pg_index ident_idx;
+    int            n,
+                i;
+    ScanKey        result;
+
+    Assert(OidIsValid(ident_idx_oid));
+    ident_idx_rel = index_open(ident_idx_oid, AccessShareLock);
+    ident_idx = ident_idx_rel->rd_index;
+    n = ident_idx->indnatts;
+    result = (ScanKey) palloc(sizeof(ScanKeyData) * n);
+    for (i = 0; i < n; i++)
+    {
+        ScanKey        entry;
+        int16        relattno;
+        Form_pg_attribute att;
+        Oid            opfamily,
+                    opcintype,
+                    opno,
+                    opcode;
+
+        entry = &result[i];
+        relattno = ident_idx->indkey.values[i];
+        if (relattno >= 1)
+        {
+            TupleDesc    desc;
+
+            desc = rel_src->rd_att;
+            att = TupleDescAttr(desc, relattno - 1);
+        }
+        else
+            elog(ERROR, "Unexpected attribute number %d in index", relattno);
+
+        opfamily = ident_idx_rel->rd_opfamily[i];
+        opcintype = ident_idx_rel->rd_opcintype[i];
+        opno = get_opfamily_member(opfamily, opcintype, opcintype,
+                                   BTEqualStrategyNumber);
+
+        if (!OidIsValid(opno))
+            elog(ERROR, "Failed to find = operator for type %u", opcintype);
+
+        opcode = get_opcode(opno);
+        if (!OidIsValid(opcode))
+            elog(ERROR, "Failed to find = operator for operator %u", opno);
+
+        /* Initialize everything but argument. */
+        ScanKeyInit(entry,
+                    i + 1,
+                    BTEqualStrategyNumber, opcode,
+                    (Datum) NULL);
+        entry->sk_collation = att->attcollation;
+    }
+    index_close(ident_idx_rel, AccessShareLock);
+
+    *nentries = n;
+    return result;
+}
+
+static void
+free_index_insert_state(IndexInsertState *iistate)
+{
+    ExecCloseIndices(iistate->rri);
+    FreeExecutorState(iistate->estate);
+    pfree(iistate->rri);
+    pfree(iistate);
+}
+
+static void
+cleanup_logical_decoding(LogicalDecodingContext *ctx)
+{
+    ClusterDecodingState *dstate;
+
+    dstate = (ClusterDecodingState *) ctx->output_writer_private;
+
+    ExecDropSingleTupleTableSlot(dstate->tsslot);
+    FreeTupleDesc(dstate->tupdesc_change);
+    FreeTupleDesc(dstate->tupdesc);
+    tuplestore_end(dstate->tstore);
+
+    FreeDecodingContext(ctx);
+}
+
+/*
+ * The final steps of rebuild_relation() for concurrent processing.
+ *
+ * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its
+ * clustering index (if one is passed) are still locked in a mode that allows
+ * concurrent data changes. On exit, both tables and their indexes are closed,
+ * but locked in AccessExclusiveLock mode.
+ */
+static void
+rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
+                                   Relation cl_index,
+                                   CatalogState    *cat_state,
+                                   LogicalDecodingContext *ctx,
+                                   bool swap_toast_by_content,
+                                   TransactionId frozenXid,
+                                   MultiXactId cutoffMulti)
+{
+    LOCKMODE    lmode_old;
+    List    *ind_oids_new;
+    Oid        old_table_oid = RelationGetRelid(OldHeap);
+    Oid        new_table_oid = RelationGetRelid(NewHeap);
+    List    *ind_oids_old = RelationGetIndexList(OldHeap);
+    ListCell    *lc, *lc2;
+    char        relpersistence;
+    bool        is_system_catalog;
+    Oid        ident_idx_old, ident_idx_new;
+    IndexInsertState *iistate;
+    ScanKey        ident_key;
+    int        ident_key_nentries;
+    XLogRecPtr    wal_insert_ptr, end_of_wal;
+    char        dummy_rec_data = '\0';
+    RelReopenInfo    *rri = NULL;
+    int        nrel;
+    Relation    *ind_refs_all, *ind_refs_p;
+
+    /* Like in cluster_rel(). */
+    lmode_old = LOCK_CLUSTER_CONCURRENT;
+    Assert(CheckRelationLockedByMe(OldHeap, lmode_old, false));
+    Assert(cl_index == NULL ||
+           CheckRelationLockedByMe(cl_index, lmode_old, false));
+    /* This is expected from the caller. */
+    Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false));
+
+    ident_idx_old = RelationGetReplicaIndex(OldHeap);
+
+    /*
+     * Unlike the exclusive case, we build new indexes for the new relation
+     * rather than swapping the storage and reindexing the old relation. The
+     * point is that the index build can take some time, so we do it before we
+     * get AccessExclusiveLock on the old heap and therefore we cannot swap
+     * the heap storage yet.
+     *
+     * index_create() will lock the new indexes using AccessExclusiveLock
+     * creation - no need to change that.
+     */
+    ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
+
+    /*
+     * Processing shouldn't start w/o valid identity index.
+     */
+    Assert(OidIsValid(ident_idx_old));
+
+    /* Find "identity index" on the new relation. */
+    ident_idx_new = InvalidOid;
+    forboth(lc, ind_oids_old, lc2, ind_oids_new)
+    {
+        Oid    ind_old = lfirst_oid(lc);
+        Oid    ind_new = lfirst_oid(lc2);
+
+        if (ident_idx_old == ind_old)
+        {
+            ident_idx_new = ind_new;
+            break;
+        }
+    }
+    if (!OidIsValid(ident_idx_new))
+        /*
+         * Should not happen, given our lock on the old relation.
+         */
+        ereport(ERROR,
+                (errmsg("Identity index missing on the new relation")));
+
+    /* Executor state to update indexes. */
+    iistate = get_index_insert_state(NewHeap, ident_idx_new);
+
+    /*
+     * Build scan key that we'll use to look for rows to be updated / deleted
+     * during logical decoding.
+     */
+    ident_key = build_identity_key(ident_idx_new, OldHeap, &ident_key_nentries);
+
+    /*
+     * Flush all WAL records inserted so far (possibly except for the last
+     * incomplete page, see GetInsertRecPtr), to minimize the amount of data
+     * we need to flush while holding exclusive lock on the source table.
+     */
+    wal_insert_ptr = GetInsertRecPtr();
+    XLogFlush(wal_insert_ptr);
+    end_of_wal = GetFlushRecPtr(NULL);
+
+    /*
+     * Apply concurrent changes first time, to minimize the time we need to
+     * hold AccessExclusiveLock. (Quite some amount of WAL could have been
+     * written during the data copying and index creation.)
+     */
+    process_concurrent_changes(ctx, end_of_wal, NewHeap,
+                               swap_toast_by_content ? OldHeap : NULL,
+                               ident_key, ident_key_nentries, iistate);
+
+    /*
+     * Release the locks that allowed concurrent data changes, in order to
+     * acquire the AccessExclusiveLock.
+     */
+    nrel = 0;
+    /*
+     * We unlock the old relation (and its clustering index), but then we will
+     * lock the relation and *all* its indexes because we want to swap their
+     * storage.
+     *
+     * (NewHeap is already locked, as well as its indexes.)
+     */
+    rri = palloc_array(RelReopenInfo, 1 + list_length(ind_oids_old));
+    init_rel_reopen_info(&rri[nrel++], &OldHeap, InvalidOid,
+                         LOCK_CLUSTER_CONCURRENT, AccessExclusiveLock);
+    /* References to the re-opened indexes will be stored in this array. */
+    ind_refs_all = palloc_array(Relation, list_length(ind_oids_old));
+    ind_refs_p = ind_refs_all;
+    /* The clustering index is a special case. */
+    if (cl_index)
+    {
+        *ind_refs_p = cl_index;
+        init_rel_reopen_info(&rri[nrel], ind_refs_p, InvalidOid,
+                             LOCK_CLUSTER_CONCURRENT, AccessExclusiveLock);
+        nrel++;
+        ind_refs_p++;
+    }
+    /*
+     * Initialize also the entries for the other indexes (currently unlocked)
+     * because we will have to lock them.
+     */
+    foreach(lc, ind_oids_old)
+    {
+        Oid        ind_oid;
+
+        ind_oid = lfirst_oid(lc);
+        /* Clustering index is already in the array, or there is none. */
+        if (cl_index && RelationGetRelid(cl_index) == ind_oid)
+            continue;
+
+        Assert(nrel < (1 + list_length(ind_oids_old)));
+
+        *ind_refs_p = NULL;
+        init_rel_reopen_info(&rri[nrel],
+                             /*
+                              * In this special case we do not have the
+                              * relcache reference, use OID instead.
+                              */
+                             ind_refs_p,
+                             ind_oid,
+                             NoLock, /* Nothing to unlock. */
+                             AccessExclusiveLock);
+
+        nrel++;
+        ind_refs_p++;
+    }
+    /* Perform the actual unlocking and re-locking. */
+    unlock_and_close_relations(rri, nrel);
+    reopen_relations(rri, nrel);
+
+    /*
+     * In addition, lock the OldHeap's TOAST relation that we skipped for the
+     * CONCURRENTLY option in copy_table_data(). This lock will be needed to
+     * swap the relation files.
+     */
+    if (OidIsValid(OldHeap->rd_rel->reltoastrelid))
+        LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock);
+
+    /*
+     * Check if the new indexes match the old ones, i.e. no changes occurred
+     * while OldHeap was unlocked.
+     *
+     * XXX It's probably not necessary to check the relation tuple descriptor
+     * here because the logical decoding was already active when we released
+     * the lock, and thus the corresponding data changes won't be lost.
+     * However processing of those changes might take a lot of time.
+     */
+    check_catalog_changes(OldHeap, cat_state);
+
+    /*
+     * Tuples and pages of the old heap will be gone, but the heap will stay.
+     */
+    TransferPredicateLocksToHeapRelation(OldHeap);
+    /* The same for indexes. */
+    for (int i = 0; i < (nrel - 1); i++)
+    {
+        Relation    index = ind_refs_all[i];
+
+        TransferPredicateLocksToHeapRelation(index);
+
+        /*
+         * References to indexes on the old relation are not needed anymore,
+         * however locks stay till the end of the transaction.
+         */
+        index_close(index, NoLock);
+    }
+    pfree(ind_refs_all);
+
+    /*
+     * Flush anything we see in WAL, to make sure that all changes committed
+     * while we were waiting for the exclusive lock are available for
+     * decoding. This should not be necessary if all backends had
+     * synchronous_commit set, but we can't rely on this setting.
+     *
+     * Unfortunately, GetInsertRecPtr() may lag behind the actual insert
+     * position, and GetLastImportantRecPtr() points at the start of the last
+     * record rather than at the end. Thus the simplest way to determine the
+     * insert position is to insert a dummy record and use its LSN.
+     *
+     * XXX Consider using GetLastImportantRecPtr() and adding the size of the
+     * last record (plus the total size of all the page headers the record
+     * spans)?
+     */
+    XLogBeginInsert();
+    XLogRegisterData(&dummy_rec_data, 1);
+    wal_insert_ptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP);
+    XLogFlush(wal_insert_ptr);
+    end_of_wal = GetFlushRecPtr(NULL);
+
+    /* Apply the concurrent changes again. */
+    process_concurrent_changes(ctx, end_of_wal, NewHeap,
+                               swap_toast_by_content ? OldHeap : NULL,
+                               ident_key, ident_key_nentries, iistate);
+
+    /* Remember info about rel before closing OldHeap */
+    relpersistence = OldHeap->rd_rel->relpersistence;
+    is_system_catalog = IsSystemRelation(OldHeap);
+
+    pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
+                                 PROGRESS_CLUSTER_PHASE_SWAP_REL_FILES);
+
+    forboth(lc, ind_oids_old, lc2, ind_oids_new)
+    {
+        Oid    ind_old = lfirst_oid(lc);
+        Oid    ind_new = lfirst_oid(lc2);
+        Oid            mapped_tables[4];
+
+        /* Zero out possible results from swapped_relation_files */
+        memset(mapped_tables, 0, sizeof(mapped_tables));
+
+        swap_relation_files(ind_old, ind_new,
+                            (old_table_oid == RelationRelationId),
+                            swap_toast_by_content,
+                            true,
+                            InvalidTransactionId,
+                            InvalidMultiXactId,
+                            mapped_tables);
+
+#ifdef USE_ASSERT_CHECKING
+        /*
+         * Concurrent processing is not supported for system relations, so
+         * there should be no mapped tables.
+         */
+        for (int i = 0; i < 4; i++)
+            Assert(mapped_tables[i] == 0);
+#endif
+    }
+
+    /* The new indexes must be visible for deletion. */
+    CommandCounterIncrement();
+
+    /* Close the old heap but keep lock until transaction commit. */
+    table_close(OldHeap, NoLock);
+    /* Close the new heap. (We didn't have to open its indexes). */
+    table_close(NewHeap, NoLock);
+
+    /* Cleanup what we don't need anymore. (And close the identity index.) */
+    pfree(ident_key);
+    free_index_insert_state(iistate);
+
+    /*
+     * Swap the relations and their TOAST relations and TOAST indexes. This
+     * also drops the new relation and its indexes.
+     *
+     * (System catalogs are currently not supported.)
+     */
+    Assert(!is_system_catalog);
+    finish_heap_swap(old_table_oid, new_table_oid,
+                     is_system_catalog,
+                     swap_toast_by_content,
+                     false, true, false,
+                     frozenXid, cutoffMulti,
+                     relpersistence);
+
+    pfree(rri);
+}
+
+/*
+ * Build indexes on NewHeap according to those on OldHeap.
+ *
+ * OldIndexes is the list of index OIDs on OldHeap.
+ *
+ * A list of OIDs of the corresponding indexes created on NewHeap is
+ * returned. The order of items does match, so we can use these arrays to swap
+ * index storage.
+ */
+static List *
+build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
+{
+    StringInfo    ind_name;
+    ListCell    *lc;
+    List       *result = NIL;
+
+    pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
+                                 PROGRESS_CLUSTER_PHASE_REBUILD_INDEX);
+
+    ind_name = makeStringInfo();
+
+    foreach(lc, OldIndexes)
+    {
+        Oid            ind_oid,
+                    ind_oid_new,
+                    tbsp_oid;
+        Relation    ind;
+        IndexInfo  *ind_info;
+        int            i,
+                    heap_col_id;
+        List       *colnames;
+        int16        indnatts;
+        Oid           *collations,
+                   *opclasses;
+        HeapTuple    tup;
+        bool        isnull;
+        Datum        d;
+        oidvector  *oidvec;
+        int2vector *int2vec;
+        size_t        oid_arr_size;
+        size_t        int2_arr_size;
+        int16       *indoptions;
+        text       *reloptions = NULL;
+        bits16        flags;
+        Datum        *opclassOptions;
+        NullableDatum *stattargets;
+
+        ind_oid = lfirst_oid(lc);
+        ind = index_open(ind_oid, AccessShareLock);
+        ind_info = BuildIndexInfo(ind);
+
+        tbsp_oid = ind->rd_rel->reltablespace;
+        /*
+         * Index name really doesn't matter, we'll eventually use only their
+         * storage. Just make them unique within the table.
+         */
+        resetStringInfo(ind_name);
+        appendStringInfo(ind_name, "ind_%d",
+                         list_cell_number(OldIndexes, lc));
+
+        flags = 0;
+        if (ind->rd_index->indisprimary)
+            flags |= INDEX_CREATE_IS_PRIMARY;
+
+        colnames = NIL;
+        indnatts = ind->rd_index->indnatts;
+        oid_arr_size = sizeof(Oid) * indnatts;
+        int2_arr_size = sizeof(int16) * indnatts;
+
+        collations = (Oid *) palloc(oid_arr_size);
+        for (i = 0; i < indnatts; i++)
+        {
+            char       *colname;
+
+            heap_col_id = ind->rd_index->indkey.values[i];
+            if (heap_col_id > 0)
+            {
+                Form_pg_attribute att;
+
+                /* Normal attribute. */
+                att = TupleDescAttr(OldHeap->rd_att, heap_col_id - 1);
+                colname = pstrdup(NameStr(att->attname));
+                collations[i] = att->attcollation;
+            }
+            else if (heap_col_id == 0)
+            {
+                HeapTuple    tuple;
+                Form_pg_attribute att;
+
+                /*
+                 * Expression column is not present in relcache. What we need
+                 * here is an attribute of the *index* relation.
+                 */
+                tuple = SearchSysCache2(ATTNUM,
+                                        ObjectIdGetDatum(ind_oid),
+                                        Int16GetDatum(i + 1));
+                if (!HeapTupleIsValid(tuple))
+                    elog(ERROR,
+                         "cache lookup failed for attribute %d of relation %u",
+                         i + 1, ind_oid);
+                att = (Form_pg_attribute) GETSTRUCT(tuple);
+                colname = pstrdup(NameStr(att->attname));
+                collations[i] = att->attcollation;
+                ReleaseSysCache(tuple);
+            }
+            else
+                elog(ERROR, "Unexpected column number: %d",
+                     heap_col_id);
+
+            colnames = lappend(colnames, colname);
+        }
+
+        /*
+         * Special effort needed for variable length attributes of
+         * Form_pg_index.
+         */
+        tup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(ind_oid));
+        if (!HeapTupleIsValid(tup))
+            elog(ERROR, "cache lookup failed for index %u", ind_oid);
+        d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indclass, &isnull);
+        Assert(!isnull);
+        oidvec = (oidvector *) DatumGetPointer(d);
+        opclasses = (Oid *) palloc(oid_arr_size);
+        memcpy(opclasses, oidvec->values, oid_arr_size);
+
+        d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indoption,
+                            &isnull);
+        Assert(!isnull);
+        int2vec = (int2vector *) DatumGetPointer(d);
+        indoptions = (int16 *) palloc(int2_arr_size);
+        memcpy(indoptions, int2vec->values, int2_arr_size);
+        ReleaseSysCache(tup);
+
+        tup = SearchSysCache1(RELOID, ObjectIdGetDatum(ind_oid));
+        if (!HeapTupleIsValid(tup))
+            elog(ERROR, "cache lookup failed for index relation %u", ind_oid);
+        d = SysCacheGetAttr(RELOID, tup, Anum_pg_class_reloptions, &isnull);
+        reloptions = !isnull ? DatumGetTextPCopy(d) : NULL;
+        ReleaseSysCache(tup);
+
+        opclassOptions = palloc0(sizeof(Datum) * ind_info->ii_NumIndexAttrs);
+        for (i = 0; i < ind_info->ii_NumIndexAttrs; i++)
+            opclassOptions[i] = get_attoptions(ind_oid, i + 1);
+
+        stattargets = get_index_stattargets(ind_oid, ind_info);
+
+        /*
+         * Neither parentIndexRelid nor parentConstraintId needs to be passed
+         * since the new catalog entries (pg_constraint, pg_inherits) would
+         * eventually be dropped. Therefore there's no need to record valid
+         * dependency on parents.
+         */
+        ind_oid_new = index_create(NewHeap,
+                                   ind_name->data,
+                                   InvalidOid,
+                                   InvalidOid,    /* parentIndexRelid */
+                                   InvalidOid,    /* parentConstraintId */
+                                   InvalidOid,
+                                   ind_info,
+                                   colnames,
+                                   ind->rd_rel->relam,
+                                   tbsp_oid,
+                                   collations,
+                                   opclasses,
+                                   opclassOptions,
+                                   indoptions,
+                                   stattargets,
+                                   PointerGetDatum(reloptions),
+                                   flags,    /* flags */
+                                   0,    /* constr_flags */
+                                   false,    /* allow_system_table_mods */
+                                   false,    /* is_internal */
+                                   NULL /* constraintId */
+            );
+        result = lappend_oid(result, ind_oid_new);
+
+        index_close(ind, AccessShareLock);
+        list_free_deep(colnames);
+        pfree(collations);
+        pfree(opclasses);
+        pfree(indoptions);
+        if (reloptions)
+            pfree(reloptions);
+    }
+
+    return result;
+}
+
+static void
+init_rel_reopen_info(RelReopenInfo *rri, Relation *rel_p, Oid relid,
+                     LOCKMODE lockmode_orig, LOCKMODE lockmode_new)
+{
+    rri->rel_p = rel_p;
+    rri->relid = relid;
+    rri->lockmode_orig = lockmode_orig;
+    rri->lockmode_new = lockmode_new;
+}
+
+/*
+ * Unlock and close relations specified by items of the 'rels' array. 'nrels'
+ * is the number of items.
+ *
+ * Information needed to (re)open the relations (or to issue meaningful ERROR)
+ * is added to the array items.
+ */
+static void
+unlock_and_close_relations(RelReopenInfo *rels, int nrel)
+{
+    int        i;
+    RelReopenInfo    *rri;
+
+    /*
+     * First, retrieve the information that we will need for re-opening.
+     *
+     * We could close (and unlock) each relation as soon as we have gathered
+     * the related information, but then we would have to be careful not to
+     * unlock the table until we have the info on all its indexes. (Once we
+     * unlock the table, any index can be dropped, and thus we can fail to get
+     * the name we want to report if re-opening fails.) It seem simpler to
+     * separate the work into two iterations.
+     */
+    for (i = 0; i < nrel; i++)
+    {
+        Relation    rel;
+
+        rri = &rels[i];
+        rel = *rri->rel_p;
+
+        if (rel)
+        {
+            Assert(CheckRelationLockedByMe(rel, rri->lockmode_orig, false));
+            Assert(!OidIsValid(rri->relid));
+
+            rri->relid = RelationGetRelid(rel);
+            rri->relkind = rel->rd_rel->relkind;
+            rri->relname = pstrdup(RelationGetRelationName(rel));
+        }
+        else
+        {
+            Assert(OidIsValid(rri->relid));
+
+            rri->relname = get_rel_name(rri->relid);
+            rri->relkind = get_rel_relkind(rri->relid);
+        }
+    }
+
+    /* Second, close the relations. */
+    for (i = 0; i < nrel; i++)
+    {
+        Relation    rel;
+
+        rri = &rels[i];
+        rel = *rri->rel_p;
+
+        /* Close the relation if the caller passed one. */
+        if (rel)
+        {
+            if (rri->relkind == RELKIND_RELATION)
+                table_close(rel, rri->lockmode_orig);
+            else
+            {
+                Assert(rri->relkind == RELKIND_INDEX);
+
+                index_close(rel, rri->lockmode_orig);
+            }
+        }
+    }
+}
+
+/*
+ * Re-open the relations closed previously by unlock_and_close_relations().
+ */
+static void
+reopen_relations(RelReopenInfo *rels, int nrel)
+{
+    for (int i = 0; i < nrel; i++)
+    {
+        RelReopenInfo    *rri = &rels[i];
+        Relation    rel;
+
+        if (rri->relkind == RELKIND_RELATION)
+        {
+            rel = try_table_open(rri->relid, rri->lockmode_new);
+        }
+        else
+        {
+            Assert(rri->relkind == RELKIND_INDEX);
+
+            rel = try_index_open(rri->relid, rri->lockmode_new);
+        }
+
+        if (rel == NULL)
+        {
+            const char    *kind_str;
+
+            kind_str = (rri->relkind == RELKIND_RELATION) ? "table" : "index";
+            ereport(ERROR,
+                    (errmsg("could not open \%s \"%s\"", kind_str,
+                            rri->relname),
+                     errhint("the %s could have been dropped by another transaction",
+                             kind_str)));
+        }
+        *rri->rel_p = rel;
+
+        pfree(rri->relname);
+    }
+}
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 488ca950d9..af1945e1ed 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -873,7 +873,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 static void
 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
 {
-    finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
+    finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, true,
                      RecentXmin, ReadNextMultiXactId(), relpersistence);
 }
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 5d6151dad1..13f32ede92 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4395,6 +4395,16 @@ AlterTableInternal(Oid relid, List *cmds, bool recurse)
 
     rel = relation_open(relid, lockmode);
 
+    /*
+     * If lockmode allows, check if VACUUM FULL / CLUSTER CONCURRENTLY is in
+     * progress. If lockmode is too weak, cluster_rel() should detect
+     * incompatible DDLs executed by us.
+     *
+     * XXX We might skip the changes for DDLs which do not change the tuple
+     * descriptor.
+     */
+    check_for_concurrent_cluster(relid, lockmode);
+
     EventTriggerAlterTableRelid(relid);
 
     ATController(NULL, rel, cmds, recurse, lockmode, NULL);
@@ -5861,6 +5871,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
             finish_heap_swap(tab->relid, OIDNewHeap,
                              false, false, true,
                              !OidIsValid(tab->newTableSpace),
+                             true,
                              RecentXmin,
                              ReadNextMultiXactId(),
                              persistence);
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0bd000acc5..529c46c186 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -112,7 +112,8 @@ static void vac_truncate_clog(TransactionId frozenXID,
                               TransactionId lastSaneFrozenXid,
                               MultiXactId lastSaneMinMulti);
 static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
-                       BufferAccessStrategy bstrategy);
+                       BufferAccessStrategy bstrategy, bool isTopLevel,
+                       bool whole_database);
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
 static bool vac_tid_reaped(ItemPointer itemptr, void *state);
@@ -153,6 +154,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
     bool        analyze = false;
     bool        freeze = false;
     bool        full = false;
+    bool        concurrent = false;
     bool        disable_page_skipping = false;
     bool        process_main = true;
     bool        process_toast = true;
@@ -226,6 +228,8 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
             freeze = defGetBoolean(opt);
         else if (strcmp(opt->defname, "full") == 0)
             full = defGetBoolean(opt);
+        else if (strcmp(opt->defname, "concurrently") == 0)
+            concurrent = defGetBoolean(opt);
         else if (strcmp(opt->defname, "disable_page_skipping") == 0)
             disable_page_skipping = defGetBoolean(opt);
         else if (strcmp(opt->defname, "index_cleanup") == 0)
@@ -300,7 +304,7 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
         (skip_locked ? VACOPT_SKIP_LOCKED : 0) |
         (analyze ? VACOPT_ANALYZE : 0) |
         (freeze ? VACOPT_FREEZE : 0) |
-        (full ? VACOPT_FULL : 0) |
+        (full ? (concurrent ? VACOPT_FULL_CONCURRENT : VACOPT_FULL_EXCLUSIVE) : 0) |
         (disable_page_skipping ? VACOPT_DISABLE_PAGE_SKIPPING : 0) |
         (process_main ? VACOPT_PROCESS_MAIN : 0) |
         (process_toast ? VACOPT_PROCESS_TOAST : 0) |
@@ -380,6 +384,12 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
                      errmsg("ONLY_DATABASE_STATS cannot be specified with other VACUUM options")));
     }
 
+    /* This problem cannot be identified from the options. */
+    if (concurrent && !full)
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("CONCURRENTLY can only be specified with VACUUM FULL")));
+
     /*
      * All freeze ages are zero if the FREEZE option is given; otherwise pass
      * them as -1 which means to use the default values.
@@ -483,6 +493,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
     const char *stmttype;
     volatile bool in_outer_xact,
                 use_own_xacts;
+    bool        whole_database = false;
 
     Assert(params != NULL);
 
@@ -543,7 +554,15 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
         relations = newrels;
     }
     else
+    {
         relations = get_all_vacuum_rels(vac_context, params->options);
+        /*
+         * If all tables should be processed, the CONCURRENTLY option implies
+         * that we should skip system relations rather than raising ERRORs.
+         */
+        if (params->options & VACOPT_FULL_CONCURRENT)
+            whole_database = true;
+    }
 
     /*
      * Decide whether we need to start/commit our own transactions.
@@ -619,7 +638,8 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
 
             if (params->options & VACOPT_VACUUM)
             {
-                if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy))
+                if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy,
+                                isTopLevel, whole_database))
                     continue;
             }
 
@@ -1932,10 +1952,14 @@ vac_truncate_clog(TransactionId frozenXID,
 /*
  *    vacuum_rel() -- vacuum one heap relation
  *
- *        relid identifies the relation to vacuum.  If relation is supplied,
- *        use the name therein for reporting any failure to open/lock the rel;
- *        do not use it once we've successfully opened the rel, since it might
- *        be stale.
+ *        relid identifies the relation to vacuum.  If relation is supplied, use
+ *        the name therein for reporting any failure to open/lock the rel; do
+ *        not use it once we've successfully opened the rel, since it might be
+ *        stale.
+ *
+ *        If whole_database is true, we are processing all the relations of the
+ *        current database. In that case we might need to silently skip
+ *        relations which could otherwise cause ERROR.
  *
  *        Returns true if it's okay to proceed with a requested ANALYZE
  *        operation on this table.
@@ -1950,7 +1974,8 @@ vac_truncate_clog(TransactionId frozenXID,
  */
 static bool
 vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
-           BufferAccessStrategy bstrategy)
+           BufferAccessStrategy bstrategy, bool isTopLevel,
+           bool whole_database)
 {
     LOCKMODE    lmode;
     Relation    rel;
@@ -2013,10 +2038,11 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 
     /*
      * Determine the type of lock we want --- hard exclusive lock for a FULL
-     * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
-     * way, we can be sure that no other backend is vacuuming the same table.
+     * exclusive vacuum, but a weaker lock (ShareUpdateExclusiveLock) for
+     * concurrent vacuum. Either way, we can be sure that no other backend is
+     * vacuuming the same table.
      */
-    lmode = (params->options & VACOPT_FULL) ?
+    lmode = (params->options & VACOPT_FULL_EXCLUSIVE) ?
         AccessExclusiveLock : ShareUpdateExclusiveLock;
 
     /* open the relation and get the appropriate lock on it */
@@ -2031,6 +2057,39 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
         return false;
     }
 
+    /*
+     * Leave if the CONCURRENTLY option was passed, but the relation is not
+     * suitable for that. Note that we only skip such relations if the user
+     * wants to vacuum the whole database. In contrast, if he specified
+     * inappropriate relation(s) explicitly, the command will end up with
+     * ERROR.
+     */
+    if (whole_database && (params->options & VACOPT_FULL_CONCURRENT) &&
+        !check_relation_is_clusterable_concurrently(rel, DEBUG1,
+                                                    "VACUUM (FULL, CONCURRENTLY)"))
+    {
+        relation_close(rel, lmode);
+        PopActiveSnapshot();
+        CommitTransactionCommand();
+        return false;
+    }
+
+    /*
+     * Skip the relation if VACUUM FULL / CLUSTER CONCURRENTLY is in progress
+     * as it will drop the current storage of the relation.
+     *
+     * This check should not take place until we have a lock that prevents
+     * another backend from starting VACUUM FULL / CLUSTER CONCURRENTLY later.
+     */
+    Assert(lmode >= LOCK_CLUSTER_CONCURRENT);
+    if (is_concurrent_cluster_in_progress(relid))
+    {
+        relation_close(rel, lmode);
+        PopActiveSnapshot();
+        CommitTransactionCommand();
+        return false;
+    }
+
     /*
      * When recursing to a TOAST table, check privileges on the parent.  NB:
      * This is only safe to do because we hold a session lock on the main
@@ -2104,19 +2163,6 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
         return true;
     }
 
-    /*
-     * Get a session-level lock too. This will protect our access to the
-     * relation across multiple transactions, so that we can vacuum the
-     * relation's TOAST table (if any) secure in the knowledge that no one is
-     * deleting the parent relation.
-     *
-     * NOTE: this cannot block, even if someone else is waiting for access,
-     * because the lock manager knows that both lock requests are from the
-     * same process.
-     */
-    lockrelid = rel->rd_lockInfo.lockRelId;
-    LockRelationIdForSession(&lockrelid, lmode);
-
     /*
      * Set index_cleanup option based on index_cleanup reloption if it wasn't
      * specified in VACUUM command, or when running in an autovacuum worker
@@ -2169,6 +2215,30 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
     else
         toast_relid = InvalidOid;
 
+    /*
+     * Get a session-level lock too. This will protect our access to the
+     * relation across multiple transactions, so that we can vacuum the
+     * relation's TOAST table (if any) secure in the knowledge that no one is
+     * deleting the parent relation.
+     *
+     * NOTE: this cannot block, even if someone else is waiting for access,
+     * because the lock manager knows that both lock requests are from the
+     * same process.
+     */
+    if (OidIsValid(toast_relid))
+    {
+        /*
+         * You might worry that, in the VACUUM (FULL, CONCURRENTLY) case,
+         * cluster_rel() needs to release all the locks on the relation at
+         * some point, but this session lock makes it impossible. In fact,
+         * cluster_rel() will will eventually be called for the TOAST relation
+         * and raise ERROR because, in the concurrent mode, it cannot process
+         * TOAST relation alone anyway.
+         */
+        lockrelid = rel->rd_lockInfo.lockRelId;
+        LockRelationIdForSession(&lockrelid, lmode);
+    }
+
     /*
      * Switch to the table owner's userid, so that any index functions are run
      * as that user.  Also lock down security-restricted operations and
@@ -2196,11 +2266,22 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
         {
             ClusterParams cluster_params = {0};
 
+            /*
+             * Invalid toast_relid means that there is no session lock on the
+             * relation. Such a lock would be a problem because it would
+             * prevent cluster_rel() from releasing all locks when it tries to
+             * get AccessExclusiveLock.
+             */
+            Assert(!OidIsValid(toast_relid));
+
             if ((params->options & VACOPT_VERBOSE) != 0)
                 cluster_params.options |= CLUOPT_VERBOSE;
 
+            if ((params->options & VACOPT_FULL_CONCURRENT) != 0)
+                cluster_params.options |= CLUOPT_CONCURRENT;
+
             /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */
-            cluster_rel(rel, InvalidOid, &cluster_params);
+            cluster_rel(rel, InvalidOid, &cluster_params, isTopLevel);
 
             /*
              * cluster_rel() should have closed the relation, lock is kept
@@ -2249,13 +2330,15 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
         toast_vacuum_params.options |= VACOPT_PROCESS_MAIN;
         toast_vacuum_params.toast_parent = relid;
 
-        vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy);
+        vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy,
+                   isTopLevel, whole_database);
     }
 
     /*
      * Now release the session-level lock on the main table.
      */
-    UnlockRelationIdForSession(&lockrelid, lmode);
+    if (OidIsValid(toast_relid))
+        UnlockRelationIdForSession(&lockrelid, lmode);
 
     /* Report that we really did it. */
     return true;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index d687ceee33..066d96dea2 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -467,6 +467,57 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     TransactionId xid = XLogRecGetXid(buf->record);
     SnapBuild  *builder = ctx->snapshot_builder;
 
+    /*
+     * If the change is not intended for logical decoding, do not even
+     * establish transaction for it. This is particularly important if the
+     * record was generated by CLUSTER CONCURRENTLY because this command uses
+     * the original XID when doing changes in the new storage. The decoding
+     * subsystem probably does not expect to see the same transaction multiple
+     * times.
+     */
+    switch (info)
+    {
+        case XLOG_HEAP_INSERT:
+        {
+            xl_heap_insert    *rec;
+
+            rec = (xl_heap_insert *) XLogRecGetData(buf->record);
+            /*
+             * (This does happen when raw_heap_insert marks the TOAST record
+             * as HEAP_INSERT_NO_LOGICAL).
+             */
+            if ((rec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) == 0)
+                return;
+
+            break;
+        }
+
+        case XLOG_HEAP_HOT_UPDATE:
+        case XLOG_HEAP_UPDATE:
+        {
+            xl_heap_update    *rec;
+
+            rec = (xl_heap_update *) XLogRecGetData(buf->record);
+            if ((rec->flags &
+                 (XLH_UPDATE_CONTAINS_NEW_TUPLE |
+                  XLH_UPDATE_CONTAINS_OLD_TUPLE |
+                  XLH_UPDATE_CONTAINS_OLD_KEY)) == 0)
+                return;
+
+            break;
+        }
+
+        case XLOG_HEAP_DELETE:
+        {
+            xl_heap_delete    *rec;
+
+            rec = (xl_heap_delete *) XLogRecGetData(buf->record);
+            if (rec->flags & XLH_DELETE_NO_LOGICAL)
+                return;
+            break;
+        }
+    }
+
     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
     /*
@@ -903,13 +954,6 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
     xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
-    /*
-     * Ignore insert records without new tuples (this does happen when
-     * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
-     */
-    if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
-        return;
-
     /* only interested in our database */
     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     if (target_locator.dbOid != ctx->slot->data.database)
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e37e22f441..ed15a0b175 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -286,7 +286,7 @@ static bool ExportInProgress = false;
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
-static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
+static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn);
 
 static void SnapBuildFreeSnapshot(Snapshot snap);
 
@@ -481,12 +481,17 @@ SnapBuildSnapDecRefcount(Snapshot snap)
  * Build a new snapshot, based on currently committed catalog-modifying
  * transactions.
  *
+ * 'lsn' is the location of the commit record (of a catalog-changing
+ * transaction) that triggered creation of the snapshot. Pass
+ * InvalidXLogRecPtr for the transaction base snapshot or if it the user of
+ * the snapshot should not need the LSN.
+ *
  * In-progress transactions with catalog access are *not* allowed to modify
  * these snapshots; they have to copy them and fill in appropriate ->curcid
  * and ->subxip/subxcnt values.
  */
 static Snapshot
-SnapBuildBuildSnapshot(SnapBuild *builder)
+SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 {
     Snapshot    snapshot;
     Size        ssize;
@@ -554,6 +559,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
     snapshot->active_count = 0;
     snapshot->regd_count = 0;
     snapshot->snapXactCompletionCount = 0;
+    snapshot->lsn = lsn;
 
     return snapshot;
 }
@@ -569,10 +575,7 @@ Snapshot
 SnapBuildInitialSnapshot(SnapBuild *builder)
 {
     Snapshot    snap;
-    TransactionId xid;
     TransactionId safeXid;
-    TransactionId *newxip;
-    int            newxcnt = 0;
 
     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     Assert(builder->building_full_snapshot);
@@ -593,7 +596,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
     if (TransactionIdIsValid(MyProc->xmin))
         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
 
-    snap = SnapBuildBuildSnapshot(builder);
+    snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 
     /*
      * We know that snap->xmin is alive, enforced by the logical xmin
@@ -614,6 +617,47 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 
     MyProc->xmin = snap->xmin;
 
+    /* Convert the historic snapshot to MVCC snapshot. */
+    return SnapBuildMVCCFromHistoric(snap, true);
+}
+
+/*
+ * Build an MVCC snapshot for the initial data load performed by CLUSTER
+ * CONCURRENTLY command.
+ *
+ * The snapshot will only be used to scan one particular relation, which is
+ * treated like a catalog (therefore ->building_full_snapshot is not
+ * important), and the caller should already have a replication slot setup (so
+ * we do not set MyProc->xmin). XXX Do we yet need to add some restrictions?
+ */
+Snapshot
+SnapBuildInitialSnapshotForCluster(SnapBuild *builder)
+{
+    Snapshot    snap;
+
+    Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+    snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
+    return SnapBuildMVCCFromHistoric(snap, false);
+}
+
+/*
+ * Turn a historic MVCC snapshot into an ordinary MVCC snapshot.
+ *
+ * Pass true for 'in_place' if you don't care about modifying the source
+ * snapshot. If you need a new instance, and one that was allocated as a
+ * single chunk of memory, pass false.
+ */
+Snapshot
+SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place)
+{
+    TransactionId xid;
+    TransactionId *oldxip = snapshot->xip;
+    uint32        oldxcnt    = snapshot->xcnt;
+    TransactionId *newxip;
+    int            newxcnt = 0;
+    Snapshot    result;
+
     /* allocate in transaction context */
     newxip = (TransactionId *)
         palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
@@ -624,7 +668,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
      * classical snapshot by marking all non-committed transactions as
      * in-progress. This can be expensive.
      */
-    for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
+    for (xid = snapshot->xmin; NormalTransactionIdPrecedes(xid, snapshot->xmax);)
     {
         void       *test;
 
@@ -632,7 +676,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
          * Check whether transaction committed using the decoding snapshot
          * meaning of ->xip.
          */
-        test = bsearch(&xid, snap->xip, snap->xcnt,
+        test = bsearch(&xid, snapshot->xip, snapshot->xcnt,
                        sizeof(TransactionId), xidComparator);
 
         if (test == NULL)
@@ -649,11 +693,22 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
     }
 
     /* adjust remaining snapshot fields as needed */
-    snap->snapshot_type = SNAPSHOT_MVCC;
-    snap->xcnt = newxcnt;
-    snap->xip = newxip;
+    snapshot->xcnt = newxcnt;
+    snapshot->xip = newxip;
+
+    if (in_place)
+        result = snapshot;
+    else
+    {
+        result = CopySnapshot(snapshot);
+
+        /* Restore the original values so the source is intact. */
+        snapshot->xip = oldxip;
+        snapshot->xcnt = oldxcnt;
+    }
+    result->snapshot_type = SNAPSHOT_MVCC;
 
-    return snap;
+    return result;
 }
 
 /*
@@ -712,7 +767,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     /* only build a new snapshot if we don't have a prebuilt one */
     if (builder->snapshot == NULL)
     {
-        builder->snapshot = SnapBuildBuildSnapshot(builder);
+        builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
         /* increase refcount for the snapshot builder */
         SnapBuildSnapIncRefcount(builder->snapshot);
     }
@@ -792,7 +847,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
         /* only build a new snapshot if we don't have a prebuilt one */
         if (builder->snapshot == NULL)
         {
-            builder->snapshot = SnapBuildBuildSnapshot(builder);
+            builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
             /* increase refcount for the snapshot builder */
             SnapBuildSnapIncRefcount(builder->snapshot);
         }
@@ -1161,7 +1216,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
         if (builder->snapshot)
             SnapBuildSnapDecRefcount(builder->snapshot);
 
-        builder->snapshot = SnapBuildBuildSnapshot(builder);
+        builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 
         /* we might need to execute invalidations, add snapshot */
         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@@ -1989,7 +2044,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
     {
         SnapBuildSnapDecRefcount(builder->snapshot);
     }
-    builder->snapshot = SnapBuildBuildSnapshot(builder);
+    builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
     SnapBuildSnapIncRefcount(builder->snapshot);
 
     ReorderBufferSetRestartPoint(builder->reorder, lsn);
diff --git a/src/backend/replication/pgoutput_cluster/Makefile b/src/backend/replication/pgoutput_cluster/Makefile
new file mode 100644
index 0000000000..31471bb546
--- /dev/null
+++ b/src/backend/replication/pgoutput_cluster/Makefile
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/pgoutput_cluster
+#
+# IDENTIFICATION
+#    src/backend/replication/pgoutput_cluster
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/pgoutput_cluster
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+    $(WIN32RES) \
+    pgoutput_cluster.o
+PGFILEDESC = "pgoutput_cluster - logical replication output plugin for CLUSTER command"
+NAME = pgoutput_cluster
+
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean: clean-lib
+    rm -f $(OBJS)
diff --git a/src/backend/replication/pgoutput_cluster/meson.build
b/src/backend/replication/pgoutput_cluster/meson.build
new file mode 100644
index 0000000000..0f033064f2
--- /dev/null
+++ b/src/backend/replication/pgoutput_cluster/meson.build
@@ -0,0 +1,18 @@
+# Copyright (c) 2022-2024, PostgreSQL Global Development Group
+
+pgoutput_cluster_sources = files(
+  'pgoutput_cluster.c',
+)
+
+if host_system == 'windows'
+  pgoutput_cluster_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'pgoutput_cluster',
+    '--FILEDESC', 'pgoutput_cluster - logical replication output plugin for CLUSTER command',])
+endif
+
+pgoutput_cluster = shared_module('pgoutput_cluster',
+  pgoutput_cluster_sources,
+  kwargs: pg_mod_args,
+)
+
+backend_targets += pgoutput_cluster
diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
new file mode 100644
index 0000000000..9fe44017a8
--- /dev/null
+++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
@@ -0,0 +1,321 @@
+/* TODO Move into src/backend/cluster/ (and rename?) */
+/*-------------------------------------------------------------------------
+ *
+ * pgoutput_cluster.c
+ *        Logical Replication output plugin for CLUSTER command
+ *
+ * Copyright (c) 2012-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *          src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "commands/cluster.h"
+#include "replication/snapbuild.h"
+
+PG_MODULE_MAGIC;
+
+static void plugin_startup(LogicalDecodingContext *ctx,
+                           OutputPluginOptions *opt, bool is_init);
+static void plugin_shutdown(LogicalDecodingContext *ctx);
+static void plugin_begin_txn(LogicalDecodingContext *ctx,
+                             ReorderBufferTXN *txn);
+static void plugin_commit_txn(LogicalDecodingContext *ctx,
+                              ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                          Relation rel, ReorderBufferChange *change);
+static void plugin_truncate(struct LogicalDecodingContext *ctx,
+                            ReorderBufferTXN *txn, int nrelations,
+                            Relation relations[],
+                            ReorderBufferChange *change);
+static void store_change(LogicalDecodingContext *ctx,
+                         ConcurrentChangeKind kind, HeapTuple tuple,
+                         TransactionId xid);
+
+void
+_PG_output_plugin_init(OutputPluginCallbacks *cb)
+{
+    AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
+
+    cb->startup_cb = plugin_startup;
+    cb->begin_cb = plugin_begin_txn;
+    cb->change_cb = plugin_change;
+    cb->truncate_cb = plugin_truncate;
+    cb->commit_cb = plugin_commit_txn;
+    cb->shutdown_cb = plugin_shutdown;
+}
+
+
+/* initialize this plugin */
+static void
+plugin_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
+               bool is_init)
+{
+    ctx->output_plugin_private = NULL;
+
+    /* Probably unnecessary, as we don't use the SQL interface ... */
+    opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+
+    if (ctx->output_plugin_options != NIL)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("This plugin does not expect any options")));
+    }
+}
+
+static void
+plugin_shutdown(LogicalDecodingContext *ctx)
+{
+}
+
+/*
+ * As we don't release the slot during processing of particular table, there's
+ * no room for SQL interface, even for debugging purposes. Therefore we need
+ * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
+ * callbacks. (Although we might want to write custom callbacks, this API
+ * seems to be unnecessarily generic for our purposes.)
+ */
+
+/* BEGIN callback */
+static void
+plugin_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+}
+
+/* COMMIT callback */
+static void
+plugin_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                  XLogRecPtr commit_lsn)
+{
+}
+
+/*
+ * Callback for individual changed tuples
+ */
+static void
+plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+              Relation relation, ReorderBufferChange *change)
+{
+    ClusterDecodingState *dstate;
+    Snapshot    snapshot;
+
+    dstate = (ClusterDecodingState *) ctx->output_writer_private;
+
+    /* Only interested in one particular relation. */
+    if (relation->rd_id != dstate->relid)
+        return;
+
+    /*
+     * Catalog snapshot is fine because the table we are processing is
+     * temporarily considered a user catalog table.
+     */
+    snapshot = GetCatalogSnapshot(InvalidOid);
+    Assert(snapshot->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
+    Assert(!snapshot->suboverflowed);
+
+    /*
+     * This should not happen, but if we don't have enough information to
+     * apply a new snapshot, the consequences would be bad. Thus prefer ERROR
+     * to Assert().
+     */
+    if (XLogRecPtrIsInvalid(snapshot->lsn))
+        ereport(ERROR, (errmsg("snapshot has invalid LSN")));
+
+    /*
+     * reorderbuffer.c changes the catalog snapshot as soon as it sees a new
+     * CID or a commit record of a catalog-changing transaction.
+     */
+    if (dstate->snapshot == NULL || snapshot->lsn != dstate->snapshot_lsn ||
+        snapshot->curcid != dstate->snapshot->curcid)
+    {
+        /* CID should not go backwards. */
+        Assert(dstate->snapshot == NULL ||
+               snapshot->curcid >= dstate->snapshot->curcid);
+
+        /*
+         * XXX Is it a problem that the copy is created in
+         * TopTransactionContext?
+         */
+        dstate->snapshot = SnapBuildMVCCFromHistoric(snapshot, false);
+        dstate->snapshot_lsn = snapshot->lsn;
+    }
+
+    /* Decode entry depending on its type */
+    switch (change->action)
+    {
+        case REORDER_BUFFER_CHANGE_INSERT:
+            {
+                HeapTuple    newtuple;
+
+                newtuple = change->data.tp.newtuple != NULL ?
+                    change->data.tp.newtuple : NULL;
+
+                /*
+                 * Identity checks in the main function should have made this
+                 * impossible.
+                 */
+                if (newtuple == NULL)
+                    elog(ERROR, "Incomplete insert info.");
+
+                store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
+            }
+            break;
+        case REORDER_BUFFER_CHANGE_UPDATE:
+            {
+                HeapTuple    oldtuple,
+                            newtuple;
+
+                oldtuple = change->data.tp.oldtuple != NULL ?
+                    change->data.tp.oldtuple : NULL;
+                newtuple = change->data.tp.newtuple != NULL ?
+                    change->data.tp.newtuple : NULL;
+
+                if (newtuple == NULL)
+                    elog(ERROR, "Incomplete update info.");
+
+                if (oldtuple != NULL)
+                    store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
+                                 change->txn->xid);
+
+                store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
+                             change->txn->xid);
+            }
+            break;
+        case REORDER_BUFFER_CHANGE_DELETE:
+            {
+                HeapTuple    oldtuple;
+
+                oldtuple = change->data.tp.oldtuple ?
+                    change->data.tp.oldtuple : NULL;
+
+                if (oldtuple == NULL)
+                    elog(ERROR, "Incomplete delete info.");
+
+                store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
+            }
+            break;
+        default:
+            /* Should not come here */
+            Assert(false);
+            break;
+    }
+}
+
+static void
+plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                int nrelations, Relation relations[],
+                ReorderBufferChange *change)
+{
+    ClusterDecodingState *dstate;
+    int        i;
+    Relation    relation = NULL;
+
+    dstate = (ClusterDecodingState *) ctx->output_writer_private;
+
+    /* Find the relation we are processing. */
+    for (i = 0; i < nrelations; i++)
+    {
+        relation = relations[i];
+
+        if (RelationGetRelid(relation) == dstate->relid)
+            break;
+    }
+
+    /* Is this truncation of another relation? */
+    if (i == nrelations)
+        return;
+
+    store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
+}
+
+/* Store concurrent data change. */
+static void
+store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
+             HeapTuple tuple, TransactionId xid)
+{
+    ClusterDecodingState *dstate;
+    char       *change_raw;
+    ConcurrentChange *change;
+    bool        flattened = false;
+    Size        size;
+    Datum        values[1];
+    bool        isnull[1];
+    char       *dst;
+
+    dstate = (ClusterDecodingState *) ctx->output_writer_private;
+
+    size = MAXALIGN(VARHDRSZ) + sizeof(ConcurrentChange);
+
+    if (tuple)
+    {
+        /*
+         * ReorderBufferCommit() stores the TOAST chunks in its private memory
+         * context and frees them after having called
+         * apply_change(). Therefore we need flat copy (including TOAST) that
+         * we eventually copy into the memory context which is available to
+         * decode_concurrent_changes().
+         */
+        if (HeapTupleHasExternal(tuple))
+        {
+            /*
+             * toast_flatten_tuple_to_datum() might be more convenient but we
+             * don't want the decompression it does.
+             */
+            tuple = toast_flatten_tuple(tuple, dstate->tupdesc);
+            flattened = true;
+        }
+
+        size += tuple->t_len;
+    }
+
+    /* XXX Isn't there any function / macro to do this? */
+    if (size >= 0x3FFFFFFF)
+        elog(ERROR, "Change is too big.");
+
+    /* Construct the change. */
+    change_raw = (char *) palloc0(size);
+    SET_VARSIZE(change_raw, size);
+    change = (ConcurrentChange *) VARDATA(change_raw);
+    change->kind = kind;
+
+    /* No other information is needed for TRUNCATE. */
+    if (change->kind == CHANGE_TRUNCATE)
+        goto store;
+
+    /*
+     * Copy the tuple.
+     *
+     * CAUTION: change->tup_data.t_data must be fixed on retrieval!
+     */
+    memcpy(&change->tup_data, tuple, sizeof(HeapTupleData));
+    dst = (char *) change + sizeof(ConcurrentChange);
+    memcpy(dst, tuple->t_data, tuple->t_len);
+
+    /* Initialize the other fields. */
+    change->xid = xid;
+    change->snapshot = dstate->snapshot;
+    dstate->snapshot->active_count++;
+
+    /* The data has been copied. */
+    if (flattened)
+        pfree(tuple);
+
+store:
+    /* Store as tuple of 1 bytea column. */
+    values[0] = PointerGetDatum(change_raw);
+    isnull[0] = false;
+    tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change,
+                         values, isnull);
+
+    /* Accounting. */
+    dstate->nchanges++;
+
+    /* Cleanup. */
+    pfree(change_raw);
+}
+
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2100150f01..a84de0611a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/cluster.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
     size = add_size(size, WaitEventCustomShmemSize());
     size = add_size(size, InjectionPointShmemSize());
     size = add_size(size, SlotSyncShmemSize());
+    size = add_size(size, ClusterShmemSize());
 #ifdef EXEC_BACKEND
     size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -357,6 +359,7 @@ CreateOrAttachShmemStructs(void)
     StatsShmemInit();
     WaitEventCustomShmemInit();
     InjectionPointShmemInit();
+    ClusterShmemInit();
 }
 
 /*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index fa66b8017e..a6dda9b520 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1299,6 +1299,17 @@ ProcessUtilitySlow(ParseState *pstate,
                     lockmode = AlterTableGetLockLevel(atstmt->cmds);
                     relid = AlterTableLookupRelation(atstmt, lockmode);
 
+                    /*
+                     * If lockmode allows, check if VACUUM FULL / CLUSTER
+                     * CONCURRENT is in progress. If lockmode is too weak,
+                     * cluster_rel() should detect incompatible DDLs executed
+                     * by us.
+                     *
+                     * XXX We might skip the changes for DDLs which do not
+                     * change the tuple descriptor.
+                     */
+                    check_for_concurrent_cluster(relid, lockmode);
+
                     if (OidIsValid(relid))
                     {
                         AlterTableUtilityContext atcontext;
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index e7c8bfba94..c52ec92a97 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -163,3 +163,19 @@ pgstat_progress_end_command(void)
     beentry->st_progress.command_target = InvalidOid;
     PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
+
+void
+pgstat_progress_restore_state(PgBackendProgress *backup)
+{
+    volatile PgBackendStatus *beentry = MyBEEntry;
+
+    if (!beentry || !pgstat_track_activities)
+        return;
+
+    PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+    beentry->st_progress.command = backup->command;
+    beentry->st_progress.command_target = backup->command_target;
+    memcpy(MyBEEntry->st_progress.param, backup->param,
+           sizeof(beentry->st_progress.param));
+    PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index db37beeaae..8245be7846 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -345,6 +345,7 @@ WALSummarizer    "Waiting to read or update WAL summarization state."
 DSMRegistry    "Waiting to read or update the dynamic shared memory registry."
 InjectionPoint    "Waiting to read or update information related to injection points."
 SerialControl    "Waiting to read or update shared <filename>pg_serial</filename> state."
+ClusteredRels    "Waiting to read or update information on tables being clustered concurrently."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 603aa4157b..5a2d5d6138 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -1373,6 +1373,28 @@ CacheInvalidateRelcache(Relation relation)
     RegisterRelcacheInvalidation(databaseId, relationId);
 }
 
+/*
+ * CacheInvalidateRelcacheImmediate
+ *        Send invalidation message for the specified relation's relcache entry.
+ *
+ * Currently this is used in VACUUM FULL/CLUSTER CONCURRENTLY, to make sure
+ * that other backends are aware that the command is being executed for the
+ * relation.
+ */
+void
+CacheInvalidateRelcacheImmediate(Relation relation)
+{
+    SharedInvalidationMessage msg;
+
+    msg.rc.id = SHAREDINVALRELCACHE_ID;
+    msg.rc.dbId = MyDatabaseId;
+    msg.rc.relId = RelationGetRelid(relation);
+    /* check AddCatcacheInvalidationMessage() for an explanation */
+    VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+    SendSharedInvalidMessages(&msg, 1);
+}
+
 /*
  * CacheInvalidateRelcacheAll
  *        Register invalidation of the whole relcache at the end of command.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 66ed24e401..708d1ee27a 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -64,6 +64,7 @@
 #include "catalog/pg_type.h"
 #include "catalog/schemapg.h"
 #include "catalog/storage.h"
+#include "commands/cluster.h"
 #include "commands/policy.h"
 #include "commands/publicationcmds.h"
 #include "commands/trigger.h"
@@ -1257,6 +1258,10 @@ retry:
     /* make sure relation is marked as having no open file yet */
     relation->rd_smgr = NULL;
 
+    /* Is CLUSTER CONCURRENTLY in progress? */
+    relation->rd_cluster_concurrent =
+        is_concurrent_cluster_in_progress(targetRelId);
+
     /*
      * now we can free the memory allocated for pg_class_tuple
      */
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 7d2b34d4f2..6be0fef84c 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -155,9 +155,7 @@ typedef struct ExportedSnapshot
 static List *exportedSnapshots = NIL;
 
 /* Prototypes for local functions */
-static Snapshot CopySnapshot(Snapshot snapshot);
 static void UnregisterSnapshotNoOwner(Snapshot snapshot);
-static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
 
 /* ResourceOwner callbacks to track snapshot references */
@@ -570,7 +568,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
  * The copy is palloc'd in TopTransactionContext and has initial refcounts set
  * to 0.  The returned snapshot has the copied flag set.
  */
-static Snapshot
+Snapshot
 CopySnapshot(Snapshot snapshot)
 {
     Snapshot    newsnap;
@@ -626,7 +624,7 @@ CopySnapshot(Snapshot snapshot)
  * FreeSnapshot
  *        Free the memory associated with a snapshot.
  */
-static void
+void
 FreeSnapshot(Snapshot snapshot)
 {
     Assert(snapshot->regd_count == 0);
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d453e224d9..6cab6ed5ee 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2787,7 +2787,7 @@ psql_completion(const char *text, int start, int end)
          * one word, so the above test is correct.
          */
         if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
-            COMPLETE_WITH("VERBOSE");
+            COMPLETE_WITH("VERBOSE", "CONCURRENTLY");
     }
 
 /* COMMENT */
@@ -4764,7 +4764,8 @@ psql_completion(const char *text, int start, int end)
                           "DISABLE_PAGE_SKIPPING", "SKIP_LOCKED",
                           "INDEX_CLEANUP", "PROCESS_MAIN", "PROCESS_TOAST",
                           "TRUNCATE", "PARALLEL", "SKIP_DATABASE_STATS",
-                          "ONLY_DATABASE_STATS", "BUFFER_USAGE_LIMIT");
+                          "ONLY_DATABASE_STATS", "BUFFER_USAGE_LIMIT",
+                          "CONCURRENTLY");
         else if
(TailMatches("FULL|FREEZE|ANALYZE|VERBOSE|DISABLE_PAGE_SKIPPING|SKIP_LOCKED|PROCESS_MAIN|PROCESS_TOAST|TRUNCATE|SKIP_DATABASE_STATS|ONLY_DATABASE_STATS"))
             COMPLETE_WITH("ON", "OFF");
         else if (TailMatches("INDEX_CLEANUP"))
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..e87eb2f861 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -316,21 +316,24 @@ extern BulkInsertState GetBulkInsertState(void);
 extern void FreeBulkInsertState(BulkInsertState);
 extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
-extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-                        int options, BulkInsertState bistate);
+extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+                        CommandId cid, int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
                               int ntuples, CommandId cid, int options,
                               BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
-                             CommandId cid, Snapshot crosscheck, bool wait,
-                             struct TM_FailureData *tmfd, bool changingPart);
+                             TransactionId xid, CommandId cid,
+                             Snapshot crosscheck, bool wait,
+                             struct TM_FailureData *tmfd, bool changingPart,
+                             bool wal_logical);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
-                             HeapTuple newtup,
+                             HeapTuple newtup, TransactionId xid,
                              CommandId cid, Snapshot crosscheck, bool wait,
                              struct TM_FailureData *tmfd, LockTupleMode *lockmode,
-                             TU_UpdateIndexes *update_indexes);
+                             TU_UpdateIndexes *update_indexes,
+                             bool wal_logical);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
                                  CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
                                  bool follow_updates,
@@ -405,6 +408,10 @@ extern HTSV_Result HeapTupleSatisfiesVacuumHorizon(HeapTuple htup, Buffer buffer
                                                    TransactionId *dead_after);
 extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
                                  uint16 infomask, TransactionId xid);
+extern bool HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot,
+                                  Buffer buffer);
+extern bool HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot,
+                                    Buffer buffer);
 extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple);
 extern bool HeapTupleIsSurelyDead(HeapTuple htup,
                                   struct GlobalVisState *vistest);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 42736f37e7..1c5cb7c728 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -103,6 +103,8 @@
 #define XLH_DELETE_CONTAINS_OLD_KEY                (1<<2)
 #define XLH_DELETE_IS_SUPER                        (1<<3)
 #define XLH_DELETE_IS_PARTITION_MOVE            (1<<4)
+/* See heap_delete() */
+#define XLH_DELETE_NO_LOGICAL                    (1<<5)
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLH_DELETE_CONTAINS_OLD                        \
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index da661289c1..1380ba81fc 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,6 +21,7 @@
 #include "access/sdir.h"
 #include "access/xact.h"
 #include "executor/tuptable.h"
+#include "replication/logical.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
@@ -630,6 +631,8 @@ typedef struct TableAmRoutine
                                               Relation OldIndex,
                                               bool use_sort,
                                               TransactionId OldestXmin,
+                                              Snapshot snapshot,
+                                              LogicalDecodingContext *decoding_ctx,
                                               TransactionId *xid_cutoff,
                                               MultiXactId *multi_cutoff,
                                               double *num_tuples,
@@ -1667,6 +1670,10 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator)
  *   not needed for the relation's AM
  * - *xid_cutoff - ditto
  * - *multi_cutoff - ditto
+ * - snapshot - if != NULL, ignore data changes done by transactions that this
+ *     (MVCC) snapshot considers still in-progress or in the future.
+ * - decoding_ctx - logical decoding context, to capture concurrent data
+ *   changes.
  *
  * Output parameters:
  * - *xid_cutoff - rel's new relfrozenxid value, may be invalid
@@ -1679,6 +1686,8 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
                                 Relation OldIndex,
                                 bool use_sort,
                                 TransactionId OldestXmin,
+                                Snapshot snapshot,
+                                LogicalDecodingContext *decoding_ctx,
                                 TransactionId *xid_cutoff,
                                 MultiXactId *multi_cutoff,
                                 double *num_tuples,
@@ -1687,6 +1696,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
 {
     OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex,
                                                     use_sort, OldestXmin,
+                                                    snapshot, decoding_ctx,
                                                     xid_cutoff, multi_cutoff,
                                                     num_tuples, tups_vacuumed,
                                                     tups_recently_dead);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 6d4439f052..e0016631f6 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -476,6 +476,8 @@ extern Size EstimateTransactionStateSpace(void);
 extern void SerializeTransactionState(Size maxsize, char *start_address);
 extern void StartParallelWorkerTransaction(char *tstatespace);
 extern void EndParallelWorkerTransaction(void);
+extern void SetClusterCurrentXids(TransactionId *xip, int xcnt);
+extern void ResetClusterCurrentXids(void);
 extern bool IsTransactionBlock(void);
 extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7d434f8e65..77d522561b 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -99,6 +99,9 @@ extern Oid    index_concurrently_create_copy(Relation heapRelation,
                                            Oid tablespaceOid,
                                            const char *newName);
 
+extern NullableDatum *get_index_stattargets(Oid indexid,
+                                            IndexInfo *indInfo);
+
 extern void index_concurrently_build(Oid heapRelationId,
                                      Oid indexRelationId);
 
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 7492796ea2..f98b855f21 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -13,10 +13,15 @@
 #ifndef CLUSTER_H
 #define CLUSTER_H
 
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
+#include "replication/logical.h"
 #include "storage/lock.h"
+#include "storage/relfilelocator.h"
 #include "utils/relcache.h"
+#include "utils/resowner.h"
+#include "utils/tuplestore.h"
 
 
 /* flag bits for ClusterParams->options */
@@ -24,6 +29,7 @@
 #define CLUOPT_RECHECK 0x02        /* recheck relation state */
 #define CLUOPT_RECHECK_ISCLUSTERED 0x04 /* recheck relation state for
                                          * indisclustered */
+#define CLUOPT_CONCURRENT 0x08    /* allow concurrent data changes */
 
 /* options for CLUSTER */
 typedef struct ClusterParams
@@ -31,12 +37,114 @@ typedef struct ClusterParams
     bits32        options;        /* bitmask of CLUOPT_* */
 } ClusterParams;
 
+/*
+ * The following definitions are used for concurrent processing.
+ */
+
+/*
+ * Lock level for the concurrent variant of CLUSTER / VACUUM FULL.
+ *
+ * Like for lazy VACUUM, we choose the strongest lock that still allows
+ * INSERT, UPDATE and DELETE.
+ *
+ * Note that the lock needs to be released temporarily a few times during the
+ * processing. In such cases it should be checked after re-locking that the
+ * relation / index hasn't changed in the system catalog while the lock was
+ * not held.
+ */
+#define LOCK_CLUSTER_CONCURRENT    ShareUpdateExclusiveLock
+
+typedef enum
+{
+    CHANGE_INSERT,
+    CHANGE_UPDATE_OLD,
+    CHANGE_UPDATE_NEW,
+    CHANGE_DELETE,
+    CHANGE_TRUNCATE
+} ConcurrentChangeKind;
+
+typedef struct ConcurrentChange
+{
+    /* See the enum above. */
+    ConcurrentChangeKind kind;
+
+    /* Transaction that changes the data. */
+    TransactionId    xid;
+
+    /*
+     * Historic catalog snapshot that was used to decode this change.
+     */
+    Snapshot    snapshot;
+
+    /*
+     * The actual tuple.
+     *
+     * The tuple data follows the ConcurrentChange structure. Before use make
+     * sure the tuple is correctly aligned (ConcurrentChange can be stored as
+     * bytea) and that tuple->t_data is fixed.
+     */
+    HeapTupleData tup_data;
+} ConcurrentChange;
+
+/*
+ * Logical decoding state.
+ *
+ * Here we store the data changes that we decode from WAL while the table
+ * contents is being copied to a new storage. Also the necessary metadata
+ * needed to apply these changes to the table is stored here.
+ */
+typedef struct ClusterDecodingState
+{
+    /* The relation whose changes we're decoding. */
+    Oid            relid;
+
+    /*
+     * Decoded changes are stored here. Although we try to avoid excessive
+     * batches, it can happen that the changes need to be stored to disk. The
+     * tuplestore does this transparently.
+     */
+    Tuplestorestate *tstore;
+
+    /* The current number of changes in tstore. */
+    double        nchanges;
+
+    /*
+     * Descriptor to store the ConcurrentChange structure serialized (bytea).
+     * We can't store the tuple directly because tuplestore only supports
+     * minimum tuple and we may need to transfer OID system column from the
+     * output plugin. Also we need to transfer the change kind, so it's better
+     * to put everything in the structure than to use 2 tuplestores "in
+     * parallel".
+     */
+    TupleDesc    tupdesc_change;
+
+    /* Tuple descriptor needed to update indexes. */
+    TupleDesc    tupdesc;
+
+    /* Slot to retrieve data from tstore. */
+    TupleTableSlot *tsslot;
+
+    /*
+     * Historic catalog snapshot that was used to decode the most recent
+     * change.
+     */
+    Snapshot    snapshot;
+    /* LSN of the record  */
+    XLogRecPtr    snapshot_lsn;
+
+    ResourceOwner resowner;
+} ClusterDecodingState;
+
 extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel);
-extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params);
+extern void cluster_rel(Relation OldHeap, Oid indexOid,    ClusterParams *params,
+                        bool isTopLevel);
 extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
                                        LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
-
+extern bool check_relation_is_clusterable_concurrently(Relation rel, int elevel,
+                                                       const char *stmt);
+extern void cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
+                                              XLogRecPtr end_of_wal);
 extern Oid    make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
                           char relpersistence, LOCKMODE lockmode_old,
                           LOCKMODE *lockmode_new_p);
@@ -45,8 +153,13 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
                              bool swap_toast_by_content,
                              bool check_constraints,
                              bool is_internal,
+                             bool reindex,
                              TransactionId frozenXid,
                              MultiXactId cutoffMulti,
                              char newrelpersistence);
 
+extern Size ClusterShmemSize(void);
+extern void ClusterShmemInit(void);
+extern bool is_concurrent_cluster_in_progress(Oid relid);
+extern void check_for_concurrent_cluster(Oid relid, LOCKMODE lockmode);
 #endif                            /* CLUSTER_H */
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 5616d64523..03e3712ede 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -59,19 +59,22 @@
 #define PROGRESS_CLUSTER_PHASE                    1
 #define PROGRESS_CLUSTER_INDEX_RELID            2
 #define PROGRESS_CLUSTER_HEAP_TUPLES_SCANNED    3
-#define PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN    4
-#define PROGRESS_CLUSTER_TOTAL_HEAP_BLKS        5
-#define PROGRESS_CLUSTER_HEAP_BLKS_SCANNED        6
-#define PROGRESS_CLUSTER_INDEX_REBUILD_COUNT    7
+#define PROGRESS_CLUSTER_HEAP_TUPLES_INSERTED    4
+#define PROGRESS_CLUSTER_HEAP_TUPLES_UPDATED    5
+#define PROGRESS_CLUSTER_HEAP_TUPLES_DELETED    6
+#define PROGRESS_CLUSTER_TOTAL_HEAP_BLKS        7
+#define PROGRESS_CLUSTER_HEAP_BLKS_SCANNED        8
+#define PROGRESS_CLUSTER_INDEX_REBUILD_COUNT    9
 
 /* Phases of cluster (as advertised via PROGRESS_CLUSTER_PHASE) */
 #define PROGRESS_CLUSTER_PHASE_SEQ_SCAN_HEAP    1
 #define PROGRESS_CLUSTER_PHASE_INDEX_SCAN_HEAP    2
 #define PROGRESS_CLUSTER_PHASE_SORT_TUPLES        3
 #define PROGRESS_CLUSTER_PHASE_WRITE_NEW_HEAP    4
-#define PROGRESS_CLUSTER_PHASE_SWAP_REL_FILES    5
-#define PROGRESS_CLUSTER_PHASE_REBUILD_INDEX    6
-#define PROGRESS_CLUSTER_PHASE_FINAL_CLEANUP    7
+#define PROGRESS_CLUSTER_PHASE_CATCH_UP            5
+#define PROGRESS_CLUSTER_PHASE_SWAP_REL_FILES    6
+#define PROGRESS_CLUSTER_PHASE_REBUILD_INDEX    7
+#define PROGRESS_CLUSTER_PHASE_FINAL_CLEANUP    8
 
 /* Commands of PROGRESS_CLUSTER */
 #define PROGRESS_CLUSTER_COMMAND_CLUSTER        1
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 759f9a87d3..2f693e0fc0 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -181,13 +181,16 @@ typedef struct VacAttrStats
 #define VACOPT_ANALYZE 0x02        /* do ANALYZE */
 #define VACOPT_VERBOSE 0x04        /* output INFO instrumentation messages */
 #define VACOPT_FREEZE 0x08        /* FREEZE option */
-#define VACOPT_FULL 0x10        /* FULL (non-concurrent) vacuum */
-#define VACOPT_SKIP_LOCKED 0x20 /* skip if cannot get lock */
-#define VACOPT_PROCESS_MAIN 0x40    /* process main relation */
-#define VACOPT_PROCESS_TOAST 0x80    /* process the TOAST table, if any */
-#define VACOPT_DISABLE_PAGE_SKIPPING 0x100    /* don't skip any pages */
-#define VACOPT_SKIP_DATABASE_STATS 0x200    /* skip vac_update_datfrozenxid() */
-#define VACOPT_ONLY_DATABASE_STATS 0x400    /* only vac_update_datfrozenxid() */
+#define VACOPT_FULL_EXCLUSIVE 0x10    /* FULL (non-concurrent) vacuum */
+#define VACOPT_FULL_CONCURRENT 0x20    /* FULL (concurrent) vacuum */
+#define VACOPT_SKIP_LOCKED 0x40 /* skip if cannot get lock */
+#define VACOPT_PROCESS_MAIN 0x80    /* process main relation */
+#define VACOPT_PROCESS_TOAST 0x100    /* process the TOAST table, if any */
+#define VACOPT_DISABLE_PAGE_SKIPPING 0x200    /* don't skip any pages */
+#define VACOPT_SKIP_DATABASE_STATS 0x400    /* skip vac_update_datfrozenxid() */
+#define VACOPT_ONLY_DATABASE_STATS 0x800    /* only vac_update_datfrozenxid() */
+
+#define VACOPT_FULL (VACOPT_FULL_EXCLUSIVE | VACOPT_FULL_CONCURRENT)
 
 /*
  * Values used by index_cleanup and truncate params.
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a3360a1c5e..abbfb616ce 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -68,6 +68,8 @@ extern void FreeSnapshotBuilder(SnapBuild *builder);
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
 
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
+extern Snapshot SnapBuildInitialSnapshotForCluster(SnapBuild *builder);
+extern Snapshot SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place);
 extern const char *SnapBuildExportSnapshot(SnapBuild *builder);
 extern void SnapBuildClearExportedSnapshot(void);
 extern void SnapBuildResetExportedSnapshotState(void);
diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h
index 934ba84f6a..cac3d7f8c7 100644
--- a/src/include/storage/lockdefs.h
+++ b/src/include/storage/lockdefs.h
@@ -36,7 +36,7 @@ typedef int LOCKMODE;
 #define AccessShareLock            1    /* SELECT */
 #define RowShareLock            2    /* SELECT FOR UPDATE/FOR SHARE */
 #define RowExclusiveLock        3    /* INSERT, UPDATE, DELETE */
-#define ShareUpdateExclusiveLock 4    /* VACUUM (non-FULL), ANALYZE, CREATE
+#define ShareUpdateExclusiveLock 4    /* VACUUM (non-exclusive), ANALYZE, CREATE
                                      * INDEX CONCURRENTLY */
 #define ShareLock                5    /* CREATE INDEX (WITHOUT CONCURRENTLY) */
 #define ShareRowExclusiveLock    6    /* like EXCLUSIVE MODE, but allows ROW
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 6a2f64c54f..a5f59b6c12 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, ClusteredRels)
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index e09598eafc..5ab5df9d41 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -35,7 +35,7 @@ typedef enum ProgressCommandType
 
 /*
  * Any command which wishes can advertise that it is running by setting
- * command, command_target, and param[].  command_target should be the OID of
+ * ommand, command_target, and param[].  command_target should be the OID of
  * the relation which the command targets (we assume there's just one, as this
  * is meant for utility commands), but the meaning of each element in the
  * param array is command-specific.
@@ -55,6 +55,7 @@ extern void pgstat_progress_parallel_incr_param(int index, int64 incr);
 extern void pgstat_progress_update_multi_param(int nparam, const int *index,
                                                const int64 *val);
 extern void pgstat_progress_end_command(void);
+extern void pgstat_progress_restore_state(PgBackendProgress *backup);
 
 
 #endif                            /* BACKEND_PROGRESS_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 24695facf2..4acf9d0ed9 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -42,6 +42,8 @@ extern void CacheInvalidateCatalog(Oid catalogId);
 
 extern void CacheInvalidateRelcache(Relation relation);
 
+extern void CacheInvalidateRelcacheImmediate(Relation relation);
+
 extern void CacheInvalidateRelcacheAll(void);
 
 extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8700204953..adda46c985 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -253,6 +253,9 @@ typedef struct RelationData
     bool        pgstat_enabled; /* should relation stats be counted */
     /* use "struct" here to avoid needing to include pgstat.h: */
     struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+
+    /* Is CLUSTER CONCURRENTLY being performed on this relation? */
+    bool    rd_cluster_concurrent;
 } RelationData;
 
 
@@ -684,7 +687,9 @@ RelationCloseSmgr(Relation relation)
 #define RelationIsAccessibleInLogicalDecoding(relation) \
     (XLogLogicalInfoActive() && \
      RelationNeedsWAL(relation) && \
-     (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))
+     (IsCatalogRelation(relation) || \
+      RelationIsUsedAsCatalogTable(relation) || \
+      (relation)->rd_cluster_concurrent))
 
 /*
  * RelationIsLogicallyLogged
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 9398a84051..f58c9108fc 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -68,6 +68,9 @@ extern Snapshot GetLatestSnapshot(void);
 extern void SnapshotSetCommandId(CommandId curcid);
 extern Snapshot GetOldestSnapshot(void);
 
+extern Snapshot CopySnapshot(Snapshot snapshot);
+extern void FreeSnapshot(Snapshot snapshot);
+
 extern Snapshot GetCatalogSnapshot(Oid relid);
 extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid);
 extern void InvalidateCatalogSnapshot(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 4c789279e5..22cb0702dc 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1958,17 +1958,20 @@ pg_stat_progress_cluster| SELECT s.pid,
             WHEN 2 THEN 'index scanning heap'::text
             WHEN 3 THEN 'sorting tuples'::text
             WHEN 4 THEN 'writing new heap'::text
-            WHEN 5 THEN 'swapping relation files'::text
-            WHEN 6 THEN 'rebuilding index'::text
-            WHEN 7 THEN 'performing final cleanup'::text
+            WHEN 5 THEN 'catch-up'::text
+            WHEN 6 THEN 'swapping relation files'::text
+            WHEN 7 THEN 'rebuilding index'::text
+            WHEN 8 THEN 'performing final cleanup'::text
             ELSE NULL::text
         END AS phase,
     (s.param3)::oid AS cluster_index_relid,
     s.param4 AS heap_tuples_scanned,
-    s.param5 AS heap_tuples_written,
-    s.param6 AS heap_blks_total,
-    s.param7 AS heap_blks_scanned,
-    s.param8 AS index_rebuild_count
+    s.param5 AS heap_tuples_inserted,
+    s.param6 AS heap_tuples_updated,
+    s.param7 AS heap_tuples_deleted,
+    s.param8 AS heap_blks_total,
+    s.param9 AS heap_blks_scanned,
+    s.param10 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5,
param6,param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18,
param19,param20) 
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_copy| SELECT s.pid,
-- 
2.45.2

From 8acfb903cb62baabea2b32174ce98b78d840e068 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Jul 2024 17:46:00 +0200
Subject: [PATCH 4/4] Call logical_rewrite_heap_tuple() when applying
 concurrent data changes.

This was implemented for the sake of completeness, but I think it's currently
not needed. Possible use cases could be:

1. VACUUM FULL / CLUSTER CONCURRENTLY can process system catalogs.

System catalogs are scanned using a historic snapshot during logical decoding,
and the "combo CIDs" information is needed for that. Since "combo CID" is
associated with the "file locator" and that locator is changed by VACUUM FULL
/ CLUSTER, these commands must record the information on individual tuples
being moved from the old file to the new one. This is what
logical_rewrite_heap_tuple() does.

However, the logical decoding subsystem currently does not support decoding of
data changes in the system catalog. Therefore, the CONCURRENTLY option cannot
be used for system catalogs.

2. VACUUM FULL / CLUSTER CONCURRENTLY is processing a relation, but once it
has released all the locks (in order to get the exclusive lock), another
backend runs VACUUM FULL / CLUSTER CONCURRENTLY on the same table. Since the
relation is treated as a system catalog while these commands are processing it
(so it can be scanned using a historic snapshot during the "initial load"), it
is important that the 2nd backend does not break decoding of the "combo CIDs"
performed by the 1st backend.

However, it's not practical to let multiple backends run VACUUM FULL / CLUSTER
CONCURRENTLY on the same relation, so we forbid that.
---
 src/backend/access/heap/heapam_handler.c      |   2 +-
 src/backend/access/heap/rewriteheap.c         |  65 ++++++-----
 src/backend/commands/cluster.c                | 102 ++++++++++++++----
 src/backend/replication/logical/decode.c      |  41 ++++++-
 .../pgoutput_cluster/pgoutput_cluster.c       |  21 ++--
 src/include/access/rewriteheap.h              |   5 +-
 src/include/commands/cluster.h                |   3 +
 src/include/replication/reorderbuffer.h       |   7 ++
 8 files changed, 187 insertions(+), 59 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 02fd6d2983..cccfff62bd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -735,7 +735,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
     /* Initialize the rewrite operation */
     rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
-                                 *multi_cutoff);
+                                 *multi_cutoff, true);
 
 
     /* Set up sorting if wanted */
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 473f3aa9be..050c8306da 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -214,10 +214,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
 
 /* internal logical remapping prototypes */
 static void logical_begin_heap_rewrite(RewriteState state);
-static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
 static void logical_end_heap_rewrite(RewriteState state);
 
-
 /*
  * Begin a rewrite of a table
  *
@@ -226,18 +224,19 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin    xid used by the caller to determine which tuples are dead
  * freeze_xid    xid before which tuples will be frozen
  * cutoff_multi    multixact before which multis will be removed
+ * tid_chains    need to maintain TID chains?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-                   TransactionId freeze_xid, MultiXactId cutoff_multi)
+                   TransactionId freeze_xid, MultiXactId cutoff_multi,
+                   bool tid_chains)
 {
     RewriteState state;
     MemoryContext rw_cxt;
     MemoryContext old_cxt;
-    HASHCTL        hash_ctl;
 
     /*
      * To ease cleanup, make a separate context that will contain the
@@ -262,29 +261,34 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
     state->rs_cxt = rw_cxt;
     state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
 
-    /* Initialize hash tables used to track update chains */
-    hash_ctl.keysize = sizeof(TidHashKey);
-    hash_ctl.entrysize = sizeof(UnresolvedTupData);
-    hash_ctl.hcxt = state->rs_cxt;
-
-    state->rs_unresolved_tups =
-        hash_create("Rewrite / Unresolved ctids",
-                    128,        /* arbitrary initial size */
-                    &hash_ctl,
-                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-    hash_ctl.entrysize = sizeof(OldToNewMappingData);
+    if (tid_chains)
+    {
+        HASHCTL        hash_ctl;
+
+        /* Initialize hash tables used to track update chains */
+        hash_ctl.keysize = sizeof(TidHashKey);
+        hash_ctl.entrysize = sizeof(UnresolvedTupData);
+        hash_ctl.hcxt = state->rs_cxt;
+
+        state->rs_unresolved_tups =
+            hash_create("Rewrite / Unresolved ctids",
+                        128,        /* arbitrary initial size */
+                        &hash_ctl,
+                        HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+        hash_ctl.entrysize = sizeof(OldToNewMappingData);
+
+        state->rs_old_new_tid_map =
+            hash_create("Rewrite / Old to new tid map",
+                        128,        /* arbitrary initial size */
+                        &hash_ctl,
+                        HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+    }
 
-    state->rs_old_new_tid_map =
-        hash_create("Rewrite / Old to new tid map",
-                    128,        /* arbitrary initial size */
-                    &hash_ctl,
-                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+    logical_begin_heap_rewrite(state);
 
     MemoryContextSwitchTo(old_cxt);
 
-    logical_begin_heap_rewrite(state);
-
     return state;
 }
 
@@ -303,12 +307,15 @@ end_heap_rewrite(RewriteState state)
      * Write any remaining tuples in the UnresolvedTups table. If we have any
      * left, they should in fact be dead, but let's err on the safe side.
      */
-    hash_seq_init(&seq_status, state->rs_unresolved_tups);
-
-    while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+    if (state->rs_unresolved_tups)
     {
-        ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
-        raw_heap_insert(state, unresolved->tuple);
+        hash_seq_init(&seq_status, state->rs_unresolved_tups);
+
+        while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+        {
+            ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+            raw_heap_insert(state, unresolved->tuple);
+        }
     }
 
     /* Write the last page, if any */
@@ -995,7 +1002,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
  * Perform logical remapping for a tuple that's mapped from old_tid to
  * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
  */
-static void
+void
 logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
                            HeapTuple new_tuple)
 {
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 6397f7f8c4..42e8118b7d 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -21,6 +21,7 @@
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relscan.h"
+#include "access/rewriteheap.h"
 #include "access/tableam.h"
 #include "access/toast_internals.h"
 #include "access/transam.h"
@@ -179,17 +180,21 @@ static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 static HeapTuple get_changed_tuple(ConcurrentChange *change);
 static void apply_concurrent_changes(ClusterDecodingState *dstate,
                                      Relation rel, ScanKey key, int nkeys,
-                                     IndexInsertState *iistate);
+                                     IndexInsertState *iistate,
+                                     RewriteState rwstate);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
                                     HeapTuple tup, IndexInsertState *iistate,
-                                    TupleTableSlot *index_slot);
+                                    TupleTableSlot *index_slot,
+                                    RewriteState rwstate);
 static void apply_concurrent_update(Relation rel, HeapTuple tup,
                                     HeapTuple tup_target,
                                     ConcurrentChange *change,
                                     IndexInsertState *iistate,
-                                    TupleTableSlot *index_slot);
+                                    TupleTableSlot *index_slot,
+                                    RewriteState rwstate);
 static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-                                    ConcurrentChange *change);
+                                    ConcurrentChange *change,
+                                    RewriteState rwstate);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
                                    HeapTuple tup_key,
                                    Snapshot snapshot,
@@ -202,7 +207,8 @@ static void process_concurrent_changes(LogicalDecodingContext *ctx,
                                        Relation rel_src,
                                        ScanKey ident_key,
                                        int ident_key_nentries,
-                                       IndexInsertState *iistate);
+                                       IndexInsertState *iistate,
+                                       RewriteState rwstate);
 static IndexInsertState *get_index_insert_state(Relation relation,
                                                 Oid ident_index_id);
 static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src,
@@ -3073,7 +3079,8 @@ cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
  */
 static void
 apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
-                         ScanKey key, int nkeys, IndexInsertState *iistate)
+                         ScanKey key, int nkeys, IndexInsertState *iistate,
+                         RewriteState rwstate)
 {
     TupleTableSlot *index_slot, *ident_slot;
     HeapTuple    tup_old = NULL;
@@ -3144,7 +3151,8 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
         {
             Assert(tup_old == NULL);
 
-            apply_concurrent_insert(rel, change, tup, iistate, index_slot);
+            apply_concurrent_insert(rel, change, tup, iistate, index_slot,
+                                    rwstate);
 
             pfree(tup);
         }
@@ -3152,7 +3160,7 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
                  change->kind == CHANGE_DELETE)
         {
             IndexScanDesc    ind_scan = NULL;
-            HeapTuple    tup_key;
+            HeapTuple    tup_key, tup_exist_cp;
 
             if (change->kind == CHANGE_UPDATE_NEW)
             {
@@ -3193,11 +3201,23 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
             if (tup_exist == NULL)
                 elog(ERROR, "Failed to find target tuple");
 
+            /*
+             * Update the mapping for xmax of the old version.
+             *
+             * Use a copy ('tup_exist' can point to shared buffer) with xmin
+             * invalid because mapping of that should have been written on
+             * insertion.
+             */
+            tup_exist_cp = heap_copytuple(tup_exist);
+            HeapTupleHeaderSetXmin(tup_exist_cp->t_data, InvalidTransactionId);
+            logical_rewrite_heap_tuple(rwstate, change->old_tid, tup_exist_cp);
+            pfree(tup_exist_cp);
+
             if (change->kind == CHANGE_UPDATE_NEW)
                 apply_concurrent_update(rel, tup, tup_exist, change, iistate,
-                                        index_slot);
+                                        index_slot, rwstate);
             else
-                apply_concurrent_delete(rel, tup_exist, change);
+                apply_concurrent_delete(rel, tup_exist, change, rwstate);
 
             ResetClusterCurrentXids();
 
@@ -3238,9 +3258,12 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 
 static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
-                        IndexInsertState *iistate, TupleTableSlot *index_slot)
+                        IndexInsertState *iistate, TupleTableSlot *index_slot,
+                        RewriteState rwstate)
 {
+    HeapTupleHeader    tup_hdr = tup->t_data;
     Snapshot    snapshot = change->snapshot;
+    ItemPointerData        old_tid;
     List       *recheck;
 
     /*
@@ -3250,6 +3273,9 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
      */
     SetClusterCurrentXids(snapshot->subxip, snapshot->subxcnt);
 
+    /* Remember location in the old heap. */
+    ItemPointerCopy(&tup_hdr->t_ctid, &old_tid);
+
     /*
      * Write the tuple into the new heap.
      *
@@ -3265,6 +3291,14 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
     heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
                 HEAP_INSERT_NO_LOGICAL, NULL);
 
+    /*
+     * Update the mapping for xmin. (xmax should be invalid). This is needed
+     * because, during the processing, the table is considered an "user
+     * catalog".
+     */
+    Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+    logical_rewrite_heap_tuple(rwstate, old_tid, tup);
+
     /*
      * Update indexes.
      *
@@ -3298,16 +3332,19 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 static void
 apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
                         ConcurrentChange *change, IndexInsertState *iistate,
-                        TupleTableSlot *index_slot)
+                        TupleTableSlot *index_slot, RewriteState rwstate)
 {
     List       *recheck;
     LockTupleMode    lockmode;
     TU_UpdateIndexes    update_indexes;
-    ItemPointerData        tid_old_new_heap;
+    ItemPointerData        tid_new_old_heap, tid_old_new_heap;
     TM_Result    res;
     Snapshot snapshot    = change->snapshot;
     TM_FailureData tmfd;
 
+    /* Location of the new tuple in the old heap. */
+    ItemPointerCopy(&tup->t_data->t_ctid, &tid_new_old_heap);
+
     /* Location of the existing tuple in the new heap. */
     ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
 
@@ -3330,6 +3367,10 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
     if (res != TM_Ok)
         ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
+    /* Update the mapping for xmin of the new version. */
+    Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+    logical_rewrite_heap_tuple(rwstate, tid_new_old_heap, tup);
+
     ExecStoreHeapTuple(tup, index_slot, false);
 
     if (update_indexes != TU_None)
@@ -3353,7 +3394,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 
 static void
 apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-                        ConcurrentChange *change)
+                        ConcurrentChange *change, RewriteState rwstate)
 {
     ItemPointerData        tid_old_new_heap;
     TM_Result    res;
@@ -3444,7 +3485,8 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
 static void
 process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
                            Relation rel_dst, Relation rel_src, ScanKey ident_key,
-                           int ident_key_nentries, IndexInsertState *iistate)
+                           int ident_key_nentries, IndexInsertState *iistate,
+                           RewriteState rwstate)
 {
     ClusterDecodingState *dstate;
 
@@ -3468,7 +3510,7 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
             rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
 
         apply_concurrent_changes(dstate, rel_dst, ident_key,
-                                 ident_key_nentries, iistate);
+                                 ident_key_nentries, iistate, rwstate);
     }
     PG_FINALLY();
     {
@@ -3631,6 +3673,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
     bool        is_system_catalog;
     Oid        ident_idx_old, ident_idx_new;
     IndexInsertState *iistate;
+    RewriteState    rwstate;
     ScanKey        ident_key;
     int        ident_key_nentries;
     XLogRecPtr    wal_insert_ptr, end_of_wal;
@@ -3708,10 +3751,26 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
      * Apply concurrent changes first time, to minimize the time we need to
      * hold AccessExclusiveLock. (Quite some amount of WAL could have been
      * written during the data copying and index creation.)
+     *
+     * Now we are processing individual tuples, so pass false for
+     * 'tid_chains'. Since rwstate is now only needed for
+     * logical_begin_heap_rewrite(), none of the transaction IDs needs to be
+     * valid.
      */
+    rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+                                 InvalidTransactionId,
+                                 InvalidTransactionId,
+                                 InvalidTransactionId,
+                                 false);
     process_concurrent_changes(ctx, end_of_wal, NewHeap,
                                swap_toast_by_content ? OldHeap : NULL,
-                               ident_key, ident_key_nentries, iistate);
+                               ident_key, ident_key_nentries, iistate,
+                               rwstate);
+    /*
+     * OldHeap will be closed, so we need to initialize rwstate again for the
+     * next call of process_concurrent_changes().
+     */
+    end_heap_rewrite(rwstate);
 
     /*
      * Release the locks that allowed concurrent data changes, in order to
@@ -3833,9 +3892,16 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
     end_of_wal = GetFlushRecPtr(NULL);
 
     /* Apply the concurrent changes again. */
+    rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+                                 InvalidTransactionId,
+                                 InvalidTransactionId,
+                                 InvalidTransactionId,
+                                 false);
     process_concurrent_changes(ctx, end_of_wal, NewHeap,
                                swap_toast_by_content ? OldHeap : NULL,
-                               ident_key, ident_key_nentries, iistate);
+                               ident_key, ident_key_nentries, iistate,
+                               rwstate);
+    end_heap_rewrite(rwstate);
 
     /* Remember info about rel before closing OldHeap */
     relpersistence = OldHeap->rd_rel->relpersistence;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 066d96dea2..69a43e3510 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -951,11 +951,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     xl_heap_insert *xlrec;
     ReorderBufferChange *change;
     RelFileLocator target_locator;
+    BlockNumber        blknum;
+    HeapTupleHeader    tuphdr;
 
     xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
     /* only interested in our database */
-    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+    XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
     if (target_locator.dbOid != ctx->slot->data.database)
         return;
 
@@ -980,6 +982,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
 
+    /*
+     * CTID is needed for logical_rewrite_heap_tuple(), when doing CLUSTER
+     * CONCURRENTLY.
+     */
+    tuphdr = change->data.tp.newtuple->t_data;
+    ItemPointerSet(&tuphdr->t_ctid, blknum, xlrec->offnum);
+
     change->data.tp.clear_toast_afterwards = true;
 
     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1001,11 +1010,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     ReorderBufferChange *change;
     char       *data;
     RelFileLocator target_locator;
+    BlockNumber        old_blknum, new_blknum;
 
     xlrec = (xl_heap_update *) XLogRecGetData(r);
 
+    /* Retrieve blknum, so that we can compose CTID below. */
+    XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum);
+
     /* only interested in our database */
-    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     if (target_locator.dbOid != ctx->slot->data.database)
         return;
 
@@ -1022,6 +1034,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     {
         Size        datalen;
         Size        tuplelen;
+        HeapTupleHeader    tuphdr;
 
         data = XLogRecGetBlockData(r, 0, &datalen);
 
@@ -1031,6 +1044,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
 
         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
+
+        /*
+         * CTID is needed for logical_rewrite_heap_tuple(), when doing CLUSTER
+         * CONCURRENTLY.
+         */
+        tuphdr = change->data.tp.newtuple->t_data;
+        ItemPointerSet(&tuphdr->t_ctid, new_blknum, xlrec->new_offnum);
     }
 
     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
@@ -1049,6 +1069,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
     }
 
+    /*
+     * Remember the old tuple CTID, for the sake of
+     * logical_rewrite_heap_tuple().
+     */
+    if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &old_blknum, NULL))
+        old_blknum = new_blknum;
+    ItemPointerSet(&change->data.tp.old_tid, old_blknum, xlrec->old_offnum);
+
     change->data.tp.clear_toast_afterwards = true;
 
     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1067,11 +1095,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     xl_heap_delete *xlrec;
     ReorderBufferChange *change;
     RelFileLocator target_locator;
+    BlockNumber        blknum;
 
     xlrec = (xl_heap_delete *) XLogRecGetData(r);
 
     /* only interested in our database */
-    XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+    XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
     if (target_locator.dbOid != ctx->slot->data.database)
         return;
 
@@ -1103,6 +1132,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
                         datalen, change->data.tp.oldtuple);
+
+        /*
+         * CTID is needed for logical_rewrite_heap_tuple(), when doing CLUSTER
+         * CONCURRENTLY.
+         */
+        ItemPointerSet(&change->data.tp.old_tid, blknum, xlrec->offnum);
     }
 
     change->data.tp.clear_toast_afterwards = true;
diff --git a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
index 9fe44017a8..2c33fbad82 100644
--- a/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
+++ b/src/backend/replication/pgoutput_cluster/pgoutput_cluster.c
@@ -34,7 +34,7 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
                             ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
                          ConcurrentChangeKind kind, HeapTuple tuple,
-                         TransactionId xid);
+                         TransactionId xid, ItemPointer old_tid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -162,7 +162,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                 if (newtuple == NULL)
                     elog(ERROR, "Incomplete insert info.");
 
-                store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
+                store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid,
+                             NULL);
             }
             break;
         case REORDER_BUFFER_CHANGE_UPDATE:
@@ -180,10 +181,10 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
                 if (oldtuple != NULL)
                     store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
-                                 change->txn->xid);
+                                 change->txn->xid, NULL);
 
                 store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
-                             change->txn->xid);
+                             change->txn->xid, &change->data.tp.old_tid);
             }
             break;
         case REORDER_BUFFER_CHANGE_DELETE:
@@ -196,7 +197,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                 if (oldtuple == NULL)
                     elog(ERROR, "Incomplete delete info.");
 
-                store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
+                store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid,
+                             &change->data.tp.old_tid);
             }
             break;
         default:
@@ -230,13 +232,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     if (i == nrelations)
         return;
 
-    store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
+    store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId, NULL);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-             HeapTuple tuple, TransactionId xid)
+             HeapTuple tuple, TransactionId xid, ItemPointer old_tid)
 {
     ClusterDecodingState *dstate;
     char       *change_raw;
@@ -301,6 +303,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
     change->snapshot = dstate->snapshot;
     dstate->snapshot->active_count++;
 
+    if (old_tid)
+        ItemPointerCopy(old_tid, &change->old_tid);
+    else
+        ItemPointerSetInvalid(&change->old_tid);
+
     /* The data has been copied. */
     if (flattened)
         pfree(tuple);
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 5866a26bdd..de62b6abf8 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -23,11 +23,14 @@ typedef struct RewriteStateData *RewriteState;
 
 extern RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap,
                                        TransactionId oldest_xmin, TransactionId freeze_xid,
-                                       MultiXactId cutoff_multi);
+                                       MultiXactId cutoff_multi, bool tid_chains);
 extern void end_heap_rewrite(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple,
                                HeapTuple new_tuple);
 extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple);
+extern void logical_rewrite_heap_tuple(RewriteState state,
+                                       ItemPointerData old_tid,
+                                       HeapTuple new_tuple);
 
 /*
  * On-Disk data format for an individual logical rewrite mapping.
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index f98b855f21..c394ef3871 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -71,6 +71,9 @@ typedef struct ConcurrentChange
     /* Transaction that changes the data. */
     TransactionId    xid;
 
+    /* For UPDATE / DELETE, the location of the old tuple version. */
+    ItemPointerData    old_tid;
+
     /*
      * Historic catalog snapshot that was used to decode this change.
      */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 851a001c8b..1fa8f8bd6a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -99,6 +99,13 @@ typedef struct ReorderBufferChange
             HeapTuple    oldtuple;
             /* valid for INSERT || UPDATE */
             HeapTuple    newtuple;
+
+            /*
+             * CLUSTER CONCURRENTLY needs the old TID, even if the old tuple
+             * itself is not WAL-logged (i.e. when the identity key does not
+             * change).
+             */
+            ItemPointerData    old_tid;
         }            tp;
 
         /*
-- 
2.45.2


Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Antonin Houska <ah@cybertec.at> wrote:

> Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> 
> > > Is your plan to work on it soon or should I try to write a draft patch? (I
> > > assume this is for PG >= 18.)
> > 
> > I don't have plans for it, so if you have resources, please go for it.
> 
> The first version is attached. The actual feature is in 0003. 0004 is probably
> not necessary now, but I haven't realized until I coded it.

The mailing list archive indicates something is wrong with the 0003
attachment. Sending it all again, as *.tar.

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com


Вложения

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Alvaro Herrera
Дата:
On 2024-Jul-09, Antonin Houska wrote:

> Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> 
> > > Is your plan to work on it soon or should I try to write a draft patch? (I
> > > assume this is for PG >= 18.)
> > 
> > I don't have plans for it, so if you have resources, please go for it.
> 
> The first version is attached. The actual feature is in 0003. 0004 is probably
> not necessary now, but I haven't realized until I coded it.

Thank you, this is great.  I'll be studying this during the next
commitfest.


BTW I can apply 0003 from this email perfectly fine, but you're right
that the archives don't show the file name.  I suspect the
"Content-Disposition: inline" PLUS the Content-Type text/plain are what
cause the problem -- for instance, [1] doesn't have a problem and they
do have inline content disposition, but the content-type is not
text/plain.  In any case, I encourage you not to send patches as
tarballs :-)

[1]  https://postgr.es/m/32781.1714378236@antos

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
"La primera ley de las demostraciones en vivo es: no trate de usar el sistema.
Escriba un guión que no toque nada para no causar daños." (Jakob Nielsen)



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:

> On 2024-Jul-09, Antonin Houska wrote:
>
> > Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:
> >
> > > > Is your plan to work on it soon or should I try to write a draft patch? (I
> > > > assume this is for PG >= 18.)
> > >
> > > I don't have plans for it, so if you have resources, please go for it.
> >
> > The first version is attached. The actual feature is in 0003. 0004 is probably
> > not necessary now, but I haven't realized until I coded it.
>
> Thank you, this is great.  I'll be studying this during the next
> commitfest.

Thanks. I'll register it in the CF application.


> BTW I can apply 0003 from this email perfectly fine, but you're right
> that the archives don't show the file name.  I suspect the
> "Content-Disposition: inline" PLUS the Content-Type text/plain are what
> cause the problem -- for instance, [1] doesn't have a problem and they
> do have inline content disposition, but the content-type is not
> text/plain.  In any case, I encourage you not to send patches as
> tarballs :-)
>
> [1]  https://postgr.es/m/32781.1714378236@antos

You're right, "Content-Disposition" is the problem. I forgot that "attachment"
is better for patches and my email client (emacs+nmh) defaults to
"inline". I'll pay attention next time.


--
Antonin Houska
Web: https://www.cybertec-postgresql.com



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Kirill Reshke
Дата:
Hi!
I'm interested in the vacuum concurrently feature being inside the
core, so will try to review patch set and give valuable feedback. For
now, just a few little thoughts..


> The first version is attached. The actual feature is in 0003. 0004 is probably
> not necessary now, but I haven't realized until I coded it.

The logical replication vacuum approach is a really smart idea, I like
it. As far as I understand, pg_squeeze works well in real production
databases, which
gives us hope that the vacuum concurrent feature in core will be good
too... What is the size of the biggest relation successfully vacuumed
via pg_squeeze?
Looks like in case of big relartion or high insertion load,
replication may lag and never catch up...

However, in general, the 3rd patch is really big, very hard to
comprehend.  Please consider splitting this into smaller (and
reviewable) pieces.
Also, we obviously need more tests on this. Both tap-test and
regression tests I suppose.

One more thing is about pg_squeeze background workers. They act in an
autovacuum-like fashion, aren't they? Maybe we can support this kind
of relation processing in core too?



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Pavel Stehule
Дата:
Hi

ne 21. 7. 2024 v 17:13 odesílatel Kirill Reshke <reshkekirill@gmail.com> napsal:
Hi!
I'm interested in the vacuum concurrently feature being inside the
core, so will try to review patch set and give valuable feedback. For
now, just a few little thoughts..



One more thing is about pg_squeeze background workers. They act in an
autovacuum-like fashion, aren't they? Maybe we can support this kind
of relation processing in core too?

I don't think it is necessary when this feature will be an internal feature.

I agree so this feature is very important, I proposed it (and I very happy so Tonda implemented it), but I am not sure, if usage of this should be automatized, and if it should be, then

a) probably autovacuum should do,
b) we can move a discussion after vacuum full concurrently will be merged to upstream, please. Isn't very practical to have too many open targets.

Regards

Pavel

Re: why there is not VACUUM FULL CONCURRENTLY?

От
Kirill Reshke
Дата:
> Also, we obviously need more tests on this. Both tap-test and
> regression tests I suppose.

The one simple test to this patch can be done this way:

1) create test relation (call it vac_conc_r1 for example) and fill it
with dead tuples (insert + update or insert + delete)
2) create injection point preventing concurrent vacuum from compiling.
3) run concurrent vacuum (VACUUM FULL CONCURRENTLY) in separate thread
or in some other async way.
4) Insert new data in relation to vac_conc_r1.
5) Release injection point, assert that vacuum completed successfully.
6) check that all data is present in vac_conc_r1 (data from step 1 and
from step 4).

This way we can catch some basic buggs, if some paths of VACUUM
CONCURRENTLY will be touched in the future.
The problem with this test is: i don't know how to do anything async
in current TAP tests (needed in step 3). Also, maybe test with async
interaction
may be too flappy (producing false negative flaps) to support.
Sequential test for this feature would be much better, but I can't
think of one.

Also, should we create a cf entry for this thread already?



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Michael Banck
Дата:
On Mon, Jul 22, 2024 at 01:23:03PM +0500, Kirill Reshke wrote:
> Also, should we create a cf entry for this thread already?

I was wondering about this as well, but there is one for the upcoming
commitfest already:

https://commitfest.postgresql.org/49/5117/


Michael



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Kirill Reshke
Дата:
Hi!

On Tue, 30 Jan 2024 at 15:31, Alvaro Herrera <alvherre@alvh.no-ip.org> wrote:

> FWIW a newer, more modern and more trustworthy alternative to pg_repack
> is pg_squeeze, which I discovered almost by random chance, and soon
> discovered I liked it much more.

Can you please clarify this a bit more? What is the exact reason for
pg_squeeze being more trustworthy than pg_repack?
Is there something about the logical replication approach that makes
it more bulletproof than the trigger-based repack approach?

Also, I was thinking about pg_repack vs pg_squeeze being used for the
VACUUM FULL CONCURRENTLY feature, and I'm a bit suspicious about the
latter.
If I understand correctly, we essentially parse the whole WAL to
obtain info about one particular relation changes. That may be a big
overhead, whereas the trigger approach does
not suffer from this. So, there is the chance that VACUUM FULL
CONCURRENTLY will never keep up with vacuumed relation changes. Am I
right?



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Kirill Reshke <reshkekirill@gmail.com> wrote:

> Also, I was thinking about pg_repack vs pg_squeeze being used for the
> VACUUM FULL CONCURRENTLY feature, and I'm a bit suspicious about the
> latter.

> If I understand correctly, we essentially parse the whole WAL to
> obtain info about one particular relation changes. That may be a big
> overhead,

pg_squeeze is an extension but the logical decoding is performed by the core,
so there is no way to ensure that data changes of the "other tables" are not
decoded. However, it might be possible if we integrate the functionality into
the core. I'll consider doing so in the next version of [1].

> whereas the trigger approach does not suffer from this. So, there is the
> chance that VACUUM FULL CONCURRENTLY will never keep up with vacuumed
> relation changes. Am I right?

Perhaps it can happen, but note that trigger processing is also not free and
that in this case the cost is paid by the applications. So while VACUUM FULL
CONCURRENTLY (based on logical decoding) might fail to catch-up, the trigger
based solution may slow down the applications that execute DML commands while
the table is being rewritten.

[1] https://commitfest.postgresql.org/49/5117/

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Antonin Houska
Дата:
Kirill Reshke <reshkekirill@gmail.com> wrote:

> What is the size of the biggest relation successfully vacuumed
> via pg_squeeze?
> Looks like in case of big relartion or high insertion load,
> replication may lag and never catch up...

Users reports problems rather than successes, so I don't know. 400 GB was
reported in [1] but it's possible that the table size for this test was
determined based on available disk space.

I think that the amount of data changes performed during the "squeezing"
matters more than the table size. In [2] one user reported "thounsands of
UPSERTs per second", but the amount of data also depends on row size, which he
didn't mention.

pg_squeeze gives up if it fails to catch up a few times. The first version of
my patch does not check this, I'll add the corresponding code in the next
version.

> However, in general, the 3rd patch is really big, very hard to
> comprehend.  Please consider splitting this into smaller (and
> reviewable) pieces.

I'll try to move some preparation steps into separate diffs, but not sure if
that will make the main diff much smaller. I prefer self-contained patches, as
also explained in [3].

> Also, we obviously need more tests on this. Both tap-test and
> regression tests I suppose.

Sure. The next version will use the injection points to test if "concurrent
data changes" are  processed correctly.

> One more thing is about pg_squeeze background workers. They act in an
> autovacuum-like fashion, aren't they? Maybe we can support this kind
> of relation processing in core too?

Maybe later. Even just adding the CONCURRENTLY option to CLUSTER and VACUUM
FULL requires quite some effort.


[1] https://github.com/cybertec-postgresql/pg_squeeze/issues/51

[2]
https://github.com/cybertec-postgresql/pg_squeeze/issues/21#issuecomment-514495369

[3] http://peter.eisentraut.org/blog/2024/05/14/when-to-split-patches-for-postgresql

--
Antonin Houska
Web: https://www.cybertec-postgresql.com



Re: why there is not VACUUM FULL CONCURRENTLY?

От
Kirill Reshke
Дата:
On Fri, 2 Aug 2024 at 11:09, Antonin Houska <ah@cybertec.at> wrote:
>
> Kirill Reshke <reshkekirill@gmail.com> wrote:
> > However, in general, the 3rd patch is really big, very hard to
> > comprehend.  Please consider splitting this into smaller (and
> > reviewable) pieces.
>
> I'll try to move some preparation steps into separate diffs, but not sure if
> that will make the main diff much smaller. I prefer self-contained patches, as
> also explained in [3].

Thanks for sharing [3], it is a useful link.

There is actually one more case when ACCESS EXCLUSIVE is held: during
table rewrite (AT set TAM, AT set Tablespace and AT alter column type
are some examples).
This can be done CONCURRENTLY too, using the same logical replication
approach, or do I miss something?
I'm not saying we must do it immediately, this should be a separate
thread, but we can do some preparation work here.

I can see that a bunch of functions which are currently placed in
cluster.c can be moved to something like
logical_rewrite_heap.c. ConcurrentChange struct and
apply_concurrent_insert function  is one example of such.

So, if this is the case, 0003 patch can be splitted in two:
The first one is general utility code for logical table rewrite
The second one with actual VACUUM CONCURRENTLY feature.

What do you think?