Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24
| От | Tom Lane |
|---|---|
| Тема | Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24 |
| Дата | |
| Msg-id | 7833.1507658856@sss.pgh.pa.us (обсуждение, исходный текст) |
| Ответ на | Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24 (Tom Lane <tgl@sss.pgh.pa.us>) |
| Ответы | Re: [BUGS] BUG #14830: Missed NOTIFications, PostgreSQL 9.1.24 |
| Список | pgsql-bugs |
I wrote:
> Wait, I have an idea. Let's fetch a snapshot at the top of
> ProcessIncomingNotify, and then use the snapshot to decide whether
> xids are running, instead of calling TransactionIdIsInProgress.
Concretely, I suggest the attached patch. I tried Marko's testbed
against this, and it seems indeed a bit faster than before ---
running it for 100000 notifies takes about 19.25 seconds, rather than
19.5 seconds with HEAD. But the testbed is probably about the best
case because it has a bunch of threads sending notifies to one
receiver, so that the receiver is quite likely to have multiple
messages to read at a time. OTOH, in a situation where the NOTIFY
traffic isn't so high, it probably hardly matters anyway.
regards, tom lane
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08e..bec5185 100644
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 137,143 ****
--- 137,145 ----
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+ #include "utils/snapmgr.h"
#include "utils/timestamp.h"
+ #include "utils/tqual.h"
/*
*************** static bool SignalBackends(void);
*** 387,393 ****
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
! char *page_buffer);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
--- 389,396 ----
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
! char *page_buffer,
! Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(void);
static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
*************** asyncQueueReadAllNotifications(void)
*** 1744,1749 ****
--- 1747,1753 ----
volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
+ Snapshot snapshot;
bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
*************** asyncQueueReadAllNotifications(void)
*** 1767,1772 ****
--- 1771,1779 ----
return;
}
+ /* Get snapshot we'll use to decide which xacts are still in progress */
+ snapshot = RegisterSnapshot(GetLatestSnapshot());
+
/*----------
* Note that we deliver everything that we see in the queue and that
* matches our _current_ listening state.
*************** asyncQueueReadAllNotifications(void)
*** 1854,1860 ****
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
! page_buffer.buf);
} while (!reachedStop);
}
PG_CATCH();
--- 1861,1868 ----
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
! page_buffer.buf,
! snapshot);
} while (!reachedStop);
}
PG_CATCH();
*************** asyncQueueReadAllNotifications(void)
*** 1882,1887 ****
--- 1890,1898 ----
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
+
+ /* Done with snapshot */
+ UnregisterSnapshot(snapshot);
}
/*
*************** asyncQueueReadAllNotifications(void)
*** 1903,1909 ****
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
! char *page_buffer)
{
bool reachedStop = false;
bool reachedEndOfPage;
--- 1914,1921 ----
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
! char *page_buffer,
! Snapshot snapshot)
{
bool reachedStop = false;
bool reachedEndOfPage;
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1928,1934 ****
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
! if (TransactionIdIsInProgress(qe->xid))
{
/*
* The source transaction is still in progress, so we can't
--- 1940,1947 ----
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
! if (TransactionIdIsCurrentTransactionId(qe->xid) ||
! XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
* The source transaction is still in progress, so we can't
*************** asyncQueueProcessPageEntries(volatile Qu
*** 1939,1948 ****
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
! * Note that we must test TransactionIdIsInProgress before we
! * test TransactionIdDidCommit, else we might return a message
! * from a transaction that is not yet visible to snapshots;
! * compare the comments at the head of tqual.c.
*/
*current = thisentry;
reachedStop = true;
--- 1952,1965 ----
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
! * Note that we must test XidInMVCCSnapshot before we test
! * TransactionIdDidCommit, else we might return a message from
! * a transaction that is not yet visible to snapshots; compare
! * the comments at the head of tqual.c. Furthermore, because
! * our own xact won't be listed in the snapshot, we must test
! * TransactionIdIsCurrentTransactionId as well. (It is
! * possible to get here inside a transaction that has sent
! * notifications, see Exec_ListenPreCommit.)
*/
*current = thisentry;
reachedStop = true;
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index bbac408..28060c5 100644
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 81,88 ****
SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
- /* local functions */
- static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
/*
* SetHintBits()
--- 81,86 ----
*************** HeapTupleIsSurelyDead(HeapTuple htup, Tr
*** 1482,1488 ****
* TransactionIdIsCurrentTransactionId first, except for known-committed
* XIDs which could not be ours anyway.
*/
! static bool
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
--- 1480,1486 ----
* TransactionIdIsCurrentTransactionId first, except for known-committed
* XIDs which could not be ours anyway.
*/
! bool
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
{
uint32 i;
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index 9a3b56e..96eaf01 100644
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
*************** extern HTSV_Result HeapTupleSatisfiesVac
*** 78,83 ****
--- 78,84 ----
TransactionId OldestXmin, Buffer buffer);
extern bool HeapTupleIsSurelyDead(HeapTuple htup,
TransactionId OldestXmin);
+ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
uint16 infomask, TransactionId xid);
--
Sent via pgsql-bugs mailing list (pgsql-bugs@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-bugs
В списке pgsql-bugs по дате отправления: