Re: Reducing memory consumption for pending inval messages

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: Reducing memory consumption for pending inval messages
Дата
Msg-id 3380133.1626207700@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Reducing memory consumption for pending inval messages  (Tom Lane <tgl@sss.pgh.pa.us>)
Ответы Re: Reducing memory consumption for pending inval messages
Список pgsql-hackers
I wrote:
> It turns out that the existing implementation in inval.c is quite
> inefficient when a lot of individual commands each register just
> a few invalidations --- but a few invalidations per command is
> pretty typical.

Per the cfbot, here's a rebase over 3788c6678 (actually just
undoing its effects on inval.c, since that code is removed here).

            regards, tom lane

diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 9c79775725..9352c68090 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -71,11 +71,6 @@
  *    manipulating the init file is in relcache.c, but we keep track of the
  *    need for it here.
  *
- *    The request lists proper are kept in CurTransactionContext of their
- *    creating (sub)transaction, since they can be forgotten on abort of that
- *    transaction but must be kept till top-level commit otherwise.  For
- *    simplicity we keep the controlling list-of-lists in TopTransactionContext.
- *
  *    Currently, inval messages are sent without regard for the possibility
  *    that the object described by the catalog tuple might be a session-local
  *    object such as a temporary table.  This is because (1) this code has
@@ -106,7 +101,6 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_constraint.h"
 #include "miscadmin.h"
-#include "port/pg_bitutils.h"
 #include "storage/sinval.h"
 #include "storage/smgr.h"
 #include "utils/catcache.h"
@@ -121,35 +115,86 @@


 /*
- * To minimize palloc traffic, we keep pending requests in successively-
- * larger chunks (a slightly more sophisticated version of an expansible
- * array).  All request types can be stored as SharedInvalidationMessage
- * records.  The ordering of requests within a list is never significant.
+ * Pending requests are stored as ready-to-send SharedInvalidationMessages.
+ * We keep the messages themselves in arrays in TopTransactionContext
+ * (there are separate arrays for catcache and relcache messages).  Control
+ * information is kept in a chain of TransInvalidationInfo structs, also
+ * allocated in TopTransactionContext.  (We could keep a subtransaction's
+ * TransInvalidationInfo in its CurTransactionContext; but that's more
+ * wasteful not less so, since in very many scenarios it'd be the only
+ * allocation in the subtransaction's CurTransactionContext.)
+ *
+ * We can store the message arrays densely, and yet avoid moving data around
+ * within an array, because within any one subtransaction we need only
+ * distinguish between messages emitted by prior commands and those emitted
+ * by the current command.  Once a command completes and we've done local
+ * processing on its messages, we can fold those into the prior-commands
+ * messages just by changing array indexes in the TransInvalidationInfo
+ * struct.  Similarly, we need distinguish messages of prior subtransactions
+ * from those of the current subtransaction only until the subtransaction
+ * completes, after which we adjust the array indexes in the parent's
+ * TransInvalidationInfo to include the subtransaction's messages.
+ *
+ * The ordering of the individual messages within a command's or
+ * subtransaction's output is not considered significant, although this
+ * implementation happens to preserve the order in which they were queued.
+ * (Previous versions of this code did not preserve it.)
+ *
+ * For notational convenience, control information is kept in two-element
+ * arrays, the first for catcache messages and the second for relcache
+ * messages.
  */
-typedef struct InvalidationChunk
+#define CatCacheMsgs 0
+#define RelCacheMsgs 1
+
+/* Pointers to main arrays in TopTransactionContext */
+typedef struct InvalMessageArray
 {
-    struct InvalidationChunk *next; /* list link */
-    int            nitems;            /* # items currently stored in chunk */
-    int            maxitems;        /* size of allocated array in this chunk */
-    SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
-} InvalidationChunk;
+    SharedInvalidationMessage *msgs;    /* palloc'd array (can be expanded) */
+    int            maxmsgs;        /* current allocated size of array */
+} InvalMessageArray;

