Re: [HACKERS] Restricting maximum keep segments by repslots

From: Kyotaro HORIGUCHI
Subject: Re: [HACKERS] Restricting maximum keep segments by repslots
Date:
Msg-id: 20180713.174004.249224160.horiguchi.kyotaro@lab.ntt.co.jp
In reply to: Re: [HACKERS] Restricting maximum keep segments by repslots  (Masahiko Sawada <sawada.mshk@gmail.com>)
Replies: Re: [HACKERS] Restricting maximum keep segments by repslots
Re: [HACKERS] Restricting maximum keep segments by repslots
List: pgsql-hackers
Hello.

At Wed, 11 Jul 2018 15:09:23 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in
<CAD21AoCFtW6+SN_eVTszDAjQeeU2sSea2VpCEx08ejNafk8H9w@mail.gmail.com>
> On Mon, Jul 9, 2018 at 2:47 PM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
..
> Here is review comments of v4 patches.
> 
> +       if (minKeepLSN)
> +       {
> +               XLogRecPtr slotPtr = XLogGetReplicationSlotMinimumLSN();
> +               Assert(!XLogRecPtrIsInvalid(slotPtr));
> +
> +               tailSeg = GetOldestKeepSegment(currpos, slotPtr);
> +
> +               XLogSegNoOffsetToRecPtr(tailSeg, 0, *minKeepLSN,
> wal_segment_size);
> +       }
> 
> The usage of XLogSegNoOffsetToRecPtr is wrong. Since we specify the
> destination at 4th argument the wal_segment_size will be changed in
> the above expression. The regression tests by PostgreSQL Patch Tester

I'm not sure I understand this correctly; the definition of the
macro is as follows.

| #define XLogSegNoOffsetToRecPtr(segno, offset, dest, wal_segsz_bytes) \
|         (dest) = (segno) * (wal_segsz_bytes) + (offset)

The destination is the *3rd* parameter, and the fourth is the
segment size, which is only read and never written.
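To make the argument order concrete, here is a minimal standalone sketch that reuses the macro exactly as quoted and calls it with the same shape as the reviewed hunk. The typedefs, the file-scope `wal_segment_size` (assumed to be the default 16MB) and the helper `compute_min_keep_lsn()` are hypothetical stand-ins, not PostgreSQL code. After expansion the statement is `(*minKeepLSN) = (tailSeg) * (wal_segment_size) + (0)`, so only the third argument is assigned.

```c
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

/* Hypothetical stand-in for the global in xlog.c; assumes 16MB segments. */
static int wal_segment_size = 16 * 1024 * 1024;

/* The macro as quoted above: the 3rd argument is the destination. */
#define XLogSegNoOffsetToRecPtr(segno, offset, dest, wal_segsz_bytes) \
        (dest) = (segno) * (wal_segsz_bytes) + (offset)

/* Hypothetical helper with the same call shape as the reviewed hunk. */
static void
compute_min_keep_lsn(XLogSegNo tailSeg, XLogRecPtr *minKeepLSN)
{
    /* Writes *minKeepLSN; wal_segment_size is only read. */
    XLogSegNoOffsetToRecPtr(tailSeg, 0, *minKeepLSN, wal_segment_size);
}
```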

> seem passed but I got the following assertion failure in
> recovery/t/010_logical_decoding_timelines.pl
> 
> TRAP: FailedAssertion("!(XLogRecPtrToBytePos(*StartPos) ==
> startbytepos)", File: "xlog.c", Line: 1277)

Hmm. I don't see how this relates to the patch; how did you
trigger the failure? It indicates an inconsistency between the
existing XLogBytePosToRecPtr and XLogRecPtrToBytePos, which
doesn't seem possible without modifying those two functions.

> ----
> +       XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
> +
> +
> +       if (minKeepLSN)
> There is an extra empty line.
> 
> ----
> +    /* but, keep larger than wal_segment_size if any*/
> +    if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
> +        keepSegs = wal_keep_segments;
> 
> You meant wal_keep_segments in the above comment rather than
> wal_segment_size? Also, the above comment need a whitespace just after
> "any".

Ouch! You're right. Fixed.

> ----
> +bool
> +IsLsnStillAvaiable(XLogRecPtr restartLSN, XLogRecPtr *minKeepLSN)
> +{
> 
> I think restartLSN is a word used for replication slots. Since this
> function is defined in xlog.c it would be better to change the
> argument name to more generic name, for example recptr.

Agreed. I used "target" instead.

> ----
> +       /*
> +        * Calcualte keep segments by slots first. The second term of the
> +        * condition is just a sanity check.
> +        */
> 
> s/calcualte/calculate/

Fixed.

> ----
> +               /* get minimum segment ignorig timeline ID */
> 
> s/ignorig/ignoring/

Fixed.

# One of my fingers is literally fatter than usual with a bandaid...

> ----
> min_keep_lsn in pg_replication_slots currently shows the same value in
> every slots but I felt that the value seems not easy to understand
> intuitively for users because users will have to confirm that value
> and to compare the current LSN in order to check if replication slots
> will become the "lost" status. So how about showing values that
> indicate how far away from the point where we become "lost" for
> individual slots?

Yeah, that is what I did in the first cut of this patch, for the
same reason: pg_replication_slots had two additional columns,
"live" and "distance".

https://www.postgresql.org/message-id/20171031.184310.182012625.horiguchi.kyotaro@lab.ntt.co.jp

The current design was changed following a comment.

https://www.postgresql.org/message-id/20171108.131431.170534842.horiguchi.kyotaro%40lab.ntt.co.jp

> > I don't think 'distance' is a good metric - that's going to continually
> > change. Why not store the LSN that's available and provide a function
> > that computes this? Or just rely on the lsn - lsn operator?
> 
> It seems reasonable. The 'secured minimum LSN' is common among
> all slots, so showing it in the view may look a bit stupid, but I
> don't find another suitable place for it.  distance = 0 meant the
> state where the slot is alive but unsecured in the previous patch,
> and that information is lost by changing 'distance' to
> 'min_secure_lsn'.

As I reconsidered this, I noticed that "lsn - lsn" doesn't make
sense here. The correct formula for the value is
"max_slot_wal_keep_size * 1024 * 1024 - ((oldest LSN to keep) -
restart_lsn)". It is not simple enough to write by hand, yet it
doesn't seem general enough to deserve a dedicated function. So I
changed my mind again and show the "distance" there.

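The value shown as "remain" is computed in GetOldestKeepSegment() in the second patch. The following is a minimal standalone restatement of that computation, not the patch itself: `remain_bytes()` and its parameters are hypothetical names, the slot's restart LSN is assumed valid, and the megabytes-to-segments conversion is assumed to truncate to whole segments as ConvertToXSegs() does in xlog.c.

```c
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

/*
 * Hypothetical restatement of the patch's restBytes computation: bytes of
 * WAL that can still be written before a slot at slot_lsn stops being
 * protected. Returns 0 when the limit is disabled or already exceeded.
 * slot_lsn is assumed to be a valid LSN.
 */
static uint64_t
remain_bytes(XLogRecPtr curr_lsn, XLogRecPtr slot_lsn,
             int limit_mb, int keep_segments, uint64_t seg_size)
{
    XLogSegNo   curr_seg = curr_lsn / seg_size;
    XLogSegNo   slot_seg = slot_lsn / seg_size;
    uint64_t    keep_segs = 0;
    uint64_t    limit_segs;

    /* segments currently held back by the slot */
    if (slot_seg <= curr_seg)
        keep_segs = curr_seg - slot_seg;

    if (limit_mb <= 0)
        return 0;               /* max_slot_wal_keep_size disabled */

    /* megabytes to whole segments; any fraction is dropped */
    limit_segs = (uint64_t) limit_mb / (seg_size / (1024 * 1024));
    if (limit_segs < keep_segs)
        return 0;               /* the slot already lost its reservation */

    /* a larger wal_keep_segments extends the protected range */
    if (limit_segs < (uint64_t) keep_segments)
        limit_segs = (uint64_t) keep_segments;

    /* whole segments left, plus the unwritten part of the current one */
    return (limit_segs - keep_segs) * seg_size + (seg_size - curr_lsn % seg_size);
}
```

For example, with 16MB segments, the write position 256 bytes into segment 3, the slot in segment 1 and a 64MB limit, two of the four allowed segments are in use, so the result is two full segments plus the rest of segment 3.
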
pg_replication_slots now has the column "remain" instead of
"min_keep_lsn", which shows an LSN when wal_status is "streaming"
and "0/0" otherwise. In a special case, "remain" can be "0/0"
while "wal_status" is "streaming". That is the reason for the
tristate return value of IsLsnStillAvaiable().

wal_status | remain 
streaming  | 0/19E3C0  -- WAL is reserved
streaming  | 0/0       -- Still reserved but on the boundary
keeping    | 0/0       -- About to be lost.
lost       | 0/0       -- Lost.
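The table follows from the tristate check in IsLsnStillAvaiable() and the mapping in pg_get_replication_slots() in the second patch. Below is a minimal sketch of that decision on plain segment numbers, with hypothetical helper names: `tail_seg` stands for the oldest segment the next checkpoint must keep and `oldest_seg` for the oldest segment still on disk. It also shows how a byte count such as "remain" ends up printed in the usual `%X/%X` LSN notation.

```c
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

/* Hypothetical restatement of the tristate check and its wal_status text. */
static const char *
wal_status_for(XLogSegNo target_seg, XLogSegNo tail_seg, XLogSegNo oldest_seg)
{
    if (tail_seg <= target_seg)
        return "streaming";     /* 1: still reserved by the slot */
    if (oldest_seg <= target_seg)
        return "keeping";       /* 2: on disk, removable at next checkpoint */
    return "lost";              /* 0: already removed */
}

/* "remain" is a byte count displayed in %X/%X LSN notation. */
static void
format_lsn(char *buf, size_t len, XLogRecPtr value)
{
    snprintf(buf, len, "%X/%X", (unsigned int) (value >> 32),
             (unsigned int) value);
}
```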

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From c0cd29e0bb568834cc8889d69d3e6081236c5784 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/4] Add WAL relief vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 100 ++++++++++++++++++++------
 src/backend/utils/misc/guc.c                  |  12 ++++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 94 insertions(+), 20 deletions(-)
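The segment arithmetic in GetOldestKeepSegment() in this patch can be restated as a standalone sketch on plain segment numbers. This is an illustration under stated assumptions, not the patch: `oldest_keep_segment()` and its parameters are hypothetical, `slot_valid` stands in for `minSlotLSN != InvalidXLogRecPtr`, and the megabytes-to-segments conversion is assumed to truncate like ConvertToXSegs().

```c
#include <stdint.h>

typedef uint64_t XLogSegNo;

/*
 * Hypothetical restatement: oldest segment the next checkpoint must keep,
 * given the current segment, the oldest slot segment, the slot limit in MB
 * (0 = disabled), the segment size in MB and wal_keep_segments.
 */
static XLogSegNo
oldest_keep_segment(XLogSegNo curr_seg, XLogSegNo slot_seg, int slot_valid,
                    int limit_mb, int seg_mb, int wal_keep_segments)
{
    uint64_t    keep_segs = 0;

    /* segments the slots want kept; slot_seg > curr_seg is ignored */
    if (slot_valid && slot_seg <= curr_seg)
        keep_segs = curr_seg - slot_seg;

    /* cap by max_slot_wal_keep_size, dropping any fraction of a segment */
    if (limit_mb > 0)
    {
        uint64_t    limit_segs = (uint64_t) (limit_mb / seg_mb);

        if (limit_segs < keep_segs)
            keep_segs = limit_segs;
    }

    /* but keep at least wal_keep_segments segments */
    if (wal_keep_segments > 0 && keep_segs < (uint64_t) wal_keep_segments)
        keep_segs = (uint64_t) wal_keep_segments;

    /* avoid underflow, don't go below 1 */
    if (curr_seg <= keep_segs)
        return 1;
    return curr_seg - keep_segs;
}
```

Note the ordering: the slot limit is applied before wal_keep_segments, so wal_keep_segments always wins as a lower bound, matching the patch.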

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4049deb968..df6b5e89e6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int            wal_level = WAL_LEVEL_MINIMAL;
 int            CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int            wal_retrieve_retry_interval = 5000;
+int            max_slot_wal_keep_size_mb = 0;
 
 #ifdef WAL_DEBUG
 bool        XLOG_DEBUG = false;
@@ -867,6 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9462,6 +9464,51 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+/*
+ * Returns minimum segment number the next checktpoint must leave considering
+ * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+    uint64        keepSegs = 0;
+    XLogSegNo    currSeg;
+    XLogSegNo    slotSeg;
+
+    XLByteToSeg(currLSN, currSeg, wal_segment_size);
+    XLByteToSeg(minSlotLSN, slotSeg, wal_segment_size);
+
+    /*
+     * Calcualte keep segments by slots first. The second term of the
+     * condition is just a sanity check.
+     */
+    if (minSlotLSN != InvalidXLogRecPtr && slotSeg <= currSeg)
+        keepSegs = currSeg - slotSeg;
+
+    /*
+     * slot keep segments is limited by max_slot_wal_keep_size, fragment of a
+     * segment is ignored
+     */
+    if (max_slot_wal_keep_size_mb > 0)
+    {
+        uint64 limitSegs;
+
+        limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+        if (limitSegs < keepSegs)
+            keepSegs = limitSegs;
+    }
+
+    /* but, keep larger than wal_segment_size if any*/
+    if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+        keepSegs = wal_keep_segments;
+
+    /* avoid underflow, don't go below 1 */
+    if (currSeg <= keepSegs)
+        return 1;
+
+    return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9474,33 +9521,46 @@ static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
     XLogSegNo    segno;
-    XLogRecPtr    keep;
+    XLogRecPtr    slotminptr = InvalidXLogRecPtr;
+    XLogSegNo    minSegNo;
+    XLogSegNo    slotSegNo;
 
     XLByteToSeg(recptr, segno, wal_segment_size);
-    keep = XLogGetReplicationSlotMinimumLSN();
 
-    /* compute limit for wal_keep_segments first */
-    if (wal_keep_segments > 0)
+    if (max_replication_slots > 0)
+        slotminptr = XLogGetReplicationSlotMinimumLSN();
+
+    /*
+     * We should keep certain number of WAL segments after this checktpoint.
+     */
+    minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+    /*
+     * warn if the checkpoint flushes the segments required by replication
+     * slots.
+     */
+    if (!XLogRecPtrIsInvalid(slotminptr))
     {
-        /* avoid underflow, don't go below 1 */
-        if (segno <= wal_keep_segments)
-            segno = 1;
+        static XLogSegNo prev_lost_segs = 0;    /* avoid duplicate messages */
+
+        XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+        if (slotSegNo < minSegNo)
+        {
+            XLogSegNo lost_segs = minSegNo - slotSegNo;
+            if (prev_lost_segs != lost_segs)
+                ereport(WARNING,
+                        (errmsg ("some replication slots have lost required WAL segments"),
+                         errdetail("The most affected slot has lost " UINT64_FORMAT " segments.",
+                                   lost_segs)));
+            prev_lost_segs = lost_segs;
+        }
         else
-            segno = segno - wal_keep_segments;
+            prev_lost_segs = 0;
     }
 
-    /* then check whether slots limit removal further */
-    if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-    {
-        XLogSegNo    slotSegNo;
-
-        XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-        if (slotSegNo <= 0)
-            segno = 1;
-        else if (slotSegNo < segno)
-            segno = slotSegNo;
-    }
+    if (minSegNo < segno)
+        segno = minSegNo;
 
     /* don't delete WAL segments newer than the calculated segment */
     if (segno < *logSegNo)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 17292e04fe..01b8c8edec 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2530,6 +2530,18 @@ static struct config_int ConfigureNamesInt[] =
         NULL, NULL, NULL
     },
 
+    {
+        {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+            gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+         NULL,
+         GUC_UNIT_MB
+        },
+        &max_slot_wal_keep_size_mb,
+        0, 0,
+        MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+        NULL, NULL, NULL
+    },
+
     {
         {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
             gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 657c3f81f8..23af9ea274 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -238,6 +238,7 @@
 #max_wal_senders = 10        # max number of walsender processes
                 # (change requires restart)
 #wal_keep_segments = 0        # in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0    # in megabytes; 0 disables
 #wal_sender_timeout = 60s    # in milliseconds; 0 disables
 
 #max_replication_slots = 10    # max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..12cd0d1d10 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int    wal_segment_size;
 extern int    min_wal_size_mb;
 extern int    max_wal_size_mb;
 extern int    wal_keep_segments;
+extern int    max_slot_wal_keep_size_mb;
 extern int    XLOGbuffers;
 extern int    XLogArchiveTimeout;
 extern int    wal_retrieve_retry_interval;
-- 
2.16.3

From 32b4fa4556dda77bb6b6563692d19f0b9556d85e Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns "wal_status" and "remain" to pg_replication_slots.
With max_slot_wal_keep_size set, long-disconnected slots may lose sync.
The two columns show whether the slot can still be reconnected or is
about to lose its reserved WAL segments, and the remaining bytes of WAL
that can be written before the slot loses its reserved WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 src/backend/access/transam/xlog.c      | 135 +++++++++++++++++++++++++++++++--
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slotfuncs.c    |  32 +++++++-
 src/include/access/xlog.h              |   1 +
 src/include/catalog/pg_proc.dat        |   6 +-
 src/test/regress/expected/rules.out    |   6 +-
 7 files changed, 172 insertions(+), 16 deletions(-)
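GetOldestXLogFileSegNo() in this patch scans pg_wal for the smallest segment number, ignoring the timeline. A WAL file name is 24 uppercase hex digits (timeline, log id, segment), and the segment number is log id times segments-per-log-id plus segment. The sketch below approximates IsXLogFileName()/IsPartialXLogFileName() and XLogFromFileName() over an in-memory list of names; `oldest_segno()` is a hypothetical helper and no directory is read.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogSegNo;

/*
 * Hypothetical sketch: smallest segment number among WAL file names,
 * ignoring the timeline. Returns 0 if no name looks like a WAL segment.
 */
static XLogSegNo
oldest_segno(const char *const *names, int n, uint64_t seg_size)
{
    uint64_t    segs_per_id = UINT64_C(0x100000000) / seg_size;
    XLogSegNo   segno = 0;

    for (int i = 0; i < n; i++)
    {
        unsigned int tli, log, seg;
        XLogSegNo   fsegno;

        /* accept 24 hex digits, optionally followed by ".partial" */
        if (strspn(names[i], "0123456789ABCDEF") != 24 ||
            (names[i][24] != '\0' && strcmp(names[i] + 24, ".partial") != 0))
            continue;
        if (sscanf(names[i], "%08X%08X%08X", &tli, &log, &seg) != 3)
            continue;
        (void) tli;             /* timeline is deliberately ignored */

        fsegno = (XLogSegNo) log * segs_per_id + seg;
        if (segno == 0 || fsegno < segno)
            segno = fsegno;
    }
    return segno;
}
```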

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index b7c76469fc..6b6a2df213 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain 
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------
 (0 rows)
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index df6b5e89e6..9bf648dc17 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -868,7 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, uint64 *restBytes);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9464,12 +9464,110 @@ CreateRestartPoint(int flags)
     return true;
 }
 
+
+/*
+ * Returns the segment number of the oldest file in XLOG directory.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+    DIR        *xldir;
+    struct dirent *xlde;
+    XLogSegNo segno = 0;
+
+    xldir = AllocateDir(XLOGDIR);
+    while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+    {
+        TimeLineID tli;
+        XLogSegNo fsegno;
+
+        /* Ignore files that are not XLOG segments */
+        if (!IsXLogFileName(xlde->d_name) &&
+            !IsPartialXLogFileName(xlde->d_name))
+            continue;
+
+        XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+        /* get minimum segment ignoring timeline ID */
+        if (segno == 0 || fsegno < segno)
+            segno = fsegno;
+    }
+
+    FreeDir(xldir);
+
+    return segno;
+}
+
+/*
+ * Check if the record on the given targetLSN is present in XLOG files.
+ *
+ * Returns one of three values:
+ * 0 means that the WAL record at targetLSN has already been removed.
+ * 1 means that the WAL record at targetLSN is available.
+ * 2 means that the WAL record at targetLSN is available but is about to be
+ * removed by the next checkpoint.
+ */
+int
+IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes)
+{
+    XLogRecPtr currpos;
+    XLogRecPtr slotPtr;
+    XLogSegNo targetSeg;
+    XLogSegNo tailSeg;
+    XLogSegNo oldestSeg;
+
+    Assert(!XLogRecPtrIsInvalid(targetLSN));
+    Assert(restBytes);
+
+    currpos = GetXLogWriteRecPtr();
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    oldestSeg = XLogCtl->lastRemovedSegNo;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    /*
+     * oldestSeg is zero before at least one segment has been removed since
+     * startup. Use oldest segno taken from file names.
+     */
+    if (oldestSeg == 0)
+    {
+        static XLogSegNo oldestFileSeg = 0;
+
+        if (oldestFileSeg == 0)
+            oldestFileSeg = GetOldestXLogFileSegNo();
+        /* let it have the same meaning with lastRemovedSegNo here */
+        oldestSeg = oldestFileSeg - 1;
+    }
+
+    /* oldest segment is just after the last removed segment */
+    oldestSeg++;
+
+    XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+    slotPtr = XLogGetReplicationSlotMinimumLSN();
+    tailSeg = GetOldestKeepSegment(currpos, slotPtr, restBytes);
+
+    /* targetSeg is being reserved by slots */
+    if (tailSeg <= targetSeg)
+        return 1;
+
+    /* targetSeg is not reserved but still available */
+    if (oldestSeg <= targetSeg)
+        return 2;
+
+    /* targetSeg has gone */
+    return 0;
+}
+
 /*
  * Returns minimum segment number the next checktpoint must leave considering
  * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ *
+ * If restBytes is not NULL, *restBytes is set to the number of bytes the
+ * write LSN can advance before any slot loses a reserved WAL record.
  */
 static XLogSegNo
-GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, uint64 *restBytes)
 {
     uint64        keepSegs = 0;
     XLogSegNo    currSeg;
@@ -9479,26 +9577,49 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
     XLByteToSeg(minSlotLSN, slotSeg, wal_segment_size);
 
     /*
-     * Calcualte keep segments by slots first. The second term of the
+     * Calculate keep segments by slots first. The second term of the
      * condition is just a sanity check.
      */
     if (minSlotLSN != InvalidXLogRecPtr && slotSeg <= currSeg)
         keepSegs = currSeg - slotSeg;
 
+    if (restBytes)
+        *restBytes = 0;
+
     /*
-     * slot keep segments is limited by max_slot_wal_keep_size, fragment of a
-     * segment is ignored
+     * Limit the number of segments to keep, ignoring any segment fragment.
+     * If requested, also return the number of bytes the LSN can advance
+     * before the slot stops reserving WAL records.
      */
     if (max_slot_wal_keep_size_mb > 0)
     {
         uint64 limitSegs;
 
         limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
         if (limitSegs < keepSegs)
+        {
+            /* The slot can no longer retain all its reserved WAL records. */
             keepSegs = limitSegs;
+        }
+        else if (restBytes)
+        {
+            /* calculate the bytes remaining until this slot loses WAL */
+            uint64 fragbytes;
+
+            /*
+             * wal_keep_segments may be larger than the slot limit. That is a
+             * rather useless configuration, but handle the case anyway.
+             */
+            if (limitSegs < wal_keep_segments)
+                limitSegs = wal_keep_segments;
+
+            fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+            *restBytes = (limitSegs - keepSegs) * wal_segment_size + fragbytes;
+        }
     }
 
-    /* but, keep larger than wal_segment_size if any*/
+    /* but, keep at least wal_keep_segments segments if any */
     if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
         keepSegs = wal_keep_segments;
 
@@ -9533,7 +9654,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
     /*
      * We should keep certain number of WAL segments after this checktpoint.
      */
-    minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+    minSegNo = GetOldestKeepSegment(recptr, slotminptr, NULL);
 
     /*
      * warn if the checkpoint flushes the segments required by replication
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7251552419..d28896dc58 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 450f73759f..bf7fbb7833 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     TupleDesc    tupdesc;
     Tuplestorestate *tupstore;
@@ -307,6 +307,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
         else
             nulls[i++] = true;
 
+        if (restart_lsn == InvalidXLogRecPtr)
+        {
+            values[i++] = CStringGetTextDatum("unknown");
+            values[i++] = LSNGetDatum(InvalidXLogRecPtr);
+        }
+        else
+        {
+            uint64    remaining_bytes;
+            char *status;
+
+            switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes))
+            {
+            case 0:
+                status = "lost";
+                break;
+            case 1:
+                status = "streaming";
+                break;
+            case 2:
+                status = "keeping";
+                break;
+            default:
+                status = "unknown";
+                break;
+            }
+
+            values[i++] = CStringGetTextDatum(status);
+            values[i++] = LSNGetDatum(remaining_bytes);
+        }
+
         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     }
     LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 12cd0d1d10..ad9d1dec29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a14651010f..18acf1f8ef 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9796,9 +9796,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ae0cd253d5..fe7a675e1e 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.3

From c12b68ee828ade7ed587e74d9f354f08ba39828d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH 3/4] TAP test for the slot limit feature

---
 src/test/recovery/t/016_replslot_limit.pl | 161 ++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 src/test/recovery/t/016_replslot_limit.pl

diff --git a/src/test/recovery/t/016_replslot_limit.pl b/src/test/recovery/t/016_replslot_limit.pl
new file mode 100644
index 0000000000..401e3b1bd0
--- /dev/null
+++ b/src/test/recovery/t/016_replslot_limit.pl
@@ -0,0 +1,161 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slot.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 32MB
+max_wal_size = 48MB
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using a replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+$node_standby->append_conf('recovery.conf', qq(
+primary_slot_name = 'rep1'
+));
+$node_standby->start;
+
+# Wait until the standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, currently the slot must be secured.
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming", 'check initial state of standby');
+
+# Advance WAL by ten segments (= 160MB) on master
+advance_wal($node_master, 10);
+
+# All segments still must be secured after a checkpoint.
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming", 'check that slot is keeping all segments');
+
+# The standby can connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+
+# Advance WAL again
+advance_wal($node_master, 10);
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 32;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# Some segments become 'insecured'
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping", 'check that some segments are about to be removed');
+
+# The standby can still connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+                "requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that no replication failure is caused by insecure state');
+
+# Advance WAL again
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 10);
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+               "some replication slots have lost required WAL segments",
+               $logstart),
+   'check that the warning is correctly logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost", 'check that overflown segments have been removed');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+    if (find_in_log($node_standby,
+                    "requested WAL segment [0-9A-F]+ has already been removed",
+                    $logstart))
+    {
+        $failed = 1;
+        last;
+    }
+    usleep(100_000);
+}
+ok($failed, 'check replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+    my ($node, $n) = @_;
+
+    # Advance by $n segments (= (16 * $n) MB) on master
+    for (my $i = 0 ; $i < $n ; $i++)
+    {
+        $node->safe_psql('postgres', "CREATE TABLE t (a int); DROP TABLE t; SELECT pg_switch_wal();");
+    }
+
+    $node->safe_psql('postgres', "CHECKPOINT;");
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+    my ($node) = @_;
+
+    return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+    my ($node, $pat, $off) = @_;
+
+    $off = 0 unless defined $off;
+    my $log = TestLib::slurp_file($node->logfile);
+    return 0 if (length($log) <= $off);
+
+    $log = substr($log, $off);
+
+    return $log =~ m/$pat/;
+}
-- 
2.16.3

From 98fbdc59a4e8079de0ea1ca6bb2e09bf0ddfdcc9 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 11 Jan 2018 15:00:32 +0900
Subject: [PATCH 4/4] Documentation for slot-limit feature

---
 doc/src/sgml/catalogs.sgml          | 29 +++++++++++++++++++++++++++++
 doc/src/sgml/config.sgml            | 22 ++++++++++++++++++++++
 doc/src/sgml/high-availability.sgml | 14 ++++++++------
 3 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 4851bc2e24..ce1eeb68bb 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9882,6 +9882,35 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL records claimed by this
+      slot: one of <literal>streaming</literal>, <literal>keeping</literal>,
+      <literal>lost</literal>
+      or <literal>unknown</literal>. <literal>streaming</literal> means that
+      all claimed records are available. <literal>keeping</literal> means that
+      some of them will be removed by the next checkpoint.
+      <literal>lost</literal> means that some of them have been removed. The
+      last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is not zero. If the slot
+      does not have a valid <structfield>restart_lsn</structfield>, this field
+      is <literal>unknown</literal>.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>min_keep_lsn</structfield></entry>
+      <entry><type>pg_lsn</type></entry>
+      <entry></entry>
+      <entry>The oldest address (<literal>LSN</literal>) that is still
+      available to the replication slot. The user of the slot can no longer
+      continue streaming if this exceeds <structfield>restart_lsn</structfield>.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e307bb4e8e..1db0736dc5 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3116,6 +3116,28 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specifies the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is zero (the default),
+        replication slots may retain an unlimited amount of WAL.
+       </para>
+       <para>
+        This value is rounded down to a multiple of the WAL segment
+        size.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 934eb9052d..50ebb23c23 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -927,9 +927,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allotted
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
@@ -967,9 +969,9 @@ postgres=# SELECT * FROM pg_create_physical_replication_slot('node_a_slot');
  node_a_slot |
 
 postgres=# SELECT * FROM pg_replication_slots;
-  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn
--------------+-----------+--------+----------+--------+------+-------------+---------------------
- node_a_slot | physical  |        |          | f      |      |             |
+  slot_name  | slot_type | datoid | database | active | xmin | restart_lsn | confirmed_flush_lsn | wal_status | min_keep_lsn
+-------------+-----------+--------+----------+--------+------+-------------+---------------------+------------+--------------
+ node_a_slot | physical  |        |          | f      |      |             |                     | unknown    | 0/1000000
 (1 row)
 </programlisting>
      To configure the standby to use this slot, <varname>primary_slot_name</varname>
-- 
2.16.3
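The documentation states that max_slot_wal_keep_size is rounded down to a multiple of the WAL segment size, and the XLogSegNoOffsetToRecPtr macro quoted earlier in this message computes an LSN as segno * segment size + offset. Here is a minimal C sketch of that arithmetic, assuming the limit is given in megabytes and that the oldest kept segment is the current segment minus the allowed number of whole segments (clamped at segment 1), unless the slots need even less. The helper names keep_segments, oldest_keep_segment and segment_start_lsn are hypothetical; this is not GetOldestKeepSegment() from the patch.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;   /* stand-in for PostgreSQL's LSN type */
typedef uint64_t XLogSegNo;    /* stand-in for a WAL segment number */

/* Hypothetical helper: whole segments allowed by a limit given in MB.
 * Integer division rounds down, so a partial segment is not counted. */
static uint64_t
keep_segments(uint64_t max_keep_mb, uint64_t segsz_bytes)
{
    assert(segsz_bytes > 0);
    return (max_keep_mb * 1024 * 1024) / segsz_bytes;
}

/* Hypothetical helper: oldest segment to retain, given the current insert
 * position and the slots' minimum restart_lsn.  A zero limit means no cap. */
static XLogSegNo
oldest_keep_segment(XLogRecPtr curr_lsn, XLogRecPtr slot_min_lsn,
                    uint64_t max_keep_mb, uint64_t segsz_bytes)
{
    XLogSegNo curr_seg = curr_lsn / segsz_bytes;
    XLogSegNo slot_seg = slot_min_lsn / segsz_bytes;
    uint64_t  keep = keep_segments(max_keep_mb, segsz_bytes);
    XLogSegNo limit_seg;

    if (max_keep_mb == 0)
        return slot_seg;        /* unlimited: keep whatever the slots need */

    /* Assumed: segment numbering starts at 1, so never go below that. */
    limit_seg = (curr_seg > keep) ? curr_seg - keep : 1;

    /* Keep from whichever is newer: the slots' segment or the limit. */
    return (slot_seg > limit_seg) ? slot_seg : limit_seg;
}

/* LSN at the start of a segment: segno * segment size + offset 0. */
static XLogRecPtr
segment_start_lsn(XLogSegNo segno, uint64_t segsz_bytes)
{
    return segno * segsz_bytes;
}
```

Under these assumptions min_keep_lsn would be segment_start_lsn(oldest_keep_segment(...), segsz), and a slot whose restart_lsn falls below it is the case the catalog documentation describes as no longer able to continue streaming. For example, with 16 MB segments a 100 MB limit keeps 6 whole segments, not 6.25.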

