Re: Speed of user-defined aggregates using array_append as transfn

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: Speed of user-defined aggregates using array_append as transfn
Дата
Msg-id 18705.1477686897@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Speed of user-defined aggregates using array_append as transfn  (Tom Lane <tgl@sss.pgh.pa.us>)
Ответы Re: Speed of user-defined aggregates using array_append as transfn  (Tom Lane <tgl@sss.pgh.pa.us>)
Список pgsql-hackers
I wrote:
> 3. We could try to fix it mostly from array_append's side, by teaching it
> to return the expanded array in the aggregation context when it's being
> called as an aggregate function, and making some
> hopefully-not-performance-killing tweaks to nodeAgg to play along with
> that.  This would amount to additional complication in the API contract
> for C-coded aggregate functions, but I think it wouldn't affect functions
> that weren't attempting to invoke the optimization, so it should be OK.
> A larger objection is that it still doesn't do anything for the memory
> consumption angle.

Hm, that actually seems to work, at least from a speed standpoint; see
the attached not-terribly-well-documented patch.  The additions to nodeAgg
are only in places where we're going to be doing datum copying or freeing
anyway.

I'm still worried about chewing up a kilobyte-at-least for each transition
value, but maybe that's something we could leave to fix later.  Another
idea is that we could teach the planner to know about that in its hash
table size estimates.

            regards, tom lane

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b06e1c1..e0288ba 100644
*** a/src/backend/executor/nodeAgg.c
--- b/src/backend/executor/nodeAgg.c
*************** advance_transition_function(AggState *ag
*** 791,798 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 791,800 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if transfn returned a pointer to its
!      * first input, we don't need to do anything.  Also, if transfn returned a
!      * pointer to a R/W expanded object that is already a child of the
!      * aggcontext, assume we can adopt that value without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_transition_function(AggState *ag
*** 800,811 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 802,826 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
*************** advance_combine_function(AggState *aggst
*** 1053,1060 ****

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * pfree the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
--- 1068,1078 ----

      /*
       * If pass-by-ref datatype, must copy the new value into aggcontext and
!      * free the prior transValue.  But if the combine function returned a
!      * pointer to its first input, we don't need to do anything.  Also, if the
!      * combine function returned a pointer to a R/W expanded object that is
!      * already a child of the aggcontext, assume we can adopt that value
!      * without copying it.
       */
      if (!pertrans->transtypeByVal &&
          DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
*************** advance_combine_function(AggState *aggst
*** 1062,1073 ****
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             newVal = datumCopy(newVal,
!                                pertrans->transtypeByVal,
!                                pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!             pfree(DatumGetPointer(pergroupstate->transValue));
      }

      pergroupstate->transValue = newVal;
--- 1080,1104 ----
          if (!fcinfo->isnull)
          {
              MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
!             if (DatumIsReadWriteExpandedObject(newVal,
!                                                false,
!                                                pertrans->transtypeLen) &&
!                 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
!                  /* do nothing */ ;
!             else
!                 newVal = datumCopy(newVal,
!                                    pertrans->transtypeByVal,
!                                    pertrans->transtypeLen);
          }
          if (!pergroupstate->transValueIsNull)
!         {
!             if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
!                                                false,
!                                                pertrans->transtypeLen))
!                 DeleteExpandedObject(pergroupstate->transValue);
!             else
!                 pfree(DatumGetPointer(pergroupstate->transValue));
!         }
      }

      pergroupstate->transValue = newVal;
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index 1ef1500..8d6fa41 100644
*** a/src/backend/utils/adt/array_userfuncs.c
--- b/src/backend/utils/adt/array_userfuncs.c
*************** static Datum array_position_common(Funct
*** 32,37 ****
--- 32,41 ----
   * Caution: if the input is a read/write pointer, this returns the input
   * argument; so callers must be sure that their changes are "safe", that is
   * they cannot leave the array in a corrupt state.
+  *
+  * If we're being called as an aggregate function, make sure any newly-made
+  * expanded array is allocated in the aggregate state context, so as to save
+  * copying operations.
   */
  static ExpandedArrayHeader *
  fetch_array_arg_replace_nulls(FunctionCallInfo fcinfo, int argno)
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 39,44 ****
--- 43,49 ----
      ExpandedArrayHeader *eah;
      Oid            element_type;
      ArrayMetaState *my_extra;
+     MemoryContext resultcxt;

      /* If first time through, create datatype cache struct */
      my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 51,60 ****
--- 56,72 ----
          fcinfo->flinfo->fn_extra = my_extra;
      }

+     /* Figure out which context we want the result in */
+     if (!AggCheckCallContext(fcinfo, &resultcxt))
+         resultcxt = CurrentMemoryContext;
+
      /* Now collect the array value */
      if (!PG_ARGISNULL(argno))
      {
+         MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);
+
          eah = PG_GETARG_EXPANDED_ARRAYX(argno, my_extra);
+         MemoryContextSwitchTo(oldcxt);
      }
      else
      {
*************** fetch_array_arg_replace_nulls(FunctionCa
*** 72,78 ****
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              CurrentMemoryContext,
                                               my_extra);
      }

--- 84,90 ----
                       errmsg("input data type is not an array")));

          eah = construct_empty_expanded_array(element_type,
!                                              resultcxt,
                                               my_extra);
      }


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Jim Nasby
Дата:
Сообщение: Re: emergency outage requiring database restart
Следующее
От: Merlin Moncure
Дата:
Сообщение: Re: emergency outage requiring database restart