Discussion: parallel pg_restore - WIP patch


parallel pg_restore - WIP patch

From:
Andrew Dunstan
Date:
Attached is my WIP patch for parallel pg_restore. It's still very rough,
but seems to work.

Anyone who can test this with high-end equipment would be helping some.

cheers

andrew
Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h    13 Apr 2008 03:49:21 -0000    1.47
--- pg_backup.h    23 Sep 2008 18:10:58 -0000
***************
*** 123,128 ****
--- 123,130 ----
      int            suppressDumpWarnings;    /* Suppress output of WARNING entries
                                           * to stderr */
      bool        single_txn;
+     int         number_of_threads;
+     bool        truncate_before_load;

      bool       *idWanted;        /* array showing which dump IDs to emit */
  } RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
  extern void CloseArchive(Archive *AH);

  extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);

  /* Open an existing archive */
  extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c    5 Sep 2008 23:53:42 -0000    1.158
--- pg_backup_archiver.c    23 Sep 2008 18:10:59 -0000
***************
*** 27,38 ****
--- 27,50 ----

  #include <unistd.h>

+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
  #ifdef WIN32
  #include <io.h>
  #endif

  #include "libpq/libpq-fs.h"

+ typedef struct _parallel_slot
+ {
+     pid_t   pid;
+     TocEntry *te;
+     DumpId  dumpId;
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)

  const char *progname;

***************
*** 70,76 ****
--- 82,99 ----
  static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
  static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
  static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+

+ static ArchiveHandle *GAH;

  /*
   *    Wrapper functions.
***************
*** 125,137 ****

  /* Public */
  void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;
-     bool        defnDumped;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
--- 148,523 ----

  /* Public */
  void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+     ArchiveHandle *AH = (ArchiveHandle *) AHX;
+     ParallelSlot  *slots;
+     int next_slot;
+     TocEntry *next_work_item = NULL;
+     int work_status;
+     pid_t ret_child;
+     int n_slots = ropt->number_of_threads;
+     TocEntry *te;
+     teReqs    reqs;
+
+
+     /*     AH->debugLevel = 99; */
+     /* some routines that use ahlog() don't get passed AH */
+     GAH = AH;
+
+     ahlog(AH,1,"entering RestoreArchiveParallel\n");
+
+
+     slots = (ParallelSlot *) calloc(n_slots, sizeof(ParallelSlot));
+     AH->ropt = ropt;
+
+     if (ropt->create)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --create\n");
+
+     if (ropt->dropSchema)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --clean\n");
+
+     if (!ropt->useDB)
+         die_horribly(AH,modulename,
+                      "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename,
+                              "cannot restore from compressed archive (compression not supported in this
installation)\n");
+         }
+     }
+ #endif
+
+     ahlog(AH, 1, "connecting to database for restore\n");
+     if (AH->version < K_VERS_1_3)
+         die_horribly(AH, modulename,
+                      "direct database connections are not supported in pre-1.3 archives\n");
+
+     /* XXX Should get this from the archive */
+     AHX->minRemoteVersion = 070100;
+     AHX->maxRemoteVersion = 999999;
+
+     /* correct dependency counts in case we're doing a partial restore */
+     if (ropt->idWanted == NULL)
+         InitDummyWantedList(AHX,ropt);
+     _fix_dependency_counts(AH);
+
+     /*
+      * Since we're talking to the DB directly, don't send comments since they
+      * obscure SQL when displaying errors
+      */
+     AH->noTocComments = 1;
+
+     /* Do all the early stuff in a single connection in the parent.
+      * There's no great point in running it in parallel and it will actually
+      * run faster in a single connection because we avoid all the connection
+      * and setup overhead, including the 0.5s sleep below.
+      */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     while((next_work_item = get_next_work_item(AH)) != NULL)
+     {
+         /* XXX need to improve this test in case there is no table data */
+         /* need to test for indexes, FKs, PK, Unique, etc */
+         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+             break;
+         (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+         next_work_item->prestored = true;
+
+         _reduce_dependencies(AH,next_work_item);
+     }
+
+
+     /*
+      * now close parent connection in prep for parallel step.
+      */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+     /* main parent loop */
+
+     ahlog(AH,1,"entering main loop\n");
+
+     while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+            (work_is_being_done(slots,n_slots)))
+     {
+         if (next_work_item != NULL &&
+             ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+         {
+             /* there is work still to do and a worker slot available */
+
+             pid_t child;
+
+             next_work_item->prestored = true;
+
+             child = fork();
+             if (child == 0)
+             {
+                 prestore(AH,next_work_item);
+                 /* should not happen ... we expect prestore to exit */
+                 exit(1);
+             }
+             else if (child > 0)
+             {
+                 slots[next_slot].pid = child;
+                 slots[next_slot].te = next_work_item;
+                 slots[next_slot].dumpId = next_work_item->dumpId;
+             }
+             else
+             {
+                 /* XXX fork error - handle it! */
+             }
+             /* delay just long enough between forks to give the catalog some
+              * breathing space. Without this sleep I got
+              * "tuple concurrently updated" errors.
+              */
+             pg_usleep(500000);
+             continue; /* in case the slots are not yet full */
+         }
+         /* if we get here there must be work being done */
+         ret_child = wait(&work_status);
+
+         if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+         {
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+         {
+             int i;
+
+             for (i = 0; i < n_slots; i++)
+             {
+                 if (slots[i].pid == ret_child)
+                 {
+                     _inhibit_data_for_failed_table(AH, slots[i].te);
+                     break;
+                 }
+             }
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else
+         {
+             /* XXX something went wrong - deal with it */
+         }
+     }
+
+     /*
+      * now process the ACLs - no need to do this in parallel
+      */
+
+     /* reconnect from parent */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Scan TOC to output ownership commands and ACLs
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         AH->currentTE = te;
+
+         /* Work out what, if anything, we want from this entry */
+         reqs = _tocEntryRequired(te, ropt, true);
+
+         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+         {
+             ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+                   te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, true);
+         }
+     }
+
+     /* clean up */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+ }
+
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+     ahlog(GAH,1,"is work being done?\n");
+     while(n_slots--)
+     {
+         if (slot->pid > 0)
+             return true;
+         slot++;
+     }
+     ahlog(GAH,1,"work is not being done\n");
+     return false;
+ }
+
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == 0)
+         {
+             ahlog(GAH,1,"available slots is %d\n",i);
+             return i;
+         }
+     }
+     ahlog(GAH,1,"No slot available\n");
+     return NO_SLOT;
+ }
+
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+     TocEntry *te;
+     teReqs    reqs;
+
+     /* just search from the top of the queue until we find an available item.
+      * Note that the queue isn't reordered in the current implementation. If
+      * we ever do reorder it, then certain code that processes entries from the
+      * current item to the end of the queue will probably need to be
+      * re-examined.
+      */
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         if (!te->prestored && te->nDeps < 1)
+         {
+             /* make sure it's not an ACL */
+             reqs = _tocEntryRequired (te, AH->ropt, false);
+             if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+             {
+                 ahlog(AH,1,"next item is %d\n",te->dumpId);
+                 return te;
+             }
+         }
+     }
+     ahlog(AH,1,"No item ready\n");
+     return NULL;
+ }
+
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+     RestoreOptions *ropt = AH->ropt;
+     int retval;
+
+     ConnectDatabase((Archive *)AH, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     retval = _restore_one_te(AH, te, ropt, true);
+
+     PQfinish(AH->connection);
+     exit(retval);
+
+ }
+
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+                ParallelSlot *slots, int n_slots)
+ {
+
+     TocEntry *te = NULL;
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == worker)
+         {
+             te = slots[i].te;
+             slots[i].pid = 0;
+             slots[i].te = NULL;
+             slots[i].dumpId = 0;
+             break;
+         }
+     }
+
+     /* Assert (te != NULL); */
+
+     _reduce_dependencies(AH,te);
+
+
+ }
+
+
+ /*
+  * Make sure the head of each dependency chain is a live item
+  *
+  * Once this is established the property will be maintained by
+  * _reduce_dependencies called as items are done.
+  */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+     TocEntry * te;
+     RestoreOptions * ropt = AH->ropt;
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+ }
+
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+     DumpId item = te->dumpId;
+     RestoreOptions * ropt = AH->ropt;
+     int i;
+
+     for (te = te->next; te != AH->toc; te = te->next)
+     {
+         for (i = 0; i < te->nDeps; i++)
+             if (te->dependencies[i] == item)
+             {
+                 te->nDeps = te->nDeps - 1;
+                 /*
+                  * If this item won't in fact be done, and is now at
+                  * 0 dependency count, we pretend it's been done and
+                  * reduce the dependency counts of all the things that
+                  * depend on it, by a recursive call
+                  */
+                 if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1])
+                     _reduce_dependencies(AH,te);
+
+                 break;
+             }
+     }
+
+ }
+
+
+ /* Public */
+ void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 557,576 ----
          AH->noTocComments = 1;
      }

+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
+         }
+     }
+ #endif
+
      /*
       * Work out if we have an implied data-only restore. This can happen if
       * the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         AH->currentTE = te;
!
!         /* Work out what, if anything, we want from this entry */
!         reqs = _tocEntryRequired(te, ropt, false);
!
!         /* Dump any relevant dump warnings to stderr */
!         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
!         {
!             if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->defn);
!             else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
!         }
!
!         defnDumped = false;
!
!         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
!         {
!             ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
!             _printTocEntry(AH, te, ropt, false, false);
!             defnDumped = true;
!
!             /*
!              * If we could not create a table and --no-data-for-failed-tables
!              * was given, ignore the corresponding TABLE DATA
!              */
!             if (ropt->noDataForFailedTables &&
!                 AH->lastErrorTE == te &&
!                 strcmp(te->desc, "TABLE") == 0)
!             {
!                 TocEntry   *tes;
!
!                 ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
!                       te->tag);
!
!                 for (tes = te->next; tes != AH->toc; tes = tes->next)
!                 {
!                     if (strcmp(tes->desc, "TABLE DATA") == 0 &&
!                         strcmp(tes->tag, te->tag) == 0 &&
!                         strcmp(tes->namespace ? tes->namespace : "",
!                                te->namespace ? te->namespace : "") == 0)
!                     {
!                         /* mark it unwanted */
!                         ropt->idWanted[tes->dumpId - 1] = false;
!                         break;
!                     }
!                 }
!             }
!
!             /* If we created a DB, connect to it... */
!             if (strcmp(te->desc, "DATABASE") == 0)
!             {
!                 ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
!                 _reconnectToDB(AH, te->tag);
!             }
!         }
!
!         /*
!          * If we have a data component, then process it
!          */
!         if ((reqs & REQ_DATA) != 0)
!         {
!             /*
!              * hadDumper will be set if there is genuine data component for
!              * this node. Otherwise, we need to check the defn field for
!              * statements that need to be executed in data-only restores.
!              */
!             if (te->hadDumper)
!             {
!                 /*
!                  * If we can output the data, then restore it.
!                  */
!                 if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
!                 {
! #ifndef HAVE_LIBZ
!                     if (AH->compression != 0)
!                         die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
!
!                     _printTocEntry(AH, te, ropt, true, false);
!
!                     if (strcmp(te->desc, "BLOBS") == 0 ||
!                         strcmp(te->desc, "BLOB COMMENTS") == 0)
!                     {
!                         ahlog(AH, 1, "restoring %s\n", te->desc);
!
!                         _selectOutputSchema(AH, "pg_catalog");
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!                     }
!                     else
!                     {
!                         _disableTriggersIfNecessary(AH, te, ropt);
!
!                         /* Select owner and schema as necessary */
!                         _becomeOwner(AH, te);
!                         _selectOutputSchema(AH, te->namespace);
!
!                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
!                               te->tag);
!
!                         /*
!                          * If we have a copy statement, use it. As of V1.3,
!                          * these are separate to allow easy import from
!                          * within a database connection. Pre 1.3 archives can
!                          * not use DB connections and are sent to output only.
!                          *
!                          * For V1.3+, the table data MUST have a copy
!                          * statement so that we can go into appropriate mode
!                          * with libpq.
!                          */
!                         if (te->copyStmt && strlen(te->copyStmt) > 0)
!                         {
!                             ahprintf(AH, "%s", te->copyStmt);
!                             AH->writingCopyData = true;
!                         }
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!
!                         AH->writingCopyData = false;
!
!                         _enableTriggersIfNecessary(AH, te, ropt);
!                     }
!                 }
!             }
!             else if (!defnDumped)
!             {
!                 /* If we haven't already dumped the defn part, do so now */
!                 ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!                 _printTocEntry(AH, te, ropt, false, false);
!             }
!         }
!     }                            /* end loop over TOC entries */

      /*
       * Scan TOC again to output ownership commands and ACLs
--- 670,677 ----
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         (void) _restore_one_te(AH, te, ropt, false);
!     }

      /*
       * Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 719,899 ----
      }
  }

+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+                 RestoreOptions *ropt, bool is_parallel)
+ {
+     teReqs        reqs;
+     bool        defnDumped;
+     int         retval = 0;
+
+     AH->currentTE = te;
+
+     /* Work out what, if anything, we want from this entry */
+     reqs = _tocEntryRequired(te, ropt, false);
+
+     /* Dump any relevant dump warnings to stderr */
+     if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+     {
+         if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+         else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+     }
+
+     defnDumped = false;
+
+     if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+     {
+         ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+         _printTocEntry(AH, te, ropt, false, false);
+         defnDumped = true;
+
+         /*
+          * If we could not create a table and --no-data-for-failed-tables
+          * was given, ignore the corresponding TABLE DATA
+          *
+          * For the parallel case this must be done in the parent, so we just
+          * set a return value.
+          */
+         if (ropt->noDataForFailedTables &&
+             AH->lastErrorTE == te &&
+             strcmp(te->desc, "TABLE") == 0)
+         {
+             if (is_parallel)
+                 retval = 1;
+             else
+                 _inhibit_data_for_failed_table(AH,te);
+         }
+
+         /* If we created a DB, connect to it... */
+         /* won't happen in parallel restore */
+         if (strcmp(te->desc, "DATABASE") == 0)
+         {
+             ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+             _reconnectToDB(AH, te->tag);
+         }
+     }
+
+     /*
+      * If we have a data component, then process it
+      */
+     if ((reqs & REQ_DATA) != 0)
+     {
+         /*
+          * hadDumper will be set if there is genuine data component for
+          * this node. Otherwise, we need to check the defn field for
+          * statements that need to be executed in data-only restores.
+          */
+         if (te->hadDumper)
+         {
+             /*
+              * If we can output the data, then restore it.
+              */
+             if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
+             {
+                 _printTocEntry(AH, te, ropt, true, false);
+
+                 if (strcmp(te->desc, "BLOBS") == 0 ||
+                     strcmp(te->desc, "BLOB COMMENTS") == 0)
+                 {
+                     ahlog(AH, 1, "restoring %s\n", te->desc);
+
+                     _selectOutputSchema(AH, "pg_catalog");
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+                 }
+                 else
+                 {
+                     _disableTriggersIfNecessary(AH, te, ropt);
+
+                     /* Select owner and schema as necessary */
+                     _becomeOwner(AH, te);
+                     _selectOutputSchema(AH, te->namespace);
+
+                     ahlog(AH, 1, "restoring data for table \"%s\"\n",
+                           te->tag);
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             StartTransaction(AH);
+                         else
+                             ahprintf(AH, "BEGIN;\n\n");
+
+                         ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+                                  fmtId(te->tag));
+                     }
+
+                     /*
+                      * If we have a copy statement, use it. As of V1.3,
+                      * these are separate to allow easy import from
+                      * within a database connection. Pre 1.3 archives can
+                      * not use DB connections and are sent to output only.
+                      *
+                      * For V1.3+, the table data MUST have a copy
+                      * statement so that we can go into appropriate mode
+                      * with libpq.
+                      */
+                     if (te->copyStmt && strlen(te->copyStmt) > 0)
+                     {
+                         ahprintf(AH, "%s", te->copyStmt);
+                         AH->writingCopyData = true;
+                     }
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+                     AH->writingCopyData = false;
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             CommitTransaction(AH);
+                         else
+                             ahprintf(AH, "COMMIT;\n\n");
+                     }
+
+
+                     _enableTriggersIfNecessary(AH, te, ropt);
+                 }
+             }
+         }
+         else if (!defnDumped)
+         {
+             /* If we haven't already dumped the defn part, do so now */
+             ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, false);
+         }
+     }
+
+     return retval;
+ }
+
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+     TocEntry   *tes;
+     RestoreOptions *ropt = AH->ropt;
+
+     ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+           te->tag);
+
+     for (tes = te->next; tes != AH->toc; tes = tes->next)
+     {
+         if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+             strcmp(tes->tag, te->tag) == 0 &&
+             strcmp(tes->namespace ? tes->namespace : "",
+                    te->namespace ? te->namespace : "") == 0)
+         {
+             /* mark it unwanted */
+             ropt->idWanted[tes->dumpId - 1] = false;
+
+             _reduce_dependencies(AH, tes);
+             break;
+         }
+     }
+ }
+
  /*
   * Allocate a new RestoreOptions block.
   * This is mainly so we can initialize it, but also for future expansion,
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h    7 Nov 2007 12:24:24 -0000    1.76
--- pg_backup_archiver.h    23 Sep 2008 18:10:59 -0000
***************
*** 231,236 ****
--- 231,237 ----
      char       *archdbname;        /* DB name *read* from archive */
      bool        requirePassword;
      PGconn       *connection;
+     char       *cachepw;
      int            connectToDB;    /* Flag to indicate if direct DB connection is
                                   * required */
      bool        writingCopyData;    /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 285,291 ----
      DumpId        dumpId;
      bool        hadDumper;        /* Archiver was passed a dumper routine (used
                                   * in restore) */
+     bool        prestored;      /* keep track of parallel restore */
      char       *tag;            /* index tag */
      char       *namespace;        /* null or empty string if not in a schema */
      char       *tablespace;        /* null if not in a tablespace; empty string
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c    16 Aug 2008 02:25:06 -0000    1.80
--- pg_backup_db.c    23 Sep 2008 18:10:59 -0000
***************
*** 206,220 ****
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
      else
          AH->requirePassword = false;

      /*
       * Start the connection.  Loop until we have a password if requested by
--- 206,226 ----
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
+     else if (reqPwd)
+     {
+         password = AH->cachepw;
+     }
      else
+     {
          AH->requirePassword = false;
+     }

      /*
       * Start the connection.  Loop until we have a password if requested by
***************
*** 241,247 ****
      } while (new_pass);

      if (password)
!         free(password);

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 247,253 ----
      } while (new_pass);

      if (password)
!         AH->cachepw = password;

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c    13 Apr 2008 03:49:22 -0000    1.88
--- pg_restore.c    23 Sep 2008 18:10:59 -0000
***************
*** 78,83 ****
--- 78,84 ----
      static int    no_data_for_failed_tables = 0;
      static int  outputNoTablespaces = 0;
      static int    use_setsessauth = 0;
+     static int  truncate_before_load = 0;

      struct option cmdopts[] = {
          {"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
          {"ignore-version", 0, NULL, 'i'},
          {"index", 1, NULL, 'I'},
          {"list", 0, NULL, 'l'},
+         {"multi-thread",1,NULL,'m'},
          {"no-privileges", 0, NULL, 'x'},
          {"no-acl", 0, NULL, 'x'},
          {"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
          {"disable-triggers", no_argument, &disable_triggers, 1},
          {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
          {"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+         {"truncate-before-load", no_argument, &truncate_before_load, 1},
          {"use-set-session-authorization", no_argument, &use_setsessauth, 1},

          {NULL, 0, NULL, 0}
***************
*** 139,145 ****
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
--- 142,148 ----
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
***************
*** 182,187 ****
--- 185,194 ----
                  opts->tocFile = strdup(optarg);
                  break;

+             case 'm':
+                 opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+                 break;
+
              case 'n':            /* Dump data for this schema only */
                  opts->schemaNames = strdup(optarg);
                  break;
***************
*** 262,268 ****
                  break;

              case 0:
!                 /* This covers the long options equivalent to -X xxx. */
                  break;

              case '1':            /* Restore data in a single transaction */
--- 269,278 ----
                  break;

              case 0:
!                 /*
!                  * This covers the long options without a short equivalent,
!                  * including those equivalent to -X xxx.
!                  */
                  break;

              case '1':            /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
      opts->noDataForFailedTables = no_data_for_failed_tables;
      opts->noTablespace = outputNoTablespaces;
      opts->use_setsessauth = use_setsessauth;
+     opts->truncate_before_load = truncate_before_load;
+
+     if (opts->single_txn)
+     {
+         if (opts->number_of_threads > 1)
+         {
+             write_msg(NULL, "single transaction not compatible with multi-threading");
+             exit(1);
+         }
+         else if (opts->truncate_before_load)
+         {
+             write_msg(NULL, "single transaction not compatible with truncate-before-load");
+             exit(1);
+         }
+     }

      if (opts->formatName)
      {
***************
*** 330,335 ****
--- 355,362 ----

      AH = OpenArchive(inputFileSpec, opts->format);

+     /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
      /* Let the archiver know how noisy to be */
      AH->verbose = opts->verbose;

***************
*** 351,356 ****
--- 378,385 ----

      if (opts->tocSummary)
          PrintTOCSummary(AH, opts);
+     else if (opts->number_of_threads > 1)
+         RestoreArchiveParallel(AH, opts);
      else
          RestoreArchive(AH, opts);


Re: parallel pg_restore - WIP patch

From:
Stefan Kaltenbrunner
Date:
Andrew Dunstan wrote:
> 
> Attached is my WIP patch for parallel pg_restore. It's still very rough, 
> but seems to work.
> 
> Anyone who can test this with high-end equipment would be helping some.

tried playing with this (on a 22Gb compressed dump using 4 connections)
but it does not seem to work at all for me:

pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] could not uncompress data: invalid block type
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] could not uncompress data: invalid stored 
block lengths
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] could not uncompress data: invalid 
distance too far back
pg_restore: [custom archiver] could not uncompress data: invalid 
distances set
pg_restore: [custom archiver] could not uncompress data: invalid code 
lengths set
pg_restore: [custom archiver] could not uncompress data: incorrect data 
check
pg_restore: [custom archiver] could not uncompress data: invalid code 
lengths set
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] out of memory
pg_restore: [custom archiver] could not uncompress data: invalid 
literal/length code
pg_restore: [custom archiver] could not uncompress data: invalid 
literal/length code
pg_restore: [custom archiver] could not uncompress data: invalid block type

each pg_restore process seems to eat a few gigabytes of memory in a few
seconds.


Stefan


Re: parallel pg_restore - WIP patch

From:
Russell Smith
Date:
Hi,

As I'm interested in this topic, I thought I'd take a look at the
patch.  I have no capability to test it on high-end hardware, but did
some basic testing on my workstation and a basic review of the patch.

I somehow had the impression that instead of creating a new connection
for each restore item we would create the processes at the start and
then send them the dumpId's they should be restoring.  That would allow
the controller to batch dumpId's together and expect the worker to
process them in a transaction.  But this is probably just an idea I
created in my head.
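
For illustration, here is a minimal sketch of that scheme, assuming one
pipe per worker and a stand-in restore_dump_id() for the real restore
work (all the names here are hypothetical; the posted patch instead
forks a short-lived process per TOC entry):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

#define N_WORKERS 4

static void restore_dump_id(int dump_id)
{
    /* stand-in for restoring the TOC entry with this dump ID */
    printf("worker %d: restoring dump ID %d\n", (int) getpid(), dump_id);
}

int main(void)
{
    int     pipes[N_WORKERS][2];
    int     i, j, id;
    pid_t   pid;

    for (i = 0; i < N_WORKERS; i++)
    {
        if (pipe(pipes[i]) != 0 || (pid = fork()) < 0)
            exit(1);
        if (pid == 0)
        {
            /* worker: keep only our own read end, then process IDs
             * until the parent closes the write end */
            for (j = 0; j <= i; j++)
                close(pipes[j][1]);
            while (read(pipes[i][0], &id, sizeof(id)) == sizeof(id))
                restore_dump_id(id);
            _exit(0);
        }
        close(pipes[i][0]);     /* parent keeps only the write ends */
    }

    /* controller: deal out dump IDs round-robin; a real one would
     * dispatch in dependency order and could batch several IDs into
     * one transaction per worker */
    for (id = 1; id <= 20; id++)
        write(pipes[id % N_WORKERS][1], &id, sizeof(id));

    for (i = 0; i < N_WORKERS; i++)
        close(pipes[i][1]);     /* EOF tells each worker to finish */
    while (wait(NULL) > 0)
        ;
    return 0;
}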

Do we know why we experience "tuple concurrently updated" errors if we
spawn threads too fast?

I completed some test restores using the pg_restore from head with the
patch applied.  The dump was a custom dump created with pg 8.2 and
restored to an 8.2 database.  To confirm this would work, I completed a
restore using the standard single-threaded mode.  The schema restored
successfully.  The only errors reported involved non-existent roles.

When I attempt to restore using parallel restore I get out of memory
errors reported from _PrintData.  The code returning the error is:

_PrintData(...

    while (blkLen != 0)
    {
        if (blkLen + 1 > ctx->inSize)
        {
            free(ctx->zlibIn);
            ctx->zlibIn = NULL;
            ctx->zlibIn = (char *) malloc(blkLen + 1);
            if (!ctx->zlibIn)
                die_horribly(AH, modulename, " out of memory\n");

            ctx->inSize = blkLen + 1;
            in = ctx->zlibIn;
        }


It appears from my debugging and looking at the code that in _PrintData:

    lclContext *ctx = (lclContext *) AH->formatData;

the memory context is shared across all threads, which means it's
possible the memory contexts are stomping on each other.  My GDB skills
are not up to reproducing this in a gdb session, as there are forks
going on all over the place, and if you process the entries in a serial
fashion there aren't any errors.  I'm not sure of the fix for this,
but in a parallel environment it doesn't seem possible to store the
memory context in the AH.

I also receive messages saying "pg_restore: [custom archiver] could not
read from input file: end of file".  I have not investigated these
further as my current guess is they are linked to the out of memory error.

Given I ran into this error at my first testing attempt, I haven't
evaluated much else at this point in time.  Now all this could be
because I'm using the 8.2 archive, but it works fine in single restore
mode.  The dump file is about 400M compressed and an entire archive
schema was removed from the restore path with a custom restore list.

Command line used;  PGPORT=5432 ./pg_restore -h /var/run/postgresql -m4
--truncate-before-load -v -d tt2 -L tt.list
/home/mr-russ/pg-index-test/timetable.pgdump 2> log.txt

I sent the log and this email originally to the list, but I think the attachment was too large, so I've resent it
without any attachments.  Since my initial testing, Stefan has confirmed the problem I am having.
 

If you have any questions, would like me to run other tests or anything,
feel free to contact me.

Regards

Russell



Re: parallel pg_restore - WIP patch

From:
Andrew Dunstan
Date:

Russell Smith wrote:
> Hi,
>
> As I'm interested in this topic, I thought I'd take a look at the
> patch.  I have no capability to test it on high end hardware but did
> some basic testing on my workstation and basic review of the patch.
>
> I somehow had the impression that instead of creating a new connection
> for each restore item we would create the processes at the start and
> then send them the dumpId's they should be restoring.  That would allow
> the controller to batch dumpId's together and expect the worker to
> process them in a transaction.  But this is probably just an idea I
> created in my head.
>   

Yes it is. To do that I would have to invent a protocol for talking to 
the workers, etc, and there is not the slightest chance I would get that 
done by November.
And I don't see the virtue in processing them all in a transaction. I've 
provided a much simpler means of avoiding WAL logging of the COPY.
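
That simpler means is --truncate-before-load: as I understand it, when
the TRUNCATE and the load happen in the same transaction and WAL
archiving is not in use, the server can skip WAL-logging the COPY.
A minimal libpq sketch of the pattern the patch emits (connection
string, table name and data are placeholders; error handling is
elided):

#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

int main(void)
{
    PGconn     *conn = PQconnectdb("dbname=tt2");
    PGresult   *res;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        exit(1);
    }

    PQclear(PQexec(conn, "BEGIN"));
    /* truncating inside the load transaction is what permits skipping WAL */
    PQclear(PQexec(conn, "TRUNCATE TABLE my_table"));

    res = PQexec(conn, "COPY my_table FROM STDIN");
    if (PQresultStatus(res) == PGRES_COPY_IN)
    {
        PQputCopyData(conn, "1\tone\n", 6);
        PQputCopyEnd(conn, NULL);       /* NULL: end the load cleanly */
        PQclear(PQgetResult(conn));     /* collect the COPY's result */
    }
    PQclear(res);

    PQclear(PQexec(conn, "COMMIT"));
    PQfinish(conn);
    return 0;
}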

> Do we know why we experience "tuple concurrently updated" errors if we
> spawn threads too fast?
>   

No. That's an open item.
> I completed some test restores using the pg_restore from head with the
> patch applied.  The dump was a custom dump created with pg 8.2 and
> restored to an 8.2 database.  To confirm this would work, I completed a
> restore using the standard single-threaded mode.  The schema restored
> successfully.  The only errors reported involved non-existent roles.
>
> When I attempt to restore using parallel restore I get out of memory
> errors reported from _PrintData.   The code returning the error is;
>
> _PrintData(...
>     while (blkLen != 0)
>     {
>         if (blkLen + 1 > ctx->inSize)
>         {
>             free(ctx->zlibIn);
>             ctx->zlibIn = NULL;
>             ctx->zlibIn = (char *) malloc(blkLen + 1);
>             if (!ctx->zlibIn)
>                 die_horribly(AH, modulename, " out of memory\n");
>
>             ctx->inSize = blkLen + 1;
>             in = ctx->zlibIn;
>         }
>
>
> It appears from my debugging and looking at the code that in _PrintData;
>     lclContext *ctx = (lclContext *) AH->formatData;
>
> the memory context is shared across all threads.  Which means that it's
> possible the memory contexts are stomping on each other.  My GDB skills
> are not up to being able to reproduce this in a gdb session as there are
> forks going on all over the place.  And if you process them in a serial
> fashion, there aren't any errors.  I'm not sure of the fix for this. 
> But in a parallel environment it doesn't seem possible to store the
> memory context in the AH.
>   


There are no threads, hence nothing is shared. fork() creates a new
process, not a new thread, and all they share are file descriptors.
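
For concreteness, a small program showing what that sharing means: the
heap is copy-on-write private after fork(), but an inherited descriptor
refers to the same open file description, including its read position
("somefile" is a placeholder):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>

int main(void)
{
    char    buf[4];
    int     fd = open("somefile", O_RDONLY);

    if (fd < 0)
        exit(1);

    if (fork() == 0)
    {
        read(fd, buf, sizeof(buf));     /* child reads 4 bytes */
        _exit(0);
    }
    wait(NULL);

    /* the child's read moved the shared offset: this prints 4, not 0 */
    printf("parent offset: %ld\n", (long) lseek(fd, 0, SEEK_CUR));
    return 0;
}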


> I also receive messages saying "pg_restore: [custom archiver] could not
> read from input file: end of file".  I have not investigated these
> further as my current guess is they are linked to the out of memory error.
>
> Given I ran into this error at my first testing attempt  I haven't
> evaluated much else at this point in time.  Now all this could be
> because I'm using the 8.2 archive, but it works fine in single restore
> mode.  The dump file is about 400M compressed and an entire archive
> schema was removed from the restore path with a custom restore list.
>
> Command line used;  PGPORT=5432 ./pg_restore -h /var/run/postgresql -m4
> --truncate-before-load -v -d tt2 -L tt.list
> /home/mr-russ/pg-index-test/timetable.pgdump 2> log.txt
>
> I've attached the log.txt file so you can review the errors that I saw. 
> I have adjusted the "out of memory" error to include a number to work
> out which one was being triggered.  So you'll see "5 out of memory" in
> the log file, which corresponds to the code above.
>   

However, there does seem to be something odd happening with the 
compression lib, which I will investigate. Thanks for the report.

cheers

andrew



Re: parallel pg_restore - WIP patch

From:
Andrew Dunstan
Date:

Stefan Kaltenbrunner wrote:
> Andrew Dunstan wrote:
>>
>> Attached is my WIP patch for parallel pg_restore. It's still very 
>> rough, but seems to work.
>>
>> Anyone who can test this with high-end equipment would be helping some.
>
> tried playing with this (on a 22Gb compressed dump using 4 connections)
> but it does not seem to work at all for me:
>
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] could not uncompress data: invalid block 
> type
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] could not uncompress data: invalid 
> stored block lengths
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] could not uncompress data: invalid 
> distance too far back
> pg_restore: [custom archiver] could not uncompress data: invalid 
> distances set
> pg_restore: [custom archiver] could not uncompress data: invalid code 
> lengths set
> pg_restore: [custom archiver] could not uncompress data: incorrect 
> data check
> pg_restore: [custom archiver] could not uncompress data: invalid code 
> lengths set
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] out of memory
> pg_restore: [custom archiver] could not uncompress data: invalid 
> literal/length code
> pg_restore: [custom archiver] could not uncompress data: invalid 
> literal/length code
> pg_restore: [custom archiver] could not uncompress data: invalid block 
> type
>
> each pg_restore process seems to eat a few gigabytes of memory in a few
> seconds.


Ouch. Ok, Thanks for the report. I will investigate.

cheers

andrew



Re: parallel pg_restore - WIP patch

From:
Russell Smith
Date:
Andrew Dunstan wrote:
>> Do we know why we experience "tuple concurrently updated" errors if we
>> spawn threads too fast?
>>   
>
> No. That's an open item.

Okay, I'll see if I can have a little more of a look into it.  No
promises, as the restore isn't playing nicely.
>
>>
>> the memory context is shared across all threads.  Which means that it's
>> possible the memory contexts are stomping on each other.  My GDB skills
>> are not up to being able to reproduce this in a gdb session as there are
>> forks going on all over the place.  And if you process them in a serial
>> fashion, there aren't any errors.  I'm not sure of the fix for this.
>> But in a parallel environment it doesn't seem possible to store the
>> memory context in the AH.
>>   
>
>
> There are no threads, hence nothing is shared. fork() create s new
> process, not a new thread, and all they share are file descriptors.
>
> However, there does seem to be something odd happening with the
> compression lib, which I will investigate. Thanks for the report.

I'm sorry, I meant processes there.  I'm aware there are no threads. 
But my feeling was that when you forked with open files you got all of
the open file properties, including positions, and as you dupped the
descriptor, you share all that it's pointing to with every other copy of
the descriptor.  My brief research on that shows that in 2005 there was
a kernel mailing list discussion on this issue. 
http://mail.nl.linux.org/kernelnewbies/2005-09/msg00479.html was quite
informative for me.  I again could be wrong, but it's worth a read.  If it is
true, then the file needs to be reopened by each child; it can't use the
duplicated descriptor.  I haven't had a chance to implementation-test it
as it's late here.  But I'd take a stab that it will solve the
compression library problems.
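
If that's right, a sketch of the fix would be for each child to reopen
the archive by pathname and seek to the position it needs, giving it a
private file description and offset (the helper and its arguments here
are hypothetical):

#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>

/* called in the child instead of reusing the inherited descriptor */
static int
reopen_archive(const char *filespec, off_t data_offset)
{
    int     fd = open(filespec, O_RDONLY);

    if (fd >= 0)
        lseek(fd, data_offset, SEEK_SET);   /* private offset from here on */
    return fd;
}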

I hope this helps, not hinders

Russell.


Re: parallel pg_restore - WIP patch

From:
Andrew Dunstan
Date:

Russell Smith wrote:
> I'm sorry, I meant processes there.  I'm aware there are no threads. 
> But my feeling was that when you forked with open files you got all of
> the open file properties, including positions, and as you dupped the
> descriptor, you share all that it's pointing to with every other copy of
> the descriptor.  My brief research on that shows that in 2005 there was
> a kernel mailing list discussion on this issue. 
> http://mail.nl.linux.org/kernelnewbies/2005-09/msg00479.html was quite
> informative for me.  I again could be wrong but worth a read.  If it is
> true, then the file needs to be reopened by each child, it can't use the
> duplicated descriptor.  I haven't had a chance to implementation-test it
> as it's late here.  But I'd take a stab that it will solve the
> compression library problems.
>
> I hope this helps, not hinders
>
>
>   

I'm sure that's the problem. Should be fairly easily fixable, I believe.

Thanks for the info.

cheers

andrew


Re: parallel pg_restore - WIP patch

From:
Andrew Dunstan
Date:

This version of the patch should fix the "shared file descriptor" bug
Russell Smith noticed. It also disables the 1/2 second sleep between
forks, so the performance on a small db (regression) is vastly improved.
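
The fix has each forked child call the format's new Reopen hook (the
(AH->ReopenPtr)(AH) call in prestore() below) before connecting, so it
reads from a private file handle. Roughly, assuming the custom format
keeps a stdio stream and the archive's pathname in its private context
(the field names here are illustrative, not from the actual patch):

#include <stdio.h>

typedef struct
{
    FILE   *fp;             /* archive file stream */
    char   *filename;       /* pathname the archive was opened with */
} lclContextSketch;

static void
ReopenArchiveFile(lclContextSketch *ctx)
{
    long    pos = ftell(ctx->fp);       /* position this child is at */

    fclose(ctx->fp);                    /* drop the shared stream */
    ctx->fp = fopen(ctx->filename, "rb");
    if (ctx->fp != NULL)
        fseek(ctx->fp, pos, SEEK_SET);  /* resume at a private offset */
}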

cheers

andrew



Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h    13 Apr 2008 03:49:21 -0000    1.47
--- pg_backup.h    26 Sep 2008 15:15:38 -0000
***************
*** 123,128 ****
--- 123,130 ----
      int            suppressDumpWarnings;    /* Suppress output of WARNING entries
                                           * to stderr */
      bool        single_txn;
+     int         number_of_threads;
+     bool        truncate_before_load;

      bool       *idWanted;        /* array showing which dump IDs to emit */
  } RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
  extern void CloseArchive(Archive *AH);

  extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);

  /* Open an existing archive */
  extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c    5 Sep 2008 23:53:42 -0000    1.158
--- pg_backup_archiver.c    26 Sep 2008 15:15:39 -0000
***************
*** 27,38 ****
--- 27,50 ----

  #include <unistd.h>

+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
  #ifdef WIN32
  #include <io.h>
  #endif

  #include "libpq/libpq-fs.h"

+ typedef struct _parallel_slot
+ {
+     pid_t   pid;
+     TocEntry *te;
+     DumpId  dumpId;
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)

  const char *progname;

***************
*** 70,76 ****
--- 82,99 ----
  static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
  static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
  static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+

+ static ArchiveHandle *GAH;

  /*
   *    Wrapper functions.
***************
*** 125,137 ****

  /* Public */
  void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;
-     bool        defnDumped;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
--- 148,529 ----

  /* Public */
  void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+     ArchiveHandle *AH = (ArchiveHandle *) AHX;
+     ParallelSlot  *slots;
+     int next_slot;
+     TocEntry *next_work_item = NULL;
+     int work_status;
+     pid_t ret_child;
+     int n_slots = ropt->number_of_threads;
+     TocEntry *te;
+     teReqs    reqs;
+
+
+     /*     AH->debugLevel = 99; */
+     /* some routines that use ahlog() don't get passed AH */
+     GAH = AH;
+
+     ahlog(AH,1,"entering RestoreArchiveParallel\n");
+
+
+     slots = (ParallelSlot *) calloc(n_slots, sizeof(ParallelSlot));
+     AH->ropt = ropt;
+
+     if (ropt->create)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --create\n");
+
+     if (ropt->dropSchema)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --clean\n");
+
+     if (!ropt->useDB)
+         die_horribly(AH,modulename,
+                      "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename,
+                              "cannot restore from compressed archive (compression not supported in this
installation)\n");
+         }
+     }
+ #endif
+
+     ahlog(AH, 1, "connecting to database for restore\n");
+     if (AH->version < K_VERS_1_3)
+         die_horribly(AH, modulename,
+                      "direct database connections are not supported in pre-1.3 archives\n");
+
+     /* XXX Should get this from the archive */
+     AHX->minRemoteVersion = 070100;
+     AHX->maxRemoteVersion = 999999;
+
+     /* correct dependency counts in case we're doing a partial restore */
+     if (ropt->idWanted == NULL)
+         InitDummyWantedList(AHX,ropt);
+     _fix_dependency_counts(AH);
+
+     /*
+      * Since we're talking to the DB directly, don't send comments since they
+      * obscure SQL when displaying errors
+      */
+     AH->noTocComments = 1;
+
+     /* Do all the early stuff in a single connection in the parent.
+      * There's no great point in running it in parallel and it will actually
+      * run faster in a single connection because we avoid all the connection
+      * and setup overhead, including the 0.5s sleep below.
+      */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     while((next_work_item = get_next_work_item(AH)) != NULL)
+     {
+         /* XXX need to improve this test in case there is no table data */
+         /* need to test for indexes, FKs, PK, Unique, etc */
+         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+             break;
+         (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+         next_work_item->prestored = true;
+
+         _reduce_dependencies(AH,next_work_item);
+     }
+
+
+     /*
+      * now close parent connection in prep for parallel step.
+      */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+     /* main parent loop */
+
+     ahlog(AH,1,"entering main loop\n");
+
+     while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+            (work_is_being_done(slots,n_slots)))
+     {
+         if (next_work_item != NULL &&
+             ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+         {
+             /* there is work still to do and a worker slot available */
+
+             pid_t child;
+
+             next_work_item->prestored = true;
+
+             child = fork();
+             if (child == 0)
+             {
+                 prestore(AH,next_work_item);
+                 /* should not happen ... we expect prestore to exit */
+                 exit(1);
+             }
+             else if (child > 0)
+             {
+                 slots[next_slot].pid = child;
+                 slots[next_slot].te = next_work_item;
+                 slots[next_slot].dumpId = next_work_item->dumpId;
+             }
+             else
+             {
+                 /* XXX fork error - handle it! */
+             }
+             /* delay just long enough between forks to give the catalog some
+              * breathing space. Without this sleep I got
+              * "tuple concurrently updated" errors.
+              */
+             /* pg_usleep(500000); */
+             continue; /* in case the slots are not yet full */
+         }
+         /* if we get here there must be work being done */
+         ret_child = wait(&work_status);
+
+         if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+         {
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+         {
+             int i;
+
+             for (i = 0; i < n_slots; i++)
+             {
+                 if (slots[i].pid == ret_child)
+                 {
+                     _inhibit_data_for_failed_table(AH, slots[i].te);
+                     break;
+                 }
+             }
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else
+         {
+             /* XXX something went wrong - deal with it */
+         }
+     }
+
+     /*
+      * now process the ACLs - no need to do this in parallel
+      */
+
+     /* reconnect from parent */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Scan TOC to output ownership commands and ACLs
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         AH->currentTE = te;
+
+         /* Work out what, if anything, we want from this entry */
+         reqs = _tocEntryRequired(te, ropt, true);
+
+         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+         {
+             ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+                   te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, true);
+         }
+     }
+
+     /* clean up */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+ }
+
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+     ahlog(GAH,1,"is work being done?\n");
+     while(n_slots--)
+     {
+         if (slot->pid > 0)
+             return true;
+         slot++;
+     }
+     ahlog(GAH,1,"work is not being done\n");
+     return false;
+ }
+
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == 0)
+         {
+             ahlog(GAH,1,"available slots is %d\n",i);
+             return i;
+         }
+     }
+     ahlog(GAH,1,"No slot available\n");
+     return NO_SLOT;
+ }
+
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+     TocEntry *te;
+     teReqs    reqs;
+
+     /* just search from the top of the queue until we find an available item.
+      * Note that the queue isn't reordered in the current implementation. If
+      * we ever do reorder it, then certain code that processes entries from the
+      * current item to the end of the queue will probably need to be
+      * re-examined.
+      */
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         if (!te->prestored && te->nDeps < 1)
+         {
+             /* make sure it's not an ACL */
+             reqs = _tocEntryRequired (te, AH->ropt, false);
+             if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+             {
+                 ahlog(AH,1,"next item is %d\n",te->dumpId);
+                 return te;
+             }
+         }
+     }
+     ahlog(AH,1,"No item ready\n");
+     return NULL;
+ }
+
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+     RestoreOptions *ropt = AH->ropt;
+     int retval;
+
+     /* close and reopen the archive so we have a private copy that doesn't
+      * stomp on anyone else's file pointer
+      */
+
+     (AH->ReopenPtr)(AH);
+
+     ConnectDatabase((Archive *)AH, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     retval = _restore_one_te(AH, te, ropt, true);
+
+     PQfinish(AH->connection);
+     exit(retval);
+
+ }
+
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+                ParallelSlot *slots, int n_slots)
+ {
+
+     TocEntry *te = NULL;
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == worker)
+         {
+             te = slots[i].te;
+             slots[i].pid = 0;
+             slots[i].te = NULL;
+             slots[i].dumpId = 0;
+             break;
+         }
+     }
+
+     /* Assert (te != NULL); */
+
+     _reduce_dependencies(AH,te);
+
+
+ }
+
+
+ /*
+  * Make sure the head of each dependency chain is a live item
+  *
+  * Once this is established the property will be maintained by
+  * _reduce_dependencies called as items are done.
+  */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+     TocEntry * te;
+     RestoreOptions * ropt = AH->ropt;
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+ }
+
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+     DumpId item = te->dumpId;
+     RestoreOptions * ropt = AH->ropt;
+     int i;
+
+     for (te = te->next; te != AH->toc; te = te->next)
+     {
+         for (i = 0; i < te->nDeps; i++)
+             if (te->dependencies[i] == item)
+             {
+                 te->nDeps = te->nDeps - 1;
+                 /*
+                  * If this item won't in fact be done, and is now at
+                  * 0 dependency count, we pretend it's been done and
+                  * reduce the dependency counts of all the things that
+                  * depend on it, by a recursive call
+                  */
+                 if (te->nDeps == 0 && ! ropt->idWanted[te->dumpId -1])
+                     _reduce_dependencies(AH,te);
+
+                 break;
+             }
+     }
+
+ }
+
+
+ /* Public */
+ void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 563,582 ----
          AH->noTocComments = 1;
      }

+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in
thisinstallation)\n"); 
+         }
+     }
+ #endif
+
      /*
       * Work out if we have an implied data-only restore. This can happen if
       * the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         AH->currentTE = te;
!
!         /* Work out what, if anything, we want from this entry */
!         reqs = _tocEntryRequired(te, ropt, false);
!
!         /* Dump any relevant dump warnings to stderr */
!         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
!         {
!             if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->defn);
!             else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
!         }
!
!         defnDumped = false;
!
!         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
!         {
!             ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
!             _printTocEntry(AH, te, ropt, false, false);
!             defnDumped = true;
!
!             /*
!              * If we could not create a table and --no-data-for-failed-tables
!              * was given, ignore the corresponding TABLE DATA
!              */
!             if (ropt->noDataForFailedTables &&
!                 AH->lastErrorTE == te &&
!                 strcmp(te->desc, "TABLE") == 0)
!             {
!                 TocEntry   *tes;
!
!                 ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
!                       te->tag);
!
!                 for (tes = te->next; tes != AH->toc; tes = tes->next)
!                 {
!                     if (strcmp(tes->desc, "TABLE DATA") == 0 &&
!                         strcmp(tes->tag, te->tag) == 0 &&
!                         strcmp(tes->namespace ? tes->namespace : "",
!                                te->namespace ? te->namespace : "") == 0)
!                     {
!                         /* mark it unwanted */
!                         ropt->idWanted[tes->dumpId - 1] = false;
!                         break;
!                     }
!                 }
!             }
!
!             /* If we created a DB, connect to it... */
!             if (strcmp(te->desc, "DATABASE") == 0)
!             {
!                 ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
!                 _reconnectToDB(AH, te->tag);
!             }
!         }
!
!         /*
!          * If we have a data component, then process it
!          */
!         if ((reqs & REQ_DATA) != 0)
!         {
!             /*
!              * hadDumper will be set if there is genuine data component for
!              * this node. Otherwise, we need to check the defn field for
!              * statements that need to be executed in data-only restores.
!              */
!             if (te->hadDumper)
!             {
!                 /*
!                  * If we can output the data, then restore it.
!                  */
!                 if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
!                 {
! #ifndef HAVE_LIBZ
!                     if (AH->compression != 0)
!                         die_horribly(AH, modulename, "cannot restore from compressed archive (compression not
supportedin this installation)\n"); 
! #endif
!
!                     _printTocEntry(AH, te, ropt, true, false);
!
!                     if (strcmp(te->desc, "BLOBS") == 0 ||
!                         strcmp(te->desc, "BLOB COMMENTS") == 0)
!                     {
!                         ahlog(AH, 1, "restoring %s\n", te->desc);
!
!                         _selectOutputSchema(AH, "pg_catalog");
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!                     }
!                     else
!                     {
!                         _disableTriggersIfNecessary(AH, te, ropt);
!
!                         /* Select owner and schema as necessary */
!                         _becomeOwner(AH, te);
!                         _selectOutputSchema(AH, te->namespace);
!
!                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
!                               te->tag);
!
!                         /*
!                          * If we have a copy statement, use it. As of V1.3,
!                          * these are separate to allow easy import from
!                          * within a database connection. Pre 1.3 archives can
!                          * not use DB connections and are sent to output only.
!                          *
!                          * For V1.3+, the table data MUST have a copy
!                          * statement so that we can go into appropriate mode
!                          * with libpq.
!                          */
!                         if (te->copyStmt && strlen(te->copyStmt) > 0)
!                         {
!                             ahprintf(AH, "%s", te->copyStmt);
!                             AH->writingCopyData = true;
!                         }
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!
!                         AH->writingCopyData = false;
!
!                         _enableTriggersIfNecessary(AH, te, ropt);
!                     }
!                 }
!             }
!             else if (!defnDumped)
!             {
!                 /* If we haven't already dumped the defn part, do so now */
!                 ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!                 _printTocEntry(AH, te, ropt, false, false);
!             }
!         }
!     }                            /* end loop over TOC entries */

      /*
       * Scan TOC again to output ownership commands and ACLs
--- 676,683 ----
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         (void) _restore_one_te(AH, te, ropt, false);
!     }

      /*
       * Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 725,905 ----
      }
  }

+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+                 RestoreOptions *ropt, bool is_parallel)
+ {
+     teReqs        reqs;
+     bool        defnDumped;
+     int         retval = 0;
+
+     AH->currentTE = te;
+
+     /* Work out what, if anything, we want from this entry */
+     reqs = _tocEntryRequired(te, ropt, false);
+
+     /* Dump any relevant dump warnings to stderr */
+     if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+     {
+         if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+         else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+     }
+
+     defnDumped = false;
+
+     if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+     {
+         ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+         _printTocEntry(AH, te, ropt, false, false);
+         defnDumped = true;
+
+         /*
+          * If we could not create a table and --no-data-for-failed-tables
+          * was given, ignore the corresponding TABLE DATA
+          *
+          * For the parallel case this must be done in the parent, so we just
+          * set a return value.
+          */
+         if (ropt->noDataForFailedTables &&
+             AH->lastErrorTE == te &&
+             strcmp(te->desc, "TABLE") == 0)
+         {
+             if (is_parallel)
+                 retval = 1;
+             else
+                 _inhibit_data_for_failed_table(AH,te);
+         }
+
+         /* If we created a DB, connect to it... */
+         /* won't happen in parallel restore */
+         if (strcmp(te->desc, "DATABASE") == 0)
+         {
+             ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+             _reconnectToDB(AH, te->tag);
+         }
+     }
+
+     /*
+      * If we have a data component, then process it
+      */
+     if ((reqs & REQ_DATA) != 0)
+     {
+         /*
+          * hadDumper will be set if there is genuine data component for
+          * this node. Otherwise, we need to check the defn field for
+          * statements that need to be executed in data-only restores.
+          */
+         if (te->hadDumper)
+         {
+             /*
+              * If we can output the data, then restore it.
+              */
+             if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
+             {
+                 _printTocEntry(AH, te, ropt, true, false);
+
+                 if (strcmp(te->desc, "BLOBS") == 0 ||
+                     strcmp(te->desc, "BLOB COMMENTS") == 0)
+                 {
+                     ahlog(AH, 1, "restoring %s\n", te->desc);
+
+                     _selectOutputSchema(AH, "pg_catalog");
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+                 }
+                 else
+                 {
+                     _disableTriggersIfNecessary(AH, te, ropt);
+
+                     /* Select owner and schema as necessary */
+                     _becomeOwner(AH, te);
+                     _selectOutputSchema(AH, te->namespace);
+
+                     ahlog(AH, 1, "restoring data for table \"%s\"\n",
+                           te->tag);
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             StartTransaction(AH);
+                         else
+                             ahprintf(AH, "BEGIN;\n\n");
+
+                         ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+                                  fmtId(te->tag));
+                     }
+
+                     /*
+                      * If we have a copy statement, use it. As of V1.3,
+                      * these are separate to allow easy import from
+                      * within a database connection. Pre 1.3 archives can
+                      * not use DB connections and are sent to output only.
+                      *
+                      * For V1.3+, the table data MUST have a copy
+                      * statement so that we can go into appropriate mode
+                      * with libpq.
+                      */
+                     if (te->copyStmt && strlen(te->copyStmt) > 0)
+                     {
+                         ahprintf(AH, "%s", te->copyStmt);
+                         AH->writingCopyData = true;
+                     }
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+                     AH->writingCopyData = false;
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             CommitTransaction(AH);
+                         else
+                             ahprintf(AH, "COMMIT;\n\n");
+                     }
+
+
+                     _enableTriggersIfNecessary(AH, te, ropt);
+                 }
+             }
+         }
+         else if (!defnDumped)
+         {
+             /* If we haven't already dumped the defn part, do so now */
+             ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, false);
+         }
+     }
+
+     return retval;
+ }
+
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+     TocEntry   *tes;
+     RestoreOptions *ropt = AH->ropt;
+
+     ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+           te->tag);
+
+     for (tes = te->next; tes != AH->toc; tes = tes->next)
+     {
+         if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+             strcmp(tes->tag, te->tag) == 0 &&
+             strcmp(tes->namespace ? tes->namespace : "",
+                    te->namespace ? te->namespace : "") == 0)
+         {
+             /* mark it unwanted */
+             ropt->idWanted[tes->dumpId - 1] = false;
+
+             _reduce_dependencies(AH, tes);
+             break;
+         }
+     }
+ }
+
  /*
   * Allocate a new RestoreOptions block.
   * This is mainly so we can initialize it, but also for future expansion,
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h    7 Nov 2007 12:24:24 -0000    1.76
--- pg_backup_archiver.h    26 Sep 2008 15:15:39 -0000
***************
*** 99,104 ****
--- 99,105 ----
  struct _restoreList;

  typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
  typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);

  typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
      WriteBufPtr WriteBufPtr;    /* Write a buffer of output to the archive */
      ReadBufPtr ReadBufPtr;        /* Read a buffer of input from the archive */
      ClosePtr ClosePtr;            /* Close the archive */
+     ReopenPtr ReopenPtr;            /* Reopen the archive */
      WriteExtraTocPtr WriteExtraTocPtr;    /* Write extra TOC entry data
                                           * associated with the current archive
                                           * format */
***************
*** 231,236 ****
--- 233,239 ----
      char       *archdbname;        /* DB name *read* from archive */
      bool        requirePassword;
      PGconn       *connection;
+     char       *cachepw;
      int            connectToDB;    /* Flag to indicate if direct DB connection is
                                   * required */
      bool        writingCopyData;    /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
      DumpId        dumpId;
      bool        hadDumper;        /* Archiver was passed a dumper routine (used
                                   * in restore) */
+     bool        prestored;      /* keep track of parallel restore */
      char       *tag;            /* index tag */
      char       *namespace;        /* null or empty string if not in a schema */
      char       *tablespace;        /* null if not in a tablespace; empty string
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c    28 Oct 2007 21:55:52 -0000    1.40
--- pg_backup_custom.c    26 Sep 2008 15:15:39 -0000
***************
*** 40,45 ****
--- 40,46 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
  static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = _ReopenArchive;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
      AH->FH = NULL;
  }

+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+     lclContext *ctx = (lclContext *) AH->formatData;
+     pgoff_t        tpos;
+
+     if (AH->mode == archModeWrite)
+     {
+         die_horribly(AH, modulename, "can only reopen input archives\n");
+     }
+     else if ((!AH->fSpec) || strcmp(AH->fSpec, "") == 0)
+     {
+         die_horribly(AH, modulename, "cannot reopen stdin\n");
+     }
+
+     tpos = ftello(AH->FH);
+
+     if (fclose(AH->FH) != 0)
+         die_horribly(AH, modulename, "could not close archive file: %s\n",
+                      strerror(errno));
+
+     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+     if (!AH->FH)
+         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+                      AH->fSpec, strerror(errno));
+
+     if (ctx->hasSeek)
+     {
+         fseeko(AH->FH, tpos, SEEK_SET);
+     }
+     else
+     {
+         die_horribly(AH,modulename,"cannot reopen non-seekable file");
+     }
+
+ }
+
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c    16 Aug 2008 02:25:06 -0000    1.80
--- pg_backup_db.c    26 Sep 2008 15:15:39 -0000
***************
*** 206,220 ****
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
      else
          AH->requirePassword = false;

      /*
       * Start the connection.  Loop until we have a password if requested by
--- 206,226 ----
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
+     else if (reqPwd)
+     {
+         password = AH->cachepw;
+     }
      else
+     {
          AH->requirePassword = false;
+     }

      /*
       * Start the connection.  Loop until we have a password if requested by
***************
*** 241,247 ****
      } while (new_pass);

      if (password)
!         free(password);

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 247,253 ----
      } while (new_pass);

      if (password)
!         AH->cachepw = password;

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c    28 Oct 2007 21:55:52 -0000    1.34
--- pg_backup_files.c    26 Sep 2008 15:15:39 -0000
***************
*** 87,92 ****
--- 87,93 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c    15 Nov 2007 21:14:41 -0000    1.62
--- pg_backup_tar.c    26 Sep 2008 15:15:39 -0000
***************
*** 143,148 ****
--- 143,149 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c    13 Apr 2008 03:49:22 -0000    1.88
--- pg_restore.c    26 Sep 2008 15:15:39 -0000
***************
*** 78,83 ****
--- 78,84 ----
      static int    no_data_for_failed_tables = 0;
      static int  outputNoTablespaces = 0;
      static int    use_setsessauth = 0;
+     static int  truncate_before_load = 0;

      struct option cmdopts[] = {
          {"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
          {"ignore-version", 0, NULL, 'i'},
          {"index", 1, NULL, 'I'},
          {"list", 0, NULL, 'l'},
+         {"multi-thread",1,NULL,'m'},
          {"no-privileges", 0, NULL, 'x'},
          {"no-acl", 0, NULL, 'x'},
          {"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
          {"disable-triggers", no_argument, &disable_triggers, 1},
          {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
          {"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+         {"truncate-before-load", no_argument, &truncate_before_load, 1},
          {"use-set-session-authorization", no_argument, &use_setsessauth, 1},

          {NULL, 0, NULL, 0}
***************
*** 139,145 ****
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
--- 142,148 ----
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
***************
*** 182,187 ****
--- 185,194 ----
                  opts->tocFile = strdup(optarg);
                  break;

+             case 'm':
+                 opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+                 break;
+
              case 'n':            /* Dump data for this schema only */
                  opts->schemaNames = strdup(optarg);
                  break;
***************
*** 262,268 ****
                  break;

              case 0:
!                 /* This covers the long options equivalent to -X xxx. */
                  break;

              case '1':            /* Restore data in a single transaction */
--- 269,278 ----
                  break;

              case 0:
!                 /*
!                  * This covers the long options without a short equivalent,
!                  * including those equivalent to -X xxx.
!                  */
                  break;

              case '1':            /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
      opts->noDataForFailedTables = no_data_for_failed_tables;
      opts->noTablespace = outputNoTablespaces;
      opts->use_setsessauth = use_setsessauth;
+     opts->truncate_before_load = truncate_before_load;
+
+     if (opts->single_txn)
+     {
+         if (opts->number_of_threads > 1)
+         {
+             write_msg(NULL, "single transaction not compatible with multi-threading");
+             exit(1);
+         }
+         else if (opts->truncate_before_load)
+         {
+             write_msg(NULL, "single transaction not compatible with truncate-before-load");
+             exit(1);
+         }
+     }

      if (opts->formatName)
      {
***************
*** 330,335 ****
--- 355,362 ----

      AH = OpenArchive(inputFileSpec, opts->format);

+     /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
      /* Let the archiver know how noisy to be */
      AH->verbose = opts->verbose;

***************
*** 351,356 ****
--- 378,385 ----

      if (opts->tocSummary)
          PrintTOCSummary(AH, opts);
+     else if (opts->number_of_threads > 1)
+         RestoreArchiveParallel(AH, opts);
      else
          RestoreArchive(AH, opts);
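
For testing: with this applied, an invocation along these lines should
exercise the parallel code path (it needs a direct -d connection and a
custom-format archive, since only that format can reopen its input; the
database and file names here are illustrative):

    pg_restore -d target_db -m 4 mydb.dump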


Re: parallel pg_restore - WIP patch

From
Stefan Kaltenbrunner
Date:
Andrew Dunstan wrote:
> 
> 
> This version of the patch should fix the "shared file descriptor" bug 
> Russell Smith noticed. It also disables the 1/2 second sleep between 
> forks, so the performance on a small db (regression) is vastly improved.

This works better, but there is still something fishy - using the same
dump file I get a proper restore running pg_restore normally. However, if
I use -m for a parallel restore I only get part of the database restored
(in this case only 243 of the 709 tables) ...


Stefan


Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Stefan Kaltenbrunner wrote:
> Andrew Dunstan wrote:
>>
>>
>> This version of the patch should fix the "shared file descriptor" bug 
>> Russell Smith noticed. It also disables the 1/2 second sleep between 
>> forks, so the performance on a small db (regression) is vastly improved.
>
> This works better, but there is still something fishy - using the same
> dump file I get a proper restore running pg_restore normally. However,
> if I use -m for a parallel restore I only get part of the database
> restored (in this case only 243 of the 709 tables) ...
>
>
>

Yes, there are several funny things going on, including some stuff with 
dependencies. I'll have a new patch tomorrow with luck. Thanks for testing.

cheers

andrew


Re: parallel pg_restore - WIP patch

From
Joshua Drake
Date:
On Fri, 26 Sep 2008 17:10:44 -0400
Andrew Dunstan <andrew@dunslane.net> wrote:

> Yes, there are several funny things going on, including some stuff
> with dependencies. I'll have a new patch tomorrow with luck. Thanks
> for testing.

O.k. I took a look at the patch itself, and although I don't understand
all of it, a couple of things raised red flags for me:

+ if (ropt->create)
+         die_horribly(AH,modulename,
+                      "parallel restore is
incompatible with --create\n");
+ 

This seems like an odd limitation. In my mind, the schema would not be
restored in parallel: the schema before the data would be restored in a
single thread. Even the largest schemas would take only minutes (if
that), so something like --create should never be a problem.

I also noticed you check whether we have zlib. Is it even possible to use
the c format without it? (That would be new to me.)

I noticed this line:


+     while((next_work_item = get_next_work_item(AH)) != NULL)
+     {
+         /* XXX need to improve this test in case there is no table data */
+         /* need to test for indexes, FKs, PK, Unique, etc */
+         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+             break;
+         (void) _restore_one_te(AH, next_work_item, ropt, false);
+ 
+         next_work_item->prestored = true;
+ 
+         _reduce_dependencies(AH,next_work_item);
+     }


Instead of the TABLE DATA compare, perhaps it makes sense to back-patch
pg_dump to have a line delimiter in the TOC? That way even if there is
no TABLE DATA there would be a delimiter that says:

--- BEGIN TABLE DATA
--- END TABLE DATA

Thus if nothing is there... nothing is there?

+             /* delay just long enough between forks to give the catalog some
+              * breathing space. Without this sleep I got 
+              * "tuple concurrently updated" errors.
+              */
+             pg_usleep(500000);
+             continue; /* in case the slots are not yet full */
+         }

Could that be solved with a lock instead? Once the lock is released....
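
For instance, something along these lines might work - a rough, untested
sketch, assuming the "tuple concurrently updated" churn really does happen
during each child's catalog-touching setup; the advisory-lock key and the
helper name are made up:

/* hypothetical: serialize each child's setup behind an advisory lock
 * instead of sleeping a fixed interval between forks in the parent */
static void
serialized_child_setup(ArchiveHandle *AH)
{
    PGresult   *res;

    /* blocks until the previous child releases the lock */
    res = PQexec(AH->connection, "SELECT pg_advisory_lock(4242)");
    PQclear(res);

    _doSetFixedOutputState(AH);    /* the setup that provoked the errors */

    res = PQexec(AH->connection, "SELECT pg_advisory_unlock(4242)");
    PQclear(res);
}

That way each child proceeds as soon as the previous one finishes its
setup, rather than everyone waiting a fixed half second.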

Anyway... just some thoughts. I apologize if I misunderstood the patch.

Sincerely,

Joshua D. Drake



> 
> cheers
> 
> andrew
> 


-- 
The PostgreSQL Company since 1997: http://www.commandprompt.com/ 
PostgreSQL Community Conference: http://www.postgresqlconference.org/
United States PostgreSQL Association: http://www.postgresql.us/




Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Joshua Drake wrote:
> On Fri, 26 Sep 2008 17:10:44 -0400
> Andrew Dunstan <andrew@dunslane.net> wrote:
>
>   
>> Yes, there are several funny things going on, including some stuff
>> with dependencies. I'll have a new patch tomorrow with luck. Thanks
>> for testing.
>>     
>
> O.k. I took a look at the patch itself, and although I don't understand
> all of it, a couple of things raised red flags for me:
>
> + if (ropt->create)
> +         die_horribly(AH,modulename,
> +                      "parallel restore is
> incompatible with --create\n");
> + 
>   
> This seems like an odd limitation. In my mind, the schema would not be
> restored in parallel: the schema before the data would be restored in a
> single thread. Even the largest schemas would take only minutes (if
> that), so something like --create should never be a problem.
>   


Originally I had everything restoring in parallel. Now I am in fact (as 
the patch should have shown you) restoring the first part in a single 
thread like you say. Thus I probably can relax that restriction. I will 
look and see.

> I also noticed you check whether we have zlib. Is it even possible to use
> the c format without it? (That would be new to me.)
>
> I noticed this line:
>
>
> +     while((next_work_item = get_next_work_item(AH)) != NULL)
> +     {
> +         /* XXX need to improve this test in case there is no table data */
> +         /* need to test for indexes, FKs, PK, Unique, etc */
> +         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
> +             break;
> +         (void) _restore_one_te(AH, next_work_item, ropt, false);
> + 
> +         next_work_item->prestored = true;
> + 
> +         _reduce_dependencies(AH,next_work_item);
> +     }
>
>
> Instead of the TABLE DATA compare, perhaps it makes sense to back-patch
> pg_dump to have a line delimiter in the TOC? That way even if there is
> no TABLE DATA there would be a delimiter that says:
>
> --- BEGIN TABLE DATA
> --- END TABLE DATA
>
> Thus if nothing is there... nothing is there?
>   

The TOC isn't stored as a text file, so we'll need to look by entry
tags. It's no big deal - there aren't a huge number of them.
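
Something like this is the sort of check I have in mind - just a sketch,
and the tag list here is illustrative rather than complete:

/* hypothetical helper: decide by TOC entry tag whether we have reached
 * the data/post-data part of the archive; the list is illustrative */
static bool
is_post_schema_entry(TocEntry *te)
{
    static const char *tags[] = {
        "TABLE DATA", "BLOBS", "BLOB COMMENTS",
        "INDEX", "CONSTRAINT", "FK CONSTRAINT",
        "TRIGGER", "RULE", NULL
    };
    int         i;

    for (i = 0; tags[i] != NULL; i++)
        if (strcmp(te->desc, tags[i]) == 0)
            return true;
    return false;
}

The real list would need checking against everything pg_dump can emit
after the table data.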

> +             /* delay just long enough between forks to give the catalog some
> +              * breathing space. Without this sleep I got 
> +              * "tuple concurrently updated" errors.
> +              */
> +             pg_usleep(500000);
> +             continue; /* in case the slots are not yet full */
> +         }
>
> Could that be solved with a lock instead? Once the lock is released....
>   


That sleep is now gone.


> Anyway... just some thoughts. I apologize if I misunderstood the patch.
>
>
>   


No problem. Thanks for looking.

cheers

andrew


Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Andrew Dunstan wrote:
>
>
>>
>> This works better, but there is still something fishy - using the same
>> dump file I get a proper restore running pg_restore normally. However,
>> if I use -m for a parallel restore I only get part of the database
>> restored (in this case only 243 of the 709 tables) ...
>>
>>
>>
>
> Yes, there are several funny things going on, including some stuff
> with dependencies. I'll have a new patch tomorrow with luck. Thanks
> for testing.
>
>

OK, in this version a whole heap of bugs are fixed, mainly those to do
with dependencies and saved state. I get identical row counts in the
source and destination now, quite reliably.

cheers

andrew
Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h    13 Apr 2008 03:49:21 -0000    1.47
--- pg_backup.h    29 Sep 2008 02:43:51 -0000
***************
*** 123,128 ****
--- 123,130 ----
      int            suppressDumpWarnings;    /* Suppress output of WARNING entries
                                           * to stderr */
      bool        single_txn;
+     int         number_of_threads;
+     bool        truncate_before_load;

      bool       *idWanted;        /* array showing which dump IDs to emit */
  } RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
  extern void CloseArchive(Archive *AH);

  extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);

  /* Open an existing archive */
  extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c    5 Sep 2008 23:53:42 -0000    1.158
--- pg_backup_archiver.c    29 Sep 2008 02:43:52 -0000
***************
*** 27,38 ****
--- 27,50 ----

  #include <unistd.h>

+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
  #ifdef WIN32
  #include <io.h>
  #endif

  #include "libpq/libpq-fs.h"

+ typedef struct _parallel_slot
+ {
+     pid_t   pid;
+     TocEntry *te;
+     DumpId  dumpId;
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)

  const char *progname;

***************
*** 70,76 ****
--- 82,99 ----
  static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
  static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
  static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+

+ static ArchiveHandle *GAH;

  /*
   *    Wrapper functions.
***************
*** 125,137 ****

  /* Public */
  void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;
-     bool        defnDumped;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
--- 148,579 ----

  /* Public */
  void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+     ArchiveHandle *AH = (ArchiveHandle *) AHX;
+     ParallelSlot  *slots;
+     int next_slot;
+     TocEntry *next_work_item = NULL;
+     int work_status;
+     pid_t ret_child;
+     int n_slots = ropt->number_of_threads;
+     TocEntry *te;
+     teReqs    reqs;
+
+
+     /*     AH->debugLevel = 99; */
+     /* some routines that use ahlog() don't get passed AH */
+     GAH = AH;
+
+     ahlog(AH,1,"entering RestoreARchiveParallel\n");
+
+
+     slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots);
+     AH->ropt = ropt;
+
+ /*
+     if (ropt->create)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --create\n");
+ */
+
+
+     if (ropt->dropSchema)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --clean\n");
+
+     if (!ropt->useDB)
+         die_horribly(AH,modulename,
+                      "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename,
+                              "cannot restore from compressed archive (compression not supported in this
installation)\n");
+         }
+     }
+ #endif
+
+     ahlog(AH, 1, "connecting to database for restore\n");
+     if (AH->version < K_VERS_1_3)
+         die_horribly(AH, modulename,
+                      "direct database connections are not supported in pre-1.3 archives\n");
+
+     /* XXX Should get this from the archive */
+     AHX->minRemoteVersion = 070100;
+     AHX->maxRemoteVersion = 999999;
+
+     /* correct dependency counts in case we're doing a partial restore */
+     if (ropt->idWanted == NULL)
+         InitDummyWantedList(AHX,ropt);
+     _fix_dependency_counts(AH);
+
+     /*
+      * Since we're talking to the DB directly, don't send comments since they
+      * obscure SQL when displaying errors
+      */
+     AH->noTocComments = 1;
+
+     /* Do all the early stuff in a single connection in the parent.
+      * There's no great point in running it in parallel and it will actually
+      * run faster in a single connection because we avoid all the connection
+      * and setup overhead, including the 0.5s sleep below.
+      */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     while((next_work_item = get_next_work_item(AH)) != NULL)
+     {
+         /* XXX need to improve this test in case there is no table data */
+         /* need to test for indexes, FKs, PK, Unique, etc */
+         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+             break;
+         (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+         next_work_item->prestored = true;
+
+         _reduce_dependencies(AH,next_work_item);
+     }
+
+
+     /*
+      * now close parent connection in prep for parallel step.
+      */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+     /* blow away any preserved state from the previous connection */
+
+     if (AH->currSchema)
+         free(AH->currSchema);
+     AH->currSchema = strdup("");
+     if (AH->currUser)
+         free(AH->currUser);
+     AH->currUser = strdup("");
+     if (AH->currTablespace)
+         free(AH->currTablespace);
+     AH->currTablespace = NULL;
+     AH->currWithOids = -1;
+
+     /* main parent loop */
+
+     ahlog(AH,1,"entering main loop\n");
+
+     while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+            (work_is_being_done(slots,n_slots)))
+     {
+         if (next_work_item != NULL &&
+             ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+         {
+             /* there is work still to do and a worker slot available */
+
+             pid_t child;
+
+             next_work_item->prestored = true;
+
+             child = fork();
+             if (child == 0)
+             {
+                 prestore(AH,next_work_item);
+                 /* should not happen ... we expect prestore to exit */
+                 exit(1);
+             }
+             else if (child > 0)
+             {
+                 slots[next_slot].pid = child;
+                 slots[next_slot].te = next_work_item;
+                 slots[next_slot].dumpId = next_work_item->dumpId;
+             }
+             else
+             {
+                 /* XXX fork error - handle it! */
+             }
+             /* delay just long enough between forks to give the catalog some
+              * breathing space. Without this sleep I got
+              * "tuple concurrently updated" errors.
+              */
+             /* pg_usleep(500000); */
+             continue; /* in case the slots are not yet full */
+         }
+         /* if we get here there must be work being done */
+         ret_child = wait(&work_status);
+
+         if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+         {
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+         {
+             int i;
+
+             for (i = 0; i < n_slots; i++)
+             {
+                 if (slots[i].pid == ret_child)
+                 {
+                     _inhibit_data_for_failed_table(AH, slots[i].te);
+                     break;
+                 }
+             }
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else
+         {
+             /* XXX something went wrong - deal with it */
+         }
+     }
+
+     /*
+      * now process the ACLs - no need to do this in parallel
+      */
+
+     /* reconnect from parent */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Scan TOC to output ownership commands and ACLs
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         AH->currentTE = te;
+
+         /* Work out what, if anything, we want from this entry */
+         reqs = _tocEntryRequired(te, ropt, true);
+
+         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+         {
+             ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+                   te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, true);
+         }
+     }
+
+     /* clean up */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+ }
+
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+     ahlog(GAH,1,"is work being done?\n");
+     while(n_slots--)
+     {
+         if (slot->pid > 0)
+             return true;
+         slot++;
+     }
+     ahlog(GAH,1,"work is not being done\n");
+     return false;
+ }
+
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == 0)
+         {
+             ahlog(GAH,1,"available slots is %d\n",i);
+             return i;
+         }
+     }
+     ahlog(GAH,1,"No slot available\n");
+     return NO_SLOT;
+ }
+
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+     TocEntry *te;
+     teReqs    reqs;
+
+     /* just search from the top of the queue until we find an available item.
+      * Note that the queue isn't reordered in the current implementation. If
+      * we ever do reorder it, then certain code that processes entries from the
+      * current item to the end of the queue will probably need to be
+      * re-examined.
+      */
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         if (!te->prestored && te->depCount < 1)
+         {
+             /* make sure it's not an ACL */
+             reqs = _tocEntryRequired (te, AH->ropt, false);
+             if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+             {
+                 ahlog(AH,1,"next item is %d\n",te->dumpId);
+                 return te;
+             }
+         }
+     }
+     ahlog(AH,1,"No item ready\n");
+     return NULL;
+ }
+
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+     RestoreOptions *ropt = AH->ropt;
+     int retval;
+
+     /* close and reopen the archive so we have a private copy that doesn't
+      * stomp on anyone else's file pointer
+      */
+
+     (AH->ReopenPtr)(AH);
+
+     ConnectDatabase((Archive *)AH, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     retval = _restore_one_te(AH, te, ropt, true);
+
+     PQfinish(AH->connection);
+     exit(retval);
+
+ }
+
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+                ParallelSlot *slots, int n_slots)
+ {
+
+     TocEntry *te = NULL;
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == worker)
+         {
+             te = slots[i].te;
+             slots[i].pid = 0;
+             slots[i].te = NULL;
+             slots[i].dumpId = 0;
+             break;
+         }
+     }
+
+     /* Assert (te != NULL); */
+
+     _reduce_dependencies(AH,te);
+
+
+ }
+
+
+ /*
+  * Make sure the head of each dependency chain is a live item
+  *
+  * Once this is established the property will be maintained by
+  * _reduce_dependencies called as items are done.
+  */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+     TocEntry * te;
+     RestoreOptions * ropt = AH->ropt;
+     bool * RealDumpIds;
+     int i;
+
+
+     RealDumpIds = calloc(AH->maxDumpId, sizeof(bool));
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         RealDumpIds[te->dumpId-1] = true;
+         if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+     }
+
+     /*
+      * It is possible for the dependencies to list items that are
+      * not in the archive at all. Reduce the depCounts so those get
+      * ignored.
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         for (i = 0; i < te->nDeps; i++)
+             if (!RealDumpIds[te->dependencies[i]-1])
+                 te->depCount--;
+ }
+
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+     DumpId item = te->dumpId;
+     RestoreOptions * ropt = AH->ropt;
+     int i;
+
+     for (te = te->next; te != AH->toc; te = te->next)
+     {
+         if (te->nDeps == 0)
+             continue;
+
+         for (i = 0; i < te->nDeps; i++)
+             if (te->dependencies[i] == item)
+                 te->depCount = te->depCount - 1;
+
+         /* If this is a table data item we are making available,
+          * make the table's dependents depend on this item instead
+          * of the table definition, so they don't get scheduled until
+          * the data is loaded. Have to do this now, before the main
+          * loop gets to anything further down the list.
+          */
+         if (te->depCount == 0 && strcmp(te->desc,"TABLE DATA") == 0)
+         {
+             TocEntry *tes;
+             int j;
+             for (tes = te->next; tes != AH->toc; tes = tes->next)
+                 for (j = 0; j < tes->nDeps; j++)
+                     if (tes->dependencies[j] == item)
+                         tes->dependencies[j] = te->dumpId;
+         }
+
+         /*
+          * If this item won't in fact be done, and is now at
+          * 0 dependency count, we pretend it's been done and
+          * reduce the dependency counts of all the things that
+          * depend on it, by a recursive call
+          */
+         if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+     }
+
+ }
+
+
+ /* Public */
+ void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 613,632 ----
          AH->noTocComments = 1;
      }

+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in
thisinstallation)\n"); 
+         }
+     }
+ #endif
+
      /*
       * Work out if we have an implied data-only restore. This can happen if
       * the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         AH->currentTE = te;
!
!         /* Work out what, if anything, we want from this entry */
!         reqs = _tocEntryRequired(te, ropt, false);
!
!         /* Dump any relevant dump warnings to stderr */
!         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
!         {
!             if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->defn);
!             else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
!         }
!
!         defnDumped = false;
!
!         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
!         {
!             ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
!             _printTocEntry(AH, te, ropt, false, false);
!             defnDumped = true;
!
!             /*
!              * If we could not create a table and --no-data-for-failed-tables
!              * was given, ignore the corresponding TABLE DATA
!              */
!             if (ropt->noDataForFailedTables &&
!                 AH->lastErrorTE == te &&
!                 strcmp(te->desc, "TABLE") == 0)
!             {
!                 TocEntry   *tes;
!
!                 ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
!                       te->tag);
!
!                 for (tes = te->next; tes != AH->toc; tes = tes->next)
!                 {
!                     if (strcmp(tes->desc, "TABLE DATA") == 0 &&
!                         strcmp(tes->tag, te->tag) == 0 &&
!                         strcmp(tes->namespace ? tes->namespace : "",
!                                te->namespace ? te->namespace : "") == 0)
!                     {
!                         /* mark it unwanted */
!                         ropt->idWanted[tes->dumpId - 1] = false;
!                         break;
!                     }
!                 }
!             }
!
!             /* If we created a DB, connect to it... */
!             if (strcmp(te->desc, "DATABASE") == 0)
!             {
!                 ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
!                 _reconnectToDB(AH, te->tag);
!             }
!         }
!
!         /*
!          * If we have a data component, then process it
!          */
!         if ((reqs & REQ_DATA) != 0)
!         {
!             /*
!              * hadDumper will be set if there is genuine data component for
!              * this node. Otherwise, we need to check the defn field for
!              * statements that need to be executed in data-only restores.
!              */
!             if (te->hadDumper)
!             {
!                 /*
!                  * If we can output the data, then restore it.
!                  */
!                 if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
!                 {
! #ifndef HAVE_LIBZ
!                     if (AH->compression != 0)
!                         die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
!
!                     _printTocEntry(AH, te, ropt, true, false);
!
!                     if (strcmp(te->desc, "BLOBS") == 0 ||
!                         strcmp(te->desc, "BLOB COMMENTS") == 0)
!                     {
!                         ahlog(AH, 1, "restoring %s\n", te->desc);
!
!                         _selectOutputSchema(AH, "pg_catalog");
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!                     }
!                     else
!                     {
!                         _disableTriggersIfNecessary(AH, te, ropt);
!
!                         /* Select owner and schema as necessary */
!                         _becomeOwner(AH, te);
!                         _selectOutputSchema(AH, te->namespace);
!
!                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
!                               te->tag);
!
!                         /*
!                          * If we have a copy statement, use it. As of V1.3,
!                          * these are separate to allow easy import from
!                          * within a database connection. Pre 1.3 archives can
!                          * not use DB connections and are sent to output only.
!                          *
!                          * For V1.3+, the table data MUST have a copy
!                          * statement so that we can go into appropriate mode
!                          * with libpq.
!                          */
!                         if (te->copyStmt && strlen(te->copyStmt) > 0)
!                         {
!                             ahprintf(AH, "%s", te->copyStmt);
!                             AH->writingCopyData = true;
!                         }
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!
!                         AH->writingCopyData = false;
!
!                         _enableTriggersIfNecessary(AH, te, ropt);
!                     }
!                 }
!             }
!             else if (!defnDumped)
!             {
!                 /* If we haven't already dumped the defn part, do so now */
!                 ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!                 _printTocEntry(AH, te, ropt, false, false);
!             }
!         }
!     }                            /* end loop over TOC entries */

      /*
       * Scan TOC again to output ownership commands and ACLs
--- 726,733 ----
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         (void) _restore_one_te(AH, te, ropt, false);
!     }

      /*
       * Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 775,955 ----
      }
  }

+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+                 RestoreOptions *ropt, bool is_parallel)
+ {
+     teReqs        reqs;
+     bool        defnDumped;
+     int         retval = 0;
+
+     AH->currentTE = te;
+
+     /* Work out what, if anything, we want from this entry */
+     reqs = _tocEntryRequired(te, ropt, false);
+
+     /* Dump any relevant dump warnings to stderr */
+     if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+     {
+         if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+         else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+     }
+
+     defnDumped = false;
+
+     if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+     {
+         ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+         _printTocEntry(AH, te, ropt, false, false);
+         defnDumped = true;
+
+         /*
+          * If we could not create a table and --no-data-for-failed-tables
+          * was given, ignore the corresponding TABLE DATA
+          *
+          * For the parallel case this must be done in the parent, so we just
+          * set a return value.
+          */
+         if (ropt->noDataForFailedTables &&
+             AH->lastErrorTE == te &&
+             strcmp(te->desc, "TABLE") == 0)
+         {
+             if (is_parallel)
+                 retval = 1;
+             else
+                 _inhibit_data_for_failed_table(AH,te);
+         }
+
+         /* If we created a DB, connect to it... */
+         /* won't happen in parallel restore */
+         if (strcmp(te->desc, "DATABASE") == 0)
+         {
+             ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+             _reconnectToDB(AH, te->tag);
+         }
+     }
+
+     /*
+      * If we have a data component, then process it
+      */
+     if ((reqs & REQ_DATA) != 0)
+     {
+         /*
+          * hadDumper will be set if there is genuine data component for
+          * this node. Otherwise, we need to check the defn field for
+          * statements that need to be executed in data-only restores.
+          */
+         if (te->hadDumper)
+         {
+             /*
+              * If we can output the data, then restore it.
+              */
+             if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
+             {
+                 _printTocEntry(AH, te, ropt, true, false);
+
+                 if (strcmp(te->desc, "BLOBS") == 0 ||
+                     strcmp(te->desc, "BLOB COMMENTS") == 0)
+                 {
+                     ahlog(AH, 1, "restoring %s\n", te->desc);
+
+                     _selectOutputSchema(AH, "pg_catalog");
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+                 }
+                 else
+                 {
+                     _disableTriggersIfNecessary(AH, te, ropt);
+
+                     /* Select owner and schema as necessary */
+                     _becomeOwner(AH, te);
+                     _selectOutputSchema(AH, te->namespace);
+
+                     ahlog(AH, 1, "restoring data for table \"%s\"\n",
+                           te->tag);
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             StartTransaction(AH);
+                         else
+                             ahprintf(AH, "BEGIN;\n\n");
+
+                         ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+                                  fmtId(te->tag));
+                     }
+
+                     /*
+                      * If we have a copy statement, use it. As of V1.3,
+                      * these are separate to allow easy import from
+                      * within a database connection. Pre 1.3 archives can
+                      * not use DB connections and are sent to output only.
+                      *
+                      * For V1.3+, the table data MUST have a copy
+                      * statement so that we can go into appropriate mode
+                      * with libpq.
+                      */
+                     if (te->copyStmt && strlen(te->copyStmt) > 0)
+                     {
+                         ahprintf(AH, "%s", te->copyStmt);
+                         AH->writingCopyData = true;
+                     }
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+                     AH->writingCopyData = false;
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             CommitTransaction(AH);
+                         else
+                             ahprintf(AH, "COMMIT;\n\n");
+                     }
+
+                     _enableTriggersIfNecessary(AH, te, ropt);
+                 }
+             }
+         }
+         else if (!defnDumped)
+         {
+             /* If we haven't already dumped the defn part, do so now */
+             ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, false);
+         }
+     }
+
+     return retval;
+ }
+
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+     TocEntry   *tes;
+     RestoreOptions *ropt = AH->ropt;
+
+     ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+           te->tag);
+
+     for (tes = te->next; tes != AH->toc; tes = tes->next)
+     {
+         if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+             strcmp(tes->tag, te->tag) == 0 &&
+             strcmp(tes->namespace ? tes->namespace : "",
+                    te->namespace ? te->namespace : "") == 0)
+         {
+             /* mark it unwanted */
+             ropt->idWanted[tes->dumpId - 1] = false;
+
+             _reduce_dependencies(AH, tes);
+             break;
+         }
+     }
+ }
+
  /*
   * Allocate a new RestoreOptions block.
   * This is mainly so we can initialize it, but also for future expansion,
***************
*** 653,662 ****
      while (te != AH->toc)
      {
          if (_tocEntryRequired(te, ropt, true) != 0)
!             ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
                       te->catalogId.tableoid, te->catalogId.oid,
                       te->desc, te->namespace ? te->namespace : "-",
                       te->tag, te->owner);
          te = te->next;
      }

--- 1152,1167 ----
      while (te != AH->toc)
      {
          if (_tocEntryRequired(te, ropt, true) != 0)
!         {
!             int i;
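!             /* also print this entry's dependency list */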
!             ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps);
!             for (i=0 ;i<te->nDeps; i++)
!                 ahprintf(AH, "%d ",te->dependencies[i]);
!             ahprintf(AH, "] %u %u %s %s %s %s\n",
                       te->catalogId.tableoid, te->catalogId.oid,
                       te->desc, te->namespace ? te->namespace : "-",
                       te->tag, te->owner);
+         }
          te = te->next;
      }

***************
*** 1948,1965 ****
--- 2453,2473 ----
                  deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
                  te->dependencies = deps;
                  te->nDeps = depIdx;
+                 te->depCount = depIdx;
              }
              else
              {
                  free(deps);
                  te->dependencies = NULL;
                  te->nDeps = 0;
+                 te->depCount = 0;
              }
          }
          else
          {
              te->dependencies = NULL;
              te->nDeps = 0;
+             te->depCount = 0;
          }

          if (AH->ReadExtraTocPtr)
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h    7 Nov 2007 12:24:24 -0000    1.76
--- pg_backup_archiver.h    29 Sep 2008 02:43:52 -0000
***************
*** 99,104 ****
--- 99,105 ----
  struct _restoreList;

  typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
  typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);

  typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
      WriteBufPtr WriteBufPtr;    /* Write a buffer of output to the archive */
      ReadBufPtr ReadBufPtr;        /* Read a buffer of input from the archive */
      ClosePtr ClosePtr;            /* Close the archive */
+     ReopenPtr ReopenPtr;            /* Reopen the archive */
      WriteExtraTocPtr WriteExtraTocPtr;    /* Write extra TOC entry data
                                           * associated with the current archive
                                           * format */
***************
*** 231,236 ****
--- 233,239 ----
      char       *archdbname;        /* DB name *read* from archive */
      bool        requirePassword;
      PGconn       *connection;
+     char       *cachepw;
      int            connectToDB;    /* Flag to indicate if direct DB connection is
                                   * required */
      bool        writingCopyData;    /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
      DumpId        dumpId;
      bool        hadDumper;        /* Archiver was passed a dumper routine (used
                                   * in restore) */
+     bool        prestored;      /* keep track of parallel restore */
      char       *tag;            /* index tag */
      char       *namespace;        /* null or empty string if not in a schema */
      char       *tablespace;        /* null if not in a tablespace; empty string
***************
*** 296,301 ****
--- 300,306 ----
      char       *copyStmt;
      DumpId       *dependencies;    /* dumpIds of objects this one depends on */
      int            nDeps;            /* number of dependencies */
+     int         depCount;       /* adjustable tally of dependencies */

      DataDumperPtr dataDumper;    /* Routine to dump data for object */
      void       *dataDumperArg;    /* Arg for above routine */
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c    28 Oct 2007 21:55:52 -0000    1.40
--- pg_backup_custom.c    29 Sep 2008 02:43:52 -0000
***************
*** 40,45 ****
--- 40,46 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
  static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = _ReopenArchive;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
      AH->FH = NULL;
  }

+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+     lclContext *ctx = (lclContext *) AH->formatData;
+     pgoff_t        tpos;
+
+     if (AH->mode == archModeWrite)
+         die_horribly(AH, modulename, "can only reopen input archives\n");
+     else if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
+         die_horribly(AH, modulename, "cannot reopen stdin\n");
+
+     tpos = ftello(AH->FH);
+
+     if (fclose(AH->FH) != 0)
+         die_horribly(AH, modulename, "could not close archive file: %s\n",
+                      strerror(errno));
+
+     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+     if (!AH->FH)
+         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+                      AH->fSpec, strerror(errno));
+
+     if (ctx->hasSeek)
+         fseeko(AH->FH, tpos, SEEK_SET);
+     else
+         die_horribly(AH, modulename, "cannot reopen non-seekable file\n");
+
+ }
+
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c    16 Aug 2008 02:25:06 -0000    1.80
--- pg_backup_db.c    29 Sep 2008 02:43:52 -0000
***************
*** 138,148 ****

      ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

!     if (AH->requirePassword)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
      }

      do
--- 138,153 ----

      ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

!     if (AH->requirePassword && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
+         AH->requirePassword = true;
+     }
+     else if (AH->requirePassword)
+     {
+         password = AH->cachepw;
      }

      do
***************
*** 174,180 ****
          }
      } while (new_pass);

!     if (password)
          free(password);

      /* check for version mismatch */
--- 179,185 ----
          }
      } while (new_pass);

!     if (password != AH->cachepw)
          free(password);

      /* check for version mismatch */
***************
*** 206,220 ****
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
      else
          AH->requirePassword = false;

      /*
       * Start the connection.  Loop until we have a password if requested by
--- 211,231 ----
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
+     else if (reqPwd)
+     {
+         password = AH->cachepw;
+     }
      else
+     {
          AH->requirePassword = false;
+     }

      /*
       * Start the connection.  Loop until we have a password if requested by
***************
*** 241,247 ****
      } while (new_pass);

      if (password)
!         free(password);

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 252,258 ----
      } while (new_pass);

      if (password)
!         AH->cachepw = password;

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c    28 Oct 2007 21:55:52 -0000    1.34
--- pg_backup_files.c    29 Sep 2008 02:43:52 -0000
***************
*** 87,92 ****
--- 87,93 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c    15 Nov 2007 21:14:41 -0000    1.62
--- pg_backup_tar.c    29 Sep 2008 02:43:52 -0000
***************
*** 143,148 ****
--- 143,149 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c    13 Apr 2008 03:49:22 -0000    1.88
--- pg_restore.c    29 Sep 2008 02:43:52 -0000
***************
*** 78,83 ****
--- 78,84 ----
      static int    no_data_for_failed_tables = 0;
      static int  outputNoTablespaces = 0;
      static int    use_setsessauth = 0;
+     static int  truncate_before_load = 0;

      struct option cmdopts[] = {
          {"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
          {"ignore-version", 0, NULL, 'i'},
          {"index", 1, NULL, 'I'},
          {"list", 0, NULL, 'l'},
+         {"multi-thread",1,NULL,'m'},
          {"no-privileges", 0, NULL, 'x'},
          {"no-acl", 0, NULL, 'x'},
          {"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
          {"disable-triggers", no_argument, &disable_triggers, 1},
          {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
          {"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+         {"truncate-before-load", no_argument, &truncate_before_load, 1},
          {"use-set-session-authorization", no_argument, &use_setsessauth, 1},

          {NULL, 0, NULL, 0}
***************
*** 139,145 ****
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
--- 142,148 ----
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
***************
*** 182,187 ****
--- 185,194 ----
                  opts->tocFile = strdup(optarg);
                  break;

+             case 'm':
+                 opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+                 break;
+
              case 'n':            /* Dump data for this schema only */
                  opts->schemaNames = strdup(optarg);
                  break;
***************
*** 262,268 ****
                  break;

              case 0:
!                 /* This covers the long options equivalent to -X xxx. */
                  break;

              case '1':            /* Restore data in a single transaction */
--- 269,278 ----
                  break;

              case 0:
!                 /*
!                  * This covers the long options without a short equivalent,
!                  * including those equivalent to -X xxx.
!                  */
                  break;

              case '1':            /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
      opts->noDataForFailedTables = no_data_for_failed_tables;
      opts->noTablespace = outputNoTablespaces;
      opts->use_setsessauth = use_setsessauth;
+     opts->truncate_before_load = truncate_before_load;
+
+     if (opts->single_txn)
+     {
+         if (opts->number_of_threads > 1)
+         {
+             write_msg(NULL, "single transaction not compatible with multi-threading");
+             exit(1);
+         }
+         else if (opts->truncate_before_load)
+         {
+             write_msg(NULL, "single transaction not compatible with truncate-before-load");
+             exit(1);
+         }
+     }

      if (opts->formatName)
      {
***************
*** 330,335 ****
--- 355,362 ----

      AH = OpenArchive(inputFileSpec, opts->format);

+     /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
      /* Let the archiver know how noisy to be */
      AH->verbose = opts->verbose;

***************
*** 351,356 ****
--- 378,385 ----

      if (opts->tocSummary)
          PrintTOCSummary(AH, opts);
+     else if (opts->number_of_threads > 1)
+         RestoreArchiveParallel(AH, opts);
      else
          RestoreArchive(AH, opts);


Re: parallel pg_restore - WIP patch

From
Stefan Kaltenbrunner
Date:
Andrew Dunstan wrote:
> 
> 
> Andrew Dunstan wrote:
>>
>>
>>>
>>> this works better but there is something fishy still - using the same 
>>> dump file I get a proper restore using pg_restore normally. If I 
>>> however use -m for a parallel one I only get parts (in this case only 
>>> 243 of the 709 tables) of the database restored ...
>>>
>>>
>>>
>>
>> Yes, there are several funny things going on, including some stuff 
>> with dependencies. I'll have a new patch tomorrow with luck. Thanks 
>> for testing.
>>
>>
> 
> OK, in this version a whole heap of bugs are fixed, mainly those to do 
> with dependencies and saved state. I get identical row counts in the 
> source and destination now, quite reliably.

this looks much better (a restore that usually takes 180 min gets down 
to 72 min using -m 4) - however, especially with higher concurrency, 
I'm sometimes running into restore failures due to deadlocks during 
constraint restoration (slightly redacted):

pg_restore: [archiver (db)] Error from TOC entry 7765; 2606 1460743180 
FK CONSTRAINT fk_av_relations_av db_owner
pg_restore: [archiver (db)] could not execute query: ERROR:  deadlock 
detected
DETAIL:  Process 18100 waits for AccessExclusiveLock on relation 
1460818342 of database 1460815284; blocked by process 18103.
Process 18103 waits for AccessExclusiveLock on relation 1460818336 of 
database 1460815284; blocked by process 18100.
HINT:  See server log for query details.

ALTER TABLE ONLY foo
     ADD CONSTRAINT fk_av_relations_av FOREIGN KEY (vs_id) REFERENCES
bar ...


Re: parallel pg_restore - WIP patch

From
Tom Lane
Date:
Stefan Kaltenbrunner <stefan@kaltenbrunner.cc> writes:
> pg_restore: [archiver (db)] could not execute query: ERROR:  deadlock 
> detected
> DETAIL:  Process 18100 waits for AccessExclusiveLock on relation 
> 1460818342 of database 1460815284; blocked by process 18103.
> Process 18103 waits for AccessExclusiveLock on relation 1460818336 of 
> database 1460815284; blocked by process 18100.
> HINT:  See server log for query details.

> ALTER TABLE ONLY foo
>      ADD CONSTRAINT fk_av_relations_av FOREIGN KEY (vs_id) REFERENCES 
> bar ...

Hmm, I'll bet the restore code doesn't realize that this can't run in
parallel with index creation on either table ...
        regards, tom lane


Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Tom Lane wrote:
> Stefan Kaltenbrunner <stefan@kaltenbrunner.cc> writes:
>   
>> pg_restore: [archiver (db)] could not execute query: ERROR:  deadlock 
>> detected
>> DETAIL:  Process 18100 waits for AccessExclusiveLock on relation 
>> 1460818342 of database 1460815284; blocked by process 18103.
>> Process 18103 waits for AccessExclusiveLock on relation 1460818336 of 
>> database 1460815284; blocked by process 18100.
>> HINT:  See server log for query details.
>>     
>
>   
>> ALTER TABLE ONLY foo
>>      ADD CONSTRAINT fk_av_relations_av FOREIGN KEY (vs_id) REFERENCES 
>> bar ...
>>     
>
> Hmm, I'll bet the restore code doesn't realize that this can't run in
> parallel with index creation on either table ...
>
>             
>   

Yeah. Of course, it's never needed to bother with stuff like that till now.

The very simple fix is probably to run a separate parallel cycle just 
for FKs, after the index creation.

A slightly more elegant fix would probably be to add dependencies from 
each index that might cause this to the FK constraint.
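
Something like this is what I mean - just an untested sketch, where
shares_table() and add_dependency() are made-up helpers, not anything
in the patch yet:

static void
add_index_fk_dependencies(ArchiveHandle *AH)
{
    TocEntry   *fk;
    TocEntry   *idx;

    for (fk = AH->toc->next; fk != AH->toc; fk = fk->next)
    {
        if (strcmp(fk->desc, "FK CONSTRAINT") != 0)
            continue;

        for (idx = AH->toc->next; idx != AH->toc; idx = idx->next)
        {
            if (strcmp(idx->desc, "INDEX") != 0)
                continue;

            /* shares_table(): does the index's table match either side of
             * the FK?  add_dependency(): append idx->dumpId to
             * fk->dependencies and bump nDeps/depCount.  Both invented. */
            if (shares_table(AH, fk, idx))
                add_dependency(fk, idx->dumpId);
        }
    }
}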

I'll work on the first for now.

Is there any chance that the locks we're taking here are too strong? 
Intuitively it looks a bit like it.

cheers

andrew


Re: parallel pg_restore - WIP patch

From
Tom Lane
Date:
Andrew Dunstan <andrew@dunslane.net> writes:
> Tom Lane wrote:
>> Hmm, I'll bet the restore code doesn't realize that this can't run in
>> parallel with index creation on either table ...

> Yeah. Of course, it's never needed to bother with stuff like that till now.

> The very simple fix is probably to run a separate parallel cycle just 
> for FKs, after the index creation.

Um, FKs could conflict with each other too, so that by itself isn't
gonna fix anything.
        regards, tom lane


Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Tom Lane wrote:
> Andrew Dunstan <andrew@dunslane.net> writes:
>   
>> Tom Lane wrote:
>>     
>>> Hmm, I'll bet the restore code doesn't realize that this can't run in
>>> parallel with index creation on either table ...
>>>       
>
>   
>> Yeah. Of course, it's never needed to bother with stuff like that till now.
>>     
>
>   
>> The very simple fix is probably to run a separate parallel cycle just 
>> for FKs, after the index creation.
>>     
>
> Um, FKs could conflict with each other too, so that by itself isn't
> gonna fix anything.
>
>             
>   

Good point. Looks like we'll need to make a list of "can't run in 
parallel with" items as well as strict dependencies.

cheers

andrew


Re: parallel pg_restore - WIP patch

From
Tom Lane
Date:
Andrew Dunstan <andrew@dunslane.net> writes:
> Tom Lane wrote:
>> Um, FKs could conflict with each other too, so that by itself isn't
>> gonna fix anything.

> Good point. Looks like we'll need to make a list of "can't run in 
> parallel with" items as well as strict dependencies.

Yeah, I was just thinking about that.  The current archive format
doesn't really carry enough information for this.  I think there
are two basic solutions we could adopt:

* Extend the archive format to provide some indication that "restoring
this object requires exclusive access to these dependencies".

* Hardwire knowledge into pg_restore that certain types of objects
require exclusive access to their dependencies.

The former seems more flexible, as well as more in tune with the basic
design assumption that pg_restore shouldn't have a lot of knowledge
about individual archive object types.  But it would mean that you
couldn't use parallel restore with any pre-8.4 dumps.  In the long run
that's no big deal, but in the short run it's annoying.

Another angle is that it's not clear what happens if the need for
exclusive access changes over time.  You were just speculating about
reducing the lock strength required for ALTER TABLE ADD FOREIGN KEY.
I don't know if that's workable or not, but certainly reducing the
lock strength for some types of ALTER TABLE might be in our future.
Contrarily, we don't currently try hard to lock any non-table objects
(schemas, functions, etc) while building dependent objects; but that's
obviously not really right, and someday we might decide to fix it.
So having pg_dump prepare the list of exclusive dependencies at dump
time might be the wrong thing --- it would reflect the behavior of
the source server version, not the target which is what matters.

Thoughts?
        regards, tom lane


Re: parallel pg_restore - WIP patch

From
Dimitri Fontaine
Date:
On Monday 29 September 2008, Tom Lane wrote:
> * Extend the archive format to provide some indication that "restoring
> this object requires exclusive access to these dependencies".
>
> * Hardwire knowledge into pg_restore that certain types of objects
> require exclusive access to their dependencies.

Well, it seems to me that currently the FK needs, in terms of existing indexes
and locks, and some other objects' lock needs, are all hardwired. Is it even
safe to consider having the locks needed for certain commands not be
hardwired?

Provided I'm not all wrong here, I don't see how having something more
flexible at restore time than at build time is a win. The drawback is that
whenever you change a command's lock requirements, you have to remember to
teach pg_restore about it too.

So my vote here is in favor of hardwired knowledge in pg_restore, matching
target server code assumptions and needs.

Regards,
--
dim

Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Dimitri Fontaine wrote:
> On Monday 29 September 2008, Tom Lane wrote:
>   
>> * Extend the archive format to provide some indication that "restoring
>> this object requires exclusive access to these dependencies".
>>
>> * Hardwire knowledge into pg_restore that certain types of objects
>> require exclusive access to their dependencies.
>>     
>
> Well, it seems to me that currently the FK needs, in terms of existing indexes 
> and locks, and some other objects' lock needs, are all hardwired. Is it even 
> safe to consider having the locks needed for certain commands not be 
> hardwired?
>
> Provided I'm not all wrong here, I don't see how having something more 
> flexible at restore time than at build time is a win. The drawback is that 
> whenever you change a command's lock requirements, you have to remember to 
> teach pg_restore about it too.
>
> So my vote here is in favor of hardwired knowledge in pg_restore, matching 
> target server code assumptions and needs.
>
>   

Well, I've had to use some knowledge of various item types already, and 
I have also been trying not to disturb pg_dump, so I'm inclined to build 
this knowledge into pg_restore.

ISTM that "things that will have lock conflicts" are different and more 
target version dependent than "things that logically depend on other 
things", so we can still rely on pg_dump to some extent to provide the 
latter while building the former at restore time.

cheers

andrew


Re: parallel pg_restore - WIP patch

From
Stefan Kaltenbrunner
Date:
Tom Lane wrote:
> Andrew Dunstan <andrew@dunslane.net> writes:
>> Tom Lane wrote:
>>> Um, FKs could conflict with each other too, so that by itself isn't
>>> gonna fix anything.
> 
>> Good point. Looks like we'll need to make a list of "can't run in 
>> parallel with" items as well as strict dependencies.
> 
> Yeah, I was just thinking about that.  The current archive format
> doesn't really carry enough information for this.  I think there
> are two basic solutions we could adopt:
> 
> * Extend the archive format to provide some indication that "restoring
> this object requires exclusive access to these dependencies".
> 
> * Hardwire knowledge into pg_restore that certain types of objects
> require exclusive access to their dependencies.
> 
> The former seems more flexible, as well as more in tune with the basic
> design assumption that pg_restore shouldn't have a lot of knowledge
> about individual archive object types.  But it would mean that you
> couldn't use parallel restore with any pre-8.4 dumps.  In the long run
> that's no big deal, but in the short run it's annoying.

hmm, not sure how much of a problem that really is - we usually recommend 
using the pg_dump version of the target database anyway.




Stefan


Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Stefan Kaltenbrunner wrote:
> Tom Lane wrote:
>> Andrew Dunstan <andrew@dunslane.net> writes:
>>> Tom Lane wrote:
>>>> Um, FKs could conflict with each other too, so that by itself isn't
>>>> gonna fix anything.
>>
>>> Good point. Looks like we'll need to make a list of "can't run in
>>> parallel with" items as well as strict dependencies.
>>
>> Yeah, I was just thinking about that.  The current archive format
>> doesn't really carry enough information for this.  I think there
>> are two basic solutions we could adopt:
>>
>> * Extend the archive format to provide some indication that "restoring
>> this object requires exclusive access to these dependencies".
>>
>> * Hardwire knowledge into pg_restore that certain types of objects
>> require exclusive access to their dependencies.
>>
>> The former seems more flexible, as well as more in tune with the basic
>> design assumption that pg_restore shouldn't have a lot of knowledge
>> about individual archive object types.  But it would mean that you
>> couldn't use parallel restore with any pre-8.4 dumps.  In the long run
>> that's no big deal, but in the short run it's annoying.
>
> hmm, not sure how much of a problem that really is - we usually
> recommend using the pg_dump version of the target database anyway.
>
>
>
>

We don't really need a huge amount of hardwiring as it turns out. Here
is a version of the patch that tries to do what's needed in this area.

cheers

andrew
Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h    13 Apr 2008 03:49:21 -0000    1.47
--- pg_backup.h    29 Sep 2008 23:34:57 -0000
***************
*** 123,128 ****
--- 123,130 ----
      int            suppressDumpWarnings;    /* Suppress output of WARNING entries
                                           * to stderr */
      bool        single_txn;
+     int         number_of_threads;
+     bool        truncate_before_load;

      bool       *idWanted;        /* array showing which dump IDs to emit */
  } RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
  extern void CloseArchive(Archive *AH);

  extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);

  /* Open an existing archive */
  extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c    5 Sep 2008 23:53:42 -0000    1.158
--- pg_backup_archiver.c    29 Sep 2008 23:34:58 -0000
***************
*** 27,38 ****
--- 27,51 ----

  #include <unistd.h>

+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
  #ifdef WIN32
  #include <io.h>
  #endif

  #include "libpq/libpq-fs.h"

+ typedef struct _parallel_slot
+ {
+     pid_t   pid;
+     TocEntry *te;
+     DumpId  dumpId;
+     DumpId  tdeps[2];
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)

  const char *progname;

***************
*** 70,76 ****
--- 83,100 ----
  static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
  static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
  static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH, ParallelSlot *slots, int n_slots);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+

+ static ArchiveHandle *GAH;

  /*
   *    Wrapper functions.
***************
*** 125,137 ****

  /* Public */
  void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;
-     bool        defnDumped;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
--- 149,633 ----

  /* Public */
  void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+     ArchiveHandle *AH = (ArchiveHandle *) AHX;
+     ParallelSlot  *slots;
+     int next_slot;
+     TocEntry *next_work_item = NULL;
+     int work_status;
+     pid_t ret_child;
+     int n_slots = ropt->number_of_threads;
+     TocEntry *te;
+     teReqs    reqs;
+
+
+     AH->debugLevel = 99;
+     /* some routines that use ahlog() don't get passed AH */
+     GAH = AH;
+
+     ahlog(AH,1,"entering RestoreARchiveParallel\n");
+
+
+     slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots);
+     AH->ropt = ropt;
+
+ /*
+     if (ropt->create)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --create\n");
+ */
+
+
+     if (ropt->dropSchema)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --clean\n");
+
+     if (!ropt->useDB)
+         die_horribly(AH,modulename,
+                      "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename,
+                              "cannot restore from compressed archive (compression not supported in this
installation)\n");
+         }
+     }
+ #endif
+
+     ahlog(AH, 1, "connecting to database for restore\n");
+     if (AH->version < K_VERS_1_3)
+         die_horribly(AH, modulename,
+                      "direct database connections are not supported in pre-1.3 archives\n");
+
+     /* XXX Should get this from the archive */
+     AHX->minRemoteVersion = 070100;
+     AHX->maxRemoteVersion = 999999;
+
+     /* correct dependency counts in case we're doing a partial restore */
+     if (ropt->idWanted == NULL)
+         InitDummyWantedList(AHX,ropt);
+     _fix_dependency_counts(AH);
+
+     /*
+      * Since we're talking to the DB directly, don't send comments since they
+      * obscure SQL when displaying errors
+      */
+     AH->noTocComments = 1;
+
+     /* Do all the early stuff in a single connection in the parent.
+      * There's no great point in running it in parallel and it will actually
+      * run faster in a single connection because we avoid all the connection
+      * and setup overhead, including the 0.5s sleep below.
+      */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     while((next_work_item = get_next_work_item(AH,NULL,0)) != NULL)
+     {
+         /* XXX need to improve this test in case there is no table data */
+         /* need to test for indexes, FKs, PK, Unique, etc */
+         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+             break;
+         (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+         next_work_item->prestored = true;
+
+         _reduce_dependencies(AH,next_work_item);
+     }
+
+
+     /*
+      * now close parent connection in prep for parallel step.
+      */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+     /* blow away any preserved state from the previous connection */
+
+     if (AH->currSchema)
+         free(AH->currSchema);
+     AH->currSchema = strdup("");
+     if (AH->currUser)
+         free(AH->currUser);
+     AH->currUser = strdup("");
+     if (AH->currTablespace)
+         free(AH->currTablespace);
+     AH->currTablespace = NULL;
+     AH->currWithOids = -1;
+
+     /* main parent loop */
+
+     ahlog(AH,1,"entering main loop\n");
+
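+     /*
+      * Dispatch ready items while a worker slot is free; once we run out
+      * of slots or of ready items, wait() for a child to finish and then
+      * go around again.  The loop ends when nothing is ready and no
+      * children are left running.
+      */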
+     while (((next_work_item = get_next_work_item(AH,slots,n_slots)) != NULL) ||
+            (work_is_being_done(slots,n_slots)))
+     {
+         if (next_work_item != NULL &&
+             ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+         {
+             /* there is work still to do and a worker slot available */
+
+             pid_t child;
+
+             next_work_item->prestored = true;
+
+             child = fork();
+             if (child == 0)
+             {
+                 prestore(AH,next_work_item);
+                 /* should not happen ... we expect prestore to exit */
+                 exit(1);
+             }
+             else if (child > 0)
+             {
+                 slots[next_slot].pid = child;
+                 slots[next_slot].te = next_work_item;
+                 slots[next_slot].dumpId = next_work_item->dumpId;
+                 slots[next_slot].tdeps[0] = next_work_item->tdeps[0];
+                 slots[next_slot].tdeps[1] = next_work_item->tdeps[1];
+             }
+             else
+             {
+                 /* XXX fork error - handle it! */
+             }
+             continue; /* in case the slots are not yet full */
+         }
+         /* if we get here there must be work being done */
+         ret_child = wait(&work_status);
+
+         if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+         {
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+         {
+             int i;
+
+             for (i = 0; i < n_slots; i++)
+             {
+                 if (slots[i].pid == ret_child)
+                 {
+                     _inhibit_data_for_failed_table(AH, slots[i].te);
+                     break;
+                 }
+             }
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else
+         {
+             /* XXX something went wrong - deal with it */
+         }
+     }
+
+     /*
+      * now process the ACLs - no need to do this in parallel
+      */
+
+     /* reconnect from parent */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Scan TOC to output ownership commands and ACLs
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         AH->currentTE = te;
+
+         /* Work out what, if anything, we want from this entry */
+         reqs = _tocEntryRequired(te, ropt, true);
+
+         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+         {
+             ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+                   te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, true);
+         }
+     }
+
+     /* clean up */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+ }
+
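+ /* true while any worker slot still holds a live child */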
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+     ahlog(GAH,1,"is work being done?\n");
+     while(n_slots--)
+     {
+         if (slot->pid > 0)
+             return true;
+         slot++;
+     }
+     ahlog(GAH,1,"work is not being done\n");
+     return false;
+ }
+
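+ /* return the index of a free worker slot, or NO_SLOT if all are busy */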
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == 0)
+         {
+             ahlog(GAH,1,"available slots is %d\n",i);
+             return i;
+         }
+     }
+     ahlog(GAH,1,"No slot available\n");
+     return NO_SLOT;
+ }
+
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH, ParallelSlot *slots, int n_slots)
+ {
+     TocEntry *te;
+     teReqs    reqs;
+     int       i;
+     bool      conflict;
+
+     /* just search from the top of the queue until we find an available item.
+      * Note that the queue isn't reordered in the current implementation. If
+      * we ever do reorder it, then certain code that processes entries from the
+      * current item to the end of the queue will probably need to be
+      * re-examined.
+      */
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         if (!te->prestored && te->depCount < 1)
+         {
+             /* make sure it's not an ACL */
+             reqs = _tocEntryRequired (te, AH->ropt, false);
+             if ((reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
+                 continue;
+
+             /* check against parallel slots for incompatible table locks */
+             conflict = false;
+             for (i = 0; i < n_slots; i++)
+             {
+                 if ((slots[i].tdeps[0] != 0 &&
+                      (te->tdeps[0] == slots[i].tdeps[0] || te->tdeps[1] == slots[i].tdeps[0])) ||
+                     (slots[i].tdeps[1] != 0 &&
+                      (te->tdeps[0] == slots[i].tdeps[1] || te->tdeps[1] == slots[i].tdeps[1])))
+                 {
+                     if (strcmp(te->desc,"CONSTRAINT") == 0 ||
+                         strcmp(te->desc,"FK CONSTRAINT") == 0 ||
+                         strcmp(te->desc,"CHECK CONSTRAINT") == 0 ||
+                         strcmp(te->desc,"TRIGGER") == 0 ||
+                         strcmp(slots[i].te->desc,"CONSTRAINT") == 0 ||
+                         strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 ||
+                         strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 ||
+                         strcmp(slots[i].te->desc,"TRIGGER") == 0)
+                     {
+                         /* If either the item that is running will take an
+                          * AccessExclusiveLock on the table, or this item
+                          * would acquire such a lock, this item can't run yet.
+                          */
+                         conflict = true;
+                         break;
+                     }
+                 }
+             }
+             if (conflict)
+                 continue;
+
+             ahlog(AH,1,"next item is %d\n",te->dumpId);
+             return te;
+         }
+     }
+     ahlog(AH,1,"No item ready\n");
+     return NULL;
+ }
+
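+ /*
+  * Child-side worker: reopen the archive so we have a private file
+  * pointer, make our own database connection, restore the single TOC
+  * entry, and exit with _restore_one_te's result for the parent to act on.
+  */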
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+     RestoreOptions *ropt = AH->ropt;
+     int retval;
+
+     /* close and reopen the archive so we have a private copy that doesn't
+      * stomp on anyone else's file pointer
+      */
+
+     (AH->ReopenPtr)(AH);
+
+     ConnectDatabase((Archive *)AH, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     retval = _restore_one_te(AH, te, ropt, true);
+
+     PQfinish(AH->connection);
+     exit(retval);
+
+ }
+
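+ /*
+  * Parent-side bookkeeping when a worker exits: clear the slot that was
+  * running this pid and reduce the dependency counts of items that were
+  * waiting on the completed TOC entry.
+  */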
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+                ParallelSlot *slots, int n_slots)
+ {
+
+     TocEntry *te = NULL;
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == worker)
+         {
+             te = slots[i].te;
+             slots[i].pid = 0;
+             slots[i].te = NULL;
+             slots[i].dumpId = 0;
+             slots[i].tdeps[0] = 0;
+             slots[i].tdeps[1] = 0;
+
+             break;
+         }
+     }
+
+     /* Assert (te != NULL); */
+
+     _reduce_dependencies(AH,te);
+
+
+ }
+
+
+ /*
+  * Make sure the head of each dependency chain is a live item
+  *
+  * Once this is established the property will be maintained by
+  * _reduce_dependencies called as items are done.
+  */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+     TocEntry * te;
+     RestoreOptions * ropt = AH->ropt;
+     bool *RealDumpIds, *TableDumpIds;
+     DumpId d;
+     int i;
+
+
+     RealDumpIds = calloc(AH->maxDumpId, sizeof(bool));
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         RealDumpIds[te->dumpId-1] = true;
+         if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+     }
+
+     /*
+      * It is possible that the dependencies list items that are
+      * not in the archive at all. Reduce the depcounts so those get
+      * ignored.
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         for (i = 0; i < te->nDeps; i++)
+             if (!RealDumpIds[te->dependencies[i]-1])
+                 te->depCount--;
+
+     TableDumpIds = calloc(AH->maxDumpId,sizeof(bool));
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         if (strcmp(te->desc,"TABLE") == 0)
+             TableDumpIds[te->dumpId-1] = true;
+
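+     /*
+      * Record up to two table dependencies (tdeps) for each item; an FK
+      * constraint is the worst case, depending on both the referencing
+      * and the referenced table.  get_next_work_item() uses these to keep
+      * items needing conflicting table locks from running together.
+      */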
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         for (i = 0; i < te->nDeps; i++)
+         {
+             d = te->dependencies[i];
+             if (TableDumpIds[d-1])
+             {
+                 if (te->tdeps[0] == d || te->tdeps[1] == d)
+                     continue;
+
+                 if (te->tdeps[0] == 0)
+                     te->tdeps[0] = d;
+                 else if (te->tdeps[1] == 0)
+                     te->tdeps[1] = d;
+                 else
+                     die_horribly(AH,modulename,
+                                  "item %d has a dependency on more than two tables", te->dumpId);
+             }
+         }
+ }
+
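+ /*
+  * Mark an item as done: decrement the depCount of every later item that
+  * lists it as a dependency.  Items that thereby become ready but are not
+  * actually wanted are treated as done too, by recursion.
+  */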
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+     DumpId item = te->dumpId;
+     RestoreOptions * ropt = AH->ropt;
+     int i;
+
+     for (te = te->next; te != AH->toc; te = te->next)
+     {
+         if (te->nDeps == 0)
+             continue;
+
+         for (i = 0; i < te->nDeps; i++)
+             if (te->dependencies[i] == item)
+                 te->depCount = te->depCount - 1;
+
+         /* If this is a table data item we are making available,
+          * make the table's dependents depend on this item instead of
+          * the table definition, so they don't get scheduled until the
+          * data is loaded.  Have to do this now, before the main loop
+          * gets to anything further down the list.
+          */
+         if (te->depCount == 0 && strcmp(te->desc, "TABLE DATA") == 0)
+         {
+             TocEntry *tes;
+             int j;
+             for (tes = te->next; tes != AH->toc; tes = tes->next)
+                 for (j = 0; j < tes->nDeps; j++)
+                     if (tes->dependencies[j] == item)
+                         tes->dependencies[j] = te->dumpId;
+         }
+
+         /*
+          * If this item won't in fact be done, and is now at
+          * 0 dependency count, we pretend it's been done and
+          * reduce the dependency counts of all the things that
+          * depend on it, by a recursive call
+          */
+         if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+     }
+
+ }
+
+
+ /* Public */
+ void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 667,686 ----
          AH->noTocComments = 1;
      }

+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in
thisinstallation)\n"); 
+         }
+     }
+ #endif
+
      /*
       * Work out if we have an implied data-only restore. This can happen if
       * the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         AH->currentTE = te;
!
!         /* Work out what, if anything, we want from this entry */
!         reqs = _tocEntryRequired(te, ropt, false);
!
!         /* Dump any relevant dump warnings to stderr */
!         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
!         {
!             if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->defn);
!             else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
!         }
!
!         defnDumped = false;
!
!         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
!         {
!             ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
!             _printTocEntry(AH, te, ropt, false, false);
!             defnDumped = true;
!
!             /*
!              * If we could not create a table and --no-data-for-failed-tables
!              * was given, ignore the corresponding TABLE DATA
!              */
!             if (ropt->noDataForFailedTables &&
!                 AH->lastErrorTE == te &&
!                 strcmp(te->desc, "TABLE") == 0)
!             {
!                 TocEntry   *tes;
!
!                 ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
!                       te->tag);
!
!                 for (tes = te->next; tes != AH->toc; tes = tes->next)
!                 {
!                     if (strcmp(tes->desc, "TABLE DATA") == 0 &&
!                         strcmp(tes->tag, te->tag) == 0 &&
!                         strcmp(tes->namespace ? tes->namespace : "",
!                                te->namespace ? te->namespace : "") == 0)
!                     {
!                         /* mark it unwanted */
!                         ropt->idWanted[tes->dumpId - 1] = false;
!                         break;
!                     }
!                 }
!             }
!
!             /* If we created a DB, connect to it... */
!             if (strcmp(te->desc, "DATABASE") == 0)
!             {
!                 ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
!                 _reconnectToDB(AH, te->tag);
!             }
!         }
!
!         /*
!          * If we have a data component, then process it
!          */
!         if ((reqs & REQ_DATA) != 0)
!         {
!             /*
!              * hadDumper will be set if there is a genuine data component for
!              * this node. Otherwise, we need to check the defn field for
!              * statements that need to be executed in data-only restores.
!              */
!             if (te->hadDumper)
!             {
!                 /*
!                  * If we can output the data, then restore it.
!                  */
!                 if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
!                 {
! #ifndef HAVE_LIBZ
!                     if (AH->compression != 0)
!                         die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
!
!                     _printTocEntry(AH, te, ropt, true, false);
!
!                     if (strcmp(te->desc, "BLOBS") == 0 ||
!                         strcmp(te->desc, "BLOB COMMENTS") == 0)
!                     {
!                         ahlog(AH, 1, "restoring %s\n", te->desc);
!
!                         _selectOutputSchema(AH, "pg_catalog");
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!                     }
!                     else
!                     {
!                         _disableTriggersIfNecessary(AH, te, ropt);
!
!                         /* Select owner and schema as necessary */
!                         _becomeOwner(AH, te);
!                         _selectOutputSchema(AH, te->namespace);
!
!                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
!                               te->tag);
!
!                         /*
!                          * If we have a copy statement, use it. As of V1.3,
!                          * these are separate to allow easy import from
!                          * within a database connection. Pre-1.3 archives
!                          * cannot use DB connections and are sent to output only.
!                          *
!                          * For V1.3+, the table data MUST have a copy
!                          * statement so that we can go into appropriate mode
!                          * with libpq.
!                          */
!                         if (te->copyStmt && strlen(te->copyStmt) > 0)
!                         {
!                             ahprintf(AH, "%s", te->copyStmt);
!                             AH->writingCopyData = true;
!                         }
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!
!                         AH->writingCopyData = false;
!
!                         _enableTriggersIfNecessary(AH, te, ropt);
!                     }
!                 }
!             }
!             else if (!defnDumped)
!             {
!                 /* If we haven't already dumped the defn part, do so now */
!                 ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!                 _printTocEntry(AH, te, ropt, false, false);
!             }
!         }
!     }                            /* end loop over TOC entries */

      /*
       * Scan TOC again to output ownership commands and ACLs
--- 780,787 ----
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         (void) _restore_one_te(AH, te, ropt, false);
!     }

      /*
       * Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 829,1009 ----
      }
  }

+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+                 RestoreOptions *ropt, bool is_parallel)
+ {
+     teReqs        reqs;
+     bool        defnDumped;
+     int         retval = 0;
+
+     AH->currentTE = te;
+
+     /* Work out what, if anything, we want from this entry */
+     reqs = _tocEntryRequired(te, ropt, false);
+
+     /* Dump any relevant dump warnings to stderr */
+     if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+     {
+         if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+         else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+     }
+
+     defnDumped = false;
+
+     if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+     {
+         ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+         _printTocEntry(AH, te, ropt, false, false);
+         defnDumped = true;
+
+         /*
+          * If we could not create a table and --no-data-for-failed-tables
+          * was given, ignore the corresponding TABLE DATA
+          *
+          * For the parallel case this must be done in the parent, so we just
+          * set a return value.
+          */
+         if (ropt->noDataForFailedTables &&
+             AH->lastErrorTE == te &&
+             strcmp(te->desc, "TABLE") == 0)
+         {
+             if (is_parallel)
+                 retval = 1;
+             else
+                 _inhibit_data_for_failed_table(AH, te);
+         }
+
+         /* If we created a DB, connect to it... */
+         /* won't happen in parallel restore */
+         if (strcmp(te->desc, "DATABASE") == 0)
+         {
+             ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+             _reconnectToDB(AH, te->tag);
+         }
+     }
+
+     /*
+      * If we have a data component, then process it
+      */
+     if ((reqs & REQ_DATA) != 0)
+     {
+         /*
+          * hadDumper will be set if there is a genuine data component for
+          * this node. Otherwise, we need to check the defn field for
+          * statements that need to be executed in data-only restores.
+          */
+         if (te->hadDumper)
+         {
+             /*
+              * If we can output the data, then restore it.
+              */
+             if (AH->PrintTocDataPtr != NULL && (reqs & REQ_DATA) != 0)
+             {
+                 _printTocEntry(AH, te, ropt, true, false);
+
+                 if (strcmp(te->desc, "BLOBS") == 0 ||
+                     strcmp(te->desc, "BLOB COMMENTS") == 0)
+                 {
+                     ahlog(AH, 1, "restoring %s\n", te->desc);
+
+                     _selectOutputSchema(AH, "pg_catalog");
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+                 }
+                 else
+                 {
+                     _disableTriggersIfNecessary(AH, te, ropt);
+
+                     /* Select owner and schema as necessary */
+                     _becomeOwner(AH, te);
+                     _selectOutputSchema(AH, te->namespace);
+
+                     ahlog(AH, 1, "restoring data for table \"%s\"\n",
+                           te->tag);
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             StartTransaction(AH);
+                         else
+                             ahprintf(AH, "BEGIN;\n\n");
+
+                         ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+                                  fmtId(te->tag));
+                     }
+
+                     /*
+                      * If we have a copy statement, use it. As of V1.3,
+                      * these are separate to allow easy import from
+                      * within a database connection. Pre-1.3 archives
+                      * cannot use DB connections and are sent to output only.
+                      *
+                      * For V1.3+, the table data MUST have a copy
+                      * statement so that we can go into appropriate mode
+                      * with libpq.
+                      */
+                     if (te->copyStmt && strlen(te->copyStmt) > 0)
+                     {
+                         ahprintf(AH, "%s", te->copyStmt);
+                         AH->writingCopyData = true;
+                     }
+
+                     (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+                     AH->writingCopyData = false;
+
+                     if (ropt->truncate_before_load)
+                     {
+                         if (AH->connection)
+                             CommitTransaction(AH);
+                         else
+                             ahprintf(AH, "COMMIT;\n\n");
+                     }
+
+                     _enableTriggersIfNecessary(AH, te, ropt);
+                 }
+             }
+         }
+         else if (!defnDumped)
+         {
+             /* If we haven't already dumped the defn part, do so now */
+             ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, false);
+         }
+     }
+
+     return retval;
+ }
+
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+     TocEntry   *tes;
+     RestoreOptions *ropt = AH->ropt;
+
+     ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+           te->tag);
+
+     for (tes = te->next; tes != AH->toc; tes = tes->next)
+     {
+         if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+             strcmp(tes->tag, te->tag) == 0 &&
+             strcmp(tes->namespace ? tes->namespace : "",
+                    te->namespace ? te->namespace : "") == 0)
+         {
+             /* mark it unwanted */
+             ropt->idWanted[tes->dumpId - 1] = false;
+
+             _reduce_dependencies(AH, tes);
+             break;
+         }
+     }
+ }
+
  /*
   * Allocate a new RestoreOptions block.
   * This is mainly so we can initialize it, but also for future expansion,
***************
*** 614,619 ****
--- 1167,1173 ----
      TocEntry   *te = AH->toc->next;
      OutputContext sav;
      char       *fmtName;
+     bool        *TableDumpIds;

      if (ropt->filename)
          sav = SetOutput(AH, ropt->filename, 0 /* no compression */ );
***************
*** 650,662 ****

      ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");

      while (te != AH->toc)
      {
          if (_tocEntryRequired(te, ropt, true) != 0)
!             ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
                       te->catalogId.tableoid, te->catalogId.oid,
                       te->desc, te->namespace ? te->namespace : "-",
                       te->tag, te->owner);
          te = te->next;
      }

--- 1204,1235 ----

      ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");

+     TableDumpIds = calloc(AH->maxDumpId, sizeof(bool));
+     while (te != AH->toc)
+     {
+         if (strcmp(te->desc, "TABLE") == 0)
+             TableDumpIds[te->dumpId - 1] = true;
+         te = te->next;
+     }
+     te = AH->toc->next;
+
      while (te != AH->toc)
      {
          if (_tocEntryRequired(te, ropt, true) != 0)
!         {
!             int i;
!             ahprintf(AH, "%d;[%d: ", te->dumpId, te->nDeps);
!             for (i = 0; i < te->nDeps; i++)
!                 ahprintf(AH, "%d ", te->dependencies[i]);
!             ahprintf(AH, "] { ");
!             for (i = 0; i < te->nDeps; i++)
!                 if (TableDumpIds[te->dependencies[i] - 1])
!                     ahprintf(AH, "%d ", te->dependencies[i]);
!             ahprintf(AH, "} %u %u %s %s %s %s\n",
                       te->catalogId.tableoid, te->catalogId.oid,
                       te->desc, te->namespace ? te->namespace : "-",
                       te->tag, te->owner);
+         }
          te = te->next;
      }

***************
*** 1948,1965 ****
--- 2521,2541 ----
                  deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
                  te->dependencies = deps;
                  te->nDeps = depIdx;
+                 te->depCount = depIdx;
              }
              else
              {
                  free(deps);
                  te->dependencies = NULL;
                  te->nDeps = 0;
+                 te->depCount = 0;
              }
          }
          else
          {
              te->dependencies = NULL;
              te->nDeps = 0;
+             te->depCount = 0;
          }

          if (AH->ReadExtraTocPtr)
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h    7 Nov 2007 12:24:24 -0000    1.76
--- pg_backup_archiver.h    29 Sep 2008 23:34:58 -0000
***************
*** 99,104 ****
--- 99,105 ----
  struct _restoreList;

  typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
  typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);

  typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
      WriteBufPtr WriteBufPtr;    /* Write a buffer of output to the archive */
      ReadBufPtr ReadBufPtr;        /* Read a buffer of input from the archive */
      ClosePtr ClosePtr;            /* Close the archive */
+     ReopenPtr ReopenPtr;            /* Reopen the archive */
      WriteExtraTocPtr WriteExtraTocPtr;    /* Write extra TOC entry data
                                           * associated with the current archive
                                           * format */
***************
*** 231,236 ****
--- 233,239 ----
      char       *archdbname;        /* DB name *read* from archive */
      bool        requirePassword;
      PGconn       *connection;
+     char       *cachepw;
      int            connectToDB;    /* Flag to indicate if direct DB connection is
                                   * required */
      bool        writingCopyData;    /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
      DumpId        dumpId;
      bool        hadDumper;        /* Archiver was passed a dumper routine (used
                                   * in restore) */
+     bool        prestored;      /* keep track of parallel restore */
      char       *tag;            /* index tag */
      char       *namespace;        /* null or empty string if not in a schema */
      char       *tablespace;        /* null if not in a tablespace; empty string
***************
*** 296,301 ****
--- 300,307 ----
      char       *copyStmt;
      DumpId       *dependencies;    /* dumpIds of objects this one depends on */
      int            nDeps;            /* number of dependencies */
+     int         depCount;       /* adjustable tally of dependencies */
+     int         tdeps[2];

      DataDumperPtr dataDumper;    /* Routine to dump data for object */
      void       *dataDumperArg;    /* Arg for above routine */
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c    28 Oct 2007 21:55:52 -0000    1.40
--- pg_backup_custom.c    29 Sep 2008 23:34:58 -0000
***************
*** 40,45 ****
--- 40,46 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
  static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = _ReopenArchive;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
      AH->FH = NULL;
  }

+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+     lclContext *ctx = (lclContext *) AH->formatData;
+     pgoff_t        tpos;
+
+     if (AH->mode == archModeWrite)
+     {
+         die_horribly(AH, modulename, "can only reopen input archives\n");
+     }
+     else if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
+     {
+         die_horribly(AH, modulename, "cannot reopen stdin\n");
+     }
+
+     tpos = ftello(AH->FH);
+
+     if (fclose(AH->FH) != 0)
+         die_horribly(AH, modulename, "could not close archive file: %s\n",
+                      strerror(errno));
+
+     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+     if (!AH->FH)
+         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+                      AH->fSpec, strerror(errno));
+
+     if (ctx->hasSeek)
+     {
+         fseeko(AH->FH, tpos, SEEK_SET);
+     }
+     else
+     {
+         die_horribly(AH, modulename, "cannot reopen non-seekable file\n");
+     }
+ }
+
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c    16 Aug 2008 02:25:06 -0000    1.80
--- pg_backup_db.c    29 Sep 2008 23:34:58 -0000
***************
*** 138,148 ****

      ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

!     if (AH->requirePassword)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
      }

      do
--- 138,153 ----

      ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

!     if (AH->requirePassword && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
+         AH->requirePassword = true;
+     }
+     else if (AH->requirePassword)
+     {
+         password = AH->cachepw;
      }

      do
***************
*** 174,180 ****
          }
      } while (new_pass);

!     if (password)
          free(password);

      /* check for version mismatch */
--- 179,185 ----
          }
      } while (new_pass);

!     if (password != AH->cachepw)
          free(password);

      /* check for version mismatch */
***************
*** 206,220 ****
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
      else
          AH->requirePassword = false;

      /*
       * Start the connection.  Loop until we have a password if requested by
--- 211,231 ----
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
+     else if (reqPwd)
+     {
+         password = AH->cachepw;
+     }
      else
+     {
          AH->requirePassword = false;
+     }

      /*
       * Start the connection.  Loop until we have a password if requested by
***************
*** 241,247 ****
      } while (new_pass);

      if (password)
!         free(password);

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 252,258 ----
      } while (new_pass);

      if (password)
!         AH->cachepw = password;

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c    28 Oct 2007 21:55:52 -0000    1.34
--- pg_backup_files.c    29 Sep 2008 23:34:58 -0000
***************
*** 87,92 ****
--- 87,93 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c    15 Nov 2007 21:14:41 -0000    1.62
--- pg_backup_tar.c    29 Sep 2008 23:34:58 -0000
***************
*** 143,148 ****
--- 143,149 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c    13 Apr 2008 03:49:22 -0000    1.88
--- pg_restore.c    29 Sep 2008 23:34:58 -0000
***************
*** 78,83 ****
--- 78,84 ----
      static int    no_data_for_failed_tables = 0;
      static int  outputNoTablespaces = 0;
      static int    use_setsessauth = 0;
+     static int  truncate_before_load = 0;

      struct option cmdopts[] = {
          {"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
          {"ignore-version", 0, NULL, 'i'},
          {"index", 1, NULL, 'I'},
          {"list", 0, NULL, 'l'},
+         {"multi-thread",1,NULL,'m'},
          {"no-privileges", 0, NULL, 'x'},
          {"no-acl", 0, NULL, 'x'},
          {"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
          {"disable-triggers", no_argument, &disable_triggers, 1},
          {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
          {"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+         {"truncate-before-load", no_argument, &truncate_before_load, 1},
          {"use-set-session-authorization", no_argument, &use_setsessauth, 1},

          {NULL, 0, NULL, 0}
***************
*** 139,145 ****
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
--- 142,148 ----
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
***************
*** 182,187 ****
--- 185,194 ----
                  opts->tocFile = strdup(optarg);
                  break;

+             case 'm':
+                 opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+                 break;
+
              case 'n':            /* Dump data for this schema only */
                  opts->schemaNames = strdup(optarg);
                  break;
***************
*** 262,268 ****
                  break;

              case 0:
!                 /* This covers the long options equivalent to -X xxx. */
                  break;

              case '1':            /* Restore data in a single transaction */
--- 269,278 ----
                  break;

              case 0:
!                 /*
!                  * This covers the long options without a short equivalent,
!                  * including those equivalent to -X xxx.
!                  */
                  break;

              case '1':            /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
      opts->noDataForFailedTables = no_data_for_failed_tables;
      opts->noTablespace = outputNoTablespaces;
      opts->use_setsessauth = use_setsessauth;
+     opts->truncate_before_load = truncate_before_load;
+
+     if (opts->single_txn)
+     {
+         if (opts->number_of_threads > 1)
+         {
+             write_msg(NULL, "single transaction not compatible with multi-threading");
+             exit(1);
+         }
+         else if (opts->truncate_before_load)
+         {
+             write_msg(NULL, "single transaction not compatible with truncate-before-load");
+             exit(1);
+         }
+     }

      if (opts->formatName)
      {
***************
*** 330,335 ****
--- 355,362 ----

      AH = OpenArchive(inputFileSpec, opts->format);

+     /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
      /* Let the archiver know how noisy to be */
      AH->verbose = opts->verbose;

***************
*** 351,356 ****
--- 378,385 ----

      if (opts->tocSummary)
          PrintTOCSummary(AH, opts);
+     else if (opts->number_of_threads > 1)
+         RestoreArchiveParallel(AH, opts);
      else
          RestoreArchive(AH, opts);


Re: parallel pg_restore - WIP patch

From
Philip Warner
Date:
>
> +                     if (strcmp(te->desc,"CONSTRAINT") == 0 ||
> +                         strcmp(te->desc,"FK CONSTRAINT") == 0 ||
> +                         strcmp(te->desc,"CHECK CONSTRAINT") == 0 ||
> +                         strcmp(te->desc,"TRIGGER") == 0 ||
> +                         strcmp(slots[i].te->desc,"CONSTRAINT") == 0 ||
> +                         strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 ||
> +                         strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 ||
> +                         strcmp(slots[i].te->desc,"TRIGGER") == 0)
>   
Really just an observation from the peanut gallery here, but every time
pg_restore hard-codes this kind of thing, it introduces yet another
possible side-effect bug when someone, e.g., adds a new TOC type.

Would it substantially decrease the benefits of the patch to skip *any*
TOC entry that shares dependencies with another, rather than just the
types listed above?
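
Something like this, perhaps (untested, and not part of the patch;
shares_dependency() and conflicts_with_running() are invented names,
using only the TocEntry and ParallelSlot fields the patch already
declares):

static bool
shares_dependency(TocEntry *a, TocEntry *b)
{
    int i, j;

    /* Conflict whenever two TOC entries share any dependency id. */
    for (i = 0; i < a->nDeps; i++)
        for (j = 0; j < b->nDeps; j++)
            if (a->dependencies[i] == b->dependencies[j])
                return true;
    return false;
}

static bool
conflicts_with_running(TocEntry *te, ParallelSlot *slots, int n_slots)
{
    int i;

    /* Refuse to schedule te beside any busy slot it conflicts with. */
    for (i = 0; i < n_slots; i++)
        if (slots[i].pid > 0 && shares_dependency(te, slots[i].te))
            return true;
    return false;
}

That drops the desc-type tests entirely, at whatever concurrency cost
that implies.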





Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Philip Warner wrote:
>> +                     if (strcmp(te->desc,"CONSTRAINT") == 0 ||
>> +                         strcmp(te->desc,"FK CONSTRAINT") == 0 ||
>> +                         strcmp(te->desc,"CHECK CONSTRAINT") == 0 ||
>> +                         strcmp(te->desc,"TRIGGER") == 0 ||
>> +                         strcmp(slots[i].te->desc,"CONSTRAINT") == 0 ||
>> +                         strcmp(slots[i].te->desc,"FK CONSTRAINT") == 0 ||
>> +                         strcmp(slots[i].te->desc,"CHECK CONSTRAINT") == 0 ||
>> +                         strcmp(slots[i].te->desc,"TRIGGER") == 0)
>>   
>>     
> Really just an observation from the peanut gallery here, but every time
> pg_restore hard-codes this kind of thing, it introduces yet another
> possible side-effect bug when someone, e.g., adds a new TOC type.
>
> Would it substantially decrease the benefits of the patch to skip *any*
> TOC entry that shares dependencies with another, rather than just the
> types listed above?
>
>
>   

Unfortunately, it quite possibly would. You would not be able to build 
two indexes on the same table in parallel, even though they wouldn't 
have conflicting locks.

cheers

andrew




Re: parallel pg_restore - WIP patch

From
Philip Warner
Date:
Andrew Dunstan wrote:
> Unfortunately, it quite possibly would. You would not be able to build
> two indexes on the same table in parallel, even though they wouldn't
> have conflicting locks.
I suppose so, but:

1. By the same logic it might speed things up; it might build two
completely separate indexes and thereby avoid (some kind of) contention.
In any case, it would most likely do *something* else. It should only
reduce performance if (a) it can do nothing or (b) there is a benefit in
building multiple indexes on the same table at the same time.

2. If there are a limited number of item types that share dependencies
but are known to be OK (i.e., indexes), perhaps list them in the inner
loop as exceptions and allow them to run in parallel. This would mean
that failing to list a new TOC item type results in worse performance
rather than a crash.
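
Untested sketch of what I mean, reusing shares_dependency() from my
earlier message (parallel_safe_type() and must_serialize() are again
invented names):

/*
 * Entry types vetted as safe to run concurrently even when they
 * share dependencies, e.g. two indexes on the same table.
 */
static bool
parallel_safe_type(TocEntry *te)
{
    return strcmp(te->desc, "INDEX") == 0;  /* extend as types are vetted */
}

static bool
must_serialize(TocEntry *a, TocEntry *b)
{
    /* No shared dependency: always safe to run in parallel. */
    if (!shares_dependency(a, b))
        return false;

    /* Otherwise conflict unless both types are on the allow-list. */
    return !(parallel_safe_type(a) && parallel_safe_type(b));
}

An unlisted new TOC type then falls back to serial restore instead of
misbehaving.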





Re: parallel pg_restore - WIP patch

From
Andrew Dunstan
Date:

Philip Warner wrote:
> Andrew Dunstan wrote:
>   
>> Unfortunately, it quite possibly would. You would not be able to build
>> two indexes on the same table in parallel, even though they wouldn't
>> have conflicting locks.
>>     
> I suppose so, but:
>
> 1. By the same logic it might speed things up; it might build two
> completely separate indexes and thereby avoid (some kind of) contention.
> In any case, it would most likely do *something* else. It should only
> reduce performance if (a) it can do nothing or (b) there is a benefit in
> building multiple indexes on the same table at the same time.
>
> 2. If there are a limited number of item types that share dependencies
> but are known to be OK (i.e., indexes), perhaps list them in the inner
> loop as exceptions and allow them to run in parallel. This would mean
> that failing to list a new TOC item type results in worse performance
> rather than a crash.
>
>
>   

I will look at it in due course. Right now my concern is simply to get 
something that works that we can do some testing with. I think that's 
what we have now (fingers crossed). Some parts of it are jury-rigged.

BTW, though, building indexes for the same table together is likely to 
be a win AIUI, especially given the recent work on synchronised scans.

cheers

andrew


Re: parallel pg_restore - WIP patch

From
Stefan Kaltenbrunner
Date:
Andrew Dunstan wrote:
> 
> 
> Stefan Kaltenbrunner wrote:
>> Tom Lane wrote:
>>> Andrew Dunstan <andrew@dunslane.net> writes:
>>>> Tom Lane wrote:
>>>>> Um, FKs could conflict with each other too, so that by itself isn't
>>>>> gonna fix anything.
>>>
>>>> Good point. Looks like we'll need to make a list of "can't run in 
>>>> parallel with" items as well as strict dependencies.
>>>
>>> Yeah, I was just thinking about that.  The current archive format
>>> doesn't really carry enough information for this.  I think there
>>> are two basic solutions we could adopt:
>>>
>>> * Extend the archive format to provide some indication that "restoring
>>> this object requires exclusive access to these dependencies".
>>>
>>> * Hardwire knowledge into pg_restore that certain types of objects
>>> require exclusive access to their dependencies.
>>>
>>> The former seems more flexible, as well as more in tune with the basic
>>> design assumption that pg_restore shouldn't have a lot of knowledge
>>> about individual archive object types.  But it would mean that you
>>> couldn't use parallel restore with any pre-8.4 dumps.  In the long run
>>> that's no big deal, but in the short run it's annoying.
>>
>> Hmm, not sure how much of a problem that really is; we usually
>> recommend using the pg_dump version of the target database anyway.
>>
>>
>>
>>
> 
> We don't really need a huge amount of hardwiring as it turns out. Here 
> is a version of the patch that tries to do what's needed in this area.

This one is much better. However, I still seem to be able to create
deadlock scenarios with strange FK relations, i.e., FKs going in both
directions between two tables.

For those interested, these are the timings on my 8-core testbox for my
test database:

single process restore: 169min
-m2: 101min
-m6: 64min
-m8: 63min
-m16: 56min


Stefan