Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5
| От | Tom Lane |
|---|---|
| Тема | Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5 |
| Дата | |
| Msg-id | 20303.1508014292@sss.pgh.pa.us обсуждение исходный текст |
| Ответ на | Re: [BUGS] Combination of ordered-set aggregate function terminates JDBC connection on PostgreSQL 9.6.5 (Tom Lane <tgl@sss.pgh.pa.us>) |
| Список | pgsql-bugs |
I wrote:
> To know what value of randomAccess to pass to the tuplesort setup,
> we have to know *at the first transition-function call* whether
> there may be multiple final-function calls coming up. So what
> I'm imagining is a simple boolean result "yes, there will be
> only one finalfn call, so it can destructively modify the transition
> state", or "there might be more than one finalfn call, so the finalfn(s)
> must preserve transition state". And this info has to be available
> throughout the aggregate run.
Attached is a proposed patch to make the ordered-set aggregates
safe for state merging. I've not tested it really thoroughly,
but it passes the regression cases added in 52328727b.
regards, tom lane
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 40d8ec9..1718285 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
*************** typedef struct AggStatePerTransData
*** 255,260 ****
--- 255,265 ----
Aggref *aggref;
/*
+ * Is this state value actually being shared by more than one Aggref?
+ */
+ bool aggshared;
+
+ /*
* Nominal number of arguments for aggregate function. For plain aggs,
* this excludes any ORDER BY expressions. For ordered-set aggs, this
* counts both the direct and aggregated (ORDER BY) arguments.
*************** ExecInitAgg(Agg *node, EState *estate, i
*** 3345,3353 ****
{
/*
* Existing compatible trans found, so just point the 'peragg' to
! * the same per-trans struct.
*/
pertrans = &pertransstates[existing_transno];
peragg->transno = existing_transno;
}
else
--- 3350,3359 ----
{
/*
* Existing compatible trans found, so just point the 'peragg' to
! * the same per-trans struct, and mark the trans state as shared.
*/
pertrans = &pertransstates[existing_transno];
+ pertrans->aggshared = true;
peragg->transno = existing_transno;
}
else
*************** build_pertrans_for_aggref(AggStatePerTra
*** 3449,3454 ****
--- 3455,3461 ----
/* Begin filling in the pertrans data */
pertrans->aggref = aggref;
+ pertrans->aggshared = false;
pertrans->aggCollation = aggref->inputcollid;
pertrans->transfn_oid = aggtransfn;
pertrans->serialfn_oid = aggserialfn;
*************** AggGetAggref(FunctionCallInfo fcinfo)
*** 4105,4121 ****
{
if (fcinfo->context && IsA(fcinfo->context, AggState))
{
AggStatePerAgg curperagg;
AggStatePerTrans curpertrans;
/* check curperagg (valid when in a final function) */
! curperagg = ((AggState *) fcinfo->context)->curperagg;
if (curperagg)
return curperagg->aggref;
/* check curpertrans (valid when in a transition function) */
! curpertrans = ((AggState *) fcinfo->context)->curpertrans;
if (curpertrans)
return curpertrans->aggref;
--- 4112,4129 ----
{
if (fcinfo->context && IsA(fcinfo->context, AggState))
{
+ AggState *aggstate = (AggState *) fcinfo->context;
AggStatePerAgg curperagg;
AggStatePerTrans curpertrans;
/* check curperagg (valid when in a final function) */
! curperagg = aggstate->curperagg;
if (curperagg)
return curperagg->aggref;
/* check curpertrans (valid when in a transition function) */
! curpertrans = aggstate->curpertrans;
if (curpertrans)
return curpertrans->aggref;
*************** AggGetTempMemoryContext(FunctionCallInfo
*** 4146,4151 ****
--- 4154,4197 ----
}
/*
+ * AggStateIsShared - find out whether transition state is shared
+ *
+ * If the function is being called as an aggregate support function,
+ * return TRUE if the aggregate's transition state is shared across
+ * multiple aggregates, FALSE if it is not.
+ *
+ * Returns TRUE if not called as an aggregate support function.
+ * This is intended as a conservative answer, ie "no you'd better not
+ * scribble on your input". In particular, will return TRUE if the
+ * aggregate is being used as a window function, which is a scenario
+ * in which changing the transition state is a bad idea. We might
+ * want to refine the behavior for the window case in future.
+ */
+ bool
+ AggStateIsShared(FunctionCallInfo fcinfo)
+ {
+ if (fcinfo->context && IsA(fcinfo->context, AggState))
+ {
+ AggState *aggstate = (AggState *) fcinfo->context;
+ AggStatePerAgg curperagg;
+ AggStatePerTrans curpertrans;
+
+ /* check curperagg (valid when in a final function) */
+ curperagg = aggstate->curperagg;
+
+ if (curperagg)
+ return aggstate->pertrans[curperagg->transno].aggshared;
+
+ /* check curpertrans (valid when in a transition function) */
+ curpertrans = aggstate->curpertrans;
+
+ if (curpertrans)
+ return curpertrans->aggshared;
+ }
+ return true;
+ }
+
+ /*
* AggRegisterCallback - register a cleanup callback for an aggregate
*
* This is useful for aggs to register shutdown callbacks, which will ensure
diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c
index 25905a3..1e323d9 100644
*** a/src/backend/utils/adt/orderedsetaggs.c
--- b/src/backend/utils/adt/orderedsetaggs.c
***************
*** 40,53 ****
* create just once per query because they will not change across groups.
* The per-query struct and subsidiary data live in the executor's per-query
* memory context, and go away implicitly at ExecutorEnd().
*/
typedef struct OSAPerQueryState
{
! /* Aggref for this aggregate: */
Aggref *aggref;
/* Memory context containing this struct and other per-query data: */
MemoryContext qcontext;
/* These fields are used only when accumulating tuples: */
--- 40,61 ----
* create just once per query because they will not change across groups.
* The per-query struct and subsidiary data live in the executor's per-query
* memory context, and go away implicitly at ExecutorEnd().
+ *
+ * These structs are set up during the first call of the transition function.
+ * Because we allow nodeAgg.c to merge ordered-set aggregates (but not
+ * hypothetical aggregates) with identical inputs and transition functions,
+ * this info must not depend on the particular aggregate (ie, particular
+ * final-function), nor on the direct argument(s) of the aggregate.
*/
typedef struct OSAPerQueryState
{
! /* Representative Aggref for this aggregate: */
Aggref *aggref;
/* Memory context containing this struct and other per-query data: */
MemoryContext qcontext;
+ /* Do we expect multiple final-function calls within one group? */
+ bool rescan_needed;
/* These fields are used only when accumulating tuples: */
*************** typedef struct OSAPerGroupState
*** 91,96 ****
--- 99,106 ----
Tuplesortstate *sortstate;
/* Number of normal rows inserted into sortstate: */
int64 number_of_rows;
+ /* Have we already done tuplesort_performsort? */
+ bool sort_done;
} OSAPerGroupState;
static void ordered_set_shutdown(Datum arg);
*************** ordered_set_startup(FunctionCallInfo fci
*** 146,151 ****
--- 156,164 ----
qstate->aggref = aggref;
qstate->qcontext = qcontext;
+ /* We need to support rescans if the trans state is shared */
+ qstate->rescan_needed = AggStateIsShared(fcinfo);
+
/* Extract the sort information */
sortlist = aggref->aggorder;
numSortCols = list_length(sortlist);
*************** ordered_set_startup(FunctionCallInfo fci
*** 277,291 ****
qstate->sortOperators,
qstate->sortCollations,
qstate->sortNullsFirsts,
! work_mem, false);
else
osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
qstate->sortOperator,
qstate->sortCollation,
qstate->sortNullsFirst,
! work_mem, false);
osastate->number_of_rows = 0;
/* Now register a shutdown callback to clean things up at end of group */
AggRegisterCallback(fcinfo,
--- 290,307 ----
qstate->sortOperators,
qstate->sortCollations,
qstate->sortNullsFirsts,
! work_mem,
! qstate->rescan_needed);
else
osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
qstate->sortOperator,
qstate->sortCollation,
qstate->sortNullsFirst,
! work_mem,
! qstate->rescan_needed);
osastate->number_of_rows = 0;
+ osastate->sort_done = false;
/* Now register a shutdown callback to clean things up at end of group */
AggRegisterCallback(fcinfo,
*************** ordered_set_startup(FunctionCallInfo fci
*** 306,319 ****
* group) by ExecutorEnd. But we must take care to release any potential
* non-memory resources.
*
! * This callback is arguably unnecessary, since we don't support use of
! * ordered-set aggs in AGG_HASHED mode and there is currently no non-error
! * code path in non-hashed modes wherein nodeAgg.c won't call the finalfn
! * after calling the transfn one or more times. So in principle we could rely
! * on the finalfn to delete the tuplestore etc. However, it's possible that
! * such a code path might exist in future, and in any case it'd be
! * notationally tedious and sometimes require extra data copying to ensure
! * we always delete the tuplestore in the finalfn.
*/
static void
ordered_set_shutdown(Datum arg)
--- 322,333 ----
* group) by ExecutorEnd. But we must take care to release any potential
* non-memory resources.
*
! * In the case where we're not expecting multiple finalfn calls, we could
! * arguably rely on the finalfn to clean up; but it's easier and more testable
! * if we just do it the same way in either case. Note that many of the
! * finalfns could *not* free the tuplesort object, at least not without extra
! * data copying, because what they return is a pointer to a datum inside the
! * tuplesort object.
*/
static void
ordered_set_shutdown(Datum arg)
*************** percentile_disc_final(PG_FUNCTION_ARGS)
*** 436,443 ****
if (osastate->number_of_rows == 0)
PG_RETURN_NULL();
! /* Finish the sort */
! tuplesort_performsort(osastate->sortstate);
/*----------
* We need the smallest K such that (K/N) >= percentile.
--- 450,463 ----
if (osastate->number_of_rows == 0)
PG_RETURN_NULL();
! /* Finish the sort, or rescan if we already did */
! if (!osastate->sort_done)
! {
! tuplesort_performsort(osastate->sortstate);
! osastate->sort_done = true;
! }
! else
! tuplesort_rescan(osastate->sortstate);
/*----------
* We need the smallest K such that (K/N) >= percentile.
*************** percentile_disc_final(PG_FUNCTION_ARGS)
*** 457,469 ****
if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL))
elog(ERROR, "missing row in percentile_disc");
- /*
- * Note: we *cannot* clean up the tuplesort object here, because the value
- * to be returned is allocated inside its sortcontext. We could use
- * datumCopy to copy it out of there, but it doesn't seem worth the
- * trouble, since the cleanup callback will clear the tuplesort later.
- */
-
/* We shouldn't have stored any nulls, but do the right thing anyway */
if (isnull)
PG_RETURN_NULL();
--- 477,482 ----
*************** percentile_cont_final_common(FunctionCal
*** 543,550 ****
Assert(expect_type == osastate->qstate->sortColType);
! /* Finish the sort */
! tuplesort_performsort(osastate->sortstate);
first_row = floor(percentile * (osastate->number_of_rows - 1));
second_row = ceil(percentile * (osastate->number_of_rows - 1));
--- 556,569 ----
Assert(expect_type == osastate->qstate->sortColType);
! /* Finish the sort, or rescan if we already did */
! if (!osastate->sort_done)
! {
! tuplesort_performsort(osastate->sortstate);
! osastate->sort_done = true;
! }
! else
! tuplesort_rescan(osastate->sortstate);
first_row = floor(percentile * (osastate->number_of_rows - 1));
second_row = ceil(percentile * (osastate->number_of_rows - 1));
*************** percentile_cont_final_common(FunctionCal
*** 575,587 ****
val = lerpfunc(first_val, second_val, proportion);
}
- /*
- * Note: we *cannot* clean up the tuplesort object here, because the value
- * to be returned may be allocated inside its sortcontext. We could use
- * datumCopy to copy it out of there, but it doesn't seem worth the
- * trouble, since the cleanup callback will clear the tuplesort later.
- */
-
PG_RETURN_DATUM(val);
}
--- 594,599 ----
*************** percentile_disc_multi_final(PG_FUNCTION_
*** 779,786 ****
*/
if (i < num_percentiles)
{
! /* Finish the sort */
! tuplesort_performsort(osastate->sortstate);
for (; i < num_percentiles; i++)
{
--- 791,804 ----
*/
if (i < num_percentiles)
{
! /* Finish the sort, or rescan if we already did */
! if (!osastate->sort_done)
! {
! tuplesort_performsort(osastate->sortstate);
! osastate->sort_done = true;
! }
! else
! tuplesort_rescan(osastate->sortstate);
for (; i < num_percentiles; i++)
{
*************** percentile_disc_multi_final(PG_FUNCTION_
*** 804,814 ****
}
}
- /*
- * We could clean up the tuplesort object after forming the array, but
- * probably not worth the trouble.
- */
-
/* We make the output array the same shape as the input */
PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
ARR_NDIM(param),
--- 822,827 ----
*************** percentile_cont_multi_final_common(Funct
*** 902,909 ****
*/
if (i < num_percentiles)
{
! /* Finish the sort */
! tuplesort_performsort(osastate->sortstate);
for (; i < num_percentiles; i++)
{
--- 915,928 ----
*/
if (i < num_percentiles)
{
! /* Finish the sort, or rescan if we already did */
! if (!osastate->sort_done)
! {
! tuplesort_performsort(osastate->sortstate);
! osastate->sort_done = true;
! }
! else
! tuplesort_rescan(osastate->sortstate);
for (; i < num_percentiles; i++)
{
*************** percentile_cont_multi_final_common(Funct
*** 962,972 ****
}
}
- /*
- * We could clean up the tuplesort object after forming the array, but
- * probably not worth the trouble.
- */
-
/* We make the output array the same shape as the input */
PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
ARR_NDIM(param),
--- 981,986 ----
*************** mode_final(PG_FUNCTION_ARGS)
*** 1043,1050 ****
shouldfree = !(osastate->qstate->typByVal);
! /* Finish the sort */
! tuplesort_performsort(osastate->sortstate);
/* Scan tuples and count frequencies */
while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val))
--- 1057,1070 ----
shouldfree = !(osastate->qstate->typByVal);
! /* Finish the sort, or rescan if we already did */
! if (!osastate->sort_done)
! {
! tuplesort_performsort(osastate->sortstate);
! osastate->sort_done = true;
! }
! else
! tuplesort_rescan(osastate->sortstate);
/* Scan tuples and count frequencies */
while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val))
*************** mode_final(PG_FUNCTION_ARGS)
*** 1097,1109 ****
if (shouldfree && !last_val_is_mode)
pfree(DatumGetPointer(last_val));
- /*
- * Note: we *cannot* clean up the tuplesort object here, because the value
- * to be returned is allocated inside its sortcontext. We could use
- * datumCopy to copy it out of there, but it doesn't seem worth the
- * trouble, since the cleanup callback will clear the tuplesort later.
- */
-
if (mode_freq)
PG_RETURN_DATUM(mode_val);
else
--- 1117,1122 ----
*************** hypothetical_rank_common(FunctionCallInf
*** 1174,1179 ****
--- 1187,1195 ----
hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc);
+ /* because we need a hypothetical row, we can't share transition state */
+ Assert(!osastate->sort_done);
+
/* insert the hypothetical row into the sort */
slot = osastate->qstate->tupslot;
ExecClearTuple(slot);
*************** hypothetical_rank_common(FunctionCallInf
*** 1190,1195 ****
--- 1206,1212 ----
/* finish the sort */
tuplesort_performsort(osastate->sortstate);
+ osastate->sort_done = true;
/* iterate till we find the hypothetical row */
while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot, NULL))
*************** hypothetical_rank_common(FunctionCallInf
*** 1207,1216 ****
ExecClearTuple(slot);
- /* Might as well clean up the tuplesort object immediately */
- tuplesort_end(osastate->sortstate);
- osastate->sortstate = NULL;
-
return rank;
}
--- 1224,1229 ----
*************** hypothetical_dense_rank_final(PG_FUNCTIO
*** 1329,1334 ****
--- 1342,1350 ----
/* Get short-term context we can use for execTuplesMatch */
tmpcontext = AggGetTempMemoryContext(fcinfo);
+ /* because we need a hypothetical row, we can't share transition state */
+ Assert(!osastate->sort_done);
+
/* insert the hypothetical row into the sort */
slot = osastate->qstate->tupslot;
ExecClearTuple(slot);
*************** hypothetical_dense_rank_final(PG_FUNCTIO
*** 1345,1350 ****
--- 1361,1367 ----
/* finish the sort */
tuplesort_performsort(osastate->sortstate);
+ osastate->sort_done = true;
/*
* We alternate fetching into tupslot and extraslot so that we have the
*************** hypothetical_dense_rank_final(PG_FUNCTIO
*** 1391,1400 ****
ExecDropSingleTupleTableSlot(extraslot);
- /* Might as well clean up the tuplesort object immediately */
- tuplesort_end(osastate->sortstate);
- osastate->sortstate = NULL;
-
rank = rank - duplicate_count;
PG_RETURN_INT64(rank);
--- 1408,1413 ----
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 5769f64..13f1bce 100644
*** a/src/include/catalog/pg_aggregate.h
--- b/src/include/catalog/pg_aggregate.h
*************** DATA(insert ( 3267 n 0 jsonb_agg_transfn
*** 318,330 ****
DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - -
- f f r r 0 2281 0 0 0 _null_ _null_ ));
/* ordered-set and hypothetical-set aggregates */
! DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - -
- - f f w w 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - -
- - - f f w w 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - -
- - - t f w w 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - -
- - - f f w w 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - -
- - f f w w 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - -
- - - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
--- 318,330 ----
DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - -
- f f r r 0 2281 0 0 0 _null_ _null_ ));
/* ordered-set and hypothetical-set aggregates */
! DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - -
- - t f s s 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - -
- - f f s s 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - -
- - - f f s s 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - -
- - - t f s s 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - -
- - - f f s s 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - -
- - f f s s 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - -
- - - t f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - -
- - t f w w 0 2281 0 0 0 _null_ _null_ ));
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index b604a5c..a68ec91 100644
*** a/src/include/fmgr.h
--- b/src/include/fmgr.h
*************** extern int AggCheckCallContext(FunctionC
*** 698,703 ****
--- 698,704 ----
MemoryContext *aggcontext);
extern fmAggrefPtr AggGetAggref(FunctionCallInfo fcinfo);
extern MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo);
+ extern bool AggStateIsShared(FunctionCallInfo fcinfo);
extern void AggRegisterCallback(FunctionCallInfo fcinfo,
fmExprContextCallbackFunction func,
Datum arg);
--
Sent via pgsql-bugs mailing list (pgsql-bugs@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-bugs
В списке pgsql-bugs по дате отправления: