From 3bbaaba53a0cb3db43cc893acbd3ffbedd61bff1 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sat, 13 Jul 2024 18:31:28 +0800 Subject: [PATCH] fix alter replication slot --- doc/src/sgml/protocol.sgml | 16 ++++++++++++++ src/backend/commands/subscriptioncmds.c | 2 +- src/backend/replication/slot.c | 16 ++++++++------ src/backend/replication/walsender.c | 29 +++++++++++++++---------- src/include/replication/slot.h | 4 ++-- 5 files changed, 46 insertions(+), 21 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 1b27d0a547..3ac4a4be28 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2206,6 +2206,22 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + TWO_PHASE [ boolean ] + + + If true, this logical replication slot supports decoding of two-phase + commit. With this option, commands related to two-phase commit such as + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + + + + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 7604e228e8..c48b6d0549 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1308,7 +1308,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot disable two_phase when uncommitted prepared transactions present"), - errhint("Resolve these transactions and try again"))); + errhint("Resolve these transactions and try again."))); /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2ad6dca993..2f167a2adc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -804,11 +804,12 @@ ReplicationSlotDrop(const char *name, bool nowait) * Change the definition of the slot identified by the specified name. */ void -ReplicationSlotAlter(const char *name, bool failover, bool two_phase) +ReplicationSlotAlter(const char *name, bool *failover, bool *two_phase) { bool update_slot = false; Assert(MyReplicationSlot == NULL); + Assert(failover || two_phase); ReplicationSlotAcquire(name, false); @@ -834,7 +835,7 @@ ReplicationSlotAlter(const char *name, bool failover, bool two_phase) * Do not allow users to enable failover on the standby as we do not * support sync to the cascading standby. */ - if (failover) + if (failover && *failover) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a replication slot" @@ -845,24 +846,25 @@ ReplicationSlotAlter(const char *name, bool failover, bool two_phase) * Do not allow users to enable failover for temporary slots as we do not * support syncing temporary slots to the standby. */ - if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY) + if (failover && *failover && + MyReplicationSlot->data.persistency == RS_TEMPORARY) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a temporary replication slot")); - if (MyReplicationSlot->data.failover != failover) + if (failover && MyReplicationSlot->data.failover != *failover) { SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.failover = failover; + MyReplicationSlot->data.failover = *failover; SpinLockRelease(&MyReplicationSlot->mutex); update_slot = true; } - if (MyReplicationSlot->data.two_phase != two_phase) + if (two_phase && MyReplicationSlot->data.two_phase != *two_phase) { SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.two_phase = two_phase; + MyReplicationSlot->data.two_phase = *two_phase; SpinLockRelease(&MyReplicationSlot->mutex); update_slot = true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5224ea6c2c..f3b5068d95 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1411,30 +1411,31 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) */ static void ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, - bool *failover, bool *two_phase) + bool *failover_given, bool *failover, + bool *two_phase_given, bool *two_phase) { - bool failover_given = false; - bool two_phase_given = false; + *failover_given = false; + *two_phase_given = false; /* Parse options */ foreach_ptr(DefElem, defel, cmd->options) { if (strcmp(defel->defname, "failover") == 0) { - if (failover_given) + if (*failover_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - failover_given = true; + *failover_given = true; *failover = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { - if (two_phase_given) + if (*two_phase_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - two_phase_given = true; + *two_phase_given = true; *two_phase = defGetBoolean(defel); } else @@ -1448,11 +1449,17 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd) { - bool failover = false; - bool two_phase = false; + bool failover_given; + bool two_phase_given; + bool failover; + bool two_phase; + + ParseAlterReplSlotOptions(cmd, &failover_given, &failover, + &two_phase_given, &two_phase); - ParseAlterReplSlotOptions(cmd, &failover, &two_phase); - ReplicationSlotAlter(cmd->slotname, failover, two_phase); + ReplicationSlotAlter(cmd->slotname, + failover_given ? &failover : NULL, + two_phase_given ? &two_phase : NULL); } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 163a4a911a..cde164472a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -243,8 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDropAcquired(void); -extern void ReplicationSlotAlter(const char *name, bool failover, - bool two_phase); +extern void ReplicationSlotAlter(const char *name, bool *failover, + bool *two_phase); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); -- 2.30.0.windows.2