Re: shared-memory based stats collector

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: shared-memory based stats collector
Дата
Msg-id 20180927.220049.168546206.horiguchi.kyotaro@lab.ntt.co.jp
обсуждение исходный текст
Ответ на Re: shared-memory based stats collector  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Ответы Re: shared-memory based stats collector  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
Список pgsql-hackers
Hello. This is a super-PoC of no-UDP stats collector.

At Wed, 26 Sep 2018 09:55:09 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20180926.095509.182252925.horiguchi.kyotaro@lab.ntt.co.jp>
> > I don't think either of these is right. I think it's crucial to get rid
> > of the UDP socket, but I think using a shmem queue is the wrong
> > approach. Not just because postgres' shm_mq is single-reader/writer, but
> > also because it's plainly unnecessary.  Backends should attempt to
> > update the shared hashtable, but acquire the necessary lock
> > conditionally, and leave the pending updates of the shared hashtable to
> > a later time if they cannot acquire the lock.
> 
> Ok, I just intended to avoid reading many bytes from a file and
> thought that writer-side can be resolved later.
> 
> Currently locks on the shared stats table is acquired by dshash
> mechanism in a partition-wise manner. The number of the
> partitions is currently fixed to 2^7 = 128, but writes for the
> same table conflict with each other regardless of the number of
> partitions. As the first step, I'm going to add
> conditional-locking capability to dshash_find_or_insert and each
> backend holds a queue of its pending updates.

I don't have more time 'til next monday so this is just a PoC
(sorry..).

- 0001 to 0006 is rebased version of v4.
- 0007 adds conditional locking to dshash

- 0008 is the no-UDP stats collector.

If the required lock is not acquired for some stats items, report
functions immediately return after storing the values locally. The
stored values are merged with later calls. Explicitly calling
pgstat_cleanup_pending_stat() at a convenient timing tries to
apply the pending values, but the function is not called anywhere
for now.

The stats collector process is used only to save and load saved stats
files and create shared memory for stats. I'm going to remove the
stats collector.

I'll continue working this way.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 86c11126fabafd1ca5637ed415b537ad7b1dec08 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 27 Sep 2018 21:36:06 +0900
Subject: [PATCH 8/8] Ultra PoC of full-shared-memory stats collector.

This patch is a super-ultra PoC of the full-shared-memory stats collector,
which means UDP is no longer involved in stats collector mechanism.
Some statistics items can be postponed when the required lock is not
available; applying them can be retried by calling
pgstat_cleanup_pending_stat() at a convenient time (not called in this
patch).
---
 src/backend/postmaster/pgstat.c     | 2416 +++++++++++++----------------------
 src/backend/storage/lmgr/deadlock.c |    2 +-
 src/include/pgstat.h                |  357 +-----
 3 files changed, 917 insertions(+), 1858 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a3d5f4856f..b73da9a7f2 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -93,6 +93,23 @@
 #define PGSTAT_FUNCTION_HASH_SIZE    512
 
 
+#define PGSTAT_MAX_QUEUE_LEN    100
+
+/*
+ * Operation mode of pgstat_get_db_entry.
+ */
+#define PGSTAT_TABLE_READ    0
+#define PGSTAT_TABLE_WRITE    1
+#define PGSTAT_TABLE_CREATE 2
+#define    PGSTAT_TABLE_NOWAIT 4
+
+typedef enum
+{
+    PGSTAT_TABLE_NOT_FOUND,
+    PGSTAT_TABLE_FOUND,
+    PGSTAT_TABLE_LOCK_FAILED
+} pg_stat_table_result_status;
+
 /* ----------
  * Total number of backends including auxiliary
  *
@@ -119,16 +136,12 @@ int            pgstat_track_activity_query_size = 1024;
  * Stored directly in a stats message structure so it can be sent
  * without needing to copy things around.  We assume this inits to zeroes.
  */
-PgStat_MsgBgWriter BgWriterStats;
+PgStat_BgWriter BgWriterStats;
 
 /* ----------
  * Local data
  * ----------
  */
-NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET;
-
-static struct sockaddr_storage pgStatAddr;
-
 static time_t last_pgstat_start_time;
 
 static bool pgStatRunningInCollector = false;
@@ -212,12 +225,6 @@ static HTAB *pgStatTabHash = NULL;
  */
 static HTAB *pgStatFunctions = NULL;
 
-/*
- * Indicates if backend has some function stats that it hasn't yet
- * sent to the collector.
- */
-static bool have_function_stats = false;
-
 /*
  * Tuple insertion/deletion counts for an open transaction can't be propagated
  * into PgStat_TableStatus counters until we know if it is going to commit
@@ -311,7 +318,8 @@ static void pgstat_quickdie_handler(SIGNAL_ARGS);
 static void pgstat_beshutdown_hook(int code, Datum arg);
 static void pgstat_sighup_handler(SIGNAL_ARGS);
 
-static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
+static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, int op,
+                                    pg_stat_table_result_status *status);
 static PgStat_StatTabEntry *pgstat_get_tab_entry(dshash_table *table, Oid tableoid, bool create);
 static void pgstat_write_statsfiles(void);
 static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry);
@@ -323,10 +331,9 @@ static bool backend_snapshot_global_stats(void);
 static PgStat_StatFuncEntry *backend_get_func_etnry(PgStat_StatDBEntry *dbent, Oid funcid, bool oneshot);
 static void pgstat_read_current_status(void);
 
-static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
 static HTAB *pgstat_collect_oids(Oid catalogid);
-
+static void reset_dbentry_counters(PgStat_StatDBEntry *dbentry);
 static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);
 
 static void pgstat_setup_memcxt(void);
@@ -337,25 +344,13 @@ static const char *pgstat_get_wait_ipc(WaitEventIPC w);
 static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
 static const char *pgstat_get_wait_io(WaitEventIO w);
 
-static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
-static void pgstat_send(void *msg, int len);
-
-static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
-static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
-static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
-static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
-static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
-static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
-static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
-static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
-static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
-static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
-static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
-static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
-static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
-static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
-static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
-static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static dshash_table *pgstat_update_dbentry(Oid dboid);
+static bool pgstat_update_tabentry(dshash_table *tabhash,
+                                   PgStat_TableStatus *stat);
+static bool pgstat_update_funcentry(dshash_table *funchash,
+                                    PgStat_BackendFunctionEntry *stat);
+static bool pgstat_tabpurge(Oid dboid, Oid taboid);
+static bool pgstat_funcpurge(Oid dboid, Oid funcoid);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -374,280 +369,7 @@ static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 void
 pgstat_init(void)
 {
-    ACCEPT_TYPE_ARG3 alen;
-    struct addrinfo *addrs = NULL,
-               *addr,
-                hints;
-    int            ret;
-    fd_set        rset;
-    struct timeval tv;
-    char        test_byte;
-    int            sel_res;
-    int            tries = 0;
-
-#define TESTBYTEVAL ((char) 199)
-
-    /*
-     * This static assertion verifies that we didn't mess up the calculations
-     * involved in selecting maximum payload sizes for our UDP messages.
-     * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
-     * be silent performance loss from fragmentation, it seems worth having a
-     * compile-time cross-check that we didn't.
-     */
-    StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
-                     "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");
-
-    /*
-     * Create the UDP socket for sending and receiving statistic messages
-     */
-    hints.ai_flags = AI_PASSIVE;
-    hints.ai_family = AF_UNSPEC;
-    hints.ai_socktype = SOCK_DGRAM;
-    hints.ai_protocol = 0;
-    hints.ai_addrlen = 0;
-    hints.ai_addr = NULL;
-    hints.ai_canonname = NULL;
-    hints.ai_next = NULL;
-    ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
-    if (ret || !addrs)
-    {
-        ereport(LOG,
-                (errmsg("could not resolve \"localhost\": %s",
-                        gai_strerror(ret))));
-        goto startup_failed;
-    }
-
-    /*
-     * On some platforms, pg_getaddrinfo_all() may return multiple addresses
-     * only one of which will actually work (eg, both IPv6 and IPv4 addresses
-     * when kernel will reject IPv6).  Worse, the failure may occur at the
-     * bind() or perhaps even connect() stage.  So we must loop through the
-     * results till we find a working combination. We will generate LOG
-     * messages, but no error, for bogus combinations.
-     */
-    for (addr = addrs; addr; addr = addr->ai_next)
-    {
-#ifdef HAVE_UNIX_SOCKETS
-        /* Ignore AF_UNIX sockets, if any are returned. */
-        if (addr->ai_family == AF_UNIX)
-            continue;
-#endif
-
-        if (++tries > 1)
-            ereport(LOG,
-                    (errmsg("trying another address for the statistics collector")));
-
-        /*
-         * Create the socket.
-         */
-        if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
-        {
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("could not create socket for statistics collector: %m")));
-            continue;
-        }
-
-        /*
-         * Bind it to a kernel assigned port on localhost and get the assigned
-         * port via getsockname().
-         */
-        if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
-        {
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("could not bind socket for statistics collector: %m")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        alen = sizeof(pgStatAddr);
-        if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
-        {
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("could not get address of socket for statistics collector: %m")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        /*
-         * Connect the socket to its own address.  This saves a few cycles by
-         * not having to respecify the target address on every send. This also
-         * provides a kernel-level check that only packets from this same
-         * address will be received.
-         */
-        if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
-        {
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("could not connect socket for statistics collector: %m")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        /*
-         * Try to send and receive a one-byte test message on the socket. This
-         * is to catch situations where the socket can be created but will not
-         * actually pass data (for instance, because kernel packet filtering
-         * rules prevent it).
-         */
-        test_byte = TESTBYTEVAL;
-
-retry1:
-        if (send(pgStatSock, &test_byte, 1, 0) != 1)
-        {
-            if (errno == EINTR)
-                goto retry1;    /* if interrupted, just retry */
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("could not send test message on socket for statistics collector: %m")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        /*
-         * There could possibly be a little delay before the message can be
-         * received.  We arbitrarily allow up to half a second before deciding
-         * it's broken.
-         */
-        for (;;)                /* need a loop to handle EINTR */
-        {
-            FD_ZERO(&rset);
-            FD_SET(pgStatSock, &rset);
-
-            tv.tv_sec = 0;
-            tv.tv_usec = 500000;
-            sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
-            if (sel_res >= 0 || errno != EINTR)
-                break;
-        }
-        if (sel_res < 0)
-        {
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("select() failed in statistics collector: %m")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-        if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
-        {
-            /*
-             * This is the case we actually think is likely, so take pains to
-             * give a specific message for it.
-             *
-             * errno will not be set meaningfully here, so don't use it.
-             */
-            ereport(LOG,
-                    (errcode(ERRCODE_CONNECTION_FAILURE),
-                     errmsg("test message did not get through on socket for statistics collector")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        test_byte++;            /* just make sure variable is changed */
-
-retry2:
-        if (recv(pgStatSock, &test_byte, 1, 0) != 1)
-        {
-            if (errno == EINTR)
-                goto retry2;    /* if interrupted, just retry */
-            ereport(LOG,
-                    (errcode_for_socket_access(),
-                     errmsg("could not receive test message on socket for statistics collector: %m")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        if (test_byte != TESTBYTEVAL)    /* strictly paranoia ... */
-        {
-            ereport(LOG,
-                    (errcode(ERRCODE_INTERNAL_ERROR),
-                     errmsg("incorrect test message transmission on socket for statistics collector")));
-            closesocket(pgStatSock);
-            pgStatSock = PGINVALID_SOCKET;
-            continue;
-        }
-
-        /* If we get here, we have a working socket */
-        break;
-    }
-
-    /* Did we find a working address? */
-    if (!addr || pgStatSock == PGINVALID_SOCKET)
-        goto startup_failed;
-
-    /*
-     * Set the socket to non-blocking IO.  This ensures that if the collector
-     * falls behind, statistics messages will be discarded; backends won't
-     * block waiting to send messages to the collector.
-     */
-    if (!pg_set_noblock(pgStatSock))
-    {
-        ereport(LOG,
-                (errcode_for_socket_access(),
-                 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
-        goto startup_failed;
-    }
-
-    /*
-     * Try to ensure that the socket's receive buffer is at least
-     * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose
-     * data.  Use of UDP protocol means that we are willing to lose data under
-     * heavy load, but we don't want it to happen just because of ridiculously
-     * small default buffer sizes (such as 8KB on older Windows versions).
-     */
-    {
-        int            old_rcvbuf;
-        int            new_rcvbuf;
-        ACCEPT_TYPE_ARG3 rcvbufsize = sizeof(old_rcvbuf);
-
-        if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
-                       (char *) &old_rcvbuf, &rcvbufsize) < 0)
-        {
-            elog(LOG, "getsockopt(SO_RCVBUF) failed: %m");
-            /* if we can't get existing size, always try to set it */
-            old_rcvbuf = 0;
-        }
-
-        new_rcvbuf = PGSTAT_MIN_RCVBUF;
-        if (old_rcvbuf < new_rcvbuf)
-        {
-            if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
-                           (char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0)
-                elog(LOG, "setsockopt(SO_RCVBUF) failed: %m");
-        }
-    }
-
-    pg_freeaddrinfo_all(hints.ai_family, addrs);
-
     return;
-
-startup_failed:
-    ereport(LOG,
-            (errmsg("disabling statistics collector for lack of working socket")));
-
-    if (addrs)
-        pg_freeaddrinfo_all(hints.ai_family, addrs);
-
-    if (pgStatSock != PGINVALID_SOCKET)
-        closesocket(pgStatSock);
-    pgStatSock = PGINVALID_SOCKET;
-
-    /*
-     * Adjust GUC variables to suppress useless activity, and for debugging
-     * purposes (seeing track_counts off is a clue that we failed here). We
-     * use PGC_S_OVERRIDE because there is no point in trying to turn it back
-     * on from postgresql.conf without a restart.
-     */
-    SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
 }
 
 /*
@@ -713,226 +435,6 @@ allow_immediate_pgstat_restart(void)
     last_pgstat_start_time = 0;
 }
 
-/* ------------------------------------------------------------
- * Public functions used by backends follow
- *------------------------------------------------------------
- */
-
-
-/* ----------
- * pgstat_report_stat() -
- *
- *    Must be called by processes that performs DML: tcop/postgres.c, logical
- *    receiver processes, SPI worker, etc. to send the so far collected
- *    per-table and function usage statistics to the collector.  Note that this
- *    is called only when not within a transaction, so it is fair to use
- *    transaction stop time as an approximation of current time.
- * ----------
- */
-void
-pgstat_report_stat(bool force)
-{
-    /* we assume this inits to all zeroes: */
-    static const PgStat_TableCounts all_zeroes;
-    static TimestampTz last_report = 0;
-
-    TimestampTz now;
-    PgStat_MsgTabstat regular_msg;
-    PgStat_MsgTabstat shared_msg;
-    TabStatusArray *tsa;
-    int            i;
-
-    /* Don't expend a clock check if nothing to do */
-    if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
-        pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
-        !have_function_stats)
-        return;
-
-    /*
-     * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
-     * msec since we last sent one, or the caller wants to force stats out.
-     */
-    now = GetCurrentTransactionStopTimestamp();
-    if (!force &&
-        !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
-        return;
-    last_report = now;
-
-    /*
-     * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
-     * entries it points to.  (Should we fail partway through the loop below,
-     * it's okay to have removed the hashtable already --- the only
-     * consequence is we'd get multiple entries for the same table in the
-     * pgStatTabList, and that's safe.)
-     */
-    if (pgStatTabHash)
-        hash_destroy(pgStatTabHash);
-    pgStatTabHash = NULL;
-
-    /*
-     * Scan through the TabStatusArray struct(s) to find tables that actually
-     * have counts, and build messages to send.  We have to separate shared
-     * relations from regular ones because the databaseid field in the message
-     * header has to depend on that.
-     */
-    regular_msg.m_databaseid = MyDatabaseId;
-    shared_msg.m_databaseid = InvalidOid;
-    regular_msg.m_nentries = 0;
-    shared_msg.m_nentries = 0;
-
-    for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
-    {
-        for (i = 0; i < tsa->tsa_used; i++)
-        {
-            PgStat_TableStatus *entry = &tsa->tsa_entries[i];
-            PgStat_MsgTabstat *this_msg;
-            PgStat_TableEntry *this_ent;
-
-            /* Shouldn't have any pending transaction-dependent counts */
-            Assert(entry->trans == NULL);
-
-            /*
-             * Ignore entries that didn't accumulate any actual counts, such
-             * as indexes that were opened by the planner but not used.
-             */
-            if (memcmp(&entry->t_counts, &all_zeroes,
-                       sizeof(PgStat_TableCounts)) == 0)
-                continue;
-
-            /*
-             * OK, insert data into the appropriate message, and send if full.
-             */
-            this_msg = entry->t_shared ? &shared_msg : &regular_msg;
-            this_ent = &this_msg->m_entry[this_msg->m_nentries];
-            this_ent->t_id = entry->t_id;
-            memcpy(&this_ent->t_counts, &entry->t_counts,
-                   sizeof(PgStat_TableCounts));
-            if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
-            {
-                pgstat_send_tabstat(this_msg);
-                this_msg->m_nentries = 0;
-            }
-        }
-        /* zero out TableStatus structs after use */
-        MemSet(tsa->tsa_entries, 0,
-               tsa->tsa_used * sizeof(PgStat_TableStatus));
-        tsa->tsa_used = 0;
-    }
-
-    /*
-     * Send partial messages.  Make sure that any pending xact commit/abort
-     * gets counted, even if there are no table stats to send.
-     */
-    if (regular_msg.m_nentries > 0 ||
-        pgStatXactCommit > 0 || pgStatXactRollback > 0)
-        pgstat_send_tabstat(&regular_msg);
-    if (shared_msg.m_nentries > 0)
-        pgstat_send_tabstat(&shared_msg);
-
-    /* Now, send function statistics */
-    pgstat_send_funcstats();
-}
-
-/*
- * Subroutine for pgstat_report_stat: finish and send a tabstat message
- */
-static void
-pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
-{
-    int            n;
-    int            len;
-
-    /* It's unlikely we'd get here with no socket, but maybe not impossible */
-    if (pgStatSock == PGINVALID_SOCKET)
-        return;
-
-    /*
-     * Report and reset accumulated xact commit/rollback and I/O timings
-     * whenever we send a normal tabstat message
-     */
-    if (OidIsValid(tsmsg->m_databaseid))
-    {
-        tsmsg->m_xact_commit = pgStatXactCommit;
-        tsmsg->m_xact_rollback = pgStatXactRollback;
-        tsmsg->m_block_read_time = pgStatBlockReadTime;
-        tsmsg->m_block_write_time = pgStatBlockWriteTime;
-        pgStatXactCommit = 0;
-        pgStatXactRollback = 0;
-        pgStatBlockReadTime = 0;
-        pgStatBlockWriteTime = 0;
-    }
-    else
-    {
-        tsmsg->m_xact_commit = 0;
-        tsmsg->m_xact_rollback = 0;
-        tsmsg->m_block_read_time = 0;
-        tsmsg->m_block_write_time = 0;
-    }
-
-    n = tsmsg->m_nentries;
-    len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
-        n * sizeof(PgStat_TableEntry);
-
-    pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
-    pgstat_send(tsmsg, len);
-}
-
-/*
- * Subroutine for pgstat_report_stat: populate and send a function stat message
- */
-static void
-pgstat_send_funcstats(void)
-{
-    /* we assume this inits to all zeroes: */
-    static const PgStat_FunctionCounts all_zeroes;
-
-    PgStat_MsgFuncstat msg;
-    PgStat_BackendFunctionEntry *entry;
-    HASH_SEQ_STATUS fstat;
-
-    if (pgStatFunctions == NULL)
-        return;
-
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT);
-    msg.m_databaseid = MyDatabaseId;
-    msg.m_nentries = 0;
-
-    hash_seq_init(&fstat, pgStatFunctions);
-    while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
-    {
-        PgStat_FunctionEntry *m_ent;
-
-        /* Skip it if no counts accumulated since last time */
-        if (memcmp(&entry->f_counts, &all_zeroes,
-                   sizeof(PgStat_FunctionCounts)) == 0)
-            continue;
-
-        /* need to convert format of time accumulators */
-        m_ent = &msg.m_entry[msg.m_nentries];
-        m_ent->f_id = entry->f_id;
-        m_ent->f_numcalls = entry->f_counts.f_numcalls;
-        m_ent->f_total_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_total_time);
-        m_ent->f_self_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_self_time);
-
-        if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
-        {
-            pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
-                        msg.m_nentries * sizeof(PgStat_FunctionEntry));
-            msg.m_nentries = 0;
-        }
-
-        /* reset the entry's counts */
-        MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
-    }
-
-    if (msg.m_nentries > 0)
-        pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
-                    msg.m_nentries * sizeof(PgStat_FunctionEntry));
-
-    have_function_stats = false;
-}
-
-
 /* ----------
  * pgstat_attach_shared_stats() -
  *
@@ -1008,6 +510,223 @@ pgstat_create_shared_stats(void)
     LWLockRelease(StatsLock);
 }
 
+
+/* ------------------------------------------------------------
+ * Public functions used by backends follow
+ *------------------------------------------------------------
+ */
+static bool pgstat_pending_tabstats = false;
+static bool pgstat_pending_funcstats = false;
+static bool pgstat_pending_vacstats = false;
+static bool pgstat_pending_dropdb = false;
+static bool pgstat_pending_resetcounter = false;
+static bool pgstat_pending_resetsharedcounter = false;
+static bool pgstat_pending_resetsinglecounter = false;
+static bool pgstat_pending_autovac = false;
+static bool pgstat_pending_vacuum = false;
+static bool pgstat_pending_analyze = false;
+static bool pgstat_pending_recoveryconflict = false;
+static bool pgstat_pending_deadlock = false;
+static bool pgstat_pending_tempfile = false;
+
+void
+pgstat_cleanup_pending_stat(void)
+{
+    if (pgstat_pending_tabstats)
+        pgstat_report_stat(true);
+    if (pgstat_pending_funcstats)
+        pgstat_send_funcstats();
+    if (pgstat_pending_vacstats)
+        pgstat_vacuum_stat();
+    if (pgstat_pending_dropdb)
+        pgstat_drop_database(InvalidOid);
+    if (pgstat_pending_resetcounter)
+        pgstat_reset_counters();
+    if (pgstat_pending_resetsharedcounter)
+        pgstat_reset_shared_counters(NULL);
+    if (pgstat_pending_resetsinglecounter)
+        pgstat_reset_single_counter(InvalidOid, 0);
+    if (pgstat_pending_autovac)
+        pgstat_report_autovac(InvalidOid);
+    if (pgstat_pending_vacuum)
+        pgstat_report_vacuum(InvalidOid, false, 0, 0);
+    if (pgstat_pending_analyze)
+        pgstat_report_analyze(NULL, 0, 0, false);
+    if (pgstat_pending_recoveryconflict)
+        pgstat_report_recovery_conflict(-1);
+    if (pgstat_pending_deadlock)
+        pgstat_report_deadlock(true);
+    if (pgstat_pending_tempfile)
+        pgstat_report_tempfile(0);
+}
+
+/* ----------
+ * pgstat_report_stat() -
+ *
+ *    Must be called by processes that performs DML: tcop/postgres.c, logical
+ *    receiver processes, SPI worker, etc. to send the so far collected
+ *    per-table and function usage statistics to the collector.  Note that this
+ *    is called only when not within a transaction, so it is fair to use
+ *    transaction stop time as an approximation of current time.
+ * ----------
+ */
+void
+pgstat_report_stat(bool force)
+{
+    /* we assume this inits to all zeroes: */
+    static const PgStat_TableCounts all_zeroes;
+    static TimestampTz last_report = 0;
+    TimestampTz now;
+    TabStatusArray *tsa;
+    int            i;
+    dshash_table *shared_tabhash = NULL;
+    dshash_table *regular_tabhash = NULL;
+
+    /* Don't expend a clock check if nothing to do */
+    if (!pgstat_pending_tabstats &&
+        (pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
+        pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
+        !pgstat_pending_funcstats)
+        return;
+
+    /*
+     * Don't update shared stats unless it's been at least
+     * PGSTAT_STAT_INTERVAL msec since we last sent one, or the caller wants
+     * to force stats out.
+     */
+    now = GetCurrentTransactionStopTimestamp();
+    if (!force &&
+        !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+        return;
+    last_report = now;
+
+    /*
+     * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
+     * entries it points to.  (Should we fail partway through the loop below,
+     * it's okay to have removed the hashtable already --- the only
+     * consequence is we'd get multiple entries for the same table in the
+     * pgStatTabList, and that's safe.)
+     */
+    if (pgStatTabHash)
+        hash_destroy(pgStatTabHash);
+    pgStatTabHash = NULL;
+
+    pgstat_pending_tabstats = false;
+    for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
+    {
+        int move_skipped_to = 0;
+
+        for (i = 0; i < tsa->tsa_used; i++)
+        {
+            PgStat_TableStatus *entry = &tsa->tsa_entries[i];
+            dshash_table *tabhash;
+
+            /* Shouldn't have any pending transaction-dependent counts */
+            Assert(entry->trans == NULL);
+
+            /*
+             * Ignore entries that didn't accumulate any actual counts, such
+             * as indexes that were opened by the planner but not used.
+             */
+            if (memcmp(&entry->t_counts, &all_zeroes,
+                       sizeof(PgStat_TableCounts)) != 0)
+            {
+                /*
+                 * OK, insert data into the appropriate message, and send if
+                 * full.
+                 */
+                if (entry->t_shared)
+                {
+                    if (!shared_tabhash)
+                        shared_tabhash = pgstat_update_dbentry(InvalidOid);
+                    tabhash = shared_tabhash;
+                }
+                else
+                {
+                    if (!regular_tabhash)
+                        regular_tabhash = pgstat_update_dbentry(MyDatabaseId);
+                    tabhash = regular_tabhash;
+                }
+
+                /*
+                 * If this entry failed to be processed, leave this entry for
+             * the next turn. The entries should be kept in a head-filled manner.
+                 */
+                if (!pgstat_update_tabentry(tabhash, entry))
+                {
+                    if (move_skipped_to < i)
+                        memmove(&tsa->tsa_entries[move_skipped_to],
+                                &tsa->tsa_entries[i],
+                                sizeof(PgStat_TableStatus));
+                    move_skipped_to++;
+                }
+            }
+        }
+
+        /* note that some unapplied items remain */
+        if (move_skipped_to > 0)
+            pgstat_pending_tabstats = true;
+
+        tsa->tsa_used = move_skipped_to;
+        /* zero out TableStatus structs after use */
+        MemSet(&tsa->tsa_entries[tsa->tsa_used], 0,
+               (TABSTAT_QUANTUM - tsa->tsa_used) * sizeof(PgStat_TableStatus));
+    }
+
+    /* Now, send function statistics */
+    pgstat_send_funcstats();
+}
+
+/*
+ * Subroutine for pgstat_report_stat: populate and send a function stat message
+ */
+static void
+pgstat_send_funcstats(void)
+{
+    /* we assume this inits to all zeroes: */
+    static const PgStat_FunctionCounts all_zeroes;
+
+    PgStat_BackendFunctionEntry *entry;
+    HASH_SEQ_STATUS fstat;
+    PgStat_StatDBEntry *dbentry;
+    pg_stat_table_result_status status;
+    dshash_table *funchash;
+
+    if (pgStatFunctions == NULL)
+        return;
+
+    pgstat_pending_funcstats = false;
+
+    dbentry = pgstat_get_db_entry(MyDatabaseId,
+                                  PGSTAT_TABLE_WRITE
+                                  | PGSTAT_TABLE_CREATE
+                                  | PGSTAT_TABLE_NOWAIT,
+                                  &status);
+
+    if (status == PGSTAT_TABLE_LOCK_FAILED)
+        return;
+
+    funchash = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+
+    hash_seq_init(&fstat, pgStatFunctions);
+    while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
+    {
+        /* Skip it if no counts accumulated since last time */
+        if (memcmp(&entry->f_counts, &all_zeroes,
+                   sizeof(PgStat_FunctionCounts)) == 0)
+            continue;
+
+        if (pgstat_update_funcentry(funchash, entry))
+        {
+            /* reset the entry's counts */
+            MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
+        }
+        else
+            pgstat_pending_funcstats = true;
+    }
+}
+
+
 /* ----------
  * pgstat_vacuum_stat() -
  *
@@ -1018,17 +737,13 @@ void
 pgstat_vacuum_stat(void)
 {
     HTAB       *oidtab;
-    PgStat_MsgTabpurge msg;
-    PgStat_MsgFuncpurge f_msg;
     dshash_table *dshtable;
     dshash_seq_status dshstat;
     PgStat_StatDBEntry *dbentry;
     PgStat_StatTabEntry *tabentry;
     PgStat_StatFuncEntry *funcentry;
-    int            len;
-
-    if (pgStatSock == PGINVALID_SOCKET)
-        return;
+    pg_stat_table_result_status status;
+    bool        no_pending_stats;
 
     /* If not done for this transaction, take a snapshot of stats */
     if (!backend_snapshot_global_stats())
@@ -1060,11 +775,15 @@ pgstat_vacuum_stat(void)
     /* Clean up */
     hash_destroy(oidtab);
 
+    pgstat_pending_vacstats = true;
+
     /*
      * Lookup our own database entry; if not found, nothing more to do.
      */
-    dbentry = backend_get_db_entry(MyDatabaseId, true);
-    if (dbentry == NULL)
+    dbentry = pgstat_get_db_entry(MyDatabaseId,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_NOWAIT,
+                                  &status);
+    if (status == PGSTAT_TABLE_LOCK_FAILED)
         return;
     
     /*
@@ -1072,17 +791,13 @@ pgstat_vacuum_stat(void)
      */
     oidtab = pgstat_collect_oids(RelationRelationId);
 
-    /*
-     * Initialize our messages table counter to zero
-     */
-    msg.m_nentries = 0;
-
     /*
      * Check for all tables listed in stats hashtable if they still exist.
      * Stats cache is useless here so directly search the shared hash.
      */
     dshtable = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
     dshash_seq_init(&dshstat, dshtable, false);
+    no_pending_stats = true;
     while ((tabentry = (PgStat_StatTabEntry *) dshash_seq_next(&dshstat)) != NULL)
     {
         Oid            tabid = tabentry->tableid;
@@ -1092,41 +807,11 @@ pgstat_vacuum_stat(void)
         if (hash_search(oidtab, (void *) &tabid, HASH_FIND, NULL) != NULL)
             continue;
 
-        /*
-         * Not there, so add this table's Oid to the message
-         */
-        msg.m_tableid[msg.m_nentries++] = tabid;
-
-        /*
-         * If the message is full, send it out and reinitialize to empty
-         */
-        if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
-        {
-            len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
-                + msg.m_nentries * sizeof(Oid);
-
-            pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
-            msg.m_databaseid = MyDatabaseId;
-            pgstat_send(&msg, len);
-
-            msg.m_nentries = 0;
-        }
+        /* Not there, so purge this table */
+        if (!pgstat_tabpurge(MyDatabaseId, tabid))
+            no_pending_stats = false;
     }
     dshash_detach(dshtable);
-
-    /*
-     * Send the rest
-     */
-    if (msg.m_nentries > 0)
-    {
-        len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
-            + msg.m_nentries * sizeof(Oid);
-
-        pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
-        msg.m_databaseid = MyDatabaseId;
-        pgstat_send(&msg, len);
-    }
-
     /* Clean up */
     hash_destroy(oidtab);
 
@@ -1139,10 +824,6 @@ pgstat_vacuum_stat(void)
     {
         oidtab = pgstat_collect_oids(ProcedureRelationId);
 
-        pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
-        f_msg.m_databaseid = MyDatabaseId;
-        f_msg.m_nentries = 0;
-
         dshash_seq_init(&dshstat, dshtable, false);
         while ((funcentry = (PgStat_StatFuncEntry *) dshash_seq_next(&dshstat)) != NULL)
         {
@@ -1153,39 +834,16 @@ pgstat_vacuum_stat(void)
             if (hash_search(oidtab, (void *) &funcid, HASH_FIND, NULL) != NULL)
                 continue;
 
-            /*
-             * Not there, so add this function's Oid to the message
-             */
-            f_msg.m_functionid[f_msg.m_nentries++] = funcid;
-
-            /*
-             * If the message is full, send it out and reinitialize to empty
-             */
-            if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
-            {
-                len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
-                    + f_msg.m_nentries * sizeof(Oid);
-
-                pgstat_send(&f_msg, len);
-
-                f_msg.m_nentries = 0;
-            }
+            /* Not there, so purge this function */
+            if (!pgstat_funcpurge(MyDatabaseId, funcid))
+                no_pending_stats = false;
         }
-
-        /*
-         * Send the rest
-         */
-        if (f_msg.m_nentries > 0)
-        {
-            len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
-                + f_msg.m_nentries * sizeof(Oid);
-
-            pgstat_send(&f_msg, len);
-        }
-
         hash_destroy(oidtab);
     }
     dshash_detach(dshtable);
+
+    if (no_pending_stats)
+        pgstat_pending_vacstats = false;
 }
 
 
@@ -1247,50 +905,69 @@ pgstat_collect_oids(Oid catalogid)
 void
 pgstat_drop_database(Oid databaseid)
 {
-    PgStat_MsgDropdb msg;
+    static List *pending_dbid = NIL;
+    List *left_dbid = NIL;
+    PgStat_StatDBEntry *dbentry;
+    pg_stat_table_result_status status;
+    ListCell *lc;
 
-    if (pgStatSock == PGINVALID_SOCKET)
+    if (OidIsValid(databaseid))
+        pending_dbid = lappend_oid(pending_dbid, databaseid);
+    pgstat_pending_dropdb = true;
+
+    if (db_stats == NULL)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
-    msg.m_databaseid = databaseid;
-    pgstat_send(&msg, sizeof(msg));
+    foreach (lc, pending_dbid)
+    {
+        Oid dbid = lfirst_oid(lc);
+
+        /*
+         * Lookup the database in the hashtable.
+         */
+        dbentry = pgstat_get_db_entry(dbid,
+                                      PGSTAT_TABLE_WRITE | PGSTAT_TABLE_NOWAIT,
+                                      &status);
+
+        /* skip on lock failure */
+        if (status == PGSTAT_TABLE_LOCK_FAILED)
+        {
+            left_dbid = lappend_oid(left_dbid, dbid);
+            continue;
+        }
+
+        /*
+         * If found, remove it (along with the db statfile).
+         */
+        if (dbentry)
+        {
+            if (dbentry->tables != DSM_HANDLE_INVALID)
+            {
+                dshash_table *tbl =
+                    dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+                dshash_destroy(tbl);
+            }
+            if (dbentry->functions != DSM_HANDLE_INVALID)
+            {
+                dshash_table *tbl =
+                    dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+                dshash_destroy(tbl);
+            }
+
+
+            dshash_delete_entry(db_stats, (void *)dbentry);
+        }
+    }
+
+    list_free(pending_dbid);
+    pending_dbid = left_dbid;
+
+    /*  we're done if no pending database ids */
+    if (list_length(pending_dbid) == 0)
+        pgstat_pending_dropdb = false;
 }
 
 
-/* ----------
- * pgstat_drop_relation() -
- *
- *    Tell the collector that we just dropped a relation.
- *    (If the message gets lost, we will still clean the dead entry eventually
- *    via future invocations of pgstat_vacuum_stat().)
- *
- *    Currently not used for lack of any good place to call it; we rely
- *    entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
- * ----------
- */
-#ifdef NOT_USED
-void
-pgstat_drop_relation(Oid relid)
-{
-    PgStat_MsgTabpurge msg;
-    int            len;
-
-    if (pgStatSock == PGINVALID_SOCKET)
-        return;
-
-    msg.m_tableid[0] = relid;
-    msg.m_nentries = 1;
-
-    len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
-
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
-    msg.m_databaseid = MyDatabaseId;
-    pgstat_send(&msg, len);
-}
-#endif                            /* NOT_USED */
-
-
 /* ----------
  * pgstat_reset_counters() -
  *
@@ -1303,14 +980,59 @@ pgstat_drop_relation(Oid relid)
 void
 pgstat_reset_counters(void)
 {
-    PgStat_MsgResetcounter msg;
+    PgStat_StatDBEntry *dbentry;
+    pg_stat_table_result_status status;
 
-    if (pgStatSock == PGINVALID_SOCKET)
+    pgstat_pending_resetcounter = true;
+
+    if (db_stats == NULL)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
-    msg.m_databaseid = MyDatabaseId;
-    pgstat_send(&msg, sizeof(msg));
+    /*
+     * Lookup the database in the hashtable.  Nothing to do if not there.
+     */
+    dbentry = pgstat_get_db_entry(MyDatabaseId,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_NOWAIT,
+                                  &status);
+
+    if (status == PGSTAT_TABLE_LOCK_FAILED)
+        return;
+
+    if (!dbentry)
+    {
+        pgstat_pending_resetcounter = false;
+        return;
+    }
+
+    /*
+     * We simply throw away all the database's table entries by recreating a
+     * new hash table for them.
+     */
+    if (dbentry->tables != DSM_HANDLE_INVALID)
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+        dshash_destroy(t);
+        dbentry->tables = DSM_HANDLE_INVALID;
+    }
+    if (dbentry->functions != DSM_HANDLE_INVALID)
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+        dshash_destroy(t);
+        dbentry->functions = DSM_HANDLE_INVALID;
+    }
+
+    /*
+     * Reset database-level stats, too.  This creates empty hash tables for
+     * tables and functions.
+     */
+    reset_dbentry_counters(dbentry);
+
+    dshash_release_lock(db_stats, dbentry);
+
+    /*  we're done */
+    pgstat_pending_resetcounter = false;
 }
 
 /* ----------
@@ -1325,23 +1047,53 @@ pgstat_reset_counters(void)
 void
 pgstat_reset_shared_counters(const char *target)
 {
-    PgStat_MsgResetsharedcounter msg;
-
-    if (pgStatSock == PGINVALID_SOCKET)
-        return;
+    static bool archiver_pending = false;
+    static bool bgwriter_pending = false;
+    bool    have_lock = false;
 
     if (strcmp(target, "archiver") == 0)
-        msg.m_resettarget = RESET_ARCHIVER;
+    {
+        archiver_pending = true;
+    }
     else if (strcmp(target, "bgwriter") == 0)
-        msg.m_resettarget = RESET_BGWRITER;
-    else
+    {
+        bgwriter_pending = true;
+    }
+    else if (target != NULL)
         ereport(ERROR,
                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                  errmsg("unrecognized reset target: \"%s\"", target),
                  errhint("Target must be \"archiver\" or \"bgwriter\".")));
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
-    pgstat_send(&msg, sizeof(msg));
+    pgstat_pending_resetsharedcounter = true;
+
+    if (db_stats == NULL)
+        return;
+
+    /* Reset the archiver statistics for the cluster. */
+    if (archiver_pending && LWLockConditionalAcquire(StatsLock, LW_EXCLUSIVE))
+    {
+        /* zero the pointed-to struct, not the pointer itself */
+        memset(shared_archiverStats, 0, sizeof(*shared_archiverStats));
+        shared_archiverStats->stat_reset_timestamp = GetCurrentTimestamp();
+        archiver_pending = false;
+        have_lock = true;
+    }
+
+    if (bgwriter_pending &&
+        (have_lock || LWLockConditionalAcquire(StatsLock, LW_EXCLUSIVE)))
+    {
+        /* Reset the global background writer statistics for the cluster. */
+        memset(shared_globalStats, 0, sizeof(*shared_globalStats));
+        shared_globalStats->stat_reset_timestamp = GetCurrentTimestamp();
+        bgwriter_pending = false;
+        have_lock = true;
+    }
+
+    if (have_lock)
+        LWLockRelease(StatsLock);
+
+    /* remember whether any reset is still pending */
+    pgstat_pending_resetsharedcounter = (archiver_pending || bgwriter_pending);
 }
 
 /* ----------
@@ -1356,17 +1108,37 @@ pgstat_reset_shared_counters(const char *target)
 void
 pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
 {
-    PgStat_MsgResetsinglecounter msg;
+    PgStat_StatDBEntry *dbentry;
 
-    if (pgStatSock == PGINVALID_SOCKET)
+    /* Don't defer */
+
+    if (db_stats == NULL)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER);
-    msg.m_databaseid = MyDatabaseId;
-    msg.m_resettype = type;
-    msg.m_objectid = objoid;
+    dbentry = pgstat_get_db_entry(MyDatabaseId, PGSTAT_TABLE_WRITE, NULL);
 
