Re: [HACKERS] Change in "policy" on dump ordering?

From: Tom Lane
Subject: Re: [HACKERS] Change in "policy" on dump ordering?
Msg-id: 18658.1501037135@sss.pgh.pa.us
In reply to: Re: [HACKERS] Change in "policy" on dump ordering?  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses: Re: [HACKERS] Change in "policy" on dump ordering?  (Robert Haas <robertmhaas@gmail.com>)
List: pgsql-hackers
I wrote:
> The main problem with Kevin's fix, after thinking about it more, is that
> it shoves matview refresh commands into the same final processing phase
> where ACLs are done, which means that in a parallel restore they will not
> be done in parallel.  That seems like a pretty serious objection, although
> maybe not so serious that we'd be willing to accept a major rewrite in the
> back branches to avoid it.

> I'm wondering at this point about having restore create a fake DO_ACLS
> object (fake in the sense that it isn't in the dump file) that would
> participate normally in the dependency sort, and which we'd give a
> priority before matview refreshes but after everything else.  "Restore"
> of that object would perform the same operation we do now of running
> through the whole TOC and emitting grants/revokes.  So it couldn't be
> parallelized in itself (at least not without an additional batch of work)
> but it could be treated as an indivisible parallelized task, and then the
> matview refreshes could be parallelizable tasks after that.

> There's also Peter's proposal of splitting up GRANTs from REVOKEs and
> putting only the latter at the end.  I'm not quite convinced that that's
> a good idea but it certainly merits consideration.

After studying things for a while, I've concluded that that last option
is probably not workable.  ACL items contain a blob of SQL that would be
tricky to pull apart, is both version- and options-dependent, and
contains ordering dependencies that seem likely to defeat any desire
to put the REVOKEs last anyway.

Instead, I've prepared the attached draft patch, which addresses the
problem by teaching pg_backup_archiver.c to process TOC entries in
three separate passes, "main" then ACLs then matview refreshes.
It's survived light testing but could doubtless use further review.

Another way we could attack this is to adopt something similar to
the PRE_DATA_BOUNDARY/POST_DATA_BOUNDARY mechanism; that is, invent more
dummy section boundary objects, add dependencies sufficient to constrain
all TOC objects to be before or after the appropriate boundaries, and
then let the dependency sort go at it.  But I think that way is probably
more expensive than this one, and it doesn't have any real advantage if
there's not a potential for circular dependencies that need to be broken.
If somebody else wants to try drafting a patch like that, I won't stand
in the way, but I don't wanna do so.

Not clear where we want to go from here.  Should we try to get this
into next month's minor releases, or review it in September's commitfest
and back-patch after that?

            regards, tom lane

diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 6123859..3687687 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef enum
*** 203,208 ****
--- 203,230 ----
      OUTPUT_OTHERDATA            /* writing data as INSERT commands */
  } ArchiverOutput;

+ /*
+  * For historical reasons, ACL items are interspersed with everything else in
+  * a dump file's TOC; typically they're right after the object they're for.
+  * However, we need to restore data before ACLs, as otherwise a read-only
+  * table (ie one where the owner has revoked her own INSERT privilege) causes
+  * data restore failures.  On the other hand, matview REFRESH commands should
+  * come out after ACLs, as otherwise non-superuser-owned matviews might not
+  * be able to execute.  (If the permissions at the time of dumping would not
+  * allow a REFRESH, too bad; we won't fix that for you.)  These considerations
+  * force us to make three passes over the TOC, restoring the appropriate
+  * subset of items in each pass.  We assume that the dependency sort resulted
+  * in an appropriate ordering of items within each subset.
+  */
+ typedef enum
+ {
+     RESTORE_PASS_MAIN = 0,        /* Main pass (most TOC item types) */
+     RESTORE_PASS_ACL,            /* ACL item types */
+     RESTORE_PASS_REFRESH        /* Matview REFRESH items */
+
+ #define RESTORE_PASS_LAST RESTORE_PASS_REFRESH
+ } RestorePass;
+
  typedef enum
  {
      REQ_SCHEMA = 0x01,            /* want schema */
*************** struct _archiveHandle
*** 329,334 ****
--- 351,357 ----
      int            noTocComments;
      ArchiverStage stage;
      ArchiverStage lastErrorStage;
+     RestorePass restorePass;    /* used only during parallel restore */
      struct _tocEntry *currentTE;
      struct _tocEntry *lastErrorTE;
  };
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index f461692..4cfb71c 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** static ArchiveHandle *_allocAH(const cha
*** 58,64 ****
           SetupWorkerPtrType setupWorkerPtr);
  static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
                        ArchiveHandle *AH);
! static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass);
  static char *replace_line_endings(const char *str);
  static void _doSetFixedOutputState(ArchiveHandle *AH);
  static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
--- 58,64 ----
           SetupWorkerPtrType setupWorkerPtr);
  static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
                        ArchiveHandle *AH);
! static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
  static char *replace_line_endings(const char *str);
  static void _doSetFixedOutputState(ArchiveHandle *AH);
  static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
*************** static void _selectTablespace(ArchiveHan
*** 71,76 ****
--- 71,77 ----
  static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
  static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
  static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt);
+ static RestorePass _tocEntryRestorePass(TocEntry *te);
  static bool _tocEntryIsACL(TocEntry *te);
  static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
  static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
*************** static OutputContext SaveOutput(ArchiveH
*** 86,98 ****
  static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);

  static int    restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
! static void restore_toc_entries_prefork(ArchiveHandle *AH);
! static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list);
- static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
                     TocEntry *ready_list,
                     ParallelState *pstate);
--- 87,104 ----
  static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);

  static int    restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
! static void restore_toc_entries_prefork(ArchiveHandle *AH,
!                             TocEntry *pending_list);
! static void restore_toc_entries_parallel(ArchiveHandle *AH,
!                              ParallelState *pstate,
!                              TocEntry *pending_list);
! static void restore_toc_entries_postfork(ArchiveHandle *AH,
                               TocEntry *pending_list);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
+ static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+                    RestorePass pass);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
                     TocEntry *ready_list,
                     ParallelState *pstate);
*************** RestoreArchive(Archive *AHX)
*** 625,644 ****
          AH->currSchema = NULL;
      }

-     /*
-      * In serial mode, we now process each non-ACL TOC entry.
-      *
-      * In parallel mode, turn control over to the parallel-restore logic.
-      */
      if (parallel_mode)
      {
          ParallelState *pstate;
          TocEntry    pending_list;

          par_list_header_init(&pending_list);

          /* This runs PRE_DATA items and then disconnects from the database */
!         restore_toc_entries_prefork(AH);
          Assert(AH->connection == NULL);

          /* ParallelBackupStart() will actually fork the processes */
--- 631,648 ----
          AH->currSchema = NULL;
      }

      if (parallel_mode)
      {
+         /*
+          * In parallel mode, turn control over to the parallel-restore logic.
+          */
          ParallelState *pstate;
          TocEntry    pending_list;

          par_list_header_init(&pending_list);

          /* This runs PRE_DATA items and then disconnects from the database */
!         restore_toc_entries_prefork(AH, &pending_list);
          Assert(AH->connection == NULL);

          /* ParallelBackupStart() will actually fork the processes */
*************** RestoreArchive(Archive *AHX)
*** 652,679 ****
      }
      else
      {
          for (te = AH->toc->next; te != AH->toc; te = te->next)
!             (void) restore_toc_entry(AH, te, false);
!     }

!     /*
!      * Scan TOC again to output ownership commands and ACLs
!      */
!     for (te = AH->toc->next; te != AH->toc; te = te->next)
!     {
!         AH->currentTE = te;

!         /* Both schema and data objects might now have ownership/ACLs */
!         if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
          {
!             /* Show namespace if available */
!             if (te->namespace)
!                 ahlog(AH, 1, "setting owner and privileges for %s \"%s.%s\"\n",
!                       te->desc, te->namespace, te->tag);
!             else
!                 ahlog(AH, 1, "setting owner and privileges for %s \"%s\"\n",
!                       te->desc, te->tag);
!             _printTocEntry(AH, te, false, true);
          }
      }

