[PATCH 13/16] Introduction of pair of logical walreceiver/sender

Поиск
Список
Период
Сортировка
От Andres Freund
Тема [PATCH 13/16] Introduction of pair of logical walreceiver/sender
Дата
Msg-id 1339586927-13156-13-git-send-email-andres@2ndquadrant.com
обсуждение исходный текст
Ответ на [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Ответы Re: [PATCH 13/16] Introduction of pair of logical walreceiver/sender  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Список pgsql-hackers
From: Andres Freund <andres@anarazel.de>

A logical WALReceiver is started directly by Postmaster when we enter PM_RUN
state and the new parameter multimaster_conninfo is set. For now only one of
those is started, but the code doesn't rely on that. In future multiple ones
should be allowed.

To transfer that data, a new command, START_LOGICAL_REPLICATION, is introduced
in the walsender, reusing most of the infrastructure for START_REPLICATION. The
former uses the same on-the-wire format as the latter.

To make initialization possible, IDENTIFY_SYSTEM returns two new columns: node_id,
returning the multimaster_node_id, and last_checkpoint, returning the RedoRecPtr.

The walreceiver writes that data into the previously introduced pg_lcr/$node_id
directory.

Future Directions/TODO:
- pass the node_ids we're interested in to START_LOGICAL_REPLICATION to allow complex topologies
- allow passing filters to reduce the transfer volume
- compress the transferred data by actually removing uninteresting records instead of replacing them by NOOP records.
This adds some complexities because we still need to map the received LSN to the requested LSN so we know where to
restart transferring data and such.
 
- check that wal on the sending side was generated with WAL_LEVEL_LOGICAL
---
 src/backend/postmaster/postmaster.c                |   10 +-
 .../libpqwalreceiver/libpqwalreceiver.c            |  104 ++++-
 src/backend/replication/repl_gram.y                |   19 +-
 src/backend/replication/repl_scanner.l             |    1 +
 src/backend/replication/walreceiver.c              |  165 +++++++-
 src/backend/replication/walreceiverfuncs.c         |    1 +
 src/backend/replication/walsender.c                |  422 +++++++++++++++-----
 src/backend/utils/misc/guc.c                       |    9 +
 src/backend/utils/misc/postgresql.conf.sample      |    1 +
 src/include/nodes/nodes.h                          |    1 +
 src/include/nodes/replnodes.h                      |   10 +
 src/include/replication/logical.h                  |    4 +
 src/include/replication/walreceiver.h              |    9 +-
 13 files changed, 624 insertions(+), 132 deletions(-)
 

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 71cfd6d..13e9592 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1449,6 +1449,11 @@ ServerLoop(void)                kill(AutoVacPID, SIGUSR2);        }
+        /* Restart walreceiver process in certain states only. */
+        if (WalReceiverPID == 0 && pmState == PM_RUN &&
+            LogicalWalReceiverActive())
+            WalReceiverPID = StartWalReceiver();
+        /* Check all the workers requested are running. */        if (pmState == PM_RUN)
StartBackgroundWorkers();
@@ -2169,7 +2174,8 @@ pmdie(SIGNAL_ARGS)                /* and the walwriter too */                if (WalWriterPID !=
0)                   signal_child(WalWriterPID, SIGTERM);
 
-
+                if (WalReceiverPID != 0)
+                    signal_child(WalReceiverPID, SIGTERM);                /*                 * If we're in recovery,
wecan't kill the startup process                 * right away, because at present doing so does not release
 
@@ -2421,6 +2427,8 @@ reaper(SIGNAL_ARGS)                PgArchPID = pgarch_start();            if (PgStatPID == 0)
          PgStatPID = pgstat_start();
 
+            if (WalReceiverPID == 0 && LogicalWalReceiverActive())
+                WalReceiverPID = StartWalReceiver();            StartBackgroundWorkers();            /* at this point
weare really open for business */
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 979b66b..0ea3fce 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -46,7 +46,8 @@ static PGconn *streamConn = NULL;static char *recvBuf = NULL;/* Prototypes for interface functions
*/
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery);
+static bool libpqrcv_start(char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery);static bool
libpqrcv_receive(inttimeout, unsigned char *type,                 char **buffer, int *len);static void
libpqrcv_send(constchar *buffer, int nbytes);
 
@@ -63,10 +64,12 @@ void_PG_init(void){    /* Tell walreceiver how to reach us */
-    if (walrcv_connect != NULL || walrcv_receive != NULL ||
-        walrcv_send != NULL || walrcv_disconnect != NULL)
+    if (walrcv_connect != NULL || walrcv_start != NULL ||
+        walrcv_receive != NULL || walrcv_send != NULL ||
+        walrcv_disconnect != NULL)        elog(ERROR, "libpqwalreceiver already loaded");    walrcv_connect =
libpqrcv_connect;
+    walrcv_start = libpqrcv_start;    walrcv_receive = libpqrcv_receive;    walrcv_send = libpqrcv_send;
walrcv_disconnect= libpqrcv_disconnect;
 
@@ -76,7 +79,7 @@ _PG_init(void) * Establish the connection to the primary server for XLOG streaming */static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+libpqrcv_connect(char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery){    char
conninfo_repl[MAXCONNINFO+ 75];    char       *primary_sysid;
 
@@ -84,7 +87,8 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)    TimeLineID    primary_tli;    TimeLineID
standby_tli;    PGresult   *res;
 
-    char        cmd[64];
+
+    elog(LOG, "wal receiver connecting");    /*     * Connect using deliberately undocumented parameter: replication.
The
@@ -96,10 +100,16 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)             conninfo);    streamConn =
PQconnectdb(conninfo_repl);
-    if (PQstatus(streamConn) != CONNECTION_OK)
+    if (PQstatus(streamConn) != CONNECTION_OK){
+        /*
+         * FIXME: its very annoying for development if the whole buffer is
+         * immediately filled. We need a better solution.
+         */
+        pg_usleep(1000000);        ereport(ERROR,                (errmsg("could not connect to the primary server:
%s",                       PQerrorMessage(streamConn))));
 
+    }    /*     * Get the system identifier and timeline ID as a DataRow message from the
@@ -114,7 +124,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)                        "the primary server:
%s",                       PQerrorMessage(streamConn))));    }
 
-    if (PQnfields(res) != 3 || PQntuples(res) != 1)
+    if (PQnfields(res) != 5 || PQntuples(res) != 1)    {        int            ntuples = PQntuples(res);        int
       nfields = PQnfields(res);
 
@@ -122,14 +132,40 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)        PQclear(res);
ereport(ERROR,               (errmsg("invalid response from primary server"),
 
-                 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+                 errdetail("Expected 1 tuple with 5 fields, got %d tuples with %d fields.",
ntuples,nfields)));    }    primary_sysid = PQgetvalue(res, 0, 0);
 
+    primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+    /* FIXME: this should be already implemented nicely somewhere? */
+    if(sscanf(PQgetvalue(res, 0, 2),
+              "%X/%X", &where_at->xlogid, &where_at->xrecoff) != 2){
+        elog(FATAL, "couldn't parse the xlog address from the other side: %s",
+             PQgetvalue(res, 0, 2));
+    }
+
+    elog(LOG, "other end is currently at %X/%X",
+         where_at->xlogid, where_at->xrecoff);
+
+    receiving_from_node_id = pg_atoi(PQgetvalue(res, 0, 3), 4, 0);
+
+    /* FIXME: this should be already implemented nicely somewhere? */
+    if(sscanf(PQgetvalue(res, 0, 4),
+              "%X/%X", &redo->xlogid, &redo->xrecoff) != 2){
+        elog(FATAL, "couldn't parse the xlog address from the other side: %s",
+             PQgetvalue(res, 0, 4));
+    }
+
+    elog(LOG, "other end's redo is currently at %X/%X",
+         redo->xlogid, redo->xrecoff);
+
+    /*     * Confirm that the system identifier of the primary is the same as ours.
+     *
+     * FIXME: do we wan't that restriction for mm?     */    snprintf(standby_sysid, sizeof(standby_sysid),
UINT64_FORMAT,            GetSystemIdentifier());
 
@@ -142,21 +178,49 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)                           primary_sysid,
standby_sysid)));   }
 
-    /*
-     * Confirm that the current timeline of the primary is the same as the
-     * recovery target timeline.
-     */
-    standby_tli = GetRecoveryTargetTLI();    PQclear(res);
-    if (primary_tli != standby_tli)
-        ereport(ERROR,
-                (errmsg("timeline %u of the primary does not match recovery target timeline %u",
-                        primary_tli, standby_tli)));
-    ThisTimeLineID = primary_tli;    /* Start streaming from the point requested by startup process */
-    snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
-             startpoint.xlogid, startpoint.xrecoff);
+    if (startedDuringRecovery)
+    {
+        /*
+         * Confirm that the current timeline of the primary is the same as the
+         * recovery target timeline.
+         */
+        standby_tli = GetRecoveryTargetTLI();
+        if (primary_tli != standby_tli)
+            ereport(ERROR,
+                    (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+                            primary_tli, standby_tli)));
+        ThisTimeLineID = primary_tli;
+    }
+
+    return true;
+}
+
+/*
+ * start streaming data
+ */
+static bool
+libpqrcv_start(char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery)
+{
+    PGresult   *res;
+    char        cmd[64];
+
+    if(startedDuringRecovery)
+    {
+        snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+             startpoint->xlogid, startpoint->xrecoff);
+    }
+    else
+    {
+        /* ignore the timeline */
+        elog(LOG, "receiving_from_node_id: %u at %X/%X", receiving_from_node_id,
+             startpoint->xlogid, startpoint->xrecoff);
+        snprintf(cmd, sizeof(cmd), "START_LOGICAL_REPLICATION %X/%X",
+             startpoint->xlogid, startpoint->xrecoff);
+    }
+    res = libpqrcv_PQexec(cmd);    if (PQresultStatus(res) != PGRES_COPY_BOTH)    {
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index b6cfdac..b49ae6f 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,9 +76,10 @@ Node *replication_parse_result;%token K_NOWAIT%token K_WAL%token K_START_REPLICATION
+%token K_START_LOGICAL_REPLICATION%type <node>    command
-%type <node>    base_backup start_replication identify_system
+%type <node>    base_backup start_replication start_logical_replication identify_system%type <list>
base_backup_opt_list%type<defelt>    base_backup_opt%%
 
@@ -97,6 +98,7 @@ command:            identify_system            | base_backup            | start_replication
+            | start_logical_replication            ;/*
@@ -166,6 +168,21 @@ start_replication:                    $$ = (Node *) cmd;                }            ;
+
+/*
+ * START_LOGICAL_REPLICATION %X/%X
+ */
+start_logical_replication:
+            K_START_LOGICAL_REPLICATION RECPTR
+                {
+                    StartLogicalReplicationCmd *cmd;
+
+                    cmd = makeNode(StartLogicalReplicationCmd);
+                    cmd->startpoint = $2;
+
+                    $$ = (Node *) cmd;
+                }
+            ;%%#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 9d4edcf..f8be982 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -64,6 +64,7 @@ NOWAIT            { return K_NOWAIT; }PROGRESS            { return K_PROGRESS; }WAL            {
returnK_WAL; }START_REPLICATION    { return K_START_REPLICATION; }
 
+START_LOGICAL_REPLICATION    { return K_START_LOGICAL_REPLICATION; }","                { return ','; }";"
 { return ';'; }
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e97196b..73a3021 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -44,6 +44,7 @@#include "replication/walprotocol.h"#include "replication/walreceiver.h"#include
"replication/walsender.h"
+#include "replication/logical.h"#include "storage/ipc.h"#include "storage/pmsignal.h"#include "storage/procarray.h"
@@ -58,9 +59,12 @@ bool        am_walreceiver;/* GUC variable */int            wal_receiver_status_interval;bool
hot_standby_feedback;
+char        *mm_conninfo = 0;
+RepNodeId receiving_from_node_id = InvalidMultimasterNodeId;/* libpqreceiver hooks to these when loaded
*/walrcv_connect_typewalrcv_connect = NULL;
 
+walrcv_start_type walrcv_start = NULL;walrcv_receive_type walrcv_receive = NULL;walrcv_send_type walrcv_send =
NULL;walrcv_disconnect_typewalrcv_disconnect = NULL;
 
