Re: [HACKERS] asynchronous execution
| От | Kyotaro HORIGUCHI |
|---|---|
| Тема | Re: [HACKERS] asynchronous execution |
| Дата | |
| Msg-id | 20180111.170839.23674040.horiguchi.kyotaro@lab.ntt.co.jp обсуждение исходный текст |
| Ответ на | Re: [HACKERS] asynchronous execution (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>) |
| Ответы |
Re: [HACKERS] asynchronous execution
|
| Список | pgsql-hackers |
At Mon, 11 Dec 2017 20:07:53 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20171211.200753.191768178.horiguchi.kyotaro@lab.ntt.co.jp>
> > The attached PoC patch theoretically has no impact on the normal
> > code paths and just brings gain in async cases.
>
> The parallel append just committed hit this and the attached are
> the rebased version to the current HEAD. The result of a concise
> performance test follows.
>
> patched(ms) unpatched(ms) gain(%)
> A: simple table scan : 3562.32 3444.81 -3.4
> B: local partitioning : 1451.25 1604.38 9.5
> C: single remote table : 8818.92 9297.76 5.1
> D: sharding (single con) : 5966.14 6646.73 10.2
> E: sharding (multi con) : 1802.25 6515.49 72.3
>
> > A and B are degradation checks, which are expected to show no
> > degradation. C is the gain only by postgres_fdw's command
> > pre-sending on a remote table. D is the gain of sharding on a
> > connection. The number of partitions/shards is 4. E is the gain
> > using a dedicated connection per shard.
>
> Test A is accelerated by parallel sequential scan. Introducing
> parallel append accelerates test B. Comparing A and B, I doubt
> that degradation is stably measurable at least my environment but
> I believe that there's no degradation theoretically. The tests C to
> E still shows apparent gain.
> regards,
The patch conflicts with 3cac0ec. This is the rebased version.
--
Kyotaro Horiguchi
NTT Open Source Software Center
From be22b33b90abec93a2a609a1db4955e6910b2da0 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH 1/3] Allow wait event set to be registered to resource owner
WaitEventSet needs to be released using resource owner for a certain
case. This change adds a resource owner field to WaitEventSet and allows the creator of a
WaitEventSet to specify a resource owner.
---
src/backend/libpq/pqcomm.c | 2 +-
src/backend/storage/ipc/latch.c | 18 ++++++-
src/backend/storage/lmgr/condition_variable.c | 2 +-
src/backend/utils/resowner/resowner.c | 68 +++++++++++++++++++++++++++
src/include/storage/latch.h | 4 +-
src/include/utils/resowner_private.h | 8 ++++
6 files changed, 97 insertions(+), 5 deletions(-)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..890972b 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
(errmsg("could not set socket to nonblocking mode: %m")));
#endif
- FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+ FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..5457899 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -51,6 +51,7 @@
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
+#include "utils/resowner_private.h"
/*
* Select the fd readiness primitive to use. Normally the "most modern"
@@ -77,6 +78,8 @@ struct WaitEventSet
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ ResourceOwner resowner; /* Resource owner */
+
/*
* Array, of nevents_space length, storing the definition of events this
* set is waiting for.
@@ -359,7 +362,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
int ret = 0;
int rc;
WaitEvent event;
- WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+ WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
if (wakeEvents & WL_TIMEOUT)
Assert(timeout >= 0);
@@ -517,12 +520,15 @@ ResetLatch(volatile Latch *latch)
* WaitEventSetWait().
*/
WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
{
WaitEventSet *set;
char *data;
Size sz = 0;
+ if (res)
+ ResourceOwnerEnlargeWESs(res);
+
/*
* Use MAXALIGN size/alignment to guarantee that later uses of memory are
* aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -591,6 +597,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
#endif
+ /* Register this wait event set if requested */
+ set->resowner = res;
+ if (res)
+ ResourceOwnerRememberWES(set->resowner, set);
+
return set;
}
@@ -632,6 +643,9 @@ FreeWaitEventSet(WaitEventSet *set)
}
#endif
+ if (set->resowner != NULL)
+ ResourceOwnerForgetWES(set->resowner, set);
+
pfree(set);
}
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index ef1d5ba..30edc8e 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -69,7 +69,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
{
WaitEventSet *new_event_set;
- new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+ new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(new_event_set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index e09a4f1..7ae8777 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -124,6 +124,7 @@ typedef struct ResourceOwnerData
ResourceArray snapshotarr; /* snapshot references */
ResourceArray filearr; /* open temporary files */
ResourceArray dsmarr; /* dynamic shmem segments */
+ ResourceArray wesarr; /* wait event sets */
/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
int nlocks; /* number of owned locks */
@@ -169,6 +170,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
static void PrintSnapshotLeakWarning(Snapshot snapshot);
static void PrintFileLeakWarning(File file);
static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
/*****************************************************************************
@@ -437,6 +439,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
ResourceArrayInit(&(owner->snapshotarr), PointerGetDatum(NULL));
ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
+ ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
return owner;
}
@@ -538,6 +541,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
PrintDSMLeakWarning(res);
dsm_detach(res);
}
+
+ /* Ditto for wait event sets */
+ while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+ {
+ WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+ if (isCommit)
+ PrintWESLeakWarning(event);
+ FreeWaitEventSet(event);
+ }
}
else if (phase == RESOURCE_RELEASE_LOCKS)
{
@@ -685,6 +698,7 @@ ResourceOwnerDelete(ResourceOwner owner)
Assert(owner->snapshotarr.nitems == 0);
Assert(owner->filearr.nitems == 0);
Assert(owner->dsmarr.nitems == 0);
+ Assert(owner->wesarr.nitems == 0);
Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
/*
@@ -711,6 +725,7 @@ ResourceOwnerDelete(ResourceOwner owner)
ResourceArrayFree(&(owner->snapshotarr));
ResourceArrayFree(&(owner->filearr));
ResourceArrayFree(&(owner->dsmarr));
+ ResourceArrayFree(&(owner->wesarr));
pfree(owner);
}
@@ -1253,3 +1268,56 @@ PrintDSMLeakWarning(dsm_segment *seg)
elog(WARNING, "dynamic shared memory leak: segment %u still referenced",
dsm_segment_handle(seg));
}
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+ ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+ ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identifier of a wait event set,
+ * use its pointer instead.
+ */
+ if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+ elog(ERROR, "wait event set %p is not owned by resource owner %s",
+ events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identifier of a wait event set,
+ * use its pointer instead.
+ */
+ elog(WARNING, "wait event set leak: %p still referenced",
+ events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..838845a 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
#define LATCH_H
#include <signal.h>
+#include "utils/resowner.h"
/*
* Latch structure should be treated as opaque and only accessed through
@@ -162,7 +163,8 @@ extern void DisownLatch(volatile Latch *latch);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+ ResourceOwner res, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index 22b377c..56f2059 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
#include "storage/dsm.h"
#include "storage/fd.h"
+#include "storage/latch.h"
#include "storage/lock.h"
#include "utils/catcache.h"
#include "utils/plancache.h"
@@ -88,4 +89,11 @@ extern void ResourceOwnerRememberDSM(ResourceOwner owner,
extern void ResourceOwnerForgetDSM(ResourceOwner owner,
dsm_segment *);
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+ WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+ WaitEventSet *);
+
#endif /* RESOWNER_PRIVATE_H */
--
2.9.2
From 885f62d89a93edbda44330c3ecc3a7ac08e302ea Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:23:51 +0900
Subject: [PATCH 2/3] core side modification
---
src/backend/executor/Makefile | 2 +-
src/backend/executor/execAsync.c | 110 ++++++++++++++
src/backend/executor/nodeAppend.c | 247 +++++++++++++++++++++++++++-----
src/backend/executor/nodeForeignscan.c | 22 ++-
src/backend/optimizer/plan/createplan.c | 62 +++++++-
src/backend/postmaster/pgstat.c | 3 +
src/include/executor/execAsync.h | 23 +++
src/include/executor/executor.h | 1 +
src/include/executor/nodeForeignscan.h | 3 +
src/include/foreign/fdwapi.h | 11 ++
src/include/nodes/execnodes.h | 18 ++-
src/include/nodes/plannodes.h | 2 +
src/include/pgstat.h | 3 +-
13 files changed, 462 insertions(+), 45 deletions(-)
create mode 100644 src/backend/executor/execAsync.c
create mode 100644 src/include/executor/execAsync.h
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895..8ad2adf 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/executor
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execExpr.o execExprInterp.o \
execGrouping.o execIndexing.o execJunk.o \
execMain.o execParallel.o execPartition.o execProcnode.o \
execReplication.o execScan.o execSRF.o execTuples.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000..f7daed7
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,110 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+ pstate->asyncstate = status;
+}
+
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+ void *data, bool reinit)
+{
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+ wes, data, reinit);
+ break;
+ default:
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(node));
+ }
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+ static int *refind = NULL;
+ static int refindsize = 0;
+ WaitEventSet *wes;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred = 0;
+ Bitmapset *fired_events = NULL;
+ int i;
+ int n;
+
+ n = bms_num_members(waitnodes);
+ wes = CreateWaitEventSet(TopTransactionContext,
+ TopTransactionResourceOwner, n);
+ if (refindsize < n)
+ {
+ if (refindsize == 0)
+ refindsize = EVENT_BUFFER_SIZE; /* XXX */
+ while (refindsize < n)
+ refindsize *= 2;
+ if (refind)
+ refind = (int *) repalloc(refind, refindsize * sizeof(int));
+ else
+ refind = (int *) palloc(refindsize * sizeof(int));
+ }
+
+ n = 0;
+ for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+ i = bms_next_member(waitnodes, i))
+ {
+ refind[i] = i;
+ if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+ n++;
+ }
+
+ if (n == 0)
+ {
+ FreeWaitEventSet(wes);
+ return NULL;
+ }
+
+ noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+ EVENT_BUFFER_SIZE,
+ WAIT_EVENT_ASYNC_WAIT);
+ FreeWaitEventSet(wes);
+ if (noccurred == 0)
+ return NULL;
+
+ for (i = 0 ; i < noccurred ; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+ {
+ int n = *(int*)w->user_data;
+
+ fired_events = bms_add_member(fired_events, n);
+ }
+ }
+
+ return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 64a17fb..644af5b 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -59,6 +59,7 @@
#include "executor/execdebug.h"
#include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
#include "miscadmin.h"
/* Shared state for parallel-aware Append. */
@@ -79,6 +80,7 @@ struct ParallelAppendState
#define INVALID_SUBPLAN_INDEX -1
static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
@@ -104,7 +106,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
ListCell *lc;
/* check for unsupported flags */
- Assert(!(eflags & EXEC_FLAG_MARK));
+ Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
/*
* Lock the non-leaf tables in the partition tree controlled by this node.
@@ -127,6 +129,19 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->ps.ExecProcNode = ExecAppend;
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ appendstate->as_nasyncplans = node->nasyncplans;
+ appendstate->as_syncdone = (node->nasyncplans == nplans);
+ appendstate->as_asyncresult = (TupleTableSlot **)
+ palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+ /* Choose async version of Exec function */
+ if (appendstate->as_nasyncplans > 0)
+ appendstate->ps.ExecProcNode = ExecAppendAsync;
+
+ /* initially, all async subplans need a request */
+ for (i = 0; i < appendstate->as_nasyncplans; ++i)
+ appendstate->as_needrequest =
+ bms_add_member(appendstate->as_needrequest, i);
/*
* Miscellaneous initialization
@@ -149,27 +164,48 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
foreach(lc, node->appendplans)
{
Plan *initNode = (Plan *) lfirst(lc);
+ int sub_eflags = eflags;
+
+ if (i < appendstate->as_nasyncplans)
+ sub_eflags |= EXEC_FLAG_ASYNC;
- appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
+ appendplanstates[i] = ExecInitNode(initNode, estate, sub_eflags);
i++;
}
+ /* if there's any async-capable subnode, use async-aware routine */
+ if (appendstate->as_nasyncplans)
+ appendstate->ps.ExecProcNode = ExecAppendAsync;
+
/*
* initialize output tuple type
*/
ExecAssignResultTypeFromTL(&appendstate->ps);
appendstate->ps.ps_ProjInfo = NULL;
- /*
- * Parallel-aware append plans must choose the first subplan to execute by
- * looking at shared memory, but non-parallel-aware append plans can
- * always start with the first subplan.
- */
- appendstate->as_whichplan =
- appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+ if (appendstate->ps.plan->parallel_aware)
+ {
+ /*
+ * Parallel-aware append plans must choose the first subplan to
+ * execute by looking at shared memory, but non-parallel-aware append
+ * plans can always start with the first subplan.
+ */
- /* If parallel-aware, this will be overridden later. */
- appendstate->choose_next_subplan = choose_next_subplan_locally;
+ appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
+
+ /* If parallel-aware, this will be overridden later. */
+ appendstate->choose_next_subplan = choose_next_subplan_locally;
+ }
+ else
+ {
+ appendstate->as_whichsyncplan = 0;
+
+ /*
+ * initialize to scan first synchronous subplan
+ */
+ appendstate->as_whichsyncplan = appendstate->as_nasyncplans;
+ appendstate->choose_next_subplan = choose_next_subplan_locally;
+ }
return appendstate;
}
@@ -186,10 +222,12 @@ ExecAppend(PlanState *pstate)
AppendState *node = castNode(AppendState, pstate);
/* If no subplan has been chosen, we must choose one before proceeding. */
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+ if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
!node->choose_next_subplan(node))
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ Assert(node->as_nasyncplans == 0);
+
for (;;)
{
PlanState *subnode;
@@ -200,8 +238,9 @@ ExecAppend(PlanState *pstate)
/*
* figure out which subplan we are currently processing
*/
- Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
- subnode = node->appendplans[node->as_whichplan];
+ Assert(node->as_whichsyncplan >= 0 &&
+ node->as_whichsyncplan < node->as_nplans);
+ subnode = node->appendplans[node->as_whichsyncplan];
/*
* get a tuple from the subplan
@@ -224,6 +263,137 @@ ExecAppend(PlanState *pstate)
}
}
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+ AppendState *node = castNode(AppendState, pstate);
+ Bitmapset *needrequest;
+ int i;
+
+ Assert(node->as_nasyncplans > 0);
+
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ while ((i = bms_first_member(needrequest)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ }
+ else
+ node->as_pending_async = bms_add_member(node->as_pending_async, i);
+ }
+ bms_free(needrequest);
+
+ for (;;)
+ {
+ TupleTableSlot *result;
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ while (!bms_is_empty(node->as_pending_async))
+ {
+ long timeout = node->as_syncdone ? -1 : 0;
+ Bitmapset *fired;
+ int i;
+
+ fired = ExecAsyncEventWait(node->appendplans, node->as_pending_async,
+ timeout);
+ while ((i = bms_first_member(fired)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest =
+ bms_add_member(node->as_needrequest, i);
+ }
+ node->as_pending_async =
+ bms_del_member(node->as_pending_async, i);
+ }
+ }
+ bms_free(fired);
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If there is no asynchronous activity still pending and the
+ * synchronous activity is also complete, we're totally done scanning
+ * this node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the synchronous children.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(bms_is_empty(node->as_pending_async));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
+ /*
+ * get a tuple from the subplan
+ */
+ result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+ if (!TupIsNull(result))
+ {
+ /*
+ * If the subplan gave us something then return it as-is. We do
+ * NOT make use of the result slot that was set up in
+ * ExecInitAppend; there's no need for it.
+ */
+ return result;
+ }
+
+ /*
+ * Go on to the "next" subplan in the appropriate direction. If no
+ * more subplans, return the empty slot set up for us by
+ * ExecInitAppend, unless there are async plans we have yet to finish.
+ */
+ if (!node->choose_next_subplan(node))
+ {
+ node->as_syncdone = true;
+ if (bms_is_empty(node->as_pending_async))
+ {
+ Assert(bms_is_empty(node->as_needrequest));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ }
+
+ /* Else loop back and try to get a tuple from the new subplan */
+ }
+}
+
/* ----------------------------------------------------------------
* ExecEndAppend
*
@@ -257,6 +427,15 @@ ExecReScanAppend(AppendState *node)
{
int i;
+ /* Reset async state. */
+ for (i = 0; i < node->as_nasyncplans; ++i)
+ {
+ ExecShutdownNode(node->appendplans[i]);
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ node->as_nasyncresult = 0;
+ node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
for (i = 0; i < node->as_nplans; i++)
{
PlanState *subnode = node->appendplans[i];
@@ -276,7 +455,7 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
- node->as_whichplan =
+ node->as_whichsyncplan =
node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
}
@@ -365,7 +544,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
static bool
choose_next_subplan_locally(AppendState *node)
{
- int whichplan = node->as_whichplan;
+ int whichplan = node->as_whichsyncplan;
/* We should never see INVALID_SUBPLAN_INDEX in this case. */
Assert(whichplan >= 0 && whichplan <= node->as_nplans);
@@ -374,13 +553,13 @@ choose_next_subplan_locally(AppendState *node)
{
if (whichplan >= node->as_nplans - 1)
return false;
- node->as_whichplan++;
+ node->as_whichsyncplan++;
}
else
{
if (whichplan <= 0)
return false;
- node->as_whichplan--;
+ node->as_whichsyncplan--;
}
return true;
@@ -405,33 +584,33 @@ choose_next_subplan_for_leader(AppendState *node)
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
{
/* Mark just-completed subplan as finished. */
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
}
else
{
/* Start with last subplan. */
- node->as_whichplan = node->as_nplans - 1;
+ node->as_whichsyncplan = node->as_nplans - 1;
}
/* Loop until we find a subplan to execute. */
- while (pstate->pa_finished[node->as_whichplan])
+ while (pstate->pa_finished[node->as_whichsyncplan])
{
- if (node->as_whichplan == 0)
+ if (node->as_whichsyncplan == 0)
{
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
LWLockRelease(&pstate->pa_lock);
return false;
}
- node->as_whichplan--;
+ node->as_whichsyncplan--;
}
/* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < append->first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < append->first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
LWLockRelease(&pstate->pa_lock);
@@ -463,8 +642,8 @@ choose_next_subplan_for_worker(AppendState *node)
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
/* Mark just-completed subplan as finished. */
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
/* If all the plans are already done, we have nothing to do */
if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
@@ -489,10 +668,10 @@ choose_next_subplan_for_worker(AppendState *node)
else
{
/* At last plan, no partial plans, arrange to bail out. */
- pstate->pa_next_plan = node->as_whichplan;
+ pstate->pa_next_plan = node->as_whichsyncplan;
}
- if (pstate->pa_next_plan == node->as_whichplan)
+ if (pstate->pa_next_plan == node->as_whichsyncplan)
{
/* We've tried everything! */
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -502,7 +681,7 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* Pick the plan we found, and advance pa_next_plan one more time. */
- node->as_whichplan = pstate->pa_next_plan++;
+ node->as_whichsyncplan = pstate->pa_next_plan++;
if (pstate->pa_next_plan >= node->as_nplans)
{
if (append->first_partial_plan < node->as_nplans)
@@ -518,8 +697,8 @@ choose_next_subplan_for_worker(AppendState *node)
}
/* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < append->first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < append->first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
LWLockRelease(&pstate->pa_lock);
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 59865f5..9cb5470 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -123,7 +123,6 @@ ExecForeignScan(PlanState *pstate)
(ExecScanRecheckMtd) ForeignRecheck);
}
-
/* ----------------------------------------------------------------
* ExecInitForeignScan
* ----------------------------------------------------------------
@@ -147,6 +146,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+ scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+ if ((eflags & EXEC_FLAG_ASYNC) != 0)
+ scanstate->fs_async = true;
/*
* Miscellaneous initialization
@@ -389,3 +392,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecForeignAsyncConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+ void *caller_data, bool reinit)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+ caller_data, reinit);
+}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index e599283..d85cb9c 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -204,7 +204,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam);
static Append *make_append(List *appendplans, int first_partial_plan,
- List *tlist, List *partitioned_rels);
+ int nasyncplans, int referent,
+ List *tlist, List *partitioned_rels);
static RecursiveUnion *make_recursive_union(List *tlist,
Plan *lefttree,
Plan *righttree,
@@ -284,6 +285,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
/*
@@ -1014,8 +1016,12 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
{
Append *plan;
List *tlist = build_path_tlist(root, &best_path->path);
- List *subplans = NIL;
+ List *asyncplans = NIL;
+ List *syncplans = NIL;
ListCell *subpaths;
+ int nasyncplans = 0;
+ bool first = true;
+ bool referent_is_sync = true;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1050,7 +1056,21 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
/* Must insist that all children return the same tlist */
subplan = create_plan_recurse(root, subpath, CP_EXACT_TLIST);
- subplans = lappend(subplans, subplan);
+ /*
+ * Classify as async-capable or not. If we have decided to run the
+ * children in parallel, we cannot run any one of them asynchronously.
+ */
+ if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+ {
+ asyncplans = lappend(asyncplans, subplan);
+ ++nasyncplans;
+ if (first)
+ referent_is_sync = false;
+ }
+ else
+ syncplans = lappend(syncplans, subplan);
+
+ first = false;
}
/*
@@ -1060,8 +1080,10 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
* parent-rel Vars it'll be asked to emit.
*/
- plan = make_append(subplans, best_path->first_partial_path,
- tlist, best_path->partitioned_rels);
+ plan = make_append(list_concat(asyncplans, syncplans),
+ best_path->first_partial_path, nasyncplans,
+ referent_is_sync ? nasyncplans : 0, tlist,
+ best_path->partitioned_rels);
copy_generic_path_info(&plan->plan, (Path *) best_path);
@@ -5307,8 +5329,8 @@ make_foreignscan(List *qptlist,
}
static Append *
-make_append(List *appendplans, int first_partial_plan,
- List *tlist, List *partitioned_rels)
+make_append(List *appendplans, int first_partial_plan, int nasyncplans,
+ int referent, List *tlist, List *partitioned_rels)
{
Append *node = makeNode(Append);
Plan *plan = &node->plan;
@@ -5320,6 +5342,8 @@ make_append(List *appendplans, int first_partial_plan,
node->partitioned_rels = partitioned_rels;
node->appendplans = appendplans;
node->first_partial_plan = first_partial_plan;
+ node->nasyncplans = nasyncplans;
+ node->referent = referent;
return node;
}
@@ -6656,3 +6680,27 @@ is_projection_capable_plan(Plan *plan)
}
return true;
}
+
+/*
+ * is_async_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ default:
+ break;
+ }
+ return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d130114..667878b 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3673,6 +3673,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_SYNC_REP:
event_name = "SyncRep";
break;
+ case WAIT_EVENT_ASYNC_WAIT:
+ event_name = "AsyncExecWait";
+ break;
/* no default case, so that compiler will warn */
}
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000..5fd67d9
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.c
+ * Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+ void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+ long timeout);
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 6545a80..60f4e51 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -63,6 +63,7 @@
#define EXEC_FLAG_WITH_OIDS 0x0020 /* force OIDs in returned tuples */
#define EXEC_FLAG_WITHOUT_OIDS 0x0040 /* force no OIDs in returned tuples */
#define EXEC_FLAG_WITH_NO_DATA 0x0080 /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC 0x0100 /* request async execution */
/* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ccb66be..67abf8e 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data, bool reinit);
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e88fee3..beb3f0d 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -161,6 +161,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data,
+ bool reinit);
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -182,6 +187,7 @@ typedef struct FdwRoutine
GetForeignPlan_function GetForeignPlan;
BeginForeignScan_function BeginForeignScan;
IterateForeignScan_function IterateForeignScan;
+ IterateForeignScan_function IterateForeignScanAsync;
ReScanForeignScan_function ReScanForeignScan;
EndForeignScan_function EndForeignScan;
@@ -232,6 +238,11 @@ typedef struct FdwRoutine
InitializeDSMForeignScan_function InitializeDSMForeignScan;
ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
ShutdownForeignScan_function ShutdownForeignScan;
/* Support functions for path reparameterization. */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4bb5cb1..405ad7b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -851,6 +851,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
* abstract superclass for all PlanState-type nodes.
* ----------------
*/
+typedef enum AsyncState
+{
+ AS_AVAILABLE,
+ AS_WAITING
+} AsyncState;
+
typedef struct PlanState
{
NodeTag type;
@@ -891,6 +897,9 @@ typedef struct PlanState
TupleTableSlot *ps_ResultTupleSlot; /* slot for my result tuples */
ExprContext *ps_ExprContext; /* node's expression-evaluation context */
ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */
+
+ AsyncState asyncstate;
+ int32 padding; /* to keep alignment of derived types */
} PlanState;
/* ----------------
@@ -1013,10 +1022,16 @@ struct AppendState
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
- int as_whichplan;
+ int as_nasyncplans; /* # of async-capable children */
ParallelAppendState *as_pstate; /* parallel coordination info */
+ int as_whichsyncplan; /* which sync plan is being executed */
Size pstate_len; /* size of parallel coordination info */
bool (*choose_next_subplan) (AppendState *);
+ bool as_syncdone; /* all synchronous plans done? */
+ Bitmapset *as_needrequest; /* async plans needing a new request */
+ Bitmapset *as_pending_async; /* pending async plans */
+ TupleTableSlot **as_asyncresult; /* unreturned results of async plans */
+ int as_nasyncresult; /* # of valid entries in as_asyncresult */
};
/* ----------------
@@ -1567,6 +1582,7 @@ typedef struct ForeignScanState
Size pscan_len; /* size of parallel coordination information */
/* use struct pointer to avoid including fdwapi.h here */
struct FdwRoutine *fdwroutine;
+ bool fs_async;
void *fdw_state; /* foreign-data wrapper can keep state here */
} ForeignScanState;
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 74e9fb5..b4535f0 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -249,6 +249,8 @@ typedef struct Append
List *partitioned_rels;
List *appendplans;
int first_partial_plan;
+ int nasyncplans; /* # of async plans, always at start of list */
+ int referent; /* index of inheritance tree referent */
} Append;
/* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 3d3c0b6..a1ba26f 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -831,7 +831,8 @@ typedef enum
WAIT_EVENT_REPLICATION_ORIGIN_DROP,
WAIT_EVENT_REPLICATION_SLOT_DROP,
WAIT_EVENT_SAFE_SNAPSHOT,
- WAIT_EVENT_SYNC_REP
+ WAIT_EVENT_SYNC_REP,
+ WAIT_EVENT_ASYNC_WAIT
} WaitEventIPC;
/* ----------
--
2.9.2
From 6612fbe0cab492fedead1d35f1b9cdf24f3e6dd4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH 3/3] async postgres_fdw
---
contrib/postgres_fdw/connection.c | 26 ++
contrib/postgres_fdw/expected/postgres_fdw.out | 128 ++++---
contrib/postgres_fdw/postgres_fdw.c | 484 +++++++++++++++++++++----
contrib/postgres_fdw/postgres_fdw.h | 2 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 20 +-
5 files changed, 522 insertions(+), 138 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 00c926b..4f3d59d 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ void *storage; /* connection specific storage */
} ConnCacheEntry;
/*
@@ -202,6 +203,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->conn, server->servername, user->umid, user->userid);
+ entry->storage = NULL;
}
/*
@@ -216,6 +218,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
}
/*
+ * Returns the connection-specific storage for this user. Allocates it with
+ * initsize if it does not exist yet.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+ bool found;
+ ConnCacheEntry *entry;
+ ConnCacheKey key;
+
+ key = user->umid;
+ entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+ Assert(found);
+
+ if (entry->storage == NULL)
+ {
+ entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+ memset(entry->storage, 0, initsize);
+ }
+
+ return entry->storage;
+}
+
+/*
* Connect to remote server using specified server and user mapping properties.
*/
static PGconn *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 683d641..3b4eefa 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6514,7 +6514,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+-------
a | aaa
@@ -6542,7 +6542,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -6570,7 +6570,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -6598,7 +6598,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | newtoo
@@ -6664,35 +6664,40 @@ insert into bar2 values(3,33,33);
insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+ Sort Key: bar.f1
+ -> Seq Scan on public.bar
+ Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
-> Foreign Scan on public.bar2
Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+ -> Sort
Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.*, foo.tableoid, foo.f1
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-> Foreign Scan on public.foo2
Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
f1 | f2
----+----
1 | 11
@@ -6702,35 +6707,40 @@ select * from bar where f1 in (select f1 from foo) for update;
(4 rows)
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+ Sort Key: bar.f1
+ -> Seq Scan on public.bar
+ Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
-> Foreign Scan on public.bar2
Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+ -> Sort
Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.*, foo.tableoid, foo.f1
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-> Foreign Scan on public.foo2
Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
f1 | f2
----+----
1 | 11
@@ -6760,11 +6770,11 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
Output: foo.ctid, foo.*, foo.tableoid, foo.f1
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-> Foreign Scan on public.foo2
Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-> Hash Join
Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid
Inner Unique: true
@@ -6778,11 +6788,11 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
Output: foo.ctid, foo.*, foo.tableoid, foo.f1
Group Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-> Foreign Scan on public.foo2
Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+ -> Seq Scan on public.foo
+ Output: foo.ctid, foo.*, foo.tableoid, foo.f1
(39 rows)
update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
@@ -6813,16 +6823,16 @@ where bar.f1 = ss.f1;
Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
Hash Cond: (foo.f1 = bar.f1)
-> Append
- -> Seq Scan on public.foo
- Output: ROW(foo.f1), foo.f1
-> Foreign Scan on public.foo2
Output: ROW(foo2.f1), foo2.f1
Remote SQL: SELECT f1 FROM public.loct1
- -> Seq Scan on public.foo foo_1
- Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
-> Foreign Scan on public.foo2 foo2_1
Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
Remote SQL: SELECT f1 FROM public.loct1
+ -> Seq Scan on public.foo
+ Output: ROW(foo.f1), foo.f1
+ -> Seq Scan on public.foo foo_1
+ Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
-> Hash
Output: bar.f1, bar.f2, bar.ctid
-> Seq Scan on public.bar
@@ -6840,16 +6850,16 @@ where bar.f1 = ss.f1;
Output: (ROW(foo.f1)), foo.f1
Sort Key: foo.f1
-> Append
- -> Seq Scan on public.foo
- Output: ROW(foo.f1), foo.f1
-> Foreign Scan on public.foo2
Output: ROW(foo2.f1), foo2.f1
Remote SQL: SELECT f1 FROM public.loct1
- -> Seq Scan on public.foo foo_1
- Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
-> Foreign Scan on public.foo2 foo2_1
Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
Remote SQL: SELECT f1 FROM public.loct1
+ -> Seq Scan on public.foo
+ Output: ROW(foo.f1), foo.f1
+ -> Seq Scan on public.foo foo_1
+ Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
(45 rows)
update bar set f2 = f2 + 100
@@ -7000,27 +7010,33 @@ delete from foo where f1 < 5 returning *;
(5 rows)
explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
- QUERY PLAN
-------------------------------------------------------------------------------
- Update on public.bar
- Output: bar.f1, bar.f2
- Update on public.bar
- Foreign Update on public.bar2
- -> Seq Scan on public.bar
- Output: bar.f1, (bar.f2 + 100), bar.ctid
- -> Foreign Update on public.bar2
- Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+ QUERY PLAN
+--------------------------------------------------------------------------------------
+ Sort
+ Output: u.f1, u.f2
+ Sort Key: u.f1
+ CTE u
+ -> Update on public.bar
+ Output: bar.f1, bar.f2
+ Update on public.bar
+ Foreign Update on public.bar2
+ -> Seq Scan on public.bar
+ Output: bar.f1, (bar.f2 + 100), bar.ctid
+ -> Foreign Update on public.bar2
+ Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+ -> CTE Scan on u
+ Output: u.f1, u.f2
+(14 rows)
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
f1 | f2
----+-----
1 | 311
2 | 322
- 6 | 266
3 | 333
4 | 344
+ 6 | 266
7 | 277
(6 rows)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 7992ba5..5ea1d88 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -20,6 +20,8 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -34,6 +36,7 @@
#include "optimizer/var.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
+#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
@@ -53,6 +56,9 @@ PG_MODULE_MAGIC;
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -120,10 +126,27 @@ enum FdwDirectModifyPrivateIndex
};
/*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnpriv
+{
+ ForeignScanState *current_owner; /* The node currently running a query
+ * on this connection */
+} PgFdwConnpriv;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+ PGconn *conn; /* connection for the scan */
+ PgFdwConnpriv *connpriv; /* connection private memory */
+} PgFdwState;
+
+/*
* Execution state of a foreign scan using postgres_fdw.
*/
typedef struct PgFdwScanState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table. NULL
* for a foreign join scan. */
TupleDesc tupdesc; /* tuple descriptor of scan */
@@ -134,7 +157,7 @@ typedef struct PgFdwScanState
List *retrieved_attrs; /* list of retrieved attribute numbers */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
+ bool result_ready;
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@@ -150,6 +173,13 @@ typedef struct PgFdwScanState
/* batch-level state, for optimizing rewinds and avoiding useless fetch */
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ bool run_async; /* true if run asynchronously */
+ bool async_waiting; /* true if requesting the parent to wait */
+ ForeignScanState *waiter; /* Next node to run a query among nodes
+ * sharing the same connection */
+ ForeignScanState *last_waiter; /* A waiting node at the end of a waiting
+ * list. Maintained only by the current
+ * owner of the connection */
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
@@ -163,11 +193,11 @@ typedef struct PgFdwScanState
*/
typedef struct PgFdwModifyState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
/* for remote query execution */
- PGconn *conn; /* connection for the scan */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@@ -190,6 +220,7 @@ typedef struct PgFdwModifyState
*/
typedef struct PgFdwDirectModifyState
{
+ PgFdwState s; /* common structure */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
@@ -288,6 +319,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
static void postgresAddForeignUpdateTargets(Query *parsetree,
RangeTblEntry *target_rte,
Relation target_relation);
@@ -348,6 +380,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
UpperRelationKind stage,
RelOptInfo *input_rel,
RelOptInfo *output_rel);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data, bool reinit);
/*
* Helper functions
@@ -368,7 +404,10 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn);
+static void absorb_current_result(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
@@ -438,6 +477,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->IterateForeignScan = postgresIterateForeignScan;
routine->ReScanForeignScan = postgresReScanForeignScan;
routine->EndForeignScan = postgresEndForeignScan;
+ routine->ShutdownForeignScan = postgresShutdownForeignScan;
/* Functions for updating foreign tables */
routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -472,6 +512,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for async execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
PG_RETURN_POINTER(routine);
}
@@ -1322,12 +1366,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->s.conn = GetConnection(user, false);
+ fsstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
+ fsstate->s.connpriv->current_owner = NULL;
+ fsstate->waiter = NULL;
+ fsstate->last_waiter = node;
/* Assign a unique ID for my cursor */
- fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+ fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
fsstate->cursor_exists = false;
+ /* Initialize async execution status */
+ fsstate->run_async = false;
+ fsstate->async_waiting = false;
+
/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private,
FdwScanPrivateSelectSql));
@@ -1383,32 +1436,136 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
- */
- if (!fsstate->cursor_exists)
- create_cursor(node);
-
- /*
* Get some more tuples, if we've run out.
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
- /* No point in another fetch if we already detected EOF, though. */
- if (!fsstate->eof_reached)
- fetch_more_data(node);
- /* If we didn't get any tuples, must be end of data. */
+ ForeignScanState *next_conn_owner = node;
+
+ /* This node has sent a query on this connection */
+ if (fsstate->s.connpriv->current_owner == node)
+ {
+ /* Check if the result is available */
+ if (PQisBusy(fsstate->s.conn))
+ {
+ int rc = WaitLatchOrSocket(NULL,
+ WL_SOCKET_READABLE | WL_TIMEOUT,
+ PQsocket(fsstate->s.conn), 0,
+ WAIT_EVENT_ASYNC_WAIT);
+ if (node->fs_async && !(rc & WL_SOCKET_READABLE))
+ {
+ /*
+ * This node is not ready yet. Tell the caller to wait.
+ */
+ fsstate->result_ready = false;
+ node->ss.ps.asyncstate = AS_WAITING;
+ return ExecClearTuple(slot);
+ }
+ }
+
+ Assert(fsstate->async_waiting);
+ fsstate->async_waiting = false;
+ fetch_received_data(node);
+
+ /*
+ * If someone is waiting for this node on the same connection, let the
+ * first waiter be the next owner of this connection.
+ */
+ if (fsstate->waiter)
+ {
+ PgFdwScanState *next_owner_state;
+
+ next_conn_owner = fsstate->waiter;
+ next_owner_state = GetPgFdwScanState(next_conn_owner);
+ fsstate->waiter = NULL;
+
+ /*
+ * Only the current owner is responsible for maintaining the shortcut
+ * to the last waiter.
+ */
+ next_owner_state->last_waiter = fsstate->last_waiter;
+
+ /*
+ * For simplicity, last_waiter points to the node itself when no one
+ * is waiting for it.
+ */
+ fsstate->last_waiter = node;
+ }
+ }
+ else if (fsstate->s.connpriv->current_owner &&
+ !GetPgFdwScanState(node)->eof_reached)
+ {
+ /*
+ * Anyone else is holding this connection and we want this node to
+ * run later. Add myself to the tail of the waiters' list then
+ * return not-ready. To avoid scanning through the waiters' list,
+ * the current owner is to maintain the shortcut to the last
+ * waiter.
+ */
+ PgFdwScanState *conn_owner_state =
+ GetPgFdwScanState(fsstate->s.connpriv->current_owner);
+ ForeignScanState *last_waiter = conn_owner_state->last_waiter;
+ PgFdwScanState *last_waiter_state = GetPgFdwScanState(last_waiter);
+
+ last_waiter_state->waiter = node;
+ conn_owner_state->last_waiter = node;
+
+ /* Register the node to the async-waiting node list */
+ Assert(!GetPgFdwScanState(node)->async_waiting);
+
+ GetPgFdwScanState(node)->async_waiting = true;
+
+ fsstate->result_ready = fsstate->eof_reached;
+ node->ss.ps.asyncstate =
+ fsstate->result_ready ? AS_AVAILABLE : AS_WAITING;
+ return ExecClearTuple(slot);
+ }
+
+ /* At this time no node is running on the connection */
+ Assert(GetPgFdwScanState(next_conn_owner)->s.connpriv->current_owner
+ == NULL);
+ /*
+ * Send the next request for the next owner of this connection if
+ * needed.
+ */
+ if (!GetPgFdwScanState(next_conn_owner)->eof_reached)
+ {
+ PgFdwScanState *next_owner_state =
+ GetPgFdwScanState(next_conn_owner);
+
+ request_more_data(next_conn_owner);
+
+ /* Register the node to the async-waiting node list */
+ if (!next_owner_state->async_waiting)
+ next_owner_state->async_waiting = true;
+
+ if (!next_conn_owner->fs_async)
+ fetch_received_data(next_conn_owner);
+ }
+
+
+ /*
+ * If we haven't received a result for the given node this time,
+ * return with no tuple to give way to other nodes.
+ */
if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ fsstate->result_ready = fsstate->eof_reached;
+ node->ss.ps.asyncstate =
+ fsstate->result_ready ? AS_AVAILABLE : AS_WAITING;
return ExecClearTuple(slot);
+ }
}
/*
* Return the next tuple.
*/
+ fsstate->result_ready = true;
+ node->ss.ps.asyncstate = AS_AVAILABLE;
ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
slot,
InvalidBuffer,
@@ -1424,7 +1581,7 @@ postgresIterateForeignScan(ForeignScanState *node)
static void
postgresReScanForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
char sql[64];
PGresult *res;
@@ -1432,6 +1589,9 @@ postgresReScanForeignScan(ForeignScanState *node)
if (!fsstate->cursor_exists)
return;
+ /* Absorb the remaining result */
+ absorb_current_result(node);
+
/*
* If any internal parameters affecting this node have changed, we'd
* better destroy and recreate the cursor. Otherwise, rewinding it should
@@ -1460,9 +1620,9 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->s.conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
PQclear(res);
/* Now force a fresh FETCH. */
@@ -1480,7 +1640,7 @@ postgresReScanForeignScan(ForeignScanState *node)
static void
postgresEndForeignScan(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
if (fsstate == NULL)
@@ -1488,16 +1648,32 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->s.conn, fsstate->cursor_number);
/* Release remote connection */
- ReleaseConnection(fsstate->conn);
- fsstate->conn = NULL;
+ ReleaseConnection(fsstate->s.conn);
+ fsstate->s.conn = NULL;
/* MemoryContexts will be deleted automatically. */
}
/*
+ * postgresShutdownForeignScan
+ * Remove asynchrony stuff and clean up garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+ ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+ if (plan->operation != CMD_SELECT)
+ return;
+
+ /* Absorb the remaining result */
+ absorb_current_result(node);
+}
+
+/*
* postgresAddForeignUpdateTargets
* Add resjunk column(s) needed for update/delete on a foreign table
*/
@@ -1700,7 +1876,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->s.conn = GetConnection(user, true);
+ fmstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Deconstruct fdw_private data. */
@@ -1779,6 +1957,8 @@ postgresExecForeignInsert(EState *estate,
PGresult *res;
int n_rows;
+ vacate_connection((PgFdwState *)fmstate);
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -1789,14 +1969,14 @@ postgresExecForeignInsert(EState *estate,
/*
* Execute the prepared statement.
*/
- if (!PQsendQueryPrepared(fmstate->conn,
+ if (!PQsendQueryPrepared(fmstate->s.conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -1804,10 +1984,10 @@ postgresExecForeignInsert(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
@@ -1845,6 +2025,8 @@ postgresExecForeignUpdate(EState *estate,
PGresult *res;
int n_rows;
+ vacate_connection((PgFdwState *)fmstate);
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -1865,14 +2047,14 @@ postgresExecForeignUpdate(EState *estate,
/*
* Execute the prepared statement.
*/
- if (!PQsendQueryPrepared(fmstate->conn,
+ if (!PQsendQueryPrepared(fmstate->s.conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -1880,10 +2062,10 @@ postgresExecForeignUpdate(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
@@ -1921,6 +2103,8 @@ postgresExecForeignDelete(EState *estate,
PGresult *res;
int n_rows;
+ vacate_connection((PgFdwState *)fmstate);
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -1941,14 +2125,14 @@ postgresExecForeignDelete(EState *estate,
/*
* Execute the prepared statement.
*/
- if (!PQsendQueryPrepared(fmstate->conn,
+ if (!PQsendQueryPrepared(fmstate->s.conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -1956,10 +2140,10 @@ postgresExecForeignDelete(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
@@ -2006,16 +2190,16 @@ postgresEndForeignModify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->s.conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
PQclear(res);
fmstate->p_name = NULL;
}
/* Release remote connection */
- ReleaseConnection(fmstate->conn);
- fmstate->conn = NULL;
+ ReleaseConnection(fmstate->s.conn);
+ fmstate->s.conn = NULL;
}
/*
@@ -2303,7 +2487,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->s.conn = GetConnection(user, false);
+ dmstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
/* Initialize state variable */
dmstate->num_tuples = -1; /* -1 means not set yet */
@@ -2356,7 +2542,10 @@ postgresIterateDirectModify(ForeignScanState *node)
* If this is the first call after Begin, execute the statement.
*/
if (dmstate->num_tuples == -1)
+ {
+ vacate_connection((PgFdwState *)dmstate);
execute_dml_stmt(node);
+ }
/*
* If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2403,8 +2592,8 @@ postgresEndDirectModify(ForeignScanState *node)
PQclear(dmstate->result);
/* Release remote connection */
- ReleaseConnection(dmstate->conn);
- dmstate->conn = NULL;
+ ReleaseConnection(dmstate->s.conn);
+ dmstate->s.conn = NULL;
/* MemoryContext will be deleted automatically. */
}
@@ -2523,6 +2712,7 @@ estimate_path_cost_size(PlannerInfo *root,
List *local_param_join_conds;
StringInfoData sql;
PGconn *conn;
+ PgFdwConnpriv *connpriv;
Selectivity local_sel;
QualCost local_cost;
List *fdw_scan_tlist = NIL;
@@ -2565,6 +2755,16 @@ estimate_path_cost_size(PlannerInfo *root,
/* Get the remote estimate */
conn = GetConnection(fpinfo->user, false);
+ connpriv = GetConnectionSpecificStorage(fpinfo->user,
+ sizeof(PgFdwConnpriv));
+ if (connpriv)
+ {
+ PgFdwState tmpstate;
+ tmpstate.conn = conn;
+ tmpstate.connpriv = connpriv;
+ vacate_connection(&tmpstate);
+ }
+
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -2919,11 +3119,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
static void
create_cursor(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int numParams = fsstate->numParams;
const char **values = fsstate->param_values;
- PGconn *conn = fsstate->conn;
+ PGconn *conn = fsstate->s.conn;
StringInfoData buf;
PGresult *res;
@@ -2989,47 +3189,96 @@ create_cursor(ForeignScanState *node)
* Fetch some more rows from the node's cursor.
*/
static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
{
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ PGconn *conn = fsstate->s.conn;
+ char sql[64];
+
+ /* The connection should be vacant */
+ Assert(fsstate->s.connpriv->current_owner == NULL);
+
+ /*
+ * If this is the first call after Begin or ReScan, we need to create the
+ * cursor on the remote side.
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (!PQsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+ fsstate->s.connpriv->current_owner = node;
+}
+
+/*
+ * Read the result of the query previously sent on this node's connection.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
PGresult *volatile res = NULL;
MemoryContext oldcontext;
+ /* This node must be the current owner of the connection */
+ Assert(fsstate->s.connpriv->current_owner == node);
+
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
- * batch.
+ * batch if no tuple is remaining
*/
- fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ MemoryContextReset(fsstate->batch_cxt);
+ }
+ else if (fsstate->next_tuple > 0)
+ {
+ /* move the remaining tuples to the beginning of the store */
+ int n = 0;
+
+ while(fsstate->next_tuple < fsstate->num_tuples)
+ fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+ fsstate->num_tuples = n;
+ }
+
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
/* PGresult must be released before leaving this function. */
PG_TRY();
{
- PGconn *conn = fsstate->conn;
+ PGconn *conn = fsstate->s.conn;
char sql[64];
- int numrows;
+ int addrows;
+ size_t newsize;
int i;
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_get_result(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
/* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ addrows = PQntuples(res);
+ newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+ if (fsstate->tuples)
+ fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+ else
+ fsstate->tuples = (HeapTuple *) palloc(newsize);
- for (i = 0; i < numrows; i++)
+ for (i = 0; i < addrows; i++)
{
Assert(IsA(node->ss.ps.plan, ForeignScan));
- fsstate->tuples[i] =
+ fsstate->tuples[fsstate->num_tuples + i] =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
@@ -3039,27 +3288,82 @@ fetch_more_data(ForeignScanState *node)
}
/* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
+ if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
fsstate->fetch_ct_2++;
+ fsstate->next_tuple = 0;
+ fsstate->num_tuples += addrows;
+
/* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
+ fsstate->eof_reached = (addrows < fsstate->fetch_size);
PQclear(res);
res = NULL;
}
PG_CATCH();
{
+ fsstate->s.connpriv->current_owner = NULL;
if (res)
PQclear(res);
PG_RE_THROW();
}
PG_END_TRY();
+ fsstate->s.connpriv->current_owner = NULL;
+
MemoryContextSwitchTo(oldcontext);
}
/*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate)
+{
+ PgFdwConnpriv *connpriv = fdwstate->connpriv;
+ ForeignScanState *owner;
+
+ if (connpriv == NULL || connpriv->current_owner == NULL)
+ return;
+
+ /*
+ * let the current connection owner read the result for the running query
+ */
+ owner = connpriv->current_owner;
+ fetch_received_data(owner);
+
+ /* Clear the waiting list */
+ while (owner)
+ {
+ PgFdwScanState *fsstate = GetPgFdwScanState(owner);
+
+ fsstate->last_waiter = NULL;
+ owner = fsstate->waiter;
+ fsstate->waiter = NULL;
+ }
+}
+
+/*
+ * Absorb the result of the current query.
+ */
+static void
+absorb_current_result(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *owner = fsstate->s.connpriv->current_owner;
+
+ if (owner)
+ {
+ PgFdwScanState *target_state = GetPgFdwScanState(owner);
+ PGconn *conn = target_state->s.conn;
+
+ while(PQisBusy(conn))
+ PQclear(PQgetResult(conn));
+ fsstate->s.connpriv->current_owner = NULL;
+ fsstate->async_waiting = false;
+ }
+}
+/*
* Force assorted GUC parameters to settings that ensure that we'll output
* data values in a form that is unambiguous to the remote server.
*
@@ -3143,7 +3447,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
- GetPrepStmtNumber(fmstate->conn));
+ GetPrepStmtNumber(fmstate->s.conn));
p_name = pstrdup(prep_name);
/*
@@ -3153,12 +3457,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
*/
- if (!PQsendPrepare(fmstate->conn,
+ if (!PQsendPrepare(fmstate->s.conn,
p_name,
fmstate->query,
0,
NULL))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
/*
* Get the result, and check for success.
@@ -3166,9 +3470,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
@@ -3299,9 +3603,9 @@ execute_dml_stmt(ForeignScanState *node)
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
- if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+ pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
/*
* Get the result, and check for success.
@@ -3309,10 +3613,10 @@ execute_dml_stmt(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+ dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
dmstate->query);
/* Get the number of rows affected. */
@@ -4582,6 +4886,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
/* XXX Consider parameterized paths for the join relation */
}
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ return true;
+}
+
+
+/*
+ * Configure waiting event.
+ *
+ * Add a wait event only when the node is the connection owner. Otherwise
+ * another node on this connection is the owner.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+ void *caller_data, bool reinit)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+ /* If the caller didn't reinit, this event is already in event set */
+ if (!reinit)
+ return true;
+
+ if (fsstate->s.connpriv->current_owner == node)
+ {
+ AddWaitEventToSet(wes,
+ WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+ NULL, caller_data);
+ return true;
+ }
+
+ return false;
+}
+
+
/*
* Assess whether the aggregation, grouping and having operations can be pushed
* down to the foreign server. As a side effect, save information we obtain in
@@ -4946,7 +5286,7 @@ make_tuple_from_result_row(PGresult *res,
PgFdwScanState *fdw_sstate;
Assert(fsstate);
- fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
+ fdw_sstate = GetPgFdwScanState(fsstate);
tupdesc = fdw_sstate->tupdesc;
}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 1ae809d..58ef26e 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -77,6 +77,7 @@ typedef struct PgFdwRelationInfo
UserMapping *user; /* only set in use_remote_estimate mode */
int fetch_size; /* fetch size for this remote table */
+ bool allow_prefetch; /* true to allow overlapped fetching */
/*
* Name of the relation while EXPLAINing ForeignScan. It is used for join
@@ -116,6 +117,7 @@ extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 3c3c5c7..cb9caa5 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1535,25 +1535,25 @@ INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
@@ -1589,12 +1589,12 @@ insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
-- Check UPDATE with inherited target and an inherited source table
explain (verbose, costs off)
@@ -1653,8 +1653,8 @@ explain (verbose, costs off)
delete from foo where f1 < 5 returning *;
delete from foo where f1 < 5 returning *;
explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
-- Test that UPDATE/DELETE with inherited target works with row-level triggers
CREATE TRIGGER trig_row_before
--
2.9.2
В списке pgsql-hackers по дате отправления: