Re: asynchronous and vectorized execution

From Kyotaro HORIGUCHI
Subject Re: asynchronous and vectorized execution
Date
Msg-id 20160510.175039.122461151.horiguchi.kyotaro@lab.ntt.co.jp
In response to asynchronous and vectorized execution  (Robert Haas <robertmhaas@gmail.com>)
List pgsql-hackers
Hello.

At Mon, 9 May 2016 13:33:55 -0400, Robert Haas <robertmhaas@gmail.com> wrote in
<CA+Tgmobx8su_bYtAa3DgrqB+R7xZG6kHRj0ccMUUshKAQVftww@mail.gmail.com>
> Hi,
> 
> I realize that we haven't gotten 9.6beta1 out the door yet, but I
> think we can't really wait much longer to start having at least some
> discussion of 9.7 topics, so I'm going to go ahead and put this one
> out there.  I believe there are other people thinking about these
> topics as well, including Andres Freund, Kyotaro Horiguchi, and
> probably some folks at 2ndQuadrant (but I don't know exactly who).  To
> make a long story short, I think there are several different areas
> where we should consider major upgrades to our executor.  It's too
> slow and it doesn't do everything we want it to do.  The main things
> on my mind are:

> 1. asynchronous execution, by which I mean the ability of a node to
> somehow say that it will generate a tuple eventually, but is not yet
> ready, so that the executor can go run some other part of the plan
> tree while it waits.  This case most obviously arises for foreign
> tables, where it makes little sense to block on I/O if some other part
> of the query tree could benefit from the CPU; consider SELECT * FROM
> lt WHERE qual UNION SELECT * FROM ft WHERE qual.

This is my main concern and what I wanted to solve.

> It is also a problem
> for parallel query: in a parallel sequential scan, the next worker can
> begin reading the next block even if the current block hasn't yet been
> received from the OS.  Whether or not this will be efficient is a
> research question, but it can be done.  However, imagine a parallel
> scan of a btree index: we don't know what page to scan next until we
> read the previous page and examine the next-pointer.  In the meantime,
> any worker that arrives at that scan node has no choice but to block.
> It would be better if the scan node could instead say "hey, thanks for
> coming but I'm really not ready to be on-CPU just at the moment" and
> potentially allow the worker to go work in some other part of the
> query tree.

Especially for foreign tables, there is necessarily a gap between sending
a FETCH and receiving its result. Visiting other tables is a very
effective way to fill that gap, and file descriptors make this easy
thanks to the new WaitEventSet API. The attached is a WIP proof of
concept of that (sorry for including some debug code and irrelevant
code). Its Exec* APIs differ a bit from those in the 0002 patch, and it
works only for postgres_fdw and Append. It embeds the waiting code into
ExecAppend, but that is easily replaceable with the framework in
Robert's 0003 patch.

Apart from the core part, in postgres_fdw several scans can reside
together on one connection. Those scans share the same FD, but there is
no way to identify which scan node the FD was signalled for. To handle
that situation, we might need a 'seemed to be ready but really not'
path.

> For that worker to actually find useful work to do
> elsewhere, we'll probably need it to be the case either that the table
> is partitioned or the original query will need to involve UNION ALL,
> but those are not silly cases to worry about, particularly if we get
> native partitioning in 9.7.

One annoyance of this method is handling a single FD with latch-like
data draining. Since we would have to provide FDs for such nodes,
Gather might need another data-passing channel over those FDs.

I also want to realize early execution of async nodes. That might
require every async-capable node type to return 'not ready' on its
first call.

> 2. vectorized execution, by which I mean the ability of a node to
> return tuples in batches rather than one by one.  Andres has opined
> more than once that repeated trips through ExecProcNode defeat the
> ability of the CPU to do branch prediction correctly, slowing the
> whole system down, and that they also result in poor CPU cache
> behavior, since we jump all over the place executing a little bit of
> code from each node before moving onto the next rather than running
> one bit of code first, and then another later.  I think that's
> probably right.   For example, consider a 5-table join where all of
> the joins are implemented as hash tables.  If this query plan is going
> to be run to completion, it would make much more sense to fetch, say,
> 100 tuples from the driving scan and then probe for all of those in
> the first hash table, and then probe for all of those in the second
> hash table, and so on.  What we do instead is fetch one tuple and
> probe for it in all 5 hash tables, and then repeat.  If one of those
> hash tables would fit in the CPU cache but all five together will not,
> that seems likely to be a lot worse.   But even just ignoring the CPU
> cache aspect of it for a minute, suppose you want to write a loop to
> perform a hash join.  The inner loop fetches the next tuple from the
> probe table and does a hash lookup.  Right now, fetching the next
> tuple from the probe table means calling a function which in turn
> calls another function which probably calls another function which
> probably calls another function and now about 4 layers down we
> actually get the next tuple.  If the scan returned a batch of tuples
> to the hash join, fetching the next tuple from the batch would
> probably be 0 or 1 function calls rather than ... more.  Admittedly,
> you've got to consider the cost of marshaling the batches but I'm
> optimistic that there are cycles to be squeezed out here.  We might
> also want to consider storing batches of tuples in a column-optimized
> rather than row-optimized format so that iterating through one or two
> attributes across every tuple in the batch touches the minimal number
> of cache lines.
> 
> Obviously, both of these are big projects that could touch a large
> amount of executor code, and there may be other ideas, in addition to
> these, which some of you may be thinking about that could also touch a
> large amount of executor code.  It would be nice to agree on a way
> forward that minimizes code churn and maximizes everyone's attempt to
> contribute without conflicting with each other.  Also, it seems
> desirable to enable, as far as possible, incremental development - in
> particular, it seems to me that it would be good to pick a design that
> doesn't require massive changes to every node all at once.  A single
> patch that adds some capability to every node in the executor in one
> fell swoop is going to be too large to review effectively.
> 
> My proposal for how to do this is to make ExecProcNode function as a
> backward-compatibility wrapper.  For asynchronous execution, a node
> might return a not-ready-yet indication, but if that node is called
> via ExecProcNode, it means the caller isn't prepared to receive such
> an indication, so ExecProcNode will just wait for the node to become
> ready and then return the tuple.  Similarly, for vectorized execution,
> a node might return a bunch of tuples all at once.  ExecProcNode will
> extract the first one and return it to the caller, and subsequent
> calls to ExecProcNode will iterate through the rest of the batch, only
> calling the underlying node-specific function when the batch is
> exhausted.  In this way, code that doesn't know about the new stuff
> can continue to work pretty much as it does today.  Also, and I think
> this is important, nodes don't need the permission of their parent
> node to use these new capabilities.  They can use them whenever they
> wish, without worrying about whether the upper node is prepared to
> deal with it.  If not, ExecProcNode will paper over the problem.  This
> seems to me to be a good way to keep the code simple.

Agreed on returning a not-ready state and on wrapping nodes to preserve
the old-style API, but I suppose the Exec* functions could keep
returning a tuple as they do currently.

> For asynchronous execution, I have gone so far as to mock up a bit of
> what this might look like.  This shouldn't be taken very seriously at
> this point, but I'm attaching a few very-much-WIP patches to show the
> direction of my line of thinking.  Basically, I propose to have
> ExecBlah (that is, ExecBitmapHeapScan, ExecAppend, etc.) return tuples
> by putting them into a new PlanState member called "result", which is
> just a Node * so that we can support multiple types of results,
> instead of returning them.  There is also a result_ready boolean, so
> that a node can return without setting this Boolean to engage
> asynchronous behavior.  This triggers an "event loop", which
> repeatedly waits for FDs chosen by waiting nodes to become readable
> and/or writeable and then gives the node a chance to react.
> Eventually, the waiting node will stop waiting and have a result
> ready, at which point the event loop will give the parent of that node
> a chance to run.  If that node consequently becomes ready, then its
> parent gets a chance to run.  Eventually (we hope), the node for which
> we're waiting becomes ready, and we can then read a result tuple.

I had been thinking along almost the same lines, though only for the
Append node.

> With some more work, this seems like it can handle the FDW case, but I
> haven't worked out how to make it handle the related parallel query
> case.  What we want there is to wait not for the readiness of an FD
> but rather for some other process involved in the parallel query to
> reach a point where it can welcome assistance executing that node.  I
> don't know exactly what the signaling for that should look like yet -
> maybe setting the process latch or something.

Agreed as described above.

> By the way, one smaller executor project that I think we should also
> look at has to do with this comment in nodeSeqScan.c:
> 
> static bool
> SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
> {
>         /*
>          * Note that unlike IndexScan, SeqScan never use keys in heap_beginscan
>          * (and this is very bad) - so, here we do not check are keys ok or not.
>          */
>         return true;
> }
> 
> Some quick prototyping by my colleague Dilip Kumar suggests that, in
> fact, there are cases where pushing down keys into heap_beginscan()
> could be significantly faster.  Some care is required here because any
> functions we execute as scan keys are run with the buffer locked, so
> we had better not run anything very complicated.  But doing this for
> simple things like integer equality operators seems like it could save
> quite a few buffer lock/unlock cycles and some other executor overhead
> as well.

The cost of pushing down keys into seqscans seems calculable with a
presumably small amount of computation, so I suppose it is promising.

> Thoughts, ideas, suggestions, etc. very welcome.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 2f49268..49e334f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -120,6 +120,14 @@ enum FdwDirectModifyPrivateIndex
     FdwDirectModifyPrivateSetProcessed
 };
 
