Re: [HACKERS] [PATCH] Push limit to sort through a subquery

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: [HACKERS] [PATCH] Push limit to sort through a subquery
Дата
Msg-id 32422.1503672403@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: [HACKERS] [PATCH] Push limit to sort through a subquery  (Robert Haas <robertmhaas@gmail.com>)
Список pgsql-hackers
Robert Haas <robertmhaas@gmail.com> writes:
> I'm inclined to commit both of these after a little more testing and
> self-review, but let me know if anyone else wants to review first.

Attached is an updated version of the first patch, which I rebased
over 3f4c7917b, improved a bunch of the comments for, and fixed a
couple of obvious typos in.  However, when I went to test it,
it blew up real good:

regression=# set parallel_setup_cost=0;
SET
regression=# set parallel_tuple_cost=0;
SET
regression=# set min_parallel_table_scan_size=0;
SET
regression=# set max_parallel_workers_per_gather=4;
SET
regression=# explain analyze select * from tenk1 order by fivethous limit 4;
ERROR:  retrieved too many tuples in a bounded sort
CONTEXT:  parallel worker

The cause is obvious: GatherMerge doesn't know about the contract that
it's not supposed to pull more than tuples_needed rows from an input after
promising not to do so.  I am not convinced that it's worth adding the
logic that would be needed to make that happen, so my inclination is to
abandon this patch.  But here it is if you want to push further.

            regards, tom lane

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d..1287025 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,17 +47,26 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS                UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE        UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT        UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED        UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS                UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE        UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT        UINT64CONST(0xE000000000000008)

 #define PARALLEL_TUPLE_QUEUE_SIZE        65536

 /*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+    int64        tuples_needed;    /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
+/*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
  * instrument_options: Same meaning here as in instrument.c.
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+                     int64 tuples_needed)
 {
     ParallelExecutorInfo *pei;
     ParallelContext *pcxt;
     ExecParallelEstimateContext e;
     ExecParallelInitializeDSMContext d;
+    FixedParallelExecutorState *fpes;
     char       *pstmt_data;
     char       *pstmt_space;
     char       *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * for the various things we need to store.
      */

+    /* Estimate space for fixed-size state. */
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           sizeof(FixedParallelExecutorState));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
     /* Estimate space for query text. */
     query_len = strlen(estate->es_sourceText);
     shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * asked for has been allocated or initialized yet, though, so do that.
      */

+    /* Store fixed-size state. */
+    fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+    fpes->tuples_needed = tuples_needed;
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
     /* Store query string */
     query_string = shm_toc_allocate(pcxt->toc, query_len);
     memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+    FixedParallelExecutorState *fpes;
     BufferUsage *buffer_usage;
     DestReceiver *receiver;
     QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     void       *area_space;
     dsa_area   *area;

+    /* Get fixed-size state. */
+    fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
     receiver = ExecParallelGetReceiver(seg, toc);
     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,6 +893,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     queryDesc->planstate->state->es_query_dsa = area;
     ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-    /* Run the plan */
