Discussion: pg_dump test instability


pg_dump test instability

From:
Peter Eisentraut
Date:
During the development of an unrelated feature, I have experienced
failures in a pg_dump test case, specifically

t/002_pg_dump.pl ....... 1825/6052
#   Failed test 'defaults_parallel: should not dump COPY
fk_reference_test_table second'
#   at t/002_pg_dump.pl line 3454.

This test sets up two tables connected by a foreign key and checks that
a data_only dump dumps them ordered so that the primary key table comes
first.

But because of the way the tests are set up, it also checks that in all
other dumps (i.e., non-data_only) it does *not* dump them in that order.
This is kind of irrelevant to the test, but there is currently no way in
the pg_dump tests to say that certain scenarios should not be checked.

In a non-data_only dump, the order of the tables doesn't matter, because
the foreign keys are added at the very end.  In parallel dumps, the
tables are in addition sorted by size, so the resultant order is
different from a single-threaded dump.  This can be seen by comparing
the dumped TOCs of the defaults_dir_format and defaults_parallel cases.
But it all happens to pass the tests right now.

In my hacking I have added another test table to the pg_dump test set,
which seems to have thrown off the sorting and scheduling, so that the
two tables now happen to come out in primary-key-first order anyway,
which causes the test to fail.

I have developed the attached rough patch to add a third option to
pg_dump test cases: besides like and unlike, add a "skip" option to
disregard the result of the test.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachments

Re: pg_dump test instability

From:
Stephen Frost
Date:
Greetings,

* Peter Eisentraut (peter.eisentraut@2ndquadrant.com) wrote:
> During the development of an unrelated feature, I have experienced
> failures in a pg_dump test case, specifically
>
> t/002_pg_dump.pl ....... 1825/6052
> #   Failed test 'defaults_parallel: should not dump COPY
> fk_reference_test_table second'
> #   at t/002_pg_dump.pl line 3454.
>
> This test sets up two tables connected by a foreign key and checks that
> a data_only dump dumps them ordered so that the primary key table comes
> first.
>
> But because of the way the tests are set up, it also checks that in all
> other dumps (i.e., non-data_only) it does *not* dump them in that order.
> This is kind of irrelevant to the test, but there is currently no way in
> the pg_dump tests to say that certain scenarios should not be checked.

Hmmm, yeah, that's a good point.  Most of the checks are set up that way
because it makes writing checks simpler and we have fewer of them, but
in this case we don't want that requirement to be levied on
non-data-only dumps.

> In a non-data_only dump, the order of the tables doesn't matter, because
> the foreign keys are added at the very end.  In parallel dumps, the
> tables are in addition sorted by size, so the resultant order is
> different from a single-threaded dump.  This can be seen by comparing
> the dumped TOCs of the defaults_dir_format and defaults_parallel cases.
> But it all happens to pass the tests right now.

Occasionally I get lucky, apparently. :)  Though I think this might have
been different before I reworked those tests to be simpler.

> In my hacking I have added another test table to the pg_dump test set,
> which seems to have thrown off the sorting and scheduling, so that the
> two tables now happen to come out in primary-key-first order anyway,
> which causes the test to fail.

Ok.

> I have developed the attached rough patch to add a third option to
> pg_dump test cases: besides like and unlike, add a "skip" option to
> disregard the result of the test.

If I read this correctly, the actual test isn't run at all (though, of
course, the tables are created and such).

In any case though, this doesn't completely solve the problem, does it?
Skipping 'defaults' also causes 'defaults_parallel' to be skipped and
therefore avoids the issue for now, but if some other change caused the
ordering to be different in the regular (non-data_only) cases then this
test would blow up again.

Looking back at the 9.6 tests, tests were only run for the runs where
they were explicitly specified, which led to tests being missed that
shouldn't have been, and that led to the approach now used where every
test is checked against every run.

As this test should really only ever be applied to the 'data_only' run,
it seems like we should have a 'run' list and for this test that would
be just 'data_only'.  I haven't looked into what's supposed to happen
here in a parallel data-only test, but if we expect the ordering to
still be honored then we should probably have a test for that.

So, in short, I don't really agree with this 'skip' approach, as it
doesn't properly solve this problem; I would rather see an 'only_runs'
option or similar where this test is only tried against the data_only
run (and possibly a data_only_parallel_restore one, if one such was
added).  In any case, please be sure to also update the documentation
under 'Definition of the tests to run.' (about line 335) whenever the
set of options that can be specified for the tests is changed, and
let's strongly discourage the use of this feature for most tests, as
these kinds of one-offs are quite rare.

Thanks!

Stephen

Attachments

Re: pg_dump test instability

From:
Tom Lane
Date:
Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
> In a non-data_only dump, the order of the tables doesn't matter, because
> the foreign keys are added at the very end.  In parallel dumps, the
> tables are in addition sorted by size, so the resultant order is
> different from a single-threaded dump.  This can be seen by comparing
> the dumped TOCs of the defaults_dir_format and defaults_parallel cases.
> But it all happens to pass the tests right now.

I noticed that business about sorting the TOC by size yesterday.
I think that's a completely bletcherous hack, and we ought to get
rid of it in favor of keeping the TOC order the same between parallel
and non-parallel cases, and instead doing size comparisons during
parallel worker dispatch.
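
To illustrate the idea with a rough, standalone sketch (stand-in types
here, not pg_dump's real TocEntry or dispatch machinery): leave the TOC
itself in its normal order, but sort the list of candidate data-dump
jobs largest-first just before handing them out to workers.

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct
    {
        int         dump_id;        /* stand-in for a TOC entry's dumpId */
        long long   data_length;    /* size estimate, e.g. relpages */
    } Job;

    /* sort by decreasing size, breaking ties on dump_id for stability */
    static int
    job_size_compare(const void *a, const void *b)
    {
        const Job  *j1 = (const Job *) a;
        const Job  *j2 = (const Job *) b;

        if (j1->data_length != j2->data_length)
            return (j1->data_length > j2->data_length) ? -1 : 1;
        return (j1->dump_id > j2->dump_id) - (j1->dump_id < j2->dump_id);
    }

    int
    main(void)
    {
        Job         jobs[] = {{1, 10}, {2, 5000}, {3, 300}};
        int         njobs = sizeof(jobs) / sizeof(jobs[0]);

        qsort(jobs, njobs, sizeof(Job), job_size_compare);

        /* in pg_dump proper, this loop would dispatch each job to a worker */
        for (int i = 0; i < njobs; i++)
            printf("dispatch dump job %d (size %lld)\n",
                   jobs[i].dump_id, jobs[i].data_length);
        return 0;
    }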

The primary reason why it's a bletcherous hack is that it's designed
around the assumption that parallel dumps will be restored in parallel
and non-parallel dumps will be restored non-parallel.  That assumption
is wrong on its face.  But it explains why the code sorts both tables
and indexes, even though pg_dump doesn't have any need to parallelize
index dumps: it's expecting that a subsequent parallel restore will have
use for building indexes largest-first.  Of course, if you made the dump
non-parallel, you're out of luck on that.

Another reason why the code is bogus is that it sorts only a consecutive
sequence of DO_INDEX items.  This fails to work at all for indexes that
are constraints, and even for ones that aren't, there's no very good
reason to assume that they aren't interspersed with other sorts of
objects due to dependencies.  So I suspect an awful lot of parallelism
is being left on the table so far as indexes are concerned.

So if we made this less of a quick-n-dirty kluge, we could remove the
hazard of different-looking dump results in parallel and serial cases,
and probably improve restore performance as well.

A small problem with postponing sorting to the restore side is that
I don't think we record relpages info anywhere in the archive format.
However, at least for the directory-format case (which I think is the
only one supported for parallel restore), we could make it compare the
file sizes of the TABLE DATA items.  That'd work pretty well as a proxy
for both the amount of effort needed for table restore, and the amount
of effort needed to build indexes on the tables afterwards.
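
For example (a standalone sketch with a hypothetical file name, not the
real archive-handling code), getting the size of a directory-format data
file is just a stat() call:

    #include <stdio.h>
    #include <sys/stat.h>

    int
    main(void)
    {
        struct stat st;
        const char *fname = "dumpdir/1234.dat";   /* hypothetical TABLE DATA file */

        if (stat(fname, &st) == 0)
            printf("%s: %lld bytes\n", fname, (long long) st.st_size);
        else
            perror(fname);
        return 0;
    }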

            regards, tom lane


Re: pg_dump test instability

From:
Stephen Frost
Date:
Greetings,

* Tom Lane (tgl@sss.pgh.pa.us) wrote:
> Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
> > In a non-data_only dump, the order of the tables doesn't matter, because
> > the foreign keys are added at the very end.  In parallel dumps, the
> > tables are in addition sorted by size, so the resultant order is
> > different from a single-threaded dump.  This can be seen by comparing
> > the dumped TOCs of the defaults_dir_format and defaults_parallel cases.
> > But it all happens to pass the tests right now.
>
> I noticed that business about sorting the TOC by size yesterday.
> I think that's a completely bletcherous hack, and we ought to get
> rid of it in favor of keeping the TOC order the same between parallel
> and non-parallel cases, and instead doing size comparisons during
> parallel worker dispatch.

So instead of dumping things by the order of the TOC, we'll perform the
sorting later on, before handing out jobs to workers?  That seems alright
to me for the most part.  One thing I do wonder about is whether we
should also be sorting by tablespace and not just size, to try to
maximize throughput (that is, assign parallel workers to each
tablespace, each going after the largest table in that tablespace,
before coming back around to assigning the next-largest file to the
second worker on a given tablespace, presuming we have more workers
than tablespaces); that's what we've seen work rather well in
pgbackrest.
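
As a rough standalone sketch of that scheduling (all names here are
hypothetical, not anything pg_dump or pgbackrest actually has): sort
each tablespace's tables largest-first, then hand work out round-robin
across tablespaces, so every pass gives each tablespace its next-largest
remaining table.

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct
    {
        const char *name;           /* tablespace name */
        long long  *sizes;          /* table sizes in this tablespace */
        int         ntables;
        int         next;           /* next table to hand out */
    } TablespaceQueue;

    /* qsort comparator: largest size first */
    static int
    size_desc(const void *a, const void *b)
    {
        long long   s1 = *(const long long *) a;
        long long   s2 = *(const long long *) b;

        return (s1 < s2) - (s1 > s2);
    }

    int
    main(void)
    {
        long long   ts1_sizes[] = {400, 90, 10};
        long long   ts2_sizes[] = {700, 20};
        TablespaceQueue q[] = {
            {"pg_default", ts1_sizes, 3, 0},
            {"fastdisk", ts2_sizes, 2, 0},
        };
        int         nq = 2;
        int         remaining = 5;

        for (int i = 0; i < nq; i++)
            qsort(q[i].sizes, q[i].ntables, sizeof(long long), size_desc);

        /* round-robin: one table per tablespace per pass, largest first */
        while (remaining > 0)
        {
            for (int i = 0; i < nq; i++)
            {
                if (q[i].next < q[i].ntables)
                {
                    printf("dispatch %s: table of size %lld\n",
                           q[i].name, q[i].sizes[q[i].next]);
                    q[i].next++;
                    remaining--;
                }
            }
        }
        return 0;
    }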

> However, at least for the directory-format case (which I think is the
> only one supported for parallel restore), we could make it compare the
> file sizes of the TABLE DATA items.  That'd work pretty well as a proxy
> for both the amount of effort needed for table restore, and the amount
> of effort needed to build indexes on the tables afterwards.

Parallel restore also works w/ custom-format dumps.

Thanks!

Stephen

Attachments

Re: pg_dump test instability

From:
Tom Lane
Date:
Stephen Frost <sfrost@snowman.net> writes:
> * Tom Lane (tgl@sss.pgh.pa.us) wrote:
>> However, at least for the directory-format case (which I think is the
>> only one supported for parallel restore), we could make it compare the
>> file sizes of the TABLE DATA items.  That'd work pretty well as a proxy
>> for both the amount of effort needed for table restore, and the amount
>> of effort needed to build indexes on the tables afterwards.

> Parallel restore also works w/ custom-format dumps.

Really.  Well then the existing code is even more broken, because it
only does this sorting for directory output:

    /* If we do a parallel dump, we want the largest tables to go first */
    if (archiveFormat == archDirectory && numWorkers > 1)
        sortDataAndIndexObjectsBySize(dobjs, numObjs);

so that parallel restore is completely left in the lurch with a
custom-format dump.

But I imagine we can get some measure of table data size out of a custom
dump too.

            regards, tom lane


Re: pg_dump test instability

From:
Stephen Frost
Date:
Greetings,

* Tom Lane (tgl@sss.pgh.pa.us) wrote:
> Stephen Frost <sfrost@snowman.net> writes:
> > * Tom Lane (tgl@sss.pgh.pa.us) wrote:
> >> However, at least for the directory-format case (which I think is the
> >> only one supported for parallel restore), we could make it compare the
> >> file sizes of the TABLE DATA items.  That'd work pretty well as a proxy
> >> for both the amount of effort needed for table restore, and the amount
> >> of effort needed to build indexes on the tables afterwards.
>
> > Parallel restore also works w/ custom-format dumps.
>
> Really.  Well then the existing code is even more broken, because it
> only does this sorting for directory output:
>
>     /* If we do a parallel dump, we want the largest tables to go first */
>     if (archiveFormat == archDirectory && numWorkers > 1)
>         sortDataAndIndexObjectsBySize(dobjs, numObjs);
>
> so that parallel restore is completely left in the lurch with a
> custom-format dump.

Sorry for not being clear: it's only possible to parallel *dump* to a
directory-format dump, and the above code is for performing that
sort-by-size before executing a parallel dump.  One might wonder why
there's the check for archiveFormat at all, though; numWorkers shouldn't
be able to be >1 except when the archiveFormat supports parallel dump,
and if it supports parallel dump, then we should try to dump out the
tables largest-first.

Parallel *restore* can be done from either a custom-format dump or from
a directory-format dump.  I agree that we should separate the concerns
and perform independent sorting on the restore side of things based on
the relative sizes of tables in the dump (be it custom format or
directory format).  While compression might make us not exactly correct
on the restore side, I expect that we'll generally be close enough to
avoid most cases where a single worker gets stuck working on a large
table at the end after all the other work is done.

> But I imagine we can get some measure of table data size out of a custom
> dump too.

I would think so.

Thanks!

Stephen

Attachments

Re: pg_dump test instability

From:
Tom Lane
Date:
Stephen Frost <sfrost@snowman.net> writes:
> Parallel *restore* can be done from either a custom-format dump or from
> a directory-format dump.  I agree that we should separate the concerns
> and perform independent sorting on the restore side of things based on
> the relative sizes of tables in the dump (be it custom format or
> directory format).  While compression might make us not exactly correct
> on the restore side, I expect that we'll generally be close enough to
> avoid most cases where a single worker gets stuck working on a large
> table at the end after all the other work is done.

Here's a proposed patch for this.  It removes the hacking of the TOC list
order, solving Peter's original problem, and instead sorts-by-size
in the actual parallel dump or restore control code.  There are a
number of ensuing performance benefits:

* The BLOBS entry, if any, gets to participate in the ordering decision
during parallel dumps.  As the code stands, all the effort to avoid
scheduling a long job last is utterly wasted if you've got a lot of
blobs, because that entry stayed at the end.  I didn't work real hard
on that, just gave it a large size so it would go first not last.  If
you just have a few blobs, that's not necessary, but I doubt it hurts
either.

* During restore, we insert actual size numbers into the BLOBS and
TABLE DATA items, and then anything that depends on a TABLE DATA item
inherits its size.  This results in size-based prioritization not just
for simple indexes as before, but also for constraint indexes (UNIQUE
or PRIMARY KEY), foreign key verifications, delayed CHECK constraints,
etc.  It also means that stuff like triggers and rules get reinstalled
in size-based order, which doesn't really help, but again I don't
think it hurts.

* Parallel restore scheduling by size works for custom dumps as well
as directory ones (as long as the dump file was seekable when created,
but you'll be hurting anyway if it wasn't).

I have not really tried to demonstrate performance benefits, because
the results would depend a whole lot on your test case; but at least
in principle this should result in far more intelligent scheduling
of parallel restores.

While I haven't done so here, I'm rather tempted to rename the
par_prev/par_next fields and par_list_xxx functions to pending_prev,
pending_next, pending_list_xxx, since they now have only one use.
(BTW, I tried really hard to get rid of par_prev/par_next altogether,
in favor of keeping the pending entries in the unused space in the
"ready" TocEntry* array.  But it didn't work out well --- seems like
a list really is the natural data structure for that.)

            regards, tom lane

diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 42cf441..ba79821 100644
*** a/src/bin/pg_dump/pg_backup.h
--- b/src/bin/pg_dump/pg_backup.h
*************** extern void ConnectDatabase(Archive *AH,
*** 252,269 ****
  extern void DisconnectDatabase(Archive *AHX);
  extern PGconn *GetConnection(Archive *AHX);

- /* Called to add a TOC entry */
- extern void ArchiveEntry(Archive *AHX,
-              CatalogId catalogId, DumpId dumpId,
-              const char *tag,
-              const char *namespace, const char *tablespace,
-              const char *owner, bool withOids,
-              const char *desc, teSection section,
-              const char *defn,
-              const char *dropStmt, const char *copyStmt,
-              const DumpId *deps, int nDeps,
-              DataDumperPtr dumpFn, void *dumpArg);
-
  /* Called to write *data* to the archive */
  extern void WriteData(Archive *AH, const void *data, size_t dLen);

--- 252,257 ----
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 45a391b..fce014f 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** typedef struct _outputContext
*** 49,54 ****
--- 49,72 ----
      int            gzOut;
  } OutputContext;

+ /*
+  * State for tracking TocEntrys that are ready to process during a parallel
+  * restore.  (This used to be a list, and we still call it that, though now
+  * it's really an array so that we can apply qsort to it.)
+  *
+  * tes[] is sized large enough that we can't overrun it.
+  * The valid entries are indexed first_te .. last_te inclusive.
+  * We periodically sort the array to bring larger-by-dataLength entries to
+  * the front; "sorted" is true if the valid entries are known sorted.
+  */
+ typedef struct _parallelReadyList
+ {
+     TocEntry  **tes;            /* Ready-to-dump TocEntrys */
+     int            first_te;        /* index of first valid entry in tes[] */
+     int            last_te;        /* index of last valid entry in tes[] */
+     bool        sorted;            /* are valid entries currently sorted? */
+ } ParallelReadyList;
+
  /* translator: this is a module name */
  static const char *modulename = gettext_noop("archiver");

*************** static void restore_toc_entries_postfork
*** 98,107 ****
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
! static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
                     RestorePass pass);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
!                    TocEntry *ready_list,
                     ParallelState *pstate);
  static void mark_dump_job_done(ArchiveHandle *AH,
                     TocEntry *te,
--- 116,131 ----
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
! static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
! static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
! static void ready_list_remove(ParallelReadyList *ready_list, int i);
! static void ready_list_sort(ParallelReadyList *ready_list);
! static int    TocEntrySizeCompare(const void *p1, const void *p2);
! static void move_to_ready_list(TocEntry *pending_list,
!                    ParallelReadyList *ready_list,
                     RestorePass pass);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
!                    ParallelReadyList *ready_list,
                     ParallelState *pstate);
  static void mark_dump_job_done(ArchiveHandle *AH,
                     TocEntry *te,
*************** static bool has_lock_conflicts(TocEntry
*** 116,122 ****
  static void repoint_table_dependencies(ArchiveHandle *AH);
  static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
  static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
!                     TocEntry *ready_list);
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

--- 140,146 ----
  static void repoint_table_dependencies(ArchiveHandle *AH);
  static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
  static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
!                     ParallelReadyList *ready_list);
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

*************** RestoreArchive(Archive *AHX)
*** 639,644 ****
--- 663,672 ----
          ParallelState *pstate;
          TocEntry    pending_list;

+         /* The archive format module may need some setup for this */
+         if (AH->PrepParallelRestorePtr)
+             AH->PrepParallelRestorePtr(AH);
+
          par_list_header_init(&pending_list);

          /* This runs PRE_DATA items and then disconnects from the database */
*************** WriteData(Archive *AHX, const void *data
*** 1039,1048 ****
  /*
   * Create a new TOC entry. The TOC was designed as a TOC, but is now the
   * repository for all metadata. But the name has stuck.
   */

  /* Public */
! void
  ArchiveEntry(Archive *AHX,
               CatalogId catalogId, DumpId dumpId,
               const char *tag,
--- 1067,1080 ----
  /*
   * Create a new TOC entry. The TOC was designed as a TOC, but is now the
   * repository for all metadata. But the name has stuck.
+  *
+  * The new entry is added to the Archive's TOC list.  Most callers can ignore
+  * the result value because nothing else need be done, but a few want to
+  * manipulate the TOC entry further.
   */

  /* Public */
! TocEntry *
  ArchiveEntry(Archive *AHX,
               CatalogId catalogId, DumpId dumpId,
               const char *tag,
*************** ArchiveEntry(Archive *AHX,
*** 1100,1108 ****
--- 1132,1143 ----
      newToc->hadDumper = dumpFn ? true : false;

      newToc->formatData = NULL;
+     newToc->dataLength = 0;

      if (AH->ArchiveEntryPtr != NULL)
          AH->ArchiveEntryPtr(AH, newToc);
+
+     return newToc;
  }

  /* Public */
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2413,2444 ****
  {
      TocEntry   *te;

!     for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         if (!te->dataDumper)
!             continue;
!
!         if ((te->reqs & REQ_DATA) == 0)
!             continue;

!         if (pstate && pstate->numWorkers > 1)
          {
!             /*
!              * If we are in a parallel backup, then we are always the master
!              * process.  Dispatch each data-transfer job to a worker.
!              */
!             DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
!                                    mark_dump_job_done, NULL);
          }
-         else
-             WriteDataChunksForTocEntry(AH, te);
-     }

!     /*
!      * If parallel, wait for workers to finish.
!      */
!     if (pstate && pstate->numWorkers > 1)
          WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
  }


--- 2448,2506 ----
  {
      TocEntry   *te;

!     if (pstate && pstate->numWorkers > 1)
      {
!         /*
!          * In parallel mode, this code runs in the master process.  We
!          * construct an array of candidate TEs, then sort it into decreasing
!          * size order, then dispatch each TE to a data-transfer worker.  By
!          * dumping larger tables first, we avoid getting into a situation
!          * where we're down to one job and it's big, losing parallelism.
!          */
!         TocEntry  **tes;
!         int            ntes;

!         tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
!         ntes = 0;
!         for (te = AH->toc->next; te != AH->toc; te = te->next)
          {
!             /* Consider only TEs with dataDumper functions ... */
!             if (!te->dataDumper)
!                 continue;
!             /* ... and ignore ones not enabled for dump */
!             if ((te->reqs & REQ_DATA) == 0)
!                 continue;
!
!             tes[ntes++] = te;
          }

!         if (ntes > 1)
!             qsort((void *) tes, ntes, sizeof(TocEntry *),
!                   TocEntrySizeCompare);
!
!         for (int i = 0; i < ntes; i++)
!             DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
!                                    mark_dump_job_done, NULL);
!
!         pg_free(tes);
!
!         /* Now wait for workers to finish. */
          WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
+     }
+     else
+     {
+         /* Non-parallel mode: just dump all candidate TEs sequentially. */
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             /* Must have same filter conditions as above */
+             if (!te->dataDumper)
+                 continue;
+             if ((te->reqs & REQ_DATA) == 0)
+                 continue;
+
+             WriteDataChunksForTocEntry(AH, te);
+         }
+     }
  }


*************** ReadToc(ArchiveHandle *AH)
*** 2690,2695 ****
--- 2752,2758 ----
              te->dependencies = NULL;
              te->nDeps = 0;
          }
+         te->dataLength = 0;

          if (AH->ReadExtraTocPtr)
              AH->ReadExtraTocPtr(AH, te);
*************** static void
*** 4035,4045 ****
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
!     TocEntry    ready_list;
      TocEntry   *next_work_item;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

      /*
       * The pending_list contains all items that we need to restore.  Move all
       * items that are available to process immediately into the ready_list.
--- 4098,4111 ----
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
!     ParallelReadyList ready_list;
      TocEntry   *next_work_item;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

+     /* Set up ready_list with enough room for all known TocEntrys */
+     ready_list_init(&ready_list, AH->tocCount);
+
      /*
       * The pending_list contains all items that we need to restore.  Move all
       * items that are available to process immediately into the ready_list.
*************** restore_toc_entries_parallel(ArchiveHand
*** 4048,4054 ****
       * contains items that have no remaining dependencies and are OK to
       * process in the current restore pass.
       */
-     par_list_header_init(&ready_list);
      AH->restorePass = RESTORE_PASS_MAIN;
      move_to_ready_list(pending_list, &ready_list, AH->restorePass);

--- 4114,4119 ----
*************** restore_toc_entries_parallel(ArchiveHand
*** 4073,4080 ****
                  ahlog(AH, 1, "skipping item %d %s %s\n",
                        next_work_item->dumpId,
                        next_work_item->desc, next_work_item->tag);
!                 /* Drop it from ready_list, and update its dependencies */
!                 par_list_remove(next_work_item);
                  reduce_dependencies(AH, next_work_item, &ready_list);
                  /* Loop around to see if anything else can be dispatched */
                  continue;
--- 4138,4144 ----
                  ahlog(AH, 1, "skipping item %d %s %s\n",
                        next_work_item->dumpId,
                        next_work_item->desc, next_work_item->tag);
!                 /* Update its dependencies as though we'd completed it */
                  reduce_dependencies(AH, next_work_item, &ready_list);
                  /* Loop around to see if anything else can be dispatched */
                  continue;
*************** restore_toc_entries_parallel(ArchiveHand
*** 4084,4092 ****
                    next_work_item->dumpId,
                    next_work_item->desc, next_work_item->tag);

!             /* Remove it from ready_list, and dispatch to some worker */
!             par_list_remove(next_work_item);
!
              DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
                                     mark_restore_job_done, &ready_list);
          }
--- 4148,4154 ----
                    next_work_item->dumpId,
                    next_work_item->desc, next_work_item->tag);

!             /* Dispatch to some worker */
              DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
                                     mark_restore_job_done, &ready_list);
          }
*************** restore_toc_entries_parallel(ArchiveHand
*** 4132,4138 ****
      }

      /* There should now be nothing in ready_list. */
!     Assert(ready_list.par_next == &ready_list);

      ahlog(AH, 1, "finished main parallel loop\n");
  }
--- 4194,4202 ----
      }

      /* There should now be nothing in ready_list. */
!     Assert(ready_list.first_te > ready_list.last_te);
!
!     pg_free(ready_list.tes);

      ahlog(AH, 1, "finished main parallel loop\n");
  }
*************** par_list_remove(TocEntry *te)
*** 4235,4240 ****
--- 4299,4386 ----


  /*
+  * Initialize the ready_list with enough room for up to tocCount entries.
+  */
+ static void
+ ready_list_init(ParallelReadyList *ready_list, int tocCount)
+ {
+     ready_list->tes = (TocEntry **)
+         pg_malloc(tocCount * sizeof(TocEntry *));
+     ready_list->first_te = 0;
+     ready_list->last_te = -1;
+     ready_list->sorted = false;
+ }
+
+ /* Add te to the ready_list */
+ static void
+ ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
+ {
+     ready_list->tes[++ready_list->last_te] = te;
+     ready_list->sorted = false;
+ }
+
+ /* Remove the i'th entry in the ready_list */
+ static void
+ ready_list_remove(ParallelReadyList *ready_list, int i)
+ {
+     Assert(i >= ready_list->first_te && i <= ready_list->last_te);
+     if (i == ready_list->first_te)
+     {
+         /*
+          * In the common case where we dispatch the first available entry, the
+          * list can still be considered sorted.
+          */
+         ready_list->first_te++;
+     }
+     else
+     {
+         /* Must compact the list and mark it unsorted */
+         ready_list->tes[i] = ready_list->tes[ready_list->first_te];
+         ready_list->first_te++;
+         ready_list->sorted = false;
+     }
+ }
+
+ /* Sort the ready_list into the desired order */
+ static void
+ ready_list_sort(ParallelReadyList *ready_list)
+ {
+     if (!ready_list->sorted)
+     {
+         int            n = ready_list->last_te - ready_list->first_te + 1;
+
+         if (n > 1)
+             qsort(ready_list->tes + ready_list->first_te, n,
+                   sizeof(TocEntry *),
+                   TocEntrySizeCompare);
+         ready_list->sorted = true;
+     }
+ }
+
+ /* qsort comparator for sorting TocEntries by dataLength */
+ static int
+ TocEntrySizeCompare(const void *p1, const void *p2)
+ {
+     const TocEntry *te1 = *(const TocEntry *const *) p1;
+     const TocEntry *te2 = *(const TocEntry *const *) p2;
+
+     /* Sort by decreasing dataLength */
+     if (te1->dataLength > te2->dataLength)
+         return -1;
+     if (te1->dataLength < te2->dataLength)
+         return 1;
+
+     /* For equal dataLengths, sort by dumpId, just to be stable */
+     if (te1->dumpId < te2->dumpId)
+         return -1;
+     if (te1->dumpId > te2->dumpId)
+         return 1;
+
+     return 0;
+ }
+
+
+ /*
   * Move all immediately-ready items from pending_list to ready_list.
   *
   * Items are considered ready if they have no remaining dependencies and
*************** par_list_remove(TocEntry *te)
*** 4242,4248 ****
   * which applies the same logic one-at-a-time.)
   */
  static void
! move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
                     RestorePass pass)
  {
      TocEntry   *te;
--- 4388,4395 ----
   * which applies the same logic one-at-a-time.)
   */
  static void
! move_to_ready_list(TocEntry *pending_list,
!                    ParallelReadyList *ready_list,
                     RestorePass pass)
  {
      TocEntry   *te;
*************** move_to_ready_list(TocEntry *pending_lis
*** 4259,4265 ****
              /* Remove it from pending_list ... */
              par_list_remove(te);
              /* ... and add to ready_list */
!             par_list_append(ready_list, te);
          }
      }
  }
--- 4406,4412 ----
              /* Remove it from pending_list ... */
              par_list_remove(te);
              /* ... and add to ready_list */
!             ready_list_insert(ready_list, te);
          }
      }
  }
*************** move_to_ready_list(TocEntry *pending_lis
*** 4272,4293 ****
   * items currently running.  Items in the ready_list are known to have
   * no remaining dependencies, but we have to check for lock conflicts.
   *
!  * Note that the returned item has *not* been removed from ready_list.
!  * The caller must do that after successfully dispatching the item.
   *
   * pref_non_data is for an alternative selection algorithm that gives
   * preference to non-data items if there is already a data load running.
   * It is currently disabled.
   */
  static TocEntry *
! get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
                     ParallelState *pstate)
  {
      bool        pref_non_data = false;    /* or get from AH->ropt */
!     TocEntry   *data_te = NULL;
!     TocEntry   *te;
!     int            i,
!                 k;

      /*
       * Bogus heuristics for pref_non_data
--- 4419,4436 ----
   * items currently running.  Items in the ready_list are known to have
   * no remaining dependencies, but we have to check for lock conflicts.
   *
!  * The returned item has been removed from the ready_list.
   *
   * pref_non_data is for an alternative selection algorithm that gives
   * preference to non-data items if there is already a data load running.
   * It is currently disabled.
   */
  static TocEntry *
! get_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
                     ParallelState *pstate)
  {
      bool        pref_non_data = false;    /* or get from AH->ropt */
!     int            data_te_index = -1;

      /*
       * Bogus heuristics for pref_non_data
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4296,4302 ****
      {
          int            count = 0;

!         for (k = 0; k < pstate->numWorkers; k++)
          {
              TocEntry   *running_te = pstate->te[k];

--- 4439,4445 ----
      {
          int            count = 0;

!         for (int k = 0; k < pstate->numWorkers; k++)
          {
              TocEntry   *running_te = pstate->te[k];

*************** get_next_work_item(ArchiveHandle *AH, To
*** 4309,4318 ****
      }

      /*
       * Search the ready_list until we find a suitable item.
       */
!     for (te = ready_list->par_next; te != ready_list; te = te->par_next)
      {
          bool        conflicts = false;

          /*
--- 4452,4467 ----
      }

      /*
+      * Sort the ready_list so that we'll tackle larger jobs first.
+      */
+     ready_list_sort(ready_list);
+
+     /*
       * Search the ready_list until we find a suitable item.
       */
!     for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
      {
+         TocEntry   *te = ready_list->tes[i];
          bool        conflicts = false;

          /*
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4320,4328 ****
           * that a currently running item also needs lock on, or vice versa. If
           * so, we don't want to schedule them together.
           */
!         for (i = 0; i < pstate->numWorkers; i++)
          {
!             TocEntry   *running_te = pstate->te[i];

              if (running_te == NULL)
                  continue;
--- 4469,4477 ----
           * that a currently running item also needs lock on, or vice versa. If
           * so, we don't want to schedule them together.
           */
!         for (int k = 0; k < pstate->numWorkers; k++)
          {
!             TocEntry   *running_te = pstate->te[k];

              if (running_te == NULL)
                  continue;
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4339,4355 ****

          if (pref_non_data && te->section == SECTION_DATA)
          {
!             if (data_te == NULL)
!                 data_te = te;
              continue;
          }

          /* passed all tests, so this item can run */
          return te;
      }

!     if (data_te != NULL)
          return data_te;

      ahlog(AH, 2, "no item ready\n");
      return NULL;
--- 4488,4510 ----

          if (pref_non_data && te->section == SECTION_DATA)
          {
!             if (data_te_index < 0)
!                 data_te_index = i;
              continue;
          }

          /* passed all tests, so this item can run */
+         ready_list_remove(ready_list, i);
          return te;
      }

!     if (data_te_index >= 0)
!     {
!         TocEntry   *data_te = ready_list->tes[data_te_index];
!
!         ready_list_remove(ready_list, data_te_index);
          return data_te;
+     }

      ahlog(AH, 2, "no item ready\n");
      return NULL;
*************** mark_restore_job_done(ArchiveHandle *AH,
*** 4393,4399 ****
                        int status,
                        void *callback_data)
  {
!     TocEntry   *ready_list = (TocEntry *) callback_data;

      ahlog(AH, 1, "finished item %d %s %s\n",
            te->dumpId, te->desc, te->tag);
--- 4548,4554 ----
                        int status,
                        void *callback_data)
  {
!     ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;

      ahlog(AH, 1, "finished item %d %s %s\n",
            te->dumpId, te->desc, te->tag);
*************** fix_dependencies(ArchiveHandle *AH)
*** 4551,4556 ****
--- 4706,4717 ----
  /*
   * Change dependencies on table items to depend on table data items instead,
   * but only in POST_DATA items.
+  *
+  * Also, for any item having such dependency(s), set its dataLength to the
+  * largest dataLength of the table data items it depends on.  This ensures
+  * that parallel restore will prioritize larger jobs (index builds, FK
+  * constraint checks, etc) over smaller ones, avoiding situations where we
+  * end a restore with only one active job working on a large table.
   */
  static void
  repoint_table_dependencies(ArchiveHandle *AH)
*************** repoint_table_dependencies(ArchiveHandle
*** 4569,4577 ****
              if (olddep <= AH->maxDumpId &&
                  AH->tableDataId[olddep] != 0)
              {
!                 te->dependencies[i] = AH->tableDataId[olddep];
                  ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
!                       te->dumpId, olddep, AH->tableDataId[olddep]);
              }
          }
      }
--- 4730,4742 ----
              if (olddep <= AH->maxDumpId &&
                  AH->tableDataId[olddep] != 0)
              {
!                 DumpId        tabledataid = AH->tableDataId[olddep];
!                 TocEntry   *tabledatate = AH->tocsByDumpId[tabledataid];
!
!                 te->dependencies[i] = tabledataid;
!                 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
                  ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
!                       te->dumpId, olddep, tabledataid);
              }
          }
      }
*************** identify_locking_dependencies(ArchiveHan
*** 4639,4645 ****
   * becomes ready should be moved to the ready_list, if that's provided.
   */
  static void
! reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
  {
      int            i;

--- 4804,4811 ----
   * becomes ready should be moved to the ready_list, if that's provided.
   */
  static void
! reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
!                     ParallelReadyList *ready_list)
  {
      int            i;

*************** reduce_dependencies(ArchiveHandle *AH, T
*** 4668,4674 ****
              /* Remove it from pending list ... */
              par_list_remove(otherte);
              /* ... and add to ready_list */
!             par_list_append(ready_list, otherte);
          }
      }
  }
--- 4834,4840 ----
              /* Remove it from pending list ... */
              par_list_remove(otherte);
              /* ... and add to ready_list */
!             ready_list_insert(ready_list, otherte);
          }
      }
  }
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 8dd1915..3b7b8c0 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef int (*WriteBytePtrType) (Archive
*** 162,173 ****
  typedef int (*ReadBytePtrType) (ArchiveHandle *AH);
  typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len);
  typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len);
- typedef void (*SaveArchivePtrType) (ArchiveHandle *AH);
  typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te);

  typedef void (*ClonePtrType) (ArchiveHandle *AH);
  typedef void (*DeClonePtrType) (ArchiveHandle *AH);

--- 162,173 ----
  typedef int (*ReadBytePtrType) (ArchiveHandle *AH);
  typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len);
  typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len);
  typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te);

+ typedef void (*PrepParallelRestorePtrType) (ArchiveHandle *AH);
  typedef void (*ClonePtrType) (ArchiveHandle *AH);
  typedef void (*DeClonePtrType) (ArchiveHandle *AH);

*************** struct _archiveHandle
*** 297,302 ****
--- 297,303 ----
      WorkerJobDumpPtrType WorkerJobDumpPtr;
      WorkerJobRestorePtrType WorkerJobRestorePtr;

+     PrepParallelRestorePtrType PrepParallelRestorePtr;
      ClonePtrType ClonePtr;        /* Clone format-specific fields */
      DeClonePtrType DeClonePtr;    /* Clean up cloned fields */

*************** struct _tocEntry
*** 387,392 ****
--- 388,394 ----
      void       *formatData;        /* TOC Entry data specific to file format */

      /* working state while dumping/restoring */
+     pgoff_t        dataLength;        /* item's data size; 0 if none or unknown */
      teReqs        reqs;            /* do we need schema and/or data of object */
      bool        created;        /* set for DATA member if TABLE was created */

*************** extern void on_exit_close_archive(Archiv
*** 405,410 ****
--- 407,424 ----

  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
pg_attribute_printf(3,4); 

+ /* Called to add a TOC entry */
+ extern TocEntry *ArchiveEntry(Archive *AHX,
+              CatalogId catalogId, DumpId dumpId,
+              const char *tag,
+              const char *namespace, const char *tablespace,
+              const char *owner, bool withOids,
+              const char *desc, teSection section,
+              const char *defn,
+              const char *dropStmt, const char *copyStmt,
+              const DumpId *deps, int nDeps,
+              DataDumperPtr dumpFn, void *dumpArg);
+
  extern void WriteTOC(ArchiveHandle *AH);
  extern void ReadTOC(ArchiveHandle *AH);
  extern void WriteHead(ArchiveHandle *AH);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index ad18a6c..96f44e8 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** static void _StartBlob(ArchiveHandle *AH
*** 59,64 ****
--- 59,66 ----
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, bool drop);
+
+ static void _PrepParallelRestore(ArchiveHandle *AH);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);

*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 129,134 ****
--- 131,138 ----
      AH->StartBlobPtr = _StartBlob;
      AH->EndBlobPtr = _EndBlob;
      AH->EndBlobsPtr = _EndBlobs;
+
+     AH->PrepParallelRestorePtr = _PrepParallelRestore;
      AH->ClonePtr = _Clone;
      AH->DeClonePtr = _DeClone;

*************** _ReopenArchive(ArchiveHandle *AH)
*** 776,781 ****
--- 780,845 ----
  }

  /*
+  * Prepare for parallel restore.
+  *
+  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+  * TOC entries' dataLength fields with appropriate values to guide the
+  * ordering of restore jobs.  The source of said data is format-dependent,
+  * as is the exact meaning of the values.
+  *
+  * A format module might also choose to do other setup here.
+  */
+ static void
+ _PrepParallelRestore(ArchiveHandle *AH)
+ {
+     lclContext *ctx = (lclContext *) AH->formatData;
+     TocEntry   *prev_te = NULL;
+     lclTocEntry *prev_tctx = NULL;
+     TocEntry   *te;
+
+     /*
+      * Knowing that the data items were dumped out in TOC order, we can
+      * reconstruct the length of each item as the delta to the start offset of
+      * the next data item.
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+         /*
+          * Ignore entries without a known data offset; if we were unable to
+          * seek to rewrite the TOC when creating the archive, this'll be all
+          * of them, and we'll end up with no size estimates.
+          */
+         if (tctx->dataState != K_OFFSET_POS_SET)
+             continue;
+
+         /* Compute previous data item's length */
+         if (prev_te)
+         {
+             if (tctx->dataPos > prev_tctx->dataPos)
+                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
+         }
+
+         prev_te = te;
+         prev_tctx = tctx;
+     }
+
+     /* If OK to seek, we can determine the length of the last item */
+     if (prev_te && ctx->hasSeek)
+     {
+         pgoff_t        endpos;
+
+         if (fseeko(AH->FH, 0, SEEK_END) != 0)
+             exit_horribly(modulename, "error during file seek: %s\n",
+                           strerror(errno));
+         endpos = ftello(AH->FH);
+         if (endpos > prev_tctx->dataPos)
+             prev_te->dataLength = endpos - prev_tctx->dataPos;
+     }
+ }
+
+ /*
   * Clone format-specific fields during parallel restoration.
   */
  static void
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 4aabb40..cda90b9 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** static void _EndBlob(ArchiveHandle *AH,
*** 87,92 ****
--- 87,93 ----
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH);

+ static void _PrepParallelRestore(ArchiveHandle *AH);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);

*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 132,137 ****
--- 133,139 ----
      AH->EndBlobPtr = _EndBlob;
      AH->EndBlobsPtr = _EndBlobs;

+     AH->PrepParallelRestorePtr = _PrepParallelRestore;
      AH->ClonePtr = _Clone;
      AH->DeClonePtr = _DeClone;

*************** _ArchiveEntry(ArchiveHandle *AH, TocEntr
*** 240,252 ****
      char        fn[MAXPGPATH];

      tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
!     if (te->dataDumper)
      {
          snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
          tctx->filename = pg_strdup(fn);
      }
-     else if (strcmp(te->desc, "BLOBS") == 0)
-         tctx->filename = pg_strdup("blobs.toc");
      else
          tctx->filename = NULL;

--- 242,254 ----
      char        fn[MAXPGPATH];

      tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
!     if (strcmp(te->desc, "BLOBS") == 0)
!         tctx->filename = pg_strdup("blobs.toc");
!     else if (te->dataDumper)
      {
          snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
          tctx->filename = pg_strdup(fn);
      }
      else
          tctx->filename = NULL;

*************** setFilePath(ArchiveHandle *AH, char *buf
*** 727,732 ****
--- 729,796 ----
  }

  /*
+  * Prepare for parallel restore.
+  *
+  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+  * TOC entries' dataLength fields with appropriate values to guide the
+  * ordering of restore jobs.  The source of said data is format-dependent,
+  * as is the exact meaning of the values.
+  *
+  * A format module might also choose to do other setup here.
+  */
+ static void
+ _PrepParallelRestore(ArchiveHandle *AH)
+ {
+     TocEntry   *te;
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+         char        fname[MAXPGPATH];
+         struct stat st;
+
+         /*
+          * A dumpable object has set tctx->filename, any other object has not.
+          * (see _ArchiveEntry).
+          */
+         if (tctx->filename == NULL)
+             continue;
+
+         /* We may ignore items not due to be restored */
+         if ((te->reqs & REQ_DATA) == 0)
+             continue;
+
+         /*
+          * Stat the file and, if successful, put its size in dataLength.  When
+          * using compression, the physical file size might not be a very good
+          * guide to the amount of work involved in restoring the file, but we
+          * only need an approximate indicator of that.
+          */
+         setFilePath(AH, fname, tctx->filename);
+
+         if (stat(fname, &st) == 0)
+             te->dataLength = st.st_size;
+         else
+         {
+             /* It might be compressed */
+             strlcat(fname, ".gz", sizeof(fname));
+             if (stat(fname, &st) == 0)
+                 te->dataLength = st.st_size;
+         }
+
+         /*
+          * If this is the BLOBS entry, what we stat'd was blobs.toc, which
+          * most likely is a lot smaller than the actual blob data.  We don't
+          * have a cheap way to estimate how much smaller, but fortunately it
+          * doesn't matter too much as long as we get the blobs processed
+          * reasonably early.  Arbitrarily scale up by a factor of 1K.
+          */
+         if (strcmp(te->desc, "BLOBS") == 0)
+             te->dataLength *= 1024;
+     }
+ }
+
+ /*
   * Clone format-specific fields during parallel restoration.
   */
  static void
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 6763a89..0116072 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "catalog/pg_trigger_d.h"
  #include "catalog/pg_type_d.h"
  #include "libpq/libpq-fs.h"
+ #include "storage/block.h"

  #include "dumputils.h"
  #include "parallel.h"
*************** main(int argc, char **argv)
*** 845,854 ****
       */
      sortDumpableObjectsByTypeName(dobjs, numObjs);

-     /* If we do a parallel dump, we want the largest tables to go first */
-     if (archiveFormat == archDirectory && numWorkers > 1)
-         sortDataAndIndexObjectsBySize(dobjs, numObjs);
-
      sortDumpableObjects(dobjs, numObjs,
                          boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);

--- 846,851 ----
*************** dumpTableData(Archive *fout, TableDataIn
*** 2156,2168 ****
       * See comments for BuildArchiveDependencies.
       */
      if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
!         ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
!                      tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
!                      NULL, tbinfo->rolname,
!                      false, "TABLE DATA", SECTION_DATA,
!                      "", "", copyStmt,
!                      &(tbinfo->dobj.dumpId), 1,
!                      dumpFn, tdinfo);

      destroyPQExpBuffer(copyBuf);
      destroyPQExpBuffer(clistBuf);
--- 2153,2180 ----
       * See comments for BuildArchiveDependencies.
       */
      if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
!     {
!         TocEntry   *te;
!
!         te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
!                           tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
!                           NULL, tbinfo->rolname,
!                           false, "TABLE DATA", SECTION_DATA,
!                           "", "", copyStmt,
!                           &(tbinfo->dobj.dumpId), 1,
!                           dumpFn, tdinfo);
!
!         /*
!          * Set the TocEntry's dataLength in case we are doing a parallel dump
!          * and want to order dump jobs by table size.  We choose to measure
!          * dataLength in table pages during dump, so no scaling is needed.
!          * However, relpages is declared as "integer" in pg_class, and hence
!          * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
!          * Cast so that we get the right interpretation of table sizes
!          * exceeding INT_MAX pages.
!          */
!         te->dataLength = (BlockNumber) tbinfo->relpages;
!     }

      destroyPQExpBuffer(copyBuf);
      destroyPQExpBuffer(clistBuf);
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6759,6766 ****
                  i_conoid,
                  i_condef,
                  i_tablespace,
!                 i_indreloptions,
!                 i_relpages;
      int            ntups;

      for (i = 0; i < numTables; i++)
--- 6771,6777 ----
                  i_conoid,
                  i_condef,
                  i_tablespace,
!                 i_indreloptions;
      int            ntups;

      for (i = 0; i < numTables; i++)
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6807,6813 ****
                                "i.indnkeyatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6818,6824 ----
                                "i.indnkeyatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6844,6850 ****
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6855,6861 ----
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6877,6883 ****
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6888,6894 ----
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6906,6912 ****
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6917,6923 ----
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6938,6944 ****
                                "t.relnatts AS indnkeyatts, "
                                "t.relnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6949,6955 ----
                                "t.relnatts AS indnkeyatts, "
                                "t.relnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6974,6980 ****
          i_indkey = PQfnumber(res, "indkey");
          i_indisclustered = PQfnumber(res, "indisclustered");
          i_indisreplident = PQfnumber(res, "indisreplident");
-         i_relpages = PQfnumber(res, "relpages");
          i_contype = PQfnumber(res, "contype");
          i_conname = PQfnumber(res, "conname");
          i_condeferrable = PQfnumber(res, "condeferrable");
--- 6985,6990 ----
*************** getIndexes(Archive *fout, TableInfo tbli
*** 7013,7019 ****
              indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't');
              indxinfo[j].indisreplident = (PQgetvalue(res, j, i_indisreplident)[0] == 't');
              indxinfo[j].parentidx = atooid(PQgetvalue(res, j, i_parentidx));
-             indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages));
              contype = *(PQgetvalue(res, j, i_contype));

              if (contype == 'p' || contype == 'u' || contype == 'x')
--- 7023,7028 ----
*************** dumpDumpableObject(Archive *fout, Dumpab
*** 9845,9856 ****
              break;
          case DO_BLOB_DATA:
              if (dobj->dump & DUMP_COMPONENT_DATA)
!                 ArchiveEntry(fout, dobj->catId, dobj->dumpId,
!                              dobj->name, NULL, NULL, "",
!                              false, "BLOBS", SECTION_DATA,
!                              "", "", NULL,
!                              NULL, 0,
!                              dumpBlobs, NULL);
              break;
          case DO_POLICY:
              dumpPolicy(fout, (PolicyInfo *) dobj);
--- 9854,9884 ----
              break;
          case DO_BLOB_DATA:
              if (dobj->dump & DUMP_COMPONENT_DATA)
!             {
!                 TocEntry   *te;
!
!                 te = ArchiveEntry(fout, dobj->catId, dobj->dumpId,
!                                   dobj->name, NULL, NULL, "",
!                                   false, "BLOBS", SECTION_DATA,
!                                   "", "", NULL,
!                                   NULL, 0,
!                                   dumpBlobs, NULL);
!
!                 /*
!                  * Set the TocEntry's dataLength in case we are doing a
!                  * parallel dump and want to order dump jobs by table size.
!                  * (We need some size estimate for every TocEntry with a
!                  * DataDumper function.)  We don't currently have any cheap
!                  * way to estimate the size of blobs, but it doesn't matter;
!                  * let's just set the size to a large value so parallel dumps
!                  * will launch this job first.  If there's lots of blobs, we
!                  * win, and if there aren't, we don't lose much.  (If you want
!                  * to improve on this, really what you should be thinking
!                  * about is allowing blob dumping to be parallelized, not just
!                  * getting a smarter estimate for the single TOC entry.)
!                  */
!                 te->dataLength = MaxBlockNumber;
!             }
              break;
          case DO_POLICY:
              dumpPolicy(fout, (PolicyInfo *) dobj);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1448005..685ad78 100644
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
*************** typedef struct _indxInfo
*** 370,376 ****
      Oid            parentidx;        /* if partitioned, parent index OID */
      /* if there is an associated constraint object, its dumpId: */
      DumpId        indexconstraint;
-     int            relpages;        /* relpages of the underlying table */
  } IndxInfo;

  typedef struct _indexAttachInfo
--- 370,375 ----
*************** extern void parseOidArray(const char *st
*** 677,683 ****
  extern void sortDumpableObjects(DumpableObject **objs, int numObjs,
                      DumpId preBoundaryId, DumpId postBoundaryId);
  extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
- extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);

  /*
   * version specific routines
--- 676,681 ----
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 6227a8f..a1d3ced 100644
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
*************** static const char *modulename = gettext_
*** 35,44 ****
   * pg_dump.c; that is, PRE_DATA objects must sort before DO_PRE_DATA_BOUNDARY,
   * POST_DATA objects must sort after DO_POST_DATA_BOUNDARY, and DATA objects
   * must sort between them.
-  *
-  * Note: sortDataAndIndexObjectsBySize wants to have all DO_TABLE_DATA and
-  * DO_INDEX objects in contiguous chunks, so do not reuse the values for those
-  * for other object types.
   */
  static const int dbObjectTypePriority[] =
  {
--- 35,40 ----
*************** static void repairDependencyLoop(Dumpabl
*** 111,206 ****
  static void describeDumpableObject(DumpableObject *obj,
                         char *buf, int bufsize);

- static int    DOSizeCompare(const void *p1, const void *p2);
-
- static int
- findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
- {
-     int            i;
-
-     for (i = 0; i < numObjs; i++)
-         if (objs[i]->objType == type)
-             return i;
-     return -1;
- }
-
- static int
- findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
- {
-     int            i;
-
-     for (i = start; i < numObjs; i++)
-         if (objs[i]->objType != type)
-             return i;
-     return numObjs - 1;
- }
-
- /*
-  * When we do a parallel dump, we want to start with the largest items first.
-  *
-  * Say we have the objects in this order:
-  * ....DDDDD....III....
-  *
-  * with D = Table data, I = Index, . = other object
-  *
-  * This sorting function now takes each of the D or I blocks and sorts them
-  * according to their size.
-  */
- void
- sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
- {
-     int            startIdx,
-                 endIdx;
-     void       *startPtr;
-
-     if (numObjs <= 1)
-         return;
-
-     startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
-     if (startIdx >= 0)
-     {
-         endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
-         startPtr = objs + startIdx;
-         qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
-               DOSizeCompare);
-     }
-
-     startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
-     if (startIdx >= 0)
-     {
-         endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
-         startPtr = objs + startIdx;
-         qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
-               DOSizeCompare);
-     }
- }
-
- static int
- DOSizeCompare(const void *p1, const void *p2)
- {
-     DumpableObject *obj1 = *(DumpableObject **) p1;
-     DumpableObject *obj2 = *(DumpableObject **) p2;
-     int            obj1_size = 0;
-     int            obj2_size = 0;
-
-     if (obj1->objType == DO_TABLE_DATA)
-         obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
-     if (obj1->objType == DO_INDEX)
-         obj1_size = ((IndxInfo *) obj1)->relpages;
-
-     if (obj2->objType == DO_TABLE_DATA)
-         obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
-     if (obj2->objType == DO_INDEX)
-         obj2_size = ((IndxInfo *) obj2)->relpages;
-
-     /* we want to see the biggest item go first */
-     if (obj1_size > obj2_size)
-         return -1;
-     if (obj2_size > obj1_size)
-         return 1;
-
-     return 0;
- }

  /*
   * Sort the given objects into a type/name-based ordering
--- 107,112 ----

Re: pg_dump test instability

From
Peter Eisentraut
Date:
On 28/08/2018 20:47, Tom Lane wrote:
> Here's a proposed patch for this.  It removes the hacking of the TOC list
> order, solving Peter's original problem, and instead sorts-by-size
> in the actual parallel dump or restore control code.

I have reviewed this patch.  I haven't done any major performance tests
or the like, but the improvements are clear in principle.

It does solve the issue that I had originally reported, when I apply it
on top of my development branch.

Some small comments on the code:

Maybe add a ready_list_free() to go with ready_list_init(), instead of
calling pg_free(ready_list.tes) directly.

get_next_work_item() has been changed to remove the work item from the
ready_list.  Maybe rename to something like pop_next_work_item()?

I'm confused by what ready_list_remove() is doing when it's not removing
the first item.  It looks like it's removing all leading items up to the
i'th one.  Is that what we want?  In some cases, we are skipping over
things that we are not interested in at all, so this would work, but if
we're just skipping over an item because of a lock conflict, then it's
not right.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: pg_dump test instability

From
Tom Lane
Date:
Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
> Some small comments on the code:

> Maybe add a ready_list_free() to go with ready_list_init(), instead of
> calling pg_free(ready_list.tes) directly.
> get_next_work_item() has been changed to remove the work item from the
> ready_list.  Maybe rename to something like pop_next_work_item()?

Both seem reasonable, will do.

> I'm confused by what ready_list_remove() is doing when it's not removing
> the first item.  It looks like it's removing all leading items up to the
> i'th one.  Is that what we want?  In some cases, we are skipping over
> things that we are not interested at all, so this would work, but if
> we're just skipping over an item because of a lock conflict, then it's
> not right.

No.  In both code paths, the array slot at index first_te is being
physically dropped from the set of valid entries (by incrementing
first_te).  In the first path, that slot holds the item we want to
remove logically from the set, so that incrementing first_te is
all we have to do: the remaining entries are still in the range
first_te..last_te, and they're still sorted.  In the second code
path, the item that was in that slot is still wanted as part of
the set, so we copy it into the valid range (overwriting the item
in slot i, which is no longer wanted).  Now the valid range is
probably not sorted, so we have to flag that a re-sort is needed.

I expect that most of the time the first code path will be taken,
because usually we'll be able to dispatch the highest-priority
ready entry.  We'll only take the second path when we have to postpone
the highest-priority entry because of a potential lock conflict
against some already-running task.  Any items between first_te and i
are other tasks that also have lock conflicts and can't be dispatched
yet; we certainly don't want to lose them, and this code doesn't.

If you can suggest comments that would clarify this more,
I'm all ears.
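
In pseudo-C, the two paths amount to roughly the following (a minimal
sketch with toy types and made-up names, not the actual patch code):

    #include <stdbool.h>

    typedef struct
    {
        void      **tes;        /* ready-to-run entries */
        int         first_te;   /* index of first valid slot */
        int         last_te;    /* index of last valid slot */
        bool        sorted;     /* is first_te..last_te known sorted? */
    } ReadyListSketch;

    static void
    ready_list_remove_sketch(ReadyListSketch *rl, int i)
    {
        int         f = rl->first_te;

        if (i == f)
        {
            /*
             * First path: the head slot holds the item being removed, so
             * advancing first_te is all we need; the remaining range stays
             * sorted.
             */
            rl->first_te++;
        }
        else
        {
            /*
             * Second path: the head slot's item is still wanted, so copy it
             * over slot i (whose item is being removed) before advancing
             * first_te.  The surviving range is probably no longer sorted.
             */
            rl->tes[i] = rl->tes[f];
            rl->first_te++;
            rl->sorted = false;
        }
    }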

            regards, tom lane


Re: pg_dump test instability

From
Peter Eisentraut
Date:
On 12/09/2018 18:06, Tom Lane wrote:
> No.  In both code paths, the array slot at index first_te is being
> physically dropped from the set of valid entries (by incrementing
> first_te).  In the first path, that slot holds the item we want to
> remove logically from the set, so that incrementing first_te is
> all we have to do: the remaining entries are still in the range
> first_te..last_te, and they're still sorted.  In the second code
> path, the item that was in that slot is still wanted as part of
> the set, so we copy it into the valid range (overwriting the item
> in slot i, which is no longer wanted).  Now the valid range is
> probably not sorted, so we have to flag that a re-sort is needed.

I see.  Why not shift all items up to the i'th up by one place, instead
of moving only the first one?  That way the sortedness would be
preserved.  Otherwise we'd move the first one into the middle, then
sorting would move it to the front again, etc.
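
Concretely, take a toy ready list of page counts {90, 70, 50, 30} with
first_te = 0, and suppose lock conflicts force us to dispatch the 50-page
entry in slot 2.  Copying slot 0 over slot 2 and advancing first_te leaves
{70, 90, 30}, which has to be re-sorted; shifting slots 0..1 up by one
instead leaves {90, 70, 30}, still sorted.  A minimal sketch of the
shifting variant (toy types and names, not the patch itself):

    #include <string.h>

    /* Remove slot i from tes[first_te..last_te], preserving sort order. */
    static void
    remove_slot_preserving_order(int *tes, int *first_te, int i)
    {
        int         f = *first_te;

        if (i > f)
            memmove(&tes[f + 1], &tes[f], (i - f) * sizeof(int));
        (*first_te)++;
    }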

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: pg_dump test instability

From
Tom Lane
Date:
Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
> On 12/09/2018 18:06, Tom Lane wrote:
>> No.  In both code paths, the array slot at index first_te is being
>> physically dropped from the set of valid entries (by incrementing
>> first_te).  In the first path, that slot holds the item we want to
>> remove logically from the set, so that incrementing first_te is
>> all we have to do: the remaining entries are still in the range
>> first_te..last_te, and they're still sorted.  In the second code
>> path, the item that was in that slot is still wanted as part of
>> the set, so we copy it into the valid range (overwriting the item
>> in slot i, which is no longer wanted).  Now the valid range is
>> probably not sorted, so we have to flag that a re-sort is needed.

> I see.  Why not shift all items up to the i'th up by one place, instead
> of moving only the first one?  That way the sortedness would be
> preserved.  Otherwise we'd move the first one into the middle, then
> sorting would move it to the front again, etc.

Hmmm ... might be worth doing, but I'm not sure.  The steady-state cycle
will probably be that after one task has been dispatched, we'll sleep
until some task finishes and then that will unblock some pending items,
resulting in new entries at the end of the list, forcing a sort anyway
before we next dispatch a task.  So I was expecting that avoiding a sort
here wasn't really going to be worth expending much effort for.  But my
intuition about that could be wrong.  I'll run a test case with some
instrumentation added and see how often we could avoid sorts by
memmove'ing.

            regards, tom lane


Re: pg_dump test instability

From
Tom Lane
Date:
I wrote:
> Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
>> I see.  Why not shift all items up to the i'th up by one place, instead
>> of moving only the first one?  That way the sortedness would be
>> preserved.  Otherwise we'd move the first one into the middle, then
>> sorting would move it to the front again, etc.

> Hmmm ... might be worth doing, but I'm not sure.  The steady-state cycle
> will probably be that after one task has been dispatched, we'll sleep
> until some task finishes and then that will unblock some pending items,
> resulting in new entries at the end of the list, forcing a sort anyway
> before we next dispatch a task.  So I was expecting that avoiding a sort
> here wasn't really going to be worth expending much effort for.  But my
> intuition about that could be wrong.  I'll run a test case with some
> instrumentation added and see how often we could avoid sorts by
> memmove'ing.

OK, my intuition was faulty.  At least when testing with the regression
database, situations where we are taking the slow path at all seem to
involve several interrelated dump objects (eg indexes of a table) that
are all waiting for the same lock, such that we may be able to dispatch a
number of unrelated tasks before anything gets added from the pending
list.  Doing it as you suggest eliminates a significant fraction of
the re-sort operations.

Attached updated patch does it like that and makes the cosmetic
adjustments you suggested.  I also went ahead and did the renaming
of par_prev/par_next/par_list_xxx that I'd suggested upthread.
I think this is committable ...

            regards, tom lane

diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 42cf441..ba79821 100644
*** a/src/bin/pg_dump/pg_backup.h
--- b/src/bin/pg_dump/pg_backup.h
*************** extern void ConnectDatabase(Archive *AH,
*** 252,269 ****
  extern void DisconnectDatabase(Archive *AHX);
  extern PGconn *GetConnection(Archive *AHX);

- /* Called to add a TOC entry */
- extern void ArchiveEntry(Archive *AHX,
-              CatalogId catalogId, DumpId dumpId,
-              const char *tag,
-              const char *namespace, const char *tablespace,
-              const char *owner, bool withOids,
-              const char *desc, teSection section,
-              const char *defn,
-              const char *dropStmt, const char *copyStmt,
-              const DumpId *deps, int nDeps,
-              DataDumperPtr dumpFn, void *dumpArg);
-
  /* Called to write *data* to the archive */
  extern void WriteData(Archive *AH, const void *data, size_t dLen);

--- 252,257 ----
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 36e3383..3f7a658 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** typedef struct _outputContext
*** 49,54 ****
--- 49,72 ----
      int            gzOut;
  } OutputContext;

+ /*
+  * State for tracking TocEntrys that are ready to process during a parallel
+  * restore.  (This used to be a list, and we still call it that, though now
+  * it's really an array so that we can apply qsort to it.)
+  *
+  * tes[] is sized large enough that we can't overrun it.
+  * The valid entries are indexed first_te .. last_te inclusive.
+  * We periodically sort the array to bring larger-by-dataLength entries to
+  * the front; "sorted" is true if the valid entries are known sorted.
+  */
+ typedef struct _parallelReadyList
+ {
+     TocEntry  **tes;            /* Ready-to-dump TocEntrys */
+     int            first_te;        /* index of first valid entry in tes[] */
+     int            last_te;        /* index of last valid entry in tes[] */
+     bool        sorted;            /* are valid entries currently sorted? */
+ } ParallelReadyList;
+
  /* translator: this is a module name */
  static const char *modulename = gettext_noop("archiver");

*************** static void restore_toc_entries_parallel
*** 95,107 ****
                               TocEntry *pending_list);
  static void restore_toc_entries_postfork(ArchiveHandle *AH,
                               TocEntry *pending_list);
! static void par_list_header_init(TocEntry *l);
! static void par_list_append(TocEntry *l, TocEntry *te);
! static void par_list_remove(TocEntry *te);
! static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
                     RestorePass pass);
! static TocEntry *get_next_work_item(ArchiveHandle *AH,
!                    TocEntry *ready_list,
                     ParallelState *pstate);
  static void mark_dump_job_done(ArchiveHandle *AH,
                     TocEntry *te,
--- 113,132 ----
                               TocEntry *pending_list);
  static void restore_toc_entries_postfork(ArchiveHandle *AH,
                               TocEntry *pending_list);
! static void pending_list_header_init(TocEntry *l);
! static void pending_list_append(TocEntry *l, TocEntry *te);
! static void pending_list_remove(TocEntry *te);
! static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
! static void ready_list_free(ParallelReadyList *ready_list);
! static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
! static void ready_list_remove(ParallelReadyList *ready_list, int i);
! static void ready_list_sort(ParallelReadyList *ready_list);
! static int    TocEntrySizeCompare(const void *p1, const void *p2);
! static void move_to_ready_list(TocEntry *pending_list,
!                    ParallelReadyList *ready_list,
                     RestorePass pass);
! static TocEntry *pop_next_work_item(ArchiveHandle *AH,
!                    ParallelReadyList *ready_list,
                     ParallelState *pstate);
  static void mark_dump_job_done(ArchiveHandle *AH,
                     TocEntry *te,
*************** static bool has_lock_conflicts(TocEntry
*** 116,122 ****
  static void repoint_table_dependencies(ArchiveHandle *AH);
  static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
  static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
!                     TocEntry *ready_list);
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

--- 141,147 ----
  static void repoint_table_dependencies(ArchiveHandle *AH);
  static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
  static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
!                     ParallelReadyList *ready_list);
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

*************** RestoreArchive(Archive *AHX)
*** 639,645 ****
          ParallelState *pstate;
          TocEntry    pending_list;

!         par_list_header_init(&pending_list);

          /* This runs PRE_DATA items and then disconnects from the database */
          restore_toc_entries_prefork(AH, &pending_list);
--- 664,674 ----
          ParallelState *pstate;
          TocEntry    pending_list;

!         /* The archive format module may need some setup for this */
!         if (AH->PrepParallelRestorePtr)
!             AH->PrepParallelRestorePtr(AH);
!
!         pending_list_header_init(&pending_list);

          /* This runs PRE_DATA items and then disconnects from the database */
          restore_toc_entries_prefork(AH, &pending_list);
*************** WriteData(Archive *AHX, const void *data
*** 1039,1048 ****
  /*
   * Create a new TOC entry. The TOC was designed as a TOC, but is now the
   * repository for all metadata. But the name has stuck.
   */

  /* Public */
! void
  ArchiveEntry(Archive *AHX,
               CatalogId catalogId, DumpId dumpId,
               const char *tag,
--- 1068,1081 ----
  /*
   * Create a new TOC entry. The TOC was designed as a TOC, but is now the
   * repository for all metadata. But the name has stuck.
+  *
+  * The new entry is added to the Archive's TOC list.  Most callers can ignore
+  * the result value because nothing else need be done, but a few want to
+  * manipulate the TOC entry further.
   */

  /* Public */
! TocEntry *
  ArchiveEntry(Archive *AHX,
               CatalogId catalogId, DumpId dumpId,
               const char *tag,
*************** ArchiveEntry(Archive *AHX,
*** 1100,1108 ****
--- 1133,1144 ----
      newToc->hadDumper = dumpFn ? true : false;

      newToc->formatData = NULL;
+     newToc->dataLength = 0;

      if (AH->ArchiveEntryPtr != NULL)
          AH->ArchiveEntryPtr(AH, newToc);
+
+     return newToc;
  }

  /* Public */
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2413,2444 ****
  {
      TocEntry   *te;

!     for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         if (!te->dataDumper)
!             continue;
!
!         if ((te->reqs & REQ_DATA) == 0)
!             continue;

!         if (pstate && pstate->numWorkers > 1)
          {
!             /*
!              * If we are in a parallel backup, then we are always the master
!              * process.  Dispatch each data-transfer job to a worker.
!              */
!             DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
!                                    mark_dump_job_done, NULL);
          }
-         else
-             WriteDataChunksForTocEntry(AH, te);
-     }

!     /*
!      * If parallel, wait for workers to finish.
!      */
!     if (pstate && pstate->numWorkers > 1)
          WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
  }


--- 2449,2507 ----
  {
      TocEntry   *te;

!     if (pstate && pstate->numWorkers > 1)
      {
!         /*
!          * In parallel mode, this code runs in the master process.  We
!          * construct an array of candidate TEs, then sort it into decreasing
!          * size order, then dispatch each TE to a data-transfer worker.  By
!          * dumping larger tables first, we avoid getting into a situation
!          * where we're down to one job and it's big, losing parallelism.
!          */
!         TocEntry  **tes;
!         int            ntes;

!         tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
!         ntes = 0;
!         for (te = AH->toc->next; te != AH->toc; te = te->next)
          {
!             /* Consider only TEs with dataDumper functions ... */
!             if (!te->dataDumper)
!                 continue;
!             /* ... and ignore ones not enabled for dump */
!             if ((te->reqs & REQ_DATA) == 0)
!                 continue;
!
!             tes[ntes++] = te;
          }

!         if (ntes > 1)
!             qsort((void *) tes, ntes, sizeof(TocEntry *),
!                   TocEntrySizeCompare);
!
!         for (int i = 0; i < ntes; i++)
!             DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
!                                    mark_dump_job_done, NULL);
!
!         pg_free(tes);
!
!         /* Now wait for workers to finish. */
          WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
+     }
+     else
+     {
+         /* Non-parallel mode: just dump all candidate TEs sequentially. */
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             /* Must have same filter conditions as above */
+             if (!te->dataDumper)
+                 continue;
+             if ((te->reqs & REQ_DATA) == 0)
+                 continue;
+
+             WriteDataChunksForTocEntry(AH, te);
+         }
+     }
  }


*************** ReadToc(ArchiveHandle *AH)
*** 2690,2695 ****
--- 2753,2759 ----
              te->dependencies = NULL;
              te->nDeps = 0;
          }
+         te->dataLength = 0;

          if (AH->ReadExtraTocPtr)
              AH->ReadExtraTocPtr(AH, te);
*************** restore_toc_entries_prefork(ArchiveHandl
*** 3996,4002 ****
          else
          {
              /* Nope, so add it to pending_list */
!             par_list_append(pending_list, next_work_item);
          }
      }

--- 4060,4066 ----
          else
          {
              /* Nope, so add it to pending_list */
!             pending_list_append(pending_list, next_work_item);
          }
      }

*************** static void
*** 4035,4045 ****
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
!     TocEntry    ready_list;
      TocEntry   *next_work_item;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

      /*
       * The pending_list contains all items that we need to restore.  Move all
       * items that are available to process immediately into the ready_list.
--- 4099,4112 ----
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                               TocEntry *pending_list)
  {
!     ParallelReadyList ready_list;
      TocEntry   *next_work_item;

      ahlog(AH, 2, "entering restore_toc_entries_parallel\n");

+     /* Set up ready_list with enough room for all known TocEntrys */
+     ready_list_init(&ready_list, AH->tocCount);
+
      /*
       * The pending_list contains all items that we need to restore.  Move all
       * items that are available to process immediately into the ready_list.
*************** restore_toc_entries_parallel(ArchiveHand
*** 4048,4054 ****
       * contains items that have no remaining dependencies and are OK to
       * process in the current restore pass.
       */
-     par_list_header_init(&ready_list);
      AH->restorePass = RESTORE_PASS_MAIN;
      move_to_ready_list(pending_list, &ready_list, AH->restorePass);

--- 4115,4120 ----
*************** restore_toc_entries_parallel(ArchiveHand
*** 4064,4070 ****
      for (;;)
      {
          /* Look for an item ready to be dispatched to a worker */
!         next_work_item = get_next_work_item(AH, &ready_list, pstate);
          if (next_work_item != NULL)
          {
              /* If not to be restored, don't waste time launching a worker */
--- 4130,4136 ----
      for (;;)
      {
          /* Look for an item ready to be dispatched to a worker */
!         next_work_item = pop_next_work_item(AH, &ready_list, pstate);
          if (next_work_item != NULL)
          {
              /* If not to be restored, don't waste time launching a worker */
*************** restore_toc_entries_parallel(ArchiveHand
*** 4073,4080 ****
                  ahlog(AH, 1, "skipping item %d %s %s\n",
                        next_work_item->dumpId,
                        next_work_item->desc, next_work_item->tag);
!                 /* Drop it from ready_list, and update its dependencies */
!                 par_list_remove(next_work_item);
                  reduce_dependencies(AH, next_work_item, &ready_list);
                  /* Loop around to see if anything else can be dispatched */
                  continue;
--- 4139,4145 ----
                  ahlog(AH, 1, "skipping item %d %s %s\n",
                        next_work_item->dumpId,
                        next_work_item->desc, next_work_item->tag);
!                 /* Update its dependencies as though we'd completed it */
                  reduce_dependencies(AH, next_work_item, &ready_list);
                  /* Loop around to see if anything else can be dispatched */
                  continue;
*************** restore_toc_entries_parallel(ArchiveHand
*** 4084,4092 ****
                    next_work_item->dumpId,
                    next_work_item->desc, next_work_item->tag);

!             /* Remove it from ready_list, and dispatch to some worker */
!             par_list_remove(next_work_item);
!
              DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
                                     mark_restore_job_done, &ready_list);
          }
--- 4149,4155 ----
                    next_work_item->dumpId,
                    next_work_item->desc, next_work_item->tag);

!             /* Dispatch to some worker */
              DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
                                     mark_restore_job_done, &ready_list);
          }
*************** restore_toc_entries_parallel(ArchiveHand
*** 4132,4138 ****
      }

      /* There should now be nothing in ready_list. */
!     Assert(ready_list.par_next == &ready_list);

      ahlog(AH, 1, "finished main parallel loop\n");
  }
--- 4195,4203 ----
      }

      /* There should now be nothing in ready_list. */
!     Assert(ready_list.first_te > ready_list.last_te);
!
!     ready_list_free(&ready_list);

      ahlog(AH, 1, "finished main parallel loop\n");
  }
*************** restore_toc_entries_postfork(ArchiveHand
*** 4170,4176 ****
       * connection.  We don't sweat about RestorePass ordering; it's likely we
       * already violated that.
       */
!     for (te = pending_list->par_next; te != pending_list; te = te->par_next)
      {
          ahlog(AH, 1, "processing missed item %d %s %s\n",
                te->dumpId, te->desc, te->tag);
--- 4235,4241 ----
       * connection.  We don't sweat about RestorePass ordering; it's likely we
       * already violated that.
       */
!     for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
      {
          ahlog(AH, 1, "processing missed item %d %s %s\n",
                te->dumpId, te->desc, te->tag);
*************** has_lock_conflicts(TocEntry *te1, TocEnt
*** 4201,4236 ****


  /*
!  * Initialize the header of a parallel-processing list.
   *
!  * These are circular lists with a dummy TocEntry as header, just like the
   * main TOC list; but we use separate list links so that an entry can be in
!  * the main TOC list as well as in a parallel-processing list.
   */
  static void
! par_list_header_init(TocEntry *l)
  {
!     l->par_prev = l->par_next = l;
  }

! /* Append te to the end of the parallel-processing list headed by l */
  static void
! par_list_append(TocEntry *l, TocEntry *te)
  {
!     te->par_prev = l->par_prev;
!     l->par_prev->par_next = te;
!     l->par_prev = te;
!     te->par_next = l;
  }

! /* Remove te from whatever parallel-processing list it's in */
  static void
! par_list_remove(TocEntry *te)
  {
!     te->par_prev->par_next = te->par_next;
!     te->par_next->par_prev = te->par_prev;
!     te->par_prev = NULL;
!     te->par_next = NULL;
  }


--- 4266,4395 ----


  /*
!  * Initialize the header of the pending-items list.
   *
!  * This is a circular list with a dummy TocEntry as header, just like the
   * main TOC list; but we use separate list links so that an entry can be in
!  * the main TOC list as well as in the pending list.
   */
  static void
! pending_list_header_init(TocEntry *l)
  {
!     l->pending_prev = l->pending_next = l;
  }

! /* Append te to the end of the pending-list headed by l */
  static void
! pending_list_append(TocEntry *l, TocEntry *te)
  {
!     te->pending_prev = l->pending_prev;
!     l->pending_prev->pending_next = te;
!     l->pending_prev = te;
!     te->pending_next = l;
  }

! /* Remove te from the pending-list */
  static void
! pending_list_remove(TocEntry *te)
  {
!     te->pending_prev->pending_next = te->pending_next;
!     te->pending_next->pending_prev = te->pending_prev;
!     te->pending_prev = NULL;
!     te->pending_next = NULL;
! }
!
!
! /*
!  * Initialize the ready_list with enough room for up to tocCount entries.
!  */
! static void
! ready_list_init(ParallelReadyList *ready_list, int tocCount)
! {
!     ready_list->tes = (TocEntry **)
!         pg_malloc(tocCount * sizeof(TocEntry *));
!     ready_list->first_te = 0;
!     ready_list->last_te = -1;
!     ready_list->sorted = false;
! }
!
! /*
!  * Free storage for a ready_list.
!  */
! static void
! ready_list_free(ParallelReadyList *ready_list)
! {
!     pg_free(ready_list->tes);
! }
!
! /* Add te to the ready_list */
! static void
! ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
! {
!     ready_list->tes[++ready_list->last_te] = te;
!     /* List is (probably) not sorted anymore. */
!     ready_list->sorted = false;
! }
!
! /* Remove the i'th entry in the ready_list */
! static void
! ready_list_remove(ParallelReadyList *ready_list, int i)
! {
!     int            f = ready_list->first_te;
!
!     Assert(i >= f && i <= ready_list->last_te);
!
!     /*
!      * In the typical case where the item to be removed is the first ready
!      * entry, we need only increment first_te to remove it.  Otherwise, move
!      * the entries before it to compact the list.  (This preserves sortedness,
!      * if any.)  We could alternatively move the entries after i, but there
!      * are typically many more of those.
!      */
!     if (i > f)
!     {
!         TocEntry  **first_te_ptr = &ready_list->tes[f];
!
!         memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
!     }
!     ready_list->first_te++;
! }
!
! /* Sort the ready_list into the desired order */
! static void
! ready_list_sort(ParallelReadyList *ready_list)
! {
!     if (!ready_list->sorted)
!     {
!         int            n = ready_list->last_te - ready_list->first_te + 1;
!
!         if (n > 1)
!             qsort(ready_list->tes + ready_list->first_te, n,
!                   sizeof(TocEntry *),
!                   TocEntrySizeCompare);
!         ready_list->sorted = true;
!     }
! }
!
! /* qsort comparator for sorting TocEntries by dataLength */
! static int
! TocEntrySizeCompare(const void *p1, const void *p2)
! {
!     const TocEntry *te1 = *(const TocEntry *const *) p1;
!     const TocEntry *te2 = *(const TocEntry *const *) p2;
!
!     /* Sort by decreasing dataLength */
!     if (te1->dataLength > te2->dataLength)
!         return -1;
!     if (te1->dataLength < te2->dataLength)
!         return 1;
!
!     /* For equal dataLengths, sort by dumpId, just to be stable */
!     if (te1->dumpId < te2->dumpId)
!         return -1;
!     if (te1->dumpId > te2->dumpId)
!         return 1;
!
!     return 0;
  }


*************** par_list_remove(TocEntry *te)
*** 4242,4293 ****
   * which applies the same logic one-at-a-time.)
   */
  static void
! move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list,
                     RestorePass pass)
  {
      TocEntry   *te;
      TocEntry   *next_te;

!     for (te = pending_list->par_next; te != pending_list; te = next_te)
      {
!         /* must save list link before possibly moving te to other list */
!         next_te = te->par_next;

          if (te->depCount == 0 &&
              _tocEntryRestorePass(te) == pass)
          {
              /* Remove it from pending_list ... */
!             par_list_remove(te);
              /* ... and add to ready_list */
!             par_list_append(ready_list, te);
          }
      }
  }

  /*
!  * Find the next work item (if any) that is capable of being run now.
   *
   * To qualify, the item must have no remaining dependencies
   * and no requirements for locks that are incompatible with
   * items currently running.  Items in the ready_list are known to have
   * no remaining dependencies, but we have to check for lock conflicts.
   *
-  * Note that the returned item has *not* been removed from ready_list.
-  * The caller must do that after successfully dispatching the item.
-  *
   * pref_non_data is for an alternative selection algorithm that gives
   * preference to non-data items if there is already a data load running.
   * It is currently disabled.
   */
  static TocEntry *
! get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
                     ParallelState *pstate)
  {
      bool        pref_non_data = false;    /* or get from AH->ropt */
!     TocEntry   *data_te = NULL;
!     TocEntry   *te;
!     int            i,
!                 k;

      /*
       * Bogus heuristics for pref_non_data
--- 4401,4450 ----
   * which applies the same logic one-at-a-time.)
   */
  static void
! move_to_ready_list(TocEntry *pending_list,
!                    ParallelReadyList *ready_list,
                     RestorePass pass)
  {
      TocEntry   *te;
      TocEntry   *next_te;

!     for (te = pending_list->pending_next; te != pending_list; te = next_te)
      {
!         /* must save list link before possibly removing te from list */
!         next_te = te->pending_next;

          if (te->depCount == 0 &&
              _tocEntryRestorePass(te) == pass)
          {
              /* Remove it from pending_list ... */
!             pending_list_remove(te);
              /* ... and add to ready_list */
!             ready_list_insert(ready_list, te);
          }
      }
  }

  /*
!  * Find the next work item (if any) that is capable of being run now,
!  * and remove it from the ready_list.
!  *
!  * Returns the item, or NULL if nothing is runnable.
   *
   * To qualify, the item must have no remaining dependencies
   * and no requirements for locks that are incompatible with
   * items currently running.  Items in the ready_list are known to have
   * no remaining dependencies, but we have to check for lock conflicts.
   *
   * pref_non_data is for an alternative selection algorithm that gives
   * preference to non-data items if there is already a data load running.
   * It is currently disabled.
   */
  static TocEntry *
! pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
                     ParallelState *pstate)
  {
      bool        pref_non_data = false;    /* or get from AH->ropt */
!     int            data_te_index = -1;

      /*
       * Bogus heuristics for pref_non_data
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4296,4302 ****
      {
          int            count = 0;

!         for (k = 0; k < pstate->numWorkers; k++)
          {
              TocEntry   *running_te = pstate->te[k];

--- 4453,4459 ----
      {
          int            count = 0;

!         for (int k = 0; k < pstate->numWorkers; k++)
          {
              TocEntry   *running_te = pstate->te[k];

*************** get_next_work_item(ArchiveHandle *AH, To
*** 4309,4318 ****
      }

      /*
       * Search the ready_list until we find a suitable item.
       */
!     for (te = ready_list->par_next; te != ready_list; te = te->par_next)
      {
          bool        conflicts = false;

          /*
--- 4466,4481 ----
      }

      /*
+      * Sort the ready_list so that we'll tackle larger jobs first.
+      */
+     ready_list_sort(ready_list);
+
+     /*
       * Search the ready_list until we find a suitable item.
       */
!     for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
      {
+         TocEntry   *te = ready_list->tes[i];
          bool        conflicts = false;

          /*
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4320,4328 ****
           * that a currently running item also needs lock on, or vice versa. If
           * so, we don't want to schedule them together.
           */
!         for (i = 0; i < pstate->numWorkers; i++)
          {
!             TocEntry   *running_te = pstate->te[i];

              if (running_te == NULL)
                  continue;
--- 4483,4491 ----
           * that a currently running item also needs lock on, or vice versa. If
           * so, we don't want to schedule them together.
           */
!         for (int k = 0; k < pstate->numWorkers; k++)
          {
!             TocEntry   *running_te = pstate->te[k];

              if (running_te == NULL)
                  continue;
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4339,4355 ****

          if (pref_non_data && te->section == SECTION_DATA)
          {
!             if (data_te == NULL)
!                 data_te = te;
              continue;
          }

          /* passed all tests, so this item can run */
          return te;
      }

!     if (data_te != NULL)
          return data_te;

      ahlog(AH, 2, "no item ready\n");
      return NULL;
--- 4502,4524 ----

          if (pref_non_data && te->section == SECTION_DATA)
          {
!             if (data_te_index < 0)
!                 data_te_index = i;
              continue;
          }

          /* passed all tests, so this item can run */
+         ready_list_remove(ready_list, i);
          return te;
      }

!     if (data_te_index >= 0)
!     {
!         TocEntry   *data_te = ready_list->tes[data_te_index];
!
!         ready_list_remove(ready_list, data_te_index);
          return data_te;
+     }

      ahlog(AH, 2, "no item ready\n");
      return NULL;
*************** mark_restore_job_done(ArchiveHandle *AH,
*** 4393,4399 ****
                        int status,
                        void *callback_data)
  {
!     TocEntry   *ready_list = (TocEntry *) callback_data;

      ahlog(AH, 1, "finished item %d %s %s\n",
            te->dumpId, te->desc, te->tag);
--- 4562,4568 ----
                        int status,
                        void *callback_data)
  {
!     ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;

      ahlog(AH, 1, "finished item %d %s %s\n",
            te->dumpId, te->desc, te->tag);
*************** fix_dependencies(ArchiveHandle *AH)
*** 4443,4450 ****
          te->depCount = te->nDeps;
          te->revDeps = NULL;
          te->nRevDeps = 0;
!         te->par_prev = NULL;
!         te->par_next = NULL;
      }

      /*
--- 4612,4619 ----
          te->depCount = te->nDeps;
          te->revDeps = NULL;
          te->nRevDeps = 0;
!         te->pending_prev = NULL;
!         te->pending_next = NULL;
      }

      /*
*************** fix_dependencies(ArchiveHandle *AH)
*** 4551,4556 ****
--- 4720,4731 ----
  /*
   * Change dependencies on table items to depend on table data items instead,
   * but only in POST_DATA items.
+  *
+  * Also, for any item having such dependency(s), set its dataLength to the
+  * largest dataLength of the table data items it depends on.  This ensures
+  * that parallel restore will prioritize larger jobs (index builds, FK
+  * constraint checks, etc) over smaller ones, avoiding situations where we
+  * end a restore with only one active job working on a large table.
   */
  static void
  repoint_table_dependencies(ArchiveHandle *AH)
*************** repoint_table_dependencies(ArchiveHandle
*** 4569,4577 ****
              if (olddep <= AH->maxDumpId &&
                  AH->tableDataId[olddep] != 0)
              {
!                 te->dependencies[i] = AH->tableDataId[olddep];
                  ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
!                       te->dumpId, olddep, AH->tableDataId[olddep]);
              }
          }
      }
--- 4744,4756 ----
              if (olddep <= AH->maxDumpId &&
                  AH->tableDataId[olddep] != 0)
              {
!                 DumpId        tabledataid = AH->tableDataId[olddep];
!                 TocEntry   *tabledatate = AH->tocsByDumpId[tabledataid];
!
!                 te->dependencies[i] = tabledataid;
!                 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
                  ahlog(AH, 2, "transferring dependency %d -> %d to %d\n",
!                       te->dumpId, olddep, tabledataid);
              }
          }
      }
*************** identify_locking_dependencies(ArchiveHan
*** 4647,4653 ****
   * becomes ready should be moved to the ready_list, if that's provided.
   */
  static void
! reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
  {
      int            i;

--- 4826,4833 ----
   * becomes ready should be moved to the ready_list, if that's provided.
   */
  static void
! reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
!                     ParallelReadyList *ready_list)
  {
      int            i;

*************** reduce_dependencies(ArchiveHandle *AH, T
*** 4670,4682 ****
           */
          if (otherte->depCount == 0 &&
              _tocEntryRestorePass(otherte) == AH->restorePass &&
!             otherte->par_prev != NULL &&
              ready_list != NULL)
          {
              /* Remove it from pending list ... */
!             par_list_remove(otherte);
              /* ... and add to ready_list */
!             par_list_append(ready_list, otherte);
          }
      }
  }
--- 4850,4862 ----
           */
          if (otherte->depCount == 0 &&
              _tocEntryRestorePass(otherte) == AH->restorePass &&
!             otherte->pending_prev != NULL &&
              ready_list != NULL)
          {
              /* Remove it from pending list ... */
!             pending_list_remove(otherte);
              /* ... and add to ready_list */
!             ready_list_insert(ready_list, otherte);
          }
      }
  }
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 8dd1915..26dd044 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef int (*WriteBytePtrType) (Archive
*** 162,173 ****
  typedef int (*ReadBytePtrType) (ArchiveHandle *AH);
  typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len);
  typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len);
- typedef void (*SaveArchivePtrType) (ArchiveHandle *AH);
  typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te);

  typedef void (*ClonePtrType) (ArchiveHandle *AH);
  typedef void (*DeClonePtrType) (ArchiveHandle *AH);

--- 162,173 ----
  typedef int (*ReadBytePtrType) (ArchiveHandle *AH);
  typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len);
  typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len);
  typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te);
  typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te);

+ typedef void (*PrepParallelRestorePtrType) (ArchiveHandle *AH);
  typedef void (*ClonePtrType) (ArchiveHandle *AH);
  typedef void (*DeClonePtrType) (ArchiveHandle *AH);

*************** struct _archiveHandle
*** 297,302 ****
--- 297,303 ----
      WorkerJobDumpPtrType WorkerJobDumpPtr;
      WorkerJobRestorePtrType WorkerJobRestorePtr;

+     PrepParallelRestorePtrType PrepParallelRestorePtr;
      ClonePtrType ClonePtr;        /* Clone format-specific fields */
      DeClonePtrType DeClonePtr;    /* Clean up cloned fields */

*************** struct _tocEntry
*** 387,398 ****
      void       *formatData;        /* TOC Entry data specific to file format */

      /* working state while dumping/restoring */
      teReqs        reqs;            /* do we need schema and/or data of object */
      bool        created;        /* set for DATA member if TABLE was created */

      /* working state (needed only for parallel restore) */
!     struct _tocEntry *par_prev; /* list links for pending/ready items; */
!     struct _tocEntry *par_next; /* these are NULL if not in either list */
      int            depCount;        /* number of dependencies not yet restored */
      DumpId       *revDeps;        /* dumpIds of objects depending on this one */
      int            nRevDeps;        /* number of such dependencies */
--- 388,400 ----
      void       *formatData;        /* TOC Entry data specific to file format */

      /* working state while dumping/restoring */
+     pgoff_t        dataLength;        /* item's data size; 0 if none or unknown */
      teReqs        reqs;            /* do we need schema and/or data of object */
      bool        created;        /* set for DATA member if TABLE was created */

      /* working state (needed only for parallel restore) */
!     struct _tocEntry *pending_prev; /* list links for pending-items list; */
!     struct _tocEntry *pending_next; /* NULL if not in that list */
      int            depCount;        /* number of dependencies not yet restored */
      DumpId       *revDeps;        /* dumpIds of objects depending on this one */
      int            nRevDeps;        /* number of such dependencies */
*************** extern void on_exit_close_archive(Archiv
*** 405,410 ****
--- 407,424 ----

  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
pg_attribute_printf(3,4); 

+ /* Called to add a TOC entry */
+ extern TocEntry *ArchiveEntry(Archive *AHX,
+              CatalogId catalogId, DumpId dumpId,
+              const char *tag,
+              const char *namespace, const char *tablespace,
+              const char *owner, bool withOids,
+              const char *desc, teSection section,
+              const char *defn,
+              const char *dropStmt, const char *copyStmt,
+              const DumpId *deps, int nDeps,
+              DataDumperPtr dumpFn, void *dumpArg);
+
  extern void WriteTOC(ArchiveHandle *AH);
  extern void ReadTOC(ArchiveHandle *AH);
  extern void WriteHead(ArchiveHandle *AH);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index ad18a6c..96f44e8 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** static void _StartBlob(ArchiveHandle *AH
*** 59,64 ****
--- 59,66 ----
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, bool drop);
+
+ static void _PrepParallelRestore(ArchiveHandle *AH);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);

*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 129,134 ****
--- 131,138 ----
      AH->StartBlobPtr = _StartBlob;
      AH->EndBlobPtr = _EndBlob;
      AH->EndBlobsPtr = _EndBlobs;
+
+     AH->PrepParallelRestorePtr = _PrepParallelRestore;
      AH->ClonePtr = _Clone;
      AH->DeClonePtr = _DeClone;

*************** _ReopenArchive(ArchiveHandle *AH)
*** 776,781 ****
--- 780,845 ----
  }

  /*
+  * Prepare for parallel restore.
+  *
+  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+  * TOC entries' dataLength fields with appropriate values to guide the
+  * ordering of restore jobs.  The source of said data is format-dependent,
+  * as is the exact meaning of the values.
+  *
+  * A format module might also choose to do other setup here.
+  */
+ static void
+ _PrepParallelRestore(ArchiveHandle *AH)
+ {
+     lclContext *ctx = (lclContext *) AH->formatData;
+     TocEntry   *prev_te = NULL;
+     lclTocEntry *prev_tctx = NULL;
+     TocEntry   *te;
+
+     /*
+      * Knowing that the data items were dumped out in TOC order, we can
+      * reconstruct the length of each item as the delta to the start offset of
+      * the next data item.
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+         /*
+          * Ignore entries without a known data offset; if we were unable to
+          * seek to rewrite the TOC when creating the archive, this'll be all
+          * of them, and we'll end up with no size estimates.
+          */
+         if (tctx->dataState != K_OFFSET_POS_SET)
+             continue;
+
+         /* Compute previous data item's length */
+         if (prev_te)
+         {
+             if (tctx->dataPos > prev_tctx->dataPos)
+                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
+         }
+
+         prev_te = te;
+         prev_tctx = tctx;
+     }
+
+     /* If OK to seek, we can determine the length of the last item */
+     if (prev_te && ctx->hasSeek)
+     {
+         pgoff_t        endpos;
+
+         if (fseeko(AH->FH, 0, SEEK_END) != 0)
+             exit_horribly(modulename, "error during file seek: %s\n",
+                           strerror(errno));
+         endpos = ftello(AH->FH);
+         if (endpos > prev_tctx->dataPos)
+             prev_te->dataLength = endpos - prev_tctx->dataPos;
+     }
+ }
+
+ /*
   * Clone format-specific fields during parallel restoration.
   */
  static void
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 4aabb40..cda90b9 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** static void _EndBlob(ArchiveHandle *AH,
*** 87,92 ****
--- 87,93 ----
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH);

+ static void _PrepParallelRestore(ArchiveHandle *AH);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);

*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 132,137 ****
--- 133,139 ----
      AH->EndBlobPtr = _EndBlob;
      AH->EndBlobsPtr = _EndBlobs;

+     AH->PrepParallelRestorePtr = _PrepParallelRestore;
      AH->ClonePtr = _Clone;
      AH->DeClonePtr = _DeClone;

*************** _ArchiveEntry(ArchiveHandle *AH, TocEntr
*** 240,252 ****
      char        fn[MAXPGPATH];

      tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
!     if (te->dataDumper)
      {
          snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
          tctx->filename = pg_strdup(fn);
      }
-     else if (strcmp(te->desc, "BLOBS") == 0)
-         tctx->filename = pg_strdup("blobs.toc");
      else
          tctx->filename = NULL;

--- 242,254 ----
      char        fn[MAXPGPATH];

      tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
!     if (strcmp(te->desc, "BLOBS") == 0)
!         tctx->filename = pg_strdup("blobs.toc");
!     else if (te->dataDumper)
      {
          snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId);
          tctx->filename = pg_strdup(fn);
      }
      else
          tctx->filename = NULL;

*************** setFilePath(ArchiveHandle *AH, char *buf
*** 727,732 ****
--- 729,796 ----
  }

  /*
+  * Prepare for parallel restore.
+  *
+  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+  * TOC entries' dataLength fields with appropriate values to guide the
+  * ordering of restore jobs.  The source of said data is format-dependent,
+  * as is the exact meaning of the values.
+  *
+  * A format module might also choose to do other setup here.
+  */
+ static void
+ _PrepParallelRestore(ArchiveHandle *AH)
+ {
+     TocEntry   *te;
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+         char        fname[MAXPGPATH];
+         struct stat st;
+
+         /*
+          * A dumpable object has set tctx->filename, any other object has not.
+          * (see _ArchiveEntry).
+          */
+         if (tctx->filename == NULL)
+             continue;
+
+         /* We may ignore items not due to be restored */
+         if ((te->reqs & REQ_DATA) == 0)
+             continue;
+
+         /*
+          * Stat the file and, if successful, put its size in dataLength.  When
+          * using compression, the physical file size might not be a very good
+          * guide to the amount of work involved in restoring the file, but we
+          * only need an approximate indicator of that.
+          */
+         setFilePath(AH, fname, tctx->filename);
+
+         if (stat(fname, &st) == 0)
+             te->dataLength = st.st_size;
+         else
+         {
+             /* It might be compressed */
+             strlcat(fname, ".gz", sizeof(fname));
+             if (stat(fname, &st) == 0)
+                 te->dataLength = st.st_size;
+         }
+
+         /*
+          * If this is the BLOBS entry, what we stat'd was blobs.toc, which
+          * most likely is a lot smaller than the actual blob data.  We don't
+          * have a cheap way to estimate how much smaller, but fortunately it
+          * doesn't matter too much as long as we get the blobs processed
+          * reasonably early.  Arbitrarily scale up by a factor of 1K.
+          */
+         if (strcmp(te->desc, "BLOBS") == 0)
+             te->dataLength *= 1024;
+     }
+ }
+
+ /*
   * Clone format-specific fields during parallel restoration.
   */
  static void
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f0ea83e..0687a81 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "catalog/pg_trigger_d.h"
  #include "catalog/pg_type_d.h"
  #include "libpq/libpq-fs.h"
+ #include "storage/block.h"

  #include "dumputils.h"
  #include "parallel.h"
*************** main(int argc, char **argv)
*** 845,854 ****
       */
      sortDumpableObjectsByTypeName(dobjs, numObjs);

-     /* If we do a parallel dump, we want the largest tables to go first */
-     if (archiveFormat == archDirectory && numWorkers > 1)
-         sortDataAndIndexObjectsBySize(dobjs, numObjs);
-
      sortDumpableObjects(dobjs, numObjs,
                          boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);

--- 846,851 ----
*************** dumpTableData(Archive *fout, TableDataIn
*** 2156,2168 ****
       * See comments for BuildArchiveDependencies.
       */
      if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
!         ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
!                      tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
!                      NULL, tbinfo->rolname,
!                      false, "TABLE DATA", SECTION_DATA,
!                      "", "", copyStmt,
!                      &(tbinfo->dobj.dumpId), 1,
!                      dumpFn, tdinfo);

      destroyPQExpBuffer(copyBuf);
      destroyPQExpBuffer(clistBuf);
--- 2153,2180 ----
       * See comments for BuildArchiveDependencies.
       */
      if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
!     {
!         TocEntry   *te;
!
!         te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
!                           tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
!                           NULL, tbinfo->rolname,
!                           false, "TABLE DATA", SECTION_DATA,
!                           "", "", copyStmt,
!                           &(tbinfo->dobj.dumpId), 1,
!                           dumpFn, tdinfo);
!
!         /*
!          * Set the TocEntry's dataLength in case we are doing a parallel dump
!          * and want to order dump jobs by table size.  We choose to measure
!          * dataLength in table pages during dump, so no scaling is needed.
!          * However, relpages is declared as "integer" in pg_class, and hence
!          * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
!          * Cast so that we get the right interpretation of table sizes
!          * exceeding INT_MAX pages.
!          */
!         te->dataLength = (BlockNumber) tbinfo->relpages;
!     }

      destroyPQExpBuffer(copyBuf);
      destroyPQExpBuffer(clistBuf);
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6759,6766 ****
                  i_conoid,
                  i_condef,
                  i_tablespace,
!                 i_indreloptions,
!                 i_relpages;
      int            ntups;

      for (i = 0; i < numTables; i++)
--- 6771,6777 ----
                  i_conoid,
                  i_condef,
                  i_tablespace,
!                 i_indreloptions;
      int            ntups;

      for (i = 0; i < numTables; i++)
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6807,6813 ****
                                "i.indnkeyatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6818,6824 ----
                                "i.indnkeyatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6844,6850 ****
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6855,6861 ----
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "i.indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6877,6883 ****
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6888,6894 ----
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6906,6912 ****
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6917,6923 ----
                                "i.indnatts AS indnkeyatts, "
                                "i.indnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6938,6944 ****
                                "t.relnatts AS indnkeyatts, "
                                "t.relnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, t.relpages, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
--- 6949,6955 ----
                                "t.relnatts AS indnkeyatts, "
                                "t.relnatts AS indnatts, "
                                "i.indkey, i.indisclustered, "
!                               "false AS indisreplident, "
                                "c.contype, c.conname, "
                                "c.condeferrable, c.condeferred, "
                                "c.tableoid AS contableoid, "
*************** getIndexes(Archive *fout, TableInfo tbli
*** 6974,6980 ****
          i_indkey = PQfnumber(res, "indkey");
          i_indisclustered = PQfnumber(res, "indisclustered");
          i_indisreplident = PQfnumber(res, "indisreplident");
-         i_relpages = PQfnumber(res, "relpages");
          i_contype = PQfnumber(res, "contype");
          i_conname = PQfnumber(res, "conname");
          i_condeferrable = PQfnumber(res, "condeferrable");
--- 6985,6990 ----
*************** getIndexes(Archive *fout, TableInfo tbli
*** 7013,7019 ****
              indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't');
              indxinfo[j].indisreplident = (PQgetvalue(res, j, i_indisreplident)[0] == 't');
              indxinfo[j].parentidx = atooid(PQgetvalue(res, j, i_parentidx));
-             indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages));
              contype = *(PQgetvalue(res, j, i_contype));

              if (contype == 'p' || contype == 'u' || contype == 'x')
--- 7023,7028 ----
*************** getTableAttrs(Archive *fout, TableInfo *
*** 8206,8211 ****
--- 8215,8221 ----
                                "'' AS attfdwoptions,\n");

          if (fout->remoteVersion >= 90100)
+         {
              /*
               * Since we only want to dump COLLATE clauses for attributes whose
               * collation is different from their type's default, we use a CASE
*************** getTableAttrs(Archive *fout, TableInfo *
*** 8214,8219 ****
--- 8224,8230 ----
              appendPQExpBuffer(q,
                                "CASE WHEN a.attcollation <> t.typcollation "
                                "THEN a.attcollation ELSE 0 END AS attcollation,\n");
+         }
          else
              appendPQExpBuffer(q,
                                "0 AS attcollation,\n");
*************** getTableAttrs(Archive *fout, TableInfo *
*** 8225,8232 ****
              appendPQExpBuffer(q,
                                "'' AS attoptions\n");

          appendPQExpBuffer(q,
-                           /* need left join here to not fail on dropped columns ... */
                            "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                            "ON a.atttypid = t.oid\n"
                            "WHERE a.attrelid = '%u'::pg_catalog.oid "
--- 8236,8243 ----
              appendPQExpBuffer(q,
                                "'' AS attoptions\n");

+         /* need left join here to not fail on dropped columns ... */
          appendPQExpBuffer(q,
                            "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                            "ON a.atttypid = t.oid\n"
                            "WHERE a.attrelid = '%u'::pg_catalog.oid "
*************** dumpDumpableObject(Archive *fout, Dumpab
*** 9772,9783 ****
              break;
          case DO_BLOB_DATA:
              if (dobj->dump & DUMP_COMPONENT_DATA)
!                 ArchiveEntry(fout, dobj->catId, dobj->dumpId,
!                              dobj->name, NULL, NULL, "",
!                              false, "BLOBS", SECTION_DATA,
!                              "", "", NULL,
!                              NULL, 0,
!                              dumpBlobs, NULL);
              break;
          case DO_POLICY:
              dumpPolicy(fout, (PolicyInfo *) dobj);
--- 9783,9813 ----
              break;
          case DO_BLOB_DATA:
              if (dobj->dump & DUMP_COMPONENT_DATA)
!             {
!                 TocEntry   *te;
!
!                 te = ArchiveEntry(fout, dobj->catId, dobj->dumpId,
!                                   dobj->name, NULL, NULL, "",
!                                   false, "BLOBS", SECTION_DATA,
!                                   "", "", NULL,
!                                   NULL, 0,
!                                   dumpBlobs, NULL);
!
!                 /*
!                  * Set the TocEntry's dataLength in case we are doing a
!                  * parallel dump and want to order dump jobs by table size.
!                  * (We need some size estimate for every TocEntry with a
!                  * DataDumper function.)  We don't currently have any cheap
!                  * way to estimate the size of blobs, but it doesn't matter;
!                  * let's just set the size to a large value so parallel dumps
!                  * will launch this job first.  If there's lots of blobs, we
!                  * win, and if there aren't, we don't lose much.  (If you want
!                  * to improve on this, really what you should be thinking
!                  * about is allowing blob dumping to be parallelized, not just
!                  * getting a smarter estimate for the single TOC entry.)
!                  */
!                 te->dataLength = MaxBlockNumber;
!             }
              break;
          case DO_POLICY:
              dumpPolicy(fout, (PolicyInfo *) dobj);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1448005..685ad78 100644
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
*************** typedef struct _indxInfo
*** 370,376 ****
      Oid            parentidx;        /* if partitioned, parent index OID */
      /* if there is an associated constraint object, its dumpId: */
      DumpId        indexconstraint;
-     int            relpages;        /* relpages of the underlying table */
  } IndxInfo;

  typedef struct _indexAttachInfo
--- 370,375 ----
*************** extern void parseOidArray(const char *st
*** 677,683 ****
  extern void sortDumpableObjects(DumpableObject **objs, int numObjs,
                      DumpId preBoundaryId, DumpId postBoundaryId);
  extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
- extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);

  /*
   * version specific routines
--- 676,681 ----
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 6227a8f..a1d3ced 100644
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
*************** static const char *modulename = gettext_
*** 35,44 ****
   * pg_dump.c; that is, PRE_DATA objects must sort before DO_PRE_DATA_BOUNDARY,
   * POST_DATA objects must sort after DO_POST_DATA_BOUNDARY, and DATA objects
   * must sort between them.
-  *
-  * Note: sortDataAndIndexObjectsBySize wants to have all DO_TABLE_DATA and
-  * DO_INDEX objects in contiguous chunks, so do not reuse the values for those
-  * for other object types.
   */
  static const int dbObjectTypePriority[] =
  {
--- 35,40 ----
*************** static void repairDependencyLoop(Dumpabl
*** 111,206 ****
  static void describeDumpableObject(DumpableObject *obj,
                         char *buf, int bufsize);

- static int    DOSizeCompare(const void *p1, const void *p2);
-
- static int
- findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
- {
-     int            i;
-
-     for (i = 0; i < numObjs; i++)
-         if (objs[i]->objType == type)
-             return i;
-     return -1;
- }
-
- static int
- findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
- {
-     int            i;
-
-     for (i = start; i < numObjs; i++)
-         if (objs[i]->objType != type)
-             return i;
-     return numObjs - 1;
- }
-
- /*
-  * When we do a parallel dump, we want to start with the largest items first.
-  *
-  * Say we have the objects in this order:
-  * ....DDDDD....III....
-  *
-  * with D = Table data, I = Index, . = other object
-  *
-  * This sorting function now takes each of the D or I blocks and sorts them
-  * according to their size.
-  */
- void
- sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
- {
-     int            startIdx,
-                 endIdx;
-     void       *startPtr;
-
-     if (numObjs <= 1)
-         return;
-
-     startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
-     if (startIdx >= 0)
-     {
-         endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
-         startPtr = objs + startIdx;
-         qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
-               DOSizeCompare);
-     }
-
-     startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
-     if (startIdx >= 0)
-     {
-         endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
-         startPtr = objs + startIdx;
-         qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
-               DOSizeCompare);
-     }
- }
-
- static int
- DOSizeCompare(const void *p1, const void *p2)
- {
-     DumpableObject *obj1 = *(DumpableObject **) p1;
-     DumpableObject *obj2 = *(DumpableObject **) p2;
-     int            obj1_size = 0;
-     int            obj2_size = 0;
-
-     if (obj1->objType == DO_TABLE_DATA)
-         obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
-     if (obj1->objType == DO_INDEX)
-         obj1_size = ((IndxInfo *) obj1)->relpages;
-
-     if (obj2->objType == DO_TABLE_DATA)
-         obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
-     if (obj2->objType == DO_INDEX)
-         obj2_size = ((IndxInfo *) obj2)->relpages;
-
-     /* we want to see the biggest item go first */
-     if (obj1_size > obj2_size)
-         return -1;
-     if (obj2_size > obj1_size)
-         return 1;
-
-     return 0;
- }

  /*
   * Sort the given objects into a type/name-based ordering
--- 107,112 ----
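
(Aside, not part of the patch: the custom-format _PrepParallelRestore() above relies on the data items having been written back-to-back in TOC order, so each item's size can be estimated as the delta between its start offset and the next item's, with the file's end position covering the last item.  Below is a minimal standalone sketch of that offset-delta idea; the "item" struct and the sample offsets are hypothetical stand-ins for the real TocEntry/lclTocEntry data, not pg_dump code.)

/*
 * Illustrative sketch only (hypothetical "item" struct and made-up offsets);
 * demonstrates the offset-delta size estimate used by the custom format.
 */
#include <stdio.h>

typedef struct
{
    const char *name;        /* label for the data item */
    long        dataPos;     /* start offset within the archive file */
    long        dataLength;  /* size estimate to be computed */
} item;

int
main(void)
{
    item    items[] = {
        {"table_a_data", 1024, 0},
        {"table_b_data", 9216, 0},
        {"blobs", 20480, 0}
    };
    int     nitems = sizeof(items) / sizeof(items[0]);
    long    file_end = 31744;   /* as ftello() would report after seeking to SEEK_END */
    int     i;

    /* Each item's length is the delta to the next item's start offset. */
    for (i = 0; i < nitems - 1; i++)
    {
        if (items[i + 1].dataPos > items[i].dataPos)
            items[i].dataLength = items[i + 1].dataPos - items[i].dataPos;
    }

    /* The last item runs to the end of the file. */
    if (file_end > items[nitems - 1].dataPos)
        items[nitems - 1].dataLength = file_end - items[nitems - 1].dataPos;

    for (i = 0; i < nitems; i++)
        printf("%s: %ld bytes (estimated)\n", items[i].name, items[i].dataLength);

    return 0;
}

(The directory format doesn't need this trick: each table's data lives in its own file, so a plain stat() on the file gives the size directly, as the directory-format _PrepParallelRestore() above shows.)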

Re: pg_dump test instability

From
Peter Eisentraut
Date:
On 13/09/2018 23:03, Tom Lane wrote:
> Attached updated patch does it like that and makes the cosmetic
> adjustments you suggested.  I also went ahead and did the renaming
> of par_prev/par_next/par_list_xxx that I'd suggested upthread.
> I think this is committable ...

Yes, this looks good to me.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: pg_dump test instability

From
Tom Lane
Date:
Peter Eisentraut <peter.eisentraut@2ndquadrant.com> writes:
> On 13/09/2018 23:03, Tom Lane wrote:
>> Attached updated patch does it like that and makes the cosmetic
>> adjustments you suggested.  I also went ahead and did the renaming
>> of par_prev/par_next/par_list_xxx that I'd suggested upthread.
>> I think this is committable ...

> Yes, this looks good to me.

Pushed, thanks for reviewing.

            regards, tom lane