-typedef struct InvalidationListHeader
+static InvalMessageArray InvalMessageArrays[2];
+
+/* Control information for one logical group of messages */
+typedef struct InvalidationMsgsGroup
 {
-    InvalidationChunk *cclist;    /* list of chunks holding catcache msgs */
-    InvalidationChunk *rclist;    /* list of chunks holding relcache msgs */
-} InvalidationListHeader;
+    int            firstmsg[2];    /* first index in relevant array */
+    int            nextmsg[2];        /* last+1 index */
+} InvalidationMsgsGroup;
+
+/* Macros to help preserve InvalidationMsgsGroup abstraction */
+#define SetSubGroupToFollow(targetgroup, priorgroup, subgroup) \
+    do { \
+        (targetgroup)->firstmsg[subgroup] = \
+            (targetgroup)->nextmsg[subgroup] = \
+            (priorgroup)->nextmsg[subgroup]; \
+    } while (0)
+
+#define SetGroupToFollow(targetgroup, priorgroup) \
+    do { \
+        SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \
+        SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \
+    } while (0)
+
+#define NumMessagesInSubGroup(group, subgroup) \
+    ((group)->nextmsg[subgroup] - (group)->firstmsg[subgroup])
+
+#define NumMessagesInGroup(group) \
+    (NumMessagesInSubGroup(group, CatCacheMsgs) + \
+     NumMessagesInSubGroup(group, RelCacheMsgs))
+

 /*----------------
- * Invalidation info is divided into two lists:
+ * Invalidation messages are divided into two groups:
  *    1) events so far in current command, not yet reflected to caches.
  *    2) events in previous commands of current transaction; these have
  *       been reflected to local caches, and must be either broadcast to
  *       other backends or rolled back from local cache when we commit
  *       or abort the transaction.
- * Actually, we need two such lists for each level of nested transaction,
+ * Actually, we need such groups for each level of nested transaction,
  * so that we can discard events from an aborted subtransaction.  When
- * a subtransaction commits, we append its lists to the parent's lists.
+ * a subtransaction commits, we append its events to the parent's groups.
  *
  * The relcache-file-invalidated flag can just be a simple boolean,
  * since we only act on it at transaction commit; we don't care which
@@ -165,11 +210,11 @@ typedef struct TransInvalidationInfo
     /* Subtransaction nesting depth */
     int            my_level;

-    /* head of current-command event list */
-    InvalidationListHeader CurrentCmdInvalidMsgs;
+    /* Events emitted by current command */
+    InvalidationMsgsGroup CurrentCmdInvalidMsgs;

-    /* head of previous-commands event list */
-    InvalidationListHeader PriorCmdInvalidMsgs;
+    /* Events emitted by previous commands of this (sub)transaction */
+    InvalidationMsgsGroup PriorCmdInvalidMsgs;

     /* init file must be invalidated? */
     bool        RelcacheInitFileInval;
@@ -177,10 +222,6 @@ typedef struct TransInvalidationInfo

 static TransInvalidationInfo *transInvalInfo = NULL;

-static SharedInvalidationMessage *SharedInvalidMessagesArray;
-static int    numSharedInvalidMessagesArray;
-static int    maxSharedInvalidMessagesArray;
-
 /* GUC storage */
 int            debug_discard_caches = 0;

@@ -218,124 +259,118 @@ static struct RELCACHECALLBACK
 static int    relcache_callback_count = 0;

 /* ----------------------------------------------------------------
- *                Invalidation list support functions
- *
- * These three routines encapsulate processing of the "chunked"
- * representation of what is logically just a list of messages.
+ *                Invalidation subgroup support functions
  * ----------------------------------------------------------------
  */

 /*
  * AddInvalidationMessage
- *        Add an invalidation message to a list (of chunks).
+ *        Add an invalidation message to a (sub)group.
+ *
+ * The group must be the last active one, since we assume we can add to the
+ * end of the relevant InvalMessageArray.
  *
- * Note that we do not pay any great attention to maintaining the original
- * ordering of the messages.
+ * subgroup must be CatCacheMsgs or RelCacheMsgs.
  */
 static void
