Re: Problem while updating a foreign table pointing to apartitioned table on foreign server

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: Problem while updating a foreign table pointing to apartitioned table on foreign server
Дата
Msg-id 20180605.191032.256535589.horiguchi.kyotaro@lab.ntt.co.jp
обсуждение исходный текст
Ответ на Re: Problem while updating a foreign table pointing to a partitionedtable on foreign server  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
Ответы Re: Problem while updating a foreign table pointing to a partitionedtable on foreign server  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
Список pgsql-hackers
Hello.

At Mon, 04 Jun 2018 20:58:28 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20180604.205828.208262556.horiguchi.kyotaro@lab.ntt.co.jp>
> It fails on some join-pushdown cases since it doesn't add tid
> columns to join tlist.  I suppose that build_tlist_to_deparse
> needs something but I'll consider further tomorrow.

I made it work with a few exceptions and bumped.  PARAM_EXEC
doesn't work at all in a case where Sort exists between
ForeignUpdate and ForeignScan.

=====
explain (verbose, costs off)
update bar set f2 = f2 + 100
from
  ( select f1 from foo union all select f1+3 from foo ) ss
where bar.f1 = ss.f1;
                                  QUERY PLAN
-----------------------------------------------------------------------------
 Update on public.bar
   Update on public.bar
   Foreign Update on public.bar2
     Remote SQL: UPDATE public.loct2 SET f2 = $3 WHERE tableoid = $1 AND ctid = $2
...
   ->  Merge Join
         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, (ROW(foo.f1))
         Merge Cond: (bar2.f1 = foo.f1)
         ->  Sort
               Output: bar2.f1, bar2.f2, bar2.f3, bar2.tableoid, bar2.ctid
               Sort Key: bar2.f1
               ->  Foreign Scan on public.bar2
                     Output: bar2.f1, bar2.f2, bar2.f3, bar2.tableoid, bar2.ctid
                     Remote SQL: SELECT f1, f2, f3, ctid, tableoid FROM public.loct2 FOR UPDATE
=====

Even if this worked fine, it cannot be back-patched.  We need an
extra storage moves together with tuples or prevent sorts or
something like from being inserted there.


At Fri, 1 Jun 2018 10:21:39 -0400, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote in
<CAFjFpRdraYcQnD4tKzNuP1uP6L-gnizi4HLU_UA=28Q2M4zoDA@mail.gmail.com>
> I am not suggesting to commit 0003 in my patch set, but just 0001 and
> 0002 which just raise an error when multiple rows get updated when
> only one row is expected to be updated.

So I agree to commit the two at least in order to prevent doing
wrong silently.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index d272719ff4..bff216f29d 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1049,9 +1049,16 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs,
          * can use NoLock here.
          */
         Relation    rel = heap_open(rte->relid, NoLock);
+        Bitmapset   *attrs = fpinfo->attrs_used;
+
+        if (root->parse->commandType != CMD_UPDATE &&
+            root->parse->commandType != CMD_DELETE)
+            attrs = bms_del_member(bms_copy(attrs),
+                                   TableOidAttributeNumber -
+                                   FirstLowInvalidHeapAttributeNumber);
 
         deparseTargetList(buf, rte, foreignrel->relid, rel, false,
-                          fpinfo->attrs_used, false, retrieved_attrs);
+                          attrs, false, retrieved_attrs);
         heap_close(rel, NoLock);
     }
 }
@@ -1107,11 +1114,17 @@ deparseTargetList(StringInfo buf,
                   bool qualify_col,
                   List **retrieved_attrs)
 {
+    static int    check_attrs[4];
+    static char *check_attr_names[] = {"ctid", "oid", "tableoid"};
     TupleDesc    tupdesc = RelationGetDescr(rel);
     bool        have_wholerow;
     bool        first;
     int            i;
 
+    check_attrs[0] = SelfItemPointerAttributeNumber;
+    check_attrs[1] = ObjectIdAttributeNumber;
+    check_attrs[2] = TableOidAttributeNumber;
+    check_attrs[3] = FirstLowInvalidHeapAttributeNumber;
     *retrieved_attrs = NIL;
 
     /* If there's a whole-row reference, we'll need all the columns. */
@@ -1143,13 +1156,16 @@ deparseTargetList(StringInfo buf,
         }
     }
 