-    pgstat_send(&msg, sizeof(msg));
+    if (!dbentry)
+        return;
+
+    /* Set the reset timestamp for the whole database */
+    dbentry->stat_reset_timestamp = GetCurrentTimestamp();
+
+    /* Remove object if it exists, ignore it if not */
+    if (type == RESET_TABLE)
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+        dshash_delete_key(t, (void *) &objoid);
+    }
+
+    if (type == RESET_FUNCTION)
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+        dshash_delete_key(t, (void *) &objoid);
+    }
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -1380,16 +1152,23 @@ pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
 void
 pgstat_report_autovac(Oid dboid)
 {
-    PgStat_MsgAutovacStart msg;
+    PgStat_StatDBEntry *dbentry;
 
-    if (pgStatSock == PGINVALID_SOCKET)
+    /* Don't defer */
+
+    if (db_stats == NULL)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
-    msg.m_databaseid = dboid;
-    msg.m_start_time = GetCurrentTimestamp();
+    /*
+     * Store the last autovacuum time in the database's hashtable entry.
+     */
+    dbentry = pgstat_get_db_entry(dboid,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_CREATE,
+                                  NULL);
 
-    pgstat_send(&msg, sizeof(msg));
+    dbentry->last_autovac_time = GetCurrentTimestamp();
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 
@@ -1403,19 +1182,43 @@ void
 pgstat_report_vacuum(Oid tableoid, bool shared,
                      PgStat_Counter livetuples, PgStat_Counter deadtuples)
 {
-    PgStat_MsgVacuum msg;
+    Oid                    dboid;
+    PgStat_StatDBEntry *dbentry;
+    PgStat_StatTabEntry *tabentry;
+    dshash_table *table;
 
-    if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+    /* Don't defer */
+
+    if (db_stats == NULL || !pgstat_track_counts)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
-    msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
-    msg.m_tableoid = tableoid;
-    msg.m_autovacuum = IsAutoVacuumWorkerProcess();
-    msg.m_vacuumtime = GetCurrentTimestamp();
-    msg.m_live_tuples = livetuples;
-    msg.m_dead_tuples = deadtuples;
-    pgstat_send(&msg, sizeof(msg));
+    dboid = shared ? InvalidOid : MyDatabaseId;
+
+    /*
+     * Store the data in the table's hashtable entry.
+     */
+    dbentry = pgstat_get_db_entry(dboid,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_CREATE,
+                                  NULL);
+    table = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+    tabentry = pgstat_get_tab_entry(table, tableoid, true);
+
+    tabentry->n_live_tuples = livetuples;
+    tabentry->n_dead_tuples = deadtuples;
+
+    if (IsAutoVacuumWorkerProcess())
+    {
+        tabentry->autovac_vacuum_timestamp = GetCurrentTimestamp();
+        tabentry->autovac_vacuum_count++;
+    }
+    else
+    {
+        tabentry->vacuum_timestamp = GetCurrentTimestamp();
+        tabentry->vacuum_count++;
+    }
+    dshash_release_lock(table, tabentry);
+    dshash_detach(table);
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* --------
@@ -1432,9 +1235,14 @@ pgstat_report_analyze(Relation rel,
                       PgStat_Counter livetuples, PgStat_Counter deadtuples,
                       bool resetcounter)
 {
-    PgStat_MsgAnalyze msg;
+    Oid                    dboid;
+    PgStat_StatDBEntry *dbentry;
+    PgStat_StatTabEntry *tabentry;
+    dshash_table *table;
 
-    if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+    /* Don't defer */
+
+    if (db_stats == NULL || !pgstat_track_counts)
         return;
 
     /*
@@ -1463,15 +1271,42 @@ pgstat_report_analyze(Relation rel,
         deadtuples = Max(deadtuples, 0);
     }
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
-    msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
-    msg.m_tableoid = RelationGetRelid(rel);
-    msg.m_autovacuum = IsAutoVacuumWorkerProcess();
-    msg.m_resetcounter = resetcounter;
-    msg.m_analyzetime = GetCurrentTimestamp();
-    msg.m_live_tuples = livetuples;
-    msg.m_dead_tuples = deadtuples;
-    pgstat_send(&msg, sizeof(msg));
+    dboid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
+
+    /*
+     * Store the data in the table's hashtable entry.
+     */
+    dbentry = pgstat_get_db_entry(dboid,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_CREATE,
+                                  NULL);
+
+    table = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+    tabentry = pgstat_get_tab_entry(table, RelationGetRelid(rel), true);
+
+    tabentry->n_live_tuples = livetuples;
+    tabentry->n_dead_tuples = deadtuples;
+
+    /*
+     * If commanded, reset changes_since_analyze to zero.  This forgets any
+     * changes that were committed while the ANALYZE was in progress, but we
+     * have no good way to estimate how many of those there were.
+     */
+    if (resetcounter)
+        tabentry->changes_since_analyze = 0;
+
+    if (IsAutoVacuumWorkerProcess())
+    {
+        tabentry->autovac_analyze_timestamp = GetCurrentTimestamp();
+        tabentry->autovac_analyze_count++;
+    }
+    else
+    {
+        tabentry->analyze_timestamp = GetCurrentTimestamp();
+        tabentry->analyze_count++;
+    }
+    dshash_release_lock(table, tabentry);
+    dshash_detach(table);
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* --------
@@ -1483,15 +1318,67 @@ pgstat_report_analyze(Relation rel,
 void
 pgstat_report_recovery_conflict(int reason)
 {
-    PgStat_MsgRecoveryConflict msg;
+    static int pending_conflict_tablespace = 0;
+    static int pending_conflict_lock = 0;
+    static int pending_conflict_snapshot = 0;
+    static int pending_conflict_bufferpin = 0;
+    static int pending_conflict_startup_deadlock = 0;
+    PgStat_StatDBEntry *dbentry;
+    pg_stat_table_result_status status;
 
-    if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+    if (db_stats == NULL || !pgstat_track_counts)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
-    msg.m_databaseid = MyDatabaseId;
-    msg.m_reason = reason;
-    pgstat_send(&msg, sizeof(msg));
+    switch (reason)
+    {
+        case PROCSIG_RECOVERY_CONFLICT_DATABASE:
+
+            /*
+             * Since we drop the information about the database as soon as it
+             * replicates, there is no point in counting these conflicts.
+             */
+            break;
+        case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
+            pending_conflict_tablespace++;
+            break;
+        case PROCSIG_RECOVERY_CONFLICT_LOCK:
+            pending_conflict_lock++;
+            break;
+        case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+            pending_conflict_snapshot++;
+            break;
+        case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
+            pending_conflict_bufferpin++;
+            break;
+        case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
+            pending_conflict_startup_deadlock++;
+            break;
+    }
+    pgstat_pending_recoveryconflict = true;
+
+    dbentry = pgstat_get_db_entry(MyDatabaseId,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_CREATE |
+                                  PGSTAT_TABLE_NOWAIT,
+                                  &status);
+
+    if (status == PGSTAT_TABLE_LOCK_FAILED)
+        return;
+
+    dbentry->n_conflict_tablespace    += pending_conflict_tablespace;
+    dbentry->n_conflict_lock         += pending_conflict_lock;
+    dbentry->n_conflict_snapshot    += pending_conflict_snapshot;
+    dbentry->n_conflict_bufferpin    += pending_conflict_bufferpin;
+    dbentry->n_conflict_startup_deadlock += pending_conflict_startup_deadlock;
+
+    pending_conflict_tablespace = 0;
+    pending_conflict_lock = 0;
+    pending_conflict_snapshot = 0;
+    pending_conflict_bufferpin = 0;
+    pending_conflict_startup_deadlock = 0;
+
+    dshash_release_lock(db_stats, dbentry);
+    
+    pgstat_pending_recoveryconflict = false;
 }
 
 /* --------
@@ -1501,16 +1388,31 @@ pgstat_report_recovery_conflict(int reason)
  * --------
  */
 void
-pgstat_report_deadlock(void)
+pgstat_report_deadlock(bool pending)
 {
-    PgStat_MsgDeadlock msg;
+    static int pending_deadlocks = 0;
+    PgStat_StatDBEntry *dbentry;
+    pg_stat_table_result_status status;
 
-    if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+    if (db_stats == NULL || !pgstat_track_counts)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DEADLOCK);
-    msg.m_databaseid = MyDatabaseId;
-    pgstat_send(&msg, sizeof(msg));
+    pending_deadlocks++;
+    pgstat_pending_deadlock = true;
+
+    dbentry = pgstat_get_db_entry(MyDatabaseId,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_CREATE |
+                                  PGSTAT_TABLE_NOWAIT,
+                                  &status);
+
+    if (status == PGSTAT_TABLE_LOCK_FAILED)
+        return;
+
+    dbentry->n_deadlocks += pending_deadlocks;
+    pending_deadlocks = 0;
+    pgstat_pending_deadlock = false;
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* --------
@@ -1522,34 +1424,36 @@ pgstat_report_deadlock(void)
 void
 pgstat_report_tempfile(size_t filesize)
 {
-    PgStat_MsgTempFile msg;
+    static size_t pending_filesize = 0;
+    static size_t pending_files = 0;
+    PgStat_StatDBEntry *dbentry;
+    pg_stat_table_result_status status;
 
-    if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+    if (filesize > 0) /* Isn't there a case where filesize is really 0? */
+    {
+        pending_filesize += filesize; /* needs check overflow */
+        pending_files++;
+    }
+    pgstat_pending_tempfile = true;
+
+    if (db_stats == NULL || !pgstat_track_counts)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TEMPFILE);
-    msg.m_databaseid = MyDatabaseId;
-    msg.m_filesize = filesize;
-    pgstat_send(&msg, sizeof(msg));
-}
+    dbentry = pgstat_get_db_entry(MyDatabaseId,
+                                  PGSTAT_TABLE_WRITE | PGSTAT_TABLE_CREATE |
+                                  PGSTAT_TABLE_NOWAIT,
+                                  &status);
 
-
-/* ----------
- * pgstat_ping() -
- *
- *    Send some junk data to the collector to increase traffic.
- * ----------
- */
-void
-pgstat_ping(void)
-{
-    PgStat_MsgDummy msg;
-
-    if (pgStatSock == PGINVALID_SOCKET)
+    if (status == PGSTAT_TABLE_LOCK_FAILED)
         return;
 
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
-    pgstat_send(&msg, sizeof(msg));
+    dbentry->n_temp_bytes += pending_filesize;
+    dbentry->n_temp_files += pending_files;
+    pending_filesize = 0;
+    pending_files = 0;
+    pgstat_pending_tempfile = false;
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 
@@ -1670,7 +1574,7 @@ pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
     INSTR_TIME_ADD(fs->f_self_time, f_self);
 
     /* indicate that we have something to send */
-    have_function_stats = true;
+    pgstat_pending_funcstats = true;
 }
 
 
@@ -1691,6 +1595,7 @@ pgstat_initstats(Relation rel)
 {
     Oid            rel_id = rel->rd_id;
     char        relkind = rel->rd_rel->relkind;
+    MemoryContext oldcontext;
 
     /* We only count stats for things that have storage */
     if (!(relkind == RELKIND_RELATION ||
@@ -1703,7 +1608,14 @@ pgstat_initstats(Relation rel)
         return;
     }
 
-    if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
+    /* Attached shared memory lives for the process lifetime */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    while (!pgstat_attach_shared_stats())
+        sleep(1);
+
+    MemoryContextSwitchTo(oldcontext);
+
+    if (db_stats == NULL || !pgstat_track_counts)
     {
         /* We're not counting at all */
         rel->pgstat_info = NULL;
@@ -2426,7 +2338,7 @@ pgstat_fetch_stat_funcentry(Oid func_id)
     PgStat_StatFuncEntry *funcentry = NULL;
 
     /* Lookup our database, then find the requested function */
-    dbentry = pgstat_get_db_entry(MyDatabaseId, false);
+    dbentry = pgstat_get_db_entry(MyDatabaseId, PGSTAT_TABLE_READ, NULL);
     if (dbentry == NULL)
         return NULL;
 
@@ -2721,7 +2633,7 @@ pgstat_initialize(void)
     }
 
     /* Set up a process-exit hook to clean up */
-    on_shmem_exit(pgstat_beshutdown_hook, 0);
+    before_shmem_exit(pgstat_beshutdown_hook, 0);
 }
 
 /* ----------
@@ -4105,49 +4017,6 @@ pgstat_get_backend_desc(BackendType backendType)
  * ------------------------------------------------------------
  */
 
-
-/* ----------
- * pgstat_setheader() -
- *
- *        Set common header fields in a statistics message
- * ----------
- */
-static void
-pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
-{
-    hdr->m_type = mtype;
-}
-
-
-/* ----------
- * pgstat_send() -
- *
- *        Send out one statistics message to the collector
- * ----------
- */
-static void
-pgstat_send(void *msg, int len)
-{
-    int            rc;
-
-    if (pgStatSock == PGINVALID_SOCKET)
-        return;
-
-    ((PgStat_MsgHdr *) msg)->m_size = len;
-
-    /* We'll retry after EINTR, but ignore all other failures */
-    do
-    {
-        rc = send(pgStatSock, msg, len, 0);
-    } while (rc < 0 && errno == EINTR);
-
-#ifdef USE_ASSERT_CHECKING
-    /* In debug builds, log send failures ... */
-    if (rc < 0)
-        elog(LOG, "could not send to statistics collector: %m");
-#endif
-}
-
 /* ----------
  * pgstat_send_archiver() -
  *
@@ -4158,16 +4027,22 @@ pgstat_send(void *msg, int len)
 void
 pgstat_send_archiver(const char *xlog, bool failed)
 {
-    PgStat_MsgArchiver msg;
-
-    /*
-     * Prepare and send the message
-     */
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ARCHIVER);
-    msg.m_failed = failed;
-    StrNCpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
-    msg.m_timestamp = GetCurrentTimestamp();
-    pgstat_send(&msg, sizeof(msg));
+    if (failed)
+    {
+        /* Failed archival attempt */
+        ++shared_archiverStats->failed_count;
+        strlcpy(shared_archiverStats->last_failed_wal, xlog,
+                sizeof(shared_archiverStats->last_failed_wal));
+        shared_archiverStats->last_failed_timestamp = GetCurrentTimestamp();
+    }
+    else
+    {
+        /* Successful archival operation */
+        ++shared_archiverStats->archived_count;
+        strlcpy(shared_archiverStats->last_archived_wal, xlog,
+                sizeof(shared_archiverStats->last_archived_wal));
+        shared_archiverStats->last_archived_timestamp = GetCurrentTimestamp();
+    }
 }
 
 /* ----------
@@ -4180,21 +4055,36 @@ void
 pgstat_send_bgwriter(void)
 {
     /* We assume this initializes to zeroes */
-    static const PgStat_MsgBgWriter all_zeroes;
+    static const PgStat_BgWriter all_zeroes;
+
+    PgStat_BgWriter *s = &BgWriterStats;
+    MemoryContext oldcontext;
 
     /*
      * This function can be called even if nothing at all has happened. In
      * this case, avoid sending a completely empty message to the stats
      * collector.
      */
-    if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
+    if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_BgWriter)) == 0)
         return;
 
