Re: Speed dblink using alternate libpq tuple storage

From Kyotaro HORIGUCHI
Subject Re: Speed dblink using alternate libpq tuple storage
Msg-id 20120216.174934.80061891.horiguchi.kyotaro@oss.ntt.co.jp
In reply to Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
Responses Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
List pgsql-hackers
Hello,
I added the function PQskipRemainingResult() and used it in
dblink. This reduces the number of try-catch block executions in
dblink from one per row to one per query.

- fe-exec.c : new function PQskipRemainingResult.
- dblink.c  : using PQskipRemainingResult in dblink_record_internal().
- libpq.sgml: documentation for PQskipRemainingResult and the related change in RowProcessor.
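
To illustrate the try-catch point: PG_TRY/PG_CATCH in the backend are built on sigsetjmp(), so each protected region has a setup cost. Below is a minimal standalone sketch using plain setjmp()/longjmp() that counts how often a protected region is entered in each style. It is not dblink code, and every name in it (store_row, store_per_row, store_per_query, handler_setups) is hypothetical.

```c
#include <assert.h>
#include <setjmp.h>

static jmp_buf handler;        /* stands in for the PG_TRY context */
static int handler_setups;     /* how many protected regions were entered */

/* Hypothetical row storage; "fails" by jumping to the handler. */
static void store_row(int row, int fail_at)
{
    if (row == fail_at)
        longjmp(handler, 1);
}

/* One protected region per row. Returns 0 on success, -1 on failure. */
static int store_per_row(int nrows, int fail_at)
{
    int i;

    assert(nrows >= 0);
    for (i = 0; i < nrows; i++)
    {
        handler_setups++;
        if (setjmp(handler) != 0)
            return -1;          /* cleanup would run here, per row */
        store_row(i, fail_at);
    }
    return 0;
}

/* One protected region per query. Returns 0 on success, -1 on failure. */
static int store_per_query(int nrows, int fail_at)
{
    int i;

    assert(nrows >= 0);
    handler_setups++;
    if (setjmp(handler) != 0)
    {
        /* A real caller would discard the remaining rows here, once. */
        return -1;
    }
    for (i = 0; i < nrows; i++)
        store_row(i, fail_at);
    return 0;
}
```

With 1000 rows, store_per_row() enters the protected region 1000 times and store_per_query() enters it once; the failure path in the second variant is where a single "skip the rest of the result" call would go.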

> Instead I added simple feature: rowProcessor can return '2',
> in which case getAnotherTuple() does early exit without setting
> any error state.  In user side it appears as PQisBusy() returned
> with TRUE result.  All pointers stay valid, so callback can just
> stuff them into some temp area.
...
> It's included in main patch, but I also attached it as separate patch
> so that it can be examined separately and reverted if not acceptable.

This patch is based on the patch above and is organized in the
same manner: the three main patches include all modifications, and
the '2' patch is also attached separately.
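
The return-value convention quoted above can be sketched with a small mock, independent of libpq. This is only an illustration under stated assumptions: mock_conn, mock_parse, stash_row, temp_area and the ROW_* names are all hypothetical, and treating the row as consumed on an early exit is an assumption of the mock, not something taken from the patch.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical names for the callback return values discussed above. */
enum { ROW_FAIL = 0, ROW_OK = 1, ROW_EARLY_EXIT = 2 };

typedef int (*row_fn)(void *param, const char *value, int len);

struct mock_conn
{
    const char **rows;   /* pending rows, already "in the buffer" */
    int          nrows;
    int          next;   /* index of the next unparsed row */
    int          error;  /* set when the callback reported failure */
};

/*
 * Mock parse loop. Returns 1 ("busy") when the callback asked for an
 * early exit, 0 when all rows are parsed or an error was recorded.
 */
static int mock_parse(struct mock_conn *c, row_fn fn, void *param)
{
    assert(c != NULL && fn != NULL);
    while (c->next < c->nrows)
    {
        const char *v = c->rows[c->next];
        int rc = fn(param, v, (int) strlen(v));

        c->next++;              /* assumption: the row counts as consumed */
        if (rc == ROW_EARLY_EXIT)
            return 1;           /* no error state is set */
        if (rc == ROW_FAIL)
        {
            c->error = 1;
            return 0;
        }
    }
    return 0;
}

/* Temp area where the callback stuffs the still-valid pointers. */
struct temp_area
{
    const char *value;
    int         len;
    int         seen;
};

/* Callback that stashes one row and asks the parser to stop. */
static int stash_row(void *param, const char *value, int len)
{
    struct temp_area *t = param;

    t->value = value;
    t->len = len;
    t->seen++;
    return ROW_EARLY_EXIT;
}
```

Calling mock_parse() repeatedly with stash_row hands back one row per call and reports "busy" each time, until the rows run out and it returns 0.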

This patch is not rebased onto HEAD because HEAD yields an
error about the symbol LEAKPROOF...

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/#fe-protocol3.c# b/src/interfaces/libpq/#fe-protocol3.c#
new file mode 100644
index 0000000..8b7eed2
--- /dev/null
+++ b/src/interfaces/libpq/#fe-protocol3.c#
@@ -0,0 +1,1967 @@
+/*-------------------------------------------------------------------------
+ *
+ * fe-protocol3.c
+ *      functions that are specific to frontend/backend protocol version 3
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *      src/interfaces/libpq/fe-protocol3.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <ctype.h>
+#include <fcntl.h>
+
+#include "libpq-fe.h"
+#include "libpq-int.h"
+
+#include "mb/pg_wchar.h"
+
+#ifdef WIN32
+#include "win32.h"
+#else
+#include <unistd.h>
+#include <netinet/in.h>
+#ifdef HAVE_NETINET_TCP_H
+#include <netinet/tcp.h>
+#endif
+#include <arpa/inet.h>
+#endif
+
+
+/*
+ * This macro lists the backend message types that could be "long" (more
+ * than a couple of kilobytes).
+ */
+#define VALID_LONG_MESSAGE_TYPE(id) \
+    ((id) == 'T' || (id) == 'D' || (id) == 'd' || (id) == 'V' || \
+     (id) == 'E' || (id) == 'N' || (id) == 'A')
+
+
+static void handleSyncLoss(PGconn *conn, char id, int msgLength);
+static int    getRowDescriptions(PGconn *conn);
+static int    getParamDescriptions(PGconn *conn);
+static int    getAnotherTuple(PGconn *conn, int msgLength);
+static int    getParameterStatus(PGconn *conn);
+static int    getNotify(PGconn *conn);
+static int    getCopyStart(PGconn *conn, ExecStatusType copytype);
+static int    getReadyForQuery(PGconn *conn);
+static void reportErrorPosition(PQExpBuffer msg, const char *query,
+                    int loc, int encoding);
+static int build_startup_packet(const PGconn *conn, char *packet,
+                     const PQEnvironmentOption *options);
+
+
+/*
+ * parseInput: if appropriate, parse input data from backend
+ * until input is exhausted or a stopping state is reached.
+ * Note that this function will NOT attempt to read more data from the backend.
+ */
+void
+pqParseInput3(PGconn *conn)
+{
+    char        id;
+    int            msgLength;
+    int            avail;
+
+    /*
+     * Loop to parse successive complete messages available in the buffer.
+     */
+    for (;;)
+    {
+        /*
+         * Try to read a message.  First get the type code and length. Return
+         * if not enough data.
+         */
+        conn->inCursor = conn->inStart;
+        if (pqGetc(&id, conn))
+            return;
+        if (pqGetInt(&msgLength, 4, conn))
+            return;
+
+        /*
+         * Try to validate message type/length here.  A length less than 4 is
+         * definitely broken.  Large lengths should only be believed for a few
+         * message types.
+         */
+        if (msgLength < 4)
+        {
+            handleSyncLoss(conn, id, msgLength);
+            return;
+        }
+        if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id))
+        {
+            handleSyncLoss(conn, id, msgLength);
+            return;
+        }
+
+        /*
+         * Can't process if message body isn't all here yet.
+         */
+        msgLength -= 4;
+        avail = conn->inEnd - conn->inCursor;
+        if (avail < msgLength)
+        {
+            /*
+             * Before returning, enlarge the input buffer if needed to hold
+             * the whole message.  This is better than leaving it to
+             * pqReadData because we can avoid multiple cycles of realloc()
+             * when the message is large; also, we can implement a reasonable
+             * recovery strategy if we are unable to make the buffer big
+             * enough.
+             */
+            if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength,
+                                     conn))
+            {
+                /*
+                 * XXX add some better recovery code... plan is to skip over
+                 * the message using its length, then report an error. For the
+                 * moment, just treat this like loss of sync (which indeed it
+                 * might be!)
+                 */
+                handleSyncLoss(conn, id, msgLength);
+            }
+            return;
+        }
+
+        /*
+         * NOTIFY and NOTICE messages can happen in any state; always process
+         * them right away.
+         *
+         * Most other messages should only be processed while in BUSY state.
+         * (In particular, in READY state we hold off further parsing until
+         * the application collects the current PGresult.)
+         *
+         * However, if the state is IDLE then we got trouble; we need to deal
+         * with the unexpected message somehow.
+         *
+         * ParameterStatus ('S') messages are a special case: in IDLE state we
+         * must process 'em (this case could happen if a new value was adopted
+         * from config file due to SIGHUP), but otherwise we hold off until
+         * BUSY state.
+         */
+        if (id == 'A')
+        {
+            if (getNotify(conn))
+                return;
+        }
+        else if (id == 'N')
+        {
+            if (pqGetErrorNotice3(conn, false))
+                return;
+        }
+        else if (conn->asyncStatus != PGASYNC_BUSY)
+        {
+            /* If not IDLE state, just wait ... */
+            if (conn->asyncStatus != PGASYNC_IDLE)
+                return;
+
+            /*
+             * Unexpected message in IDLE state; need to recover somehow.
+             * ERROR messages are displayed using the notice processor;
+             * ParameterStatus is handled normally; anything else is just
+             * dropped on the floor after displaying a suitable warning
+             * notice.    (An ERROR is very possibly the backend telling us why
+             * it is about to close the connection, so we don't want to just
+             * discard it...)
+             */
+            if (id == 'E')
+            {
+                if (pqGetErrorNotice3(conn, false /* treat as notice */ ))
+                    return;
+            }
+            else if (id == 'S')
+            {
+                if (getParameterStatus(conn))
+                    return;
+            }
+            else
+            {
+                pqInternalNotice(&conn->noticeHooks,
+                        "message type 0x%02x arrived from server while idle",
+                                 id);
+                /* Discard the unexpected message */
+                conn->inCursor += msgLength;
+            }
+        }
+        else
+        {
+            /*
+             * In BUSY state, we can process everything.
+             */
+            switch (id)
+            {
+                case 'C':        /* command complete */
+                    if (pqGets(&conn->workBuffer, conn))
+                        return;
+                    if (conn->result == NULL)
+                    {
+                        conn->result = PQmakeEmptyPGresult(conn,
+                                                           PGRES_COMMAND_OK);
+                        if (!conn->result)
+                            return;
+                    }
+                    strncpy(conn->result->cmdStatus, conn->workBuffer.data,
+                            CMDSTATUS_LEN);
+                    conn->asyncStatus = PGASYNC_READY;
+                    break;
+                case 'E':        /* error return */
+                    if (pqGetErrorNotice3(conn, true))
+                        return;
+                    conn->asyncStatus = PGASYNC_READY;
+                    break;
+                case 'Z':        /* backend is ready for new query */
+                    if (getReadyForQuery(conn))
+                        return;
+                    conn->asyncStatus = PGASYNC_IDLE;
+                    break;
+                case 'I':        /* empty query */
+                    if (conn->result == NULL)
+                    {
+                        conn->result = PQmakeEmptyPGresult(conn,
+                                                           PGRES_EMPTY_QUERY);
+                        if (!conn->result)
+                            return;
+                    }
+                    conn->asyncStatus = PGASYNC_READY;
+                    break;
+                case '1':        /* Parse Complete */
+                    /* If we're doing PQprepare, we're done; else ignore */
+                    if (conn->queryclass == PGQUERY_PREPARE)
+                    {
+                        if (conn->result == NULL)
+                        {
+                            conn->result = PQmakeEmptyPGresult(conn,
+                                                           PGRES_COMMAND_OK);
+                            if (!conn->result)
+                                return;
+                        }
+                        conn->asyncStatus = PGASYNC_READY;
+                    }
+                    break;
+                case '2':        /* Bind Complete */
+                case '3':        /* Close Complete */
+                    /* Nothing to do for these message types */
+                    break;
+                case 'S':        /* parameter status */
+                    if (getParameterStatus(conn))
+                        return;
+                    break;
+                case 'K':        /* secret key data from the backend */
+
+                    /*
+                     * This is expected only during backend startup, but it's
+                     * just as easy to handle it as part of the main loop.
+                     * Save the data and continue processing.
+                     */
+                    if (pqGetInt(&(conn->be_pid), 4, conn))
+                        return;
+                    if (pqGetInt(&(conn->be_key), 4, conn))
+                        return;
+                    break;
+                case 'T':        /* Row Description */
+                    if (conn->result == NULL ||
+                        conn->queryclass == PGQUERY_DESCRIBE)
+                    {
+                        /* First 'T' in a query sequence */
+                        if (getRowDescriptions(conn))
+                            return;
+
+                        /*
+                         * If we're doing a Describe, we're ready to pass the
+                         * result back to the client.
+                         */
+                        if (conn->queryclass == PGQUERY_DESCRIBE)
+                            conn->asyncStatus = PGASYNC_READY;
+                    }
+                    else
+                    {
+                        /*
+                         * A new 'T' message is treated as the start of
+                         * another PGresult.  (It is not clear that this is
+                         * really possible with the current backend.) We stop
+                         * parsing until the application accepts the current
+                         * result.
+                         */
+                        conn->asyncStatus = PGASYNC_READY;
+                        return;
+                    }
+                    break;
+                case 'n':        /* No Data */
+
+                    /*
+                     * NoData indicates that we will not be seeing a
+                     * RowDescription message because the statement or portal
+                     * inquired about doesn't return rows.
+                     *
+                     * If we're doing a Describe, we have to pass something
+                     * back to the client, so set up a COMMAND_OK result,
+                     * instead of TUPLES_OK.  Otherwise we can just ignore
+                     * this message.
+                     */
+                    if (conn->queryclass == PGQUERY_DESCRIBE)
+                    {
+                        if (conn->result == NULL)
+                        {
+                            conn->result = PQmakeEmptyPGresult(conn,
+                                                           PGRES_COMMAND_OK);
+                            if (!conn->result)
+                                return;
+                        }
+                        conn->asyncStatus = PGASYNC_READY;
+                    }
+                    break;
+                case 't':        /* Parameter Description */
+                    if (getParamDescriptions(conn))
+                        return;
+                    break;
+                case 'D':        /* Data Row */
+                    if (conn->result != NULL &&
+                        conn->result->resultStatus == PGRES_TUPLES_OK)
+                    {
+                        /* Read another tuple of a normal query response */
+                        if (getAnotherTuple(conn, msgLength))
+                            return;
+                    }
+                    else if (conn->result != NULL &&
+                             conn->result->resultStatus == PGRES_FATAL_ERROR)
+                    {
+                        /*
+                         * We've already choked for some reason.  Just discard
+                         * tuples till we get to the end of the query.
+                         */
+                        conn->inCursor += msgLength;
+                    }
+                    else
+                    {
+                        /* Set up to report error at end of query */
+                        printfPQExpBuffer(&conn->errorMessage,
+                                          libpq_gettext("server sent data (\"D\" message) without prior row description(\"T\" message)\n"));
+                        pqSaveErrorResult(conn);
+                        /* Discard the unexpected message */
+                        conn->inCursor += msgLength;
+                    }
+                    break;
+                case 'G':        /* Start Copy In */
+                    if (getCopyStart(conn, PGRES_COPY_IN))
+                        return;
+                    conn->asyncStatus = PGASYNC_COPY_IN;
+                    break;
+                case 'H':        /* Start Copy Out */
+                    if (getCopyStart(conn, PGRES_COPY_OUT))
+                        return;
+                    conn->asyncStatus = PGASYNC_COPY_OUT;
+                    conn->copy_already_done = 0;
+                    break;
+                case 'W':        /* Start Copy Both */
+                    if (getCopyStart(conn, PGRES_COPY_BOTH))
+                        return;
+                    conn->asyncStatus = PGASYNC_COPY_BOTH;
+                    conn->copy_already_done = 0;
+                    break;
+                case 'd':        /* Copy Data */
+
+                    /*
+                     * If we see Copy Data, just silently drop it.    This would
+                     * only occur if application exits COPY OUT mode too
+                     * early.
+                     */
+                    conn->inCursor += msgLength;
+                    break;
+                case 'c':        /* Copy Done */
+
+                    /*
+                     * If we see Copy Done, just silently drop it.    This is
+                     * the normal case during PQendcopy.  We will keep
+                     * swallowing data, expecting to see command-complete for
+                     * the COPY command.
+                     */
+                    break;
+                default:
+                    printfPQExpBuffer(&conn->errorMessage,
+                                      libpq_gettext(
+                                                    "unexpected response from server; first received character was \"%c\"\n"),
+                                      id);
+                    /* build an error result holding the error message */
+                    pqSaveErrorResult(conn);
+                    /* not sure if we will see more, so go to ready state */
+                    conn->asyncStatus = PGASYNC_READY;
+                    /* Discard the unexpected message */
+                    conn->inCursor += msgLength;
+                    break;
+            }                    /* switch on protocol character */
+        }
+        /* Successfully consumed this message */
+        if (conn->inCursor == conn->inStart + 5 + msgLength)
+        {
+            /* Normal case: parsing agrees with specified length */
+            conn->inStart = conn->inCursor;
+        }
+        else
+        {
+            /* Trouble --- report it */
+            printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("message contents do not agree with length in message type \"%c\"\n"),
+                              id);
+            /* build an error result holding the error message */
+            pqSaveErrorResult(conn);
+            conn->asyncStatus = PGASYNC_READY;
+            /* trust the specified message length as what to skip */
+            conn->inStart += 5 + msgLength;
+        }
+    }
+}
+
+/*
+ * handleSyncLoss: clean up after loss of message-boundary sync
+ *
+ * There isn't really a lot we can do here except abandon the connection.
+ */
+static void
+handleSyncLoss(PGconn *conn, char id, int msgLength)
+{
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext(
+    "lost synchronization with server: got message type \"%c\", length %d\n"),
+                      id, msgLength);
+    /* build an error result holding the error message */
+    pqSaveErrorResult(conn);
+    conn->asyncStatus = PGASYNC_READY;    /* drop out of GetResult wait loop */
+
+    pqsecure_close(conn);
+    closesocket(conn->sock);
+    conn->sock = -1;
+    conn->status = CONNECTION_BAD;        /* No more connection to backend */
+}
+
+/*
+ * parseInput subroutine to read a 'T' (row descriptions) message.
+ * We'll build a new PGresult structure (unless called for a Describe
+ * command for a prepared statement) containing the attribute data.
+ * Returns: 0 if completed message, EOF if not enough data yet.
+ *
+ * Note that if we run out of data, we have to release the partially
+ * constructed PGresult, and rebuild it again next time.  Fortunately,
+ * that shouldn't happen often, since 'T' messages usually fit in a packet.
+ */
+static int
+getRowDescriptions(PGconn *conn)
+{
+    PGresult   *result;
+    int            nfields;
+    int            i;
+
+    /*
+     * When doing Describe for a prepared statement, there'll already be a
+     * PGresult created by getParamDescriptions, and we should fill data into
+     * that.  Otherwise, create a new, empty PGresult.
+     */
+    if (conn->queryclass == PGQUERY_DESCRIBE)
+    {
+        if (conn->result)
+            result = conn->result;
+        else
+            result = PQmakeEmptyPGresult(conn, PGRES_COMMAND_OK);
+    }
+    else
+        result = PQmakeEmptyPGresult(conn, PGRES_TUPLES_OK);
+    if (!result)
+        goto failure;
+
+    /* parseInput already read the 'T' label and message length. */
+    /* the next two bytes are the number of fields */
+    if (pqGetInt(&(result->numAttributes), 2, conn))
+        goto failure;
+    nfields = result->numAttributes;
+
+    /* allocate space for the attribute descriptors */
+    if (nfields > 0)
+    {
+        result->attDescs = (PGresAttDesc *)
+            pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
+        if (!result->attDescs)
+            goto failure;
+        MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
+    }
+
+    /* result->binary is true only if ALL columns are binary */
+    result->binary = (nfields > 0) ? 1 : 0;
+
+    /* get type info */
+    for (i = 0; i < nfields; i++)
+    {
+        int            tableid;
+        int            columnid;
+        int            typid;
+        int            typlen;
+        int            atttypmod;
+        int            format;
+
+        if (pqGets(&conn->workBuffer, conn) ||
+            pqGetInt(&tableid, 4, conn) ||
+            pqGetInt(&columnid, 2, conn) ||
+            pqGetInt(&typid, 4, conn) ||
+            pqGetInt(&typlen, 2, conn) ||
+            pqGetInt(&atttypmod, 4, conn) ||
+            pqGetInt(&format, 2, conn))
+        {
+            goto failure;
+        }
+
+        /*
+         * Since pqGetInt treats 2-byte integers as unsigned, we need to
+         * coerce these results to signed form.
+         */
+        columnid = (int) ((int16) columnid);
+        typlen = (int) ((int16) typlen);
+        format = (int) ((int16) format);
+
+        result->attDescs[i].name = pqResultStrdup(result,
+                                                  conn->workBuffer.data);
+        if (!result->attDescs[i].name)
+            goto failure;
+        result->attDescs[i].tableid = tableid;
+        result->attDescs[i].columnid = columnid;
+        result->attDescs[i].format = format;
+        result->attDescs[i].typid = typid;
+        result->attDescs[i].typlen = typlen;
+        result->attDescs[i].atttypmod = atttypmod;
+
+        if (format != 1)
+            result->binary = 0;
+    }
+
+    /* Success! */
+    conn->result = result;
+    return 0;
+
+failure:
+
+    /*
+     * Discard incomplete result, unless it's from getParamDescriptions.
+     *
+     * Note that if we hit a bufferload boundary while handling the
+     * describe-statement case, we'll forget any PGresult space we just
+     * allocated, and then reallocate it on next try.  This will bloat the
+     * PGresult a little bit but the space will be freed at PQclear, so it
+     * doesn't seem worth trying to be smarter.
+     */
+    if (result != conn->result)
+        PQclear(result);
+    return EOF;
+}
+
+/*
+ * parseInput subroutine to read a 't' (ParameterDescription) message.
+ * We'll build a new PGresult structure containing the parameter data.
+ * Returns: 0 if completed message, EOF if not enough data yet.
+ *
+ * Note that if we run out of data, we have to release the partially
+ * constructed PGresult, and rebuild it again next time.  Fortunately,
+ * that shouldn't happen often, since 't' messages usually fit in a packet.
+ */
+static int
+getParamDescriptions(PGconn *conn)
+{
+    PGresult   *result;
+    int            nparams;
+    int            i;
+
+    result = PQmakeEmptyPGresult(conn, PGRES_COMMAND_OK);
+    if (!result)
+        goto failure;
+
+    /* parseInput already read the 't' label and message length. */
+    /* the next two bytes are the number of parameters */
+    if (pqGetInt(&(result->numParameters), 2, conn))
+        goto failure;
+    nparams = result->numParameters;
+
+    /* allocate space for the parameter descriptors */
+    if (nparams > 0)
+    {
+        result->paramDescs = (PGresParamDesc *)
+            pqResultAlloc(result, nparams * sizeof(PGresParamDesc), TRUE);
+        if (!result->paramDescs)
+            goto failure;
+        MemSet(result->paramDescs, 0, nparams * sizeof(PGresParamDesc));
+    }
+
+    /* get parameter info */
+    for (i = 0; i < nparams; i++)
+    {
+        int            typid;
+
+        if (pqGetInt(&typid, 4, conn))
+            goto failure;
+        result->paramDescs[i].typid = typid;
+    }
+
+    /* Success! */
+    conn->result = result;
+    return 0;
+
+failure:
+    PQclear(result);
+    return EOF;
+}
+
+/*
+ * parseInput subroutine to read a 'D' (row data) message.
+ * We add another tuple to the existing PGresult structure.
+ * Returns: 0 if completed message, EOF if error or not enough data yet.
+ *
+ * Note that if we run out of data, we have to suspend and reprocess
+ * the message after more data is received.  We keep a partially constructed
+ * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ */
+static int
+getAnotherTuple(PGconn *conn, int msgLength)
+{
+    PGresult   *result = conn->result;
+    int            nfields = result->numAttributes;
+    PGrowValue  rowval[result->numAttributes + 1];
+    int            tupnfields;        /* # fields from tuple */
+    int            vlen;            /* length of the current field value */
+    int            i;
+
+    /* Allocate tuple space if first time for this data message */
+    /* Get the field count and make sure it's what we expect */
+    if (pqGetInt(&tupnfields, 2, conn))
+        return EOF;
+
+    if (tupnfields != nfields)
+    {
+        /* Replace partially constructed result with an error result */
+        printfPQExpBuffer(&conn->errorMessage,
+                 libpq_gettext("unexpected field count in \"D\" message\n"));
+        pqSaveErrorResult(conn);
+        /* Discard the failed message by pretending we read it */
+        conn->inCursor = conn->inStart + 5 + msgLength;
+        return 0;
+    }
+
+    /* Scan the fields */
+    for (i = 0; i < nfields; i++)
+    {
+        /* get the value length */
+        if (pqGetInt(&vlen, 4, conn))
+            return EOF;
+        if (vlen == -1)
+            vlen = NULL_LEN;
+        else if (vlen < 0)
+            vlen = 0;
+        
+        /*
+         * Buffer contents may be shifted when additional data is
+         * loaded, so we must set all pointers on every scan.
+         *
+         * For safe access, rowval[i].value always points to the byte
+         * just after the length field, even if the value length is
+         * zero or the value is NULL.
+         */
+        rowval[i].value = conn->inBuffer + conn->inCursor;
+        rowval[i].len = vlen;
+
+        /* Skip to the next length field */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            return EOF;
+    }
+
+    /*
+     * Set rowval[nfields] for safe access. The buffer length needed
+     * to store the row can be estimated as
+     *
+     *    rowval[nfields].value - rowval[0].value - 4 * nfields.
+     */
+    rowval[nfields].value = conn->inBuffer + conn->inCursor;
+    rowval[nfields].len = NULL_LEN;
+
+    /* Success!  Pass the completed row values to rowProcessor */
+    if (!result->rowProcessor(result, result->rowProcessorParam, rowval))
+        goto rowProcessError;
+    
+    /* Free garbage error message. */
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }
+
+    return 0;
+
+rowProcessError:
+
+    /*
+     * Replace partially constructed result with an error result. First
+     * discard the old result to try to win back some memory.
+     */
+    pqClearAsyncResult(conn);
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If the row processor passed back an error message, set it into
+     * PGconn; otherwise assume out of memory.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->rowProcessorErrMes ?
+                                       result->rowProcessorErrMes : 
+                                       "out of memory for query result\n"));
+    if (result->rowProcessorErrMes)
+    {
+        free(result->rowProcessorErrMes);
+        result->rowProcessorErrMes = NULL;
+    }
+    pqSaveErrorResult(conn);
+
+    /* Discard the failed message by pretending we read it */
+    conn->inCursor = conn->inStart + 5 + msgLength;
+    return 0;
+}
+
+
+/*
+ * Attempt to read an Error or Notice response message.
+ * This is possible in several places, so we break it out as a subroutine.
+ * Entry: 'E' or 'N' message type and length have already been consumed.
+ * Exit: returns 0 if successfully consumed message.
+ *         returns EOF if not enough data.
+ */
+int
+pqGetErrorNotice3(PGconn *conn, bool isError)
+{
+    PGresult   *res = NULL;
+    PQExpBufferData workBuf;
+    char        id;
+    const char *val;
+    const char *querytext = NULL;
+    int            querypos = 0;
+
+    /*
+     * Since the fields might be pretty long, we create a temporary
+     * PQExpBuffer rather than using conn->workBuffer.    workBuffer is intended
+     * for stuff that is expected to be short.    We shouldn't use
+     * conn->errorMessage either, since this might be only a notice.
+     */
+    initPQExpBuffer(&workBuf);
+
+    /*
+     * Make a PGresult to hold the accumulated fields.    We temporarily lie
+     * about the result status, so that PQmakeEmptyPGresult doesn't uselessly
+     * copy conn->errorMessage.
+     */
+    res = PQmakeEmptyPGresult(conn, PGRES_EMPTY_QUERY);
+    if (!res)
+        goto fail;
+    res->resultStatus = isError ? PGRES_FATAL_ERROR : PGRES_NONFATAL_ERROR;
+
+    /*
+     * Read the fields and save into res.
+     */
+    for (;;)
+    {
+        if (pqGetc(&id, conn))
+            goto fail;
+        if (id == '\0')
+            break;                /* terminator found */
+        if (pqGets(&workBuf, conn))
+            goto fail;
+        pqSaveMessageField(res, id, workBuf.data);
+    }
+
+    /*
+     * Now build the "overall" error message for PQresultErrorMessage.
+     *
+     * Also, save the SQLSTATE in conn->last_sqlstate.
+     */
+    resetPQExpBuffer(&workBuf);
+    val = PQresultErrorField(res, PG_DIAG_SEVERITY);
+    if (val)
+        appendPQExpBuffer(&workBuf, "%s:  ", val);
+    val = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+    if (val)
+    {
+        if (strlen(val) < sizeof(conn->last_sqlstate))
+            strcpy(conn->last_sqlstate, val);
+        if (conn->verbosity == PQERRORS_VERBOSE)
+            appendPQExpBuffer(&workBuf, "%s: ", val);
+    }
+    val = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+    if (val)
+        appendPQExpBufferStr(&workBuf, val);
+    val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
+    if (val)
+    {
+        if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+        {
+            /* emit position as a syntax cursor display */
+            querytext = conn->last_query;
+            querypos = atoi(val);
+        }
+        else
+        {
+            /* emit position as text addition to primary message */
+            /* translator: %s represents a digit string */
+            appendPQExpBuffer(&workBuf, libpq_gettext(" at character %s"),
+                              val);
+        }
+    }
+    else
+    {
+        val = PQresultErrorField(res, PG_DIAG_INTERNAL_POSITION);
+        if (val)
+        {
+            querytext = PQresultErrorField(res, PG_DIAG_INTERNAL_QUERY);
+            if (conn->verbosity != PQERRORS_TERSE && querytext != NULL)
+            {
+                /* emit position as a syntax cursor display */
+                querypos = atoi(val);
+            }
+            else
+            {
+                /* emit position as text addition to primary message */
+                /* translator: %s represents a digit string */
+                appendPQExpBuffer(&workBuf, libpq_gettext(" at character %s"),
+                                  val);
+            }
+        }
+    }
+    appendPQExpBufferChar(&workBuf, '\n');
+    if (conn->verbosity != PQERRORS_TERSE)
+    {
+        if (querytext && querypos > 0)
+            reportErrorPosition(&workBuf, querytext, querypos,
+                                conn->client_encoding);
+        val = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+        if (val)
+            appendPQExpBuffer(&workBuf, libpq_gettext("DETAIL:  %s\n"), val);
+        val = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+        if (val)
+            appendPQExpBuffer(&workBuf, libpq_gettext("HINT:  %s\n"), val);
+        val = PQresultErrorField(res, PG_DIAG_INTERNAL_QUERY);
+        if (val)
+            appendPQExpBuffer(&workBuf, libpq_gettext("QUERY:  %s\n"), val);
+        val = PQresultErrorField(res, PG_DIAG_CONTEXT);
+        if (val)
+            appendPQExpBuffer(&workBuf, libpq_gettext("CONTEXT:  %s\n"), val);
+    }
+    if (conn->verbosity == PQERRORS_VERBOSE)
+    {
+        const char *valf;
+        const char *vall;
+
+        valf = PQresultErrorField(res, PG_DIAG_SOURCE_FILE);
+        vall = PQresultErrorField(res, PG_DIAG_SOURCE_LINE);
+        val = PQresultErrorField(res, PG_DIAG_SOURCE_FUNCTION);
+        if (val || valf || vall)
+        {
+            appendPQExpBufferStr(&workBuf, libpq_gettext("LOCATION:  "));
+            if (val)
+                appendPQExpBuffer(&workBuf, libpq_gettext("%s, "), val);
+            if (valf && vall)    /* unlikely we'd have just one */
+                appendPQExpBuffer(&workBuf, libpq_gettext("%s:%s"),
+                                  valf, vall);
+            appendPQExpBufferChar(&workBuf, '\n');
+        }
+    }
+
+    /*
+     * Either save error as current async result, or just emit the notice.
+     */
+    if (isError)
+    {
+        res->errMsg = pqResultStrdup(res, workBuf.data);
+        if (!res->errMsg)
+            goto fail;
+        pqClearAsyncResult(conn);
+        conn->result = res;
+        appendPQExpBufferStr(&conn->errorMessage, workBuf.data);
+    }
+    else
+    {
+        /* We can cheat a little here and not copy the message. */
+        res->errMsg = workBuf.data;
+        if (res->noticeHooks.noticeRec != NULL)
+            (*res->noticeHooks.noticeRec) (res->noticeHooks.noticeRecArg, res);
+        PQclear(res);
+    }
+
+    termPQExpBuffer(&workBuf);
+    return 0;
+
+fail:
+    PQclear(res);
+    termPQExpBuffer(&workBuf);
+    return EOF;
+}
+
+/*
+ * Add an error-location display to the error message under construction.
+ *
+ * The cursor location is measured in logical characters; the query string
+ * is presumed to be in the specified encoding.
+ */
+static void
+reportErrorPosition(PQExpBuffer msg, const char *query, int loc, int encoding)
+{
+#define DISPLAY_SIZE    60        /* screen width limit, in screen cols */
+#define MIN_RIGHT_CUT    10        /* try to keep this far away from EOL */
+
+    char       *wquery;
+    int            slen,
+                cno,
+                i,
+               *qidx,
+               *scridx,
+                qoffset,
+                scroffset,
+                ibeg,
+                iend,
+                loc_line;
+    bool        mb_encoding,
+                beg_trunc,
+                end_trunc;
+
+    /* Convert loc from 1-based to 0-based; no-op if out of range */
+    loc--;
+    if (loc < 0)
+        return;
+
+    /* Need a writable copy of the query */
+    wquery = strdup(query);
+    if (wquery == NULL)
+        return;                    /* fail silently if out of memory */
+
+    /*
+     * Each character might occupy multiple physical bytes in the string, and
+     * in some Far Eastern character sets it might take more than one screen
+     * column as well.  We compute the starting byte offset and starting
+     * screen column of each logical character, and store these in qidx[] and
+     * scridx[] respectively.
+     */
+
+    /* we need a safe allocation size... */
+    slen = strlen(wquery) + 1;
+
+    qidx = (int *) malloc(slen * sizeof(int));
+    if (qidx == NULL)
+    {
+        free(wquery);
+        return;
+    }
+    scridx = (int *) malloc(slen * sizeof(int));
+    if (scridx == NULL)
+    {
+        free(qidx);
+        free(wquery);
+        return;
+    }
+
+    /* We can optimize a bit if it's a single-byte encoding */
+    mb_encoding = (pg_encoding_max_length(encoding) != 1);
+
+    /*
+     * Within the scanning loop, cno is the current character's logical
+     * number, qoffset is its offset in wquery, and scroffset is its starting
+     * logical screen column (all indexed from 0).  "loc" is the logical
+     * character number of the error location.  We scan to determine loc_line
+     * (the 1-based line number containing loc) and ibeg/iend (first character
+     * number and last+1 character number of the line containing loc). Note
+     * that qidx[] and scridx[] are filled only as far as iend.
+     */
+    qoffset = 0;
+    scroffset = 0;
+    loc_line = 1;
+    ibeg = 0;
+    iend = -1;                    /* -1 means not set yet */
+
+    for (cno = 0; wquery[qoffset] != '\0'; cno++)
+    {
+        char        ch = wquery[qoffset];
+
+        qidx[cno] = qoffset;
+        scridx[cno] = scroffset;
+
+        /*
+         * Replace tabs with spaces in the writable copy.  (Later we might
+         * want to think about coping with their variable screen width, but
+         * not today.)
+         */
+        if (ch == '\t')
+            wquery[qoffset] = ' ';
+
+        /*
+         * If end-of-line, count lines and mark positions. Each \r or \n
+         * counts as a line except when \r \n appear together.
+         */
+        else if (ch == '\r' || ch == '\n')
+        {
+            if (cno < loc)
+            {
+                if (ch == '\r' ||
+                    cno == 0 ||
+                    wquery[qidx[cno - 1]] != '\r')
+                    loc_line++;
+                /* extract beginning = last line start before loc. */
+                ibeg = cno + 1;
+            }
+            else
+            {
+                /* set extract end. */
+                iend = cno;
+                /* done scanning. */
+                break;
+            }
+        }
+
+        /* Advance */
+        if (mb_encoding)
+        {
+            int            w;
+
+            w = pg_encoding_dsplen(encoding, &wquery[qoffset]);
+            /* treat any non-tab control chars as width 1 */
+            if (w <= 0)
+                w = 1;
+            scroffset += w;
+            qoffset += pg_encoding_mblen(encoding, &wquery[qoffset]);
+        }
+        else
+        {
+            /* We assume wide chars only exist in multibyte encodings */
+            scroffset++;
+            qoffset++;
+        }
+    }
+    /* Fix up if we didn't find an end-of-line after loc */
+    if (iend < 0)
+    {
+        iend = cno;                /* query length in chars, +1 */
+        qidx[iend] = qoffset;
+        scridx[iend] = scroffset;
+    }
+
+    /* Print only if loc is within computed query length */
+    if (loc <= cno)
+    {
+        /* If the line extracted is too long, we truncate it. */
+        beg_trunc = false;
+        end_trunc = false;
+        if (scridx[iend] - scridx[ibeg] > DISPLAY_SIZE)
+        {
+            /*
+             * We first truncate right if it is enough.  This code might be
+             * off a space or so on enforcing MIN_RIGHT_CUT if there's a wide
+             * character right there, but that should be okay.
+             */
+            if (scridx[ibeg] + DISPLAY_SIZE >= scridx[loc] + MIN_RIGHT_CUT)
+            {
+                while (scridx[iend] - scridx[ibeg] > DISPLAY_SIZE)
+                    iend--;
+                end_trunc = true;
+            }
+            else
+            {
+                /* Truncate right if not too close to loc. */
+                while (scridx[loc] + MIN_RIGHT_CUT < scridx[iend])
+                {
+                    iend--;
+                    end_trunc = true;
+                }
+
+                /* Truncate left if still too long. */
+                while (scridx[iend] - scridx[ibeg] > DISPLAY_SIZE)
+                {
+                    ibeg++;
+                    beg_trunc = true;
+                }
+            }
+        }
+
+        /* truncate working copy at desired endpoint */
+        wquery[qidx[iend]] = '\0';
+
+        /* Begin building the finished message. */
+        i = msg->len;
+        appendPQExpBuffer(msg, libpq_gettext("LINE %d: "), loc_line);
+        if (beg_trunc)
+            appendPQExpBufferStr(msg, "...");
+
+        /*
+         * While we have the prefix in the msg buffer, compute its screen
+         * width.
+         */
+        scroffset = 0;
+        for (; i < msg->len; i += pg_encoding_mblen(encoding, &msg->data[i]))
+        {
+            int            w = pg_encoding_dsplen(encoding, &msg->data[i]);
+
+            if (w <= 0)
+                w = 1;
+            scroffset += w;
+        }
+
+        /* Finish up the LINE message line. */
+        appendPQExpBufferStr(msg, &wquery[qidx[ibeg]]);
+        if (end_trunc)
+            appendPQExpBufferStr(msg, "...");
+        appendPQExpBufferChar(msg, '\n');
+
+        /* Now emit the cursor marker line. */
+        scroffset += scridx[loc] - scridx[ibeg];
+        for (i = 0; i < scroffset; i++)
+            appendPQExpBufferChar(msg, ' ');
+        appendPQExpBufferChar(msg, '^');
+        appendPQExpBufferChar(msg, '\n');
+    }
+
+    /* Clean up. */
+    free(scridx);
+    free(qidx);
+    free(wquery);
+}
+
+
+/*
+ * Attempt to read a ParameterStatus message.
+ * This is possible in several places, so we break it out as a subroutine.
+ * Entry: 'S' message type and length have already been consumed.
+ * Exit: returns 0 if successfully consumed message.
+ *         returns EOF if not enough data.
+ */
+static int
+getParameterStatus(PGconn *conn)
+{
+    PQExpBufferData valueBuf;
+
+    /* Get the parameter name */
+    if (pqGets(&conn->workBuffer, conn))
+        return EOF;
+    /* Get the parameter value (could be large) */
+    initPQExpBuffer(&valueBuf);
+    if (pqGets(&valueBuf, conn))
+    {
+        termPQExpBuffer(&valueBuf);
+        return EOF;
+    }
+    /* And save it */
+    pqSaveParameterStatus(conn, conn->workBuffer.data, valueBuf.data);
+    termPQExpBuffer(&valueBuf);
+    return 0;
+}
+
+
+/*
+ * Attempt to read a Notify response message.
+ * This is possible in several places, so we break it out as a subroutine.
+ * Entry: 'A' message type and length have already been consumed.
+ * Exit: returns 0 if successfully consumed Notify message.
+ *         returns EOF if not enough data.
+ */
+static int
+getNotify(PGconn *conn)
+{
+    int            be_pid;
+    char       *svname;
+    int            nmlen;
+    int            extralen;
+    PGnotify   *newNotify;
+
+    if (pqGetInt(&be_pid, 4, conn))
+        return EOF;
+    if (pqGets(&conn->workBuffer, conn))
+        return EOF;
+    /* must save name while getting extra string */
+    svname = strdup(conn->workBuffer.data);
+    if (!svname)
+        return EOF;
+    if (pqGets(&conn->workBuffer, conn))
+    {
+        free(svname);
+        return EOF;
+    }
+
+    /*
+     * Store the strings right after the PGnotify structure so it can all be
+     * freed at once.  We don't use NAMEDATALEN because we don't want to tie
+     * this interface to a specific server name length.
+     */
+    nmlen = strlen(svname);
+    extralen = strlen(conn->workBuffer.data);
+    newNotify = (PGnotify *) malloc(sizeof(PGnotify) + nmlen + extralen + 2);
+    if (newNotify)
+    {
+        newNotify->relname = (char *) newNotify + sizeof(PGnotify);
+        strcpy(newNotify->relname, svname);
+        newNotify->extra = newNotify->relname + nmlen + 1;
+        strcpy(newNotify->extra, conn->workBuffer.data);
+        newNotify->be_pid = be_pid;
+        newNotify->next = NULL;
+        if (conn->notifyTail)
+            conn->notifyTail->next = newNotify;
+        else
+            conn->notifyHead = newNotify;
+        conn->notifyTail = newNotify;
+    }
+
+    free(svname);
+    return 0;
+}
+
+/*
+ * getCopyStart - process CopyInResponse, CopyOutResponse or
+ * CopyBothResponse message
+ *
+ * parseInput already read the message type and length.
+ */
+static int
+getCopyStart(PGconn *conn, ExecStatusType copytype)
+{
+    PGresult   *result;
+    int            nfields;
+    int            i;
+
+    result = PQmakeEmptyPGresult(conn, copytype);
+    if (!result)
+        goto failure;
+
+    if (pqGetc(&conn->copy_is_binary, conn))
+        goto failure;
+    result->binary = conn->copy_is_binary;
+    /* the next two bytes are the number of fields */
+    if (pqGetInt(&(result->numAttributes), 2, conn))
+        goto failure;
+    nfields = result->numAttributes;
+
+    /* allocate space for the attribute descriptors */
+    if (nfields > 0)
+    {
+        result->attDescs = (PGresAttDesc *)
+            pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
+        if (!result->attDescs)
+            goto failure;
+        MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
+    }
+
+    for (i = 0; i < nfields; i++)
+    {
+        int            format;
+
+        if (pqGetInt(&format, 2, conn))
+            goto failure;
+
+        /*
+         * Since pqGetInt treats 2-byte integers as unsigned, we need to
+         * coerce these results to signed form.
+         */
+        format = (int) ((int16) format);
+        result->attDescs[i].format = format;
+    }
+
+    /* Success! */
+    conn->result = result;
+    return 0;
+
+failure:
+    PQclear(result);
+    return EOF;
+}
+
+/*
+ * getReadyForQuery - process ReadyForQuery message
+ */
+static int
+getReadyForQuery(PGconn *conn)
+{
+    char        xact_status;
+
+    if (pqGetc(&xact_status, conn))
+        return EOF;
+    switch (xact_status)
+    {
+        case 'I':
+            conn->xactStatus = PQTRANS_IDLE;
+            break;
+        case 'T':
+            conn->xactStatus = PQTRANS_INTRANS;
+            break;
+        case 'E':
+            conn->xactStatus = PQTRANS_INERROR;
+            break;
+        default:
+            conn->xactStatus = PQTRANS_UNKNOWN;
+            break;
+    }
+
+    return 0;
+}
+
+/*
+ * getCopyDataMessage - fetch next CopyData message, process async messages
+ *
+ * Returns length word of CopyData message (> 0), or 0 if no complete
+ * message available, -1 if end of copy, -2 if error.
+ */
+static int
+getCopyDataMessage(PGconn *conn)
+{
+    char        id;
+    int            msgLength;
+    int            avail;
+
+    for (;;)
+    {
+        /*
+         * Do we have the next input message?  To make life simpler for async
+         * callers, we keep returning 0 until the next message is fully
+         * available, even if it is not Copy Data.
+         */
+        conn->inCursor = conn->inStart;
+        if (pqGetc(&id, conn))
+            return 0;
+        if (pqGetInt(&msgLength, 4, conn))
+            return 0;
+        if (msgLength < 4)
+        {
+            handleSyncLoss(conn, id, msgLength);
+            return -2;
+        }
+        avail = conn->inEnd - conn->inCursor;
+        if (avail < msgLength - 4)
+        {
+            /*
+             * Before returning, enlarge the input buffer if needed to hold
+             * the whole message.  See notes in parseInput.
+             */
+            if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength - 4,
+                                     conn))
+            {
+                /*
+                 * XXX add some better recovery code... plan is to skip over
+                 * the message using its length, then report an error. For the
+                 * moment, just treat this like loss of sync (which indeed it
+                 * might be!)
+                 */
+                handleSyncLoss(conn, id, msgLength);
+                return -2;
+            }
+            return 0;
+        }
+
+        /*
+         * If it's a legitimate async message type, process it.  (NOTIFY
+         * messages are not currently possible here, but we handle them for
+         * completeness.)  Otherwise, if it's anything except Copy Data,
+         * report end-of-copy.
+         */
+        switch (id)
+        {
+            case 'A':            /* NOTIFY */
+                if (getNotify(conn))
+                    return 0;
+                break;
+            case 'N':            /* NOTICE */
+                if (pqGetErrorNotice3(conn, false))
+                    return 0;
+                break;
+            case 'S':            /* ParameterStatus */
+                if (getParameterStatus(conn))
+                    return 0;
+                break;
+            case 'd':            /* Copy Data, pass it back to caller */
+                return msgLength;
+            default:            /* treat as end of copy */
+                return -1;
+        }
+
+        /* Drop the processed message and loop around for another */
+        conn->inStart = conn->inCursor;
+    }
+}
+
+/*
+ * PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
+ *
+ * If successful, sets *buffer to point to a malloc'd row of data, and
+ * returns row length (always > 0) as result.
+ * Returns 0 if no row available yet (only possible if async is true),
+ * -1 if end of copy (consult PQgetResult), or -2 if error (consult
+ * PQerrorMessage).
+ */
+int
+pqGetCopyData3(PGconn *conn, char **buffer, int async)
+{
+    int            msgLength;
+
+    for (;;)
+    {
+        /*
+         * Collect the next input message.  To make life simpler for async
+         * callers, we keep returning 0 until the next message is fully
+         * available, even if it is not Copy Data.
+         */
+        msgLength = getCopyDataMessage(conn);
+        if (msgLength < 0)
+        {
+            /*
+             * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
+             * read status with PQgetResult().  The normal case is that it's
+             * Copy Done, but we let parseInput read that.  If error, we
+             * expect the state was already changed.
+             */
+            if (msgLength == -1)
+                conn->asyncStatus = PGASYNC_BUSY;
+            return msgLength;    /* end-of-copy or error */
+        }
+        if (msgLength == 0)
+        {
+            /* Don't block if async read requested */
+            if (async)
+                return 0;
+            /* Need to load more data */
+            if (pqWait(TRUE, FALSE, conn) ||
+                pqReadData(conn) < 0)
+                return -2;
+            continue;
+        }
+
+        /*
+         * Drop zero-length messages (shouldn't happen anyway).  Otherwise
+         * pass the data back to the caller.
+         */
+        msgLength -= 4;
+        if (msgLength > 0)
+        {
+            *buffer = (char *) malloc(msgLength + 1);
+            if (*buffer == NULL)
+            {
+                printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext("out of memory\n"));
+                return -2;
+            }
+            memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
+            (*buffer)[msgLength] = '\0';        /* Add terminating null */
+
+            /* Mark message consumed */
+            conn->inStart = conn->inCursor + msgLength;
+
+            return msgLength;
+        }
+
+        /* Empty, so drop it and loop around for another */
+        conn->inStart = conn->inCursor;
+    }
+}
+
+/*
+ * PQgetline - gets a newline-terminated string from the backend.
+ *
+ * See fe-exec.c for documentation.
+ */
+int
+pqGetline3(PGconn *conn, char *s, int maxlen)
+{
+    int            status;
+
+    if (conn->sock < 0 ||
+        conn->asyncStatus != PGASYNC_COPY_OUT ||
+        conn->copy_is_binary)
+    {
+        printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("PQgetline: not doing text COPY OUT\n"));
+        *s = '\0';
+        return EOF;
+    }
+
+    while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
+    {
+        /* need to load more data */
+        if (pqWait(TRUE, FALSE, conn) ||
+            pqReadData(conn) < 0)
+        {
+            *s = '\0';
+            return EOF;
+        }
+    }
+
+    if (status < 0)
+    {
+        /* End of copy detected; gin up old-style terminator */
+        strcpy(s, "\\.");
+        return 0;
+    }
+
+    /* Add null terminator, and strip trailing \n if present */
+    if (s[status - 1] == '\n')
+    {
+        s[status - 1] = '\0';
+        return 0;
+    }
+    else
+    {
+        s[status] = '\0';
+        return 1;
+    }
+}
+
+/*
+ * PQgetlineAsync - gets a COPY data row without blocking.
+ *
+ * See fe-exec.c for documentation.
+ */
+int
+pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize)
+{
+    int            msgLength;
+    int            avail;
+
+    if (conn->asyncStatus != PGASYNC_COPY_OUT)
+        return -1;                /* we are not doing a copy... */
+
+    /*
+     * Recognize the next input message.  To make life simpler for async
+     * callers, we keep returning 0 until the next message is fully available
+     * even if it is not Copy Data.  This should keep PQendcopy from blocking.
+     * (Note: unlike pqGetCopyData3, we do not change asyncStatus here.)
+     */
+    msgLength = getCopyDataMessage(conn);
+    if (msgLength < 0)
+        return -1;                /* end-of-copy or error */
+    if (msgLength == 0)
+        return 0;                /* no data yet */
+
+    /*
+     * Move data from libpq's buffer to the caller's.  In the case where a
+     * prior call found the caller's buffer too small, we use
+     * conn->copy_already_done to remember how much of the row was already
+     * returned to the caller.
+     */
+    conn->inCursor += conn->copy_already_done;
+    avail = msgLength - 4 - conn->copy_already_done;
+    if (avail <= bufsize)
+    {
+        /* Able to consume the whole message */
+        memcpy(buffer, &conn->inBuffer[conn->inCursor], avail);
+        /* Mark message consumed */
+        conn->inStart = conn->inCursor + avail;
+        /* Reset state for next time */
+        conn->copy_already_done = 0;
+        return avail;
+    }
+    else
+    {
+        /* We must return a partial message */
+        memcpy(buffer, &conn->inBuffer[conn->inCursor], bufsize);
+        /* The message is NOT consumed from libpq's buffer */
+        conn->copy_already_done += bufsize;
+        return bufsize;
+    }
+}
+
+/*
+ * PQendcopy
+ *
+ * See fe-exec.c for documentation.
+ */
+int
+pqEndcopy3(PGconn *conn)
+{
+    PGresult   *result;
+
+    if (conn->asyncStatus != PGASYNC_COPY_IN &&
+        conn->asyncStatus != PGASYNC_COPY_OUT)
+    {
+        printfPQExpBuffer(&conn->errorMessage,
+                          libpq_gettext("no COPY in progress\n"));
+        return 1;
+    }
+
+    /* Send the CopyDone message if needed */
+    if (conn->asyncStatus == PGASYNC_COPY_IN)
+    {
+        if (pqPutMsgStart('c', false, conn) < 0 ||
+            pqPutMsgEnd(conn) < 0)
+            return 1;
+
+        /*
+         * If we sent the COPY command in extended-query mode, we must issue a
+         * Sync as well.
+         */
+        if (conn->queryclass != PGQUERY_SIMPLE)
+        {
+            if (pqPutMsgStart('S', false, conn) < 0 ||
+                pqPutMsgEnd(conn) < 0)
+                return 1;
+        }
+    }
+
+    /*
+     * Make sure no data is waiting to be sent; abort if we are non-blocking
+     * and the flush fails.
+     */
+    if (pqFlush(conn) && pqIsnonblocking(conn))
+        return 1;
+
+    /* Return to active duty */
+    conn->asyncStatus = PGASYNC_BUSY;
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * Non blocking connections may have to abort at this point.  If everyone
+     * played the game there should be no problem, but in error scenarios the
+     * expected messages may not have arrived yet.  (We are assuming that the
+     * backend's packetizing will ensure that CommandComplete arrives along
+     * with the CopyDone; are there corner cases where that doesn't happen?)
+     */
+    if (pqIsnonblocking(conn) && PQisBusy(conn))
+        return 1;
+
+    /* Wait for the completion response */
+    result = PQgetResult(conn);
+
+    /* Expecting a successful result */
+    if (result && result->resultStatus == PGRES_COMMAND_OK)
+    {
+        PQclear(result);
+        return 0;
+    }
+
+    /*
+     * Trouble. For backwards-compatibility reasons, we issue the error
+     * message as if it were a notice (would be nice to get rid of this
+     * silliness, but too many apps probably don't handle errors from
+     * PQendcopy reasonably).  Note that the app can still obtain the error
+     * status from the PGconn object.
+     */
+    if (conn->errorMessage.len > 0)
+    {
+        /* We have to strip the trailing newline ... pain in neck... */
+        char        svLast = conn->errorMessage.data[conn->errorMessage.len - 1];
+
+        if (svLast == '\n')
+            conn->errorMessage.data[conn->errorMessage.len - 1] = '\0';
+        pqInternalNotice(&conn->noticeHooks, "%s", conn->errorMessage.data);
+        conn->errorMessage.data[conn->errorMessage.len - 1] = svLast;
+    }
+
+    PQclear(result);
+
+    return 1;
+}
+
+
+/*
+ * PQfn - Send a function call to the POSTGRES backend.
+ *
+ * See fe-exec.c for documentation.
+ */
+PGresult *
+pqFunctionCall3(PGconn *conn, Oid fnid,
+                int *result_buf, int *actual_result_len,
+                int result_is_int,
+                const PQArgBlock *args, int nargs)
+{
+    bool        needInput = false;
+    ExecStatusType status = PGRES_FATAL_ERROR;
+    char        id;
+    int            msgLength;
+    int            avail;
+    int            i;
+
+    /* PQfn already validated connection state */
+
+    if (pqPutMsgStart('F', false, conn) < 0 ||    /* function call msg */
+        pqPutInt(fnid, 4, conn) < 0 ||    /* function id */
+        pqPutInt(1, 2, conn) < 0 ||        /* # of format codes */
+        pqPutInt(1, 2, conn) < 0 ||        /* format code: BINARY */
+        pqPutInt(nargs, 2, conn) < 0)    /* # of args */
+    {
+        pqHandleSendFailure(conn);
+        return NULL;
+    }
+
+    for (i = 0; i < nargs; ++i)
+    {                            /* len.int4 + contents       */
+        if (pqPutInt(args[i].len, 4, conn))
+        {
+            pqHandleSendFailure(conn);
+            return NULL;
+        }
+        if (args[i].len == -1)
+            continue;            /* it's NULL */
+
+        if (args[i].isint)
+        {
+            if (pqPutInt(args[i].u.integer, args[i].len, conn))
+            {
+                pqHandleSendFailure(conn);
+                return NULL;
+            }
+        }
+        else
+        {
+            if (pqPutnchar((char *) args[i].u.ptr, args[i].len, conn))
+            {
+                pqHandleSendFailure(conn);
+                return NULL;
+            }
+        }
+    }
+
+    if (pqPutInt(1, 2, conn) < 0)        /* result format code: BINARY */
+    {
+        pqHandleSendFailure(conn);
+        return NULL;
+    }
+
+    if (pqPutMsgEnd(conn) < 0 ||
+        pqFlush(conn))
+    {
+        pqHandleSendFailure(conn);
+        return NULL;
+    }
+
+    for (;;)
+    {
+        if (needInput)
+        {
+            /* Wait for some data to arrive (or for the channel to close) */
+            if (pqWait(TRUE, FALSE, conn) ||
+                pqReadData(conn) < 0)
+                break;
+        }
+
+        /*
+         * Scan the message. If we run out of data, loop around to try again.
+         */
+        needInput = true;
+
+        conn->inCursor = conn->inStart;
+        if (pqGetc(&id, conn))
+            continue;
+        if (pqGetInt(&msgLength, 4, conn))
+            continue;
+
+        /*
+         * Try to validate message type/length here.  A length less than 4 is
+         * definitely broken.  Large lengths should only be believed for a few
+         * message types.
+         */
+        if (msgLength < 4)
+        {
+            handleSyncLoss(conn, id, msgLength);
+            break;
+        }
+        if (msgLength > 30000 && !VALID_LONG_MESSAGE_TYPE(id))
+        {
+            handleSyncLoss(conn, id, msgLength);
+            break;
+        }
+
+        /*
+         * Can't process if message body isn't all here yet.
+         */
+        msgLength -= 4;
+        avail = conn->inEnd - conn->inCursor;
+        if (avail < msgLength)
+        {
+            /*
+             * Before looping, enlarge the input buffer if needed to hold the
+             * whole message.  See notes in parseInput.
+             */
+            if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength,
+                                     conn))
+            {
+                /*
+                 * XXX add some better recovery code... plan is to skip over
+                 * the message using its length, then report an error. For the
+                 * moment, just treat this like loss of sync (which indeed it
+                 * might be!)
+                 */
+                handleSyncLoss(conn, id, msgLength);
+                break;
+            }
+            continue;
+        }
+
+        /*
+         * We should see V or E response to the command, but might get N
+         * and/or A notices first. We also need to swallow the final Z before
+         * returning.
+         */
+        switch (id)
+        {
+            case 'V':            /* function result */
+                if (pqGetInt(actual_result_len, 4, conn))
+                    continue;
+                if (*actual_result_len != -1)
+                {
+                    if (result_is_int)
+                    {
+                        if (pqGetInt(result_buf, *actual_result_len, conn))
+                            continue;
+                    }
+                    else
+                    {
+                        if (pqGetnchar((char *) result_buf,
+                                       *actual_result_len,
+                                       conn))
+                            continue;
+                    }
+                }
+                /* correctly finished function result message */
+                status = PGRES_COMMAND_OK;
+                break;
+            case 'E':            /* error return */
+                if (pqGetErrorNotice3(conn, true))
+                    continue;
+                status = PGRES_FATAL_ERROR;
+                break;
+            case 'A':            /* notify message */
+                /* handle notify and go back to processing return values */
+                if (getNotify(conn))
+                    continue;
+                break;
+            case 'N':            /* notice */
+                /* handle notice and go back to processing return values */
+                if (pqGetErrorNotice3(conn, false))
+                    continue;
+                break;
+            case 'Z':            /* backend is ready for new query */
+                if (getReadyForQuery(conn))
+                    continue;
+                /* consume the message and exit */
+                conn->inStart += 5 + msgLength;
+                /* if we saved a result object (probably an error), use it */
+                if (conn->result)
+                    return pqPrepareAsyncResult(conn);
+                return PQmakeEmptyPGresult(conn, status);
+            case 'S':            /* parameter status */
+                if (getParameterStatus(conn))
+                    continue;
+                break;
+            default:
+                /* The backend violates the protocol. */
+                printfPQExpBuffer(&conn->errorMessage,
+                                  libpq_gettext("protocol error: id=0x%x\n"),
+                                  id);
+                pqSaveErrorResult(conn);
+                /* trust the specified message length as what to skip */
+                conn->inStart += 5 + msgLength;
+                return pqPrepareAsyncResult(conn);
+        }
+        /* Completed this message, keep going */
+        /* trust the specified message length as what to skip */
+        conn->inStart += 5 + msgLength;
+        needInput = false;
+    }
+
+    /*
+     * We fall out of the loop only upon failing to read data.
+     * conn->errorMessage has been set by pqWait or pqReadData. We want to
+     * append it to any already-received error message.
+     */
+    pqSaveErrorResult(conn);
+    return pqPrepareAsyncResult(conn);
+}
+
+
+/*
+ * Construct startup packet
+ *
+ * Returns a malloc'd packet buffer, or NULL if out of memory
+ */
+char *
+pqBuildStartupPacket3(PGconn *conn, int *packetlen,
+                      const PQEnvironmentOption *options)
+{
+    char       *startpacket;
+
+    *packetlen = build_startup_packet(conn, NULL, options);
+    startpacket = (char *) malloc(*packetlen);
+    if (!startpacket)
+        return NULL;
+    *packetlen = build_startup_packet(conn, startpacket, options);
+    return startpacket;
+}
+
+/*
+ * Build a startup packet given a filled-in PGconn structure.
+ *
+ * We need to figure out how much space is needed, then fill it in.
+ * To avoid duplicate logic, this routine is called twice: the first time
+ * (with packet == NULL) just counts the space needed, the second time
+ * (with packet == allocated space) fills it in.  Return value is the number
+ * of bytes used.
+ */
+static int
+build_startup_packet(const PGconn *conn, char *packet,
+                     const PQEnvironmentOption *options)
+{
+    int            packet_len = 0;
+    const PQEnvironmentOption *next_eo;
+    const char *val;
+
+    /* Protocol version comes first. */
+    if (packet)
+    {
+        ProtocolVersion pv = htonl(conn->pversion);
+
+        memcpy(packet + packet_len, &pv, sizeof(ProtocolVersion));
+    }
+    packet_len += sizeof(ProtocolVersion);
+
+    /* Add user name, database name, options */
+
+#define ADD_STARTUP_OPTION(optname, optval) \
+    do { \
+        if (packet) \
+            strcpy(packet + packet_len, optname); \
+        packet_len += strlen(optname) + 1; \
+        if (packet) \
+            strcpy(packet + packet_len, optval); \
+        packet_len += strlen(optval) + 1; \
+    } while(0)
+
+    if (conn->pguser && conn->pguser[0])
+        ADD_STARTUP_OPTION("user", conn->pguser);
+    if (conn->dbName && conn->dbName[0])
+        ADD_STARTUP_OPTION("database", conn->dbName);
+    if (conn->replication && conn->replication[0])
+        ADD_STARTUP_OPTION("replication", conn->replication);
+    if (conn->pgoptions && conn->pgoptions[0])
+        ADD_STARTUP_OPTION("options", conn->pgoptions);
+    if (conn->send_appname)
+    {
+        /* Use appname if present, otherwise use fallback */
+        val = conn->appname ? conn->appname : conn->fbappname;
+        if (val && val[0])
+            ADD_STARTUP_OPTION("application_name", val);
+    }
+
+    if (conn->client_encoding_initial && conn->client_encoding_initial[0])
+        ADD_STARTUP_OPTION("client_encoding", conn->client_encoding_initial);
+
+    /* Add any environment-driven GUC settings needed */
+    for (next_eo = options; next_eo->envName; next_eo++)
+    {
+        if ((val = getenv(next_eo->envName)) != NULL)
+        {
+            if (pg_strcasecmp(val, "default") != 0)
+                ADD_STARTUP_OPTION(next_eo->pgName, val);
+        }
+    }
+
+    /* Add trailing terminator */
+    if (packet)
+        packet[packet_len] = '\0';
+    packet_len++;
+
+    return packet_len;
+}
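Aside for readers of the patch: the count-then-fill pattern that build_startup_packet() uses above can be shown in isolation. The following is only a minimal standalone sketch, not libpq code. The names build_kv_packet and make_kv_packet are hypothetical, and the packet is assumed to hold nothing but NUL-terminated key/value pairs followed by one terminating zero byte.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper (not part of libpq): lay out "key\0value\0" pairs
 * followed by a final '\0'.  With packet == NULL it only counts bytes;
 * otherwise it also writes them.  Returns the number of bytes used.
 */
static size_t
build_kv_packet(char *packet, const char *const *keys,
                const char *const *vals, size_t npairs)
{
    size_t      len = 0;
    size_t      i;

    for (i = 0; i < npairs; i++)
    {
        if (packet)
            strcpy(packet + len, keys[i]);
        len += strlen(keys[i]) + 1;
        if (packet)
            strcpy(packet + len, vals[i]);
        len += strlen(vals[i]) + 1;
    }

    /* trailing terminator */
    if (packet)
        packet[len] = '\0';
    len++;

    return len;
}

/* First pass sizes the buffer, second pass fills it.  Caller frees. */
static char *
make_kv_packet(const char *const *keys, const char *const *vals,
               size_t npairs, size_t *packetlen)
{
    char       *packet;

    assert(packetlen != NULL);
    *packetlen = build_kv_packet(NULL, keys, vals, npairs);
    packet = malloc(*packetlen);
    if (!packet)
        return NULL;
    *packetlen = build_kv_packet(packet, keys, vals, npairs);
    return packet;
}
```

Running the same routine twice keeps the size computation and the layout logic in one place, so the two cannot drift apart.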
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..23cf729 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQsetRowProcessor      161
+PQsetRowProcessorErrMsg      162
+PQskipRemainingResults      163
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 27a9805..4605e49 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2693,6 +2693,9 @@ makeEmptyPGconn(void)
     conn->wait_ssl_try = false;
 #endif
 
+    /* set default row processor */
+    PQsetRowProcessor(conn, NULL, NULL);
+
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
      * buffers on Unix systems.  That way, when we are sending a large amount
@@ -2711,8 +2714,13 @@ makeEmptyPGconn(void)
     initPQExpBuffer(&conn->errorMessage);
     initPQExpBuffer(&conn->workBuffer);
 
+    /* set up initial row buffer */
+    conn->rowBufLen = 32;
+    conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+
     if (conn->inBuffer == NULL ||
         conn->outBuffer == NULL ||
+        conn->rowBuf == NULL ||
         PQExpBufferBroken(&conn->errorMessage) ||
         PQExpBufferBroken(&conn->workBuffer))
     {
@@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn)
         free(conn->inBuffer);
     if (conn->outBuffer)
         free(conn->outBuffer);
+    if (conn->rowBuf)
+        free(conn->rowBuf);
     termPQExpBuffer(&conn->errorMessage);
     termPQExpBuffer(&conn->workBuffer);
 
@@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
     return prev;
 }
+
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..d7f3ae9 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
+static int    pqAddRow(PGresult *res, void *param, PGrowValue *columns);
 
 
 /* ----------------
@@ -160,6 +161,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
+    result->rowProcessorErrMsg = NULL;
 
     if (conn)
     {
@@ -701,7 +703,6 @@ pqClearAsyncResult(PGconn *conn)
     if (conn->result)
         PQclear(conn->result);
     conn->result = NULL;
-    conn->curTuple = NULL;
 }
 
 /*
@@ -756,7 +757,6 @@ pqPrepareAsyncResult(PGconn *conn)
      */
     res = conn->result;
     conn->result = NULL;        /* handing over ownership to caller */
-    conn->curTuple = NULL;        /* just in case */
     if (!res)
         res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
     else
@@ -828,6 +828,73 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 }
 
 /*
+ * PQsetRowProcessor
+ *   Set function that copies column data out from network buffer.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+    conn->rowProcessor = (func ? func : pqAddRow);
+    conn->rowProcessorParam = param;
+}
+
+/*
+ * PQsetRowProcessorErrMsg
+ *    Set the error message passed back to the caller of the row processor.
+ *
+ *    You can replace the previous message with a new one, or clear
+ *    it with NULL.
+ */
+void
+PQsetRowProcessorErrMsg(PGresult *res, char *msg)
+{
+    if (msg)
+        res->rowProcessorErrMsg = pqResultStrdup(res, msg);
+    else
+        res->rowProcessorErrMsg = NULL;
+}
+
+/*
+ * pqAddRow
+ *      add a row to the PGresult structure, growing it if necessary
+ *      Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+static int
+pqAddRow(PGresult *res, void *param, PGrowValue *columns)
+{
+    PGresAttValue *tup;
+    int            nfields = res->numAttributes;
+    int            i;
+
+    tup = (PGresAttValue *)
+        pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+    if (tup == NULL)
+        return FALSE;
+
+    for (i = 0 ; i < nfields ; i++)
+    {
+        tup[i].len = columns[i].len;
+        if (tup[i].len == NULL_LEN)
+        {
+            tup[i].value = res->null_field;
+        }
+        else
+        {
+            bool isbinary = (res->attDescs[i].format != 0);
+            tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+            if (tup[i].value == NULL)
+                return FALSE;
+
+            memcpy(tup[i].value, columns[i].value, tup[i].len);
+            /* We have to terminate this ourselves */
+            tup[i].value[tup[i].len] = '\0';
+        }
+    }
+
+    return pqAddTuple(res, tup);
+}
+
+/*
  * pqAddTuple
  *      add a row pointer to the PGresult structure, growing it if necessary
  *      Returns TRUE if OK, FALSE if not enough memory to add the row
@@ -1223,7 +1290,6 @@ PQsendQueryStart(PGconn *conn)
 
     /* initialize async result-accumulation state */
     conn->result = NULL;
-    conn->curTuple = NULL;
 
     /* ready to send command message */
     return true;
@@ -1832,6 +1898,22 @@ PQexecFinish(PGconn *conn)
 }
 
 /*
+ * Exhaust remaining Data Rows in current conn.
+ */
+PGresult *
+PQskipRemainingResults(PGconn *conn)
+{
+    pqClearAsyncResult(conn);
+
+    /* conn->result is set NULL in pqClearAsyncResult() */
+    pqSaveErrorResult(conn);
+
+    /* Skip over remaining Data Row messages */
+    return PQgetResult(conn);
+}
+
+
+/*
  * PQdescribePrepared
  *      Obtain information about a previously prepared statement
  *
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
 }
 
 /*
+ * pqSkipnchar:
+ *    skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    if (len > (size_t) (conn->inEnd - conn->inCursor))
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+/*
  * pqPutnchar:
  *    write exactly len bytes to the current message
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..ae4d7b0 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn)
                         /* Read another tuple of a normal query response */
                         if (getAnotherTuple(conn, FALSE))
                             return;
+                        /* getAnotherTuple moves inStart itself */
+                        continue;
                     }
                     else
                     {
@@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn)
                         /* Read another tuple of a normal query response */
                         if (getAnotherTuple(conn, TRUE))
                             return;
+                        /* getAnotherTuple moves inStart itself */
+                        continue;
                     }
                     else
                     {
@@ -703,52 +707,51 @@ failure:
 
 /*
  * parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
  * Returns: 0 if completed message, EOF if error or not enough data yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
  */
 static int
 getAnotherTuple(PGconn *conn, bool binary)
 {
     PGresult   *result = conn->result;
     int            nfields = result->numAttributes;
-    PGresAttValue *tup;
+    PGrowValue  *rowbuf;
 
     /* the backend sends us a bitmap of which attributes are null */
     char        std_bitmap[64]; /* used unless it doesn't fit */
     char       *bitmap = std_bitmap;
     int            i;
+    int            rp;
     size_t        nbytes;            /* the number of bytes in bitmap  */
     char        bmap;            /* One byte of the bitmap */
     int            bitmap_index;    /* Its index */
     int            bitcnt;            /* number of bits examined in current byte */
     int            vlen;            /* length of the current field value */
 
+    /* resize row buffer if needed */
+    if (nfields > conn->rowBufLen)
+    {
+        rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+        if (!rowbuf)
+            goto rowProcessError;
+        conn->rowBuf = rowbuf;
+        conn->rowBufLen = nfields;
+    }
+    else
+    {
+        rowbuf = conn->rowBuf;
+    }
+
     result->binary = binary;
 
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
+    if (binary)
     {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-        /*
-         * If it's binary, fix the column format indicators.  We assume the
-         * backend will consistently send either B or D, not a mix.
-         */
-        if (binary)
-        {
-            for (i = 0; i < nfields; i++)
-                result->attDescs[i].format = 1;
-        }
+        for (i = 0; i < nfields; i++)
+            result->attDescs[i].format = 1;
     }
-    tup = conn->curTuple;
 
     /* Get the null-value bitmap */
     nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,7 +760,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto rowProcessError;
     }
 
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -771,34 +774,29 @@ getAnotherTuple(PGconn *conn, bool binary)
     for (i = 0; i < nfields; i++)
     {
         if (!(bmap & 0200))
-        {
-            /* if the field value is absent, make it a null string */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-        }
+            vlen = NULL_LEN;
+        else if (pqGetInt(&vlen, 4, conn))
+                goto EOFexit;
         else
         {
-            /* get the value length (the first four bytes are for length) */
-            if (pqGetInt(&vlen, 4, conn))
-                goto EOFexit;
             if (!binary)
                 vlen = vlen - 4;
             if (vlen < 0)
                 vlen = 0;
-            if (tup[i].value == NULL)
-            {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                if (tup[i].value == NULL)
-                    goto outOfMemory;
-            }
-            tup[i].len = vlen;
-            /* read in the value */
-            if (vlen > 0)
-                if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                    goto EOFexit;
-            /* we have to terminate this ourselves */
-            tup[i].value[vlen] = '\0';
         }
+
+        /*
+         * rowbuf[i].value always points to where the value data starts
+         * (or would start, for a NULL), to allow safe
+         * size estimates and data copy.
+         */
+        rowbuf[i].value = conn->inBuffer + conn->inCursor;
+        rowbuf[i].len = vlen;
+
+        /* Skip the value */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            goto EOFexit;
+
         /* advance the bitmap stuff */
         bitcnt++;
         if (bitcnt == BITS_PER_BYTE)
@@ -811,33 +809,56 @@ getAnotherTuple(PGconn *conn, bool binary)
             bmap <<= 1;
     }
 
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
-
     if (bitmap != std_bitmap)
         free(bitmap);
-    return 0;
+    bitmap = NULL;
+
+    /* tag the row as parsed */
+    conn->inStart = conn->inCursor;
+
+    /* Pass the completed row values to rowProcessor */
+    rp= conn->rowProcessor(result, conn->rowProcessorParam, rowbuf);
+    if (rp == 1)
+        return 0;
+    else if (rp == 2 && pqIsnonblocking(conn))
+        /* processor requested early exit */
+        return EOF;
+    else if (rp != 0)
+        PQsetRowProcessorErrMsg(result, libpq_gettext("invalid return value from row processor\n"));
+
+rowProcessError:
-outOfMemory:
     /* Replace partially constructed result with an error result */
-    /*
-     * we do NOT use pqSaveErrorResult() here, because of the likelihood that
-     * there's not enough memory to concatenate messages...
-     */
-    pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    if (result->rowProcessorErrMsg)
+    {
+        printfPQExpBuffer(&conn->errorMessage, "%s", result->rowProcessorErrMsg);
+        pqSaveErrorResult(conn);
+    }
+    else
+    {
+        /*
+         * we do NOT use pqSaveErrorResult() here, because of the likelihood that
+         * there's not enough memory to concatenate messages...
+         */
+        pqClearAsyncResult(conn);
+        resetPQExpBuffer(&conn->errorMessage);
-    /*
-     * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
-     * do to recover...
-     */
-    conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+        /*
+         * If error message is passed from RowProcessor, set it into
+         * PGconn, assume out of memory if not.
+         */
+        appendPQExpBufferStr(&conn->errorMessage,
+                             libpq_gettext("out of memory for query result\n"));
+
+        /*
+         * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
+         * do to recover...
+         */
+        conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+    }
     conn->asyncStatus = PGASYNC_READY;
+
     /* Discard the failed message --- good idea? */
     conn->inStart = conn->inEnd;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..0260ba6 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn)
                         /* Read another tuple of a normal query response */
                         if (getAnotherTuple(conn, msgLength))
                             return;
+
+                        /* getAnotherTuple() moves inStart itself */
+                        continue;
                     }
                     else if (conn->result != NULL &&
                              conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -613,33 +616,22 @@ failure:
 
 /*
  * parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
  * Returns: 0 if completed message, EOF if error or not enough data yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
  */
 static int
 getAnotherTuple(PGconn *conn, int msgLength)
 {
     PGresult   *result = conn->result;
     int            nfields = result->numAttributes;
-    PGresAttValue *tup;
+    PGrowValue  *rowbuf;
     int            tupnfields;        /* # fields from tuple */
     int            vlen;            /* length of the current field value */
     int            i;
-
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
-    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-    }
-    tup = conn->curTuple;
+    int            rp;
 
     /* Get the field count and make sure it's what we expect */
     if (pqGetInt(&tupnfields, 2, conn))
@@ -652,52 +644,88 @@ getAnotherTuple(PGconn *conn, int msgLength)
                  libpq_gettext("unexpected field count in \"D\" message\n"));
         pqSaveErrorResult(conn);
         /* Discard the failed message by pretending we read it */
-        conn->inCursor = conn->inStart + 5 + msgLength;
+        conn->inStart += 5 + msgLength;
         return 0;
     }
 
+    /* resize row buffer if needed */
+    rowbuf = conn->rowBuf;
+    if (nfields > conn->rowBufLen)
+    {
+        rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+        if (!rowbuf)
+        {
+            goto outOfMemory1;
+        }
+        conn->rowBuf = rowbuf;
+        conn->rowBufLen = nfields;
+    }
+
     /* Scan the fields */
     for (i = 0; i < nfields; i++)
     {
         /* get the value length */
         if (pqGetInt(&vlen, 4, conn))
-            return EOF;
+            goto protocolError;
         if (vlen == -1)
-        {
-            /* null field */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-            continue;
-        }
-        if (vlen < 0)
+            vlen = NULL_LEN;
+        else if (vlen < 0)
             vlen = 0;
-        if (tup[i].value == NULL)
-        {
-            bool        isbinary = (result->attDescs[i].format != 0);
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-            if (tup[i].value == NULL)
-                goto outOfMemory;
-        }
-        tup[i].len = vlen;
-        /* read in the value */
-        if (vlen > 0)
-            if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                return EOF;
-        /* we have to terminate this ourselves */
-        tup[i].value[vlen] = '\0';
+        /*
+         * rowbuf[i].value always points just past the length field,
+         * even if the value is NULL, to allow safe size estimates
+         * and data copy.
+         */
+        rowbuf[i].value = conn->inBuffer + conn->inCursor;
+        rowbuf[i].len = vlen;
+
+        /* Skip to the next length field */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            goto protocolError;
     }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /* tag the row as parsed, and check that we consumed exactly one message */
+    conn->inStart += 5 + msgLength;
+    if (conn->inCursor != conn->inStart)
+        goto protocolError;
+    /* Pass the completed row values to rowProcessor */
+    rp = conn->rowProcessor(result, conn->rowProcessorParam, rowbuf);
+    if (rp == 1)
+    {
+        /* everything is good */
+        return 0;
+    }
+    if (rp == 2 && pqIsnonblocking(conn))
+    {
+        /* processor requested early exit */
+        return EOF;
+    }
+
+    /* there was some problem */
+    if (rp == 0)
+    {
+        if (result->rowProcessorErrMsg == NULL)
+            goto outOfMemory2;
+
+        /* use supplied error message */
+        printfPQExpBuffer(&conn->errorMessage, "%s", result->rowProcessorErrMsg);
+    }
+    else
+    {
+        /* row processor messed up */
+        printfPQExpBuffer(&conn->errorMessage,
+                          libpq_gettext("invalid return value from row processor\n"));
+    }
+    pqSaveErrorResult(conn);
     return 0;
-outOfMemory:
+outOfMemory1:
+    /* Discard the failed message by pretending we read it */
+    conn->inStart += 5 + msgLength;
+outOfMemory2:
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
@@ -706,9 +734,14 @@ outOfMemory:
     printfPQExpBuffer(&conn->errorMessage,
                       libpq_gettext("out of memory for query result\n"));
     pqSaveErrorResult(conn);
+    return 0;
+protocolError:
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("invalid row contents\n"));
+    pqSaveErrorResult(conn);
     /* Discard the failed message by pretending we read it */
-    conn->inCursor = conn->inStart + 5 + msgLength;
+    conn->inStart += 5 + msgLength;
     return 0;
 }
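Aside for readers of the patch: the protocol-3 getAnotherTuple() above no longer copies values. It records a pointer and a length per column and skips over the bytes. Below is a minimal standalone sketch of that scan, under stated assumptions: InBuf, RowValue, get_int32, skip_bytes and scan_row are hypothetical stand-ins for the PGconn input buffer, PGrowValue, pqGetInt, pqSkipnchar and the field loop, and the input is assumed to be only the per-column part of a 'D' message (a 4-byte big-endian length followed by that many bytes, with -1 meaning NULL).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>              /* EOF */

#define NULL_LEN (-1)

typedef struct
{
    const char *data;           /* start of buffered input */
    size_t      cursor;         /* next byte to read */
    size_t      end;            /* one past the last valid byte */
} InBuf;

typedef struct
{
    int         len;            /* length in bytes, NULL_LEN for SQL NULL */
    const char *value;          /* points into InBuf, not NUL-terminated */
} RowValue;

/* Read a 4-byte big-endian signed integer; EOF if not enough data. */
static int
get_int32(InBuf *in, int *result)
{
    const unsigned char *p;
    uint32_t    v;

    if (in->end - in->cursor < 4)
        return EOF;
    p = (const unsigned char *) in->data + in->cursor;
    v = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
        ((uint32_t) p[2] << 8) | (uint32_t) p[3];
    if (v & 0x80000000u)
        *result = -(int) (~v & 0x7FFFFFFFu) - 1;
    else
        *result = (int) v;
    in->cursor += 4;
    return 0;
}

/* Advance the cursor without copying; EOF if not enough data. */
static int
skip_bytes(InBuf *in, size_t len)
{
    if (len > in->end - in->cursor)
        return EOF;
    in->cursor += len;
    return 0;
}

/* Fill row[] with pointers into the buffer.  0 on success, EOF if short. */
static int
scan_row(InBuf *in, RowValue *row, int nfields)
{
    int         i;

    assert(in != NULL && row != NULL);
    for (i = 0; i < nfields; i++)
    {
        int         vlen;

        if (get_int32(in, &vlen))
            return EOF;
        if (vlen == -1)
            vlen = NULL_LEN;
        else if (vlen < 0)
            vlen = 0;

        /* value points just past the length word, even for NULL */
        row[i].value = in->data + in->cursor;
        row[i].len = vlen;

        if (vlen > 0 && skip_bytes(in, (size_t) vlen))
            return EOF;
    }
    return 0;
}
```

Because the row entries point into the input buffer, they are only valid until the buffer is refilled or compacted, which is why the row processor has to consume or copy them before returning.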
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..2d913c4 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,17 @@ typedef struct pgNotify
     struct pgNotify *next;        /* list link */
 } PGnotify;
 
+/* PGrowValue points to a column value in the network buffer.
+ * Value is a string without null termination and length len.
+ * NULL is represented as len < 0; value then points to the place
+ * where the value would have been.
+ */
+typedef struct pgRowValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, without null termination */
+} PGrowValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -367,6 +378,8 @@ extern PGresult *PQexecPrepared(PGconn *conn,
                const int *paramFormats,
                int resultFormat);
 
+extern PGresult *PQskipRemainingResults(PGconn *conn);
+
 /* Interface for multiple-result or asynchronous queries */
 extern int    PQsendQuery(PGconn *conn, const char *query);
 extern int PQsendQueryParams(PGconn *conn,
@@ -416,6 +429,37 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
 
+/*
+ * Typedef for alternative row processor.
+ *
+ * Columns array will contain PQnfields() entries, each one
+ * pointing to particular column data in network buffer.
+ * This function is supposed to copy data out from there
+ * and store somewhere.  NULL is signified with len<0.
+ *
+ * This function must return 1 for success and 0 for failure.  On
+ * failure it may set an error message with PQsetRowProcessorErrMsg;
+ * if no message is set, the caller assumes the failure was out of
+ * memory.  This function is assumed not to throw any
+ * exception.
+ */
+typedef int (*PQrowProcessor)(PGresult *res, void *param,
+                                PGrowValue *columns);
+
+/*
+ * Set alternative row data processor for PGconn.
+ *
+ * Once a row processor is registered, libpq stops storing rows in
+ * the PGresult and calls the processor for each row instead.
+ *
+ * func is the row processor function. See the typedef PQrowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that is passed to
+ * the row processor.
+ */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+                                   void *rowProcessorParam);
+
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
 
@@ -454,6 +498,7 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void    PQsetRowProcessorErrMsg(PGresult *res, char *msg);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
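Aside for readers of the patch: a user-side callback matching the PQrowProcessor typedef above might look like the following. This is only a sketch under assumptions, not code from the patch. The two typedefs at the top are stand-ins that mirror the patch so the example compiles without a patched libpq (drop them when including the patched libpq-fe.h). RowStats and count_row are hypothetical names, and the field count is carried in the context struct here, where real code would likely call PQnfields(res).

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in declarations mirroring the patch, so this compiles alone. */
typedef struct pg_result PGresult;
typedef struct pgRowValue
{
    int         len;            /* length in bytes, < 0 for NULL */
    char       *value;          /* not NUL-terminated */
} PGrowValue;

/* Hypothetical per-query context, passed to the callback as "param". */
typedef struct
{
    int         nfields;        /* assumed known; see PQnfields(res) */
    long        nrows;          /* rows seen so far */
    long        nnulls;         /* NULL columns seen so far */
    size_t      nbytes;         /* total bytes of non-NULL values */
} RowStats;

/*
 * Row processor that only gathers statistics and stores nothing.
 * Returns 1 for success and 0 for failure, as the typedef comment requires.
 */
static int
count_row(PGresult *res, void *param, PGrowValue *columns)
{
    RowStats   *st = param;
    int         i;

    (void) res;                 /* unused in this sketch */
    if (st == NULL || columns == NULL)
        return 0;
    assert(st->nfields >= 0);

    for (i = 0; i < st->nfields; i++)
    {
        if (columns[i].len < 0)
            st->nnulls++;
        else
            st->nbytes += (size_t) columns[i].len;
    }
    st->nrows++;
    return 1;
}
```

With the patched library, such a callback would be registered with PQsetRowProcessor(conn, count_row, &stats) before the query is sent, and passing NULL as func would restore the default storage behaviour.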
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 987311e..1fc5aab 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -209,6 +209,9 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int            curOffset;        /* start offset of free space in block */
     int            spaceLeft;        /* number of free bytes remaining in block */
+
+    /* temp storage for message from row processor callback */
+    char       *rowProcessorErrMsg;
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -398,7 +401,6 @@ struct pg_conn
 
     /* Status for asynchronous result construction */
     PGresult   *result;            /* result being constructed */
-    PGresAttValue *curTuple;    /* tuple currently being read */
 
 #ifdef USE_SSL
     bool        allow_ssl_try;    /* Allowed to try SSL negotiation */
@@ -443,6 +445,14 @@ struct pg_conn
 
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
+
+    /*
+     * Read column data from network buffer.
+     */
+    PQrowProcessor rowProcessor;/* Function pointer */
+    void *rowProcessorParam;    /* Contextual parameter for rowProcessor */
+    PGrowValue *rowBuf;            /* Buffer for passing values to rowProcessor */
+    int rowBufLen;                /* Number of columns allocated in rowBuf */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -560,6 +570,7 @@ extern int    pqGets(PQExpBuffer buf, PGconn *conn);
 extern int    pqGets_append(PQExpBuffer buf, PGconn *conn);
 extern int    pqPuts(const char *s, PGconn *conn);
 extern int    pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int    pqSkipnchar(size_t len, PGconn *conn);
 extern int    pqPutnchar(const char *s, size_t len, PGconn *conn);
 extern int    pqGetInt(int *result, size_t bytes, PGconn *conn);
 extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..9cb6d65 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,274 @@ int PQisthreadsafe();
  </sect1>
+ <sect1 id="libpq-altrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-altrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   By default, rows are stored in a <type>PGresult</type> until the
+   full result set has been received, and the completely filled
+   <type>PGresult</type> is then passed to the user.  This behavior can
+   be changed by registering an alternative row processor function,
+   which sees each row as soon as it is received from the network.
+   It can either process the data immediately or store it in a
+   custom container.
+  </para>
+
+  <para>
+   Note that because the row processor sees rows as they arrive, it
+   cannot know whether the SQL statement will finish successfully on
+   the server.  Some care must therefore be taken to get proper
+   transactional behavior.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessor">
+    <term>
+     <function>PQsetRowProcessor</function>
+     <indexterm>
+      <primary>PQsetRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the row processor function.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Storage handler function to set. NULL means to use the
+           default processor.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a contextual parameter that is passed
+           to <parameter>func</parameter>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqrowprocessor">
+    <term>
+     <type>PQrowProcessor</type>
+     <indexterm>
+      <primary>PQrowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns);
+
+typedef struct
+{
+    int         len;            /* length in bytes of the value, -1 if NULL */
+    char       *value;          /* actual value, without null termination */
+} PGrowValue;
+</synopsis>
+     </para>
+
+     <para>
+      The <parameter>columns</parameter> array has PQnfields()
+      elements, each pointing to a column value in the network buffer.
+      The <parameter>len</parameter> field contains the number of
+      bytes in the value.  If the field value is NULL,
+      <parameter>len</parameter> is -1 and value points to the
+      position where the value would have been in the buffer.
+      This allows estimating the row size by pointer arithmetic.
+     </para>
+
+     <para>
+       This function must process the row values, or copy them out of the
+       network buffer, before it returns, as the next row might overwrite them.
+     </para>
+
+     <para>
+       This function must return 1 for success, and 0 for failure.
+       On failure this function should set the error message
+       with <function>PQsetRowProcessorErrMsg</function> if the cause
+       is other than out of memory.
+       When the non-blocking API is in use, it can also return 2
+       for early exit from the <function>PQisBusy</function> function.
+       The supplied <parameter>res</parameter> and <parameter>columns</parameter>
+       values will stay valid so the row can be processed outside of the callback.
+       The caller is responsible for tracking whether <function>PQisBusy</function>
+       returned early from the callback or for other reasons.
+       Usually this should be done by setting cached values to NULL
+       before calling <function>PQisBusy</function>.
+     </para>
+
+     <para>
+       The function is allowed to exit via exception (setjmp/longjmp).
+       The connection and row are guaranteed to be in a valid state.
+       The connection can later be closed via <function>PQfinish</function>.
+       Processing can also be continued without closing the connection:
+       call <function>PQgetResult</function> in synchronous mode, or
+       <function>PQisBusy</function> on an asynchronous connection.
+       Processing then continues with the next row; the row that raised
+       the exception is skipped.  Alternatively, all remaining rows can
+       be discarded by calling <function>PQskipRemainingResults</function>
+       without closing the connection.
+     </para>
+
+     <variablelist>
+       <varlistentry>
+
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>param</parameter></term>
+     <listitem>
+       <para>
+         Extra parameter that was given to <function>PQsetRowProcessor</function>.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>columns</parameter></term>
+     <listitem>
+       <para>
+         Column values of the row to process.  Column values
+         are located in the network buffer; the processor must
+         copy them out from there.
+       </para>
+       <para>
+         Column values are not null-terminated, so the processor cannot
+         use C string functions on them directly.
+       </para>
+     </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqskipremaingresults">
+    <term>
+     <function>PQskipRemainingResults</function>
+     <indexterm>
+      <primary>PQskipRemainingResults</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+        Discards all remaining row data, without closing the
+        connection, after <function>PQexec</function>
+        or <function>PQgetResult</function> has exited via an exception
+        raised in the <type>PQrowProcessor</type> callback.
+      </para>
+      <para>
+        Do not call this function when the functions above return normally.
+<synopsis>
+void PQskipRemainingResults(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object.
+         </para>
+       </listitem>
+     </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessorerrmsg">
+    <term>
+     <function>PQsetRowProcessorErrMsg</function>
+     <indexterm>
+      <primary>PQsetRowProcessorErrMsg</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred
+    in <type>PQrowProcessor</type>.  If this message is not set, the
+    caller assumes the error to be out of memory.
+<synopsis>
+void PQsetRowProcessorErrMsg(PGresult *res, char *msg);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>PQrowProcessor</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>msg</parameter></term>
+        <listitem>
+          <para>
+        Error message. This will be copied internally, so the caller
+        does not need to keep the original string alive.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a message previously
+        set, it will be overwritten. Pass NULL to cancel the custom
+        message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </sect1>
+
+ <sect1 id="libpq-build">  <title>Building <application>libpq</application> Programs</title>
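The documentation above says that a NULL column still points at the position where its value would have been, which allows estimating the row size by pointer arithmetic. A minimal sketch of that idea follows. It is not part of the patch: `PGrowValue` is redeclared locally as a stand-in, `estimate_row_span` is a hypothetical helper name, and the sketch assumes that all column pointers refer to one buffer in ascending order.

```c
#include <assert.h>
#include <stddef.h>

/* Local stand-in for the PGrowValue struct proposed in the patch. */
typedef struct
{
    int   len;      /* length in bytes of the value, -1 if NULL */
    char *value;    /* points into the network buffer, not NUL-terminated */
} PGrowValue;

/*
 * Hypothetical helper: estimate how many buffer bytes one row spans,
 * measured from the start of the first column value to the end of the
 * last one.  Assumes every value pointer lies in the same buffer and
 * that the columns appear in ascending address order.
 */
static size_t
estimate_row_span(const PGrowValue *columns, int nfields)
{
    const PGrowValue *last;
    size_t            last_len;

    if (nfields <= 0)
        return 0;
    assert(columns != NULL);

    last = &columns[nfields - 1];
    /* A NULL column (len == -1) occupies no value bytes. */
    last_len = last->len > 0 ? (size_t) last->len : 0;

    return (size_t) ((last->value + last_len) - columns[0].value);
}
```

The result includes whatever per-column framing sits between the values, so it is only an estimate of the row's footprint in the buffer, not the sum of the value lengths.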
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..b1c171a 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,24 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    MemoryContext oldcontext;
+    AttInMetadata *attinmeta;
+    char** valbuf;
+    int *valbuflen;
+    char **cstrs;
+    bool error_occurred;
+    bool nummismatch;
+    ErrorData *edata;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+/* Global */static remoteConn *pconn = NULL;
@@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+            ReThrowError(storeinfo.edata);
+
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo,ReturnSetInfo))
 
@@ -715,164 +755,249 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc= NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+    /* synchronous query, or async result retrieval */
-    if (!is_async)
-        res = PQexec(conn, sql);
-    else
+    PG_TRY();    {
-        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
+        if (!is_async)
+            res = PQexec(conn, sql);
+        else
+            res = PQgetResult(conn);
+    }
+    PG_CATCH();
+    {
+        /* Skip remaining results when storeHandler raises exception. */
+        PQskipRemainingResults(conn);
+        storeinfo.error_occurred = TRUE;    }
+    PG_END_TRY();
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+                ReThrowError(storeinfo.edata);
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }    }
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
+    TupleDesc    tupdesc;
+    int i;
-    Assert(rsinfo->returnMode == SFRM_Materialize);
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    {
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
-    PG_TRY();
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->valbuf = NULL;
+    sinfo->valbuflen = NULL;
+    sinfo->cstrs = NULL;    /* must be set before the NULL check below */
+
+    /* Preallocate memory of same size with c string array for values. */
+    sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
+    if (sinfo->valbuf)
+        sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+    if (sinfo->valbuflen)
+        sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
+
+    if (sinfo->cstrs == NULL)    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        if (sinfo->valbuf)
+            free(sinfo->valbuf);
+        if (sinfo->valbuflen)
+            free(sinfo->valbuflen);
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+    }
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)
+        {
+            if (sinfo->valbuf[i])
+                free(sinfo->valbuf[i]);        }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->cstrs)
+    {
+        free(sinfo->cstrs);
+        sinfo->cstrs = NULL;
+    }
-            values = (char **) palloc(nfields * sizeof(char *));
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+static int
+storeHandler(PGresult *res, void *param, PGrowValue *columns)
+{
+    storeInfo *sinfo = (storeInfo *)param;
+    HeapTuple  tuple;
+    int        fields = PQnfields(res);
+    int        i;
+    char      **cstrs = sinfo->cstrs;
-                if (!is_sql_cmd)
-                {
-                    int            i;
+    if (sinfo->error_occurred)
+        return FALSE;
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
+
+        /* This error will be processed in
+         * dblink_record_internal(). So do not set error message
+         * here. */
+        return FALSE;
+    }
+
+    /*
+     * Value input functions assume that the input string is
+     * NUL-terminated, so copy each value and terminate it here.
+     */
+    for(i = 0 ; i < fields ; i++)
+    {
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else
+        {
+            char *tmp = sinfo->valbuf[i];
+            int tmplen = sinfo->valbuflen[i];
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
+            /*
+             * Call malloc and realloc separately so that this also
+             * works on systems whose realloc() does not accept NULL
+             * as the old memory block.
+             *
+             * Also try to (re)allocate in bigger steps to
+             * avoid flood of allocations on weird data.
+             */
+            if (tmp == NULL)
+            {
+                tmplen = len + 1;
+                if (tmplen < 64)
+                    tmplen = 64;
+                tmp = (char *)malloc(tmplen);
+            }
+            else if (tmplen < len + 1)
+            {
+                if (len + 1 > tmplen * 2)
+                    tmplen = len + 1;
+                else
+                    tmplen = tmplen * 2;
+                tmp = (char *)realloc(tmp, tmplen);            }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+            /*
+             * sinfo->valbuf[n] will be freed in finishStoreInfo()
+             * when realloc returns NULL.
+             */
+            if (tmp == NULL)
+                return FALSE;
-        PQclear(res);
-    }
-    PG_CATCH();
-    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+            sinfo->valbuf[i] = tmp;
+            sinfo->valbuflen[i] = tmplen;
+
+            cstrs[i] = sinfo->valbuf[i];
+            memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';
+        }    }
-    PG_END_TRY();
+
+    /*
+     * These functions may throw an exception, which will be caught in
+     * dblink_record_internal()
+     */
+    tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+    tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+    return TRUE;}/*
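The copy step in storeHandler above (column values are not NUL-terminated, so each one is copied into a reusable per-column buffer that grows in steps) can be sketched in isolation as follows. This is a simplified illustration, not the patch's code: `PGrowValue` is redeclared locally as a stand-in, `ColBuf` and `copy_column` are hypothetical names, and unlike the patch it relies on the C standard's guarantee that `realloc(NULL, n)` behaves like `malloc(n)`.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Local stand-in for the PGrowValue struct proposed in the patch. */
typedef struct
{
    int   len;      /* length in bytes of the value, -1 if NULL */
    char *value;    /* not NUL-terminated */
} PGrowValue;

/* Hypothetical reusable buffer, one per result column. */
typedef struct
{
    char  *buf;
    size_t cap;
} ColBuf;

/*
 * Copy one column value into cb as a NUL-terminated C string.
 * Returns the string, or NULL for an SQL NULL.  On allocation failure
 * returns NULL and sets *oom to 1; the old buffer stays owned by cb.
 */
static const char *
copy_column(ColBuf *cb, const PGrowValue *col, int *oom)
{
    size_t need;

    assert(cb != NULL && col != NULL && oom != NULL);
    *oom = 0;

    if (col->len < 0)
        return NULL;            /* SQL NULL: nothing to copy */

    need = (size_t) col->len + 1;
    if (cb->cap < need)
    {
        /* Grow in bigger steps: start at 64 bytes, then double. */
        size_t newcap = cb->cap ? cb->cap * 2 : 64;
        char  *tmp;

        if (newcap < need)
            newcap = need;
        tmp = realloc(cb->buf, newcap);
        if (tmp == NULL)
        {
            *oom = 1;
            return NULL;
        }
        cb->buf = tmp;
        cb->cap = newcap;
    }

    memcpy(cb->buf, col->value, (size_t) col->len);
    cb->buf[col->len] = '\0';
    return cb->buf;
}
```

A row processor built on this would call `copy_column` once per column, return 0 when `*oom` is set, and free every `ColBuf` once the query has finished, much as finishStoreInfo does for `valbuf`.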
diff --git b/doc/src/sgml/libpq.sgml a/doc/src/sgml/libpq.sgml
index de95ea8..9cb6d65 100644
--- b/doc/src/sgml/libpq.sgml
+++ a/doc/src/sgml/libpq.sgml
@@ -7352,6 +7352,14 @@ typedef struct
        On failure this function should set the error message
        with <function>PQsetRowProcessorErrMsg</function> if the cause
        is other than out of memory.
 
+       When the non-blocking API is in use, it can also return 2
+       for early exit from the <function>PQisBusy</function> function.
+       The supplied <parameter>res</parameter> and <parameter>columns</parameter>
+       values will stay valid so the row can be processed outside of the callback.
+       The caller is responsible for tracking whether <function>PQisBusy</function>
+       returned early from the callback or for other reasons.
+       Usually this should be done by setting cached values to NULL
+       before calling <function>PQisBusy</function>.     </para>     <para>
diff --git b/src/interfaces/libpq/fe-protocol2.c a/src/interfaces/libpq/fe-protocol2.c
index 7498580..ae4d7b0 100644
--- b/src/interfaces/libpq/fe-protocol2.c
+++ a/src/interfaces/libpq/fe-protocol2.c
@@ -820,6 +820,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     rp = conn->rowProcessor(result, conn->rowProcessorParam, rowbuf);
     if (rp == 1)
         return 0;
+    else if (rp == 2 && pqIsnonblocking(conn))
+        /* processor requested early exit */
+        return EOF;
     else if (rp != 0)
         PQsetRowProcessorErrMsg(result, libpq_gettext("invalid return value from row processor\n"));
 
diff --git b/src/interfaces/libpq/fe-protocol3.c a/src/interfaces/libpq/fe-protocol3.c
index a67e3ac..0260ba6 100644
--- b/src/interfaces/libpq/fe-protocol3.c
+++ a/src/interfaces/libpq/fe-protocol3.c
@@ -697,6 +697,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
         /* everything is good */
         return 0;
     }
+    if (rp == 2 && pqIsnonblocking(conn))
+    {
+        /* processor requested early exit */
+        return EOF;
+    }    /* there was some problem */    if (rp == 0)
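The return-code handling that the '2' patch adds to getAnotherTuple() can be summarised in a small standalone sketch: 1 means the row was consumed, 2 requests an early exit but only on a non-blocking connection, 0 is a failure, and anything else is an invalid return value. None of the names below come from libpq; `dispatch_row`, the `DISPATCH_*` constants and `return_code` are hypothetical, and the sketch reports outcomes through an enum instead of the 0/EOF values used in the patch so that the three cases stay distinct.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical outcome codes for this sketch only. */
enum
{
    DISPATCH_CONTINUE,      /* row consumed, keep parsing */
    DISPATCH_EARLY_EXIT,    /* processor asked to stop; row stays valid */
    DISPATCH_ERROR          /* failure or invalid return value */
};

typedef int (*row_processor) (void *param);

/*
 * Call the processor once and classify its return value in the same
 * order as the patch: 1, then 2 (non-blocking only), then errors.
 */
static int
dispatch_row(row_processor proc, void *param, int nonblocking,
             const char **errmsg)
{
    int rp;

    assert(proc != NULL && errmsg != NULL);
    *errmsg = NULL;

    rp = proc(param);
    if (rp == 1)
        return DISPATCH_CONTINUE;
    if (rp == 2 && nonblocking)
        return DISPATCH_EARLY_EXIT;
    if (rp != 0)
        *errmsg = "invalid return value from row processor";
    else
        *errmsg = "row processor reported failure";
    return DISPATCH_ERROR;
}

/* Demo processor: returns whatever code param points to. */
static int
return_code(void *param)
{
    return *(int *) param;
}
```

Note that a return value of 2 on a blocking connection falls through to the invalid-return-value branch, which matches the `rp == 2 && pqIsnonblocking(conn)` condition in both protocol files.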
