*** doc/src/sgml/catalogs.sgml.orig Mon Jun 13 19:14:47 2005 --- doc/src/sgml/catalogs.sgml Fri Jun 17 18:28:45 2005 *************** *** 3933,3938 **** --- 3933,3943 ---- + pg_prepared_xacts + currently prepared transactions + + + pg_rules rules *************** *** 4167,4174 **** pid integer ! process ID of the server process holding or awaiting this ! lock mode --- 4172,4181 ---- pid integer ! ! Process ID of the server process holding or awaiting this ! lock. Zero if the lock is held by a prepared transaction. ! mode *************** *** 4246,4251 **** --- 4253,4339 ---- procpid column of the pg_stat_activity view to get more information on the session holding or waiting to hold the lock. + + + + + + <structname>pg_prepared_xacts</structname> + + + pg_prepared_xacts + + + + The view pg_prepared_xacts displays + information about transactions that are currently prepared for two-phase + commit (see for details). + + + + pg_prepared_xacts contains one row per prepared + transaction. An entry is removed when the transaction is committed or + rolled back. + + + + <structname>pg_prepared_xacts</> Columns + + + + + Name + Type + References + Description + + + + + transaction + xid + + + Numeric transaction identifier of the prepared transaction + + + + gid + text + + + Global transaction identifier that was assigned to the transaction + + + + owner + name + pg_shadow.usename + + Name of the user that executed the transaction + + + + database + name + pg_database.datname + + Name of the database in which the transaction was executed + + + + +
+ + + When the pg_prepared_xacts view is accessed, the + internal transaction manager data structures are momentarily locked, and + a copy is made for the view to display. This ensures that the + view produces a consistent set of results, while not blocking + normal operations longer than necessary. Nonetheless + there could be some impact on database performance if this view is + read often.
*** doc/src/sgml/ref/allfiles.sgml.orig Sat Aug 21 12:16:04 2004 --- doc/src/sgml/ref/allfiles.sgml Thu Jun 16 15:19:33 2005 *************** *** 30,35 **** --- 30,36 ---- + *************** *** 88,98 **** --- 89,101 ---- + + *** doc/src/sgml/ref/commit_prepared.sgml.orig Thu Jun 16 15:19:31 2005 --- doc/src/sgml/ref/commit_prepared.sgml Fri Jun 17 18:28:41 2005 *************** *** 0 **** --- 1,111 ---- + + + + + COMMIT PREPARED + SQL - Language Statements + + + + COMMIT PREPARED + commit a transaction that was earlier prepared for two-phase commit + + + + COMMIT PREPARED + + + + + COMMIT PREPARED transaction_id + + + + + Description + + + COMMIT PREPARED commits a transaction that is in + prepared state. + + + + + Parameters + + + + transaction_id + + + The transaction identifier of the transaction that is to be + committed. + + + + + + + + Notes + + + To commit a prepared transaction, you must be either the same user that + executed the transaction originally, or a superuser. But you do not + have to be in the same session that executed the transaction. + + + + This command cannot be executed inside a transaction block. The prepared + transaction is committed immediately. + + + + All currently available prepared transactions are listed in the + pg_prepared_xacts system view. + + + + + Examples + + Commit the transaction identified by the transaction + identifier foobar: + + + COMMIT PREPARED 'foobar'; + + + + + + + See Also + + + + + + + + + + *** doc/src/sgml/ref/prepare_transaction.sgml.orig Thu Jun 16 15:19:31 2005 --- doc/src/sgml/ref/prepare_transaction.sgml Fri Jun 17 18:28:41 2005 *************** *** 0 **** --- 1,160 ---- + + + + + PREPARE TRANSACTION + SQL - Language Statements + + + + PREPARE TRANSACTION + prepare the current transaction for two-phase commit + + + + PREPARE TRANSACTION + + + + + PREPARE TRANSACTION transaction_id + + + + + Description + + + PREPARE TRANSACTION prepares the current transaction + for two-phase commit. 
After this command, the transaction is no longer + associated with the current session; instead, its state is fully stored on + disk, and there is a very high probability that it can be committed + successfully, even if a database crash occurs before the commit is + requested. + + + + Once prepared, a transaction can later be committed or rolled + back with COMMIT PREPARED or + ROLLBACK PREPARED, respectively. Those commands + can be issued from any session, not only the one that executed the + original transaction. + + + + From the point of view of the issuing session, PREPARE + TRANSACTION is not unlike a ROLLBACK command: + after executing it, there is no active current transaction, and the + effects of the prepared transaction are no longer visible. (The effects + will become visible again if the transaction is committed.) + + + + If the PREPARE TRANSACTION command fails for any + reason, it becomes a ROLLBACK: the current transaction + is canceled. + + + + + Parameters + + + + transaction_id + + + An arbitrary identifier that later identifies this transaction for + COMMIT PREPARED or ROLLBACK PREPARED. + The identifier must be written as a string literal, and must be + less than 200 bytes long. It must not be the same as the identifier + used for any currently prepared transaction. + + + + + + + + Notes + + + This command must be used inside a transaction block. Use + BEGIN to start one. + + + + It is not currently allowed to PREPARE a transaction that + has executed any operations involving temporary tables or + created any cursors WITH HOLD. Those features are too tightly + tied to the current session to be useful in a transaction to be prepared. + + + + If the transaction modified any run-time parameters with SET, + those effects persist after PREPARE TRANSACTION, and will not + be affected by any later COMMIT PREPARED or + ROLLBACK PREPARED. Thus, in this one respect + PREPARE TRANSACTION acts more like COMMIT than + ROLLBACK.
+ + + + All currently available prepared transactions are listed in the + pg_prepared_xacts system view. + + + + From a performance standpoint, it is unwise to leave transactions in + the prepared state for a long time: this will for instance interfere with + the ability of VACUUM to reclaim storage. Keep in mind also + that the transaction continues to hold whatever locks it held. + The intended + usage of the feature is that a prepared transaction will normally be + committed or rolled back as soon as an external transaction manager + has verified that other databases are also prepared to commit. + + + + + Examples + + Prepare the current transaction for two-phase commit, using + foobar as the transaction identifier: + + + PREPARE TRANSACTION 'foobar'; + + + + + + See Also + + + + + + + + + + *** doc/src/sgml/ref/rollback_prepared.sgml.orig Thu Jun 16 15:19:31 2005 --- doc/src/sgml/ref/rollback_prepared.sgml Fri Jun 17 18:28:42 2005 *************** *** 0 **** --- 1,111 ---- + + + + + ROLLBACK PREPARED + SQL - Language Statements + + + + ROLLBACK PREPARED + cancel a transaction that was earlier prepared for two-phase commit + + + + ROLLBACK PREPARED + + + + + ROLLBACK PREPARED transaction_id + + + + + Description + + + ROLLBACK PREPARED rolls back a transaction that is in + prepared state. + + + + + Parameters + + + + transaction_id + + + The transaction identifier of the transaction that is to be + rolled back. + + + + + + + + Notes + + + To roll back a prepared transaction, you must be either the same user that + executed the transaction originally, or a superuser. But you do not + have to be in the same session that executed the transaction. + + + + This command cannot be executed inside a transaction block. The prepared + transaction is rolled back immediately. + + + + All currently available prepared transactions are listed in the + pg_prepared_xacts system view. 
+ + + + + Examples + + Roll back the transaction identified by the transaction + identifier foobar: + + + ROLLBACK PREPARED 'foobar'; + + + + + + + See Also + + + + + + + + + + *** doc/src/sgml/reference.sgml.orig Sat Aug 21 12:16:03 2004 --- doc/src/sgml/reference.sgml Thu Jun 16 15:19:15 2005 *************** *** 62,67 **** --- 62,68 ---- &cluster; &commentOn; &commit; + &commitPrepared; ©Table; &createAggregate; &createCast; *************** *** 120,130 **** --- 121,133 ---- &move; ¬ify; &prepare; + &prepareTransaction; &reindex; &releaseSavepoint; &reset; &revoke; &rollback; + &rollbackPrepared; &rollbackTo; &savepoint; &select; *** doc/src/sgml/runtime.sgml.orig Tue Jun 14 18:17:23 2005 --- doc/src/sgml/runtime.sgml Fri Jun 17 17:21:30 2005 *************** *** 1113,1118 **** --- 1113,1145 ---- + + max_prepared_transactions (integer) + + max_prepared_transactions configuration parameter + + + + Sets the maximum number of transactions that can be in the + prepared state simultaneously (see ). + Setting this parameter to zero disables the prepared-transaction + feature. + The default is 50. + This option can only be set at server start. + + + + Increasing this parameter may cause PostgreSQL + to request more System V shared + memory than your operating system's default configuration + allows. See for information on how to + adjust those parameters, if necessary. + + + + work_mem (integer) *** src/backend/access/transam/Makefile.orig Thu Apr 28 17:47:10 2005 --- src/backend/access/transam/Makefile Thu Jun 16 15:18:53 2005 *************** *** 12,18 **** top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o all: SUBSYS.o --- 12,18 ---- top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! 
OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o all: SUBSYS.o *** src/backend/access/transam/subtrans.c.orig Thu May 19 17:35:45 2005 --- src/backend/access/transam/subtrans.c Fri Jun 17 14:00:44 2005 *************** *** 222,243 **** /* * This must be called ONCE during postmaster or standalone-backend startup, * after StartupXLOG has initialized ShmemVariableCache->nextXid. */ void ! StartupSUBTRANS(void) { int startPage; /* * Since we don't expect pg_subtrans to be valid across crashes, we ! * initialize the currently-active page to zeroes during startup. * Whenever we advance into a new page, ExtendSUBTRANS will likewise * zero the new page without regard to whatever was previously on * disk. */ LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE); ! startPage = TransactionIdToPage(ShmemVariableCache->nextXid); (void) ZeroSUBTRANSPage(startPage); LWLockRelease(SubtransControlLock); --- 222,257 ---- /* * This must be called ONCE during postmaster or standalone-backend startup, * after StartupXLOG has initialized ShmemVariableCache->nextXid. + * + * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid + * if there are none. */ void ! StartupSUBTRANS(TransactionId oldestActiveXID) { int startPage; + int endPage; /* * Since we don't expect pg_subtrans to be valid across crashes, we ! * initialize the currently-active page(s) to zeroes during startup. * Whenever we advance into a new page, ExtendSUBTRANS will likewise * zero the new page without regard to whatever was previously on * disk. */ LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE); ! startPage = TransactionIdToPage(oldestActiveXID); ! endPage = TransactionIdToPage(ShmemVariableCache->nextXid); ! ! while (startPage != endPage) ! { ! (void) ZeroSUBTRANSPage(startPage); ! startPage++; ! /* must account for XID wraparound: the page after the last one is page 0 */ ! if (startPage > TransactionIdToPage(MaxTransactionId)) ! startPage = 0; !
} (void) ZeroSUBTRANSPage(startPage); LWLockRelease(SubtransControlLock); *** src/backend/access/transam/transam.c.orig Sun Feb 20 16:46:48 2005 --- src/backend/access/transam/transam.c Fri Jun 17 14:37:02 2005 *************** *** 173,178 **** --- 173,186 ---- * recursively. However, if it's older than TransactionXmin, we can't * look at pg_subtrans; instead assume that the parent crashed without * cleaning up its children. + * + * Originally we Assert'ed that the result of SubTransGetParent was + * not zero. However with the introduction of prepared transactions, + * there can be a window just after database startup where we do not + * have complete knowledge in pg_subtrans of the transactions after + * TransactionXmin. StartupSUBTRANS() has ensured that any missing + * information will be zeroed. Since this case should not happen under + * normal conditions, it seems reasonable to emit a WARNING for it. */ if (xidstatus == TRANSACTION_STATUS_SUB_COMMITTED) { *************** *** 181,187 **** if (TransactionIdPrecedes(transactionId, TransactionXmin)) return false; parentXid = SubTransGetParent(transactionId); ! Assert(TransactionIdIsValid(parentXid)); return TransactionIdDidCommit(parentXid); } --- 189,200 ---- if (TransactionIdPrecedes(transactionId, TransactionXmin)) return false; parentXid = SubTransGetParent(transactionId); ! if (!TransactionIdIsValid(parentXid)) ! { ! elog(WARNING, "no pg_subtrans entry for subcommitted XID %u", ! transactionId); ! return false; ! } return TransactionIdDidCommit(parentXid); } *************** *** 224,230 **** if (TransactionIdPrecedes(transactionId, TransactionXmin)) return true; parentXid = SubTransGetParent(transactionId); ! Assert(TransactionIdIsValid(parentXid)); return TransactionIdDidAbort(parentXid); } --- 237,249 ---- if (TransactionIdPrecedes(transactionId, TransactionXmin)) return true; parentXid = SubTransGetParent(transactionId); ! if (!TransactionIdIsValid(parentXid)) ! { !
/* see notes in TransactionIdDidCommit */ ! elog(WARNING, "no pg_subtrans entry for subcommitted XID %u", ! transactionId); ! return true; ! } return TransactionIdDidAbort(parentXid); } *** src/backend/access/transam/twophase.c.orig Thu Jun 16 15:18:52 2005 --- src/backend/access/transam/twophase.c Fri Jun 17 17:23:43 2005 *************** *** 0 **** --- 1,1659 ---- + /*------------------------------------------------------------------------- + * + * twophase.c + * Two-phase commit support functions. + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL$ + * + * NOTES + * Each global transaction is associated with a global transaction + * identifier (GID). The client assigns a GID to a postgres + * transaction with the PREPARE TRANSACTION command. + * + * We keep all active global transactions in a shared memory array. + * When the PREPARE TRANSACTION command is issued, the GID is + * reserved for the transaction in the array. This is done before + * a WAL entry is made, because the reservation checks for duplicate + * GIDs and aborts the transaction if there already is a global + * transaction in prepared state with the same GID. + * + * A global transaction (gxact) also has a dummy PGPROC that is entered + * into the ProcArray array; this is what keeps the XID considered + * running by TransactionIdIsInProgress. It is also convenient as a + * PGPROC to hook the gxact's locks to. + * + * In order to survive crashes and shutdowns, all prepared + * transactions must be stored in permanent storage. This includes + * locking information, pending notifications etc. All that state + * information is written to the per-transaction state file in + * the pg_twophase directory.
+ * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include + #include + #include + #include + /* NOTE(review): the system header names on the four #include lines above + * are missing from this copy of the patch; restore them from upstream. */ + + #include "access/heapam.h" + #include "access/subtrans.h" + #include "access/twophase.h" + #include "access/twophase_rmgr.h" + #include "access/xact.h" + #include "catalog/pg_type.h" + #include "funcapi.h" + #include "miscadmin.h" + #include "storage/fd.h" + #include "storage/proc.h" + #include "storage/procarray.h" + #include "storage/smgr.h" + #include "utils/builtins.h" + #include "pgstat.h" + + + /* + * Directory where Two-phase commit files reside within PGDATA + */ + #define TWOPHASE_DIR "pg_twophase" + + /* + * GUC variable max_prepared_transactions: the number of shared-memory slots + * for prepared transactions (with zero slots, every PREPARE TRANSACTION + * fails for lack of a free slot). Can't be changed after startup. + */ + int max_prepared_xacts = 50; + + /* + * This struct describes one global transaction that is in prepared state + * or attempting to become prepared. + * + * The first component of the struct is a dummy PGPROC that is inserted + * into the global ProcArray so that the transaction appears to still be + * running and holding locks. It must be first because we cast pointers + * to PGPROC and pointers to GlobalTransactionData back and forth. + * + * The lifecycle of a global transaction is: + * + * 1. After checking that the requested GID is not in use, set up an + * entry in the TwoPhaseState->prepXacts array with the correct XID and GID, + * with locking_xid = my own XID and valid = false. + * + * 2. After successfully completing prepare, set valid = true and enter the + * contained PGPROC into the global ProcArray. + * + * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry + * is valid and its locking_xid is no longer active, then store my current + * XID into locking_xid. This prevents concurrent attempts to commit or + * rollback the same prepared xact. + * + * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry + * from the ProcArray and the TwoPhaseState->prepXacts array and return it to + * the freelist.
+ * + * Note that if the preparing transaction fails between steps 1 and 2, the + * entry will remain in prepXacts until recycled. We can detect recyclable + * entries by checking for valid = false and locking_xid no longer active. + * + * typedef struct GlobalTransactionData *GlobalTransaction appears in + * twophase.h + */ + /* Size of the gid buffer; MarkAsPreparing rejects GIDs of GIDSIZE bytes or more */ + #define GIDSIZE 200 + + typedef struct GlobalTransactionData + { + PGPROC proc; /* dummy proc */ + AclId owner; /* ID of user that executed the xact */ + TransactionId locking_xid; /* top-level XID of backend working on xact */ + bool valid; /* TRUE if fully prepared */ + char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ + } GlobalTransactionData; + + /* + * Two Phase Commit shared state. Access to this struct is protected + * by TwoPhaseStateLock. + */ + typedef struct TwoPhaseStateData + { + /* Head of linked list of free GlobalTransactionData structs, chained + * through proc.links.next */ + SHMEM_OFFSET freeGXacts; + + /* Number of used prepXacts entries, including ones not yet marked valid */ + int numPrepXacts; + + /* + * There are max_prepared_xacts items in this array, but C wants a + * fixed-size array.
+ */ + GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */ + } TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */ + + static TwoPhaseStateData *TwoPhaseState; + + + static void RecordTransactionCommitPrepared(TransactionId xid, + int nchildren, + TransactionId *children, + int nrels, + RelFileNode *rels); + static void RecordTransactionAbortPrepared(TransactionId xid, + int nchildren, + TransactionId *children, + int nrels, + RelFileNode *rels); + static void ProcessRecords(char *bufptr, TransactionId xid, + const TwoPhaseCallback callbacks[]); + + + /* + * Initialization of shared memory + * + * TwoPhaseShmemSize returns the bytes needed for the TwoPhaseStateData + * header plus its max_prepared_xacts pointer slots, MAXALIGN'd, followed by + * max_prepared_xacts GlobalTransactionData structs. TwoPhaseShmemInit + * attaches to that space; when IsUnderPostmaster is false it expects a fresh + * struct and threads every GlobalTransactionData onto the free list, + * otherwise it expects the struct to exist already. + */ + int + TwoPhaseShmemSize(void) + { + /* Need the fixed struct, the array of pointers, and the GTD structs */ + return MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + + sizeof(GlobalTransaction) * max_prepared_xacts) + + sizeof(GlobalTransactionData) * max_prepared_xacts; + } + + void + TwoPhaseShmemInit(void) + { + bool found; + + TwoPhaseState = ShmemInitStruct("Prepared Transaction Table", + TwoPhaseShmemSize(), + &found); + if (!IsUnderPostmaster) + { + GlobalTransaction gxacts; + int i; + + Assert(!found); + TwoPhaseState->freeGXacts = INVALID_OFFSET; + TwoPhaseState->numPrepXacts = 0; + + /* + * Initialize the linked list of free GlobalTransactionData structs + */ + gxacts = (GlobalTransaction) + ((char *) TwoPhaseState + + MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + + sizeof(GlobalTransaction) * max_prepared_xacts)); + for (i = 0; i < max_prepared_xacts; i++) + { + gxacts[i].proc.links.next = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]); + } + } + else + Assert(found); + } + + + /* + * MarkAsPreparing + * Reserve the GID for the given transaction. + * + * Internally, this creates a gxact struct and puts it into the active array. + * Entries left behind by failed prepares are recycled first. Raises ERROR + * if gid is GIDSIZE bytes or longer, if an existing entry already uses gid, + * or if no free slot remains. + * NOTE: this is also used when reloading a gxact after a crash; so avoid + * assuming that we can use very much backend context.
+ */ + GlobalTransaction + MarkAsPreparing(TransactionId xid, Oid databaseid, char *gid, AclId owner) + { + GlobalTransaction gxact; + int i; + + if (strlen(gid) >= GIDSIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("global transaction identifier \"%s\" is too long", + gid))); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + /* + * First, find and recycle any gxacts that failed during prepare. + * We do this partly to ensure we don't mistakenly say their GIDs + * are still reserved, and partly so we don't fail on out-of-slots + * unnecessarily. + */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid)) + { + /* It's dead Jim ... remove from the active array */ + TwoPhaseState->numPrepXacts--; + TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; + /* and put it back in the freelist */ + gxact->proc.links.next = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact); + /* Back up index count too, so we don't miss scanning one */ + i--; + } + } + + /* Check for conflicting GID */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + if (strcmp(gxact->gid, gid) == 0) + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("global transaction identifier \"%s\" is already in use", + gid))); + } + } + + /* Get a free gxact from the freelist */ + if (TwoPhaseState->freeGXacts == INVALID_OFFSET) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of prepared transactions reached"), + errhint("Increase max_prepared_transactions (currently %d).", + max_prepared_xacts))); + gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts); + TwoPhaseState->freeGXacts = gxact->proc.links.next; + + /* Initialize it */ + MemSet(&gxact->proc, 0, sizeof(PGPROC)); + SHMQueueElemInit(&(gxact->proc.links)); +
gxact->proc.waitStatus = STATUS_OK; + gxact->proc.xid = xid; + gxact->proc.xmin = InvalidTransactionId; + gxact->proc.pid = 0; + gxact->proc.databaseId = databaseid; + gxact->proc.lwWaiting = false; + gxact->proc.lwExclusive = false; + gxact->proc.lwWaitLink = NULL; + gxact->proc.waitLock = NULL; + gxact->proc.waitProcLock = NULL; + SHMQueueInit(&(gxact->proc.procLocks)); + /* subxid data must be filled later by GXactLoadSubxactData */ + gxact->proc.subxids.overflowed = false; + gxact->proc.subxids.nxids = 0; + + gxact->owner = owner; + gxact->locking_xid = xid; + gxact->valid = false; + strcpy(gxact->gid, gid); + + /* And insert it into the active array */ + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + + LWLockRelease(TwoPhaseStateLock); + + return gxact; + } + + /* + * GXactLoadSubxactData + * + * If the transaction being persisted had any subtransactions, this must + * be called before MarkAsPrepared() to load information into the dummy + * PGPROC. + * + * At most PGPROC_MAX_CACHED_SUBXIDS child XIDs are copied; if there are + * more, subxids.overflowed is set and the remainder is not cached here. + */ + static void + GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, + TransactionId *children) + { + /* We need no extra lock since the GXACT isn't valid yet */ + if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS) + { + gxact->proc.subxids.overflowed = true; + nsubxacts = PGPROC_MAX_CACHED_SUBXIDS; + } + if (nsubxacts > 0) + { + memcpy(gxact->proc.subxids.xids, children, + nsubxacts * sizeof(TransactionId)); + gxact->proc.subxids.nxids = nsubxacts; + } + } + + /* + * MarkAsPrepared + * Mark the GXACT as fully valid, and enter it into the global ProcArray. + */ + void + MarkAsPrepared(GlobalTransaction gxact) + { + /* Lock here may be overkill, but I'm not convinced of that ... */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(!gxact->valid); + gxact->valid = true; + LWLockRelease(TwoPhaseStateLock); + + /* + * Put it into the global ProcArray so TransactionIdIsInProgress considers + * the XID as still running.
+ */ + ProcArrayAdd(&gxact->proc); + } + + /* + * LockGXact + * Locate the prepared transaction and mark it busy for COMMIT PREPARED + * or ROLLBACK PREPARED. + * + * Raises ERROR if no valid entry has this gid, if another still-active + * transaction has it locked, or if user is neither its owner nor a + * superuser. On success the entry's locking_xid is set to our top-level XID. + */ + static GlobalTransaction + LockGXact(char *gid, AclId user) + { + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs */ + if (!gxact->valid) + continue; + if (strcmp(gxact->gid, gid) != 0) + continue; + + /* Found it, but has someone else got it locked? */ + if (TransactionIdIsValid(gxact->locking_xid)) + { + if (TransactionIdIsActive(gxact->locking_xid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepared transaction with gid \"%s\" is busy", + gid))); + /* previous locker is no longer active; clear its stale lock */ + gxact->locking_xid = InvalidTransactionId; + } + + if (user != gxact->owner && !superuser_arg(user)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to finish prepared transaction"), + errhint("Must be superuser or the user that prepared the transaction."))); + + /* OK for me to lock it */ + gxact->locking_xid = GetTopTransactionId(); + + LWLockRelease(TwoPhaseStateLock); + + return gxact; + } + + LWLockRelease(TwoPhaseStateLock); + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("prepared transaction with gid \"%s\" does not exist", + gid))); + + /* NOTREACHED */ + return NULL; + } + + /* + * RemoveGXact + * Remove the prepared transaction from the shared memory array.
+ * + * NB: caller should have already removed it from ProcArray + * + * Raises ERROR if gxact is not in the prepXacts array. + */ + static void + RemoveGXact(GlobalTransaction gxact) + { + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + if (gxact == TwoPhaseState->prepXacts[i]) + { + /* remove from the active array by moving the last entry into slot i */ + TwoPhaseState->numPrepXacts--; + TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; + + /* and put it back in the freelist */ + gxact->proc.links.next = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact); + + LWLockRelease(TwoPhaseStateLock); + + return; + } + } + + LWLockRelease(TwoPhaseStateLock); + + elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); + } + + /* + * Stores in *gxacts an array of all prepared transactions, for the + * user-level function pg_prepared_xact, and returns the number of entries. + * If there are none, *gxacts is set to NULL and 0 is returned. + * + * The returned array and all its elements are copies of internal data + * structures, to minimize the time we need to hold the TwoPhaseStateLock. + * + * WARNING -- we return even those transactions that are not fully prepared + * yet. The caller should filter them out if he doesn't want them. + * + * The returned array is palloc'd.
+ */ + static int + GetPreparedTransactionList(GlobalTransaction *gxacts) + { + GlobalTransaction array; + int num; + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + if (TwoPhaseState->numPrepXacts == 0) + { + LWLockRelease(TwoPhaseStateLock); + + *gxacts = NULL; + return 0; + } + + num = TwoPhaseState->numPrepXacts; + array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num); + *gxacts = array; + for (i = 0; i < num; i++) + memcpy(array + i, TwoPhaseState->prepXacts[i], + sizeof(GlobalTransactionData)); + + LWLockRelease(TwoPhaseStateLock); + + return num; + } + + + /* Working status for pg_prepared_xact */ + typedef struct + { + GlobalTransaction array; /* snapshot from GetPreparedTransactionList */ + int ngxacts; /* number of entries in array */ + int currIdx; /* index of next entry to examine */ + } Working_State; + + /* + * pg_prepared_xact + * Produce a view with one row per prepared transaction. + * + * Entries whose valid flag is still false are skipped. + * + * This function is here so we don't have to export the + * GlobalTransactionData struct definition. + */ + Datum + pg_prepared_xact(PG_FUNCTION_ARGS) + { + FuncCallContext *funcctx; + Working_State *status; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * Switch to memory context appropriate for multiple function + * calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + /* this had better match pg_prepared_xacts view in system_views.sql */ + tupdesc = CreateTemplateTupleDesc(4, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "ownerid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "dbid", + OIDOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect all the 2PC status information that we will format and + * send out as a result set.
+ */ + status = (Working_State *) palloc(sizeof(Working_State)); + funcctx->user_fctx = (void *) status; + + status->ngxacts = GetPreparedTransactionList(&status->array); + status->currIdx = 0; + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + status = (Working_State *) funcctx->user_fctx; + + while (status->array != NULL && status->currIdx < status->ngxacts) + { + GlobalTransaction gxact = &status->array[status->currIdx++]; + Datum values[4]; + bool nulls[4]; + HeapTuple tuple; + Datum result; + + if (!gxact->valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = TransactionIdGetDatum(gxact->proc.xid); + values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid)); + values[2] = Int32GetDatum(gxact->owner); + values[3] = ObjectIdGetDatum(gxact->proc.databaseId); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); + } + + /* + * TwoPhaseGetDummyProc + * Get the PGPROC that represents a prepared transaction specified by XID + * + * Raises ERROR if no prepXacts entry has this XID. + */ + PGPROC * + TwoPhaseGetDummyProc(TransactionId xid) + { + PGPROC *result = NULL; + int i; + + static TransactionId cached_xid = InvalidTransactionId; + static PGPROC *cached_proc = NULL; + + /* + * During a recovery, COMMIT PREPARED, or ROLLBACK PREPARED, we'll be called + * repeatedly for the same XID. We can save work with a simple cache. + * NOTE(review): the cache is never invalidated; this assumes a cached XID's + * gxact slot is not freed and reused while that XID may still be looked + * up -- confirm.
+ */ + if (xid == cached_xid) + return cached_proc; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + if (gxact->proc.xid == xid) + { + result = &gxact->proc; + break; + } + } + + LWLockRelease(TwoPhaseStateLock); + + if (result == NULL) /* should not happen */ + elog(ERROR, "failed to find dummy PGPROC for xid %u", xid); + + cached_xid = xid; + cached_proc = result; + + return result; + } + + /************************************************************************/ + /* State file support */ + /************************************************************************/ + + /* State file path: DataDir/pg_twophase/ followed by the XID as 8 uppercase hex digits */ + #define TwoPhaseFilePath(path, xid) \ + snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid) + + /* + * 2PC state file format: + * + * 1. TwoPhaseFileHeader + * 2. TransactionId[] (subtransactions) + * 3. RelFileNode[] (files to be deleted at commit) + * 4. RelFileNode[] (files to be deleted at abort) + * 5. TwoPhaseRecordOnDisk + * 6. ... + * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID) + * 8. CRC32 + * + * Each segment except the final CRC32 is MAXALIGN'd. + */ + + /* + * Header for a 2PC state file + */ + #define TWOPHASE_MAGIC 0x57F94530 /* format identifier */ + + typedef struct TwoPhaseFileHeader + { + uint32 magic; /* format identifier */ + uint32 total_len; /* actual file length */ + TransactionId xid; /* original transaction XID */ + Oid database; /* OID of database it was in */ + AclId owner; /* user running the transaction */ + int32 nsubxacts; /* number of following subxact XIDs */ + int32 ncommitrels; /* number of delete-on-commit rels */ + int32 nabortrels; /* number of delete-on-abort rels */ + char gid[GIDSIZE]; /* GID for transaction */ + } TwoPhaseFileHeader; + + /* + * Header for each record in a state file + * + * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
+  * The rmgr data will be stored starting on a MAXALIGN boundary.
+  */
+ typedef struct TwoPhaseRecordOnDisk
+ {
+ 	uint32		len;			/* length of rmgr data */
+ 	TwoPhaseRmgrId rmid;		/* resource manager for this record */
+ 	uint16		info;			/* flag bits for use by rmgr */
+ } TwoPhaseRecordOnDisk;
+ 
+ /*
+  * During prepare, the state file is assembled in memory before writing it
+  * to WAL and the actual state file.  We use a chain of XLogRecData blocks
+  * so that we will be able to pass the state file contents directly to
+  * XLogInsert.
+  */
+ static struct xllist
+ {
+ 	XLogRecData *head;			/* first data block in the chain */
+ 	XLogRecData *tail;			/* last block in chain */
+ 	uint32		bytes_free;		/* free bytes left in tail block */
+ 	uint32		total_len;		/* total data bytes in chain */
+ } records;
+ 
+ 
+ /*
+  * Append a block of data to records data structure.
+  *
+  * NB: each block is padded to a MAXALIGN multiple.  This must be
+  * accounted for when the file is later read!  The padding bytes are
+  * zeroed, so that no uninitialized memory is written to WAL or to the
+  * state file (where it would also be folded into the CRC).
+  *
+  * The data is copied, so the caller is free to modify it afterwards.
+  */
+ static void
+ save_state_data(const void *data, uint32 len)
+ {
+ 	uint32		padlen = MAXALIGN(len);
+ 
+ 	/* Start a new chunk if the tail chunk cannot hold the padded data */
+ 	if (padlen > records.bytes_free)
+ 	{
+ 		records.tail->next = palloc0(sizeof(XLogRecData));
+ 		records.tail = records.tail->next;
+ 		records.tail->buffer = InvalidBuffer;
+ 		records.tail->len = 0;
+ 		records.tail->next = NULL;
+ 
+ 		records.bytes_free = Max(padlen, 512);
+ 		records.tail->data = palloc(records.bytes_free);
+ 	}
+ 
+ 	memcpy(((char *) records.tail->data) + records.tail->len, data, len);
+ 	/* Zero the alignment padding; palloc() does not initialize memory */
+ 	if (padlen > len)
+ 		memset(((char *) records.tail->data) + records.tail->len + len,
+ 			   0, padlen - len);
+ 	records.tail->len += padlen;
+ 	records.bytes_free -= padlen;
+ 	records.total_len += padlen;
+ }
+ 
+ /*
+  * Start preparing a state file.
+  *
+  * Initializes data structure and inserts the 2PC file header record.
+  * The header is followed by the subxact XIDs and the commit/abort
+  * pending-delete RelFileNodes, each appended only when non-empty.
+  */
+ void
+ StartPrepare(GlobalTransaction gxact)
+ {
+ 	TransactionId xid = gxact->proc.xid;
+ 	TwoPhaseFileHeader hdr;
+ 	TransactionId *children;
+ 	RelFileNode *commitrels;
+ 	RelFileNode *abortrels;
+ 
+ 	/* Initialize linked list */
+ 	records.head = palloc0(sizeof(XLogRecData));
+ 	records.head->buffer = InvalidBuffer;
+ 	records.head->len = 0;
+ 	records.head->next = NULL;
+ 
+ 	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
+ 	records.head->data = palloc(records.bytes_free);
+ 
+ 	records.tail = records.head;
+ 
+ 	records.total_len = 0;
+ 
+ 	/*
+ 	 * Create header.  Zero it first: struct padding and the part of gid
+ 	 * beyond the terminating NUL would otherwise be stack garbage that ends
+ 	 * up in WAL and in the state file.
+ 	 */
+ 	memset(&hdr, 0, sizeof(hdr));
+ 	hdr.magic = TWOPHASE_MAGIC;
+ 	hdr.total_len = 0;			/* EndPrepare will fill this in */
+ 	hdr.xid = xid;
+ 	hdr.database = MyDatabaseId;
+ 	hdr.owner = GetUserId();
+ 	hdr.nsubxacts = xactGetCommittedChildren(&children);
+ 	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
+ 	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
+ 	StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
+ 
+ 	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
+ 
+ 	/* Add the additional info about subxacts and deletable files */
+ 	if (hdr.nsubxacts > 0)
+ 	{
+ 		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
+ 		/* While we have the child-xact data, stuff it in the gxact too */
+ 		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
+ 		pfree(children);
+ 	}
+ 	if (hdr.ncommitrels > 0)
+ 	{
+ 		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
+ 		pfree(commitrels);
+ 	}
+ 	if (hdr.nabortrels > 0)
+ 	{
+ 		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
+ 		pfree(abortrels);
+ 	}
+ }
+ 
+ /*
+  * Finish preparing state file.
+  *
+  * Calculates CRC and writes state file to WAL and in pg_twophase directory.
+  *
+  * On a failure before the critical section the file is closed, with errno
+  * preserved so that %m reports the real cause, and ERROR is raised.
+  */
+ void
+ EndPrepare(GlobalTransaction gxact)
+ {
+ 	TransactionId xid = gxact->proc.xid;
+ 	TwoPhaseFileHeader *hdr;
+ 	char		path[MAXPGPATH];
+ 	XLogRecData *record;
+ 	XLogRecPtr	recptr;
+ 	pg_crc32	statefile_crc;
+ 	pg_crc32	bogus_crc;
+ 	int			fd;
+ 	int			save_errno;
+ 
+ 	/* Add the end sentinel to the list of 2PC records */
+ 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
+ 						   NULL, 0);
+ 
+ 	/* Go back and fill in total_len in the file header record */
+ 	hdr = (TwoPhaseFileHeader *) records.head->data;
+ 	Assert(hdr->magic == TWOPHASE_MAGIC);
+ 	hdr->total_len = records.total_len + sizeof(pg_crc32);
+ 
+ 	/*
+ 	 * Create the 2PC state file.
+ 	 *
+ 	 * Note: because we use BasicOpenFile(), we are responsible for ensuring
+ 	 * the FD gets closed in any error exit path.  Once we get into the
+ 	 * critical section, though, it doesn't matter since any failure causes
+ 	 * PANIC anyway.
+ 	 */
+ 	TwoPhaseFilePath(path, xid);
+ 
+ 	fd = BasicOpenFile(path,
+ 					   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ 					   S_IRUSR | S_IWUSR);
+ 	if (fd < 0)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not create twophase state file \"%s\": %m",
+ 						path)));
+ 
+ 	/* Write data to file, and calculate CRC as we pass over it */
+ 	INIT_CRC32(statefile_crc);
+ 
+ 	for (record = records.head; record != NULL; record = record->next)
+ 	{
+ 		COMP_CRC32(statefile_crc, record->data, record->len);
+ 		errno = 0;
+ 		if ((write(fd, record->data, record->len)) != record->len)
+ 		{
+ 			/* if write didn't set errno, assume problem is no disk space */
+ 			save_errno = errno ? errno : ENOSPC;
+ 			close(fd);
+ 			errno = save_errno;
+ 			ereport(ERROR,
+ 					(errcode_for_file_access(),
+ 					 errmsg("could not write twophase state file: %m")));
+ 		}
+ 	}
+ 
+ 	FIN_CRC32(statefile_crc);
+ 
+ 	/*
+ 	 * Write a deliberately bogus CRC to the state file, and flush it to disk.
+ 	 * This is to minimize the odds of failure within the critical section
+ 	 * below --- in particular, running out of disk space.
+ 	 *
+ 	 * On most filesystems, write() rather than fsync() detects out-of-space,
+ 	 * so the fsync might be considered optional.  Using it means there
+ 	 * are three fsyncs not two associated with preparing a transaction; is
+ 	 * the risk of an error from fsync high enough to justify that?
+ 	 */
+ 	bogus_crc = ~ statefile_crc;
+ 
+ 	errno = 0;
+ 	if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		save_errno = errno ? errno : ENOSPC;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write twophase state file: %m")));
+ 	}
+ 
+ 	if (pg_fsync(fd) != 0)
+ 	{
+ 		save_errno = errno;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not fsync twophase state file: %m")));
+ 	}
+ 
+ 	/* Back up to prepare for rewriting the CRC */
+ 	if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
+ 	{
+ 		save_errno = errno;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not seek twophase state file: %m")));
+ 	}
+ 
+ 	/*
+ 	 * The state file isn't valid yet, because we haven't written the correct
+ 	 * CRC yet.  Before we do that, insert entry in WAL and flush it to disk.
+ 	 *
+ 	 * Between the time we have written the WAL entry and the time we
+ 	 * flush the correct state file CRC to disk, we have an inconsistency:
+ 	 * the xact is prepared according to WAL but not according to our on-disk
+ 	 * state.  We use a critical section to force a PANIC if we are unable to
+ 	 * complete the flush --- then, WAL replay should repair the
+ 	 * inconsistency.
+ 	 *
+ 	 * We have to lock out checkpoint start here, too; otherwise a checkpoint
+ 	 * starting immediately after the WAL record is inserted could complete
+ 	 * before we've finished flushing, meaning that the WAL record would not
+ 	 * get replayed if a crash follows.
+ 	 */
+ 	START_CRIT_SECTION();
+ 
+ 	LWLockAcquire(CheckpointStartLock, LW_SHARED);
+ 
+ 	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head);
+ 	XLogFlush(recptr);
+ 
+ 	/* If we crash now, we have prepared: WAL replay will fix things */
+ 
+ 	/* write correct CRC, flush, and close file */
+ 	errno = 0;
+ 	if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		save_errno = errno ? errno : ENOSPC;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write twophase state file: %m")));
+ 	}
+ 
+ 	if (pg_fsync(fd) != 0)
+ 	{
+ 		save_errno = errno;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not fsync twophase state file: %m")));
+ 	}
+ 
+ 	if (close(fd) != 0)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not close twophase state file: %m")));
+ 
+ 	LWLockRelease(CheckpointStartLock);
+ 
+ 	END_CRIT_SECTION();
+ 
+ 	records.tail = records.head = NULL;
+ }
+ 
+ /*
+  * Register a 2PC record to be written to state file.
+  *
+  * The record header is appended first; the rmgr payload follows it only
+  * when len > 0.  Both are MAXALIGN-padded by save_state_data().
+  */
+ void
+ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
+ 					   const void *data, uint32 len)
+ {
+ 	TwoPhaseRecordOnDisk record;
+ 
+ 	record.rmid = rmid;
+ 	record.info = info;
+ 	record.len = len;
+ 	save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
+ 	if (len > 0)
+ 		save_state_data(data, len);
+ }
+ 
+ 
+ /*
+  * Read and validate the state file for xid.
+  *
+  * If it looks OK (has a valid magic number and CRC), return the palloc'd
+  * contents of the file.  Otherwise return NULL.  I/O failures are reported
+  * as WARNINGs; structural problems (bad size, magic, or CRC) are not.
+  */
+ static char *
+ ReadTwoPhaseFile(TransactionId xid)
+ {
+ 	char		path[MAXPGPATH];
+ 	char	   *buf;
+ 	TwoPhaseFileHeader *hdr;
+ 	int			fd;
+ 	struct stat stat_buf;
+ 	uint32		crc_offset;
+ 	pg_crc32	calc_crc,
+ 				file_crc;
+ 	int			save_errno;
+ 
+ 	TwoPhaseFilePath(path, xid);
+ 
+ 	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+ 	if (fd < 0)
+ 	{
+ 		ereport(WARNING,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not open twophase state file \"%s\": %m",
+ 						path)));
+ 		return NULL;
+ 	}
+ 
+ 	/*
+ 	 * Check file length.  We can determine a lower bound pretty easily.
+ 	 * We set an upper bound mainly to avoid palloc() failure on a corrupt
+ 	 * file.
+ 	 */
+ 	if (fstat(fd, &stat_buf))
+ 	{
+ 		save_errno = errno;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(WARNING,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not stat twophase state file \"%s\": %m",
+ 						path)));
+ 		return NULL;
+ 	}
+ 
+ 	if (stat_buf.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
+ 							MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
+ 							sizeof(pg_crc32)) ||
+ 		stat_buf.st_size > 10000000)
+ 	{
+ 		close(fd);
+ 		return NULL;
+ 	}
+ 
+ 	/* Everything before the trailing CRC is MAXALIGN-padded */
+ 	crc_offset = stat_buf.st_size - sizeof(pg_crc32);
+ 	if (crc_offset != MAXALIGN(crc_offset))
+ 	{
+ 		close(fd);
+ 		return NULL;
+ 	}
+ 
+ 	/*
+ 	 * OK, slurp in the file.
+ 	 */
+ 	buf = (char *) palloc(stat_buf.st_size);
+ 
+ 	if (read(fd, buf, stat_buf.st_size) != stat_buf.st_size)
+ 	{
+ 		save_errno = errno;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(WARNING,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not read twophase state file \"%s\": %m",
+ 						path)));
+ 		pfree(buf);
+ 		return NULL;
+ 	}
+ 
+ 	close(fd);
+ 
+ 	hdr = (TwoPhaseFileHeader *) buf;
+ 	if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat_buf.st_size)
+ 	{
+ 		pfree(buf);
+ 		return NULL;
+ 	}
+ 
+ 	INIT_CRC32(calc_crc);
+ 	COMP_CRC32(calc_crc, buf, crc_offset);
+ 	FIN_CRC32(calc_crc);
+ 
+ 	file_crc = *((pg_crc32 *) (buf + crc_offset));
+ 
+ 	if (!EQ_CRC32(calc_crc, file_crc))
+ 	{
+ 		pfree(buf);
+ 		return NULL;
+ 	}
+ 
+ 	return buf;
+ }
+ 
+ 
+ /*
+  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
+  *
+  * isCommit selects between the two.  The prepared transaction is counted
+  * in the statistics as a commit or a rollback accordingly.
+  */
+ void
+ FinishPreparedTransaction(char *gid, bool isCommit)
+ {
+ 	GlobalTransaction gxact;
+ 	TransactionId xid;
+ 	char	   *buf;
+ 	char	   *bufptr;
+ 	TwoPhaseFileHeader *hdr;
+ 	TransactionId *children;
+ 	RelFileNode *commitrels;
+ 	RelFileNode *abortrels;
+ 	int			i;
+ 
+ 	/*
+ 	 * Validate the GID, and lock the GXACT to ensure that two backends
+ 	 * do not try to commit the same GID at once.
+ 	 */
+ 	gxact = LockGXact(gid, GetUserId());
+ 	xid = gxact->proc.xid;
+ 
+ 	/*
+ 	 * Read and validate the state file
+ 	 */
+ 	buf = ReadTwoPhaseFile(xid);
+ 	if (buf == NULL)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_DATA_CORRUPTED),
+ 				 errmsg("twophase state file for transaction %u is corrupt",
+ 						xid)));
+ 
+ 	/*
+ 	 * Disassemble the header area
+ 	 */
+ 	hdr = (TwoPhaseFileHeader *) buf;
+ 	Assert(TransactionIdEquals(hdr->xid, xid));
+ 	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+ 	children = (TransactionId *) bufptr;
+ 	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+ 	commitrels = (RelFileNode *) bufptr;
+ 	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+ 	abortrels = (RelFileNode *) bufptr;
+ 	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ 
+ 	/*
+ 	 * The order of operations here is critical: make the XLOG entry for
+ 	 * commit or abort, then mark the transaction committed or aborted in
+ 	 * pg_clog, then remove its PGPROC from the global ProcArray (which
+ 	 * means TransactionIdIsInProgress will stop saying the prepared xact
+ 	 * is in progress), then run the post-commit or post-abort callbacks.
+ 	 * The callbacks will release the locks the transaction held.
+ 	 */
+ 	if (isCommit)
+ 		RecordTransactionCommitPrepared(xid,
+ 										hdr->nsubxacts, children,
+ 										hdr->ncommitrels, commitrels);
+ 	else
+ 		RecordTransactionAbortPrepared(xid,
+ 									   hdr->nsubxacts, children,
+ 									   hdr->nabortrels, abortrels);
+ 
+ 	ProcArrayRemove(&gxact->proc);
+ 
+ 	/*
+ 	 * In case we fail while running the callbacks, mark the gxact invalid
+ 	 * so no one else will try to commit/rollback, and so it can be recycled
+ 	 * properly later.  It is still locked by our XID so it won't go away yet.
+ 	 */
+ 	gxact->valid = false;
+ 
+ 	if (isCommit)
+ 		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
+ 	else
+ 		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
+ 
+ 	/*
+ 	 * We also have to remove any files that were supposed to be dropped.
+ 	 * NB: this code knows that we couldn't be dropping any temp rels ...
+ 	 */
+ 	if (isCommit)
+ 	{
+ 		for (i = 0; i < hdr->ncommitrels; i++)
+ 			smgrdounlink(smgropen(commitrels[i]), false, false);
+ 	}
+ 	else
+ 	{
+ 		for (i = 0; i < hdr->nabortrels; i++)
+ 			smgrdounlink(smgropen(abortrels[i]), false, false);
+ 	}
+ 
+ 	/* Count the prepared xact as committed or rolled back, as appropriate */
+ 	if (isCommit)
+ 		pgstat_count_xact_commit();
+ 	else
+ 		pgstat_count_xact_rollback();
+ 
+ 	/*
+ 	 * And now we can clean up our mess.
+ 	 */
+ 	RemoveTwoPhaseFile(xid, true);
+ 
+ 	RemoveGXact(gxact);
+ 
+ 	pfree(buf);
+ }
+ 
+ /*
+  * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile)
+  * and call the indicated callbacks for each 2PC record.
+  *
+  * bufptr must point at the first TwoPhaseRecordOnDisk; the scan stops at
+  * the TWOPHASE_RM_END_ID sentinel.  A NULL callback slot skips the record.
+  */
+ static void
+ ProcessRecords(char *bufptr, TransactionId xid,
+ 			   const TwoPhaseCallback callbacks[])
+ {
+ 	for (;;)
+ 	{
+ 		TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
+ 
+ 		Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
+ 		if (record->rmid == TWOPHASE_RM_END_ID)
+ 			break;
+ 
+ 		bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
+ 
+ 		if (callbacks[record->rmid] != NULL)
+ 			callbacks[record->rmid](xid, record->info,
+ 									(void *) bufptr, record->len);
+ 
+ 		bufptr += MAXALIGN(record->len);
+ 	}
+ }
+ 
+ /*
+  * Remove the 2PC file for the specified XID.
+  *
+  * If giveWarning is false, do not complain about file-not-present;
+  * this is an expected case during WAL replay.  Any other unlink failure
+  * is always reported as a WARNING.
+  */
+ void
+ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
+ {
+ 	char		path[MAXPGPATH];
+ 
+ 	TwoPhaseFilePath(path, xid);
+ 	if (unlink(path) != 0 && (errno != ENOENT || giveWarning))
+ 		ereport(WARNING,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not remove two-phase state file \"%s\": %m",
+ 						path)));
+ }
+ 
+ /*
+  * Recreates a state file. This is used in WAL replay.
+  *
+  * Note: content and len don't include CRC.
+  *
+  * The CRC is recomputed from content and appended.  On failure the file is
+  * closed with errno preserved, so that %m reports the real cause.
+  */
+ void
+ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+ {
+ 	char		path[MAXPGPATH];
+ 	pg_crc32	statefile_crc;
+ 	int			fd;
+ 	int			save_errno;
+ 
+ 	/* Recompute CRC */
+ 	INIT_CRC32(statefile_crc);
+ 	COMP_CRC32(statefile_crc, content, len);
+ 	FIN_CRC32(statefile_crc);
+ 
+ 	TwoPhaseFilePath(path, xid);
+ 
+ 	fd = BasicOpenFile(path,
+ 					   O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+ 					   S_IRUSR | S_IWUSR);
+ 	if (fd < 0)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not recreate twophase state file \"%s\": %m",
+ 						path)));
+ 
+ 	/* Write content and CRC */
+ 	errno = 0;
+ 	if (write(fd, content, len) != len)
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		save_errno = errno ? errno : ENOSPC;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write twophase state file: %m")));
+ 	}
+ 	errno = 0;
+ 	if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
+ 	{
+ 		/* if write didn't set errno, assume problem is no disk space */
+ 		save_errno = errno ? errno : ENOSPC;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not write twophase state file: %m")));
+ 	}
+ 
+ 	/* Sync and close the file */
+ 	if (pg_fsync(fd) != 0)
+ 	{
+ 		save_errno = errno;
+ 		close(fd);
+ 		errno = save_errno;
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not fsync twophase state file: %m")));
+ 	}
+ 
+ 	if (close(fd) != 0)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not close twophase state file: %m")));
+ }
+ 
+ /*
+  * PrescanPreparedTransactions
+  *
+  * Scan the pg_twophase directory and determine the range of valid XIDs
+  * present.  This is run during database startup, after we have completed
+  * reading WAL.  ShmemVariableCache->nextXid has been set to one more than
+  * the highest XID for which evidence exists in WAL.
+  *
+  * We throw away any prepared xacts with main XID beyond nextXid --- if any
+  * are present, it suggests that the DBA has done a PITR recovery to an
+  * earlier point in time without cleaning out pg_twophase.  We dare not
+  * try to recover such prepared xacts since they likely depend on database
+  * state that doesn't exist now.
+  *
+  * However, we will advance nextXid beyond any subxact XIDs belonging to
+  * valid prepared xacts.
+  * We need to do this since subxact commit doesn't
+  * write a WAL entry, and so there might be no evidence in WAL of those
+  * subxact XIDs.
+  *
+  * Our other responsibility is to determine and return the oldest valid XID
+  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
+  * This is needed to synchronize pg_subtrans startup properly.
+  *
+  * Side effects: future-XID and corrupt state files are removed (with a
+  * WARNING), and ShmemVariableCache->nextXid may be advanced.
+  */
+ TransactionId
+ PrescanPreparedTransactions(void)
+ {
+ 	TransactionId origNextXid = ShmemVariableCache->nextXid;
+ 	TransactionId result = origNextXid;
+ 	char		dir[MAXPGPATH];
+ 	DIR		   *cldir;
+ 	struct dirent *clde;
+ 
+ 	snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+ 
+ 	cldir = AllocateDir(dir);
+ 	if (cldir == NULL)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not open directory \"%s\": %m", dir)));
+ 
+ 	/*
+ 	 * readdir() returns NULL both at end-of-directory and on error; only
+ 	 * errno distinguishes the two.  So errno is cleared before every
+ 	 * readdir() call (here, after each iteration, and before every
+ 	 * "continue"), since the work done inside the loop may set it.
+ 	 */
+ 	errno = 0;
+ 	while ((clde = readdir(cldir)) != NULL)
+ 	{
+ 		/* Only names of exactly 8 uppercase hex digits, as TwoPhaseFilePath writes */
+ 		if (strlen(clde->d_name) == 8 &&
+ 			strspn(clde->d_name, "0123456789ABCDEF") == 8)
+ 		{
+ 			TransactionId xid;
+ 			char	   *buf;
+ 			TwoPhaseFileHeader *hdr;
+ 			TransactionId *subxids;
+ 			int			i;
+ 
+ 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+ 
+ 			/* Reject XID if too new */
+ 			if (TransactionIdFollowsOrEquals(xid, origNextXid))
+ 			{
+ 				ereport(WARNING,
+ 						(errmsg("removing future twophase state file \"%s\"",
+ 								clde->d_name)));
+ 				RemoveTwoPhaseFile(xid, true);
+ 				errno = 0;
+ 				continue;
+ 			}
+ 
+ 			/*
+ 			 * Note: we can't check if already processed because clog
+ 			 * subsystem isn't up yet.
+ 			 */
+ 
+ 			/* Read and validate file */
+ 			buf = ReadTwoPhaseFile(xid);
+ 			if (buf == NULL)
+ 			{
+ 				ereport(WARNING,
+ 					  (errmsg("removing corrupt twophase state file \"%s\"",
+ 							  clde->d_name)));
+ 				RemoveTwoPhaseFile(xid, true);
+ 				errno = 0;
+ 				continue;
+ 			}
+ 
+ 			/* Deconstruct header; the file name and header XID must agree */
+ 			hdr = (TwoPhaseFileHeader *) buf;
+ 			if (!TransactionIdEquals(hdr->xid, xid))
+ 			{
+ 				ereport(WARNING,
+ 					  (errmsg("removing corrupt twophase state file \"%s\"",
+ 							  clde->d_name)));
+ 				RemoveTwoPhaseFile(xid, true);
+ 				pfree(buf);
+ 				errno = 0;
+ 				continue;
+ 			}
+ 
+ 			/*
+ 			 * OK, we think this file is valid.  Incorporate xid into the
+ 			 * running-minimum result.
+ 			 */
+ 			if (TransactionIdPrecedes(xid, result))
+ 				result = xid;
+ 
+ 			/*
+ 			 * Examine subtransaction XIDs ... they should all follow main
+ 			 * XID, and they may force us to advance nextXid.  The subxact
+ 			 * array immediately follows the MAXALIGN'd header.
+ 			 */
+ 			subxids = (TransactionId *)
+ 				(buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
+ 			for (i = 0; i < hdr->nsubxacts; i++)
+ 			{
+ 				TransactionId subxid = subxids[i];
+ 
+ 				Assert(TransactionIdFollows(subxid, xid));
+ 				/* Keep nextXid strictly greater than every subxact XID seen */
+ 				if (TransactionIdFollowsOrEquals(subxid,
+ 												 ShmemVariableCache->nextXid))
+ 				{
+ 					ShmemVariableCache->nextXid = subxid;
+ 					TransactionIdAdvance(ShmemVariableCache->nextXid);
+ 				}
+ 			}
+ 
+ 			pfree(buf);
+ 		}
+ 		errno = 0;
+ 	}
+ #ifdef WIN32
+ 
+ 	/*
+ 	 * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
+ 	 * not in released version
+ 	 */
+ 	if (GetLastError() == ERROR_NO_MORE_FILES)
+ 		errno = 0;
+ #endif
+ 	/* Nonzero errno here means readdir() failed rather than hit the end */
+ 	if (errno)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 			   errmsg("could not read directory \"%s\": %m", dir)));
+ 
+ 	FreeDir(cldir);
+ 
+ 	return result;
+ }
+ 
+ /*
+  * RecoverPreparedTransactions
+  *
+  * Scan the pg_twophase directory and reload shared-memory state for each
+  * prepared transaction (reacquire locks, etc).  This is run during database
+  * startup.
+  *
+  * State files whose XID clog already marks committed or aborted are
+  * removed as stale.  Unreadable or corrupt files are removed with a
+  * WARNING.  For each remaining file we rebuild the pg_subtrans parent
+  * links, recreate the GXACT and its dummy PGPROC, and run the
+  * twophase_recover_callbacks over the file's records.
+  */
+ void
+ RecoverPreparedTransactions(void)
+ {
+ 	char		dir[MAXPGPATH];
+ 	DIR		   *cldir;
+ 	struct dirent *clde;
+ 
+ 	snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+ 
+ 	cldir = AllocateDir(dir);
+ 	if (cldir == NULL)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not open directory \"%s\": %m", dir)));
+ 
+ 	/* errno is cleared before each readdir() so end-of-dir vs error is detectable */
+ 	errno = 0;
+ 	while ((clde = readdir(cldir)) != NULL)
+ 	{
+ 		if (strlen(clde->d_name) == 8 &&
+ 			strspn(clde->d_name, "0123456789ABCDEF") == 8)
+ 		{
+ 			TransactionId xid;
+ 			char	   *buf;
+ 			char	   *bufptr;
+ 			TwoPhaseFileHeader *hdr;
+ 			TransactionId *subxids;
+ 			GlobalTransaction gxact;
+ 			int			i;
+ 
+ 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+ 
+ 			/* Already processed? */
+ 			if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ 			{
+ 				ereport(WARNING,
+ 						(errmsg("removing stale twophase state file \"%s\"",
+ 								clde->d_name)));
+ 				RemoveTwoPhaseFile(xid, true);
+ 				errno = 0;
+ 				continue;
+ 			}
+ 
+ 			/* Read and validate file */
+ 			buf = ReadTwoPhaseFile(xid);
+ 			if (buf == NULL)
+ 			{
+ 				ereport(WARNING,
+ 					  (errmsg("removing corrupt twophase state file \"%s\"",
+ 							  clde->d_name)));
+ 				RemoveTwoPhaseFile(xid, true);
+ 				errno = 0;
+ 				continue;
+ 			}
+ 
+ 			ereport(LOG,
+ 					(errmsg("recovering prepared transaction %u", xid)));
+ 
+ 			/*
+ 			 * Deconstruct header.  NOTE(review): the Assert presumably relies
+ 			 * on PrescanPreparedTransactions having already removed files
+ 			 * whose header XID mismatches the file name -- confirm it always
+ 			 * runs first.
+ 			 */
+ 			hdr = (TwoPhaseFileHeader *) buf;
+ 			Assert(TransactionIdEquals(hdr->xid, xid));
+ 			bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+ 			subxids = (TransactionId *) bufptr;
+ 			bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+ 			bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+ 			bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ 			/* bufptr now points at the first TwoPhaseRecordOnDisk */
+ 
+ 			/*
+ 			 * Reconstruct subtrans state for the transaction --- needed
+ 			 * because pg_subtrans is not preserved over a restart
+ 			 */
+ 			for (i = 0; i < hdr->nsubxacts; i++)
+ 				SubTransSetParent(subxids[i], xid);
+ 
+ 			/*
+ 			 * Recreate its GXACT and dummy PGPROC
+ 			 */
+ 			gxact = MarkAsPreparing(xid, hdr->database, hdr->gid, hdr->owner);
+ 			GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
+ 			MarkAsPrepared(gxact);
+ 
+ 			/*
+ 			 * Recover other state (notably locks) using resource managers
+ 			 */
+ 			ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+ 
+ 			pfree(buf);
+ 		}
+ 		errno = 0;
+ 	}
+ #ifdef WIN32
+ 
+ 	/*
+ 	 * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
+ 	 * not in released version
+ 	 */
+ 	if (GetLastError() == ERROR_NO_MORE_FILES)
+ 		errno = 0;
+ #endif
+ 	if (errno)
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 			   errmsg("could not read directory \"%s\": %m", dir)));
+ 
+ 	FreeDir(cldir);
+ }
+ 
+ /*
+  * RecordTransactionCommitPrepared
+  *
+  * This is basically the same as RecordTransactionCommit: in particular,
+  * we must take the CheckpointStartLock to avoid a race condition.
+  *
+  * We know the transaction made at least one XLOG entry (its PREPARE),
+  * so it is never possible to optimize out the commit record.
+  *
+  * The record is xl_xact_commit_prepared, optionally followed by the rels
+  * to delete and the committed child XIDs.  It is flushed before pg_clog is
+  * updated, and everything happens inside a critical section.
+  */
+ static void
+ RecordTransactionCommitPrepared(TransactionId xid,
+ 								int nchildren,
+ 								TransactionId *children,
+ 								int nrels,
+ 								RelFileNode *rels)
+ {
+ 	XLogRecData rdata[3];
+ 	int			lastrdata = 0;
+ 	xl_xact_commit_prepared xlrec;
+ 	XLogRecPtr	recptr;
+ 
+ 	START_CRIT_SECTION();
+ 
+ 	/* See notes in RecordTransactionCommit */
+ 	LWLockAcquire(CheckpointStartLock, LW_SHARED);
+ 
+ 	/* Emit the XLOG commit record */
+ 	xlrec.xid = xid;
+ 	xlrec.crec.xtime = time(NULL);
+ 	xlrec.crec.nrels = nrels;
+ 	xlrec.crec.nsubxacts = nchildren;
+ 	rdata[0].data = (char *) (&xlrec);
+ 	rdata[0].len = MinSizeOfXactCommitPrepared;
+ 	rdata[0].buffer = InvalidBuffer;
+ 	/* dump rels to delete */
+ 	if (nrels > 0)
+ 	{
+ 		rdata[0].next = &(rdata[1]);
+ 		rdata[1].data = (char *) rels;
+ 		rdata[1].len = nrels * sizeof(RelFileNode);
+ 		rdata[1].buffer = InvalidBuffer;
+ 		lastrdata = 1;
+ 	}
+ 	/* dump committed child Xids */
+ 	if (nchildren > 0)
+ 	{
+ 		rdata[lastrdata].next = &(rdata[2]);
+ 		rdata[2].data = (char *) children;
+ 		rdata[2].len = nchildren * sizeof(TransactionId);
+ 		rdata[2].buffer = InvalidBuffer;
+ 		lastrdata = 2;
+ 	}
+ 	/* terminate the chain at whichever entry was filled in last */
+ 	rdata[lastrdata].next = NULL;
+ 
+ 	recptr = XLogInsert(RM_XACT_ID,
+ 						XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
+ 						rdata);
+ 
+ 	/* we don't currently try to sleep before flush here ... */
+ 
+ 	/* Flush XLOG to disk */
+ 	XLogFlush(recptr);
+ 
+ 	/* Mark the transaction committed in pg_clog */
+ 	TransactionIdCommit(xid);
+ 	/* to avoid race conditions, the parent must commit first */
+ 	TransactionIdCommitTree(nchildren, children);
+ 
+ 	/* Checkpoint is allowed again */
+ 	LWLockRelease(CheckpointStartLock);
+ 
+ 	END_CRIT_SECTION();
+ }
+ 
+ /*
+  * RecordTransactionAbortPrepared
+  *
+  * This is basically the same as RecordTransactionAbort.
+  *
+  * We know the transaction made at least one XLOG entry (its PREPARE),
+  * so it is never possible to optimize out the abort record.
+  *
+  * PANICs if clog already shows the transaction as committed.  Unlike the
+  * commit path, CheckpointStartLock is not taken here.
+  */
+ static void
+ RecordTransactionAbortPrepared(TransactionId xid,
+ 							   int nchildren,
+ 							   TransactionId *children,
+ 							   int nrels,
+ 							   RelFileNode *rels)
+ {
+ 	XLogRecData rdata[3];
+ 	int			lastrdata = 0;
+ 	xl_xact_abort_prepared xlrec;
+ 	XLogRecPtr	recptr;
+ 
+ 	/*
+ 	 * Catch the scenario where we aborted partway through
+ 	 * RecordTransactionCommitPrepared ...
+ 	 */
+ 	if (TransactionIdDidCommit(xid))
+ 		elog(PANIC, "cannot abort transaction %u, it was already committed",
+ 			 xid);
+ 
+ 	START_CRIT_SECTION();
+ 
+ 	/* Emit the XLOG abort record */
+ 	xlrec.xid = xid;
+ 	xlrec.arec.xtime = time(NULL);
+ 	xlrec.arec.nrels = nrels;
+ 	xlrec.arec.nsubxacts = nchildren;
+ 	rdata[0].data = (char *) (&xlrec);
+ 	rdata[0].len = MinSizeOfXactAbortPrepared;
+ 	rdata[0].buffer = InvalidBuffer;
+ 	/* dump rels to delete */
+ 	if (nrels > 0)
+ 	{
+ 		rdata[0].next = &(rdata[1]);
+ 		rdata[1].data = (char *) rels;
+ 		rdata[1].len = nrels * sizeof(RelFileNode);
+ 		rdata[1].buffer = InvalidBuffer;
+ 		lastrdata = 1;
+ 	}
+ 	/* dump child Xids (they are being aborted along with the parent) */
+ 	if (nchildren > 0)
+ 	{
+ 		rdata[lastrdata].next = &(rdata[2]);
+ 		rdata[2].data = (char *) children;
+ 		rdata[2].len = nchildren * sizeof(TransactionId);
+ 		rdata[2].buffer = InvalidBuffer;
+ 		lastrdata = 2;
+ 	}
+ 	/* terminate the chain at whichever entry was filled in last */
+ 	rdata[lastrdata].next = NULL;
+ 
+ 	recptr = XLogInsert(RM_XACT_ID,
+ 						XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
+ 						rdata);
+ 
+ 	/* Always flush, since we're about to remove the 2PC state file */
+ 	XLogFlush(recptr);
+ 
+ 	/*
+ 	 * Mark the transaction aborted in clog.  This is not absolutely
+ 	 * necessary but we may as well do it while we are here.
+ 	 */
+ 	TransactionIdAbort(xid);
+ 	TransactionIdAbortTree(nchildren, children);
+ 
+ 	END_CRIT_SECTION();
+ }
+ 
*** src/backend/access/transam/twophase_rmgr.c.orig	Thu Jun 16 15:18:52 2005
--- src/backend/access/transam/twophase_rmgr.c	Fri Jun 17 14:40:06 2005
***************
*** 0 ****
--- 1,49 ----
+ /*-------------------------------------------------------------------------
+  *
+  * twophase_rmgr.c
+  *	  Two-phase-commit resource managers tables
+  *
+  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/twophase_rmgr.h"
+ #include "commands/async.h"
+ #include "storage/lock.h"
+ #include "utils/flatfiles.h"
+ #include "utils/inval.h"
+ 
+ 
+ const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] = 
+ {
+ 	NULL,							/* END ID */
+ 	lock_twophase_recover,			/* Lock */
+ 	NULL,							/* Inval */
+ 	NULL,							/* flat file update */
+ 	NULL							/* notify/listen */
+ };
+ 
+ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] = 
+ {
+ 	NULL,							/* END ID */
+ 	lock_twophase_postcommit,		/* Lock */
+ 	inval_twophase_postcommit,		/* Inval */
+ 	flatfile_twophase_postcommit,	/* flat file update */
+ 	notify_twophase_postcommit		/* notify/listen */
+ };
+ 
+ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] = 
+ {
+ 	NULL,							/* END ID */
+ 	lock_twophase_postabort,		/* Lock */
+ 	NULL,							/* Inval */
+ 	NULL,							/* flat file update */
+ 	NULL							/* notify/listen */
+ };
*** src/backend/access/transam/xact.c.orig	Mon Jun  6 16:22:57 2005
--- src/backend/access/transam/xact.c	Fri Jun 17 12:37:52 2005
***************
*** 22,27 ****
--- 22,28 ----
  #include "access/multixact.h"
  #include "access/subtrans.h"