-    /*
-     * Add ctid and oid if needed.  We currently don't support retrieving any
-     * other system columns.
-     */
-    if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-                      attrs_used))
+    for (i = 0 ; check_attrs[i] != FirstLowInvalidHeapAttributeNumber ; i++)
     {
+        int    attr = check_attrs[i];
+        char *attr_name = check_attr_names[i];
+
+        /* Add system columns if needed. */
+        if (!bms_is_member(attr - FirstLowInvalidHeapAttributeNumber,
+                           attrs_used))
+            continue;
+
         if (!first)
             appendStringInfoString(buf, ", ");
         else if (is_returning)
@@ -1158,26 +1174,9 @@ deparseTargetList(StringInfo buf,
 
         if (qualify_col)
             ADD_REL_QUALIFIER(buf, rtindex);
-        appendStringInfoString(buf, "ctid");
+        appendStringInfoString(buf, attr_name);
 
-        *retrieved_attrs = lappend_int(*retrieved_attrs,
-                                       SelfItemPointerAttributeNumber);
-    }
-    if (bms_is_member(ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-                      attrs_used))
-    {
-        if (!first)
-            appendStringInfoString(buf, ", ");
-        else if (is_returning)
-            appendStringInfoString(buf, " RETURNING ");
-        first = false;
-
-        if (qualify_col)
-            ADD_REL_QUALIFIER(buf, rtindex);
-        appendStringInfoString(buf, "oid");
-
-        *retrieved_attrs = lappend_int(*retrieved_attrs,
-                                       ObjectIdAttributeNumber);
+        *retrieved_attrs = lappend_int(*retrieved_attrs, attr);
     }
 
     /* Don't generate bad syntax if no undropped columns */
@@ -1725,7 +1724,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
     deparseRelation(buf, rel);
     appendStringInfoString(buf, " SET ");
 
-    pindex = 2;                    /* ctid is always the first param */
+    pindex = 3;            /* tableoid and ctid are always the first param */
     first = true;
     foreach(lc, targetAttrs)
     {
@@ -1739,7 +1738,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
         appendStringInfo(buf, " = $%d", pindex);
         pindex++;
     }
-    appendStringInfoString(buf, " WHERE ctid = $1");
+    appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
 
     deparseReturningList(buf, rte, rtindex, rel,
                          rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1855,7 +1854,7 @@ deparseDeleteSql(StringInfo buf, RangeTblEntry *rte,
 {
     appendStringInfoString(buf, "DELETE FROM ");
     deparseRelation(buf, rel);
-    appendStringInfoString(buf, " WHERE ctid = $1");
+    appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
 
     deparseReturningList(buf, rte, rtindex, rel,
                          rel->trigdesc && rel->trigdesc->trig_delete_after_row,
@@ -1951,8 +1950,13 @@ deparseReturningList(StringInfo buf, RangeTblEntry *rte,
          */
         pull_varattnos((Node *) returningList, rtindex,
                        &attrs_used);
+
+        attrs_used = bms_del_member(attrs_used,
+                                    TableOidAttributeNumber -
+                                    FirstLowInvalidHeapAttributeNumber);
     }
 
+
     if (attrs_used != NULL)
         deparseTargetList(buf, rte, rtindex, rel, true, attrs_used, false,
                           retrieved_attrs);
@@ -2066,6 +2070,12 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte,
             ADD_REL_QUALIFIER(buf, varno);
         appendStringInfoString(buf, "oid");
     }
+    else if (varattno == TableOidAttributeNumber)
+    {
+        if (qualify_col)
+            ADD_REL_QUALIFIER(buf, varno);
+        appendStringInfoString(buf, "tableoid");
+    }
     else if (varattno < 0)
     {
         /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 78b0f43ca8..e574d7f51b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -73,7 +73,10 @@ enum FdwScanPrivateIndex
      * String describing join i.e. names of relations being joined and types
      * of join, added when the scan is join
      */
-    FdwScanPrivateRelations
+    FdwScanPrivateRelations,
+
+    /* Integer list of ids of EXEC_PARAM */
+    FdwScanTupleIdParamIds
 };
 
 /*
@@ -95,7 +98,9 @@ enum FdwModifyPrivateIndex
     /* has-returning flag (as an integer Value node) */
     FdwModifyPrivateHasReturning,
     /* Integer list of attribute numbers retrieved by RETURNING */
-    FdwModifyPrivateRetrievedAttrs
+    FdwModifyPrivateRetrievedAttrs,
+    /* Integer list of paramid for tableoid and ctid of source tuple */
+    FdwModifyPrivateTidParams
 };
 
 /*
@@ -156,6 +161,8 @@ typedef struct PgFdwScanState
     MemoryContext temp_cxt;        /* context for per-tuple temporary data */
 
     int            fetch_size;        /* number of tuples per fetch */
+
+    int           *tid_params;        /* EXEC_PARAM id for tuple identifier */
 } PgFdwScanState;
 
 /*
@@ -177,7 +184,7 @@ typedef struct PgFdwModifyState
     List       *retrieved_attrs;    /* attr numbers retrieved by RETURNING */
 
     /* info about parameters for prepared statement */
-    AttrNumber    ctidAttno;        /* attnum of input resjunk ctid column */
+    int            *tid_params;    /* EXEC_PARAM ids for tuple identifier */
     int            p_nums;            /* number of parameters to transmit */
     FmgrInfo   *p_flinfo;        /* output conversion functions for them */
 
@@ -293,9 +300,6 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
-static void postgresAddForeignUpdateTargets(Query *parsetree,
-                                RangeTblEntry *target_rte,
-                                Relation target_relation);
 static List *postgresPlanForeignModify(PlannerInfo *root,
                           ModifyTable *plan,
                           Index resultRelation,
@@ -388,9 +392,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
                       char *query,
                       List *target_attrs,
                       bool has_returning,
-                      List *retrieved_attrs);
+                      List *retrieved_attrs,
+                      int *tid_params);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
+                         Oid tableoid,
                          ItemPointer tupleid,
                          TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
@@ -451,6 +457,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
                   const PgFdwRelationInfo *fpinfo_o,
                   const PgFdwRelationInfo *fpinfo_i);
 
