Re: [HACKERS] Restricting maximum keep segments by repslots
| From | Kyotaro Horiguchi |
|---|---|
| Subject | Re: [HACKERS] Restricting maximum keep segments by repslots |
| Date | |
| Msg-id | 20200407.163043.2050717072576572791.horikyota.ntt@gmail.com |
| In reply to | Re: [HACKERS] Restricting maximum keep segments by repslots (Kyotaro Horiguchi <horikyota.ntt@gmail.com>) |
| Replies | Re: [HACKERS] Restricting maximum keep segments by repslots |
| List | pgsql-hackers |
At Tue, 07 Apr 2020 12:09:05 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in
> > it seems to me that it suffices to check restart_lsn for being invalid
> > in the couple of places where the slot's owner advances (which is the
> > two auxiliary functions for ProcessStandbyReplyMessage). I have done so
> > in the attached. There are other places where the restart_lsn is set,
> > but those seem to be used only when the slot is created. I don't think
> > we need to cover for those, but I'm not 100% sure about that.
>
> StartLogicalReplication does
> "XLogBeginRead(,MyReplicationSlot->data.restart_lsn)". If the
> restart_lsn is invalid, the following call to XLogReadRecord runs into
> an assertion failure. The walsender (or StartLogicalReplication) should
> correctly reject reconnection from the subscriber if restart_lsn is
> invalid.
>
> > However, the change in PhysicalConfirmReceivedLocation() breaks
> > the way slots work for pg_basebackup: apparently the slot is created
> > with a restart_lsn of Invalid and we only advance it the first time we
> > process a feedback message from pg_basebackup. I have a vague feeling
> > that that's bogus, but I'll have to look at the involved code a little
> > bit more closely to be sure about this.
>
> Mmm. Couldn't we have a new member 'invalidated' in ReplicationSlot?
I did that in the attached patch. The new member "invalidated" lives in
shared memory but is not saved to disk. It is initialized to false and
irreversibly changed to true when the slot loses a required segment.
It is checked by the new function CheckReplicationSlotInvalidated() when
a slot is acquired and when the slot is updated by a standby reply
message. This change stops the walsender without explicitly killing it,
but I didn't remove the code that kills it.
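A minimal sketch of the flag lifecycle described above, written as standalone C rather than backend code. DemoSlot, demo_slot_invalidate(), demo_check_slot_invalidated(), demo_slot_acquire() and demo_process_reply() are hypothetical stand-ins for ReplicationSlot, the invalidation step, CheckReplicationSlotInvalidated(), ReplicationSlotAcquire() and ProcessStandbyReplyMessage(). As assumptions of the sketch, a C11 atomic replaces the LWLock and an error buffer replaces ereport(ERROR).

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified stand-in for ReplicationSlot. */
typedef struct DemoSlot
{
    char        name[64];
    uint64_t    restart_lsn;    /* never modified by invalidation */
    atomic_bool invalidated;    /* shared-memory only, never saved to disk */
} DemoSlot;

static void
demo_slot_init(DemoSlot *slot, const char *name, uint64_t restart_lsn)
{
    snprintf(slot->name, sizeof(slot->name), "%s", name);
    slot->restart_lsn = restart_lsn;
    atomic_init(&slot->invalidated, false);
}

/* One-way transition: nothing ever sets the flag back to false. */
static void
demo_slot_invalidate(DemoSlot *slot)
{
    atomic_store(&slot->invalidated, true);
}

/* Hypothetical analogue of CheckReplicationSlotInvalidated(): returns false
 * and fills errbuf instead of raising an error. */
static bool
demo_check_slot_invalidated(DemoSlot *slot, char *errbuf, size_t errlen)
{
    if (atomic_load(&slot->invalidated))
    {
        snprintf(errbuf, errlen, "replication slot \"%s\" is invalidated",
                 slot->name);
        return false;
    }
    return true;
}

/* Acquire path: the check runs once the slot has been taken over. */
static bool
demo_slot_acquire(DemoSlot *slot, char *errbuf, size_t errlen)
{
    return demo_check_slot_invalidated(slot, errbuf, errlen);
}

/* Standby-reply path: an invalidated slot's confirmed position is not advanced. */
static bool
demo_process_reply(DemoSlot *slot, uint64_t flush_lsn, uint64_t *confirmed,
                   char *errbuf, size_t errlen)
{
    assert(confirmed != NULL);
    if (!demo_check_slot_invalidated(slot, errbuf, errlen))
        return false;
    *confirmed = flush_lsn;
    return true;
}
```

In this sketch the flag only ever moves from false to true, and restart_lsn keeps whatever value it had, which matches the point that the patch leaves restart_lsn alone.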
When a logical slot loses a segment, the publisher complains as follows:
[backend ] LOG: slot "s1" is invalidated at 0/370001C0 due to exceeding max_slot_wal_keep_size
[walsender] FATAL: terminating connection due to administrator command
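The publisher-side step that produces the LOG line above can be sketched roughly as follows. This is an illustration under assumptions, not the patch itself: DemoSlot, demo_format_lsn() and demo_invalidate_obsolete_slots() are hypothetical, a C11 atomic_flag spinlock stands in for the slot's mutex, and ReplicationSlotControlLock is omitted because the sketch is single-threaded. The %X/%X LSN formatting follows the log call in the patch.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified stand-in for a slot in shared memory. */
typedef struct DemoSlot
{
    const char *name;
    bool        in_use;
    bool        invalidated;
    uint64_t    restart_lsn;
    int         active_pid;     /* 0 when no process owns the slot */
    atomic_flag mutex;          /* stands in for the per-slot spinlock */
} DemoSlot;

static void
spin_acquire(atomic_flag *f)
{
    while (atomic_flag_test_and_set(f))
        ;                       /* busy-wait until the flag was clear */
}

static void
spin_release(atomic_flag *f)
{
    atomic_flag_clear(f);
}

/* Format an LSN as the log line does: high and low 32 bits in hex. */
static void
demo_format_lsn(char *buf, size_t len, uint64_t lsn)
{
    snprintf(buf, len, "%" PRIX32 "/%" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/*
 * Invalidate in-use slots whose restart_lsn is older than oldest_lsn.
 * Owner PIDs are stored in pids[] so the caller could terminate them;
 * the return value is the number of PIDs stored.
 */
static int
demo_invalidate_obsolete_slots(DemoSlot *slots, int nslots, uint64_t oldest_lsn,
                               int *pids, int max_pids)
{
    int         npids = 0;

    assert(nslots >= 0 && max_pids >= 0);
    for (int i = 0; i < nslots; i++)
    {
        DemoSlot   *s = &slots[i];
        bool        now_invalid = false;
        uint64_t    restart_lsn = 0;
        int         active_pid = 0;
        char        lsnbuf[32];

        if (!s->in_use)
            continue;

        spin_acquire(&s->mutex);
        if (!s->invalidated && s->restart_lsn < oldest_lsn)
        {
            s->invalidated = true;      /* restart_lsn itself is left alone */
            restart_lsn = s->restart_lsn;
            active_pid = s->active_pid;
            now_invalid = true;
        }
        spin_release(&s->mutex);        /* exactly one release per acquire */

        if (!now_invalid)
            continue;

        if (active_pid != 0 && npids < max_pids)
            pids[npids++] = active_pid;

        demo_format_lsn(lsnbuf, sizeof(lsnbuf), restart_lsn);
        printf("LOG: slot \"%s\" is invalidated at %s due to exceeding max_slot_wal_keep_size\n",
               s->name, lsnbuf);
    }
    return npids;
}
```

Copying restart_lsn and the owner PID while the spinlock is held lets the log message and the kill list be produced after the lock is released, and a second pass skips slots that are already invalidated.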
The subscriber tries to reconnect, and that fails as follows:
[19350] ERROR: replication slot "s1" is invalidated
[19352] ERROR: replication slot "s1" is invalidated
...
If the publisher is restarted, that error is no longer reported and the
following appears instead:
[19372] ERROR: requested WAL segment 000000010000000000000037 has already been removed
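Why a restart changes the error can be sketched like this, assuming (as described above) that only the persistent part of the slot survives a restart while the invalidated flag starts over as false. All types and functions here are hypothetical; timeline 1 and the default 16 MB segment size are assumptions chosen to reproduce the file name in the log line above.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define DEMO_WAL_SEGMENT_SIZE   UINT64_C(16777216)  /* assumed: default 16 MB */
#define DEMO_TIMELINE           1                   /* assumed: timeline 1 */

/* Hypothetical analogue of ReplicationSlotPersistentData: what is saved to disk. */
typedef struct DemoPersistentData
{
    char        name[64];
    uint64_t    restart_lsn;
} DemoPersistentData;

/* Hypothetical shared-memory slot: persistent part plus the unsaved flag. */
typedef struct DemoSlot
{
    DemoPersistentData data;
    bool        invalidated;
} DemoSlot;

/* Simulated restart: only the on-disk data comes back. */
static void
demo_restart(DemoSlot *slot, const DemoPersistentData *on_disk)
{
    slot->data = *on_disk;
    slot->invalidated = false;
}

/* WAL file name as timeline, then segno split into two 8-digit hex fields. */
static void
demo_wal_file_name(char *buf, size_t len, uint32_t tli, uint64_t segno)
{
    uint64_t    segs_per_id = UINT64_C(0x100000000) / DEMO_WAL_SEGMENT_SIZE;

    assert(segs_per_id > 0);
    snprintf(buf, len, "%08" PRIX32 "%08" PRIX32 "%08" PRIX32, tli,
             (uint32_t) (segno / segs_per_id), (uint32_t) (segno % segs_per_id));
}

/* The flag is checked first; if it passes, the WAL at restart_lsn must exist. */
static bool
demo_start_streaming(const DemoSlot *slot, uint64_t oldest_kept_segno,
                     char *errbuf, size_t errlen)
{
    uint64_t    segno = slot->data.restart_lsn / DEMO_WAL_SEGMENT_SIZE;
    char        fname[25];

    if (slot->invalidated)
    {
        snprintf(errbuf, errlen, "replication slot \"%s\" is invalidated",
                 slot->data.name);
        return false;
    }
    if (segno < oldest_kept_segno)
    {
        demo_wal_file_name(fname, sizeof(fname), DEMO_TIMELINE, segno);
        snprintf(errbuf, errlen,
                 "requested WAL segment %s has already been removed", fname);
        return false;
    }
    return true;
}
```

Before the simulated restart the flag produces the "is invalidated" error; afterwards the flag is false again, so the failure only surfaces when the removed segment is looked up.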
Since the check is done in ReplicationSlotAcquire, some slot-related SQL
functions are affected as well.
=# select pg_replication_slot_advance('s1', '0/37000000');
ERROR: replication slot "s1" is invalidated
After the publisher is restarted, the message becomes the same as the
one the walsender reports.
=# select pg_replication_slot_advance('s1', '0/380001C0');
ERROR: requested WAL segment pg_wal/000000010000000000000037 has already been removed
Since I didn't touch restart_lsn at all, there is no risk of
inadvertently changing other behavior.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
From 3f81c5740ea3554835bbe794820624b56c9c3ea8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Tue, 7 Apr 2020 11:09:54 +0900
Subject: [PATCH] further change type 2
---
src/backend/access/transam/xlog.c | 17 +++++----
src/backend/replication/slot.c | 59 ++++++++++++++++++++++++-----
src/backend/replication/walsender.c | 2 +
src/include/replication/slot.h | 7 ++++
4 files changed, 68 insertions(+), 17 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8f28ffaab9..c5b96126ee 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9559,20 +9559,21 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
XLByteToSeg(recptr, currSegNo, wal_segment_size);
segno = currSegNo;
+ /*
+ * Calculate how many segments are kept by slots first, adjusting
+ * for max_slot_wal_keep_size.
+ */
keep = XLogGetReplicationSlotMinimumLSN();
-
- /*
- * Calculate how many segments are kept by slots first.
- */
- /* Cap keepSegs by max_slot_wal_keep_size */
if (keep != InvalidXLogRecPtr)
{
XLByteToSeg(keep, segno, wal_segment_size);
- /* Reduce it if slots already reserves too many. */
+ /* Cap by max_slot_wal_keep_size ... */
if (max_slot_wal_keep_size_mb >= 0)
{
- XLogRecPtr slot_keep_segs =
+ XLogRecPtr slot_keep_segs;
+
+ slot_keep_segs =
ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
if (currSegNo - segno > slot_keep_segs)
@@ -9580,7 +9581,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
}
}
- /* but, keep at least wal_keep_segments segments if any */
+ /* but, keep at least wal_keep_segments if that's set */
if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
{
/* avoid underflow, don't go below 1 */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 86ddff8b9d..0a28b27607 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -277,6 +277,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
slot->data.persistency = persistency;
+ slot->invalidated = false;
/* and then data only present in shared memory */
slot->just_dirtied = false;
@@ -323,6 +324,29 @@ ReplicationSlotCreate(const char *name, bool db_specific,
ConditionVariableBroadcast(&slot->active_cv);
}
+
+/*
+ * Check if the slot is invalidated.
+ */
+void
+CheckReplicationSlotInvalidated(ReplicationSlot *slot)
+{
+ bool invalidated;
+
+ /* Take lock to read slot name for error message. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ invalidated = slot->invalidated;
+
+ /* If the slot is invalidated, error out. */
+ if (invalidated)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" is invalidated",
+ NameStr(slot->data.name))));
+
+ LWLockRelease(ReplicationSlotControlLock);
+}
+
/*
* Find a previously created slot and mark it as used by this backend.
*/
@@ -412,6 +436,9 @@ retry:
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
+
+ /* Finally, check if the slot is invalidated */
+ CheckReplicationSlotInvalidated(slot);
}
/*
@@ -1083,25 +1110,39 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ XLogRecPtr restart_lsn = InvalidXLogRecPtr;
- if (!s->in_use || s->data.restart_lsn == InvalidXLogRecPtr)
+ if (!s->in_use)
+ continue;
+
+ if (s->invalidated)
continue;
if (s->data.restart_lsn < oldestLSN)
{
- elog(LOG, "slot %s is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
- s->data.name.data,
- (uint32) (s->data.restart_lsn >> 32),
- (uint32) s->data.restart_lsn);
- /* mark this slot as invalid */
SpinLockAcquire(&s->mutex);
- s->data.restart_lsn = InvalidXLogRecPtr;
- /* remember PID for killing, if active*/
+ /* mark this slot as invalid */
+ s->invalidated = true;
+
+ /* remember restart_lsn for logging */
+ restart_lsn = s->data.restart_lsn;
+
+ SpinLockRelease(&s->mutex);
+
+ /* remember PID for killing, if active */
if (s->active_pid != 0)
pids = lappend_int(pids, s->active_pid);
- SpinLockRelease(&s->mutex);
}
+ SpinLockRelease(&s->mutex);
+
+ if (restart_lsn != InvalidXLogRecPtr)
+ ereport(LOG,
+ errmsg("slot \"%s\" is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
+ NameStr(s->data.name),
+ (uint32) (restart_lsn >> 32),
+ (uint32) restart_lsn));
+
}
LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..c582f34fcc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1897,6 +1897,8 @@ ProcessStandbyReplyMessage(void)
*/
if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
{
+ CheckReplicationSlotInvalidated(MyReplicationSlot);
+
if (SlotIsLogical(MyReplicationSlot))
LogicalConfirmReceivedLocation(flushPtr);
else
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 6e469ea749..f7531ca495 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -98,6 +98,9 @@ typedef struct ReplicationSlotPersistentData
* backend owning the slot does not need to take this lock when reading its
* own fields, while concurrent backends not owning this slot should take the
* lock when reading this slot's data.
+ * - The invalidated field is initially false and is irreversibly set to
+ * true by a process other than the owner; it is read by the next owner
+ * process after the current owner has terminated.
*/
typedef struct ReplicationSlot
{
@@ -131,6 +134,9 @@ typedef struct ReplicationSlot
/* data surviving shutdowns and crashes */
ReplicationSlotPersistentData data;
+ /* is invalidated ? */
+ bool invalidated;
+
/* is somebody performing io on this slot? */
LWLock io_in_progress_lock;
@@ -187,6 +193,7 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void);
+extern void CheckReplicationSlotInvalidated(ReplicationSlot *slot);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);
--
2.18.2
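For reference, the segment arithmetic in the KeepLogSeg() hunk can be sketched as a standalone function. It is a simplification under assumptions: demo_mb_to_segs() mirrors what ConvertToXSegs() is assumed to do (megabytes divided by the segment size in megabytes); the statement that lowers segno inside the cap and the underflow branch for wal_keep_segments are not shown in the hunk and are filled in here as assumptions; slot_min_lsn == 0 stands for InvalidXLogRecPtr. Both function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t SegNo;

/* Hypothetical analogue of ConvertToXSegs(): megabytes -> segment count. */
static SegNo
demo_mb_to_segs(int mb, uint64_t seg_size)
{
    assert(mb >= 0 && seg_size >= 1024 * 1024);
    return (SegNo) mb / (seg_size / (1024 * 1024));
}

/*
 * Return the oldest segment number to keep.
 * slot_min_lsn == 0 means no slot holds back WAL;
 * max_slot_wal_keep_size_mb < 0 means "no cap".
 */
static SegNo
demo_keep_log_seg(uint64_t curr_lsn, uint64_t slot_min_lsn,
                  int max_slot_wal_keep_size_mb, int wal_keep_segments,
                  uint64_t seg_size)
{
    SegNo       curr = curr_lsn / seg_size;
    SegNo       segno = curr;

    /* Segments kept by slots first, capped by max_slot_wal_keep_size. */
    if (slot_min_lsn != 0)
    {
        segno = slot_min_lsn / seg_size;
        if (max_slot_wal_keep_size_mb >= 0)
        {
            SegNo       slot_keep = demo_mb_to_segs(max_slot_wal_keep_size_mb,
                                                    seg_size);

            if (curr - segno > slot_keep)
                segno = curr - slot_keep;
        }
    }

    /* But keep at least wal_keep_segments if that's set. */
    if (wal_keep_segments > 0 && curr - segno < (SegNo) wal_keep_segments)
    {
        /* avoid underflow, don't go below 1 */
        if (curr <= (SegNo) wal_keep_segments)
            segno = 1;
        else
            segno = curr - (SegNo) wal_keep_segments;
    }
    return segno;
}
```

With 16 MB segments, a current position in segment 64, a slot holding segment 16 and max_slot_wal_keep_size set to 320 MB (20 segments), the sketch keeps from segment 44; a larger wal_keep_segments still pulls that point further back.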