diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
new file mode 100644
index 2812681..f2b88a5
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
*************** RecordTransactionCommitPrepared(Transact
*** 1964,1972 ****
  								bool initfileinval)
  {
  	XLogRecData rdata[4];
- 	int			lastrdata = 0;
- 	xl_xact_commit_prepared xlrec;
  	XLogRecPtr	recptr;
  
  	START_CRIT_SECTION();
  
--- 1964,1987 ----
  								bool initfileinval)
  {
  	XLogRecData rdata[4];
  	XLogRecPtr	recptr;
+ 	int			lastrdata = 0;
+ 	xl_xact_commit_prepared *xlrec;
+ 	int			xlrec_len = MinSizeOfXactCommitPrepared;	/* grows by one int per array present */
+ 	if (nrels > 0)
+ 	{
+ 		xlrec_len += sizeof(int);
+ 	}
+ 	if (nchildren > 0)
+ 	{
+ 		xlrec_len += sizeof(int);
+ 	}
+ 	if (ninvalmsgs > 0)
+ 	{
+ 		xlrec_len += sizeof(int);
+ 	}
+ 
+ 	xlrec = (xl_xact_commit_prepared *)palloc(xlrec_len);
  
  	START_CRIT_SECTION();
  
***************
*** 1974,1994 ****
  	MyProc->inCommit = true;
  
  	/* Emit the XLOG commit record */
! 	xlrec.xid = xid;
! 	xlrec.crec.xact_time = GetCurrentTimestamp();
! 	xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
! 	xlrec.crec.nmsgs = 0;
! 	xlrec.crec.nrels = nrels;
! 	xlrec.crec.nsubxacts = nchildren;
! 	xlrec.crec.nmsgs = ninvalmsgs;
! 
! 	rdata[0].data = (char *) (&xlrec);
! 	rdata[0].len = MinSizeOfXactCommitPrepared;
  	rdata[0].buffer = InvalidBuffer;
  	/* dump rels to delete */
  	if (nrels > 0)
  	{
  		rdata[0].next = &(rdata[1]);
  		rdata[1].data = (char *) rels;
  		rdata[1].len = nrels * sizeof(RelFileNode);
  		rdata[1].buffer = InvalidBuffer;
--- 1989,2006 ----
  	MyProc->inCommit = true;
  
  	/* Emit the XLOG commit record */
! 	xlrec->xid = xid;
! 	xlrec->crec.xact_time = GetCurrentTimestamp();
! 	xlrec->crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
! 	rdata[0].data = (char *) (xlrec);
! 	rdata[0].len = xlrec_len;
  	rdata[0].buffer = InvalidBuffer;
  	/* dump rels to delete */
  	if (nrels > 0)
  	{
  		rdata[0].next = &(rdata[1]);
+ 		xlrec->crec.xinfo |= XACT_COMPLETION_DROP_REL_NODES_PRESENT;
+ 		xlrec->crec.counts[0] = nrels;	/* first count, when present */
  		rdata[1].data = (char *) rels;
  		rdata[1].len = nrels * sizeof(RelFileNode);
  		rdata[1].buffer = InvalidBuffer;
***************
*** 1998,2012 ****
  	if (nchildren > 0)
  	{
  		rdata[lastrdata].next = &(rdata[2]);
  		rdata[2].data = (char *) children;
  		rdata[2].len = nchildren * sizeof(TransactionId);
  		rdata[2].buffer = InvalidBuffer;
  		lastrdata = 2;
  	}
! 	/* dump cache invalidation messages */
  	if (ninvalmsgs > 0)
  	{
  		rdata[lastrdata].next = &(rdata[3]);
  		rdata[3].data = (char *) invalmsgs;
  		rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
  		rdata[3].buffer = InvalidBuffer;
--- 2010,2028 ----
  	if (nchildren > 0)
  	{
  		rdata[lastrdata].next = &(rdata[2]);
+ 		xlrec->crec.xinfo |= XACT_COMPLETION_COMMITTED_SUB_PRESENT;
+ 		xlrec->crec.counts[nrels > 0 ? 1 : 0] = nchildren;	/* packed slot: follows nrels count if any */
  		rdata[2].data = (char *) children;
  		rdata[2].len = nchildren * sizeof(TransactionId);
  		rdata[2].buffer = InvalidBuffer;
  		lastrdata = 2;
  	}
! 	/* dump shared cache invalidation messages */
  	if (ninvalmsgs > 0)
  	{
  		rdata[lastrdata].next = &(rdata[3]);
+ 		xlrec->crec.xinfo |= XACT_COMPLETION_SHARED_INV_PRESENT;
+ 		xlrec->crec.counts[(nrels > 0 ? 1 : 0) + (nchildren > 0 ? 1 : 0)] = ninvalmsgs;	/* packed slot, NOT the rdata index */
  		rdata[3].data = (char *) invalmsgs;
  		rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
  		rdata[3].buffer = InvalidBuffer;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
new file mode 100644
index 2ca1c14..f758b41
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
*************** RecordTransactionCommit(void)
*** 964,970 ****
  		 */
  		XLogRecData rdata[4];
  		int			lastrdata = 0;
! 		xl_xact_commit xlrec;
  
  		/* Tell bufmgr and smgr to prepare for commit */
  		BufmgrCommit();
--- 964,985 ----
  		 */
  		XLogRecData rdata[4];
  		int			lastrdata = 0;
! 		xl_xact_commit *xlrec;
! 		int			xlrec_len = MinSizeOfXactCommit;	/* grows by one int per array present */
! 		if (nrels > 0)
! 		{
! 			xlrec_len += sizeof(int);
! 		}
! 		if (nchildren > 0)
! 		{
! 			xlrec_len += sizeof(int);
! 		}
! 		if (nmsgs > 0)
! 		{
! 			xlrec_len += sizeof(int);
! 		}
! 
! 		xlrec = (xl_xact_commit *)palloc(xlrec_len);
  
  		/* Tell bufmgr and smgr to prepare for commit */
  		BufmgrCommit();
***************
*** 972,985 ****
  		/*
  		 * Set flags required for recovery processing of commits.
  		 */
! 		xlrec.xinfo = 0;
  		if (RelcacheInitFileInval)
! 			xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
  		if (forceSyncCommit)
! 			xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
  
! 		xlrec.dbId = MyDatabaseId;
! 		xlrec.tsId = MyDatabaseTableSpace;
  
  		/*
  		 * Mark ourselves as within our "commit critical section".  This
--- 987,1000 ----
  		/*
  		 * Set flags required for recovery processing of commits.
  		 */
! 		xlrec->xinfo = 0;
  		if (RelcacheInitFileInval)
! 			xlrec->xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
  		if (forceSyncCommit)
! 			xlrec->xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
  
! 		xlrec->dbId = MyDatabaseId;
! 		xlrec->tsId = MyDatabaseTableSpace;
  
  		/*
  		 * Mark ourselves as within our "commit critical section".  This
***************
*** 1002,1018 ****
  		MyProc->inCommit = true;
  
  		SetCurrentTransactionStopTimestamp();
! 		xlrec.xact_time = xactStopTimestamp;
! 		xlrec.nrels = nrels;
! 		xlrec.nsubxacts = nchildren;
! 		xlrec.nmsgs = nmsgs;
! 		rdata[0].data = (char *) (&xlrec);
! 		rdata[0].len = MinSizeOfXactCommit;
  		rdata[0].buffer = InvalidBuffer;
  		/* dump rels to delete */
  		if (nrels > 0)
  		{
  			rdata[0].next = &(rdata[1]);
  			rdata[1].data = (char *) rels;
  			rdata[1].len = nrels * sizeof(RelFileNode);
  			rdata[1].buffer = InvalidBuffer;
--- 1017,1032 ----
  		MyProc->inCommit = true;
  
  		SetCurrentTransactionStopTimestamp();
! 		xlrec->xact_time = xactStopTimestamp;
! 		rdata[0].data = (char *) (xlrec);
! 		rdata[0].len = xlrec_len;
  		rdata[0].buffer = InvalidBuffer;
  		/* dump rels to delete */
  		if (nrels > 0)
  		{
  			rdata[0].next = &(rdata[1]);
+ 			xlrec->xinfo |= XACT_COMPLETION_DROP_REL_NODES_PRESENT;
+ 			xlrec->counts[0] = nrels;	/* first count, when present */
  			rdata[1].data = (char *) rels;
  			rdata[1].len = nrels * sizeof(RelFileNode);
  			rdata[1].buffer = InvalidBuffer;
*************** RecordTransactionCommit(void)
*** 1022,1027 ****
--- 1036,1043 ----
  		if (nchildren > 0)
  		{
  			rdata[lastrdata].next = &(rdata[2]);
+ 			xlrec->xinfo |= XACT_COMPLETION_COMMITTED_SUB_PRESENT;
+ 			xlrec->counts[nrels > 0 ? 1 : 0] = nchildren;	/* packed slot: follows nrels count if any */
  			rdata[2].data = (char *) children;
  			rdata[2].len = nchildren * sizeof(TransactionId);
  			rdata[2].buffer = InvalidBuffer;
*************** RecordTransactionCommit(void)
*** 1031,1036 ****
--- 1047,1054 ----
  		if (nmsgs > 0)
  		{
  			rdata[lastrdata].next = &(rdata[3]);
+ 			xlrec->xinfo |= XACT_COMPLETION_SHARED_INV_PRESENT;
+ 			xlrec->counts[(nrels > 0 ? 1 : 0) + (nchildren > 0 ? 1 : 0)] = nmsgs;	/* packed slot, NOT the rdata index */
  			rdata[3].data = (char *) invalMessages;
  			rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
  			rdata[3].buffer = InvalidBuffer;
*************** xactGetCommittedChildren(TransactionId *
*** 4437,4459 ****
   */
  
  /*
   * Before 9.0 this was a fairly short function, but now it performs many
   * actions for which the order of execution is critical.
   */
  static void
  xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
  {
  	TransactionId *sub_xids;
  	SharedInvalidationMessage *inval_msgs;
  	TransactionId max_xid;
  	int			i;
  
! 	/* subxid array follows relfilenodes */
! 	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
! 	/* invalidation messages array follows subxids */
! 	inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
! 
! 	max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
  
  	/*
  	 * Make sure nextXid is beyond any XID mentioned in the record.
--- 4455,4520 ----
   */
  
  /*
+  * Decode the optional parts of a commit record: for each array flagged in
+  * xlrec->xinfo, read its length from the packed counts[] and return a pointer
+  * to it.  A returned pointer is meaningful only if its count is nonzero.
+  */
+ static void
+ xact_read_commit(xl_xact_commit *xlrec,
+ 				 RelFileNode **drop_rels,
+ 				 TransactionId **sub_xids,
+ 				 SharedInvalidationMessage **inval_msgs,
+ 				 int *nrels,
+ 				 int *nsubxacts,
+ 				 int *nmsgs)
+ {
+ 	int			lastpos = 0;	/* next unread slot in counts[] */
+ 	*nrels = 0;
+ 	*nsubxacts = 0;
+ 	*nmsgs = 0;
+ 	if (XactCompletionDropRelPresent(xlrec))
+ 	{
+ 		*nrels = xlrec->counts[lastpos];
+ 		lastpos++;
+ 	}
+ 
+ 	if (XactCompletionCommittedSubPresent(xlrec))
+ 	{
+ 		*nsubxacts = xlrec->counts[lastpos];
+ 		lastpos++;
+ 	}
+ 
+ 	if (XactCompletionSharedInvPresent(xlrec))
+ 	{
+ 		*nmsgs = xlrec->counts[lastpos];
+ 		lastpos++;
+ 	}
+ 
+ 	/* arrays follow the counts back to back; index the arrays, not the out-params */
+ 	*drop_rels = (RelFileNode *) &xlrec->counts[lastpos];
+ 	*sub_xids = (TransactionId *) &(*drop_rels)[*nrels];
+ 	*inval_msgs = (SharedInvalidationMessage *) &(*sub_xids)[*nsubxacts];
+ }
+ 
+ /*
   * Before 9.0 this was a fairly short function, but now it performs many
   * actions for which the order of execution is critical.
   */
  static void
  xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
  {
+ 	RelFileNode *drop_rels;
  	TransactionId *sub_xids;
  	SharedInvalidationMessage *inval_msgs;
  	TransactionId max_xid;
  	int			i;
+ 	int			nrels = 0;
+ 	int			nsubxacts = 0;
+ 	int			nmsgs = 0;
  
! 	xact_read_commit(xlrec, &drop_rels, &sub_xids, &inval_msgs, &nrels,
! 					 &nsubxacts, &nmsgs);
! 	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
  
  	/*
  	 * Make sure nextXid is beyond any XID mentioned in the record.
***************
*** 4476,4482 ****
  		/*
  		 * Mark the transaction committed in pg_clog.
  		 */
! 		TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
  	}
  	else
  	{
--- 4537,4543 ----
  		/*
  		 * Mark the transaction committed in pg_clog.
  		 */
! 		TransactionIdCommitTree(xid, nsubxacts, sub_xids);
  	}
  	else
  	{
***************
*** 4500,4518 ****
  		 * bits set on changes made by transactions that haven't yet
  		 * recovered. It's unlikely but it's good to be safe.
  		 */
! 		TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
  
  		/*
  		 * We must mark clog before we update the ProcArray.
  		 */
! 		ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
  
  		/*
  		 * Send any cache invalidations attached to the commit. We must
  		 * maintain the same order of invalidation then release locks as
  		 * occurs in CommitTransaction().
  		 */
! 		ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
  								  XactCompletionRelcacheInitFileInval(xlrec),
  											 xlrec->dbId, xlrec->tsId);
  
--- 4561,4579 ----
  		 * bits set on changes made by transactions that haven't yet
  		 * recovered. It's unlikely but it's good to be safe.
  		 */
! 		TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
  
  		/*
  		 * We must mark clog before we update the ProcArray.
  		 */
! 		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
  
  		/*
  		 * Send any cache invalidations attached to the commit. We must
  		 * maintain the same order of invalidation then release locks as
  		 * occurs in CommitTransaction().
  		 */
! 		ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
  								  XactCompletionRelcacheInitFileInval(xlrec),
  											 xlrec->dbId, xlrec->tsId);
  
***************
*** 4521,4540 ****
  		 * phase transactions. In effect we are ignoring the prepare phase and
  		 * just going straight to lock release.
  		 */
! 		StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
  	}
  
  	/* Make sure files supposed to be dropped are dropped */
! 	for (i = 0; i < xlrec->nrels; i++)
  	{
! 		SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
  		ForkNumber	fork;
  
  		for (fork = 0; fork <= MAX_FORKNUM; fork++)
  		{
  			if (smgrexists(srel, fork))
  			{
! 				XLogDropRelation(xlrec->xnodes[i], fork);
  				smgrdounlink(srel, fork, true);
  			}
  		}
--- 4582,4601 ----
  		 * phase transactions. In effect we are ignoring the prepare phase and
  		 * just going straight to lock release.
  		 */
! 		StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
  	}
  
  	/* Make sure files supposed to be dropped are dropped */
! 	for (i = 0; i < nrels; i++)
  	{
! 		SMgrRelation srel = smgropen(drop_rels[i], InvalidBackendId);
  		ForkNumber	fork;
  
  		for (fork = 0; fork <= MAX_FORKNUM; fork++)
  		{
  			if (smgrexists(srel, fork))
  			{
! 				XLogDropRelation(drop_rels[i], fork);
  				smgrdounlink(srel, fork, true);
  			}
  		}
*************** xact_desc_commit(StringInfo buf, xl_xact
*** 4704,4745 ****
  {
  	int			i;
  	TransactionId *xacts;
  
! 	xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
  
  	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
  
! 	if (xlrec->nrels > 0)
  	{
  		appendStringInfo(buf, "; rels:");
! 		for (i = 0; i < xlrec->nrels; i++)
  		{
! 			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
  
  			appendStringInfo(buf, " %s", path);
  			pfree(path);
  		}
  	}
! 	if (xlrec->nsubxacts > 0)
  	{
  		appendStringInfo(buf, "; subxacts:");
! 		for (i = 0; i < xlrec->nsubxacts; i++)
  			appendStringInfo(buf, " %u", xacts[i]);
  	}
! 	if (xlrec->nmsgs > 0)
  	{
- 		SharedInvalidationMessage *msgs;
- 
- 		msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
- 
  		if (XactCompletionRelcacheInitFileInval(xlrec))
  			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
  							 xlrec->dbId, xlrec->tsId);
  
  		appendStringInfo(buf, "; inval msgs:");
! 		for (i = 0; i < xlrec->nmsgs; i++)
  		{
! 			SharedInvalidationMessage *msg = &msgs[i];
  
  			if (msg->id >= 0)
  				appendStringInfo(buf, " catcache %d", msg->id);
--- 4765,4808 ----
  {
  	int			i;
  	TransactionId *xacts;
+ 	RelFileNode *drop_rels;
+ 	SharedInvalidationMessage *inval_msgs;
+ 	int			nrels = 0;
+ 	int			nsubxacts = 0;
+ 	int			nmsgs = 0;
  
! 	xact_read_commit(xlrec, &drop_rels, &xacts, &inval_msgs, &nrels,
! 					 &nsubxacts, &nmsgs);
  
  	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
  
! 	if (nrels > 0)
  	{
  		appendStringInfo(buf, "; rels:");
! 		for (i = 0; i < nrels; i++)
  		{
! 			char	   *path = relpathperm(drop_rels[i], MAIN_FORKNUM);
  
  			appendStringInfo(buf, " %s", path);
  			pfree(path);
  		}
  	}
! 	if (nsubxacts > 0)
  	{
  		appendStringInfo(buf, "; subxacts:");
! 		for (i = 0; i < nsubxacts; i++)
  			appendStringInfo(buf, " %u", xacts[i]);
  	}
! 	if (nmsgs > 0)
  	{
  		if (XactCompletionRelcacheInitFileInval(xlrec))
  			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
  							 xlrec->dbId, xlrec->tsId);
  
  		appendStringInfo(buf, "; inval msgs:");
! 		for (i = 0; i < nmsgs; i++)
  		{
! 			SharedInvalidationMessage *msg = &inval_msgs[i];
  
  			if (msg->id >= 0)
  				appendStringInfo(buf, " catcache %d", msg->id);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
new file mode 100644
index cb440d4..402437f
*** a/src/include/access/xact.h
--- b/src/include/access/xact.h
*************** typedef struct xl_xact_assignment
*** 116,137 ****
  
  #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
  
  typedef struct xl_xact_commit
  {
  	TimestampTz xact_time;		/* time of commit */
  	uint32		xinfo;			/* info flags */
- 	int			nrels;			/* number of RelFileNodes */
- 	int			nsubxacts;		/* number of subtransaction XIDs */
- 	int			nmsgs;			/* number of shared inval msgs */
  	Oid			dbId;			/* MyDatabaseId */
  	Oid			tsId;			/* MyDatabaseTableSpace */
! 	/* Array of RelFileNode(s) to drop at commit */
! 	RelFileNode xnodes[1];		/* VARIABLE LENGTH ARRAY */
! 	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
! 	/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
  } xl_xact_commit;
  
! #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
  
  /*
   * These flags are set in the xinfo fields of WAL commit records,
--- 116,146 ----
  
  #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
  
+ /*
+  * As of 9.2, the record format changed to reduce space: instead of having
+  * ints that represent the length of the optional arrays at the end of
+  * xl_xact_commit, some bits in xinfo are used to flag which optional arrays
+  * are, in fact, present. For each present array, an int that represents the
+  * array length is placed at the end of the struct. The present arrays are
+  * stored after those ints.
+  */
  typedef struct xl_xact_commit
  {
  	TimestampTz xact_time;		/* time of commit */
  	uint32		xinfo;			/* info flags */
  	Oid			dbId;			/* MyDatabaseId */
  	Oid			tsId;			/* MyDatabaseTableSpace */
! 	int			counts[1];		/* variable-length array of counts, xinfo flags */
! 								/* define length of array and meaning of counts */
! 	/* OPTIONAL RELATIONS TO DROP LENGTH FOLLOW */
! 	/* OPTIONAL COMMITTED SUBTRANSACTION XIDs LENGTH FOLLOW */
! 	/* OPTIONAL SHARED INVALIDATION MESSAGES LENGTH FOLLOW */
! 	/* ARRAY OF OPTIONAL RELATIONS TO DROP FOLLOWS */
! 	/* ARRAY OF OPTIONAL COMMITTED SUBTRANSACTION XIDs FOLLOWS */
! 	/* ARRAY OF OPTIONAL SHARED INVALIDATION MESSAGES FOLLOWS */
  } xl_xact_commit;
  
! #define MinSizeOfXactCommit offsetof(xl_xact_commit, counts)
  
  /*
   * These flags are set in the xinfo fields of WAL commit records,
*************** typedef struct xl_xact_commit
*** 148,153 ****
--- 157,176 ----
  #define XactCompletionRelcacheInitFileInval(xlrec)	((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
  #define XactCompletionForceSyncCommit(xlrec)		((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
  
+ /*
+  * These flags are set in the xinfo fields of WAL commit records,
+  * indicating which optional trailing arrays of xl_xact_commit are present
+  * (RelFileNodes to drop at commit, subtransaction XIDs, shared inval msgs)
+  */
+ #define XACT_COMPLETION_DROP_REL_NODES_PRESENT	0x04
+ #define XACT_COMPLETION_COMMITTED_SUB_PRESENT	0x08
+ #define XACT_COMPLETION_SHARED_INV_PRESENT		0x10
+ 
+ /* Access macros for above flags */
+ #define XactCompletionDropRelPresent(xlrec)			((xlrec)->xinfo & XACT_COMPLETION_DROP_REL_NODES_PRESENT)
+ #define XactCompletionCommittedSubPresent(xlrec)	((xlrec)->xinfo & XACT_COMPLETION_COMMITTED_SUB_PRESENT)
+ #define XactCompletionSharedInvPresent(xlrec)		((xlrec)->xinfo & XACT_COMPLETION_SHARED_INV_PRESENT)
+ 
  typedef struct xl_xact_abort
  {
  	TimestampTz xact_time;		/* time of abort */
*************** typedef struct xl_xact_commit_prepared
*** 176,182 ****
  	/* MORE DATA FOLLOWS AT END OF STRUCT */
  } xl_xact_commit_prepared;
  
! #define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
  
  typedef struct xl_xact_abort_prepared
  {
--- 199,205 ----
  	/* MORE DATA FOLLOWS AT END OF STRUCT */
  } xl_xact_commit_prepared;
  
! #define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.counts)
  
  typedef struct xl_xact_abort_prepared
  {