Re: [HACKERS] Restricting maximum keep segments by repslots

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: [HACKERS] Restricting maximum keep segments by repslots
Дата
Msg-id 20170913.170816.148239681.horiguchi.kyotaro@lab.ntt.co.jp
обсуждение исходный текст
Ответ на Re: [HACKERS] Restricting maximum keep segments by repslots  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Ответы Re: [HACKERS] Restricting maximum keep segments by repslots  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Список pgsql-hackers
At Wed, 13 Sep 2017 11:43:06 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20170913.114306.67844218.horiguchi.kyotaro@lab.ntt.co.jp>
horiguchi.kyotaro> At Thu, 07 Sep 2017 21:59:56 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp>wrote in <20170907.215956.110216588.horiguchi.kyotaro@lab.ntt.co.jp>
 
> > Hello,
> > 
> > At Thu, 07 Sep 2017 14:12:12 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote
in<20170907.141212.227032666.horiguchi.kyotaro@lab.ntt.co.jp>
 
> > > > I would like a flag in pg_replication_slots, and possibly also a
> > > > numerical column that indicates how far away from the critical point
> > > > each slot is.  That would be great for a monitoring system.
> > > 
> > > Great! I'll do that right now.
> > 
> > Done.
> 
> The CF status of this patch turned into "Waiting on Author".
> This is because the second patch is posted separately from the
> first patch. I repost them together after rebasing to the current
> master.

Hmm. I was unconsciously careless of regression test since it is
in a tentative shape. This must pass the regression..

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 105,110 **** int            wal_level = WAL_LEVEL_MINIMAL;
--- 105,111 ---- int            CommitDelay = 0;    /* precommit delay in microseconds */ int            CommitSiblings
=5; /* # concurrent xacts needed to sleep */ int            wal_retrieve_retry_interval = 5000;
 
+ int            max_slot_wal_keep_size_mb = 0;  #ifdef WAL_DEBUG bool        XLOG_DEBUG = false;
***************
*** 9365,9373 **** KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
--- 9366,9397 ----     if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)     {         XLogSegNo
slotSegNo;
+         int            slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb);          XLByteToSeg(keep,
slotSegNo);
 
+         /*
+          * ignore slots if too many wal segments are kept.
+          * max_slot_wal_keep_size is just accumulated on wal_keep_segments.
+          */
+         if (max_slot_wal_keep_size_mb > 0 && slotSegNo + slotlimitsegs < segno)
+         {
+             segno = segno - slotlimitsegs; /* must be positive */
+ 
+             /*
+              * warn only if the checkpoint flushes the required segment.
+              * we assume here that *logSegNo is calculated keep location.
+              */
+             if (slotSegNo < *logSegNo)
+                 ereport(WARNING,
+                     (errmsg ("restart LSN of replication slots is ignored by checkpoint"),
+                      errdetail("Some replication slots have lost required WAL segnents to continue by up to %ld
segments.",
+                        (segno < *logSegNo ? segno : *logSegNo) - slotSegNo)));
+ 
+             /* emergency vent */
+             slotSegNo = segno;
+         }
+          if (slotSegNo <= 0)             segno = 1;         else if (slotSegNo < segno)
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2371,2376 **** static struct config_int ConfigureNamesInt[] =
--- 2371,2387 ----     },      {
+         {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+             gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+          NULL,
+          GUC_UNIT_MB
+         },
+         &max_slot_wal_keep_size_mb,
+         0, 0, INT_MAX,
+         NULL, NULL, NULL
+     },
+ 
+     {         {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,             gettext_noop("Sets the maximum time
towait for WAL replication."),             NULL,
 
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ---- #max_wal_senders = 10        # max number of walsender processes                 # (change requires
restart)#wal_keep_segments = 0        # in logfile segments, 16MB each; 0 disables
 
+ #max_slot_wal_keep_size = 0    # measured in bytes; 0 disables #wal_sender_timeout = 60s    # in milliseconds; 0
disables #max_replication_slots = 10    # max number of replication slots
 
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 97,102 **** extern bool reachedConsistency;
--- 97,103 ---- extern int    min_wal_size_mb; extern int    max_wal_size_mb; extern int    wal_keep_segments;
+ extern int    max_slot_wal_keep_size_mb; extern int    XLOGbuffers; extern int    XLogArchiveTimeout; extern int
wal_retrieve_retry_interval;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 9336,9341 **** CreateRestartPoint(int flags)
--- 9336,9420 ---- }  /*
+  * Check if the record on the given lsn will be preserved at the next
+  * checkpoint.
+  *
+  * Returns true if it will be preserved. If distance is given, the distance
+  * from origin to the beginning of the first segment kept at the next
+  * checkpoint. It means margin when this function returns true and gap of lost
+  * records when false.
+  *
+  * This function should return the consistent result with KeepLogSeg.
+  */
+ bool
+ GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance)
+ {
+     XLogRecPtr currpos;
+     XLogRecPtr tailpos;
+     uint64 currSeg;
+     uint64 restByteInSeg;
+     uint64 restartSeg;
+     uint64 tailSeg;
+     uint64 keepSegs;
+ 
+     currpos = GetXLogWriteRecPtr();
+ 
+     LWLockAcquire(ControlFileLock, LW_SHARED);
+     tailpos = ControlFile->checkPointCopy.redo;
+     LWLockRelease(ControlFileLock);
+ 
+     /* Move the pointer to the beginning of the segment*/
+     XLByteToSeg(currpos, currSeg);
+     XLByteToSeg(restartLSN, restartSeg);
+     XLByteToSeg(tailpos, tailSeg);
+     restByteInSeg = 0;
+ 
+     Assert(wal_keep_segments >= 0);
+     Assert(max_slot_wal_keep_size_mb >= 0);
+ 
+     /*
+      * WAL are removed by the unit of segment.
+      */
+     keepSegs = wal_keep_segments + ConvertToXSegs(max_slot_wal_keep_size_mb);
+ 
+     /*
+      * If the latest checkpoint's redo point is older than the current head
+      * minus keep segments, the next checkpoint keeps the redo point's
+      * segment. Elsewise use current head minus number of segments to keep.
+      */
+     if (currSeg < tailSeg + keepSegs)
+     {
+         if (currSeg < keepSegs)
+             tailSeg = 0;
+         else
+             tailSeg = currSeg - keepSegs;
+ 
+         /* In this case, the margin will be the bytes to the next segment */
+         restByteInSeg = XLogSegSize - (currpos % XLogSegSize);
+     }
+ 
+     /* Required sements will be removed at the next checkpoint */
+     if (restartSeg < tailSeg)
+     {
+         /* Calculate how may bytes the slot have lost */
+         if (distance)
+         {
+             uint64 restbytes = (restartSeg + 1) * XLogSegSize - restartLSN;
+             *distance =
+                 (tailSeg - restartSeg - 1) * XLogSegSize
+                 + restbytes;
+         }
+         return false;
+     }
+ 
+     /* Margin at the next checkpoint before the slot lose sync  */
+     if (distance)
+         *distance = (restartSeg - tailSeg) * XLogSegSize + restByteInSeg;
+ 
+     return true;
+ }
+ 
+ /*  * Retreat *logSegNo to the last segment that we need to retain because of  * either wal_keep_segments or
replicationslots.  *
 
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 793,799 **** CREATE VIEW pg_replication_slots AS             L.xmin,             L.catalog_xmin,
L.restart_lsn,
!             L.confirmed_flush_lsn     FROM pg_get_replication_slots() AS L             LEFT JOIN pg_database D ON
(L.datoid= D.oid); 
 
--- 793,801 ----             L.xmin,             L.catalog_xmin,             L.restart_lsn,
!             L.confirmed_flush_lsn,
!             L.live,
!             L.distance     FROM pg_get_replication_slots() AS L             LEFT JOIN pg_database D ON (L.datoid =
D.oid);
 
*** a/src/backend/replication/slotfuncs.c
--- b/src/backend/replication/slotfuncs.c
***************
*** 182,188 **** pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) {
! #define PG_GET_REPLICATION_SLOTS_COLS 11     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc   tupdesc;     Tuplestorestate *tupstore;
 
--- 182,188 ---- Datum pg_get_replication_slots(PG_FUNCTION_ARGS) {
! #define PG_GET_REPLICATION_SLOTS_COLS 13     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc   tupdesc;     Tuplestorestate *tupstore;
 
***************
*** 304,309 **** pg_get_replication_slots(PG_FUNCTION_ARGS)
--- 304,323 ----         else             nulls[i++] = true; 
+         if (max_slot_wal_keep_size_mb > 0 && restart_lsn != InvalidXLogRecPtr)
+         {
+             uint64 distance;
+ 
+             values[i++] = BoolGetDatum(GetMarginToSlotSegmentLimit(restart_lsn,
+                                                                    &distance));
+             values[i++] = Int64GetDatum(distance);
+         }
+         else
+         {
+             values[i++] = BoolGetDatum(true);
+             nulls[i++] = true;
+         }
+          tuplestore_putvalues(tupstore, tupdesc, values, nulls);     }
LWLockRelease(ReplicationSlotControlLock);
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 267,272 **** extern void ShutdownXLOG(int code, Datum arg);
--- 267,273 ---- extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool
CreateRestartPoint(intflags);
 
+ extern bool GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance); extern void XLogPutNextOid(Oid
nextOid);extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void);
 
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 5347,5353 **** DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a
physicalreplication slot"); DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1
02278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a
replicationslot");
 
! DATA(insert OID = 3781 (  pg_get_replication_slots    PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 ""
"{19,19,25,26,16,16,23,28,28,3220,3220}""{o,o,o,o,o,o,o,o,o,o,o}"
"{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}"
_null__null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in
use");DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19
1916" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_
pg_create_logical_replication_slot_null_ _null_ _null_ )); DESCR("set up a logical replication slot");
 
--- 5347,5353 ---- DESCR("create a physical replication slot"); DATA(insert OID = 3780 (  pg_drop_replication_slot
PGNSPPGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_
_null__null_ )); DESCR("drop a replication slot");
 
! DATA(insert OID = 3781 (  pg_get_replication_slots    PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 ""
"{19,19,25,26,16,16,23,28,28,3220,3220,16,3220}""{o,o,o,o,o,o,o,o,o,o,o,o,o}"
"{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,live,distance}"
_null__null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in
use");DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19
1916" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_
pg_create_logical_replication_slot_null_ _null_ _null_ )); DESCR("set up a logical replication slot");
 
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
***************
*** 1451,1458 **** pg_replication_slots| SELECT l.slot_name,     l.xmin,     l.catalog_xmin,     l.restart_lsn,
!     l.confirmed_flush_lsn
!    FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin,
catalog_xmin,restart_lsn, confirmed_flush_lsn)      LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT
pg_authid.rolname,    pg_authid.rolsuper,
 
--- 1451,1460 ----     l.xmin,     l.catalog_xmin,     l.restart_lsn,
!     l.confirmed_flush_lsn,
!     l.live,
!     l.distance
!    FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin,
catalog_xmin,restart_lsn, confirmed_flush_lsn, live, distance)      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles|SELECT pg_authid.rolname,     pg_authid.rolsuper, 

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Alvaro Herrera
Дата:
Сообщение: Re: [HACKERS] WAL logging problem in 9.4.3?
Следующее
От: Andreas Karlsson
Дата:
Сообщение: Re: [HACKERS] generated columns