-AddInvalidationMessage(InvalidationChunk **listHdr,
-                       SharedInvalidationMessage *msg)
+AddInvalidationMessage(InvalidationMsgsGroup *group, int subgroup,
+                       const SharedInvalidationMessage *msg)
 {
-    InvalidationChunk *chunk = *listHdr;
+    InvalMessageArray *ima = &InvalMessageArrays[subgroup];
+    int            nextindex = group->nextmsg[subgroup];

-    if (chunk == NULL)
-    {
-        /* First time through; create initial chunk */
-#define FIRSTCHUNKSIZE 32
-        chunk = (InvalidationChunk *)
-            MemoryContextAlloc(CurTransactionContext,
-                               offsetof(InvalidationChunk, msgs) +
-                               FIRSTCHUNKSIZE * sizeof(SharedInvalidationMessage));
-        chunk->nitems = 0;
-        chunk->maxitems = FIRSTCHUNKSIZE;
-        chunk->next = *listHdr;
-        *listHdr = chunk;
-    }
-    else if (chunk->nitems >= chunk->maxitems)
+    if (nextindex >= ima->maxmsgs)
     {
-        /* Need another chunk; double size of last chunk */
-        int            chunksize = 2 * chunk->maxitems;
-
-        chunk = (InvalidationChunk *)
-            MemoryContextAlloc(CurTransactionContext,
-                               offsetof(InvalidationChunk, msgs) +
-                               chunksize * sizeof(SharedInvalidationMessage));
-        chunk->nitems = 0;
-        chunk->maxitems = chunksize;
-        chunk->next = *listHdr;
-        *listHdr = chunk;
+        if (ima->msgs == NULL)
+        {
+            /* Create new storage array in TopTransactionContext */
+            int            reqsize = 32;    /* arbitrary */
+
+            ima->msgs = (SharedInvalidationMessage *)
+                MemoryContextAlloc(TopTransactionContext,
+                                   reqsize * sizeof(SharedInvalidationMessage));
+            ima->maxmsgs = reqsize;
+            Assert(nextindex == 0);
+        }
+        else
+        {
+            /* Enlarge storage array */
+            int            reqsize = 2 * ima->maxmsgs;
+
+            ima->msgs = (SharedInvalidationMessage *)
+                repalloc(ima->msgs,
+                         reqsize * sizeof(SharedInvalidationMessage));
+            ima->maxmsgs = reqsize;
+        }
     }
-    /* Okay, add message to current chunk */
-    chunk->msgs[chunk->nitems] = *msg;
-    chunk->nitems++;
+    /* Okay, add message to current group */
+    ima->msgs[nextindex] = *msg;
+    group->nextmsg[subgroup]++;
 }

 /*
- * Append one list of invalidation message chunks to another, resetting
- * the source chunk-list pointer to NULL.
+ * Append one subgroup of invalidation messages to another, resetting
+ * the source subgroup to empty.
  */
 static void
-AppendInvalidationMessageList(InvalidationChunk **destHdr,
-                              InvalidationChunk **srcHdr)
+AppendInvalidationMessageSubGroup(InvalidationMsgsGroup *dest,
+                                  InvalidationMsgsGroup *src,
+                                  int subgroup)
 {
-    InvalidationChunk *chunk = *srcHdr;
-
-    if (chunk == NULL)
-        return;                    /* nothing to do */
-
-    while (chunk->next != NULL)
-        chunk = chunk->next;
+    /* Messages must be adjacent in main array */
+    Assert(dest->nextmsg[subgroup] == src->firstmsg[subgroup]);

-    chunk->next = *destHdr;
+    /* ... which makes this easy: */
+    dest->nextmsg[subgroup] = src->nextmsg[subgroup];

-    *destHdr = *srcHdr;
-
-    *srcHdr = NULL;
+    /*
+     * This is handy for some callers and irrelevant for others.  But we do it
+     * always, reasoning that it's bad to leave different groups pointing at
+     * the same fragment of the message array.
+     */
+    SetSubGroupToFollow(src, dest, subgroup);
 }

 /*
- * Process a list of invalidation messages.
+ * Process a subgroup of invalidation messages.
  *
  * This is a macro that executes the given code fragment for each message in
- * a message chunk list.  The fragment should refer to the message as *msg.
+ * a message subgroup.  The fragment should refer to the message as *msg.
  */
