Re: PATCH: pgbench - option to build using ppoll() for larger connection counts

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: PATCH: pgbench - option to build using ppoll() for larger connection counts
Дата
Msg-id 18803.1537661494@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: PATCH: pgbench - option to build using ppoll() for larger connection counts  (Tom Lane <tgl@sss.pgh.pa.us>)
Список pgsql-hackers
I wrote:
> I'm strongly tempted to just remove the POLL_UNWANTED business
> altogether, as it seems both pointless and unportable on its face.
> Almost by definition, we can't know what "other" bits a given
> implementation might set.
> I'm not entirely following the point of including POLLRDHUP in
> POLL_EVENTS, either.  What's wrong with the traditional solution
> of detecting EOF?

So after studying that a bit longer, I think it's just wrong.
It's not the business of this code to be checking for connection
errors at all; that is libpq's province.  The libpq API specifies
that callers should wait for read-ready on the socket, and nothing
else.  So the only bit we need concern ourselves with is POLLIN.

I also seriously disliked both the details of the abstraction API
and its lack of documentation.  (Other people complained about that
upthread, too.)  So attached is a rewrite attempt.  There are still a
couple of grotty things about it; in particular the ppoll variant of
socket_has_input() knows more than one could wish about how it's being
used.  But I couldn't see a way to make it cleaner without significant
changes to the logic in threadRun, and that didn't seem better.

I think that Andres' concern upthread about iterating over a whole
lot of sockets is somewhat misplaced.  We aren't going to be iterating
over the entire set of client connections, only those being run by a
particular pgbench thread.  So assuming you're using a reasonable ratio
of threads to clients, there won't be very many to look at in any one
thread.  In any case, I'm dubious that we could get much of a win from
some other abstraction for waiting: both of these code paths do work
pretty much proportional to the number of connections the current
thread is responsible for, and it's hard to see how to avoid that.

I've tested this on both Linux and FreeBSD, and it seems to work fine.

I'm reasonably happy with this version of the patch, and would be
ready to commit it, but I thought I'd throw it out for another round
of review if anyone wants to.

            regards, tom lane

diff --git a/configure b/configure
index 9b30402..21ecd29 100755
--- a/configure
+++ b/configure
@@ -15093,7 +15093,7 @@ fi
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`

-for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
+for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l
 do :
   as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
 ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
diff --git a/configure.in b/configure.in
index 2e60a89..8fe6894 100644
--- a/configure.in
+++ b/configure.in
@@ -1562,7 +1562,7 @@ PGAC_FUNC_WCSTOMBS_L
 LIBS_including_readline="$LIBS"
 LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'`

-AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])
+AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l])

 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 41b756c..ae81aba 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -28,8 +28,8 @@
  */

 #ifdef WIN32
-#define FD_SETSIZE 1024            /* set before winsock2.h is included */
-#endif                            /* ! WIN32 */
+#define FD_SETSIZE 1024            /* must set before winsock2.h is included */
+#endif

 #include "postgres_fe.h"
 #include "fe_utils/conditional.h"
@@ -45,12 +45,21 @@
 #include <signal.h>
 #include <time.h>
 #include <sys/time.h>
+#ifdef HAVE_SYS_RESOURCE_H
+#include <sys/resource.h>        /* for getrlimit */
+#endif
+
+/* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */
+#if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
+#define POLL_USING_PPOLL
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#else                            /* no ppoll(), so use select() */
+#define POLL_USING_SELECT
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
-
-#ifdef HAVE_SYS_RESOURCE_H
-#include <sys/resource.h>        /* for getrlimit */
 #endif

 #ifndef M_PI
@@ -71,6 +80,33 @@
 #define MM2_ROT                47

 /*
+ * Multi-platform socket set implementations
+ */
+
+#ifdef POLL_USING_PPOLL
+#define SOCKET_WAIT_METHOD "ppoll"
+
+typedef struct socket_set
+{
+    int            maxfds;            /* allocated length of pollfds[] array */
+    int            curfds;            /* number currently in use */
+    struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];
+} socket_set;
+
+#endif                            /* POLL_USING_PPOLL */
+
+#ifdef POLL_USING_SELECT
+#define SOCKET_WAIT_METHOD "select"
+
+typedef struct socket_set
+{
+    int            maxfd;            /* largest FD currently set in fds */
+    fd_set        fds;
+} socket_set;
+
+#endif                            /* POLL_USING_SELECT */
+
+/*
  * Multi-platform pthread implementations
  */