-    /*
-     * Prepare and send the message
-     */
-    pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
-    pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
+    /* Attached shared memory lives for the process lifetime */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    while (!pgstat_attach_shared_stats())
+        sleep(1);
+
+    MemoryContextSwitchTo(oldcontext);
+
+    shared_globalStats->timed_checkpoints += s->timed_checkpoints;
+    shared_globalStats->requested_checkpoints += s->requested_checkpoints;
+    shared_globalStats->checkpoint_write_time += s->checkpoint_write_time;
+    shared_globalStats->checkpoint_sync_time += s->checkpoint_sync_time;
+    shared_globalStats->buf_written_checkpoints += s->buf_written_checkpoints;
+    shared_globalStats->buf_written_clean += s->buf_written_clean;
+    shared_globalStats->maxwritten_clean += s->maxwritten_clean;
+    shared_globalStats->buf_written_backend += s->buf_written_backend;
+    shared_globalStats->buf_fsync_backend += s->buf_fsync_backend;
+    shared_globalStats->buf_alloc += s->buf_alloc;
 
     /*
      * Clear out the statistics buffer, so it can be re-used.
@@ -4215,8 +4105,6 @@ pgstat_send_bgwriter(void)
 void
 PgstatCollectorMain(void)
 {
-    int            len;
-    PgStat_Msg    msg;
     int            wr;
 
     /*
@@ -4246,20 +4134,6 @@ PgstatCollectorMain(void)
     pgStatRunningInCollector = true;
     pgstat_read_statsfiles();
 
-    /*
-     * Loop to process messages until we get SIGQUIT or detect ungraceful
-     * death of our parent postmaster.
-     *
-     * For performance reasons, we don't want to do ResetLatch/WaitLatch after
-     * every message; instead, do that only after a recv() fails to obtain a
-     * message.  (This effectively means that if backends are sending us stuff
-     * like mad, we won't notice postmaster death until things slack off a
-     * bit; which seems fine.)    To do that, we have an inner loop that
-     * iterates as long as recv() succeeds.  We do recognize got_SIGHUP inside
-     * the inner loop, which means that such interrupts will get serviced but
-     * the latch won't get cleared until next time there is a break in the
-     * action.
-     */
     for (;;)
     {
         /* Clear any already-pending wakeups */
@@ -4272,164 +4146,16 @@ PgstatCollectorMain(void)
             break;
 
         /*
-         * Inner loop iterates as long as we keep getting messages, or until
-         * need_exit becomes set.
+         * Reload configuration if we got SIGHUP from the postmaster.
          */
-        while (!got_SIGTERM)
+        if (got_SIGHUP)
         {
-            /*
-             * Reload configuration if we got SIGHUP from the postmaster.
-             */
-            if (got_SIGHUP)
-            {
-                got_SIGHUP = false;
-                ProcessConfigFile(PGC_SIGHUP);
-            }
-
-            /*
-             * Try to receive and process a message.  This will not block,
-             * since the socket is set to non-blocking mode.
-             *
-             * XXX On Windows, we have to force pgwin32_recv to cooperate,
-             * despite the previous use of pg_set_noblock() on the socket.
-             * This is extremely broken and should be fixed someday.
-             */
-#ifdef WIN32
-            pgwin32_noblock = 1;
-#endif
-
-            len = recv(pgStatSock, (char *) &msg,
-                       sizeof(PgStat_Msg), 0);
-
-#ifdef WIN32
-            pgwin32_noblock = 0;
-#endif
-
-            if (len < 0)
-            {
-                if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-                    break;        /* out of inner loop */
-                ereport(ERROR,
-                        (errcode_for_socket_access(),
-                         errmsg("could not read statistics message: %m")));
-            }
-
-            /*
-             * We ignore messages that are smaller than our common header
-             */
-            if (len < sizeof(PgStat_MsgHdr))
-                continue;
-
-            /*
-             * The received length must match the length in the header
-             */
-            if (msg.msg_hdr.m_size != len)
-                continue;
-
-            /*
-             * O.K. - we accept this message.  Process it.
-             */
-            switch (msg.msg_hdr.m_type)
-            {
-                case PGSTAT_MTYPE_DUMMY:
-                    break;
-
-                case PGSTAT_MTYPE_TABSTAT:
-                    pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_TABPURGE:
-                    pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_DROPDB:
-                    pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_RESETCOUNTER:
-                    pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
-                                             len);
-                    break;
-
-                case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
-                    pgstat_recv_resetsharedcounter(
-                                                   (PgStat_MsgResetsharedcounter *) &msg,
-                                                   len);
-                    break;
-
-                case PGSTAT_MTYPE_RESETSINGLECOUNTER:
-                    pgstat_recv_resetsinglecounter(
-                                                   (PgStat_MsgResetsinglecounter *) &msg,
-                                                   len);
-                    break;
-
-                case PGSTAT_MTYPE_AUTOVAC_START:
-                    pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_VACUUM:
-                    pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_ANALYZE:
-                    pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_ARCHIVER:
-                    pgstat_recv_archiver((PgStat_MsgArchiver *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_BGWRITER:
-                    pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_FUNCSTAT:
-                    pgstat_recv_funcstat((PgStat_MsgFuncstat *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_FUNCPURGE:
-                    pgstat_recv_funcpurge((PgStat_MsgFuncpurge *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_RECOVERYCONFLICT:
-                    pgstat_recv_recoveryconflict((PgStat_MsgRecoveryConflict *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_DEADLOCK:
-                    pgstat_recv_deadlock((PgStat_MsgDeadlock *) &msg, len);
-                    break;
-
-                case PGSTAT_MTYPE_TEMPFILE:
-                    pgstat_recv_tempfile((PgStat_MsgTempFile *) &msg, len);
-                    break;
-
-                default:
-                    break;
-            }
-        }                        /* end of inner message-processing loop */
-
-        /* Sleep until there's something to do */
-#ifndef WIN32
-        wr = WaitLatchOrSocket(MyLatch,
-                               WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
-                               pgStatSock, -1L,
-                               WAIT_EVENT_PGSTAT_MAIN);
-#else
-
-        /*
-         * Windows, at least in its Windows Server 2003 R2 incarnation,
-         * sometimes loses FD_READ events.  Waking up and retrying the recv()
-         * fixes that, so don't sleep indefinitely.  This is a crock of the
-         * first water, but until somebody wants to debug exactly what's
-         * happening there, this is the best we can do.  The two-second
-         * timeout matches our pre-9.2 behavior.
-         */
-        wr = WaitLatchOrSocket(MyLatch,
-                               WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT,
-                               pgStatSock,
-                               2 * 1000L /* msec */ ,
-                               WAIT_EVENT_PGSTAT_MAIN);
-#endif
+            got_SIGHUP = false;
+            ProcessConfigFile(PGC_SIGHUP);
+        }
+
+        wr = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1L,
+                       WAIT_EVENT_PGSTAT_MAIN);
 
         /*
          * Emergency bailout if postmaster has died.  This is to avoid the
@@ -4437,8 +4163,8 @@ PgstatCollectorMain(void)
          */
         if (wr & WL_POSTMASTER_DEATH)
             break;
-    }                            /* end of outer loop */
-
+    }
+
     /*
      * Save the final stats to reuse at next startup.
      */
@@ -4552,29 +4278,62 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
  * Else, return NULL.
  */
 static PgStat_StatDBEntry *
-pgstat_get_db_entry(Oid databaseid, bool create)
+pgstat_get_db_entry(Oid databaseid, int op, pg_stat_table_result_status *status)
 {
     PgStat_StatDBEntry *result;
-    bool        found;
+    bool        nowait = ((op & PGSTAT_TABLE_NOWAIT) != 0);
+    bool        lock_acquired = true;
+    bool        found = true;
+    MemoryContext oldcontext;
 
-    Assert(pgStatRunningInCollector);
+    /* Make sure we are attached to the shared stats; bail out if we can't */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    if (!pgstat_attach_shared_stats())
+    {
+        MemoryContextSwitchTo(oldcontext);
+        return NULL;
+    }
+    MemoryContextSwitchTo(oldcontext);
 
     /* Lookup or create the hash table entry for this database */
-    if (create)
+    if (op & PGSTAT_TABLE_CREATE)
+    {
         result = (PgStat_StatDBEntry *)
-            dshash_find_or_insert(db_stats,    &databaseid, &found);
+            dshash_find_or_insert_extended(db_stats, &databaseid,
+                                           &found, nowait);
+        if (result == NULL)
+            lock_acquired = false;
+        else if (!found)
+        {
+            /*
+             * If not found, initialize the new one.  This creates empty hash
+             * tables for tables and functions, too.
+             */
+            reset_dbentry_counters(result);
+        }
+    }
     else
-        result = (PgStat_StatDBEntry *)    dshash_find(db_stats, &databaseid, true);
+    {
+        result = (PgStat_StatDBEntry *)
+            dshash_find_extended(db_stats, &databaseid,
+                                 &lock_acquired, true, nowait);
+        if (result == NULL)
+            found = false;
+    }
 
-    if (!create)
-        return result;
-
-    /*
-     * If not found, initialize the new one.  This creates empty hash tables
-     * for tables and functions, too.
-     */
-    if (!found)
-        reset_dbentry_counters(result);
+    /* Set return status if requested */
+    if (status)
+    {
+        if (!lock_acquired)
+        {
+            Assert(nowait);
+            *status = PGSTAT_TABLE_LOCK_FAILED;
+        }
+        else if (!found)
+            *status = PGSTAT_TABLE_NOT_FOUND;
+        else
+            *status = PGSTAT_TABLE_FOUND;
+    }
 
     return result;
 }
@@ -5646,108 +5405,124 @@ pgstat_clear_snapshot(void)
  *    Count what the backend has done.
  * ----------
  */
-static void
-pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
+static dshash_table *
+pgstat_update_dbentry(Oid dboid)
 {
-    dshash_table *tabhash;
     PgStat_StatDBEntry *dbentry;
-    PgStat_StatTabEntry *tabentry;
-    int            i;
-    bool        found;
+    pg_stat_table_result_status status;
+    dshash_table *tabhash;
 
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    /*
-     * Update database-wide stats.
-     */
-    dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
-    dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
-    dbentry->n_block_read_time += msg->m_block_read_time;
-    dbentry->n_block_write_time += msg->m_block_write_time;
+    dbentry = pgstat_get_db_entry(dboid,
+                                  PGSTAT_TABLE_WRITE
+                                  | PGSTAT_TABLE_CREATE
+                                  | PGSTAT_TABLE_NOWAIT,
+                                  &status);
+
+    /* bail out if we didn't get the entry (lock failed or not attached) */
+    if (dbentry == NULL)
+        return NULL;
 
     tabhash = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-    /*
-     * Process all table entries in the message.
-     */
-    for (i = 0; i < msg->m_nentries; i++)
+
+    if (OidIsValid(dboid))
     {
-        PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
-
-        tabentry = (PgStat_StatTabEntry *)
-            dshash_find_or_insert(tabhash, (void *) &(tabmsg->t_id), &found);
-
-        if (!found)
-        {
-            /*
-             * If it's a new table entry, initialize counters to the values we
-             * just got.
-             */
-            tabentry->numscans = tabmsg->t_counts.t_numscans;
-            tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
-            tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
-            tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
-            tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
-            tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
-            tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
-            tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
-            tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
-            tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
-            tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
-            tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
-
-            tabentry->vacuum_timestamp = 0;
-            tabentry->vacuum_count = 0;
-            tabentry->autovac_vacuum_timestamp = 0;
-            tabentry->autovac_vacuum_count = 0;
-            tabentry->analyze_timestamp = 0;
-            tabentry->analyze_count = 0;
-            tabentry->autovac_analyze_timestamp = 0;
-            tabentry->autovac_analyze_count = 0;
-        }
-        else
-        {
-            /*
-             * Otherwise add the values to the existing entry.
-             */
-            tabentry->numscans += tabmsg->t_counts.t_numscans;
-            tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
-            tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
-            tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
-            tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
-            tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
-            tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
-            /* If table was truncated, first reset the live/dead counters */
-            if (tabmsg->t_counts.t_truncated)
-            {
-                tabentry->n_live_tuples = 0;
-                tabentry->n_dead_tuples = 0;
-            }
-            tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
-            tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
-            tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
-            tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
-            tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
-        }
-
-        /* Clamp n_live_tuples in case of negative delta_live_tuples */
-        tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
-        /* Likewise for n_dead_tuples */
-        tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
-        dshash_release_lock(tabhash, tabentry);
-
         /*
-         * Add per-table stats to the per-database entry, too.
+         * Update database-wide stats.
          */
-        dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
-        dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
-        dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
-        dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
-        dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
-        dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
-        dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
+        dbentry->n_xact_commit += pgStatXactCommit;
+        dbentry->n_xact_rollback += pgStatXactRollback;
+        dbentry->n_block_read_time += pgStatBlockReadTime;
+        dbentry->n_block_write_time += pgStatBlockWriteTime;
     }
 
+    pgStatXactCommit = 0;
+    pgStatXactRollback = 0;
+    pgStatBlockReadTime = 0;
+    pgStatBlockWriteTime = 0;
+
     dshash_release_lock(db_stats, dbentry);
+
+    return tabhash;
+}
+
+static bool
+pgstat_update_tabentry(dshash_table *tabhash, PgStat_TableStatus *stat)
+{
+    PgStat_StatTabEntry *tabentry;
+    bool    found;
+
+    if (tabhash == NULL)
+        return false;
+
+    tabentry = (PgStat_StatTabEntry *)
+        dshash_find_or_insert_extended(tabhash, (void *) &(stat->t_id),
+                                       &found, true);
+
+    /* failed to acquire lock */
+    if (tabentry == NULL)
+        return false;
+
+    if (!found)
+    {
+        /*
+         * If it's a new table entry, initialize counters to the values we
+         * just got.
+         */
+        tabentry->numscans = stat->t_counts.t_numscans;
+        tabentry->tuples_returned = stat->t_counts.t_tuples_returned;
+        tabentry->tuples_fetched = stat->t_counts.t_tuples_fetched;
+        tabentry->tuples_inserted = stat->t_counts.t_tuples_inserted;
+        tabentry->tuples_updated = stat->t_counts.t_tuples_updated;
+        tabentry->tuples_deleted = stat->t_counts.t_tuples_deleted;
+        tabentry->tuples_hot_updated = stat->t_counts.t_tuples_hot_updated;
+        tabentry->n_live_tuples = stat->t_counts.t_delta_live_tuples;
+        tabentry->n_dead_tuples = stat->t_counts.t_delta_dead_tuples;
+        tabentry->changes_since_analyze = stat->t_counts.t_changed_tuples;
+        tabentry->blocks_fetched = stat->t_counts.t_blocks_fetched;
+        tabentry->blocks_hit = stat->t_counts.t_blocks_hit;
+
+        tabentry->vacuum_timestamp = 0;
+        tabentry->vacuum_count = 0;
+        tabentry->autovac_vacuum_timestamp = 0;
+        tabentry->autovac_vacuum_count = 0;
+        tabentry->analyze_timestamp = 0;
+        tabentry->analyze_count = 0;
+        tabentry->autovac_analyze_timestamp = 0;
+        tabentry->autovac_analyze_count = 0;
+    }
+    else
+    {
+        /*
+         * Otherwise add the values to the existing entry.
+         */
+        tabentry->numscans += stat->t_counts.t_numscans;
+        tabentry->tuples_returned += stat->t_counts.t_tuples_returned;
+        tabentry->tuples_fetched += stat->t_counts.t_tuples_fetched;
+        tabentry->tuples_inserted += stat->t_counts.t_tuples_inserted;
+        tabentry->tuples_updated += stat->t_counts.t_tuples_updated;
+        tabentry->tuples_deleted += stat->t_counts.t_tuples_deleted;
+        tabentry->tuples_hot_updated += stat->t_counts.t_tuples_hot_updated;
+        /* If table was truncated, first reset the live/dead counters */
+        if (stat->t_counts.t_truncated)
+        {
+            tabentry->n_live_tuples = 0;
+            tabentry->n_dead_tuples = 0;
+        }
+        tabentry->n_live_tuples += stat->t_counts.t_delta_live_tuples;
+        tabentry->n_dead_tuples += stat->t_counts.t_delta_dead_tuples;
+        tabentry->changes_since_analyze += stat->t_counts.t_changed_tuples;
+        tabentry->blocks_fetched += stat->t_counts.t_blocks_fetched;
+        tabentry->blocks_hit += stat->t_counts.t_blocks_hit;
+    }
+
+    /* Clamp n_live_tuples in case of negative delta_live_tuples */
+    tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
+    /* Likewise for n_dead_tuples */
+    tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
+
+    dshash_release_lock(tabhash, tabentry);
+
+    return true;
 }
 
 
@@ -5757,14 +5532,15 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
  *    Arrange for dead table removal.
  * ----------
  */
-static void
-pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
+static bool
+pgstat_tabpurge(Oid dboid, Oid taboid)
 {
     dshash_table *tbl;
     PgStat_StatDBEntry *dbentry;
-    int            i;
 
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
+    /* wait for lock */
+    dbentry = pgstat_get_db_entry(dboid, PGSTAT_TABLE_WRITE, NULL);
+
     /*
      * No need to purge if we don't even know the database.
      */
@@ -5772,459 +5548,68 @@ pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
     {
         if (dbentry)
             dshash_release_lock(db_stats, dbentry);
-        return;
+        return true;
     }
 
     tbl = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-    /*
-     * Process all table entries in the message.
-     */
-    for (i = 0; i < msg->m_nentries; i++)
-    {
-        /* Remove from hashtable if present; we don't care if it's not. */
-        (void) dshash_delete_key(tbl, (void *) &(msg->m_tableid[i]));
-    }
+
+    /* Remove from hashtable if present; we don't care if it's not. */
+    (void) dshash_delete_key(tbl, (void *) &taboid);
+    dshash_detach(tbl);            /* keep parity with pgstat_funcpurge */
 
     dshash_release_lock(db_stats, dbentry);
 
+    return true;
 }
 
 
 /* ----------
- * pgstat_recv_dropdb() -
- *
- *    Arrange for dead database removal
- * ----------
- */
-static void
-pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
-{
-    Oid            dbid = msg->m_databaseid;
-    PgStat_StatDBEntry *dbentry;
-
-    /*
-     * Lookup the database in the hashtable.
-     */
-    dbentry = pgstat_get_db_entry(dbid, false);
-
-    /*
-     * If found, remove it (along with the db statfile).
-     */
-    if (dbentry)
-    {
-        if (dbentry->tables != DSM_HANDLE_INVALID)
-        {
-            dshash_table *tbl =
-                dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-            dshash_destroy(tbl);
-        }
-        if (dbentry->functions != DSM_HANDLE_INVALID)
-        {
-            dshash_table *tbl =
-                dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
-            dshash_destroy(tbl);
-        }
-
-        dshash_delete_entry(db_stats, (void *)dbentry);
-    }
-}
-
-
-/* ----------
- * pgstat_recv_resetcounter() -
- *
- *    Reset the statistics for the specified database.
- * ----------
- */
-static void
-pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    /*
-     * Lookup the database in the hashtable.  Nothing to do if not there.
-     */
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
-    if (!dbentry)
-        return;
-
-    /*
-     * We simply throw away all the database's table entries by recreating a
-     * new hash table for them.
-     */
-    if (dbentry->tables != DSM_HANDLE_INVALID)
-    {
-        dshash_table *t =
-            dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-        dshash_destroy(t);
-        dbentry->tables = DSM_HANDLE_INVALID;
-    }
-    if (dbentry->functions != DSM_HANDLE_INVALID)
-    {
-        dshash_table *t =
-            dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
-        dshash_destroy(t);
-        dbentry->functions = DSM_HANDLE_INVALID;
-    }
-
-    /*
-     * Reset database-level stats, too.  This creates empty hash tables for
-     * tables and functions.
-     */
-    reset_dbentry_counters(dbentry);
-
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_resetshared() -
- *
- *    Reset some shared statistics of the cluster.
- * ----------
- */
-static void
-pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
-{
-    if (msg->m_resettarget == RESET_BGWRITER)
-    {
-        /* Reset the global background writer statistics for the cluster. */
-        memset(&shared_globalStats, 0, sizeof(shared_globalStats));
-        shared_globalStats->stat_reset_timestamp = GetCurrentTimestamp();
-    }
-    else if (msg->m_resettarget == RESET_ARCHIVER)
-    {
-        /* Reset the archiver statistics for the cluster. */
-        memset(&shared_archiverStats, 0, sizeof(shared_archiverStats));
-        shared_archiverStats->stat_reset_timestamp = GetCurrentTimestamp();
-    }
-
-    /*
-     * Presumably the sender of this message validated the target, don't
-     * complain here if it's not valid
-     */
-}
-
-/* ----------
- * pgstat_recv_resetsinglecounter() -
- *
- *    Reset a statistics for a single object
- * ----------
- */
-static void
-pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
-    if (!dbentry)
-        return;
-
-    /* Set the reset timestamp for the whole database */
-    dbentry->stat_reset_timestamp = GetCurrentTimestamp();
-
-    /* Remove object if it exists, ignore it if not */
-    if (msg->m_resettype == RESET_TABLE)
-    {
-        dshash_table *t =
-            dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-        dshash_delete_key(t, (void *) &(msg->m_objectid));
-    }
-    else if (msg->m_resettype == RESET_FUNCTION)
-    {
-        dshash_table *t =
-            dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
-        dshash_delete_key(t, (void *) &(msg->m_objectid));
-    }
-
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_autovac() -
- *
- *    Process an autovacuum signalling message.
- * ----------
- */
-static void
-pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    /*
-     * Store the last autovacuum time in the database's hashtable entry.
-     */
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    dbentry->last_autovac_time = msg->m_start_time;
-
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_vacuum() -
- *
- *    Process a VACUUM message.
- * ----------
- */
-static void
-pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-    PgStat_StatTabEntry *tabentry;
-    dshash_table *table;
-    /*
-     * Store the data in the table's hashtable entry.
-     */
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-    table = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-    tabentry = pgstat_get_tab_entry(table, msg->m_tableoid, true);
-
-    tabentry->n_live_tuples = msg->m_live_tuples;
-    tabentry->n_dead_tuples = msg->m_dead_tuples;
-
-    if (msg->m_autovacuum)
-    {
-        tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
-        tabentry->autovac_vacuum_count++;
-    }
-    else
-    {
-        tabentry->vacuum_timestamp = msg->m_vacuumtime;
-        tabentry->vacuum_count++;
-    }
-    dshash_release_lock(table, tabentry);
-    dshash_detach(table);
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_analyze() -
- *
- *    Process an ANALYZE message.
- * ----------
- */
-static void
-pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-    PgStat_StatTabEntry *tabentry;
-    dshash_table *table;
-
-    /*
-     * Store the data in the table's hashtable entry.
-     */
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    table = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
-    tabentry = pgstat_get_tab_entry(table, msg->m_tableoid, true);
-
-    tabentry->n_live_tuples = msg->m_live_tuples;
-    tabentry->n_dead_tuples = msg->m_dead_tuples;
-
-    /*
-     * If commanded, reset changes_since_analyze to zero.  This forgets any
-     * changes that were committed while the ANALYZE was in progress, but we
-     * have no good way to estimate how many of those there were.
-     */
-    if (msg->m_resetcounter)
-        tabentry->changes_since_analyze = 0;
-
-    if (msg->m_autovacuum)
-    {
-        tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
-        tabentry->autovac_analyze_count++;
-    }
-    else
-    {
-        tabentry->analyze_timestamp = msg->m_analyzetime;
-        tabentry->analyze_count++;
-    }
-    dshash_release_lock(table, tabentry);
-    dshash_detach(table);
-    dshash_release_lock(db_stats, dbentry);
-}
-
-
-/* ----------
- * pgstat_recv_archiver() -
- *
- *    Process a ARCHIVER message.
- * ----------
- */
-static void
-pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
-{
-    if (msg->m_failed)
-    {
-        /* Failed archival attempt */
-        ++shared_archiverStats->failed_count;
-        memcpy(shared_archiverStats->last_failed_wal, msg->m_xlog,
-               sizeof(shared_archiverStats->last_failed_wal));
-        shared_archiverStats->last_failed_timestamp = msg->m_timestamp;
-    }
-    else
-    {
-        /* Successful archival operation */
-        ++shared_archiverStats->archived_count;
-        memcpy(shared_archiverStats->last_archived_wal, msg->m_xlog,
-               sizeof(shared_archiverStats->last_archived_wal));
-        shared_archiverStats->last_archived_timestamp = msg->m_timestamp;
-    }
-}
-
-/* ----------
- * pgstat_recv_bgwriter() -
- *
- *    Process a BGWRITER message.
- * ----------
- */
-static void
-pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
-{
-    shared_globalStats->timed_checkpoints += msg->m_timed_checkpoints;
-    shared_globalStats->requested_checkpoints += msg->m_requested_checkpoints;
-    shared_globalStats->checkpoint_write_time += msg->m_checkpoint_write_time;
-    shared_globalStats->checkpoint_sync_time += msg->m_checkpoint_sync_time;
-    shared_globalStats->buf_written_checkpoints += msg->m_buf_written_checkpoints;
-    shared_globalStats->buf_written_clean += msg->m_buf_written_clean;
-    shared_globalStats->maxwritten_clean += msg->m_maxwritten_clean;
-    shared_globalStats->buf_written_backend += msg->m_buf_written_backend;
-    shared_globalStats->buf_fsync_backend += msg->m_buf_fsync_backend;
-    shared_globalStats->buf_alloc += msg->m_buf_alloc;
-}
-
-/* ----------
- * pgstat_recv_recoveryconflict() -
- *
- *    Process a RECOVERYCONFLICT message.
- * ----------
- */
-static void
-pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    switch (msg->m_reason)
-    {
-        case PROCSIG_RECOVERY_CONFLICT_DATABASE:
-
-            /*
-             * Since we drop the information about the database as soon as it
-             * replicates, there is no point in counting these conflicts.
-             */
-            break;
-        case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
-            dbentry->n_conflict_tablespace++;
-            break;
-        case PROCSIG_RECOVERY_CONFLICT_LOCK:
-            dbentry->n_conflict_lock++;
-            break;
-        case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
-            dbentry->n_conflict_snapshot++;
-            break;
-        case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
-            dbentry->n_conflict_bufferpin++;
-            break;
-        case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
-            dbentry->n_conflict_startup_deadlock++;
-            break;
-    }
-
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_deadlock() -
- *
- *    Process a DEADLOCK message.
- * ----------
- */
-static void
-pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    dbentry->n_deadlocks++;
-
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_tempfile() -
- *
- *    Process a TEMPFILE message.
- * ----------
- */
-static void
-pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    dbentry->n_temp_bytes += msg->m_filesize;
-    dbentry->n_temp_files += 1;
-
-    dshash_release_lock(db_stats, dbentry);
-}
-
-/* ----------
- * pgstat_recv_funcstat() -
+ * pgstat_funcstat() -
  *
  *    Count what the backend has done.
  * ----------
  */
-static void
-pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
+static bool
+pgstat_update_funcentry(dshash_table *funchash,
+                        PgStat_BackendFunctionEntry *stat)
 {
-    dshash_table *t;
-    PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
-    PgStat_StatDBEntry *dbentry;
     PgStat_StatFuncEntry *funcentry;
-    int            i;
     bool        found;
 
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+    funcentry = (PgStat_StatFuncEntry *)
+        dshash_find_or_insert_extended(funchash, (void *) &(stat->f_id),
+                                       &found, true);
 
-    t = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
-    /*
-     * Process all function entries in the message.
-     */
-    for (i = 0; i < msg->m_nentries; i++, funcmsg++)
+    if (funcentry == NULL)
+        return false;
+
+    if (!found)
     {
-        funcentry = (PgStat_StatFuncEntry *)
-            dshash_find_or_insert(t, (void *) &(funcmsg->f_id), &found);
-
-        if (!found)
-        {
-            /*
-             * If it's a new function entry, initialize counters to the values
-             * we just got.
-             */
-            funcentry->f_numcalls = funcmsg->f_numcalls;
-            funcentry->f_total_time = funcmsg->f_total_time;
-            funcentry->f_self_time = funcmsg->f_self_time;
-        }
-        else
-        {
-            /*
-             * Otherwise add the values to the existing entry.
-             */
-            funcentry->f_numcalls += funcmsg->f_numcalls;
-            funcentry->f_total_time += funcmsg->f_total_time;
-            funcentry->f_self_time += funcmsg->f_self_time;
-        }
-        dshash_release_lock(t, funcentry);
+        /*
+         * If it's a new function entry, initialize counters to the values
+         * we just got.
+         */
+        funcentry->f_numcalls = stat->f_counts.f_numcalls;
+        funcentry->f_total_time =
+            INSTR_TIME_GET_MICROSEC(stat->f_counts.f_total_time);
+        funcentry->f_self_time =
+            INSTR_TIME_GET_MICROSEC(stat->f_counts.f_self_time);
+    }
+    else
+    {
+        /*
+         * Otherwise add the values to the existing entry.
+         */
+        funcentry->f_numcalls += stat->f_counts.f_numcalls;
+        funcentry->f_total_time +=
+            INSTR_TIME_GET_MICROSEC(stat->f_counts.f_total_time);
+        funcentry->f_self_time +=
+            INSTR_TIME_GET_MICROSEC(stat->f_counts.f_self_time);
     }
 
-    dshash_detach(t);
-    dshash_release_lock(db_stats, dbentry);
+    dshash_release_lock(funchash, funcentry);
+
+    return true;
 }
 
 /* ----------
@@ -6233,32 +5617,30 @@ pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
  *    Arrange for dead function removal.
  * ----------
  */
-static void
-pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
+static bool
+pgstat_funcpurge(Oid dboid, Oid funcoid)
 {
     dshash_table *t;
     PgStat_StatDBEntry *dbentry;
-    int            i;
 
-    dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
+    /* wait for lock */
+    dbentry = pgstat_get_db_entry(dboid, PGSTAT_TABLE_WRITE, NULL);
 
     /*
      * No need to purge if we don't even know the database.
      */
     if (!dbentry || dbentry->functions == DSM_HANDLE_INVALID)
-        return;
+        return true;
 
     t = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
-    /*
-     * Process all function entries in the message.
-     */
-    for (i = 0; i < msg->m_nentries; i++)
-    {
-        /* Remove from hashtable if present; we don't care if it's not. */
-        dshash_delete_key(t, (void *) &(msg->m_functionid[i]));
-    }
+
+    /* Remove from hashtable if present; we don't care if it's not. */
+    dshash_delete_key(t, (void *) &funcoid);
     dshash_detach(t);
+
     dshash_release_lock(db_stats, dbentry);
+
+    return true;
 }
 
 /*
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index aeaf1f3ab4..2b64d313b9 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -1130,7 +1130,7 @@ DeadLockReport(void)
                          pgstat_get_backend_current_activity(info->pid, false));
     }
 
-    pgstat_report_deadlock();
+    pgstat_report_deadlock(false);
 
     ereport(ERROR,
             (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index afc1927250..ff97d6ab6e 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -39,31 +39,6 @@ typedef enum TrackFunctionsLevel
     TRACK_FUNC_ALL
 }            TrackFunctionsLevel;
 
-/* ----------
- * The types of backend -> collector messages
- * ----------
- */
-typedef enum StatMsgType
-{
-    PGSTAT_MTYPE_DUMMY,
-    PGSTAT_MTYPE_TABSTAT,
-    PGSTAT_MTYPE_TABPURGE,
-    PGSTAT_MTYPE_DROPDB,
-    PGSTAT_MTYPE_RESETCOUNTER,
-    PGSTAT_MTYPE_RESETSHAREDCOUNTER,
-    PGSTAT_MTYPE_RESETSINGLECOUNTER,
-    PGSTAT_MTYPE_AUTOVAC_START,
-    PGSTAT_MTYPE_VACUUM,
-    PGSTAT_MTYPE_ANALYZE,
-    PGSTAT_MTYPE_ARCHIVER,
-    PGSTAT_MTYPE_BGWRITER,
-    PGSTAT_MTYPE_FUNCSTAT,
-    PGSTAT_MTYPE_FUNCPURGE,
-    PGSTAT_MTYPE_RECOVERYCONFLICT,
-    PGSTAT_MTYPE_TEMPFILE,
-    PGSTAT_MTYPE_DEADLOCK
-} StatMsgType;
-
 /* ----------
  * The data type used for counters.
  * ----------
@@ -112,13 +87,6 @@ typedef struct PgStat_TableCounts
     PgStat_Counter t_blocks_hit;
 } PgStat_TableCounts;
 
-/* Possible targets for resetting cluster-wide shared values */
-typedef enum PgStat_Shared_Reset_Target
-{
-    RESET_ARCHIVER,
-    RESET_BGWRITER
-} PgStat_Shared_Reset_Target;
-
 /* Possible object types for resetting single counters */
 typedef enum PgStat_Single_Reset_Type
 {
@@ -177,242 +145,23 @@ typedef struct PgStat_TableXactStatus
 } PgStat_TableXactStatus;
 
 
-/* ------------------------------------------------------------
- * Message formats follow
- * ------------------------------------------------------------
- */
-
-
 /* ----------
- * PgStat_MsgHdr                The common message header
+ * PgStat_BgWriter            bgwriter statistics
  * ----------
  */
-typedef struct PgStat_MsgHdr
+typedef struct PgStat_BgWriter
 {
-    StatMsgType m_type;
-    int            m_size;
-} PgStat_MsgHdr;
-
-/* ----------
- * Space available in a message.  This will keep the UDP packets below 1K,
- * which should fit unfragmented into the MTU of the loopback interface.
- * (Larger values of PGSTAT_MAX_MSG_SIZE would work for that on most
- * platforms, but we're being conservative here.)
- * ----------
- */
-#define PGSTAT_MAX_MSG_SIZE 1000
-#define PGSTAT_MSG_PAYLOAD    (PGSTAT_MAX_MSG_SIZE - sizeof(PgStat_MsgHdr))
-
-
-/* ----------
- * PgStat_MsgDummy                A dummy message, ignored by the collector
- * ----------
- */
-typedef struct PgStat_MsgDummy
-{
-    PgStat_MsgHdr m_hdr;
-} PgStat_MsgDummy;
-
-/* ----------
- * PgStat_TableEntry            Per-table info in a MsgTabstat
- * ----------
- */
-typedef struct PgStat_TableEntry
-{
-    Oid            t_id;
-    PgStat_TableCounts t_counts;
-} PgStat_TableEntry;
-
-/* ----------
- * PgStat_MsgTabstat            Sent by the backend to report table
- *                                and buffer access statistics.
- * ----------
- */
-#define PGSTAT_NUM_TABENTRIES  \
-    ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - 3 * sizeof(int) - 2 * sizeof(PgStat_Counter))    \
-     / sizeof(PgStat_TableEntry))
-
-typedef struct PgStat_MsgTabstat
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    int            m_nentries;
-    int            m_xact_commit;
-    int            m_xact_rollback;
-    PgStat_Counter m_block_read_time;    /* times in microseconds */
-    PgStat_Counter m_block_write_time;
-    PgStat_TableEntry m_entry[PGSTAT_NUM_TABENTRIES];
-} PgStat_MsgTabstat;
-
-
-/* ----------
- * PgStat_MsgTabpurge            Sent by the backend to tell the collector
- *                                about dead tables.
- * ----------
- */
-#define PGSTAT_NUM_TABPURGE  \
-    ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int))  \
-     / sizeof(Oid))
-
-typedef struct PgStat_MsgTabpurge
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    int            m_nentries;
-    Oid            m_tableid[PGSTAT_NUM_TABPURGE];
-} PgStat_MsgTabpurge;
-
-
-/* ----------
- * PgStat_MsgDropdb                Sent by the backend to tell the collector
- *                                about a dropped database
- * ----------
- */
-typedef struct PgStat_MsgDropdb
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-} PgStat_MsgDropdb;
-
-
-/* ----------
- * PgStat_MsgResetcounter        Sent by the backend to tell the collector
- *                                to reset counters
- * ----------
- */
-typedef struct PgStat_MsgResetcounter
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-} PgStat_MsgResetcounter;
-
-/* ----------
- * PgStat_MsgResetsharedcounter Sent by the backend to tell the collector
- *                                to reset a shared counter
- * ----------
- */
-typedef struct PgStat_MsgResetsharedcounter
-{
-    PgStat_MsgHdr m_hdr;
-    PgStat_Shared_Reset_Target m_resettarget;
-} PgStat_MsgResetsharedcounter;
-
-/* ----------
- * PgStat_MsgResetsinglecounter Sent by the backend to tell the collector
- *                                to reset a single counter
- * ----------
- */
-typedef struct PgStat_MsgResetsinglecounter
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    PgStat_Single_Reset_Type m_resettype;
-    Oid            m_objectid;
-} PgStat_MsgResetsinglecounter;
-
-/* ----------
- * PgStat_MsgAutovacStart        Sent by the autovacuum daemon to signal
- *                                that a database is going to be processed
- * ----------
- */
-typedef struct PgStat_MsgAutovacStart
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    TimestampTz m_start_time;
-} PgStat_MsgAutovacStart;
-
-
-/* ----------
- * PgStat_MsgVacuum                Sent by the backend or autovacuum daemon
- *                                after VACUUM
- * ----------
- */
-typedef struct PgStat_MsgVacuum
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    Oid            m_tableoid;
-    bool        m_autovacuum;
-    TimestampTz m_vacuumtime;
-    PgStat_Counter m_live_tuples;
-    PgStat_Counter m_dead_tuples;
-} PgStat_MsgVacuum;
-
-
-/* ----------
- * PgStat_MsgAnalyze            Sent by the backend or autovacuum daemon
- *                                after ANALYZE
- * ----------
- */
-typedef struct PgStat_MsgAnalyze
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    Oid            m_tableoid;
-    bool        m_autovacuum;
-    bool        m_resetcounter;
-    TimestampTz m_analyzetime;
-    PgStat_Counter m_live_tuples;
-    PgStat_Counter m_dead_tuples;
-} PgStat_MsgAnalyze;
-
-
-/* ----------
- * PgStat_MsgArchiver            Sent by the archiver to update statistics.
- * ----------
- */
-typedef struct PgStat_MsgArchiver
-{
-    PgStat_MsgHdr m_hdr;
-    bool        m_failed;        /* Failed attempt */
-    char        m_xlog[MAX_XFN_CHARS + 1];
-    TimestampTz m_timestamp;
-} PgStat_MsgArchiver;
-
-/* ----------
- * PgStat_MsgBgWriter            Sent by the bgwriter to update statistics.
- * ----------
- */
-typedef struct PgStat_MsgBgWriter
-{
-    PgStat_MsgHdr m_hdr;
-
-    PgStat_Counter m_timed_checkpoints;
-    PgStat_Counter m_requested_checkpoints;
-    PgStat_Counter m_buf_written_checkpoints;
-    PgStat_Counter m_buf_written_clean;
-    PgStat_Counter m_maxwritten_clean;
-    PgStat_Counter m_buf_written_backend;
-    PgStat_Counter m_buf_fsync_backend;
-    PgStat_Counter m_buf_alloc;
-    PgStat_Counter m_checkpoint_write_time; /* times in milliseconds */
-    PgStat_Counter m_checkpoint_sync_time;
-} PgStat_MsgBgWriter;
-
-/* ----------
- * PgStat_MsgRecoveryConflict    Sent by the backend upon recovery conflict
- * ----------
- */
-typedef struct PgStat_MsgRecoveryConflict
-{
-    PgStat_MsgHdr m_hdr;
-
-    Oid            m_databaseid;
-    int            m_reason;
-} PgStat_MsgRecoveryConflict;
-
-/* ----------
- * PgStat_MsgTempFile    Sent by the backend upon creating a temp file
- * ----------
- */
-typedef struct PgStat_MsgTempFile
-{
-    PgStat_MsgHdr m_hdr;
-
-    Oid            m_databaseid;
-    size_t        m_filesize;
-} PgStat_MsgTempFile;
+    PgStat_Counter timed_checkpoints;
+    PgStat_Counter requested_checkpoints;
+    PgStat_Counter buf_written_checkpoints;
+    PgStat_Counter buf_written_clean;
+    PgStat_Counter maxwritten_clean;
+    PgStat_Counter buf_written_backend;
+    PgStat_Counter buf_fsync_backend;
+    PgStat_Counter buf_alloc;
+    PgStat_Counter checkpoint_write_time; /* times in milliseconds */
+    PgStat_Counter checkpoint_sync_time;
+} PgStat_BgWriter;
 
 /* ----------
  * PgStat_FunctionCounts    The actual per-function counts kept by a backend
@@ -453,78 +202,6 @@ typedef struct PgStat_FunctionEntry
     PgStat_Counter f_self_time;
 } PgStat_FunctionEntry;
 
-/* ----------
- * PgStat_MsgFuncstat            Sent by the backend to report function
- *                                usage statistics.
- * ----------
- */
-#define PGSTAT_NUM_FUNCENTRIES    \
-    ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int))  \
-     / sizeof(PgStat_FunctionEntry))
-
-typedef struct PgStat_MsgFuncstat
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    int            m_nentries;
-    PgStat_FunctionEntry m_entry[PGSTAT_NUM_FUNCENTRIES];
-} PgStat_MsgFuncstat;
-
-/* ----------
- * PgStat_MsgFuncpurge            Sent by the backend to tell the collector
- *                                about dead functions.
- * ----------
- */
-#define PGSTAT_NUM_FUNCPURGE  \
-    ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int))  \
-     / sizeof(Oid))
-
-typedef struct PgStat_MsgFuncpurge
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-    int            m_nentries;
-    Oid            m_functionid[PGSTAT_NUM_FUNCPURGE];
-} PgStat_MsgFuncpurge;
-
-/* ----------
- * PgStat_MsgDeadlock            Sent by the backend to tell the collector
- *                                about a deadlock that occurred.
- * ----------
- */
-typedef struct PgStat_MsgDeadlock
-{
-    PgStat_MsgHdr m_hdr;
-    Oid            m_databaseid;
-} PgStat_MsgDeadlock;
-
-
-/* ----------
- * PgStat_Msg                    Union over all possible messages.
- * ----------
- */
-typedef union PgStat_Msg
-{
-    PgStat_MsgHdr msg_hdr;
-    PgStat_MsgDummy msg_dummy;
-    PgStat_MsgTabstat msg_tabstat;
-    PgStat_MsgTabpurge msg_tabpurge;
-    PgStat_MsgDropdb msg_dropdb;
-    PgStat_MsgResetcounter msg_resetcounter;
-    PgStat_MsgResetsharedcounter msg_resetsharedcounter;
-    PgStat_MsgResetsinglecounter msg_resetsinglecounter;
-    PgStat_MsgAutovacStart msg_autovacuum;
-    PgStat_MsgVacuum msg_vacuum;
-    PgStat_MsgAnalyze msg_analyze;
-    PgStat_MsgArchiver msg_archiver;
-    PgStat_MsgBgWriter msg_bgwriter;
-    PgStat_MsgFuncstat msg_funcstat;
-    PgStat_MsgFuncpurge msg_funcpurge;
-    PgStat_MsgRecoveryConflict msg_recoveryconflict;
-    PgStat_MsgDeadlock msg_deadlock;
-} PgStat_Msg;
-
-
 /* ------------------------------------------------------------
  * Statistic collector data structures follow
  *
@@ -1111,7 +788,7 @@ extern char *pgstat_stat_filename;
 /*
  * BgWriter statistics counters are updated directly by bgwriter and bufmgr
  */
