Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL

From: Heikki Linnakangas
Subject: Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL
Date:
Message-ID: 4BA361E4.7020309@enterprisedb.com
In reply to: Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL  (Simon Riggs <simon@2ndQuadrant.com>)
Responses: Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL  (Tom Lane <tgl@sss.pgh.pa.us>)
Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL  (Alvaro Herrera <alvherre@commandprompt.com>)
Re: Re: [COMMITTERS] pgsql: Make standby server continuously retry restoring the next WAL  (Fujii Masao <masao.fujii@gmail.com>)
List: pgsql-hackers
Simon Riggs wrote:
> On Thu, 2010-03-18 at 23:27 +0900, Fujii Masao wrote:
>
>> I agree that this is a bigger problem. Since the standby always starts
>> walreceiver before replaying any WAL files in pg_xlog, walreceiver tries
>> to receive the WAL files following the REDO starting point even if they
>> have already been in pg_xlog. IOW, the same WAL files might be shipped
>> from the primary to the standby many times. This behavior is unsmart,
>> and should be addressed.
>
> We might also have written half a file many times. The files in pg_xlog
> are suspect whereas the files in the archive are not. If we have both we
> should prefer the archive.

Yep.

Here's a patch I've been playing with. The idea is that in standby mode,
the server keeps trying to make progress in the recovery by:

a) restoring files from archive
b) replaying files from pg_xlog
c) streaming from master

When recovery reaches an invalid WAL record, typically caused by a
half-written WAL file, it closes the file and moves to the next source.
If an error is found in a file restored from archive or in a portion
just streamed from master, however, a PANIC is thrown, because errors
are not expected in the archive or in WAL streamed from the master.
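
To make the intended behaviour concrete, here is a minimal standalone
sketch of the source fallback. It is not the patch code: only the
XLOG_FROM_* bit values and the failedSources/readSource names come from
the patch, while RecState, SourceStates, ReadResult, state_of,
first_available and read_record_sketch are hypothetical names invented
for illustration. It also simplifies streaming to "record present or
not", whereas the real code waits for walreceiver.

```c
#include <assert.h>

/* Same bit values as the XLOG_FROM_* codes defined in the patch. */
#define XLOG_FROM_ARCHIVE   (1 << 0)
#define XLOG_FROM_PG_XLOG   (1 << 1)
#define XLOG_FROM_STREAM    (1 << 2)

/* Hypothetical: what one source holds for the record we want. */
typedef enum { REC_MISSING, REC_INVALID, REC_VALID } RecState;

typedef struct
{
    RecState archive;
    RecState pg_xlog;
    RecState stream;
} SourceStates;

/* Hypothetical outcome codes, used only by this sketch. */
typedef enum { READ_OK, READ_WAIT, READ_PANIC } ReadResult;

/* Look up the simulated state of a single source. */
RecState
state_of(const SourceStates *s, int source)
{
    assert(source == XLOG_FROM_ARCHIVE || source == XLOG_FROM_PG_XLOG ||
           source == XLOG_FROM_STREAM);
    if (source == XLOG_FROM_ARCHIVE)
        return s->archive;
    if (source == XLOG_FROM_PG_XLOG)
        return s->pg_xlog;
    return s->stream;
}

/* First allowed source, in priority order, that has the record; 0 if none. */
int
first_available(const SourceStates *s, int allowed)
{
    static const int order[] = {
        XLOG_FROM_ARCHIVE, XLOG_FROM_PG_XLOG, XLOG_FROM_STREAM
    };
    int i;

    for (i = 0; i < 3; i++)
        if ((allowed & order[i]) && state_of(s, order[i]) != REC_MISSING)
            return order[i];
    return 0;
}

/*
 * Keep trying sources that have not failed yet. An invalid record in
 * pg_xlog marks that source as failed and moves on; an invalid record
 * from archive or stream is treated as fatal.
 */
ReadResult
read_record_sketch(const SourceStates *s, int *failedSources, int *readSource)
{
    const int all = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG | XLOG_FROM_STREAM;

    *failedSources = 0;
    for (;;)
    {
        *readSource = first_available(s, all & ~*failedSources);
        if (*readSource == 0)
            return READ_WAIT;       /* nothing usable yet, caller retries later */
        if (state_of(s, *readSource) == REC_VALID)
            return READ_OK;
        if (*readSource != XLOG_FROM_PG_XLOG)
            return READ_PANIC;      /* archive and stream must be clean */
        *failedSources |= *readSource;
    }
}
```

For example, with the archive empty, a half-written file in pg_xlog and
a good record available from the master, the sketch marks pg_xlog as
failed and ends up reading from the stream.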

When a file is streamed from master, it's left in pg_xlog, so it's found
there after a standby restart, and recovery can progress to the same
point as before restart. It also means that you can copy partial WAL
files to pg_xlog at any time and have them replayed in a few seconds.

The code structure is a bit spaghetti-like, I'm afraid. Any suggestions
on how to improve that are welcome.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 445,464 **** static uint32 openLogSeg = 0;
  static uint32 openLogOff = 0;

  /*
   * These variables are used similarly to the ones above, but for reading
   * the XLOG.  Note, however, that readOff generally represents the offset
   * of the page just read, not the seek position of the FD itself, which
   * will be just past that page. readLen indicates how much of the current
!  * page has been read into readBuf.
   */
  static int    readFile = -1;
  static uint32 readId = 0;
  static uint32 readSeg = 0;
  static uint32 readOff = 0;
  static uint32 readLen = 0;

! /* Is the currently open segment being streamed from primary? */
! static bool readStreamed = false;

  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
  static char *readBuf = NULL;
--- 445,477 ----
  static uint32 openLogOff = 0;

  /*
+  * Codes indicating where we got a WAL file from during recovery, or where
+  * to attempt to get one.
+  */
+ #define XLOG_FROM_ARCHIVE        (1<<0)    /* Restored using restore_command */
+ #define XLOG_FROM_PG_XLOG        (1<<1)    /* Existing file in pg_xlog */
+ #define XLOG_FROM_STREAM        (1<<2)    /* Streamed from master */
+
+ /*
   * These variables are used similarly to the ones above, but for reading
   * the XLOG.  Note, however, that readOff generally represents the offset
   * of the page just read, not the seek position of the FD itself, which
   * will be just past that page. readLen indicates how much of the current
!  * page has been read into readBuf, and readSource indicates where we got
!  * the currently open file from.
   */
  static int    readFile = -1;
  static uint32 readId = 0;
  static uint32 readSeg = 0;
  static uint32 readOff = 0;
  static uint32 readLen = 0;
+ static int readSource = 0;        /* XLOG_FROM_* code */

! /*
!  * Keeps track of which sources we've tried to read the current WAL
!  * record from and failed.
!  */
! static int failedSources = 0;

  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
  static char *readBuf = NULL;
***************
*** 512,520 **** static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
                         bool find_free, int *max_advance,
                         bool use_lock);
  static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
!              bool fromArchive, bool notexistOk);
  static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
!                    bool fromArchive);
  static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
               bool randAccess);
  static void XLogFileClose(void);
--- 525,533 ----
                         bool find_free, int *max_advance,
                         bool use_lock);
  static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
!              int source, bool notexistOk);
  static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
!                    int sources);
  static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
               bool randAccess);
  static void XLogFileClose(void);
***************
*** 2567,2573 **** XLogFileOpen(uint32 log, uint32 seg)
   */
  static int
  XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
!              bool fromArchive, bool notfoundOk)
  {
      char        xlogfname[MAXFNAMELEN];
      char        activitymsg[MAXFNAMELEN + 16];
--- 2580,2586 ----
   */
  static int
  XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
!              int source, bool notfoundOk)
  {
      char        xlogfname[MAXFNAMELEN];
      char        activitymsg[MAXFNAMELEN + 16];
***************
*** 2576,2598 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,

      XLogFileName(xlogfname, tli, log, seg);

!     if (fromArchive)
      {
!         /* Report recovery progress in PS display */
!         snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
!                  xlogfname);
!         set_ps_display(activitymsg, false);

!         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
!                                                   "RECOVERYXLOG",
!                                                   XLogSegSize);
!         if (!restoredFromArchive)
!             return -1;
!     }
!     else
!     {
!         XLogFilePath(path, tli, log, seg);
!         restoredFromArchive = false;
      }

      fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
--- 2589,2616 ----

      XLogFileName(xlogfname, tli, log, seg);

!     switch (source)
      {
!         case XLOG_FROM_ARCHIVE:
!             /* Report recovery progress in PS display */
!             snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
!                      xlogfname);
!             set_ps_display(activitymsg, false);

!             restoredFromArchive = RestoreArchivedFile(path, xlogfname,
!                                                       "RECOVERYXLOG",
!                                                       XLogSegSize);
!             if (!restoredFromArchive)
!                 return -1;
!             break;
!
!         case XLOG_FROM_PG_XLOG:
!             XLogFilePath(path, tli, log, seg);
!             restoredFromArchive = false;
!             break;
!
!         default:
!             elog(ERROR, "invalid XLogFileRead source %d", source);
      }

      fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
***************
*** 2606,2611 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2624,2631 ----
                   xlogfname);
          set_ps_display(activitymsg, false);

+         readSource = source;
+
          return fd;
      }
      if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
***************
*** 2624,2630 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
   * searched in pg_xlog if not found in archive.
   */
  static int
! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
  {
      char        path[MAXPGPATH];
      ListCell   *cell;
--- 2644,2650 ----
   * searched in pg_xlog if not found in archive.
   */
  static int
! XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
  {
      char        path[MAXPGPATH];
      ListCell   *cell;
***************
*** 2647,2666 **** XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
          if (tli < curFileTLI)
              break;                /* don't bother looking at too-old TLIs */

!         fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
!         if (fd != -1)
!             return fd;

!         /*
!          * If not in StandbyMode, fall back to searching pg_xlog. In
!          * StandbyMode we're streaming segments from the primary to pg_xlog,
!          * and we mustn't confuse the (possibly partial) segments in pg_xlog
!          * with complete segments ready to be applied. We rather wait for the
!          * records to arrive through streaming.
!          */
!         if (!StandbyMode && fromArchive)
          {
!             fd = XLogFileRead(log, seg, emode, tli, false, true);
              if (fd != -1)
                  return fd;
          }
--- 2667,2685 ----
          if (tli < curFileTLI)
              break;                /* don't bother looking at too-old TLIs */

!         if (sources & XLOG_FROM_ARCHIVE)
!         {
!             fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_ARCHIVE, true);
!             if (fd != -1)
!             {
!                 elog(DEBUG1, "got WAL segment from archive");
!                 return fd;
!             }
!         }

!         if (sources & XLOG_FROM_PG_XLOG)
          {
!             fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_PG_XLOG, true);
              if (fd != -1)
                  return fd;
          }
***************
*** 3530,3545 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
      uint32        pageHeaderSize;
      int            emode;

-     /*
-      * We don't expect any invalid records during streaming recovery: we
-      * should never hit the end of WAL because we wait for it to be streamed.
-      * Therefore treat any broken WAL as PANIC, instead of failing over.
-      */
-     if (StandbyMode)
-         emode = PANIC;
-     else
-         emode = emode_arg;
-
      if (readBuf == NULL)
      {
          /*
--- 3549,3554 ----
***************
*** 3591,3600 **** ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
          randAccess = true;        /* allow curFileTLI to go backwards too */
      }

      /* Read the page containing the record */
!     if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
          return NULL;

      pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
      targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
      if (targetRecOff == 0)
--- 3600,3623 ----
          randAccess = true;        /* allow curFileTLI to go backwards too */
      }

+     /* This is the first try to read this page. */
+     failedSources = 0;
+ retry:
      /* Read the page containing the record */
!     if (!XLogPageRead(RecPtr, emode_arg, fetching_ckpt, randAccess))
          return NULL;

+     /*
+      * We don't expect any invalid records in archive or in records streamed
+      * from master: we should never hit the end of WAL because we wait for it
+      * to be streamed. Therefore treat any broken WAL as PANIC, instead of
+      * failing over.
+      */
+     if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
+         emode = PANIC;
+     else
+         emode = emode_arg;
+
      pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
      targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
      if (targetRecOff == 0)
***************
*** 3828,3833 **** next_record_is_invalid:;
--- 3851,3864 ----
          close(readFile);
          readFile = -1;
      }
+
+     /* In standby-mode, retry from another source */
+     if (StandbyMode)
+     {
+         failedSources |= readSource;
+         goto retry;
+     }
+
      return NULL;
  }

***************
*** 8698,8704 **** StartupProcessMain(void)
   * as for waiting for the requested WAL record to arrive in standby mode.
   */
  static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
               bool randAccess)
  {
      static XLogRecPtr receivedUpto = {0, 0};
--- 8729,8735 ----
   * as for waiting for the requested WAL record to arrive in standby mode.
   */
  static bool
! XLogPageRead(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt,
               bool randAccess)
  {
      static XLogRecPtr receivedUpto = {0, 0};
***************
*** 8707,8719 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
      uint32        targetRecOff;
      uint32        targetId;
      uint32        targetSeg;

      XLByteToSeg(*RecPtr, targetId, targetSeg);
      targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
      targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;

      /* Fast exit if we have read the record in the current buffer already */
!     if (targetId == readId && targetSeg == readSeg &&
          targetPageOff == readOff && targetRecOff < readLen)
          return true;

--- 8738,8752 ----
      uint32        targetRecOff;
      uint32        targetId;
      uint32        targetSeg;
+     int            emode;
+     static pg_time_t last_fail_time = 0;

      XLByteToSeg(*RecPtr, targetId, targetSeg);
      targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
      targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;

      /* Fast exit if we have read the record in the current buffer already */
!     if (failedSources == 0 && targetId == readId && targetSeg == readSeg &&
          targetPageOff == readOff && targetRecOff < readLen)
          return true;

***************
*** 8725,8742 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
      {
          close(readFile);
          readFile = -1;
      }

      XLByteToSeg(*RecPtr, readId, readSeg);

      /* See if we need to retrieve more data */
      if (readFile < 0 ||
!         (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
      {
          if (StandbyMode)
          {
-             bool        last_restore_failed = false;
-
              /*
               * In standby mode, wait for the requested record to become
               * available, either via restore_command succeeding to restore the
--- 8758,8775 ----
      {
          close(readFile);
          readFile = -1;
+         readSource = 0;
      }

      XLByteToSeg(*RecPtr, readId, readSeg);

+ retry:
      /* See if we need to retrieve more data */
      if (readFile < 0 ||
!         (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
      {
          if (StandbyMode)
          {
              /*
               * In standby mode, wait for the requested record to become
               * available, either via restore_command succeeding to restore the
***************
*** 8746,8751 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 8779,8786 ----
              {
                  if (WalRcvInProgress())
                  {
+                     failedSources = 0;
+
                      /*
                       * While walreceiver is active, wait for new WAL to arrive
                       * from primary.
***************
*** 8761,8775 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                          {
                              readFile =
                                  XLogFileRead(readId, readSeg, PANIC,
!                                              recoveryTargetTLI, false, false);
                              switched_segment = true;
!                             readStreamed = true;
                          }
                          break;
                      }

                      if (CheckForStandbyTrigger())
!                         goto next_record_is_invalid;

                      /*
                       * When streaming is active, we want to react quickly when
--- 8796,8811 ----
                          {
                              readFile =
                                  XLogFileRead(readId, readSeg, PANIC,
!                                              recoveryTargetTLI,
!                                              XLOG_FROM_PG_XLOG, false);
                              switched_segment = true;
!                             readSource = XLOG_FROM_STREAM;
                          }
                          break;
                      }

                      if (CheckForStandbyTrigger())
!                         goto triggered;

                      /*
                       * When streaming is active, we want to react quickly when
***************
*** 8779,8784 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
--- 8815,8823 ----
                  }
                  else
                  {
+                     int sources;
+                     pg_time_t now;
+
                      /*
                       * Until walreceiver manages to reconnect, poll the
                       * archive.
***************
*** 8791,8828 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                      /* Reset curFileTLI if random fetch. */
                      if (randAccess)
                          curFileTLI = 0;
!                     readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
                      switched_segment = true;
-                     readStreamed = false;
                      if (readFile != -1)
                      {
-                         elog(DEBUG1, "got WAL segment from archive");
                          break;
                      }

                      /*
!                      * If we succeeded restoring some segments from archive
!                      * since the last connection attempt (or we haven't tried
!                      * streaming yet, retry immediately. But if we haven't,
!                      * assume the problem is persistent, so be less
!                      * aggressive.
                       */
!                     if (last_restore_failed)
                      {
!                         /*
!                          * Check to see if the trigger file exists. Note that
!                          * we do this only after failure, so when you create
!                          * the trigger file, we still finish replaying as much
!                          * as we can before failover.
!                          */
!                         if (CheckForStandbyTrigger())
!                             goto next_record_is_invalid;
!                         pg_usleep(5000000L);    /* 5 seconds */
                      }
!                     last_restore_failed = true;

                      /*
!                      * Nope, not found in archive. Try to stream it.
                       *
                       * If fetching_ckpt is TRUE, RecPtr points to the initial
                       * checkpoint location. In that case, we use RedoStartLSN
--- 8830,8877 ----
                      /* Reset curFileTLI if random fetch. */
                      if (randAccess)
                          curFileTLI = 0;
!
!                     /*
!                      * Try to restore the file from archive, or read an
!                      * existing file from pg_xlog.
!                      */
!                     sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
!                     sources &= ~failedSources;
!                     readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
!                                                   sources);
                      switched_segment = true;
                      if (readFile != -1)
                      {
                          break;
                      }

                      /*
!                      * Nope, not found in archive.
!                      */
!
!                     /*
!                      * Check to see if the trigger file exists. Note that
!                      * we do this only after failure, so when you create
!                      * the trigger file, we still finish replaying as much
!                      * as we can from archive and pg_xlog before failover.
                       */
!                     if (CheckForStandbyTrigger())
!                         goto triggered;
!
!                     /*
!                      * Sleep if it hasn't been long since last attempt.
!                      */
!                     now = (pg_time_t) time(NULL);
!                     if ((now - last_fail_time) < 5)
                      {
!                         pg_usleep(1000000L * (5 - (now - last_fail_time)));
!                         now = (pg_time_t) time(NULL);
                      }
!                     last_fail_time = now;

                      /*
!                      * If primary_conninfo is set, launch walreceiver to
!                      * try to stream the missing WAL.
                       *
                       * If fetching_ckpt is TRUE, RecPtr points to the initial
                       * checkpoint location. In that case, we use RedoStartLSN
***************
*** 8847,8859 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
              /* In archive or crash recovery. */
              if (readFile < 0)
              {
                  /* Reset curFileTLI if random fetch. */
                  if (randAccess)
                      curFileTLI = 0;
!                 readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
!                                               InArchiveRecovery);
                  switched_segment = true;
-                 readStreamed = false;
                  if (readFile < 0)
                      return false;
              }
--- 8896,8914 ----
              /* In archive or crash recovery. */
              if (readFile < 0)
              {
+                 int sources;
                  /* Reset curFileTLI if random fetch. */
                  if (randAccess)
                      curFileTLI = 0;
!
!                 sources = XLOG_FROM_PG_XLOG;
!                 if (InArchiveRecovery)
!                     sources |= XLOG_FROM_ARCHIVE;
!                 sources &= ~failedSources;
!
!                 readFile = XLogFileReadAnyTLI(readId, readSeg, emode_arg,
!                                               sources);
                  switched_segment = true;
                  if (readFile < 0)
                      return false;
              }
***************
*** 8861,8878 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
      }

      /*
!      * At this point, we have the right segment open and we know the requested
!      * record is in it.
       */
      Assert(readFile != -1);

      /*
       * If the current segment is being streamed from master, calculate how
       * much of the current page we have received already. We know the
       * requested record has been received, but this is for the benefit of
       * future calls, to allow quick exit at the top of this function.
       */
!     if (readStreamed)
      {
          if (RecPtr->xlogid != receivedUpto.xlogid ||
              (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
--- 8916,8944 ----
      }

      /*
!      * At this point, we have the right segment open and if we're streaming
!      * we know the requested record is in it.
       */
      Assert(readFile != -1);

      /*
+      * We don't expect any invalid records in archive or in records streamed
+      * from master: we should never hit the end of WAL because we wait for it
+      * to be streamed. Therefore treat any broken WAL as PANIC, instead of
+      * failing over.
+      */
+     if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
+         emode = PANIC;
+     else
+         emode = emode_arg;
+
+     /*
       * If the current segment is being streamed from master, calculate how
       * much of the current page we have received already. We know the
       * requested record has been received, but this is for the benefit of
       * future calls, to allow quick exit at the top of this function.
       */
!     if (readSource == XLOG_FROM_STREAM)
      {
          if (RecPtr->xlogid != receivedUpto.xlogid ||
              (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
***************
*** 8936,8946 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
      return true;

  next_record_is_invalid:
      if (readFile >= 0)
          close(readFile);
      readFile = -1;
-     readStreamed = false;
      readLen = 0;

      return false;
  }
--- 9002,9026 ----
      return true;

  next_record_is_invalid:
+     failedSources |= readSource;
+
+     if (readFile >= 0)
+         close(readFile);
+     readFile = -1;
+     readLen = 0;
+     readSource = 0;
+
+     if (StandbyMode)
+         goto retry;
+     else
+         return false;
+
+ triggered:
      if (readFile >= 0)
          close(readFile);
      readFile = -1;
      readLen = 0;
+     readSource = 0;

      return false;
  }
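
The retry throttling in the XLogPageRead hunk above (the last_fail_time
logic) can be read in isolation as the small calculation below. This is
only a sketch: retry_sleep_secs and its interval parameter are
hypothetical names, and the patch itself hard-codes 5 seconds and calls
pg_usleep() directly.

```c
#include <assert.h>

/*
 * Return how many seconds to sleep before the next attempt so that
 * attempts are at least `interval` seconds apart. Mirrors the patch's
 * "(now - last_fail_time) < 5" test; times are whole seconds, as
 * returned by time(NULL).
 */
long
retry_sleep_secs(long now, long last_fail_time, long interval)
{
    long elapsed = now - last_fail_time;

    assert(interval > 0);
    if (elapsed < interval)
        return interval - elapsed;  /* sleep only the remainder */
    return 0;                       /* enough time has passed already */
}
```

Compared with the unconditional pg_usleep(5000000L) in the old code,
this sleeps only for whatever is left of the interval, so time already
spent on a slow restore attempt counts towards the wait.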
