Redesign of parallel dump/restore's response to SIGINT

From Tom Lane
Subject Redesign of parallel dump/restore's response to SIGINT
Date
Msg-id 7005.1464657274@sss.pgh.pa.us
List pgsql-hackers
There are several things that are not very good about the handling of
cancel interrupts (control-C etc) in parallel pg_dump/pg_restore.
Currently, the signal handler just sets a "wantAbort" flag which is
tested in checkAborting().  That means:

1. There had better be a checkAborting() call in every potentially
long-running loop in pg_dump or pg_restore.  I already fixed two such
oversights yesterday in commit 3c8aa6654, but I have little confidence
that there aren't more omissions of that ilk, and none whatsoever that
we won't introduce more of them in the future.

2. If we're not executing code on the client side, but just waiting for
a SQL query to finish, this infrastructure fails to move things along at
all.  That's not a huge problem for pg_dump, nor for data transfer in
pg_restore.  But operations such as a long-running CREATE INDEX in
pg_restore will not get cancelled, and IMV that *is* a problem.

3. On Unix, the key reason the current code manages to produce early exit
in parallel mode is that if you run pg_dump or pg_restore manually, and
type control-C at the console, the SIGINT will be delivered to all members
of the terminal process group; that is, all the worker processes along
with the master.  It's not hard to envision use-cases in which someone
tries to SIGINT just the master.  There's a kluge in the current code that
tries to deal with that case by checking wantAbort in select_loop(), but
that only helps if the master is waiting for workers at the instant the
SIGINT arrives.  If it's someplace else, it will fail to notice wantAbort
till after the next worker finishes, which might be a long time.

4. On Windows, there's no signal handler and so no designed response to
control-C at all.  The pg_dump or pg_restore process just exits, leaving
the connected backend(s) to clean up.  This isn't too awful ... except in
the long-running-CREATE-INDEX case.
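
To make points 1 and 2 concrete, here is a minimal sketch of the flag-polling
pattern described above. It is not the pg_dump code: want_abort, flag_handler,
install_flag_handler and process_chunks are hypothetical names, and
raise(SIGINT) only simulates a control-C arriving in the middle of a loop.

```c
#include <signal.h>
#include <stddef.h>

/* Set by the handler, polled by mainline code (plays the role of wantAbort). */
static volatile sig_atomic_t want_abort = 0;

static void
flag_handler(int signo)
{
    (void) signo;
    want_abort = 1;             /* the handler does nothing else */
}

/* Install the flag-setting handler for SIGINT; returns 0 on success. */
static int
install_flag_handler(void)
{
    return (signal(SIGINT, flag_handler) == SIG_ERR) ? -1 : 0;
}

/*
 * Hypothetical long-running loop.  The interrupt is simulated while chunk
 * number signal_at is being handled.  Returns how many chunks were processed
 * before the abort request was noticed.
 */
static size_t
process_chunks(size_t nchunks, size_t signal_at)
{
    size_t      done = 0;
    size_t      i;

    for (i = 0; i < nchunks; i++)
    {
        /* checkAborting()-style poll: only loops containing this line react */
        if (want_abort)
            break;

        if (i == signal_at)
            raise(SIGINT);      /* simulated control-C */

        done++;                 /* stand-in for the real per-chunk work */
    }
    return done;
}
```

The request is noticed only at the next poll, so the chunk in flight always
completes. A loop without the poll, or a process blocked waiting for a server
reply, would never look at the flag at all, which is the weakness the points
above describe.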

Point 2 is basically unfixable without a redesign of pg_dump/pg_restore's
interrupt handling ... so attached is one.  The key idea here is to make
the signal handler itself send a PQcancel request, and then exit(1),
rather than trusting the mainline code to notice the signal anytime soon.
So we can get rid of checkAborting() and thus point 1 goes away too.
Point 3 is dealt with by teaching the master's signal handler to also send
SIGTERM to all the workers.  (Each worker PQcancel's its own connection
and then exits.)  Point 4 is dealt with by introducing a console interrupt
handler (code stolen from psql), which unlike the Unix case can directly
send all the required PQcancels, and then just let the process exit.
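
The Unix half of that design can be sketched as follows. This is an
illustration under stated assumptions, not the attached patch: am_worker,
n_workers, worker_pids, cancel_and_exit_handler, install_cancel_handler and
demo_signal_master_only are hypothetical names, the PQcancel call is reduced
to a comment so that the sketch needs no database, the handler leaves via
_exit(1) where the patch calls exit(1), and it masks signals through
sigaction's sa_mask where the patch resets them with pqsignal.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define MAX_WORKERS 8

/* Hypothetical stand-in for the patch's signal_info. */
static volatile sig_atomic_t am_worker = 0;
static volatile sig_atomic_t n_workers = 0; /* stays 0 until all forks are done */
static volatile pid_t worker_pids[MAX_WORKERS];

static void
cancel_and_exit_handler(int signo)
{
    static const char msg[] = "terminated by user\n";
    int         i;

    (void) signo;

    /* Master only (n_workers is 0 in workers): forward to every worker. */
    for (i = 0; i < n_workers; i++)
        if (worker_pids[i] != 0)
            kill(worker_pids[i], SIGTERM);

    /* A real client would send its query cancel here (PQcancel in the patch). */

    /* Only the master reports, using nothing more complicated than write(2). */
    if (!am_worker && write(STDERR_FILENO, msg, sizeof(msg) - 1) < 0)
    {
        /* nothing useful can be done about a failed write here */
    }
    _exit(1);
}

/* Install the handler for SIGINT and SIGTERM; returns 0 on success. */
static int
install_cancel_handler(void)
{
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = cancel_and_exit_handler;
    sigemptyset(&sa.sa_mask);
    sigaddset(&sa.sa_mask, SIGINT);     /* keep the handler from being */
    sigaddset(&sa.sa_mask, SIGTERM);    /* interrupted by its siblings */
    if (sigaction(SIGINT, &sa, NULL) != 0 || sigaction(SIGTERM, &sa, NULL) != 0)
        return -1;
    /* Make sure neither signal was inherited in blocked state. */
    return sigprocmask(SIG_UNBLOCK, &sa.sa_mask, NULL);
}

/*
 * Start a throwaway master with nworkers idle workers, signal only the
 * master, and return the master's exit status (-1 on failure).
 * *workers_gone becomes 1 once every worker has exited as well.
 */
static int
demo_signal_master_only(int nworkers, int *workers_gone)
{
    int         alive[2];
    int         status;
    char        c;
    pid_t       master;

    *workers_gone = 0;
    if (nworkers < 0 || nworkers > MAX_WORKERS || pipe(alive) != 0)
        return -1;
    if ((master = fork()) < 0)
        return -1;
    if (master == 0)
    {
        int         i;

        close(alive[0]);
        if (install_cancel_handler() != 0)
            _exit(2);
        for (i = 0; i < nworkers; i++)
        {
            pid_t       pid = fork();

            if (pid < 0)
                break;
            if (pid == 0)
            {
                am_worker = 1;          /* tell the handler we're a worker */
                for (;;)
                    pause();            /* idle until signaled */
            }
            worker_pids[i] = pid;
        }
        n_workers = i;                  /* publish only after forking */
        raise(SIGINT);                  /* interrupt reaches the master alone */
        _exit(3);                       /* not reached */
    }

    /* The pipe reports EOF only when the master and all workers are gone. */
    close(alive[1]);
    *workers_gone = (read(alive[0], &c, 1) == 0);
    close(alive[0]);
    if (waitpid(master, &status, 0) != master || !WIFEXITED(status))
        return -1;
    return WEXITSTATUS(status);
}
```

Calling demo_signal_master_only(3, &gone) should return 1 with gone set to 1:
the idle workers exit although only the master was signaled, because the
handler itself forwards SIGTERM rather than waiting for mainline code to
notice anything.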

We need a bit of notational complexity to ensure that the signal handler
or console interrupt handler thread can't see an inconsistent state; but
we already had some critical-section code in place for that, so it doesn't
get much worse than before.
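
One way to picture the consistency requirement on the Unix side is the
ordering sketched below. It is a simplified illustration, not the patch:
CancelToken, current_token, set_cancel_token and handler_peek are hypothetical
names standing in for the PGcancel object and the code that swaps it. The
sketch assumes, as is commonly done, that a single pointer store cannot be
seen half-written by a signal handler; a multi-threaded variant would wrap
the same steps in a lock instead.

```c
#include <stdlib.h>

/* Hypothetical stand-in for libpq's PGcancel object. */
typedef struct CancelToken
{
    int         backend_id;
} CancelToken;

/* Pointer an interrupt handler would read; NULL means "nothing to cancel". */
static CancelToken *volatile current_token = NULL;

/* Counts freed tokens, purely so the sketch can be checked. */
static int  tokens_freed = 0;

/*
 * Replace the published token.  Pass NULL to just clear it.  At every
 * instant the handler sees either NULL or a fully built token.
 */
static void
set_cancel_token(const int *backend_id)
{
    CancelToken *old = current_token;
    CancelToken *fresh = NULL;

    /* Hide the old token first, so it is never visible while being freed. */
    current_token = NULL;
    if (old != NULL)
    {
        free(old);
        tokens_freed++;
    }

    /* Build the new token completely, then publish it with one store. */
    if (backend_id != NULL)
    {
        fresh = malloc(sizeof(CancelToken));
        if (fresh != NULL)
            fresh->backend_id = *backend_id;
    }
    current_token = fresh;
}

/* What a handler would look at: the backend to cancel, or -1 if none. */
static int
handler_peek(void)
{
    CancelToken *tok = current_token;

    return (tok != NULL) ? tok->backend_id : -1;
}
```

The point of the ordering is that the window in which the old token is freed
is also a window in which the handler finds NULL and simply skips the cancel,
which is harmless compared with dereferencing freed memory.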

This patch fixes a few other minor bugs too, for instance that on Windows
we never bothered to set a ParallelSlot's args->AH, so that the intended
DisconnectDatabase call in a worker's archive_close_connection callback
never happened, and thus even the weak existing provision for sending a
PQcancel didn't work.

I've done a fair amount of testing of this behavior on Unix, including
strace'ing the run to make sure it did what I expected.  One thing
I noticed while doing that is that in worker processes, DisconnectDatabase
tends to send a cancel request before exiting, even if nothing went wrong.
This is at least a waste of cycles, and could lead to unexpected log
messages, or maybe even data loss if it happened in pg_restore.  I dug
into it and found that the reason is that after a COPY step, pg_dump was
leaving libpq in PGASYNC_BUSY state, causing PQtransactionStatus to report
PQTRANS_ACTIVE.  That's normally harmless because the next PQexec() will
silently clear the PGASYNC_BUSY state; but in a parallel worker we might
exit without any additional commands after a COPY.  The attached patch
adds an extra PQgetResult() call after a COPY to allow libpq to return to
PGASYNC_IDLE state.

I have not, however, tested the Windows side of this at all.

I think this is a bug fix and should be back-patched, but I'm not going
to risk committing it without review and testing of the Windows version.
Any volunteers?

            regards, tom lane

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 4cf0935..e0ef9cd 100644
*** a/src/bin/pg_dump/compress_io.c
--- b/src/bin/pg_dump/compress_io.c
***************
*** 54,60 ****
  #include "postgres_fe.h"

  #include "compress_io.h"
- #include "parallel.h"
  #include "pg_backup_utils.h"

  /*----------------------
--- 54,59 ----
*************** void
*** 184,192 ****
  WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
                     const void *data, size_t dLen)
  {
-     /* Are we aborting? */
-     checkAborting(AH);
-
      switch (cs->comprAlg)
      {
          case COMPR_ALG_LIBZ:
--- 183,188 ----
*************** ReadDataFromArchiveZlib(ArchiveHandle *A
*** 351,359 ****
      /* no minimal chunk size for zlib */
      while ((cnt = readF(AH, &buf, &buflen)))
      {
-         /* Are we aborting? */
-         checkAborting(AH);
-
          zp->next_in = (void *) buf;
          zp->avail_in = cnt;

--- 347,352 ----
*************** ReadDataFromArchiveNone(ArchiveHandle *A
*** 414,422 ****

      while ((cnt = readF(AH, &buf, &buflen)))
      {
-         /* Are we aborting? */
-         checkAborting(AH);
-
          ahwrite(buf, 1, cnt, AH);
      }

--- 407,412 ----
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index e9e8698..414c101 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 83,91 ****
   */
  typedef struct
  {
!     ArchiveHandle *AH;
!     int            pipeRead;
!     int            pipeWrite;
  } WorkerInfo;

  /* Windows implementation of pipe access */
--- 83,90 ----
   */
  typedef struct
  {
!     ArchiveHandle *AH;            /* master database connection */
!     ParallelSlot *slot;            /* this worker's parallel slot */
  } WorkerInfo;

  /* Windows implementation of pipe access */
*************** static int    piperead(int s, char *buf, in
*** 95,103 ****

  #else                            /* !WIN32 */

- /* Signal handler flag */
- static volatile sig_atomic_t wantAbort = 0;
-
  /* Non-Windows implementation of pipe access */
  #define pgpipe(a)            pipe(a)
  #define piperead(a,b,c)        read(a,b,c)
--- 94,99 ----
*************** typedef struct ShutdownInformation
*** 116,125 ****

  static ShutdownInformation shutdown_info;

  #ifdef WIN32
  /* file-scope variables */
- static unsigned int tMasterThreadId = 0;
- static HANDLE termEvent = INVALID_HANDLE_VALUE;
  static DWORD tls_index;

  /* globally visible variables (needed by exit_nicely) */
--- 112,148 ----

  static ShutdownInformation shutdown_info;

+ /*
+  * State info for signal handling.
+  * We assume signal_info initializes to zeroes.
+  *
+  * On Unix, ourAH is the master DB connection in the master process, and the
+  * worker's own connection in worker processes.  On Windows, we have only one
+  * instance of signal_info, so ourAH is the master connection and the worker
+  * connections must be dug out of pstate->parallelSlot[].
+  */
+ typedef struct DumpSignalInformation
+ {
+     ArchiveHandle *ourAH;        /* database connection to issue cancel for */
+     ParallelState *pstate;        /* parallel state, if any */
+     bool        handler_set;    /* signal handler set up in this process? */
+ #ifndef WIN32
+     bool        am_worker;        /* am I a worker process? */
+ #endif
+ } DumpSignalInformation;
+
+ static volatile DumpSignalInformation signal_info;
+
+ #ifdef WIN32
+ static CRITICAL_SECTION signal_info_lock;
+ #endif
+
+ /* Used from signal handlers, no buffering */
+ #define write_stderr(str)    write(fileno(stderr), str, strlen(str))
+
+
  #ifdef WIN32
  /* file-scope variables */
  static DWORD tls_index;

  /* globally visible variables (needed by exit_nicely) */
*************** static ParallelSlot *GetMyPSlot(Parallel
*** 134,140 ****
  static void archive_close_connection(int code, void *arg);
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
! static void RunWorker(ArchiveHandle *AH, int pipefd[2]);
  static bool HasEveryWorkerTerminated(ParallelState *pstate);
  static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
  static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
--- 157,166 ----
  static void archive_close_connection(int code, void *arg);
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
! static void setup_cancel_handler(void);
! static void set_cancel_pstate(ParallelState *pstate);
! static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
! static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
  static bool HasEveryWorkerTerminated(ParallelState *pstate);
  static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
  static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
*************** archive_close_connection(int code, void
*** 291,303 ****
          if (!slot)
          {
              /*
!              * We're the master.  Close our own database connection, if any,
!              * and then forcibly shut down workers.
               */
              if (si->AHX)
                  DisconnectDatabase(si->AHX);
-
-             ShutdownWorkersHard(si->pstate);
          }
          else
          {
--- 317,329 ----
          if (!slot)
          {
              /*
!              * We're the master.  Forcibly shut down workers, then close our
!              * own database connection, if any.
               */
+             ShutdownWorkersHard(si->pstate);
+
              if (si->AHX)
                  DisconnectDatabase(si->AHX);
          }
          else
          {
*************** archive_close_connection(int code, void
*** 327,359 ****
  }

  /*
-  * Check to see if we've been told to abort, and exit the process/thread if
-  * so.  We don't print any error message; that would just clutter the screen.
-  *
-  * If we have one worker that terminates for some reason, we'd like the other
-  * threads to terminate as well (and not finish with their 70 GB table dump
-  * first...).  In Unix, the master sends SIGTERM and the worker's signal
-  * handler sets wantAbort to 1.  In Windows we set a termEvent and this serves
-  * as the signal for worker threads to exit.  Note that while we check this
-  * fairly frequently during data transfers, an idle worker doesn't come here
-  * at all, so additional measures are needed to force shutdown.
-  *
-  * XXX in parallel restore, slow server-side operations like CREATE INDEX
-  * are not interrupted by anything we do here.  This needs more work.
-  */
- void
- checkAborting(ArchiveHandle *AH)
- {
- #ifdef WIN32
-     if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
- #else
-     if (wantAbort)
- #endif
-         exit_nicely(1);
- }
-
- /*
   * Forcibly shut down any remaining workers, waiting for them to finish.
   */
  static void
  ShutdownWorkersHard(ParallelState *pstate)
--- 353,364 ----
  }

  /*
   * Forcibly shut down any remaining workers, waiting for them to finish.
+  *
+  * Note that we don't expect to come here during normal exit (the workers
+  * should be long gone, and the ParallelState too).  We're only here in an
+  * exit_horribly() situation, so intervening to cancel active commands is
+  * appropriate.
   */
  static void
  ShutdownWorkersHard(ParallelState *pstate)
*************** ShutdownWorkersHard(ParallelState *pstat
*** 367,381 ****
      for (i = 0; i < pstate->numWorkers; i++)
          closesocket(pstate->parallelSlot[i].pipeWrite);

  #ifndef WIN32
!     /* On non-Windows, send SIGTERM to abort commands-in-progress. */
      for (i = 0; i < pstate->numWorkers; i++)
!         kill(pstate->parallelSlot[i].pid, SIGTERM);
  #else
!     /* Non-idle workers monitor this event via checkAborting(). */
!     SetEvent(termEvent);
  #endif

      WaitForTerminatingWorkers(pstate);
  }

--- 372,408 ----
      for (i = 0; i < pstate->numWorkers; i++)
          closesocket(pstate->parallelSlot[i].pipeWrite);

+     /*
+      * Force early termination of any commands currently in progress.
+      */
  #ifndef WIN32
!     /* On non-Windows, send SIGTERM to each worker process. */
      for (i = 0; i < pstate->numWorkers; i++)
!     {
!         pid_t        pid = pstate->parallelSlot[i].pid;
!
!         if (pid != 0)
!             kill(pid, SIGTERM);
!     }
  #else
!
!     /*
!      * On Windows, send query cancels directly to the workers' backends.  Use
!      * a critical section to ensure worker threads don't change state.
!      */
!     EnterCriticalSection(&signal_info_lock);
!     for (i = 0; i < pstate->numWorkers; i++)
!     {
!         ArchiveHandle *AH = pstate->parallelSlot[i].args->AH;
!         char        errbuf[1];
!
!         if (AH != NULL && AH->connCancel != NULL)
!             (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
!     }
!     LeaveCriticalSection(&signal_info_lock);
  #endif

+     /* Now wait for them to terminate. */
      WaitForTerminatingWorkers(pstate);
  }

*************** WaitForTerminatingWorkers(ParallelState
*** 445,480 ****
      }
  }

  /*
!  * Signal handler (Unix only)
   */
  #ifndef WIN32
  static void
  sigTermHandler(SIGNAL_ARGS)
  {
!     wantAbort = 1;
  }
  #endif

  /*
   * This function is called by both Unix and Windows variants to set up
   * and run a worker process.  Caller should exit the process (or thread)
   * upon return.
   */
  static void
! RunWorker(ArchiveHandle *AH, int pipefd[2])
  {
      /*
       * Call the setup worker function that's defined in the ArchiveHandle.
       */
      (AH->SetupWorkerPtr) ((Archive *) AH);

-     Assert(AH->connection != NULL);
-
      /*
       * Execute commands until done.
       */
      WaitForCommands(AH, pipefd);
  }

  /*
--- 472,806 ----
      }
  }

+
  /*
!  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
!  *
!  * This doesn't quite belong in this module, but it needs access to the
!  * ParallelState data, so there's not really a better place either.
!  *
!  * When we get a cancel interrupt, we could just die, but in pg_restore that
!  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
!  * for a long time.  Instead, we try to send a cancel request and then die.
!  * pg_dump probably doesn't really need this, but we might as well use it
!  * there too.  Note that sending the cancel directly from the signal handler
!  * is safe because PQcancel() is written to make it so.
!  *
!  * In parallel operation on Unix, each process is responsible for canceling
!  * its own connection (this must be so because nobody else has access to it).
!  * Furthermore, the master process should attempt to forward its signal to
!  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
!  * needed because typing control-C at the console would deliver SIGINT to
!  * every member of the terminal process group --- but in other scenarios it
!  * might be that only the master gets signaled.
!  *
!  * On Windows, the cancel handler runs in a separate thread, because that's
!  * how SetConsoleCtrlHandler works.  We make it send cancels on all active
!  * connections and then return FALSE, which will allow the process to die.
!  * For safety's sake, we use a critical section to protect the PGcancel
!  * structures against being changed while the signal thread runs.
   */
+
  #ifndef WIN32
+
+ /*
+  * Signal handler (Unix only)
+  */
  static void
  sigTermHandler(SIGNAL_ARGS)
  {
!     int            i;
!     char        errbuf[1];
!
!     /*
!      * Some platforms allow delivery of new signals to interrupt an active
!      * signal handler.  That could muck up our attempt to send PQcancel, so
!      * disable the signals that setup_cancel_handler enabled.
!      */
!     pqsignal(SIGINT, SIG_IGN);
!     pqsignal(SIGTERM, SIG_IGN);
!     pqsignal(SIGQUIT, SIG_IGN);
!
!     /*
!      * If we're in the master, forward signal to all workers.  (It seems best
!      * to do this before PQcancel; killing the master transaction will result
!      * in invalid-snapshot errors from active workers, which maybe we can
!      * quiet by killing workers first.)  Ignore any errors.
!      */
!     if (signal_info.pstate != NULL)
!     {
!         for (i = 0; i < signal_info.pstate->numWorkers; i++)
!         {
!             pid_t        pid = signal_info.pstate->parallelSlot[i].pid;
!
!             if (pid != 0)
!                 kill(pid, SIGTERM);
!         }
!     }
!
!     /*
!      * Send QueryCancel if we have a connection to send to.  Ignore errors,
!      * there's not much we can do about them anyway.
!      */
!     if (signal_info.ourAH != NULL && signal_info.ourAH->connCancel != NULL)
!         (void) PQcancel(signal_info.ourAH->connCancel, errbuf, sizeof(errbuf));
!
!     /*
!      * Report we're quitting, using nothing more complicated than write(2).
!      * When in parallel operation, only the master process should do this.
!      */
!     if (!signal_info.am_worker)
!     {
!         if (progname)
!         {
!             write_stderr(progname);
!             write_stderr(": ");
!         }
!         write_stderr("terminated by user\n");
!     }
!
!     /* And die. */
!     exit(1);
! }
!
! /*
!  * Enable cancel interrupt handler, if not already done.
!  */
! static void
! setup_cancel_handler(void)
! {
!     /*
!      * When forking, signal_info.handler_set will propagate into the new
!      * process, but that's fine because the signal handler state does too.
!      */
!     if (!signal_info.handler_set)
!     {
!         signal_info.handler_set = true;
!
!         pqsignal(SIGINT, sigTermHandler);
!         pqsignal(SIGTERM, sigTermHandler);
!         pqsignal(SIGQUIT, sigTermHandler);
!     }
! }
!
! #else                            /* WIN32 */
!
! /*
!  * Console interrupt handler --- runs in a newly-started thread.
!  *
!  * Basically, we just send cancel requests on all open connections, then
!  * return FALSE which will allow the default ExitProcess() action to be
!  * taken.  We could make some effort to stop other threads first, but
!  * there doesn't seem to be much need.
!  */
! static BOOL WINAPI
! consoleHandler(DWORD dwCtrlType)
! {
!     int            i;
!     char        errbuf[1];
!
!     if (dwCtrlType == CTRL_C_EVENT ||
!         dwCtrlType == CTRL_BREAK_EVENT)
!     {
!         /* Critical section prevents changing data we look at here */
!         EnterCriticalSection(&signal_info_lock);
!
!         /*
!          * Send QueryCancel to all workers, if in parallel mode.  (It seems
!          * best to do this before killing the master transaction; that will
!          * result in invalid-snapshot errors from active workers, which maybe
!          * we can quiet by killing workers first.)    Ignore errors, there's not
!          * much we can do about them anyway.
!          */
!         if (signal_info.pstate != NULL)
!         {
!             for (i = 0; i < signal_info.pstate->numWorkers; i++)
!             {
!                 ArchiveHandle *AH = signal_info.pstate->parallelSlot[i].args->AH;
!
!                 if (AH != NULL && AH->connCancel != NULL)
!                     (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
!             }
!         }
!
!         /*
!          * Send QueryCancel to master connection, if enabled.  Ignore errors,
!          * there's not much we can do about them anyway.
!          */
!         if (signal_info.ourAH != NULL && signal_info.ourAH->connCancel != NULL)
!             (void) PQcancel(signal_info.ourAH->connCancel,
!                             errbuf, sizeof(errbuf));
!
!         LeaveCriticalSection(&signal_info_lock);
!     }
!
!     /* Always return FALSE to allow signal handling to continue */
!     return FALSE;
! }
!
! /*
!  * Enable cancel interrupt handler, if not already done.
!  */
! static void
! setup_cancel_handler(void)
! {
!     if (!signal_info.handler_set)
!     {
!         signal_info.handler_set = true;
!
!         InitializeCriticalSection(&signal_info_lock);
!
!         SetConsoleCtrlHandler(consoleHandler, TRUE);
!     }
  }
+
+ #endif   /* WIN32 */
+
+
+ /*
+  * set_archive_cancel_info
+  *
+  * Fill AH->connCancel with cancellation info for the specified database
+  * connection; or clear it if conn is NULL.
+  */
+ void
+ set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
+ {
+     PGcancel   *oldConnCancel;
+
+     /*
+      * Activate the interrupt handler if we didn't yet in this process.  On
+      * Windows, this also initializes signal_info_lock; therefore it's
+      * important that this happen at least once before we fork off any
+      * threads.
+      */
+     setup_cancel_handler();
+
+     /*
+      * On Unix, we assume that storing a pointer value is atomic with respect
+      * to any possible signal interrupt.  On Windows, use a critical section.
+      */
+
+ #ifdef WIN32
+     EnterCriticalSection(&signal_info_lock);
  #endif

+     /* Free the old one if we have one */
+     oldConnCancel = AH->connCancel;
+     /* be sure interrupt handler doesn't use pointer while freeing */
+     AH->connCancel = NULL;
+
+     if (oldConnCancel != NULL)
+         PQfreeCancel(oldConnCancel);
+
+     /* Set the new one if specified */
+     if (conn)
+         AH->connCancel = PQgetCancel(conn);
+
+     /*
+      * On Unix, there's only ever one active ArchiveHandle per process, so we
+      * can just set signal_info.ourAH unconditionally.  On Windows, do that
+      * only in the main thread; worker threads have to make sure their
+      * ArchiveHandle appears in the pstate data, which is dealt with in
+      * RunWorker().
+      */
+ #ifndef WIN32
+     signal_info.ourAH = AH;
+ #else
+     if (mainThreadId == GetCurrentThreadId())
+         signal_info.ourAH = AH;
+ #endif
+
+ #ifdef WIN32
+     LeaveCriticalSection(&signal_info_lock);
+ #endif
+ }
+
+ /*
+  * set_cancel_pstate
+  *
+  * Set signal_info.pstate to point to the specified ParallelState, if any.
+  * We need this mainly to have an interlock against Windows signal thread.
+  */
+ static void
+ set_cancel_pstate(ParallelState *pstate)
+ {
+ #ifdef WIN32
+     EnterCriticalSection(&signal_info_lock);
+ #endif
+
+     signal_info.pstate = pstate;
+
+ #ifdef WIN32
+     LeaveCriticalSection(&signal_info_lock);
+ #endif
+ }
+
+ /*
+  * set_cancel_slot_archive
+  *
+  * Set ParallelSlot's AH field to point to the specified archive, if any.
+  * We need this mainly to have an interlock against Windows signal thread.
+  */
+ static void
+ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
+ {
+ #ifdef WIN32
+     EnterCriticalSection(&signal_info_lock);
+ #endif
+
+     slot->args->AH = AH;
+
+ #ifdef WIN32
+     LeaveCriticalSection(&signal_info_lock);
+ #endif
+ }
+
+
  /*
   * This function is called by both Unix and Windows variants to set up
   * and run a worker process.  Caller should exit the process (or thread)
   * upon return.
   */
  static void
! RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
  {
+     int            pipefd[2];
+
+     /* fetch child ends of pipes */
+     pipefd[PIPE_READ] = slot->pipeRevRead;
+     pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+
+     /*
+      * Clone the archive so that we have our own state to work with, and in
+      * particular our own database connection.
+      *
+      * We clone on Unix as well as Windows, even though technically we don't
+      * need to because fork() gives us a copy in our own address space
+      * already.  But CloneArchive resets the state information and also clones
+      * the database connection which both seem kinda helpful.
+      */
+     AH = CloneArchive(AH);
+
+     /* Remember cloned archive where signal handler can find it */
+     set_cancel_slot_archive(slot, AH);
+
      /*
       * Call the setup worker function that's defined in the ArchiveHandle.
       */
      (AH->SetupWorkerPtr) ((Archive *) AH);

      /*
       * Execute commands until done.
       */
      WaitForCommands(AH, pipefd);
+
+     /*
+      * Disconnect from database and clean up.
+      */
+     set_cancel_slot_archive(slot, NULL);
+     DisconnectDatabase(&(AH->public));
+     DeCloneArchive(AH);
  }

  /*
*************** RunWorker(ArchiveHandle *AH, int pipefd[
*** 484,505 ****
  static unsigned __stdcall
  init_spawned_worker_win32(WorkerInfo *wi)
  {
!     ArchiveHandle *AH;
!     int            pipefd[2] = {wi->pipeRead, wi->pipeWrite};
!
!     /*
!      * Clone the archive so that we have our own state to work with, and in
!      * particular our own database connection.
!      */
!     AH = CloneArchive(wi->AH);

      free(wi);

      /* Run the worker ... */
!     RunWorker(AH, pipefd);

!     /* Clean up and exit the thread */
!     DeCloneArchive(AH);
      _endthreadex(0);
      return 0;
  }
--- 810,825 ----
  static unsigned __stdcall
  init_spawned_worker_win32(WorkerInfo *wi)
  {
!     ArchiveHandle *AH = wi->AH;
!     ParallelSlot *slot = wi->slot;

+     /* Don't need WorkerInfo anymore */
      free(wi);

      /* Run the worker ... */
!     RunWorker(AH, slot);

!     /* Exit the thread */
      _endthreadex(0);
      return 0;
  }
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 519,527 ****

      Assert(AH->public.numWorkers > 0);

-     /* Ensure stdio state is quiesced before forking */
-     fflush(NULL);
-
      pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));

      pstate->numWorkers = AH->public.numWorkers;
--- 839,844 ----
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 533,557 ****
      pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
      memset((void *) pstate->parallelSlot, 0, slotSize);

-     /*
-      * Set the pstate in the shutdown_info. The exit handler uses pstate if
-      * set and falls back to AHX otherwise.
-      */
-     shutdown_info.pstate = pstate;
-
  #ifdef WIN32
-     /* Set up thread management state */
-     tMasterThreadId = GetCurrentThreadId();
-     termEvent = CreateEvent(NULL, true, false, "Terminate");
      /* Make fmtId() and fmtQualifiedId() use thread-local storage */
      getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
- #else
-     /* Set up signal handling state */
-     signal(SIGTERM, sigTermHandler);
-     signal(SIGINT, sigTermHandler);
-     signal(SIGQUIT, sigTermHandler);
  #endif

      /* Create desired number of workers */
      for (i = 0; i < pstate->numWorkers; i++)
      {
--- 850,880 ----
      pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
      memset((void *) pstate->parallelSlot, 0, slotSize);

  #ifdef WIN32
      /* Make fmtId() and fmtQualifiedId() use thread-local storage */
      getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
  #endif

+     /*
+      * Set the pstate in shutdown_info, to tell the exit handler that it must
+      * clean up workers as well as the main database connection.  But we don't
+      * set this in signal_info yet, because we don't want child processes to
+      * inherit non-NULL signal_info.pstate.
+      */
+     shutdown_info.pstate = pstate;
+
+     /*
+      * Temporarily disable query cancellation on the master connection.  This
+      * ensures that child processes won't inherit valid AH->connCancel
+      * settings and thus won't try to issue cancels against the master's
+      * connection.  No harm is done if we fail while it's disabled, because
+      * the master connection is idle at this point anyway.
+      */
+     set_archive_cancel_info(AH, NULL);
+
+     /* Ensure stdio state is quiesced before forking */
+     fflush(NULL);
+
      /* Create desired number of workers */
      for (i = 0; i < pstate->numWorkers; i++)
      {
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 561,566 ****
--- 884,890 ----
  #else
          pid_t        pid;
  #endif
+         ParallelSlot *slot = &(pstate->parallelSlot[i]);
          int            pipeMW[2],
                      pipeWM[2];

*************** ParallelBackupStart(ArchiveHandle *AH)
*** 570,610 ****
                            "could not create communication channels: %s\n",
                            strerror(errno));

!         pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
!         pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
!         pstate->parallelSlot[i].args->AH = NULL;
!         pstate->parallelSlot[i].args->te = NULL;

          /* master's ends of the pipes */
!         pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
!         pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
          /* child's ends of the pipes */
!         pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
!         pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];

  #ifdef WIN32
          /* Create transient structure to pass args to worker function */
          wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

          wi->AH = AH;
!         wi->pipeRead = pipeMW[PIPE_READ];
!         wi->pipeWrite = pipeWM[PIPE_WRITE];

          handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
!                                 wi, 0, &(pstate->parallelSlot[i].threadId));
!         pstate->parallelSlot[i].hThread = handle;
  #else                            /* !WIN32 */
          pid = fork();
          if (pid == 0)
          {
              /* we are the worker */
              int            j;
-             int            pipefd[2];

!             pipefd[0] = pipeMW[PIPE_READ];
!             pipefd[1] = pipeWM[PIPE_WRITE];

!             pstate->parallelSlot[i].pid = getpid();

              /* close read end of Worker -> Master */
              closesocket(pipeWM[PIPE_READ]);
--- 894,933 ----
                            "could not create communication channels: %s\n",
                            strerror(errno));

!         slot->workerStatus = WRKR_IDLE;
!         slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
!         slot->args->AH = NULL;
!         slot->args->te = NULL;

          /* master's ends of the pipes */
!         slot->pipeRead = pipeWM[PIPE_READ];
!         slot->pipeWrite = pipeMW[PIPE_WRITE];
          /* child's ends of the pipes */
!         slot->pipeRevRead = pipeMW[PIPE_READ];
!         slot->pipeRevWrite = pipeWM[PIPE_WRITE];

  #ifdef WIN32
          /* Create transient structure to pass args to worker function */
          wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

          wi->AH = AH;
!         wi->slot = slot;

          handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
!                                 wi, 0, &(slot->threadId));
!         slot->hThread = handle;
  #else                            /* !WIN32 */
          pid = fork();
          if (pid == 0)
          {
              /* we are the worker */
              int            j;

!             /* this is needed for GetMyPSlot() */
!             slot->pid = getpid();

!             /* instruct signal handler that we're in a worker now */
!             signal_info.am_worker = true;

              /* close read end of Worker -> Master */
              closesocket(pipeWM[PIPE_READ]);
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 621,637 ****
                  closesocket(pstate->parallelSlot[j].pipeWrite);
              }

-             /*
-              * Call CloneArchive on Unix as well as Windows, even though
-              * technically we don't need to because fork() gives us a copy in
-              * our own address space already.  But CloneArchive resets the
-              * state information and also clones the database connection which
-              * both seem kinda helpful.
-              */
-             pstate->parallelSlot[i].args->AH = CloneArchive(AH);
-
              /* Run the worker ... */
!             RunWorker(pstate->parallelSlot[i].args->AH, pipefd);

              /* We can just exit(0) when done */
              exit(0);
--- 944,951 ----
                  closesocket(pstate->parallelSlot[j].pipeWrite);
              }

              /* Run the worker ... */
!             RunWorker(AH, slot);

              /* We can just exit(0) when done */
              exit(0);
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 645,651 ****
          }

          /* In Master after successful fork */
!         pstate->parallelSlot[i].pid = pid;

          /* close read end of Master -> Worker */
          closesocket(pipeMW[PIPE_READ]);
--- 959,965 ----
          }

          /* In Master after successful fork */
!         slot->pid = pid;

          /* close read end of Master -> Worker */
          closesocket(pipeMW[PIPE_READ]);
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 660,668 ****
       * the workers to inherit this setting, though.
       */
  #ifndef WIN32
!     signal(SIGPIPE, SIG_IGN);
  #endif

      return pstate;
  }

--- 974,995 ----
       * the workers to inherit this setting, though.
       */
  #ifndef WIN32
!     pqsignal(SIGPIPE, SIG_IGN);
  #endif

+     /*
+      * Re-establish query cancellation on the master connection.
+      */
+     set_archive_cancel_info(AH, AH->connection);
+
+     /*
+      * Tell the cancel signal handler to forward signals to worker processes,
+      * too.  (As with query cancel, we did not need this earlier because the
+      * workers have not yet been given anything to do; if we die before this
+      * point, any already-started workers will see EOF and quit promptly.)
+      */
+     set_cancel_pstate(pstate);
+
      return pstate;
  }

*************** ParallelBackupEnd(ArchiveHandle *AH, Par
*** 692,701 ****
      WaitForTerminatingWorkers(pstate);

      /*
!      * Unlink pstate from shutdown_info, so the exit handler will again fall
!      * back to closing AH->connection (if connected).
       */
      shutdown_info.pstate = NULL;

      /* Release state (mere neatnik-ism, since we're about to terminate) */
      free(pstate->parallelSlot);
--- 1019,1029 ----
      WaitForTerminatingWorkers(pstate);

      /*
!      * Unlink pstate from shutdown_info, so the exit handler will not try to
!      * use it; and likewise unlink from signal_info.
       */
      shutdown_info.pstate = NULL;
+     set_cancel_pstate(NULL);

      /* Release state (mere neatnik-ism, since we're about to terminate) */
      free(pstate->parallelSlot);
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 872,880 ****
      {
          if (!(command = getMessageFromMaster(pipefd)))
          {
!             /* EOF ... clean up */
!             PQfinish(AH->connection);
!             AH->connection = NULL;
              return;
          }

--- 1200,1206 ----
      {
          if (!(command = getMessageFromMaster(pipefd)))
          {
!             /* EOF, so done */
              return;
          }

*************** select_loop(int maxFd, fd_set *workerset
*** 1138,1181 ****
      int            i;
      fd_set        saveSet = *workerset;

- #ifdef WIN32
-     for (;;)
-     {
-         /*
-          * Sleep a quarter of a second before checking if we should terminate.
-          *
-          * XXX we're not actually checking for a cancel interrupt ... but we
-          * should be.
-          */
-         struct timeval tv = {0, 250000};
-
-         *workerset = saveSet;
-         i = select(maxFd + 1, workerset, NULL, NULL, &tv);
-
-         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
-             continue;
-         if (i)
-             break;
-     }
- #else                            /* !WIN32 */
      for (;;)
      {
          *workerset = saveSet;
          i = select(maxFd + 1, workerset, NULL, NULL, NULL);

!         /*
!          * If we Ctrl-C the master process, it's likely that we interrupt
!          * select() here. The signal handler will set wantAbort == true and
!          * the shutdown journey starts from here.
!          */
!         if (wantAbort)
!             exit_horribly(modulename, "terminated by user\n");
!
          if (i < 0 && errno == EINTR)
              continue;
          break;
      }
- #endif   /* WIN32 */

      return i;
  }
--- 1464,1483 ----
      int            i;
      fd_set        saveSet = *workerset;

      for (;;)
      {
          *workerset = saveSet;
          i = select(maxFd + 1, workerset, NULL, NULL, NULL);

! #ifndef WIN32
          if (i < 0 && errno == EINTR)
              continue;
+ #else
+         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+             continue;
+ #endif
          break;
      }

      return i;
  }
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index 8d70428..21739ca 100644
*** a/src/bin/pg_dump/parallel.h
--- b/src/bin/pg_dump/parallel.h
*************** extern void DispatchJobForTocEntry(Archi
*** 82,87 ****
                         TocEntry *te, T_Action act);
  extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);

! extern void checkAborting(ArchiveHandle *AH);

  #endif   /* PG_DUMP_PARALLEL_H */
--- 82,87 ----
                         TocEntry *te, T_Action act);
  extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);

! extern void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn);

  #endif   /* PG_DUMP_PARALLEL_H */
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index ad8e132..93ec4fb 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** CloneArchive(ArchiveHandle *AH)
*** 4420,4425 ****
--- 4420,4426 ----

      /* The clone will have its own connection, so disregard connection state */
      clone->connection = NULL;
+     clone->connCancel = NULL;
      clone->currUser = NULL;
      clone->currSchema = NULL;
      clone->currTablespace = NULL;
*************** CloneArchive(ArchiveHandle *AH)
*** 4497,4502 ****
--- 4498,4506 ----
  void
  DeCloneArchive(ArchiveHandle *AH)
  {
+     /* Should not have an open database connection */
+     Assert(AH->connection == NULL);
+
      /* Clear format-specific state */
      (AH->DeClonePtr) (AH);

diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 4aa7190..0376f2b 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** struct _archiveHandle
*** 285,290 ****
--- 285,293 ----
      char       *savedPassword;    /* password for ropt->username, if known */
      char       *use_role;
      PGconn       *connection;
+     /* If connCancel isn't NULL, SIGINT handler will send a cancel */
+     PGcancel   *volatile connCancel;
+
      int            connectToDB;    /* Flag to indicate if direct DB connection is
                                   * required */
      ArchiverOutput outputKind;    /* Flag for what we're currently writing */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 818bc9e..352595e 100644
*** a/src/bin/pg_dump/pg_backup_db.c
--- b/src/bin/pg_dump/pg_backup_db.c
***************
*** 12,17 ****
--- 12,18 ----
  #include "postgres_fe.h"

  #include "dumputils.h"
+ #include "parallel.h"
  #include "pg_backup_archiver.h"
  #include "pg_backup_db.h"
  #include "pg_backup_utils.h"
*************** ReconnectToServer(ArchiveHandle *AH, con
*** 106,111 ****
--- 107,115 ----

      newConn = _connectDB(AH, newdbname, newusername);

+     /* Update ArchiveHandle's connCancel before closing old connection */
+     set_archive_cancel_info(AH, newConn);
+
      PQfinish(AH->connection);
      AH->connection = newConn;

*************** ConnectDatabase(Archive *AHX,
*** 327,332 ****
--- 331,339 ----
      _check_database_version(AH);

      PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
+
+     /* arrange for SIGINT to issue a query cancel on this connection */
+     set_archive_cancel_info(AH, AH->connection);
  }

  /*
*************** void
*** 337,355 ****
  DisconnectDatabase(Archive *AHX)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
-     PGcancel   *cancel;
      char        errbuf[1];

      if (!AH->connection)
          return;

!     if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
      {
!         if ((cancel = PQgetCancel(AH->connection)))
!         {
!             PQcancel(cancel, errbuf, sizeof(errbuf));
!             PQfreeCancel(cancel);
!         }
      }

      PQfinish(AH->connection);
--- 344,368 ----
  DisconnectDatabase(Archive *AHX)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      char        errbuf[1];

      if (!AH->connection)
          return;

!     if (AH->connCancel)
      {
!         /*
!          * If we have an active query, send a cancel before closing.  This is
!          * of no use for a normal exit, but might be helpful during
!          * exit_horribly().
!          */
!         if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
!             PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
!
!         /*
!          * Prevent signal handler from sending a cancel after this.
!          */
!         set_archive_cancel_info(AH, NULL);
      }

      PQfinish(AH->connection);
*************** EndDBCopyMode(Archive *AHX, const char *
*** 631,636 ****
--- 644,654 ----
                                  tocEntryTag, PQerrorMessage(AH->connection));
          PQclear(res);

+         /* Do this to ensure we've pumped libpq back to idle state */
+         if (PQgetResult(AH->connection) != NULL)
+             write_msg(NULL, "WARNING: unexpected extra results during COPY of table \"%s\"\n",
+                       tocEntryTag);
+
          AH->pgCopyIn = false;
      }
  }
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 27c6190..e52f122 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** _WriteData(ArchiveHandle *AH, const void
*** 356,364 ****
  {
      lclContext *ctx = (lclContext *) AH->formatData;

-     /* Are we aborting? */
-     checkAborting(AH);
-
      if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen)
          WRITE_ERROR_EXIT;

--- 356,361 ----
*************** _PrintFileData(ArchiveHandle *AH, char *
*** 407,415 ****

      while ((cnt = cfread(buf, buflen, cfp)))
      {
-         /* Are we aborting? */
-         checkAborting(AH);
-
          ahwrite(buf, 1, cnt, AH);
      }

--- 404,409 ----
*************** _WriteBuf(ArchiveHandle *AH, const void
*** 529,537 ****
  {
      lclContext *ctx = (lclContext *) AH->formatData;

-     /* Are we aborting? */
-     checkAborting(AH);
-
      if (cfwrite(buf, len, ctx->dataFH) != len)
          WRITE_ERROR_EXIT;

--- 523,528 ----
*************** _ReadBuf(ArchiveHandle *AH, void *buf, s
*** 548,556 ****
  {
      lclContext *ctx = (lclContext *) AH->formatData;

-     /* Are we aborting? */
-     checkAborting(AH);
-
      /*
       * If there was an I/O error, we already exited in cfread(), so here we
       * exit on short reads.
--- 539,544 ----
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f85778d..a17fb3d 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
*************** dumpTableData_copy(Archive *fout, void *
*** 1768,1773 ****
--- 1768,1778 ----
      }
      PQclear(res);

+     /* Do this to ensure we've pumped libpq back to idle state */
+     if (PQgetResult(conn) != NULL)
+         write_msg(NULL, "WARNING: unexpected extra results during COPY of table \"%s\"\n",
+                   classname);
+
      destroyPQExpBuffer(q);
      return 1;
  }