+typedef enum PgFdwFetchState
+{
+    PGFDWFETCH_IDLE,
+    PGFDWFETCH_WAITING,
+    PGFDWFETCH_READY,
+    PGFDWFETCH_EOF
+} PgFdwFetchState;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
@@ -151,6 +159,8 @@ typedef struct PgFdwScanState
     /* batch-level state, for optimizing rewinds and avoiding useless fetch */
     int            fetch_ct_2;        /* Min(# of fetches done, 2) */
     bool        eof_reached;    /* true if last fetch reached EOF */
 
+    bool        is_async;
+    PgFdwFetchState fetch_status;
     /* working memory contexts */
     MemoryContext batch_cxt;    /* context holding current batch of tuples */
 
@@ -1248,7 +1258,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
      */
     fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
     node->fdw_state = (void *) fsstate;
-
+    fsstate->is_async = ((eflags & EXEC_FLAG_ASYNC) != 0);
     /*
      * Obtain the foreign server where to connect and user mapping to use for
      * connection. For base relations we obtain this information from
 
@@ -1287,6 +1297,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
      */
     fsstate->conn = GetConnection(user, false);
 
+    /* Set a waiting fd to allow asynchronous waiting in upper node */
+    node->ss.ps.fd = PQsocket(fsstate->conn);
+
     /* Assign a unique ID for my cursor */
     fsstate->cursor_number = GetCursorNumber(fsstate->conn);
     fsstate->cursor_exists = false;
 