+ #include "access/twophase.h"
  #include "access/xact.h"
  #include "catalog/heap.h"
  #include
"catalog/index.h" *************** *** 68,74 **** TRANS_START, TRANS_INPROGRESS, TRANS_COMMIT, ! TRANS_ABORT } TransState; /* --- 69,76 ---- TRANS_START, TRANS_INPROGRESS, TRANS_COMMIT, ! TRANS_ABORT, ! TRANS_PREPARE } TransState; /* *************** *** 90,95 **** --- 92,98 ---- TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */ + TBLOCK_PREPARE, /* live xact, PREPARE received */ /* subtransaction states */ TBLOCK_SUBBEGIN, /* starting a subtransaction */ *************** *** 172,177 **** --- 175,186 ---- static AbsoluteTime xactStartTime; /* integer part */ static int xactStartTimeUsec; /* microsecond part */ + /* + * GID to be used for preparing the current transaction. This is also + * global to a whole transaction, so we don't keep it in the state stack. + */ + static char *prepareGID; + /* * List of add-on start- and end-of-xact callbacks *************** *** 267,276 **** return true; case TRANS_ABORT: return true; } /* ! * Shouldn't get here, but lint is not happy with this... */ return false; } --- 276,287 ---- return true; case TRANS_ABORT: return true; + case TRANS_PREPARE: + return true; } /* ! * Shouldn't get here, but lint is not happy without this... */ return false; } *************** *** 660,671 **** RecordTransactionCommit(void) { int nrels; ! RelFileNode *rptr; int nchildren; TransactionId *children; /* Get data needed for commit record */ ! nrels = smgrGetPendingDeletes(true, &rptr); nchildren = xactGetCommittedChildren(&children); /* --- 671,682 ---- RecordTransactionCommit(void) { int nrels; ! RelFileNode *rels; int nchildren; TransactionId *children; /* Get data needed for commit record */ ! nrels = smgrGetPendingDeletes(true, &rels); nchildren = xactGetCommittedChildren(&children); /* *************** *** 726,732 **** if (nrels > 0) { rdata[0].next = &(rdata[1]); ! 
rdata[1].data = (char *) rptr; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; --- 737,743 ---- if (nrels > 0) { rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; *************** *** 809,820 **** MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; - /* Show myself as out of the transaction in PGPROC array */ - MyProc->logRec.xrecoff = 0; - /* And clean up local data */ ! if (rptr) ! pfree(rptr); if (children) pfree(children); } --- 820,828 ---- MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; /* And clean up local data */ ! if (rels) ! pfree(rels); if (children) pfree(children); } *************** *** 970,981 **** RecordTransactionAbort(void) { int nrels; ! RelFileNode *rptr; int nchildren; TransactionId *children; /* Get data needed for abort record */ ! nrels = smgrGetPendingDeletes(false, &rptr); nchildren = xactGetCommittedChildren(&children); /* --- 978,989 ---- RecordTransactionAbort(void) { int nrels; ! RelFileNode *rels; int nchildren; TransactionId *children; /* Get data needed for abort record */ ! nrels = smgrGetPendingDeletes(false, &rels); nchildren = xactGetCommittedChildren(&children); /* *************** *** 1026,1032 **** if (nrels > 0) { rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rptr; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; --- 1034,1040 ---- if (nrels > 0) { rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; *************** *** 1069,1080 **** MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; - /* Show myself as out of the transaction in PGPROC array */ - MyProc->logRec.xrecoff = 0; - /* And clean up local data */ ! if (rptr) ! 
pfree(rptr); if (children) pfree(children); } --- 1077,1085 ---- MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; /* And clean up local data */ ! if (rels) ! pfree(rels); if (children) pfree(children); } *************** *** 1166,1178 **** RecordSubTransactionAbort(void) { int nrels; ! RelFileNode *rptr; TransactionId xid = GetCurrentTransactionId(); int nchildren; TransactionId *children; /* Get data needed for abort record */ ! nrels = smgrGetPendingDeletes(false, &rptr); nchildren = xactGetCommittedChildren(&children); /* --- 1171,1183 ---- RecordSubTransactionAbort(void) { int nrels; ! RelFileNode *rels; TransactionId xid = GetCurrentTransactionId(); int nchildren; TransactionId *children; /* Get data needed for abort record */ ! nrels = smgrGetPendingDeletes(false, &rels); nchildren = xactGetCommittedChildren(&children); /* *************** *** 1212,1218 **** if (nrels > 0) { rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rptr; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; --- 1217,1223 ---- if (nrels > 0) { rdata[0].next = &(rdata[1]); ! rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; *************** *** 1256,1263 **** XidCacheRemoveRunningXids(xid, nchildren, children); /* And clean up local data */ ! if (rptr) ! pfree(rptr); if (children) pfree(children); } --- 1261,1268 ---- XidCacheRemoveRunningXids(xid, nchildren, children); /* And clean up local data */ ! if (rels) ! pfree(rels); if (children) pfree(children); } *************** *** 1419,1426 **** --- 1424,1434 ---- ShowTransactionState("StartTransaction"); } + /* * CommitTransaction + * + * NB: if you change this routine, better look at PrepareTransaction too! 
*/ static void CommitTransaction(void) *************** *** 1510,1515 **** --- 1518,1525 ---- * xid 0 as running as well, or it will be able to see two tuple versions * - one deleted by xid 1 and one inserted by xid 0. See notes in * GetSnapshotData. + * + * Note: MyProc may be null during bootstrap. *---------- */ if (MyProc != NULL) *************** *** 1608,1613 **** --- 1618,1842 ---- RESUME_INTERRUPTS(); } + + /* + * PrepareTransaction + * + * NB: if you change this routine, better look at CommitTransaction too! + */ + static void + PrepareTransaction(void) + { + TransactionState s = CurrentTransactionState; + TransactionId xid = GetCurrentTransactionId(); + GlobalTransaction gxact; + + ShowTransactionState("PrepareTransaction"); + + /* + * check the current transaction state + */ + if (s->state != TRANS_INPROGRESS) + elog(WARNING, "PrepareTransaction while in %s state", + TransStateAsString(s->state)); + Assert(s->parent == NULL); + + /* + * Do pre-commit processing (most of this stuff requires database + * access, and in fact could still cause an error...) + * + * It is possible for PrepareHoldablePortals to invoke functions that + * queue deferred triggers, and it's also possible that triggers create + * holdable cursors. So we have to loop until there's nothing left to + * do. + */ + for (;;) + { + /* + * Fire all currently pending deferred triggers. + */ + AfterTriggerFireDeferred(); + + /* + * Convert any open holdable cursors into static portals. If there + * weren't any, we are done ... otherwise loop back to check if they + * queued deferred triggers. Lather, rinse, repeat. 
+ */ + if (!PrepareHoldablePortals()) + break; + } + + /* Now we can shut down the deferred-trigger manager */ + AfterTriggerEndXact(true); + + /* Close any open regular cursors */ + AtCommit_Portals(); + + /* + * Let ON COMMIT management do its thing (must happen after closing + * cursors, to avoid dangling-reference problems) + */ + PreCommit_on_commit_actions(); + + /* close large objects before lower-level cleanup */ + AtEOXact_LargeObject(true); + + /* NOTIFY and flatfiles will be handled below */ + + /* Prevent cancel/die interrupt while cleaning up */ + HOLD_INTERRUPTS(); + + /* + * set the current transaction state information appropriately during + * the processing + */ + s->state = TRANS_PREPARE; + + /* Tell bufmgr and smgr to prepare for commit */ + BufmgrCommit(); + + /* + * Reserve the GID for this transaction. This could fail if the + * requested GID is invalid or already in use. + */ + gxact = MarkAsPreparing(xid, MyDatabaseId, prepareGID, GetUserId()); + prepareGID = NULL; + + /* + * Collect data for the 2PC state file. Note that in general, no actual + * state change should happen in the called modules during this step, + * since it's still possible to fail before commit, and in that case we + * want transaction abort to be able to clean up. (In particular, the + * AtPrepare routines may error out if they find cases they cannot + * handle.) State cleanup should happen in the PostPrepare routines + * below. However, some modules can go ahead and clear state here + * because they wouldn't do anything with it during abort anyway. + * + * Note: because the 2PC state file records will be replayed in the same + * order they are made, the order of these calls has to match the order + * in which we want things to happen during COMMIT PREPARED or + * ROLLBACK PREPARED; in particular, pay attention to whether things + * should happen before or after releasing the transaction's locks. 
+ */ + StartPrepare(gxact); + + AtPrepare_Notify(); + AtPrepare_UpdateFlatFiles(); + AtPrepare_Inval(); + AtPrepare_Locks(); + + /* + * Here is where we really truly prepare. + * + * We have to record transaction prepares even if we didn't + * make any updates, because the transaction manager might + * get confused if we lose a global transaction. + */ + EndPrepare(gxact); + + /* + * Mark the prepared transaction as valid. As soon as we mark ourselves + * not running in MyProc below, others can commit/rollback the xact. + * + * NB: a side effect of this is to make a dummy ProcArray entry for the + * prepared XID. This must happen before we clear the XID from MyProc, + * else there is a window where the XID is not running according to + * TransactionIdInProgress, and onlookers would be entitled to assume + * the xact crashed. Instead we have a window where the same XID + * appears twice in ProcArray, which is OK. + */ + MarkAsPrepared(gxact); + + /* + * Now we clean up backend-internal state and release internal + * resources. + */ + + /* Break the chain of back-links in the XLOG records I output */ + MyLastRecPtr.xrecoff = 0; + MyXactMadeXLogEntry = false; + MyXactMadeTempRelUpdate = false; + + /* + * Let others know about no transaction in progress by me. This has + * to be done *after* the prepared transaction has been marked valid, + * else someone may think it is unlocked and recyclable. + */ + + /* Lock ProcArrayLock because that's what GetSnapshotData uses. */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->xid = InvalidTransactionId; + MyProc->xmin = InvalidTransactionId; + + /* Clear the subtransaction-XID cache too while holding the lock */ + MyProc->subxids.nxids = 0; + MyProc->subxids.overflowed = false; + + LWLockRelease(ProcArrayLock); + + /* + * This is all post-transaction cleanup. Note that if an error is raised + * here, it's too late to abort the transaction. This should be just + * noncritical resource releasing. 
See notes in CommitTransaction. + */ + + CallXactCallbacks(XACT_EVENT_PREPARE); + + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_BEFORE_LOCKS, + true, true); + + /* Check we've released all buffer pins */ + AtEOXact_Buffers(true); + + /* notify and flatfiles don't need a postprepare call */ + + PostPrepare_Inval(); + + PostPrepare_smgr(); + + AtEOXact_MultiXact(); + + PostPrepare_Locks(xid); + + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_LOCKS, + true, true); + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_AFTER_LOCKS, + true, true); + + /* PREPARE acts the same as COMMIT as far as GUC is concerned */ + AtEOXact_GUC(true, false); + AtEOXact_SPI(true); + AtEOXact_on_commit_actions(true); + AtEOXact_Namespace(true); + /* smgrcommit already done */ + AtEOXact_Files(); + + CurrentResourceOwner = NULL; + ResourceOwnerDelete(TopTransactionResourceOwner); + s->curTransactionOwner = NULL; + CurTransactionResourceOwner = NULL; + TopTransactionResourceOwner = NULL; + + AtCommit_Memory(); + + s->transactionId = InvalidTransactionId; + s->subTransactionId = InvalidSubTransactionId; + s->nestingLevel = 0; + s->childXids = NIL; + + /* + * done with 1st phase commit processing, set current transaction + * state back to default + */ + s->state = TRANS_DEFAULT; + + RESUME_INTERRUPTS(); + } + + /* * AbortTransaction */ *************** *** 1640,1646 **** /* * check the current transaction state */ ! if (s->state != TRANS_INPROGRESS) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); --- 1869,1875 ---- /* * check the current transaction state */ ! 
if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); *************** *** 1833,1838 **** --- 2062,2068 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(ERROR, "StartTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 1935,1940 **** --- 2165,2179 ---- break; /* + * We are completing a "PREPARE TRANSACTION" command. Do it and + * return to the idle state. + */ + case TBLOCK_PREPARE: + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; + + /* * We were just issued a SAVEPOINT inside a transaction block. * Start a subtransaction. (DefineSavepoint already did * PushTransaction, so as to have someplace to put the *************** *** 1964,1969 **** --- 2203,2214 ---- CommitTransaction(); s->blockState = TBLOCK_DEFAULT; } + else if (s->blockState == TBLOCK_PREPARE) + { + Assert(s->parent == NULL); + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + } else { Assert(s->blockState == TBLOCK_INPROGRESS || *************** *** 2156,2161 **** --- 2401,2417 ---- break; /* + * Here, we failed while trying to PREPARE. Clean up the + * transaction and return to idle state (we do not want to + * stay in the transaction). + */ + case TBLOCK_PREPARE: + AbortTransaction(); + CleanupTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; + + /* * We got an error inside a subtransaction. Abort just the * subtransaction, and go to the persistent SUBABORT state * until we get ROLLBACK. 
*************** *** 2487,2492 **** --- 2743,2749 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "BeginTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2494,2499 **** --- 2751,2807 ---- } /* + * PrepareTransactionBlock + * This executes a PREPARE command. + * + * Since PREPARE may actually do a ROLLBACK, the result indicates what + * happened: TRUE for PREPARE, FALSE for ROLLBACK. + * + * Note that we don't actually do anything here except change blockState. + * The real work will be done in the upcoming PrepareTransaction(). + * We do it this way because it's not convenient to change memory context, + * resource owner, etc while executing inside a Portal. + */ + bool + PrepareTransactionBlock(char *gid) + { + TransactionState s; + bool result; + + /* Set up to commit the current transaction */ + result = EndTransactionBlock(); + + /* If successful, change outer tblock state to PREPARE */ + if (result) + { + s = CurrentTransactionState; + + while (s->parent != NULL) + s = s->parent; + + if (s->blockState == TBLOCK_END) + { + /* Save GID where PrepareTransaction can find it again */ + prepareGID = MemoryContextStrdup(TopTransactionContext, gid); + + s->blockState = TBLOCK_PREPARE; + } + else + { + /* + * ignore case where we are not in a transaction; + * EndTransactionBlock already issued a warning. + */ + Assert(s->blockState == TBLOCK_STARTED); + /* Don't send back a PREPARE result tag... */ + result = false; + } + } + + return result; + } + + /* * EndTransactionBlock * This executes a COMMIT command. 
* *************** *** 2603,2608 **** --- 2911,2917 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2694,2699 **** --- 3003,3009 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2740,2745 **** --- 3050,3056 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "DefineSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2795,2800 **** --- 3106,3112 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "ReleaseSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2892,2897 **** --- 3204,3210 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 2999,3004 **** --- 3312,3318 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "BeginInternalSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 3064,3069 **** --- 3378,3384 ---- case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; *************** *** 3111,3116 **** --- 3426,3432 ---- case TBLOCK_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: + case TBLOCK_PREPARE: /* In 
a transaction, so clean up */ AbortTransaction(); CleanupTransaction(); *************** *** 3202,3207 **** --- 3518,3524 ---- case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBEND: + case TBLOCK_PREPARE: return 'T'; /* in transaction */ case TBLOCK_ABORT: case TBLOCK_SUBABORT: *************** *** 3684,3689 **** --- 4001,4008 ---- return "ABORT END"; case TBLOCK_ABORT_PENDING: return "ABORT PEND"; + case TBLOCK_PREPARE: + return "PREPARE"; case TBLOCK_SUBBEGIN: return "SUB BEGIN"; case TBLOCK_SUBINPROGRESS: *************** *** 3717,3728 **** return "DEFAULT"; case TRANS_START: return "START"; case TRANS_COMMIT: return "COMMIT"; case TRANS_ABORT: return "ABORT"; ! case TRANS_INPROGRESS: ! return "INPROGR"; } return "UNRECOGNIZED"; } --- 4036,4049 ---- return "DEFAULT"; case TRANS_START: return "START"; + case TRANS_INPROGRESS: + return "INPROGR"; case TRANS_COMMIT: return "COMMIT"; case TRANS_ABORT: return "ABORT"; ! case TRANS_PREPARE: ! return "PREPARE"; } return "UNRECOGNIZED"; } *************** *** 3767,3772 **** --- 4088,4163 ---- * XLOG support routines */ + static void + xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid) + { + TransactionId *sub_xids; + TransactionId max_xid; + int i; + + TransactionIdCommit(xid); + + /* Mark committed subtransactions as committed */ + sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + TransactionIdCommitTree(xlrec->nsubxacts, sub_xids); + + /* Make sure nextXid is beyond any XID mentioned in the record */ + max_xid = xid; + for (i = 0; i < xlrec->nsubxacts; i++) + { + if (TransactionIdPrecedes(max_xid, sub_xids[i])) + max_xid = sub_xids[i]; + } + if (TransactionIdFollowsOrEquals(max_xid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = max_xid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } + + /* Make sure files supposed to be dropped are dropped */ + for (i = 0; i < xlrec->nrels; i++) + { + XLogCloseRelation(xlrec->xnodes[i]); + 
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + } + } + + static void + xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) + { + TransactionId *sub_xids; + TransactionId max_xid; + int i; + + TransactionIdAbort(xid); + + /* Mark subtransactions as aborted */ + sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + TransactionIdAbortTree(xlrec->nsubxacts, sub_xids); + + /* Make sure nextXid is beyond any XID mentioned in the record */ + max_xid = xid; + for (i = 0; i < xlrec->nsubxacts; i++) + { + if (TransactionIdPrecedes(max_xid, sub_xids[i])) + max_xid = sub_xids[i]; + } + if (TransactionIdFollowsOrEquals(max_xid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = max_xid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } + + /* Make sure files supposed to be dropped are dropped */ + for (i = 0; i < xlrec->nrels; i++) + { + XLogCloseRelation(xlrec->xnodes[i]); + smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + } + } + void xact_redo(XLogRecPtr lsn, XLogRecord *record) { *************** *** 3775,3912 **** if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); - TransactionId *sub_xids; - TransactionId max_xid; - int i; - - TransactionIdCommit(record->xl_xid); - - /* Mark committed subtransactions as committed */ - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - TransactionIdCommitTree(xlrec->nsubxacts, sub_xids); ! /* Make sure nextXid is beyond any XID mentioned in the record */ ! max_xid = record->xl_xid; ! for (i = 0; i < xlrec->nsubxacts; i++) ! { ! if (TransactionIdPrecedes(max_xid, sub_xids[i])) ! max_xid = sub_xids[i]; ! } ! if (TransactionIdFollowsOrEquals(max_xid, ! ShmemVariableCache->nextXid)) ! { ! ShmemVariableCache->nextXid = max_xid; ! TransactionIdAdvance(ShmemVariableCache->nextXid); ! } ! /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { ! XLogCloseRelation(xlrec->xnodes[i]); ! 
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); } } ! else if (info == XLOG_XACT_ABORT) { ! xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); ! TransactionId *sub_xids; ! TransactionId max_xid; ! int i; ! ! TransactionIdAbort(record->xl_xid); ! ! /* Mark subtransactions as aborted */ ! sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); ! TransactionIdAbortTree(xlrec->nsubxacts, sub_xids); ! /* Make sure nextXid is beyond any XID mentioned in the record */ ! max_xid = record->xl_xid; for (i = 0; i < xlrec->nsubxacts; i++) ! { ! if (TransactionIdPrecedes(max_xid, sub_xids[i])) ! max_xid = sub_xids[i]; ! } ! if (TransactionIdFollowsOrEquals(max_xid, ! ShmemVariableCache->nextXid)) ! { ! ShmemVariableCache->nextXid = max_xid; ! TransactionIdAdvance(ShmemVariableCache->nextXid); ! } ! /* Make sure files supposed to be dropped are dropped */ for (i = 0; i < xlrec->nrels; i++) { ! XLogCloseRelation(xlrec->xnodes[i]); ! smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); } } ! else ! elog(PANIC, "xact_redo: unknown op code %u", info); } void xact_desc(char *buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; - int i; if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) rec; - struct tm *tm = localtime(&xlrec->xtime); - - sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u", - tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, - tm->tm_hour, tm->tm_min, tm->tm_sec); - if (xlrec->nrels > 0) - { - sprintf(buf + strlen(buf), "; rels:"); - for (i = 0; i < xlrec->nrels; i++) - { - RelFileNode rnode = xlrec->xnodes[i]; ! sprintf(buf + strlen(buf), " %u/%u/%u", ! rnode.spcNode, rnode.dbNode, rnode.relNode); ! } ! } ! if (xlrec->nsubxacts > 0) ! { ! TransactionId *xacts = (TransactionId *) ! &xlrec->xnodes[xlrec->nrels]; ! ! sprintf(buf + strlen(buf), "; subxacts:"); ! for (i = 0; i < xlrec->nsubxacts; i++) ! sprintf(buf + strlen(buf), " %u", xacts[i]); ! 
} } else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) rec; - struct tm *tm = localtime(&xlrec->xtime); ! sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u", ! tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, ! tm->tm_hour, tm->tm_min, tm->tm_sec); ! if (xlrec->nrels > 0) ! { ! sprintf(buf + strlen(buf), "; rels:"); ! for (i = 0; i < xlrec->nrels; i++) ! { ! RelFileNode rnode = xlrec->xnodes[i]; ! sprintf(buf + strlen(buf), " %u/%u/%u", ! rnode.spcNode, rnode.dbNode, rnode.relNode); ! } ! } ! if (xlrec->nsubxacts > 0) ! { ! TransactionId *xacts = (TransactionId *) ! &xlrec->xnodes[xlrec->nrels]; ! sprintf(buf + strlen(buf), "; subxacts:"); ! for (i = 0; i < xlrec->nsubxacts; i++) ! sprintf(buf + strlen(buf), " %u", xacts[i]); ! } } else strcat(buf, "UNKNOWN"); --- 4166,4302 ---- if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); ! xact_redo_commit(xlrec, record->xl_xid); ! } ! else if (info == XLOG_XACT_ABORT) ! { ! xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); ! ! xact_redo_abort(xlrec, record->xl_xid); ! } ! else if (info == XLOG_XACT_PREPARE) ! { ! /* the record contents are exactly the 2PC file */ ! RecreateTwoPhaseFile(record->xl_xid, ! XLogRecGetData(record), record->xl_len); ! } ! else if (info == XLOG_XACT_COMMIT_PREPARED) ! { ! xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); ! xact_redo_commit(&xlrec->crec, xlrec->xid); ! RemoveTwoPhaseFile(xlrec->xid, false); ! } ! else if (info == XLOG_XACT_ABORT_PREPARED) ! { ! xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record); ! ! xact_redo_abort(&xlrec->arec, xlrec->xid); ! RemoveTwoPhaseFile(xlrec->xid, false); ! } ! else ! elog(PANIC, "xact_redo: unknown op code %u", info); ! } ! ! static void ! xact_desc_commit(char *buf, xl_xact_commit *xlrec) ! { ! struct tm *tm = localtime(&xlrec->xtime); ! int i; ! ! 
sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u", ! tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, ! tm->tm_hour, tm->tm_min, tm->tm_sec); ! if (xlrec->nrels > 0) ! { ! sprintf(buf + strlen(buf), "; rels:"); for (i = 0; i < xlrec->nrels; i++) { ! RelFileNode rnode = xlrec->xnodes[i]; ! ! sprintf(buf + strlen(buf), " %u/%u/%u", ! rnode.spcNode, rnode.dbNode, rnode.relNode); } } ! if (xlrec->nsubxacts > 0) { ! TransactionId *xacts = (TransactionId *) ! &xlrec->xnodes[xlrec->nrels]; ! sprintf(buf + strlen(buf), "; subxacts:"); for (i = 0; i < xlrec->nsubxacts; i++) ! sprintf(buf + strlen(buf), " %u", xacts[i]); ! } ! } ! ! static void ! xact_desc_abort(char *buf, xl_xact_abort *xlrec) ! { ! struct tm *tm = localtime(&xlrec->xtime); ! int i; ! sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u", ! tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, ! tm->tm_hour, tm->tm_min, tm->tm_sec); ! if (xlrec->nrels > 0) ! { ! sprintf(buf + strlen(buf), "; rels:"); for (i = 0; i < xlrec->nrels; i++) { ! RelFileNode rnode = xlrec->xnodes[i]; ! ! sprintf(buf + strlen(buf), " %u/%u/%u", ! rnode.spcNode, rnode.dbNode, rnode.relNode); } } ! if (xlrec->nsubxacts > 0) ! { ! TransactionId *xacts = (TransactionId *) ! &xlrec->xnodes[xlrec->nrels]; ! ! sprintf(buf + strlen(buf), "; subxacts:"); ! for (i = 0; i < xlrec->nsubxacts; i++) ! sprintf(buf + strlen(buf), " %u", xacts[i]); ! } } void xact_desc(char *buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) rec; ! strcat(buf, "commit: "); ! xact_desc_commit(buf, xlrec); } else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) rec; ! strcat(buf, "abort: "); ! xact_desc_abort(buf, xlrec); ! } ! else if (info == XLOG_XACT_PREPARE) ! { ! strcat(buf, "prepare"); ! } ! else if (info == XLOG_XACT_COMMIT_PREPARED) ! { ! xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec; ! 
sprintf(buf + strlen(buf), "commit %u: ", xlrec->xid); ! xact_desc_commit(buf, &xlrec->crec); ! } ! else if (info == XLOG_XACT_ABORT_PREPARED) ! { ! xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec; ! sprintf(buf + strlen(buf), "abort %u: ", xlrec->xid); ! xact_desc_abort(buf, &xlrec->arec); } else strcat(buf, "UNKNOWN"); *** src/backend/access/transam/xlog.c.orig Fri Jun 10 16:38:50 2005 --- src/backend/access/transam/xlog.c Fri Jun 17 14:54:04 2005 *************** *** 25,30 **** --- 25,31 ---- #include "access/clog.h" #include "access/multixact.h" #include "access/subtrans.h" + #include "access/twophase.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xlog_internal.h" *************** *** 814,831 **** /* Compute record's XLOG location */ INSERT_RECPTR(RecPtr, Insert, curridx); - /* If first XLOG record of transaction, save it in PGPROC array */ - if (MyLastRecPtr.xrecoff == 0 && !no_tran) - { - /* - * We do not acquire ProcArrayLock here because of possible deadlock. - * Anyone who wants to inspect other procs' logRec must acquire - * WALInsertLock, instead. A better solution would be a per-PROC - * spinlock, but no time for that before 7.2 --- tgl 12/19/01. - */ - MyProc->logRec = RecPtr; - } - #ifdef WAL_DEBUG if (XLOG_DEBUG) { --- 815,820 ---- *************** *** 3827,3832 **** --- 3816,3822 ---- BootStrapCLOG(); BootStrapSUBTRANS(); BootStrapMultiXact(); + free(buffer); } *************** *** 4268,4273 **** --- 4258,4264 ---- uint32 endLogSeg; XLogRecord *record; uint32 freespace; + TransactionId oldestActiveXID; CritSectionCount++; *************** *** 4678,4710 **** XLogCtl->Write.curridx = NextBufIdx(0); } ! #ifdef NOT_USED ! /* UNDO */ ! if (InRecovery) ! { ! RecPtr = ReadRecPtr; ! if (XLByteLT(checkPoint.undo, RecPtr)) ! { ! ereport(LOG, ! (errmsg("undo starts at %X/%X", ! RecPtr.xlogid, RecPtr.xrecoff))); ! do ! { ! record = ReadRecord(&RecPtr, PANIC); ! if (TransactionIdIsValid(record->xl_xid) && ! 
!TransactionIdDidCommit(record->xl_xid)) ! RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record); ! RecPtr = record->xl_prev; ! } while (XLByteLE(checkPoint.undo, RecPtr)); ! ereport(LOG, ! (errmsg("undo done at %X/%X", ! ReadRecPtr.xlogid, ReadRecPtr.xrecoff))); ! } ! else ! ereport(LOG, ! (errmsg("undo is not required"))); ! } ! #endif if (InRecovery) { --- 4669,4676 ---- XLogCtl->Write.curridx = NextBufIdx(0); } ! /* Pre-scan prepared transactions to find out the range of XIDs present */ ! oldestActiveXID = PrescanPreparedTransactions(); if (InRecovery) { *************** *** 4767,4775 **** /* Start up the commit log and related stuff, too */ StartupCLOG(); ! StartupSUBTRANS(); StartupMultiXact(); ereport(LOG, (errmsg("database system is ready"))); CritSectionCount--; --- 4733,4744 ---- /* Start up the commit log and related stuff, too */ StartupCLOG(); ! StartupSUBTRANS(oldestActiveXID); StartupMultiXact(); + /* Reload shared-memory state for prepared transactions */ + RecoverPreparedTransactions(); + ereport(LOG, (errmsg("database system is ready"))); CritSectionCount--; *************** *** 5096,5126 **** } /* - * Get UNDO record ptr - this is oldest of PGPROC->logRec values. We - * do this while holding insert lock to ensure that we won't miss any - * about-to-commit transactions (UNDO must include all xacts that have - * commits after REDO point). - * - * XXX temporarily ifdef'd out to avoid three-way deadlock condition: - * GetUndoRecPtr needs to grab ProcArrayLock to ensure that it is looking - * at a stable set of proc records, but grabbing ProcArrayLock while - * holding WALInsertLock is no good. GetNewTransactionId may cause a - * WAL record to be written while holding XidGenLock, and - * GetSnapshotData needs to get XidGenLock while holding ProcArrayLock, - * so there's a risk of deadlock. Need to find a better solution. See - * pgsql-hackers discussion of 17-Dec-01. 
- * - * XXX actually, the whole UNDO code is dead code and unlikely to ever be - * revived, so the lack of a good solution here is not troubling. - */ - #ifdef NOT_USED - checkPoint.undo = GetUndoRecPtr(); - - if (shutdown && checkPoint.undo.xrecoff != 0) - elog(PANIC, "active transaction while database system is shutting down"); - #endif - - /* * Now we can release insert lock and checkpoint start lock, allowing * other xacts to proceed even while we are flushing disk buffers. */ --- 5065,5070 ---- *************** *** 5195,5216 **** /* * Select point at which we can truncate the log, which we base on the * prior checkpoint's earliest info. ! * ! * With UNDO support: oldest item is redo or undo, whichever is older; ! * but watch out for case that undo = 0. ! * ! * Without UNDO support: just use the redo pointer. This allows xlog ! * space to be freed much faster when there are long-running ! * transactions. ! */ ! #ifdef NOT_USED ! if (ControlFile->checkPointCopy.undo.xrecoff != 0 && ! XLByteLT(ControlFile->checkPointCopy.undo, ! ControlFile->checkPointCopy.redo)) ! XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg); ! else ! #endif ! XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); /* * Update the control file. --- 5139,5146 ---- /* * Select point at which we can truncate the log, which we base on the * prior checkpoint's earliest info. ! */ ! XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); /* * Update the control file. 
*** src/backend/catalog/system_views.sql.orig Tue May 17 17:46:09 2005 --- src/backend/catalog/system_views.sql Fri Jun 17 18:19:33 2005 *************** *** 102,107 **** --- 102,140 ---- REVOKE ALL on pg_statistic FROM public; + CREATE VIEW pg_locks AS + SELECT * + FROM pg_lock_status() AS L + (locktype text, database oid, relation oid, page int4, tuple int2, + transaction xid, classid oid, objid oid, objsubid int2, + pid int4, mode text, granted boolean); + + CREATE VIEW pg_prepared_xacts AS + SELECT P.transaction, P.gid, U.usename AS owner, D.datname AS database + FROM pg_prepared_xact() AS P + (transaction xid, gid text, ownerid int4, dbid oid) + LEFT JOIN pg_database D ON P.dbid = D.oid + LEFT JOIN pg_shadow U ON P.ownerid = U.usesysid; + + CREATE VIEW pg_settings AS + SELECT * + FROM pg_show_all_settings() AS A + (name text, setting text, category text, short_desc text, extra_desc text, + context text, vartype text, source text, min_val text, max_val text); + + CREATE RULE pg_settings_u AS + ON UPDATE TO pg_settings + WHERE new.name = old.name DO + SELECT set_config(old.name, new.setting, 'f'); + + CREATE RULE pg_settings_n AS + ON UPDATE TO pg_settings + DO INSTEAD NOTHING; + + GRANT SELECT, UPDATE ON pg_settings TO PUBLIC; + + -- Statistics views + CREATE VIEW pg_stat_all_tables AS SELECT C.oid AS relid, *************** *** 258,284 **** pg_stat_get_db_blocks_hit(D.oid) AS blks_read, pg_stat_get_db_blocks_hit(D.oid) AS blks_hit FROM pg_database D; - - CREATE VIEW pg_locks AS - SELECT * - FROM pg_lock_status() AS L - (locktype text, database oid, relation oid, page int4, tuple int2, - transaction xid, classid oid, objid oid, objsubid int2, - pid int4, mode text, granted boolean); - - CREATE VIEW pg_settings AS - SELECT * - FROM pg_show_all_settings() AS A - (name text, setting text, category text, short_desc text, extra_desc text, - context text, vartype text, source text, min_val text, max_val text); - - CREATE RULE pg_settings_u AS - ON UPDATE TO pg_settings 
- WHERE new.name = old.name DO - SELECT set_config(old.name, new.setting, 'f'); - - CREATE RULE pg_settings_n AS - ON UPDATE TO pg_settings - DO INSTEAD NOTHING; - - GRANT SELECT, UPDATE ON pg_settings TO PUBLIC; --- 291,293 ---- *** src/backend/commands/async.c.orig Thu May 19 15:34:48 2005 --- src/backend/commands/async.c Fri Jun 17 16:11:23 2005 *************** *** 78,83 **** --- 78,84 ---- #include #include "access/heapam.h" + #include "access/twophase_rmgr.h" #include "catalog/pg_listener.h" #include "commands/async.h" #include "libpq/libpq.h" *************** *** 407,412 **** --- 408,443 ---- CommitTransactionCommand(); } + + /* + *-------------------------------------------------------------- + * AtPrepare_Notify + * + * This is called at the prepare phase of a two-phase + * transaction. Save the state for possible commit later. + *-------------------------------------------------------------- + */ + void + AtPrepare_Notify(void) + { + ListCell *p; + + foreach(p, pendingNotifies) + { + const char *relname = (const char *) lfirst(p); + + RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, + relname, strlen(relname) + 1); + } + + /* + * We can clear the state immediately, rather than needing a separate + * PostPrepare call, because if the transaction fails we'd just + * discard the state anyway. + */ + ClearPendingNotifies(); + } + /* *-------------------------------------------------------------- * AtCommit_Notify *************** *** 1016,1023 **** foreach(p, pendingNotifies) { ! /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */ ! if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0) return true; } --- 1047,1055 ---- foreach(p, pendingNotifies) { ! const char *prelname = (const char *) lfirst(p); ! ! if (strcmp(prelname, relname) == 0) return true; } *************** *** 1036,1039 **** --- 1068,1090 ---- * list head pointer. */ pendingNotifies = NIL; + } + + /* + * 2PC processing routine for COMMIT PREPARED case. 
+ * + * (We don't have to do anything for ROLLBACK PREPARED.) + */ + void + notify_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len) + { + /* + * Set up to issue the NOTIFY at the end of my own + * current transaction. (XXX this has some issues if my own + * transaction later rolls back, or if there is any significant + * delay before I commit. OK for now because we disallow + * COMMIT PREPARED inside a transaction block.) + */ + Async_Notify((char *) recdata); } *** src/backend/nodes/copyfuncs.c.orig Thu Jun 9 00:18:58 2005 --- src/backend/nodes/copyfuncs.c Thu Jun 16 15:18:21 2005 *************** *** 2085,2090 **** --- 2085,2091 ---- COPY_SCALAR_FIELD(kind); COPY_NODE_FIELD(options); + COPY_STRING_FIELD(gid); return newnode; } *** src/backend/nodes/equalfuncs.c.orig Thu Jun 9 00:18:58 2005 --- src/backend/nodes/equalfuncs.c Thu Jun 16 15:18:22 2005 *************** *** 1053,1058 **** --- 1053,1059 ---- { COMPARE_SCALAR_FIELD(kind); COMPARE_NODE_FIELD(options); + COMPARE_STRING_FIELD(gid); return true; } *** src/backend/parser/gram.y.orig Wed Jun 8 17:15:28 2005 --- src/backend/parser/gram.y Fri Jun 17 15:10:25 2005 *************** *** 387,393 **** ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER PARTIAL PASSWORD PLACING POSITION ! PRECISION PRESERVE PREPARE PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE --- 387,393 ---- ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER PARTIAL PASSWORD PLACING POSITION ! 
PRECISION PRESERVE PREPARE PREPARED PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE *************** *** 4121,4126 **** --- 4121,4147 ---- (Node *)makeString($4))); $$ = (Node *)n; } + | PREPARE TRANSACTION Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_PREPARE; + n->gid = $3; + $$ = (Node *)n; + } + | COMMIT PREPARED Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_COMMIT_PREPARED; + n->gid = $3; + $$ = (Node *)n; + } + | ROLLBACK PREPARED Sconst + { + TransactionStmt *n = makeNode(TransactionStmt); + n->kind = TRANS_STMT_ROLLBACK_PREPARED; + n->gid = $3; + $$ = (Node *)n; + } ; opt_transaction: WORK {} *************** *** 7855,7860 **** --- 7872,7878 ---- | PARTIAL | PASSWORD | PREPARE + | PREPARED | PRESERVE | PRIOR | PRIVILEGES *** src/backend/parser/keywords.c.orig Wed May 11 12:13:51 2005 --- src/backend/parser/keywords.c Fri Jun 17 14:54:06 2005 *************** *** 243,248 **** --- 243,249 ---- {"position", POSITION}, {"precision", PRECISION}, {"prepare", PREPARE}, + {"prepared", PREPARED}, {"preserve", PRESERVE}, {"primary", PRIMARY}, {"prior", PRIOR}, *** src/backend/postmaster/postmaster.c.orig Tue Jun 14 18:17:30 2005 --- src/backend/postmaster/postmaster.c Thu Jun 16 15:18:10 2005 *************** *** 252,258 **** static void pmdaemonize(void); static Port *ConnCreate(int serverFd); static void ConnFree(Port *port); ! static void reset_shared(unsigned short port); static void SIGHUP_handler(SIGNAL_ARGS); static void pmdie(SIGNAL_ARGS); static void reaper(SIGNAL_ARGS); --- 252,258 ---- static void pmdaemonize(void); static Port *ConnCreate(int serverFd); static void ConnFree(Port *port); ! static void reset_shared(int port); static void SIGHUP_handler(SIGNAL_ARGS); static void pmdie(SIGNAL_ARGS); static void reaper(SIGNAL_ARGS); *************** *** 1783,1789 **** * reset_shared -- reset shared memory and semaphores */ static void ! 
reset_shared(unsigned short port) { /* * Create or re-create shared memory and semaphores. --- 1783,1789 ---- * reset_shared -- reset shared memory and semaphores */ static void ! reset_shared(int port) { /* * Create or re-create shared memory and semaphores. *************** *** 1793,1799 **** * used to determine IPC keys. This helps ensure that we will clean * up dead IPC objects if the postmaster crashes and is restarted. */ ! CreateSharedMemoryAndSemaphores(false, MaxBackends, port); } --- 1793,1799 ---- * used to determine IPC keys. This helps ensure that we will clean * up dead IPC objects if the postmaster crashes and is restarted. */ ! CreateSharedMemoryAndSemaphores(false, port); } *************** *** 3182,3188 **** /* BackendRun will close sockets */ /* Attach process to shared data structures */ ! CreateSharedMemoryAndSemaphores(false, MaxBackends, 0); #ifdef USE_SSL /* --- 3182,3188 ---- /* BackendRun will close sockets */ /* Attach process to shared data structures */ ! CreateSharedMemoryAndSemaphores(false, 0); #ifdef USE_SSL /* *************** *** 3203,3209 **** ClosePostmasterPorts(false); /* Attach process to shared data structures */ ! CreateSharedMemoryAndSemaphores(false, MaxBackends, 0); BootstrapMain(argc - 2, argv + 2); proc_exit(0); --- 3203,3209 ---- ClosePostmasterPorts(false); /* Attach process to shared data structures */ ! CreateSharedMemoryAndSemaphores(false, 0); BootstrapMain(argc - 2, argv + 2); proc_exit(0); *** src/backend/storage/ipc/ipci.c.orig Thu May 19 17:35:46 2005 --- src/backend/storage/ipc/ipci.c Thu Jun 16 15:18:03 2005 *************** *** 17,22 **** --- 17,23 ---- #include "access/clog.h" #include "access/multixact.h" #include "access/subtrans.h" + #include "access/twophase.h" #include "access/xlog.h" #include "miscadmin.h" #include "postmaster/bgwriter.h" *************** *** 54,62 **** * memory. This is true for a standalone backend, false for a postmaster. */ void ! CreateSharedMemoryAndSemaphores(bool makePrivate, ! 
int maxBackends, ! int port) { PGShmemHeader *seghdr = NULL; --- 55,61 ---- * memory. This is true for a standalone backend, false for a postmaster. */ void ! CreateSharedMemoryAndSemaphores(bool makePrivate, int port) { PGShmemHeader *seghdr = NULL; *************** *** 72,86 **** */ size = hash_estimate_size(SHMEM_INDEX_SIZE, sizeof(ShmemIndexEnt)); size += BufferShmemSize(); ! size += LockShmemSize(maxBackends); ! size += ProcGlobalShmemSize(maxBackends); size += XLOGShmemSize(); size += CLOGShmemSize(); size += SUBTRANSShmemSize(); size += MultiXactShmemSize(); size += LWLockShmemSize(); ! size += ProcArrayShmemSize(maxBackends); ! size += SInvalShmemSize(maxBackends); size += FreeSpaceShmemSize(); size += BgWriterShmemSize(); #ifdef EXEC_BACKEND --- 71,86 ---- */ size = hash_estimate_size(SHMEM_INDEX_SIZE, sizeof(ShmemIndexEnt)); size += BufferShmemSize(); ! size += LockShmemSize(); ! size += ProcGlobalShmemSize(); size += XLOGShmemSize(); size += CLOGShmemSize(); size += SUBTRANSShmemSize(); + size += TwoPhaseShmemSize(); size += MultiXactShmemSize(); size += LWLockShmemSize(); ! size += ProcArrayShmemSize(); ! size += SInvalShmemSize(MaxBackends); size += FreeSpaceShmemSize(); size += BgWriterShmemSize(); #ifdef EXEC_BACKEND *************** *** 100,106 **** /* * Create semaphores */ ! numSemas = ProcGlobalSemas(maxBackends); numSemas += SpinlockSemas(); PGReserveSemaphores(numSemas, port); } --- 100,106 ---- /* * Create semaphores */ ! numSemas = ProcGlobalSemas(); numSemas += SpinlockSemas(); PGReserveSemaphores(numSemas, port); } *************** *** 144,149 **** --- 144,150 ---- XLOGShmemInit(); CLOGShmemInit(); SUBTRANSShmemInit(); + TwoPhaseShmemInit(); MultiXactShmemInit(); InitBufferPool(); *************** *** 151,168 **** * Set up lock manager */ InitLocks(); ! InitLockTable(maxBackends); /* * Set up process table */ ! InitProcGlobal(maxBackends); ! CreateSharedProcArray(maxBackends); /* * Set up shared-inval messaging */ ! 
CreateSharedInvalidationState(maxBackends); /* * Set up free-space map --- 152,169 ---- * Set up lock manager */ InitLocks(); ! InitLockTable(); /* * Set up process table */ ! InitProcGlobal(); ! CreateSharedProcArray(); /* * Set up shared-inval messaging */ ! CreateSharedInvalidationState(MaxBackends); /* * Set up free-space map *** src/backend/storage/ipc/procarray.c.orig Thu May 19 19:57:11 2005 --- src/backend/storage/ipc/procarray.c Fri Jun 17 15:51:11 2005 *************** *** 11,16 **** --- 11,21 ---- * Because of various subtle race conditions it is critical that a backend * hold the correct locks while setting or clearing its MyProc->xid field. * See notes in GetSnapshotData. + * + * The process array now also includes PGPROC structures representing + * prepared transactions. The xid and subxids fields of these are valid, + * as is the procLocks list. They can be distinguished from regular backend + * PGPROCs at need by checking for pid == 0. * * * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group *************** *** 25,30 **** --- 30,36 ---- #include "postgres.h" #include "access/subtrans.h" + #include "access/twophase.h" #include "miscadmin.h" #include "storage/proc.h" #include "storage/procarray.h" *************** *** 76,100 **** * Report shared-memory space needed by CreateSharedProcArray. */ int ! ProcArrayShmemSize(int maxBackends) { ! /* sizeof(ProcArrayStruct) includes the first array element */ ! return MAXALIGN(sizeof(ProcArrayStruct) + ! (maxBackends - 1) * sizeof(PGPROC *)); } /* * Initialize the shared PGPROC array during postmaster startup. */ void ! CreateSharedProcArray(int maxBackends) { bool found; /* Create or attach to the ProcArray shared structure */ procArray = (ProcArrayStruct *) ! ShmemInitStruct("Proc Array", ProcArrayShmemSize(maxBackends), ! &found); if (!found) { --- 82,104 ---- * Report shared-memory space needed by CreateSharedProcArray. */ int ! ProcArrayShmemSize(void) { ! 
return MAXALIGN(offsetof(ProcArrayStruct, procs) + ! (MaxBackends + max_prepared_xacts) * sizeof(PGPROC *)); } /* * Initialize the shared PGPROC array during postmaster startup. */ void ! CreateSharedProcArray(void) { bool found; /* Create or attach to the ProcArray shared structure */ procArray = (ProcArrayStruct *) ! ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found); if (!found) { *************** *** 102,119 **** * We're the first - initialize. */ procArray->numProcs = 0; ! procArray->maxProcs = maxBackends; } } /* ! * Add my own PGPROC (found in the global MyProc) to the shared array. ! * ! * This must be called during backend startup, after fully initializing ! * the contents of MyProc. */ void ! ProcArrayAddMyself(void) { ProcArrayStruct *arrayP = procArray; --- 106,120 ---- * We're the first - initialize. */ procArray->numProcs = 0; ! procArray->maxProcs = MaxBackends + max_prepared_xacts; } } /* ! * Add the specified PGPROC to the shared array. */ void ! ProcArrayAdd(PGPROC *proc) { ProcArrayStruct *arrayP = procArray; *************** *** 132,163 **** errmsg("sorry, too many clients already"))); } ! arrayP->procs[arrayP->numProcs] = MyProc; arrayP->numProcs++; LWLockRelease(ProcArrayLock); } /* ! * Remove my own PGPROC (found in the global MyProc) from the shared array. ! * ! * This must be called during backend shutdown. */ void ! ProcArrayRemoveMyself(void) { ProcArrayStruct *arrayP = procArray; int index; #ifdef XIDCACHE_DEBUG ! DisplayXidCache(); #endif LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); for (index = 0; index < arrayP->numProcs; index++) { ! if (arrayP->procs[index] == MyProc) { arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1]; arrayP->numProcs--; --- 133,164 ---- errmsg("sorry, too many clients already"))); } ! arrayP->procs[arrayP->numProcs] = proc; arrayP->numProcs++; LWLockRelease(ProcArrayLock); } /* ! * Remove the specified PGPROC from the shared array. */ void ! 
ProcArrayRemove(PGPROC *proc) { ProcArrayStruct *arrayP = procArray; int index; #ifdef XIDCACHE_DEBUG ! /* dump stats at backend shutdown, but not prepared-xact end */ ! if (proc->pid != 0) ! DisplayXidCache(); #endif LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); for (index = 0; index < arrayP->numProcs; index++) { ! if (arrayP->procs[index] == proc) { arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1]; arrayP->numProcs--; *************** *** 169,175 **** /* Ooops */ LWLockRelease(ProcArrayLock); ! elog(LOG, "failed to find my own proc %p in ProcArray", MyProc); } --- 170,176 ---- /* Ooops */ LWLockRelease(ProcArrayLock); ! elog(LOG, "failed to find proc %p in ProcArray", proc); } *************** *** 330,335 **** --- 331,385 ---- } /* + * TransactionIdIsActive -- is xid the top-level XID of an active backend? + * + * This differs from TransactionIdIsInProgress in that it ignores prepared + * transactions. Also, we ignore subtransactions since that's not needed + * for current uses. + */ + bool + TransactionIdIsActive(TransactionId xid) + { + bool result = false; + ProcArrayStruct *arrayP = procArray; + int i; + + /* + * Don't bother checking a transaction older than RecentXmin; it + * could not possibly still be running. + */ + if (TransactionIdPrecedes(xid, RecentXmin)) + return false; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (i = 0; i < arrayP->numProcs; i++) + { + PGPROC *proc = arrayP->procs[i]; + + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId pxid = proc->xid; + + if (!TransactionIdIsValid(pxid)) + continue; + + if (proc->pid == 0) + continue; /* ignore prepared transactions */ + + if (TransactionIdEquals(pxid, xid)) + { + result = true; + break; + } + } + + LWLockRelease(ProcArrayLock); + + return result; + } + + + /* * GetOldestXmin -- returns oldest transaction that was running * when any current transaction was started. * *************** *** 441,452 **** TransactionIdIsValid(MyProc->xmin)); /* ! 
* Allocating space for MaxBackends xids is usually overkill; * numProcs would be sufficient. But it seems better to do the * malloc while not holding the lock, so we can't look at numProcs. * * This does open a possibility for avoiding repeated malloc/free: since ! * MaxBackends does not change at runtime, we can simply reuse the * previous xip array if any. (This relies on the fact that all * callers pass static SnapshotData structs.) */ --- 491,502 ---- TransactionIdIsValid(MyProc->xmin)); /* ! * Allocating space for maxProcs xids is usually overkill; * numProcs would be sufficient. But it seems better to do the * malloc while not holding the lock, so we can't look at numProcs. * * This does open a possibility for avoiding repeated malloc/free: since ! * maxProcs does not change at runtime, we can simply reuse the * previous xip array if any. (This relies on the fact that all * callers pass static SnapshotData structs.) */ *************** *** 456,462 **** * First call for this snapshot */ snapshot->xip = (TransactionId *) ! malloc(MaxBackends * sizeof(TransactionId)); if (snapshot->xip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), --- 506,512 ---- * First call for this snapshot */ snapshot->xip = (TransactionId *) ! malloc(arrayP->maxProcs * sizeof(TransactionId)); if (snapshot->xip == NULL) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), *************** *** 602,615 **** /* * BackendPidGetProc -- get a backend's PGPROC given its PID */ ! struct PGPROC * BackendPidGetProc(int pid) { PGPROC *result = NULL; ProcArrayStruct *arrayP = procArray; int index; LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) --- 652,672 ---- /* * BackendPidGetProc -- get a backend's PGPROC given its PID + * + * Returns NULL if not found. Note that it is up to the caller to be + * sure that the question remains meaningful for long enough for the + * answer to be used ... */ ! 
PGPROC * BackendPidGetProc(int pid) { PGPROC *result = NULL; ProcArrayStruct *arrayP = procArray; int index; + if (pid == 0) /* never match dummy PGPROCs */ + return NULL; + LWLockAcquire(ProcArrayLock, LW_SHARED); for (index = 0; index < arrayP->numProcs; index++) *************** *** 642,651 **** * active transactions. This is used as a heuristic to decide if * a pre-XLOG-flush delay is worthwhile during commit. * ! * An active transaction is something that has written at least one XLOG ! * record; read-only transactions don't count. Also, do not count backends ! * that are blocked waiting for locks, since they are not going to get to ! * run until someone else commits. */ int CountActiveBackends(void) --- 699,706 ---- * active transactions. This is used as a heuristic to decide if * a pre-XLOG-flush delay is worthwhile during commit. * ! * Do not count backends that are blocked waiting for locks, since they are ! * not going to get to run until someone else commits. */ int CountActiveBackends(void) *************** *** 656,662 **** /* * Note: for speed, we don't acquire ProcArrayLock. This is a little bit ! * bogus, but since we are only testing xrecoff for zero or nonzero, * it should be OK. The result is only used for heuristic purposes * anyway... */ --- 711,717 ---- /* * Note: for speed, we don't acquire ProcArrayLock. This is a little bit ! * bogus, but since we are only testing fields for zero or nonzero, * it should be OK. The result is only used for heuristic purposes * anyway... */ *************** *** 666,672 **** if (proc == MyProc) continue; /* do not count myself */ ! if (proc->logRec.xrecoff == 0) continue; /* do not count if not in a transaction */ if (proc->waitLock != NULL) continue; /* do not count if blocked on a lock */ --- 721,729 ---- if (proc == MyProc) continue; /* do not count myself */ ! if (proc->pid == 0) ! continue; /* do not count prepared xacts */ ! 
if (proc->xid == InvalidTransactionId) continue; /* do not count if not in a transaction */ if (proc->waitLock != NULL) continue; /* do not count if blocked on a lock */ *************** *** 676,700 **** return count; } - /* - * CountEmptyBackendSlots - count empty slots in backend process table - * - * Acquiring the lock here is almost certainly overkill, but just in - * case fetching an int is not atomic on your machine ... - */ - int - CountEmptyBackendSlots(void) - { - int count; - - LWLockAcquire(ProcArrayLock, LW_SHARED); - - count = procArray->maxProcs - procArray->numProcs; - - LWLockRelease(ProcArrayLock); - - return count; - } #define XidCacheRemove(i) \ do { \ --- 733,738 ---- *** src/backend/storage/lmgr/lmgr.c.orig Tue Jun 14 18:15:32 2005 --- src/backend/storage/lmgr/lmgr.c Thu Jun 16 15:17:57 2005 *************** *** 77,83 **** * Create the lock table described by LockConflicts */ void ! InitLockTable(int maxBackends) { LOCKMETHODID LongTermTableId; --- 77,83 ---- * Create the lock table described by LockConflicts */ void ! InitLockTable(void) { LOCKMETHODID LongTermTableId; *************** *** 91,98 **** /* number of lock modes is lengthof()-1 because of dummy zero */ LockTableId = LockMethodTableInit("LockTable", LockConflicts, ! lengthof(LockConflicts) - 1, ! maxBackends); if (!LockMethodIsValid(LockTableId)) elog(ERROR, "could not initialize lock table"); Assert(LockTableId == DEFAULT_LOCKMETHOD); --- 91,97 ---- /* number of lock modes is lengthof()-1 because of dummy zero */ LockTableId = LockMethodTableInit("LockTable", LockConflicts, ! 
lengthof(LockConflicts) - 1); if (!LockMethodIsValid(LockTableId)) elog(ERROR, "could not initialize lock table"); Assert(LockTableId == DEFAULT_LOCKMETHOD); *** src/backend/storage/lmgr/lock.c.orig Tue Jun 14 18:15:32 2005 --- src/backend/storage/lmgr/lock.c Fri Jun 17 16:11:42 2005 *************** *** 33,38 **** --- 33,40 ---- #include #include + #include "access/twophase.h" + #include "access/twophase_rmgr.h" #include "access/xact.h" #include "miscadmin.h" #include "storage/proc.h" *************** *** 44,50 **** /* This configuration variable is used to set the lock table size */ int max_locks_per_xact; /* set by guc.c */ ! #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends)) /* --- 46,60 ---- /* This configuration variable is used to set the lock table size */ int max_locks_per_xact; /* set by guc.c */ ! #define NLOCKENTS() (max_locks_per_xact * (MaxBackends + max_prepared_xacts)) ! ! ! /* Record that's written to 2PC state file when a lock is persisted */ ! typedef struct TwoPhaseLockRecord ! { ! LOCKTAG locktag; ! LOCKMODE lockmode; ! } TwoPhaseLockRecord; /* *************** *** 168,175 **** /* ! * InitLocks -- Init the lock module. Create a private data ! * structure for constructing conflict masks. */ void InitLocks(void) --- 178,184 ---- /* ! * InitLocks -- Init the lock module. Nothing to do here at present. */ void InitLocks(void) *************** *** 222,229 **** LOCKMETHODID LockMethodTableInit(const char *tabName, const LOCKMASK *conflictsP, ! int numModes, ! int maxBackends) { LockMethod newLockMethod; LOCKMETHODID lockmethodid; --- 231,237 ---- LOCKMETHODID LockMethodTableInit(const char *tabName, const LOCKMASK *conflictsP, ! int numModes) { LockMethod newLockMethod; LOCKMETHODID lockmethodid; *************** *** 239,245 **** numModes, MAX_LOCKMODES - 1); /* Compute init/max size to request for lock hashtables */ ! 
max_table_size = NLOCKENTS(maxBackends); init_table_size = max_table_size / 2; /* Allocate a string for the shmem index table lookups. */ --- 247,253 ---- numModes, MAX_LOCKMODES - 1); /* Compute init/max size to request for lock hashtables */ ! max_table_size = NLOCKENTS(); init_table_size = max_table_size / 2; /* Allocate a string for the shmem index table lookups. */ *************** *** 1418,1427 **** while (proclock) { bool wakeupNeeded = false; ! PROCLOCK *nextHolder; /* Get link first, since we may unlink/delete this proclock */ ! nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); Assert(proclock->tag.proc == MAKE_OFFSET(MyProc)); --- 1426,1435 ---- while (proclock) { bool wakeupNeeded = false; ! PROCLOCK *nextplock; /* Get link first, since we may unlink/delete this proclock */ ! nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); Assert(proclock->tag.proc == MAKE_OFFSET(MyProc)); *************** *** 1474,1480 **** CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded); next_item: ! proclock = nextHolder; } LWLockRelease(masterLock); --- 1482,1488 ---- CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded); next_item: ! proclock = nextplock; } LWLockRelease(masterLock); *************** *** 1606,1618 **** /* * Estimate shared-memory space used for lock tables */ int ! LockShmemSize(int maxBackends) { int size = 0; ! long max_table_size = NLOCKENTS(maxBackends); /* lock method headers */ size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData)); --- 1614,1874 ---- /* + * AtPrepare_Locks + * Do the preparatory work for a PREPARE: make 2PC state file records + * for all locks currently held. + * + * User locks are non-transactional and are therefore ignored. 
+ * + * There are some special cases that we error out on: we can't be holding + * any session locks (should be OK since only VACUUM uses those) and we + * can't be holding any locks on temporary objects (since that would mess + * up the current backend if it tries to exit before the prepared xact is + * committed). + */ + void + AtPrepare_Locks(void) + { + LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD; + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + + /* + * We don't need to touch shared memory for this --- all the necessary + * state information is in the locallock table. + */ + hash_seq_init(&status, LockMethodLocalHash[lockmethodid]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + TwoPhaseLockRecord record; + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + int i; + + /* Ignore items that are not of the lockmethod to be processed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* Ignore it if we don't actually hold the lock */ + if (locallock->nLocks <= 0) + continue; + + /* Scan to verify there are no session locks */ + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + /* elog not ereport since this should not happen */ + if (lockOwners[i].owner == NULL) + elog(ERROR, "cannot PREPARE when session locks exist"); + } + + /* Can't handle it if the lock is on a temporary object */ + if (locallock->isTempObject) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has operated on temporary tables"))); + + /* + * Create a 2PC record. 
+ */ + memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG)); + record.lockmode = locallock->tag.mode; + + RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0, + &record, sizeof(TwoPhaseLockRecord)); + } + } + + /* + * PostPrepare_Locks + * Clean up after successful PREPARE + * + * Here, we want to transfer ownership of our locks to a dummy PGPROC + * that's now associated with the prepared transaction, and we want to + * clean out the corresponding entries in the LOCALLOCK table. + * + * Note: by removing the LOCALLOCK entries, we are leaving dangling + * pointers in the transaction's resource owner. This is OK at the + * moment since resowner.c doesn't try to free locks retail at a toplevel + * transaction commit or abort. We could alternatively zero out nLocks + * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll, + * but that probably costs more cycles. + */ + void + PostPrepare_Locks(TransactionId xid) + { + PGPROC *newproc = TwoPhaseGetDummyProc(xid); + LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD; + HASH_SEQ_STATUS status; + SHM_QUEUE *procLocks = &(MyProc->procLocks); + LWLockId masterLock; + LockMethod lockMethodTable; + int numLockModes; + LOCALLOCK *locallock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + bool found; + LOCK *lock; + + /* This is a critical section: any error means big trouble */ + START_CRIT_SECTION(); + + lockMethodTable = LockMethods[lockmethodid]; + if (!lockMethodTable) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + numLockModes = lockMethodTable->numLockModes; + masterLock = lockMethodTable->masterLock; + + /* + * First we run through the locallock table and get rid of unwanted + * entries, then we scan the process's proclocks and transfer them + * to the target proc. + * + * We do this separately because we may have multiple locallock + * entries pointing to the same proclock, and we daren't end up with + * any dangling pointers. 
+ */ + hash_seq_init(&status, LockMethodLocalHash[lockmethodid]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + if (locallock->proclock == NULL || locallock->lock == NULL) + { + /* + * We must've run out of shared memory while trying to set up + * this lock. Just forget the local entry. + */ + Assert(locallock->nLocks == 0); + RemoveLocalLock(locallock); + continue; + } + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* We already checked there are no session locks */ + + /* Mark the proclock to show we need to release this lockmode */ + if (locallock->nLocks > 0) + locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode); + + /* And remove the locallock hashtable entry */ + RemoveLocalLock(locallock); + } + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + + while (proclock) + { + PROCLOCK *nextplock; + LOCKMASK holdMask; + PROCLOCK *newproclock; + + /* Get link first, since we may unlink/delete this proclock */ + nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); + + Assert(proclock->tag.proc == MAKE_OFFSET(MyProc)); + + lock = (LOCK *) MAKE_PTR(proclock->tag.lock); + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCK_LOCKMETHOD(*lock) != lockmethodid) + goto next_item; + + PROCLOCK_PRINT("PostPrepare_Locks", proclock); + LOCK_PRINT("PostPrepare_Locks", lock, 0); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); + Assert((proclock->holdMask & ~lock->grantMask) == 0); + + /* + * Since there were no session locks, we should be releasing all locks + */ + if (proclock->releaseMask != proclock->holdMask) + elog(PANIC, "we seem to have dropped a bit somewhere"); + + holdMask = proclock->holdMask; + + /* + * We 
cannot simply modify proclock->tag.proc to reassign ownership + * of the lock, because that's part of the hash key and the proclock + * would then be in the wrong hash chain. So, unlink and delete the + * old proclock; create a new one with the right contents; and link + * it into place. We do it in this order to be certain we won't + * run out of shared memory (the way dynahash.c works, the deleted + * object is certain to be available for reallocation). + */ + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + if (!hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL)) + elog(PANIC, "proclock table corrupted"); + + /* + * Create the hash key for the new proclock table. + */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); + proclocktag.lock = MAKE_OFFSET(lock); + proclocktag.proc = MAKE_OFFSET(newproc); + + newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &proclocktag, + HASH_ENTER_NULL, &found); + if (!newproclock) + ereport(PANIC, /* should not happen */ + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errdetail("Not enough memory for reassigning the prepared transaction's locks."))); + + /* + * If new, initialize the new entry + */ + if (!found) + { + newproclock->holdMask = 0; + newproclock->releaseMask = 0; + /* Add new proclock to appropriate lists */ + SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink); + SHMQueueInsertBefore(&newproc->procLocks, &newproclock->procLink); + PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock); + } + else + { + PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock); + Assert((newproclock->holdMask & ~lock->grantMask) == 0); + } + + /* + * Pass over the identified lock ownership. 
+ */ + Assert((newproclock->holdMask & holdMask) == 0); + newproclock->holdMask |= holdMask; + + next_item: + proclock = nextplock; + } + + LWLockRelease(masterLock); + + END_CRIT_SECTION(); + } + + + /* * Estimate shared-memory space used for lock tables */ int ! LockShmemSize(void) { int size = 0; ! long max_table_size = NLOCKENTS(); /* lock method headers */ size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData)); *************** *** 1704,1724 **** #ifdef LOCK_DEBUG /* ! * Dump all locks in the MyProc->procLocks list. * * Must have already acquired the masterLock. */ void ! DumpLocks(void) { - PGPROC *proc; SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int lockmethodid = DEFAULT_LOCKMETHOD; LockMethod lockMethodTable; - proc = MyProc; if (proc == NULL) return; --- 1960,1978 ---- #ifdef LOCK_DEBUG /* ! * Dump all locks in the given proc's procLocks list. * * Must have already acquired the masterLock. */ void ! DumpLocks(PGPROC *proc) { SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int lockmethodid = DEFAULT_LOCKMETHOD; LockMethod lockMethodTable; if (proc == NULL) return; *************** *** 1793,1795 **** --- 2047,2300 ---- } #endif /* LOCK_DEBUG */ + + /* + * LOCK 2PC resource manager's routines + */ + + /* + * Re-acquire a lock belonging to a transaction that was prepared. + * + * Because this function is run at db startup, re-acquiring the locks should + * never conflict with running transactions because there are none. We + * assume that the lock state represented by the stored 2PC files is legal. 
+ */ + void + lock_twophase_recover(TransactionId xid, uint16 info, + void *recdata, uint32 len) + { + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + PGPROC *proc = TwoPhaseGetDummyProc(xid); + LOCKTAG *locktag; + LOCKMODE lockmode; + LOCKMETHODID lockmethodid; + LOCK *lock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + bool found; + LWLockId masterLock; + LockMethod lockMethodTable; + + Assert(len == sizeof(TwoPhaseLockRecord)); + locktag = &rec->locktag; + lockmode = rec->lockmode; + lockmethodid = locktag->locktag_lockmethodid; + + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; + if (!lockMethodTable) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + /* + * Find or create a lock with this tag. + */ + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) locktag, + HASH_ENTER_NULL, &found); + if (!lock) + { + LWLockRelease(masterLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You may need to increase max_locks_per_transaction."))); + } + + /* + * if it's a new lock object, initialize it + */ + if (!found) + { + lock->grantMask = 0; + lock->waitMask = 0; + SHMQueueInit(&(lock->procLocks)); + ProcQueueInit(&(lock->waitProcs)); + lock->nRequested = 0; + lock->nGranted = 0; + MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); + MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES); + LOCK_PRINT("lock_twophase_recover: new", lock, lockmode); + } + else + { + LOCK_PRINT("lock_twophase_recover: found", lock, lockmode); + Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); + Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); + } + + /* + * Create the hash key for the proclock table. 
+ */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ + proclocktag.lock = MAKE_OFFSET(lock); + proclocktag.proc = MAKE_OFFSET(proc); + + /* + * Find or create a proclock entry with this tag + */ + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &proclocktag, + HASH_ENTER_NULL, &found); + if (!proclock) + { + /* Ooops, not enough shmem for the proclock */ + if (lock->nRequested == 0) + { + /* + * There are no other requestors of this lock, so garbage-collect + * the lock object. We *must* do this to avoid a permanent leak + * of shared memory, because there won't be anything to cause + * anyone to release the lock object later. + */ + Assert(SHMQueueEmpty(&(lock->procLocks))); + if (!hash_search(LockMethodLockHash[lockmethodid], + (void *) &(lock->tag), + HASH_REMOVE, NULL)) + elog(PANIC, "lock table corrupted"); + } + LWLockRelease(masterLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You may need to increase max_locks_per_transaction."))); + } + + /* + * If new, initialize the new entry + */ + if (!found) + { + proclock->holdMask = 0; + proclock->releaseMask = 0; + /* Add proclock to appropriate lists */ + SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); + SHMQueueInsertBefore(&proc->procLocks, &proclock->procLink); + PROCLOCK_PRINT("lock_twophase_recover: new", proclock); + } + else + { + PROCLOCK_PRINT("lock_twophase_recover: found", proclock); + Assert((proclock->holdMask & ~lock->grantMask) == 0); + } + + /* + * lock->nRequested and lock->requested[] count the total number of + * requests, whether granted or waiting, so increment those + * immediately. + */ + lock->nRequested++; + lock->requested[lockmode]++; + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + + /* + * We shouldn't already hold the desired lock. 
+ */ + if (proclock->holdMask & LOCKBIT_ON(lockmode)) + elog(ERROR, "lock %s on object %u/%u/%u is already held", + lock_mode_names[lockmode], + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3); + + /* + * We ignore any possible conflicts and just grant ourselves the lock. + */ + GrantLock(lock, proclock, lockmode); + + LWLockRelease(masterLock); + } + + /* + * 2PC processing routine for COMMIT PREPARED case. + * + * Find and release the lock indicated by the 2PC record. + */ + void + lock_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len) + { + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + PGPROC *proc = TwoPhaseGetDummyProc(xid); + LOCKTAG *locktag; + LOCKMODE lockmode; + LOCKMETHODID lockmethodid; + PROCLOCKTAG proclocktag; + LOCK *lock; + PROCLOCK *proclock; + LWLockId masterLock; + LockMethod lockMethodTable; + bool wakeupNeeded; + + Assert(len == sizeof(TwoPhaseLockRecord)); + locktag = &rec->locktag; + lockmode = rec->lockmode; + lockmethodid = locktag->locktag_lockmethodid; + + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; + if (!lockMethodTable) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + /* + * Re-find the lock object (it had better be there). + */ + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) locktag, + HASH_FIND, NULL); + if (!lock) + elog(PANIC, "failed to re-find shared lock object"); + + /* + * Re-find the proclock object (ditto). 
+ */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ + proclocktag.lock = MAKE_OFFSET(lock); + proclocktag.proc = MAKE_OFFSET(proc); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &proclocktag, + HASH_FIND, NULL); + if (!proclock) + elog(PANIC, "failed to re-find shared proclock object"); + + /* + * Double-check that we are actually holding a lock of the type we + * want to release. + */ + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) + { + PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock); + LWLockRelease(masterLock); + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); + return; + } + + /* + * Do the releasing. CleanUpLock will waken any now-wakable waiters. + */ + wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); + + CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded); + + LWLockRelease(masterLock); + } + + /* + * 2PC processing routine for ROLLBACK PREPARED case. + * + * This is actually just the same as the COMMIT case. + */ + void + lock_twophase_postabort(TransactionId xid, uint16 info, + void *recdata, uint32 len) + { + lock_twophase_postcommit(xid, info, recdata, len); + } *** src/backend/storage/lmgr/proc.c.orig Tue Jun 14 18:15:32 2005 --- src/backend/storage/lmgr/proc.c Thu Jun 16 15:29:18 2005 *************** *** 92,104 **** * Report shared-memory space needed by InitProcGlobal. */ int ! ProcGlobalShmemSize(int maxBackends) { int size = 0; size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */ size += MAXALIGN(NUM_DUMMY_PROCS * sizeof(PGPROC)); /* DummyProcs */ ! size += MAXALIGN(maxBackends * sizeof(PGPROC)); /* MyProcs */ size += MAXALIGN(sizeof(slock_t)); /* ProcStructLock */ return size; --- 92,104 ---- * Report shared-memory space needed by InitProcGlobal. */ int ! 
ProcGlobalShmemSize(void) { int size = 0; size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */ size += MAXALIGN(NUM_DUMMY_PROCS * sizeof(PGPROC)); /* DummyProcs */ ! size += MAXALIGN(MaxBackends * sizeof(PGPROC)); /* MyProcs */ size += MAXALIGN(sizeof(slock_t)); /* ProcStructLock */ return size; *************** *** 108,117 **** * Report number of semaphores needed by InitProcGlobal. */ int ! ProcGlobalSemas(int maxBackends) { /* We need a sema per backend, plus one for each dummy process. */ ! return maxBackends + NUM_DUMMY_PROCS; } /* --- 108,117 ---- * Report number of semaphores needed by InitProcGlobal. */ int ! ProcGlobalSemas(void) { /* We need a sema per backend, plus one for each dummy process. */ ! return MaxBackends + NUM_DUMMY_PROCS; } /* *************** *** 134,140 **** * postmaster, not in backends. */ void ! InitProcGlobal(int maxBackends) { bool foundProcGlobal, foundDummy; --- 134,140 ---- * postmaster, not in backends. */ void ! InitProcGlobal(void) { bool foundProcGlobal, foundDummy; *************** *** 170,182 **** * Pre-create the PGPROC structures and create a semaphore for * each. */ ! procs = (PGPROC *) ShmemAlloc(maxBackends * sizeof(PGPROC)); if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); ! MemSet(procs, 0, maxBackends * sizeof(PGPROC)); ! for (i = 0; i < maxBackends; i++) { PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = ProcGlobal->freeProcs; --- 170,182 ---- * Pre-create the PGPROC structures and create a semaphore for * each. */ ! procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC)); if (!procs) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); ! MemSet(procs, 0, MaxBackends * sizeof(PGPROC)); ! 
for (i = 0; i < MaxBackends; i++) { PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = ProcGlobal->freeProcs; *************** *** 254,260 **** MyProc->xmin = InvalidTransactionId; MyProc->pid = MyProcPid; MyProc->databaseId = MyDatabaseId; - MyProc->logRec.xrecoff = 0; MyProc->lwWaiting = false; MyProc->lwExclusive = false; MyProc->lwWaitLink = NULL; --- 254,259 ---- *************** *** 265,271 **** /* * Add our PGPROC to the PGPROC array in shared memory. */ ! ProcArrayAddMyself(); /* * Arrange to clean up at backend exit. --- 264,270 ---- /* * Add our PGPROC to the PGPROC array in shared memory. */ ! ProcArrayAdd(MyProc); /* * Arrange to clean up at backend exit. *************** *** 332,338 **** MyProc->xid = InvalidTransactionId; MyProc->xmin = InvalidTransactionId; MyProc->databaseId = MyDatabaseId; - MyProc->logRec.xrecoff = 0; MyProc->lwWaiting = false; MyProc->lwExclusive = false; MyProc->lwWaitLink = NULL; --- 331,336 ---- *************** *** 353,358 **** --- 351,385 ---- } /* + * Check whether there are at least N free PGPROC objects. + * + * Note: this is designed on the assumption that N will generally be small. + */ + bool + HaveNFreeProcs(int n) + { + SHMEM_OFFSET offset; + PGPROC *proc; + /* use volatile pointer to prevent code rearrangement */ + volatile PROC_HDR *procglobal = ProcGlobal; + + SpinLockAcquire(ProcStructLock); + + offset = procglobal->freeProcs; + + while (n > 0 && offset != INVALID_OFFSET) + { + proc = (PGPROC *) MAKE_PTR(offset); + offset = proc->links.next; + n--; + } + + SpinLockRelease(ProcStructLock); + + return (n <= 0); + } + + /* * Cancel any pending wait for lock, when aborting a transaction. * * Returns true if we had been waiting for a lock, else false. *************** *** 478,484 **** #endif /* Remove our PGPROC from the PGPROC array in shared memory */ ! ProcArrayRemoveMyself(); SpinLockAcquire(ProcStructLock); --- 505,511 ---- #endif /* Remove our PGPROC from the PGPROC array in shared memory */ ! 
ProcArrayRemove(MyProc); SpinLockAcquire(ProcStructLock); *** src/backend/storage/smgr/smgr.c.orig Mon Jun 6 16:22:58 2005 --- src/backend/storage/smgr/smgr.c Fri Jun 17 16:31:23 2005 *************** *** 434,440 **** * during transactional operations, since it can't be undone. * * If isRedo is true, it is okay for the underlying file to be gone ! * already. (In practice isRedo will always be true.) * * This also implies smgrclose() on the SMgrRelation object. */ --- 434,440 ---- * during transactional operations, since it can't be undone. * * If isRedo is true, it is okay for the underlying file to be gone ! * already. * * This also implies smgrclose() on the SMgrRelation object. */ *************** *** 676,681 **** --- 676,705 ---- reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); } + + + /* + * PostPrepare_smgr -- Clean up after a successful PREPARE + * + * What we have to do here is throw away the in-memory state about pending + * relation deletes. It's all been recorded in the 2PC state file and + * it's no longer smgr's job to worry about it. + */ + void + PostPrepare_smgr(void) + { + PendingRelDelete *pending; + PendingRelDelete *next; + + for (pending = pendingDeletes; pending != NULL; pending = next) + { + next = pending->next; + pendingDeletes = next; + /* must explicitly free the list entry */ + pfree(pending); + } + } + /* * smgrDoPendingDeletes() -- Take care of relation deletes at end of xact. 
*** src/backend/tcop/postgres.c.orig Tue Jun 14 18:17:30 2005 --- src/backend/tcop/postgres.c Thu Jun 16 15:17:43 2005 *************** *** 930,935 **** --- 930,936 ---- TransactionStmt *stmt = (TransactionStmt *) parsetree; if (stmt->kind == TRANS_STMT_COMMIT || + stmt->kind == TRANS_STMT_PREPARE || stmt->kind == TRANS_STMT_ROLLBACK || stmt->kind == TRANS_STMT_ROLLBACK_TO) allowit = true; *************** *** 1261,1266 **** --- 1262,1268 ---- TransactionStmt *stmt = (TransactionStmt *) parsetree; if (stmt->kind == TRANS_STMT_COMMIT || + stmt->kind == TRANS_STMT_PREPARE || stmt->kind == TRANS_STMT_ROLLBACK || stmt->kind == TRANS_STMT_ROLLBACK_TO) allowit = true; *************** *** 1751,1756 **** --- 1753,1759 ---- is_trans_stmt = true; if (stmt->kind == TRANS_STMT_COMMIT || + stmt->kind == TRANS_STMT_PREPARE || stmt->kind == TRANS_STMT_ROLLBACK || stmt->kind == TRANS_STMT_ROLLBACK_TO) is_trans_exit = true; *** src/backend/tcop/utility.c.orig Thu Apr 28 17:47:15 2005 --- src/backend/tcop/utility.c Fri Jun 17 12:38:26 2005 *************** *** 17,22 **** --- 17,23 ---- #include "postgres.h" #include "access/heapam.h" + #include "access/twophase.h" #include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_shadow.h" *************** *** 383,393 **** if (strcmp(item->defname, "transaction_isolation") == 0) SetPGVariable("transaction_isolation", list_make1(item->arg), ! false); else if (strcmp(item->defname, "transaction_read_only") == 0) SetPGVariable("transaction_read_only", list_make1(item->arg), ! false); } } break; --- 384,394 ---- if (strcmp(item->defname, "transaction_isolation") == 0) SetPGVariable("transaction_isolation", list_make1(item->arg), ! true); else if (strcmp(item->defname, "transaction_read_only") == 0) SetPGVariable("transaction_read_only", list_make1(item->arg), ! 
true); } } break; *************** *** 401,406 **** --- 402,426 ---- } break; + case TRANS_STMT_PREPARE: + if (!PrepareTransactionBlock(stmt->gid)) + { + /* report unsuccessful commit in completionTag */ + if (completionTag) + strcpy(completionTag, "ROLLBACK"); + } + break; + + case TRANS_STMT_COMMIT_PREPARED: + PreventTransactionChain(stmt, "COMMIT PREPARED"); + FinishPreparedTransaction(stmt->gid, true); + break; + + case TRANS_STMT_ROLLBACK_PREPARED: + PreventTransactionChain(stmt, "ROLLBACK PREPARED"); + FinishPreparedTransaction(stmt->gid, false); + break; + case TRANS_STMT_ROLLBACK: UserAbortTransactionBlock(); break; *************** *** 1213,1218 **** --- 1233,1250 ---- case TRANS_STMT_RELEASE: tag = "RELEASE"; + break; + + case TRANS_STMT_PREPARE: + tag = "PREPARE TRANSACTION"; + break; + + case TRANS_STMT_COMMIT_PREPARED: + tag = "COMMIT PREPARED"; + break; + + case TRANS_STMT_ROLLBACK_PREPARED: + tag = "ROLLBACK PREPARED"; break; default: *** src/backend/utils/cache/inval.c.orig Wed Apr 13 21:38:19 2005 --- src/backend/utils/cache/inval.c Thu Jun 16 16:16:20 2005 *************** *** 86,91 **** --- 86,92 ---- */ #include "postgres.h" + #include "access/twophase_rmgr.h" #include "access/xact.h" #include "catalog/catalog.h" #include "miscadmin.h" *************** *** 171,176 **** --- 172,184 ---- static int cache_callback_count = 0; + /* info values for 2PC callback */ + #define TWOPHASE_INFO_MSG 0 /* SharedInvalidationMessage */ + #define TWOPHASE_INFO_FILE_BEFORE 1 /* relcache file inval */ + #define TWOPHASE_INFO_FILE_AFTER 2 /* relcache file inval */ + + static void PersistInvalidationMessage(SharedInvalidationMessage *msg); + /* ---------------------------------------------------------------- * Invalidation list support functions *************** *** 637,642 **** --- 645,700 ---- } /* + * AtPrepare_Inval + * Save the inval lists state at 2PC transaction prepare. + * + * In this phase we just generate 2PC records for all the pending invalidation + * work. 
+ */ + void + AtPrepare_Inval(void) + { + /* Must be at top of stack */ + Assert(transInvalInfo != NULL && transInvalInfo->parent == NULL); + + /* + * Relcache init file invalidation requires processing both before + * and after we send the SI messages. + */ + if (transInvalInfo->RelcacheInitFileInval) + RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_BEFORE, + NULL, 0); + + AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, + &transInvalInfo->CurrentCmdInvalidMsgs); + + ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, + PersistInvalidationMessage); + + if (transInvalInfo->RelcacheInitFileInval) + RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_AFTER, + NULL, 0); + } + + /* + * PostPrepare_Inval + * Clean up after successful PREPARE. + * + * Here, we want to act as though the transaction aborted, so that we will + * undo any syscache changes it made, thereby bringing us into sync with the + * outside world, which doesn't believe the transaction committed yet. + * + * If the prepared transaction is later aborted, there is nothing more to + * do; if it commits, we will receive the consequent inval messages just + * like everyone else. + */ + void + PostPrepare_Inval(void) + { + AtEOXact_Inval(false); + } + + /* * AtSubStart_Inval * Initialize inval lists at start of a subtransaction. */ *************** *** 653,658 **** --- 711,757 ---- myInfo->my_level = GetCurrentTransactionNestLevel(); transInvalInfo = myInfo; } + + /* + * PersistInvalidationMessage + * Write an invalidation message to the 2PC state file. + */ + static void + PersistInvalidationMessage(SharedInvalidationMessage *msg) + { + RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_MSG, + msg, sizeof(SharedInvalidationMessage)); + } + + /* + * inval_twophase_postcommit + * Process an invalidation message from the 2PC state file. 
+ */ + void + inval_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len) + { + SharedInvalidationMessage *msg; + + switch (info) + { + case TWOPHASE_INFO_MSG: + msg = (SharedInvalidationMessage *) recdata; + Assert(len == sizeof(SharedInvalidationMessage)); + SendSharedInvalidMessage(msg); + break; + case TWOPHASE_INFO_FILE_BEFORE: + RelationCacheInitFileInvalidate(true); + break; + case TWOPHASE_INFO_FILE_AFTER: + RelationCacheInitFileInvalidate(false); + break; + default: + Assert(false); + break; + } + } + /* * AtEOXact_Inval *** src/backend/utils/init/flatfiles.c.orig Mon Jun 6 13:01:24 2005 --- src/backend/utils/init/flatfiles.c Fri Jun 17 16:11:56 2005 *************** *** 32,37 **** --- 32,38 ---- #include #include "access/heapam.h" + #include "access/twophase_rmgr.h" #include "catalog/pg_database.h" #include "catalog/pg_group.h" #include "catalog/pg_namespace.h" *************** *** 48,57 **** --- 49,64 ---- #include "utils/syscache.h" + /* Actual names of the flat files (within $PGDATA/global/) */ #define DATABASE_FLAT_FILE "pg_database" #define GROUP_FLAT_FILE "pg_group" #define USER_FLAT_FILE "pg_pwd" + /* Info bits in a flatfiles 2PC record */ + #define FF_BIT_DATABASE 1 + #define FF_BIT_GROUP 2 + #define FF_BIT_USER 4 + /* * The need-to-update-files flags are SubTransactionIds that show *************** *** 757,762 **** --- 764,806 ---- SendPostmasterSignal(PMSIGNAL_PASSWORD_CHANGE); } + + /* + * This routine is called during transaction prepare. + * + * Record which files need to be refreshed if this transaction later + * commits. + * + * Note: it's OK to clear the flags immediately, since if the PREPARE fails + * further on, we'd only reset the flags anyway. So there's no need for a + * separate PostPrepare call. 
+ */ + void + AtPrepare_UpdateFlatFiles(void) + { + uint16 info = 0; + + if (database_file_update_subid != InvalidSubTransactionId) + { + database_file_update_subid = InvalidSubTransactionId; + info |= FF_BIT_DATABASE; + } + if (group_file_update_subid != InvalidSubTransactionId) + { + group_file_update_subid = InvalidSubTransactionId; + info |= FF_BIT_GROUP; + } + if (user_file_update_subid != InvalidSubTransactionId) + { + user_file_update_subid = InvalidSubTransactionId; + info |= FF_BIT_USER; + } + if (info != 0) + RegisterTwoPhaseRecord(TWOPHASE_RM_FLATFILES_ID, info, + NULL, 0); + } + + /* * AtEOSubXact_UpdateFlatFiles * *************** *** 830,833 **** --- 874,902 ---- } return PointerGetDatum(NULL); + } + + + /* + * 2PC processing routine for COMMIT PREPARED case. + * + * (We don't have to do anything for ROLLBACK PREPARED.) + */ + void + flatfile_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len) + { + /* + * Set flags to do the needed file updates at the end of my own + * current transaction. (XXX this has some issues if my own + * transaction later rolls back, or if there is any significant + * delay before I commit. OK for now because we disallow + * COMMIT PREPARED inside a transaction block.) + */ + if (info & FF_BIT_DATABASE) + database_file_update_needed(); + if (info & FF_BIT_GROUP) + group_file_update_needed(); + if (info & FF_BIT_USER) + user_file_update_needed(); } *** src/backend/utils/init/postinit.c.orig Thu May 19 17:35:47 2005 --- src/backend/utils/init/postinit.c Thu Jun 16 15:17:31 2005 *************** *** 232,238 **** * We're running a postgres bootstrap process or a standalone * backend. Create private "shmem" and semaphores. */ ! CreateSharedMemoryAndSemaphores(true, MaxBackends, 0); } } --- 232,238 ---- * We're running a postgres bootstrap process or a standalone * backend. Create private "shmem" and semaphores. */ ! 
CreateSharedMemoryAndSemaphores(true, 0); } } *************** *** 456,462 **** */ if (!am_superuser && ReservedBackends > 0 && ! CountEmptyBackendSlots() < ReservedBackends) ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("connection limit exceeded for non-superusers"))); --- 456,462 ---- */ if (!am_superuser && ReservedBackends > 0 && ! !HaveNFreeProcs(ReservedBackends)) ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("connection limit exceeded for non-superusers"))); *** src/backend/utils/misc/guc.c.orig Tue Jun 14 18:17:31 2005 --- src/backend/utils/misc/guc.c Fri Jun 17 17:21:52 2005 *************** *** 25,30 **** --- 25,31 ---- #include "utils/guc.h" #include "utils/guc_tables.h" + #include "access/twophase.h" #include "catalog/namespace.h" #include "catalog/pg_type.h" #include "commands/async.h" *************** *** 1102,1107 **** --- 1103,1117 ---- 1000, 25, INT_MAX, NULL, NULL }, + { + {"max_prepared_transactions", PGC_POSTMASTER, RESOURCES, + gettext_noop("Sets the maximum number of simultaneously prepared transactions."), + NULL + }, + &max_prepared_xacts, + 50, 0, 10000, NULL, NULL + }, + #ifdef LOCK_DEBUG { {"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS, *** src/backend/utils/misc/postgresql.conf.sample.orig Fri Jun 10 16:38:56 2005 --- src/backend/utils/misc/postgresql.conf.sample Fri Jun 17 17:21:53 2005 *************** *** 79,84 **** --- 79,85 ---- #shared_buffers = 1000 # min 16, at least max_connections*2, 8KB each #temp_buffers = 1000 # min 100, 8KB each + #max_prepared_transactions = 50 # 0-10000 #work_mem = 1024 # min 64, size in KB #maintenance_work_mem = 16384 # min 1024, size in KB #max_stack_depth = 2048 # min 100, size in KB *** src/backend/utils/mmgr/portalmem.c.orig Sun May 29 00:23:06 2005 --- src/backend/utils/mmgr/portalmem.c Fri Jun 17 11:08:40 2005 *************** *** 467,472 **** --- 467,514 ---- } /* + * Pre-prepare processing for portals. 
+ * + * Currently we refuse PREPARE if the transaction created any holdable + * cursors, since it's quite unclear what to do with one. However, this + * has the same API as CommitHoldablePortals and is invoked in the same + * way by xact.c, so that we can easily do something reasonable if anyone + * comes up with something reasonable to do. + * + * Returns TRUE if any holdable cursors were processed, FALSE if not. + */ + bool + PrepareHoldablePortals(void) + { + bool result = false; + HASH_SEQ_STATUS status; + PortalHashEnt *hentry; + + hash_seq_init(&status, PortalHashTable); + + while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL) + { + Portal portal = hentry->portal; + + /* Is it a holdable portal created in the current xact? */ + if ((portal->cursorOptions & CURSOR_OPT_HOLD) && + portal->createSubid != InvalidSubTransactionId && + portal->status == PORTAL_READY) + { + /* + * We are exiting the transaction that created a holdable + * cursor. Can't do PREPARE. + */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has created a cursor WITH HOLD"))); + } + } + + return result; + } + + /* * Pre-commit processing for portals. * * Remove all non-holdable portals created in this transaction. 
*** src/bin/initdb/initdb.c.orig Sat Apr 30 11:56:59 2005 --- src/bin/initdb/initdb.c Thu Jun 16 15:17:18 2005 *************** *** 2124,2129 **** --- 2124,2130 ---- "pg_xlog/archive_status", "pg_clog", "pg_subtrans", + "pg_twophase", "pg_multixact/members", "pg_multixact/offsets", "base", *** src/bin/psql/common.c.orig Tue Jun 14 11:11:29 2005 --- src/bin/psql/common.c Fri Jun 17 12:44:27 2005 *************** *** 1216,1221 **** --- 1216,1236 ---- return true; if (wordlen == 8 && pg_strncasecmp(query, "rollback", 8) == 0) return true; + if (wordlen == 7 && pg_strncasecmp(query, "prepare", 7) == 0) + { + /* PREPARE TRANSACTION is a TC command, PREPARE foo is not */ + query += wordlen; + + query = skip_white_space(query); + + wordlen = 0; + while (isalpha((unsigned char) query[wordlen])) + wordlen += PQmblen(&query[wordlen], pset.encoding); + + if (wordlen == 11 && pg_strncasecmp(query, "transaction", 11) == 0) + return true; + return false; + } /* * Commands not allowed within transactions. The statements checked *** src/include/access/subtrans.h.orig Fri Dec 31 17:46:40 2004 --- src/include/access/subtrans.h Fri Jun 17 14:00:36 2005 *************** *** 18,24 **** extern int SUBTRANSShmemSize(void); extern void SUBTRANSShmemInit(void); extern void BootStrapSUBTRANS(void); ! extern void StartupSUBTRANS(void); extern void ShutdownSUBTRANS(void); extern void CheckPointSUBTRANS(void); extern void ExtendSUBTRANS(TransactionId newestXact); --- 18,24 ---- extern int SUBTRANSShmemSize(void); extern void SUBTRANSShmemInit(void); extern void BootStrapSUBTRANS(void); ! 
extern void StartupSUBTRANS(TransactionId oldestActiveXID); extern void ShutdownSUBTRANS(void); extern void CheckPointSUBTRANS(void); extern void ExtendSUBTRANS(TransactionId newestXact); *** src/include/access/twophase.h.orig Thu Jun 16 15:16:55 2005 --- src/include/access/twophase.h Fri Jun 17 14:00:37 2005 *************** *** 0 **** --- 1,49 ---- + /*------------------------------------------------------------------------- + * + * twophase.h + * Two-phase-commit related declarations. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #ifndef TWOPHASE_H + #define TWOPHASE_H + + #include "storage/lock.h" + + + /* + * GlobalTransactionData is defined in twophase.c; other places have no + * business knowing the internal definition. + */ + typedef struct GlobalTransactionData *GlobalTransaction; + + /* GUC variable */ + extern int max_prepared_xacts; + + extern int TwoPhaseShmemSize(void); + extern void TwoPhaseShmemInit(void); + + extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid); + + extern GlobalTransaction MarkAsPreparing(TransactionId xid, Oid databaseid, + char *gid, AclId owner); + extern void MarkAsPrepared(GlobalTransaction gxact); + + extern void StartPrepare(GlobalTransaction gxact); + extern void EndPrepare(GlobalTransaction gxact); + + extern TransactionId PrescanPreparedTransactions(void); + extern void RecoverPreparedTransactions(void); + + extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len); + extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning); + + extern void FinishPreparedTransaction(char *gid, bool isCommit); + + #endif /* TWOPHASE_H */ *** src/include/access/twophase_rmgr.h.orig Thu Jun 16 15:16:55 2005 --- src/include/access/twophase_rmgr.h Thu Jun 16 16:01:11 2005 *************** *** 0 **** 
--- 1,39 ---- + /*------------------------------------------------------------------------- + * + * twophase_rmgr.h + * Two-phase-commit resource managers definition + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #ifndef TWOPHASE_RMGR_H + #define TWOPHASE_RMGR_H + + typedef void (*TwoPhaseCallback) (TransactionId xid, uint16 info, + void *recdata, uint32 len); + typedef uint8 TwoPhaseRmgrId; + + /* + * Built-in resource managers + */ + #define TWOPHASE_RM_END_ID 0 + #define TWOPHASE_RM_LOCK_ID 1 + #define TWOPHASE_RM_INVAL_ID 2 + #define TWOPHASE_RM_FLATFILES_ID 3 + #define TWOPHASE_RM_NOTIFY_ID 4 + #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_NOTIFY_ID + + extern const TwoPhaseCallback twophase_recover_callbacks[]; + extern const TwoPhaseCallback twophase_postcommit_callbacks[]; + extern const TwoPhaseCallback twophase_postabort_callbacks[]; + + + extern void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, + const void *data, uint32 len); + + #endif /* TWOPHASE_RMGR_H */ *** src/include/access/xact.h.orig Mon Jun 6 13:01:24 2005 --- src/include/access/xact.h Fri Jun 17 12:40:27 2005 *************** *** 47,53 **** typedef enum { XACT_EVENT_COMMIT, ! XACT_EVENT_ABORT } XactEvent; typedef void (*XactCallback) (XactEvent event, void *arg); --- 47,54 ---- typedef enum { XACT_EVENT_COMMIT, ! XACT_EVENT_ABORT, ! XACT_EVENT_PREPARE } XactEvent; typedef void (*XactCallback) (XactEvent event, void *arg); *************** *** 72,79 **** * XLOG allows to store some information in high 4 bits of log * record xl_info field */ ! #define XLOG_XACT_COMMIT 0x00 ! #define XLOG_XACT_ABORT 0x20 typedef struct xl_xact_commit { --- 73,83 ---- * XLOG allows to store some information in high 4 bits of log * record xl_info field */ ! #define XLOG_XACT_COMMIT 0x00 ! 
#define XLOG_XACT_PREPARE 0x10 ! #define XLOG_XACT_ABORT 0x20 ! #define XLOG_XACT_COMMIT_PREPARED 0x30 ! #define XLOG_XACT_ABORT_PREPARED 0x40 typedef struct xl_xact_commit { *************** *** 99,104 **** --- 103,133 ---- #define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes) + /* + * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records + * except that we have to store the XID of the prepared transaction explicitly + * --- the XID in the record header will be for the transaction doing the + * COMMIT PREPARED or ABORT PREPARED command. + */ + + typedef struct xl_xact_commit_prepared + { + TransactionId xid; /* XID of prepared xact */ + xl_xact_commit crec; /* COMMIT record */ + /* MORE DATA FOLLOWS AT END OF STRUCT */ + } xl_xact_commit_prepared; + + #define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes) + + typedef struct xl_xact_abort_prepared + { + TransactionId xid; /* XID of prepared xact */ + xl_xact_abort arec; /* ABORT record */ + /* MORE DATA FOLLOWS AT END OF STRUCT */ + } xl_xact_abort_prepared; + + #define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes) + /* ---------------- * extern definitions *************** *** 121,126 **** --- 150,156 ---- extern void AbortCurrentTransaction(void); extern void BeginTransactionBlock(void); extern bool EndTransactionBlock(void); + extern bool PrepareTransactionBlock(char *gid); extern void UserAbortTransactionBlock(void); extern void ReleaseSavepoint(List *options); extern void DefineSavepoint(char *name); *** src/include/catalog/catversion.h.orig Fri Jun 17 14:54:17 2005 --- src/include/catalog/catversion.h Fri Jun 17 15:26:00 2005 *************** *** 53,58 **** */ /* yyyymmddN */ ! #define CATALOG_VERSION_NO 200506151 #endif --- 53,58 ---- */ /* yyyymmddN */ ! 
#define CATALOG_VERSION_NO 200506171 #endif *** src/include/catalog/pg_proc.h.orig Tue Jun 14 18:17:37 2005 --- src/include/catalog/pg_proc.h Thu Jun 16 15:16:32 2005 *************** *** 3005,3010 **** --- 3005,3012 ---- DESCR("SHOW ALL as a function"); DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 f f t t v 0 2249 "" _null_ _null_ _null_ pg_lock_status - _null_ )); DESCR("view system lock information"); + DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 f f t t v 0 2249 "" _null_ _null_ _null_ pg_prepared_xact - _null_ )); + DESCR("view two-phase transactions"); DATA(insert OID = 2079 ( pg_table_is_visible PGNSP PGUID 12 f f t f s 1 16 "26" _null_ _null_ _null_ pg_table_is_visible - _null_ )); DESCR("is table visible in search path?"); *** src/include/commands/async.h.orig Fri Dec 31 17:46:46 2004 --- src/include/commands/async.h Thu Jun 16 16:01:05 2005 *************** *** 26,31 **** --- 26,32 ---- extern void AtSubStart_Notify(void); extern void AtSubCommit_Notify(void); extern void AtSubAbort_Notify(void); + extern void AtPrepare_Notify(void); /* signal handler for inbound notifies (SIGUSR2) */ extern void NotifyInterruptHandler(SIGNAL_ARGS); *************** *** 37,41 **** --- 38,45 ---- */ extern void EnableNotifyInterrupt(void); extern bool DisableNotifyInterrupt(void); + + extern void notify_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len); #endif /* ASYNC_H */ *** src/include/nodes/parsenodes.h.orig Sun Jun 5 18:32:57 2005 --- src/include/nodes/parsenodes.h Thu Jun 16 15:16:23 2005 *************** *** 1556,1562 **** TRANS_STMT_ROLLBACK, TRANS_STMT_SAVEPOINT, TRANS_STMT_RELEASE, ! TRANS_STMT_ROLLBACK_TO } TransactionStmtKind; typedef struct TransactionStmt --- 1556,1565 ---- TRANS_STMT_ROLLBACK, TRANS_STMT_SAVEPOINT, TRANS_STMT_RELEASE, ! TRANS_STMT_ROLLBACK_TO, ! TRANS_STMT_PREPARE, ! TRANS_STMT_COMMIT_PREPARED, ! 
TRANS_STMT_ROLLBACK_PREPARED } TransactionStmtKind; typedef struct TransactionStmt *************** *** 1564,1569 **** --- 1567,1573 ---- NodeTag type; TransactionStmtKind kind; /* see above */ List *options; /* for BEGIN/START and savepoint commands */ + char *gid; /* for two-phase-commit related commands */ } TransactionStmt; /* ---------------------- *** src/include/storage/ipc.h.orig Fri Dec 31 17:47:03 2004 --- src/include/storage/ipc.h Thu Jun 16 15:16:11 2005 *************** *** 29,36 **** extern void on_exit_reset(void); /* ipci.c */ ! extern void CreateSharedMemoryAndSemaphores(bool makePrivate, ! int maxBackends, ! int port); #endif /* IPC_H */ --- 29,34 ---- extern void on_exit_reset(void); /* ipci.c */ ! extern void CreateSharedMemoryAndSemaphores(bool makePrivate, int port); #endif /* IPC_H */ *** src/include/storage/lmgr.h.orig Tue Jun 14 18:15:33 2005 --- src/include/storage/lmgr.h Thu Jun 16 15:16:11 2005 *************** *** 41,47 **** * so increase that if you want to add more modes. */ ! extern void InitLockTable(int maxBackends); extern void RelationInitLockInfo(Relation relation); /* Lock a relation */ --- 41,47 ---- * so increase that if you want to add more modes. */ ! extern void InitLockTable(void); extern void RelationInitLockInfo(Relation relation); /* Lock a relation */ *** src/include/storage/lock.h.orig Tue Jun 14 18:15:33 2005 --- src/include/storage/lock.h Thu Jun 16 16:01:00 2005 *************** *** 370,376 **** extern LockMethod GetLocksMethodTable(LOCK *lock); extern LOCKMETHODID LockMethodTableInit(const char *tabName, const LOCKMASK *conflictsP, ! int numModes, int maxBackends); extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid); extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, --- 370,376 ---- extern LockMethod GetLocksMethodTable(LOCK *lock); extern LOCKMETHODID LockMethodTableInit(const char *tabName, const LOCKMASK *conflictsP, ! 
int numModes); extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid); extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, *************** *** 383,395 **** extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); extern void LockReleaseCurrentOwner(void); extern void LockReassignCurrentOwner(void); extern int LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock, PGPROC *proc); extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode); extern void GrantAwaitedLock(void); extern void RemoveFromWaitQueue(PGPROC *proc); ! extern int LockShmemSize(int maxBackends); extern bool DeadLockCheck(PGPROC *proc); extern void DeadLockReport(void); extern void RememberSimpleDeadLock(PGPROC *proc1, --- 383,397 ---- extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); extern void LockReleaseCurrentOwner(void); extern void LockReassignCurrentOwner(void); + extern void AtPrepare_Locks(void); + extern void PostPrepare_Locks(TransactionId xid); extern int LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock, PROCLOCK *proclock, PGPROC *proc); extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode); extern void GrantAwaitedLock(void); extern void RemoveFromWaitQueue(PGPROC *proc); ! extern int LockShmemSize(void); extern bool DeadLockCheck(PGPROC *proc); extern void DeadLockReport(void); extern void RememberSimpleDeadLock(PGPROC *proc1, *************** *** 400,407 **** extern LockData *GetLockStatusData(void); extern const char *GetLockmodeName(LOCKMODE mode); #ifdef LOCK_DEBUG ! 
extern void DumpLocks(void); extern void DumpAllLocks(void); #endif --- 402,416 ---- extern LockData *GetLockStatusData(void); extern const char *GetLockmodeName(LOCKMODE mode); + extern void lock_twophase_recover(TransactionId xid, uint16 info, + void *recdata, uint32 len); + extern void lock_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len); + extern void lock_twophase_postabort(TransactionId xid, uint16 info, + void *recdata, uint32 len); + #ifdef LOCK_DEBUG ! extern void DumpLocks(PGPROC *proc); extern void DumpAllLocks(void); #endif *** src/include/storage/lwlock.h.orig Thu May 19 17:35:47 2005 --- src/include/storage/lwlock.h Thu Jun 16 15:16:12 2005 *************** *** 46,51 **** --- 46,52 ---- MultiXactMemberControlLock, RelCacheInitLock, BgWriterCommLock, + TwoPhaseStateLock, NumFixedLWLocks, /* must be last except for * MaxDynamicLWLock */ *** src/include/storage/proc.h.orig Thu May 19 17:35:47 2005 --- src/include/storage/proc.h Thu Jun 16 15:16:12 2005 *************** *** 46,51 **** --- 46,58 ---- * links: list link for any list the PGPROC is in. When waiting for a lock, * the PGPROC is linked into that lock's waitProcs queue. A recycled PGPROC * is linked into ProcGlobal's freeProcs list. + * + * Note: twophase.c also sets up a dummy PGPROC struct for each currently + * prepared transaction. These PGPROCs appear in the ProcArray data structure + * so that the prepared transactions appear to be still running and are + * correctly shown as holding locks. A prepared transaction PGPROC can be + * distinguished from a real one at need by the fact that it has pid == 0. + * The semaphore and lock-related fields in a prepared-xact PGPROC are unused. */ struct PGPROC { *************** *** 62,77 **** * were starting our xact: vacuum must not * remove tuples deleted by xid >= xmin ! */ ! 
int pid; /* This backend's process id */ Oid databaseId; /* OID of database this backend is using */ - /* - * XLOG location of first XLOG record written by this backend's - * current transaction. If backend is not in a transaction or hasn't - * yet modified anything, logRec.xrecoff is zero. - */ - XLogRecPtr logRec; - /* Info about LWLock the process is currently waiting for, if any. */ bool lwWaiting; /* true if waiting for an LW lock */ bool lwExclusive; /* true if waiting for exclusive access */ --- 69,77 ---- * were starting our xact: vacuum must not * remove tuples deleted by xid >= xmin ! */ ! int pid; /* This backend's process id, or 0 */ Oid databaseId; /* OID of database this backend is using */ /* Info about LWLock the process is currently waiting for, if any. */ bool lwWaiting; /* true if waiting for an LW lock */ bool lwExclusive; /* true if waiting for exclusive access */ *************** *** 120,130 **** /* * Function Prototypes */ ! extern int ProcGlobalSemas(int maxBackends); ! extern int ProcGlobalShmemSize(int maxBackends); ! extern void InitProcGlobal(int maxBackends); extern void InitProcess(void); extern void InitDummyProcess(int proctype); extern void ProcReleaseLocks(bool isCommit); extern void ProcQueueInit(PROC_QUEUE *queue); --- 120,131 ---- /* * Function Prototypes */ ! extern int ProcGlobalSemas(void); ! extern int ProcGlobalShmemSize(void); ! extern void InitProcGlobal(void); extern void InitProcess(void); extern void InitDummyProcess(int proctype); + extern bool HaveNFreeProcs(int n); extern void ProcReleaseLocks(bool isCommit); extern void ProcQueueInit(PROC_QUEUE *queue); *** src/include/storage/procarray.h.orig Thu May 19 17:35:47 2005 --- src/include/storage/procarray.h Thu Jun 16 15:16:12 2005 *************** *** 14,34 **** #ifndef PROCARRAY_H #define PROCARRAY_H ! extern int ProcArrayShmemSize(int maxBackends); ! extern void CreateSharedProcArray(int maxBackends); ! extern void ProcArrayAddMyself(void); ! 
extern void ProcArrayRemoveMyself(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern TransactionId GetOldestXmin(bool allDbs); ! /* Use "struct PGPROC", not PGPROC, to avoid including proc.h here */ ! extern struct PGPROC *BackendPidGetProc(int pid); extern bool IsBackendPid(int pid); extern bool DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself); extern int CountActiveBackends(void); - extern int CountEmptyBackendSlots(void); extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, TransactionId *xids); --- 14,36 ---- #ifndef PROCARRAY_H #define PROCARRAY_H ! #include "storage/lock.h" ! ! ! extern int ProcArrayShmemSize(void); ! extern void CreateSharedProcArray(void); ! extern void ProcArrayAdd(PGPROC *proc); ! extern void ProcArrayRemove(PGPROC *proc); extern bool TransactionIdIsInProgress(TransactionId xid); + extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestXmin(bool allDbs); ! extern PGPROC *BackendPidGetProc(int pid); extern bool IsBackendPid(int pid); extern bool DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself); extern int CountActiveBackends(void); extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, TransactionId *xids); *** src/include/storage/smgr.h.orig Mon Jun 6 13:01:25 2005 --- src/include/storage/smgr.h Thu Jun 16 15:16:12 2005 *************** *** 79,84 **** --- 79,85 ---- extern int smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr); extern void AtSubCommit_smgr(void); extern void AtSubAbort_smgr(void); + extern void PostPrepare_smgr(void); extern void smgrcommit(void); extern void smgrabort(void); extern void smgrsync(void); *** src/include/utils/builtins.h.orig Fri May 27 17:52:01 2005 --- src/include/utils/builtins.h Thu Jun 16 15:16:02 2005 *************** *** 825,830 **** --- 825,833 ---- /* lockfuncs.c */ extern Datum pg_lock_status(PG_FUNCTION_ARGS); + /* access/transam/twophase.c */ + extern Datum 
pg_prepared_xact(PG_FUNCTION_ARGS); + /* catalog/pg_conversion.c */ extern Datum pg_convert_using(PG_FUNCTION_ARGS); *** src/include/utils/flatfiles.h.orig Wed May 11 12:14:04 2005 --- src/include/utils/flatfiles.h Thu Jun 16 16:00:53 2005 *************** *** 23,33 **** --- 23,37 ---- extern void BuildFlatFiles(bool database_only); + extern void AtPrepare_UpdateFlatFiles(void); extern void AtEOXact_UpdateFlatFiles(bool isCommit); extern void AtEOSubXact_UpdateFlatFiles(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid); extern Datum flatfile_update_trigger(PG_FUNCTION_ARGS); + + extern void flatfile_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len); #endif /* FLATFILES_H */ *** src/include/utils/inval.h.orig Fri Dec 31 17:47:09 2004 --- src/include/utils/inval.h Thu Jun 16 16:00:54 2005 *************** *** 30,35 **** --- 30,39 ---- extern void AtEOSubXact_Inval(bool isCommit); + extern void AtPrepare_Inval(void); + + extern void PostPrepare_Inval(void); + extern void CommandEndInvalidationMessages(void); extern void CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple); *************** *** 46,50 **** --- 50,57 ---- extern void CacheRegisterRelcacheCallback(CacheCallbackFunction func, Datum arg); + + extern void inval_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len); #endif /* INVAL_H */ *** src/include/utils/portal.h.orig Mon Apr 11 15:51:16 2005 --- src/include/utils/portal.h Fri Jun 17 11:08:32 2005 *************** *** 183,188 **** --- 183,189 ---- /* Prototypes for functions in utils/mmgr/portalmem.c */ extern void EnablePortalManager(void); extern bool CommitHoldablePortals(void); + extern bool PrepareHoldablePortals(void); extern void AtCommit_Portals(void); extern void AtAbort_Portals(void); extern void AtCleanup_Portals(void); *** src/test/regress/expected/prepared_xacts.out.orig Thu Jun 16 15:15:51 2005 --- src/test/regress/expected/prepared_xacts.out Thu Jun 16 
18:36:35 2005 *************** *** 0 **** --- 1,213 ---- + -- + -- PREPARED TRANSACTIONS (two-phase commit) + -- + -- We can't readily test persistence of prepared xacts within the + -- regression script framework, unfortunately. Note that a crash + -- isn't really needed ... stopping and starting the postmaster would + -- be enough, but we can't even do that here. + -- create a simple table that we'll use in the tests + CREATE TABLE pxtest1 (foobar VARCHAR(10)); + INSERT INTO pxtest1 VALUES ('aaa'); + -- Test PREPARE TRANSACTION + BEGIN; + UPDATE pxtest1 SET foobar = 'bbb' WHERE foobar = 'aaa'; + SELECT * FROM pxtest1; + foobar + -------- + bbb + (1 row) + + PREPARE TRANSACTION 'foo1'; + SELECT * FROM pxtest1; + foobar + -------- + aaa + (1 row) + + -- Test pg_prepared_xacts system view + SELECT gid FROM pg_prepared_xacts; + gid + ------ + foo1 + (1 row) + + -- Test ROLLBACK PREPARED + ROLLBACK PREPARED 'foo1'; + SELECT * FROM pxtest1; + foobar + -------- + aaa + (1 row) + + SELECT gid FROM pg_prepared_xacts; + gid + ----- + (0 rows) + + -- Test COMMIT PREPARED + BEGIN; + INSERT INTO pxtest1 VALUES ('ddd'); + SELECT * FROM pxtest1; + foobar + -------- + aaa + ddd + (2 rows) + + PREPARE TRANSACTION 'foo2'; + SELECT * FROM pxtest1; + foobar + -------- + aaa + (1 row) + + COMMIT PREPARED 'foo2'; + SELECT * FROM pxtest1; + foobar + -------- + aaa + ddd + (2 rows) + + -- Test duplicate gids + BEGIN; + UPDATE pxtest1 SET foobar = 'eee' WHERE foobar = 'ddd'; + SELECT * FROM pxtest1; + foobar + -------- + aaa + eee + (2 rows) + + PREPARE TRANSACTION 'foo3'; + SELECT gid FROM pg_prepared_xacts; + gid + ------ + foo3 + (1 row) + + BEGIN; + INSERT INTO pxtest1 VALUES ('fff'); + SELECT * FROM pxtest1; + foobar + -------- + aaa + ddd + fff + (3 rows) + + -- This should fail, because the gid foo3 is already in use + PREPARE TRANSACTION 'foo3'; + ERROR: global transaction identifier "foo3" is already in use + SELECT * FROM pxtest1; + foobar + -------- + aaa + ddd + (2 rows) + + 
ROLLBACK PREPARED 'foo3'; + SELECT * FROM pxtest1; + foobar + -------- + aaa + ddd + (2 rows) + + -- Clean up + DROP TABLE pxtest1; + -- Test subtransactions + BEGIN; + CREATE TABLE pxtest2 (a int); + INSERT INTO pxtest2 VALUES (1); + SAVEPOINT a; + INSERT INTO pxtest2 VALUES (2); + ROLLBACK TO a; + SAVEPOINT b; + INSERT INTO pxtest2 VALUES (3); + PREPARE TRANSACTION 'regress-one'; + CREATE TABLE pxtest3(fff int); + -- Test shared invalidation + BEGIN; + DROP TABLE pxtest3; + CREATE TABLE pxtest4 (a int); + INSERT INTO pxtest4 VALUES (1); + INSERT INTO pxtest4 VALUES (2); + DECLARE foo CURSOR FOR SELECT * FROM pxtest4; + -- Fetch 1 tuple, keeping the cursor open + FETCH 1 FROM foo; + a + --- + 1 + (1 row) + + PREPARE TRANSACTION 'regress-two'; + -- No such cursor + FETCH 1 FROM foo; + ERROR: cursor "foo" does not exist + -- Table doesn't exist, the creation hasn't been committed yet + SELECT * FROM pxtest2; + ERROR: relation "pxtest2" does not exist + -- There should be two prepared transactions + SELECT gid FROM pg_prepared_xacts; + gid + ------------- + regress-one + regress-two + (2 rows) + + -- pxtest3 should be locked because of the pending DROP + set statement_timeout to 1000; + SELECT * FROM pxtest3; + ERROR: canceling query due to user request + reset statement_timeout; + -- Disconnect, we will continue testing in a different backend + \c - + -- There should still be two prepared transactions + SELECT gid FROM pg_prepared_xacts; + gid + ------------- + regress-one + regress-two + (2 rows) + + -- pxtest3 should still be locked because of the pending DROP + set statement_timeout to 1000; + SELECT * FROM pxtest3; + ERROR: canceling query due to user request + reset statement_timeout; + -- Commit table creation + COMMIT PREPARED 'regress-one'; + \d pxtest2 + Table "public.pxtest2" + Column | Type | Modifiers + --------+---------+----------- + a | integer | + + SELECT * FROM pxtest2; + a + --- + 1 + 3 + (2 rows) + + -- There should be one prepared transaction + 
SELECT gid FROM pg_prepared_xacts; + gid + ------------- + regress-two + (1 row) + + -- Commit table drop + COMMIT PREPARED 'regress-two'; + SELECT * FROM pxtest3; + ERROR: relation "pxtest3" does not exist + -- There should be no prepared transactions + SELECT gid FROM pg_prepared_xacts; + gid + ----- + (0 rows) + + -- Clean up + DROP TABLE pxtest2; + DROP TABLE pxtest4; *** src/test/regress/expected/rules.out.orig Tue May 17 17:24:11 2005 --- src/test/regress/expected/rules.out Thu Jun 16 18:36:43 2005 *************** *** 1279,1284 **** --- 1279,1285 ---- iexit | SELECT ih.name, ih.thepath, interpt_pp(ih.thepath, r.thepath) AS exit FROM ihighway ih, ramp r WHERE (ih.thepath ## r.thepath); pg_indexes | SELECT n.nspname AS schemaname, c.relname AS tablename, i.relname AS indexname, t.spcname AS "tablespace", pg_get_indexdef(i.oid) AS indexdef FROM ((((pg_index x JOIN pg_class c ON ((c.oid = x.indrelid))) JOIN pg_class i ON ((i.oid = x.indexrelid))) LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) LEFT JOIN pg_tablespace t ON ((t.oid = i.reltablespace))) WHERE ((c.relkind = 'r'::"char") AND (i.relkind = 'i'::"char")); pg_locks | SELECT l.locktype, l."database", l.relation, l.page, l.tuple, l."transaction", l.classid, l.objid, l.objsubid, l.pid, l."mode", l.granted FROM pg_lock_status() l(locktype text, "database" oid, relation oid, page integer, tuple smallint, "transaction" xid, classid oid, objid oid, objsubid smallint, pid integer, "mode" text, granted boolean); + pg_prepared_xacts | SELECT p."transaction", p.gid, u.usename AS "owner", d.datname AS "database" FROM ((pg_prepared_xact() p("transaction" xid, gid text, ownerid integer, dbid oid) LEFT JOIN pg_database d ON ((p.dbid = d.oid))) LEFT JOIN pg_shadow u ON ((p.ownerid = u.usesysid))); pg_rules | SELECT n.nspname AS schemaname, c.relname AS tablename, r.rulename, pg_get_ruledef(r.oid) AS definition FROM ((pg_rewrite r JOIN pg_class c ON ((c.oid = r.ev_class))) LEFT JOIN pg_namespace n ON ((n.oid = 
c.relnamespace))) WHERE (r.rulename <> '_RETURN'::name); pg_settings | SELECT a.name, a.setting, a.category, a.short_desc, a.extra_desc, a.context, a.vartype, a.source, a.min_val, a.max_val FROM pg_show_all_settings() a(name text, setting text, category text, short_desc text, extra_desc text, context text, vartype text, source text, min_val text, max_val text); pg_stat_activity | SELECT d.oid AS datid, d.datname, pg_stat_get_backend_pid(s.backendid) AS procpid, pg_stat_get_backend_userid(s.backendid) AS usesysid, u.usename, pg_stat_get_backend_activity(s.backendid) AS current_query, pg_stat_get_backend_activity_start(s.backendid) AS query_start, pg_stat_get_backend_start(s.backendid) AS backend_start, pg_stat_get_backend_client_addr(s.backendid) AS client_addr, pg_stat_get_backend_client_port(s.backendid) AS client_port FROM pg_database d, (SELECT pg_stat_get_backend_idset() AS backendid) s, pg_shadow u WHERE ((pg_stat_get_backend_dbid(s.backendid) = d.oid) AND (pg_stat_get_backend_userid(s.backendid) = u.usesysid)); *************** *** 1316,1322 **** shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color)))); street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath); toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp; ! 
(40 rows) SELECT tablename, rulename, definition FROM pg_rules ORDER BY tablename, rulename; --- 1317,1323 ---- shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color)))); street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath); toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp; ! (41 rows) SELECT tablename, rulename, definition FROM pg_rules ORDER BY tablename, rulename; *** src/test/regress/parallel_schedule.orig Fri Jun 18 02:14:25 2004 --- src/test/regress/parallel_schedule Thu Jun 16 15:15:38 2005 *************** *** 60,66 **** # ---------- # The fourth group of parallel test # ---------- ! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace test: privileges test: misc --- 60,66 ---- # ---------- # The fourth group of parallel test # ---------- ! 
test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace prepared_xacts test: privileges test: misc *** src/test/regress/serial_schedule.orig Fri Jun 18 02:14:25 2004 --- src/test/regress/serial_schedule Thu Jun 16 15:15:39 2005 *************** *** 74,79 **** --- 74,80 ---- test: hash_index test: update test: namespace + test: prepared_xacts test: privileges test: misc test: select_views *** src/test/regress/sql/prepared_xacts.sql.orig Thu Jun 16 15:15:22 2005 --- src/test/regress/sql/prepared_xacts.sql Thu Jun 16 18:22:06 2005 *************** *** 0 **** --- 1,137 ---- + -- + -- PREPARED TRANSACTIONS (two-phase commit) + -- + -- We can't readily test persistence of prepared xacts within the + -- regression script framework, unfortunately. Note that a crash + -- isn't really needed ... stopping and starting the postmaster would + -- be enough, but we can't even do that here. 
+ + + -- create a simple table that we'll use in the tests + CREATE TABLE pxtest1 (foobar VARCHAR(10)); + + INSERT INTO pxtest1 VALUES ('aaa'); + + + -- Test PREPARE TRANSACTION + BEGIN; + UPDATE pxtest1 SET foobar = 'bbb' WHERE foobar = 'aaa'; + SELECT * FROM pxtest1; + PREPARE TRANSACTION 'foo1'; + + SELECT * FROM pxtest1; + + -- Test pg_prepared_xacts system view + SELECT gid FROM pg_prepared_xacts; + + -- Test ROLLBACK PREPARED + ROLLBACK PREPARED 'foo1'; + + SELECT * FROM pxtest1; + + SELECT gid FROM pg_prepared_xacts; + + + -- Test COMMIT PREPARED + BEGIN; + INSERT INTO pxtest1 VALUES ('ddd'); + SELECT * FROM pxtest1; + PREPARE TRANSACTION 'foo2'; + + SELECT * FROM pxtest1; + + COMMIT PREPARED 'foo2'; + + SELECT * FROM pxtest1; + + -- Test duplicate gids + BEGIN; + UPDATE pxtest1 SET foobar = 'eee' WHERE foobar = 'ddd'; + SELECT * FROM pxtest1; + PREPARE TRANSACTION 'foo3'; + + SELECT gid FROM pg_prepared_xacts; + + BEGIN; + INSERT INTO pxtest1 VALUES ('fff'); + SELECT * FROM pxtest1; + + -- This should fail, because the gid foo3 is already in use + PREPARE TRANSACTION 'foo3'; + + SELECT * FROM pxtest1; + + ROLLBACK PREPARED 'foo3'; + + SELECT * FROM pxtest1; + + -- Clean up + DROP TABLE pxtest1; + + -- Test subtransactions + BEGIN; + CREATE TABLE pxtest2 (a int); + INSERT INTO pxtest2 VALUES (1); + SAVEPOINT a; + INSERT INTO pxtest2 VALUES (2); + ROLLBACK TO a; + SAVEPOINT b; + INSERT INTO pxtest2 VALUES (3); + PREPARE TRANSACTION 'regress-one'; + + CREATE TABLE pxtest3(fff int); + + -- Test shared invalidation + BEGIN; + DROP TABLE pxtest3; + CREATE TABLE pxtest4 (a int); + INSERT INTO pxtest4 VALUES (1); + INSERT INTO pxtest4 VALUES (2); + DECLARE foo CURSOR FOR SELECT * FROM pxtest4; + -- Fetch 1 tuple, keeping the cursor open + FETCH 1 FROM foo; + PREPARE TRANSACTION 'regress-two'; + + -- No such cursor + FETCH 1 FROM foo; + + -- Table doesn't exist, the creation hasn't been committed yet + SELECT * FROM pxtest2; + + -- There should be two prepared 
transactions + SELECT gid FROM pg_prepared_xacts; + + -- pxtest3 should be locked because of the pending DROP + set statement_timeout to 1000; + SELECT * FROM pxtest3; + reset statement_timeout; + + -- Disconnect, we will continue testing in a different backend + \c - + + -- There should still be two prepared transactions + SELECT gid FROM pg_prepared_xacts; + + -- pxtest3 should still be locked because of the pending DROP + set statement_timeout to 1000; + SELECT * FROM pxtest3; + reset statement_timeout; + + -- Commit table creation + COMMIT PREPARED 'regress-one'; + \d pxtest2 + SELECT * FROM pxtest2; + + -- There should be one prepared transaction + SELECT gid FROM pg_prepared_xacts; + + -- Commit table drop + COMMIT PREPARED 'regress-two'; + SELECT * FROM pxtest3; + + -- There should be no prepared transactions + SELECT gid FROM pg_prepared_xacts; + + -- Clean up + DROP TABLE pxtest2; + DROP TABLE pxtest4;