[PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes

Поиск
Список
Период
Сортировка
От Andres Freund
Тема [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes
Дата
Msg-id 1339586927-13156-8-git-send-email-andres@2ndquadrant.com
обсуждение исходный текст
Ответ на [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Ответы Re: [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes  (Steve Singer <steve@ssinger.info>)
Список pgsql-hackers
From: Andres Freund <andres@anarazel.de>

The individual changes need to be identified by an xid. The xid can belong to
a subtransaction or a toplevel transaction; at commit those can be reintegrated
by doing a k-way mergesort between the individual transactions.

Callbacks for apply_begin, apply_change and apply_commit are provided to
retrieve complete transactions.

Missing:
- spill-to-disk
- correct subtransaction merge, current behaviour is simple/wrong
- DDL handling (?)
- resource usage controls
---
 src/backend/replication/Makefile             |    2 +
 src/backend/replication/logical/Makefile     |   19 ++
 src/backend/replication/logical/applycache.c |  380 ++++++++++++++++++++++++++
 src/include/replication/applycache.h         |  185 +++++++++++++
 4 files changed, 586 insertions(+)
 create mode 100644 src/backend/replication/logical/Makefile
 create mode 100644 src/backend/replication/logical/applycache.c
 create mode 100644 src/include/replication/applycache.h
 

diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 9d9ec87..ae7f6b1 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
 	repl_gram.o syncrep.o
 
+SUBDIRS = logical
+
 include $(top_srcdir)/src/backend/common.mk
 
 # repl_scanner is compiled as part of repl_gram
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
new file mode 100644
index 0000000..2eadab8
--- /dev/null
+++ b/src/backend/replication/logical/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/replication/logical
+#
+# IDENTIFICATION
+#    src/backend/replication/logical/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/logical
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
+# applycache.o is the only object built in this directory so far.
+OBJS = applycache.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/applycache.c b/src/backend/replication/logical/applycache.c
new file mode 100644
index 0000000..b73b0ba
--- /dev/null
+++ b/src/backend/replication/logical/applycache.c
@@ -0,0 +1,380 @@
+/*-------------------------------------------------------------------------
+ *
+ * applycache.c
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *      src/backend/replication/logical/applycache.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "replication/applycache.h"
+
+#include "utils/ilist.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+
+const Size max_memtries = 1<<16; /* NOTE(review): not referenced elsewhere in the visible patch */
+/*
+ * Caps on how many unused structs each ApplyCache free list may hold; the
+ * ApplyCacheReturn* functions free() a struct instead of caching it once the
+ * corresponding list has reached its cap.
+ */
+const size_t max_cached_changes = 1024;
+const size_t max_cached_tuplebufs = 1024; /* ~8MB */
+const size_t max_cached_transactions = 512;
+
+typedef struct ApplyCacheTXNByIdEnt /* entry type of the ApplyCache.by_txn hash table */
+{
+    TransactionId xid;      /* hash key (keysize is sizeof(TransactionId)) */
+    ApplyCacheTXN* txn;     /* state collected so far for that xid */
+} ApplyCacheTXNByIdEnt;
+/* Obtain / give back ApplyCacheTXN structs, recycling them via cache->cached_transactions. */
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache);
+static void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn);
+/* Find the ApplyCacheTXN registered for xid in cache->by_txn; 'create' adds a missing one. */
+static ApplyCacheTXN* ApplyCacheTXNByXid(ApplyCache*, TransactionId xid, bool create);
+
+
+/*
+ * Allocate and initialize an empty ApplyCache.
+ *
+ * The struct itself is malloc()ed; the xid lookup table lives in a dedicated
+ * "ApplyCache" memory context below TopMemoryContext. All free lists start
+ * out empty. The begin/apply_change/commit callbacks and private_data are
+ * zeroed here; the callbacks have to be filled in by the caller before
+ * ApplyCacheCommit() is used, since it invokes them unconditionally.
+ *
+ * Raises ERROR if the struct cannot be allocated. Release the result with
+ * ApplyCacheFree().
+ */
+ApplyCache*
+ApplyCacheAllocate(void)
+{
+    ApplyCache* cache = (ApplyCache*)malloc(sizeof(ApplyCache));
+    HASHCTL         hash_ctl;
+
+    if (!cache)
+        elog(ERROR, "Could not allocate the ApplyCache");
+
+    /*
+     * malloc() hands back indeterminate memory. Zero the struct so that the
+     * fields not assigned below (callbacks, private_data, last_txn,
+     * last_txn_cache) are NULL/0 instead of garbage.
+     */
+    memset(cache, 0, sizeof(ApplyCache));
+
+    memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+    cache->context = AllocSetContextCreate(TopMemoryContext,
+                                           "ApplyCache",
+                                           ALLOCSET_DEFAULT_MINSIZE,
+                                           ALLOCSET_DEFAULT_INITSIZE,
+                                           ALLOCSET_DEFAULT_MAXSIZE);
+
+    /* xid -> ApplyCacheTXN lookup table, allocated inside our own context */
+    hash_ctl.keysize = sizeof(TransactionId);
+    hash_ctl.entrysize = sizeof(ApplyCacheTXNByIdEnt);
+    hash_ctl.hash = tag_hash;
+    hash_ctl.hcxt = cache->context;
+
+    cache->by_txn = hash_create("ApplyCacheByXid", 1000, &hash_ctl,
+                                HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+    cache->nr_cached_transactions = 0;
+    cache->nr_cached_changes = 0;
+    cache->nr_cached_tuplebufs = 0;
+
+    ilist_d_init(&cache->cached_transactions);
+    ilist_d_init(&cache->cached_changes);
+    ilist_s_init(&cache->cached_tuplebufs);
+
+    return cache;
+}
+
+/*
+ * Release an ApplyCache created by ApplyCacheAllocate().
+ *
+ * Frees every struct parked on the three free lists, destroys the xid lookup
+ * table together with the memory context it was created in, and finally
+ * frees the cache struct itself. The cache must not be used afterwards.
+ *
+ * Transactions that are still registered in by_txn are not released.
+ */
+void ApplyCacheFree(ApplyCache* cache)
+{
+    /* FIXME: check for in-progress transactions */
+
+    /* give the recycled-but-unused structs back to malloc */
+    while (cache->nr_cached_transactions)
+    {
+        cache->nr_cached_transactions--;
+        free(ilist_container(ApplyCacheTXN, node,
+                             ilist_d_pop_front(&cache->cached_transactions)));
+    }
+
+    while (cache->nr_cached_changes)
+    {
+        cache->nr_cached_changes--;
+        free(ilist_container(ApplyCacheChange, node,
+                             ilist_d_pop_front(&cache->cached_changes)));
+    }
+
+    while (cache->nr_cached_tuplebufs)
+    {
+        cache->nr_cached_tuplebufs--;
+        free(ilist_container(ApplyCacheTupleBuf, node,
+                             ilist_s_pop_front(&cache->cached_tuplebufs)));
+    }
+
+    hash_destroy(cache->by_txn);
+
+    /*
+     * hash_destroy() only releases the hash table; the "ApplyCache" context
+     * set up by ApplyCacheAllocate() has to be deleted separately.
+     */
+    MemoryContextDelete(cache->context);
+
+    free(cache);
+}
+
+/*
+ * Hand out a zeroed ApplyCacheTXN whose change and subtransaction lists are
+ * initialized and empty.
+ *
+ * A struct from cache->cached_transactions is reused when one is available,
+ * otherwise a new one is malloc()ed. Raises ERROR if that allocation fails.
+ */
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache)
+{
+    ApplyCacheTXN* result;
+
+    if (cache->nr_cached_transactions == 0)
+    {
+        /* nothing to recycle */
+        result = (ApplyCacheTXN*) malloc(sizeof(*result));
+
+        if (result == NULL)
+            elog(ERROR, "Could not allocate a ApplyCacheTXN struct");
+    }
+    else
+    {
+        result = ilist_container(ApplyCacheTXN, node,
+                                 ilist_d_pop_front(&cache->cached_transactions));
+        cache->nr_cached_transactions--;
+    }
+
+    /* recycled and fresh structs alike start from a clean slate */
+    memset(result, 0, sizeof(*result));
+    ilist_d_init(&result->changes);
+    ilist_d_init(&result->subtxns);
+
+    return result;
+}
+
+/*
+ * Give back an ApplyCacheTXN obtained from ApplyCacheGetTXN().
+ *
+ * The struct is parked on cache->cached_transactions for reuse unless that
+ * list already holds max_cached_transactions entries, in which case it is
+ * free()d. Changes still linked into the transaction are not touched here.
+ */
+void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn)
+{
+    /* free list is full: release the memory instead of caching it */
+    if (cache->nr_cached_transactions >= max_cached_transactions)
+    {
+        free(txn);
+        return;
+    }
+
+    ilist_d_push_front(&cache->cached_transactions, &txn->node);
+    cache->nr_cached_transactions++;
+}
+
+/*
+ * Return a zero-filled ApplyCacheChange, reusing a struct from
+ * cache->cached_changes when one is available and malloc()ing a new one
+ * otherwise.
+ *
+ * Raises ERROR if no memory can be obtained.
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache* cache)
+{
+    ApplyCacheChange* result;
+
+    if (cache->nr_cached_changes == 0)
+    {
+        /* nothing to recycle */
+        result = (ApplyCacheChange*) malloc(sizeof(*result));
+
+        if (result == NULL)
+            elog(ERROR, "Could not allocate a ApplyCacheChange struct");
+    }
+    else
+    {
+        result = ilist_container(ApplyCacheChange, node,
+                                 ilist_d_pop_front(&cache->cached_changes));
+        cache->nr_cached_changes--;
+    }
+
+    /* wipe leftovers from a previous use, including stale tuple pointers */
+    memset(result, 0, sizeof(*result));
+
+    return result;
+}
+
+/*
+ * Dispose of a change that is no longer needed.
+ *
+ * The tuple buffers attached to the change go back to the tuple buffer free
+ * list and its 'table' tuple, if any, is released with heap_freetuple().
+ * The change struct itself is parked on cache->cached_changes unless that
+ * list already holds max_cached_changes entries, in which case it is
+ * free()d.
+ */
+void
+ApplyCacheReturnChange(ApplyCache* cache, ApplyCacheChange* change)
+{
+    /* first hand back everything the change still owns */
+    if (change->newtuple != NULL)
+        ApplyCacheReturnTupleBuf(cache, change->newtuple);
+
+    if (change->oldtuple != NULL)
+        ApplyCacheReturnTupleBuf(cache, change->oldtuple);
+
+    if (change->table != NULL)
+        heap_freetuple(change->table);
+
+    /* then recycle or release the struct itself */
+    if (cache->nr_cached_changes >= max_cached_changes)
+    {
+        free(change);
+        return;
+    }
+
+    ilist_d_push_front(&cache->cached_changes, &change->node);
+    cache->nr_cached_changes++;
+}
+
+/*
+ * Return a tuple buffer, reusing one from cache->cached_tuplebufs when
+ * available and malloc()ing a new one otherwise.
+ *
+ * Unlike the TXN and change allocators, the buffer is NOT cleared: its
+ * contents are whatever malloc() or the previous user left behind.
+ *
+ * Raises ERROR if no memory can be obtained.
+ */
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache* cache)
+{
+    ApplyCacheTupleBuf* buf;
+
+    if (cache->nr_cached_tuplebufs == 0)
+    {
+        /* nothing to recycle */
+        buf = (ApplyCacheTupleBuf*) malloc(sizeof(*buf));
+
+        if (buf == NULL)
+            elog(ERROR, "Could not allocate a ApplyCacheTupleBuf struct");
+
+        return buf;
+    }
+
+    buf = ilist_container(ApplyCacheTupleBuf, node,
+                          ilist_s_pop_front(&cache->cached_tuplebufs));
+    cache->nr_cached_tuplebufs--;
+
+    return buf;
+}
+
+/*
+ * Give back a tuple buffer obtained from ApplyCacheGetTupleBuf().
+ *
+ * The buffer is parked on cache->cached_tuplebufs for reuse unless that list
+ * already holds max_cached_tuplebufs entries, in which case it is free()d.
+ */
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple)
+{
+    /* free list is full: release the memory instead of caching it */
+    if (cache->nr_cached_tuplebufs >= max_cached_tuplebufs)
+    {
+        free(tuple);
+        return;
+    }
+
+    ilist_s_push_front(&cache->cached_tuplebufs, &tuple->node);
+    cache->nr_cached_tuplebufs++;
+}
+
+
+/*
+ * Look up the ApplyCacheTXN registered for 'xid' in cache->by_txn.
+ *
+ * If no entry exists and 'create' is true, a fresh transaction struct is
+ * obtained from ApplyCacheGetTXN(), tagged with 'xid' and registered.
+ * If no entry exists and 'create' is false, NULL is returned.
+ */
+static
+ApplyCacheTXN*
+ApplyCacheTXNByXid(ApplyCache* cache, TransactionId xid, bool create)
+{
+    ApplyCacheTXNByIdEnt* ent;
+    bool found;
+
+    ent = (ApplyCacheTXNByIdEnt*)
+        hash_search(cache->by_txn,
+                    (void *)&xid,
+                    (create ? HASH_ENTER : HASH_FIND),
+                    &found);
+
+#ifdef VERBOSE_DEBUG
+    if (found)
+        elog(LOG, "found cache entry for %u at %p", xid, ent);
+    else
+        elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u",
+             xid, cache, ent, create);
+#endif
+
+    if (!found && !create)
+        return NULL;
+
+    if (!found)
+    {
+        ent->txn = ApplyCacheGetTXN(cache);
+
+        /*
+         * ApplyCacheGetTXN() returns a zeroed struct, so record which
+         * transaction it now stands for; otherwise txn->xid would stay 0
+         * for every transaction handed to the callbacks.
+         */
+        ent->txn->xid = xid;
+    }
+
+    return ent->txn;
+}
+
+/*
+ * Append 'change' to the transaction identified by 'xid', registering the
+ * transaction first if it is not known yet.
+ *
+ * 'lsn' is remembered as the transaction's lsn. Ownership of 'change' passes
+ * to the cache; it is released when the transaction is committed or aborted.
+ */
+void
+ApplyCacheAddChange(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn,
+                    ApplyCacheChange* change)
+{
+    ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, true);
+
+    txn->lsn = lsn;
+    ilist_d_push_back(&txn->changes, &change->node);
+
+    /*
+     * Keep the per-transaction change counters up to date. Nothing is
+     * spilled to disk yet, so every change is also an in-memory change.
+     */
+    txn->nentries++;
+    txn->nentries_mem++;
+}
+
+
+/*
+ * Record that subtransaction 'subxid' committed as part of transaction 'xid'.
+ *
+ * The subtransaction's collected changes are attached to the parent (which
+ * is registered if not known yet) so that they are replayed or discarded
+ * together with it. 'lsn' is remembered as the subtransaction's lsn.
+ *
+ * Does nothing if no changes were recorded for 'subxid'.
+ */
+void
+ApplyCacheCommitChild(ApplyCache* cache, TransactionId xid,
+                      TransactionId subxid, XLogRecPtr lsn)
+{
+    ApplyCacheTXN* txn;
+    ApplyCacheTXN* subtxn;
+    bool found;
+
+    subtxn = ApplyCacheTXNByXid(cache, subxid, false);
+
+    /*
+     * No need to do anything if that subtxn didn't contain any changes
+     */
+    if (!subtxn)
+        return;
+
+    subtxn->lsn = lsn;
+
+    txn = ApplyCacheTXNByXid(cache, xid, true);
+
+    ilist_d_push_back(&txn->subtxns, &subtxn->node);
+
+    /*
+     * From here on the subtransaction is owned by its parent and is recycled
+     * by the parent's commit/abort. Drop its by-xid entry now; otherwise the
+     * entry would be left behind pointing at a recycled struct.
+     */
+    hash_search(cache->by_txn,
+                (void *)&subxid,
+                HASH_REMOVE,
+                &found);
+    Assert(found);
+}
+
+/*
+ * Pass every change queued directly in one transaction to the apply_change
+ * callback and release each change afterwards.
+ *
+ * 'toplevel' is the transaction being committed. 'subtxn' is the committed
+ * subtransaction whose changes are replayed, or NULL to replay the changes
+ * of 'toplevel' itself.
+ */
+static void
+ApplyCacheReplayChanges(ApplyCache* cache, ApplyCacheTXN* toplevel,
+                        ApplyCacheTXN* subtxn)
+{
+    ApplyCacheTXN* source = subtxn ? subtxn : toplevel;
+    ilist_d_node* it, *it_next;
+
+    ilist_d_foreach_modify (it, it_next, &source->changes)
+    {
+        ApplyCacheChange* change =
+            ilist_container(ApplyCacheChange, node, it);
+
+        cache->apply_change(cache, toplevel, subtxn, change);
+        ApplyCacheReturnChange(cache, change);
+    }
+}
+
+/*
+ * Replay the transaction identified by 'xid' through the begin,
+ * apply_change and commit callbacks, then forget about it.
+ *
+ * 'lsn' is remembered as the transaction's lsn before the callbacks run.
+ * Does nothing if no changes were recorded for 'xid'.
+ */
+void
+ApplyCacheCommit(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+    ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+    ilist_d_node* sub_it, *sub_next;
+    bool found;
+
+    /* nothing was recorded for this transaction */
+    if (txn == NULL)
+        return;
+
+    txn->lsn = lsn;
+
+    cache->begin(cache, txn);
+
+    /*
+     * FIXME:
+     * The changes of the top-level transaction and of its subtransactions
+     * should be merged (k-way mergesort) into their original order.
+     *
+     * For now all subtransactions are replayed one after the other, followed
+     * by the top-level transaction's own changes. But that's *WRONG*.
+     *
+     * The best way is probably to keep the current heads of all TXNs in a
+     * heap and to always consume from the one with the smallest lsn.
+     */
+    ilist_d_foreach_modify (sub_it, sub_next, &txn->subtxns)
+    {
+        ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, sub_it);
+
+        ApplyCacheReplayChanges(cache, txn, subtxn);
+        ApplyCacheReturnTXN(cache, subtxn);
+    }
+
+    /* changes made by the top-level transaction itself */
+    ApplyCacheReplayChanges(cache, txn, NULL);
+
+    cache->commit(cache, txn);
+
+    /* unregister the transaction before recycling its struct */
+    hash_search(cache->by_txn,
+                (void *)&xid,
+                HASH_REMOVE,
+                &found);
+    Assert(found);
+
+    ApplyCacheReturnTXN(cache, txn);
+}
+
+/*
+ * Release every change queued directly in 'txn' without applying it.
+ */
+static void
+ApplyCacheDiscardChanges(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+    ilist_d_node* it, *it_next;
+
+    ilist_d_foreach_modify (it, it_next, &txn->changes)
+    {
+        ApplyCacheReturnChange(cache,
+                               ilist_container(ApplyCacheChange, node, it));
+    }
+}
+
+/*
+ * Throw away everything collected for the transaction identified by 'xid',
+ * including the subtransactions already attached to it, and forget about it.
+ * No callback is invoked.
+ *
+ * 'lsn' is currently unused. Does nothing if no changes were recorded for
+ * 'xid'.
+ */
+void
+ApplyCacheAbort(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+    ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+    ilist_d_node* sub_it, *sub_next;
+    bool found;
+
+    /* no changes were recorded for this transaction */
+    if (txn == NULL)
+        return;
+
+    /* release the attached subtransactions and their changes */
+    ilist_d_foreach_modify (sub_it, sub_next, &txn->subtxns)
+    {
+        ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, sub_it);
+
+        ApplyCacheDiscardChanges(cache, subtxn);
+        ApplyCacheReturnTXN(cache, subtxn);
+    }
+
+    /* then the transaction's own changes */
+    ApplyCacheDiscardChanges(cache, txn);
+
+    /* unregister the transaction before recycling its struct */
+    hash_search(cache->by_txn,
+                (void *)&xid,
+                HASH_REMOVE,
+                &found);
+    Assert(found);
+
+    ApplyCacheReturnTXN(cache, txn);
+}
diff --git a/src/include/replication/applycache.h b/src/include/replication/applycache.h
new file mode 100644
index 0000000..4ceba63
--- /dev/null
+++ b/src/include/replication/applycache.h
@@ -0,0 +1,185 @@
+/*
+ * applycache.h
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/applycache.h
+ */
+#ifndef APPLYCACHE_H
+#define APPLYCACHE_H
+
+#include "access/htup.h"
+#include "utils/hsearch.h"
+#include "utils/ilist.h"
+
+typedef struct ApplyCache ApplyCache;
+/* Kind of change carried by an ApplyCacheChange (stored in its 'action' field). */
+enum ApplyCacheChangeType
+{
+    APPLY_CACHE_CHANGE_INSERT,
+    APPLY_CACHE_CHANGE_UPDATE,
+    APPLY_CACHE_CHANGE_DELETE
+};
+/*
+ * Fixed-size buffer for one tuple. Buffers are obtained with
+ * ApplyCacheGetTupleBuf() and given back with ApplyCacheReturnTupleBuf();
+ * unused ones are kept on ApplyCache.cached_tuplebufs.
+ */
+typedef struct ApplyCacheTupleBuf
+{
+    /* position in preallocated list */
+    ilist_s_node node;
+
+    /* NOTE(review): presumably 'tuple' is pointed at 'header'/'data' by whoever fills the buffer - confirm */
+    HeapTupleData tuple;
+    HeapTupleHeaderData header;
+    char data[MaxHeapTupleSize];
+} ApplyCacheTupleBuf;
+/*
+ * A single change belonging to a transaction. Obtained with
+ * ApplyCacheGetChange() and released with ApplyCacheReturnChange().
+ */
+typedef struct ApplyCacheChange
+{
+    XLogRecPtr lsn; /* NOTE(review): never assigned by applycache.c; presumably set by the producer - confirm */
+    enum ApplyCacheChangeType action;
+
+    /* if set, handed back to the tuple buffer free list by ApplyCacheReturnChange() */
+    ApplyCacheTupleBuf* newtuple;
+
+    /* if set, handed back to the tuple buffer free list by ApplyCacheReturnChange() */
+    ApplyCacheTupleBuf* oldtuple;
+
+    /* if set, released with heap_freetuple() by ApplyCacheReturnChange() */
+    HeapTuple table;
+
+    /*
+     * While in use this is how a change is linked into a transaction,
+     * otherwise it's the position in the list of preallocated changes.
+     */
+    ilist_d_node node;
+} ApplyCacheChange;
+/*
+ * Everything collected for one (sub)transaction until it commits or aborts.
+ */
+typedef struct ApplyCacheTXN
+{
+    TransactionId xid;
+
+    /*
+     * LSN passed to the most recent ApplyCacheAddChange(),
+     * ApplyCacheCommitChild() or ApplyCacheCommit() call for this
+     * transaction.
+     */
+    XLogRecPtr lsn;
+
+    /*
+     * How many ApplyCacheChange's do we have in this txn.
+     *
+     * Subtransactions are *not* included.
+     */
+    Size nentries;
+
+    /*
+     * How many of the above entries are stored in memory in contrast to being
+     * spilled to disk.
+     */
+    Size nentries_mem;
+
+    /*
+     * List of actual changes, in the order they were added with
+     * ApplyCacheAddChange().
+     */
+    ilist_d_head changes;
+
+    /*
+     * non-hierarchical list of subtransactions that are *not* aborted;
+     * filled by ApplyCacheCommitChild()
+     */
+    ilist_d_head subtxns;
+
+    /*
+     * our position in a list of subtransactions while the TXN is in
+     * use. Otherwise it's the position in the list of preallocated
+     * transactions.
+     */
+    ilist_d_node node;
+} ApplyCacheTXN;
+
+
+/*
+ * Callbacks through which a reassembled transaction is handed to the user of
+ * the ApplyCache. ApplyCacheCommit() invokes begin once, then apply_change
+ * once per change and finally commit.
+ *
+ * XXX: we're currently passing the originating subtxn (NULL for changes made
+ * by the top-level transaction itself). Not sure that's necessary.
+ */
+typedef void (*ApplyCacheApplyChangeCB)(ApplyCache* cache, ApplyCacheTXN* txn,
+                                        ApplyCacheTXN* subtxn,
+                                        ApplyCacheChange* change);
+
+typedef void (*ApplyCacheBeginCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+typedef void (*ApplyCacheCommitCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+
+/*
+ * State of one transaction reassembly cache, created by ApplyCacheAllocate().
+ *
+ * max number of concurrent top-level transactions or transactions where we
+ * don't know if they are top-level can be calculated by:
+ * (max_connections + max_prepared_xacts + ?)  * PGPROC_MAX_CACHED_SUBXIDS
+ */
+struct ApplyCache
+{
+    TransactionId last_txn; /* NOTE(review): not consulted by the lookup code in applycache.c - TODO */
+    ApplyCacheTXN *last_txn_cache; /* NOTE(review): see last_txn */
+    HTAB *by_txn; /* in-progress transactions, keyed by xid */
+
+    ApplyCacheBeginCB begin; /* invoked by ApplyCacheCommit() before any change */
+    ApplyCacheApplyChangeCB apply_change; /* invoked by ApplyCacheCommit() for each change */
+    ApplyCacheCommitCB commit; /* invoked by ApplyCacheCommit() after the last change */
+
+    void* private_data; /* opaque pointer for the cache's user; never dereferenced by applycache.c */
+
+    MemoryContext context; /* memory context the by_txn hash table is created in */
+
+    /*
+     * we don't want to repeatedly (de-)allocate those structs, so cache them
+     * for reuse. Each nr_cached_* field counts the entries of the list
+     * declared right above it.
+     */
+    ilist_d_head cached_transactions;
+    size_t nr_cached_transactions;
+
+    ilist_d_head cached_changes;
+    size_t nr_cached_changes;
+
+    ilist_s_head cached_tuplebufs;
+    size_t nr_cached_tuplebufs;
+};
+
+/*
+ * Create an empty ApplyCache. Raises ERROR if it cannot be allocated.
+ */
+ApplyCache*
+ApplyCacheAllocate(void);
+/*
+ * Destroy an ApplyCache obtained from ApplyCacheAllocate().
+ */
+void
+ApplyCacheFree(ApplyCache*);
+/*
+ * Returns a (potentially preallocated) tuple buffer. Its contents are not
+ * initialized.
+ */
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache*);
+/*
+ * Return an unused ApplyCacheTupleBuf struct
+ */
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple);
+
+/*
+ * Returns a (potentially preallocated), zeroed change struct. Its lifetime is
+ * managed by the applycache module.
+ *
+ * If not added to a transaction with ApplyCacheAddChange it needs to be
+ * returned via ApplyCacheReturnChange
+ *
+ * FIXME: better name
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache*);
+
+/*
+ * Return an unused ApplyCacheChange struct, together with the tuple buffers
+ * and the 'table' tuple attached to it.
+ */
+void
+ApplyCacheReturnChange(ApplyCache*, ApplyCacheChange*);
+
+
+/*
+ * record the transaction as in-progress if not already done, add the current
+ * change.
+ *
+ * NOTE(review): a one-entry cache for looking up the current ApplyCacheTXN
+ * (to avoid a full hash lookup when the same xid is used sequentially, which
+ * is rather frequent) is planned, but applycache.c in this patch always does
+ * the hash lookup.
+ */
+void
+ApplyCacheAddChange(ApplyCache*, TransactionId, XLogRecPtr lsn, ApplyCacheChange*);
+
+/*
+ * Replay the given transaction, including the subtransactions attached to it
+ * with ApplyCacheCommitChild, through the begin/apply_change/commit
+ * callbacks and forget it afterwards. No-op if the transaction has no
+ * recorded changes.
+ */
+void
+ApplyCacheCommit(ApplyCache*, TransactionId, XLogRecPtr lsn);
+/*
+ * Attach the subtransaction given by the second TransactionId to the
+ * transaction given by the first one. No-op if the subtransaction has no
+ * recorded changes.
+ */
+void
+ApplyCacheCommitChild(ApplyCache*, TransactionId, TransactionId, XLogRecPtr lsn);
+/*
+ * Discard all changes recorded for the given transaction and the
+ * subtransactions attached to it, without invoking any callback.
+ */
+void
+ApplyCacheAbort(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andres Freund
Дата:
Сообщение: [PATCH 12/16] Add state to keep track of logical replication
Следующее
От: Andres Freund
Дата:
Сообщение: [PATCH 10/16] Introduce the concept that wal has a 'origin' node