@@ -93,13 +129,6 @@ static int    pthread_join(pthread_t th, void **thread_return);
 /********************************************************************
  * some configurable parameters */

-/* max number of clients allowed */
-#ifdef FD_SETSIZE
-#define MAXCLIENTS    (FD_SETSIZE - 10)
-#else
-#define MAXCLIENTS    1024
-#endif
-
 #define DEFAULT_INIT_STEPS "dtgvp"    /* default -I setting */

 #define LOG_STEP_SECONDS    5    /* seconds between log messages */
@@ -523,8 +552,14 @@ static void processXactStats(TState *thread, CState *st, instr_time *now,
 static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
 static void addScript(ParsedScript script);
 static void *threadRun(void *arg);
-static void setalarm(int seconds);
 static void finishCon(CState *st);
+static void setalarm(int seconds);
+static socket_set *alloc_socket_set(int count);
+static void free_socket_set(socket_set *sa);
+static void clear_socket_set(socket_set *sa);
+static void add_socket_to_set(socket_set *sa, int fd, int idx);
+static int    wait_on_socket_set(socket_set *sa, int64 usecs);
+static bool socket_has_input(socket_set *sa, int fd, int idx);


 /* callback functions for our flex lexer */
@@ -4903,7 +4938,7 @@ main(int argc, char **argv)
             case 'c':
                 benchmarking_option_set = true;
                 nclients = atoi(optarg);
-                if (nclients <= 0 || nclients > MAXCLIENTS)
+                if (nclients <= 0)
                 {
                     fprintf(stderr, "invalid number of clients: \"%s\"\n",
                             optarg);
@@ -5606,6 +5641,7 @@ threadRun(void *arg)
                 end;
     int            nstate = thread->nstate;
     int            remains = nstate;    /* number of remaining clients */
+    socket_set *sockets = alloc_socket_set(nstate);
     int            i;

     /* for reporting progress: */
@@ -5673,14 +5709,16 @@ threadRun(void *arg)
     /* loop till all clients have terminated */
     while (remains > 0)
     {
-        fd_set        input_mask;
-        int            maxsock;    /* max socket number to be waited for */
+        int            nsocks;        /* number of sockets to be waited for */
         int64        min_usec;
         int64        now_usec = 0;    /* set this only if needed */

-        /* identify which client sockets should be checked for input */
-        FD_ZERO(&input_mask);
-        maxsock = -1;
+        /*
+         * identify which client sockets should be checked for input, and
+         * compute the nearest time (if any) at which we need to wake up.
+         */
+        clear_socket_set(sockets);
+        nsocks = 0;
         min_usec = PG_INT64_MAX;
         for (i = 0; i < nstate; i++)
         {
@@ -5728,9 +5766,7 @@ threadRun(void *arg)
                     goto done;
                 }

-                FD_SET(sock, &input_mask);
-                if (maxsock < sock)
-                    maxsock = sock;
+                add_socket_to_set(sockets, sock, nsocks++);
             }
             else if (st->state != CSTATE_ABORTED &&
                      st->state != CSTATE_FINISHED)
@@ -5764,35 +5800,29 @@ threadRun(void *arg)

         /*
          * If no clients are ready to execute actions, sleep until we receive
-         * data from the server, or a nap-time specified in the script ends,
-         * or it's time to print a progress report.  Update input_mask to show
-         * which client(s) received data.
+         * data on some client socket or the timeout (if any) elapses.
          */
         if (min_usec > 0)
         {
-            int            nsocks = 0; /* return from select(2) if called */
+            int            rc = 0;

             if (min_usec != PG_INT64_MAX)
             {
-                if (maxsock != -1)
+                if (nsocks > 0)
                 {
-                    struct timeval timeout;
-
-                    timeout.tv_sec = min_usec / 1000000;
-                    timeout.tv_usec = min_usec % 1000000;
-                    nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+                    rc = wait_on_socket_set(sockets, min_usec);
                 }
                 else            /* nothing active, simple sleep */
                 {
                     pg_usleep(min_usec);
                 }
             }
-            else                /* no explicit delay, select without timeout */
+            else                /* no explicit delay, wait without timeout */
             {
-                nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+                rc = wait_on_socket_set(sockets, 0);
             }

-            if (nsocks < 0)
+            if (rc < 0)
             {
                 if (errno == EINTR)
                 {
@@ -5800,19 +5830,20 @@ threadRun(void *arg)
                     continue;
                 }
                 /* must be something wrong */
-                fprintf(stderr, "select() failed: %s\n", strerror(errno));
+                fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno));
                 goto done;
             }
         }
         else
         {
-            /* min_usec == 0, i.e. something needs to be executed */
+            /* min_usec <= 0, i.e. something needs to be executed now */

-            /* If we didn't call select(), don't try to read any data */
-            FD_ZERO(&input_mask);
+            /* If we didn't wait, don't try to read any data */
+            clear_socket_set(sockets);
         }

         /* ok, advance the state machine of each connection */
+        nsocks = 0;
         for (i = 0; i < nstate; i++)
         {
             CState       *st = &state[i];
@@ -5829,7 +5860,7 @@ threadRun(void *arg)
                     goto done;
                 }

-                if (!FD_ISSET(sock, &input_mask))
+                if (!socket_has_input(sockets, sock, nsocks++))
                     continue;
             }
             else if (st->state == CSTATE_FINISHED ||
@@ -5967,6 +5998,7 @@ done:
         fclose(thread->logfile);
         thread->logfile = NULL;
     }
+    free_socket_set(sockets);
     return NULL;
 }

@@ -6025,8 +6057,185 @@ setalarm(int seconds)
     }
 }

+#endif                            /* WIN32 */
+
+
+/*
+ * These functions provide an abstraction layer that hides the syscall
+ * we use to wait for input on a set of sockets.
+ *
+ * Currently there are two implementations, based on ppoll(2) and select(2).
+ * ppoll() is preferred where available due to its typically higher ceiling
+ * on the number of usable sockets.  We do not use the more-widely-available
+ * poll(2) because it only offers millisecond timeout resolution, which could
+ * be problematic with high --rate settings.
+ *
+ * Function APIs:
+ *
+ * alloc_socket_set: allocate an empty socket set with room for up to
+ *        "count" sockets.
+ *
+ * free_socket_set: deallocate a socket set.
+ *
+ * clear_socket_set: reset a socket set to empty.
+ *
+ * add_socket_to_set: add socket with indicated FD to slot "idx" in the
+ *        socket set.  Slots must be filled in order, starting with 0.
+ *
+ * wait_on_socket_set: wait for input on any socket in set, or for timeout
+ *        to expire.  timeout is measured in microseconds; 0 means wait forever.
+ *        Returns result code of underlying syscall (>=0 if OK, else see errno).
+ *
+ * socket_has_input: after waiting, call this to see if given socket has
+ *        input.  fd and idx parameters should match some previous call to
+ *        add_socket_to_set.
+ *
+ * Note that wait_on_socket_set destructively modifies the state of the
+ * socket set.  After checking for input, caller must apply clear_socket_set
+ * and add_socket_to_set again before waiting again.
+ */
+
+#ifdef POLL_USING_PPOLL
+
+static socket_set *
+alloc_socket_set(int count)
+{
+    socket_set *sa;
+
+    sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) +
+                                   sizeof(struct pollfd) * count);
+    sa->maxfds = count;
+    sa->curfds = 0;
+    return sa;
+}
+
+static void
+free_socket_set(socket_set *sa)
+{
+    pg_free(sa);
+}
+
+static void
+clear_socket_set(socket_set *sa)
+{
+    sa->curfds = 0;
+}
+
+static void
+add_socket_to_set(socket_set *sa, int fd, int idx)
+{
+    Assert(idx < sa->maxfds && idx == sa->curfds);
+    sa->pollfds[idx].fd = fd;
+    sa->pollfds[idx].events = POLLIN;
+    sa->pollfds[idx].revents = 0;
+    sa->curfds++;
+}
+
+static int
+wait_on_socket_set(socket_set *sa, int64 usecs)
+{
+    if (usecs > 0)
+    {
+        struct timespec timeout;
+
+        timeout.tv_sec = usecs / 1000000;
+        timeout.tv_nsec = (usecs % 1000000) * 1000;
+        return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
+    }
+    else
+    {
+        return ppoll(sa->pollfds, sa->curfds, NULL, NULL);
+    }
+}
+
+static bool
+socket_has_input(socket_set *sa, int fd, int idx)
+{
+    /*
+     * In some cases, threadRun will apply clear_socket_set and then try to
+     * apply socket_has_input anyway with arguments that it used before that,
+     * or might've used before that except that it exited its setup loop
+     * early.  Hence, if the socket set is empty, silently return false
+     * regardless of the parameters.  If it's not empty, we can Assert that
+     * the parameters match a previous call.
+     */
+    if (sa->curfds == 0)
+        return false;
+
+    Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
+    return (sa->pollfds[idx].revents & POLLIN) != 0;
+}
+
+#endif                            /* POLL_USING_PPOLL */
+
+#ifdef POLL_USING_SELECT
+
+static socket_set *
+alloc_socket_set(int count)
+{
+    return (socket_set *) pg_malloc0(sizeof(socket_set));
+}
+
+static void
+free_socket_set(socket_set *sa)
+{
+    pg_free(sa);
+}
+
+static void
+clear_socket_set(socket_set *sa)
+{
+    FD_ZERO(&sa->fds);
+    sa->maxfd = -1;
+}
+
+static void
+add_socket_to_set(socket_set *sa, int fd, int idx)
+{
+    if (fd < 0 || fd >= FD_SETSIZE)
+    {
+        /*
+         * Doing a hard exit here is a bit grotty, but it doesn't seem worth
+         * complicating the API to make it less grotty.
+         */
+        fprintf(stderr, "too many client connections for select()\n");
+        exit(1);
+    }
+    FD_SET(fd, &sa->fds);
+    if (fd > sa->maxfd)
+        sa->maxfd = fd;
+}
+
+static int
+wait_on_socket_set(socket_set *sa, int64 usecs)
+{
+    if (usecs > 0)
+    {
+        struct timeval timeout;
+
+        timeout.tv_sec = usecs / 1000000;
+        timeout.tv_usec = usecs % 1000000;
+        return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
+    }
+    else
+    {
+        return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
+    }
+}
+
+static bool
+socket_has_input(socket_set *sa, int fd, int idx)
+{
+    return (FD_ISSET(fd, &sa->fds) != 0);
+}
+
+#endif                            /* POLL_USING_SELECT */
+
+
 /* partial pthread implementation for Windows */

+#ifdef WIN32
+
 typedef struct win32_pthread
 {
     HANDLE        handle;
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 4094e22..5d40796 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -443,6 +443,9 @@
 /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */
 #undef HAVE_PPC_LWARX_MUTEX_HINT

+/* Define to 1 if you have the `ppoll' function. */
+#undef HAVE_PPOLL
+
 /* Define to 1 if you have the `pstat' function. */
 #undef HAVE_PSTAT

diff --git a/src/include/pg_config.h.win32 b/src/include/pg_config.h.win32
index 6618b43..182698a 100644
--- a/src/include/pg_config.h.win32
+++ b/src/include/pg_config.h.win32
@@ -327,6 +327,9 @@
 /* Define to 1 if you have the `posix_fallocate' function. */
 /* #undef HAVE_POSIX_FALLOCATE */

+/* Define to 1 if you have the `ppoll' function. */
+/* #undef HAVE_PPOLL */
+
 /* Define to 1 if you have the `pstat' function. */
 /* #undef HAVE_PSTAT */

diff --git a/src/template/linux b/src/template/linux
index f820bf7..e392908 100644
--- a/src/template/linux
+++ b/src/template/linux
@@ -6,6 +6,7 @@ if test x"$PREFERRED_SEMAPHORES" = x"" ; then
 fi

 # Force _GNU_SOURCE on; plperl is broken with Perl 5.8.0 otherwise
+# This is also required for ppoll(2), and perhaps other things
 CPPFLAGS="$CPPFLAGS -D_GNU_SOURCE"

 # If --enable-profiling is specified, we need -DLINUX_PROFILE

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Chapman Flack
Дата:
Сообщение: Re: vary read_only in SPI calls? or poke at the on-entry snapshot?
Следующее
От: Michael Paquier
Дата:
Сообщение: Re: testing pg_dump against very old versions