Обсуждение: BUG #15869: Custom aggregation returns null when parallelized

Поиск
Список
Период
Сортировка

BUG #15869: Custom aggregation returns null when parallelized

От
PG Bug reporting form
Дата:
The following bug has been logged on the website:

Bug reference:      15869
Logged by:          Kassym Dorsel
Email address:      kdorsel@gmail.com
PostgreSQL version: 11.4
Operating system:   Windows 10 / Ubuntu 18
Description:

I have a custom aggregate setup. It run and returns the correct results when
run sequentially. Once I set parallel = safe it returns null values. I have
tested and gotten the same results on Windows 10 / postgres 11.4 and Ubuntu
18 / postgres 11.3.

Here's the aggregate:
CREATE OR REPLACE FUNCTION array_sort(ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$
SELECT ARRAY(SELECT unnest($1) ORDER BY 1)
$$;

create type _stats_agg_accum_type AS (
    cnt bigint,
    q double precision[],
    n double precision[],
    np  double precision[],
    dn  double precision[]
);

create type _stats_agg_result_type AS (
    count bigint,
    q25 double precision,
    q50 double precision,
    q75 double precision
);

create or replace function _stats_agg_p2_parabolic(_stats_agg_accum_type,
double precision, double precision)
returns double precision AS '
DECLARE
    a alias for $1;
    i alias for $2;
    d alias for $3;
BEGIN
    RETURN a.q[i] + d / (a.n[i + 1] - a.n[i - 1]) * ((a.n[i] - a.n[i - 1] +
d) * (a.q[i + 1] - a.q[i]) / (a.n[i + 1] - a.n[i]) + (a.n[i + 1] - a.n[i] -
d) * (a.q[i] - a.q[i - 1]) / (a.n[i] - a.n[i - 1]));
END;
'
language plpgsql;

create or replace function _stats_agg_p2_linear(_stats_agg_accum_type,
double precision, double precision)
returns double precision AS '
DECLARE
    a alias for $1;
    i alias for $2;
    d alias for $3;
BEGIN
    return a.q[i] + d * (a.q[i + d] - a.q[i]) / (a.n[i + d] - a.n[i]);
END;
'
language plpgsql;

create or replace function _stats_agg_accumulator(_stats_agg_accum_type,
double precision)
returns _stats_agg_accum_type AS '
DECLARE
    a ALIAS FOR $1;
    x alias for $2;
    k int;
    d double precision;
    qp double precision;
BEGIN
    a.cnt = a.cnt + 1;

    if a.cnt <= 5 then
        a.q = array_append(a.q, x);
        if a.cnt = 5 then
            a.q = array_sort(a.q);
        end if;
        return a;
    end if;

    case
        when x < a.q[1] then
            a.q[1] = x;
            k = 1;
        when x >= a.q[1] and x < a.q[2] then
            k = 1;
        when x >= a.q[2] and x < a.q[3] then
            k = 2;
        when x >= a.q[3] and x < a.q[4] then
            k = 3;
        when x >= a.q[4] and x <= a.q[5] then
            k = 4;
        when x > a.q[5] then
            a.q[5] = x;
            k = 4;
    end case;

    for ii in 1..5 loop
        if ii > k then
            a.n[ii] = a.n[ii] + 1;
        end if;
        a.np[ii] = a.np[ii] + a.dn[ii];
    end loop;

    for ii in 2..4 loop
        d = a.np[ii] - a.n[ii];
        if (d >= 1 and a.n[ii+1] - a.n[ii] > 1) or (d <= -1 and a.n[ii-1] -
a.n[ii] < -1) then
            d = sign(d);
            qp = _stats_agg_p2_parabolic(a, ii, d);
            if qp > a.q[ii-1] and qp < a.q[ii+1] then
                a.q[ii] = qp;
            else
                a.q[ii] = _stats_agg_p2_linear(a, ii, d);
            end if;
            a.n[ii] = a.n[ii] + d;
        end if;
    end loop;

    return a;
END;
'
language plpgsql;

create or replace function _stats_agg_combiner(_stats_agg_accum_type,
_stats_agg_accum_type)
returns _stats_agg_accum_type AS '
DECLARE
    a alias for $1;
    b alias for $2;
    c _stats_agg_accum_type;
BEGIN
    c.cnt = a.cnt + b.cnt;
    c.q[2] = (a.q[2] + b.q[2]) / 2;
    c.q[3] = (a.q[3] + b.q[3]) / 2;
    c.q[4] = (a.q[4] + b.q[4]) / 2;
    RETURN c;
END;
'
strict language plpgsql;

create or replace function _stats_agg_finalizer(_stats_agg_accum_type)
returns _stats_agg_result_type AS '
BEGIN
    RETURN row(
        $1.cnt,
        $1.q[2],
        $1.q[3],
        $1.q[4]
    );
END;
'
language plpgsql;

create aggregate stats_agg(double precision) (
    sfunc = _stats_agg_accumulator,
    stype = _stats_agg_accum_type,
    finalfunc = _stats_agg_finalizer,
    combinefunc = _stats_agg_combiner,
    --parallel = safe,
    initcond = '(0, {}, "{1,2,3,4,5}", "{1,2,3,4,5}",
"{0,0.25,0.5,0.75,1}")'
);


Here's the setup code:
--CREATE TABLE temp (val double precision);
--insert into temp (val) select i from generate_series(0, 150000) as t(i);
--set force_parallel_mode = on;
select (stats_agg(val)).* from temp;


Expected results:
150001, 37500, 75000, 112500

Results when run in parallel:
150001, null, null, null


Re: BUG #15869: Custom aggregation returns null when parallelized

От
David Rowley
Дата:
On Mon, 24 Jun 2019 at 03:32, PG Bug reporting form
<noreply@postgresql.org> wrote:
> Here's the setup code:
> --CREATE TABLE temp (val double precision);
> --insert into temp (val) select i from generate_series(0, 150000) as t(i);
> --set force_parallel_mode = on;
> select (stats_agg(val)).* from temp;

I don't think force_parallel_mode does what you think it does. It just
adds a Gather node to the top of the plan, if the plan is deemed
parallel safe. It's not going to force your aggregate to be
parallelised.

You might coax the planner into generating a parallel aggregate plan
by setting parallel_tuple_cost and parallel_setup_cost both to 0.

> Expected results:
> 150001, 37500, 75000, 112500
>
> Results when run in parallel:
> 150001, null, null, null

Are you actually getting a partial and finalize aggregate node with
that? Can you show the EXPLAIN output of each?

You might also want to double check your combine function. It does not
look like it's very well coded to handle NULL values for arrays that
have yet to receive their fill of 5 elements.

-- 
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: BUG #15869: Custom aggregation returns null when parallelized

От
Kassym Dorsel
Дата:
Right, adding the Gather node makes it use the combine func and this is where the problem is.

You're right on handling of null values in my combine function. Since this was being run on a table with 150k rows, I had assumed that the contents of my aggregate types would never be null/empty.

Thinking about it, it would make sense to receive an aggregate type with count = 0 or null iff there is 1 worker (1 result to combine the other being null/empty). When there are 2 or more workers I would assume that rows would be relatively evenly split and the return of my aggregate type would be filled given the 150k rows. I tried with 1,2,3,4 workers (ALTER TABLE temp SET (parallel_workers = 1,2,3,4);) and got the same null results before adding support for null values. 

Is this expected behavior when number of workers is >=2? An explicit paragraph in parallel aggregates documentation outlining null support in combine func might be helpful.

Regardless, adding support for null/empty values has fixed my problem and now the aggregate correctly works in parallel queries. Many thanks.

Best,
Kassym


On Sun, Jun 23, 2019 at 10:51 PM David Rowley <david.rowley@2ndquadrant.com> wrote:
On Mon, 24 Jun 2019 at 03:32, PG Bug reporting form
<noreply@postgresql.org> wrote:
> Here's the setup code:
> --CREATE TABLE temp (val double precision);
> --insert into temp (val) select i from generate_series(0, 150000) as t(i);
> --set force_parallel_mode = on;
> select (stats_agg(val)).* from temp;

I don't think force_parallel_mode does what you think it does. It just
adds a Gather node to the top of the plan, if the plan is deemed
parallel safe. It's not going to force your aggregate to be
parallelised.

You might coax the planner into generating a parallel aggregate plan
by setting parallel_tuple_cost and parallel_setup_cost both to 0.

> Expected results:
> 150001, 37500, 75000, 112500
>
> Results when run in parallel:
> 150001, null, null, null

Are you actually getting a partial and finalize aggregate node with
that? Can you show the EXPLAIN output of each?

You might also want to double check your combine function. It does not
look like it's very well coded to handle NULL values for arrays that
have yet to receive their fill of 5 elements.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

Re: BUG #15869: Custom aggregation returns null when parallelized

От
David Rowley
Дата:
On Tue, 25 Jun 2019 at 04:07, Kassym Dorsel <k.dorsel@gmail.com> wrote:
> Right, adding the Gather node makes it use the combine func and this is where the problem is.

You're mixing up Gather and Parallel Aggregates. Setting
force_parallel_mode to on does not force the aggregate to be
parallelised. It just tries to inject a Gather node at the top of the
plan. I think it was really meant just to test the tuple queues for
parallel query back in 9.6. You're certainly not the only person to
have been confused by it.

> You're right on handling of null values in my combine function. Since this was being run on a table with 150k rows, I
hadassumed that the contents of my aggregate types would never be null/empty. 
>
> Thinking about it, it would make sense to receive an aggregate type with count = 0 or null iff there is 1 worker (1
resultto combine the other being null/empty). When there are 2 or more workers I would assume that rows would be
relativelyevenly split and the return of my aggregate type would be filled given the 150k rows. I tried with 1,2,3,4
workers(ALTER TABLE temp SET (parallel_workers = 1,2,3,4);) and got the same null results before adding support for
nullvalues. 
>
> Is this expected behavior when number of workers is >=2? An explicit paragraph in parallel aggregates documentation
outliningnull support in combine func might be helpful. 

I don't think anyone would be opposed to improving the documents, but
in this case, it's not the state that was NULL. You don't need to deal
with that since you made your combine function strict. It was your
array elements that were NULL and "<value> <op> NULL" yielding NULL is
fairly fundamental to SQL, not really specific to aggregation.  Your
initcond made the q[] array an empty array, so trying to fetch an
element that does not exist will yield NULL. You wouldn't have had the
issue if you'd set all those array elements to 0 in the initcond, but
I've not taken the time to understand your transfn to know if that's
valid. If you've added NULL handling in the combinefn, then that's
likely fine.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: BUG #15869: Custom aggregation returns null when parallelized

От
Kassym Dorsel
Дата:
Yes, the issue comes from having the array in my state have zero elements. However why does an aggregate worker end up with a state in which zero records have been passed to it?

In the case of 1 worker process, this makes sense to me. The combine func takes in two states, the one filled by the single worker process and another dummy empty state. In the case of 2 or more worker process I would think that all states would process some rows and not be left initialized and never used. Is the fact that when there are 2 or more workers that one or more aggregation states doesn't receive any data to process a bug? Or is this non deterministic behavior to be expected?

My mistake was making the assumption that an initialized state would always receive data and would not be passed to a combine func without having processed any data. 

This is the part that would be nice to have in the documentation. In an aggregation with a gather node an initialized worker state may never be passed any data to process and would thus keep its initialized state when being passed to the combine func.



Best,
Kassym


On Mon, Jun 24, 2019 at 6:59 PM David Rowley <david.rowley@2ndquadrant.com> wrote:
On Tue, 25 Jun 2019 at 04:07, Kassym Dorsel <k.dorsel@gmail.com> wrote:
> Right, adding the Gather node makes it use the combine func and this is where the problem is.

You're mixing up Gather and Parallel Aggregates. Setting
force_parallel_mode to on does not force the aggregate to be
parallelised. It just tries to inject a Gather node at the top of the
plan. I think it was really meant just to test the tuple queues for
parallel query back in 9.6. You're certainly not the only person to
have been confused by it.

> You're right on handling of null values in my combine function. Since this was being run on a table with 150k rows, I had assumed that the contents of my aggregate types would never be null/empty.
>
> Thinking about it, it would make sense to receive an aggregate type with count = 0 or null iff there is 1 worker (1 result to combine the other being null/empty). When there are 2 or more workers I would assume that rows would be relatively evenly split and the return of my aggregate type would be filled given the 150k rows. I tried with 1,2,3,4 workers (ALTER TABLE temp SET (parallel_workers = 1,2,3,4);) and got the same null results before adding support for null values.
>
> Is this expected behavior when number of workers is >=2? An explicit paragraph in parallel aggregates documentation outlining null support in combine func might be helpful.

I don't think anyone would be opposed to improving the documents, but
in this case, it's not the state that was NULL. You don't need to deal
with that since you made your combine function strict. It was your
array elements that were NULL and "<value> <op> NULL" yielding NULL is
fairly fundamental to SQL, not really specific to aggregation.  Your
initcond made the q[] array an empty array, so trying to fetch an
element that does not exist will yield NULL. You wouldn't have had the
issue if you'd set all those array elements to 0 in the initcond, but
I've not taken the time to understand your transfn to know if that's
valid. If you've added NULL handling in the combinefn, then that's
likely fine.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services