Discussion: Streaming Replication patch for CommitFest 2009-09

Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

Here is the latest version of Streaming Replication (SR) patch.

There were four major problems in the SR patch which was submitted for
the last CommitFest. The latest patch has overcome those problems:

> 1. Change the way synchronization is done when standby connects to
> primary. After authentication, standby should send a message to primary,
> stating the <begin> point (where <begin> is an XLogRecPtr, not a WAL
> segment name). Primary starts streaming WAL starting from that point,
> and keeps streaming forever. pg_read_xlogfile() needs to be removed.

In the latest version, the standby first attempts archive recovery for as
long as WAL records are available in pg_xlog or the archive (the latter only
if restore_command is supplied). When it hits a recovery error (e.g., no WAL
file is available), it starts the walreceiver process and requests that the
primary ship the WAL records following the last applied record. The primary
then sends WAL records continuously, while the standby continuously receives,
writes and replays them.

> 2. The primary should have no business reading back from the archive.
> The standby can read from the archive, as it can today.

I got rid of the primary's capability to restore archived files. Also, so
that a WAL file required by the standby is not removed from pg_xlog before
it has been sent, I tweaked the checkpoint's recycling policy.

> 3. Need to support multiple WALSenders. While multiple slave support
> isn't 1st priority right now, it's not acceptable that a new WALSender
> can't connect while one is active already. That can cause trouble in
> case of network problems etc.

In the latest version, more than one standby can establish a connection to
the primary. The WAL is shipped concurrently to each of those standbys.
The maximum number of standbys can be specified as a GUC variable
(max_wal_senders: better name?).

> 4. It is not acceptable that normal backends have to wait for walsender
> to send data. That means that connecting a standby behind a slow
> connection to the primary can grind the primary to a halt. walsender
> needs to be able to read data from disk, not just from shared memory. (I
> raised this back in December
> http://archives.postgresql.org/message-id/495106FA.1050605@enterprisedb.com)

In the latest version, the walsender reads the WAL records from disk
instead of from wal_buffers. So when a backend needs to evict old data
from wal_buffers to insert new data, it no longer has to wait until the
walsender has read that data from wal_buffers.

> As a hint, I think you'll find it a lot easier if you implement only
> asynchronous replication at first. That reduces the amount of
> inter-process communication a lot. You can then add synchronous
> capability in a later commitfest. I would also suggest that for point 4,
> you implement WAL sender so that it *only* reads from disk at first, and
> only add the capability send from wal_buffers later on, and only if
> performance testing shows that it's needed.

I'm developing SR in stages, as Heikki suggested.
So note that the current patch provides only the core part of *asynchronous*
log-shipping. There are many TODO items for later CommitFests:
synchronous capability, more useful statistics for SR, some admin
features, and so on.

The attached tarball contains several files. A description of each file,
a brief procedure for setting up SR, and a functional overview are on the
wiki. I'm also going to add as much of the SR design description to the wiki
as possible.
http://wiki.postgresql.org/wiki/Streaming_Replication

If you notice anything, please feel free to comment!

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments

Re: Streaming Replication patch for CommitFest 2009-09

From
Greg Smith
Date:
This is looking really neat now, making async replication really solid 
first before even trying to move on to sync is the right way to go here 
IMHO.  I just cleaned up the docs on the Wiki page, when this patch is 
closer to being committed I officially volunteer to do the same on the 
internal SGML docs; someone should nudge me when the patch is at that 
point if I don't take care of it before then.

Putting on my DBA hat for a minute, the first question I see people asking 
is "how do I measure how far behind the slaves are?".  Presumably you can 
get that out of pg_controldata; my first question is whether that's 
complete enough information?  If not, what else should be monitored?

I don't think running that program is going to fly for a production quality 
integrated replication setup though.  The UI that admins are going to want 
would allow querying this easily via a standard database query.  Most 
monitoring systems can issue psql queries but not necessarily run a remote 
binary.  I think that parts of pg_controldata need to get exposed via 
some number of built-in UDFs instead, and whatever new internal state 
makes sense too.  I could help out writing those, if someone more familiar 
with the replication internals can help me nail down a spec on what to 
watch.

--
* Greg Smith gsmith@gregsmith.com http://www.gregsmith.com Baltimore, MD


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Greg Smith wrote:
> Putting on my DBA hat for a minute, the first question I see people
> asking is "how do I measure how far behind the slaves are?".  Presumably
> you can get that out of pg_controldata; my first question is whether
> that's complete enough information?  If not, what else should be monitored?
> 
> I don't think running that program going to fly for a production quality
> integrated replication setup though.  The UI admins are going to want
> would allow querying this easily via a standard database query.  Most
> monitoring systems can issue psql queries but not necessarily run a
> remote binary.  I think that parts of pg_controldata needs to get
> exposed via some number of built-in UDFs instead, and whatever new
> internal state makes sense too.  I could help out writing those, if
> someone more familiar with the replication internals can help me nail
> down a spec on what to watch.

Yep, assuming for a moment that hot standby goes into 8.5, status
functions that return such information are the natural interface. It
should be trivial to write them as soon as hot standby and streaming
replication are in place.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Andrew Dunstan
Date:

Greg Smith wrote:
> This is looking really neat now, making async replication really solid 
> first before even trying to move on to sync is the right way to go 
> here IMHO.

I agree with both of those sentiments.

One question I have is what is the level of traffic involved between the 
master and the slave. I know numbers of people have found the traffic 
involved in shipping of log files to be a pain, and thus we get things 
like pglesslog.

cheers

andrew


Re: Streaming Replication patch for CommitFest 2009-09

From
"Kevin Grittner"
Date:
Greg Smith <gsmith@gregsmith.com> wrote:
> Putting on my DBA hat for a minute, the first question I see people
> asking is "how do I measure how far behind the slaves are?". 
> Presumably you can get that out of pg_controldata; my first question
> is whether that's complete enough information?  If not, what else
> should be monitored?
> 
> I don't think running that program going to fly for a production
> quality integrated replication setup though.  The UI admins are
> going to want would allow querying this easily via a standard
> database query.  Most monitoring systems can issue psql queries but
> not necessarily run a remote binary.  I think that parts of
> pg_controldata needs to get exposed via some number of built-in UDFs
> instead, and whatever new internal state makes sense too.  I could
> help out writing those, if someone more familiar with the
> replication internals can help me nail down a spec on what to watch.

IMO, it would be best if the status could be sent via NOTIFY.  In my
experience, this results in monitoring which both has less overhead
and is more current.  We tend to be almost as interested in metrics on
throughput as lag.  Backlogged volume can be interesting, too, if it's
available.

-Kevin


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Kevin Grittner wrote:
> Greg Smith <gsmith@gregsmith.com> wrote:
>> I don't think running that program going to fly for a production
>> quality integrated replication setup though.  The UI admins are
>> going to want would allow querying this easily via a standard
>> database query.  Most monitoring systems can issue psql queries but
>> not necessarily run a remote binary.  I think that parts of
>> pg_controldata needs to get exposed via some number of built-in UDFs
>> instead, and whatever new internal state makes sense too.  I could
>> help out writing those, if someone more familiar with the
>> replication internals can help me nail down a spec on what to watch.
>  
> IMO, it would be best if the status could be sent via NOTIFY.

To where?

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
"Kevin Grittner"
Date:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> wrote:
> Kevin Grittner wrote:
>> IMO, it would be best if the status could be sent via NOTIFY.
> 
> To where?

To registered listeners?

I guess I should have worded that as "it would be best if a change in
replication status could be signaled via NOTIFY" -- does that satisfy,
or am I missing your point entirely?

-Kevin


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> Here is the latest version of Streaming Replication (SR) patch.

The first thing that caught my eye is that I don't think "replication"
should be a real database. Rather, it should be a keyword in
pg_hba.conf, like the existing "all", "sameuser", "samerole" keywords
that you can put into the database-column.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Simon Riggs
Date:
On Mon, 2009-09-14 at 20:24 +0900, Fujii Masao wrote:

> The latest patch has overcome those problems:

Well done. I hope to look at it myself in a few days time.

-- Simon Riggs           www.2ndQuadrant.com



Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Tue, Sep 15, 2009 at 12:47 AM, Greg Smith <gsmith@gregsmith.com> wrote:
> Putting on my DBA hat for a minute, the first question I see people asking
> is "how do I measure how far behind the slaves are?".  Presumably you can
> get that out of pg_controldata; my first question is whether that's complete
> enough information?  If not, what else should be monitored?

Currently the progress of replication is shown only in PS display. So, the
following three steps are necessary to measure the gap of the servers.

1. execute pg_current_xlog_location() to check how far the primary has written WAL.
2. execute 'ps' to check how far the standby has written WAL.
3. compare the above results.

This is very messy. A more user-friendly monitoring feature is necessary,
and developing it is one of the TODO items for a later CommitFest.

I'm thinking of something like pg_standbys_xlog_location(), which returns
one row per standby server, showing the pid of the walsender, the host name/
port number/user OID of the standby, and the location up to which the standby
has written/flushed WAL. A DBA can measure the gap from the
combination of pg_current_xlog_location() and pg_standbys_xlog_location()
via one query on the primary. Thoughts?
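
The comparison in step 3 above could be sketched as follows. This is only an illustration, not part of the patch: parse_wal_location() and wal_gap_bytes() are hypothetical helper names, and the sketch assumes that a location printed as "xlogid/xrecoff" can be treated as one flat 64-bit position. The result may therefore be only approximate when the two locations differ in their first number.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: parse a WAL location printed as two hex numbers,
 * e.g. "0/3700006C", into one flat 64-bit position.
 * Returns 0 on success, -1 if the text is malformed.
 * Assumption: (xlogid << 32) | xrecoff is a good enough position for
 * measuring a gap; this is a simplification made for the sketch.
 */
int parse_wal_location(const char *text, uint64_t *pos)
{
    unsigned int xlogid;
    unsigned int xrecoff;
    char trailing;

    assert(text != NULL && pos != NULL);

    /* Exactly two fields must match; a third match means trailing junk. */
    if (sscanf(text, "%X/%X%c", &xlogid, &xrecoff, &trailing) != 2)
        return -1;

    *pos = ((uint64_t) xlogid << 32) | xrecoff;
    return 0;
}

/* Bytes by which the standby trails the primary; 0 if it is not behind. */
uint64_t wal_gap_bytes(uint64_t primary_pos, uint64_t standby_pos)
{
    return primary_pos > standby_pos ? primary_pos - standby_pos : 0;
}
```

A monitoring script would feed the output of pg_current_xlog_location() and the location shown in the standby's PS display into these two functions.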

But the problem might be what happens after the primary has fallen
down. The current write location of the primary cannot be checked via
pg_current_xlog_location(), and might need to be calculated from WAL
files on the primary. Is the tool which performs such calculation
necessary?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Tue, Sep 15, 2009 at 1:06 AM, Andrew Dunstan <andrew@dunslane.net> wrote:
> One question I have is what is the level of traffic involved between the
> master and the slave. I know numbers of people have found the traffic
> involved in shipping of log files to be a pain, and thus we get things like
> pglesslog.

That is almost the same as the WAL write traffic on the primary. In fact,
the content of the WAL files written on the standby is exactly the same as
that on the primary. Currently SR provides no compression of the
traffic. Should we introduce something like
walsender_hook/walreceiver_hook to cooperate with an add-on program
for compression like pglesslog?

If you always use PITR instead of normal recovery, full_page_writes = off
might be another solution.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Tue, Sep 15, 2009 at 2:54 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> The first thing that caught my eye is that I don't think "replication"
> should be a real database. Rather, it should by a keyword in
> pg_hba.conf, like the existing "all", "sameuser", "samerole" keywords
> that you can put into the database-column.

I'll try that! It might only be necessary to prevent walsender from accessing
pg_database and checking whether the target database is present, in InitPostgres().

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Kevin Grittner wrote:
> Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> wrote:
>> Kevin Grittner wrote:
>  
>>> IMO, it would be best if the status could be sent via NOTIFY.
>> To where?
>  
> To registered listeners?
>  
> I guess I should have worded that as "it would be best if a change is
> replication status could be signaled via NOTIFY" -- does that satisfy,
> or am I missing your point entirely?

Ok, makes more sense now.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
After playing with this a little bit, I think we need logic in the slave
to reconnect to the master if the connection is broken for some reason,
or can't be established in the first place. At the moment, that is
considered as the end of recovery, and the slave starts up. You have the
trigger file mechanism to stop that, but it only gives you a chance to
manually kill and restart the slave before it chooses a new timeline and
starts up, it doesn't reconnect automatically.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Tue, Sep 15, 2009 at 7:53 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> After playing with this a little bit, I think we need logic in the slave
> to reconnect to the master if the connection is broken for some reason,
> or can't be established in the first place. At the moment, that is
> considered as the end of recovery, and the slave starts up. You have the
> trigger file mechanism to stop that, but it only gives you a chance to
> manually kill and restart the slave before it chooses a new timeline and
> starts up, it doesn't reconnect automatically.

I was thinking that the automatic reconnection capability is a TODO item
for a later CF. The infrastructure for it has already been introduced in the
current patch. Please see the macro MAX_WALRCV_RETRIES (backend/
postmaster/walreceiver.c). This is the maximum number of times to retry
walreceiver. In the current version, this is a fixed value, but we can make
it user-configurable (a recovery.conf parameter is suitable, I think).

Also a parameter like retries_interval might be necessary. This parameter
indicates the interval between each reconnection attempt.
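
A rough sketch of how a retry limit and a retry interval could work together is shown below. It is not taken from the patch: connect_with_retries(), the connect_fn callback type and fake_connect() are hypothetical names used only for illustration, and the interval is assumed to be in whole seconds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical callback: returns true once a connection is established. */
typedef bool (*connect_fn)(void *arg);

/*
 * Make one initial attempt plus at most max_retries retries, sleeping
 * interval_secs between attempts.
 * Returns the number of attempts used on success, or -1 if all failed.
 */
int connect_with_retries(connect_fn try_connect, void *arg,
                         int max_retries, unsigned int interval_secs)
{
    int attempt;

    assert(try_connect != NULL && max_retries >= 0);

    for (attempt = 1; attempt <= max_retries + 1; attempt++)
    {
        if (try_connect(arg))
            return attempt;

        /* Sleep only if another attempt will follow. */
        if (attempt <= max_retries)
            sleep(interval_secs);
    }
    return -1;
}

/*
 * Stand-in for a real connection attempt, for demonstration only:
 * fails until the counter pointed to by arg reaches zero.
 */
bool fake_connect(void *arg)
{
    int *remaining_failures = arg;

    if (*remaining_failures > 0)
    {
        (*remaining_failures)--;
        return false;
    }
    return true;
}
```

With a limit of 3 retries, a primary that comes back after two failed attempts is reached on the third attempt; one that stays down makes the function give up after four attempts.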

Do you think these parameters should be introduced right now, or in
a later CF?

BTW, these parameters are provided in MySQL replication.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Wed, Sep 16, 2009 at 11:37 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
> I was thinking that the automatic reconnection capability is the TODO item
> for the later CF. The infrastructure for it has already been introduced in the
> current patch. Please see the macro MAX_WALRCV_RETRIES (backend/
> postmaster/walreceiver.c). This is the maximum number of times to retry
> walreceiver. In the current version, this is the fixed value, but we can make
> this user-configurable (parameter of recovery.conf is suitable, I think).
>
> Also a parameter like retries_interval might be necessary. This parameter
> indicates the interval between each reconnection attempt.
>
> Do you think that these parameters should be introduced right now? or
> the later CF?

I updated the TODO list on the wiki, and marked the items that I'm going to
develop for the later CommitFest.
http://wiki.postgresql.org/wiki/Streaming_Replication#Todo_and_Claim

Do you have any other TODO items? What priority should they have?
And is there an already-listed TODO item that should be developed right
now (CommitFest 2009-09)?

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> On Tue, Sep 15, 2009 at 7:53 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> After playing with this a little bit, I think we need logic in the slave
>> to reconnect to the master if the connection is broken for some reason,
>> or can't be established in the first place. At the moment, that is
>> considered as the end of recovery, and the slave starts up. You have the
>> trigger file mechanism to stop that, but it only gives you a chance to
>> manually kill and restart the slave before it chooses a new timeline and
>> starts up, it doesn't reconnect automatically.
> 
> I was thinking that the automatic reconnection capability is the TODO item
> for the later CF. The infrastructure for it has already been introduced in the
> current patch. Please see the macro MAX_WALRCV_RETRIES (backend/
> postmaster/walreceiver.c). This is the maximum number of times to retry
> walreceiver. In the current version, this is the fixed value, but we can make
> this user-configurable (parameter of recovery.conf is suitable, I think).

Ah, I see.

Robert Haas suggested a while ago that walreceiver could be a
stand-alone utility, not requiring postmaster at all. That would allow
you to set up streaming replication as another way to implement WAL
archiving. Looking at how the processes interact, there really isn't
much communication between walreceiver and the rest of the system, so
that sounds pretty attractive.

Walreceiver only needs access to shared memory so that it can tell the
startup process how far it has replicated already. Even when we add the
synchronous capability, I don't think we need any more inter-process
communication. Only if we wanted to acknowledge to the master when a
piece of WAL log has been successfully replayed, the startup process
would need to tell walreceiver about it, but I think we're going to
settle for acknowledging when a piece of log has been fsync'd to disk.

Walreceiver is really a slave to the startup process. The startup
process decides when it's launched, and it's the startup process that
then waits for it to advance. But the way it's set up at the moment, the
startup process needs to ask the postmaster to start it up, and it
doesn't look very robust to me. For example, if launching walreceiver
fails for some reason, startup process will just hang waiting for it.

I'm thinking that walreceiver should be a stand-alone program that the
startup process launches, similar to how it invokes restore_command in
PITR recovery. Instead of using system(), though, it would use
fork+exec, and a pipe to communicate.
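
The fork+exec-plus-pipe arrangement described above could look roughly like the following. This is a generic POSIX sketch under stated assumptions, not the code from any patch: spawn_with_pipe() is a hypothetical name, and the child is assumed to report its progress on standard output.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>   /* waitpid(), for the caller's cleanup */
#include <unistd.h>

/*
 * Hypothetical helper: start argv[0] as a child process whose standard
 * output is connected to a pipe. On success, returns the read end of the
 * pipe and stores the child's pid in *child_pid; returns -1 on failure.
 */
int spawn_with_pipe(char *const argv[], pid_t *child_pid)
{
    int fds[2];
    pid_t pid;

    assert(argv != NULL && argv[0] != NULL && child_pid != NULL);

    if (pipe(fds) < 0)
        return -1;

    pid = fork();
    if (pid < 0)
    {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0)
    {
        /* Child: route stdout into the pipe, then become the program. */
        close(fds[0]);
        if (fds[1] != STDOUT_FILENO)
        {
            if (dup2(fds[1], STDOUT_FILENO) < 0)
                _exit(127);
            close(fds[1]);
        }
        execvp(argv[0], argv);
        _exit(127);         /* reached only if exec failed */
    }

    /* Parent: keep only the read end. */
    close(fds[1]);
    *child_pid = pid;
    return fds[0];
}
```

Unlike system(), the parent gets the child's pid back immediately, so it can notice a failed launch through waitpid() instead of hanging, and it can read progress from the returned descriptor.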

Also, when we get around to implementing the "fetch base backup
automatically via the TCP connection" feature, we can't use walreceiver
as it is now for that, because there's no hope of starting up the system
that far without a base backup. I'm not sure if it can or should be
merged with the walreceiver program, but it can't be a postmaster child
process, that's for sure.

Thoughts?

> Also a parameter like retries_interval might be necessary. This parameter
> indicates the interval between each reconnection attempt.

Yeah, maybe, although a hard-coded interval of a few seconds should be
enough to get us started.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Magnus Hagander
Date:
On Thu, Sep 17, 2009 at 10:08, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Fujii Masao wrote:
>> On Tue, Sep 15, 2009 at 7:53 PM, Heikki Linnakangas
>> <heikki.linnakangas@enterprisedb.com> wrote:
>>> After playing with this a little bit, I think we need logic in the slave
>>> to reconnect to the master if the connection is broken for some reason,
>>> or can't be established in the first place. At the moment, that is
>>> considered as the end of recovery, and the slave starts up. You have the
>>> trigger file mechanism to stop that, but it only gives you a chance to
>>> manually kill and restart the slave before it chooses a new timeline and
>>> starts up, it doesn't reconnect automatically.
>>
>> I was thinking that the automatic reconnection capability is the TODO item
>> for the later CF. The infrastructure for it has already been introduced in the
>> current patch. Please see the macro MAX_WALRCV_RETRIES (backend/
>> postmaster/walreceiver.c). This is the maximum number of times to retry
>> walreceiver. In the current version, this is the fixed value, but we can make
>> this user-configurable (parameter of recovery.conf is suitable, I think).
>
> Ah, I see.
>
> Robert Haas suggested a while ago that walreceiver could be a
> stand-alone utility, not requiring postmaster at all. That would allow
> you to set up streaming replication as another way to implement WAL
> archiving. Looking at how the processes interact, there really isn't
> much communication between walreceiver and the rest of the system, so
> that sounds pretty attractive.

Yes, that would be very very useful.


> Walreceiver is really a slave to the startup process. The startup
> process decides when it's launched, and it's the startup process that
> then waits for it to advance. But the way it's set up at the moment, the
> startup process needs to ask the postmaster to start it up, and it
> doesn't look very robust to me. For example, if launching walreceiver
> fails for some reason, startup process will just hang waiting for it.
>
> I'm thinking that walreceiver should be a stand-alone program that the
> startup process launches, similar to how it invokes restore_command in
> PITR recovery. Instead of using system(), though, it would use
> fork+exec, and a pipe to communicate.

Not having looked at all into the details, that sounds like a nice
improvement :-)


--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Some random comments:

I don't think we need the new PM_SHUTDOWN_3 postmaster state. We can
treat walsenders the same as the archive process, and kill and wait for
both of them to die in PM_SHUTDOWN_2 state.

I think there's something wrong with the napping in walsender. When I
perform pg_switch_xlog(), it takes surprisingly long for it to trickle
to the standby. When I put a little proxy program in between the master
and slave that delays all messages from the slave to the master by one
second, it got worse, even though I would expect the master to still
keep sending WAL at full speed. I get logs like this:

2009-09-17 14:13:16.876 EEST LOG:  xlog send request 0/38000000; send
0/3700006C; write 0/3700006C
2009-09-17 14:13:16.877 EEST LOG:  xlog read request 0/37010000; send
0/37010000; write 0/3700006C
2009-09-17 14:13:17.077 EEST LOG:  xlog send request 0/38000000; send
0/37010000; write 0/3700006C
2009-09-17 14:13:17.077 EEST LOG:  xlog read request 0/37020000; send
0/37020000; write 0/3700006C
2009-09-17 14:13:17.078 EEST LOG:  xlog read request 0/37030000; send
0/37030000; write 0/3700006C
2009-09-17 14:13:17.278 EEST LOG:  xlog send request 0/38000000; send
0/37030000; write 0/3700006C
2009-09-17 14:13:17.279 EEST LOG:  xlog read request 0/37040000; send
0/37040000; write 0/3700006C
...
2009-09-17 14:13:22.796 EEST LOG:  xlog read request 0/37FD0000; send
0/37FD0000; write 0/376D0000
2009-09-17 14:13:22.896 EEST LOG:  xlog send request 0/38000000; send
0/37FD0000; write 0/376D0000
2009-09-17 14:13:22.896 EEST LOG:  xlog read request 0/37FE0000; send
0/37FE0000; write 0/376D0000
2009-09-17 14:13:22.896 EEST LOG:  xlog read request 0/37FF0000; send
0/37FF0000; write 0/376D0000
2009-09-17 14:13:22.897 EEST LOG:  xlog read request 0/38000000; send
0/38000000; write 0/376D0000
2009-09-17 14:14:09.932 EEST LOG:  xlog send request 0/38000428; send
0/38000000; write 0/38000000
2009-09-17 14:14:09.932 EEST LOG:  xlog read request 0/38000428; send
0/38000428; write 0/38000000

It looks like it's having 100 or 200 ms naps in between. Also, I
wouldn't expect to see so many "read request" acknowledgments from the
slave. The master doesn't really need to know how far the slave is,
except in synchronous replication when it has requested a flush to
slave. Another reason why master needs to know is so that the master can
recycle old log files, but for that we'd really only need an
acknowledgment once per WAL file or even less.
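
The "once per WAL file" idea could be sketched as a simple boundary check. This is an illustration only: crossed_segment() is a hypothetical helper, positions are assumed to be flat 64-bit byte offsets, and the segment size is assumed to be the default 16 MB.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption: the default 16 MB WAL segment size. */
#define WAL_SEG_SIZE ((uint64_t) 16 * 1024 * 1024)

/*
 * Hypothetical helper: true when the standby's write position has moved
 * into a later WAL segment since the last acknowledgment was sent, i.e.
 * when an acknowledgment would let the master recycle an old file.
 */
bool crossed_segment(uint64_t last_acked, uint64_t written)
{
    assert(written >= last_acked);
    return written / WAL_SEG_SIZE > last_acked / WAL_SEG_SIZE;
}
```

With positions like those in the log above, moving from 0/37010000 to 0/37020000 stays inside one segment, while moving from 0/37FF0000 to 0/38000000 crosses into the next one.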

Why does XLogSend() care about page boundaries? Perhaps it's a leftover
from the old approach that read from wal_buffers?

Do we really need the support for asynchronous backend libpq commands?
Could walsender just keep blasting WAL to the slave, and only try to
read an acknowledgment after it has requested one by setting the
XLOGSTREAM_FLUSH flag? Or maybe we should be putting the socket into
non-blocking mode.
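
For the non-blocking alternative, a minimal sketch with plain POSIX descriptors might look like this. It does not use the libpq API, and set_nonblocking() and try_read_reply() are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Put a descriptor into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/*
 * Try to read a reply without waiting.
 * Returns the number of bytes read, 0 if nothing is available yet,
 * or -1 on a real error or if the peer has closed the connection.
 */
ssize_t try_read_reply(int fd, char *buf, size_t len)
{
    ssize_t n;

    assert(buf != NULL && len > 0);

    n = read(fd, buf, len);
    if (n > 0)
        return n;
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
        return 0;           /* no data yet; the sender can keep sending */
    return -1;
}
```

A sender built this way can poll for an acknowledgment between writes and carry on sending WAL whenever the call returns 0.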

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com



Re: Streaming Replication patch for CommitFest 2009-09

From
Csaba Nagy
Date:
On Thu, 2009-09-17 at 10:08 +0200, Heikki Linnakangas wrote:
> Robert Haas suggested a while ago that walreceiver could be a
> stand-alone utility, not requiring postmaster at all. That would allow
> you to set up streaming replication as another way to implement WAL
> archiving. Looking at how the processes interact, there really isn't
> much communication between walreceiver and the rest of the system, so
> that sounds pretty attractive.

Just a small comment in this direction: what if the archive would be
itself a postgres DB, and it would collect the WALs in some special
place (together with some meta data, snapshots, etc), and then a slave
could connect to it just like to any other master ? (except maybe it
could specify which snapshot to start with and possibly choosing
between different archived WAL streams).

Maybe it is completely stupid what I'm saying, but I see the archive as
just another form of a postgres server, with the same protocol from the
POV of a slave. While I don't have the clue to implement such a thing, I
thought it might be interesting as an idea while discussing the
walsender/receiver interface...

Cheers,
Csaba.




Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Heikki Linnakangas wrote:
> I'm thinking that walreceiver should be a stand-alone program that the
> startup process launches, similar to how it invokes restore_command in
> PITR recovery. Instead of using system(), though, it would use
> fork+exec, and a pipe to communicate.

Here's a WIP patch to do that, over your latest posted patch. I've also
pushed this to my git repository at
git://git.postgresql.org/git/users/heikki/postgres.git, "replication"
branch.

I'll continue reviewing...

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6804644..364d7e4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -41,7 +41,6 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
@@ -54,6 +53,7 @@
 #include "utils/guc.h"
 #include "utils/ps_status.h"
 #include "pg_trace.h"
+#include "postmaster/fork_process.h"


 /* File path names (all relative to $PGDATA) */
@@ -185,7 +185,8 @@ static TimestampTz recoveryLastXTime = 0;

 /* options taken from recovery.conf for XLOG streaming */
 static bool StandbyMode = false;
-char *TriggerFile = NULL;
+static char *TriggerFile = NULL;
+static char *conninfo = NULL;

 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
 static TransactionId recoveryStopXid;
@@ -489,6 +490,8 @@ static volatile sig_atomic_t shutdown_requested = false;
 static volatile sig_atomic_t in_restore_command = false;


+static pid_t WalReceiverPid = 0;
+
 static void XLogArchiveNotify(const char *xlog);
 static bool XLogArchiveCheckDone(const char *xlog);
 static bool XLogArchiveIsBusy(const char *xlog);
@@ -541,6 +544,10 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int    get_sync_bit(int method);

+static void StartWalReceiver(TimeLineID tli, XLogRecPtr startlsn);
+static void WaitNextXLogAvailable(XLogRecPtr recptr);
+static void WaitForTrigger(void);
+

 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -1180,18 +1187,6 @@ XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
     return false;                /* buffer does not need to be backed up */
 }

-/* Report XLOG streaming progress in PS display */
-void
-ReportLogstreamResult(void)
-{
-    char    activitymsg[50];
-
-    snprintf(activitymsg, sizeof(activitymsg),
-             "streaming %X/%X",
-             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
-    set_ps_display(activitymsg, false);
-}
-
 /*
  * XLogArchiveNotify
  *
@@ -3469,7 +3464,7 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)

         /* If there is no valid record available, request XLOG streaming */
         startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
-        RequestXLogStreaming(recoveryTargetTLI, startlsn);
+        StartWalReceiver(recoveryTargetTLI, startlsn);

         /* Needs to read the current page again if the next record is in it */
         needReread = haveNextRecord;
@@ -4934,7 +4929,6 @@ readRecoveryCommandFile(void)
 {
     FILE       *fd;
     char        cmdline[MAXPGPATH];
-    char       *conninfo = NULL;
     TimeLineID    rtli = 0;
     bool        rtliGiven = false;
     bool        syntaxError = false;
@@ -5113,14 +5107,6 @@ readRecoveryCommandFile(void)
                         cmdline),
               errhint("Lines should have the format parameter = 'value'.")));

-    /* Inform walreceiver of the connection information via file */
-    if (StandbyMode)
-    {
-        write_conninfo_file(conninfo);
-        if (conninfo)
-            pfree(conninfo);
-    }
-
     /* If not in standby mode, restore_command must be supplied */
     if (!StandbyMode && recoveryRestoreCommand == NULL)
         ereport(FATAL,
@@ -5282,7 +5268,13 @@ exitStreamingRecovery(void)
      * exited, and recovery checkpoint and subsequent records are
      * no longer overwritten unexpectedly.
      */
-    ShutdownWalRcv();
+    if (WalReceiverPid != 0)
+    {
+        int status = 0;
+        kill(WalReceiverPid, SIGTERM);
+        waitpid(WalReceiverPid, &status, 0);
+        WalReceiverPid = 0;
+    }

     /* We are no longer in streaming recovery state */
     InStreamingRecovery = false;
@@ -7185,7 +7177,7 @@ CreateRestartPoint(int flags)

     /* Are we doing recovery from XLOG stream? */
     if (!InStreamingRecovery)
-        InStreamingRecovery = WalRcvInProgress();
+        InStreamingRecovery = (WalReceiverPid != 0);

     /*
      * Delete old log files (those no longer needed even for previous
@@ -7203,8 +7195,9 @@ CreateRestartPoint(int flags)
     {
         XLogRecPtr    endptr;

-        /* Get the current (or recent) end of xlog */
-        endptr = GetWalRcvWriteRecPtr();
+        LWLockAcquire(ControlFileLock, LW_SHARED);
+        endptr = ControlFile->minRecoveryPoint;
+        LWLockRelease(ControlFileLock);

         PrevLogSeg(_logId, _logSeg);
         RemoveOldXlogFiles(_logId, _logSeg, endptr);
@@ -8443,3 +8436,193 @@ StartupProcessMain(void)
      */
     proc_exit(0);
 }
+
+
+
+
+/** WAL receiver stuff **/
+
+static int walreceiver_readfd;
+
+static void
+StartWalReceiver(TimeLineID tli, XLogRecPtr startlsn)
+{
+    pid_t pid;
+    char       *av[4];
+    char    startptr[32];
+    int pfildes[2];
+
+    snprintf(startptr, sizeof(startptr), "%u %X/%X", tli, startlsn.xlogid, startlsn.xrecoff);
+
+    av[0] = "walreceiver";
+    av[1] = startptr;
+    av[2] = conninfo;
+    av[3] = 0;
+
+    pipe(pfildes);
+
+    /* Fire off execv in child */
+    if ((pid = fork_process()) == 0)
+    {
+        char walreceiverpath[MAXPGPATH];
+
+        find_other_exec(my_exec_path, "walreceiver",
+                        "walreceiver " PG_VERSION_STR,
+                        walreceiverpath);
+
+        dup2(pfildes[1], 1); /* stdout */
+        if (execv(walreceiverpath, av) < 0)
+        {
+            ereport(LOG,
+                    (errmsg("could not execute walreceiver process \"%s\": %m",
+                            walreceiverpath)));
+            /* We're already in the child process here, can't return */
+            exit(1);
+        }
+    }
+    else
+    {
+        walreceiver_readfd = pfildes[0];
+        WalReceiverPid = pid;
+    }
+}
+
+static bool foundTrigger = false;
+
+/*
+ * Wait until the XLOG records at the given position are available.
+ *
+ * The XLOG records already written by walreceiver are regarded as
+ * available.
+ *
+ * recptr: the byte position up to which the caller wants to read
+ * XLOG records.
+ *
+ * Called by the startup process in streaming recovery.
+ */
+static void
+WaitNextXLogAvailable(XLogRecPtr recptr)
+{
+    struct stat stat_buf;
+    bool signaled = false;
+
+#ifdef REPLICATION_DEBUG
+    if (REPLICATION_DEBUG_ENABLED)
+        elog(LOG, "xlog wait request %X/%X; write %X/%X",
+             recptr.xlogid, recptr.xrecoff,
+             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+#endif
+
+    /* Loop until the requested position is known to be available */
+    while (!XLByteLT(recptr, LogstreamResult.Write))
+    {
+        char buf[101];
+        int i;
+
+        /* Read one NUL-terminated status string; stop on EOF or error */
+        for (i = 0; i < 100; i++)
+        {
+            if (read(walreceiver_readfd, &buf[i], 1) != 1 || buf[i] == '\0')
+                break;
+        }
+        buf[i] = '\0';
+
+        /* Update local status */
+        sscanf(buf, "%X/%X", &LogstreamResult.Write.xlogid, &LogstreamResult.Write.xrecoff);
+
+        /* If available already, leave here */
+        if (XLByteLT(recptr, LogstreamResult.Write))
+        {
+            /* XXX
+            XLogArchiveNotifySeg(recvId, recvSeg);
+            */
+
+            return;
+        }
+
+        /* Check to see if the trigger file exists */
+        if (TriggerFile != NULL && !foundTrigger &&
+            stat(TriggerFile, &stat_buf) == 0)
+        {
+            ereport(LOG,
+                    (errmsg("trigger file found: %s", TriggerFile)));
+            foundTrigger = true;
+            unlink(TriggerFile);
+        }
+
+        /*
+         * The presence of a trigger file shuts down walreceiver if it's
+         * in progress.
+         */
+        if (WalReceiverPid != 0)
+        {
+            if (foundTrigger && !signaled)
+            {
+                kill(WalReceiverPid, SIGTERM);
+                signaled = true;    /* prevents signal from being repeated */
+            }
+        }
+        /*
+         * If walreceiver is not in progress, give up on the wait for the
+         * next record, which causes streaming recovery to end.
+         *
+         * XXX: unlike the previous version, no retry is attempted here;
+         * MAX_WALRCV_RETRIES is no longer consulted.
+         */
+        else
+        {
+            return;
+        }
+
+        /*
+         * This possibly-long loop needs to handle interrupts of startup
+         * process.
+         */
+        HandleStartupProcInterrupts();
+
+        pg_usleep(100000L); /* 100ms */
+    }
+}
+
+
+/* Wait until a trigger file is found */
+static void
+WaitForTrigger(void)
+{
+    int    seconds_before_warning = 15;
+    int    elapsed    = 0;
+    int count    = 0;
+    struct stat stat_buf;
+
+    /* Quick exit if a trigger file was not specified or was already found */
+    if (TriggerFile == NULL || foundTrigger)
+        return;
+
+    while (stat(TriggerFile, &stat_buf) != 0)
+    {
+        /*
+         * This possibly-long loop needs to handle interrupts of startup
+         * process.
+         */
+        HandleStartupProcInterrupts();
+
+        pg_usleep(100000L);        /* 100ms */
+
+        if (++count >= 10)        /* 1s passed */
+        {
+            count = 0;
+
+            if (++elapsed >= seconds_before_warning)
+            {
+                seconds_before_warning *= 2;     /* This wraps in >10 years... */
+                ereport(WARNING,
+                        (errmsg("still waiting for the trigger file \"%s\" (%d seconds elapsed)",
+                                TriggerFile, elapsed)));
+            }
+        }
+    }
+
+    ereport(LOG,
+            (errmsg("trigger file found: %s", TriggerFile)));
+    unlink(TriggerFile);
+}
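
Note on the xlog.c changes above: the startup process now learns walreceiver's write position by reading NUL-terminated "%X/%X" strings from a pipe connected to the child's stdout. The following is a minimal standalone sketch of that handoff, not part of the patch. DemoRecPtr, demo_read_position and demo_roundtrip are hypothetical names used only for illustration, and the child here is a plain fork() rather than an exec'd walreceiver binary.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical stand-in for XLogRecPtr: log file id plus byte offset. */
typedef struct
{
    unsigned int xlogid;
    unsigned int xrecoff;
} DemoRecPtr;

/*
 * Read one NUL-terminated "%X/%X" message from fd and parse it.
 * Returns 0 on success, -1 on EOF, read error, or malformed input.
 */
int
demo_read_position(int fd, DemoRecPtr *pos)
{
    char    buf[32];
    size_t  n = 0;

    for (;;)
    {
        ssize_t r = read(fd, &buf[n], 1);

        if (r != 1)
            return -1;          /* EOF or error before the terminator */
        if (buf[n] == '\0')
            break;              /* complete message received */
        if (++n == sizeof(buf) - 1)
            return -1;          /* message too long for the buffer */
    }
    if (sscanf(buf, "%X/%X", &pos->xlogid, &pos->xrecoff) != 2)
        return -1;
    return 0;
}

/*
 * Fork a child that reports one position on a pipe, then read it back
 * in the parent. Returns 0 on success, -1 on failure.
 */
int
demo_roundtrip(DemoRecPtr sent, DemoRecPtr *received)
{
    int     fds[2];
    pid_t   pid;
    int     rc;

    if (pipe(fds) < 0)
        return -1;

    pid = fork();
    if (pid < 0)
    {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    if (pid == 0)
    {
        char    msg[32];
        int     len;

        close(fds[0]);          /* child only writes */
        len = snprintf(msg, sizeof(msg), "%X/%X", sent.xlogid, sent.xrecoff);
        /* Send the text plus its terminating NUL as the message delimiter */
        if (write(fds[1], msg, (size_t) len + 1) != (ssize_t) (len + 1))
            _exit(1);
        _exit(0);
    }

    close(fds[1]);              /* parent only reads */
    rc = demo_read_position(fds[0], received);
    close(fds[0]);
    waitpid(pid, NULL, 0);
    return rc;
}
```

The sketch checks every pipe(), read() and write() result and treats EOF as a failure, since a reader that ignores a zero-byte read cannot tell a dead child from a slow one. It does not retry on EINTR, which real code running under signal handlers would likely need.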
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 824a93f..06e9d33 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -31,7 +31,6 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "postmaster/bgwriter.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walwriter.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
@@ -340,9 +339,6 @@ AuxiliaryProcessMain(int argc, char *argv[])
             case WalWriterProcess:
                 statmsg = "wal writer process";
                 break;
-            case WalReceiverProcess:
-                statmsg = "wal receiver process";
-                break;
             default:
                 statmsg = "??? process";
                 break;
@@ -448,11 +444,6 @@ AuxiliaryProcessMain(int argc, char *argv[])
             WalWriterMain();
             proc_exit(1);        /* should never return */

-        case WalReceiverProcess:
-            /* don't set signals, walreceiver has its own agenda */
-            WalReceiverMain();
-            proc_exit(1);        /* should never return */
-
         default:
             elog(PANIC, "unrecognized process type: %d", auxType);
             proc_exit(1);
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 616cd2c..b73fdf4 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)

 OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \
-    syslogger.o walwriter.o walsender.o walreceiver.o
+    syslogger.o walwriter.o walsender.o

 walreceiver.o: submake-libpq

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 3ad82ef..b91ca88 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -108,7 +108,6 @@
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -125,7 +124,6 @@
 #include "storage/spin.h"
 #endif

-
 /*
  * List of active backends (or child processes anyway; we don't actually
  * know whether a given child has become a backend or is still in the
@@ -217,7 +215,6 @@ char       *bonjour_name;
 static pid_t StartupPID = 0,
             BgWriterPID = 0,
             WalWriterPID = 0,
-            WalReceiverPID = 0,
             AutoVacPID = 0,
             PgArchPID = 0,
             PgStatPID = 0,
@@ -289,7 +286,6 @@ typedef enum
     PM_WAIT_BACKENDS,            /* waiting for live backends to exit */
     PM_SHUTDOWN,                /* waiting for bgwriter to do shutdown ckpt */
     PM_SHUTDOWN_2,                /* waiting for archiver to finish */
-    PM_SHUTDOWN_3,                /* waiting for walsenders to finish */
     PM_WAIT_DEAD_END,            /* waiting for dead_end children to exit */
     PM_NO_CHILDREN                /* all important children have exited */
 } PMState;
@@ -468,7 +464,6 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartupDataBase()        StartChildProcess(StartupProcess)
 #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
 #define StartWalWriter()        StartChildProcess(WalWriterProcess)
-#define StartWalReceiver()        StartChildProcess(WalReceiverProcess)

 /* Macros to check exit status of a child process */
 #define EXIT_STATUS_0(st)  ((st) == 0)
@@ -1465,9 +1460,10 @@ ServerLoop(void)

         /* If we have lost the archiver, try to start a new one */
         if (XLogArchivingActive() && PgArchPID == 0 &&
-            (pmState == PM_RUN ||
+            (pmState == PM_RUN /* XXX postmaster doesn't know if walreceiver is active
+                                 ||
              ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
-              WalRcvInProgress())))
+             WalRcvInProgress()) */))
             PgArchPID = pgarch_start();

         /* If we have lost the stats collector, try to start a new one */
@@ -1640,7 +1636,7 @@ retry1:
     if (proto == XLOG_STREAMING_CODE && !am_walsender)
     {
         am_walsender = true;
-        /* No packets other than regular one should not follow */
+        /* No packets other than regular one should follow */
         return ProcessStartupPacket(port, SSLdone);
     }

@@ -2097,8 +2093,6 @@ SIGHUP_handler(SIGNAL_ARGS)
             signal_child(BgWriterPID, SIGHUP);
         if (WalWriterPID != 0)
             signal_child(WalWriterPID, SIGHUP);
-        if (WalReceiverPID != 0)
-            signal_child(WalReceiverPID, SIGHUP);
         if (AutoVacPID != 0)
             signal_child(AutoVacPID, SIGHUP);
         if (PgArchPID != 0)
@@ -2194,8 +2188,6 @@ pmdie(SIGNAL_ARGS)

             if (StartupPID != 0)
                 signal_child(StartupPID, SIGTERM);
-            if (WalReceiverPID != 0)
-                signal_child(WalReceiverPID, SIGTERM);
             if (pmState == PM_RECOVERY)
             {
                 /* only bgwriter is active in this state */
@@ -2243,8 +2235,6 @@ pmdie(SIGNAL_ARGS)
                 signal_child(BgWriterPID, SIGQUIT);
             if (WalWriterPID != 0)
                 signal_child(WalWriterPID, SIGQUIT);
-            if (WalReceiverPID != 0)
-                signal_child(WalReceiverPID, SIGQUIT);
             if (AutoVacPID != 0)
                 signal_child(AutoVacPID, SIGQUIT);
             if (PgArchPID != 0)
@@ -2404,17 +2394,16 @@ reaper(SIGNAL_ARGS)
                  */
                 Assert(Shutdown > NoShutdown);

-                if (PgArchPID != 0)
+                if (PgArchPID != 0 || WalSndInProgress())
                 {
                     /* Waken archiver for the last time */
-                    signal_child(PgArchPID, SIGUSR2);
-                    pmState = PM_SHUTDOWN_2;
-                }
-                else if (WalSndInProgress())
-                {
+                    if (PgArchPID != 0)
+                        signal_child(PgArchPID, SIGUSR2);
+
                     /* Waken walsenders for the last time */
                     SignalWalSenders(SIGUSR2);
-                    pmState = PM_SHUTDOWN_3;
+
+                    pmState = PM_SHUTDOWN_2;
                 }
                 else
                     pmState = PM_WAIT_DEAD_END;
@@ -2454,20 +2443,6 @@ reaper(SIGNAL_ARGS)
         }

         /*
-         * Was it the wal receiver?  If exit status is zero (normal) or one
-         * (FATAL exit), we assume everything is all right just like normal
-         * backends.
-         */
-        if (pid == WalReceiverPID)
-        {
-            WalReceiverPID = 0;
-            if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
-                HandleChildCrash(pid, exitstatus,
-                                 _("WAL receiver process"));
-            continue;
-        }
-
-        /*
          * Was it the autovacuum launcher?    Normal exit can be ignored; we'll
          * start a new one at the next iteration of the postmaster's main
          * loop, if necessary.    Any other exit condition is treated as a
@@ -2495,16 +2470,12 @@ reaper(SIGNAL_ARGS)
                 LogChildExit(LOG, _("archiver process"),
                              pid, exitstatus);
             if (XLogArchivingActive() &&
-                (pmState == PM_RUN ||
+                (pmState == PM_RUN/*  XXX postmaster doesn't know if walreceiver is active
+                                      ||
                  ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
-                  WalRcvInProgress())))
+                 WalRcvInProgress())*/))
                 PgArchPID = pgarch_start();
-            else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress())
-            {
-                SignalWalSenders(SIGUSR2);
-                pmState = PM_SHUTDOWN_3;
-            }
-            else
+            else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress())
                 pmState = PM_WAIT_DEAD_END;
             continue;
         }
@@ -2611,8 +2582,8 @@ CleanupBackend(int pid,
                  * advance to the next shutdown step.
                  */
                 if (bp->child_type == BACKEND_TYPE_WALSND &&
-                    pmState == PM_SHUTDOWN_3 &&
-                    !WalSndInProgress())
+                    pmState == PM_SHUTDOWN_2 &&
+                    !WalSndInProgress() && PgArchPID == 0)
                     pmState = PM_WAIT_DEAD_END;
             }
             DLRemove(curr);
@@ -2729,18 +2700,6 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
         signal_child(WalWriterPID, (SendStop ? SIGSTOP : SIGQUIT));
     }

-    /* Take care of the walreceiver too */
-    if (pid == WalReceiverPID)
-        WalReceiverPID = 0;
-    else if (WalReceiverPID != 0 && !FatalError)
-    {
-        ereport(DEBUG2,
-                (errmsg_internal("sending %s to process %d",
-                                 (SendStop ? "SIGSTOP" : "SIGQUIT"),
-                                 (int) WalReceiverPID)));
-        signal_child(WalReceiverPID, (SendStop ? SIGSTOP : SIGQUIT));
-    }
-
     /* Take care of the autovacuum launcher too */
     if (pid == AutoVacPID)
         AutoVacPID = 0;
@@ -2884,7 +2843,6 @@ PostmasterStateMachine(void)
          */
         if (CountChildren(true) == 0 &&
             StartupPID == 0 &&
-            WalReceiverPID == 0 &&
             (BgWriterPID == 0 || !FatalError) &&
             WalWriterPID == 0 &&
             AutoVacPID == 0)
@@ -2961,7 +2919,6 @@ PostmasterStateMachine(void)
         {
             /* These other guys should be dead already */
             Assert(StartupPID == 0);
-            Assert(WalReceiverPID == 0);
             Assert(BgWriterPID == 0);
             Assert(WalWriterPID == 0);
             Assert(AutoVacPID == 0);
@@ -4119,9 +4076,10 @@ sigusr1_handler(SIGNAL_ARGS)

     if (CheckPostmasterSignal(PMSIGNAL_START_ARCHIVER) &&
         XLogArchivingActive() && PgArchPID == 0 &&
-        (pmState == PM_RUN ||
+        (pmState == PM_RUN /*  XXX postmaster doesn't know if walreceiver is active
+                               ||
          ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
-          WalRcvInProgress())))
+         WalRcvInProgress()) */))
     {
         /*
          * Start archiver process. This is mainly called for archiving during
@@ -4173,12 +4131,6 @@ sigusr1_handler(SIGNAL_ARGS)
         RegisterWalSender();
     }

-    if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER))
-    {
-        /* The startup process wants us to start a walreceiver */
-        WalReceiverPID = StartWalReceiver();
-    }
-
     PG_SETMASK(&UnBlockSig);

     errno = save_errno;
@@ -4372,10 +4324,6 @@ StartChildProcess(AuxProcType type)
                 ereport(LOG,
                         (errmsg("could not fork WAL writer process: %m")));
                 break;
-            case WalReceiverProcess:
-                ereport(LOG,
-                        (errmsg("could not fork WAL receiver process: %m")));
-                break;
             default:
                 ereport(LOG,
                         (errmsg("could not fork process: %m")));
diff --git a/src/backend/postmaster/walreceiver.c b/src/backend/postmaster/walreceiver.c
deleted file mode 100644
index a07b1f2..0000000
--- a/src/backend/postmaster/walreceiver.c
+++ /dev/null
@@ -1,980 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * walreceiver.c
- *
- * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
- * takes charge of XLOG streaming receiver in the standby server. At first,
- * it is started by the postmaster and connects to the primary server,
- * when the startup process in the standby mode requests XLOG streaming
- * replication. It attempts to keep receiving XLOG records from the primary
- * server and writing them to the disk, as long as the connection is alive
- * (i.e., like any backend, there is an one to one relationship between
- * a connection and the walreceiver process). Also, it notifies the startup
- * process of the location of XLOG records available. This enables
- * the startup process to read XLOG records from XLOG stream and apply them
- * to make a replica of the primary database.
- *
- * Normal termination is by SIGTERM or an end-of-streaming message from the
- * primary server, which instructs the walreceiver to exit(0). Emergency
- * termination is by SIGQUIT; like any backend, the walreceiver will simply
- * abort and exit on SIGQUIT. A close of the connection and a FATAL error
- * are treated as not a crash but approximately normal termination.
- *
- * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group
- *
- *
- * IDENTIFICATION
- *      $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include <unistd.h>
-
-#include "access/xlog_internal.h"
-#include "libpq-fe.h"
-#include "libpq/pqsignal.h"
-#include "miscadmin.h"
-#include "postmaster/walreceiver.h"
-#include "storage/fd.h"
-#include "storage/ipc.h"
-#include "storage/pmsignal.h"
-#include "storage/shmem.h"
-#include "utils/guc.h"
-#include "utils/memutils.h"
-#include "utils/resowner.h"
-
-static WalRcvData *WalRcv = NULL;
-
-/* streamConn is a PGconn object of a connection to walsender from walreceiver */
-static PGconn *streamConn;
-
-/* Path for the connection information file (relative to $PGDATA) */
-#define CONNINFO_FILENAME    "global/conninfo"
-
-/*
- * These variables are used similarly to openLogFile/Id/Seg/Off,
- * but for walreceiver to write the XLOG.
- */
-static int    recvFile = -1;
-static uint32 recvId = 0;
-static uint32 recvSeg = 0;
-static uint32 recvOff = 0;
-
-/*
- * ZeroedRecPtr indicates the byte position that we have already zeroed. It is
- * updated when walreceiver writes a half-filled page that needs to be zeroed.
- * ZeroedBuffer points a zeroed buffer used for zeroing.
- */
-static XLogRecPtr    ZeroedRecPtr = {0, 0};
-static char           *ZeroedBuffer;
-
-/* Recovery has been already triggered? */
-static bool foundTrigger = false;
-
-/*
- * Max number of times to retry walreceiver
- *
- * XXX: Should this number be user-configurable?
- */
-#define MAX_WALRCV_RETRIES 0
-
-/*
- * Advances when startup process retries to request walreceiver.
- * When walreceiver is not in progress, if this counter is smaller
- * than MAX_WALRCV_RETRIES, we retry to start walreceiver.
- */
-static int NumWalRcvRetries = 0;
-
-/* Flags set by interrupt handlers of walreceiver for later service in the main loop */
-static volatile sig_atomic_t got_SIGHUP = false;
-static volatile sig_atomic_t shutdown_requested = false;
-
-/* Signal handlers */
-static void WalRcvSigHupHandler(SIGNAL_ARGS);
-static void WalRcvShutdownHandler(SIGNAL_ARGS);
-static void WalRcvQuickDieHandler(SIGNAL_ARGS);
-
-/* Prototypes for private functions */
-static void WalRcvLoop(void);
-static void    InitWalRcv(void);
-static void    WalRcvKill(int code, Datum arg);
-static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
-static void XLogWalRcvFlush(XLogRecPtr recptr);
-static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
-static char *read_conninfo_file(void);
-
-/* Main entry point for walreceiver process */
-void
-WalReceiverMain(void)
-{
-    MemoryContext walrcv_context;
-    char   *conninfo;
-
-    /* Mark walreceiver in progress */
-    InitWalRcv();
-
-    /*
-     * If possible, make this process a group leader, so that the postmaster
-     * can signal any child processes too.    (walreceiver probably never has
-     * any child processes, but for consistency we make all postmaster child
-     * processes do this.)
-     */
-#ifdef HAVE_SETSID
-    if (setsid() < 0)
-        elog(FATAL, "setsid() failed: %m");
-#endif
-
-    /* Properly accept or ignore signals the postmaster might send us */
-    pqsignal(SIGHUP, WalRcvSigHupHandler);        /* set flag to read config file */
-    pqsignal(SIGINT, SIG_IGN);
-    pqsignal(SIGTERM, WalRcvShutdownHandler);    /* request shutdown */
-    pqsignal(SIGQUIT, WalRcvQuickDieHandler);    /* hard crash time */
-    pqsignal(SIGALRM, SIG_IGN);
-    pqsignal(SIGPIPE, SIG_IGN);
-    pqsignal(SIGUSR1, SIG_IGN);
-    pqsignal(SIGUSR2, SIG_IGN);
-
-    /* Reset some signals that are accepted by postmaster but not here */
-    pqsignal(SIGCHLD, SIG_DFL);
-    pqsignal(SIGTTIN, SIG_DFL);
-    pqsignal(SIGTTOU, SIG_DFL);
-    pqsignal(SIGCONT, SIG_DFL);
-    pqsignal(SIGWINCH, SIG_DFL);
-
-    /* We allow SIGQUIT (quickdie) at all times */
-#ifdef HAVE_SIGPROCMASK
-    sigdelset(&BlockSig, SIGQUIT);
-#else
-    BlockSig &= ~(sigmask(SIGQUIT));
-#endif
-
-    /*
-     * Create a resource owner to keep track of our resources (not clear that
-     * we need this, but may as well have one).
-     */
-    CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
-
-    /*
-     * Create a memory context that we will do all our work in.  We do this so
-     * that we can reset the context during error recovery and thereby avoid
-     * possible memory leaks.  Formerly this code just ran in
-     * TopMemoryContext, but resetting that would be a really bad idea.
-     */
-    walrcv_context = AllocSetContextCreate(TopMemoryContext,
-                                              "Wal Receiver",
-                                              ALLOCSET_DEFAULT_MINSIZE,
-                                              ALLOCSET_DEFAULT_INITSIZE,
-                                              ALLOCSET_DEFAULT_MAXSIZE);
-    MemoryContextSwitchTo(walrcv_context);
-
-    /* Unblock signals (they were blocked when the postmaster forked us) */
-    PG_SETMASK(&UnBlockSig);
-
-    /* Get the starting XLOG location of XLOG streaming */
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        SpinLockAcquire(&walrcv->mutex);
-        LogstreamResult = walrcv->LogstreamResult;
-        SpinLockRelease(&walrcv->mutex);
-
-        /* Report XLOG streaming progress in PS display */
-        ReportLogstreamResult();
-    }
-
-    /* Read the connection information used to connect with the primary */
-    conninfo = read_conninfo_file();
-
-    /* Set up a connection for XLOG streaming */
-    streamConn = PQstartXLogStreaming(conninfo,
-                                      LogstreamResult.Write.xlogid,
-                                      LogstreamResult.Write.xrecoff);
-    if (PQstatus(streamConn) != CONNECTION_OK)
-        ereport(FATAL,
-                (errmsg("could not connect to the primary server : %s",
-                        PQerrorMessage(streamConn))));
-    pfree(conninfo);
-
-    /*
-     * Confirm that the current timeline of the primary is the same
-     * as the recovery target timeline.
-     */
-    ThisTimeLineID = PQtimeline(streamConn);
-    if (ThisTimeLineID != WalRcv->RecoveryTargetTLI)
-        ereport(FATAL,
-                (errmsg("timeline %u of the primary does not match "
-                        "recovery target timeline %u",
-                        ThisTimeLineID, WalRcv->RecoveryTargetTLI)));
-
-    ZeroedBuffer = (char *) palloc0(XLOG_BLCKSZ);
-
-    /* Main loop of walreceiver */
-    WalRcvLoop();
-}
-
-/* Main loop of walreceiver process */
-static void
-WalRcvLoop(void)
-{
-    char       *buf;
-    bool        finishing_seg;
-    bool        fsync_requested;
-    int            len;
-    XLogRecPtr    recptr;
-
-    /* Loop until end-of-streaming or error */
-    for (;;)
-    {
-        bool    fsynced = false;
-
-        /*
-         * Emergency bailout if postmaster has died.  This is to avoid the
-         * necessity for manual cleanup of all postmaster children.
-         */
-        if (!PostmasterIsAlive(true))
-            exit(1);
-
-        /*
-         * Exit walreceiver if we're not in recovery. This should not happen,
-         * but cross-check the status here.
-         */
-        if (!RecoveryInProgress())
-            ereport(FATAL,
-                    (errmsg("cannot continue XLOG streaming, recovery has already ended")));
-
-        /* Process any requests or signals received recently */
-        if (got_SIGHUP)
-        {
-            got_SIGHUP = false;
-            ProcessConfigFile(PGC_SIGHUP);
-        }
-
-        /* Normal exit from the walreceiver is here */
-        if (shutdown_requested)
-            proc_exit(0);
-
-        /* Receive XLogData message (wait for new message to arrive) */
-        len = PQgetXLogData(streamConn, &buf,
-                            (int *) &recptr.xlogid, (int *) &recptr.xrecoff,
-                            (char *) &finishing_seg, (char *) &fsync_requested, 0);
-
-        if (len < 0)    /* end-of-streaming or error */
-            break;
-
-        if (buf == NULL)    /* should not happen */
-            continue;
-
-#ifdef REPLICATION_DEBUG
-        if (REPLICATION_DEBUG_ENABLED)
-            elog(LOG, "xlog recv result %X/%X:%s%s; write %X/%X; flush %X/%X",
-                 recptr.xlogid, recptr.xrecoff,
-                 finishing_seg ? " finishing_seg" : "",
-                 fsync_requested ? " fsync_requested" : "",
-                 LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff,
-                 LogstreamResult.Flush.xlogid, LogstreamResult.Flush.xrecoff);
-#endif
-
-        /*
-         * A level of synchronization between both servers depends on when
-         * the standby returns a "success" of XLOG streaming to the primary.
-         * For example, the following timings can be considered:
-         *
-         *     A "success" is returned after
-         *         #1 receiving the logs and locating them on a memory
-         *         #2 writing them to the disk
-         *         #3 fsyncing them to the disk
-         *         #4 replaying them
-         *         ...etc
-         *
-         * We can choose only #2 now.
-         *
-         * Note: In #1 and #2, the logs might disappear if the standby fails
-         * before writing them to certainly the disk sector. But, since such
-         * missing logs are guaranteed to exist in the primary side,
-         * the transaction is not lost in the whole system (i.e., the standby
-         * can recover all transactions from the primary).
-         */
-
-        XLogWalRcvWrite(buf, len, recptr, &fsynced);
-
-        /*
-         * The logs in the XLogData message were written successfully,
-         * so we mark the message already consumed.
-         */
-        PQmarkConsumed(streamConn);
-
-        /*
-         * If fsync is not requested or was already done, we send a "success"
-         * to the primary before issuing fsync for end-of-segment.
-         */
-        if (fsynced || !fsync_requested)
-        {
-            if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
-                                (int) fsynced) == -1)
-                ereport(FATAL,
-                        (errmsg("could not send a message to the primary: %s",
-                                PQerrorMessage(streamConn))));
-        }
-
-        /*
-         * If we just wrote the whole last page of a logfile segment but
-         * had not fsynced it yet, fsync the segment immediately.  This
-         * avoids having to go back and re-open prior segments when an
-         * fsync request comes along later.
-         *
-         * Of course, if asked to fsync but not, do so.
-         */
-        if (!fsynced && (fsync_requested || finishing_seg))
-        {
-            XLogWalRcvFlush(recptr);
-
-            if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
-                                1) == -1)
-                ereport(FATAL,
-                        (errmsg("could not send a message to the primary: %s",
-                                PQerrorMessage(streamConn))));
-
-            /*
-             * If the segment is ready to copy to archival storage,
-             * notify the archiver so.
-             */
-            if (finishing_seg && XLogArchivingActive())
-                XLogArchiveNotifySeg(recvId, recvSeg);
-
-            /*
-             * XXX: Should we signal bgwriter to start a restartpoint
-             * if we've consumed too much xlog since the last one, like
-             * in normal processing? But this is not worth doing unless
-             * a restartpoint can be created independently from a
-             * checkpoint record.
-             */
-        }
-    }
-
-    if (len == -1)    /* end-of-streaming */
-    {
-        PGresult *res;
-
-        res = PQgetResult(streamConn);
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            PQclear(res);
-            proc_exit(0);
-        }
-        PQclear(res);
-    }
-
-    /* error */
-    ereport(FATAL,
-            (errmsg("could not read xlog records: %s",
-                    PQerrorMessage(streamConn))));
-}
-
-/* Mark this walreceiver in progress */
-static void
-InitWalRcv(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    /*
-     * WalRcv should be set up already (if we are a backend, we inherit
-     * this by fork() or EXEC_BACKEND mechanism from the postmaster).
-     */
-    if (walrcv == NULL)
-        elog(PANIC, "walreceiver control data uninitialized");
-
-    /* Make sure WalRcv is not in use */
-    if (walrcv->pid != 0)
-        elog(FATAL, "WalRcv is in use");
-
-    /* Arrange to clean up at walreceiver exit */
-    on_shmem_exit(WalRcvKill, 0);
-
-    /* Mark walreceiver in progress */
-    walrcv->pid = MyProcPid;
-}
-
-/*
- * Close a connection for XLOG streaming and mark this walreceiver
- * no longer in progress
- */
-static void
-WalRcvKill(int code, Datum arg)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    PQfinish(streamConn);
-    walrcv->pid = 0;
-    walrcv->in_progress = false;
-}
-
-/* SIGHUP: set flag to re-read config file at next convenient time */
-static void
-WalRcvSigHupHandler(SIGNAL_ARGS)
-{
-    got_SIGHUP = true;
-}
-
-/* SIGTERM: set flag to exit normally */
-static void
-WalRcvShutdownHandler(SIGNAL_ARGS)
-{
-    if (CritSectionCount == 0)
-        proc_exit(0);
-
-    /* Delay shutdown if we are inside a critical section */
-    shutdown_requested = true;
-}
-
-/*
- * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
- *
- * Some backend has bought the farm,
- * so we need to stop what we're doing and exit.
- */
-static void
-WalRcvQuickDieHandler(SIGNAL_ARGS)
-{
-    PG_SETMASK(&BlockSig);
-
-    /*
-     * We DO NOT want to run proc_exit() callbacks -- we're here because
-     * shared memory may be corrupted, so we don't want to try to clean up our
-     * transaction.  Just nail the windows shut and get out of town.  Now that
-     * there's an atexit callback to prevent third-party code from breaking
-     * things by calling exit() directly, we have to reset the callbacks
-     * explicitly to make this work as intended.
-     */
-    on_exit_reset();
-
-    /*
-     * Note we do exit(2) not exit(0).    This is to force the postmaster into a
-     * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
-     * backend.  This is necessary precisely because we don't clean up our
-     * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
-     * should ensure the postmaster sees this as a crash, too, but no harm
-     * in being doubly sure.)
-     */
-    exit(2);
-}
-
-/* Report shared-memory space needed by WalRcvShmemInit */
-Size
-WalRcvShmemSize(void)
-{
-    Size size = 0;
-
-    size = add_size(size, sizeof(WalRcvData));
-
-    return size;
-}
-
-/* Allocate and initialize walreceiver-related shared memory */
-void
-WalRcvShmemInit(void)
-{
-    bool    found;
-
-    WalRcv = (WalRcvData *)
-        ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
-
-    if (WalRcv == NULL)
-        ereport(FATAL,
-                (errcode(ERRCODE_OUT_OF_MEMORY),
-                 errmsg("not enough shared memory for walreceiver")));
-    if (found)
-        return;                    /* already initialized */
-
-    /* Initialize the data structures */
-    MemSet(WalRcv, 0, WalRcvShmemSize());
-    SpinLockInit(&WalRcv->mutex);
-}
-
-/* Is walreceiver in progress (or just starting up)? */
-bool
-WalRcvInProgress(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    return walrcv->in_progress;
-}
-
-/*
- * Write the log to disk.
- *
- * fsynced is set to true if the log was fsyned by O_DIRECT.
- */
-static void
-XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
-{
-    int        startoff;
-    int        endoff;
-
-    START_CRIT_SECTION();
-
-    if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
-    {
-        bool    use_existent;
-
-        /*
-         * XLOG segment files will be re-read in recovery operation soon,
-         * so we don't need to advise the OS to release any cache page.
-         */
-        if (recvFile >= 0 && close(recvFile))
-            ereport(PANIC,
-                    (errcode_for_file_access(),
-                     errmsg("could not close log file %u, segment %u: %m",
-                            recvId, recvSeg)));
-        recvFile = -1;
-
-        /* Create/use new log file */
-        XLByteToPrevSeg(recptr, recvId, recvSeg);
-        use_existent = true;
-        recvFile = XLogFileInit(recvId, recvSeg,
-                                  &use_existent, true);
-        recvOff = 0;
-    }
-
-    /* Make sure we have the current logfile open */
-    if (recvFile < 0)
-    {
-        XLByteToPrevSeg(recptr, recvId, recvSeg);
-        recvFile = XLogFileOpen(recvId, recvSeg);
-        recvOff = 0;
-    }
-
-    /* Calculate the start/end file offset of the received logs */
-    endoff = recptr.xrecoff % XLogSegSize;
-    startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;
-
-    /*
-     * Re-zero the page so that bytes beyond what we've written will look
-     * like zeroes and not valid XLOG records. Only end page which we are
-     * writing need to be zeroed. Of course, we can skip zeroing the pages
-     * full of the XLOG records. Save the end position of the already zeroed
-     * area at the variable ZeroedRecPtr, and avoid zeroing the same page
-     * two or more times.
-     *
-     * This must precede the writing of the actual logs. Otherwise, a crash
-     * before re-zeroing would cause a corrupted page.
-     */
-    if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
-    {
-        int        zlen;
-
-        zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
-        WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
-        ZeroedRecPtr = recptr;
-        ZeroedRecPtr.xrecoff += zlen;
-    }
-
-    /* Write out the logs */
-    WritePhysicalXLog(buf, len, startoff);
-    LogstreamResult.Send    = recptr;
-    LogstreamResult.Write    = recptr;
-
-    if (sync_method == SYNC_METHOD_OPEN ||
-        sync_method == SYNC_METHOD_OPEN_DSYNC)
-    {
-        LogstreamResult.Flush = recptr;
-        *fsynced = true;        /* logs were already fsynced */
-    }
-
-    /* Update shared-memory status */
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        SpinLockAcquire(&walrcv->mutex);
-        XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
-        XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
-        if (*fsynced)
-            XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
-        SpinLockRelease(&walrcv->mutex);
-    }
-
-    /* Report XLOG streaming progress in PS display */
-    ReportLogstreamResult();
-
-    END_CRIT_SECTION();
-}
-
-/* Flush the log to disk */
-static void
-XLogWalRcvFlush(XLogRecPtr recptr)
-{
-    START_CRIT_SECTION();
-
-    issue_xlog_fsync(recvFile, recvId, recvSeg);
-
-    LogstreamResult.Flush = recptr;
-
-    /* Update shared-memory status */
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        SpinLockAcquire(&walrcv->mutex);
-        XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
-        SpinLockRelease(&walrcv->mutex);
-    }
-
-    END_CRIT_SECTION();
-}
-
-/* Physical write to the given logs */
-static void
-WritePhysicalXLog(char *from, Size nbytes, int startoff)
-{
-    /* Need to seek in the file? */
-    if (recvOff != startoff)
-    {
-        if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
-            ereport(PANIC,
-                    (errcode_for_file_access(),
-                     errmsg("could not seek in log file %u, "
-                            "segment %u to offset %u: %m",
-                            recvId, recvSeg, startoff)));
-        recvOff = startoff;
-    }
-
-    /* OK to write the logs */
-    errno = 0;
-    if (write(recvFile, from, nbytes) != nbytes)
-    {
-        /* if write didn't set errno, assume no disk space */
-        if (errno == 0)
-            errno = ENOSPC;
-        ereport(PANIC,
-                (errcode_for_file_access(),
-                 errmsg("could not write to log file %u, segment %u "
-                        "at offset %u, length %lu: %m",
-                        recvId, recvSeg,
-                        recvOff, (unsigned long) nbytes)));
-    }
-
-    /* Update state for write */
-    recvOff += nbytes;
-}
-
-/*
- * Wait for the XLOG records at given position available.
- *
- * The XLOG records already written by walreceiver are regarded as
- * available.
- *
- * recptr: indicates the byte position which caller wants to read the
- * XLOG record up to.
- *
- * Called by the startup process in streaming recovery.
- */
-void
-WaitNextXLogAvailable(XLogRecPtr recptr)
-{
-    struct stat stat_buf;
-    bool signaled = false;
-
-#ifdef REPLICATION_DEBUG
-    if (REPLICATION_DEBUG_ENABLED)
-        elog(LOG, "xlog wait request %X/%X; write %X/%X",
-             recptr.xlogid, recptr.xrecoff,
-             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
-#endif
-
-    /* Quick exit if already known available */
-    if (XLByteLT(recptr, LogstreamResult.Write))
-        return;
-
-    for (;;)
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalRcvData *walrcv = WalRcv;
-
-        /* Update local status */
-        SpinLockAcquire(&walrcv->mutex);
-        LogstreamResult = walrcv->LogstreamResult;
-        SpinLockRelease(&walrcv->mutex);
-
-        /* If available already, leave here */
-        if (XLByteLT(recptr, LogstreamResult.Write))
-            return;
-
-        /* Check to see if the trigger file exists */
-        if (TriggerFile != NULL && !foundTrigger &&
-            stat(TriggerFile, &stat_buf) == 0)
-        {
-            ereport(LOG,
-                    (errmsg("trigger file found: %s", TriggerFile)));
-            foundTrigger = true;
-            unlink(TriggerFile);
-        }
-
-        /*
-         * The presence of a trigger file shuts down walreceiver if it's
-         * in progress.
-         */
-        if (WalRcvInProgress())
-        {
-            pid_t    pid = walrcv->pid;
-
-            if (foundTrigger && !signaled && pid != 0)
-            {
-                kill(pid, SIGTERM);
-                signaled = true;    /* prevents signal from being repeated */
-            }
-        }
-        /*
-         * If walreceiver is not in progress and has been retried more than
-         * MAX_WALRCV_RETRIES times, give up on the wait for the next record,
-         * which would cause a streaming recovery to end. If the former
-         * condition is met and the retry-count has not reached the maximum
-         * number yet, request XLOG streaming again.
-         */
-        else
-        {
-            if (NumWalRcvRetries < MAX_WALRCV_RETRIES && !foundTrigger)
-            {
-                /*
-                 * Since recovery target timeline has already been shared with
-                 * upcoming walreceiver, we pass 0 to RequestXLogStreaming()
-                 * as timeline (i.e., shared timeline variable is not updated).
-                 */
-                RequestXLogStreaming(0, recptr);
-                NumWalRcvRetries++;
-            }
-            else
-                return;
-        }
-
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        pg_usleep(100000L); /* 100ms */
-    }
-}
-
-/* Ensure that walreceiver has already exited */
-void
-ShutdownWalRcv(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-    pid_t    pid = walrcv->pid;
-
-    if (pid != 0)
-        kill(pid, SIGTERM);
-
-    while (WalRcvInProgress())
-    {
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        pg_usleep(100000);        /* 100ms */
-    }
-}
-
-/* Wait until a trigger file is found */
-void
-WaitForTrigger(void)
-{
-    int    seconds_before_warning = 15;
-    int    elapsed    = 0;
-    int count    = 0;
-    struct stat stat_buf;
-
-    /* Quick exit if a trigger file was not specified or was already found */
-    if (TriggerFile == NULL || foundTrigger)
-        return;
-
-    while (stat(TriggerFile, &stat_buf) != 0)
-    {
-        /*
-         * This possibly-long loop needs to handle interrupts of startup
-         * process.
-         */
-        HandleStartupProcInterrupts();
-
-        pg_usleep(100000L);        /* 100ms */
-
-        if (++count >= 10)        /* 1s passed */
-        {
-            count = 0;
-
-            if (++elapsed >= seconds_before_warning)
-            {
-                seconds_before_warning *= 2;     /* This wraps in >10 years... */
-                ereport(WARNING,
-                        (errmsg("still waiting for the trigger file \"%s\" (%d seconds elapsed)",
-                                TriggerFile, elapsed)));
-            }
-        }
-    }
-
-    ereport(LOG,
-            (errmsg("trigger file found: %s", TriggerFile)));
-    unlink(TriggerFile);
-}
-
-/*
- * Request postmaster to start the processes required for XLOG streaming.
- *
- * tli: recovery target timeline. If it's not 0, share it with upcoming
- * walreceiver.
- *
- * recptr: indicates the position where we failed in reading a record.
- */
-void
-RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-
-    /*
-     * Calculate the start position of XLOG streaming. If we need to read
-     * a record in the middle of a segment which doesn't exist in pg_xlog,
-     * the start position has to be the head of the segment which that
-     * record belongs to. Which is necessary for preventing an immature
-     * segment (i.e., there is no record in the first half of a segment)
-     * from being created by XLOG streaming.
-     */
-    if (recptr.xrecoff % XLogSegSize != 0)
-    {
-        char        xlogpath[MAXPGPATH];
-        struct stat    stat_buf;
-        uint32        log;
-        uint32        seg;
-
-        XLByteToSeg(recptr, log, seg);
-        XLogFilePath(xlogpath, tli, log, seg);
-
-        if (stat(xlogpath, &stat_buf) != 0)
-            recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
-    }
-
-    LogstreamResult.Send    = recptr;
-    LogstreamResult.Write    = recptr;
-    LogstreamResult.Flush    = recptr;
-
-    SpinLockAcquire(&walrcv->mutex);
-    walrcv->LogstreamResult    = LogstreamResult;
-    if (tli != 0)
-        walrcv->RecoveryTargetTLI = tli;
-    walrcv->in_progress = true;        /* Mark that walreceiver is in progress */
-    SpinLockRelease(&walrcv->mutex);
-
-    SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-
-    /* Start archiver to archive xlog segments written by walreceiver */
-    if (XLogArchivingActive())
-        SendPostmasterSignal(PMSIGNAL_START_ARCHIVER);
-}
-
-/*
- * Returns the byte position that walreceiver has written
- */
-XLogRecPtr
-GetWalRcvWriteRecPtr(void)
-{
-    /* use volatile pointer to prevent code rearrangement */
-    volatile WalRcvData *walrcv = WalRcv;
-    XLogRecPtr    recptr;
-
-    SpinLockAcquire(&walrcv->mutex);
-    recptr = walrcv->LogstreamResult.Write;
-    SpinLockRelease(&walrcv->mutex);
-
-    return recptr;
-}
-
-/* Write the connection information to the file */
-void
-write_conninfo_file(char *conninfo)
-{
-    FILE   *fp;
-
-    fp = AllocateFile(CONNINFO_FILENAME, "w");
-    if (!fp)
-    {
-        ereport(FATAL,
-                (errcode_for_file_access(),
-                 errmsg("could not write to file \"%s\": %m",
-                        CONNINFO_FILENAME)));
-    }
-
-    /*
-     * The format is:
-     *
-     *     conninfo string, null terminated
-     *
-     * If a connection information was not supplied (e.g., recovery.conf did not
-     * specify primary_conninfo parameter), an empty string is written, which
-     * means that the default values that are available from the environment etc
-     * are used for connection of XLOG streaming.
-     *
-     * Add 'replication' as the database name to connect to, into the tail of
-     * conninfo. Since libpq prefers a posteriorly-located setting, the database
-     * name specified by an user is always ignored.
-     */
-    if (conninfo != NULL)
-        fprintf(fp, "%s", conninfo);
-    fputs(" dbname=replication", fp);
-    fputc(0, fp);
-
-    if (FreeFile(fp))
-    {
-        ereport(FATAL,
-                (errcode_for_file_access(),
-                 errmsg("could not write to file \"%s\": %m",
-                        CONNINFO_FILENAME)));
-    }
-}
-
-/* Return a malloc'd connection information read from the file */
-static char *
-read_conninfo_file(void)
-{
-    FILE           *fp;
-    StringInfoData    buf;
-    int                ch;
-    char           *conninfo;
-
-    initStringInfo(&buf);
-
-    fp = AllocateFile(CONNINFO_FILENAME, "r");
-    if (!fp)
-    {
-        ereport(FATAL,
-                (errcode_for_file_access(),
-                 errmsg("could not read from file \"%s\": %m",
-                        CONNINFO_FILENAME)));
-    }
-
-    /* Read a string to a null-termination or the end of the file */
-    for (;;)
-    {
-        ch = fgetc(fp);
-        if (ch == 0 || ch == EOF)
-            break;
-
-        appendStringInfoChar(&buf, (char) ch);
-    }
-
-    FreeFile(fp);
-
-    conninfo = pstrdup(buf.data);
-    pfree(buf.data);
-
-    return conninfo;
-}
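
The comment in write_conninfo_file() above says the "dbname=replication" setting is appended to the tail of conninfo so that it overrides any database name the user supplied. A minimal sketch of that string construction follows, assuming a hypothetical helper named build_replication_conninfo() (not part of the patch) that also checks the allocation:

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Suffix appended last; per the patch comment, a later setting wins. */
#define REPLICATION_DBNAME_SUFFIX " dbname=replication"

/*
 * Hypothetical helper (not in the patch): return a malloc'd copy of
 * user_conninfo with the replication dbname appended, or NULL if the
 * allocation fails.  A NULL input is treated as an empty string, which
 * mirrors the "no primary_conninfo supplied" case.
 */
char *
build_replication_conninfo(const char *user_conninfo)
{
    const char *base = (user_conninfo != NULL) ? user_conninfo : "";
    size_t      len = strlen(base) + strlen(REPLICATION_DBNAME_SUFFIX) + 1;
    char       *result = malloc(len);

    if (result == NULL)
        return NULL;

    /* snprintf never writes more than len bytes, terminator included */
    snprintf(result, len, "%s%s", base, REPLICATION_DBNAME_SUFFIX);
    return result;
}
```

The caller owns the returned string and releases it with free().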
diff --git a/src/backend/postmaster/walsender.c b/src/backend/postmaster/walsender.c
index 28566de..2c46511 100644
--- a/src/backend/postmaster/walsender.c
+++ b/src/backend/postmaster/walsender.c
@@ -50,8 +50,13 @@
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/ps_status.h"
 #include "utils/resowner.h"

+/* Private, possibly out-of-date copy of shared LogstreamResult */
+extern XLogstreamResult LogstreamResult;
+
+
 WalSndCtlData *WalSndCtl = NULL;
 static WalSnd *MyWalSnd = NULL;

@@ -481,8 +486,9 @@ XLogSend(PendingMessage inMsg, PendingMessage outMsg)
     XLogRecPtr    SendRqstPtr;

     /*
-     * Invalid position means that XLOG streaming is not started yet,
-     * so we do nothing here.
+     * Invalid position means that we have not yet received the initial
+     * XLogRecPtr message from the slave that indicates where to start the
+     * streaming.
      */
     if (XLogRecPtrIsInvalid(LogstreamResult.Send))
         return true;
@@ -491,7 +497,7 @@ XLogSend(PendingMessage inMsg, PendingMessage outMsg)
     SendRqstPtr = GetWriteRecPtr();

 #ifdef REPLICATION_DEBUG
-    if (REPLICATION_DEBUG_ENABLED)
+    if (REPLICATION_DEBUG_ENABLED && XLByteLT(LogstreamResult.Send, SendRqstPtr))
         elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X",
              SendRqstPtr.xlogid, SendRqstPtr.xrecoff,
              LogstreamResult.Send.xlogid, LogstreamResult.Send.xrecoff,
@@ -911,3 +917,16 @@ UpdateOldestLogstreamResult(void)
     LogstreamResult = oldest;
     return found;
 }
+
+
+/* Report XLOG streaming progress in PS display */
+void
+ReportLogstreamResult(void)
+{
+    char    activitymsg[50];
+
+    snprintf(activitymsg, sizeof(activitymsg),
+             "streaming %X/%X",
+             LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+    set_ps_display(activitymsg, false);
+}
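
ReportLogstreamResult() above prints a position in the "%X/%X" form, and the standalone walreceiver reads the same form from its command line with sscanf(). A minimal sketch of that round trip follows, assuming a simplified stand-in type DemoRecPtr and hypothetical helper names (none of them are in the patch); the parser additionally rejects trailing characters, which the patch's sscanf() call does not do:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for XLogRecPtr as used in this patch. */
typedef struct
{
    unsigned int xlogid;    /* log file number */
    unsigned int xrecoff;   /* byte offset within the log file */
} DemoRecPtr;

/* Format a position as "streaming %X/%X"; returns snprintf's result. */
int
demo_format_activity(char *dst, size_t dstlen, DemoRecPtr ptr)
{
    return snprintf(dst, dstlen, "streaming %X/%X",
                    ptr.xlogid, ptr.xrecoff);
}

/*
 * Parse "%X/%X" into *out.  Returns 1 on success, 0 on malformed input.
 * The extra %c conversion only succeeds if something follows the second
 * number, so exactly 2 conversions means the whole string was consumed.
 */
int
demo_parse_recptr(const char *text, DemoRecPtr *out)
{
    unsigned int id;
    unsigned int off;
    char         trailing;

    if (sscanf(text, "%X/%X%c", &id, &off, &trailing) != 2)
        return 0;

    out->xlogid = id;
    out->xrecoff = off;
    return 1;
}
```

The longest possible message, "streaming FFFFFFFF/FFFFFFFF", is 27 characters, so the 50-byte activitymsg buffer used above has room to spare.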
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index b5f7260..ff3e659 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,7 +25,6 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
-#include "postmaster/walreceiver.h"
 #include "postmaster/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
@@ -119,7 +118,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         size = add_size(size, BgWriterShmemSize());
         size = add_size(size, AutoVacuumShmemSize());
         size = add_size(size, WalSndShmemSize());
-        size = add_size(size, WalRcvShmemSize());
         size = add_size(size, BTreeShmemSize());
         size = add_size(size, SyncScanShmemSize());
 #ifdef EXEC_BACKEND
@@ -218,7 +216,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
     BgWriterShmemInit();
     AutoVacuumShmemInit();
     WalSndShmemInit();
-    WalRcvShmemInit();

     /*
      * Set up other modules that need some shared memory space
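
The shared-memory total in CreateSharedMemoryAndSemaphores() is accumulated through add_size() instead of a plain "+" so that an overflowing request is caught rather than silently wrapping. A minimal sketch of that idea follows, assuming a hypothetical demo_add_size() that reports overflow through its return value (the real add_size() raises an error instead, as far as I know):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper (not in the patch): store a + b in *sum and
 * return 1, or return 0 and leave *sum untouched if the addition
 * would overflow size_t.
 */
int
demo_add_size(size_t a, size_t b, size_t *sum)
{
    /* a + b > SIZE_MAX, rearranged so the check itself cannot wrap */
    if (b > SIZE_MAX - a)
        return 0;

    *sum = a + b;
    return 1;
}
```

Removing the WalRcvShmemSize() term, as this hunk does, simply drops one addend from the chain; the remaining calls are unaffected.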
diff --git a/src/bin/walreceiver/Makefile b/src/bin/walreceiver/Makefile
new file mode 100644
index 0000000..28932fb
--- /dev/null
+++ b/src/bin/walreceiver/Makefile
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/walreceiver
+#
+# Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# $PostgreSQL$
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "PostgreSQL WAL receiver utility"
+subdir = src/bin/walreceiver
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS = walreceiver.o $(WIN32RES)
+
+all: submake-libpq walreceiver
+
+%: %.o $(WIN32RES)
+    $(CC) $(CFLAGS) $^ $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X)
+
+walreceiver: $(OBJS)
+
+install: all installdirs
+    $(INSTALL_PROGRAM) walreceiver$(X)   '$(DESTDIR)$(bindir)'/walreceiver$(X)
+
+installdirs:
+    $(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+    rm -f $(addprefix '$(DESTDIR)$(bindir)'/, $(addsuffix $(X), $(PROGRAMS)))
+
+clean distclean maintainer-clean:
+    rm -f $(addsuffix $(X), $(PROGRAMS)) $(addsuffix .o, $(PROGRAMS))
diff --git a/src/bin/walreceiver/walreceiver.c b/src/bin/walreceiver/walreceiver.c
new file mode 100644
index 0000000..01e6f07
--- /dev/null
+++ b/src/bin/walreceiver/walreceiver.c
@@ -0,0 +1,505 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiver.c
+ *
+ * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
+ * is in charge of receiving the XLOG stream on the standby server. It is
+ * launched by the startup process and connects to the primary server.
+ * It attempts to keep receiving XLOG records from the primary
+ * server and writing them to disk as long as the connection is alive.
+ * It also notifies the startup
+ * process of the location of the XLOG records available. This enables
+ * the startup process to read XLOG records from the XLOG stream and apply
+ * them to make a replica of the primary database.
+ *
+ * Normal termination is by SIGTERM or an end-of-streaming message from the
+ * primary server, which instructs the walreceiver to exit(0). Emergency
+ * termination is by SIGQUIT; like backends, walreceiver will simply
+ * abort and exit on SIGQUIT. A close of the connection and a FATAL error
+ * are treated as not a crash but approximately normal termination.
+ *
+ * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *      $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/pmsignal.h"
+#include "storage/shmem.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* libpq connection to the primary server. */
+static PGconn *streamConn = NULL;
+
+/*
+ * These variables are used similarly to openLogFile/Id/Seg/Off in xlog.c
+ */
+static int    recvFile = -1;
+static uint32 recvId = 0;
+static uint32 recvSeg = 0;
+static uint32 recvOff = 0;
+
+/*
+ * ZeroedRecPtr indicates the byte position that we have already zeroed. It is
+ * updated when walreceiver writes a half-filled page that needs to be zeroed.
+ * ZeroedBuffer points to a zeroed buffer used for zeroing.
+ */
+static XLogRecPtr    ZeroedRecPtr = {0, 0};
+static char           *ZeroedBuffer;
+
+/* Signal handlers */
+static void WalRcvQuickDieHandler(SIGNAL_ARGS);
+
+/* Prototypes for private functions */
+static void WalRcvLoop(void);
+static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
+static void XLogWalRcvFlush(XLogRecPtr recptr);
+static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
+static int OpenPhysicalXLog(uint32 log, uint32 seg);
+
+
+static XLogRecPtr writtenPtr;
+static XLogRecPtr flushedPtr;
+
+TimeLineID ThisTimeLineID;
+
+static void
+usage(const char *progname)
+{
+    printf(_("%s is an internal utility to receive WAL from another PostgreSQL instance.\n\n"), progname);
+    printf(_("Usage:\n  %s <target TLI> <starting XLOG location> <connection string>\n"), progname);
+    printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+
+/* Main entry point for walreceiver process */
+int
+main(int argc, char *argv[])
+{
+    char   *conninfo;
+    TimeLineID RecoveryTargetTLI;
+    char   *s;
+
+    if (argc > 3)
+    {
+        if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+        {
+            usage(argv[0]);
+            exit(0);
+        }
+        if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
+        {
+            puts("walreceiver (PostgreSQL) " PG_VERSION);
+            exit(0);
+        }
+    }
+    else
+    {
+        usage(argv[0]);
+        exit(1);
+    }
+
+    /* Properly accept or ignore signals the postmaster might send us */
+    pqsignal(SIGHUP, SIG_IGN);
+    pqsignal(SIGINT, SIG_IGN);
+    pqsignal(SIGTERM, SIG_DFL);
+    pqsignal(SIGQUIT, WalRcvQuickDieHandler);    /* hard crash time */
+    pqsignal(SIGALRM, SIG_IGN);
+    pqsignal(SIGPIPE, SIG_IGN);
+    pqsignal(SIGUSR1, SIG_IGN);
+    pqsignal(SIGUSR2, SIG_IGN);
+
+    /* Reset some signals that are accepted by postmaster but not here */
+    pqsignal(SIGCHLD, SIG_DFL);
+    pqsignal(SIGTTIN, SIG_DFL);
+    pqsignal(SIGTTOU, SIG_DFL);
+    pqsignal(SIGCONT, SIG_DFL);
+    pqsignal(SIGWINCH, SIG_DFL);
+
+    /* We allow SIGQUIT (quickdie) at all times */
+#ifdef BROKEN
+#ifdef HAVE_SIGPROCMASK
+    sigdelset(&BlockSig, SIGQUIT);
+#else
+    BlockSig &= ~(sigmask(SIGQUIT));
+#endif
+
+    /* Unblock signals (they were blocked when the postmaster forked us) */
+    PG_SETMASK(&UnBlockSig);
+#endif
+
+    /* Get the target timeline and starting XLOG location from command line */
+    RecoveryTargetTLI = strtoul(argv[1], &s, 10);
+    if (*s != '\0')
+    {
+        fprintf(stderr, "invalid TLI: %s\n", argv[1]);
+        exit(1);
+    }
+    if (sscanf(argv[2], "%X/%X", &writtenPtr.xlogid, &writtenPtr.xrecoff) != 2)
+    {
+        fprintf(stderr, "invalid recptr: %s\n", argv[2]);
+        exit(1);
+    }
+
+    /* Build the connection string used to connect to the primary */
+    conninfo = malloc(strlen(argv[3]) + strlen(" dbname=replication") + 1);
+    sprintf(conninfo, "%s dbname=replication", argv[3]);
+
+    /* Set up a connection for XLOG streaming */
+    streamConn = PQstartXLogStreaming(conninfo,
+                                      writtenPtr.xlogid,
+                                      writtenPtr.xrecoff);
+    if (PQstatus(streamConn) != CONNECTION_OK)
+    {
+        fprintf(stderr, "could not connect to the primary server: %s\n",
+                PQerrorMessage(streamConn));
+        exit(1);
+    }
+
+    /*
+     * Confirm that the current timeline of the primary is the same
+     * as the recovery target timeline.
+     */
+    ThisTimeLineID = PQtimeline(streamConn);
+    if (ThisTimeLineID != RecoveryTargetTLI)
+    {
+        fprintf(stderr, "timeline %u of the primary does not match recovery target timeline %u\n",
+                ThisTimeLineID, RecoveryTargetTLI);
+        exit(1);
+    }
+
+    ZeroedBuffer = (char *) malloc(XLOG_BLCKSZ);
+    memset(ZeroedBuffer, 0, XLOG_BLCKSZ);
+
+    /* Main loop of walreceiver */
+    WalRcvLoop();
+
+    return 0;
+}
+
+/* Main loop of walreceiver process */
+static void
+WalRcvLoop(void)
+{
+    char       *buf;
+    bool        finishing_seg;
+    bool        fsync_requested;
+    int            len;
+    XLogRecPtr    recptr;
+
+    /* Loop until end-of-streaming or error */
+    for (;;)
+    {
+        bool    fsynced = false;
+
+#ifdef NOT_USED
+        /*
+         * Emergency bailout if postmaster has died.  This is to avoid the
+         * necessity for manual cleanup of all postmaster children.
+         */
+        if (!PostmasterIsAlive(true))
+            exit(1);
+
+        /*
+         * Exit walreceiver if we're not in recovery. This should not happen,
+         * but cross-check the status here.
+         */
+        if (!RecoveryInProgress())
+        {
+            fprintf(stderr, "cannot continue XLOG streaming, recovery has already ended\n");
+            exit(1);
+        }
+#endif
+
+        /* Receive XLogData message (wait for new message to arrive) */
+        len = PQgetXLogData(streamConn, &buf,
+                            (int *) &recptr.xlogid, (int *) &recptr.xrecoff,
+                            (char *) &finishing_seg, (char *) &fsync_requested, 0);
+
+        if (len < 0)    /* end-of-streaming or error */
+            break;
+
+        if (buf == NULL)    /* should not happen */
+            continue;
+
+#ifdef REPLICATION_DEBUG
+        fprintf(stderr, "xlog recv result %X/%X:%s%s; write %X/%X; flush %X/%X\n",
+                recptr.xlogid, recptr.xrecoff,
+                finishing_seg ? " finishing_seg" : "",
+                fsync_requested ? " fsync_requested" : "",
+                writtenPtr.xlogid, writtenPtr.xrecoff,
+                flushedPtr.xlogid, flushedPtr.xrecoff);
+#endif
+
+        /*
+         * The level of synchronization between the two servers depends on
+         * when the standby reports a "success" of XLOG streaming to the
+         * primary. For example, the following timings can be considered:
+         *
+         *     A "success" is returned after
+         *         #1 receiving the logs and placing them in memory
+         *         #2 writing them to the disk
+         *         #3 fsyncing them to the disk
+         *         #4 replaying them
+         *         ...etc
+         *
+         * We can choose only #2 now.
+         *
+         * Note: In #1 and #2, the logs might disappear if the standby fails
+         * before they have reliably reached the disk. But, since such
+         * missing logs are guaranteed to exist on the primary side,
+         * the transaction is not lost in the whole system (i.e., the standby
+         * can recover all transactions from the primary).
+         */
+
+        XLogWalRcvWrite(buf, len, recptr, &fsynced);
+
+        /*
+         * The logs in the XLogData message were written successfully,
+         * so we mark the message already consumed.
+         */
+        PQmarkConsumed(streamConn);
+
+        /*
+         * If we just wrote the whole last page of a logfile segment but
+         * had not fsynced it yet, fsync the segment immediately.  This
+         * avoids having to go back and re-open prior segments when an
+         * fsync request comes along later.
+         *
+         * Of course, if asked to fsync but not, do so.
+         */
+        if (!fsynced && (fsync_requested || finishing_seg))
+        {
+            XLogWalRcvFlush(recptr);
+
+            /*
+             * XXX: Should we signal bgwriter to start a restartpoint
+             * if we've consumed too much xlog since the last one, like
+             * in normal processing? But this is not worth doing unless
+             * a restartpoint can be created independently from a
+             * checkpoint record.
+             *
+             * Heikki:
+             * No. The startup process is responsible for that when it
+             * replays the WAL. We're just storing the WAL to disk, the
+             * checkpoint won't do anything before it's been replayed as well.
+             */
+        }
+        /*
+         * If fsync is not requested or was already done, we send a "success"
+         * to the primary before issuing fsync for end-of-segment.
+         */
+        if (finishing_seg || (fsynced && fsync_requested))
+        {
+            if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
+                                (int) fsynced) == -1)
+            {
+                fprintf(stderr, "could not send a message to the primary: %s\n",
+                        PQerrorMessage(streamConn));
+                exit(1);
+            }
+        }
+    }
+
+    if (len == -1)    /* end-of-streaming */
+    {
+        PGresult *res;
+
+        res = PQgetResult(streamConn);
+        if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        {
+            PQclear(res);
+            exit(0);
+        }
+        PQclear(res);
+    }
+
+    /* error */
+    fprintf(stderr, "could not read xlog records: %s",
+            PQerrorMessage(streamConn));
+    exit(1);
+}
+
+/*
+ * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm,
+ * so we need to stop what we're doing and exit.
+ */
+static void
+WalRcvQuickDieHandler(SIGNAL_ARGS)
+{
+#ifdef BROKEN
+    PG_SETMASK(&BlockSig);
+#endif
+
+    exit(2);
+}
+
+/*
+ * Write the log to disk.
+ *
+ * fsynced is set to true if the log was fsyned by O_DIRECT.
+ */
+static void
+XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
+{
+    int        startoff;
+    int        endoff;
+
+    if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
+    {
+        /*
+         * XLOG segment files will be re-read in recovery operation soon,
+         * so we don't need to advise the OS to release any cache page.
+         */
+        if (recvFile >= 0 && close(recvFile))
+        {
+            fprintf(stderr, "could not close log file %u, segment %u: %m",
+                    recvId, recvSeg);
+            exit(3);
+        }
+        recvFile = -1;
+
+        /* Create/use new log file */
+        XLByteToPrevSeg(recptr, recvId, recvSeg);
+        recvFile = OpenPhysicalXLog(recvId, recvSeg);
+        recvOff = 0;
+    }
+
+    /* Make sure we have the current logfile open */
+    if (recvFile < 0)
+    {
+        XLByteToPrevSeg(recptr, recvId, recvSeg);
+        recvFile = OpenPhysicalXLog(recvId, recvSeg);
+        recvOff = 0;
+    }
+
+    /* Calculate the start/end file offset of the received logs */
+    endoff = recptr.xrecoff % XLogSegSize;
+    startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;
+
+    /*
+     * Re-zero the page so that bytes beyond what we've written will look
+     * like zeroes and not valid XLOG records. Only end of the page which we
+     * wrote to need to be zeroed. Of course, we can skip zeroing the pages
+     * full of the XLOG records. Save the end position of the already zeroed
+     * area at the variable ZeroedRecPtr, and avoid zeroing the same page
+     * two or more times.
+     *
+     * This must precede the writing of the actual logs. Otherwise, a crash
+     * before re-zeroing would cause a corrupted page.
+     */
+    if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
+    {
+        int        zlen;
+
+        zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
+        WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
+        ZeroedRecPtr = recptr;
+        ZeroedRecPtr.xrecoff += zlen;
+    }
+
+    /* Write out the logs */
+    WritePhysicalXLog(buf, len, startoff);
+    writtenPtr = recptr;
+
+    /* Let the startup process know how far we've advanced */
+    printf("%X/%X\n", writtenPtr.xlogid, writtenPtr.xrecoff);
+
+    /* Report XLOG streaming progress in PS display */
+    ReportLogstreamResult();
+}
+
+/* Flush the log to disk */
+static void
+XLogWalRcvFlush(XLogRecPtr recptr)
+{
+    fsync(recvFile);
+
+    flushedPtr = recptr;
+}
+
+/* Physical write to the given logs */
+static void
+WritePhysicalXLog(char *from, Size nbytes, int startoff)
+{
+    /* Need to seek in the file? */
+    if (recvOff != startoff)
+    {
+        if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
+        {
+            fprintf(stderr, "could not seek in log file %u, segment %u to offset %u: %s\n",
+                    recvId, recvSeg, startoff, strerror(errno));
+            exit(3);
+        }
+        recvOff = startoff;
+    }
+
+    /* OK to write the logs */
+    errno = 0;
+    if (write(recvFile, from, nbytes) != nbytes)
+    {
+        /* if write didn't set errno, assume no disk space */
+        if (errno == 0)
+            errno = ENOSPC;
+        fprintf(stderr, "could not write to log file %u, segment %u "
+                        "at offset %u, length %lu: %s",
+                recvId, recvSeg,
+                recvOff, (unsigned long) nbytes, strerror(errno));
+        exit(3);
+    }
+
+    /* Update state for write */
+    recvOff += nbytes;
+}
+
+static int
+OpenPhysicalXLog(uint32 log, uint32 seg)
+{
+    char        path[MAXPGPATH];
+    int            fd;
+
+    XLogFilePath(path, ThisTimeLineID, log, seg);
+
+    /*
+     * Try to use existent file (checkpoint maker may have created it already)
+     */
+    fd = open(path, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+    if (fd < 0)
+    {
+        fprintf(stderr, "could not open file \"%s\" (log file %u, segment %u): %s",
+                path, log, seg, strerror(errno));
+        exit(2);
+    }
+    return fd;
+}
+
+/* Report XLOG streaming progress in PS display */
+void
+ReportLogstreamResult(void)
+{
+#ifdef BROKEN
+    char    activitymsg[50];
+
+    snprintf(activitymsg, sizeof(activitymsg),
+             "streaming %X/%X",
+             writtenPtr.xlogid, writtenPtr.xrecoff);
+    set_ps_display(activitymsg, false);
+#endif
+}
+
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0e32f04..8ae62fe 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -191,22 +191,6 @@ typedef struct CheckpointStatsData

 extern CheckpointStatsData CheckpointStats;

-/*
- * LogstreamResult indicates the byte positions that we have already
- * sent/written/fsynced. This is used for management of XLOG streaming.
- */
-typedef struct
-{
-    XLogRecPtr    Send;    /* last byte + 1 sent to the standby */
-    XLogRecPtr    Write;    /* last byte + 1 written out in the standby */
-    XLogRecPtr    Flush;    /* last byte + 1 flushed in the standby */
-} XLogstreamResult;
-
-extern XLogstreamResult LogstreamResult;
-
-extern char *TriggerFile;
-
-
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern void XLogBackgroundFlush(void);
diff --git a/src/include/postmaster/walreceiver.h b/src/include/postmaster/walreceiver.h
deleted file mode 100644
index 8e34172..0000000
--- a/src/include/postmaster/walreceiver.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * walreceiver.h
- *      Exports from postmaster/walreceiver.c.
- *
- * Portions Copyright (c) 2009-2009, PostgreSQL Global Development Group
- *
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#ifndef _WALRECEIVER_H
-#define _WALRECEIVER_H
-
-#include "storage/spin.h"
-
-/* Shared memory area for management of walreceiver process */
-typedef struct
-{
-    pid_t    pid;            /* walreceiver's process id, or 0 */
-
-    /*
-     * in_progress indicates whether walreceiver is in progress
-     * (or just starting up). This flag is set to TRUE when
-     * startup process requests walreceiver to start XLOG streaming,
-     * and FALSE when walreceiver exits.
-     */
-    bool    in_progress;
-
-    /*
-     * LogstreamResult indicates the byte positions that have been
-     * already streamed. This is shared by walreceiver and startup
-     * process, and used to advance XLOG streaming and recovery
-     * cooperatively.
-     */
-    XLogstreamResult LogstreamResult;
-
-    /*
-     * recovery target timeline; must be the same as the current
-     * timeline of the primary.
-     */
-    TimeLineID    RecoveryTargetTLI;
-
-    slock_t    mutex;        /* locks shared variables shown above */
-} WalRcvData;
-
-extern void    WalReceiverMain(void);
-extern Size WalRcvShmemSize(void);
-extern void WalRcvShmemInit(void);
-extern bool WalRcvInProgress(void);
-extern void WaitNextXLogAvailable(XLogRecPtr recptr);
-extern void ShutdownWalRcv(void);
-extern void WaitForTrigger(void);
-extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr);
-extern XLogRecPtr GetWalRcvWriteRecPtr(void);
-extern void write_conninfo_file(char *conninfo);
-
-#endif   /* _WALRECEIVER_H */
diff --git a/src/include/postmaster/walsender.h b/src/include/postmaster/walsender.h
index e547cb3..bd669e1 100644
--- a/src/include/postmaster/walsender.h
+++ b/src/include/postmaster/walsender.h
@@ -16,6 +16,17 @@
 #include "storage/spin.h"

 /*
+ * LogstreamResult indicates the byte positions that we have already
+ * sent/written/fsynced. This is used for management of XLOG streaming.
+ */
+typedef struct
+{
+    XLogRecPtr    Send;    /* last byte + 1 sent to the standby */
+    XLogRecPtr    Write;    /* last byte + 1 written out in the standby */
+    XLogRecPtr    Flush;    /* last byte + 1 flushed in the standby */
+} XLogstreamResult;
+
+/*
  * Each walsender has a WalSnd struct in shared memory.
  *
  * links: list link for any list the WalSnd struct is in. A recycled WalSnd

Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Thu, Sep 17, 2009 at 8:32 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Some random comments:

Thanks for the comments.

> I don't think we need the new PM_SHUTDOWN_3 postmaster state. We can
> treat walsenders the same as the archive process, and kill and wait for
> both of them to die in PM_SHUTDOWN_2 state.

OK, I'll use PM_SHUTDOWN_2 for walsender instead of PM_SHUTDOWN_3.

> I think there's something wrong with the napping in walsender. When I
> perform pg_switch_xlog(), it takes surprisingly long for it to trickle
> to the standby. When I put a little proxy program in between the master
> and slave that delays all messages from the slave to the master by one
> second, it got worse, even though I would expect the master to still
> keep sending WAL at full speed. I get logs like this:

Probably this is because XLOG records following XLOG_SWITCH are
sent to the standby, too. Though those records are obviously not used
for recovery, they are sent because walsender doesn't know where
XLOG_SWITCH is.

The difficulty is that there might be many XLOG_SWITCH records in the XLOG
files which are going to be sent by walsender. How should walsender
learn their locations? One possible solution is to make walsender
parse the XLOG files and search for XLOG_SWITCH. But this is overkill,
I think.

I don't think that an XLOG switch is requested often, or that it is
sensitive to response time in most cases. So it's not worth changing
walsender to skip the XLOG following XLOG_SWITCH, I think. Thoughts?

> 2009-09-17 14:14:09.932 EEST LOG:  xlog send request 0/38000428; send
> 0/38000000; write 0/38000000
> 2009-09-17 14:14:09.932 EEST LOG:  xlog read request 0/38000428; send
> 0/38000428; write 0/38000000
>
> It looks like it's having 100 or 200 ms naps in between. Also, I
> wouldn't expect to see so many "read request" acknowledgments from the
> slave. The master doesn't really need to know how far the slave is,
> except in synchronous replication when it has requested a flush to
> slave. Another reason why master needs to know is so that the master can
> recycle old log files, but for that we'd really only need an
> acknowledgment once per WAL file or even less.

Do you mean that a new protocol message is required for asking the standby
how far replication has completed? In the synchronous case, the backend should
not have to wait for one acknowledgement per XLOG file, for performance reasons.

> Why does XLogSend() care about page boundaries? Perhaps it's a leftover
> from the old approach that read from wal_buffers?

That is to avoid sending a partially-filled XLOG *record*, which simplifies the
logic by which the startup process waits for the next XLOG record to become
available, i.e., the startup process doesn't need to take care of a
partially-sent record.

> Do we really need the support for asynchronous backend libpq commands?
> Could walsender just keep blasting WAL to the slave, and only try to
> read an acknowledgment after it has requested one, by setting
> XLOGSTREAM_FLUSH flag. Or maybe we should be putting the socket into
> non-blocking mode.

Yes, that is required, especially for synchronous replication. Receiving the
acknowledgement should not keep subsequent XLOG sending waiting.
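
The point about not blocking on acknowledgements can be illustrated with a minimal sketch. It assumes plain POSIX descriptors rather than the patch's libpq and pq_wait machinery; `AckPos`, `set_nonblocking` and `try_read_ack` are hypothetical names, and the fixed-size ack payload is an assumption made only for this example.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical ack payload: the position the standby has written up to. */
typedef struct
{
    uint32_t xlogid;
    uint32_t xrecoff;
} AckPos;

/* Put a descriptor into non-blocking mode; returns 0 on success, -1 on error. */
static int
set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    return (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) ? -1 : 0;
}

/*
 * Try to read one acknowledgement without blocking.
 *
 * Returns 1 if a complete ack was read, 0 if nothing is available yet
 * (the caller can keep sending), and -1 on EOF or error. A real
 * implementation would buffer partial reads; this sketch treats a
 * short read as an error to stay small.
 */
static int
try_read_ack(int fd, AckPos *ack)
{
    ssize_t n;

    assert(ack != NULL);
    n = read(fd, ack, sizeof(*ack));
    if (n == (ssize_t) sizeof(*ack))
        return 1;
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return 0;
    return -1;
}
```

With this shape, the sending loop would call `try_read_ack()` between sends and only wait for an ack when a flush had actually been requested.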

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Heikki Linnakangas wrote:
> Heikki Linnakangas wrote:
>> I'm thinking that walreceiver should be a stand-alone program that the
>> startup process launches, similar to how it invokes restore_command in
>> PITR recovery. Instead of using system(), though, it would use
>> fork+exec, and a pipe to communicate.
> 
> Here's a WIP patch to do that, over your latest posted patch. I've also
> pushed this to my git repository at
> git://git.postgresql.org/git/users/heikki/postgres.git, "replication"
> branch.
> 
> I'll continue reviewing...

BTW, my modified patch doesn't correctly zero-fill new WAL segments.
Needs to be fixed...

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Fri, Sep 18, 2009 at 2:47 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Heikki Linnakangas wrote:
>> I'm thinking that walreceiver should be a stand-alone program that the
>> startup process launches, similar to how it invokes restore_command in
>> PITR recovery. Instead of using system(), though, it would use
>> fork+exec, and a pipe to communicate.
>
> Here's a WIP patch to do that, over your latest posted patch. I've also
> pushed this to my git repository at
> git://git.postgresql.org/git/users/heikki/postgres.git, "replication"
> branch.

In my environment, I cannot use git protocol for some reason.
Could you export your repository so that it can be accessed also via http?
BTW, I seem to be able to access http://git.postgresql.org/git/bucardo.git.
http://www.kernel.org/pub/software/scm/git/docs/user-manual.html#exporting-via-http

How should we advance development of SR?
Should I concentrate on the primary side, and leave the standby side to you?
When I change something, should I make a patch for the latest SR source in your
git repo, and submit it?

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> Hi,
> 
> On Fri, Sep 18, 2009 at 2:47 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> Heikki Linnakangas wrote:
>>> I'm thinking that walreceiver should be a stand-alone program that the
>>> startup process launches, similar to how it invokes restore_command in
>>> PITR recovery. Instead of using system(), though, it would use
>>> fork+exec, and a pipe to communicate.
>> Here's a WIP patch to do that, over your latest posted patch. I've also
>> pushed this to my git repository at
>> git://git.postgresql.org/git/users/heikki/postgres.git, "replication"
>> branch.
> 
> In my environment, I cannot use git protocol for some reason.
> Could you export your repository so that it can be accessed also via http?

Sure, it should be accessible via HTTP as well:
http://git.postgresql.org/git/users/heikki/postgres.git

> How should we advance development of SR?
> Should I concentrate on the primary side, and leave the standby side to you?
> When I change something, should I make a patch for the latest SR source in your
> git repo, and submit it?

Hmm, yeah, let's do that.

Right now, I'm trying to understand the page boundary stuff and partial
page handling in ReadRecord and walsender.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Thu, Sep 17, 2009 at 5:08 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> I'm thinking that walreceiver should be a stand-alone program that the
> startup process launches, similar to how it invokes restore_command in
> PITR recovery. Instead of using system(), though, it would use
> fork+exec, and a pipe to communicate.

This approach is OK if the stand-alone walreceiver is handled properly
by the startup process, like a child process under postmaster:

* Handling of some interrupts: SIGHUP, SIGTERM?, SIGINT, SIGQUIT...
  For example, the startup process would need to rethrow the interrupt
  from postmaster to walreceiver.

* Communication with other child processes: stats collector? syslogger?...
  For example, the log messages generated by walreceiver should also
  be collected by syslogger if requested.

For now, I think that a pipe is enough for communication between the
startup process and walreceiver. There was also the idea of passing
XLOG to the startup process via wal_buffers, for which a pipe is not
suitable, but I think that is overkill.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Having gone through the patch now in more detail, I think it's in pretty
good shape. I'm happy with the overall design, except that I haven't
been able to make up my mind if walreceiver should indeed be a
stand-alone program as discussed, or a postmaster child process as in
the patch you submitted. Putting that question aside for a moment,
here's some minor things, in no particular order:

- The async API in PQgetXLogData is quite different from the other
commands. It's close to the API from PQgetCopyData(), but doesn't return
a malloc'd buffer like PQgetCopyData does. I presume that's to optimize
away the extra memcpy step? I don't think that's really necessary, I
don't recall any complaints about that in PQgetCopyData(), and if it
does become an issue, it could be optimized away by mallocing the buffer
first and reading directly to that.

- Can we avoid sprinkling XLogStreamingAllowed() calls to places where
we check if WAL-logging is required (nbtsort.c, copy.c etc.). I think we
need a new macro to encapsulate (XLogArchivingActive() ||
XLogStreamingAllowed()).

- Is O_DIRECT ever a good idea in walreceiver? If it's really direct and
doesn't get cached, the startup process will need to read from disk.

- Can we replace read/write_conninfo with just a long-enough field in
shared mem? Would be simpler. (this is moot if we go with the
stand-alone walreceiver program and pass it as a command-line argument)

- walreceiver shouldn't die on connection error, just to be restarted by
startup process. Can we add error handling a la bgwriter and have a
retry loop within walreceiver? (again, if we go with a stand-alone
walreceiver program, it's probably better to have startup process
responsible to restart walreceiver, as it is now)

- pq_wait in backend waits until you can read or write at least 1 byte.
There is no guarantee that you can send or read the whole message
without blocking. We'd have to put the socket in non-blocking mode for
that. I'm not sure what the implications of this are.

- we should include system_identifier somewhere in the replication
startup handshake. Otherwise you can connect to server from a different
system and have logs shipped, if they happen to be roughly at the same
point in WAL. Replay will almost certainly fail, but we should error
earlier.

- I know I said we should have just asynchronous replication at first,
but looking ahead, how would you do synchronous? What kind of signaling
is needed between walreceiver and startup process for that?

- 'replication' shouldn't be a real database.
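
The macro suggested in the second item above could look roughly like the following sketch. The name `XLogIsNeeded` and the helper `can_skip_wal` are assumptions made for illustration, and the two settings are replaced by plain variables so that the fragment compiles on its own; in the server, `XLogArchivingActive()` and `XLogStreamingAllowed()` are the existing checks.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the server settings, only so the sketch is self-contained. */
static bool archiving_active = false;
static bool streaming_allowed = false;

#define XLogArchivingActive()   (archiving_active)
#define XLogStreamingAllowed()  (streaming_allowed)

/*
 * Hypothetical macro: true if some consumer (archiver or walsender)
 * needs complete WAL, so WAL-skipping optimizations must be disabled.
 */
#define XLogIsNeeded() \
    (XLogArchivingActive() || XLogStreamingAllowed())

/*
 * Example call site in the style of the checks in nbtsort.c or copy.c:
 * WAL-logging may be skipped only for a relation created in the current
 * transaction, and only when nobody needs the WAL.
 */
static bool
can_skip_wal(bool rel_created_in_this_xact)
{
    return rel_created_in_this_xact && !XLogIsNeeded();
}
```

Call sites would then test a single condition instead of repeating both checks.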


I found the paging logic in walsender confusing, and didn't like the
idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely
walreceiver knows how to split the WAL into files without such a flag. I
reworked that logic, I think it's easier to understand now. I kept the
support for the flag in libpq and the protocol for now, but it should be
removed too, or repurposed to indicate that pg_switch_xlog() was done in
the master. I've pushed that to 'replication-orig' branch in my git
repository, attached is the same as a diff against your SR_0914.patch.
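
To illustrate why the receiver does not need an end-of-segment flag, here is a minimal sketch of splitting a received byte range into per-segment writes from the WAL position alone. It assumes a flat 64-bit byte position and 16 MB segments, which is a simplification of the xlogid/xrecoff addressing used in the patch; `SegChunk` and `split_by_segment` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed segment size (16 MB). */
#define SEG_SIZE ((uint64_t) 16 * 1024 * 1024)

/* One contiguous write that falls entirely within a single segment file. */
typedef struct
{
    uint64_t segno;             /* segment number */
    uint32_t offset;            /* byte offset within the segment */
    size_t   len;               /* number of bytes to write there */
} SegChunk;

/*
 * Split the byte range [start, start + len) into chunks that do not
 * cross a segment boundary. Returns the number of chunks stored in
 * out[], at most max_chunks.
 */
static int
split_by_segment(uint64_t start, size_t len, SegChunk *out, int max_chunks)
{
    int n = 0;

    assert(out != NULL);
    while (len > 0 && n < max_chunks)
    {
        uint64_t off = start % SEG_SIZE;
        uint64_t room = SEG_SIZE - off;     /* bytes left in this segment */
        size_t   take = (len < room) ? len : (size_t) room;

        out[n].segno = start / SEG_SIZE;
        out[n].offset = (uint32_t) off;
        out[n].len = take;
        n++;

        start += take;
        len -= take;
    }
    return n;
}
```

A chunk whose end reaches `SEG_SIZE` tells the receiver that the segment is complete, so it can fsync and close the file without any hint from the sender.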

I need a break from this patch, so I'll take a closer look at Simon's
hot standby now. Meanwhile, can you work on the above items and submit a
new version, please?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 2,10 ****
  # PostgreSQL recovery config file
  # -------------------------------
  #
! # Edit this file to provide the parameters that PostgreSQL
! # needs to perform an archive recovery of a database, or
! # a log-streaming replication.
  #
  # If "recovery.conf" is present in the PostgreSQL data directory, it is
  # read on postmaster startup.  After successful recovery, it is renamed
--- 2,10 ----
  # PostgreSQL recovery config file
  # -------------------------------
  #
! # Edit this file to provide the parameters that PostgreSQL needs to
! # perform an archive recovery of a database, or to act as a log-streaming
! # replication standby.
  #
  # If "recovery.conf" is present in the PostgreSQL data directory, it is
  # read on postmaster startup.  After successful recovery, it is renamed
***************
*** 83,89 ****
  #---------------------------------------------------------------------------
  #
  # When standby_mode is enabled, the PostgreSQL server will work as
! # the standby. It tries to connect to the primary according to the
  # connection settings primary_conninfo, and receives XLOG records
  # continuously.
  #
--- 83,89 ----
  #---------------------------------------------------------------------------
  #
  # When standby_mode is enabled, the PostgreSQL server will work as
! # a standby. It tries to connect to the primary according to the
  # connection settings primary_conninfo, and receives XLOG records
  # continuously.
  #
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 2645,2653 **** XLogFileClose(void)
       * WAL segment files will not be re-read in normal operation, so we advise
       * the OS to release any cached pages.    But do not do so if WAL archiving
       * or streaming is active, because archiver and walsender process could use
!      * the cache to read the WAL segment, respectively.  Also, don't bother
!      * with it if we are using O_DIRECT, since the kernel is presumably not
!      * caching in that case.
       */
  #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
      if (!XLogArchivingActive() && !WalSndInProgress() &&
--- 2645,2653 ----
       * WAL segment files will not be re-read in normal operation, so we advise
       * the OS to release any cached pages.    But do not do so if WAL archiving
       * or streaming is active, because archiver and walsender process could use
!      * the cache to read the WAL segment.  Also, don't bother with it if we
!      * are using O_DIRECT, since the kernel is presumably not caching in that
!      * case.
       */
  #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
      if (!XLogArchivingActive() && !WalSndInProgress() &&
***************
*** 3481,3487 **** FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
                          startlsn.xlogid, startlsn.xrecoff)));
      }

!     return ReadRecord(RecPtr, emode);
  }

  /*
--- 3481,3487 ----
                          startlsn.xlogid, startlsn.xrecoff)));
      }

!     return ReadRecord(RecPtr, emode);
  }

  /*
***************
*** 5284,5290 **** exitStreamingRecovery(void)
       */
      ShutdownWalRcv();

!     /* We are no longer in streaming recovery state */
      InStreamingRecovery = false;

      ereport(LOG,
--- 5284,5290 ----
       */
      ShutdownWalRcv();

!     /* We are no longer in streaming recovery state */
      InStreamingRecovery = false;

      ereport(LOG,
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 289,295 **** typedef enum
      PM_WAIT_BACKENDS,            /* waiting for live backends to exit */
      PM_SHUTDOWN,                /* waiting for bgwriter to do shutdown ckpt */
      PM_SHUTDOWN_2,                /* waiting for archiver to finish */
-     PM_SHUTDOWN_3,                /* waiting for walsenders to finish */
      PM_WAIT_DEAD_END,            /* waiting for dead_end children to exit */
      PM_NO_CHILDREN                /* all important children have exited */
  } PMState;
--- 289,294 ----
***************
*** 1640,1646 **** retry1:
      if (proto == XLOG_STREAMING_CODE && !am_walsender)
      {
          am_walsender = true;
!         /* No packets other than regular one should not follow */
          return ProcessStartupPacket(port, SSLdone);
      }

--- 1639,1645 ----
      if (proto == XLOG_STREAMING_CODE && !am_walsender)
      {
          am_walsender = true;
!         /* No packets other than regular one should follow */
          return ProcessStartupPacket(port, SSLdone);
      }

***************
*** 2404,2420 **** reaper(SIGNAL_ARGS)
                   */
                  Assert(Shutdown > NoShutdown);

!                 if (PgArchPID != 0)
                  {
                      /* Waken archiver for the last time */
!                     signal_child(PgArchPID, SIGUSR2);
!                     pmState = PM_SHUTDOWN_2;
!                 }
!                 else if (WalSndInProgress())
!                 {
                      /* Waken walsenders for the last time */
                      SignalWalSenders(SIGUSR2);
!                     pmState = PM_SHUTDOWN_3;
                  }
                  else
                      pmState = PM_WAIT_DEAD_END;
--- 2403,2418 ----
                   */
                  Assert(Shutdown > NoShutdown);

!                 if (PgArchPID != 0 || WalSndInProgress())
                  {
                      /* Waken archiver for the last time */
!                     if (PgArchPID != 0)
!                         signal_child(PgArchPID, SIGUSR2);
!
                      /* Waken walsenders for the last time */
                      SignalWalSenders(SIGUSR2);
!
!                     pmState = PM_SHUTDOWN_2;
                  }
                  else
                      pmState = PM_WAIT_DEAD_END;
***************
*** 2499,2510 **** reaper(SIGNAL_ARGS)
                   ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
                    WalRcvInProgress())))
                  PgArchPID = pgarch_start();
!             else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress())
!             {
!                 SignalWalSenders(SIGUSR2);
!                 pmState = PM_SHUTDOWN_3;
!             }
!             else
                  pmState = PM_WAIT_DEAD_END;
              continue;
          }
--- 2497,2503 ----
                   ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) &&
                    WalRcvInProgress())))
                  PgArchPID = pgarch_start();
!             else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress())
                  pmState = PM_WAIT_DEAD_END;
              continue;
          }
***************
*** 2611,2618 **** CleanupBackend(int pid,
                   * advance to the next shutdown step.
                   */
                  if (bp->child_type == BACKEND_TYPE_WALSND &&
!                     pmState == PM_SHUTDOWN_3 &&
!                     !WalSndInProgress())
                      pmState = PM_WAIT_DEAD_END;
              }
              DLRemove(curr);
--- 2604,2611 ----
                   * advance to the next shutdown step.
                   */
                  if (bp->child_type == BACKEND_TYPE_WALSND &&
!                     pmState == PM_SHUTDOWN_2 &&
!                     !WalSndInProgress() && PgArchPID == 0)
                      pmState = PM_WAIT_DEAD_END;
              }
              DLRemove(curr);
*** a/src/backend/postmaster/walreceiver.c
--- b/src/backend/postmaster/walreceiver.c
***************
*** 100,108 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
  static void WalRcvLoop(void);
  static void    InitWalRcv(void);
  static void    WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced);
! static void XLogWalRcvFlush(XLogRecPtr recptr);
! static void WritePhysicalXLog(char *from, Size nbytes, int startoff);
  static char *read_conninfo_file(void);

  /* Main entry point for walreceiver process */
--- 100,107 ----
  static void WalRcvLoop(void);
  static void    InitWalRcv(void);
  static void    WalRcvKill(int code, Datum arg);
! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr);
! static void XLogWalRcvFlush(void);
  static char *read_conninfo_file(void);

  /* Main entry point for walreceiver process */
***************
*** 228,235 **** WalRcvLoop(void)
      /* Loop until end-of-streaming or error */
      for (;;)
      {
-         bool    fsynced = false;
-
          /*
           * Emergency bailout if postmaster has died.  This is to avoid the
           * necessity for manual cleanup of all postmaster children.
--- 227,232 ----
***************
*** 298,304 **** WalRcvLoop(void)
           * can recover all transactions from the primary).
           */

!         XLogWalRcvWrite(buf, len, recptr, &fsynced);

          /*
           * The logs in the XLogData message were written successfully,
--- 295,301 ----
           * can recover all transactions from the primary).
           */

!         XLogWalRcvWrite(buf, len, recptr);

          /*
           * The logs in the XLogData message were written successfully,
***************
*** 307,357 **** WalRcvLoop(void)
          PQmarkConsumed(streamConn);

          /*
!          * If fsync is not requested or was already done, we send a "success"
!          * to the primary before issuing fsync for end-of-segment.
           */
!         if (fsynced || !fsync_requested)
!         {
!             if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
!                                 (int) fsynced) == -1)
!                 ereport(FATAL,
!                         (errmsg("could not send a message to the primary: %s",
!                                 PQerrorMessage(streamConn))));
!         }
!
!         /*
!          * If we just wrote the whole last page of a logfile segment but
!          * had not fsynced it yet, fsync the segment immediately.  This
!          * avoids having to go back and re-open prior segments when an
!          * fsync request comes along later.
!          *
!          * Of course, if asked to fsync but not, do so.
!          */
!         if (!fsynced && (fsync_requested || finishing_seg))
!         {
!             XLogWalRcvFlush(recptr);
!
!             if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff,
!                                 1) == -1)
!                 ereport(FATAL,
!                         (errmsg("could not send a message to the primary: %s",
!                                 PQerrorMessage(streamConn))));
!
!             /*
!              * If the segment is ready to copy to archival storage,
!              * notify the archiver so.
!              */
!             if (finishing_seg && XLogArchivingActive())
!                 XLogArchiveNotifySeg(recvId, recvSeg);
!
!             /*
!              * XXX: Should we signal bgwriter to start a restartpoint
!              * if we've consumed too much xlog since the last one, like
!              * in normal processing? But this is not worth doing unless
!              * a restartpoint can be created independently from a
!              * checkpoint record.
!              */
!         }
      }

      if (len == -1)    /* end-of-streaming */
--- 304,314 ----
          PQmarkConsumed(streamConn);

          /*
!          * If the primary requested us to fsync, do so now and send
!          * an acknowledgement.
           */
!         if (fsync_requested)
!             XLogWalRcvFlush();
      }

      if (len == -1)    /* end-of-streaming */
***************
*** 511,589 **** WalRcvInProgress(void)
   * fsynced is set to true if the log was fsynced by O_DIRECT.
   */
  static void
! XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
  {
      int        startoff;
!     int        endoff;

!     START_CRIT_SECTION();

!     if (!XLByteInPrevSeg(recptr, recvId, recvSeg))
      {
!         bool    use_existent;

!         /*
!          * XLOG segment files will be re-read in recovery operation soon,
!          * so we don't need to advise the OS to release any cache page.
!          */
!         if (recvFile >= 0 && close(recvFile))
              ereport(PANIC,
                      (errcode_for_file_access(),
!                      errmsg("could not close log file %u, segment %u: %m",
!                             recvId, recvSeg)));
!         recvFile = -1;
!
!         /* Create/use new log file */
!         XLByteToPrevSeg(recptr, recvId, recvSeg);
!         use_existent = true;
!         recvFile = XLogFileInit(recvId, recvSeg,
!                                   &use_existent, true);
!         recvOff = 0;
!     }

!     /* Make sure we have the current logfile open */
!     if (recvFile < 0)
!     {
!         XLByteToPrevSeg(recptr, recvId, recvSeg);
!         recvFile = XLogFileOpen(recvId, recvSeg);
!         recvOff = 0;
!     }

!     /* Calculate the start/end file offset of the received logs */
!     endoff = recptr.xrecoff % XLogSegSize;
!     startoff = ((endoff == 0) ? XLogSegSize : endoff) - len;

      /*
!      * Re-zero the page so that bytes beyond what we've written will look
!      * like zeroes and not valid XLOG records. Only end page which we are
!      * writing need to be zeroed. Of course, we can skip zeroing the pages
!      * full of the XLOG records. Save the end position of the already zeroed
!      * area at the variable ZeroedRecPtr, and avoid zeroing the same page
!      * two or more times.
       *
       * This must precede the writing of the actual logs. Otherwise, a crash
!      * before re-zeroing would cause a corrupted page.
       */
!     if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0)
      {
          int        zlen;

!         zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ;
!         WritePhysicalXLog(ZeroedBuffer, zlen, endoff);
          ZeroedRecPtr = recptr;
          ZeroedRecPtr.xrecoff += zlen;
-     }

!     /* Write out the logs */
!     WritePhysicalXLog(buf, len, startoff);
!     LogstreamResult.Send    = recptr;
!     LogstreamResult.Write    = recptr;
!
!     if (sync_method == SYNC_METHOD_OPEN ||
!         sync_method == SYNC_METHOD_OPEN_DSYNC)
!     {
!         LogstreamResult.Flush = recptr;
!         *fsynced = true;        /* logs were already fsynced */
      }

      /* Update shared-memory status */
--- 468,623 ----
   * fsynced is set to true if the log was fsynced by O_DIRECT.
   */
  static void
! XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
  {
      int        startoff;
!     int        byteswritten;

!     START_CRIT_SECTION(); /* XXX: Why? */

!     while (nbytes > 0)
      {
!         int        segbytes;
!         uint32    tmp;

!         if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
!         {
!             bool    use_existent;
!
!             /*
!              * XLOG segment files will be re-read in recovery operation soon,
!              * so we don't need to advise the OS to release any cache page.
!              */
!             if (recvFile >= 0)
!             {
!                 /*
!                  * fsync() before we switch to next file. We would otherwise
!                  * have to reopen this file to fsync it later
!                  */
!                 XLogWalRcvFlush();
!                 if (close(recvFile) != 0)
!                     ereport(PANIC,
!                             (errcode_for_file_access(),
!                              errmsg("could not close log file %u, segment %u: %m",
!                                     recvId, recvSeg)));
!             }
!             recvFile = -1;
!
!             /* Create/use new log file */
!             XLByteToSeg(recptr, recvId, recvSeg);
!             use_existent = true;
!             recvFile = XLogFileInit(recvId, recvSeg,
!                                     &use_existent, true);
!             recvOff = 0;
!         }
!
!         /* Calculate the start offset of the received logs */
!         startoff = recptr.xrecoff % XLogSegSize;
!
!         if (startoff + nbytes > XLOG_SEG_SIZE)
!             segbytes = XLOG_SEG_SIZE - startoff;
!         else
!             segbytes = nbytes;
!
!         /* Need to seek in the file? */
!         if (recvOff != startoff)
!         {
!             if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
!                 ereport(PANIC,
!                         (errcode_for_file_access(),
!                          errmsg("could not seek in log file %u, "
!                                 "segment %u to offset %u: %m",
!                                 recvId, recvSeg, startoff)));
!             recvOff = startoff;
!         }
!
!         /* OK to write the logs */
!         errno = 0;
!
!         byteswritten = write(recvFile, buf, segbytes);
!         if (byteswritten <= 0)
!         {
!             /* if write didn't set errno, assume no disk space */
!             if (errno == 0)
!                 errno = ENOSPC;
              ereport(PANIC,
                      (errcode_for_file_access(),
!                      errmsg("could not write to log file %u, segment %u "
!                             "at offset %u, length %lu: %m",
!                             recvId, recvSeg,
!                             recvOff, (unsigned long) segbytes)));
!         }

!         /* Update state for read */
!         tmp = recptr.xrecoff + byteswritten;
!         if (tmp < recptr.xrecoff)
!             recptr.xlogid++; /* overflow */
!         recptr.xrecoff = tmp;

!         recvOff += byteswritten;
!         nbytes -= byteswritten;
!         buf += byteswritten;
!
!         LogstreamResult.Send    = recptr;
!         LogstreamResult.Write    = recptr;
!
!         if (sync_method == SYNC_METHOD_OPEN ||
!             sync_method == SYNC_METHOD_OPEN_DSYNC)
!         {
!             LogstreamResult.Flush = recptr;
!         }
!
!         /*
!          * If the segment is ready to copy to archival storage,
!          * notify the archiver so.
!          */
!         if ((recptr.xrecoff % XLOG_SEG_SIZE == 0) && XLogArchivingActive())
!             XLogArchiveNotifySeg(recvId, recvSeg);
!
!         /*
!          * XXX: Should we signal bgwriter to start a restartpoint
!          * if we've consumed too much xlog since the last one, like
!          * in normal processing? But this is not worth doing unless
!          * a restartpoint can be created independently from a
!          * checkpoint record.
!          */
!     }

      /*
!      * Zero the rest of the last page we wrote to, so that bytes beyond what
!      * we've written will look like zeroes and not valid XLOG records. Save
!      * the end position of the already zeroed area at the variable
!      * ZeroedRecPtr, and avoid zeroing the same page two or more times.
       *
       * This must precede the writing of the actual logs. Otherwise, a crash
!      * before re-zeroing would cause a corrupted page. XXX: that's not really
!      * an issue, a hard crash could leave the page half-flushed anyway. And we
!      * have CRC to protect from that anyway, this zeroing business isn't
!      * absolutely necessary anyway.
       */
!     if (XLByteLT(ZeroedRecPtr, recptr) && recptr.xrecoff % XLOG_BLCKSZ != 0)
      {
          int        zlen;

!         zlen = XLOG_BLCKSZ - recptr.xrecoff % XLOG_BLCKSZ;
!
!         byteswritten = write(recvFile, ZeroedBuffer, zlen);
!         if (byteswritten != zlen)
!         {
!             /* if write didn't set errno, assume no disk space */
!             if (errno == 0)
!                 errno = ENOSPC;
!             ereport(PANIC,
!                     (errcode_for_file_access(),
!                      errmsg("could not write to log file %u, segment %u "
!                             "at offset %u, length %lu: %m",
!                             recvId, recvSeg,
!                             recvOff, (unsigned long) zlen)));
!         }
          ZeroedRecPtr = recptr;
          ZeroedRecPtr.xrecoff += zlen;

!         recvOff += byteswritten;
      }

      /* Update shared-memory status */
***************
*** 594,600 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)
          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
          XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
!         if (*fsynced)
              XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);
      }
--- 628,635 ----
          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send);
          XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write);
!         if (sync_method == SYNC_METHOD_OPEN ||
!             sync_method == SYNC_METHOD_OPEN_DSYNC)
              XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);
      }
***************
*** 607,666 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced)

  /* Flush the log to disk */
  static void
! XLogWalRcvFlush(XLogRecPtr recptr)
  {
!     START_CRIT_SECTION();
!
!     issue_xlog_fsync(recvFile, recvId, recvSeg);
!
!     LogstreamResult.Flush = recptr;
!
!     /* Update shared-memory status */
      {
          /* use volatile pointer to prevent code rearrangement */
          volatile WalRcvData *walrcv = WalRcv;

          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);
-     }
-
-     END_CRIT_SECTION();
- }

! /* Physical write to the given logs */
! static void
! WritePhysicalXLog(char *from, Size nbytes, int startoff)
! {
!     /* Need to seek in the file? */
!     if (recvOff != startoff)
!     {
!         if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
!             ereport(PANIC,
!                     (errcode_for_file_access(),
!                      errmsg("could not seek in log file %u, "
!                             "segment %u to offset %u: %m",
!                             recvId, recvSeg, startoff)));
!         recvOff = startoff;
!     }

!     /* OK to write the logs */
!     errno = 0;
!     if (write(recvFile, from, nbytes) != nbytes)
!     {
!         /* if write didn't set errno, assume no disk space */
!         if (errno == 0)
!             errno = ENOSPC;
!         ereport(PANIC,
!                 (errcode_for_file_access(),
!                  errmsg("could not write to log file %u, segment %u "
!                         "at offset %u, length %lu: %m",
!                         recvId, recvSeg,
!                         recvOff, (unsigned long) nbytes)));
      }
-
-     /* Update state for write */
-     recvOff += nbytes;
  }

  /*
--- 642,674 ----

  /* Flush the log to disk */
  static void
! XLogWalRcvFlush(void)
  {
!     if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
      {
          /* use volatile pointer to prevent code rearrangement */
          volatile WalRcvData *walrcv = WalRcv;

+         START_CRIT_SECTION();
+
+         issue_xlog_fsync(recvFile, recvId, recvSeg);
+
+         LogstreamResult.Flush = LogstreamResult.Write;
+
+         /* Update shared-memory status */
          SpinLockAcquire(&walrcv->mutex);
          XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush);
          SpinLockRelease(&walrcv->mutex);

!         END_CRIT_SECTION();

!         /* Let the primary know */
!         if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid,
!                             LogstreamResult.Flush.xrecoff, 1) == -1)
!             ereport(FATAL,
!                     (errmsg("could not send a message to the primary: %s",
!                             PQerrorMessage(streamConn))));
      }
  }

  /*
*** a/src/backend/postmaster/walsender.c
--- b/src/backend/postmaster/walsender.c
***************
*** 113,122 **** static void WalSndQuickDieHandler(SIGNAL_ARGS);
  static int    WalSndLoop(void);
  static void    InitWalSnd(void);
  static void    WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr);
  static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
  static bool ProcessStreamMsgs(PendingMessage inMsg);

  /* Main entry point for walsender process */
  int
  WalSenderMain(void)
--- 113,127 ----
  static int    WalSndLoop(void);
  static void    InitWalSnd(void);
  static void    WalSndKill(int code, Datum arg);
! static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg);
  static bool ProcessStreamMsgs(PendingMessage inMsg);

+ /*
+  * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
+  */
+ #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2)
+
  /* Main entry point for walsender process */
  int
  WalSenderMain(void)
***************
*** 382,400 **** WalSndKill(int code, Datum arg)
  }

  /*
!  * Read the log into buffer.
!  *
!  * startoff is the file offset where we start reading the log from; nbytes is
!  * the number of bytes which needs to be read; recptr is the last byte + 1 to
!  * read.
   */
  void
! XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  {
      char path[MAXPGPATH];
!
!     /* Don't cross a segment boundary */
!     Assert(startoff + nbytes <= XLogSegSize);

  #ifdef REPLICATION_DEBUG
      if (REPLICATION_DEBUG_ENABLED)
--- 387,399 ----
  }

  /*
!  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
   */
  void
! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  {
      char path[MAXPGPATH];
!     uint32 startoff;

  #ifdef REPLICATION_DEBUG
      if (REPLICATION_DEBUG_ENABLED)
***************
*** 404,464 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
               LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
  #endif

!     if (!XLByteInPrevSeg(recptr, sendId, sendSeg))
      {
!         /* Switch to another logfile segment */
!         if (sendFile >= 0)
!             close(sendFile);

!         XLByteToPrevSeg(recptr, sendId, sendSeg);
!         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);

!         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
!         if (sendFile < 0)
!             ereport(FATAL,
!                     (errcode_for_file_access(),
!                      errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                             path, sendId, sendSeg)));
!         sendOff = 0;
!     }

!     /* Make sure we have the current logfile open */
!     if (sendFile < 0)
!     {
!         XLByteToPrevSeg(recptr, sendId, sendSeg);
!         XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);

!         sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
!         if (sendFile < 0)
!             ereport(FATAL,
!                     (errcode_for_file_access(),
!                      errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                             path, sendId, sendSeg)));
!         sendOff = 0;
!     }

!     /* Need to seek in the file? */
!     if (sendOff != startoff)
!     {
!         if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
              ereport(FATAL,
                      (errcode_for_file_access(),
!                      errmsg("could not seek in log file %u, segment %u to offset %u: %m",
!                             sendId, sendSeg, startoff)));
!         sendOff = startoff;
!     }
!
!     if (read(sendFile, buf, nbytes) != nbytes)
!     {
!         ereport(FATAL,
!                 (errcode_for_file_access(),
!                  errmsg("could not read from log file %u, segment %u, offset %u, "
!                         "length %lu: %m",
!                         sendId, sendSeg, sendOff, (unsigned long) nbytes)));
      }
-
-     /* Update state for read */
-     sendOff += nbytes;
  }

  /*
--- 403,469 ----
               LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
  #endif

!     while (nbytes > 0)
      {
!         int segbytes;
!         int readbytes;
!         uint32 tmp;

!         startoff = recptr.xrecoff % XLOG_SEG_SIZE;

!         if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
!         {
!             /* Switch to another logfile segment */
!             if (sendFile >= 0)
!                 close(sendFile);
!
!             XLByteToSeg(recptr, sendId, sendSeg);
!             XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
!
!             sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
!             if (sendFile < 0)
!                 ereport(FATAL, /* XXX: Why FATAL? */
!                         (errcode_for_file_access(),
!                          errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
!                                 path, sendId, sendSeg)));
!             sendOff = 0;
!         }

!         /* Need to seek in the file? */
!         if (sendOff != startoff)
!         {
!             if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
!                 ereport(FATAL,
!                         (errcode_for_file_access(),
!                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
!                                 sendId, sendSeg, startoff)));
!             sendOff = startoff;
!         }

!         /* How many bytes are within this segment? */
!         if (nbytes > (XLOG_SEG_SIZE - startoff))
!             segbytes = XLOG_SEG_SIZE - startoff;
!         else
!             segbytes = nbytes;

!         readbytes = read(sendFile, buf, segbytes);
!         if (readbytes <= 0)
              ereport(FATAL,
                      (errcode_for_file_access(),
!                      errmsg("could not read from log file %u, segment %u, offset %u, "
!                             "length %lu: %m",
!                             sendId, sendSeg, sendOff, (unsigned long) segbytes)));
!
!         /* Update state for read */
!         tmp = recptr.xrecoff + readbytes;
!         if (tmp < recptr.xrecoff)
!             recptr.xlogid++; /* overflow */
!         recptr.xrecoff = tmp;
!
!         sendOff += readbytes;
!         nbytes -= readbytes;
!         buf += readbytes;
      }
  }

  /*
***************
*** 469,488 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr)
  static bool
  XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  {
-     bool        ispartialpage;
-     bool        last_iteration;
-     bool        finishing_seg;
-     int            nmsgs;
-     int            npages;
      int            res;
-     uint32        startpos;
-     uint32        startoff;
-     uint32        endpos;
      XLogRecPtr    SendRqstPtr;

      /*
!      * Invalid position means that XLOG streaming is not started yet,
!      * so we do nothing here.
       */
      if (XLogRecPtrIsInvalid(LogstreamResult.Send))
          return true;
--- 474,486 ----
  static bool
  XLogSend(PendingMessage inMsg, PendingMessage outMsg)
  {
      int            res;
      XLogRecPtr    SendRqstPtr;

      /*
!      * Invalid position means that we have not yet received the initial
!      * XLogRecPtr message from the slave that indicates where to start the
!      * streaming.
       */
      if (XLogRecPtrIsInvalid(LogstreamResult.Send))
          return true;
***************
*** 490,495 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
--- 488,497 ----
      /* Attempt to send all the records which were written to the disk */
      SendRqstPtr = GetWriteRecPtr();

+     /* Quick exit if nothing to do */
+     if (!XLByteLT(LogstreamResult.Send, SendRqstPtr))
+         return true;
+
  #ifdef REPLICATION_DEBUG
      if (REPLICATION_DEBUG_ENABLED)
          elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X",
***************
*** 520,631 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg)
       * sending in the last page. We must initialize all of them to
       * keep the compiler quiet.
       */
-     nmsgs = 0;
-     npages = 0;
-     startpos = 0;
-     startoff = 0;
-     endpos = XLOG_BLCKSZ;

      while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
      {
          /*
!          * Advance LogstreamResult.Send to end of current page. If this
!          * is a first loop iteration (i.e., in the case where npages is 0),
!          * it might indicate a halfway position or cross a logid boundary,
!          * so alignment is needed. Otherwise, since it's guaranteed that
!          * LogstreamResult.Send indicates end of previous page and we have
!          * not crossed a logid boundary yet in this loop iteration,
!          * we have only to increment it by XLOG_BLCKSZ bytes.
           */
!         if (npages == 0)
!         {
!             startpos = LogstreamResult.Send.xrecoff % XLOG_BLCKSZ;
!             startoff = LogstreamResult.Send.xrecoff % XLogSegSize - startpos;

!             LogstreamResult.Send.xrecoff += XLOG_BLCKSZ - startpos;
!             if (LogstreamResult.Send.xrecoff > XLogFileSize)
!             {
!                 LogstreamResult.Send.xlogid++;
!                 LogstreamResult.Send.xrecoff %= XLogFileSize;
!             }
!         }
!         else
!             LogstreamResult.Send.xrecoff += XLOG_BLCKSZ;
!         ispartialpage = XLByteLT(SendRqstPtr, LogstreamResult.Send);

!         npages++;

          /*
!          * Read and send the set if this will be the last loop iteration,
!          * or if the number of pages in the set is larger than
!          * MaxPagesPerXLogData, or if we are at the end of the logfile
!          * segment.
           */
-         last_iteration = !XLByteLT(LogstreamResult.Send, SendRqstPtr);
-         if (last_iteration)
-         {
-             endpos = SendRqstPtr.xrecoff % XLOG_BLCKSZ;
-             if (endpos == 0)
-                 endpos = XLOG_BLCKSZ;
-         }
-
-         finishing_seg = !ispartialpage &&
-             (startoff + npages * XLOG_BLCKSZ) >= XLogSegSize;

!         /* Only asked to send a partial page */
!         if (ispartialpage)
!             LogstreamResult.Send = SendRqstPtr;

!         if (last_iteration ||
!             npages >= MaxPagesPerXLogData ||
!             finishing_seg)
          {
!             Size    nbytes;
!             uint8    flags = 0;
!
!             if (finishing_seg)
!                 flags |= XLOGSTREAM_END_SEG;
!
!             /*
!              * XXX: Should we request the standby to fsync the log if the
!              * current set might include a shutdown checkpoint record?
!              */
!
!             /* OK to read and send the log */
!             pq_beginasyncmsg(outMsg, 'w');
!             pq_sendint(outMsg->buf, flags, 1);
!             pq_sendint(outMsg->buf, LogstreamResult.Send.xlogid, 4);
!             pq_sendint(outMsg->buf, LogstreamResult.Send.xrecoff, 4);
!
!             nbytes = (npages - 1) * (Size) XLOG_BLCKSZ - startpos + endpos;
!
!             /*
!              * Read the log into the output buffer directly to prevent
!              * extra memcpy calls.
!              */
!             XLogRead(BufferGetStringInfo(outMsg->buf, nbytes),
!                      startoff + startpos, nbytes, LogstreamResult.Send);

!             res = pq_endasyncmsg(outMsg);
!             if (res < 0)
!                 return false;
!             if (res == 0)
!                 break;

!             /*
!              * Stop sending the log for another job (e.g., checking for
!              * interrupts) periodically.
!              */
!             if (++nmsgs > MaxMsgsPerXLogSend)
!             {
!                 pending_xlog_send = true;
!                 break;
!             }
!
!             npages = 0;
!         }

!         if (ispartialpage)
              break;
      }

--- 522,588 ----
       * sending in the last page. We must initialize all of them to
       * keep the compiler quiet.
       */

      while (XLByteLT(LogstreamResult.Send, SendRqstPtr))
      {
+         XLogRecPtr startptr;
+         XLogRecPtr endptr;
+         Size    nbytes;
+         uint8    flags = 0;
+
          /*
!          * Figure out how much to send in one message. If there's less than
!          * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
!          * MAX_SEND_SIZE bytes, but round to page boundary for efficiency.
           */
!         startptr = LogstreamResult.Send;
!         endptr = startptr;
!         endptr.xrecoff += MAX_SEND_SIZE;
!         if(endptr.xrecoff < startptr.xrecoff)
!             endptr.xlogid++; /* xrecoff overflowed */

!         /* round down to page boundary */
!         endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);

!         if (XLByteLT(SendRqstPtr, endptr))
!             endptr = SendRqstPtr;

          /*
!          * XXX: Should we request the standby to fsync the log if the
!          * current set might include a shutdown checkpoint record?
!          *
!          * Heikki: Well, we don't do that with other checkpoints, I don't
!          * see why we should at a shutdown checkpoint. However, perhaps
!          * walreceiver should do an fsync whenever the connection is lost,
!          * whatever the reason (e.g the master has been shut down) ?
           */

!         /* OK to read and send the log */
!         pq_beginasyncmsg(outMsg, 'w');
!         pq_sendint(outMsg->buf, flags, 1);
!         pq_sendint(outMsg->buf, startptr.xlogid, 4);
!         pq_sendint(outMsg->buf, startptr.xrecoff, 4);

!         if (endptr.xlogid != startptr.xlogid)
          {
!             Assert(endptr.xlogid == startptr.xlogid + 1);
!             nbytes = (0xffffffff - endptr.xrecoff) + startptr.xrecoff;
!         }
!         else
!             nbytes = endptr.xrecoff - startptr.xrecoff;

!         LogstreamResult.Send = endptr;

!         /*
!          * Read the log into the output buffer directly to prevent
!          * extra memcpy calls.
!          */
!         XLogRead(BufferGetStringInfo(outMsg->buf, nbytes), startptr, nbytes);

!         res = pq_endasyncmsg(outMsg);
!         if (res < 0)
!             return false;
!         if (res == 0)
              break;
      }


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

Sorry for the delay.

On Mon, Sep 21, 2009 at 4:51 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Having gone through the patch now in more detail, I think it's in pretty
> good shape. I'm happy with the overall design, except that I haven't
> been able to make up my mind if walreceiver should indeed be a
> stand-alone program as discussed, or a postmaster child process as in
> the patch you submitted. Putting that question aside for a moment,
> here's some minor things, in no particular order:

Thanks for the comments.

> - The async API in PQgetXLogData is quite different from the other
> commands. It's close to the API from PQgetCopyData(), but doesn't return
> a malloc'd buffer like PQgetCopyData does. I presume that's to optimize
> away the extra memcpy step?

Yes. It is there to avoid the extra memcpy.

> I don't think that's really necessary, I
> don't recall any complaints about that in PQgetCopyData(), and if it
> does become an issue, it could be optimized away by mallocing the buffer
> first and reading directly to that.

OK. I'll change PQgetXLogData() to return a malloc'd buffer, and will
remove PQmarkConsumed().

> - Can we avoid sprinkling XLogStreamingAllowed() calls to places where
> we check if WAL-logging is required (nbtsort.c, copy.c etc.). I think we
> need a new macro to encapsulate (XLogArchivingActive() ||
> XLogStreamingAllowed()).

Yes. I'll introduce a new macro XLogIsNeeded() which encapsulates
(XLogArchivingActive() || XLogStreamingAllowed()).
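
A minimal sketch of what such a macro could look like. The two flag variables and can_skip_wal() are hypothetical stand-ins for illustration; in the real tree the underlying checks are backed by configuration settings rather than plain globals.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the real configuration-backed checks. */
static bool archive_mode = false;
static bool streaming_allowed = false;

#define XLogArchivingActive()   (archive_mode)
#define XLogStreamingAllowed()  (streaming_allowed)

/* WAL-logging is required if either consumer of WAL is enabled. */
#define XLogIsNeeded()  (XLogArchivingActive() || XLogStreamingAllowed())

/*
 * Example caller (hypothetical): a bulk operation may skip WAL only when
 * nobody needs the records, neither the archiver nor a standby.
 */
static bool
can_skip_wal(void)
{
    return !XLogIsNeeded();
}
```

Call sites such as nbtsort.c and copy.c would then test only XLogIsNeeded() and would not need to change if another consumer of WAL is added later.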

> - Is O_DIRECT ever a good idea in walreceiver? If it's really direct and
> doesn't get cached, the startup process will need to read from disk.

Good point. I agree that O_DIRECT is useless if walreceiver works
together with the startup process. It might be useful only when the
stand-alone walreceiver program runs on the standby by itself.

> - Can we replace read/write_conninfo with just a long-enough field in
> shared mem? Would be simpler. (this is moot if we go with the
> stand-alone walreceiver program and pass it as a command-line argument)

Yes, if we can settle on a maximum length for conninfo. Since I could not
decide on one, I used read/write_conninfo to pass the conninfo to
walreceiver. Is a fixed size of 1024 bytes enough for conninfo?
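
For illustration, a fixed-size field could be handled roughly as below. This is only a sketch: MAXCONNINFO, WalRcvShared and walrcv_set_conninfo() are hypothetical names, and the 1024-byte size is the value asked about above, not a decided limit. An overlong string is rejected rather than silently truncated, since a truncated conninfo could point at the wrong server.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAXCONNINFO 1024        /* assumed size, including the trailing NUL */

/* Hypothetical shared-memory area read by walreceiver. */
typedef struct WalRcvShared
{
    char        conninfo[MAXCONNINFO];
} WalRcvShared;

/*
 * Copy conninfo into the fixed-size field.  Returns false, leaving the
 * field untouched, if the string does not fit.
 */
static bool
walrcv_set_conninfo(WalRcvShared *shared, const char *conninfo)
{
    size_t      len;

    assert(shared != NULL && conninfo != NULL);

    len = strlen(conninfo);
    if (len >= MAXCONNINFO)
        return false;           /* too long: caller should report an error */

    memcpy(shared->conninfo, conninfo, len + 1);    /* include the NUL */
    return true;
}
```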

> - walreceiver shouldn't die on connection error, just to be restarted by
> startup process. Can we add error handling a la bgwriter and have a
> retry loop within walreceiver? (again, if we go with a stand-alone
> walreceiver program, it's probably better to have startup process
> responsible to restart walreceiver, as it is now)

Error handling a la bgwriter? Do you mean that PG_exception_stack
should be set up to handle an ERROR exception?

Anyway, I'll change walreceiver to retry connecting to the primary
after an error occurs in PQstartXLogStreaming()/PQgetXLogData()/
PQputXLogRecPtr(). Should we set an upper limit on the number of
retries?
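
A rough sketch of a retry loop with an optional upper limit. Everything here is hypothetical: connect_fn, connect_with_retry() and the demo callback fail_n_times() are illustration-only names, and a negative max_retries is assumed to mean "retry forever".

```c
#include <assert.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical callback: one attempt to (re)connect to the primary. */
typedef bool (*connect_fn) (void *arg);

/*
 * Try to connect, retrying up to max_retries times after the first
 * failure.  A negative max_retries means no upper limit.  Returns the
 * number of attempts made on success, or -1 once the limit is exhausted.
 */
static int
connect_with_retry(connect_fn try_connect, void *arg,
                   int max_retries, unsigned int delay_secs)
{
    int         attempt;

    assert(try_connect != NULL);

    for (attempt = 1;; attempt++)
    {
        if (try_connect(arg))
            return attempt;
        if (max_retries >= 0 && attempt > max_retries)
            return -1;
        if (delay_secs > 0)
            sleep(delay_secs);  /* back off before the next attempt */
    }
}

/* Demo stand-in for a real attempt: fails *remaining times, then succeeds. */
static bool
fail_n_times(void *arg)
{
    int        *remaining = arg;

    if (*remaining > 0)
    {
        (*remaining)--;
        return false;
    }
    return true;
}
```

With the counter set to 2 and max_retries set to 5, connect_with_retry(fail_n_times, &counter, 5, 0) succeeds on the third attempt. With no limit, the loop only ends when the primary comes back or the process is told to stop.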

> - pq_wait in backend waits until you can read or write at least 1 byte.
> There is no guarantee that you can send or read the whole message
> without blocking. We'd have to put the socket in non-blocking mode for
> that. I'm not sure what the implications of this are.

Umm... AFAIK, poll and select guarantee that at least the next recv
will not block. If only 1 byte is available in the buffer, recv reads
that 1 byte and returns immediately. I'm not sure whether send can still
block after poll reports the socket as writable. In my environment
(RHEL5), send does not seem to block.
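
To make the send side concrete, here is a small sketch, assuming a POSIX system. On a blocking stream socket, send() of a message larger than the free buffer space may wait until everything is queued, even if poll() just reported the socket as writable. With O_NONBLOCK set, send() instead returns a short count or fails with EAGAIN. set_nonblocking() and send_some() are hypothetical helper names, not functions from the patch.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/* Put a descriptor into non-blocking mode; returns 0 on success. */
static int
set_nonblocking(int fd)
{
    int         flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Send as much of buf as the socket accepts right now, never waiting.
 * Returns the number of bytes queued (possibly less than len, possibly 0),
 * or -1 on a hard error.  The fd must already be non-blocking.
 */
static ssize_t
send_some(int fd, const char *buf, size_t len)
{
    size_t      sent = 0;

    assert(buf != NULL);

    while (sent < len)
    {
        struct pollfd pfd = {.fd = fd, .events = POLLOUT};
        int         rc = poll(&pfd, 1, 0);     /* zero timeout: just ask */
        ssize_t     n;

        if (rc < 0)
        {
            if (errno == EINTR)
                continue;
            return -1;
        }
        if (rc == 0)
            break;              /* not writable now, caller retries later */

        n = send(fd, buf + sent, len - sent, 0);
        if (n < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;          /* buffer filled up in the meantime */
            if (errno == EINTR)
                continue;
            return -1;
        }
        sent += (size_t) n;
    }
    return (ssize_t) sent;
}
```

The caller has to remember how much of the message is still pending and resume from there once the socket becomes writable again.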

> - we should include system_identifier somewhere in the replication
> startup handshake. Otherwise you can connect to server from a different
> system and have logs shipped, if they happen to be roughly at the same
> point in WAL. Replay will almost certainly fail, but we should error
> earlier.

Agreed. I'll do that.

> - I know I said we should have just asynchronous replication at first,
> but looking ahead, how would you do synchronous?

As the previous patch did, I'm going to make walsender read the latest
XLOG from wal_buffers, introduce the signaling between a backend
and walsender, and keep a backend waiting until the specified XLOG
has been written or fsynced in the standby.

> What kind of signaling
> is needed between walreceiver and startup process for that?

I was thinking that the synchronization mode in which a client waits
until XLOG has been applied is not necessary right now, so no
signaling is required between those processes yet. But does
HS require this capability?

> - 'replication' shouldn't be a real database.

Agreed. I'll remove that.

> I found the paging logic in walsender confusing, and didn't like the
> idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely
> walreceiver knows how to split the WAL into files without such a flag. I
> reworked that logic, I think it's easier to understand now. I kept the
> support for the flag in libpq and the protocol for now, but it should be
> removed too, or repurposed to indicate that pg_switch_xlog() was done in
> the master. I've pushed that to 'replication-orig' branch in my git
> repository, attached is the same as a diff against your SR_0914.patch.
>
> I need a break from this patch, so I'll take a closer look at Simon's
> hot standby now. Meanwhile, can you work on the above items and submit a
> new version, please?

Yeah, sure.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Mon, Sep 21, 2009 at 4:51 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> I found the paging logic in walsender confusing, and didn't like the
> idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely
> walreceiver knows how to split the WAL into files without such a flag. I
> reworked that logic, I think it's easier to understand now. I kept the
> support for the flag in libpq and the protocol for now, but it should be
> removed too, or repurposed to indicate that pg_switch_xlog() was done in
> the master. I've pushed that to 'replication-orig' branch in my git
> repository, attached is the same as a diff against your SR_0914.patch.

In the 'replication-orig' branch, walreceiver fsyncs the previous XLOG
file after receiving new XLOG records and before writing them. This would
increase the backend's waiting time for replication in the synchronous case.
Shouldn't walreceiver fsync the XLOG file after sending the ACK (if needed)
and before receiving the next XLOG records?

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> In the 'replication-orig' branch, walreceiver fsyncs the previous XLOG
> file after receiving new XLOG records before writing them. This would
> increase the backend's waiting time for replication in synchronous case.
> The walreceiver should fsync the XLOG file after sending ACK (if needed)
> before receiving the next XLOG records?

I don't follow. Walreceiver does fsync the file just after writing it if
the fsync_requested flag was set in the message. Surely that would be
set in synchronous mode, that's what the flag is for, right?

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Thu, Sep 24, 2009 at 7:41 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Fujii Masao wrote:
>> In the 'replication-orig' branch, walreceiver fsyncs the previous XLOG
>> file after receiving new XLOG records before writing them. This would
>> increase the backend's waiting time for replication in synchronous case.
>> The walreceiver should fsync the XLOG file after sending ACK (if needed)
>> before receiving the next XLOG records?
>
> I don't follow. Walreceiver does fsync the file just after writing it if
> the fsync_requested flag was set in the message. Surely that would be
> set in synchronous mode, that's what the flag is for, right?

I'm talking about the case where fsync is issued at the end of a segment.
In this case, since the fsync_requested flag is not set,
walreceiver doesn't perform fsync in that loop. After the
next XLOG arrives, walreceiver fsyncs the previous file
in XLogWalRcvWrite().

Am I missing something?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> On Mon, Sep 21, 2009 at 4:51 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> - Can we replace read/write_conninfo with just a long-enough field in
>> shared mem? Would be simpler. (this is moot if we go with the
>> stand-alone walreceiver program and pass it as a command-line argument)
> 
> Yes, if we can decide the length of conninfo. Since I could not decide
> that, I used read/write_conninfo to tell walreceiver the conninfo. Is the
> fixed size 1024B enough for conninfo?

Yeah, that should be plenty.
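
A fixed-size field of that kind could look roughly like the sketch below. This is not the patch's code: the struct DemoWalRcvShared and the function demo_set_conninfo are hypothetical names used only for illustration, and the 1024-byte size is simply the figure floated above. The point is that the writer has to reject a conninfo string that does not fit rather than truncate it silently.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define DEMO_MAX_CONNINFO_LEN 1024  /* size discussed in the thread */

/* Hypothetical stand-in for a walreceiver control struct in shared memory. */
typedef struct
{
    char conninfo[DEMO_MAX_CONNINFO_LEN];
} DemoWalRcvShared;

/*
 * Copy conninfo into the fixed-size field. Returns false, leaving the
 * field untouched, if the string plus its terminating NUL would not fit.
 */
static bool
demo_set_conninfo(DemoWalRcvShared *shared, const char *conninfo)
{
    size_t len;

    assert(shared != NULL && conninfo != NULL);

    len = strlen(conninfo);
    if (len >= DEMO_MAX_CONNINFO_LEN)
        return false;

    /* len + 1 copies the terminating NUL as well. */
    memcpy(shared->conninfo, conninfo, len + 1);
    return true;
}
```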

>> - walreceiver shouldn't die on connection error, just to be restarted by
>> startup process. Can we add error handling a la bgwriter and have a
>> retry loop within walreceiver? (again, if we go with a stand-alone
>> walreceiver program, it's probably better to have startup process
>> responsible to restart walreceiver, as it is now)
> 
> Error handling a la bgwriter? You mean that PG_exception_stack
> should be set up to handle an ERROR exception?

Yep.

> Anyway, I'll change walreceiver to retry connecting to the primary
> after an error occurs in PQstartXLogStreaming()/PQgetXLogData()/
> PQputXLogRecPtr(). Should we set an upper limit of the number of
> the retries?

I don't think we need an upper limit.
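
In outline, a retry loop without an upper limit might look like the sketch below. It is a plain loop, not the sigsetjmp-based error handling that bgwriter uses, and every name in it (demo_connect_fn, demo_sleep_fn, connect_with_retry, demo_flaky_connect) is hypothetical; the connect callback stands in for whatever wraps PQstartXLogStreaming() in the real walreceiver.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callback types; these are not libpq or backend APIs. */
typedef bool (*demo_connect_fn) (void *arg);
typedef void (*demo_sleep_fn) (unsigned seconds);

/*
 * Keep calling try_connect until it succeeds, pausing between attempts
 * when a sleep callback is supplied. There is deliberately no upper
 * limit. Returns the number of attempts that were needed.
 */
static unsigned
connect_with_retry(demo_connect_fn try_connect, void *arg,
                   demo_sleep_fn nap, unsigned delay_secs)
{
    unsigned attempts = 1;

    assert(try_connect != NULL);

    while (!try_connect(arg))
    {
        if (nap != NULL)
            nap(delay_secs);    /* avoid a busy loop against the primary */
        attempts++;
    }
    return attempts;
}

/* Demo stub: fails as many times as the counter says, then succeeds. */
static bool
demo_flaky_connect(void *arg)
{
    unsigned *failures_left = arg;

    if (*failures_left == 0)
        return true;
    (*failures_left)--;
    return false;
}
```

With a counter of 2 passed to demo_flaky_connect, the loop returns after the third attempt. A permanently failing callback would keep it spinning forever.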

>> - pq_wait in backend waits until you can read or write at least 1 byte.
>> There is no guarantee that you can send or read the whole message
>> without blocking. We'd have to put the socket in non-blocking mode for
>> that. I'm not sure what the implications of this are.
> 
> Umm... AFAIK, poll and select guarantee that at least the subsequent
> recv will not be blocked. If there is only 1 byte available in the buffer,
> recv would read that 1 byte and return immediately. I'm not sure if send
> will get stuck even after poll is passed. In my environment (RHEL5),
> send seems not to be blocked.

Hmm, I guess you're right.

>> - I know I said we should have just asynchronous replication at first,
>> but looking ahead, how would you do synchronous?
> 
> As the previous patch did, I'm going to make walsender read the latest
> XLOG from wal_buffers, introduce the signaling between a backend
> and walsender, and keep a backend waiting until the specified XLOG
> has been written or fsynced in the standby.

Ok. I don't think walsender needs to access wal_buffers even then,
though. Once the backend has written the WAL, walsender can well read it
from disk (it will surely be in OS cache still).

>> What kind of signaling
>> is needed between walreceiver and startup process for that?
> 
> I was thinking that the synchronization mode which a client waits
> until XLOG has been applied is not necessary right now, so no
> signaling is also not required between those processes yet. But,
> HS requires this capability?

Yeah, I think it will be important with hot standby. It's a much more
useful guarantee that once COMMIT returns, the transaction is visible
in the standby, than that it's merely fsync'd to disk in the standby.

(don't need to solve it now, let's do just asynchronous mode now, but
it's something to keep in mind)

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> On Thu, Sep 24, 2009 at 7:41 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>> Fujii Masao wrote:
>>> In the 'replication-orig' branch, walreceiver fsyncs the previous XLOG
>>> file after receiving new XLOG records before writing them. This would
>>> increase the backend's waiting time for replication in synchronous case.
>>> The walreceiver should fsync the XLOG file after sending ACK (if needed)
>>> before receiving the next XLOG records?
>> I don't follow. Walreceiver does fsync the file just after writing it if
>> the fsync_requested flag was set in the message. Surely that would be
>> set in synchronous mode, that's what the flag is for, right?
> 
> That's the case where fsync is issued at the end of segment.
> In this case, since the fsync_requested flag is not set,
> walreceiver doesn't perform fsync in that loop. After the
> next XLOG arrives, walreceiver does fsync to the previous file,
> in XLogWalRcvWrite().

Ok. I don't see anything wrong with that. If the primary didn't set
fsync_requested, it's not in a hurry to get an acknowledgment.

I guess we could check *after* writing, if we just finished filling the
segment. If we did, we could fsync since we're going to fsync anyway as
soon as we receive the next message. Not sure if it's worth the trouble.
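
As a rough sketch of that check (not code from either branch): after a write, fsync right away if the primary asked for it or if the write just filled the segment. The function name should_fsync_after_write and the 16 MB segment size are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed segment size (16 MB) for this sketch. */
#define DEMO_SEG_SIZE ((uint32_t) (16 * 1024 * 1024))

/*
 * startoff: offset within the segment where the write began.
 * nbytes:   number of bytes just written (must not cross the segment end).
 *
 * Returns true if the file should be fsynced immediately after the write.
 */
static bool
should_fsync_after_write(bool fsync_requested, uint32_t startoff,
                         uint32_t nbytes)
{
    assert(startoff < DEMO_SEG_SIZE);
    assert(nbytes <= DEMO_SEG_SIZE - startoff);

    /* The primary explicitly asked for an fsync. */
    if (fsync_requested)
        return true;

    /* The write filled the segment, so an fsync is coming anyway. */
    return startoff + nbytes == DEMO_SEG_SIZE;
}
```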

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
On Thu, Sep 24, 2009 at 7:57 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
>> Anyway, I'll change walreceiver to retry connecting to the primary
>> after an error occurs in PQstartXLogStreaming()/PQgetXLogData()/
>> PQputXLogRecPtr(). Should we set an upper limit of the number of
>> the retries?
>
> I don't think we need an upper limit.

Without an upper limit, a wrong primary_conninfo setting, for example,
would make walreceiver repeat PQstartXLogStreaming() forever. Is this OK?

>>> - I know I said we should have just asynchronous replication at first,
>>> but looking ahead, how would you do synchronous?
>>
>> As the previous patch did, I'm going to make walsender read the latest
>> XLOG from wal_buffers, introduce the signaling between a backend
>> and walsender, and keep a backend waiting until the specified XLOG
>> has been written or fsynced in the standby.
>
> Ok. I don't think walsender needs to access wal_buffers even then,
> though. Once the backend has written the WAL, walsender can well read it
> from disk (it will surely be in OS cache still).

I think that, for performance, walsender should not delay sending the XLOG
until it has been written by the backend. Otherwise, the XLOG write and
send are performed serially, which would increase response time. Shouldn't
they be performed in parallel?

>>> What kind of signaling
>>> is needed between walreceiver and startup process for that?
>>
>> I was thinking that the synchronization mode which a client waits
>> until XLOG has been applied is not necessary right now, so no
>> signaling is also not required between those processes yet. But,
>> HS requires this capability?
>
> Yeah, I think it will be important with hot standby. It's a much more
> useful guarantee that once COMMIT returns, the transaction is visible
> in the standby, than that it's merely fsync'd to disk in the standby.
>
> (don't need to solve it now, let's do just asynchronous mode now, but
> it's something to keep in mind)

Okay.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Heikki Linnakangas
Date:
Fujii Masao wrote:
> On Thu, Sep 24, 2009 at 7:57 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
>>>> - I know I said we should have just asynchronous replication at first,
>>>> but looking ahead, how would you do synchronous?
>>> As the previous patch did, I'm going to make walsender read the latest
>>> XLOG from wal_buffers, introduce the signaling between a backend
>>> and walsender, and keep a backend waiting until the specified XLOG
>>> has been written or fsynced in the standby.
>> Ok. I don't think walsender needs to access wal_buffers even then,
>> though. Once the backend has written the WAL, walsender can well read it
>> from disk (it will surely be in OS cache still).
> 
> I think that walsender should not delay sending the XLOG until it has
> been written by the backend, for performance improvement. Otherwise,
> XLOG write and send are performed in serial, which would increase a
> response time. Should those be performed in parallel?

Well, sure, performance is good, but let's keep it simple for now. The
write() to disk should normally be absorbed by the OS cache and return
quickly, so it's not a big delay.

--  Heikki Linnakangas EnterpriseDB   http://www.enterprisedb.com


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Fri, Sep 25, 2009 at 7:10 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Fujii Masao wrote:
>> On Thu, Sep 24, 2009 at 7:57 PM, Heikki Linnakangas
>> <heikki.linnakangas@enterprisedb.com> wrote:
>>>>> - I know I said we should have just asynchronous replication at first,
>>>>> but looking ahead, how would you do synchronous?
>>>> As the previous patch did, I'm going to make walsender read the latest
>>>> XLOG from wal_buffers, introduce the signaling between a backend
>>>> and walsender, and keep a backend waiting until the specified XLOG
>>>> has been written or fsynced in the standby.
>>> Ok. I don't think walsender needs to access wal_buffers even then,
>>> though. Once the backend has written the WAL, walsender can well read it
>>> from disk (it will surely be in OS cache still).
>>
>> I think that walsender should not delay sending the XLOG until it has
>> been written by the backend, for performance improvement. Otherwise,
>> XLOG write and send are performed in serial, which would increase a
>> response time. Should those be performed in parallel?
>
> Well, sure, performance is good, but let's keep it simple for now. The
> write() to disk should normally be absorbed by the OS cache and return
> quickly, so it's not a big delay.

Umm... a backend should at least tell walsender the location up to which
it has written the XLOG before issuing fsync. In the current XLogWrite(),
XLogCtl->LogwrtResult.Write is updated after fsync has been performed.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Thu, Sep 24, 2009 at 5:20 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
>> Meanwhile, can you work on the above items and submit a
>> new version, please?
>
> Yeah, sure.

Attached is the patch that tackles those items, against the 'replication-orig'
branch in your git repository.

Changes:
    * Change PQgetXLogData() to return a malloc'd buffer instead of a
      pointer to the internal buffer.
    * Remove PQmarkConsumed().
    * Introduce a new macro XLogIsNeeded() which encapsulates
      (XLogArchivingActive() || XLogStreamingAllowed()).
    * Replace read/write_conninfo with just a long-enough field in shared mem.
    * Remove the 'replication' database, and support a new keyword
      'replication' in pg_hba.conf.
    * Include system_identifier in the replication startup handshake.
    * Add error handling a la bgwriter and have a retry loop within walreceiver.
    * Prevent the startup process from getting stuck when launching
      walreceiver fails.

Since we might need to change the patch further, I've not updated the
documentation yet.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments

Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
On Thu, Sep 17, 2009 at 5:08 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> Walreceiver is really a slave to the startup process. The startup
> process decides when it's launched, and it's the startup process that
> then waits for it to advance. But the way it's set up at the moment, the
> startup process needs to ask the postmaster to start it up, and it
> doesn't look very robust to me. For example, if launching walreceiver
> fails for some reason, startup process will just hang waiting for it.

I changed the postmaster to report a failed fork of walreceiver
to the startup process by resetting WalRcv->in_progress, which prevents
the startup process from getting stuck when launching walreceiver fails.
http://archives.postgresql.org/pgsql-hackers/2009-09/msg01996.php

Do you have any other concerns about robustness? If so, I'll address them.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Mon, Sep 21, 2009 at 4:51 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
> I've pushed that to 'replication-orig' branch in my git
> repository, attached is the same as a diff against your SR_0914.patch.

The following changes for crossing an xlogid boundary seem wrong, and
would break the management of some XLOG positions.

> !         /* Update state for read */
> !         tmp = recptr.xrecoff + byteswritten;
> !         if (tmp < recptr.xrecoff)
> !             recptr.xlogid++; /* overflow */
> !         recptr.xrecoff = tmp;

> !         endptr.xrecoff += MAX_SEND_SIZE;
> !         if(endptr.xrecoff < startptr.xrecoff)
> !             endptr.xlogid++; /* xrecoff overflowed */

> !         if (endptr.xlogid != startptr.xlogid)
>           {
> !             Assert(endptr.xlogid == startptr.xlogid + 1);
> !             nbytes = (0xffffffff - endptr.xrecoff) + startptr.xrecoff;
> !         }

The size of a logical XLOG file is 0xff000000. So, even if xrecoff has
not overflowed yet, we might need to cross an xlogid boundary.
I think xrecoff should be compared with XLogFileSize. Can I fix those?
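
To illustrate the point, here is a minimal sketch of advancing a position by comparing against the logical file size instead of relying on 32-bit overflow. It is not the fix itself: DemoRecPtr and demo_recptr_advance are stand-ins invented for this example, and the constants assume the default 16 MB segment size, which gives the 0xff000000 figure above.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed defaults: 16 MB segments, 255 segments per logical file. */
#define DEMO_SEG_SIZE       ((uint32_t) (16 * 1024 * 1024))
#define DEMO_SEGS_PER_FILE  (UINT32_C(0xffffffff) / DEMO_SEG_SIZE)
#define DEMO_FILE_SIZE      (DEMO_SEGS_PER_FILE * DEMO_SEG_SIZE)  /* 0xff000000 */

/* Stand-in for a two-part XLOG position (xlogid, xrecoff). */
typedef struct
{
    uint32_t xlogid;    /* logical file number */
    uint32_t xrecoff;   /* byte offset within the logical file */
} DemoRecPtr;

/*
 * Advance ptr by nbytes. The boundary test is against DEMO_FILE_SIZE,
 * so a position such as 0xfeffff00 + 0x200 moves to the next xlogid
 * even though the 32-bit addition itself would not have overflowed.
 */
static DemoRecPtr
demo_recptr_advance(DemoRecPtr ptr, uint32_t nbytes)
{
    /* Use 64-bit arithmetic so the sum can never wrap. */
    uint64_t off = (uint64_t) ptr.xrecoff + nbytes;

    while (off >= DEMO_FILE_SIZE)
    {
        ptr.xlogid++;
        off -= DEMO_FILE_SIZE;
    }

    assert(off < DEMO_FILE_SIZE);
    ptr.xrecoff = (uint32_t) off;
    return ptr;
}
```

An overflow test such as "tmp < recptr.xrecoff" only fires once the sum passes 0xffffffff, so offsets in the range 0xff000000 to 0xffffffff would be treated as valid positions in the old xlogid.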

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center


Re: Streaming Replication patch for CommitFest 2009-09

From
Alvaro Herrera
Date:
Fujii Masao wrote:
> On Thu, Sep 17, 2009 at 5:08 PM, Heikki Linnakangas
> <heikki.linnakangas@enterprisedb.com> wrote:
> > Walreceiver is really a slave to the startup process. The startup
> > process decides when it's launched, and it's the startup process that
> > then waits for it to advance. But the way it's set up at the moment, the
> > startup process needs to ask the postmaster to start it up, and it
> > doesn't look very robust to me. For example, if launching walreceiver
> > fails for some reason, startup process will just hang waiting for it.
> 
> I changed the postmaster to report the failure of  fork of the walreceiver
> to the startup process by resetting WalRcv->in_progress, which prevents
> the startup process from getting stuck when launching walreceiver fails.
> http://archives.postgresql.org/pgsql-hackers/2009-09/msg01996.php
> 
> Do you have another concern about the robustness? If yes, I'll address that.

Hmm.  Without looking at the patch at all, this seems similar to how
autovacuum does things: autovac launcher signals postmaster that a
worker needs to be started.  Postmaster proceeds to fork a worker.  This
could obviously fail for a lot of reasons.

Now, there is code in place to notify the user when forking fails, and
this is seen in the wild quite a bit more than one would like :-(  I
think it would be a good idea to have a retry mechanism in the
walreceiver startup mechanism so that recovery does not get stuck due to
transient problems.

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support


Re: Streaming Replication patch for CommitFest 2009-09

From
Fujii Masao
Date:
Hi,

On Tue, Oct 6, 2009 at 10:42 PM, Alvaro Herrera
<alvherre@commandprompt.com> wrote:
> Hmm.  Without looking at the patch at all, this seems similar to how
> autovacuum does things: autovac launcher signals postmaster that a
> worker needs to be started.  Postmaster proceeds to fork a worker.  This
> could obviously fail for a lot of reasons.

Yeah, I drew upon the autovac code.

> Now, there is code in place to notify the user when forking fails, and
> this is seen in the wild quite a bit more than one would like :-(  I
> think it would be a good idea to have a retry mechanism in the
> walreceiver startup mechanism so that recovery does not get stuck due to
> transient problems.

Agreed. The latest patch provides the retry mechanism.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center