Discussion: Append with naive multiplexing of FDWs
Hello,

A few years back[1] I experimented with a simple readiness API that would allow Append to start emitting tuples from whichever Foreign Scan has data available, when working with FDW-based sharding. I used that primarily as a way to test Andres's new WaitEventSet stuff and my kqueue implementation of that, but I didn't pursue it seriously because I knew we wanted a more ambitious async executor rewrite and many people had ideas about that, with schedulers capable of jumping all over the tree etc.

Anyway, Stephen Frost pinged me off-list to ask about that patch, and asked why we don't just do this naive thing until we have something better. It's a very localised feature that works only between Append and its immediate children. The patch makes it work for postgres_fdw, but it should work for any FDW that can get its hands on a socket.

Here's a quick rebase of that old POC patch, along with a demo. Since 2016, Parallel Append landed, but I didn't have time to think about how to integrate with that so I did a quick "sledgehammer" rebase that disables itself if parallelism is in the picture.
=== demo ===

create table t (a text, b text);

create or replace function slow_data(name text) returns setof t
as $$
begin
  perform pg_sleep(random());
  return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;

create view t1 as select * from slow_data('t1');
create view t2 as select * from slow_data('t2');
create view t3 as select * from slow_data('t3');

create extension postgres_fdw;
create server server1 foreign data wrapper postgres_fdw options (dbname 'postgres');
create server server2 foreign data wrapper postgres_fdw options (dbname 'postgres');
create server server3 foreign data wrapper postgres_fdw options (dbname 'postgres');
create user mapping for current_user server server1;
create user mapping for current_user server server2;
create user mapping for current_user server server3;
create foreign table ft1 (a text, b text) server server1 options (table_name 't1');
create foreign table ft2 (a text, b text) server server2 options (table_name 't2');
create foreign table ft3 (a text, b text) server server3 options (table_name 't3');

-- create three remote shards
create table pt (a text, b text) partition by list (a);
alter table pt attach partition ft1 for values in ('ft1');
alter table pt attach partition ft2 for values in ('ft2');
alter table pt attach partition ft3 for values in ('ft3');

-- see that tuples come back in the order that they're ready
select * from pt where b like '42';

[1] https://www.postgresql.org/message-id/CAEepm%3D1CuAWfxDk%3D%3DjZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA%40mail.gmail.com

-- 
Thomas Munro
https://enterprisedb.com
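The core idea of the message above, picking whichever source has data ready instead of reading the children in a fixed order, can be illustrated outside PostgreSQL with plain poll(2). This is only a minimal sketch and not the posted patch: `first_ready` is a hypothetical helper invented for illustration, and any readable file descriptor (a pipe, for instance) stands in for the libpq sockets that postgres_fdw would expose.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define MAX_SOURCES 16

/*
 * Hypothetical helper (not part of the patch): wait up to timeout_ms for
 * any of the nfds descriptors to become readable.  Returns the index of
 * the first readable descriptor, or -1 on timeout or error.
 */
static int
first_ready(const int *fds, int nfds, int timeout_ms)
{
    struct pollfd pfds[MAX_SOURCES];
    int         i;

    assert(nfds > 0 && nfds <= MAX_SOURCES);

    for (i = 0; i < nfds; i++)
    {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }

    /* poll() returns 0 on timeout and -1 on error */
    if (poll(pfds, (nfds_t) nfds, timeout_ms) <= 0)
        return -1;

    for (i = 0; i < nfds; i++)
    {
        /* treat hang-up as "ready" so the caller can observe EOF */
        if (pfds[i].revents & (POLLIN | POLLHUP))
            return i;
    }
    return -1;
}
```

An Append-like consumer would call such a helper in a loop and pull the next tuple from the child whose descriptor was reported, which is why the demo query returns rows in the order the shards finish rather than in partition order.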
On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote: > Hello, > > A few years back[1] I experimented with a simple readiness API that > would allow Append to start emitting tuples from whichever Foreign > Scan has data available, when working with FDW-based sharding. I used > that primarily as a way to test Andres's new WaitEventSet stuff and my > kqueue implementation of that, but I didn't pursue it seriously > because I knew we wanted a more ambitious async executor rewrite and > many people had ideas about that, with schedulers capable of jumping > all over the tree etc. > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and > asked why we don't just do this naive thing until we have something > better. It's a very localised feature that works only between Append > and its immediate children. The patch makes it work for postgres_fdw, > but it should work for any FDW that can get its hands on a socket. > > Here's a quick rebase of that old POC patch, along with a demo. Since > 2016, Parallel Append landed, but I didn't have time to think about > how to integrate with that so I did a quick "sledgehammer" rebase that > disables itself if parallelism is in the picture. Yes, sharding has been waiting on parallel FDW scans. Would this work for parallel partition scans if the partitions were FDWs? -- Bruce Momjian <bruce@momjian.us> http://momjian.us EnterpriseDB http://enterprisedb.com + As you are, so once was I. As I am, so you will be. + + Ancient Roman grave inscription +
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote: > On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote: > > A few years back[1] I experimented with a simple readiness API that > > would allow Append to start emitting tuples from whichever Foreign > > Scan has data available, when working with FDW-based sharding. I used > > that primarily as a way to test Andres's new WaitEventSet stuff and my > > kqueue implementation of that, but I didn't pursue it seriously > > because I knew we wanted a more ambitious async executor rewrite and > > many people had ideas about that, with schedulers capable of jumping > > all over the tree etc. > > > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and > > asked why we don't just do this naive thing until we have something > > better. It's a very localised feature that works only between Append > > and its immediate children. The patch makes it work for postgres_fdw, > > but it should work for any FDW that can get its hands on a socket. > > > > Here's a quick rebase of that old POC patch, along with a demo. Since > > 2016, Parallel Append landed, but I didn't have time to think about > > how to integrate with that so I did a quick "sledgehammer" rebase that > > disables itself if parallelism is in the picture. > > Yes, sharding has been waiting on parallel FDW scans. Would this work > for parallel partition scans if the partitions were FDWs? Yeah, this works for partitions that are FDWs (as shown), but only for Append, not for Parallel Append. So you'd have parallelism in the sense that your N remote shard servers are all doing stuff at the same time, but it couldn't be in a parallel query on your 'home' server, which is probably good for things that push down aggregation and bring back just a few tuples from each shard, but bad for anything wanting to ship back millions of tuples to chew on locally. Do you think that'd be useful enough on its own? 
The problem is that parallel safe non-partial plans (like postgres_fdw scans) are exclusively 'claimed' by one process under Parallel Append, so with the patch as posted, if you modify it to allow parallelism then it'll probably give correct answers but nothing prevents a single process from claiming and starting all the scans and then waiting for them to be ready, while the other processes miss out on doing any work at all. There's probably some kludgy solution involving not letting any one worker start more than X, and some space cadet solution involving passing sockets around and teaching libpq to hand over connections at certain controlled phases of the protocol (due to lack of threads), but nothing like that has jumped out as the right path so far. One idea that seems promising but requires a bunch more infrastructure is to offload the libpq multiplexing to a background worker that owns all the sockets, and have it push tuples into a multi-consumer shared memory queue that regular executor processes could read from. I have been wondering if that would be best done by each FDW implementation, or if there is a way to make a generic infrastructure for converting parallel-safe executor nodes into partial plans by the use of a 'Scatter' (opposite of Gather) node that can spread the output of any node over many workers. If you had that, you'd still want a way for Parallel Append to be readiness-based, but it would probably look a bit different to this patch because it'd need to use (vapourware) multiconsumer shm queue readiness, not fd readiness. And another kind of fd-readiness multiplexing would be going on inside the new (vapourware) worker that handles all the libpq connections (and maybe other kinds of work for other FDWs that are able to expose a socket).
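The "kludgy" option mentioned above, not letting any one worker start more than X scans, could look roughly like the following. This is a hedged sketch only: `ClaimState`, `claim_next` and `NPLANS` are hypothetical names, nothing like this exists in the posted patch, and a real version would keep the array in shared memory and take a lock around the claim.

```c
#include <assert.h>
#include <stdbool.h>

#define NPLANS 6                /* hypothetical number of non-partial subplans */

typedef struct ClaimState
{
    /* in a real executor this would live in shared memory, under a lock */
    bool        claimed[NPLANS];
} ClaimState;

/*
 * Claim the next unclaimed subplan for a worker that has already started
 * nstarted scans.  Returns the subplan index, or -1 if the worker has hit
 * its cap or no subplans are left.
 */
static int
claim_next(ClaimState *cs, int nstarted, int max_per_worker)
{
    int         i;

    if (nstarted >= max_per_worker)
        return -1;              /* leave the rest for other workers */

    for (i = 0; i < NPLANS; i++)
    {
        if (!cs->claimed[i])
        {
            cs->claimed[i] = true;
            return i;
        }
    }
    return -1;
}
```

The cap keeps one process from claiming every foreign scan up front, but it does nothing about rebalancing once scans are claimed, which is presumably part of why it is described as a kludge.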
On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote: > On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote: > > On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote: > > > A few years back[1] I experimented with a simple readiness API that > > > would allow Append to start emitting tuples from whichever Foreign > > > Scan has data available, when working with FDW-based sharding. I used > > > that primarily as a way to test Andres's new WaitEventSet stuff and my > > > kqueue implementation of that, but I didn't pursue it seriously > > > because I knew we wanted a more ambitious async executor rewrite and > > > many people had ideas about that, with schedulers capable of jumping > > > all over the tree etc. > > > > > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and > > > asked why we don't just do this naive thing until we have something > > > better. It's a very localised feature that works only between Append > > > and its immediate children. The patch makes it work for postgres_fdw, > > > but it should work for any FDW that can get its hands on a socket. > > > > > > Here's a quick rebase of that old POC patch, along with a demo. Since > > > 2016, Parallel Append landed, but I didn't have time to think about > > > how to integrate with that so I did a quick "sledgehammer" rebase that > > > disables itself if parallelism is in the picture. > > > > Yes, sharding has been waiting on parallel FDW scans. Would this work > > for parallel partition scans if the partitions were FDWs? > > Yeah, this works for partitions that are FDWs (as shown), but only for > Append, not for Parallel Append. 
So you'd have parallelism in the
> sense that your N remote shard servers are all doing stuff at the same
> time, but it couldn't be in a parallel query on your 'home' server,
> which is probably good for things that push down aggregation and bring
> back just a few tuples from each shard, but bad for anything wanting
> to ship back millions of tuples to chew on locally. Do you think
> that'd be useful enough on its own?

Yes, I think so. There are many data warehouse queries that want to return only aggregate values, or filter for a small number of rows. Even OLTP queries might return only a few rows from multiple partitions. This would allow for a proof-of-concept implementation so we can see how realistic this approach is.

> The problem is that parallel safe non-partial plans (like postgres_fdw
> scans) are exclusively 'claimed' by one process under Parallel Append,
> so with the patch as posted, if you modify it to allow parallelism
> then it'll probably give correct answers but nothing prevents a single
> process from claiming and starting all the scans and then waiting for
> them to be ready, while the other processes miss out on doing any work
> at all. There's probably some kludgy solution involving not letting
> any one worker start more than X, and some space cadet solution
> involving passing sockets around and teaching libpq to hand over
> connections at certain controlled phases of the protocol (due to lack
> of threads), but nothing like that has jumped out as the right path so
> far.

I am unclear how many queries can do any meaningful work until all shards have given their full results.

-- 
  Bruce Momjian <bruce@momjian.us> http://momjian.us
  EnterpriseDB http://enterprisedb.com

+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
Hello. At Sat, 30 Nov 2019 14:26:11 -0500, Bruce Momjian <bruce@momjian.us> wrote in > On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote: > > On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote: > > > On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote: > > > > A few years back[1] I experimented with a simple readiness API that > > > > would allow Append to start emitting tuples from whichever Foreign > > > > Scan has data available, when working with FDW-based sharding. I used > > > > that primarily as a way to test Andres's new WaitEventSet stuff and my > > > > kqueue implementation of that, but I didn't pursue it seriously > > > > because I knew we wanted a more ambitious async executor rewrite and > > > > many people had ideas about that, with schedulers capable of jumping > > > > all over the tree etc. > > > > > > > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and > > > > asked why we don't just do this naive thing until we have something > > > > better. It's a very localised feature that works only between Append > > > > and its immediate children. The patch makes it work for postgres_fdw, > > > > but it should work for any FDW that can get its hands on a socket. > > > > > > > > Here's a quick rebase of that old POC patch, along with a demo. Since > > > > 2016, Parallel Append landed, but I didn't have time to think about > > > > how to integrate with that so I did a quick "sledgehammer" rebase that > > > > disables itself if parallelism is in the picture. > > > > > > Yes, sharding has been waiting on parallel FDW scans. Would this work > > > for parallel partition scans if the partitions were FDWs? > > > > Yeah, this works for partitions that are FDWs (as shown), but only for > > Append, not for Parallel Append. 
So you'd have parallelism in the > > sense that your N remote shard servers are all doing stuff at the same > > time, but it couldn't be in a parallel query on your 'home' server, > > which is probably good for things that push down aggregation and bring > > back just a few tuples from each shard, but bad for anything wanting > > to ship back millions of tuples to chew on locally. Do you think > > that'd be useful enough on its own? > > Yes, I think so. There are many data warehouse queries that want to > return only aggregate values, or filter for a small number of rows. > Even OLTP queries might return only a few rows from multiple partitions. > This would allow for a proof-of-concept implementation so we can see how > realistic this approach is. > > > The problem is that parallel safe non-partial plans (like postgres_fdw > > scans) are exclusively 'claimed' by one process under Parallel Append, > > so with the patch as posted, if you modify it to allow parallelism > > then it'll probably give correct answers but nothing prevents a single > > process from claiming and starting all the scans and then waiting for > > them to be ready, while the other processes miss out on doing any work > > at all. There's probably some kludgy solution involving not letting > > any one worker start more than X, and some space cadet solution > > involving passing sockets around and teaching libpq to hand over > > connections at certain controlled phases of the protocol (due to lack > > of threads), but nothing like that has jumped out as the right path so > > far. > > I am unclear how many queries can do any meaningful work until all > shards have giving their full results. There's my pending (somewhat stale) patch, which allows to run local scans while waiting for remote servers. https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp I (or we) wanted to introduce the asynchronous node mechanism as the basis of async-capable postgres_fdw. 
The reason it has stalled is that I am waiting for the executor change that makes the executor push-up style, on which the async-node mechanism will be built. If that doesn't happen shortly, I'd like to continue that work.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
> There's my pending (somewhat stale) patch, which allows to run local
> scans while waiting for remote servers.
>
> https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp
>
> I (or we) wanted to introduce the asynchronous node mechanism as the
> basis of async-capable postgres_fdw. The reason why it is stopping is
> that we are seeing and I am waiting the executor change that makes
> executor push-up style, on which the async-node mechanism will be
> constructed. If that won't happen shortly, I'd like to continue that
> work..

After rereading some threads to remind myself what happened here... right, my little patch began life in March 2016[1] when I wanted a test case to test Andres's work on WaitEventSets, and your patch set started a couple of months later and is vastly more ambitious[2][3]. It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF model. And I totally agree that there are lots of reasons to want to do that (including yielding to other parts of the plan instead of waiting for I/O, locks and some parallelism primitives enabling new kinds of parallelism), and I'm hoping to help with some small pieces of that if I can.

My patch set (rebased upthread) was extremely primitive, with no new planner concepts, and added only a very simple new executor node method: ExecReady(). Append used that to try to ask its children if they'd like some time to warm up. By default, ExecReady() says "I don't know what you're talking about, go away", but FDWs can provide an implementation that says "yes, please call me again when this fd is ready" or "yes, I am ready, please call ExecProc() now". It doesn't deal with anything more complicated than that, and in particular it doesn't work if there are extra planner nodes in between Append and the foreign scan. (It also doesn't mix particularly well with parallelism, as mentioned.)
The reason I reposted this unambitious work is because Stephen keeps asking me why we don't consider the stupidly simple thing that would help with simple foreign partition-based queries today, instead of waiting for someone to redesign the entire executor, because that's ... really hard.

[1] https://www.postgresql.org/message-id/CAEepm%3D1CuAWfxDk%3D%3DjZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA%40mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CA%2BTgmobx8su_bYtAa3DgrqB%2BR7xZG6kHRj0ccMUUshKAQVftww%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com
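The three answers described for ExecReady() in this message ("go away", "call me again when this fd is ready", "I am ready now") map naturally onto a tri-state return value. The sketch below is a standalone illustration under stated assumptions, not code from the patch: `ReadyState`, `Child`, `fdw_ready` and `choose_child` are hypothetical names, and the choice to run a child that lacks a readiness callback synchronously is an assumption about how an Append might fall back.

```c
#include <assert.h>
#include <stddef.h>

typedef enum ReadyState
{
    READY_UNSUPPORTED,          /* "I don't know what you're talking about" */
    READY_WAIT_FD,              /* "call me again when this fd is ready" */
    READY_NOW                   /* "I am ready, fetch a tuple now" */
} ReadyState;

typedef struct Child
{
    /* optional readiness callback; NULL means the node does not provide one */
    ReadyState  (*ready) (struct Child *self, int *fd);
    int         fd;             /* descriptor to wait on, if any */
    int         has_data;       /* stand-in for "a tuple is buffered" */
} Child;

/* Hypothetical FDW-style callback: ready if data is buffered, else wait */
static ReadyState
fdw_ready(Child *self, int *fd)
{
    if (self->has_data)
        return READY_NOW;
    *fd = self->fd;
    return READY_WAIT_FD;
}

/*
 * Return the index of the first child that can be executed right away.
 * Children that ask to wait have their descriptors collected in wait_fds
 * (count in *nwait) so the caller could sleep on them.  Returns -1 if
 * every child asked to wait.
 */
static int
choose_child(Child *children, int n, int *wait_fds, int *nwait)
{
    int         i;

    *nwait = 0;
    for (i = 0; i < n; i++)
    {
        int         fd = -1;
        ReadyState  rs = children[i].ready ?
            children[i].ready(&children[i], &fd) : READY_UNSUPPORTED;

        if (rs == READY_NOW || rs == READY_UNSUPPORTED)
            return i;           /* assumed fallback: run it synchronously */
        wait_fds[(*nwait)++] = fd;
    }
    return -1;
}
```

When `choose_child` returns -1, the collected descriptors are what a caller would hand to a wait primitive such as a WaitEventSet before asking the children again.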
On Thu, Dec 5, 2019 at 05:45:24PM +1300, Thomas Munro wrote: > My patch set (rebased upthread) was extremely primitive, with no new > planner concepts, and added only a very simple new executor node > method: ExecReady(). Append used that to try to ask its children if > they'd like some time to warm up. By default, ExecReady() says "I > don't know what you're talking about, go away", but FDWs can provide > an implementation that says "yes, please call me again when this fd is > ready" or "yes, I am ready, please call ExecProc() now". It doesn't > deal with anything more complicated than that, and in particular it > doesn't work if there are extra planner nodes in between Append and > the foreign scan. (It also doesn't mix particularly well with > parallelism, as mentioned.) > > The reason I reposted this unambitious work is because Stephen keeps > asking me why we don't consider the stupidly simple thing that would > help with simple foreign partition-based queries today, instead of > waiting for someone to redesign the entire executor, because that's > ... really hard. I agree with Stephen's request. We have been waiting for the executor rewrite for a while, so let's just do something simple and see how it performs. -- Bruce Momjian <bruce@momjian.us> http://momjian.us EnterpriseDB http://enterprisedb.com + As you are, so once was I. As I am, so you will be. + + Ancient Roman grave inscription +
On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote: > I agree with Stephen's request. We have been waiting for the executor > rewrite for a while, so let's just do something simple and see how it > performs. I'm sympathetic to the frustration here, and I think it would be great if we could find a way forward that doesn't involve waiting for a full rewrite of the executor. However, I seem to remember that when we tested the various patches that various people had written for this feature (I wrote one, too) they all had a noticeable performance penalty in the case of a plain old Append that involved no FDWs and nothing asynchronous. I don't think it's OK to have, say, a 2% regression on every query that involves an Append, because especially now that we have partitioning, that's a lot of queries. I don't know whether this patch has that kind of problem. If it doesn't, I would consider that a promising sign. -- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <robertmhaas@gmail.com> wrote: > > On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote: > > I agree with Stephen's request. We have been waiting for the executor > > rewrite for a while, so let's just do something simple and see how it > > performs. > > I'm sympathetic to the frustration here, and I think it would be great > if we could find a way forward that doesn't involve waiting for a full > rewrite of the executor. However, I seem to remember that when we > tested the various patches that various people had written for this > feature (I wrote one, too) they all had a noticeable performance > penalty in the case of a plain old Append that involved no FDWs and > nothing asynchronous. I don't think it's OK to have, say, a 2% > regression on every query that involves an Append, because especially > now that we have partitioning, that's a lot of queries. > > I don't know whether this patch has that kind of problem. If it > doesn't, I would consider that a promising sign. I'll look into that. If there is a measurable impact, I suspect it can be avoided by, for example, installing a different ExecProcNode function.
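The suggestion of installing a different ExecProcNode function amounts to choosing the per-node callback once, at initialisation time, so that a purely local Append never executes any async-related branch per tuple. A minimal standalone sketch of that dispatch pattern follows; `Node`, `append_sync`, `append_async` and `node_init` are hypothetical stand-ins, not PostgreSQL's actual structures or functions.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct Node Node;
typedef int (*ProcFn) (Node *node);

struct Node
{
    ProcFn      proc;           /* per-node "get next tuple" callback */
    int         calls_sync;     /* counters, only to make the sketch observable */
    int         calls_async;
};

/* Plain path: no readiness checks at all */
static int
append_sync(Node *node)
{
    node->calls_sync++;
    return 1;
}

/* Async path: a real version would multiplex over child descriptors here */
static int
append_async(Node *node)
{
    node->calls_async++;
    return 2;
}

/* Decide once at init time which implementation this node will use */
static void
node_init(Node *node, bool has_async_children)
{
    node->calls_sync = 0;
    node->calls_async = 0;
    node->proc = has_async_children ? append_async : append_sync;
}
```

Because the decision is made once per node rather than once per tuple, any cost left over for a plain Append would be confined to planning and executor startup.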
At Fri, 6 Dec 2019 10:03:44 +1300, Thomas Munro <thomas.munro@gmail.com> wrote in
> On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <robertmhaas@gmail.com> wrote:
> > I don't know whether this patch has that kind of problem. If it
> > doesn't, I would consider that a promising sign.
>
> I'll look into that. If there is a measurable impact, I suspect it
> can be avoided by, for example, installing a different ExecProcNode
> function.

Replacing ExecProcNode perfectly isolates the additional processing in ExecAppendAsync. Thus, for pure local appends, the patch can affect performance only through the planner and execinit. I don't believe that impact can be large enough to be observable in a large scan.

As the mail pointed to upthread says, the patch accelerates all remote cases when fetch_size is >= 200. The problem was that local scans seemed to be slightly slowed down.

I dusted off the old patch (FWIW I attached it) and will re-run it on the current development environment (and re-check the code).

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

From 9f5dc3720ddade94cd66713f4aa79da575b09e31 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v1 1/3] Allow wait event set to be registered to resource owner

WaitEventSet needs to be released using resource owner for a certain
case. This change adds WaitEventSet reowner and allow the creator of a
WaitEventSet to specify a resource owner.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 67 +++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 +++
 6 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index cd517e8bb4..3912b8b3a0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
                 (errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-    FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+    FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
     AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
                       NULL, NULL);
     AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..dc04ee5f6f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -52,6 +52,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -78,6 +79,8 @@ struct WaitEventSet
     int         nevents;        /* number of registered events */
     int         nevents_space;  /* maximum number of events in this set */
 
+    ResourceOwner   resowner;   /* Resource owner */
+
     /*
      * Array, of nevents_space length, storing the definition of events this
      * set is waiting for.
@@ -372,7 +375,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
     int         ret = 0;
     int         rc;
     WaitEvent   event;
-    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
     if (wakeEvents & WL_TIMEOUT)
         Assert(timeout >= 0);
@@ -539,12 +542,15 @@ ResetLatch(Latch *latch)
  * WaitEventSetWait().
  */
 WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
 {
     WaitEventSet *set;
     char       *data;
     Size        sz = 0;
 
+    if (res)
+        ResourceOwnerEnlargeWESs(res);
+
     /*
      * Use MAXALIGN size/alignment to guarantee that later uses of memory are
      * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -614,6 +620,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
     StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+    /* Register this wait event set if requested */
+    set->resowner = res;
+    if (res)
+        ResourceOwnerRememberWES(set->resowner, set);
+
     return set;
 }
 
@@ -655,6 +666,9 @@ FreeWaitEventSet(WaitEventSet *set)
     }
 #endif
 
+    if (set->resowner != NULL)
+        ResourceOwnerForgetWES(set->resowner, set);
+
     pfree(set);
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index e08507f0cc..5e88c48a1c 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -70,7 +70,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
     {
         WaitEventSet *new_event_set;
 
-        new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+        new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
         AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
                           MyLatch, NULL);
         AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 7be11c48ab..829034516f 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -128,6 +128,7 @@ typedef struct ResourceOwnerData
     ResourceArray filearr;      /* open temporary files */
     ResourceArray dsmarr;       /* dynamic shmem segments */
     ResourceArray jitarr;       /* JIT contexts */
+    ResourceArray wesarr;       /* wait event sets */
 
     /* We can remember up to MAX_RESOWNER_LOCKS references to local locks.
*/
     int         nlocks;         /* number of owned locks */
@@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
     ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
     ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
     ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+    ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
     return owner;
 }
@@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
             jit_release_context(context);
         }
+
+        /* Ditto for wait event sets */
+        while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+        {
+            WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+            if (isCommit)
+                PrintWESLeakWarning(event);
+            FreeWaitEventSet(event);
+        }
     }
     else if (phase == RESOURCE_RELEASE_LOCKS)
     {
@@ -701,6 +714,7 @@ ResourceOwnerDelete(ResourceOwner owner)
     Assert(owner->filearr.nitems == 0);
     Assert(owner->dsmarr.nitems == 0);
     Assert(owner->jitarr.nitems == 0);
+    Assert(owner->wesarr.nitems == 0);
     Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
     /*
@@ -728,6 +742,7 @@ ResourceOwnerDelete(ResourceOwner owner)
     ResourceArrayFree(&(owner->filearr));
     ResourceArrayFree(&(owner->dsmarr));
     ResourceArrayFree(&(owner->jitarr));
+    ResourceArrayFree(&(owner->wesarr));
 
     pfree(owner);
 }
@@ -1346,3 +1361,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
         elog(ERROR, "JIT context %p is not owned by resource owner %s",
              DatumGetPointer(handle), owner->name);
 }
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+    ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+    ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+    /*
+     * XXXX: There's no property to show as an identier of a wait event set,
+     * use its pointer instead.
+     */
+    if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+        elog(ERROR, "wait event set %p is not owned by resource owner %s",
+             events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+    /*
+     * XXXX: There's no property to show as an identier of a wait event set,
+     * use its pointer instead.
+     */
+    elog(WARNING, "wait event set leak: %p still referenced",
+         events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..d136614587 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
 extern void SetLatch(Latch *latch);
 extern void ResetLatch(Latch *latch);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+                                        ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int  AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
                               Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index b8261ad866..9c9c845784 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
 extern void ResourceOwnerForgetJIT(ResourceOwner owner,
                                    Datum handle);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+                         WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+                         WaitEventSet *);
+
 #endif                          /* RESOWNER_PRIVATE_H */
-- 
2.23.0

From 65fc69bb64f4596aff0c37a2d090ddf1609120bb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v1 2/3] infrastructure for asynchronous execution

This patch add an infrastructure for
asynchronous execution. As a PoC, this makes only Append capable of handling asynchronously executable subnodes. --- src/backend/commands/explain.c | 17 ++ src/backend/executor/Makefile | 1 + src/backend/executor/execAsync.c | 145 ++++++++++++ src/backend/executor/nodeAppend.c | 286 +++++++++++++++++++++--- src/backend/executor/nodeForeignscan.c | 22 +- src/backend/nodes/bitmapset.c | 72 ++++++ src/backend/nodes/copyfuncs.c | 2 + src/backend/nodes/outfuncs.c | 2 + src/backend/nodes/readfuncs.c | 2 + src/backend/optimizer/plan/createplan.c | 83 +++++++ src/backend/postmaster/pgstat.c | 3 + src/backend/postmaster/syslogger.c | 2 +- src/backend/utils/adt/ruleutils.c | 8 +- src/include/executor/execAsync.h | 23 ++ src/include/executor/executor.h | 1 + src/include/executor/nodeForeignscan.h | 3 + src/include/foreign/fdwapi.h | 11 + src/include/nodes/bitmapset.h | 1 + src/include/nodes/execnodes.h | 20 +- src/include/nodes/plannodes.h | 9 + src/include/pgstat.h | 3 +- 21 files changed, 676 insertions(+), 40 deletions(-) create mode 100644 src/backend/executor/execAsync.c create mode 100644 src/include/executor/execAsync.h diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 62fb3434a3..9f06e1fbdc 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -82,6 +82,7 @@ static void show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es); static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ExplainState *es); +static void show_append_info(AppendState *astate, ExplainState *es); static void show_agg_keys(AggState *astate, List *ancestors, ExplainState *es); static void show_grouping_sets(PlanState *planstate, Agg *agg, @@ -1319,6 +1320,8 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); + if (plan->async_capable) + appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); 
es->indent++; } @@ -1860,6 +1863,11 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Hash: show_hash_info(castNode(HashState, planstate), es); break; + + case T_Append: + show_append_info(castNode(AppendState, planstate), es); + break; + default: break; } @@ -2197,6 +2205,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ancestors, es); } +static void +show_append_info(AppendState *astate, ExplainState *es) +{ + Append *plan = (Append *) astate->ps.plan; + + if (plan->nasyncplans > 0) + ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es); +} + /* * Show the grouping keys for an Agg node. */ diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index a983800e4b..8a2d6e9961 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ + execAsync.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c new file mode 100644 index 0000000000..db477e2cf6 --- /dev/null +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,145 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution. 
+ * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/nodeAppend.h" +#include "executor/nodeForeignscan.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +void ExecAsyncSetState(PlanState *pstate, AsyncState status) +{ + pstate->asyncstate = status; +} + +bool +ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node, + void *data, bool reinit) +{ + switch (nodeTag(node)) + { + case T_ForeignScanState: + return ExecForeignAsyncConfigureWait((ForeignScanState *)node, + wes, data, reinit); + break; + default: + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(node)); + } +} + +/* + * struct for memory context callback argument used in ExecAsyncEventWait + */ +typedef struct { + int **p_refind; + int *p_refindsize; +} ExecAsync_mcbarg; + +/* + * callback function to reset static variables pointing to the memory in + * TopTransactionContext in ExecAsyncEventWait. 
+ */ +static void ExecAsyncMemoryContextCallback(void *arg) +{ + /* arg is the address of the variable refind in ExecAsyncEventWait */ + ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg; + *mcbarg->p_refind = NULL; + *mcbarg->p_refindsize = 0; +} + +#define EVENT_BUFFER_SIZE 16 + +Bitmapset * +ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout) +{ + static int *refind = NULL; + static int refindsize = 0; + WaitEventSet *wes; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred = 0; + Bitmapset *fired_events = NULL; + int i; + int n; + + n = bms_num_members(waitnodes); + wes = CreateWaitEventSet(TopTransactionContext, + TopTransactionResourceOwner, n); + if (refindsize < n) + { + if (refindsize == 0) + refindsize = EVENT_BUFFER_SIZE; /* XXX */ + while (refindsize < n) + refindsize *= 2; + if (refind) + refind = (int *) repalloc(refind, refindsize * sizeof(int)); + else + { + static ExecAsync_mcbarg mcb_arg = + { &refind, &refindsize }; + static MemoryContextCallback mcb = + { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL }; + MemoryContext oldctxt = + MemoryContextSwitchTo(TopTransactionContext); + + /* + * refind points to a memory block in + * TopTransactionContext. Register a callback to reset it. 
+ */ + MemoryContextRegisterResetCallback(TopTransactionContext, &mcb); + refind = (int *) palloc(refindsize * sizeof(int)); + MemoryContextSwitchTo(oldctxt); + } + } + + n = 0; + for (i = bms_next_member(waitnodes, -1) ; i >= 0 ; + i = bms_next_member(waitnodes, i)) + { + refind[i] = i; + if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true)) + n++; + } + + if (n == 0) + { + FreeWaitEventSet(wes); + return NULL; + } + + noccurred = WaitEventSetWait(wes, timeout, occurred_event, + EVENT_BUFFER_SIZE, + WAIT_EVENT_ASYNC_WAIT); + FreeWaitEventSet(wes); + if (noccurred == 0) + return NULL; + + for (i = 0 ; i < noccurred ; i++) + { + WaitEvent *w = &occurred_event[i]; + + if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0) + { + int n = *(int*)w->user_data; + + fired_events = bms_add_member(fired_events, n); + } + } + + return fired_events; +} diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 5ff986ac7d..03dec4d648 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -60,6 +60,7 @@ #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" +#include "executor/execAsync.h" #include "miscadmin.h" /* Shared state for parallel-aware Append. 
*/ @@ -81,6 +82,7 @@ struct ParallelAppendState #define NO_MATCHING_SUBPLANS -2 static TupleTableSlot *ExecAppend(PlanState *pstate); +static TupleTableSlot *ExecAppendAsync(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); @@ -104,22 +106,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags) PlanState **appendplanstates; Bitmapset *validsubplans; int nplans; + int nasyncplans; int firstvalid; int i, j; /* check for unsupported flags */ - Assert(!(eflags & EXEC_FLAG_MARK)); + Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC))); /* * create new AppendState for our append node */ appendstate->ps.plan = (Plan *) node; appendstate->ps.state = estate; - appendstate->ps.ExecProcNode = ExecAppend; + + /* choose appropriate version of Exec function */ + if (node->nasyncplans == 0) + appendstate->ps.ExecProcNode = ExecAppend; + else + appendstate->ps.ExecProcNode = ExecAppendAsync; /* Let choose_next_subplan_* function handle setting the first subplan */ - appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX; /* If run-time partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -152,7 +160,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags) */ if (bms_is_empty(validsubplans)) { - appendstate->as_whichplan = NO_MATCHING_SUBPLANS; + appendstate->as_whichsyncplan = NO_MATCHING_SUBPLANS; /* Mark the first as valid so that it's initialized below */ validsubplans = bms_make_singleton(0); @@ -212,10 +220,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags) */ j = 0; firstvalid = nplans; + nasyncplans = 0; + i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + int sub_eflags = eflags; + + /* Let async-capable subplans run asynchronously */ 
+ if (i < node->nasyncplans) + { + sub_eflags |= EXEC_FLAG_ASYNC; + nasyncplans++; + } /* * Record the lowest appendplans index which is a valid partial plan. @@ -223,13 +241,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags) if (i >= node->first_partial_plan && j < firstvalid) firstvalid = j; - appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); + appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags); } appendstate->as_first_partial_plan = firstvalid; appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* fill in async stuff */ + appendstate->as_nasyncplans = nasyncplans; + appendstate->as_syncdone = (nasyncplans == nplans); + + if (appendstate->as_nasyncplans) + { + appendstate->as_asyncresult = (TupleTableSlot **) + palloc0(node->nasyncplans * sizeof(TupleTableSlot *)); + + /* initially, all async requests need a request */ + for (i = 0; i < appendstate->as_nasyncplans; ++i) + appendstate->as_needrequest = + bms_add_member(appendstate->as_needrequest, i); + } + /* * Miscellaneous initialization */ @@ -253,21 +286,23 @@ ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); - if (node->as_whichplan < 0) + if (node->as_whichsyncplan < 0) { /* * If no subplan has been chosen, we must choose one before * proceeding. 
*/ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && + if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX && !node->choose_next_subplan(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); /* Nothing to do if there are no matching subplans */ - else if (node->as_whichplan == NO_MATCHING_SUBPLANS) + else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } + Assert(node->as_nasyncplans == 0); + for (;;) { PlanState *subnode; @@ -278,8 +313,9 @@ ExecAppend(PlanState *pstate) /* * figure out which subplan we are currently processing */ - Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); - subnode = node->appendplans[node->as_whichplan]; + Assert(node->as_whichsyncplan >= 0 && + node->as_whichsyncplan < node->as_nplans); + subnode = node->appendplans[node->as_whichsyncplan]; /* * get a tuple from the subplan @@ -302,6 +338,175 @@ ExecAppend(PlanState *pstate) } } +static TupleTableSlot * +ExecAppendAsync(PlanState *pstate) +{ + AppendState *node = castNode(AppendState, pstate); + Bitmapset *needrequest; + int i; + + Assert(node->as_nasyncplans > 0); + +restart: + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + needrequest = node->as_needrequest; + node->as_needrequest = NULL; + while ((i = bms_first_member(needrequest)) >= 0) + { + TupleTableSlot *slot; + PlanState *subnode = node->appendplans[i]; + + slot = ExecProcNode(subnode); + if (subnode->asyncstate == AS_AVAILABLE) + { + if (!TupIsNull(slot)) + { + node->as_asyncresult[node->as_nasyncresult++] = slot; + node->as_needrequest = bms_add_member(node->as_needrequest, i); + } + } + else + node->as_pending_async = bms_add_member(node->as_pending_async, i); + } + bms_free(needrequest); + + for (;;) + { + TupleTableSlot *result; + + /* return now if a result is available */ + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return 
node->as_asyncresult[node->as_nasyncresult]; + } + + while (!bms_is_empty(node->as_pending_async)) + { + long timeout = node->as_syncdone ? -1 : 0; + Bitmapset *fired; + int i; + + fired = ExecAsyncEventWait(node->appendplans, + node->as_pending_async, + timeout); + + if (bms_is_empty(fired) && node->as_syncdone) + { + /* + * No subplan fired. This can happen even in normal + * operation when the subnode has already prepared results before + * waiting. as_pending_async then holds stale information, so + * restart from the beginning. + */ + node->as_needrequest = node->as_pending_async; + node->as_pending_async = NULL; + goto restart; + } + + while ((i = bms_first_member(fired)) >= 0) + { + TupleTableSlot *slot; + PlanState *subnode = node->appendplans[i]; + slot = ExecProcNode(subnode); + if (subnode->asyncstate == AS_AVAILABLE) + { + if (!TupIsNull(slot)) + { + node->as_asyncresult[node->as_nasyncresult++] = slot; + node->as_needrequest = + bms_add_member(node->as_needrequest, i); + } + node->as_pending_async = + bms_del_member(node->as_pending_async, i); + } + } + bms_free(fired); + + /* return now if a result is available */ + if (node->as_nasyncresult > 0) + { + --node->as_nasyncresult; + return node->as_asyncresult[node->as_nasyncresult]; + } + + if (!node->as_syncdone) + break; + } + + /* + * If there is no asynchronous activity still pending and the + * synchronous activity is also complete, we're totally done scanning + * this node. Otherwise, we're done with the asynchronous stuff but + * must continue scanning the synchronous children. + */ + if (node->as_syncdone) + { + Assert(bms_is_empty(node->as_pending_async)); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + + /* + * get a tuple from the subplan + */ + + if (node->as_whichsyncplan < 0) + { + /* + * If no subplan has been chosen, we must choose one before + * proceeding. 
+ */ + if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX && + !node->choose_next_subplan(node)) + { + node->as_syncdone = true; + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + + /* Nothing to do if there are no matching subplans */ + else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS) + { + node->as_syncdone = true; + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + } + + result = ExecProcNode(node->appendplans[node->as_whichsyncplan]); + + if (!TupIsNull(result)) + { + /* + * If the subplan gave us something then return it as-is. We do + * NOT make use of the result slot that was set up in + * ExecInitAppend; there's no need for it. + */ + return result; + } + + /* + * Go on to the "next" subplan. If no more subplans, return the empty + * slot set up for us by ExecInitAppend, unless there are async plans + * we have yet to finish. + */ + if (!node->choose_next_subplan(node)) + { + node->as_syncdone = true; + if (bms_is_empty(node->as_pending_async)) + { + Assert(bms_is_empty(node->as_needrequest)); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + } + + /* Else loop back and try to get a tuple from the new subplan */ + } +} + /* ---------------------------------------------------------------- * ExecEndAppend * @@ -348,6 +553,15 @@ ExecReScanAppend(AppendState *node) node->as_valid_subplans = NULL; } + /* Reset async state. 
*/ + for (i = 0; i < node->as_nasyncplans; ++i) + { + ExecShutdownNode(node->appendplans[i]); + node->as_needrequest = bms_add_member(node->as_needrequest, i); + } + node->as_nasyncresult = 0; + node->as_syncdone = (node->as_nasyncplans == node->as_nplans); + for (i = 0; i < node->as_nplans; i++) { PlanState *subnode = node->appendplans[i]; @@ -368,7 +582,7 @@ ExecReScanAppend(AppendState *node) } /* Let choose_next_subplan_* function handle setting the first subplan */ - node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_whichsyncplan = INVALID_SUBPLAN_INDEX; } /* ---------------------------------------------------------------- @@ -456,7 +670,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) static bool choose_next_subplan_locally(AppendState *node) { - int whichplan = node->as_whichplan; + int whichplan = node->as_whichsyncplan; int nextplan; /* We should never be called when there are no subplans */ @@ -475,6 +689,10 @@ choose_next_subplan_locally(AppendState *node) node->as_valid_subplans = ExecFindMatchingSubPlans(node->as_prune_state); + /* Exclude async plans */ + if (node->as_nasyncplans > 0) + bms_del_range(node->as_valid_subplans, 0, node->as_nasyncplans - 1); + whichplan = -1; } @@ -489,7 +707,7 @@ choose_next_subplan_locally(AppendState *node) if (nextplan < 0) return false; - node->as_whichplan = nextplan; + node->as_whichsyncplan = nextplan; return true; } @@ -511,19 +729,19 @@ choose_next_subplan_for_leader(AppendState *node) Assert(ScanDirectionIsForward(node->ps.state->es_direction)); /* We should never be called when there are no subplans */ - Assert(node->as_whichplan != NO_MATCHING_SUBPLANS); + Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); - if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX) { /* Mark just-completed subplan as finished. 
*/ - node->as_pstate->pa_finished[node->as_whichplan] = true; + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; } else { /* Start with last subplan. */ - node->as_whichplan = node->as_nplans - 1; + node->as_whichsyncplan = node->as_nplans - 1; /* * If we've yet to determine the valid subplans then do so now. If @@ -544,12 +762,12 @@ choose_next_subplan_for_leader(AppendState *node) } /* Loop until we find a subplan to execute. */ - while (pstate->pa_finished[node->as_whichplan]) + while (pstate->pa_finished[node->as_whichsyncplan]) { - if (node->as_whichplan == 0) + if (node->as_whichsyncplan == 0) { pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; - node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_whichsyncplan = INVALID_SUBPLAN_INDEX; LWLockRelease(&pstate->pa_lock); return false; } @@ -558,12 +776,12 @@ choose_next_subplan_for_leader(AppendState *node) * We needn't pay attention to as_valid_subplans here as all invalid * plans have been marked as finished. */ - node->as_whichplan--; + node->as_whichsyncplan--; } /* If non-partial, immediately mark as finished. */ - if (node->as_whichplan < node->as_first_partial_plan) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan < node->as_first_partial_plan) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; LWLockRelease(&pstate->pa_lock); @@ -592,13 +810,13 @@ choose_next_subplan_for_worker(AppendState *node) Assert(ScanDirectionIsForward(node->ps.state->es_direction)); /* We should never be called when there are no subplans */ - Assert(node->as_whichplan != NO_MATCHING_SUBPLANS); + Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); /* Mark just-completed subplan as finished. 
*/ - if (node->as_whichplan != INVALID_SUBPLAN_INDEX) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; /* * If we've yet to determine the valid subplans then do so now. If @@ -620,7 +838,7 @@ choose_next_subplan_for_worker(AppendState *node) } /* Save the plan from which we are starting the search. */ - node->as_whichplan = pstate->pa_next_plan; + node->as_whichsyncplan = pstate->pa_next_plan; /* Loop until we find a valid subplan to execute. */ while (pstate->pa_finished[pstate->pa_next_plan]) @@ -634,7 +852,7 @@ choose_next_subplan_for_worker(AppendState *node) /* Advance to the next valid plan. */ pstate->pa_next_plan = nextplan; } - else if (node->as_whichplan > node->as_first_partial_plan) + else if (node->as_whichsyncplan > node->as_first_partial_plan) { /* * Try looping back to the first valid partial plan, if there is @@ -643,7 +861,7 @@ choose_next_subplan_for_worker(AppendState *node) nextplan = bms_next_member(node->as_valid_subplans, node->as_first_partial_plan - 1); pstate->pa_next_plan = - nextplan < 0 ? node->as_whichplan : nextplan; + nextplan < 0 ? node->as_whichsyncplan : nextplan; } else { @@ -651,10 +869,10 @@ choose_next_subplan_for_worker(AppendState *node) * At last plan, and either there are no partial plans or we've * tried them all. Arrange to bail out. */ - pstate->pa_next_plan = node->as_whichplan; + pstate->pa_next_plan = node->as_whichsyncplan; } - if (pstate->pa_next_plan == node->as_whichplan) + if (pstate->pa_next_plan == node->as_whichsyncplan) { /* We've tried everything! */ pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; @@ -664,7 +882,7 @@ choose_next_subplan_for_worker(AppendState *node) } /* Pick the plan we found, and advance pa_next_plan one more time. 
*/ - node->as_whichplan = pstate->pa_next_plan; + node->as_whichsyncplan = pstate->pa_next_plan; pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, pstate->pa_next_plan); @@ -691,8 +909,8 @@ choose_next_subplan_for_worker(AppendState *node) } /* If non-partial, immediately mark as finished. */ - if (node->as_whichplan < node->as_first_partial_plan) - node->as_pstate->pa_finished[node->as_whichplan] = true; + if (node->as_whichsyncplan < node->as_first_partial_plan) + node->as_pstate->pa_finished[node->as_whichsyncplan] = true; LWLockRelease(&pstate->pa_lock); diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 52af1dac5c..1a54383ec8 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -117,7 +117,6 @@ ExecForeignScan(PlanState *pstate) (ExecScanRecheckMtd) ForeignRecheck); } - /* ---------------------------------------------------------------- * ExecInitForeignScan * ---------------------------------------------------------------- @@ -141,6 +140,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) scanstate->ss.ps.plan = (Plan *) node; scanstate->ss.ps.state = estate; scanstate->ss.ps.ExecProcNode = ExecForeignScan; + scanstate->ss.ps.asyncstate = AS_AVAILABLE; + + if ((eflags & EXEC_FLAG_ASYNC) != 0) + scanstate->fs_async = true; /* * Miscellaneous initialization @@ -384,3 +387,20 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecForeignAsyncConfigureWait + * + * In async mode, configure for a wait + * ---------------------------------------------------------------- + */ +bool +ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes, + void *caller_data, bool reinit) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncConfigureWait != 
NULL); + return fdwroutine->ForeignAsyncConfigureWait(node, wes, + caller_data, reinit); +} diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c index 665149defe..5d4e19a052 100644 --- a/src/backend/nodes/bitmapset.c +++ b/src/backend/nodes/bitmapset.c @@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper) return a; } +/* + * bms_del_range + * Delete members in the range of 'lower' to 'upper' from the set. + * + * Note this could also be done by calling bms_del_member in a loop, however, + * using this function will be faster when the range is large as we work at + * the bitmapword level rather than at bit level. + */ +Bitmapset * +bms_del_range(Bitmapset *a, int lower, int upper) +{ + int lwordnum, + lbitnum, + uwordnum, + ushiftbits, + wordnum; + + if (lower < 0 || upper < 0) + elog(ERROR, "negative bitmapset member not allowed"); + if (lower > upper) + elog(ERROR, "lower range must not be above upper range"); + uwordnum = WORDNUM(upper); + + if (a == NULL) + { + a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1)); + a->nwords = uwordnum + 1; + } + + /* ensure we have enough words to store the upper bit */ + else if (uwordnum >= a->nwords) + { + int oldnwords = a->nwords; + int i; + + a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1)); + a->nwords = uwordnum + 1; + /* zero out the enlarged portion */ + for (i = oldnwords; i < a->nwords; i++) + a->words[i] = 0; + } + + wordnum = lwordnum = WORDNUM(lower); + + lbitnum = BITNUM(lower); + ushiftbits = BITNUM(upper) + 1; + + /* + * Special case when lwordnum is the same as uwordnum we must perform the + * upper and lower masking on the word. 
+ */ + if (lwordnum == uwordnum) + { + a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1) + | (~(bitmapword) 0) << ushiftbits); + } + else + { + /* turn off lbitnum and all bits left of it */ + a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1); + + /* turn off all bits for any intermediate words */ + while (wordnum < uwordnum) + a->words[wordnum++] = (bitmapword) 0; + + /* turn off upper's bit and all bits right of it. */ + a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits; + } + + return a; +} + /* * bms_int_members - like bms_intersect, but left input is recycled */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index a74b56bb59..a266904010 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -244,6 +244,8 @@ _copyAppend(const Append *from) COPY_NODE_FIELD(appendplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); + COPY_SCALAR_FIELD(nasyncplans); + COPY_SCALAR_FIELD(referent); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index a80eccc2c1..bf87e721a5 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -434,6 +434,8 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(appendplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); + WRITE_INT_FIELD(nasyncplans); + WRITE_INT_FIELD(referent); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 764e3bb90c..25b84b3f15 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1639,6 +1639,8 @@ _readAppend(void) READ_NODE_FIELD(appendplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); + READ_INT_FIELD(nasyncplans); + READ_INT_FIELD(referent); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index aee81bd755..c676980a30 100644 --- 
a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -207,6 +207,9 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual Index scanrelid, char *enrname); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); +static Append *make_append(List *appendplans, int first_partial_plan, + int nasyncplans, int referent, + List *tlist, PartitionPruneInfo *partpruneinfos); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -292,6 +295,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root, List *rowMarks, OnConflictExpr *onconflict, int epqParam); static GatherMerge *create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path); +static bool is_async_capable_path(Path *path); /* @@ -1069,6 +1073,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) bool tlist_was_changed = false; List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; + List *asyncplans = NIL; + List *syncplans = NIL; ListCell *subpaths; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; @@ -1077,6 +1083,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) Oid *nodeSortOperators = NULL; Oid *nodeCollations = NULL; bool *nodeNullsFirst = NULL; + int nasyncplans = 0; + bool first = true; + bool referent_is_sync = true; /* * The subpaths list could be empty, if every child was proven empty by @@ -1206,6 +1215,23 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } subplans = lappend(subplans, subplan); + + /* + * Classify as async-capable or not. If we have decided to run the + * children in parallel, we cannot run any of them asynchronously. 
+ */ + if (!best_path->path.parallel_safe && is_async_capable_path(subpath)) + { + subplan->async_capable = true; + asyncplans = lappend(asyncplans, subplan); + ++nasyncplans; + if (first) + referent_is_sync = false; + } + else + syncplans = lappend(syncplans, subplan); + + first = false; } /* @@ -1244,6 +1270,18 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; + /* + * XXX ideally, if there's just one child, we'd not bother to generate an + * Append node but just return the single child. At the moment this does + * not work because the varno of the child scan plan won't match the + * parent-rel Vars it'll be asked to emit. + */ + + plan = make_append(list_concat(asyncplans, syncplans), + best_path->first_partial_path, nasyncplans, + referent_is_sync ? nasyncplans : 0, tlist, + partpruneinfo); + copy_generic_path_info(&plan->plan, (Path *) best_path); /* @@ -5462,6 +5500,27 @@ make_foreignscan(List *qptlist, return node; } +static Append * +make_append(List *appendplans, int first_partial_plan, int nasyncplans, + int referent, List *tlist, PartitionPruneInfo *partpruneinfo) +{ + Append *node = makeNode(Append); + Plan *plan = &node->plan; + + plan->targetlist = tlist; + plan->qual = NIL; + plan->lefttree = NULL; + plan->righttree = NULL; + + node->appendplans = appendplans; + node->first_partial_plan = first_partial_plan; + node->part_prune_info = partpruneinfo; + node->nasyncplans = nasyncplans; + node->referent = referent; + + return node; +} + static RecursiveUnion * make_recursive_union(List *tlist, Plan *lefttree, @@ -6836,3 +6895,27 @@ is_projection_capable_plan(Plan *plan) } return true; } + +/* + * is_async_capable_path + * Check whether a given Path node is async-capable. 
+ */ +static bool +is_async_capable_path(Path *path) +{ + switch (nodeTag(path)) + { + case T_ForeignPath: + { + FdwRoutine *fdwroutine = path->parent->fdwroutine; + + Assert(fdwroutine != NULL); + if (fdwroutine->IsForeignPathAsyncCapable != NULL && + fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) + return true; + } + default: + break; + } + return false; +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index fabcf31de8..575ccd5def 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3853,6 +3853,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_ASYNC_WAIT: + event_name = "AsyncExecWait"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index bb2baff763..7669e6ff53 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -306,7 +306,7 @@ SysLoggerMain(int argc, char *argv[]) * syslog pipe, which implies that all other backends have exited * (including the postmaster). 
*/ - wes = CreateWaitEventSet(CurrentMemoryContext, 2); + wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2); AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); #ifndef WIN32 AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL); diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 13685a0a0e..60b749f062 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -4640,7 +4640,7 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps) dpns->planstate = ps; /* - * We special-case Append and MergeAppend to pretend that the first child + * We special-case Append and MergeAppend to pretend that a specific child * plan is the OUTER referent; we have to interpret OUTER Vars in their * tlists according to one of the children, and the first one is the most * natural choice. Likewise special-case ModifyTable to pretend that the @@ -4648,7 +4648,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps) * lists containing references to non-target relations. 
*/ if (IsA(ps, AppendState)) - dpns->outer_planstate = ((AppendState *) ps)->appendplans[0]; + { + AppendState *aps = (AppendState *) ps; + Append *app = (Append *) ps->plan; + dpns->outer_planstate = aps->appendplans[app->referent]; + } else if (IsA(ps, MergeAppendState)) dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0]; else if (IsA(ps, ModifyTableState)) diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h new file mode 100644 index 0000000000..5fd67d9004 --- /dev/null +++ b/src/include/executor/execAsync.h @@ -0,0 +1,23 @@ +/*-------------------------------------------------------------------- + * execAsync.c + * Support functions for asynchronous query execution + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + *-------------------------------------------------------------------- + */ +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" +#include "storage/latch.h" + +extern void ExecAsyncSetState(PlanState *pstate, AsyncState status); +extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node, + void *data, bool reinit); +extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, + long timeout); +#endif /* EXECASYNC_H */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 6298c7c8ca..4adb2efe76 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -59,6 +59,7 @@ #define EXEC_FLAG_MARK 0x0008 /* need mark/restore */ #define EXEC_FLAG_SKIP_TRIGGERS 0x0010 /* skip AfterTrigger calls */ #define EXEC_FLAG_WITH_NO_DATA 0x0020 /* rel scannability doesn't matter */ +#define EXEC_FLAG_ASYNC 0x0040 /* request async execution */ /* Hook for plugins to get control in ExecutorStart() */ diff --git a/src/include/executor/nodeForeignscan.h 
b/src/include/executor/nodeForeignscan.h index ca7723c899..81791033af 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); +extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, bool reinit); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 822686033e..851cd15e65 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); +typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, + bool reinit); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler @@ -190,6 +195,7 @@ typedef struct FdwRoutine GetForeignPlan_function GetForeignPlan; BeginForeignScan_function BeginForeignScan; IterateForeignScan_function IterateForeignScan; + IterateForeignScan_function IterateForeignScanAsync; ReScanForeignScan_function ReScanForeignScan; EndForeignScan_function EndForeignScan; @@ -242,6 +248,11 @@ typedef struct FdwRoutine InitializeDSMForeignScan_function InitializeDSMForeignScan; ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; + 
ShutdownForeignScan_function ShutdownForeignScan; /* Support functions for path reparameterization. */ diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h index 0c645628e5..d97a4c2235 100644 --- a/src/include/nodes/bitmapset.h +++ b/src/include/nodes/bitmapset.h @@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b); extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper); extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b); extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b); +extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper); extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b); /* support for iterating through the integer elements of a set: */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6eb647290b..46d7fbab3a 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -932,6 +932,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate); * abstract superclass for all PlanState-type nodes. 
* ---------------- */ +typedef enum AsyncState +{ + AS_AVAILABLE, + AS_WAITING +} AsyncState; + typedef struct PlanState { NodeTag type; @@ -1020,6 +1026,11 @@ typedef struct PlanState bool outeropsset; bool inneropsset; bool resultopsset; + + /* Async subnode execution stuff */ + AsyncState asyncstate; + + int32 padding; /* to keep alignment of derived types */ } PlanState; /* ---------------- @@ -1216,14 +1227,20 @@ struct AppendState PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; - int as_whichplan; + int as_whichsyncplan; /* which sync plan is being executed */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ + int as_nasyncplans; /* # of async-capable children */ ParallelAppendState *as_pstate; /* parallel coordination info */ Size pstate_len; /* size of parallel coordination info */ struct PartitionPruneState *as_prune_state; Bitmapset *as_valid_subplans; bool (*choose_next_subplan) (AppendState *); + bool as_syncdone; /* all synchronous plans done? */ + Bitmapset *as_needrequest; /* async plans needing a new request */ + Bitmapset *as_pending_async; /* pending async plans */ + TupleTableSlot **as_asyncresult; /* unreturned results of async plans */ + int as_nasyncresult; /* # of valid entries in as_asyncresult */ }; /* ---------------- @@ -1786,6 +1803,7 @@ typedef struct ForeignScanState Size pscan_len; /* size of parallel coordination information */ /* use struct pointer to avoid including fdwapi.h here */ struct FdwRoutine *fdwroutine; + bool fs_async; void *fdw_state; /* foreign-data wrapper can keep state here */ } ForeignScanState; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 8e6594e355..26810915e5 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -133,6 +133,11 @@ typedef struct Plan bool parallel_aware; /* engage parallel-aware logic? 
*/ bool parallel_safe; /* OK to use as part of parallel plan? */ + /* + * information needed for asynchronous execution + */ + bool async_capable; /* engage asynchronous execution logic? */ + /* * Common structural data for all Plan types. */ @@ -259,6 +264,10 @@ typedef struct Append /* Info for run-time subplan pruning; NULL if we're not doing that */ struct PartitionPruneInfo *part_prune_info; + + /* Async child node execution stuff */ + int nasyncplans; /* # async subplans, always at start of list */ + int referent; /* index of inheritance tree referent */ } Append; /* ---------------- diff --git a/src/include/pgstat.h b/src/include/pgstat.h index fe076d823d..d57ef809fc 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -853,7 +853,8 @@ typedef enum WAIT_EVENT_REPLICATION_ORIGIN_DROP, WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, - WAIT_EVENT_SYNC_REP + WAIT_EVENT_SYNC_REP, + WAIT_EVENT_ASYNC_WAIT } WaitEventIPC; /* ---------- -- 2.23.0 From 8984d4e3360eaf79fc0f23b3e6817770be13fcfd Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp> Date: Thu, 19 Oct 2017 17:24:07 +0900 Subject: [PATCH v1 3/3] async postgres_fdw --- contrib/postgres_fdw/connection.c | 26 + .../postgres_fdw/expected/postgres_fdw.out | 222 ++++--- contrib/postgres_fdw/postgres_fdw.c | 615 ++++++++++++++++-- contrib/postgres_fdw/postgres_fdw.h | 2 + contrib/postgres_fdw/sql/postgres_fdw.sql | 20 +- 5 files changed, 711 insertions(+), 174 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 27b86a03f8..1afd99cad8 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -56,6 +56,7 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + void *storage; /* connection specific storage */ } ConnCacheEntry; /* 
@@ -200,6 +201,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); + entry->storage = NULL; } /* @@ -213,6 +215,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt) return entry->conn; } +/* + * Returns the connection-specific storage for this user. Allocates it with + * initsize if it does not exist yet. + */ +void * +GetConnectionSpecificStorage(UserMapping *user, size_t initsize) +{ + bool found; + ConnCacheEntry *entry; + ConnCacheKey key; + + key = user->umid; + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + Assert(found); + + if (entry->storage == NULL) + { + entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize); + memset(entry->storage, 0, initsize); + } + + return entry->storage; +} + /* * Connect to remote server using specified server and user mapping properties. */ diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 48282ab151..bd2e835d2d 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6900,7 +6900,7 @@ INSERT INTO a(aa) VALUES('aaaaa'); INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+------- a | aaa @@ -6928,7 +6928,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -6956,7 +6956,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | 
aaa @@ -6984,7 +6984,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | newtoo @@ -7054,35 +7054,41 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid + Sort Key: bar.f1 + -> Seq Scan on public.bar + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid -> Foreign Scan on public.bar2 bar_1 Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.f1, foo.*, foo.tableoid - -> Foreign Scan on public.foo2 foo_1 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid Remote SQL: SELECT 
f1, f2, f3, ctid FROM public.loct1 -(23 rows) + -> Seq Scan on public.foo + Output: foo.ctid, foo.f1, foo.*, foo.tableoid +(29 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -7092,35 +7098,41 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid + Sort Key: bar.f1 + -> Seq Scan on public.bar + Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid -> Foreign Scan on public.bar2 bar_1 Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.f1, foo.*, foo.tableoid - -> Foreign Scan on public.foo2 foo_1 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, 
foo_1.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) + -> Seq Scan on public.foo + Output: foo.ctid, foo.f1, foo.*, foo.tableoid +(29 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 @@ -7150,11 +7162,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.f1, foo.*, foo.tableoid - -> Foreign Scan on public.foo2 foo_1 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 + -> Seq Scan on public.foo + Output: foo.ctid, foo.f1, foo.*, foo.tableoid -> Hash Join Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid Inner Unique: true @@ -7168,12 +7181,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo); Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: foo.ctid, foo.f1, foo.*, foo.tableoid - -> Foreign Scan on public.foo2 foo_1 + Async subplans: 1 + -> Async Foreign Scan on public.foo2 foo_1 Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(39 rows) + -> Seq Scan on public.foo + Output: foo.ctid, foo.f1, foo.*, foo.tableoid +(41 rows) update bar set f2 = f2 + 100 where f1 in (select f1 from foo); select tableoid::regclass, * from bar order by 1,2; @@ -7203,16 +7217,17 @@ where bar.f1 = ss.f1; Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1)) Hash Cond: (foo.f1 = bar.f1) -> Append - -> Seq Scan on public.foo - Output: ROW(foo.f1), foo.f1 - -> Foreign Scan on public.foo2 foo_1 + Async subplans: 2 + -> Async Foreign Scan on public.foo2 foo_1 Output: ROW(foo_1.f1), foo_1.f1 Remote SQL: SELECT f1 FROM 
public.loct1 - -> Seq Scan on public.foo foo_2 - Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3) - -> Foreign Scan on public.foo2 foo_3 + -> Async Foreign Scan on public.foo2 foo_3 Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3) Remote SQL: SELECT f1 FROM public.loct1 + -> Seq Scan on public.foo + Output: ROW(foo.f1), foo.f1 + -> Seq Scan on public.foo foo_2 + Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3) -> Hash Output: bar.f1, bar.f2, bar.ctid -> Seq Scan on public.bar @@ -7230,17 +7245,18 @@ where bar.f1 = ss.f1; Output: (ROW(foo.f1)), foo.f1 Sort Key: foo.f1 -> Append - -> Seq Scan on public.foo - Output: ROW(foo.f1), foo.f1 - -> Foreign Scan on public.foo2 foo_1 + Async subplans: 2 + -> Async Foreign Scan on public.foo2 foo_1 Output: ROW(foo_1.f1), foo_1.f1 Remote SQL: SELECT f1 FROM public.loct1 - -> Seq Scan on public.foo foo_2 - Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3) - -> Foreign Scan on public.foo2 foo_3 + -> Async Foreign Scan on public.foo2 foo_3 Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3) Remote SQL: SELECT f1 FROM public.loct1 -(45 rows) + -> Seq Scan on public.foo + Output: ROW(foo.f1), foo.f1 + -> Seq Scan on public.foo foo_2 + Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3) +(47 rows) update bar set f2 = f2 + 100 from @@ -7390,27 +7406,33 @@ delete from foo where f1 < 5 returning *; (5 rows) explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; - QUERY PLAN ------------------------------------------------------------------------------- - Update on public.bar - Output: bar.f1, bar.f2 - Update on public.bar - Foreign Update on public.bar2 bar_1 - -> Seq Scan on public.bar - Output: bar.f1, (bar.f2 + 100), bar.ctid - -> Foreign Update on public.bar2 bar_1 - Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 -(8 rows) +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; + QUERY PLAN +-------------------------------------------------------------------------------------- + Sort + Output: u.f1, u.f2 + 
Sort Key: u.f1 + CTE u + -> Update on public.bar + Output: bar.f1, bar.f2 + Update on public.bar + Foreign Update on public.bar2 bar_1 + -> Seq Scan on public.bar + Output: bar.f1, (bar.f2 + 100), bar.ctid + -> Foreign Update on public.bar2 bar_1 + Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2 + -> CTE Scan on u + Output: u.f1, u.f2 +(14 rows) -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; f1 | f2 ----+----- 1 | 311 2 | 322 - 6 | 266 3 | 333 4 | 344 + 6 | 266 7 | 277 (6 rows) @@ -8485,11 +8507,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J Sort Sort Key: t1.a, t3.c -> Append - -> Foreign Scan + Async subplans: 2 + -> Async Foreign Scan Relations: ((ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)) INNER JOIN (ftprt1_p1 t3) - -> Foreign Scan + -> Async Foreign Scan Relations: ((ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)) INNER JOIN (ftprt1_p2 t3_1) -(7 rows) +(8 rows) SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a% 25 =0 ORDER BY 1,2,3; a | b | c @@ -8524,20 +8547,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10) -- with whole-row reference; partitionwise join does not apply EXPLAIN (COSTS OFF) SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2; - QUERY PLAN --------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------- Sort Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2) -> Hash Full Join Hash Cond: (t1.a = t2.b) -> Append - -> Foreign Scan on ftprt1_p1 t1 - -> Foreign Scan on ftprt1_p2 t1_1 + Async subplans: 2 + -> Async Foreign Scan on ftprt1_p1 t1 + -> Async Foreign Scan on ftprt1_p2 t1_1 -> Hash -> Append - -> Foreign Scan on ftprt2_p1 
t2 - -> Foreign Scan on ftprt2_p2 t2_1 -(11 rows) + Async subplans: 2 + -> Async Foreign Scan on ftprt2_p1 t2 + -> Async Foreign Scan on ftprt2_p2 t2_1 +(13 rows) SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2; wr | wr @@ -8566,11 +8591,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t Sort Sort Key: t1.a, t1.b -> Append - -> Foreign Scan + Async subplans: 2 + -> Async Foreign Scan Relations: (ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2) - -> Foreign Scan + -> Async Foreign Scan Relations: (ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1) -(7 rows) +(8 rows) SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25= 0 ORDER BY 1,2; a | b @@ -8623,21 +8649,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE -- test FOR UPDATE; partitionwise join does not apply EXPLAIN (COSTS OFF) SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; - QUERY PLAN --------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------- LockRows -> Sort Sort Key: t1.a -> Hash Join Hash Cond: (t2.b = t1.a) -> Append - -> Foreign Scan on ftprt2_p1 t2 - -> Foreign Scan on ftprt2_p2 t2_1 + Async subplans: 2 + -> Async Foreign Scan on ftprt2_p1 t2 + -> Async Foreign Scan on ftprt2_p2 t2_1 -> Hash -> Append - -> Foreign Scan on ftprt1_p1 t1 - -> Foreign Scan on ftprt1_p2 t1_1 -(12 rows) + Async subplans: 2 + -> Async Foreign Scan on ftprt1_p1 t1 + -> Async Foreign Scan on ftprt1_p2 t1_1 +(14 rows) SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1; a | b @@ -8672,18 +8700,19 @@ ANALYZE fpagg_tab_p3; SET enable_partitionwise_aggregate TO 
false; EXPLAIN (COSTS OFF) SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1; - QUERY PLAN ------------------------------------------------------------ + QUERY PLAN +----------------------------------------------------------------- Sort Sort Key: pagg_tab.a -> HashAggregate Group Key: pagg_tab.a Filter: (avg(pagg_tab.b) < '22'::numeric) -> Append - -> Foreign Scan on fpagg_tab_p1 pagg_tab - -> Foreign Scan on fpagg_tab_p2 pagg_tab_1 - -> Foreign Scan on fpagg_tab_p3 pagg_tab_2 -(9 rows) + Async subplans: 3 + -> Async Foreign Scan on fpagg_tab_p1 pagg_tab + -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_1 + -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_2 +(10 rows) -- Plan with partitionwise aggregates is enabled SET enable_partitionwise_aggregate TO true; @@ -8694,13 +8723,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O Sort Sort Key: pagg_tab.a -> Append - -> Foreign Scan + Async subplans: 3 + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p1 pagg_tab) - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1) - -> Foreign Scan + -> Async Foreign Scan Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2) -(9 rows) +(10 rows) SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1; a | sum | min | count diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index bdc21b36d1..f3212aac90 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,8 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" +#include "executor/nodeForeignscan.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -35,6 +37,7 @@ #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" +#include "pgstat.h" #include "postgres_fdw.h" 
#include "utils/builtins.h" #include "utils/float.h" @@ -56,6 +59,9 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Retrieve PgFdwScanState struct from ForeignScanState */ +#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state) + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -122,11 +128,28 @@ enum FdwDirectModifyPrivateIndex FdwDirectModifyPrivateSetProcessed }; +/* + * Connection private area structure. + */ +typedef struct PgFdwConnpriv +{ + ForeignScanState *leader; /* leader node of this connection */ + bool busy; /* true if this connection is busy */ +} PgFdwConnpriv; + +/* Execution state base type */ +typedef struct PgFdwState +{ + PGconn *conn; /* connection for the scan */ + PgFdwConnpriv *connpriv; /* connection private memory */ +} PgFdwState; + /* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table. NULL * for a foreign join scan. */ TupleDesc tupdesc; /* tuple descriptor of scan */ @@ -137,7 +160,7 @@ typedef struct PgFdwScanState List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ + bool result_ready; unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? 
*/ int numParams; /* number of parameters passed to query */ @@ -153,6 +176,12 @@ typedef struct PgFdwScanState /* batch-level state, for optimizing rewinds and avoiding useless fetch */ int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + bool run_async; /* true if run asynchronously */ + bool inqueue; /* true if this node is in waiter queue */ + ForeignScanState *waiter; /* Next node to run a query among nodes + * sharing the same connection */ + ForeignScanState *last_waiter; /* last waiting node in waiting queue. + * valid only on the leader node */ /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ @@ -166,11 +195,11 @@ typedef struct PgFdwScanState */ typedef struct PgFdwModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* for remote query execution */ - PGconn *conn; /* connection for the scan */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -197,6 +226,7 @@ typedef struct PgFdwModifyState */ typedef struct PgFdwDirectModifyState { + PgFdwState s; /* common structure */ Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ @@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); +static void postgresShutdownForeignScan(ForeignScanState *node); static void postgresAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); @@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo 
*output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static bool postgresForeignAsyncConfigureWait(ForeignScanState *node, + WaitEventSet *wes, + void *caller_data, bool reinit); /* * Helper functions @@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); -static void fetch_more_data(ForeignScanState *node); +static void request_more_data(ForeignScanState *node); +static void fetch_received_data(ForeignScanState *node); +static void vacate_connection(PgFdwState *fdwconn, bool clear_queue); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *create_foreign_modify(EState *estate, RangeTblEntry *rte, @@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; + routine->ShutdownForeignScan = postgresShutdownForeignScan; /* Functions for updating foreign tables */ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; @@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for async execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + PG_RETURN_POINTER(routine); } @@ -1434,12 +1476,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. 
*/ - fsstate->conn = GetConnection(user, false); + fsstate->s.conn = GetConnection(user, false); + fsstate->s.connpriv = (PgFdwConnpriv *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv)); + fsstate->s.connpriv->leader = NULL; + fsstate->s.connpriv->busy = false; + fsstate->waiter = NULL; + fsstate->last_waiter = node; /* Assign a unique ID for my cursor */ - fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_number = GetCursorNumber(fsstate->s.conn); fsstate->cursor_exists = false; + /* Initialize async execution status */ + fsstate->run_async = false; + fsstate->inqueue = false; + /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); @@ -1487,40 +1539,259 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_values); } +/* + * Async queue manipulation functions + */ + +/* + * add_async_waiter: + * + * Adds the node to the end of the waiter queue. Immediately starts the node if no + * node is running. + */ +static inline void +add_async_waiter(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *leader = fsstate->s.connpriv->leader; + + /* do nothing if the node is already in the queue or already eof'ed */ + if (leader == node || fsstate->inqueue || fsstate->eof_reached) + return; + + if (leader == NULL) + { + /* immediately send request if not busy */ + request_more_data(node); + } + else + { + PgFdwScanState *leader_state = GetPgFdwScanState(leader); + PgFdwScanState *last_waiter_state + = GetPgFdwScanState(leader_state->last_waiter); + + last_waiter_state->waiter = node; + leader_state->last_waiter = node; + fsstate->inqueue = true; + } +} + +/* + * move_to_next_waiter: + * + * Makes the first waiter the next leader. + * Returns the new leader or NULL if there's no waiter. 
+ */ +static inline ForeignScanState * +move_to_next_waiter(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *ret = fsstate->waiter; + + Assert(fsstate->s.connpriv->leader == node); + + if (ret) + { + PgFdwScanState *retstate = GetPgFdwScanState(ret); + fsstate->waiter = NULL; + retstate->last_waiter = fsstate->last_waiter; + retstate->inqueue = false; + } + + fsstate->s.connpriv->leader = ret; + + return ret; +} + +/* + * remove the node from waiter queue + * + * This is a bit different from the two above in the sense that this can + * operate on connection leader. The result is absorbed when this is called on + * active leader. + * + * Returns true if the node was found. + */ +static inline bool +remove_async_node(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *leader = fsstate->s.connpriv->leader; + PgFdwScanState *leader_state; + ForeignScanState *prev; + PgFdwScanState *prev_state; + ForeignScanState *cur; + + /* no need to remove me */ + if (!leader || !fsstate->inqueue) + return false; + + leader_state = GetPgFdwScanState(leader); + + /* Remove the leader node */ + if (leader == node) + { + ForeignScanState *next_leader; + + if (leader_state->s.connpriv->busy) + { + /* + * this node is waiting for result, absorb the result first so + * that the following commands can be sent on the connection. 
+ */ + PgFdwScanState *leader_state = GetPgFdwScanState(leader); + PGconn *conn = leader_state->s.conn; + + while(PQisBusy(conn)) + PQclear(PQgetResult(conn)); + + leader_state->s.connpriv->busy = false; + } + + /* Make the first waiter the leader */ + if (leader_state->waiter) + { + PgFdwScanState *next_leader_state; + + next_leader = leader_state->waiter; + next_leader_state = GetPgFdwScanState(next_leader); + + leader_state->s.connpriv->leader = next_leader; + next_leader_state->last_waiter = leader_state->last_waiter; + } + leader_state->waiter = NULL; + + return true; + } + + /* + * Just remove the node in queue + * + * This function is called on the shutdown path. We don't bother + * considering faster way to do this. + */ + prev = leader; + prev_state = leader_state; + cur = GetPgFdwScanState(prev)->waiter; + while (cur) + { + PgFdwScanState *curstate = GetPgFdwScanState(cur); + + if (cur == node) + { + prev_state->waiter = curstate->waiter; + if (leader_state->last_waiter == cur) + leader_state->last_waiter = prev; + else + leader_state->last_waiter = cur; + + fsstate->inqueue = false; + + return true; + } + prev = cur; + prev_state = curstate; + cur = curstate->waiter; + } + + return false; +} + /* * postgresIterateForeignScan - * Retrieve next row from the result set, or clear tuple slot to indicate - * EOF. + * Retrieve next row from the result set. + * + * For synchronous nodes, returns a cleared tuple slot to indicate EOF. + * + * If the node is an asynchronous one, a cleared tuple slot has two meanings. + * If the caller receives a cleared tuple slot, asyncstate indicates whether + * the node is EOF (AS_AVAILABLE) or waiting for data to + * come (AS_WAITING). 
*/ static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; - /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. - */ - if (!fsstate->cursor_exists) - create_cursor(node); + if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached) + { + /* we've run out, get some more tuples */ + if (!node->fs_async) + { + /* finish running query to send my command */ + if (!fsstate->s.connpriv->busy) + vacate_connection((PgFdwState *)fsstate, false); + + request_more_data(node); + + /* + * Fetch the result immediately. This executes the next waiter if + * any. + */ + fetch_received_data(node); + } + else if (!fsstate->s.connpriv->busy) + { + /* If the connection is not busy, just send the request. */ + request_more_data(node); + } + else + { + /* This connection is busy */ + bool available = true; + ForeignScanState *leader = fsstate->s.connpriv->leader; + PgFdwScanState *leader_state = GetPgFdwScanState(leader); + + /* Check if the result is immediately available */ + if (PQisBusy(leader_state->s.conn)) + { + int rc = WaitLatchOrSocket(NULL, + WL_SOCKET_READABLE | WL_TIMEOUT | + WL_EXIT_ON_PM_DEATH, + PQsocket(leader_state->s.conn), 0, + WAIT_EVENT_ASYNC_WAIT); + if (!(rc & WL_SOCKET_READABLE)) + available = false; + } + + /* The next waiter is executed automatically */ + if (available) + fetch_received_data(leader); + + /* add the requested node */ + add_async_waiter(node); + + /* add the previous leader */ + add_async_waiter(leader); + } + } /* - * Get some more tuples, if we've run out. + * If we haven't received a result for the given node this time, + * return with no tuple to give way to another node. 
*/ if (fsstate->next_tuple >= fsstate->num_tuples) { - /* No point in another fetch if we already detected EOF, though. */ - if (!fsstate->eof_reached) - fetch_more_data(node); - /* If we didn't get any tuples, must be end of data. */ - if (fsstate->next_tuple >= fsstate->num_tuples) - return ExecClearTuple(slot); + if (fsstate->eof_reached) + { + fsstate->result_ready = true; + node->ss.ps.asyncstate = AS_AVAILABLE; + } + else + { + fsstate->result_ready = false; + node->ss.ps.asyncstate = AS_WAITING; + } + + return ExecClearTuple(slot); } /* * Return the next tuple. */ + fsstate->result_ready = true; + node->ss.ps.asyncstate = AS_AVAILABLE; ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], slot, false); @@ -1535,7 +1806,7 @@ postgresIterateForeignScan(ForeignScanState *node) static void postgresReScanForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); char sql[64]; PGresult *res; @@ -1543,6 +1814,8 @@ postgresReScanForeignScan(ForeignScanState *node) if (!fsstate->cursor_exists) return; + vacate_connection((PgFdwState *)fsstate, true); + /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should @@ -1571,9 +1844,9 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql); PQclear(res); /* Now force a fresh FETCH. 
*/ @@ -1591,7 +1864,7 @@ postgresReScanForeignScan(ForeignScanState *node) static void postgresEndForeignScan(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) @@ -1599,15 +1872,31 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->s.conn, fsstate->cursor_number); /* Release remote connection */ - ReleaseConnection(fsstate->conn); - fsstate->conn = NULL; + ReleaseConnection(fsstate->s.conn); + fsstate->s.conn = NULL; /* MemoryContexts will be deleted automatically. */ } +/* + * postgresShutdownForeignScan + * Remove asynchrony stuff and cleanup garbage on the connection. + */ +static void +postgresShutdownForeignScan(ForeignScanState *node) +{ + ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; + + if (plan->operation != CMD_SELECT) + return; + + /* remove the node from waiting queue */ + remove_async_node(node); +} + /* * postgresAddForeignUpdateTargets * Add resjunk column(s) needed for update/delete on a foreign table @@ -2372,7 +2661,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->s.conn = GetConnection(user, false); + dmstate->s.connpriv = (PgFdwConnpriv *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv)); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2457,7 +2748,11 @@ postgresIterateDirectModify(ForeignScanState *node) * If this is the first call after Begin, execute the statement. 
*/ if (dmstate->num_tuples == -1) + { + /* finish running query to send my command */ + vacate_connection((PgFdwState *)dmstate, true); execute_dml_stmt(node); + } /* * If the local query doesn't specify RETURNING, just clear tuple slot. @@ -2504,8 +2799,8 @@ postgresEndDirectModify(ForeignScanState *node) PQclear(dmstate->result); /* Release remote connection */ - ReleaseConnection(dmstate->conn); - dmstate->conn = NULL; + ReleaseConnection(dmstate->s.conn); + dmstate->s.conn = NULL; /* MemoryContext will be deleted automatically. */ } @@ -2703,6 +2998,7 @@ estimate_path_cost_size(PlannerInfo *root, List *local_param_join_conds; StringInfoData sql; PGconn *conn; + PgFdwConnpriv *connpriv; Selectivity local_sel; QualCost local_cost; List *fdw_scan_tlist = NIL; @@ -2747,6 +3043,18 @@ estimate_path_cost_size(PlannerInfo *root, /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); + connpriv = GetConnectionSpecificStorage(fpinfo->user, + sizeof(PgFdwConnpriv)); + if (connpriv) + { + PgFdwState tmpstate; + tmpstate.conn = conn; + tmpstate.connpriv = connpriv; + + /* finish running query to send my command */ + vacate_connection(&tmpstate, true); + } + get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3317,11 +3625,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, static void create_cursor(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; StringInfoData buf; PGresult *res; @@ -3384,50 +3692,127 @@ create_cursor(ForeignScanState *node) } /* - * Fetch some more rows from the node's cursor. + * Sends the next request of the node. 
If the given node is different from the + * current connection leader, pushes it back to waiter queue and let the given + * node be the leader. */ static void -fetch_more_data(ForeignScanState *node) +request_more_data(ForeignScanState *node) { - PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PgFdwScanState *fsstate = GetPgFdwScanState(node); + ForeignScanState *leader = fsstate->s.connpriv->leader; + PGconn *conn = fsstate->s.conn; + char sql[64]; + + /* must be non-busy */ + Assert(!fsstate->s.connpriv->busy); + /* must be not-eof */ + Assert(!fsstate->eof_reached); + + /* + * If this is the first call after Begin or ReScan, we need to create the + * cursor on the remote side. + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); + + fsstate->s.connpriv->busy = true; + + /* Let the node be the leader if it is different from current one */ + if (leader != node) + { + /* + * If the connection leader exists, insert the node as the connection + * leader making the current leader be the first waiter. + */ + if (leader != NULL) + { + remove_async_node(node); + fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter; + fsstate->waiter = leader; + } + else + { + fsstate->last_waiter = node; + fsstate->waiter = NULL; + } + + fsstate->s.connpriv->leader = node; + } +} + +/* + * Fetches received data and automatically send requests of the next waiter. + */ +static void +fetch_received_data(ForeignScanState *node) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); PGresult *volatile res = NULL; MemoryContext oldcontext; + ForeignScanState *waiter; + + /* I should be the current connection leader */ + Assert(fsstate->s.connpriv->leader == node); /* * We'll store the tuples in the batch_cxt. First, flush the previous - * batch. 
+ * batch if no tuple is remaining */ - fsstate->tuples = NULL; - MemoryContextReset(fsstate->batch_cxt); + if (fsstate->next_tuple >= fsstate->num_tuples) + { + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + MemoryContextReset(fsstate->batch_cxt); + } + else if (fsstate->next_tuple > 0) + { + /* move the remaining tuples to the beginning of the store */ + int n = 0; + + while(fsstate->next_tuple < fsstate->num_tuples) + fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++]; + fsstate->num_tuples = n; + } + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { - PGconn *conn = fsstate->conn; + PGconn *conn = fsstate->s.conn; char sql[64]; - int numrows; + int addrows; + size_t newsize; int i; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); - res = pgfdw_exec_query(conn, sql); + res = pgfdw_get_result(conn, sql); /* On error, report the original query, not the FETCH. 
*/ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; + addrows = PQntuples(res); + newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple); + if (fsstate->tuples) + fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize); + else + fsstate->tuples = (HeapTuple *) palloc(newsize); - for (i = 0; i < numrows; i++) + for (i = 0; i < addrows; i++) { Assert(IsA(node->ss.ps.plan, ForeignScan)); - fsstate->tuples[i] = + fsstate->tuples[fsstate->num_tuples + i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, @@ -3437,22 +3822,75 @@ fetch_more_data(ForeignScanState *node) } /* Update fetch_ct_2 */ - if (fsstate->fetch_ct_2 < 2) + if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0) fsstate->fetch_ct_2++; + fsstate->next_tuple = 0; + fsstate->num_tuples += addrows; + /* Must be EOF if we didn't get as many tuples as we asked for. 
*/ - fsstate->eof_reached = (numrows < fsstate->fetch_size); + fsstate->eof_reached = (addrows < fsstate->fetch_size); + + PQclear(res); + res = NULL; } PG_FINALLY(); { + fsstate->s.connpriv->busy = false; + if (res) PQclear(res); } PG_END_TRY(); + fsstate->s.connpriv->busy = false; + + /* let the first waiter be the next leader of this connection */ + waiter = move_to_next_waiter(node); + + /* send the next request if any */ + if (waiter) + request_more_data(waiter); + MemoryContextSwitchTo(oldcontext); } +/* + * Vacate a connection so that this node can send the next query + */ +static void +vacate_connection(PgFdwState *fdwstate, bool clear_queue) +{ + PgFdwConnpriv *connpriv = fdwstate->connpriv; + ForeignScanState *leader; + + /* the connection is already available */ + if (connpriv == NULL || connpriv->leader == NULL || !connpriv->busy) + return; + + /* + * let the current connection leader read the result for the running query + */ + leader = connpriv->leader; + fetch_received_data(leader); + + /* let the first waiter be the next leader of this connection */ + move_to_next_waiter(leader); + + if (!clear_queue) + return; + + /* Clear the waiting list */ + while (leader) + { + PgFdwScanState *fsstate = GetPgFdwScanState(leader); + + fsstate->last_waiter = NULL; + leader = fsstate->waiter; + fsstate->waiter = NULL; + } +} + /* * Force assorted GUC parameters to settings that ensure that we'll output * data values in a form that is unambiguous to the remote server. @@ -3566,7 +4004,9 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->s.conn = GetConnection(user, true); + fmstate->s.connpriv = (PgFdwConnpriv *) + GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv)); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. 
*/ @@ -3653,6 +4093,9 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + /* finish running query to send my command */ + vacate_connection((PgFdwState *)fmstate, true); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3680,14 +4123,14 @@ execute_foreign_modify(EState *estate, /* * Execute the prepared statement. */ - if (!PQsendQueryPrepared(fmstate->conn, + if (!PQsendQueryPrepared(fmstate->s.conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -3695,10 +4138,10 @@ execute_foreign_modify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -3734,7 +4177,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", - GetPrepStmtNumber(fmstate->conn)); + GetPrepStmtNumber(fmstate->s.conn)); p_name = pstrdup(prep_name); /* @@ -3744,12 +4187,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * the prepared statements we use in this module are simple enough that * the remote server will make the right choices. 
*/ - if (!PQsendPrepare(fmstate->conn, + if (!PQsendPrepare(fmstate->s.conn, p_name, fmstate->query, 0, NULL)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query); /* * Get the result, and check for success. @@ -3757,9 +4200,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->s.conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ @@ -3888,16 +4331,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->s.conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql); PQclear(res); fmstate->p_name = NULL; } /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + ReleaseConnection(fmstate->s.conn); + fmstate->s.conn = NULL; } /* @@ -4056,9 +4499,9 @@ execute_dml_stmt(ForeignScanState *node) * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. 
*/ - if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams, NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query); /* * Get the result, and check for success. @@ -4066,10 +4509,10 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true, dmstate->query); /* Get the number of rows affected. */ @@ -5560,6 +6003,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + return true; +} + + +/* + * Configure waiting event. + * + * Add a wait event only when the node is the connection leader. Otherwise + * another node on this connection is the leader. + */ +static bool +postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes, + void *caller_data, bool reinit) +{ + PgFdwScanState *fsstate = GetPgFdwScanState(node); + + + /* If the caller didn't reinit, this event is already in event set */ + if (!reinit) + return true; + + if (fsstate->s.connpriv->leader == node) + { + AddWaitEventToSet(wes, + WL_SOCKET_READABLE, PQsocket(fsstate->s.conn), + NULL, caller_data); + return true; + } + + return false; +} + + /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. 
As a side effect, save information we obtain in diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index ea052872c3..696af73408 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo UserMapping *user; /* only set in use_remote_estimate mode */ int fetch_size; /* fetch size for this remote table */ + bool allow_prefetch; /* true to allow overlapped fetching */ /* * Name of the relation, for use while EXPLAINing ForeignScan. It is used @@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 1c5c37b783..69c06ac6e4 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1730,25 +1730,25 @@ INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 
1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; @@ -1790,12 +1790,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) @@ -1854,8 +1854,8 @@ explain (verbose, costs off) delete from foo where f1 < 5 returning *; delete from foo where f1 < 5 returning *; explain (verbose, costs off) -update bar set f2 = f2 + 100 returning *; -update bar set f2 = f2 + 100 returning *; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; +with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1; -- Test that UPDATE/DELETE with inherited target works with row-level triggers CREATE TRIGGER trig_row_before -- 2.23.0
On Thu, Dec 5, 2019 at 03:19:50PM -0500, Robert Haas wrote:
> On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <bruce@momjian.us> wrote:
> > I agree with Stephen's request. We have been waiting for the executor
> > rewrite for a while, so let's just do something simple and see how it
> > performs.
>
> I'm sympathetic to the frustration here, and I think it would be great
> if we could find a way forward that doesn't involve waiting for a full
> rewrite of the executor. However, I seem to remember that when we
> tested the various patches that various people had written for this
> feature (I wrote one, too) they all had a noticeable performance
> penalty in the case of a plain old Append that involved no FDWs and
> nothing asynchronous. I don't think it's OK to have, say, a 2%
> regression on every query that involves an Append, because especially
> now that we have partitioning, that's a lot of queries.
>
> I don't know whether this patch has that kind of problem. If it
> doesn't, I would consider that a promising sign.

Certainly any overhead on normal queries would be unacceptable.

--
  Bruce Momjian <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                            http://enterprisedb.com

+ As you are, so once was I. As I am, so you will be. +
+                     Ancient Roman grave inscription +
Hello.

I think I can say that this patch doesn't slow down non-AsyncAppend,
non-postgres_fdw scans.

At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <bruce@momjian.us> wrote in
> Certainly any overhead on normal queries would be unacceptable.

I took performance numbers on the current shape of the async execution
patch for the following scan cases.

t0   : single local table (parallel disabled)
pll  : local partitioning (local Append, parallel disabled)
ft0  : single foreign table
pf0  : inheritance on 4 foreign tables, single connection
pf1  : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connections

The benchmarking system is configured as follows on a single machine.

             [ benchmark client ]
           |                       |
   (localhost:5433)        (localhost:5432)
           |                       |
+-----+    |            +-----+    |
|     V    V            |     V    V
|  [master server]      |  [async server]
|        V              |        V
+---fdw--+              +---fdw--+

The patch works roughly in the following steps.

1. The planner decides how many children of an Append can run
   asynchronously (such children are called async-capable).

2. During ExecInit, if an Append has no async-capable children,
   ExecAppend, which is exactly the same function as before, is set as
   ExecProcNode. Otherwise ExecAppendAsync is used.

   If the infrastructure part of the patch causes any degradation, the
   "t0" test (scan on a single local table) and/or the "pll" test (scan
   on a local partitioned table) get slower.

3. postgres_fdw always runs the async-capable code path.

   If the postgres_fdw part causes degradation, ft0 reflects that.

Each table has two integer columns and the query computes sum(a) over
all tuples.

With the default fetch_size = 100, the numbers are run times in ms.
Each number is the average of 14 runs.
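Step 2 above is the reason a plain Append should pay nothing for the new code: the choice between the two execution functions is made once at init time, not per tuple. The following is only a toy model of that idea, not code from the patch. Every name in it (ToyAppendState, toy_exec_append, toy_exec_append_async, toy_init_append) is hypothetical and merely stands in for the executor structures and for ExecAppend and ExecAppendAsync.

```c
#include <assert.h>

/* Toy stand-ins, NOT PostgreSQL's real executor structures. */
typedef struct ToyAppendState ToyAppendState;
typedef int (*ToyExecProc) (ToyAppendState *node);

struct ToyAppendState
{
	int			nchildren;	/* total number of subplans */
	int			nasync;		/* subplans the planner marked async-capable */
	ToyExecProc exec_proc;	/* chosen once at init, like ExecProcNode */
};

/* Plain path: stands in for the unchanged ExecAppend. Returns 0. */
static int
toy_exec_append(ToyAppendState *node)
{
	(void) node;
	return 0;
}

/* Async path: stands in for ExecAppendAsync. Returns 1. */
static int
toy_exec_append_async(ToyAppendState *node)
{
	(void) node;
	return 1;
}

/*
 * Decide once, at init time, which function all later calls go through.
 * With no async-capable children the plain function is installed, so the
 * per-tuple path contains no extra branch for the async case.
 */
static void
toy_init_append(ToyAppendState *node, int nchildren, int nasync)
{
	assert(nasync >= 0 && nasync <= nchildren);
	node->nchildren = nchildren;
	node->nasync = nasync;
	node->exec_proc = (nasync > 0) ? toy_exec_append_async : toy_exec_append;
}
```

In this model, calling toy_init_append(&node, 4, 0) corresponds to the "pll" case and installs the plain function, while toy_init_append(&node, 4, 4) corresponds to pf0/pf1/ptf0/ptf1 and installs the async one. Subsequent calls simply go through node.exec_proc(&node).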
        master  patched    gain
t0        7325     7130   +2.7%
pll       4558     4484   +1.7%
ft0       3670     3675   -0.1%
pf0       2322     1550  +33.3%
pf1       2367     1475  +37.7%
ptf0      2517     1624  +35.5%
ptf1      2343     1497  +36.2%

With a larger fetch_size (200), the gain mysteriously decreases for the
cases sharing a single connection (pf0, ptf0), but the others don't seem
to change much.

        master  patched    gain
t0        7212     7252   -0.6%
pll       4546     4397   +3.3%
ft0       3712     3731   -0.5%
pf0       2131     1570  +26.4%
pf1       1926     1189  +38.3%
ptf0      2001     1557  +22.2%
ptf1      1903     1193  +37.4%

FWIW, attached are the test scripts.

gentblr2.sql: Table creation script.
testrun.sh  : Benchmarking script.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

SELECT :scale * 0 as th0,
       :scale * 1 as th1,
       :scale * 2 as th2,
       :scale * 3 as th3,
       :scale * 4 as th4,
       :scale * 10 as th10,
       :scale * 20 as th20,
       :scale * 30 as th30,
       :scale * 40 as th40,
       :scale * 100 as th100,
       :scale * 1000 as th1000
\gset

DROP TABLE IF EXISTS t0 CASCADE;
DROP TABLE IF EXISTS pl CASCADE;
DROP TABLE IF EXISTS pll CASCADE;
DROP TABLE IF EXISTS pf0 CASCADE;
DROP TABLE IF EXISTS pf1 CASCADE;
DROP TABLE IF EXISTS ptf0;
DROP TABLE IF EXISTS ptf1;

CREATE TABLE pl (a int, b int);
CREATE TABLE cl1 (LIKE pl) INHERITS (pl);
CREATE TABLE cl2 (LIKE pl) INHERITS (pl);
CREATE TABLE cl3 (LIKE pl) INHERITS (pl);
CREATE TABLE cl4 (LIKE pl) INHERITS (pl);
INSERT INTO cl1 (SELECT a, a FROM generate_series(:th0, :th1 - 1) a);
INSERT INTO cl2 (SELECT a, a FROM generate_series(:th1, :th2 - 1) a);
INSERT INTO cl3 (SELECT a, a FROM generate_series(:th2, :th3 - 1) a);
INSERT INTO cl4 (SELECT a, a FROM generate_series(:th3, :th4 - 1) a);

CREATE TABLE pll (a int, b int);
CREATE TABLE cll1 (LIKE pl) INHERITS (pll);
CREATE TABLE cll2 (LIKE pl) INHERITS (pll);
CREATE TABLE cll3 (LIKE pl) INHERITS (pll);
CREATE TABLE cll4 (LIKE pl) INHERITS (pll);
INSERT INTO cll1 (SELECT a, a FROM generate_series(:th0, :th10 - 1) a);
INSERT INTO cll2 (SELECT a, a FROM generate_series(:th10, :th20 - 1) a);
INSERT INTO cll3 (SELECT a, a FROM generate_series(:th20,
:th30 - 1) a); INSERT INTO cll4 (SELECT a, a FROM generate_series(:th30, :th40 - 1) a); CREATE TABLE t0 (LIKE pl); INSERT INTO t0 (SELECT a, a FROM generate_series(0, :th100 - 1) a); DROP SERVER IF EXISTS svl CASCADE; DROP SERVER IF EXISTS sv0 CASCADE; DROP SERVER IF EXISTS sv1 CASCADE; DROP SERVER IF EXISTS sv2 CASCADE; DROP SERVER IF EXISTS sv3 CASCADE; DROP SERVER IF EXISTS sv4 CASCADE; CREATE SERVER svl FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size:'fetchsize'); CREATE SERVER sv0 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size:'fetchsize'); CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size:'fetchsize'); CREATE SERVER sv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size:'fetchsize'); CREATE SERVER sv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size:'fetchsize'); CREATE SERVER sv4 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size:'fetchsize'); CREATE USER MAPPING FOR public SERVER svl; CREATE USER MAPPING FOR public SERVER sv0; CREATE USER MAPPING FOR public SERVER sv1; CREATE USER MAPPING FOR public SERVER sv2; CREATE USER MAPPING FOR public SERVER sv3; CREATE USER MAPPING FOR public SERVER sv4; CREATE FOREIGN TABLE ft0 (a int, b int) SERVER svl OPTIONS (table_name 't0'); CREATE FOREIGN TABLE ft10 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl1'); CREATE FOREIGN TABLE ft20 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl2'); CREATE FOREIGN TABLE ft30 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl3'); CREATE FOREIGN TABLE ft40 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl4'); CREATE FOREIGN TABLE ft11 (a int, b int) SERVER sv1 OPTIONS (table_name 'cl1'); CREATE FOREIGN TABLE ft22 (a int, b int) 
SERVER sv2 OPTIONS (table_name 'cl2'); CREATE FOREIGN TABLE ft33 (a int, b int) SERVER sv3 OPTIONS (table_name 'cl3'); CREATE FOREIGN TABLE ft44 (a int, b int) SERVER sv4 OPTIONS (table_name 'cl4'); CREATE TABLE pf0 (LIKE pl); ALTER FOREIGN TABLE ft10 INHERIT pf0; ALTER FOREIGN TABLE ft20 INHERIT pf0; ALTER FOREIGN TABLE ft30 INHERIT pf0; ALTER FOREIGN TABLE ft40 INHERIT pf0; CREATE TABLE pf1 (LIKE pl); ALTER FOREIGN TABLE ft11 INHERIT pf1; ALTER FOREIGN TABLE ft22 INHERIT pf1; ALTER FOREIGN TABLE ft33 INHERIT pf1; ALTER FOREIGN TABLE ft44 INHERIT pf1; CREATE FOREIGN TABLE ftp10 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl1'); CREATE FOREIGN TABLE ftp20 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl2'); CREATE FOREIGN TABLE ftp30 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl3'); CREATE FOREIGN TABLE ftp40 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl4'); CREATE FOREIGN TABLE ftp11 (a int, b int) SERVER sv1 OPTIONS (table_name 'cl1'); CREATE FOREIGN TABLE ftp22 (a int, b int) SERVER sv2 OPTIONS (table_name 'cl2'); CREATE FOREIGN TABLE ftp33 (a int, b int) SERVER sv3 OPTIONS (table_name 'cl3'); CREATE FOREIGN TABLE ftp44 (a int, b int) SERVER sv4 OPTIONS (table_name 'cl4'); CREATE TABLE ptf0 (a int, b int) PARTITION BY RANGE (a); ALTER TABLE ptf0 ATTACH PARTITION ftp10 FOR VALUES FROM (:th0) TO (:th1); ALTER TABLE ptf0 ATTACH PARTITION ftp20 FOR VALUES FROM (:th1) TO (:th2); ALTER TABLE ptf0 ATTACH PARTITION ftp30 FOR VALUES FROM (:th2) TO (:th3); ALTER TABLE ptf0 ATTACH PARTITION ftp40 FOR VALUES FROM (:th3) TO (:th4); CREATE TABLE ptf1 (a int, b int) PARTITION BY RANGE (a); ALTER TABLE ptf1 ATTACH PARTITION ftp11 FOR VALUES FROM (:th0) TO (:th1); ALTER TABLE ptf1 ATTACH PARTITION ftp22 FOR VALUES FROM (:th1) TO (:th2); ALTER TABLE ptf1 ATTACH PARTITION ftp33 FOR VALUES FROM (:th2) TO (:th3); ALTER TABLE ptf1 ATTACH PARTITION ftp44 FOR VALUES FROM (:th3) TO (:th4); ANALYZE; #! 
/bin/bash function do_test() { echo $1 for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement =0; set client_min_messages=log; explain analyze select sum(a) from $1"; done | grep LOG } function do_test_union1() { echo "UNION_pf0" for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement =0; set client_min_messages=log; SELECT sum(a) FROM (SELECT a FROM ft10 UNION ALL SELECT a FROM ft20 UNION ALL SELECT a FROMft30 UNION ALL SELECT a FROM ft40) as pf0"; done | grep LOG } function do_test_union2() { echo "UNION_pf1" for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement =0; set client_min_messages=log; SELECT sum(a) FROM (SELECT a FROM ft11 UNION ALL SELECT a FROM ft22 UNION ALL SELECT a FROMft33 UNION ALL SELECT a FROM ft44) as pf1"; done | grep LOG } function warmup() { for i in $(seq 1 5); do psql postgres -c "set log_min_duration_statement = -1; select sum(a) from $1"; done 1>&2 > /dev/null } #for t in "t0" "pll"; #for t in "ft0" "pf0" "pf1" "ptf0" "ptf1"; #for t in "pf0" "ptf0"; for t in "t0" "pll" "ft0" "pf0" "pf1" "ptf0" "ptf1"; do warmup $t do_test $t done exit for t in "ft0" "pf0" "pf1" "ptf0" "ptf1"; do warmup $t do_test $t done #do_test_union1 #do_test_union2
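The benchmarking script only collects the LOG lines; the step that turns them into the per-table averages is not shown. A minimal sketch of that step in Python, assuming each run yields a line containing `duration: <n> ms`, which is what log_min_duration_statement emits. The function name and the two sample lines are made up for illustration; they are not taken from the attached scripts or from a real run.

```python
import re

# Matches the run time in a "duration: 1234.567 ms" log line.
DURATION_RE = re.compile(r"duration: ([0-9.]+) ms")


def mean_duration_ms(log_text):
    """Average every 'duration: N ms' value found in the captured output."""
    values = [float(m.group(1)) for m in DURATION_RE.finditer(log_text)]
    if not values:
        raise ValueError("no duration lines found")
    return sum(values) / len(values)


# Hypothetical sample output for two runs against t0 (not measured data).
SAMPLE_LOG = """\
LOG:  duration: 7301.250 ms  statement: explain analyze select sum(a) from t0
LOG:  duration: 7348.750 ms  statement: explain analyze select sum(a) from t0
"""

print(mean_duration_ms(SAMPLE_LOG))
```

With 14 runs per table, the same function would be fed the 14 captured lines for each table name.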
Hello
I have tested the patch with a partitioned table with several foreign
partitions living on separate data nodes. The initial testing was done
with a partitioned table having 3 foreign partitions, using a
variety of scale factors. The second test used a fixed amount of data per
data node, with the number of data nodes increased incrementally to see
the performance impact as more nodes are added to the cluster. The
third test is similar to the first but with much more data and
4 nodes.
A summary of the results is given below, and the test script is attached:
Test ENV
Parent node:2Core 8G
Child Nodes:2Core 4G
Test one:
1.1 The partition structure is as below:
[ ptf:(a int, b int, c varchar)]
(Parent node)
| | |
[ptf1] [ptf2] [ptf3]
(Node1) (Node2) (Node3)
The table data is partitioned across nodes. The test is done using a
simple select query and a count aggregate as shown below. Each result
is an average over multiple executions of the query, to ensure reliable
and consistent results.
①select * from ptf where b = 100;
②select count(*) from ptf;
1.2. Test Results
For ① result:
scalepernode master patched performance
2G 7s 2s 350%
5G 173s 63s 275%
10G 462s 156s 296%
20G 968s 327s 296%
30G 1472s 494s 297%
For ② result:
scalepernode master patched performance
2G 1079s 291s 370%
5G 2688s 741s 362%
10G 4473s 1493s 299%
Testing an aggregate takes too long, so that test was done with
smaller data sizes.
1.3. Summary
With the table partitioned over 3 nodes, the average performance gain
across a variety of scale factors is almost 300%.
Test Two
2.1 The partition structure is as below:
[ ptf:(a int, b int, c varchar)]
(Parent node)
| | |
[ptf1] ... [ptfN]
(Node1) (...) (NodeN)
①select * from ptf
②select * from ptf where b = 100;
This test is done with the same amount of data per node, but the table is
partitioned across N nodes. Each variation (master or patched) is tested
at least 3 times to get reliable and consistent results. The purpose of the
test is to see the impact on performance as the number of data nodes is increased.
2.2 The results
For ① result(scalepernode=2G):
nodenumber master patched performance
2 432s 180s 240%
3 636s 223s 285%
4 830s 283s 293%
5 1065s 361s 295%
For ② result(scalepernode=10G):
nodenumber master patched performance
2 281s 140s 201%
3 421s 140s 300%
4 562s 141s 398%
5 702s 141s 497%
6 833s 139s 599%
7 986s 141s 699%
8 1125s 140s 803%
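For ②, the patched time stays near 140 s whatever the node count, while master grows by roughly 140 s per node, so the ratio tracks the number of nodes. A quick check of that reading in Python. `PER_NODE_S = 140` is an assumption read off the patched column, and the two model functions are illustrative only; they are not part of the test script.

```python
# Test Two, query 2 (scale per node = 10G): nodes -> (master s, patched s)
MEASURED = {
    2: (281, 140), 3: (421, 140), 4: (562, 141), 5: (702, 141),
    6: (833, 139), 7: (986, 141), 8: (1125, 140),
}

PER_NODE_S = 140  # assumed cost of filtering one shard


def sequential_time(per_node_s, nodes):
    """Shards are scanned one after another, so their times add up."""
    return per_node_s * nodes


def overlapped_time(per_node_s, nodes):
    """All shards filter at once and return few rows: one shard's time."""
    return per_node_s


for nodes, (master_s, patched_s) in MEASURED.items():
    measured = master_s / patched_s
    modelled = sequential_time(PER_NODE_S, nodes) / overlapped_time(PER_NODE_S, nodes)
    print(f"{nodes} nodes: measured {measured:.2f}x, model {modelled:.2f}x")
```

This only holds while the filter returns very little data; query ①, which ships every row back to the parent, levels off near 3x in the table above.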
Test Three
This test is similar to [test one] but with much more data and
4 nodes.
For ① result:
scalepernode master patched performance
100G 6592s 1649s 399%
For ② result:
scalepernode master patched performance
100G 35383s 12363s 286%
The results show it works well with much larger data.
Summary
The patch is pretty good; it works well when little data is sent back to
the parent node. The patch doesn’t provide parallel FDW scan: it ensures
that child nodes can send data to the parent in parallel, but the parent can only
process the data from the data nodes sequentially.
Provided there is no performance degradation for non-FDW append queries,
I would recommend considering this patch as an interim solution while we are
waiting for parallel FDW scan.
Highgo Software (Canada/China/Pakistan)
URL : www.highgo.ca
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca
Attachments
Hi Hackers,
Sharing the email below from Movead Li. I believe he wanted to share the benchmarking results as a response to this email thread, but it started a new thread. Here it is:
"
Hello
I have tested the patch with a partitioned table with several foreign
partitions living on separate data nodes. The initial testing was done
with a partitioned table having 3 foreign partitions, using a
variety of scale factors. The second test used a fixed amount of data per
data node, with the number of data nodes increased incrementally to see
the performance impact as more nodes are added to the cluster. The
third test is similar to the first but with much more data and
4 nodes.
A summary of the results is given below, and the test script is attached:
Test ENV
Parent node:2Core 8G
Child Nodes:2Core 4G
Test one:
1.1 The partition structure is as below:
[ ptf:(a int, b int, c varchar)]
(Parent node)
| | |
[ptf1] [ptf2] [ptf3]
(Node1) (Node2) (Node3)
The table data is partitioned across nodes. The test is done using a
simple select query and a count aggregate as shown below. Each result
is an average over multiple executions of the query, to ensure reliable
and consistent results.
①select * from ptf where b = 100;
②select count(*) from ptf;
1.2. Test Results
For ① result:
scalepernode master patched performance
2G 7s 2s 350%
5G 173s 63s 275%
10G 462s 156s 296%
20G 968s 327s 296%
30G 1472s 494s 297%
For ② result:
scalepernode master patched performance
2G 1079s 291s 370%
5G 2688s 741s 362%
10G 4473s 1493s 299%
Testing an aggregate takes too long, so that test was done with
smaller data sizes.
1.3. Summary
With the table partitioned over 3 nodes, the average performance gain
across a variety of scale factors is almost 300%.
Test Two
2.1 The partition structure is as below:
[ ptf:(a int, b int, c varchar)]
(Parent node)
| | |
[ptf1] ... [ptfN]
(Node1) (...) (NodeN)
①select * from ptf
②select * from ptf where b = 100;
This test is done with the same amount of data per node, but the table is
partitioned across N nodes. Each variation (master or patched) is tested
at least 3 times to get reliable and consistent results. The purpose of the
test is to see the impact on performance as the number of data nodes is increased.
2.2 The results
For ① result(scalepernode=2G):
nodenumber master patched performance
2 432s 180s 240%
3 636s 223s 285%
4 830s 283s 293%
5 1065s 361s 295%
For ② result(scalepernode=10G):
nodenumber master patched performance
2 281s 140s 201%
3 421s 140s 300%
4 562s 141s 398%
5 702s 141s 497%
6 833s 139s 599%
7 986s 141s 699%
8 1125s 140s 803%
Test Three
This test is similar to [test one] but with much more data and
4 nodes.
For ① result:
scalepernode master patched performance
100G 6592s 1649s 399%
For ② result:
scalepernode master patched performance
100G 35383s 12363s 286%
The results show it works well with much larger data.
Summary
The patch is pretty good; it works well when little data is sent back to
the parent node. The patch doesn’t provide parallel FDW scan: it ensures
that child nodes can send data to the parent in parallel, but the parent can only
process the data from the data nodes sequentially.
Provided there is no performance degradation for non-FDW append queries,
I would recommend considering this patch as an interim solution while we are
waiting for parallel FDW scan.
"
On Thu, Dec 12, 2019 at 5:41 PM Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
Hello.
I think I can say that this patch doesn't slow down non-AsyncAppend,
non-postgres_fdw scans.
At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <bruce@momjian.us> wrote in
> Certainly any overhead on normal queries would be unacceptable.
I took performance numbers on the current shape of the async execution
patch for the following scan cases.
t0 : single local table (parallel disabled)
pll : local partitioning (local Append, parallel disabled)
ft0 : single foreign table
pf0 : inheritance on 4 foreign tables, single connection
pf1 : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connections
The benchmarking system is configured as follows on a single
machine.
          [ benchmark client ]
         |                    |
 (localhost:5433)     (localhost:5432)
         |                    |
+----+   |   +------+         |
|    V   V   V      |         V
|  [master server]  |  [async server]
|       V           |       V
+--fdw--+           +--fdw--+
The patch works roughly in the following steps.
1. The planner decides how many children of an Append can run
asynchronously (called async-capable).
2. During ExecInit, if an Append doesn't have any async-capable children,
ExecAppend, which is exactly the same function as before, is set as
ExecProcNode. Otherwise ExecAppendAsync is used.
If the infrastructure part of the patch causes any degradation, the
"t0" (scan on a single local table) and/or "pll" (scan on a local
partitioned table) tests would get slower.
3. postgres_fdw always runs the async-capable code path.
If the postgres_fdw part causes degradation, ft0 reflects that.
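Steps 1 and 2 amount to choosing the node's execution routine once, at initialization, so that an Append with no async-capable child runs the unchanged code. A toy Python stand-in for that dispatch, not the patch's C code: the names `exec_append` and `exec_append_async` mirror ExecAppend and ExecAppendAsync from the message, while `Child`, `async_capable` and `choose_exec_proc` are hypothetical names used only here.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Child:
    name: str
    async_capable: bool  # stands in for the planner's decision (step 1)


def exec_append(children: List[Child]) -> str:
    """Stand-in for the unchanged synchronous Append routine."""
    return "sync"


def exec_append_async(children: List[Child]) -> str:
    """Stand-in for the asynchronous variant."""
    return "async"


def choose_exec_proc(children: List[Child]) -> Callable[[List[Child]], str]:
    """Pick the routine once at init time (step 2), not once per tuple."""
    if any(child.async_capable for child in children):
        return exec_append_async
    return exec_append


local_only = [Child("cll1", False), Child("cll2", False)]
with_fdw = [Child("ftp11", True), Child("ftp22", True)]

print(choose_exec_proc(local_only)(local_only))
print(choose_exec_proc(with_fdw)(with_fdw))
```

Because the choice is made up front, the per-tuple path for local-only plans carries no extra branch, which is what the t0 and pll cases are meant to confirm.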
The tables have two integer columns and the query does sum(a) on all tuples.
With the default fetch_size = 100, the numbers are run times in ms. Each
number is the average of 14 runs.
master patched gain
t0 7325 7130 +2.7%
pll 4558 4484 +1.7%
ft0 3670 3675 -0.1%
pf0 2322 1550 +33.3%
pf1 2367 1475 +37.7%
ptf0 2517 1624 +35.5%
ptf1 2343 1497 +36.2%
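The gain column appears to be the relative reduction in run time, (master - patched) / master. A short Python check of that reading against the fetch_size = 100 table. Recomputing from the rounded averages lands within about 0.1 points of the posted values; the small differences presumably come from the posted averages having been rounded, which is an assumption. The function name is made up for illustration.

```python
# name -> (master ms, patched ms, gain as posted in the message)
RESULTS = {
    "t0": (7325, 7130, 2.7),
    "pll": (4558, 4484, 1.7),
    "ft0": (3670, 3675, -0.1),
    "pf0": (2322, 1550, 33.3),
    "pf1": (2367, 1475, 37.7),
    "ptf0": (2517, 1624, 35.5),
    "ptf1": (2343, 1497, 36.2),
}


def gain_percent(master_ms, patched_ms):
    """Relative run-time reduction, in percent (positive means faster)."""
    return 100.0 * (master_ms - patched_ms) / master_ms


for name, (master_ms, patched_ms, posted) in RESULTS.items():
    computed = gain_percent(master_ms, patched_ms)
    print(f"{name:5s} computed {computed:+6.2f}%  posted {posted:+.1f}%")
```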
With larger fetch_size (200) the gain mysteriously decreases for
the cases sharing a single connection (pf0, ptf0), but the others
don't seem to change much.
master patched gain
t0 7212 7252 -0.6%
pll 4546 4397 +3.3%
ft0 3712 3731 -0.5%
pf0 2131 1570 +26.4%
pf1 1926 1189 +38.3%
ptf0 2001 1557 +22.2%
ptf1 1903 1193 +37.4%
FWIW, attached are the test scripts.
gentblr2.sql: Table creation script.
testrun.sh : Benchmarking script.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
Attachments
On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> Summary
> The patch is pretty good; it works well when little data is sent back to
> the parent node. The patch doesn’t provide parallel FDW scan: it ensures
> that child nodes can send data to the parent in parallel, but the parent can only
> process the data from the data nodes sequentially.
>
> Provided there is no performance degradation for non-FDW append queries,
> I would recommend considering this patch as an interim solution while we are
> waiting for parallel FDW scan.

Wow, these are very impressive results!

--
  Bruce Momjian  <bruce@momjian.us>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +
Thank you very much for testing the patch, Ahsan!

At Wed, 15 Jan 2020 15:41:04 -0500, Bruce Momjian <bruce@momjian.us> wrote in
> On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> > Summary
> > The patch is pretty good; it works well when little data is sent back to
> > the parent node. The patch doesn’t provide parallel FDW scan: it ensures
> > that child nodes can send data to the parent in parallel, but the parent can only
> > process the data from the data nodes sequentially.

"Parallel scan" at the moment means multiple workers fetch unique
blocks from *one* table in an arbitrated manner. In this sense
"parallel FDW scan" means multiple local workers fetch unique bundles
of tuples from *one* foreign table, which means it is running on a
single session. That doesn't offer an advantage.

If parallel query processing worked in worker-per-table mode,
especially on partitioned tables, maybe the current FDW would work
without much modification. But I believe asynchronous append on
foreign tables in a single process is far more resource-efficient and
moderately faster than parallel append.

> > Provided there is no performance degradation for non-FDW append queries,
> > I would recommend considering this patch as an interim solution while we are
> > waiting for parallel FDW scan.
>
> Wow, these are very impressive results!

Thanks.

--
Kyotaro Horiguchi
NTT Open Source Software Center
On Thu, Jan 16, 2020 at 9:41 AM Bruce Momjian <bruce@momjian.us> wrote:
> On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> > Summary
> > The patch is pretty good; it works well when little data is sent back to
> > the parent node. The patch doesn’t provide parallel FDW scan: it ensures
> > that child nodes can send data to the parent in parallel, but the parent can only
> > process the data from the data nodes sequentially.
> >
> > Provided there is no performance degradation for non-FDW append queries,
> > I would recommend considering this patch as an interim solution while we are
> > waiting for parallel FDW scan.
>
> Wow, these are very impressive results!

+1

Thanks Ahsan and Movead. Could you please confirm which patch set you tested?
Hello Kyotaro,
>"Parallel scan" at the moment means multiple workers fetch unique
>blocks from *one* table in an arbitrated manner. In this sense
>"parallel FDW scan" means multiple local workers fetch unique bundles
>of tuples from *one* foreign table, which means it is running on a
>single session. That doesn't offer an advantage.
Maybe it is not "parallel FDW scan"; it could be called "parallel shards scan":
the local workers each pick a foreign partition to scan. I once
drew a picture of that, which you can see at the link below.
https://www.highgo.ca/2019/08/22/parallel-foreign-scan-of-postgresql/
I think "parallel shards scan" makes sense in this way.
>If parallel query processing worked in worker-per-table mode,
>especially on partitioned tables, maybe the current FDW would work
>without much modification. But I believe asynchronous append on
>foreign tables in a single process is far more resource-efficient and
>moderately faster than parallel append.
As the test results show, the current patch cannot gain more performance when
it returns a huge number of tuples. With the "parallel shards scan" method
it can work well, because 'parallel' can make full use of the CPUs while
'asynchronous' can't.
Highgo Software (Canada/China/Pakistan)
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca
Thanks!

At Wed, 29 Jan 2020 14:41:07 +0800, Movead Li <movead.li@highgo.ca> wrote in
> >"Parallel scan" at the moment means multiple workers fetch unique
> >blocks from *one* table in an arbitrated manner. In this sense
> >"parallel FDW scan" means multiple local workers fetch unique bundles
> >of tuples from *one* foreign table, which means it is running on a
> >single session. That doesn't offer an advantage.
>
> Maybe it is not "parallel FDW scan"; it could be called "parallel shards scan":
> the local workers each pick a foreign partition to scan. I once
> drew a picture of that, which you can see at the link below.
>
> https://www.highgo.ca/2019/08/22/parallel-foreign-scan-of-postgresql/
>
> I think "parallel shards scan" makes sense in this way.

It is "asynchronous append on async-capable'd postgres-fdw scans". It
could be called as such in the sense that it is intended to be used
with sharding.

> >If parallel query processing worked in worker-per-table mode,
> >especially on partitioned tables, maybe the current FDW would work
> >without much modification. But I believe asynchronous append on
> >foreign tables in a single process is far more resource-efficient and
> >moderately faster than parallel append.
>
> As the test results show, the current patch cannot gain more performance when
> it returns a huge number of tuples. With the "parallel shards scan" method
> it can work well, because 'parallel' can make full use of the CPUs while
> 'asynchronous' can't.

Did you look at my benchmarking results upthread? It gives a
significant gain even when gathering a large number of tuples from
multiple servers, or even from a single server. That is because of its
asynchronous nature.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
Hello,
>It is "asynchronous append on async-capable'd postgres-fdw scans". It
>could be called as such in the sense that it is intended to be used
>with sharding.

Yes, that's it.

>Did you look at my benchmarking results upthread? It gives a
>significant gain even when gathering a large number of tuples from
>multiple servers, or even from a single server. That is because of its
>asynchronous nature.

I mean it gains performance at first, but it meets a bottleneck as
the number of nodes increases.
For example:
With 2 nodes, it gains 200% performance.
With 3 nodes, it gains 300% performance.
However,
With 4 nodes, it gains 300% performance.
With 5 nodes, it gains 300% performance.
...

----
Highgo Software (Canada/China/Pakistan)
URL : www.highgo.ca
EMAIL: mailto:movead(dot)li(at)highgo(dot)ca
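The plateau described above is what one would expect if part of each shard's work has to be done serially in the single parent backend. A toy Python model under that assumption: `scan_s` is the time one shard takes when scanned synchronously, and `parent_s` is the share of it that the parent cannot overlap. The 3:1 ratio is picked only to reproduce the illustrative 200% / 300% / 300% figures in the message; it is not a measurement, and all names here are hypothetical.

```python
def sync_time(nodes, scan_s):
    """Master: shards are scanned one after another."""
    return nodes * scan_s


def async_time(nodes, scan_s, parent_s):
    """Patched: remote work overlaps, parent-side work stays serial."""
    return max(scan_s, nodes * parent_s)


def speedup_percent(nodes, scan_s=3.0, parent_s=1.0):
    """Master time over patched time, as a percentage."""
    return 100.0 * sync_time(nodes, scan_s) / async_time(nodes, scan_s, parent_s)


for nodes in range(2, 6):
    print(f"{nodes} nodes: {speedup_percent(nodes):.0f}%")
```

In this model the speedup is min(nodes, scan_s / parent_s), so adding shards stops helping once the parent's own per-shard work fills the whole elapsed time.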
On Sun, Dec 1, 2019 at 4:26 AM Bruce Momjian <bruce@momjian.us> wrote:
> On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:
> > On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <bruce@momjian.us> wrote:
> > > On Wed, Sep 4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:
> > > > A few years back[1] I experimented with a simple readiness API that
> > > > would allow Append to start emitting tuples from whichever Foreign
> > > > Scan has data available, when working with FDW-based sharding. I used
> > > > that primarily as a way to test Andres's new WaitEventSet stuff and my
> > > > kqueue implementation of that, but I didn't pursue it seriously
> > > > because I knew we wanted a more ambitious async executor rewrite and
> > > > many people had ideas about that, with schedulers capable of jumping
> > > > all over the tree etc.
> > > >
> > > > Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> > > > asked why we don't just do this naive thing until we have something
> > > > better. It's a very localised feature that works only between Append
> > > > and its immediate children. The patch makes it work for postgres_fdw,
> > > > but it should work for any FDW that can get its hands on a socket.
> > > >
> > > > Here's a quick rebase of that old POC patch, along with a demo. Since
> > > > 2016, Parallel Append landed, but I didn't have time to think about
> > > > how to integrate with that so I did a quick "sledgehammer" rebase that
> > > > disables itself if parallelism is in the picture.
> > >
> > > Yes, sharding has been waiting on parallel FDW scans. Would this work
> > > for parallel partition scans if the partitions were FDWs?
> >
> > Yeah, this works for partitions that are FDWs (as shown), but only for
> > Append, not for Parallel Append. So you'd have parallelism in the
> > sense that your N remote shard servers are all doing stuff at the same
> > time, but it couldn't be in a parallel query on your 'home' server,
> > which is probably good for things that push down aggregation and bring
> > back just a few tuples from each shard, but bad for anything wanting
> > to ship back millions of tuples to chew on locally. Do you think
> > that'd be useful enough on its own?
>
> Yes, I think so. There are many data warehouse queries that want to
> return only aggregate values, or filter for a small number of rows.
> Even OLTP queries might return only a few rows from multiple partitions.
> This would allow for a proof-of-concept implementation so we can see how
> realistic this approach is.

+1

Best regards,
Etsuro Fujita
On Thu, Dec 5, 2019 at 1:46 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi
> <horikyota.ntt@gmail.com> wrote:
> > There's my pending (somewhat stale) patch, which allows to run local
> > scans while waiting for remote servers.
> >
> > https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@lab.ntt.co.jp

I think it’s great to execute local scans while waiting for the
results of remote scans, but looking at your patch (the 0002 patch of
your patch set in [1]), I’m not sure that the 0002 patch does it very
efficiently, because it modifies nodeAppend.c so that all the work is
done by a single process. Rather than doing so, I’m wondering if it
would be better to modify Parallel Append so that some processes
execute remote scans and others execute local scans. I’m not sure
that we need to have this improvement as well in the first cut of this
feature, though.

> After rereading some threads to remind myself what happened here...
> right, my little patch began life in March 2016[1] when I wanted a
> test case to test Andres's work on WaitEventSets, and your patch set
> started a couple of months later and is vastly more ambitious[2][3].
> It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF
> model. And I totally agree that there are lots of reasons to want to
> do that (including yielding to other parts of the plan instead of
> waiting for I/O, locks and some parallelism primitives enabling new
> kinds of parallelism), and I'm hoping to help with some small pieces
> of that if I can.
>
> My patch set (rebased upthread) was extremely primitive, with no new
> planner concepts, and added only a very simple new executor node
> method: ExecReady(). Append used that to try to ask its children if
> they'd like some time to warm up. By default, ExecReady() says "I
> don't know what you're talking about, go away", but FDWs can provide
> an implementation that says "yes, please call me again when this fd is
> ready" or "yes, I am ready, please call ExecProc() now". It doesn't
> deal with anything more complicated than that, and in particular it
> doesn't work if there are extra planner nodes in between Append and
> the foreign scan. (It also doesn't mix particularly well with
> parallelism, as mentioned.)
>
> The reason I reposted this unambitious work is because Stephen keeps
> asking me why we don't consider the stupidly simple thing that would
> help with simple foreign partition-based queries today, instead of
> waiting for someone to redesign the entire executor, because that's
> ... really hard.

Yeah, I think your patch is much simpler, compared to Horiguchi-san’s
patch set, which I think is a good thing, considering this would be
rather an interim solution until the executor rewrite is done. Here are
a few comments that I have for now:

* I know your patch is a POC one, but one concern about it (and
Horiguchi-san's patch set) is concurrent data fetches by multiple
foreign scan nodes using the same connection in the case of
postgres_fdw. Here is an example causing an error:

create or replace function slow_data_ext(name text, secs float)
returns setof t as
$$
begin
  perform pg_sleep(secs);
  return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;

create view t11 as select * from slow_data_ext('t11', 1.0);
create view t12 as select * from slow_data_ext('t12', 2.0);
create view t13 as select * from slow_data_ext('t13', 3.0);
create foreign table ft11 (a text, b text) server server1 options (table_name 't11');
create foreign table ft12 (a text, b text) server server2 options (table_name 't12');
create foreign table ft13 (a text, b text) server server3 options (table_name 't13');
create table pt1 (a text, b text) partition by list (a);
alter table pt1 attach partition ft11 for values in ('t11');
alter table pt1 attach partition ft12 for values in ('t12');
alter table pt1 attach partition ft13 for values in ('t13');

create view t21 as select * from slow_data_ext('t21', 1.0);
create view t22 as select * from slow_data_ext('t22', 2.0);
create view t23 as select * from slow_data_ext('t23', 3.0);
create foreign table ft21 (a text, b text) server server1 options (table_name 't21');
create foreign table ft22 (a text, b text) server server2 options (table_name 't22');
create foreign table ft23 (a text, b text) server server3 options (table_name 't23');
create table pt2 (a text, b text) partition by list (a);
alter table pt2 attach partition ft21 for values in ('t21');
alter table pt2 attach partition ft22 for values in ('t22');
alter table pt2 attach partition ft23 for values in ('t23');

explain verbose select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
                                                  QUERY PLAN
--------------------------------------------------------------------------------------------------------------
 Nested Loop  (cost=200.00..1303.80 rows=50220 width=128)
   Output: pt1.a, pt1.b, pt2.a, pt2.b
   ->  Append  (cost=100.00..427.65 rows=2790 width=64)
         ->  Foreign Scan on public.ft11 pt1_1  (cost=100.00..137.90 rows=930 width=64)
               Output: pt1_1.a, pt1_1.b
               Remote SQL: SELECT a, b FROM public.t11
         ->  Foreign Scan on public.ft12 pt1_2  (cost=100.00..137.90 rows=930 width=64)
               Output: pt1_2.a, pt1_2.b
               Remote SQL: SELECT a, b FROM public.t12
         ->  Foreign Scan on public.ft13 pt1_3  (cost=100.00..137.90 rows=930 width=64)
               Output: pt1_3.a, pt1_3.b
               Remote SQL: SELECT a, b FROM public.t13
   ->  Materialize  (cost=100.00..248.44 rows=18 width=64)
         Output: pt2.a, pt2.b
         ->  Append  (cost=100.00..248.35 rows=18 width=64)
               ->  Foreign Scan on public.ft22 pt2_1  (cost=100.00..124.13 rows=9 width=64)
                     Output: pt2_1.a, pt2_1.b
                     Remote SQL: SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
               ->  Foreign Scan on public.ft23 pt2_2  (cost=100.00..124.13 rows=9 width=64)
                     Output: pt2_2.a, pt2_2.b
                     Remote SQL: SELECT a, b FROM public.t23 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
(21 rows)

select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
ERROR:  another command is already in progress
CONTEXT:  remote SQL command: DECLARE c4 CURSOR FOR
SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))

I think the cause of this error is that an asynchronous data fetch for
ft22 is blocked by that for ft12 that is in progress.
(Horiguchi-san’s patch set doesn't work for this query either, causing
the same error. Though, it looks like he intended to handle cases
like this by a queuing system added to postgres_fdw to process such
concurrent data fetches.) I think a simple solution for this issue
would be to just disable asynchrony optimization for such cases. I
think we could do so by tracking foreign scan nodes across the entire
final plan tree that use the same connection at plan or execution
time.

* Another one is “no new planner concepts”. I think it leads to this:

@@ -239,9 +242,207 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     /* For parallel query, this will be overridden later. */
     appendstate->choose_next_subplan = choose_next_subplan_locally;

+    /*
+     * Initially we consider all subplans to be potentially asynchronous.
+     */
+    appendstate->asyncplans = (PlanState **) palloc(nplans * sizeof(PlanState *));
+    appendstate->asyncfds = (int *) palloc0(nplans * sizeof(int));
+    appendstate->nasyncplans = nplans;
+    memcpy(appendstate->asyncplans, appendstate->appendplans, nplans * sizeof(PlanState *));
+    appendstate->lastreadyplan = 0;

I’m not sure that this would cause performance degradation in the case
of a plain Append that involves no FDWs supporting asynchrony
optimization, but I think it would be better to determine subplans
with that optimization at plan time and save cycles at execution time
as done in Horiguchi-san’s patch set. (I’m not sure that we need a
new ExecAppend function proposed there, though.) Also, consider this
ordered Append example:

create table t31 (a int check (a >= 10 and a < 20), b text);
create table t32 (a int check (a >= 20 and a < 30), b text);
create table t33 (a int check (a >= 30 and a < 40), b text);
create foreign table ft31 (a int check (a >= 10 and a < 20), b text) server server1 options (table_name 't31');
create foreign table ft32 (a int check (a >= 20 and a < 30), b text) server server2 options (table_name 't32');
create foreign table ft33 (a int check (a >= 30 and a < 40), b text) server server3 options (table_name 't33');
create table pt3 (a int, b text) partition by range (a);
alter table pt3 attach partition ft31 for values from (10) to (20);
alter table pt3 attach partition ft32 for values from (20) to (30);
alter table pt3 attach partition ft33 for values from (30) to (40);

explain verbose select * from pt3 order by a;
                                    QUERY PLAN
-----------------------------------------------------------------------------------
 Append  (cost=300.00..487.52 rows=4095 width=36)
   ->  Foreign Scan on public.ft31 pt3_1  (cost=100.00..155.68 rows=1365 width=36)
         Output: pt3_1.a, pt3_1.b
         Remote SQL: SELECT a, b FROM public.t31 ORDER BY a ASC NULLS LAST
   ->  Foreign Scan on public.ft32 pt3_2  (cost=100.00..155.68 rows=1365 width=36)
         Output: pt3_2.a, pt3_2.b
         Remote SQL: SELECT a, b FROM public.t32 ORDER BY a ASC NULLS LAST
   ->  Foreign Scan on public.ft33 pt3_3  (cost=100.00..155.68 rows=1365 width=36)
         Output: pt3_3.a, pt3_3.b
         Remote SQL: SELECT a, b FROM public.t33 ORDER BY a ASC NULLS LAST
(10 rows)

For this query, we can’t apply asynchrony optimization. To disable it
for such cases I think it would be better to do something at plan time
as well, as is done in his patch set.

I haven’t finished reviewing your patch, but before doing so, I’ll
review Horiguchi-san's patch set in more detail for further
comparison. Attached is a rebased version of your patch, in which I
added the same changes to the postgres_fdw regression tests as
Horiguchi-san so that the tests run successfully.

Thank you for working on this, Thomas and Horiguchi-san! Sorry for the delay.

Best regards,
Etsuro Fujita

[1] https://www.postgresql.org/message-id/20200820.163608.1893015081639298019.horikyota.ntt%40gmail.com
Attachments
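The ExecReady() idea quoted in the message above, where a child either hands back a file descriptor to wait on or reports that it is ready, can be illustrated outside the executor. A minimal Python sketch, assuming each child is represented only by a readable file descriptor: pipes stand in for the sockets a foreign scan would expose, and `append_multiplexed` is a hypothetical name. It covers only the "call me again when this fd is ready" answer, not the other two, and it is not how the patch itself is written.

```python
import os
import selectors


def append_multiplexed(children):
    """Yield (child_name, data) from whichever child becomes readable first.

    `children` maps a child name to a readable file descriptor.
    """
    sel = selectors.DefaultSelector()
    for name, fd in children.items():
        sel.register(fd, selectors.EVENT_READ, data=name)
    remaining = len(children)
    try:
        while remaining:
            # Block until at least one child has data or has hit EOF.
            for key, _events in sel.select():
                chunk = os.read(key.fd, 4096)
                if chunk:
                    yield key.data, chunk
                else:
                    # An empty read means EOF: this child is finished.
                    sel.unregister(key.fd)
                    remaining -= 1
    finally:
        sel.close()


# Three pipes stand in for three remote shards.
pipes = {name: os.pipe() for name in ("t1", "t2", "t3")}
rows = append_multiplexed({name: r for name, (r, _w) in pipes.items()})

os.write(pipes["t3"][1], b"row from t3")  # t3 answers first
first = next(rows)
os.write(pipes["t1"][1], b"row from t1")  # then t1
second = next(rows)

for _r, w in pipes.values():  # every shard reaches EOF
    os.close(w)
rest = list(rows)
for r, _w in pipes.values():
    os.close(r)

print(first, second, rest)
```

Rows come back in readiness order rather than child order, which is also why the ordered Append case discussed in the message cannot use this scheme.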
On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
> * I know your patch is a POC one, but one concern about it (and
> Horiguchi-san's patch set) is concurrent data fetches by multiple
> foreign scan nodes using the same connection in the case of
> postgres_fdw. Here is an example causing an error:

> select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
> ERROR:  another command is already in progress
> CONTEXT:  remote SQL command: DECLARE c4 CURSOR FOR
> SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))

> (Horiguchi-san’s patch set doesn't work for this query either, causing
> the same error. Though, it looks like he intended to handle cases
> like this by a queuing system added to postgres_fdw to process such
> concurrent data fetches.)

I was wrong here; Horiguchi-san's patch set works well for this query.
Maybe I did something wrong when testing his patch set. Sorry for
that.

Best regards,
Etsuro Fujita
Fujita-san, thank you for taking the time!

At Mon, 31 Aug 2020 19:10:39 +0900, Etsuro Fujita <etsuro.fujita@gmail.com> wrote in
> On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
> > * I know your patch is a POC one, but one concern about it (and
> > Horiguchi-san's patch set) is concurrent data fetches by multiple
> > foreign scan nodes using the same connection in the case of
> > postgres_fdw. Here is an example causing an error:
> >
> > select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
> > ERROR: another command is already in progress
> > CONTEXT: remote SQL command: DECLARE c4 CURSOR FOR
> > SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
> >
> > (Horiguchi-san’s patch set doesn't work for this query either, causing
> > the same error. Though, it looks like he intended to handle cases
> > like this by a queuing system added to postgres_fdw to process such
> > concurrent data fetches.)
>
> I was wrong here; Horiguchi-san's patch set works well for this query.
> Maybe I did something wrong when testing his patch set. Sorry for
> that.

Yeah. postgresIterateForeignScan calls vacate_connection() to make the underlying connection available if a server connection is busy with another remote query. The mechanism is backed by a waiting queue (add_async_waiter, move_to_next_waiter, remove_async_node).

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
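As an illustration of the waiting queue mentioned above, here is a minimal C sketch of a per-connection FIFO of scan nodes. The three function names are the ones cited in the message; the types (`AsyncNode`, `ConnQueue`), the signatures, and the bodies are assumptions made for this example and are not taken from the patch set.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a foreign scan node sharing a connection. */
typedef struct AsyncNode
{
    const char *name;          /* e.g. "ft12" */
    struct AsyncNode *next;    /* next node waiting on the same connection */
} AsyncNode;

/* Hypothetical per-connection state: head owns the connection, rest wait. */
typedef struct ConnQueue
{
    AsyncNode *head;
    AsyncNode *tail;
} ConnQueue;

/* Enqueue node; returns 1 if it owns the connection right away, else 0. */
int
add_async_waiter(ConnQueue *q, AsyncNode *node)
{
    node->next = NULL;
    if (q->head == NULL)
    {
        q->head = q->tail = node;
        return 1;
    }
    q->tail->next = node;
    q->tail = node;
    return 0;
}

/* The owner is finished: hand the connection to the next waiter, if any. */
AsyncNode *
move_to_next_waiter(ConnQueue *q)
{
    AsyncNode *done = q->head;

    assert(done != NULL);
    q->head = done->next;
    if (q->head == NULL)
        q->tail = NULL;
    done->next = NULL;
    return q->head;            /* new owner, or NULL if nobody is waiting */
}

/* Unlink node wherever it sits in the queue, e.g. when its scan ends early. */
void
remove_async_node(ConnQueue *q, AsyncNode *node)
{
    AsyncNode *prev = NULL;
    AsyncNode *cur;

    for (cur = q->head; cur != NULL; prev = cur, cur = cur->next)
    {
        if (cur != node)
            continue;
        if (prev == NULL)
            q->head = cur->next;
        else
            prev->next = cur->next;
        if (q->tail == cur)
            q->tail = prev;
        cur->next = NULL;
        return;
    }
}
```

In this sketch a node that finds the connection busy simply queues up behind the current owner; what the real vacate_connection() does to free the connection is not modelled here.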
On Tue, Sep 1, 2020 at 9:45 AM Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote:
> At Mon, 31 Aug 2020 19:10:39 +0900, Etsuro Fujita <etsuro.fujita@gmail.com> wrote in
> > On Mon, Aug 31, 2020 at 6:20 PM Etsuro Fujita <etsuro.fujita@gmail.com> wrote:
> > > * I know your patch is a POC one, but one concern about it (and
> > > Horiguchi-san's patch set) is concurrent data fetches by multiple
> > > foreign scan nodes using the same connection in the case of
> > > postgres_fdw. Here is an example causing an error:
> > >
> > > select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23';
> > > ERROR: another command is already in progress
> > > CONTEXT: remote SQL command: DECLARE c4 CURSOR FOR
> > > SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
> > >
> > > (Horiguchi-san’s patch set doesn't work for this query either, causing
> > > the same error. Though, it looks like he intended to handle cases
> > > like this by a queuing system added to postgres_fdw to process such
> > > concurrent data fetches.)
> >
> > I was wrong here; Horiguchi-san's patch set works well for this query.
> > Maybe I did something wrong when testing his patch set. Sorry for
> > that.
>
> Yeah. postgresIterateForeignScan calls vacate_connection() to make the
> underlying connection available if a server connection is busy with
> another remote query. The mechanism is backed by a waiting queue
> (add_async_waiter, move_to_next_waiter, remove_async_node).

Thanks for the explanation, Horiguchi-san! So your version of the patch processes the query successfully, because 1) before performing an asynchronous data fetch of ft22, it waits for the in-progress data fetch of ft12 using the same connection to complete so that the data fetch of ft22 can be done, and 2) before performing an asynchronous data fetch of ft23, it waits for the in-progress data fetch of ft13 using the same connection to complete so that the data fetch of ft23 can be done. Right?

If so, I think in some cases such handling would impact performance negatively. Consider the same query with LIMIT processed by your version:

explain verbose select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23' limit 1;
                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 Limit  (cost=200.00..200.01 rows=1 width=128)
   Output: pt1.a, pt1.b, pt2.a, pt2.b
   ->  Nested Loop  (cost=200.00..903.87 rows=50220 width=128)
         Output: pt1.a, pt1.b, pt2.a, pt2.b
         ->  Append  (cost=100.00..151.85 rows=2790 width=64)
               Async subplans: 3
               ->  Async Foreign Scan on public.ft11 pt1_1  (cost=100.00..137.90 rows=930 width=64)
                     Output: pt1_1.a, pt1_1.b
                     Remote SQL: SELECT a, b FROM public.t11
               ->  Async Foreign Scan on public.ft12 pt1_2  (cost=100.00..137.90 rows=930 width=64)
                     Output: pt1_2.a, pt1_2.b
                     Remote SQL: SELECT a, b FROM public.t12
               ->  Async Foreign Scan on public.ft13 pt1_3  (cost=100.00..137.90 rows=930 width=64)
                     Output: pt1_3.a, pt1_3.b
                     Remote SQL: SELECT a, b FROM public.t13
         ->  Materialize  (cost=100.00..124.31 rows=18 width=64)
               Output: pt2.a, pt2.b
               ->  Append  (cost=100.00..124.22 rows=18 width=64)
                     Async subplans: 2
                     ->  Async Foreign Scan on public.ft22 pt2_1  (cost=100.00..124.13 rows=9 width=64)
                           Output: pt2_1.a, pt2_1.b
                           Remote SQL: SELECT a, b FROM public.t22 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
                     ->  Async Foreign Scan on public.ft23 pt2_2  (cost=100.00..124.13 rows=9 width=64)
                           Output: pt2_2.a, pt2_2.b
                           Remote SQL: SELECT a, b FROM public.t23 WHERE (((a = 't22'::text) OR (a = 't23'::text)))
(25 rows)

I think your version would require extra time to process this query compared to HEAD due to such handling. This query throws an error with your version, though:

select * from pt1, pt2 where pt2.a = 't22' or pt2.a = 't23' limit 1;
ERROR: another command is already in progress
CONTEXT: remote SQL command: CLOSE c1

Best regards,
Etsuro Fujita
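To make the performance concern concrete, the following toy model in C treats a connection as able to carry one in-flight fetch at a time. It is neither libpq nor postgres_fdw code: `ToyConn`, the `toy_*` functions, and the row count of 100 in the usage note are all hypothetical, and the "drain before starting" behaviour reflects the understanding stated in the message above, not a confirmed description of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of one remote connection; not libpq and not postgres_fdw. */
typedef struct ToyConn
{
    const char *owner;     /* scan whose fetch is in flight, or NULL if idle */
    int rows_in_flight;    /* rows of that fetch not yet read */
    int rows_drained;      /* rows read only to free the connection */
} ToyConn;

/* Start a fetch; returns 0 if another fetch is still in flight. */
int
toy_begin_fetch(ToyConn *conn, const char *scan, int nrows)
{
    assert(nrows >= 0);
    if (conn->owner != NULL)
        return 0;          /* cf. "another command is already in progress" */
    conn->owner = scan;
    conn->rows_in_flight = nrows;
    return 1;
}

/* Read out the in-flight result so that the connection becomes idle. */
void
toy_drain(ToyConn *conn)
{
    conn->rows_drained += conn->rows_in_flight;
    conn->rows_in_flight = 0;
    conn->owner = NULL;
}

/* Queue-style start: drain whatever is in flight, then begin our fetch. */
int
toy_begin_fetch_after_drain(ToyConn *conn, const char *scan, int nrows)
{
    if (conn->owner != NULL)
        toy_drain(conn);
    return toy_begin_fetch(conn, scan, nrows);
}
```

With ft12 holding 100 rows in flight, a plain `toy_begin_fetch` for ft22 is refused, while `toy_begin_fetch_after_drain` succeeds only after reading those 100 rows. Under LIMIT 1 those rows may never be needed, which is the extra work the message is worried about.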