@@ -1359,12 +1372,22 @@ postgresIterateForeignScan(ForeignScanState *node)
      */
     if (fsstate->next_tuple >= fsstate->num_tuples)
     {
-        /* No point in another fetch if we already detected EOF, though. */
-        if (!fsstate->eof_reached)
-            fetch_more_data(node);
-        /* If we didn't get any tuples, must be end of data. */
-        if (fsstate->next_tuple >= fsstate->num_tuples)
+        fetch_more_data(node);
+        if (fsstate->fetch_status == PGFDWFETCH_WAITING)
+        {
+            /*
+             * fetch_more_data just sent the asynchronous query for the next
+             * output, so ask the caller to visit the next table.
+             */
+            node->ss.ps.exec_status = EXEC_NOT_READY;
+            return ExecClearTuple(slot);
+        }
+        else if (fsstate->fetch_status == PGFDWFETCH_EOF)
+        {
+            /* fetch_more_data gave no more tuples */
+            node->ss.ps.exec_status = EXEC_EOT;
             return ExecClearTuple(slot);
+        }
     }
 
     /*
@@ -2872,7 +2895,9 @@ fetch_more_data(ForeignScanState *node)
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     PGresult   *volatile res = NULL;
     MemoryContext oldcontext;
-
+    PGconn       *conn = fsstate->conn;
+    char        sql[64];
+
     /*
      * We'll store the tuples in the batch_cxt.  First, flush the previous
      * batch.
@@ -2881,18 +2906,51 @@ fetch_more_data(ForeignScanState *node)
     MemoryContextReset(fsstate->batch_cxt);
     oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
+    snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+             fsstate->fetch_size, fsstate->cursor_number);
+
+    if (fsstate->fetch_status != PGFDWFETCH_WAITING)
+    {
+        /*
+         * If we reached the final tuple in the previous call, no more tuples
+         * will be fetched this time.
+         */
+        if (fsstate->eof_reached)
+        {
+            fsstate->fetch_status = PGFDWFETCH_EOF;
+            return;
+        }
+
+        if (!PQsendQuery(conn, sql))
+            pgfdw_report_error(ERROR, NULL, conn, false, sql);
+        fsstate->fetch_status = PGFDWFETCH_WAITING;
+
+        /*
+         * When currently on a connection running asynchronous fetching, we
+         * return immediately here.
+         */
+        if (fsstate->is_async)
+            return;
+    }
+    else
+    {
+        Assert(fsstate->is_async);
+        if (!PQconsumeInput(conn))
+            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+
+        if (PQisBusy(conn))
+            return;
+    }
+
     /* PGresult must be released before leaving this function. */
     PG_TRY();
     {
-        PGconn       *conn = fsstate->conn;
-        char        sql[64];
-        int            numrows;
         int            i;
+        int            numrows;
 
-        snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-                 fsstate->fetch_size, fsstate->cursor_number);
+        res = pgfdw_get_result(conn, sql);
+        fsstate->fetch_status = PGFDWFETCH_READY;
 
