diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 3df86d1..1f36ffb 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -15,6 +15,8 @@
 #include "postgres_fdw.h"
 
 #include "access/xact.h"
+#include "access/xtm.h"
+#include "access/transam.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "utils/hsearch.h"
@@ -61,11 +63,18 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/* distributed transaction (DTM) state of the current local transaction */
+typedef long long csn_t;
+static csn_t currentGlobalTransactionId = 0;	/* 0 until dtm_extend() ran */
+static int	currentLocalTransactionId = 0;	/* second part of '<pid>.<n>' */
+
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
+static void do_sql_send_command(PGconn *conn, const char *sql);
+static void do_sql_wait_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
 static void pgfdw_subxact_callback(SubXactEvent event,
@@ -357,6 +366,42 @@ do_sql_command(PGconn *conn, const char *sql)
 	PQclear(res);
 }
 
+/*
+ * Send a command to the remote server without waiting for the result.
+ *
+ * The result has to be collected afterwards with PQgetResult(), e.g. via
+ * do_sql_wait_command().
+ */
+static void
+do_sql_send_command(PGconn *conn, const char *sql)
+{
+	/* PQsendQuery() returns 1 on success and 0 on failure */
+	if (!PQsendQuery(conn, sql))
+	{
+		elog(WARNING, "Failed to send command %s", sql);
+		/* nothing was dispatched, so there is no PGresult to report */
+		pgfdw_report_error(ERROR, NULL, conn, false, sql);
+	}
+}
+
+/*
+ * Collect all pending results of a command that was sent earlier.
+ *
+ * Raises an error unless every result has status PGRES_COMMAND_OK.
+ */
+static void
+do_sql_wait_command(PGconn *conn, const char *sql)
+{
+	PGresult   *res;
+
+	while ((res = PQgetResult(conn)) != NULL)
+	{
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(ERROR, res, conn, true, sql);
+		PQclear(res);
+	}
+}
+
 /*
  * Start remote transaction or subtransaction, if needed.
  *
@@ -375,17 +420,69 @@ begin_remote_xact(ConnCacheEntry *entry)
 	/* Start main transaction if we haven't yet */
 	if (entry->xact_depth <= 0)
 	{
+		TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
 		const char *sql;
 
 		elog(DEBUG3, "starting remote transaction on connection %p",
 			 entry->conn);
 
+		/* hand a valid global xid to the remote via dtm_join_transaction() */
+		if (TransactionIdIsValid(gxid))
+		{
+			char		stmt[64];
+			PGresult   *res;
+
+			/* TransactionId is unsigned, so print it with %u */
+			snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%u)", gxid);
+			res = PQexec(entry->conn, stmt);
+			/* surface a failure instead of silently dropping it */
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+				pgfdw_report_error(WARNING, res, entry->conn, false, stmt);
+			PQclear(res);
+		}
+
 		if (IsolationIsSerializable())
 			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
 		else
 			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
