Re: [HACKERS] Restricting maximum keep segments by repslots

From: Kyotaro Horiguchi
Subject: Re: [HACKERS] Restricting maximum keep segments by repslots
Msg-id: 20200407.163043.2050717072576572791.horikyota.ntt@gmail.com
In reply to: Re: [HACKERS] Restricting maximum keep segments by repslots (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Replies: Re: [HACKERS] Restricting maximum keep segments by repslots (Alvaro Herrera <alvherre@2ndquadrant.com>)
List: pgsql-hackers
At Tue, 07 Apr 2020 12:09:05 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> > it seems to me that it suffices to check restart_lsn for being invalid
> > in the couple of places where the slot's owner advances (which is the
> > two auxiliary functions for ProcessStandbyReplyMessage).  I have done so
> > in the attached.  There are other places where the restart_lsn is set,
> > but those seem to be used only when the slot is created.  I don't think
> > we need to cover for those, but I'm not 100% sure about that.
> 
> StartLogicalReplication does
> "XLogBeginRead(,MyReplicationSlot->data.restart_lsn)". If the
> restart_lsn is invalid, the following call to XLogReadRecord runs into
> an assertion failure. Walsender (or StartLogicalReplication) should
> correctly reject reconnection from the subscriber if restart_lsn is
> invalid.
> 
> > However, the change in PhysicalConfirmReceivedLocation() breaks
> > the way slots work for pg_basebackup: apparently the slot is created
> > with a restart_lsn of Invalid and we only advance it the first time we
> > process a feedback message from pg_basebackup.  I have a vague feeling
> > that that's bogus, but I'll have to look at the involved code a little
> > bit more closely to be sure about this.
> 
> Mmm. Couldn't we have a new member 'invalidated' in ReplicationSlot?

I did that in the attached patch. The new member 'invalidated' of a slot
lives in shared memory but is not saved to disk. It is initialized to
false and then irreversibly changed to true when the slot loses a
required segment.
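
The life cycle of the new flag can be modelled in a few lines of standalone C. This is a hypothetical simplification, not the patch itself: SketchSlot and sketch_invalidate_obsolete() are made-up names, the slot spinlock and ReplicationSlotControlLock are left out, and skipping slots whose restart_lsn is still invalid (no WAL reserved yet) is an assumption of the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;              /* WAL position, as in PostgreSQL */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, stripped-down stand-in for ReplicationSlot. */
typedef struct SketchSlot
{
    bool        in_use;
    XLogRecPtr  restart_lsn;    /* never modified by invalidation */
    bool        invalidated;    /* false at creation, only ever set to true */
    int         active_pid;     /* 0 if no process currently owns the slot */
} SketchSlot;

/*
 * Flag every in-use slot whose restart_lsn is older than oldest_lsn.
 * PIDs of the owning processes go to pids_out (at most max_pids entries)
 * so that the caller can terminate them.  Returns the number of PIDs stored.
 * Locking is omitted in this sketch.
 */
size_t
sketch_invalidate_obsolete(SketchSlot *slots, size_t nslots,
                           XLogRecPtr oldest_lsn,
                           int *pids_out, size_t max_pids)
{
    size_t      npids = 0;

    for (size_t i = 0; i < nslots; i++)
    {
        SketchSlot *s = &slots[i];

        /* unused, or already invalidated (the change is irreversible) */
        if (!s->in_use || s->invalidated)
            continue;

        /* a slot that has not reserved WAL yet cannot lose any segment */
        if (s->restart_lsn == InvalidXLogRecPtr || s->restart_lsn >= oldest_lsn)
            continue;

        s->invalidated = true;
        assert(s->restart_lsn != InvalidXLogRecPtr);    /* kept for logging */

        if (s->active_pid != 0 && npids < max_pids)
            pids_out[npids++] = s->active_pid;
    }
    return npids;
}
```

Calling it a second time over the same slots collects no PIDs, because an already-invalidated slot is skipped, while restart_lsn keeps its old value.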

It is checked by the new function CheckReplicationSlotInvalidated() when
acquiring a slot and when updating the slot from a standby reply
message. With this change the walsender stops even without being
explicitly killed, but I didn't remove the killing code.
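
Where the check sits can be shown the same way. Again a hypothetical sketch: CheckedSlot, sketch_acquire() and sketch_process_reply() are invented names, and since there is no PostgreSQL error machinery here, ereport(ERROR) is modelled as writing the message into a buffer and returning false.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical stand-in for the few ReplicationSlot fields used here. */
typedef struct CheckedSlot
{
    char        name[64];
    bool        invalidated;       /* set once, never cleared */
    XLogRecPtr  confirmed_flush;   /* position confirmed by the client */
} CheckedSlot;

/* Creation: the flag starts out false, as in ReplicationSlotCreate(). */
void
sketch_slot_init(CheckedSlot *slot, const char *name)
{
    memset(slot, 0, sizeof(*slot));
    strncpy(slot->name, name, sizeof(slot->name) - 1);
}

/*
 * Model of CheckReplicationSlotInvalidated(): instead of ereport(ERROR),
 * write the same message text into errbuf and return false.
 */
bool
sketch_check_invalidated(const CheckedSlot *slot, char *errbuf, size_t len)
{
    assert(errbuf != NULL && len > 0);
    if (!slot->invalidated)
        return true;
    snprintf(errbuf, len, "replication slot \"%s\" is invalidated", slot->name);
    return false;
}

/* Acquisition (ReplicationSlotAcquire in the patch) runs the check last. */
bool
sketch_acquire(const CheckedSlot *slot, char *errbuf, size_t len)
{
    return sketch_check_invalidated(slot, errbuf, len);
}

/* Standby reply: advance the slot only while it is still valid. */
bool
sketch_process_reply(CheckedSlot *slot, XLogRecPtr flush,
                     char *errbuf, size_t len)
{
    if (!sketch_check_invalidated(slot, errbuf, len))
        return false;           /* the walsender would exit with ERROR here */
    if (flush > slot->confirmed_flush)
        slot->confirmed_flush = flush;
    return true;
}
```

Once invalidated is set, both paths refuse the slot, and the reply path leaves the confirmed position untouched.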

When a logical slot loses a segment, the publisher complains as follows:

[backend  ] LOG:  slot "s1" is invalidated at 0/370001C0 due to exceeding max_slot_wal_keep_size
[walsender] FATAL:  terminating connection due to administrator command

The subscriber tries to reconnect, and that fails as follows:

[19350] ERROR:  replication slot "s1" is invalidated
[19352] ERROR:  replication slot "s1" is invalidated
...

If the publisher restarts, that message is no longer seen, since the
flag is not saved; the following appears instead.

[19372] ERROR:  requested WAL segment 000000010000000000000037 has already been removed
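
The segment in that error message follows from the invalidation LSN logged earlier. Assuming the default 16 MB wal_segment_size (not stated in the logs), 0/370001C0 falls in segment 0x37. The sketch below mirrors what XLByteToSeg() and XLogFileName() compute; the function names themselves are hypothetical.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;     /* WAL byte position */
typedef uint64_t XLogSegNo;      /* WAL segment number */
typedef uint32_t TimeLineID;

/* Same computation as XLByteToSeg(): the segment containing lsn. */
XLogSegNo
lsn_to_segno(XLogRecPtr lsn, uint64_t wal_segment_size)
{
    assert(wal_segment_size > 0);
    return lsn / wal_segment_size;
}

/*
 * Same layout as XLogFileName(): timeline, then the segment number split
 * into "segments per 4 GB" units and the remainder, each as 8 hex digits.
 */
void
segno_to_filename(char *buf, size_t len, TimeLineID tli, XLogSegNo segno,
                  uint64_t wal_segment_size)
{
    uint64_t    segs_per_xlogid = UINT64_C(0x100000000) / wal_segment_size;

    snprintf(buf, len, "%08" PRIX32 "%08" PRIX32 "%08" PRIX32,
             tli,
             (uint32_t) (segno / segs_per_xlogid),
             (uint32_t) (segno % segs_per_xlogid));
}

/* The %X/%X notation used in the LOG message: high and low 32 bits. */
void
format_lsn(char *buf, size_t len, XLogRecPtr lsn)
{
    snprintf(buf, len, "%" PRIX32 "/%" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
}
```

With 16 MB segments there are 256 segments per 4 GB unit, so segment 0x37 on timeline 1 yields the file name 000000010000000000000037 seen in the error.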

Since the check is done in ReplicationSlotAcquire, some slot-related SQL
functions are affected too.

=# select pg_replication_slot_advance('s1', '0/37000000');
ERROR:  replication slot "s1" is invalidated

After restarting the publisher, the message becomes the same as the
walsender's.

=# select pg_replication_slot_advance('s1', '0/380001C0');
ERROR:  requested WAL segment pg_wal/000000010000000000000037 has already been removed

Since I didn't touch restart_lsn at all, there is no risk of
inadvertently changing other behavior.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 3f81c5740ea3554835bbe794820624b56c9c3ea8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Tue, 7 Apr 2020 11:09:54 +0900
Subject: [PATCH] further change type 2

---
 src/backend/access/transam/xlog.c   | 17 +++++----
 src/backend/replication/slot.c      | 59 ++++++++++++++++++++++++-----
 src/backend/replication/walsender.c |  2 +
 src/include/replication/slot.h      |  7 ++++
 4 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8f28ffaab9..c5b96126ee 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9559,20 +9559,21 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
     XLByteToSeg(recptr, currSegNo, wal_segment_size);
     segno = currSegNo;
 
+    /*
+     * Calculate how many segments are kept by slots first, adjusting
+     * for max_slot_wal_keep_size.
+     */
     keep = XLogGetReplicationSlotMinimumLSN();
-
-    /*
-     * Calculate how many segments are kept by slots first.
-     */
-    /* Cap keepSegs by max_slot_wal_keep_size */
     if (keep != InvalidXLogRecPtr)
     {
         XLByteToSeg(keep, segno, wal_segment_size);
 
-        /* Reduce it if slots already reserves too many. */
+        /* Cap by max_slot_wal_keep_size ... */
         if (max_slot_wal_keep_size_mb >= 0)
         {
-            XLogRecPtr slot_keep_segs =
+            XLogSegNo   slot_keep_segs;
+
+            slot_keep_segs =
                 ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
             if (currSegNo - segno > slot_keep_segs)
@@ -9580,7 +9581,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
         }
     }
 
-    /* but, keep at least wal_keep_segments segments if any */
+    /* but, keep at least wal_keep_segments if that's set */
     if (wal_keep_segments > 0 && currSegNo - segno < wal_keep_segments)
     {
         /* avoid underflow, don't go below 1 */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 86ddff8b9d..0a28b27607 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -277,6 +277,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
     StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     slot->data.persistency = persistency;
+    slot->invalidated = false;
 
     /* and then data only present in shared memory */
     slot->just_dirtied = false;
@@ -323,6 +324,29 @@ ReplicationSlotCreate(const char *name, bool db_specific,
     ConditionVariableBroadcast(&slot->active_cv);
 }
 
+
+/*
+ * Check if the slot is invalidated.
+ */
+void
+CheckReplicationSlotInvalidated(ReplicationSlot *slot)
+{
+    bool invalidated;
+
+    /* Take lock to read slot name for error message. */
+    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+    invalidated = slot->invalidated;
+
+    /* If the slot is invalidated, error out. */
+    if (invalidated)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("replication slot \"%s\" is invalidated",
+                        NameStr(slot->data.name))));
+
+    LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Find a previously created slot and mark it as used by this backend.
  */
@@ -412,6 +436,9 @@ retry:
 
     /* We made this slot active, so it's ours now. */
     MyReplicationSlot = slot;
+
+    /* Finally, check if the slot is invalidated */
+    CheckReplicationSlotInvalidated(slot);
 }
 
 /*
@@ -1083,25 +1110,43 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
     for (int i = 0; i < max_replication_slots; i++)
     {
         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+        XLogRecPtr restart_lsn = InvalidXLogRecPtr;
+        pid_t       active_pid = 0;
 
-        if (!s->in_use || s->data.restart_lsn == InvalidXLogRecPtr)
+        if (!s->in_use)
+            continue;
+
+        if (s->invalidated)
             continue;
 
-        if (s->data.restart_lsn < oldestLSN)
+        SpinLockAcquire(&s->mutex);
+
+        /* slots that have not reserved WAL yet have nothing to lose */
+        if (s->data.restart_lsn != InvalidXLogRecPtr &&
+            s->data.restart_lsn < oldestLSN)
         {
-            elog(LOG, "slot %s is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
-                 s->data.name.data,
-                 (uint32) (s->data.restart_lsn >> 32),
-                 (uint32) s->data.restart_lsn);
-            /* mark this slot as invalid */
-            SpinLockAcquire(&s->mutex);
-            s->data.restart_lsn = InvalidXLogRecPtr;
-
-            /* remember PID for killing, if active*/
-            if (s->active_pid != 0)
-                pids = lappend_int(pids, s->active_pid);
-            SpinLockRelease(&s->mutex);
+            /* mark this slot as invalid */
+            s->invalidated = true;
+
+            /* remember restart_lsn for logging */
+            restart_lsn = s->data.restart_lsn;
+
+            /* remember PID for killing, if active */
+            active_pid = s->active_pid;
         }
+        SpinLockRelease(&s->mutex);
+
+        /* lappend_int() allocates memory, so call it outside the spinlock */
+        if (active_pid != 0)
+            pids = lappend_int(pids, active_pid);
+
+        if (restart_lsn != InvalidXLogRecPtr)
+            ereport(LOG,
+                    errmsg("slot \"%s\" is invalidated at %X/%X due to exceeding max_slot_wal_keep_size",
+                           NameStr(s->data.name),
+                           (uint32) (restart_lsn >> 32),
+                           (uint32) restart_lsn));
+
     }
     LWLockRelease(ReplicationSlotControlLock);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..c582f34fcc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1897,6 +1897,8 @@ ProcessStandbyReplyMessage(void)
      */
     if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
     {
+        CheckReplicationSlotInvalidated(MyReplicationSlot);
+
         if (SlotIsLogical(MyReplicationSlot))
             LogicalConfirmReceivedLocation(flushPtr);
         else
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 6e469ea749..f7531ca495 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -98,6 +98,9 @@ typedef struct ReplicationSlotPersistentData
  * backend owning the slot does not need to take this lock when reading its
  * own fields, while concurrent backends not owning this slot should take the
  * lock when reading this slot's data.
+ * - The invalidated field is initially false and is irreversibly changed to
+ * true by a process other than the owner. It is read by the next owner, if
+ * any, after the current owner has terminated.
  */
 typedef struct ReplicationSlot
 {
@@ -131,6 +134,9 @@ typedef struct ReplicationSlot
     /* data surviving shutdowns and crashes */
     ReplicationSlotPersistentData data;
 
+    /* has this slot been invalidated? */
+    bool        invalidated;
+
     /* is somebody performing io on this slot? */
     LWLock        io_in_progress_lock;
 
@@ -187,6 +193,7 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
+extern void CheckReplicationSlotInvalidated(ReplicationSlot *slot);
 extern void ReplicationSlotSave(void);
 extern void ReplicationSlotMarkDirty(void);
 
-- 
2.18.2
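
The segment arithmetic in the KeepLogSeg() hunk above can be checked in isolation. The function below is a hypothetical standalone model, not the patch code: segment numbers stand in for LSNs, a slot minimum of 0 means "no slot", a negative cap means max_slot_wal_keep_size = -1 (unlimited), and the statements the hunk cuts off (the capped assignment and the body under "avoid underflow, don't go below 1") are filled in as assumptions.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogSegNo;

/*
 * Hypothetical model of the segment arithmetic in KeepLogSeg().
 *   curr_seg       segment of the current position
 *   slot_min_seg   oldest segment needed by any slot, 0 if there is none
 *   slot_cap_segs  max_slot_wal_keep_size in segments, < 0 for unlimited
 *   keep_segs      wal_keep_segments (0 disables it)
 * Returns the oldest segment that must be kept.
 */
XLogSegNo
sketch_keep_log_seg(XLogSegNo curr_seg, XLogSegNo slot_min_seg,
                    int64_t slot_cap_segs, int64_t keep_segs)
{
    XLogSegNo   segno = curr_seg;

    /* slots never need segments newer than the current one */
    assert(slot_min_seg <= curr_seg);

    /* segments kept by slots, capped by max_slot_wal_keep_size */
    if (slot_min_seg != 0)
    {
        segno = slot_min_seg;
        if (slot_cap_segs >= 0 && curr_seg - segno > (XLogSegNo) slot_cap_segs)
            segno = curr_seg - (XLogSegNo) slot_cap_segs;
    }

    /* but keep at least wal_keep_segments if that's set */
    if (keep_segs > 0 && curr_seg - segno < (XLogSegNo) keep_segs)
    {
        /* avoid underflow, don't go below 1 */
        if (curr_seg <= (XLogSegNo) keep_segs)
            segno = 1;
        else
            segno = curr_seg - (XLogSegNo) keep_segs;
    }

    return segno;
}
```

For example, with the current segment at 100 and a slot needing segment 10, a cap of 50 segments moves the oldest kept segment to 50, and wal_keep_segments = 80 then pulls it back to 20.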

