Re: Make COPY format extendable: Extract COPY TO format implementations

Поиск
Список
Период
Сортировка
От Sutou Kouhei
Тема Re: Make COPY format extendable: Extract COPY TO format implementations
Дата
Msg-id 20240724.173059.909782980111496972.kou@clear-code.com
обсуждение исходный текст
Ответ на Re: Make COPY format extendable: Extract COPY TO format implementations  (Tomas Vondra <tomas.vondra@enterprisedb.com>)
Ответы Re: Make COPY format extendable: Extract COPY TO format implementations
Список pgsql-hackers
Hi,

In <9172d4eb-6de0-4c6d-beab-8210b7a2219b@enterprisedb.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Mon, 22 Jul 2024 14:36:40 +0200,
  Tomas Vondra <tomas.vondra@enterprisedb.com> wrote:

> Thanks for the summary/responses. I still think it'd be better to post a
> summary as a separate message, not as yet another post responding to
> someone else. If I was reading the thread, I would not have noticed this
> is meant to be a summary. I'd even consider putting a "THREAD SUMMARY"
> title on the first line, or something like that. Up to you, of course.

It makes sense. I'll do it as a separated e-mail.

> My suggestions would be to maintain this as a series of patches, making
> incremental changes, with the "more complex" or "more experimental"
> parts larger in the series. For example, I can imagine doing this:
> 
> 0001 - minimal version of the patch (e.g. current v17)
> 0002 - switch existing formats to the new interface
> 0003 - extend the interface to add bits needed for columnar formats
> 0004 - add DML to create/alter/drop custom implementations
> 0005 - minimal patch with extension adding support for Arrow
> 
> Or something like that. The idea is that we still have a coherent story
> of what we're trying to do, and can discuss the incremental changes
> (easier than looking at a large patch). It's even possible to commit
> earlier parts before the later parts are quite cleanup up for commit.
> And some changes changes may not be even meant for commit (e.g. the
> extension) but as guidance / validation for the earlier parts.

OK. I attach the v18 patch set:

0001: add a basic feature (Copy{From,To}Routine)
      (same as the v17 but it's based on the current master)
0002: use Copy{From,To}Rountine for the existing formats
      (this may not be committed because there is a
      profiling related concern)
0003: add support for specifying custom format by "COPY
      ... WITH (format 'my-format')"
      (this also has a test)
0004: export Copy{From,To}StateData
      (but this isn't enough to implement custom COPY
      FROM/TO handlers as an extension)
0005: add opaque member to Copy{From,To}StateData and export
      some functions to read the next data and flush the buffer
      (we can implement a PoC Apache Arrow COPY FROM/TO
      handler as an extension with this)

https://github.com/kou/pg-copy-arrow is a PoC Apache Arrow
COPY FROM/TO handler as an extension.


Notes:

* 0002: We use "static inline" and "constant argument" for
  optimization.
* 0002: This hides NextCopyFromRawFields() in a public
  header because it's not used in PostgreSQL and we want to
  use "static inline" for it. If it's a problem, we can keep
  it and create an internal function for "static inline".
* 0003: We use "CREATE FUNCTION" to register a custom COPY
  FROM/TO handler. It's the same approach as tablesample.
* 0004 and 0005: We can mix them but this patch set split
  them for easy to review. 0004 just moves the existing
  codes. It doesn't change the existing codes.
* PoC: I provide it as a separated repository instead of a
  patch because an extension exists as a separated project
  in general. If it's a problem, I can provide it as a patch
  for contrib/.
* This patch set still has minimal Copy{From,To}Routine. For
  example, custom COPY FROM/TO handlers can't process their
  own options with this patch set. We may add more callbacks
  to Copy{From,To}Routine later based on real world use-cases.

> Unfortunately, there's not much information about what exactly the tests
> did, context (hardware, ...). So I don't know, really. But if you share
> enough information on how to reproduce this, I'm willing to take a look
> and investigate.

Thanks. Here is related information based on the past
e-mails from Michael:

* Use -O2 for optimization build flag
  ("meson setup --buildtype=release" may be used)
* Use tmpfs for PGDATA
* Disable fsync
* Run on scissors (what is "scissors" in this context...?)
  https://www.postgresql.org/message-id/flat/Zbr6piWuVHDtFFOl%40paquier.xyz#dbbec4d5c54ef2317be01a54abaf495c
* Unlogged table may be used
* Use a table that has 30 integer columns (*1)
* Use 5M rows (*2)
* Use '/dev/null' for COPY TO (*3)
* Use blackhole_am for COPY FROM (*4)
  https://github.com/michaelpq/pg_plugins/tree/main/blackhole_am
* perf is used but used options are unknown (sorry)

(*1) This SQL may be used to create the table:

CREATE OR REPLACE FUNCTION create_table_cols(tabname text, num_cols int)
RETURNS VOID AS
$func$
DECLARE
  query text;
BEGIN
  query := 'CREATE UNLOGGED TABLE ' || tabname || ' (';
  FOR i IN 1..num_cols LOOP
    query := query || 'a_' || i::text || ' int default 1';
    IF i != num_cols THEN
      query := query || ', ';
    END IF;
  END LOOP;
  query := query || ')';
  EXECUTE format(query);
END
$func$ LANGUAGE plpgsql;
SELECT create_table_cols ('to_tab_30', 30);
SELECT create_table_cols ('from_tab_30', 30);

(*2) This SQL may be used to insert 5M rows:

INSERT INTO to_tab_30 SELECT FROM generate_series(1, 5000000);

(*3) This SQL may be used for COPY TO:

COPY to_tab_30 TO '/dev/null' WITH (FORMAT text);

(*4) This SQL may be used for COPY FROM:

CREATE EXTENSION blackhole_am;
ALTER TABLE from_tab_30 SET ACCESS METHOD blackhole_am;
COPY to_tab_30 TO '/tmp/to_tab_30.txt' WITH (FORMAT text);
COPY from_tab_30 FROM '/tmp/to_tab_30.txt' WITH (FORMAT text);


If there is enough information, could you try?


Thanks,
-- 
kou
From 22daacbd77c6dd0e13fe11e30fba90f7595ff6c1 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Mar 2024 13:52:34 +0900
Subject: [PATCH v18 1/5] Add CopyFromRoutine/CopyToRountine

They are for implementing custom COPY FROM/TO format. But this is not
enough to implement custom COPY FROM/TO format yet. We'll export some
APIs to receive/send data and add "format" option to COPY FROM/TO
later.

Existing text/csv/binary format implementations don't use
CopyFromRoutine/CopyToRoutine for now. We have a patch for it but we
defer it. Because there are some mysterious profile results in spite
of we get faster runtimes. See [1] for details.

[1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz

Note that this doesn't change existing text/csv/binary format
implementations.
---
 src/backend/commands/copyfrom.c          |  24 +++++-
 src/backend/commands/copyfromparse.c     |   5 ++
 src/backend/commands/copyto.c            |  31 ++++++-
 src/include/commands/copyapi.h           | 100 +++++++++++++++++++++++
 src/include/commands/copyfrom_internal.h |   4 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 158 insertions(+), 8 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707c..ff13b3e3592 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1618,12 +1618,22 @@ BeginCopyFrom(ParseState *pstate,
 
         /* Fetch the input function and typioparam info */
         if (cstate->opts.binary)
+        {
             getTypeBinaryInputInfo(att->atttypid,
                                    &in_func_oid, &typioparams[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                            &in_functions[attnum - 1],
+                                            &typioparams[attnum - 1]);
+
         else
+        {
             getTypeInputInfo(att->atttypid,
                              &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1763,10 +1773,13 @@ BeginCopyFrom(ParseState *pstate,
         /* Read and verify binary header */
         ReceiveCopyBinaryHeader(cstate);
     }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
+    else if (cstate->routine)
     {
+        cstate->routine->CopyFromStart(cstate, tupDesc);
+    }
+    else
+    {
+        /* create workspace for CopyReadAttributes results */
         AttrNumber    attr_count = list_length(cstate->attnumlist);
 
         cstate->max_fields = attr_count;
@@ -1784,6 +1797,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    if (cstate->routine)
+        cstate->routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 7efcb891598..92b8d5e72d5 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1012,6 +1012,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 
         Assert(fieldno == attr_count);
     }
+    else if (cstate->routine)
+    {
+        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+            return false;
+    }
     else
     {
         /* binary */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index ae8b2e36d72..ff19c457abf 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -20,6 +20,7 @@
 
 #include "access/tableam.h"
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -64,6 +65,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+    /* format routine */
+    const CopyToRoutine *routine;
+
     /* low-level state data */
     CopyDest    copy_dest;        /* type of copy source/destination */
     FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
@@ -771,14 +775,22 @@ DoCopyTo(CopyToState cstate)
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
         if (cstate->opts.binary)
+        {
             getTypeBinaryOutputInfo(attr->atttypid,
                                     &out_func_oid,
                                     &isvarlena);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                           &cstate->out_functions[attnum - 1]);
         else
+        {
             getTypeOutputInfo(attr->atttypid,
                               &out_func_oid,
                               &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
     }
 
     /*
@@ -805,6 +817,8 @@ DoCopyTo(CopyToState cstate)
         tmp = 0;
         CopySendInt32(cstate, tmp);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToStart(cstate, tupDesc);
     else
     {
         /*
@@ -886,6 +900,8 @@ DoCopyTo(CopyToState cstate)
         /* Need to flush out the trailer */
         CopySendEndOfRow(cstate);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -910,15 +926,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
+    /* Make sure the tuple is fully deconstructed */
+    slot_getallattrs(slot);
+
+    if (cstate->routine)
+    {
+        cstate->routine->CopyToOneRow(cstate, slot);
+        MemoryContextSwitchTo(oldcontext);
+        return;
+    }
+
     if (cstate->opts.binary)
     {
         /* Binary per-tuple header */
         CopySendInt16(cstate, list_length(cstate->attnumlist));
     }
 
-    /* Make sure the tuple is fully deconstructed */
-    slot_getallattrs(slot);
-
     foreach(cur, cstate->attnumlist)
     {
         int            attnum = lfirst_int(cur);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 00000000000..635c4cbff27
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,100 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/*
+ * API structure for a COPY FROM format implementation.  Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called when COPY FROM is started to set up the input functions
+     * associated to the relation's attributes writing to.  `finfo` can be
+     * optionally filled to provide the catalog information of the input
+     * function.  `typioparam` can be optionally filled to define the OID of
+     * the type to pass to the input function.  `atttypid` is the OID of data
+     * type used by the relation's attribute.
+     */
+    void        (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                   FmgrInfo *finfo, Oid *typioparam);
+
+    /*
+     * Called when COPY FROM is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation where the data needs
+     * to be copied.  This can be used for any initialization steps required
+     * by a format.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row to a set of `values` and `nulls` of size tupDesc->natts.
+     *
+     * 'econtext' is used to evaluate default expression for each column that
+     * is either not read from the file or is using the DEFAULT option of COPY
+     * FROM.  It is NULL if no default values are used.
+     *
+     * Returns false if there are no more tuples to copy.
+     */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                   Datum *values, bool *nulls);
+
+    /* Called when COPY FROM has ended. */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
+/*
+ * API structure for a COPY TO format implementation.   Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called when COPY TO is started to set up the output functions
+     * associated to the relation's attributes reading from.  `finfo` can be
+     * optionally filled.  `atttypid` is the OID of data type used by the
+     * relation's attribute.
+     */
+    void        (*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+                                  FmgrInfo *finfo);
+
+    /*
+     * Called when COPY TO is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation from where the data
+     * is read.
+     */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row for COPY TO.
+     *
+     * `slot` is the tuple slot where the data is emitted.
+     */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO has ended */
+    void        (*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc783..509b9e92a18 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -58,6 +59,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+    /* format routine */
+    const CopyFromRoutine *routine;
+
     /* low-level state data */
     CopySource    copy_src;        /* type of copy source */
     FILE       *copy_file;        /* used if copy_src == COPY_FILE */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b4d7f9217ce..3ce855c8f17 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -490,6 +490,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
@@ -501,6 +502,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.45.2

From ace816c9ef7b1dceed35d7cf18b82e70fa9143e6 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jul 2024 16:44:44 +0900
Subject: [PATCH v18 2/5] Use CopyFromRoutine/CopyToRountine for the existing
 formats

The existing formats are text, csv and binary. If we find any
performance regression by this, we will not merge this to master.

This will increase indirect function call costs but this will reduce
runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)"
branch costs.

This uses an optimization based of static inline function and a
constant argument call for cstate->opts.csv_mode. For example,
CopyFromTextLikeOneRow() uses this optimization. It accepts the "bool
is_csv" argument instead of using cstate->opts.csv_mode in
it. CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with
false (constant) for "bool is_csv". Compiler will remove "if (is_csv)"
branch in it by this optimization.

This doesn't change existing logic. This just moves existing codes.
---
 src/backend/commands/copyfrom.c          | 215 ++++++---
 src/backend/commands/copyfromparse.c     | 556 +++++++++++++----------
 src/backend/commands/copyto.c            | 480 ++++++++++++-------
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyfrom_internal.h |   8 +
 5 files changed, 813 insertions(+), 448 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ff13b3e3592..1a59202f5ab 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -103,6 +103,157 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+
+/*
+ * CopyFromRoutine implementations for text and CSV.
+ */
+
+/*
+ * CopyFromTextLikeInFunc
+ *
+ * Assign input function data for a relation's attribute in text/CSV format.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid,
+                       FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyFromTextLikeStart
+ *
+ * Start of COPY FROM for text/CSV format.
+ */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Create workspace for CopyReadAttributes results; used by CSV and text
+     * format.
+     */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * CopyFromTextLikeEnd
+ *
+ * End of COPY FROM for text/CSV format.
+ */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/*
+ * CopyFromRoutine implementation for "binary".
+ */
+
+/*
+ * CopyFromBinaryInFunc
+ *
+ * Assign input function data for a relation's attribute in binary format.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                     FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyFromBinaryStart
+ *
+ * Start of COPY FROM for binary format.
+ */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * CopyFromBinaryEnd
+ *
+ * End of COPY FROM for binary format.
+ */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/*
+ * Routines assigned to each format.
++
+ * CSV and text share the same implementation, at the exception of the
+ * per-row callback.
+ */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromInFunc = CopyFromBinaryInFunc,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/*
+ * Define the COPY FROM routines to use for a format.
+ */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyFromRoutineCSV;
+    else if (opts.binary)
+        return &CopyFromRoutineBinary;
+
+    /* default is text */
+    return &CopyFromRoutineText;
+}
+
+
 /*
  * error context callback for COPY FROM
  *
@@ -1381,7 +1532,6 @@ BeginCopyFrom(ParseState *pstate,
                 num_defaults;
     FmgrInfo   *in_functions;
     Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1413,6 +1563,9 @@ BeginCopyFrom(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyFromGetRoutine(cstate->opts);
+
     /* Process the target relation */
     cstate->rel = rel;
 
@@ -1566,25 +1719,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1617,23 +1751,9 @@ BeginCopyFrom(ParseState *pstate,
             continue;
 
         /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-        {
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-        }
-        else if (cstate->routine)
-            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
-                                            &in_functions[attnum - 1],
-                                            &typioparams[attnum - 1]);
-
-        else
-        {
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-        }
+        cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                        &in_functions[attnum - 1],
+                                        &typioparams[attnum - 1]);
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1768,23 +1888,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-    else if (cstate->routine)
-    {
-        cstate->routine->CopyFromStart(cstate, tupDesc);
-    }
-    else
-    {
-        /* create workspace for CopyReadAttributes results */
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1797,8 +1901,7 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
-    if (cstate->routine)
-        cstate->routine->CopyFromEnd(cstate);
+    cstate->routine->CopyFromEnd(cstate);
 
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 92b8d5e72d5..90824b47785 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -149,10 +149,10 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
-static int    CopyReadAttributesText(CopyFromState cstate);
-static int    CopyReadAttributesCSV(CopyFromState cstate);
+static inline bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static inline bool CopyReadLineText(CopyFromState cstate, bool is_csv);
+static inline int CopyReadAttributesText(CopyFromState cstate);
+static inline int CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
                                      Oid typioparam, int32 typmod,
                                      bool *isnull);
@@ -750,8 +750,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  *
  * NOTE: force_not_null option are not applied to the returned fields.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
     int            fldct;
     bool        done;
@@ -768,13 +768,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         tupDesc = RelationGetDescr(cstate->rel);
 
         cstate->cur_lineno++;
-        done = CopyReadLine(cstate);
+        done = CopyReadLine(cstate, is_csv);
 
         if (cstate->opts.header_line == COPY_HEADER_MATCH)
         {
             int            fldnum;
 
-            if (cstate->opts.csv_mode)
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
                 fldct = CopyReadAttributesCSV(cstate);
             else
                 fldct = CopyReadAttributesText(cstate);
@@ -818,7 +822,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     cstate->cur_lineno++;
 
     /* Actually read the line into memory here */
-    done = CopyReadLine(cstate);
+    done = CopyReadLine(cstate, is_csv);
 
     /*
      * EOF at start of line means we're done.  If we see EOF after some
@@ -828,8 +832,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     if (done && cstate->line_buf.len == 0)
         return false;
 
-    /* Parse the line into de-escaped field values */
-    if (cstate->opts.csv_mode)
+    /*
+     * Parse the line into de-escaped field values
+     *
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
         fldct = CopyReadAttributesCSV(cstate);
     else
         fldct = CopyReadAttributesText(cstate);
@@ -839,6 +848,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     return true;
 }
 
+/*
+ * CopyFromTextLikeOneRow
+ *
+ * Copy one row to a set of `values` and `nulls` for the text and CSV
+ * formats.
+ *
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ */
+static inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate,
+                       ExprContext *econtext,
+                       Datum *values,
+                       bool *nulls,
+                       bool is_csv)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        if (is_csv)
+        {
+            if (string == NULL &&
+                cstate->opts.force_notnull_flags[m])
+            {
+                /*
+                 * FORCE_NOT_NULL option is set and column is NULL - convert
+                 * it to the NULL string.
+                 */
+                string = cstate->opts.null_print;
+            }
+            else if (string != NULL && cstate->opts.force_null_flags[m]
+                     && strcmp(string, cstate->opts.null_print) == 0)
+            {
+                /*
+                 * FORCE_NULL option is set and column matches the NULL
+                 * string. It must have been quoted, or otherwise the string
+                 * would already have been set to NULL. Convert it to NULL as
+                 * specified.
+                 */
+                string = NULL;
+            }
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+
+            cstate->num_errors++;
+
+            if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+            {
+                /*
+                 * Since we emit line number and column info in the below
+                 * notice message, we suppress error context information other
+                 * than the relation name.
+                 */
+                Assert(!cstate->relname_only);
+                cstate->relname_only = true;
+
+                if (cstate->cur_attval)
+                {
+                    char       *attval;
+
+                    attval = CopyLimitPrintoutLength(cstate->cur_attval);
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column %s:
\"%s\"",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname,
+                                   attval));
+                    pfree(attval);
+                }
+                else
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column %s: null
input",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname));
+
+                /* reset relname_only */
+                cstate->relname_only = false;
+            }
+
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+
+/*
+ * CopyFromTextOneRow
+ *
+ * Per-row callback for COPY FROM with text format.
+ */
+bool
+CopyFromTextOneRow(CopyFromState cstate,
+                   ExprContext *econtext,
+                   Datum *values,
+                   bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/*
+ * CopyFromCSVOneRow
+ *
+ * Per-row callback for COPY FROM with CSV format.
+ */
+bool
+CopyFromCSVOneRow(CopyFromState cstate,
+                  ExprContext *econtext,
+                  Datum *values,
+                  bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
+
+/*
+ * CopyFromBinaryOneRow
+ *
+ * Copy one row to a set of `values` and `nulls` for the binary format.
+ */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                     Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -856,221 +1126,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
-
-                cstate->num_errors++;
-
-                if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-                {
-                    /*
-                     * Since we emit line number and column info in the below
-                     * notice message, we suppress error context information
-                     * other than the relation name.
-                     */
-                    Assert(!cstate->relname_only);
-                    cstate->relname_only = true;
-
-                    if (cstate->cur_attval)
-                    {
-                        char       *attval;
-
-                        attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column %s:
\"%s\"",
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname,
-                                       attval));
-                        pfree(attval);
-                    }
-                    else
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column %s: null
input",
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname));
-
-                    /* reset relname_only */
-                    cstate->relname_only = false;
-                }
-
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else if (cstate->routine)
-    {
-        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
-            return false;
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
@@ -1100,8 +1170,8 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * by newline.  The terminating newline or EOF marker is not included
  * in the final value of line_buf.
  */
-static bool
-CopyReadLine(CopyFromState cstate)
+static inline bool
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
     bool        result;
 
@@ -1109,7 +1179,7 @@ CopyReadLine(CopyFromState cstate)
     cstate->line_buf_valid = false;
 
     /* Parse data and transfer into line_buf */
-    result = CopyReadLineText(cstate);
+    result = CopyReadLineText(cstate, is_csv);
 
     if (result)
     {
@@ -1176,8 +1246,8 @@ CopyReadLine(CopyFromState cstate)
 /*
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
-static bool
-CopyReadLineText(CopyFromState cstate)
+static inline bool
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
     char       *copy_input_buf;
     int            input_buf_ptr;
@@ -1193,7 +1263,11 @@ CopyReadLineText(CopyFromState cstate)
     char        quotec = '\0';
     char        escapec = '\0';
 
-    if (cstate->opts.csv_mode)
+    /*
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
     {
         quotec = cstate->opts.quote[0];
         escapec = cstate->opts.escape[0];
@@ -1270,7 +1344,11 @@ CopyReadLineText(CopyFromState cstate)
         prev_raw_ptr = input_buf_ptr;
         c = copy_input_buf[input_buf_ptr++];
 
-        if (cstate->opts.csv_mode)
+        /*
+         * is_csv will be optimized away by compiler, as argument is constant
+         * at caller.
+         */
+        if (is_csv)
         {
             /*
              * If character is '\\' or '\r', we may need to look ahead below.
@@ -1309,7 +1387,7 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \r */
-        if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\r' && (!is_csv || !in_quote))
         {
             /* Check for \r\n on first line, _and_ handle \r\n. */
             if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1337,10 +1415,10 @@ CopyReadLineText(CopyFromState cstate)
                     if (cstate->eol_type == EOL_CRNL)
                         ereport(ERROR,
                                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errmsg("literal carriage return found in data") :
                                  errmsg("unquoted carriage return found in data"),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errhint("Use \"\\r\" to represent carriage return.") :
                                  errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1354,10 +1432,10 @@ CopyReadLineText(CopyFromState cstate)
             else if (cstate->eol_type == EOL_NL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal carriage return found in data") :
                          errmsg("unquoted carriage return found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\r\" to represent carriage return.") :
                          errhint("Use quoted CSV field to represent carriage return.")));
             /* If reach here, we have found the line terminator */
@@ -1365,15 +1443,15 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \n */
-        if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\n' && (!is_csv || !in_quote))
         {
             if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal newline found in data") :
                          errmsg("unquoted newline found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\n\" to represent newline.") :
                          errhint("Use quoted CSV field to represent newline.")));
             cstate->eol_type = EOL_NL;    /* in case not set yet */
@@ -1385,7 +1463,7 @@ CopyReadLineText(CopyFromState cstate)
          * In CSV mode, we only recognize \. alone on a line.  This is because
          * \. is a valid CSV data value.
          */
-        if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line))
+        if (c == '\\' && (!is_csv || first_char_in_line))
         {
             char        c2;
 
@@ -1418,7 +1496,11 @@ CopyReadLineText(CopyFromState cstate)
 
                     if (c2 == '\n')
                     {
-                        if (!cstate->opts.csv_mode)
+                        /*
+                         * is_csv will be optimized away by compiler, as
+                         * argument is constant at caller.
+                         */
+                        if (!is_csv)
                             ereport(ERROR,
                                     (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                      errmsg("end-of-copy marker does not match previous newline style")));
@@ -1427,7 +1509,11 @@ CopyReadLineText(CopyFromState cstate)
                     }
                     else if (c2 != '\r')
                     {
-                        if (!cstate->opts.csv_mode)
+                        /*
+                         * is_csv will be optimized away by compiler, as
+                         * argument is constant at caller.
+                         */
+                        if (!is_csv)
                             ereport(ERROR,
                                     (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                      errmsg("end-of-copy marker corrupt")));
@@ -1443,7 +1529,11 @@ CopyReadLineText(CopyFromState cstate)
 
                 if (c2 != '\r' && c2 != '\n')
                 {
-                    if (!cstate->opts.csv_mode)
+                    /*
+                     * is_csv will be optimized away by compiler, as argument
+                     * is constant at caller.
+                     */
+                    if (!is_csv)
                         ereport(ERROR,
                                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                  errmsg("end-of-copy marker corrupt")));
@@ -1472,7 +1562,7 @@ CopyReadLineText(CopyFromState cstate)
                 result = true;    /* report EOF */
                 break;
             }
-            else if (!cstate->opts.csv_mode)
+            else if (!is_csv)
             {
                 /*
                  * If we are here, it means we found a backslash followed by
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index ff19c457abf..c7f69ba606d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -128,6 +128,321 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToRoutine implementations.
+ */
+
+/*
+ * CopyToTextLikeSendEndOfRow
+ *
+ * Apply line terminations for a line sent in text or CSV format depending
+ * on the destination, then send the end of a row.
+ */
+static inline void
+CopyToTextLikeSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+
+    /* Now take the actions related to the end of a row */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextLikeStart
+ *
+ * Start of COPY TO for text and CSV format.
+ */
+static void
+CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        ListCell   *cur;
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopyToTextLikeSendEndOfRow(cstate);
+    }
+}
+
+/*
+ * CopyToTextLikeOutFunc
+ *
+ * Assign output function data for a relation's attribute in text/CSV format.
+ */
+static void
+CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+
+/*
+ * CopyToTextLikeOneRow
+ *
+ * Process one row for text/CSV format.
+ *
+ * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
+ */
+static inline void
+CopyToTextLikeOneRow(CopyToState cstate,
+                     TupleTableSlot *slot,
+                     bool is_csv)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1],
+                                        value);
+
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1]);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopyToTextLikeSendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextOneRow
+ *
+ * Per-row callback for COPY TO with text format.
+ */
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, false);
+}
+
+/*
+ * CopyToTextOneRow
+ *
+ * Per-row callback for COPY TO with CSV format.
+ */
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, true);
+}
+
+/*
+ * CopyToTextLikeEnd
+ *
+ * End of COPY TO for text/CSV format.
+ */
+static void
+CopyToTextLikeEnd(CopyToState cstate)
+{
+    /* Nothing to do here */
+}
+
+/*
+ * CopyToRoutine implementation for "binary".
+ */
+
+/*
+ * CopyToBinaryStart
+ *
+ * Start of COPY TO for binary format.
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /* Generate header for a binary copy */
+    int32        tmp;
+
+    /* Signature */
+    CopySendData(cstate, BinarySignature, 11);
+    /* Flags field */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+    /* No header extension */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+}
+
+/*
+ * CopyToBinaryOutFunc
+ *
+ * Assign output function data for a relation's attribute in binary format.
+ */
+static void
+CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyToBinaryOneRow
+ *
+ * Process one row for binary format.
+ */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+    ListCell   *cur;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+                                           value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToBinaryEnd
+ *
+ * End of COPY TO for binary format.
+ */
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CSV and text share the same implementation, at the exception of the
+ * output representation and per-row callbacks.
+ */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOutFunc = CopyToBinaryOutFunc,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/*
+ * Define the COPY TO routines to use for a format.  This should be called
+ * after options are parsed.
+ */
+static const CopyToRoutine *
+CopyToGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyToRoutineCSV;
+    else if (opts.binary)
+        return &CopyToRoutineBinary;
+
+    /* default is text */
+    return &CopyToRoutineText;
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -195,16 +510,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -239,10 +544,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -430,6 +731,9 @@ BeginCopyTo(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyToGetRoutine(cstate->opts);
+
     /* Process the source/target relation or query */
     if (rel)
     {
@@ -770,27 +1074,10 @@ DoCopyTo(CopyToState cstate)
     foreach(cur, cstate->attnumlist)
     {
         int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
-        if (cstate->opts.binary)
-        {
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-        }
-        else if (cstate->routine)
-            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
-                                           &cstate->out_functions[attnum - 1]);
-        else
-        {
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-        }
+        cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                       &cstate->out_functions[attnum - 1]);
     }
 
     /*
@@ -803,58 +1090,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else if (cstate->routine)
-        cstate->routine->CopyToStart(cstate, tupDesc);
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -893,15 +1129,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
-    else if (cstate->routine)
-        cstate->routine->CopyToEnd(cstate);
+    cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -917,11 +1145,7 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    bool        need_delim = false;
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
-    ListCell   *cur;
-    char       *string;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
@@ -929,65 +1153,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    if (cstate->routine)
-    {
-        cstate->routine->CopyToOneRow(cstate, slot);
-        MemoryContextSwitchTo(oldcontext);
-        return;
-    }
-
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
-    foreach(cur, cstate->attnumlist)
-    {
-        int            attnum = lfirst_int(cur);
-        Datum        value = slot->tts_values[attnum - 1];
-        bool        isnull = slot->tts_isnull[attnum - 1];
-
-        if (!cstate->opts.binary)
-        {
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-        }
-
-        if (isnull)
-        {
-            if (!cstate->opts.binary)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-                CopySendInt32(cstate, -1);
-        }
-        else
-        {
-            if (!cstate->opts.binary)
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1]);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-            else
-            {
-                bytea       *outputbytes;
-
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 141fd48dc10..ccfbdf0ee01 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -104,8 +104,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
                          Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-                                  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 509b9e92a18..c11b5ff3cc0 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -187,4 +187,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* Callbacks for CopyFromRoutine->CopyFromOneRow */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+                               Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+                              Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                                 Datum *values, bool *nulls);
+
 #endif                            /* COPYFROM_INTERNAL_H */
-- 
2.45.2

From f3a336853607e7c7e24158cc2b407aaca845dc88 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jul 2024 17:39:41 +0900
Subject: [PATCH v18 3/5] Add support for adding custom COPY TO format

This uses the handler approach like tablesample. The approach creates
an internal function that returns an internal struct. In this case,
a COPY TO handler returns a CopyToRoutine and a COPY FROM handler
returns a CopyFromRoutine.

This uses the same handler for COPY TO and COPY FROM. PostgreSQL calls a
COPY TO/FROM handler with "is_from" argument. It's true for COPY FROM
and false for COPY TO:

    copy_handler(true) returns CopyToRoutine
    copy_handler(false) returns CopyFromRoutine

This also add a test module for custom COPY TO/FROM handler.
---
 src/backend/commands/copy.c                   |  96 ++++++++++++++---
 src/backend/commands/copyfrom.c               |   4 +-
 src/backend/commands/copyto.c                 |   4 +-
 src/backend/nodes/Makefile                    |   1 +
 src/backend/nodes/gen_node_support.pl         |   2 +
 src/backend/utils/adt/pseudotypes.c           |   1 +
 src/include/catalog/pg_proc.dat               |   6 ++
 src/include/catalog/pg_type.dat               |   6 ++
 src/include/commands/copy.h                   |   2 +
 src/include/commands/copyapi.h                |   4 +
 src/include/nodes/meson.build                 |   1 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_copy_format/.gitignore  |   4 +
 src/test/modules/test_copy_format/Makefile    |  23 ++++
 .../expected/test_copy_format.out             |  21 ++++
 src/test/modules/test_copy_format/meson.build |  33 ++++++
 .../test_copy_format/sql/test_copy_format.sql |   6 ++
 .../test_copy_format--1.0.sql                 |   8 ++
 .../test_copy_format/test_copy_format.c       | 100 ++++++++++++++++++
 .../test_copy_format/test_copy_format.control |   4 +
 21 files changed, 313 insertions(+), 15 deletions(-)
 create mode 100644 src/test/modules/test_copy_format/.gitignore
 create mode 100644 src/test/modules/test_copy_format/Makefile
 create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out
 create mode 100644 src/test/modules/test_copy_format/meson.build
 create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.c
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index df7a4a21c94..e5137e7bb3d 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -32,6 +32,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -439,6 +440,87 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
     return COPY_LOG_VERBOSITY_DEFAULT;    /* keep compiler quiet */
 }
 