-extern PgStat_MsgBgWriter BgWriterStats;
+extern PgStat_BgWriter BgWriterStats;
 
 /*
  * Updated by pgstat_count_buffer_*_time macros
@@ -1135,8 +812,6 @@ extern void allow_immediate_pgstat_restart(void);
  * Functions called from backends
  * ----------
  */
-extern void pgstat_ping(void);
-
 extern void pgstat_report_stat(bool force);
 extern void pgstat_vacuum_stat(void);
 extern void pgstat_drop_database(Oid databaseid);
@@ -1154,7 +829,7 @@ extern void pgstat_report_analyze(Relation rel,
                       bool resetcounter);
 
 extern void pgstat_report_recovery_conflict(int reason);
-extern void pgstat_report_deadlock(void);
+extern void pgstat_report_deadlock(bool pending);
 
 extern void pgstat_initialize(void);
 extern void pgstat_bestart(void);
@@ -1328,4 +1003,6 @@ extern void PgstatCollectorMain(void) pg_attribute_noreturn();
 extern Size StatsShmemSize(void);
 extern void StatsShmemInit(void);
 
+extern void pgstat_cleanup_pending_stat(void);
+
 #endif                            /* PGSTAT_H */
-- 
2.16.3

From fdc3f14554cb80aa0cef1b3f75aa8978e1671cde Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 27 Sep 2018 11:15:19 +0900
Subject: [PATCH 7/8] Add conditional lock feature to dshash

Dshash currently waits for locks unconditionally. This commit adds new
interfaces for dshash_find and dshash_find_or_insert. The new
interfaces have an extra parameter "nowait" that makes them return
immediately if the required lock is not acquired.
---
 src/backend/lib/dshash.c | 58 ++++++++++++++++++++++++++++++++++++++++++++----
 src/include/lib/dshash.h |  6 ++++-
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c
index 5b133226ac..7584931515 100644
--- a/src/backend/lib/dshash.c
+++ b/src/backend/lib/dshash.c
@@ -383,6 +383,17 @@ dshash_get_hash_table_handle(dshash_table *hash_table)
  */
 void *
 dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
+{
+    return dshash_find_extended(hash_table, key, NULL, exclusive, false);
+}
+
+/*
+ * Addition to dshash_find; returns NULL immediately when nowait is true and
+ * the lock was not acquired. Lock status is set to *lock_acquired if non-NULL.
+ */
+void *
+dshash_find_extended(dshash_table *hash_table, const void *key,
+                     bool *lock_acquired, bool exclusive, bool nowait)
 {
     dshash_hash hash;
     size_t        partition;
@@ -394,8 +405,23 @@ dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
     Assert(hash_table->control->magic == DSHASH_MAGIC);
     Assert(!hash_table->find_locked);
 
-    LWLockAcquire(PARTITION_LOCK(hash_table, partition),
-                  exclusive ? LW_EXCLUSIVE : LW_SHARED);
+    if (nowait)
+    {
+        if (!LWLockConditionalAcquire(PARTITION_LOCK(hash_table, partition),
+                                      exclusive ? LW_EXCLUSIVE : LW_SHARED))
+        {
+            if (lock_acquired)
+                *lock_acquired = false;
+            return NULL;
+        }
+    }
+    else
+        LWLockAcquire(PARTITION_LOCK(hash_table, partition),
+                      exclusive ? LW_EXCLUSIVE : LW_SHARED);
+
+    if (lock_acquired)
+        *lock_acquired = true;
+
     ensure_valid_bucket_pointers(hash_table);
 
     /* Search the active bucket. */
@@ -430,6 +456,22 @@ void *
 dshash_find_or_insert(dshash_table *hash_table,
                       const void *key,
                       bool *found)
+{
+    return dshash_find_or_insert_extended(hash_table, key, found, false);
+}
+
+/*
+ * Addition to dshash_find_or_insert, returns NULL if nowait is true and lock
+ * was not acquired.
+ *
+ * Notes above dshash_find_extended() regarding locking and error handling
+ * equally apply here.
+ */
+void *
+dshash_find_or_insert_extended(dshash_table *hash_table,
+                               const void *key,
+                               bool *found,
+                               bool nowait)
 {
     dshash_hash hash;
     size_t        partition_index;
@@ -444,8 +486,16 @@ dshash_find_or_insert(dshash_table *hash_table,
     Assert(!hash_table->find_locked);
 
 restart:
-    LWLockAcquire(PARTITION_LOCK(hash_table, partition_index),
-                  LW_EXCLUSIVE);
+    if (nowait)
+    {
+        if (!LWLockConditionalAcquire(
+                PARTITION_LOCK(hash_table, partition_index),
+                LW_EXCLUSIVE))
+            return NULL;
+    }
+    else
+        LWLockAcquire(PARTITION_LOCK(hash_table, partition_index),
+                      LW_EXCLUSIVE);
     ensure_valid_bucket_pointers(hash_table);
 
     /* Search the active bucket. */
diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h
index 8598d9ed84..b207585eeb 100644
--- a/src/include/lib/dshash.h
+++ b/src/include/lib/dshash.h
@@ -89,8 +89,12 @@ extern void dshash_destroy(dshash_table *hash_table);
 /* Finding, creating, deleting entries. */
 extern void *dshash_find(dshash_table *hash_table,
             const void *key, bool exclusive);
+extern void *dshash_find_extended(dshash_table *hash_table, const void *key,
+            bool *lock_acquired, bool exclusive, bool nowait);
 extern void *dshash_find_or_insert(dshash_table *hash_table,
-                      const void *key, bool *found);
+            const void *key, bool *found);
+extern void *dshash_find_or_insert_extended(dshash_table *hash_table,
+            const void *key, bool *found, bool nowait);
 extern bool dshash_delete_key(dshash_table *hash_table, const void *key);
 extern void dshash_delete_entry(dshash_table *hash_table, void *entry);
 extern void dshash_release_lock(dshash_table *hash_table, void *entry);
-- 
2.16.3

From d926ffaa496198d5b6aec91163f3b5e38bd1dec2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 4 Jul 2018 11:46:43 +0900
Subject: [PATCH 6/8] Remove pg_stat_tmp exclusion from pg_rewind

The directory "pg_stat_tmp" no longer exists, so remove it from the
exclusion list.
---
 src/bin/pg_rewind/filemap.c | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 4ad7b2f207..075697be44 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -43,13 +43,6 @@ static bool check_file_excluded(const char *path, bool is_source);
  */
 static const char *excludeDirContents[] =
 {
-    /*
-     * Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped even
-     * when stats_temp_directory is set because PGSS_TEXT_FILE is always
-     * created there.
-     */
-    "pg_stat_tmp",                /* defined as PG_STAT_TMP_DIR */
-
     /*
      * It is generally not useful to backup the contents of this directory
      * even if the intention is to restore to another master. See backup.sgml
-- 
2.16.3

From c002149ee9eb8ef352930bedd132d70d8f83e898 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 4 Jul 2018 10:59:17 +0900
Subject: [PATCH 5/8] Let pg_stat_statements not to use PG_STAT_TMP_DIR.

This patch removes the definition because pg_stat.c no longer uses
the directory and there is no other suitable module to hand it over to. As a
tentative solution this patch moves the query text file into the permanent
stats directory. pg_basebackup and pg_rewind are aware of that
directory. They currently omit the text file but will start copying it with
this change.
---
 contrib/pg_stat_statements/pg_stat_statements.c | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 33f9a79f54..ec2fa9881c 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -86,14 +86,11 @@ PG_MODULE_MAGIC;
 #define PGSS_DUMP_FILE    PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat"
 
 /*
- * Location of external query text file.  We don't keep it in the core
- * system's stats_temp_directory.  The core system can safely use that GUC
- * setting, because the statistics collector temp file paths are set only once
- * as part of changing the GUC, but pg_stat_statements has no way of avoiding
- * race conditions.  Besides, we only expect modest, infrequent I/O for query
- * strings, so placing the file on a faster filesystem is not compelling.
+ * Location of external query text file. We only expect modest, infrequent I/O
+ * for query strings, so placing the file on a faster filesystem is not
+ * compelling.
  */
-#define PGSS_TEXT_FILE    PG_STAT_TMP_DIR "/pgss_query_texts.stat"
+#define PGSS_TEXT_FILE    PGSTAT_STAT_PERMANENT_DIRECTORY "/pgss_query_texts.stat"
 
 /* Magic number identifying the stats file format */
 static const uint32 PGSS_FILE_HEADER = 0x20171004;
-- 
2.16.3

From f8d185198145e9319cbbefd9355aa07d32606ba4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 4 Jul 2018 11:44:31 +0900
Subject: [PATCH 4/8] Documentation update

Remove all description on pg_stat_tmp directory from documentation.
---
 doc/src/sgml/backup.sgml        |  4 +---
 doc/src/sgml/config.sgml        | 19 -------------------
 doc/src/sgml/func.sgml          |  3 +--
 doc/src/sgml/monitoring.sgml    |  7 +------
 doc/src/sgml/protocol.sgml      |  2 +-
 doc/src/sgml/ref/pg_rewind.sgml |  3 +--
 doc/src/sgml/storage.sgml       |  6 ------
 7 files changed, 5 insertions(+), 39 deletions(-)

diff --git a/doc/src/sgml/backup.sgml b/doc/src/sgml/backup.sgml
index 3fa5efdd78..31e94c1fe9 100644
--- a/doc/src/sgml/backup.sgml
+++ b/doc/src/sgml/backup.sgml
@@ -1116,11 +1116,9 @@ SELECT pg_stop_backup();
    <para>
     The contents of the directories <filename>pg_dynshmem/</filename>,
     <filename>pg_notify/</filename>, <filename>pg_serial/</filename>,
-    <filename>pg_snapshots/</filename>, <filename>pg_stat_tmp/</filename>,
+    <filename>pg_snapshots/</filename>,
     and <filename>pg_subtrans/</filename> (but not the directories themselves) can be
     omitted from the backup as they will be initialized on postmaster startup.
-    If <xref linkend="guc-stats-temp-directory"/> is set and is under the data
-    directory then the contents of that directory can also be omitted.
    </para>
 
    <para>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f11b8f724c..7a2cf74e6c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6114,25 +6114,6 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
       </listitem>
      </varlistentry>
 
-     <varlistentry id="guc-stats-temp-directory" xreflabel="stats_temp_directory">
-      <term><varname>stats_temp_directory</varname> (<type>string</type>)
-      <indexterm>
-       <primary><varname>stats_temp_directory</varname> configuration parameter</primary>
-      </indexterm>
-      </term>
-      <listitem>
-       <para>
-        Sets the directory to store temporary statistics data in. This can be
-        a path relative to the data directory or an absolute path. The default
-        is <filename>pg_stat_tmp</filename>. Pointing this at a RAM-based
-        file system will decrease physical I/O requirements and can lead to
-        improved performance.
-        This parameter can only be set in the <filename>postgresql.conf</filename>
-        file or on the server command line.
-       </para>
-      </listitem>
-     </varlistentry>
-
      </variablelist>
     </sect2>
 
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 9a7f683658..b6ad7bbed5 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -15953,8 +15953,7 @@ SELECT * FROM pg_ls_dir('.') WITH ORDINALITY AS t(ls,n);
  PG_VERSION      | 15
  pg_wal          | 16
  pg_hba.conf     | 17
- pg_stat_tmp     | 18
- pg_subtrans     | 19
+ pg_subtrans     | 18
-(19 rows)
+(18 rows)
 </programlisting>
   </para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 0484cfa77a..bd50efcec8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -197,12 +197,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
   <para>
    The statistics collector transmits the collected information to other
-   <productname>PostgreSQL</productname> processes through temporary files.
-   These files are stored in the directory named by the
-   <xref linkend="guc-stats-temp-directory"/> parameter,
-   <filename>pg_stat_tmp</filename> by default.
-   For better performance, <varname>stats_temp_directory</varname> can be
-   pointed at a RAM-based file system, decreasing physical I/O requirements.
+   <productname>PostgreSQL</productname> processes through shared memory.
    When the server shuts down cleanly, a permanent copy of the statistics
    data is stored in the <filename>pg_stat</filename> subdirectory, so that
    statistics can be retained across server restarts.  When recovery is
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index f0b2145208..11f263f378 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2612,7 +2612,7 @@ The commands accepted in replication mode are:
         <para>
          <filename>pg_dynshmem</filename>, <filename>pg_notify</filename>,
          <filename>pg_replslot</filename>, <filename>pg_serial</filename>,
-         <filename>pg_snapshots</filename>, <filename>pg_stat_tmp</filename>, and
+         <filename>pg_snapshots</filename>, and
          <filename>pg_subtrans</filename> are copied as empty directories (even if
          they are symbolic links).
         </para>
diff --git a/doc/src/sgml/ref/pg_rewind.sgml b/doc/src/sgml/ref/pg_rewind.sgml
index e2662bbf81..bf9c5dd580 100644
--- a/doc/src/sgml/ref/pg_rewind.sgml
+++ b/doc/src/sgml/ref/pg_rewind.sgml
@@ -270,8 +270,7 @@ PostgreSQL documentation
       (everything except the relation files). Similarly to base backups,
       the contents of the directories <filename>pg_dynshmem/</filename>,
       <filename>pg_notify/</filename>, <filename>pg_replslot/</filename>,
-      <filename>pg_serial/</filename>, <filename>pg_snapshots/</filename>,
-      <filename>pg_stat_tmp/</filename>, and
+      <filename>pg_serial/</filename>, <filename>pg_snapshots/</filename>, and
       <filename>pg_subtrans/</filename> are omitted from the data copied
       from the source cluster. Any file or directory beginning with
       <filename>pgsql_tmp</filename> is omitted, as well as are
diff --git a/doc/src/sgml/storage.sgml b/doc/src/sgml/storage.sgml
index 8ef2ac8010..5ee7493970 100644
--- a/doc/src/sgml/storage.sgml
+++ b/doc/src/sgml/storage.sgml
@@ -120,12 +120,6 @@ Item
   subsystem</entry>
 </row>
 
-<row>
- <entry><filename>pg_stat_tmp</filename></entry>
- <entry>Subdirectory containing temporary files for the statistics
-  subsystem</entry>
-</row>
-
 <row>
  <entry><filename>pg_subtrans</filename></entry>
  <entry>Subdirectory containing subtransaction status data</entry>
-- 
2.16.3

From 66841965d11bff65b029c7f1fa8b5ecf0cd47b2f Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 29 Jun 2018 17:05:46 +0900
Subject: [PATCH 3/8] dshash-based stats collector

The stats collector no longer uses files to distribute stats numbers. They
are now stored in a dynamic shared hash. The stats entries are cached
one by one to give a consistent snapshot during a transaction. On the
other hand vacuum no longer takes a complete cache of stats.

This patch removes PG_STAT_TMP_DIR and GUC stats_temp_directory.  That
affects pg_basebackup and pg_stat_statements but this patch fixes only
pg_basebackup. The fix for pg_stat_statements is done in another patch.
---
 src/backend/postmaster/autovacuum.c           |   59 +-
 src/backend/postmaster/pgstat.c               | 1566 ++++++++++++-------------
 src/backend/replication/basebackup.c          |   36 -
 src/backend/storage/ipc/ipci.c                |    2 +
 src/backend/storage/lmgr/lwlock.c             |    3 +
 src/backend/storage/lmgr/lwlocknames.txt      |    1 +
 src/backend/utils/misc/guc.c                  |   41 -
 src/backend/utils/misc/postgresql.conf.sample |    1 -
 src/bin/initdb/initdb.c                       |    1 -
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |    2 +-
 src/include/pgstat.h                          |   50 +-
 src/include/storage/lwlock.h                  |    3 +
 12 files changed, 836 insertions(+), 929 deletions(-)

diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 978089575b..65956c0c35 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -977,7 +977,7 @@ rebuild_database_list(Oid newdb)
         PgStat_StatDBEntry *entry;
 
         /* only consider this database if it has a pgstat entry */
-        entry = pgstat_fetch_stat_dbentry(newdb);
+        entry = backend_get_db_entry(newdb, true);
         if (entry != NULL)
         {
             /* we assume it isn't found because the hash was just created */
@@ -986,6 +986,7 @@ rebuild_database_list(Oid newdb)
             /* hash_search already filled in the key */
             db->adl_score = score++;
             /* next_worker is filled in later */
+            pfree(entry);
         }
     }
 
@@ -1001,7 +1002,7 @@ rebuild_database_list(Oid newdb)
          * skip databases with no stat entries -- in particular, this gets rid
          * of dropped databases
          */
-        entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+        entry = backend_get_db_entry(avdb->adl_datid, true);
         if (entry == NULL)
             continue;
 
@@ -1013,6 +1014,7 @@ rebuild_database_list(Oid newdb)
             db->adl_score = score++;
             /* next_worker is filled in later */
         }
+        pfree(entry);
     }
 
     /* finally, insert all qualifying databases not previously inserted */
@@ -1025,7 +1027,7 @@ rebuild_database_list(Oid newdb)
         PgStat_StatDBEntry *entry;
 
         /* only consider databases with a pgstat entry */
-        entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+        entry = backend_get_db_entry(avdb->adw_datid, true);
         if (entry == NULL)
             continue;
 
@@ -1037,6 +1039,7 @@ rebuild_database_list(Oid newdb)
             db->adl_score = score++;
             /* next_worker is filled in later */
         }
