Re: archive status ".ready" files may be created too early

From: Kyotaro Horiguchi
Subject: Re: archive status ".ready" files may be created too early
Msg-id: 20201218.144205.14089619073535731.horikyota.ntt@gmail.com
In response to: Re: archive status ".ready" files may be created too early  (Kyotaro Horiguchi <horikyota.ntt@gmail.com>)
Responses: Re: archive status ".ready" files may be created too early  (Andrey Borodin <x4mmm@yandex-team.ru>)
Re: archive status ".ready" files may be created too early  (Andrey Borodin <x4mmm@yandex-team.ru>)
List: pgsql-hackers
At Wed, 16 Dec 2020 11:01:20 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in 
> - Record the beginning LSN of the first cross-seg record and the end
>   LSN of the last cross-seg record in a run of consecutive segments
>   bonded by cross-seg records. Specifically, X and Y below.
> 
>        X                 Z         Y    
>        [recA]  [recB]         [recC]
>   [seg A] [seg B] [seg C] [seg D] [seg E]
> (1)    (2.2)    (2.2)  (2.1)   (2.1)   (1)
> 
> 1. If we wrote up to before X or beyond Y at a segment boundary, notify
>   the finished segment immediately.
> 
>   1.1. If we have written beyond Y, clear the recorded region.
> 
> 2. Otherwise we don't notify the segment immediately:
> 
>   2.1. If write request was up to exactly the current segment boundary
>     and we know the end LSN of the record there (that is, it is recC
>     above), extend the request to the end LSN. Then notify the segment
>     after the record is written to the end.
> 
>   2.2. Otherwise (that is, recA or recB), we don't know whether the
>     last record of the last segment ends exactly at the segment boundary
>     (Z) or spans segments (recB). Even if there is such a record, we
>     don't know where it ends. As a result, all we can do there is
>     refrain from notifying. It doesn't matter much, since we have
>     already inserted recC, so we will soon reach recC and notify up to
>     seg C.
> 
> There might be a case where we insert up to Y before writing up to Z;
> in that case the segment region X-Y contains a non-connected segment
> boundary, which is processed as if it were a connected segment
> boundary. However, as in 2.2 above, it doesn't matter since we will
> write up to Y soon.

I noticed that we can force the continuation record to be flushed
immediately. So in the attached patch:

1. If there's no remembered cross-segment boundary or we're out of the
  region X-Y, notify the finished segment immediately.

2. Otherwise we don't notify the segment immediately:

 2.1. If we are finishing the last segment known to continue into the
   next segment, extend the write request to the end of the record *and*
   force a write and then a flush up to there.

 2.2. (the same as above)

3. In the case of 2.1, we can flush the previous segment immediately,
  so do that.

X. When we notify a segment, clear the remembered region if we have got
  out of the region.
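
The rules above can be read as a small decision function. The following is
only a rough standalone sketch of that decision, not the code in the patch:
Lsn, INVALID_LSN, SegAction and decide_at_segment_end() are names made up
for illustration, and the locking and the actual write/flush calls are left
out.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t Lsn;               /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)       /* hypothetical stand-in for InvalidXLogRecPtr */

typedef enum
{
    NOTIFY_NOW,         /* rule 1: notify the finished segment immediately */
    EXTEND_AND_FLUSH,   /* rules 2.1 and 3: extend the request to Y, flush, notify */
    DEFER               /* rule 2.2: refrain from notifying for now */
} SegAction;

/*
 * Decide what to do when a write has just reached a segment boundary.
 *
 * region_start and region_end are X and Y of the remembered region,
 * written is the LSN written so far (assumed to sit on a segment boundary),
 * and request is the LSN the caller asked us to write up to.
 */
static SegAction
decide_at_segment_end(Lsn region_start, Lsn region_end,
                      Lsn written, Lsn request, Lsn seg_size)
{
    assert(seg_size > 0 && written % seg_size == 0);

    /* Rule 1: nothing remembered, or we are outside the region X-Y. */
    if (region_start == INVALID_LSN ||
        written < region_start ||
        region_end <= written)
        return NOTIFY_NOW;

    /* Rule 2.1: the request ends in the segment that contains Y. */
    if (request / seg_size == region_end / seg_size)
        return EXTEND_AND_FLUSH;

    /* Rule 2.2: we cannot tell where the record at the boundary ends. */
    return DEFER;
}
```

With a toy segment size of 16 and a region X=10, Y=40, a write that stops at
LSN 16 is deferred, a write that stops at LSN 32 is extended to Y and
flushed, and a write that stops at LSN 48 is notified immediately.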


The attached patch differs from the previous version in the following points:

- Fixed some bugs where I mistakenly referred to write-lsn instead of flush-lsn.

- Changed to urge flushing up to the end of a continuation record, rather
  than only waiting for the record to be written.

- Clear the remembered region more aggressively.
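
For reference, the bookkeeping of the remembered region can be sketched
roughly as below. This is a simplified standalone illustration under my own
assumptions, not the patch itself: ContRegion, region_remember() and
region_clear_if_flushed() are hypothetical names, there is no spinlock, and
the clear condition is tested against an already-flushed LSN, whereas the
patch compares against the flush request.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;               /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)       /* hypothetical stand-in for InvalidXLogRecPtr */

typedef struct
{
    Lsn start;      /* X: start of the first segment-crossing record */
    Lsn end;        /* Y: end of the last segment-crossing record */
} ContRegion;

/*
 * Remember the record [start, end) if it crosses a segment boundary.
 * Returns true if the region was updated.
 */
static bool
region_remember(ContRegion *r, Lsn start, Lsn end, Lsn seg_size)
{
    Lsn startseg;
    Lsn endseg;

    assert(seg_size > 0 && start < end);

    startseg = start / seg_size;
    endseg = (end - 1) / seg_size;  /* segment holding the record's last byte */

    if (startseg == endseg)
        return false;               /* the record stays within one segment */

    if (r->start == INVALID_LSN)
        r->start = start;           /* first crossing record fixes X */
    r->end = end;                   /* every crossing record moves Y forward */
    return true;
}

/*
 * Forget the region once everything up to Y is flushed.
 * Returns true if the region was cleared.
 */
static bool
region_clear_if_flushed(ContRegion *r, Lsn flushed)
{
    if (r->start == INVALID_LSN || flushed < r->end)
        return false;

    r->start = INVALID_LSN;
    r->end = INVALID_LSN;
    return true;
}
```

A record that ends exactly on a segment boundary is not treated as crossing
it, because the segment of its last byte is computed from end - 1.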

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 95e438ee448c1686c946909f1fc84ec95ee6c7d4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Fri, 18 Dec 2020 13:45:29 +0900
Subject: [PATCH v5 1/2] Avoid archiving a WAL segment that continues to the
 next segment

If the last record of a finished segment continues to the next segment,
we need to defer archiving of the segment until the record is flushed
to the end. Otherwise crash recovery can overwrite the last record of
a segment and history diverges between archive and pg_wal.
---
 src/backend/access/transam/xlog.c   | 221 +++++++++++++++++++++++++++-
 src/backend/replication/walsender.c |  14 +-
 src/include/access/xlog.h           |   1 +
 3 files changed, 224 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b1e5d2dbff..b0d3ba2c5a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -733,6 +733,16 @@ typedef struct XLogCtlData
      */
     XLogRecPtr    lastFpwDisableRecPtr;
 
+    /* The last segment notified to be archived. Protected by WALWriteLock */
+    XLogSegNo    lastNotifiedSeg;
+
+    /*
+     * Remember the region we need to consider refraining from archiving
+     * finished segments immediately. Protected by info_lck.
+     */
+    XLogRecPtr    firstSegContRecStart;
+    XLogRecPtr    lastSegContRecEnd;
+    
     slock_t        info_lck;        /* locks shared variables shown above */
 } XLogCtlData;
 
@@ -1170,6 +1180,9 @@ XLogInsertRecord(XLogRecData *rdata,
      */
     if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
     {
+        XLogSegNo startseg;
+        XLogSegNo endseg;
+
         SpinLockAcquire(&XLogCtl->info_lck);
         /* advance global request to include new block(s) */
         if (XLogCtl->LogwrtRqst.Write < EndPos)
@@ -1177,6 +1190,22 @@ XLogInsertRecord(XLogRecData *rdata,
         /* update local result copy while I have the chance */
         LogwrtResult = XLogCtl->LogwrtResult;
         SpinLockRelease(&XLogCtl->info_lck);
+
+        /*
+         * Remember the range of segment boundaries that are connected by a
+         * continuation record.
+         */
+        XLByteToSeg(StartPos, startseg, wal_segment_size);
+        XLByteToPrevSeg(EndPos, endseg, wal_segment_size);
+
+        if (startseg != endseg)
+        {
+            SpinLockAcquire(&XLogCtl->info_lck);
+            if (XLogCtl->firstSegContRecStart == InvalidXLogRecPtr)
+                XLogCtl->firstSegContRecStart = StartPos;
+            XLogCtl->lastSegContRecEnd = EndPos;
+            SpinLockRelease(&XLogCtl->info_lck);
+        }
     }
 
     /*
@@ -2410,6 +2439,50 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
     return false;
 }
 
+/*
+ * Returns last notified segment.
+ */
+static XLogSegNo
+GetLastNotifiedSegment(void)
+{
+    XLogSegNo last_notified;
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    last_notified = XLogCtl->lastNotifiedSeg;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    return last_notified;
+}
+
+/*
+ * Notify segments that are not yet notified.
+ */
+static void
+NotifySegmentsUpTo(XLogSegNo notifySegNo)
+{
+    XLogSegNo last_notified = GetLastNotifiedSegment();
+    XLogSegNo i;
+
+    if (notifySegNo <= last_notified)
+        return;
+
+    for (i = last_notified + 1 ; i <= notifySegNo ; i++)
+        XLogArchiveNotifySeg(i);
+
+    /* Don't go back in the case someone else has made it go further. */
+    SpinLockAcquire(&XLogCtl->info_lck);
+    if (XLogCtl->lastNotifiedSeg < notifySegNo)
+        XLogCtl->lastNotifiedSeg = notifySegNo;
+
+    /* Reset the locations if we have got out of the continuation region. */
+    if (XLogCtl->lastSegContRecEnd <= XLogCtl->LogwrtRqst.Flush)
+    {
+        XLogCtl->firstSegContRecStart = InvalidXLogRecPtr;
+        XLogCtl->lastSegContRecEnd = InvalidXLogRecPtr;
+    }
+    SpinLockRelease(&XLogCtl->info_lck);
+}
+
 /*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
@@ -2433,6 +2506,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
     int            npages;
     int            startidx;
     uint32        startoffset;
+    bool        notify_seg = false;
+    bool        force_continue = false;
 
     /* We should always be inside a critical section here */
     Assert(CritSectionCount > 0);
@@ -2589,15 +2664,97 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
              */
             if (finishing_seg)
             {
+                XLogRecPtr firstSegContRecStart;
+                XLogRecPtr lastSegContRecEnd;
+
                 issue_xlog_fsync(openLogFile, openLogSegNo);
 
                 /* signal that we need to wakeup walsenders later */
                 WalSndWakeupRequest();
 
-                LogwrtResult.Flush = LogwrtResult.Write;    /* end of page */
+                SpinLockAcquire(&XLogCtl->info_lck);
+                firstSegContRecStart = XLogCtl->firstSegContRecStart;
+                lastSegContRecEnd = XLogCtl->lastSegContRecEnd;
+                SpinLockRelease(&XLogCtl->info_lck);
 
-                if (XLogArchivingActive())
-                    XLogArchiveNotifySeg(openLogSegNo);
+                /*
+                 * If we may be on a continuation record spans over segments,
+                 * don't archive the segment until the record is written to the
+                 * end. If we do, we could have corrupt archive having
+                 * different records at the boundary after a server crash
+                 * around here.  For the same reason, also for replication,
+                 * don't expose flush location until the record is written to
+                 * the end so that an incomplete record at segment boundary
+                 * won't be sent to standby.
+                 */
+                if (firstSegContRecStart == InvalidXLogRecPtr ||
+                    LogwrtResult.Write < firstSegContRecStart ||
+                    lastSegContRecEnd <= LogwrtResult.Write)
+                {
+                    LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
+
+                    if (XLogArchivingActive())
+                        NotifySegmentsUpTo(openLogSegNo);
+
+                    /* Already notified */
+                    notify_seg = false;
+                    force_continue = false;
+                }
+                else
+                {
+                    /*
+                     * The last record in the finishing segment may be
+                     * continuing into the next segment. Don't archive the
+                     * segment until we are sure that the continuation record
+                     * is completely written out.
+                     */
+
+                    XLogSegNo reqendseg;
+                    XLogSegNo contendseg;
+
+                    force_continue = true;
+
+                    XLByteToSeg(WriteRqst.Write, reqendseg, wal_segment_size);
+                    XLByteToSeg(lastSegContRecEnd, contendseg,
+                                wal_segment_size);
+
+                    /*
+                     * There's a case where we are told to flush up not to the
+                     * end of a record but to a page boundary. Advance the
+                     * request LSN to the end of the record at the boundary if
+                     * any.
+                     */
+                    if (reqendseg == contendseg)
+                    {
+                        /*
+                         * We finished the last segment in the region. Extend
+                         * the write request to the end of the recrod.
+                         */
+                        if (WriteRqst.Write < lastSegContRecEnd)
+                            WriteRqst.Write = lastSegContRecEnd;
+
+                        /*
+                         * Flush up to at least the same location immediately
+                         * so that the segment can be archived.
+                         */
+                        if (WriteRqst.Flush < lastSegContRecEnd)
+                            WriteRqst.Flush = lastSegContRecEnd;
+
+                        /* We know the record will be finished */
+                        notify_seg = true;
+                    }
+                    else
+                    {
+                        /* We don't know the exact placement of the record
+                         * around the requested write LSN. Do not archive this
+                         * segment since it might ends with an incomplete
+                         * record. In this case we will end up with archiving
+                         * the segment soon since we have written up to further
+                         * segments.
+                         */
+                        notify_seg = false;
+                    }
+                }
 
                 XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
                 XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2626,8 +2783,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
         }
         curridx = NextBufIdx(curridx);
 
-        /* If flexible, break out of loop as soon as we wrote something */
-        if (flexible && npages == 0)
+        /*
+         * If flexible, break out of loop as soon as we wrote something.
+         * However, we don't leave the loop if we have to write a bit more.
+         */
+        if (flexible && !force_continue && npages == 0)
             break;
     }
 
@@ -2685,6 +2845,14 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
             XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
         SpinLockRelease(&XLogCtl->info_lck);
     }
+
+    /*
+     * We have extended the write request to the next segment if the record at
+     * the initial WriteRqst.Write continues to the next segment.  In that case
+     * need to notify the last segment here.
+     */
+    if (notify_seg)
+        NotifySegmentsUpTo(openLogSegNo - 1);
 }
 
 /*
@@ -7716,6 +7884,18 @@ StartupXLOG(void)
     XLogCtl->LogwrtRqst.Write = EndOfLog;
     XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+    /*
+     * We have archived up to the previous segment of EndOfLog so far.
+     * Initialize lastNotifiedSeg if needed.
+     */
+    if (XLogArchivingActive())
+    {
+        XLogSegNo    endLogSegNo;
+
+        XLByteToSeg(EndOfLog, endLogSegNo, wal_segment_size);
+        XLogCtl->lastNotifiedSeg = endLogSegNo - 1;
+    }
+
     /*
      * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
      * record before resource manager writes cleanup WAL records or checkpoint
@@ -8440,6 +8620,37 @@ GetFlushRecPtr(void)
     return LogwrtResult.Flush;
 }
 
+/*
+ * GetReplicationTargetRecPtr -- Returns the latest position that is safe to
+ * replicate.
+ */
+XLogRecPtr
+GetReplicationTargetRecPtr(void)
+{
+    static XLogRecPtr    lastTargetRecPtr = InvalidXLogRecPtr;
+    XLogRecPtr    firstSegContRecStart;
+    XLogRecPtr    lastSegContRecEnd;
+
+    SpinLockAcquire(&XLogCtl->info_lck);
+    LogwrtResult = XLogCtl->LogwrtResult;
+    firstSegContRecStart = XLogCtl->firstSegContRecStart;
+    lastSegContRecEnd = XLogCtl->lastSegContRecEnd;
+    SpinLockRelease(&XLogCtl->info_lck);
+
+    /*
+     * Don't move forward if the current flush position may be within a
+     * continuation record that spans over segments.
+     */
+    if (lastTargetRecPtr == InvalidXLogRecPtr ||
+        firstSegContRecStart == InvalidXLogRecPtr ||
+        XLogSegmentOffset(LogwrtResult.Flush, wal_segment_size) != 0 ||
+        LogwrtResult.Flush < firstSegContRecStart ||
+        lastSegContRecEnd <= LogwrtResult.Flush)
+        lastTargetRecPtr = LogwrtResult.Flush;
+
+    return lastTargetRecPtr;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d5c9bc31d8..c13cca1e64 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2631,14 +2631,14 @@ XLogSendPhysical(void)
         /*
          * Streaming the current timeline on a primary.
          *
-         * Attempt to send all data that's already been written out and
-         * fsync'd to disk.  We cannot go further than what's been written out
-         * given the current implementation of WALRead().  And in any case
-         * it's unsafe to send WAL that is not securely down to disk on the
-         * primary: if the primary subsequently crashes and restarts, standbys
-         * must not have applied any WAL that got lost on the primary.
+         * Attempt to send all data that's can be replicated.  We cannot go
+         * further than what's been written out given the current
+         * implementation of WALRead().  And in any case it's unsafe to send
+         * WAL that is not securely down to disk on the primary: if the primary
+         * subsequently crashes and restarts, standbys must not have applied
+         * any WAL that got lost on the primary.
          */
-        SendRqstPtr = GetFlushRecPtr();
+        SendRqstPtr = GetReplicationTargetRecPtr();
     }
 
     /*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 221af87e71..94876f628c 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -338,6 +338,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetReplicationTargetRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 
-- 
2.27.0

From db3b7a536199202539a29af50fa55cc46cdff1db Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga.ntt@gmail.com>
Date: Fri, 18 Dec 2020 13:49:43 +0900
Subject: [PATCH v5 2/2] debug print

---
 src/backend/access/transam/xlog.c | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b0d3ba2c5a..ee5494701c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1200,6 +1200,11 @@ XLogInsertRecord(XLogRecData *rdata,
 
         if (startseg != endseg)
         {
+            if (XLogCtl->firstSegContRecStart == InvalidXLogRecPtr)
+                ereport(LOG, (errmsg("REG-REG: (%lX, %lX)", StartPos, EndPos), errhidestmt(true),errhidecontext(true)));
+            else
+                ereport(LOG, (errmsg("UPD-REG: (%lX, %lX)", XLogCtl->firstSegContRecStart, EndPos), errhidestmt(true),errhidecontext(true)));
+
             SpinLockAcquire(&XLogCtl->info_lck);
             if (XLogCtl->firstSegContRecStart == InvalidXLogRecPtr)
                 XLogCtl->firstSegContRecStart = StartPos;
@@ -2477,6 +2482,8 @@ NotifySegmentsUpTo(XLogSegNo notifySegNo)
     /* Reset the locations if we have got out of the continuation region. */
     if (XLogCtl->lastSegContRecEnd <= XLogCtl->LogwrtRqst.Flush)
     {
+        ereport(LOG, (errmsg("CLEAR-REG: (%lX, %lX) %lX, %lX", XLogCtl->firstSegContRecStart, XLogCtl->lastSegContRecEnd,XLogCtl->LogwrtRqst.Write, XLogCtl->LogwrtRqst.Flush), errhidestmt(true),errhidecontext(true)));
+
         XLogCtl->firstSegContRecStart = InvalidXLogRecPtr;
         XLogCtl->lastSegContRecEnd = InvalidXLogRecPtr;
     }
@@ -2691,6 +2698,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                     LogwrtResult.Write < firstSegContRecStart ||
                     lastSegContRecEnd <= LogwrtResult.Write)
                 {
+                    ereport(LOG, (errmsg("NOTIFY1 %lX-%lX: (%lX, %lX) %lX(/%lX) %lX", GetLastNotifiedSegment() + 1, openLogSegNo,firstSegContRecStart, lastSegContRecEnd, WriteRqst.Write, XLogSegmentOffset(WriteRqst.Write, wal_segment_size),LogwrtResult.Write), errhidestmt(true),errhidecontext(true)));
+
                     LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
 
                     if (XLogArchivingActive())
@@ -2731,7 +2740,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                          * the write request to the end of the recrod.
                          */
                         if (WriteRqst.Write < lastSegContRecEnd)
+                        {
+                            ereport(LOG, (errmsg("EXTEND-RQST: (%lX, %lX(%lX)) %lX(%lX)", firstSegContRecStart, lastSegContRecEnd,contendseg, WriteRqst.Write, reqendseg), errhidestmt(true),errhidecontext(true)));
                             WriteRqst.Write = lastSegContRecEnd;
+                        }
+                        else
+                            ereport(LOG, (errmsg("NOT-EXTEND-RQST: (%lX, %lX(%lX)) %lX(%lX)", firstSegContRecStart, lastSegContRecEnd,contendseg, WriteRqst.Write, reqendseg), errhidestmt(true),errhidecontext(true)));
 
                         /*
                          * Flush up to at least the same location immediately
@@ -2752,6 +2766,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
                          * the segment soon since we have written up to further
                          * segments.
                          */
+                        ereport(LOG, (errmsg("SKIP-NOTIFY: (%lX, %lX(%lX)) %lX(%lX) => %d", firstSegContRecStart, lastSegContRecEnd,contendseg, WriteRqst.Write, reqendseg, (contendseg == reqendseg && WriteRqst.Write < lastSegContRecEnd)),errhidestmt(true),errhidecontext(true)));
                         notify_seg = false;
                     }
                 }
@@ -2852,7 +2867,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
      * need to notify the last segment here.
      */
     if (notify_seg)
+    {
+        ereport(LOG, (errmsg("NOTIFY2 %lX-%lX: (%lX, %lX) %lX, seg %lX", GetLastNotifiedSegment() + 1, openLogSegNo - 1,XLogCtl->firstSegContRecStart, XLogCtl->lastSegContRecEnd, LogwrtResult.Flush, openLogSegNo - 1), errhidestmt(true),errhidecontext(true)));
+
         NotifySegmentsUpTo(openLogSegNo - 1);
+    }
 }
 
 /*
-- 
2.27.0