--- 656,706 ----
      }
      else
      {
+         /*
+          * In serial mode, process everything in three phases: normal items,
+          * then ACLs, then matview refresh items.  We might be able to skip
+          * one or both extra phases in some cases, eg data-only restores.
+          */
+         bool        haveACL = false;
+         bool        haveRefresh = false;
+
          for (te = AH->toc->next; te != AH->toc; te = te->next)
!         {
!             if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
!                 continue;        /* ignore if not to be dumped at all */

!             switch (_tocEntryRestorePass(te))
!             {
!                 case RESTORE_PASS_MAIN:
!                     (void) restore_toc_entry(AH, te, false);
!                     break;
!                 case RESTORE_PASS_ACL:
!                     haveACL = true;
!                     break;
!                 case RESTORE_PASS_REFRESH:
!                     haveRefresh = true;
!                     break;
!             }
!         }

!         if (haveACL)
          {
!             for (te = AH->toc->next; te != AH->toc; te = te->next)
!             {
!                 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
!                     _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
!                     (void) restore_toc_entry(AH, te, false);
!             }
!         }
!
!         if (haveRefresh)
!         {
!             for (te = AH->toc->next; te != AH->toc; te = te->next)
!             {
!                 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
!                     _tocEntryRestorePass(te) == RESTORE_PASS_REFRESH)
!                     (void) restore_toc_entry(AH, te, false);
!             }
          }
      }

*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 720,729 ****
      AH->currentTE = te;

      /* Work out what, if anything, we want from this entry */
!     if (_tocEntryIsACL(te))
!         reqs = 0;                /* ACLs are never restored here */
!     else
!         reqs = te->reqs;

      /*
       * Ignore DATABASE entry unless we should create it.  We must check this
--- 747,753 ----
      AH->currentTE = te;

      /* Work out what, if anything, we want from this entry */
!     reqs = te->reqs;

      /*
       * Ignore DATABASE entry unless we should create it.  We must check this
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 744,760 ****

      defnDumped = false;

!     if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
      {
!         /* Show namespace if available */
          if (te->namespace)
              ahlog(AH, 1, "creating %s \"%s.%s\"\n",
                    te->desc, te->namespace, te->tag);
          else
              ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);

!
!         _printTocEntry(AH, te, false, false);
          defnDumped = true;

          if (strcmp(te->desc, "TABLE") == 0)
--- 768,786 ----

      defnDumped = false;

!     /*
!      * If it has a schema component that we want, then process that
!      */
!     if ((reqs & REQ_SCHEMA) != 0)
      {
!         /* Show namespace in log message if available */
          if (te->namespace)
              ahlog(AH, 1, "creating %s \"%s.%s\"\n",
                    te->desc, te->namespace, te->tag);
          else
              ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag);

!         _printTocEntry(AH, te, false);
          defnDumped = true;

          if (strcmp(te->desc, "TABLE") == 0)
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 810,816 ****
      }

      /*
!      * If we have a data component, then process it
       */
      if ((reqs & REQ_DATA) != 0)
      {
--- 836,842 ----
      }

      /*
!      * If it has a data component that we want, then process that
       */
      if ((reqs & REQ_DATA) != 0)
      {
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 826,832 ****
               */
              if (AH->PrintTocDataPtr != NULL)
              {
!                 _printTocEntry(AH, te, true, false);

                  if (strcmp(te->desc, "BLOBS") == 0 ||
                      strcmp(te->desc, "BLOB COMMENTS") == 0)
--- 852,858 ----
               */
              if (AH->PrintTocDataPtr != NULL)
              {
!                 _printTocEntry(AH, te, true);

                  if (strcmp(te->desc, "BLOBS") == 0 ||
                      strcmp(te->desc, "BLOB COMMENTS") == 0)
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 914,920 ****
          {
              /* If we haven't already dumped the defn part, do so now */
              ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!             _printTocEntry(AH, te, false, false);
          }
      }

--- 940,946 ----
          {
              /* If we haven't already dumped the defn part, do so now */
              ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!             _printTocEntry(AH, te, false);
          }
      }

*************** _tocEntryRequired(TocEntry *te, teSectio
*** 2944,2950 ****
--- 2970,2998 ----
  }

  /*
+  * Identify which pass we should restore this TOC entry in.
+  *
+  * See notes with the RestorePass typedef in pg_backup_archiver.h.
+  */
+ static RestorePass
+ _tocEntryRestorePass(TocEntry *te)
+ {
+     /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+     if (strcmp(te->desc, "ACL") == 0 ||
+         strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+         strcmp(te->desc, "DEFAULT ACL") == 0)
+         return RESTORE_PASS_ACL;
+     if (strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
+         return RESTORE_PASS_REFRESH;
+     return RESTORE_PASS_MAIN;
+ }
+
+ /*
   * Identify TOC entries that are ACLs.
+  *
+  * Note: it seems worth duplicating some code here to avoid a hard-wired
+  * assumption that these are exactly the same entries that we restore during
+  * the RESTORE_PASS_ACL phase.
   */
  static bool
  _tocEntryIsACL(TocEntry *te)
*************** _getObjectDescription(PQExpBuffer buf, T
*** 3364,3386 ****
                type);
  }

  static void
! _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass)
  {
      RestoreOptions *ropt = AH->public.ropt;

-     /* ACLs are dumped only during acl pass */
-     if (acl_pass)
-     {
-         if (!_tocEntryIsACL(te))
-             return;
-     }
-     else
-     {
-         if (_tocEntryIsACL(te))
-             return;
-     }
-
      /*
       * Avoid dumping the public schema, as it will already be created ...
       * unless we are using --clean mode (and *not* --create mode), in which
--- 3412,3429 ----
                type);
  }

+ /*
+  * Emit the SQL commands to create the object represented by a TOC entry
+  *
+  * This now also includes issuing an ALTER OWNER command to restore the
+  * object's ownership, if wanted.  But note that the object's permissions
+  * will remain at default, until the matching ACL TOC entry is restored.
+  */
  static void
! _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
  {
      RestoreOptions *ropt = AH->public.ropt;

      /*
       * Avoid dumping the public schema, as it will already be created ...
       * unless we are using --clean mode (and *not* --create mode), in which
*************** _printTocEntry(ArchiveHandle *AH, TocEnt
*** 3567,3573 ****
       * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
       * commands, so we can no longer assume we know the current auth setting.
       */
!     if (acl_pass)
      {
          if (AH->currUser)
              free(AH->currUser);
--- 3610,3616 ----
       * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
       * commands, so we can no longer assume we know the current auth setting.
       */
!     if (_tocEntryIsACL(te))
      {
          if (AH->currUser)
              free(AH->currUser);
*************** replace_line_endings(const char *str)
*** 3597,3602 ****
--- 3640,3648 ----
      return result;
  }

+ /*
+  * Write the file header for a custom-format archive
+  */
  void
  WriteHead(ArchiveHandle *AH)
  {
*************** dumpTimestamp(ArchiveHandle *AH, const c
*** 3772,3787 ****
  /*
   * Main engine for parallel restore.
   *
!  * Work is done in three phases.
!  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
!  * just as for a standard restore.  Second we process the remaining non-ACL
!  * steps in parallel worker children (threads on Windows, processes on Unix),
!  * each of which connects separately to the database.  Finally we process all
!  * the ACL entries in a single connection (that happens back in
!  * RestoreArchive).
   */
  static void
! restore_toc_entries_prefork(ArchiveHandle *AH)
  {
      bool        skipped_some;
      TocEntry   *next_work_item;
--- 3818,3831 ----
  /*
   * Main engine for parallel restore.
   *
!  * Parallel restore is done in three phases.  In this first phase,
!  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
!  * processed in the RESTORE_PASS_MAIN pass.  (In practice, that's all
!  * PRE_DATA items other than ACLs.)  Entries we can't process now are
!  * added to the pending_list for later phases to deal with.
   */
  static void
! restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
  {
      bool        skipped_some;
      TocEntry   *next_work_item;
*************** restore_toc_entries_prefork(ArchiveHandl
*** 3799,3821 ****
       * about showing all the dependencies of SECTION_PRE_DATA items, so we do
       * not risk trying to process them out-of-order.
       *
       * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
       * before DATA items, and all DATA items before POST_DATA items.  That is
       * not certain to be true in older archives, though, so this loop is coded
       * to not assume it.
       */
      skipped_some = false;
      for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
      {
!         /* NB: process-or-continue logic must be the inverse of loop below */
          if (next_work_item->section != SECTION_PRE_DATA)
          {
              /* DATA and POST_DATA items are just ignored for now */
              if (next_work_item->section == SECTION_DATA ||
                  next_work_item->section == SECTION_POST_DATA)
              {
                  skipped_some = true;
-                 continue;
              }
              else
              {
--- 3843,3873 ----
       * about showing all the dependencies of SECTION_PRE_DATA items, so we do
       * not risk trying to process them out-of-order.
       *
+      * Stuff that we can't do immediately gets added to the pending_list.
+      * Note: we don't yet filter out entries that aren't going to be restored.
+      * They might participate in dependency chains connecting entries that
+      * should be restored, so we treat them as live until we actually process
+      * them.
+      *
       * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
       * before DATA items, and all DATA items before POST_DATA items.  That is
       * not certain to be true in older archives, though, so this loop is coded
       * to not assume it.
       */
+     AH->restorePass = RESTORE_PASS_MAIN;
      skipped_some = false;
      for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
      {
!         bool        do_now = true;
!
          if (next_work_item->section != SECTION_PRE_DATA)
          {
              /* DATA and POST_DATA items are just ignored for now */
              if (next_work_item->section == SECTION_DATA ||
                  next_work_item->section == SECTION_POST_DATA)
              {
+                 do_now = false;
                  skipped_some = true;
              }
              else
              {
*************** restore_toc_entries_prefork(ArchiveHandl
*** 3826,3843 ****
                   * comment's dependencies are satisfied, so skip it for now.
                   */
                  if (skipped_some)
!                     continue;
              }
          }

!         ahlog(AH, 1, "processing item %d %s %s\n",
!               next_work_item->dumpId,
!               next_work_item->desc, next_work_item->tag);

!         (void) restore_toc_entry(AH, next_work_item, false);

!         /* there should be no touch of ready_list here, so pass NULL */
!         reduce_dependencies(AH, next_work_item, NULL);
      }

      /*
--- 3878,3912 ----
                   * comment's dependencies are satisfied, so skip it for now.
                   */
                  if (skipped_some)
!                     do_now = false;
              }
          }

!         /*
!          * Also skip items that need to be forced into later passes.  We need
!          * not set skipped_some in this case, since by assumption no main-pass
!          * items could depend on these.
!          */
!         if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
!             do_now = false;

!         if (do_now)
!         {
!             /* OK, restore the item and update its dependencies */
!             ahlog(AH, 1, "processing item %d %s %s\n",
!                   next_work_item->dumpId,
!                   next_work_item->desc, next_work_item->tag);

!             (void) restore_toc_entry(AH, next_work_item, false);
!
!             /* there should be no touch of ready_list here, so pass NULL */
!             reduce_dependencies(AH, next_work_item, NULL);
!         }
!         else
!         {
!             /* Nope, so add it to pending_list */
!             par_list_append(pending_list, next_work_item);
!         }
      }

      /*
*************** restore_toc_entries_prefork(ArchiveHandl
*** 3863,3951 ****
  /*
   * Main engine for parallel restore.
   *
!  * Work is done in three phases.
!  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
!  * just as for a standard restore. This is done in restore_toc_entries_prefork().
!  * Second we process the remaining non-ACL steps in parallel worker children
!  * (threads on Windows, processes on Unix), these fork off and set up their
!  * connections before we call restore_toc_entries_parallel_forked.
!  * Finally we process all the ACL entries in a single connection (that happens
!  * back in RestoreArchive).
   */
  static void
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
-     bool        skipped_some;
      TocEntry    ready_list;
      TocEntry   *next_work_item;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

      /*
!      * Initialize the lists of ready items, the list for pending items has
!      * already been initialized in the caller.  After this setup, the pending
!      * list is everything that needs to be done but is blocked by one or more
!      * dependencies, while the ready list contains items that have no
!      * remaining dependencies. Note: we don't yet filter out entries that
!      * aren't going to be restored. They might participate in dependency
!      * chains connecting entries that should be restored, so we treat them as
!      * live until we actually process them.
       */
      par_list_header_init(&ready_list);
!     skipped_some = false;
!     for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
!     {
!         /* NB: process-or-continue logic must be the inverse of loop above */
!         if (next_work_item->section == SECTION_PRE_DATA)
!         {
!             /* All PRE_DATA items were dealt with above */
!             continue;
!         }
!         if (next_work_item->section == SECTION_DATA ||
!             next_work_item->section == SECTION_POST_DATA)
!         {
!             /* set this flag at same point that previous loop did */
!             skipped_some = true;
!         }
!         else
!         {
!             /* SECTION_NONE items must be processed if previous loop didn't */
!             if (!skipped_some)
!                 continue;
!         }
!
!         if (next_work_item->depCount > 0)
!             par_list_append(pending_list, next_work_item);
!         else
!             par_list_append(&ready_list, next_work_item);
!     }

      /*
       * main parent loop
       *
       * Keep going until there is no worker still running AND there is no work
!      * left to be done.
       */
-
      ahlog(AH, 1, "entering main parallel loop\n");

!     while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
!            !IsEveryWorkerIdle(pstate))
      {
          if (next_work_item != NULL)
          {
              /* If not to be restored, don't waste time launching a worker */
!             if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0 ||
!                 _tocEntryIsACL(next_work_item))
              {
                  ahlog(AH, 1, "skipping item %d %s %s\n",
                        next_work_item->dumpId,
                        next_work_item->desc, next_work_item->tag);
!
                  par_list_remove(next_work_item);
                  reduce_dependencies(AH, next_work_item, &ready_list);
!
                  continue;
              }

--- 3932,3991 ----
  /*
   * Main engine for parallel restore.
   *
!  * Parallel restore is done in three phases.  In this second phase,
!  * we process entries by dispatching them to parallel worker children
!  * (processes on Unix, threads on Windows), each of which connects
!  * separately to the database.  Inter-entry dependencies are respected,
!  * and so is the RestorePass multi-pass structure.  When we can no longer
!  * make any entries ready to process, we exit.  Normally, there will be
!  * nothing left to do; but if there is, the third phase will mop up.
   */
  static void
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
      TocEntry    ready_list;
      TocEntry   *next_work_item;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

      /*
!      * The pending_list contains all items that we need to restore.  Move all
!      * items that are available to process immediately into the ready_list.
!      * After this setup, the pending list is everything that needs to be done
!      * but is blocked by one or more dependencies, while the ready list
!      * contains items that have no remaining dependencies and are OK to
!      * process in the current restore pass.
       */
      par_list_header_init(&ready_list);
!     AH->restorePass = RESTORE_PASS_MAIN;
!     move_to_ready_list(pending_list, &ready_list, AH->restorePass);

      /*
       * main parent loop
       *
       * Keep going until there is no worker still running AND there is no work
!      * left to be done.  Note invariant: at top of loop, there should always
!      * be at least one worker available to dispatch a job to.
       */
      ahlog(AH, 1, "entering main parallel loop\n");

!     for (;;)
      {
+         /* Look for an item ready to be dispatched to a worker */
+         next_work_item = get_next_work_item(AH, &ready_list, pstate);
          if (next_work_item != NULL)
          {
              /* If not to be restored, don't waste time launching a worker */
!             if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
              {
                  ahlog(AH, 1, "skipping item %d %s %s\n",
                        next_work_item->dumpId,
                        next_work_item->desc, next_work_item->tag);
!                 /* Drop it from ready_list, and update its dependencies */
                  par_list_remove(next_work_item);
                  reduce_dependencies(AH, next_work_item, &ready_list);
!                 /* Loop around to see if anything else can be dispatched */
                  continue;
              }

*************** restore_toc_entries_parallel(ArchiveHand
*** 3953,3966 ****
                    next_work_item->dumpId,
                    next_work_item->desc, next_work_item->tag);

              par_list_remove(next_work_item);

              DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
                                     mark_restore_job_done, &ready_list);
          }
          else
          {
!             /* at least one child is working and we have nothing ready. */
          }

          /*
--- 3993,4026 ----
                    next_work_item->dumpId,
                    next_work_item->desc, next_work_item->tag);

+             /* Remove it from ready_list, and dispatch to some worker */
              par_list_remove(next_work_item);

              DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
                                     mark_restore_job_done, &ready_list);
          }
+         else if (IsEveryWorkerIdle(pstate))
+         {
+             /*
+              * Nothing is ready and no worker is running, so we're done with
+              * the current pass or maybe with the whole process.
+              */
+             if (AH->restorePass == RESTORE_PASS_LAST)
+                 break;            /* No more parallel processing is possible */
+
+             /* Advance to next restore pass */
+             AH->restorePass++;
+             /* That probably allows some stuff to be made ready */
+             move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+             /* Loop around to see if anything's now ready */
+             continue;
+         }
          else
          {
!             /*
!              * We have nothing ready, but at least one child is working, so
!              * wait for some subjob to finish.
!              */
          }

          /*
*************** restore_toc_entries_parallel(ArchiveHand
*** 3980,3988 ****
--- 4040,4060 ----
                         next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
      }

+     /* There should now be nothing in ready_list. */
+     Assert(ready_list.par_next == &ready_list);
+
      ahlog(AH, 1, "finished main parallel loop\n");
  }

+ /*
+  * Main engine for parallel restore.
+  *
+  * Parallel restore is done in three phases.  In this third phase,
+  * we mop up any remaining TOC entries by processing them serially.
+  * This phase normally should have nothing to do, but if we've somehow
+  * gotten stuck due to circular dependencies or some such, this provides
+  * at least some chance of completing the restore successfully.
+  */
  static void
  restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
  {
*************** restore_toc_entries_postfork(ArchiveHand
*** 4002,4010 ****
      _doSetFixedOutputState(AH);

      /*
!      * Make sure there is no non-ACL work left due to, say, circular
!      * dependencies, or some other pathological condition. If so, do it in the
!      * single parent connection.
       */
      for (te = pending_list->par_next; te != pending_list; te = te->par_next)
      {
--- 4074,4083 ----
      _doSetFixedOutputState(AH);

      /*
!      * Make sure there is no work left due to, say, circular dependencies, or
!      * some other pathological condition.  If so, do it in the single parent
!      * connection.  We don't sweat about RestorePass ordering; it's likely we
!      * already violated that.
       */
      for (te = pending_list->par_next; te != pending_list; te = te->par_next)
      {
*************** restore_toc_entries_postfork(ArchiveHand
*** 4012,4019 ****
                te->dumpId, te->desc, te->tag);
          (void) restore_toc_entry(AH, te, false);
      }
-
-     /* The ACLs will be handled back in RestoreArchive. */
  }

  /*
--- 4085,4090 ----
*************** par_list_remove(TocEntry *te)
*** 4073,4078 ****
--- 4144,4179 ----


  /*
+  * Move all immediately-ready items from pending_list to ready_list.
+  *
+  * Items are considered ready if they have no remaining dependencies and
+  * they belong in the current restore pass.  (See also reduce_dependencies,
+  * which applies the same logic one-at-a-time.)
+  */
+ static void
+ move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
+                    RestorePass pass)
+ {
+     TocEntry   *te;
+     TocEntry   *next_te;
+
+     for (te = pending_list->par_next; te != pending_list; te = next_te)
+     {
+         /* must save list link before possibly moving te to other list */
+         next_te = te->par_next;
+
+         if (te->depCount == 0 &&
+             _tocEntryRestorePass(te) == pass)
+         {
+             /* Remove it from pending_list ... */
+             par_list_remove(te);
+             /* ... and add to ready_list */
+             par_list_append(ready_list, te);
+         }
+     }
+ }
+
+ /*
   * Find the next work item (if any) that is capable of being run now.
   *
   * To qualify, the item must have no remaining dependencies
*************** reduce_dependencies(ArchiveHandle *AH, T
*** 4457,4464 ****
      {
          TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];

          otherte->depCount--;
!         if (otherte->depCount == 0 && otherte->par_prev != NULL)
          {
              /* It must be in the pending list, so remove it ... */
              par_list_remove(otherte);
--- 4558,4574 ----
      {
          TocEntry   *otherte = AH->tocsByDumpId[te->revDeps[i]];

+         Assert(otherte->depCount > 0);
          otherte->depCount--;
!
!         /*
!          * It's ready if it has no remaining dependencies and it belongs in
!          * the current restore pass.  However, don't move it if it has not yet
!          * been put into the pending list.
!          */
!         if (otherte->depCount == 0 &&
!             _tocEntryRestorePass(otherte) == AH->restorePass &&
!             otherte->par_prev != NULL)
          {
              /* It must be in the pending list, so remove it ... */
              par_list_remove(otherte);

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers