Re: Problem while updating a foreign table pointing to a partitioned table on foreign server

From: Kyotaro HORIGUCHI
Subject: Re: Problem while updating a foreign table pointing to a partitioned table on foreign server
Msg-id: 20180824.170607.131182908.horiguchi.kyotaro@lab.ntt.co.jp
In reply to: Re: Problem while updating a foreign table pointing to a partitioned table on foreign server (Etsuro Fujita <fujita.etsuro@lab.ntt.co.jp>)
List: pgsql-hackers

Sorry, I sent an older version, which is logically the same but contains
some whitespace problems. I am resending only 0003 in this mail.

At Fri, 24 Aug 2018 16:51:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
<20180824.165131.45788857.horiguchi.kyotaro@lab.ntt.co.jp>
> Hello.
> 
> At Tue, 21 Aug 2018 11:01:32 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
> <20180821.110132.261184472.horiguchi.kyotaro@lab.ntt.co.jp>
> > > You wrote:
> > > >    Several places seem to assume that fdw_scan_tlist may be
> > > >    used for a foreign scan on a simple relation, but I didn't
> > > >    find that this actually happens.
> > > 
> > > Yeah, currently, postgres_fdw and file_fdw don't use that list for
> > > simple foreign table scans, but it could be used to improve the
> > > efficiency for those scans, as explained in fdwhandler.sgml:
> ...
> > I'll give more consideration to using fdw_scan_tlist in the
> > documented way.
> 
> Done. postgres_fdw now generates a full fdw_scan_tlist (as
> documented) for foreign relations with junk columns, but a small
> change in core was needed. However, it is far less invasive than
> the previous version, and I believe that it doesn't harm any
> possibly existing use of fdw_scan_tlist on non-join rels.
> 
> The previous patch didn't show "tableoid" in the Output list (as
> "<added_junk>") of EXPLAIN output, but this one does so correctly by
> referring to rte->eref->colnames. I believe no other FDW has an
> expanded foreign relation even if it uses fdw_scan_tlist for
> ForeignScan on a base relation, so it won't harm them.
> 
> Since this uses fdw_scan_tlist, it is theoretically
> back-patchable to 9.6. This patch applies on top of the
> current master.
> 
> Please find the attached three files.
> 
> 0001-Add-test-for-postgres_fdw-foreign-parition-update.patch
> 
>  This should fail for unpatched postgres_fdw. (Just for demonstration)
> 
> 0002-Core-side-modification-for-PgFDW-foreign-update-fix.patch
> 
>  Core side change which allows fdw_scan_tlist to have extra
>  columns that are not defined in the base relation.
> 
> 0003-Fix-of-foreign-update-bug-of-PgFDW.patch
> 
>  Fix of postgres_fdw for this problem.
> 
> 0004-Regtest-change-for-PgFDW-foreign-update-fix.patch
> 
>  Regression test change separated for readability.
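
As a rough illustration of why ctid alone cannot identify a remote tuple when the remote table is partitioned, here is a minimal sketch. It is not part of the patches: RemoteRow, count_by_ctid and count_by_tableoid_ctid are hypothetical names, and the OIDs in the usage note are made up. It only models the fact that each partition has its own ctid space, so the first tuple of every partition can sit at ctid (0,1).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a remote tuple's identity (not a PostgreSQL type). */
typedef struct RemoteRow
{
    unsigned int tableoid;  /* OID of the remote partition holding the tuple */
    unsigned int block;     /* block number part of ctid */
    unsigned int offset;    /* offset number part of ctid */
} RemoteRow;

/* Number of tuples matched by "WHERE ctid = $1" (behaviour without the fix). */
static int
count_by_ctid(const RemoteRow *rows, size_t nrows,
              unsigned int block, unsigned int offset)
{
    int     hits = 0;
    size_t  i;

    assert(rows != NULL);
    for (i = 0; i < nrows; i++)
        if (rows[i].block == block && rows[i].offset == offset)
            hits++;
    return hits;
}

/* Number of tuples matched by "WHERE tableoid = $1 AND ctid = $2". */
static int
count_by_tableoid_ctid(const RemoteRow *rows, size_t nrows,
                       unsigned int tableoid,
                       unsigned int block, unsigned int offset)
{
    int     hits = 0;
    size_t  i;

    assert(rows != NULL);
    for (i = 0; i < nrows; i++)
        if (rows[i].tableoid == tableoid &&
            rows[i].block == block && rows[i].offset == offset)
            hits++;
    return hits;
}
```

With two partitions that each hold a tuple at ctid (0,1), say rows {16385, 0, 1} and {16388, 0, 1}, count_by_ctid() matches both tuples, while count_by_tableoid_ctid() with tableoid 16385 matches exactly one.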

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From b95571ac7cf15101bfa045354a82befe074ecc55 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 24 Aug 2018 16:17:24 +0900
Subject: [PATCH 3/4] Fix of foreign update bug of PgFDW

postgres_fdw misbehaves when updating foreign tables that point to a
remote partitioned table if direct modify is not used. This is because
postgres_fdw forgets that two different tuples with the same ctid may
arrive in that case. With this patch it uses the remote tableoid in
addition to ctid to identify a remote tuple.
---
 contrib/postgres_fdw/deparse.c      | 149 +++++++++++-------
 contrib/postgres_fdw/postgres_fdw.c | 291 +++++++++++++++++++++++++++++++-----
 2 files changed, 344 insertions(+), 96 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 6001f4d25e..c4cd6a7249 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1037,6 +1037,15 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs,
          */
         deparseExplicitTargetList(tlist, false, retrieved_attrs, context);
     }
+    else if (tlist != NIL)
+    {
+        /*
+         * The given tlist is the base relation's tlist expanded with junk
+         * columns.
+         */
+        context->params_list = NULL;
+        deparseExplicitTargetList(tlist, false, retrieved_attrs, context);
+    }
     else
     {
         /*
@@ -1088,6 +1097,42 @@ deparseFromExpr(List *quals, deparse_expr_cxt *context)
     }
 }
 
+/*
+ * Adds one element in target/returning list if it is in attrs_used.
+ *
+ * If deparsestr is given, just use it. Otherwise resolves the name using rte.
+ */
+static inline void
+deparseAddTargetListItem(StringInfo buf,
+                         List **retrieved_attrs, Bitmapset *attrs_used,
+                         Index rtindex, AttrNumber attnum,
+                         char *deparsestr, RangeTblEntry *rte,
+                         bool is_returning, bool qualify_col,
+                         bool have_wholerow, bool *first)
+{
+    if (!have_wholerow &&
+        !bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, attrs_used))
+        return;
+
+    if (!*first)
+        appendStringInfoString(buf, ", ");
+    else if (is_returning)
+        appendStringInfoString(buf, " RETURNING ");
+    *first = false;
+
+    if (deparsestr)
+    {
+        if (qualify_col)
+            ADD_REL_QUALIFIER(buf, rtindex);
+
+        appendStringInfoString(buf, deparsestr);
+    }
+    else
+        deparseColumnRef(buf, rtindex, attnum, rte, qualify_col);
+
+    *retrieved_attrs = lappend_int(*retrieved_attrs, attnum);
+}
+
 /*
  * Emit a target list that retrieves the columns specified in attrs_used.
  * This is used for both SELECT and RETURNING targetlists; the is_returning
@@ -1128,58 +1173,28 @@ deparseTargetList(StringInfo buf,
         if (attr->attisdropped)
             continue;
 
-        if (have_wholerow ||
-            bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
-                          attrs_used))
-        {
-            if (!first)
-                appendStringInfoString(buf, ", ");
-            else if (is_returning)
-                appendStringInfoString(buf, " RETURNING ");
-            first = false;
-
-            deparseColumnRef(buf, rtindex, i, rte, qualify_col);
-
-            *retrieved_attrs = lappend_int(*retrieved_attrs, i);
-        }
+        deparseAddTargetListItem(buf, retrieved_attrs, attrs_used,
+                                 rtindex, i, NULL, rte,
+                                 is_returning, qualify_col, have_wholerow,
+                                 &first);
     }
 
     /*
-     * Add ctid and oid if needed.  We currently don't support retrieving any
-     * other system columns.
+     * Add ctid, oid and tableoid if needed. The attribute name and number are
+     * assigned in postgresAddForeignUpdateTargets. We currently don't support
+     * retrieving any other system columns.
      */
-    if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-                      attrs_used))
-    {
-        if (!first)
-            appendStringInfoString(buf, ", ");
-        else if (is_returning)
-            appendStringInfoString(buf, " RETURNING ");
-        first = false;
+    deparseAddTargetListItem(buf, retrieved_attrs, attrs_used,
+                             rtindex, tupdesc->natts + 1, "tableoid",
+                             NULL, is_returning, qualify_col, false, &first);
 
-        if (qualify_col)
-            ADD_REL_QUALIFIER(buf, rtindex);
-        appendStringInfoString(buf, "ctid");
+    deparseAddTargetListItem(buf, retrieved_attrs, attrs_used,
+                             rtindex, SelfItemPointerAttributeNumber, "ctid",
+                             NULL, is_returning, qualify_col, false, &first);
 
-        *retrieved_attrs = lappend_int(*retrieved_attrs,
-                                       SelfItemPointerAttributeNumber);
-    }
-    if (bms_is_member(ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber,
-                      attrs_used))
-    {
-        if (!first)
-            appendStringInfoString(buf, ", ");
-        else if (is_returning)
-            appendStringInfoString(buf, " RETURNING ");
-        first = false;
-
-        if (qualify_col)
-            ADD_REL_QUALIFIER(buf, rtindex);
-        appendStringInfoString(buf, "oid");
-
-        *retrieved_attrs = lappend_int(*retrieved_attrs,
-                                       ObjectIdAttributeNumber);
-    }
+    deparseAddTargetListItem(buf, retrieved_attrs, attrs_used,
+                             rtindex, ObjectIdAttributeNumber, "oid",
+                             NULL, is_returning, qualify_col, false, &first);
 
     /* Don't generate bad syntax if no undropped columns */
     if (first && !is_returning)
@@ -1728,7 +1743,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
     deparseRelation(buf, rel);
     appendStringInfoString(buf, " SET ");
 
-    pindex = 2;                    /* ctid is always the first param */
+    pindex = 3;                    /* tableoid and ctid always precede */
     first = true;
     foreach(lc, targetAttrs)
     {
@@ -1742,7 +1757,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
         appendStringInfo(buf, " = $%d", pindex);
         pindex++;
     }
-    appendStringInfoString(buf, " WHERE ctid = $1");
+    appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
 
     deparseReturningList(buf, rte, rtindex, rel,
                          rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1858,7 +1873,7 @@ deparseDeleteSql(StringInfo buf, RangeTblEntry *rte,
 {
     appendStringInfoString(buf, "DELETE FROM ");
     deparseRelation(buf, rel);
-    appendStringInfoString(buf, " WHERE ctid = $1");
+    appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
 
     deparseReturningList(buf, rte, rtindex, rel,
                          rel->trigdesc && rel->trigdesc->trig_delete_after_row,
@@ -2160,9 +2175,11 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte,
     }
     else
     {
-        char       *colname = NULL;
+        char *colname = NULL;
         List       *options;
         ListCell   *lc;
+        Relation rel;
+        int natts;
 
         /* varno must not be any of OUTER_VAR, INNER_VAR and INDEX_VAR. */
         Assert(!IS_SPECIAL_VARNO(varno));
@@ -2171,16 +2188,34 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, RangeTblEntry *rte,
          * If it's a column of a foreign table, and it has the column_name FDW
          * option, use that value.
          */
-        options = GetForeignColumnOptions(rte->relid, varattno);
-        foreach(lc, options)
-        {
-            DefElem    *def = (DefElem *) lfirst(lc);
+        rel = heap_open(rte->relid, NoLock);
+        natts = RelationGetNumberOfAttributes(rel);
+        heap_close(rel, NoLock);
 
-            if (strcmp(def->defname, "column_name") == 0)
+        if (rte->relkind == RELKIND_FOREIGN_TABLE)
+        {
+            if (varattno > 0 && varattno <= natts)
             {
-                colname = defGetString(def);
-                break;
+                options = GetForeignColumnOptions(rte->relid, varattno);
+                foreach(lc, options)
+                {
+                    DefElem    *def = (DefElem *) lfirst(lc);
+
+                    if (strcmp(def->defname, "column_name") == 0)
+                    {
+                        colname = defGetString(def);
+                        break;
+                    }
+                }
             }
+            else if (varattno == natts + 1)
+            {
+                /* This should be an additional junk column */
+                colname = "tableoid";
+            }
+            else
+                elog(ERROR, "name resolution failed for attribute %d of relation %u",
+                     varattno, rte->relid);
         }
 
         /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 0803c30a48..babf5a49d4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -179,6 +179,7 @@ typedef struct PgFdwModifyState
 
     /* info about parameters for prepared statement */
     AttrNumber    ctidAttno;        /* attnum of input resjunk ctid column */
+    AttrNumber    toidAttno;        /* attnum of input resjunk tableoid column */
     int            p_nums;            /* number of parameters to transmit */
     FmgrInfo   *p_flinfo;        /* output conversion functions for them */
 
@@ -283,6 +284,12 @@ static void postgresGetForeignRelSize(PlannerInfo *root,
 static void postgresGetForeignPaths(PlannerInfo *root,
                         RelOptInfo *baserel,
                         Oid foreigntableid);
+static List *generate_scan_tlist_for_relation(PlannerInfo *root,
+                                              RelOptInfo *foreignrel,
+                                              Oid foreigntableoid,
+                                              PgFdwRelationInfo *fpinfo,
+                                              List *tlist,
+                                              List *recheck_quals);
 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
                        RelOptInfo *foreignrel,
                        Oid foreigntableid,
@@ -392,6 +399,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
                       List *retrieved_attrs);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
+                         Oid tableoid,
                          ItemPointer tupleid,
                          TupleTableSlot *slot);
 static void store_returning_result(PgFdwModifyState *fmstate,
@@ -1117,6 +1125,109 @@ postgresGetForeignPaths(PlannerInfo *root,
     }
 }
 
+/*
+ * generate_scan_tlist_for_relation :
+ *    Constructs fdw_scan_tlist from the following sources.
+ *
+ * We may have appended tableoid and ctid junk columns to the parse
+ * targetlist. In that case we need to give the planner an alternative scan
+ * tlist. This function returns a tlist consisting of the following
+ * attributes, in this order.
+ *
+ * 1. Relation attributes requested by user and needed for recheck
+ *        fpinfo->attrs_used, fdw_recheck_quals and given tlist.
+ * 2. Junk columns and others in root->processed_tlist which are not added by 1
+ *
+ * If no junk column exists, returns NIL.
+ */
+static List *
+generate_scan_tlist_for_relation(PlannerInfo *root,
+                                 RelOptInfo *foreignrel, Oid foreigntableoid,
+                                 PgFdwRelationInfo *fpinfo,
+                                 List *tlist, List *recheck_quals)
+{
+    Index        frelid = foreignrel->relid;
+    List       *fdw_scan_tlist = NIL;
+    Relation    frel;
+    int            base_nattrs;
+    ListCell   *lc;
+    Bitmapset *attrs = NULL;
+    int attnum;
+
+    /*
+     * RelOptInfo has an expanded number of attributes. Check it against the
+     * base relation's attribute count to determine whether an alternative
+     * scan target list is needed.
+     */
+    frel = heap_open(foreigntableoid, NoLock);
+    base_nattrs = RelationGetNumberOfAttributes(frel);
+    heap_close(frel, NoLock);
+
+    if (base_nattrs == foreignrel->max_attr)
+        return NIL;
+
+    /* We have junk columns. Construct alternative scan target list. */
+
+    /* collect needed relation attributes */
+    attrs = bms_copy(fpinfo->attrs_used);
+    pull_varattnos((Node *)recheck_quals, frelid, &attrs);
+    pull_varattnos((Node *)tlist, frelid, &attrs);
+
+    /* Add relation's attributes  */
+    while ((attnum = bms_first_member(attrs)) >= 0)
+    {
+        TargetEntry *tle;
+        Form_pg_attribute attr;
+        Var *var;
+        char *name = NULL;
+
+        attnum += FirstLowInvalidHeapAttributeNumber;
+        if (attnum < 1)
+            continue;
+        if (attnum > base_nattrs)
+            break;
+
+        attr = TupleDescAttr(frel->rd_att, attnum - 1);
+        if (attr->attisdropped)
+            var = (Var *) makeNullConst(INT4OID, -1, InvalidOid);
+        else
+        {
+            var = makeVar(frelid, attnum,
+                          attr->atttypid, attr->atttypmod,
+                          attr->attcollation, 0);
+            name = pstrdup(NameStr(attr->attname));
+        }
+
+        tle = makeTargetEntry((Expr *)var,
+                              list_length(fdw_scan_tlist) + 1,
+                              name,
+                              false);
+        fdw_scan_tlist = lappend(fdw_scan_tlist, tle);
+    }
+
+    /* Add junk attributes  */
+    foreach (lc, root->processed_tlist)
+    {
+        TargetEntry *tle = lfirst_node(TargetEntry, lc);
+        Var *var = (Var *) tle->expr;
+
+        /*
+         * We aren't interested in non Vars, vars of other rels and base
+         * attributes.
+         */
+        if (IsA(var, Var) && var->varno == frelid &&
+            (var->varattno > base_nattrs || var->varattno < 1))
+        {
+            Assert(tle->resjunk);
+            tle = copyObject(tle);
+            tle->resno = list_length(fdw_scan_tlist) + 1;
+            fdw_scan_tlist = lappend(fdw_scan_tlist, tle);
+        }
+    }
+
+    return fdw_scan_tlist;
+}
+
 /*
  * postgresGetForeignPlan
  *        Create ForeignScan plan node which implements selected best path
@@ -1140,10 +1251,11 @@ postgresGetForeignPlan(PlannerInfo *root,
     List       *fdw_recheck_quals = NIL;
     List       *retrieved_attrs;
     StringInfoData sql;
-    ListCell   *lc;
 
     if (IS_SIMPLE_REL(foreignrel))
     {
+        ListCell *lc;
+
         /*
          * For base relations, set scan_relid as the relid of the relation.
          */
@@ -1191,6 +1303,17 @@ postgresGetForeignPlan(PlannerInfo *root,
          * should recheck all the remote quals.
          */
         fdw_recheck_quals = remote_exprs;
+
+        /*
+         * We may have put tableoid and ctid as junk columns to the
+         * targetlist. Generate fdw_scan_tlist in the case.
+         */
+        fdw_scan_tlist = generate_scan_tlist_for_relation(root,
+                                                          foreignrel,
+                                                          foreigntableid,
+                                                          fpinfo,
+                                                          tlist,
+                                                          fdw_recheck_quals);
     }
     else
     {
@@ -1383,16 +1506,12 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
      * into local representation and error reporting during that process.
      */
     if (fsplan->scan.scanrelid > 0)
-    {
         fsstate->rel = node->ss.ss_currentRelation;
-        fsstate->tupdesc = RelationGetDescr(fsstate->rel);
-    }
     else
-    {
         fsstate->rel = NULL;
-        fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
-    }
 
+    /* Always use the tuple descriptor provided by core */
+    fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
     fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
 
     /*
@@ -1543,22 +1662,30 @@ postgresAddForeignUpdateTargets(Query *parsetree,
     Var           *var;
     const char *attrname;
     TargetEntry *tle;
+    int            varattno = RelationGetNumberOfAttributes(target_relation) + 1;
 
     /*
-     * In postgres_fdw, what we need is the ctid, same as for a regular table.
+     * In postgres_fdw, what we need is the tableoid and ctid, same as for a
+     * regular table.
      */
 
-    /* Make a Var representing the desired value */
+    /*
+     * The table OID needs to be retrieved as a non-system junk column in the
+     * returned tuple. We add it as a column after all regular columns.
+     */
+    attrname = "tableoid";
     var = makeVar(parsetree->resultRelation,
-                  SelfItemPointerAttributeNumber,
-                  TIDOID,
+                  varattno++,
+                  OIDOID,
                   -1,
                   InvalidOid,
                   0);
 
-    /* Wrap it in a resjunk TLE with the right name ... */
-    attrname = "ctid";
-
+    /*
+     * Wrap it in a resjunk TLE with a name accessible later by FDW. Doesn't
+     * seem that we explicitly free this tle but give pstrdup'ed string here
+     * just in case.
+     */
     tle = makeTargetEntry((Expr *) var,
                           list_length(parsetree->targetList) + 1,
                           pstrdup(attrname),
@@ -1566,6 +1693,29 @@ postgresAddForeignUpdateTargets(Query *parsetree,
 
     /* ... and add it to the query's targetlist */
     parsetree->targetList = lappend(parsetree->targetList, tle);
+
+    /* ... also needs to have colname entry */
+    target_rte->eref->colnames =
+        lappend(target_rte->eref->colnames, makeString(pstrdup(attrname)));
+
+
+    /* Do the same for ctid */
+    attrname = "ctid";
+    var = makeVar(parsetree->resultRelation,
+                  SelfItemPointerAttributeNumber,
+                  TIDOID,
+                  -1,
+                  InvalidOid,
+                  0);
+
+    tle = makeTargetEntry((Expr *) var,
+                          list_length(parsetree->targetList) + 1,
+                          pstrdup(attrname),
+                          true);
+
+    parsetree->targetList = lappend(parsetree->targetList, tle);
+    target_rte->eref->colnames =
+        lappend(target_rte->eref->colnames, makeString(pstrdup(attrname)));
 }
 
 /*
@@ -1769,7 +1919,7 @@ postgresExecForeignInsert(EState *estate,
         prepare_foreign_modify(fmstate);
 
     /* Convert parameters needed by prepared statement to text form */
-    p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+    p_values = convert_prep_stmt_params(fmstate, InvalidOid, NULL, slot);
 
     /*
      * Execute the prepared statement.
@@ -1824,7 +1974,7 @@ postgresExecForeignUpdate(EState *estate,
                           TupleTableSlot *planSlot)
 {
     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-    Datum        datum;
+    Datum        toiddatum, ctiddatum;
     bool        isNull;
     const char **p_values;
     PGresult   *res;
@@ -1835,17 +1985,26 @@ postgresExecForeignUpdate(EState *estate,
         prepare_foreign_modify(fmstate);
 
     /* Get the ctid that was passed up as a resjunk column */
-    datum = ExecGetJunkAttribute(planSlot,
-                                 fmstate->ctidAttno,
-                                 &isNull);
+    toiddatum = ExecGetJunkAttribute(planSlot,
+                                     fmstate->toidAttno,
+                                     &isNull);
+    /* shouldn't ever get a null result... */
+    if (isNull)
+        elog(ERROR, "tableoid is NULL");
+
+    /* Get the ctid that was passed up as a resjunk column */
+    ctiddatum = ExecGetJunkAttribute(planSlot,
+                                     fmstate->ctidAttno,
+                                     &isNull);
     /* shouldn't ever get a null result... */
     if (isNull)
         elog(ERROR, "ctid is NULL");
 
     /* Convert parameters needed by prepared statement to text form */
     p_values = convert_prep_stmt_params(fmstate,
-                                        (ItemPointer) DatumGetPointer(datum),
-                                        slot);
+                                    DatumGetObjectId(toiddatum),
+                                    (ItemPointer) DatumGetPointer(ctiddatum),
+                                    slot);
 
     /*
      * Execute the prepared statement.
@@ -1900,7 +2059,7 @@ postgresExecForeignDelete(EState *estate,
                           TupleTableSlot *planSlot)
 {
     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
-    Datum        datum;
+    Datum        ctiddatum, toiddatum;
     bool        isNull;
     const char **p_values;
     PGresult   *res;
@@ -1911,17 +2070,26 @@ postgresExecForeignDelete(EState *estate,
         prepare_foreign_modify(fmstate);
 
     /* Get the ctid that was passed up as a resjunk column */
-    datum = ExecGetJunkAttribute(planSlot,
-                                 fmstate->ctidAttno,
-                                 &isNull);
+    toiddatum = ExecGetJunkAttribute(planSlot,
+                                     fmstate->toidAttno,
+                                     &isNull);
+    /* shouldn't ever get a null result... */
+    if (isNull)
+        elog(ERROR, "tableoid is NULL");
+
+    /* Get the ctid that was passed up as a resjunk column */
+    ctiddatum = ExecGetJunkAttribute(planSlot,
+                                     fmstate->ctidAttno,
+                                     &isNull);
     /* shouldn't ever get a null result... */
     if (isNull)
         elog(ERROR, "ctid is NULL");
 
     /* Convert parameters needed by prepared statement to text form */
     p_values = convert_prep_stmt_params(fmstate,
-                                        (ItemPointer) DatumGetPointer(datum),
-                                        NULL);
+                                    DatumGetObjectId(toiddatum),
+                                    (ItemPointer) DatumGetPointer(ctiddatum),
+                                    NULL);
 
     /*
      * Execute the prepared statement.
@@ -2303,6 +2471,28 @@ postgresPlanDirectModify(PlannerInfo *root,
                                                    returningList);
     }
 
+    /*
+     * The junk columns in the targetlist are no longer needed for FDW direct
+     * modify. Strip them so that the planner doesn't bother with them.
+     */
+    if (fscan->scan.scanrelid > 0 && fscan->fdw_scan_tlist != NIL)
+    {
+        List *newtlist = NIL;
+        ListCell *lc;
+
+        fscan->fdw_scan_tlist = NIL;
+        foreach (lc, subplan->targetlist)
+        {
+            TargetEntry *tle = lfirst_node(TargetEntry, lc);
+
+            /* once found junk, all the rest are also junk */
+            if (tle->resjunk)
+                continue;
+            newtlist = lappend(newtlist, tle);
+        }
+        subplan->targetlist = newtlist;
+    }
+
     /*
      * Construct the SQL command string.
      */
@@ -2349,7 +2539,7 @@ postgresPlanDirectModify(PlannerInfo *root,
     /*
      * Update the foreign-join-related fields.
      */
-    if (fscan->scan.scanrelid == 0)
+    if (fscan->fdw_scan_tlist != NIL || fscan->scan.scanrelid == 0)
     {
         /* No need for the outer subplan. */
         fscan->scan.plan.lefttree = NULL;
@@ -3345,7 +3535,7 @@ create_foreign_modify(EState *estate,
         fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
     /* Prepare for output conversion of parameters used in prepared stmt. */
-    n_params = list_length(fmstate->target_attrs) + 1;
+    n_params = list_length(fmstate->target_attrs) + 2;
     fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
     fmstate->p_nums = 0;
 
@@ -3353,13 +3543,24 @@ create_foreign_modify(EState *estate,
     {
         Assert(subplan != NULL);
 
+        /* Find the remote tableoid resjunk column in the subplan's result */
+        fmstate->toidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+                                                          "tableoid");
+        if (!AttributeNumberIsValid(fmstate->toidAttno))
+            elog(ERROR, "could not find junk tableoid column");
+
+        /* First transmittable parameter will be table oid */
+        getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+        fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+        fmstate->p_nums++;
+
         /* Find the ctid resjunk column in the subplan's result */
         fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
                                                           "ctid");
         if (!AttributeNumberIsValid(fmstate->ctidAttno))
             elog(ERROR, "could not find junk ctid column");
 
-        /* First transmittable parameter will be ctid */
+        /* Second transmittable parameter will be ctid */
         getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
         fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
         fmstate->p_nums++;
@@ -3442,6 +3643,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  */
 static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
+                         Oid tableoid,
                          ItemPointer tupleid,
                          TupleTableSlot *slot)
 {
@@ -3453,10 +3655,15 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 
     p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
 
-    /* 1st parameter should be ctid, if it's in use */
-    if (tupleid != NULL)
+    /* First two parameters should be tableoid and ctid, if they're in use */
+    if (tableoid != InvalidOid)
     {
+        Assert (tupleid != NULL);
+
         /* don't need set_transmission_modes for TID output */
+        p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+                                              ObjectIdGetDatum(tableoid));
+        pindex++;
         p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
                                               PointerGetDatum(tupleid));
         pindex++;
@@ -3685,8 +3892,8 @@ rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
         new_tlist = lappend(new_tlist,
                             makeTargetEntry(tle->expr,
                                             list_length(new_tlist) + 1,
-                                            NULL,
-                                            false));
+                                            tle->resname,
+                                            tle->resjunk));
     }
     fscan->fdw_scan_tlist = new_tlist;
 }
@@ -5576,12 +5783,18 @@ make_tuple_from_result_row(PGresult *res,
      */
     oldcontext = MemoryContextSwitchTo(temp_context);
 
-    if (rel)
-        tupdesc = RelationGetDescr(rel);
+    /*
+     * If fdw_scan_tlist is provided for base relation, use the tuple
+     * descriptor given from planner.
+     */
+    if (!rel ||
+        (fsstate &&
+         castNode(ForeignScan, fsstate->ss.ps.plan)->fdw_scan_tlist != NULL))
+        tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
     else
     {
-        Assert(fsstate);
-        tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
+        Assert(rel);
+        tupdesc = RelationGetDescr(rel);
     }
 
     values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
@@ -5623,7 +5836,7 @@ make_tuple_from_result_row(PGresult *res,
         errpos.cur_attno = i;
         if (i > 0)
         {
-            /* ordinary column */
+            /* ordinary column and tableoid */
             Assert(i <= tupdesc->natts);
             nulls[i - 1] = (valstr == NULL);
             /* Apply the input function even to nulls, to support domains */
-- 
2.16.3
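
For readers following the deparse.c hunks, the resulting parameter numbering of the remote UPDATE can be sketched as below. This is a simplified stand-in, not the patch's code: build_remote_update is a hypothetical helper that uses snprintf instead of StringInfo and takes plain column names. It assumes only what the diff shows, namely that $1 is tableoid, $2 is ctid, and SET columns start at $3.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not postgres_fdw code): writes an UPDATE statement
 * whose first two parameters are tableoid and ctid into buf.
 * Returns the total number of parameters, or -1 if buf is too small.
 */
static int
build_remote_update(char *buf, size_t buflen, const char *relname,
                    const char *const *cols, int ncols)
{
    int     pindex = 3;     /* tableoid ($1) and ctid ($2) always precede */
    size_t  used;
    int     n;
    int     i;

    assert(buf != NULL && relname != NULL);

    n = snprintf(buf, buflen, "UPDATE %s SET ", relname);
    if (n < 0 || (size_t) n >= buflen)
        return -1;
    used = (size_t) n;

    for (i = 0; i < ncols; i++)
    {
        /* Separate assignments with ", " and number them from $3 upward. */
        n = snprintf(buf + used, buflen - used, "%s%s = $%d",
                     i > 0 ? ", " : "", cols[i], pindex);
        if (n < 0 || (size_t) n >= buflen - used)
            return -1;
        used += (size_t) n;
        pindex++;
    }

    n = snprintf(buf + used, buflen - used,
                 " WHERE tableoid = $1 AND ctid = $2");
    if (n < 0 || (size_t) n >= buflen - used)
        return -1;

    return pindex - 1;
}
```

For columns a and b the helper returns 4 parameters, which corresponds to the change of n_params to list_length(fmstate->target_attrs) + 2 in create_foreign_modify.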

