Thread: Append with naive multiplexing of FDWs

Append with naive multiplexing of FDWs

From: Thomas Munro
Hello,

A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding.  I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.

Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better.  It's a very localised feature that works only between Append
and its immediate children.  The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.
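To illustrate the readiness idea described above, here is a minimal sketch that is not taken from the patch. It uses plain POSIX poll() as a stand-in for PostgreSQL's WaitEventSet machinery, and the names wait_for_any_readable(), demo_second_pipe_ready() and MAX_CHILDREN are hypothetical. Given the sockets of several child scans, it reports the first one that has data, which is roughly the question Append would ask before deciding which child to pull from.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_CHILDREN 16

/*
 * Hypothetical helper (not from the patch): block until at least one of the
 * descriptors is readable, then return the index of the first ready one.
 * Returns -1 on error, timeout, or unsupported input size.
 */
int
wait_for_any_readable(const int *fds, size_t nfds, int timeout_ms)
{
    struct pollfd pfds[MAX_CHILDREN];
    size_t      i;

    assert(fds != NULL);
    if (nfds == 0 || nfds > MAX_CHILDREN)
        return -1;

    for (i = 0; i < nfds; i++)
    {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }

    if (poll(pfds, (nfds_t) nfds, timeout_ms) <= 0)
        return -1;

    for (i = 0; i < nfds; i++)
    {
        /* Treat hangup/error as "ready" so the caller can notice EOF. */
        if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR))
            return (int) i;
    }
    return -1;
}

/*
 * Demo: three pipes stand in for three remote connections.  Only the
 * second one has data, so it is the one reported as ready.
 */
int
demo_second_pipe_ready(void)
{
    int         pipes[3][2] = {{-1, -1}, {-1, -1}, {-1, -1}};
    int         read_ends[3];
    int         ready = -1;
    int         i;

    for (i = 0; i < 3; i++)
    {
        if (pipe(pipes[i]) != 0)
            goto cleanup;
        read_ends[i] = pipes[i][0];
    }
    if (write(pipes[1][1], "x", 1) == 1)
        ready = wait_for_any_readable(read_ends, 3, 1000);

cleanup:
    for (i = 0; i < 3; i++)
    {
        if (pipes[i][0] >= 0)
            close(pipes[i][0]);
        if (pipes[i][1] >= 0)
            close(pipes[i][1]);
    }
    return ready;
}
```

In a real FDW the descriptors would presumably come from the remote connections (for libpq, PQsocket()), and the wait would go through WaitEventSet rather than a direct poll() call.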

Here's a quick rebase of that old POC patch, along with a demo.  Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.

=== demo ===

create table t (a text, b text);

create or replace function slow_data(name text) returns setof t as
$$
begin
  perform pg_sleep(random());
  return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;

create view t1 as select * from slow_data('t1');
create view t2 as select * from slow_data('t2');
create view t3 as select * from slow_data('t3');

create extension postgres_fdw;
create server server1 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create server server2 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create server server3 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create user mapping for current_user server server1;
create user mapping for current_user server server2;
create user mapping for current_user server server3;
create foreign table ft1 (a text, b text) server server1 options
(table_name 't1');
create foreign table ft2 (a text, b text) server server2 options
(table_name 't2');
create foreign table ft3 (a text, b text) server server3 options
(table_name 't3');

-- create three remote shards
create table pt (a text, b text) partition by list (a);
alter table pt attach partition ft1 for values in ('ft1');
alter table pt attach partition ft2 for values in ('ft2');
alter table pt attach partition ft3 for values in ('ft3');

-- see that tuples come back in the order that they're ready

select * from pt where b like '42';

[1] https://www.postgresql.org/message-id/CAEepm%3D1CuAWfxDk%3D%3DjZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA%40mail.gmail.com

--
Thomas Munro
https://enterprisedb.com

Re: Append with naive multiplexing of FDWs

From: Bruce Momjian
On Wed, Sep  4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
> Hello,
> 
> A few years back[1] I experimented with a simple readiness API that
> would allow Append to start emitting tuples from whichever Foreign
> Scan has data available, when working with FDW-based sharding.  I used
> that primarily as a way to test Andres's new WaitEventSet stuff and my
> kqueue implementation of that, but I didn't pursue it seriously
> because I knew we wanted a more ambitious async executor rewrite and
> many people had ideas about that, with schedulers capable of jumping
> all over the tree etc.
> 
> Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> asked why we don't just do this naive thing until we have something
> better.  It's a very localised feature that works only between Append
> and its immediate children.  The patch makes it work for postgres_fdw,
> but it should work for any FDW that can get its hands on a socket.
> 
> Here's a quick rebase of that old POC patch, along with a demo.  Since
> 2016, Parallel Append landed, but I didn't have time to think about
> how to integrate with that so I did a quick "sledgehammer" rebase that
> disables itself if parallelism is in the picture.

Yes, sharding has been waiting on parallel FDW scans.  Would this work
for parallel partition scans if the partitions were FDWs?

-- 
  Bruce Momjian  <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

From: Thomas Munro
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
> On Wed, Sep  4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
> > A few years back[1] I experimented with a simple readiness API that
> > would allow Append to start emitting tuples from whichever Foreign
> > Scan has data available, when working with FDW-based sharding.  I used
> > that primarily as a way to test Andres's new WaitEventSet stuff and my
> > kqueue implementation of that, but I didn't pursue it seriously
> > because I knew we wanted a more ambitious async executor rewrite and
> > many people had ideas about that, with schedulers capable of jumping
> > all over the tree etc.
> >
> > Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> > asked why we don't just do this naive thing until we have something
> > better.  It's a very localised feature that works only between Append
> > and its immediate children.  The patch makes it work for postgres_fdw,
> > but it should work for any FDW that can get its hands on a socket.
> >
> > Here's a quick rebase of that old POC patch, along with a demo.  Since
> > 2016, Parallel Append landed, but I didn't have time to think about
> > how to integrate with that so I did a quick "sledgehammer" rebase that
> > disables itself if parallelism is in the picture.
>
> Yes, sharding has been waiting on parallel FDW scans.  Would this work
> for parallel partition scans if the partitions were FDWs?

Yeah, this works for partitions that are FDWs (as shown), but only for
Append, not for Parallel Append.  So you'd have parallelism in the
sense that your N remote shard servers are all doing stuff at the same
time, but it couldn't be in a parallel query on your 'home' server,
which is probably good for things that push down aggregation and bring
back just a few tuples from each shard, but bad for anything wanting
to ship back millions of tuples to chew on locally.  Do you think
that'd be useful enough on its own?

The problem is that parallel safe non-partial plans (like postgres_fdw
scans) are exclusively 'claimed' by one process under Parallel Append,
so with the patch as posted, if you modify it to allow parallelism
then it'll probably give correct answers but nothing prevents a single
process from claiming and starting all the scans and then waiting for
them to be ready, while the other processes miss out on doing any work
at all.  There's probably some kludgy solution involving not letting
any one worker start more than X, and some space cadet solution
involving passing sockets around and teaching libpq to hand over
connections at certain controlled phases of the protocol (due to lack
of threads), but nothing like that has jumped out as the right path so
far.
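The "not letting any one worker start more than X" idea mentioned above could look roughly like the following sketch. It is a single-process illustration under stated assumptions: ScanClaims, claim_next_scan() and finish_scan() are hypothetical names, and in a real Parallel Append this bookkeeping would have to live in shared memory and be protected by a lock.

```c
#include <assert.h>
#include <stddef.h>

#define SCAN_UNCLAIMED (-1)
#define SCAN_FINISHED  (-2)

/* Hypothetical bookkeeping: which worker owns each non-partial scan. */
typedef struct ScanClaims
{
    int        *owner;          /* worker id, SCAN_UNCLAIMED or SCAN_FINISHED */
    size_t      nscans;
    int         max_per_worker; /* the "X" cap on in-progress scans */
} ScanClaims;

/*
 * Claim the next unclaimed scan for 'worker', unless that worker already
 * has max_per_worker scans in progress.  Returns the scan index or -1.
 */
int
claim_next_scan(ScanClaims *claims, int worker)
{
    size_t      i;
    int         in_progress = 0;

    assert(worker >= 0);

    for (i = 0; i < claims->nscans; i++)
    {
        if (claims->owner[i] == worker)
            in_progress++;
    }
    if (in_progress >= claims->max_per_worker)
        return -1;

    for (i = 0; i < claims->nscans; i++)
    {
        if (claims->owner[i] == SCAN_UNCLAIMED)
        {
            claims->owner[i] = worker;
            return (int) i;
        }
    }
    return -1;
}

/* Mark a scan as exhausted so it no longer counts against its owner. */
void
finish_scan(ScanClaims *claims, size_t scan)
{
    assert(scan < claims->nscans);
    claims->owner[scan] = SCAN_FINISHED;
}
```

With a cap of 2 and four scans, the first worker can start two scans and must then leave the rest for other workers until one of its own scans finishes.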

One idea that seems promising but requires a bunch more infrastructure
is to offload the libpq multiplexing to a background worker that owns
all the sockets, and have it push tuples into a multi-consumer shared
memory queue that regular executor processes could read from.  I have
been wondering if that would be best done by each FDW implementation,
or if there is a way to make a generic infrastructure for converting
parallel-safe executor nodes into partial plans by the use of a
'Scatter' (opposite of Gather) node that can spread the output of any
node over many workers.
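As a rough illustration of the multi-consumer queue idea above, here is a sketch using C11 atomics. Everything in it is an assumption made for the example: TupleQueue and its functions are hypothetical, the "tuples" are plain ints, there is exactly one producer, and the buffer is fill-once rather than a ring, which sidesteps slot reuse. A real implementation would live in shared memory and need wraparound, blocking and wakeups.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAPACITY 8

/* Hypothetical fill-once queue: one producer, any number of consumers. */
typedef struct TupleQueue
{
    int           items[QUEUE_CAPACITY];
    atomic_size_t published;    /* items written by the producer */
    atomic_size_t claimed;      /* items handed out to consumers */
} TupleQueue;

void
queue_init(TupleQueue *q)
{
    assert(q != NULL);
    atomic_init(&q->published, 0);
    atomic_init(&q->claimed, 0);
}

/* Single producer: write the item, then publish it with release ordering. */
bool
queue_push(TupleQueue *q, int item)
{
    size_t      n = atomic_load_explicit(&q->published, memory_order_relaxed);

    if (n >= QUEUE_CAPACITY)
        return false;
    q->items[n] = item;
    atomic_store_explicit(&q->published, n + 1, memory_order_release);
    return true;
}

/* Any consumer: claim the next published slot with a compare-and-swap. */
bool
queue_pop(TupleQueue *q, int *item)
{
    size_t      c = atomic_load_explicit(&q->claimed, memory_order_relaxed);

    for (;;)
    {
        size_t      p = atomic_load_explicit(&q->published,
                                             memory_order_acquire);

        if (c >= p)
            return false;       /* nothing published beyond what is claimed */
        if (atomic_compare_exchange_weak_explicit(&q->claimed, &c, c + 1,
                                                  memory_order_acq_rel,
                                                  memory_order_relaxed))
        {
            *item = q->items[c];
            return true;
        }
        /* The failed CAS reloaded c with the current value; retry. */
    }
}
```

Each item is delivered to exactly one consumer, which is the property a 'Scatter'-style node would need when spreading one node's output over many workers.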

If you had that, you'd still want a way for Parallel Append to be
readiness-based, but it would probably look a bit different to this
patch because it'd need to use (vapourware) multiconsumer shm queue
readiness, not fd readiness.  And another kind of fd-readiness
multiplexing would be going on inside the new (vapourware) worker that
handles all the libpq connections (and maybe other kinds of work for
other FDWs that are able to expose a socket).



Re: Append with naive multiplexing of FDWs

From: Bruce Momjian
On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
> On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
> > On Wed, Sep  4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
> > > A few years back[1] I experimented with a simple readiness API that
> > > would allow Append to start emitting tuples from whichever Foreign
> > > Scan has data available, when working with FDW-based sharding.  I used
> > > that primarily as a way to test Andres's new WaitEventSet stuff and my
> > > kqueue implementation of that, but I didn't pursue it seriously
> > > because I knew we wanted a more ambitious async executor rewrite and
> > > many people had ideas about that, with schedulers capable of jumping
> > > all over the tree etc.
> > >
> > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> > > asked why we don't just do this naive thing until we have something
> > > better.  It's a very localised feature that works only between Append
> > > and its immediate children.  The patch makes it work for postgres_fdw,
> > > but it should work for any FDW that can get its hands on a socket.
> > >
> > > Here's a quick rebase of that old POC patch, along with a demo.  Since
> > > 2016, Parallel Append landed, but I didn't have time to think about
> > > how to integrate with that so I did a quick "sledgehammer" rebase that
> > > disables itself if parallelism is in the picture.
> >
> > Yes, sharding has been waiting on parallel FDW scans.  Would this work
> > for parallel partition scans if the partitions were FDWs?
> 
> Yeah, this works for partitions that are FDWs (as shown), but only for
> Append, not for Parallel Append.  So you'd have parallelism in the
> sense that your N remote shard servers are all doing stuff at the same
> time, but it couldn't be in a parallel query on your 'home' server,
> which is probably good for things that push down aggregation and bring
> back just a few tuples from each shard, but bad for anything wanting
> to ship back millions of tuples to chew on locally.  Do you think
> that'd be useful enough on its own?

Yes, I think so.  There are many data warehouse queries that want to
return only aggregate values, or filter for a small number of rows. 
Even OLTP queries might return only a few rows from multiple partitions.
This would allow for a proof-of-concept implementation so we can see how
realistic this approach is.

> The problem is that parallel safe non-partial plans (like postgres_fdw
> scans) are exclusively 'claimed' by one process under Parallel Append,
> so with the patch as posted, if you modify it to allow parallelism
> then it'll probably give correct answers but nothing prevents a single
> process from claiming and starting all the scans and then waiting for
> them to be ready, while the other processes miss out on doing any work
> at all.  There's probably some kludgy solution involving not letting
> any one worker start more than X, and some space cadet solution
> involving passing sockets around and teaching libpq to hand over
> connections at certain controlled phases of the protocol (due to lack
> of threads), but nothing like that has jumped out as the right path so
> far.

I am unclear how many queries can do any meaningful work until all
shards have given their full results.

-- 
  Bruce Momjian  <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

From: Kyotaro Horiguchi
Hello.

At Sat, 30 Nov 2019 14:26:11 -0500, Bruce Momjian <bruce@momjian.us> wrote in 
> On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
> > On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
> > > On Wed, Sep  4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
> > > > A few years back[1] I experimented with a simple readiness API that
> > > > would allow Append to start emitting tuples from whichever Foreign
> > > > Scan has data available, when working with FDW-based sharding.  I used
> > > > that primarily as a way to test Andres's new WaitEventSet stuff and my
> > > > kqueue implementation of that, but I didn't pursue it seriously
> > > > because I knew we wanted a more ambitious async executor rewrite and
> > > > many people had ideas about that, with schedulers capable of jumping
> > > > all over the tree etc.
> > > >
> > > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> > > > asked why we don't just do this naive thing until we have something
> > > > better.  It's a very localised feature that works only between Append
> > > > and its immediate children.  The patch makes it work for postgres_fdw,
> > > > but it should work for any FDW that can get its hands on a socket.
> > > >
> > > > Here's a quick rebase of that old POC patch, along with a demo.  Since
> > > > 2016, Parallel Append landed, but I didn't have time to think about
> > > > how to integrate with that so I did a quick "sledgehammer" rebase that
> > > > disables itself if parallelism is in the picture.
> > >
> > > Yes, sharding has been waiting on parallel FDW scans.  Would this work
> > > for parallel partition scans if the partitions were FDWs?
> > 
> > Yeah, this works for partitions that are FDWs (as shown), but only for
> > Append, not for Parallel Append.  So you'd have parallelism in the
> > sense that your N remote shard servers are all doing stuff at the same
> > time, but it couldn't be in a parallel query on your 'home' server,
> > which is probably good for things that push down aggregation and bring
> > back just a few tuples from each shard, but bad for anything wanting
> > to ship back millions of tuples to chew on locally.  Do you think
> > that'd be useful enough on its own?
> 
> Yes, I think so.  There are many data warehouse queries that want to
> return only aggregate values, or filter for a small number of rows. 
> Even OLTP queries might return only a few rows from multiple partitions.
> This would allow for a proof-of-concept implementation so we can see how
> realistic this approach is.
> 
> > The problem is that parallel safe non-partial plans (like postgres_fdw
> > scans) are exclusively 'claimed' by one process under Parallel Append,
> > so with the patch as posted, if you modify it to allow parallelism
> > then it'll probably give correct answers but nothing prevents a single
> > process from claiming and starting all the scans and then waiting for
> > them to be ready, while the other processes miss out on doing any work
> > at all.  There's probably some kludgy solution involving not letting
> > any one worker start more than X, and some space cadet solution
> > involving passing sockets around and teaching libpq to hand over
> > connections at certain controlled phases of the protocol (due to lack
> > of threads), but nothing like that has jumped out as the right path so
> > far.
> 
> I am unclear how many queries can do any meaningful work until all
> shards have given their full results.

There's my pending (somewhat stale) patch, which allows local scans to
run while waiting for remote servers.

https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp

I (or we) wanted to introduce the asynchronous node mechanism as the
basis of an async-capable postgres_fdw. It has stalled because I have
been waiting for the executor change that makes the executor push-up
style, on which the async-node mechanism would be built. If that
doesn't happen soon, I'd like to continue that work.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Append with naive multiplexing of FDWs

From: Thomas Munro
On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
> There's my pending (somewhat stale) patch, which allows local scans to
> run while waiting for remote servers.
>
> https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp
>
> I (or we) wanted to introduce the asynchronous node mechanism as the
> basis of an async-capable postgres_fdw. It has stalled because I have
> been waiting for the executor change that makes the executor push-up
> style, on which the async-node mechanism would be built. If that
> doesn't happen soon, I'd like to continue that work.

After rereading some threads to remind myself what happened here...
right, my little patch began life in March 2016[1] when I wanted a
test case to test Andres's work on WaitEventSets, and your patch set
started a couple of months later and is vastly more ambitious[2][3].
It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF
model.  And I totally agree that there are lots of reasons to want to
do that (including yielding to other parts of the plan instead of
waiting for I/O, locks and some parallelism primitives enabling new
kinds of parallelism), and I'm hoping to help with some small pieces
of that if I can.

My patch set (rebased upthread) was extremely primitive, with no new
planner concepts, and added only a very simple new executor node
method: ExecReady().  Append used that to try to ask its children if
they'd like some time to warm up.  By default, ExecReady() says "I
don't know what you're talking about, go away", but FDWs can provide
an implementation that says "yes, please call me again when this fd is
ready" or "yes, I am ready, please call ExecProcNode() now".  It doesn't
deal with anything more complicated than that, and in particular it
doesn't work if there are extra planner nodes in between Append and
the foreign scan.  (It also doesn't mix particularly well with
parallelism, as mentioned.)
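The three possible answers described above can be sketched as a small protocol. This is an illustration only, not the patch's code: ReadyStatus, ChildNode, choose_child() and the two example callbacks are hypothetical names, and the policy of falling back to a child without a readiness method (which would then be run synchronously) is an assumption.

```c
#include <assert.h>
#include <stddef.h>

/* The three answers a child can give when asked whether it is ready. */
typedef enum ReadyStatus
{
    READY_UNSUPPORTED,          /* "I don't know what you're talking about" */
    READY_WAIT_FD,              /* "call me again when this fd is ready" */
    READY_NOW                   /* "I am ready, fetch a tuple now" */
} ReadyStatus;

typedef struct ChildNode ChildNode;
typedef ReadyStatus (*ReadyFn) (ChildNode *self, int *wait_fd);

struct ChildNode
{
    ReadyFn     ready;          /* NULL when the node has no readiness method */
    int         fd;             /* illustrative: socket the child waits on */
};

/* Example callback: a foreign scan still waiting for its remote server. */
ReadyStatus
example_waiting(ChildNode *self, int *wait_fd)
{
    *wait_fd = self->fd;
    return READY_WAIT_FD;
}

/* Example callback: a foreign scan with tuples already buffered. */
ReadyStatus
example_has_data(ChildNode *self, int *wait_fd)
{
    (void) self;
    (void) wait_fd;
    return READY_NOW;
}

/*
 * Ask every child once.  Returns the index of the first child that is ready
 * now; failing that, the first child without a readiness method; failing
 * that, -1.  The fds of waiting children are collected in wait_fds, which
 * must have room for nchildren entries.
 */
int
choose_child(ChildNode *children, size_t nchildren,
             int *wait_fds, size_t *nwait)
{
    int         first_ready = -1;
    int         first_unsupported = -1;
    size_t      i;

    assert(children != NULL && wait_fds != NULL && nwait != NULL);
    *nwait = 0;

    for (i = 0; i < nchildren; i++)
    {
        int         fd = -1;
        ReadyStatus st = READY_UNSUPPORTED;

        if (children[i].ready != NULL)
            st = children[i].ready(&children[i], &fd);

        switch (st)
        {
            case READY_NOW:
                if (first_ready < 0)
                    first_ready = (int) i;
                break;
            case READY_WAIT_FD:
                wait_fds[(*nwait)++] = fd;
                break;
            case READY_UNSUPPORTED:
                if (first_unsupported < 0)
                    first_unsupported = (int) i;
                break;
        }
    }

    if (first_ready >= 0)
        return first_ready;
    return first_unsupported;
}
```

When choose_child() returns -1, the caller would wait on the collected descriptors and ask again once one of them becomes readable.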

The reason I reposted this unambitious work is because Stephen keeps
asking me why we don't consider the stupidly simple thing that would
help with simple foreign partition-based queries today, instead of
waiting for someone to redesign the entire executor, because that's
... really hard.

[1] https://www.postgresql.org/message-id/CAEepm%3D1CuAWfxDk%3D%3DjZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA%40mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CA%2BTgmobx8su_bYtAa3DgrqB%2BR7xZG6kHRj0ccMUUshKAQVftww%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com



Re: Append with naive multiplexing of FDWs

From: Bruce Momjian
On Thu, Dec  5, 2019 at 05:45:24PM +1300, Thomas Munro wrote:
> My patch set (rebased upthread) was extremely primitive, with no new
> planner concepts, and added only a very simple new executor node
> method: ExecReady().  Append used that to try to ask its children if
> they'd like some time to warm up.  By default, ExecReady() says "I
> don't know what you're talking about, go away", but FDWs can provide
> an implementation that says "yes, please call me again when this fd is
> ready" or "yes, I am ready, please call ExecProcNode() now".  It doesn't
> deal with anything more complicated than that, and in particular it
> doesn't work if there are extra planner nodes in between Append and
> the foreign scan.  (It also doesn't mix particularly well with
> parallelism, as mentioned.)
> 
> The reason I reposted this unambitious work is because Stephen keeps
> asking me why we don't consider the stupidly simple thing that would
> help with simple foreign partition-based queries today, instead of
> waiting for someone to redesign the entire executor, because that's
> ... really hard.

I agree with Stephen's request.  We have been waiting for the executor
rewrite for a while, so let's just do something simple and see how it
performs.

-- 
  Bruce Momjian  <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

From: Robert Haas
On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
> I agree with Stephen's request.  We have been waiting for the executor
> rewrite for a while, so let's just do something simple and see how it
> performs.

I'm sympathetic to the frustration here, and I think it would be great
if we could find a way forward that doesn't involve waiting for a full
rewrite of the executor.  However, I seem to remember that when we
tested the various patches that various people had written for this
feature (I wrote one, too) they all had a noticeable performance
penalty in the case of a plain old Append that involved no FDWs and
nothing asynchronous. I don't think it's OK to have, say, a 2%
regression on every query that involves an Append, because especially
now that we have partitioning, that's a lot of queries.

I don't know whether this patch has that kind of problem. If it
doesn't, I would consider that a promising sign.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Append with naive multiplexing of FDWs

From: Thomas Munro
On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <robertmhaas@gmail.com> wrote:
>
> On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
> > I agree with Stephen's request.  We have been waiting for the executor
> > rewrite for a while, so let's just do something simple and see how it
> > performs.
>
> I'm sympathetic to the frustration here, and I think it would be great
> if we could find a way forward that doesn't involve waiting for a full
> rewrite of the executor.  However, I seem to remember that when we
> tested the various patches that various people had written for this
> feature (I wrote one, too) they all had a noticeable performance
> penalty in the case of a plain old Append that involved no FDWs and
> nothing asynchronous. I don't think it's OK to have, say, a 2%
> regression on every query that involves an Append, because especially
> now that we have partitioning, that's a lot of queries.
>
> I don't know whether this patch has that kind of problem. If it
> doesn't, I would consider that a promising sign.

I'll look into that.  If there is a measurable impact, I suspect it
can be avoided by, for example, installing a different ExecProcNode
function.
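A sketch of what "installing a different ExecProcNode function" could mean in practice: choose the execution routine once at initialization, so that a plain Append never runs through the async checks. The struct and function names below (AppendSketch, init_append(), exec_append_sync(), exec_append_async()) are hypothetical; only the general technique of a per-node function pointer mirrors how PostgreSQL's PlanState dispatches through its ExecProcNode member.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct AppendSketch AppendSketch;
typedef int (*ExecFn) (AppendSketch *node);

struct AppendSketch
{
    ExecFn      exec;           /* chosen once at init time */
    int         sync_calls;     /* counters, only to make the demo visible */
    int         async_calls;
};

/* Plain path: no readiness checks at all. */
static int
exec_append_sync(AppendSketch *node)
{
    node->sync_calls++;
    return 0;
}

/* Async path: this is where readiness polling would happen. */
static int
exec_append_async(AppendSketch *node)
{
    node->async_calls++;
    return 1;
}

/* Pick the routine once, based on whether any child is async-capable. */
void
init_append(AppendSketch *node, bool has_async_children)
{
    assert(node != NULL);
    node->sync_calls = 0;
    node->async_calls = 0;
    node->exec = has_async_children ? exec_append_async : exec_append_sync;
}
```

Because the branch is taken at initialization rather than per tuple, the per-tuple cost for an Append with no async-capable children stays the same as before, which is the regression concern raised above.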



Re: Append with naive multiplexing of FDWs

From: Kyotaro Horiguchi
At Fri, 6 Dec 2019 10:03:44 +1300, Thomas Munro <thomas.munro@gmail.com> wrote in 
> On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <robertmhaas@gmail.com> wrote:
> > I don't know whether this patch has that kind of problem. If it
> > doesn't, I would consider that a promising sign.
> 
> I'll look into that.  If there is a measurable impact, I suspect it
> can be avoided by, for example, installing a different ExecProcNode
> function.

Replacing ExecProcNode perfectly isolates the additional processing in
ExecAppendAsync. Thus, for purely local Appends, the patch can affect
performance only through the planner and executor initialization. I
don't believe that cost can be large enough to be observable in a
large scan.

As shown in the mail I pointed to upthread, the patch accelerates all
remote cases when fetch_size is >= 200. The problem was that local
scans seemed to be slightly slowed down. I dusted off the old patch
(FWIW I attached it) and will re-run the tests on the current
development environment (and re-check the code).

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 9f5dc3720ddade94cd66713f4aa79da575b09e31 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v1 1/3] Allow wait event set to be registered to resource
 owner

In certain cases a WaitEventSet needs to be released through a
resource owner. This change adds resource owner support for
WaitEventSets and allows the creator of a WaitEventSet to specify one.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 67 +++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 +++
 6 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index cd517e8bb4..3912b8b3a0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
                 (errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-    FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+    FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
     AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
                       NULL, NULL);
     AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..dc04ee5f6f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -52,6 +52,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -78,6 +79,8 @@ struct WaitEventSet
     int            nevents;        /* number of registered events */
     int            nevents_space;    /* maximum number of events in this set */
 
+    ResourceOwner    resowner;    /* Resource owner */
+
     /*
      * Array, of nevents_space length, storing the definition of events this
      * set is waiting for.
@@ -372,7 +375,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
     int            ret = 0;
     int            rc;
     WaitEvent    event;
-    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
     if (wakeEvents & WL_TIMEOUT)
         Assert(timeout >= 0);
@@ -539,12 +542,15 @@ ResetLatch(Latch *latch)
  * WaitEventSetWait().
  */
 WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
 {
     WaitEventSet *set;
     char       *data;
     Size        sz = 0;
 
+    if (res)
+        ResourceOwnerEnlargeWESs(res);
+
     /*
      * Use MAXALIGN size/alignment to guarantee that later uses of memory are
      * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -614,6 +620,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
     StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+    /* Register this wait event set if requested */
+    set->resowner = res;
+    if (res)
+        ResourceOwnerRememberWES(set->resowner, set);
+
     return set;
 }
 
@@ -655,6 +666,9 @@ FreeWaitEventSet(WaitEventSet *set)
     }
 #endif
 
+    if (set->resowner != NULL)
+        ResourceOwnerForgetWES(set->resowner, set);
+
     pfree(set);
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index e08507f0cc..5e88c48a1c 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -70,7 +70,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
     {
         WaitEventSet *new_event_set;
 
-        new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+        new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
         AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
                           MyLatch, NULL);
         AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 7be11c48ab..829034516f 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -128,6 +128,7 @@ typedef struct ResourceOwnerData
     ResourceArray filearr;        /* open temporary files */
     ResourceArray dsmarr;        /* dynamic shmem segments */
     ResourceArray jitarr;        /* JIT contexts */
+    ResourceArray wesarr;        /* wait event sets */
 
     /* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
     int            nlocks;            /* number of owned locks */
@@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
     ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
     ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
     ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+    ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
     return owner;
 }
@@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
             jit_release_context(context);
         }
+
+        /* Ditto for wait event sets */
+        while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+        {
+            WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+            if (isCommit)
+                PrintWESLeakWarning(event);
+            FreeWaitEventSet(event);
+        }
     }
     else if (phase == RESOURCE_RELEASE_LOCKS)
     {
@@ -701,6 +714,7 @@ ResourceOwnerDelete(ResourceOwner owner)
     Assert(owner->filearr.nitems == 0);
     Assert(owner->dsmarr.nitems == 0);
     Assert(owner->jitarr.nitems == 0);
+    Assert(owner->wesarr.nitems == 0);
     Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
     /*
@@ -728,6 +742,7 @@ ResourceOwnerDelete(ResourceOwner owner)
     ResourceArrayFree(&(owner->filearr));
     ResourceArrayFree(&(owner->dsmarr));
     ResourceArrayFree(&(owner->jitarr));
+    ResourceArrayFree(&(owner->wesarr));
 
     pfree(owner);
 }
@@ -1346,3 +1361,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
         elog(ERROR, "JIT context %p is not owned by resource owner %s",
              DatumGetPointer(handle), owner->name);
 }
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+    ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+    ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+    /*
+     * XXXX: A wait event set has no property usable as an identifier, so
+     * use its pointer instead.
+     */
+    if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+        elog(ERROR, "wait event set %p is not owned by resource owner %s",
+             events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+    /*
+     * XXXX: A wait event set has no property usable as an identifier, so
+     * report its pointer instead.
+     */
+    elog(WARNING, "wait event set leak: %p still referenced",
+         events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..d136614587 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
 extern void SetLatch(Latch *latch);
 extern void ResetLatch(Latch *latch);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+                                        ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int    AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
                               Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index b8261ad866..9c9c845784 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
 extern void ResourceOwnerForgetJIT(ResourceOwner owner,
                                    Datum handle);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+                         WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+                       WaitEventSet *);
+
 #endif                            /* RESOWNER_PRIVATE_H */
-- 
2.23.0
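
The wait event set bookkeeping in the resowner.c hunks above follows a reserve, acquire, remember order, so that an out-of-memory failure can only happen before the resource exists. Here is a minimal standalone sketch of that ordering. Everything in it (ToyOwner, toy_enlarge, toy_remember, toy_forget, toy_create_tracked, toy_free_tracked) is a hypothetical stand-in built on malloc/realloc, not PostgreSQL's ResourceOwner API, and it returns NULL where the real code would raise an error.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for a ResourceOwner's wesarr; not the PostgreSQL type. */
typedef struct ToyOwner
{
    void      **items;
    size_t      nitems;
    size_t      capacity;
} ToyOwner;

/* Step 1: reserve a slot.  This is the only bookkeeping step that can fail. */
static bool
toy_enlarge(ToyOwner *owner)
{
    size_t      newcap;
    void      **p;

    if (owner->nitems < owner->capacity)
        return true;
    newcap = owner->capacity ? owner->capacity * 2 : 4;
    p = realloc(owner->items, newcap * sizeof(void *));
    if (p == NULL)
        return false;
    owner->items = p;
    owner->capacity = newcap;
    return true;
}

/* Step 3: record the resource.  Cannot fail, because a slot was reserved. */
static void
toy_remember(ToyOwner *owner, void *res)
{
    owner->items[owner->nitems++] = res;
}

/* Drop the record; returns false if the owner does not track 'res'. */
static bool
toy_forget(ToyOwner *owner, void *res)
{
    for (size_t i = owner->nitems; i-- > 0;)
    {
        if (owner->items[i] == res)
        {
            /* fill the hole with the last entry */
            owner->items[i] = owner->items[owner->nitems - 1];
            owner->nitems--;
            return true;
        }
    }
    return false;
}

/* Reserve first, then acquire (step 2), then remember. */
static void *
toy_create_tracked(ToyOwner *owner, size_t size)
{
    void       *res;

    if (!toy_enlarge(owner))
        return NULL;            /* nothing acquired yet, so nothing leaks */
    res = malloc(size);
    if (res != NULL)
        toy_remember(owner, res);
    return res;
}

/* Forget before releasing, mirroring the order used on the free path. */
static void
toy_free_tracked(ToyOwner *owner, void *res)
{
    if (toy_forget(owner, res))
        free(res);
}
```

If the reservation came after the acquisition, a failed array growth would leave a live resource that no owner knows about, which is the leak the comment on ResourceOwnerEnlargeWESs warns against.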

From 65fc69bb64f4596aff0c37a2d090ddf1609120bb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v1 2/3] infrastructure for asynchronous execution

This patch adds infrastructure for asynchronous execution. As a PoC,
it makes only Append capable of handling asynchronously executable
subnodes.
---
 src/backend/commands/explain.c          |  17 ++
 src/backend/executor/Makefile           |   1 +
 src/backend/executor/execAsync.c        | 145 ++++++++++++
 src/backend/executor/nodeAppend.c       | 286 +++++++++++++++++++++---
 src/backend/executor/nodeForeignscan.c  |  22 +-
 src/backend/nodes/bitmapset.c           |  72 ++++++
 src/backend/nodes/copyfuncs.c           |   2 +
 src/backend/nodes/outfuncs.c            |   2 +
 src/backend/nodes/readfuncs.c           |   2 +
 src/backend/optimizer/plan/createplan.c |  83 +++++++
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/postmaster/syslogger.c      |   2 +-
 src/backend/utils/adt/ruleutils.c       |   8 +-
 src/include/executor/execAsync.h        |  23 ++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeForeignscan.h  |   3 +
 src/include/foreign/fdwapi.h            |  11 +
 src/include/nodes/bitmapset.h           |   1 +
 src/include/nodes/execnodes.h           |  20 +-
 src/include/nodes/plannodes.h           |   9 +
 src/include/pgstat.h                    |   3 +-
 21 files changed, 676 insertions(+), 40 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 62fb3434a3..9f06e1fbdc 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -82,6 +82,7 @@ static void show_sort_keys(SortState *sortstate, List *ancestors,
                            ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
                                    ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
                           ExplainState *es);
 static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1319,6 +1320,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
         }
         if (plan->parallel_aware)
             appendStringInfoString(es->str, "Parallel ");
+        if (plan->async_capable)
+            appendStringInfoString(es->str, "Async ");
         appendStringInfoString(es->str, pname);
         es->indent++;
     }
@@ -1860,6 +1863,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
         case T_Hash:
             show_hash_info(castNode(HashState, planstate), es);
             break;
+
+        case T_Append:
+            show_append_info(castNode(AppendState, planstate), es);
+            break;
+
         default:
             break;
     }
@@ -2197,6 +2205,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
                          ancestors, es);
 }
 
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+    Append *plan = (Append *) astate->ps.plan;
+
+    if (plan->nasyncplans > 0)
+        ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
 /*
  * Show the grouping keys for an Agg node.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index a983800e4b..8a2d6e9961 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
     execAmi.o \
+    execAsync.o \
     execCurrent.o \
     execExpr.o \
     execExprInterp.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..db477e2cf6
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,145 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *      Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *      src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+    pstate->asyncstate = status;
+}
+
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+                       void *data, bool reinit)
+{
+    switch (nodeTag(node))
+    {
+        case T_ForeignScanState:
+            return ExecForeignAsyncConfigureWait((ForeignScanState *) node,
+                                                 wes, data, reinit);
+        default:
+            elog(ERROR, "unrecognized node type: %d",
+                 (int) nodeTag(node));
+    }
+    return false;                /* keep compiler quiet */
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+    int **p_refind;
+    int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+    /* arg is the address of the variable refind in ExecAsyncEventWait */
+    ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+    *mcbarg->p_refind = NULL;
+    *mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+    static int *refind = NULL;
+    static int refindsize = 0;
+    WaitEventSet *wes;
+    WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+    int noccurred = 0;
+    Bitmapset *fired_events = NULL;
+    int i;
+    int n;
+
+    n = bms_num_members(waitnodes);
+    wes = CreateWaitEventSet(TopTransactionContext,
+                             TopTransactionResourceOwner, n);
+    if (refindsize < n)
+    {
+        if (refindsize == 0)
+            refindsize = EVENT_BUFFER_SIZE; /* XXX */
+        while (refindsize < n)
+            refindsize *= 2;
+        if (refind)
+            refind = (int *) repalloc(refind, refindsize * sizeof(int));
+        else
+        {
+            static ExecAsync_mcbarg mcb_arg =
+                { &refind, &refindsize };
+            static MemoryContextCallback mcb =
+                { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+            MemoryContext oldctxt =
+                MemoryContextSwitchTo(TopTransactionContext);
+
+            /*
+             * refind points to a memory block in
+             * TopTransactionContext. Register a callback to reset it.
+             */
+            MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+            refind = (int *) palloc(refindsize * sizeof(int));
+            MemoryContextSwitchTo(oldctxt);
+        }
+    }
+
+    n = 0;
+    for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+         i = bms_next_member(waitnodes, i))
+    {
+        refind[i] = i;
+        if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+            n++;
+    }
+
+    if (n == 0)
+    {
+        FreeWaitEventSet(wes);
+        return NULL;
+    }
+
+    noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+                                 EVENT_BUFFER_SIZE,
+                                 WAIT_EVENT_ASYNC_WAIT);
+    FreeWaitEventSet(wes);
+    if (noccurred == 0)
+        return NULL;
+
+    for (i = 0 ; i < noccurred ; i++)
+    {
+        WaitEvent *w = &occurred_event[i];
+
+        if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+        {
+            int idx = *(int *) w->user_data;
+
+            fired_events = bms_add_member(fired_events, idx);
+        }
+    }
+
+    return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 5ff986ac7d..03dec4d648 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
 #include "miscadmin.h"
 
 /* Shared state for parallel-aware Append. */
@@ -81,6 +82,7 @@ struct ParallelAppendState
 #define NO_MATCHING_SUBPLANS        -2
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
@@ -104,22 +106,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     PlanState **appendplanstates;
     Bitmapset  *validsubplans;
     int            nplans;
+    int            nasyncplans;
     int            firstvalid;
     int            i,
                 j;
 
     /* check for unsupported flags */
-    Assert(!(eflags & EXEC_FLAG_MARK));
+    Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
 
     /*
      * create new AppendState for our append node
      */
     appendstate->ps.plan = (Plan *) node;
     appendstate->ps.state = estate;
-    appendstate->ps.ExecProcNode = ExecAppend;
+
+    /* choose appropriate version of Exec function */
+    if (node->nasyncplans == 0)
+        appendstate->ps.ExecProcNode = ExecAppend;
+    else
+        appendstate->ps.ExecProcNode = ExecAppendAsync;
 
     /* Let choose_next_subplan_* function handle setting the first subplan */
-    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+    appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 
     /* If run-time partition pruning is enabled, then set that up now */
     if (node->part_prune_info != NULL)
@@ -152,7 +160,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
              */
             if (bms_is_empty(validsubplans))
             {
-                appendstate->as_whichplan = NO_MATCHING_SUBPLANS;
+                appendstate->as_whichsyncplan = NO_MATCHING_SUBPLANS;
 
                 /* Mark the first as valid so that it's initialized below */
                 validsubplans = bms_make_singleton(0);
@@ -212,10 +220,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
      */
     j = 0;
     firstvalid = nplans;
+    nasyncplans = 0;
+
     i = -1;
     while ((i = bms_next_member(validsubplans, i)) >= 0)
     {
         Plan       *initNode = (Plan *) list_nth(node->appendplans, i);
+        int            sub_eflags = eflags;
+
+        /* Let async-capable subplans run asynchronously */
+        if (i < node->nasyncplans)
+        {
+            sub_eflags |= EXEC_FLAG_ASYNC;
+            nasyncplans++;
+        }
 
         /*
          * Record the lowest appendplans index which is a valid partial plan.
@@ -223,13 +241,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
         if (i >= node->first_partial_plan && j < firstvalid)
             firstvalid = j;
 
-        appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+        appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
     }
 
     appendstate->as_first_partial_plan = firstvalid;
     appendstate->appendplans = appendplanstates;
     appendstate->as_nplans = nplans;
 
+    /* fill in async stuff */
+    appendstate->as_nasyncplans = nasyncplans;
+    appendstate->as_syncdone = (nasyncplans == nplans);
+
+    if (appendstate->as_nasyncplans)
+    {
+        appendstate->as_asyncresult = (TupleTableSlot **)
+            palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+        /* initially, every async subplan needs a request */
+        for (i = 0; i < appendstate->as_nasyncplans; ++i)
+            appendstate->as_needrequest =
+                bms_add_member(appendstate->as_needrequest, i);
+    }
+
     /*
      * Miscellaneous initialization
      */
@@ -253,21 +286,23 @@ ExecAppend(PlanState *pstate)
 {
     AppendState *node = castNode(AppendState, pstate);
 
-    if (node->as_whichplan < 0)
+    if (node->as_whichsyncplan < 0)
     {
         /*
          * If no subplan has been chosen, we must choose one before
          * proceeding.
          */
-        if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+        if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
             !node->choose_next_subplan(node))
             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
         /* Nothing to do if there are no matching subplans */
-        else if (node->as_whichplan == NO_MATCHING_SUBPLANS)
+        else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     }
 
+    Assert(node->as_nasyncplans == 0);
+
     for (;;)
     {
         PlanState  *subnode;
@@ -278,8 +313,9 @@ ExecAppend(PlanState *pstate)
         /*
          * figure out which subplan we are currently processing
          */
-        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
-        subnode = node->appendplans[node->as_whichplan];
+        Assert(node->as_whichsyncplan >= 0 &&
+               node->as_whichsyncplan < node->as_nplans);
+        subnode = node->appendplans[node->as_whichsyncplan];
 
         /*
          * get a tuple from the subplan
@@ -302,6 +338,175 @@ ExecAppend(PlanState *pstate)
     }
 }
 
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+    AppendState *node = castNode(AppendState, pstate);
+    Bitmapset *needrequest;
+    int    i;
+
+    Assert(node->as_nasyncplans > 0);
+
+restart:
+    if (node->as_nasyncresult > 0)
+    {
+        --node->as_nasyncresult;
+        return node->as_asyncresult[node->as_nasyncresult];
+    }
+
+    needrequest = node->as_needrequest;
+    node->as_needrequest = NULL;
+    while ((i = bms_first_member(needrequest)) >= 0)
+    {
+        TupleTableSlot *slot;
+        PlanState *subnode = node->appendplans[i];
+
+        slot = ExecProcNode(subnode);
+        if (subnode->asyncstate == AS_AVAILABLE)
+        {
+            if (!TupIsNull(slot))
+            {
+                node->as_asyncresult[node->as_nasyncresult++] = slot;
+                node->as_needrequest = bms_add_member(node->as_needrequest, i);
+            }
+        }
+        else
+            node->as_pending_async = bms_add_member(node->as_pending_async, i);
+    }
+    bms_free(needrequest);
+
+    for (;;)
+    {
+        TupleTableSlot *result;
+
+        /* return now if a result is available */
+        if (node->as_nasyncresult > 0)
+        {
+            --node->as_nasyncresult;
+            return node->as_asyncresult[node->as_nasyncresult];
+        }
+
+        while (!bms_is_empty(node->as_pending_async))
+        {
+            long timeout = node->as_syncdone ? -1 : 0;
+            Bitmapset *fired;
+            int i;
+
+            fired = ExecAsyncEventWait(node->appendplans,
+                                       node->as_pending_async,
+                                       timeout);
+
+            if (bms_is_empty(fired) && node->as_syncdone)
+            {
+                /*
+                 * No subplan fired. This can happen even in normal operation,
+                 * when a subnode had already prepared results before we
+                 * waited. as_pending_async then holds stale information, so
+                 * restart from the beginning.
+                 */
+                node->as_needrequest = node->as_pending_async;
+                node->as_pending_async = NULL;
+                goto restart;
+            }
+
+            while ((i = bms_first_member(fired)) >= 0)
+            {
+                TupleTableSlot *slot;
+                PlanState *subnode = node->appendplans[i];
+                slot = ExecProcNode(subnode);
+                if (subnode->asyncstate == AS_AVAILABLE)
+                {
+                    if (!TupIsNull(slot))
+                    {
+                        node->as_asyncresult[node->as_nasyncresult++] = slot;
+                        node->as_needrequest =
+                            bms_add_member(node->as_needrequest, i);
+                    }
+                    node->as_pending_async =
+                        bms_del_member(node->as_pending_async, i);
+                }
+            }
+            bms_free(fired);
+
+            /* return now if a result is available */
+            if (node->as_nasyncresult > 0)
+            {
+                --node->as_nasyncresult;
+                return node->as_asyncresult[node->as_nasyncresult];
+            }
+
+            if (!node->as_syncdone)
+                break;
+        }
+
+        /*
+         * If there is no asynchronous activity still pending and the
+         * synchronous activity is also complete, we're totally done scanning
+         * this node.  Otherwise, we're done with the asynchronous stuff but
+         * must continue scanning the synchronous children.
+         */
+        if (node->as_syncdone)
+        {
+            Assert(bms_is_empty(node->as_pending_async));
+            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+        }
+
+        /*
+         * get a tuple from the subplan
+         */
+        
+        if (node->as_whichsyncplan < 0)
+        {
+            /*
+             * If no subplan has been chosen, we must choose one before
+             * proceeding.
+             */
+            if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
+                !node->choose_next_subplan(node))
+            {
+                node->as_syncdone = true;
+                return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+            }
+
+            /* Nothing to do if there are no matching subplans */
+            else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
+            {
+                node->as_syncdone = true;
+                return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+            }
+        }
+
+        result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+        if (!TupIsNull(result))
+        {
+            /*
+             * If the subplan gave us something then return it as-is. We do
+             * NOT make use of the result slot that was set up in
+             * ExecInitAppend; there's no need for it.
+             */
+            return result;
+        }
+
+        /*
+         * Go on to the "next" subplan. If no more subplans, return the empty
+         * slot set up for us by ExecInitAppend, unless there are async plans
+         * we have yet to finish.
+         */
+        if (!node->choose_next_subplan(node))
+        {
+            node->as_syncdone = true;
+            if (bms_is_empty(node->as_pending_async))
+            {
+                Assert(bms_is_empty(node->as_needrequest));
+                return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+            }
+        }
+
+        /* Else loop back and try to get a tuple from the new subplan */
+    }
+}
+
 /* ----------------------------------------------------------------
  *        ExecEndAppend
  *
@@ -348,6 +553,15 @@ ExecReScanAppend(AppendState *node)
         node->as_valid_subplans = NULL;
     }
 
+    /* Reset async state. */
+    for (i = 0; i < node->as_nasyncplans; ++i)
+    {
+        ExecShutdownNode(node->appendplans[i]);
+        node->as_needrequest = bms_add_member(node->as_needrequest, i);
+    }
+    node->as_nasyncresult = 0;
+    node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
     for (i = 0; i < node->as_nplans; i++)
     {
         PlanState  *subnode = node->appendplans[i];
@@ -368,7 +582,7 @@ ExecReScanAppend(AppendState *node)
     }
 
     /* Let choose_next_subplan_* function handle setting the first subplan */
-    node->as_whichplan = INVALID_SUBPLAN_INDEX;
+    node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 }
 
 /* ----------------------------------------------------------------
@@ -456,7 +670,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 static bool
 choose_next_subplan_locally(AppendState *node)
 {
-    int            whichplan = node->as_whichplan;
+    int            whichplan = node->as_whichsyncplan;
     int            nextplan;
 
     /* We should never be called when there are no subplans */
@@ -475,6 +689,10 @@ choose_next_subplan_locally(AppendState *node)
             node->as_valid_subplans =
                 ExecFindMatchingSubPlans(node->as_prune_state);
 
+        /* Exclude async plans */
+        if (node->as_nasyncplans > 0)
+            node->as_valid_subplans = bms_del_range(node->as_valid_subplans, 0, node->as_nasyncplans - 1);
+
         whichplan = -1;
     }
 
@@ -489,7 +707,7 @@ choose_next_subplan_locally(AppendState *node)
     if (nextplan < 0)
         return false;
 
-    node->as_whichplan = nextplan;
+    node->as_whichsyncplan = nextplan;
 
     return true;
 }
@@ -511,19 +729,19 @@ choose_next_subplan_for_leader(AppendState *node)
     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
     /* We should never be called when there are no subplans */
-    Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+    Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
     LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
-    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+    if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
     {
         /* Mark just-completed subplan as finished. */
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
     }
     else
     {
         /* Start with last subplan. */
-        node->as_whichplan = node->as_nplans - 1;
+        node->as_whichsyncplan = node->as_nplans - 1;
 
         /*
          * If we've yet to determine the valid subplans then do so now.  If
@@ -544,12 +762,12 @@ choose_next_subplan_for_leader(AppendState *node)
     }
 
     /* Loop until we find a subplan to execute. */
-    while (pstate->pa_finished[node->as_whichplan])
+    while (pstate->pa_finished[node->as_whichsyncplan])
     {
-        if (node->as_whichplan == 0)
+        if (node->as_whichsyncplan == 0)
         {
             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
-            node->as_whichplan = INVALID_SUBPLAN_INDEX;
+            node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
             LWLockRelease(&pstate->pa_lock);
             return false;
         }
@@ -558,12 +776,12 @@ choose_next_subplan_for_leader(AppendState *node)
          * We needn't pay attention to as_valid_subplans here as all invalid
          * plans have been marked as finished.
          */
-        node->as_whichplan--;
+        node->as_whichsyncplan--;
     }
 
     /* If non-partial, immediately mark as finished. */
-    if (node->as_whichplan < node->as_first_partial_plan)
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    if (node->as_whichsyncplan < node->as_first_partial_plan)
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
     LWLockRelease(&pstate->pa_lock);
 
@@ -592,13 +810,13 @@ choose_next_subplan_for_worker(AppendState *node)
     Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
     /* We should never be called when there are no subplans */
-    Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+    Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
     LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
     /* Mark just-completed subplan as finished. */
-    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
     /*
      * If we've yet to determine the valid subplans then do so now.  If
@@ -620,7 +838,7 @@ choose_next_subplan_for_worker(AppendState *node)
     }
 
     /* Save the plan from which we are starting the search. */
-    node->as_whichplan = pstate->pa_next_plan;
+    node->as_whichsyncplan = pstate->pa_next_plan;
 
     /* Loop until we find a valid subplan to execute. */
     while (pstate->pa_finished[pstate->pa_next_plan])
@@ -634,7 +852,7 @@ choose_next_subplan_for_worker(AppendState *node)
             /* Advance to the next valid plan. */
             pstate->pa_next_plan = nextplan;
         }
-        else if (node->as_whichplan > node->as_first_partial_plan)
+        else if (node->as_whichsyncplan > node->as_first_partial_plan)
         {
             /*
              * Try looping back to the first valid partial plan, if there is
@@ -643,7 +861,7 @@ choose_next_subplan_for_worker(AppendState *node)
             nextplan = bms_next_member(node->as_valid_subplans,
                                        node->as_first_partial_plan - 1);
             pstate->pa_next_plan =
-                nextplan < 0 ? node->as_whichplan : nextplan;
+                nextplan < 0 ? node->as_whichsyncplan : nextplan;
         }
         else
         {
@@ -651,10 +869,10 @@ choose_next_subplan_for_worker(AppendState *node)
              * At last plan, and either there are no partial plans or we've
              * tried them all.  Arrange to bail out.
              */
-            pstate->pa_next_plan = node->as_whichplan;
+            pstate->pa_next_plan = node->as_whichsyncplan;
         }
 
-        if (pstate->pa_next_plan == node->as_whichplan)
+        if (pstate->pa_next_plan == node->as_whichsyncplan)
         {
             /* We've tried everything! */
             pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -664,7 +882,7 @@ choose_next_subplan_for_worker(AppendState *node)
     }
 
     /* Pick the plan we found, and advance pa_next_plan one more time. */
-    node->as_whichplan = pstate->pa_next_plan;
+    node->as_whichsyncplan = pstate->pa_next_plan;
     pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
                                            pstate->pa_next_plan);
 
@@ -691,8 +909,8 @@ choose_next_subplan_for_worker(AppendState *node)
     }
 
     /* If non-partial, immediately mark as finished. */
-    if (node->as_whichplan < node->as_first_partial_plan)
-        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    if (node->as_whichsyncplan < node->as_first_partial_plan)
+        node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
     LWLockRelease(&pstate->pa_lock);
 
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 52af1dac5c..1a54383ec8 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -117,7 +117,6 @@ ExecForeignScan(PlanState *pstate)
                     (ExecScanRecheckMtd) ForeignRecheck);
 }
 
-
 /* ----------------------------------------------------------------
  *        ExecInitForeignScan
  * ----------------------------------------------------------------
@@ -141,6 +140,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
     scanstate->ss.ps.plan = (Plan *) node;
     scanstate->ss.ps.state = estate;
     scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+    scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+    if ((eflags & EXEC_FLAG_ASYNC) != 0)
+        scanstate->fs_async = true;
 
     /*
      * Miscellaneous initialization
@@ -384,3 +387,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
     if (fdwroutine->ShutdownForeignScan)
         fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *        ExecForeignAsyncConfigureWait
+ *
+ *        In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+                              void *caller_data, bool reinit)
+{
+    FdwRoutine *fdwroutine = node->fdwroutine;
+
+    Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+    return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+                                                 caller_data, reinit);
+}
diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c
index 665149defe..5d4e19a052 100644
--- a/src/backend/nodes/bitmapset.c
+++ b/src/backend/nodes/bitmapset.c
@@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper)
     return a;
 }
 
+/*
+ * bms_del_range
+ *        Delete members in the range of 'lower' to 'upper' from the set.
+ *
+ * Note this could also be done by calling bms_del_member in a loop, however,
+ * using this function will be faster when the range is large as we work at
+ * the bitmapword level rather than at bit level.
+ */
+Bitmapset *
+bms_del_range(Bitmapset *a, int lower, int upper)
+{
+    int            lwordnum,
+                lbitnum,
+                uwordnum,
+                ushiftbits,
+                wordnum;
+
+    if (lower < 0 || upper < 0)
+        elog(ERROR, "negative bitmapset member not allowed");
+    if (lower > upper)
+        elog(ERROR, "lower range must not be above upper range");
+    uwordnum = WORDNUM(upper);
+
+    if (a == NULL)
+    {
+        a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1));
+        a->nwords = uwordnum + 1;
+    }
+
+    /* ensure we have enough words to store the upper bit */
+    else if (uwordnum >= a->nwords)
+    {
+        int            oldnwords = a->nwords;
+        int            i;
+
+        a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1));
+        a->nwords = uwordnum + 1;
+        /* zero out the enlarged portion */
+        for (i = oldnwords; i < a->nwords; i++)
+            a->words[i] = 0;
+    }
+
+    wordnum = lwordnum = WORDNUM(lower);
+
+    lbitnum = BITNUM(lower);
+    ushiftbits = BITNUM(upper);    /* masks shift once more; no full-width shift */
+
+    /*
+     * Special case: when lwordnum equals uwordnum, both the lower and the
+     * upper masks must be applied to the same word.
+     */
+    if (lwordnum == uwordnum)
+    {
+        a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1)
+                                | (((~(bitmapword) 0) << ushiftbits) << 1));
+    }
+    else
+    {
+        /* turn off lbitnum and all bits left of it */
+        a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1);
+
+        /* turn off all bits for any intermediate words */
+        while (wordnum < uwordnum)
+            a->words[wordnum++] = (bitmapword) 0;
+
+        /* turn off upper's bit and all bits right of it. */
+        a->words[uwordnum] &= ((~(bitmapword) 0) << ushiftbits) << 1;
+    }
+
+    return a;
+}
+
 /*
  * bms_int_members - like bms_intersect, but left input is recycled
  */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a74b56bb59..a266904010 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -244,6 +244,8 @@ _copyAppend(const Append *from)
     COPY_NODE_FIELD(appendplans);
     COPY_SCALAR_FIELD(first_partial_plan);
     COPY_NODE_FIELD(part_prune_info);
+    COPY_SCALAR_FIELD(nasyncplans);
+    COPY_SCALAR_FIELD(referent);
 
     return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a80eccc2c1..bf87e721a5 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -434,6 +434,8 @@ _outAppend(StringInfo str, const Append *node)
     WRITE_NODE_FIELD(appendplans);
     WRITE_INT_FIELD(first_partial_plan);
     WRITE_NODE_FIELD(part_prune_info);
+    WRITE_INT_FIELD(nasyncplans);
+    WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 764e3bb90c..25b84b3f15 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1639,6 +1639,8 @@ _readAppend(void)
     READ_NODE_FIELD(appendplans);
     READ_INT_FIELD(first_partial_plan);
     READ_NODE_FIELD(part_prune_info);
+    READ_INT_FIELD(nasyncplans);
+    READ_INT_FIELD(referent);
 
     READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index aee81bd755..c676980a30 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -207,6 +207,9 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
                                                      Index scanrelid, char *enrname);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
                                          Index scanrelid, int wtParam);
+static Append *make_append(List *appendplans, int first_partial_plan,
+                           int nasyncplans, int referent,
+                           List *tlist, PartitionPruneInfo *partpruneinfo);
 static RecursiveUnion *make_recursive_union(List *tlist,
                                             Plan *lefttree,
                                             Plan *righttree,
@@ -292,6 +295,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
                                      List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
                                              GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -1069,6 +1073,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
     bool        tlist_was_changed = false;
     List       *pathkeys = best_path->path.pathkeys;
     List       *subplans = NIL;
+    List       *asyncplans = NIL;
+    List       *syncplans = NIL;
     ListCell   *subpaths;
     RelOptInfo *rel = best_path->path.parent;
     PartitionPruneInfo *partpruneinfo = NULL;
@@ -1077,6 +1083,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
     Oid           *nodeSortOperators = NULL;
     Oid           *nodeCollations = NULL;
     bool       *nodeNullsFirst = NULL;
+    int            nasyncplans = 0;
+    bool        first = true;
+    bool        referent_is_sync = true;
 
     /*
      * The subpaths list could be empty, if every child was proven empty by
@@ -1206,6 +1215,23 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
         }
 
         subplans = lappend(subplans, subplan);
+
+        /*
+         * Classify as async-capable or not. If we have decided to run the
+         * children in parallel, we cannot run any of them asynchronously.
+         */
+        if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+        {
+            subplan->async_capable = true;
+            asyncplans = lappend(asyncplans, subplan);
+            ++nasyncplans;
+            if (first)
+                referent_is_sync = false;
+        }
+        else
+            syncplans = lappend(syncplans, subplan);
+
+        first = false;
     }
 
     /*
@@ -1244,6 +1270,18 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
     plan->first_partial_plan = best_path->first_partial_path;
     plan->part_prune_info = partpruneinfo;
 
+    /*
+     * XXX ideally, if there's just one child, we'd not bother to generate an
+     * Append node but just return the single child.  At the moment this does
+     * not work because the varno of the child scan plan won't match the
+     * parent-rel Vars it'll be asked to emit.
+     */
+
+    plan = make_append(list_concat(asyncplans, syncplans),
+                       best_path->first_partial_path, nasyncplans,
+                       referent_is_sync ? nasyncplans : 0, tlist,
+                       partpruneinfo);
+
     copy_generic_path_info(&plan->plan, (Path *) best_path);
 
     /*
@@ -5462,6 +5500,27 @@ make_foreignscan(List *qptlist,
     return node;
 }
 
+static Append *
+make_append(List *appendplans, int first_partial_plan, int nasyncplans,
+            int referent, List *tlist, PartitionPruneInfo *partpruneinfo)
+{
+    Append       *node = makeNode(Append);
+    Plan       *plan = &node->plan;
+
+    plan->targetlist = tlist;
+    plan->qual = NIL;
+    plan->lefttree = NULL;
+    plan->righttree = NULL;
+
+    node->appendplans = appendplans;
+    node->first_partial_plan = first_partial_plan;
+    node->part_prune_info = partpruneinfo;
+    node->nasyncplans = nasyncplans;
+    node->referent = referent;
+
+    return node;
+}
+
 static RecursiveUnion *
 make_recursive_union(List *tlist,
                      Plan *lefttree,
@@ -6836,3 +6895,27 @@ is_projection_capable_plan(Plan *plan)
     }
     return true;
 }
+
+/*
+ * is_async_capable_path
+ *        Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+    switch (nodeTag(path))
+    {
+        case T_ForeignPath:
+            {
+                FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+                Assert(fdwroutine != NULL);
+                if (fdwroutine->IsForeignPathAsyncCapable == NULL)
+                    break;
+                return fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path);
+            }
+        default:
+            break;
+    }
+    return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index fabcf31de8..575ccd5def 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3853,6 +3853,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
         case WAIT_EVENT_SYNC_REP:
             event_name = "SyncRep";
             break;
+        case WAIT_EVENT_ASYNC_WAIT:
+            event_name = "AsyncExecWait";
+            break;
             /* no default case, so that compiler will warn */
     }
 
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index bb2baff763..7669e6ff53 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -306,7 +306,7 @@ SysLoggerMain(int argc, char *argv[])
      * syslog pipe, which implies that all other backends have exited
      * (including the postmaster).
      */
-    wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+    wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2);
     AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 #ifndef WIN32
     AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 13685a0a0e..60b749f062 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4640,7 +4640,7 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
     dpns->planstate = ps;
 
     /*
-     * We special-case Append and MergeAppend to pretend that the first child
+     * We special-case Append and MergeAppend to pretend that a specific child
      * plan is the OUTER referent; we have to interpret OUTER Vars in their
      * tlists according to one of the children, and the first one is the most
      * natural choice.  Likewise special-case ModifyTable to pretend that the
@@ -4648,7 +4648,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
      * lists containing references to non-target relations.
      */
     if (IsA(ps, AppendState))
-        dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+    {
+        AppendState *aps = (AppendState *) ps;
+        Append *app = (Append *) ps->plan;
+        dpns->outer_planstate = aps->appendplans[app->referent];
+    }
     else if (IsA(ps, MergeAppendState))
         dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
     else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..5fd67d9004
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ *        Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *        src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+                                   void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+                                     long timeout);
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 6298c7c8ca..4adb2efe76 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -59,6 +59,7 @@
 #define EXEC_FLAG_MARK            0x0008    /* need mark/restore */
 #define EXEC_FLAG_SKIP_TRIGGERS 0x0010    /* skip AfterTrigger calls */
 #define EXEC_FLAG_WITH_NO_DATA    0x0020    /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC            0x0040    /* request async execution */
 
 
 /* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ca7723c899..81791033af 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
                                             ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+                                          WaitEventSet *wes,
+                                          void *caller_data, bool reinit);
 
 #endif                            /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 822686033e..851cd15e65 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
                                                             List *fdw_private,
                                                             RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+                                                    WaitEventSet *wes,
+                                                    void *caller_data,
+                                                    bool reinit);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -190,6 +195,7 @@ typedef struct FdwRoutine
     GetForeignPlan_function GetForeignPlan;
     BeginForeignScan_function BeginForeignScan;
     IterateForeignScan_function IterateForeignScan;
+    IterateForeignScan_function IterateForeignScanAsync;
     ReScanForeignScan_function ReScanForeignScan;
     EndForeignScan_function EndForeignScan;
 
@@ -242,6 +248,11 @@ typedef struct FdwRoutine
     InitializeDSMForeignScan_function InitializeDSMForeignScan;
     ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
     InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+    /* Support functions for asynchronous execution */
+    IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+    ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
     ShutdownForeignScan_function ShutdownForeignScan;
 
     /* Support functions for path reparameterization. */
diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h
index 0c645628e5..d97a4c2235 100644
--- a/src/include/nodes/bitmapset.h
+++ b/src/include/nodes/bitmapset.h
@@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b);
+extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b);

 /* support for iterating through the integer elements of a set: */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6eb647290b..46d7fbab3a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -932,6 +932,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
  * abstract superclass for all PlanState-type nodes.
  * ----------------
  */
+typedef enum AsyncState
+{
+    AS_AVAILABLE,
+    AS_WAITING
+} AsyncState;
+
 typedef struct PlanState
 {
     NodeTag        type;
@@ -1020,6 +1026,11 @@ typedef struct PlanState
     bool        outeropsset;
     bool        inneropsset;
     bool        resultopsset;
+
+    /* Async subnode execution stuff */
+    AsyncState    asyncstate;
+
+    int32        padding;            /* to keep alignment of derived types */
 } PlanState;
 
 /* ----------------
@@ -1216,14 +1227,20 @@ struct AppendState
     PlanState    ps;                /* its first field is NodeTag */
     PlanState **appendplans;    /* array of PlanStates for my inputs */
     int            as_nplans;
-    int            as_whichplan;
+    int            as_whichsyncplan; /* which sync plan is being executed */
     int            as_first_partial_plan;    /* Index of 'appendplans' containing
                                          * the first partial plan */
+    int            as_nasyncplans;    /* # of async-capable children */
     ParallelAppendState *as_pstate; /* parallel coordination info */
     Size        pstate_len;        /* size of parallel coordination info */
     struct PartitionPruneState *as_prune_state;
     Bitmapset  *as_valid_subplans;
     bool        (*choose_next_subplan) (AppendState *);
+    bool        as_syncdone;    /* all synchronous plans done? */
+    Bitmapset  *as_needrequest;    /* async plans needing a new request */
+    Bitmapset  *as_pending_async;    /* pending async plans */
+    TupleTableSlot **as_asyncresult;    /* unreturned results of async plans */
+    int            as_nasyncresult;    /* # of valid entries in as_asyncresult */
 };
 
 /* ----------------
@@ -1786,6 +1803,7 @@ typedef struct ForeignScanState
     Size        pscan_len;        /* size of parallel coordination information */
     /* use struct pointer to avoid including fdwapi.h here */
     struct FdwRoutine *fdwroutine;
+    bool        fs_async;
     void       *fdw_state;        /* foreign-data wrapper can keep state here */
 } ForeignScanState;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 8e6594e355..26810915e5 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -133,6 +133,11 @@ typedef struct Plan
     bool        parallel_aware; /* engage parallel-aware logic? */
     bool        parallel_safe;    /* OK to use as part of parallel plan? */
 
+    /*
+     * information needed for asynchronous execution
+     */
+    bool        async_capable;  /* engage asynchronous execution logic? */
+
     /*
      * Common structural data for all Plan types.
      */
@@ -259,6 +264,10 @@ typedef struct Append
 
     /* Info for run-time subplan pruning; NULL if we're not doing that */
     struct PartitionPruneInfo *part_prune_info;
+
+    /* Async child node execution stuff */
+    int            nasyncplans;    /* # async subplans, always at start of list */
+    int            referent;         /* index of inheritance tree referent */
 } Append;
 
 /* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..d57ef809fc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -853,7 +853,8 @@ typedef enum
     WAIT_EVENT_REPLICATION_ORIGIN_DROP,
     WAIT_EVENT_REPLICATION_SLOT_DROP,
     WAIT_EVENT_SAFE_SNAPSHOT,
-    WAIT_EVENT_SYNC_REP
+    WAIT_EVENT_SYNC_REP,
+    WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
-- 
2.23.0
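
A note on the bms_del_range() addition in the patch above: the word-level
masking it relies on can be tried in isolation. The following is only a
minimal sketch under stated assumptions, not the patch's code: it works on a
plain array of 64-bit words rather than a Bitmapset, and clear_bit_range and
WORD_BITS are hypothetical names introduced for illustration. It builds the
upper mask with two shifts so that a range ending on a word's top bit never
shifts by the full word width, which C leaves undefined.

```c
#include <assert.h>
#include <stdint.h>

#define WORD_BITS 64

/*
 * Clear bits lower..upper (inclusive) in a caller-owned array of 64-bit
 * words.  Hypothetical helper for illustration; not part of the patch.
 */
static void
clear_bit_range(uint64_t *words, int nwords, int lower, int upper)
{
    int         lword = lower / WORD_BITS;
    int         uword = upper / WORD_BITS;
    int         lbit = lower % WORD_BITS;
    int         ubit = upper % WORD_BITS;
    uint64_t    keep_below;
    uint64_t    keep_above;
    int         w;

    assert(lower >= 0 && lower <= upper && uword < nwords);

    /* bits strictly below 'lower' within its word survive */
    keep_below = ((uint64_t) 1 << lbit) - 1;
    /* bits strictly above 'upper' survive; two shifts avoid shifting by 64 */
    keep_above = (~(uint64_t) 0 << ubit) << 1;

    if (lword == uword)
    {
        /* both masks apply to the same word */
        words[lword] &= keep_below | keep_above;
        return;
    }

    words[lword] &= keep_below;         /* first, partially cleared word */
    for (w = lword + 1; w < uword; w++)
        words[w] = 0;                   /* fully covered words */
    words[uword] &= keep_above;         /* last, partially cleared word */
}
```

Calling clear_bit_range(words, 3, 60, 130) on three all-ones words keeps bits
0..59 of the first word, zeroes the second, and keeps bits 3..63 of the third.
Working a word at a time is what makes this cheaper than clearing each member
in a loop when the range is large.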

From 8984d4e3360eaf79fc0f23b3e6817770be13fcfd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH v1 3/3] async postgres_fdw

---
 contrib/postgres_fdw/connection.c             |  26 +
 .../postgres_fdw/expected/postgres_fdw.out    | 222 ++++---
 contrib/postgres_fdw/postgres_fdw.c           | 615 ++++++++++++++++--
 contrib/postgres_fdw/postgres_fdw.h           |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  20 +-
 5 files changed, 711 insertions(+), 174 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 27b86a03f8..1afd99cad8 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -56,6 +56,7 @@ typedef struct ConnCacheEntry
     bool        invalidated;    /* true if reconnect is pending */
     uint32        server_hashvalue;    /* hash value of foreign server OID */
     uint32        mapping_hashvalue;    /* hash value of user mapping OID */
+    void        *storage;        /* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -200,6 +201,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
         elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
              entry->conn, server->servername, user->umid, user->userid);
+        entry->storage = NULL;
     }
 
     /*
@@ -213,6 +215,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
     return entry->conn;
 }
 
+/*
+ * Returns the connection-specific storage for this user, allocating and
+ * zeroing initsize bytes if it does not exist yet.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+    bool        found;
+    ConnCacheEntry *entry;
+    ConnCacheKey key;
+
+    key = user->umid;
+    entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+    Assert(found);
+
+    if (entry->storage == NULL)
+    {
+        entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+        memset(entry->storage, 0, initsize);
+    }
+
+    return entry->storage;
+}
+
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 48282ab151..bd2e835d2d 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6900,7 +6900,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
 INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |  aa   
 ----------+-------
  a        | aaa
@@ -6928,7 +6928,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -6956,7 +6956,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | aaa
@@ -6984,7 +6984,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa   
 ----------+--------
  a        | newtoo
@@ -7054,35 +7054,41 @@ insert into bar2 values(3,33,33);
 insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+                                                   QUERY PLAN                                                    
+-----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+                     Sort Key: bar.f1
+                     ->  Seq Scan on public.bar
+                           Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2 bar_1
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(29 rows)
 
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
  f1 | f2 
 ----+----
   1 | 11
@@ -7092,35 +7098,41 @@ select * from bar where f1 in (select f1 from foo) for update;
 (4 rows)
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+                                                   QUERY PLAN                                                   
+----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+                     Sort Key: bar.f1
+                     ->  Seq Scan on public.bar
+                           Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2 bar_1
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(29 rows)
 
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
  f1 | f2 
 ----+----
   1 | 11
@@ -7150,11 +7162,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
    ->  Hash Join
          Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid
          Inner Unique: true
@@ -7168,12 +7181,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1 
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(39 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(41 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
 select tableoid::regclass, * from bar order by 1,2;
@@ -7203,16 +7217,17 @@ where bar.f1 = ss.f1;
          Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
          Hash Cond: (foo.f1 = bar.f1)
          ->  Append
-               ->  Seq Scan on public.foo
-                     Output: ROW(foo.f1), foo.f1
-               ->  Foreign Scan on public.foo2 foo_1
+               Async subplans: 2 
+               ->  Async Foreign Scan on public.foo2 foo_1
                      Output: ROW(foo_1.f1), foo_1.f1
                      Remote SQL: SELECT f1 FROM public.loct1
-               ->  Seq Scan on public.foo foo_2
-                     Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-               ->  Foreign Scan on public.foo2 foo_3
+               ->  Async Foreign Scan on public.foo2 foo_3
                      Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
+               ->  Seq Scan on public.foo
+                     Output: ROW(foo.f1), foo.f1
+               ->  Seq Scan on public.foo foo_2
+                     Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
          ->  Hash
                Output: bar.f1, bar.f2, bar.ctid
                ->  Seq Scan on public.bar
@@ -7230,17 +7245,18 @@ where bar.f1 = ss.f1;
                Output: (ROW(foo.f1)), foo.f1
                Sort Key: foo.f1
                ->  Append
-                     ->  Seq Scan on public.foo
-                           Output: ROW(foo.f1), foo.f1
-                     ->  Foreign Scan on public.foo2 foo_1
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on public.foo2 foo_1
                            Output: ROW(foo_1.f1), foo_1.f1
                            Remote SQL: SELECT f1 FROM public.loct1
-                     ->  Seq Scan on public.foo foo_2
-                           Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-                     ->  Foreign Scan on public.foo2 foo_3
+                     ->  Async Foreign Scan on public.foo2 foo_3
                            Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
                            Remote SQL: SELECT f1 FROM public.loct1
-(45 rows)
+                     ->  Seq Scan on public.foo
+                           Output: ROW(foo.f1), foo.f1
+                     ->  Seq Scan on public.foo foo_2
+                           Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
+(47 rows)
 
 update bar set f2 = f2 + 100
 from
@@ -7390,27 +7406,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2 bar_1
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2 bar_1
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: u.f1, u.f2
+   Sort Key: u.f1
+   CTE u
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2 bar_1
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2 bar_1
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on u
+         Output: u.f1, u.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
  f1 | f2  
 ----+-----
   1 | 311
   2 | 322
-  6 | 266
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
@@ -8485,11 +8507,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
  Sort
    Sort Key: t1.a, t3.c
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)) INNER JOIN (ftprt1_p1 t3)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)) INNER JOIN (ftprt1_p2 t3_1)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a % 25 =0 ORDER BY 1,2,3;
 
   a  |  b  |  c
@@ -8524,20 +8547,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
 -- with whole-row reference; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
 
-                       QUERY PLAN                       
---------------------------------------------------------
+                          QUERY PLAN                          
+--------------------------------------------------------------
  Sort
    Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
    ->  Hash Full Join
          Hash Cond: (t1.a = t2.b)
          ->  Append
-               ->  Foreign Scan on ftprt1_p1 t1
-               ->  Foreign Scan on ftprt1_p2 t1_1
+               Async subplans: 2 
+               ->  Async Foreign Scan on ftprt1_p1 t1
+               ->  Async Foreign Scan on ftprt1_p2 t1_1
          ->  Hash
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2
-                     ->  Foreign Scan on ftprt2_p2 t2_1
-(11 rows)
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on ftprt2_p1 t2
+                     ->  Async Foreign Scan on ftprt2_p2 t2_1
+(13 rows)
 
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
 
        wr       |       wr       
@@ -8566,11 +8591,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
  Sort
    Sort Key: t1.a, t1.b
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2 
+         ->  Async Foreign Scan
                Relations: (ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25 = 0 ORDER BY 1,2;
 
   a  |  b  
@@ -8623,21 +8649,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
 -- test FOR UPDATE; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                             QUERY PLAN                             
+--------------------------------------------------------------------
  LockRows
    ->  Sort
          Sort Key: t1.a
          ->  Hash Join
                Hash Cond: (t2.b = t1.a)
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2
-                     ->  Foreign Scan on ftprt2_p2 t2_1
+                     Async subplans: 2 
+                     ->  Async Foreign Scan on ftprt2_p1 t2
+                     ->  Async Foreign Scan on ftprt2_p2 t2_1
                ->  Hash
                      ->  Append
-                           ->  Foreign Scan on ftprt1_p1 t1
-                           ->  Foreign Scan on ftprt1_p2 t1_1
-(12 rows)
+                           Async subplans: 2 
+                           ->  Async Foreign Scan on ftprt1_p1 t1
+                           ->  Async Foreign Scan on ftprt1_p2 t1_1
+(14 rows)
 
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
   a  |  b  
@@ -8672,18 +8700,19 @@ ANALYZE fpagg_tab_p3;
 SET enable_partitionwise_aggregate TO false;
 EXPLAIN (COSTS OFF)
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
-                        QUERY PLAN                         
------------------------------------------------------------
+                           QUERY PLAN                            
+-----------------------------------------------------------------
  Sort
    Sort Key: pagg_tab.a
    ->  HashAggregate
          Group Key: pagg_tab.a
          Filter: (avg(pagg_tab.b) < '22'::numeric)
          ->  Append
-               ->  Foreign Scan on fpagg_tab_p1 pagg_tab
-               ->  Foreign Scan on fpagg_tab_p2 pagg_tab_1
-               ->  Foreign Scan on fpagg_tab_p3 pagg_tab_2
-(9 rows)
+               Async subplans: 3 
+               ->  Async Foreign Scan on fpagg_tab_p1 pagg_tab
+               ->  Async Foreign Scan on fpagg_tab_p2 pagg_tab_1
+               ->  Async Foreign Scan on fpagg_tab_p3 pagg_tab_2
+(10 rows)
 
 -- Plan with partitionwise aggregates is enabled
 SET enable_partitionwise_aggregate TO true;
@@ -8694,13 +8723,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
  Sort
    Sort Key: pagg_tab.a
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 3 
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
-(9 rows)
+(10 rows)
 
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
  a  | sum  | min | count 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index bdc21b36d1..f3212aac90 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,8 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -35,6 +37,7 @@
 #include "optimizer/restrictinfo.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "postgres_fdw.h"
 #include "utils/builtins.h"
 #include "utils/float.h"
@@ -56,6 +59,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -122,11 +128,28 @@ enum FdwDirectModifyPrivateIndex
     FdwDirectModifyPrivateSetProcessed
 };
 
+/*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnpriv
+{
+    ForeignScanState   *leader;        /* leader node of this connection */
+    bool                busy;        /* true if this connection is busy */
+} PgFdwConnpriv;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+    PGconn       *conn;            /* connection for the scan */
+    PgFdwConnpriv *connpriv;    /* connection private memory */
+} PgFdwState;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+    PgFdwState    s;                /* common structure */
     Relation    rel;            /* relcache entry for the foreign table. NULL
                                  * for a foreign join scan. */
     TupleDesc    tupdesc;        /* tuple descriptor of scan */
@@ -137,7 +160,7 @@ typedef struct PgFdwScanState
     List       *retrieved_attrs;    /* list of retrieved attribute numbers */
 
     /* for remote query execution */
-    PGconn       *conn;            /* connection for the scan */
+    bool        result_ready;
     unsigned int cursor_number; /* quasi-unique ID for my cursor */
     bool        cursor_exists;    /* have we created the cursor? */
     int            numParams;        /* number of parameters passed to query */
@@ -153,6 +176,12 @@ typedef struct PgFdwScanState
     /* batch-level state, for optimizing rewinds and avoiding useless fetch */
     int            fetch_ct_2;        /* Min(# of fetches done, 2) */
     bool        eof_reached;    /* true if last fetch reached EOF */
+    bool        run_async;        /* true if run asynchronously */
+    bool        inqueue;        /* true if this node is in waiter queue */
+    ForeignScanState *waiter;    /* Next node to run a query among nodes
+                                 * sharing the same connection */
+    ForeignScanState *last_waiter;    /* last waiting node in waiting queue.
+                                     * valid only on the leader node */
 
     /* working memory contexts */
     MemoryContext batch_cxt;    /* context holding current batch of tuples */
@@ -166,11 +195,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+    PgFdwState    s;                /* common structure */
     Relation    rel;            /* relcache entry for the foreign table */
     AttInMetadata *attinmeta;    /* attribute datatype conversion metadata */
 
     /* for remote query execution */
-    PGconn       *conn;            /* connection for the scan */
     char       *p_name;            /* name of prepared statement, if created */
 
     /* extracted fdw_private data */
@@ -197,6 +226,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+    PgFdwState    s;                /* common structure */
     Relation    rel;            /* relcache entry for the foreign table */
     AttInMetadata *attinmeta;    /* attribute datatype conversion metadata */
 
@@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
                                             RangeTblEntry *target_rte,
                                             Relation target_relation);
@@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
                                          RelOptInfo *input_rel,
                                          RelOptInfo *output_rel,
                                          void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+                                              WaitEventSet *wes,
+                                              void *caller_data, bool reinit);
 
 /*
  * Helper functions
@@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
                                       EquivalenceClass *ec, EquivalenceMember *em,
                                       void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
                                                RangeTblEntry *rte,
@@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
     routine->IterateForeignScan = postgresIterateForeignScan;
     routine->ReScanForeignScan = postgresReScanForeignScan;
     routine->EndForeignScan = postgresEndForeignScan;
+    routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
     /* Functions for updating foreign tables */
     routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
     /* Support functions for upper relation push-down */
     routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+    /* Support functions for async execution */
+    routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+    routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
     PG_RETURN_POINTER(routine);
 }
 
@@ -1434,12 +1476,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
      * Get connection to the foreign server.  Connection manager will
      * establish new connection if necessary.
      */
-    fsstate->conn = GetConnection(user, false);
+    fsstate->s.conn = GetConnection(user, false);
+    fsstate->s.connpriv = (PgFdwConnpriv *)
+        GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
+    fsstate->s.connpriv->leader = NULL;
+    fsstate->s.connpriv->busy = false;
+    fsstate->waiter = NULL;
+    fsstate->last_waiter = node;
 
     /* Assign a unique ID for my cursor */
-    fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+    fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
     fsstate->cursor_exists = false;
 
+    /* Initialize async execution status */
+    fsstate->run_async = false;
+    fsstate->inqueue = false;
+
     /* Get private info created by planner functions. */
     fsstate->query = strVal(list_nth(fsplan->fdw_private,
                                      FdwScanPrivateSelectSql));
@@ -1487,40 +1539,259 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
                              &fsstate->param_values);
 }
 
+/*
+ * Async queue manipulation functions
+ */
+
+/*
+ * add_async_waiter:
+ *
+ * Adds the node to the end of the waiter queue. Immediately starts the node if
+ * no node is running.
+ */
+static inline void
+add_async_waiter(ForeignScanState *node)
+{
+    PgFdwScanState   *fsstate = GetPgFdwScanState(node);
+    ForeignScanState *leader = fsstate->s.connpriv->leader;
+
+    /* do nothing if the node is already in the queue or already eof'ed */
+    if (leader == node || fsstate->inqueue || fsstate->eof_reached)
+        return;
+
+    if (leader == NULL)
+    {
+        /* immediately send request if not busy */
+        request_more_data(node);
+    }
+    else
+    {
+        PgFdwScanState   *leader_state = GetPgFdwScanState(leader);
+        PgFdwScanState   *last_waiter_state
+            = GetPgFdwScanState(leader_state->last_waiter);
+
+        last_waiter_state->waiter = node;
+        leader_state->last_waiter = node;
+        fsstate->inqueue = true;
+    }
+}
+
+/*
+ * move_to_next_waiter:
+ *
+ * Makes the first waiter the next leader.
+ * Returns the new leader or NULL if there's no waiter.
+ */
+static inline ForeignScanState *
+move_to_next_waiter(ForeignScanState *node)
+{
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
+    ForeignScanState *ret = fsstate->waiter;
+
+    Assert(fsstate->s.connpriv->leader == node);
+    
+    if (ret)
+    {
+        PgFdwScanState *retstate = GetPgFdwScanState(ret);
+        fsstate->waiter = NULL;
+        retstate->last_waiter = fsstate->last_waiter;
+        retstate->inqueue = false;
+    }
+
+    fsstate->s.connpriv->leader = ret;
+
+    return ret;
+}
+
+/*
+ * remove the node from waiter queue
+ *
+ * This is a bit different from the two above in the sense that this can
+ * operate on connection leader. The result is absorbed when this is called on
+ * active leader.
+ *
+ * Returns true if the node was found.
+ */
+static inline bool
+remove_async_node(ForeignScanState *node)
+{
+    PgFdwScanState        *fsstate = GetPgFdwScanState(node);
+    ForeignScanState    *leader = fsstate->s.connpriv->leader;
+    PgFdwScanState        *leader_state;
+    ForeignScanState    *prev;
+    PgFdwScanState        *prev_state;
+    ForeignScanState    *cur;
+
+    /* no need to remove me */
+    if (!leader || !fsstate->inqueue)
+        return false;
+
+    leader_state = GetPgFdwScanState(leader);
+
+    /* Remove the leader node */
+    if (leader == node)
+    {
+        ForeignScanState    *next_leader;
+
+        if (leader_state->s.connpriv->busy)
+        {
+            /*
+             * this node is waiting for result, absorb the result first so
+             * that the following commands can be sent on the connection.
+             */
+            PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+            PGconn *conn = leader_state->s.conn;
+
+            while(PQisBusy(conn))
+                PQclear(PQgetResult(conn));
+            
+            leader_state->s.connpriv->busy = false;
+        }
+
+        /* Make the first waiter the leader */
+        if (leader_state->waiter)
+        {
+            PgFdwScanState *next_leader_state;
+
+            next_leader = leader_state->waiter;
+            next_leader_state = GetPgFdwScanState(next_leader);
+
+            leader_state->s.connpriv->leader = next_leader;
+            next_leader_state->last_waiter = leader_state->last_waiter;
+        }
+        leader_state->waiter = NULL;
+
+        return true;
+    }
+
+    /*
+     * Just remove the node in queue
+     *
+     * This function is called on the shutdown path. We don't bother
+     * considering faster way to do this.
+     */
+    prev = leader;
+    prev_state = leader_state;
+    cur =  GetPgFdwScanState(prev)->waiter;
+    while (cur)
+    {
+        PgFdwScanState *curstate = GetPgFdwScanState(cur);
+
+        if (cur == node)
+        {
+            prev_state->waiter = curstate->waiter;
+            if (leader_state->last_waiter == cur)
+                leader_state->last_waiter = prev;
+            else
+                leader_state->last_waiter = cur;
+
+            fsstate->inqueue = false;
+
+            return true;
+        }
+        prev = cur;
+        prev_state = curstate;
+        cur = curstate->waiter;
+    }
+
+    return false;
+}
+
 /*
  * postgresIterateForeignScan
- *        Retrieve next row from the result set, or clear tuple slot to indicate
- *        EOF.
+ *        Retrieve next row from the result set.
+ *
+ *        For synchronous nodes, returns a cleared tuple slot to indicate EOF.
+ *
+ *        For asynchronous nodes, a cleared tuple slot has two meanings.
+ *        When the caller receives a cleared tuple slot, asyncstate indicates
+ *        whether the node has reached EOF (AS_AVAILABLE) or is waiting for
+ *        data to come (AS_WAITING).
  */
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
-    /*
-     * If this is the first call after Begin or ReScan, we need to create the
-     * cursor on the remote side.
-     */
-    if (!fsstate->cursor_exists)
-        create_cursor(node);
+    if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached)
+    {
+        /* we've run out, get some more tuples */
+        if (!node->fs_async)
+        {
+            /* finish running query to send my command */
+            if (!fsstate->s.connpriv->busy)
+                vacate_connection((PgFdwState *)fsstate, false);
+
+            request_more_data(node);
+
+            /*
+             * Fetch the result immediately. This executes the next waiter if
+             * any.
+             */
+            fetch_received_data(node);
+        }
+        else if (!fsstate->s.connpriv->busy)
+        {
+            /* If the connection is not busy, just send the request. */
+            request_more_data(node);
+        }
+        else
+        {
+            /* This connection is busy */
+            bool available = true;
+            ForeignScanState *leader = fsstate->s.connpriv->leader;
+            PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+
+            /* Check if the result is immediately available */
+            if (PQisBusy(leader_state->s.conn))
+            {
+                int rc = WaitLatchOrSocket(NULL,
+                                           WL_SOCKET_READABLE | WL_TIMEOUT |
+                                           WL_EXIT_ON_PM_DEATH,
+                                           PQsocket(leader_state->s.conn), 0,
+                                           WAIT_EVENT_ASYNC_WAIT);
+                if (!(rc & WL_SOCKET_READABLE))
+                    available = false;
+            }
+
+            /* The next waiter is executed automatically */
+            if (available)
+                fetch_received_data(leader);
+
+            /* add the requested node */
+            add_async_waiter(node);
+
+            /* add the previous leader */
+            add_async_waiter(leader);
+        }
+    }
 
     /*
-     * Get some more tuples, if we've run out.
+     * If we haven't received a result for the given node this time,
+     * return with no tuple to give way to another node.
      */
     if (fsstate->next_tuple >= fsstate->num_tuples)
     {
-        /* No point in another fetch if we already detected EOF, though. */
-        if (!fsstate->eof_reached)
-            fetch_more_data(node);
-        /* If we didn't get any tuples, must be end of data. */
-        if (fsstate->next_tuple >= fsstate->num_tuples)
-            return ExecClearTuple(slot);
+        if (fsstate->eof_reached)
+        {
+            fsstate->result_ready = true;
+            node->ss.ps.asyncstate = AS_AVAILABLE;
+        }
+        else
+        {
+            fsstate->result_ready = false;
+            node->ss.ps.asyncstate = AS_WAITING;
+        }
+            
+        return ExecClearTuple(slot);
     }
 
     /*
      * Return the next tuple.
      */
+    fsstate->result_ready = true;
+    node->ss.ps.asyncstate = AS_AVAILABLE;
     ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
                        slot,
                        false);
@@ -1535,7 +1806,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     char        sql[64];
     PGresult   *res;
 
@@ -1543,6 +1814,8 @@ postgresReScanForeignScan(ForeignScanState *node)
     if (!fsstate->cursor_exists)
         return;
 
+    vacate_connection((PgFdwState *)fsstate, true);
+
     /*
      * If any internal parameters affecting this node have changed, we'd
      * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1571,9 +1844,9 @@ postgresReScanForeignScan(ForeignScanState *node)
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_exec_query(fsstate->conn, sql);
+    res = pgfdw_exec_query(fsstate->s.conn, sql);
     if (PQresultStatus(res) != PGRES_COMMAND_OK)
-        pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+        pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
     PQclear(res);
 
     /* Now force a fresh FETCH. */
@@ -1591,7 +1864,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
     /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
     if (fsstate == NULL)
@@ -1599,15 +1872,31 @@ postgresEndForeignScan(ForeignScanState *node)
 
     /* Close the cursor if open, to prevent accumulation of cursors */
     if (fsstate->cursor_exists)
-        close_cursor(fsstate->conn, fsstate->cursor_number);
+        close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
     /* Release remote connection */
-    ReleaseConnection(fsstate->conn);
-    fsstate->conn = NULL;
+    ReleaseConnection(fsstate->s.conn);
+    fsstate->s.conn = NULL;
 
     /* MemoryContexts will be deleted automatically. */
 }
 
+/*
+ * postgresShutdownForeignScan
+ *        Remove asynchrony stuff and clean up garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+    ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+    if (plan->operation != CMD_SELECT)
+        return;
+
+    /* remove the node from waiting queue */
+    remove_async_node(node);
+}
+
 /*
  * postgresAddForeignUpdateTargets
  *        Add resjunk column(s) needed for update/delete on a foreign table
@@ -2372,7 +2661,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
      * Get connection to the foreign server.  Connection manager will
      * establish new connection if necessary.
      */
-    dmstate->conn = GetConnection(user, false);
+    dmstate->s.conn = GetConnection(user, false);
+    dmstate->s.connpriv = (PgFdwConnpriv *)
+        GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
 
     /* Update the foreign-join-related fields. */
     if (fsplan->scan.scanrelid == 0)
@@ -2457,7 +2748,11 @@ postgresIterateDirectModify(ForeignScanState *node)
      * If this is the first call after Begin, execute the statement.
      */
     if (dmstate->num_tuples == -1)
+    {
+        /* finish running query to send my command */
+        vacate_connection((PgFdwState *)dmstate, true);
         execute_dml_stmt(node);
+    }
 
     /*
      * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2504,8 +2799,8 @@ postgresEndDirectModify(ForeignScanState *node)
         PQclear(dmstate->result);
 
     /* Release remote connection */
-    ReleaseConnection(dmstate->conn);
-    dmstate->conn = NULL;
+    ReleaseConnection(dmstate->s.conn);
+    dmstate->s.conn = NULL;
 
     /* MemoryContext will be deleted automatically. */
 }
@@ -2703,6 +2998,7 @@ estimate_path_cost_size(PlannerInfo *root,
         List       *local_param_join_conds;
         StringInfoData sql;
         PGconn       *conn;
+        PgFdwConnpriv *connpriv;
         Selectivity local_sel;
         QualCost    local_cost;
         List       *fdw_scan_tlist = NIL;
@@ -2747,6 +3043,18 @@ estimate_path_cost_size(PlannerInfo *root,
 
         /* Get the remote estimate */
         conn = GetConnection(fpinfo->user, false);
+        connpriv = GetConnectionSpecificStorage(fpinfo->user,
+                                                sizeof(PgFdwConnpriv));
+        if (connpriv)
+        {
+            PgFdwState tmpstate;
+            tmpstate.conn = conn;
+            tmpstate.connpriv = connpriv;
+
+            /* finish running query to send my command */
+            vacate_connection(&tmpstate, true);
+        }
+
         get_remote_estimate(sql.data, conn, &rows, &width,
                             &startup_cost, &total_cost);
         ReleaseConnection(conn);
@@ -3317,11 +3625,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     ExprContext *econtext = node->ss.ps.ps_ExprContext;
     int            numParams = fsstate->numParams;
     const char **values = fsstate->param_values;
-    PGconn       *conn = fsstate->conn;
+    PGconn       *conn = fsstate->s.conn;
     StringInfoData buf;
     PGresult   *res;
 
@@ -3384,50 +3692,127 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Sends the next request of the node. If the given node is different from the
+ * current connection leader, pushes the leader back to the waiter queue and
+ * lets the given node be the leader.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
 {
-    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
+    ForeignScanState *leader = fsstate->s.connpriv->leader;
+    PGconn       *conn = fsstate->s.conn;
+    char        sql[64];
+
+    /* must be non-busy */
+    Assert(!fsstate->s.connpriv->busy);
+    /* must be not-eof */
+    Assert(!fsstate->eof_reached);
+
+    /*
+     * If this is the first call after Begin or ReScan, we need to create the
+     * cursor on the remote side.
+     */
+    if (!fsstate->cursor_exists)
+        create_cursor(node);
+
+    snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+             fsstate->fetch_size, fsstate->cursor_number);
+
+    if (!PQsendQuery(conn, sql))
+        pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+    fsstate->s.connpriv->busy = true;
+
+    /* Let the node be the leader if it is different from current one */
+    if (leader != node)
+    {
+        /*
+         * If the connection leader exists, insert the node as the connection
+         * leader making the current leader be the first waiter.
+         */
+        if (leader != NULL)
+        {
+            remove_async_node(node);
+            fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter;
+            fsstate->waiter = leader;
+        }
+        else
+        {
+            fsstate->last_waiter = node;
+            fsstate->waiter = NULL;
+        }
+
+        fsstate->s.connpriv->leader = node;
+    }
+}
+
+/*
+ * Fetches received data and automatically sends the next waiter's request.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
     PGresult   *volatile res = NULL;
     MemoryContext oldcontext;
+    ForeignScanState *waiter;
+
+    /* I should be the current connection leader */
+    Assert(fsstate->s.connpriv->leader == node);
 
     /*
      * We'll store the tuples in the batch_cxt.  First, flush the previous
-     * batch.
+     * batch if no tuple is remaining
      */
-    fsstate->tuples = NULL;
-    MemoryContextReset(fsstate->batch_cxt);
+    if (fsstate->next_tuple >= fsstate->num_tuples)
+    {
+        fsstate->tuples = NULL;
+        fsstate->num_tuples = 0;
+        MemoryContextReset(fsstate->batch_cxt);
+    }
+    else if (fsstate->next_tuple > 0)
+    {
+        /* move the remaining tuples to the beginning of the store */
+        int n = 0;
+
+        while(fsstate->next_tuple < fsstate->num_tuples)
+            fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+        fsstate->num_tuples = n;
+    }
+
     oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
     /* PGresult must be released before leaving this function. */
     PG_TRY();
     {
-        PGconn       *conn = fsstate->conn;
+        PGconn       *conn = fsstate->s.conn;
         char        sql[64];
-        int            numrows;
+        int            addrows;
+        size_t        newsize;
         int            i;
 
         snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                  fsstate->fetch_size, fsstate->cursor_number);
 
-        res = pgfdw_exec_query(conn, sql);
+        res = pgfdw_get_result(conn, sql);
         /* On error, report the original query, not the FETCH. */
         if (PQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
         /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
-        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-        fsstate->num_tuples = numrows;
-        fsstate->next_tuple = 0;
+        addrows = PQntuples(res);
+        newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+        if (fsstate->tuples)
+            fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+        else
+            fsstate->tuples = (HeapTuple *) palloc(newsize);
 
-        for (i = 0; i < numrows; i++)
+        for (i = 0; i < addrows; i++)
         {
             Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-            fsstate->tuples[i] =
+            fsstate->tuples[fsstate->num_tuples + i] =
                 make_tuple_from_result_row(res, i,
                                            fsstate->rel,
                                            fsstate->attinmeta,
@@ -3437,22 +3822,75 @@ fetch_more_data(ForeignScanState *node)
         }
 
         /* Update fetch_ct_2 */
-        if (fsstate->fetch_ct_2 < 2)
+        if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
             fsstate->fetch_ct_2++;
 
+        fsstate->next_tuple = 0;
+        fsstate->num_tuples += addrows;
+
         /* Must be EOF if we didn't get as many tuples as we asked for. */
-        fsstate->eof_reached = (numrows < fsstate->fetch_size);
+        fsstate->eof_reached = (addrows < fsstate->fetch_size);
+
+        PQclear(res);
+        res = NULL;
     }
     PG_FINALLY();
     {
+        fsstate->s.connpriv->busy = false;
+
         if (res)
             PQclear(res);
     }
     PG_END_TRY();
 
+    fsstate->s.connpriv->busy = false;
+
+    /* let the first waiter be the next leader of this connection */
+    waiter = move_to_next_waiter(node);
+
+    /* send the next request if any */
+    if (waiter)
+        request_more_data(waiter);
+
     MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate, bool clear_queue)
+{
+    PgFdwConnpriv *connpriv = fdwstate->connpriv;
+    ForeignScanState *leader;
+
+    /* the connection is already available */
+    if (connpriv == NULL || connpriv->leader == NULL || !connpriv->busy)
+        return;
+
+    /*
+     * let the current connection leader read the result for the running query
+     */
+    leader = connpriv->leader;
+    fetch_received_data(leader);
+
+    /* let the first waiter be the next leader of this connection */
+    move_to_next_waiter(leader);
+
+    if (!clear_queue)
+        return;
+
+    /* Clear the waiting list */
+    while (leader)
+    {
+        PgFdwScanState *fsstate = GetPgFdwScanState(leader);
+
+        fsstate->last_waiter = NULL;
+        leader = fsstate->waiter;
+        fsstate->waiter = NULL;
+    }
+}
+
 /*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
@@ -3566,7 +4004,9 @@ create_foreign_modify(EState *estate,
     user = GetUserMapping(userid, table->serverid);
 
     /* Open connection; report that we'll create a prepared statement. */
-    fmstate->conn = GetConnection(user, true);
+    fmstate->s.conn = GetConnection(user, true);
+    fmstate->s.connpriv = (PgFdwConnpriv *)
+        GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
     fmstate->p_name = NULL;        /* prepared statement not made yet */
 
     /* Set up remote query information. */
@@ -3653,6 +4093,9 @@ execute_foreign_modify(EState *estate,
            operation == CMD_UPDATE ||
            operation == CMD_DELETE);
 
+    /* finish running query to send my command */
+    vacate_connection((PgFdwState *)fmstate, true);
+
     /* Set up the prepared statement on the remote server, if we didn't yet */
     if (!fmstate->p_name)
         prepare_foreign_modify(fmstate);
@@ -3680,14 +4123,14 @@ execute_foreign_modify(EState *estate,
     /*
      * Execute the prepared statement.
      */
-    if (!PQsendQueryPrepared(fmstate->conn,
+    if (!PQsendQueryPrepared(fmstate->s.conn,
                              fmstate->p_name,
                              fmstate->p_nums,
                              p_values,
                              NULL,
                              NULL,
                              0))
-        pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+        pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -3695,10 +4138,10 @@ execute_foreign_modify(EState *estate,
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_get_result(fmstate->conn, fmstate->query);
+    res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
     if (PQresultStatus(res) !=
         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+        pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
     /* Check number of rows affected, and fetch RETURNING tuple if any */
     if (fmstate->has_returning)
@@ -3734,7 +4177,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
     /* Construct name we'll use for the prepared statement. */
     snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
-             GetPrepStmtNumber(fmstate->conn));
+             GetPrepStmtNumber(fmstate->s.conn));
     p_name = pstrdup(prep_name);
 
     /*
@@ -3744,12 +4187,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
      * the prepared statements we use in this module are simple enough that
      * the remote server will make the right choices.
      */
-    if (!PQsendPrepare(fmstate->conn,
+    if (!PQsendPrepare(fmstate->s.conn,
                        p_name,
                        fmstate->query,
                        0,
                        NULL))
-        pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+        pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -3757,9 +4200,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    res = pgfdw_get_result(fmstate->conn, fmstate->query);
+    res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
     if (PQresultStatus(res) != PGRES_COMMAND_OK)
-        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+        pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
     PQclear(res);
 
     /* This action shows that the prepare has been done. */
@@ -3888,16 +4331,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
          * We don't use a PG_TRY block here, so be careful not to throw error
          * without releasing the PGresult.
          */
-        res = pgfdw_exec_query(fmstate->conn, sql);
+        res = pgfdw_exec_query(fmstate->s.conn, sql);
         if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+            pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
         PQclear(res);
         fmstate->p_name = NULL;
     }
 
     /* Release remote connection */
-    ReleaseConnection(fmstate->conn);
-    fmstate->conn = NULL;
+    ReleaseConnection(fmstate->s.conn);
+    fmstate->s.conn = NULL;
 }
 
 /*
@@ -4056,9 +4499,9 @@ execute_dml_stmt(ForeignScanState *node)
      * the desired result.  This allows us to avoid assuming that the remote
      * server has the same OIDs we do for the parameters' types.
      */
-    if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+    if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
                            NULL, values, NULL, NULL, 0))
-        pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+        pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
     /*
      * Get the result, and check for success.
@@ -4066,10 +4509,10 @@ execute_dml_stmt(ForeignScanState *node)
      * We don't use a PG_TRY block here, so be careful not to throw error
      * without releasing the PGresult.
      */
-    dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+    dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
     if (PQresultStatus(dmstate->result) !=
         (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-        pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+        pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
                            dmstate->query);
 
     /* Get the number of rows affected. */
@@ -5560,6 +6003,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
     /* XXX Consider parameterized paths for the join relation */
 }
 
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+    return true;
+}
+
+
+/*
+ * Configure waiting event.
+ *
+ * Add a wait event only when the node is the connection leader.  Otherwise
+ * another node on this connection is the leader.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+                                  void *caller_data, bool reinit)
+{
+    PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+    /* If the caller didn't reinit, this event is already in event set */
+    if (!reinit)
+        return true;
+
+    if (fsstate->s.connpriv->leader == node)
+    {
+        AddWaitEventToSet(wes,
+                          WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+                          NULL, caller_data);
+        return true;
+    }
+
+    return false;
+}
+
+
 /*
  * Assess whether the aggregation, grouping and having operations can be pushed
  * down to the foreign server.  As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index ea052872c3..696af73408 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo
     UserMapping *user;            /* only set in use_remote_estimate mode */
 
     int            fetch_size;        /* fetch size for this remote table */
+    bool        allow_prefetch;    /* true to allow overlapped fetching  */
 
     /*
      * Name of the relation, for use while EXPLAINing ForeignScan.  It is used
@@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 1c5c37b783..69c06ac6e4 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1730,25 +1730,25 @@ INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE b SET aa = 'new';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'newtoo';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
@@ -1790,12 +1790,12 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
@@ -1854,8 +1854,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 -- Test that UPDATE/DELETE with inherited target works with row-level triggers
 CREATE TRIGGER trig_row_before
-- 
2.23.0


Re: Append with naive multiplexing of FDWs

From
Bruce Momjian
Date:
On Thu, Dec  5, 2019 at 03:19:50PM -0500, Robert Haas wrote:
> On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
> > I agree with Stephen's request.  We have been waiting for the executor
> > rewrite for a while, so let's just do something simple and see how it
> > performs.
> 
> I'm sympathetic to the frustration here, and I think it would be great
> if we could find a way forward that doesn't involve waiting for a full
> rewrite of the executor.  However, I seem to remember that when we
> tested the various patches that various people had written for this
> feature (I wrote one, too) they all had a noticeable performance
> penalty in the case of a plain old Append that involved no FDWs and
> nothing asynchronous. I don't think it's OK to have, say, a 2%
> regression on every query that involves an Append, because especially
> now that we have partitioning, that's a lot of queries.
> 
> I don't know whether this patch has that kind of problem. If it
> doesn't, I would consider that a promising sign.

Certainly any overhead on normal queries would be unacceptable.

-- 
  Bruce Momjian  <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

From
Kyotaro Horiguchi
Date:
Hello.

I think I can say that this patch doesn't slow down non-AsyncAppend,
non-postgres_fdw scans.


At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <bruce@momjian.us> wrote in 
> Certainly any overhead on normal queries would be unacceptable.

I took performance numbers on the current shape of the async execution
patch for the following scan cases.

t0   : single local table (parallel disabled)
pll  : local partitioning (local Append, parallel disabled)
ft0  : single foreign table
pf0  : inheritance on 4 foreign tables, single connection
pf1  : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connections

The benchmarking system is configured as follows on a single
machine.

          [ benchmark client   ]
           |                  |
    (localhost:5433)    (localhost:5432)
           |                  |
   +----+  |   +------+       |
   |    V  V   V      |       V
   | [master server]  |  [async server]
   |       V          |       V
   +--fdw--+          +--fdw--+


The patch works roughly in the following steps.

1. The planner decides how many children of an Append can run
  asynchronously (such children are called async-capable).

2. During ExecInit, if an Append has no async-capable children,
  ExecAppend, which is exactly the same function as before, is set as
  ExecProcNode. Otherwise ExecAppendAsync is used.

If the infrastructure part of the patch causes any degradation, the
"t0" test (scan on a single local table) and/or the "pll" test (scan
on a local partitioned table) get slower.

3. postgres_fdw always runs the async-capable code path.

If the postgres_fdw part causes degradation, ft0 reflects that.
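
The dispatch described in step 2 can be illustrated with a small standalone sketch. This is not the patch's code: AppendSketch, exec_append_sync, exec_append_async and init_append are hypothetical names, and the real executor structures are far richer. It only shows why an Append without async-capable children should pay nothing per tuple: the function pointer is chosen once at init time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the executor's structures, not PostgreSQL's. */
typedef struct AppendSketch AppendSketch;
typedef const char *(*ExecFn) (AppendSketch *node);

struct AppendSketch
{
    int         nchildren;
    const bool *child_is_async; /* filled in at plan time (step 1) */
    ExecFn      exec;           /* chosen once at init time (step 2) */
};

/* Unchanged synchronous path: plain Appends never reach the async code. */
const char *
exec_append_sync(AppendSketch *node)
{
    (void) node;
    return "sync";
}

/* Asynchronous path, used only when some child is async-capable. */
const char *
exec_append_async(AppendSketch *node)
{
    (void) node;
    return "async";
}

/* Count the children that were marked async-capable. */
int
count_async_children(const AppendSketch *node)
{
    int         n = 0;

    for (int i = 0; i < node->nchildren; i++)
        if (node->child_is_async[i])
            n++;
    return n;
}

/* Pick the per-tuple function once, so no per-tuple branch is needed. */
void
init_append(AppendSketch *node)
{
    assert(node != NULL);
    node->exec = (count_async_children(node) > 0)
        ? exec_append_async
        : exec_append_sync;
}
```

Under this sketch, an Append over three local children ends up with exec_append_sync, while one with at least one async-capable child gets exec_append_async.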


The tables have two integer columns and the query does sum(a) on all tuples.

With the default fetch_size = 100, the numbers are run times in ms.  Each
number is the average of 14 runs.

     master  patched   gain
t0   7325    7130     +2.7%
pll  4558    4484     +1.7%
ft0  3670    3675     -0.1%
pf0  2322    1550    +33.3%
pf1  2367    1475    +37.7%
ptf0 2517    1624    +35.5%
ptf1 2343    1497    +36.2%
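
For readers who want to recompute the gain column: it appears to be the relative reduction in run time versus master. That definition is an assumption (the email does not state it), but it reproduces the rows above to within about 0.1 percentage point. A minimal sketch, with hypothetical helper names:

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of n run times, in milliseconds. */
double
mean_ms(const double *runs, size_t n)
{
    double      sum = 0.0;

    for (size_t i = 0; i < n; i++)
        sum += runs[i];
    return (n > 0) ? sum / (double) n : 0.0;
}

/*
 * Gain in percent, assumed here to be the relative reduction in run time
 * versus master.  Positive means the patched server was faster.
 */
double
gain_percent(double master_ms, double patched_ms)
{
    assert(master_ms > 0.0);
    return 100.0 * (master_ms - patched_ms) / master_ms;
}
```

For example, gain_percent(2322, 1550) gives roughly 33.2 for the pf0 row, and gain_percent(3670, 3675) is slightly negative, matching the sign of the ft0 row.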

With a larger fetch_size (200) the gain mysteriously decreases for
the cases sharing a single connection (pf0, ptf0), but the others don't
seem to change much.

     master  patched   gain
t0   7212    7252     -0.6%
pll  4546    4397     +3.3%
ft0  3712    3731     -0.5%
pf0  2131    1570    +26.4%
pf1  1926    1189    +38.3%
ptf0 2001    1557    +22.2%
ptf1 1903    1193    +37.4%

FWIW, attached are the test scripts.

gentblr2.sql: Table creation script.
testrun.sh  : Benchmarking script.


regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
SELECT :scale  * 0 as th0,
       :scale  * 1 as th1,
       :scale  * 2 as th2,
       :scale  * 3 as th3,
       :scale  * 4 as th4,
       :scale  * 10 as th10,
       :scale  * 20 as th20,
       :scale  * 30 as th30,
       :scale  * 40 as th40,
       :scale  * 100 as th100,
       :scale  * 1000 as th1000
\gset

DROP TABLE IF EXISTS t0 CASCADE;
DROP TABLE IF EXISTS pl CASCADE;
DROP TABLE IF EXISTS pll CASCADE;
DROP TABLE IF EXISTS pf0 CASCADE;
DROP TABLE IF EXISTS pf1 CASCADE;
DROP TABLE IF EXISTS ptf0;
DROP TABLE IF EXISTS ptf1;

CREATE TABLE pl (a int, b int);
CREATE TABLE cl1 (LIKE pl) INHERITS (pl);
CREATE TABLE cl2 (LIKE pl) INHERITS (pl);
CREATE TABLE cl3 (LIKE pl) INHERITS (pl);
CREATE TABLE cl4 (LIKE pl) INHERITS (pl);
INSERT INTO cl1 (SELECT a, a FROM generate_series(:th0, :th1 - 1) a);
INSERT INTO cl2 (SELECT a, a FROM generate_series(:th1, :th2 - 1) a);
INSERT INTO cl3 (SELECT a, a FROM generate_series(:th2, :th3 - 1) a);
INSERT INTO cl4 (SELECT a, a FROM generate_series(:th3, :th4 - 1) a);

CREATE TABLE pll (a int, b int);
CREATE TABLE cll1 (LIKE pl) INHERITS (pll);
CREATE TABLE cll2 (LIKE pl) INHERITS (pll);
CREATE TABLE cll3 (LIKE pl) INHERITS (pll);
CREATE TABLE cll4 (LIKE pl) INHERITS (pll);
INSERT INTO cll1 (SELECT a, a FROM generate_series(:th0, :th10 - 1) a);
INSERT INTO cll2 (SELECT a, a FROM generate_series(:th10, :th20 - 1) a);
INSERT INTO cll3 (SELECT a, a FROM generate_series(:th20, :th30 - 1) a);
INSERT INTO cll4 (SELECT a, a FROM generate_series(:th30, :th40 - 1) a);


CREATE TABLE t0  (LIKE pl);
INSERT INTO t0 (SELECT a, a FROM generate_series(0, :th100 - 1) a);

DROP SERVER IF EXISTS svl CASCADE;
DROP SERVER IF EXISTS sv0 CASCADE;
DROP SERVER IF EXISTS sv1 CASCADE;
DROP SERVER IF EXISTS sv2 CASCADE;
DROP SERVER IF EXISTS sv3 CASCADE;
DROP SERVER IF EXISTS sv4 CASCADE;

CREATE SERVER svl FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv0 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv4 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');

CREATE USER MAPPING FOR public SERVER svl;
CREATE USER MAPPING FOR public SERVER sv0;
CREATE USER MAPPING FOR public SERVER sv1;
CREATE USER MAPPING FOR public SERVER sv2;
CREATE USER MAPPING FOR public SERVER sv3;
CREATE USER MAPPING FOR public SERVER sv4;

CREATE FOREIGN TABLE ft0 (a int, b int) SERVER svl OPTIONS (table_name 't0');

CREATE FOREIGN TABLE ft10 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ft20 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ft30 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ft40 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl4');
CREATE FOREIGN TABLE ft11 (a int, b int) SERVER sv1 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ft22 (a int, b int) SERVER sv2 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ft33 (a int, b int) SERVER sv3 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ft44 (a int, b int) SERVER sv4 OPTIONS (table_name 'cl4');

CREATE TABLE pf0 (LIKE pl);
ALTER FOREIGN TABLE ft10 INHERIT pf0;
ALTER FOREIGN TABLE ft20 INHERIT pf0;
ALTER FOREIGN TABLE ft30 INHERIT pf0;
ALTER FOREIGN TABLE ft40 INHERIT pf0;

CREATE TABLE pf1 (LIKE pl);
ALTER FOREIGN TABLE ft11 INHERIT pf1;
ALTER FOREIGN TABLE ft22 INHERIT pf1;
ALTER FOREIGN TABLE ft33 INHERIT pf1;
ALTER FOREIGN TABLE ft44 INHERIT pf1;

CREATE FOREIGN TABLE ftp10 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ftp20 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ftp30 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ftp40 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl4');
CREATE FOREIGN TABLE ftp11 (a int, b int) SERVER sv1 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ftp22 (a int, b int) SERVER sv2 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ftp33 (a int, b int) SERVER sv3 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ftp44 (a int, b int) SERVER sv4 OPTIONS (table_name 'cl4');

CREATE TABLE ptf0 (a int, b int) PARTITION BY RANGE (a);
ALTER TABLE ptf0 ATTACH PARTITION ftp10 FOR VALUES FROM (:th0) TO (:th1);
ALTER TABLE ptf0 ATTACH PARTITION ftp20 FOR VALUES FROM (:th1) TO (:th2);
ALTER TABLE ptf0 ATTACH PARTITION ftp30 FOR VALUES FROM (:th2) TO (:th3);
ALTER TABLE ptf0 ATTACH PARTITION ftp40 FOR VALUES FROM (:th3) TO (:th4);

CREATE TABLE ptf1 (a int, b int) PARTITION BY RANGE (a);
ALTER TABLE ptf1 ATTACH PARTITION ftp11 FOR VALUES FROM (:th0) TO (:th1);
ALTER TABLE ptf1 ATTACH PARTITION ftp22 FOR VALUES FROM (:th1) TO (:th2);
ALTER TABLE ptf1 ATTACH PARTITION ftp33 FOR VALUES FROM (:th2) TO (:th3);
ALTER TABLE ptf1 ATTACH PARTITION ftp44 FOR VALUES FROM (:th3) TO (:th4);

ANALYZE;
#! /bin/bash

function do_test() {
    echo $1
    for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement = 0; set client_min_messages=log; explain analyze select sum(a) from $1"; done | grep LOG
}

function do_test_union1() {
    echo "UNION_pf0"
    for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement = 0; set client_min_messages=log; SELECT sum(a) FROM (SELECT a FROM ft10 UNION ALL SELECT a FROM ft20 UNION ALL SELECT a FROM ft30 UNION ALL SELECT a FROM ft40) as pf0"; done | grep LOG
}

function do_test_union2() {
    echo "UNION_pf1"
    for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement = 0; set client_min_messages=log; SELECT sum(a) FROM (SELECT a FROM ft11 UNION ALL SELECT a FROM ft22 UNION ALL SELECT a FROM ft33 UNION ALL SELECT a FROM ft44) as pf1"; done | grep LOG
}

function warmup() {
    for i in $(seq 1 5); do psql postgres -c "set log_min_duration_statement = -1; select sum(a) from $1"; done 1>&2 > /dev/null
}

#for t in "t0" "pll";
#for t in "ft0" "pf0" "pf1" "ptf0"  "ptf1";
#for t in "pf0" "ptf0";
for t in "t0" "pll" "ft0" "pf0" "pf1" "ptf0"  "ptf1";
  do
   warmup $t
   do_test $t
  done
exit
for t in "ft0" "pf0" "pf1" "ptf0"  "ptf1";
do
  warmup $t
  do_test $t
done
#do_test_union1
#do_test_union2

Re: Re: Append with naive multiplexing of FDWs

From
"movead.li@highgo.ca"
Date:
Hello

I have tested the patch with a partitioned table with several foreign
partitions living on separate data nodes. The initial testing was done
with a partitioned table having 3 foreign partitions, and the test was
run with a variety of scale factors. The second test was with fixed data
per data node, but the number of data nodes was increased incrementally
to see the performance impact as more nodes are added to the cluster.
The third test is similar to the initial test but with much larger data
and 4 nodes.

A summary of the results is given below, and the test script is attached:

Test ENV
Parent node:2Core 8G
Child Nodes:2Core 4G


Test one:

1.1 The partition structure is as below:

       [ ptf:(a int, b int, c varchar) ]
                (Parent node)
          |           |           |
       [ptf1]      [ptf2]      [ptf3]
       (Node1)     (Node2)     (Node3)

The table data is partitioned across the nodes. The test is done using a
simple select query and a count aggregate, as shown below. Each result
is an average over multiple executions of the query, to ensure reliable
and consistent results.

① select * from ptf where b = 100;
② select count(*) from ptf;

1.2. Test Results

 For ① result:
       scalepernode    master    patched    performance
            2G             7s        2s        350%
            5G           173s       63s        275%
           10G           462s      156s        296%
           20G           968s      327s        296%
           30G          1472s      494s        297%

 For ② result:
       scalepernode    master    patched    performance
            2G          1079s      291s        370%
            5G          2688s      741s        362%
           10G          4473s     1493s        299%

Testing the aggregate takes too long, so that test was done with
smaller data sizes.


1.3. Summary

With the table partitioned over 3 nodes, the patched server is on average
roughly 3 times as fast as master across the scale factors tested (the
"performance" column is master time divided by patched time).


Test Two
2.1 The partition structure is as below:

       [ ptf:(a int, b int, c varchar) ]
                (Parent node)
          |           |           |
       [ptf1]        ...        [ptfN]
       (Node1)      (...)       (NodeN)

① select * from ptf;
② select * from ptf where b = 100;

This test is done with the same size of data per node, but the table is
partitioned across N nodes. Each variation (master or patched) is tested
at least 3 times to get reliable and consistent results. The purpose of the
test is to see the impact on performance as the number of data nodes is
increased.

2.2 The results

For ① result (scalepernode=2G):
    nodenumber    master    patched    performance
        2           432s      180s        240%
        3           636s      223s        285%
        4           830s      283s        293%
        5          1065s      361s        295%
For ② result (scalepernode=10G):
    nodenumber    master    patched    performance
        2           281s      140s        201%
        3           421s      140s        300%
        4           562s      141s        398%
        5           702s      141s        497%
        6           833s      139s        599%
        7           986s      141s        699%
        8          1125s      140s        803%
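
The ② numbers above fit a very simple model: master's time grows by about 140s per node while the patched time stays near 140s. The sketch below is only a toy model under stated assumptions (remote execution dominates, parent-side processing is negligible because the query returns few rows, and every node takes the same time); the function names are hypothetical and nothing here is taken from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Master-like model: the parent waits for each node in turn. */
double
serial_time(const double *node_secs, size_t n)
{
    double      sum = 0.0;

    for (size_t i = 0; i < n; i++)
        sum += node_secs[i];
    return sum;
}

/* Patched-like model: all nodes work at once; the slowest one dominates. */
double
overlapped_time(const double *node_secs, size_t n)
{
    double      max = 0.0;

    for (size_t i = 0; i < n; i++)
        if (node_secs[i] > max)
            max = node_secs[i];
    return max;
}

/* The "performance" column: master time over patched time, in percent. */
double
speedup_percent(double master_secs, double patched_secs)
{
    assert(patched_secs > 0.0);
    return 100.0 * master_secs / patched_secs;
}
```

With eight nodes at 140s each, the model gives 1120s serial and 140s overlapped, i.e. 800%, close to the measured 1125s, 140s and 803%. Query ① returns every row, so the parent's own processing is no longer negligible, which may be why its ratio levels off near 300% instead of growing with the node count.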


Test Three

This test is similar to Test One but with much larger data and
4 nodes.

For ① result:
    scalepernode    master    patched    performance
        100G         6592s      1649s        399%
For ② result:
    scalepernode    master    patched    performance
        100G        35383s     12363s        286%

The results show that the patch also works well with much larger data.


Summary
The patch is pretty good; it works well when little data is sent back to
the parent node. The patch doesn't provide parallel FDW scan: it ensures
that child nodes can send data to the parent in parallel, but the parent
can only process the data from the data nodes sequentially.

Provided there is no performance degradation for non-FDW Append queries,
I would recommend considering this patch as an interim solution while we
are waiting for parallel FDW scan.



Highgo Software (Canada/China/Pakistan)
URL : www.highgo.ca
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca
Attachments

Re: Append with naive multiplexing of FDWs

From
Ahsan Hadi
Date:
Hi Hackers,

Sharing the email below from Movead Li. I believe he wanted to share the benchmarking results as a response to this email thread, but it started a new thread. Here it is...

"
Hello

I have tested the patch with a partitioned table with several foreign
partitions living on separate data nodes. The initial testing was done
with a partitioned table having 3 foreign partitions, and the test was
run with a variety of scale factors. The second test was with fixed data
per data node, but the number of data nodes was increased incrementally
to see the performance impact as more nodes are added to the cluster.
The third test is similar to the initial test but with much larger data
and 4 nodes.

A summary of the results is given below, and the test script is attached:

Test ENV
Parent node: 2 cores, 8G
Child nodes: 2 cores, 4G


Test one:

1.1 The partition structure is as below:

 [ ptf:(a int, b int, c varchar)]
    (Parent node)
        |             |             |
    [ptf1]      [ptf2]      [ptf3]
 (Node1)   (Node2)    (Node3)

The table data is partitioned across nodes, the test is done using a
simple select query and a count aggregate as shown below. The result
is an average of executing each query multiple times to ensure reliable
and consistent results.

①select * from ptf where b = 100;
②select count(*) from ptf;

1.2. Test Results

 For ① result:
     scalepernode    master    patched    performance
     2G              7s        2s         350%
     5G              173s      63s        275%
     10G             462s      156s       296%
     20G             968s      327s       296%
     30G             1472s     494s       297%

 For ② result:
     scalepernode    master    patched    performance
     2G              1079s     291s       370%
     5G              2688s     741s       362%
     10G             4473s     1493s      299%

Testing an aggregate takes too long, so that test was done with
smaller data sizes.


1.3. summary

With the table partitioned over 3 nodes, the average performance gain
across a variety of scale factors is almost 300%.


Test Two
2.1 The partition structure is as below:

 [ ptf:(a int, b int, c varchar)]
    (Parent node)
        |             |             |
    [ptf1]         ...      [ptfN]
 (Node1)      (...)    (NodeN)

①select * from ptf;
②select * from ptf where b = 100;

This test is done with the same size of data per node, but the table is
partitioned across N nodes. Each variation (master or patched) is tested
at least 3 times to get reliable and consistent results. The purpose of the
test is to see the impact on performance as the number of data nodes is
increased.

2.2 The results

For ① result (scalepernode=2G):
    nodenumber    master    patched    performance
    2             432s      180s       240%
    3             636s      223s       285%
    4             830s      283s       293%
    5             1065s     361s       295%
For ② result (scalepernode=10G):
    nodenumber    master    patched    performance
    2             281s      140s       201%
    3             421s      140s       300%
    4             562s      141s       398%
    5             702s      141s       497%
    6             833s      139s       599%
    7             986s      141s       699%
    8             1125s     140s       803%


Test Three

This test is similar to test one, but with much larger data and
4 nodes.

For ① result:
    scalepernode    master    patched    performance
    100G            6592s     1649s      399%
For ② result:
    scalepernode    master    patched    performance
    100G            35383s    12363s     286%
The results show that it works well with much larger data.


Summary
The patch is pretty good, it works well when little data is sent back to
the parent node. The patch doesn’t provide parallel FDW scan; it ensures
that child nodes can send data to the parent in parallel, but the parent
can only process the data from the data nodes sequentially.

Provided there is no performance degradation for non-FDW append queries,
I would recommend considering this patch as an interim solution while we
are waiting for parallel FDW scan.
"



On Thu, Dec 12, 2019 at 5:41 PM Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
Hello.

I think I can say that this patch doesn't slow non-AsyncAppend,
non-postgres_fdw scans.


At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <bruce@momjian.us> wrote in
> Certainly any overhead on normal queries would be unacceptable.

I took performance numbers on the current shape of the async execution
patch for the following scan cases.

t0   : single local table (parallel disabled)
pll  : local partitioning (local Append, parallel disabled)
ft0  : single foreign table
pf0  : inheritance on 4 foreign tables, single connection
pf1  : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connections

The benchmarking system is configured as follows on a single
machine.

          [ benchmark client   ]
           |                  |
    (localhost:5433)    (localhost:5432)
           |                  |
   +----+  |   +------+       |
   |    V  V   V      |       V
   | [master server]  |  [async server]
   |       V          |       V
   +--fdw--+          +--fdw--+


The patch works roughly in the following steps.

1. Planner decides how many children out of an append can run
  asynchronously (called async-capable).

2. During ExecInit, if an Append doesn't have any async-capable
  children, ExecAppend, which is exactly the same function as before,
  is set as ExecProcNode. Otherwise ExecAppendAsync is used.

If the infrastructure part in the patch causes any degradation, the
"t0" (scan on a single local table) and/or "pll" (scan on a local
partitioned table) tests get slower.

3. postgres_fdw always runs the async-capable code path.

If the postgres_fdw part causes degradation, ft0 reflects that.


The tables have two integer columns and the query does sum(a) on all
tuples.

With the default fetch_size = 100; numbers are run times in ms.  Each
number is the average of 14 runs.

     master  patched   gain
t0   7325    7130     +2.7%
pll  4558    4484     +1.7%
ft0  3670    3675     -0.1%
pf0  2322    1550    +33.3%
pf1  2367    1475    +37.7%
ptf0 2517    1624    +35.5%
ptf1 2343    1497    +36.2%

With a larger fetch_size (200) the gain mysteriously decreases for the
cases sharing a single connection (pf0, ptf0), but the others don't seem
to change much.

     master  patched   gain
t0   7212    7252     -0.6%
pll  4546    4397     +3.3%
ft0  3712    3731     -0.5%
pf0  2131    1570    +26.4%
pf1  1926    1189    +38.3%
ptf0 2001    1557    +22.2%
ptf1 1903    1193    +37.4%
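
The gain column in the two tables above appears to be the relative
reduction in run time, (master - patched) / master, which is a different
metric from the master / patched ratio reported as "performance" in the
tests quoted above.  A minimal sketch of both follows, with function names
chosen here only for illustration; recomputed values can differ from the
tables in the last digit, presumably because the inputs are rounded
averages.

```python
def gain_percent(master, patched):
    """Relative run-time reduction, as a percentage of the master run time."""
    return (master - patched) / master * 100.0


def performance_percent(master, patched):
    """Ratio master/patched as a percentage (200% means twice as fast)."""
    return master / patched * 100.0


if __name__ == "__main__":
    # (label, master, patched) taken from the tables in this thread.
    for label, master, patched in [("pf1", 2367, 1475), ("2 nodes", 281, 140)]:
        print(label,
              "gain: %.1f%%" % gain_percent(master, patched),
              "performance: %.0f%%" % performance_percent(master, patched))
```

The two metrics are related by gain = 1 - 1/performance, so a "300%"
performance figure corresponds to a gain of roughly 67%.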

FWIW, the test scripts are attached.

gentblr2.sql: Table creation script.
testrun.sh  : Benchmarking script.


regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments

Re: Append with naive multiplexing of FDWs

From
Bruce Momjian
Date:
On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> Summary
> The patch is pretty good, it works well when little data is sent back to
> the parent node. The patch doesn’t provide parallel FDW scan; it ensures
> that child nodes can send data to the parent in parallel, but the parent
> can only process the data from the data nodes sequentially.
> 
> Provided there is no performance degradation for non-FDW append queries,
> I would recommend considering this patch as an interim solution while we
> are waiting for parallel FDW scan.

Wow, these are very impressive results!

-- 
  Bruce Momjian  <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

From
Kyotaro Horiguchi
Date:
Thank you very much for the testing of the patch, Ahsan!

At Wed, 15 Jan 2020 15:41:04 -0500, Bruce Momjian <bruce@momjian.us> wrote in 
> On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> > Summary
> > The patch is pretty good, it works well when little data is sent back to
> > the parent node. The patch doesn’t provide parallel FDW scan; it ensures
> > that child nodes can send data to the parent in parallel, but the parent
> > can only process the data from the data nodes sequentially.

"Parallel scan" at the moment means multiple workers fetch unique
blocks from *one* table in an arbitrated manner. In this sense
"parallel FDW scan" means multiple local workers fetch unique bundles
of tuples from *one* foreign table, which means it is running on a
single session.  That doesn't offer an advantage.

If parallel query processing worked in worker-per-table mode,
especially on partitioned tables, maybe the current FDW would work
without much modification. But I believe asynchronous append on
foreign tables in a single process is far more resource-efficient and
moderately faster than parallel append.

> > Provided there is no performance degradation for non-FDW append queries,
> > I would recommend considering this patch as an interim solution while we
> > are waiting for parallel FDW scan.
> 
> Wow, these are very impressive results!

Thanks.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: Append with naive multiplexing of FDWs

From
Thomas Munro
Date:
On Thu, Jan 16, 2020 at 9:41 AM Bruce Momjian <bruce@momjian.us> wrote:
> On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> > Summary
> > The patch is pretty good, it works well when little data is sent back to
> > the parent node. The patch doesn’t provide parallel FDW scan; it ensures
> > that child nodes can send data to the parent in parallel, but the parent
> > can only process the data from the data nodes sequentially.
> >
> > Provided there is no performance degradation for non-FDW append queries,
> > I would recommend considering this patch as an interim solution while we
> > are waiting for parallel FDW scan.
>
> Wow, these are very impressive results!

+1

Thanks Ahsan and Movead.  Could you please confirm which patch set you tested?



Re: Append with naive multiplexing of FDWs

From
Movead Li
Date:
Hello Kyotaro,

>"Parallel scan" at the moment means multiple workers fetch unique
>blocks from *one* table in an arbitrated manner. In this sense
>"parallel FDW scan" means multiple local workers fetch unique bundles
>of tuples from *one* foreign table, which means it is running on a
>single session. That doesn't offer an advantage.

It may not be "parallel FDW scan"; it can be "parallel shards scan",
where the local workers each pick a foreign partition to scan. I once
drew a picture of that, which you can see at the link below.

https://www.highgo.ca/2019/08/22/parallel-foreign-scan-of-postgresql/

I think "parallel shards scan" makes sense in this way.

>If parallel query processing worked in worker-per-table mode,
>especially on partitioned tables, maybe the current FDW would work
>without much of modification. But I believe asynchronous append on
>foreign tables on a single process is far resource-effective and
>moderately faster than parallel append.

As the test results show, the current patch cannot gain more performance
when it returns a huge number of tuples. The "parallel shards scan" method
can work well there, because 'parallel' can make full use of the CPUs while
'asynchronous' can't.



Highgo Software (Canada/China/Pakistan)
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca


Re: Append with naive multiplexing of FDWs

From
Kyotaro Horiguchi
Date:
Thanks!

At Wed, 29 Jan 2020 14:41:07 +0800, Movead Li <movead.li@highgo.ca> wrote in
> >"Parallel scan" at the moment means multiple workers fetch unique
> >blocks from *one* table in an arbitrated manner. In this sense
> >"parallel FDW scan" means multiple local workers fetch unique bundles
> >of tuples from *one* foreign table, which means it is running on a
> >single session.  That doesn't offer an advantage.
>
> It may not be "parallel FDW scan"; it can be "parallel shards scan",
> where the local workers each pick a foreign partition to scan. I once
> drew a picture of that, which you can see at the link below.
>
> https://www.highgo.ca/2019/08/22/parallel-foreign-scan-of-postgresql/
>
> I think "parallel shards scan" makes sense in this way.

It is "asynchronous append on async-capable'd postgres-fdw scans". It
could be called as such in the sense that it is intended to be used
with sharding.

> >If parallel query processing worked in worker-per-table mode,
> >especially on partitioned tables, maybe the current FDW would work
> >without much of modification. But I believe asynchronous append on
> >foreign tables on a single process is far resource-effective and
> >moderately faster than parallel append.
>
> As the test results show, the current patch cannot gain more performance
> when it returns a huge number of tuples. The "parallel shards scan" method
> can work well there, because 'parallel' can make full use of the CPUs while
> 'asynchronous' can't.

Did you look at my benchmarking results upthread?  They show a
significant gain even when gathering a large number of tuples from
multiple servers or even from a single server.  It is because of its
asynchronous nature.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Append with naive multiplexing of FDWs

From
"movead.li@highgo.ca"
Date:
Hello,

>It is "asynchronous append on async-capable'd postgres-fdw scans". It
>could be called as such in the sense that it is intended to be used
>with sharding.
 Yes that's it.

 
>Did you look at my benchmarking results upthread?  They show a
>significant gain even when gathering a large number of tuples from
>multiple servers or even from a single server.  It is because of its
>asynchronous nature.
I mean it gains performance at first, but it hits a bottleneck as the
number of nodes increases.
For example:
   With 2 nodes, it gains 200% performance.
   With 3 nodes, it gains 300% performance.
   However,
   with 4 nodes, it still gains 300% performance.
   With 5 nodes, it still gains 300% performance.
   ...
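
One possible reading of these numbers, offered as an assumption rather
than something measured in this thread: on master the N shards are fetched
one after another, while with the patch the remote scans overlap but the
single parent process still has to consume every tuple itself.  The sketch
below models that; remote_s and local_s are hypothetical per-shard costs
(remote scan time and local processing time, with local_s <= remote_s).

```python
def speedup(n_nodes, remote_s, local_s):
    """Modelled master/patched run-time ratio for n_nodes shards.

    Assumes local processing of a shard overlaps with its own remote scan
    on master, so each shard costs remote_s there.
    """
    master = n_nodes * remote_s                   # shards scanned back to back
    patched = max(remote_s, n_nodes * local_s)    # scans overlap, parent is serial
    return master / patched


if __name__ == "__main__":
    # With remote scans three times as expensive as local processing,
    # the modelled speedup stops growing once n_nodes exceeds that ratio.
    for n in range(2, 6):
        print(n, "nodes:", "%.0f%%" % (speedup(n, 3.0, 1.0) * 100))
```

Under this model the speedup is min(N, remote_s / local_s), so adding
nodes helps only until the parent process becomes the bottleneck.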
    
 
----
Highgo Software (Canada/China/Pakistan)
URL : www.highgo.ca
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca

Re: Append with naive multiplexing of FDWs

From
Etsuro Fujita
Date:
On Sun, Dec 1, 2019 at 4:26 AM Bruce Momjian <bruce@momjian.us> wrote:
> On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
> > On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
> > > On Wed, Sep  4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
> > > > A few years back[1] I experimented with a simple readiness API that
> > > > would allow Append to start emitting tuples from whichever Foreign
> > > > Scan has data available, when working with FDW-based sharding.  I used
> > > > that primarily as a way to test Andres's new WaitEventSet stuff and my
> > > > kqueue implementation of that, but I didn't pursue it seriously
> > > > because I knew we wanted a more ambitious async executor rewrite and
> > > > many people had ideas about that, with schedulers capable of jumping
> > > > all over the tree etc.
> > > >
> > > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> > > > asked why we don't just do this naive thing until we have something
> > > > better.  It's a very localised feature that works only between Append
> > > > and its immediate children.  The patch makes it work for postgres_fdw,
> > > > but it should work for any FDW that can get its hands on a socket.
> > > >
> > > > Here's a quick rebase of that old POC patch, along with a demo.  Since
> > > > 2016, Parallel Append landed, but I didn't have time to think about
> > > > how to integrate with that so I did a quick "sledgehammer" rebase that
> > > > disables itself if parallelism is in the picture.
> > >
> > > Yes, sharding has been waiting on parallel FDW scans.  Would this work
> > > for parallel partition scans if the partitions were FDWs?
> >
> > Yeah, this works for partitions that are FDWs (as shown), but only for
> > Append, not for Parallel Append.  So you'd have parallelism in the
> > sense that your N remote shard servers are all doing stuff at the same
> > time, but it couldn't be in a parallel query on your 'home' server,
> > which is probably good for things that push down aggregation and bring
> > back just a few tuples from each shard, but bad for anything wanting
> > to ship back millions of tuples to chew on locally.  Do you think
> > that'd be useful enough on its own?
>
> Yes, I think so.  There are many data warehouse queries that want to
> return only aggregate values, or filter for a small number of rows.
> Even OLTP queries might return only a few rows from multiple partitions.
> This would allow for a proof-of-concept implementation so we can see how
> realistic this approach is.

+1

Best regards,
Etsuro Fujita



Re: Append with naive multiplexing of FDWs

From
Etsuro Fujita
Date:
On Thu, Dec 5, 2019 at 1:46 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi
> <horikyota.ntt@gmail.com> wrote:
> > There's my pending (somewhat stale) patch, which allows to run local
> > scans while waiting for remote servers.
> >
> > https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp

I think it’s great to execute local scans while waiting for the
results of remote scans, but looking at your patch (the 0002 patch of
your patch set in [1]), I’m not sure that the 0002 patch does it very
efficiently, because it modifies nodeAppend.c so that all the work is
done by a single process.  Rather than doing so, I’m wondering if it
would be better to modify Parallel Append so that some processes
execute remote scans and others execute local scans.  I’m not sure
that we need to have this improvement as well in the first cut of this
feature, though.

> After rereading some threads to remind myself what happened here...
> right, my little patch began life in March 2016[1] when I wanted a
> test case to test Andres's work on WaitEventSets, and your patch set
> started a couple of months later and is vastly more ambitious[2][3].
> It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF
> model.  And I totally agree that there are lots of reasons to want to
> do that (including yielding to other parts of the plan instead of
> waiting for I/O, locks and some parallelism primitives enabling new
> kinds of parallelism), and I'm hoping to help with some small pieces
> of that if I can.
>
> My patch set (rebased upthread) was extremely primitive, with no new
> planner concepts, and added only a very simple new executor node
> method: ExecReady().  Append used that to try to ask its children if
> they'd like some time to warm up.  By default, ExecReady() says "I
> don't know what you're talking about, go away", but FDWs can provide
> an implementation that says "yes, please call me again when this fd is
> ready" or "yes, I am ready, please call ExecProc() now".  It doesn't
> deal with anything more complicated than that, and in particular it
> doesn't work if there are extra planner nodes in between Append and
> the foreign scan.  (It also doesn't mix particularly well with
> parallelism, as mentioned.)
>
> The reason I reposted this unambitious work is because Stephen keeps
> asking me why we don't consider the stupidly simple thing that would
> help with simple foreign partition-based queries today, instead of
> waiting for someone to redesign the entire executor, because that's
> ... really hard.
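
The "please call me again when this fd is ready" idea quoted above can be
illustrated outside PostgreSQL with ordinary sockets.  The following is
only a sketch in Python using the standard selectors module, not the
patch's code: multiplex() and its children argument are hypothetical
names, each socket stands in for one foreign scan's connection, and
children that cannot report readiness are not modelled at all.

```python
import selectors
import socket


def multiplex(children):
    """Yield (name, chunk) from whichever child socket is readable first.

    children maps a name to a connected socket.  A child is dropped once
    it reports end-of-stream (an empty read).
    """
    sel = selectors.DefaultSelector()
    for name, sock in children.items():
        sock.setblocking(False)
        sel.register(sock, selectors.EVENT_READ, data=name)
    remaining = len(children)
    while remaining:
        # Sleep until at least one child has data (or EOF) available.
        for key, _events in sel.select():
            chunk = key.fileobj.recv(4096)
            if chunk:
                yield key.data, chunk
            else:
                sel.unregister(key.fileobj)
                key.fileobj.close()
                remaining -= 1
    sel.close()


if __name__ == "__main__":
    r1, w1 = socket.socketpair()
    r2, w2 = socket.socketpair()
    w2.sendall(b"rows from shard 2")
    w2.close()
    w1.sendall(b"rows from shard 1")
    w1.close()
    for name, chunk in multiplex({"shard1": r1, "shard2": r2}):
        print(name, chunk)
```

The consumer never blocks on one particular child; it handles whichever
connection becomes readable, which is the behaviour Append gets from the
readiness callback described above.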

Yeah, I think your patch is much simpler than Horiguchi-san’s
patch set, which I think is a good thing, considering this would be
rather an interim solution until the executor rewrite is done.  Here
are a few comments that I have for now:

* I know your patch is a POC one, but one concern about it (and
Horiguchi-san's patch set) is concurrent data fetches by multiple
foreign scan nodes using the same connection in the case of
postgres_fdw.  Here is an example causing an error:

create or replace function slow_data_ext(name text, secs float)
returns setof t as
$$
begin
 perform pg_sleep(secs);
 return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;

create view t11 as select * from slow_data_ext('t11', 1.0);
create view t12 as select * from slow_data_ext('t12', 2.0);
create view t13 as select * from slow_data_ext('t13', 3.0);
create foreign table ft11 (a text, b text) server server1 options
(table_name 't11');
create foreign table ft12 (a text, b text) server server2 options
(table_name 't12');
create foreign table ft13 (a text, b text) server server3 options
(table_name 't13');
create table pt1 (a text, b text) partition by list (a);
alter table pt1 attach partition ft11 for values in ('t11');
alter table pt1 attach partition ft12 for values in ('t12');
alter table pt1 attach partition ft13 for values in ('t13');

create view t21 as select * from slow_data_ext('t21', 1.0);
create view t22 as select * from slow_data_ext('t22', 2.0);
create view t23 as select * from slow_data_ext('t23', 3.0);
create foreign table ft21 (a text, b text) server server1 options
(table_name 't21');
create foreign table ft22 (a text, b text) server server2 options
(table_name 't22');
create foreign table ft23 (a text, b text) server server3 options
(table_name 't23');
create table pt2 (a text, b text) partition by list (a);
alter table pt2 attach partition ft21 for values in ('t21');
alter table pt2 attach partition ft22 for values in ('t22');
alter table pt2 attach partition ft23 for values in ('t23');

explain verbose select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
                                                  QUERY PLAN
--------------------------------------------------------------------------------------------------------------
 Nested Loop  (cost=200.00..1303.80 rows=50220 width=128)
   Output: pt1.a, pt1.b, pt2.a, pt2.b
   ->  Append  (cost=100.00..427.65 rows=2790 width=64)
         ->  Foreign Scan on public.ft11 pt1_1  (cost=100.00..137.90 rows=930 width=64)
               Output: pt1_1.a, pt1_1.b
               Remote SQL: SELECT a, b FROM public.t11
         ->  Foreign Scan on public.ft12 pt1_2  (cost=100.00..137.90 rows=930 width=64)
               Output: pt1_2.a, pt1_2.b
               Remote SQL: SELECT a, b FROM public.t12
         ->  Foreign Scan on public.ft13 pt1_3  (cost=100.00..137.90 rows=930 width=64)
               Output: pt1_3.a, pt1_3.b
               Remote SQL: SELECT a, b FROM public.t13
   ->  Materialize  (cost=100.00..248.44 rows=18 width=64)
         Output: pt2.a, pt2.b
         ->  Append  (cost=100.00..248.35 rows=18 width=64)
               ->  Foreign Scan on public.ft22 pt2_1  (cost=100.00..124.13 rows=9 width=64)
                     Output: pt2_1.a, pt2_1.b
                     Remote SQL: SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
               ->  Foreign Scan on public.ft23 pt2_2  (cost=100.00..124.13 rows=9 width=64)
                     Output: pt2_2.a, pt2_2.b
                     Remote SQL: SELECT a, b FROM public.t23 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
(21 rows)

select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
ERROR:  another command is already in progress
CONTEXT:  remote SQL command: DECLARE c4 CURSOR FOR
SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))

I think the cause of this error is that an asynchronous data fetch for
ft22 is blocked by that for ft12 that is in progress.
(Horiguchi-san’s patch set doesn't work for this query either, causing
the same error.  Though, it looks like he intended to handle cases
like this by a queuing system added to postgres_fdw to process such
concurrent data fetches.)  I think a simple solution for this issue
would be to just disable asynchrony optimization for such cases.  I
think we could do so by tracking foreign scan nodes across the entire
final plan tree that use the same connection at plan or execution
time.
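
As a rough illustration of that idea (not code from either patch), one
conservative reading is to collect every foreign scan in the final plan
together with the connection it would use, and to allow asynchrony only
for scans whose connection is used by no other scan.  In the sketch below
async_capable() is a hypothetical helper, and the server name is used as a
stand-in for whatever actually identifies a postgres_fdw connection.

```python
from collections import Counter


def async_capable(scans):
    """Return the scans that do not share their connection with another scan.

    scans is a list of (scan_name, connection_key) pairs covering the
    whole plan tree.
    """
    uses = Counter(conn for _name, conn in scans)
    return [name for name, conn in scans if uses[conn] == 1]


if __name__ == "__main__":
    # Foreign scans from the plan above, keyed by server.
    plan = [("ft11", "server1"), ("ft12", "server2"), ("ft13", "server3"),
            ("ft22", "server2"), ("ft23", "server3")]
    print(async_capable(plan))
```

With this reading only ft11 stays asynchronous for the example query,
since server2 and server3 are each used by two scans; a less conservative
rule could keep one scan per connection asynchronous instead.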

* Another one is “no new planner concepts”.  I think it leads to this:

@@ -239,9 +242,207 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
    /* For parallel query, this will be overridden later. */
    appendstate->choose_next_subplan = choose_next_subplan_locally;

+   /*
+    * Initially we consider all subplans to be potentially asynchronous.
+    */
+   appendstate->asyncplans = (PlanState **) palloc(nplans * sizeof(PlanState *));
+   appendstate->asyncfds = (int *) palloc0(nplans * sizeof(int));
+   appendstate->nasyncplans = nplans;
+   memcpy(appendstate->asyncplans, appendstate->appendplans, nplans * sizeof(PlanState *));
+   appendstate->lastreadyplan = 0;

I’m not sure that this would cause performance degradation in the case
of a plain Append that involves no FDWs supporting asynchrony
optimization, but I think it would be better to determine subplans
with that optimization at plan time and save cycles at execution time,
as done in Horiguchi-san’s patch set.  (I’m not sure that we need a
new ExecAppend function proposed there, though.)  Also, consider this
ordered Append example:

create table t31 (a int check (a >= 10 and a < 20), b text);
create table t32 (a int check (a >= 20 and a < 30), b text);
create table t33 (a int check (a >= 30 and a < 40), b text);
create foreign table ft31 (a int check (a >= 10 and a < 20), b text)
server server1 options (table_name 't31');
create foreign table ft32 (a int check (a >= 20 and a < 30), b text)
server server2 options (table_name 't32');
create foreign table ft33 (a int check (a >= 30 and a < 40), b text)
server server3 options (table_name 't33');
create table pt3 (a int, b text) partition by range (a);
alter table pt3 attach partition ft31 for values from (10) to (20);
alter table pt3 attach partition ft32 for values from (20) to (30);
alter table pt3 attach partition ft33 for values from (30) to (40);

explain verbose select * from pt3 order by a;
                                    QUERY PLAN
-----------------------------------------------------------------------------------
 Append  (cost=300.00..487.52 rows=4095 width=36)
   ->  Foreign Scan on public.ft31 pt3_1  (cost=100.00..155.68 rows=1365 width=36)
         Output: pt3_1.a, pt3_1.b
         Remote SQL: SELECT a, b FROM public.t31 ORDER BY a ASC NULLS LAST
   ->  Foreign Scan on public.ft32 pt3_2  (cost=100.00..155.68 rows=1365 width=36)
         Output: pt3_2.a, pt3_2.b
         Remote SQL: SELECT a, b FROM public.t32 ORDER BY a ASC NULLS LAST
   ->  Foreign Scan on public.ft33 pt3_3  (cost=100.00..155.68 rows=1365 width=36)
         Output: pt3_3.a, pt3_3.b
         Remote SQL: SELECT a, b FROM public.t33 ORDER BY a ASC NULLS LAST
(10 rows)

For this query, we can’t apply asynchrony optimization.  To disable it
for such cases, I think it would be better to do something at plan time
as well, as is done in his patch set.

I haven’t finished reviewing your patch, but before doing so, I’ll
review Horiguchi-san's patch set in more detail for further
comparison.  Attached is a rebased version of your patch, in which I
added the same changes to the postgres_fdw regression tests as
Horiguchi-san so that the tests run successfully.

Thank you for working on this, Thomas and Horiguchi-san!  Sorry for the delay.

Best regards,
Etsuro Fujita

[1] https://www.postgresql.org/message-id/20200820.163608.1893015081639298019.horikyota.ntt%40gmail.com

Attachments

Re: Append with naive multiplexing of FDWs

From
Etsuro Fujita
Date:
On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
> * I know your patch is a POC one, but one concern about it (and
> Horiguchi-san's patch set) is concurrent data fetches by multiple
> foreign scan nodes using the same connection in the case of
> postgres_fdw.  Here is an example causing an error:

> select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
> ERROR:  another command is already in progress
> CONTEXT:  remote SQL command: DECLARE c4 CURSOR FOR
> SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))

> (Horiguchi-san’s patch set doesn't work for this query either, causing
> the same error.  Though, it looks like he intended to handle cases
> like this by a queuing system added to postgres_fdw to process such
> concurrent data fetches.)

I was wrong here; Horiguchi-san's patch set works well for this query.
Maybe I did something wrong when testing his patch set.  Sorry for
that.

Best regards,
Etsuro Fujita



Re: Append with naive multiplexing of FDWs

From
Kyotaro Horiguchi
Date:
Fujita-san, thank you for taking time!

At Mon, 31 Aug 2020 19:10:39 +0900, Etsuro Fujita <etsuro.fujita@gmail.com> wrote in 
> On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
> > * I know your patch is a POC one, but one concern about it (and
> > Horiguchi-san's patch set) is concurrent data fetches by multiple
> > foreign scan nodes using the same connection in the case of
> > postgres_fdw.  Here is an example causing an error:
> 
> > select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
> > ERROR:  another command is already in progress
> > CONTEXT:  remote SQL command: DECLARE c4 CURSOR FOR
> > SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
> 
> > (Horiguchi-san’s patch set doesn't work for this query either, causing
> > the same error.  Though, it looks like he intended to handle cases
> > like this by a queuing system added to postgres_fdw to process such
> > concurrent data fetches.)
> 
> I was wrong here; Horiguchi-san's patch set works well for this query.
> Maybe I did something wrong when testing his patch set.  Sorry for
> that.

Yeah. postgresIterateForeignScan calls vacate_connection() to make the
underlying connection available if a server connection is busy with
another remote query. The mechanism is backed by a waiting queue
(add_async_waiter, move_to_next_waiter, remove_async_node).
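
For readers unfamiliar with the idea, a waiting queue on a shared
connection can be modelled as below.  This is only a toy illustration:
the class and method names are hypothetical and do not mirror the actual
behaviour of vacate_connection() or the queue functions named above.

```python
from collections import deque


class SharedConnection:
    """One remote connection used by several scan nodes, one fetch at a time."""

    def __init__(self):
        self.owner = None        # node whose fetch is currently in progress
        self.waiters = deque()   # nodes waiting for the connection, FIFO

    def request(self, node):
        """Start node's fetch if the connection is idle, else queue it.

        Returns True if the fetch could start immediately.
        """
        if self.owner is None:
            self.owner = node
            return True
        self.waiters.append(node)
        return False

    def finish(self):
        """The current fetch completed; hand over to the next waiter, if any."""
        self.owner = self.waiters.popleft() if self.waiters else None
        return self.owner


if __name__ == "__main__":
    conn = SharedConnection()
    print(conn.request("ft12"))   # connection idle, fetch starts
    print(conn.request("ft22"))   # connection busy, node is queued
    print(conn.finish())          # ft12 done, connection passes to ft22
```

The point of such a queue is that a second scan on a busy connection is
deferred rather than failing with "another command is already in
progress".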

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

Re: Append with naive multiplexing of FDWs

From
Etsuro Fujita
Date:
On Tue, Sep 1, 2020 at 9:45 AM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:
> At Mon, 31 Aug 2020 19:10:39 +0900, Etsuro Fujita <etsuro.fujita@gmail.com> wrote in
> > On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
> > > * I know your patch is a POC one, but one concern about it (and
> > > Horiguchi-san's patch set) is concurrent data fetches by multiple
> > > foreign scan nodes using the same connection in the case of
> > > postgres_fdw.  Here is an example causing an error:
> >
> > > select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
> > > ERROR:  another command is already in progress
> > > CONTEXT:  remote SQL command: DECLARE c4 CURSOR FOR
> > > SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
> >
> > > (Horiguchi-san’s patch set doesn't work for this query either, causing
> > > the same error.  Though, it looks like he intended to handle cases
> > > like this by a queuing system added to postgres_fdw to process such
> > > concurrent data fetches.)
> >
> > I was wrong here; Horiguchi-san's patch set works well for this query.
> > Maybe I did something wrong when testing his patch set.  Sorry for
> > that.
>
> Yeah. postgresIterateForeignScan calls vacate_connection() to make the
> underlying connection available if a server connection is busy with
> another remote query. The mechanism is backed by a waiting queue
> (add_async_waiter, move_to_next_waiter, remove_async_node).

Thanks for the explanation, Horiguchi-san!

So your version of the patch processes the query successfully, because
1) before performing an asynchronous data fetch of ft22, it waits for
the in-progress data fetch of ft12 using the same connection to
complete so that the data fetch of ft22 can be done, and 2) before
performing an asynchronous data fetch of ft23, it waits for the
in-progress data fetch of ft13 using the same connection to complete
so that the data fetch of ft23 can be done.  Right?  If so, I think in
some cases such handling would impact performance negatively.
Consider the same query with LIMIT processed by your version:

explain verbose select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23' limit 1;
                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 Limit  (cost=200.00..200.01 rows=1 width=128)
   Output: pt1.a, pt1.b, pt2.a, pt2.b
   ->  Nested Loop  (cost=200.00..903.87 rows=50220 width=128)
         Output: pt1.a, pt1.b, pt2.a, pt2.b
         ->  Append  (cost=100.00..151.85 rows=2790 width=64)
               Async subplans: 3
               ->  Async Foreign Scan on public.ft11 pt1_1  (cost=100.00..137.90 rows=930 width=64)
                     Output: pt1_1.a, pt1_1.b
                     Remote SQL: SELECT a, b FROM public.t11
               ->  Async Foreign Scan on public.ft12 pt1_2  (cost=100.00..137.90 rows=930 width=64)
                     Output: pt1_2.a, pt1_2.b
                     Remote SQL: SELECT a, b FROM public.t12
               ->  Async Foreign Scan on public.ft13 pt1_3  (cost=100.00..137.90 rows=930 width=64)
                     Output: pt1_3.a, pt1_3.b
                     Remote SQL: SELECT a, b FROM public.t13
         ->  Materialize  (cost=100.00..124.31 rows=18 width=64)
               Output: pt2.a, pt2.b
               ->  Append  (cost=100.00..124.22 rows=18 width=64)
                     Async subplans: 2
                     ->  Async Foreign Scan on public.ft22 pt2_1  (cost=100.00..124.13 rows=9 width=64)
                           Output: pt2_1.a, pt2_1.b
                           Remote SQL: SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
                     ->  Async Foreign Scan on public.ft23 pt2_2  (cost=100.00..124.13 rows=9 width=64)
                           Output: pt2_2.a, pt2_2.b
                           Remote SQL: SELECT a, b FROM public.t23 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
(25 rows)

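I think your version would take longer than HEAD to process this query,
because each scan on pt2 would first have to wait for the in-progress
fetch of the pt1 scan sharing its connection to complete, even though
LIMIT 1 needs only a single row.  When I actually ran the query with
your version, though, it threw an error: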

select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23' limit 1;
ERROR:  another command is already in progress
CONTEXT:  remote SQL command: CLOSE c1
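
To make the performance concern concrete, here is a toy model in C of the
waiting described above: one connection, one scan that owns it, and a
queue of scans waiting for it.  This is only a sketch under my own
assumptions, not code from either patch; ToyScan, ToyConn, toy_request
and toy_vacate are hypothetical names, and "rows_left" simply stands in
for whatever remains of an in-progress fetch.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WAITERS 8

/* Hypothetical stand-in for one foreign scan node. */
typedef struct ToyScan
{
    const char *name;
    int         rows_left;      /* rows still pending in its in-progress fetch */
} ToyScan;

/* Hypothetical stand-in for one connection shared by several scans. */
typedef struct ToyConn
{
    ToyScan    *owner;          /* scan whose fetch is in progress, or NULL */
    ToyScan    *waiters[MAX_WAITERS];
    int         nwaiters;
} ToyConn;

/*
 * Ask for the connection.  Returns 1 if the scan may send its query now,
 * 0 if the connection is busy and the scan was queued instead.
 */
static int
toy_request(ToyConn *conn, ToyScan *scan)
{
    if (conn->owner == NULL || conn->owner == scan)
    {
        conn->owner = scan;
        return 1;
    }
    assert(conn->nwaiters < MAX_WAITERS);
    conn->waiters[conn->nwaiters++] = scan;
    return 0;
}

/*
 * Finish the owner's in-progress fetch and hand the connection to the
 * first waiter, if any.  Returns the number of rows that had to be
 * consumed before the connection became free.
 */
static int
toy_vacate(ToyConn *conn)
{
    int         drained = 0;

    if (conn->owner != NULL)
    {
        drained = conn->owner->rows_left;
        conn->owner->rows_left = 0;
    }

    if (conn->nwaiters > 0)
    {
        conn->owner = conn->waiters[0];
        for (int i = 1; i < conn->nwaiters; i++)
            conn->waiters[i - 1] = conn->waiters[i];
        conn->nwaiters--;
    }
    else
        conn->owner = NULL;

    return drained;
}
```

In this model, if ft12 owns the connection with 100 rows pending and ft22
then requests it, ft22 is queued, and toy_vacate() has to consume all 100
of ft12's rows before ft22 can start.  That is the extra work I am
worried about for the LIMIT 1 query, where only one row is needed.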

Best regards,
Etsuro Fujita