Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24
Дата
Msg-id 9369.1507741218@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24  (Tom Lane <tgl@sss.pgh.pa.us>)
Ответы Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24  (Marko Tiikkaja <marko@joh.to>)
Список pgsql-bugs
I wrote:
> It occurred to me that it's easy to measure the worst case, ie always
> one message per notify interrupt:
> 
> $ cat bnch.txt
> LISTEN foo\; NOTIFY foo;
> $ pgbench -n -c 1 -T 10 -f bnch.txt
> 
> On this case it seems that the patch is circa 2% slower than HEAD,
> though that's only barely above the noise level in my measurements.

On looking at the patch again, I realized that it's not really necessary
to check TransactionIdIsCurrentTransactionId in the per-message loop.
Although it is possible to run the message-scanning code in a transaction
that's executed NOTIFY (if it also executed the session's first LISTEN),
at that point we will not yet have pushed the transaction's messages
into the queue.  Therefore there is no case where the loop can see
messages queued by its own transaction.

Although TransactionIdIsCurrentTransactionId isn't really all that
expensive, removing it brings the above test case to a statistical
tie: median of 3 60-second runs is 26054 tps in HEAD or 26034 tps
with patch.  (The cross-run variation is a couple hundred tps.)

So I'm now satisfied with the attached form of the patch, and
will proceed to make back-ported versions.

            regards, tom lane

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08e..a93c81b 100644
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 137,143 ****
--- 137,145 ----
  #include "utils/builtins.h"
  #include "utils/memutils.h"
  #include "utils/ps_status.h"
+ #include "utils/snapmgr.h"
  #include "utils/timestamp.h"
+ #include "utils/tqual.h"


  /*
*************** static bool SignalBackends(void);
*** 387,393 ****
  static void asyncQueueReadAllNotifications(void);
  static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer);
  static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
  static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
--- 389,396 ----
  static void asyncQueueReadAllNotifications(void);
  static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer,
!                              Snapshot snapshot);
  static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
  static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
*************** PreCommit_Notify(void)
*** 798,804 ****
          }
      }

!     /* Queue any pending notifies */
      if (pendingNotifies)
      {
          ListCell   *nextNotify;
--- 801,807 ----
          }
      }

!     /* Queue any pending notifies (must happen after the above) */
      if (pendingNotifies)
      {
          ListCell   *nextNotify;
*************** Exec_ListenPreCommit(void)
*** 987,993 ****
       * have already committed before we started to LISTEN.
       *
       * Note that we are not yet listening on anything, so we won't deliver any
!      * notification to the frontend.
       *
       * This will also advance the global tail pointer if possible.
       */
--- 990,998 ----
       * have already committed before we started to LISTEN.
       *
       * Note that we are not yet listening on anything, so we won't deliver any
!      * notification to the frontend.  Also, although our transaction might
!      * have executed NOTIFY, those message(s) aren't queued yet so we can't
!      * see them in the queue.
       *
       * This will also advance the global tail pointer if possible.
       */
*************** asyncQueueReadAllNotifications(void)
*** 1744,1749 ****
--- 1749,1755 ----
      volatile QueuePosition pos;
      QueuePosition oldpos;
      QueuePosition head;
+     Snapshot    snapshot;
      bool        advanceTail;

      /* page_buffer must be adequately aligned, so use a union */
*************** asyncQueueReadAllNotifications(void)
*** 1767,1772 ****
--- 1773,1781 ----
          return;
      }

+     /* Get snapshot we'll use to decide which xacts are still in progress */
+     snapshot = RegisterSnapshot(GetLatestSnapshot());
+
      /*----------
       * Note that we deliver everything that we see in the queue and that
       * matches our _current_ listening state.
*************** asyncQueueReadAllNotifications(void)
*** 1854,1860 ****
               * while sending the notifications to the frontend.
               */
              reachedStop = asyncQueueProcessPageEntries(&pos, head,
!                                                        page_buffer.buf);
          } while (!reachedStop);
      }
      PG_CATCH();
--- 1863,1870 ----
               * while sending the notifications to the frontend.
               */
              reachedStop = asyncQueueProcessPageEntries(&pos, head,
!                                                        page_buffer.buf,
!                                                        snapshot);
          } while (!reachedStop);
      }
      PG_CATCH();
*************** asyncQueueReadAllNotifications(void)
*** 1882,1887 ****
--- 1892,1900 ----
      /* If we were the laziest backend, try to advance the tail pointer */
      if (advanceTail)
          asyncQueueAdvanceTail();
+
+     /* Done with snapshot */
+     UnregisterSnapshot(snapshot);
  }

  /*
*************** asyncQueueReadAllNotifications(void)
*** 1903,1909 ****
  static bool
  asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer)
  {
      bool        reachedStop = false;
      bool        reachedEndOfPage;
--- 1916,1923 ----
  static bool
  asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer,
!                              Snapshot snapshot)
  {
      bool        reachedStop = false;
      bool        reachedEndOfPage;
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1928,1934 ****
          /* Ignore messages destined for other databases */
          if (qe->dboid == MyDatabaseId)
          {
!             if (TransactionIdIsInProgress(qe->xid))
              {
                  /*
                   * The source transaction is still in progress, so we can't
--- 1942,1948 ----
          /* Ignore messages destined for other databases */
          if (qe->dboid == MyDatabaseId)
          {
!             if (XidInMVCCSnapshot(qe->xid, snapshot))
              {
                  /*
                   * The source transaction is still in progress, so we can't
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1939,1948 ****
                   * this advance-then-back-up behavior when dealing with an
                   * uncommitted message.)
                   *
!                  * Note that we must test TransactionIdIsInProgress before we
!                  * test TransactionIdDidCommit, else we might return a message
!                  * from a transaction that is not yet visible to snapshots;
!                  * compare the comments at the head of tqual.c.
                   */
                  *current = thisentry;
                  reachedStop = true;
--- 1953,1967 ----
                   * this advance-then-back-up behavior when dealing with an
                   * uncommitted message.)
                   *
!                  * Note that we must test XidInMVCCSnapshot before we test
!                  * TransactionIdDidCommit, else we might return a message from
!                  * a transaction that is not yet visible to snapshots; compare
!                  * the comments at the head of tqual.c.
!                  *
!                  * Also, while our own xact won't be listed in the snapshot,
!                  * we need not check for TransactionIdIsCurrentTransactionId
!                  * because our transaction cannot (yet) have queued any
!                  * messages.
                   */
                  *current = thisentry;
                  reachedStop = true;
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index bbac408..28060c5 100644
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 81,88 ****
  SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
  SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};

- /* local functions */
- static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);

  /*
   * SetHintBits()
--- 81,86 ----
*************** HeapTupleIsSurelyDead(HeapTuple htup, Tr
*** 1482,1488 ****
   * TransactionIdIsCurrentTransactionId first, except for known-committed
   * XIDs which could not be ours anyway.
   */
! static bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
      uint32        i;
--- 1480,1486 ----
   * TransactionIdIsCurrentTransactionId first, except for known-committed
   * XIDs which could not be ours anyway.
   */
! bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
      uint32        i;
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index 9a3b56e..96eaf01 100644
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
*************** extern HTSV_Result HeapTupleSatisfiesVac
*** 78,83 ****
--- 78,84 ----
                           TransactionId OldestXmin, Buffer buffer);
  extern bool HeapTupleIsSurelyDead(HeapTuple htup,
                        TransactionId OldestXmin);
+ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);

  extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
                       uint16 infomask, TransactionId xid);

-- 
Sent via pgsql-bugs mailing list (pgsql-bugs@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-bugs

В списке pgsql-bugs по дате отправления:

Предыдущее
От: Tom Lane
Дата:
Сообщение: Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5
Следующее
От: Tom Lane
Дата:
Сообщение: Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5