Thread: parallel aggregation

parallel aggregation

From
Alexander Saydakov
Date:
I have a few questions regarding aggregate functions that would be parallel safe.
1. Do the inputs of combinefunc always go through serialfunc-deserialfunc, or can they come directly from sfunc (a worker on the same machine, perhaps)?
2. Can the result of combinefunc ever be fed to sfunc, as opposed to other combiners or finalfunc?

I have in mind a scenario where a different data structure is used in the combine stage. For that it would be good if the conversion could happen in serialfunc-deserialfunc, so that the combiner does not even know about the other structure used for state transition during aggregation. If that is the case, the only remaining problem is the finalfunc: it has to be ready to receive both types.

Thanks.

Re: parallel aggregation

From
David Rowley
Date:
On Wed, 12 Apr 2023 at 22:14, Alexander Saydakov <saydakov@yahooinc.com> wrote:
>
> I have a few questions regarding aggregate functions that would be parallel safe.
> 1. do the inputs of combinefunc always go through serialfunc-deserialfunc or they can come directly from sfunc (worker on the same machine, perhaps)?

Only aggregates with an INTERNAL transition state must be serialised
and deserialised.  Non-internal state aggregates, i.e. ones that have a
corresponding database type, can be pushed through the normal means
that we transfer tuples from parallel workers to the main process
without any serialisation or deserialisation at the other end.

All serial functions must return bytea and accept a single INTERNAL
parameter, so you can't even make a serial func for an aggregate
that's not got an INTERNAL aggregate state type.
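
To make the serial/deserial round trip concrete, here is a minimal sketch in plain C. It deliberately avoids the PostgreSQL headers: a raw byte buffer stands in for bytea, malloc stands in for palloc, and the names AvgState, avg_state_serialize and avg_state_deserialize are hypothetical. In a real aggregate the serial function takes the internal state and returns bytea, and the deserial function takes (bytea, internal) and returns internal.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical in-memory ("internal") state: a pointer to this struct is
 * only meaningful inside the process that built it. */
typedef struct AvgState {
    int64_t count;
    double  sum;
} AvgState;

/* Size of the flattened form; a stand-in for the bytea payload length. */
#define AVG_STATE_WIRE_SIZE (sizeof(int64_t) + sizeof(double))

/* Stand-in for a serial function: flatten the state into bytes.
 * buf must have room for AVG_STATE_WIRE_SIZE bytes. */
static void avg_state_serialize(const AvgState *state, unsigned char *buf)
{
    assert(state != NULL && buf != NULL);
    memcpy(buf, &state->count, sizeof(state->count));
    memcpy(buf + sizeof(state->count), &state->sum, sizeof(state->sum));
}

/* Stand-in for a deserial function: rebuild a fresh state from bytes.
 * Returns NULL if allocation fails; the caller frees the result. */
static AvgState *avg_state_deserialize(const unsigned char *buf)
{
    AvgState *state;

    assert(buf != NULL);
    state = malloc(sizeof(*state));
    if (state == NULL)
        return NULL;
    memcpy(&state->count, buf, sizeof(state->count));
    memcpy(&state->sum, buf + sizeof(state->count), sizeof(state->sum));
    return state;
}
```

The point of the sketch is only that the receiving side gets a new copy built from bytes, never the worker's original pointer. This flat layout is not portable across machines with different byte order, which is acceptable for an illustration but would matter if states were ever shipped between servers.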

> 2. can the result of combinefunc ever be fed to sfunc as opposed to other combiners or finalfunc?

combinefuncs take 2 STYPEs, so it's not valid to pass those to an
SFUNC (those are only given a BASETYPE to transition into the
aggregate state).  The finalfunc will be called (if it exists) during
the Finalize Aggregate plan node. The Finalize Aggregate node also
gathers intermediate aggregate states from parallel workers and calls
the combinefunc on ones from the same group, so yes, the finalfunc
will be called on aggregate states that have been combined with the
combinefunc.

> I have in mind a scenario, when a different data structure is used in the combine stage. For that it would be good if the conversion can happen in serialfunc-deserialfunc, and the combiner does not even know about the other structure used for state transition during aggregation. If that is the case, the only problem remains with the finalfunc. It has to be ready to receive both types.

What's the use case for that?

David



Re: [E] Re: parallel aggregation

From
David Rowley
Date:
(Let's keep this on this list)

On Thu, 13 Apr 2023 at 12:08, Alexander Saydakov <saydakov@yahooinc.com> wrote:
>
> Yes, I am talking about a custom aggregate in C++ with internal state type.
> You did not really answer my first question. Does the state always go through serialfunc-deserialfunc before reaching a combiner?

Well, I think maybe you asked the question wrongly.  The answer I gave
was "No" because the serial and deserial functions are only used for
internal typed aggregates.  But if the question is "are serial and
deserial functions always used for internal typed aggregates between
the Partial and Finalize phase", the answer is "Yes", they are
*currently*. I wouldn't want to rely on that staying true forever,
however.  I could think of a couple of reasons that this could change
in the future:

1) Partition-wise aggregates don't really require it.  Someone might
submit a patch that allows the Partial Aggregate phase just to return
a pointer to memory and have the Finalize Aggregate state just work on
that pointer directly rather than having its own copy.
2) If we ever changed away from the process model into a threading
model then we *may* consider not performing serialisation as an
optimisation.

Even if we ever did those 2, we might still need serial/deserial
functions for sending the states to other servers.  That's something
we don't currently do as there's no SQL-level way to express "just
give me the raw states and don't call the final functions".

> The type "internal" in C++ is just a pointer. So it can be an arbitrary data structure. For some aggregations it would be better to have different state types in the first phase of the aggregation (processing raw data using state transition function) and the second phase (combining partial results). So I wonder if there is a clean separation between the phases: once partial aggregation is done the results go through serial-deserial barrier and only combining is done after that (sfunc never receives results of combining). If so, the question remains how to make finalfunc understand both states: directly from sfunc if there is no partial aggregation, and from the combiner.
> Can a combiner receive results of another combiner? Can a combiner output also go through serial-deserial?

You have to remember that the final func can be called without the
state ever having gone through the combine func. This is what happens
in non-parallel aggregation.  Also, think of the case with > 2
parallel workers.  There might be more than 2 states to combine for
any given group.  So the combine function must be able to operate on
aggregate states that have already been combined from other states.
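
The two constraints above (the final function may see a never-combined state, and the combine function may see an already-combined one) can be illustrated with a small plain-C sketch. It does not use the PostgreSQL API; SumState, sum_trans, sum_combine, sum_final_avg and fold_partials are hypothetical names, and fold_partials only imitates what a finalize step would do for one group.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical aggregate state shared by all phases. */
typedef struct SumState {
    int64_t count;
    int64_t sum;
} SumState;

/* Transition step: fold one raw input value into the state. */
static void sum_trans(SumState *state, int64_t value)
{
    assert(state != NULL);
    state->count += 1;
    state->sum += value;
}

/* Combine step: merge src into dst. dst may itself be the result of
 * earlier combines, so nothing here may assume it is "fresh". */
static void sum_combine(SumState *dst, const SumState *src)
{
    assert(dst != NULL && src != NULL);
    dst->count += src->count;
    dst->sum += src->sum;
}

/* Final step: must work on a state that was never combined (serial plan)
 * as well as on one that went through any number of combines. */
static double sum_final_avg(const SumState *state)
{
    assert(state != NULL);
    if (state->count == 0)
        return 0.0;
    return (double) state->sum / (double) state->count;
}

/* Imitates the finalize side for one group: fold n partial states,
 * one per worker, into a single accumulator. */
static SumState fold_partials(const SumState *partials, size_t n)
{
    SumState acc = {0, 0};
    size_t   i;

    for (i = 0; i < n; i++)
        sum_combine(&acc, &partials[i]);
    return acc;
}
```

With three partial states, the second call to sum_combine already receives an accumulator that has been combined once, which is exactly the case a combine function has to tolerate.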

You could just do something similar to how we handle NodeTag's in
PostgreSQL. Effectively all Node typed structs have a NodeTag field at
the start of the struct.  This is just an enum that code can look at
to determine the node type of whichever pointer it is looking at.
Perhaps you can get away with coding your aggregate function's
component functions in a way that can handle both types, you'd just
need to look at the first 4 bytes of the pointer so you know what to
do. In Postgres, we have an IsA macro to help us with that.  Have a
look at nodes.h.

David



Re: [E] Re: parallel aggregation

From
Alexander Saydakov
Date:
Still not quite clear:
1. I wonder if there is a clean separation between the phases: once partial aggregation is done only combining is done after that (state transition function never receives results of combining).
2. Can a combiner output also go through serial-deserial before hitting another combiner or finalizer?
Thank you very much.


Re: [E] Re: parallel aggregation

From
David Rowley
Date:
On Thu, 13 Apr 2023 at 14:31, Alexander Saydakov <saydakov@yahooinc.com> wrote:
> 1. I wonder if there is a clean separation between the phases: once partial aggregation is done only combining is done after that (state transition function never receives results of combining).

Currently the transfn won't be called again on a state that has had
the combinefn called on it.  I can't think of a future reason that we
might change that. My imagination might be lacking, however.

> 2. Can a combiner output also go through serial-deserial before hitting another combiner or finalizer?

Not currently. However, I *can* think of reasons why that might change
in the future.  If we wanted to accept partially aggregated results
from foreign servers and then combine them locally then, if those
foreign servers did parallel aggregation, the foreign server's
combined states would need to be serialised before sending over the
network.  It feels like just a matter of time before we grow the
ability to do that. Lots of work has been done on foreign data
wrappers in the past few years. It feels like it has tailed off a bit,
but I wouldn't be surprised if we had the ability to do that in the
next few years.

David