Re: [PATCH] pgbench --throttle (submission 7 - with lag measurement)

From: Tatsuo Ishii
Subject: Re: [PATCH] pgbench --throttle (submission 7 - with lag measurement)
Date:
Msg-id: 20130718.140414.281967846374669830.t-ishii@sraoss.co.jp
In reply to: Re: [PATCH] pgbench --throttle (submission 7 - with lag measurement) (Tatsuo Ishii <ishii@postgresql.org>)
Responses: Re: [PATCH] pgbench --throttle (submission 7 - with lag measurement) (Fabien COELHO <fabien.coelho@mines-paristech.fr>)
List: pgsql-hackers
>> Sorry about that, with your clarification I see what you were trying
>> to explain now.  The code initializes the target time like this:
>> 
>> thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
>> 
>> And then each time a transaction fires, it advances the reference time
>> forward based on the expected rate:
>> 
>> thread->throttle_trigger += wait;
>> 
>> It does *not* reset thread->throttle_trigger based on when the
>> previous transaction ended / when the next transaction started.  If
>> the goal is 10us transaction times, it beats a steady drum saying the
>> transactions should come at 10us, 20us, 30us (on average--there's some
>> randomness in the goals).  It does not pay any attention to when the
>> previous transactions finished.
>> 
>> That means that if an early transaction takes an extra 1000us, every
>> transaction after that will also show as 1000us late--even if all of
>> them take 10us.  You expect that those later transactions will show 0
>> lag, since they took the right duration.  For that to happen,
>> thread->throttle_trigger would need to be re-initialized with the
>> current time at the end of each completed transaction.
> 
> Yes, that's exactly what I understand from the code.
> 
>> The lag computation was not the interesting part of this feature to
>> me.  As I said before, I considered it more of a debugging level thing
>> than a number people would analyze as much as you did.  I understand
>> why you don't like it though.  If the reference time was moved forward
>> to match the transaction end each time, I think that would give the
>> lag definition you're looking for.  That's fine to me too, if Fabien
>> doesn't have a good reason to reject the idea.  We would need to make
>> sure that doesn't break some part of the design too.
> 
> I would like to hear from Fabien about the issue too.

For your information, attached is a patch against git master head that
implements the lag in the way I proposed. With the patch, I get more
consistent numbers on Linux (and Mac OS X).
--
Tatsuo Ishii
SRA OSS, Inc. Japan
English: http://www.sraoss.co.jp/index_en.php
Japanese: http://www.sraoss.co.jp
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 2ad8f0b..57e62dc 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -137,6 +137,12 @@ int            unlogged_tables = 0;
 double        sample_rate = 0.0;
 
 /*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64        throttle_delay = 0;
+
+/*
  * tablespace selection
  */
 char       *tablespace = NULL;
@@ -202,11 +208,15 @@ typedef struct
     int            listen;            /* 0 indicates that an async query has been
                                  * sent */
     int            sleeping;        /* 1 indicates that the client is napping */
+    bool        throttling;     /* whether nap is for throttling */
     int64        until;            /* napping until (usec) */
+    int64        wait;            /* randomly generated delay (usec) */
     Variable   *variables;        /* array of variable definitions */
     int            nvariables;
     instr_time    txn_begin;        /* used for measuring transaction latencies */
+    instr_time  txn_begin_throttle;        /* tx start time used when transaction throttling enabled */
     instr_time    stmt_begin;        /* used for measuring statement latencies */
+    bool        is_throttled;    /* whether transaction throttling is done */
     int            use_file;        /* index in sql_files for this client */
     bool        prepared[MAX_FILES];
 } CState;
@@ -224,6 +234,9 @@ typedef struct
     instr_time *exec_elapsed;    /* time spent executing cmds (per Command) */
     int           *exec_count;        /* number of cmd executions (per Command) */
     unsigned short random_state[3];        /* separate randomness for each thread */
+    int64       throttle_trigger;     /* previous/next throttling (us) */
+    int64       throttle_lag;         /* total transaction lag behind throttling */
+    int64       throttle_lag_max;     /* max transaction lag */
 } TState;
 
 #define INVALID_THREAD        ((pthread_t) 0)
@@ -232,6 +245,8 @@ typedef struct
 {
     instr_time    conn_time;
     int            xacts;
+    int64       throttle_lag;
+    int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -356,6 +371,7 @@ usage(void)
            "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
            "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
            "  -r, --report-latencies   report average latency per command\n"
+           "  -R, --rate=SPEC          target rate in transactions per second\n"
            "  -s, --scale=NUM          report this scale factor in output\n"
            "  -S, --select-only        perform SELECT-only transactions\n"
            "  -t, --transactions       number of transactions each client runs "
@@ -898,19 +914,80 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
     PGresult   *res;
     Command   **commands;
+    bool        trans_needs_throttle = false;
 
 top:
     commands = sql_files[st->use_file];
 
+    /*
+     * Handle throttling once per transaction by sleeping.  It is simpler
+     * to do this here rather than at the end, because so much complicated
+     * logic happens below when statements finish.
+     */
+    if (throttle_delay && ! st->is_throttled)
+    {
+        /*
+         * Use inverse transform sampling to randomly generate a delay, such
+         * that the series of delays will approximate a Poisson distribution
+         * centered on the throttle_delay time.
+         *
+         * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
+         *
+         * If transactions are too slow or a given wait is shorter than
+         * a transaction, the next transaction will start right away.
+         */
+        int64 wait = (int64)
+            throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+        thread->throttle_trigger += wait;
+
+        st->until = thread->throttle_trigger;
+        st->wait = wait;
+        st->sleeping = 1;
+        st->throttling = true;
+        st->is_throttled = true;
+        if (debug)
+            fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+                    st->id, wait);
+
+    }
+
     if (st->sleeping)
     {                            /* are we sleeping? */
         instr_time    now;
+        int64 now_us;
+        int64 start_us;
 
         INSTR_TIME_SET_CURRENT(now);
-        if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+        now_us = INSTR_TIME_GET_MICROSEC(now);
+        if (st->until <= now_us)
+        {
             st->sleeping = 0;    /* Done sleeping, go ahead with next command */
+            start_us = INSTR_TIME_GET_MICROSEC(st->txn_begin_throttle);
+            if (start_us <= 0)
+                start_us = INSTR_TIME_GET_MICROSEC(thread->start_time);
+
+            if (st->throttling)
+            {
+                /* Measure lag of throttled transaction relative to target */
+                int64 lag = now_us - start_us - st->wait;
+
+                if (debug)
+                    fprintf(stderr, "stmt_begin: "INT64_FORMAT" now_us: "INT64_FORMAT" wait:"INT64_FORMAT" until:"INT64_FORMAT" lag:"INT64_FORMAT"\n", start_us, now_us, st->wait, st->until, lag);
+
+                thread->throttle_lag += lag;
+                if (lag > thread->throttle_lag_max)
+                    thread->throttle_lag_max = lag;
+                st->throttling = false;
+            }
+        }
         else
+        {
+            if (debug)
+                fprintf(stderr, "still sleeping\n");
             return true;        /* Still sleeping, nothing to do here */
+        }
     }
 
     if (st->listen)
@@ -1095,6 +1172,15 @@ top:
             st->state = 0;
             st->use_file = (int) getrand(thread, 0, num_files - 1);
             commands = sql_files[st->use_file];
+            st->is_throttled = false;
+            /*
+             * No transaction is underway anymore, which means there is nothing
+             * to listen to right now.  When throttling rate limits are active,
+             * a sleep will happen next, as the next transaction starts.  And
+             * then in any case the next SQL command will set listen back to 1.
+             */
+            st->listen = 0;
+            trans_needs_throttle = (throttle_delay>0);
         }
     }
 
@@ -1113,6 +1199,16 @@ top:
         INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
     }
 
+    /*
+     * This ensures that a throttling delay is inserted before proceeding
+     * with sql commands, after the first transaction. The first transaction
+     * throttling is performed when first entering doCustom.
+     */
+    if (trans_needs_throttle) {
+        trans_needs_throttle = false;
+        goto top;
+    }
+
     /* Record transaction start time if logging is enabled */
     if (logfile && st->state == 0)
         INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -1121,6 +1217,9 @@ top:
     if (is_latencies)
         INSTR_TIME_SET_CURRENT(st->stmt_begin);
 
