Bugs in pgoutput.c

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Bugs in pgoutput.c
Дата
Msg-id 885288.1641420714@sss.pgh.pa.us
обсуждение исходный текст
Ответы Re: Bugs in pgoutput.c  (Amit Kapila <amit.kapila16@gmail.com>)
Re: Bugs in pgoutput.c  (Amit Kapila <amit.kapila16@gmail.com>)
Список pgsql-hackers
Commit 6ce16088b caused me to look at pgoutput.c's handling of
cache invalidations, and I was pretty appalled by what I found.

* rel_sync_cache_relation_cb does the wrong thing when called for
a cache flush (i.e., relid == 0).  Instead of invalidating all
RelationSyncCache entries as it should, it will do nothing.

* When rel_sync_cache_relation_cb does invalidate an entry,
it immediately zaps the entry->map structure, even though that
might still be in use (as per the adjacent comment that carefully
explains why this isn't safe).  I'm not sure if this could lead
to a dangling-pointer core dump, but it sure seems like it could
lead to failing to translate tuples that are about to be sent.

* Similarly, rel_sync_cache_publication_cb is way too eager to
reset the pubactions flags, which would likely lead to failing
to transmit changes that we should transmit.

The attached patch fixes these things, but I'm still pretty
unhappy with the general design of the data structures in
pgoutput.c, because there is this weird random mishmash of
static variables along with a palloc'd PGOutputData struct.
This cannot work if there are ever two active LogicalDecodingContexts
in the same process.  I don't think serial use of LogicalDecodingContexts
(ie, destroy one and then make another) works very well either,
because pgoutput_shutdown is a mere fig leaf that ignores all the
junk the module previously made (in CacheMemoryContext no less).
So I wonder whether either of those scenarios is possible/supported/
expected to be needed in future.

Also ... maybe I'm not looking in the right place, but I do not
see anything anywhere in logical decoding that is taking any lock
on the relation being processed.  How can that be safe?  In
particular, how do we know that the data collected by get_rel_sync_entry
isn't already stale by the time we return from the function?
Setting replicate_valid = true at the bottom of the function would
overwrite any notification we might have gotten from a syscache callback
while reading catalog data.

            regards, tom lane

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index a08da859b4..f7e991cd16 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
 {
     Oid            relid;            /* relation oid */

+    bool        replicate_valid;    /* overall validity flag for entry */
+
     bool        schema_sent;
     List       *streamed_txns;    /* streamed toplevel transactions with this
                                  * schema */

-    bool        replicate_valid;
+    /* are we publishing this rel? */
     PublicationActions pubactions;

     /*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
 }

 /*
- * Publication cache invalidation callback.
+ * Publication syscache invalidation callback.
+ *
+ * Called for invalidations on pg_publication.
  */
 static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                                               HASH_ENTER, &found);
     Assert(entry != NULL);

-    /* Not found means schema wasn't sent */
+    /* initialize entry, if it's new */
     if (!found)
     {
-        /* immediately make a new entry valid enough to satisfy callbacks */
+        entry->replicate_valid = false;
         entry->schema_sent = false;
         entry->streamed_txns = NIL;
-        entry->replicate_valid = false;
         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
         entry->publish_as_relid = InvalidOid;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
         {
             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
             if (data->publications)
+            {
                 list_free_deep(data->publications);
-
+                data->publications = NIL;
+            }
             data->publications = LoadPublications(data->publication_names);
             MemoryContextSwitchTo(oldctx);
             publications_valid = true;
         }

+        /*
+         * Reset schema_sent status as the relation definition may have
+         * changed.  Also reset pubactions to empty in case rel was dropped
+         * from a publication.  Also free any objects that depended on the
+         * earlier definition.
+         */
+        entry->schema_sent = false;
+        list_free(entry->streamed_txns);
+        entry->streamed_txns = NIL;
+        entry->pubactions.pubinsert = false;
+        entry->pubactions.pubupdate = false;
+        entry->pubactions.pubdelete = false;
+        entry->pubactions.pubtruncate = false;
+        if (entry->map)
+        {
+            /*
+             * Must free the TupleDescs contained in the map explicitly,
+             * because free_conversion_map() doesn't.
+             */
+            FreeTupleDesc(entry->map->indesc);
+            FreeTupleDesc(entry->map->outdesc);
+            free_conversion_map(entry->map);
+        }
+        entry->map = NULL;
+
         /*
          * Build publication cache. We can't use one provided by relcache as
          * relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                     foreach(lc2, ancestors)
                     {
                         Oid            ancestor = lfirst_oid(lc2);
+                        List       *apubids = GetRelationPublications(ancestor);
+                        List       *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));

-                        if (list_member_oid(GetRelationPublications(ancestor),
-                                            pub->oid) ||
-                            list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
-                                            pub->oid))
+                        if (list_member_oid(apubids, pub->oid) ||
+                            list_member_oid(aschemaPubids, pub->oid))
                         {
                             ancestor_published = true;
                             if (pub->pubviaroot)
                                 publish_as_relid = ancestor;
                         }
+                        list_free(apubids);
+                        list_free(aschemaPubids);
                     }
                 }

@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
         }

         list_free(pubids);
+        list_free(schemaPubids);

         entry->publish_as_relid = publish_as_relid;
         entry->replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
     /*
      * Nobody keeps pointers to entries in this hash table around outside
      * logical decoding callback calls - but invalidation events can come in
-     * *during* a callback if we access the relcache in the callback. Because
-     * of that we must mark the cache entry as invalid but not remove it from
-     * the hash while it could still be referenced, then prune it at a later
-     * safe point.
-     *
-     * Getting invalidations for relations that aren't in the table is
-     * entirely normal, since there's no way to unregister for an invalidation
-     * event. So we don't care if it's found or not.
+     * *during* a callback if we do any syscache or table access in the
+     * callback.  Because of that we must mark the cache entry as invalid but
+     * not damage any of its substructure here.  The next get_rel_sync_entry()
+     * call will rebuild it all.
      */
-    entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
-                                              HASH_FIND, NULL);
-
-    /*
-     * Reset schema sent status as the relation definition may have changed.
-     * Also free any objects that depended on the earlier definition.
-     */
-    if (entry != NULL)
+    if (OidIsValid(relid))
     {
-        entry->schema_sent = false;
-        list_free(entry->streamed_txns);
-        entry->streamed_txns = NIL;
-        if (entry->map)
+        /*
+         * Getting invalidations for relations that aren't in the table is
+         * entirely normal.  So we don't care if it's found or not.
+         */
+        entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
+                                                  HASH_FIND, NULL);
+        if (entry != NULL)
+            entry->replicate_valid = false;
+    }
+    else
+    {
+        /* Whole cache must be flushed. */
+        HASH_SEQ_STATUS status;
+
+        hash_seq_init(&status, RelationSyncCache);
+        while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
         {
-            /*
-             * Must free the TupleDescs contained in the map explicitly,
-             * because free_conversion_map() doesn't.
-             */
-            FreeTupleDesc(entry->map->indesc);
-            FreeTupleDesc(entry->map->outdesc);
-            free_conversion_map(entry->map);
+            entry->replicate_valid = false;
         }
-        entry->map = NULL;
     }
 }

 /*
  * Publication relation/schema map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication, pg_publication_rel, and
+ * pg_publication_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
     {
         entry->replicate_valid = false;
-
-        /*
-         * There might be some relations dropped from the publication so we
-         * don't need to publish the changes for them.
-         */
-        entry->pubactions.pubinsert = false;
-        entry->pubactions.pubupdate = false;
-        entry->pubactions.pubdelete = false;
-        entry->pubactions.pubtruncate = false;
     }
 }


В списке pgsql-hackers по дате отправления:

Предыдущее
От: "Bossart, Nathan"
Дата:
Сообщение: Re: Pre-allocating WAL files
Следующее
От: Alvaro Herrera
Дата:
Сообщение: Re: a misbehavior of partition row movement (?)