-#define ProcessMessageList(listHdr, codeFragment) \
+#define ProcessMessageSubGroup(group, subgroup, codeFragment) \
     do { \
-        InvalidationChunk *_chunk; \
-        for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
+        int        _msgindex = (group)->firstmsg[subgroup]; \
+        int        _endmsg = (group)->nextmsg[subgroup]; \
+        for (; _msgindex < _endmsg; _msgindex++) \
         { \
-            int        _cindex; \
-            for (_cindex = 0; _cindex < _chunk->nitems; _cindex++) \
-            { \
-                SharedInvalidationMessage *msg = &_chunk->msgs[_cindex]; \
-                codeFragment; \
-            } \
+            SharedInvalidationMessage *msg = \
+                &InvalMessageArrays[subgroup].msgs[_msgindex]; \
+            codeFragment; \
         } \
     } while (0)

 /*
- * Process a list of invalidation messages group-wise.
+ * Process a subgroup of invalidation messages as an array.
  *
  * As above, but the code fragment can handle an array of messages.
  * The fragment should refer to the messages as msgs[], with n entries.
  */
-#define ProcessMessageListMulti(listHdr, codeFragment) \
+#define ProcessMessageSubGroupMulti(group, subgroup, codeFragment) \
     do { \
-        InvalidationChunk *_chunk; \
-        for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
-        { \
-            SharedInvalidationMessage *msgs = _chunk->msgs; \
-            int        n = _chunk->nitems; \
+        int        n = NumMessagesInSubGroup(group, subgroup); \
+        if (n > 0) { \
+            SharedInvalidationMessage *msgs = \
+                &InvalMessageArrays[subgroup].msgs[(group)->firstmsg[subgroup]]; \
             codeFragment; \
         } \
     } while (0)


 /* ----------------------------------------------------------------
- *                Invalidation set support functions
+ *                Invalidation group support functions
  *
  * These routines understand about the division of a logical invalidation
- * list into separate physical lists for catcache and relcache entries.
+ * group into separate physical arrays for catcache and relcache entries.
  * ----------------------------------------------------------------
  */

@@ -343,7 +378,7 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
  * Add a catcache inval entry
  */
 static void
-AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
+AddCatcacheInvalidationMessage(InvalidationMsgsGroup *group,
                                int id, uint32 hashValue, Oid dbId)
 {
     SharedInvalidationMessage msg;
@@ -364,14 +399,14 @@ AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
      */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->cclist, &msg);
+    AddInvalidationMessage(group, CatCacheMsgs, &msg);
 }

 /*
  * Add a whole-catalog inval entry
  */
 static void
-AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
+AddCatalogInvalidationMessage(InvalidationMsgsGroup *group,
                               Oid dbId, Oid catId)
 {
     SharedInvalidationMessage msg;
@@ -382,14 +417,14 @@ AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
     /* check AddCatcacheInvalidationMessage() for an explanation */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->cclist, &msg);
+    AddInvalidationMessage(group, CatCacheMsgs, &msg);
 }

 /*
  * Add a relcache inval entry
  */
 static void
-AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
+AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
                                Oid dbId, Oid relId)
 {
     SharedInvalidationMessage msg;
@@ -399,11 +434,11 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
      * it will never change. InvalidOid for relId means all relations so we
      * don't need to add individual ones when it is present.
      */
-    ProcessMessageList(hdr->rclist,
-                       if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
-                           (msg->rc.relId == relId ||
-                            msg->rc.relId == InvalidOid))
-                       return);
+    ProcessMessageSubGroup(group, RelCacheMsgs,
+                           if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
+                               (msg->rc.relId == relId ||
+                                msg->rc.relId == InvalidOid))
+                           return);

     /* OK, add the item */
     msg.rc.id = SHAREDINVALRELCACHE_ID;
@@ -412,24 +447,26 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
     /* check AddCatcacheInvalidationMessage() for an explanation */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->rclist, &msg);
+    AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }

 /*
  * Add a snapshot inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
  */
 static void
-AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
+AddSnapshotInvalidationMessage(InvalidationMsgsGroup *group,
                                Oid dbId, Oid relId)
 {
     SharedInvalidationMessage msg;

     /* Don't add a duplicate item */
     /* We assume dbId need not be checked because it will never change */
-    ProcessMessageList(hdr->rclist,
-                       if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
-                           msg->sn.relId == relId)
-                       return);
+    ProcessMessageSubGroup(group, RelCacheMsgs,
+                           if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
+                               msg->sn.relId == relId)
+                           return);

     /* OK, add the item */
     msg.sn.id = SHAREDINVALSNAPSHOT_ID;
@@ -438,33 +475,33 @@ AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
     /* check AddCatcacheInvalidationMessage() for an explanation */
     VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));

-    AddInvalidationMessage(&hdr->rclist, &msg);
+    AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }

 /*
- * Append one list of invalidation messages to another, resetting
- * the source list to empty.
+ * Append one group of invalidation messages to another, resetting
+ * the source group to empty.
  */
 static void
-AppendInvalidationMessages(InvalidationListHeader *dest,
-                           InvalidationListHeader *src)
+AppendInvalidationMessages(InvalidationMsgsGroup *dest,
+                           InvalidationMsgsGroup *src)
 {
-    AppendInvalidationMessageList(&dest->cclist, &src->cclist);
-    AppendInvalidationMessageList(&dest->rclist, &src->rclist);
+    AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs);
+    AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs);
 }

 /*
- * Execute the given function for all the messages in an invalidation list.
- * The list is not altered.
+ * Execute the given function for all the messages in an invalidation group.
+ * The group is not altered.
  *
  * catcache entries are processed first, for reasons mentioned above.
  */
 static void
-ProcessInvalidationMessages(InvalidationListHeader *hdr,
+ProcessInvalidationMessages(InvalidationMsgsGroup *group,
                             void (*func) (SharedInvalidationMessage *msg))
 {
-    ProcessMessageList(hdr->cclist, func(msg));
-    ProcessMessageList(hdr->rclist, func(msg));
+    ProcessMessageSubGroup(group, CatCacheMsgs, func(msg));
+    ProcessMessageSubGroup(group, RelCacheMsgs, func(msg));
 }

 /*
@@ -472,11 +509,11 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
  * rather than just one at a time.
  */
 static void
-ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
+ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
                                  void (*func) (const SharedInvalidationMessage *msgs, int n))
 {
-    ProcessMessageListMulti(hdr->cclist, func(msgs, n));
-    ProcessMessageListMulti(hdr->rclist, func(msgs, n));
+    ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n));
+    ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n));
 }

 /* ----------------------------------------------------------------
@@ -731,7 +768,7 @@ AcceptInvalidationMessages(void)

 /*
  * PrepareInvalidationState
- *        Initialize inval lists for the current (sub)transaction.
+ *        Initialize inval data for the current (sub)transaction.
  */
 static void
 PrepareInvalidationState(void)
@@ -748,12 +785,45 @@ PrepareInvalidationState(void)
     myInfo->parent = transInvalInfo;
     myInfo->my_level = GetCurrentTransactionNestLevel();

-    /*
-     * If there's any previous entry, this one should be for a deeper nesting
-     * level.
-     */
-    Assert(transInvalInfo == NULL ||
-           myInfo->my_level > transInvalInfo->my_level);
+    /* Now, do we have a previous stack entry? */
+    if (transInvalInfo != NULL)
+    {
+        /* Yes; this one should be for a deeper nesting level. */
+        Assert(myInfo->my_level > transInvalInfo->my_level);
+
+        /*
+         * The parent (sub)transaction must not have any current (i.e.,
+         * not-yet-locally-processed) messages.  If it did, we'd have a
+         * semantic problem: the new subtransaction presumably ought not be
+         * able to see those events yet, but since the CommandCounter is
+         * linear, that can't work once the subtransaction advances the
+         * counter.  This is a convenient place to check for that, as well as
+         * being important to keep management of the message arrays simple.
+         */
+        if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0)
+            elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages");
+
+        /*
+         * MemoryContextAllocZero set firstmsg = nextmsg = 0 in each group,
+         * which is fine for the first (sub)transaction, but otherwise we need
+         * to update them to follow whatever is already in the arrays.
+         */
+        SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
+                         &transInvalInfo->CurrentCmdInvalidMsgs);
+        SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
+                         &myInfo->PriorCmdInvalidMsgs);
+    }
+    else
+    {
+        /*
+         * Here, we need only clear any array pointers left over from a prior
+         * transaction.
+         */
+        InvalMessageArrays[CatCacheMsgs].msgs = NULL;
+        InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
+        InvalMessageArrays[RelCacheMsgs].msgs = NULL;
+        InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
+    }

     transInvalInfo = myInfo;
 }
@@ -777,47 +847,8 @@ PostPrepare_Inval(void)
 }

 /*
- * Collect invalidation messages into SharedInvalidMessagesArray array.
- */
-static void
-MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n)
-{
-    /*
-     * Initialise array first time through in each commit
-     */
-    if (SharedInvalidMessagesArray == NULL)
-    {
-        maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE;
-        numSharedInvalidMessagesArray = 0;
-
-        /*
-         * Although this is being palloc'd we don't actually free it directly.
-         * We're so close to EOXact that we now we're going to lose it anyhow.
-         */
-        SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray
-                                            * sizeof(SharedInvalidationMessage));
-    }
-
-    if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
-    {
-        maxSharedInvalidMessagesArray = pg_nextpower2_32(numSharedInvalidMessagesArray + n);
-
-        SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray,
-                                              maxSharedInvalidMessagesArray
-                                              * sizeof(SharedInvalidationMessage));
-    }
-
-    /*
-     * Append the next chunk onto the array
-     */
-    memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray,
-           msgs, n * sizeof(SharedInvalidationMessage));
-    numSharedInvalidMessagesArray += n;
-}
-
-/*
- * xactGetCommittedInvalidationMessages() is executed by
- * RecordTransactionCommit() to add invalidation messages onto the
+ * xactGetCommittedInvalidationMessages() is called by
+ * RecordTransactionCommit() to collect invalidation messages to add to the
  * commit record. This applies only to commit message types, never to
  * abort records. Must always run before AtEOXact_Inval(), since that
  * removes the data we need to see.
@@ -832,7 +863,9 @@ int
 xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
                                      bool *RelcacheInitFileInval)
 {
-    MemoryContext oldcontext;
+    SharedInvalidationMessage *msgarray;
+    int            nummsgs;
+    int            nmsgs;

     /* Quick exit if we haven't done anything with invalidation messages. */
     if (transInvalInfo == NULL)
@@ -853,27 +886,48 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
     *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;

     /*
-     * Walk through TransInvalidationInfo to collect all the messages into a
-     * single contiguous array of invalidation messages. It must be contiguous
-     * so we can copy directly into WAL message. Maintain the order that they
-     * would be processed in by AtEOXact_Inval(), to ensure emulated behaviour
-     * in redo is as similar as possible to original. We want the same bugs,
-     * if any, not new ones.
+     * Collect all the pending messages into a single contiguous array of
+     * invalidation messages, to simplify what needs to happen while building
+     * the commit WAL message.  Maintain the order that they would be
+     * processed in by AtEOXact_Inval(), to ensure emulated behaviour in redo
+     * is as similar as possible to original.  We want the same bugs, if any,
+     * not new ones.
      */
-    oldcontext = MemoryContextSwitchTo(CurTransactionContext);
-
-    ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                     MakeSharedInvalidMessagesArray);
-    ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
-                                     MakeSharedInvalidMessagesArray);
-    MemoryContextSwitchTo(oldcontext);
-
-    Assert(!(numSharedInvalidMessagesArray > 0 &&
-             SharedInvalidMessagesArray == NULL));
-
-    *msgs = SharedInvalidMessagesArray;
-
-    return numSharedInvalidMessagesArray;
+    nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
+        NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
+
+    *msgs = msgarray = (SharedInvalidationMessage *)
+        MemoryContextAlloc(CurTransactionContext,
+                           nummsgs * sizeof(SharedInvalidationMessage));
+
+    nmsgs = 0;
+    ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                CatCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                CatCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                RelCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                RelCacheMsgs,
+                                (memcpy(msgarray + nmsgs,
+                                        msgs,
+                                        n * sizeof(SharedInvalidationMessage)),
+                                 nmsgs += n));
+    Assert(nmsgs == nummsgs);
+
+    return nmsgs;
 }

 /*
@@ -942,7 +996,7 @@ ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs,
  * about CurrentCmdInvalidMsgs too, since those changes haven't touched
  * the caches yet.
  *
- * In any case, reset the various lists to empty.  We need not physically
+ * In any case, reset our state to empty.  We need not physically
  * free memory here, since TopTransactionContext is about to be emptied
  * anyway.
  *
@@ -986,8 +1040,6 @@ AtEOXact_Inval(bool isCommit)

     /* Need not free anything explicitly */
     transInvalInfo = NULL;
