Revised nonblocking patches + quasi docs

From: Alfred Perlstein
Subject: Revised nonblocking patches + quasi docs
Msg-id: 20000114111430.A824@fw.wintelcom.net
In reply to: Re: [HACKERS] LIBPQ patches ...  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses: Re: [HACKERS] Revised nonblocking patches + quasi docs  (admin <admin@wtbwts.com>)
List: pgsql-hackers
* Tom Lane <tgl@sss.pgh.pa.us> [000109 08:18] wrote:
> Don Baccus <dhogaza@pacifier.com> writes:
> > At 05:27 PM 1/8/00 -0500, Tom Lane wrote:
> >> I also object strongly to the lack of documentation.
> 
> > ... I know there are some folks who aren't native-english speakers, so
> > perhaps you don't want to require that the implementor of such patches
> > provide the final documentation wording.  But the information should
> > be there and spelled out in a form that can be very easily moved to
> > the docs.
> 
> Oh, absolutely.  Thomas, our master of the docs, has always had the
> policy of "give me some words, I'll take care of formatting and
> editing..."
> 
> I was probably too harsh on Alfred last night, since in fact his code
> was fairly well commented, and some minimal doco could have been
> extracted from the routine headers.  But on a change like this, I think
> some paragraphs of coherent high-level explanation are needed: what it
> does, when and why you'd use it.  I didn't see that anywhere...

I've actually been trying to work on the sgml and failing miserably;
I have no clue how this stuff (sgml compilation) works.  Are you asking
for a couple of paragraphs that describe the proposed changes?

If so, I hope this suffices.  If not, some help on building the sgml
would be much appreciated:

--------

Summary:

First and foremost, great pains have been taken so that there
are _no_ compatibility issues.

A 6.5.3 libpq program should not behave any differently
with these patches in place.  All they do is offer a step closer
to a truly non-blocking connection to the backend and address
some issues with non-blocking connections.

----

Added functions:

int PQisnonblocking(const PGconn *conn);
 Returns whether or not the connection is in non-blocking mode.
 However, it doesn't actually check the socket flags; it relies on
 the user calling 'PQsetnonblocking()' to keep the internal state of
 libpq sane.  Users should no longer use 'PQsocket()' to retrieve the
 socket and 'manually' ioctl/fcntl it to non-blocking.

 Returns TRUE if the connection has been set to non-blocking mode,
 FALSE if it is blocking.

int PQflush(PGconn *conn);
 Flushes the send-queue to the backend.  This just makes the internal
 function visible to the user for convenience, and it returns what the
 internal function returns: 0 for success, EOF for any failure.

int PQsetnonblocking(PGconn *conn, int arg);
 Actually sets the connection to the backend to blocking or
 non-blocking.  arg should be TRUE to set the connection to
 non-blocking, or FALSE to set it to blocking.

 There's an implied blocking flush of the send-queue, which is really
 ok as the user is either 'going into' or 'returning from' a blocking
 state.

 Returns 0 for success, -1 for failure.
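
A rough sketch of the idea behind the set/query pair, outside of libpq:
the mode is changed with fcntl() while preserving the other flags, and
the "is it non-blocking?" answer comes from a cached field rather than
from the descriptor.  toy_conn, toy_setnonblocking() and
toy_isnonblocking() are made-up names for illustration only, and the
implied flush of the send-queue is left out.

```c
#include <assert.h>
#include <fcntl.h>
#include <stddef.h>

/* Hypothetical stand-in for PGconn: only the fields this sketch needs. */
typedef struct
{
    int sock;           /* descriptor of the connection */
    int nonblocking;    /* cached mode: 1 = non-blocking, 0 = blocking */
} toy_conn;

/* Switch the descriptor's mode and remember it.  Returns 0 or -1. */
int
toy_setnonblocking(toy_conn *conn, int arg)
{
    int flags;

    assert(conn != NULL);
    arg = arg ? 1 : 0;
    if (arg == conn->nonblocking)
        return 0;               /* already in the requested state */

    /* read the current flags so that only O_NONBLOCK is changed */
    flags = fcntl(conn->sock, F_GETFL, 0);
    if (flags == -1)
        return -1;
    flags = arg ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    if (fcntl(conn->sock, F_SETFL, flags) == -1)
        return -1;

    conn->nonblocking = arg;
    return 0;
}

/* Report the cached mode; the descriptor itself is not consulted. */
int
toy_isnonblocking(const toy_conn *conn)
{
    return conn->nonblocking;
}
```

Because the answer is cached, a caller who flips O_NONBLOCK behind the
library's back leaves the cached value stale, which is why the manual
ioctl/fcntl route is discouraged above.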

---

New functionality:
 PQsetnonblocking() allows libpq to know what behavior the user really
 wants: on a non-blocking connection the user will not block while
 sending data to the backend.  Without it, if I had a constant stream
 of data and was doing a COPY IN, I would block while sending until the
 backend could take more data (or the backend lost the connection), and
 potentially it would never finish.
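
To illustrate the difference with plain POSIX calls (not libpq): on a
non-blocking descriptor, write() reports EAGAIN/EWOULDBLOCK once the
kernel buffer is full instead of putting the caller to sleep.  The
helper names make_nonblocking() and fill_until_would_block() are
invented for this sketch.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Set O_NONBLOCK on fd, keeping its other flags.  Returns 0 or -1. */
int
make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/*
 * Write zero-filled chunks to fd until the kernel refuses more.
 * Returns the number of bytes accepted, or -1 on an unexpected error.
 * fd is assumed to be non-blocking; on a blocking fd this loop would
 * simply hang once the buffer fills and nobody reads.
 */
long
fill_until_would_block(int fd)
{
    char    chunk[4096];
    long    total = 0;

    assert(fd >= 0);
    memset(chunk, 0, sizeof(chunk));
    for (;;)
    {
        ssize_t n = write(fd, chunk, sizeof(chunk));

        if (n > 0)
            total += n;
        else if (n < 0 && errno == EINTR)
            continue;           /* interrupted, just try again */
        else if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
            return total;       /* peer is not reading: stop, don't wait */
        else
            return -1;
    }
}
```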
 

---

Implementation changes:
 None should be visible to programs based on 6.5.3's libpq.
 Programs based on later versions of libpq will notice that the
 non-blocking connection functions set the state of the connection to
 non-blocking automatically.

 When the connection is set non-blocking, pqFlush() will not block if
 the send queue would be filled by new data inserted into the queue.

 Functions that poll for data from the backend implicitly _try_ to
 flush the send queue if the connection is set to non-blocking.  This
 allows the polling to act as a context for pushing queued data to the
 backend.
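
A minimal sketch of that flush behavior, assuming a toy send queue
rather than the real PGconn.  send_queue, queue_flush() and
queue_poll_input() are hypothetical names.  For brevity the sketch asks
for non-blocking behavior per call with MSG_DONTWAIT (available on
Linux and the BSDs) instead of setting O_NONBLOCK on the socket as the
patch does.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>          /* EOF */
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical send queue; libpq keeps similar fields inside PGconn. */
typedef struct
{
    int     sock;
    char    buf[8192];
    size_t  count;          /* bytes queued and not yet written */
} send_queue;

/*
 * Try to write everything queued without waiting.  Returns 0 when the
 * queue is empty, EOF when data remains or the send failed.
 */
int
queue_flush(send_queue *q)
{
    char   *ptr = q->buf;
    size_t  len = q->count;

    assert(len <= sizeof(q->buf));
    while (len > 0)
    {
        ssize_t sent = send(q->sock, ptr, len, MSG_DONTWAIT);

        if (sent < 0 && errno == EINTR)
            continue;               /* restart an interrupted send */
        if (sent <= 0)
        {
            /* could not finish: keep the unsent tail at the front */
            memmove(q->buf, ptr, len);
            q->count = len;
            return EOF;
        }
        ptr += sent;
        len -= (size_t) sent;
    }
    q->count = 0;
    return 0;
}

/* Polling for input doubles as a chance to push queued output. */
ssize_t
queue_poll_input(send_queue *q, char *in, size_t inlen)
{
    (void) queue_flush(q);
    return recv(q->sock, in, inlen, MSG_DONTWAIT);
}
```

The memmove() keeps the queue consistent after a partial write, so a
later flush (for instance from the polling path) can pick up where this
one stopped.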
 

---

Problems:
 We need some sort of send-queue commit reservations so that there's
 no chance of us sending a partial query down the pipe to the backend.
 Right now this is hacked around by potentially blocking in
 non-blocking mode if something 'goes terribly wrong'.  I plan to fix
 this.
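
One possible shape for such a reservation, as a sketch only: compute
the size of the whole message first and queue either all of it or none
of it.  out_queue and queue_put_query() are hypothetical, the buffer is
kept tiny on purpose, and the message layout ('Q', the query text, a
terminating NUL) is an assumption based on the pqPutnchar()/pqPuts()
calls in the patch.

```c
#include <assert.h>
#include <string.h>

typedef struct
{
    char    buf[64];    /* tiny on purpose, so overflow is easy to hit */
    size_t  count;      /* bytes already queued */
} out_queue;

/*
 * Queue a whole query message or nothing at all.
 * Returns 0 on success, -1 if the message does not fit.
 */
int
queue_put_query(out_queue *q, const char *query)
{
    size_t need = 1 + strlen(query) + 1;    /* 'Q' + text + NUL */

    assert(q->count <= sizeof(q->buf));
    if (need > sizeof(q->buf) - q->count)
        return -1;      /* reservation failed: queue left untouched */

    q->buf[q->count++] = 'Q';
    memcpy(q->buf + q->count, query, need - 1);     /* text and NUL */
    q->count += need - 1;
    return 0;
}
```

A caller that gets -1 can flush and retry later, knowing that no half
of a query has been queued in the meantime.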
 

---

Quirks:
 PQexec() assumes the caller wants blocking behavior and will set the
 connection to blocking for the duration of the PQexec() call; it will
 then restore the previous mode.
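
The save/force/restore pattern can be sketched with fcntl() alone.
Everything here is illustrative: fd_set_nonblocking() and
run_blocking() are invented helpers, and the callback stands in for the
body of PQexec().  record_is_blocking() is just a sample callback that
notes which mode it ran in.

```c
#include <assert.h>
#include <fcntl.h>
#include <stddef.h>

/* Set or clear O_NONBLOCK on fd.  Returns 0 or -1. */
int
fd_set_nonblocking(int fd, int on)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;
    flags = on ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    return fcntl(fd, F_SETFL, flags) == -1 ? -1 : 0;
}

/*
 * Force fd to blocking mode, run op, then put the old mode back.
 * Returns op's result, or -1 if the mode could not be changed.
 */
int
run_blocking(int fd, int (*op) (int fd, void *arg), void *arg)
{
    int flags = fcntl(fd, F_GETFL, 0);
    int was_nonblocking;
    int result;

    assert(op != NULL);
    if (flags == -1)
        return -1;
    was_nonblocking = (flags & O_NONBLOCK) != 0;

    if (fd_set_nonblocking(fd, 0) == -1)
        return -1;
    result = op(fd, arg);
    /* restore on every path, success or failure of op */
    if (fd_set_nonblocking(fd, was_nonblocking) == -1)
        return -1;
    return result;
}

/* Sample op: records whether the descriptor is blocking right now. */
int
record_is_blocking(int fd, void *arg)
{
    int flags = fcntl(fd, F_GETFL, 0);

    *(int *) arg = (flags != -1) && !(flags & O_NONBLOCK);
    return 0;
}
```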
 

---

Internal changes:
 New field in PGconn, 'int nonblocking': set to 1 if the connection is nonblocking, 0 if blocking (default).
 Macro pqIsnonblocking(PGconn) to avoid a function call to check blocking status (only visible if libpq-int.h is included).
 The internal function connectMakeNonblocking() has been replaced with PQsetnonblocking().
 A send is restarted if EINTR is reported during a flush.

---

Lastly:
 This is work in progress, I will be working towards making libpq better at not blocking.

here are the diffs:

Index: fe-connect.c
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-connect.c,v
retrieving revision 1.109
diff -u -c -IHeader -I$Id:  -r1.109 fe-connect.c
cvs diff: conflicting specifications of output style
*** fe-connect.c    2000/01/14 05:33:15    1.109
--- fe-connect.c    2000/01/14 18:36:54
***************
*** 594,624 ****
      return 0;
  }
  
- 
- /* ----------
-  * connectMakeNonblocking -
-  * Make a connection non-blocking.
-  * Returns 1 if successful, 0 if not.
-  * ----------
-  */
- static int
- connectMakeNonblocking(PGconn *conn)
- {
- #ifndef WIN32
-     if (fcntl(conn->sock, F_SETFL, O_NONBLOCK) < 0)
- #else
-     if (ioctlsocket(conn->sock, FIONBIO, &on) != 0)
- #endif
-     {
-         printfPQExpBuffer(&conn->errorMessage,
-                           "connectMakeNonblocking -- fcntl() failed: errno=%d\n%s\n",
-                           errno, strerror(errno));
-         return 0;
-     }
- 
-     return 1;
- }
- 
  /* ----------
   * connectNoDelay -
   * Sets the TCP_NODELAY socket option.
--- 594,599 ----
***************
*** 789,795 ****
       *   Ewan Mellor <eem21@cam.ac.uk>.
       * ---------- */
  #if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
!     if (!connectMakeNonblocking(conn))
          goto connect_errReturn;
  #endif
  
--- 764,770 ----
       *   Ewan Mellor <eem21@cam.ac.uk>.
       * ---------- */
  #if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
!     if (PQsetnonblocking(conn, TRUE) != 0)
          goto connect_errReturn;
  #endif
  
***************
*** 898,904 ****
      /* This makes the connection non-blocking, for all those cases which forced us
         not to do it above. */
  #if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
!     if (!connectMakeNonblocking(conn))
          goto connect_errReturn;
  #endif
  
--- 873,879 ----
      /* This makes the connection non-blocking, for all those cases which forced us
         not to do it above. */
  #if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
!     if (PQsetnonblocking(conn, TRUE) != 0)
          goto connect_errReturn;
  #endif
  
***************
*** 1702,1707 ****
--- 1677,1683 ----
      conn->inBuffer = (char *) malloc(conn->inBufSize);
      conn->outBufSize = 8 * 1024;
      conn->outBuffer = (char *) malloc(conn->outBufSize);
+     conn->nonblocking = FALSE;
      initPQExpBuffer(&conn->errorMessage);
      initPQExpBuffer(&conn->workBuffer);
      if (conn->inBuffer == NULL ||
***************
*** 1812,1817 ****
--- 1788,1794 ----
      conn->lobjfuncs = NULL;
      conn->inStart = conn->inCursor = conn->inEnd = 0;
      conn->outCount = 0;
+     conn->nonblocking = FALSE;
  
  }
  
Index: fe-exec.c
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-exec.c,v
retrieving revision 1.86
diff -u -c -IHeader -I$Id:  -r1.86 fe-exec.c
cvs diff: conflicting specifications of output style
*** fe-exec.c    1999/11/11 00:10:14    1.86
--- fe-exec.c    2000/01/14 22:47:07
***************
*** 13,18 ****
--- 13,19 ----
   */
  #include <errno.h>
  #include <ctype.h>
+ #include <fcntl.h>
  
  #include "postgres.h"
  #include "libpq-fe.h"
***************
*** 24,30 ****
  #include <unistd.h>
  #endif
  
- 
  /* keep this in same order as ExecStatusType in libpq-fe.h */
  const char *const pgresStatus[] = {
      "PGRES_EMPTY_QUERY",
--- 25,30 ----
***************
*** 514,526 ****
      conn->curTuple = NULL;
  
      /* send the query to the backend; */
!     /* the frontend-backend protocol uses 'Q' to designate queries */
!     if (pqPutnchar("Q", 1, conn) ||
!         pqPuts(query, conn) ||
!         pqFlush(conn))
      {
!         handleSendFailure(conn);
!         return 0;
      }
  
      /* OK, it's launched! */
--- 514,566 ----
      conn->curTuple = NULL;
  
      /* send the query to the backend; */
! 
!     /*
!      * in order to guarantee that we don't send a partial query 
!      * where we would become out of sync with the backend and/or
!      * block during a non-blocking connection we must first flush
!      * the send buffer before sending more data
!      *
!      * an alternative is to implement 'queue reservations' where
!      * we are able to roll up a transaction 
!      * (the 'Q' along with our query) and make sure we have
!      * enough space for it all in the send buffer.
!      */
!     if (pqIsnonblocking(conn))
      {
!         /*
!          * the buffer must have emptied completely before we allow
!          * a new query to be buffered
!          */
!         if (pqFlush(conn))
!             return 0;
!         /* 'Q' == queries */
!         /* XXX: if we fail here we really ought to not block */
!         if (pqPutnchar("Q", 1, conn) ||
!             pqPuts(query, conn))
!         {
!             handleSendFailure(conn);    
!             return 0;
!         }
!         /*
!          * give the data a push, ignore the return value as
!          * ConsumeInput() will do any aditional flushing if needed
!          */
!         (void) pqFlush(conn);    
!     }
!     else
!     {
!         /* 
!          * the frontend-backend protocol uses 'Q' to 
!          * designate queries 
!          */
!         if (pqPutnchar("Q", 1, conn) ||
!             pqPuts(query, conn) ||
!             pqFlush(conn))
!         {
!             handleSendFailure(conn);
!             return 0;
!         }
      }
  
      /* OK, it's launched! */
***************
*** 574,580 ****
--- 614,630 ----
       * we will NOT block waiting for more input.
       */
      if (pqReadData(conn) < 0)
+     {
+         /*
+          * for non-blocking connections
+          * try to flush the send-queue otherwise we may never get a 
+          * responce for something that may not have already been sent
+          * because it's in our write buffer!
+          */
+         if (pqIsnonblocking(conn))
+             (void) pqFlush(conn);
          return 0;
+     }
      /* Parsing of the data waits till later. */
      return 1;
  }
***************
*** 1088,1093 ****
--- 1138,1153 ----
  {
      PGresult   *result;
      PGresult   *lastResult;
+     bool    savedblocking;
+ 
+     /*
+      * we assume anyone calling PQexec wants blocking behaviour,
+      * we force the blocking status of the connection to blocking
+      * for the duration of this function and restore it on return
+      */
+     savedblocking = pqIsnonblocking(conn);
+     if (PQsetnonblocking(conn, FALSE) == -1)
+         return NULL;
  
      /*
       * Silently discard any prior query result that application didn't
***************
*** 1102,1115 ****
              PQclear(result);
              printfPQExpBuffer(&conn->errorMessage,
                  "PQexec: you gotta get out of a COPY state yourself.\n");
!             return NULL;
          }
          PQclear(result);
      }
  
      /* OK to send the message */
      if (!PQsendQuery(conn, query))
!         return NULL;
  
      /*
       * For backwards compatibility, return the last result if there are
--- 1162,1176 ----
              PQclear(result);
              printfPQExpBuffer(&conn->errorMessage,
                  "PQexec: you gotta get out of a COPY state yourself.\n");
!             /* restore blocking status */
!             goto errout;
          }
          PQclear(result);
      }
  
      /* OK to send the message */
      if (!PQsendQuery(conn, query))
!         goto errout;    /* restore blocking status */
  
      /*
       * For backwards compatibility, return the last result if there are
***************
*** 1142,1148 ****
--- 1203,1217 ----
              result->resultStatus == PGRES_COPY_OUT)
              break;
      }
+ 
+     if (PQsetnonblocking(conn, savedblocking) == -1)
+         return NULL;
      return lastResult;
+ 
+ errout:
+     if (PQsetnonblocking(conn, savedblocking) == -1)
+         return NULL;
+     return NULL;
  }
  
  
***************
*** 1431,1438 ****
               "PQendcopy() -- I don't think there's a copy in progress.\n");
          return 1;
      }
  
!     (void) pqFlush(conn);        /* make sure no data is waiting to be sent */
  
      /* Return to active duty */
      conn->asyncStatus = PGASYNC_BUSY;
--- 1500,1516 ----
               "PQendcopy() -- I don't think there's a copy in progress.\n");
          return 1;
      }
+ 
+     /*
+      * make sure no data is waiting to be sent, 
+      * abort if we are non-blocking and the flush fails
+      */
+     if (pqFlush(conn) && pqIsnonblocking(conn))
+         return (1);
  
!     /* non blocking connections may have to abort at this point. */
!     if (pqIsnonblocking(conn) && PQisBusy(conn))
!         return (1);
  
      /* Return to active duty */
      conn->asyncStatus = PGASYNC_BUSY;
***************
*** 2025,2028 ****
--- 2103,2192 ----
          return 1;
      else
          return 0;
+ }
+ 
+ /* PQsetnonblocking:
+      sets the PGconn's database connection non-blocking if the arg is TRUE
+      or makes it blocking if the arg is FALSE, this will not protect
+      you from PQexec(), you'll only be safe when using the non-blocking
+      API
+      Needs to be called only on a connected database connection.
+ */
+ 
+ int
+ PQsetnonblocking(PGconn *conn, int arg)
+ {
+     int    fcntlarg;
+ 
+     arg = (arg == TRUE) ? 1 : 0;
+     /* early out if the socket is already in the state requested */
+     if (arg == conn->nonblocking)
+         return (0);
+ 
+     /*
+      * to guarantee constancy for flushing/query/result-polling behavior
+      * we need to flush the send queue at this point in order to guarantee
+      * proper behavior.
+      * this is ok because either they are making a transition
+      *  _from_ or _to_ blocking mode, either way we can block them.
+      */
+     /* if we are going from blocking to non-blocking flush here */
+     if (!pqIsnonblocking(conn) && pqFlush(conn))
+         return (-1);
+ 
+ 
+ #ifdef USE_SSL
+     if (conn->ssl)
+     {
+         printfPQExpBuffer(&conn->errorMessage,
+             "PQsetnonblocking() -- not supported when using SSL\n");
+         return (-1);
+     }
+ #endif /* USE_SSL */
+ 
+ #ifndef WIN32
+     fcntlarg = fcntl(conn->sock, F_GETFL, 0);
+     if (fcntlarg == -1)
+         return (-1);
+ 
+     if ((arg == TRUE && 
+         fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
+         (arg == FALSE &&
+         fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1)) 
+ #else
+     fcntlarg = arg;
+     if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
+ #endif
+     {
+         printfPQExpBuffer(&conn->errorMessage,
+             "PQsetblocking() -- unable to set nonblocking status to %s\n",
+             arg == TRUE ? "TRUE" : "FALSE");
+         return (-1);
+     }
+ 
+     conn->nonblocking = arg;
+ 
+     /* if we are going from non-blocking to blocking flush here */
+     if (pqIsnonblocking(conn) && pqFlush(conn))
+         return (-1);
+ 
+     return (0);
+ }
+ 
+ /* return the blocking status of the database connection, TRUE == nonblocking,
+      FALSE == blocking
+ */
+ int
+ PQisnonblocking(const PGconn *conn)
+ {
+ 
+     return (pqIsnonblocking(conn));
+ }
+ 
+ /* try to force data out, really only useful for non-blocking users */
+ int
+ PQflush(PGconn *conn)
+ {
+ 
+     return (pqFlush(conn));
  }
Index: fe-misc.c
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-misc.c,v
retrieving revision 1.33
diff -u -c -IHeader -I$Id:  -r1.33 fe-misc.c
cvs diff: conflicting specifications of output style
*** fe-misc.c    1999/11/30 03:08:19    1.33
--- fe-misc.c    2000/01/12 03:12:14
***************
*** 86,91 ****
--- 86,122 ----
  {
      size_t avail = Max(conn->outBufSize - conn->outCount, 0);
  
+     /*
+      * if we are non-blocking and the send queue is too full to buffer this
+      * request then try to flush some and return an error 
+      */
+     if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
+     {
+         /*
+          * even if the flush failed we may still have written some
+          * data, recalculate the size of the send-queue relative
+          * to the amount we have to send, we may be able to queue it
+          * afterall even though it's not sent to the database it's
+          * ok, any routines that check the data coming from the
+          * database better call pqFlush() anyway.
+          */
+         if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
+         {
+             printfPQExpBuffer(&conn->errorMessage,
+                 "pqPutBytes --  pqFlush couldn't flush enough"
+                 " data: space available: %d, space needed %d\n",
+                 Max(conn->outBufSize - conn->outCount, 0), nbytes);
+             return EOF;
+         }
+     }
+ 
+     /* 
+      * is the amount of data to be sent is larger than the size of the
+      * output buffer then we must flush it to make more room.
+      *
+      * the code above will make sure the loop conditional is never 
+      * true for non-blocking connections
+      */
      while (nbytes > avail)
      {
          memcpy(conn->outBuffer + conn->outCount, s, avail);
***************
*** 548,553 ****
--- 579,592 ----
          return EOF;
      }
  
+     /* 
+      * don't try to send zero data, allows us to use this function
+      * without too much worry about overhead
+      */
+     if (len == 0)
+         return (0);
+ 
+     /* while there's still data to send */
      while (len > 0)
      {
          /* Prevent being SIGPIPEd if backend has closed the connection. */
***************
*** 556,561 ****
--- 595,601 ----
  #endif
  
          int sent;
+ 
  #ifdef USE_SSL
          if (conn->ssl)
             sent = SSL_write(conn->ssl, ptr, len);
***************
*** 585,590 ****
--- 625,632 ----
                  case EWOULDBLOCK:
                      break;
  #endif
+                 case EINTR:
+                     continue;
  
                  case EPIPE:
  #ifdef ECONNRESET
***************
*** 616,628 ****
              ptr += sent;
              len -= sent;
          }
          if (len > 0)
          {
              /* We didn't send it all, wait till we can send more */
  
-             /* At first glance this looks as though it should block.  I think
-              * that it will be OK though, as long as the socket is
-              * non-blocking. */
              if (pqWait(FALSE, TRUE, conn))
                  return EOF;
          }
--- 658,688 ----
              ptr += sent;
              len -= sent;
          }
+ 
          if (len > 0)
          {
              /* We didn't send it all, wait till we can send more */
+ 
+             /* 
+              * if the socket is in non-blocking mode we may need
+              * to abort here 
+              */
+ #ifdef USE_SSL
+             /* can't do anything for our SSL users yet */
+             if (conn->ssl == NULL)
+             {
+ #endif
+                 if (pqIsnonblocking(conn))
+                 {
+                     /* shift the contents of the buffer */
+                     memmove(conn->outBuffer, ptr, len);
+                     conn->outCount = len;
+                     return EOF;
+                 }
+ #ifdef USE_SSL
+             }
+ #endif
  
              if (pqWait(FALSE, TRUE, conn))
                  return EOF;
          }
Index: libpq-fe.h
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-fe.h,v
retrieving revision 1.54
diff -u -c -IHeader -I$Id:  -r1.54 libpq-fe.h
cvs diff: conflicting specifications of output style
*** libpq-fe.h    2000/01/14 05:33:15    1.54
--- libpq-fe.h    2000/01/14 22:45:33
***************
*** 261,266 ****
--- 261,273 ----
      extern int    PQgetlineAsync(PGconn *conn, char *buffer, int bufsize);
      extern int    PQputnbytes(PGconn *conn, const char *buffer, int nbytes);
      extern int    PQendcopy(PGconn *conn);
+ 
+     /* Set blocking/nonblocking connection to the backend */
+     extern int    PQsetnonblocking(PGconn *conn, int arg);
+     extern int    PQisnonblocking(const PGconn *conn);
+ 
+     /* Force the write buffer to be written (or at least try) */
+     extern int    PQflush(PGconn *conn);
  
      /*
       * "Fast path" interface --- not really recommended for application
Index: libpq-int.h
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-int.h,v
retrieving revision 1.15
diff -u -c -IHeader -I$Id:  -r1.15 libpq-int.h
cvs diff: conflicting specifications of output style
*** libpq-int.h    2000/01/14 05:33:15    1.15
--- libpq-int.h    2000/01/14 18:32:51
***************
*** 214,219 ****
--- 214,222 ----
      int            inEnd;            /* offset to first position after avail
                                   * data */
  
+     int            nonblocking;    /* whether this connection is using a blocking
+                                  * socket to the backend or not */
+ 
      /* Buffer for data not yet sent to backend */
      char       *outBuffer;        /* currently allocated buffer */
      int            outBufSize;        /* allocated size of buffer */
***************
*** 297,301 ****
--- 300,310 ----
  #define strerror(A) (sys_errlist[(A)])
  #endif     /* sunos4 */
  #endif     /* !strerror */
+ 
+ /* 
+  * this is so that we can check is a connection is non-blocking internally
+  * without the overhead of a function call
+  */
+ #define pqIsnonblocking(conn)    (conn->nonblocking)
  
  #endif     /* LIBPQ_INT_H */


on a side note miscadmin.h causes problems on FreeBSD because it uses
pid_t without having included sys/types.h

thanks!

-- 
-Alfred Perlstein - [bright@wintelcom.net|alfred@freebsd.org]

