Re: wake up logical workers after ALTER SUBSCRIPTION

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: wake up logical workers after ALTER SUBSCRIPTION
Дата
Msg-id 3355444.1673044286@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: wake up logical workers after ALTER SUBSCRIPTION  (Nathan Bossart <nathandbossart@gmail.com>)
Ответы Re: wake up logical workers after ALTER SUBSCRIPTION  (Nathan Bossart <nathandbossart@gmail.com>)
Список pgsql-hackers
Nathan Bossart <nathandbossart@gmail.com> writes:
> I found some additional places that should remove the last-start time from
> the hash table.  I've added those in v14.

I've pushed 0001 and 0002, which seem pretty uncontroversial.
Attached is a rebased 0003, just to keep the cfbot happy.
I'm kind of wondering whether 0003 is worth the complexity TBH,
but in any case I ran out of time to look at it closely today.

            regards, tom lane

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5bcba0fdec..8f06e234c8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1991,6 +1991,16 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to read or update information
        about <quote>heavyweight</quote> locks.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalRepLauncherDSA</literal></entry>
+      <entry>Waiting for logical replication launcher dynamic shared memory
+      allocator access</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalRepLauncherHash</literal></entry>
+      <entry>Waiting for logical replication launcher shared memory hash table
+      access</entry>
+     </row>
      <row>
       <entry><literal>LogicalRepWorker</literal></entry>
       <entry>Waiting to read or update the state of logical replication
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f15a332bae..88f180c2a7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1504,6 +1504,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
     }
     list_free(subworkers);

+    /*
+     * Remove this subscription's entry from the launcher's last-start-time
+     * table to free up space.  This is not transactional: if we roll back, the
+     * launcher might restart the apply worker before wal_retrieve_retry_interval
+     * milliseconds have elapsed, but that's probably okay.
+     */
+    logicalrep_launcher_delete_last_start_time(subid);
+
     /*
      * Cleanup of tablesync replication origins.
      *
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c05..dfe49db64f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -56,6 +57,25 @@
 int            max_logical_replication_workers = 4;
 int            max_sync_workers_per_subscription = 2;

+/* an entry in the last-start times hash table */
+typedef struct LauncherLastStartTimesEntry
+{
+    Oid        subid;
+    TimestampTz last_start_time;
+} LauncherLastStartTimesEntry;
+
+/* parameters for the last-start times hash table */
+static const dshash_parameters dsh_params = {
+    sizeof(Oid),
+    sizeof(LauncherLastStartTimesEntry),
+    dshash_memcmp,
+    dshash_memhash,
+    LWTRANCHE_LAUNCHER_HASH
+};
+
+static dsa_area *last_start_times_dsa = NULL;
+static dshash_table *last_start_times = NULL;
+
 LogicalRepWorker *MyLogicalRepWorker = NULL;

 typedef struct LogicalRepCtxStruct
@@ -63,6 +83,10 @@ typedef struct LogicalRepCtxStruct
     /* Supervisor process. */
     pid_t        launcher_pid;

+    /* DSA area and dshash handles for the last-start times hash table */
+    dsa_handle    last_start_dsa;
+    dshash_table_handle last_start_dsh;
+
     /* Background workers. */
     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
@@ -74,6 +98,9 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void logicalrep_launcher_attach_dshmem(void);
+static void logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time);
+static TimestampTz logicalrep_launcher_get_last_start_time(Oid subid);

 static bool on_commit_launcher_wakeup = false;

@@ -756,6 +783,9 @@ ApplyLauncherShmemInit(void)
             memset(worker, 0, sizeof(LogicalRepWorker));
             SpinLockInit(&worker->relmutex);
         }
+
+        LogicalRepCtx->last_start_dsa = DSM_HANDLE_INVALID;
+        LogicalRepCtx->last_start_dsh = InvalidDsaPointer;
     }
 }

