Re: [HACKERS] Restricting maximum keep segments by repslots

From Kyotaro HORIGUCHI
Subject Re: [HACKERS] Restricting maximum keep segments by repslots
Date
Msg-id 20181025.215518.189844649.horiguchi.kyotaro@lab.ntt.co.jp
In response to Re: [HACKERS] Restricting maximum keep segments by repslots  (Masahiko Sawada <sawada.mshk@gmail.com>)
Responses Re: [HACKERS] Restricting maximum keep segments by repslots  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Re: [HACKERS] Restricting maximum keep segments by repslots  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
Hello.

At Mon, 22 Oct 2018 19:35:04 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in
<CAD21AoBdfoLSgujPZ_TpnH5zdQz0jg-Y8OXtZ=TCO787Sey-=w@mail.gmail.com>
> On Thu, Sep 13, 2018 at 6:30 PM Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> Sorry for the late response. The patch still can be applied to the

It's alright. Thanks.

> current HEAD so I reviewed the latest patch.
> The value of 'remain' and 'wal_status' might not be correct. Although
> 'wal_status' shows 'lost', we can still get changes from the slot. I've
> tested it with the following steps.
> 
> =# alter system set max_slot_wal_keep_size to '64MB'; -- while
> wal_keep_segments is 0
> =# select pg_reload_conf();
> =# select slot_name, wal_status, remain, pg_size_pretty(remain) as
> remain_pretty from pg_replication_slots ;
>  slot_name | wal_status |  remain  | remain_pretty
> -----------+------------+----------+---------------
>  1         | streaming  | 83885648 | 80 MB
> (1 row)
> 
> ** consume 80MB WAL, and do CHECKPOINT **
> 
> =# select slot_name, wal_status, remain, pg_size_pretty(remain) as
> remain_pretty from pg_replication_slots ;
>  slot_name | wal_status | remain | remain_pretty
> -----------+------------+--------+---------------
>  1         | lost       |      0 | 0 bytes
> (1 row)
> =# select count(*) from pg_logical_slot_get_changes('1', NULL, NULL);
>  count
> -------
>     15
> (1 row)

Mmm. The function reads from the segment it already had open before
the segment was lost from the file system (precisely, before its directory
entry was deleted). So losing just one segment doesn't
matter. Please try losing one more segment.

=# select * from pg_logical_slot_get_changes('s1', NULL, NULL);
ERROR:  unexpected pageaddr 0/29000000 in log segment 000000010000000000000023, offset 0

Or, simply restarting the server makes it forget the opened segment.

...
>  1         | lost       |      0 | 0 bytes
(just restart)
> =# select * from pg_logical_slot_get_changes('s1', NULL, NULL);
> ERROR:  requested WAL segment pg_wal/000000010000000000000029 has already been removed

I'm not sure whether this counts as a bug...
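
The behavior described above comes from POSIX file semantics rather than from anything specific to the slot code: removing a file only deletes its directory entry, and a descriptor that is already open keeps the data readable. A minimal sketch, assuming a POSIX system with a writable /tmp; the helper name read_after_unlink and the temporary path are hypothetical and are not part of PostgreSQL:

```c
#define _POSIX_C_SOURCE 200809L   /* for mkstemp() under strict -std modes */
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper (not PostgreSQL code): write payload to a temporary
 * file, delete its directory entry, then read the data back through the
 * descriptor that is still open.  Returns bytes read, or -1 on error.
 */
ssize_t
read_after_unlink(const char *payload, char *buf, size_t buflen)
{
    char    path[] = "/tmp/walseg_demo_XXXXXX";
    size_t  len;
    ssize_t nread = -1;
    int     fd;

    assert(payload != NULL && buf != NULL);
    len = strlen(payload);

    fd = mkstemp(path);
    if (fd < 0)
        return -1;

    if (write(fd, payload, len) == (ssize_t) len &&
        unlink(path) == 0 &&            /* directory entry is gone ... */
        access(path, F_OK) != 0 &&      /* ... so the path no longer resolves */
        lseek(fd, 0, SEEK_SET) == 0)
        nread = read(fd, buf, buflen);  /* but the open descriptor still works */
    else
        unlink(path);                   /* best-effort cleanup on failure */

    close(fd);
    return nread;
}
```

Once the descriptor is closed (for example by a restart), the data is gone for good, which matches the "has already been removed" error shown after the restart.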


> -----
> I got the following result with a setting of wal_keep_segments >
> max_slot_wal_keep_size. The 'wal_status' shows 'streaming' although the
> 'remain' is 0.
> 
> =# select slot_name, wal_status, remain from pg_replication_slots limit 1;
>  slot_name | wal_status | remain
> -----------+------------+--------
>  1         | streaming  |      0
> (1 row)
> 
> +               XLByteToSeg(targetLSN, restartSeg, wal_segment_size);
> +               if (max_slot_wal_keep_size_mb >= 0 && currSeg <=
> restartSeg + limitSegs)
> +               {
> 
> You use limitSegs here but shouldn't we use keepSegs instead? Actually
> I've commented on this point for the v6 patch before[1], and this had been
> fixed in the v7 patch. However you're using limitSegs again from the v8
> patch. I might be missing something though.

No. keepSegs is the number of segments *actually* kept around. So
reverting it to keepSegs just resurrects the bug you pointed out
upthread. What is needed here is the maximum number of segments that
will be kept. So raising limitSegs to at least wal_keep_segments fixes that.
Sorry for the sequence of silly bugs. A TAP test for the case has been
added.
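
The distinction between the two counters can be sketched outside the server as follows. This is a simplified, standalone restatement of the arithmetic in the attached patch, assuming the default 16MB segment size; the function names, the SEG_SIZE constant and the use of plain uint64_t for LSNs are illustrative assumptions, not the patch's actual interfaces:

```c
#include <assert.h>
#include <stdint.h>

#define SEG_SIZE ((uint64_t) 16 * 1024 * 1024)  /* assumed default segment size */

/* Convert a size in megabytes to a whole number of segments. */
uint64_t
mb_to_segs(int mb)
{
    return (uint64_t) mb / (SEG_SIZE / (1024 * 1024));
}

/*
 * Oldest segment the next checkpoint must leave.  keep_segs is the number
 * of segments actually kept.  An LSN of 0 stands for "no slot".
 */
uint64_t
oldest_keep_segment(uint64_t curr_lsn, uint64_t min_slot_lsn,
                    int max_slot_keep_mb, int wal_keep_segments)
{
    uint64_t curr_seg = curr_lsn / SEG_SIZE;
    uint64_t slot_seg = min_slot_lsn / SEG_SIZE;
    uint64_t keep_segs = 0;

    assert(wal_keep_segments >= 0);

    if (min_slot_lsn != 0 && slot_seg <= curr_seg)
        keep_segs = curr_seg - slot_seg;
    if (max_slot_keep_mb >= 0 && mb_to_segs(max_slot_keep_mb) < keep_segs)
        keep_segs = mb_to_segs(max_slot_keep_mb);   /* cap by the slot limit */
    if (keep_segs < (uint64_t) wal_keep_segments)
        keep_segs = (uint64_t) wal_keep_segments;   /* but keep at least this */

    return (curr_seg <= keep_segs) ? 1 : curr_seg - keep_segs;
}

/*
 * Bytes of WAL that can still be written before the segment holding
 * target_lsn is given up.  limit_segs is the maximum that may be kept,
 * raised to wal_keep_segments when that is larger.  Returns 0 when the
 * limit is disabled (negative), as the patch does.
 */
uint64_t
remaining_bytes(uint64_t curr_lsn, uint64_t target_lsn,
                int max_slot_keep_mb, int wal_keep_segments)
{
    uint64_t curr_seg = curr_lsn / SEG_SIZE;
    uint64_t target_seg = target_lsn / SEG_SIZE;
    uint64_t limit_segs;

    assert(wal_keep_segments >= 0);

    if (max_slot_keep_mb < 0)
        return 0;

    limit_segs = mb_to_segs(max_slot_keep_mb);
    if (limit_segs < (uint64_t) wal_keep_segments)
        limit_segs = (uint64_t) wal_keep_segments;

    if (curr_seg > target_seg + limit_segs)
        return 0;                       /* the slot has already lost WAL */

    /* whole segments left, plus the unwritten rest of the current segment */
    return (target_seg + limit_segs - curr_seg) * SEG_SIZE
        + (SEG_SIZE - curr_lsn % SEG_SIZE);
}
```

Under these assumptions, a caught-up slot with max_slot_wal_keep_size = 64MB, wal_keep_segments = 0 and a write position 432 bytes into its segment gives remaining_bytes = 83885648, which agrees with the 'remain' value in the first query quoted above (the 432-byte offset is inferred, not stated in the thread).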


> Changed the status to 'Waiting on Author'.
> 
> [1] https://www.postgresql.org/message-id/CAD21AoD0rChq7wQE%3D_o95quopcQGjcVG9omwdH07nT5cm81hzg%40mail.gmail.com
> [2] https://www.postgresql.org/message-id/20180904.195250.144186960.horiguchi.kyotaro%40lab.ntt.co.jp

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From ecb7a8dd6a59ededd24578920bc1b4fbaf481b10 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/4] Add WAL relief vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 108 ++++++++++++++++++++------
 src/backend/utils/misc/guc.c                  |  12 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 97 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7375a78ffc..814cd01f79 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int            wal_level = WAL_LEVEL_MINIMAL;
 int            CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int            wal_retrieve_retry_interval = 5000;
+int            max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool        XLOG_DEBUG = false;
@@ -867,6 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9513,6 +9515,53 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+/*
+ * Returns minimum segment number the next checkpoint must leave considering
+ * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location
+ * minSlotLSN is the minimum restart_lsn of all active slots
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+    uint64        keepSegs = 0;
+    XLogSegNo    currSeg;
+    XLogSegNo    minSlotSeg;
+
+    XLByteToSeg(currLSN, currSeg, wal_segment_size);
+    XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+    /*
+     * Calculate keep segments by slots first. The second term of the
+     * condition is just a sanity check.
+     */
+    if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+        keepSegs = currSeg - minSlotSeg;
+
+    /* Cap keepSegs by max_slot_wal_keep_size */
+    if (max_slot_wal_keep_size_mb >= 0)
+    {
+        uint64 limitSegs;
+
+        limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+        /* Apply max_slot_wal_keep_size to keepSegs */
+        if (limitSegs < keepSegs)
+            keepSegs = limitSegs;
+    }
+
+    /* but, keep at least wal_keep_segments segments if any */
+    if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+        keepSegs = wal_keep_segments;
+
+    /* avoid underflow, don't go below 1 */
+    if (currSeg <= keepSegs)
+        return 1;
+
+    return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9524,38 +9573,47 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-    XLogSegNo    segno;
-    XLogRecPtr    keep;
+    XLogRecPtr    slotminptr = InvalidXLogRecPtr;
+    XLogSegNo    minSegNo;
+    XLogSegNo    slotSegNo;
 
-    XLByteToSeg(recptr, segno, wal_segment_size);
-    keep = XLogGetReplicationSlotMinimumLSN();
+    if (max_replication_slots > 0)
+        slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-    /* compute limit for wal_keep_segments first */
-    if (wal_keep_segments > 0)
+    /*
+     * We should keep certain number of WAL segments after this checkpoint.
+     */
+    minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+    /*
+     * warn if the checkpoint flushes the segments required by replication
+     * slots.
+     */
+    if (!XLogRecPtrIsInvalid(slotminptr))
     {
-        /* avoid underflow, don't go below 1 */
-        if (segno <= wal_keep_segments)
-            segno = 1;
+        static XLogSegNo prev_lost_segs = 0;    /* avoid duplicate messages */
+
+        XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+        if (slotSegNo < minSegNo)
+        {
+            XLogSegNo lost_segs = minSegNo - slotSegNo;
+            if (prev_lost_segs != lost_segs)
+                ereport(WARNING,
+                        (errmsg ("some replication slots have lost required WAL segments"),
+                         errdetail_plural(
+                             "The mostly affected slot has lost %ld segment.",
+                             "The mostly affected slot has lost %ld segments.",
+                             lost_segs, lost_segs)));
+            prev_lost_segs = lost_segs;
+        }
         else
-            segno = segno - wal_keep_segments;
-    }
-
-    /* then check whether slots limit removal further */
-    if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-    {
-        XLogSegNo    slotSegNo;
-
-        XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-        if (slotSegNo <= 0)
-            segno = 1;
-        else if (slotSegNo < segno)
-            segno = slotSegNo;
+            prev_lost_segs = 0;
     }
 
     /* don't delete WAL segments newer than the calculated segment */
-    if (segno < *logSegNo)
-        *logSegNo = segno;
+    if (minSegNo < *logSegNo)
+        *logSegNo = minSegNo;
 }
 
 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2317e8be6b..b26c9abec0 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2538,6 +2538,18 @@ static struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+            gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+         NULL,
+         GUC_UNIT_MB
+        },
+        &max_slot_wal_keep_size_mb,
+        -1, -1,
+        MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+        NULL, NULL, NULL
+    },
+
     {
         {"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
             gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 4e61bc6521..0fe00d99a9 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -238,6 +238,7 @@
 #max_wal_senders = 10        # max number of walsender processes
                 # (change requires restart)
 #wal_keep_segments = 0        # in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1    # in megabytes; -1 disables
 #wal_sender_timeout = 60s    # in milliseconds; 0 disables
 
 #max_replication_slots = 10    # max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..12cd0d1d10 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int    wal_segment_size;
 extern int    min_wal_size_mb;
 extern int    max_wal_size_mb;
 extern int    wal_keep_segments;
+extern int    max_slot_wal_keep_size_mb;
 extern int    XLOGbuffers;
 extern int    XLogArchiveTimeout;
 extern int    wal_retrieve_retry_interval;
-- 
2.16.3

From 52ab262b1a8fc6c95e2113e9a330999b273c4ef8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns, "wal_status" and "remain", to pg_replication_slots.
When max_slot_wal_keep_size is set, long-disconnected slots may lose sync.
The two columns show whether the slot can still be reconnected or is
about to lose its reserved WAL segments, and the remaining bytes of WAL
that can be written until the slot loses its reserved WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 src/backend/access/transam/xlog.c      | 154 +++++++++++++++++++++++++++++++--
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slotfuncs.c    |  32 ++++++-
 src/include/access/xlog.h              |   1 +
 src/include/catalog/pg_proc.dat        |   6 +-
 src/test/regress/expected/rules.out    |   6 +-
 7 files changed, 190 insertions(+), 17 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index b7c76469fc..6b6a2df213 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain 
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------
 (0 rows)
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 814cd01f79..e9327d0e76 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -868,7 +868,8 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr,
+                       XLogRecPtr targetLSN, uint64 *restBytes);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9515,19 +9516,126 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+
+/*
+ * Finds the segment number of the oldest file in XLOG directory.
+ *
+ * This function is intended to be used only when we haven't removed a WAL
+ * segment. Read XLogCtl->lastRemovedSegNo if any.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+    DIR        *xldir;
+    struct dirent *xlde;
+    XLogSegNo segno = 0;
+
+    xldir = AllocateDir(XLOGDIR);
+    while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+    {
+        TimeLineID tli;
+        XLogSegNo fsegno;
+
+        /* Ignore files that are not XLOG segments */
+        if (!IsXLogFileName(xlde->d_name) &&
+            !IsPartialXLogFileName(xlde->d_name))
+            continue;
+
+        XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+        /* get minimum segment ignoring timeline ID */
+        if (segno == 0 || fsegno < segno)
+            segno = fsegno;
+    }
+
+    FreeDir(xldir);
+
+    return segno;
+}
+
+/*
+ * Check if the record on the given targetLSN is present in XLOG files.
+ *
+ * Returns one of three values.
+ * 0 means that WAL record at targetLSN is already removed.
+ * 1 means that WAL record at targetLSN is available.
+ * 2 means that WAL record at targetLSN is available but about to be removed by
+ * the next checkpoint.
+ */
+int
+IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes)
+{
+    XLogRecPtr currpos;
+    XLogRecPtr slotPtr;
+    XLogSegNo targetSeg;
+    XLogSegNo tailSeg;
+    XLogSegNo oldestSeg;
+
+    Assert(!XLogRecPtrIsInvalid(targetLSN));
+    Assert(restBytes);
+
+    currpos = GetXLogWriteRecPtr();
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    oldestSeg = XLogCtl->lastRemovedSegNo;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    if (oldestSeg != 0)
+    {
+        /* oldest segment is just after the last removed segment */
+        oldestSeg++;
+    }
+    else
+    {
+        /*
+         * We haven't removed a WAL segment since startup. Get the number
+         * looking WAL files.
+         */
+        static XLogSegNo oldestFileSeg = 0;
+
+        /* Must do it the hard way for the first time */
+        if (oldestFileSeg == 0)
+            oldestFileSeg = GetOldestXLogFileSegNo();
+
+        oldestSeg = oldestFileSeg;
+    }
+
+    XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+    slotPtr = XLogGetReplicationSlotMinimumLSN();
+    tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes);
+
+    /* targetSeg is being reserved by slots */
+    if (tailSeg <= targetSeg)
+        return 1;
+
+    /* targetSeg is not reserved but still available */
+    if (oldestSeg <= targetSeg)
+        return 2;
+
+    /* targetSeg has gone */
+    return    0;
+}
+
 /*
  * Returns minimum segment number the next checkpoint must leave considering
  * wal_keep_segments, replication slots and max_slot_wal_keep_size.
  *
  * currLSN is the current insert location
  * minSlotLSN is the minimum restart_lsn of all active slots
+ * targetLSN is used when restBytes is not NULL.
+ *
+ * If restBytes is not NULL, sets the remaining LSN bytes to advance until the
+ * segment that contains targetLSN will be removed.
  */
 static XLogSegNo
-GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN,
+                     XLogRecPtr targetLSN, uint64 *restBytes)
 {
     uint64        keepSegs = 0;
     XLogSegNo    currSeg;
     XLogSegNo    minSlotSeg;
+    uint64        limitSegs = 0;
 
     XLByteToSeg(currLSN, currSeg, wal_segment_size);
     XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
@@ -9542,8 +9650,6 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
     /* Cap keepSegs by max_slot_wal_keep_size */
     if (max_slot_wal_keep_size_mb >= 0)
     {
-        uint64 limitSegs;
-
         limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
         /* Apply max_slot_wal_keep_size to keepSegs */
@@ -9551,9 +9657,40 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
             keepSegs = limitSegs;
     }
 
-    /* but, keep at least wal_keep_segments segments if any */
-    if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
-        keepSegs = wal_keep_segments;
+    if (wal_keep_segments > 0)
+    {
+        /* but, keep at least wal_keep_segments segments if any */
+        if (keepSegs < wal_keep_segments)
+            keepSegs = wal_keep_segments;
+
+        /* also, limitSegs should be raised if wal_keep_segments is larger */
+        if (limitSegs < wal_keep_segments)
+            limitSegs = wal_keep_segments;
+    }
+
+    /*
+     * If requested, return remaining LSN bytes to advance until the slot
+     * gives up reserving WAL records.
+     */
+    if (restBytes)
+    {
+        uint64 fragbytes;
+        XLogSegNo targetSeg;
+
+        *restBytes = 0;
+
+        XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+        if (max_slot_wal_keep_size_mb >= 0 && currSeg <= targetSeg + limitSegs)
+        {
+            /*
+             * This slot still has all required segments. Calculate how many
+             * LSN bytes the slot has until it loses targetLSN.
+             */
+            fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+            XLogSegNoOffsetToRecPtr(targetSeg + limitSegs - currSeg, fragbytes,
+                                    wal_segment_size, *restBytes);
+        }
+    }
 
     /* avoid underflow, don't go below 1 */
     if (currSeg <= keepSegs)
@@ -9583,7 +9720,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
     /*
      * We should keep certain number of WAL segments after this checkpoint.
      */
-    minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+    minSegNo =
+        GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL);
 
     /*
      * warn if the checkpoint flushes the segments required by replication
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a03b005f73..1d680e7ed8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad4a2..d9ed9e8cf2 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     TupleDesc    tupdesc;
     Tuplestorestate *tupstore;
@@ -307,6 +307,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         else
             nulls[i++] = true;
 
+        if (restart_lsn == InvalidXLogRecPtr)
+        {
+            values[i++] = CStringGetTextDatum("unknown");
+            values[i++] = LSNGetDatum(InvalidXLogRecPtr);
+        }
+        else
+        {
+            uint64    remaining_bytes;
+            char *status;
+
+            switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes))
+            {
+            case 0:
+                status = "lost";
+                break;
+            case 1:
+                status = "streaming";
+                break;
+            case 2:
+                status = "keeping";
+                break;
+            default:
+                status = "unknown";
+                break;
+            }
+
+            values[i++] = CStringGetTextDatum(status);
+            values[i++] = Int64GetDatum(remaining_bytes);
+        }
+
         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     }
     LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 12cd0d1d10..ad9d1dec29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index cff58ed2d8..2253c780ba 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9599,9 +9599,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 735dd37acf..13e2b51376 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.3

From 5a131ba043da3e60eeb4837409b68fd74b08e33e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH 3/4] TAP test for the slot limit feature

---
 src/test/recovery/t/016_replslot_limit.pl | 184 ++++++++++++++++++++++++++++++
 1 file changed, 184 insertions(+)
 create mode 100644 src/test/recovery/t/016_replslot_limit.pl

diff --git a/src/test/recovery/t/016_replslot_limit.pl b/src/test/recovery/t/016_replslot_limit.pl
new file mode 100644
index 0000000000..a7285d94c0
--- /dev/null
+++ b/src/test/recovery/t/016_replslot_limit.pl
@@ -0,0 +1,184 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slot.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 10;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 32MB
+max_wal_size = 48MB
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using a replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+$node_standby->append_conf('recovery.conf', qq(
+primary_slot_name = 'rep1'
+));
+$node_standby->start;
+
+# Wait until standby has replayed enough data on the standby
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, currently the slot must be secured.
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|0", 'check initial state of standby');
+
+# Advance WAL by ten segments (= 160MB) on master
+advance_wal($node_master, 10);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is unconditionally "safe" with the default setting.
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|0", 'check that slot is keeping all segments');
+
+# The standby can connect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 48;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in safe state.
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|64 MB", 'check that remaining byte is calculated');
+
+# Advance WAL again then checkpoint
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working.
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|32 MB", 'remaining byte should be reduced by 32MB');
+
+
+# wal_keep_segments can override
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 6; SELECT pg_reload_conf();");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|80 MB", 'check that wal_keep_segments override');
+
+# restore wal_keep_segments (no test)
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# Advance WAL again without checkpoint
+advance_wal($node_master, 2);
+
+# Slot gets to 'keeping' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|0 bytes", 'check that some segments are about to be removed');
+
+# The standby can still connect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+                "requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that no replication failure is caused by insecure state');
+
+# Advance WAL again, the slot loses some segments.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 10);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+print "### $logstart\n";
+ok(find_in_log($node_master,
+               "some replication slots have lost required WAL segments\n".
+               ".*The mostly affected slot has lost 5 segments.",
+               $logstart),
+   'check that warning is correctly logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|0 bytes", 'check that overflown segments have been removed');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+    if (find_in_log($node_standby,
+                    "requested WAL segment [0-9A-F]+ has already been removed",
+                    $logstart))
+    {
+        $failed = 1;
+        last;
+    }
+    usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+    my ($node, $n) = @_;
+
+    # Advance by $n segments (= (16 * $n) MB) on master
+    for (my $i = 0 ; $i < $n ; $i++)
+    {
+        $node->safe_psql('postgres', "CREATE TABLE t (a int); DROP TABLE t; SELECT pg_switch_wal();");
+    }
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+    my ($node) = @_;
+
+    return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+    my ($node, $pat, $off) = @_;
+
+    $off = 0 unless defined $off;
+    my $log = TestLib::slurp_file($node->logfile);
+    return 0 if (length($log) <= $off);
+
+    $log = substr($log, $off);
+
+    return $log =~ m/$pat/;
+}
-- 
2.16.3

From fc3a69b6fa82c9f6ecdc11b0885d34a39f5e5884 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 11 Jan 2018 15:00:32 +0900
Subject: [PATCH 4/4] Documentation for slot-limit feature

---
 doc/src/sgml/catalogs.sgml          | 28 ++++++++++++++++++++++++++++
 doc/src/sgml/config.sgml            | 23 +++++++++++++++++++++++
 doc/src/sgml/high-availability.sgml |  8 +++++---
 3 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 9edba96fab..fc4cbc9239 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9881,6 +9881,34 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL records claimed by the
+      slot. <literal>streaming</literal>, <literal>keeping</literal>,
+      <literal>lost</literal>
+      or <literal>unknown</literal>. <literal>streaming</literal> means that
+      the claimed records are available. <literal>keeping</literal> means that
+      some of them are to be removed by the next checkpoint.
+      <literal>lost</literal> means that some of them are no longer
+      available. The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is non-negative. If the
+      slot doesn't have a valid restart_lsn, this field
+      is <literal>unknown</literal>.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The number of bytes that the WAL location (LSN) can advance before
+        the slot may lose required WAL records.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7554cba3f9..f3e504862c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3117,6 +3117,29 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots retain an unlimited amount of WAL.  If the restart_lsn
+        of a replication slot falls behind the current LSN by more than this
+        many bytes, the standby using the slot may no longer be able to
+        reconnect due to removal of required WAL records. You can see the WAL
+        availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index ebcb3daaed..15a98340a6 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -927,9 +927,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allotted
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
-- 
2.16.3