@@ -93,9 +97,13 @@ static struct    XLogRecPtr    Flush;            /* last byte + 1 flushed in the standby */}
LogstreamResult;
+XLogRecPtr curRecv;
+static StandbyReplyMessage reply_message;static StandbyHSFeedbackMessage feedback_message;
+static bool startedDuringRecovery;  /* are we going to receive WAL data */
+/* * About SIGTERM handling: *
@@ -122,6 +130,9 @@ static void WalRcvDie(int code, Datum arg);static void XLogWalRcvProcessMsg(unsigned char type,
char*buf, Size len);static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);static void
XLogWalRcvFlush(booldying);
 
+
+static void LogicalWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalRcvSendReply(void);static void XLogWalRcvSendHSFeedback(void);static void
ProcessWalSndrMessage(XLogRecPtrwalEnd, TimestampTz sendTime);
 
@@ -170,13 +181,17 @@ voidWalReceiverMain(void){    char        conninfo[MAXCONNINFO];
-    XLogRecPtr    startpoint;
+    XLogRecPtr    startpoint = {0, 0};
+    XLogRecPtr    other_end_at;
+    XLogRecPtr    other_end_redo;    /* use volatile pointer to prevent code rearrangement */    volatile WalRcvData
*walrcv= WalRcv;    am_walreceiver = true;
 
+    elog(LOG, "wal receiver starting");
+    /*     * WalRcv should be set up already (if we are a backend, we inherit this     * by fork() or EXEC_BACKEND
mechanismfrom the postmaster).
 
@@ -200,8 +215,11 @@ WalReceiverMain(void)            /* fall through */        case WALRCV_STOPPED:
-            SpinLockRelease(&walrcv->mutex);
-            proc_exit(1);
+            if (startedDuringRecovery)
+            {
+                SpinLockRelease(&walrcv->mutex);
+                proc_exit(1);
+            }            break;        case WALRCV_STARTING:
@@ -212,13 +230,35 @@ WalReceiverMain(void)            /* Shouldn't happen */            elog(PANIC, "walreceiver still
runningaccording to shared memory state");    }
 
-    /* Advertise our PID so that the startup process can kill us */
+    /* Advertise our PID so that we can be killed */    walrcv->pid = MyProcPid;    walrcv->walRcvState =
WALRCV_RUNNING;
-    /* Fetch information required to start streaming */
-    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
-    startpoint = walrcv->receiveStart;
+    /*
+     * Fetch information required to start streaming.
+     *
+     * During recovery the WALReceiver is started from the Startup process,
+     * by sending a postmaster signal. In normal running the Postmaster
+     * starts the WALReceiver directly. In that case the walrcv shmem struct
+     * is simply zeroed, so walrcv->startedDuringRecovery will show as false.
+     *
+     * The connection info required to access the upstream master comes from
+     * the multimaster_conninfo parameter, stored in the mm_conninfo variable.
+     *
+     * XXX The starting point for logical replication is not yet determined.
+     */
+    startedDuringRecovery = walrcv->startedDuringRecovery;
+    if (startedDuringRecovery)
+    {
+        strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+        startpoint = walrcv->receiveStart;
+    }
+    else
+    {
+        elog(LOG, "logical replication starting: %s", mm_conninfo);
+        strlcpy(conninfo, (char *) mm_conninfo, MAXCONNINFO);
+        /* The startpoint for logical replay can only be determined after connecting */
+    }    /* Initialise to a sanish value */    walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime =
GetCurrentTimestamp();
@@ -262,8 +302,9 @@ WalReceiverMain(void)    /* Load the libpq-specific functions */    load_file("libpqwalreceiver",
false);
-    if (walrcv_connect == NULL || walrcv_receive == NULL ||
-        walrcv_send == NULL || walrcv_disconnect == NULL)
+    if (walrcv_connect == NULL || walrcv_start == NULL ||
+        walrcv_receive == NULL || walrcv_send == NULL ||
+        walrcv_disconnect == NULL)        elog(ERROR, "libpqwalreceiver didn't initialize correctly");    /*
@@ -277,7 +318,58 @@ WalReceiverMain(void)    /* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
-    walrcv_connect(conninfo, startpoint);
+    walrcv_connect(conninfo, &other_end_redo, &other_end_at, startedDuringRecovery);
+
+    if(LogicalWalReceiverActive()){
+        char buf[MAXPGPATH];
+
+        if(RecoveryInProgress()){
+            elog(FATAL, "cannot have the logical receiver running while recovery is ongoing");
+        }
+
+        if(receiving_from_node_id == InvalidMultimasterNodeId)
+            elog(FATAL, "didn't setup/derive other node id");
+
+        Assert(WalRcv);
+
+        startpoint = WalRcv->mm_receiveState[receiving_from_node_id];
+
+        /*
+         * in this case we connect to some master we haven't yet received data
+         * from yet.
+         * FIXME: This means we would need to initialize the local cluster!
+         */
+        if(XLByteEQ(startpoint, zeroRecPtr)){
+            startpoint = other_end_redo;
+
+            /* we need to scroll back to the begin of the segment */
+            startpoint.xrecoff -= startpoint.xrecoff % XLogSegSize;
+
+            WalRcv->mm_receiveState[receiving_from_node_id] = startpoint;
+
+            WalRcv->mm_applyState[receiving_from_node_id] = other_end_redo;
+
+            /* FIXME: this should be an ereport */
+            elog(LOG, "initializing recovery from logical node %d to %X/%X, transfer from %X/%X",
+                 receiving_from_node_id,
+                 other_end_at.xlogid, other_end_at.xrecoff,
+                 startpoint.xlogid, startpoint.xrecoff);
+        }
+        else if(XLByteLT(other_end_at, startpoint)){
+            elog(FATAL, "something went wrong, the other side has a too small xlogid/xlrecoff. Other: %X/%X, self:
%X/%X",
+                 other_end_at.xlogid, other_end_at.xrecoff,
+                 startpoint.xlogid, startpoint.xrecoff);
+        }
+
+        /*
+         * the set of foreign nodes can increase all the time, so we just make
+         * sure the particular one we need exists.
+         */
+        snprintf(buf, MAXPGPATH-1, "%s/%u", LCRDIR, receiving_from_node_id);
+        pg_mkdir_p(buf, S_IRWXU);
+    }
+
+    walrcv_start(conninfo, &startpoint, startedDuringRecovery);    DisableWalRcvImmediateExit();    /* Loop until
end-of-streamingor error */
 