+		if (UseTsDtmTransactions)
+		{
+			PGresult   *res;
+			char	   *stmt;
+
+			if (!currentGlobalTransactionId)
+			{
+				char	   *resp;
+				csn_t		gtid = 0;
+
+				/* first connection of this transaction: obtain the global id */
+				stmt = psprintf("SELECT public.dtm_extend('%d.%d')",
+								MyProcPid, ++currentLocalTransactionId);
+				res = PQexec(entry->conn, stmt);
+				if (PQresultStatus(res) != PGRES_TUPLES_OK)
+					pgfdw_report_error(ERROR, res, entry->conn, true, stmt);
+				resp = PQgetvalue(res, 0, 0);
+				if (resp == NULL || (*resp) == '\0' ||
+					sscanf(resp, "%lld", &gtid) != 1)
+					pgfdw_report_error(ERROR, res, entry->conn, true, stmt);
+				currentGlobalTransactionId = gtid;
+				PQclear(res);
+			}
+			else
+			{
+				/* further connections: pass on the id obtained above */
+				stmt = psprintf("SELECT public.dtm_access(%lld, '%d.%d')",
+								currentGlobalTransactionId,
+								MyProcPid, currentLocalTransactionId);
+				res = PQexec(entry->conn, stmt);
+				if (PQresultStatus(res) != PGRES_TUPLES_OK)
+					pgfdw_report_error(ERROR, res, entry->conn, true, stmt);
+				PQclear(res);
+			}
+			pfree(stmt);
+		}
 	}
 
 	/*
@@ -511,6 +608,105 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 		PQclear(res);
 }
 
+/* Per-result check used by RunDtmStatement(); returns false to signal failure */
+typedef bool (*DtmCommandResultHandler) (PGresult *result, void *arg);
+
+/*
+ * Run "sql" on every cached connection that has an open remote transaction.
+ *
+ * The statement is first sent to all of those connections and the results
+ * are collected only afterwards.  A result is accepted when its status is
+ * expectedStatus and, if a handler is given, handler(result, arg) is true.
+ *
+ * Returns true if all connections succeeded.  A failed result is reported at
+ * WARNING level, so the remaining connections still get drained and the
+ * caller can react to the false return value.
+ */
+static bool
+RunDtmStatement(char const * sql, unsigned expectedStatus, DtmCommandResultHandler handler, void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	bool		allOk = true;
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0)
+		{
+			do_sql_send_command(entry->conn, sql);
+		}
+	}
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0)
+		{
+			PGresult   *result = PQgetResult(entry->conn);
+
+			if (PQresultStatus(result) != expectedStatus || (handler && !handler(result, arg)))
+			{
+				elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+
+				/*
+				 * An ERROR-level report would not return, leaving the
+				 * false return value (and the caller's rollback) unreachable.
+				 * Warn instead and keep ownership of the result.
+				 */
+				pgfdw_report_error(WARNING, result, entry->conn, false, sql);
+				allOk = false;
+			}
+			PQclear(result);
+
+			/* drain and free whatever else is pending until libpq says NULL */
+			while ((result = PQgetResult(entry->conn)) != NULL)
+				PQclear(result);
+		}
+	}
+	return allOk;
+}
+
+/* Run a statement that yields PGRES_COMMAND_OK on all open connections */
+static bool
+RunDtmCommand(char const * sql)
+{
+	return RunDtmStatement(sql, PGRES_COMMAND_OK, NULL, NULL);
+}
+
+/* Run a statement that yields PGRES_TUPLES_OK on all open connections */
+static bool
+RunDtmFunction(char const * sql)
+{
+	return RunDtmStatement(sql, PGRES_TUPLES_OK, NULL, NULL);
+}
+
+/*
+ * DtmCommandResultHandler: parse column 0 of row 0 as a CSN and raise the
+ * csn_t that "arg" points to if the parsed value is larger.  Returns false
+ * if the value is missing or cannot be parsed.
+ */
+static bool
+DtmMaxCSN(PGresult *result, void *arg)
+{
+	char	   *resp = PQgetvalue(result, 0, 0);
+	csn_t	   *maxCSN = (csn_t *) arg;
+	csn_t		csn = 0;
+
+	if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &csn) != 1)
+	{
+		return false;
+	}
+	else
+	{
+		if (*maxCSN < csn)
+		{
+			*maxCSN = csn;
+		}
+		return true;
+	}
+}
+
 /*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
@@ -524,6 +720,45 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
 		return;
 
+	/*
+	 * If a DTM transaction is active, pre-commit runs the prepare/commit
+	 * sequence below on all participating connections and returns, bypassing
+	 * the regular COMMIT TRANSACTION handling further down.
+	 */
+	if (currentGlobalTransactionId != 0)
+	{
+		switch (event)
+		{
+			case XACT_EVENT_PARALLEL_PRE_COMMIT:
+			case XACT_EVENT_PRE_COMMIT:
+				{
+					csn_t		maxCSN = 0;
+
+					if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
+									  MyProcPid, currentLocalTransactionId)) ||
+						!RunDtmFunction(psprintf("SELECT public.dtm_begin_prepare('%d.%d')",
+									   MyProcPid, currentLocalTransactionId)) ||
+						!RunDtmStatement(psprintf("SELECT public.dtm_prepare('%d.%d',0)",
+									  MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
+						!RunDtmFunction(psprintf("SELECT public.dtm_end_prepare('%d.%d',%lld)",
+									  MyProcPid, currentLocalTransactionId, maxCSN)) ||
+						!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
+									  MyProcPid, currentLocalTransactionId)))
+					{
+						RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
+								   MyProcPid, currentLocalTransactionId));
+						ereport(ERROR,
+								(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+								 errmsg("transaction was aborted at one of the shards")));
+						break;
+					}
+					return;
+				}
+			default:
+				break;
+		}
+	}
+
 	/*
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -540,15 +721,42 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		/* If it has an open remote transaction, try to close it */
 		if (entry->xact_depth > 0)
 		{
-			elog(DEBUG3, "closing remote transaction on connection %p",
-				 entry->conn);
+			elog(DEBUG3, "closing remote transaction on connection %p event %d",
+				 entry->conn, event);
 
 			switch (event)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
 					/* Commit all remote transactions during pre-commit */
-					do_sql_command(entry->conn, "COMMIT TRANSACTION");
+					do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
+					/* the result is collected in the COMMIT event below */
+					continue;
+
+				case XACT_EVENT_PRE_PREPARE:
+
+					/*
+					 * We disallow remote transactions that modified anything,
+					 * since it's not very reasonable to hold them open until
+					 * the prepared transaction is committed.  For the moment,
+					 * throw error unconditionally; later we might allow
+					 * read-only cases.  Note that the error will cause us to
+					 * come right back here with event == XACT_EVENT_ABORT, so
+					 * we'll clean up the connection state at that point.
+					 */
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("cannot prepare a transaction that modified remote tables")));
+					break;
+
+				case XACT_EVENT_PARALLEL_COMMIT:
+				case XACT_EVENT_COMMIT:
+				case XACT_EVENT_PREPARE:
+					/* collect the COMMIT sent during pre-commit (non-DTM case) */
+					if (!currentGlobalTransactionId)
+					{
+						do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
+					}
 
 					/*
 					 * If there were any errors in subtransactions, and we
@@ -573,27 +781,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					entry->have_prep_stmt = false;
 					entry->have_error = false;
 					break;
-				case XACT_EVENT_PRE_PREPARE:
 
-					/*
-					 * We disallow remote transactions that modified anything,
-					 * since it's not very reasonable to hold them open until
-					 * the prepared transaction is committed.  For the moment,
-					 * throw error unconditionally; later we might allow
-					 * read-only cases.  Note that the error will cause us to
-					 * come right back here with event == XACT_EVENT_ABORT, so
-					 * we'll clean up the connection state at that point.
-					 */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							 errmsg("cannot prepare a transaction that modified remote tables")));
-					break;
-				case XACT_EVENT_PARALLEL_COMMIT:
-				case XACT_EVENT_COMMIT:
-				case XACT_EVENT_PREPARE:
-					/* Pre-commit should have closed the open transaction */
-					elog(ERROR, "missed cleaning up connection during pre-commit");
-					break;
 				case XACT_EVENT_PARALLEL_ABORT:
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
@@ -617,6 +805,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 						entry->have_error = false;
 					}
 					break;
