Hi all,
We have enhanced pgbench so that it accepts a series of SQL commands
from a file (see the attached patch against 8.0.3). This makes it
possible to test various sets of SQL commands. The file may also
contain "meta commands". Currently the only meta command is
"\setrandom", which stores a random number within a given range in a
variable. For example,
\setrandom aid 1 100000
will set variable "aid" to a random number between 1 and 100000.
A variable can be referred to in an SQL command by putting ":" in front
of the variable name.
Here is an example SQL command file.
\setrandom aid 1 100000
\setrandom bid 1 1
\setrandom tid 1 10
\setrandom delta 1 10000
BEGIN
UPDATE accounts SET abalance = abalance + :delta WHERE aid = :aid
SELECT abalance FROM accounts WHERE aid = :aid
UPDATE tellers SET tbalance = tbalance + :delta WHERE tid = :tid
UPDATE branches SET bbalance = bbalance + :delta WHERE bid = :bid
INSERT INTO history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, 'now')
END
This will execute virtually the same SQL commands as pgbench's built-in transaction.
To use the SQL command file, you can use "-f" option:
pgbench -f /foo/bar/sqlfile
I think the enhanced pgbench is quite useful and I would like to
include it in 8.1. Or should I keep it for 8.2?
--
SRA OSS, Inc. Japan
Tatsuo Ishii
*** pgbench/pgbench.c 2004-11-09 15:09:31.000000000 +0900
--- pgbench-new/pgbench.c 2005-09-27 14:31:34.000000000 +0900
***************
*** 41,46 ****
--- 41,49 ---- #include <sys/resource.h> #endif /* ! WIN32 */
+ #include <ctype.h>
+ #include <search.h>
+ extern char *optarg; extern int optind;
***************
*** 72,77 ****
--- 75,83 ---- #define ntellers 10 #define naccounts 100000
+ #define SQL_COMMAND 1 /* Command.type: script line holding an SQL statement */
+ #define META_COMMAND 2 /* Command.type: script line that starts with a backslash */
+ FILE *LOGFILE = NULL; bool use_log; /* log transaction latencies to a file */
***************
*** 91,96 ****
--- 97,108 ---- typedef struct {
+ char *name;
+ char *value;
+ } Variable;
+
+ typedef struct
+ { PGconn *con; /* connection handle to DB */ int id; /* client No.
*/ int state; /* state No. */
***************
*** 103,115 **** int tid; /* teller id for this transaction */ int delta;
int abalance; struct timeval txn_begin; /* used for measuring latencies */ } CState; static void
usage(void){
! fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s
scaling_factor][-n][-C][-v][-S][-N][-l][-Ulogin][-P password][-d][dbname]\n"); fprintf(stderr, "(initialize mode):
pgbench-i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"); }
--- 115,137 ---- int tid; /* teller id for this transaction */ int delta;
int abalance;
+ void *variables; struct timeval txn_begin; /* used for measuring latencies */ } CState;
+ typedef struct /* one parsed line of a custom script file */
+ {
+ int type; /* SQL_COMMAND or META_COMMAND */
+ int argc; /* number of entries in argv */
+ char **argv; /* SQL_COMMAND: argv[0] is the statement text; META_COMMAND: the whitespace-separated words after the backslash */
+ } Command;
+
+ Command **commands = NULL; /* NULL-terminated array of script commands, filled in by process_file() */
+ static void usage(void) {
! fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s
scaling_factor][-n][-C][-v][-S][-N][-ffilename][-l][-U login][-P password][-d][dbname]\n"); fprintf(stderr,
"(initializemode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n"); }
***************
*** 190,195 ****
--- 212,326 ---- return (0); /* OK */ }
+ /*
+  * Ordering callback for tsearch()/tfind(): compares two Variable entries
+  * by their name strings, with strcmp() semantics for the result.
+  */
+ static int
+ compareVariables(const void *v1, const void *v2)
+ {
+     const Variable *lhs = v1;
+     const Variable *rhs = v2;
+
+     return strcmp(lhs->name, rhs->name);
+ }
+
+ /*
+  * Look up the variable called "name" in this client's variable tree.
+  *
+  * Returns the stored value string (it stays owned by the tree entry), or
+  * NULL when no variable of that name has been set.
+  */
+ static char *
+ getVariable(CState * st, char *name)
+ {
+     Variable probe = { name };
+     Variable **hit;
+
+     /* tfind() hands back a pointer to the tree slot holding our Variable */
+     hit = tfind(&probe, &st->variables, compareVariables);
+     if (hit == NULL)
+         return NULL;
+
+     return (*hit)->value;
+ }
+
+ /*
+  * Set variable "name" to a private copy of "value" for this client,
+  * creating the variable if it does not exist yet.
+  *
+  * Returns true on success and false when memory runs out.  On failure an
+  * already existing variable keeps its previous value.
+  */
+ static int
+ putVariable(CState * st, char *name, char *value)
+ {
+     Variable key = { name };
+     Variable **found;
+
+     found = tfind(&key, &st->variables, compareVariables);
+     if (found == NULL)
+     {
+         Variable *var;
+
+         if ((var = malloc(sizeof(Variable))) == NULL)
+             return false;
+
+         var->name = NULL;
+         var->value = NULL;
+
+         if ((var->name = strdup(name)) == NULL
+             || (var->value = strdup(value)) == NULL
+             || tsearch(var, &st->variables, compareVariables) == NULL)
+         {
+             free(var->name);
+             free(var->value);
+             free(var);
+             return false;
+         }
+     }
+     else
+     {
+         /*
+          * Duplicate before releasing the old string: the old value then
+          * survives a failed strdup(), and "value" may safely alias it.
+          */
+         char *copy = strdup(value);
+
+         if (copy == NULL)
+             return false;
+
+         free((*found)->value);
+         (*found)->value = copy;
+     }
+
+     return true;
+ }
+
+ /*
+  * Replace every ":name" reference in "sql" with the value of the client
+  * variable "name".  A name is a run of alphanumerics and underscores;
+  * a lone ':' and references to variables that are not set are left as is.
+  * Substituted values are not rescanned.
+  *
+  * "sql" must be a malloc'ed string.  It may be reallocated, so the caller
+  * must continue with the returned pointer.  Returns NULL when memory runs
+  * out; "sql" has been freed in that case.
+  */
+ static char *
+ assignVariables(CState * st, char *sql)
+ {
+     int i, j;
+     char *p, *name, *val;
+     void *tmp;
+
+     i = 0;
+     while ((p = strchr(&sql[i], ':')) != NULL)
+     {
+         size_t reflen;      /* length of ":name" in sql */
+         size_t vallen;      /* length of the replacement text */
+
+         /* j = index of ':', i = index just past the name */
+         i = j = p - sql;
+         do
+             i++;
+         while (isalnum((unsigned char) sql[i]) != 0 || sql[i] == '_');
+         if (i == j + 1)
+             continue;       /* ':' not followed by a name */
+
+         if ((name = strndup(&sql[j + 1], i - (j + 1))) == NULL)
+         {
+             free(sql);      /* keep the "freed on failure" contract */
+             return NULL;
+         }
+         val = getVariable(st, name);
+         free(name);
+         if (val == NULL)
+             continue;       /* unknown variable: leave text untouched */
+
+         reflen = (size_t) (i - j);
+         vallen = strlen(val);
+
+         /* grow first if the value is longer than the reference */
+         if (vallen > reflen)
+         {
+             tmp = realloc(sql, strlen(sql) - reflen + vallen + 1);
+             if (tmp == NULL)
+             {
+                 free(sql);
+                 return NULL;
+             }
+             sql = tmp;
+         }
+
+         /* slide the tail (including the terminator) into place */
+         if (vallen != reflen)
+             memmove(&sql[j + vallen], &sql[i], strlen(&sql[i]) + 1);
+
+         memcpy(&sql[j], val, vallen);
+
+         /* give back unused space; a failed shrink leaves sql valid */
+         if (vallen < reflen)
+         {
+             tmp = realloc(sql, strlen(sql) + 1);
+             if (tmp != NULL)
+                 sql = tmp;
+         }
+
+         i = j + (int) vallen;
+     }
+
+     return sql;
+ }
+ /* process a transaction */ static void doOne(CState * state, int n, int debug, int ttype)
***************
*** 465,470 ****
--- 596,765 ---- } }
+ /*
+  * Advance client n through the custom script held in "commands".
+  *
+  * When the client is marked as listening, the outcome of the current
+  * command is collected first: for an SQL command the result is consumed
+  * and checked, the transaction time is logged once the last command of
+  * the script has completed (if use_log is set), and the state counter
+  * moves on, wrapping to 0 at the end of the script.  Then the command at
+  * the (new) current state is issued: an SQL command is sent with
+  * PQsendQuery() after ":name" substitution, and \setrandom stores a
+  * random number in a client variable.
+  */
+ static void
+ doCustom(CState * state, int n, int debug)
+ {
+     PGresult *res;
+     CState *st = &state[n];
+
+     if (st->listen)
+     {                       /* are we receiver? */
+         if (commands[st->state]->type == SQL_COMMAND)
+         {
+             if (debug)
+                 fprintf(stderr, "client %d receiving\n", n);
+             if (!PQconsumeInput(st->con))
+             {               /* there's something wrong */
+                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
+                 remains--;  /* I've aborted */
+                 PQfinish(st->con);
+                 st->con = NULL;
+                 return;
+             }
+             if (PQisBusy(st->con))
+                 return;     /* don't have the whole result yet */
+         }
+
+         /*
+          * transaction finished: record the time it took in the
+          * log
+          */
+         if (use_log && commands[st->state + 1] == NULL)
+         {
+             double diff;
+             struct timeval now;
+
+             gettimeofday(&now, NULL);
+             diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
+                 (int) (now.tv_usec - st->txn_begin.tv_usec);
+
+             fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
+         }
+
+         if (commands[st->state]->type == SQL_COMMAND)
+         {
+             res = PQgetResult(st->con);
+             /* statements starting with "select" must return tuples */
+             if (strncasecmp(commands[st->state]->argv[0], "select", 6) != 0)
+             {
+                 if (check(state, res, n, PGRES_COMMAND_OK))
+                     return;
+             }
+             else
+             {
+                 if (check(state, res, n, PGRES_TUPLES_OK))
+                     return;
+             }
+             PQclear(res);
+             discard_response(st);
+         }
+
+         /* last command of the script: one transaction is complete */
+         if (commands[st->state + 1] == NULL)
+         {
+             if (is_connect)
+             {
+                 PQfinish(st->con);
+                 st->con = NULL;
+             }
+
+             if (++st->cnt >= nxacts)
+             {
+                 remains--;  /* I'm done */
+                 if (st->con != NULL)
+                 {
+                     PQfinish(st->con);
+                     st->con = NULL;
+                 }
+                 return;
+             }
+         }
+
+         /* increment state counter */
+         st->state++;
+         if (commands[st->state] == NULL)
+             st->state = 0;
+     }
+
+     if (st->con == NULL)
+     {
+         if ((st->con = doConnect()) == NULL)
+         {
+             fprintf(stderr, "Client %d aborted in establishing connection.\n",
+                     n);
+             remains--;      /* I've aborted */
+             PQfinish(st->con);
+             st->con = NULL;
+             return;
+         }
+     }
+
+     if (use_log && st->state == 0)
+         gettimeofday(&(st->txn_begin), NULL);
+
+     if (commands[st->state]->type == SQL_COMMAND)
+     {
+         char *sql;
+
+         /* substitute into a private copy; the script text stays intact */
+         if ((sql = strdup(commands[st->state]->argv[0])) == NULL
+             || (sql = assignVariables(st, sql)) == NULL)
+         {
+             fprintf(stderr, "out of memory\n");
+             st->ecnt++;
+             return;
+         }
+
+         if (debug)
+             fprintf(stderr, "client %d sending %s\n", n, sql);
+         if (PQsendQuery(st->con, sql) == 0)
+         {
+             if (debug)
+                 fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
+             st->ecnt++;
+         }
+         else
+         {
+             st->listen++;   /* flags that should be listened */
+         }
+
+         /*
+          * The query text is not needed once PQsendQuery() has returned;
+          * release our copy so it does not leak on every command.
+          */
+         free(sql);
+     }
+     else if (commands[st->state]->type == META_COMMAND)
+     {
+         int argc = commands[st->state]->argc, i;
+         char **argv = commands[st->state]->argv;
+
+         if (debug)
+         {
+             fprintf(stderr, "client %d executing \\%s", n, argv[0]);
+             for (i = 1; i < argc; i++)
+                 fprintf(stderr, " %s", argv[i]);
+             fprintf(stderr, "\n");
+         }
+
+         if (strcasecmp(argv[0], "setrandom") == 0)
+         {
+             char *val;
+
+             /*
+              * Sized after the text of the upper bound; this presumes
+              * getrand() never returns more than that bound -- confirm
+              * against getrand().
+              */
+             if ((val = malloc(strlen(argv[3]) + 1)) == NULL)
+             {
+                 fprintf(stderr, "%s: out of memory\n", argv[0]);
+                 st->ecnt++;
+                 return;
+             }
+
+             sprintf(val, "%d", getrand(atoi(argv[2]), atoi(argv[3])));
+
+             if (putVariable(st, argv[1], val) == false)
+             {
+                 fprintf(stderr, "%s: out of memory\n", argv[0]);
+                 free(val);
+                 st->ecnt++;
+                 return;
+             }
+
+             free(val);
+             st->listen++;   /* lets the next call advance past this command */
+         }
+     }
+ }
+ /* discard connections */ static void disconnect_all(CState * state)
***************
*** 644,649 ****
--- 939,1098 ---- PQfinish(con); }
+ /*
+  * Read a custom script from "filename" ("-" means stdin) into the global
+  * "commands" array and terminate the array with a NULL entry.
+  *
+  * Blank lines and lines starting with "--" are skipped.  A line starting
+  * with a backslash is split into words and stored as a META_COMMAND; only
+  * \setrandom is accepted, and its bounds are validated here.  Any other
+  * line is stored verbatim as an SQL_COMMAND.
+  *
+  * Returns true on success.  On any error (unreadable file, bad meta
+  * command, empty script, out of memory) a message is written to stderr,
+  * everything allocated so far is released, "commands" is reset to NULL
+  * and false is returned.
+  */
+ static int
+ process_file(char *filename)
+ {
+     const char delim[] = " \f\n\r\t\v";
+
+     FILE *fd;
+     int lineno, i, j;
+     char buf[BUFSIZ], *p, *tok;
+     void *tmp;
+
+     if (strcmp(filename, "-") == 0)
+         fd = stdin;
+     else if ((fd = fopen(filename, "r")) == NULL)
+     {
+         fprintf(stderr, "%s: %s\n", strerror(errno), filename);
+         return false;
+     }
+
+     fprintf(stderr, "processing file...\n");
+
+     lineno = 1;
+     i = 0;
+     while (fgets(buf, sizeof(buf), fd) != NULL)
+     {
+         /*
+          * The error path reports "out of memory" based on errno, so
+          * start each line with a clean slate.
+          */
+         errno = 0;
+
+         if ((p = strchr(buf, '\n')) != NULL)
+             *p = '\0';
+         p = buf;
+         while (isspace((unsigned char) *p))
+             p++;
+         if (*p == '\0' || strncmp(p, "--", 2) == 0)
+         {
+             lineno++;
+             continue;
+         }
+
+         if ((tmp = realloc(commands, sizeof(Command *) * (i + 1))) == NULL)
+         {
+             i--;            /* slot i was never created */
+             goto error;
+         }
+         commands = tmp;
+
+         if ((commands[i] = malloc(sizeof(Command))) == NULL)
+             goto error;
+
+         commands[i]->argv = NULL;
+         commands[i]->argc = 0;
+
+         if (*p == '\\')
+         {
+             commands[i]->type = META_COMMAND;
+
+             j = 0;
+             tok = strtok(++p, delim);
+             while (tok != NULL)
+             {
+                 tmp = realloc(commands[i]->argv, sizeof(char *) * (j + 1));
+                 if (tmp == NULL)
+                     goto error;
+                 commands[i]->argv = tmp;
+
+                 if ((commands[i]->argv[j] = strdup(tok)) == NULL)
+                     goto error;
+
+                 commands[i]->argc++;
+
+                 j++;
+                 tok = strtok(NULL, delim);
+             }
+
+             /* a bare backslash yields no words: argv would be NULL below */
+             if (commands[i]->argc == 0)
+             {
+                 fprintf(stderr, "%s: %d: missing command after backslash\n", filename, lineno);
+                 goto error;
+             }
+
+             if (strcasecmp(commands[i]->argv[0], "setrandom") == 0)
+             {
+                 int min, max;
+
+                 if (commands[i]->argc < 4)
+                 {
+                     fprintf(stderr, "%s: %d: \\%s: missing argument\n", filename, lineno, commands[i]->argv[0]);
+                     goto error;
+                 }
+
+                 for (j = 4; j < commands[i]->argc; j++)
+                     fprintf(stderr, "%s: %d: \\%s: extra argument \"%s\" ignored\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[j]);
+
+                 if ((min = atoi(commands[i]->argv[2])) < 0)
+                 {
+                     fprintf(stderr, "%s: %d: \\%s: invalid minimum number %s\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[2]);
+                     goto error;
+                 }
+
+                 if ((max = atoi(commands[i]->argv[3])) < min || max > RAND_MAX)
+                 {
+                     fprintf(stderr, "%s: %d: \\%s: invalid maximum number %s\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[3]);
+                     goto error;
+                 }
+             }
+             else
+             {
+                 fprintf(stderr, "%s: %d: invalid command \\%s\n", filename, lineno, commands[i]->argv[0]);
+                 goto error;
+             }
+         }
+         else
+         {
+             commands[i]->type = SQL_COMMAND;
+
+             if ((commands[i]->argv = malloc(sizeof(char *))) == NULL)
+                 goto error;
+
+             if ((commands[i]->argv[0] = strdup(p)) == NULL)
+                 goto error;
+
+             commands[i]->argc++;
+         }
+
+         i++;
+         lineno++;
+     }
+     fclose(fd);
+     fd = NULL;              /* keeps the error path from closing it again */
+
+     /*
+      * A script without any command cannot be run; nothing has been
+      * allocated at this point, so just report it.
+      */
+     if (i == 0)
+     {
+         fprintf(stderr, "%s: no commands found\n", filename);
+         return false;
+     }
+
+     errno = 0;
+     if ((tmp = realloc(commands, sizeof(Command *) * (i + 1))) == NULL)
+     {
+         i--;                /* the terminator slot does not exist */
+         goto error;
+     }
+     commands = tmp;
+
+     commands[i] = NULL;
+
+     return true;
+
+ error:
+     if (errno == ENOMEM)
+         fprintf(stderr, "%s: %d: out of memory\n", filename, lineno);
+
+     if (fd != NULL)
+         fclose(fd);
+
+     if (commands == NULL)
+         return false;
+
+     /* release commands[0..i]; the entry at i may be only partly built */
+     while (i >= 0)
+     {
+         if (commands[i] != NULL)
+         {
+             for (j = 0; j < commands[i]->argc; j++)
+                 free(commands[i]->argv[j]);
+
+             free(commands[i]->argv);
+             free(commands[i]);
+         }
+
+         i--;
+     }
+     free(commands);
+     commands = NULL;        /* don't leave the global dangling */
+
+     return false;
+ }
+ /* print out results */ static void printResults(
***************
*** 670,677 **** s = "TPC-B (sort of)"; else if (ttype == 2) s = "Update only accounts";
! else s = "SELECT only"; printf("transaction type: %s\n", s); printf("scaling factor: %d\n",
tps);
--- 1119,1128 ---- s = "TPC-B (sort of)"; else if (ttype == 2) s = "Update only accounts";
! else if (ttype == 1) s = "SELECT only";
+ else
+ s = "Custom query"; printf("transaction type: %s\n", s); printf("scaling factor: %d\n", tps);
***************
*** 695,700 ****
--- 1146,1152 ---- int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT
* only, 2: skip update of branches and * tellers */
+ char *filename = NULL; static CState *state; /* status of clients */
***************
*** 724,730 **** else if ((env = getenv("PGUSER")) != NULL && *env != '\0') login = env;
! while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1) { switch (c) {
--- 1176,1182 ---- else if ((env = getenv("PGUSER")) != NULL && *env != '\0') login = env;
! while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:")) != -1) { switch (c) {
***************
*** 806,811 ****
--- 1258,1267 ---- case 'l': use_log = true; break;
+ case 'f':
+ ttype = 3;
+ filename = optarg;
+ break; default: usage(); exit(1);
***************
*** 868,941 **** exit(1); }
! /*
! * get the scaling factor that should be same as count(*) from
! * branches...
! */
! res = PQexec(con, "select count(*) from branches");
! if (PQresultStatus(res) != PGRES_TUPLES_OK) {
! fprintf(stderr, "%s", PQerrorMessage(con));
! exit(1);
! }
! tps = atoi(PQgetvalue(res, 0, 0));
! if (tps < 0)
! {
! fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
! exit(1); }
! PQclear(res);
!
! if (!is_no_vacuum) {
! fprintf(stderr, "starting vacuum...");
! res = PQexec(con, "vacuum branches");
! if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, "%s",
PQerrorMessage(con)); exit(1); }
! PQclear(res);
!
! res = PQexec(con, "vacuum tellers");
! if (PQresultStatus(res) != PGRES_COMMAND_OK) {
! fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } PQclear(res);
! res = PQexec(con, "delete from history");
! if (PQresultStatus(res) != PGRES_COMMAND_OK) {
! fprintf(stderr, "%s", PQerrorMessage(con));
! exit(1);
! }
! PQclear(res);
! res = PQexec(con, "vacuum history");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! fprintf(stderr, "%s", PQerrorMessage(con));
! exit(1);
! }
! PQclear(res);
! fprintf(stderr, "end.\n");
! if (is_full_vacuum)
! {
! fprintf(stderr, "starting full vacuum...");
! res = PQexec(con, "vacuum analyze accounts"); if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); }
PQclear(res); fprintf(stderr, "end.\n"); } }
- PQfinish(con); /* set random seed */ gettimeofday(&tv1, NULL);
--- 1324,1406 ---- exit(1); }
! if (ttype == 3) {
! PQfinish(con);
! if (process_file(filename) == false)
! exit(1); }
! else {
! /*
! * get the scaling factor that should be same as count(*) from
! * branches...
! */
! res = PQexec(con, "select count(*) from branches");
! if (PQresultStatus(res) != PGRES_TUPLES_OK) { fprintf(stderr, "%s", PQerrorMessage(con));
exit(1); }
! tps = atoi(PQgetvalue(res, 0, 0));
! if (tps < 0) {
! fprintf(stderr, "count(*) from branches invalid (%d)\n", tps); exit(1); }
PQclear(res);
! if (!is_no_vacuum) {
! fprintf(stderr, "starting vacuum...");
! res = PQexec(con, "vacuum branches");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! fprintf(stderr, "%s", PQerrorMessage(con));
! exit(1);
! }
! PQclear(res);
! res = PQexec(con, "vacuum tellers");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! fprintf(stderr, "%s", PQerrorMessage(con));
! exit(1);
! }
! PQclear(res);
! res = PQexec(con, "delete from history");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! fprintf(stderr, "%s", PQerrorMessage(con));
! exit(1);
! }
! PQclear(res);
! res = PQexec(con, "vacuum history"); if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); }
PQclear(res);
+ fprintf(stderr, "end.\n");
+
+ if (is_full_vacuum)
+ {
+ fprintf(stderr, "starting full vacuum...");
+ res = PQexec(con, "vacuum analyze accounts");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, "%s", PQerrorMessage(con));
+ exit(1);
+ }
+ PQclear(res);
+ fprintf(stderr, "end.\n");
+ } }
+ PQfinish(con); } /* set random seed */ gettimeofday(&tv1, NULL);
***************
*** 965,970 ****
--- 1430,1437 ---- doOne(state, i, debug, ttype); else if (ttype == 1)
doSelectOnly(state,i, debug);
+ else if (ttype == 3)
+ doCustom(state, i, debug); } for (;;)
***************
*** 982,997 **** FD_ZERO(&input_mask);
! maxsock = 0; for (i = 0; i < nclients; i++) {
! if (state[i].con) { int sock = PQsocket(state[i].con);
if (sock < 0) {
- fprintf(stderr, "Client %d: PQsocket failed\n", i); disconnect_all(state);
exit(1); }
--- 1449,1464 ---- FD_ZERO(&input_mask);
! maxsock = -1; for (i = 0; i < nclients; i++) {
! if (state[i].con &&
! (ttype != 3 || commands[state[i].state]->type != META_COMMAND)) { int
sock = PQsocket(state[i].con); if (sock < 0) {
disconnect_all(state); exit(1); }
***************
*** 1001,1036 **** } }
! if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
! (fd_set *) NULL, (struct timeval *) NULL)) < 0) {
! if (errno == EINTR)
! continue;
! /* must be something wrong */
! disconnect_all(state);
! fprintf(stderr, "select failed: %s\n", strerror(errno));
! exit(1);
! }
! else if (nsocks == 0)
! { /* timeout */
! fprintf(stderr, "select timeout\n");
! for (i = 0; i < nclients; i++) {
! fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
! i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen); }
- exit(0); } /* ok, backend returns reply */ for (i = 0; i < nclients; i++)
{
! if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask)) { if
(ttype== 0 || ttype == 2) doOne(state, i, debug, ttype); else if (ttype == 1)
doSelectOnly(state, i, debug); } } }
--- 1468,1510 ---- } }
! if (maxsock != -1) {
! if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
! (fd_set *) NULL, (struct timeval *) NULL)) < 0) {
! if (errno == EINTR)
! continue;
! /* must be something wrong */
! disconnect_all(state);
! fprintf(stderr, "select failed: %s\n", strerror(errno));
! exit(1);
! }
! else if (nsocks == 0)
! { /* timeout */
! fprintf(stderr, "select timeout\n");
! for (i = 0; i < nclients; i++)
! {
! fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
! i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
! }
! exit(0); } } /* ok, backend returns reply */ for (i = 0; i <
nclients;i++) {
! if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
! || (ttype == 3
! && commands[state[i].state]->type == META_COMMAND))) {
if (ttype == 0 || ttype == 2) doOne(state, i, debug, ttype); else if (ttype == 1)
doSelectOnly(state, i, debug);
+ else if (ttype == 3)
+ doCustom(state, i, debug); } } }