[PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ...

Поиск
Список
Период
Сортировка
От Daniel Farina
Тема [PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ...
Дата
Msg-id 1259012082-6196-4-git-send-email-dfarina@truviso.com
обсуждение исходный текст
Ответ на [PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..."  (Daniel Farina <dfarina@truviso.com>)
Список pgsql-hackers
This patch enables dblink to be used for the purpose of efficient
bulk-loading via COPY and libpq in combination with the COPY TO
FUNCTION patch.

The following functions were added to accomplish this:

dblink_connection_reset: useful when handling errors and one just
wants to restore a connection to a known state, rolling back as many
transactions as necessary.

dblink_copy_end: completes the COPY

dblink_copy_open: puts a connection into the COPY state.  Accepts
connection name, relation name, and binary mode flag.

dblink_copy_write: writes a row to the last connection put in the COPY
state by dblink_copy_open.

Generally speaking, code that uses this will look like the following
(presuming a named connection has already been made):
    try:
        SELECT dblink_copy_open('myconn', 'relation_name', true);
        COPY bar TO FUNCTION dblink_copy_write;

        -- since the dblink connection is still in the COPY state, one
        -- can even copy some more data in multiple steps...
        COPY bar_2 TO FUNCTION dblink_copy_write;

        SELECT dblink_copy_end();
    finally:
        SELECT dblink_connection_reset('myconn');

Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
 contrib/dblink/dblink.c             |  190 +++++++++++++++++++++++++++++++++++
 contrib/dblink/dblink.h             |    5 +
 contrib/dblink/dblink.sql.in        |   20 ++++
 contrib/dblink/uninstall_dblink.sql |    8 ++
 4 files changed, 223 insertions(+), 0 deletions(-)
 

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 72fdf56..d32aeec 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1722,6 +1722,196 @@ dblink_get_notify(PG_FUNCTION_ARGS) * internal functions */
+/*
+ * dblink_connection_reset
+ *
+ * Attempts to take the named connection into a known state by rolling back
+ * whatever transaction is open on it.  If the connection cannot be restored
+ * to the idle state, raises an error.
+ *
+ * Argument (text): the connection name.  Returns void.
+ *
+ * Errors:
+ *   - the named connection is not available;
+ *   - the connection reports an unknown transaction state;
+ *   - the ROLLBACK command fails;
+ *   - the connection is still not idle after a successful ROLLBACK.
+ */
+PG_FUNCTION_INFO_V1(dblink_connection_reset);
+Datum
+dblink_connection_reset(PG_FUNCTION_ARGS)
+{
+    PGresult    *res       = NULL;
+    PGconn        *conn       = NULL;
+    char        *conname   = NULL;
+    remoteConn    *rconn       = NULL;
+
+    DBLINK_INIT;
+
+    /* must be text */
+    Assert(PG_NARGS() == 1);
+    DBLINK_GET_NAMED_CONN;
+
+    if (!conn)
+        DBLINK_CONN_NOT_AVAIL;
+
+    switch (PQtransactionStatus(conn))
+    {
+        case PQTRANS_IDLE:
+            /* Everything is okay, nothing to roll back */
+            PG_RETURN_VOID();
+
+        case PQTRANS_ACTIVE:
+        case PQTRANS_INTRANS:
+        case PQTRANS_INERROR:
+            /* Recoverable states: fall out of the switch and roll back */
+            break;
+
+        case PQTRANS_UNKNOWN:
+        default:
+            elog(ERROR, "%s: connection in unknown transaction state",
+                 PG_FUNCNAME_MACRO);
+    }
+
+    res = PQexec(conn, "ROLLBACK;");
+
+    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    {
+        /* Release the result first: ereport(ERROR) does not return here */
+        PQclear(res);
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_FAILURE),
+                 errmsg("%s: could not issue ROLLBACK command",
+                        PG_FUNCNAME_MACRO)));
+    }
+
+    PQclear(res);
+
+    /*
+     * A successful ROLLBACK is not proof by itself that the connection is
+     * usable again, so verify the state we promised to the caller.
+     */
+    if (PQtransactionStatus(conn) != PQTRANS_IDLE)
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_FAILURE),
+                 errmsg("%s: connection not idle after ROLLBACK",
+                        PG_FUNCNAME_MACRO)));
+
+    PG_RETURN_VOID();
+}
+
+/*
+ * dblink COPY support, procedures and variables
+ *
+ * dblink_copy_current is the libpq connection that dblink_copy_open most
+ * recently put into the COPY state.  dblink_copy_write and dblink_copy_end
+ * take no connection argument and operate on this connection instead;
+ * dblink_copy_end sets it back to NULL once the COPY has completed.  Only a
+ * single COPY target is tracked at a time.
+ */
+static PGconn *dblink_copy_current = NULL;
+
+/*
+ * dblink_copy_open
+ *
+ * Start a COPY into a given relation on the named remote connection.
+ *
+ * Arguments (text, text, bool): connection name, relation name (possibly
+ * qualified), and a flag selecting binary COPY format.
+ *
+ * On success the connection becomes the target of subsequent
+ * dblink_copy_write / dblink_copy_end calls and the text 'OK' is returned.
+ * Raises an error if the connection is not available or if the remote side
+ * does not enter the COPY IN state.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_open);
+Datum
+dblink_copy_open(PG_FUNCTION_ARGS)
+{
+    PGresult   *res = NULL;
+    PGconn       *conn = NULL;
+    char       *conname = NULL;
+    remoteConn *rconn = NULL;
+
+    const char    *quoted_remote_relname;
+    bool         isbinary;
+    StringInfoData copy_cmd;
+
+    DBLINK_INIT;
+
+    /* must be text,text,bool */
+    Assert(PG_NARGS() == 3);
+    DBLINK_GET_NAMED_CONN;
+
+    if (!conn)
+        DBLINK_CONN_NOT_AVAIL;
+
+    /* Read the procedure arguments into primitive values */
+    quoted_remote_relname = NameListToQuotedString(
+        textToQualifiedNameList(PG_GETARG_TEXT_P(1)));
+    isbinary = PG_GETARG_BOOL(2);
+
+    /*
+     * Query parameterization only handles value-parameters -- of which
+     * identifiers are not considered one of -- so format the string the old
+     * fashioned way.  It is very important to quote identifiers for this
+     * reason, as performed previously.
+     *
+     * The command is built in an expandable buffer: a qualified and quoted
+     * name consists of several identifiers plus quoting characters, so it
+     * has no convenient fixed upper bound.
+     */
+    initStringInfo(&copy_cmd);
+    appendStringInfo(&copy_cmd, "COPY %s FROM STDIN%s;",
+                     quoted_remote_relname,
+                     isbinary ? " WITH BINARY" : "");
+
+    /*
+     * Run the created query, and check to ensure that PGRES_COPY_IN state has
+     * been achieved.
+     */
+    res = PQexec(conn, copy_cmd.data);
+    pfree(copy_cmd.data);
+
+    if (!res || PQresultStatus(res) != PGRES_COPY_IN)
+    {
+        /* Keep the remote explanation, then release the result */
+        char       *remote_msg = pstrdup(PQerrorMessage(conn));
+
+        PQclear(res);
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_FAILURE),
+                 errmsg("could not start COPY FROM on remote node"),
+                 errdetail("%s", remote_msg)));
+    }
+    PQclear(res);
+
+    /*
+     * Everything went well; finally bind the global dblink_copy_current to the
+     * connection ready to accept copy data.
+     */
+    dblink_copy_current = conn;
+    PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
+
+/*
+ * dblink_copy_write
+ *
+ * Write the provided StringInfo to the currently open COPY.
+ *
+ * Argument (internal): a StringInfo whose data/len describe the bytes to
+ * send.  Returns void.  Raises an error if no COPY has been opened with
+ * dblink_copy_open, or if libpq does not accept the data.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_write);
+Datum
+dblink_copy_write(PG_FUNCTION_ARGS)
+{
+    StringInfo copybuf = (StringInfo) PG_GETARG_POINTER(0);
+
+    /* Report a missing dblink_copy_open as such, not as a send failure */
+    if (dblink_copy_current == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                 errmsg("no dblink COPY is in progress")));
+
+    if (PQputCopyData(dblink_copy_current, copybuf->data, copybuf->len) != 1)
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_EXCEPTION),
+                 errmsg("could not send row to remote node"),
+                 errdetail("%s", PQerrorMessage(dblink_copy_current))));
+
+    PG_RETURN_VOID();
+}
+
+/*
+ * dblink_copy_end
+ *
+ * Finish the currently open COPY.
+ *
+ * Takes no arguments; operates on the connection bound by dblink_copy_open.
+ * Returns the text 'OK' when the remote COPY completed successfully.
+ * Raises an error if no COPY is open, if the end-of-data marker cannot be
+ * sent, or if the remote side reports that the COPY failed.
+ *
+ * In every case the binding to the connection is dropped, so a failed COPY
+ * does not leave later dblink_copy_write calls aimed at a connection that is
+ * no longer in the COPY state.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_end);
+Datum
+dblink_copy_end(PG_FUNCTION_ARGS)
+{
+    PGconn       *conn = dblink_copy_current;
+    PGresult   *res;
+    char       *failure = NULL;
+
+    if (conn == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+                 errmsg("no dblink COPY is in progress")));
+
+    /* Unbind before anything below can raise an error */
+    dblink_copy_current = NULL;
+
+    /* Check to ensure that termination data was sent successfully */
+    if (PQputCopyEnd(conn, NULL) != 1)
+        elog(ERROR, "COPY end failed");
+
+    /*
+     * Consume every pending result so the connection is left ready for the
+     * next command, remembering only the first failure.  Each result is
+     * released before any error is raised.
+     */
+    while ((res = PQgetResult(conn)) != NULL)
+    {
+        if (PQresultStatus(res) != PGRES_COMMAND_OK && failure == NULL)
+            failure = pstrdup(PQerrorMessage(conn));
+        PQclear(res);
+    }
+
+    if (failure != NULL)
+        elog(ERROR, "COPY failed: %s", failure);
+
+    PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}/* * get_pkey_attnames
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
index 255f5d0..8a2faee 100644
--- a/contrib/dblink/dblink.h
+++ b/contrib/dblink/dblink.h
@@ -59,4 +59,9 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);extern Datum
dblink_current_query(PG_FUNCTION_ARGS);externDatum dblink_get_notify(PG_FUNCTION_ARGS);
 
+extern Datum dblink_connection_reset(PG_FUNCTION_ARGS);    /* roll a named connection back to idle */
+/* Entry points used together with COPY ... TO FUNCTION: open, write, end */
+extern Datum dblink_copy_open(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_write(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_end(PG_FUNCTION_ARGS);#endif   /* DBLINK_H */
diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in
index da5dd65..aedca34 100644
--- a/contrib/dblink/dblink.sql.in
+++ b/contrib/dblink/dblink.sql.in
@@ -221,3 +221,23 @@ CREATE OR REPLACE FUNCTION dblink_get_notify(RETURNS setof recordAS 'MODULE_PATHNAME',
'dblink_get_notify'LANGUAGEC STRICT;
 
+-- Roll back on the named connection (connection name) until it is idle.
+CREATE OR REPLACE FUNCTION dblink_connection_reset (text)
+RETURNS void
+AS 'MODULE_PATHNAME','dblink_connection_reset'
+LANGUAGE C STRICT;
+-- Start COPY ... FROM STDIN remotely: (connection name, relation name, binary flag).
+CREATE OR REPLACE FUNCTION dblink_copy_open (text, text, boolean)
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_copy_open'
+LANGUAGE C STRICT;
+-- Send one buffer of COPY data to the connection opened by dblink_copy_open.
+CREATE OR REPLACE FUNCTION dblink_copy_write (internal)
+RETURNS void
+AS 'MODULE_PATHNAME','dblink_copy_write'
+LANGUAGE C STRICT;
+-- Finish the COPY started by dblink_copy_open.
+CREATE OR REPLACE FUNCTION dblink_copy_end ()
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_copy_end'
+LANGUAGE C STRICT;
diff --git a/contrib/dblink/uninstall_dblink.sql b/contrib/dblink/uninstall_dblink.sql
index 45cf13c..465beb7 100644
--- a/contrib/dblink/uninstall_dblink.sql
+++ b/contrib/dblink/uninstall_dblink.sql
@@ -11,6 +11,14 @@ DROP FUNCTION dblink_build_sql_delete (text, int2vector, int4, _text);DROP FUNCTION
dblink_build_sql_insert(text, int2vector, int4, _text, _text);
 
+DROP FUNCTION dblink_copy_end ();    -- COPY ... TO FUNCTION support: end
+
+DROP FUNCTION dblink_copy_open (text, text, boolean);    -- COPY ... TO FUNCTION support: open
+
+DROP FUNCTION dblink_copy_write (internal);    -- COPY ... TO FUNCTION support: write
+
+DROP FUNCTION dblink_connection_reset (text);    -- connection recovery helper
+DROP FUNCTION dblink_get_pkey (text);DROP TYPE dblink_pkey_results;
-- 
1.6.5.3



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Daniel Farina
Дата:
Сообщение: [PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..."
Следующее
От: Daniel Farina
Дата:
Сообщение: [PATCH 2/4] Add tests for "COPY ... TO FUNCTION ..."