+static List *add_tidcols_to_tlist(List *org, Index varno);
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -471,7 +478,6 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
     routine->EndForeignScan = postgresEndForeignScan;
 
     /* Functions for updating foreign tables */
-    routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
     routine->PlanForeignModify = postgresPlanForeignModify;
     routine->BeginForeignModify = postgresBeginForeignModify;
     routine->ExecForeignInsert = postgresExecForeignInsert;
@@ -595,6 +601,39 @@ postgresGetForeignRelSize(PlannerInfo *root,
                        &fpinfo->attrs_used);
     }
 
+    /*
+     * ctid and tableoid are required for target relation of UPDATE and
+     * DELETE. Join relations are handled elsewhere.
+     */
+    if (root->parse->resultRelation == baserel->relid &&
+        (root->parse->commandType == CMD_UPDATE ||
+         root->parse->commandType == CMD_DELETE))
+    {
+        Var *v;
+
+        v = makeVar(baserel->relid,
+                    TableOidAttributeNumber,
+                    OIDOID, -1, InvalidOid, 0);
+        add_new_column_to_pathtarget(baserel->reltarget, (Expr *) v);
+        v = makeVar(baserel->relid,
+                    SelfItemPointerAttributeNumber,
+                    TIDOID, -1, InvalidOid, 0);
+        add_new_column_to_pathtarget(baserel->reltarget, (Expr *) v);
+
+        fpinfo->param_attrs =
+            bms_add_member(fpinfo->param_attrs,
+                           SelfItemPointerAttributeNumber -
+                           FirstLowInvalidHeapAttributeNumber);
+
+        fpinfo->param_attrs =
+            bms_add_member(fpinfo->param_attrs,
+                           TableOidAttributeNumber -
+                           FirstLowInvalidHeapAttributeNumber);
+
+        fpinfo->attrs_used =
+            bms_add_members(fpinfo->attrs_used, fpinfo->param_attrs);
+    }
+
     /*
      * Compute the selectivity and cost of the local_conds, so we don't have
      * to do it over again for each path.  The best we can do for these
@@ -1116,6 +1155,94 @@ postgresGetForeignPaths(PlannerInfo *root,
     }
 }
 
+/* Find the id of a PARAM_EXEC matches to the given var */
+static int
+find_param_for_var(PlannerInfo *root, Var *var)
+{
+    ListCell   *ppl;
+    PlannerParamItem *pitem;
+    Index        levelsup;
+
+    /* Find the query level the Var belongs to */
+    for (levelsup = var->varlevelsup; levelsup > 0; levelsup--)
+        root = root->parent_root;
+
+    /* If there's already a matching PlannerParamItem there, just use it */
+    foreach(ppl, root->plan_params)
+    {
+        pitem = (PlannerParamItem *) lfirst(ppl);
+        if (IsA(pitem->item, Var))
+        {
+            Var           *pvar = (Var *) pitem->item;
+
+            /*
+             * This comparison must match _equalVar(), except for ignoring
+             * varlevelsup.  Note that _equalVar() ignores the location.
+             */
+            if (pvar->varno == var->varno &&
+                pvar->varattno == var->varattno &&
+                pvar->vartype == var->vartype &&
+                pvar->vartypmod == var->vartypmod &&
+                pvar->varcollid == var->varcollid &&
+                pvar->varnoold == var->varnoold &&
+                pvar->varoattno == var->varoattno)
+                return pitem->paramId;
+        }
+    }
+
+    return -1;
+}
+
+/*
+ * Select a PARAM_EXEC number to identify the given Var as a parameter for
+ * the current subquery, or for a nestloop's inner scan.
+ * If the Var already has a param in the current context, return that one.
+ * (copy of the function in subselect.c)
+ */
+static int
+assign_param_for_var(PlannerInfo *root, Var *var)
+{
+    int                    paramid;
+    PlannerParamItem   *pitem;
+
+    /* Return registered param if any */
+    paramid = find_param_for_var(root, var);
+    if (paramid >= 0)
+        return paramid;
+
+    /* Nope, so make a new one */
+    var = copyObject(var);
+    var->varlevelsup = 0;
+
+    pitem = makeNode(PlannerParamItem);
+    pitem->item = (Node *) var;
+    pitem->paramId = list_length(root->glob->paramExecTypes);
+    root->glob->paramExecTypes = lappend_oid(root->glob->paramExecTypes,
+                                             var->vartype);
+
+    root->plan_params = lappend(root->plan_params, pitem);
+
+    return pitem->paramId;
+}
+
+static List *
+add_tidcols_to_tlist(List *org, Index varno)
+{
+    List   *result = NIL;
+
+    result = list_copy(org);
+
+    result =
+        add_to_flat_tlist(result,
+                          list_make2(makeVar(varno, TableOidAttributeNumber,
+                                             OIDOID, -1, InvalidOid, 0),
+                                     makeVar(varno,
+                                             SelfItemPointerAttributeNumber,
+                                             TIDOID, -1, InvalidOid, 0)));
+
+    return result;
+}
+
 /*
  * postgresGetForeignPlan
  *        Create ForeignScan plan node which implements selected best path
@@ -1136,6 +1263,7 @@ postgresGetForeignPlan(PlannerInfo *root,
     List       *local_exprs = NIL;
     List       *params_list = NIL;
     List       *fdw_scan_tlist = NIL;
+    List       *fdw_return_tlist = NIL;
     List       *fdw_recheck_quals = NIL;
     List       *retrieved_attrs;
     StringInfoData sql;
@@ -1223,8 +1351,8 @@ postgresGetForeignPlan(PlannerInfo *root,
          * locally.
          */
 