+
+				case XACT_EVENT_START:
+				case XACT_EVENT_ABORT_PREPARED:
+				case XACT_EVENT_COMMIT_PREPARED:
+					break;
 			}
 		}
 
@@ -630,21 +823,27 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 		if (PQstatus(entry->conn) != CONNECTION_OK ||
 			PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
 		{
-			elog(DEBUG3, "discarding connection %p", entry->conn);
+			elog(WARNING, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
 			PQfinish(entry->conn);
 			entry->conn = NULL;
 		}
 	}
+	if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT)
+	{
+		/*
+		 * Outside of pre-commit we can now mark ourselves as out of the
+		 * transaction.  At pre-commit the state must survive, because the
+		 * COMMIT sent above is only waited for in the following COMMIT
+		 * event.
+		 */
+		xact_got_connection = false;
 
-	/*
-	 * Regardless of the event type, we can now mark ourselves as out of the
-	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
-	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
-	 */
-	xact_got_connection = false;
+		/* Also reset cursor numbering for next transaction */
+		cursor_number = 0;
 
-	/* Also reset cursor numbering for next transaction */
-	cursor_number = 0;
+		/* ... and forget the DTM transaction id */
+		currentGlobalTransactionId = 0;
+	}
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
index a0f0fc1..0ce8f0e 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
 CREATE FOREIGN DATA WRAPPER postgres_fdw
   HANDLER postgres_fdw_handler
   VALIDATOR postgres_fdw_validator;