+        pfree(entry);
     }
     nelems = score;
 
@@ -1235,7 +1238,7 @@ do_start_worker(void)
             continue;            /* ignore not-at-risk DBs */
 
         /* Find pgstat entry if any */
-        tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
+        tmp->adw_entry = backend_get_db_entry(tmp->adw_datid, true);
 
         /*
          * Skip a database with no pgstat entry; it means it hasn't seen any
@@ -1273,16 +1276,22 @@ do_start_worker(void)
                 break;
             }
         }
-        if (skipit)
-            continue;
+        if (!skipit)
+        {
+            /* Remember the db with oldest autovac time. */
+            if (avdb == NULL ||
+                tmp->adw_entry->last_autovac_time <
+                avdb->adw_entry->last_autovac_time)
+            {
+                if (avdb)
+                    pfree(avdb->adw_entry);
+                avdb = tmp;
+            }
+        }
 
-        /*
-         * Remember the db with oldest autovac time.  (If we are here, both
-         * tmp->entry and db->entry must be non-null.)
-         */
-        if (avdb == NULL ||
-            tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
-            avdb = tmp;
+        /* Immediately free it if not used */
+        if(avdb != tmp)
+            pfree(tmp->adw_entry);
     }
 
     /* Found a database -- process it */
@@ -1971,7 +1980,7 @@ do_autovacuum(void)
      * may be NULL if we couldn't find an entry (only happens if we are
      * forcing a vacuum for anti-wrap purposes).
      */
-    dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+    dbentry = backend_get_db_entry(MyDatabaseId, true);
 
     /* Start a transaction so our commands have one to play into. */
     StartTransactionCommand();
@@ -2021,7 +2030,7 @@ do_autovacuum(void)
     MemoryContextSwitchTo(AutovacMemCxt);
 
     /* The database hash where pgstat keeps shared relations */
-    shared = pgstat_fetch_stat_dbentry(InvalidOid);
+    shared = backend_get_db_entry(InvalidOid, true);
 
     classRel = heap_open(RelationRelationId, AccessShareLock);
 
@@ -2107,6 +2116,8 @@ do_autovacuum(void)
         relation_needs_vacanalyze(relid, relopts, classForm, tabentry,
                                   effective_multixact_freeze_max_age,
                                   &dovacuum, &doanalyze, &wraparound);
+        if (tabentry)
+            pfree(tabentry);
 
         /* Relations that need work are added to table_oids */
         if (dovacuum || doanalyze)
@@ -2186,10 +2197,11 @@ do_autovacuum(void)
         /* Fetch the pgstat entry for this table */
         tabentry = get_pgstat_tabentry_relid(relid, classForm->relisshared,
                                              shared, dbentry);
-
         relation_needs_vacanalyze(relid, relopts, classForm, tabentry,
                                   effective_multixact_freeze_max_age,
                                   &dovacuum, &doanalyze, &wraparound);
+        if (tabentry)
+            pfree(tabentry);
 
         /* ignore analyze for toast tables */
         if (dovacuum)
@@ -2758,12 +2770,10 @@ get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *shared,
     if (isshared)
     {
         if (PointerIsValid(shared))
-            tabentry = hash_search(shared->tables, &relid,
-                                   HASH_FIND, NULL);
+            tabentry = backend_get_tab_entry(shared, relid, true);
     }
     else if (PointerIsValid(dbentry))
-        tabentry = hash_search(dbentry->tables, &relid,
-                               HASH_FIND, NULL);
+        tabentry = backend_get_tab_entry(dbentry, relid, true);
 
     return tabentry;
 }
@@ -2795,8 +2805,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
     /* use fresh stats */
     autovac_refresh_stats();
 
-    shared = pgstat_fetch_stat_dbentry(InvalidOid);
-    dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+    shared = backend_get_db_entry(InvalidOid, true);
+    dbentry = backend_get_db_entry(MyDatabaseId, true);
 
     /* fetch the relation's relcache entry */
     classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid));
@@ -2827,6 +2837,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
     relation_needs_vacanalyze(relid, avopts, classForm, tabentry,
                               effective_multixact_freeze_max_age,
                               &dovacuum, &doanalyze, &wraparound);
+    if (tabentry)
+        pfree(tabentry);
 
     /* ignore ANALYZE for toast tables */
     if (classForm->relkind == RELKIND_TOASTVALUE)
@@ -2917,7 +2929,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
     }
 
     heap_freetuple(classTup);
-
+    pfree(shared);        /* NOTE(review): shared may be NULL (get_pgstat_tabentry_relid guards with PointerIsValid) -- pfree(NULL) is an error */
+    pfree(dbentry);        /* NOTE(review): likewise, dbentry is not checked for NULL before freeing */
     return tab;
 }
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 999325ae53..a3d5f4856f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -77,22 +77,10 @@
 #define PGSTAT_STAT_INTERVAL    500 /* Minimum time between stats file
                                      * updates; in milliseconds. */
 
-#define PGSTAT_RETRY_DELAY        10    /* How long to wait between checks for a
-                                     * new file; in milliseconds. */
-
-#define PGSTAT_MAX_WAIT_TIME    10000    /* Maximum time to wait for a stats
-                                         * file update; in milliseconds. */
-
-#define PGSTAT_INQ_INTERVAL        640 /* How often to ping the collector for a
-                                     * new file; in milliseconds. */
-
 #define PGSTAT_RESTART_INTERVAL 60    /* How often to attempt to restart a
                                      * failed statistics collector; in
                                      * seconds. */
 
-#define PGSTAT_POLL_LOOP_COUNT    (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
-#define PGSTAT_INQ_LOOP_COUNT    (PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)
-
 /* Minimum receive buffer size for the collector's socket. */
 #define PGSTAT_MIN_RCVBUF        (100 * 1024)
 
@@ -101,7 +89,6 @@
  * The initial size hints for the hash tables used in the collector.
  * ----------
  */
-#define PGSTAT_DB_HASH_SIZE        16
 #define PGSTAT_TAB_HASH_SIZE    512
 #define PGSTAT_FUNCTION_HASH_SIZE    512
 
@@ -127,14 +114,6 @@ bool        pgstat_track_counts = false;
 int            pgstat_track_functions = TRACK_FUNC_OFF;
 int            pgstat_track_activity_query_size = 1024;
 
-/* ----------
- * Built from GUC parameter
- * ----------
- */
-char       *pgstat_stat_directory = NULL;
-char       *pgstat_stat_filename = NULL;
-char       *pgstat_stat_tmpname = NULL;
-
 /*
  * BgWriter global statistics counters (unused in other processes).
  * Stored directly in a stats message structure so it can be sent
@@ -154,6 +133,43 @@ static time_t last_pgstat_start_time;
 
 static bool pgStatRunningInCollector = false;
 
+/* Shared stats bootstrap information */
+typedef struct StatsShmemStruct {
+    dsa_handle stats_dsa_handle;    /* handle of the DSA area holding all shared stats */
+    dshash_table_handle db_stats_handle;    /* dshash handle of the per-database stats hash */
+    dsa_pointer    global_stats;    /* PgStat_GlobalStats, allocated in the DSA area */
+    dsa_pointer    archiver_stats;    /* PgStat_ArchiverStats, allocated in the DSA area */
+} StatsShmemStruct;
+
+static StatsShmemStruct * StatsShmem = NULL;    /* bootstrap struct in plain shared memory */
+static dsa_area *area = NULL;        /* this process's mapping of the stats DSA area */
+static dshash_table *db_stats;        /* attached per-database stats hash */
+static HTAB *snapshot_db_stats;        /* backend-local snapshot; reset to NULL on (re)attach */
+static MemoryContext stats_cxt;        /* presumably holds snapshot data -- not used in this chunk, TODO confirm */
+
+/* dshash parameter for each type of table */
+static const dshash_parameters dsh_dbparams = {
+    sizeof(Oid),
+    sizeof(PgStat_StatDBEntry),
+    dshash_memcmp,
+    dshash_memhash,
+    LWTRANCHE_STATS_DB
+};
+static const dshash_parameters dsh_tblparams = {
+    sizeof(Oid),
+    sizeof(PgStat_StatTabEntry),
+    dshash_memcmp,
+    dshash_memhash,
+    LWTRANCHE_STATS_FUNC_TABLE    /* NOTE(review): table hash reuses the function-table tranche (same as dsh_funcparams) -- intended, or should this be a dedicated tranche? */
+};
+static const dshash_parameters dsh_funcparams = {
+    sizeof(Oid),
+    sizeof(PgStat_StatFuncEntry),
+    dshash_memcmp,
+    dshash_memhash,
+    LWTRANCHE_STATS_FUNC_TABLE
+};
+
 /*
  * Structures in which backends store per-table info that's waiting to be
  * sent to the collector.
@@ -250,12 +266,16 @@ static LocalPgBackendStatus *localBackendStatusTable = NULL;
 static int    localNumBackends = 0;
 
 /*
- * Cluster wide statistics, kept in the stats collector.
- * Contains statistics that are not collected per database
- * or per table.
+ * Cluster wide statistics.
+ * Contains statistics that are not collected per database or per table.
+ * shared_* are the statistics maintained by pgstats and snapshot_* are the
+ * snapshots taken on backends.
  */
-static PgStat_ArchiverStats archiverStats;
-static PgStat_GlobalStats globalStats;
+static PgStat_ArchiverStats *shared_archiverStats;
+static PgStat_ArchiverStats *snapshot_archiverStats;
+static PgStat_GlobalStats *shared_globalStats;
+static PgStat_GlobalStats *snapshot_globalStats;
+
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -285,23 +305,23 @@ static instr_time total_func_time;
 static pid_t pgstat_forkexec(void);
 #endif
 
+/* functions used in stats collector */
 static void pgstat_shutdown_handler(SIGNAL_ARGS);
 static void pgstat_quickdie_handler(SIGNAL_ARGS);
 static void pgstat_beshutdown_hook(int code, Datum arg);
 static void pgstat_sighup_handler(SIGNAL_ARGS);
 
 static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
-static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
-                     Oid tableoid, bool create);
-static void pgstat_write_statsfiles(bool permanent, bool allDbs);
-static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
-static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
-static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
-static void backend_read_statsfile(void);
-static void pgstat_read_current_status(void);
+static PgStat_StatTabEntry *pgstat_get_tab_entry(dshash_table *table, Oid tableoid, bool create);
+static void pgstat_write_statsfiles(void);
+static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry);
+static void pgstat_read_statsfiles(void);
+static void pgstat_read_db_statsfile(Oid databaseid, dshash_table *tabhash, dshash_table *funchash);
 
-static bool pgstat_write_statsfile_needed(void);
-static bool pgstat_db_requested(Oid databaseid);
+/* functions used in backends */
+static bool backend_snapshot_global_stats(void);
+static PgStat_StatFuncEntry *backend_get_func_etnry(PgStat_StatDBEntry *dbent, Oid funcid, bool oneshot);    /* XXX typo: "etnry" -> "entry"; rename at the definition and all call sites */
+static void pgstat_read_current_status(void);
 
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
@@ -320,7 +340,6 @@ static const char *pgstat_get_wait_io(WaitEventIO w);
 static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
 static void pgstat_send(void *msg, int len);
 
-static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
 static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
 static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
 static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
@@ -685,7 +704,6 @@ pgstat_reset_remove_files(const char *directory)
 void
 pgstat_reset_all(void)
 {
-    pgstat_reset_remove_files(pgstat_stat_directory);
     pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
 }
 
@@ -915,6 +933,81 @@ pgstat_send_funcstats(void)
 }
 
 
+/* ----------
+ * pgstat_attach_shared_stats() -
+ *
+ *    Attach to the existing shared stats area; true if attached (or already were)
+ * ----------
+ */
+static bool
+pgstat_attach_shared_stats(void)
+{
+    MemoryContext oldcontext;
+
+    LWLockAcquire(StatsLock, LW_EXCLUSIVE);
+    if (StatsShmem->stats_dsa_handle == DSM_HANDLE_INVALID || area != NULL)
+    {
+        LWLockRelease(StatsLock);
+        return area != NULL;    /* true only if this process was already attached */
+    }
+
+    /* top-level variables; they live for the lifetime of the process */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    area = dsa_attach(StatsShmem->stats_dsa_handle);
+    dsa_pin_mapping(area);
+    db_stats = dshash_attach(area, &dsh_dbparams,
+                             StatsShmem->db_stats_handle, 0);
+    snapshot_db_stats = NULL;
+    shared_globalStats = (PgStat_GlobalStats *)
+        dsa_get_address(area, StatsShmem->global_stats);
+    shared_archiverStats =    (PgStat_ArchiverStats *)
+        dsa_get_address(area, StatsShmem->archiver_stats);
+    MemoryContextSwitchTo(oldcontext);
+    LWLockRelease(StatsLock);
+
+    return true;
+}
+
+/* ----------
+ * pgstat_create_shared_stats() -
+ *
+ *    Create the DSA area for shared stats and publish its handles via StatsShmem
+ * ----------
+ */
+static void
+pgstat_create_shared_stats(void)
+{
+    MemoryContext oldcontext;
+
+    LWLockAcquire(StatsLock, LW_EXCLUSIVE);
+    Assert(StatsShmem->stats_dsa_handle == DSM_HANDLE_INVALID);
+
+    /* lives for the lifetime of the process */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    area = dsa_create(LWTRANCHE_STATS_DSA);
+    dsa_pin_mapping(area);
+
+    db_stats = dshash_create(area, &dsh_dbparams, 0);
+
+    /* create shared area and write bootstrap information */
+    StatsShmem->stats_dsa_handle = dsa_get_handle(area);
+    StatsShmem->global_stats =
+        dsa_allocate0(area, sizeof(PgStat_GlobalStats));
+    StatsShmem->archiver_stats =
+        dsa_allocate0(area, sizeof(PgStat_ArchiverStats));
+    StatsShmem->db_stats_handle =
+        dshash_get_hash_table_handle(db_stats);
+
+    /* cache raw pointers to the structs we just allocated */
+    snapshot_db_stats = NULL;
+    shared_globalStats = (PgStat_GlobalStats *)
+        dsa_get_address(area, StatsShmem->global_stats);
+    shared_archiverStats = (PgStat_ArchiverStats *)
+        dsa_get_address(area, StatsShmem->archiver_stats);
+    MemoryContextSwitchTo(oldcontext);
+    LWLockRelease(StatsLock);
+}
+
 /* ----------
  * pgstat_vacuum_stat() -
  *
@@ -924,10 +1017,11 @@ pgstat_send_funcstats(void)
 void
 pgstat_vacuum_stat(void)
 {
-    HTAB       *htab;
+    HTAB       *oidtab;
     PgStat_MsgTabpurge msg;
     PgStat_MsgFuncpurge f_msg;
-    HASH_SEQ_STATUS hstat;
+    dshash_table *dshtable;
+    dshash_seq_status dshstat;
     PgStat_StatDBEntry *dbentry;
     PgStat_StatTabEntry *tabentry;
     PgStat_StatFuncEntry *funcentry;
@@ -936,23 +1030,22 @@ pgstat_vacuum_stat(void)
     if (pgStatSock == PGINVALID_SOCKET)
         return;
 
-    /*
-     * If not done for this transaction, read the statistics collector stats
-     * file into some hash tables.
-     */
-    backend_read_statsfile();
+    /* If not done for this transaction, take a snapshot of stats */
+    if (!backend_snapshot_global_stats())
+        return;
 
     /*
      * Read pg_database and make a list of OIDs of all existing databases
      */
-    htab = pgstat_collect_oids(DatabaseRelationId);
+    oidtab = pgstat_collect_oids(DatabaseRelationId);
 
     /*
      * Search the database hash table for dead databases and tell the
      * collector to drop them.
      */
-    hash_seq_init(&hstat, pgStatDBHash);
-    while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
+
+    dshash_seq_init(&dshstat, db_stats, true);
+    while ((dbentry = (PgStat_StatDBEntry *) dshash_seq_next(&dshstat)) != NULL)
     {
         Oid            dbid = dbentry->databaseid;
 
@@ -960,26 +1053,24 @@ pgstat_vacuum_stat(void)
 
         /* the DB entry for shared tables (with InvalidOid) is never dropped */
         if (OidIsValid(dbid) &&
-            hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
+            hash_search(oidtab, (void *) &dbid, HASH_FIND, NULL) == NULL)
             pgstat_drop_database(dbid);
     }
 
     /* Clean up */
-    hash_destroy(htab);
+    hash_destroy(oidtab);
 
     /*
      * Lookup our own database entry; if not found, nothing more to do.
      */
-    dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
-                                                 (void *) &MyDatabaseId,
-                                                 HASH_FIND, NULL);
-    if (dbentry == NULL || dbentry->tables == NULL)
+    dbentry = backend_get_db_entry(MyDatabaseId, true);
+    if (dbentry == NULL)
         return;
-
+    
     /*
      * Similarly to above, make a list of all known relations in this DB.
      */
-    htab = pgstat_collect_oids(RelationRelationId);
+    oidtab = pgstat_collect_oids(RelationRelationId);
 
     /*
      * Initialize our messages table counter to zero
@@ -988,15 +1079,17 @@ pgstat_vacuum_stat(void)
 
     /*
      * Check for all tables listed in stats hashtable if they still exist.
+     * Stats cache is useless here so directly search the shared hash.
      */
-    hash_seq_init(&hstat, dbentry->tables);
-    while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
+    dshtable = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+    dshash_seq_init(&dshstat, dshtable, false);
+    while ((tabentry = (PgStat_StatTabEntry *) dshash_seq_next(&dshstat)) != NULL)
     {
         Oid            tabid = tabentry->tableid;
 
         CHECK_FOR_INTERRUPTS();
 
-        if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
+        if (hash_search(oidtab, (void *) &tabid, HASH_FIND, NULL) != NULL)
             continue;
 
         /*
@@ -1019,6 +1112,7 @@ pgstat_vacuum_stat(void)
             msg.m_nentries = 0;
         }
     }
+    dshash_detach(dshtable);
 
     /*
      * Send the rest
@@ -1034,29 +1128,29 @@ pgstat_vacuum_stat(void)
     }
 
     /* Clean up */
-    hash_destroy(htab);
+    hash_destroy(oidtab);
 
     /*
      * Now repeat the above steps for functions.  However, we needn't bother
      * in the common case where no function stats are being collected.
      */
-    if (dbentry->functions != NULL &&
-        hash_get_num_entries(dbentry->functions) > 0)
+    dshtable = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+    if (dshash_get_num_entries(dshtable) > 0)
     {
-        htab = pgstat_collect_oids(ProcedureRelationId);
+        oidtab = pgstat_collect_oids(ProcedureRelationId);
 
         pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
         f_msg.m_databaseid = MyDatabaseId;
         f_msg.m_nentries = 0;
 
-        hash_seq_init(&hstat, dbentry->functions);
-        while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
+        dshash_seq_init(&dshstat, dshtable, false);
+        while ((funcentry = (PgStat_StatFuncEntry *) dshash_seq_next(&dshstat)) != NULL)
         {
             Oid            funcid = funcentry->functionid;
 
             CHECK_FOR_INTERRUPTS();
 
-            if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
+            if (hash_search(oidtab, (void *) &funcid, HASH_FIND, NULL) != NULL)
                 continue;
 
             /*
@@ -1089,8 +1183,9 @@ pgstat_vacuum_stat(void)
             pgstat_send(&f_msg, len);
         }
 
-        hash_destroy(htab);
+        hash_destroy(oidtab);
     }
+    dshash_detach(dshtable);
 }
 
 
@@ -1457,24 +1552,6 @@ pgstat_ping(void)
     pgstat_send(&msg, sizeof(msg));
 }
 
-/* ----------
- * pgstat_send_inquiry() -
- *
- *    Notify collector that we need fresh data.
- * ----------
- */
-static void
-pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
-{
-    PgStat_MsgInquiry msg;
-
-    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
-    msg.clock_time = clock_time;
-    msg.cutoff_time = cutoff_time;
-    msg.databaseid = databaseid;
-    pgstat_send(&msg, sizeof(msg));
-}
-
 
 /*
  * Initialize function call usage data.
@@ -2289,18 +2366,10 @@ pgstat_twophase_postabort(TransactionId xid, uint16 info,
 PgStat_StatDBEntry *
 pgstat_fetch_stat_dbentry(Oid dbid)
 {
-    /*
-     * If not done for this transaction, read the statistics collector stats
-     * file into some hash tables.
-     */
-    backend_read_statsfile();
+    PgStat_StatDBEntry *dbentry;
 
-    /*
-     * Lookup the requested database; return NULL if not found
-     */
-    return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
-                                              (void *) &dbid,
-                                              HASH_FIND, NULL);
+    dbentry = backend_get_db_entry(dbid, false);
+    return dbentry;
 }
 
 
@@ -2316,47 +2385,28 @@ pgstat_fetch_stat_dbentry(Oid dbid)
 PgStat_StatTabEntry *
 pgstat_fetch_stat_tabentry(Oid relid)
 {
-    Oid            dbid;
     PgStat_StatDBEntry *dbentry;
     PgStat_StatTabEntry *tabentry;
 
-    /*
-     * If not done for this transaction, read the statistics collector stats
-     * file into some hash tables.
-     */
-    backend_read_statsfile();
+    /* Lookup our database, then look in its table hash table. */
+    dbentry = backend_get_db_entry(MyDatabaseId, false);
+    if (dbentry == NULL)
+        return NULL;
 
-    /*
-     * Lookup our database, then look in its table hash table.
-     */
-    dbid = MyDatabaseId;
-    dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
-                                                 (void *) &dbid,
-                                                 HASH_FIND, NULL);
-    if (dbentry != NULL && dbentry->tables != NULL)
-    {
-        tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
-                                                       (void *) &relid,
-                                                       HASH_FIND, NULL);
-        if (tabentry)
-            return tabentry;
-    }
+    tabentry = backend_get_tab_entry(dbentry, relid, false);
+    if (tabentry != NULL)
+        return tabentry;
 
     /*
      * If we didn't find it, maybe it's a shared table.
      */
-    dbid = InvalidOid;
-    dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
-                                                 (void *) &dbid,
-                                                 HASH_FIND, NULL);
-    if (dbentry != NULL && dbentry->tables != NULL)
-    {
-        tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
-                                                       (void *) &relid,
-                                                       HASH_FIND, NULL);
-        if (tabentry)
-            return tabentry;
-    }
+    dbentry = backend_get_db_entry(InvalidOid, false);
+    if (dbentry == NULL)
+        return NULL;
+
+    tabentry = backend_get_tab_entry(dbentry, relid, false);
+    if (tabentry != NULL)
+        return tabentry;
 
     return NULL;
 }
@@ -2375,17 +2425,14 @@ pgstat_fetch_stat_funcentry(Oid func_id)
     PgStat_StatDBEntry *dbentry;
     PgStat_StatFuncEntry *funcentry = NULL;
 
-    /* load the stats file if needed */
-    backend_read_statsfile();
+    /* Lookup our database, then find the requested function */
+    dbentry = pgstat_get_db_entry(MyDatabaseId, false);    /* XXX: collector-side helper (it asserts pgStatRunningInCollector) -- should this be backend_get_db_entry() like the other fetch functions? */
+    if (dbentry == NULL)
+        return NULL;
 
-    /* Lookup our database, then find the requested function.  */
-    dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
-    if (dbentry != NULL && dbentry->functions != NULL)
-    {
-        funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
-                                                         (void *) &func_id,
-                                                         HASH_FIND, NULL);
-    }
+    funcentry = backend_get_func_etnry(dbentry, func_id, false);
+    if (funcentry == NULL)
+        return NULL;
 
     return funcentry;
 }
@@ -2461,9 +2508,11 @@ pgstat_fetch_stat_numbackends(void)
 PgStat_ArchiverStats *
 pgstat_fetch_stat_archiver(void)
 {
-    backend_read_statsfile();
+    /* If not done for this transaction, take a stats snapshot */
+    if (!backend_snapshot_global_stats())
+        return NULL;
 
-    return &archiverStats;
+    return snapshot_archiverStats;
 }
 
 
@@ -2478,9 +2527,11 @@ pgstat_fetch_stat_archiver(void)
 PgStat_GlobalStats *
 pgstat_fetch_global(void)
 {
-    backend_read_statsfile();
+    /* If not done for this transaction, take a stats snapshot */
+    if (!backend_snapshot_global_stats())
+        return NULL;
 
-    return &globalStats;
+    return snapshot_globalStats;
 }
 
 
@@ -4186,18 +4237,14 @@ PgstatCollectorMain(void)
     pqsignal(SIGTTOU, SIG_DFL);
     pqsignal(SIGCONT, SIG_DFL);
     pqsignal(SIGWINCH, SIG_DFL);
-    PG_SETMASK(&UnBlockSig);
 
-    /*
-     * Identify myself via ps
-     */
-    init_ps_display("stats collector", "", "", "");
+    PG_SETMASK(&UnBlockSig);
 
     /*
      * Read in existing stats files or initialize the stats to zero.
      */
     pgStatRunningInCollector = true;
-    pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
+    pgstat_read_statsfiles();
 
     /*
      * Loop to process messages until we get SIGQUIT or detect ungraceful
@@ -4239,13 +4286,6 @@ PgstatCollectorMain(void)
                 ProcessConfigFile(PGC_SIGHUP);
             }
 
-            /*
-             * Write the stats file(s) if a new request has arrived that is
-             * not satisfied by existing file(s).
-             */
-            if (pgstat_write_statsfile_needed())
-                pgstat_write_statsfiles(false, false);
-
             /*
              * Try to receive and process a message.  This will not block,
              * since the socket is set to non-blocking mode.
@@ -4294,10 +4334,6 @@ PgstatCollectorMain(void)
                 case PGSTAT_MTYPE_DUMMY:
                     break;
 
-                case PGSTAT_MTYPE_INQUIRY:
-                    pgstat_recv_inquiry((PgStat_MsgInquiry *) &msg, len);
-                    break;
-
                 case PGSTAT_MTYPE_TABSTAT:
                     pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
                     break;
@@ -4386,9 +4422,7 @@ PgstatCollectorMain(void)
          * fixes that, so don't sleep indefinitely.  This is a crock of the
          * first water, but until somebody wants to debug exactly what's
          * happening there, this is the best we can do.  The two-second
-         * timeout matches our pre-9.2 behavior, and needs to be short enough
-         * to not provoke "using stale statistics" complaints from
-         * backend_read_statsfile.
+         * timeout matches our pre-9.2 behavior.
          */
         wr = WaitLatchOrSocket(MyLatch,
                                WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT,
@@ -4408,7 +4442,7 @@ PgstatCollectorMain(void)
     /*
      * Save the final stats to reuse at next startup.
      */
-    pgstat_write_statsfiles(true, true);
+    pgstat_write_statsfiles();
 
     exit(0);
 }
@@ -4466,14 +4500,14 @@ pgstat_shutdown_handler(SIGNAL_ARGS)
 }
 
 /*
- * Subroutine to clear stats in a database entry
+ * Subroutine to reset stats in a shared database entry
  *
  * Tables and functions hashes are initialized to empty.
  */
 static void
 reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
 {
-    HASHCTL        hash_ctl;
+    dshash_table *tbl;
 
     dbentry->n_xact_commit = 0;
     dbentry->n_xact_rollback = 0;
@@ -4499,20 +4533,17 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
     dbentry->stat_reset_timestamp = GetCurrentTimestamp();
     dbentry->stats_timestamp = 0;
 
-    memset(&hash_ctl, 0, sizeof(hash_ctl));
-    hash_ctl.keysize = sizeof(Oid);
-    hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
-    dbentry->tables = hash_create("Per-database table",
-                                  PGSTAT_TAB_HASH_SIZE,
-                                  &hash_ctl,
-                                  HASH_ELEM | HASH_BLOBS);
 
-    hash_ctl.keysize = sizeof(Oid);
-    hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
-    dbentry->functions = hash_create("Per-database function",
-                                     PGSTAT_FUNCTION_HASH_SIZE,
-                                     &hash_ctl,
-                                     HASH_ELEM | HASH_BLOBS);
+    tbl = dshash_create(area, &dsh_tblparams, 0);
+    dbentry->tables = dshash_get_hash_table_handle(tbl);
+    dshash_detach(tbl);
+
+    tbl = dshash_create(area, &dsh_funcparams, 0);
+    dbentry->functions = dshash_get_hash_table_handle(tbl);
+    dshash_detach(tbl);
+
+    dbentry->snapshot_tables = NULL;
+    dbentry->snapshot_functions = NULL;
 }
 
 /*
@@ -4525,15 +4556,18 @@ pgstat_get_db_entry(Oid databaseid, bool create)
 {
     PgStat_StatDBEntry *result;
     bool        found;
-    HASHACTION    action = (create ? HASH_ENTER : HASH_FIND);
+
+    Assert(pgStatRunningInCollector);
 
     /* Lookup or create the hash table entry for this database */