-    SharedInvalidMessagesArray = NULL;
-    numSharedInvalidMessagesArray = 0;
 }

 /*
@@ -1043,10 +1095,21 @@ AtEOSubXact_Inval(bool isCommit)
             return;
         }

-        /* Pass up my inval messages to parent */
+        /*
+         * Pass up my inval messages to parent.  Notice that we stick them in
+         * PriorCmdInvalidMsgs, not CurrentCmdInvalidMsgs, since they've
+         * already been locally processed.  (This would trigger the Assert in
+         * AppendInvalidationMessageSubGroup if the parent's
+         * CurrentCmdInvalidMsgs isn't empty; but we already checked that in
+         * PrepareInvalidationState.)
+         */
         AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs,
                                    &myInfo->PriorCmdInvalidMsgs);

+        /* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
+        SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
+                         &myInfo->parent->PriorCmdInvalidMsgs);
+
         /* Pending relcache inval becomes parent's problem too */
         if (myInfo->RelcacheInitFileInval)
             myInfo->parent->RelcacheInitFileInval = true;
@@ -1514,31 +1577,24 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 /*
  * LogLogicalInvalidations
  *
- * Emit WAL for invalidations.  This is currently only used for logging
- * invalidations at the command end or at commit time if any invalidations
- * are pending.
+ * Emit WAL for invalidations caused by the current command.
+ *
+ * This is currently only used for logging invalidations at the command end
+ * or at commit time if any invalidations are pending.
  */
 void
-LogLogicalInvalidations()
+LogLogicalInvalidations(void)
 {
     xl_xact_invals xlrec;
-    SharedInvalidationMessage *invalMessages;
-    int            nmsgs = 0;
+    InvalidationMsgsGroup *group;
+    int            nmsgs;

     /* Quick exit if we haven't done anything with invalidation messages. */
     if (transInvalInfo == NULL)
         return;

-    ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
-                                     MakeSharedInvalidMessagesArray);
-
-    Assert(!(numSharedInvalidMessagesArray > 0 &&
-             SharedInvalidMessagesArray == NULL));
-
-    invalMessages = SharedInvalidMessagesArray;
-    nmsgs = numSharedInvalidMessagesArray;
-    SharedInvalidMessagesArray = NULL;
-    numSharedInvalidMessagesArray = 0;
+    group = &transInvalInfo->CurrentCmdInvalidMsgs;
+    nmsgs = NumMessagesInGroup(group);

     if (nmsgs > 0)
     {
@@ -1549,10 +1605,12 @@ LogLogicalInvalidations()
         /* perform insertion */
         XLogBeginInsert();
         XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
-        XLogRegisterData((char *) invalMessages,
-                         nmsgs * sizeof(SharedInvalidationMessage));
+        ProcessMessageSubGroupMulti(group, CatCacheMsgs,
+                                    XLogRegisterData((char *) msgs,
+                                                     n * sizeof(SharedInvalidationMessage)));
+        ProcessMessageSubGroupMulti(group, RelCacheMsgs,
+                                    XLogRegisterData((char *) msgs,
+                                                     n * sizeof(SharedInvalidationMessage)));
         XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
-
-        pfree(invalMessages);
     }
 }

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Alvaro Herrera
Дата:
Сообщение: Re: row filtering for logical replication
Следующее
От: "Euler Taveira"
Дата:
Сообщение: Re: row filtering for logical replication