-        /* Build the list of columns to be fetched from the foreign server. */
-        fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+        /* Build the list of columns to be returned to upper node. */
+        fdw_scan_tlist = fdw_return_tlist = build_tlist_to_deparse(foreignrel);
 
         /*
          * Ensure that the outer plan produces a tuple whose descriptor
@@ -1263,6 +1391,17 @@ postgresGetForeignPlan(PlannerInfo *root,
                                                       qual);
             }
         }
+
+        /*
+         * Remote query requires tuple identifers if this relation involves
+         * the target relation of UPDATE/DELETE commands.
+         */
+        if ((root->parse->commandType == CMD_UPDATE ||
+             root->parse->commandType == CMD_DELETE) &&
+            bms_is_member(root->parse->resultRelation, foreignrel->relids))
+            fdw_scan_tlist = 
+                add_tidcols_to_tlist(fdw_return_tlist,
+                                         root->parse->resultRelation);
     }
 
     /*
@@ -1288,6 +1427,45 @@ postgresGetForeignPlan(PlannerInfo *root,
         fdw_private = lappend(fdw_private,
                               makeString(fpinfo->relation_name->data));
 
+    /*
+     * Prepare EXEC_PARAM for tuple identifier if this relation is the target
+     * relation of the current DELETE/UPDATE query.
+     */
+    if ((root->parse->commandType == CMD_DELETE ||
+         root->parse->commandType == CMD_UPDATE) &&  
+        (scan_relid ?
+         !bms_is_empty(fpinfo->param_attrs) :
+         bms_is_member(root->parse->resultRelation, foreignrel->relids)))
+    {
+        int *paramids = palloc(sizeof(int) * 2);
+        Var    *v;
+        Index    target_relid = scan_relid;
+
+        if (target_relid == 0)
+            target_relid = root->parse->resultRelation;
+
+        if (list_length(fdw_private) == 3)
+            fdw_private = lappend(fdw_private, NULL);
+
+        v = makeNode(Var);
+        v->varno = target_relid;
+        v->vartype = OIDOID;
+        v->vartypmod = -1;
+        v->varcollid = InvalidOid;
+        v->varattno = TableOidAttributeNumber;
+        paramids[0] = assign_param_for_var(root, v);
+
+        v = makeNode(Var);
+        v->varno = target_relid;
+        v->vartype = TIDOID;
+        v->vartypmod = -1;
+        v->varcollid = InvalidOid;
+        v->varattno = SelfItemPointerAttributeNumber;
+        paramids[1] = assign_param_for_var(root, v);
+
+        fdw_private = lappend(fdw_private, paramids);
+    }
+
     /*
      * Create the ForeignScan node for the given relation.
      *
@@ -1300,7 +1478,7 @@ postgresGetForeignPlan(PlannerInfo *root,
                             scan_relid,
                             params_list,
                             fdw_private,
-                            fdw_scan_tlist,
+                            fdw_return_tlist,
                             fdw_recheck_quals,
                             outer_plan);
 }
@@ -1368,6 +1546,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
                                                  FdwScanPrivateRetrievedAttrs);
     fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
                                           FdwScanPrivateFetchSize));
+    if (list_length(fsplan->fdw_private) > FdwScanTupleIdParamIds)
+        fsstate->tid_params =
+            (int *) list_nth(fsplan->fdw_private, FdwScanTupleIdParamIds);
 
     /* Create contexts for batches of tuples and per-tuple temp workspace. */
     fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1418,6 +1599,8 @@ postgresIterateForeignScan(ForeignScanState *node)
 {
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+    EState *estate = node->ss.ps.state;
+    HeapTuple        tup;
 
     /*
      * If this is the first call after Begin or ReScan, we need to create the
@@ -1439,10 +1622,30 @@ postgresIterateForeignScan(ForeignScanState *node)
             return ExecClearTuple(slot);
     }
 
+    tup = fsstate->tuples[fsstate->next_tuple++];
+
+    /* Store the remote table oid and ctid into exec parameter if requested */
+    if (fsstate->tid_params != NULL)
+    {
+        ParamExecData *prm;
+        ItemPointer      itemp;
+
+        /* set toid */
+        prm = &(estate->es_param_exec_vals[fsstate->tid_params[0]]);
+        prm->value = ObjectIdGetDatum(tup->t_tableOid);
+        /* set ctid */
+        prm = &(estate->es_param_exec_vals[fsstate->tid_params[1]]);
+        itemp = (ItemPointer) palloc(sizeof(ItemPointerData));
+        ItemPointerSet(itemp,
+                       ItemPointerGetBlockNumberNoCheck(&tup->t_self),
+                       ItemPointerGetOffsetNumberNoCheck(&tup->t_self));
+        prm->value = PointerGetDatum(itemp);
+    }
+
     /*
      * Return the next tuple.
      */
-    ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+    ExecStoreTuple(tup,
                    slot,
                    InvalidBuffer,
                    false);
@@ -1530,43 +1733,6 @@ postgresEndForeignScan(ForeignScanState *node)
     /* MemoryContexts will be deleted automatically. */
 }
 
-/*
- * postgresAddForeignUpdateTargets
- *        Add resjunk column(s) needed for update/delete on a foreign table
- */
-static void
-postgresAddForeignUpdateTargets(Query *parsetree,
-                                RangeTblEntry *target_rte,
-                                Relation target_relation)
-{
-    Var           *var;
-    const char *attrname;
-    TargetEntry *tle;
-
-    /*
-     * In postgres_fdw, what we need is the ctid, same as for a regular table.
-     */
-
-    /* Make a Var representing the desired value */
-    var = makeVar(parsetree->resultRelation,
-                  SelfItemPointerAttributeNumber,
-                  TIDOID,
-                  -1,
-                  InvalidOid,
-                  0);
-
-    /* Wrap it in a resjunk TLE with the right name ... */
-    attrname = "ctid";
-
-    tle = makeTargetEntry((Expr *) var,
-                          list_length(parsetree->targetList) + 1,
-                          pstrdup(attrname),
-                          true);
-
-    /* ... and add it to the query's targetlist */
-    parsetree->targetList = lappend(parsetree->targetList, tle);
-}
-
 /*
  * postgresPlanForeignModify
  *        Plan an insert/update/delete operation on a foreign table
@@ -1630,6 +1796,33 @@ postgresPlanForeignModify(PlannerInfo *root,
         }
     }
 
+    /*
+     * In the non-direct modify cases, the corresponding ForeignScan node must
+     * have stored remote tableoid and ctid as exec parameters
+     */
+    if (operation == CMD_UPDATE || operation == CMD_DELETE)
+    {
+        Var    *v;
+        int *paramids = NULL;
+
+        paramids = palloc(sizeof(int) * 2);
+        v = makeNode(Var);
+        v->varno = resultRelation;
+        v->vartype = OIDOID;
+        v->vartypmod = -1;
+        v->varcollid = InvalidOid;
+        v->varattno = TableOidAttributeNumber;
+        paramids[0] = find_param_for_var(root, v);
+        if (paramids[0] < 0)
+            elog(ERROR, "Tupler ID parameter is not found");
+
+        v->vartype = TIDOID;
+        v->varattno = SelfItemPointerAttributeNumber;
+        paramids[1] = find_param_for_var(root, v);
+        if (paramids[1] < 0)
+            elog(ERROR, "Tupler ID parameter is not found");
+    }
+
     /*
      * Extract the relevant RETURNING list if any.
      */
@@ -1679,10 +1872,11 @@ postgresPlanForeignModify(PlannerInfo *root,
      * Build the fdw_private list that will be available to the executor.
      * Items in the list must match enum FdwModifyPrivateIndex, above.
      */
-    return list_make4(makeString(sql.data),
+    return list_make5(makeString(sql.data),
                       targetAttrs,
                       makeInteger((retrieved_attrs != NIL)),
-                      retrieved_attrs);
+                      retrieved_attrs,
+                      paramids);
 }
 
 /*
@@ -1702,6 +1896,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
     bool        has_returning;
     List       *retrieved_attrs;
     RangeTblEntry *rte;
+    int           *tid_params;
 
     /*
      * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
@@ -1719,6 +1914,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
                                     FdwModifyPrivateHasReturning));
     retrieved_attrs = (List *) list_nth(fdw_private,
                                         FdwModifyPrivateRetrievedAttrs);
+    tid_params = (int *) list_nth(fdw_private, FdwModifyPrivateTidParams);
 
     /* Find RTE. */
     rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
@@ -1733,7 +1929,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
                                     query,
                                     target_attrs,
                                     has_returning,
-                                    retrieved_attrs);
+                                    retrieved_attrs,
+                                    tid_params);
 
     resultRelInfo->ri_FdwState = fmstate;
 }
@@ -1758,7 +1955,7 @@ postgresExecForeignInsert(EState *estate,
         prepare_foreign_modify(fmstate);
 
     /* Convert parameters needed by prepared statement to text form */
-    p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+    p_values = convert_prep_stmt_params(fmstate, InvalidOid, NULL, slot);
 
     /*
      * Execute the prepared statement.
@@ -1813,28 +2010,31 @@ postgresExecForeignUpdate(EState *estate,
                           TupleTableSlot *planSlot)
 {
     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-    Datum        datum;
-    bool        isNull;
+    Datum        toiddatum, ctiddatum;
     const char **p_values;
     PGresult   *res;
     int            n_rows;
+    int        *tid_params = fmstate->tid_params;
+    ParamExecData *prm;
 
     /* Set up the prepared statement on the remote server, if we didn't yet */
     if (!fmstate->p_name)
         prepare_foreign_modify(fmstate);
 
-    /* Get the ctid that was passed up as a resjunk column */
-    datum = ExecGetJunkAttribute(planSlot,
-                                 fmstate->ctidAttno,
-                                 &isNull);
-    /* shouldn't ever get a null result... */
-    if (isNull)
-        elog(ERROR, "ctid is NULL");
+    Assert(tid_params);
+    /* Get the tableoid that was passed up as an exec param */
+    prm = &(estate->es_param_exec_vals[tid_params[0]]);
+    toiddatum = prm->value;
+
+    /* Get the ctid that was passed up as an exec param */
+    prm = &(estate->es_param_exec_vals[tid_params[1]]);
+    ctiddatum = prm->value;
 
     /* Convert parameters needed by prepared statement to text form */
     p_values = convert_prep_stmt_params(fmstate,
-                                        (ItemPointer) DatumGetPointer(datum),
-                                        slot);
+                                    DatumGetObjectId(toiddatum),
+                                    (ItemPointer) DatumGetPointer(ctiddatum),
+                                    slot);
 
     /*
      * Execute the prepared statement.
@@ -1889,28 +2089,32 @@ postgresExecForeignDelete(EState *estate,
                           TupleTableSlot *planSlot)
 {
     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-    Datum        datum;
-    bool        isNull;
+    Datum        toiddatum, ctiddatum;
     const char **p_values;
     PGresult   *res;
     int            n_rows;
+    int        *tid_params = fmstate->tid_params;
+    ParamExecData *prm;
 
     /* Set up the prepared statement on the remote server, if we didn't yet */
     if (!fmstate->p_name)
         prepare_foreign_modify(fmstate);
 
+    Assert(tid_params);
+
+    /* Get the tableoid that was passed up as a exec param */
+    prm = &(estate->es_param_exec_vals[tid_params[0]]);
+    toiddatum = prm->value;
+
     /* Get the ctid that was passed up as a resjunk column */
-    datum = ExecGetJunkAttribute(planSlot,
-                                 fmstate->ctidAttno,
-                                 &isNull);
-    /* shouldn't ever get a null result... */
-    if (isNull)
-        elog(ERROR, "ctid is NULL");
+    prm = &(estate->es_param_exec_vals[tid_params[1]]);
+    ctiddatum = prm->value;
 
     /* Convert parameters needed by prepared statement to text form */
     p_values = convert_prep_stmt_params(fmstate,
-                                        (ItemPointer) DatumGetPointer(datum),
-                                        NULL);
+                                    DatumGetObjectId(toiddatum),
+                                    (ItemPointer) DatumGetPointer(ctiddatum),
+                                    NULL);
 
     /*
      * Execute the prepared statement.
@@ -2058,7 +2262,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                     sql.data,
                                     targetAttrs,
                                     retrieved_attrs != NIL,
-                                    retrieved_attrs);
+                                    retrieved_attrs,
+                                    NULL);
 
     resultRelInfo->ri_FdwState = fmstate;
 }
@@ -2561,8 +2766,13 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
      */
     if (list_length(fdw_private) > FdwScanPrivateRelations)
     {
-        relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
-        ExplainPropertyText("Relations", relations, es);
+        void *v = list_nth(fdw_private, FdwScanPrivateRelations);
+
+        if (v)
+        {
+            relations = strVal(v);
+            ExplainPropertyText("Relations", relations, es);
+        }
     }
 
     /*
@@ -2673,7 +2883,20 @@ estimate_path_cost_size(PlannerInfo *root,

         /* Build the list of columns to be fetched from the foreign server. */
         if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
+        {
             fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+
+            /*
+             * If this foreign relation need to get remote tableoid and ctid,
+             * count them in costing.
+             */
+            if ((root->parse->commandType == CMD_UPDATE ||
+                 root->parse->commandType == CMD_DELETE) &&
+                bms_is_member(root->parse->resultRelation, foreignrel->relids))
+                fdw_scan_tlist = 
+                    add_tidcols_to_tlist(fdw_scan_tlist,
+                                             root->parse->resultRelation);
+        }
         else
             fdw_scan_tlist = NIL;
 
@@ -3092,7 +3315,6 @@ create_cursor(ForeignScanState *node)
     initStringInfo(&buf);
     appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
                      fsstate->cursor_number, fsstate->query);
