*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2018,2023 **** SET ENABLE_SEQSCAN TO OFF;
--- 2018,2131 ----
+
+ Synchronous Replication
+
+
+ These settings control the behavior of the built-in
+ synchronous replication feature.
+ These parameters are set on the primary server that
+ sends replication data to one or more standby servers.
+
+
+
+
+ synchronous_replication (boolean)
+
+ synchronous_replication configuration parameter
+
+
+
+ Specifies whether transaction commit will wait for WAL records
+ to be replicated before the command returns a success
+ indication to the client. The default setting is off.
+ When on, there will be a delay while the client waits
+ for confirmation of successful replication. That delay
+ increases with the physical distance and network activity
+ between primary and standby. The commit wait lasts until a
+ reply from the current synchronous standby indicates it has received
+ the commit record of the transaction. Synchronous standbys must
+ already have been defined (see ).
+
+
+ This parameter can be changed at any time; the
+ behavior for any one transaction is determined by the setting in
+ effect when it commits. It is therefore possible, and useful, to have
+ some transactions replicate synchronously and others asynchronously.
+ For example, to make a single multistatement transaction commit
+ asynchronously when the default is synchronous replication, issue
+ SET LOCAL synchronous_replication TO OFF within the
+ transaction.
+
+
+
+
+
+ sync_replication_timeout (integer)
+
+ sync_replication_timeout configuration parameter
+
+
+
+ If the client has synchronous_replication set,
+ and a synchronous standby is currently available,
+ then the commit will wait for up to sync_replication_timeout
+ seconds before it returns a success indication. The commit will wait
+ forever for a confirmation when sync_replication_timeout
+ is set to 0.
+
+
+ If the client has synchronous_replication set,
+ and no synchronous standby is available at commit time, then the
+ commit does not wait at all.
+
+
+
+
+
+ synchronous_standby_names (string)
+
+ synchronous_standby_names configuration parameter
+
+
+
+ Specifies a priority-ordered list of standby names that can offer
+ synchronous replication. At any one time there will be just one
+ synchronous standby that will wake sleeping users following commit.
+ The synchronous standby will be the first named standby that is
+ both currently connected and streaming in real time from the primary
+ (as shown by a state of "STREAMING"). Other standby servers
+ listed later will become potential synchronous standbys.
+ If the current synchronous standby disconnects for whatever reason,
+ it will be replaced immediately with the next-highest-priority standby.
+ Specifying more than one standby name can allow very high availability.
+
+
+ The standby name is currently taken as the application_name of the
+ standby, as set in the primary_conninfo on the standby. Names are
+ not enforced for uniqueness. In case of duplicates one of the standbys
+ will be chosen to be the synchronous standby, though exactly which
+ one is indeterminate.
+
+
+ The default is the special entry * which matches any
+ application_name, including the default application name of
+ walsender. This is not recommended; a more carefully
+ thought-through configuration is desirable.
+
+
+ If a standby is removed from the list of servers then it will stop
+ being the synchronous standby, allowing another to take its place.
+ If the list is empty, synchronous replication will not be
+ possible, whatever the setting of synchronous_replication.
+ Standbys may also be added to the list without restarting the server.
+
+
+
+
+
+
+
Standby Servers
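
As a rough illustration of the priority rule documented for synchronous_standby_names above, the following standalone C sketch maps an application_name to its 1-based position in a comma-separated list. It treats "*" as a wildcard and returns 0 when nothing matches. The names standby_priority and name_matches are hypothetical and exist only for this example. The patch itself parses the list with SplitIdentifierString(), which also handles quoting; this sketch assumes a plain unquoted list.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: case-insensitive match of item[0..len) against name. */
static int
name_matches(const char *item, size_t len, const char *name)
{
    size_t i;

    if (strlen(name) != len)
        return 0;
    for (i = 0; i < len; i++)
    {
        if (tolower((unsigned char) item[i]) != tolower((unsigned char) name[i]))
            return 0;
    }
    return 1;
}

/*
 * Hypothetical sketch: return the 1-based position of the first list entry
 * that equals application_name or is the wildcard "*", or 0 if no entry
 * matches (meaning "not a potential synchronous standby").
 */
int
standby_priority(const char *names, const char *application_name)
{
    int         priority = 0;
    const char *p = names;

    assert(names != NULL && application_name != NULL);

    while (*p != '\0')
    {
        const char *start;
        const char *end;

        /* skip whitespace before the entry */
        while (*p == ' ' || *p == '\t')
            p++;
        start = p;

        /* find the end of the entry */
        while (*p != '\0' && *p != ',')
            p++;
        end = p;

        /* trim whitespace after the entry */
        while (end > start && (end[-1] == ' ' || end[-1] == '\t'))
            end--;

        /* step over the separator */
        if (*p == ',')
            p++;

        priority++;
        if ((end - start == 1 && *start == '*') ||
            name_matches(start, (size_t) (end - start), application_name))
            return priority;
    }

    return 0;
}
```

With the list "alpha, beta", a standby connecting with application_name "beta" would get priority 2, and an empty list yields 0 for every standby, which matches the documented rule that an empty list disables synchronous replication.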
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 875,880 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 875,1107 ----
+
+ Synchronous Replication
+
+
+ Synchronous Replication
+
+
+
+ PostgreSQL streaming replication is asynchronous by
+ default. If the primary server
+ crashes then some transactions that were committed may not have been
+ replicated to the standby server, causing data loss. The amount
+ of data loss is proportional to the replication delay at the time of
+ failover.
+
+
+
+ Synchronous replication offers the ability to confirm that all changes
+ made by a transaction have been transferred to one synchronous standby
+ server. This extends the standard level of durability
+ offered by a transaction commit. This level of protection is referred
+ to as 2-safe replication in computer science theory.
+
+
+
+ When requesting synchronous replication, each commit of a
+ write transaction will wait until confirmation is
+ received that the commit has been written to the transaction log on disk
+ of both the primary and standby server. The only possibility that data
+ can be lost is if both the primary and the standby suffer crashes at the
+ same time. This can provide a much higher level of durability, though only
+ if the sysadmin is cautious about the placement and management of the two
+ servers. Waiting for confirmation increases the user's confidence that the
+ changes will not be lost in the event of server crashes but it also
+ necessarily increases the response time for the requesting transaction.
+ The minimum wait time is the round-trip time between primary and standby.
+
+
+
+ Read only transactions and transaction rollbacks need not wait for
+ replies from standby servers. Subtransaction commits do not wait for
+ responses from standby servers, only top-level commits. Long
+ running actions such as data loading or index building do not wait
+ until the very final commit message. All two-phase commit actions
+ require commit waits, including both prepare and commit.
+
+
+
+ Basic Configuration
+
+
+ All parameters have useful default values, so we can enable
+ synchronous replication easily just by setting this on the primary:
+
+
+ synchronous_replication = on
+
+
+ When synchronous_replication is set, a commit will wait
+ for up to synchronous_replication_timeout seconds to
+ confirm that the standby has received the commit record. Both
+ synchronous_replication and
+ synchronous_replication_timeout can be set by individual
+ users, so they can be configured in the configuration file, for particular
+ users or databases, or dynamically by application programs.
+ It is possible for user sessions to reach timeout even though
+ standbys are communicating normally. In that case, the setting of
+ synchronous_replication_timeout is probably too low, though
+ you probably have other system or network issues as well.
+
+
+
+ After a commit record has been written to disk on the primary the
+ WAL record is then sent to the standby. The standby sends reply
+ messages each time a new batch of WAL data is received, unless
+ wal_receiver_status_interval is set to zero on the standby.
+ If the standby is the first matching standby, as specified in
+ synchronous_standby_names on the primary, the reply
+ messages from that standby will be used to wake users waiting for
+ confirmation that the commit record has been received. These parameters
+ allow the administrator to specify which standby servers should be
+ synchronous standbys. Note that the configuration of synchronous
+ replication is mainly on the primary.
+
+
+
+ The default setting of synchronous_replication_timeout is
+ 120 seconds to ensure that users do not wait forever if all specified
+ standby servers go down. If you wish to have stronger guarantees the
+ timeout can be set higher, or even to zero, meaning wait forever.
+ Users will stop waiting if a fast shutdown is requested, though the
+ server does not fully shut down until all outstanding WAL records are
+ transferred to standby servers.
+
+
+
+ Note also that synchronous_commit is used when the user
+ specifies synchronous_replication, overriding even an
+ explicit setting of synchronous_commit to off.
+ This is because we must write WAL to disk on the primary before we
+ replicate, to ensure the standby never gets ahead of the primary.
+
+
+
+
+
+ Planning for Performance
+
+
+ Synchronous replication usually requires carefully planned and placed
+ standby servers to ensure applications perform acceptably. Waiting
+ doesn't utilise system resources, but transaction locks continue to be
+ held until the transfer is confirmed. As a result, incautious use of
+ synchronous replication will reduce performance for database
+ applications because of increased response times and higher contention.
+
+
+
+ PostgreSQL allows the application developer
+ to specify the durability level required via replication. This can be
+ specified for the system overall, though it can also be specified for
+ specific users or connections, or even individual transactions.
+
+
+
+ For example, in an application workload
+ 10% of changes might be important customer details, while
+ 90% of changes are less important data that the business can more
+ easily survive losing, such as chat messages between users.
+
+
+
+ With synchronous replication options specified at the application level
+ (on the primary) we can offer sync rep for the most important changes,
+ without slowing down the bulk of the total workload. Application level
+ options are an important and practical tool for allowing the benefits of
+ synchronous replication for high performance applications.
+
+
+
+ You should consider that the network bandwidth must be higher than
+ the rate of generation of WAL data.
+
+
+
+
+
+ Planning for High Availability
+
+
+ The easiest and safest method of gaining High Availability using
+ synchronous replication is to configure at least two standby servers.
+ To understand why, we need to examine what can happen when you lose all
+ standby servers.
+
+
+
+ Commits made when synchronous_replication is set will wait until
+ the sync standby responds. The response may never occur if the last,
+ or only, standby should crash or the network drops. What should we do in
+ that situation?
+
+
+
+ If a standby was available immediately after commit we will wait.
+ Sitting and waiting will typically cause operational problems
+ because it is an effective outage of the primary server should all
+ sessions end up waiting. This is why we offer the facility to set
+ synchronous_replication_timeout.
+
+
+
+ Once the last synchronous standby has been lost we allow transactions
+ to skip waiting, since we know there isn't anybody to reply, or at
+ least we might expect it to be some time before one returns. You will
+ note that this provides high availability but a primary server working
+ alone could allow changes that are not replicated to other servers,
+ placing your data at risk if the primary fails also.
+
+
+
+ The best solution for avoiding data loss is to ensure you don't lose
+ your last remaining sync standby. This can be achieved by naming multiple
+ potential synchronous standbys using synchronous_standby_names.
+ The first named standby will be used as the synchronous standby. Standbys
+ listed after this will take over the role of synchronous standby if the
+ first one should fail.
+
+
+
+ When a standby first attaches to the primary, it will not yet be properly
+ synchronized. This is described as CATCHUP mode. Once
+ the lag between standby and primary reaches zero for the first time
+ we move to real-time STREAMING state.
+ The catch-up duration may be long immediately after the standby has
+ been created. If the standby is shut down, then the catch-up period
+ will increase according to the length of time the standby has been down.
+ The standby is only able to become a synchronous standby
+ once it has reached STREAMING state.
+
+
+
+ If the primary crashes while commits are waiting for acknowledgement, those
+ waiting transactions will be marked fully committed once the primary
+ database recovers.
+ There is no way to be certain that all standbys have received all
+ outstanding WAL data at time of the crash of the primary. Some
+ transactions may not show as committed on the standby, even though
+ they show as committed on the primary. The guarantee we offer is that
+ the application will not receive explicit acknowledgement of the
+ successful commit of a transaction until the WAL data is known to be
+ safely received by the standby.
+
+
+
+ If you need to re-create a standby server while transactions are
+ waiting, make sure that the commands to run pg_start_backup() and
+ pg_stop_backup() are run in a session with
+ synchronous_replication = off, otherwise those requests will wait
+ forever for the standby to appear.
+
+
+
+
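
The failover rule described under "Planning for High Availability" can be sketched in standalone C as follows. This is a minimal model under stated assumptions, not code from the patch: the Standby struct, the StandbyState enum, and choose_sync_standby are hypothetical names, and the sketch follows the documentation text (a standby must be connected, listed, and in STREAMING state, and the earliest-listed such standby wins).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the standby states named in the documentation. */
typedef enum
{
    STANDBY_STARTUP,
    STANDBY_CATCHUP,
    STANDBY_STREAMING
} StandbyState;

/* Hypothetical per-standby record kept by the primary. */
typedef struct
{
    int          connected;     /* nonzero while the standby is attached */
    int          priority;      /* 0 = not listed; 1 = first name in list */
    StandbyState state;
} Standby;

/*
 * Return the index of the current synchronous standby, or -1 if none
 * qualifies. Among connected, listed, streaming standbys the one with the
 * lowest non-zero priority number (earliest in the list) is chosen.
 */
int
choose_sync_standby(const Standby *standbys, size_t n)
{
    int     best = -1;
    size_t  i;

    assert(standbys != NULL || n == 0);

    for (i = 0; i < n; i++)
    {
        const Standby *s = &standbys[i];

        if (!s->connected || s->priority <= 0 ||
            s->state != STANDBY_STREAMING)
            continue;

        if (best < 0 || s->priority < standbys[best].priority)
            best = (int) i;
    }

    return best;
}
```

When the chosen standby disconnects, calling the function again returns the next listed standby that is streaming, and -1 once no candidate remains, which is the point at which commits would stop waiting.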
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 56,61 ****
--- 56,62 ----
#include "pg_trace.h"
#include "pgstat.h"
#include "replication/walsender.h"
+ #include "replication/syncrep.h"
#include "storage/fd.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
***************
*** 1071,1076 **** EndPrepare(GlobalTransaction gxact)
--- 1072,1085 ----
END_CRIT_SECTION();
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked the prepare, but still show as
+ * running in the procarray (twice!) and continue to hold locks.
+ */
+ SyncRepWaitForLSN(gxact->prepare_lsn);
+
records.tail = records.head = NULL;
}
***************
*** 2030,2035 **** RecordTransactionCommitPrepared(TransactionId xid,
--- 2039,2052 ----
MyProc->inCommit = false;
END_CRIT_SECTION();
+
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(recptr);
}
/*
***************
*** 2109,2112 **** RecordTransactionAbortPrepared(TransactionId xid,
--- 2126,2137 ----
TransactionIdAbortTree(xid, nchildren, children);
END_CRIT_SECTION();
+
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(recptr);
}
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 37,42 ****
--- 37,43 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/walsender.h"
+ #include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1055,1061 **** RecordTransactionCommit(void)
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
*/
! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0)
{
/*
* Synchronous commit case:
--- 1056,1062 ----
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
*/
! if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested())
{
/*
* Synchronous commit case:
***************
*** 1125,1130 **** RecordTransactionCommit(void)
--- 1126,1139 ----
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
+ /*
+ * Wait for synchronous replication, if required.
+ *
+ * Note that at this stage we have marked clog, but still show as
+ * running in the procarray and continue to hold locks.
+ */
+ SyncRepWaitForLSN(XactLastRecEnd);
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd.xrecoff = 0;
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 521,526 **** CREATE VIEW pg_stat_replication AS
--- 521,527 ----
W.write_location,
W.flush_location,
W.replay_location
+ W.sync_priority
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
WHERE S.usesysid = U.oid AND
*** a/src/backend/postmaster/autovacuum.c
--- b/src/backend/postmaster/autovacuum.c
***************
*** 1527,1532 **** AutoVacWorkerMain(int argc, char *argv[])
--- 1527,1539 ----
SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
/*
+ * Force synchronous replication off to allow regular maintenance even
+ * if we are waiting for standbys to connect. This is important to
+ * ensure we aren't blocked from performing anti-wraparound tasks.
+ */
+ SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
* Get the info about the database we're going to work on.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 1836,1842 **** retry1:
errmsg("the database system is starting up")));
break;
case CAC_SHUTDOWN:
! ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("the database system is shutting down")));
break;
--- 1836,1843 ----
errmsg("the database system is starting up")));
break;
case CAC_SHUTDOWN:
! if (!am_walsender)
! ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("the database system is shutting down")));
break;
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 13,19 **** top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! repl_gram.o
include $(top_srcdir)/src/backend/common.mk
--- 13,19 ----
include $(top_builddir)/src/Makefile.global
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! repl_gram.o syncrep.o
include $(top_srcdir)/src/backend/common.mk
*** /dev/null
--- b/src/backend/replication/syncrep.c
***************
*** 0 ****
--- 1,617 ----
+ /*-------------------------------------------------------------------------
+ *
+ * syncrep.c
+ *
+ * Synchronous replication is new as of PostgreSQL 9.1.
+ *
+ * If requested, transaction commits wait until their commit LSN is
+ * acknowledged by the standby, or the wait hits timeout.
+ *
+ * This module contains the code for waiting and release of backends.
+ * All code in this module executes on the primary. The core streaming
+ * replication transport remains within WALreceiver/WALsender modules.
+ *
+ * The essence of this design is that it isolates all logic about
+ * waiting/releasing onto the primary. The primary defines which standbys
+ * it wishes to wait for. The standby is completely unaware of the
+ * durability requirements of transactions on the primary, reducing the
+ * complexity of the code and streamlining both standby operations and
+ * network bandwidth because there is no requirement to ship
+ * per-transaction state information.
+ *
+ * The bookkeeping approach we take is that a commit is either synchronous
+ * or not synchronous (async). If it is async, we just fastpath out of
+ * here. If it is sync, then in 9.1 we wait for the flush location on the
+ * standby before releasing the waiting backend. Further complexity
+ * in that interaction is expected in later releases.
+ *
+ * The best performing way to manage the waiting backends is to have a
+ * single ordered queue of waiting backends, so that we can avoid
+ * searching through all waiters each time we receive a reply.
+ *
+ * Starting sync replication is a multi stage process. First, the standby
+ * must be a potential synchronous standby. Next, we must have caught up
+ * with the primary; that may take some time. If there is no current
+ * synchronous standby then the WALsender will offer a sync rep service.
+ *
+ * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include
+
+ #include "access/xact.h"
+ #include "access/xlog_internal.h"
+ #include "miscadmin.h"
+ #include "postmaster/autovacuum.h"
+ #include "replication/syncrep.h"
+ #include "replication/walsender.h"
+ #include "storage/latch.h"
+ #include "storage/ipc.h"
+ #include "storage/pmsignal.h"
+ #include "storage/proc.h"
+ #include "utils/builtins.h"
+ #include "utils/guc.h"
+ #include "utils/guc_tables.h"
+ #include "utils/memutils.h"
+ #include "utils/ps_status.h"
+
+ /* User-settable parameters for sync rep */
+ bool sync_rep_mode = false; /* Only set in user backends */
+ int sync_rep_timeout = 120; /* Only set in user backends */
+ char *SyncRepStandbyNames;
+
+ bool WaitingForSyncRep = false; /* Global state for some exit methods */
+
+ #define IsOnSyncRepQueue() (MyProc->lwWaiting)
+
+ static bool announce_next_takeover = true;
+
+ static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN);
+ static void SyncRepRemoveFromQueue(void);
+ static void SyncRepAddToQueue(void);
+ static long SyncRepGetWaitTimeout(void);
+
+ static int SyncRepGetStandbyPriority(void);
+ static int SyncRepWakeQueue(void);
+
+
+ /*
+ * ===========================================================
+ * Synchronous Replication functions for normal user backends
+ * ===========================================================
+ */
+
+ /*
+ * Wait for synchronous replication, if requested by user.
+ */
+ void
+ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+ {
+ /*
+ * Fast exit if user has not requested sync replication, or
+ * streaming replication is inactive in this server.
+ */
+ if (!SyncRepRequested() || max_wal_senders == 0)
+ return;
+
+ /*
+ * Wait on queue. We check for a fast exit once we have the lock.
+ */
+ SyncRepWaitOnQueue(XactCommitLSN);
+ }
+
+ void
+ SyncRepCleanupAtProcExit(int code, Datum arg)
+ {
+ if (IsOnSyncRepQueue())
+ {
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ SyncRepRemoveFromQueue();
+ LWLockRelease(SyncRepLock);
+ }
+
+ if (MyProc != NULL)
+ DisownLatch(&MyProc->waitLatch);
+ }
+
+ /*
+ * Wait for specified LSN to be confirmed at the requested level
+ * of durability. Each proc has its own wait latch, so we perform
+ * a normal latch check/wait loop here.
+ */
+ static void
+ SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ TimestampTz now = GetCurrentTransactionStopTimestamp();
+ long timeout = SyncRepGetWaitTimeout();
+ char *new_status = NULL;
+ const char *old_status;
+ int len;
+ bool wait_on_queue = false;
+
+ ereport(DEBUG3,
+ (errmsg("synchronous replication waiting for %X/%X starting at %s",
+ XactCommitLSN.xlogid,
+ XactCommitLSN.xrecoff,
+ timestamptz_to_str(GetCurrentTransactionStopTimestamp()))));
+
+ for (;;)
+ {
+ ResetLatch(&MyProc->waitLatch);
+
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+
+ /*
+ * First time through, add ourselves to the queue.
+ */
+ if (!IsOnSyncRepQueue())
+ {
+ int i;
+
+ /*
+ * Wait no longer if we have already reached our LSN
+ */
+ if (XLByteLE(XactCommitLSN, queue->lsn))
+ {
+ /* No need to wait */
+ LWLockRelease(SyncRepLock);
+ return;
+ }
+
+ /*
+ * Check that we have at least one sync standby active that
+ * has caught up with the primary.
+ */
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid != 0 &&
+ walsnd->sync_standby_priority > 0 &&
+ walsnd->state == WALSNDSTATE_STREAMING)
+ {
+ wait_on_queue = true;
+ break;
+ }
+ }
+
+ /*
+ * Leave quickly if we don't have a sync standby that will
+ * confirm it has received our commit.
+ */
+ if (!wait_on_queue)
+ {
+ LWLockRelease(SyncRepLock);
+ return;
+ }
+
+ /*
+ * Set our waitLSN so WALSender will know when to wake us.
+ * We set this before we add ourselves to queue, so that
+ * any proc on the queue can be examined freely without
+ * taking a lock on each process in the queue.
+ */
+ MyProc->waitLSN = XactCommitLSN;
+ SyncRepAddToQueue();
+ LWLockRelease(SyncRepLock);
+ WaitingForSyncRep = true;
+
+ /*
+ * Alter ps display to show waiting for sync rep.
+ */
+ if (update_process_title)
+ {
+ old_status = get_ps_display(&len);
+ new_status = (char *) palloc(len + 21 + 1);
+ memcpy(new_status, old_status, len);
+ strcpy(new_status + len, " waiting for sync rep");
+ set_ps_display(new_status, false);
+ new_status[len] = '\0'; /* truncate off " waiting for sync rep" */
+ }
+ }
+ else
+ {
+ bool release = false;
+ bool timed_out = false;
+
+ /*
+ * Check the LSN on our queue and if it's moved far enough then
+ * remove us from the queue. First time through this is
+ * unlikely to be far enough, yet is possible. Next time we are
+ * woken we should be more lucky.
+ */
+ if (XLByteLE(XactCommitLSN, queue->lsn))
+ release = true;
+ else if (timeout > 0 &&
+ TimestampDifferenceExceeds(GetCurrentTransactionStopTimestamp(),
+ now, (int) (timeout / 1000))) /* usec -> msec */
+ {
+ release = true;
+ timed_out = true;
+ }
+
+ if (release)
+ {
+ SyncRepRemoveFromQueue();
+ LWLockRelease(SyncRepLock);
+ WaitingForSyncRep = false;
+
+ /*
+ * Reset our waitLSN.
+ */
+ MyProc->waitLSN.xlogid = 0;
+ MyProc->waitLSN.xrecoff = 0;
+
+ if (new_status)
+ {
+ /* Reset ps display */
+ set_ps_display(new_status, false);
+ pfree(new_status);
+ }
+
+ /*
+ * Our response to the timeout is to simply post a NOTICE and
+ * then return to the user. The commit has happened, we just
+ * haven't been able to verify it has been replicated in the
+ * way requested.
+ */
+ if (timed_out)
+ ereport(NOTICE,
+ (errmsg("synchronous replication timeout at %s",
+ timestamptz_to_str(now))));
+ else
+ ereport(DEBUG3,
+ (errmsg("synchronous replication wait complete at %s",
+ timestamptz_to_str(now))));
+ return;
+ }
+
+ LWLockRelease(SyncRepLock);
+ }
+
+ WaitLatch(&MyProc->waitLatch, timeout);
+ now = GetCurrentTimestamp();
+ }
+ }
+
+ /*
+ * Remove myself from sync rep wait queue.
+ *
+ * Assume on queue at start; will not be on queue at end.
+ * Queue is already locked at start and remains locked on exit.
+ */
+ static void
+ SyncRepRemoveFromQueue(void)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ PGPROC *proc = queue->head;
+
+ Assert(IsOnSyncRepQueue());
+
+ proc = queue->head;
+
+ if (proc == MyProc)
+ {
+ if (MyProc->lwWaitLink == NULL)
+ {
+ /*
+ * We were the only waiter on the queue. Reset head and tail.
+ */
+ Assert(queue->tail == MyProc);
+ queue->head = NULL;
+ queue->tail = NULL;
+ }
+ else
+ /*
+ * Move head to next proc on the queue.
+ */
+ queue->head = MyProc->lwWaitLink;
+ }
+ else
+ {
+ bool found = false;
+
+ while (proc->lwWaitLink != NULL)
+ {
+ /* Are we the next proc in our traversal of the queue? */
+ if (proc->lwWaitLink == MyProc)
+ {
+ /*
+ * Remove ourselves from middle of queue.
+ * No need to touch head or tail.
+ */
+ proc->lwWaitLink = MyProc->lwWaitLink;
+ found = true;
+ break;
+ }
+
+ proc = proc->lwWaitLink;
+ }
+
+ if (!found)
+ elog(WARNING, "could not locate ourselves on wait queue");
+
+ if (proc->lwWaitLink == NULL) /* At tail */
+ {
+ Assert(proc != MyProc);
+ /* Remove ourselves from tail of queue */
+ Assert(queue->tail == MyProc);
+ queue->tail = proc;
+ proc->lwWaitLink = NULL;
+ }
+ }
+ MyProc->lwWaitLink = NULL;
+ MyProc->lwWaiting = false;
+ }
+
+ /*
+ * Add myself to sync rep wait queue.
+ *
+ * Assume not on queue at start; will be on queue at end.
+ * Queue is already locked at start and remains locked on exit.
+ */
+ static void
+ SyncRepAddToQueue(void)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ PGPROC *tail = queue->tail;
+
+ /*
+ * Add myself to tail of wait queue.
+ */
+ if (tail == NULL)
+ {
+ queue->head = MyProc;
+ queue->tail = MyProc;
+ }
+ else
+ {
+ /*
+ * XXX extra code needed here to maintain sorted invariant.
+ * Our approach should be same as racing car - slow in, fast out.
+ */
+ Assert(tail->lwWaitLink == NULL);
+ tail->lwWaitLink = MyProc;
+ }
+ queue->tail = MyProc;
+
+ MyProc->lwWaiting = true;
+ MyProc->lwWaitLink = NULL;
+ }
+
+ /*
+ * Return a value that we can use directly in WaitLatch(). We need to
+ * handle special values, plus convert from seconds to microseconds.
+ *
+ */
+ static long
+ SyncRepGetWaitTimeout(void)
+ {
+ if (sync_rep_timeout == 0)
+ return -1L;
+
+ return 1000000L * sync_rep_timeout;
+ }
+
+ /*
+ * ===========================================================
+ * Synchronous Replication functions for wal sender processes
+ * ===========================================================
+ */
+
+ /*
+ * Take any action required to initialise sync rep state from config
+ * data. Called at WALSender startup and after each SIGHUP.
+ */
+ void
+ SyncRepInitConfig(void)
+ {
+ int priority;
+
+ /*
+ * Determine if we are a potential sync standby and remember the result
+ * for handling replies from standby.
+ */
+ priority = SyncRepGetStandbyPriority();
+ if (MyWalSnd->sync_standby_priority != priority)
+ {
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ MyWalSnd->sync_standby_priority = priority;
+ LWLockRelease(SyncRepLock);
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" now has synchronous standby priority %d",
+ application_name, priority)));
+ }
+ }
+
+ /*
+ * Update the LSNs on each queue based upon our latest state. This
+ * implements a simple policy of first-valid-standby-releases-waiter.
+ *
+ * Other policies are possible, which would change what we do here and
+ * perhaps also which information we store.
+ */
+ void
+ SyncRepReleaseWaiters(void)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ volatile WalSnd *syncWalSnd = NULL;
+ int numprocs = 0;
+ int priority = 0;
+ int i;
+
+ /*
+ * If this WALSender is serving a standby that is not on the list of
+ * potential standbys then we have nothing to do. If we are still
+ * starting up or still running base backup, then leave quickly also.
+ */
+ if (MyWalSnd->sync_standby_priority == 0 ||
+ MyWalSnd->state < WALSNDSTATE_CATCHUP)
+ return;
+
+ /*
+ * We're a potential sync standby. Release waiters if we are the
+ * highest priority standby. We do this even if the standby is not yet
+ * caught up, in case this is a restart situation and
+ * there are backends waiting for us. That allows backends to exit the
+ * wait state even if new backends cannot yet enter the wait state.
+ */
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
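+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &walsndctl->walsnds[i];
+
+ /*
+ * Priority 1 is the first name in synchronous_standby_names, so the
+ * highest priority standby has the lowest non-zero value.
+ */
+ if (walsnd->pid != 0 &&
+ walsnd->sync_standby_priority > 0 &&
+ (priority == 0 ||
+ priority > walsnd->sync_standby_priority))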
+ {
+ priority = walsnd->sync_standby_priority;
+ syncWalSnd = walsnd;
+ }
+ }
+
+ /*
+ * We should have found ourselves at least.
+ */
+ Assert(syncWalSnd);
+
+ /*
+ * If we aren't managing the highest priority standby then just leave.
+ */
+ if (syncWalSnd != MyWalSnd)
+ {
+ LWLockRelease(SyncRepLock);
+ announce_next_takeover = true;
+ return;
+ }
+
+ if (XLByteLT(queue->lsn, MyWalSnd->flush))
+ {
+ /*
+ * Set the lsn first so that when we wake backends they will
+ * release up to this location.
+ */
+ queue->lsn = MyWalSnd->flush;
+ numprocs = SyncRepWakeQueue();
+ }
+
+ LWLockRelease(SyncRepLock);
+
+ elog(DEBUG3, "released %d procs up to %X/%X",
+ numprocs,
+ MyWalSnd->flush.xlogid,
+ MyWalSnd->flush.xrecoff);
+
+ /*
+ * If we are managing the highest priority standby, though we weren't
+ * prior to this, then announce we are now the sync standby.
+ */
+ if (announce_next_takeover)
+ {
+ announce_next_takeover = false;
+ ereport(LOG,
+ (errmsg("standby \"%s\" is now the synchronous standby with priority %d",
+ application_name, MyWalSnd->sync_standby_priority)));
+ }
+ }
+
+ /*
+ * Check if we are in the list of sync standbys, and if so, determine
+ * priority sequence. Return priority if set, or zero to indicate that
+ * we are not a potential sync standby.
+ *
+ * Compare the parameter SyncRepStandbyNames against the application_name
+ * for this WALSender, or allow any name if we find a wildcard "*".
+ */
+ static int
+ SyncRepGetStandbyPriority(void)
+ {
+ char *rawstring;
+ List *elemlist;
+ ListCell *l;
+ int priority = 0;
+ bool found = false;
+
+ /* Need a modifiable copy of string */
+ rawstring = pstrdup(SyncRepStandbyNames);
+
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawstring, ',', &elemlist))
+ {
+ /* syntax error in list */
+ pfree(rawstring);
+ list_free(elemlist);
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for parameter \"synchronous_standby_names\"")));
+ return 0;
+ }
+
+ foreach(l, elemlist)
+ {
+ char *standby_name = (char *) lfirst(l);
+
+ priority++;
+
+ if (pg_strcasecmp(standby_name, application_name) == 0 ||
+ pg_strcasecmp(standby_name, "*") == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ pfree(rawstring);
+ list_free(elemlist);
+
+ return (found ? priority : 0);
+ }
+
+ /*
+ * Walk queue from head setting the latches of any procs that need
+ * to be woken. We don't modify the queue here; each proc removes itself
+ * from the queue once it has been woken.
+ *
+ * Must hold SyncRepLock
+ */
+ static int
+ SyncRepWakeQueue(void)
+ {
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+ volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ PGPROC *proc = queue->head;
+ int numprocs = 0;
+
+ /* fast exit for empty queue */
+ if (proc == NULL)
+ return 0;
+
+ for (; proc != NULL; proc = proc->lwWaitLink)
+ {
+ /*
+ * Assume the queue is ordered by LSN
+ */
+ if (XLByteLT(queue->lsn, proc->waitLSN))
+ return numprocs;
+
+ numprocs++;
+ SetLatch(&proc->waitLatch);
+ }
+
+ return numprocs;
+ }
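
The priority lookup in SyncRepGetStandbyPriority() can be tried outside the backend with a small standalone sketch. This is not the patch's code: it assumes a plain comma-separated list with optional blanks, and it does not handle the quoted identifiers that SplitIdentifierString() accepts. The names standby_priority() and name_matches() are hypothetical.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: case-insensitive match of a token against a name. */
static int
name_matches(const char *token, size_t len, const char *name)
{
    size_t i;

    if (strlen(name) != len)
        return 0;
    for (i = 0; i < len; i++)
    {
        if (tolower((unsigned char) token[i]) !=
            tolower((unsigned char) name[i]))
            return 0;
    }
    return 1;
}

/*
 * Return the 1-based position of the first list entry that equals
 * application_name (ignoring case) or is the wildcard "*".
 * Return 0 if no entry matches.
 */
int
standby_priority(const char *names, const char *application_name)
{
    int priority = 0;
    const char *p = names;

    assert(names != NULL && application_name != NULL);

    while (*p != '\0')
    {
        const char *start;
        const char *end;
        size_t len;

        /* skip blanks before the entry */
        while (*p == ' ' || *p == '\t')
            p++;
        start = p;

        end = strchr(p, ',');
        if (end == NULL)
            end = p + strlen(p);

        /* trim blanks after the entry */
        len = (size_t) (end - start);
        while (len > 0 && (start[len - 1] == ' ' || start[len - 1] == '\t'))
            len--;

        priority++;
        if ((len == 1 && start[0] == '*') ||
            name_matches(start, len, application_name))
            return priority;

        p = (*end == ',') ? end + 1 : end;
    }

    return 0;
}
```

With the list "alpha, beta", a walsender whose application_name is "BETA" gets priority 2, and a name that is not listed gets 0 unless the list contains "*".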
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 66,72 ****
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
! static WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
--- 66,72 ----
WalSndCtlData *WalSndCtl = NULL;
/* My slot in the shared memory array */
! WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
***************
*** 174,179 **** WalSenderMain(void)
--- 174,181 ----
SpinLockRelease(&walsnd->mutex);
}
+ SyncRepInitConfig();
+
/* Main loop of walsender */
return WalSndLoop();
}
***************
*** 584,589 **** ProcessStandbyReplyMessage(void)
--- 586,593 ----
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+
+ SyncRepReleaseWaiters();
}
/*
***************
*** 700,705 **** WalSndLoop(void)
--- 704,710 ----
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+ SyncRepInitConfig();
}
/*
***************
*** 771,777 **** WalSndLoop(void)
--- 776,787 ----
* that point might wait for some time.
*/
if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
+ {
+ ereport(DEBUG1,
+ (errmsg("standby \"%s\" has now caught up with primary",
+ application_name)));
WalSndSetState(WALSNDSTATE_STREAMING);
+ }
ProcessRepliesIfAny();
}
***************
*** 1304,1310 **** WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
! #define PG_STAT_GET_WAL_SENDERS_COLS 6
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
--- 1314,1320 ----
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
! #define PG_STAT_GET_WAL_SENDERS_COLS 7
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
***************
*** 1346,1351 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1356,1362 ----
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ int sync_priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
***************
*** 1361,1366 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1372,1381 ----
apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
+ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ sync_priority = walsnd->sync_standby_priority;
+ LWLockRelease(SyncRepLock);
+
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
***************
*** 1370,1380 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* Only superusers can see details. Other users only get
* the pid value to know it's a walsender, but no details.
*/
! nulls[1] = true;
! nulls[2] = true;
! nulls[3] = true;
! nulls[4] = true;
! nulls[5] = true;
}
else
{
--- 1385,1391 ----
* Only superusers can see details. Other users only get
* the pid value to know it's a walsender, but no details.
*/
! MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
}
else
{
***************
*** 1401,1406 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1412,1419 ----
snprintf(location, sizeof(location), "%X/%X",
apply.xlogid, apply.xrecoff);
values[5] = CStringGetTextDatum(location);
+
+ values[6] = Int32GetDatum(sync_priority);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 39,44 ****
--- 39,45 ----
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+ #include "replication/syncrep.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/pmsignal.h"
***************
*** 196,201 **** InitProcGlobal(void)
--- 197,203 ----
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
ProcGlobal->freeProcs = &procs[i];
+ InitSharedLatch(&procs[i].waitLatch);
}
/*
***************
*** 214,219 **** InitProcGlobal(void)
--- 216,222 ----
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = &procs[i];
+ InitSharedLatch(&procs[i].waitLatch);
}
/*
***************
*** 224,229 **** InitProcGlobal(void)
--- 227,233 ----
{
AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */
PGSemaphoreCreate(&(AuxiliaryProcs[i].sem));
+ InitSharedLatch(&AuxiliaryProcs[i].waitLatch);
}
/* Create ProcStructLock spinlock, too */
***************
*** 326,331 **** InitProcess(void)
--- 330,341 ----
SHMQueueInit(&(MyProc->myProcLocks[i]));
MyProc->recoveryConflictPending = false;
+ /* Initialise the waitLSN for sync rep */
+ MyProc->waitLSN.xlogid = 0;
+ MyProc->waitLSN.xrecoff = 0;
+
+ OwnLatch((Latch *) &MyProc->waitLatch);
+
/*
* We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly
***************
*** 365,370 **** InitProcessPhase2(void)
--- 375,381 ----
/*
* Arrange to clean that up at backend exit.
*/
+ on_shmem_exit(SyncRepCleanupAtProcExit, 0);
on_shmem_exit(RemoveProcFromArray, 0);
}
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 2861,2866 **** ProcessInterrupts(void)
--- 2861,2894 ----
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command")));
+ else if (WaitingForSyncRep)
+ {
+ /*
+ * This must NOT be a FATAL message. We want the state of the
+ * transaction being aborted to be indeterminate to ensure that
+ * the transaction completion guarantee is never broken.
+ */
+ ereport(WARNING,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection because fast shutdown is requested"),
+ errdetail("This connection requested synchronous replication at commit"
+ " yet confirmation of replication has not been received."
+ " The transaction has committed locally and might be committed"
+ " on recently disconnected standby servers also.")));
+
+ /*
+ * We DO NOT want to run proc_exit() callbacks -- we're here because
+ * we are shutting down and don't want any code to stall or
+ * prevent that.
+ */
+ on_exit_reset();
+
+ /*
+ * Note we do exit(0) not exit(>0). This is to avoid forcing
+ * postmaster into a system reset cycle.
+ */
+ exit(0);
+ }
else if (RecoveryConflictPending && RecoveryConflictRetryable)
{
pgstat_report_recovery_conflict(RecoveryConflictReason);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 55,60 ****
--- 55,61 ----
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
+ #include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
***************
*** 754,759 **** static struct config_bool ConfigureNamesBool[] =
--- 755,768 ----
true, NULL, NULL
},
{
+ {"synchronous_replication", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Requests synchronous replication."),
+ NULL
+ },
+ &sync_rep_mode,
+ false, NULL, NULL
+ },
+ {
{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Continues processing past damaged page headers."),
gettext_noop("Detection of a damaged page header normally causes PostgreSQL to "
***************
*** 2161,2166 **** static struct config_int ConfigureNamesInt[] =
--- 2170,2185 ----
},
{
+ {"sync_replication_timeout", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Sets the maximum wait time for a response from synchronous replication."),
+ gettext_noop("A value of 0 turns off the timeout."),
+ GUC_UNIT_S
+ },
+ &sync_rep_timeout,
+ 120, 0, INT_MAX, NULL, NULL
+ },
+
+ {
{"track_activity_query_size", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size reserved for pg_stat_activity.current_query, in bytes."),
NULL,
***************
*** 2717,2722 **** static struct config_string ConfigureNamesString[] =
--- 2736,2751 ----
},
{
+ {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION,
+ gettext_noop("List of potential standby names to synchronise with."),
+ NULL,
+ GUC_LIST_INPUT
+ },
+ &SyncRepStandbyNames,
+ "*", NULL, NULL
+ },
+
+ {
{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
gettext_noop("Sets default text search configuration."),
NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 184,190 ****
#archive_timeout = 0 # force a logfile segment switch after this
# number of seconds; 0 disables
! # - Streaming Replication -
#max_wal_senders = 0 # max number of walsender processes
# (change requires restart)
--- 184,200 ----
#archive_timeout = 0 # force a logfile segment switch after this
# number of seconds; 0 disables
! # - Replication - User Settings
!
! #synchronous_replication = off # does commit wait for reply from standby
! #sync_replication_timeout = 120 # in seconds; 0 means wait forever
!
! # - Streaming Replication - Server Settings
!
! #synchronous_standby_names = '*' # standby servers that provide sync rep
! # comma-separated list of application_name from standby(s);
! # '*' = all (default)
!
#max_wal_senders = 0 # max number of walsender processes
# (change requires restart)
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 3078,3084 **** DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
--- 3078,3084 ----
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23}" "{o,o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location,sync_priority}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
*** a/src/include/miscadmin.h
--- b/src/include/miscadmin.h
***************
*** 78,83 **** extern PGDLLIMPORT volatile uint32 CritSectionCount;
--- 78,86 ----
/* in tcop/postgres.c */
extern void ProcessInterrupts(void);
+ /* in replication/syncrep.c */
+ extern bool WaitingForSyncRep;
+
#ifndef WIN32
#define CHECK_FOR_INTERRUPTS() \
*** /dev/null
--- b/src/include/replication/syncrep.h
***************
*** 0 ****
--- 1,53 ----
+ /*-------------------------------------------------------------------------
+ *
+ * syncrep.h
+ * Exports from replication/syncrep.c.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef _SYNCREP_H
+ #define _SYNCREP_H
+
+ #include "access/xlog.h"
+ #include "storage/proc.h"
+ #include "storage/shmem.h"
+ #include "storage/spin.h"
+
+ #define SyncRepRequested() (sync_rep_mode)
+
+ /*
+ * Each synchronous rep queue lives in the WAL sender shmem area.
+ */
+ typedef struct SyncRepQueue
+ {
+ /*
+ * Current location of the head of the queue. All waiters should have
+ * a waitLSN that follows this value, or they are currently being woken
+ * to remove themselves from the queue.
+ */
+ XLogRecPtr lsn;
+
+ PGPROC *head;
+ PGPROC *tail;
+ } SyncRepQueue;
+
+ /* user-settable parameters for synchronous replication */
+ extern bool sync_rep_mode;
+ extern int sync_rep_timeout;
+ extern char *SyncRepStandbyNames;
+
+ /* called by user backend */
+ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
+
+ /* callback at backend exit */
+ extern void SyncRepCleanupAtProcExit(int code, Datum arg);
+
+ /* called by wal sender */
+ extern void SyncRepInitConfig(void);
+ extern void SyncRepReleaseWaiters(void);
+
+ #endif /* _SYNCREP_H */
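
The SyncRepQueue comment above relies on the queue being ordered by LSN, so that a walk from the head can stop at the first waiter beyond the released position. A minimal standalone sketch of that walk follows. It is an illustration only: Lsn, Waiter, lsn_lt() and wake_queue() are hypothetical stand-ins for XLogRecPtr, PGPROC, XLByteLT() and SyncRepWakeQueue(), and a boolean flag replaces SetLatch().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical two-part log position, compared field by field. */
typedef struct Lsn
{
    uint32_t xlogid;
    uint32_t xrecoff;
} Lsn;

/* Hypothetical waiter entry; 'woken' stands in for setting a latch. */
typedef struct Waiter
{
    Lsn wait_lsn;
    bool woken;
    struct Waiter *next;
} Waiter;

/* True if a is strictly before b. */
static bool
lsn_lt(Lsn a, Lsn b)
{
    return a.xlogid < b.xlogid ||
        (a.xlogid == b.xlogid && a.xrecoff < b.xrecoff);
}

/*
 * Wake every waiter whose wait_lsn is at or before 'released'.
 * The list is not modified; the return value is the number woken.
 */
int
wake_queue(Waiter *head, Lsn released)
{
    int numprocs = 0;
    Waiter *w;

    for (w = head; w != NULL; w = w->next)
    {
        /* the early exit below is only valid for an LSN-ordered list */
        assert(w->next == NULL || !lsn_lt(w->next->wait_lsn, w->wait_lsn));

        if (lsn_lt(released, w->wait_lsn))
            break;

        w->woken = true;
        numprocs++;
    }

    return numprocs;
}
```

For three waiters at positions 0/100, 0/200 and 0/300, releasing up to 0/200 wakes the first two and leaves the third waiting.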
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 15,20 ****
--- 15,21 ----
#include "access/xlog.h"
#include "nodes/nodes.h"
#include "storage/latch.h"
+ #include "replication/syncrep.h"
#include "storage/spin.h"
***************
*** 52,62 **** typedef struct WalSnd
--- 53,77 ----
* to do.
*/
Latch latch;
+
+ /*
+ * The priority order of the standby managed by this WALSender, as
+ * listed in synchronous_standby_names, or 0 if not listed.
+ * Protected by SyncRepLock.
+ */
+ int sync_standby_priority;
} WalSnd;
+ extern WalSnd *MyWalSnd;
+
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
+ /*
+ * Synchronous replication queue, protected by SyncRepLock.
+ */
+ SyncRepQueue sync_rep_queue; /* Proc queue, sorted by LSN */
+
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
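
The sync_standby_priority field is what a walsender compares across slots to decide whether it manages the synchronous standby. The sketch below shows one way to make that choice, assuming that a smaller positive value (an earlier position in synchronous_standby_names) outranks a larger one and that 0 means "not a candidate". SenderSlot and choose_sync_slot() are hypothetical names, and no locking is shown.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced view of one walsender slot. */
typedef struct SenderSlot
{
    int pid;                    /* 0 means the slot is unused */
    int sync_standby_priority;  /* 0 means not a potential sync standby */
} SenderSlot;

/*
 * Return the index of the active slot with the smallest positive
 * priority, or -1 if no active slot is a potential sync standby.
 */
int
choose_sync_slot(const SenderSlot *slots, int nslots)
{
    int best = -1;
    int i;

    assert(slots != NULL || nslots == 0);

    for (i = 0; i < nslots; i++)
    {
        if (slots[i].pid == 0 || slots[i].sync_standby_priority <= 0)
            continue;

        if (best < 0 ||
            slots[i].sync_standby_priority < slots[best].sync_standby_priority)
            best = i;
    }

    return best;
}
```

A walsender would then release waiters only when the chosen index is its own slot. Ties cannot occur in this model if each standby name appears once, but with a "*" entry several slots can share a priority, in which case this sketch keeps the first one found.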
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 78,83 **** typedef enum LWLockId
--- 78,84 ----
SerializableFinishedListLock,
SerializablePredicateLockListLock,
OldSerXidLock,
+ SyncRepLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 14,19 ****
--- 14,21 ----
#ifndef _PROC_H_
#define _PROC_H_
+ #include "access/xlog.h"
+ #include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
#include "utils/timestamp.h"
***************
*** 115,120 **** struct PGPROC
--- 117,126 ----
LOCKMASK heldLocks; /* bitmask for lock types already held on this
* lock object by this backend */
+ /* Info to allow us to wait for synchronous replication, if needed. */
+ Latch waitLatch;
+ XLogRecPtr waitLSN; /* waiting for this LSN or higher */
+
/*
* All PROCLOCK objects for locks held or awaited by this backend are
* linked into one of these lists, according to the partition number of
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
***************
*** 1298,1304 **** SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
! pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));
--- 1298,1304 ----
pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
! pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));