-        res = pgfdw_exec_query(conn, sql);
         /* On error, report the original query, not the FETCH. */
         if (PQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
@@ -2923,6 +2981,10 @@ fetch_more_data(ForeignScanState *node)
         /* Must be EOF if we didn't get as many tuples as we asked for. */
         fsstate->eof_reached = (numrows < fsstate->fetch_size);
 
+        /* But don't return EOF if any tuple is available */
+        if (numrows == 0)
+            fsstate->fetch_status = PGFDWFETCH_EOF;
+
         PQclear(res);
         res = NULL;
     }
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ac02304..f76fc94 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1553,6 +1553,8 @@ ExecutePlan(EState *estate,
     if (use_parallel_mode)
         EnterParallelMode();
 
+    ExecStartNode(planstate);
+
     /*
      * Loop until we've processed the proper number of tuples from the plan.
      */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 554244f..590b28e 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
     if (node->instrument)
         InstrStartNode(node->instrument);
 
+    node->exec_status = EXEC_READY;
+
     switch (nodeTag(node))
     {
             /*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
     if (node->instrument)
         InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
+    if (TupIsNull(result) &&
+        node->exec_status == EXEC_READY)
+        node->exec_status = EXEC_EOT;
+
     return result;
 }
@@ -786,6 +792,30 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * ExecStartNode - execute registered early-startup callbacks
+ */
+bool
+ExecStartNode(PlanState *node)
+{
+    if (node == NULL)
+        return false;
+
+    switch (nodeTag(node))
+    {
+    case T_GatherState:
+        return ExecStartGather((GatherState *) node);
+    case T_SeqScanState:
+        return ExecStartSeqScan((SeqScanState *) node);
+    default:
+        break;
+    }
+
+    return planstate_tree_walker(node, ExecStartNode, NULL);
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 0c1e4a3..95130b0 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -2344,6 +2344,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
     aggstate = makeNode(AggState);
     aggstate->ss.ps.plan = (Plan *) node;
     aggstate->ss.ps.state = estate;
 
+    aggstate->ss.ps.fd = -1;
     aggstate->aggs = NIL;
     aggstate->numaggs = 0;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..004c621 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,9 +121,11 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 {
     AppendState *appendstate = makeNode(AppendState);
     PlanState **appendplanstates;
+    AppendAsyncState *asyncstates;
     int            nplans;
     int            i;
     ListCell   *lc;
+    bool        has_async_child = false;
 
     /* check for unsupported flags */
     Assert(!(eflags & EXEC_FLAG_MARK));
@@ -134,14 +136,22 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     nplans = list_length(node->appendplans);
     appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
 
+    asyncstates =
+        (AppendAsyncState *) palloc0(nplans * sizeof(AppendAsyncState));
+    for (i = 0; i < nplans; i++)
+        asyncstates[i] = ASYNCCHILD_READY;
+
     /*
      * create new AppendState for our append node
      */
     appendstate->ps.plan = (Plan *) node;
     appendstate->ps.state = estate;
+    appendstate->ps.fd = -1;
     appendstate->appendplans = appendplanstates;
+    appendstate->async_state = asyncstates;
     appendstate->as_nplans = nplans;
+    appendstate->evset = CreateWaitEventSet(CurrentMemoryContext,
+                                            list_length(node->appendplans));
 
     /*
      * Miscellaneous initialization
@@ -165,9 +175,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     {
         Plan       *initNode = (Plan *) lfirst(lc);
 
+        /* always request async execution for children */
+        eflags |= EXEC_FLAG_ASYNC;
         appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
+
+        /*
+         * A child that can scan asynchronously sets a file descriptor for
+         * polling on it during initialization.
+         */
+        if (appendplanstates[i]->fd >= 0)
+        {
+            AddWaitEventToSet(appendstate->evset, WL_SOCKET_READABLE,
+                              appendplanstates[i]->fd, NULL,
+                              (void *) i);
+            has_async_child = true;
+        }
         i++;
     }
 
+    if (!has_async_child)
+    {
+        FreeWaitEventSet(appendstate->evset);
+        appendstate->evset = NULL;
+    }
+
     /*
      * initialize output tuple type
@@ -193,45 +222,86 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-    for (;;)
+    int            n_notready = 1;
+
+    while (n_notready > 0)
     {
-        PlanState  *subnode;
         TupleTableSlot *result;
+        PlanState  *subnode;
+        int            i, n;
 
-        /*
-         * figure out which subplan we are currently processing
-         */
-        subnode = node->appendplans[node->as_whichplan];
+        /* Scan the children in a round-robin policy. */
+        n_notready = 0;
+        n = node->as_whichplan;
+        for (i = 0; i < node->as_nplans; i++, n++)
+        {
+            if (n >= node->as_nplans)
+                n = 0;
 
-        /*
-         * get a tuple from the subplan
-         */
-        result = ExecProcNode(subnode);
+            if (node->async_state[n] != ASYNCCHILD_READY)
+            {
+                if (node->async_state[n] == ASYNCCHILD_NOT_READY)
+                    n_notready++;
+                continue;
+            }
+
+            subnode = node->appendplans[n];
+
+            result = ExecProcNode(subnode);
 
-        if (!TupIsNull(result))
-        {
             /*
              * If the subplan gave us something then return it as-is. We do
              * NOT make use of the result slot that was set up in
              * ExecInitAppend; there's no need for it.
              */
-            return result;
+            switch (subnode->exec_status)
+            {
+            case EXEC_READY:
+                node->as_whichplan = n;
+                return result;
+
+            case EXEC_NOT_READY:
+                node->async_state[n] = ASYNCCHILD_NOT_READY;
+                n_notready++;
+                break;
+
+            case EXEC_EOT:
+                node->async_state[n] = ASYNCCHILD_DONE;
+                break;
+
+            default:
+                elog(ERROR, "unknown node status: %d", subnode->exec_status);
+            }
         }
 
         /*
-         * Go on to the "next" subplan in the appropriate direction. If no
-         * more subplans, return the empty slot set up for us by
-         * ExecInitAppend.
+         * If any "not ready" children remain after no child could return a
+         * tuple, wait for one of them to become ready.
          */
-        if (ScanDirectionIsForward(node->ps.state->es_direction))
-            node->as_whichplan++;
-        else
-            node->as_whichplan--;
-        if (!exec_append_initialize_next(node))
-            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-        /* Else loop back and try to get a tuple from the new subplan */
+        if (n_notready > 0)
+        {
+            WaitEvent    occurred_events[5];
+            int            nevents;
+            int            i;
+
+            nevents = WaitEventSetWait(node->evset, -1, occurred_events, 5);
+            Assert(nevents > 0);
+            for (i = 0; i < nevents; i++)
+            {
+                int            plannum = (int) occurred_events[i].user_data;
+
+                node->async_state[plannum] = ASYNCCHILD_READY;
+            }
+            node->as_whichplan = (int) occurred_events[0].user_data;
+            continue;
+        }
+    }
+
+    /* All children exhausted. Free the wait event set if it exists. */
+    if (node->evset)
+    {
+        FreeWaitEventSet(node->evset);
+        node->evset = NULL;
     }
+
+    return NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -271,6 +341,7 @@ ExecReScanAppend(AppendState *node)
     {
         PlanState  *subnode = node->appendplans[i];
 
+        node->async_state[i] = ASYNCCHILD_READY;
         /*
         * ExecReScan doesn't know about my subplans, so I have to do
         * changed-parameter signaling myself.
diff --git a/src/backend/executor/nodeBitmapAnd.c b/src/backend/executor/nodeBitmapAnd.c
index c39d790..3942285 100644
--- a/src/backend/executor/nodeBitmapAnd.c
+++ b/src/backend/executor/nodeBitmapAnd.c
@@ -63,6 +63,7 @@ ExecInitBitmapAnd(BitmapAnd *node, EState *estate, int eflags)
      */
     bitmapandstate->ps.plan = (Plan *) node;
     bitmapandstate->ps.state = estate;
 
+    bitmapandstate->ps.fd = -1;
     bitmapandstate->bitmapplans = bitmapplanstates;
     bitmapandstate->nplans = nplans;
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 449aacb..cc89d56 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -556,6 +556,7 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
     scanstate = makeNode(BitmapHeapScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
     scanstate->tbm = NULL;
     scanstate->tbmiterator = NULL;
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index a364098..d799292 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -206,6 +206,7 @@ ExecInitBitmapIndexScan(BitmapIndexScan *node, EState *estate, int eflags)
     indexstate = makeNode(BitmapIndexScanState);
     indexstate->ss.ps.plan = (Plan *) node;
     indexstate->ss.ps.state = estate;
 
+    indexstate->ss.ps.fd = -1;
     /* normally we don't make the result bitmap till runtime */
     indexstate->biss_result = NULL;
 
 
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 7e928eb..5f06ce9 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -64,6 +64,7 @@ ExecInitBitmapOr(BitmapOr *node, EState *estate, int eflags)
      */
     bitmaporstate->ps.plan = (Plan *) node;
     bitmaporstate->ps.state = estate;
 
+    bitmaporstate->ps.fd = -1;
     bitmaporstate->bitmapplans = bitmapplanstates;
     bitmaporstate->nplans = nplans;
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index 3c2f684..6f09853 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -191,6 +191,7 @@ ExecInitCteScan(CteScan *node, EState *estate, int eflags)
     scanstate = makeNode(CteScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
     scanstate->eflags = eflags;
     scanstate->cte_table = NULL;
     scanstate->eof_cte = false;
diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c
index 322abca..e825001 100644
--- a/src/backend/executor/nodeCustom.c
+++ b/src/backend/executor/nodeCustom.c
@@ -44,6 +44,7 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags)
     /* fill up fields of ScanState */
     css->ss.ps.plan = &cscan->scan.plan;
     css->ss.ps.state = estate;
 
+    css->ss.ps.fd = -1;
     /* create expression context for node */
     ExecAssignExprContext(estate, &css->ss.ps);
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 300f947..4079529 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -144,6 +144,7 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
     scanstate = makeNode(ForeignScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index a03f6e7..7d508da 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -299,6 +299,7 @@ ExecInitFunctionScan(FunctionScan *node, EState *estate, int eflags)
     scanstate = makeNode(FunctionScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
     scanstate->eflags = eflags;
 
     /*
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 3834ed6..60a1598 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,6 +46,88 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
+/* ----------------------------------------------------------------
+ *        StartGather
+ *
+ *        The Gather node can benefit from asynchronous execution in most
+ *        cases because of its high startup cost.
+ * ----------------------------------------------------------------
+ */
+bool
+ExecStartGather(GatherState *node)
+{
+    EState       *estate = node->ps.state;
+    Gather       *gather = (Gather *) node->ps.plan;
+    TupleTableSlot *fslot = node->funnel_slot;
+    int i;
+
+    /* Don't start if already started or explicitly inhibited by the upper */
+    if (node->initialized || !node->early_start)
+        return false;
+
+    /*
+     * Initialize the parallel context and workers on first execution rather
+     * than during node initialization, since doing so needs to allocate a
+     * large dynamic shared memory segment and is better done only when
+     * really needed.
+     */
+
+    /*
+     * Sometimes we might have to run without parallelism; but if
+     * parallel mode is active then we can try to fire up some workers.
+     */
+    if (gather->num_workers > 0 && IsInParallelMode())
+    {
+        ParallelContext *pcxt;
+        bool    got_any_worker = false;
+
+        /* Initialize the workers required to execute Gather node. */
+        if (!node->pei)
+            node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                             estate,
+                                             gather->num_workers);
+
+        /*
+         * Register backend workers. We might not get as many as we
+         * requested, or indeed any at all.
+         */
+        pcxt = node->pei->pcxt;
+        LaunchParallelWorkers(pcxt);
+
+        /* Set up tuple queue readers to read the results. */
+        if (pcxt->nworkers > 0)
+        {
+            node->nreaders = 0;
+            node->reader =
+                palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+            for (i = 0; i < pcxt->nworkers; ++i)
+            {
+                if (pcxt->worker[i].bgwhandle == NULL)
+                    continue;
+
+                shm_mq_set_handle(node->pei->tqueue[i],
+                                  pcxt->worker[i].bgwhandle);
+                node->reader[node->nreaders++] =
+                    CreateTupleQueueReader(node->pei->tqueue[i],
+                                           fslot->tts_tupleDescriptor);
+                got_any_worker = true;
+            }
+        }
+
+        /* No workers?  Then never mind. */
+        if (!got_any_worker)
+            ExecShutdownGatherWorkers(node);
+    }
+
+    /* Run plan locally if no workers or not single-copy. */
+    node->need_to_scan_locally = (node->reader == NULL)
+        || !gather->single_copy;
+
+    node->early_start = false;
+    node->initialized = true;
+    return false;
+}
+
 /* ----------------------------------------------------------------
  *        ExecInitGather
@@ -58,6 +140,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     Plan       *outerNode;
     bool        hasoid;
     TupleDesc    tupDesc;
 
+    int            child_eflags;
 
     /* Gather node doesn't have innerPlan node. */
     Assert(innerPlan(node) == NULL);
@@ -68,6 +151,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     gatherstate = makeNode(GatherState);
     gatherstate->ps.plan = (Plan *) node;
     gatherstate->ps.state = estate;
 
+    gatherstate->ps.fd = -1;
     gatherstate->need_to_scan_locally = !node->single_copy;
 
     /*
@@ -97,7 +181,12 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
      * now initialize outer plan
      */
     outerNode = outerPlan(node);
 
-    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+    /*
+     * This outer plan is executed in another process, so don't start it
+     * asynchronously in this process.
+     */
+    child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+    outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
     gatherstate->ps.ps_TupFromTlist = false;
@@ -115,6 +204,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
     ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+    /*
+     * Register an asynchronous execution callback for this node. Backend
+     * workers need to allocate a large dynamic shared memory segment, so
+     * from that point of view it is better to launch them at first
+     * execution. The decision to execute asynchronously should take that
+     * into account, but we ignore the issue for now.
+     */
+    if (eflags & EXEC_FLAG_ASYNC)
+        gatherstate->early_start = true;
+
     return gatherstate;
 }
@@ -128,74 +227,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
-    TupleTableSlot *fslot = node->funnel_slot;
-    int            i;
     TupleTableSlot *slot;
     TupleTableSlot *resultSlot;
     ExprDoneCond isDone;
     ExprContext *econtext;
-    /*
-     * Initialize the parallel context and workers on first execution. We do
-     * this on first execution rather than during node initialization, as it
-     * needs to allocate large dynamic segment, so it is better to do if it
-     * is really needed.
-     */
+    /* Initialize workers if not yet done. */
     if (!node->initialized)
-    {
-        EState       *estate = node->ps.state;
-        Gather       *gather = (Gather *) node->ps.plan;
-
-        /*
-         * Sometimes we might have to run without parallelism; but if
-         * parallel mode is active then we can try to fire up some workers.
-         */
-        if (gather->num_workers > 0 && IsInParallelMode())
-        {
-            ParallelContext *pcxt;
-
-            /* Initialize the workers required to execute Gather node. */
-            if (!node->pei)
-                node->pei = ExecInitParallelPlan(node->ps.lefttree,
-                                                 estate,
-                                                 gather->num_workers);
-
-            /*
-             * Register backend workers. We might not get as many as we
-             * requested, or indeed any at all.
-             */
-            pcxt = node->pei->pcxt;
-            LaunchParallelWorkers(pcxt);
-            node->nworkers_launched = pcxt->nworkers_launched;
-
-            /* Set up tuple queue readers to read the results. */
-            if (pcxt->nworkers_launched > 0)
-            {
-                node->nreaders = 0;
-                node->reader =
-                    palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
-
-                for (i = 0; i < pcxt->nworkers_launched; ++i)
-                {
-                    shm_mq_set_handle(node->pei->tqueue[i],
-                                      pcxt->worker[i].bgwhandle);
-                    node->reader[node->nreaders++] =
-                        CreateTupleQueueReader(node->pei->tqueue[i],
-                                               fslot->tts_tupleDescriptor);
-                }
-            }
-            else
-            {
-                /* No workers?  Then never mind. */
-                ExecShutdownGatherWorkers(node);
-            }
-        }
-
-        /* Run plan locally if no workers or not single-copy. */
-        node->need_to_scan_locally = (node->reader == NULL)
-            || !gather->single_copy;
-        node->initialized = true;
-    }
+        ExecStartGather(node);
 
     /*
      * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeGroup.c b/src/backend/executor/nodeGroup.c
index dcf5175..33093e7 100644
--- a/src/backend/executor/nodeGroup.c
+++ b/src/backend/executor/nodeGroup.c
@@ -207,6 +207,7 @@ ExecInitGroup(Group *node, EState *estate, int eflags)
     grpstate = makeNode(GroupState);
     grpstate->ss.ps.plan = (Plan *) node;
     grpstate->ss.ps.state = estate;
 
+    grpstate->ss.ps.fd = -1;
     grpstate->grp_done = FALSE;
 
     /*
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 9ed09a7..f62b556 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -172,6 +172,7 @@ ExecInitHash(Hash *node, EState *estate, int eflags)
     hashstate = makeNode(HashState);
     hashstate->ps.plan = (Plan *) node;
     hashstate->ps.state = estate;
 
+    hashstate->ps.fd = -1;
     hashstate->hashtable = NULL;
     hashstate->hashkeys = NIL;    /* will be set by parent HashJoin */
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 369e666..ec54570 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -451,6 +451,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
     hjstate = makeNode(HashJoinState);
     hjstate->js.ps.plan = (Plan *) node;
     hjstate->js.ps.state = estate;
 
+    hjstate->js.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 4f6f91c..94b0193 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -403,6 +403,7 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)
     indexstate = makeNode(IndexOnlyScanState);
     indexstate->ss.ps.plan = (Plan *) node;
     indexstate->ss.ps.state = estate;
 
+    indexstate->ss.ps.fd = -1;
     indexstate->ioss_HeapFetches = 0;
 
     /*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index bf16cb1..1beee6f 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -829,6 +829,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
     indexstate = makeNode(IndexScanState);
     indexstate->ss.ps.plan = (Plan *) node;
     indexstate->ss.ps.state = estate;
 
+    indexstate->ss.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index faf32e1..6baf1c0 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -384,6 +384,7 @@ ExecInitLimit(Limit *node, EState *estate, int eflags)
     limitstate = makeNode(LimitState);
     limitstate->ps.plan = (Plan *) node;
     limitstate->ps.state = estate;
 
+    limitstate->ps.fd = -1;
     limitstate->lstate = LIMIT_INITIAL;
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 4ebcaff..42b2ff5 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -361,6 +361,7 @@ ExecInitLockRows(LockRows *node, EState *estate, int eflags)
     lrstate = makeNode(LockRowsState);
     lrstate->ps.plan = (Plan *) node;
     lrstate->ps.state = estate;
 
+    lrstate->ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 9ab03f3..db8279a 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -171,6 +171,7 @@ ExecInitMaterial(Material *node, EState *estate, int eflags)
     matstate = makeNode(MaterialState);
     matstate->ss.ps.plan = (Plan *) node;
     matstate->ss.ps.state = estate;
 
+    matstate->ss.ps.fd = -1;
 
     /*
      * We must have a tuplestore buffering the subplan output to do backward
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..c5323d7 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -83,6 +83,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
      */
     mergestate->ps.plan = (Plan *) node;
     mergestate->ps.state = estate;
 
+    mergestate->ps.fd = -1;
     mergestate->mergeplans = mergeplanstates;
     mergestate->ms_nplans = nplans;
 
@@ -112,6 +113,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
     {
         Plan       *initNode = (Plan *) lfirst(lc);
 
+        /* always request async execution for now */
+        eflags = eflags | EXEC_FLAG_ASYNC;
+
         mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
         i++;
     }
diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 6db09b8..27ac84e 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -1485,6 +1485,7 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
     mergestate = makeNode(MergeJoinState);
     mergestate->js.ps.plan = (Plan *) node;
     mergestate->js.ps.state = estate;
 
+    mergestate->js.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index e62c8aa..78df2e4 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1561,6 +1561,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
     mtstate->ps.plan = (Plan *) node;
     mtstate->ps.state = estate;
     mtstate->ps.targetlist = NIL;        /* not actually used */
 
+    mtstate->ps.fd = -1;
     mtstate->operation = operation;
     mtstate->canSetTag = node->canSetTag;
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..c262d7f 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -309,6 +309,7 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
     nlstate = makeNode(NestLoopState);
     nlstate->js.ps.plan = (Plan *) node;
     nlstate->js.ps.state = estate;
 
+    nlstate->js.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
@@ -340,11 +341,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
      * inner child, because it will always be rescanned with fresh parameter
      * values.
      */
 
+    /*
+     * Async execution of the outer plan is beneficial if this join is
+     * requested as async.
+     */
     outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
 
     if (node->nestParams == NIL)
         eflags |= EXEC_FLAG_REWIND;
     else
         eflags &= ~EXEC_FLAG_REWIND;
 
+    /*
+     * Async execution of the inner plan is inhibited if it is parameterized
+     * by the outer.
+     */
+    if (list_length(node->nestParams) > 0)
+        eflags &= ~EXEC_FLAG_ASYNC;
+
     innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
 
     /*
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index e76405a..48a70cb 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -176,6 +176,7 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
     rustate = makeNode(RecursiveUnionState);
     rustate->ps.plan = (Plan *) node;
     rustate->ps.state = estate;
 
+    rustate->ps.fd = -1;
     rustate->eqfunctions = NULL;
     rustate->hashfunctions = NULL;
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 4007b76..027b64e 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -217,6 +217,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)
     resstate = makeNode(ResultState);
     resstate->ps.plan = (Plan *) node;
     resstate->ps.state = estate;
 
+    resstate->ps.fd = -1;
     resstate->rs_done = false;
     resstate->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
 
diff --git a/src/backend/executor/nodeSamplescan.c b/src/backend/executor/nodeSamplescan.c
index 9ce7c02..a670e77 100644
--- a/src/backend/executor/nodeSamplescan.c
+++ b/src/backend/executor/nodeSamplescan.c
@@ -152,6 +152,7 @@ ExecInitSampleScan(SampleScan *node, EState *estate, int eflags)
     scanstate = makeNode(SampleScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..86a3015 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,20 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
  *
  * ----------------------------------------------------------------
  */
 
+bool
+ExecStartSeqScan(SeqScanState *node)
+{
+    if (node->early_start)
+    {
+        ereport(LOG,
+                (errmsg("dummy_async_cb is called for %p@ExecStartSeqScan", node),
+                 errhidestmt(true)));
+        node->early_start = false;
+    }
+
+    return false;
+}
+/* ---------------------------------------------------------------- *        SeqNext *
@@ -177,6 +191,7 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
     scanstate = makeNode(SeqScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
@@ -214,6 +229,10 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
     ExecAssignResultTypeFromTL(&scanstate->ss.ps);
     ExecAssignScanProjectionInfo(&scanstate->ss);
 
+    /* Do early start when requested */
+    if (eflags & EXEC_FLAG_ASYNC)
+        scanstate->early_start = true;
+
     return scanstate;
 }
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 2d81d46..8eafd91 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -487,6 +487,7 @@ ExecInitSetOp(SetOp *node, EState *estate, int eflags)
     setopstate = makeNode(SetOpState);
     setopstate->ps.plan = (Plan *) node;
     setopstate->ps.state = estate;
 
+    setopstate->ps.fd = -1;
     setopstate->eqfunctions = NULL;
     setopstate->hashfunctions = NULL;
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a34dcc5..f28dc2d 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -162,6 +162,7 @@ ExecInitSort(Sort *node, EState *estate, int eflags)
     sortstate = makeNode(SortState);
     sortstate->ss.ps.plan = (Plan *) node;
     sortstate->ss.ps.state = estate;
 
+    sortstate->ss.ps.fd = -1;
 
     /*
      * We must have random access to the sort output to do backward scan or
diff --git a/src/backend/executor/nodeSubqueryscan.c b/src/backend/executor/nodeSubqueryscan.c
index 0304b15..c2b9bb0 100644
--- a/src/backend/executor/nodeSubqueryscan.c
+++ b/src/backend/executor/nodeSubqueryscan.c
@@ -117,6 +117,7 @@ ExecInitSubqueryScan(SubqueryScan *node, EState *estate, int eflags)
     subquerystate = makeNode(SubqueryScanState);
     subquerystate->ss.ps.plan = (Plan *) node;
     subquerystate->ss.ps.state = estate;
 
+    subquerystate->ss.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index 2604103..41d69c3 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -461,6 +461,7 @@ ExecInitTidScan(TidScan *node, EState *estate, int eflags)
     tidstate = makeNode(TidScanState);
     tidstate->ss.ps.plan = (Plan *) node;
     tidstate->ss.ps.state = estate;
 
+    tidstate->ss.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeUnique.c b/src/backend/executor/nodeUnique.c
index 4caae34..56c21e8 100644
--- a/src/backend/executor/nodeUnique.c
+++ b/src/backend/executor/nodeUnique.c
@@ -122,6 +122,7 @@ ExecInitUnique(Unique *node, EState *estate, int eflags)
     uniquestate = makeNode(UniqueState);
     uniquestate->ps.plan = (Plan *) node;
     uniquestate->ps.state = estate;
 
+    uniquestate->ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeValuesscan.c b/src/backend/executor/nodeValuesscan.c
index 2c4bd9c..2ec3ed7 100644
--- a/src/backend/executor/nodeValuesscan.c
+++ b/src/backend/executor/nodeValuesscan.c
@@ -205,6 +205,7 @@ ExecInitValuesScan(ValuesScan *node, EState *estate, int eflags)
     scanstate = makeNode(ValuesScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
 
     /*
      * Miscellaneous initialization
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index f06eebe..bc5b9ce 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1787,6 +1787,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
     winstate = makeNode(WindowAggState);
     winstate->ss.ps.plan = (Plan *) node;
     winstate->ss.ps.state = estate;
 
+    winstate->ss.ps.fd = -1;
 
     /*
      * Create expression contexts.  We need two, one for per-input-tuple
diff --git a/src/backend/executor/nodeWorktablescan.c b/src/backend/executor/nodeWorktablescan.c
index cfed6e6..230c849 100644
--- a/src/backend/executor/nodeWorktablescan.c
+++ b/src/backend/executor/nodeWorktablescan.c
@@ -144,6 +144,7 @@ ExecInitWorkTableScan(WorkTableScan *node, EState *estate, int eflags)
     scanstate = makeNode(WorkTableScanState);
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
 
+    scanstate->ss.ps.fd = -1;
     scanstate->rustate = NULL;    /* we'll set this later */
 
     /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 44fac27..de78d04 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
 #define EXEC_FLAG_WITH_OIDS        0x0020    /* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS    0x0040    /* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA    0x0080    /* rel scannability doesn't matter */
 
+#define EXEC_FLAG_ASYNC            0x0100    /* request asynchronous execution */
 
 /*
@@ -224,6 +225,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
 extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
+extern bool ExecStartNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index f76d9be..0a48a03 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -18,6 +18,7 @@
 extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecGather(GatherState *node);
 
+extern bool ExecStartGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
 extern void ExecReScanGather(GatherState *node);
 
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index f2e61ff..daf54ac 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -19,6 +19,7 @@
 extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
 
+extern bool ExecStartSeqScan(SeqScanState *node);
 extern void ExecEndSeqScan(SeqScanState *node);
 extern void ExecReScanSeqScan(SeqScanState *node);
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ee4e189..205a2c8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -20,6 +20,7 @@
 #include "lib/pairingheap.h"
 #include "nodes/params.h"
 #include "nodes/plannodes.h"
+#include "storage/latch.h"
 #include "utils/reltrigger.h"
 #include "utils/sortsupport.h"
 #include "utils/tuplestore.h"
@@ -345,6 +346,14 @@ typedef struct ResultRelInfo
     List       *ri_onConflictSetWhere;
 } ResultRelInfo;
+/* Enum for async awareness */
+typedef enum NodeStatus
+{
+    EXEC_NOT_READY,
+    EXEC_READY,
+    EXEC_EOT
+} NodeStatus;
+
 /* ----------------
  *      EState information
  *
@@ -1059,6 +1068,9 @@ typedef struct PlanState
     ProjectionInfo *ps_ProjInfo;    /* info for doing tuple projection */
     bool        ps_TupFromTlist;/* state flag for processing set-valued
                                  * functions in targetlist */
 
+    NodeStatus    exec_status;
+    int            fd;
 } PlanState;
 
 /* ----------------
@@ -1138,6 +1150,14 @@ typedef struct ModifyTableState
                                          * target */
 } ModifyTableState;
 
+
+typedef enum AppendAsyncState
+{
+    ASYNCCHILD_READY,
+    ASYNCCHILD_NOT_READY,
+    ASYNCCHILD_DONE
+} AppendAsyncState;
+
 /* ----------------
  *     AppendState information
  *
@@ -1149,8 +1169,10 @@ typedef struct AppendState
 {
     PlanState    ps;                /* its first field is NodeTag */
     PlanState **appendplans;    /* array of PlanStates for my inputs */
 
+    AppendAsyncState   *async_state;
     int            as_nplans;
     int            as_whichplan;
+    WaitEventSet *evset;
 } AppendState;
 
 /* ----------------
@@ -1259,6 +1281,7 @@ typedef struct SeqScanState
 {
     ScanState    ss;                /* its first field is NodeTag */
     Size        pscan_len;        /* size of parallel heap scan descriptor */
 
+    bool        early_start;
 } SeqScanState;
 
 /* ----------------
@@ -1952,6 +1975,7 @@ typedef struct UniqueState
 typedef struct GatherState
 {
     PlanState    ps;                /* its first field is NodeTag */
 
+    bool        early_start;
     bool        initialized;
     struct ParallelExecutorInfo *pei;
     int            nreaders;