@@ -801,8 +831,6 @@ ApplyLauncherWakeup(void)
 void
 ApplyLauncherMain(Datum main_arg)
 {
-    TimestampTz last_start_time = 0;
-
     ereport(DEBUG1,
             (errmsg_internal("logical replication launcher started")));

@@ -837,58 +865,55 @@ ApplyLauncherMain(Datum main_arg)

         now = GetCurrentTimestamp();

-        /* Limit the start retry to once a wal_retrieve_retry_interval */
-        if (TimestampDifferenceExceeds(last_start_time, now,
-                                       wal_retrieve_retry_interval))
-        {
-            /* Use temporary context for the database list and worker info. */
-            subctx = AllocSetContextCreate(TopMemoryContext,
-                                           "Logical Replication Launcher sublist",
-                                           ALLOCSET_DEFAULT_SIZES);
-            oldctx = MemoryContextSwitchTo(subctx);
-
-            /* search for subscriptions to start or stop. */
-            sublist = get_subscription_list();
-
-            /* Start the missing workers for enabled subscriptions. */
-            foreach(lc, sublist)
-            {
-                Subscription *sub = (Subscription *) lfirst(lc);
-                LogicalRepWorker *w;
+        /* Use temporary context for the database list and worker info. */
+        subctx = AllocSetContextCreate(TopMemoryContext,
+                                       "Logical Replication Launcher sublist",
+                                       ALLOCSET_DEFAULT_SIZES);
+        oldctx = MemoryContextSwitchTo(subctx);

-                if (!sub->enabled)
-                    continue;
+        sublist = get_subscription_list();
+        foreach(lc, sublist)
+        {
+            Subscription *sub = (Subscription *) lfirst(lc);
+            LogicalRepWorker *w;
+            TimestampTz last_start;

-                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-                w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-                LWLockRelease(LogicalRepWorkerLock);
+            if (!sub->enabled)
+                continue;

-                if (w == NULL)
-                {
-                    last_start_time = now;
-                    wait_time = wal_retrieve_retry_interval;
+            LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+            w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+            LWLockRelease(LogicalRepWorkerLock);

-                    logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-                                             sub->owner, InvalidOid);
-                }
-            }
+            if (w != NULL)
+                continue;

-            /* Switch back to original memory context. */
-            MemoryContextSwitchTo(oldctx);
-            /* Clean the temporary memory. */
-            MemoryContextDelete(subctx);
-        }
-        else
-        {
             /*
-             * The wait in previous cycle was interrupted in less than
-             * wal_retrieve_retry_interval since last worker was started, this
-             * usually means crash of the worker, so we should retry in
-             * wal_retrieve_retry_interval again.
+             * If the worker is eligible to start now, launch it.  Otherwise,
+             * adjust wait_time so that we wake up when it can be started.
              */
-            wait_time = wal_retrieve_retry_interval;
+            last_start = logicalrep_launcher_get_last_start_time(sub->oid);
+            if (TimestampDifferenceExceeds(last_start, now,
+                                           wal_retrieve_retry_interval))
+            {
+                logicalrep_launcher_set_last_start_time(sub->oid, now);
+                logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+                                         sub->owner, InvalidOid);
+            }
+            else
+            {
+                long        elapsed;
+
+                elapsed = TimestampDifferenceMilliseconds(last_start, now);
+                wait_time = Min(wait_time, wal_retrieve_retry_interval - elapsed);
+            }
         }

+        /* Switch back to original memory context. */
+        MemoryContextSwitchTo(oldctx);
+        /* Clean the temporary memory. */
+        MemoryContextDelete(subctx);
+
         /* Wait for more work. */
         rc = WaitLatch(MyLatch,
                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -996,3 +1021,97 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)

     return (Datum) 0;
 }
+
+/*
+ * Initialize or attach to the dynamic shared hash table that stores the
+ * last-start times, if not already done.  This must be called before using the
+ * table.  The first call in each process acquires LogicalRepWorkerLock in
+ * exclusive mode, so callers must not already hold that lock.
+ */
+static void
+logicalrep_launcher_attach_dshmem(void)
+{
+    MemoryContext oldcontext;
+
+    /* Quick exit if this process already created or attached to the table. */
+    if (last_start_times != NULL)
+        return;
+
+    /* Use the lock to ensure that only one process creates the table. */
+    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+
+    /* Be sure any local memory allocated by DSA routines is persistent. */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+    if (LogicalRepCtx->last_start_dsh == InvalidDsaPointer)
+    {
+        /* Initialize dynamic shared hash table for last-start times. */
+        last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
+        dsa_pin(last_start_times_dsa);
+        dsa_pin_mapping(last_start_times_dsa);
+        last_start_times = dshash_create(last_start_times_dsa, &dsh_params, 0);
+
+        /* Store handles in shared memory for other backends to use. */
+        LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
+        LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
+    }
+    else
+    {
+        /* Attach to existing dynamic shared hash table. */
+        last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
+        dsa_pin_mapping(last_start_times_dsa);
+        last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
+                                         LogicalRepCtx->last_start_dsh, 0);
+    }
+
+    MemoryContextSwitchTo(oldcontext);
+    LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Set the last-start time for the subscription.
+ */
+static void
+logicalrep_launcher_set_last_start_time(Oid subid, TimestampTz start_time)
+{
+    LauncherLastStartTimesEntry *entry;
+    bool        found;
+
+    logicalrep_launcher_attach_dshmem();
+
+    entry = dshash_find_or_insert(last_start_times, &subid, &found);
+    entry->last_start_time = start_time;
+    dshash_release_lock(last_start_times, entry);
+}
+
+/*
+ * Return the last-start time for the subscription, or 0 if there isn't one.
+ */
+static TimestampTz
+logicalrep_launcher_get_last_start_time(Oid subid)
+{
+    LauncherLastStartTimesEntry *entry;
+    TimestampTz ret;
+
+    logicalrep_launcher_attach_dshmem();
+
+    entry = dshash_find(last_start_times, &subid, false);
+    if (entry == NULL)
+        return 0;
+
+    ret = entry->last_start_time;
+    dshash_release_lock(last_start_times, entry);
+
+    return ret;
+}
+
+/*
+ * Remove the last-start time for the subscription, if one exists.
+ */
+void
+logicalrep_launcher_delete_last_start_time(Oid subid)
+{
+    logicalrep_launcher_attach_dshmem();
+
+    (void) dshash_delete_key(last_start_times, &subid);
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 09b3e8b32a..61be761e03 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -618,6 +618,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                 ereport(LOG,
                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                                 MySubscription->name)));
