[PATCH 1/4] Add "COPY ... TO FUNCTION ..." support

Поиск
Список
Период
Сортировка
От Daniel Farina
Тема [PATCH 1/4] Add "COPY ... TO FUNCTION ..." support
Дата
Msg-id 1259012082-6196-2-git-send-email-dfarina@truviso.com
обсуждение исходный текст
Ответ на [PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..."  (Daniel Farina <dfarina@truviso.com>)
Список pgsql-hackers
This relatively small change enables all sorts of new and shiny evil by
allowing specification of a function to COPY that accepts each
produced row's content in turn.  The function must accept a single
INTERNAL-type argument, which is actually of the type StringInfo.

Patch highlights:
 - Grammar production changes to enable "TO FUNCTION <qualified name>"
 - A new enumeration value in CopyDest to indicate this new mode, called COPY_FN.
 - CopyStateData's "filename" field has been renamed "destination"
   and is now a Node type.  Before it was either a string or NULL;
   now it may be a RangeVar, a string, or NULL.  Some code now has to
   go through some minor strVal() unboxing for the regular TO '/file'
   behavior.
 
 - Due to the relatively restricted way this function can be called
   it was possible to reduce per-row overhead by computing the
   FunctionCallInfo once and then reusing it, as opposed to simply
   using one of the convenience functions in the fmgr.
 
 - Add and expose the makeNameListFromRangeVar procedure to
   src/backend/catalog/namespace.c, the inverse of
   makeRangeVarFromNameList.

Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
 src/backend/catalog/namespace.c |   21 +++++
 src/backend/commands/copy.c     |  190 +++++++++++++++++++++++++++++++++-----
 src/backend/executor/spi.c      |    2 +-
 src/backend/nodes/copyfuncs.c   |    2 +-
 src/backend/nodes/equalfuncs.c  |    2 +-
 src/backend/parser/gram.y       |   30 ++++--
 src/include/catalog/namespace.h |    1 +
 src/include/nodes/parsenodes.h  |    3 +-
 8 files changed, 212 insertions(+), 39 deletions(-)
 

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 99c9140..8911e29 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -2467,6 +2467,27 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)}/*
+ * makeNameListFromRangeVar
+ *        Utility routine to convert a RangeVar into qualified-name list form.
+ */
+List *
+makeNameListFromRangeVar(RangeVar *rangevar)
+{
+    List *names = NIL;
+    /* relname is required; it is added first and so ends up last in the list. */
+    Assert(rangevar->relname != NULL);
+    names = lcons(makeString(rangevar->relname), names);
+    /* An optional schema qualifier is prepended in front of the relation name. */
+    if (rangevar->schemaname != NULL)
+        names = lcons(makeString(rangevar->schemaname), names);
+    /* An optional catalog qualifier is prepended in front of everything else. */
+    if (rangevar->catalogname != NULL)
+        names = lcons(makeString(rangevar->catalogname), names);
+    /* Resulting order: [catalog,] [schema,] relation, each as a String node. */
+    return names;
+}
+
+/* * makeRangeVarFromNameList *        Utility routine to convert a qualified-name list into RangeVar form. */
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5e95fd7..985505a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@#include "mb/pg_wchar.h"#include "miscadmin.h"#include "optimizer/planner.h"
+#include "parser/parse_func.h"#include "parser/parse_relation.h"#include "rewrite/rewriteHandler.h"#include
"storage/fd.h"
@@ -55,7 +56,8 @@ typedef enum CopyDest{    COPY_FILE,                    /* to/from file */    COPY_OLD_FE,
   /* to/from frontend (2.0 protocol) */
 
-    COPY_NEW_FE                    /* to/from frontend (3.0 protocol) */
+    COPY_NEW_FE,                /* to/from frontend (3.0 protocol) */
+    COPY_FN                        /* to function */} CopyDest;/*
@@ -104,7 +106,8 @@ typedef struct CopyStateData    Relation    rel;            /* relation to copy to or from */
QueryDesc *queryDesc;        /* executable query to copy from */    List       *attnumlist;        /* integer list of
attnumsto copy */
 
