Re: Streaming replication, retrying from archive

From: Heikki Linnakangas
Subject: Re: Streaming replication, retrying from archive
Msg-id: 4B5758ED.1060703@enterprisedb.com
In reply to: Re: Streaming replication, retrying from archive (Dimitri Fontaine <dfontaine@hi-media.com>)
Replies: Re: Streaming replication, retrying from archive (Simon Riggs <simon@2ndQuadrant.com>)
         Re: Streaming replication, retrying from archive (Dimitri Fontaine <dfontaine@hi-media.com>)
List: pgsql-hackers
Dimitri Fontaine wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
>> 1. Initial archive recovery. Standby fetches WAL files from archive
>> using restore_command. When a file is not found in archive, we start
>> walreceiver and switch to state 2
>>
>> 2. Retrying to restore from archive. When the connection to primary is
>> established and replication is started, we switch to state 3
>
> When does the master know about this new slave being there? I'd say not
> until 3 is OK, and then the actual details between 1 and 2 look
> strange, partly because they're more about processes than states.

Right. The master doesn't need to know about the slave.

> I'd propose starting 1 and 2 in parallel from the beginning and, as
> Simon proposes, being able to go back to 1 at any time:
>
> 0. Start from a base backup and determine the first WAL / LSN we need to
>    start streaming; call it SR_LSN. That means asking the master for its
>    current xlog location.

What if the master can't be contacted?

> The LSN we're at now, after replaying the base
>    backup and maybe the initial recovery from local WAL files, let's
>    call it BASE_LSN.
>
> 1. Fetch the WAL needed to get from BASE_LSN to SR_LSN from the archive
>    with restore_command, apply it as we receive it, and start 2,
>    possibly in parallel.
>
> 2. Streaming replication: we connect to the primary and walreceiver gets
>    the WAL from the connection. It either stores it, if the current
>    standby's position is < SR_LSN, or applies it directly if we were
>    already streaming.
>
>    Local storage would be either the standby's archive or a specific
>    temporary location. I guess it's more or less what you want to do
>    with retrying from the master's archives, but I'm not sure your line
>    of thought makes it simpler.

Seems complicated...
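The store-or-apply rule in the proposed step 2 comes down to a single comparison between the standby's replay position and SR_LSN. A minimal sketch for illustration only: the flat 64-bit Lsn type, the WalAction enum and classify_streamed_wal() are hypothetical, not PostgreSQL code.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified LSN: one flat 64-bit byte position. */
typedef uint64_t Lsn;

typedef enum
{
    WAL_STORE,      /* keep streamed WAL aside until replay catches up */
    WAL_APPLY       /* replay streamed WAL directly */
} WalAction;

/*
 * Proposed step 2: while the standby's position is still behind SR_LSN
 * (step 1 is still restoring from the archive), streamed WAL is stored;
 * once the standby has reached SR_LSN, it is applied directly.
 */
static WalAction
classify_streamed_wal(Lsn standby_pos, Lsn sr_lsn)
{
    return (standby_pos < sr_lsn) ? WAL_STORE : WAL_APPLY;
}
```

The comparison itself is trivial; what the proposal adds around it is a second, parallel fetch path plus a temporary storage location that both paths have to agree on.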

> <snip>
> The details about when a slave is in sync will get more important as
> soon as we have synchronous streaming.

Yeah, a lot of that logic and those states are completely unnecessary until
we have a synchronous mode. Even then, it seems complex.

Here's what I've been hacking:

First of all, walreceiver no longer retries the connection on error, and
postmaster no longer relaunches it if it dies. So when walreceiver is
launched, it tries to connect once and, if successful, streams until an
error occurs or it's killed.

When the startup process needs more WAL to continue replay, the logic is,
in pseudocode:

while (<need more wal>)
{
  if (<walreceiver is alive>)
  {
     wait for WAL to arrive, or for walreceiver to die.
  }
  else
  {
     Run restore_command
     if (restore_command succeeded)
       break;
     else
     {
       Sleep 5 seconds
       Start walreceiver
     }
  }
}
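To check the control flow of the pseudocode above, here is a small, runnable C model of it. Everything in it is a hypothetical stand-in: the Standby struct models the archive and the primary, run_restore_command() and start_walreceiver() are stubs, the 5-second sleep is only counted, and a live walreceiver is assumed to deliver WAL immediately. It sketches the retry order only; it is not the code in XLogPageRead().

```c
#include <assert.h>
#include <stdbool.h>

/* Where the next chunk of WAL came from. */
typedef enum { FROM_ARCHIVE, FROM_STREAM } WalSource;

/* Toy model of the standby's environment (all fields hypothetical). */
typedef struct
{
    int  archive_segments;      /* segments restore_command can still restore */
    bool primary_up;            /* will a newly launched walreceiver connect? */
    bool walreceiver_alive;
    int  restore_attempts;
    int  walreceiver_launches;
    int  seconds_slept;         /* sleeps are counted, not performed */
} Standby;

/* Stub for restore_command: succeeds while the archive still has files. */
static bool
run_restore_command(Standby *s)
{
    s->restore_attempts++;
    if (s->archive_segments > 0)
    {
        s->archive_segments--;
        return true;
    }
    return false;
}

/* walreceiver tries to connect exactly once; it is never retried by itself. */
static void
start_walreceiver(Standby *s)
{
    s->walreceiver_launches++;
    s->walreceiver_alive = s->primary_up;
}

/* One pass of the startup-process loop: obtain the next unit of WAL. */
static WalSource
fetch_next_wal(Standby *s)
{
    for (;;)
    {
        if (s->walreceiver_alive)
            return FROM_STREAM;         /* WAL arrived from the primary */

        if (run_restore_command(s))
            return FROM_ARCHIVE;

        s->seconds_slept += 5;          /* "Sleep 5 seconds" */
        start_walreceiver(s);           /* then try streaming again */
    }
}
```

With an empty archive and an unreachable primary this loop never returns, which matches the intent (the standby keeps polling both sources), so the model should not be driven into that state without a real sleep.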

So there are just two states:

1. Recovering from archive
2. Streaming

We start in state 1 and switch states on error.
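Written as a transition function, the two states and the error-driven switches between them look roughly like this. The enum and event names are hypothetical labels for the states above, not identifiers from the patch. RESTORE_FAILED leads to STREAMING because that is when walreceiver gets launched; if it cannot connect, that shows up as WALRECEIVER_DIED and sends us back to the archive.

```c
#include <assert.h>

/* Hypothetical labels for the two states described above. */
typedef enum
{
    RECOVERING_FROM_ARCHIVE,    /* state 1: restore_command (initial state) */
    STREAMING                   /* state 2: walreceiver running */
} StandbyState;

/* Hypothetical events the startup process can observe. */
typedef enum
{
    RESTORE_SUCCEEDED,
    RESTORE_FAILED,             /* segment not in archive: launch walreceiver */
    WAL_STREAMED,
    WALRECEIVER_DIED            /* connection failed or was lost */
} StandbyEvent;

/* Only errors change the state; successes keep the current one. */
static StandbyState
next_state(StandbyState state, StandbyEvent event)
{
    switch (state)
    {
        case RECOVERING_FROM_ARCHIVE:
            return (event == RESTORE_FAILED) ? STREAMING : state;
        case STREAMING:
            return (event == WALRECEIVER_DIED) ? RECOVERING_FROM_ARCHIVE : state;
    }
    return state;
}
```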

This gives nice behavior from a user's point of view. The standby tries to
make progress using either the archive or streaming, whichever becomes
available first.

Attached is a WIP patch implementing that, also available in the
'replication-xlogrefactor' branch in my git repository. It includes the
Read/FetchRecord refactoring I mentioned earlier; that's a prerequisite
for this.
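After the refactoring, ReadRecord() reassembles a record that crosses a page boundary by calling XLogPageRead() for each following page, stepping a two-part (xlogid, xrecoff) pointer one page at a time and carrying into xlogid when xrecoff reaches XLogFileSize. A standalone sketch of that arithmetic, assuming the default 8 kB WAL pages and 16 MB segments, with XLogFileSize taken as 0xFFFFFFFF / segment size (that is, 255) whole segments; the type, macro and function names are illustrative only.

```c
#include <assert.h>
#include <stdint.h>

#define WAL_PAGE_SIZE     8192u                         /* assumed XLOG_BLCKSZ */
#define WAL_SEG_SIZE      (16u * 1024u * 1024u)         /* assumed segment size */
#define WAL_SEGS_PER_FILE (0xFFFFFFFFu / WAL_SEG_SIZE)  /* 255 */
#define WAL_FILE_SIZE     (WAL_SEGS_PER_FILE * WAL_SEG_SIZE)

/* Two-part WAL pointer, as in the XLogRecPtr of this era. */
typedef struct
{
    uint32_t xlogid;    /* logical log file number */
    uint32_t xrecoff;   /* byte offset within that logical file */
} WalPtr;

/* Round a pointer down to the start of the page containing it. */
static WalPtr
page_start(WalPtr p)
{
    p.xrecoff = (p.xrecoff / WAL_PAGE_SIZE) * WAL_PAGE_SIZE;
    return p;
}

/* Advance a page-aligned pointer to the next page, carrying into xlogid. */
static WalPtr
next_page(WalPtr p)
{
    p.xrecoff += WAL_PAGE_SIZE;
    if (p.xrecoff >= WAL_FILE_SIZE)
    {
        p.xlogid++;
        p.xrecoff = 0;
    }
    return p;
}
```

Because the last page of a logical file starts at WAL_FILE_SIZE - WAL_PAGE_SIZE, adding one page never overflows the 32-bit xrecoff before the carry is taken.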

The code implementing the above retry logic is in XLogPageRead(), in xlog.c.
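XLogPageRead() starts by locating the requested pointer: the segment it lies in (as XLByteToSeg computes it, xrecoff / XLogSegSize), the offset of its page within that segment, and its offset within the page. It returns at once if that page is already in readBuf and the requested byte lies within the readLen bytes read so far. A standalone sketch of that arithmetic and the fast-exit test, under the same assumed defaults (8 kB pages, 16 MB segments); the struct and function names are illustrative, not from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define WAL_PAGE_SIZE 8192u                   /* assumed XLOG_BLCKSZ */
#define WAL_SEG_SIZE  (16u * 1024u * 1024u)   /* assumed segment size */

typedef struct
{
    uint32_t xlogid;
    uint32_t xrecoff;
} WalPtr;

typedef struct
{
    uint32_t id;        /* logical log file */
    uint32_t seg;       /* segment within it */
    uint32_t page_off;  /* offset of the page within the segment */
    uint32_t rec_off;   /* offset of the pointer within its page */
} WalTarget;

/* Same formulas as targetId/targetSeg, targetPageOff and targetRecOff. */
static WalTarget
locate(WalPtr p)
{
    WalTarget t;

    t.id = p.xlogid;
    t.seg = p.xrecoff / WAL_SEG_SIZE;
    t.page_off = ((p.xrecoff % WAL_SEG_SIZE) / WAL_PAGE_SIZE) * WAL_PAGE_SIZE;
    t.rec_off = p.xrecoff % WAL_PAGE_SIZE;
    return t;
}

/* Fast exit: same segment, same page, and the byte has already been read. */
static bool
already_buffered(WalTarget t, uint32_t read_id, uint32_t read_seg,
                 uint32_t read_off, uint32_t read_len)
{
    return t.id == read_id && t.seg == read_seg &&
        t.page_off == read_off && t.rec_off < read_len;
}
```

The readLen check is what lets a partially filled page be revisited later: if the page was only partly streamed when it was read, a pointer beyond readLen falls through and the page is read again.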

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 690dbb6..6cb6bf0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -144,16 +144,6 @@ HotStandbyState        standbyState = STANDBY_DISABLED;
 static     XLogRecPtr    LastRec;

 /*
- * Are we doing recovery from XLOG stream? If so, we recover without using
- * offline XLOG archives even though InArchiveRecovery==true. This flag is
- * used only in standby mode.
- */
-static bool InStreamingRecovery = false;
-
-/* The current log page is partially-filled, and so needs to be read again? */
-static bool needReread = false;
-
-/*
  * Local copy of SharedRecoveryInProgress variable. True actually means "not
  * known, need to check the shared state".
  */
@@ -457,12 +447,16 @@ static uint32 openLogOff = 0;
  * These variables are used similarly to the ones above, but for reading
  * the XLOG.  Note, however, that readOff generally represents the offset
  * of the page just read, not the seek position of the FD itself, which
- * will be just past that page.
+ * will be just past that page. readLen indicates how much of the current
+ * page has been read into readBuf.
  */
 static int    readFile = -1;
 static uint32 readId = 0;
 static uint32 readSeg = 0;
 static uint32 readOff = 0;
+static uint32 readLen = 0;
+/* Is the currently open segment being streamed from primary? */
+static bool readStreamed = false;

 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
 static char *readBuf = NULL;
@@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0;
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;    /* start of last record read */
 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
-static XLogRecord *nextRecord = NULL;
 static TimeLineID lastPageTLI = 0;

 static XLogRecPtr minRecoveryPoint;        /* local copy of
@@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
                        bool find_free, int *max_advance,
                        bool use_lock);
-static int    XLogFileRead(uint32 log, uint32 seg, int emode);
+static int    XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+             bool fromArchive, bool notfoundOk);
+static int    XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
+                   bool fromArchive);
+static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+             bool randAccess);
 static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
                     const char *recovername, off_t expectedSize);
@@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
 static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
-static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
 static List *readTimeLineHistory(TimeLineID targetTLI);
@@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 static void WriteControlFile(void);
 static void ReadControlFile(void);
 static char *str_time(pg_time_t tnow);
+static bool CheckForStandbyTrigger(void);

 #ifdef WAL_DEBUG
 static void xlog_outrec(StringInfo buf, XLogRecord *record);
@@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg)

 /*
  * Open a logfile segment for reading (during recovery).
+ *
+ * If fromArchive is true, the segment is retrieved from archive, otherwise
+ * it's read from pg_xlog.
  */
 static int
-XLogFileRead(uint32 log, uint32 seg, int emode)
+XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
+             bool fromArchive, bool notfoundOk)
 {
-    char        path[MAXPGPATH];
     char        xlogfname[MAXFNAMELEN];
     char        activitymsg[MAXFNAMELEN + 16];
-    ListCell   *cell;
+    char        path[MAXPGPATH];
     int            fd;

-    /*
-     * Loop looking for a suitable timeline ID: we might need to read any of
-     * the timelines listed in expectedTLIs.
-     *
-     * We expect curFileTLI on entry to be the TLI of the preceding file in
-     * sequence, or 0 if there was no predecessor.    We do not allow curFileTLI
-     * to go backwards; this prevents us from picking up the wrong file when a
-     * parent timeline extends to higher segment numbers than the child we
-     * want to read.
-     */
-    foreach(cell, expectedTLIs)
-    {
-        TimeLineID    tli = (TimeLineID) lfirst_int(cell);
-
-        if (tli < curFileTLI)
-            break;                /* don't bother looking at too-old TLIs */
-
         XLogFileName(xlogfname, tli, log, seg);

-        if (InArchiveRecovery && !InStreamingRecovery)
+        if (fromArchive)
         {
             /* Report recovery progress in PS display */
             snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
@@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
             restoredFromArchive = RestoreArchivedFile(path, xlogfname,
                                                       "RECOVERYXLOG",
                                                       XLogSegSize);
+            if (!restoredFromArchive)
+                return -1;
         }
         else
+        {
             XLogFilePath(path, tli, log, seg);
+            restoredFromArchive = false;
+        }

         fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
         if (fd >= 0)
@@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode)

             return fd;
         }
-        if (errno != ENOENT)    /* unexpected failure? */
+        if (errno != ENOENT || !notfoundOk)    /* unexpected failure? */
             ereport(PANIC,
                     (errcode_for_file_access(),
             errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
                    path, log, seg)));
+        return -1;
+}
+
+/*
+ * Open a logfile segment for reading (during recovery).
+ *
+ * This version searches for the segment with any TLI listed in expectedTLIs.
+ * Also, if not in StandbyMode and fromArchive is true, the segment is
+ * searched for in pg_xlog if it is not found in the archive.
+ */
+static int
+XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
+{
+    char        path[MAXPGPATH];
+    ListCell   *cell;
+    int            fd;
+
+    /*
+     * Loop looking for a suitable timeline ID: we might need to read any of
+     * the timelines listed in expectedTLIs.
+     *
+     * We expect curFileTLI on entry to be the TLI of the preceding file in
+     * sequence, or 0 if there was no predecessor.    We do not allow curFileTLI
+     * to go backwards; this prevents us from picking up the wrong file when a
+     * parent timeline extends to higher segment numbers than the child we
+     * want to read.
+     */
+    foreach(cell, expectedTLIs)
+    {
+        TimeLineID    tli = (TimeLineID) lfirst_int(cell);
+
+        if (tli < curFileTLI)
+            break;                /* don't bother looking at too-old TLIs */
+
+        fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
+        if (fd != -1)
+            return fd;
+
+        /*
+         * If not in StandbyMode, fall back to searching pg_xlog. In
+         * StandbyMode we're streaming segments from the primary to pg_xlog,
+         * and we mustn't confuse the (possibly partial) segments in pg_xlog
+         * with complete segments ready to be applied. We'd rather wait for
+         * the records to arrive through streaming.
+         */
+        if (!StandbyMode && fromArchive)
+        {
+            fd = XLogFileRead(log, seg, emode, tli, false, true);
+            if (fd != -1)
+                return fd;
+        }
     }

     /* Couldn't find it.  For simplicity, complain about front timeline */
@@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
              * different filename that can't be confused with regular XLOG
              * files.
              */
-            if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
+            if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
             {
                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);

@@ -3474,92 +3514,19 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
 }

 /*
- * Attempt to fetch an XLOG record.
- *
- * If RecPtr is not NULL, try to fetch a record at that position.  Otherwise
- * try to fetch a record just after the last one previously read.
- *
- * In standby mode, if we failed in reading a valid record and are not doing
- * recovery from XLOG stream yet, we ignore the failure and start walreceiver
- * process to fetch the record from the primary. Otherwise, returns NULL,
- * or fails if emode is PANIC. (emode must be either PANIC or LOG.)
- *
- * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
- * this case, if we have to start XLOG streaming, we use RedoStartLSN as the
- * streaming start position instead of RecPtr.
- *
- * The record is copied into readRecordBuf, so that on successful return,
- * the returned record pointer always points there.
- */
-static XLogRecord *
-FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
-{
-    if (StandbyMode && !InStreamingRecovery)
-    {
-        XLogRecord *record;
-        XLogRecPtr    startlsn;
-        bool        haveNextRecord = (nextRecord != NULL);
-
-        /* An invalid record is OK here, so we set emode to DEBUG2 */
-        record = ReadRecord(RecPtr, DEBUG2);
-        if (record != NULL)
-            return record;
-
-        /*
-         * Start XLOG streaming if there is no more valid records available
-         * in the archive.
-         *
-         * We need to calculate the start position of XLOG streaming. If we
-         * read a record in the middle of a segment which doesn't exist in
-         * pg_xlog, we use the start of the segment as the start position.
-         * That prevents a broken segment (i.e., with no records in the
-         * first half of a segment) from being created by XLOG streaming,
-         * which might cause trouble later on if the segment is e.g
-         * archived.
-         */
-        startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
-        if (startlsn.xrecoff % XLogSegSize != 0)
-        {
-            char        xlogpath[MAXPGPATH];
-            struct stat    stat_buf;
-            uint32        log;
-            uint32        seg;
-
-            XLByteToSeg(startlsn, log, seg);
-            XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
-
-            if (stat(xlogpath, &stat_buf) != 0)
-                startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
-        }
-        RequestXLogStreaming(startlsn, PrimaryConnInfo);
-
-        /* Needs to read the current page again if the next record is in it */
-        needReread = haveNextRecord;
-        nextRecord = NULL;
-
-        InStreamingRecovery = true;
-        ereport(LOG,
-                (errmsg("starting streaming recovery at %X/%X",
-                        startlsn.xlogid, startlsn.xrecoff)));
-    }
-
-    return ReadRecord(RecPtr, emode);
-}
-
-/*
  * Attempt to read an XLOG record.
  *
  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
  * try to read a record just after the last one previously read.
  *
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC, LOG or DEBUG2.)
+ * (emode must be either PANIC or LOG.)
  *
  * The record is copied into readRecordBuf, so that on successful return,
  * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
+ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
 {
     XLogRecord *record;
     char       *buffer;
@@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
     bool        randAccess = false;
     uint32        len,
                 total_len;
-    uint32        targetPageOff;
     uint32        targetRecOff;
     uint32        pageHeaderSize;
-    XLogRecPtr    receivedUpto = {0,0};
-    bool        finished;
     int            emode;

     /*
@@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
      * should never hit the end of WAL because we wait for it to be streamed.
      * Therefore treat any broken WAL as PANIC, instead of failing over.
      */
-    if (InStreamingRecovery)
+    if (StandbyMode)
         emode = PANIC;
     else
         emode = emode_arg;
@@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
     if (RecPtr == NULL)
     {
         RecPtr = &tmpRecPtr;
-        /* fast case if next record is on same page */
-        if (nextRecord != NULL)
-        {
-            record = nextRecord;
-            goto got_record;
-        }

         /*
-         * Align old recptr to next page if the current page is filled and
-         * doesn't need to be read again.
+         * Align recptr to next page if no more records can fit on the
+         * current page.
          */
-        if (!needReread)
+        if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
+        {
             NextLogPage(tmpRecPtr);
-        /* We will account for page header size below */
+            /* We will account for page header size below */
+        }
     }
     else
     {
@@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
         randAccess = true;        /* allow curFileTLI to go backwards too */
     }

-    if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
-    {
-        close(readFile);
-        readFile = -1;
-    }
-
-    /* Is the target record ready yet? */
-    if (InStreamingRecovery)
-    {
-        receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
-        if (finished)
-        {
-            if (emode_arg == PANIC)
-                ereport(PANIC,
-                        (errmsg("streaming recovery ended")));
-            else
-                return NULL;
-        }
-    }
-
-    XLByteToSeg(*RecPtr, readId, readSeg);
-    if (readFile < 0)
-    {
-        /* Now it's okay to reset curFileTLI if random fetch */
-        if (randAccess)
-            curFileTLI = 0;
-
-        readFile = XLogFileRead(readId, readSeg, emode);
-        if (readFile < 0)
-            goto next_record_is_invalid;
-
-        /*
-         * Whenever switching to a new WAL segment, we read the first page of
-         * the file and validate its header, even if that's not where the
-         * target record is.  This is so that we can check the additional
-         * identification info that is present in the first page's "long"
-         * header.
-         */
-        readOff = 0;
-        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-        {
-            ereport(emode,
-                    (errcode_for_file_access(),
-                     errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                            readId, readSeg, readOff)));
-            goto next_record_is_invalid;
-        }
-        if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-            goto next_record_is_invalid;
-    }
+    /* Read the page containing the record */
+    if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
+        return NULL;

-    targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
-    if (readOff != targetPageOff || needReread)
-    {
-        readOff = targetPageOff;
-        needReread = false;
-        if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
-        {
-            ereport(emode,
-                    (errcode_for_file_access(),
-                     errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                            readId, readSeg, readOff)));
-            goto next_record_is_invalid;
-        }
-        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-        {
-            ereport(emode,
-                    (errcode_for_file_access(),
-                     errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                            readId, readSeg, readOff)));
-            goto next_record_is_invalid;
-        }
-        if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-            goto next_record_is_invalid;
-    }
     pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
     targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
     if (targetRecOff == 0)
@@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
     }
     record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);

-got_record:;
-
     /*
      * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
      * required.
@@ -3838,58 +3725,35 @@ got_record:;
     }

     buffer = readRecordBuf;
-    nextRecord = NULL;
     len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
     if (total_len > len)
     {
         /* Need to reassemble record */
         XLogContRecord *contrecord;
-        XLogRecPtr    nextpagelsn = *RecPtr;
+        XLogRecPtr    pagelsn;
         uint32        gotlen = len;

+        /* Initialize pagelsn to the beginning of the page this record is on */
+        pagelsn = *RecPtr;
+        pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+
         memcpy(buffer, record, len);
         record = (XLogRecord *) buffer;
         buffer += len;
         for (;;)
         {
-            /* Is the next page ready yet? */
-            if (InStreamingRecovery)
+            /* Calculate pointer to beginning of next page */
+            pagelsn.xrecoff += XLOG_BLCKSZ;
+            if (pagelsn.xrecoff >= XLogFileSize)
             {
-                if (gotlen != len)
-                    nextpagelsn.xrecoff += XLOG_BLCKSZ;
-                NextLogPage(nextpagelsn);
-                receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
-                if (finished)
-                {
-                    if (emode_arg == PANIC)
-                        ereport(PANIC,
-                                (errmsg("streaming recovery ended")));
-                    else
-                        return NULL;
-                }
+                (pagelsn.xlogid)++;
+                pagelsn.xrecoff = 0;
             }
+            /* Wait for the next page to become available */
+            if (!XLogPageRead(&pagelsn, emode, false, false))
+                return NULL;

-            readOff += XLOG_BLCKSZ;
-            if (readOff >= XLogSegSize)
-            {
-                close(readFile);
-                readFile = -1;
-                NextLogSeg(readId, readSeg);
-                readFile = XLogFileRead(readId, readSeg, emode);
-                if (readFile < 0)
-                    goto next_record_is_invalid;
-                readOff = 0;
-            }
-            if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-            {
-                ereport(emode,
-                        (errcode_for_file_access(),
-                         errmsg("could not read from log file %u, segment %u, offset %u: %m",
-                                readId, readSeg, readOff)));
-                goto next_record_is_invalid;
-            }
-            if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
-                goto next_record_is_invalid;
+            /* Check that the continuation record looks valid */
             if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
             {
                 ereport(emode,
@@ -3923,31 +3787,11 @@ got_record:;
         if (!RecordIsValid(record, *RecPtr, emode))
             goto next_record_is_invalid;
         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
-        if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
-            MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
-        {
-            nextRecord = (XLogRecord *) ((char *) contrecord +
-                    MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
-        }
         EndRecPtr.xlogid = readId;
         EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
             pageHeaderSize +
             MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);

-        /*
-         * Check whether the current page needs to be read again. If there is no
-         * unread record in the current page (nextRecord == NULL), obviously we
-         * don't need to reread it. If we're not in streaming recovery mode yet,
-         * partially-filled page doesn't need to be reread because it is the
-         * last valid page.
-         */
-        if (nextRecord != NULL && InStreamingRecovery &&
-            XLByteLE(receivedUpto, EndRecPtr))
-        {
-            nextRecord    = NULL;
-            needReread    = true;
-        }
-
         ReadRecPtr = *RecPtr;
         /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
         return record;
@@ -3956,26 +3800,9 @@ got_record:;
     /* Record does not cross a page boundary */
     if (!RecordIsValid(record, *RecPtr, emode))
         goto next_record_is_invalid;
-    if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
-        MAXALIGN(total_len))
-        nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
     EndRecPtr.xlogid = RecPtr->xlogid;
     EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);

-    /*
-     * Check whether the current page needs to be read again. If there is no
-     * unread record in the current page (nextRecord == NULL), obviously we
-     * don't need to reread it. If we're not in streaming recovery mode yet,
-     * partially-filled page doesn't need to be reread because it is the last
-     * valid page.
-     */
-    if (nextRecord != NULL && InStreamingRecovery &&
-        XLByteLE(receivedUpto, EndRecPtr))
-    {
-        nextRecord    = NULL;
-        needReread    = true;
-    }
-
     ReadRecPtr = *RecPtr;
     memcpy(buffer, record, total_len);

@@ -3987,8 +3814,6 @@ got_record:;
         /* Pretend it extends to end of segment */
         EndRecPtr.xrecoff += XLogSegSize - 1;
         EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
-        nextRecord = NULL;        /* definitely not on same page */
-        needReread = false;

         /*
          * Pretend that readBuf contains the last page of the segment. This is
@@ -4005,7 +3830,6 @@ next_record_is_invalid:;
         close(readFile);
         readFile = -1;
     }
-    nextRecord = NULL;
     return NULL;
 }

@@ -5722,7 +5546,7 @@ StartupXLOG(void)
                     (errmsg("checkpoint record is at %X/%X",
                             checkPointLoc.xlogid, checkPointLoc.xrecoff)));
         }
-        else if (InStreamingRecovery)
+        else if (StandbyMode)
         {
             /*
              * The last valid checkpoint record required for a streaming
@@ -5930,12 +5754,12 @@ StartupXLOG(void)
         if (XLByteLT(checkPoint.redo, RecPtr))
         {
             /* back up to find the record */
-            record = FetchRecord(&(checkPoint.redo), PANIC, false);
+            record = ReadRecord(&(checkPoint.redo), PANIC, false);
         }
         else
         {
             /* just have to read next record after CheckPoint */
-            record = FetchRecord(NULL, LOG, false);
+            record = ReadRecord(NULL, LOG, false);
         }

         if (record != NULL)
@@ -6088,7 +5912,7 @@ StartupXLOG(void)

                 LastRec = ReadRecPtr;

-                record = FetchRecord(NULL, LOG, false);
+                record = ReadRecord(NULL, LOG, false);
             } while (record != NULL && recoveryContinue);

             /*
@@ -6122,22 +5946,17 @@ StartupXLOG(void)

     /*
      * We are now done reading the xlog from stream. Turn off streaming
-     * recovery, and restart fetching the files (which would be required
-     * at end of recovery, e.g., timeline history file) from archive.
+     * recovery to force fetching the files (which would be required
+     * at end of recovery, e.g., timeline history file) from archive or
+     * pg_xlog.
      */
-    if (InStreamingRecovery)
-    {
-        /* We are no longer in streaming recovery state */
-        InStreamingRecovery = false;
-        ereport(LOG,
-                (errmsg("streaming recovery complete")));
-    }
+    StandbyMode = false;

     /*
      * Re-fetch the last valid or last applied record, so we can identify the
      * exact endpoint of what we consider the valid portion of WAL.
      */
-    record = ReadRecord(&LastRec, PANIC);
+    record = ReadRecord(&LastRec, PANIC, false);
     EndOfLog = EndRecPtr;
     XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);

@@ -6507,7 +6326,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
         return NULL;
     }

-    record = FetchRecord(&RecPtr, LOG, true);
+    record = ReadRecord(&RecPtr, LOG, true);

     if (record == NULL)
     {
@@ -7453,10 +7272,6 @@ CreateRestartPoint(int flags)
     }
     LWLockRelease(ControlFileLock);

-    /* Are we doing recovery from XLOG stream? */
-    if (!InStreamingRecovery)
-        InStreamingRecovery = WalRcvInProgress();
-
     /*
      * Delete old log files (those no longer needed even for previous
      * checkpoint/restartpoint) to prevent the disk holding the xlog from
@@ -7464,7 +7279,7 @@ CreateRestartPoint(int flags)
      * streaming recovery we have to or the disk will eventually fill up from
      * old log files streamed from master.
      */
-    if (InStreamingRecovery && (_logId || _logSeg))
+    if (WalRcvInProgress() && (_logId || _logSeg))
     {
         XLogRecPtr    endptr;

@@ -8739,6 +8554,13 @@ HandleStartupProcInterrupts(void)
      */
     if (shutdown_requested)
         proc_exit(1);
+
+    /*
+     * Emergency bailout if postmaster has died.  This is to avoid the
+     * necessity for manual cleanup of all postmaster children.
+     */
+    if (IsUnderPostmaster && !PostmasterIsAlive(true))
+        exit(1);
 }

 /* Main entry point for startup process */
@@ -8788,3 +8610,281 @@ StartupProcessMain(void)
      */
     proc_exit(0);
 }
+
+/*
+ * Read the XLOG page containing RecPtr into readBuf. Returns true
+ * if successful, false otherwise.
+ *
+ * This is responsible for restoring files from archive as needed, as well
+ * as for waiting for new WAL to arrive in standby mode.
+ */
+static bool
+XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
+             bool randAccess)
+{
+    static XLogRecPtr receivedUpto = {0, 0};
+    bool switched_segment = false;
+    uint32 targetPageOff;
+    uint32 targetRecOff;
+    uint32 targetId;
+    uint32 targetSeg;
+
+    XLByteToSeg(*RecPtr, targetId, targetSeg);
+    targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
+    targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
+
+    /* Fast exit if we have read the record in the current buffer already */
+    if (targetId == readId && targetSeg == readSeg &&
+        targetPageOff == readOff && targetRecOff < readLen)
+        return true;
+
+    /*
+     * See if we need to switch to a new segment because the requested record
+     * is not in the currently open one.
+     */
+    if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
+    {
+        close(readFile);
+        readFile = -1;
+    }
+
+    XLByteToSeg(*RecPtr, readId, readSeg);
+
+    /* See if we need to retrieve more data */
+    if (readFile < 0 ||
+        (readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
+    {
+        if (StandbyMode)
+        {
+            bool last_restore_failed = false;
+
+            /*
+             * In standby mode, wait for the requested record to become
+             * available, either via restore_command succeeding in restoring
+             * the segment, or via walreceiver having streamed the record.
+             */
+            for (;;)
+            {
+                if (WalRcvInProgress())
+                {
+                    /*
+                     * While walreceiver is active, wait for new WAL to
+                     * arrive from primary.
+                     */
+                    receivedUpto = GetWalRcvWriteRecPtr();
+                    if (XLByteLT(*RecPtr, receivedUpto))
+                    {
+                        /*
+                         * Great, streamed far enough. Open the file if it's
+                         * not open already.
+                         */
+                        if (readFile < 0)
+                        {
+                            readFile =
+                                XLogFileRead(readId, readSeg, PANIC,
+                                             recoveryTargetTLI, false, false);
+                            switched_segment = true;
+                            readStreamed = true;
+                        }
+                        break;
+                    }
+
+                    if (CheckForStandbyTrigger())
+                        goto next_record_is_invalid;
+
+                    /*
+                     * When streaming is active, we want to react quickly when
+                     * the next WAL record arrives, so sleep only a bit.
+                     */
+                    pg_usleep(100000L); /* 100ms */
+                }
+                else
+                {
+                    /*
+                     * Until walreceiver manages to reconnect, poll the
+                     * archive.
+                     */
+                    if (readFile >= 0)
+                    {
+                        close(readFile);
+                        readFile = -1;
+                    }
+                    /* Reset curFileTLI if random fetch. */
+                    if (randAccess)
+                        curFileTLI = 0;
+                    readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
+                    switched_segment = true;
+                    readStreamed = false;
+                    if (readFile != -1)
+                    {
+                        elog(DEBUG1, "got WAL segment from archive");
+                        break;
+                    }
+
+                    /*
+                     * If we succeeded in restoring some segments from
+                     * archive since the last connection attempt (or we
+                     * haven't tried streaming yet), retry immediately. But
+                     * if we haven't, assume the problem is persistent, so
+                     * be less aggressive.
+                     */
+                    if (last_restore_failed)
+                    {
+                        /*
+                         * Check to see if the trigger file exists. Note that
+                         * we do this only after failure, so when you create
+                         * the trigger file, we still finish replaying as much
+                         * as we can before failover.
+                         */
+                        if (CheckForStandbyTrigger())
+                            goto next_record_is_invalid;
+                        pg_usleep(5000000L); /* 5 seconds */
+                    }
+                    last_restore_failed = true;
+
+                    /*
+                     * Nope, not found in archive. Try to stream it.
+                     *
+                     * If fetching_ckpt is TRUE, RecPtr points to the initial
+                     * checkpoint location. In that case, we use RedoStartLSN
+                     * as the streaming start position instead of RecPtr, so
+                     * that when we later jump backwards to start redo at
+                     * RedoStartLSN, we will have the logs streamed already.
+                     */
+                    RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
+                                         PrimaryConnInfo);
+                }
+
+                /*
+                 * This possibly-long loop needs to handle interrupts of startup
+                 * process.
+                 */
+                HandleStartupProcInterrupts();
+            }
+        }
+        else
+        {
+            /* In archive or crash recovery. */
+            if (readFile < 0)
+            {
+                /* Reset curFileTLI if random fetch. */
+                if (randAccess)
+                    curFileTLI = 0;
+                readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
+                                              InArchiveRecovery);
+                switched_segment = true;
+                readStreamed = false;
+                if (readFile < 0)
+                    return false;
+            }
+        }
+    }
+
+    /*
+     * At this point, we have the right segment open and we know the
+     * requested record is in it.
+     */
+    Assert(readFile != -1);
+
+    /*
+     * If the current segment is being streamed from master, calculate
+     * how much of the current page we have received already. We know the
+     * requested record has been received, but this is for the benefit
+     * of future calls, to allow quick exit at the top of this function.
+     */
+    if (readStreamed)
+    {
+        if (RecPtr->xlogid != receivedUpto.xlogid ||
+            (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
+        {
+            readLen = XLOG_BLCKSZ;
+        }
+        else
+            readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
+    }
+    else
+        readLen = XLOG_BLCKSZ;
+
+    if (switched_segment && targetPageOff != 0)
+    {
+        /*
+         * Whenever switching to a new WAL segment, we read the first page of
+         * the file and validate its header, even if that's not where the
+         * target record is.  This is so that we can check the additional
+         * identification info that is present in the first page's "long"
+         * header.
+         */
+        readOff = 0;
+        if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+        {
+            ereport(emode,
+                    (errcode_for_file_access(),
+                     errmsg("could not read from log file %u, segment %u, offset %u: %m",
+                            readId, readSeg, readOff)));
+            goto next_record_is_invalid;
+        }
+        if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+            goto next_record_is_invalid;
+    }
+
+    /* Read the requested page */
+    readOff = targetPageOff;
+    if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
+    {
+        ereport(emode,
+                (errcode_for_file_access(),
+                 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
+                        readId, readSeg, readOff)));
+        goto next_record_is_invalid;
+    }
+    if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+    {
+        ereport(emode,
+                (errcode_for_file_access(),
+                 errmsg("could not read from log file %u, segment %u, offset %u: %m",
+                        readId, readSeg, readOff)));
+        goto next_record_is_invalid;
+    }
+    if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
+        goto next_record_is_invalid;
+
+    Assert(targetId == readId);
+    Assert(targetSeg == readSeg);
+    Assert(targetPageOff == readOff);
+    Assert(targetRecOff < readLen);
+
+    return true;
+
+next_record_is_invalid:
+    if (readFile >= 0)
+        close(readFile);
+    readFile = -1;
+    readStreamed = false;
+    readLen = 0;
+
+    return false;
+}
+
+/*
+ * Check to see if the trigger file exists. If it does, request postmaster
+ * to shut down walreceiver and wait for it to exit, and remove the trigger
+ * file.
+ */
+static bool
+CheckForStandbyTrigger(void)
+{
+    struct stat stat_buf;
+
+    if (TriggerFile == NULL)
+        return false;
+
+    if (stat(TriggerFile, &stat_buf) == 0)
+    {
+        ereport(LOG,
+                (errmsg("trigger file found: %s", TriggerFile)));
+        ShutdownWalRcv();
+        unlink(TriggerFile);
+        return true;
+    }
+    return false;
+}
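To make the address arithmetic in XLogPageRead() easier to check, here is a standalone sketch (not part of the patch) of the targetPageOff/targetRecOff computation and of the readLen calculation for a segment that is still being streamed. It assumes the default 8 kB XLOG_BLCKSZ and 16 MB XLogSegSize; SketchRecPtr, sketch_locate() and sketch_streamed_page_len() are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed build-time defaults: 8 kB WAL pages, 16 MB WAL segments. */
#define SKETCH_XLOG_BLCKSZ   8192u
#define SKETCH_XLOG_SEG_SIZE (16u * 1024u * 1024u)

/* Two-part WAL position, following the XLogRecPtr layout used in this patch. */
typedef struct
{
    uint32_t xlogid;    /* log file number */
    uint32_t xrecoff;   /* byte offset within that log file */
} SketchRecPtr;

/* Where a record pointer lands (hypothetical helper type). */
typedef struct
{
    uint32_t seg;       /* segment number, as computed by XLByteToSeg */
    uint32_t pageOff;   /* start of the containing page within the segment */
    uint32_t recOff;    /* offset of the record within that page */
} SketchLocation;

static SketchLocation
sketch_locate(SketchRecPtr ptr)
{
    SketchLocation loc;

    loc.seg = ptr.xrecoff / SKETCH_XLOG_SEG_SIZE;
    loc.pageOff = ((ptr.xrecoff % SKETCH_XLOG_SEG_SIZE) / SKETCH_XLOG_BLCKSZ)
        * SKETCH_XLOG_BLCKSZ;
    loc.recOff = ptr.xrecoff % SKETCH_XLOG_BLCKSZ;
    return loc;
}

/*
 * Bytes of the page holding 'ptr' that are valid when walreceiver has
 * written everything before 'receivedUpto' (the readLen calculation).
 */
static uint32_t
sketch_streamed_page_len(SketchRecPtr ptr, SketchRecPtr receivedUpto)
{
    SketchLocation loc = sketch_locate(ptr);

    /* Caller guarantees the target itself has already been received. */
    assert(ptr.xlogid < receivedUpto.xlogid ||
           (ptr.xlogid == receivedUpto.xlogid &&
            ptr.xrecoff < receivedUpto.xrecoff));

    if (ptr.xlogid != receivedUpto.xlogid ||
        ptr.xrecoff / SKETCH_XLOG_BLCKSZ != receivedUpto.xrecoff / SKETCH_XLOG_BLCKSZ)
        return SKETCH_XLOG_BLCKSZ;      /* streaming is already past this page */

    /* Same page: only the prefix before receivedUpto has arrived. */
    return receivedUpto.xrecoff % SKETCH_XLOG_SEG_SIZE - loc.pageOff;
}
```

For example, a record at xrecoff 0x01002010 lies in segment 1, 16 bytes into the page that starts at offset 8192; if walreceiver has written up to 0x01002100, only the first 256 bytes of that page are valid, which is what the fast-exit test (targetRecOff < readLen) at the top of XLogPageRead() relies on.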
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index e8ddfc1..5281fa2 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -224,9 +224,6 @@ static int    Shutdown = NoShutdown;
 static bool FatalError = false; /* T if recovering from backend crash */
 static bool RecoveryError = false;        /* T if WAL recovery failed */

-/* If WalReceiverActive is true, restart walreceiver if it dies */
-static bool WalReceiverActive = false;
-
 /*
  * We use a simple state machine to control startup, shutdown, and
  * crash recovery (which is rather like shutdown followed by startup).
@@ -1469,11 +1466,6 @@ ServerLoop(void)
         if (PgStatPID == 0 && pmState == PM_RUN)
             PgStatPID = pgstat_start();

-        /* If we have lost walreceiver, try to start a new one */
-        if (WalReceiverPID == 0 && WalReceiverActive &&
-            (pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT))
-            WalReceiverPID = StartWalReceiver();
-
         /* If we need to signal the autovacuum launcher, do so now */
         if (avlauncher_needs_signal)
         {
@@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS)
         WalReceiverPID == 0)
     {
         /* Startup Process wants us to start the walreceiver process. */
-        WalReceiverActive = true;
         WalReceiverPID = StartWalReceiver();
     }

-    if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER))
-    {
-        /* The walreceiver process doesn't want to be restarted anymore */
-        WalReceiverActive = false;
-    }
-
     PG_SETMASK(&UnBlockSig);

     errno = save_errno;
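With the postmaster no longer restarting walreceiver by itself, all retry decisions are made by the standby-mode loop in XLogPageRead(). The sketch below models only that policy: poll every 100 ms while streaming, try the archive whenever walreceiver is not running, and back off for 5 seconds only when the archive misses again after a streaming attempt. The StandbyEnv hooks and the scripted fake are hypothetical stand-ins for WalRcvInProgress(), restore_command, the trigger file check and so on; interrupt handling and file-descriptor bookkeeping are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    WAIT_GOT_STREAMED,   /* walreceiver has written past the target */
    WAIT_GOT_ARCHIVED,   /* restore_command produced the segment */
    WAIT_TRIGGERED       /* trigger file found: stop waiting */
} WaitResult;

/* Hypothetical hooks standing in for the real calls noted beside each. */
typedef struct
{
    bool (*walrcv_running)(void *ctx);       /* WalRcvInProgress() */
    bool (*streamed_far_enough)(void *ctx);  /* RecPtr < GetWalRcvWriteRecPtr() */
    bool (*restore_from_archive)(void *ctx); /* XLogFileReadAnyTLI() succeeded */
    bool (*trigger_exists)(void *ctx);       /* CheckForStandbyTrigger() */
    void (*request_streaming)(void *ctx);    /* RequestXLogStreaming() */
    void (*sleep_ms)(void *ctx, long ms);    /* pg_usleep() */
    void *ctx;
} StandbyEnv;

static WaitResult
wait_for_wal(const StandbyEnv *env)
{
    bool last_restore_failed = false;

    assert(env != NULL);
    for (;;)
    {
        if (env->walrcv_running(env->ctx))
        {
            if (env->streamed_far_enough(env->ctx))
                return WAIT_GOT_STREAMED;
            if (env->trigger_exists(env->ctx))
                return WAIT_TRIGGERED;
            env->sleep_ms(env->ctx, 100);   /* react quickly to new WAL */
        }
        else
        {
            if (env->restore_from_archive(env->ctx))
                return WAIT_GOT_ARCHIVED;
            if (last_restore_failed)
            {
                /* Missed again after a streaming attempt: back off. */
                if (env->trigger_exists(env->ctx))
                    return WAIT_TRIGGERED;
                env->sleep_ms(env->ctx, 5000);
            }
            last_restore_failed = true;
            env->request_streaming(env->ctx);
        }
    }
}

/* Scripted environment: walreceiver never connects, archive misses N times. */
typedef struct
{
    int  archive_misses_left;
    int  stream_requests;
    long slept_ms;
} FakeState;

static bool fake_never(void *c) { (void) c; return false; }

static bool
fake_restore(void *c)
{
    FakeState *s = c;

    if (s->archive_misses_left == 0)
        return true;
    s->archive_misses_left--;
    return false;
}

static void fake_request(void *c) { ((FakeState *) c)->stream_requests++; }
static void fake_sleep(void *c, long ms) { ((FakeState *) c)->slept_ms += ms; }
```

In a scripted run where walreceiver never connects and the archive misses three times before the segment appears, streaming is requested three times, but only the second and third misses cost a 5-second sleep.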
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b049baa..f0fcb7c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -134,7 +134,6 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);

 /* Prototypes for private functions */
-static void InitWalRcv(void);
 static void WalRcvKill(int code, Datum arg);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
@@ -153,21 +152,57 @@ static struct
 void
 WalReceiverMain(void)
 {
-    sigjmp_buf    local_sigjmp_buf;
-    MemoryContext walrcv_context;
     char conninfo[MAXCONNINFO];
     XLogRecPtr startpoint;
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;

-    /* Load the libpq-specific functions */
-    load_file("libpqwalreceiver", false);
-    if (walrcv_connect == NULL || walrcv_receive == NULL ||
-        walrcv_disconnect == NULL)
-        elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+    /*
+     * WalRcv should be set up already (if we are a backend, we inherit
+     * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+     */
+    Assert(walrcv != NULL);

-    /* Mark walreceiver in progress */
-    InitWalRcv();
+    /*
+     * Mark walreceiver as running in shared memory.
+     *
+     * Do this as early as possible, so that if we fail later on, we'll
+     * set state to STOPPED. If we die before this, the startup process
+     * will keep waiting for us to start up, until it times out.
+     */
+    SpinLockAcquire(&walrcv->mutex);
+    Assert(walrcv->pid == 0);
+    switch(walrcv->walRcvState)
+    {
+        case WALRCV_STOPPING:
+            /* If we've already been requested to stop, don't start up. */
+            walrcv->walRcvState = WALRCV_STOPPED;
+            /* fall through */
+
+        case WALRCV_STOPPED:
+            SpinLockRelease(&walrcv->mutex);
+            proc_exit(1);
+            break;
+
+        case WALRCV_STARTING:
+            /* The usual case */
+            break;
+
+        case WALRCV_RUNNING:
+            /* Shouldn't happen */
+            elog(PANIC, "walreceiver still running according to shared memory state");
+    }
+    /* Advertise our PID so that the startup process can kill us */
+    walrcv->pid = MyProcPid;
+    walrcv->walRcvState = WALRCV_RUNNING;
+
+    /* Fetch information required to start streaming */
+    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+    startpoint = walrcv->receivedUpto;
+    SpinLockRelease(&walrcv->mutex);
+
+    /* Arrange to clean up at walreceiver exit */
+    on_shmem_exit(WalRcvKill, 0);

     /*
      * If possible, make this process a group leader, so that the postmaster
@@ -200,81 +235,21 @@ WalReceiverMain(void)
     /* We allow SIGQUIT (quickdie) at all times */
     sigdelset(&BlockSig, SIGQUIT);

+    /* Load the libpq-specific functions */
+    load_file("libpqwalreceiver", false);
+    if (walrcv_connect == NULL || walrcv_receive == NULL ||
+        walrcv_disconnect == NULL)
+        elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
     /*
      * Create a resource owner to keep track of our resources (not clear that
      * we need this, but may as well have one).
      */
     CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

-    /*
-     * Create a memory context that we will do all our work in.  We do this so
-     * that we can reset the context during error recovery and thereby avoid
-     * possible memory leaks.
-     */
-    walrcv_context = AllocSetContextCreate(TopMemoryContext,
-                                              "Wal Receiver",
-                                              ALLOCSET_DEFAULT_MINSIZE,
-                                              ALLOCSET_DEFAULT_INITSIZE,
-                                              ALLOCSET_DEFAULT_MAXSIZE);
-    MemoryContextSwitchTo(walrcv_context);
-
-    /*
-     * If an exception is encountered, processing resumes here.
-     *
-     * This code is heavily based on bgwriter.c, q.v.
-     */
-    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
-    {
-        /* Since not using PG_TRY, must reset error stack by hand */
-        error_context_stack = NULL;
-
-        /* Reset WalRcvImmediateInterruptOK */
-        DisableWalRcvImmediateExit();
-
-        /* Prevent interrupts while cleaning up */
-        HOLD_INTERRUPTS();
-
-        /* Report the error to the server log */
-        EmitErrorReport();
-
-        /* Disconnect any previous connection. */
-        EnableWalRcvImmediateExit();
-        walrcv_disconnect();
-        DisableWalRcvImmediateExit();
-
-        /*
-         * Now return to normal top-level context and clear ErrorContext for
-         * next time.
-         */
-        MemoryContextSwitchTo(walrcv_context);
-        FlushErrorState();
-
-        /* Flush any leaked data in the top-level context */
-        MemoryContextResetAndDeleteChildren(walrcv_context);
-
-        /* Now we can allow interrupts again */
-        RESUME_INTERRUPTS();
-
-        /*
-         * Sleep at least 1 second after any error.  A write error is likely
-         * to be repeated, and we don't want to be filling the error logs as
-         * fast as we can.
-         */
-        pg_usleep(1000000L);
-    }
-
-    /* We can now handle ereport(ERROR) */
-    PG_exception_stack = &local_sigjmp_buf;
-
     /* Unblock signals (they were blocked when the postmaster forked us) */
     PG_SETMASK(&UnBlockSig);

-    /* Fetch connection information from shared memory */
-    SpinLockAcquire(&walrcv->mutex);
-    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
-    startpoint = walrcv->receivedUpto;
-    SpinLockRelease(&walrcv->mutex);
-
     /* Establish the connection to the primary for XLOG streaming */
     EnableWalRcvImmediateExit();
     walrcv_connect(conninfo, startpoint);
@@ -330,63 +305,24 @@ WalReceiverMain(void)
     }
 }

-/* Advertise our pid in shared memory, so that startup process can kill us. */
-static void
-InitWalRcv(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    /*
-     * WalRcv should be set up already (if we are a backend, we inherit
-     * this by fork() or EXEC_BACKEND mechanism from the postmaster).
-     */
-    if (walrcv == NULL)
-        elog(PANIC, "walreceiver control data uninitialized");
-
-    /* If we've already been requested to stop, don't start up */
-    SpinLockAcquire(&walrcv->mutex);
-    Assert(walrcv->pid == 0);
-    if (walrcv->walRcvState == WALRCV_STOPPED ||
-        walrcv->walRcvState == WALRCV_STOPPING)
-    {
-        walrcv->walRcvState = WALRCV_STOPPED;
-        SpinLockRelease(&walrcv->mutex);
-        proc_exit(1);
-    }
-    walrcv->pid = MyProcPid;
-    SpinLockRelease(&walrcv->mutex);
-
-    /* Arrange to clean up at walreceiver exit */
-    on_shmem_exit(WalRcvKill, 0);
-}
-
 /*
- * Clear our pid from shared memory at exit.
+ * Mark us as STOPPED in shared memory at exit.
  */
 static void
 WalRcvKill(int code, Datum arg)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
-    bool stopped = false;

     SpinLockAcquire(&walrcv->mutex);
-    if (walrcv->walRcvState == WALRCV_STOPPING ||
-        walrcv->walRcvState == WALRCV_STOPPED)
-    {
-        walrcv->walRcvState = WALRCV_STOPPED;
-        stopped = true;
-        elog(LOG, "walreceiver stopped");
-    }
+    Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+           walrcv->walRcvState == WALRCV_STOPPING);
+    walrcv->walRcvState = WALRCV_STOPPED;
     walrcv->pid = 0;
     SpinLockRelease(&walrcv->mutex);

+    /* Terminate the connection gracefully. */
     walrcv_disconnect();
-
-    /* If requested to stop, tell postmaster to not restart us. */
-    if (stopped)
-        SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
 }

 /* SIGHUP: set flag to re-read config file at next convenient time */
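The walreceiver side of the new startup handshake boils down to two transitions: check the shared state on startup, and always leave it STOPPED on exit. A standalone sketch, with the spinlock left out and hypothetical SketchWalRcv, walrcv_try_start() and walrcv_on_exit() names standing in for WalRcvData, the code at the top of WalReceiverMain() and WalRcvKill():

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

/* Same four states as the patched WalRcvState enum. */
typedef enum
{
    WALRCV_STOPPED,
    WALRCV_STARTING,
    WALRCV_RUNNING,
    WALRCV_STOPPING
} SketchWalRcvState;

/* Hypothetical stand-in for the shared WalRcvData fields used here. */
typedef struct
{
    pid_t pid;
    SketchWalRcvState state;
} SketchWalRcv;

/*
 * Returns true if the new process should go on to connect, false if it
 * must exit at once.  The patch does this while holding walrcv->mutex.
 */
static bool
walrcv_try_start(SketchWalRcv *w, pid_t mypid)
{
    assert(w->pid == 0);
    switch (w->state)
    {
        case WALRCV_STOPPING:
            /* Asked to stop before we got going: don't start up. */
            w->state = WALRCV_STOPPED;
            /* fall through */
        case WALRCV_STOPPED:
            return false;
        case WALRCV_STARTING:
            break;              /* the usual case */
        case WALRCV_RUNNING:
            assert(!"walreceiver already running");    /* PANIC in the patch */
            return false;
    }
    w->pid = mypid;             /* lets the startup process signal us */
    w->state = WALRCV_RUNNING;
    return true;
}

/* Exit callback: whatever happened, leave the slot STOPPED with no pid. */
static void
walrcv_on_exit(SketchWalRcv *w)
{
    assert(w->state == WALRCV_RUNNING || w->state == WALRCV_STOPPING);
    w->state = WALRCV_STOPPED;
    w->pid = 0;
}
```

A walreceiver that finds STOPPING or STOPPED, for instance because the startup process timed out waiting for it, exits without connecting, so a late start cannot race with a fresh RequestXLogStreaming().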
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 24cf789..763c02d 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -18,6 +18,8 @@

 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
 #include <unistd.h>
 #include <signal.h>

@@ -30,8 +32,11 @@

 WalRcvData *WalRcv = NULL;

-static bool CheckForStandbyTrigger(void);
-static void ShutdownWalRcv(void);
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10

 /* Report shared memory space needed by WalRcvShmemInit */
 Size
@@ -62,7 +67,7 @@ WalRcvShmemInit(void)

     /* Initialize the data structures */
     MemSet(WalRcv, 0, WalRcvShmemSize());
-    WalRcv->walRcvState = WALRCV_NOT_STARTED;
+    WalRcv->walRcvState = WALRCV_STOPPED;
     SpinLockInit(&WalRcv->mutex);
 }

@@ -73,90 +78,39 @@ WalRcvInProgress(void)
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
     WalRcvState state;
+    pg_time_t now = (pg_time_t) time(NULL);

     SpinLockAcquire(&walrcv->mutex);
+
+    /*
+     * If it has taken too long for walreceiver to start up, give up.
+     * Setting the state to STOPPED ensures that if walreceiver later
+     * does start up after all, it will see that it's not supposed to be
+     * running and die before doing anything.
+     */
+    if (walrcv->walRcvState == WALRCV_STARTING &&
+        (now - walrcv->startTime) > WALRCV_STARTUP_TIMEOUT)
+        walrcv->walRcvState = WALRCV_STOPPED;
+
     state = walrcv->walRcvState;
+
     SpinLockRelease(&walrcv->mutex);

-    if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
+    if (state != WALRCV_STOPPED)
         return true;
     else
         return false;
 }

 /*
- * Wait for the XLOG record at given position to become available.
- *
- * 'recptr' indicates the byte position which caller wants to read the
- * XLOG record up to. The byte position actually written and flushed
- * by walreceiver is returned. It can be higher than the requested
- * location, and the caller can safely read up to that point without
- * calling WaitNextXLogAvailable() again.
- *
- * If WAL streaming is ended (because a trigger file is found), *finished
- * is set to true and function returns immediately. The returned position
- * can be lower than requested in that case.
- *
- * Called by the startup process during streaming recovery.
+ * Stop walreceiver (if running) and wait for it to die.
  */
-XLogRecPtr
-WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
-{
-    static XLogRecPtr receivedUpto = {0, 0};
-
-    *finished = false;
-
-    /* Quick exit if already known available */
-    if (XLByteLT(recptr, receivedUpto))
-        return receivedUpto;
-
-    for (;;)
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        /* Update local status */
-        SpinLockAcquire(&walrcv->mutex);
-        receivedUpto = walrcv->receivedUpto;
-        SpinLockRelease(&walrcv->mutex);
-
-        /* If available already, leave here */
-        if (XLByteLT(recptr, receivedUpto))
-            return receivedUpto;
-
-        /* Check to see if the trigger file exists */
-        if (CheckForStandbyTrigger())
-        {
-            *finished = true;
-            return receivedUpto;
-        }
-
-        pg_usleep(100000L); /* 100ms */
-
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        /*
-         * Emergency bailout if postmaster has died.  This is to avoid the
-         * necessity for manual cleanup of all postmaster children.
-         */
-        if (!PostmasterIsAlive(true))
-            exit(1);
-    }
-}
-
-/*
- * Stop walreceiver and wait for it to die.
- */
-static void
+void
 ShutdownWalRcv(void)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
-    pid_t walrcvpid;
+    pid_t walrcvpid = 0;

     /*
      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -164,9 +118,20 @@ ShutdownWalRcv(void)
      * restart itself.
      */
     SpinLockAcquire(&walrcv->mutex);
-    Assert(walrcv->walRcvState == WALRCV_RUNNING);
-    walrcv->walRcvState = WALRCV_STOPPING;
-    walrcvpid = walrcv->pid;
+    switch(walrcv->walRcvState)
+    {
+        case WALRCV_STOPPED:
+            break;
+        case WALRCV_STARTING:
+            walrcv->walRcvState = WALRCV_STOPPED;
+            break;
+
+        case WALRCV_RUNNING:
+        case WALRCV_STOPPING:
+            walrcv->walRcvState = WALRCV_STOPPING;
+            walrcvpid = walrcv->pid;
+            break;
+    }
     SpinLockRelease(&walrcv->mutex);

     /*
@@ -194,31 +159,8 @@ ShutdownWalRcv(void)
 }

 /*
- * Check to see if the trigger file exists. If it does, request postmaster
- * to shut down walreceiver and wait for it to exit, and remove the trigger
- * file.
- */
-static bool
-CheckForStandbyTrigger(void)
-{
-    struct stat stat_buf;
-
-    if (TriggerFile == NULL)
-        return false;
-
-    if (stat(TriggerFile, &stat_buf) == 0)
-    {
-        ereport(LOG,
-                (errmsg("trigger file found: %s", TriggerFile)));
-        ShutdownWalRcv();
-        unlink(TriggerFile);
-        return true;
-    }
-    return false;
-}
-
-/*
- * Request postmaster to start walreceiver.
+ * Request postmaster to start walreceiver, or update the starting point
+ * if already running.
  *
  * recptr indicates the position where streaming should begin, and conninfo
  * is a libpq connection string to use.
@@ -228,17 +170,29 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 {
     /* use volatile pointer to prevent code rearrangement */
     volatile WalRcvData *walrcv = WalRcv;
+    pg_time_t now = (pg_time_t) time(NULL);

-    Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
+    /*
+     * We always start at the beginning of the segment.
+     * That prevents a broken segment (i.e., with no records in the
+     * first half of a segment) from being created by XLOG streaming,
+     * which might cause trouble later on if the segment is e.g.
+     * archived.
+     */
+    if (recptr.xrecoff % XLogSegSize != 0)
+        recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+
+    Assert(walrcv->walRcvState == WALRCV_STOPPED);

-    /* locking is just pro forma here; walreceiver isn't started yet */
     SpinLockAcquire(&walrcv->mutex);
-    walrcv->receivedUpto = recptr;
     if (conninfo != NULL)
         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     else
         walrcv->conninfo[0] = '\0';
-    walrcv->walRcvState = WALRCV_RUNNING;
+    walrcv->walRcvState = WALRCV_STARTING;
+    walrcv->startTime = now;
+
+    walrcv->receivedUpto = recptr;
     SpinLockRelease(&walrcv->mutex);

     SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
@@ -260,3 +214,4 @@ GetWalRcvWriteRecPtr(void)

     return recptr;
 }
+
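The rounding in RequestXLogStreaming() can be checked in isolation. A sketch assuming the default 16 MB XLogSegSize; SketchRecPtr and align_to_segment_start() are hypothetical names. The modulo test in the patch is only a shortcut: subtracting a zero remainder leaves the pointer unchanged.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_XLOG_SEG_SIZE (16u * 1024u * 1024u)  /* assumed default */

typedef struct
{
    uint32_t xlogid;    /* log file number */
    uint32_t xrecoff;   /* byte offset within that log file */
} SketchRecPtr;

/* Round a streaming start position down to the start of its segment. */
static SketchRecPtr
align_to_segment_start(SketchRecPtr recptr)
{
    recptr.xrecoff -= recptr.xrecoff % SKETCH_XLOG_SEG_SIZE;
    assert(recptr.xrecoff % SKETCH_XLOG_SEG_SIZE == 0);
    return recptr;
}
```

For example, a request to start at 3/05ABCDEF begins streaming at 3/05000000, so the segment file written by walreceiver never has a hole at its beginning.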
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f492975..477431f 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -27,10 +27,10 @@
  */
 typedef enum
 {
-    WALRCV_NOT_STARTED,
-    WALRCV_RUNNING,        /* walreceiver has been started */
-    WALRCV_STOPPING,    /* requested to stop, but still running */
-    WALRCV_STOPPED        /* stopped and mustn't start up again */
+    WALRCV_STOPPED,        /* stopped and mustn't start up again */
+    WALRCV_STARTING,    /* launched, but the process hasn't initialized yet */
+    WALRCV_RUNNING,        /* walreceiver is running */
+    WALRCV_STOPPING    /* requested to stop, but still running */
 } WalRcvState;

 /* Shared memory area for management of walreceiver process */
@@ -47,6 +47,7 @@ typedef struct
      */
     pid_t    pid;
     WalRcvState walRcvState;
+    pg_time_t startTime;

     /*
      * receivedUpto-1 is the last byte position that has been already
@@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
 extern void WalReceiverMain(void);
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
+extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
 extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
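Taken together, the four WalRcvState values give the startup process a small protocol: RequestXLogStreaming() moves STOPPED to STARTING and records startTime, WalRcvInProgress() falls back to STOPPED if walreceiver has not started within WALRCV_STARTUP_TIMEOUT seconds, and ShutdownWalRcv() either cancels a pending start or asks a running walreceiver to stop. A standalone sketch of that bookkeeping, with the spinlock, signalling and waiting omitted and the current time passed in explicitly (the patch calls time(NULL)); the Sketch* names are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>

#define SKETCH_STARTUP_TIMEOUT 10   /* seconds, like WALRCV_STARTUP_TIMEOUT */

typedef enum
{
    WALRCV_STOPPED,     /* not running (also the initial state) */
    WALRCV_STARTING,    /* requested, process not yet initialized */
    WALRCV_RUNNING,     /* walreceiver is running */
    WALRCV_STOPPING     /* requested to stop, but still running */
} SketchWalRcvState;

typedef struct
{
    pid_t pid;
    SketchWalRcvState state;
    int64_t startTime;  /* seconds; when STARTING was entered */
} SketchWalRcv;

/* RequestXLogStreaming(): only legal from STOPPED; starts the clock. */
static void
sketch_request(SketchWalRcv *w, int64_t now)
{
    assert(w->state == WALRCV_STOPPED);
    w->state = WALRCV_STARTING;
    w->startTime = now;
}

/* WalRcvInProgress(): give up on a walreceiver that never showed up. */
static bool
sketch_in_progress(SketchWalRcv *w, int64_t now)
{
    if (w->state == WALRCV_STARTING &&
        now - w->startTime > SKETCH_STARTUP_TIMEOUT)
        w->state = WALRCV_STOPPED;  /* a late starter will see this and exit */
    return w->state != WALRCV_STOPPED;
}

/* ShutdownWalRcv() state change: returns the pid to signal, or 0 if none. */
static pid_t
sketch_shutdown(SketchWalRcv *w)
{
    switch (w->state)
    {
        case WALRCV_STOPPED:
            break;
        case WALRCV_STARTING:
            w->state = WALRCV_STOPPED;  /* cancel the pending start */
            break;
        case WALRCV_RUNNING:
        case WALRCV_STOPPING:
            w->state = WALRCV_STOPPING;
            return w->pid;
    }
    return 0;
}
```

Note the strict comparison: at exactly 10 seconds after the request, the start is still considered in progress; one second later the startup process goes back to polling the archive.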
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index f0e4329..4e305f3 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -30,7 +30,6 @@ typedef enum
     PMSIGNAL_START_AUTOVAC_LAUNCHER,    /* start an autovacuum launcher */
     PMSIGNAL_START_AUTOVAC_WORKER,        /* start an autovacuum worker */
     PMSIGNAL_START_WALRECEIVER,            /* start a walreceiver */
-    PMSIGNAL_SHUTDOWN_WALRECEIVER,        /* shut down a walreceiver */

     NUM_PMSIGNALS                /* Must be last value of enum! */
 } PMSignalReason;