-    result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
-                                                &databaseid,
-                                                action, &found);
+    if (create)
+        result = (PgStat_StatDBEntry *)
+            dshash_find_or_insert(db_stats,    &databaseid, &found);
+    else
+        result = (PgStat_StatDBEntry *)    dshash_find(db_stats, &databaseid, true);    /* NOTE(review): 'true' here vs 'false' in pgstat_get_tab_entry -- confirm intended lock mode; the entry is returned still locked */
+
+    if (!create && !found)
+        return NULL;
+    if (!create)
+        return result;
 
     /*
      * If not found, initialize the new one.  This creates empty hash tables
@@ -4545,23 +4579,23 @@ pgstat_get_db_entry(Oid databaseid, bool create)
     return result;
 }
 
-
 /*
  * Lookup the hash table entry for the specified table. If no hash
  * table entry exists, initialize it, if the create parameter is true.
  * Else, return NULL.
  */
 static PgStat_StatTabEntry *
-pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
+pgstat_get_tab_entry(dshash_table *table, Oid tableoid, bool create)
 {
     PgStat_StatTabEntry *result;
     bool        found;
-    HASHACTION    action = (create ? HASH_ENTER : HASH_FIND);
 
     /* Lookup or create the hash table entry for this table */
-    result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
-                                                 &tableoid,
-                                                 action, &found);
+    if (create)
+        result = (PgStat_StatTabEntry *)
+            dshash_find_or_insert(table, &tableoid, &found);
+    else
+        result = (PgStat_StatTabEntry *) dshash_find(table, &tableoid, false);
 
     if (!create && !found)
         return NULL;
@@ -4599,25 +4633,20 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
  * pgstat_write_statsfiles() -
  *        Write the global statistics file, as well as requested DB files.
  *
- *    'permanent' specifies writing to the permanent files not temporary ones.
- *    When true (happens only when the collector is shutting down), also remove
- *    the temporary files so that backends starting up under a new postmaster
- *    can't read old data before the new collector is ready.
- *
  *    When 'allDbs' is false, only the requested databases (listed in
  *    pending_write_requests) will be written; otherwise, all databases
  *    will be written.
  * ----------
  */
 static void
-pgstat_write_statsfiles(bool permanent, bool allDbs)
+pgstat_write_statsfiles(void)
 {
-    HASH_SEQ_STATUS hstat;
+    dshash_seq_status hstat;
     PgStat_StatDBEntry *dbentry;
     FILE       *fpout;
     int32        format_id;
-    const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
-    const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
+    const char *tmpfile = PGSTAT_STAT_PERMANENT_TMPFILE;
+    const char *statfile = PGSTAT_STAT_PERMANENT_FILENAME;
     int            rc;
 
     elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -4638,7 +4667,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
     /*
      * Set the timestamp of the stats file.
      */
-    globalStats.stats_timestamp = GetCurrentTimestamp();
+    shared_globalStats->stats_timestamp = GetCurrentTimestamp();
 
     /*
      * Write the file header --- currently just a format ID.
@@ -4650,32 +4679,29 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
     /*
      * Write global stats struct
      */
-    rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
+    rc = fwrite(shared_globalStats, sizeof(*shared_globalStats), 1, fpout);
     (void) rc;                    /* we'll check for error with ferror */
 
     /*
      * Write archiver stats struct
      */
-    rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
+    rc = fwrite(shared_archiverStats, sizeof(*shared_archiverStats), 1, fpout);
     (void) rc;                    /* we'll check for error with ferror */
 
     /*
      * Walk through the database table.
      */
-    hash_seq_init(&hstat, pgStatDBHash);
-    while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
+    dshash_seq_init(&hstat, db_stats, false);
+    while ((dbentry = (PgStat_StatDBEntry *) dshash_seq_next(&hstat)) != NULL)
     {
         /*
          * Write out the table and function stats for this DB into the
          * appropriate per-DB stat file, if required.
          */
-        if (allDbs || pgstat_db_requested(dbentry->databaseid))
-        {
-            /* Make DB's timestamp consistent with the global stats */
-            dbentry->stats_timestamp = globalStats.stats_timestamp;
+        /* Make DB's timestamp consistent with the global stats */
+        dbentry->stats_timestamp = shared_globalStats->stats_timestamp;
 
-            pgstat_write_db_statsfile(dbentry, permanent);
-        }
+        pgstat_write_db_statsfile(dbentry);
 
         /*
          * Write out the DB entry. We don't write the tables or functions
@@ -4719,9 +4745,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
         unlink(tmpfile);
     }
 
-    if (permanent)
-        unlink(pgstat_stat_filename);
-
     /*
      * Now throw away the list of requests.  Note that requests sent after we
      * started the write are still waiting on the network socket.
@@ -4735,15 +4758,14 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
  * of length len.
  */
 static void
-get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
+get_dbstat_filename(bool tempname, Oid databaseid,
                     char *filename, int len)
 {
     int            printed;
 
     /* NB -- pgstat_reset_remove_files knows about the pattern this uses */
     printed = snprintf(filename, len, "%s/db_%u.%s",
-                       permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
-                       pgstat_stat_directory,
+                       PGSTAT_STAT_PERMANENT_DIRECTORY,
                        databaseid,
                        tempname ? "tmp" : "stat");
     if (printed >= len)
@@ -4761,10 +4783,10 @@ get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
  * ----------
  */
 static void
-pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
+pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry)
 {
-    HASH_SEQ_STATUS tstat;
-    HASH_SEQ_STATUS fstat;
+    dshash_seq_status tstat;
+    dshash_seq_status fstat;
     PgStat_StatTabEntry *tabentry;
     PgStat_StatFuncEntry *funcentry;
     FILE       *fpout;
@@ -4773,9 +4795,10 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
     int            rc;
     char        tmpfile[MAXPGPATH];
     char        statfile[MAXPGPATH];
+    dshash_table *tbl;
 
-    get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
-    get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
+    get_dbstat_filename(true, dbid, tmpfile, MAXPGPATH);
+    get_dbstat_filename(false, dbid, statfile, MAXPGPATH);
 
     elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -4802,24 +4825,28 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
     /*
      * Walk through the database's access stats per table.
      */
-    hash_seq_init(&tstat, dbentry->tables);
-    while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
+    tbl = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+    dshash_seq_init(&tstat, tbl, false);
+    while ((tabentry = (PgStat_StatTabEntry *) dshash_seq_next(&tstat)) != NULL)
     {
         fputc('T', fpout);
         rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
         (void) rc;                /* we'll check for error with ferror */
     }
+    dshash_detach(tbl);
 
     /*
      * Walk through the database's function stats table.
      */
-    hash_seq_init(&fstat, dbentry->functions);
-    while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
+    tbl = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+    dshash_seq_init(&fstat, tbl, false);
+    while ((funcentry = (PgStat_StatFuncEntry *) dshash_seq_next(&fstat)) != NULL)
     {
         fputc('F', fpout);
         rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
         (void) rc;                /* we'll check for error with ferror */
     }
+    dshash_detach(tbl);
 
     /*
      * No more output to be done. Close the temp file and replace the old
@@ -4853,76 +4880,45 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
                         tmpfile, statfile)));
         unlink(tmpfile);
     }
-
-    if (permanent)
-    {
-        get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
-
-        elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
-        unlink(statfile);
-    }
 }
 
 /* ----------
  * pgstat_read_statsfiles() -
  *
- *    Reads in some existing statistics collector files and returns the
- *    databases hash table that is the top level of the data.
+ *    Reads in some existing statistics collector files into the shared stats
+ *    hash.
  *
- *    If 'onlydb' is not InvalidOid, it means we only want data for that DB
- *    plus the shared catalogs ("DB 0").  We'll still populate the DB hash
- *    table for all databases, but we don't bother even creating table/function
- *    hash tables for other databases.
- *
- *    'permanent' specifies reading from the permanent files not temporary ones.
- *    When true (happens only when the collector is starting up), remove the
- *    files after reading; the in-memory status is now authoritative, and the
- *    files would be out of date in case somebody else reads them.
- *
- *    If a 'deep' read is requested, table/function stats are read, otherwise
- *    the table/function hash tables remain empty.
  * ----------
  */
-static HTAB *
-pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
+static void
+pgstat_read_statsfiles(void)
 {
     PgStat_StatDBEntry *dbentry;
     PgStat_StatDBEntry dbbuf;
-    HASHCTL        hash_ctl;
-    HTAB       *dbhash;
     FILE       *fpin;
     int32        format_id;
     bool        found;
-    const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
+    const char *statfile = PGSTAT_STAT_PERMANENT_FILENAME;
+    dshash_table *tblstats = NULL;
+    dshash_table *funcstats = NULL;
 
+    Assert(pgStatRunningInCollector);
     /*
      * The tables will live in pgStatLocalContext.
      */
     pgstat_setup_memcxt();
 
     /*
-     * Create the DB hashtable
+     * Create the DB hashtable and global stats area
      */
-    memset(&hash_ctl, 0, sizeof(hash_ctl));
-    hash_ctl.keysize = sizeof(Oid);
-    hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
-    hash_ctl.hcxt = pgStatLocalContext;
-    dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
-                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-    /*
-     * Clear out global and archiver statistics so they start from zero in
-     * case we can't load an existing statsfile.
-     */
-    memset(&globalStats, 0, sizeof(globalStats));
-    memset(&archiverStats, 0, sizeof(archiverStats));
+    pgstat_create_shared_stats();
 
     /*
      * Set the current timestamp (will be kept only in case we can't load an
      * existing statsfile).
      */
-    globalStats.stat_reset_timestamp = GetCurrentTimestamp();
-    archiverStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
+    shared_globalStats->stat_reset_timestamp = GetCurrentTimestamp();
+    shared_archiverStats->stat_reset_timestamp = shared_globalStats->stat_reset_timestamp;
 
     /*
      * Try to open the stats file. If it doesn't exist, the backends simply
@@ -4940,7 +4936,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
                     (errcode_for_file_access(),
                      errmsg("could not open statistics file \"%s\": %m",
                             statfile)));
-        return dbhash;
+        return;
     }
 
     /*
@@ -4957,11 +4953,11 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
     /*
      * Read global stats struct
      */
-    if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
+    if (fread(shared_globalStats, 1, sizeof(*shared_globalStats), fpin) != sizeof(*shared_globalStats))
     {
         ereport(pgStatRunningInCollector ? LOG : WARNING,
                 (errmsg("corrupted statistics file \"%s\"", statfile)));
-        memset(&globalStats, 0, sizeof(globalStats));
+        memset(shared_globalStats, 0, sizeof(*shared_globalStats));
         goto done;
     }
 
@@ -4972,17 +4968,16 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
      * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
      * an unusual scenario.
      */
-    if (pgStatRunningInCollector)
-        globalStats.stats_timestamp = 0;
+    shared_globalStats->stats_timestamp = 0;
 
     /*
      * Read archiver stats struct
      */
-    if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
+    if (fread(shared_archiverStats, 1, sizeof(*shared_archiverStats), fpin) != sizeof(*shared_archiverStats))
     {
         ereport(pgStatRunningInCollector ? LOG : WARNING,
                 (errmsg("corrupted statistics file \"%s\"", statfile)));
-        memset(&archiverStats, 0, sizeof(archiverStats));
+        memset(shared_archiverStats, 0, sizeof(*shared_archiverStats));
         goto done;
     }
 
@@ -5011,12 +5006,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
                 /*
                  * Add to the DB hash
                  */
-                dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
-                                                             (void *) &dbbuf.databaseid,
-                                                             HASH_ENTER,
-                                                             &found);
+                dbentry = (PgStat_StatDBEntry *)
+                    dshash_find_or_insert(db_stats, (void *) &dbbuf.databaseid,
+                                          &found);
                 if (found)
                 {
+                    dshash_release_lock(db_stats, dbentry);
                     ereport(pgStatRunningInCollector ? LOG : WARNING,
                             (errmsg("corrupted statistics file \"%s\"",
                                     statfile)));
@@ -5024,8 +5019,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
                 }
 
                 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
-                dbentry->tables = NULL;
-                dbentry->functions = NULL;
+                dbentry->tables = DSM_HANDLE_INVALID;
+                dbentry->functions = DSM_HANDLE_INVALID;
 
                 /*
                  * In the collector, disregard the timestamp we read from the
@@ -5033,47 +5028,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
                  * stats file immediately upon the first request from any
                  * backend.
                  */
-                if (pgStatRunningInCollector)
-                    dbentry->stats_timestamp = 0;
-
-                /*
-                 * Don't create tables/functions hashtables for uninteresting
-                 * databases.
-                 */
-                if (onlydb != InvalidOid)
-                {
-                    if (dbbuf.databaseid != onlydb &&
-                        dbbuf.databaseid != InvalidOid)
-                        break;
-                }
-
-                memset(&hash_ctl, 0, sizeof(hash_ctl));
-                hash_ctl.keysize = sizeof(Oid);
-                hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
-                hash_ctl.hcxt = pgStatLocalContext;
-                dbentry->tables = hash_create("Per-database table",
-                                              PGSTAT_TAB_HASH_SIZE,
-                                              &hash_ctl,
-                                              HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-                hash_ctl.keysize = sizeof(Oid);
-                hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
-                hash_ctl.hcxt = pgStatLocalContext;
-                dbentry->functions = hash_create("Per-database function",
-                                                 PGSTAT_FUNCTION_HASH_SIZE,
-                                                 &hash_ctl,
-                                                 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+                Assert(pgStatRunningInCollector);
+                dbentry->stats_timestamp = 0;
 
                 /*
                  * If requested, read the data from the database-specific
                  * file.  Otherwise we just leave the hashtables empty.
                  */
-                if (deep)
-                    pgstat_read_db_statsfile(dbentry->databaseid,
-                                             dbentry->tables,
-                                             dbentry->functions,
-                                             permanent);
-
+                tblstats = dshash_create(area, &dsh_tblparams, 0);
+                dbentry->tables = dshash_get_hash_table_handle(tblstats);
+                funcstats = dshash_create(area, &dsh_funcparams, 0);
+                dbentry->functions =
+                    dshash_get_hash_table_handle(funcstats);
+                dshash_release_lock(db_stats, dbentry);
+                pgstat_read_db_statsfile(dbentry->databaseid,
+                                         tblstats, funcstats);
+                dshash_detach(tblstats);
+                dshash_detach(funcstats);
                 break;
 
             case 'E':
@@ -5090,34 +5061,47 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 done:
     FreeFile(fpin);
 
-    /* If requested to read the permanent file, also get rid of it. */
-    if (permanent)
-    {
-        elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
-        unlink(statfile);
-    }
+    elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
+    unlink(statfile);
 
-    return dbhash;
+    return;
 }
 
 
+Size
+StatsShmemSize(void)
+{
+    return sizeof(StatsShmemStruct);
+}
+
+void
+StatsShmemInit(void)
+{
+    bool    found;
+
+    StatsShmem = (StatsShmemStruct *)
+        ShmemInitStruct("Stats area", StatsShmemSize(),
+                        &found);
+    if (!IsUnderPostmaster)
+    {
+        Assert(!found);
+
+        StatsShmem->stats_dsa_handle = DSM_HANDLE_INVALID;
+    }
+    else
+        Assert(found);
+}
+
 /* ----------
  * pgstat_read_db_statsfile() -
  *
- *    Reads in the existing statistics collector file for the given database,
- *    filling the passed-in tables and functions hash tables.
- *
- *    As in pgstat_read_statsfiles, if the permanent file is requested, it is
- *    removed after reading.
- *
- *    Note: this code has the ability to skip storing per-table or per-function
- *    data, if NULL is passed for the corresponding hashtable.  That's not used
- *    at the moment though.
+ *    Reads in the permanent statistics collector file and creates shared
+ *    statistics tables. The file is removed after reading.
  * ----------
  */
 static void
-pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
-                         bool permanent)
+pgstat_read_db_statsfile(Oid databaseid,
+                         dshash_table *tabhash, dshash_table *funchash)
 {
     PgStat_StatTabEntry *tabentry;
     PgStat_StatTabEntry tabbuf;
@@ -5128,7 +5112,8 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
     bool        found;
     char        statfile[MAXPGPATH];
 
-    get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
+    Assert(pgStatRunningInCollector);
+    get_dbstat_filename(false, databaseid, statfile, MAXPGPATH);
 
     /*
      * Try to open the stats file. If it doesn't exist, the backends simply
@@ -5187,12 +5172,13 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
                 if (tabhash == NULL)
                     break;
 
-                tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
-                                                               (void *) &tabbuf.tableid,
-                                                               HASH_ENTER, &found);
+                tabentry = (PgStat_StatTabEntry *)
+                    dshash_find_or_insert(tabhash,
+                                          (void *) &tabbuf.tableid, &found);
 
                 if (found)
                 {
+                    dshash_release_lock(tabhash, tabentry);
                     ereport(pgStatRunningInCollector ? LOG : WARNING,
                             (errmsg("corrupted statistics file \"%s\"",
                                     statfile)));
@@ -5200,6 +5186,7 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
                 }
 
                 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
+                dshash_release_lock(tabhash, tabentry);
                 break;
 
                 /*
@@ -5221,9 +5208,9 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
                 if (funchash == NULL)
                     break;
 
-                funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
-                                                                 (void *) &funcbuf.functionid,
-                                                                 HASH_ENTER, &found);
+                funcentry = (PgStat_StatFuncEntry *)
+                    dshash_find_or_insert(funchash,
+                                          (void *) &funcbuf.functionid, &found);
 
                 if (found)
                 {
@@ -5234,6 +5221,7 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
                 }
 
                 memcpy(funcentry, &funcbuf, sizeof(funcbuf));
+                dshash_release_lock(funchash, funcentry);
                 break;
 
                 /*
@@ -5253,276 +5241,355 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
 done:
     FreeFile(fpin);
 
-    if (permanent)
-    {
-        elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
-        unlink(statfile);
-    }
+    elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
+    unlink(statfile);
 }
 
 /* ----------
- * pgstat_read_db_statsfile_timestamp() -
+ * backend_clean_snapshot_callback() -
  *
- *    Attempt to determine the timestamp of the last db statfile write.
- *    Returns true if successful; the timestamp is stored in *ts.
- *
- *    This needs to be careful about handling databases for which no stats file
- *    exists, such as databases without a stat entry or those not yet written:
- *
- *    - if there's a database entry in the global file, return the corresponding
- *    stats_timestamp value.
- *
- *    - if there's no db stat entry (e.g. for a new or inactive database),
- *    there's no stats_timestamp value, but also nothing to write so we return
- *    the timestamp of the global statfile.
+ *    This is usually called with arg = NULL when the memory context where the
+ *    current snapshot has been taken is reset. In that case we don't bother
+ *    releasing the memory explicitly, since the context reset frees it.
  * ----------
  */
-static bool
-pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
-                                   TimestampTz *ts)
+static void
+backend_clean_snapshot_callback(void *arg)
 {
-    PgStat_StatDBEntry dbentry;
-    PgStat_GlobalStats myGlobalStats;
-    PgStat_ArchiverStats myArchiverStats;
-    FILE       *fpin;
-    int32        format_id;
-    const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
-
-    /*
-     * Try to open the stats file.  As above, anything but ENOENT is worthy of
-     * complaining about.
-     */
-    if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
+    if (arg != NULL)
     {
-        if (errno != ENOENT)
-            ereport(pgStatRunningInCollector ? LOG : WARNING,
-                    (errcode_for_file_access(),
-                     errmsg("could not open statistics file \"%s\": %m",
-                            statfile)));
-        return false;
-    }
+        /* explicitly called, so explicitly free resources */
+        if (snapshot_globalStats)
+            pfree(snapshot_globalStats);
 
-    /*
-     * Verify it's of the expected format.
-     */
-    if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
-        format_id != PGSTAT_FILE_FORMAT_ID)
-    {
-        ereport(pgStatRunningInCollector ? LOG : WARNING,
-                (errmsg("corrupted statistics file \"%s\"", statfile)));
-        FreeFile(fpin);
-        return false;
-    }
+        if (snapshot_archiverStats)
+            pfree(snapshot_archiverStats);
 
-    /*
-     * Read global stats struct
-     */
-    if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
-              fpin) != sizeof(myGlobalStats))
-    {
-        ereport(pgStatRunningInCollector ? LOG : WARNING,
-                (errmsg("corrupted statistics file \"%s\"", statfile)));
-        FreeFile(fpin);
-        return false;
-    }
-
-    /*
-     * Read archiver stats struct
-     */
-    if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
-              fpin) != sizeof(myArchiverStats))
-    {
-        ereport(pgStatRunningInCollector ? LOG : WARNING,
-                (errmsg("corrupted statistics file \"%s\"", statfile)));
-        FreeFile(fpin);
-        return false;
-    }
-
-    /* By default, we're going to return the timestamp of the global file. */
-    *ts = myGlobalStats.stats_timestamp;
-
-    /*
-     * We found an existing collector stats file.  Read it and look for a
-     * record for the requested database.  If found, use its timestamp.
-     */
-    for (;;)
-    {
-        switch (fgetc(fpin))
+        if (snapshot_db_stats)
         {
-                /*
-                 * 'D'    A PgStat_StatDBEntry struct describing a database
-                 * follows.
-                 */
-            case 'D':
-                if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
-                          fpin) != offsetof(PgStat_StatDBEntry, tables))
-                {
-                    ereport(pgStatRunningInCollector ? LOG : WARNING,
-                            (errmsg("corrupted statistics file \"%s\"",
-                                    statfile)));
-                    goto done;
-                }
+            HASH_SEQ_STATUS seq;
+            PgStat_StatDBEntry *dbent;
 
-                /*
-                 * If this is the DB we're looking for, save its timestamp and
-                 * we're done.
-                 */
-                if (dbentry.databaseid == databaseid)
-                {
-                    *ts = dbentry.stats_timestamp;
-                    goto done;
-                }
-
-                break;
-
-            case 'E':
-                goto done;
-
-            default:
-                ereport(pgStatRunningInCollector ? LOG : WARNING,
-                        (errmsg("corrupted statistics file \"%s\"",
-                                statfile)));
-                goto done;
+            hash_seq_init(&seq, snapshot_db_stats);
+            while ((dbent = hash_seq_search(&seq)) != NULL)
+            {
+                if (dbent->snapshot_tables)
+                    hash_destroy(dbent->snapshot_tables);
+                if (dbent->snapshot_functions)
+                    hash_destroy(dbent->snapshot_functions);
+            }
+            hash_destroy(snapshot_db_stats);
         }
     }
 
-done:
-    FreeFile(fpin);
-    return true;
+    /* mark that the resources are not allocated */
+    snapshot_globalStats = NULL;
+    snapshot_archiverStats = NULL;
+    snapshot_db_stats = NULL;
 }
 
 /*
- * If not already done, read the statistics collector stats file into
- * some hash tables.  The results will be kept until pgstat_clear_snapshot()
- * is called (typically, at end of transaction).
+ * create_local_stats_hash() -
+ *
+ * Creates a dynahash used for table/function stats cache.
  */