@@ -298,7 +390,7 @@ WalReceiverMain(void)         * Exit walreceiver if we're not in recovery. This should not happen,
      * but cross-check the status here.         */
 
-        if (!RecoveryInProgress())
+        if (!RecoveryInProgress() && !LogicalWalReceiverActive())            ereport(FATAL,
(errmsg("cannotcontinue WAL streaming, recovery has already ended")));
 
@@ -443,7 +535,17 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)                buf +=
sizeof(WalDataMessageHeader);               len -= sizeof(WalDataMessageHeader);
 
-                XLogWalRcvWrite(buf, len, msghdr.dataStart);
+
+                /*
+                 * The WALReceiver connects either during recovery or during
+                 * normal running. During recovery then pure WAL data is
+                 * received, whereas during normal running we send Logical
+                 * Change Records (LCRs) which are stored differently.
+                 */
+                if (LogicalWalReceiverActive())
+                    XLogWalRcvWrite(buf, len, msghdr.dataStart);
+                else
+                    LogicalWalRcvWrite(buf, len, msghdr.dataStart);                break;            }        case
'k':               /* Keepalive */
 
@@ -477,6 +579,10 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)    int            startoff;    int
     byteswritten;
 
+#ifdef VERBOSE_DEBUG
+      elog(LOG, "received data len %lu, at %X/%X",
+         nbytes, recptr.xlogid, recptr.xrecoff);
+#endif    while (nbytes > 0)    {        int            segbytes;
@@ -509,7 +615,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)            /* Create/use new log file */
          XLByteToSeg(recptr, recvId, recvSeg);            use_existent = true;
 
-            recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true);
+            recvFile = XLogFileInit(receiving_from_node_id, recvId, recvSeg, &use_existent, true);            recvOff
=0;        }
 
@@ -585,6 +691,27 @@ XLogWalRcvFlush(bool dying)        {            walrcv->latestChunkStart = walrcv->receivedUpto;
        walrcv->receivedUpto = LogstreamResult.Flush;
 
+
+            /* FIXME */
+            if(LogicalWalReceiverActive()){
+                if(XLByteLE(curRecv, LogstreamResult.Write)){
+                    WalRcv->mm_receiveState[receiving_from_node_id] = curRecv;
+
+                    if(WalRcv->mm_receiveLatch[receiving_from_node_id])
+                        SetLatch(WalRcv->mm_receiveLatch[receiving_from_node_id]);
+#if 0
+                    elog(LOG, "confirming flush to %X/%X",
+                         curRecv.xlogid, curRecv.xrecoff);
+#endif
+                }
+                else{
+#if 0
+                    elog(LOG, "not conf flush to %X/%X, wrote to %X/%X",
+                         curRecv.xlogid, curRecv.xrecoff,
+                         LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+#endif
+                }
+            }        }        SpinLockRelease(&walrcv->mutex);
@@ -614,6 +741,15 @@ XLogWalRcvFlush(bool dying)}/*
+ * Handle LCR data.
+ */
+static void
+LogicalWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+{
+    elog(LOG, "received msg of length %u", (uint) nbytes);
+}
+
+/* * Send reply message to primary, indicating our current XLOG positions and * the current time. */
@@ -750,6 +886,9 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)    walrcv->lastMsgReceiptTime =
lastMsgReceiptTime;   SpinLockRelease(&walrcv->mutex);
 
+    /* we need to store that in shmem */
+    curRecv = walEnd;
+    if (log_min_messages <= DEBUG2)        elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms
transferlatency %d ms",             timestamptz_to_str(sendTime),
 
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index cb49282..aa07746 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -207,6 +207,7 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)        walrcv->conninfo[0] = '\0';
walrcv->walRcvState= WALRCV_STARTING;    walrcv->startTime = now;
 
+    walrcv->startedDuringRecovery = true;    /*     * If this is the first startup of walreceiver, we initialize
receivedUpto
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8cd3a00..d2e1c76 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -37,9 +37,13 @@#include <signal.h>#include <unistd.h>
+#include "access/xlogreader.h"#include "access/transam.h"
+#include "access/xact.h"#include "access/xlog_internal.h"#include "catalog/pg_type.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"#include "funcapi.h"#include "libpq/libpq.h"#include "libpq/pqformat.h"
@@ -64,6 +68,7 @@#include "utils/ps_status.h"#include "utils/resowner.h"#include "utils/timestamp.h"
+#include "utils/syscache.h"/* Array of WalSnds in shared memory */
@@ -74,9 +79,12 @@ WalSnd       *MyWalSnd = NULL;/* Global state */bool        am_walsender = false;        /* Am I a
walsenderprocess ? */
 
+bool        am_cascading_walsender = false;        /* Am I cascading WAL to
    * another standby ? */
 
+bool        am_doing_logical = false; /* Am I sending logical changes instead of physical ones */
+/* User-settable parameters for walsender */int            max_wal_senders = 0;    /* the maximum number of concurrent
walsenders*/int            replication_timeout = 60 * 1000;    /* maximum time to send one
 
@@ -112,6 +120,12 @@ static TimestampTz last_reply_timestamp; */static bool wroteNewXlogData = false;
+/*
+ * state for continuous reading of the local servers wal for sending logical
+ * wal
+ */
+static XLogReaderState* xlogreader_state = 0;
+/* Flags set by signal handlers for later service in main loop */static volatile sig_atomic_t got_SIGHUP =
false;volatilesig_atomic_t walsender_shutdown_requested = false;
 
@@ -131,8 +145,19 @@ static void InitWalSnd(void);static void WalSndHandshake(void);static void WalSndKill(int code,
Datumarg);static void XLogSend(char *msgbuf, bool *caughtup);
 
+static Size XLogSendPhysical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+                             XLogRecPtr endptr);
+static Size XLogSendLogical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+                            XLogRecPtr endptr);static void IdentifySystem(void);static void
StartReplication(StartReplicationCmd*cmd);
 
+static void StartLogicalReplication(StartLogicalReplicationCmd *cmd);
+
+static bool RecordRelevantForLogicalReplication(XLogReaderState* state, XLogRecord* r);
+static void ProcessRecord(XLogReaderState* state, XLogRecordBuffer* buf);
+static void WriteoutData(XLogReaderState* state, char* data, Size len);
+static void XLogReadPage(XLogReaderState* state, char *buf, XLogRecPtr startptr);
+static void ProcessStandbyMessage(void);static void ProcessStandbyReplyMessage(void);static void
ProcessStandbyHSFeedbackMessage(void);
@@ -293,8 +318,10 @@ IdentifySystem(void)    char        sysid[32];    char        tli[11];    char
xpos[MAXFNAMELEN];
+    char        node_id[MAXFNAMELEN];//FIXME
+    char        redoptr_s[MAXFNAMELEN];    XLogRecPtr    logptr;
-
+    XLogRecPtr    redoptr = GetRedoRecPtr();    /*     * Reply with a result set with one row, three columns. First
colis     * system ID, second is timeline ID, and third is current xlog location.
 
@@ -309,9 +336,14 @@ IdentifySystem(void)    snprintf(xpos, sizeof(xpos), "%X/%X",             logptr.xlogid,
logptr.xrecoff);
+    snprintf(node_id, sizeof(node_id), "%i", guc_replication_origin_id);
+
+    snprintf(redoptr_s, sizeof(redoptr_s), "%X/%X",
+             redoptr.xlogid, redoptr.xrecoff);
+    /* Send a RowDescription message */    pq_beginmessage(&buf, 'T');
-    pq_sendint(&buf, 3, 2);        /* 3 fields */
+    pq_sendint(&buf, 5, 2);        /* 5 fields */    /* first field */    pq_sendstring(&buf, "systemid");    /* col
name*/
 
@@ -332,24 +364,47 @@ IdentifySystem(void)    pq_sendint(&buf, 0, 2);        /* format code */    /* third field */
-    pq_sendstring(&buf, "xlogpos");
-    pq_sendint(&buf, 0, 4);
-    pq_sendint(&buf, 0, 2);
-    pq_sendint(&buf, TEXTOID, 4);
-    pq_sendint(&buf, -1, 2);
-    pq_sendint(&buf, 0, 4);
-    pq_sendint(&buf, 0, 2);
+    pq_sendstring(&buf, "xlogpos");    /* col name */
+    pq_sendint(&buf, 0, 4);        /* table oid */
+    pq_sendint(&buf, 0, 2);        /* attnum */
+    pq_sendint(&buf, TEXTOID, 4);        /* type oid */
+    pq_sendint(&buf, -1, 2);        /* typlen */
+    pq_sendint(&buf, 0, 4);        /* typmod */
+    pq_sendint(&buf, 0, 2);        /* format code */
+
+    /* fourth field */
+    pq_sendstring(&buf, "node_id");    /* col name */
+    pq_sendint(&buf, 0, 4);        /* table oid */
+    pq_sendint(&buf, 0, 2);        /* attnum */
+    pq_sendint(&buf, INT4OID, 4);        /* type oid */
+    pq_sendint(&buf, 4, 2);        /* typlen */
+    pq_sendint(&buf, 0, 4);        /* typmod */
+    pq_sendint(&buf, 0, 2);        /* format code */
+
+    /* fifth field */
+    pq_sendstring(&buf, "last_checkpoint");    /* col name */
+    pq_sendint(&buf, 0, 4);        /* table oid */
+    pq_sendint(&buf, 0, 2);        /* attnum */
+    pq_sendint(&buf, TEXTOID, 4);        /* type oid */
+    pq_sendint(&buf, -1, 2);        /* typlen */
+    pq_sendint(&buf, 0, 4);        /* typmod */
+    pq_sendint(&buf, 0, 2);        /* format code */
+    pq_endmessage(&buf);    /* Send a DataRow message */    pq_beginmessage(&buf, 'D');
-    pq_sendint(&buf, 3, 2);        /* # of columns */
+    pq_sendint(&buf, 5, 2);        /* # of columns */    pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf,(char *) &sysid, strlen(sysid));    pq_sendint(&buf, strlen(tli), 4);    /* col2 len */
pq_sendbytes(&buf,(char *) tli, strlen(tli));    pq_sendint(&buf, strlen(xpos), 4);    /* col3 len */
pq_sendbytes(&buf,(char *) xpos, strlen(xpos));
 
