Re: Race conditions with checkpointer and shutdown

From Tom Lane
Subject Re: Race conditions with checkpointer and shutdown
Date
Msg-id 2766.1556413011@sss.pgh.pa.us
In response to Re: Race conditions with checkpointer and shutdown  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: Race conditions with checkpointer and shutdown  (Thomas Munro <thomas.munro@gmail.com>)
Re: Race conditions with checkpointer and shutdown  (Andres Freund <andres@anarazel.de>)
Re: Race conditions with checkpointer and shutdown  (Ashwin Agrawal <aagrawal@pivotal.io>)
List pgsql-hackers
I have spent a fair amount of time trying to replicate these failures
locally, with little success.  I now think that the most promising theory
is Munro's idea in [1] that the walreceiver is hanging up during its
unsafe attempt to do ereport(FATAL) from inside a signal handler.  It's
extremely plausible that that could result in a deadlock inside libc's
malloc/free, or some similar place.  Moreover, if that's what's causing
it, then the windows for trouble are fixed by the length of time that
malloc might hold internal locks, which fits with the results I've gotten
that inserting delays in various promising-looking places doesn't do a
thing towards making this reproducible.
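
For readers who want the hazard spelled out, here is a minimal sketch in plain C (not PostgreSQL code) of the flag-only handler style that avoids it.  The handler touches nothing but a sig_atomic_t variable, so it cannot collide with a malloc/free call that the interrupted code was in the middle of; the actual reporting and exit happen later, from ordinary context.  All names here (got_sigterm, shutdown_handler, install_shutdown_handler, shutdown_pending) are hypothetical and chosen only for illustration.

```c
#include <signal.h>
#include <stdbool.h>

/* Hypothetical flag; the only state the handler is allowed to touch. */
static volatile sig_atomic_t got_sigterm = 0;

/*
 * Async-signal-safe handler: record the request and return.
 * No logging, no memory allocation, no exit() from here.
 */
static void
shutdown_handler(int signo)
{
    (void) signo;
    got_sigterm = 1;
}

/* Install the handler for SIGTERM; returns 0 on success, -1 on failure. */
static int
install_shutdown_handler(void)
{
    return (signal(SIGTERM, shutdown_handler) == SIG_ERR) ? -1 : 0;
}

/*
 * Called from ordinary (non-signal) context at points where it is safe
 * to allocate memory, write a log message, and exit.  A real program
 * would do its error reporting here instead of just returning a bool.
 */
static bool
shutdown_pending(void)
{
    return got_sigterm != 0;
}
```

The cost of this style is that every long wait has to come back and call shutdown_pending() promptly, which is the part the rest of this message is about.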

Even if that isn't the proximate cause of the current reports, it's
clearly trouble waiting to happen, and we should get rid of it.
Accordingly, see attached proposed patch.  This just flushes the
"immediate interrupt" stuff in favor of making sure that
libpqwalreceiver.c will take care of any signals received while
waiting for input.
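
As a rough illustration of "take care of any signals received while waiting for input", the following self-contained sketch uses poll() and a self-pipe as stand-ins for WaitLatchOrSocket and the process latch.  It is an analogy under those assumptions, not the latch implementation and not the patch itself; the names wake_pipe, wake_handler, setup_wakeup, and wait_for_data_or_shutdown are hypothetical.

```c
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

typedef enum { WAIT_DATA_READY, WAIT_SHUTDOWN, WAIT_ERROR } WaitOutcome;

static volatile sig_atomic_t shutdown_requested = 0;
static int wake_pipe[2] = {-1, -1};     /* stand-in for the latch */

/* Handler: set the flag and wake the waiter; both steps are signal-safe. */
static void
wake_handler(int signo)
{
    int save_errno = errno;

    (void) signo;
    shutdown_requested = 1;
    if (write(wake_pipe[1], "x", 1) < 0)
    {
        /* nothing safe to do about it here */
    }
    errno = save_errno;
}

/* Create the wake-up pipe and install the handler; 0 on success. */
static int
setup_wakeup(void)
{
    if (pipe(wake_pipe) != 0)
        return -1;
    return (signal(SIGTERM, wake_handler) == SIG_ERR) ? -1 : 0;
}

/* Sleep until data_fd is readable or a shutdown has been requested. */
static WaitOutcome
wait_for_data_or_shutdown(int data_fd)
{
    for (;;)
    {
        struct pollfd fds[2];

        /* Check the flag every time around, like a latch-set check. */
        if (shutdown_requested)
            return WAIT_SHUTDOWN;

        fds[0].fd = data_fd;
        fds[0].events = POLLIN;
        fds[0].revents = 0;
        fds[1].fd = wake_pipe[0];
        fds[1].events = POLLIN;
        fds[1].revents = 0;

        if (poll(fds, 2, -1) < 0)
        {
            if (errno == EINTR)
                continue;       /* interrupted; recheck the flag */
            return WAIT_ERROR;
        }

        if (fds[1].revents & POLLIN)
        {
            char buf[16];

            /* "Reset the latch", then loop back to the flag check. */
            if (read(wake_pipe[0], buf, sizeof(buf)) < 0)
                return WAIT_ERROR;
            continue;
        }

        if (fds[0].revents & (POLLIN | POLLHUP | POLLERR))
            return WAIT_DATA_READY;
    }
}
```

A signal that arrives between the flag check and the poll() call is not lost, because the byte written by the handler makes the pipe readable and poll() returns at once.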

The existing code does not use PQsetnonblocking, which means that it's
theoretically at risk of blocking while pushing out data to the remote
server.  In practice I think that risk is negligible because (IIUC) we
don't send very large amounts of data at one time.  So I didn't bother to
change that.  Note that for the most part, if that happened, the existing
code was at risk of slow response to SIGTERM anyway since it didn't have
Enable/DisableWalRcvImmediateExit around the places that send data.
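
To show what "blocking while pushing out data" means in general terms, here is a small POSIX sketch that uses a plain file descriptor rather than libpq.  In blocking mode, write() sleeps once the kernel buffer is full and the peer is not draining it; in non-blocking mode it returns early and the caller has to wait for writability and retry.  The helper names set_nonblocking and send_some are hypothetical, and nothing here reflects how libpq implements PQsetnonblocking internally.

```c
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Put fd into non-blocking mode; returns 0 on success, -1 on failure. */
static int
set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    return (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) ? -1 : 0;
}

/*
 * Send as much of buf as the kernel will accept without sleeping.
 * Returns the number of bytes accepted (possibly fewer than len),
 * or -1 on a hard error.  The caller is expected to wait until the
 * descriptor is writable before sending the remainder.
 */
static ssize_t
send_some(int fd, const char *buf, size_t len)
{
    size_t sent = 0;

    while (sent < len)
    {
        ssize_t n = write(fd, buf + sent, len - sent);

        if (n > 0)
            sent += (size_t) n;
        else if (n < 0 && errno == EINTR)
            continue;           /* interrupted by a signal; retry */
        else if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
            break;              /* buffer full; would have blocked */
        else
            return -1;          /* hard error or unexpected zero write */
    }
    return (ssize_t) sent;
}
```

With short messages the early-return branch is rarely taken, which is in line with the judgment above that the risk is negligible for the amounts of data the walreceiver sends.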

My thought is to apply this only to HEAD for now; it's kind of a large
change to shove into the back branches to handle a failure mode that's
not been reported from the field.  Maybe we could back-patch after we
have more confidence in it.

            regards, tom lane

[1] https://www.postgresql.org/message-id/CA%2BhUKG%2B%3D1G98m61VjNS-qGboJPwdZcF%2BrAPu2eC4XuWRTR3UPw%40mail.gmail.com

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7123d41..765d58d 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -99,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {

 /* Prototypes for private functions */
 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
+static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);

 /*
@@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
         if (rc & WL_LATCH_SET)
         {
             ResetLatch(MyLatch);
-            CHECK_FOR_INTERRUPTS();
+            ProcessWalRcvInterrupts();
         }

         /* If socket is ready, advance the libpq state machine */
@@ -456,6 +457,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 {
     PGresult   *res;

+    /*
+     * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+     * block, but the risk seems small.
+     */
     if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
         PQflush(conn->streamConn))
         ereport(ERROR,
@@ -472,7 +477,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
      * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
      * also possible in case we aborted the copy in mid-stream.
      */
-    res = PQgetResult(conn->streamConn);
+    res = libpqrcv_PQgetResult(conn->streamConn);
     if (PQresultStatus(res) == PGRES_TUPLES_OK)
     {
         /*
@@ -486,7 +491,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
         PQclear(res);

         /* the result set should be followed by CommandComplete */
-        res = PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn->streamConn);
     }
     else if (PQresultStatus(res) == PGRES_COPY_OUT)
     {
@@ -499,7 +504,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
                             pchomp(PQerrorMessage(conn->streamConn)))));

         /* CommandComplete should follow */
-        res = PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn->streamConn);
     }

     if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -509,7 +514,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
     PQclear(res);

     /* Verify that there are no more results */
-    res = PQgetResult(conn->streamConn);
+    res = libpqrcv_PQgetResult(conn->streamConn);
     if (res != NULL)
         ereport(ERROR,
                 (errmsg("unexpected result after CommandComplete: %s",
@@ -572,12 +577,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * The function is modeled on PQexec() in libpq, but only implements
  * those parts that are in use in the walreceiver api.
  *
- * Queries are always executed on the connection in streamConn.
+ * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
 libpqrcv_PQexec(PGconn *streamConn, const char *query)
 {
-    PGresult   *result = NULL;
     PGresult   *lastResult = NULL;

     /*
@@ -588,60 +592,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
      */

     /*
-     * Submit a query. Since we don't use non-blocking mode, this also can
-     * block. But its risk is relatively small, so we ignore that for now.
+     * Submit the query.  Since we don't use non-blocking mode, this could
+     * theoretically block.  In practice, since we don't send very long query
+     * strings, the risk seems negligible.
      */
     if (!PQsendQuery(streamConn, query))
         return NULL;

     for (;;)
     {
-        /*
-         * Receive data until PQgetResult is ready to get the result without
-         * blocking.
-         */
-        while (PQisBusy(streamConn))
-        {
-            int            rc;
-
-            /*
-             * We don't need to break down the sleep into smaller increments,
-             * since we'll get interrupted by signals and can either handle
-             * interrupts here or elog(FATAL) within SIGTERM signal handler if
-             * the signal arrives in the middle of establishment of
-             * replication connection.
-             */
-            rc = WaitLatchOrSocket(MyLatch,
-                                   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-                                   WL_LATCH_SET,
-                                   PQsocket(streamConn),
-                                   0,
-                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-            /* Interrupted? */
-            if (rc & WL_LATCH_SET)
-            {
-                ResetLatch(MyLatch);
-                CHECK_FOR_INTERRUPTS();
-            }
+        /* Wait for, and collect, the next PGresult. */
+        PGresult   *result;

-            /* Consume whatever data is available from the socket */
-            if (PQconsumeInput(streamConn) == 0)
-            {
-                /* trouble; drop whatever we had and return NULL */
-                PQclear(lastResult);
-                return NULL;
-            }
-        }
+        result = libpqrcv_PQgetResult(streamConn);
+        if (result == NULL)
+            break;                /* query is complete, or failure */

         /*
          * Emulate PQexec()'s behavior of returning the last result when there
          * are many.  We are fine with returning just last error message.
          */
-        result = PQgetResult(streamConn);
-        if (result == NULL)
-            break;                /* query is complete */
-
         PQclear(lastResult);
         lastResult = result;

@@ -656,6 +626,51 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 }

 /*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static PGresult *
+libpqrcv_PQgetResult(PGconn *streamConn)
+{
+    /*
+     * Collect data until PQgetResult is ready to get the result without
+     * blocking.
+     */
+    while (PQisBusy(streamConn))
+    {
+        int            rc;
+
+        /*
+         * We don't need to break down the sleep into smaller increments,
+         * since we'll get interrupted by signals and can handle any
+         * interrupts here.
+         */
+        rc = WaitLatchOrSocket(MyLatch,
+                               WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
+                               WL_LATCH_SET,
+                               PQsocket(streamConn),
+                               0,
+                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+
+        /* Interrupted? */
+        if (rc & WL_LATCH_SET)
+        {
+            ResetLatch(MyLatch);
+            ProcessWalRcvInterrupts();
+        }
+
+        /* Consume whatever data is available from the socket */
+        if (PQconsumeInput(streamConn) == 0)
+        {
+            /* trouble; return NULL */
+            return NULL;
+        }
+    }
+
+    /* Now we can collect and return the next PGresult */
+    return PQgetResult(streamConn);
+}
+
+/*
  * Disconnect connection to primary, if any.
  */
 static void
@@ -716,13 +731,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     {
         PGresult   *res;

-        res = PQgetResult(conn->streamConn);
+        res = libpqrcv_PQgetResult(conn->streamConn);
         if (PQresultStatus(res) == PGRES_COMMAND_OK)
         {
             PQclear(res);

             /* Verify that there are no more results. */
-            res = PQgetResult(conn->streamConn);
+            res = libpqrcv_PQgetResult(conn->streamConn);
             if (res != NULL)
             {
                 PQclear(res);
@@ -886,7 +901,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
     {
         char       *cstrs[MaxTupleAttributeNumber];

-        CHECK_FOR_INTERRUPTS();
+        ProcessWalRcvInterrupts();

         /* Do the allocations in temporary context. */
         oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f32cf91..7f4bd56 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -112,28 +112,7 @@ static struct
 static StringInfoData reply_message;
 static StringInfoData incoming_message;

-/*
- * About SIGTERM handling:
- *
- * We can't just exit(1) within SIGTERM signal handler, because the signal
- * might arrive in the middle of some critical operation, like while we're
- * holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking operations
- * like libpqrcv_PQexec(), which can take a long time to finish.
- *
- * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
- * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
- * sets got_SIGTERM flag, which is checked in the main loop when convenient.
- *
- * This is very much like what regular backends do with ImmediateInterruptOK,
- * ProcessInterrupts() etc.
- */
-static volatile bool WalRcvImmediateInterruptOK = false;
-
 /* Prototypes for private functions */
-static void ProcessWalRcvInterrupts(void);
-static void EnableWalRcvImmediateExit(void);
-static void DisableWalRcvImmediateExit(void);
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
@@ -151,7 +130,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);


-static void
+/*
+ * Process any interrupts the walreceiver process may have received.
+ * This should be called any time the process's latch has become set.
+ *
+ * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
+ * SIGTERM signal handler, because the signal might arrive in the middle of
+ * some critical operation, like while we're holding a spinlock.  Instead, the
+ * signal handler sets a flag variable as well as setting the process's latch.
+ * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
+ * latch has become set.  Operations that could block for a long time, such as
+ * reading from a remote server, must pay attention to the latch too; see
+ * libpqrcv_PQexec for example.
+ */
+void
 ProcessWalRcvInterrupts(void)
 {
     /*
@@ -163,26 +155,12 @@ ProcessWalRcvInterrupts(void)

     if (got_SIGTERM)
     {
-        WalRcvImmediateInterruptOK = false;
         ereport(FATAL,
                 (errcode(ERRCODE_ADMIN_SHUTDOWN),
                  errmsg("terminating walreceiver process due to administrator command")));
     }
 }

-static void
-EnableWalRcvImmediateExit(void)
-{
-    WalRcvImmediateInterruptOK = true;
-    ProcessWalRcvInterrupts();
-}
-
-static void
-DisableWalRcvImmediateExit(void)
-{
-    WalRcvImmediateInterruptOK = false;
-    ProcessWalRcvInterrupts();
-}

 /* Main entry point for walreceiver process */
 void
@@ -292,12 +270,10 @@ WalReceiverMain(void)
     PG_SETMASK(&UnBlockSig);

     /* Establish the connection to the primary for XLOG streaming */
-    EnableWalRcvImmediateExit();
     wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
     if (!wrconn)
         ereport(ERROR,
                 (errmsg("could not connect to the primary server: %s", err)));
-    DisableWalRcvImmediateExit();

     /*
      * Save user-visible connection string.  This clobbers the original
@@ -336,7 +312,6 @@ WalReceiverMain(void)
          * Check that we're connected to a valid server using the
          * IDENTIFY_SYSTEM replication command.
          */
-        EnableWalRcvImmediateExit();
         primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);

         snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
@@ -348,7 +323,6 @@ WalReceiverMain(void)
                      errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                                primary_sysid, standby_sysid)));
         }
-        DisableWalRcvImmediateExit();

         /*
          * Confirm that the current timeline of the primary is the same or
@@ -509,6 +483,8 @@ WalReceiverMain(void)
                 if (rc & WL_LATCH_SET)
                 {
                     ResetLatch(walrcv->latch);
+                    ProcessWalRcvInterrupts();
+
                     if (walrcv->force_reply)
                     {
                         /*
@@ -577,9 +553,7 @@ WalReceiverMain(void)
              * The backend finished streaming. Exit streaming COPY-mode from
              * our side, too.
              */
-            EnableWalRcvImmediateExit();
             walrcv_endstreaming(wrconn, &primaryTLI);
-            DisableWalRcvImmediateExit();

             /*
              * If the server had switched to a new timeline that we didn't
@@ -726,9 +700,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
                     (errmsg("fetching timeline history file for timeline %u from primary server",
                             tli)));

-            EnableWalRcvImmediateExit();
             walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
-            DisableWalRcvImmediateExit();

             /*
              * Check that the filename on the master matches what we
@@ -805,7 +777,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
     errno = save_errno;
 }

-/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
+/* SIGTERM: set flag for ProcessWalRcvInterrupts */
 static void
 WalRcvShutdownHandler(SIGNAL_ARGS)
 {
@@ -816,10 +788,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
     if (WalRcv->latch)
         SetLatch(WalRcv->latch);

-    /* Don't joggle the elbow of proc_exit */
-    if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
-        ProcessWalRcvInterrupts();
-
     errno = save_errno;
 }

diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 33e89ca..7f2927c 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -302,6 +302,7 @@ walrcv_clear_result(WalRcvExecResult *walres)

 /* prototypes for functions in walreceiver.c */
 extern void WalReceiverMain(void) pg_attribute_noreturn();
+extern void ProcessWalRcvInterrupts(void);

 /* prototypes for functions in walreceiverfuncs.c */
 extern Size WalRcvShmemSize(void);