-    char       *filename;        /* filename, or NULL for STDIN/STDOUT */
+    Node       *destination;    /* filename, or NULL for STDIN/STDOUT, or a
+                                 * function */    bool        binary;            /* binary format? */    bool
oids;           /* include OIDs? */    bool        csv_mode;        /* Comma Separated Value format? */
 
@@ -131,6 +134,13 @@ typedef struct CopyStateData    MemoryContext rowcontext;    /* per-row evaluation context */
/*
+     * For writing rows out to a function. Used if copy_dest == COPY_FN
+     *
+     * Avoids repeated use of DirectFunctionCall for efficiency.
+     */
+    FunctionCallInfoData    output_fcinfo;
+
+    /*     * These variables are used to reduce overhead in textual COPY FROM.     *     * attribute_buf holds the
separated,de-escaped text for each field of
 
@@ -425,9 +435,11 @@ CopySendEndOfRow(CopyState cstate){    StringInfo    fe_msgbuf = cstate->fe_msgbuf;
+    /* Take care adding row delimiters*/    switch (cstate->copy_dest)    {        case COPY_FILE:
+        case COPY_FN:            if (!cstate->binary)            {                /* Default line termination depends
onplatform */
 
@@ -437,6 +449,18 @@ CopySendEndOfRow(CopyState cstate)                CopySendString(cstate, "\r\n");#endif
}
+            break;
+        case COPY_NEW_FE:
+        case COPY_OLD_FE:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            if (!cstate->binary)
+                CopySendChar(cstate, '\n');
+            break;
+    }
+
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:            (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,                          1,
cstate->copy_file);
@@ -446,10 +470,6 @@ CopySendEndOfRow(CopyState cstate)                         errmsg("could not write to COPY file:
%m")));           break;        case COPY_OLD_FE:
 
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->binary)
-                CopySendChar(cstate, '\n');
-            if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))            {                /* no hope of recovering
connectionsync, so FATAL */
 
@@ -459,13 +479,19 @@ CopySendEndOfRow(CopyState cstate)            }            break;        case COPY_NEW_FE:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->binary)
-                CopySendChar(cstate, '\n');
-            /* Dump the accumulated row as one CopyData message */            (void) pq_putmessage('d',
fe_msgbuf->data,fe_msgbuf->len);            break;
 
+        case COPY_FN:
+            FunctionCallInvoke(&cstate->output_fcinfo);
+
+            /*
+             * These are set earlier and are not supposed to change row to row.
+             */
+            Assert(cstate->output_fcinfo.arg[0] ==
+                   PointerGetDatum(cstate->fe_msgbuf));
+            Assert(!cstate->output_fcinfo.argnull[0]);
+            break;    }    resetStringInfo(fe_msgbuf);
@@ -577,6 +603,12 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)                bytesread +=
avail;           }            break;
 
+        case COPY_FN:
+            /*
+             * Should be disallowed by some prior step
+             */
+            Assert(false);
+            break;    }    return bytesread;
@@ -719,7 +751,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString){    CopyState    cstate;    bool
is_from= stmt->is_from;
 
-    bool        pipe = (stmt->filename == NULL);
+    bool        pipe = (stmt->destination == NULL);    List       *attnamelist = stmt->attlist;    List
*force_quote= NIL;    List       *force_notnull = NIL;
 
@@ -986,6 +1018,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)                 errhint("Anyone can COPY to
stdoutor from stdin. "                         "psql's \\copy command also works for anyone.")));
 
+    /* Disallow COPY ... FROM FUNCTION (only TO FUNCTION supported) */
+    if (is_from && cstate->destination != NULL &&
+        IsA(cstate->destination, RangeVar))
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("COPY FROM does not support functions as sources")));
+
+    if (stmt->relation)    {        Assert(!stmt->query);
@@ -1183,7 +1223,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)    cstate->encoding_embeds_ascii =
PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);   cstate->copy_dest = COPY_FILE;        /* default */
 
-    cstate->filename = stmt->filename;
+    cstate->destination = stmt->destination;    if (is_from)        CopyFrom(cstate);        /* copy from file to
database*/
 
