Re: Speed dblink using alternate libpq tuple storage

From: Kyotaro HORIGUCHI
Subject: Re: Speed dblink using alternate libpq tuple storage
Msg-id: 20120228.170444.122386056.horiguchi.kyotaro@oss.ntt.co.jp
In reply to: Re: Speed dblink using alternate libpq tuple storage  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
Replies: Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
List: pgsql-hackers

This is the new version of the patch.
It is not rebased onto HEAD because of a build error.

> It's better to restore old two-path error handling.

I restored the "OOM and save result" route. However, as the comment in
the original code says, on a REAL OOM we need to win back whatever
memory we can.

So I restored the meaning of rp == 0 && errMsg == NULL as a REAL
OOM, in which case the async result is thrown away; the result is
preserved if errMsg is not NULL. The `unknown error' case has been
removed.

As a result, if the row processor returns 0, the parser skips to the
end of the rows and returns either the working result or an error
result, depending on whether or not the row processor has set errMsg.
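To make the rule above concrete, here is a minimal standalone sketch (not code from the patch) that classifies a row processor's return value the way getAnotherTuple() does in this version. RowOutcome and classify_row_result() are hypothetical names; the patch itself jumps to the error_saveresult / error_clearresult labels instead.

```c
#include <stddef.h>

/* Hypothetical outcome codes; the patch jumps to goto labels instead. */
typedef enum
{
    ROW_OK,             /* rp == 1: row consumed, keep parsing */
    ROW_SUSPEND,        /* rp == 2 on a non-blocking connection: early exit */
    ROW_SAVE_RESULT,    /* rp == 0 with errMsg set: keep the result built so far */
    ROW_CLEAR_RESULT    /* REAL OOM or invalid rp: throw the async result away */
} RowOutcome;

/*
 * Classify a row processor return value.  *msg_out receives the text that
 * would go into conn->errorMessage, or NULL when no error is reported.
 */
static RowOutcome
classify_row_result(int rp, int nonblocking, const char *errMsg,
                    const char **msg_out)
{
    *msg_out = NULL;

    if (rp == 1)
        return ROW_OK;
    if (rp == 2 && nonblocking)
        return ROW_SUSPEND;
    if (rp == 0)
    {
        if (errMsg != NULL)
        {
            /* the row processor explained itself: preserve the result */
            *msg_out = errMsg;
            return ROW_SAVE_RESULT;
        }
        /* rp == 0 && errMsg == NULL means REAL OOM: free memory first */
        *msg_out = "out of memory";
        return ROW_CLEAR_RESULT;
    }
    /* anything else, including rp == 2 on a blocking connection */
    *msg_out = "invalid return value from row processor\n";
    return ROW_CLEAR_RESULT;
}
```

Note that rp == 2 on a blocking connection falls through to the "invalid return value" path, as in the patch.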


> I don't think that should be required.  Just use a dummy msg.

Considering the above, pqAddRow is also restored to leave errMsg
NULL on OOM.
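A row processor following the same convention as pqAddRow might look roughly like the sketch below. It is a hypothetical example: CopiedRow, copy_row() and free_row() are invented for illustration, and nfields is passed explicitly instead of calling PQnfields() on the PGresult, so it compiles without the patched libpq. It copies each value out of the network buffer, NUL-terminates it, keeps SQL NULL (len < 0) as a NULL pointer, and returns 0 without setting any message on OOM.

```c
#include <stdlib.h>
#include <string.h>

/* Same layout as the PGrowValue typedef the patch adds to libpq-fe.h. */
typedef struct pgRowValue
{
    int         len;      /* length in bytes; < 0 means SQL NULL */
    char       *value;    /* points into the network buffer, not NUL-terminated */
} PGrowValue;

/* Hypothetical container for one copied row. */
typedef struct
{
    int         nfields;  /* number of slots that may hold allocations */
    char      **values;   /* NUL-terminated copies; NULL pointer for SQL NULL */
} CopiedRow;

static void
free_row(CopiedRow *row)
{
    for (int i = 0; i < row->nfields; i++)
        free(row->values[i]);
    free(row->values);
    row->values = NULL;
    row->nfields = 0;
}

/*
 * Copy one row out of the network buffer before the next row overwrites it.
 * Returns 1 on success and 0 on out of memory, without setting any message,
 * so the caller treats the failure as a REAL OOM (as pqAddRow does).
 */
static int
copy_row(CopiedRow *row, const PGrowValue *columns, int nfields)
{
    row->nfields = 0;
    row->values = calloc(nfields > 0 ? (size_t) nfields : 1, sizeof(char *));
    if (row->values == NULL)
        return 0;

    for (int i = 0; i < nfields; i++)
    {
        row->nfields = i + 1;   /* lets free_row() release a partial copy */
        if (columns[i].len < 0)
            continue;           /* SQL NULL: leave the slot NULL */

        row->values[i] = malloc((size_t) columns[i].len + 1);
        if (row->values[i] == NULL)
        {
            free_row(row);
            return 0;
        }
        memcpy(row->values[i], columns[i].value, (size_t) columns[i].len);
        row->values[i][columns[i].len] = '\0';   /* terminate it ourselves */
    }
    return 1;
}
```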

> There is still one EOF in v3 getAnotherTuple() -
> pqGetInt(tupnfields), please turn that one also to
> protocol error.

pqGetInt() returns EOF only when it needs to read more data from the
network, provided the parameter `bytes' is valid (2 or 4). So a
non-zero return from it should be handled as EOF, not as a protocol
error. The one spot I had changed incorrectly is also restored. As a
result, the so-called 'protocol error' path has disappeared.
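The EOF convention can be illustrated with a simplified reader modelled on pqGetInt(). MiniConn and mini_get_int() are hypothetical stand-ins, not libpq code: a short buffer yields EOF with the cursor untouched, so the caller can return and reparse the message once more bytes have arrived.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>      /* EOF */

/* Hypothetical stand-in for the PGconn input-buffer fields pqGetInt() uses. */
typedef struct
{
    const unsigned char *inBuffer;
    size_t      inCursor;   /* next byte to parse */
    size_t      inEnd;      /* end of the data received so far */
} MiniConn;

/*
 * Read a 2- or 4-byte integer in network byte order.  Returns 0 and advances
 * inCursor on success.  Returns EOF, leaving inCursor unchanged, when the
 * size is unsupported or when not enough bytes have arrived yet; in the
 * latter case the caller should wait for more data, not report a protocol
 * error.
 */
static int
mini_get_int(int *result, size_t bytes, MiniConn *conn)
{
    const unsigned char *p;
    uint32_t    u;

    if (bytes != 2 && bytes != 4)
        return EOF;
    if (bytes > conn->inEnd - conn->inCursor)
        return EOF;             /* not enough data yet */

    p = conn->inBuffer + conn->inCursor;
    if (bytes == 2)
        *result = (int) ((p[0] << 8) | p[1]);   /* unsigned 16-bit value */
    else
    {
        u = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
            ((uint32_t) p[2] << 8) | (uint32_t) p[3];
        /* portable two's-complement conversion, so 0xFFFFFFFF yields -1 */
        *result = (u > INT32_MAX) ? -(int) (~u) - 1 : (int) u;
    }
    conn->inCursor += bytes;
    return 0;
}
```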

> Instead use ("%s", errmsg) as argument there.  libpq code
> is noisy enough, no need to add more.

done

Is there anything left undone?


By the way, I noticed that dblink always reports the current
connection as 'unnamed' in the error messages from
dblink_record_internal() in dblink. dblink_record_internal defines
the local variable conname = NULL and passes it to dblink_res_error
to display the error message, but never assigns anything to it within
the function.

The name was shown properly once I added code to set conname from
PG_GETARG_TEXT_PP(0) when available, in other words right after the
DBLINK_GET_CONN/DBLINK_GET_NAMED_CONN calls. That seems to be the
dblink way of doing it... This change is not included in this patch.

Furthermore, dblink_res_error looks only at the returned PGresult to
display the error, and always says just `Error occurred on dblink
connection..: could not execute query'.

Is it right to view this as follows?
- dblink is wrong in its error handling. A libpq client should check
  the PGconn with PQerrorMessage() if (or regardless of whether?) the
  PGresult says nothing about the error.
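If the answer is yes, the fallback could look something like this sketch. pick_error_detail() and format_dblink_error() are hypothetical helpers, not dblink functions, and the message format only approximates dblink's. In real code res_msg would come from something like PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY) and conn_msg from PQerrorMessage(conn).

```c
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Prefer the message carried by the PGresult; if it says nothing, fall back
 * to the connection-level message; use the generic text only when both are
 * empty.
 */
static const char *
pick_error_detail(const char *res_msg, const char *conn_msg)
{
    if (res_msg != NULL && res_msg[0] != '\0')
        return res_msg;
    if (conn_msg != NULL && conn_msg[0] != '\0')
        return conn_msg;
    return "could not execute query";
}

/* Build the report text; a NULL conname is shown as "unnamed". */
static int
format_dblink_error(char *buf, size_t bufsize, const char *conname,
                    const char *res_msg, const char *conn_msg)
{
    return snprintf(buf, bufsize,
                    "Error occurred on dblink connection named \"%s\": %s",
                    conname != NULL ? conname : "unnamed",
                    pick_error_detail(res_msg, conn_msg));
}
```

Combined with assigning conname as described above, both the name and the underlying libpq message would reach the user.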
 


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..239edb8 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQsetRowProcessor      161
+PQgetRowProcessor      162
+PQresultSetErrMsg      163
+PQskipResult          164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 27a9805..4605e49 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2693,6 +2693,9 @@ makeEmptyPGconn(void)
     conn->wait_ssl_try = false;
 #endif
 
+    /* set default row processor */
+    PQsetRowProcessor(conn, NULL, NULL);
+
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
      * buffers on Unix systems.  That way, when we are sending a large amount
@@ -2711,8 +2714,13 @@ makeEmptyPGconn(void)
     initPQExpBuffer(&conn->errorMessage);
     initPQExpBuffer(&conn->workBuffer);
 
+    /* set up initial row buffer */
+    conn->rowBufLen = 32;
+    conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue));
+
     if (conn->inBuffer == NULL ||
         conn->outBuffer == NULL ||
+        conn->rowBuf == NULL ||
         PQExpBufferBroken(&conn->errorMessage) ||
         PQExpBufferBroken(&conn->workBuffer))
     {
@@ -2814,6 +2822,8 @@ freePGconn(PGconn *conn)
         free(conn->inBuffer);
     if (conn->outBuffer)
         free(conn->outBuffer);
+    if (conn->rowBuf)
+        free(conn->rowBuf);
     termPQExpBuffer(&conn->errorMessage);
     termPQExpBuffer(&conn->workBuffer);
@@ -5078,3 +5088,4 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
     return prev;
 }
+
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..ce58778 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int    check_field_number(const PGresult *res, int field_num);
+static int    pqAddRow(PGresult *res, PGrowValue *columns, void *param);
 
 /* ----------------
@@ -701,7 +702,6 @@ pqClearAsyncResult(PGconn *conn)
     if (conn->result)
         PQclear(conn->result);
     conn->result = NULL;
-    conn->curTuple = NULL;
 }
 
 /*
@@ -756,7 +756,6 @@ pqPrepareAsyncResult(PGconn *conn)
      */
     res = conn->result;
     conn->result = NULL;        /* handing over ownership to caller */
-    conn->curTuple = NULL;        /* just in case */
     if (!res)
         res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
     else
@@ -828,6 +827,87 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 }
 
 /*
+ * PQsetRowProcessor
+ *   Set function that copies column data out from network buffer.
+ */
+void
+PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
+{
+    conn->rowProcessor = (func ? func : pqAddRow);
+    conn->rowProcessorParam = param;
+}
+
+/*
+ * PQgetRowProcessor
+ *   Get the current row processor of conn.  If param is not NULL, also
+ *   store the current row processor parameter in *param.
+ */
+PQrowProcessor
+PQgetRowProcessor(PGconn *conn, void **param)
+{
+    if (param)
+        *param = conn->rowProcessorParam;
+
+    return conn->rowProcessor;
+}
+
+/*
+ * PQresultSetErrMsg
+ *    Set the error message to PGresult.
+ *
+ *    You can replace the previous message with an alternative msg, or
+ *    clear it with NULL.
+ */
+void
+PQresultSetErrMsg(PGresult *res, const char *msg)
+{
+    if (msg)
+        res->errMsg = pqResultStrdup(res, msg);
+    else
+        res->errMsg = NULL;
+}
+
+/*
+ * pqAddRow
+ *      add a row to the PGresult structure, growing it if necessary
+ *      Returns TRUE if OK, FALSE if not enough memory to add the row.
+ */
+static int
+pqAddRow(PGresult *res, PGrowValue *columns, void *param)
+{
+    PGresAttValue *tup;
+    int            nfields = res->numAttributes;
+    int            i;
+
+    tup = (PGresAttValue *)
+        pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
+    if (tup == NULL)
+        return FALSE;
+
+    for (i = 0 ; i < nfields ; i++)
+    {
+        tup[i].len = columns[i].len;
+        if (tup[i].len == NULL_LEN)
+        {
+            tup[i].value = res->null_field;
+        }
+        else
+        {
+            bool isbinary = (res->attDescs[i].format != 0);
+            tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary);
+            if (tup[i].value == NULL)
+                return FALSE;
+
+            memcpy(tup[i].value, columns[i].value, tup[i].len);
+            /* We have to terminate this ourselves */
+            tup[i].value[tup[i].len] = '\0';
+        }
+    }
+
+    return pqAddTuple(res, tup);
+}
+
+/*
  * pqAddTuple
  *      add a row pointer to the PGresult structure, growing it if necessary
  *      Returns TRUE if OK, FALSE if not enough memory to add the row
@@ -1223,7 +1303,6 @@ PQsendQueryStart(PGconn *conn)
     /* initialize async result-accumulation state */
     conn->result = NULL;
-    conn->curTuple = NULL;
 
     /* ready to send command message */
     return true;
@@ -1831,6 +1910,55 @@ PQexecFinish(PGconn *conn)
     return lastResult;
 }
+
+/*
+ * Do-nothing row processor for PQskipResult
+ */
+static int
+dummyRowProcessor(PGresult *res, PGrowValue *columns, void *param)
+{
+    return 1;
+}
+
+/*
+ * Exhaust the remaining Data Rows in the current conn.
+ *
+ * Exhaust the current result if skipAll is false, and all succeeding
+ * results if true.
+ */
+int
+PQskipResult(PGconn *conn, int skipAll)
+{
+    PQrowProcessor savedRowProcessor;
+    void * savedRowProcParam;
+    PGresult *res;
+    int ret = 0;
+
+    /* save the current row processor settings and set dummy processor */
+    savedRowProcessor = PQgetRowProcessor(conn, &savedRowProcParam);
+    PQsetRowProcessor(conn, dummyRowProcessor, NULL);
+    
+    /*
+     * Throw away the remaining rows in current result, or all succeeding
+     * results if skipAll is not FALSE.
+     */
+    if (skipAll)
+    {
+        while ((res = PQgetResult(conn)) != NULL)
+            PQclear(res);
+    }
+    else if ((res = PQgetResult(conn)) != NULL)
+    {
+        PQclear(res);
+        ret = 1;
+    }
+    
+    PQsetRowProcessor(conn, savedRowProcessor, savedRowProcParam);
+
+    return ret;
+}
+
+
 /*
  * PQdescribePrepared
  *      Obtain information about a previously prepared statement
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index ce0eac3..d11cb3c 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)
 }
 
 /*
+ * pqSkipnchar:
+ *    skip len bytes in input buffer.
+ */
+int
+pqSkipnchar(size_t len, PGconn *conn)
+{
+    if (len > (size_t) (conn->inEnd - conn->inCursor))
+        return EOF;
+
+    conn->inCursor += len;
+
+    if (conn->Pfdebug)
+        fprintf(conn->Pfdebug, "From backend (%lu skipped)\n",
+                (unsigned long) len);
+
+    return 0;
+}
+
+/*
  * pqPutnchar:
  *    write exactly len bytes to the current message
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index a7c3899..36773cb 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -569,6 +569,8 @@ pqParseInput2(PGconn *conn)
                         /* Read another tuple of a normal query response */
                         if (getAnotherTuple(conn, FALSE))
                             return;
+                        /* getAnotherTuple moves inStart itself */
+                        continue;
                     }
                     else
                     {
@@ -585,6 +587,8 @@ pqParseInput2(PGconn *conn)
                         /* Read another tuple of a normal query response */
                         if (getAnotherTuple(conn, TRUE))
                             return;
+                        /* getAnotherTuple moves inStart itself */
+                        continue;
                     }
                     else
                     {
@@ -703,52 +707,55 @@ failure:
 /*
  * parseInput subroutine to read a 'B' or 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
  * Returns: 0 if completed message, EOF if error or not enough data yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
  */
 static int
 getAnotherTuple(PGconn *conn, bool binary)
 {
     PGresult   *result = conn->result;
     int            nfields = result->numAttributes;
-    PGresAttValue *tup;
+    PGrowValue  *rowbuf;
 
     /* the backend sends us a bitmap of which attributes are null */
     char        std_bitmap[64]; /* used unless it doesn't fit */
     char       *bitmap = std_bitmap;
     int            i;
+    int            rp;
     size_t        nbytes;            /* the number of bytes in bitmap  */
     char        bmap;            /* One byte of the bitmap */
     int            bitmap_index;    /* Its index */
     int            bitcnt;            /* number of bits examined in current byte */
     int            vlen;            /* length of the current field value */
+    char        *errmsg = libpq_gettext("unknown error\n");
-    result->binary = binary;
-
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
+    /* resize row buffer if needed */
+    if (nfields > conn->rowBufLen)
     {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-
-        /*
-         * If it's binary, fix the column format indicators.  We assume the
-         * backend will consistently send either B or D, not a mix.
-         */
-        if (binary)
+        rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+        if (!rowbuf)
         {
-            for (i = 0; i < nfields; i++)
-                result->attDescs[i].format = 1;
+            errmsg = libpq_gettext("out of memory for query result\n");
+            goto error_clearresult;
         }
+        conn->rowBuf = rowbuf;
+        conn->rowBufLen = nfields;
+    }
+    else
+    {
+        rowbuf = conn->rowBuf;
+    }
+
+    result->binary = binary;
+
+    if (binary)
+    {
+        for (i = 0; i < nfields; i++)
+            result->attDescs[i].format = 1;
     }
-    tup = conn->curTuple;
 
     /* Get the null-value bitmap */
     nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE;
@@ -757,11 +764,15 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+        {
+            errmsg = libpq_gettext("out of memory for query result\n");
+            goto error_clearresult;
+        }
     }
 
     if (pqGetnchar(bitmap, nbytes, conn))
-        goto EOFexit;
+        goto error_clearresult;
+
     /* Scan the fields */
     bitmap_index = 0;
@@ -771,34 +782,29 @@ getAnotherTuple(PGconn *conn, bool binary)
     for (i = 0; i < nfields; i++)
     {
         if (!(bmap & 0200))
-        {
-            /* if the field value is absent, make it a null string */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-        }
+            vlen = NULL_LEN;
+        else if (pqGetInt(&vlen, 4, conn))
+                goto EOFexit;
         else
         {
-            /* get the value length (the first four bytes are for length) */
-            if (pqGetInt(&vlen, 4, conn))
-                goto EOFexit;
             if (!binary)
                 vlen = vlen - 4;
             if (vlen < 0)
                 vlen = 0;
-            if (tup[i].value == NULL)
-            {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
-                if (tup[i].value == NULL)
-                    goto outOfMemory;
-            }
-            tup[i].len = vlen;
-            /* read in the value */
-            if (vlen > 0)
-                if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                    goto EOFexit;
-            /* we have to terminate this ourselves */
-            tup[i].value[vlen] = '\0';
         }
+
+        /*
+         * rowbuf[i].value always points to the next address of the
+         * length field even if the value is NULL, to allow safe
+         * size estimates and data copy.
+         */
+        rowbuf[i].value = conn->inBuffer + conn->inCursor;
+        rowbuf[i].len = vlen;
+
+        /* Skip the value */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            goto EOFexit;
+
         /* advance the bitmap stuff */
         bitcnt++;
         if (bitcnt == BITS_PER_BYTE)
@@ -811,33 +817,64 @@ getAnotherTuple(PGconn *conn, bool binary)
             bmap <<= 1;
     }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
-
     if (bitmap != std_bitmap)
         free(bitmap);
-    return 0;
+    bitmap = NULL;
-outOfMemory:
-    /* Replace partially constructed result with an error result */
+    /* tag the row as parsed */
+    conn->inStart = conn->inCursor;
+
+    /* Pass the completed row values to rowProcessor */
+    rp = conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
+    if (rp == 1)
+        return 0;
+    else if (rp == 2 && pqIsnonblocking(conn))
+        /* processor requested early exit */
+        return EOF;
+    else if (rp == 0)
+    {
+        errmsg = result->errMsg;
+        result->errMsg = NULL;
+        if (errmsg == NULL)
+        {
+            /* If errmsg == NULL, we assume that the row processor
+             * ran out of memory. We should free any space immediately
+             * so that we can go forward. */
+            errmsg = "out of memory";
+            goto error_clearresult;
+        }
+        /*
+         * We assume that a caller working together with the row
+         * processor wants the partially built result when the row
+         * processor sets an errMsg with rp == 0.
+         */
+        goto error_saveresult;
+    }
+    errmsg = libpq_gettext("invalid return value from row processor\n");
+    /* FALL THROUGH */
+error_clearresult:
     /*
      * we do NOT use pqSaveErrorResult() here, because of the likelihood that
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+error_saveresult:
+    /*
+     * If an error message was passed from the row processor, set it into
+     * PGconn; otherwise assume out of memory.
+     */
+    printfPQExpBuffer(&conn->errorMessage, "%s", errmsg);
+
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
      * do to recover...
      */
     conn->result = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
+
     conn->asyncStatus = PGASYNC_READY;
+
     /* Discard the failed message --- good idea? */
     conn->inStart = conn->inEnd;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 892dcbc..2693ce0 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -327,6 +327,9 @@ pqParseInput3(PGconn *conn)
                         /* Read another tuple of a normal query response */
                         if (getAnotherTuple(conn, msgLength))
                             return;
+
+                        /* getAnotherTuple() moves inStart itself */
+                        continue;
                     }
                     else if (conn->result != NULL &&
                              conn->result->resultStatus == PGRES_FATAL_ERROR)
@@ -613,33 +616,23 @@ failure:
 /*
  * parseInput subroutine to read a 'D' (row data) message.
- * We add another tuple to the existing PGresult structure.
+ * It fills rowbuf with column pointers and then calls row processor.
  * Returns: 0 if completed message, EOF if error or not enough data yet.
  *
  * Note that if we run out of data, we have to suspend and reprocess
- * the message after more data is received.  We keep a partially constructed
- * tuple in conn->curTuple, and avoid reallocating already-allocated storage.
+ * the message after more data is received.
  */
 static int
 getAnotherTuple(PGconn *conn, int msgLength)
 {
     PGresult   *result = conn->result;
     int            nfields = result->numAttributes;
-    PGresAttValue *tup;
+    PGrowValue  *rowbuf;
     int            tupnfields;        /* # fields from tuple */
     int            vlen;            /* length of the current field value */
     int            i;
-
-    /* Allocate tuple space if first time for this data message */
-    if (conn->curTuple == NULL)
-    {
-        conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
-        if (conn->curTuple == NULL)
-            goto outOfMemory;
-        MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
-    }
-    tup = conn->curTuple;
+    int            rp;
+    char        *errmsg = libpq_gettext("unknown error\n");
 
     /* Get the field count and make sure it's what we expect */
     if (pqGetInt(&tupnfields, 2, conn))
@@ -647,13 +640,22 @@ getAnotherTuple(PGconn *conn, int msgLength)
 
     if (tupnfields != nfields)
     {
-        /* Replace partially constructed result with an error result */
-        printfPQExpBuffer(&conn->errorMessage,
-                 libpq_gettext("unexpected field count in \"D\" message\n"));
-        pqSaveErrorResult(conn);
-        /* Discard the failed message by pretending we read it */
-        conn->inCursor = conn->inStart + 5 + msgLength;
-        return 0;
+        errmsg = libpq_gettext("unexpected field count in \"D\" message\n");
+        goto error_and_forward;
+    }
+
+    /* resize row buffer if needed */
+    rowbuf = conn->rowBuf;
+    if (nfields > conn->rowBufLen)
+    {
+        rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue));
+        if (!rowbuf)
+        {
+            errmsg = libpq_gettext("out of memory for query result\n");
+            goto error_and_forward;
+        }
+        conn->rowBuf = rowbuf;
+        conn->rowBufLen = nfields;
     }
 
     /* Scan the fields */
@@ -662,53 +664,88 @@ getAnotherTuple(PGconn *conn, int msgLength)
         /* get the value length */
         if (pqGetInt(&vlen, 4, conn))
             return EOF;
+
         if (vlen == -1)
-        {
-            /* null field */
-            tup[i].value = result->null_field;
-            tup[i].len = NULL_LEN;
-            continue;
-        }
-        if (vlen < 0)
+            vlen = NULL_LEN;
+        else if (vlen < 0)
             vlen = 0;
-        if (tup[i].value == NULL)
-        {
-            bool        isbinary = (result->attDescs[i].format != 0);
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
-            if (tup[i].value == NULL)
-                goto outOfMemory;
-        }
-        tup[i].len = vlen;
-        /* read in the value */
-        if (vlen > 0)
-            if (pqGetnchar((char *) (tup[i].value), vlen, conn))
-                return EOF;
-        /* we have to terminate this ourselves */
-        tup[i].value[vlen] = '\0';
+        /*
+         * rowbuf[i].value always points to the next address of the
+         * length field even if the value is NULL, to allow safe
+         * size estimates and data copy.
+         */
+        rowbuf[i].value = conn->inBuffer + conn->inCursor;
+        rowbuf[i].len = vlen;
+
+        /* Skip to the next length field */
+        if (vlen > 0 && pqSkipnchar(vlen, conn))
+            return EOF;
     }
-    /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
-    /* and reset for a new message */
-    conn->curTuple = NULL;
+    /* tag the row as parsed, and check that it was parsed correctly */
+    conn->inStart += 5 + msgLength;
+    if (conn->inCursor != conn->inStart)
+    {
+        errmsg = libpq_gettext("invalid row contents\n");
+        goto error_clearresult;
+    }
-    return 0;
+    /* Pass the completed row values to rowProcessor */
+    rp = conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
+    if (rp == 1)
+    {
+        /* everything is good */
+        return 0;
+    }
+    if (rp == 2 && pqIsnonblocking(conn))
+    {
+        /* processor requested early exit */
+        return EOF;
+    }
+
+    /* there was some problem */
+    if (rp == 0)
+    {
+        /*
+         * Unlink errMsg from result here to use it after
+         * pqClearAsyncResult() is called.
+         */
+        errmsg = result->errMsg;
+        result->errMsg = NULL;
+        if (errmsg == NULL)
+        {
+            /* If errmsg == NULL, we assume that the row processor
+             * ran out of memory. We should free any space immediately
+             * so that we can go forward. */
+            errmsg = "out of memory";
+            goto error_clearresult;
+        }
+        /*
+         * We assume that a caller working together with the row
+         * processor wants the partially built result when the row
+         * processor sets an errMsg with rp == 0.
+         */
+        goto error_saveresult;
+    }
-outOfMemory:
+    errmsg = libpq_gettext("invalid return value from row processor\n");
+    goto error_clearresult;
+
+error_and_forward:
+    /* Discard the failed message by pretending we read it */
+    conn->inCursor = conn->inStart + 5 + msgLength;
+error_clearresult:
+    pqClearAsyncResult(conn);
+    
+error_saveresult:
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
-    pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    printfPQExpBuffer(&conn->errorMessage, "%s", errmsg);
     pqSaveErrorResult(conn);
-
-    /* Discard the failed message by pretending we read it */
-    conn->inCursor = conn->inStart + 5 + msgLength;
     return 0;
 }
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ef26ab9..810b04e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -149,6 +149,17 @@ typedef struct pgNotify
     struct pgNotify *next;        /* list link */
 } PGnotify;
 
+/* PGrowValue points to a column value in the network buffer.
+ * The value is a string of length len without null termination.
+ * NULL is represented as len < 0; value then points to the place
+ * where the value would have been.
+ */
+typedef struct pgRowValue
+{
+    int            len;            /* length in bytes of the value */
+    char       *value;            /* actual value, without null termination */
+} PGrowValue;
+
 /* Function types for notice-handling callbacks */
 typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
 typedef void (*PQnoticeProcessor) (void *arg, const char *message);
@@ -416,6 +427,38 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
+/*
+ * Typedef for alternative row processor.
+ *
+ * The columns array will contain PQnfields() entries, each one
+ * pointing to the data of a particular column in the network buffer.
+ * This function is supposed to copy the data out from there
+ * and store it somewhere.  NULL is signified with len<0.
+ *
+ * This function must return 1 for success and 0 for failure, and
+ * may set an error message with PQresultSetErrMsg.  If no error
+ * message is set on failure, the caller assumes out of memory.
+ * This function is assumed not to throw any exception.
+ */
+typedef int (*PQrowProcessor)(PGresult *res, PGrowValue *columns,
+                              void *param);
+
+/*
+ * Set alternative row data processor for PGconn.
+ *
+ * By registering this function, libpq stops storing rows in the
+ * PGresult itself and instead calls func for each row, one by one.
+ *
+ * func is the row processor function. See the typedef PQrowProcessor.
+ *
+ * rowProcessorParam is the contextual variable that is passed to
+ * the row processor.
+ */
+extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func,
+                                   void *rowProcessorParam);
+extern PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param);
+extern int  PQskipResult(PGconn *conn, int skipAll);
+
 /* Force the write buffer to be written (or at least try) */
 extern int    PQflush(PGconn *conn);
@@ -454,6 +497,7 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int    PQgetisnull(const PGresult *res, int tup_num, int field_num);
+extern void    PQresultSetErrMsg(PGresult *res, const char *msg);
 extern int    PQnparams(const PGresult *res);
 extern Oid    PQparamtype(const PGresult *res, int param_num);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 987311e..9cabd20 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -398,7 +398,6 @@ struct pg_conn
     /* Status for asynchronous result construction */
     PGresult   *result;            /* result being constructed */
-    PGresAttValue *curTuple;    /* tuple currently being read */
 
 #ifdef USE_SSL
     bool        allow_ssl_try;    /* Allowed to try SSL negotiation */
@@ -443,6 +442,14 @@ struct pg_conn
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
+
+    /*
+     * Read column data from network buffer.
+     */
+    PQrowProcessor rowProcessor;/* Function pointer */
+    void *rowProcessorParam;    /* Contextual parameter for rowProcessor */
+    PGrowValue *rowBuf;            /* Buffer for passing values to rowProcessor */
+    int rowBufLen;                /* Number of columns allocated in rowBuf */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -560,6 +567,7 @@ extern int    pqGets(PQExpBuffer buf, PGconn *conn);
 extern int    pqGets_append(PQExpBuffer buf, PGconn *conn);
 extern int    pqPuts(const char *s, PGconn *conn);
 extern int    pqGetnchar(char *s, size_t len, PGconn *conn);
+extern int    pqSkipnchar(size_t len, PGconn *conn);
 extern int    pqPutnchar(const char *s, size_t len, PGconn *conn);
 extern int    pqGetInt(int *result, size_t bytes, PGconn *conn);
 extern int    pqPutInt(int value, size_t bytes, PGconn *conn);
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 72c9384..5ef89e7 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -7233,6 +7233,332 @@ int PQisthreadsafe();
  </sect1>
+ <sect1 id="libpq-altrowprocessor">
+  <title>Alternative row processor</title>
+
+  <indexterm zone="libpq-altrowprocessor">
+   <primary>PGresult</primary>
+   <secondary>PGconn</secondary>
+  </indexterm>
+
+  <para>
+   By default, rows are stored in a <type>PGresult</type>
+   until the full result set has been received, and the completely
+   filled <type>PGresult</type> is then passed to the user.  This
+   behavior can be changed by registering an alternative row processor
+   function, which sees each row as soon as it is received from the
+   network.  The processor can either process the data immediately or
+   store it in a custom container.
+  </para>
+
+  <para>
+   Note: because the row processor sees rows as they arrive, it cannot
+   know whether the SQL statement will actually finish successfully on
+   the server.  Some care must therefore be taken to get proper
+   transactional behavior.
+  </para>
+
+  <variablelist>
+   <varlistentry id="libpq-pqsetrowprocessor">
+    <term>
+     <function>PQsetRowProcessor</function>
+     <indexterm>
+      <primary>PQsetRowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       Sets a callback function to process each row.
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+     </para>
+     
+     <para>
+       <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object on which to set the row processor function.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>func</parameter></term>
+       <listitem>
+         <para>
+           Storage handler function to set. NULL means to use the
+           default processor.
+         </para>
+       </listitem>
+     </varlistentry>
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+           A pointer to a context parameter that is passed
+           to <parameter>func</parameter>.
+         </para>
+       </listitem>
+     </varlistentry>
+       </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqrowprocessor">
+    <term>
+     <type>PQrowProcessor</type>
+     <indexterm>
+      <primary>PQrowProcessor</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+       The type for the row processor callback function.
+<synopsis>
+int (*PQrowProcessor)(PGresult *res, PGrowValue *columns, void *param);
+
+typedef struct
+{
+    int         len;            /* length in bytes of the value, -1 if NULL */
+    char       *value;          /* actual value, without null termination */
+} PGrowValue;
+</synopsis>
+     </para>
+
+     <para>
+      The <parameter>columns</parameter> array will have
+      <function>PQnfields</function> elements, each one pointing to a
+      column value in the network buffer.  The <parameter>len</parameter>
+      field will contain the number of bytes in the value.  If the field
+      value is NULL, then <parameter>len</parameter> will be -1 and
+      <parameter>value</parameter> will point to the position where the
+      value would have been in the buffer.  This allows estimating the
+      row size by pointer arithmetic.
+     </para>
+
+     <para>
+       This function must process or copy row values away from the network
+       buffer before it returns, as the next row might overwrite them.
+     </para>
+
+     <para>
+       This function must return 1 for success and 0 for failure.  On
+       failure, if no error message has been set, the caller treats the
+       error as out of memory and releases the PGresult under
+       construction.  If a message has been set
+       with <function>PQresultSetErrMsg</function>, it becomes
+       the PGconn's error message and the PGresult is preserved.  When
+       the non-blocking API is in use, the function can also return
+       2 for an early exit from <function>PQisBusy</function>.
+       The supplied <parameter>res</parameter>
+       and <parameter>columns</parameter> values will stay valid, so the
+       row can be processed outside of the callback.  The caller is
+       responsible for tracking whether <function>PQisBusy</function>
+       returned early because of the callback or for other reasons.
+       Usually this is done by setting cached values to NULL before
+       calling <function>PQisBusy</function>.
+     </para>
+
+     <para>
+       The function is allowed to exit via an exception (setjmp/longjmp).
+       The connection and row are guaranteed to be in a valid state.
+       The connection can later be closed
+       via <function>PQfinish</function>.  Processing can also be
+       continued without closing the connection:
+       call <function>PQgetResult</function> in synchronous mode, or
+       <function>PQisBusy</function> on an asynchronous connection.
+       Processing will then continue with the next row; the row that
+       raised the exception will be skipped.  Alternatively, all remaining
+       rows can be discarded by calling <function>PQskipResult</function>
+       without closing the connection.
+     </para>
+
+     <variablelist>
+       <varlistentry>
+
+     <term><parameter>res</parameter></term>
+     <listitem>
+       <para>
+         A pointer to the <type>PGresult</type> object.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>columns</parameter></term>
+     <listitem>
+       <para>
+         Column values of the row to process.  Column values
+         are located in the network buffer; the processor must
+         copy them out from there.
+       </para>
+       <para>
+         Column values are not null-terminated, so the processor cannot
+         use C string functions on them directly.
+       </para>
+     </listitem>
+       </varlistentry>
+       <varlistentry>
+
+     <term><parameter>param</parameter></term>
+     <listitem>
+       <para>
+         Extra parameter that was given to <function>PQsetRowProcessor</function>.
+       </para>
+     </listitem>
+       </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqskipresult">
+    <term>
+     <function>PQskipResult</function>
+     <indexterm>
+      <primary>PQskipResult</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+        Discards all the remaining row data, without closing the
+        connection, after <function>PQexec</function>
+        or <function>PQgetResult</function> has exited because of an
+        exception raised in the <type>PQrowProcessor</type>.
+<synopsis>
+void PQskipResult(PGconn *conn, int skipAll);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><parameter>skipAll</parameter></term>
+       <listitem>
+         <para>
+           If <parameter>skipAll</parameter> is false (0), skip the
+           remaining rows in the current result.  If it is true
+           (non-zero), also skip all rows in the succeeding results.
+         </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqresultseterrmsg">
+    <term>
+     <function>PQresultSetErrMsg</function>
+     <indexterm>
+      <primary>PQresultSetErrMsg</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+    Sets the message for an error that occurred
+    in the <type>PQrowProcessor</type>.  If this message is not set, the
+    caller assumes the error is out of memory.
+<synopsis>
+void PQresultSetErrMsg(PGresult *res, const char *msg);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+      <varlistentry>
+        <term><parameter>res</parameter></term>
+        <listitem>
+          <para>
+        A pointer to the <type>PGresult</type> object
+        passed to <type>PQrowProcessor</type>.
+          </para>
+        </listitem>
+      </varlistentry>
+      <varlistentry>
+        <term><parameter>msg</parameter></term>
+        <listitem>
+          <para>
+        Error message.  It is copied internally, so the caller does not
+        need to keep it valid after the call.
+          </para>
+          <para>
+        If <parameter>res</parameter> already has a previously set message,
+        it will be overwritten.  Pass NULL to cancel the custom
+        message.
+          </para>
+        </listitem>
+      </varlistentry>
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <variablelist>
+   <varlistentry id="libpq-pqgetrowprcessor">
+    <term>
+     <function>PQgetRowProcessor</function>
+     <indexterm>
+      <primary>PQgetRowProcessor</primary>
+     </indexterm>
+    </term>
+    <listitem>
+      <para>
+       Gets the row processor and its context parameter currently set on
+       the connection.
+<synopsis>
+PQrowProcessor PQgetRowProcessor(PGconn *conn, void **param);
+</synopsis>
+      </para>
+      <para>
+    <variablelist>
+     <varlistentry>
+       <term><parameter>conn</parameter></term>
+       <listitem>
+         <para>
+           The connection object.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><parameter>param</parameter></term>
+       <listitem>
+         <para>
+              If not NULL, receives the current row processor parameter
+              of the connection.
+         </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+      </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+ </sect1>
+
  <sect1 id="libpq-build">
   <title>Building <application>libpq</application> Programs</title>
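To illustrate the copy-out rules for PQrowProcessor described above, here is a minimal C sketch that does not call libpq. It assumes a local copy of the PGrowValue typedef from the synopsis; copy_row and row_span are hypothetical helpers a row processor might use, and the callback's PGresult argument is left out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Local copy of the PGrowValue typedef from the synopsis above. */
typedef struct
{
    int         len;            /* length in bytes of the value, -1 if NULL */
    char       *value;          /* actual value, without null termination */
} PGrowValue;

/*
 * Hypothetical helper: copy each column out of the transient network
 * buffer into its own NUL-terminated string, so C string functions can be
 * used later.  NULL columns become NULL pointers.  Returns 1 on success
 * and 0 on out of memory, following the row processor convention.
 */
static int
copy_row(const PGrowValue *columns, int nfields, char **out)
{
    for (int i = 0; i < nfields; i++)
    {
        if (columns[i].len < 0)
        {
            out[i] = NULL;          /* SQL NULL */
            continue;
        }
        out[i] = malloc((size_t) columns[i].len + 1);
        if (out[i] == NULL)
        {
            while (i-- > 0)         /* release the copies made so far */
                free(out[i]);
            return 0;
        }
        memcpy(out[i], columns[i].value, (size_t) columns[i].len);
        out[i][columns[i].len] = '\0';
    }
    return 1;
}

/*
 * Hypothetical helper: estimate the bytes the row spans in the buffer,
 * from the first value to the end of the last one.  A NULL column
 * (len == -1) still points where its value would have been.
 */
static long
row_span(const PGrowValue *columns, int nfields)
{
    const PGrowValue *last;
    int         last_len;

    assert(nfields > 0);
    last = &columns[nfields - 1];
    last_len = last->len < 0 ? 0 : last->len;
    return (long) ((last->value + last_len) - columns[0].value);
}
```

In a real callback, copy_row would be called with the columns argument and PQnfields(res), and the callback would return 0 if it fails.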
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..8bf0759 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
     bool        newXactForCursor;        /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    MemoryContext oldcontext;
+    AttInMetadata *attinmeta;
+    char** valbuf;
+    int *valbuflen;
+    char **cstrs;
+    bool error_occurred;
+    bool nummismatch;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                   int2vector *pkattnums_arg, int32 pknumatts_arg,
                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int         howmany = 0;
     bool        fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;
     DBLINK_INIT;
@@ -559,15 +576,51 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     /*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
 
-    res = PQexec(conn, buf.data);
+    PG_TRY();
+    {
+        res = PQexec(conn, buf.data);
+    }
+    PG_CATCH();
+    {
+        ErrorData *edata;
+
+        finishStoreInfo(&storeinfo);
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        /* Skip the rest of the current result when storeHandler raises an exception. */
+        PQskipResult(conn, FALSE);
+        ReThrowError(edata);
+    }
+    PG_END_TRY();
+
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* finishStoreInfo() leaves the fields referred to below intact. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -579,8 +632,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
@@ -640,6 +693,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
 
+    storeInfo   storeinfo;
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 
@@ -660,6 +714,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
         {
             /* text,text,bool */
             DBLINK_GET_CONN;
 
+            conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
             fail = PG_GETARG_BOOL(2);
         }
 
@@ -675,6 +730,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
             else
             {
                 DBLINK_GET_CONN;
 
+                conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
             }
         }
 
@@ -705,6 +761,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
         else
             /* shouldn't happen */
             elog(ERROR, "wrong number of arguments");
 
+
+        conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     }
     if (!conn)
@@ -715,164 +773,257 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+    /* synchronous query, or async result retrieval */
-    if (!is_async)
-        res = PQexec(conn, sql);
-    else
+    PG_TRY();
     {
-        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
+        if (!is_async)
+            res = PQexec(conn, sql);
+        else
+            res = PQgetResult(conn);
     }
+    PG_CATCH();
+    {
+        ErrorData *edata;
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+        finishStoreInfo(&storeinfo);
+        edata = CopyErrorData();
+        FlushErrorState();
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+        /* Skip the rest of the current result when storeHandler raises an exception. */
+        PQskipResult(conn, FALSE);
+        ReThrowError(edata);
+    }
+    PG_END_TRY();
+
+    finishStoreInfo(&storeinfo);
+
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo() leaves the fields referred to below intact. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
+    PQclear(res);
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+    TupleDesc    tupdesc;
+    int i;
+
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    {
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
-    Assert(rsinfo->returnMode == SFRM_Materialize);
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
-    PG_TRY();
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->valbuf = NULL;
+    sinfo->valbuflen = NULL;
+    sinfo->cstrs = NULL;
+
+    /* Preallocate the per-column arrays, one entry per attribute. */
+    sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
+    if (sinfo->valbuf)
+        sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+    if (sinfo->valbuflen)
+        sinfo->cstrs = (char **)malloc(sinfo->nattrs * sizeof(char*));
+
+    if (sinfo->cstrs == NULL)
     {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        if (sinfo->valbuf)
+            free(sinfo->valbuf);
+        if (sinfo->valbuflen)
+            free(sinfo->valbuflen);
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+    }
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)
+        {
+            if (sinfo->valbuf[i])
+                free(sinfo->valbuf[i]);
         }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->cstrs)
+    {
+        free(sinfo->cstrs);
+        sinfo->cstrs = NULL;
+    }
-            values = (char **) palloc(nfields * sizeof(char *));
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+    storeInfo *sinfo = (storeInfo *)param;
+    HeapTuple  tuple;
+    int        fields = PQnfields(res);
+    int        i;
+    char      **cstrs = sinfo->cstrs;
-                if (!is_sql_cmd)
-                {
-                    int            i;
+    if (sinfo->error_occurred)
+    {
+        PQresultSetErrMsg(res, "storeHandler is called after error\n");
+        return FALSE;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
+
+        /*
+         * The caller (dblink_record_internal() or dblink_fetch()) reports
+         * this error itself by checking nummismatch, so the message set
+         * here is not the one shown to the user.
+         */
+
+        PQresultSetErrMsg(res, "unexpected field count in \"D\" message\n");
+        return FALSE;
+    }
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
+    /*
+     * Value input functions assume that the input string is
+     * NUL-terminated, so make a terminated copy of each value.
+     */
+    for(i = 0 ; i < fields ; i++)
+    {
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else
+        {
+            char *tmp = sinfo->valbuf[i];
+            int tmplen = sinfo->valbuflen[i];
+
+            /*
+             * Call malloc and realloc separately so that this works
+             * even on systems whose realloc() does not accept NULL
+             * as the old memory block.
+             *
+             * Also (re)allocate in bigger steps to avoid a flood of
+             * allocations on unusual data.
+             */
+            if (tmp == NULL)
+            {
+                tmplen = len + 1;
+                if (tmplen < 64)
+                    tmplen = 64;
+                tmp = (char *)malloc(tmplen);
+            }
+            else if (tmplen < len + 1)
+            {
+                if (len + 1 > tmplen * 2)
+                    tmplen = len + 1;
+                else
+                    tmplen = tmplen * 2;
+                tmp = (char *)realloc(tmp, tmplen);
             }
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+            /*
+             * If realloc returns NULL, the old block is still in
+             * sinfo->valbuf[i] and will be freed in finishStoreInfo().
+             */
+            if (tmp == NULL)
+                return FALSE;  /* report out of memory to the caller */
-        PQclear(res);
-    }
-    PG_CATCH();
-    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+            sinfo->valbuf[i] = tmp;
+            sinfo->valbuflen[i] = tmplen;
+
+            cstrs[i] = sinfo->valbuf[i];
+            memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';
+        }
     }
-    PG_END_TRY();
+
+    /*
+     * These functions may throw an exception, which is caught in
+     * dblink_record_internal() or dblink_fetch().
+     */
+    tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+    tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+    return TRUE;
 }
 /*
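The per-column buffer handling in storeHandler above can be isolated as follows. This is a hedged sketch of the same policy (at least 64 bytes on the first allocation, then doubling unless the value needs more), not code from the patch; next_capacity and store_value are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Capacity rule mirroring storeHandler: a first allocation gets at least
 * 64 bytes; a too-small buffer doubles, or grows to exactly len + 1 when
 * doubling is not enough.  cur <= 0 means "no buffer yet".
 */
static int
next_capacity(int cur, int len)
{
    int         need = len + 1;     /* room for the terminating '\0' */

    if (cur <= 0)
        return need < 64 ? 64 : need;
    if (cur >= need)
        return cur;                 /* current buffer is big enough */
    return need > cur * 2 ? need : cur * 2;
}

/*
 * Copy a non-NUL-terminated value into *buf (capacity *cap), growing it
 * with separate malloc/realloc calls.  On failure the old block stays in
 * *buf so the caller can still free it.  Returns the NUL-terminated copy,
 * or NULL on out of memory.
 */
static char *
store_value(char **buf, int *cap, const char *value, int len)
{
    int         newcap;

    assert(len >= 0);               /* NULL columns are handled by the caller */
    newcap = next_capacity(*buf ? *cap : 0, len);
    if (*buf == NULL || newcap != *cap)
    {
        char       *tmp = *buf ? realloc(*buf, (size_t) newcap)
                               : malloc((size_t) newcap);

        if (tmp == NULL)
            return NULL;
        *buf = tmp;
        *cap = newcap;
    }
    memcpy(*buf, value, (size_t) len);
    (*buf)[len] = '\0';
    return *buf;
}
```

Keeping the old pointer until realloc succeeds is what lets finishStoreInfo() free the block on the out-of-memory path.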
diff --git b/doc/src/sgml/libpq.sgml a/doc/src/sgml/libpq.sgml
index 1245e85..5ef89e7 100644
--- b/doc/src/sgml/libpq.sgml
+++ a/doc/src/sgml/libpq.sgml
@@ -7353,7 +7353,16 @@ typedef struct
        releases the PGresult under construction. If you set any
        message with <function>PQresultSetErrMsg</function>, it is set
        as the PGconn's error message and the PGresult will be
 
-       preserved.
+       preserved.  When the non-blocking API is in use, the function can
+       also return 2 for an early exit from <function>PQisBusy</function>.
+       The supplied <parameter>res</parameter>
+       and <parameter>columns</parameter> values will stay valid, so the
+       row can be processed outside of the callback.  The caller is
+       responsible for tracking whether <function>PQisBusy</function>
+       returned early because of the callback or for other reasons.
+       Usually this is done by setting cached values to NULL before
+       calling <function>PQisBusy</function>.
      </para>
      <para>
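The early-exit convention (return 2, then tell an early exit apart from an ordinary busy state by clearing a cached pointer before each call) can be sketched as below. PQisBusy is replaced by a hypothetical fake_is_busy() that feeds buffered rows to the processor, the processor signature is simplified (no PGresult argument), and FakeConn, RowCache, defer_row and drain are illustrative names only.

```c
#include <assert.h>
#include <stddef.h>

/* Local copy of the PGrowValue typedef from the documentation. */
typedef struct
{
    int         len;
    char       *value;
} PGrowValue;

/* Hypothetical application state: non-NULL row means "early exit". */
typedef struct
{
    PGrowValue *row;
    int         nfields;
} RowCache;

/* Row processor that defers work: remember the row, request early exit. */
static int
defer_row(PGrowValue *columns, int nfields, void *param)
{
    RowCache   *cache = (RowCache *) param;

    cache->row = columns;
    cache->nfields = nfields;
    return 2;
}

/* Hypothetical connection holding already-received single-column rows. */
typedef struct
{
    PGrowValue *rows;
    int         nrows;
    int         next;
    int         (*proc) (PGrowValue *columns, int nfields, void *param);
    void       *param;
} FakeConn;

/*
 * Stand-in for PQisBusy(): reports busy (1) when the processor asks for
 * early exit, just as it would while waiting for data; 0 when done.
 */
static int
fake_is_busy(FakeConn *conn)
{
    while (conn->next < conn->nrows)
    {
        PGrowValue *row = &conn->rows[conn->next++];

        if (conn->proc(row, 1, conn->param) == 2)
            return 1;
    }
    return 0;
}

/*
 * Caller side: clear the cache before every call, so a non-NULL cache
 * afterwards identifies an early exit.  Returns rows processed, or -1 if
 * the connection is genuinely waiting for data (not simulated here).
 */
static int
drain(FakeConn *conn, RowCache *cache)
{
    int         processed = 0;

    for (;;)
    {
        int         busy;

        cache->row = NULL;
        busy = fake_is_busy(conn);
        if (cache->row != NULL)
        {
            assert(cache->nfields == 1);
            processed++;            /* process the row outside the callback */
            continue;
        }
        return busy ? -1 : processed;
    }
}
```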
diff --git b/src/interfaces/libpq/fe-protocol2.c a/src/interfaces/libpq/fe-protocol2.c
index 6555f85..36773cb 100644
--- b/src/interfaces/libpq/fe-protocol2.c
+++ a/src/interfaces/libpq/fe-protocol2.c
@@ -828,6 +828,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     rp = conn->rowProcessor(result, rowbuf, conn->rowProcessorParam);
     if (rp == 1)
         return 0;
 
+    else if (rp == 2 && pqIsnonblocking(conn))
+        /* processor requested early exit */
+        return EOF;
     else if (rp == 0)
     {
         errmsg = result->errMsg;
diff --git b/src/interfaces/libpq/fe-protocol3.c a/src/interfaces/libpq/fe-protocol3.c
index 3725de2..2693ce0 100644
--- b/src/interfaces/libpq/fe-protocol3.c
+++ a/src/interfaces/libpq/fe-protocol3.c
@@ -698,6 +698,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
         /* everything is good */
         return 0;
     }
+    if (rp == 2 && pqIsnonblocking(conn))
+    {
+        /* processor requested early exit */
+        return EOF;
+    }
     /* there was some problem */
     if (rp == 0)
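Taken together, the documentation and the two getAnotherTuple() hunks give the row processor's return value the meanings below. The sketch only classifies the value; the enum and classify_rp are hypothetical, and a return of 2 on a blocking connection is not covered by the hunks shown here, so it is reported as unspecified.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical names for the outcomes described in the patch. */
typedef enum
{
    ROW_STORED,                 /* rp == 1: keep parsing (return 0) */
    ROW_EARLY_EXIT,             /* rp == 2, non-blocking: return EOF */
    ROW_ERROR_WITH_MSG,         /* rp == 0, message set: keep the PGresult */
    ROW_ERROR_OOM,              /* rp == 0, no message: out of memory */
    ROW_UNSPECIFIED             /* anything else: not covered here */
} RowOutcome;

static RowOutcome
classify_rp(int rp, int nonblocking, const char *errmsg)
{
    if (rp == 1)
        return ROW_STORED;
    if (rp == 2 && nonblocking)
        return ROW_EARLY_EXIT;
    if (rp == 0)
        return errmsg != NULL ? ROW_ERROR_WITH_MSG : ROW_ERROR_OOM;
    return ROW_UNSPECIFIED;
}
```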
