Re: [HACKERS] WIP: [[Parallel] Shared] Hash

From: Thomas Munro
Subject: Re: [HACKERS] WIP: [[Parallel] Shared] Hash
Msg-id: CAEepm=09rrc0YNJYjgFVdOR67g1qqtmfPf5tWYK9+_n6LyQjbA@mail.gmail.com
In reply to: Re: [HACKERS] WIP: [[Parallel] Shared] Hash  (Andres Freund <andres@anarazel.de>)
Responses: Re: [HACKERS] WIP: [[Parallel] Shared] Hash  (Thomas Munro <thomas.munro@enterprisedb.com>)
List: pgsql-hackers
On Thu, Feb 16, 2017 at 3:36 PM, Andres Freund <andres@anarazel.de> wrote:
> Hi,

Thanks for the review!

> FWIW, I'd appreciate if you'd added a short commit message to the
> individual patches - I find it helpful to have a little bit more context
> while looking at them than just the titles.  Alternatively you can
> include that text when re-posting the series, but it's imo just as easy
> to have a short commit message (and just use format-patch).
>
> I'm for now using [1] as context.

Ok, will do.

> 0002-hj-add-dtrace-probes-v5.patch
>
> Hm. I'm personally very unenthusiastic about adding more of these, and
> would rather rip all of them out.  I tend to believe that static
> probes simply aren't a good approach for anything requiring a lot of
> detail.  But whatever.

Ok, I will get rid of these.  Apparently you aren't the only committer
who hates these.  (I have some other thoughts on that but will save
them for another time.)

> 0003-hj-refactor-memory-accounting-v5.patch
> @@ -424,15 +422,29 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
>         if (ntuples <= 0.0)
>                 ntuples = 1000.0;
>
> -       /*
> -        * Estimate tupsize based on footprint of tuple in hashtable... note this
> -        * does not allow for any palloc overhead.  The manipulations of spaceUsed
> -        * don't count palloc overhead either.
> -        */
> +       /* Estimate tupsize based on footprint of tuple in hashtable. */
>
> palloc overhead is still unaccounted for, no? In the chunked case that
> might not be much, I realize that (so that comment should probably have
> been updated when chunking was introduced).

Yeah, it seemed like an obsolete comment, but I'll put it back as that
isn't relevant to this patch.

> -       Size            spaceUsed;              /* memory space currently used by tuples */
> +       Size            spaceUsed;              /* memory space currently used by hashtable */
>
> It's not really the full hashtable, is it? The ->buckets array appears
> to still be unaccounted for.

It is actually the full hash table, and that is a change in this
patch.  See ExecHashTableCreate and ExecHashTableReset, where it is set
to nbuckets * sizeof(HashJoinTuple), so that at all times it holds the
total size of buckets + all chunks.  That's unlike the code in master,
where it's just the sum of all tuple sizes while building, with the
bucket space only added at the end in MultiExecHash.
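
Roughly like this (a simplified sketch of the accounting, not the exact
patch hunks; chunk_size stands for HASH_CHUNK_SIZE or the size of an
oversized chunk):

/* ExecHashTableCreate / ExecHashTableReset: charge the bucket array up front */
hashtable->spaceUsed = hashtable->nbuckets * sizeof(HashJoinTuple);

/* dense_alloc: each new chunk adds its whole footprint */
hashtable->spaceUsed += HASH_CHUNK_HEADER_SIZE + chunk_size;

/* ExecHashIncreaseNumBuckets: account for the enlarged bucket array */
hashtable->spaceUsed +=
    (hashtable->nbuckets_optimal - hashtable->nbuckets) * sizeof(HashJoinTuple);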

> Looks ok.

Thanks!

> 0004-hj-refactor-batch-increases-v5.patch
>
> @@ -1693,10 +1689,12 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
>  }
>
>  /*
> - * Allocate 'size' bytes from the currently active HashMemoryChunk
> + * Allocate 'size' bytes from the currently active HashMemoryChunk.  If
> + * 'respect_work_mem' is true, this may cause the number of batches to be
> + * increased in an attempt to shrink the hash table.
>   */
>  static void *
> -dense_alloc(HashJoinTable hashtable, Size size)
> +dense_alloc(HashJoinTable hashtable, Size size, bool respect_work_mem)
>
> {
>         HashMemoryChunk newChunk;
>         char       *ptr;
> @@ -1710,6 +1708,15 @@ dense_alloc(HashJoinTable hashtable, Size size)
>          */
>         if (size > HASH_CHUNK_THRESHOLD)
>         {
> +               if (respect_work_mem &&
> +                       hashtable->growEnabled &&
> +                       hashtable->spaceUsed + HASH_CHUNK_HEADER_SIZE + size >
> +                       hashtable->spaceAllowed)
> +               {
> +                       /* work_mem would be exceeded: try to shrink hash table */
> +                       ExecHashIncreaseNumBatches(hashtable);
> +               }
> +
>
> Isn't it kinda weird to do this from within dense_alloc()?  I mean that
> dumps a lot of data to disk, frees a bunch of memory and so on - not
> exactly what "dense_alloc" implies.

Hmm.  Yeah I guess that is a bit weird.  My problem was that in the
shared case (later patch), when you call this function it has a fast
path and a slow path: the fast path just hands out more space from the
existing chunk, but the slow path acquires an LWLock and makes space
management decisions which have to be done sort of "atomically".  In
an earlier version I toyed with the idea of making dense_alloc return
NULL if you said respect_work_mem = true and it determined that you
need to increase the number of batches or go help other workers who
have already started doing so.  Then the batch increase stuff was not
in here, but callers who say respect_work_mem = true (the build and
reload loops) had to be prepared to loop and shrink if they get NULL,
or some wrapper function had to do that for them.  I will try it that
way again.
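
Concretely, a rough sketch of the loop the build/reload callers would
need (insert_tuple and its body are illustrative only, not the actual
patch code):

static HashJoinTuple
insert_tuple(HashJoinTable hashtable, MinimalTuple tuple, uint32 hashvalue)
{
    HashJoinTuple hashTuple;
    Size          size = HJTUPLE_OVERHEAD + tuple->t_len;

    for (;;)
    {
        hashTuple = (HashJoinTuple)
            dense_alloc(hashtable, size, true);     /* respect_work_mem */
        if (hashTuple != NULL)
            break;

        /*
         * work_mem would be exceeded: shrink the hash table and retry
         * (glossing over the fact that the tuple might now belong to a
         * later batch and should be written out instead).
         */
        ExecHashIncreaseNumBatches(hashtable);
    }

    hashTuple->hashvalue = hashvalue;
    memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
    return hashTuple;
}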

>  Isn't the free()ing part also
> dangerous, because the caller might actually use some of that memory,
> like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked
> deeply enough to check whether that's an active bug, but it seems like
> inviting one if not.

I'm not sure if I get what you mean here.
ExecHashRemoveNextSkewBucket calls dense_alloc with respect_work_mem =
false, so it's not going to enter that path.

> 0005-hj-refactor-unmatched-v5.patch
>
> I'm a bit confused as to why unmatched tuple scan is a good parallelism
> target, but I might see later...

Macroscopically because any time we can spread the resulting tuples
over all participants, we enable parallelism in all executor nodes
above this one in the plan.  Suppose I made one worker do the
unmatched scan while the others twiddled their thumbs; now some other
join above me finishes up with potentially many tuples all in one
process while the rest do nothing.

Microscopically because we may be spinning through 1GB of memory
testing these bits, and the way that it is coded in master will do
that in random order whereas this way will be in sequential order,
globally and within each participant.  (You could stuff the matched
bits all up one end of each chunk, so that they'd all fit in a
cacheline...  but not suggesting that or any other micro-optimisation
for the sake of it: the main reason is the macroscopic one.)

> 0006-hj-barrier-v5.patch
>
> Skipping that here.
>
>
> 0007-hj-exec-detach-node-v5.patch
>
> Hm. You write elsewhere:
>> By the time ExecEndNode() runs in workers, ExecShutdownNode() has
>> already run.  That's done on purpose because, for example, the hash
>> table needs to survive longer than the parallel environment to allow
>> EXPLAIN to peek at it.  But it means that the Gather node has thrown
>> out the shared memory before any parallel-aware node below it gets to
>> run its Shutdown and End methods.  So I invented ExecDetachNode()
>> which runs before ExecShutdownNode(), giving parallel-aware nodes a
>> chance to say goodbye before their shared memory vanishes.  Better
>> ideas?
>
> To me that is a weakness in the ExecShutdownNode() API - imo child nodes
> should get the chance to shutdown before the upper-level node.
> ExecInitNode/ExecEndNode etc give individual nodes the freedom to do
> things in the right order, but ExecShutdownNode() doesn't.  I don't
> quite see why we'd want to invent a separate ExecDetachNode() that'd be
> called immediately before ExecShutdownNode().

Hmm.  Yes that makes sense, I think.

> An easy way to change that would be to return early in
> ExecShutdownNode()'s T_GatherState case, and delegate the responsibility
> of calling it on Gather's children to ExecShutdownGather().

That might work for the leader but maybe not for workers (?)

> Alternatively we could make it a full-blown thing like ExecInitNode()
> that every node needs to implement, but that seems a bit painful.
>
> Or have I missed something here?

Let me try a couple of ideas and get back to you.

> Random aside: Wondered before if having to provide all executor
> callbacks is a weakness of our executor integration, and whether it
> shouldn't be a struct of callbacks instead...
>
>
> 0008-hj-shared-single-batch-v5.patch
>
> First-off: I wonder if we should get the HASHPATH_TABLE_SHARED_SERIAL
> path committed first. ISTM that's already quite beneficial, and there's
> a good chunk of problems that we could push out initially.

The reason I don't think we can do that is because single-batch hash
joins can turn into multi-batch hash joins at execution time, unless
you're prepared to use unbounded memory in rare cases.  I don't think
that's acceptable.  I had the single batch shared hash code working
reasonably well early on, and then came to understand that it couldn't
really be committed without the full enchilada, because melting your
server is not a reasonable thing to do if the estimates are off.  Then
I spent a really long time battling with the multi-batch case to get
here!

> This desperately needs tests.

Will add.

> Have you measured whether the new branches in nodeHash[join] slow down
> non-parallel executions?  I do wonder if it'd not be better to put the
> common code in helper functions and have separate
> T_SharedHashJoin/T_SharedHash types.  If you have both a parallel and a
> non-parallel hash in the same query, the branches will be hard to
> predict...

Huh.  That is an interesting thought.  Will look into that.

> I think the synchronization protocol with the various phases needs to be
> documented somewhere.  Probably in nodeHashjoin.c's header.

Will do.

> The state machine code in MultiExecHash() also needs more
> comments. Including the fact that avoiding repeating work is done by
> "electing" leaders via BarrierWait().

Ok.

> I wonder if it wouldn't be better to inline the necessary code into the
> switch (with fall-throughs), instead of those gotos; putting some of the
> relevant code (particularly the scanning of the child node) into
> separate functions.

Right, this comes from a desire to keep the real code common for
private and shared hash tables.  I will look into other ways to
structure it.

> + build:
> +       if (HashJoinTableIsShared(hashtable))
> +       {
> +               /* Make sure our local state is up-to-date so we can build. */
> +               Assert(BarrierPhase(barrier) == PHJ_PHASE_BUILDING);
> +               ExecHashUpdate(hashtable);
> +       }
> +
>         /*
>          * set expression context
>          */
> @@ -128,18 +197,78 @@ MultiExecHash(HashState *node)
>
> Why is the parallel code before variable initialization stuff like
> setting up econtext?

Will move.

>> Introduces hash joins with "Shared Hash" and "Parallel Shared Hash"
>> nodes, for single-batch joins only.
>
> We don't necessarily know that ahead of time.  So this isn't something
> that we could actually apply separately, right?

Indeed, as mentioned above.

>         /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
> -       if (hashtable->nbuckets != hashtable->nbuckets_optimal)
> -               ExecHashIncreaseNumBuckets(hashtable);
> +       ExecHashUpdate(hashtable);
> +       ExecHashIncreaseNumBuckets(hashtable);
>
> It's kinda weird that we had the nearly redundant nbuckets !=
> nbuckets_optimal checks before...

+1

> +static void *
> +dense_alloc_shared(HashJoinTable hashtable,
> +                                  Size size,
> +                                  dsa_pointer *shared)
>
> Hm. I wonder if HASH_CHUNK_SIZE being only 32kb isn't going to be a
> bottleneck here.

Yeah, I should benchmark some different sizes.

> @@ -195,6 +238,40 @@ ExecHashJoin(HashJoinState *node)
>                                 if (TupIsNull(outerTupleSlot))
>                                 {
>                                         /* end of batch, or maybe whole join */
> +
> +                                       if (HashJoinTableIsShared(hashtable))
> +                                       {
> +                                               /*
> +                                                * An important optimization: if this is a
> +                                                * single-batch join and not an outer join, there is
> +                                                * no reason to synchronize again when we've finished
> +                                                * probing.
> +                                                */
> +                                               Assert(BarrierPhase(&hashtable->shared->barrier) ==
> +                                                          PHJ_PHASE_PROBING);
> +                                               if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
> +                                                       return NULL;    /* end of join */
> +
>
> I think it's a bit weird that the parallel path now has an exit path
> that the non-parallel path doesn't have.

Indeed, but I think it's fairly clearly explained?  Do you think there
is something unsafe about exiting in that state?

> +        * If this is a shared hash table, there is an extra charge for inserting
> +        * each tuple into the shared hash table to cover memory synchronization
> +        * overhead, compared to a private hash table.  There is no extra charge
> +        * for probing the hash table for outer path rows, on the basis that
> +        * read-only access to a shared hash table shouldn't be any more
> +        * expensive.
> +        *
> +        * cpu_shared_tuple_cost acts as a tie-breaker controlling whether we prefer
> +        * HASHPATH_TABLE_PRIVATE or HASHPATH_TABLE_SHARED_SERIAL plans when the
> +        * hash table fits in work_mem, since the cost is otherwise the same.  If
> +        * it is positive, then we'll prefer private hash tables, even though that
> +        * means that we'll be running N copies of the inner plan.  Running N
> +        * copies of the inner plan in parallel is not considered
> +        * more expensive than running 1 copy of the inner plan while N-1
> +        * participants do nothing, despite doing less work in total.
> +        */
> +       if (table_type != HASHPATH_TABLE_PRIVATE)
> +               startup_cost += cpu_shared_tuple_cost * inner_path_rows;
> +
> +       /*
> +        * If this is a parallel shared hash table, then the value we have for
> +        * inner_rows refers only to the rows returned by each participant.  For
> +        * shared hash table size estimation, we need the total number, so we need
> +        * to undo the division.
> +        */
> +       if (table_type == HASHPATH_TABLE_SHARED_PARALLEL)
> +               inner_path_rows_total *= get_parallel_divisor(inner_path);
> +
> +       /*
>
> Is the per-tuple cost really the same for HASHPATH_TABLE_SHARED_SERIAL
> and PARALLEL?

I *guess* the real cost for insertion depends on hard-to-estimate
things like collision probability (many tuples into same bucket, also
false sharing on same cacheline).  I think the dynamic partitioning
based parallel hash join systems would use the histogram to deal with
balancing for their more coarse-grained disjoint version of this
problem, but that seemed like overkill for this.  I just added a
simple GUC cpu_shared_tuple_cost to model the cost for inserting,
primarily as a tie-breaker so that we'd prefer private hash tables to
shared ones, unless shared ones allow us to avoid batching or enable
parallel build.

Let me try to measure the difference in insertion speeds with a few
interesting key distributions and get back to you.

> Don't we also need to somehow account for the more expensive hash-probes
> in the HASHPATH_TABLE_SHARED_* cases? Seems quite possible that we'll
> otherwise tend to use shared tables for small hashed tables that are
> looked up very frequently, even though a private one will likely be
> faster.

Hmm.  I don't expect hash probes to be more expensive.  Why should
they be: DSA address decoding?  I will try to measure that too.

With the costing as I have it, we should use private tables for small
relations unless there is a partial plan available.  If there is a
partial plan it usually looks better because it gets to divide the
whole shemozzle by 2, 3, 8 or whatever.  To avoid using shared tables
for small, cheap-to-build tables even if there is a partial plan
available I think we might need an extra cost term which estimates the
number of times we expect to have to wait for peers, and how long you
might have to wait.

The simple version might be a GUC "synchronization_cost", which is the
cost per anticipated barrier wait.  In a typical single batch inner
join we could charge one of those (for the wait between building and
probing), and for a single batch outer join we could charge two (you
also have to wait to begin the outer scan).  Then, if the subplan
looks really expensive (say a big scan with a lot of filtering), we'll
still go for the partial plan so we can divide the cost by P and we'll
come out ahead even though we have to pay one synchronisation cost,
but if it looks cheap (seq scan of tiny table) we won't bother with a
partial plan because the synchronisation cost wouldn't pay for itself.
Add more for extra batches.
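
A sketch of how that might look in final_cost_hashjoin (every name here
-- synchronization_cost, extra_wait_expected, waits_per_extra_batch --
is a hypothetical placeholder, not something in the patch):

if (table_type != HASHPATH_TABLE_PRIVATE)
{
    double      nwaits = 1.0;   /* barrier between building and probing */

    if (extra_wait_expected)    /* e.g. single-batch outer join */
        nwaits += 1.0;
    nwaits += waits_per_extra_batch * (numbatches - 1);

    startup_cost += synchronization_cost * nwaits;
}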

But... that's a bit bogus, because the real cost isn't really some
kind of fixed "synchronisation" per se; it's how long you think it'll
take between the moment the average participant finishes building (ie
runs out of tuples to insert) and the moment the last participant
finishes.  That comes down to the granularity of parallelism and the
cost per tuple.  For example, parallel index scans and parallel
sequential scans read whole pages at a time; so at some point you hit
the end of the supply of tuples, but one of your peers might have up
to one whole page worth to process, so however long that takes, that's
how long you'll have to wait for that guy to be finished and reach the
barrier.  That's quite tricky to estimate, unless you have a way to
ask a child path "how many times to do I have to execute you to pull
one 'granule' of data from your ultimate tuple source", and multiple
that by the path's total cost / path's estimated rows, and then (say)
guesstimate that on average you'll be twiddling your thumbs for half
that many cost units.  Or some better maths, but that sort of thing.
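
In pseudo-C, the sort of back-of-envelope estimate I have in mind
(purely illustrative; tuples_per_granule stands for whatever one
"granule" of the ultimate tuple source yields, e.g. tuples per heap
page for a parallel seq scan):

cost_per_tuple = subpath->total_cost / subpath->rows;
expected_wait  = 0.5 * tuples_per_granule * cost_per_tuple;

startup_cost += expected_wait;   /* charged once per anticipated barrier */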

Thoughts?

(I suppose a partition-wise join as subplan of a Hash node might
introduce an extreme case of coarse granularity if it allows
participants to process whole join partitions on their own, so that a
barrier wait at end-of-hash-table-build might leave everyone waiting
24 hours for one peer to finish pulling tuples from the final join
partition in its subplan...?!)

>
> +       /*
> +        * Set the table as sharable if appropriate, with parallel or serial
> +        * building.  If parallel, the executor will also need an estimate of the
> +        * total number of rows expected from all participants.
> +        */
>
> Oh. I was about to comment that sharable is wrong, just to discover it's
> valid in NA. Weird.

It does look pretty weird now that you mention it!  I'll change it,
because "shareable prevails by a 2:1 margin in American texts"
according to http://grammarist.com/spelling/sharable-shareable/ , or
maybe I'll change it to "shared".

>
> @@ -2096,6 +2096,7 @@ create_mergejoin_path(PlannerInfo *root,
>   * 'required_outer' is the set of required outer rels
>   * 'hashclauses' are the RestrictInfo nodes to use as hash clauses
>   *             (this should be a subset of the restrict_clauses list)
> + * 'table_type' to select [[Parallel] Shared] Hash
>   */
>  HashPath *
>  create_hashjoin_path(PlannerInfo *root,
>
> Reminds me that you're not denoting the Parallel bit in explain right
> now - intentionally so?

Yes I am... here are the three cases:

Hash Join
  -> [... some parallel-safe plan ...]
  -> Hash
       -> [... some parallel-safe plan ...]

Parallel Hash Join
  -> [... some partial plan ...]
  -> Shared Hash
       -> [... some parallel-safe plan ...]

Parallel Hash Join
  -> [... some partial plan ...]
  -> Parallel Shared Hash
       -> [... some partial plan ...]

Make sense?

>  /*
> - * To reduce palloc overhead, the HashJoinTuples for the current batch are
> - * packed in 32kB buffers instead of pallocing each tuple individually.
> + * To reduce palloc/dsa_allocate overhead, the HashJoinTuples for the current
> + * batch are packed in 32kB buffers instead of pallocing each tuple
> + * individually.
>
> s/palloc\/dsa_allocate/allocator/?

Ok.

> @@ -112,8 +121,12 @@ typedef struct HashMemoryChunkData
>         size_t          maxlen;                 /* size of the buffer holding the tuples */
>         size_t          used;                   /* number of buffer bytes already used */
>
> -       struct HashMemoryChunkData *next;       /* pointer to the next chunk (linked
> -                                                                                * list) */
> +       /* pointer to the next chunk (linked list) */
> +       union
> +       {
> +               dsa_pointer shared;
> +               struct HashMemoryChunkData *unshared;
> +       } next;
>
> This'll increase memory usage on some platforms, e.g. when using
> spinlock backed atomics.  I tend to think that that's fine, but it's
> probably worth calling out.

In the code quoted above it won't because that's a plain dsa_pointer,
not an atomic one.  But yeah you're right about HashJoinBucketHead.  I
will note with comments.

If I'm looking at the right column of
https://wiki.postgresql.org/wiki/Atomics then concretely we're talking
about 80386 (not the more general i386 architecture but the specific
dead chip), ARM v5, PA-RISC and SparcV8 (and presumably you'd only
bother turning on parallel query if you had an SMP configuration), so
it's a technicality to consider but as long as it compiles and
produces the right answer on those machines I assume it's OK, right?
(Postgres 4.2 also supported parallel hash joins on Sequent 80386 SMP
systems and put a spinlock into each bucket so anyone upgrading their
Sequent system directly from Postgres 4.2 to a theoretical future
PostgreSQL version with this patch will hopefully not consider this to
be a regression.)

On the other hand, I could get rid of the union for each bucket slot
and instead have a union that points to the first bucket, so that such
systems don't have to pay for the wider buckets-with-spinlocks even
when using private hash tables.  Will look into that.
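
For example (hypothetical sketch of that alternative, not the patch as
posted):

/*
 * Only the pointer to the bucket array is a union; private hash tables
 * keep an array of plain pointers, and only shared hash tables pay for
 * (possibly emulated) atomics.
 */
union
{
    struct HashJoinTupleData **unshared;    /* array of plain pointers */
    dsa_pointer_atomic        *shared;      /* array of atomic dsa_pointers */
}           buckets;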

Actually I was meaning to ask you something about this: is it OK to
memset all the bucket heads to zero when clearing the hash table or do
I have to loop over them and pg_atomic_write_XXX(&x, 0) to avoid
upsetting the emulated atomic state into a bad state?  If that memset
is not safe on emulated-atomics systems then I guess I should probably
consider macros to select between a loop or memset depending on the
implementation.
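
To make that concrete, the two options I'm weighing (sketch, reusing
the bucket layout from the union idea above):

int         i;

/* Option 1: plain memset -- clearly fine where the atomics are native. */
memset(hashtable->buckets.shared, 0,
       hashtable->nbuckets * sizeof(dsa_pointer_atomic));

/*
 * Option 2: explicit atomic writes -- needed if the emulated atomics
 * carry extra state (an embedded spinlock) that memset would clobber.
 */
for (i = 0; i < hashtable->nbuckets; i++)
    dsa_pointer_atomic_write(&hashtable->buckets.shared[i], InvalidDsaPointer);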

>
> --- a/src/include/pgstat.h
> +++ b/src/include/pgstat.h
> @@ -787,7 +787,15 @@ typedef enum
>         WAIT_EVENT_MQ_SEND,
>         WAIT_EVENT_PARALLEL_FINISH,
>         WAIT_EVENT_SAFE_SNAPSHOT,
> -       WAIT_EVENT_SYNC_REP
> +       WAIT_EVENT_SYNC_REP,
> +       WAIT_EVENT_HASH_BEGINNING,
> +       WAIT_EVENT_HASH_CREATING,
> +       WAIT_EVENT_HASH_BUILDING,
> +       WAIT_EVENT_HASH_RESIZING,
> +       WAIT_EVENT_HASH_REINSERTING,
> +       WAIT_EVENT_HASH_UNMATCHED,
> +       WAIT_EVENT_HASHJOIN_PROBING,
> +       WAIT_EVENT_HASHJOIN_REWINDING
>  } WaitEventIPC;
>
> Hm. That seems a bit on the detailed side - if we're going that way it
> seems likely that we'll end up with hundreds of wait events. I don't
> think gradually evolving wait events into something like a query
> progress framework is a good idea.

I thought the idea was to label each wait point in the source so that
an expert can see exactly why we're waiting.

> That's it for now...

Thanks!  Plenty for me to go away and think about.  I will post a new
version soon.

-- 
Thomas Munro
http://www.enterprisedb.com


