Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24
Дата
Msg-id 7833.1507658856@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24  (Tom Lane <tgl@sss.pgh.pa.us>)
Ответы Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24
Список pgsql-bugs
I wrote:
> Wait, I have an idea.  Let's fetch a snapshot at the top of
> ProcessIncomingNotify, and then use the snapshot to decide whether
> xids are running, instead of calling TransactionIdIsInProgress.

Concretely, I suggest the attached patch.  I tried Marko's testbed
against this, and it does indeed seem a bit faster than before ---
running it for 100000 notifies takes about 19.25 seconds, rather than
19.5 seconds with HEAD.  But the testbed is probably close to the best
case for this change, because it has a bunch of threads sending
notifies to one receiver, so that the receiver is quite likely to have
multiple messages to read at a time, all of which can be checked
against the same snapshot.  On the other hand, in a situation where the
NOTIFY traffic isn't so high, it probably hardly matters anyway.

            regards, tom lane

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08e..bec5185 100644
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 137,143 ****
--- 137,145 ----
  #include "utils/builtins.h"
  #include "utils/memutils.h"
  #include "utils/ps_status.h"
+ #include "utils/snapmgr.h"
  #include "utils/timestamp.h"
+ #include "utils/tqual.h"


  /*
*************** static bool SignalBackends(void);
*** 387,393 ****
  static void asyncQueueReadAllNotifications(void);
  static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer);
  static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
  static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
--- 389,396 ----
  static void asyncQueueReadAllNotifications(void);
  static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer,
!                              Snapshot snapshot);
  static void asyncQueueAdvanceTail(void);
  static void ProcessIncomingNotify(void);
  static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
*************** asyncQueueReadAllNotifications(void)
*** 1744,1749 ****
--- 1747,1753 ----
      volatile QueuePosition pos;
      QueuePosition oldpos;
      QueuePosition head;
+     Snapshot    snapshot;
      bool        advanceTail;

      /* page_buffer must be adequately aligned, so use a union */
*************** asyncQueueReadAllNotifications(void)
*** 1767,1772 ****
--- 1771,1779 ----
          return;
      }

+     /* Get snapshot we'll use to decide which xacts are still in progress */
+     snapshot = RegisterSnapshot(GetLatestSnapshot());
+
      /*----------
       * Note that we deliver everything that we see in the queue and that
       * matches our _current_ listening state.
*************** asyncQueueReadAllNotifications(void)
*** 1854,1860 ****
               * while sending the notifications to the frontend.
               */
              reachedStop = asyncQueueProcessPageEntries(&pos, head,
!                                                        page_buffer.buf);
          } while (!reachedStop);
      }
      PG_CATCH();
--- 1861,1868 ----
               * while sending the notifications to the frontend.
               */
              reachedStop = asyncQueueProcessPageEntries(&pos, head,
!                                                        page_buffer.buf,
!                                                        snapshot);
          } while (!reachedStop);
      }
      PG_CATCH();
*************** asyncQueueReadAllNotifications(void)
*** 1882,1887 ****
--- 1890,1898 ----
      /* If we were the laziest backend, try to advance the tail pointer */
      if (advanceTail)
          asyncQueueAdvanceTail();
+
+     /* Done with snapshot */
+     UnregisterSnapshot(snapshot);
  }

  /*
*************** asyncQueueReadAllNotifications(void)
*** 1903,1909 ****
  static bool
  asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer)
  {
      bool        reachedStop = false;
      bool        reachedEndOfPage;
--- 1914,1921 ----
  static bool
  asyncQueueProcessPageEntries(volatile QueuePosition *current,
                               QueuePosition stop,
!                              char *page_buffer,
!                              Snapshot snapshot)
  {
      bool        reachedStop = false;
      bool        reachedEndOfPage;
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1928,1934 ****
          /* Ignore messages destined for other databases */
          if (qe->dboid == MyDatabaseId)
          {
!             if (TransactionIdIsInProgress(qe->xid))
              {
                  /*
                   * The source transaction is still in progress, so we can't
--- 1940,1947 ----
          /* Ignore messages destined for other databases */
          if (qe->dboid == MyDatabaseId)
          {
!             if (TransactionIdIsCurrentTransactionId(qe->xid) ||
!                 XidInMVCCSnapshot(qe->xid, snapshot))
              {
                  /*
                   * The source transaction is still in progress, so we can't
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1939,1948 ****
                   * this advance-then-back-up behavior when dealing with an
                   * uncommitted message.)
                   *
!                  * Note that we must test TransactionIdIsInProgress before we
!                  * test TransactionIdDidCommit, else we might return a message
!                  * from a transaction that is not yet visible to snapshots;
!                  * compare the comments at the head of tqual.c.
                   */
                  *current = thisentry;
                  reachedStop = true;
--- 1952,1965 ----
                   * this advance-then-back-up behavior when dealing with an
                   * uncommitted message.)
                   *
!                  * Note that we must test XidInMVCCSnapshot before we test
!                  * TransactionIdDidCommit, else we might return a message from
!                  * a transaction that is not yet visible to snapshots; compare
!                  * the comments at the head of tqual.c.  Furthermore, because
!                  * our own xact won't be listed in the snapshot, we must test
!                  * TransactionIdIsCurrentTransactionId as well.  (It is
!                  * possible to get here inside a transaction that has sent
!                  * notifications, see Exec_ListenPreCommit.)
                   */
                  *current = thisentry;
                  reachedStop = true;
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index bbac408..28060c5 100644
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 81,88 ****
  SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
  SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};

- /* local functions */
- static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);

  /*
   * SetHintBits()
--- 81,86 ----
*************** HeapTupleIsSurelyDead(HeapTuple htup, Tr
*** 1482,1488 ****
   * TransactionIdIsCurrentTransactionId first, except for known-committed
   * XIDs which could not be ours anyway.
   */
! static bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
      uint32        i;
--- 1480,1486 ----
   * TransactionIdIsCurrentTransactionId first, except for known-committed
   * XIDs which could not be ours anyway.
   */
! bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
      uint32        i;
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index 9a3b56e..96eaf01 100644
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
*************** extern HTSV_Result HeapTupleSatisfiesVac
*** 78,83 ****
--- 78,84 ----
                           TransactionId OldestXmin, Buffer buffer);
  extern bool HeapTupleIsSurelyDead(HeapTuple htup,
                        TransactionId OldestXmin);
+ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);

  extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
                       uint16 infomask, TransactionId xid);

-- 
Sent via pgsql-bugs mailing list (pgsql-bugs@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-bugs

В списке pgsql-bugs по дате отправления:

Предыдущее
От: Tom Lane
Дата:
Сообщение: Re: [BUGS] ./configure error: Cannot find a working 64-bit integer type
Следующее
От: Petr Jelinek
Дата:
Сообщение: Re: [BUGS] 10.0: Logical replication doesn't execute BEFORE UPDATE OF trigger