-    ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+    /* Pass down any tuple bound */
+    ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+    /*
+     * Run the plan.  If we specified a tuple bound, be careful not to demand
+     * more tuples than that: a bounded Sort in the worker errors out if it
+     * is asked for more rows than its bound.  Zero means "no limit".
+     */
+    ExecutorRun(queryDesc,
+                ForwardScanDirection,
+                fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+                true);

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36d2914..06a89ad 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -757,3 +757,119 @@ ExecShutdownNode(PlanState *node)

     return false;
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node.  This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples
+ * that their parent will demand is limited.
+ *
+ * Passing a negative tuples_needed value indicates "unknown limit", which
+ * should be the default assumption when this is not called at all for a
+ * particular node.  Also note the requirement that if this is called
+ * repeatedly on a plan tree, the exact same set of nodes must be updated
+ * with the new limit each time.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+    /*
+     * Since this function recurses, in principle we should check stack depth
+     * here.  In practice, it's probably pointless since the earlier node
+     * initialization tree traversal would surely have consumed more stack.
+     */
+
+    if (IsA(child_node, SortState))
+    {
+        /*
+         * If it is a Sort node, notify it that it can use bounded sort.
+         *
+         * Note: it is the responsibility of nodeSort.c to react properly to
+         * changes of these parameters.  If we ever redesign this, it'd be a
+         * good idea to integrate this signaling with the parameter-change
+         * mechanism.
+         */
+        SortState  *sortState = (SortState *) child_node;
+
+        if (tuples_needed < 0)
+        {
+            /* make sure flag gets reset if needed upon rescan */
+            sortState->bounded = false;
+        }
+        else
+        {
+            sortState->bounded = true;
+            sortState->bound = tuples_needed;
+        }
+    }
+    else if (IsA(child_node, MergeAppendState))
+    {
+        /*
+         * If it is a MergeAppend, we can apply the bound to any nodes that
+         * are children of the MergeAppend, since the MergeAppend surely need
+         * read no more than that many tuples from any one input.
+         */
+        MergeAppendState *maState = (MergeAppendState *) child_node;
+        int            i;
+
+        for (i = 0; i < maState->ms_nplans; i++)
+            ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+    }
+    else if (IsA(child_node, ResultState))
+    {
+        /*
+         * We should also look through a Result, since the planner might stick
+         * one atop MergeAppend for projection purposes.
+         *
+         * If Result supported qual checking, we'd have to punt on seeing a
+         * qual.  Note that having a resconstantqual is not a showstopper: if
+         * that fails we're not getting any rows at all.
+         */
+        if (outerPlanState(child_node))
+            ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+    else if (IsA(child_node, SubqueryScanState))
+    {
+        /*
+         * We can also look through SubqueryScan, but only if it has no qual
+         * (otherwise it might discard rows).
+         */
+        SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+        if (subqueryState->ss.ps.qual == NULL)
+            ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+    }
+    else if (IsA(child_node, GatherState))
+    {
+        GatherState *gstate = (GatherState *) child_node;
+
+        /*
+         * We might have a Gather node, which can propagate the bound to its
+         * workers.  As with MergeAppend, no one worker could possibly need to
+         * return more tuples than the Gather itself needs to.
+         *
+         * Note: As with Sort, the Gather node is responsible for reacting
+         * properly to changes to this parameter.
+         */
+        gstate->tuples_needed = tuples_needed;
+
+        /* We must also pass down bound to our own copy of the child plan */
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+    else if (IsA(child_node, GatherMergeState))
+    {
+        GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+        /* Same idea here as for Gather */
+        gstate->tuples_needed = tuples_needed;
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+
+    /*
+     * In principle we could look through any plan node type that is certain
+     * not to discard or combine input rows.  In practice, there are not many
+     * node types that the planner might put between Sort and Limit, so trying
+     * to be very general is not (currently) worth the trouble.
+     */
+}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index e8d94ee..a0f5a60 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     gatherstate->ps.state = estate;
     gatherstate->ps.ExecProcNode = ExecGather;
     gatherstate->need_to_scan_locally = !node->single_copy;
+    gatherstate->tuples_needed = -1;

     /*
      * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
             if (!node->pei)
                 node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                  estate,
-                                                 gather->num_workers);
+                                                 gather->num_workers,
+                                                 node->tuples_needed);

             /*
              * Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 64c6239..2526c58 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     gm_state->ps.plan = (Plan *) node;
     gm_state->ps.state = estate;
     gm_state->ps.ExecProcNode = ExecGatherMerge;
+    gm_state->tuples_needed = -1;

     /*
      * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
             if (!node->pei)
                 node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                  estate,
-                                                 gm->num_workers);
+                                                 gm->num_workers,
+                                                 node->tuples_needed);

             /* Try to launch workers. */
             pcxt = node->pei->pcxt;
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index ceb6854..d407bd8 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"

 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);


 /* ----------------------------------------------------------------
@@ -297,92 +297,27 @@ recompute_limits(LimitState *node)
     /* Set state-machine state */
     node->lstate = LIMIT_RESCAN;

-    /* Notify child node about limit, if useful */
-    pass_down_bound(node, outerPlanState(node));
+    /*
+     * Notify child node about limit, if useful.  Note: think not to
+     * "optimize" by skipping the ExecSetTupleBound call if
+     * compute_tuples_needed returns <0.  We must update child nodes anyway,
+     * in case this is a rescan and the previous time we got a different
+     * result.
+     */
+    ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }

 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the number of tuples needed to satisfy a Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-    /*
-     * Since this function recurses, in principle we should check stack depth
-     * here.  In practice, it's probably pointless since the earlier node
-     * initialization tree traversal would surely have consumed more stack.
-     */
-
-    if (IsA(child_node, SortState))
-    {
-        SortState  *sortState = (SortState *) child_node;
-        int64        tuples_needed = node->count + node->offset;
-
-        /* negative test checks for overflow in sum */
-        if (node->noCount || tuples_needed < 0)
-        {
-            /* make sure flag gets reset if needed upon rescan */
-            sortState->bounded = false;
-        }
-        else
-        {
-            sortState->bounded = true;
-            sortState->bound = tuples_needed;
-        }
-    }
-    else if (IsA(child_node, MergeAppendState))
-    {
-        /* Pass down the bound through MergeAppend */
-        MergeAppendState *maState = (MergeAppendState *) child_node;
-        int            i;
-
-        for (i = 0; i < maState->ms_nplans; i++)
-            pass_down_bound(node, maState->mergeplans[i]);
-    }
-    else if (IsA(child_node, ResultState))
-    {
-        /*
-         * We also have to be prepared to look through a Result, since the
-         * planner might stick one atop MergeAppend for projection purposes.
-         *
-         * If Result supported qual checking, we'd have to punt on seeing a
-         * qual.  Note that having a resconstantqual is not a showstopper: if
-         * that fails we're not getting any rows at all.
-         */
-        if (outerPlanState(child_node))
-            pass_down_bound(node, outerPlanState(child_node));
-    }
-    else if (IsA(child_node, SubqueryScanState))
-    {
-        /*
-         * We can also look through SubqueryScan, but only if it has no qual
-         * (otherwise it might discard rows).
-         */
-        SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
-        if (subqueryState->ss.ps.qual == NULL)
-            pass_down_bound(node, subqueryState->subplan);
-    }
-
-    /*
-     * In principle we could look through any plan node type that is certain
-     * not to discard or combine input rows.  In practice, there are not many
-     * node types that the planner might put between Sort and Limit, so trying
-     * to be very general is not worth the trouble.
-     */
+    if (node->noCount)
+        return -1;
+    /* Note: if this overflows, we'll return a negative value, which is OK */
+    return node->count + node->offset;
 }

 /* ----------------------------------------------------------------
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index bd0a87f..79b8867 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;

 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-                     EState *estate, int nworkers);
+                     EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eacbea3..f48a603 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);


 /* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3272c4b..15a8426 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1919,6 +1919,7 @@ typedef struct GatherState
     struct TupleQueueReader **reader;
     TupleTableSlot *funnel_slot;
     bool        need_to_scan_locally;
+    int64        tuples_needed;    /* tuple bound, see ExecSetTupleBound */
 } GatherState;

 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
     struct binaryheap *gm_heap; /* binary heap of slot indices */
     bool        gm_initialized; /* gather merge initilized ? */
     bool        need_to_scan_locally;
+    int64        tuples_needed;    /* tuple bound, see ExecSetTupleBound */
     int            gm_nkeys;
     SortSupport gm_sortkeys;    /* array of length ms_nkeys */
     struct GMReaderTupleBuffer *gm_tuple_buffers;    /* tuple buffer per reader */

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Amit Kapila
Дата:
Сообщение: Re: [HACKERS] [PATCH] Push limit to sort through a subquery
Следующее
От: Tom Lane
Дата:
Сообщение: Re: [HACKERS] [PATCH] Push limit to sort through a subquery