-static void
-backend_read_statsfile(void)
+static HTAB *
+create_local_stats_hash(const char *name, size_t keysize, size_t entrysize,
+                        int nentries)
 {
-    TimestampTz min_ts = 0;
-    TimestampTz ref_ts = 0;
-    Oid            inquiry_db;
-    int            count;
+    HTAB *result;
+    HASHCTL ctl;
 
-    /* already read it? */
-    if (pgStatDBHash)
-        return;
-    Assert(!pgStatRunningInCollector);
-
-    /*
-     * In a normal backend, we check staleness of the data for our own DB, and
-     * so we send MyDatabaseId in inquiry messages.  In the autovac launcher,
-     * check staleness of the shared-catalog data, and send InvalidOid in
-     * inquiry messages so as not to force writing unnecessary data.
-     */
-    if (IsAutoVacuumLauncherProcess())
-        inquiry_db = InvalidOid;
-    else
-        inquiry_db = MyDatabaseId;
-
-    /*
-     * Loop until fresh enough stats file is available or we ran out of time.
-     * The stats inquiry message is sent repeatedly in case collector drops
-     * it; but not every single time, as that just swamps the collector.
-     */
-    for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
-    {
-        bool        ok;
-        TimestampTz file_ts = 0;
-        TimestampTz cur_ts;
-
-        CHECK_FOR_INTERRUPTS();
-
-        ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
-
-        cur_ts = GetCurrentTimestamp();
-        /* Calculate min acceptable timestamp, if we didn't already */
-        if (count == 0 || cur_ts < ref_ts)
-        {
-            /*
-             * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
-             * msec before now.  This indirectly ensures that the collector
-             * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
-             * an autovacuum worker, however, we want a lower delay to avoid
-             * using stale data, so we use PGSTAT_RETRY_DELAY (since the
-             * number of workers is low, this shouldn't be a problem).
-             *
-             * We don't recompute min_ts after sleeping, except in the
-             * unlikely case that cur_ts went backwards.  So we might end up
-             * accepting a file a bit older than PGSTAT_STAT_INTERVAL.  In
-             * practice that shouldn't happen, though, as long as the sleep
-             * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
-             * tell the collector that our cutoff time is less than what we'd
-             * actually accept.
-             */
-            ref_ts = cur_ts;
-            if (IsAutoVacuumWorkerProcess())
-                min_ts = TimestampTzPlusMilliseconds(ref_ts,
-                                                     -PGSTAT_RETRY_DELAY);
-            else
-                min_ts = TimestampTzPlusMilliseconds(ref_ts,
-                                                     -PGSTAT_STAT_INTERVAL);
-        }
-
-        /*
-         * If the file timestamp is actually newer than cur_ts, we must have
-         * had a clock glitch (system time went backwards) or there is clock
-         * skew between our processor and the stats collector's processor.
-         * Accept the file, but send an inquiry message anyway to make
-         * pgstat_recv_inquiry do a sanity check on the collector's time.
-         */
-        if (ok && file_ts > cur_ts)
-        {
-            /*
-             * A small amount of clock skew between processors isn't terribly
-             * surprising, but a large difference is worth logging.  We
-             * arbitrarily define "large" as 1000 msec.
-             */
-            if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
-            {
-                char       *filetime;
-                char       *mytime;
-
-                /* Copy because timestamptz_to_str returns a static buffer */
-                filetime = pstrdup(timestamptz_to_str(file_ts));
-                mytime = pstrdup(timestamptz_to_str(cur_ts));
-                elog(LOG, "stats collector's time %s is later than backend local time %s",
-                     filetime, mytime);
-                pfree(filetime);
-                pfree(mytime);
-            }
-
-            pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
-            break;
-        }
-
-        /* Normal acceptance case: file is not older than cutoff time */
-        if (ok && file_ts >= min_ts)
-            break;
-
-        /* Not there or too old, so kick the collector and wait a bit */
-        if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
-            pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
-
-        pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
-    }
-
-    if (count >= PGSTAT_POLL_LOOP_COUNT)
-        ereport(LOG,
-                (errmsg("using stale statistics instead of current ones "
-                        "because stats collector is not responding")));
-
-    /*
-     * Autovacuum launcher wants stats about all databases, but a shallow read
-     * is sufficient.  Regular backends want a deep read for just the tables
-     * they can see (MyDatabaseId + shared catalogs).
-     */
-    if (IsAutoVacuumLauncherProcess())
-        pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
-    else
-        pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
+    /* Create the hash in the stats context */
+    ctl.keysize        = keysize;
+    ctl.entrysize    = entrysize;
+    ctl.hcxt        = stats_cxt;
+    result = hash_create(name, nentries, &ctl,
+                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+    return result;
 }
 
+/*
+ * snapshot_statentry() - Find an entry in the source dshash.
+ *
+ * Returns the entry for key, or NULL if not found.  If dest is not NULL,
+ * *dest is used as a local cache; it is created with the same shape as the
+ * given dshash when *dest is NULL.  The result is then cached in that hash
+ * and the same entry is returned to subsequent calls for the same key.
+ *
+ * Otherwise the returned entry is a copy palloc'ed in the current memory
+ * context, and its content may differ on every request.
+ *
+ * If dshash is NULL, dsh_handle is attached temporarily instead.
+ */
+static void *
+snapshot_statentry(HTAB **dest, const char *hashname,
+                   dshash_table *dshash, dshash_table_handle dsh_handle,
+                   const dshash_parameters *dsh_params, Oid key)
+{
+    void *lentry = NULL;
+    size_t keysize = dsh_params->key_size;
+    size_t entrysize = dsh_params->entry_size;
+
+    if (dest)
+    {
+        /* caches the result entry */
+        bool found;
+
+        /*
+         * Create new hash with arbitrary initial entries since we don't know
+         * how this hash will grow.
+         */
+        if (!*dest)
+        {
+            Assert(hashname);
+            *dest = create_local_stats_hash(hashname, keysize, entrysize, 32);
+        }
+
+        lentry = hash_search(*dest, &key, HASH_ENTER, &found);
+        if (!found)
+        {
+            dshash_table *t = dshash;
+            void *sentry;
+
+            if (!t)
+                t = dshash_attach(area, dsh_params, dsh_handle, NULL);
+
+            sentry = dshash_find(t, &key, false);
+
+            /*
+             * We expect that the stats for specified database exists in most
+             * cases.
+             */
+            if (!sentry)
+            {
+                hash_search(*dest, &key, HASH_REMOVE, NULL);
+                if (!dshash)
+                    dshash_detach(t);
+                return NULL;
+            }
+            memcpy(lentry, sentry, entrysize);
+            dshash_release_lock(t, sentry);
+
+            if (!dshash)
+                dshash_detach(t);
+        }
+    }
+    else
+    {
+        /*
+         * The caller doesn't want caching. Just make a copy of the entry,
+         * then return it.
+         */
+        dshash_table *t = dshash;
+        void *sentry;
+
+        if (!t)
+            t = dshash_attach(area, dsh_params, dsh_handle, NULL);
+
+        sentry = dshash_find(t, &key, false);
+        if (sentry)
+        {
+            lentry = palloc(entrysize);
+            memcpy(lentry, sentry, entrysize);
+            dshash_release_lock(t, sentry);
+        }
+
+        if (!dshash)
+            dshash_detach(t);
+    }
+
+    return lentry;
+}
+
+/*
+ * snapshot_statentry_all() - Take a snapshot of all shared stats entries
+ *
+ * Returns a local hash contains all entries in the shared stats.
+ *
+ * The given dshash is used if provided. Otherwise dsh_handle is temporarily attached.
+ */
+static HTAB *
+snapshot_statentry_all(const char *hashname,
+                       dshash_table *dshash, dshash_table_handle dsh_handle,
+                       const dshash_parameters *dsh_params)
+{
+    dshash_table *t;
+    dshash_seq_status s;
+    size_t keysize = dsh_params->key_size;
+    size_t entrysize = dsh_params->entry_size;
+    void *ps;
+    int num_entries;
+    HTAB *dest;
+
+    t = dshash ? dshash : dshash_attach(area, dsh_params, dsh_handle, NULL);
+
+    /*
+     * No need to create new hash if no entry exists. The number can change
+     * after this, but dynahash can store extra entries in the case.
+     */
+    num_entries = dshash_get_num_entries(t);
+    if (num_entries == 0)
+    {
+        dshash_detach(t);
+        return NULL;
+    }
+
+    Assert(hashname);
+    dest = create_local_stats_hash(hashname,
+                                    keysize, entrysize, num_entries);
+
+    dshash_seq_init(&s, t, true);
+    while ((ps = dshash_seq_next(&s)) != NULL)
+    {
+        bool found;
+        void *pd = hash_search(dest, ps, HASH_ENTER, &found);
+        Assert(!found);
+        memcpy(pd, ps, entrysize);
+        /* dshash_seq_next releases entry lock automatically */
+    }
+
+    if (!dshash)
+        dshash_detach(t);
+
+    return dest;
+}
+
+/*
+ * backend_snapshot_global_stats() -
+ *
+ * Makes a local copy of global stats if not already done.  They will be kept
+ * until pgstat_clear_snapshot() is called or the end of the current memory
+ * context (typically TopTransactionContext).  Returns false if the shared
+ * stats is not created yet.
+ */
+static bool
+backend_snapshot_global_stats(void)
+{
+    MemoryContext oldcontext;
+    MemoryContextCallback *mcxt_cb;
+
+    /* Nothing to do if already done */
+    if (snapshot_globalStats)
+        return true;
+
+    Assert(!pgStatRunningInCollector);
+
+    /* Attached shared memory lives for the process lifetime */
+    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    if (!pgstat_attach_shared_stats())
+    {
+        MemoryContextSwitchTo(oldcontext);
+        return false;
+    }
+    MemoryContextSwitchTo(oldcontext);
+
+    Assert(snapshot_archiverStats == NULL);
+
+    /*
+     * The snapshot lives within the current top transaction if any, or the
+     * current memory context lifetime otherwise.
+     */
+    if (IsTransactionState())
+        MemoryContextSwitchTo(TopTransactionContext);
+
+    /* Remember for stats memory allocation later */
+    stats_cxt = CurrentMemoryContext;
+
+    /* global stats can be just copied  */
+    snapshot_globalStats = palloc(sizeof(PgStat_GlobalStats));
+    memcpy(snapshot_globalStats, shared_globalStats,
+           sizeof(PgStat_GlobalStats));
+
+    snapshot_archiverStats = palloc(sizeof(PgStat_ArchiverStats));
+    memcpy(snapshot_archiverStats, shared_archiverStats,
+           sizeof(PgStat_ArchiverStats));
+
+    /* set the timestamp of this snapshot */
+    snapshot_globalStats->stats_timestamp = GetCurrentTimestamp();
+
+    /* register callback to clear snapshot */
+    mcxt_cb = (MemoryContextCallback *)palloc(sizeof(MemoryContextCallback));
+    mcxt_cb->func = backend_clean_snapshot_callback;
+    mcxt_cb->arg = NULL;
+    MemoryContextRegisterResetCallback(CurrentMemoryContext, mcxt_cb);
+    MemoryContextSwitchTo(oldcontext);
+
+    return true;
+}
+
+/* ----------
+ * backend_get_db_entry() -
+ *
+ *    Find database stats entry on backends. The returned entries are cached
+ *    until transaction end. If oneshot is true, they are not cached and are
+ *    returned in palloc'ed memory.
+ */
+PgStat_StatDBEntry *
+backend_get_db_entry(Oid dbid, bool oneshot)
+{
+    /* take a local snapshot if we don't have one */
+    char *hashname = "local database stats hash";
+
+    /* If not done for this transaction, take a snapshot of global stats */
+    if (!backend_snapshot_global_stats())
+        return NULL;
+
+    return snapshot_statentry(oneshot ? NULL : &snapshot_db_stats,
+                              hashname, db_stats, 0, &dsh_dbparams,
+                              dbid);
+}
+
+/* ----------
+ * backend_snapshot_all_db_entries() -
+ *
+ *    Take a snapshot of all database stats at once into the returned hash.
+ */
+HTAB *
+backend_snapshot_all_db_entries(void)
+{
+    /* take a local snapshot if we don't have one */
+    char *hashname = "local database stats hash";
+
+    /* If not done for this transaction, take a snapshot of global stats */
+    if (!backend_snapshot_global_stats())
+        return NULL;
+
+    return snapshot_statentry_all(hashname, db_stats, 0, &dsh_dbparams);
+}
+
+/* ----------
+ * backend_get_tab_entry() -
+ *
+ *    Find table stats entry on backends. The returned entries are cached until
+ *    transaction end. If oneshot is true, they are not cached and are returned
+ *    in palloc'ed memory.
+ */
+PgStat_StatTabEntry *
+backend_get_tab_entry(PgStat_StatDBEntry *dbent, Oid reloid, bool oneshot)
+{
+    /* take a local snapshot if we don't have one */
+    char *hashname = "local table stats hash";
+    return snapshot_statentry(oneshot ? NULL : &dbent->snapshot_tables,
+                              hashname, NULL, dbent->tables, &dsh_tblparams,
+                              reloid);
+}
+
+/* ----------
+ * backend_get_func_entry() -
+ *
+ *    Find function stats entry on backends. The returned entries are cached
+ *    until transaction end. If oneshot is true, they are not cached and are
+ *    returned in palloc'ed memory.
+ */
+static PgStat_StatFuncEntry *
+backend_get_func_etnry(PgStat_StatDBEntry *dbent, Oid funcid, bool oneshot)
+{
+    char *hashname = "local table stats hash";
+    return snapshot_statentry(oneshot ? NULL : &dbent->snapshot_tables,
+                              hashname, NULL, dbent->functions, &dsh_funcparams,
+                              funcid);
+}
 
 /* ----------
  * pgstat_setup_memcxt() -
@@ -5553,6 +5620,8 @@ pgstat_setup_memcxt(void)
 void
 pgstat_clear_snapshot(void)
 {
+    int param = 0;    /* only the address is significant */
+
     /* Release memory, if any was allocated */
     if (pgStatLocalContext)
         MemoryContextDelete(pgStatLocalContext);
@@ -5562,99 +5631,12 @@ pgstat_clear_snapshot(void)
     pgStatDBHash = NULL;
     localBackendStatusTable = NULL;
     localNumBackends = 0;
-}
-
-
-/* ----------
- * pgstat_recv_inquiry() -
- *
- *    Process stat inquiry requests.
- * ----------
- */
-static void
-pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
-{
-    PgStat_StatDBEntry *dbentry;
-
-    elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
 
     /*
-     * If there's already a write request for this DB, there's nothing to do.
-     *
-     * Note that if a request is found, we return early and skip the below
-     * check for clock skew.  This is okay, since the only way for a DB
-     * request to be present in the list is that we have been here since the
-     * last write round.  It seems sufficient to check for clock skew once per
-     * write round.
+     * The parameter informs the function that it is not being called from a
+     * MemoryContextCallback.
      */
-    if (list_member_oid(pending_write_requests, msg->databaseid))
-        return;
-
-    /*
-     * Check to see if we last wrote this database at a time >= the requested
-     * cutoff time.  If so, this is a stale request that was generated before
-     * we updated the DB file, and we don't need to do so again.
-     *
-     * If the requestor's local clock time is older than stats_timestamp, we
-     * should suspect a clock glitch, ie system time going backwards; though
-     * the more likely explanation is just delayed message receipt.  It is
-     * worth expending a GetCurrentTimestamp call to be sure, since a large
-     * retreat in the system clock reading could otherwise cause us to neglect
-     * to update the stats file for a long time.
-     */
-    dbentry = pgstat_get_db_entry(msg->databaseid, false);
-    if (dbentry == NULL)
-    {
-        /*
-         * We have no data for this DB.  Enter a write request anyway so that
-         * the global stats will get updated.  This is needed to prevent
-         * backend_read_statsfile from waiting for data that we cannot supply,
-         * in the case of a new DB that nobody has yet reported any stats for.
-         * See the behavior of pgstat_read_db_statsfile_timestamp.
-         */
-    }
-    else if (msg->clock_time < dbentry->stats_timestamp)
-    {
-        TimestampTz cur_ts = GetCurrentTimestamp();
-
-        if (cur_ts < dbentry->stats_timestamp)
-        {
-            /*
-             * Sure enough, time went backwards.  Force a new stats file write
-             * to get back in sync; but first, log a complaint.
-             */
-            char       *writetime;
-            char       *mytime;
-
-            /* Copy because timestamptz_to_str returns a static buffer */
-            writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
-            mytime = pstrdup(timestamptz_to_str(cur_ts));
-            elog(LOG,
-                 "stats_timestamp %s is later than collector's time %s for database %u",
-                 writetime, mytime, dbentry->databaseid);
-            pfree(writetime);
-            pfree(mytime);
-        }
-        else
-        {
-            /*
-             * Nope, it's just an old request.  Assuming msg's clock_time is
-             * >= its cutoff_time, it must be stale, so we can ignore it.
-             */
-            return;
-        }
-    }
-    else if (msg->cutoff_time <= dbentry->stats_timestamp)
-    {
-        /* Stale request, ignore it */
-        return;
-    }
-
-    /*
-     * We need to write this DB, so create a request.
-     */
-    pending_write_requests = lappend_oid(pending_write_requests,
-                                         msg->databaseid);
+    backend_clean_snapshot_callback(&param);
 }
 
 
@@ -5667,6 +5649,7 @@ pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
 static void
 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
 {
+    dshash_table *tabhash;
     PgStat_StatDBEntry *dbentry;
     PgStat_StatTabEntry *tabentry;
     int            i;
@@ -5682,6 +5665,7 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
     dbentry->n_block_read_time += msg->m_block_read_time;
     dbentry->n_block_write_time += msg->m_block_write_time;
 
+    tabhash = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
     /*
      * Process all table entries in the message.
      */
@@ -5689,9 +5673,8 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
     {
         PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
 
-        tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
-                                                       (void *) &(tabmsg->t_id),
-                                                       HASH_ENTER, &found);
+        tabentry = (PgStat_StatTabEntry *)
+            dshash_find_or_insert(tabhash, (void *) &(tabmsg->t_id), &found);
 
         if (!found)
         {
@@ -5750,6 +5733,7 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
         tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
         /* Likewise for n_dead_tuples */
         tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
+        dshash_release_lock(tabhash, tabentry);
 
         /*
          * Add per-table stats to the per-database entry, too.
@@ -5762,6 +5746,8 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
         dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
         dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
     }
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 
@@ -5774,27 +5760,33 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
 static void
 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
 {
+    dshash_table *tbl;
     PgStat_StatDBEntry *dbentry;
     int            i;
 
     dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
     /*
      * No need to purge if we don't even know the database.
      */
-    if (!dbentry || !dbentry->tables)
+    if (!dbentry || dbentry->tables == DSM_HANDLE_INVALID)
+    {
+        if (dbentry)
+            dshash_release_lock(db_stats, dbentry);
         return;
+    }
 
+    tbl = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
     /*
      * Process all table entries in the message.
      */
     for (i = 0; i < msg->m_nentries; i++)
     {
         /* Remove from hashtable if present; we don't care if it's not. */
-        (void) hash_search(dbentry->tables,
-                           (void *) &(msg->m_tableid[i]),
-                           HASH_REMOVE, NULL);
+        (void) dshash_delete_key(tbl, (void *) &(msg->m_tableid[i]));
     }
+
+    dshash_release_lock(db_stats, dbentry);
+
 }
 
 
@@ -5820,23 +5812,20 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
      */
     if (dbentry)
     {
-        char        statfile[MAXPGPATH];
+        if (dbentry->tables != DSM_HANDLE_INVALID)
+        {
+            dshash_table *tbl =
+                dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+            dshash_destroy(tbl);
+        }
+        if (dbentry->functions != DSM_HANDLE_INVALID)
+        {
+            dshash_table *tbl =
+                dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+            dshash_destroy(tbl);
+        }
 
-        get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
-
-        elog(DEBUG2, "removing stats file \"%s\"", statfile);
-        unlink(statfile);
-
-        if (dbentry->tables != NULL)
-            hash_destroy(dbentry->tables);
-        if (dbentry->functions != NULL)
-            hash_destroy(dbentry->functions);
-
-        if (hash_search(pgStatDBHash,
-                        (void *) &dbid,
-                        HASH_REMOVE, NULL) == NULL)
-            ereport(ERROR,
-                    (errmsg("database hash table corrupted during cleanup --- abort")));
+        dshash_delete_entry(db_stats, (void *)dbentry);
     }
 }
 
@@ -5864,19 +5853,28 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
      * We simply throw away all the database's table entries by recreating a
      * new hash table for them.
      */
-    if (dbentry->tables != NULL)
-        hash_destroy(dbentry->tables);
-    if (dbentry->functions != NULL)
-        hash_destroy(dbentry->functions);
-
-    dbentry->tables = NULL;
-    dbentry->functions = NULL;
+    if (dbentry->tables != DSM_HANDLE_INVALID)
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+        dshash_destroy(t);
+        dbentry->tables = DSM_HANDLE_INVALID;
+    }
+    if (dbentry->functions != DSM_HANDLE_INVALID)
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+        dshash_destroy(t);
+        dbentry->functions = DSM_HANDLE_INVALID;
+    }
 
     /*
      * Reset database-level stats, too.  This creates empty hash tables for
      * tables and functions.
      */
     reset_dbentry_counters(dbentry);
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -5891,14 +5889,14 @@ pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
     if (msg->m_resettarget == RESET_BGWRITER)
     {
         /* Reset the global background writer statistics for the cluster. */
-        memset(&globalStats, 0, sizeof(globalStats));
-        globalStats.stat_reset_timestamp = GetCurrentTimestamp();
+        memset(&shared_globalStats, 0, sizeof(shared_globalStats));
+        shared_globalStats->stat_reset_timestamp = GetCurrentTimestamp();
     }
     else if (msg->m_resettarget == RESET_ARCHIVER)
     {
         /* Reset the archiver statistics for the cluster. */
-        memset(&archiverStats, 0, sizeof(archiverStats));
-        archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
+        memset(&shared_archiverStats, 0, sizeof(shared_archiverStats));
+        shared_archiverStats->stat_reset_timestamp = GetCurrentTimestamp();
     }
 
     /*
@@ -5928,11 +5926,19 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
 
     /* Remove object if it exists, ignore it if not */
     if (msg->m_resettype == RESET_TABLE)
-        (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
-                           HASH_REMOVE, NULL);
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+        dshash_delete_key(t, (void *) &(msg->m_objectid));
+    }
     else if (msg->m_resettype == RESET_FUNCTION)
-        (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
-                           HASH_REMOVE, NULL);
+    {
+        dshash_table *t =
+            dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
+        dshash_delete_key(t, (void *) &(msg->m_objectid));
+    }
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -5952,6 +5958,8 @@ pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
     dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
 
     dbentry->last_autovac_time = msg->m_start_time;
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -5965,13 +5973,13 @@ pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
 {
     PgStat_StatDBEntry *dbentry;
     PgStat_StatTabEntry *tabentry;
-
+    dshash_table *table;
     /*
      * Store the data in the table's hashtable entry.
      */
     dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
-
-    tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
+    table = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+    tabentry = pgstat_get_tab_entry(table, msg->m_tableoid, true);
 
     tabentry->n_live_tuples = msg->m_live_tuples;
     tabentry->n_dead_tuples = msg->m_dead_tuples;
@@ -5986,6 +5994,9 @@ pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
         tabentry->vacuum_timestamp = msg->m_vacuumtime;
         tabentry->vacuum_count++;
     }
+    dshash_release_lock(table, tabentry);
+    dshash_detach(table);
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -5999,13 +6010,15 @@ pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
 {
     PgStat_StatDBEntry *dbentry;
     PgStat_StatTabEntry *tabentry;
+    dshash_table *table;
 
     /*
      * Store the data in the table's hashtable entry.
      */
     dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
 
-    tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
+    table = dshash_attach(area, &dsh_tblparams, dbentry->tables, 0);
+    tabentry = pgstat_get_tab_entry(table, msg->m_tableoid, true);
 
     tabentry->n_live_tuples = msg->m_live_tuples;
     tabentry->n_dead_tuples = msg->m_dead_tuples;
@@ -6028,6 +6041,9 @@ pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
         tabentry->analyze_timestamp = msg->m_analyzetime;
         tabentry->analyze_count++;
     }
+    dshash_release_lock(table, tabentry);
+    dshash_detach(table);
+    dshash_release_lock(db_stats, dbentry);
 }
 
 
@@ -6043,18 +6059,18 @@ pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
     if (msg->m_failed)
     {
         /* Failed archival attempt */
-        ++archiverStats.failed_count;
-        memcpy(archiverStats.last_failed_wal, msg->m_xlog,
-               sizeof(archiverStats.last_failed_wal));
-        archiverStats.last_failed_timestamp = msg->m_timestamp;
+        ++shared_archiverStats->failed_count;
+        memcpy(shared_archiverStats->last_failed_wal, msg->m_xlog,
+               sizeof(shared_archiverStats->last_failed_wal));
+        shared_archiverStats->last_failed_timestamp = msg->m_timestamp;
     }
     else
     {
         /* Successful archival operation */
-        ++archiverStats.archived_count;
-        memcpy(archiverStats.last_archived_wal, msg->m_xlog,
-               sizeof(archiverStats.last_archived_wal));
-        archiverStats.last_archived_timestamp = msg->m_timestamp;
+        ++shared_archiverStats->archived_count;
+        memcpy(shared_archiverStats->last_archived_wal, msg->m_xlog,
+               sizeof(shared_archiverStats->last_archived_wal));
+        shared_archiverStats->last_archived_timestamp = msg->m_timestamp;
     }
 }
 
@@ -6067,16 +6083,16 @@ pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
 static void
 pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
 {
-    globalStats.timed_checkpoints += msg->m_timed_checkpoints;
-    globalStats.requested_checkpoints += msg->m_requested_checkpoints;
-    globalStats.checkpoint_write_time += msg->m_checkpoint_write_time;
-    globalStats.checkpoint_sync_time += msg->m_checkpoint_sync_time;
-    globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
-    globalStats.buf_written_clean += msg->m_buf_written_clean;
-    globalStats.maxwritten_clean += msg->m_maxwritten_clean;
-    globalStats.buf_written_backend += msg->m_buf_written_backend;
-    globalStats.buf_fsync_backend += msg->m_buf_fsync_backend;
-    globalStats.buf_alloc += msg->m_buf_alloc;
+    shared_globalStats->timed_checkpoints += msg->m_timed_checkpoints;
+    shared_globalStats->requested_checkpoints += msg->m_requested_checkpoints;
+    shared_globalStats->checkpoint_write_time += msg->m_checkpoint_write_time;
+    shared_globalStats->checkpoint_sync_time += msg->m_checkpoint_sync_time;
+    shared_globalStats->buf_written_checkpoints += msg->m_buf_written_checkpoints;
+    shared_globalStats->buf_written_clean += msg->m_buf_written_clean;
+    shared_globalStats->maxwritten_clean += msg->m_maxwritten_clean;
+    shared_globalStats->buf_written_backend += msg->m_buf_written_backend;
+    shared_globalStats->buf_fsync_backend += msg->m_buf_fsync_backend;
+    shared_globalStats->buf_alloc += msg->m_buf_alloc;
 }
 
 /* ----------
@@ -6117,6 +6133,8 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
             dbentry->n_conflict_startup_deadlock++;
             break;
     }
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -6133,6 +6151,8 @@ pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
     dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
 
     dbentry->n_deadlocks++;
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -6150,6 +6170,8 @@ pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
 
     dbentry->n_temp_bytes += msg->m_filesize;
     dbentry->n_temp_files += 1;
+
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -6161,6 +6183,7 @@ pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
 static void
 pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
 {
+    dshash_table *t;
     PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
     PgStat_StatDBEntry *dbentry;
     PgStat_StatFuncEntry *funcentry;
@@ -6169,14 +6192,14 @@ pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
 
     dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
 
+    t = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
     /*
      * Process all function entries in the message.
      */
     for (i = 0; i < msg->m_nentries; i++, funcmsg++)
     {
-        funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
-                                                         (void *) &(funcmsg->f_id),
-                                                         HASH_ENTER, &found);
+        funcentry = (PgStat_StatFuncEntry *)
+            dshash_find_or_insert(t, (void *) &(funcmsg->f_id), &found);
 
         if (!found)
         {
@@ -6197,7 +6220,11 @@ pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
             funcentry->f_total_time += funcmsg->f_total_time;
             funcentry->f_self_time += funcmsg->f_self_time;
         }
+        dshash_release_lock(t, funcentry);
     }
+
+    dshash_detach(t);
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /* ----------
@@ -6209,6 +6236,7 @@ pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
 static void
 pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 {
+    dshash_table *t;
     PgStat_StatDBEntry *dbentry;
     int            i;
 
@@ -6217,60 +6245,20 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
     /*
      * No need to purge if we don't even know the database.
      */
-    if (!dbentry || !dbentry->functions)
+    if (!dbentry || dbentry->functions == DSM_HANDLE_INVALID)
         return;
 
+    t = dshash_attach(area, &dsh_funcparams, dbentry->functions, 0);
     /*
      * Process all function entries in the message.
      */
     for (i = 0; i < msg->m_nentries; i++)
     {
         /* Remove from hashtable if present; we don't care if it's not. */
-        (void) hash_search(dbentry->functions,
-                           (void *) &(msg->m_functionid[i]),
-                           HASH_REMOVE, NULL);
+        dshash_delete_key(t, (void *) &(msg->m_functionid[i]));
     }
-}
-
-/* ----------
- * pgstat_write_statsfile_needed() -
- *
- *    Do we need to write out any stats files?
- * ----------
- */
-static bool
-pgstat_write_statsfile_needed(void)
-{
-    if (pending_write_requests != NIL)
-        return true;
-
-    /* Everything was written recently */
-    return false;
-}
-
-/* ----------
- * pgstat_db_requested() -
- *
- *    Checks whether stats for a particular DB need to be written to a file.
- * ----------
- */
-static bool
-pgstat_db_requested(Oid databaseid)
-{
-    /*
-     * If any requests are outstanding at all, we should write the stats for
-     * shared catalogs (the "database" with OID 0).  This ensures that
-     * backends will see up-to-date stats for shared catalogs, even though
-     * they send inquiry messages mentioning only their own DB.
-     */
-    if (databaseid == InvalidOid && pending_write_requests != NIL)
-        return true;
-
-    /* Search to see if there's an open request to write this database. */
-    if (list_member_oid(pending_write_requests, databaseid))
-        return true;
-
-    return false;
+    dshash_detach(t);
+    dshash_release_lock(db_stats, dbentry);
 }
 
 /*
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 91ae448955..5ff62fa0dc 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -77,9 +77,6 @@ static bool is_checksummed_file(const char *fullpath, const char *filename);
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
 
-/* Relative path of temporary statistics directory */
-static char *statrelpath = NULL;
-
 /*
  * Size of each block sent into the tar stream for larger files.
  */
@@ -121,13 +118,6 @@ static bool noverify_checksums = false;
  */
 static const char *excludeDirContents[] =
 {
-    /*
-     * Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped even
-     * when stats_temp_directory is set because PGSS_TEXT_FILE is always
-     * created there.
-     */
-    PG_STAT_TMP_DIR,
-
     /*
      * It is generally not useful to backup the contents of this directory
      * even if the intention is to restore to another master. See backup.sgml
@@ -223,11 +213,8 @@ perform_base_backup(basebackup_options *opt)
     TimeLineID    endtli;
     StringInfo    labelfile;
     StringInfo    tblspc_map_file = NULL;
-    int            datadirpathlen;
     List       *tablespaces = NIL;
 
-    datadirpathlen = strlen(DataDir);
-
     backup_started_in_recovery = RecoveryInProgress();
 
     labelfile = makeStringInfo();
@@ -254,18 +241,6 @@ perform_base_backup(basebackup_options *opt)
 
         SendXlogRecPtrResult(startptr, starttli);
 
-        /*
-         * Calculate the relative path of temporary statistics directory in
-         * order to skip the files which are located in that directory later.
-         */
-        if (is_absolute_path(pgstat_stat_directory) &&
-            strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
-            statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
-        else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
-            statrelpath = psprintf("./%s", pgstat_stat_directory);
-        else
-            statrelpath = pgstat_stat_directory;
-
         /* Add a node for the base directory at the end */
         ti = palloc0(sizeof(tablespaceinfo));
         ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
@@ -1174,17 +1149,6 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
         if (excludeFound)
             continue;
 
-        /*
-         * Exclude contents of directory specified by statrelpath if not set
-         * to the default (pg_stat_tmp) which is caught in the loop above.
-         */
-        if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
-        {
-            elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
-            size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
-            continue;
-        }
-
         /*
          * We can skip pg_wal, the WAL segments need to be fetched from the
          * WAL archive anyway. But include it as an empty directory anyway, so
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 0c86a581c0..ee30e8a14f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -150,6 +150,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
         size = add_size(size, SyncScanShmemSize());
         size = add_size(size, AsyncShmemSize());
         size = add_size(size, BackendRandomShmemSize());
+        size = add_size(size, StatsShmemSize());
 #ifdef EXEC_BACKEND
         size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -270,6 +271,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
     SyncScanShmemInit();
     AsyncShmemInit();
     BackendRandomShmemInit();
+    StatsShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index a6fda81feb..c46bb8d057 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -521,6 +521,9 @@ RegisterLWLockTranches(void)
     LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
     LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
     LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
+    LWLockRegisterTranche(LWTRANCHE_STATS_DSA, "stats table dsa");
+    LWLockRegisterTranche(LWTRANCHE_STATS_DB, "db stats");
+    LWLockRegisterTranche(LWTRANCHE_STATS_FUNC_TABLE, "table/func stats");
 
     /* Register named tranches. */
     for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ecedb..798af9f168 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock                42
 BackendRandomLock                    43
 LogicalRepWorkerLock                44
 CLogTruncationLock                    45
+StatsLock                            46
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e9f542cfed..a0fa3dac4e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -187,7 +187,6 @@ static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource so
 static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source);
 static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
 static void assign_effective_io_concurrency(int newval, void *extra);
-static void assign_pgstat_temp_directory(const char *newval, void *extra);
 static bool check_application_name(char **newval, void **extra, GucSource source);
 static void assign_application_name(const char *newval, void *extra);
 static bool check_cluster_name(char **newval, void **extra, GucSource source);
@@ -3778,17 +3777,6 @@ static struct config_string ConfigureNamesString[] =
         NULL, NULL, NULL
     },
 
-    {
-        {"stats_temp_directory", PGC_SIGHUP, STATS_COLLECTOR,
-            gettext_noop("Writes temporary statistics files to the specified directory."),
-            NULL,
-            GUC_SUPERUSER_ONLY
-        },
-        &pgstat_temp_directory,
-        PG_STAT_TMP_DIR,
-        check_canonical_path, assign_pgstat_temp_directory, NULL
-    },
-
     {
         {"synchronous_standby_names", PGC_SIGHUP, REPLICATION_MASTER,
             gettext_noop("Number of synchronous standbys and list of names of potential synchronous ones."),
@@ -10721,35 +10709,6 @@ assign_effective_io_concurrency(int newval, void *extra)
 #endif                            /* USE_PREFETCH */
 }
 
-static void
-assign_pgstat_temp_directory(const char *newval, void *extra)
-{
-    /* check_canonical_path already canonicalized newval for us */
-    char       *dname;
-    char       *tname;
-    char       *fname;
-
-    /* directory */
-    dname = guc_malloc(ERROR, strlen(newval) + 1);    /* runtime dir */
-    sprintf(dname, "%s", newval);
-
-    /* global stats */
-    tname = guc_malloc(ERROR, strlen(newval) + 12); /* /global.tmp */
-    sprintf(tname, "%s/global.tmp", newval);
-    fname = guc_malloc(ERROR, strlen(newval) + 13); /* /global.stat */
-    sprintf(fname, "%s/global.stat", newval);
-
-    if (pgstat_stat_directory)
-        free(pgstat_stat_directory);
-    pgstat_stat_directory = dname;
-    if (pgstat_stat_tmpname)
-        free(pgstat_stat_tmpname);
-    pgstat_stat_tmpname = tname;
-    if (pgstat_stat_filename)
-        free(pgstat_stat_filename);
-    pgstat_stat_filename = fname;
-}
-
 static bool
 check_application_name(char **newval, void **extra, GucSource source)
 {
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 4e61bc6521..1277740473 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -512,7 +512,6 @@
 #track_io_timing = off
 #track_functions = none            # none, pl, all
 #track_activity_query_size = 1024    # (change requires restart)
-#stats_temp_directory = 'pg_stat_tmp'
 
 
 # - Monitoring -
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index ab5cb7f0c1..f13b2dde6b 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -217,7 +217,6 @@ static const char *const subdirs[] = {
     "pg_replslot",
     "pg_tblspc",
     "pg_stat",
-    "pg_stat_tmp",
     "pg_xact",
     "pg_logical",
     "pg_logical/snapshots",
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index 2211d90c6f..e6f4d30658 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -123,7 +123,7 @@ is_deeply(
 
 # Contents of these directories should not be copied.
 foreach my $dirname (
-    qw(pg_dynshmem pg_notify pg_replslot pg_serial pg_snapshots pg_stat_tmp pg_subtrans)
+    qw(pg_dynshmem pg_notify pg_replslot pg_serial pg_snapshots pg_subtrans)
   )
 {
     is_deeply(
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e97b25bd72..afc1927250 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -13,6 +13,7 @@
 
 #include "datatype/timestamp.h"
 #include "fmgr.h"
+#include "lib/dshash.h"
 #include "libpq/pqcomm.h"
 #include "port/atomics.h"
 #include "portability/instr_time.h"
@@ -30,9 +31,6 @@
 #define PGSTAT_STAT_PERMANENT_FILENAME        "pg_stat/global.stat"
 #define PGSTAT_STAT_PERMANENT_TMPFILE        "pg_stat/global.tmp"
 
-/* Default directory to store temporary statistics data in */
-#define PG_STAT_TMP_DIR        "pg_stat_tmp"
-
 /* Values for track_functions GUC variable --- order is significant! */
 typedef enum TrackFunctionsLevel
 {
@@ -48,7 +46,6 @@ typedef enum TrackFunctionsLevel
 typedef enum StatMsgType
 {
     PGSTAT_MTYPE_DUMMY,
-    PGSTAT_MTYPE_INQUIRY,
     PGSTAT_MTYPE_TABSTAT,
     PGSTAT_MTYPE_TABPURGE,
     PGSTAT_MTYPE_DROPDB,
@@ -216,35 +213,6 @@ typedef struct PgStat_MsgDummy
     PgStat_MsgHdr m_hdr;
 } PgStat_MsgDummy;
 
-
-/* ----------
- * PgStat_MsgInquiry            Sent by a backend to ask the collector
- *                                to write the stats file(s).
- *
- * Ordinarily, an inquiry message prompts writing of the global stats file,
- * the stats file for shared catalogs, and the stats file for the specified
- * database.  If databaseid is InvalidOid, only the first two are written.
- *
- * New file(s) will be written only if the existing file has a timestamp
- * older than the specified cutoff_time; this prevents duplicated effort
- * when multiple requests arrive at nearly the same time, assuming that
- * backends send requests with cutoff_times a little bit in the past.
- *
- * clock_time should be the requestor's current local time; the collector
- * uses this to check for the system clock going backward, but it has no
- * effect unless that occurs.  We assume clock_time >= cutoff_time, though.
- * ----------
- */
-
-typedef struct PgStat_MsgInquiry
-{
-    PgStat_MsgHdr m_hdr;
-    TimestampTz clock_time;        /* observed local clock time */
-    TimestampTz cutoff_time;    /* minimum acceptable file timestamp */
-    Oid            databaseid;        /* requested DB (InvalidOid => shared only) */
-} PgStat_MsgInquiry;
-
-
 /* ----------
  * PgStat_TableEntry            Per-table info in a MsgTabstat
  * ----------
@@ -539,7 +507,6 @@ typedef union PgStat_Msg
 {
     PgStat_MsgHdr msg_hdr;
     PgStat_MsgDummy msg_dummy;
-    PgStat_MsgInquiry msg_inquiry;
     PgStat_MsgTabstat msg_tabstat;
     PgStat_MsgTabpurge msg_tabpurge;
     PgStat_MsgDropdb msg_dropdb;
@@ -601,10 +568,13 @@ typedef struct PgStat_StatDBEntry
 
     /*
      * tables and functions must be last in the struct, because we don't write
-     * the pointers out to the stats file.
+     * the handles and pointers out to the stats file.
      */
-    HTAB       *tables;
-    HTAB       *functions;
+    dshash_table_handle tables;
+    dshash_table_handle functions;
+    /* for snapshot tables */
+    HTAB *snapshot_tables;
+    HTAB *snapshot_functions;
 } PgStat_StatDBEntry;
 
 
@@ -1213,6 +1183,9 @@ extern PgStat_BackendFunctionEntry *find_funcstat_entry(Oid func_id);
 extern void pgstat_initstats(Relation rel);
 
 extern char *pgstat_clip_activity(const char *raw_activity);
+extern PgStat_StatDBEntry *backend_get_db_entry(Oid dbid, bool oneshot);
+extern HTAB *backend_snapshot_all_db_entries(void);
+extern PgStat_StatTabEntry *backend_get_tab_entry(PgStat_StatDBEntry *dbent, Oid relid, bool oneshot);
 
 /* ----------
  * pgstat_report_wait_start() -
@@ -1352,4 +1325,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 /* Main loop */
 extern void PgstatCollectorMain(void) pg_attribute_noreturn();
 
+extern Size StatsShmemSize(void);
+extern void StatsShmemInit(void);
+
 #endif                            /* PGSTAT_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c21bfe2f66..2cdd10c2fd 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -219,6 +219,9 @@ typedef enum BuiltinTrancheIds
     LWTRANCHE_SHARED_TUPLESTORE,
     LWTRANCHE_TBM,
     LWTRANCHE_PARALLEL_APPEND,
+    LWTRANCHE_STATS_DSA,
+    LWTRANCHE_STATS_DB,
+    LWTRANCHE_STATS_FUNC_TABLE,
     LWTRANCHE_FIRST_USER_DEFINED
 }            BuiltinTrancheIds;
 
-- 
2.16.3

From 0f68698ac32dff2c8fad1984cc2da55b5aac7113 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 29 Jun 2018 16:58:32 +0900
Subject: [PATCH 2/8] Change stats collector to an auxiliary process.

Shared memory and LWLocks are required to let stats collector use
dshash. This patch makes stats collector an auxiliary process.
---
 src/backend/bootstrap/bootstrap.c   |   8 ++
 src/backend/postmaster/pgstat.c     | 158 +++++++++++-------------------------
 src/backend/postmaster/postmaster.c |  30 +++----
 src/include/miscadmin.h             |   3 +-
 src/include/pgstat.h                |  11 ++-
 5 files changed, 77 insertions(+), 133 deletions(-)

diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 578af2e66d..ece200877c 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -336,6 +336,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
             case WalReceiverProcess:
                 statmsg = pgstat_get_backend_desc(B_WAL_RECEIVER);
                 break;
+            case StatsCollectorProcess:
+                statmsg = pgstat_get_backend_desc(B_STATS_COLLECTOR);
+                break;
             default:
                 statmsg = "??? process";
                 break;
@@ -470,6 +473,11 @@ AuxiliaryProcessMain(int argc, char *argv[])
             WalReceiverMain();
             proc_exit(1);        /* should never return */
 
+        case StatsCollectorProcess:
+            /* don't set signals, stats collector has its own agenda */
+            PgstatCollectorMain();
+            proc_exit(1);        /* should never return */
+
         default:
             elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
             proc_exit(1);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8a5b2b3b42..999325ae53 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -267,6 +267,7 @@ static List *pending_write_requests = NIL;
 /* Signal handler flags */
 static volatile bool need_exit = false;
 static volatile bool got_SIGHUP = false;
+static volatile bool got_SIGTERM = false;
 
 /*
  * Total time charged to functions so far in the current backend.
@@ -284,8 +285,8 @@ static instr_time total_func_time;
 static pid_t pgstat_forkexec(void);
 #endif
 
-NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();
-static void pgstat_exit(SIGNAL_ARGS);
+static void pgstat_shutdown_handler(SIGNAL_ARGS);
+static void pgstat_quickdie_handler(SIGNAL_ARGS);
 static void pgstat_beshutdown_hook(int code, Datum arg);
 static void pgstat_sighup_handler(SIGNAL_ARGS);
 
@@ -688,104 +689,6 @@ pgstat_reset_all(void)
     pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
 }
 
-#ifdef EXEC_BACKEND
-
-/*
- * pgstat_forkexec() -
- *
- * Format up the arglist for, then fork and exec, statistics collector process
- */
-static pid_t
-pgstat_forkexec(void)
-{
-    char       *av[10];
-    int            ac = 0;
-
-    av[ac++] = "postgres";
-    av[ac++] = "--forkcol";
-    av[ac++] = NULL;            /* filled in by postmaster_forkexec */
-
-    av[ac] = NULL;
-    Assert(ac < lengthof(av));
-
-    return postmaster_forkexec(ac, av);
-}
-#endif                            /* EXEC_BACKEND */
-
-
-/*
- * pgstat_start() -
- *
- *    Called from postmaster at startup or after an existing collector
- *    died.  Attempt to fire up a fresh statistics collector.
- *
- *    Returns PID of child process, or 0 if fail.
- *
- *    Note: if fail, we will be called again from the postmaster main loop.
- */
-int
-pgstat_start(void)
-{
-    time_t        curtime;
-    pid_t        pgStatPid;
-
-    /*
-     * Check that the socket is there, else pgstat_init failed and we can do
-     * nothing useful.
-     */
-    if (pgStatSock == PGINVALID_SOCKET)
-        return 0;
-
-    /*
-     * Do nothing if too soon since last collector start.  This is a safety
-     * valve to protect against continuous respawn attempts if the collector
-     * is dying immediately at launch.  Note that since we will be re-called
-     * from the postmaster main loop, we will get another chance later.
-     */
-    curtime = time(NULL);
-    if ((unsigned int) (curtime - last_pgstat_start_time) <
-        (unsigned int) PGSTAT_RESTART_INTERVAL)
-        return 0;
-    last_pgstat_start_time = curtime;
-
-    /*
-     * Okay, fork off the collector.
-     */
-#ifdef EXEC_BACKEND
-    switch ((pgStatPid = pgstat_forkexec()))
-#else
-    switch ((pgStatPid = fork_process()))
-#endif
-    {
-        case -1:
-            ereport(LOG,
-                    (errmsg("could not fork statistics collector: %m")));
-            return 0;
-
-#ifndef EXEC_BACKEND
-        case 0:
-            /* in postmaster child ... */
-            InitPostmasterChild();
-
-            /* Close the postmaster's sockets */
-            ClosePostmasterPorts(false);
-
-            /* Drop our connection to postmaster's shared memory, as well */
-            dsm_detach_all();
-            PGSharedMemoryDetach();
-
-            PgstatCollectorMain(0, NULL);
-            break;
-#endif
-
-        default:
-            return (int) pgStatPid;
-    }
-
-    /* shouldn't get here */
-    return 0;
-}
-
 void
 allow_immediate_pgstat_restart(void)
 {
@@ -2870,6 +2773,9 @@ pgstat_bestart(void)
             case WalReceiverProcess:
                 beentry->st_backendType = B_WAL_RECEIVER;
                 break;
+            case StatsCollectorProcess:
+                beentry->st_backendType = B_STATS_COLLECTOR;
+                break;
             default:
                 elog(FATAL, "unrecognized process type: %d",
                      (int) MyAuxProcType);
@@ -4135,6 +4041,9 @@ pgstat_get_backend_desc(BackendType backendType)
         case B_WAL_WRITER:
             backendDesc = "walwriter";
             break;
+        case B_STATS_COLLECTOR:
+            backendDesc = "stats collector";
+            break;
     }
 
     return backendDesc;
@@ -4252,8 +4161,8 @@ pgstat_send_bgwriter(void)
  *    The argc/argv parameters are valid only in EXEC_BACKEND case.
  * ----------
  */
-NON_EXEC_STATIC void
-PgstatCollectorMain(int argc, char *argv[])
+void
+PgstatCollectorMain(void)
 {
     int            len;
     PgStat_Msg    msg;
@@ -4266,8 +4175,8 @@ PgstatCollectorMain(int argc, char *argv[])
      */
     pqsignal(SIGHUP, pgstat_sighup_handler);
     pqsignal(SIGINT, SIG_IGN);
-    pqsignal(SIGTERM, SIG_IGN);
-    pqsignal(SIGQUIT, pgstat_exit);
+    pqsignal(SIGTERM, pgstat_shutdown_handler);
+    pqsignal(SIGQUIT, pgstat_quickdie_handler);
     pqsignal(SIGALRM, SIG_IGN);
     pqsignal(SIGPIPE, SIG_IGN);
     pqsignal(SIGUSR1, SIG_IGN);
@@ -4312,14 +4221,14 @@ PgstatCollectorMain(int argc, char *argv[])
         /*
          * Quit if we get SIGQUIT from the postmaster.
          */
-        if (need_exit)
+        if (got_SIGTERM)
             break;
 
         /*
          * Inner loop iterates as long as we keep getting messages, or until
          * need_exit becomes set.
          */
-        while (!need_exit)
+        while (!got_SIGTERM)
         {
             /*
              * Reload configuration if we got SIGHUP from the postmaster.
@@ -4507,14 +4416,29 @@ PgstatCollectorMain(int argc, char *argv[])
 
 /* SIGQUIT signal handler for collector process */
 static void
-pgstat_exit(SIGNAL_ARGS)
+pgstat_quickdie_handler(SIGNAL_ARGS)
 {
-    int            save_errno = errno;
+    PG_SETMASK(&BlockSig);
 
-    need_exit = true;
-    SetLatch(MyLatch);
+    /*
+     * We DO NOT want to run proc_exit() callbacks -- we're here because
+     * shared memory may be corrupted, so we don't want to try to clean up our
+     * transaction.  Just nail the windows shut and get out of town.  Now that
+     * there's an atexit callback to prevent third-party code from breaking
+     * things by calling exit() directly, we have to reset the callbacks
+     * explicitly to make this work as intended.
+     */
+    on_exit_reset();
 
-    errno = save_errno;
+    /*
+     * Note we do exit(2) not exit(0).  This is to force the postmaster into a
+     * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+     * backend.  This is necessary precisely because we don't clean up our
+     * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
+     * should ensure the postmaster sees this as a crash, too, but no harm in
+     * being doubly sure.)
+     */
+    exit(2);
 }
 
 /* SIGHUP handler for collector process */
@@ -4529,6 +4453,18 @@ pgstat_sighup_handler(SIGNAL_ARGS)
     errno = save_errno;
 }
 
+static void
+pgstat_shutdown_handler(SIGNAL_ARGS)
+{
+    int save_errno = errno;
+
+    got_SIGTERM = true;
+
+    SetLatch(MyLatch);
+
+    errno = save_errno;
+}
+
 /*
  * Subroutine to clear stats in a database entry
  *
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 305ff36258..b273fa0717 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -144,7 +144,8 @@
 #define BACKEND_TYPE_AUTOVAC    0x0002    /* autovacuum worker process */
 #define BACKEND_TYPE_WALSND        0x0004    /* walsender process */
 #define BACKEND_TYPE_BGWORKER    0x0008    /* bgworker process */
-#define BACKEND_TYPE_ALL        0x000F    /* OR of all the above */
+#define BACKEND_TYPE_STATS        0x0010    /* stats collector process */
+#define BACKEND_TYPE_ALL        0x001F    /* OR of all the above */
 
 #define BACKEND_TYPE_WORKER        (BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER)
 
@@ -550,6 +551,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartCheckpointer()        StartChildProcess(CheckpointerProcess)
 #define StartWalWriter()        StartChildProcess(WalWriterProcess)
 #define StartWalReceiver()        StartChildProcess(WalReceiverProcess)
+#define StartStatsCollector()    StartChildProcess(StatsCollectorProcess)
 
 /* Macros to check exit status of a child process */
 #define EXIT_STATUS_0(st)  ((st) == 0)
@@ -1762,7 +1764,7 @@ ServerLoop(void)
         /* If we have lost the stats collector, try to start a new one */
         if (PgStatPID == 0 &&
             (pmState == PM_RUN || pmState == PM_HOT_STANDBY))
-            PgStatPID = pgstat_start();
+            PgStatPID = StartStatsCollector();
 
         /* If we have lost the archiver, try to start a new one. */
         if (PgArchPID == 0 && PgArchStartupAllowed())
@@ -2880,7 +2882,7 @@ reaper(SIGNAL_ARGS)
             if (PgArchStartupAllowed() && PgArchPID == 0)
                 PgArchPID = pgarch_start();
             if (PgStatPID == 0)
-                PgStatPID = pgstat_start();
+                PgStatPID = StartStatsCollector();
 
             /* workers may be scheduled to start now */
             maybe_start_bgworkers();
@@ -2953,7 +2955,7 @@ reaper(SIGNAL_ARGS)
                  * nothing left for it to do.
                  */
                 if (PgStatPID != 0)
-                    signal_child(PgStatPID, SIGQUIT);
+                    signal_child(PgStatPID, SIGTERM);
             }
             else
             {
@@ -3039,10 +3041,10 @@ reaper(SIGNAL_ARGS)
         {
             PgStatPID = 0;
             if (!EXIT_STATUS_0(exitstatus))
-                LogChildExit(LOG, _("statistics collector process"),
-                             pid, exitstatus);
+                HandleChildCrash(pid, exitstatus,
+                                 _("statistics collector process"));
             if (pmState == PM_RUN || pmState == PM_HOT_STANDBY)
-                PgStatPID = pgstat_start();
+                PgStatPID = StartStatsCollector();
             continue;
         }
 
@@ -3272,7 +3274,7 @@ CleanupBackend(int pid,
 
 /*
  * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer,
- * walwriter, autovacuum, or background worker.
+ * walwriter, autovacuum, stats collector, or background worker.
  *
  * The objectives here are to clean up our local state about the child
  * process, and to signal all other remaining children to quickdie.
@@ -4951,12 +4953,6 @@ SubPostmasterMain(int argc, char *argv[])
 
         PgArchiverMain(argc, argv); /* does not return */
     }
-    if (strcmp(argv[1], "--forkcol") == 0)
-    {
-        /* Do not want to attach to shared memory */
-
-        PgstatCollectorMain(argc, argv);    /* does not return */
-    }
     if (strcmp(argv[1], "--forklog") == 0)
     {
         /* Do not want to attach to shared memory */
@@ -5073,7 +5069,7 @@ sigusr1_handler(SIGNAL_ARGS)
          * Likewise, start other special children as needed.
          */
         Assert(PgStatPID == 0);
-        PgStatPID = pgstat_start();
+        PgStatPID = StartStatsCollector();
 
         ereport(LOG,
                 (errmsg("database system is ready to accept read only connections")));
@@ -5370,6 +5366,10 @@ StartChildProcess(AuxProcType type)
                 ereport(LOG,
                         (errmsg("could not fork WAL receiver process: %m")));
                 break;
+            case StatsCollectorProcess:
+                ereport(LOG,
+                        (errmsg("could not fork stats collector process: %m")));
+                break;
             default:
                 ereport(LOG,
                         (errmsg("could not fork process: %m")));
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 69f356f8cd..433d1ed0eb 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -400,7 +400,7 @@ typedef enum
     CheckpointerProcess,
     WalWriterProcess,
     WalReceiverProcess,
-
+    StatsCollectorProcess,
     NUM_AUXPROCTYPES            /* Must be last! */
 } AuxProcType;
 
@@ -412,6 +412,7 @@ extern AuxProcType MyAuxProcType;
 #define AmCheckpointerProcess()        (MyAuxProcType == CheckpointerProcess)
 #define AmWalWriterProcess()        (MyAuxProcType == WalWriterProcess)
 #define AmWalReceiverProcess()        (MyAuxProcType == WalReceiverProcess)
+#define AmStatsCollectorProcess()    (MyAuxProcType == StatsCollectorProcess)
 
 
 /*****************************************************************************
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index d59c24ae23..e97b25bd72 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -710,7 +710,8 @@ typedef enum BackendType
     B_STARTUP,
     B_WAL_RECEIVER,
     B_WAL_SENDER,
-    B_WAL_WRITER
+    B_WAL_WRITER,
+    B_STATS_COLLECTOR
 } BackendType;
 
 
@@ -1160,11 +1161,6 @@ extern int    pgstat_start(void);
 extern void pgstat_reset_all(void);
 extern void allow_immediate_pgstat_restart(void);
 
-#ifdef EXEC_BACKEND
-extern void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();
-#endif
-
-
 /* ----------
  * Functions called from backends
  * ----------
@@ -1353,4 +1349,7 @@ extern int    pgstat_fetch_stat_numbackends(void);
 extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 
+/* Main loop */
+extern void PgstatCollectorMain(void) pg_attribute_noreturn();
+
 #endif                            /* PGSTAT_H */
-- 
2.16.3

From 0a9053c6d54c5b80649504d7192fe5fd772110c4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 29 Jun 2018 16:41:04 +0900
Subject: [PATCH 1/8] sequential scan for dshash

Add sequential scan feature to dshash.
---
 src/backend/lib/dshash.c | 138 +++++++++++++++++++++++++++++++++++++++++++++++
 src/include/lib/dshash.h |  23 +++++++-
 2 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c
index b46f7c4cfd..5b133226ac 100644
--- a/src/backend/lib/dshash.c
+++ b/src/backend/lib/dshash.c
@@ -592,6 +592,144 @@ dshash_memhash(const void *v, size_t size, void *arg)
     return tag_hash(v, size);
 }
 
+/*
+ * dshash_seq_init/_next/_term
+ *           Sequentially scan through a dshash table, returning all the
+ *           elements one by one; returns NULL when there are no more.
+ *
+ * dshash_seq_term should be called if and only if the scan is abandoned
+ * before completion; if dshash_seq_next returns NULL then it has already done
+ * the end-of-scan cleanup.
+ *
+ * On returning element, it is locked as is the case with dshash_find.
+ * However, the caller must not release the lock. The lock is released as
+ * necessary in continued scan.
+ *
+ * As opposed to the equivalent for dynahash, the caller is not supposed to
+ * delete the returned element before continuing the scan.
+ *
+ * If consistent is set for dshash_seq_init, the whole hash table is
+ * non-exclusively locked. Otherwise a part of the hash table is locked in the
+ * same mode (partition lock).
+ */
+void
+dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
+                bool consistent)
+{
+    status->hash_table = hash_table;
+    status->curbucket = 0;
+    status->nbuckets = ((size_t) 1) << hash_table->control->size_log2;
+    status->curitem = NULL;
+    status->curpartition = -1;
+    status->consistent = consistent;
+
+    /*
+     * Protect all partitions from modification if the caller wants a
+     * consistent result.
+     */
+    if (consistent)
+    {
+        int i;
+
+        for (i = 0; i < DSHASH_NUM_PARTITIONS; ++i)
+        {
+            Assert(!LWLockHeldByMe(PARTITION_LOCK(hash_table, i)));
+
+            LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
+        }
+    }
+    ensure_valid_bucket_pointers(hash_table);
+}
+
+void *
+dshash_seq_next(dshash_seq_status *status)
+{
+    dsa_pointer next_item_pointer;
+
+    if (status->curitem == NULL)
+    {
+        Assert (status->curbucket == 0);
+        Assert(!status->hash_table->find_locked);
+
+        /* first shot. grab the first item. */
+        next_item_pointer = status->hash_table->buckets[status->curbucket];
+        status->hash_table->find_locked = true;
+    }
+    else
+        next_item_pointer = status->curitem->next;
+
+    /* Move to the next bucket if we finished the current bucket */
+    while (!DsaPointerIsValid(next_item_pointer))
+    {
+        if (++status->curbucket >= status->nbuckets)
+        {
+            /* all buckets have been scanned. finish. */
+            dshash_seq_term(status);
+            return NULL;
+        }
+        Assert(status->hash_table->find_locked);
+
+        next_item_pointer = status->hash_table->buckets[status->curbucket];
+
+        /*
+         * We need a lock on the partition being scanned even if the caller
+         * didn't request a consistent snapshot.
+         */
+        if (!status->consistent && DsaPointerIsValid(next_item_pointer))
+        {
+            dshash_table_item  *item = dsa_get_address(status->hash_table->area,
+                                                       next_item_pointer);
+            int next_partition = PARTITION_FOR_HASH(item->hash);
+            if (status->curpartition != next_partition)
+            {
+                if (status->curpartition >= 0)
+                    LWLockRelease(PARTITION_LOCK(status->hash_table,
+                                                 status->curpartition));
+                LWLockAcquire(PARTITION_LOCK(status->hash_table,
+                                             next_partition),
+                              LW_SHARED);
+                status->curpartition = next_partition;
+            }
+        }
+    }
+
+    status->curitem =
+        dsa_get_address(status->hash_table->area, next_item_pointer);
+    return ENTRY_FROM_ITEM(status->curitem);
+}
+
+void
+dshash_seq_term(dshash_seq_status *status)
+{
+    Assert(status->hash_table->find_locked);
+    status->hash_table->find_locked = false;
+
+    if (status->consistent)
+    {
+        int i;
+
+        for (i = 0; i < DSHASH_NUM_PARTITIONS; ++i)
+            LWLockRelease(PARTITION_LOCK(status->hash_table, i));
+    }
+    else if (status->curpartition >= 0)
+        LWLockRelease(PARTITION_LOCK(status->hash_table, status->curpartition));
+}
+
+int
+dshash_get_num_entries(dshash_table *hash_table)
+{
+    /* A shortcut implementation; should be improved. */
+    dshash_seq_status s;
+    void *p;
+    int n = 0;
+
+    dshash_seq_init(&s, hash_table, false);
+    while ((p = dshash_seq_next(&s)) != NULL)
+        n++;
+
+    return n;
+}
+
 /*
  * Print debugging information about the internal state of the hash table to
  * stderr.  The caller must hold no partition locks.
diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h
index 8c733bfe25..8598d9ed84 100644
--- a/src/include/lib/dshash.h
+++ b/src/include/lib/dshash.h
@@ -15,6 +15,7 @@
 #define DSHASH_H
 
 #include "utils/dsa.h"
+#include "utils/hsearch.h"
 
 /* The opaque type representing a hash table. */
 struct dshash_table;
@@ -59,6 +60,21 @@ typedef struct dshash_parameters
 struct dshash_table_item;
 typedef struct dshash_table_item dshash_table_item;
 
+/*
+ * Sequential scan state of dshash. The detail is exposed since the storage
+ * size should be known to users but it should be considered as an opaque
+ * type by callers.
+ */
+typedef struct dshash_seq_status
+{
+    dshash_table       *hash_table;
+    int                    curbucket;
+    int                    nbuckets;
+    dshash_table_item  *curitem;
+    int                    curpartition;
+    bool                consistent;
+} dshash_seq_status;
+
 /* Creating, sharing and destroying from hash tables. */
 extern dshash_table *dshash_create(dsa_area *area,
               const dshash_parameters *params,
@@ -70,7 +86,6 @@ extern dshash_table *dshash_attach(dsa_area *area,
 extern void dshash_detach(dshash_table *hash_table);
 extern dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table);
 extern void dshash_destroy(dshash_table *hash_table);
-
 /* Finding, creating, deleting entries. */
 extern void *dshash_find(dshash_table *hash_table,
             const void *key, bool exclusive);
@@ -80,6 +95,12 @@ extern bool dshash_delete_key(dshash_table *hash_table, const void *key);
 extern void dshash_delete_entry(dshash_table *hash_table, void *entry);
 extern void dshash_release_lock(dshash_table *hash_table, void *entry);
 
+/* seq scan support */
+extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
+                            bool exclusive);
+extern void *dshash_seq_next(dshash_seq_status *status);
+extern void dshash_seq_term(dshash_seq_status *status);
+extern int dshash_get_num_entries(dshash_table *hash_table);
 /* Convenience hash and compare functions wrapping memcmp and tag_hash. */
 extern int    dshash_memcmp(const void *a, const void *b, size_t size, void *arg);
 extern dshash_hash dshash_memhash(const void *v, size_t size, void *arg);
-- 
2.16.3


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Michael Paquier
Дата:
Сообщение: Re: Problem while setting the fpw with SIGHUP
Следующее
От: Dean Rasheed
Дата:
Сообщение: Re: BUG #15307: Low numerical precision of (Co-) Variance