Re: Logical replication keepalive flood

Поиск
Список
Период
Сортировка
От Kyotaro Horiguchi
Тема Re: Logical replication keepalive flood
Дата
Msg-id 20210610.151231.80515139203984762.horikyota.ntt@gmail.com
обсуждение исходный текст
Ответ на Re: Logical replication keepalive flood  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Ответы Re: Logical replication keepalive flood  (Amit Kapila <amit.kapila16@gmail.com>)
Список pgsql-hackers
At Thu, 10 Jun 2021 15:00:16 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> At Wed, 9 Jun 2021 17:32:25 +0500, Abbas Butt <abbas.butt@enterprisedb.com> wrote in 
> > 
> > On Wed, Jun 9, 2021 at 2:30 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > Is it possible that the write/flush location is not
> > > updated at the pace at which we expect?
> 
> Yes. MyWalSnd->flush/write are updated far frequently but still
> MyWalSnd->write is behind sentPtr by from thousands of bytes up to
> less than 1 block (1block = 8192 bytes). (Flush lags are larger than
> write lags, of course.)

For more clarity, I changed the previous patch a bit and re-collected the numbers.

Total records: 19476
  8:     2 /     4 /     2:    4648 /  302472
 16:     5 /    10 /     5:    5427 /  139872
 24:  3006 /  6015 /  3028:    4739 /  267215
187:     2 /     0 /    50:       1 /     398

During a 10-second run of pgbench, the walsender reads 19476 records
and calls logical_read_xlog_page() 3028 times; the mean write lag is
4739 bytes and the mean flush lag is 267215 bytes (really?). As a
result, most record fetches cause a keepalive. (The WAL contains
many FPIs.)

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 42738eb940..ee78116e79 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -571,6 +571,7 @@ err:
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
  */
+int hogestate = -1;
 static int
 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 {
@@ -605,6 +606,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
     {
         XLogRecPtr    targetSegmentPtr = pageptr - targetPageOff;
 
+        hogestate = pageptr + XLOG_BLCKSZ - state->currRecPtr;
         readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
                                            state->currRecPtr,
                                            state->readBuf);
@@ -623,6 +625,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
      * First, read the requested data length, but at least a short page header
      * so that we can validate it.
      */
+    hogestate = pageptr + Max(reqLen, SizeOfXLogShortPHD) - state->currRecPtr;
     readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
                                        state->currRecPtr,
                                        state->readBuf);
@@ -642,6 +645,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
     /* still not enough */
     if (readLen < XLogPageHeaderSize(hdr))
     {
+        hogestate = pageptr + XLogPageHeaderSize(hdr) - state->currRecPtr;
         readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
                                            state->currRecPtr,
                                            state->readBuf);
@@ -649,6 +653,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
             goto err;
     }
 
+    hogestate = -1;
     /*
      * Now that we know we have the full header, validate it.
      */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723f4e..62f5f09fee 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1363,17 +1363,49 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
  * if we detect a shutdown request (either from postmaster or client)
  * we will return early, so caller must always check.
  */
+unsigned long counts[32768][3] = {0};
+unsigned long lagw[32768] = {0};
+unsigned long lagf[32768] = {0};
+unsigned long nrec = 0;
+void
+PrintCounts(void)
+{
+    int i = 0;
+    ereport(LOG, (errmsg ("Total records: %lu", nrec), errhidestmt(true)));
+    nrec = 0;
+
+    for (i = 0 ; i < 32768 ; i++)
+    {
+        if (counts[i][0] + counts[i][1] + counts[i][2] > 0)
+        {
+            unsigned long wl = 0, fl = 0;
+            if (counts[i][1] > 0)
+            {
+                wl = lagw[i] / counts[i][0];
+                fl = lagf[i] / counts[i][0];
+            
+                ereport(LOG, (errmsg ("%5d: %5lu / %5lu / %5lu: %7lu / %7lu",
+                                      i, counts[i][1], counts[i][2], counts[i][0], wl, fl), errhidestmt(true)));
+            }
+            counts[i][0] = counts[i][1] = counts[i][2] = lagw[i] = lagf[i] = 0;
+        }
+    }
+}
+
 static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
     int            wakeEvents;
     static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+    extern int hogestate;
+    bool        lagtaken = false;
 
     /*
      * Fast path to avoid acquiring the spinlock in case we already know we
      * have enough WAL available. This is particularly interesting if we're
      * far behind.
      */
+    counts[hogestate][0]++;
     if (RecentFlushPtr != InvalidXLogRecPtr &&
         loc <= RecentFlushPtr)
         return RecentFlushPtr;
@@ -1439,7 +1471,39 @@ WalSndWaitForWal(XLogRecPtr loc)
         if (MyWalSnd->flush < sentPtr &&
             MyWalSnd->write < sentPtr &&
             !waiting_for_ping_response)
+        {
+            if (hogestate >= 0)
+            {
+                counts[hogestate][1]++;
+                if (!lagtaken)
+                {
+                    lagf[hogestate] += sentPtr - MyWalSnd->flush;
+                    lagw[hogestate] += sentPtr - MyWalSnd->write;
+                    lagtaken = true;
+                }
+            }
+//            ereport(LOG, (errmsg ("KA[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+//                                  ka, na, ka + na,
+//                                  LSN_FORMAT_ARGS(MyWalSnd->flush),
+//                                  LSN_FORMAT_ARGS(MyWalSnd->write),
+//                                  LSN_FORMAT_ARGS(sentPtr),
+//                                  waiting_for_ping_response,
+//                                  sentPtr - MyWalSnd->write)));
             WalSndKeepalive(false);
+        }
+        else
+        {
+            if (hogestate >= 0)
+                counts[hogestate][2]++;
+
+//            ereport(LOG, (errmsg ("kap[%lu/%lu/%lu]: %X/%X %X/%X %X/%X %d: %ld",
+//                                  ka, na, ka + na,
+//                                  LSN_FORMAT_ARGS(MyWalSnd->flush),
+//                                  LSN_FORMAT_ARGS(MyWalSnd->write),
+//                                  LSN_FORMAT_ARGS(sentPtr),
+//                                  waiting_for_ping_response,
+//                                  sentPtr - MyWalSnd->write)));
+        }
 
         /* check whether we're done */
         if (loc <= RecentFlushPtr)
@@ -2843,6 +2907,7 @@ XLogSendLogical(void)
 {
     XLogRecord *record;
     char       *errm;
+    extern unsigned long nrec;
 
     /*
      * We'll use the current flush point to determine whether we've caught up.
@@ -2860,6 +2925,7 @@ XLogSendLogical(void)
      */
     WalSndCaughtUp = false;
 
+    nrec++;
     record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
 
     /* xlog record was invalid */

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Kyotaro Horiguchi
Дата:
Сообщение: Re: Logical replication keepalive flood
Следующее
От: Amit Kapila
Дата:
Сообщение: Re: Logical replication keepalive flood