diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 0b6a109..ce8653e 100644 *** a/doc/src/sgml/func.sgml --- b/doc/src/sgml/func.sgml *************** FOR EACH ROW EXECUTE PROCEDURE suppress_ *** 15023,15026 **** --- 15023,15100 ---- . + + + Snapshot Synchronization Functions + + + pg_export_snapshot + + + + PostgreSQL allows different sessions to synchronize their + snapshots. A database snapshot determines which data is visible to + the client that is using this snapshot. Synchronized snapshots are necessary when + two clients need to see the same content in the database. If these two clients + just connected to the database and opened their transactions, then they could + never be sure that there was no data modification right between both + connections. + + + As a solution, PostgreSQL offers the function + pg_export_snapshot which saves the snapshot internally and + from then on until the end of the saving transaction, the snapshot can be + copied into a a new transaction with the + command to open a second transaction with the + exact same snapshot. Now both transactions are guaranteed to see the exact same + data even though they might have connected at different times. + + + Note that a snapshot can only be used to start a new transaction as long + as the transaction that originally saved it is held open. Also note that even + after the synchronization both clients still run their own independent + transactions. As a consequence, even though synchronized with respect to + reading pre-existing data, both transactions won't be able to see each other's + uncommitted data. + + + Snapshot Synchronization Functions + + + Name Return Type Description + + + + + + + pg_export_snapshot() + + text + Save the snapshot and return its identifier + + + +
+ + + The function pg_export_snapshot does not take an argument + and returns the snapshot's identifier as text data. Internally the + function will save the snapshot data to a file so that it can be retrieved + from a different backend process later on. Note that as soon as the + transaction ends, any saved snapshots become invalid and their + identifiers cannot be used in other transactions anymore. If the function + has been executed, the transaction cannot be prepared anymore with . + + + SELECT pg_export_snapshot(); + + pg_export_snapshot + -------------------- + 000003A1-1 + (1 row) + +
+ diff --git a/doc/src/sgml/ref/set_transaction.sgml b/doc/src/sgml/ref/set_transaction.sgml index e28a7e1..059b94b 100644 *** a/doc/src/sgml/ref/set_transaction.sgml --- b/doc/src/sgml/ref/set_transaction.sgml *************** SET SESSION CHARACTERISTICS AS TRANSACTI *** 40,45 **** --- 40,46 ---- ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED } READ WRITE | READ ONLY [ NOT ] DEFERRABLE + SNAPSHOT snapshot-id *************** SET SESSION CHARACTERISTICS AS TRANSACTI *** 146,151 **** --- 147,167 ---- contributing to or being canceled by a serialization failure. This mode is well suited for long-running reports or backups. + + + The SNAPSHOT property allows a new transaction to run + with the same snapshot as an already running transaction. A call to + pg_export_snapshot (see ) must have been executed in the other + transaction. This function returns a snapshot id which must be passed as a + parameter to this command to create a second transaction running with the same + snapshot. You also need to make the transaction ISOLATION LEVEL + SERIALIZABLE or ISOLATION LEVEL REPEATABLE READ. + If the transaction has already executed a query, started a subtransaction or + assigned a snapshot, no further snapshot assignment is possible in this + transaction. + + *************** SET SESSION CHARACTERISTICS AS TRANSACTI *** 178,183 **** --- 194,225 ---- + + Examples + + + To begin a new transaction block with the same snapshot as an already + existing transaction, first export the snapshot from the existing + transaction. This will return the snapshot id: + + + # SELECT pg_export_snapshot(); + pg_export_snapshot + -------------------- + 000003A1-1 + (1 row) + + + Then reference this snapshot id as the first command in the newly opened + transaction: + + + BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; + SET TRANSACTION SNAPSHOT '000003A1-1'; + + + + Compatibility *************** SET SESSION CHARACTERISTICS AS TRANSACTI *** 198,206 **** ! The DEFERRABLE transaction_mode ! is a PostgreSQL language extension. --- 240,250 ---- ! DEFERRABLE transaction_mode ! and SNAPSHOT snapshot-id are ! PostgreSQL language extensions. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 3dab45c..7ce516e 100644 *** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** CommitTransaction(void) *** 1852,1857 **** --- 1852,1863 ---- */ PreCommit_Notify(); + /* + * Cleans up exported snapshots (this needs to happen before we update + * our MyProc entry). + */ + InvalidateExportedSnapshots(); + /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); *************** PrepareTransaction(void) *** 2067,2072 **** --- 2073,2083 ---- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot PREPARE a transaction that has operated on temporary tables"))); + if (exportedSnapshots) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has exported snapshots"))); + /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); *************** AbortTransaction(void) *** 2228,2233 **** --- 2239,2247 ---- /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); + /* Invalidate any exported snapshots */ + InvalidateExportedSnapshots(); + /* Make sure we have a valid memory context and resource owner */ AtAbort_Memory(); AtAbort_ResourceOwner(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8f65ddc..331a564 100644 *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 58,63 **** --- 58,64 ---- #include "utils/guc.h" #include "utils/ps_status.h" #include "utils/relmapper.h" + #include "utils/snapmgr.h" #include "utils/timestamp.h" #include "pg_trace.h" *************** StartupXLOG(void) *** 6373,6378 **** --- 6374,6384 ---- CheckRequiredParameterValues(); /* + * We can delete any saved transaction snapshots that still exist + */ + DeleteAllExportedSnapshotFiles(); + + /* * We're in recovery, so unlogged relations relations may be trashed * and must be reset. This should be done BEFORE allowing Hot Standby * connections, so that read-only backends don't try to read whatever diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index e9f3896..e1ab161 100644 *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** static void processCASbits(int cas_bits, *** 553,560 **** SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE ! SHOW SIMILAR SIMPLE SMALLINT SOME STABLE STANDALONE_P START STATEMENT ! STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SYMMETRIC SYSID SYSTEM_P TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN TIME TIMESTAMP --- 553,560 ---- SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE ! SHOW SIMILAR SIMPLE SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START ! STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SYMMETRIC SYSID SYSTEM_P TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN TIME TIMESTAMP *************** set_rest: /* Generic SET syntaxes: */ *** 1286,1291 **** --- 1286,1299 ---- n->args = $2; $$ = n; } + | TRANSACTION SNAPSHOT Sconst + { + VariableSetStmt *n = makeNode(VariableSetStmt); + n->kind = VAR_SET_VALUE; + n->name = "TRANSACTION SNAPSHOT"; + n->args = list_make1(makeStringConst($3, @3)); + $$ = n; + } | SESSION CHARACTERISTICS AS TRANSACTION transaction_mode_list { VariableSetStmt *n = makeNode(VariableSetStmt); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 9489012..b665d75 100644 *** a/src/backend/storage/ipc/procarray.c --- b/src/backend/storage/ipc/procarray.c *************** ProcArrayShmemSize(void) *** 166,174 **** { Size size; - /* Size of the ProcArray structure itself */ - #define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) - size = offsetof(ProcArrayStruct, procs); size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS)); --- 166,171 ---- *************** ProcArrayShmemSize(void) *** 179,192 **** * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the * main structures created in those functions must be identically sized, * since we may at times copy the whole of the data structures around. We ! * refer to this size as TOTAL_MAX_CACHED_SUBXIDS. * * Ideally we'd only create this structure if we were actually doing hot * standby in the current run, but we don't know that yet at the time * shared memory is being set up. */ - #define TOTAL_MAX_CACHED_SUBXIDS \ - ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS) if (EnableHotStandby) { --- 176,187 ---- * TransactionIdIsInProgress() and GetRunningTransactionData(). All of the * main structures created in those functions must be identically sized, * since we may at times copy the whole of the data structures around. We ! * refer to this size as TOTAL_MAX_CACHED_SUBXIDS, defined in procarray.h. * * Ideally we'd only create this structure if we were actually doing hot * standby in the current run, but we don't know that yet at the time * shared memory is being set up. */ if (EnableHotStandby) { *************** GetOldestXmin(bool allDbs, bool ignoreVa *** 1144,1150 **** * not statically allocated (see xip allocation below). */ Snapshot ! GetSnapshotData(Snapshot snapshot) { ProcArrayStruct *arrayP = procArray; TransactionId xmin; --- 1139,1145 ---- * not statically allocated (see xip allocation below). */ Snapshot ! GetSnapshotData(Snapshot snapshot, Snapshot stemplate) { ProcArrayStruct *arrayP = procArray; TransactionId xmin; *************** GetSnapshotData(Snapshot snapshot) *** 1158,1163 **** --- 1153,1181 ---- Assert(snapshot != NULL); /* + * We only get a valid snapshot in stemplate if the snapshot + * synchronization feature used. In that case we just need to copy the + * values that we get onto the snapshot we return. + * Note that in this case we always duplicate an existing snapshot, that is + * currently held by another active transaction. That's why we do not need + * to update any { RecentGlobalXmin, RecentXmin, globalxmin }. + */ + if (stemplate != InvalidSnapshot) + { + /* + * 'stemplate' is only read and its values are copied onto 'snapshot'. + */ + CopySnapshotOnto(stemplate, snapshot); + + /* + * We can use the result of the copy except for that this snapshot + * should look like new and not copied. + */ + snapshot->copied = false; + return snapshot; + } + + /* * Allocating space for maxProcs xids is usually overkill; numProcs would * be sufficient. But it seems better to do the malloc while not holding * the lock, so we can't look at numProcs. Likewise, we allocate much diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index d39f897..462cc59 100644 *** a/src/backend/storage/lmgr/predicate.c --- b/src/backend/storage/lmgr/predicate.c *************** static void OldSerXidSetActiveSerXmin(Tr *** 416,423 **** static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); ! static Snapshot GetSafeSnapshot(Snapshot snapshot); ! static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); --- 416,423 ---- static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); ! static Snapshot GetSafeSnapshot(Snapshot snapshot, Snapshot stemplate); ! static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, Snapshot stemplate); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); *************** SummarizeOldestCommittedSxact(void) *** 1491,1497 **** * area that can safely be passed to GetSnapshotData. */ static Snapshot ! GetSafeSnapshot(Snapshot origSnapshot) { Snapshot snapshot; --- 1491,1497 ---- * area that can safely be passed to GetSnapshotData. */ static Snapshot ! GetSafeSnapshot(Snapshot origSnapshot, Snapshot stemplate) { Snapshot snapshot; *************** GetSafeSnapshot(Snapshot origSnapshot) *** 1505,1511 **** * our caller passed to us. The pointer returned is actually the same * one passed to it, but we avoid assuming that here. */ ! snapshot = GetSerializableTransactionSnapshotInt(origSnapshot); if (MySerializableXact == InvalidSerializableXact) return snapshot; /* no concurrent r/w xacts; it's safe */ --- 1505,1511 ---- * our caller passed to us. The pointer returned is actually the same * one passed to it, but we avoid assuming that here. */ ! snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, stemplate); if (MySerializableXact == InvalidSerializableXact) return snapshot; /* no concurrent r/w xacts; it's safe */ *************** GetSafeSnapshot(Snapshot origSnapshot) *** 1562,1568 **** * within this function. */ Snapshot ! GetSerializableTransactionSnapshot(Snapshot snapshot) { Assert(IsolationIsSerializable()); --- 1562,1568 ---- * within this function. */ Snapshot ! GetSerializableTransactionSnapshot(Snapshot snapshot, Snapshot stemplate) { Assert(IsolationIsSerializable()); *************** GetSerializableTransactionSnapshot(Snaps *** 1572,1584 **** * thereby avoid all SSI overhead once it's running. */ if (XactReadOnly && XactDeferrable) ! return GetSafeSnapshot(snapshot); ! return GetSerializableTransactionSnapshotInt(snapshot); } static Snapshot ! GetSerializableTransactionSnapshotInt(Snapshot snapshot) { PGPROC *proc; VirtualTransactionId vxid; --- 1572,1584 ---- * thereby avoid all SSI overhead once it's running. */ if (XactReadOnly && XactDeferrable) ! return GetSafeSnapshot(snapshot, stemplate); ! return GetSerializableTransactionSnapshotInt(snapshot, stemplate); } static Snapshot ! GetSerializableTransactionSnapshotInt(Snapshot snapshot, Snapshot stemplate) { PGPROC *proc; VirtualTransactionId vxid; *************** GetSerializableTransactionSnapshotInt(Sn *** 1616,1622 **** } while (!sxact); /* Get the snapshot */ ! snapshot = GetSnapshotData(snapshot); /* * If there are no serializable transactions which are not read-only, we --- 1616,1622 ---- } while (!sxact); /* Get the snapshot */ ! snapshot = GetSnapshotData(snapshot, stemplate); /* * If there are no serializable transactions which are not read-only, we diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a71729c..9ffe3a1 100644 *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 72,77 **** --- 72,78 ---- #include "utils/plancache.h" #include "utils/portal.h" #include "utils/ps_status.h" + #include "utils/snapmgr.h" #include "utils/tzparser.h" #include "utils/xml.h" *************** ExecSetVariableStmt(VariableSetStmt *stm *** 6094,6099 **** --- 6095,6109 ---- switch (stmt->kind) { case VAR_SET_VALUE: + if (strcmp(stmt->name, "TRANSACTION SNAPSHOT") == 0) + { + if (!ImportSnapshot(ExtractSetVariableArgs(stmt))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not import the requested snapshot"))); + break; + } + /* fallthrough */ case VAR_SET_CURRENT: set_config_option(stmt->name, ExtractSetVariableArgs(stmt), diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 518aaf1..d047ba1 100644 *** a/src/backend/utils/time/snapmgr.c --- b/src/backend/utils/time/snapmgr.c *************** *** 33,44 **** */ #include "postgres.h" #include "access/transam.h" #include "access/xact.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" ! #include "utils/memutils.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/tqual.h" --- 33,50 ---- */ #include "postgres.h" + #include + #include + #include + #include "access/transam.h" #include "access/xact.h" + #include "miscadmin.h" + #include "storage/fd.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" ! #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/tqual.h" *************** static Snapshot CopySnapshot(Snapshot sn *** 116,133 **** static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); /* ! * GetTransactionSnapshot * Get the appropriate snapshot for a new query in a transaction. * ! * Note that the return value may point at static storage that will be modified ! * by future calls and by CommandCounterIncrement(). Callers should call ! * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be ! * used very long. */ ! Snapshot ! GetTransactionSnapshot(void) { /* First call in transaction? */ if (!FirstSnapshotSet) --- 122,145 ---- static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); + /* What we need for exporting snapshots */ + #define SNAPSHOT_EXPORT_DIR "pg_snapshots" + #define XactExportFilePath(path, xid, num, suffix) \ + snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%08X-%d%s", xid, num, suffix) + + List *exportedSnapshots = NIL; /* ! * GetTransactionSnapshotFromTemplate * Get the appropriate snapshot for a new query in a transaction. * ! * A template snapshot is passed for the synchronized snapshots feature. ! * In that case we want to have a snapshot back that has the template's ! * values. We just pass it along and the lower level functions take care ! * of it. */ ! static Snapshot ! GetTransactionSnapshotFromTemplate(Snapshot stemplate) { /* First call in transaction? */ if (!FirstSnapshotSet) *************** GetTransactionSnapshot(void) *** 145,153 **** { /* First, create the snapshot in CurrentSnapshotData */ if (IsolationIsSerializable()) ! CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData); else ! CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData); /* Make a saved copy */ CurrentSnapshot = CopySnapshot(CurrentSnapshot); FirstXactSnapshot = CurrentSnapshot; --- 157,166 ---- { /* First, create the snapshot in CurrentSnapshotData */ if (IsolationIsSerializable()) ! CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData, ! stemplate); else ! CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, stemplate); /* Make a saved copy */ CurrentSnapshot = CopySnapshot(CurrentSnapshot); FirstXactSnapshot = CurrentSnapshot; *************** GetTransactionSnapshot(void) *** 156,162 **** RegisteredSnapshots++; } else ! CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData); FirstSnapshotSet = true; return CurrentSnapshot; --- 169,182 ---- RegisteredSnapshots++; } else ! { ! /* ! * template is only used for the synchronized snapshot feature. Which in ! * turn is only allowed for IsolationUsesXactSnapshot() == true transactions ! */ ! Assert(stemplate == InvalidSnapshot); ! CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, InvalidSnapshot); ! } FirstSnapshotSet = true; return CurrentSnapshot; *************** GetTransactionSnapshot(void) *** 165,176 **** if (IsolationUsesXactSnapshot()) return CurrentSnapshot; ! CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData); return CurrentSnapshot; } /* * GetLatestSnapshot * Get a snapshot that is up-to-date as of the current instant, * even if we are executing in transaction-snapshot mode. --- 185,217 ---- if (IsolationUsesXactSnapshot()) return CurrentSnapshot; ! /* see comment above */ ! Assert(stemplate == InvalidSnapshot); ! CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, InvalidSnapshot); return CurrentSnapshot; } /* + * GetTransactionSnapshot + * Get the appropriate snapshot for a new query in a transaction. + * + * This is the public interface for anything different than the snapshot + * synchronization feature. + * + * Note that the return value may point at static storage that will be modified + * by future calls and by CommandCounterIncrement(). Callers should call + * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be + * used very long. + */ + Snapshot + GetTransactionSnapshot(void) + { + return GetTransactionSnapshotFromTemplate(InvalidSnapshot); + } + + + /* * GetLatestSnapshot * Get a snapshot that is up-to-date as of the current instant, * even if we are executing in transaction-snapshot mode. *************** GetLatestSnapshot(void) *** 182,188 **** if (!FirstSnapshotSet) elog(ERROR, "no snapshot has been set"); ! SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData); return SecondarySnapshot; } --- 223,229 ---- if (!FirstSnapshotSet) elog(ERROR, "no snapshot has been set"); ! SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData, InvalidSnapshot); return SecondarySnapshot; } *************** SnapshotSetCommandId(CommandId curcid) *** 204,246 **** } /* ! * CopySnapshot ! * Copy the given snapshot. * ! * The copy is palloc'd in TopTransactionContext and has initial refcounts set ! * to 0. The returned snapshot has the copied flag set. */ ! static Snapshot ! CopySnapshot(Snapshot snapshot) { - Snapshot newsnap; Size subxipoff; - Size size; - - Assert(snapshot != InvalidSnapshot); - - /* We allocate any XID arrays needed in the same palloc block. */ - size = subxipoff = sizeof(SnapshotData) + - snapshot->xcnt * sizeof(TransactionId); - if (snapshot->subxcnt > 0) - size += snapshot->subxcnt * sizeof(TransactionId); ! newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); ! memcpy(newsnap, snapshot, sizeof(SnapshotData)); ! newsnap->regd_count = 0; ! newsnap->active_count = 0; ! newsnap->copied = true; /* setup XID array */ if (snapshot->xcnt > 0) { ! newsnap->xip = (TransactionId *) (newsnap + 1); ! memcpy(newsnap->xip, snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); } else ! newsnap->xip = NULL; /* * Setup subXID array. Don't bother to copy it if it had overflowed, --- 245,277 ---- } /* ! * CopySnapshotOnto ! * Copy the given snapshot onto an already sufficiently allocated other ! * snapshot. * ! * Return the modified snapshot (onto). */ ! Snapshot ! CopySnapshotOnto(Snapshot snapshot, Snapshot onto) { Size subxipoff; ! subxipoff = sizeof(SnapshotData) + snapshot->xcnt * sizeof(TransactionId); ! memcpy(onto, snapshot, sizeof(SnapshotData)); ! onto->regd_count = 0; ! onto->active_count = 0; ! onto->copied = true; /* setup XID array */ if (snapshot->xcnt > 0) { ! onto->xip = (TransactionId *) (onto + 1); ! memcpy(onto->xip, snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); } else ! onto->xip = NULL; /* * Setup subXID array. Don't bother to copy it if it had overflowed, *************** CopySnapshot(Snapshot snapshot) *** 251,264 **** if (snapshot->subxcnt > 0 && (!snapshot->suboverflowed || snapshot->takenDuringRecovery)) { ! newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff); ! memcpy(newsnap->subxip, snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); } else ! newsnap->subxip = NULL; ! return newsnap; } /* --- 282,320 ---- if (snapshot->subxcnt > 0 && (!snapshot->suboverflowed || snapshot->takenDuringRecovery)) { ! onto->subxip = (TransactionId *) ((char *) onto + subxipoff); ! memcpy(onto->subxip, snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); } else ! onto->subxip = NULL; ! return onto; ! } ! ! /* ! * CopySnapshot ! * Copy the given snapshot. ! * ! * The copy is palloc'd in TopTransactionContext and has initial refcounts set ! * to 0. The returned snapshot has the copied flag set. ! */ ! static Snapshot ! CopySnapshot(Snapshot snapshot) ! { ! Snapshot newsnap; ! Size size; ! ! Assert(snapshot != InvalidSnapshot); ! ! /* We allocate any XID arrays needed in the same palloc block. */ ! size = sizeof(SnapshotData) + snapshot->xcnt * sizeof(TransactionId); ! if (snapshot->subxcnt > 0) ! size += snapshot->subxcnt * sizeof(TransactionId); ! ! newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); ! ! return CopySnapshotOnto(snapshot, newsnap); } /* *************** AtEOXact_Snapshot(bool isCommit) *** 586,588 **** --- 642,1039 ---- SnapshotResetXmin(); } + + /* + * PreCommit_Snapshot + * Cleans up exported snapshots (this needs to happen before we update + * our MyProc entry, hence it is in PreCommit). + */ + void + InvalidateExportedSnapshots(void) + { + ListCell *snapshot; + int i; + char buf[MAXPGPATH]; + + if (exportedSnapshots == NIL) + return; + + Assert(list_length(exportedSnapshots) > 0); + Assert(TransactionIdIsValid(GetTopTransactionIdIfAny())); + + for(i = 1; i <= list_length(exportedSnapshots); i++) + { + XactExportFilePath(buf, GetTopTransactionId(), i, ""); + unlink(buf); + } + + foreach(snapshot, exportedSnapshots) + UnregisterSnapshotFromOwner(lfirst(snapshot), TopTransactionResourceOwner); + + exportedSnapshots = NIL; + } + + /* + * DeleteAllExportedSnapshotFiles + * Cleans up any files that have been left behind by a crashed backend + * that had exported snapshots before it died. + */ + void + DeleteAllExportedSnapshotFiles(void) + { + char buf[MAXPGPATH]; + DIR *s_dir; + struct dirent *s_de; + + if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR))) + { + /* + * We really should have that directory in a sane cluster setup. But + * then again if we don't it's not fatal enough to make it FATAL. + */ + elog(WARNING, + "could not open directory \"%s\": %m", + SNAPSHOT_EXPORT_DIR); + return; + } + + while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL) + { + if (strcmp(s_de->d_name, ".") == 0 || + strcmp(s_de->d_name, "..") == 0) + continue; + + snprintf(buf, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name); + unlink(buf); + } + FreeDir(s_dir); + } + + /* + * ExportSnapshot + * Export the snapshot to a file so that other backends can import the same + * snapshot. + * Returns the token (the file name) that can be used to import this + * snapshot. + */ + static char * + ExportSnapshot(Snapshot snapshot) + { + #define SNAPSHOT_APPEND(x, y) (appendStringInfo(&buf, (x), (y))) + TransactionId *children, topXid; + FILE *f; + int i; + int nchildren; + MemoryContext oldcxt; + char path[MAXPGPATH]; + char pathtmp[MAXPGPATH]; + StringInfoData buf; + + Assert(IsTransactionState()); + + /* + * This will also assign a transaction id if we do not yet have one. + */ + topXid = GetTopTransactionId(); + + Assert(TransactionIdIsValid(GetTopTransactionIdIfAny())); + + /* + * We cannot export a snapshot from a subtransaction because in a + * subtransaction we don't see our open subxip values in the snapshot so + * they would be missing in the backend applying it. + */ + if (GetCurrentTransactionNestLevel() != 1) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot export a snapshot from a subtransaction"))); + + /* + * We do however see our already committed subxip values and add them to + * the subxip array. + */ + nchildren = xactGetCommittedChildren(&children); + + initStringInfo(&buf); + + /* Write up all the data that we return */ + SNAPSHOT_APPEND("xid:%d ", topXid); + SNAPSHOT_APPEND("xmi:%d ", snapshot->xmin); + SNAPSHOT_APPEND("xma:%d ", snapshot->xmax); + /* Include our own transaction ID into the count. */ + SNAPSHOT_APPEND("xcnt:%d ", snapshot->xcnt + 1); + for (i = 0; i < snapshot->xcnt; i++) + SNAPSHOT_APPEND("xip:%d ", snapshot->xip[i]); + /* + * Finally add our own XID, since by definition we will still be running + * when the other transaction takes over the snapshot. + */ + SNAPSHOT_APPEND("xip:%d ", topXid); + if (snapshot->suboverflowed || snapshot->subxcnt + nchildren > TOTAL_MAX_CACHED_SUBXIDS) + SNAPSHOT_APPEND("sof:%d ", 1); + else + { + SNAPSHOT_APPEND("sxcnt:%d ", snapshot->subxcnt + nchildren); + for (i = 0; i < snapshot->subxcnt; i++) + SNAPSHOT_APPEND("sxp:%d ", snapshot->subxip[i]); + /* Add already committed subtransactions. */ + for (i = 0; i < nchildren; i++) + SNAPSHOT_APPEND("sxp:%d ", children[i]); + } + + /* + * buf ends with a trailing space but we leave it in for simplicity. The + * parsing routines also depend on it. + */ + + /* Register the snapshot and add it to the list of exported snapshots */ + snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner); + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + exportedSnapshots = lappend(exportedSnapshots, snapshot); + MemoryContextSwitchTo(oldcxt); + + XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp"); + if (!(f = AllocateFile(pathtmp, PG_BINARY_W))) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", pathtmp))); + + if (fwrite(buf.data, buf.len, 1, f) != 1) + /* Aborting the transaction will also call FreeFile() */ + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", pathtmp))); + + if (FreeFile(f)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", pathtmp))); + + /* + * Now that we have written everything into a .tmp file we rename the file + * and remove the .tmp suffix. Our filename is predictable and we're + * paranoid enough to not let us read a partially written file (we can't + * read a .tmp file because this would fail the valid characters check in + * ImportSnapshot). + */ + XactExportFilePath(path, topXid, list_length(exportedSnapshots), ""); + + if (rename(pathtmp, path) < 0) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + pathtmp, path))); + + /* + * The basename of the file is what we return from pg_export_snapshot(). + * It's already in path in a textual format and we know that the path + * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and over the + * slash and pstrdup it to not return a local variable. + */ + return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1); + #undef SNAPSHOT_APPEND + } + + /* + * Poor man's type independent parser. We only use it in the three functions + * below so there's no need to get ambitious about putting extra (x) around the + * arguments. + */ + #define SNAPSHOT_PARSE(valFunc, inFunc, type, strpp, prfx, notfound) \ + do { \ + char *n, *p = strstr(*strpp, prfx); \ + type v; \ + \ + if (!p) \ + return notfound; \ + p += strlen(prfx); \ + n = strchr(p, ' '); \ + if (!n) \ + return notfound; \ + *n = '\0'; \ + v = valFunc(DirectFunctionCall1(inFunc, CStringGetDatum(p))); \ + *strpp = n + 1; \ + return v; \ + } while (0); + + static int + parseIntFromText(char **s, const char *prefix) + { + SNAPSHOT_PARSE(DatumGetInt32, int4in, int, s, prefix, 0); + } + + static bool + parseBoolFromText(char **s, const char *prefix) + { + SNAPSHOT_PARSE(DatumGetInt32, int4in, bool, s, prefix, false); + } + + static TransactionId + parseXactFromText(char **s, const char *prefix) + { + SNAPSHOT_PARSE(DatumGetTransactionId, xidin, TransactionId, + s, prefix, InvalidTransactionId); + } + + #undef SNAPSHOT_PARSE + + /* + * ImportSnapshot + * Import a previously exported snapshot. We expect that whatever we get + * is a filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file. + * This is called from "SET TRANSACTION SNAPSHOT 'foo'" and we always + * start fresh from zero with respect to the transaction state that we + * work on. Returns true on success and false on failure. + */ + bool + ImportSnapshot(char *idstr) + { + char path[MAXPGPATH]; + FILE *f; + int i; + char *s; + struct stat stat_buf; + int sxcnt, xcnt; + TransactionId xid, origXid, myXid; + SnapshotData snapshot = {HeapTupleSatisfiesMVCC}; + + if (FirstSnapshotSet || GetCurrentTransactionNestLevel() != 1) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("SET TRANSACTION SNAPSHOT must be called before any query"))); + + /* + * If we were in read committed mode then the next query would execute with a + * new snapshot thus making this function call quite useless. + */ + if (!IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("A snapshot importing transaction must have ISOLATION " + "LEVEL SERIALIZABLE or ISOLATION LEVEL REPEATABLE READ"))); + + /* We're lucky to always start off from a pretty clean state */ + Assert(IsTransactionState()); + Assert(GetCurrentTransactionNestLevel() == 1); + Assert(GetTopTransactionIdIfAny() == InvalidTransactionId); + Assert(CurrentSnapshot == NULL); + Assert(SecondarySnapshot == NULL); + Assert(RegisteredSnapshots == 0); + + /* verify the identifier, only 0-9,A-F and a hyphen are allowed... */ + s = idstr; + while (*s) + { + if (!isdigit(*s) && !(*s >= 'A' && *s <= 'F') && *s != '-') + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not import the requested snapshot"), + errhint("the given snapshot identifier contains invalid characters"))); + s++; + } + + /* + * Assign a transaction id. We only do this to detect a possible + * transaction id wraparound which is somewhere between unlikely + * and impossible... + */ + myXid = GetTopTransactionId(); + + snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr); + + /* get the size of the file so that we know how much memory we need */ + if (stat(path, &stat_buf) != 0 || !(f = AllocateFile(path, PG_BINARY_R))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not import the requested snapshot"), + errhint("snapshot has not been exported or does not exist anymore"))); + + s = palloc(stat_buf.st_size + 1); + if (fread(s, stat_buf.st_size, 1, f) != 1) + return false; + + s[stat_buf.st_size] = '\0'; + + FreeFile(f); + + origXid = parseXactFromText(&s, "xid:"); + + snapshot.xmin = parseXactFromText(&s, "xmi:"); + Assert(snapshot.xmin != InvalidTransactionId); + snapshot.xmax = parseXactFromText(&s, "xma:"); + Assert(snapshot.xmax != InvalidTransactionId); + + xcnt = parseIntFromText(&s, "xcnt:"); + /* + * This snapshot only serves as a template, there is no need for it to have + * maxProcs entries, so let's make it just as large as we need it. + */ + snapshot.xip = palloc(xcnt * sizeof(TransactionId)); + + i = 0; + while ((xid = parseXactFromText(&s, "xip:")) != InvalidTransactionId) + snapshot.xip[i++] = xid; + snapshot.xcnt = i; + Assert(snapshot.xcnt == xcnt); + + /* + * We only write "sof:1" if the snapshot overflowed. If not, then there is + * no "sof:x" entry at all and parseBoolFromText() will return false. + */ + snapshot.suboverflowed = parseBoolFromText(&s, "sof:"); + + if (!snapshot.suboverflowed) + { + sxcnt = parseIntFromText(&s, "sxcnt:"); + snapshot.subxip = palloc(sxcnt * sizeof(TransactionId)); + + i = 0; + while ((xid = parseXactFromText(&s, "sxp:")) != InvalidTransactionId) + snapshot.subxip[i++] = xid; + snapshot.subxcnt = i; + Assert(snapshot.subxcnt == sxcnt); + } else { + snapshot.subxip = NULL; + snapshot.subxcnt = 0; + } + + /* complete the snapshot data structure */ + snapshot.curcid = 0; + snapshot.takenDuringRecovery = RecoveryInProgress(); + + /* + * Note that MyProc->xmin can go backwards here. However this is safe + * because the xmin we set here is the same as in the backend's proc->xmin + * whose snapshot we are copying. At this very moment, anybody computing a + * minimum will calculate at least this xmin as the overall xmin with or + * without us setting MyProc->xmin to this value. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + MyProc->xmin = snapshot.xmin; + LWLockRelease(ProcArrayLock); + + /* bail out if the original transaction is not running anymore... */ + if (!TransactionIdIsInProgress(origXid) || TransactionIdPrecedes(myXid, origXid)) + return false; + + /* + * Install the snapshot as if we got it through GetTransactionSnapshot(). + * This will set up CurrentSnapshot and also set up the predicate locks for a + * serializable transaction. + */ + GetTransactionSnapshotFromTemplate(&snapshot); + return true; + } + + Datum + pg_export_snapshot(PG_FUNCTION_ARGS) + { + char *snapshotData; + + RequireTransactionChain(true, "pg_export_snapshot()"); + + snapshotData = ExportSnapshot(GetTransactionSnapshot()); + PG_RETURN_TEXT_P(cstring_to_text(snapshotData)); + } + diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index e535fda..0e981cd 100644 *** a/src/bin/initdb/initdb.c --- b/src/bin/initdb/initdb.c *************** main(int argc, char *argv[]) *** 2557,2562 **** --- 2557,2563 ---- "pg_serial", "pg_subtrans", "pg_twophase", + "pg_snapshots", "pg_multixact/members", "pg_multixact/offsets", "base", diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 0019df5..4706f10 100644 *** a/src/include/access/twophase.h --- b/src/include/access/twophase.h *************** *** 22,30 **** */ typedef struct GlobalTransactionData *GlobalTransaction; - /* GUC variable */ - extern int max_prepared_xacts; - extern Size TwoPhaseShmemSize(void); extern void TwoPhaseShmemInit(void); --- 22,27 ---- diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 96f43fe..a4e0387 100644 *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** DATA(insert OID = 2171 ( pg_cancel_backe *** 2853,2858 **** --- 2853,2860 ---- DESCR("cancel a server process' current query"); DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ )); DESCR("terminate a server process"); + DATA(insert OID = 3122 ( pg_export_snapshot PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_export_snapshot _null_ _null_ _null_ )); + DESCR("export a snapshot"); DATA(insert OID = 2172 ( pg_start_backup PGNSP PGUID 12 1 0 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ )); DESCR("prepare for taking an online backup"); DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ )); diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 9d19417..15326cf 100644 *** a/src/include/miscadmin.h --- b/src/include/miscadmin.h *************** extern PGDLLIMPORT int NBuffers; *** 134,139 **** --- 134,142 ---- extern int MaxBackends; extern int MaxConnections; + /* GUC variable */ + extern int max_prepared_xacts; + extern PGDLLIMPORT int MyProcPid; extern PGDLLIMPORT pg_time_t MyStartTime; extern PGDLLIMPORT struct Port *MyProcPort; diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 12c2faf..3d170bc 100644 *** a/src/include/parser/kwlist.h --- b/src/include/parser/kwlist.h *************** PG_KEYWORD("show", SHOW, UNRESERVED_KEYW *** 337,342 **** --- 337,343 ---- PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD) PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD) PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD) + PG_KEYWORD("snapshot", SNAPSHOT, UNRESERVED_KEYWORD) PG_KEYWORD("some", SOME, RESERVED_KEYWORD) PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD) PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD) diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h index 9603b10..50b046e 100644 *** a/src/include/storage/predicate.h --- b/src/include/storage/predicate.h *************** extern void CheckPointPredicate(void); *** 42,48 **** extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno); /* predicate lock maintenance */ ! extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot); extern void RegisterPredicateLockingXid(TransactionId xid); extern void PredicateLockRelation(Relation relation, Snapshot snapshot); extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot); --- 42,48 ---- extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno); /* predicate lock maintenance */ ! extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot, Snapshot stemplate); extern void RegisterPredicateLockingXid(TransactionId xid); extern void PredicateLockRelation(Relation relation, Snapshot snapshot); extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a11d438..dfe57aa 100644 *** a/src/include/storage/procarray.h --- b/src/include/storage/procarray.h *************** extern void ExpireOldKnownAssignedTransa *** 39,45 **** extern RunningTransactions GetRunningTransactionData(void); ! extern Snapshot GetSnapshotData(Snapshot snapshot); extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); --- 39,45 ---- extern RunningTransactions GetRunningTransactionData(void); ! extern Snapshot GetSnapshotData(Snapshot snapshot, Snapshot stemplate); extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); *************** extern void XidCacheRemoveRunningXids(Tr *** 69,72 **** --- 69,78 ---- int nxids, const TransactionId *xids, TransactionId latestXid); + /* Size of the ProcArray structure itself */ + #define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) + + #define TOTAL_MAX_CACHED_SUBXIDS \ + ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS) + #endif /* PROCARRAY_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index e665a28..a26e2fe 100644 *** a/src/include/utils/snapmgr.h --- b/src/include/utils/snapmgr.h *************** extern TransactionId TransactionXmin; *** 22,27 **** --- 22,29 ---- extern TransactionId RecentXmin; extern TransactionId RecentGlobalXmin; + extern List *exportedSnapshots; + extern Snapshot GetTransactionSnapshot(void); extern Snapshot GetLatestSnapshot(void); extern void SnapshotSetCommandId(CommandId curcid); *************** extern void UpdateActiveSnapshotCommandI *** 32,37 **** --- 34,40 ---- extern void PopActiveSnapshot(void); extern Snapshot GetActiveSnapshot(void); extern bool ActiveSnapshotSet(void); + extern Snapshot CopySnapshotOnto(Snapshot onto, Snapshot snapshot); extern Snapshot RegisterSnapshot(Snapshot snapshot); extern void UnregisterSnapshot(Snapshot snapshot); *************** extern void AtSubCommit_Snapshot(int lev *** 42,45 **** --- 45,53 ---- extern void AtSubAbort_Snapshot(int level); extern void AtEOXact_Snapshot(bool isCommit); + extern Datum pg_export_snapshot(PG_FUNCTION_ARGS); + extern bool ImportSnapshot(char *idstr); + extern void InvalidateExportedSnapshots(void); + extern void DeleteAllExportedSnapshotFiles(void); + #endif /* SNAPMGR_H */