+    pq_sendint(&buf, strlen(node_id), 4);    /* col4 len */
+    pq_sendbytes(&buf, (char *)node_id, strlen(node_id));
+    pq_sendint(&buf, strlen(redoptr_s), 4);    /* col5 len */
+    pq_sendbytes(&buf, (char *)redoptr_s, strlen(redoptr_s));    pq_endmessage(&buf);
@@ -432,6 +487,8 @@ StartReplication(StartReplicationCmd *cmd)    pq_endmessage(&buf);    pq_flush();
+    am_doing_logical = false;
+    /*     * Initialize position to the received one, then the xlog records begin to     * be shipped from that
position
@@ -440,6 +497,56 @@ StartReplication(StartReplicationCmd *cmd)}/*
+ * START_LOGICAL_REPLICATION
+ */
+static void
+StartLogicalReplication(StartLogicalReplicationCmd *cmd)
+{
+    StringInfoData buf;
+
+    /* XXX: see above */
+    MarkPostmasterChildWalSender();
+    SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+    /* XXX: see above*/
+    if (am_cascading_walsender && !RecoveryInProgress())
+    {
+        ereport(LOG,
+                (errmsg("terminating walsender process to force cascaded standby "
+                        "to update timeline and reconnect")));
+        walsender_ready_to_stop = true;
+    }
+
+    /* XXX: see above*/
+    WalSndSetState(WALSNDSTATE_CATCHUP);
+
+    /* Send a CopyBothResponse message, and start streaming */
+    pq_beginmessage(&buf, 'W');
+    pq_sendbyte(&buf, 0);
+    pq_sendint(&buf, 0, 2);
+    pq_endmessage(&buf);
+    pq_flush();
+
+    am_doing_logical = true;
+
+    sentPtr = cmd->startpoint;
+
+    if(!xlogreader_state){
+        xlogreader_state = XLogReaderAllocate();
+        xlogreader_state->is_record_interesting = RecordRelevantForLogicalReplication;
+        xlogreader_state->finished_record = ProcessRecord;
+        xlogreader_state->writeout_data = WriteoutData;
+        xlogreader_state->read_page = XLogReadPage;
+
+        /* FIXME: it would probably better to handle this */
+        XLogReaderReset(xlogreader_state);
+    }
+
+    xlogreader_state->startptr = cmd->startpoint;
+    xlogreader_state->curptr = cmd->startpoint;
+}
+
+/* * Execute an incoming replication command. */static bool
@@ -483,6 +590,13 @@ HandleReplicationCommand(const char *cmd_string)            replication_started = true;
break;
+        case T_StartLogicalReplicationCmd:
+            StartLogicalReplication((StartLogicalReplicationCmd *) cmd_node);
+
+            /* break out of the loop */
+            replication_started = true;
+            break;
+        case T_BaseBackupCmd:            SendBaseBackup((BaseBackupCmd *) cmd_node);
@@ -1071,54 +1185,142 @@ retry:        p += readbytes;    }
-    /*
-     * After reading into the buffer, check that what we read was valid. We do
-     * this after reading, because even though the segment was present when we
-     * opened it, it might get recycled or removed while we read it. The
-     * read() succeeds in that case, but the data we tried to read might
-     * already have been overwritten with new WAL records.
-     */
-    XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
-    XLByteToSeg(startptr, log, seg);
-    if (log < lastRemovedLog ||
-        (log == lastRemovedLog && seg <= lastRemovedSeg))
-    {
-        char        filename[MAXFNAMELEN];
+    if(node_id == InvalidMultimasterNodeId){
+        /*
+         * After reading into the buffer, check that what we read was valid. We
+         * do this after reading, because even though the segment was present
+         * when we opened it, it might get recycled or removed while we read
+         * it. The read() succeeds in that case, but the data we tried to read
+         * might already have been overwritten with new WAL records.
+         */
+        XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
+        XLByteToSeg(startptr, log, seg);
+        if (log < lastRemovedLog ||
+            (log == lastRemovedLog && seg <= lastRemovedSeg))
+        {
+            char        filename[MAXFNAMELEN];
-        XLogFileName(filename, ThisTimeLineID, log, seg);
-        ereport(ERROR,
-                (errcode_for_file_access(),
-                 errmsg("requested WAL segment %s has already been removed",
-                        filename)));
+            XLogFileName(filename, ThisTimeLineID, log, seg);
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                     errmsg("requested WAL segment %s has already been removed",
+                            filename)));
+        }
+
+        /*
+         * During recovery, the currently-open WAL file might be replaced with
+         * the file of the same name retrieved from archive. So we always need
+         * to check what we read was valid after reading into the buffer. If
+         * it's invalid, we try to open and read the file again.
+         */
+        if (am_cascading_walsender)
+        {
+            /* use volatile pointer to prevent code rearrangement */
+            volatile WalSnd *walsnd = MyWalSnd;
+            bool        reload;
+
+            SpinLockAcquire(&walsnd->mutex);
+            reload = walsnd->needreload;
+            walsnd->needreload = false;
+            SpinLockRelease(&walsnd->mutex);
+
+            if (reload && sendFile >= 0)
+            {
+                close(sendFile);
+                sendFile = -1;
+
+                goto retry;
+            }
+        }    }
+    else{
+        /* FIXME: check shm? */
+    }
+}
+static bool
+RecordRelevantForLogicalReplication(XLogReaderState* state, XLogRecord* r){    /*
-     * During recovery, the currently-open WAL file might be replaced with the
-     * file of the same name retrieved from archive. So we always need to
-     * check what we read was valid after reading into the buffer. If it's
-     * invalid, we try to open and read the file again.
+     * Installed as the xlogreader's is_record_interesting callback: reports
+     * whether record r is relevant for logical replication.
+     *
+     * For now we only send out data that originates locally, which implies
+     * a star topology between all nodes. Later we might support more
+     * complicated models. For that, filtering positively by wanted IDs sounds
+     * like a better idea.
+     *
+     * Returns false when r->xl_origin_id differs from
+     * current_replication_origin_id. Otherwise returns true only for heap,
+     * heap2, xact and xlog resource manager records. The state argument is
+     * not consulted.     */
-    if (am_cascading_walsender)
-    {
-        /* use volatile pointer to prevent code rearrangement */
-        volatile WalSnd *walsnd = MyWalSnd;
-        bool        reload;
+    if(r->xl_origin_id != current_replication_origin_id)
+        return false;
+
+    switch(r->xl_rmid){
+        case RM_HEAP_ID:
+        case RM_HEAP2_ID:
+        case RM_XACT_ID:
+        case RM_XLOG_ID:
+            /* FIXME: filter additionally within these resource managers */
+            return true;
+        default:
+            return false;
+    }
+}
-        SpinLockAcquire(&walsnd->mutex);
-        reload = walsnd->needreload;
-        walsnd->needreload = false;
-        SpinLockRelease(&walsnd->mutex);
-        if (reload && sendFile >= 0)
-        {
-            close(sendFile);
-            sendFile = -1;
+/*
+ * read_page callback for the walsender's xlogreader.
+ *
+ * Fills buf with the XLOG_BLCKSZ bytes of WAL starting at startptr, which
+ * must be block aligned, and dies with FATAL if the page header does not
+ * carry XLOG_PAGE_MAGIC. The state argument is not used.
+ */
+static void
+XLogReadPage(XLogReaderState *state, char *buf, XLogRecPtr startptr)
+{
+    XLogPageHeader hdr = (XLogPageHeader) buf;
-            goto retry;
-        }
+
+    Assert((startptr.xrecoff % XLOG_BLCKSZ) == 0);
+
+    /* FIXME: more sensible implementation */
+    XLogRead(buf, InvalidMultimasterNodeId, startptr, XLOG_BLCKSZ);
+
+    /* hdr aliases buf, so it now describes the page that was just read */
+    if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
+    {
+        elog(FATAL, "page header magic %x, should be %x", hdr->xlp_magic,
+             XLOG_PAGE_MAGIC);
+    }
+}
+/*
+ * finished_record callback for the walsender's xlogreader.
+ *
+ * Currently an empty stub; neither argument is used.
+ */
+static void
+ProcessRecord(XLogReaderState *state, XLogRecordBuffer *buf)
+{
+    /* FIXME: process table relfilenode reassignments here */
+}
+
+/*
+ * writeout_data callback for the walsender's xlogreader.
+ *
+ * Appends len bytes at offset state->nbytes of state->private_data and
+ * advances state->nbytes by len. A NULL data pointer means "write len zero
+ * bytes" instead of copying.
+ */
+static void
+WriteoutData(XLogReaderState *state, char *data, Size len)
+{
+    /* FIXME: state->nbytes shouldn't be used in here */
+    char       *dest = (char *) state->private_data + state->nbytes;
+
+    if (data != NULL)
+        memcpy(dest, data, len);
+    else
+        memset(dest, 0, len);    /* we want to write out zeros */
+
+    state->nbytes += len;
+}
+
+/*
+ * Run the xlogreader up to endptr, collecting its output in msgbuf.
+ *
+ * Sets sentPtr to the reader's curptr afterwards and returns the number of
+ * bytes the reader accumulated (state->nbytes). caughtup is not touched, and
+ * startptr is only used by the disabled BUGGY branch.
+ */
+static Size
+XLogSendLogical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+                XLogRecPtr endptr)
+{
+#ifdef BUGGY
+    if (!xlogreader_state->incomplete)
+    {
+        XLogReaderReset(xlogreader_state);
+        xlogreader_state->curptr = startptr;
+        xlogreader_state->startptr = startptr;
+    }
+#endif
+
+    /* FIXME: the nbytes bookkeeping should go */
+    xlogreader_state->nbytes = 0;
+    xlogreader_state->private_data = msgbuf;
+    xlogreader_state->endptr = endptr;
+
+    XLogReaderRead(xlogreader_state);
+
+    /* FIXME */
+    sentPtr = xlogreader_state->curptr;
+
+    return xlogreader_state->nbytes;
+}
+/* * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, * but not yet sent to the client, and buffer
itin the libpq output
 
@@ -1136,10 +1338,11 @@ static voidXLogSend(char *msgbuf, bool *caughtup){    XLogRecPtr    SendRqstPtr;
-    XLogRecPtr    startptr;
-    XLogRecPtr    endptr;
-    Size        nbytes;
+    XLogRecPtr    startptr = sentPtr;
+    XLogRecPtr    endptr = sentPtr;
+    WalDataMessageHeader msghdr;
+    Size        nbytes = 0;    /*     * Attempt to send all data that's already been written out and fsync'd to
@@ -1155,44 +1358,17 @@ XLogSend(char *msgbuf, bool *caughtup)    if (XLByteLE(SendRqstPtr, sentPtr))    {
*caughtup= true;
 
+#if 0
+        elog(LOG, "caughtup %X/%X", SendRqstPtr.xlogid, SendRqstPtr.xrecoff);
+#endif        return;    }
-    /*
-     * Figure out how much to send in one message. If there's no more than
-     * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
-     * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
-     *
-     * The rounding is not only for performance reasons. Walreceiver relies on
-     * the fact that we never split a WAL record across two messages. Since a
-     * long WAL record is split at page boundary into continuation records,
-     * page boundary is always a safe cut-off point. We also assume that
-     * SendRqstPtr never points to the middle of a WAL record.
-     */
-    startptr = sentPtr;
-    if (startptr.xrecoff >= XLogFileSize)
-    {
-        /*
-         * crossing a logid boundary, skip the non-existent last log segment
-         * in previous logical log file.
-         */
-        startptr.xlogid += 1;
-        startptr.xrecoff = 0;
-    }
-
-    endptr = startptr;
+    /* FIXME: this is duplicated in physical transport */    XLByteAdvance(endptr, MAX_SEND_SIZE);
-    if (endptr.xlogid != startptr.xlogid)
-    {
-        /* Don't cross a logfile boundary within one message */
-        Assert(endptr.xlogid == startptr.xlogid + 1);
-        endptr.xlogid = startptr.xlogid;
-        endptr.xrecoff = XLogFileSize;
-    }    /* if we went beyond SendRqstPtr, back off */
-    if (XLByteLE(SendRqstPtr, endptr))
-    {
+    if (XLByteLE(SendRqstPtr, endptr)){        endptr = SendRqstPtr;        *caughtup = true;    }
@@ -1203,34 +1379,39 @@ XLogSend(char *msgbuf, bool *caughtup)        *caughtup = false;    }
-    nbytes = endptr.xrecoff - startptr.xrecoff;
-    Assert(nbytes <= MAX_SEND_SIZE);
-    /*     * OK to read and send the slice.     */    msgbuf[0] = 'w';
-    /*
-     * Read the log directly into the output buffer to avoid extra memcpy
-     * calls.
-     */
-    XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId,
-             startptr, nbytes);
+    nbytes += 1 + sizeof(WalDataMessageHeader);
+
+    if(am_doing_logical)
+        nbytes += XLogSendLogical(msgbuf + nbytes, caughtup, sentPtr, endptr);
+    else
+        nbytes += XLogSendPhysical(msgbuf + nbytes, caughtup, sentPtr, endptr);
+
+#if 0
+    elog(LOG, "setting sentPtr to %X/%X, SendRqstPtr %X/%X, endptr %X/%X",
+         sentPtr.xlogid, sentPtr.xrecoff,
+         SendRqstPtr.xlogid, SendRqstPtr.xrecoff,
+         endptr.xlogid, endptr.xrecoff);
+#endif    /*     * We fill the message header last so that the send timestamp is taken as     * late as possible.
*/   msghdr.dataStart = startptr;
 
