*** doc/src/sgml/catalogs.sgml.orig Mon Jun 13 19:14:47 2005
--- doc/src/sgml/catalogs.sgml Fri Jun 17 18:28:45 2005
***************
*** 3933,3938 ****
--- 3933,3943 ----
+ pg_prepared_xacts
+ currently prepared transactions
+
+
+
pg_rules
rules
***************
*** 4167,4174 ****
pid
integer
! process ID of the server process holding or awaiting this
! lock
mode
--- 4172,4181 ----
pid
integer
!
! Process ID of the server process holding or awaiting this
! lock. Zero if the lock is held by a prepared transaction.
!
mode
***************
*** 4246,4251 ****
--- 4253,4339 ----
procpid column of the
pg_stat_activity view to get more
information on the session holding or waiting to hold the lock.
+
+
+
+
+
+ pg_prepared_xacts
+
+
+ pg_prepared_xacts
+
+
+
+ The view pg_prepared_xacts displays
+ information about transactions that are currently prepared for two-phase
+ commit (see PREPARE TRANSACTION for details).
+
+
+
+ pg_prepared_xacts contains one row per prepared
+ transaction. An entry is removed when the transaction is committed or
+ rolled back.
+
+
+
+ pg_prepared_xacts Columns
+
+
+
+
+ Name
+ Type
+ References
+ Description
+
+
+
+
+ transaction
+ xid
+
+
+ Numeric transaction identifier of the prepared transaction
+
+
+
+ gid
+ text
+
+
+ Global transaction identifier that was assigned to the transaction
+
+
+
+ owner
+ name
+ pg_shadow.usename
+
+ Name of the user that executed the transaction
+
+
+
+ database
+ name
+ pg_database.datname
+
+ Name of the database in which the transaction was executed
+
+
+
+
+
+
+
+ When the pg_prepared_xacts view is accessed, the
+ internal transaction manager data structures are momentarily locked, and
+ a copy is made for the view to display. This ensures that the
+ view produces a consistent set of results, while not blocking
+ normal operations longer than necessary. Nonetheless
+ there could be some impact on database performance if this view is
+ read often.
*** doc/src/sgml/ref/allfiles.sgml.orig Sat Aug 21 12:16:04 2004
--- doc/src/sgml/ref/allfiles.sgml Thu Jun 16 15:19:33 2005
***************
*** 30,35 ****
--- 30,36 ----
+
***************
*** 88,98 ****
--- 89,101 ----
+
+
*** doc/src/sgml/ref/commit_prepared.sgml.orig Thu Jun 16 15:19:31 2005
--- doc/src/sgml/ref/commit_prepared.sgml Fri Jun 17 18:28:41 2005
***************
*** 0 ****
--- 1,111 ----
+
+
+
+
+ COMMIT PREPARED
+ SQL - Language Statements
+
+
+
+ COMMIT PREPARED
+ commit a transaction that was earlier prepared for two-phase commit
+
+
+
+ COMMIT PREPARED
+
+
+
+
+ COMMIT PREPARED transaction_id
+
+
+
+
+ Description
+
+
+ COMMIT PREPARED commits a transaction that is in
+ prepared state.
+
+
+
+
+ Parameters
+
+
+
+ transaction_id
+
+
+ The transaction identifier of the transaction that is to be
+ committed.
+
+
+
+
+
+
+
+ Notes
+
+
+ To commit a prepared transaction, you must be either the same user that
+ executed the transaction originally, or a superuser. But you do not
+ have to be in the same session that executed the transaction.
+
+
+
+ This command cannot be executed inside a transaction block. The prepared
+ transaction is committed immediately.
+
+
+
+ All currently available prepared transactions are listed in the
+ pg_prepared_xacts system view.
+
+
+
+
+ Examples
+
+ Commit the transaction identified by the transaction
+ identifier foobar:
+
+
+ COMMIT PREPARED 'foobar';
+
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
+
+
+
*** doc/src/sgml/ref/prepare_transaction.sgml.orig Thu Jun 16 15:19:31 2005
--- doc/src/sgml/ref/prepare_transaction.sgml Fri Jun 17 18:28:41 2005
***************
*** 0 ****
--- 1,160 ----
+
+
+
+
+ PREPARE TRANSACTION
+ SQL - Language Statements
+
+
+
+ PREPARE TRANSACTION
+ prepare the current transaction for two-phase commit
+
+
+
+ PREPARE TRANSACTION
+
+
+
+
+ PREPARE TRANSACTION transaction_id
+
+
+
+
+ Description
+
+
+ PREPARE TRANSACTION prepares the current transaction
+ for two-phase commit. After this command, the transaction is no longer
+ associated with the current session; instead, its state is fully stored on
+ disk, and there is a very high probability that it can be committed
+ successfully, even if a database crash occurs before the commit is
+ requested.
+
+
+
+ Once prepared, a transaction can later be committed or rolled
+ back with COMMIT PREPARED or
+ ROLLBACK PREPARED, respectively. Those commands
+ can be issued from any session, not only the one that executed the
+ original transaction.
+
+
+
+ From the point of view of the issuing session, PREPARE
+ TRANSACTION is not unlike a ROLLBACK command:
+ after executing it, there is no active current transaction, and the
+ effects of the prepared transaction are no longer visible. (The effects
+ will become visible again if the transaction is committed.)
+
+
+
+ If the PREPARE TRANSACTION command fails for any
+ reason, it becomes a ROLLBACK: the current transaction
+ is canceled.
+
+
+
+
+ Parameters
+
+
+
+ transaction_id
+
+
+ An arbitrary identifier that later identifies this transaction for
+ COMMIT PREPARED or ROLLBACK PREPARED.
+ The identifier must be written as a string literal, and must be
+ less than 200 bytes long. It must not be the same as the identifier
+ used for any currently prepared transaction.
+
+
+
+
+
+
+
+ Notes
+
+
+ This command must be used inside a transaction block. Use
+ BEGIN to start one.
+
+
+
+ It is not currently allowed to PREPARE a transaction that
+ has executed any operations involving temporary tables or
+ created any cursors WITH HOLD. Those features are too tightly
+ tied to the current session to be useful in a transaction to be prepared.
+
+
+
+ If the transaction modified any run-time parameters with SET,
+ those effects persist after PREPARE TRANSACTION, and will not
+ be affected by any later COMMIT PREPARED or
+ ROLLBACK PREPARED. Thus, in this one respect
+ PREPARE TRANSACTION acts more like COMMIT than
+ ROLLBACK.
+
+
+
+ All currently available prepared transactions are listed in the
+ pg_prepared_xacts system view.
+
+
+
+ From a performance standpoint, it is unwise to leave transactions in
+ the prepared state for a long time: this will for instance interfere with
+ the ability of VACUUM to reclaim storage. Keep in mind also
+ that the transaction continues to hold whatever locks it held.
+ The intended
+ usage of the feature is that a prepared transaction will normally be
+ committed or rolled back as soon as an external transaction manager
+ has verified that other databases are also prepared to commit.
+
+
+
+
+ Examples
+
+ Prepare the current transaction for two-phase commit, using
+ foobar as the transaction identifier:
+
+
+ PREPARE TRANSACTION 'foobar';
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
+
+
+
*** doc/src/sgml/ref/rollback_prepared.sgml.orig Thu Jun 16 15:19:31 2005
--- doc/src/sgml/ref/rollback_prepared.sgml Fri Jun 17 18:28:42 2005
***************
*** 0 ****
--- 1,111 ----
+
+
+
+
+ ROLLBACK PREPARED
+ SQL - Language Statements
+
+
+
+ ROLLBACK PREPARED
+ cancel a transaction that was earlier prepared for two-phase commit
+
+
+
+ ROLLBACK PREPARED
+
+
+
+
+ ROLLBACK PREPARED transaction_id
+
+
+
+
+ Description
+
+
+ ROLLBACK PREPARED rolls back a transaction that is in
+ prepared state.
+
+
+
+
+ Parameters
+
+
+
+ transaction_id
+
+
+ The transaction identifier of the transaction that is to be
+ rolled back.
+
+
+
+
+
+
+
+ Notes
+
+
+ To roll back a prepared transaction, you must be either the same user that
+ executed the transaction originally, or a superuser. But you do not
+ have to be in the same session that executed the transaction.
+
+
+
+ This command cannot be executed inside a transaction block. The prepared
+ transaction is rolled back immediately.
+
+
+
+ All currently available prepared transactions are listed in the
+ pg_prepared_xacts system view.
+
+
+
+
+ Examples
+
+ Roll back the transaction identified by the transaction
+ identifier foobar:
+
+
+ ROLLBACK PREPARED 'foobar';
+
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
+
+
+
*** doc/src/sgml/reference.sgml.orig Sat Aug 21 12:16:03 2004
--- doc/src/sgml/reference.sgml Thu Jun 16 15:19:15 2005
***************
*** 62,67 ****
--- 62,68 ----
&cluster;
&commentOn;
&commit;
+ &commitPrepared;
&copyTable;
&createAggregate;
&createCast;
***************
*** 120,130 ****
--- 121,133 ----
&move;
&notify;
&prepare;
+ &prepareTransaction;
&reindex;
&releaseSavepoint;
&reset;
&revoke;
&rollback;
+ &rollbackPrepared;
&rollbackTo;
&savepoint;
&select;
*** doc/src/sgml/runtime.sgml.orig Tue Jun 14 18:17:23 2005
--- doc/src/sgml/runtime.sgml Fri Jun 17 17:21:30 2005
***************
*** 1113,1118 ****
--- 1113,1145 ----
+
+ max_prepared_transactions (integer )
+
+ max_prepared_transactions configuration parameter
+
+
+
+ Sets the maximum number of transactions that can be in the
+ prepared state simultaneously (see PREPARE TRANSACTION).
+ Setting this parameter to zero disables the prepared-transaction
+ feature.
+ The default is 50.
+ This option can only be set at server start.
+
+
+
+ Increasing this parameter may cause PostgreSQL
+ to request more System V shared
+ memory than your operating system's default configuration
+ allows. See the section on shared memory and semaphores for information on how to
+ adjust those parameters, if necessary.
+
+
+
+
work_mem (integer )
*** src/backend/access/transam/Makefile.orig Thu Apr 28 17:47:10 2005
--- src/backend/access/transam/Makefile Thu Jun 16 15:18:53 2005
***************
*** 12,18 ****
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o
all: SUBSYS.o
--- 12,18 ----
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o
all: SUBSYS.o
*** src/backend/access/transam/subtrans.c.orig Thu May 19 17:35:45 2005
--- src/backend/access/transam/subtrans.c Fri Jun 17 14:00:44 2005
***************
*** 222,243 ****
/*
* This must be called ONCE during postmaster or standalone-backend startup,
* after StartupXLOG has initialized ShmemVariableCache->nextXid.
*/
void
! StartupSUBTRANS(void)
{
int startPage;
/*
* Since we don't expect pg_subtrans to be valid across crashes, we
! * initialize the currently-active page to zeroes during startup.
* Whenever we advance into a new page, ExtendSUBTRANS will likewise
* zero the new page without regard to whatever was previously on
* disk.
*/
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
! startPage = TransactionIdToPage(ShmemVariableCache->nextXid);
(void) ZeroSUBTRANSPage(startPage);
LWLockRelease(SubtransControlLock);
--- 222,254 ----
/*
* This must be called ONCE during postmaster or standalone-backend startup,
* after StartupXLOG has initialized ShmemVariableCache->nextXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
*/
void
! StartupSUBTRANS(TransactionId oldestActiveXID)
{
int startPage;
+ int endPage;
/*
* Since we don't expect pg_subtrans to be valid across crashes, we
! * initialize the currently-active page(s) to zeroes during startup.
* Whenever we advance into a new page, ExtendSUBTRANS will likewise
* zero the new page without regard to whatever was previously on
* disk.
+ *
+ * The range from oldestActiveXID's page to nextXid's page can span the
+ * XID wraparound point, in which case startPage is numerically greater
+ * than endPage. The loop therefore has to wrap back to page zero after
+ * the last valid page instead of incrementing forever.
*/
LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE);
! startPage = TransactionIdToPage(oldestActiveXID);
! endPage = TransactionIdToPage(ShmemVariableCache->nextXid);
!
! while (startPage != endPage)
! {
! (void) ZeroSUBTRANSPage(startPage);
! startPage++;
! /* must account for wraparound */
! if (startPage > TransactionIdToPage(MaxTransactionId))
! startPage = 0;
! }
(void) ZeroSUBTRANSPage(startPage);
LWLockRelease(SubtransControlLock);
*** src/backend/access/transam/transam.c.orig Sun Feb 20 16:46:48 2005
--- src/backend/access/transam/transam.c Fri Jun 17 14:37:02 2005
***************
*** 173,178 ****
--- 173,186 ----
* recursively. However, if it's older than TransactionXmin, we can't
* look at pg_subtrans; instead assume that the parent crashed without
* cleaning up its children.
+ *
+ * Originally we Assert'ed that the result of SubTransGetParent was
+ * not zero. However with the introduction of prepared transactions,
+ * there can be a window just after database startup where we do not
+ * have complete knowledge in pg_subtrans of the transactions after
+ * TransactionXmin. StartupSUBTRANS() has ensured that any missing
+ * information will be zeroed. Since this case should not happen under
+ * normal conditions, it seems reasonable to emit a WARNING for it.
*/
if (xidstatus == TRANSACTION_STATUS_SUB_COMMITTED)
{
***************
*** 181,187 ****
if (TransactionIdPrecedes(transactionId, TransactionXmin))
return false;
parentXid = SubTransGetParent(transactionId);
! Assert(TransactionIdIsValid(parentXid));
return TransactionIdDidCommit(parentXid);
}
--- 189,200 ----
if (TransactionIdPrecedes(transactionId, TransactionXmin))
return false;
parentXid = SubTransGetParent(transactionId);
! if (!TransactionIdIsValid(parentXid))
! {
! elog(WARNING, "no pg_subtrans entry for subcommitted XID %u",
! transactionId);
! return false;
! }
return TransactionIdDidCommit(parentXid);
}
***************
*** 224,230 ****
if (TransactionIdPrecedes(transactionId, TransactionXmin))
return true;
parentXid = SubTransGetParent(transactionId);
! Assert(TransactionIdIsValid(parentXid));
return TransactionIdDidAbort(parentXid);
}
--- 237,249 ----
if (TransactionIdPrecedes(transactionId, TransactionXmin))
return true;
parentXid = SubTransGetParent(transactionId);
! if (!TransactionIdIsValid(parentXid))
! {
! /* see notes in TransactionIdDidCommit */
! elog(WARNING, "no pg_subtrans entry for subcommitted XID %u",
! transactionId);
! return true;
! }
return TransactionIdDidAbort(parentXid);
}
*** src/backend/access/transam/twophase.c.orig Thu Jun 16 15:18:52 2005
--- src/backend/access/transam/twophase.c Fri Jun 17 17:23:43 2005
***************
*** 0 ****
--- 1,1659 ----
+ /*-------------------------------------------------------------------------
+ *
+ * twophase.c
+ * Two-phase commit support functions.
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ * NOTES
+ * Each global transaction is associated with a global transaction
+ * identifier (GID). The client assigns a GID to a postgres
+ * transaction with the PREPARE TRANSACTION command.
+ *
+ * We keep all active global transactions in a shared memory array.
+ * When the PREPARE TRANSACTION command is issued, the GID is
+ * reserved for the transaction in the array. This is done before
+ * a WAL entry is made, because the reservation checks for duplicate
+ * GIDs and aborts the transaction if there already is a global
+ * transaction in prepared state with the same GID.
+ *
+ * A global transaction (gxact) also has a dummy PGPROC that is entered
+ * into the ProcArray array; this is what keeps the XID considered
+ * running by TransactionIdIsInProgress. It is also convenient as a
+ * PGPROC to hook the gxact's locks to.
+ *
+ * In order to survive crashes and shutdowns, all prepared
+ * transactions must be stored in permanent storage. This includes
+ * locking information, pending notifications etc. All that state
+ * information is written to the per-transaction state file in
+ * the pg_twophase directory.
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include
+ #include
+ #include
+ #include
+
+ #include "access/heapam.h"
+ #include "access/subtrans.h"
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
+ #include "access/xact.h"
+ #include "catalog/pg_type.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "storage/fd.h"
+ #include "storage/proc.h"
+ #include "storage/procarray.h"
+ #include "storage/smgr.h"
+ #include "utils/builtins.h"
+ #include "pgstat.h"
+
+
+ /*
+ * Directory where Two-phase commit files reside within PGDATA
+ */
+ #define TWOPHASE_DIR "pg_twophase"
+
+ /* GUC variable, can't be changed after startup */
+ int max_prepared_xacts = 50;
+
+ /*
+ * This struct describes one global transaction that is in prepared state
+ * or attempting to become prepared.
+ *
+ * The first component of the struct is a dummy PGPROC that is inserted
+ * into the global ProcArray so that the transaction appears to still be
+ * running and holding locks. It must be first because we cast pointers
+ * to PGPROC and pointers to GlobalTransactionData back and forth.
+ *
+ * The lifecycle of a global transaction is:
+ *
+ * 1. After checking that the requested GID is not in use, set up an
+ * entry in the TwoPhaseState->prepXacts array with the correct XID and GID,
+ * with locking_xid = my own XID and valid = false.
+ *
+ * 2. After successfully completing prepare, set valid = true and enter the
+ * contained PGPROC into the global ProcArray.
+ *
+ * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
+ * is valid and its locking_xid is no longer active, then store my current
+ * XID into locking_xid. This prevents concurrent attempts to commit or
+ * rollback the same prepared xact.
+ *
+ * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
+ * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
+ * the freelist.
+ *
+ * Note that if the preparing transaction fails between steps 1 and 2, the
+ * entry will remain in prepXacts until recycled. We can detect recyclable
+ * entries by checking for valid = false and locking_xid no longer active.
+ *
+ * typedef struct GlobalTransactionData *GlobalTransaction appears in
+ * twophase.h
+ */
+ #define GIDSIZE 200
+
+ typedef struct GlobalTransactionData
+ {
+ PGPROC proc; /* dummy proc; must be the first field because
+ * pointers are cast between PGPROC and this struct */
+ AclId owner; /* ID of user that executed the xact */
+ TransactionId locking_xid; /* top-level XID of backend working on xact:
+ * the preparing xact at first, later the xact
+ * running COMMIT/ROLLBACK PREPARED */
+ bool valid; /* TRUE if fully prepared */
+ char gid[GIDSIZE]; /* The GID assigned to the prepared xact,
+ * NUL-terminated (strlen < GIDSIZE) */
+ } GlobalTransactionData;
+
+ /*
+ * Two Phase Commit shared state. Access to this struct is protected
+ * by TwoPhaseStateLock.
+ */
+ typedef struct TwoPhaseStateData
+ {
+ /*
+ * Head of linked list of free GlobalTransactionData structs; the list
+ * is chained through proc.links.next and ends with INVALID_OFFSET.
+ */
+ SHMEM_OFFSET freeGXacts;
+
+ /*
+ * Number of prepXacts entries currently in use. This includes entries
+ * that are still being prepared (valid = false), not only fully
+ * prepared ones.
+ */
+ int numPrepXacts;
+
+ /*
+ * There are max_prepared_xacts items in this array, but C wants a
+ * fixed-size array. Only the first numPrepXacts slots are meaningful.
+ */
+ GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */
+ } TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */
+
+ static TwoPhaseStateData *TwoPhaseState;
+
+
+ static void RecordTransactionCommitPrepared(TransactionId xid,
+ int nchildren,
+ TransactionId *children,
+ int nrels,
+ RelFileNode *rels);
+ static void RecordTransactionAbortPrepared(TransactionId xid,
+ int nchildren,
+ TransactionId *children,
+ int nrels,
+ RelFileNode *rels);
+ static void ProcessRecords(char *bufptr, TransactionId xid,
+ const TwoPhaseCallback callbacks[]);
+
+
+ /*
+ * Initialization of shared memory
+ */
+ int
+ TwoPhaseShmemSize(void)
+ {
+ int size;
+
+ /* Fixed-size header followed by the array of gxact pointers ... */
+ size = MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
+ sizeof(GlobalTransaction) * max_prepared_xacts);
+
+ /* ... and then the GlobalTransactionData structs themselves */
+ size += sizeof(GlobalTransactionData) * max_prepared_xacts;
+
+ return size;
+ }
+
+ /*
+ * TwoPhaseShmemInit
+ * Attach to (and, when not running under the postmaster, initialize)
+ * the shared "Prepared Transaction Table".
+ *
+ * The region is sized by TwoPhaseShmemSize(): the TwoPhaseStateData header
+ * with its pointer array comes first, followed (MAXALIGN'd) by
+ * max_prepared_xacts GlobalTransactionData structs, all of which start out
+ * on the free list.
+ */
+ void
+ TwoPhaseShmemInit(void)
+ {
+ bool found;
+
+ TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
+ TwoPhaseShmemSize(),
+ &found);
+ if (!IsUnderPostmaster)
+ {
+ GlobalTransaction gxacts;
+ int i;
+
+ /* we are the creator, so the struct must not have existed before */
+ Assert(!found);
+ TwoPhaseState->freeGXacts = INVALID_OFFSET;
+ TwoPhaseState->numPrepXacts = 0;
+
+ /*
+ * Initialize the linked list of free GlobalTransactionData structs.
+ * The structs live right after the pointer array; this offset must
+ * match the first term computed in TwoPhaseShmemSize().
+ */
+ gxacts = (GlobalTransaction)
+ ((char *) TwoPhaseState +
+ MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
+ sizeof(GlobalTransaction) * max_prepared_xacts));
+ for (i = 0; i < max_prepared_xacts; i++)
+ {
+ /* push each struct onto the head of the free list */
+ gxacts[i].proc.links.next = TwoPhaseState->freeGXacts;
+ TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]);
+ }
+ }
+ else
+ Assert(found);
+ }
+
+
+ /*
+ * MarkAsPreparing
+ * Reserve the GID for the given transaction.
+ *
+ * Internally, this creates a gxact struct and puts it into the active array.
+ * NOTE: this is also used when reloading a gxact after a crash; so avoid
+ * assuming that we can use very much backend context.
+ *
+ * xid: XID stored in the dummy PGPROC; also the initial locking_xid
+ * databaseid: database OID stored in the dummy PGPROC
+ * gid: global identifier to reserve; must be shorter than GIDSIZE
+ * owner: user ID recorded in the entry (checked later by LockGXact)
+ *
+ * Returns the new entry, which stays valid = false until MarkAsPrepared()
+ * is called on it. Raises ERROR if the GID is too long, if the GID is
+ * already present in the active array, or if the free list is empty.
+ */
+ GlobalTransaction
+ MarkAsPreparing(TransactionId xid, Oid databaseid, char *gid, AclId owner)
+ {
+ GlobalTransaction gxact;
+ int i;
+
+ /* reject over-length GIDs up front so the strcpy below cannot overrun */
+ if (strlen(gid) >= GIDSIZE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("global transaction identifier \"%s\" is too long",
+ gid)));
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+ /*
+ * First, find and recycle any gxacts that failed during prepare.
+ * We do this partly to ensure we don't mistakenly say their GIDs
+ * are still reserved, and partly so we don't fail on out-of-slots
+ * unnecessarily.
+ */
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ gxact = TwoPhaseState->prepXacts[i];
+ if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid))
+ {
+ /* It's dead Jim ... remove from the active array */
+ TwoPhaseState->numPrepXacts--;
+ TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
+ /* and put it back in the freelist */
+ gxact->proc.links.next = TwoPhaseState->freeGXacts;
+ TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+ /* Back up index count too, so we don't miss scanning one */
+ i--;
+ }
+ }
+
+ /*
+ * Check for conflicting GID. Entries that are not yet valid count
+ * too, since their GID is already reserved.
+ *
+ * NOTE(review): the ERRORs below are raised while TwoPhaseStateLock is
+ * held; presumably error cleanup releases it -- confirm.
+ */
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ gxact = TwoPhaseState->prepXacts[i];
+ if (strcmp(gxact->gid, gid) == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("global transaction identifier \"%s\" is already in use",
+ gid)));
+ }
+ }
+
+ /* Get a free gxact from the freelist */
+ if (TwoPhaseState->freeGXacts == INVALID_OFFSET)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("maximum number of prepared transactions reached"),
+ errhint("Increase max_prepared_transactions (currently %d).",
+ max_prepared_xacts)));
+ gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts);
+ TwoPhaseState->freeGXacts = gxact->proc.links.next;
+
+ /* Initialize it: a zeroed dummy PGPROC with pid 0 and no lock waits */
+ MemSet(&gxact->proc, 0, sizeof(PGPROC));
+ SHMQueueElemInit(&(gxact->proc.links));
+ gxact->proc.waitStatus = STATUS_OK;
+ gxact->proc.xid = xid;
+ gxact->proc.xmin = InvalidTransactionId;
+ gxact->proc.pid = 0;
+ gxact->proc.databaseId = databaseid;
+ gxact->proc.lwWaiting = false;
+ gxact->proc.lwExclusive = false;
+ gxact->proc.lwWaitLink = NULL;
+ gxact->proc.waitLock = NULL;
+ gxact->proc.waitProcLock = NULL;
+ SHMQueueInit(&(gxact->proc.procLocks));
+ /* subxid data must be filled later by GXactLoadSubxactData */
+ gxact->proc.subxids.overflowed = false;
+ gxact->proc.subxids.nxids = 0;
+
+ gxact->owner = owner;
+ gxact->locking_xid = xid;
+ gxact->valid = false;
+ strcpy(gxact->gid, gid);
+
+ /* And insert it into the active array */
+ Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
+ TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return gxact;
+ }
+
+ /*
+ * GXactLoadSubxactData
+ *
+ * Copy the subtransaction XIDs of the transaction being persisted into the
+ * dummy PGPROC's subxid cache. When there are subtransactions, call this
+ * before MarkAsPrepared(). At most PGPROC_MAX_CACHED_SUBXIDS entries are
+ * stored; if there are more, the cache is flagged as overflowed.
+ */
+ static void
+ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
+ TransactionId *children)
+ {
+ int ncached = nsubxacts;
+
+ /* No extra lock is needed, because the GXACT is not valid yet */
+ if (ncached > PGPROC_MAX_CACHED_SUBXIDS)
+ {
+ /* too many to cache: keep the first ones and note the overflow */
+ ncached = PGPROC_MAX_CACHED_SUBXIDS;
+ gxact->proc.subxids.overflowed = true;
+ }
+
+ /* nothing to copy for a transaction without subtransactions */
+ if (ncached <= 0)
+ return;
+
+ gxact->proc.subxids.nxids = ncached;
+ memcpy(gxact->proc.subxids.xids, children,
+ ncached * sizeof(TransactionId));
+ }
+
+ /*
+ * MarkAsPrepared
+ * Mark the GXACT as fully valid, and enter it into the global ProcArray.
+ *
+ * gxact must be an entry obtained from MarkAsPreparing() that has not
+ * been marked valid yet (asserted below).
+ */
+ void
+ MarkAsPrepared(GlobalTransaction gxact)
+ {
+ /* Lock here may be overkill, but I'm not convinced of that ... */
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+ Assert(!gxact->valid);
+ gxact->valid = true;
+ LWLockRelease(TwoPhaseStateLock);
+
+ /*
+ * Put it into the global ProcArray so TransactionIdIsInProgress considers
+ * the XID as still running.
+ */
+ ProcArrayAdd(&gxact->proc);
+ }
+
+ /*
+ * LockGXact
+ * Locate the prepared transaction and mark it busy for COMMIT PREPARED
+ * or ROLLBACK PREPARED.
+ *
+ * gid: global identifier of the prepared transaction to look up
+ * user: ID of the requesting user; must be the owner or a superuser
+ *
+ * On success the entry's locking_xid is set to our top-level XID and the
+ * entry is returned. Raises ERROR if no valid entry has this GID, if the
+ * entry is locked by a still-active transaction, or if the user is not
+ * allowed to finish it.
+ */
+ static GlobalTransaction
+ LockGXact(char *gid, AclId user)
+ {
+ int i;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+ /* Ignore not-yet-valid GIDs */
+ if (!gxact->valid)
+ continue;
+ if (strcmp(gxact->gid, gid) != 0)
+ continue;
+
+ /* Found it, but has someone else got it locked? */
+ if (TransactionIdIsValid(gxact->locking_xid))
+ {
+ if (TransactionIdIsActive(gxact->locking_xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("prepared transaction with gid \"%s\" is busy",
+ gid)));
+ /* previous locker is gone; clear its stale claim */
+ gxact->locking_xid = InvalidTransactionId;
+ }
+
+ /* permission check comes before we claim the entry for ourselves */
+ if (user != gxact->owner && !superuser_arg(user))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to finish prepared transaction"),
+ errhint("Must be superuser or the user that prepared the transaction.")));
+
+ /* OK for me to lock it */
+ gxact->locking_xid = GetTopTransactionId();
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return gxact;
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("prepared transaction with gid \"%s\" does not exist",
+ gid)));
+
+ /* NOTREACHED */
+ return NULL;
+ }
+
+ /*
+ * RemoveGXact
+ * Take the prepared transaction out of the shared active array and hand
+ * its struct back to the free list.
+ *
+ * NB: the caller should already have removed it from the ProcArray.
+ * Raises ERROR if the entry is not present in the active array.
+ */
+ static void
+ RemoveGXact(GlobalTransaction gxact)
+ {
+ int slot;
+ bool removed = false;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+ for (slot = 0; slot < TwoPhaseState->numPrepXacts; slot++)
+ {
+ if (TwoPhaseState->prepXacts[slot] != gxact)
+ continue;
+
+ /* close the gap by moving the last active entry into this slot */
+ TwoPhaseState->numPrepXacts--;
+ TwoPhaseState->prepXacts[slot] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
+
+ /* push the struct onto the head of the free list */
+ gxact->proc.links.next = TwoPhaseState->freeGXacts;
+ TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact);
+
+ removed = true;
+ break;
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ if (!removed)
+ elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
+ }
+
+ /*
+ * Build a private snapshot of all active gxact entries for the user-level
+ * function pg_prepared_xact.
+ *
+ * Every element is copied out of shared memory, so TwoPhaseStateLock is
+ * held only for the duration of the copy.
+ *
+ * WARNING -- entries that are not fully prepared yet are included as well.
+ * A caller that does not want them must filter on the valid flag.
+ *
+ * *gxacts receives a palloc'd array, or NULL when there are no entries;
+ * the return value is the number of elements in it.
+ */
+ static int
+ GetPreparedTransactionList(GlobalTransaction *gxacts)
+ {
+ GlobalTransaction copies = NULL;
+ int count;
+ int idx;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+ count = TwoPhaseState->numPrepXacts;
+ if (count > 0)
+ {
+ copies = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * count);
+ for (idx = 0; idx < count; idx++)
+ memcpy(&copies[idx], TwoPhaseState->prepXacts[idx],
+ sizeof(GlobalTransactionData));
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ *gxacts = copies;
+ return count;
+ }
+
+
+ /*
+ * Working status for pg_prepared_xact: the snapshot taken on the first
+ * call and the scan position carried across calls.
+ */
+ typedef struct
+ {
+ GlobalTransaction array; /* copied gxact entries, or NULL if none */
+ int ngxacts; /* number of entries in array */
+ int currIdx; /* index of the next entry to examine */
+ } Working_State;
+
+ /*
+ * pg_prepared_xact
+ * Produce a view with one row per prepared transaction.
+ *
+ * This function is here so we don't have to export the
+ * GlobalTransactionData struct definition.
+ *
+ * Set-returning function: the first call snapshots the gxact array via
+ * GetPreparedTransactionList(); each call then emits one row
+ * (transaction, gid, ownerid, dbid) for the next entry that is fully
+ * prepared, skipping entries whose valid flag is false.
+ */
+ Datum
+ pg_prepared_xact(PG_FUNCTION_ARGS)
+ {
+ FuncCallContext *funcctx;
+ Working_State *status;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ TupleDesc tupdesc;
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /*
+ * Switch to memory context appropriate for multiple function
+ * calls
+ */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* build tupdesc for result tuples */
+ /* this had better match pg_prepared_xacts view in system_views.sql */
+ tupdesc = CreateTemplateTupleDesc(4, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
+ XIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "ownerid",
+ INT4OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "dbid",
+ OIDOID, -1, 0);
+
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ /*
+ * Collect all the 2PC status information that we will format and
+ * send out as a result set. The snapshot is allocated while the
+ * multi-call context is current, so it survives across calls.
+ */
+ status = (Working_State *) palloc(sizeof(Working_State));
+ funcctx->user_fctx = (void *) status;
+
+ status->ngxacts = GetPreparedTransactionList(&status->array);
+ status->currIdx = 0;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ status = (Working_State *) funcctx->user_fctx;
+
+ /* advance through the snapshot until a reportable entry is found */
+ while (status->array != NULL && status->currIdx < status->ngxacts)
+ {
+ GlobalTransaction gxact = &status->array[status->currIdx++];
+ Datum values[4];
+ bool nulls[4];
+ HeapTuple tuple;
+ Datum result;
+
+ /* the snapshot includes not-yet-prepared entries; hide those */
+ if (!gxact->valid)
+ continue;
+
+ /*
+ * Form tuple with appropriate data.
+ */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = TransactionIdGetDatum(gxact->proc.xid);
+ values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid));
+ values[2] = Int32GetDatum(gxact->owner);
+ values[3] = ObjectIdGetDatum(gxact->proc.databaseId);
+
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
+ SRF_RETURN_DONE(funcctx);
+ }
+
+ /*
+ * TwoPhaseGetDummyProc
+ * Get the PGPROC that represents a prepared transaction specified by XID
+ *
+ * Searches the active gxact array (valid or not) for an entry whose dummy
+ * PGPROC carries the given XID. Raises ERROR if there is none. The last
+ * successful lookup is remembered in function-local static variables.
+ */
+ PGPROC *
+ TwoPhaseGetDummyProc(TransactionId xid)
+ {
+ PGPROC *result = NULL;
+ int i;
+
+ static TransactionId cached_xid = InvalidTransactionId;
+ static PGPROC *cached_proc = NULL;
+
+ /*
+ * During a recovery, COMMIT PREPARED, or ROLLBACK PREPARED, we'll be
+ * called repeatedly for the same XID. We can save work with a simple
+ * cache.
+ */
+ if (xid == cached_xid)
+ return cached_proc;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+ if (gxact->proc.xid == xid)
+ {
+ result = &gxact->proc;
+ break;
+ }
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ if (result == NULL) /* should not happen */
+ elog(ERROR, "failed to find dummy PGPROC for xid %u", xid);
+
+ /* only successful lookups are cached; a failure exits via ERROR above */
+ cached_xid = xid;
+ cached_proc = result;
+
+ return result;
+ }
+
+ /************************************************************************/
+ /* State file support */
+ /************************************************************************/
+
+ #define TwoPhaseFilePath(path, xid) \
+ snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid)
+
+ /*
+ * 2PC state file format:
+ *
+ * 1. TwoPhaseFileHeader
+ * 2. TransactionId[] (subtransactions)
+ * 3. RelFileNode[] (files to be deleted at commit)
+ * 4. RelFileNode[] (files to be deleted at abort)
+ * 5. TwoPhaseRecordOnDisk
+ * 6. ...
+ * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ * 8. CRC32
+ *
+ * Each segment except the final CRC32 is MAXALIGN'd.
+ */
+
+ /*
+ * Header for a 2PC state file
+ */
+ #define TWOPHASE_MAGIC 0x57F94530 /* format identifier */
+
+ typedef struct TwoPhaseFileHeader
+ {
+ uint32 magic; /* format identifier (TWOPHASE_MAGIC) */
+ uint32 total_len; /* actual file length */
+ /* NOTE(review): unclear from here whether total_len counts the trailing CRC32 -- confirm */
+ TransactionId xid; /* original transaction XID */
+ Oid database; /* OID of database it was in */
+ AclId owner; /* user running the transaction */
+ int32 nsubxacts; /* number of following subxact XIDs */
+ int32 ncommitrels; /* number of delete-on-commit rels */
+ int32 nabortrels; /* number of delete-on-abort rels */
+ char gid[GIDSIZE]; /* GID for transaction; same size as the
+ * in-memory GlobalTransactionData.gid */
+ } TwoPhaseFileHeader;
+
+ /*
+ * Header for each record in a state file
+ *
+ * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
+ * The rmgr data will be stored starting on a MAXALIGN boundary.
+ */
+ typedef struct TwoPhaseRecordOnDisk
+ {
+ uint32 len; /* length of rmgr data, excluding alignment padding */
+ TwoPhaseRmgrId rmid; /* resource manager for this record; TWOPHASE_RM_END_ID ends the list */
+ uint16 info; /* flag bits for use by rmgr */
+ } TwoPhaseRecordOnDisk;
+
+ /*
+ * During prepare, the state file is assembled in memory before writing it
+ * to WAL and the actual state file. We use a chain of XLogRecData blocks
+ * so that we will be able to pass the state file contents directly to
+ * XLogInsert.
+ */
+ static struct xllist
+ {
+ XLogRecData *head; /* first data block in the chain; begins with the file header */
+ XLogRecData *tail; /* last block in chain */
+ uint32 bytes_free; /* free bytes left in tail block */
+ uint32 total_len; /* total data bytes in chain, MAXALIGN padding included */
+ } records;
+
+
+ /*
+  * Copy "len" bytes from "data" onto the end of the records chain.
+  *
+  * Space is consumed in MAXALIGN multiples, so whoever reads the file back
+  * must step over the same padding.  The bytes are copied, leaving the
+  * caller free to reuse or change its buffer afterwards.
+  */
+ static void
+ save_state_data(const void *data, uint32 len)
+ {
+     uint32      aligned_len = MAXALIGN(len);
+     XLogRecData *blk = records.tail;
+
+     if (records.bytes_free < aligned_len)
+     {
+         blk = palloc0(sizeof(XLogRecData));
+         records.tail->next = blk;
+         records.tail = blk;
+         blk->buffer = InvalidBuffer;
+         blk->len = 0;
+         blk->next = NULL;
+         records.bytes_free = Max(aligned_len, 512);
+         blk->data = palloc(records.bytes_free);
+     }
+
+     memcpy(((char *) blk->data) + blk->len, data, len);
+     blk->len += aligned_len;
+     records.bytes_free -= aligned_len;
+     records.total_len += aligned_len;
+ }
+
+ /*
+  * Start preparing a state file: reset the records chain, then store the
+  * file header followed by the subtransaction XIDs and the
+  * delete-on-commit and delete-on-abort relation lists.
+  */
+ void
+ StartPrepare(GlobalTransaction gxact)
+ {
+     TwoPhaseFileHeader hdr;
+     TransactionId *subxids;
+     RelFileNode *rels_on_commit;
+     RelFileNode *rels_on_abort;
+     XLogRecData *first;
+
+     /* Start a fresh chain consisting of one empty block */
+     first = records.head = palloc0(sizeof(XLogRecData));
+     first->buffer = InvalidBuffer;
+     first->len = 0;
+     first->next = NULL;
+     records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
+     first->data = palloc(records.bytes_free);
+     records.tail = first;
+     records.total_len = 0;
+
+     /* Fill in the file header */
+     hdr.magic = TWOPHASE_MAGIC;
+     hdr.total_len = 0;          /* EndPrepare will fill this in */
+     hdr.xid = gxact->proc.xid;
+     hdr.database = MyDatabaseId;
+     hdr.owner = GetUserId();
+     hdr.nsubxacts = xactGetCommittedChildren(&subxids);
+     hdr.ncommitrels = smgrGetPendingDeletes(true, &rels_on_commit);
+     hdr.nabortrels = smgrGetPendingDeletes(false, &rels_on_abort);
+     StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
+
+     save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
+
+     /* Subtransaction XIDs; they are also loaded into the gxact */
+     if (hdr.nsubxacts > 0)
+     {
+         save_state_data(subxids, hdr.nsubxacts * sizeof(TransactionId));
+         GXactLoadSubxactData(gxact, hdr.nsubxacts, subxids);
+         pfree(subxids);
+     }
+
+     /* Files to be deleted at commit */
+     if (hdr.ncommitrels > 0)
+     {
+         save_state_data(rels_on_commit, hdr.ncommitrels * sizeof(RelFileNode));
+         pfree(rels_on_commit);
+     }
+
+     /* Files to be deleted at abort */
+     if (hdr.nabortrels > 0)
+     {
+         save_state_data(rels_on_abort, hdr.nabortrels * sizeof(RelFileNode));
+         pfree(rels_on_abort);
+     }
+ }
+
+ /*
+ * Finish preparing state file: append the end sentinel, fill in total_len,
+ * write the chain to the state file with a bogus CRC, insert and flush the
+ * WAL record, and only then overwrite the CRC with the correct value.
+ */
+ void
+ EndPrepare(GlobalTransaction gxact)
+ {
+ TransactionId xid = gxact->proc.xid;
+ TwoPhaseFileHeader *hdr;
+ char path[MAXPGPATH];
+ XLogRecData *record;
+ XLogRecPtr recptr;
+ pg_crc32 statefile_crc;
+ pg_crc32 bogus_crc;
+ int fd;
+
+ /* Add the end sentinel to the list of 2PC records */
+ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
+ NULL, 0);
+
+ /* Go back and fill in total_len in the file header record */
+ hdr = (TwoPhaseFileHeader *) records.head->data;
+ Assert(hdr->magic == TWOPHASE_MAGIC);
+ hdr->total_len = records.total_len + sizeof(pg_crc32);
+
+ /*
+ * Create the 2PC state file.
+ *
+ * Note: because we use BasicOpenFile(), we are responsible for ensuring
+ * the FD gets closed in any error exit path. Once we get into the
+ * critical section, though, it doesn't matter since any failure causes
+ * PANIC anyway.
+ */
+ TwoPhaseFilePath(path, xid);
+
+ fd = BasicOpenFile(path,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create twophase state file \"%s\": %m",
+ path)));
+
+ /* Write data to file, and calculate CRC as we pass over it */
+ INIT_CRC32(statefile_crc);
+
+ for (record = records.head; record != NULL; record = record->next)
+ {
+ COMP_CRC32(statefile_crc, record->data, record->len);
+ if ((write(fd, record->data, record->len)) != record->len)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write twophase state file: %m")));
+ }
+ }
+
+ FIN_CRC32(statefile_crc);
+
+ /*
+ * Write a deliberately bogus CRC to the state file, and flush it to disk.
+ * This is to minimize the odds of failure within the critical section
+ * below --- in particular, running out of disk space.
+ *
+ * On most filesystems, write() rather than fsync() detects out-of-space,
+ * so the fsync might be considered optional. Using it means there
+ * are three fsyncs not two associated with preparing a transaction; is
+ * the risk of an error from fsync high enough to justify that?
+ */
+ bogus_crc = ~ statefile_crc;
+
+ if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write twophase state file: %m")));
+ }
+
+ if (pg_fsync(fd) != 0)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync twophase state file: %m")));
+ }
+
+ /* Back up to prepare for rewriting the CRC */
+ if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek twophase state file: %m")));
+ }
+
+ /*
+ * The state file isn't valid yet, because we haven't written the correct
+ * CRC yet. Before we do that, insert entry in WAL and flush it to disk.
+ *
+ * Between the time we have written the WAL entry and the time we
+ * flush the correct state file CRC to disk, we have an inconsistency:
+ * the xact is prepared according to WAL but not according to our on-disk
+ * state. We use a critical section to force a PANIC if we are unable to
+ * complete the flush --- then, WAL replay should repair the
+ * inconsistency.
+ *
+ * We have to lock out checkpoint start here, too; otherwise a checkpoint
+ * starting immediately after the WAL record is inserted could complete
+ * before we've finished flushing, meaning that the WAL record would not
+ * get replayed if a crash follows.
+ */
+ START_CRIT_SECTION();
+
+ LWLockAcquire(CheckpointStartLock, LW_SHARED);
+
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head);
+ XLogFlush(recptr);
+
+ /* If we crash now, we have prepared: WAL replay will fix things */
+
+ /* write correct CRC, flush, and close file; a failure here is a PANIC */
+ if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write twophase state file: %m")));
+ }
+
+ if (pg_fsync(fd) != 0)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync twophase state file: %m")));
+ }
+
+ if (close(fd) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close twophase state file: %m")));
+
+ LWLockRelease(CheckpointStartLock);
+
+ END_CRIT_SECTION();
+
+ records.tail = records.head = NULL; /* chain is consumed; StartPrepare builds a new one */
+ }
+
+ /*
+  * Queue a 2PC record: a TwoPhaseRecordOnDisk header plus len payload bytes.
+  */
+ void
+ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
+                        const void *data, uint32 len)
+ {
+     TwoPhaseRecordOnDisk rechdr;
+
+     rechdr.len = len;
+     rechdr.rmid = rmid;
+     rechdr.info = info;
+     save_state_data(&rechdr, sizeof(TwoPhaseRecordOnDisk));
+     if (len != 0)
+         save_state_data(data, len);
+ }
+
+
+ /*
+ * Read and validate the state file for xid. If it looks OK (plausible
+ * length, valid magic number, matching total_len and CRC), return the
+ * palloc'd contents of the file; otherwise return NULL. Only open, stat
+ * and read failures log a WARNING here; other rejections are silent.
+ */
+ static char *
+ ReadTwoPhaseFile(TransactionId xid)
+ {
+ char path[MAXPGPATH];
+ char *buf;
+ TwoPhaseFileHeader *hdr;
+ int fd;
+ struct stat stat;
+ uint32 crc_offset;
+ pg_crc32 calc_crc, file_crc;
+
+ TwoPhaseFilePath(path, xid);
+
+ fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+ if (fd < 0)
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not open twophase state file \"%s\": %m",
+ path)));
+ return NULL;
+ }
+
+ /*
+ * Check file length. We can determine a lower bound pretty easily.
+ * We set an upper bound mainly to avoid palloc() failure on a corrupt
+ * file.
+ */
+ if (fstat(fd, &stat))
+ {
+ close(fd);
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not stat twophase state file \"%s\": %m",
+ path)));
+ return NULL;
+ }
+
+ if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
+ MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
+ sizeof(pg_crc32)) ||
+ stat.st_size > 10000000)
+ {
+ close(fd);
+ return NULL;
+ }
+
+ crc_offset = stat.st_size - sizeof(pg_crc32);
+ if (crc_offset != MAXALIGN(crc_offset)) /* CRC must start on a MAXALIGN boundary */
+ {
+ close(fd);
+ return NULL;
+ }
+
+ /*
+ * OK, slurp in the file.
+ */
+ buf = (char *) palloc(stat.st_size);
+
+ if (read(fd, buf, stat.st_size) != stat.st_size)
+ {
+ close(fd);
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not read twophase state file \"%s\": %m",
+ path)));
+ pfree(buf);
+ return NULL;
+ }
+
+ close(fd);
+
+ hdr = (TwoPhaseFileHeader *) buf;
+ if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
+ {
+ pfree(buf);
+ return NULL;
+ }
+
+ INIT_CRC32(calc_crc);
+ COMP_CRC32(calc_crc, buf, crc_offset);
+ FIN_CRC32(calc_crc);
+
+ file_crc = *((pg_crc32 *) (buf + crc_offset));
+
+ if (!EQ_CRC32(calc_crc, file_crc))
+ {
+ pfree(buf);
+ return NULL;
+ }
+
+ return buf;
+ }
+
+
+ /*
+  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
+  *
+  * gid:      global identifier of the prepared transaction to finish
+  * isCommit: true for COMMIT PREPARED, false for ROLLBACK PREPARED
+  *
+  * Raises ERROR if the state file cannot be read back or fails validation.
+  * Otherwise records the outcome, runs the post-commit or post-abort
+  * callbacks, drops pending files, and removes the state file and GXACT.
+  */
+ void
+ FinishPreparedTransaction(char *gid, bool isCommit)
+ {
+     GlobalTransaction gxact;
+     TransactionId xid;
+     char       *buf;
+     char       *bufptr;
+     TwoPhaseFileHeader *hdr;
+     TransactionId *children;
+     RelFileNode *commitrels;
+     RelFileNode *abortrels;
+     int         i;
+
+     /*
+      * Validate the GID, and lock the GXACT to ensure that two backends
+      * do not try to commit the same GID at once.
+      */
+     gxact = LockGXact(gid, GetUserId());
+     xid = gxact->proc.xid;
+
+     /* Read and validate the state file */
+     buf = ReadTwoPhaseFile(xid);
+     if (buf == NULL)
+         ereport(ERROR,
+                 (errcode(ERRCODE_DATA_CORRUPTED),
+                  errmsg("twophase state file for transaction %u is corrupt",
+                         xid)));
+
+     /* Disassemble the header area; each array is MAXALIGN-padded */
+     hdr = (TwoPhaseFileHeader *) buf;
+     Assert(TransactionIdEquals(hdr->xid, xid));
+     bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+     children = (TransactionId *) bufptr;
+     bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+     commitrels = (RelFileNode *) bufptr;
+     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+     abortrels = (RelFileNode *) bufptr;
+     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+     /*
+      * The order of operations here is critical: make the XLOG entry for
+      * commit or abort, then mark the transaction committed or aborted in
+      * pg_clog, then remove its PGPROC from the global ProcArray (which
+      * means TransactionIdIsInProgress will stop saying the prepared xact
+      * is in progress), then run the post-commit or post-abort callbacks.
+      * The callbacks will release the locks the transaction held.
+      */
+     if (isCommit)
+         RecordTransactionCommitPrepared(xid,
+                                         hdr->nsubxacts, children,
+                                         hdr->ncommitrels, commitrels);
+     else
+         RecordTransactionAbortPrepared(xid,
+                                        hdr->nsubxacts, children,
+                                        hdr->nabortrels, abortrels);
+
+     ProcArrayRemove(&gxact->proc);
+
+     /*
+      * In case we fail while running the callbacks, mark the gxact invalid
+      * so no one else will try to commit/rollback, and so it can be recycled
+      * properly later.  It is still locked by our XID so it won't go away yet.
+      */
+     gxact->valid = false;
+
+     if (isCommit)
+         ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
+     else
+         ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
+
+     /*
+      * Remove any files that were supposed to be dropped (NB: this code knows
+      * we couldn't be dropping temp rels), then count the prepared xact in
+      * the stats under its real outcome: a rollback is not a commit.
+      */
+     if (isCommit)
+     {
+         for (i = 0; i < hdr->ncommitrels; i++)
+             smgrdounlink(smgropen(commitrels[i]), false, false);
+         pgstat_count_xact_commit();
+     }
+     else
+     {
+         for (i = 0; i < hdr->nabortrels; i++)
+             smgrdounlink(smgropen(abortrels[i]), false, false);
+         pgstat_count_xact_rollback();
+     }
+
+     /* And now we can clean up our mess. */
+     RemoveTwoPhaseFile(xid, true);
+     RemoveGXact(gxact);
+     pfree(buf);
+ }
+
+ /*
+  * Walk the 2PC records of an in-memory state file image starting at bufptr,
+  * invoking callbacks[rmid] (if non-NULL) for each record, and stop at the
+  * TWOPHASE_RM_END_ID sentinel.
+  */
+ static void
+ ProcessRecords(char *bufptr, TransactionId xid,
+                const TwoPhaseCallback callbacks[])
+ {
+     TwoPhaseRecordOnDisk *rec = (TwoPhaseRecordOnDisk *) bufptr;
+
+     Assert(rec->rmid <= TWOPHASE_RM_MAX_ID);
+     while (rec->rmid != TWOPHASE_RM_END_ID)
+     {
+         char       *payload = ((char *) rec) + MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
+         TwoPhaseCallback handler = callbacks[rec->rmid];
+
+         if (handler != NULL)
+             handler(xid, rec->info, (void *) payload, rec->len);
+
+         /* Next header follows the MAXALIGN-padded payload */
+         rec = (TwoPhaseRecordOnDisk *) (payload + MAXALIGN(rec->len));
+         Assert(rec->rmid <= TWOPHASE_RM_MAX_ID);
+     }
+ }
+
+ /*
+  * Unlink the 2PC state file of "xid".  Failure draws a WARNING, except that
+  * a missing file is ignored when giveWarning is false (expected in replay).
+  */
+ void
+ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
+ {
+     char        path[MAXPGPATH];
+
+     TwoPhaseFilePath(path, xid);
+     if (unlink(path) == 0)
+         return;
+     if (errno == ENOENT && !giveWarning)
+         return;
+
+     ereport(WARNING,
+             (errcode_for_file_access(),
+              errmsg("could not remove two-phase state file \"%s\": %m", path)));
+ }
+
+ /*
+ * Recreates a state file from content/len, overwriting any existing file for
+ * xid. This is used in WAL replay. Note: content and len don't include the
+ * CRC; it is recomputed here and appended. Any I/O failure raises ERROR.
+ */
+ void
+ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+ {
+ char path[MAXPGPATH];
+ pg_crc32 statefile_crc;
+ int fd;
+
+ /* Recompute CRC */
+ INIT_CRC32(statefile_crc);
+ COMP_CRC32(statefile_crc, content, len);
+ FIN_CRC32(statefile_crc);
+
+ TwoPhaseFilePath(path, xid);
+
+ fd = BasicOpenFile(path,
+ O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not recreate twophase state file \"%s\": %m",
+ path)));
+
+ /* Write content and CRC */
+ if (write(fd, content, len) != len)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write twophase state file: %m")));
+ }
+ if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32))
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write twophase state file: %m")));
+ }
+
+ /* Sync and close the file */
+ if (pg_fsync(fd) != 0)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync twophase state file: %m")));
+ }
+
+ if (close(fd) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close twophase state file: %m")));
+ }
+
+ /*
+ * PrescanPreparedTransactions
+ *
+ * Scan the pg_twophase directory and determine the range of valid XIDs
+ * present. This is run during database startup, after we have completed
+ * reading WAL. ShmemVariableCache->nextXid has been set to one more than
+ * the highest XID for which evidence exists in WAL.
+ *
+ * We throw away any prepared xacts with main XID beyond nextXid --- if any
+ * are present, it suggests that the DBA has done a PITR recovery to an
+ * earlier point in time without cleaning out pg_twophase. We dare not
+ * try to recover such prepared xacts since they likely depend on database
+ * state that doesn't exist now.
+ *
+ * However, we will advance nextXid beyond any subxact XIDs belonging to
+ * valid prepared xacts. We need to do this since subxact commit doesn't
+ * write a WAL entry, and so there might be no evidence in WAL of those
+ * subxact XIDs.
+ *
+ * Our other responsibility is to determine and return the oldest valid XID
+ * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
+ * This is needed to synchronize pg_subtrans startup properly.
+ */
+ TransactionId
+ PrescanPreparedTransactions(void)
+ {
+ TransactionId origNextXid = ShmemVariableCache->nextXid;
+ TransactionId result = origNextXid;
+ char dir[MAXPGPATH];
+ DIR *cldir;
+ struct dirent *clde;
+
+ snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+
+ cldir = AllocateDir(dir);
+ if (cldir == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open directory \"%s\": %m", dir)));
+
+ errno = 0; /* lets the check after the loop tell a readdir error from end-of-directory */
+ while ((clde = readdir(cldir)) != NULL)
+ {
+ if (strlen(clde->d_name) == 8 &&
+ strspn(clde->d_name, "0123456789ABCDEF") == 8) /* exactly 8 uppercase hex digits */
+ {
+ TransactionId xid;
+ char *buf;
+ TwoPhaseFileHeader *hdr;
+ TransactionId *subxids;
+ int i;
+
+ xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+ /* Reject XID if too new */
+ if (TransactionIdFollowsOrEquals(xid, origNextXid))
+ {
+ ereport(WARNING,
+ (errmsg("removing future twophase state file \"%s\"",
+ clde->d_name)));
+ RemoveTwoPhaseFile(xid, true);
+ errno = 0;
+ continue;
+ }
+
+ /*
+ * Note: we can't check if already processed because clog
+ * subsystem isn't up yet.
+ */
+
+ /* Read and validate file */
+ buf = ReadTwoPhaseFile(xid);
+ if (buf == NULL)
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt twophase state file \"%s\"",
+ clde->d_name)));
+ RemoveTwoPhaseFile(xid, true);
+ errno = 0;
+ continue;
+ }
+
+ /* Deconstruct header */
+ hdr = (TwoPhaseFileHeader *) buf;
+ if (!TransactionIdEquals(hdr->xid, xid))
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt twophase state file \"%s\"",
+ clde->d_name)));
+ RemoveTwoPhaseFile(xid, true);
+ pfree(buf);
+ errno = 0;
+ continue;
+ }
+
+ /*
+ * OK, we think this file is valid. Incorporate xid into the
+ * running-minimum result.
+ */
+ if (TransactionIdPrecedes(xid, result))
+ result = xid;
+
+ /*
+ * Examine subtransaction XIDs ... they should all follow main
+ * XID, and they may force us to advance nextXid.
+ */
+ subxids = (TransactionId *)
+ (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
+ for (i = 0; i < hdr->nsubxacts; i++)
+ {
+ TransactionId subxid = subxids[i];
+
+ Assert(TransactionIdFollows(subxid, xid));
+ if (TransactionIdFollowsOrEquals(subxid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = subxid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+ }
+
+ pfree(buf);
+ }
+ errno = 0;
+ }
+ #ifdef WIN32
+
+ /*
+ * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
+ * not in released version
+ */
+ if (GetLastError() == ERROR_NO_MORE_FILES)
+ errno = 0;
+ #endif
+ if (errno)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read directory \"%s\": %m", dir)));
+
+ FreeDir(cldir);
+
+ return result;
+ }
+
+ /*
+ * RecoverPreparedTransactions
+ *
+ * Scan the pg_twophase directory and reload shared-memory state for each
+ * prepared transaction (reacquire locks, etc). This is run during database
+ * startup. Files of already-finished xacts and corrupt files are removed.
+ */
+ void
+ RecoverPreparedTransactions(void)
+ {
+ char dir[MAXPGPATH];
+ DIR *cldir;
+ struct dirent *clde;
+
+ snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+
+ cldir = AllocateDir(dir);
+ if (cldir == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open directory \"%s\": %m", dir)));
+
+ errno = 0; /* lets the check after the loop tell a readdir error from end-of-directory */
+ while ((clde = readdir(cldir)) != NULL)
+ {
+ if (strlen(clde->d_name) == 8 &&
+ strspn(clde->d_name, "0123456789ABCDEF") == 8) /* exactly 8 uppercase hex digits */
+ {
+ TransactionId xid;
+ char *buf;
+ char *bufptr;
+ TwoPhaseFileHeader *hdr;
+ TransactionId *subxids;
+ GlobalTransaction gxact;
+ int i;
+
+ xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+
+ /* Already processed? */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ {
+ ereport(WARNING,
+ (errmsg("removing stale twophase state file \"%s\"",
+ clde->d_name)));
+ RemoveTwoPhaseFile(xid, true);
+ errno = 0;
+ continue;
+ }
+
+ /* Read and validate file */
+ buf = ReadTwoPhaseFile(xid);
+ if (buf == NULL)
+ {
+ ereport(WARNING,
+ (errmsg("removing corrupt twophase state file \"%s\"",
+ clde->d_name)));
+ RemoveTwoPhaseFile(xid, true);
+ errno = 0;
+ continue;
+ }
+
+ ereport(LOG,
+ (errmsg("recovering prepared transaction %u", xid)));
+
+ /* Deconstruct header */
+ hdr = (TwoPhaseFileHeader *) buf;
+ Assert(TransactionIdEquals(hdr->xid, xid));
+ bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+ subxids = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); /* skip delete-on-commit rels */
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); /* skip delete-on-abort rels */
+
+ /*
+ * Reconstruct subtrans state for the transaction --- needed
+ * because pg_subtrans is not preserved over a restart
+ */
+ for (i = 0; i < hdr->nsubxacts; i++)
+ SubTransSetParent(subxids[i], xid);
+
+ /*
+ * Recreate its GXACT and dummy PGPROC
+ */
+ gxact = MarkAsPreparing(xid, hdr->database, hdr->gid, hdr->owner);
+ GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
+ MarkAsPrepared(gxact);
+
+ /*
+ * Recover other state (notably locks) using resource managers
+ */
+ ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+
+ pfree(buf);
+ }
+ errno = 0;
+ }
+ #ifdef WIN32
+
+ /*
+ * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but
+ * not in released version
+ */
+ if (GetLastError() == ERROR_NO_MORE_FILES)
+ errno = 0;
+ #endif
+ if (errno)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read directory \"%s\": %m", dir)));
+
+ FreeDir(cldir);
+ }
+
+ /*
+ * RecordTransactionCommitPrepared
+ *
+ * This is basically the same as RecordTransactionCommit: in particular,
+ * we must take the CheckpointStartLock to avoid a race condition.
+ *
+ * We know the transaction made at least one XLOG entry (its PREPARE),
+ * so it is never possible to optimize out the commit record.
+ */
+ static void
+ RecordTransactionCommitPrepared(TransactionId xid,
+ int nchildren,
+ TransactionId *children,
+ int nrels,
+ RelFileNode *rels)
+ {
+ XLogRecData rdata[3]; /* [0] fixed part, [1] rels to delete, [2] child XIDs */
+ int lastrdata = 0; /* index of the last rdata[] entry in use */
+ xl_xact_commit_prepared xlrec;
+ XLogRecPtr recptr;
+
+ START_CRIT_SECTION();
+
+ /* See notes in RecordTransactionCommit */
+ LWLockAcquire(CheckpointStartLock, LW_SHARED);
+
+ /* Emit the XLOG commit record */
+ xlrec.xid = xid;
+ xlrec.crec.xtime = time(NULL);
+ xlrec.crec.nrels = nrels;
+ xlrec.crec.nsubxacts = nchildren;
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactCommitPrepared;
+ rdata[0].buffer = InvalidBuffer;
+ /* dump rels to delete */
+ if (nrels > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].data = (char *) rels;
+ rdata[1].len = nrels * sizeof(RelFileNode);
+ rdata[1].buffer = InvalidBuffer;
+ lastrdata = 1;
+ }
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[lastrdata].next = &(rdata[2]);
+ rdata[2].data = (char *) children;
+ rdata[2].len = nchildren * sizeof(TransactionId);
+ rdata[2].buffer = InvalidBuffer;
+ lastrdata = 2;
+ }
+ rdata[lastrdata].next = NULL; /* terminate the chain at the last entry used */
+
+ recptr = XLogInsert(RM_XACT_ID,
+ XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
+ rdata);
+
+ /* we don't currently try to sleep before flush here ... */
+
+ /* Flush XLOG to disk */
+ XLogFlush(recptr);
+
+ /* Mark the transaction committed in pg_clog */
+ TransactionIdCommit(xid);
+ /* to avoid race conditions, the parent must commit first */
+ TransactionIdCommitTree(nchildren, children);
+
+ /* Checkpoint is allowed again */
+ LWLockRelease(CheckpointStartLock);
+
+ END_CRIT_SECTION();
+ }
+
+ /*
+ * RecordTransactionAbortPrepared
+ *
+ * This is basically the same as RecordTransactionAbort.
+ *
+ * We know the transaction made at least one XLOG entry (its PREPARE),
+ * so it is never possible to optimize out the abort record.
+ */
+ static void
+ RecordTransactionAbortPrepared(TransactionId xid,
+ int nchildren,
+ TransactionId *children,
+ int nrels,
+ RelFileNode *rels)
+ {
+ XLogRecData rdata[3]; /* [0] fixed part, [1] rels to delete, [2] child XIDs */
+ int lastrdata = 0; /* index of the last rdata[] entry in use */
+ xl_xact_abort_prepared xlrec;
+ XLogRecPtr recptr;
+
+ /*
+ * Catch the scenario where we aborted partway through
+ * RecordTransactionCommitPrepared ...
+ */
+ if (TransactionIdDidCommit(xid))
+ elog(PANIC, "cannot abort transaction %u, it was already committed",
+ xid);
+
+ START_CRIT_SECTION();
+
+ /* Emit the XLOG abort record */
+ xlrec.xid = xid;
+ xlrec.arec.xtime = time(NULL);
+ xlrec.arec.nrels = nrels;
+ xlrec.arec.nsubxacts = nchildren;
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactAbortPrepared;
+ rdata[0].buffer = InvalidBuffer;
+ /* dump rels to delete */
+ if (nrels > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].data = (char *) rels;
+ rdata[1].len = nrels * sizeof(RelFileNode);
+ rdata[1].buffer = InvalidBuffer;
+ lastrdata = 1;
+ }
+ /* dump committed child Xids */
+ if (nchildren > 0)
+ {
+ rdata[lastrdata].next = &(rdata[2]);
+ rdata[2].data = (char *) children;
+ rdata[2].len = nchildren * sizeof(TransactionId);
+ rdata[2].buffer = InvalidBuffer;
+ lastrdata = 2;
+ }
+ rdata[lastrdata].next = NULL; /* terminate the chain at the last entry used */
+
+ recptr = XLogInsert(RM_XACT_ID,
+ XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
+ rdata);
+
+ /* Always flush, since we're about to remove the 2PC state file */
+ XLogFlush(recptr);
+
+ /*
+ * Mark the transaction aborted in clog. This is not absolutely
+ * necessary but we may as well do it while we are here.
+ */
+ TransactionIdAbort(xid);
+ TransactionIdAbortTree(nchildren, children);
+
+ END_CRIT_SECTION();
+ }
+
*** src/backend/access/transam/twophase_rmgr.c.orig Thu Jun 16 15:18:52 2005
--- src/backend/access/transam/twophase_rmgr.c Fri Jun 17 14:40:06 2005
***************
*** 0 ****
--- 1,49 ----
+ /*-------------------------------------------------------------------------
+ *
+ * twophase_rmgr.c
+ * Two-phase-commit resource managers tables
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include "access/twophase_rmgr.h"
+ #include "commands/async.h"
+ #include "storage/lock.h"
+ #include "utils/flatfiles.h"
+ #include "utils/inval.h"
+
+ /* Callback tables, one slot per TwoPhaseRmgrId; NULL means nothing to do for that rmgr */
+ const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+ {
+ NULL, /* END ID (sentinel slot) */
+ lock_twophase_recover, /* Lock */
+ NULL, /* Inval */
+ NULL, /* flat file update */
+ NULL /* notify/listen */
+ };
+
+ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+ {
+ NULL, /* END ID (sentinel slot) */
+ lock_twophase_postcommit, /* Lock */
+ inval_twophase_postcommit, /* Inval */
+ flatfile_twophase_postcommit, /* flat file update */
+ notify_twophase_postcommit /* notify/listen */
+ };
+
+ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+ {
+ NULL, /* END ID (sentinel slot) */
+ lock_twophase_postabort, /* Lock */
+ NULL, /* Inval */
+ NULL, /* flat file update */
+ NULL /* notify/listen */
+ };
*** src/backend/access/transam/xact.c.orig Mon Jun 6 16:22:57 2005
--- src/backend/access/transam/xact.c Fri Jun 17 12:37:52 2005
***************
*** 22,27 ****
--- 22,28 ----
#include "access/multixact.h"
#include "access/subtrans.h"
+ #include "access/twophase.h"
#include "access/xact.h"
#include "catalog/heap.h"
#include "catalog/index.h"
***************
*** 68,74 ****
TRANS_START,
TRANS_INPROGRESS,
TRANS_COMMIT,
! TRANS_ABORT
} TransState;
/*
--- 69,76 ----
TRANS_START,
TRANS_INPROGRESS,
TRANS_COMMIT,
! TRANS_ABORT,
! TRANS_PREPARE
} TransState;
/*
***************
*** 90,95 ****
--- 92,98 ----
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */
+ TBLOCK_PREPARE, /* live xact, PREPARE received */
/* subtransaction states */
TBLOCK_SUBBEGIN, /* starting a subtransaction */
***************
*** 172,177 ****
--- 175,186 ----
static AbsoluteTime xactStartTime; /* integer part */
static int xactStartTimeUsec; /* microsecond part */
+ /*
+ * GID to be used for preparing the current transaction. This is also
+ * global to a whole transaction, so we don't keep it in the state stack.
+ */
+ static char *prepareGID;
+
/*
* List of add-on start- and end-of-xact callbacks
***************
*** 267,276 ****
return true;
case TRANS_ABORT:
return true;
}
/*
! * Shouldn't get here, but lint is not happy with this...
*/
return false;
}
--- 276,287 ----
return true;
case TRANS_ABORT:
return true;
+ case TRANS_PREPARE:
+ return true;
}
/*
! * Shouldn't get here, but lint is not happy without this...
*/
return false;
}
***************
*** 660,671 ****
RecordTransactionCommit(void)
{
int nrels;
! RelFileNode *rptr;
int nchildren;
TransactionId *children;
/* Get data needed for commit record */
! nrels = smgrGetPendingDeletes(true, &rptr);
nchildren = xactGetCommittedChildren(&children);
/*
--- 671,682 ----
RecordTransactionCommit(void)
{
int nrels;
! RelFileNode *rels;
int nchildren;
TransactionId *children;
/* Get data needed for commit record */
! nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
/*
***************
*** 726,732 ****
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rptr;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
--- 737,743 ----
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
***************
*** 809,820 ****
MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false;
- /* Show myself as out of the transaction in PGPROC array */
- MyProc->logRec.xrecoff = 0;
-
/* And clean up local data */
! if (rptr)
! pfree(rptr);
if (children)
pfree(children);
}
--- 820,828 ----
MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false;
/* And clean up local data */
! if (rels)
! pfree(rels);
if (children)
pfree(children);
}
***************
*** 970,981 ****
RecordTransactionAbort(void)
{
int nrels;
! RelFileNode *rptr;
int nchildren;
TransactionId *children;
/* Get data needed for abort record */
! nrels = smgrGetPendingDeletes(false, &rptr);
nchildren = xactGetCommittedChildren(&children);
/*
--- 978,989 ----
RecordTransactionAbort(void)
{
int nrels;
! RelFileNode *rels;
int nchildren;
TransactionId *children;
/* Get data needed for abort record */
! nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
/*
***************
*** 1026,1032 ****
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rptr;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
--- 1034,1040 ----
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
***************
*** 1069,1080 ****
MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false;
- /* Show myself as out of the transaction in PGPROC array */
- MyProc->logRec.xrecoff = 0;
-
/* And clean up local data */
! if (rptr)
! pfree(rptr);
if (children)
pfree(children);
}
--- 1077,1085 ----
MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false;
/* And clean up local data */
! if (rels)
! pfree(rels);
if (children)
pfree(children);
}
***************
*** 1166,1178 ****
RecordSubTransactionAbort(void)
{
int nrels;
! RelFileNode *rptr;
TransactionId xid = GetCurrentTransactionId();
int nchildren;
TransactionId *children;
/* Get data needed for abort record */
! nrels = smgrGetPendingDeletes(false, &rptr);
nchildren = xactGetCommittedChildren(&children);
/*
--- 1171,1183 ----
RecordSubTransactionAbort(void)
{
int nrels;
! RelFileNode *rels;
TransactionId xid = GetCurrentTransactionId();
int nchildren;
TransactionId *children;
/* Get data needed for abort record */
! nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
/*
***************
*** 1212,1218 ****
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rptr;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
--- 1217,1223 ----
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
! rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
***************
*** 1256,1263 ****
XidCacheRemoveRunningXids(xid, nchildren, children);
/* And clean up local data */
! if (rptr)
! pfree(rptr);
if (children)
pfree(children);
}
--- 1261,1268 ----
XidCacheRemoveRunningXids(xid, nchildren, children);
/* And clean up local data */
! if (rels)
! pfree(rels);
if (children)
pfree(children);
}
***************
*** 1419,1426 ****
--- 1424,1434 ----
ShowTransactionState("StartTransaction");
}
+
/*
* CommitTransaction
+ *
+ * NB: if you change this routine, better look at PrepareTransaction too!
*/
static void
CommitTransaction(void)
***************
*** 1510,1515 ****
--- 1518,1525 ----
* xid 0 as running as well, or it will be able to see two tuple versions
* - one deleted by xid 1 and one inserted by xid 0. See notes in
* GetSnapshotData.
+ *
+ * Note: MyProc may be null during bootstrap.
*----------
*/
if (MyProc != NULL)
***************
*** 1608,1613 ****
--- 1618,1842 ----
RESUME_INTERRUPTS();
}
+
+ /*
+ * PrepareTransaction
+ *
+ * NB: if you change this routine, better look at CommitTransaction too!
+ */
+ static void
+ PrepareTransaction(void)
+ {
+ TransactionState s = CurrentTransactionState;
+ TransactionId xid = GetCurrentTransactionId();
+ GlobalTransaction gxact;
+
+ ShowTransactionState("PrepareTransaction");
+
+ /*
+ * check the current transaction state
+ */
+ if (s->state != TRANS_INPROGRESS)
+ elog(WARNING, "PrepareTransaction while in %s state",
+ TransStateAsString(s->state));
+ Assert(s->parent == NULL);
+
+ /*
+ * Do pre-commit processing (most of this stuff requires database
+ * access, and in fact could still cause an error...)
+ *
+ * It is possible for PrepareHoldablePortals to invoke functions that
+ * queue deferred triggers, and it's also possible that triggers create
+ * holdable cursors. So we have to loop until there's nothing left to
+ * do.
+ */
+ for (;;)
+ {
+ /*
+ * Fire all currently pending deferred triggers.
+ */
+ AfterTriggerFireDeferred();
+
+ /*
+ * Convert any open holdable cursors into static portals. If there
+ * weren't any, we are done ... otherwise loop back to check if they
+ * queued deferred triggers. Lather, rinse, repeat.
+ */
+ if (!PrepareHoldablePortals())
+ break;
+ }
+
+ /* Now we can shut down the deferred-trigger manager */
+ AfterTriggerEndXact(true);
+
+ /* Close any open regular cursors */
+ AtCommit_Portals();
+
+ /*
+ * Let ON COMMIT management do its thing (must happen after closing
+ * cursors, to avoid dangling-reference problems)
+ */
+ PreCommit_on_commit_actions();
+
+ /* close large objects before lower-level cleanup */
+ AtEOXact_LargeObject(true);
+
+ /* NOTIFY and flatfiles will be handled below */
+
+ /* Prevent cancel/die interrupt while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /*
+ * set the current transaction state information appropriately during
+ * prepare processing
+ */
+ s->state = TRANS_PREPARE;
+
+ /* Tell bufmgr and smgr to prepare for commit */
+ BufmgrCommit();
+
+ /*
+ * Reserve the GID for this transaction. This could fail if the
+ * requested GID is invalid or already in use.
+ */
+ gxact = MarkAsPreparing(xid, MyDatabaseId, prepareGID, GetUserId());
+ prepareGID = NULL;
+
+ /*
+ * Collect data for the 2PC state file. Note that in general, no actual
+ * state change should happen in the called modules during this step,
+ * since it's still possible to fail before commit, and in that case we
+ * want transaction abort to be able to clean up. (In particular, the
+ * AtPrepare routines may error out if they find cases they cannot
+ * handle.) State cleanup should happen in the PostPrepare routines
+ * below. However, some modules can go ahead and clear state here
+ * because they wouldn't do anything with it during abort anyway.
+ *
+ * Note: because the 2PC state file records will be replayed in the same
+ * order they are made, the order of these calls has to match the order
+ * in which we want things to happen during COMMIT PREPARED or
+ * ROLLBACK PREPARED; in particular, pay attention to whether things
+ * should happen before or after releasing the transaction's locks.
+ */
+ StartPrepare(gxact);
+
+ AtPrepare_Notify();
+ AtPrepare_UpdateFlatFiles();
+ AtPrepare_Inval();
+ AtPrepare_Locks();
+
+ /*
+ * Here is where we really truly prepare.
+ *
+ * We have to record transaction prepares even if we didn't
+ * make any updates, because the transaction manager might
+ * get confused if we lose a global transaction.
+ */
+ EndPrepare(gxact);
+
+ /*
+ * Mark the prepared transaction as valid. As soon as we mark ourselves
+ * not running in MyProc below, others can commit/rollback the xact.
+ *
+ * NB: a side effect of this is to make a dummy ProcArray entry for the
+ * prepared XID. This must happen before we clear the XID from MyProc,
+ * else there is a window where the XID is not running according to
+ * TransactionIdInProgress, and onlookers would be entitled to assume
+ * the xact crashed. Instead we have a window where the same XID
+ * appears twice in ProcArray, which is OK.
+ */
+ MarkAsPrepared(gxact);
+
+ /*
+ * Now we clean up backend-internal state and release internal
+ * resources.
+ */
+
+ /* Break the chain of back-links in the XLOG records I output */
+ MyLastRecPtr.xrecoff = 0;
+ MyXactMadeXLogEntry = false;
+ MyXactMadeTempRelUpdate = false;
+
+ /*
+ * Let others know that I no longer have a transaction in progress.
+ * This has to be done *after* the prepared transaction has been marked
+ * valid, else someone may think it is unlocked and recyclable.
+ */
+
+ /* Lock ProcArrayLock because that's what GetSnapshotData uses. */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ MyProc->xid = InvalidTransactionId;
+ MyProc->xmin = InvalidTransactionId;
+
+ /* Clear the subtransaction-XID cache too while holding the lock */
+ MyProc->subxids.nxids = 0;
+ MyProc->subxids.overflowed = false;
+
+ LWLockRelease(ProcArrayLock);
+
+ /*
+ * This is all post-transaction cleanup. Note that if an error is raised
+ * here, it's too late to abort the transaction. This should be just
+ * noncritical resource releasing. See notes in CommitTransaction.
+ */
+
+ CallXactCallbacks(XACT_EVENT_PREPARE);
+
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_BEFORE_LOCKS,
+ true, true);
+
+ /* Check we've released all buffer pins */
+ AtEOXact_Buffers(true);
+
+ /* notify and flatfiles don't need a postprepare call */
+
+ PostPrepare_Inval();
+
+ PostPrepare_smgr();
+
+ AtEOXact_MultiXact();
+
+ PostPrepare_Locks(xid);
+
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_LOCKS,
+ true, true);
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_AFTER_LOCKS,
+ true, true);
+
+ /* PREPARE acts the same as COMMIT as far as GUC is concerned */
+ AtEOXact_GUC(true, false);
+ AtEOXact_SPI(true);
+ AtEOXact_on_commit_actions(true);
+ AtEOXact_Namespace(true);
+ /* smgrcommit already done */
+ AtEOXact_Files();
+
+ CurrentResourceOwner = NULL;
+ ResourceOwnerDelete(TopTransactionResourceOwner);
+ s->curTransactionOwner = NULL;
+ CurTransactionResourceOwner = NULL;
+ TopTransactionResourceOwner = NULL;
+
+ AtCommit_Memory();
+
+ s->transactionId = InvalidTransactionId;
+ s->subTransactionId = InvalidSubTransactionId;
+ s->nestingLevel = 0;
+ s->childXids = NIL;
+
+ /*
+ * done with 1st phase commit processing, set current transaction
+ * state back to default
+ */
+ s->state = TRANS_DEFAULT;
+
+ RESUME_INTERRUPTS();
+ }
+
+
/*
* AbortTransaction
*/
***************
*** 1640,1646 ****
/*
* check the current transaction state
*/
! if (s->state != TRANS_INPROGRESS)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
--- 1869,1875 ----
/*
* check the current transaction state
*/
! if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
***************
*** 1833,1838 ****
--- 2062,2068 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(ERROR, "StartTransactionCommand: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 1935,1940 ****
--- 2165,2179 ----
break;
/*
+ * We are completing a "PREPARE TRANSACTION" command. Do it and
+ * return to the idle state.
+ */
+ case TBLOCK_PREPARE:
+ PrepareTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+
+ /*
* We were just issued a SAVEPOINT inside a transaction block.
* Start a subtransaction. (DefineSavepoint already did
* PushTransaction, so as to have someplace to put the
***************
*** 1964,1969 ****
--- 2203,2214 ----
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
}
+ else if (s->blockState == TBLOCK_PREPARE)
+ {
+ Assert(s->parent == NULL);
+ PrepareTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ }
else
{
Assert(s->blockState == TBLOCK_INPROGRESS ||
***************
*** 2156,2161 ****
--- 2401,2417 ----
break;
/*
+ * Here, we failed while trying to PREPARE. Clean up the
+ * transaction and return to idle state (we do not want to
+ * stay in the transaction).
+ */
+ case TBLOCK_PREPARE:
+ AbortTransaction();
+ CleanupTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+
+ /*
* We got an error inside a subtransaction. Abort just the
* subtransaction, and go to the persistent SUBABORT state
* until we get ROLLBACK.
***************
*** 2487,2492 ****
--- 2743,2749 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "BeginTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2494,2499 ****
--- 2751,2807 ----
}
/*
+ * PrepareTransactionBlock
+ * This executes a PREPARE TRANSACTION command.
+ *
+ * Since PREPARE TRANSACTION may actually do a ROLLBACK, the result
+ * indicates what happened: TRUE for PREPARE, FALSE for ROLLBACK.
+ *
+ * Note that we don't actually do anything here except change blockState.
+ * The real work will be done in the upcoming PrepareTransaction().
+ * We do it this way because it's not convenient to change memory context,
+ * resource owner, etc while executing inside a Portal.
+ */
+ bool
+ PrepareTransactionBlock(char *gid)
+ {
+ TransactionState s;
+ bool result;
+
+ /* Set up to commit the current transaction */
+ result = EndTransactionBlock();
+
+ /* If successful, change outer tblock state to PREPARE */
+ if (result)
+ {
+ s = CurrentTransactionState;
+
+ while (s->parent != NULL)
+ s = s->parent;
+
+ if (s->blockState == TBLOCK_END)
+ {
+ /* Save GID where PrepareTransaction can find it again */
+ prepareGID = MemoryContextStrdup(TopTransactionContext, gid);
+
+ s->blockState = TBLOCK_PREPARE;
+ }
+ else
+ {
+ /*
+ * ignore case where we are not in a transaction;
+ * EndTransactionBlock already issued a warning.
+ */
+ Assert(s->blockState == TBLOCK_STARTED);
+ /* Don't send back a PREPARE result tag... */
+ result = false;
+ }
+ }
+
+ return result;
+ }
+
+ /*
* EndTransactionBlock
* This executes a COMMIT command.
*
***************
*** 2603,2608 ****
--- 2911,2917 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "EndTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2694,2699 ****
--- 3003,3009 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2740,2745 ****
--- 3050,3056 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "DefineSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2795,2800 ****
--- 3106,3112 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "ReleaseSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2892,2897 ****
--- 3204,3210 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "RollbackToSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2999,3004 ****
--- 3312,3318 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "BeginInternalSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 3064,3069 ****
--- 3378,3384 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 3111,3116 ****
--- 3426,3432 ----
case TBLOCK_INPROGRESS:
case TBLOCK_END:
case TBLOCK_ABORT_PENDING:
+ case TBLOCK_PREPARE:
/* In a transaction, so clean up */
AbortTransaction();
CleanupTransaction();
***************
*** 3202,3207 ****
--- 3518,3524 ----
case TBLOCK_SUBINPROGRESS:
case TBLOCK_END:
case TBLOCK_SUBEND:
+ case TBLOCK_PREPARE:
return 'T'; /* in transaction */
case TBLOCK_ABORT:
case TBLOCK_SUBABORT:
***************
*** 3684,3689 ****
--- 4001,4008 ----
return "ABORT END";
case TBLOCK_ABORT_PENDING:
return "ABORT PEND";
+ case TBLOCK_PREPARE:
+ return "PREPARE";
case TBLOCK_SUBBEGIN:
return "SUB BEGIN";
case TBLOCK_SUBINPROGRESS:
***************
*** 3717,3728 ****
return "DEFAULT";
case TRANS_START:
return "START";
case TRANS_COMMIT:
return "COMMIT";
case TRANS_ABORT:
return "ABORT";
! case TRANS_INPROGRESS:
! return "INPROGR";
}
return "UNRECOGNIZED";
}
--- 4036,4049 ----
return "DEFAULT";
case TRANS_START:
return "START";
+ case TRANS_INPROGRESS:
+ return "INPROGR";
case TRANS_COMMIT:
return "COMMIT";
case TRANS_ABORT:
return "ABORT";
! case TRANS_PREPARE:
! return "PREPARE";
}
return "UNRECOGNIZED";
}
***************
*** 3767,3772 ****
--- 4088,4163 ----
* XLOG support routines
*/
+ static void
+ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
+ {
+ TransactionId *sub_xids;
+ TransactionId max_xid;
+ int i;
+
+ TransactionIdCommit(xid);
+
+ /* Mark committed subtransactions as committed */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
+
+ /* Make sure nextXid is beyond any XID mentioned in the record */
+ max_xid = xid;
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ {
+ if (TransactionIdPrecedes(max_xid, sub_xids[i]))
+ max_xid = sub_xids[i];
+ }
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = max_xid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+
+ /* Make sure files supposed to be dropped are dropped */
+ for (i = 0; i < xlrec->nrels; i++)
+ {
+ XLogCloseRelation(xlrec->xnodes[i]);
+ smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
+ }
+ }
+
+ static void
+ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+ {
+ TransactionId *sub_xids;
+ TransactionId max_xid;
+ int i;
+
+ TransactionIdAbort(xid);
+
+ /* Mark subtransactions as aborted */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
+
+ /* Make sure nextXid is beyond any XID mentioned in the record */
+ max_xid = xid;
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ {
+ if (TransactionIdPrecedes(max_xid, sub_xids[i]))
+ max_xid = sub_xids[i];
+ }
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = max_xid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+
+ /* Make sure files supposed to be dropped are dropped */
+ for (i = 0; i < xlrec->nrels; i++)
+ {
+ XLogCloseRelation(xlrec->xnodes[i]);
+ smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
+ }
+ }
+
void
xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
***************
*** 3775,3912 ****
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
-
- TransactionIdCommit(record->xl_xid);
-
- /* Mark committed subtransactions as committed */
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
! /* Make sure nextXid is beyond any XID mentioned in the record */
! max_xid = record->xl_xid;
! for (i = 0; i < xlrec->nsubxacts; i++)
! {
! if (TransactionIdPrecedes(max_xid, sub_xids[i]))
! max_xid = sub_xids[i];
! }
! if (TransactionIdFollowsOrEquals(max_xid,
! ShmemVariableCache->nextXid))
! {
! ShmemVariableCache->nextXid = max_xid;
! TransactionIdAdvance(ShmemVariableCache->nextXid);
! }
! /* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
! XLogCloseRelation(xlrec->xnodes[i]);
! smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
}
}
! else if (info == XLOG_XACT_ABORT)
{
! xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
! TransactionId *sub_xids;
! TransactionId max_xid;
! int i;
!
! TransactionIdAbort(record->xl_xid);
!
! /* Mark subtransactions as aborted */
! sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
! /* Make sure nextXid is beyond any XID mentioned in the record */
! max_xid = record->xl_xid;
for (i = 0; i < xlrec->nsubxacts; i++)
! {
! if (TransactionIdPrecedes(max_xid, sub_xids[i]))
! max_xid = sub_xids[i];
! }
! if (TransactionIdFollowsOrEquals(max_xid,
! ShmemVariableCache->nextXid))
! {
! ShmemVariableCache->nextXid = max_xid;
! TransactionIdAdvance(ShmemVariableCache->nextXid);
! }
! /* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
! XLogCloseRelation(xlrec->xnodes[i]);
! smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
}
}
! else
! elog(PANIC, "xact_redo: unknown op code %u", info);
}
void
xact_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
- int i;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- struct tm *tm = localtime(&xlrec->xtime);
-
- sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u",
- tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
- tm->tm_hour, tm->tm_min, tm->tm_sec);
- if (xlrec->nrels > 0)
- {
- sprintf(buf + strlen(buf), "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
- {
- RelFileNode rnode = xlrec->xnodes[i];
! sprintf(buf + strlen(buf), " %u/%u/%u",
! rnode.spcNode, rnode.dbNode, rnode.relNode);
! }
! }
! if (xlrec->nsubxacts > 0)
! {
! TransactionId *xacts = (TransactionId *)
! &xlrec->xnodes[xlrec->nrels];
!
! sprintf(buf + strlen(buf), "; subxacts:");
! for (i = 0; i < xlrec->nsubxacts; i++)
! sprintf(buf + strlen(buf), " %u", xacts[i]);
! }
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
- struct tm *tm = localtime(&xlrec->xtime);
! sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u",
! tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
! tm->tm_hour, tm->tm_min, tm->tm_sec);
! if (xlrec->nrels > 0)
! {
! sprintf(buf + strlen(buf), "; rels:");
! for (i = 0; i < xlrec->nrels; i++)
! {
! RelFileNode rnode = xlrec->xnodes[i];
! sprintf(buf + strlen(buf), " %u/%u/%u",
! rnode.spcNode, rnode.dbNode, rnode.relNode);
! }
! }
! if (xlrec->nsubxacts > 0)
! {
! TransactionId *xacts = (TransactionId *)
! &xlrec->xnodes[xlrec->nrels];
! sprintf(buf + strlen(buf), "; subxacts:");
! for (i = 0; i < xlrec->nsubxacts; i++)
! sprintf(buf + strlen(buf), " %u", xacts[i]);
! }
}
else
strcat(buf, "UNKNOWN");
--- 4166,4302 ----
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
! xact_redo_commit(xlrec, record->xl_xid);
! }
! else if (info == XLOG_XACT_ABORT)
! {
! xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
!
! xact_redo_abort(xlrec, record->xl_xid);
! }
! else if (info == XLOG_XACT_PREPARE)
! {
! /* the record contents are exactly the 2PC file */
! RecreateTwoPhaseFile(record->xl_xid,
! XLogRecGetData(record), record->xl_len);
! }
! else if (info == XLOG_XACT_COMMIT_PREPARED)
! {
! xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
! xact_redo_commit(&xlrec->crec, xlrec->xid);
! RemoveTwoPhaseFile(xlrec->xid, false);
! }
! else if (info == XLOG_XACT_ABORT_PREPARED)
! {
! xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
!
! xact_redo_abort(&xlrec->arec, xlrec->xid);
! RemoveTwoPhaseFile(xlrec->xid, false);
! }
! else
! elog(PANIC, "xact_redo: unknown op code %u", info);
! }
!
! static void
! xact_desc_commit(char *buf, xl_xact_commit *xlrec)
! {
! struct tm *tm = localtime(&xlrec->xtime);
! int i;
!
! sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u",
! tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
! tm->tm_hour, tm->tm_min, tm->tm_sec);
! if (xlrec->nrels > 0)
! {
! sprintf(buf + strlen(buf), "; rels:");
for (i = 0; i < xlrec->nrels; i++)
{
! RelFileNode rnode = xlrec->xnodes[i];
!
! sprintf(buf + strlen(buf), " %u/%u/%u",
! rnode.spcNode, rnode.dbNode, rnode.relNode);
}
}
! if (xlrec->nsubxacts > 0)
{
! TransactionId *xacts = (TransactionId *)
! &xlrec->xnodes[xlrec->nrels];
! sprintf(buf + strlen(buf), "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
! sprintf(buf + strlen(buf), " %u", xacts[i]);
! }
! }
!
! static void
! xact_desc_abort(char *buf, xl_xact_abort *xlrec)
! {
! struct tm *tm = localtime(&xlrec->xtime);
! int i;
! sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u",
! tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
! tm->tm_hour, tm->tm_min, tm->tm_sec);
! if (xlrec->nrels > 0)
! {
! sprintf(buf + strlen(buf), "; rels:");
for (i = 0; i < xlrec->nrels; i++)
{
! RelFileNode rnode = xlrec->xnodes[i];
!
! sprintf(buf + strlen(buf), " %u/%u/%u",
! rnode.spcNode, rnode.dbNode, rnode.relNode);
}
}
! if (xlrec->nsubxacts > 0)
! {
! TransactionId *xacts = (TransactionId *)
! &xlrec->xnodes[xlrec->nrels];
!
! sprintf(buf + strlen(buf), "; subxacts:");
! for (i = 0; i < xlrec->nsubxacts; i++)
! sprintf(buf + strlen(buf), " %u", xacts[i]);
! }
}
void
xact_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
! strcat(buf, "commit: ");
! xact_desc_commit(buf, xlrec);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
! strcat(buf, "abort: ");
! xact_desc_abort(buf, xlrec);
! }
! else if (info == XLOG_XACT_PREPARE)
! {
! strcat(buf, "prepare");
! }
! else if (info == XLOG_XACT_COMMIT_PREPARED)
! {
! xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
! sprintf(buf + strlen(buf), "commit %u: ", xlrec->xid);
! xact_desc_commit(buf, &xlrec->crec);
! }
! else if (info == XLOG_XACT_ABORT_PREPARED)
! {
! xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
! sprintf(buf + strlen(buf), "abort %u: ", xlrec->xid);
! xact_desc_abort(buf, &xlrec->arec);
}
else
strcat(buf, "UNKNOWN");
*** src/backend/access/transam/xlog.c.orig Fri Jun 10 16:38:50 2005
--- src/backend/access/transam/xlog.c Fri Jun 17 14:54:04 2005
***************
*** 25,30 ****
--- 25,31 ----
#include "access/clog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
+ #include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
***************
*** 814,831 ****
/* Compute record's XLOG location */
INSERT_RECPTR(RecPtr, Insert, curridx);
- /* If first XLOG record of transaction, save it in PGPROC array */
- if (MyLastRecPtr.xrecoff == 0 && !no_tran)
- {
- /*
- * We do not acquire ProcArrayLock here because of possible deadlock.
- * Anyone who wants to inspect other procs' logRec must acquire
- * WALInsertLock, instead. A better solution would be a per-PROC
- * spinlock, but no time for that before 7.2 --- tgl 12/19/01.
- */
- MyProc->logRec = RecPtr;
- }
-
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
--- 815,820 ----
***************
*** 3827,3832 ****
--- 3816,3822 ----
BootStrapCLOG();
BootStrapSUBTRANS();
BootStrapMultiXact();
+
free(buffer);
}
***************
*** 4268,4273 ****
--- 4258,4264 ----
uint32 endLogSeg;
XLogRecord *record;
uint32 freespace;
+ TransactionId oldestActiveXID;
CritSectionCount++;
***************
*** 4678,4710 ****
XLogCtl->Write.curridx = NextBufIdx(0);
}
! #ifdef NOT_USED
! /* UNDO */
! if (InRecovery)
! {
! RecPtr = ReadRecPtr;
! if (XLByteLT(checkPoint.undo, RecPtr))
! {
! ereport(LOG,
! (errmsg("undo starts at %X/%X",
! RecPtr.xlogid, RecPtr.xrecoff)));
! do
! {
! record = ReadRecord(&RecPtr, PANIC);
! if (TransactionIdIsValid(record->xl_xid) &&
! !TransactionIdDidCommit(record->xl_xid))
! RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
! RecPtr = record->xl_prev;
! } while (XLByteLE(checkPoint.undo, RecPtr));
! ereport(LOG,
! (errmsg("undo done at %X/%X",
! ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
! }
! else
! ereport(LOG,
! (errmsg("undo is not required")));
! }
! #endif
if (InRecovery)
{
--- 4669,4676 ----
XLogCtl->Write.curridx = NextBufIdx(0);
}
! /* Pre-scan prepared transactions to find out the range of XIDs present */
! oldestActiveXID = PrescanPreparedTransactions();
if (InRecovery)
{
***************
*** 4767,4775 ****
/* Start up the commit log and related stuff, too */
StartupCLOG();
! StartupSUBTRANS();
StartupMultiXact();
ereport(LOG,
(errmsg("database system is ready")));
CritSectionCount--;
--- 4733,4744 ----
/* Start up the commit log and related stuff, too */
StartupCLOG();
! StartupSUBTRANS(oldestActiveXID);
StartupMultiXact();
+ /* Reload shared-memory state for prepared transactions */
+ RecoverPreparedTransactions();
+
ereport(LOG,
(errmsg("database system is ready")));
CritSectionCount--;
***************
*** 5096,5126 ****
}
/*
- * Get UNDO record ptr - this is oldest of PGPROC->logRec values. We
- * do this while holding insert lock to ensure that we won't miss any
- * about-to-commit transactions (UNDO must include all xacts that have
- * commits after REDO point).
- *
- * XXX temporarily ifdef'd out to avoid three-way deadlock condition:
- * GetUndoRecPtr needs to grab ProcArrayLock to ensure that it is looking
- * at a stable set of proc records, but grabbing ProcArrayLock while
- * holding WALInsertLock is no good. GetNewTransactionId may cause a
- * WAL record to be written while holding XidGenLock, and
- * GetSnapshotData needs to get XidGenLock while holding ProcArrayLock,
- * so there's a risk of deadlock. Need to find a better solution. See
- * pgsql-hackers discussion of 17-Dec-01.
- *
- * XXX actually, the whole UNDO code is dead code and unlikely to ever be
- * revived, so the lack of a good solution here is not troubling.
- */
- #ifdef NOT_USED
- checkPoint.undo = GetUndoRecPtr();
-
- if (shutdown && checkPoint.undo.xrecoff != 0)
- elog(PANIC, "active transaction while database system is shutting down");
- #endif
-
- /*
* Now we can release insert lock and checkpoint start lock, allowing
* other xacts to proceed even while we are flushing disk buffers.
*/
--- 5065,5070 ----
***************
*** 5195,5216 ****
/*
* Select point at which we can truncate the log, which we base on the
* prior checkpoint's earliest info.
! *
! * With UNDO support: oldest item is redo or undo, whichever is older;
! * but watch out for case that undo = 0.
! *
! * Without UNDO support: just use the redo pointer. This allows xlog
! * space to be freed much faster when there are long-running
! * transactions.
! */
! #ifdef NOT_USED
! if (ControlFile->checkPointCopy.undo.xrecoff != 0 &&
! XLByteLT(ControlFile->checkPointCopy.undo,
! ControlFile->checkPointCopy.redo))
! XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
! else
! #endif
! XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
/*
* Update the control file.
--- 5139,5146 ----
/*
* Select point at which we can truncate the log, which we base on the
* prior checkpoint's earliest info.
! */
! XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
/*
* Update the control file.
*** src/backend/catalog/system_views.sql.orig Tue May 17 17:46:09 2005
--- src/backend/catalog/system_views.sql Fri Jun 17 18:19:33 2005
***************
*** 102,107 ****
--- 102,140 ----
REVOKE ALL on pg_statistic FROM public;
+ CREATE VIEW pg_locks AS
+ SELECT *
+ FROM pg_lock_status() AS L
+ (locktype text, database oid, relation oid, page int4, tuple int2,
+ transaction xid, classid oid, objid oid, objsubid int2,
+ pid int4, mode text, granted boolean);
+
+ CREATE VIEW pg_prepared_xacts AS
+ SELECT P.transaction, P.gid, U.usename AS owner, D.datname AS database
+ FROM pg_prepared_xact() AS P
+ (transaction xid, gid text, ownerid int4, dbid oid)
+ LEFT JOIN pg_database D ON P.dbid = D.oid
+ LEFT JOIN pg_shadow U ON P.ownerid = U.usesysid;
+
+ CREATE VIEW pg_settings AS
+ SELECT *
+ FROM pg_show_all_settings() AS A
+ (name text, setting text, category text, short_desc text, extra_desc text,
+ context text, vartype text, source text, min_val text, max_val text);
+
+ CREATE RULE pg_settings_u AS
+ ON UPDATE TO pg_settings
+ WHERE new.name = old.name DO
+ SELECT set_config(old.name, new.setting, 'f');
+
+ CREATE RULE pg_settings_n AS
+ ON UPDATE TO pg_settings
+ DO INSTEAD NOTHING;
+
+ GRANT SELECT, UPDATE ON pg_settings TO PUBLIC;
+
+ -- Statistics views
+
CREATE VIEW pg_stat_all_tables AS
SELECT
C.oid AS relid,
***************
*** 258,284 ****
pg_stat_get_db_blocks_hit(D.oid) AS blks_read,
pg_stat_get_db_blocks_hit(D.oid) AS blks_hit
FROM pg_database D;
-
- CREATE VIEW pg_locks AS
- SELECT *
- FROM pg_lock_status() AS L
- (locktype text, database oid, relation oid, page int4, tuple int2,
- transaction xid, classid oid, objid oid, objsubid int2,
- pid int4, mode text, granted boolean);
-
- CREATE VIEW pg_settings AS
- SELECT *
- FROM pg_show_all_settings() AS A
- (name text, setting text, category text, short_desc text, extra_desc text,
- context text, vartype text, source text, min_val text, max_val text);
-
- CREATE RULE pg_settings_u AS
- ON UPDATE TO pg_settings
- WHERE new.name = old.name DO
- SELECT set_config(old.name, new.setting, 'f');
-
- CREATE RULE pg_settings_n AS
- ON UPDATE TO pg_settings
- DO INSTEAD NOTHING;
-
- GRANT SELECT, UPDATE ON pg_settings TO PUBLIC;
--- 291,293 ----
*** src/backend/commands/async.c.orig Thu May 19 15:34:48 2005
--- src/backend/commands/async.c Fri Jun 17 16:11:23 2005
***************
*** 78,83 ****
--- 78,84 ----
#include
#include "access/heapam.h"
+ #include "access/twophase_rmgr.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "libpq/libpq.h"
***************
*** 407,412 ****
--- 408,443 ----
CommitTransactionCommand();
}
+
+ /*
+ *--------------------------------------------------------------
+ * AtPrepare_Notify
+ *
+ * This is called at the prepare phase of a two-phase transaction.
+ * Each pending notify name is registered as its own two-phase record.
+ *--------------------------------------------------------------
+ */
+ void
+ AtPrepare_Notify(void)
+ {
+ ListCell *p;
+
+ foreach(p, pendingNotifies)
+ {
+ const char *relname = (const char *) lfirst(p);
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
+ relname, strlen(relname) + 1); /* length includes the trailing NUL */
+ }
+
+ /*
+ * We can clear the state immediately, rather than needing a separate
+ * PostPrepare call, because if the transaction fails we'd just
+ * discard the state anyway.
+ */
+ ClearPendingNotifies();
+ }
+
/*
*--------------------------------------------------------------
* AtCommit_Notify
***************
*** 1016,1023 ****
foreach(p, pendingNotifies)
{
! /* Use NAMEDATALEN for relname comparison. DZ - 26-08-1996 */
! if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
return true;
}
--- 1047,1055 ----
foreach(p, pendingNotifies)
{
! const char *prelname = (const char *) lfirst(p);
!
! if (strcmp(prelname, relname) == 0)
return true;
}
***************
*** 1036,1039 ****
--- 1068,1090 ----
* list head pointer.
*/
pendingNotifies = NIL;
+ }
+
+ /*
+ * 2PC processing routine for COMMIT PREPARED case.
+ * recdata is used as a NUL-terminated name; xid, info and len are unused.
+ * (We don't have to do anything for ROLLBACK PREPARED.)
+ */
+ void
+ notify_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+ {
+ /*
+ * Set up to issue the NOTIFY at the end of my own
+ * current transaction. (XXX this has some issues if my own
+ * transaction later rolls back, or if there is any significant
+ * delay before I commit. OK for now because we disallow
+ * COMMIT PREPARED inside a transaction block.)
+ */
+ Async_Notify((char *) recdata); /* NOTE(review): len is not checked against the string length */
}
*** src/backend/nodes/copyfuncs.c.orig Thu Jun 9 00:18:58 2005
--- src/backend/nodes/copyfuncs.c Thu Jun 16 15:18:21 2005
***************
*** 2085,2090 ****
--- 2085,2091 ----
COPY_SCALAR_FIELD(kind);
COPY_NODE_FIELD(options);
+ COPY_STRING_FIELD(gid);
return newnode;
}
*** src/backend/nodes/equalfuncs.c.orig Thu Jun 9 00:18:58 2005
--- src/backend/nodes/equalfuncs.c Thu Jun 16 15:18:22 2005
***************
*** 1053,1058 ****
--- 1053,1059 ----
{
COMPARE_SCALAR_FIELD(kind);
COMPARE_NODE_FIELD(options);
+ COMPARE_STRING_FIELD(gid);
return true;
}
*** src/backend/parser/gram.y.orig Wed Jun 8 17:15:28 2005
--- src/backend/parser/gram.y Fri Jun 17 15:10:25 2005
***************
*** 387,393 ****
ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER
PARTIAL PASSWORD PLACING POSITION
! PRECISION PRESERVE PREPARE PRIMARY
PRIOR PRIVILEGES PROCEDURAL PROCEDURE
QUOTE
--- 387,393 ----
ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER
PARTIAL PASSWORD PLACING POSITION
! PRECISION PRESERVE PREPARE PREPARED PRIMARY
PRIOR PRIVILEGES PROCEDURAL PROCEDURE
QUOTE
***************
*** 4121,4126 ****
--- 4121,4147 ----
(Node *)makeString($4)));
$$ = (Node *)n;
}
+ | PREPARE TRANSACTION Sconst
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+ n->kind = TRANS_STMT_PREPARE;
+ n->gid = $3;
+ $$ = (Node *)n;
+ }
+ | COMMIT PREPARED Sconst
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+ n->kind = TRANS_STMT_COMMIT_PREPARED;
+ n->gid = $3;
+ $$ = (Node *)n;
+ }
+ | ROLLBACK PREPARED Sconst
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+ n->kind = TRANS_STMT_ROLLBACK_PREPARED;
+ n->gid = $3;
+ $$ = (Node *)n;
+ }
;
opt_transaction: WORK {}
***************
*** 7855,7860 ****
--- 7872,7878 ----
| PARTIAL
| PASSWORD
| PREPARE
+ | PREPARED
| PRESERVE
| PRIOR
| PRIVILEGES
*** src/backend/parser/keywords.c.orig Wed May 11 12:13:51 2005
--- src/backend/parser/keywords.c Fri Jun 17 14:54:06 2005
***************
*** 243,248 ****
--- 243,249 ----
{"position", POSITION},
{"precision", PRECISION},
{"prepare", PREPARE},
+ {"prepared", PREPARED},
{"preserve", PRESERVE},
{"primary", PRIMARY},
{"prior", PRIOR},
*** src/backend/postmaster/postmaster.c.orig Tue Jun 14 18:17:30 2005
--- src/backend/postmaster/postmaster.c Thu Jun 16 15:18:10 2005
***************
*** 252,258 ****
static void pmdaemonize(void);
static Port *ConnCreate(int serverFd);
static void ConnFree(Port *port);
! static void reset_shared(unsigned short port);
static void SIGHUP_handler(SIGNAL_ARGS);
static void pmdie(SIGNAL_ARGS);
static void reaper(SIGNAL_ARGS);
--- 252,258 ----
static void pmdaemonize(void);
static Port *ConnCreate(int serverFd);
static void ConnFree(Port *port);
! static void reset_shared(int port);
static void SIGHUP_handler(SIGNAL_ARGS);
static void pmdie(SIGNAL_ARGS);
static void reaper(SIGNAL_ARGS);
***************
*** 1783,1789 ****
* reset_shared -- reset shared memory and semaphores
*/
static void
! reset_shared(unsigned short port)
{
/*
* Create or re-create shared memory and semaphores.
--- 1783,1789 ----
* reset_shared -- reset shared memory and semaphores
*/
static void
! reset_shared(int port)
{
/*
* Create or re-create shared memory and semaphores.
***************
*** 1793,1799 ****
* used to determine IPC keys. This helps ensure that we will clean
* up dead IPC objects if the postmaster crashes and is restarted.
*/
! CreateSharedMemoryAndSemaphores(false, MaxBackends, port);
}
--- 1793,1799 ----
* used to determine IPC keys. This helps ensure that we will clean
* up dead IPC objects if the postmaster crashes and is restarted.
*/
! CreateSharedMemoryAndSemaphores(false, port);
}
***************
*** 3182,3188 ****
/* BackendRun will close sockets */
/* Attach process to shared data structures */
! CreateSharedMemoryAndSemaphores(false, MaxBackends, 0);
#ifdef USE_SSL
/*
--- 3182,3188 ----
/* BackendRun will close sockets */
/* Attach process to shared data structures */
! CreateSharedMemoryAndSemaphores(false, 0);
#ifdef USE_SSL
/*
***************
*** 3203,3209 ****
ClosePostmasterPorts(false);
/* Attach process to shared data structures */
! CreateSharedMemoryAndSemaphores(false, MaxBackends, 0);
BootstrapMain(argc - 2, argv + 2);
proc_exit(0);
--- 3203,3209 ----
ClosePostmasterPorts(false);
/* Attach process to shared data structures */
! CreateSharedMemoryAndSemaphores(false, 0);
BootstrapMain(argc - 2, argv + 2);
proc_exit(0);
*** src/backend/storage/ipc/ipci.c.orig Thu May 19 17:35:46 2005
--- src/backend/storage/ipc/ipci.c Thu Jun 16 15:18:03 2005
***************
*** 17,22 ****
--- 17,23 ----
#include "access/clog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
+ #include "access/twophase.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "postmaster/bgwriter.h"
***************
*** 54,62 ****
* memory. This is true for a standalone backend, false for a postmaster.
*/
void
! CreateSharedMemoryAndSemaphores(bool makePrivate,
! int maxBackends,
! int port)
{
PGShmemHeader *seghdr = NULL;
--- 55,61 ----
* memory. This is true for a standalone backend, false for a postmaster.
*/
void
! CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
{
PGShmemHeader *seghdr = NULL;
***************
*** 72,86 ****
*/
size = hash_estimate_size(SHMEM_INDEX_SIZE, sizeof(ShmemIndexEnt));
size += BufferShmemSize();
! size += LockShmemSize(maxBackends);
! size += ProcGlobalShmemSize(maxBackends);
size += XLOGShmemSize();
size += CLOGShmemSize();
size += SUBTRANSShmemSize();
size += MultiXactShmemSize();
size += LWLockShmemSize();
! size += ProcArrayShmemSize(maxBackends);
! size += SInvalShmemSize(maxBackends);
size += FreeSpaceShmemSize();
size += BgWriterShmemSize();
#ifdef EXEC_BACKEND
--- 71,86 ----
*/
size = hash_estimate_size(SHMEM_INDEX_SIZE, sizeof(ShmemIndexEnt));
size += BufferShmemSize();
! size += LockShmemSize();
! size += ProcGlobalShmemSize();
size += XLOGShmemSize();
size += CLOGShmemSize();
size += SUBTRANSShmemSize();
+ size += TwoPhaseShmemSize();
size += MultiXactShmemSize();
size += LWLockShmemSize();
! size += ProcArrayShmemSize();
! size += SInvalShmemSize(MaxBackends);
size += FreeSpaceShmemSize();
size += BgWriterShmemSize();
#ifdef EXEC_BACKEND
***************
*** 100,106 ****
/*
* Create semaphores
*/
! numSemas = ProcGlobalSemas(maxBackends);
numSemas += SpinlockSemas();
PGReserveSemaphores(numSemas, port);
}
--- 100,106 ----
/*
* Create semaphores
*/
! numSemas = ProcGlobalSemas();
numSemas += SpinlockSemas();
PGReserveSemaphores(numSemas, port);
}
***************
*** 144,149 ****
--- 144,150 ----
XLOGShmemInit();
CLOGShmemInit();
SUBTRANSShmemInit();
+ TwoPhaseShmemInit();
MultiXactShmemInit();
InitBufferPool();
***************
*** 151,168 ****
* Set up lock manager
*/
InitLocks();
! InitLockTable(maxBackends);
/*
* Set up process table
*/
! InitProcGlobal(maxBackends);
! CreateSharedProcArray(maxBackends);
/*
* Set up shared-inval messaging
*/
! CreateSharedInvalidationState(maxBackends);
/*
* Set up free-space map
--- 152,169 ----
* Set up lock manager
*/
InitLocks();
! InitLockTable();
/*
* Set up process table
*/
! InitProcGlobal();
! CreateSharedProcArray();
/*
* Set up shared-inval messaging
*/
! CreateSharedInvalidationState(MaxBackends);
/*
* Set up free-space map
*** src/backend/storage/ipc/procarray.c.orig Thu May 19 19:57:11 2005
--- src/backend/storage/ipc/procarray.c Fri Jun 17 15:51:11 2005
***************
*** 11,16 ****
--- 11,21 ----
* Because of various subtle race conditions it is critical that a backend
* hold the correct locks while setting or clearing its MyProc->xid field.
* See notes in GetSnapshotData.
+ *
+ * The process array now also includes PGPROC structures representing
+ * prepared transactions. The xid and subxids fields of these are valid,
+ * as is the procLocks list. They can be distinguished from regular backend
+ * PGPROCs at need by checking for pid == 0.
*
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
***************
*** 25,30 ****
--- 30,36 ----
#include "postgres.h"
#include "access/subtrans.h"
+ #include "access/twophase.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "storage/procarray.h"
***************
*** 76,100 ****
* Report shared-memory space needed by CreateSharedProcArray.
*/
int
! ProcArrayShmemSize(int maxBackends)
{
! /* sizeof(ProcArrayStruct) includes the first array element */
! return MAXALIGN(sizeof(ProcArrayStruct) +
! (maxBackends - 1) * sizeof(PGPROC *));
}
/*
* Initialize the shared PGPROC array during postmaster startup.
*/
void
! CreateSharedProcArray(int maxBackends)
{
bool found;
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
! ShmemInitStruct("Proc Array", ProcArrayShmemSize(maxBackends),
! &found);
if (!found)
{
--- 82,104 ----
* Report shared-memory space needed by CreateSharedProcArray.
*/
int
! ProcArrayShmemSize(void)
{
! return MAXALIGN(offsetof(ProcArrayStruct, procs) +
! (MaxBackends + max_prepared_xacts) * sizeof(PGPROC *));
}
/*
* Initialize the shared PGPROC array during postmaster startup.
*/
void
! CreateSharedProcArray(void)
{
bool found;
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
! ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found);
if (!found)
{
***************
*** 102,119 ****
* We're the first - initialize.
*/
procArray->numProcs = 0;
! procArray->maxProcs = maxBackends;
}
}
/*
! * Add my own PGPROC (found in the global MyProc) to the shared array.
! *
! * This must be called during backend startup, after fully initializing
! * the contents of MyProc.
*/
void
! ProcArrayAddMyself(void)
{
ProcArrayStruct *arrayP = procArray;
--- 106,120 ----
* We're the first - initialize.
*/
procArray->numProcs = 0;
! procArray->maxProcs = MaxBackends + max_prepared_xacts;
}
}
/*
! * Add the specified PGPROC to the shared array.
*/
void
! ProcArrayAdd(PGPROC *proc)
{
ProcArrayStruct *arrayP = procArray;
***************
*** 132,163 ****
errmsg("sorry, too many clients already")));
}
! arrayP->procs[arrayP->numProcs] = MyProc;
arrayP->numProcs++;
LWLockRelease(ProcArrayLock);
}
/*
! * Remove my own PGPROC (found in the global MyProc) from the shared array.
! *
! * This must be called during backend shutdown.
*/
void
! ProcArrayRemoveMyself(void)
{
ProcArrayStruct *arrayP = procArray;
int index;
#ifdef XIDCACHE_DEBUG
! DisplayXidCache();
#endif
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
for (index = 0; index < arrayP->numProcs; index++)
{
! if (arrayP->procs[index] == MyProc)
{
arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
arrayP->numProcs--;
--- 133,164 ----
errmsg("sorry, too many clients already")));
}
! arrayP->procs[arrayP->numProcs] = proc;
arrayP->numProcs++;
LWLockRelease(ProcArrayLock);
}
/*
! * Remove the specified PGPROC from the shared array.
*/
void
! ProcArrayRemove(PGPROC *proc)
{
ProcArrayStruct *arrayP = procArray;
int index;
#ifdef XIDCACHE_DEBUG
! /* dump stats at backend shutdown, but not prepared-xact end */
! if (proc->pid != 0)
! DisplayXidCache();
#endif
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
for (index = 0; index < arrayP->numProcs; index++)
{
! if (arrayP->procs[index] == proc)
{
arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
arrayP->numProcs--;
***************
*** 169,175 ****
/* Ooops */
LWLockRelease(ProcArrayLock);
! elog(LOG, "failed to find my own proc %p in ProcArray", MyProc);
}
--- 170,176 ----
/* Ooops */
LWLockRelease(ProcArrayLock);
! elog(LOG, "failed to find proc %p in ProcArray", proc);
}
***************
*** 330,335 ****
--- 331,385 ----
}
/*
+ * TransactionIdIsActive -- is xid the top-level XID of an active backend?
+ *
+ * This differs from TransactionIdIsInProgress in that it ignores prepared
+ * transactions. Also, we ignore subtransactions since that's not needed
+ * for current uses. Returns true only if a PGPROC with pid != 0 has this xid.
+ */
+ bool
+ TransactionIdIsActive(TransactionId xid)
+ {
+ bool result = false;
+ ProcArrayStruct *arrayP = procArray;
+ int i;
+
+ /*
+ * Don't bother checking a transaction older than RecentXmin; it
+ * could not possibly still be running.
+ */
+ if (TransactionIdPrecedes(xid, RecentXmin))
+ return false;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED); /* the array is only read below */
+
+ for (i = 0; i < arrayP->numProcs; i++)
+ {
+ PGPROC *proc = arrayP->procs[i];
+
+ /* Fetch xid just once - see GetNewTransactionId */
+ TransactionId pxid = proc->xid;
+
+ if (!TransactionIdIsValid(pxid))
+ continue; /* this PGPROC has no valid XID */
+
+ if (proc->pid == 0)
+ continue; /* ignore prepared transactions */
+
+ if (TransactionIdEquals(pxid, xid))
+ {
+ result = true;
+ break; /* first match is enough; fall through to release the lock */
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ return result;
+ }
+
+
+ /*
* GetOldestXmin -- returns oldest transaction that was running
* when any current transaction was started.
*
***************
*** 441,452 ****
TransactionIdIsValid(MyProc->xmin));
/*
! * Allocating space for MaxBackends xids is usually overkill;
* numProcs would be sufficient. But it seems better to do the
* malloc while not holding the lock, so we can't look at numProcs.
*
* This does open a possibility for avoiding repeated malloc/free: since
! * MaxBackends does not change at runtime, we can simply reuse the
* previous xip array if any. (This relies on the fact that all
* callers pass static SnapshotData structs.)
*/
--- 491,502 ----
TransactionIdIsValid(MyProc->xmin));
/*
! * Allocating space for maxProcs xids is usually overkill;
* numProcs would be sufficient. But it seems better to do the
* malloc while not holding the lock, so we can't look at numProcs.
*
* This does open a possibility for avoiding repeated malloc/free: since
! * maxProcs does not change at runtime, we can simply reuse the
* previous xip array if any. (This relies on the fact that all
* callers pass static SnapshotData structs.)
*/
***************
*** 456,462 ****
* First call for this snapshot
*/
snapshot->xip = (TransactionId *)
! malloc(MaxBackends * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
--- 506,512 ----
* First call for this snapshot
*/
snapshot->xip = (TransactionId *)
! malloc(arrayP->maxProcs * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
***************
*** 602,615 ****
/*
* BackendPidGetProc -- get a backend's PGPROC given its PID
*/
! struct PGPROC *
BackendPidGetProc(int pid)
{
PGPROC *result = NULL;
ProcArrayStruct *arrayP = procArray;
int index;
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
--- 652,672 ----
/*
* BackendPidGetProc -- get a backend's PGPROC given its PID
+ *
+ * Returns NULL if not found. Note that it is up to the caller to be
+ * sure that the question remains meaningful for long enough for the
+ * answer to be used ...
*/
! PGPROC *
BackendPidGetProc(int pid)
{
PGPROC *result = NULL;
ProcArrayStruct *arrayP = procArray;
int index;
+ if (pid == 0) /* never match dummy PGPROCs */
+ return NULL;
+
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (index = 0; index < arrayP->numProcs; index++)
***************
*** 642,651 ****
* active transactions. This is used as a heuristic to decide if
* a pre-XLOG-flush delay is worthwhile during commit.
*
! * An active transaction is something that has written at least one XLOG
! * record; read-only transactions don't count. Also, do not count backends
! * that are blocked waiting for locks, since they are not going to get to
! * run until someone else commits.
*/
int
CountActiveBackends(void)
--- 699,706 ----
* active transactions. This is used as a heuristic to decide if
* a pre-XLOG-flush delay is worthwhile during commit.
*
! * Do not count backends that are blocked waiting for locks, since they are
! * not going to get to run until someone else commits.
*/
int
CountActiveBackends(void)
***************
*** 656,662 ****
/*
* Note: for speed, we don't acquire ProcArrayLock. This is a little bit
! * bogus, but since we are only testing xrecoff for zero or nonzero,
* it should be OK. The result is only used for heuristic purposes
* anyway...
*/
--- 711,717 ----
/*
* Note: for speed, we don't acquire ProcArrayLock. This is a little bit
! * bogus, but since we are only testing fields for zero or nonzero,
* it should be OK. The result is only used for heuristic purposes
* anyway...
*/
***************
*** 666,672 ****
if (proc == MyProc)
continue; /* do not count myself */
! if (proc->logRec.xrecoff == 0)
continue; /* do not count if not in a transaction */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
--- 721,729 ----
if (proc == MyProc)
continue; /* do not count myself */
! if (proc->pid == 0)
! continue; /* do not count prepared xacts */
! if (proc->xid == InvalidTransactionId)
continue; /* do not count if not in a transaction */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
***************
*** 676,700 ****
return count;
}
- /*
- * CountEmptyBackendSlots - count empty slots in backend process table
- *
- * Acquiring the lock here is almost certainly overkill, but just in
- * case fetching an int is not atomic on your machine ...
- */
- int
- CountEmptyBackendSlots(void)
- {
- int count;
-
- LWLockAcquire(ProcArrayLock, LW_SHARED);
-
- count = procArray->maxProcs - procArray->numProcs;
-
- LWLockRelease(ProcArrayLock);
-
- return count;
- }
#define XidCacheRemove(i) \
do { \
--- 733,738 ----
*** src/backend/storage/lmgr/lmgr.c.orig Tue Jun 14 18:15:32 2005
--- src/backend/storage/lmgr/lmgr.c Thu Jun 16 15:17:57 2005
***************
*** 77,83 ****
* Create the lock table described by LockConflicts
*/
void
! InitLockTable(int maxBackends)
{
LOCKMETHODID LongTermTableId;
--- 77,83 ----
* Create the lock table described by LockConflicts
*/
void
! InitLockTable(void)
{
LOCKMETHODID LongTermTableId;
***************
*** 91,98 ****
/* number of lock modes is lengthof()-1 because of dummy zero */
LockTableId = LockMethodTableInit("LockTable",
LockConflicts,
! lengthof(LockConflicts) - 1,
! maxBackends);
if (!LockMethodIsValid(LockTableId))
elog(ERROR, "could not initialize lock table");
Assert(LockTableId == DEFAULT_LOCKMETHOD);
--- 91,97 ----
/* number of lock modes is lengthof()-1 because of dummy zero */
LockTableId = LockMethodTableInit("LockTable",
LockConflicts,
! lengthof(LockConflicts) - 1);
if (!LockMethodIsValid(LockTableId))
elog(ERROR, "could not initialize lock table");
Assert(LockTableId == DEFAULT_LOCKMETHOD);
*** src/backend/storage/lmgr/lock.c.orig Tue Jun 14 18:15:32 2005
--- src/backend/storage/lmgr/lock.c Fri Jun 17 16:11:42 2005
***************
*** 33,38 ****
--- 33,40 ----
#include
#include
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/proc.h"
***************
*** 44,50 ****
/* This configuration variable is used to set the lock table size */
int max_locks_per_xact; /* set by guc.c */
! #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends))
/*
--- 46,60 ----
/* This configuration variable is used to set the lock table size */
int max_locks_per_xact; /* set by guc.c */
! #define NLOCKENTS() (max_locks_per_xact * (MaxBackends + max_prepared_xacts))
!
!
! /* Record that's written to 2PC state file when a lock is persisted */
! typedef struct TwoPhaseLockRecord
! {
! LOCKTAG locktag; /* tag of the lock being persisted */
! LOCKMODE lockmode; /* mode in which that lock is held */
! } TwoPhaseLockRecord;
/*
***************
*** 168,175 ****
/*
! * InitLocks -- Init the lock module. Create a private data
! * structure for constructing conflict masks.
*/
void
InitLocks(void)
--- 178,184 ----
/*
! * InitLocks -- Init the lock module. Nothing to do here at present.
*/
void
InitLocks(void)
***************
*** 222,229 ****
LOCKMETHODID
LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP,
! int numModes,
! int maxBackends)
{
LockMethod newLockMethod;
LOCKMETHODID lockmethodid;
--- 231,237 ----
LOCKMETHODID
LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP,
! int numModes)
{
LockMethod newLockMethod;
LOCKMETHODID lockmethodid;
***************
*** 239,245 ****
numModes, MAX_LOCKMODES - 1);
/* Compute init/max size to request for lock hashtables */
! max_table_size = NLOCKENTS(maxBackends);
init_table_size = max_table_size / 2;
/* Allocate a string for the shmem index table lookups. */
--- 247,253 ----
numModes, MAX_LOCKMODES - 1);
/* Compute init/max size to request for lock hashtables */
! max_table_size = NLOCKENTS();
init_table_size = max_table_size / 2;
/* Allocate a string for the shmem index table lookups. */
***************
*** 1418,1427 ****
while (proclock)
{
bool wakeupNeeded = false;
! PROCLOCK *nextHolder;
/* Get link first, since we may unlink/delete this proclock */
! nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
--- 1426,1435 ----
while (proclock)
{
bool wakeupNeeded = false;
! PROCLOCK *nextplock;
/* Get link first, since we may unlink/delete this proclock */
! nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
***************
*** 1474,1480 ****
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
next_item:
! proclock = nextHolder;
}
LWLockRelease(masterLock);
--- 1482,1488 ----
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
next_item:
! proclock = nextplock;
}
LWLockRelease(masterLock);
***************
*** 1606,1618 ****
/*
* Estimate shared-memory space used for lock tables
*/
int
! LockShmemSize(int maxBackends)
{
int size = 0;
! long max_table_size = NLOCKENTS(maxBackends);
/* lock method headers */
size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
--- 1614,1874 ----
/*
+ * AtPrepare_Locks
+ * Do the preparatory work for a PREPARE: make 2PC state file records
+ * for all locks currently held.
+ *
+ * User locks are non-transactional and are therefore ignored.
+ *
+ * There are some special cases that we error out on: we can't be holding
+ * any session locks (should be OK since only VACUUM uses those) and we
+ * can't be holding any locks on temporary objects (since that would mess
+ * up the current backend if it tries to exit before the prepared xact is
+ * committed).
+ */
+ void
+ AtPrepare_Locks(void)
+ {
+ LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD;
+ HASH_SEQ_STATUS status;
+ LOCALLOCK *locallock;
+
+ /*
+ * We don't need to touch shared memory for this --- all the necessary
+ * state information is in the locallock table.
+ */
+ hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
+
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ TwoPhaseLockRecord record;
+ LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+ int i;
+
+ /* Ignore items that are not of the lockmethod to be processed */
+ if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+ continue;
+
+ /* Ignore it if we don't actually hold the lock */
+ if (locallock->nLocks <= 0)
+ continue;
+
+ /* Scan to verify there are no session locks */
+ for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ {
+ /* elog not ereport since this should not happen */
+ if (lockOwners[i].owner == NULL) /* a NULL owner is treated as a session lock */
+ elog(ERROR, "cannot PREPARE when session locks exist");
+ }
+
+ /* Can't handle it if the lock is on a temporary object */
+ if (locallock->isTempObject)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
+
+ /*
+ * Create a 2PC record holding this lock's tag and mode.
+ */
+ memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
+ record.lockmode = locallock->tag.mode;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
+ &record, sizeof(TwoPhaseLockRecord));
+ }
+ }
+
+ /*
+ * PostPrepare_Locks
+ * Clean up after successful PREPARE
+ *
+ * Here, we want to transfer ownership of our locks to a dummy PGPROC
+ * that's now associated with the prepared transaction, and we want to
+ * clean out the corresponding entries in the LOCALLOCK table.
+ *
+ * Note: by removing the LOCALLOCK entries, we are leaving dangling
+ * pointers in the transaction's resource owner. This is OK at the
+ * moment since resowner.c doesn't try to free locks retail at a toplevel
+ * transaction commit or abort. We could alternatively zero out nLocks
+ * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
+ * but that probably costs more cycles.
+ */
+ void
+ PostPrepare_Locks(TransactionId xid)
+ {
+ PGPROC *newproc = TwoPhaseGetDummyProc(xid);
+ LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD;
+ HASH_SEQ_STATUS status;
+ SHM_QUEUE *procLocks = &(MyProc->procLocks);
+ LWLockId masterLock;
+ LockMethod lockMethodTable;
+ int numLockModes; /* NOTE(review): assigned below but not otherwise used in this function */
+ LOCALLOCK *locallock;
+ PROCLOCK *proclock;
+ PROCLOCKTAG proclocktag;
+ bool found;
+ LOCK *lock;
+
+ /* This is a critical section: any error means big trouble */
+ START_CRIT_SECTION();
+
+ lockMethodTable = LockMethods[lockmethodid];
+ if (!lockMethodTable)
+ elog(ERROR, "unrecognized lock method: %d", lockmethodid); /* NOTE(review): raised inside the critical section */
+
+ numLockModes = lockMethodTable->numLockModes;
+ masterLock = lockMethodTable->masterLock;
+
+ /*
+ * First we run through the locallock table and get rid of unwanted
+ * entries, then we scan the process's proclocks and transfer them
+ * to the target proc.
+ *
+ * We do this separately because we may have multiple locallock
+ * entries pointing to the same proclock, and we daren't end up with
+ * any dangling pointers.
+ */
+ hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
+
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ if (locallock->proclock == NULL || locallock->lock == NULL)
+ {
+ /*
+ * We must've run out of shared memory while trying to set up
+ * this lock. Just forget the local entry.
+ */
+ Assert(locallock->nLocks == 0);
+ RemoveLocalLock(locallock);
+ continue;
+ }
+
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+ continue;
+
+ /* We already checked there are no session locks */
+
+ /* Mark the proclock to show we need to release this lockmode */
+ if (locallock->nLocks > 0)
+ locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
+
+ /* And remove the locallock hashtable entry */
+ RemoveLocalLock(locallock);
+ }
+
+ LWLockAcquire(masterLock, LW_EXCLUSIVE); /* held for the whole proclock transfer loop below */
+
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
+
+ while (proclock)
+ {
+ PROCLOCK *nextplock;
+ LOCKMASK holdMask;
+ PROCLOCK *newproclock;
+
+ /* Get link first, since we may unlink/delete this proclock */
+ nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
+ offsetof(PROCLOCK, procLink));
+
+ Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
+
+ lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
+ goto next_item;
+
+ PROCLOCK_PRINT("PostPrepare_Locks", proclock);
+ LOCK_PRINT("PostPrepare_Locks", lock, 0);
+ Assert(lock->nRequested >= 0);
+ Assert(lock->nGranted >= 0);
+ Assert(lock->nGranted <= lock->nRequested);
+ Assert((proclock->holdMask & ~lock->grantMask) == 0);
+
+ /*
+ * Since there were no session locks, we should be releasing all locks
+ */
+ if (proclock->releaseMask != proclock->holdMask)
+ elog(PANIC, "we seem to have dropped a bit somewhere");
+
+ holdMask = proclock->holdMask; /* saved now: the old proclock is deleted just below */
+
+ /*
+ * We cannot simply modify proclock->tag.proc to reassign ownership
+ * of the lock, because that's part of the hash key and the proclock
+ * would then be in the wrong hash chain. So, unlink and delete the
+ * old proclock; create a new one with the right contents; and link
+ * it into place. We do it in this order to be certain we won't
+ * run out of shared memory (the way dynahash.c works, the deleted
+ * object is certain to be available for reallocation).
+ */
+ SHMQueueDelete(&proclock->lockLink);
+ SHMQueueDelete(&proclock->procLink);
+ if (!hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &(proclock->tag),
+ HASH_REMOVE, NULL))
+ elog(PANIC, "proclock table corrupted");
+
+ /*
+ * Create the hash key for the new proclock table.
+ */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));
+ proclocktag.lock = MAKE_OFFSET(lock);
+ proclocktag.proc = MAKE_OFFSET(newproc);
+
+ newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &proclocktag,
+ HASH_ENTER_NULL, &found);
+ if (!newproclock)
+ ereport(PANIC, /* should not happen */
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
+
+ /*
+ * If new, initialize the new entry
+ */
+ if (!found)
+ {
+ newproclock->holdMask = 0;
+ newproclock->releaseMask = 0;
+ /* Add new proclock to appropriate lists */
+ SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
+ SHMQueueInsertBefore(&newproc->procLocks, &newproclock->procLink);
+ PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
+ }
+ else
+ {
+ PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
+ Assert((newproclock->holdMask & ~lock->grantMask) == 0);
+ }
+
+ /*
+ * Pass over the identified lock ownership to the dummy proc's entry.
+ */
+ Assert((newproclock->holdMask & holdMask) == 0);
+ newproclock->holdMask |= holdMask;
+
+ next_item:
+ proclock = nextplock;
+ }
+
+ LWLockRelease(masterLock);
+
+ END_CRIT_SECTION();
+ }
+
+
+ /*
* Estimate shared-memory space used for lock tables
*/
int
! LockShmemSize(void)
{
int size = 0;
! long max_table_size = NLOCKENTS();
/* lock method headers */
size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
***************
*** 1704,1724 ****
#ifdef LOCK_DEBUG
/*
! * Dump all locks in the MyProc->procLocks list.
*
* Must have already acquired the masterLock.
*/
void
! DumpLocks(void)
{
- PGPROC *proc;
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
int lockmethodid = DEFAULT_LOCKMETHOD;
LockMethod lockMethodTable;
- proc = MyProc;
if (proc == NULL)
return;
--- 1960,1978 ----
#ifdef LOCK_DEBUG
/*
! * Dump all locks in the given proc's procLocks list.
*
* Must have already acquired the masterLock.
*/
void
! DumpLocks(PGPROC *proc)
{
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
int lockmethodid = DEFAULT_LOCKMETHOD;
LockMethod lockMethodTable;
if (proc == NULL)
return;
***************
*** 1793,1795 ****
--- 2047,2300 ----
}
#endif /* LOCK_DEBUG */
+
+ /*
+ * LOCK 2PC resource manager's routines
+ */
+
+ /*
+ * Re-acquire a lock belonging to a transaction that was prepared.
+ * The lock is granted to TwoPhaseGetDummyProc(xid)'s PGPROC; info is unused.
+ * Because this function is run at db startup, re-acquiring the locks should
+ * never conflict with running transactions because there are none. We
+ * assume that the lock state represented by the stored 2PC files is legal.
+ */
+ void
+ lock_twophase_recover(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+ {
+ TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
+ PGPROC *proc = TwoPhaseGetDummyProc(xid);
+ LOCKTAG *locktag;
+ LOCKMODE lockmode;
+ LOCKMETHODID lockmethodid;
+ LOCK *lock;
+ PROCLOCK *proclock;
+ PROCLOCKTAG proclocktag;
+ bool found;
+ LWLockId masterLock;
+ LockMethod lockMethodTable;
+
+ Assert(len == sizeof(TwoPhaseLockRecord));
+ locktag = &rec->locktag;
+ lockmode = rec->lockmode;
+ lockmethodid = locktag->locktag_lockmethodid;
+
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
+ if (!lockMethodTable)
+ elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+
+ masterLock = lockMethodTable->masterLock;
+
+ LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+ /*
+ * Find or create a lock with this tag.
+ */
+ lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+ (void *) locktag,
+ HASH_ENTER_NULL, &found);
+ if (!lock)
+ {
+ LWLockRelease(masterLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You may need to increase max_locks_per_transaction.")));
+ }
+
+ /*
+ * If it's a new lock object, initialize it
+ */
+ if (!found)
+ {
+ lock->grantMask = 0;
+ lock->waitMask = 0;
+ SHMQueueInit(&(lock->procLocks));
+ ProcQueueInit(&(lock->waitProcs));
+ lock->nRequested = 0;
+ lock->nGranted = 0;
+ MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
+ MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
+ LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
+ }
+ else
+ {
+ LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
+ Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+ Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
+ }
+
+ /*
+ * Create the hash key for the proclock table.
+ */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
+ proclocktag.lock = MAKE_OFFSET(lock);
+ proclocktag.proc = MAKE_OFFSET(proc);
+
+ /*
+ * Find or create a proclock entry with this tag
+ */
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &proclocktag,
+ HASH_ENTER_NULL, &found);
+ if (!proclock)
+ {
+ /* Oops, not enough shmem for the proclock */
+ if (lock->nRequested == 0)
+ {
+ /*
+ * There are no other requestors of this lock, so garbage-collect
+ * the lock object. We *must* do this to avoid a permanent leak
+ * of shared memory, because there won't be anything to cause
+ * anyone to release the lock object later.
+ */
+ Assert(SHMQueueEmpty(&(lock->procLocks)));
+ if (!hash_search(LockMethodLockHash[lockmethodid],
+ (void *) &(lock->tag),
+ HASH_REMOVE, NULL))
+ elog(PANIC, "lock table corrupted");
+ }
+ LWLockRelease(masterLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You may need to increase max_locks_per_transaction.")));
+ }
+
+ /*
+ * If new, initialize the new entry
+ */
+ if (!found)
+ {
+ proclock->holdMask = 0;
+ proclock->releaseMask = 0;
+ /* Add proclock to appropriate lists */
+ SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
+ SHMQueueInsertBefore(&proc->procLocks, &proclock->procLink);
+ PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
+ }
+ else
+ {
+ PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
+ Assert((proclock->holdMask & ~lock->grantMask) == 0);
+ }
+
+ /*
+ * lock->nRequested and lock->requested[] count the total number of
+ * requests, whether granted or waiting, so increment those
+ * immediately.
+ */
+ lock->nRequested++;
+ lock->requested[lockmode]++;
+ Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
+
+ /*
+ * We shouldn't already hold the desired lock. NOTE(review): the ERROR below is raised with masterLock still held -- presumably error cleanup releases it; confirm.
+ */
+ if (proclock->holdMask & LOCKBIT_ON(lockmode))
+ elog(ERROR, "lock %s on object %u/%u/%u is already held",
+ lock_mode_names[lockmode],
+ lock->tag.locktag_field1, lock->tag.locktag_field2,
+ lock->tag.locktag_field3);
+
+ /*
+ * We ignore any possible conflicts and just grant ourselves the lock.
+ */
+ GrantLock(lock, proclock, lockmode);
+
+ LWLockRelease(masterLock);
+ }
+
+ /*
+ * 2PC processing routine for COMMIT PREPARED case.
+ * (lock_twophase_postabort reuses this routine for ROLLBACK PREPARED.)
+ * Find and release the lock indicated by the 2PC record; info is unused.
+ */
+ void
+ lock_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+ {
+ TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
+ PGPROC *proc = TwoPhaseGetDummyProc(xid);
+ LOCKTAG *locktag;
+ LOCKMODE lockmode;
+ LOCKMETHODID lockmethodid;
+ PROCLOCKTAG proclocktag;
+ LOCK *lock;
+ PROCLOCK *proclock;
+ LWLockId masterLock;
+ LockMethod lockMethodTable;
+ bool wakeupNeeded;
+
+ Assert(len == sizeof(TwoPhaseLockRecord));
+ locktag = &rec->locktag;
+ lockmode = rec->lockmode;
+ lockmethodid = locktag->locktag_lockmethodid;
+
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
+ if (!lockMethodTable)
+ elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+
+ masterLock = lockMethodTable->masterLock;
+
+ LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+ /*
+ * Re-find the lock object (it had better be there).
+ */
+ lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+ (void *) locktag,
+ HASH_FIND, NULL);
+ if (!lock)
+ elog(PANIC, "failed to re-find shared lock object");
+
+ /*
+ * Re-find the proclock object (ditto).
+ */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
+ proclocktag.lock = MAKE_OFFSET(lock);
+ proclocktag.proc = MAKE_OFFSET(proc);
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &proclocktag,
+ HASH_FIND, NULL);
+ if (!proclock)
+ elog(PANIC, "failed to re-find shared proclock object");
+
+ /*
+ * Double-check that we are actually holding a lock of the type we
+ * want to release.
+ */
+ if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
+ {
+ PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
+ LWLockRelease(masterLock);
+ elog(WARNING, "you don't own a lock of type %s",
+ lock_mode_names[lockmode]);
+ return; /* nothing to release; masterLock was already dropped above */
+ }
+
+ /*
+ * Do the releasing. CleanUpLock will waken any now-wakable waiters.
+ */
+ wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
+
+ CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
+
+ LWLockRelease(masterLock);
+ }
+
+ /*
+ * 2PC processing routine for ROLLBACK PREPARED case.
+ *
+ * This is actually just the same as the COMMIT case: all arguments are passed unchanged to lock_twophase_postcommit.
+ */
+ void
+ lock_twophase_postabort(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+ {
+ lock_twophase_postcommit(xid, info, recdata, len);
+ }
*** src/backend/storage/lmgr/proc.c.orig Tue Jun 14 18:15:32 2005
--- src/backend/storage/lmgr/proc.c Thu Jun 16 15:29:18 2005
***************
*** 92,104 ****
* Report shared-memory space needed by InitProcGlobal.
*/
int
! ProcGlobalShmemSize(int maxBackends)
{
int size = 0;
size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
size += MAXALIGN(NUM_DUMMY_PROCS * sizeof(PGPROC)); /* DummyProcs */
! size += MAXALIGN(maxBackends * sizeof(PGPROC)); /* MyProcs */
size += MAXALIGN(sizeof(slock_t)); /* ProcStructLock */
return size;
--- 92,104 ----
* Report shared-memory space needed by InitProcGlobal.
*/
int
! ProcGlobalShmemSize(void)
{
int size = 0;
size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
size += MAXALIGN(NUM_DUMMY_PROCS * sizeof(PGPROC)); /* DummyProcs */
! size += MAXALIGN(MaxBackends * sizeof(PGPROC)); /* MyProcs */
size += MAXALIGN(sizeof(slock_t)); /* ProcStructLock */
return size;
***************
*** 108,117 ****
* Report number of semaphores needed by InitProcGlobal.
*/
int
! ProcGlobalSemas(int maxBackends)
{
/* We need a sema per backend, plus one for each dummy process. */
! return maxBackends + NUM_DUMMY_PROCS;
}
/*
--- 108,117 ----
* Report number of semaphores needed by InitProcGlobal.
*/
int
! ProcGlobalSemas(void)
{
/* We need a sema per backend, plus one for each dummy process. */
! return MaxBackends + NUM_DUMMY_PROCS;
}
/*
***************
*** 134,140 ****
* postmaster, not in backends.
*/
void
! InitProcGlobal(int maxBackends)
{
bool foundProcGlobal,
foundDummy;
--- 134,140 ----
* postmaster, not in backends.
*/
void
! InitProcGlobal(void)
{
bool foundProcGlobal,
foundDummy;
***************
*** 170,182 ****
* Pre-create the PGPROC structures and create a semaphore for
* each.
*/
! procs = (PGPROC *) ShmemAlloc(maxBackends * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
! MemSet(procs, 0, maxBackends * sizeof(PGPROC));
! for (i = 0; i < maxBackends; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
--- 170,182 ----
* Pre-create the PGPROC structures and create a semaphore for
* each.
*/
! procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
! MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
! for (i = 0; i < MaxBackends; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
***************
*** 254,260 ****
MyProc->xmin = InvalidTransactionId;
MyProc->pid = MyProcPid;
MyProc->databaseId = MyDatabaseId;
- MyProc->logRec.xrecoff = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
--- 254,259 ----
***************
*** 265,271 ****
/*
* Add our PGPROC to the PGPROC array in shared memory.
*/
! ProcArrayAddMyself();
/*
* Arrange to clean up at backend exit.
--- 264,270 ----
/*
* Add our PGPROC to the PGPROC array in shared memory.
*/
! ProcArrayAdd(MyProc);
/*
* Arrange to clean up at backend exit.
***************
*** 332,338 ****
MyProc->xid = InvalidTransactionId;
MyProc->xmin = InvalidTransactionId;
MyProc->databaseId = MyDatabaseId;
- MyProc->logRec.xrecoff = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
--- 331,336 ----
***************
*** 353,358 ****
--- 351,385 ----
}
/*
+ * Check whether there are at least N free PGPROC objects.
+ * Walks at most n entries of ProcGlobal->freeProcs while holding ProcStructLock.
+ * Note: this is designed on the assumption that N will generally be small.
+ */
+ bool
+ HaveNFreeProcs(int n)
+ {
+ SHMEM_OFFSET offset;
+ PGPROC *proc;
+ /* use volatile pointer to prevent code rearrangement */
+ volatile PROC_HDR *procglobal = ProcGlobal;
+
+ SpinLockAcquire(ProcStructLock);
+
+ offset = procglobal->freeProcs;
+
+ while (n > 0 && offset != INVALID_OFFSET) /* stop once n entries are seen or the list ends */
+ {
+ proc = (PGPROC *) MAKE_PTR(offset);
+ offset = proc->links.next;
+ n--;
+ }
+
+ SpinLockRelease(ProcStructLock);
+
+ return (n <= 0); /* true if n entries were found (or n was <= 0 on entry) */
+ }
+
+ /*
* Cancel any pending wait for lock, when aborting a transaction.
*
* Returns true if we had been waiting for a lock, else false.
***************
*** 478,484 ****
#endif
/* Remove our PGPROC from the PGPROC array in shared memory */
! ProcArrayRemoveMyself();
SpinLockAcquire(ProcStructLock);
--- 505,511 ----
#endif
/* Remove our PGPROC from the PGPROC array in shared memory */
! ProcArrayRemove(MyProc);
SpinLockAcquire(ProcStructLock);
*** src/backend/storage/smgr/smgr.c.orig Mon Jun 6 16:22:58 2005
--- src/backend/storage/smgr/smgr.c Fri Jun 17 16:31:23 2005
***************
*** 434,440 ****
* during transactional operations, since it can't be undone.
*
* If isRedo is true, it is okay for the underlying file to be gone
! * already. (In practice isRedo will always be true.)
*
* This also implies smgrclose() on the SMgrRelation object.
*/
--- 434,440 ----
* during transactional operations, since it can't be undone.
*
* If isRedo is true, it is okay for the underlying file to be gone
! * already.
*
* This also implies smgrclose() on the SMgrRelation object.
*/
***************
*** 676,681 ****
--- 676,705 ----
reln->smgr_rnode.dbNode,
reln->smgr_rnode.relNode)));
}
+
+
+ /*
+ * PostPrepare_smgr -- Clean up after a successful PREPARE
+ *
+ * What we have to do here is throw away the in-memory state about pending
+ * relation deletes. It's all been recorded in the 2PC state file and
+ * it's no longer smgr's job to worry about it.
+ */
+ void
+ PostPrepare_smgr(void)
+ {
+ PendingRelDelete *pending;
+ PendingRelDelete *next;
+
+ for (pending = pendingDeletes; pending != NULL; pending = next)
+ {
+ next = pending->next; /* fetch the link before the entry is freed */
+ pendingDeletes = next; /* unlink the entry from the list head */
+ /* must explicitly free the list entry */
+ pfree(pending);
+ }
+ }
+
/*
* smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
*** src/backend/tcop/postgres.c.orig Tue Jun 14 18:17:30 2005
--- src/backend/tcop/postgres.c Thu Jun 16 15:17:43 2005
***************
*** 930,935 ****
--- 930,936 ----
TransactionStmt *stmt = (TransactionStmt *) parsetree;
if (stmt->kind == TRANS_STMT_COMMIT ||
+ stmt->kind == TRANS_STMT_PREPARE ||
stmt->kind == TRANS_STMT_ROLLBACK ||
stmt->kind == TRANS_STMT_ROLLBACK_TO)
allowit = true;
***************
*** 1261,1266 ****
--- 1262,1268 ----
TransactionStmt *stmt = (TransactionStmt *) parsetree;
if (stmt->kind == TRANS_STMT_COMMIT ||
+ stmt->kind == TRANS_STMT_PREPARE ||
stmt->kind == TRANS_STMT_ROLLBACK ||
stmt->kind == TRANS_STMT_ROLLBACK_TO)
allowit = true;
***************
*** 1751,1756 ****
--- 1753,1759 ----
is_trans_stmt = true;
if (stmt->kind == TRANS_STMT_COMMIT ||
+ stmt->kind == TRANS_STMT_PREPARE ||
stmt->kind == TRANS_STMT_ROLLBACK ||
stmt->kind == TRANS_STMT_ROLLBACK_TO)
is_trans_exit = true;
*** src/backend/tcop/utility.c.orig Thu Apr 28 17:47:15 2005
--- src/backend/tcop/utility.c Fri Jun 17 12:38:26 2005
***************
*** 17,22 ****
--- 17,23 ----
#include "postgres.h"
#include "access/heapam.h"
+ #include "access/twophase.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_shadow.h"
***************
*** 383,393 ****
if (strcmp(item->defname, "transaction_isolation") == 0)
SetPGVariable("transaction_isolation",
list_make1(item->arg),
! false);
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("transaction_read_only",
list_make1(item->arg),
! false);
}
}
break;
--- 384,394 ----
if (strcmp(item->defname, "transaction_isolation") == 0)
SetPGVariable("transaction_isolation",
list_make1(item->arg),
! true);
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("transaction_read_only",
list_make1(item->arg),
! true);
}
}
break;
***************
*** 401,406 ****
--- 402,426 ----
}
break;
+ case TRANS_STMT_PREPARE:
+ if (!PrepareTransactionBlock(stmt->gid))
+ {
+ /* report unsuccessful commit in completionTag */
+ if (completionTag)
+ strcpy(completionTag, "ROLLBACK");
+ }
+ break;
+
+ case TRANS_STMT_COMMIT_PREPARED:
+ PreventTransactionChain(stmt, "COMMIT PREPARED");
+ FinishPreparedTransaction(stmt->gid, true);
+ break;
+
+ case TRANS_STMT_ROLLBACK_PREPARED:
+ PreventTransactionChain(stmt, "ROLLBACK PREPARED");
+ FinishPreparedTransaction(stmt->gid, false);
+ break;
+
case TRANS_STMT_ROLLBACK:
UserAbortTransactionBlock();
break;
***************
*** 1213,1218 ****
--- 1233,1250 ----
case TRANS_STMT_RELEASE:
tag = "RELEASE";
+ break;
+
+ case TRANS_STMT_PREPARE:
+ tag = "PREPARE TRANSACTION";
+ break;
+
+ case TRANS_STMT_COMMIT_PREPARED:
+ tag = "COMMIT PREPARED";
+ break;
+
+ case TRANS_STMT_ROLLBACK_PREPARED:
+ tag = "ROLLBACK PREPARED";
break;
default:
*** src/backend/utils/cache/inval.c.orig Wed Apr 13 21:38:19 2005
--- src/backend/utils/cache/inval.c Thu Jun 16 16:16:20 2005
***************
*** 86,91 ****
--- 86,92 ----
*/
#include "postgres.h"
+ #include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "miscadmin.h"
***************
*** 171,176 ****
--- 172,184 ----
static int cache_callback_count = 0;
+ /* info values for 2PC callback */
+ #define TWOPHASE_INFO_MSG 0 /* SharedInvalidationMessage */
+ #define TWOPHASE_INFO_FILE_BEFORE 1 /* relcache file inval */
+ #define TWOPHASE_INFO_FILE_AFTER 2 /* relcache file inval */
+
+ static void PersistInvalidationMessage(SharedInvalidationMessage *msg);
+
/* ----------------------------------------------------------------
* Invalidation list support functions
***************
*** 637,642 ****
--- 645,700 ----
}
/*
+ * AtPrepare_Inval
+ * Save the inval lists state at 2PC transaction prepare.
+ *
+ * In this phase we just generate 2PC records for all the pending invalidation
+ * work (see inval_twophase_postcommit for how each record kind is processed).
+ */
+ void
+ AtPrepare_Inval(void)
+ {
+ /* Must be at top of stack */
+ Assert(transInvalInfo != NULL && transInvalInfo->parent == NULL);
+
+ /*
+ * Relcache init file invalidation requires processing both before
+ * and after we send the SI messages.
+ */
+ if (transInvalInfo->RelcacheInitFileInval)
+ RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_BEFORE,
+ NULL, 0); /* marker record: no payload */
+
+ AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
+ &transInvalInfo->CurrentCmdInvalidMsgs); /* presumably merges the current-command list into the prior one -- confirm */
+
+ ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
+ PersistInvalidationMessage); /* callback registers a TWOPHASE_INFO_MSG record */
+
+ if (transInvalInfo->RelcacheInitFileInval)
+ RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_AFTER,
+ NULL, 0); /* marker record: no payload */
+ }
+
+ /*
+ * PostPrepare_Inval
+ * Clean up after successful PREPARE.
+ *
+ * Here, we want to act as though the transaction aborted, so that we will
+ * undo any syscache changes it made, thereby bringing us into sync with the
+ * outside world, which doesn't believe the transaction committed yet.
+ *
+ * If the prepared transaction is later aborted, there is nothing more to
+ * do; if it commits, we will receive the consequent inval messages just
+ * like everyone else.
+ */
+ void
+ PostPrepare_Inval(void)
+ {
+ AtEOXact_Inval(false); /* false: presumably selects the abort path described above -- confirm */
+ }
+
+ /*
* AtSubStart_Inval
* Initialize inval lists at start of a subtransaction.
*/
***************
*** 653,658 ****
--- 711,757 ----
myInfo->my_level = GetCurrentTransactionNestLevel();
transInvalInfo = myInfo;
}
+
+ /*
+ * PersistInvalidationMessage
+ * Register an invalidation message as a TWOPHASE_INFO_MSG 2PC record.
+ */
+ static void
+ PersistInvalidationMessage(SharedInvalidationMessage *msg)
+ {
+ RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_MSG,
+ msg, sizeof(SharedInvalidationMessage));
+ }
+
+ /*
+ * inval_twophase_postcommit
+ * Process one inval 2PC record; info selects SI message vs. relcache init file inval (xid is unused).
+ */
+ void
+ inval_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+ {
+ SharedInvalidationMessage *msg;
+
+ switch (info)
+ {
+ case TWOPHASE_INFO_MSG:
+ msg = (SharedInvalidationMessage *) recdata;
+ Assert(len == sizeof(SharedInvalidationMessage));
+ SendSharedInvalidMessage(msg);
+ break;
+ case TWOPHASE_INFO_FILE_BEFORE:
+ RelationCacheInitFileInvalidate(true);
+ break;
+ case TWOPHASE_INFO_FILE_AFTER:
+ RelationCacheInitFileInvalidate(false);
+ break;
+ default:
+ Assert(false); /* unknown info value */
+ break;
+ }
+ }
+
/*
* AtEOXact_Inval
*** src/backend/utils/init/flatfiles.c.orig Mon Jun 6 13:01:24 2005
--- src/backend/utils/init/flatfiles.c Fri Jun 17 16:11:56 2005
***************
*** 32,37 ****
--- 32,38 ----
#include
#include "access/heapam.h"
+ #include "access/twophase_rmgr.h"
#include "catalog/pg_database.h"
#include "catalog/pg_group.h"
#include "catalog/pg_namespace.h"
***************
*** 48,57 ****
--- 49,64 ----
#include "utils/syscache.h"
+ /* Actual names of the flat files (within $PGDATA/global/) */
#define DATABASE_FLAT_FILE "pg_database"
#define GROUP_FLAT_FILE "pg_group"
#define USER_FLAT_FILE "pg_pwd"
+ /* Info bits in a flatfiles 2PC record */
+ #define FF_BIT_DATABASE 1
+ #define FF_BIT_GROUP 2
+ #define FF_BIT_USER 4
+
/*
* The need-to-update-files flags are SubTransactionIds that show
***************
*** 757,762 ****
--- 764,806 ----
SendPostmasterSignal(PMSIGNAL_PASSWORD_CHANGE);
}
+
+ /*
+ * This routine is called during transaction prepare.
+ *
+ * Record which files need to be refreshed if this transaction later
+ * commits.
+ *
+ * Note: it's OK to clear the flags immediately, since if the PREPARE fails
+ * further on, we'd only reset the flags anyway. So there's no need for a
+ * separate PostPrepare call.
+ */
+ void
+ AtPrepare_UpdateFlatFiles(void)
+ {
+ uint16 info = 0; /* OR of FF_BIT_* flags for the files needing refresh */
+
+ if (database_file_update_subid != InvalidSubTransactionId)
+ {
+ database_file_update_subid = InvalidSubTransactionId;
+ info |= FF_BIT_DATABASE;
+ }
+ if (group_file_update_subid != InvalidSubTransactionId)
+ {
+ group_file_update_subid = InvalidSubTransactionId;
+ info |= FF_BIT_GROUP;
+ }
+ if (user_file_update_subid != InvalidSubTransactionId)
+ {
+ user_file_update_subid = InvalidSubTransactionId;
+ info |= FF_BIT_USER;
+ }
+ if (info != 0) /* no record at all when no flag was set */
+ RegisterTwoPhaseRecord(TWOPHASE_RM_FLATFILES_ID, info,
+ NULL, 0);
+ }
+
+
/*
* AtEOSubXact_UpdateFlatFiles
*
***************
*** 830,833 ****
--- 874,902 ----
}
return PointerGetDatum(NULL);
+ }
+
+
+ /*
+ * 2PC processing routine for COMMIT PREPARED case.
+ * info holds FF_BIT_* flags; xid, recdata and len are not used here.
+ * (We don't have to do anything for ROLLBACK PREPARED.)
+ */
+ void
+ flatfile_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len)
+ {
+ /*
+ * Set flags to do the needed file updates at the end of my own
+ * current transaction. (XXX this has some issues if my own
+ * transaction later rolls back, or if there is any significant
+ * delay before I commit. OK for now because we disallow
+ * COMMIT PREPARED inside a transaction block.)
+ */
+ if (info & FF_BIT_DATABASE)
+ database_file_update_needed();
+ if (info & FF_BIT_GROUP)
+ group_file_update_needed();
+ if (info & FF_BIT_USER)
+ user_file_update_needed();
}
*** src/backend/utils/init/postinit.c.orig Thu May 19 17:35:47 2005
--- src/backend/utils/init/postinit.c Thu Jun 16 15:17:31 2005
***************
*** 232,238 ****
* We're running a postgres bootstrap process or a standalone
* backend. Create private "shmem" and semaphores.
*/
! CreateSharedMemoryAndSemaphores(true, MaxBackends, 0);
}
}
--- 232,238 ----
* We're running a postgres bootstrap process or a standalone
* backend. Create private "shmem" and semaphores.
*/
! CreateSharedMemoryAndSemaphores(true, 0);
}
}
***************
*** 456,462 ****
*/
if (!am_superuser &&
ReservedBackends > 0 &&
! CountEmptyBackendSlots() < ReservedBackends)
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("connection limit exceeded for non-superusers")));
--- 456,462 ----
*/
if (!am_superuser &&
ReservedBackends > 0 &&
! !HaveNFreeProcs(ReservedBackends))
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("connection limit exceeded for non-superusers")));
*** src/backend/utils/misc/guc.c.orig Tue Jun 14 18:17:31 2005
--- src/backend/utils/misc/guc.c Fri Jun 17 17:21:52 2005
***************
*** 25,30 ****
--- 25,31 ----
#include "utils/guc.h"
#include "utils/guc_tables.h"
+ #include "access/twophase.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
***************
*** 1102,1107 ****
--- 1103,1117 ----
1000, 25, INT_MAX, NULL, NULL
},
+ {
+ {"max_prepared_transactions", PGC_POSTMASTER, RESOURCES,
+ gettext_noop("Sets the maximum number of simultaneously prepared transactions."),
+ NULL
+ },
+ &max_prepared_xacts,
+ 50, 0, 10000, NULL, NULL
+ },
+
#ifdef LOCK_DEBUG
{
{"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS,
*** src/backend/utils/misc/postgresql.conf.sample.orig Fri Jun 10 16:38:56 2005
--- src/backend/utils/misc/postgresql.conf.sample Fri Jun 17 17:21:53 2005
***************
*** 79,84 ****
--- 79,85 ----
#shared_buffers = 1000 # min 16, at least max_connections*2, 8KB each
#temp_buffers = 1000 # min 100, 8KB each
+ #max_prepared_transactions = 50 # 0-10000
#work_mem = 1024 # min 64, size in KB
#maintenance_work_mem = 16384 # min 1024, size in KB
#max_stack_depth = 2048 # min 100, size in KB
*** src/backend/utils/mmgr/portalmem.c.orig Sun May 29 00:23:06 2005
--- src/backend/utils/mmgr/portalmem.c Fri Jun 17 11:08:40 2005
***************
*** 467,472 ****
--- 467,514 ----
}
/*
+ * Pre-prepare processing for portals.
+ *
+ * Currently we refuse PREPARE if the transaction created any holdable
+ * cursors, since it's quite unclear what to do with one. However, this
+ * has the same API as CommitHoldablePortals and is invoked in the same
+ * way by xact.c, so that we can easily do something reasonable if anyone
+ * comes up with something reasonable to do.
+ *
+ * Returns TRUE if any holdable cursors were processed, FALSE if not.
+ */
+ bool
+ PrepareHoldablePortals(void)
+ {
+ bool result = false; /* never set to true below: a matching portal raises ERROR instead */
+ HASH_SEQ_STATUS status;
+ PortalHashEnt *hentry;
+
+ hash_seq_init(&status, PortalHashTable);
+
+ while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
+ {
+ Portal portal = hentry->portal;
+
+ /* Is it a holdable portal created in the current xact? */
+ if ((portal->cursorOptions & CURSOR_OPT_HOLD) &&
+ portal->createSubid != InvalidSubTransactionId &&
+ portal->status == PORTAL_READY)
+ {
+ /*
+ * We are exiting the transaction that created a holdable
+ * cursor. Can't do PREPARE.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has created a cursor WITH HOLD")));
+ }
+ }
+
+ return result; /* always false as currently written */
+ }
+
+ /*
* Pre-commit processing for portals.
*
* Remove all non-holdable portals created in this transaction.
*** src/bin/initdb/initdb.c.orig Sat Apr 30 11:56:59 2005
--- src/bin/initdb/initdb.c Thu Jun 16 15:17:18 2005
***************
*** 2124,2129 ****
--- 2124,2130 ----
"pg_xlog/archive_status",
"pg_clog",
"pg_subtrans",
+ "pg_twophase",
"pg_multixact/members",
"pg_multixact/offsets",
"base",
*** src/bin/psql/common.c.orig Tue Jun 14 11:11:29 2005
--- src/bin/psql/common.c Fri Jun 17 12:44:27 2005
***************
*** 1216,1221 ****
--- 1216,1236 ----
return true;
if (wordlen == 8 && pg_strncasecmp(query, "rollback", 8) == 0)
return true;
+ if (wordlen == 7 && pg_strncasecmp(query, "prepare", 7) == 0)
+ {
+ /* PREPARE TRANSACTION is a TC command, PREPARE foo is not */
+ query += wordlen;
+
+ query = skip_white_space(query);
+
+ wordlen = 0;
+ while (isalpha((unsigned char) query[wordlen]))
+ wordlen += PQmblen(&query[wordlen], pset.encoding);
+
+ if (wordlen == 11 && pg_strncasecmp(query, "transaction", 11) == 0)
+ return true;
+ return false;
+ }
/*
* Commands not allowed within transactions. The statements checked
*** src/include/access/subtrans.h.orig Fri Dec 31 17:46:40 2004
--- src/include/access/subtrans.h Fri Jun 17 14:00:36 2005
***************
*** 18,24 ****
extern int SUBTRANSShmemSize(void);
extern void SUBTRANSShmemInit(void);
extern void BootStrapSUBTRANS(void);
! extern void StartupSUBTRANS(void);
extern void ShutdownSUBTRANS(void);
extern void CheckPointSUBTRANS(void);
extern void ExtendSUBTRANS(TransactionId newestXact);
--- 18,24 ----
extern int SUBTRANSShmemSize(void);
extern void SUBTRANSShmemInit(void);
extern void BootStrapSUBTRANS(void);
! extern void StartupSUBTRANS(TransactionId oldestActiveXID);
extern void ShutdownSUBTRANS(void);
extern void CheckPointSUBTRANS(void);
extern void ExtendSUBTRANS(TransactionId newestXact);
*** src/include/access/twophase.h.orig Thu Jun 16 15:16:55 2005
--- src/include/access/twophase.h Fri Jun 17 14:00:37 2005
***************
*** 0 ****
--- 1,49 ----
+ /*-------------------------------------------------------------------------
+ *
+ * twophase.h
+ * Two-phase-commit related declarations.
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef TWOPHASE_H
+ #define TWOPHASE_H
+
+ #include "storage/lock.h"
+
+
+ /*
+ * GlobalTransactionData is defined in twophase.c; other places have no
+ * business knowing the internal definition.
+ */
+ typedef struct GlobalTransactionData *GlobalTransaction;
+
+ /* GUC variable */
+ extern int max_prepared_xacts;
+
+ extern int TwoPhaseShmemSize(void);
+ extern void TwoPhaseShmemInit(void);
+
+ extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid);
+
+ extern GlobalTransaction MarkAsPreparing(TransactionId xid, Oid databaseid,
+ char *gid, AclId owner);
+ extern void MarkAsPrepared(GlobalTransaction gxact);
+
+ extern void StartPrepare(GlobalTransaction gxact);
+ extern void EndPrepare(GlobalTransaction gxact);
+
+ extern TransactionId PrescanPreparedTransactions(void);
+ extern void RecoverPreparedTransactions(void);
+
+ extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
+ extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
+
+ extern void FinishPreparedTransaction(char *gid, bool isCommit);
+
+ #endif /* TWOPHASE_H */
*** src/include/access/twophase_rmgr.h.orig Thu Jun 16 15:16:55 2005
--- src/include/access/twophase_rmgr.h Thu Jun 16 16:01:11 2005
***************
*** 0 ****
--- 1,39 ----
+ /*-------------------------------------------------------------------------
+ *
+ * twophase_rmgr.h
+ * Two-phase-commit resource managers definition
+ *
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef TWOPHASE_RMGR_H
+ #define TWOPHASE_RMGR_H
+
+ typedef void (*TwoPhaseCallback) (TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
+ typedef uint8 TwoPhaseRmgrId;
+
+ /*
+ * Built-in resource managers
+ */
+ #define TWOPHASE_RM_END_ID 0
+ #define TWOPHASE_RM_LOCK_ID 1
+ #define TWOPHASE_RM_INVAL_ID 2
+ #define TWOPHASE_RM_FLATFILES_ID 3
+ #define TWOPHASE_RM_NOTIFY_ID 4
+ #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_NOTIFY_ID
+
+ extern const TwoPhaseCallback twophase_recover_callbacks[];
+ extern const TwoPhaseCallback twophase_postcommit_callbacks[];
+ extern const TwoPhaseCallback twophase_postabort_callbacks[];
+
+
+ extern void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
+ const void *data, uint32 len);
+
+ #endif /* TWOPHASE_RMGR_H */
*** src/include/access/xact.h.orig Mon Jun 6 13:01:24 2005
--- src/include/access/xact.h Fri Jun 17 12:40:27 2005
***************
*** 47,53 ****
typedef enum
{
XACT_EVENT_COMMIT,
! XACT_EVENT_ABORT
} XactEvent;
typedef void (*XactCallback) (XactEvent event, void *arg);
--- 47,54 ----
typedef enum
{
XACT_EVENT_COMMIT,
! XACT_EVENT_ABORT,
! XACT_EVENT_PREPARE
} XactEvent;
typedef void (*XactCallback) (XactEvent event, void *arg);
***************
*** 72,79 ****
* XLOG allows to store some information in high 4 bits of log
* record xl_info field
*/
! #define XLOG_XACT_COMMIT 0x00
! #define XLOG_XACT_ABORT 0x20
typedef struct xl_xact_commit
{
--- 73,83 ----
* XLOG allows to store some information in high 4 bits of log
* record xl_info field
*/
! #define XLOG_XACT_COMMIT 0x00
! #define XLOG_XACT_PREPARE 0x10
! #define XLOG_XACT_ABORT 0x20
! #define XLOG_XACT_COMMIT_PREPARED 0x30
! #define XLOG_XACT_ABORT_PREPARED 0x40
typedef struct xl_xact_commit
{
***************
*** 99,104 ****
--- 103,133 ----
#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
+ /*
+ * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
+ * except that we have to store the XID of the prepared transaction explicitly
+ * --- the XID in the record header will be for the transaction doing the
+ * COMMIT PREPARED or ABORT PREPARED command.
+ */
+
+ typedef struct xl_xact_commit_prepared
+ {
+ TransactionId xid; /* XID of prepared xact */
+ xl_xact_commit crec; /* COMMIT record */
+ /* MORE DATA FOLLOWS AT END OF STRUCT */
+ } xl_xact_commit_prepared;
+
+ #define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
+
+ typedef struct xl_xact_abort_prepared
+ {
+ TransactionId xid; /* XID of prepared xact */
+ xl_xact_abort arec; /* ABORT record */
+ /* MORE DATA FOLLOWS AT END OF STRUCT */
+ } xl_xact_abort_prepared;
+
+ #define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
+
/* ----------------
* extern definitions
***************
*** 121,126 ****
--- 150,156 ----
extern void AbortCurrentTransaction(void);
extern void BeginTransactionBlock(void);
extern bool EndTransactionBlock(void);
+ extern bool PrepareTransactionBlock(char *gid);
extern void UserAbortTransactionBlock(void);
extern void ReleaseSavepoint(List *options);
extern void DefineSavepoint(char *name);
*** src/include/catalog/catversion.h.orig Fri Jun 17 14:54:17 2005
--- src/include/catalog/catversion.h Fri Jun 17 15:26:00 2005
***************
*** 53,58 ****
*/
/* yyyymmddN */
! #define CATALOG_VERSION_NO 200506151
#endif
--- 53,58 ----
*/
/* yyyymmddN */
! #define CATALOG_VERSION_NO 200506171
#endif
*** src/include/catalog/pg_proc.h.orig Tue Jun 14 18:17:37 2005
--- src/include/catalog/pg_proc.h Thu Jun 16 15:16:32 2005
***************
*** 3005,3010 ****
--- 3005,3012 ----
DESCR("SHOW ALL as a function");
DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 f f t t v 0 2249 "" _null_ _null_ _null_ pg_lock_status - _null_ ));
DESCR("view system lock information");
+ DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 f f t t v 0 2249 "" _null_ _null_ _null_ pg_prepared_xact - _null_ ));
+ DESCR("view two-phase transactions");
DATA(insert OID = 2079 ( pg_table_is_visible PGNSP PGUID 12 f f t f s 1 16 "26" _null_ _null_ _null_ pg_table_is_visible - _null_ ));
DESCR("is table visible in search path?");
*** src/include/commands/async.h.orig Fri Dec 31 17:46:46 2004
--- src/include/commands/async.h Thu Jun 16 16:01:05 2005
***************
*** 26,31 ****
--- 26,32 ----
extern void AtSubStart_Notify(void);
extern void AtSubCommit_Notify(void);
extern void AtSubAbort_Notify(void);
+ extern void AtPrepare_Notify(void);
/* signal handler for inbound notifies (SIGUSR2) */
extern void NotifyInterruptHandler(SIGNAL_ARGS);
***************
*** 37,41 ****
--- 38,45 ----
*/
extern void EnableNotifyInterrupt(void);
extern bool DisableNotifyInterrupt(void);
+
+ extern void notify_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
#endif /* ASYNC_H */
*** src/include/nodes/parsenodes.h.orig Sun Jun 5 18:32:57 2005
--- src/include/nodes/parsenodes.h Thu Jun 16 15:16:23 2005
***************
*** 1556,1562 ****
TRANS_STMT_ROLLBACK,
TRANS_STMT_SAVEPOINT,
TRANS_STMT_RELEASE,
! TRANS_STMT_ROLLBACK_TO
} TransactionStmtKind;
typedef struct TransactionStmt
--- 1556,1565 ----
TRANS_STMT_ROLLBACK,
TRANS_STMT_SAVEPOINT,
TRANS_STMT_RELEASE,
! TRANS_STMT_ROLLBACK_TO,
! TRANS_STMT_PREPARE,
! TRANS_STMT_COMMIT_PREPARED,
! TRANS_STMT_ROLLBACK_PREPARED
} TransactionStmtKind;
typedef struct TransactionStmt
***************
*** 1564,1569 ****
--- 1567,1573 ----
NodeTag type;
TransactionStmtKind kind; /* see above */
List *options; /* for BEGIN/START and savepoint commands */
+ char *gid; /* for two-phase-commit related commands */
} TransactionStmt;
/* ----------------------
*** src/include/storage/ipc.h.orig Fri Dec 31 17:47:03 2004
--- src/include/storage/ipc.h Thu Jun 16 15:16:11 2005
***************
*** 29,36 ****
extern void on_exit_reset(void);
/* ipci.c */
! extern void CreateSharedMemoryAndSemaphores(bool makePrivate,
! int maxBackends,
! int port);
#endif /* IPC_H */
--- 29,34 ----
extern void on_exit_reset(void);
/* ipci.c */
! extern void CreateSharedMemoryAndSemaphores(bool makePrivate, int port);
#endif /* IPC_H */
*** src/include/storage/lmgr.h.orig Tue Jun 14 18:15:33 2005
--- src/include/storage/lmgr.h Thu Jun 16 15:16:11 2005
***************
*** 41,47 ****
* so increase that if you want to add more modes.
*/
! extern void InitLockTable(int maxBackends);
extern void RelationInitLockInfo(Relation relation);
/* Lock a relation */
--- 41,47 ----
* so increase that if you want to add more modes.
*/
! extern void InitLockTable(void);
extern void RelationInitLockInfo(Relation relation);
/* Lock a relation */
*** src/include/storage/lock.h.orig Tue Jun 14 18:15:33 2005
--- src/include/storage/lock.h Thu Jun 16 16:01:00 2005
***************
*** 370,376 ****
extern LockMethod GetLocksMethodTable(LOCK *lock);
extern LOCKMETHODID LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP,
! int numModes, int maxBackends);
extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid);
extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid,
LOCKTAG *locktag,
--- 370,376 ----
extern LockMethod GetLocksMethodTable(LOCK *lock);
extern LOCKMETHODID LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP,
! int numModes);
extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid);
extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid,
LOCKTAG *locktag,
***************
*** 383,395 ****
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
extern void LockReleaseCurrentOwner(void);
extern void LockReassignCurrentOwner(void);
extern int LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock, PROCLOCK *proclock, PGPROC *proc);
extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
extern void GrantAwaitedLock(void);
extern void RemoveFromWaitQueue(PGPROC *proc);
! extern int LockShmemSize(int maxBackends);
extern bool DeadLockCheck(PGPROC *proc);
extern void DeadLockReport(void);
extern void RememberSimpleDeadLock(PGPROC *proc1,
--- 383,397 ----
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
extern void LockReleaseCurrentOwner(void);
extern void LockReassignCurrentOwner(void);
+ extern void AtPrepare_Locks(void);
+ extern void PostPrepare_Locks(TransactionId xid);
extern int LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock, PROCLOCK *proclock, PGPROC *proc);
extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
extern void GrantAwaitedLock(void);
extern void RemoveFromWaitQueue(PGPROC *proc);
! extern int LockShmemSize(void);
extern bool DeadLockCheck(PGPROC *proc);
extern void DeadLockReport(void);
extern void RememberSimpleDeadLock(PGPROC *proc1,
***************
*** 400,407 ****
extern LockData *GetLockStatusData(void);
extern const char *GetLockmodeName(LOCKMODE mode);
#ifdef LOCK_DEBUG
! extern void DumpLocks(void);
extern void DumpAllLocks(void);
#endif
--- 402,416 ----
extern LockData *GetLockStatusData(void);
extern const char *GetLockmodeName(LOCKMODE mode);
+ extern void lock_twophase_recover(TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
+ extern void lock_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
+ extern void lock_twophase_postabort(TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
+
#ifdef LOCK_DEBUG
! extern void DumpLocks(PGPROC *proc);
extern void DumpAllLocks(void);
#endif
*** src/include/storage/lwlock.h.orig Thu May 19 17:35:47 2005
--- src/include/storage/lwlock.h Thu Jun 16 15:16:12 2005
***************
*** 46,51 ****
--- 46,52 ----
MultiXactMemberControlLock,
RelCacheInitLock,
BgWriterCommLock,
+ TwoPhaseStateLock,
NumFixedLWLocks, /* must be last except for
* MaxDynamicLWLock */
*** src/include/storage/proc.h.orig Thu May 19 17:35:47 2005
--- src/include/storage/proc.h Thu Jun 16 15:16:12 2005
***************
*** 46,51 ****
--- 46,58 ----
* links: list link for any list the PGPROC is in. When waiting for a lock,
* the PGPROC is linked into that lock's waitProcs queue. A recycled PGPROC
* is linked into ProcGlobal's freeProcs list.
+ *
+ * Note: twophase.c also sets up a dummy PGPROC struct for each currently
+ * prepared transaction. These PGPROCs appear in the ProcArray data structure
+ * so that the prepared transactions appear to be still running and are
+ * correctly shown as holding locks. A prepared transaction PGPROC can be
+ * distinguished from a real one at need by the fact that it has pid == 0.
+ * The semaphore and lock-related fields in a prepared-xact PGPROC are unused.
*/
struct PGPROC
{
***************
*** 62,77 ****
* were starting our xact: vacuum must not
* remove tuples deleted by xid >= xmin ! */
! int pid; /* This backend's process id */
Oid databaseId; /* OID of database this backend is using */
- /*
- * XLOG location of first XLOG record written by this backend's
- * current transaction. If backend is not in a transaction or hasn't
- * yet modified anything, logRec.xrecoff is zero.
- */
- XLogRecPtr logRec;
-
/* Info about LWLock the process is currently waiting for, if any. */
bool lwWaiting; /* true if waiting for an LW lock */
bool lwExclusive; /* true if waiting for exclusive access */
--- 69,77 ----
* were starting our xact: vacuum must not
* remove tuples deleted by xid >= xmin ! */
! int pid; /* This backend's process id, or 0 */
Oid databaseId; /* OID of database this backend is using */
/* Info about LWLock the process is currently waiting for, if any. */
bool lwWaiting; /* true if waiting for an LW lock */
bool lwExclusive; /* true if waiting for exclusive access */
***************
*** 120,130 ****
/*
* Function Prototypes
*/
! extern int ProcGlobalSemas(int maxBackends);
! extern int ProcGlobalShmemSize(int maxBackends);
! extern void InitProcGlobal(int maxBackends);
extern void InitProcess(void);
extern void InitDummyProcess(int proctype);
extern void ProcReleaseLocks(bool isCommit);
extern void ProcQueueInit(PROC_QUEUE *queue);
--- 120,131 ----
/*
* Function Prototypes
*/
! extern int ProcGlobalSemas(void);
! extern int ProcGlobalShmemSize(void);
! extern void InitProcGlobal(void);
extern void InitProcess(void);
extern void InitDummyProcess(int proctype);
+ extern bool HaveNFreeProcs(int n);
extern void ProcReleaseLocks(bool isCommit);
extern void ProcQueueInit(PROC_QUEUE *queue);
*** src/include/storage/procarray.h.orig Thu May 19 17:35:47 2005
--- src/include/storage/procarray.h Thu Jun 16 15:16:12 2005
***************
*** 14,34 ****
#ifndef PROCARRAY_H
#define PROCARRAY_H
! extern int ProcArrayShmemSize(int maxBackends);
! extern void CreateSharedProcArray(int maxBackends);
! extern void ProcArrayAddMyself(void);
! extern void ProcArrayRemoveMyself(void);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern TransactionId GetOldestXmin(bool allDbs);
! /* Use "struct PGPROC", not PGPROC, to avoid including proc.h here */
! extern struct PGPROC *BackendPidGetProc(int pid);
extern bool IsBackendPid(int pid);
extern bool DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself);
extern int CountActiveBackends(void);
- extern int CountEmptyBackendSlots(void);
extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, TransactionId *xids);
--- 14,36 ----
#ifndef PROCARRAY_H
#define PROCARRAY_H
! #include "storage/lock.h"
!
!
! extern int ProcArrayShmemSize(void);
! extern void CreateSharedProcArray(void);
! extern void ProcArrayAdd(PGPROC *proc);
! extern void ProcArrayRemove(PGPROC *proc);
extern bool TransactionIdIsInProgress(TransactionId xid);
+ extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(bool allDbs);
! extern PGPROC *BackendPidGetProc(int pid);
extern bool IsBackendPid(int pid);
extern bool DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself);
extern int CountActiveBackends(void);
extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, TransactionId *xids);
*** src/include/storage/smgr.h.orig Mon Jun 6 13:01:25 2005
--- src/include/storage/smgr.h Thu Jun 16 15:16:12 2005
***************
*** 79,84 ****
--- 79,85 ----
extern int smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
extern void AtSubCommit_smgr(void);
extern void AtSubAbort_smgr(void);
+ extern void PostPrepare_smgr(void);
extern void smgrcommit(void);
extern void smgrabort(void);
extern void smgrsync(void);
*** src/include/utils/builtins.h.orig Fri May 27 17:52:01 2005
--- src/include/utils/builtins.h Thu Jun 16 15:16:02 2005
***************
*** 825,830 ****
--- 825,833 ----
/* lockfuncs.c */
extern Datum pg_lock_status(PG_FUNCTION_ARGS);
+ /* access/transam/twophase.c */
+ extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
+
/* catalog/pg_conversion.c */
extern Datum pg_convert_using(PG_FUNCTION_ARGS);
*** src/include/utils/flatfiles.h.orig Wed May 11 12:14:04 2005
--- src/include/utils/flatfiles.h Thu Jun 16 16:00:53 2005
***************
*** 23,33 ****
--- 23,37 ----
extern void BuildFlatFiles(bool database_only);
+ extern void AtPrepare_UpdateFlatFiles(void);
extern void AtEOXact_UpdateFlatFiles(bool isCommit);
extern void AtEOSubXact_UpdateFlatFiles(bool isCommit,
SubTransactionId mySubid,
SubTransactionId parentSubid);
extern Datum flatfile_update_trigger(PG_FUNCTION_ARGS);
+
+ extern void flatfile_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
#endif /* FLATFILES_H */
*** src/include/utils/inval.h.orig Fri Dec 31 17:47:09 2004
--- src/include/utils/inval.h Thu Jun 16 16:00:54 2005
***************
*** 30,35 ****
--- 30,39 ----
extern void AtEOSubXact_Inval(bool isCommit);
+ extern void AtPrepare_Inval(void);
+
+ extern void PostPrepare_Inval(void);
+
extern void CommandEndInvalidationMessages(void);
extern void CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple);
***************
*** 46,50 ****
--- 50,57 ----
extern void CacheRegisterRelcacheCallback(CacheCallbackFunction func,
Datum arg);
+
+ extern void inval_twophase_postcommit(TransactionId xid, uint16 info,
+ void *recdata, uint32 len);
#endif /* INVAL_H */
*** src/include/utils/portal.h.orig Mon Apr 11 15:51:16 2005
--- src/include/utils/portal.h Fri Jun 17 11:08:32 2005
***************
*** 183,188 ****
--- 183,189 ----
/* Prototypes for functions in utils/mmgr/portalmem.c */
extern void EnablePortalManager(void);
extern bool CommitHoldablePortals(void);
+ extern bool PrepareHoldablePortals(void);
extern void AtCommit_Portals(void);
extern void AtAbort_Portals(void);
extern void AtCleanup_Portals(void);
*** src/test/regress/expected/prepared_xacts.out.orig Thu Jun 16 15:15:51 2005
--- src/test/regress/expected/prepared_xacts.out Thu Jun 16 18:36:35 2005
***************
*** 0 ****
--- 1,213 ----
+ --
+ -- PREPARED TRANSACTIONS (two-phase commit)
+ --
+ -- We can't readily test persistence of prepared xacts within the
+ -- regression script framework, unfortunately. Note that a crash
+ -- isn't really needed ... stopping and starting the postmaster would
+ -- be enough, but we can't even do that here.
+ -- create a simple table that we'll use in the tests
+ CREATE TABLE pxtest1 (foobar VARCHAR(10));
+ INSERT INTO pxtest1 VALUES ('aaa');
+ -- Test PREPARE TRANSACTION
+ BEGIN;
+ UPDATE pxtest1 SET foobar = 'bbb' WHERE foobar = 'aaa';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ bbb
+ (1 row)
+
+ PREPARE TRANSACTION 'foo1';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ (1 row)
+
+ -- Test pg_prepared_xacts system view
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ ------
+ foo1
+ (1 row)
+
+ -- Test ROLLBACK PREPARED
+ ROLLBACK PREPARED 'foo1';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ (1 row)
+
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ -----
+ (0 rows)
+
+ -- Test COMMIT PREPARED
+ BEGIN;
+ INSERT INTO pxtest1 VALUES ('ddd');
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ ddd
+ (2 rows)
+
+ PREPARE TRANSACTION 'foo2';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ (1 row)
+
+ COMMIT PREPARED 'foo2';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ ddd
+ (2 rows)
+
+ -- Test duplicate gids
+ BEGIN;
+ UPDATE pxtest1 SET foobar = 'eee' WHERE foobar = 'ddd';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ eee
+ (2 rows)
+
+ PREPARE TRANSACTION 'foo3';
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ ------
+ foo3
+ (1 row)
+
+ BEGIN;
+ INSERT INTO pxtest1 VALUES ('fff');
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ ddd
+ fff
+ (3 rows)
+
+ -- This should fail, because the gid foo3 is already in use
+ PREPARE TRANSACTION 'foo3';
+ ERROR: global transaction identifier "foo3" is already in use
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ ddd
+ (2 rows)
+
+ ROLLBACK PREPARED 'foo3';
+ SELECT * FROM pxtest1;
+ foobar
+ --------
+ aaa
+ ddd
+ (2 rows)
+
+ -- Clean up
+ DROP TABLE pxtest1;
+ -- Test subtransactions
+ BEGIN;
+ CREATE TABLE pxtest2 (a int);
+ INSERT INTO pxtest2 VALUES (1);
+ SAVEPOINT a;
+ INSERT INTO pxtest2 VALUES (2);
+ ROLLBACK TO a;
+ SAVEPOINT b;
+ INSERT INTO pxtest2 VALUES (3);
+ PREPARE TRANSACTION 'regress-one';
+ CREATE TABLE pxtest3(fff int);
+ -- Test shared invalidation
+ BEGIN;
+ DROP TABLE pxtest3;
+ CREATE TABLE pxtest4 (a int);
+ INSERT INTO pxtest4 VALUES (1);
+ INSERT INTO pxtest4 VALUES (2);
+ DECLARE foo CURSOR FOR SELECT * FROM pxtest4;
+ -- Fetch 1 tuple, keeping the cursor open
+ FETCH 1 FROM foo;
+ a
+ ---
+ 1
+ (1 row)
+
+ PREPARE TRANSACTION 'regress-two';
+ -- No such cursor
+ FETCH 1 FROM foo;
+ ERROR: cursor "foo" does not exist
+ -- Table doesn't exist, the creation hasn't been committed yet
+ SELECT * FROM pxtest2;
+ ERROR: relation "pxtest2" does not exist
+ -- There should be two prepared transactions (ORDER BY pins the row order)
+ SELECT gid FROM pg_prepared_xacts ORDER BY gid;
+ gid
+ -------------
+ regress-one
+ regress-two
+ (2 rows)
+
+ -- pxtest3 should be locked because of the pending DROP
+ set statement_timeout to 1000;
+ SELECT * FROM pxtest3;
+ ERROR: canceling query due to user request
+ reset statement_timeout;
+ -- Disconnect, we will continue testing in a different backend
+ \c -
+ -- There should still be two prepared transactions (ORDER BY pins the row order)
+ SELECT gid FROM pg_prepared_xacts ORDER BY gid;
+ gid
+ -------------
+ regress-one
+ regress-two
+ (2 rows)
+
+ -- pxtest3 should still be locked because of the pending DROP
+ set statement_timeout to 1000;
+ SELECT * FROM pxtest3;
+ ERROR: canceling query due to user request
+ reset statement_timeout;
+ -- Commit table creation
+ COMMIT PREPARED 'regress-one';
+ \d pxtest2
+ Table "public.pxtest2"
+ Column | Type | Modifiers
+ --------+---------+-----------
+ a | integer |
+
+ SELECT * FROM pxtest2;
+ a
+ ---
+ 1
+ 3
+ (2 rows)
+
+ -- There should be one prepared transaction
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ -------------
+ regress-two
+ (1 row)
+
+ -- Commit table drop
+ COMMIT PREPARED 'regress-two';
+ SELECT * FROM pxtest3;
+ ERROR: relation "pxtest3" does not exist
+ -- There should be no prepared transactions
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ -----
+ (0 rows)
+
+ -- Clean up
+ DROP TABLE pxtest2;
+ DROP TABLE pxtest4;
*** src/test/regress/expected/rules.out.orig Tue May 17 17:24:11 2005
--- src/test/regress/expected/rules.out Thu Jun 16 18:36:43 2005
***************
*** 1279,1284 ****
--- 1279,1285 ----
iexit | SELECT ih.name, ih.thepath, interpt_pp(ih.thepath, r.thepath) AS exit FROM ihighway ih, ramp r WHERE (ih.thepath ## r.thepath);
pg_indexes | SELECT n.nspname AS schemaname, c.relname AS tablename, i.relname AS indexname, t.spcname AS "tablespace", pg_get_indexdef(i.oid) AS indexdef FROM ((((pg_index x JOIN pg_class c ON ((c.oid = x.indrelid))) JOIN pg_class i ON ((i.oid = x.indexrelid))) LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) LEFT JOIN pg_tablespace t ON ((t.oid = i.reltablespace))) WHERE ((c.relkind = 'r'::"char") AND (i.relkind = 'i'::"char"));
pg_locks | SELECT l.locktype, l."database", l.relation, l.page, l.tuple, l."transaction", l.classid, l.objid, l.objsubid, l.pid, l."mode", l.granted FROM pg_lock_status() l(locktype text, "database" oid, relation oid, page integer, tuple smallint, "transaction" xid, classid oid, objid oid, objsubid smallint, pid integer, "mode" text, granted boolean);
+ pg_prepared_xacts | SELECT p."transaction", p.gid, u.usename AS "owner", d.datname AS "database" FROM ((pg_prepared_xact() p("transaction" xid, gid text, ownerid integer, dbid oid) LEFT JOIN pg_database d ON ((p.dbid = d.oid))) LEFT JOIN pg_shadow u ON ((p.ownerid = u.usesysid)));
pg_rules | SELECT n.nspname AS schemaname, c.relname AS tablename, r.rulename, pg_get_ruledef(r.oid) AS definition FROM ((pg_rewrite r JOIN pg_class c ON ((c.oid = r.ev_class))) LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) WHERE (r.rulename <> '_RETURN'::name);
pg_settings | SELECT a.name, a.setting, a.category, a.short_desc, a.extra_desc, a.context, a.vartype, a.source, a.min_val, a.max_val FROM pg_show_all_settings() a(name text, setting text, category text, short_desc text, extra_desc text, context text, vartype text, source text, min_val text, max_val text);
pg_stat_activity | SELECT d.oid AS datid, d.datname, pg_stat_get_backend_pid(s.backendid) AS procpid, pg_stat_get_backend_userid(s.backendid) AS usesysid, u.usename, pg_stat_get_backend_activity(s.backendid) AS current_query, pg_stat_get_backend_activity_start(s.backendid) AS query_start, pg_stat_get_backend_start(s.backendid) AS backend_start, pg_stat_get_backend_client_addr(s.backendid) AS client_addr, pg_stat_get_backend_client_port(s.backendid) AS client_port FROM pg_database d, (SELECT pg_stat_get_backend_idset() AS backendid) s, pg_shadow u WHERE ((pg_stat_get_backend_dbid(s.backendid) = d.oid) AND (pg_stat_get_backend_userid(s.backendid) = u.usesysid));
***************
*** 1316,1322 ****
shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color))));
street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath);
toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp;
! (40 rows)
SELECT tablename, rulename, definition FROM pg_rules
ORDER BY tablename, rulename;
--- 1317,1323 ----
shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color))));
street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath);
toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp;
! (41 rows)
SELECT tablename, rulename, definition FROM pg_rules
ORDER BY tablename, rulename;
*** src/test/regress/parallel_schedule.orig Fri Jun 18 02:14:25 2004
--- src/test/regress/parallel_schedule Thu Jun 16 15:15:38 2005
***************
*** 60,66 ****
# ----------
# The fourth group of parallel test
# ----------
! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace
test: privileges
test: misc
--- 60,66 ----
# ----------
# The fourth group of parallel test
# ----------
! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace prepared_xacts
test: privileges
test: misc
*** src/test/regress/serial_schedule.orig Fri Jun 18 02:14:25 2004
--- src/test/regress/serial_schedule Thu Jun 16 15:15:39 2005
***************
*** 74,79 ****
--- 74,80 ----
test: hash_index
test: update
test: namespace
+ test: prepared_xacts
test: privileges
test: misc
test: select_views
*** src/test/regress/sql/prepared_xacts.sql.orig Thu Jun 16 15:15:22 2005
--- src/test/regress/sql/prepared_xacts.sql Thu Jun 16 18:22:06 2005
***************
*** 0 ****
--- 1,137 ----
+ --
+ -- PREPARED TRANSACTIONS (two-phase commit)
+ --
+ -- We can't readily test persistence of prepared xacts within the
+ -- regression script framework, unfortunately. Note that a crash
+ -- isn't really needed ... stopping and starting the postmaster would
+ -- be enough, but we can't even do that here.
+
+
+ -- create a simple table that we'll use in the tests
+ CREATE TABLE pxtest1 (foobar VARCHAR(10));
+
+ INSERT INTO pxtest1 VALUES ('aaa');
+
+
+ -- Test PREPARE TRANSACTION
+ BEGIN;
+ UPDATE pxtest1 SET foobar = 'bbb' WHERE foobar = 'aaa';
+ SELECT * FROM pxtest1;
+ PREPARE TRANSACTION 'foo1';
+
+ SELECT * FROM pxtest1;
+
+ -- Test pg_prepared_xacts system view
+ SELECT gid FROM pg_prepared_xacts;
+
+ -- Test ROLLBACK PREPARED
+ ROLLBACK PREPARED 'foo1';
+
+ SELECT * FROM pxtest1;
+
+ SELECT gid FROM pg_prepared_xacts;
+
+
+ -- Test COMMIT PREPARED
+ BEGIN;
+ INSERT INTO pxtest1 VALUES ('ddd');
+ SELECT * FROM pxtest1;
+ PREPARE TRANSACTION 'foo2';
+
+ SELECT * FROM pxtest1;
+
+ COMMIT PREPARED 'foo2';
+
+ SELECT * FROM pxtest1;
+
+ -- Test duplicate gids
+ BEGIN;
+ UPDATE pxtest1 SET foobar = 'eee' WHERE foobar = 'ddd';
+ SELECT * FROM pxtest1;
+ PREPARE TRANSACTION 'foo3';
+
+ SELECT gid FROM pg_prepared_xacts;
+
+ BEGIN;
+ INSERT INTO pxtest1 VALUES ('fff');
+ SELECT * FROM pxtest1;
+
+ -- This should fail, because the gid foo3 is already in use
+ PREPARE TRANSACTION 'foo3';
+
+ SELECT * FROM pxtest1;
+
+ ROLLBACK PREPARED 'foo3';
+
+ SELECT * FROM pxtest1;
+
+ -- Clean up
+ DROP TABLE pxtest1;
+
+ -- Test subtransactions
+ BEGIN;
+ CREATE TABLE pxtest2 (a int);
+ INSERT INTO pxtest2 VALUES (1);
+ SAVEPOINT a;
+ INSERT INTO pxtest2 VALUES (2);
+ ROLLBACK TO a;
+ SAVEPOINT b;
+ INSERT INTO pxtest2 VALUES (3);
+ PREPARE TRANSACTION 'regress-one';
+
+ CREATE TABLE pxtest3(fff int);
+
+ -- Test shared invalidation
+ BEGIN;
+ DROP TABLE pxtest3;
+ CREATE TABLE pxtest4 (a int);
+ INSERT INTO pxtest4 VALUES (1);
+ INSERT INTO pxtest4 VALUES (2);
+ DECLARE foo CURSOR FOR SELECT * FROM pxtest4;
+ -- Fetch 1 tuple, keeping the cursor open
+ FETCH 1 FROM foo;
+ PREPARE TRANSACTION 'regress-two';
+
+ -- No such cursor
+ FETCH 1 FROM foo;
+
+ -- Table doesn't exist, the creation hasn't been committed yet
+ SELECT * FROM pxtest2;
+
+ -- There should be two prepared transactions (ORDER BY pins the row order)
+ SELECT gid FROM pg_prepared_xacts ORDER BY gid;
+
+ -- pxtest3 should be locked because of the pending DROP
+ set statement_timeout to 1000;
+ SELECT * FROM pxtest3;
+ reset statement_timeout;
+
+ -- Disconnect, we will continue testing in a different backend
+ \c -
+
+ -- There should still be two prepared transactions (ORDER BY pins the row order)
+ SELECT gid FROM pg_prepared_xacts ORDER BY gid;
+
+ -- pxtest3 should still be locked because of the pending DROP
+ set statement_timeout to 1000;
+ SELECT * FROM pxtest3;
+ reset statement_timeout;
+
+ -- Commit table creation
+ COMMIT PREPARED 'regress-one';
+ \d pxtest2
+ SELECT * FROM pxtest2;
+
+ -- There should be one prepared transaction
+ SELECT gid FROM pg_prepared_xacts;
+
+ -- Commit table drop
+ COMMIT PREPARED 'regress-two';
+ SELECT * FROM pxtest3;
+
+ -- There should be no prepared transactions
+ SELECT gid FROM pg_prepared_xacts;
+
+ -- Clean up
+ DROP TABLE pxtest2;
+ DROP TABLE pxtest4;