Re: Parallel grouping sets

From Pengzhou Tang
Subject Re: Parallel grouping sets
Date
Msg-id CAG4reAQ8rFCc+i0oju3VjaW7xSOJAkvLrqa4F-NYZzAG4SW7iQ@mail.gmail.com
In reply to Re: Parallel grouping sets  (Richard Guo <guofenglinux@gmail.com>)
Responses Re: Parallel grouping sets  (Tomas Vondra <tomas.vondra@2ndquadrant.com>)
List pgsql-hackers
Hi there,

We would like to post an update on our work on parallel grouping sets. The
attached patchset implements parallel grouping sets with the strategy
proposed in

It contains some refinements of our code and adds LLVM JIT support. It also
contains a few patches that refactor the grouping sets code to make the
parallel grouping sets implementation cleaner.

Like simple parallel aggregation, we separate the processing of grouping sets
into two stages:

The partial stage:
The partial stage is much the same as the current grouping sets
implementation; the differences are:
- As in regular parallel aggregation, only partial aggregate results
  (i.e. trans values) are produced.
- The output of the partial stage includes a grouping set ID to allow for
  disambiguation during the final stage (see the illustrative query below).
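
To make the grouping set ID concrete, here is a plain-SQL illustration using
the standard GROUPING() function and the customer table from the plans below.
This is only an analogy: internally the partial stage tags each partial trans
value with a gsetid rather than computing GROUPING() in SQL.

-- GROUPING() returns a bitmap identifying which grouping set produced a
-- row; the gsetid emitted by the partial stage serves the same
-- disambiguation purpose for the partial results.
select grouping(c_custkey, c_name) as gsetid, c_custkey, c_name, count(*)
from customer
group by grouping sets (c_custkey, c_name);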

The optimizations of the existing grouping sets implementation are preserved
during the partial stage (see the example query after this list), such as:
- Grouping sets that can be combined into one rollup are still grouped
  together (for sorted group aggregation).
- Hash aggregates can be performed concurrently with the first sorted group
  aggregate.
- All hash transitions can be done in one expression state.
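
For example, in the following query (the column names are from the TPCH
customer table; whether a given set is actually hashed depends on costing),
(c_custkey) and (c_custkey, c_name) fit into a single rollup and can be
aggregated in one sorted pass, while (c_nationkey) may be planned as a hash
aggregate that runs concurrently with that first sorted pass:

select count(*)
from customer
group by grouping sets ((c_custkey, c_name), (c_custkey), (c_nationkey));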
 
The final stage:
In the final stage, the partial aggregate results are combined according to
the grouping set ID. None of the optimizations of the partial stage can be
leveraged in the final stage, so all rollups are extracted so that each
rollup contains only one grouping set, and each aggregate phase processes a
single grouping set. In this stage, tuples are multiplexed into the different
phases according to the grouping set ID before they are actually aggregated.
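
A rough SQL analogue of the two stages, purely for illustration (it reuses
the customer table; count() in the inner query stands in for the partial
trans values, GROUPING() stands in for the gsetid, and the names
partial_results and partial_count are just names for this sketch):

with partial_results as (
    -- analogue of the partial stage: one row per group per grouping set,
    -- tagged with a set id
    select grouping(c_custkey, c_name) as gsetid,
           c_custkey, c_name, count(*) as partial_count
    from customer
    group by grouping sets (c_custkey, c_name)
)
-- analogue of the final stage: each row is combined only by the phase
-- that matches its grouping set id
select c_custkey, null::text as c_name, sum(partial_count) as count
from partial_results where gsetid = 1     -- rows produced for (c_custkey)
group by c_custkey
union all
select null, c_name, sum(partial_count) as count
from partial_results where gsetid = 2     -- rows produced for (c_name)
group by c_name;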

An alternative approach to the final stage that we considered was to use a
single AGG whose grouping clause is gsetid plus all of the grouping columns.
In the end, we decided against it because it cannot support mixed
aggregation: first, when the grouping columns are a mix of unsortable and
unhashable columns, no path can be produced for the final stage; second, a
mixed aggregate is sometimes the cheapest path, and this approach cannot
produce one. Moreover, if the union of all the grouping columns is wide, that
implementation would incur undue cost.
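
In the same SQL-analogy terms as above (again only a sketch, reusing the
hypothetical partial_results from the previous example), the rejected
alternative corresponds roughly to a single aggregate over gsetid plus the
union of all grouping columns, which has to sort or hash the full width of
that column list even though each input row belongs to only one set:

with partial_results as (
    select grouping(c_custkey, c_name) as gsetid,
           c_custkey, c_name, count(*) as partial_count
    from customer
    group by grouping sets (c_custkey, c_name)
)
select gsetid, c_custkey, c_name, sum(partial_count) as count
from partial_results
group by gsetid, c_custkey, c_name;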


The patches included in this patchset are as follows:

0001-All-grouping-sets-do-their-own-sorting.patch

This is a refactoring patch for the existing code. It moves the phase 0 SORT
into the AGG instead of assuming that the input is already sorted.

Postgres used to add a SORT path explicitly beneath the AGG for a sorted
group aggregate. A grouping sets path also adds a SORT path for the first
sort aggregate phase, but the following sort aggregate phases do their own
sorting using a tuplesort. This commit unifies the way grouping sets paths do
their sorting: all sort aggregate phases now do their own sorting using a
tuplesort.

We did this refactoring to support the final stage of parallel grouping sets.
Adding a SORT path underneath the AGG in the final stage is wasteful. With
this patch, all non-hashed aggregate phases can do their own sorting after
the tuples are redirected.

Unpatched:
tpch=# explain (costs off) select count(*) from customer group by grouping sets (c_custkey, c_name);
            QUERY PLAN
----------------------------------
 GroupAggregate
   Group Key: c_custkey
   Sort Key: c_name
     Group Key: c_name
   ->  Sort
         Sort Key: c_custkey
         ->  Seq Scan on customer

Patched:
tpch=# explain (costs off) select count(*) from customer group by grouping sets (c_custkey, c_name);
         QUERY PLAN
----------------------------
 GroupAggregate
   Sort Key: c_custkey
     Group Key: c_custkey
   Sort Key: c_name
     Group Key: c_name
   ->  Seq Scan on customer


0002-fix-a-numtrans-bug.patch

Bugfix for the additional size of the hash table for hash aggregates: the
additional size was always computed as zero.

0003-Reorganise-the-aggregate-phases.patch

The planner used to organize the grouping sets in [HASHED]->[SORTED] order:
HASHED aggregates were always located before SORTED aggregates, and
ExecInitAgg() organized the aggregate phases in the same [HASHED]->[SORTED]
order. All HASHED grouping sets are squeezed into phase 0 when executing the
AGG node. Unless the strategy is AGG_HASHED, however, the executor starts
execution from phase 1, assuming the numbered phases are all sorted group
aggregates, and then returns to phase 0 to execute the hash aggregates if the
strategy is AGG_MIXED.

When adding support for parallel grouping sets, this was a big barrier.
Firstly, we needed complicated logic in many places to locate the first sort
rollup/phase and to handle the special ordering for each strategy.

Secondly, squeezing all hashed grouping sets into phase 0 doesn't work for
the final stage: we can't put all transition functions into one expression
state there. ExecEvalExpr() is optimized to evaluate all the hashed grouping
sets for the same input tuple; however, each input to the final stage is a
trans value belonging to a single grouping set, so we inherently must not
evaluate more than one grouping set for the same input.

This commit organizes the grouping sets in a more natural way:
[SORTED]->[HASHED].

The executor now starts execution from phase 0 for all strategies, and the
HASHED sets are no longer squeezed into a single phase. Instead, each HASHED
set has its own phase, and we use other means to put all hash transitions
into one expression state for the partial stage.

This commit also moves 'sort_in' from AggState into the AggStatePerPhase*
structures; this helps to handle the more complicated cases introduced by
parallel grouping sets. For example, we might then need to add a tuplestore
'store_in' to store the partial aggregate results for PLAIN sets.

It also gives us a chance to keep the first TupleSortState, so we do not need
to re-sort when rescanning.

0004-Parallel-grouping-sets.patch

This is the main logic. Patches 0001 and 0003 allow it to be pretty simple.

Here is an example plan with the patch applied:
tpch=# explain (costs off) select sum(l_quantity) as sum_qty, count(*) as count_order from lineitem group by grouping sets((l_returnflag, l_linestatus), (), l_suppkey);
                     QUERY PLAN
----------------------------------------------------
 Finalize MixedAggregate
   Filtered by: (GROUPINGSETID())
   Sort Key: l_suppkey
     Group Key: l_suppkey
   Group Key: ()
   Hash Key: l_returnflag, l_linestatus
   ->  Gather
         Workers Planned: 7
         ->  Partial MixedAggregate
               Sort Key: l_suppkey
                 Group Key: l_suppkey
                 Group Key: ()
               Hash Key: l_returnflag, l_linestatus
               ->  Parallel Seq Scan on lineitem
(14 rows)

We have also done some performance tests using a grouping-sets-enhanced
subset of TPCH. TPCH doesn't contain grouping sets queries, so we changed all
"group by" clauses to "group by rollup" clauses, and we chose 14 queries for
the test.

We noticed no performance regressions. Three queries showed performance
improvements due to parallelism (TPCH scale factor 10,
max_parallel_workers_per_gather = 8):

1.sql: 16150.780 ms vs 116093.550 ms
13.sql: 5288.635 ms vs 19541.981 ms
18.sql: 52985.084 ms vs 67980.856 ms

Thanks,
Pengzhou, Melanie & Jesse

Attachments
