[PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance

Поиск
Список
Период
Сортировка
От Andres Freund
Тема [PATCH 09/16] Decode wal (with wal_level=logical) into changes in an ApplyCache instance
Дата
Msg-id 1339586927-13156-9-git-send-email-andres@2ndquadrant.com
обсуждение исходный текст
Ответ на [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Список pgsql-hackers
From: Andres Freund <andres@anarazel.de>

This requires an up-to-date catalog and can thus only be run on a replica.

Missing:
- HEAP_NEWPAGE support
- HEAP2_MULTI_INSERT support
- DDL integration. *No* DDL, including TRUNCATE, is possible at the moment
---
 src/backend/replication/logical/Makefile |    2 +-
 src/backend/replication/logical/decode.c |  439 ++++++++++++++++++++++++++++++
 src/include/replication/decode.h         |   23 ++
 3 files changed, 463 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/replication/logical/decode.c
 create mode 100644 src/include/replication/decode.h

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2eadab8..7dd9663 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o
+OBJS = applycache.o decode.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..7e07d50
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,439 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *
+ * Decodes wal records from an xlogreader.h callback into an applycache
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *      src/backend/replication/logical/decode.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+/* Heap record decoding: one routine per kind of row change. */
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer *buf);
+
+/* Record types that are recognized but not decoded yet. */
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer *buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer *buf);
+
+/* Commit of a toplevel transaction and its subtransactions. */
+static void DecodeCommit(ApplyCache *cache, XLogRecordBuffer *buf,
+                         TransactionId xid,
+                         TransactionId *sub_xids, int nsubxacts);
+
+/* Copies the tuple data logged in a WAL record into a tuple buffer. */
+static void DecodeXLogTuple(char *data, Size len,
+                            HeapTuple table, ApplyCacheTupleBuf *tuple);
+
+
+/*
+ * Abort the transaction 'xid' and every subtransaction listed in the abort
+ * record 'xlrec'.
+ */
+static void
+DecodeAbort(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+            xl_xact_abort *xlrec)
+{
+    TransactionId *sub_xids;
+    int         i;
+
+    /* FIXME: this is not really allowed if there are no subtransactions */
+    sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+    for (i = 0; i < xlrec->nsubxacts; i++)
+    {
+        ApplyCacheAbort(cache, sub_xids[i], buf->origptr);
+    }
+
+    /* TODO: check that this also contains not-yet-aborted subtxns */
+    ApplyCacheAbort(cache, xid, buf->origptr);
+
+    elog(WARNING, "ABORT %u", xid);
+}
+
+/*
+ * Dispatch one WAL record to the decoding routine matching its resource
+ * manager and record type.
+ *
+ * cache: ApplyCache that receives the decoded changes and the commit/abort
+ *        events.
+ * buf:   the record to decode; buf->record is its header, buf->record_data
+ *        its payload, and buf->origptr is passed along with every change,
+ *        commit and abort.
+ *
+ * Heap inserts, updates and deletes become changes; commit and abort records
+ * end the affected transaction(s) in the cache.  All other records are
+ * ignored.
+ */
+void
+DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+    XLogRecord *r = &buf->record;
+    uint8       info = r->xl_info & ~XLR_INFO_MASK;
+
+    switch (r->xl_rmid)
+    {
+        case RM_HEAP_ID:
+        {
+            info &= XLOG_HEAP_OPMASK;
+            switch (info)
+            {
+                case XLOG_HEAP_INSERT:
+                    DecodeInsert(cache, buf);
+                    break;
+
+                /*
+                 * no guarantee that we get a HOT update again, so handle it
+                 * as a normal update
+                 */
+                case XLOG_HEAP_HOT_UPDATE:
+                case XLOG_HEAP_UPDATE:
+                    DecodeUpdate(cache, buf);
+                    break;
+
+                case XLOG_HEAP_NEWPAGE:
+                    DecodeNewpage(cache, buf);
+                    break;
+
+                case XLOG_HEAP_DELETE:
+                    DecodeDelete(cache, buf);
+                    break;
+                default:
+                    break;
+            }
+            break;
+        }
+        case RM_HEAP2_ID:
+        {
+            info &= XLOG_HEAP_OPMASK;
+            switch (info)
+            {
+                case XLOG_HEAP2_MULTI_INSERT:
+                    /* this also handles the XLOG_HEAP_INIT_PAGE case */
+                    DecodeMultiInsert(cache, buf);
+                    break;
+                default:
+                    /* everything else here is just physical stuff we're not interested in */
+                    break;
+            }
+            break;
+        }
+
+        case RM_XACT_ID:
+        {
+            switch (info)
+            {
+                case XLOG_XACT_COMMIT:
+                {
+                    TransactionId *sub_xids;
+                    xl_xact_commit *xlrec = (xl_xact_commit *) buf->record_data;
+
+                    /* FIXME: this is not really allowed if there are no subtransactions */
+                    sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+                    DecodeCommit(cache, buf, r->xl_xid, sub_xids, xlrec->nsubxacts);
+
+                    break;
+                }
+                case XLOG_XACT_COMMIT_PREPARED:
+                {
+                    TransactionId *sub_xids;
+                    xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) buf->record_data;
+
+                    sub_xids = (TransactionId *) &(xlrec->crec.xnodes[xlrec->crec.nrels]);
+
+                    /*
+                     * The transaction being committed is the prepared one
+                     * named inside the record (xlrec->xid), not the xid in
+                     * the record header, which belongs to whoever issued
+                     * COMMIT PREPARED.
+                     */
+                    DecodeCommit(cache, buf, xlrec->xid, sub_xids,
+                                 xlrec->crec.nsubxacts);
+
+                    break;
+                }
+                case XLOG_XACT_COMMIT_COMPACT:
+                {
+                    xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) buf->record_data;
+
+                    DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+                                 xlrec->nsubxacts);
+                    break;
+                }
+                case XLOG_XACT_ABORT:
+                {
+                    xl_xact_abort *xlrec = (xl_xact_abort *) buf->record_data;
+
+                    DecodeAbort(cache, buf, r->xl_xid, xlrec);
+                    break;
+                }
+                case XLOG_XACT_ABORT_PREPARED:
+                {
+                    /*
+                     * The payload is an xl_xact_abort_prepared: the xid of
+                     * the prepared transaction followed by a plain abort
+                     * record.  Reading it as a bare xl_xact_abort would
+                     * pick up every field at the wrong offset.
+                     */
+                    xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) buf->record_data;
+
+                    DecodeAbort(cache, buf, xlrec->xid, &xlrec->arec);
+                    break;
+                }
+                case XLOG_XACT_ASSIGNMENT:
+                    /*
+                     * XXX: We could reassign transactions to the parent here
+                     * to save space and effort when merging transactions at
+                     * commit.
+                     */
+                    break;
+                case XLOG_XACT_PREPARE:
+                    /*
+                     * FIXME: we should replay the transaction and prepare it
+                     * as well.
+                     */
+                    break;
+                default:
+                    break;
+            }
+            break;
+        }
+        default:
+            break;
+    }
+}
+
+/*
+ * Commit transaction 'xid' along with its subtransactions.
+ *
+ * Each of the 'nsubxacts' entries of 'sub_xids' is first reported to the
+ * cache as a committed child of 'xid'; afterwards 'xid' itself is committed.
+ * buf->origptr is passed to the cache with every call.
+ */
+static void
+DecodeCommit(ApplyCache *cache, XLogRecordBuffer *buf, TransactionId xid,
+             TransactionId *sub_xids, int nsubxacts)
+{
+    int         subno;
+
+    for (subno = 0; subno < nsubxacts; subno++)
+    {
+        ApplyCacheCommitChild(cache, xid, sub_xids[subno], buf->origptr);
+    }
+
+    /* replay actions of all transaction + subtransactions in order */
+    ApplyCacheCommit(cache, xid, buf->origptr);
+}
+
+/*
+ * Decode an XLOG_HEAP_INSERT record into an APPLY_CACHE_CHANGE_INSERT change
+ * and queue it for the record's transaction.
+ *
+ * Records whose relfilenode cannot be resolved through the RELFILENODE
+ * syscache, or that resolve to a pg_class oid below FirstNormalObjectId, are
+ * skipped; the unused change is handed back to the cache.
+ */
+static void
+DecodeInsert(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+    XLogRecord *r = &buf->record;
+    xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+
+    Oid         relfilenode = xlrec->target.node.relNode;
+
+    ApplyCacheChange *change;
+
+    /*
+     * The tuple is read from offset SizeOfHeapInsert below, so the minimum
+     * length has to be computed from SizeOfHeapInsert as well (this used to
+     * compare against SizeOfHeapUpdate, the fixed size of update records).
+     */
+    if ((r->xl_info & XLR_BKP_BLOCK_1)
+        && r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader))
+    {
+        elog(FATAL, "huh, no tuple data on wal_level = logical?");
+    }
+
+    if (relfilenode == 0)
+    {
+        elog(ERROR, "nailed catalog changed");
+    }
+
+    change = ApplyCacheGetChange(cache);
+    change->action = APPLY_CACHE_CHANGE_INSERT;
+
+    /*
+     * Lookup the pg_class entry for the relfilenode to get the real oid.
+     * The copy is allocated in TopMemoryContext.
+     */
+    {
+        MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+        change->table = SearchSysCacheCopy1(RELFILENODE,
+                                            relfilenode);
+        MemoryContextSwitchTo(curctx);
+    }
+
+    if (!HeapTupleIsValid(change->table))
+    {
+#ifdef SHOULD_BE_HANDLED_BETTER
+        elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+             relfilenode);
+#endif
+        ApplyCacheReturnChange(cache, change);
+        return;
+    }
+
+    if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+    {
+#ifdef VERBOSE_DEBUG
+        elog(LOG, "skipping change to systable");
+#endif
+        ApplyCacheReturnChange(cache, change);
+        return;
+    }
+
+#ifdef VERBOSE_DEBUG
+    {
+        /* for accessing the cache */
+        Form_pg_class class_form;
+        class_form = (Form_pg_class) GETSTRUCT(change->table);
+        elog(WARNING, "INSERT INTO \"%s\"", NameStr(class_form->relname));
+    }
+#endif
+
+    change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+    /* the tuple header and data follow the fixed-size insert record */
+    DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
+                    r->xl_len - SizeOfHeapInsert,
+                    change->table, change->newtuple);
+
+    ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+/*
+ * Turn a heap update record into an APPLY_CACHE_CHANGE_UPDATE change that
+ * carries the new tuple version, and queue it for the record's transaction.
+ *
+ * Records whose relfilenode cannot be resolved through the RELFILENODE
+ * syscache, or that resolve to a pg_class oid below FirstNormalObjectId, are
+ * skipped; the unused change is handed back to the cache.
+ */
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+    XLogRecord *record = &buf->record;
+    xl_heap_update *update = (xl_heap_update *) buf->record_data;
+    Oid         relnode = update->target.node.relNode;
+    ApplyCacheChange *entry;
+    MemoryContext oldctx;
+
+    /* with a backup block present the record may lack the tuple data */
+    if ((record->xl_info & (XLR_BKP_BLOCK_1 | XLR_BKP_BLOCK_2)) &&
+        record->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader))
+        elog(FATAL, "huh, no tuple data on wal_level = logical?");
+
+    entry = ApplyCacheGetChange(cache);
+    entry->action = APPLY_CACHE_CHANGE_UPDATE;
+
+    /*
+     * Resolve the relfilenode to its pg_class entry to learn the real oid;
+     * the copy is allocated in TopMemoryContext.
+     */
+    oldctx = MemoryContextSwitchTo(TopMemoryContext);
+    entry->table = SearchSysCacheCopy1(RELFILENODE, relnode);
+    MemoryContextSwitchTo(oldctx);
+
+    if (!HeapTupleIsValid(entry->table))
+    {
+#ifdef SHOULD_BE_HANDLED_BETTER
+        elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+             relnode);
+#endif
+        ApplyCacheReturnChange(cache, entry);
+        return;
+    }
+
+    if (HeapTupleGetOid(entry->table) < FirstNormalObjectId)
+    {
+#ifdef VERBOSE_DEBUG
+        elog(LOG, "skipping change to systable");
+#endif
+        ApplyCacheReturnChange(cache, entry);
+        return;
+    }
+
+#ifdef VERBOSE_DEBUG
+    {
+        /* peek into the pg_class tuple for the relation name */
+        Form_pg_class relform = (Form_pg_class) GETSTRUCT(entry->table);
+
+        elog(WARNING, "UPDATE \"%s\"", NameStr(relform->relname));
+    }
+#endif
+
+    /* FIXME: need to save the old tuple as well if we want primary key updates to work. */
+    entry->newtuple = ApplyCacheGetTupleBuf(cache);
+
+    /* the tuple header and data follow the fixed-size update record */
+    DecodeXLogTuple((char *) update + SizeOfHeapUpdate,
+                    record->xl_len - SizeOfHeapUpdate,
+                    entry->table, entry->newtuple);
+
+    ApplyCacheAddChange(cache, record->xl_xid, buf->origptr, entry);
+}
+
+/*
+ * Turn an XLOG_HEAP_DELETE record into an APPLY_CACHE_CHANGE_DELETE change
+ * whose old tuple is built from the data logged after the fixed-size delete
+ * record, and queue it for the record's transaction.
+ *
+ * Records whose relfilenode cannot be resolved through the RELFILENODE
+ * syscache, or that resolve to a pg_class oid below FirstNormalObjectId, are
+ * skipped; the unused change is handed back to the cache.
+ */
+static void
+DecodeDelete(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+    XLogRecord *record = &buf->record;
+    xl_heap_delete *del = (xl_heap_delete *) buf->record_data;
+    Oid         relnode = del->target.node.relNode;
+    ApplyCacheChange *entry = ApplyCacheGetChange(cache);
+    MemoryContext oldctx;
+
+    entry->action = APPLY_CACHE_CHANGE_DELETE;
+
+    /* the record must contain more than just the two headers */
+    if (record->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+        elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+
+    /*
+     * Resolve the relfilenode to its pg_class entry to learn the real oid;
+     * the copy is allocated in TopMemoryContext.
+     */
+    oldctx = MemoryContextSwitchTo(TopMemoryContext);
+    entry->table = SearchSysCacheCopy1(RELFILENODE, relnode);
+    MemoryContextSwitchTo(oldctx);
+
+    if (!HeapTupleIsValid(entry->table))
+    {
+#ifdef SHOULD_BE_HANDLED_BETTER
+        elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+             relnode);
+#endif
+        ApplyCacheReturnChange(cache, entry);
+        return;
+    }
+
+    if (HeapTupleGetOid(entry->table) < FirstNormalObjectId)
+    {
+#ifdef VERBOSE_DEBUG
+        elog(LOG, "skipping change to systable");
+#endif
+        ApplyCacheReturnChange(cache, entry);
+        return;
+    }
+
+#ifdef VERBOSE_DEBUG
+    {
+        /* peek into the pg_class tuple for the relation name */
+        Form_pg_class relform = (Form_pg_class) GETSTRUCT(entry->table);
+
+        elog(WARNING, "DELETE FROM \"%s\"", NameStr(relform->relname));
+    }
+#endif
+
+    entry->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+    DecodeXLogTuple((char *) del + SizeOfHeapDelete,
+                    record->xl_len - SizeOfHeapDelete,
+                    entry->table, entry->oldtuple);
+
+    ApplyCacheAddChange(cache, record->xl_xid, buf->origptr, entry);
+}
+
+
+/*
+ * Placeholder for XLOG_HEAP_NEWPAGE records: nothing is decoded, only a
+ * warning is emitted.  Neither argument is used.
+ */
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+    elog(WARNING,
+         "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+/*
+ * Placeholder for XLOG_HEAP2_MULTI_INSERT records: nothing is decoded, only
+ * a warning is emitted.  Neither argument is used.
+ */
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer *buf)
+{
+    elog(WARNING,
+         "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+
+/*
+ * Build a heap tuple in 'tuple' from the tuple data stored in a WAL record.
+ *
+ * data:  points at an xl_heap_header (possibly unaligned) that is directly
+ *        followed by the tuple contents starting at t_bits.
+ * len:   number of bytes available at 'data', header included.
+ * table: tuple whose oid is stored as the t_tableOid of the result.
+ * tuple: buffer that receives the tuple; tuple->tuple.t_data is pointed at
+ *        tuple->header.
+ *
+ * Raises an ERROR if 'len' is too small to contain the header or if the
+ * tuple data exceeds MaxHeapTupleSize.
+ */
+static void
+DecodeXLogTuple(char *data, Size len,
+                HeapTuple table, ApplyCacheTupleBuf *tuple)
+{
+    xl_heap_header xlhdr;
+    Size        datalen;
+
+    /*
+     * 'len' is unsigned, so check it before subtracting: a short record
+     * would otherwise wrap around and lead to an enormous memcpy below in
+     * builds without assertions.
+     */
+    if (len < SizeOfHeapHeader)
+        elog(ERROR, "tuple data in WAL record is too short: %lu bytes",
+             (unsigned long) len);
+
+    datalen = len - SizeOfHeapHeader;
+
+    if (datalen > MaxHeapTupleSize)
+        elog(ERROR, "tuple data in WAL record is too long: %lu bytes",
+             (unsigned long) datalen);
+
+    tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+    /* not a disk based tuple */
+    ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+    /* probably not needed, but ... (is it actually valid to set it?) */
+    tuple->tuple.t_tableOid = HeapTupleGetOid(table);
+    tuple->tuple.t_data = &tuple->header;
+
+    /* data is not stored aligned */
+    memcpy((char *) &xlhdr,
+           data,
+           SizeOfHeapHeader);
+
+    /* clear the fixed part of the header before filling in what was logged */
+    memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+    memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+           data + SizeOfHeapHeader,
+           datalen);
+
+    tuple->header.t_infomask = xlhdr.t_infomask;
+    tuple->header.t_infomask2 = xlhdr.t_infomask2;
+    tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..53088e2
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ *     PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+/* Wrapper holding the ApplyCache pointer. */
+typedef struct ReaderApplyState
+{
+    ApplyCache *apply_cache;    /* cache the decoded changes go into --
+                                 * presumably; confirm against the users of
+                                 * this struct */
+} ReaderApplyState;
+
+/* Decode the WAL record in 'buf' and feed the result into 'cache'. */
+extern void DecodeRecordIntoApplyCache(ApplyCache *cache,
+                                       XLogRecordBuffer *buf);
+
+#endif   /* DECODE_H */
-- 
1.7.10.rc3.3.g19a6c.dirty



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andres Freund
Дата:
Сообщение: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical
Следующее
От: Andres Freund
Дата:
Сообщение: [PATCH 11/16] Add infrastructure for manipulating multiple streams of wal on a segment handling level