Re: [PATCH] Improve performance of NOTIFY over many databases (v2)

From: Tom Lane
Subject: Re: [PATCH] Improve performance of NOTIFY over many databases (v2)
Msg-id: 22742.1568585664@sss.pgh.pa.us
In response to: Re: [PATCH] Improve performance of NOTIFY over many databases (v2)  (Martijn van Oosterhout <kleptog@gmail.com>)
Responses: Re: [PATCH] Improve performance of NOTIFY over many databases (v2)  (Martijn van Oosterhout <kleptog@gmail.com>)
List: pgsql-hackers
Martijn van Oosterhout <kleptog@gmail.com> writes:
> On Sat, 14 Sep 2019 at 17:08, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> None of this seems to respond to my point: it looks to me like it would
>> work fine if you simply dropped the patch's additions in PreCommit_Notify
>> and ProcessCompletedNotifies, because there is already enough logic to
>> decide when to call asyncQueueAdvanceTail.

> ...
> However, I guess you're thinking of asyncQueueReadAllNotifications()
> triggering if the queue as a whole was too long. This could in
> principle work but it does mean that at some point all backends
> sending NOTIFY are going to start calling asyncQueueAdvanceTail()
> every time, until the tail gets advanced, and if there are many idle
> listening backends behind this could take a while. The slowest backend
> might receive more signals while it is processing and so end up
> running asyncQueueAdvanceTail() twice. The fact that signals coalesce
> stops the process getting completely out of hand but it does feel a
> little uncontrolled.
> The whole point of this patch is to ensure that at any time only one
> backend is being woken up and calling asyncQueueAdvanceTail() at a
> time.

I spent some more time thinking about this, and I'm still not too
satisfied with this patch's approach.  It seems to me the key insights
we're trying to make use of are:

1. We don't really need to keep the global tail pointer exactly
up to date.  It's bad if it falls way behind, but a few pages back
is fine.

2. When sending notifies, only listening backends connected to our
own database need be awakened immediately.  Backends connected to
other DBs will need to advance their queue pointer sometime, but
again it doesn't need to be right away.

3. It's bad for multiple processes to all be trying to do
asyncQueueAdvanceTail concurrently: they'll contend for exclusive
access to the AsyncQueueLock.  Therefore, having the listeners
do it is really the wrong thing, and instead we should do it on
the sending side.
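
To make point 1 concrete, here is a minimal sketch of why the tail only has to be a lower bound. It assumes queue positions that never wrap around (the real queue does wrap) and ignores locking entirely. The Sketch* names are hypothetical and do not appear in async.c.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical simplified queue position: a page number plus a byte offset. */
typedef struct SketchPos
{
    int page;
    int offset;
} SketchPos;

/* Return the earlier of two positions (no wraparound in this sketch). */
SketchPos
sketch_pos_min(SketchPos a, SketchPos b)
{
    if (a.page != b.page)
        return (a.page < b.page) ? a : b;
    return (a.offset < b.offset) ? a : b;
}

/*
 * Compute the furthest point the tail may safely move to: the minimum of
 * the head and every listener's read position.  Any value at or before
 * this result is still a valid (if stale) tail.
 */
SketchPos
sketch_safe_tail(SketchPos head, const SketchPos *listeners, size_t n)
{
    SketchPos   min = head;
    size_t      i;

    assert(n == 0 || listeners != NULL);
    for (i = 0; i < n; i++)
        min = sketch_pos_min(min, listeners[i]);
    return min;
}
```

A tail that was computed some time ago can only be at or before the value this function would return now, since listeners only move forward. That is why recomputing it lazily costs some disk space but not correctness.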

However, the patch as presented doesn't go all the way on point 3,
instead having listeners maybe-or-maybe-not do asyncQueueAdvanceTail
in asyncQueueReadAllNotifications.  I propose that we should go all
the way and just define tail-advancing as something that happens on
the sending side, and only once every few pages.  I also think we
can simplify the handling of other-database listeners by including
them in the set signaled by SignalBackends, but only if they're
several pages behind.  So that leads me to the attached patch;
what do you think?
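
The signaling rule and the cleanup cadence proposed above can be sketched as a standalone C fragment. This is a simplified illustration, not the patch itself: the Sketch* names and the SKETCH_MAX_PAGE value are made up for the example, and the shared-memory bookkeeping and locking are left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative values only; SKETCH_MAX_PAGE is an assumption for the demo. */
#define SKETCH_MAX_PAGE      1023
#define SKETCH_CLEANUP_DELAY 4

/* Hypothetical, stripped-down view of one listener's shared state. */
typedef struct SketchListener
{
    unsigned int db_oid;    /* database the listener is connected to */
    int          page;      /* queue page the listener has read up to */
    int          offset;    /* byte offset within that page */
} SketchListener;

/* Compute p - q on a circular space of SKETCH_MAX_PAGE + 1 pages. */
int
sketch_page_diff(int p, int q)
{
    int diff;

    assert(p >= 0 && p <= SKETCH_MAX_PAGE);
    assert(q >= 0 && q <= SKETCH_MAX_PAGE);

    diff = p - q;
    if (diff >= (SKETCH_MAX_PAGE + 1) / 2)
        diff -= SKETCH_MAX_PAGE + 1;
    else if (diff < -((SKETCH_MAX_PAGE + 1) / 2))
        diff += SKETCH_MAX_PAGE + 1;
    return diff;
}

/* Should a notifier connected to my_db signal this listener? */
bool
sketch_should_signal(const SketchListener *l, unsigned int my_db,
                     int head_page, int head_offset)
{
    /* Same database: signal unless the listener is already caught up. */
    if (l->db_oid == my_db)
        return !(l->page == head_page && l->offset == head_offset);

    /* Other database: signal only if it is several pages behind. */
    return sketch_page_diff(head_page, l->page) >= SKETCH_CLEANUP_DELAY;
}

/* Should the sender try to advance the tail after starting new_page? */
bool
sketch_should_try_advance_tail(int new_page)
{
    return new_page % SKETCH_CLEANUP_DELAY == 0;
}
```

With these values, a listener in another database that is three pages behind is left alone, while one that is four pages behind gets a wakeup so it can move its pointer forward. The sender itself attempts tail advancement only when it starts a page whose number is a multiple of four.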

BTW, in my hands it seems like point 2 (skip wakening other-database
listeners) is the only really significant win here, and of course
that only wins when the notify traffic is spread across a fair number
of databases.  Which I fear is not the typical use-case.  In single-DB
use-cases, point 2 helps not at all.  I had a really hard time measuring
any benefit from point 3 --- I eventually saw a noticeable savings
when I tried having one notifier and 100 listen-only backends, but
again that doesn't seem like a typical use-case.  I could not replicate
your report of lots of time spent in asyncQueueAdvanceTail's lock
acquisition.  I wonder whether you're using a very large max_connections
setting and we already fixed most of the problem with that in bca6e6435.
Still, this patch doesn't seem to make any cases worse, so I don't mind
if it's just improving unusual use-cases.

            regards, tom lane

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b..7791f78 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -75,8 +75,10 @@
  *      list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
  *      to every listening backend (we don't know which backend is listening on
  *      which channel so we must signal them all). We can exclude backends that
- *      are already up to date, though.  We don't bother with a self-signal
- *      either, but just process the queue directly.
+ *      are already up to date, though, and we can also exclude backends that
+ *      are in other databases (unless they are way behind and should be kicked
+ *      to make them advance their pointers).  We don't bother with a
+ *      self-signal either, but just process the queue directly.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  *      sets the process's latch, which triggers the event to be processed
@@ -89,13 +91,14 @@
  *      Inbound-notify processing consists of reading all of the notifications
  *      that have arrived since scanning last time. We read every notification
  *      until we reach either a notification from an uncommitted transaction or
- *      the head pointer's position. Then we check if we were the laziest
- *      backend: if our pointer is set to the same position as the global tail
- *      pointer is set, then we move the global tail pointer ahead to where the
- *      second-laziest backend is (in general, we take the MIN of the current
- *      head position and all active backends' new tail pointers). Whenever we
- *      move the global tail pointer we also truncate now-unused pages (i.e.,
- *      delete files in pg_notify/ that are no longer used).
+ *      the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
+ *      pointer needs to be advanced so that old pages can be truncated.
+ *      This is relatively expensive (notably, it requires an exclusive lock),
+ *      so we don't want to do it often.  We make sending backends do this work
+ *      if they advanced the queue head into a new page, but only once every
+ *      QUEUE_CLEANUP_DELAY pages.
  *
  * An application that listens on the same channel it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
@@ -212,6 +215,19 @@ typedef struct QueuePosition
      (x).offset > (y).offset ? (x) : (y))

 /*
+ * Parameter determining how often we try to advance the tail pointer:
+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
+ * also the distance by which a backend in another database needs to be
+ * behind before we'll decide we need to wake it up to advance its pointer.
+ *
+ * Resist the temptation to make this really large.  While that would save
+ * work in some places, it would add cost in others.  In particular, this
+ * should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
+ * catch up before the pages they'll need to read fall out of SLRU cache.
+ */
+#define QUEUE_CLEANUP_DELAY 4
+
+/*
  * Struct describing a listening backend's status
  */
 typedef struct QueueBackendStatus
@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
     QueuePosition head;            /* head points to the next free location */
-    QueuePosition tail;            /* the global tail is equivalent to the pos of
-                                 * the "slowest" backend */
+    QueuePosition tail;            /* tail must be <= the queue position of every
+                                 * listening backend */
     BackendId    firstListener;    /* id of first listener, or InvalidBackendId */
     TimestampTz lastQueueFillWarn;    /* time of last queue-full msg */
     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
@@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
 /* has this backend sent notifications in the current transaction? */
 static bool backendHasSentNotifications = false;

+/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
+static bool backendTryAdvanceTail = false;
+
 /* GUC parameter */
 bool        Trace_notify = false;

 /* local function prototypes */
+static int    asyncQueuePageDiff(int p, int q);
 static bool asyncQueuePagePrecedes(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                          QueuePosition stop,
@@ -436,10 +456,11 @@ static int    notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);

 /*
- * We will work on the page range of 0..QUEUE_MAX_PAGE.
+ * Compute the difference between two queue page numbers (i.e., p - q),
+ * accounting for wraparound.
  */
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
 {
     int            diff;

@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
         diff -= QUEUE_MAX_PAGE + 1;
     else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
         diff += QUEUE_MAX_PAGE + 1;
-    return diff < 0;
+    return diff;
+}
+
+/* Is p < q, accounting for wraparound? */
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+    return asyncQueuePageDiff(p, q) < 0;
 }

 /*
@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
      * notification to the frontend.  Also, although our transaction might
      * have executed NOTIFY, those message(s) aren't queued yet so we can't
      * see them in the queue.
-     *
-     * This will also advance the global tail pointer if possible.
      */
     if (!QUEUE_POS_EQUAL(max, head))
         asyncQueueReadAllNotifications();
@@ -1156,7 +1182,6 @@ void
 ProcessCompletedNotifies(void)
 {
     MemoryContext caller_context;
-    bool        signalled;

     /* Nothing to do if we didn't send any notifications */
     if (!backendHasSentNotifications)
@@ -1185,23 +1210,20 @@ ProcessCompletedNotifies(void)
     StartTransactionCommand();

     /* Send signals to other backends */
-    signalled = SignalBackends();
+    SignalBackends();

     if (listenChannels != NIL)
     {
         /* Read the queue ourselves, and send relevant stuff to the frontend */
         asyncQueueReadAllNotifications();
     }
-    else if (!signalled)
+
+    /*
+     * If it's time to try to advance the global tail pointer, do that.
+     */
+    if (backendTryAdvanceTail)
     {
-        /*
-         * If we found no other listening backends, and we aren't listening
-         * ourselves, then we must execute asyncQueueAdvanceTail to flush the
-         * queue, because ain't nobody else gonna do it.  This prevents queue
-         * overflow when we're sending useless notifies to nobody. (A new
-         * listener could have joined since we looked, but if so this is
-         * harmless.)
-         */
+        backendTryAdvanceTail = false;
         asyncQueueAdvanceTail();
     }

@@ -1242,8 +1264,6 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-    bool        advanceTail;
-
     Assert(listenChannels == NIL);    /* else caller error */

     if (!amRegisteredListener)    /* nothing to do */
@@ -1253,10 +1273,7 @@ asyncQueueUnregister(void)
      * Need exclusive lock here to manipulate list links.
      */
     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
-    /* check if entry is valid and oldest ... */
-    advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
-        QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
-    /* ... then mark it invalid */
+    /* Mark our entry as invalid */
     QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
     QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
     /* and remove it from the list */
@@ -1278,10 +1295,6 @@ asyncQueueUnregister(void)

     /* mark ourselves as no longer listed in the global array */
     amRegisteredListener = false;
-
-    /* If we were the laziest backend, try to advance the tail pointer */
-    if (advanceTail)
-        asyncQueueAdvanceTail();
 }

 /*
@@ -1467,6 +1480,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
              * page without overrunning the queue.
              */
             slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
+
+            /*
+             * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
+             * set flag to remember that we should try to advance the tail
+             * pointer (we don't want to actually do that right here).
+             */
+            if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+                backendTryAdvanceTail = true;
+
             /* And exit the loop */
             break;
         }
@@ -1570,31 +1592,30 @@ asyncQueueFillWarning(void)
 }

 /*
- * Send signals to all listening backends (except our own).
+ * Send signals to listening backends.
  *
- * Returns true if we sent at least one signal.
+ * We never signal our own process; that should be handled by our caller.
  *
- * Since we need EXCLUSIVE lock anyway we also check the position of the other
- * backends and in case one is already up-to-date we don't signal it.
- * This can happen if concurrent notifying transactions have sent a signal and
- * the signaled backend has read the other notifications and ours in the same
- * step.
+ * Normally we signal only backends in our own database, since only those
+ * backends could be interested in notifies we send.  However, if there's
+ * notify traffic in our database but no traffic in another database that
+ * does have listener(s), those listeners will fall further and further
+ * behind.  Waken them anyway if they're far enough behind, so that they'll
+ * advance their queue position pointers, allowing the global tail to advance.
  *
  * Since we know the BackendId and the Pid the signalling is quite cheap.
  */
-static bool
+static void
 SignalBackends(void)
 {
-    bool        signalled = false;
     int32       *pids;
     BackendId  *ids;
     int            count;
-    int32        pid;

     /*
-     * Identify all backends that are listening and not already up-to-date. We
-     * don't want to send signals while holding the AsyncQueueLock, so we just
-     * build a list of target PIDs.
+     * Identify backends that we need to signal.  We don't want to send
+     * signals while holding the AsyncQueueLock, so this loop just builds a
+     * list of target PIDs.
      *
      * XXX in principle these pallocs could fail, which would be bad. Maybe
      * preallocate the arrays?    But in practice this is only run in trivial
@@ -1607,26 +1628,43 @@ SignalBackends(void)
     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
     {
-        pid = QUEUE_BACKEND_PID(i);
+        int32        pid = QUEUE_BACKEND_PID(i);
+        QueuePosition pos;
+
         Assert(pid != InvalidPid);
-        if (pid != MyProcPid)
+        if (pid == MyProcPid)
+            continue;            /* never signal self */
+        pos = QUEUE_BACKEND_POS(i);
+        if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
         {
-            QueuePosition pos = QUEUE_BACKEND_POS(i);
-
-            if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
-            {
-                pids[count] = pid;
-                ids[count] = i;
-                count++;
-            }
+            /*
+             * Always signal listeners in our own database, unless they're
+             * already caught up (unlikely, but possible).
+             */
+            if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+                continue;
+        }
+        else
+        {
+            /*
+             * Listeners in other databases should be signaled only if they
+             * are far behind.
+             */
+            if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+                                   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+                continue;
         }
+        /* OK, need to signal this one */
+        pids[count] = pid;
+        ids[count] = i;
+        count++;
     }
     LWLockRelease(AsyncQueueLock);

     /* Now send signals */
     for (int i = 0; i < count; i++)
     {
-        pid = pids[i];
+        int32        pid = pids[i];

         /*
          * Note: assuming things aren't broken, a signal failure here could
@@ -1636,14 +1674,10 @@ SignalBackends(void)
          */
         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
-        else
-            signalled = true;
     }

     pfree(pids);
     pfree(ids);
-
-    return signalled;
 }

 /*
@@ -1844,7 +1878,6 @@ asyncQueueReadAllNotifications(void)
     QueuePosition oldpos;
     QueuePosition head;
     Snapshot    snapshot;
-    bool        advanceTail;

     /* page_buffer must be adequately aligned, so use a union */
     union
@@ -1966,13 +1999,8 @@ asyncQueueReadAllNotifications(void)
         /* Update shared state */
         LWLockAcquire(AsyncQueueLock, LW_SHARED);
         QUEUE_BACKEND_POS(MyBackendId) = pos;
-        advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
         LWLockRelease(AsyncQueueLock);

-        /* If we were the laziest backend, try to advance the tail pointer */
-        if (advanceTail)
-            asyncQueueAdvanceTail();
-
         PG_RE_THROW();
     }
     PG_END_TRY();
@@ -1980,13 +2008,8 @@ asyncQueueReadAllNotifications(void)
     /* Update shared state */
     LWLockAcquire(AsyncQueueLock, LW_SHARED);
     QUEUE_BACKEND_POS(MyBackendId) = pos;
-    advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
     LWLockRelease(AsyncQueueLock);

-    /* If we were the laziest backend, try to advance the tail pointer */
-    if (advanceTail)
-        asyncQueueAdvanceTail();
-
     /* Done with snapshot */
     UnregisterSnapshot(snapshot);
 }
