Re: [patch] libpq one-row-at-a-time API

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: [patch] libpq one-row-at-a-time API
Дата
Msg-id 11466.1343884786@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: [patch] libpq one-row-at-a-time API  (Marko Kreen <markokr@gmail.com>)
Ответы Re: [patch] libpq one-row-at-a-time API
Список pgsql-hackers
Marko Kreen <markokr@gmail.com> writes:
> On Wed, Aug 1, 2012 at 6:18 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> So I'm working from the first set of patches in your message
>> <20120721194907.GA28021@gmail.com>.

> Great!

Here's an updated draft patch.  I've not looked at the docs yet, so
this doesn't include them, but I'm reasonably happy with the code
changes now.  The main difference from what you had is that I pushed
the creation of the single-row PGresults into pqRowProcessor, so that
they're formed immediately while scanning the input message.  The
other method, which postpones examination of the rowBuf, does not work,
any more than it did with the original patch, because you can't assume
that the network input buffer is going to hold still.  For example,
calling PQconsumeInput after parseInput has absorbed a row-data message
but before calling PQgetResult would likely break things.

In principle I suppose we could hack PQconsumeInput enough that it
didn't damage the row buffer while still meeting its API contract of
clearing the read-ready condition on the socket; but it wouldn't be
simple or cheap to do so.

            regards, tom lane

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 1e62d8091a9d2bdf60af6745d5a01ee14ee5cf5a..7cf86176c2385b9e4ee37c72d7e3c662ea079f7a 100644
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
*************** typedef struct storeInfo
*** 70,75 ****
--- 70,78 ----
      AttInMetadata *attinmeta;
      MemoryContext tmpcontext;
      char      **cstrs;