-    msghdr.walEnd = SendRqstPtr;
+    msghdr.walEnd = sentPtr;    msghdr.sendTime = GetCurrentTimestamp();
+    memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
-    pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+    pq_putmessage_noblock('d', msgbuf,
+                          nbytes);
-    sentPtr = endptr;    /* Update shared memory status */    {
@@ -1251,8 +1432,59 @@ XLogSend(char *msgbuf, bool *caughtup)                 sentPtr.xlogid, sentPtr.xrecoff);
set_ps_display(activitymsg,false);    }
 
+}
+
+/*
+ * Copy the raw WAL between startptr and endptr into msgbuf.
+ *
+ * The caller (XLogSend) has already limited endptr to at most MAX_SEND_SIZE
+ * bytes past sentPtr, backed it off to the flushed position and rounded it
+ * to a page boundary, so endptr must be honoured here. Recomputing it from
+ * startptr would drop those limits and could ship WAL that has not been
+ * flushed yet. The only extra restriction applied here is that one message
+ * never crosses a logfile boundary; when that shortens the range, *caughtup
+ * is cleared because not everything up to the caller's endptr is sent.
+ *
+ * Advances sentPtr to the end of what was read and returns the number of
+ * bytes placed in msgbuf.
+ */
+static Size
+XLogSendPhysical(char *msgbuf, bool *caughtup, XLogRecPtr startptr, XLogRecPtr endptr)
+{
+    Size        nbytes;
+
+    /*
+     * Walreceiver relies on the fact that we never split a WAL record across
+     * two messages. Since a long WAL record is split at page boundary into
+     * continuation records, page boundary is always a safe cut-off point. We
+     * also assume that endptr never points to the middle of a WAL record.
+     */
+    if (startptr.xrecoff >= XLogFileSize)
+    {
+        /*
+         * crossing a logid boundary, skip the non-existent last log segment
+         * in previous logical log file.
+         *
+         * FIXME: Isn't getting to that point a bug in the XLByte arithmetic?
+         */
+        startptr.xlogid += 1;
+        startptr.xrecoff = 0;
+    }
+
+    if (endptr.xlogid != startptr.xlogid)
+    {
+        /* Don't cross a logfile boundary within one message */
+        Assert(endptr.xlogid == startptr.xlogid + 1);
+        endptr.xlogid = startptr.xlogid;
+        endptr.xrecoff = XLogFileSize;
+
+        /* we stop short of the caller's endptr, so more data remains */
+        *caughtup = false;
+    }
+
+    nbytes = endptr.xrecoff - startptr.xrecoff;
+    Assert(nbytes <= MAX_SEND_SIZE);
+
+    /*
+     * Read the log directly into the output buffer to avoid extra memcpy
+     * calls.
+     */
+    XLogRead(msgbuf, InvalidMultimasterNodeId, startptr, nbytes);
+
+    sentPtr = endptr;
-    return;
+    return nbytes;}/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 46b0657..6a58f96 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3058,6 +3058,15 @@ static struct config_string ConfigureNamesString[] =    },    {
+        {"multimaster_conninfo", PGC_POSTMASTER, REPLICATION_MASTER,
+            gettext_noop("Connection string to upstream master."),
+            NULL
+        },
+        &mm_conninfo,
+        0, NULL, NULL, NULL
+    },
+
+    {        {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,            gettext_noop("Sets default
textsearch configuration."),            NULL
 
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 12f8a3f..240c13d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -243,6 +243,7 @@# - Multi Master Servers -
+#multimaster_conninfo = 'host=myupstreammaster'#multimaster_node_id = 0 #invalid node
id#------------------------------------------------------------------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 1e16088..78b2f5f 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -403,6 +403,7 @@ typedef enum NodeTag    T_IdentifySystemCmd,    T_BaseBackupCmd,    T_StartReplicationCmd,
+    T_StartLogicalReplicationCmd,    /*     * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 236a36d..fee111c 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -49,4 +49,14 @@ typedef struct StartReplicationCmd    XLogRecPtr    startpoint;} StartReplicationCmd;
+/* ----------------------
+ *        START_LOGICAL_REPLICATION command
+ *
+ * Node passed to StartLogicalReplication() when the replication command
+ * dispatcher sees a T_StartLogicalReplicationCmd.
+ * ----------------------
+ */
+typedef struct StartLogicalReplicationCmd
+{
+    NodeTag        type;        /* node tag, T_StartLogicalReplicationCmd */
+    XLogRecPtr    startpoint;    /* WAL position at which streaming starts */
+} StartLogicalReplicationCmd;
+#endif   /* REPLNODES_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 8f44fad..fc9e120 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -13,6 +13,10 @@#include "access/xlogdefs.h"
+/* user settable parameters for multi-master in postmaster */
+extern char *mm_conninfo;    /* copied in walreceiver.h also */
+#define LogicalWalReceiverActive() (mm_conninfo != NULL)
+extern int guc_replication_origin_id;extern RepNodeId current_replication_origin_id;extern XLogRecPtr
current_replication_origin_lsn;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c9ab1be..b565190 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -22,6 +22,7 @@extern bool am_walreceiver;extern int    wal_receiver_status_interval;extern bool
hot_standby_feedback;
+extern RepNodeId receiving_from_node_id;/* * MAXCONNINFO: maximum size of a connection string.
@@ -38,9 +39,9 @@ extern bool hot_standby_feedback; */typedef enum{
-    WALRCV_STOPPED,                /* stopped and mustn't start up again */    WALRCV_STARTING,            /*
launched,but the process hasn't                                 * initialized yet */
 
+    WALRCV_STOPPED,                /* stopped and mustn't start up again */    WALRCV_RUNNING,                /*
walreceiveris running */    WALRCV_STOPPING                /* requested to stop, but still running */} WalRcvState;
 
@@ -55,6 +56,7 @@ typedef struct     */    pid_t        pid;    WalRcvState walRcvState;
+    bool        startedDuringRecovery;    pg_time_t    startTime;    /*
@@ -108,9 +110,12 @@ typedef structextern WalRcvData *WalRcv;/* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool
startedDuringRecovery);externPGDLLIMPORT walrcv_connect_type walrcv_connect;
 
+typedef bool (*walrcv_start_type) (char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery);
+extern PGDLLIMPORT walrcv_start_type walrcv_start;
+typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char**buffer, int *len);extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 
-- 
1.7.10.rc3.3.g19a6c.dirty



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andres Freund
Дата:
Сообщение: [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions
Следующее
От: Honza Horak
Дата:
Сообщение: Re: Ability to listen on two unix sockets