-
     /*
      * Notice that we pass NULL for paramTypes, thus forcing the remote server
      * to infer types for all parameters.  Since we explicitly cast every
@@ -3286,7 +3508,8 @@ create_foreign_modify(EState *estate,
                       char *query,
                       List *target_attrs,
                       bool has_returning,
-                      List *retrieved_attrs)
+                      List *retrieved_attrs,
+                      int *tid_params)
 {
     PgFdwModifyState *fmstate;
     Relation    rel = resultRelInfo->ri_RelationDesc;
@@ -3333,7 +3556,7 @@ create_foreign_modify(EState *estate,
         fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
     /* Prepare for output conversion of parameters used in prepared stmt. */
-    n_params = list_length(fmstate->target_attrs) + 1;
+    n_params = list_length(fmstate->target_attrs) + 2;
     fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
     fmstate->p_nums = 0;
 
@@ -3341,13 +3564,14 @@ create_foreign_modify(EState *estate,
     {
         Assert(subplan != NULL);
 
-        /* Find the ctid resjunk column in the subplan's result */
-        fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
-                                                          "ctid");
-        if (!AttributeNumberIsValid(fmstate->ctidAttno))
-            elog(ERROR, "could not find junk ctid column");
+        fmstate->tid_params = tid_params;
 
-        /* First transmittable parameter will be ctid */
+        /* First transmittable parameter will be table oid */
+        getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+        fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+        fmstate->p_nums++;
+
+        /* Second transmittable parameter will be ctid */
         getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
         fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
         fmstate->p_nums++;
@@ -3430,6 +3654,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  */
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
+                         Oid tableoid,
                          ItemPointer tupleid,
                          TupleTableSlot *slot)
 {
@@ -3441,10 +3666,13 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 
     p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
 
-    /* 1st parameter should be ctid, if it's in use */
-    if (tupleid != NULL)
+    /* First two parameters should be tableoid and ctid, if it's in use */
+    if (tableoid != InvalidOid)
     {
         /* don't need set_transmission_modes for TID output */
+        p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+                                              ObjectIdGetDatum(tableoid));
+        pindex++;
         p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
                                               PointerGetDatum(tupleid));
         pindex++;
@@ -5549,6 +5777,7 @@ make_tuple_from_result_row(PGresult *res,
     bool       *nulls;
     ItemPointer ctid = NULL;
     Oid            oid = InvalidOid;
+    Oid            toid = InvalidOid;
     ConversionLocation errpos;
     ErrorContextCallback errcallback;
     MemoryContext oldcontext;
@@ -5609,10 +5838,9 @@ make_tuple_from_result_row(PGresult *res,
          * Note: we ignore system columns other than ctid and oid in result
          */
         errpos.cur_attno = i;
-        if (i > 0)
+        if (i > 0 && i <= tupdesc->natts)
         {
             /* ordinary column */
-            Assert(i <= tupdesc->natts);
             nulls[i - 1] = (valstr == NULL);
             /* Apply the input function even to nulls, to support domains */
             values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
@@ -5620,7 +5848,20 @@ make_tuple_from_result_row(PGresult *res,
                                               attinmeta->attioparams[i - 1],
                                               attinmeta->atttypmods[i - 1]);
         }
-        else if (i == SelfItemPointerAttributeNumber)
+        else if (i == TableOidAttributeNumber ||
+                 i == tupdesc->natts + 1)
+        {
+            /* table oid */
+            if (valstr != NULL)
+            {
+                Datum        datum;
+
+                datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
+                toid = DatumGetObjectId(datum);
+            }
+        }
+        else if (i == SelfItemPointerAttributeNumber ||
+                 i ==  tupdesc->natts + 2)
         {
             /* ctid */
             if (valstr != NULL)
@@ -5691,6 +5932,9 @@ make_tuple_from_result_row(PGresult *res,
     if (OidIsValid(oid))
         HeapTupleSetOid(tuple, oid);
 
+    if (OidIsValid(toid))
+        tuple->t_tableOid = toid;
+
     /* Clean up */
     MemoryContextReset(temp_context);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a5d4011e8d..39e5581125 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -108,6 +108,8 @@ typedef struct PgFdwRelationInfo
      * representing the relation.
      */
     int            relation_index;
+
+    Bitmapset  *param_attrs;            /* attrs required for modification */
 } PgFdwRelationInfo;
 
 /* in postgres_fdw.c */

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Alexander Korotkov
Дата:
Сообщение: Re: [HACKERS] Moving relation extension locks out of heavyweight lock manager
Следующее
От: Masahiko Sawada
Дата:
Сообщение: Re: [HACKERS] Transactions involving multiple postgres foreign servers