Re: [BUGS] Bug in Physical Replication Slots (at least 9.5)?

From Kyotaro HORIGUCHI
Subject Re: [BUGS] Bug in Physical Replication Slots (at least 9.5)?
Date
Msg-id 20170119.183731.223893446.horiguchi.kyotaro@lab.ntt.co.jp
In reply to Re: [BUGS] Bug in Physical Replication Slots (at least 9.5)?  (Michael Paquier <michael.paquier@gmail.com>)
Responses Re: [BUGS] Bug in Physical Replication Slots (at least 9.5)?  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Re: [BUGS] Bug in Physical Replication Slots (at least 9.5)?  (Fujii Masao <masao.fujii@gmail.com>)
List pgsql-bugs

Hello,

At Wed, 18 Jan 2017 12:34:51 +0900, Michael Paquier <michael.paquier@gmail.com> wrote in
<CAB7nPqQytF2giE7FD-4oJJpPVwiKJrDQPc24hLNGThX01SbSmA@mail.gmail.com>
> On Tue, Jan 17, 2017 at 7:36 PM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > I managed to reproduce this. A little tweak as the first patch
> > makes the standby kill itself as soon as walreceiver sees a
> > contrecord at the beginning of a segment.
> 
> Good idea.

Thanks. Fortunately(?), the problematic situation seems to happen
at almost every segment boundary.

> > I believe that a continuation record cannot span three or
> > more *segments* (is that right?), so keeping one spare segment
> > would be enough. The attached second patch does this.
> 
> I have to admit that I did not think about this problem much yet (I
> bookmarked this report weeks ago to be honest as something to look
> at), but that does not look right to me. Couldn't a record span
> even more segments? Take a random string longer than 64MB or
> even longer for example.

I haven't looked closely at how a modification is split into WAL
records, but a tuple cannot be that long. As a simple test, I
observed rechdr->xl_tot_len at the end of XLogRecordAssemble
while inserting an approximately 400KB not-so-compressible string
into a text column, and I saw a series of many records, each
shorter than several thousand bytes.
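
As a rough illustration of the size argument above, the following sketch estimates how many WAL segments a single record would touch. It assumes the default 16MB segment size and ignores page-header overhead; the helper name `segments_spanned` and the constant `SEG_SIZE` are made up for this example and are not PostgreSQL code.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed default WAL segment size (16MB); hypothetical constant for this sketch. */
#define SEG_SIZE ((uint64_t) 16 * 1024 * 1024)

/*
 * Number of segments touched by a record of total_len bytes whose first
 * byte is at position start_lsn. Page-header overhead is ignored, so the
 * result is a lower bound for long records.
 */
static uint64_t
segments_spanned(uint64_t start_lsn, uint64_t total_len)
{
    uint64_t first_seg;
    uint64_t last_seg;

    assert(total_len > 0);

    first_seg = start_lsn / SEG_SIZE;
    last_seg = (start_lsn + total_len - 1) / SEG_SIZE;
    return last_seg - first_seg + 1;
}
```

Under these assumptions a record of a few thousand bytes that starts 100 bytes before a segment boundary touches two segments, while a hypothetical 64MB record starting at the same place would touch at least five.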

> > Other possible measures might be,
> >
> > - Allowing switching wal source while reading a continuation
> >   record. Currently ReadRecord assumes that a continuation record
> >   can be read from single source. But this needs refactoring
> >   involving xlog.c, xlogreader.c and relatives.
> 
> This is scary thinking about back-branches.

Yes. It would no longer be a bug fix. (Or it would become quite an ugly hack.)

> > - Delaying recycling a segment until the last partial record on it
> >   completes. This seems doable in page-wise (coarse resolution)
> >   but would cost additional reading of past xlog files (page
> >   header of past pages is required).
> 
> Hm, yes. That looks like the least invasive way to go. At least that
> looks more correct than the others.

The attached patch does that. Usually it reads page headers only
on segment boundaries, but once a continuation record is found (or
reading the next page header fails, that is, the first record on
the first page of the next segment has not been replicated yet),
the check happens on every page boundary until a non-continuation
page arrives.
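
The scan described above can be sketched in a self-contained form. This is a simplified model, not the patch itself: WAL pages are reduced to an array of flags saying whether each page begins with a continuation record (the role XLP_FIRST_IS_CONTRECORD plays in the page header), and `advance_keep`, `PAGE_SIZE` and the flag array are names invented for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define PAGE_SIZE 8192u  /* assumed WAL block size for this sketch */

/*
 * first_is_cont[i] is true when page i begins with the continuation of a
 * record that started on an earlier page. Walk the pages from the one
 * holding keep up to the one holding flush and return the start of the
 * last page that does not begin with a continuation. If every page in
 * the range begins with a continuation, keep is returned unchanged.
 */
static uint64_t
advance_keep(const bool *first_is_cont, size_t npages,
             uint64_t keep, uint64_t flush)
{
    uint64_t rp = keep - (keep % PAGE_SIZE);

    assert(keep <= flush);
    assert(flush / PAGE_SIZE < npages);

    for (; rp <= flush; rp += PAGE_SIZE)
    {
        if (!first_is_cont[rp / PAGE_SIZE])
            keep = rp;
    }
    return keep;
}
```

With the flags {false, false, true, true, false}, keep on page 1 and flush on page 3, the function returns 8192 (the start of page 1), because pages 2 and 3 both begin with a continuation. Once flush reaches page 4, it returns 32768.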

I left debug output (at LOG level) in the attached patch; it is
emitted on every state change of the keep pointer. At least for
pgbench, the cost seems negligible.

> > - Delaying write/flush feedback until the current record is
> >   completed. walreceiver is not conscious of a WAL record and
> >   this might break synchronous replication.
> 
> Not sure about this one yet.

I'm not sure either :p

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..6b3abc5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -217,7 +217,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -774,7 +774,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
         count = flushptr - targetPagePtr;
 
     /* now actually read the data, we know it's there */
-    XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+    XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
     return count;
 }
@@ -1551,8 +1551,9 @@ static void
 ProcessStandbyReplyMessage(void)
 {
     XLogRecPtr    writePtr,
-                flushPtr,
-                applyPtr;
+                flushPtr, oldFlushPtr,
+                applyPtr,
+                keepPtr;
     bool        replyRequested;
 
     /* the caller already consumed the msgtype byte */
@@ -1580,24 +1581,99 @@ ProcessStandbyReplyMessage(void)
         WalSnd       *walsnd = MyWalSnd;
 
         SpinLockAcquire(&walsnd->mutex);
+        keepPtr = walsnd->keep;
+        oldFlushPtr = walsnd->flush;
         walsnd->write = writePtr;
         walsnd->flush = flushPtr;
         walsnd->apply = applyPtr;
         SpinLockRelease(&walsnd->mutex);
     }
 
+    /*
+     * If we are managed by a replication slot, maintain keepPtr on the page
+     * that holds the first fragment of the continuation record at flushPtr.
+     * Since this doesn't look into individual records, keepPtr may lag a bit
+     * further behind than strictly necessary.
+     */
+    if (MyReplicationSlot &&
+        flushPtr != InvalidXLogRecPtr && oldFlushPtr != InvalidXLogRecPtr)
+    {
+        /*
+         * If keepPtr is catching up, we do nothing until the next segment
+         * comes. Otherwise check on every page boundary.
+         */
+        if (keepPtr == InvalidXLogRecPtr ?
+            keepPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+            keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ)
+        {
+            XLogRecPtr oldKeepPtr = keepPtr;
+            XLogRecPtr rp;
+
+            if (keepPtr == InvalidXLogRecPtr)
+                keepPtr = oldFlushPtr;
+
+            rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+            /* We may have the record at flushPtr, so it's worth looking */
+            while (rp <= flushPtr)
+            {
+                XLogPageHeaderData header;
+
+                /*
+                 * If we don't have enough WAL data, don't move keepPtr
+                 * forward. We may read it on the next attempt.
+                 */
+                if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+                {
+                    bool found;
+                    /*
+                     * Fetch the page header of the next page. Move keepPtr
+                     * forward only when it is not a continuing page.
+                     */
+                    found = XLogRead((char *)&header,
+                                     rp, sizeof(XLogPageHeaderData), true);
+                    if (found &&
+                        (header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+                        keepPtr = rp;
+                }
+                rp += XLOG_BLCKSZ;
+            }
+
+            /*
+             * If keepPtr is on the same page as flushPtr, it has caught
+             * up.
+             */
+            if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+                keepPtr = InvalidXLogRecPtr;
+
+            if (oldKeepPtr != keepPtr)
+            {
+                WalSnd       *walsnd = MyWalSnd;
+                elog(LOG, "%lX => %lX / %lX", oldKeepPtr, keepPtr, flushPtr); 
+                SpinLockAcquire(&walsnd->mutex);
+                walsnd->keep = keepPtr;
+                SpinLockRelease(&walsnd->mutex);
+            }
+        }
+    }
+
     if (!am_cascading_walsender)
         SyncRepReleaseWaiters();
 
     /*
      * Advance our local xmin horizon when the client confirmed a flush.
      */
-    if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+    if (MyReplicationSlot)
     {
-        if (SlotIsLogical(MyReplicationSlot))
+        if (SlotIsLogical(MyReplicationSlot) && flushPtr != InvalidXLogRecPtr)
             LogicalConfirmReceivedLocation(flushPtr);
         else
-            PhysicalConfirmReceivedLocation(flushPtr);
+        {
+            /* keepPtr == InvalidXLogRecPtr means keepPtr has caught up */
+            if (keepPtr == InvalidXLogRecPtr)
+                keepPtr = flushPtr;
+            PhysicalConfirmReceivedLocation(keepPtr);
+        }
     }
 }
 
@@ -2019,6 +2095,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found iff notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -2028,8 +2105,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
     char       *p;
     XLogRecPtr    recptr;
@@ -2106,10 +2183,15 @@ retry:
                  * removed or recycled.
                  */
                 if (errno == ENOENT)
+                {
+                    if (notfoundok)
+                        return false;
+
                     ereport(ERROR,
                             (errcode_for_file_access(),
                              errmsg("requested WAL segment %s has already been removed",
                                 XLogFileNameP(curFileTimeLine, sendSegNo))));
+                }
                 else
                     ereport(ERROR,
                             (errcode_for_file_access(),
@@ -2189,6 +2271,8 @@ retry:
             goto retry;
         }
     }
+
+    return true;
 }
 
 /*
@@ -2393,7 +2477,7 @@ XLogSendPhysical(void)
      * calls.
      */
     enlargeStringInfo(&output_message, nbytes);
-    XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+    XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
     output_message.len += nbytes;
     output_message.data[output_message.len] = '\0';
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5e6ccfc..084146d 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -47,6 +47,13 @@ typedef struct WalSnd
     XLogRecPtr    flush;
     XLogRecPtr    apply;
 
+    /*
+     * Segment-spanning continuation records require that all related
+     * segments be preserved. This holds how far back we should preserve
+     * older segments; it is set only when it differs from the flush location.
+     */
+    XLogRecPtr    keep;
+
     /* Protects shared variables shown above. */
     slock_t        mutex;
 