+     /* temp storage for results to avoid leaks on exception */
+     PGresult   *last_res;
+     PGresult   *cur_res;
  } storeInfo;

  /*
*************** static void materializeQueryResult(Funct
*** 83,90 ****
                         const char *conname,
                         const char *sql,
                         bool fail);
! static int storeHandler(PGresult *res, const PGdataValue *columns,
!              const char **errmsgp, void *param);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
--- 86,93 ----
                         const char *conname,
                         const char *sql,
                         bool fail);
! static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
! static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
  static remoteConn *getConnectionByName(const char *name);
  static HTAB *createConnHash(void);
  static void createNewConnection(const char *name, remoteConn *rconn);
*************** materializeResult(FunctionCallInfo fcinf
*** 927,934 ****
  /*
   * Execute the given SQL command and store its results into a tuplestore
   * to be returned as the result of the current function.
   * This is equivalent to PQexec followed by materializeResult, but we make
!  * use of libpq's "row processor" API to reduce per-row overhead.
   */
  static void
  materializeQueryResult(FunctionCallInfo fcinfo,
--- 930,939 ----
  /*
   * Execute the given SQL command and store its results into a tuplestore
   * to be returned as the result of the current function.
+  *
   * This is equivalent to PQexec followed by materializeResult, but we make
!  * use of libpq's single-row mode to avoid accumulating the whole result
!  * inside libpq before it gets transferred to the tuplestore.
   */
  static void
  materializeQueryResult(FunctionCallInfo fcinfo,
*************** materializeQueryResult(FunctionCallInfo
*** 944,962 ****
      /* prepTuplestoreResult must have been called previously */
      Assert(rsinfo->returnMode == SFRM_Materialize);

      PG_TRY();
      {
!         /* initialize storeInfo to empty */
!         memset(&sinfo, 0, sizeof(sinfo));
!         sinfo.fcinfo = fcinfo;
!
!         /* We'll collect tuples using storeHandler */
!         PQsetRowProcessor(conn, storeHandler, &sinfo);
!
!         res = PQexec(conn, sql);
!
!         /* We don't keep the custom row processor installed permanently */
!         PQsetRowProcessor(conn, NULL, NULL);

          if (!res ||
              (PQresultStatus(res) != PGRES_COMMAND_OK &&
--- 949,962 ----
      /* prepTuplestoreResult must have been called previously */
      Assert(rsinfo->returnMode == SFRM_Materialize);

+     /* initialize storeInfo to empty */
+     memset(&sinfo, 0, sizeof(sinfo));
+     sinfo.fcinfo = fcinfo;
+
      PG_TRY();
      {
!         /* execute query, collecting any tuples into the tuplestore */
!         res = storeQueryResult(&sinfo, conn, sql);

          if (!res ||
              (PQresultStatus(res) != PGRES_COMMAND_OK &&
*************** materializeQueryResult(FunctionCallInfo
*** 975,982 ****
          else if (PQresultStatus(res) == PGRES_COMMAND_OK)
          {
              /*
!              * storeHandler didn't get called, so we need to convert the
!              * command status string to a tuple manually
               */
              TupleDesc    tupdesc;
              AttInMetadata *attinmeta;
--- 975,982 ----
          else if (PQresultStatus(res) == PGRES_COMMAND_OK)
          {
              /*
!              * storeRow didn't get called, so we need to convert the command
!              * status string to a tuple manually
               */
              TupleDesc    tupdesc;
              AttInMetadata *attinmeta;
*************** materializeQueryResult(FunctionCallInfo
*** 1008,1032 ****
              tuplestore_puttuple(tupstore, tuple);

              PQclear(res);
          }
          else
          {
              Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
!             /* storeHandler should have created a tuplestore */
              Assert(rsinfo->setResult != NULL);

              PQclear(res);
          }
      }
      PG_CATCH();
      {
-         /* be sure to unset the custom row processor */
-         PQsetRowProcessor(conn, NULL, NULL);
          /* be sure to release any libpq result we collected */
!         if (res)
!             PQclear(res);
          /* and clear out any pending data in libpq */
!         while ((res = PQskipResult(conn)) != NULL)
              PQclear(res);
          PG_RE_THROW();
      }
--- 1008,1037 ----
              tuplestore_puttuple(tupstore, tuple);

              PQclear(res);
+             res = NULL;
          }
          else
          {
              Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
!             /* storeRow should have created a tuplestore */
              Assert(rsinfo->setResult != NULL);

              PQclear(res);
+             res = NULL;
          }
+         PQclear(sinfo.last_res);
+         sinfo.last_res = NULL;
+         PQclear(sinfo.cur_res);
+         sinfo.cur_res = NULL;
      }
      PG_CATCH();
      {
          /* be sure to release any libpq result we collected */
!         PQclear(res);
!         PQclear(sinfo.last_res);
!         PQclear(sinfo.cur_res);
          /* and clear out any pending data in libpq */
!         while ((res = PQgetResult(conn)) != NULL)
              PQclear(res);
          PG_RE_THROW();
      }
*************** materializeQueryResult(FunctionCallInfo
*** 1034,1056 ****
  }

  /*
!  * Custom row processor for materializeQueryResult.
!  * Prototype of this function must match PQrowProcessor.
   */
! static int
! storeHandler(PGresult *res, const PGdataValue *columns,
!              const char **errmsgp, void *param)
  {
-     storeInfo  *sinfo = (storeInfo *) param;
      int            nfields = PQnfields(res);
-     char      **cstrs = sinfo->cstrs;
      HeapTuple    tuple;
-     char       *pbuf;
-     int            pbuflen;
      int            i;
      MemoryContext oldcontext;

!     if (columns == NULL)
      {
          /* Prepare for new result set */
          ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
--- 1039,1108 ----
  }

  /*
!  * Execute query, and send any result rows to sinfo->tuplestore.
   */
! static PGresult *
! storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
! {
!     bool        first = true;
!     PGresult   *res;
!
!     if (!PQsendQuery(conn, sql))
!         return PQgetResult(conn);
!
!     if (!PQsetSingleRowMode(conn))        /* shouldn't fail */
!         elog(ERROR, "dblink: failed to set single-row mode");
!
!     for (;;)
!     {
!         CHECK_FOR_INTERRUPTS();
!
!         sinfo->cur_res = PQgetResult(conn);
!         if (!sinfo->cur_res)
!             break;
!
!         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
!         {
!             /* got one row from possibly-bigger resultset */
!             storeRow(sinfo, sinfo->cur_res, first);
!
!             PQclear(sinfo->cur_res);
!             sinfo->cur_res = NULL;
!             first = false;
!         }
!         else
!         {
!             /* if empty resultset, fill tuplestore header */
!             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
!                 storeRow(sinfo, sinfo->cur_res, first);
!
!             /* store completed result at last_res */
!             PQclear(sinfo->last_res);
!             sinfo->last_res = sinfo->cur_res;
!             sinfo->cur_res = NULL;
!             first = true;
!         }
!     }
!
!     /* return last_res */
!     res = sinfo->last_res;
!     sinfo->last_res = NULL;
!     return res;
! }
!
! /*
!  * Send single row to sinfo->tuplestore.
!  * If "first" is true, create the tuplestore using PGresult's metadata.
!  */
! static void
! storeRow(storeInfo *sinfo, PGresult *res, bool first)
  {
      int            nfields = PQnfields(res);
      HeapTuple    tuple;
      int            i;
      MemoryContext oldcontext;

!     if (first)
      {
          /* Prepare for new result set */
          ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
*************** storeHandler(PGresult *res, const PGdata
*** 1098,1110 ****
          sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);

          /* Create a new, empty tuplestore */
!         oldcontext = MemoryContextSwitchTo(
!                                     rsinfo->econtext->ecxt_per_query_memory);
          sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
          rsinfo->setResult = sinfo->tuplestore;
          rsinfo->setDesc = tupdesc;
          MemoryContextSwitchTo(oldcontext);

          /*
           * Set up sufficiently-wide string pointers array; this won't change
           * in size so it's easy to preallocate.
--- 1150,1165 ----
          sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);

          /* Create a new, empty tuplestore */
!         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
          sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
          rsinfo->setResult = sinfo->tuplestore;
          rsinfo->setDesc = tupdesc;
          MemoryContextSwitchTo(oldcontext);

+         /* Done if empty resultset */
+         if (PQntuples(res) == 0)
+             return;
+
          /*
           * Set up sufficiently-wide string pointers array; this won't change
           * in size so it's easy to preallocate.
*************** storeHandler(PGresult *res, const PGdata
*** 1121,1131 ****
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);
-
-         return 1;
      }

!     CHECK_FOR_INTERRUPTS();

      /*
       * Do the following work in a temp context that we reset after each tuple.
--- 1176,1185 ----
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);
      }

!     /* Should have a single-row result if we get here */
!     Assert(PQntuples(res) == 1);

      /*
       * Do the following work in a temp context that we reset after each tuple.
*************** storeHandler(PGresult *res, const PGdata
*** 1135,1180 ****
      oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);

      /*
!      * The strings passed to us are not null-terminated, but the datatype
!      * input functions we're about to call require null termination.  Copy the
!      * strings and add null termination.  As a micro-optimization, allocate
!      * all the strings with one palloc.
       */
-     pbuflen = nfields;            /* count the null terminators themselves */
-     for (i = 0; i < nfields; i++)
-     {
-         int            len = columns[i].len;
-
-         if (len > 0)
-             pbuflen += len;
-     }
-     pbuf = (char *) palloc(pbuflen);
-
      for (i = 0; i < nfields; i++)
      {
!         int            len = columns[i].len;
!
!         if (len < 0)
!             cstrs[i] = NULL;
          else
!         {
!             cstrs[i] = pbuf;
!             memcpy(pbuf, columns[i].value, len);
!             pbuf += len;
!             *pbuf++ = '\0';
!         }
      }

      /* Convert row to a tuple, and add it to the tuplestore */
!     tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);

      tuplestore_puttuple(sinfo->tuplestore, tuple);

      /* Clean up */
      MemoryContextSwitchTo(oldcontext);
      MemoryContextReset(sinfo->tmpcontext);
-
-     return 1;
  }

  /*
--- 1189,1212 ----
      oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);

      /*
!      * Fill cstrs with null-terminated strings of column values.
       */
      for (i = 0; i < nfields; i++)
      {
!         if (PQgetisnull(res, 0, i))
!             sinfo->cstrs[i] = NULL;
          else
!             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
      }

      /* Convert row to a tuple, and add it to the tuplestore */
!     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);

      tuplestore_puttuple(sinfo->tuplestore, tuple);

      /* Clean up */
      MemoryContextSwitchTo(oldcontext);
      MemoryContextReset(sinfo->tmpcontext);
  }

  /*
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1251455f1f6d92c74e206b5a9b8dcdeb36ac9b98..9d95e262be3fbf26731c0926013db56dbc4e00ab 100644
*** a/src/interfaces/libpq/exports.txt
--- b/src/interfaces/libpq/exports.txt
*************** PQconnectStartParams      157
*** 160,165 ****
  PQping                    158
  PQpingParams              159
  PQlibVersion              160
! PQsetRowProcessor         161
! PQgetRowProcessor         162
! PQskipResult              163
--- 160,163 ----
  PQping                    158
  PQpingParams              159
  PQlibVersion              160
! PQsetSingleRowMode        161
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a32258a8cbab5e655484d0471c031864fa5084de..adaab7aaade60a980b7eaf5b6e9734444c72b930 100644
*** a/src/interfaces/libpq/fe-connect.c
--- b/src/interfaces/libpq/fe-connect.c
*************** makeEmptyPGconn(void)
*** 2709,2716 ****
      /* Zero all pointers and booleans */
      MemSet(conn, 0, sizeof(PGconn));

!     /* install default row processor and notice hooks */
!     PQsetRowProcessor(conn, NULL, NULL);
      conn->noticeHooks.noticeRec = defaultNoticeReceiver;
      conn->noticeHooks.noticeProc = defaultNoticeProcessor;

--- 2709,2715 ----
      /* Zero all pointers and booleans */
      MemSet(conn, 0, sizeof(PGconn));

!     /* install default notice hooks */
      conn->noticeHooks.noticeRec = defaultNoticeReceiver;
      conn->noticeHooks.noticeProc = defaultNoticeProcessor;

*************** conninfo_uri_parse_options(PQconninfoOpt
*** 4658,4664 ****
          if (p == host)
          {
              printfPQExpBuffer(errorMessage,
!             libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"),
                                uri);
              goto cleanup;
          }
--- 4657,4663 ----
          if (p == host)
          {
              printfPQExpBuffer(errorMessage,
!                               libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"),
                                uri);
              goto cleanup;
          }
*************** conninfo_uri_parse_params(char *params,
*** 4878,4884 ****

              printfPQExpBuffer(errorMessage,
                                libpq_gettext(
!                                      "invalid URI query parameter: \"%s\"\n"),
                                keyword);
              return false;
          }
--- 4877,4883 ----

              printfPQExpBuffer(errorMessage,
                                libpq_gettext(
!                                     "invalid URI query parameter: \"%s\"\n"),
                                keyword);
              return false;
          }
*************** conninfo_uri_decode(const char *str, PQE
*** 4943,4949 ****
              if (!(get_hexdigit(*q++, &hi) && get_hexdigit(*q++, &lo)))
              {
                  printfPQExpBuffer(errorMessage,
!                         libpq_gettext("invalid percent-encoded token: \"%s\"\n"),
                                    str);
                  free(buf);
                  return NULL;
--- 4942,4948 ----
              if (!(get_hexdigit(*q++, &hi) && get_hexdigit(*q++, &lo)))
              {
                  printfPQExpBuffer(errorMessage,
!                     libpq_gettext("invalid percent-encoded token: \"%s\"\n"),
                                    str);
                  free(buf);
                  return NULL;
*************** static void
*** 5594,5601 ****
  dot_pg_pass_warning(PGconn *conn)
  {
      /* If it was 'invalid authorization', add .pgpass mention */
-     if (conn->dot_pgpass_used && conn->password_needed && conn->result &&
      /* only works with >= 9.0 servers */
          strcmp(PQresultErrorField(conn->result, PG_DIAG_SQLSTATE),
                 ERRCODE_INVALID_PASSWORD) == 0)
      {
--- 5593,5600 ----
  dot_pg_pass_warning(PGconn *conn)
  {
      /* If it was 'invalid authorization', add .pgpass mention */
      /* only works with >= 9.0 servers */
+     if (conn->dot_pgpass_used && conn->password_needed && conn->result &&
          strcmp(PQresultErrorField(conn->result, PG_DIAG_SQLSTATE),
                 ERRCODE_INVALID_PASSWORD) == 0)
      {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index badc0b32a8e8f3e33e42729a8f8899475a0de31b..53516db723492f11fc9f4bdc6d4d351693c4c22f 100644
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
*************** char       *const pgresStatus[] = {
*** 38,44 ****
      "PGRES_BAD_RESPONSE",
      "PGRES_NONFATAL_ERROR",
      "PGRES_FATAL_ERROR",
!     "PGRES_COPY_BOTH"
  };

  /*
--- 38,45 ----
      "PGRES_BAD_RESPONSE",
      "PGRES_NONFATAL_ERROR",
      "PGRES_FATAL_ERROR",
!     "PGRES_COPY_BOTH",
!     "PGRES_SINGLE_TUPLE"
  };

  /*
*************** static bool static_std_strings = false;
*** 51,58 ****

  static PGEvent *dupEvents(PGEvent *events, int count);
  static bool pqAddTuple(PGresult *res, PGresAttValue *tup);
- static int pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
-                   const char **errmsgp, void *param);
  static bool PQsendQueryStart(PGconn *conn);
  static int PQsendQueryGuts(PGconn *conn,
                  const char *command,
--- 52,57 ----
*************** static int PQsendQueryGuts(PGconn *conn,
*** 64,71 ****
                  const int *paramFormats,
                  int resultFormat);
  static void parseInput(PGconn *conn);
- static int dummyRowProcessor(PGresult *res, const PGdataValue *columns,
-                   const char **errmsgp, void *param);
  static bool PQexecStart(PGconn *conn);
  static PGresult *PQexecFinish(PGconn *conn);
  static int PQsendDescribe(PGconn *conn, char desc_type,
--- 63,68 ----
*************** PQmakeEmptyPGresult(PGconn *conn, ExecSt
*** 181,186 ****
--- 178,184 ----
              case PGRES_COPY_OUT:
              case PGRES_COPY_IN:
              case PGRES_COPY_BOTH:
+             case PGRES_SINGLE_TUPLE:
                  /* non-error cases */
                  break;
              default:
*************** PQclear(PGresult *res)
*** 698,703 ****
--- 696,703 ----

  /*
   * Handy subroutine to deallocate any partially constructed async result.
+  *
+  * Any "next" result gets cleared too.
   */
  void
  pqClearAsyncResult(PGconn *conn)
*************** pqClearAsyncResult(PGconn *conn)
*** 705,710 ****
--- 705,713 ----
      if (conn->result)
          PQclear(conn->result);
      conn->result = NULL;
+     if (conn->next_result)
+         PQclear(conn->next_result);
+     conn->next_result = NULL;
  }

  /*
*************** pqPrepareAsyncResult(PGconn *conn)
*** 758,764 ****
       * conn->errorMessage.
       */
      res = conn->result;
-     conn->result = NULL;        /* handing over ownership to caller */
      if (!res)
          res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR);
      else
--- 761,766 ----
*************** pqPrepareAsyncResult(PGconn *conn)
*** 771,776 ****
--- 773,788 ----
          appendPQExpBufferStr(&conn->errorMessage,
                               PQresultErrorMessage(res));
      }
+
+     /*
+      * Replace conn->result with next_result, if any.  In the normal case
+      * there isn't a next result and we're just dropping ownership of the
+      * current result.    In single-row mode this restores the situation to what
+      * it was before we created the current single-row result.
+      */
+     conn->result = conn->next_result;
+     conn->next_result = NULL;
+
      return res;
  }

*************** pqSaveParameterStatus(PGconn *conn, cons
*** 981,1065 ****


  /*
!  * PQsetRowProcessor
!  *      Set function that copies row data out from the network buffer,
!  *      along with a passthrough parameter for it.
!  */
! void
! PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param)
! {
!     if (!conn)
!         return;
!
!     if (func)
!     {
!         /* set custom row processor */
!         conn->rowProcessor = func;
!         conn->rowProcessorParam = param;
!     }
!     else
!     {
!         /* set default row processor */
!         conn->rowProcessor = pqStdRowProcessor;
!         conn->rowProcessorParam = conn;
!     }
! }
!
! /*
!  * PQgetRowProcessor
!  *      Get current row processor of PGconn.
!  *      If param is not NULL, also store the passthrough parameter at *param.
!  */
! PQrowProcessor
! PQgetRowProcessor(const PGconn *conn, void **param)
! {
!     if (!conn)
!     {
!         if (param)
!             *param = NULL;
!         return NULL;
!     }
!
!     if (param)
!         *param = conn->rowProcessorParam;
!     return conn->rowProcessor;
! }
!
! /*
!  * pqStdRowProcessor
!  *      Add the received row to the PGresult structure
!  *      Returns 1 if OK, -1 if error occurred.
   *
!  * Note: "param" should point to the PGconn, but we don't actually need that
!  * as of the current coding.
   */
! static int
! pqStdRowProcessor(PGresult *res, const PGdataValue *columns,
!                   const char **errmsgp, void *param)
  {
      int            nfields = res->numAttributes;
      PGresAttValue *tup;
      int            i;

!     if (columns == NULL)
      {
!         /* New result set ... we have nothing to do in this function. */
!         return 1;
      }

      /*
       * Basically we just allocate space in the PGresult for each field and
       * copy the data over.
       *
!      * Note: on malloc failure, we return -1 leaving *errmsgp still NULL,
!      * which caller will take to mean "out of memory".    This is preferable to
!      * trying to set up such a message here, because evidently there's not
!      * enough memory for gettext() to do anything.
       */
      tup = (PGresAttValue *)
          pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
      if (tup == NULL)
!         return -1;

      for (i = 0; i < nfields; i++)
      {
--- 993,1047 ----


  /*
!  * pqRowProcessor
!  *      Add the received row to the current async result (conn->result).
!  *      Returns 1 if OK, 0 if error occurred.
   *
!  * On error, *errmsgp can be set to an error string to be returned.
!  * If it is left NULL, the error is presumed to be "out of memory".
!  *
!  * In single-row mode, we create a new result holding just the current row,
!  * stashing the previous result in conn->next_result so that it becomes
!  * active again after pqPrepareAsyncResult().  This allows the result metadata
!  * (column descriptions) to be carried forward to each result row.
   */
! int
! pqRowProcessor(PGconn *conn, const char **errmsgp)
  {
+     PGresult   *res = conn->result;
      int            nfields = res->numAttributes;
+     const PGdataValue *columns = conn->rowBuf;
      PGresAttValue *tup;
      int            i;

!     /*
!      * In single-row mode, make a new PGresult that will hold just this one
!      * row; the original conn->result is left unchanged so that it can be used
!      * again as the template for future rows.
!      */
!     if (conn->singleRowMode)
      {
!         /* Copy everything that should be in the result at this point */
!         res = PQcopyResult(res,
!                            PG_COPYRES_ATTRS | PG_COPYRES_EVENTS |
!                            PG_COPYRES_NOTICEHOOKS);
!         if (!res)
!             return 0;
      }

      /*
       * Basically we just allocate space in the PGresult for each field and
       * copy the data over.
       *
!      * Note: on malloc failure, we return 0 leaving *errmsgp still NULL, which
!      * caller will take to mean "out of memory".  This is preferable to trying
!      * to set up such a message here, because evidently there's not enough
!      * memory for gettext() to do anything.
       */
      tup = (PGresAttValue *)
          pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE);
      if (tup == NULL)
!         goto fail;

      for (i = 0; i < nfields; i++)
      {
*************** pqStdRowProcessor(PGresult *res, const P
*** 1078,1084 ****

              val = (char *) pqResultAlloc(res, clen + 1, isbinary);
              if (val == NULL)
!                 return -1;

              /* copy and zero-terminate the data (even if it's binary) */
              memcpy(val, columns[i].value, clen);
--- 1060,1066 ----

              val = (char *) pqResultAlloc(res, clen + 1, isbinary);
              if (val == NULL)
!                 goto fail;

              /* copy and zero-terminate the data (even if it's binary) */
              memcpy(val, columns[i].value, clen);
*************** pqStdRowProcessor(PGresult *res, const P
*** 1091,1100 ****

      /* And add the tuple to the PGresult's tuple array */
      if (!pqAddTuple(res, tup))
!         return -1;

-     /* Success */
      return 1;
  }


--- 1073,1102 ----

      /* And add the tuple to the PGresult's tuple array */
      if (!pqAddTuple(res, tup))
!         goto fail;
!
!     /*
!      * Success.  In single-row mode, make the result available to the client
!      * immediately.
!      */
!     if (conn->singleRowMode)
!     {
!         /* Change result status to special single-row value */
!         res->resultStatus = PGRES_SINGLE_TUPLE;
!         /* Stash old result for re-use later */
!         conn->next_result = conn->result;
!         conn->result = res;
!         /* And mark the result ready to return */
!         conn->asyncStatus = PGASYNC_READY;
!     }

      return 1;
+
+ fail:
+     /* release locally allocated PGresult, if we made one */
+     if (res != conn->result)
+         PQclear(res);
+     return 0;
  }


*************** PQsendQueryStart(PGconn *conn)
*** 1343,1348 ****
--- 1345,1354 ----

      /* initialize async result-accumulation state */
      conn->result = NULL;
+     conn->next_result = NULL;
+
+     /* reset single-row processing mode */
+     conn->singleRowMode = false;

      /* ready to send command message */
      return true;
*************** pqHandleSendFailure(PGconn *conn)
*** 1548,1553 ****
--- 1554,1584 ----
  }

  /*
+  * Select row-by-row processing mode
+  */
+ int
+ PQsetSingleRowMode(PGconn *conn)
+ {
+     /*
+      * Only allow setting the flag when we have launched a query and not yet
+      * received any results.
+      */
+     if (!conn)
+         return 0;
+     if (conn->asyncStatus != PGASYNC_BUSY)
+         return 0;
+     if (conn->queryclass != PGQUERY_SIMPLE &&
+         conn->queryclass != PGQUERY_EXTENDED)
+         return 0;
+     if (conn->result)
+         return 0;
+
+     /* OK, set flag */
+     conn->singleRowMode = true;
+     return 1;
+ }
+
+ /*
   * Consume any available input from the backend
   * 0 return: some kind of trouble
   * 1 return: no problem
*************** PQconsumeInput(PGconn *conn)
*** 1587,1595 ****
   * parseInput: if appropriate, parse input data from backend
   * until input is exhausted or a stopping state is reached.
   * Note that this function will NOT attempt to read more data from the backend.
-  *
-  * Note: callers of parseInput must be prepared for a longjmp exit when we are
-  * in PGASYNC_BUSY state, since an external row processor might do that.
   */
  static void
  parseInput(PGconn *conn)
--- 1618,1623 ----
*************** PQgetResult(PGconn *conn)
*** 1737,1785 ****
      return res;
  }

- /*
-  * PQskipResult
-  *      Get the next PGresult produced by a query, but discard any data rows.
-  *
-  * This is mainly useful for cleaning up after a longjmp out of a row
-  * processor, when resuming processing of the current query result isn't
-  * wanted.    Note that this is of little value in an async-style application,
-  * since any preceding calls to PQisBusy would have already called the regular
-  * row processor.
-  */
- PGresult *
- PQskipResult(PGconn *conn)
- {
-     PGresult   *res;
-     PQrowProcessor savedRowProcessor;
-
-     if (!conn)
-         return NULL;
-
-     /* temporarily install dummy row processor */
-     savedRowProcessor = conn->rowProcessor;
-     conn->rowProcessor = dummyRowProcessor;
-     /* no need to save/change rowProcessorParam */
-
-     /* fetch the next result */
-     res = PQgetResult(conn);
-
-     /* restore previous row processor */
-     conn->rowProcessor = savedRowProcessor;
-
-     return res;
- }
-
- /*
-  * Do-nothing row processor for PQskipResult
-  */
- static int
- dummyRowProcessor(PGresult *res, const PGdataValue *columns,
-                   const char **errmsgp, void *param)
- {
-     return 1;
- }
-

  /*
   * PQexec
--- 1765,1770 ----
*************** PQexecStart(PGconn *conn)
*** 1886,1892 ****
       * Silently discard any prior query result that application didn't eat.
       * This is probably poor design, but it's here for backward compatibility.
       */
!     while ((result = PQskipResult(conn)) != NULL)
      {
          ExecStatusType resultStatus = result->resultStatus;

--- 1871,1877 ----
       * Silently discard any prior query result that application didn't eat.
       * This is probably poor design, but it's here for backward compatibility.
       */
!     while ((result = PQgetResult(conn)) != NULL)
      {
          ExecStatusType resultStatus = result->resultStatus;

diff --git a/src/interfaces/libpq/fe-lobj.c b/src/interfaces/libpq/fe-lobj.c
index 13fd98c2f913d3818758e75bac96822306981b52..f3a6d0341c13ce644a7f1b2f15919385b85d6a6e 100644
*** a/src/interfaces/libpq/fe-lobj.c
--- b/src/interfaces/libpq/fe-lobj.c
*************** lo_initialize(PGconn *conn)
*** 682,689 ****
      int            n;
      const char *query;
      const char *fname;
-     PQrowProcessor savedRowProcessor;
-     void       *savedRowProcessorParam;
      Oid            foid;

      if (!conn)
--- 682,687 ----
*************** lo_initialize(PGconn *conn)
*** 732,747 ****
              "or proname = 'loread' "
              "or proname = 'lowrite'";

-     /* Ensure the standard row processor is used to collect the result */
-     savedRowProcessor = conn->rowProcessor;
-     savedRowProcessorParam = conn->rowProcessorParam;
-     PQsetRowProcessor(conn, NULL, NULL);
-
      res = PQexec(conn, query);
-
-     conn->rowProcessor = savedRowProcessor;
-     conn->rowProcessorParam = savedRowProcessorParam;
-
      if (res == NULL)
      {
          free(lobjfuncs);
--- 730,736 ----
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 8dbd6b6982395850fc167eb20cb8d5590d33122f..1ba5885cd3b419a97f4fc5ea897eac209f364897 100644
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
*************** static int    getNotify(PGconn *conn);
*** 49,67 ****
  PostgresPollingStatusType
  pqSetenvPoll(PGconn *conn)
  {
-     PostgresPollingStatusType result;
      PGresult   *res;
-     PQrowProcessor savedRowProcessor;
-     void       *savedRowProcessorParam;

      if (conn == NULL || conn->status == CONNECTION_BAD)
          return PGRES_POLLING_FAILED;

-     /* Ensure the standard row processor is used to collect any results */
-     savedRowProcessor = conn->rowProcessor;
-     savedRowProcessorParam = conn->rowProcessorParam;
-     PQsetRowProcessor(conn, NULL, NULL);
-
      /* Check whether there are any data for us */
      switch (conn->setenv_state)
      {
--- 49,59 ----
*************** pqSetenvPoll(PGconn *conn)
*** 77,86 ****
                  if (n < 0)
                      goto error_return;
                  if (n == 0)
!                 {
!                     result = PGRES_POLLING_READING;
!                     goto normal_return;
!                 }

                  break;
              }
--- 69,75 ----
                  if (n < 0)
                      goto error_return;
                  if (n == 0)
!                     return PGRES_POLLING_READING;

                  break;
              }
*************** pqSetenvPoll(PGconn *conn)
*** 94,101 ****

              /* Should we raise an error if called when not active? */
          case SETENV_STATE_IDLE:
!             result = PGRES_POLLING_OK;
!             goto normal_return;

          default:
              printfPQExpBuffer(&conn->errorMessage,
--- 83,89 ----

              /* Should we raise an error if called when not active? */
          case SETENV_STATE_IDLE:
!             return PGRES_POLLING_OK;

          default:
              printfPQExpBuffer(&conn->errorMessage,
*************** pqSetenvPoll(PGconn *conn)
*** 192,201 ****
              case SETENV_STATE_CLIENT_ENCODING_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

--- 180,186 ----
              case SETENV_STATE_CLIENT_ENCODING_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 220,229 ****
              case SETENV_STATE_OPTION_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

--- 205,211 ----
              case SETENV_STATE_OPTION_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 262,278 ****
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
!                     result = PGRES_POLLING_READING;
!                     goto normal_return;
                  }

              case SETENV_STATE_QUERY1_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

--- 244,256 ----
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY1_WAIT;
!                     return PGRES_POLLING_READING;
                  }

              case SETENV_STATE_QUERY1_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 349,365 ****
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
!                     result = PGRES_POLLING_READING;
!                     goto normal_return;
                  }

              case SETENV_STATE_QUERY2_WAIT:
                  {
                      if (PQisBusy(conn))
!                     {
!                         result = PGRES_POLLING_READING;
!                         goto normal_return;
!                     }

                      res = PQgetResult(conn);

--- 327,339 ----
                          goto error_return;

                      conn->setenv_state = SETENV_STATE_QUERY2_WAIT;
!                     return PGRES_POLLING_READING;
                  }

              case SETENV_STATE_QUERY2_WAIT:
                  {
                      if (PQisBusy(conn))
!                         return PGRES_POLLING_READING;

                      res = PQgetResult(conn);

*************** pqSetenvPoll(PGconn *conn)
*** 406,413 ****
                      {
                          /* Query finished, so we're done */
                          conn->setenv_state = SETENV_STATE_IDLE;
!                         result = PGRES_POLLING_OK;
!                         goto normal_return;
                      }
                      break;
                  }
--- 380,386 ----
                      {
                          /* Query finished, so we're done */
                          conn->setenv_state = SETENV_STATE_IDLE;
!                         return PGRES_POLLING_OK;
                      }
                      break;
                  }
*************** pqSetenvPoll(PGconn *conn)
*** 425,436 ****

  error_return:
      conn->setenv_state = SETENV_STATE_IDLE;
!     result = PGRES_POLLING_FAILED;
!
! normal_return:
!     conn->rowProcessor = savedRowProcessor;
!     conn->rowProcessorParam = savedRowProcessorParam;
!     return result;
  }


--- 398,404 ----

  error_return:
      conn->setenv_state = SETENV_STATE_IDLE;
!     return PGRES_POLLING_FAILED;
  }


*************** normal_return:
*** 438,446 ****
   * parseInput: if appropriate, parse input data from backend
   * until input is exhausted or a stopping state is reached.
   * Note that this function will NOT attempt to read more data from the backend.
-  *
-  * Note: callers of parseInput must be prepared for a longjmp exit when we are
-  * in PGASYNC_BUSY state, since an external row processor might do that.
   */
  void
  pqParseInput2(PGconn *conn)
--- 406,411 ----
*************** getRowDescriptions(PGconn *conn)
*** 746,776 ****
      /* Success! */
      conn->result = result;

!     /*
!      * Advance inStart to show that the "T" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
      conn->inStart = conn->inCursor;

!     /* Give the row processor a chance to initialize for new result set */
!     errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, NULL, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;

!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;

  advance_and_error:

--- 711,726 ----
      /* Success! */
      conn->result = result;

!     /* Advance inStart to show that the "T" message has been processed. */
      conn->inStart = conn->inCursor;

!     /*
!      * We could perform additional setup for the new result set here, but for
!      * now there's nothing else to do.
!      */

!     /* And we're done. */
!     return 0;

  advance_and_error:

*************** advance_and_error:
*** 781,788 ****
       */
      conn->inStart = conn->inEnd;

- set_error_result:
-
      /*
       * Replace partially constructed result with an error result. First
       * discard the old result to try to win back some memory.
--- 731,736 ----
*************** set_error_result:
*** 790,796 ****
      pqClearAsyncResult(conn);

      /*
!      * If row processor didn't provide an error message, assume "out of
       * memory" was meant.  The advantage of having this special case is that
       * freeing the old result first greatly improves the odds that gettext()
       * will succeed in providing a translation.
--- 738,744 ----
      pqClearAsyncResult(conn);

      /*
!      * If preceding code didn't provide an error message, assume "out of
       * memory" was meant.  The advantage of having this special case is that
       * freeing the old result first greatly improves the odds that gettext()
       * will succeed in providing a translation.
*************** getAnotherTuple(PGconn *conn, bool binar
*** 937,967 ****
          free(bitmap);
      bitmap = NULL;

!     /*
!      * Advance inStart to show that the "D" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
      conn->inStart = conn->inCursor;

!     /* Pass the completed row values to rowProcessor */
      errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;

!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;

  advance_and_error:

--- 885,899 ----
          free(bitmap);
      bitmap = NULL;

!     /* Advance inStart to show that the "D" message has been processed. */
      conn->inStart = conn->inCursor;

!     /* Process the collected row */
      errmsg = NULL;
!     if (pqRowProcessor(conn, &errmsg))
!         return 0;                /* normal, successful exit */

!     goto set_error_result;        /* pqRowProcessor failed, report it */

  advance_and_error:

*************** set_error_result:
*** 981,987 ****
      pqClearAsyncResult(conn);

      /*
!      * If row processor didn't provide an error message, assume "out of
       * memory" was meant.  The advantage of having this special case is that
       * freeing the old result first greatly improves the odds that gettext()
       * will succeed in providing a translation.
--- 913,919 ----
      pqClearAsyncResult(conn);

      /*
!      * If preceding code didn't provide an error message, assume "out of
       * memory" was meant.  The advantage of having this special case is that
       * freeing the old result first greatly improves the odds that gettext()
       * will succeed in providing a translation.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 173af2e0a79ef3b443de515cda851e277deaed2d..d289f82285fea00d5de20542e43ea103493f9e58 100644
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
*************** static int build_startup_packet(const PG
*** 61,69 ****
   * parseInput: if appropriate, parse input data from backend
   * until input is exhausted or a stopping state is reached.
   * Note that this function will NOT attempt to read more data from the backend.
-  *
-  * Note: callers of parseInput must be prepared for a longjmp exit when we are
-  * in PGASYNC_BUSY state, since an external row processor might do that.
   */
  void
  pqParseInput3(PGconn *conn)
--- 61,66 ----
*************** handleSyncLoss(PGconn *conn, char id, in
*** 446,455 ****
   * Returns: 0 if processed message successfully, EOF to suspend parsing
   * (the latter case is not actually used currently).
   * In either case, conn->inStart has been advanced past the message.
-  *
-  * Note: the row processor could also choose to longjmp out of libpq,
-  * in which case the library's state must allow for resumption at the
-  * next message.
   */
  static int
  getRowDescriptions(PGconn *conn, int msgLength)
--- 443,448 ----
*************** getRowDescriptions(PGconn *conn, int msg
*** 564,573 ****
      /* Success! */
      conn->result = result;

!     /*
!      * Advance inStart to show that the "T" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
      conn->inStart = conn->inCursor;

      /*
--- 557,563 ----
      /* Success! */
      conn->result = result;

!     /* Advance inStart to show that the "T" message has been processed. */
      conn->inStart = conn->inCursor;

      /*
*************** getRowDescriptions(PGconn *conn, int msg
*** 580,604 ****
          return 0;
      }

!     /* Give the row processor a chance to initialize for new result set */
!     errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, NULL, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;

!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;

  advance_and_error:
      /* Discard unsaved result, if any */
--- 570,582 ----
          return 0;
      }

!     /*
!      * We could perform additional setup for the new result set here, but for
!      * now there's nothing else to do.
!      */

!     /* And we're done. */
!     return 0;

  advance_and_error:
      /* Discard unsaved result, if any */
*************** advance_and_error:
*** 608,615 ****
      /* Discard the failed message by pretending we read it */
      conn->inStart += 5 + msgLength;

- set_error_result:
-
      /*
       * Replace partially constructed result with an error result. First
       * discard the old result to try to win back some memory.
--- 586,591 ----
*************** set_error_result:
*** 617,624 ****
      pqClearAsyncResult(conn);

      /*
!      * If row processor didn't provide an error message, assume "out of
!      * memory" was meant.
       */
      if (!errmsg)
          errmsg = libpq_gettext("out of memory for query result");
--- 593,602 ----
      pqClearAsyncResult(conn);

      /*
!      * If preceding code didn't provide an error message, assume "out of
!      * memory" was meant.  The advantage of having this special case is that
!      * freeing the old result first greatly improves the odds that gettext()
!      * will succeed in providing a translation.
       */
      if (!errmsg)
          errmsg = libpq_gettext("out of memory for query result");
*************** failure:
*** 695,704 ****
   * Returns: 0 if processed message successfully, EOF to suspend parsing
   * (the latter case is not actually used currently).
   * In either case, conn->inStart has been advanced past the message.
-  *
-  * Note: the row processor could also choose to longjmp out of libpq,
-  * in which case the library's state must allow for resumption at the
-  * next message.
   */
  static int
  getAnotherTuple(PGconn *conn, int msgLength)
--- 673,678 ----
*************** getAnotherTuple(PGconn *conn, int msgLen
*** 778,808 ****
          goto advance_and_error;
      }

!     /*
!      * Advance inStart to show that the "D" message has been processed.  We
!      * must do this before calling the row processor, in case it longjmps.
!      */
      conn->inStart = conn->inCursor;

!     /* Pass the completed row values to rowProcessor */
      errmsg = NULL;
!     switch ((*conn->rowProcessor) (result, rowbuf, &errmsg,
!                                    conn->rowProcessorParam))
!     {
!         case 1:
!             /* everything is good */
!             return 0;
!
!         case -1:
!             /* error, report the errmsg below */
!             break;

!         default:
!             /* unrecognized return code */
!             errmsg = libpq_gettext("unrecognized return value from row processor");
!             break;
!     }
!     goto set_error_result;

  advance_and_error:
      /* Discard the failed message by pretending we read it */
--- 752,766 ----
          goto advance_and_error;
      }

!     /* Advance inStart to show that the "D" message has been processed. */
      conn->inStart = conn->inCursor;

!     /* Process the collected row */
      errmsg = NULL;
!     if (pqRowProcessor(conn, &errmsg))
!         return 0;                /* normal, successful exit */

!     goto set_error_result;        /* pqRowProcessor failed, report it */

  advance_and_error:
      /* Discard the failed message by pretending we read it */
*************** set_error_result:
*** 817,823 ****
      pqClearAsyncResult(conn);

      /*
!      * If row processor didn't provide an error message, assume "out of
       * memory" was meant.  The advantage of having this special case is that
       * freeing the old result first greatly improves the odds that gettext()
       * will succeed in providing a translation.
--- 775,781 ----
      pqClearAsyncResult(conn);

      /*
!      * If preceding code didn't provide an error message, assume "out of
       * memory" was meant.  The advantage of having this special case is that
       * freeing the old result first greatly improves the odds that gettext()
       * will succeed in providing a translation.
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 67db6119bbaa35ae31fb58bb902a455b093ea23f..9d05dd20605a84ab4c4ec9d61ef8697fb2f3b77e 100644
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
*************** typedef enum
*** 90,96 ****
                                   * backend */
      PGRES_NONFATAL_ERROR,        /* notice or warning message */
      PGRES_FATAL_ERROR,            /* query failed */
!     PGRES_COPY_BOTH                /* Copy In/Out data transfer in progress */
  } ExecStatusType;

  typedef enum
--- 90,97 ----
                                   * backend */
      PGRES_NONFATAL_ERROR,        /* notice or warning message */
      PGRES_FATAL_ERROR,            /* query failed */
!     PGRES_COPY_BOTH,            /* Copy In/Out data transfer in progress */
!     PGRES_SINGLE_TUPLE            /* single tuple from larger resultset */
  } ExecStatusType;

  typedef enum
*************** typedef struct pg_conn PGconn;
*** 129,145 ****
   */
  typedef struct pg_result PGresult;

- /* PGdataValue represents a data field value being passed to a row processor.
-  * It could be either text or binary data; text data is not zero-terminated.
-  * A SQL NULL is represented by len < 0; then value is still valid but there
-  * are no data bytes there.
-  */
- typedef struct pgDataValue
- {
-     int            len;            /* data length in bytes, or <0 if NULL */
-     const char *value;            /* data value, without zero-termination */
- } PGdataValue;
-
  /* PGcancel encapsulates the information needed to cancel a running
   * query on an existing connection.
   * The contents of this struct are not supposed to be known to applications.
--- 130,135 ----
*************** typedef struct pgNotify
*** 161,170 ****
      struct pgNotify *next;        /* list link */
  } PGnotify;

- /* Function type for row-processor callback */
- typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
-                                            const char **errmsgp, void *param);
-
  /* Function types for notice-handling callbacks */
  typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);
  typedef void (*PQnoticeProcessor) (void *arg, const char *message);
--- 151,156 ----
*************** extern int PQsendQueryPrepared(PGconn *c
*** 403,419 ****
                      const int *paramLengths,
                      const int *paramFormats,
                      int resultFormat);
  extern PGresult *PQgetResult(PGconn *conn);
- extern PGresult *PQskipResult(PGconn *conn);

  /* Routines for managing an asynchronous query */
  extern int    PQisBusy(PGconn *conn);
  extern int    PQconsumeInput(PGconn *conn);

- /* Override default per-row processing */
- extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
- extern PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
-
  /* LISTEN/NOTIFY support */
  extern PGnotify *PQnotifies(PGconn *conn);

--- 389,401 ----
                      const int *paramLengths,
                      const int *paramFormats,
                      int resultFormat);
+ extern int    PQsetSingleRowMode(PGconn *conn);
  extern PGresult *PQgetResult(PGconn *conn);

  /* Routines for managing an asynchronous query */
  extern int    PQisBusy(PGconn *conn);
  extern int    PQconsumeInput(PGconn *conn);

  /* LISTEN/NOTIFY support */
  extern PGnotify *PQnotifies(PGconn *conn);

diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4bc89269fababe5e4d8ecbf6e80ca1a8625d4bd5..2bac59c3d879ecabce42ceab3b5133df03a0886a 100644
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
*************** typedef struct pgLobjfuncs
*** 277,282 ****
--- 277,293 ----
      Oid            fn_lo_write;    /* OID of backend function LOwrite        */
  } PGlobjfuncs;

+ /* PGdataValue represents a data field value being passed to a row processor.
+  * It could be either text or binary data; text data is not zero-terminated.
+  * A SQL NULL is represented by len < 0; then value is still valid but there
+  * are no data bytes there.
+  */
+ typedef struct pgDataValue
+ {
+     int            len;            /* data length in bytes, or <0 if NULL */
+     const char *value;            /* data value, without zero-termination */
+ } PGdataValue;
+
  /*
   * PGconn stores all the state data associated with a single connection
   * to a backend.
*************** struct pg_conn
*** 324,333 ****
      /* Optional file to write trace info to */
      FILE       *Pfdebug;

-     /* Callback procedure for per-row processing */
-     PQrowProcessor rowProcessor;    /* function pointer */
-     void       *rowProcessorParam;        /* passthrough argument */
-
      /* Callback procedures for notice message processing */
      PGNoticeHooks noticeHooks;

--- 335,340 ----
*************** struct pg_conn
*** 346,351 ****
--- 353,359 ----
      bool        options_valid;    /* true if OK to attempt connection */
      bool        nonblocking;    /* whether this connection is using nonblock
                                   * sending semantics */
+     bool        singleRowMode;    /* return current query result row-by-row? */
      char        copy_is_binary; /* 1 = copy binary, 0 = copy text */
      int            copy_already_done;        /* # bytes already returned in COPY
                                           * OUT */
*************** struct pg_conn
*** 406,411 ****
--- 414,420 ----

      /* Status for asynchronous result construction */
      PGresult   *result;            /* result being constructed */
+     PGresult   *next_result;    /* next result (used in single-row mode) */

      /* Assorted state for SSL, GSS, etc */

*************** extern void pqSaveMessageField(PGresult
*** 517,522 ****
--- 526,532 ----
                     const char *value);
  extern void pqSaveParameterStatus(PGconn *conn, const char *name,
                        const char *value);
+ extern int    pqRowProcessor(PGconn *conn, const char **errmsgp);
  extern void pqHandleSendFailure(PGconn *conn);

  /* === in fe-protocol2.c === */

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Marko Kreen
Дата:
Сообщение: Re: [patch] libpq one-row-at-a-time API
Следующее
От: "Etsuro Fujita"
Дата:
Сообщение: WIP Patch: Use sortedness of CSV foreign tables for query planning