Redesigning parallel dump/restore's wait-for-workers logic

From Tom Lane
Subject Redesigning parallel dump/restore's wait-for-workers logic
Date
Msg-id 1188.1464544443@sss.pgh.pa.us
Replies Re: Redesigning parallel dump/restore's wait-for-workers logic  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
One of the things I do not like about the current coding of parallel
pg_dump/pg_restore is its baroque logic for handling worker completion
reports, specifically the ListenToWorkers/ReapWorkerStatus APIs.
That code is messy and hard to use --- if the existing logic in
restore_toc_entries_parallel doesn't make your head hurt, you're a better
man than I am.  And we've got two other similar loops using those
functions, which cry out to be merged but can't be because the other two
have hard-wired ideas about what the cleanup action is.

Hence, I propose the attached redesign.  This is based on the idea of
having DispatchJobForTocEntry register a callback function that will take
care of state cleanup, doing whatever had been done by the caller of
ReapWorkerStatus in the old design.  (This callback is essentially just
the old mark_work_done function in the restore case, and a trivial test
for worker failure in the dump case.)  Then we can have ListenToWorkers
call the callback immediately on receipt of a status message, and return
the worker to WRKR_IDLE state; so the WRKR_FINISHED state goes away.
And it becomes easy to design a unified wait-for-worker-messages loop:
in the attached, WaitForWorkers replaces EnsureIdleWorker and
EnsureWorkersFinished as well as the mess in restore_toc_entries_parallel.
Also, we no longer need the fragile API spec that the caller of
DispatchJobForTocEntry is responsible for ensuring there's an idle worker.
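
To make the callback idea above concrete, here is a minimal sketch of the pattern, not the patch code itself. The names Slot, Job, dispatch_job, collect_result and count_failures are hypothetical stand-ins for ParallelSlot, TocEntry, DispatchJobForTocEntry, ListenToWorkers and a completion callback. No processes or pipes are involved: the "status message" is simply passed in as an argument.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; the real types live in parallel.h. */
typedef struct Job { int id; } Job;
typedef void (*CompletionFn) (Job *job, int status, void *callback_data);

typedef enum { SLOT_IDLE, SLOT_WORKING } SlotStatus;

typedef struct Slot
{
    SlotStatus   status;
    Job         *job;            /* valid only while SLOT_WORKING */
    CompletionFn callback;       /* invoked when the job's result arrives */
    void        *callback_data;  /* passed through to callback untouched */
} Slot;

/* Register the job and its completion callback in an idle slot. */
void
dispatch_job(Slot *slot, Job *job, CompletionFn callback, void *callback_data)
{
    assert(slot->status == SLOT_IDLE);
    slot->status = SLOT_WORKING;
    slot->job = job;
    slot->callback = callback;
    slot->callback_data = callback_data;
}

/*
 * Simulate receipt of a status message: run the callback at once and
 * return the slot straight to idle, so no separate "finished" state
 * is needed.
 */
void
collect_result(Slot *slot, int status)
{
    assert(slot->status == SLOT_WORKING);
    slot->callback(slot->job, status, slot->callback_data);
    slot->status = SLOT_IDLE;
    slot->job = NULL;
}

/* Example callback: count failed jobs in caller-supplied storage. */
void
count_failures(Job *job, int status, void *callback_data)
{
    (void) job;                  /* unused in this example */
    if (status != 0)
        (*(int *) callback_data)++;
}
```

A caller would dispatch with something like dispatch_job(&slot, &job, count_failures, &failures). Because the cleanup action travels with the job, the code that collects results needs no knowledge of whether it is serving a dump or a restore.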

In passing, I got rid of the ParallelArgs struct, which was a net negative
in terms of notational verboseness, and didn't seem to be providing any
noticeable amount of abstraction either.

BTW, I also tried to make ParallelState an opaque struct known only
within parallel.c.  I failed at that because there are two loops in
get_next_work_item that want to look at all the actively-being-worked-on
TocEntrys.  A possible solution to that is to separate the TocEntry
fields into their own array, so that ParallelState looks like

typedef struct ParallelState
{
    int           numWorkers;       /* allowed number of workers */
    /* these arrays have numWorkers entries, one per worker: */
    TocEntry    **te;               /* item being worked on, or NULL */
    ParallelSlot *parallelSlot;     /* private info about each worker */
} ParallelState;

where ParallelSlot could be opaque outside parallel.c.  I'm not sure
if this is worth the trouble though.
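
As a rough illustration of how that split might look from outside parallel.c, the sketch below declares ParallelSlot as an incomplete type and scans only the te array. The TocEntry stub and the helper count_active_items are hypothetical and exist only for this example. The real loops in get_next_work_item do more than count.

```c
#include <assert.h>
#include <stddef.h>

/* Stub for illustration; the real TocEntry is in pg_backup_archiver.h. */
typedef struct TocEntry { int dumpId; } TocEntry;

/* Incomplete type: only parallel.c would see the full definition. */
typedef struct ParallelSlot ParallelSlot;

typedef struct ParallelState
{
    int           numWorkers;       /* allowed number of workers */
    /* these arrays have numWorkers entries, one per worker: */
    TocEntry    **te;               /* item being worked on, or NULL */
    ParallelSlot *parallelSlot;     /* private info about each worker */
} ParallelState;

/*
 * Hypothetical helper: count the items currently being worked on.
 * It touches only numWorkers and te, never the contents of parallelSlot.
 */
int
count_active_items(const ParallelState *pstate)
{
    int         count = 0;
    int         i;

    assert(pstate != NULL);
    for (i = 0; i < pstate->numWorkers; i++)
    {
        if (pstate->te[i] != NULL)
            count++;
    }
    return count;
}
```

Code outside parallel.c can still walk the in-progress TocEntrys this way, while any attempt to dereference a ParallelSlot there would fail to compile.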

Comments?

            regards, tom lane

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index e9e8698..5774093 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 35,43 ****
   * the required action (dump or restore) and returns a malloc'd status string.
   * The status string is passed back to the master where it is interpreted by
   * AH->MasterEndParallelItemPtr, another format-specific routine.  That
!  * function can update state or catalog information on the master's side,
   * depending on the reply from the worker process.  In the end it returns a
!  * status code, which is 0 for successful execution.
   *
   * Remember that we have forked off the workers only after we have read in
   * the catalog.  That's why our worker processes can also access the catalog
--- 35,45 ----
   * the required action (dump or restore) and returns a malloc'd status string.
   * The status string is passed back to the master where it is interpreted by
   * AH->MasterEndParallelItemPtr, another format-specific routine.  That
!  * function can update format-specific information on the master's side,
   * depending on the reply from the worker process.  In the end it returns a
!  * status code, which we pass to the ParallelCompletionPtr callback function
!  * that was passed to DispatchJobForTocEntry().  The callback function does
!  * state updating for the master control logic in pg_backup_archiver.c.
   *
   * Remember that we have forked off the workers only after we have read in
   * the catalog.  That's why our worker processes can also access the catalog
***************
*** 48,60 ****
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
   *        WRKR_IDLE: it's waiting for a command
!  *        WRKR_WORKING: it's been sent a command
!  *        WRKR_FINISHED: it's returned a result
   *        WRKR_TERMINATED: process ended
-  * The FINISHED state indicates that the worker is idle, but we've not yet
-  * dealt with the status code it returned from the prior command.
-  * ReapWorkerStatus() extracts the unhandled command status value and sets
-  * the workerStatus back to WRKR_IDLE.
   */

  #include "postgres_fe.h"
--- 50,57 ----
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
   *        WRKR_IDLE: it's waiting for a command
!  *        WRKR_WORKING: it's working on a command
   *        WRKR_TERMINATED: process ended
   */

  #include "postgres_fe.h"
***************
*** 75,80 ****
--- 72,79 ----
  #define PIPE_READ                            0
  #define PIPE_WRITE                            1

+ #define NO_SLOT (-1)            /* Failure result for GetIdleWorker() */
+
  #ifdef WIN32

  /*
*************** static void archive_close_connection(int
*** 135,143 ****
--- 134,145 ----
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
  static void RunWorker(ArchiveHandle *AH, int pipefd[2]);
+ static int    GetIdleWorker(ParallelState *pstate);
  static bool HasEveryWorkerTerminated(ParallelState *pstate);
  static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
  static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+ static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
+                 bool do_wait);
  static char *getMessageFromMaster(int pipefd[2]);
  static void sendMessageToMaster(int pipefd[2], const char *str);
  static int    select_loop(int maxFd, fd_set *workerset);
*************** archive_close_connection(int code, void
*** 309,316 ****
               * fail to detect it because there would be no EOF condition on
               * the other end of the pipe.)
               */
!             if (slot->args->AH)
!                 DisconnectDatabase(&(slot->args->AH->public));

  #ifdef WIN32
              closesocket(slot->pipeRevRead);
--- 311,318 ----
               * fail to detect it because there would be no EOF condition on
               * the other end of the pipe.)
               */
!             if (slot->AH)
!                 DisconnectDatabase(&(slot->AH->public));

  #ifdef WIN32
              closesocket(slot->pipeRevRead);
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 571,579 ****
                            strerror(errno));

          pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
!         pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
!         pstate->parallelSlot[i].args->AH = NULL;
!         pstate->parallelSlot[i].args->te = NULL;

          /* master's ends of the pipes */
          pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
--- 573,582 ----
                            strerror(errno));

          pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
!         pstate->parallelSlot[i].AH = NULL;
!         pstate->parallelSlot[i].te = NULL;
!         pstate->parallelSlot[i].callback = NULL;
!         pstate->parallelSlot[i].callback_data = NULL;

          /* master's ends of the pipes */
          pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 628,637 ****
               * state information and also clones the database connection which
               * both seem kinda helpful.
               */
!             pstate->parallelSlot[i].args->AH = CloneArchive(AH);

              /* Run the worker ... */
!             RunWorker(pstate->parallelSlot[i].args->AH, pipefd);

              /* We can just exit(0) when done */
              exit(0);
--- 631,640 ----
               * state information and also clones the database connection which
               * both seem kinda helpful.
               */
!             pstate->parallelSlot[i].AH = CloneArchive(AH);

              /* Run the worker ... */
!             RunWorker(pstate->parallelSlot[i].AH, pipefd);

              /* We can just exit(0) when done */
              exit(0);
*************** ParallelBackupEnd(ArchiveHandle *AH, Par
*** 703,722 ****
  }

  /*
!  * Dispatch a job to some free worker (caller must ensure there is one!)
   *
   * te is the TocEntry to be processed, act is the action to be taken on it.
   */
  void
! DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
!                        T_Action act)
  {
      int            worker;
      char       *arg;

!     /* our caller makes sure that at least one worker is idle */
!     worker = GetIdleWorker(pstate);
!     Assert(worker != NO_SLOT);

      /* Construct and send command string */
      arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
--- 706,733 ----
  }

  /*
!  * Dispatch a job to some free worker.
   *
   * te is the TocEntry to be processed, act is the action to be taken on it.
+  * callback is the function to call on completion of the job.
+  *
+  * If no worker is currently available, this will block, and previously
+  * registered callback functions may be called.
   */
  void
! DispatchJobForTocEntry(ArchiveHandle *AH,
!                        ParallelState *pstate,
!                        TocEntry *te,
!                        T_Action act,
!                        ParallelCompletionPtr callback,
!                        void *callback_data)
  {
      int            worker;
      char       *arg;

!     /* Get a worker, waiting if none are idle */
!     while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
!         WaitForWorkers(AH, pstate, WFW_ONE_IDLE);

      /* Construct and send command string */
      arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
*************** DispatchJobForTocEntry(ArchiveHandle *AH
*** 727,740 ****

      /* Remember worker is busy, and which TocEntry it's working on */
      pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
!     pstate->parallelSlot[worker].args->te = te;
  }

  /*
   * Find an idle worker and return its slot number.
   * Return NO_SLOT if none are idle.
   */
! int
  GetIdleWorker(ParallelState *pstate)
  {
      int            i;
--- 738,753 ----

      /* Remember worker is busy, and which TocEntry it's working on */
      pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
!     pstate->parallelSlot[worker].te = te;
!     pstate->parallelSlot[worker].callback = callback;
!     pstate->parallelSlot[worker].callback_data = callback_data;
  }

  /*
   * Find an idle worker and return its slot number.
   * Return NO_SLOT if none are idle.
   */
! static int
  GetIdleWorker(ParallelState *pstate)
  {
      int            i;
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 932,948 ****
   * immediately if there is none available.
   *
   * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then save the resulting status code and switch the worker's state to
!  * WRKR_FINISHED.  Later, caller must call ReapWorkerStatus() to verify
!  * that the status was "OK" and push the worker back to IDLE state.
   *
!  * XXX Rube Goldberg would be proud of this API, but no one else should be.
   *
   * XXX is it worth checking for more than one status message per call?
   * It seems somewhat unlikely that multiple workers would finish at exactly
   * the same time.
   */
! void
  ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
  {
      int            worker;
--- 945,960 ----
   * immediately if there is none available.
   *
   * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then pass the resulting status code to the callback function that was
!  * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
   *
!  * Returns true if we collected a status message, else false.
   *
   * XXX is it worth checking for more than one status message per call?
   * It seems somewhat unlikely that multiple workers would finish at exactly
   * the same time.
   */
! static bool
  ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
  {
      int            worker;
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 956,989 ****
          /* If do_wait is true, we must have detected EOF on some socket */
          if (do_wait)
              exit_horribly(modulename, "a worker process died unexpectedly\n");
!         return;
      }

      /* Process it and update our idea of the worker's status */
      if (messageStartsWith(msg, "OK "))
      {
!         TocEntry   *te = pstate->parallelSlot[worker].args->te;
          char       *statusString;

          if (messageStartsWith(msg, "OK RESTORE "))
          {
              statusString = msg + strlen("OK RESTORE ");
!             pstate->parallelSlot[worker].status =
                  (AH->MasterEndParallelItemPtr)
                  (AH, te, statusString, ACT_RESTORE);
          }
          else if (messageStartsWith(msg, "OK DUMP "))
          {
              statusString = msg + strlen("OK DUMP ");
!             pstate->parallelSlot[worker].status =
                  (AH->MasterEndParallelItemPtr)
                  (AH, te, statusString, ACT_DUMP);
          }
          else
              exit_horribly(modulename,
                            "invalid message received from worker: \"%s\"\n",
                            msg);
!         pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
      }
      else
          exit_horribly(modulename,
--- 968,1006 ----
          /* If do_wait is true, we must have detected EOF on some socket */
          if (do_wait)
              exit_horribly(modulename, "a worker process died unexpectedly\n");
!         return false;
      }

      /* Process it and update our idea of the worker's status */
      if (messageStartsWith(msg, "OK "))
      {
!         ParallelSlot *slot = &pstate->parallelSlot[worker];
!         TocEntry   *te = slot->te;
          char       *statusString;
+         int            status;

          if (messageStartsWith(msg, "OK RESTORE "))
          {
              statusString = msg + strlen("OK RESTORE ");
!             status =
                  (AH->MasterEndParallelItemPtr)
                  (AH, te, statusString, ACT_RESTORE);
+             slot->callback(AH, te, status, slot->callback_data);
          }
          else if (messageStartsWith(msg, "OK DUMP "))
          {
              statusString = msg + strlen("OK DUMP ");
!             status =
                  (AH->MasterEndParallelItemPtr)
                  (AH, te, statusString, ACT_DUMP);
+             slot->callback(AH, te, status, slot->callback_data);
          }
          else
              exit_horribly(modulename,
                            "invalid message received from worker: \"%s\"\n",
                            msg);
!         slot->workerStatus = WRKR_IDLE;
!         slot->te = NULL;
      }
      else
          exit_horribly(modulename,
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 992,1101 ****

      /* Free the string returned from getMessageFromWorker */
      free(msg);
- }
-
- /*
-  * Check to see if any worker is in WRKR_FINISHED state.  If so,
-  * return its command status code into *status, reset it to IDLE state,
-  * and return its slot number.  Otherwise return NO_SLOT.
-  *
-  * This function is executed in the master process.
-  */
- int
- ReapWorkerStatus(ParallelState *pstate, int *status)
- {
-     int            i;

!     for (i = 0; i < pstate->numWorkers; i++)
!     {
!         if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
!         {
!             *status = pstate->parallelSlot[i].status;
!             pstate->parallelSlot[i].status = 0;
!             pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
!             return i;
!         }
!     }
!     return NO_SLOT;
  }

  /*
!  * Wait, if necessary, until we have at least one idle worker.
!  * Reap worker status as necessary to move FINISHED workers to IDLE state.
   *
!  * We assume that no extra processing is required when reaping a finished
!  * command, except for checking that the status was OK (zero).
!  * Caution: that assumption means that this function can only be used in
!  * parallel dump, not parallel restore, because the latter has a more
!  * complex set of rules about handling status.
   *
   * This function is executed in the master process.
   */
  void
! EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
  {
!     int            ret_worker;
!     int            work_status;

!     for (;;)
      {
!         int            nTerm = 0;
!
!         while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
!         {
!             if (work_status != 0)
!                 exit_horribly(modulename, "error processing a parallel work item\n");
!
!             nTerm++;
!         }
!
!         /*
!          * We need to make sure that we have an idle worker before dispatching
!          * the next item. If nTerm > 0 we already have that (quick check).
!          */
!         if (nTerm > 0)
!             return;
!
!         /* explicit check for an idle worker */
!         if (GetIdleWorker(pstate) != NO_SLOT)
!             return;

          /*
!          * If we have no idle worker, read the result of one or more workers
!          * and loop the loop to call ReapWorkerStatus() on them
           */
!         ListenToWorkers(AH, pstate, true);
!     }
! }
!
! /*
!  * Wait for all workers to be idle.
!  * Reap worker status as necessary to move FINISHED workers to IDLE state.
!  *
!  * We assume that no extra processing is required when reaping a finished
!  * command, except for checking that the status was OK (zero).
!  * Caution: that assumption means that this function can only be used in
!  * parallel dump, not parallel restore, because the latter has a more
!  * complex set of rules about handling status.
!  *
!  * This function is executed in the master process.
!  */
! void
! EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
! {
!     int            work_status;

!     if (!pstate || pstate->numWorkers == 1)
!         return;

!     /* Waiting for the remaining worker processes to finish */
!     while (!IsEveryWorkerIdle(pstate))
!     {
!         if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
!             ListenToWorkers(AH, pstate, true);
!         else if (work_status != 0)
!             exit_horribly(modulename,
!                           "error processing a parallel work item\n");
      }
  }

--- 1009,1087 ----

      /* Free the string returned from getMessageFromWorker */
      free(msg);

!     return true;
  }

  /*
!  * Check for status results from workers, waiting if necessary.
   *
!  * Available wait modes are:
!  * WFW_NO_WAIT: reap any available status, but don't block
!  * WFW_GOT_STATUS: wait for at least one more worker to finish
!  * WFW_ONE_IDLE: wait for at least one worker to be idle
!  * WFW_ALL_IDLE: wait for all workers to be idle
!  *
!  * Any received results are passed to MasterEndParallelItemPtr and then
!  * to the callback specified to DispatchJobForTocEntry.
   *
   * This function is executed in the master process.
   */
  void
! WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
  {
!     bool        do_wait = false;

!     /*
!      * In GOT_STATUS mode, always block waiting for a message, since we can't
!      * return till we get something.  In other modes, we don't block the first
!      * time through the loop.
!      */
!     if (mode == WFW_GOT_STATUS)
      {
!         /* Assert that caller knows what it's doing */
!         Assert(!IsEveryWorkerIdle(pstate));
!         do_wait = true;
!     }

+     for (;;)
+     {
          /*
!          * Check for status messages, even if we don't need to block.  We do
!          * not try very hard to reap all available messages, though, since
!          * there's unlikely to be more than one.
           */
!         if (ListenToWorkers(AH, pstate, do_wait))
!         {
!             /*
!              * If we got a message, we are done by definition for GOT_STATUS
!              * mode, and we can also be certain that there's at least one idle
!              * worker.  So we're done in all but ALL_IDLE mode.
!              */
!             if (mode != WFW_ALL_IDLE)
!                 return;
!         }

!         /* Check whether we must wait for new status messages */
!         switch (mode)
!         {
!             case WFW_NO_WAIT:
!                 return;            /* never wait */
!             case WFW_GOT_STATUS:
!                 Assert(false);    /* can't get here, because we waited */
!                 break;
!             case WFW_ONE_IDLE:
!                 if (GetIdleWorker(pstate) != NO_SLOT)
!                     return;
!                 break;
!             case WFW_ALL_IDLE:
!                 if (IsEveryWorkerIdle(pstate))
!                     return;
!                 break;
!         }

!         /* Loop back, and this time wait for something to happen */
!         do_wait = true;
      }
  }

diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index 8d70428..7be0f7c 100644
*** a/src/bin/pg_dump/parallel.h
--- b/src/bin/pg_dump/parallel.h
***************
*** 2,15 ****
   *
   * parallel.h
   *
!  *    Parallel support header file for the pg_dump archiver
   *
   * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
-  *    The author is not responsible for loss or damages that may
-  *    result from its use.
-  *
   * IDENTIFICATION
   *        src/bin/pg_dump/parallel.h
   *
--- 2,12 ----
   *
   * parallel.h
   *
!  *    Parallel support for pg_dump and pg_restore
   *
   * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
   * IDENTIFICATION
   *        src/bin/pg_dump/parallel.h
   *
***************
*** 21,51 ****

  #include "pg_backup_archiver.h"

  typedef enum
  {
-     WRKR_TERMINATED = 0,
      WRKR_IDLE,
      WRKR_WORKING,
!     WRKR_FINISHED
  } T_WorkerStatus;

! /* Arguments needed for a worker process */
! typedef struct ParallelArgs
! {
!     ArchiveHandle *AH;
!     TocEntry   *te;
! } ParallelArgs;
!
! /* State for each parallel activity slot */
  typedef struct ParallelSlot
  {
!     ParallelArgs *args;
!     T_WorkerStatus workerStatus;
!     int            status;
      int            pipeRead;        /* master's end of the pipes */
      int            pipeWrite;
      int            pipeRevRead;    /* child's end of the pipes */
      int            pipeRevWrite;
  #ifdef WIN32
      uintptr_t    hThread;
      unsigned int threadId;
--- 18,70 ----

  #include "pg_backup_archiver.h"

+ /* Function to call in master process on completion of a worker task */
+ typedef void (*ParallelCompletionPtr) (ArchiveHandle *AH,
+                                                    TocEntry *te,
+                                                    int status,
+                                                    void *callback_data);
+
+ /* Wait options for WaitForWorkers */
+ typedef enum
+ {
+     WFW_NO_WAIT,
+     WFW_GOT_STATUS,
+     WFW_ONE_IDLE,
+     WFW_ALL_IDLE
+ } WFW_WaitOption;
+
+ /* Worker process statuses */
  typedef enum
  {
      WRKR_IDLE,
      WRKR_WORKING,
!     WRKR_TERMINATED
  } T_WorkerStatus;

! /*
!  * Per-parallel-worker state of parallel.c.
!  *
!  * Much of this is valid only in the master process (or, on Windows, should
!  * be touched only by the master thread).  But the AH field should be touched
!  * only by workers.  The pipe descriptors are valid everywhere.
!  */
  typedef struct ParallelSlot
  {
!     T_WorkerStatus workerStatus;    /* see enum above */
!
!     /* These fields are valid if workerStatus == WRKR_WORKING: */
!     TocEntry   *te;                /* item being worked on */
!     ParallelCompletionPtr callback;        /* function to call on completion */
!     void       *callback_data;    /* passthru data for it */
!
!     ArchiveHandle *AH;            /* Archive data worker is using */
!
      int            pipeRead;        /* master's end of the pipes */
      int            pipeWrite;
      int            pipeRevRead;    /* child's end of the pipes */
      int            pipeRevWrite;
+
+     /* Child process/thread identity info: */
  #ifdef WIN32
      uintptr_t    hThread;
      unsigned int threadId;
*************** typedef struct ParallelSlot
*** 54,65 ****
  #endif
  } ParallelSlot;

! #define NO_SLOT (-1)
!
  typedef struct ParallelState
  {
!     int            numWorkers;
!     ParallelSlot *parallelSlot;
  } ParallelState;

  #ifdef WIN32
--- 73,83 ----
  #endif
  } ParallelSlot;

! /* Overall state for parallel.c */
  typedef struct ParallelState
  {
!     int            numWorkers;        /* allowed number of workers */
!     ParallelSlot *parallelSlot; /* array of numWorkers slots */
  } ParallelState;

  #ifdef WIN32
*************** extern DWORD mainThreadId;
*** 69,85 ****

  extern void init_parallel_dump_utils(void);

- extern int    GetIdleWorker(ParallelState *pstate);
  extern bool IsEveryWorkerIdle(ParallelState *pstate);
! extern void ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait);
! extern int    ReapWorkerStatus(ParallelState *pstate, int *status);
! extern void EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate);
! extern void EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate);

  extern ParallelState *ParallelBackupStart(ArchiveHandle *AH);
  extern void DispatchJobForTocEntry(ArchiveHandle *AH,
                         ParallelState *pstate,
!                        TocEntry *te, T_Action act);
  extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);

  extern void checkAborting(ArchiveHandle *AH);
--- 87,103 ----

  extern void init_parallel_dump_utils(void);

  extern bool IsEveryWorkerIdle(ParallelState *pstate);
! extern void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate,
!                WFW_WaitOption mode);

  extern ParallelState *ParallelBackupStart(ArchiveHandle *AH);
  extern void DispatchJobForTocEntry(ArchiveHandle *AH,
                         ParallelState *pstate,
!                        TocEntry *te,
!                        T_Action act,
!                        ParallelCompletionPtr callback,
!                        void *callback_data);
  extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);

  extern void checkAborting(ArchiveHandle *AH);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index ad8e132..83e899b 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** static void par_list_remove(TocEntry *te
*** 97,105 ****
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
                     TocEntry *ready_list,
                     ParallelState *pstate);
! static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
!                int worker, int status,
!                ParallelState *pstate);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
--- 97,110 ----
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
                     TocEntry *ready_list,
                     ParallelState *pstate);
! static void mark_dump_job_done(ArchiveHandle *AH,
!                    TocEntry *te,
!                    int status,
!                    void *callback_data);
! static void mark_restore_job_done(ArchiveHandle *AH,
!                       TocEntry *te,
!                       int status,
!                       void *callback_data);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2348,2355 ****
               * If we are in a parallel backup, then we are always the master
               * process.  Dispatch each data-transfer job to a worker.
               */
!             EnsureIdleWorker(AH, pstate);
!             DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
          }
          else
              WriteDataChunksForTocEntry(AH, te);
--- 2353,2360 ----
               * If we are in a parallel backup, then we are always the master
               * process.  Dispatch each data-transfer job to a worker.
               */
!             DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
!                                    mark_dump_job_done, NULL);
          }
          else
              WriteDataChunksForTocEntry(AH, te);
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2358,2366 ****
      /*
       * If parallel, wait for workers to finish.
       */
!     EnsureWorkersFinished(AH, pstate);
  }

  void
  WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
  {
--- 2363,2394 ----
      /*
       * If parallel, wait for workers to finish.
       */
!     if (pstate && pstate->numWorkers > 1)
!         WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
! }
!
!
! /*
!  * Callback function that's invoked in the master process after a step has
!  * been parallel dumped.
!  *
!  * We don't need to do anything except check for worker failure.
!  */
! static void
! mark_dump_job_done(ArchiveHandle *AH,
!                    TocEntry *te,
!                    int status,
!                    void *callback_data)
! {
!     ahlog(AH, 1, "finished item %d %s %s\n",
!           te->dumpId, te->desc, te->tag);
!
!     if (status != 0)
!         exit_horribly(modulename, "worker process failed: exit code %d\n",
!                       status);
  }

+
  void
  WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
  {
*************** static void
*** 3744,3754 ****
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
-     int            work_status;
      bool        skipped_some;
      TocEntry    ready_list;
      TocEntry   *next_work_item;
-     int            ret_child;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

--- 3772,3780 ----
*************** restore_toc_entries_parallel(ArchiveHand
*** 3825,3878 ****

              par_list_remove(next_work_item);

!             DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
          }
          else
          {
              /* at least one child is working and we have nothing ready. */
          }

!         for (;;)
!         {
!             int            nTerm = 0;
!
!             /*
!              * In order to reduce dependencies as soon as possible and
!              * especially to reap the status of workers who are working on
!              * items that pending items depend on, we do a non-blocking check
!              * for ended workers first.
!              *
!              * However, if we do not have any other work items currently that
!              * workers can work on, we do not busy-loop here but instead
!              * really wait for at least one worker to terminate. Hence we call
!              * ListenToWorkers(..., ..., do_wait = true) in this case.
!              */
!             ListenToWorkers(AH, pstate, !next_work_item);
!
!             while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
!             {
!                 nTerm++;
!                 mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
!             }
!
!             /*
!              * We need to make sure that we have an idle worker before
!              * re-running the loop. If nTerm > 0 we already have that (quick
!              * check).
!              */
!             if (nTerm > 0)
!                 break;
!
!             /* if nobody terminated, explicitly check for an idle worker */
!             if (GetIdleWorker(pstate) != NO_SLOT)
!                 break;
!
!             /*
!              * If we have no idle worker, read the result of one or more
!              * workers and loop the loop to call ReapWorkerStatus() on them.
!              */
!             ListenToWorkers(AH, pstate, true);
!         }
      }

      ahlog(AH, 1, "finished main parallel loop\n");
--- 3851,3879 ----

              par_list_remove(next_work_item);

!             DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
!                                    mark_restore_job_done, &ready_list);
          }
          else
          {
              /* at least one child is working and we have nothing ready. */
          }

!         /*
!          * Before dispatching another job, check to see if anything has
!          * finished.  We should check every time through the loop so as to
!          * reduce dependencies as soon as possible.  If we were unable to
!          * dispatch any job this time through, wait until some worker finishes
!          * (and, hopefully, unblocks some pending item).  If we did dispatch
!          * something, continue as soon as there's at least one idle worker.
!          * Note that in either case, there's guaranteed to be at least one
!          * idle worker when we return to the top of the loop.  This ensures we
!          * won't block inside DispatchJobForTocEntry, which would be
!          * undesirable: we'd rather postpone dispatching until we see what's
!          * been unblocked by finished jobs.
!          */
!         WaitForWorkers(AH, pstate,
!                        next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
      }

      ahlog(AH, 1, "finished main parallel loop\n");
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3999,4007 ****
          int            count = 0;

          for (k = 0; k < pstate->numWorkers; k++)
!             if (pstate->parallelSlot[k].args->te != NULL &&
!                 pstate->parallelSlot[k].args->te->section == SECTION_DATA)
                  count++;
          if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
              pref_non_data = false;
      }
--- 4000,4010 ----
          int            count = 0;

          for (k = 0; k < pstate->numWorkers; k++)
!         {
!             if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING &&
!                 pstate->parallelSlot[k].te->section == SECTION_DATA)
                  count++;
+         }
          if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
              pref_non_data = false;
      }
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4018,4030 ****
           * that a currently running item also needs lock on, or vice versa. If
           * so, we don't want to schedule them together.
           */
!         for (i = 0; i < pstate->numWorkers && !conflicts; i++)
          {
              TocEntry   *running_te;

              if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
                  continue;
!             running_te = pstate->parallelSlot[i].args->te;

              if (has_lock_conflicts(te, running_te) ||
                  has_lock_conflicts(running_te, te))
--- 4021,4033 ----
           * that a currently running item also needs lock on, or vice versa. If
           * so, we don't want to schedule them together.
           */
!         for (i = 0; i < pstate->numWorkers; i++)
          {
              TocEntry   *running_te;

              if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
                  continue;
!             running_te = pstate->parallelSlot[i].te;

              if (has_lock_conflicts(te, running_te) ||
                  has_lock_conflicts(running_te, te))
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4065,4074 ****
   * our work is finished, the master process will assign us a new work item.
   */
  int
! parallel_restore(ParallelArgs *args)
  {
-     ArchiveHandle *AH = args->AH;
-     TocEntry   *te = args->te;
      int            status;

      _doSetFixedOutputState(AH);
--- 4068,4075 ----
   * our work is finished, the master process will assign us a new work item.
   */
  int
! parallel_restore(ArchiveHandle *AH, TocEntry *te)
  {
      int            status;

      _doSetFixedOutputState(AH);
*************** parallel_restore(ParallelArgs *args)
*** 4085,4106 ****


  /*
!  * Housekeeping to be done after a step has been parallel restored.
   *
!  * Clear the appropriate slot, free all the extra memory we allocated,
!  * update status, and reduce the dependency count of any dependent items.
   */
  static void
! mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
!                int worker, int status,
!                ParallelState *pstate)
  {
!     TocEntry   *te = NULL;
!
!     te = pstate->parallelSlot[worker].args->te;
!
!     if (te == NULL)
!         exit_horribly(modulename, "could not find slot of finished worker\n");

      ahlog(AH, 1, "finished item %d %s %s\n",
            te->dumpId, te->desc, te->tag);
--- 4086,4103 ----


  /*
!  * Callback function that's invoked in the master process after a step has
!  * been parallel restored.
   *
!  * Update status and reduce the dependency count of any dependent items.
   */
  static void
! mark_restore_job_done(ArchiveHandle *AH,
!                       TocEntry *te,
!                       int status,
!                       void *callback_data)
  {
!     TocEntry   *ready_list = (TocEntry *) callback_data;

      ahlog(AH, 1, "finished item %d %s %s\n",
            te->dumpId, te->desc, te->tag);
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 4aa7190..c3e64cb 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef z_stream *z_streamp;
*** 111,117 ****

  typedef struct _archiveHandle ArchiveHandle;
  typedef struct _tocEntry TocEntry;
- struct ParallelArgs;
  struct ParallelState;

  #define READ_ERROR_EXIT(fd) \
--- 111,116 ----
*************** struct _tocEntry
*** 372,378 ****
      int            nLockDeps;        /* number of such dependencies */
  };

! extern int    parallel_restore(struct ParallelArgs *args);
  extern void on_exit_close_archive(Archive *AHX);

  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3,4);
--- 371,377 ----
      int            nLockDeps;        /* number of such dependencies */
  };

! extern int    parallel_restore(ArchiveHandle *AH, TocEntry *te);
  extern void on_exit_close_archive(Archive *AHX);

  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3,4);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 66329dc..c4f487a 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** _WorkerJobRestoreCustom(ArchiveHandle *A
*** 820,832 ****
       */
      const int    buflen = 64;
      char       *buf = (char *) pg_malloc(buflen);
-     ParallelArgs pargs;
      int            status;

!     pargs.AH = AH;
!     pargs.te = te;
!
!     status = parallel_restore(&pargs);

      snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
               status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
--- 820,828 ----
       */
      const int    buflen = 64;
      char       *buf = (char *) pg_malloc(buflen);
      int            status;

!     status = parallel_restore(AH, te);

      snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
               status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 27c6190..129d761 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** _WorkerJobRestoreDirectory(ArchiveHandle
*** 838,850 ****
       */
      const int    buflen = 64;
      char       *buf = (char *) pg_malloc(buflen);
-     ParallelArgs pargs;
      int            status;

!     pargs.AH = AH;
!     pargs.te = te;
!
!     status = parallel_restore(&pargs);

      snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
               status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
--- 838,846 ----
       */
      const int    buflen = 64;
      char       *buf = (char *) pg_malloc(buflen);
      int            status;

!     status = parallel_restore(AH, te);

      snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
               status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
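
For anyone who wants to see the control flow without reading the whole patch, here is a minimal toy model of the callback-plus-unified-wait idea. It is only a sketch and not the code in the patch: every Toy*/toy_* name is hypothetical, workers are simulated by a tick counter rather than real processes or pipes, and the three wait modes only loosely mirror WFW_GOT_STATUS, WFW_ONE_IDLE and WFW_ALL_IDLE. The point it illustrates is that the listen step runs the registered callback and returns the slot to idle in one go, so no "finished" state is needed and a single wait loop can serve every caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_NWORKERS 3

typedef enum { TOY_IDLE = 0, TOY_WORKING } ToyWorkerStatus;
typedef enum { TOY_WFW_GOT_STATUS, TOY_WFW_ONE_IDLE, TOY_WFW_ALL_IDLE } ToyWaitMode;

/* Completion callback, run in the master when a worker reports status. */
typedef void (*ToyCallback) (int job_id, int status, void *callback_data);

typedef struct
{
	ToyWorkerStatus workerStatus;
	int			job_id;
	int			ticks_left;		/* simulated remaining work (assumption) */
	ToyCallback callback;
	void	   *callback_data;
} ToySlot;

typedef struct
{
	ToySlot		slot[TOY_NWORKERS];
} ToyState;

/* Return the index of some idle slot, or -1 if every worker is busy. */
static int
toy_get_idle_worker(const ToyState *ps)
{
	for (int i = 0; i < TOY_NWORKERS; i++)
		if (ps->slot[i].workerStatus == TOY_IDLE)
			return i;
	return -1;
}

static bool
toy_all_idle(const ToyState *ps)
{
	for (int i = 0; i < TOY_NWORKERS; i++)
		if (ps->slot[i].workerStatus == TOY_WORKING)
			return false;
	return true;
}

/* Hand a job to an idle worker and remember its cleanup callback. */
static bool
toy_dispatch(ToyState *ps, int job_id, int ticks,
			 ToyCallback cb, void *cb_data)
{
	int			i = toy_get_idle_worker(ps);

	assert(cb != NULL && ticks > 0);
	if (i < 0)
		return false;			/* toy version refuses instead of blocking */
	ps->slot[i] = (ToySlot) {TOY_WORKING, job_id, ticks, cb, cb_data};
	return true;
}

/*
 * Advance simulated time by one tick.  Each worker that finishes goes
 * straight back to idle and has its callback run immediately.  Returns
 * the number of status reports processed.
 */
static int
toy_listen(ToyState *ps)
{
	int			nreports = 0;

	for (int i = 0; i < TOY_NWORKERS; i++)
	{
		ToySlot    *s = &ps->slot[i];

		if (s->workerStatus == TOY_WORKING && --s->ticks_left == 0)
		{
			s->workerStatus = TOY_IDLE;
			s->callback(s->job_id, 0, s->callback_data);
			nreports++;
		}
	}
	return nreports;
}

/* One wait loop for all callers; the mode decides when to stop. */
static void
toy_wait_for_workers(ToyState *ps, ToyWaitMode mode)
{
	bool		got_status = false;

	for (;;)
	{
		if (toy_all_idle(ps))
			return;				/* nobody left who could report */
		if (mode == TOY_WFW_ONE_IDLE && toy_get_idle_worker(ps) >= 0)
			return;
		if (mode == TOY_WFW_GOT_STATUS && got_status)
			return;
		if (toy_listen(ps) > 0)
			got_status = true;
	}
}

/* Example callback: count successfully finished jobs. */
static void
toy_count_done(int job_id, int status, void *callback_data)
{
	(void) job_id;
	if (status == 0)
		++*(int *) callback_data;
}
```

In this toy, a caller that has just dispatched something would wait in TOY_WFW_ONE_IDLE mode, one that had nothing to dispatch would use TOY_WFW_GOT_STATUS, and the end-of-run wait would use TOY_WFW_ALL_IDLE, which is the same division of labor the restore loop in the patch relies on.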
