[PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions

Поиск
Список
Период
Сортировка
От Andres Freund
Тема [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions
Дата
Msg-id 1339586927-13156-14-git-send-email-andres@2ndquadrant.com
обсуждение исходный текст
Ответ на [RFC][PATCH] Logical Replication/BDR prototype and architecture  (Andres Freund <andres@2ndquadrant.com>)
Ответы Re: [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions  (Christopher Browne <cbbrowne@gmail.com>)
Re: [PATCH 14/16] Add module to apply changes from an apply-cache using low-level functions  (Alexander Korotkov <aekorotkov@gmail.com>)
Список pgsql-hackers
From: Andres Freund <andres@anarazel.de>

We decided to use low-level functions to do the apply instead of producing SQL
statements containing the data (or using prepared statements) because both the
text conversion and the full executor introduce a significant overhead, which
is unnecessary if you're using the same version of pg on the same
architecture.

There are loads of use cases, though, that require different methods of
applying - so the part doing the applying from an ApplyCache is just a bunch of
well abstracted callbacks getting passed all the required knowledge to change
the data representation into other formats.

Missing:

- TOAST handling. For physical apply not much needs to be done because the toast inserts will have been made
beforehand. There needs to be an option in ApplyCache that helps reassembling TOAST datums to make it easier to write
apply modules which convert to text.
 
---
 src/backend/replication/logical/Makefile |    2 +-
 src/backend/replication/logical/apply.c  |  313 ++++++++++++++++++++++++++++++
 src/include/replication/apply.h          |   24 +++
 3 files changed, 338 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/replication/logical/apply.c
 create mode 100644 src/include/replication/apply.h
 

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c2d6d82..d0e0b13 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = applycache.o decode.o logical.o
+OBJS = apply.o applycache.o decode.o logical.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/apply.c b/src/backend/replication/logical/apply.c
new file mode 100644
index 0000000..646bd54
--- /dev/null
+++ b/src/backend/replication/logical/apply.c
@@ -0,0 +1,313 @@
+/*-------------------------------------------------------------------------
+ *
+ * apply.c
+ *
+ * Support functions for logical/multimaster replication: applies changes
+ * from an ApplyCache using low-level heap and index functions
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *      src/backend/replication/logical/apply.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/heapam.h"
+#include "access/genam.h"
+
+#include "catalog/pg_control.h"
+#include "catalog/index.h"
+
+#include "executor/executor.h"
+
+#include "replication/applycache.h"
+#include "replication/apply.h"
+
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
+
+
+
+/*
+ * Forward declaration: the definition is at the end of this file, after
+ * apply_change(), which calls it for inserted and updated tuples.
+ */
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple);
+
+
+/*
+ * apply_begin_txn
+ *
+ * Begin applying one transaction: remember the current resource owner,
+ * start a local transaction and push an active snapshot for the changes
+ * that follow.
+ *
+ * cache - apply cache; its private_data must point to an
+ *         ApplyApplyCacheState
+ * txn   - the transaction about to be applied (not used here)
+ *
+ * Raises an ERROR through PreventTransactionChain() if invoked while a
+ * transaction block is already open.
+ */
+void
+apply_begin_txn(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+    ApplyApplyCacheState *state = cache->private_data;
+
+    /* saved so that apply_commit_txn() can restore it after the commit */
+    state->original_resource_owner = CurrentResourceOwner;
+
+    /*
+     * The second argument is a statement name that PreventTransactionChain()
+     * interpolates into messages such as "%s cannot run inside a transaction
+     * block", so pass a short noun phrase rather than a whole sentence.
+     */
+    PreventTransactionChain(true, "apply process");
+
+    StartTransactionCommand();
+
+    PushActiveSnapshot(GetTransactionSnapshot());
+}
+
+/*
+ * apply_commit_txn
+ *
+ * Finish applying one transaction: pop the snapshot pushed by
+ * apply_begin_txn(), commit the local transaction and restore the resource
+ * owner that was current before the transaction was started.
+ *
+ * current_replication_origin_lsn is set to txn->lsn for the duration of the
+ * commit and zeroed again afterwards.
+ *
+ * cache - apply cache; its private_data must point to an
+ *         ApplyApplyCacheState
+ * txn   - the transaction being committed; only its lsn is read
+ */
+void
+apply_commit_txn(ApplyCache *cache, ApplyCacheTXN *txn)
+{
+    ApplyApplyCacheState *apply_state = cache->private_data;
+
+    current_replication_origin_lsn = txn->lsn;
+
+    PopActiveSnapshot();
+    CommitTransactionCommand();
+
+    /*
+     * For some reason we lose our resource owner once
+     * (Start|Commit)TransactionCommand have run, so put back the one that
+     * apply_begin_txn() saved.
+     * XXX: is that correct?
+     */
+    CurrentResourceOwner = apply_state->original_resource_owner;
+
+    /* the commit is done, clear the origin LSN again */
+    current_replication_origin_lsn.xrecoff = 0;
+    current_replication_origin_lsn.xlogid = 0;
+}
+
+
+/*
+ * apply_find_tuple_by_pk
+ *
+ * Look up, through the primary key index of 'rel', the single heap tuple
+ * whose key columns equal the key values stored in 'key_tuple', and return
+ * its TID in *tid.
+ *
+ * rel                 - target heap relation, already opened by the caller
+ * key_tuple           - tuple supplying the key values
+ * key_in_index_layout - true: key column i is attribute i + 1 of key_tuple
+ *                       and is described by the index's tuple descriptor;
+ *                       false: key_tuple is a full heap tuple and key
+ *                       column i is heap attribute pkattnum[i]
+ * action              - verb used in the "could not find tuple" message
+ * tid                 - output, TID of the matching heap tuple
+ *
+ * Raises an ERROR if the relation has no primary key, if no tuple matches,
+ * or if more than one tuple matches.  The index is opened with
+ * AccessShareLock and closed with NoLock, i.e. the lock is not released here.
+ */
+static void
+apply_find_tuple_by_pk(Relation rel, HeapTuple key_tuple,
+                       bool key_in_index_layout, const char *action,
+                       ItemPointer tid)
+{
+    Oid         indexoid = InvalidOid;
+    int16       pknratts = 0;
+    int16       pkattnum[INDEX_MAX_KEYS];
+    Oid         pktypoid[INDEX_MAX_KEYS];
+    Oid         pkopclass[INDEX_MAX_KEYS];
+    ScanKeyData skey[INDEX_MAX_KEYS];
+    Relation    index_rel;
+    TupleDesc   key_desc;
+    IndexScanDesc scan;
+    HeapTuple   match;
+    bool        found = false;
+    int         i;
+
+    MemSet(pkattnum, 0, sizeof(pkattnum));
+    MemSet(pktypoid, 0, sizeof(pktypoid));
+    MemSet(pkopclass, 0, sizeof(pkopclass));
+
+    relationFindPrimaryKey(rel, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+    if (!OidIsValid(indexoid))
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("there is no primary key for table \"%s\"",
+                        RelationGetRelationName(rel))));
+
+    index_rel = index_open(indexoid, AccessShareLock);
+
+    if (key_in_index_layout)
+        key_desc = RelationGetDescr(index_rel);
+    else
+        key_desc = RelationGetDescr(rel);
+
+    for (i = 0; i < pknratts; i++)
+    {
+        Oid         opfamily;
+        Oid         eq_operator;
+        RegProcedure eq_proc;
+        int         key_attno;
+        Datum       key_value;
+        bool        isnull;
+
+        opfamily = get_opclass_family(pkopclass[i]);
+        eq_operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i],
+                                          BTEqualStrategyNumber);
+        eq_proc = get_opcode(eq_operator);
+
+        /* where the value of key column i lives inside key_tuple */
+        key_attno = key_in_index_layout ? i + 1 : pkattnum[i];
+        key_value = fastgetattr(key_tuple, key_attno, key_desc, &isnull);
+
+        Assert(!isnull);
+
+        /*
+         * Scan keys handed directly to index_rescan() identify columns by
+         * their position in the index (1-based), not by the attribute
+         * number of the underlying heap column.
+         */
+        ScanKeyInit(&skey[i],
+                    i + 1,
+                    BTEqualStrategyNumber,
+                    eq_proc,
+                    key_value);
+    }
+
+    scan = index_beginscan(rel, index_rel, GetTransactionSnapshot(),
+                           pknratts, 0);
+    index_rescan(scan, skey, pknratts, NULL, 0);
+
+    while ((match = index_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        if (found)
+        {
+            elog(ERROR, "WTF, more than one tuple found via pk???");
+        }
+        found = true;
+
+        /* copy the TID, the scan's tuple is not ours to keep */
+        *tid = match->t_self;
+    }
+
+    if (!found)
+        elog(ERROR, "could not find tuple to %s", action);
+
+    index_endscan(scan);
+
+    index_close(index_rel, NoLock);
+}
+
+/*
+ * apply_change
+ *
+ * Apply a single change (INSERT, UPDATE or DELETE) to the local table
+ * identified by change->table, using heap and index functions directly.
+ *
+ * cache, txn, subtxn - not used here
+ * change             - the change to apply:
+ *                      INSERT stores change->newtuple and adds index entries;
+ *                      UPDATE finds the old row through the primary key
+ *                      values of change->newtuple and replaces it;
+ *                      DELETE finds the row through the primary key values
+ *                      in change->oldtuple and removes it.
+ *
+ * UPDATE and DELETE raise an ERROR when the table has no primary key or when
+ * the key does not identify exactly one row.  Any other action value is
+ * silently ignored.  The command counter is incremented at the end.
+ *
+ * NOTE(review): UPDATE takes the key from the new tuple, so an update that
+ * changes the primary key will not find its row.
+ */
+void
+apply_change(ApplyCache *cache, ApplyCacheTXN *txn, ApplyCacheTXN *subtxn,
+             ApplyCacheChange *change)
+{
+    Relation    tuple_rel;
+
+    tuple_rel = heap_open(HeapTupleGetOid(change->table), RowExclusiveLock);
+
+    switch (change->action)
+    {
+        case APPLY_CACHE_CHANGE_INSERT:
+        {
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "INSERT");
+#endif
+            simple_heap_insert(tuple_rel, &change->newtuple->tuple);
+
+            UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+            break;
+        }
+        case APPLY_CACHE_CHANGE_UPDATE:
+        {
+            ItemPointerData old_tid;
+
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "UPDATE");
+#endif
+            /* newtuple is a full heap tuple, hence heap attribute numbers */
+            apply_find_tuple_by_pk(tuple_rel, &change->newtuple->tuple,
+                                   false, "update", &old_tid);
+
+            simple_heap_update(tuple_rel, &old_tid, &change->newtuple->tuple);
+
+            /* a heap-only (HOT) tuple needs no new index entries */
+            if (!HeapTupleIsHeapOnly(&change->newtuple->tuple))
+                UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+
+            break;
+        }
+        case APPLY_CACHE_CHANGE_DELETE:
+        {
+            ItemPointerData old_tid;
+
+#ifdef VERBOSE_DEBUG
+            elog(LOG, "DELETE comming");
+#endif
+            /* oldtuple is read using the index's tuple descriptor */
+            apply_find_tuple_by_pk(tuple_rel, &change->oldtuple->tuple,
+                                   true, "delete", &old_tid);
+
+            simple_heap_delete(tuple_rel, &old_tid);
+
+            break;
+        }
+    }
+    /* FIXME: locking */
+
+    heap_close(tuple_rel, NoLock);
+    CommandCounterIncrement();
+}
+
+/*
+ * UserTableIndexState is a pointer to the executor's ResultRelInfo under
+ * another type name, meant to decouple callers from that fact (the same
+ * arrangement the original comment attributes to CatalogOpenIndexes and
+ * friends).
+ *
+ * NOTE(review): nothing in this file refers to this typedef;
+ * UserTableUpdateIndexes() below uses ResultRelInfo directly.
+ */
+typedef struct ResultRelInfo *UserTableIndexState;
+
+/*
+ * UserTableUpdateIndexes
+ *
+ * Create index entries for 'heapTuple' in the indexes of 'heapRel'.  A
+ * throwaway executor state and ResultRelInfo are built for the call to
+ * ExecInsertIndexTuples() and torn down again before returning.
+ *
+ * heapRel   - the heap relation the tuple belongs to
+ * heapTuple - the tuple to index; its t_self is passed as the TID
+ *
+ * The list of indexes needing a recheck that ExecInsertIndexTuples()
+ * returns is freed without being acted upon (see the FIXME below).
+ */
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple)
+{
+    /* largely pieced together from what copy.c's CopyFrom does */
+    EState     *exec_state = CreateExecutorState();
+    ResultRelInfo *rel_info = makeNode(ResultRelInfo);
+    List       *recheck_list = NIL;
+
+    rel_info->ri_RangeTableIndex = 1;        /* dummy */
+    rel_info->ri_RelationDesc = heapRel;
+    rel_info->ri_TrigInstrument = NULL;
+
+    ExecOpenIndices(rel_info);
+
+    exec_state->es_result_relations = rel_info;
+    exec_state->es_num_result_relations = 1;
+    exec_state->es_result_relation_info = rel_info;
+
+    /* only bother with a slot when there is at least one index */
+    if (rel_info->ri_NumIndices > 0)
+    {
+        TupleTableSlot *tuple_slot = ExecInitExtraTupleSlot(exec_state);
+
+        ExecSetSlotDescriptor(tuple_slot, RelationGetDescr(heapRel));
+        ExecStoreTuple(heapTuple, tuple_slot, InvalidBuffer, false);
+
+        recheck_list = ExecInsertIndexTuples(tuple_slot,
+                                             &heapTuple->t_self,
+                                             exec_state);
+    }
+
+    ExecResetTupleTable(exec_state->es_tupleTable, false);
+
+    ExecCloseIndices(rel_info);
+
+    FreeExecutorState(exec_state);
+    /* FIXME: recheck the indexes */
+    list_free(recheck_list);
+}
diff --git a/src/include/replication/apply.h b/src/include/replication/apply.h
new file mode 100644
index 0000000..3b818c0
--- /dev/null
+++ b/src/include/replication/apply.h
@@ -0,0 +1,24 @@
+/*
+ * apply.h
+ *
+ * PostgreSQL logical replay
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/replication/apply.h
+ */
+#ifndef APPLY_H
+#define APPLY_H
+
+#include "utils/resowner.h"
+
+/*
+ * Private state of the apply callbacks; apply.c reads it from the
+ * private_data field of the ApplyCache handed to each callback.
+ */
+typedef struct ApplyApplyCacheState
+{
+    /*
+     * CurrentResourceOwner as it was when apply_begin_txn() ran;
+     * apply_commit_txn() assigns it back after committing.
+     */
+    ResourceOwner original_resource_owner;
+} ApplyApplyCacheState;
+
+/*
+ * Callbacks implemented in apply.c: begin a local transaction, commit it,
+ * and apply a single INSERT/UPDATE/DELETE change.
+ *
+ * NOTE(review): this header only includes utils/resowner.h, so the
+ * ApplyCache, ApplyCacheTXN and ApplyCacheChange types must be declared
+ * before it is included; apply.c includes replication/applycache.h first.
+ */
+void apply_begin_txn(ApplyCache* cache, ApplyCacheTXN* txn);
+void apply_commit_txn(ApplyCache* cache, ApplyCacheTXN* txn);
+void apply_change(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change);
+
+#endif
-- 
1.7.10.rc3.3.g19a6c.dirty



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andres Freund
Дата:
Сообщение: [PATCH 15/16] Activate/Implement the "apply process" which applies received updates from another node
Следующее
От: Andres Freund
Дата:
Сообщение: [PATCH 13/16] Introduction of pair of logical walreceiver/sender