+/*
+ * Process the "format" option.
+ *
+ * This function checks whether the option value is a built-in format such as
+ * "text" and "csv" or not. If the option value isn't a built-in format, this
+ * function finds a COPY format handler that returns a CopyToRoutine (for
+ * is_from == false) or CopyFromRountine (for is_from == true). If no COPY
+ * format handler is found, this function reports an error.
+ */
+static void
+ProcessCopyOptionFormat(ParseState *pstate,
+                        CopyFormatOptions *opts_out,
+                        bool is_from,
+                        DefElem *defel)
+{
+    char       *format;
+    Oid            funcargtypes[1];
+    Oid            handlerOid = InvalidOid;
+    Datum        datum;
+    Node       *routine;
+
+    format = defGetString(defel);
+
+    /* built-in formats */
+    if (strcmp(format, "text") == 0)
+         /* default format */ return;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        return;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        return;
+    }
+
+    /* custom format */
+    funcargtypes[0] = INTERNALOID;
+    handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                funcargtypes, true);
+    if (!OidIsValid(handlerOid))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
+    routine = (Node *) DatumGetPointer(datum);
+    if (is_from)
+    {
+        if (routine == NULL || !IsA(routine, CopyFromRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyFromRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
+    else
+    {
+        if (routine == NULL || !IsA(routine, CopyToRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyToRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
+
+    opts_out->routine = routine;
+}
+
 /*
  * Process the statement option list for COPY.
  *
@@ -481,22 +563,10 @@ ProcessCopyOptions(ParseState *pstate,
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-                opts_out->csv_mode = true;
-            else if (strcmp(fmt, "binary") == 0)
-                opts_out->binary = true;
-            else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+            ProcessCopyOptionFormat(pstate, opts_out, is_from, defel);
         }
         else if (strcmp(defel->defname, "freeze") == 0)
         {
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1a59202f5ab..2b48c825a0a 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -244,7 +244,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = {
 static const CopyFromRoutine *
 CopyFromGetRoutine(CopyFormatOptions opts)
 {
-    if (opts.csv_mode)
+    if (opts.routine)
+        return (const CopyFromRoutine *) opts.routine;
+    else if (opts.csv_mode)
         return &CopyFromRoutineCSV;
     else if (opts.binary)
         return &CopyFromRoutineBinary;
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c7f69ba606d..a9e923467dc 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -435,7 +435,9 @@ static const CopyToRoutine CopyToRoutineBinary = {
 static const CopyToRoutine *
 CopyToGetRoutine(CopyFormatOptions opts)
 {
-    if (opts.csv_mode)
+    if (opts.routine)
+        return (const CopyToRoutine *) opts.routine;
+    else if (opts.csv_mode)
         return &CopyToRoutineCSV;
     else if (opts.binary)
         return &CopyToRoutineBinary;
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index 66bbad8e6e0..173ee11811c 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -49,6 +49,7 @@ node_headers = \
     access/sdir.h \
     access/tableam.h \
     access/tsmapi.h \
+    commands/copyapi.h \
     commands/event_trigger.h \
     commands/trigger.h \
     executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index 81df3bdf95f..428ab4f0d93 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index e189e9b79d2..25f24ab95d2 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(internal);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 73d9cf85826..126254473e6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7644,6 +7644,12 @@
 { oid => '3312', descr => 'I/O',
   proname => 'tsm_handler_out', prorettype => 'cstring',
   proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' },
+{ oid => '8753', descr => 'I/O',
+  proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler',
+  proargtypes => 'cstring', prosrc => 'copy_handler_in' },
+{ oid => '8754', descr => 'I/O',
+  proname => 'copy_handler_out', prorettype => 'cstring',
+  proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
 { oid => '267', descr => 'I/O',
   proname => 'table_am_handler_in', proisstrict => 'f',
   prorettype => 'table_am_handler', proargtypes => 'cstring',
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index ceff66ccde1..14c6c1ea486 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -633,6 +633,12 @@
   typcategory => 'P', typinput => 'tsm_handler_in',
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+{ oid => '8752',
+  descr => 'pseudo-type for the result of a copy to/from method functoin',
+  typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'copy_handler_in',
+  typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '269',
   descr => 'pseudo-type for the result of a table AM handler function',
   typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index ccfbdf0ee01..79bd4fb9151 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -84,6 +84,8 @@ typedef struct CopyFormatOptions
     CopyOnErrorChoice on_error; /* what to do when error happened */
     CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
     List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
+                                 * NULL) */
 } CopyFormatOptions;
 
 /* These are private in commands/copy[from|to].c */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 635c4cbff27..2223cad8fd9 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -27,6 +27,8 @@ typedef struct CopyToStateData *CopyToState;
  */
 typedef struct CopyFromRoutine
 {
+    NodeTag        type;
+
     /*
      * Called when COPY FROM is started to set up the input functions
      * associated to the relation's attributes writing to.  `finfo` can be
@@ -69,6 +71,8 @@ typedef struct CopyFromRoutine
  */
 typedef struct CopyToRoutine
 {
+    NodeTag        type;
+
     /*
      * Called when COPY TO is started to set up the output functions
      * associated to the relation's attributes reading from.  `finfo` can be
diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build
index b665e55b657..103df1a7873 100644
--- a/src/include/nodes/meson.build
+++ b/src/include/nodes/meson.build
@@ -11,6 +11,7 @@ node_support_input_i = [
   'access/sdir.h',
   'access/tableam.h',
   'access/tsmapi.h',
+  'commands/copyapi.h',
   'commands/event_trigger.h',
   'commands/trigger.h',
   'executor/tuptable.h',
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 256799f520a..b7b46928a19 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
           spgist_name_ops \
           test_bloomfilter \
           test_copy_callbacks \
+          test_copy_format \
           test_custom_rmgrs \
           test_ddl_deparse \
           test_dsa \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index d8fe059d236..c42b4b2b31f 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -14,6 +14,7 @@ subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
 subdir('test_copy_callbacks')
+subdir('test_copy_format')
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
 subdir('test_dsa')
diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_copy_format/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile
new file mode 100644
index 00000000000..8497f91624d
--- /dev/null
+++ b/src/test/modules/test_copy_format/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_format/Makefile
+
+MODULE_big = test_copy_format
+OBJS = \
+    $(WIN32RES) \
+    test_copy_format.o
+PGFILEDESC = "test_copy_format - test custom COPY FORMAT"
+
+EXTENSION = test_copy_format
+DATA = test_copy_format--1.0.sql
+
+REGRESS = test_copy_format
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_format
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out
b/src/test/modules/test_copy_format/expected/test_copy_format.out
new file mode 100644
index 00000000000..4ed7c0b12db
--- /dev/null
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -0,0 +1,21 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (format 'test_copy_format');
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromEnd
+COPY public.test TO stdout WITH (format 'test_copy_format');
+NOTICE:  test_copy_format: is_from=false
+NOTICE:  CopyToOutFunc: atttypid=21
+NOTICE:  CopyToOutFunc: atttypid=23
+NOTICE:  CopyToOutFunc: atttypid=20
+NOTICE:  CopyToStart: natts=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToEnd
diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build
new file mode 100644
index 00000000000..4cefe7b709a
--- /dev/null
+++ b/src/test/modules/test_copy_format/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+test_copy_format_sources = files(
+  'test_copy_format.c',
+)
+
+if host_system == 'windows'
+  test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_format',
+    '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',])
+endif
+
+test_copy_format = shared_module('test_copy_format',
+  test_copy_format_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_copy_format
+
+test_install_data += files(
+  'test_copy_format.control',
+  'test_copy_format--1.0.sql',
+)
+
+tests += {
+  'name': 'test_copy_format',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_format',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql
b/src/test/modules/test_copy_format/sql/test_copy_format.sql
new file mode 100644
index 00000000000..e805f7cb011
--- /dev/null
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -0,0 +1,6 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (format 'test_copy_format');
+\.
+COPY public.test TO stdout WITH (format 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql
b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
new file mode 100644
index 00000000000..d24ea03ce99
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit
+
+CREATE FUNCTION test_copy_format(internal)
+    RETURNS copy_handler
+    AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c
b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 00000000000..f6b105659ab
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,100 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *        Code for testing custom COPY format.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *        src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copyapi.h"
+#include "commands/defrem.h"
+
+PG_MODULE_MAGIC;
+
+static void
+CopyFromInFunc(CopyFromState cstate, Oid atttypid,
+               FmgrInfo *finfo, Oid *typioparam)
+{
+    ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts)));
+}
+
+static bool
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    ereport(NOTICE, (errmsg("CopyFromOneRow")));
+    return false;
+}
+
+static void
+CopyFromEnd(CopyFromState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+    .type = T_CopyFromRoutine,
+    .CopyFromInFunc = CopyFromInFunc,
+    .CopyFromStart = CopyFromStart,
+    .CopyFromOneRow = CopyFromOneRow,
+    .CopyFromEnd = CopyFromEnd,
+};
+
+static void
+CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts)));
+}
+
+static void
+CopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid)));
+}
+
+static void
+CopyToEnd(CopyToState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+    .type = T_CopyToRoutine,
+    .CopyToOutFunc = CopyToOutFunc,
+    .CopyToStart = CopyToStart,
+    .CopyToOneRow = CopyToOneRow,
+    .CopyToEnd = CopyToEnd,
+};
+
+PG_FUNCTION_INFO_V1(test_copy_format);
+Datum
+test_copy_format(PG_FUNCTION_ARGS)
+{
+    bool        is_from = PG_GETARG_BOOL(0);
+
+    ereport(NOTICE,
+            (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
+
+    if (is_from)
+        PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat);
+    else
+        PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.control
b/src/test/modules/test_copy_format/test_copy_format.control
new file mode 100644
index 00000000000..f05a6362358
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.control
@@ -0,0 +1,4 @@
+comment = 'Test code for custom COPY format'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_format'
+relocatable = true
-- 
2.45.2

From 19512a04864ec88829a553de983f41d2ce31a375 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 14:54:10 +0900
Subject: [PATCH v18 4/5] Export CopyToStateData and CopyFromStateData

It's for custom COPY TO/FROM format handlers implemented as extension.

This just moves codes. This doesn't change codes except
CopyDest/CopyFrom enum values. CopyDest/CopyFrom enum values such as
COPY_FILE are conflicted each other. So COPY_DEST_ prefix instead of
COPY_ prefix is used for CopyDest enum values and COPY_SOURCE_ prefix
instead of COPY_PREFIX_ is used for CopyFrom enum values. For example,
COPY_FILE in CopyDest is renamed to COPY_DEST_FILE and COPY_FILE in
CopyFrom is renamed to COPY_SOURCE_FILE.

Note that this isn't enough to implement custom COPY TO/FROM format
handlers as extension. We'll do the followings in a subsequent commit:

For custom COPY TO format handler:

1. Add an opaque space for custom COPY TO format handler
2. Export CopySendEndOfRow() to flush buffer

For custom COPY FROM format handler:

1. Add an opaque space for custom COPY FROM format handler
2. Export CopyReadBinaryData() to read the next data
---
 src/backend/commands/copyfrom.c          |   4 +-
 src/backend/commands/copyfromparse.c     |  10 +-
 src/backend/commands/copyto.c            |  77 +-----
 src/include/commands/copy.h              |  78 +-----
 src/include/commands/copyapi.h           | 306 ++++++++++++++++++++++-
 src/include/commands/copyfrom_internal.h | 165 ------------
 6 files changed, 320 insertions(+), 320 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 2b48c825a0a..5902172b8df 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1699,7 +1699,7 @@ BeginCopyFrom(ParseState *pstate,
                             pg_encoding_to_char(GetDatabaseEncoding()))));
     }
 
-    cstate->copy_src = COPY_FILE;    /* default */
+    cstate->copy_src = COPY_SOURCE_FILE;    /* default */
 
     cstate->whereClause = whereClause;
 
@@ -1827,7 +1827,7 @@ BeginCopyFrom(ParseState *pstate,
     if (data_source_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_src = COPY_CALLBACK;
+        cstate->copy_src = COPY_SOURCE_CALLBACK;
         cstate->data_source_cb = data_source_cb;
     }
     else if (pipe)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 90824b47785..74844103228 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -180,7 +180,7 @@ ReceiveCopyBegin(CopyFromState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_src = COPY_FRONTEND;
+    cstate->copy_src = COPY_SOURCE_FRONTEND;
     cstate->fe_msgbuf = makeStringInfo();
     /* We *must* flush here to ensure FE knows it can send. */
     pq_flush();
@@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 
     switch (cstate->copy_src)
     {
-        case COPY_FILE:
+        case COPY_SOURCE_FILE:
             bytesread = fread(databuf, 1, maxread, cstate->copy_file);
             if (ferror(cstate->copy_file))
                 ereport(ERROR,
@@ -257,7 +257,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
             if (bytesread == 0)
                 cstate->raw_reached_eof = true;
             break;
-        case COPY_FRONTEND:
+        case COPY_SOURCE_FRONTEND:
             while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof)
             {
                 int            avail;
@@ -340,7 +340,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
                 bytesread += avail;
             }
             break;
-        case COPY_CALLBACK:
+        case COPY_SOURCE_CALLBACK:
             bytesread = cstate->data_source_cb(databuf, minread, maxread);
             break;
     }
@@ -1188,7 +1188,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv)
          * after \. up to the protocol end of copy data.  (XXX maybe better
          * not to treat \. as special?)
          */
-        if (cstate->copy_src == COPY_FRONTEND)
+        if (cstate->copy_src == COPY_SOURCE_FRONTEND)
         {
             int            inbytes;
 
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index a9e923467dc..54aa6cdecaf 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -37,67 +37,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * Represents the different dest cases we need to worry about at
- * the bottom level
- */
-typedef enum CopyDest
-{
-    COPY_FILE,                    /* to file (or a piped program) */
-    COPY_FRONTEND,                /* to frontend */
-    COPY_CALLBACK,                /* to callback function */
-} CopyDest;
-
-/*
- * This struct contains all the state variables used throughout a COPY TO
- * operation.
- *
- * Multi-byte encodings: all supported client-side encodings encode multi-byte
- * characters by having the first byte's high bit set. Subsequent bytes of the
- * character can have the high bit not set. When scanning data in such an
- * encoding to look for a match to a single-byte (ie ASCII) character, we must
- * use the full pg_encoding_mblen() machinery to skip over multibyte
- * characters, else we might find a false match to a trailing byte. In
- * supported server encodings, there is no possibility of a false match, and
- * it's faster to make useless comparisons to trailing bytes than it is to
- * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
- * when we have to do it the hard way.
- */
-typedef struct CopyToStateData
-{
-    /* format routine */
-    const CopyToRoutine *routine;
-
-    /* low-level state data */
-    CopyDest    copy_dest;        /* type of copy source/destination */
-    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
-
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy to */
-    QueryDesc  *queryDesc;        /* executable query to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDOUT */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_dest_cb data_dest_cb; /* function for writing data */
-
-    CopyFormatOptions opts;
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    FmgrInfo   *out_functions;    /* lookup info for output functions */
-    MemoryContext rowcontext;    /* per-row evaluation context */
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyToStateData;
-
 /* DestReceiver for COPY (query) TO */
 typedef struct
 {
@@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
 {
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             /* Default line termination depends on platform */
 #ifndef WIN32
             CopySendChar(cstate, '\n');
@@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
             CopySendString(cstate, "\r\n");
 #endif
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* The FE/BE protocol uses \n as newline for all platforms */
             CopySendChar(cstate, '\n');
             break;
@@ -464,7 +403,7 @@ SendCopyBegin(CopyToState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_dest = COPY_FRONTEND;
+    cstate->copy_dest = COPY_DEST_FRONTEND;
 }
 
 static void
@@ -511,7 +450,7 @@ CopySendEndOfRow(CopyToState cstate)
 
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -545,11 +484,11 @@ CopySendEndOfRow(CopyToState cstate)
                              errmsg("could not write to COPY file: %m")));
             }
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
-        case COPY_CALLBACK:
+        case COPY_DEST_CALLBACK:
             cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
             break;
     }
@@ -928,12 +867,12 @@ BeginCopyTo(ParseState *pstate,
     /* See Multibyte encoding comment above */
     cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
 
-    cstate->copy_dest = COPY_FILE;    /* default */
+    cstate->copy_dest = COPY_DEST_FILE; /* default */
 
     if (data_dest_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_dest = COPY_CALLBACK;
+        cstate->copy_dest = COPY_DEST_CALLBACK;
         cstate->data_dest_cb = data_dest_cb;
     }
     else if (pipe)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 79bd4fb9151..e2411848e9f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,87 +14,11 @@
 #ifndef COPY_H
 #define COPY_H
 
-#include "nodes/execnodes.h"
+#include "commands/copyapi.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-/*
- * Represents whether a header line should be present, and whether it must
- * match the actual names (which implies "true").
- */
-typedef enum CopyHeaderChoice
-{
-    COPY_HEADER_FALSE = 0,
-    COPY_HEADER_TRUE,
-    COPY_HEADER_MATCH,
-} CopyHeaderChoice;
-
-/*
- * Represents where to save input processing errors.  More values to be added
- * in the future.
- */
-typedef enum CopyOnErrorChoice
-{
-    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
-    COPY_ON_ERROR_IGNORE,        /* ignore errors */
-} CopyOnErrorChoice;
-
-/*
- * Represents verbosity of logged messages by COPY command.
- */
-typedef enum CopyLogVerbosityChoice
-{
-    COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages, default */
-    COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
-} CopyLogVerbosityChoice;
-
-/*
- * A struct to hold COPY options, in a parsed form. All of these are related
- * to formatting, except for 'freeze', which doesn't really belong here, but
- * it's expedient to parse it along with all the other options.
- */
-typedef struct CopyFormatOptions
-{
-    /* parameters from the COPY command */
-    int            file_encoding;    /* file or remote side's character encoding,
-                                 * -1 if not specified */
-    bool        binary;            /* binary format? */
-    bool        freeze;            /* freeze rows on loading? */
-    bool        csv_mode;        /* Comma Separated Value format? */
-    CopyHeaderChoice header_line;    /* header line? */
-    char       *null_print;        /* NULL marker string (server encoding!) */
-    int            null_print_len; /* length of same */
-    char       *null_print_client;    /* same converted to file encoding */
-    char       *default_print;    /* DEFAULT marker string */
-    int            default_print_len;    /* length of same */
-    char       *delim;            /* column delimiter (must be 1 byte) */
-    char       *quote;            /* CSV quote char (must be 1 byte) */
-    char       *escape;            /* CSV escape char (must be 1 byte) */
-    List       *force_quote;    /* list of column names */
-    bool        force_quote_all;    /* FORCE_QUOTE *? */
-    bool       *force_quote_flags;    /* per-column CSV FQ flags */
-    List       *force_notnull;    /* list of column names */
-    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
-    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
-    List       *force_null;        /* list of column names */
-    bool        force_null_all; /* FORCE_NULL *? */
-    bool       *force_null_flags;    /* per-column CSV FN flags */
-    bool        convert_selectively;    /* do selective binary conversion? */
-    CopyOnErrorChoice on_error; /* what to do when error happened */
-    CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
-    List       *convert_select; /* list of column names (can be NIL) */
-    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
-                                 * NULL) */
-} CopyFormatOptions;
-
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
-typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
-typedef void (*copy_data_dest_cb) (void *data, int len);
-
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 2223cad8fd9..3104d99ea9f 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -14,12 +14,83 @@
 #ifndef COPYAPI_H
 #define COPYAPI_H
 
+#include "commands/trigger.h"
+#include "executor/execdesc.h"
 #include "executor/tuptable.h"
 #include "nodes/execnodes.h"
 
-/* These are private in commands/copy[from|to].c */
+/*
+ * Represents whether a header line should be present, and whether it must
+ * match the actual names (which implies "true").
+ */
+typedef enum CopyHeaderChoice
+{
+    COPY_HEADER_FALSE = 0,
+    COPY_HEADER_TRUE,
+    COPY_HEADER_MATCH,
+} CopyHeaderChoice;
+
+/*
+ * Represents where to save input processing errors.  More values to be added
+ * in the future.
+ */
+typedef enum CopyOnErrorChoice
+{
+    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
+    COPY_ON_ERROR_IGNORE,        /* ignore errors */
+} CopyOnErrorChoice;
+
+/*
+ * Represents verbosity of logged messages by COPY command.
+ */
+typedef enum CopyLogVerbosityChoice
+{
+    COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages, default */
+    COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
+} CopyLogVerbosityChoice;
+
+/*
+ * A struct to hold COPY options, in a parsed form. All of these are related
+ * to formatting, except for 'freeze', which doesn't really belong here, but
+ * it's expedient to parse it along with all the other options.
+ */
+typedef struct CopyFormatOptions
+{
+    /* parameters from the COPY command */
+    int            file_encoding;    /* file or remote side's character encoding,
+                                 * -1 if not specified */
+    bool        binary;            /* binary format? */
+    bool        freeze;            /* freeze rows on loading? */
+    bool        csv_mode;        /* Comma Separated Value format? */
+    CopyHeaderChoice header_line;    /* header line? */
+    char       *null_print;        /* NULL marker string (server encoding!) */
+    int            null_print_len; /* length of same */
+    char       *null_print_client;    /* same converted to file encoding */
+    char       *default_print;    /* DEFAULT marker string */
+    int            default_print_len;    /* length of same */
+    char       *delim;            /* column delimiter (must be 1 byte) */
+    char       *quote;            /* CSV quote char (must be 1 byte) */
+    char       *escape;            /* CSV escape char (must be 1 byte) */
+    List       *force_quote;    /* list of column names */
+    bool        force_quote_all;    /* FORCE_QUOTE *? */
+    bool       *force_quote_flags;    /* per-column CSV FQ flags */
+    List       *force_notnull;    /* list of column names */
+    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
+    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
+    List       *force_null;        /* list of column names */
+    bool        force_null_all; /* FORCE_NULL *? */
+    bool       *force_null_flags;    /* per-column CSV FN flags */
+    bool        convert_selectively;    /* do selective binary conversion? */
+    CopyOnErrorChoice on_error; /* what to do when error happened */
+    CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
+    List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
+                                 * NULL) */
+} CopyFormatOptions;
+
+typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+
 typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
 
 /*
  * API structure for a COPY FROM format implementation.  Note this must be
@@ -65,6 +136,174 @@ typedef struct CopyFromRoutine
     void        (*CopyFromEnd) (CopyFromState cstate);
 } CopyFromRoutine;
 
+/*
+ * Represents the different source cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopySource
+{
+    COPY_SOURCE_FILE,            /* from file (or a piped program) */
+    COPY_SOURCE_FRONTEND,        /* from frontend */
+    COPY_SOURCE_CALLBACK,        /* from callback function */
+} CopySource;
+
+/*
+ * Represents the end-of-line terminator type of the input
+ */
+typedef enum EolType
+{
+    EOL_UNKNOWN,
+    EOL_NL,
+    EOL_CR,
+    EOL_CRNL,
+} EolType;
+
+/*
+ * Represents the insert method to be used during COPY FROM.
+ */
+typedef enum CopyInsertMethod
+{
+    CIM_SINGLE,                    /* use table_tuple_insert or ExecForeignInsert */
+    CIM_MULTI,                    /* always use table_multi_insert or
+                                 * ExecForeignBatchInsert */
+    CIM_MULTI_CONDITIONAL,        /* use table_multi_insert or
+                                 * ExecForeignBatchInsert only if valid */
+} CopyInsertMethod;
+
+/*
+ * This struct contains all the state variables used throughout a COPY FROM
+ * operation.
+ */
+typedef struct CopyFromStateData
+{
+    /* format routine */
+    const CopyFromRoutine *routine;
+
+    /* low-level state data */
+    CopySource    copy_src;        /* type of copy source */
+    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
+    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
+
+    EolType        eol_type;        /* EOL type of input */
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    Oid            conversion_proc;    /* encoding conversion function */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDIN */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_source_cb data_source_cb; /* function for reading data */
+
+    CopyFormatOptions opts;
+    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /* these are just for error messages, see CopyFromErrorCallback */
+    const char *cur_relname;    /* table name for error messages */
+    uint64        cur_lineno;        /* line number for error messages */
+    const char *cur_attname;    /* current att for error messages */
+    const char *cur_attval;        /* current att value for error messages */
+    bool        relname_only;    /* don't output line number, att, etc. */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    AttrNumber    num_defaults;    /* count of att that are missing and have
+                                 * default value */
+    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
+    Oid           *typioparams;    /* array of element types for in_functions */
+    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
+                                     * execution */
+    uint64        num_errors;        /* total number of rows which contained soft
+                                 * errors */
+    int           *defmap;            /* array of default att numbers related to
+                                 * missing att */
+    ExprState **defexprs;        /* array of default att expressions for all
+                                 * att */
+    bool       *defaults;        /* if DEFAULT marker was found for
+                                 * corresponding att */
+    bool        volatile_defexprs;    /* is any of defexprs volatile? */
+    List       *range_table;    /* single element list of RangeTblEntry */
+    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
+    ExprState  *qualexpr;
+
+    TransitionCaptureState *transition_capture;
+
+    /*
+     * These variables are used to reduce overhead in COPY FROM.
+     *
+     * attribute_buf holds the separated, de-escaped text for each field of
+     * the current line.  The CopyReadAttributes functions return arrays of
+     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
+     * the buffer on each cycle.
+     *
+     * In binary COPY FROM, attribute_buf holds the binary data for the
+     * current field, but the usage is otherwise similar.
+     */
+    StringInfoData attribute_buf;
+
+    /* field raw data pointers found by COPY FROM */
+
+    int            max_fields;
+    char      **raw_fields;
+
+    /*
+     * Similarly, line_buf holds the whole input line being processed. The
+     * input cycle is first to read the whole line into line_buf, and then
+     * extract the individual attribute fields into attribute_buf.  line_buf
+     * is preserved unmodified so that we can display it in error messages if
+     * appropriate.  (In binary mode, line_buf is not used.)
+     */
+    StringInfoData line_buf;
+    bool        line_buf_valid; /* contains the row being processed? */
+
+    /*
+     * input_buf holds input data, already converted to database encoding.
+     *
+     * In text mode, CopyReadLine parses this data sufficiently to locate line
+     * boundaries, then transfers the data to line_buf. We guarantee that
+     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
+     * mode, input_buf is not used.)
+     *
+     * If encoding conversion is not required, input_buf is not a separate
+     * buffer but points directly to raw_buf.  In that case, input_buf_len
+     * tracks the number of bytes that have been verified as valid in the
+     * database encoding, and raw_buf_len is the total number of bytes stored
+     * in the buffer.
+     */
+#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
+    char       *input_buf;
+    int            input_buf_index;    /* next byte to process */
+    int            input_buf_len;    /* total # of bytes stored */
+    bool        input_reached_eof;    /* true if we reached EOF */
+    bool        input_reached_error;    /* true if a conversion error happened */
+    /* Shorthand for number of unconsumed bytes available in input_buf */
+#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
+
+    /*
+     * raw_buf holds raw input data read from the data source (file or client
+     * connection), not yet converted to the database encoding.  Like with
+     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
+     */
+#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
+    char       *raw_buf;
+    int            raw_buf_index;    /* next byte to process */
+    int            raw_buf_len;    /* total # of bytes stored */
+    bool        raw_reached_eof;    /* true if we reached EOF */
+
+    /* Shorthand for number of unconsumed bytes available in raw_buf */
+#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
+
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyFromStateData;
+
+
+typedef struct CopyToStateData *CopyToState;
+
 /*
  * API structure for a COPY TO format implementation.   Note this must be
  * allocated in a server-lifetime manner, typically as a static const struct.
@@ -101,4 +340,67 @@ typedef struct CopyToRoutine
     void        (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+    COPY_DEST_FILE,                /* to file (or a piped program) */
+    COPY_DEST_FRONTEND,            /* to frontend */
+    COPY_DEST_CALLBACK,            /* to callback function */
+} CopyDest;
+
+typedef void (*copy_data_dest_cb) (void *data, int len);
+
+/*
+ * This struct contains all the state variables used throughout a COPY TO
+ * operation.
+ *
+ * Multi-byte encodings: all supported client-side encodings encode multi-byte
+ * characters by having the first byte's high bit set. Subsequent bytes of the
+ * character can have the high bit not set. When scanning data in such an
+ * encoding to look for a match to a single-byte (ie ASCII) character, we must
+ * use the full pg_encoding_mblen() machinery to skip over multibyte
+ * characters, else we might find a false match to a trailing byte. In
+ * supported server encodings, there is no possibility of a false match, and
+ * it's faster to make useless comparisons to trailing bytes than it is to
+ * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
+ * when we have to do it the hard way.
+ */
+typedef struct CopyToStateData
+{
+    /* format routine */
+    const CopyToRoutine *routine;
+
+    /* low-level state data */
+    CopyDest    copy_dest;        /* type of copy source/destination */
+    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
+    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
+
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy to */
+    QueryDesc  *queryDesc;        /* executable query to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDOUT */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_dest_cb data_dest_cb; /* function for writing data */
+
+    CopyFormatOptions opts;
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    FmgrInfo   *out_functions;    /* lookup info for output functions */
+    MemoryContext rowcontext;    /* per-row evaluation context */
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyToStateData;
+
 #endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c11b5ff3cc0..3863d26d5b7 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -19,171 +19,6 @@
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
-/*
- * Represents the different source cases we need to worry about at
- * the bottom level
- */
-typedef enum CopySource
-{
-    COPY_FILE,                    /* from file (or a piped program) */
-    COPY_FRONTEND,                /* from frontend */
-    COPY_CALLBACK,                /* from callback function */
-} CopySource;
-
-/*
- *    Represents the end-of-line terminator type of the input
- */
-typedef enum EolType
-{
-    EOL_UNKNOWN,
-    EOL_NL,
-    EOL_CR,
-    EOL_CRNL,
-} EolType;
-
-/*
- * Represents the insert method to be used during COPY FROM.
- */
-typedef enum CopyInsertMethod
-{
-    CIM_SINGLE,                    /* use table_tuple_insert or ExecForeignInsert */
-    CIM_MULTI,                    /* always use table_multi_insert or
-                                 * ExecForeignBatchInsert */
-    CIM_MULTI_CONDITIONAL,        /* use table_multi_insert or
-                                 * ExecForeignBatchInsert only if valid */
-} CopyInsertMethod;
-
-/*
- * This struct contains all the state variables used throughout a COPY FROM
- * operation.
- */
-typedef struct CopyFromStateData
-{
-    /* format routine */
-    const CopyFromRoutine *routine;
-
-    /* low-level state data */
-    CopySource    copy_src;        /* type of copy source */
-    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
-
-    EolType        eol_type;        /* EOL type of input */
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    Oid            conversion_proc;    /* encoding conversion function */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDIN */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_source_cb data_source_cb; /* function for reading data */
-
-    CopyFormatOptions opts;
-    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /* these are just for error messages, see CopyFromErrorCallback */
-    const char *cur_relname;    /* table name for error messages */
-    uint64        cur_lineno;        /* line number for error messages */
-    const char *cur_attname;    /* current att for error messages */
-    const char *cur_attval;        /* current att value for error messages */
-    bool        relname_only;    /* don't output line number, att, etc. */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    AttrNumber    num_defaults;    /* count of att that are missing and have
-                                 * default value */
-    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
-    Oid           *typioparams;    /* array of element types for in_functions */
-    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
-                                     * execution */
-    uint64        num_errors;        /* total number of rows which contained soft
-                                 * errors */
-    int           *defmap;            /* array of default att numbers related to
-                                 * missing att */
-    ExprState **defexprs;        /* array of default att expressions for all
-                                 * att */
-    bool       *defaults;        /* if DEFAULT marker was found for
-                                 * corresponding att */
-    bool        volatile_defexprs;    /* is any of defexprs volatile? */
-    List       *range_table;    /* single element list of RangeTblEntry */
-    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
-    ExprState  *qualexpr;
-
-    TransitionCaptureState *transition_capture;
-
-    /*
-     * These variables are used to reduce overhead in COPY FROM.
-     *
-     * attribute_buf holds the separated, de-escaped text for each field of
-     * the current line.  The CopyReadAttributes functions return arrays of
-     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
-     * the buffer on each cycle.
-     *
-     * In binary COPY FROM, attribute_buf holds the binary data for the
-     * current field, but the usage is otherwise similar.
-     */
-    StringInfoData attribute_buf;
-
-    /* field raw data pointers found by COPY FROM */
-
-    int            max_fields;
-    char      **raw_fields;
-
-    /*
-     * Similarly, line_buf holds the whole input line being processed. The
-     * input cycle is first to read the whole line into line_buf, and then
-     * extract the individual attribute fields into attribute_buf.  line_buf
-     * is preserved unmodified so that we can display it in error messages if
-     * appropriate.  (In binary mode, line_buf is not used.)
-     */
-    StringInfoData line_buf;
-    bool        line_buf_valid; /* contains the row being processed? */
-
-    /*
-     * input_buf holds input data, already converted to database encoding.
-     *
-     * In text mode, CopyReadLine parses this data sufficiently to locate line
-     * boundaries, then transfers the data to line_buf. We guarantee that
-     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
-     * mode, input_buf is not used.)
-     *
-     * If encoding conversion is not required, input_buf is not a separate
-     * buffer but points directly to raw_buf.  In that case, input_buf_len
-     * tracks the number of bytes that have been verified as valid in the
-     * database encoding, and raw_buf_len is the total number of bytes stored
-     * in the buffer.
-     */
-#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
-    char       *input_buf;
-    int            input_buf_index;    /* next byte to process */
-    int            input_buf_len;    /* total # of bytes stored */
-    bool        input_reached_eof;    /* true if we reached EOF */
-    bool        input_reached_error;    /* true if a conversion error happened */
-    /* Shorthand for number of unconsumed bytes available in input_buf */
-#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
-
-    /*
-     * raw_buf holds raw input data read from the data source (file or client
-     * connection), not yet converted to the database encoding.  Like with
-     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
-     */
-#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
-    char       *raw_buf;
-    int            raw_buf_index;    /* next byte to process */
-    int            raw_buf_len;    /* total # of bytes stored */
-    bool        raw_reached_eof;    /* true if we reached EOF */
-
-    /* Shorthand for number of unconsumed bytes available in raw_buf */
-#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
-
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyFromStateData;
-
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
-- 
2.45.2

From 7afdeeaafd4045477d90cf0c9ab356074e4ea100 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 15:12:43 +0900
Subject: [PATCH v18 5/5] Add support for implementing custom COPY TO/FROM
 format as extension

For custom COPY TO format implementation:

* Add CopyToStateData::opaque that can be used to keep data for custom
  COPY TO format implementation
* Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf
* Rename CopySendEndOfRow() to CopyToStateFlush() because it's a
  method for CopyToState and it's used for flushing. End-of-row related
  codes were moved to CopyToTextSendEndOfRow().

For custom COPY FROM format implementation:

* Add CopyFromStateData::opaque that can be used to keep data for
  custom COPY From format implementation
* Export CopyReadBinaryData() to read the next data
* Rename CopyReadBinaryData() to CopyFromStateRead() because it's a
  method for CopyFromState and "BinaryData" is redundant.
---
 src/backend/commands/copyfromparse.c | 21 ++++++++++-----------
 src/backend/commands/copyto.c        | 15 +++++++--------
 src/include/commands/copyapi.h       | 10 ++++++++++
 3 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 74844103228..cd80d34f3da 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -164,7 +164,6 @@ static int    CopyGetData(CopyFromState cstate, void *databuf,
 static inline bool CopyGetInt32(CopyFromState cstate, int32 *val);
 static inline bool CopyGetInt16(CopyFromState cstate, int16 *val);
 static void CopyLoadInputBuf(CopyFromState cstate);
-static int    CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
 
 void
 ReceiveCopyBegin(CopyFromState cstate)
@@ -193,7 +192,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate)
     int32        tmp;
 
     /* Signature */
-    if (CopyReadBinaryData(cstate, readSig, 11) != 11 ||
+    if (CopyFromStateRead(cstate, readSig, 11) != 11 ||
         memcmp(readSig, BinarySignature, 11) != 0)
         ereport(ERROR,
                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
@@ -221,7 +220,7 @@ ReceiveCopyBinaryHeader(CopyFromState cstate)
     /* Skip extension header, if present */
     while (tmp-- > 0)
     {
-        if (CopyReadBinaryData(cstate, readSig, 1) != 1)
+        if (CopyFromStateRead(cstate, readSig, 1) != 1)
             ereport(ERROR,
                     (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                      errmsg("invalid COPY file header (wrong length)")));
@@ -363,7 +362,7 @@ CopyGetInt32(CopyFromState cstate, int32 *val)
 {
     uint32        buf;
 
-    if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
+    if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
     {
         *val = 0;                /* suppress compiler warning */
         return false;
@@ -380,7 +379,7 @@ CopyGetInt16(CopyFromState cstate, int16 *val)
 {
     uint16        buf;
 
-    if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
+    if (CopyFromStateRead(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf))
     {
         *val = 0;                /* suppress compiler warning */
         return false;
@@ -691,14 +690,14 @@ CopyLoadInputBuf(CopyFromState cstate)
 }
 
 /*
- * CopyReadBinaryData
+ * CopyFromStateRead
  *
  * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf
  * and writes them to 'dest'.  Returns the number of bytes read (which
  * would be less than 'nbytes' only if we reach EOF).
  */
-static int
-CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
+int
+CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes)
 {
     int            copied_bytes = 0;
 
@@ -1078,7 +1077,7 @@ CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
          */
         char        dummy;
 
-        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+        if (CopyFromStateRead(cstate, &dummy, 1) > 0)
             ereport(ERROR,
                     (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                      errmsg("received copy data after EOF marker")));
@@ -2103,8 +2102,8 @@ CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
     resetStringInfo(&cstate->attribute_buf);
 
     enlargeStringInfo(&cstate->attribute_buf, fld_size);
-    if (CopyReadBinaryData(cstate, cstate->attribute_buf.data,
-                           fld_size) != fld_size)
+    if (CopyFromStateRead(cstate, cstate->attribute_buf.data,
+                          fld_size) != fld_size)
         ereport(ERROR,
                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                  errmsg("unexpected EOF in COPY data")));
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 54aa6cdecaf..cd9e352533a 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -63,7 +63,6 @@ static void SendCopyEnd(CopyToState cstate);
 static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
 static void CopySendString(CopyToState cstate, const char *str);
 static void CopySendChar(CopyToState cstate, char c);
-static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
@@ -99,7 +98,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
     }
 
     /* Now take the actions related to the end of a row */
-    CopySendEndOfRow(cstate);
+    CopyToStateFlush(cstate);
 }
 
 /*
@@ -325,7 +324,7 @@ CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
         }
     }
 
-    CopySendEndOfRow(cstate);
+    CopyToStateFlush(cstate);
 }
 
 /*
@@ -339,7 +338,7 @@ CopyToBinaryEnd(CopyToState cstate)
     /* Generate trailer for a binary copy */
     CopySendInt16(cstate, -1);
     /* Need to flush out the trailer */
-    CopySendEndOfRow(cstate);
+    CopyToStateFlush(cstate);
 }
 
 /*
@@ -419,8 +418,8 @@ SendCopyEnd(CopyToState cstate)
  * CopySendData sends output data to the destination (file or frontend)
  * CopySendString does the same for null-terminated strings
  * CopySendChar does the same for single characters
- * CopySendEndOfRow does the appropriate thing at end of each data row
- *    (data is not actually flushed except by CopySendEndOfRow)
+ * CopyToStateFlush flushes the buffered data
+ *    (data is not actually flushed except by CopyToStateFlush)
  *
  * NB: no data conversion is applied by these functions
  *----------
@@ -443,8 +442,8 @@ CopySendChar(CopyToState cstate, char c)
     appendStringInfoCharMacro(cstate->fe_msgbuf, c);
 }
 
-static void
-CopySendEndOfRow(CopyToState cstate)
+void
+CopyToStateFlush(CopyToState cstate)
 {
     StringInfo    fe_msgbuf = cstate->fe_msgbuf;
 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 3104d99ea9f..0820b47a2d2 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -299,8 +299,13 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyFromStateData;
 
+extern int    CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes);
+
 
 typedef struct CopyToStateData *CopyToState;
 
@@ -401,6 +406,11 @@ typedef struct CopyToStateData
     FmgrInfo   *out_functions;    /* lookup info for output functions */
     MemoryContext rowcontext;    /* per-row evaluation context */
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyToStateData;
 
+extern void CopyToStateFlush(CopyToState cstate);
+
 #endif                            /* COPYAPI_H */
-- 
2.45.2


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Alexander Lakhin
Дата:
Сообщение: Re: race condition when writing pg_control
Следующее
От: jian he
Дата:
Сообщение: Re: pgsql: Add more SQL/JSON constructor functions