Re: Make COPY format extendable: Extract COPY TO format implementations

From Sutou Kouhei
Subject Re: Make COPY format extendable: Extract COPY TO format implementations
Msg-id 20240126.174947.1394046405836416758.kou@clear-code.com
In response to Re: Make COPY format extendable: Extract COPY TO format implementations  (Michael Paquier <michael@paquier.xyz>)
List pgsql-hackers

Hi,

In <ZbLwNyOKbddno0Ue@paquier.xyz>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Fri, 26 Jan 2024 08:35:19 +0900,
  Michael Paquier <michael@paquier.xyz> wrote:

>> OK. How about the following for the fetch function
>> signature?
>> 
>> extern CopyToRoutine *GetBuiltinCopyToRoutine(const char *format);
> 
> Or CopyToRoutineGet()?  I am not wedded to my suggestion, got a bad
> history with naming things around here.

Thanks for the suggestion.
I've rethought this and now use the following:

+extern void ProcessCopyOptionFormatTo(ParseState *pstate, CopyFormatOptions *opts_out, DefElem *defel);

It's not a fetch function. Instead, it sets the CopyToRoutine
in opts_out. But it keeps CopyToRoutine* hidden inside
copyto.c. Is that acceptable?
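
To make the shape of this design concrete, here is a minimal sketch outside PostgreSQL. The routine tables are `static` in one file, and callers only see a setter that stores a pointer to the chosen table in the options struct. Every name below (`DemoRoutine`, `DemoOptions`, `demo_process_format`) is a hypothetical stand-in, not the patch's API, and an unknown format is reported by returning false, whereas the patch raises an error with ereport().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for CopyToRoutine: a table of callbacks. */
typedef struct DemoRoutine
{
    int         (*get_format) (void);   /* 0 = text-based, 1 = binary */
} DemoRoutine;

/* Hypothetical stand-in for CopyFormatOptions. */
typedef struct DemoOptions
{
    bool        csv_mode;
    bool        binary;
    const DemoRoutine *routine;
} DemoOptions;

static int
demo_text_based_format(void)
{
    return 0;
}

static int
demo_binary_format(void)
{
    return 1;
}

/* "static" keeps the tables private to this translation unit. */
static const DemoRoutine demo_routine_text = {.get_format = demo_text_based_format};
static const DemoRoutine demo_routine_csv = {.get_format = demo_text_based_format};
static const DemoRoutine demo_routine_binary = {.get_format = demo_binary_format};

/*
 * Set opts->routine from a format name. A NULL name selects the default
 * "text" format. Returns false for an unknown name and leaves opts untouched.
 */
bool
demo_process_format(DemoOptions *opts, const char *format)
{
    assert(opts != NULL);

    if (format == NULL)
        format = "text";

    if (strcmp(format, "text") == 0)
        opts->routine = &demo_routine_text;
    else if (strcmp(format, "csv") == 0)
    {
        opts->csv_mode = true;
        opts->routine = &demo_routine_csv;
    }
    else if (strcmp(format, "binary") == 0)
    {
        opts->binary = true;
        opts->routine = &demo_routine_binary;
    }
    else
        return false;

    return true;
}
```

A caller would zero-initialize a `DemoOptions`, call `demo_process_format(&opts, "csv")`, and then dispatch through `opts.routine->get_format()` without ever naming a concrete routine table.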

>> OK. I'll focus on 0001 and 0005 for now. I'll restart
>> 0002-0004/0006-0008 after 0001 and 0005 are accepted.
> 
> Once you get these, I'd be interested in re-doing an evaluation of
> COPY TO and more tests with COPY FROM while running Postgres on
> scissors.  One thing I was thinking to use here is my blackhole_am for
> COPY FROM:
> https://github.com/michaelpq/pg_plugins/tree/main/blackhole_am

Thanks!

Could you evaluate the attached patch set with COPY FROM?

I've attached the v7 patch set. It includes only the 0001 and
0005 parts of the v6 patch set because we are focusing on them
for now.

0001: This is based on 0001 in v6.

Changes since v6:

* Fix comment style
* Hide CopyToRoutine{Text,CSV,Binary}
* Add more comments
* Eliminate "if (cstate->opts.csv_mode)" branches from "text"
  and "csv" callbacks
* Remove CopyTo*_function typedefs
* Update benchmark results in the commit message. Note that the
  results were measured in my environment, which isn't suitable
  for accurate benchmarking
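
The branch elimination listed above can be sketched as follows, in a simplified setting with no PostgreSQL types. One shared `static inline` row writer takes the per-value output function as a parameter, and two thin callbacks bind it to the text or CSV variant, so the per-row path contains no format check. All names are hypothetical, and the quoting here is deliberately naive; it does not reproduce COPY's real escaping rules.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size output buffer; must be zero-initialized. */
typedef struct DemoBuf
{
    char        data[256];
    size_t      len;
} DemoBuf;

/* Append a string, truncating silently if the buffer would overflow. */
static void
demo_append(DemoBuf *buf, const char *s)
{
    size_t      n = strlen(s);

    assert(buf != NULL);
    if (buf->len + n >= sizeof(buf->data))
        n = sizeof(buf->data) - 1 - buf->len;
    memcpy(buf->data + buf->len, s, n);
    buf->len += n;
    buf->data[buf->len] = '\0';
}

typedef void (*DemoValueOut) (DemoBuf *buf, const char *value);

static inline void
demo_out_text(DemoBuf *buf, const char *value)
{
    demo_append(buf, value);
}

/* Naive CSV output: always quote, never escape (illustration only). */
static inline void
demo_out_csv(DemoBuf *buf, const char *value)
{
    demo_append(buf, "\"");
    demo_append(buf, value);
    demo_append(buf, "\"");
}

/* Shared row writer: the format-specific part arrives as "out". */
static inline void
demo_row(DemoBuf *buf, const char *const *values, size_t n,
         const char *delim, DemoValueOut out)
{
    for (size_t i = 0; i < n; i++)
    {
        if (i > 0)
            demo_append(buf, delim);
        out(buf, values[i]);
    }
    demo_append(buf, "\n");
}

/* Per-format callbacks: no "if (csv_mode)" on the per-row path. */
void
demo_text_one_row(DemoBuf *buf, const char *const *values, size_t n)
{
    demo_row(buf, values, n, "\t", demo_out_text);
}

void
demo_csv_one_row(DemoBuf *buf, const char *const *values, size_t n)
{
    demo_row(buf, values, n, ",", demo_out_csv);
}
```

Because each wrapper passes a fixed function to the inline helper, a compiler may inline the call and specialize the loop per format. Whether it actually does so depends on the compiler and optimization level, which is why measuring remains necessary.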

0002: This is based on 0005 in v6.

Changes since v6:

* Fix comment style
* Hide CopyFromRoutine{Text,CSV,Binary}
* Add more comments
* Eliminate an "if (cstate->opts.csv_mode)" branch from "text"
  and "csv" callbacks
  * NOTE: We can eliminate more "if (cstate->opts.csv_mode)" branches,
    such as the one in NextCopyFromRawFields(). Should we do that
    as part of this feature (making COPY format extendable),
    or can we defer it to a separate improvement?
* Remove CopyFrom*_function typedefs



Thanks,
-- 
kou
From 3e75129c2e9d9d34eebb6ef31b4fe6579f9eb02d Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Fri, 26 Jan 2024 16:46:51 +0900
Subject: [PATCH v7 1/2] Extract COPY TO format implementations

This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
  https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
  https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com

This doesn't change the current behavior. It only introduces
CopyToRoutine, a set of function pointers for a format
implementation (similar to TupleTableSlotOps), and uses it for the
existing "text", "csv" and "binary" format implementations.

Note that CopyToRoutine can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.

Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:

https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us

> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.

You can see that there is no significant loss of performance:

Data: Random 32 bit integers:

    CREATE TABLE data (int32 integer);
    SELECT setseed(0.29);
    INSERT INTO data
      SELECT random() * 10000
        FROM generate_series(1, ${n_records});

The number of records: 100K, 1M and 10M

100K without this change:

    format,elapsed time (ms)
    text,10.561
    csv,10.868
    binary,10.287

100K with this change:

    format,elapsed time (ms)
    text,9.962
    csv,10.453
    binary,9.473

1M without this change:

    format,elapsed time (ms)
    text,103.265
    csv,109.789
    binary,104.078

1M with this change:

    format,elapsed time (ms)
    text,98.612
    csv,101.908
    binary,94.456

10M without this change:

    format,elapsed time (ms)
    text,1060.614
    csv,1065.272
    binary,1025.875

10M with this change:

    format,elapsed time (ms)
    text,1020.050
    csv,1031.279
    binary,954.792
---
 contrib/file_fdw/file_fdw.c     |   2 +-
 src/backend/commands/copy.c     |  71 ++--
 src/backend/commands/copyfrom.c |   2 +-
 src/backend/commands/copyto.c   | 551 +++++++++++++++++++++++---------
 src/include/commands/copy.h     |   8 +-
 src/include/commands/copyapi.h  |  48 +++
 6 files changed, 505 insertions(+), 177 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 249d82d3a0..9e4e819858 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -329,7 +329,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
     /*
      * Now apply the core COPY code's validation logic for more checks.
      */
-    ProcessCopyOptions(NULL, NULL, true, other_options);
+    ProcessCopyOptions(NULL, NULL, true, NULL, other_options);
 
     /*
      * Either filename or program option is required for file_fdw foreign
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cc0786c6f4..3676d1206d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -442,6 +442,9 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
  * a list of options.  In that usage, 'opts_out' can be passed as NULL and
  * the collected data is just leaked until CurrentMemoryContext is reset.
  *
+ * 'cstate' is CopyToState* for !is_from, CopyFromState* for is_from. 'cstate'
+ * may be NULL. For example, file_fdw uses NULL.
+ *
  * Note that additional checking, such as whether column names listed in FORCE
  * QUOTE actually exist, has to be applied later.  This just checks for
  * self-consistency of the options list.
@@ -450,6 +453,7 @@ void
 ProcessCopyOptions(ParseState *pstate,
                    CopyFormatOptions *opts_out,
                    bool is_from,
+                   void *cstate,
                    List *options)
 {
     bool        format_specified = false;
@@ -464,30 +468,54 @@ ProcessCopyOptions(ParseState *pstate,
 
     opts_out->file_encoding = -1;
 
-    /* Extract options from the statement node tree */
+    /*
+     * Extract only the "format" option to detect target routine as the first
+     * step
+     */
     foreach(option, options)
     {
         DefElem    *defel = lfirst_node(DefElem, option);
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-                opts_out->csv_mode = true;
-            else if (strcmp(fmt, "binary") == 0)
-                opts_out->binary = true;
+
+            if (is_from)
+            {
+                char       *fmt = defGetString(defel);
+
+                if (strcmp(fmt, "text") == 0)
+                     /* default format */ ;
+                else if (strcmp(fmt, "csv") == 0)
+                {
+                    opts_out->csv_mode = true;
+                }
+                else if (strcmp(fmt, "binary") == 0)
+                {
+                    opts_out->binary = true;
+                }
+                else
+                    ereport(ERROR,
+                            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                             errmsg("COPY format \"%s\" not recognized", fmt),
+                             parser_errposition(pstate, defel->location)));
+            }
             else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+                ProcessCopyOptionFormatTo(pstate, opts_out, defel);
         }
+    }
+    if (!format_specified)
+        /* Set the default format. */
+        ProcessCopyOptionFormatTo(pstate, opts_out, NULL);
+    /* Extract options except "format" from the statement node tree */
+    foreach(option, options)
+    {
+        DefElem    *defel = lfirst_node(DefElem, option);
+
+        if (strcmp(defel->defname, "format") == 0)
+            continue;
         else if (strcmp(defel->defname, "freeze") == 0)
         {
             if (freeze_specified)
@@ -616,11 +644,18 @@ ProcessCopyOptions(ParseState *pstate,
             opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from);
         }
         else
-            ereport(ERROR,
-                    (errcode(ERRCODE_SYNTAX_ERROR),
-                     errmsg("option \"%s\" not recognized",
-                            defel->defname),
-                     parser_errposition(pstate, defel->location)));
+        {
+            bool        processed = false;
+
+            if (!is_from)
+                processed = opts_out->to_routine->CopyToProcessOption(cstate, defel);
+            if (!processed)
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("option \"%s\" not recognized",
+                                defel->defname),
+                         parser_errposition(pstate, defel->location)));
+        }
     }
 
     /*
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 173a736ad5..05b3d13236 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1411,7 +1411,7 @@ BeginCopyFrom(ParseState *pstate,
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
     /* Extract options from the statement node tree */
-    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
+    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , cstate, options);
 
     /* Process the target relation */
     cstate->rel = rel;
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index d3dc3fc854..52572585fa 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/defrem.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -131,6 +132,397 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToRoutine implementations.
+ */
+
+/*
+ * CopyToRoutine implementation for "text" and "csv". CopyToTextBased*() are
+ * shared by both of "text" and "csv". CopyToText*() are only for "text" and
+ * CopyToCSV*() are only for "csv".
+ *
+ * We can use the same functions for all callbacks by referring
+ * cstate->opts.csv_mode but splitting callbacks to eliminate "if
+ * (cstate->opts.csv_mode)" branches from all callbacks has performance
+ * merit when many tuples are copied. So we use separated callbacks for "text"
+ * and "csv".
+ */
+
+/*
+ * All "text" and "csv" options are parsed in ProcessCopyOptions(). We may
+ * move the code to here later.
+ */
+static bool
+CopyToTextBasedProcessOption(CopyToState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyToTextBasedGetFormat(CopyToState cstate)
+{
+    return 0;
+}
+
+static void
+CopyToTextBasedSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+    CopySendEndOfRow(cstate);
+}
+
+typedef void (*CopyAttributeOutHeaderFunction) (CopyToState cstate, char *string);
+
+/*
+ * We can use CopyAttributeOutText() directly but define this for consistency
+ * with CopyAttributeOutCSVHeader(). "static inline" will prevent performance
+ * penalty by this wrapping.
+ */
+static inline void
+CopyAttributeOutTextHeader(CopyToState cstate, char *string)
+{
+    CopyAttributeOutText(cstate, string);
+}
+
+static inline void
+CopyAttributeOutCSVHeader(CopyToState cstate, char *string)
+{
+    CopyAttributeOutCSV(cstate, string, false,
+                        list_length(cstate->attnumlist) == 1);
+}
+
+/*
+ * We don't use this function as a callback directly. We define
+ * CopyToTextStart() and CopyToCSVStart() and use them instead. It's for
+ * eliminating a "if (cstate->opts.csv_mode)" branch. This callback is called
+ * only once per COPY TO. So this optimization may be meaningless but done for
+ * consistency with CopyToTextBasedOneRow().
+ *
+ * This must initialize cstate->out_functions for CopyToTextBasedOneRow().
+ */
+static inline void
+CopyToTextBasedStart(CopyToState cstate, TupleDesc tupDesc, CopyAttributeOutHeaderFunction out)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            out(cstate, colname);
+        }
+
+        CopyToTextBasedSendEndOfRow(cstate);
+    }
+}
+
+static void
+CopyToTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    CopyToTextBasedStart(cstate, tupDesc, CopyAttributeOutTextHeader);
+}
+
+static void
+CopyToCSVStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    CopyToTextBasedStart(cstate, tupDesc, CopyAttributeOutCSVHeader);
+}
+
+typedef void (*CopyAttributeOutValueFunction) (CopyToState cstate, char *string, int attnum);
+
+static inline void
+CopyAttributeOutTextValue(CopyToState cstate, char *string, int attnum)
+{
+    CopyAttributeOutText(cstate, string);
+}
+
+static inline void
+CopyAttributeOutCSVValue(CopyToState cstate, char *string, int attnum)
+{
+    CopyAttributeOutCSV(cstate, string,
+                        cstate->opts.force_quote_flags[attnum - 1],
+                        list_length(cstate->attnumlist) == 1);
+}
+
+/*
+ * We don't use this function as a callback directly. We define
+ * CopyToTextOneRow() and CopyToCSVOneRow() and use them instead. It's for
+ * eliminating a "if (cstate->opts.csv_mode)" branch. This callback is called
+ * per tuple. So this optimization will be valuable when many tuples are
+ * copied.
+ *
+ * cstate->out_functions must be initialized in CopyToTextBasedStart().
+ */
+static void
+CopyToTextBasedOneRow(CopyToState cstate, TupleTableSlot *slot, CopyAttributeOutValueFunction out)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1], value);
+            out(cstate, string, attnum);
+        }
+    }
+
+    CopyToTextBasedSendEndOfRow(cstate);
+}
+
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextBasedOneRow(cstate, slot, CopyAttributeOutTextValue);
+}
+
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextBasedOneRow(cstate, slot, CopyAttributeOutCSVValue);
+}
+
+static void
+CopyToTextBasedEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToRoutine implementation for "binary".
+ */
+
+/*
+ * All "binary" options are parsed in ProcessCopyOptions(). We may move the
+ * code to here later.
+ */
+static bool
+CopyToBinaryProcessOption(CopyToState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyToBinaryGetFormat(CopyToState cstate)
+{
+    return 1;
+}
+
+/*
+ * This must initialize cstate->out_functions for CopyToBinaryOneRow().
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int            num_phys_attrs;
+    ListCell   *cur;
+
+    num_phys_attrs = tupDesc->natts;
+    /* Get info about the columns we need to process. */
+    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Oid            out_func_oid;
+        bool        isvarlena;
+        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+        getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+    }
+
+    {
+        /* Generate header for a binary copy */
+        int32        tmp;
+
+        /* Signature */
+        CopySendData(cstate, BinarySignature, 11);
+        /* Flags field */
+        tmp = 0;
+        CopySendInt32(cstate, tmp);
+        /* No header extension */
+        tmp = 0;
+        CopySendInt32(cstate, tmp);
+    }
+}
+
+/*
+ * cstate->out_functions must be initialized in CopyToBinaryStart().
+ */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextBased*() are shared with "csv". CopyToText*() are only for "text".
+ */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToProcessOption = CopyToTextBasedProcessOption,
+    .CopyToGetFormat = CopyToTextBasedGetFormat,
+    .CopyToStart = CopyToTextStart,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextBasedEnd,
+};
+
+/*
+ * CopyToTextBased*() are shared with "text". CopyToCSV*() are only for "csv".
+ */
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToProcessOption = CopyToTextBasedProcessOption,
+    .CopyToGetFormat = CopyToTextBasedGetFormat,
+    .CopyToStart = CopyToCSVStart,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextBasedEnd,
+};
+
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToProcessOption = CopyToBinaryProcessOption,
+    .CopyToGetFormat = CopyToBinaryGetFormat,
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/*
+ * Process the "format" option for COPY TO.
+ *
+ * If defel is NULL, the default format "text" is used.
+ */
+void
+ProcessCopyOptionFormatTo(ParseState *pstate,
+                          CopyFormatOptions *opts_out,
+                          DefElem *defel)
+{
+    char       *format;
+
+    if (defel)
+        format = defGetString(defel);
+    else
+        format = "text";
+
+    if (strcmp(format, "text") == 0)
+        opts_out->to_routine = &CopyToRoutineText;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        opts_out->to_routine = &CopyToRoutineCSV;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        opts_out->to_routine = &CopyToRoutineBinary;
+    }
+    else
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -141,7 +533,7 @@ SendCopyBegin(CopyToState cstate)
 {
     StringInfoData buf;
     int            natts = list_length(cstate->attnumlist);
-    int16        format = (cstate->opts.binary ? 1 : 0);
+    int16        format = cstate->opts.to_routine->CopyToGetFormat(cstate);
     int            i;
 
     pq_beginmessage(&buf, PqMsg_CopyOutResponse);
@@ -198,16 +590,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -242,10 +624,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -431,7 +809,7 @@ BeginCopyTo(ParseState *pstate,
     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
     /* Extract options from the statement node tree */
-    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
+    ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , cstate, options);
 
     /* Process the source/target relation or query */
     if (rel)
@@ -748,8 +1126,6 @@ DoCopyTo(CopyToState cstate)
     bool        pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
     bool        fe_copy = (pipe && whereToSendOutput == DestRemote);
     TupleDesc    tupDesc;
-    int            num_phys_attrs;
-    ListCell   *cur;
     uint64        processed;
 
     if (fe_copy)
@@ -759,32 +1135,11 @@ DoCopyTo(CopyToState cstate)
         tupDesc = RelationGetDescr(cstate->rel);
     else
         tupDesc = cstate->queryDesc->tupDesc;
-    num_phys_attrs = tupDesc->natts;
     cstate->opts.null_print_client = cstate->opts.null_print;    /* default */
 
     /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
     cstate->fe_msgbuf = makeStringInfo();
 
-    /* Get info about the columns we need to process. */
-    cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
-        Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-        if (cstate->opts.binary)
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-        else
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-    }
-
     /*
      * Create a temporary memory context that we can reset once per row to
      * recover palloc'd memory.  This avoids any problems with leaks inside
@@ -795,57 +1150,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false,
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->opts.to_routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -884,13 +1189,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
+    cstate->opts.to_routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -906,71 +1205,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    bool        need_delim = false;
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
-    ListCell   *cur;
-    char       *string;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Datum        value = slot->tts_values[attnum - 1];
-        bool        isnull = slot->tts_isnull[attnum - 1];
-
-        if (!cstate->opts.binary)
-        {
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-        }
-
-        if (isnull)
-        {
-            if (!cstate->opts.binary)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-                CopySendInt32(cstate, -1);
-        }
-        else
-        {
-            if (!cstate->opts.binary)
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1],
-                                        list_length(cstate->attnumlist) == 1);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-            else
-            {
-                bytea       *outputbytes;
-
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->opts.to_routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b3da3cb0be..9abd7fe538 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,6 +14,7 @@
 #ifndef COPY_H
 #define COPY_H
 
+#include "commands/copyapi.h"
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
@@ -74,11 +75,11 @@ typedef struct CopyFormatOptions
     bool        convert_selectively;    /* do selective binary conversion? */
     CopyOnErrorChoice on_error; /* what to do when error happened */
     List       *convert_select; /* list of column names (can be NIL) */
+    const        CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
+/* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
@@ -87,7 +88,8 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
 
-extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options);
+extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List *options);
+extern void ProcessCopyOptionFormatTo(ParseState *pstate, CopyFormatOptions *opts_out, DefElem *defel);
 extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause,
                                    const char *filename,
                                    bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 0000000000..ed52ce5f49
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,48 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/parsenodes.h"
+
+/* This is private in commands/copyto.c */
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called for processing one COPY TO option. This will return false when
+     * the given option is invalid.
+     */
+    bool        (*CopyToProcessOption) (CopyToState cstate, DefElem *defel);
+
+    /*
+     * Called when COPY TO is started. This will return a format as int16
+     * value. It's used for the CopyOutResponse message.
+     */
+    int16        (*CopyToGetFormat) (CopyToState cstate);
+
+    /* Called when COPY TO is started. This will send a header. */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /* Copy one row for COPY TO. */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO is ended. This will send a trailer. */
+    void        (*CopyToEnd) (CopyToState cstate);
+}            CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
-- 
2.43.0
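
Note for readers of the header above: the following is a rough, standalone
sketch of the callback-table pattern that CopyToRoutine follows. It is not
PostgreSQL code and does not compile against the tree. MockState,
MockRoutine, mock_lookup() and mock_copy_to() are hypothetical stand-ins
invented for illustration, and the "HDR;" header and "length:value" row
encoding are made up rather than the real binary COPY format.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for CopyToState; it only collects output. */
typedef struct MockState
{
    char        out[128];
    size_t      len;
} MockState;

/* Same shape as CopyToRoutine, reduced to three callbacks. */
typedef struct MockRoutine
{
    int16_t     (*get_format) (MockState *state);
    void        (*start) (MockState *state);
    void        (*one_row) (MockState *state, const char *value);
} MockRoutine;

static void
mock_append(MockState *state, const char *s)
{
    size_t      n = strlen(s);

    assert(state->len + n < sizeof(state->out));
    memcpy(state->out + state->len, s, n + 1);  /* copies the terminator too */
    state->len += n;
}

/* "text"-like format: no header, one value per line. */
static int16_t text_get_format(MockState *state) { (void) state; return 0; }
static void text_start(MockState *state) { (void) state; }
static void
text_one_row(MockState *state, const char *value)
{
    mock_append(state, value);
    mock_append(state, "\n");
}

/* "binary"-like format: made-up header, length-prefixed values. */
static int16_t binary_get_format(MockState *state) { (void) state; return 1; }
static void binary_start(MockState *state) { mock_append(state, "HDR;"); }
static void
binary_one_row(MockState *state, const char *value)
{
    char        prefix[32];

    snprintf(prefix, sizeof(prefix), "%zu:", strlen(value));
    mock_append(state, prefix);
    mock_append(state, value);
}

static const MockRoutine mock_text = {text_get_format, text_start, text_one_row};
static const MockRoutine mock_binary = {binary_get_format, binary_start, binary_one_row};

/* Pick a routine by format name; NULL means the default ("text"). */
static const MockRoutine *
mock_lookup(const char *format)
{
    if (format == NULL || strcmp(format, "text") == 0)
        return &mock_text;
    if (strcmp(format, "binary") == 0)
        return &mock_binary;
    return NULL;                /* unknown format */
}

/* The caller never branches on the format inside the row loop. */
static void
mock_copy_to(MockState *state, const MockRoutine *routine,
             const char *const *rows, size_t nrows)
{
    assert(routine != NULL);
    state->len = 0;
    state->out[0] = '\0';
    routine->start(state);
    for (size_t i = 0; i < nrows; i++)
        routine->one_row(state, rows[i]);
}
```

The format is resolved once, when options are processed, and the per-row
path is a single indirect call. That is the same trade the patch makes by
storing a const CopyToRoutine pointer in CopyFormatOptions.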

From c956816bed5c6ac4366ad4ae839d8e38f5ae4d7e Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Fri, 26 Jan 2024 17:21:53 +0900
Subject: [PATCH v7 2/2] Extract COPY FROM format implementations

This doesn't change the current behavior. It just introduces
CopyFromRoutine, which holds the function pointers of a format
implementation (like TupleTableSlotOps), and uses it for the existing
"text", "csv" and "binary" format implementations.

Note that CopyFromRoutine can't be used from extensions yet because
CopyRead*() aren't exported yet. Extensions can't read data from a
source without CopyRead*(). They will be exported by subsequent
patches.
---
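
Note (not part of the commit message): in this patch "text" and "csv"
share one inline worker, CopyFromTextBasedOneRow(), and differ only in a
per-column hook passed by thin wrapper functions. A rough standalone sketch
of that idea follows. Every mock_* name is hypothetical, the worker only
counts non-NULL fields instead of building a tuple, and whether the hook
call is actually inlined is up to the compiler.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-column hook, similar in spirit to PostpareColumnValue. */
typedef const char *(*mock_column_hook) (const char *value,
                                         const char *null_print,
                                         bool force_not_null);

/* "text": leave the field as it is. */
static inline const char *
mock_hook_text(const char *value, const char *null_print, bool force_not_null)
{
    (void) null_print;
    (void) force_not_null;
    return value;
}

/* "csv": a FORCE_NOT_NULL-like rule turns a NULL field into the NULL string. */
static inline const char *
mock_hook_csv(const char *value, const char *null_print, bool force_not_null)
{
    if (value == NULL && force_not_null)
        return null_print;
    return value;
}

/* Shared worker: no "is this csv?" test inside the loop, only the hook. */
static inline int
mock_count_non_null(const char *const *fields, int nfields,
                    const char *null_print, bool force_not_null,
                    mock_column_hook hook)
{
    int         count = 0;

    assert(hook != NULL);
    for (int i = 0; i < nfields; i++)
    {
        if (hook(fields[i], null_print, force_not_null) != NULL)
            count++;
    }
    return count;
}

/*
 * Thin wrappers: the hook is a constant at each call site, so a compiler
 * may inline the worker and the hook into one specialized loop per format.
 */
static int
mock_text_count(const char *const *fields, int nfields)
{
    return mock_count_non_null(fields, nfields, "", false, mock_hook_text);
}

static int
mock_csv_count(const char *const *fields, int nfields, bool force_not_null)
{
    return mock_count_non_null(fields, nfields, "", force_not_null, mock_hook_csv);
}
```

Only the wrappers would be registered as callbacks, which matches how
CopyFromTextOneRow() and CopyFromCSVOneRow() are used in the routine tables
of this patch.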
 src/backend/commands/copy.c              |  33 +-
 src/backend/commands/copyfrom.c          | 270 +++++++++++++---
 src/backend/commands/copyfromparse.c     | 382 +++++++++++++----------
 src/include/commands/copy.h              |   6 +-
 src/include/commands/copyapi.h           |  32 ++
 src/include/commands/copyfrom_internal.h |   4 +
 6 files changed, 490 insertions(+), 237 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3676d1206d..489de4ab8d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -483,32 +483,19 @@ ProcessCopyOptions(ParseState *pstate,
             format_specified = true;
 
             if (is_from)
-            {
-                char       *fmt = defGetString(defel);
-
-                if (strcmp(fmt, "text") == 0)
-                     /* default format */ ;
-                else if (strcmp(fmt, "csv") == 0)
-                {
-                    opts_out->csv_mode = true;
-                }
-                else if (strcmp(fmt, "binary") == 0)
-                {
-                    opts_out->binary = true;
-                }
-                else
-                    ereport(ERROR,
-                            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                             errmsg("COPY format \"%s\" not recognized", fmt),
-                             parser_errposition(pstate, defel->location)));
-            }
+                ProcessCopyOptionFormatFrom(pstate, opts_out, defel);
             else
                 ProcessCopyOptionFormatTo(pstate, opts_out, defel);
         }
     }
     if (!format_specified)
+    {
         /* Set the default format. */
-        ProcessCopyOptionFormatTo(pstate, opts_out, NULL);
+        if (is_from)
+            ProcessCopyOptionFormatFrom(pstate, opts_out, NULL);
+        else
+            ProcessCopyOptionFormatTo(pstate, opts_out, NULL);
+    }
     /* Extract options except "format" from the statement node tree */
     foreach(option, options)
     {
@@ -645,9 +632,11 @@ ProcessCopyOptions(ParseState *pstate,
         }
         else
         {
-            bool        processed = false;
+            bool        processed;
 
-            if (!is_from)
+            if (is_from)
+                processed = opts_out->from_routine->CopyFromProcessOption(cstate, defel);
+            else
                 processed = opts_out->to_routine->CopyToProcessOption(cstate, defel);
             if (!processed)
                 ereport(ERROR,
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 05b3d13236..498d7bc5ad 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -32,6 +32,7 @@
 #include "catalog/namespace.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/defrem.h"
 #include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
@@ -108,6 +109,223 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+
+/*
+ * CopyFromRoutine implementations.
+ */
+
+/*
+ * CopyFromRoutine implementation for "text" and "csv". CopyFromTextBased*()
+ * are shared by both of "text" and "csv". CopyFromText*() are only for "text"
+ * and CopyFromCSV*() are only for "csv".
+ *
+ * We could use the same functions for all callbacks by checking
+ * cstate->opts.csv_mode, but splitting the callbacks to eliminate "if
+ * (cstate->opts.csv_mode)" branches has a performance benefit when many
+ * tuples are copied. So we use separate callbacks for "text" and
+ * "csv".
+ */
+
+/*
+ * All "text" and "csv" options are parsed in ProcessCopyOptions(). We may
+ * move that code here later.
+ */
+static bool
+CopyFromTextBasedProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyFromTextBasedGetFormat(CopyFromState cstate)
+{
+    return 0;
+}
+
+/*
+ * This must initialize cstate->in_functions for CopyFromTextBasedOneRow().
+ */
+static void
+CopyFromTextBasedStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    num_phys_attrs = tupDesc->natts;
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Pick up the required catalog information for each attribute in the
+     * relation, including the input function, the element type (to pass to
+     * the input function).
+     */
+    cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+    for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+        Oid            in_func_oid;
+
+        /* We don't need info for dropped attributes */
+        if (att->attisdropped)
+            continue;
+
+        /* Fetch the input function and typioparam info */
+        getTypeInputInfo(att->atttypid,
+                         &in_func_oid, &cstate->typioparams[attnum - 1]);
+        fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]);
+    }
+
+    /* create workspace for CopyReadAttributes results */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+static void
+CopyFromTextBasedEnd(CopyFromState cstate)
+{
+}
+
+/*
+ * CopyFromRoutine implementation for "binary".
+ */
+
+/*
+ * All "binary" options are parsed in ProcessCopyOptions(). We may move that
+ * code here later.
+ */
+static bool
+CopyFromBinaryProcessOption(CopyFromState cstate, DefElem *defel)
+{
+    return false;
+}
+
+static int16
+CopyFromBinaryGetFormat(CopyFromState cstate)
+{
+    return 1;
+}
+
+/*
+ * This must initialize cstate->in_functions for CopyFromBinaryOneRow().
+ */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    num_phys_attrs = tupDesc->natts;
+
+    /*
+     * Pick up the required catalog information for each attribute in the
+     * relation, including the input function, the element type (to pass to
+     * the input function).
+     */
+    cstate->in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+    cstate->typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+    for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+        Oid            in_func_oid;
+
+        /* We don't need info for dropped attributes */
+        if (att->attisdropped)
+            continue;
+
+        /* Fetch the input function and typioparam info */
+        getTypeBinaryInputInfo(att->atttypid,
+                               &in_func_oid, &cstate->typioparams[attnum - 1]);
+        fmgr_info(in_func_oid, &cstate->in_functions[attnum - 1]);
+    }
+
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+}
+
+/*
+ * CopyFromTextBased*() are shared with "csv". CopyFromText*() are only for "text".
+ */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromProcessOption = CopyFromTextBasedProcessOption,
+    .CopyFromGetFormat = CopyFromTextBasedGetFormat,
+    .CopyFromStart = CopyFromTextBasedStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextBasedEnd,
+};
+
+/*
+ * CopyFromTextBased*() are shared with "text". CopyFromCSV*() are only for "csv".
+ */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromProcessOption = CopyFromTextBasedProcessOption,
+    .CopyFromGetFormat = CopyFromTextBasedGetFormat,
+    .CopyFromStart = CopyFromTextBasedStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextBasedEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromProcessOption = CopyFromBinaryProcessOption,
+    .CopyFromGetFormat = CopyFromBinaryGetFormat,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/*
+ * Process the "format" option for COPY FROM.
+ *
+ * If defel is NULL, the default format "text" is used.
+ */
+void
+ProcessCopyOptionFormatFrom(ParseState *pstate,
+                            CopyFormatOptions *opts_out,
+                            DefElem *defel)
+{
+    char       *format;
+
+    if (defel)
+        format = defGetString(defel);
+    else
+        format = "text";
+
+    if (strcmp(format, "text") == 0)
+        opts_out->from_routine = &CopyFromRoutineText;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        opts_out->from_routine = &CopyFromRoutineCSV;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        opts_out->from_routine = &CopyFromRoutineBinary;
+    }
+    else
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+}
+
+
 /*
  * error context callback for COPY FROM
  *
@@ -1379,9 +1597,6 @@ BeginCopyFrom(ParseState *pstate,
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
                 num_defaults;
-    FmgrInfo   *in_functions;
-    Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1566,25 +1781,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1603,8 +1799,6 @@ BeginCopyFrom(ParseState *pstate,
      * the input function), and info about defaults and constraints. (Which
      * input function we use depends on text/binary format choice.)
      */
-    in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-    typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
 
@@ -1616,15 +1810,6 @@ BeginCopyFrom(ParseState *pstate,
         if (att->attisdropped)
             continue;
 
-        /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-        else
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
 
@@ -1684,8 +1869,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->bytes_processed = 0;
 
     /* We keep those variables in cstate. */
-    cstate->in_functions = in_functions;
-    cstate->typioparams = typioparams;
     cstate->defmap = defmap;
     cstate->defexprs = defexprs;
     cstate->volatile_defexprs = volatile_defexprs;
@@ -1758,20 +1941,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
-    {
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->opts.from_routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1784,6 +1954,8 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    cstate->opts.from_routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 7cacd0b752..856ba261e1 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -172,7 +172,7 @@ ReceiveCopyBegin(CopyFromState cstate)
 {
     StringInfoData buf;
     int            natts = list_length(cstate->attnumlist);
-    int16        format = (cstate->opts.binary ? 1 : 0);
+    int16        format = cstate->opts.from_routine->CopyFromGetFormat(cstate);
     int            i;
 
     pq_beginmessage(&buf, PqMsg_CopyInResponse);
@@ -840,6 +840,221 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     return true;
 }
 
+typedef char *(*PostpareColumnValue) (CopyFromState cstate, char *string, int m);
+
+static inline char *
+PostpareColumnValueText(CopyFromState cstate, char *string, int m)
+{
+    /* do nothing */
+    return string;
+}
+
+static inline char *
+PostpareColumnValueCSV(CopyFromState cstate, char *string, int m)
+{
+    if (string == NULL &&
+        cstate->opts.force_notnull_flags[m])
+    {
+        /*
+         * FORCE_NOT_NULL option is set and column is NULL - convert it to the
+         * NULL string.
+         */
+        string = cstate->opts.null_print;
+    }
+    else if (string != NULL && cstate->opts.force_null_flags[m]
+             && strcmp(string, cstate->opts.null_print) == 0)
+    {
+        /*
+         * FORCE_NULL option is set and column matches the NULL string. It
+         * must have been quoted, or otherwise the string would already have
+         * been set to NULL. Convert it to NULL as specified.
+         */
+        string = NULL;
+    }
+    return string;
+}
+
+/*
+ * We don't use this function as a callback directly. We define
+ * CopyFromTextOneRow() and CopyFromCSVOneRow() and use them instead, to
+ * eliminate an "if (cstate->opts.csv_mode)" branch. The callback is called
+ * once per tuple, so this optimization is valuable when many tuples are
+ * copied.
+ *
+ * cstate->in_functions must be initialized in CopyFromTextBasedStart().
+ */
+static inline bool
+CopyFromTextBasedOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls, PostpareColumnValue postpare_column_value)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        string = postpare_column_value(cstate, string, m);
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            cstate->num_errors++;
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    return CopyFromTextBasedOneRow(cstate, econtext, values, nulls, PostpareColumnValueText);
+}
+
+bool
+CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    return CopyFromTextBasedOneRow(cstate, econtext, values, nulls, PostpareColumnValueCSV);
+}
+
+/*
+ * cstate->in_functions must be initialized in CopyFromBinaryStart().
+ */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -857,181 +1072,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                cstate->num_errors++;
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    if (!cstate->opts.from_routine->CopyFromOneRow(cstate, econtext, values,
+                                                   nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 9abd7fe538..107642ef7a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -75,12 +75,11 @@ typedef struct CopyFormatOptions
     bool        convert_selectively;    /* do selective binary conversion? */
     CopyOnErrorChoice on_error; /* what to do when error happened */
     List       *convert_select; /* list of column names (can be NIL) */
+    const        CopyFromRoutine *from_routine;    /* callback routines for COPY
+                                                 * FROM */
     const        CopyToRoutine *to_routine;    /* callback routines for COPY TO */
 } CopyFormatOptions;
 
-/* This is private in commands/copyfrom.c */
-typedef struct CopyFromStateData *CopyFromState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
 
@@ -89,6 +88,7 @@ extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    uint64 *processed);
 
 extern void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, void *cstate, List *options);
+extern void ProcessCopyOptionFormatFrom(ParseState *pstate, CopyFormatOptions *opts_out, DefElem *defel);
 extern void ProcessCopyOptionFormatTo(ParseState *pstate, CopyFormatOptions *opts_out, DefElem *defel);
 extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause,
                                    const char *filename,
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index ed52ce5f49..9f82cc0876 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -15,8 +15,40 @@
 #define COPYAPI_H
 
 #include "executor/tuptable.h"
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 
+/* This is private in commands/copyfrom.c */
+typedef struct CopyFromStateData *CopyFromState;
+
+/* Routines for a COPY FROM format implementation. */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called for processing one COPY FROM option. This will return false when
+     * the given option is invalid.
+     */
+    bool        (*CopyFromProcessOption) (CopyFromState cstate, DefElem *defel);
+
+    /*
+     * Called when COPY FROM is started. This will return a format as int16
+     * value. It's used for the CopyInResponse message.
+     */
+    int16        (*CopyFromGetFormat) (CopyFromState cstate);
+
+    /*
+     * Called when COPY FROM is started. This will initialize something and
+     * receive a header.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /* Copy one row. It returns false if no more tuples. */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+
+    /* Called when COPY FROM is ended. This will finalize something. */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+}            CopyFromRoutine;
+
 /* This is private in commands/copyto.c */
 typedef struct CopyToStateData *CopyToState;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..096b55011e 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -183,4 +183,8 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls);
+
 #endif                            /* COPYFROM_INTERNAL_H */
-- 
2.43.0
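
For readers skimming the patch, here is a minimal standalone sketch of the callback-table idea behind CopyFromRoutine. It is not code from the patch set and does not use PostgreSQL headers: MockCopyFromState, MockCopyFromRoutine, MockTextRoutine, and mock_copy_from are hypothetical names invented for illustration, and the table is reduced to three callbacks (start, one row, end). It only shows how a format-agnostic loop can drive a format through function pointers, assuming one in-memory string per row.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for CopyFromState; NOT the PostgreSQL struct. */
typedef struct MockCopyFromState
{
    const char **lines;         /* in-memory input, one row per entry */
    size_t      nlines;         /* number of entries in lines */
    size_t      next;           /* index of the next row to read */
    bool        started;        /* set by the Start callback */
    bool        ended;          /* set by the End callback */
} MockCopyFromState;

/* Reduced callback table, loosely modeled on CopyFromRoutine in the patch. */
typedef struct MockCopyFromRoutine
{
    void        (*Start) (MockCopyFromState *cstate);
    bool        (*OneRow) (MockCopyFromState *cstate, const char **value,
                           bool *isnull);
    void        (*End) (MockCopyFromState *cstate);
} MockCopyFromRoutine;

/* "text"-like format: rewind the input and mark the state as started. */
static void
mock_text_start(MockCopyFromState *cstate)
{
    cstate->next = 0;
    cstate->started = true;
}

/* Return one row; false means no more rows. "\N" is treated as NULL. */
static bool
mock_text_one_row(MockCopyFromState *cstate, const char **value, bool *isnull)
{
    if (cstate->next >= cstate->nlines)
        return false;
    *value = cstate->lines[cstate->next++];
    *isnull = (strcmp(*value, "\\N") == 0);
    return true;
}

/* Nothing to release in this sketch; just record that End was called. */
static void
mock_text_end(MockCopyFromState *cstate)
{
    cstate->ended = true;
}

static const MockCopyFromRoutine MockTextRoutine = {
    mock_text_start,
    mock_text_one_row,
    mock_text_end,
};

/*
 * Format-agnostic driver: it never checks which format is in use and only
 * calls through the routine table. Returns the number of rows processed
 * and stores the number of NULL rows in *nnulls.
 */
static size_t
mock_copy_from(const MockCopyFromRoutine *routine, MockCopyFromState *cstate,
               size_t *nnulls)
{
    const char *value;
    bool        isnull;
    size_t      processed = 0;

    assert(routine != NULL && cstate != NULL && nnulls != NULL);

    *nnulls = 0;
    routine->Start(cstate);
    while (routine->OneRow(cstate, &value, &isnull))
    {
        processed++;
        if (isnull)
            (*nnulls)++;
    }
    routine->End(cstate);
    return processed;
}
```

A caller would fill a MockCopyFromState with its rows and call mock_copy_from(&MockTextRoutine, &state, &nnulls). Supporting another format in this sketch would mean supplying a different table and leaving the driver loop unchanged.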

