Re: Parallel pg_dump's error reporting doesn't work worth squat

From: Tom Lane
Subject: Re: Parallel pg_dump's error reporting doesn't work worth squat
Msg-id: 24048.1463265913@sss.pgh.pa.us
In reply to: Parallel pg_dump's error reporting doesn't work worth squat (Tom Lane <tgl@sss.pgh.pa.us>)
Responses: Re: Parallel pg_dump's error reporting doesn't work worth squat (Tom Lane <tgl@sss.pgh.pa.us>)
List: pgsql-hackers

[ getting back to a complaint I made in December ]

I wrote:
> ... the pg_dump process has crashed with a SIGPIPE without printing
> any message whatsoever, and without coming anywhere near finishing the
> dump.

> A bit of investigation says that this is because somebody had the bright
> idea that worker processes could report fatal errors back to the master
> process instead of just printing them to stderr.  So when the workers
> fail to establish connections (because of the password problem cited in
> #13727), they don't tell me about that.  Oh no, they send those errors
> back up to the pipe to the parent, and then die silently.  Meanwhile,
> the parent is trying to send them commands, and since it doesn't protect
> itself against SIGPIPE on the command pipes, it crashes without ever
> reporting anything.  If you aren't paying close attention, you wouldn't
> even realize you didn't get a completed dump.
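
The crash described above is easy to reproduce outside pg_dump. The following is a minimal POSIX sketch, not pg_dump code; the function name `writer_killed_by_sigpipe` is hypothetical. A process that keeps the default SIGPIPE disposition and writes to a pipe whose read end is closed everywhere is terminated by the signal, so it never gets to print a message or run any exit callbacks:

```c
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical demo: returns 1 if a child writing to a pipe with no readers
 * is killed by SIGPIPE, 0 if it survives, -1 on setup failure.
 */
static int
writer_killed_by_sigpipe(void)
{
    int         fds[2];
    int         wstatus;
    pid_t       pid;

    if (pipe(fds) != 0)
        return -1;

    /* Close the read end before forking, so no process can ever read. */
    close(fds[0]);

    pid = fork();
    if (pid < 0)
    {
        close(fds[1]);
        return -1;
    }
    if (pid == 0)
    {
        /* Child plays the old master: default SIGPIPE handling. */
        signal(SIGPIPE, SIG_DFL);
        if (write(fds[1], "x", 1) < 0)
            _exit(2);           /* EPIPE: only reached if SIGPIPE were ignored */
        _exit(0);
    }

    close(fds[1]);
    if (waitpid(pid, &wstatus, 0) != pid)
        return -1;

    /* Killed by the signal itself: nothing printed, no cleanup ran. */
    return WIFSIGNALED(wstatus) && WTERMSIG(wstatus) == SIGPIPE;
}
```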

Attached is a proposed patch for this.  It reverts exit_horribly() to
what it used to be pre-9.3, ie just print the message on stderr and die.
The master now notices child failure by observing EOF on the status pipe.
While that works automatically on Unix, we have to explicitly close the
child side of the pipe on Windows (could someone check that that works?).
I also fixed the parent side to ignore SIGPIPE earlier, so that it won't
just die if it was in the midst of sending to the child.

BTW, after having spent the afternoon staring at it, I will assert with
confidence that pg_dump/parallel.c is an embarrassment to the project.
It is chock-full of misleading, out-of-date, and/or outright wrong
comments, useless or even wrong Asserts, ill-designed APIs, code that
works quite differently in Unix and Windows cases without a shred of
commentary about the fact, etc etc.  I have mostly resisted the temptation
to do cosmetic cleanup in the attached, but this code really needs some
editorial attention.

            regards, tom lane

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 9167294..f650d3f 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
*************** static ShutdownInformation shutdown_info
*** 77,84 ****
  static const char *modulename = gettext_noop("parallel archiver");

  static ParallelSlot *GetMyPSlot(ParallelState *pstate);
- static void parallel_msg_master(ParallelSlot *slot, const char *modulename,
-                     const char *fmt, va_list ap) pg_attribute_printf(3, 0);
  static void archive_close_connection(int code, void *arg);
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
--- 77,82 ----
*************** GetMyPSlot(ParallelState *pstate)
*** 163,227 ****
  }

  /*
-  * Fail and die, with a message to stderr.  Parameters as for write_msg.
-  *
-  * This is defined in parallel.c, because in parallel mode, things are more
-  * complicated. If the worker process does exit_horribly(), we forward its
-  * last words to the master process. The master process then does
-  * exit_horribly() with this error message itself and prints it normally.
-  * After printing the message, exit_horribly() on the master will shut down
-  * the remaining worker processes.
-  */
- void
- exit_horribly(const char *modulename, const char *fmt,...)
- {
-     va_list        ap;
-     ParallelState *pstate = shutdown_info.pstate;
-     ParallelSlot *slot;
-
-     va_start(ap, fmt);
-
-     if (pstate == NULL)
-     {
-         /* Not in parallel mode, just write to stderr */
-         vwrite_msg(modulename, fmt, ap);
-     }
-     else
-     {
-         slot = GetMyPSlot(pstate);
-
-         if (!slot)
-             /* We're the parent, just write the message out */
-             vwrite_msg(modulename, fmt, ap);
-         else
-             /* If we're a worker process, send the msg to the master process */
-             parallel_msg_master(slot, modulename, fmt, ap);
-     }
-
-     va_end(ap);
-
-     exit_nicely(1);
- }
-
- /* Sends the error message from the worker to the master process */
- static void
- parallel_msg_master(ParallelSlot *slot, const char *modulename,
-                     const char *fmt, va_list ap)
- {
-     char        buf[512];
-     int            pipefd[2];
-
-     pipefd[PIPE_READ] = slot->pipeRevRead;
-     pipefd[PIPE_WRITE] = slot->pipeRevWrite;
-
-     strcpy(buf, "ERROR ");
-     vsnprintf(buf + strlen("ERROR "),
-               sizeof(buf) - strlen("ERROR "), fmt, ap);
-
-     sendMessageToMaster(pipefd, buf);
- }
-
- /*
   * A thread-local version of getLocalPQExpBuffer().
   *
   * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
--- 161,166 ----
*************** getThreadLocalPQExpBuffer(void)
*** 271,277 ****

  /*
   * pg_dump and pg_restore register the Archive pointer for the exit handler
!  * (called from exit_horribly). This function mainly exists so that we can
   * keep shutdown_info in file scope only.
   */
  void
--- 210,216 ----

  /*
   * pg_dump and pg_restore register the Archive pointer for the exit handler
!  * (called from exit_nicely). This function mainly exists so that we can
   * keep shutdown_info in file scope only.
   */
  void
*************** on_exit_close_archive(Archive *AHX)
*** 282,289 ****
  }

  /*
!  * This function can close archives in both the parallel and non-parallel
!  * case.
   */
  static void
  archive_close_connection(int code, void *arg)
--- 221,228 ----
  }

  /*
!  * on_exit_nicely handler for shutting down database connections and
!  * worker processes cleanly.
   */
  static void
  archive_close_connection(int code, void *arg)
*************** archive_close_connection(int code, void
*** 292,325 ****

      if (si->pstate)
      {
          ParallelSlot *slot = GetMyPSlot(si->pstate);

          if (!slot)
          {
              /*
!              * We're the master: We have already printed out the message
!              * passed to exit_horribly() either from the master itself or from
!              * a worker process. Now we need to close our own database
!              * connection (only open during parallel dump but not restore) and
!              * shut down the remaining workers.
               */
!             DisconnectDatabase(si->AHX);
  #ifndef WIN32

              /*
!              * Setting aborting to true switches to best-effort-mode
!              * (send/receive but ignore errors) in communicating with our
!              * workers.
               */
              aborting = true;
  #endif
              ShutdownWorkersHard(si->pstate);
          }
!         else if (slot->args->AH)
!             DisconnectDatabase(&(slot->args->AH->public));
      }
-     else if (si->AHX)
-         DisconnectDatabase(si->AHX);
  }

  /*
--- 231,283 ----

      if (si->pstate)
      {
+         /* In parallel mode, must figure out who we are */
          ParallelSlot *slot = GetMyPSlot(si->pstate);

          if (!slot)
          {
              /*
!              * We're the master.  Close our own database connection, if any,
!              * and then forcibly shut down workers.
               */
!             if (si->AHX)
!                 DisconnectDatabase(si->AHX);
!
  #ifndef WIN32

              /*
!              * Setting aborting to true shuts off error/warning messages that
!              * are no longer useful once we start killing workers.
               */
              aborting = true;
  #endif
              ShutdownWorkersHard(si->pstate);
          }
!         else
!         {
!             /*
!              * We're a worker.  Shut down our own DB connection if any.  On
!              * Windows, we also have to close our communication sockets, to
!              * emulate what will happen on Unix when the worker process exits.
!              * (Without this, if this is a premature exit, the master would
!              * fail to detect it because there would be no EOF condition on
!              * the other end of the pipe.)
!              */
!             if (slot->args->AH)
!                 DisconnectDatabase(&(slot->args->AH->public));
!
! #ifdef WIN32
!             closesocket(slot->pipeRevRead);
!             closesocket(slot->pipeRevWrite);
! #endif
!         }
!     }
!     else
!     {
!         /* Non-parallel operation: just kill the master DB connection */
!         if (si->AHX)
!             DisconnectDatabase(si->AHX);
      }
  }

  /*
*************** archive_close_connection(int code, void
*** 327,333 ****
   * threads to terminate as well (and not finish with their 70 GB table dump
   * first...). Now in UNIX we can just kill these processes, and let the signal
   * handler set wantAbort to 1. In Windows we set a termEvent and this serves
!  * as the signal for everyone to terminate.
   */
  void
  checkAborting(ArchiveHandle *AH)
--- 285,292 ----
   * threads to terminate as well (and not finish with their 70 GB table dump
   * first...). Now in UNIX we can just kill these processes, and let the signal
   * handler set wantAbort to 1. In Windows we set a termEvent and this serves
!  * as the signal for everyone to terminate.  We don't print any error message,
!  * that would just clutter the screen.
   */
  void
  checkAborting(ArchiveHandle *AH)
*************** checkAborting(ArchiveHandle *AH)
*** 337,343 ****
  #else
      if (wantAbort)
  #endif
!         exit_horribly(modulename, "worker is terminating\n");
  }

  /*
--- 296,302 ----
  #else
      if (wantAbort)
  #endif
!         exit_nicely(1);
  }

  /*
*************** ShutdownWorkersHard(ParallelState *pstat
*** 352,359 ****
  #ifndef WIN32
      int            i;

-     signal(SIGPIPE, SIG_IGN);
-
      /*
       * Close our write end of the sockets so that the workers know they can
       * exit.
--- 311,316 ----
*************** sigTermHandler(int signum)
*** 428,454 ****
  #endif

  /*
!  * This function is called by both UNIX and Windows variants to set up a
!  * worker process.
   */
  static void
  SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
  {
      /*
       * Call the setup worker function that's defined in the ArchiveHandle.
-      *
-      * We get the raw connection only for the reason that we can close it
-      * properly when we shut down. This happens only that way when it is
-      * brought down because of an error.
       */
      (AH->SetupWorkerPtr) ((Archive *) AH);

      Assert(AH->connection != NULL);

      WaitForCommands(AH, pipefd);
-
-     closesocket(pipefd[PIPE_READ]);
-     closesocket(pipefd[PIPE_WRITE]);
  }

  #ifdef WIN32
--- 385,405 ----
  #endif

  /*
!  * This function is called by both UNIX and Windows variants to set up
!  * and run a worker process.  Caller should exit the process (or thread)
!  * upon return.
   */
  static void
  SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
  {
      /*
       * Call the setup worker function that's defined in the ArchiveHandle.
       */
      (AH->SetupWorkerPtr) ((Archive *) AH);

      Assert(AH->connection != NULL);

      WaitForCommands(AH, pipefd);
  }

  #ifdef WIN32
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 534,547 ****
          pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
          pstate->parallelSlot[i].args->AH = NULL;
          pstate->parallelSlot[i].args->te = NULL;
  #ifdef WIN32
          /* Allocate a new structure for every worker */
          wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

          wi->worker = i;
          wi->AH = AH;
!         wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
!         wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];

          handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
                                  wi, 0, &(pstate->parallelSlot[i].threadId));
--- 485,506 ----
          pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
          pstate->parallelSlot[i].args->AH = NULL;
          pstate->parallelSlot[i].args->te = NULL;
+
+         /* master's ends of the pipes */
+         pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+         pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+         /* child's ends of the pipes */
+         pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+         pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
  #ifdef WIN32
          /* Allocate a new structure for every worker */
          wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

          wi->worker = i;
          wi->AH = AH;
!         wi->pipeRead = pipeMW[PIPE_READ];
!         wi->pipeWrite = pipeWM[PIPE_WRITE];

          handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
                                  wi, 0, &(pstate->parallelSlot[i].threadId));
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 557,571 ****
              pipefd[0] = pipeMW[PIPE_READ];
              pipefd[1] = pipeWM[PIPE_WRITE];

-             /*
-              * Store the fds for the reverse communication in pstate. Actually
-              * we only use this in case of an error and don't use pstate
-              * otherwise in the worker process. On Windows we write to the
-              * global pstate, in Unix we write to our process-local copy but
-              * that's also where we'd retrieve this information back from.
-              */
-             pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
-             pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
              pstate->parallelSlot[i].pid = getpid();

              /*
--- 516,521 ----
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 584,590 ****

              /*
               * Close all inherited fds for communication of the master with
!              * the other workers.
               */
              for (j = 0; j < i; j++)
              {
--- 534,540 ----

              /*
               * Close all inherited fds for communication of the master with
!              * previously-forked workers.
               */
              for (j = 0; j < i; j++)
              {
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 612,622 ****

          pstate->parallelSlot[i].pid = pid;
  #endif
-
-         pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
-         pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
      }

      return pstate;
  }

--- 562,577 ----

          pstate->parallelSlot[i].pid = pid;
  #endif
      }

+     /*
+      * Having forked off the workers, disable SIGPIPE so that master isn't
+      * killed if it tries to send a command to a dead worker.
+      */
+ #ifndef WIN32
+     signal(SIGPIPE, SIG_IGN);
+ #endif
+
      return pstate;
  }

*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 977,992 ****
          }
          else
              exit_horribly(modulename,
!                           "invalid message received from worker: %s\n", msg);
!     }
!     else if (messageStartsWith(msg, "ERROR "))
!     {
!         Assert(AH->format == archDirectory || AH->format == archCustom);
!         pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
!         exit_horribly(modulename, "%s", msg + strlen("ERROR "));
      }
      else
!         exit_horribly(modulename, "invalid message received from worker: %s\n", msg);

      /* both Unix and Win32 return pg_malloc()ed space, so we free it */
      free(msg);
--- 932,944 ----
          }
          else
              exit_horribly(modulename,
!                           "invalid message received from worker: \"%s\"\n",
!                           msg);
      }
      else
!         exit_horribly(modulename,
!                       "invalid message received from worker: \"%s\"\n",
!                       msg);

      /* both Unix and Win32 return pg_malloc()ed space, so we free it */
      free(msg);
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index 591653b..8d70428 100644
*** a/src/bin/pg_dump/parallel.h
--- b/src/bin/pg_dump/parallel.h
*************** typedef struct ParallelSlot
*** 42,50 ****
      ParallelArgs *args;
      T_WorkerStatus workerStatus;
      int            status;
!     int            pipeRead;
      int            pipeWrite;
!     int            pipeRevRead;
      int            pipeRevWrite;
  #ifdef WIN32
      uintptr_t    hThread;
--- 42,50 ----
      ParallelArgs *args;
      T_WorkerStatus workerStatus;
      int            status;
!     int            pipeRead;        /* master's end of the pipes */
      int            pipeWrite;
!     int            pipeRevRead;    /* child's end of the pipes */
      int            pipeRevWrite;
  #ifdef WIN32
      uintptr_t    hThread;
diff --git a/src/bin/pg_dump/pg_backup_utils.c b/src/bin/pg_dump/pg_backup_utils.c
index 0aa13cd..5d1d875 100644
*** a/src/bin/pg_dump/pg_backup_utils.c
--- b/src/bin/pg_dump/pg_backup_utils.c
*************** vwrite_msg(const char *modulename, const
*** 93,98 ****
--- 93,115 ----
      vfprintf(stderr, _(fmt), ap);
  }

+ /*
+  * Fail and die, with a message to stderr.  Parameters as for write_msg.
+  *
+  * Note that on_exit_nicely callbacks will get run.
+  */
+ void
+ exit_horribly(const char *modulename, const char *fmt,...)
+ {
+     va_list        ap;
+
+     va_start(ap, fmt);
+     vwrite_msg(modulename, fmt, ap);
+     va_end(ap);
+
+     exit_nicely(1);
+ }
+
  /* Register a callback to be run when exit_nicely is invoked. */
  void
  on_exit_nicely(on_exit_nicely_callback function, void *arg)
*************** on_exit_nicely(on_exit_nicely_callback f
*** 106,112 ****

  /*
   * Run accumulated on_exit_nicely callbacks in reverse order and then exit
!  * quietly.  This needs to be thread-safe.
   */
  void
  exit_nicely(int code)
--- 123,142 ----

  /*
   * Run accumulated on_exit_nicely callbacks in reverse order and then exit
!  * without printing any message.
!  *
!  * If running in a parallel worker thread on Windows, we only exit the thread,
!  * not the whole process.
!  *
!  * Note that in parallel operation on Windows, the callback(s) will be run
!  * by each thread since the list state is necessarily shared by all threads;
!  * each callback must contain logic to ensure it does only what's appropriate
!  * for its thread.  On Unix, callbacks are also run by each process, but only
!  * for callbacks established before we fork off the child processes.  (It'd
!  * be cleaner to reset the list after fork(), and let each child establish
!  * its own callbacks; but then the behavior would be completely inconsistent
!  * between Windows and Unix.  For now, just be sure to establish callbacks
!  * before forking to avoid inconsistency.)
   */
  void
  exit_nicely(int code)
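
The exit path the patch settles on (exit_horribly() prints to stderr, then exit_nicely() runs the registered on_exit_nicely callbacks in reverse order) can be modeled with a small callback stack. This is a hedged sketch under assumed names: `register_exit_callback`, `run_exit_callbacks`, `fail_and_die`, `log_callback`, and the fixed-size table are all hypothetical, not the contents of pg_backup_utils.c, and the sketch does not model Windows worker threads sharing the list.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAX_EXIT_CALLBACKS 20   /* arbitrary limit for this sketch */

typedef void (*exit_callback) (int code, void *arg);

static struct
{
    exit_callback function;
    void       *arg;
}           exit_callbacks[MAX_EXIT_CALLBACKS];
static int  num_exit_callbacks = 0;

/* Register a cleanup callback; returns -1 if the table is full. */
static int
register_exit_callback(exit_callback function, void *arg)
{
    if (num_exit_callbacks >= MAX_EXIT_CALLBACKS)
        return -1;
    exit_callbacks[num_exit_callbacks].function = function;
    exit_callbacks[num_exit_callbacks].arg = arg;
    num_exit_callbacks++;
    return 0;
}

/*
 * Run callbacks newest-first.  Each entry is popped before it is called, so
 * a callback that fails and re-enters the exit path is not run twice.
 */
static void
run_exit_callbacks(int code)
{
    while (num_exit_callbacks > 0)
    {
        num_exit_callbacks--;
        exit_callbacks[num_exit_callbacks].function(code,
                                    exit_callbacks[num_exit_callbacks].arg);
    }
}

/* Analogue of the post-patch exit_horribly(): print, clean up, exit. */
static void
fail_and_die(const char *fmt,...)
{
    va_list     ap;

    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);

    run_exit_callbacks(1);
    exit(1);
}

/* Example callback: appends its one-character label to exit_log. */
static char exit_log[MAX_EXIT_CALLBACKS + 1];

static void
log_callback(int code, void *arg)
{
    size_t      len = strlen(exit_log);

    (void) code;
    if (len < MAX_EXIT_CALLBACKS)
    {
        exit_log[len] = *(const char *) arg;
        exit_log[len + 1] = '\0';
    }
}
```

Registering `log_callback` with labels "A", "B", then "C" and calling `run_exit_callbacks(1)` leaves `exit_log` as "CBA", so the most recently established cleanup (for example, a database connection opened after the archive) is torn down first.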