@@ -1225,7 +1265,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)static voidDoCopyTo(CopyState cstate){
-    bool        pipe = (cstate->filename == NULL);
+    bool        pipe = (cstate->destination == NULL);    if (cstate->rel)    {
@@ -1257,37 +1297,128 @@ DoCopyTo(CopyState cstate)        else            cstate->copy_file = stdout;    }
-    else
+    else if (IsA(cstate->destination, String))    {        mode_t        oumask;        /* Pre-existing umask value */
      struct stat st;
 
+        char       *dest_filename = strVal(cstate->destination);        /*         * Prevent write to relative path
...too easy to shoot oneself in the         * foot by overwriting a database file ...         */
 
-        if (!is_absolute_path(cstate->filename))
+        if (!is_absolute_path(dest_filename))            ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),                    errmsg("relative path not allowed for COPY to file")));
oumask= umask((mode_t) 022);
 
-        cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+        cstate->copy_file = AllocateFile(dest_filename, PG_BINARY_W);        umask(oumask);        if
(cstate->copy_file== NULL)            ereport(ERROR,                    (errcode_for_file_access(),
errmsg("couldnot open file \"%s\" for writing: %m",
 
-                            cstate->filename)));
+                            dest_filename)));        fstat(fileno(cstate->copy_file), &st);        if
(S_ISDIR(st.st_mode))           ereport(ERROR,                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
-                     errmsg("\"%s\" is a directory", cstate->filename)));
+                     errmsg("\"%s\" is a directory", dest_filename)));    }
+    /* Branch taken in the "COPY ... TO FUNCTION funcname" situation */
+    else if (IsA(cstate->destination, RangeVar))
+    {
+        List            *names;
+        FmgrInfo        *flinfo;
+        FuncDetailCode     fdresult;
+        Oid                 funcid;
+        Oid                 rettype;
+        bool             retset;
+        int                 nvargs;
+        Oid                *true_typeids;
+        const int         nargs        = 1;
+        Oid                 argtypes[]    = { INTERNALOID };
+
+        /* Flip copy-action dispatch flag */
+        cstate->copy_dest = COPY_FN;
+
+        /* Make an fcinfo that can be reused and is stored on the cstate. */
+        names = makeNameListFromRangeVar((RangeVar *) cstate->destination);
+        flinfo  = palloc0(sizeof *flinfo);
+
+
+        fdresult = func_get_detail(names, NIL, NIL, nargs, argtypes, false,
+                                   false,
+
+                                   /* Begin out-arguments */
+                                   &funcid, &rettype, &retset, &nvargs,
+                                   &true_typeids, NULL);
+
+        /*
+         * Check to ensure that this is a "normal" function when looked up,
+         * otherwise error.
+         */
+        switch (fdresult)
+        {
+            /* Normal function found; do nothing */
+            case FUNCDETAIL_NORMAL:
+                break;
+
+            case FUNCDETAIL_NOTFOUND:
+                ereport(ERROR,
+                        (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                         errmsg("function %s does not exist",
+                                func_signature_string(names, nargs, NIL,
+                                                      argtypes))));
+                break;
+
+            case FUNCDETAIL_AGGREGATE:
+                ereport(ERROR,
+                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                         errmsg("function %s must not be an aggregate",
+                                func_signature_string(names, nargs, NIL,
+                                                      argtypes))));
+                break;
+
+            case FUNCDETAIL_WINDOWFUNC:
+                ereport(ERROR,
+                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                         errmsg("function %s must not be a window function",
+                                func_signature_string(names, nargs, NIL,
+                                                      argtypes))));
+                break;
+
+            case FUNCDETAIL_COERCION:
+                /*
+                 * Should never be yielded from func_get_detail if it is passed
+                 * fargs == NIL, as it is previously.
+                 */
+                Assert(false);
+                break;
+
+            case FUNCDETAIL_MULTIPLE:
+                /*
+                 * Only support one signature, thus overloading of a name with
+                 * different types should never occur.
+                 */
+                Assert(false);
+                break;
+
+        }
+
+        fmgr_info(funcid, flinfo);
+        InitFunctionCallInfoData(cstate->output_fcinfo, flinfo,
+                                 1, NULL, NULL);
+    }
+    else
+        /* Unexpected type was found for cstate->destination. */
+        Assert(false);
+
+    PG_TRY();    {        if (cstate->fe_copy)
@@ -1310,13 +1441,13 @@ DoCopyTo(CopyState cstate)    }    PG_END_TRY();
-    if (!pipe)
+    if (!pipe && cstate->copy_dest != COPY_FN)    {        if (FreeFile(cstate->copy_file))            ereport(ERROR,
                 (errcode_for_file_access(),                     errmsg("could not write to file \"%s\": %m",
 
-                            cstate->filename)));
+                            strVal(cstate->destination))));    }}
@@ -1342,6 +1473,13 @@ CopyTo(CopyState cstate)    /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
 
+    /*
+     * fe_msgbuf is never rebound, so there is only a need to set up the
+     * output_fcinfo once.
+     */
+    cstate->output_fcinfo.arg[0] = PointerGetDatum(cstate->fe_msgbuf);
+    cstate->output_fcinfo.argnull[0] = false;
+    /* Get info about the columns we need to process. */    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs
*sizeof(FmgrInfo));    foreach(cur, cstate->attnumlist)
 
@@ -1668,7 +1806,7 @@ limit_printout_length(const char *str)static voidCopyFrom(CopyState cstate){
-    bool        pipe = (cstate->filename == NULL);
+    bool        pipe = (cstate->destination == NULL);    HeapTuple    tuple;    TupleDesc    tupDesc;
Form_pg_attribute*attr;
 
@@ -1768,19 +1906,21 @@ CopyFrom(CopyState cstate)    {        struct stat st;
-        cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+        cstate->copy_file = AllocateFile(strVal(cstate->destination),
+                                         PG_BINARY_R);        if (cstate->copy_file == NULL)            ereport(ERROR,
                  (errcode_for_file_access(),                     errmsg("could not open file \"%s\" for reading: %m",
 
-                            cstate->filename)));
+                            strVal(cstate->destination))));        fstat(fileno(cstate->copy_file), &st);        if
(S_ISDIR(st.st_mode))           ereport(ERROR,                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
 
-                     errmsg("\"%s\" is a directory", cstate->filename)));
+                     errmsg("\"%s\" is a directory",
+                            strVal(cstate->destination))));    }    tupDesc = RelationGetDescr(cstate->rel);
@@ -2215,7 +2355,7 @@ CopyFrom(CopyState cstate)            ereport(ERROR,
(errcode_for_file_access(),                    errmsg("could not read from file \"%s\": %m",
 
-                            cstate->filename)));
+                            strVal(cstate->destination))));    }    /*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index f045f9c..0914dc9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1829,7 +1829,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,                {
CopyStmt  *cstmt = (CopyStmt *) stmt;
 
-                    if (cstmt->filename == NULL)
+                    if (cstmt->destination == NULL)                    {                        my_res =
SPI_ERROR_COPY;                       goto fail;
 
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 8bc72d1..9b39abe 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2485,7 +2485,7 @@ _copyCopyStmt(CopyStmt *from)    COPY_NODE_FIELD(query);    COPY_NODE_FIELD(attlist);
COPY_SCALAR_FIELD(is_from);
-    COPY_STRING_FIELD(filename);
+    COPY_NODE_FIELD(destination);    COPY_NODE_FIELD(options);    return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3d65d8b..6ddf226 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1085,7 +1085,7 @@ _equalCopyStmt(CopyStmt *a, CopyStmt *b)    COMPARE_NODE_FIELD(query);
COMPARE_NODE_FIELD(attlist);   COMPARE_SCALAR_FIELD(is_from);
 
-    COMPARE_STRING_FIELD(filename);
+    COMPARE_NODE_FIELD(destination);    COMPARE_NODE_FIELD(options);    return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130e6f4..23331ee 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -251,8 +251,7 @@ static TypeName *TableFuncTypeName(List *columns);%type <value>    TriggerFuncArg%type <node>
TriggerWhen
-%type <str>        copy_file_name
-                database_name access_method_clause access_method attr_name
+%type <str>        database_name access_method_clause access_method attr_name                index_name name
cursor_namefile_name cluster_index_specification%type <list>    func_name handler_name qual_Op qual_all_Op subquery_Op
 
@@ -433,6 +432,8 @@ static TypeName *TableFuncTypeName(List *columns);%type <ival>    opt_frame_clause frame_extent
frame_bound
+%type <node>    copy_file_or_function_name
+/* * Non-keyword token types.  These are hard-wired into the "flex" lexer. * They must be listed first so that their
numericcodes do not depend on
 
@@ -1977,14 +1978,15 @@ ClosePortalStmt:
*****************************************************************************/CopyStmt:   COPY opt_binary
qualified_nameopt_column_list opt_oids
 