+
+CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
+RETURNS void
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d5a2af9..08b28b6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -193,10 +193,14 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+bool		UseTsDtmTransactions;	/* set via postgres_fdw.use_tsdtm */
+void		_PG_init(void);
+
 /*
  * SQL functions
  */
 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
+PG_FUNCTION_INFO_V1(postgres_fdw_exec);
 
 /*
  * FDW callback routines
@@ -3214,3 +3218,44 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
 	/* We didn't find any suitable equivalence class expression */
 	return NULL;
 }
+
+/*
+ * postgres_fdw_exec(relid oid, sql cstring)
+ *
+ * Run an SQL command on the foreign server of foreign table "relid", over
+ * the connection of the current user's mapping.  The result is discarded,
+ * but a failed command is reported as an error.
+ */
+Datum
+postgres_fdw_exec(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	char const *sql = PG_GETARG_CSTRING(1);
+	Oid			userid = GetUserId();
+	ForeignTable *table = GetForeignTable(relid);
+	ForeignServer *server = GetForeignServer(table->serverid);
+	UserMapping *user = GetUserMapping(userid, server->serverid);
+	PGconn	   *conn = GetConnection(server, user, false);
+	PGresult   *res = PQexec(conn, sql);
+
+	/* don't silently swallow a failure of the remote command */
+	if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+		PQresultStatus(res) != PGRES_TUPLES_OK)
+		pgfdw_report_error(ERROR, res, conn, true, sql);
+
+	PQclear(res);
+	ReleaseConnection(conn);
+	PG_RETURN_VOID();
+}
+
+/*
+ * Module load hook: defines the postgres_fdw.use_tsdtm boolean setting.
+ */
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_tsdtm",
+							 "Use timestamp base distributed transaction manager for FDW connections", NULL,
+							 &UseTsDtmTransactions, false, PGC_USERSET, 0, NULL,
+							 NULL, NULL);
+}
diff --git a/contrib/postgres_fdw/tests/dtmbench.cpp b/contrib/postgres_fdw/tests/dtmbench.cpp
new file mode 100644
index 0000000..c8e7d72
--- /dev/null
+++ b/contrib/postgres_fdw/tests/dtmbench.cpp
@@ -0,0 +1,285 @@
+/*
+ * dtmbench: writer threads move one unit between two random rows of
+ * table "t"; reader threads print sum(v) over "t" whenever it changes.
+ */
+#include <time.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <sys/time.h>
+#include <pthread.h>
+
+#include <string>
+#include <vector>
+
+#include <pqxx/connection>
+#include <pqxx/transaction>
+#include <pqxx/nontransaction.hxx>
+#include <pqxx/pipeline>
+
+using namespace std;
+using namespace pqxx;
+
+typedef void* (*thread_proc_t)(void*);
+typedef uint32_t xid_t;
+
+struct thread
+{
+    pthread_t t;
+    size_t transactions;
+    size_t updates;
+    size_t selects;
+    size_t aborts;
+    int id;
+
+    void start(int tid, thread_proc_t proc) {
+        id = tid;
+        updates = 0;
+        selects = 0;
+        aborts = 0;
+        transactions = 0;
+        pthread_create(&t, NULL, proc, this);
+    }
+
+    void wait() {
+        pthread_join(t, NULL);
+    }
+};
+
+struct config
+{
+    int nReaders;
+    int nWriters;
+    int nIterations;
+    int nAccounts;
+    int updatePercent;
+    int nShards;
+    string connection;
+
+    config() {
+        nShards = 1;
+        nReaders = 1;
+        nWriters = 10;
+        nIterations = 1000;
+        nAccounts = 10000;
+        updatePercent = 100;
+    }
+};
+
+config cfg;
+bool running;
+
+#define USEC 1000000
+
+/* Current wall-clock time in microseconds */
+static time_t getCurrentTime()
+{
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    return (time_t)tv.tv_sec*USEC + tv.tv_usec;
+}
+
+
+/* Format a statement printf-style (at most 1023 chars) and execute it */
+void exec(transaction_base& txn, char const* sql, ...)
+{
+    va_list args;
+    va_start(args, sql);
+    char buf[1024];
+    vsnprintf(buf, sizeof(buf), sql, args);
+    va_end(args);
+    txn.exec(buf);
+}
+
+/* Like exec(), but return column 0 of row 0 converted to T */
+template<class T>
+T execQuery( transaction_base& txn, char const* sql, ...)
+{
+    va_list args;
+    va_start(args, sql);
+    char buf[1024];
+    vsnprintf(buf, sizeof(buf), sql, args);
+    va_end(args);
+    result r = txn.exec(buf);
+    return r[0][0].as(T());
+}
+
+/* Reader thread: sum all rows of "t" until "running" is cleared */
+void* reader(void* arg)
+{
+    thread& t = *(thread*)arg;
+    connection conn(cfg.connection);
+    int64_t prevSum = 0;
+
+    while (running) {
+        work txn(conn);
+        result r = txn.exec("select sum(v) from t");
+        int64_t sum = r[0][0].as(int64_t());
+        if (sum != prevSum) {
+            printf("Total=%lld\n", (long long)sum);
+            prevSum = sum;
+        }
+        t.transactions += 1;
+        t.selects += 1;
+        txn.commit();
+    }
+    return NULL;
+}
+
+/* Writer thread: complete cfg.nIterations transactions, retrying aborted ones */
+void* writer(void* arg)
+{
+    thread& t = *(thread*)arg;
+    connection conn(cfg.connection);
+    for (int i = 0; i < cfg.nIterations; i++)
+    {
+        work txn(conn);
+        int srcAcc = random() % cfg.nAccounts;
+        int dstAcc = random() % cfg.nAccounts;
+        try {
+            if (random() % 100 < cfg.updatePercent) {
+                exec(txn, "update t set v = v - 1 where u=%d", srcAcc);
+                exec(txn, "update t set v = v + 1 where u=%d", dstAcc);
+                t.updates += 2;
+            } else {
+                int64_t sum = execQuery<int64_t>(txn, "select v from t where u=%d", srcAcc)
+                    + execQuery<int64_t>(txn, "select v from t where u=%d", dstAcc);
+                if (sum > cfg.nIterations*cfg.nWriters || sum < -cfg.nIterations*cfg.nWriters) {
+                    printf("Wrong sum=%lld\n", (long long)sum);
+                }
+                t.selects += 2;
+            }
+            txn.commit();
+            t.transactions += 1;
+        } catch (pqxx_exception const& x) {
+            txn.abort();
+            t.aborts += 1;
+            i -= 1;
+            continue;
+        }
+    }
+    return NULL;
+}
+
+/* Add a range check to each t_fdw<N> table and fill it with zero-valued rows */
+void initializeDatabase()
+{
+    connection conn(cfg.connection);
+    int accountsPerShard = (cfg.nAccounts + cfg.nShards - 1)/cfg.nShards;
+    for (int i = 0; i < cfg.nShards; i++)
+    {
+        work txn(conn);
+        exec(txn, "alter table t_fdw%i add check (u between %d and %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1);
+        exec(txn, "insert into t_fdw%i (select generate_series(%d,%d), %d)", i+1, accountsPerShard*i, accountsPerShard*(i+1)-1, 0);
+        txn.commit();
+    }
+}
+
+int main (int argc, char* argv[])
+{
+    bool initialize = false;
+
+    if (argc == 1){
+        printf("Use -h to show usage options\n");
+        return 1;
+    }
+
+    for (int i = 1; i < argc; i++) {
+        if (argv[i][0] == '-' && i + 1 < argc) { /* every option needs a value */
+            switch (argv[i][1]) {
+            case 'r':
+                cfg.nReaders = atoi(argv[++i]);
+                continue;
+            case 'w':
+                cfg.nWriters = atoi(argv[++i]);
+                continue;
+            case 'a':
+                cfg.nAccounts = atoi(argv[++i]);
+                continue;
+            case 'n':
+                cfg.nIterations = atoi(argv[++i]);
+                continue;
+            case 'p':
+                cfg.updatePercent = atoi(argv[++i]);
+                continue;
+            case 'c':
+                cfg.connection = string(argv[++i]);
+                continue;
+            case 'i':
+                initialize = true;
+                cfg.nShards = atoi(argv[++i]);
+                continue;
+            }
+        }
+        printf("Options:\n"
+               "\t-r N\tnumber of readers (1)\n"
+               "\t-w N\tnumber of writers (10)\n"
+               "\t-a N\tnumber of accounts (10000)\n"
+               "\t-n N\tnumber of iterations (1000)\n"
+               "\t-p N\tupdate percent (100)\n"
+               "\t-c STR\tdatabase connection string\n"
+               "\t-i N\tinitialize N shards\n");
+        return 1;
+    }
+
+    if (initialize) {
+        initializeDatabase();
+        printf("%d accounts inserted\n", cfg.nAccounts);
+        return 0;
+    }
+
+    time_t start = getCurrentTime();
+    running = true;
+
+    vector<thread> readers(cfg.nReaders);
+    vector<thread> writers(cfg.nWriters);
+    size_t nAborts = 0;
+    size_t nUpdates = 0;
+    size_t nSelects = 0;
+    size_t nTransactions = 0;
+
+    for (int i = 0; i < cfg.nReaders; i++) {
+        readers[i].start(i, reader);
+    }
+    for (int i = 0; i < cfg.nWriters; i++) {
+        writers[i].start(i, writer);
+    }
+
+    for (int i = 0; i < cfg.nWriters; i++) {
+        writers[i].wait();
+        nUpdates += writers[i].updates;
+        nSelects += writers[i].selects;
+        nAborts += writers[i].aborts;
+        nTransactions += writers[i].transactions;
+    }
+
+    running = false;
+
+    for (int i = 0; i < cfg.nReaders; i++) {
+        readers[i].wait();
+        nSelects += readers[i].selects;
+        nTransactions += readers[i].transactions;
+    }
+
+    time_t elapsed = getCurrentTime() - start;
+
+    printf(
+        "{\"tps\":%f, \"transactions\":%zu,"
+        " \"selects\":%zu, \"updates\":%zu, \"aborts\":%zu, \"abort_percent\": %d,"
+        " \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d, \"iterations\":%d ,\"shards\":%d}\n",
+        (double)(nTransactions*USEC)/elapsed,
+        nTransactions,
+        nSelects,
+        nUpdates,
+        nAborts,
+        (int)(nTransactions ? nAborts*100/nTransactions : 0),
+        cfg.nReaders,
+        cfg.nWriters,
+        cfg.updatePercent,
+        cfg.nAccounts,
+        cfg.nIterations,
+        cfg.nShards);
+
+    return 0;
+}
diff --git a/contrib/postgres_fdw/tests/makefile b/contrib/postgres_fdw/tests/makefile
new file mode 100644
index 0000000..766d99f
--- /dev/null
+++ b/contrib/postgres_fdw/tests/makefile
@@ -0,0 +1,10 @@
+CXX=g++
+CXXFLAGS=-g -Wall -O2 -pthread
+
+all: dtmbench
+
+dtmbench: dtmbench.cpp
+	$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
+
+clean:
+	rm -f dtmbench
\ No newline at end of file