+
+                /*
+                 * Clear the last-start time for this worker so that the
+                 * launcher will restart it immediately.
+                 */
+                logicalrep_launcher_delete_last_start_time(MySubscription->oid);
+
                 should_exit = true;
             }
         }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8649e142c..1e4d94d359 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -164,6 +164,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/logicalproto.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
@@ -3089,6 +3090,8 @@ maybe_reread_subscription(void)
                         "stop because the subscription was removed",
                         MySubscription->name)));

+        if (!am_tablesync_worker())
+            logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
         proc_exit(0);
     }

@@ -3100,6 +3103,8 @@ maybe_reread_subscription(void)
                         "stop because the subscription was disabled",
                         MySubscription->name)));

+        if (!am_tablesync_worker())
+            logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
         proc_exit(0);
     }

@@ -3126,6 +3131,13 @@ maybe_reread_subscription(void)
                 (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                         MySubscription->name)));

+        /*
+         * Clear the last-start time for the apply worker so that the launcher
+         * will restart it immediately, bypassing wal_retrieve_retry_interval.
+         */
+        if (!am_tablesync_worker())
+            logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
+
         proc_exit(0);
     }

@@ -3671,6 +3683,8 @@ ApplyWorkerMain(Datum main_arg)
                 (errmsg("logical replication apply worker for subscription %u will not "
                         "start because the subscription was removed during startup",
                         MyLogicalRepWorker->subid)));
+        if (!am_tablesync_worker())
+            logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
         proc_exit(0);
     }

@@ -3684,6 +3698,8 @@ ApplyWorkerMain(Datum main_arg)
                         "start because the subscription was disabled during startup",
                         MySubscription->name)));

+        if (!am_tablesync_worker())
+            logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
         proc_exit(0);
     }

@@ -3884,6 +3900,8 @@ DisableSubscriptionAndExit(void)
             errmsg("subscription \"%s\" has been disabled because of an error",
                    MySubscription->name));

+    if (!am_tablesync_worker())
+        logicalrep_launcher_delete_last_start_time(MyLogicalRepWorker->subid);
     proc_exit(0);
 }

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 196bece0a3..d2ec396045 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -186,6 +186,10 @@ static const char *const BuiltinTrancheNames[] = {
     "PgStatsHash",
     /* LWTRANCHE_PGSTATS_DATA: */
     "PgStatsData",
+    /* LWTRANCHE_LAUNCHER_DSA: */
+    "LogicalRepLauncherDSA",
+    /* LWTRANCHE_LAUNCHER_HASH: */
+    "LogicalRepLauncherHash",
 };

 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index e1661b6c91..00f6f89d72 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -26,4 +26,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);

 extern bool IsLogicalLauncher(void);

+extern void logicalrep_launcher_delete_last_start_time(Oid subid);
+
 #endif                            /* LOGICALLAUNCHER_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e4162db613..d2c7afb8f4 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -204,6 +204,8 @@ typedef enum BuiltinTrancheIds
     LWTRANCHE_PGSTATS_DSA,
     LWTRANCHE_PGSTATS_HASH,
     LWTRANCHE_PGSTATS_DATA,
+    LWTRANCHE_LAUNCHER_DSA,
+    LWTRANCHE_LAUNCHER_HASH,
     LWTRANCHE_FIRST_USER_DEFINED
 }            BuiltinTrancheIds;


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Dag Lem
Дата:
Сообщение: Re: daitch_mokotoff module
Следующее
От: Tomas Vondra
Дата:
Сообщение: Re: postgres_fdw: using TABLESAMPLE to collect remote sample