-            copy_from copy_file_name copy_delimiter opt_with copy_options
+            copy_from copy_file_or_function_name copy_delimiter opt_with
+            copy_options                {                    CopyStmt *n = makeNode(CopyStmt);
n->relation= $3;                    n->query = NULL;                    n->attlist = $4;                    n->is_from
=$6;
 
-                    n->filename = $7;
+                    n->destination = $7;                    n->options = NIL;                    /* Concatenate
user-suppliedflags */
 
@@ -1998,14 +2000,15 @@ CopyStmt:    COPY opt_binary qualified_name opt_column_list opt_oids
n->options= list_concat(n->options, $10);                    $$ = (Node *)n;                }
 
-            | COPY select_with_parens TO copy_file_name opt_with copy_options
+            | COPY select_with_parens TO copy_file_or_function_name
+              opt_with copy_options                {                    CopyStmt *n = makeNode(CopyStmt);
     n->relation = NULL;                    n->query = $2;                    n->attlist = NIL;
n->is_from= false;
 
-                    n->filename = $4;
+                    n->destination = $4;                    n->options = $6;                    $$ = (Node *)n;
       }
 
@@ -2021,10 +2024,17 @@ copy_from: * used depends on the direction. (It really doesn't make sense to copy from *
stdout.We silently correct the "typo".)         - AY 9/94 */
 
-copy_file_name:
-            Sconst                                    { $$ = $1; }
-            | STDIN                                    { $$ = NULL; }
-            | STDOUT                                { $$ = NULL; }
+copy_file_or_function_name:
+            Sconst                            { $$ = (Node *) makeString($1); }
+
+            /*
+             * Note that func_name is not used here because there is no need to
+             * accept the "funcname(TYPES)" construction, as there is only one
+             * valid signature.
+             */
+            | FUNCTION qualified_name        { $$ = (Node *) $2; }
+            | STDIN                            { $$ = NULL; }
+            | STDOUT                        { $$ = NULL; }        ;copy_options: copy_opt_list
  { $$ = $1; }
 
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index d356635..1d801cd 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -94,6 +94,7 @@ extern Oid    LookupExplicitNamespace(const char *nspname);extern Oid
LookupCreationNamespace(constchar *nspname);extern Oid    QualifiedNameGetCreationNamespace(List *names, char
**objname_p);
+extern List *makeNameListFromRangeVar(RangeVar *rangevar);extern RangeVar *makeRangeVarFromNameList(List
*names);externchar *NameListToString(List *names);extern char *NameListToQuotedString(List *names);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b34300f..203088c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1293,7 +1293,8 @@ typedef struct CopyStmt    List       *attlist;        /* List of column names (as Strings), or
NIL                                * for all columns */    bool        is_from;        /* TO or FROM */
 
-    char       *filename;        /* filename, or NULL for STDIN/STDOUT */
+    Node       *destination;    /* filename, or NULL for STDIN/STDOUT, or a
+                                 * function */    List       *options;        /* List of DefElem nodes */} CopyStmt;
-- 
1.6.5.3



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Daniel Farina
Дата:
Сообщение: [PATCH 2/4] Add tests for "COPY ... TO FUNCTION ..."
Следующее
От: Peter Eisentraut
Дата:
Сообщение: Re: magic block in doc functions