Re: Problem while updating a foreign table pointing to a partitioned table on foreign server
| От | Kyotaro HORIGUCHI |
|---|---|
| Тема | Re: Problem while updating a foreign table pointing to a partitioned table on foreign server |
| Дата | |
| Msg-id | 20180605.191032.256535589.horiguchi.kyotaro@lab.ntt.co.jp обсуждение исходный текст |
| Ответ на | Re: Problem while updating a foreign table pointing to a partitioned table on foreign server (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>) |
| Ответы |
Re: Problem while updating a foreign table pointing to a partitioned table on foreign server
|
| Список | pgsql-hackers |
Hello.
At Mon, 04 Jun 2018 20:58:28 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20180604.205828.208262556.horiguchi.kyotaro@lab.ntt.co.jp>
> It fails on some join-pushdown cases since it doesn't add tid
> columns to join tlist. I suppose that build_tlist_to_deparse
> needs something but I'll consider further tomorrow.
I made it work with a few exceptions and bumped. However, PARAM_EXEC
doesn't work at all when a Sort node sits between the
ForeignUpdate and the ForeignScan, as in the following plan.
=====
explain (verbose, costs off)
update bar set f2 = f2 + 100
from
( select f1 from foo union all select f1+3 from foo ) ss
where bar.f1 = ss.f1;
QUERY PLAN
-----------------------------------------------------------------------------
Update on public.bar
Update on public.bar
Foreign Update on public.bar2
Remote SQL: UPDATE public.loct2 SET f2 = $3 WHERE tableoid = $1 AND ctid = $2
...
-> Merge Join
Output: bar2.f1, (bar2.f2 + 100), bar2.f3, (ROW(foo.f1))
Merge Cond: (bar2.f1 = foo.f1)
-> Sort
Output: bar2.f1, bar2.f2, bar2.f3, bar2.tableoid, bar2.ctid
Sort Key: bar2.f1
-> Foreign Scan on public.bar2
Output: bar2.f1, bar2.f2, bar2.f3, bar2.tableoid, bar2.ctid
Remote SQL: SELECT f1, f2, f3, ctid, tableoid FROM public.loct2 FOR UPDATE
=====
Even if this worked fine, it could not be back-patched. We need either
extra storage that moves together with the tuples, or a way to prevent
sorts and similar nodes from being inserted there.
At Fri, 1 Jun 2018 10:21:39 -0400, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote in
<CAFjFpRdraYcQnD4tKzNuP1uP6L-gnizi4HLU_UA=28Q2M4zoDA@mail.gmail.com>
> I am not suggesting to commit 0003 in my patch set, but just 0001 and
> 0002 which just raise an error when multiple rows get updated when
> only one row is expected to be updated.
So I agree to commit at least those two, in order to prevent
silently doing the wrong thing.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index d272719ff4..bff216f29d 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1049,9 +1049,16 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs,
* can use NoLock here.
*/
Relation rel = heap_open(rte->relid, NoLock);
+ Bitmapset *attrs = fpinfo->attrs_used;
+
+ if (root->parse->commandType != CMD_UPDATE &&
+ root->parse->commandType != CMD_DELETE)
+ attrs = bms_del_member(bms_copy(attrs),
+ TableOidAttributeNumber -
+ FirstLowInvalidHeapAttributeNumber);
deparseTargetList(buf, rte, foreignrel->relid, rel, false,
- fpinfo->attrs_used, false, retrieved_attrs);
+ attrs, false, retrieved_attrs);
heap_close(rel, NoLock);
}
}
@@ -1107,11 +1114,17 @@ deparseTargetList(StringInfo buf,
bool qualify_col,
List **retrieved_attrs)
{
+ static int check_attrs[4];
+ static char *check_attr_names[] = {"ctid", "oid", "tableoid"};
TupleDesc tupdesc = RelationGetDescr(rel);
bool have_wholerow;
bool first;
int i;
+ check_attrs[0] = SelfItemPointerAttributeNumber;
+ check_attrs[1] = ObjectIdAttributeNumber;
+ check_attrs[2] = TableOidAttributeNumber;
+ check_attrs[3] = FirstLowInvalidHeapAttributeNumber;
*retrieved_attrs = NIL;
/* If there's a whole-row reference, we'll need all the columns. */
@@ -1143,13 +1156,16 @@ deparseTargetList(StringInfo buf,
}
}
- /*
- * Add ctid and oid if needed. We currently don't support retrieving any
- * other system columns.
- */
- if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
- attrs_used))
+ for (i = 0 ; check_attrs[i] != FirstLowInvalidHeapAttributeNumber ; i++)
{
+ int attr = check_attrs[i];
+ char *attr_name = check_attr_names[i];
+
+ /* Add system columns if needed. */
+ if (!bms_is_member(attr - FirstLowInvalidHeapAttributeNumber,
+ attrs_used))
+ continue;
+
if (!first)
appendStringInfoString(buf, ", ");
else if (is_returning)
@@ -1158,26 +1174,9 @@ deparseTargetList(StringInfo buf,
if (qualify_col)
ADD_REL_QUALIFIER(buf, rtindex);
- appendStringInfoString(buf, "ctid");
+ appendStringInfoString(buf, attr_name);
- *retrieved_attrs = lappend_int(*retrieved_attrs,
- SelfItemPointerAttributeNumber);
- }
- if (bms_is_member(ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber,
- attrs_used))
- {
- if (!first)
- appendStringInfoString(buf, ", ");
- else if (is_returning)
- appendStringInfoString(buf, " RETURNING ");
- first = false;
-
- if (qualify_col)
- ADD_REL_QUALIFIER(buf, rtindex);
- appendStringInfoString(buf, "oid");
-
- *retrieved_attrs = lappend_int(*retrieved_attrs,
- ObjectIdAttributeNumber);
+ *retrieved_attrs = lappend_int(*retrieved_attrs, attr);
}
/* Don't generate bad syntax if no undropped columns */
@@ -1725,7 +1724,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
deparseRelation(buf, rel);
appendStringInfoString(buf, " SET ");
- pindex = 2; /* ctid is always the first param */
+ pindex = 3; /* tableoid and ctid are always the first param */
first = true;
foreach(lc, targetAttrs)
{
@@ -1739,7 +1738,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
appendStringInfo(buf, " = $%d", pindex);
pindex++;
}
- appendStringInfoString(buf, " WHERE ctid = $1");
+ appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
deparseReturningList(buf, rte, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1855,7 +1854,7 @@ deparseDeleteSql(StringInfo buf, RangeTblEntry *rte,
{
appendStringInfoString(buf, "DELETE FROM ");
deparseRelation(buf, rel);
- appendStringInfoString(buf, " WHERE ctid = $1");
+ appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
deparseReturningList(buf, rte, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_delete_after_row,
@@ -1951,8 +1950,13 @@ deparseReturningList(StringInfo buf, RangeTblEntry *rte,
*/
pull_varattnos((Node *) returningList, rtindex,
&attrs_used);
+
+ attrs_used = bms_del_member(attrs_used,
+ TableOidAttributeNumber -
+ FirstLowInvalidHeapAttributeNumber);
}
+
if (attrs_used != NULL)
deparseTargetList(buf, rte, rtindex, rel, true, attrs_used, false,
retrieved_attrs);
@@ -2066,6 +2070,12 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte,
ADD_REL_QUALIFIER(buf, varno);
appendStringInfoString(buf, "oid");
}
+ else if (varattno == TableOidAttributeNumber)
+ {
+ if (qualify_col)
+ ADD_REL_QUALIFIER(buf, varno);
+ appendStringInfoString(buf, "tableoid");
+ }
else if (varattno < 0)
{
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 78b0f43ca8..e574d7f51b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -73,7 +73,10 @@ enum FdwScanPrivateIndex
* String describing join i.e. names of relations being joined and types
* of join, added when the scan is join
*/
- FdwScanPrivateRelations
+ FdwScanPrivateRelations,
+
+ /* Integer list of ids of EXEC_PARAM */
+ FdwScanTupleIdParamIds
};
/*
@@ -95,7 +98,9 @@ enum FdwModifyPrivateIndex
/* has-returning flag (as an integer Value node) */
FdwModifyPrivateHasReturning,
/* Integer list of attribute numbers retrieved by RETURNING */
- FdwModifyPrivateRetrievedAttrs
+ FdwModifyPrivateRetrievedAttrs,
+ /* Integer list of paramid for tableoid and ctid of source tuple */
+ FdwModifyPrivateTidParams
};
/*
@@ -156,6 +161,8 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+
+ int *tid_params; /* EXEC_PARAM id for tuple identifier */
} PgFdwScanState;
/*
@@ -177,7 +184,7 @@ typedef struct PgFdwModifyState
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
/* info about parameters for prepared statement */
- AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
+ int *tid_params; /* EXEC_PARAM ids for tuple identifier */
int p_nums; /* number of parameters to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them */
@@ -293,9 +300,6 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
-static void postgresAddForeignUpdateTargets(Query *parsetree,
- RangeTblEntry *target_rte,
- Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
ModifyTable *plan,
Index resultRelation,
@@ -388,9 +392,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
char *query,
List *target_attrs,
bool has_returning,
- List *retrieved_attrs);
+ List *retrieved_attrs,
+ int *tid_params);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
+ Oid tableoid,
ItemPointer tupleid,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
@@ -451,6 +457,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_o,
const PgFdwRelationInfo *fpinfo_i);
+static List *add_tidcols_to_tlist(List *org, Index varno);
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -471,7 +478,6 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->EndForeignScan = postgresEndForeignScan;
/* Functions for updating foreign tables */
- routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
routine->PlanForeignModify = postgresPlanForeignModify;
routine->BeginForeignModify = postgresBeginForeignModify;
routine->ExecForeignInsert = postgresExecForeignInsert;
@@ -595,6 +601,39 @@ postgresGetForeignRelSize(PlannerInfo *root,
&fpinfo->attrs_used);
}
+ /*
+ * ctid and tableoid are required for target relation of UPDATE and
+ * DELETE. Join relations are handled elsewhere.
+ */
+ if (root->parse->resultRelation == baserel->relid &&
+ (root->parse->commandType == CMD_UPDATE ||
+ root->parse->commandType == CMD_DELETE))
+ {
+ Var *v;
+
+ v = makeVar(baserel->relid,
+ TableOidAttributeNumber,
+ OIDOID, -1, InvalidOid, 0);
+ add_new_column_to_pathtarget(baserel->reltarget, (Expr *) v);
+ v = makeVar(baserel->relid,
+ SelfItemPointerAttributeNumber,
+ TIDOID, -1, InvalidOid, 0);
+ add_new_column_to_pathtarget(baserel->reltarget, (Expr *) v);
+
+ fpinfo->param_attrs =
+ bms_add_member(fpinfo->param_attrs,
+ SelfItemPointerAttributeNumber -
+ FirstLowInvalidHeapAttributeNumber);
+
+ fpinfo->param_attrs =
+ bms_add_member(fpinfo->param_attrs,
+ TableOidAttributeNumber -
+ FirstLowInvalidHeapAttributeNumber);
+
+ fpinfo->attrs_used =
+ bms_add_members(fpinfo->attrs_used, fpinfo->param_attrs);
+ }
+
/*
* Compute the selectivity and cost of the local_conds, so we don't have
* to do it over again for each path. The best we can do for these
@@ -1116,6 +1155,94 @@ postgresGetForeignPaths(PlannerInfo *root,
}
}
+/* Find the id of a PARAM_EXEC matches to the given var */
+static int
+find_param_for_var(PlannerInfo *root, Var *var)
+{
+ ListCell *ppl;
+ PlannerParamItem *pitem;
+ Index levelsup;
+
+ /* Find the query level the Var belongs to */
+ for (levelsup = var->varlevelsup; levelsup > 0; levelsup--)
+ root = root->parent_root;
+
+ /* If there's already a matching PlannerParamItem there, just use it */
+ foreach(ppl, root->plan_params)
+ {
+ pitem = (PlannerParamItem *) lfirst(ppl);
+ if (IsA(pitem->item, Var))
+ {
+ Var *pvar = (Var *) pitem->item;
+
+ /*
+ * This comparison must match _equalVar(), except for ignoring
+ * varlevelsup. Note that _equalVar() ignores the location.
+ */
+ if (pvar->varno == var->varno &&
+ pvar->varattno == var->varattno &&
+ pvar->vartype == var->vartype &&
+ pvar->vartypmod == var->vartypmod &&
+ pvar->varcollid == var->varcollid &&
+ pvar->varnoold == var->varnoold &&
+ pvar->varoattno == var->varoattno)
+ return pitem->paramId;
+ }
+ }
+
+ return -1;
+}
+
+/*
+ * Select a PARAM_EXEC number to identify the given Var as a parameter for
+ * the current subquery, or for a nestloop's inner scan.
+ * If the Var already has a param in the current context, return that one.
+ * (copy of the function in subselect.c)
+ */
+static int
+assign_param_for_var(PlannerInfo *root, Var *var)
+{
+ int paramid;
+ PlannerParamItem *pitem;
+
+ /* Return registered param if any */
+ paramid = find_param_for_var(root, var);
+ if (paramid >= 0)
+ return paramid;
+
+ /* Nope, so make a new one */
+ var = copyObject(var);
+ var->varlevelsup = 0;
+
+ pitem = makeNode(PlannerParamItem);
+ pitem->item = (Node *) var;
+ pitem->paramId = list_length(root->glob->paramExecTypes);
+ root->glob->paramExecTypes = lappend_oid(root->glob->paramExecTypes,
+ var->vartype);
+
+ root->plan_params = lappend(root->plan_params, pitem);
+
+ return pitem->paramId;
+}
+
+static List *
+add_tidcols_to_tlist(List *org, Index varno)
+{
+ List *result = NIL;
+
+ result = list_copy(org);
+
+ result =
+ add_to_flat_tlist(result,
+ list_make2(makeVar(varno, TableOidAttributeNumber,
+ OIDOID, -1, InvalidOid, 0),
+ makeVar(varno,
+ SelfItemPointerAttributeNumber,
+ TIDOID, -1, InvalidOid, 0)));
+
+ return result;
+}
+
/*
* postgresGetForeignPlan
* Create ForeignScan plan node which implements selected best path
@@ -1136,6 +1263,7 @@ postgresGetForeignPlan(PlannerInfo *root,
List *local_exprs = NIL;
List *params_list = NIL;
List *fdw_scan_tlist = NIL;
+ List *fdw_return_tlist = NIL;
List *fdw_recheck_quals = NIL;
List *retrieved_attrs;
StringInfoData sql;
@@ -1223,8 +1351,8 @@ postgresGetForeignPlan(PlannerInfo *root,
* locally.
*/
- /* Build the list of columns to be fetched from the foreign server. */
- fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+ /* Build the list of columns to be returned to upper node. */
+ fdw_scan_tlist = fdw_return_tlist = build_tlist_to_deparse(foreignrel);
/*
* Ensure that the outer plan produces a tuple whose descriptor
@@ -1263,6 +1391,17 @@ postgresGetForeignPlan(PlannerInfo *root,
qual);
}
}
+
+ /*
+ * Remote query requires tuple identifers if this relation involves
+ * the target relation of UPDATE/DELETE commands.
+ */
+ if ((root->parse->commandType == CMD_UPDATE ||
+ root->parse->commandType == CMD_DELETE) &&
+ bms_is_member(root->parse->resultRelation, foreignrel->relids))
+ fdw_scan_tlist =
+ add_tidcols_to_tlist(fdw_return_tlist,
+ root->parse->resultRelation);
}
/*
@@ -1288,6 +1427,45 @@ postgresGetForeignPlan(PlannerInfo *root,
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name->data));
+ /*
+ * Prepare EXEC_PARAM for tuple identifier if this relation is the target
+ * relation of the current DELETE/UPDATE query.
+ */
+ if ((root->parse->commandType == CMD_DELETE ||
+ root->parse->commandType == CMD_UPDATE) &&
+ (scan_relid ?
+ !bms_is_empty(fpinfo->param_attrs) :
+ bms_is_member(root->parse->resultRelation, foreignrel->relids)))
+ {
+ int *paramids = palloc(sizeof(int) * 2);
+ Var *v;
+ Index target_relid = scan_relid;
+
+ if (target_relid == 0)
+ target_relid = root->parse->resultRelation;
+
+ if (list_length(fdw_private) == 3)
+ fdw_private = lappend(fdw_private, NULL);
+
+ v = makeNode(Var);
+ v->varno = target_relid;
+ v->vartype = OIDOID;
+ v->vartypmod = -1;
+ v->varcollid = InvalidOid;
+ v->varattno = TableOidAttributeNumber;
+ paramids[0] = assign_param_for_var(root, v);
+
+ v = makeNode(Var);
+ v->varno = target_relid;
+ v->vartype = TIDOID;
+ v->vartypmod = -1;
+ v->varcollid = InvalidOid;
+ v->varattno = SelfItemPointerAttributeNumber;
+ paramids[1] = assign_param_for_var(root, v);
+
+ fdw_private = lappend(fdw_private, paramids);
+ }
+
/*
* Create the ForeignScan node for the given relation.
*
@@ -1300,7 +1478,7 @@ postgresGetForeignPlan(PlannerInfo *root,
scan_relid,
params_list,
fdw_private,
- fdw_scan_tlist,
+ fdw_return_tlist,
fdw_recheck_quals,
outer_plan);
}
@@ -1368,6 +1546,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
FdwScanPrivateRetrievedAttrs);
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
FdwScanPrivateFetchSize));
+ if (list_length(fsplan->fdw_private) > FdwScanTupleIdParamIds)
+ fsstate->tid_params =
+ (int *) list_nth(fsplan->fdw_private, FdwScanTupleIdParamIds);
/* Create contexts for batches of tuples and per-tuple temp workspace. */
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1418,6 +1599,8 @@ postgresIterateForeignScan(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ EState *estate = node->ss.ps.state;
+ HeapTuple tup;
/*
* If this is the first call after Begin or ReScan, we need to create the
@@ -1439,10 +1622,30 @@ postgresIterateForeignScan(ForeignScanState *node)
return ExecClearTuple(slot);
}
+ tup = fsstate->tuples[fsstate->next_tuple++];
+
+ /* Store the remote table oid and ctid into exec parameter if requested */
+ if (fsstate->tid_params != NULL)
+ {
+ ParamExecData *prm;
+ ItemPointer itemp;
+
+ /* set toid */
+ prm = &(estate->es_param_exec_vals[fsstate->tid_params[0]]);
+ prm->value = ObjectIdGetDatum(tup->t_tableOid);
+ /* set ctid */
+ prm = &(estate->es_param_exec_vals[fsstate->tid_params[1]]);
+ itemp = (ItemPointer) palloc(sizeof(ItemPointerData));
+ ItemPointerSet(itemp,
+ ItemPointerGetBlockNumberNoCheck(&tup->t_self),
+ ItemPointerGetOffsetNumberNoCheck(&tup->t_self));
+ prm->value = PointerGetDatum(itemp);
+ }
+
/*
* Return the next tuple.
*/
- ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+ ExecStoreTuple(tup,
slot,
InvalidBuffer,
false);
@@ -1530,43 +1733,6 @@ postgresEndForeignScan(ForeignScanState *node)
/* MemoryContexts will be deleted automatically. */
}
-/*
- * postgresAddForeignUpdateTargets
- * Add resjunk column(s) needed for update/delete on a foreign table
- */
-static void
-postgresAddForeignUpdateTargets(Query *parsetree,
- RangeTblEntry *target_rte,
- Relation target_relation)
-{
- Var *var;
- const char *attrname;
- TargetEntry *tle;
-
- /*
- * In postgres_fdw, what we need is the ctid, same as for a regular table.
- */
-
- /* Make a Var representing the desired value */
- var = makeVar(parsetree->resultRelation,
- SelfItemPointerAttributeNumber,
- TIDOID,
- -1,
- InvalidOid,
- 0);
-
- /* Wrap it in a resjunk TLE with the right name ... */
- attrname = "ctid";
-
- tle = makeTargetEntry((Expr *) var,
- list_length(parsetree->targetList) + 1,
- pstrdup(attrname),
- true);
-
- /* ... and add it to the query's targetlist */
- parsetree->targetList = lappend(parsetree->targetList, tle);
-}
-
/*
* postgresPlanForeignModify
* Plan an insert/update/delete operation on a foreign table
@@ -1630,6 +1796,33 @@ postgresPlanForeignModify(PlannerInfo *root,
}
}
+ /*
+ * In the non-direct modify cases, the corresponding ForeignScan node must
+ * have stored remote tableoid and ctid as exec parameters
+ */
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ Var *v;
+ int *paramids = NULL;
+
+ paramids = palloc(sizeof(int) * 2);
+ v = makeNode(Var);
+ v->varno = resultRelation;
+ v->vartype = OIDOID;
+ v->vartypmod = -1;
+ v->varcollid = InvalidOid;
+ v->varattno = TableOidAttributeNumber;
+ paramids[0] = find_param_for_var(root, v);
+ if (paramids[0] < 0)
+ elog(ERROR, "Tupler ID parameter is not found");
+
+ v->vartype = TIDOID;
+ v->varattno = SelfItemPointerAttributeNumber;
+ paramids[1] = find_param_for_var(root, v);
+ if (paramids[1] < 0)
+ elog(ERROR, "Tupler ID parameter is not found");
+ }
+
/*
* Extract the relevant RETURNING list if any.
*/
@@ -1679,10 +1872,11 @@ postgresPlanForeignModify(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwModifyPrivateIndex, above.
*/
- return list_make4(makeString(sql.data),
+ return list_make5(makeString(sql.data),
targetAttrs,
makeInteger((retrieved_attrs != NIL)),
- retrieved_attrs);
+ retrieved_attrs,
+ paramids);
}
/*
@@ -1702,6 +1896,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
bool has_returning;
List *retrieved_attrs;
RangeTblEntry *rte;
+ int *tid_params;
/*
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
@@ -1719,6 +1914,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateHasReturning));
retrieved_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateRetrievedAttrs);
+ tid_params = (int *) list_nth(fdw_private, FdwModifyPrivateTidParams);
/* Find RTE. */
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
@@ -1733,7 +1929,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
query,
target_attrs,
has_returning,
- retrieved_attrs);
+ retrieved_attrs,
+ tid_params);
resultRelInfo->ri_FdwState = fmstate;
}
@@ -1758,7 +1955,7 @@ postgresExecForeignInsert(EState *estate,
prepare_foreign_modify(fmstate);
/* Convert parameters needed by prepared statement to text form */
- p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+ p_values = convert_prep_stmt_params(fmstate, InvalidOid, NULL, slot);
/*
* Execute the prepared statement.
@@ -1813,28 +2010,31 @@ postgresExecForeignUpdate(EState *estate,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
- Datum datum;
- bool isNull;
+ Datum toiddatum, ctiddatum;
const char **p_values;
PGresult *res;
int n_rows;
+ int *tid_params = fmstate->tid_params;
+ ParamExecData *prm;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
- /* Get the ctid that was passed up as a resjunk column */
- datum = ExecGetJunkAttribute(planSlot,
- fmstate->ctidAttno,
- &isNull);
- /* shouldn't ever get a null result... */
- if (isNull)
- elog(ERROR, "ctid is NULL");
+ Assert(tid_params);
+ /* Get the tableoid that was passed up as an exec param */
+ prm = &(estate->es_param_exec_vals[tid_params[0]]);
+ toiddatum = prm->value;
+
+ /* Get the ctid that was passed up as an exec param */
+ prm = &(estate->es_param_exec_vals[tid_params[1]]);
+ ctiddatum = prm->value;
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate,
- (ItemPointer) DatumGetPointer(datum),
- slot);
+ DatumGetObjectId(toiddatum),
+ (ItemPointer) DatumGetPointer(ctiddatum),
+ slot);
/*
* Execute the prepared statement.
@@ -1889,28 +2089,32 @@ postgresExecForeignDelete(EState *estate,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
- Datum datum;
- bool isNull;
+ Datum toiddatum, ctiddatum;
const char **p_values;
PGresult *res;
int n_rows;
+ int *tid_params = fmstate->tid_params;
+ ParamExecData *prm;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
+ Assert(tid_params);
+
+ /* Get the tableoid that was passed up as a exec param */
+ prm = &(estate->es_param_exec_vals[tid_params[0]]);
+ toiddatum = prm->value;
+
/* Get the ctid that was passed up as a resjunk column */
- datum = ExecGetJunkAttribute(planSlot,
- fmstate->ctidAttno,
- &isNull);
- /* shouldn't ever get a null result... */
- if (isNull)
- elog(ERROR, "ctid is NULL");
+ prm = &(estate->es_param_exec_vals[tid_params[1]]);
+ ctiddatum = prm->value;
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate,
- (ItemPointer) DatumGetPointer(datum),
- NULL);
+ DatumGetObjectId(toiddatum),
+ (ItemPointer) DatumGetPointer(ctiddatum),
+ NULL);
/*
* Execute the prepared statement.
@@ -2058,7 +2262,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
sql.data,
targetAttrs,
retrieved_attrs != NIL,
- retrieved_attrs);
+ retrieved_attrs,
+ NULL);
resultRelInfo->ri_FdwState = fmstate;
}
@@ -2561,8 +2766,13 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
*/
if (list_length(fdw_private) > FdwScanPrivateRelations)
{
- relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
- ExplainPropertyText("Relations", relations, es);
+ void *v = list_nth(fdw_private, FdwScanPrivateRelations);
+
+ if (v)
+ {
+ relations = strVal(v);
+ ExplainPropertyText("Relations", relations, es);
+ }
}
/*
@@ -2673,7 +2883,20 @@ estimate_path_cost_size(PlannerInfo *root,
/* Build the list of columns to be fetched from the foreign server. */
if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
+ {
fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+
+ /*
+ * If this foreign relation need to get remote tableoid and ctid,
+ * count them in costing.
+ */
+ if ((root->parse->commandType == CMD_UPDATE ||
+ root->parse->commandType == CMD_DELETE) &&
+ bms_is_member(root->parse->resultRelation, foreignrel->relids))
+ fdw_scan_tlist =
+ add_tidcols_to_tlist(fdw_scan_tlist,
+ root->parse->resultRelation);
+ }
else
fdw_scan_tlist = NIL;
@@ -3092,7 +3315,6 @@ create_cursor(ForeignScanState *node)
initStringInfo(&buf);
appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
fsstate->cursor_number, fsstate->query);
-
/*
* Notice that we pass NULL for paramTypes, thus forcing the remote server
* to infer types for all parameters. Since we explicitly cast every
@@ -3286,7 +3508,8 @@ create_foreign_modify(EState *estate,
char *query,
List *target_attrs,
bool has_returning,
- List *retrieved_attrs)
+ List *retrieved_attrs,
+ int *tid_params)
{
PgFdwModifyState *fmstate;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -3333,7 +3556,7 @@ create_foreign_modify(EState *estate,
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* Prepare for output conversion of parameters used in prepared stmt. */
- n_params = list_length(fmstate->target_attrs) + 1;
+ n_params = list_length(fmstate->target_attrs) + 2;
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
fmstate->p_nums = 0;
@@ -3341,13 +3564,14 @@ create_foreign_modify(EState *estate,
{
Assert(subplan != NULL);
- /* Find the ctid resjunk column in the subplan's result */
- fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
- "ctid");
- if (!AttributeNumberIsValid(fmstate->ctidAttno))
- elog(ERROR, "could not find junk ctid column");
+ fmstate->tid_params = tid_params;
- /* First transmittable parameter will be ctid */
+ /* First transmittable parameter will be table oid */
+ getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ fmstate->p_nums++;
+
+ /* Second transmittable parameter will be ctid */
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
fmstate->p_nums++;
@@ -3430,6 +3654,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
*/
static const char **
convert_prep_stmt_params(PgFdwModifyState *fmstate,
+ Oid tableoid,
ItemPointer tupleid,
TupleTableSlot *slot)
{
@@ -3441,10 +3666,13 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
- /* 1st parameter should be ctid, if it's in use */
- if (tupleid != NULL)
+ /* First two parameters should be tableoid and ctid, if it's in use */
+ if (tableoid != InvalidOid)
{
/* don't need set_transmission_modes for TID output */
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+ ObjectIdGetDatum(tableoid));
+ pindex++;
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
@@ -5549,6 +5777,7 @@ make_tuple_from_result_row(PGresult *res,
bool *nulls;
ItemPointer ctid = NULL;
Oid oid = InvalidOid;
+ Oid toid = InvalidOid;
ConversionLocation errpos;
ErrorContextCallback errcallback;
MemoryContext oldcontext;
@@ -5609,10 +5838,9 @@ make_tuple_from_result_row(PGresult *res,
* Note: we ignore system columns other than ctid and oid in result
*/
errpos.cur_attno = i;
- if (i > 0)
+ if (i > 0 && i <= tupdesc->natts)
{
/* ordinary column */
- Assert(i <= tupdesc->natts);
nulls[i - 1] = (valstr == NULL);
/* Apply the input function even to nulls, to support domains */
values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
@@ -5620,7 +5848,20 @@ make_tuple_from_result_row(PGresult *res,
attinmeta->attioparams[i - 1],
attinmeta->atttypmods[i - 1]);
}
- else if (i == SelfItemPointerAttributeNumber)
+ else if (i == TableOidAttributeNumber ||
+ i == tupdesc->natts + 1)
+ {
+ /* table oid */
+ if (valstr != NULL)
+ {
+ Datum datum;
+
+ datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
+ toid = DatumGetObjectId(datum);
+ }
+ }
+ else if (i == SelfItemPointerAttributeNumber ||
+ i == tupdesc->natts + 2)
{
/* ctid */
if (valstr != NULL)
@@ -5691,6 +5932,9 @@ make_tuple_from_result_row(PGresult *res,
if (OidIsValid(oid))
HeapTupleSetOid(tuple, oid);
+ if (OidIsValid(toid))
+ tuple->t_tableOid = toid;
+
/* Clean up */
MemoryContextReset(temp_context);
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a5d4011e8d..39e5581125 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -108,6 +108,8 @@ typedef struct PgFdwRelationInfo
* representing the relation.
*/
int relation_index;
+
+ Bitmapset *param_attrs; /* attrs required for modification */
} PgFdwRelationInfo;
/* in postgres_fdw.c */
В списке pgsql-hackers по дате отправления: