Re: [HACKERS] [PATCH] Push limit to sort through a subquery

From: Tom Lane
Subject: Re: [HACKERS] [PATCH] Push limit to sort through a subquery
Date:
Msg-id: 8844.1503681519@sss.pgh.pa.us
In response to: Re: [HACKERS] [PATCH] Push limit to sort through a subquery  (Robert Haas <robertmhaas@gmail.com>)
Responses: Re: [HACKERS] [PATCH] Push limit to sort through a subquery  (Robert Haas <robertmhaas@gmail.com>)
List: pgsql-hackers
v4, with a test case and some more comment-polishing.  I think this
is committable.

            regards, tom lane

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d..ad9eba6 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,17 +47,26 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS                UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE        UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT        UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED        UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT        UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS                UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE        UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE        UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION    UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA                UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT        UINT64CONST(0xE000000000000008)

 #define PARALLEL_TUPLE_QUEUE_SIZE        65536

 /*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+    int64        tuples_needed;    /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
+/*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
  * instrument_options: Same meaning here as in instrument.c.
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+                     int64 tuples_needed)
 {
     ParallelExecutorInfo *pei;
     ParallelContext *pcxt;
     ExecParallelEstimateContext e;
     ExecParallelInitializeDSMContext d;
+    FixedParallelExecutorState *fpes;
     char       *pstmt_data;
     char       *pstmt_space;
     char       *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * for the various things we need to store.
      */

+    /* Estimate space for fixed-size state. */
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           sizeof(FixedParallelExecutorState));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
     /* Estimate space for query text. */
     query_len = strlen(estate->es_sourceText);
     shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
      * asked for has been allocated or initialized yet, though, so do that.
      */

+    /* Store fixed-size state. */
+    fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+    fpes->tuples_needed = tuples_needed;
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
     /* Store query string */
     query_string = shm_toc_allocate(pcxt->toc, query_len);
     memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+    FixedParallelExecutorState *fpes;
     BufferUsage *buffer_usage;
     DestReceiver *receiver;
     QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     void       *area_space;
     dsa_area   *area;

+    /* Get fixed-size state. */
+    fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
     /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
     receiver = ExecParallelGetReceiver(seg, toc);
     instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     queryDesc->planstate->state->es_query_dsa = area;
     ExecParallelInitializeWorker(queryDesc->planstate, toc);

-    /* Run the plan */
-    ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+    /* Pass down any tuple bound */
+    ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+    /*
+     * Run the plan.  If we specified a tuple bound, be careful not to demand
+     * more tuples than that.
+     */
+    ExecutorRun(queryDesc,
+                ForwardScanDirection,
+                fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+                true);

     /* Shut down the executor */
     ExecutorFinish(queryDesc);
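
The fixed-state hand-off above is the usual shm_toc three-step: estimate
space, allocate-and-insert in the leader, look up in the worker.  A minimal
sketch of that pattern, using a hypothetical key and struct in place of
PARALLEL_KEY_EXECUTOR_FIXED and FixedParallelExecutorState:

    #include "postgres.h"
    #include "storage/shm_toc.h"

    /* Hypothetical stand-ins, for illustration only */
    #define MY_FIXED_KEY    UINT64CONST(0xE0000000000000FF)

    typedef struct MyFixedState
    {
        int64       tuples_needed;
    } MyFixedState;

    /* Leader, estimate phase: reserve one chunk and one TOC key */
    static void
    my_estimate(shm_toc_estimator *estimator)
    {
        shm_toc_estimate_chunk(estimator, sizeof(MyFixedState));
        shm_toc_estimate_keys(estimator, 1);
    }

    /* Leader, after DSM creation: allocate, fill, and publish */
    static void
    my_store(shm_toc *toc, int64 tuples_needed)
    {
        MyFixedState *fst = shm_toc_allocate(toc, sizeof(MyFixedState));

        fst->tuples_needed = tuples_needed;
        shm_toc_insert(toc, MY_FIXED_KEY, fst);
    }

    /* Worker: fetch it back; noError = false makes a missing key an ERROR */
    static int64
    my_fetch(shm_toc *toc)
    {
        MyFixedState *fst = shm_toc_lookup(toc, MY_FIXED_KEY, false);

        return fst->tuples_needed;
    }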
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36d2914..c1aa506 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node)

     return false;
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node.  This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited.  The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+    /*
+     * Since this function recurses, in principle we should check stack depth
+     * here.  In practice, it's probably pointless since the earlier node
+     * initialization tree traversal would surely have consumed more stack.
+     */
+
+    if (IsA(child_node, SortState))
+    {
+        /*
+         * If it is a Sort node, notify it that it can use bounded sort.
+         *
+         * Note: it is the responsibility of nodeSort.c to react properly to
+         * changes of these parameters.  If we ever redesign this, it'd be a
+         * good idea to integrate this signaling with the parameter-change
+         * mechanism.
+         */
+        SortState  *sortState = (SortState *) child_node;
+
+        if (tuples_needed < 0)
+        {
+            /* make sure flag gets reset if needed upon rescan */
+            sortState->bounded = false;
+        }
+        else
+        {
+            sortState->bounded = true;
+            sortState->bound = tuples_needed;
+        }
+    }
+    else if (IsA(child_node, MergeAppendState))
+    {
+        /*
+         * If it is a MergeAppend, we can apply the bound to any nodes that
+         * are children of the MergeAppend, since the MergeAppend surely need
+         * read no more than that many tuples from any one input.
+         */
+        MergeAppendState *maState = (MergeAppendState *) child_node;
+        int            i;
+
+        for (i = 0; i < maState->ms_nplans; i++)
+            ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+    }
+    else if (IsA(child_node, ResultState))
+    {
+        /*
+         * Similarly, for a projecting Result, we can apply the bound to its
+         * child node.
+         *
+         * If Result supported qual checking, we'd have to punt on seeing a
+         * qual.  Note that having a resconstantqual is not a showstopper: if
+         * that condition succeeds it affects nothing, while if it fails, no
+         * rows will be demanded from the Result child anyway.
+         */
+        if (outerPlanState(child_node))
+            ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+    else if (IsA(child_node, SubqueryScanState))
+    {
+        /*
+         * We can also descend through SubqueryScan, but only if it has no
+         * qual (otherwise it might discard rows).
+         */
+        SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+        if (subqueryState->ss.ps.qual == NULL)
+            ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+    }
+    else if (IsA(child_node, GatherState))
+    {
+        /*
+         * A Gather node can propagate the bound to its workers.  As with
+         * MergeAppend, no one worker could possibly need to return more
+         * tuples than the Gather itself needs to.
+         *
+         * Note: As with Sort, the Gather node is responsible for reacting
+         * properly to changes to this parameter.
+         */
+        GatherState *gstate = (GatherState *) child_node;
+
+        gstate->tuples_needed = tuples_needed;
+
+        /* Also pass down the bound to our own copy of the child plan */
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+    else if (IsA(child_node, GatherMergeState))
+    {
+        /* Same comments as for Gather */
+        GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+        gstate->tuples_needed = tuples_needed;
+
+        ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+    }
+
+    /*
+     * In principle we could descend through any plan node type that is
+     * certain not to discard or combine input rows; but on seeing a node that
+     * can do that, we can't propagate the bound any further.  For the moment
+     * it's unclear that any other cases are worth checking here.
+     */
+}
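
As the closing comment says, any node type that neither discards nor
combines input rows could be handled the same way; the cases above are just
the ones known to arise between Limit and Sort today.  Purely for
illustration (not something this patch adds), descending through an Append
would look just like the MergeAppend case, since Append likewise cannot
need more than tuples_needed rows from any one child:

    else if (IsA(child_node, AppendState))
    {
        AppendState *aState = (AppendState *) child_node;
        int         i;

        for (i = 0; i < aState->as_nplans; i++)
            ExecSetTupleBound(tuples_needed, aState->appendplans[i]);
    }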
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index e8d94ee..a0f5a60 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     gatherstate->ps.state = estate;
     gatherstate->ps.ExecProcNode = ExecGather;
     gatherstate->need_to_scan_locally = !node->single_copy;
+    gatherstate->tuples_needed = -1;

     /*
      * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
             if (!node->pei)
                 node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                  estate,
-                                                 gather->num_workers);
+                                                 gather->num_workers,
+                                                 node->tuples_needed);

             /*
              * Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 64c6239..2526c58 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     gm_state->ps.plan = (Plan *) node;
     gm_state->ps.state = estate;
     gm_state->ps.ExecProcNode = ExecGatherMerge;
+    gm_state->tuples_needed = -1;

     /*
      * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
             if (!node->pei)
                 node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                  estate,
-                                                 gm->num_workers);
+                                                 gm->num_workers,
+                                                 node->tuples_needed);

             /* Try to launch workers. */
             pcxt = node->pei->pcxt;
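
One timing note on the two Gather changes: tuples_needed starts out as -1
("no limit") at node initialization and is only ever updated by
ExecSetTupleBound, so the bound has to be installed before the node first
runs and builds its ParallelExecutorInfo.  A sketch of the required
ordering (hypothetical driver code, with an assumed bound of 6):

    TupleTableSlot *slot;
    PlanState  *ps;

    ps = ExecInitNode(plan, estate, 0);     /* tuples_needed = -1 here */
    ExecSetTupleBound(6, ps);               /* e.g. LIMIT 4 OFFSET 2 */
    slot = ExecProcNode(ps);                /* first call launches workers,
                                             * passing tuples_needed down */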
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index ceb6854..883f46c 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"

 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);


 /* ----------------------------------------------------------------
@@ -297,92 +297,26 @@ recompute_limits(LimitState *node)
     /* Set state-machine state */
     node->lstate = LIMIT_RESCAN;

-    /* Notify child node about limit, if useful */
-    pass_down_bound(node, outerPlanState(node));
+    /*
+     * Notify child node about limit.  Note: think not to "optimize" by
+     * skipping ExecSetTupleBound if compute_tuples_needed returns < 0.  We
+     * must update the child node anyway, in case this is a rescan and the
+     * previous time we got a different result.
+     */
+    ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }

 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-    /*
-     * Since this function recurses, in principle we should check stack depth
-     * here.  In practice, it's probably pointless since the earlier node
-     * initialization tree traversal would surely have consumed more stack.
-     */
-
-    if (IsA(child_node, SortState))
-    {
-        SortState  *sortState = (SortState *) child_node;
-        int64        tuples_needed = node->count + node->offset;
-
-        /* negative test checks for overflow in sum */
-        if (node->noCount || tuples_needed < 0)
-        {
-            /* make sure flag gets reset if needed upon rescan */
-            sortState->bounded = false;
-        }
-        else
-        {
-            sortState->bounded = true;
-            sortState->bound = tuples_needed;
-        }
-    }
-    else if (IsA(child_node, MergeAppendState))
-    {
-        /* Pass down the bound through MergeAppend */
-        MergeAppendState *maState = (MergeAppendState *) child_node;
-        int            i;
-
-        for (i = 0; i < maState->ms_nplans; i++)
-            pass_down_bound(node, maState->mergeplans[i]);
-    }
-    else if (IsA(child_node, ResultState))
-    {
-        /*
-         * We also have to be prepared to look through a Result, since the
-         * planner might stick one atop MergeAppend for projection purposes.
-         *
-         * If Result supported qual checking, we'd have to punt on seeing a
-         * qual.  Note that having a resconstantqual is not a showstopper: if
-         * that fails we're not getting any rows at all.
-         */
-        if (outerPlanState(child_node))
-            pass_down_bound(node, outerPlanState(child_node));
-    }
-    else if (IsA(child_node, SubqueryScanState))
-    {
-        /*
-         * We can also look through SubqueryScan, but only if it has no qual
-         * (otherwise it might discard rows).
-         */
-        SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
-        if (subqueryState->ss.ps.qual == NULL)
-            pass_down_bound(node, subqueryState->subplan);
-    }
-
-    /*
-     * In principle we could look through any plan node type that is certain
-     * not to discard or combine input rows.  In practice, there are not many
-     * node types that the planner might put between Sort and Limit, so trying
-     * to be very general is not worth the trouble.
-     */
+    if (node->noCount)
+        return -1;
+    /* Note: if this overflows, we'll return a negative value, which is OK */
+    return node->count + node->offset;
 }

 /* ----------------------------------------------------------------
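
To make compute_tuples_needed's contract concrete (hypothetical values):
LIMIT 4 OFFSET 2 gives count = 4, offset = 2, noCount = false, so the child
need produce at most 6 tuples; LIMIT ALL sets noCount and yields -1,
meaning no determinable bound.  A standalone restatement, for illustration
only:

    #include <stdbool.h>
    #include <stdint.h>

    /*
     * Illustration of compute_tuples_needed's contract; the real function
     * reads count/offset/noCount from its LimitState.
     */
    static int64_t
    tuples_needed_for(bool noCount, int64_t count, int64_t offset)
    {
        if (noCount)
            return -1;          /* LIMIT ALL: no determinable bound */

        /*
         * As in the patch, a sum that wraps negative is just returned
         * as-is; ExecSetTupleBound treats any negative value as "no limit".
         */
        return count + offset;
    }

    /* tuples_needed_for(false, 4, 2) == 6    -- LIMIT 4 OFFSET 2 */
    /* tuples_needed_for(true, 0, 0)  == -1   -- LIMIT ALL */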
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index bd0a87f..79b8867 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;

 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-                     EState *estate, int nworkers);
+                     EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eacbea3..f48a603 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);


 /* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3272c4b..15a8426 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1919,6 +1919,7 @@ typedef struct GatherState
     struct TupleQueueReader **reader;
     TupleTableSlot *funnel_slot;
     bool        need_to_scan_locally;
+    int64        tuples_needed;    /* tuple bound, see ExecSetTupleBound */
 } GatherState;

 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
     struct binaryheap *gm_heap; /* binary heap of slot indices */
     bool        gm_initialized; /* gather merge initilized ? */
     bool        need_to_scan_locally;
+    int64        tuples_needed;    /* tuple bound, see ExecSetTupleBound */
     int            gm_nkeys;
     SortSupport gm_sortkeys;    /* array of length ms_nkeys */
     struct GMReaderTupleBuffer *gm_tuple_buffers;    /* tuple buffer per reader */
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 084f0f0..ccad18e 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty;
    500
 (20 rows)

+reset enable_hashagg;
+-- gather merge test with a LIMIT
+explain (costs off)
+  select fivethous from tenk1 order by fivethous limit 4;
+                  QUERY PLAN
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: fivethous
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select fivethous from tenk1 order by fivethous limit 4;
+ fivethous
+-----------
+         0
+         0
+         1
+         1
+(4 rows)
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
@@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5;
 (5 rows)

 reset max_parallel_workers;
-reset enable_hashagg;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
 explain (costs off)
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 58c3f59..c0debdd 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -118,13 +118,20 @@ explain (costs off)

 select count(*) from tenk1 group by twenty;

+reset enable_hashagg;
+
+-- gather merge test with a LIMIT
+explain (costs off)
+  select fivethous from tenk1 order by fivethous limit 4;
+
+select fivethous from tenk1 order by fivethous limit 4;
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
    select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
 reset max_parallel_workers;
-reset enable_hashagg;

 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;

