Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)?

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)?
Дата
Msg-id 20170120.110729.107284864.horiguchi.kyotaro@lab.ntt.co.jp
обсуждение исходный текст
Ответ на Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)?  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Ответы Re: [HACKERS] [BUGS] Bug in Physical Replication Slots (at least 9.5)?  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Список pgsql-hackers
Hello,

At Thu, 19 Jan 2017 18:37:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20170119.183731.223893446.horiguchi.kyotaro@lab.ntt.co.jp>
> > > - Delaying recycling a segment until the last partial record on it
> > >   completes. This seems doable page-wise (at coarse resolution)
> > >   but would cost additional reads of past xlog files (the page
> > >   headers of past pages are required).
> > 
> > Hm, yes. That looks like the least invasive way to go. At least that
> > looks more correct than the others.
> 
> The attached patch does that. Usually it reads page headers only
> on segment boundaries, but once a continuation record is found (or
> it fails to read the next page header, that is, the first record on
> the first page in the next segment has not been replicated yet), it
> reads them on every page boundary until a non-continuation
> page comes.
> 
> I left debug output (at LOG level) in the attached file, emitted on
> every state change of the keep pointer. At least for pgbench, the
> cost seems negligible.

I revised it. It became neater and less invasive.
- Removed the added keep field from struct WalSnd. It is never referenced from other processes, so it is now a static variable.
- Restore keepPtr from the replication slot at startup.
- Moved the main part to a more appropriate position.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..0270474 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -185,6 +185,12 @@ static volatile sig_atomic_t replication_active = false;static LogicalDecodingContext
*logical_decoding_ctx= NULL;static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/*
+ * Segment keep pointer for physical slots. Has a valid value only when it
+ * differs from the current flush pointer.
+ */
+static XLogRecPtr       keepPtr = InvalidXLogRecPtr;
+/* Signal handlers */static void WalSndSigHupHandler(SIGNAL_ARGS);static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -217,7 +223,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transtatic void
WalSndWriteData(LogicalDecodingContext*ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);static XLogRecPtr
WalSndWaitForWal(XLogRecPtrloc);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);/* Initialize walsender process
beforeentering the main command loop */
 
@@ -538,6 +544,9 @@ StartReplication(StartReplicationCmd *cmd)            ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),                    (errmsg("cannot use a logical replication slot
forphysical replication"))));
 
+
+        /* Restore keepPtr from replication slot */
+        keepPtr = MyReplicationSlot->data.restart_lsn;    }    /*
@@ -553,6 +562,10 @@ StartReplication(StartReplicationCmd *cmd)    else        FlushPtr = GetFlushRecPtr();
+    /* Set InvalidXLogRecPtr if catching up */
+    if (keepPtr == FlushPtr)
+        keepPtr = InvalidXLogRecPtr;
+        if (cmd->timeline != 0)    {        XLogRecPtr    switchpoint;
@@ -774,7 +787,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req        count =
flushptr- targetPagePtr;    /* now actually read the data, we know it's there */
 
-    XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+    XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);    return count;}
@@ -1551,7 +1564,7 @@ static voidProcessStandbyReplyMessage(void){    XLogRecPtr    writePtr,
-                flushPtr,
+                flushPtr, oldFlushPtr,                applyPtr;    bool        replyRequested;
@@ -1580,6 +1593,7 @@ ProcessStandbyReplyMessage(void)        WalSnd       *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
+        oldFlushPtr = walsnd->flush;        walsnd->write = writePtr;        walsnd->flush = flushPtr;
walsnd->apply= applyPtr;
 
@@ -1597,7 +1611,78 @@ ProcessStandbyReplyMessage(void)        if (SlotIsLogical(MyReplicationSlot))
LogicalConfirmReceivedLocation(flushPtr);       else
 
-            PhysicalConfirmReceivedLocation(flushPtr);
+        {
+            /*
+             * On recovery, a continuation reocrd must be available from
+             * single WAL source. So physical replication slot should stay in
+             * the first segment for a continuation record spanning multiple
+             * segments. Since this doesn't look into individual record,
+             * keepPtr may stay a bit too behind.
+             *
+             * Since the objective is avoding to remove required segments,
+             * checking every segment is enough. But once keepPtr goes behind,
+             * check every page for quick restoration.
+             *
+             * keepPtr has a valid value only when it is behind flushPtr.
+             */
+            if (oldFlushPtr != InvalidXLogRecPtr &&
+                (keepPtr == InvalidXLogRecPtr ?
+                 oldFlushPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+                 keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ))
+            {
+                XLogRecPtr rp;
+                XLogRecPtr oldKeepPtr = keepPtr; /* for debug */
+
+                if (keepPtr == InvalidXLogRecPtr)
+                    keepPtr = oldFlushPtr;
+
+                rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+                /*
+                 * We may have let the record at flushPtr sent, so it's worth
+                 * looking
+                 */
+                while (rp <= flushPtr)
+                {
+                    XLogPageHeaderData header;
+
+                    /*
+                     * If the page header is not available for now, don't move
+                     * keepPtr forward. We can read it by the next chance.
+                     */
+                    if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+                    {
+                        bool found;
+                        /*
+                         * Fetch the page header of the next page. Move
+                         * keepPtr forward only if when it is not a
+                         * continuation page.
+                         */
+                        found = XLogRead((char *)&header, rp,
+                                             sizeof(XLogPageHeaderData), true);
+                        if (found &&
+                            (header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+                            keepPtr = rp;
+                    }
+                    rp += XLOG_BLCKSZ;
+                }
+
+                /*
+                 * If keepPtr is on the same page with flushPtr, it means that
+                 * we are catching up
+                 */
+                if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+                    keepPtr = InvalidXLogRecPtr;
+
+                if (oldKeepPtr != keepPtr)
+                    elog(LOG, "%lX => %lX / %lX",
+                         oldKeepPtr, keepPtr, flushPtr); 
+            }
+
+            /* keepPtr == InvalidXLogRecPtr means catching up */
+            PhysicalConfirmReceivedLocation(keepPtr != InvalidXLogRecPtr ?
+                                            keepPtr : flushPtr);
+        }    }}
@@ -2019,6 +2104,7 @@ WalSndKill(int code, Datum arg)/* * Read 'count' bytes from WAL into 'buf', starting at location
'startptr'
+ * Returns false if the segment file is not found when notfoundok is true. * * XXX probably this should be improved to
suckdata directly from the * WAL buffers when possible.
 
@@ -2028,8 +2114,8 @@ WalSndKill(int code, Datum arg) * always be one descriptor left open until the process ends, but
never* more than one. */
 
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok){    char       *p;    XLogRecPtr    recptr;
@@ -2106,10 +2192,15 @@ retry:                 * removed or recycled.                 */                if (errno ==
ENOENT)
+                {
+                    if (notfoundok)
+                        return false;
+                    ereport(ERROR,                            (errcode_for_file_access(),
errmsg("requestedWAL segment %s has already been removed",
XLogFileNameP(curFileTimeLine,sendSegNo))));
 
+                }                else                    ereport(ERROR,
(errcode_for_file_access(),
@@ -2189,6 +2280,8 @@ retry:            goto retry;        }    }
+
+    return true;}/*
@@ -2393,7 +2486,7 @@ XLogSendPhysical(void)     * calls.     */    enlargeStringInfo(&output_message, nbytes);
-    XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+    XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);    output_message.len += nbytes;
output_message.data[output_message.len]= '\0'; 

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andres Freund
Дата:
Сообщение: [HACKERS] Failure on sittella
Следующее
От: Haribabu Kommi
Дата:
Сообщение: Re: [HACKERS] [WIP]Vertical Clustered Index (columnar store extension)