Re: BUG #15293: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events

From Tom Lane
Subject Re: BUG #15293: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events
Msg-id 284756.1631467798@sss.pgh.pa.us
In response to Re: BUG #15293: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events  (Artur Zakirov <artur.zakirov@adjust.com>)
Responses Re: BUG #15293: Stored Procedure Triggered by Logical Replication is Unable to use Notification Events  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
Artur Zakirov <artur.zakirov@adjust.com> writes:
> [ v2-0001-signal-backends-on-commit.patch ]

I had an epiphany while looking at this.  Now that PostgresMain
calls ProcessNotifyInterrupt at the same place it calls
ProcessCompletedNotifies (which it has done since commit 790026972), we don't
actually need ProcessCompletedNotifies to read self-notifies either.
If we merely set notifyInterruptPending, ProcessNotifyInterrupt will
handle that just fine.  With the other changes already under discussion,
this means ProcessCompletedNotifies can go away entirely.

That's not only less code, but fewer cycles: in cases where we have both
self-notifies and inbound notify signals, the existing code starts two
transactions and runs asyncQueueReadAllNotifications twice, but there's
no need to do it more than once.  Self-notifies become less of a special
case on the sending side too, since we can just treat that as signalling
ourselves --- though it still seems worthwhile to optimize that by
setting notifyInterruptPending directly instead of invoking kill().
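
To illustrate the two points above, here is a minimal standalone C model. It
is not PostgreSQL code: ModelBackend and the model_* functions are
hypothetical stand-ins, and the "kernel signal" is only a counter. It shows
the sender setting its own pending flag directly rather than signalling
itself, and a single queue scan serving both a self-notify and an inbound
signal.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one listening backend; not the real async.c. */
typedef struct ModelBackend
{
    int  pid;
    bool notify_interrupt_pending;  /* models notifyInterruptPending */
    int  kernel_signals_received;   /* models signals delivered via kill() */
    int  queue_reads;               /* how often the queue was scanned */
} ModelBackend;

/*
 * Models the signalling loop: every listener gets its pending flag set.
 * For our own entry we set the flag directly and skip the (simulated)
 * kernel signal.  Returns the number of simulated kernel signals sent.
 */
static int
model_signal_backends(ModelBackend *listeners, size_t n, int my_pid)
{
    int kernel_signals = 0;

    assert(listeners != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (listeners[i].pid == my_pid)
        {
            /* self-notify: no need to involve the kernel */
            listeners[i].notify_interrupt_pending = true;
            continue;
        }
        /* another backend: simulate signal delivery and its handler */
        listeners[i].kernel_signals_received++;
        listeners[i].notify_interrupt_pending = true;
        kernel_signals++;
    }
    return kernel_signals;
}

/*
 * Models the idle-time processing step: however many sources set the
 * flag, the queue is scanned once per pass through the loop.
 */
static void
model_process_notify_interrupt(ModelBackend *self)
{
    while (self->notify_interrupt_pending)
    {
        self->notify_interrupt_pending = false;
        self->queue_reads++;
    }
}
```

In this model, a backend that both self-notifies and receives an inbound
signal before going idle ends up with one queue scan, not two.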

Hence, I present the attached, which also tweaks things to avoid an
extra pq_flush in the end-of-command code path, and improves the
comments to discuss the issue of NOTIFYs sent by procedures.
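
The saved pq_flush can be pictured with a small standalone C model. This is
a sketch under stated assumptions, not the patch itself: ModelConn and the
model_* functions are hypothetical, and a "flush" is just a counter. When
notify processing runs at end of command, the flush that follows
ReadyForQuery sends everything at once, so an earlier flush is redundant.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of a frontend connection's output buffer. */
typedef struct ModelConn
{
    int buffered;   /* messages queued but not yet sent */
    int flushes;    /* number of flush calls made */
} ModelConn;

/* Models a flush: everything buffered goes out in one write. */
static void
model_flush(ModelConn *conn)
{
    conn->buffered = 0;
    conn->flushes++;
}

/* Models notify processing: queue nmsgs messages, flush only if asked. */
static void
model_process_notifies(ModelConn *conn, int nmsgs, bool flush)
{
    assert(nmsgs >= 0);
    conn->buffered += nmsgs;
    if (flush)
        model_flush(conn);
}

/* Models end of command: queue ReadyForQuery, then always flush. */
static void
model_ready_for_query(ModelConn *conn)
{
    conn->buffered++;
    model_flush(conn);
}
```

Calling model_process_notifies with flush set to false before
model_ready_for_query costs one flush in this model; with flush set to true
it costs two.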

There is still a loose end we ought to think about: what to do when
someone issues LISTEN in a background worker.  With the code as
it stands, or with this patch, the worker will block cleanout of
the async SLRU since it will never read any messages.  (With
the code as it stands, a bgworker author can ameliorate that by
calling ProcessCompletedNotifies, but this patch is going to either
eliminate ProcessCompletedNotifies or turn it into a no-op.  In
any case, we still have a problem if an ill-considered trigger
issues LISTEN in a replication worker.)
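
The blocking effect can be sketched with a standalone C model. It relies only
on the rule that the shared tail is the minimum of the per-listener read
positions; ModelListener and model_advance_tail are hypothetical names, and
queue positions are simplified to plain integers.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a registered listener's read position. */
typedef struct ModelListener
{
    long pos;   /* next queue entry this listener will read */
} ModelListener;

/*
 * Models tail advancement: the new tail is the minimum read position
 * across all listeners, or the head if nobody is listening.  Entries
 * before the tail are the only ones that can be cleaned out.
 */
static long
model_advance_tail(const ModelListener *listeners, size_t n, long head)
{
    long min = head;

    for (size_t i = 0; i < n; i++)
    {
        assert(listeners[i].pos <= head);
        if (listeners[i].pos < min)
            min = listeners[i].pos;
    }
    return min;
}
```

With one listener caught up to the head and another (the worker that never
reads) still at position 0, the tail stays at 0 in this model, so nothing
can be truncated no matter how far the head moves.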

I'm inclined to think we should flat-out reject LISTEN in any process
that is not attached to a frontend, at least until somebody takes the
trouble to add infrastructure that would let it be useful.  I've not
done that here though; I'm not quite sure what we should test for.

            regards, tom lane

diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml
index c0811935a1..73207f72fe 100644
--- a/doc/src/sgml/bgworker.sgml
+++ b/doc/src/sgml/bgworker.sgml
@@ -280,15 +280,13 @@ typedef struct BackgroundWorker
   </para>

   <para>
-   If a background worker sends asynchronous notifications with the
-   <command>NOTIFY</command> command via the Server Programming Interface
-   (<acronym>SPI</acronym>), it should call
-   <function>ProcessCompletedNotifies</function> explicitly after committing
-   the enclosing transaction so that any notifications can be delivered.  If a
-   background worker registers to receive asynchronous notifications with
-   the <command>LISTEN</command> through <acronym>SPI</acronym>, the worker
-   will log those notifications, but there is no programmatic way for the
-   worker to intercept and respond to those notifications.
+   Background workers can send asynchronous notification messages, either by
+   using the <command>NOTIFY</command> command via <acronym>SPI</acronym>,
+   or directly via <function>Async_Notify()</function>.  Such notifications
+   will be sent at transaction commit.
+   Background workers should not register to receive asynchronous
+   notifications with the <command>LISTEN</command> command, as there is no
+   infrastructure for a worker to consume such notifications.
   </para>

   <para>
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 387f80419a..6597ec45a9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2269,7 +2269,17 @@ CommitTransaction(void)
      */
     smgrDoPendingDeletes(true);

+    /*
+     * Send out notification signals to other backends (and do other
+     * post-commit NOTIFY cleanup).  This must not happen until after our
+     * transaction is fully done from the viewpoint of other backends.
+     */
     AtCommit_Notify();
+
+    /*
+     * Everything after this should be purely internal-to-this-backend
+     * cleanup.
+     */
     AtEOXact_GUC(true, 1);
     AtEOXact_SPI(true);
     AtEOXact_Enum();
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4b16fb5682..8557008545 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -68,17 +68,27 @@
  *      CommitTransaction() which will then do the actual transaction commit.
  *
  *      After commit we are called another time (AtCommit_Notify()). Here we
- *      make the actual updates to the effective listen state (listenChannels).
- *
- *      Finally, after we are out of the transaction altogether, we check if
- *      we need to signal listening backends.  In SignalBackends() we scan the
- *      list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- *      to every listening backend (we don't know which backend is listening on
- *      which channel so we must signal them all). We can exclude backends that
- *      are already up to date, though, and we can also exclude backends that
- *      are in other databases (unless they are way behind and should be kicked
- *      to make them advance their pointers).  We don't bother with a
- *      self-signal either, but just process the queue directly.
+ *      make any actual updates to the effective listen state (listenChannels).
+ *      Then we signal any backends that may be interested in our messages
+ *      (including our own backend, if listening).  This is done by
+ *      SignalBackends(), which scans the list of listening backends and sends a
+ *      PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
+ *      know which backend is listening on which channel so we must signal them
+ *      all).  We can exclude backends that are already up to date, though, and
+ *      we can also exclude backends that are in other databases (unless they
+ *      are way behind and should be kicked to make them advance their
+ *      pointers).
+ *
+ *      Finally, after we are out of the transaction altogether and about to go
+ *      idle, we scan the queue for messages that need to be sent to our
+ *      frontend (which might be notifies from other backends, or self-notifies
+ *      from our own).  This step is not part of the CommitTransaction sequence
+ *      for two important reasons.  First, we could get errors while sending
+ *      data to our frontend, and it's really bad for errors to happen in
+ *      post-commit cleanup.  Second, in cases where a procedure issues commits
+ *      within a single frontend command, we don't want to send notifies to our
+ *      frontend until the command is done; but notifies to other backends
+ *      should go out immediately after each commit.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  *      sets the process's latch, which triggers the event to be processed
@@ -426,11 +436,8 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;

-/* has this backend sent notifications in the current transaction? */
-static bool backendHasSentNotifications = false;
-
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
-static bool backendTryAdvanceTail = false;
+static bool tryAdvanceTail = false;

 /* GUC parameter */
 bool        Trace_notify = false;
@@ -459,7 +466,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                          char *page_buffer,
                                          Snapshot snapshot);
 static void asyncQueueAdvanceTail(void);
-static void ProcessIncomingNotify(void);
+static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
 static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
@@ -950,8 +957,6 @@ PreCommit_Notify(void)
                          AccessExclusiveLock);

         /* Now push the notifications into the queue */
-        backendHasSentNotifications = true;
-
         nextNotify = list_head(pendingNotifies->events);
         while (nextNotify != NULL)
         {
@@ -976,6 +981,8 @@ PreCommit_Notify(void)
             nextNotify = asyncQueueAddEntries(nextNotify);
             LWLockRelease(NotifyQueueLock);
         }
+
+        /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
     }
 }

@@ -985,6 +992,11 @@ PreCommit_Notify(void)
  *        This is called at transaction commit, after committing to clog.
  *
  *        Update listenChannels and clear transaction-local state.
+ *
+ *        If we issued any notifications in the transaction, send signals to
+ *        listening backends (possibly including ourselves) to process them.
+ *        Also, if we filled enough queue pages with new notifies, try to
+ *        advance the queue tail pointer.
  */
 void
 AtCommit_Notify(void)
@@ -1027,6 +1039,29 @@ AtCommit_Notify(void)
     if (amRegisteredListener && listenChannels == NIL)
         asyncQueueUnregister();

+    /*
+     * Send signals to listening backends.  We need do this only if there are
+     * pending notifies, which were previously added to the shared queue by
+     * PreCommit_Notify().
+     */
+    if (pendingNotifies != NULL)
+        SignalBackends();
+
+    /*
+     * If it's time to try to advance the global tail pointer, do that.
+     *
+     * (It might seem odd to do this in the sender, when more than likely the
+     * listeners won't yet have read the messages we just sent.  However,
+     * there's less contention if only the sender does it, and there is little
+     * need for urgency in advancing the global tail.  So this typically will
+     * be clearing out messages that were sent some time ago.)
+     */
+    if (tryAdvanceTail)
+    {
+        tryAdvanceTail = false;
+        asyncQueueAdvanceTail();
+    }
+
     /* And clean up */
     ClearPendingActionsAndNotifies();
 }
@@ -1199,85 +1234,6 @@ Exec_UnlistenAllCommit(void)
     listenChannels = NIL;
 }

-/*
- * ProcessCompletedNotifies --- send out signals and self-notifies
- *
- * This is called from postgres.c just before going idle at the completion
- * of a transaction.  If we issued any notifications in the just-completed
- * transaction, send signals to other backends to process them, and also
- * process the queue ourselves to send messages to our own frontend.
- * Also, if we filled enough queue pages with new notifies, try to advance
- * the queue tail pointer.
- *
- * The reason that this is not done in AtCommit_Notify is that there is
- * a nonzero chance of errors here (for example, encoding conversion errors
- * while trying to format messages to our frontend).  An error during
- * AtCommit_Notify would be a PANIC condition.  The timing is also arranged
- * to ensure that a transaction's self-notifies are delivered to the frontend
- * before it gets the terminating ReadyForQuery message.
- *
- * Note that we send signals and process the queue even if the transaction
- * eventually aborted.  This is because we need to clean out whatever got
- * added to the queue.
- *
- * NOTE: we are outside of any transaction here.
- */
-void
-ProcessCompletedNotifies(void)
-{
-    MemoryContext caller_context;
-
-    /* Nothing to do if we didn't send any notifications */
-    if (!backendHasSentNotifications)
-        return;
-
-    /*
-     * We reset the flag immediately; otherwise, if any sort of error occurs
-     * below, we'd be locked up in an infinite loop, because control will come
-     * right back here after error cleanup.
-     */
-    backendHasSentNotifications = false;
-
-    /*
-     * We must preserve the caller's memory context (probably MessageContext)
-     * across the transaction we do here.
-     */
-    caller_context = CurrentMemoryContext;
-
-    if (Trace_notify)
-        elog(DEBUG1, "ProcessCompletedNotifies");
-
-    /*
-     * We must run asyncQueueReadAllNotifications inside a transaction, else
-     * bad things happen if it gets an error.
-     */
-    StartTransactionCommand();
-
-    /* Send signals to other backends */
-    SignalBackends();
-
-    if (listenChannels != NIL)
-    {
-        /* Read the queue ourselves, and send relevant stuff to the frontend */
-        asyncQueueReadAllNotifications();
-    }
-
-    /*
-     * If it's time to try to advance the global tail pointer, do that.
-     */
-    if (backendTryAdvanceTail)
-    {
-        backendTryAdvanceTail = false;
-        asyncQueueAdvanceTail();
-    }
-
-    CommitTransactionCommand();
-
-    MemoryContextSwitchTo(caller_context);
-
-    /* We don't need pq_flush() here since postgres.c will do one shortly */
-}
-
 /*
  * Test whether we are actively listening on the given channel name.
  *
@@ -1543,7 +1499,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
              * pointer (we don't want to actually do that right here).
              */
             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-                backendTryAdvanceTail = true;
+                tryAdvanceTail = true;

             /* And exit the loop */
             break;
@@ -1658,8 +1614,6 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * We never signal our own process; that should be handled by our caller.
- *
  * Normally we signal only backends in our own database, since only those
  * backends could be interested in notifies we send.  However, if there's
  * notify traffic in our database but no traffic in another database that
@@ -1668,6 +1622,9 @@ asyncQueueFillWarning(void)
  * advance their queue position pointers, allowing the global tail to advance.
  *
  * Since we know the BackendId and the Pid the signaling is quite cheap.
+ *
+ * This is called during CommitTransaction(), so it's important for it
+ * to have very low probability of failure.
  */
 static void
 SignalBackends(void)
@@ -1682,8 +1639,7 @@ SignalBackends(void)
      * list of target PIDs.
      *
      * XXX in principle these pallocs could fail, which would be bad. Maybe
-     * preallocate the arrays?    But in practice this is only run in trivial
-     * transactions, so there should surely be space available.
+     * preallocate the arrays?  They're not that large, though.
      */
     pids = (int32 *) palloc(MaxBackends * sizeof(int32));
     ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
@@ -1696,8 +1652,6 @@ SignalBackends(void)
         QueuePosition pos;

         Assert(pid != InvalidPid);
-        if (pid == MyProcPid)
-            continue;            /* never signal self */
         pos = QUEUE_BACKEND_POS(i);
         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
         {
@@ -1730,6 +1684,16 @@ SignalBackends(void)
     {
         int32        pid = pids[i];

+        /*
+         * If we are signaling our own process, no need to involve the kernel;
+         * just set the flag directly.
+         */
+        if (pid == MyProcPid)
+        {
+            notifyInterruptPending = true;
+            continue;
+        }
+
         /*
          * Note: assuming things aren't broken, a signal failure here could
          * only occur if the target backend exited since we released
@@ -1910,15 +1874,20 @@ HandleNotifyInterrupt(void)
  *        via the process's latch, and this routine will get called.
  *        If we are truly idle (ie, *not* inside a transaction block),
  *        process the incoming notifies.
+ *
+ *        If "flush" is true, force any frontend messages out immediately.
+ *        This can be false when being called at the end of a frontend command,
+ *        since we'll flush after sending ReadyForQuery.
  */
 void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterrupt(bool flush)
 {
     if (IsTransactionOrTransactionBlock())
         return;                    /* not really idle */

+    /* Loop in case another signal arrives while sending messages */
     while (notifyInterruptPending)
-        ProcessIncomingNotify();
+        ProcessIncomingNotify(flush);
 }


@@ -2180,6 +2149,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
+ *
+ * This is (usually) called during CommitTransaction(), so it's important for
+ * it to have very low probability of failure.
  */
 static void
 asyncQueueAdvanceTail(void)
@@ -2253,17 +2225,16 @@ asyncQueueAdvanceTail(void)
 /*
  * ProcessIncomingNotify
  *
- *        Deal with arriving NOTIFYs from other backends as soon as it's safe to
- *        do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
- *        signal handler, but isn't anymore.
+ *        Scan the queue for arriving notifications and report them to the front
+ *        end.  The notifications might be from other sessions, or our own;
+ *        there's no need to distinguish here.
  *
- *        Scan the queue for arriving notifications and report them to my front
- *        end.
+ *        If "flush" is true, force any frontend messages out immediately.
  *
  *        NOTE: since we are outside any transaction, we must create our own.
  */
 static void
-ProcessIncomingNotify(void)
+ProcessIncomingNotify(bool flush)
 {
     /* We *must* reset the flag */
     notifyInterruptPending = false;
@@ -2288,9 +2259,11 @@ ProcessIncomingNotify(void)
     CommitTransactionCommand();

     /*
-     * Must flush the notify messages to ensure frontend gets them promptly.
+     * If this isn't an end-of-command case, we must flush the notify messages
+     * to ensure frontend gets them promptly.
      */
-    pq_flush();
+    if (flush)
+        pq_flush();

     set_ps_display("idle");

@@ -2315,9 +2288,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
         pq_endmessage(&buf);

         /*
-         * NOTE: we do not do pq_flush() here.  For a self-notify, it will
-         * happen at the end of the transaction, and for incoming notifies
-         * ProcessIncomingNotify will do it after finding all the notifies.
+         * NOTE: we do not do pq_flush() here.  Some level of caller will
+         * handle it later, allowing this message to be combined into a packet
+         * with other ones.
          */
     }
     else
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 58b5960e27..3f9ed549f9 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -504,7 +504,7 @@ ProcessClientReadInterrupt(bool blocked)

         /* Process notify interrupts, if any */
         if (notifyInterruptPending)
-            ProcessNotifyInterrupt();
+            ProcessNotifyInterrupt(true);
     }
     else if (ProcDiePending)
     {
@@ -4373,17 +4373,15 @@ PostgresMain(int argc, char *argv[],
             }
             else
             {
-                /* Send out notify signals and transmit self-notifies */
-                ProcessCompletedNotifies();
-
                 /*
-                 * Also process incoming notifies, if any.  This is mostly to
-                 * ensure stable behavior in tests: if any notifies were
-                 * received during the just-finished transaction, they'll be
-                 * seen by the client before ReadyForQuery is.
+                 * Process incoming notifies (including self-notifies), if
+                 * any, and send relevant messages to the client.  Doing it
+                 * here helps ensure stable behavior in tests: if any notifies
+                 * were received during the just-finished transaction, they'll
+                 * be seen by the client before ReadyForQuery is.
                  */
                 if (notifyInterruptPending)
-                    ProcessNotifyInterrupt();
+                    ProcessNotifyInterrupt(false);

                 pgstat_report_stat(false);

diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 9217f66b91..f371ac896b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -43,12 +43,11 @@ extern void AtAbort_Notify(void);
 extern void AtSubCommit_Notify(void);
 extern void AtSubAbort_Notify(void);
 extern void AtPrepare_Notify(void);
-extern void ProcessCompletedNotifies(void);

 /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */
 extern void HandleNotifyInterrupt(void);

 /* process interrupts */
-extern void ProcessNotifyInterrupt(void);
+extern void ProcessNotifyInterrupt(bool flush);

 #endif                            /* ASYNC_H */