+    if (throttle_delay)
+        INSTR_TIME_SET_CURRENT(st->txn_begin_throttle);
+
     if (commands[st->state]->type == SQL_COMMAND)
     {
         const Command *command = commands[st->state];
@@ -2017,7 +2116,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
              TState *threads, int nthreads,
-             instr_time total_time, instr_time conn_total_time)
+             instr_time total_time, instr_time conn_total_time,
+             int64 throttle_lag, int64 throttle_lag_max)
 {
     double        time_include,
                 tps_include,
@@ -2055,6 +2155,19 @@ printResults(int ttype, int normal_xacts, int nclients,
         printf("number of transactions actually processed: %d\n",
                normal_xacts);
     }
+
+    if (throttle_delay)
+    {
+        /*
+         * Report average transaction lag under rate limit throttling.  This
+         * is the delay between scheduled and actual start times for the
+         * transaction.  The measured lag may be caused by thread/client load,
+         * the database load, or the Poisson throttling process.
+         */
+        printf("average rate limit lag: %.3f ms (max %.3f ms)\n",
+               0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+    }
+
     printf("tps = %f (including connections establishing)\n", tps_include);
     printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2140,6 +2253,7 @@ main(int argc, char **argv)
         {"unlogged-tables", no_argument, &unlogged_tables, 1},
         {"sampling-rate", required_argument, NULL, 4},
         {"aggregate-interval", required_argument, NULL, 5},
+        {"rate", required_argument, NULL, 'R'},
         {NULL, 0, NULL, 0}
     };
 
@@ -2162,6 +2276,8 @@ main(int argc, char **argv)
     instr_time    total_time;
     instr_time    conn_total_time;
     int            total_xacts;
+    int64       throttle_lag = 0;
+    int64       throttle_lag_max = 0;
 
     int            i;
 
@@ -2206,7 +2322,7 @@ main(int argc, char **argv)
     state = (CState *) pg_malloc(sizeof(CState));
     memset(state, 0, sizeof(CState));
 
-    while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1)
+    while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
     {
         switch (c)
         {
@@ -2371,6 +2487,19 @@ main(int argc, char **argv)
                     exit(1);
                 }
                 break;
+            case 'R':
+            {
+                /* get a double from the beginning of option value */
+                double throttle_value = atof(optarg);
+                if (throttle_value <= 0.0)
+                {
+                    fprintf(stderr, "invalid rate limit: %s\n", optarg);
+                    exit(1);
+                }
+                /* Invert rate limit into a time offset */
+                throttle_delay = (int64) (1000000.0 / throttle_value);
+            }
+                break;
             case 0:
                 /* This covers long options which take no argument. */
                 break;
@@ -2408,6 +2537,9 @@ main(int argc, char **argv)
         }
     }
 
+    /* compute a per thread delay */
+    throttle_delay *= nthreads;
+
     if (argc > optind)
         dbName = argv[optind];
     else
@@ -2721,6 +2853,9 @@ main(int argc, char **argv)
             TResult    *r = (TResult *) ret;
 
             total_xacts += r->xacts;
+            throttle_lag += r->throttle_lag;
+            if (r->throttle_lag_max > throttle_lag_max)
+                throttle_lag_max = r->throttle_lag_max;
             INSTR_TIME_ADD(conn_total_time, r->conn_time);
             free(ret);
         }
@@ -2731,7 +2866,7 @@ main(int argc, char **argv)
     INSTR_TIME_SET_CURRENT(total_time);
     INSTR_TIME_SUBTRACT(total_time, start_time);
     printResults(ttype, total_xacts, nclients, threads, nthreads,
-                 total_time, conn_total_time);
+                 total_time, conn_total_time, throttle_lag, throttle_lag_max);
     return 0;
 }
 
@@ -2756,6 +2891,17 @@ threadRun(void *arg)
 
     AggVals        aggs;
 
+    /*
+     * Initialize throttling rate target for all of the thread's clients.  It
+     * might be a little more accurate to reset thread->start_time here too.
+     * The possible drift seems too small relative to typical throttle delay
+     * times to worry about it.
+     */
+    INSTR_TIME_SET_CURRENT(start);
+    thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+    thread->throttle_lag = 0;
+    thread->throttle_lag_max = 0;
+
     result = pg_malloc(sizeof(TResult));
     INSTR_TIME_SET_ZERO(result->conn_time);
 
@@ -2831,25 +2977,38 @@ threadRun(void *arg)
             Command   **commands = sql_files[st->use_file];
             int            sock;
 
-            if (st->sleeping)
+            if (st->con == NULL)
             {
-                int            this_usec;
-
-                if (min_usec == INT64_MAX)
+                continue;
+            }
+            else if (st->sleeping)
+            {
+                if (st->throttling && timer_exceeded)
                 {
-                    instr_time    now;
-
-                    INSTR_TIME_SET_CURRENT(now);
-                    now_usec = INSTR_TIME_GET_MICROSEC(now);
+                    /* interrupt client which has not started a transaction */
+                    remains--;
+                    st->sleeping = 0;
+                    st->throttling = false;
+                    PQfinish(st->con);
+                    st->con = NULL;
+                    continue;
                 }
+                else /* just a nap from the script */
+                {
+                    int            this_usec;
 
-                this_usec = st->until - now_usec;
-                if (min_usec > this_usec)
-                    min_usec = this_usec;
-            }
-            else if (st->con == NULL)
-            {
-                continue;
+                    if (min_usec == INT64_MAX)
+                    {
+                        instr_time    now;
+
+                        INSTR_TIME_SET_CURRENT(now);
+                        now_usec = INSTR_TIME_GET_MICROSEC(now);
+                    }
+
+                    this_usec = st->until - now_usec;
+                    if (min_usec > this_usec)
+                        min_usec = this_usec;
+                }
             }
             else if (commands[st->state]->type == META_COMMAND)
             {
@@ -2986,6 +3145,8 @@ done:
     result->xacts = 0;
     for (i = 0; i < nstate; i++)
         result->xacts += state[i].cnt;
+    result->throttle_lag = thread->throttle_lag;
+    result->throttle_lag_max = thread->throttle_lag_max;
     INSTR_TIME_SET_CURRENT(end);
     INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
     if (logfile)
