Re: Logical replication - schema change not invalidating the relation cache

From: Tom Lane
Subject: Re: Logical replication - schema change not invalidating the relation cache
Msg-id: 2868694.1672959751@sss.pgh.pa.us
In response to: Re: Logical replication - schema change not invalidating the relation cache  (vignesh C <vignesh21@gmail.com>)
Responses: Re: Logical replication - schema change not invalidating the relation cache  (vignesh C <vignesh21@gmail.com>)
List: pgsql-hackers
vignesh C <vignesh21@gmail.com> writes:
> On Thu, 5 Jan 2023 at 03:17, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> The bigger picture here though is that in examples such as the one
>> you gave at the top of the thread, it's not very clear to me that
>> there's *any* principled behavior.  If the connection between publisher
>> and subscriber tables is only the relation name, fine ... but exactly
>> which relation name applies?

> The connection between publisher and subscriber tables is based on
> relation id. During the first change, the relid, relname and schema name
> from the publisher will be sent to the subscriber. The subscriber stores this
> id, relname and schema name in the LogicalRepRelMap hash, for which
> the relation id is the key. Subsequent data received by the subscriber
> will use the relation id received from the publisher to apply the
> changes on the subscriber.
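
[Editorial sketch, not part of the original message.] The mapping described in the quoted paragraph can be pictured as a small table keyed by the publisher's relation id. The code below is a simplified stand-in, not the actual LogicalRepRelMap implementation: the type and function names (RelMap, relmap_update, relmap_lookup), the fixed capacity, and the linear search are all assumptions made for illustration. It shows why the subscriber keeps resolving a relid to the old schema name until the publisher sends the relation description again.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define RELMAP_SIZE 16          /* hypothetical capacity for this sketch */
#define NAME_LEN    64          /* hypothetical name buffer size */

typedef unsigned int RemoteRelId;

/* One cached relation description, keyed by the publisher's relation id. */
typedef struct RelMapEntry
{
    int         used;
    RemoteRelId remoteid;
    char        nspname[NAME_LEN];
    char        relname[NAME_LEN];
} RelMapEntry;

typedef struct RelMap
{
    RelMapEntry entries[RELMAP_SIZE];
} RelMap;

/* Return the entry for remoteid, or NULL if none has been received yet. */
static RelMapEntry *
relmap_lookup(RelMap *map, RemoteRelId remoteid)
{
    assert(map != NULL);
    for (size_t i = 0; i < RELMAP_SIZE; i++)
    {
        if (map->entries[i].used && map->entries[i].remoteid == remoteid)
            return &map->entries[i];
    }
    return NULL;
}

/*
 * Store or overwrite the names for remoteid, as would happen when a
 * relation description arrives.  Returns 0 on success, -1 if the map is full.
 */
static int
relmap_update(RelMap *map, RemoteRelId remoteid,
              const char *nspname, const char *relname)
{
    RelMapEntry *entry = relmap_lookup(map, remoteid);

    if (entry == NULL)
    {
        for (size_t i = 0; i < RELMAP_SIZE; i++)
        {
            if (!map->entries[i].used)
            {
                entry = &map->entries[i];
                break;
            }
        }
        if (entry == NULL)
            return -1;
    }

    entry->used = 1;
    entry->remoteid = remoteid;
    snprintf(entry->nspname, NAME_LEN, "%s", nspname);
    snprintf(entry->relname, NAME_LEN, "%s", relname);
    return 0;
}
```

In this model, a data change carries only the relid, so the target table is whatever name pair was stored last. If the publisher renames the schema but never calls the equivalent of relmap_update again, lookups keep returning the pre-rename schema name.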

Hm.  I spent some time cleaning up this patch, and found that there's
still a problem.  ISTM that the row with value "3" ought to end up
in the subscriber's sch2.t1 table, but it does not: the attached
test script fails with

t/100_bugs.pl .. 6/? 
#   Failed test 'check data in subscriber sch2.t1 after schema rename'
#   at t/100_bugs.pl line 361.
#          got: ''
#     expected: '3'
# Looks like you failed 1 test of 9.
t/100_bugs.pl .. Dubious, test returned 1 (wstat 256, 0x100)
Failed 1/9 subtests 

What's up with that?

            regards, tom lane
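
[Editorial sketch, not part of the original message.] The patch below registers a callback that marks every entry of the publisher-side cache invalid whenever pg_namespace changes. The following standalone model illustrates that "invalidate everything, then rebuild lazily" pattern. It is a sketch under assumptions: SyncEntry, SyncCache, sync_cache_invalidate_all and sync_entry_revalidate are hypothetical names, and the fields are a reduced approximation of what pgoutput tracks per relation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define CACHE_SIZE 8            /* hypothetical capacity for this sketch */

/* Reduced, hypothetical model of a per-relation output cache entry. */
typedef struct SyncEntry
{
    unsigned int relid;
    bool        in_use;
    bool        valid;          /* is the cached state still trusted? */
    bool        schema_sent;    /* were the relation names already sent? */
} SyncEntry;

typedef struct SyncCache
{
    SyncEntry   entries[CACHE_SIZE];
} SyncCache;

/*
 * Invalidation callback model: the event does not say which relations were
 * affected, so every live entry is marked invalid.  A NULL cache is
 * tolerated, since the callback can outlive the cache.  Returns the number
 * of entries marked.
 */
static size_t
sync_cache_invalidate_all(SyncCache *cache)
{
    size_t      marked = 0;

    if (cache == NULL)
        return 0;

    for (size_t i = 0; i < CACHE_SIZE; i++)
    {
        if (cache->entries[i].in_use)
        {
            cache->entries[i].valid = false;
            marked++;
        }
    }
    return marked;
}

/*
 * Lazy rebuild on next use.  Returns true if the entry had been invalidated,
 * meaning the relation description must be sent downstream again.
 */
static bool
sync_entry_revalidate(SyncEntry *entry)
{
    assert(entry != NULL && entry->in_use);

    if (entry->valid)
        return false;

    entry->schema_sent = false; /* forces the names to be re-sent */
    entry->valid = true;
    return true;
}
```

Marking all entries is coarse but cheap to reason about: a schema rename is rare, and the cost is one extra relation description per replicated table on its next change.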

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..876adab38e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx)

     Assert(RelationSyncCache != NULL);

+    /* We must update the cache entry for a relation after a relcache flush */
     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+    /*
+     * Flush all cache entries after a pg_namespace change, in case it was a
+     * schema rename affecting a relation being replicated.
+     */
+    CacheRegisterSyscacheCallback(NAMESPACEOID,
+                                  rel_sync_cache_publication_cb,
+                                  (Datum) 0);
+
+    /*
+     * Flush all cache entries after any publication changes.  (We need no
+     * callback entry for pg_publication, because publication_invalidation_cb
+     * will take care of it.)
+     */
     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
                                   rel_sync_cache_publication_cb,
                                   (Datum) 0);
@@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 /*
  * Publication relation/schema map syscache invalidation callback
  *
- * Called for invalidations on pg_publication, pg_publication_rel, and
- * pg_publication_namespace.
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
     /*
      * We can get here if the plugin was used in SQL interface as the
      * RelSchemaSyncCache is destroyed when the decoding finishes, but there
-     * is no way to unregister the relcache invalidation callback.
+     * is no way to unregister the invalidation callbacks.
      */
     if (RelationSyncCache == NULL)
         return;

     /*
-     * There is no way to find which entry in our cache the hash belongs to so
-     * mark the whole cache as invalid.
+     * We have no easy way to identify which cache entries this invalidation
+     * event might have affected, so just mark them all invalid.
      */
     hash_seq_init(&status, RelationSyncCache);
     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 143caac792..de827f8777 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1');
 pass('index predicates do not cause crash');

 # We'll re-use these nodes below, so drop their replication state.
-# We don't bother to drop the tables though.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+# Drop the tables too.
+$node_publisher->safe_psql('postgres', "DROP TABLE tab1");

 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
@@ -307,6 +308,58 @@ is( $node_subscriber->safe_psql(
     qq(-1|1),
     "update works with REPLICA IDENTITY");

+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+
+# Test schema invalidation by renaming the schema
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will cover t1 under both schema names
+$node_publisher->safe_psql('postgres',
+    "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+    "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Check what happens to data inserted before and after schema rename
+$node_publisher->safe_psql(
+    'postgres',
+    "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1,
+# but not the row inserted into the old sch1.t1 post-rename.
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is( $result, qq(1
+2), 'check data in subscriber sch1.t1 after schema rename');
+
+# Instead, that row should appear in sch2.t1.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is($result, qq(3), 'check data in subscriber sch2.t1 after schema rename');
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');

