Re: Make CLUSTER MVCC-safe

From:        Heikki Linnakangas
Subject:     Re: Make CLUSTER MVCC-safe
Date:
Msg-id:      460B7B61.5090902@enterprisedb.com
In reply to: Make CLUSTER MVCC-safe (Heikki Linnakangas <heikki@enterprisedb.com>)
Responses:   Re: Make CLUSTER MVCC-safe
             Re: Make CLUSTER MVCC-safe
             Re: Make CLUSTER MVCC-safe
List:        pgsql-patches
Here's an update, fixing a conflict caused by Tom's recent commit of Simon's patch to skip WAL-inserts when archiving is not enabled.

Heikki Linnakangas wrote:
> This patch makes CLUSTER MVCC-safe. Visibility information and update
> chains are preserved like in VACUUM FULL.
>
> I created a new generic rewriteheap facility to handle rewriting tables
> in a visibility-preserving manner. All the update chain tracking is done
> in rewriteheap.c; the caller is responsible for supplying the stream of
> tuples.
>
> CLUSTER is currently the only user of the facility, but I'm envisioning
> we might have other users in the future. For example, a version of
> VACUUM FULL that rewrites the whole table. We could also use it to make
> ALTER TABLE MVCC-safe, but there are some issues with that. For example,
> what to do if RECENTLY_DEAD tuples don't satisfy a newly added constraint.
>
> One complication in the implementation was the fact that heap_insert
> overwrites the visibility information, and it doesn't write the full
> tuple header to WAL. I ended up implementing a special-purpose
> raw_heap_insert function instead, which is optimized for bulk inserting
> a lot of tuples, knowing that we have exclusive access to the heap.
> raw_heap_insert keeps the current buffer locked over calls, until it
> gets full, and inserts the whole page to WAL as a single record using
> the existing XLOG_HEAP_NEWPAGE record type.
>
> This makes CLUSTER a more viable alternative to VACUUM FULL. One
> motivation for making CLUSTER MVCC-safe is that if some poor soul runs
> pg_dump to make a backup concurrently with CLUSTER, the clustered tables
> will appear to be empty in the dump file.
>
> The documentation doesn't say anything about CLUSTER not being MVCC-safe,
> so I suppose there's no need to touch the docs. I sent a doc patch earlier
> to add a note about it; that doc patch should still be applied to older
> release branches, IMO.

--
Heikki Linnakangas
EnterpriseDB   http://www.enterprisedb.com

Index: src/backend/access/heap/heapam.c =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v retrieving revision 1.230 diff -c -r1.230 heapam.c *** src/backend/access/heap/heapam.c 29 Mar 2007 00:15:37 -0000 1.230 --- src/backend/access/heap/heapam.c 29 Mar 2007 08:27:21 -0000 *************** *** 3280,3285 **** --- 3280,3322 ---- return log_heap_update(reln, oldbuf, from, newbuf, newtup, true); } + /* + * Perform XLogInsert of a full page image to WAL. Caller must make sure + * the page contents on disk are consistent with the page inserted to WAL.
+ */ + XLogRecPtr + log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page) + { + xl_heap_newpage xlrec; + XLogRecPtr recptr; + XLogRecData rdata[2]; + + /* NO ELOG(ERROR) from here till newpage op is logged */ + START_CRIT_SECTION(); + + xlrec.node = *rnode; + xlrec.blkno = blkno; + + rdata[0].data = (char *) &xlrec; + rdata[0].len = SizeOfHeapNewpage; + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + rdata[1].data = (char *) page; + rdata[1].len = BLCKSZ; + rdata[1].buffer = InvalidBuffer; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata); + + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + + return recptr; + } + static void heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) { Index: src/backend/access/nbtree/nbtsort.c =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtsort.c,v retrieving revision 1.110 diff -c -r1.110 nbtsort.c *** src/backend/access/nbtree/nbtsort.c 9 Jan 2007 02:14:10 -0000 1.110 --- src/backend/access/nbtree/nbtsort.c 20 Mar 2007 17:58:20 -0000 *************** *** 64,69 **** --- 64,70 ---- #include "postgres.h" + #include "access/heapam.h" #include "access/nbtree.h" #include "miscadmin.h" #include "storage/smgr.h" *************** *** 265,296 **** if (wstate->btws_use_wal) { /* We use the heap NEWPAGE record type for this */ ! xl_heap_newpage xlrec; ! XLogRecPtr recptr; ! XLogRecData rdata[2]; ! ! /* NO ELOG(ERROR) from here till newpage op is logged */ ! START_CRIT_SECTION(); ! ! xlrec.node = wstate->index->rd_node; ! xlrec.blkno = blkno; ! ! rdata[0].data = (char *) &xlrec; ! rdata[0].len = SizeOfHeapNewpage; ! rdata[0].buffer = InvalidBuffer; ! rdata[0].next = &(rdata[1]); ! ! rdata[1].data = (char *) page; ! rdata[1].len = BLCKSZ; ! rdata[1].buffer = InvalidBuffer; ! rdata[1].next = NULL; ! ! recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata); ! ! PageSetLSN(page, recptr); ! PageSetTLI(page, ThisTimeLineID); ! ! END_CRIT_SECTION(); } else { --- 266,272 ---- if (wstate->btws_use_wal) { /* We use the heap NEWPAGE record type for this */ ! log_newpage(&wstate->index->rd_node, blkno, page); } else { Index: src/backend/commands/Makefile =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/Makefile,v retrieving revision 1.35 diff -c -r1.35 Makefile *** src/backend/commands/Makefile 20 Jan 2007 17:16:11 -0000 1.35 --- src/backend/commands/Makefile 20 Mar 2007 17:04:58 -0000 *************** *** 16,22 **** conversioncmds.o copy.o \ dbcommands.o define.o explain.o functioncmds.o \ indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \ ! portalcmds.o prepare.o proclang.o \ schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \ typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o --- 16,22 ---- conversioncmds.o copy.o \ dbcommands.o define.o explain.o functioncmds.o \ indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \ ! 
rewriteheap.o portalcmds.o prepare.o proclang.o \ schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \ typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o Index: src/backend/commands/cluster.c =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/cluster.c,v retrieving revision 1.158 diff -c -r1.158 cluster.c *** src/backend/commands/cluster.c 29 Mar 2007 00:15:37 -0000 1.158 --- src/backend/commands/cluster.c 29 Mar 2007 08:28:57 -0000 *************** *** 20,25 **** --- 20,26 ---- #include "access/genam.h" #include "access/heapam.h" #include "access/xact.h" + #include "access/transam.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/heap.h" *************** *** 36,41 **** --- 37,44 ---- #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/relcache.h" + #include "storage/procarray.h" + #include "commands/rewriteheap.h" /* *************** *** 653,659 **** char *nulls; IndexScanDesc scan; HeapTuple tuple; ! CommandId mycid = GetCurrentCommandId(); bool use_wal; /* --- 656,663 ---- char *nulls; IndexScanDesc scan; HeapTuple tuple; ! TransactionId OldestXmin; ! RewriteState rwstate; bool use_wal; /* *************** *** 677,682 **** --- 681,689 ---- nulls = (char *) palloc(natts * sizeof(char)); memset(nulls, 'n', natts * sizeof(char)); + /* Get an OldestXmin to use to weed out dead tuples */ + OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, false); + /* * We need to log the copied data in WAL iff WAL archiving is enabled AND * it's not a temp rel. (Since we know the target relation is new and *************** *** 688,724 **** /* use_wal off requires rd_targblock be initially invalid */ Assert(NewHeap->rd_targblock == InvalidBlockNumber); /* * Scan through the OldHeap on the OldIndex and copy each tuple into the * NewHeap. */ scan = index_beginscan(OldHeap, OldIndex, ! SnapshotNow, 0, (ScanKey) NULL); while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL) { /* * We cannot simply pass the tuple to heap_insert(), for several * reasons: * ! * 1. heap_insert() will overwrite the commit-status fields of the ! * tuple it's handed. This would trash the source relation, which is ! * bad news if we abort later on. (This was a bug in releases thru ! * 7.0) ! * ! * 2. We'd like to squeeze out the values of any dropped columns, both * to save space and to ensure we have no corner-case failures. (It's * possible for example that the new table hasn't got a TOAST table * and so is unable to store any large values of dropped cols.) * ! * 3. The tuple might not even be legal for the new table; this is * currently only known to happen as an after-effect of ALTER TABLE * SET WITHOUT OIDS. * * So, we must reconstruct the tuple from component Datums. */ - HeapTuple copiedTuple; - int i; heap_deformtuple(tuple, oldTupDesc, values, nulls); --- 695,743 ---- /* use_wal off requires rd_targblock be initially invalid */ Assert(NewHeap->rd_targblock == InvalidBlockNumber); + /* Initialize the rewrite operation */ + rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal); + /* * Scan through the OldHeap on the OldIndex and copy each tuple into the * NewHeap. */ scan = index_beginscan(OldHeap, OldIndex, ! 
SnapshotAny, 0, (ScanKey) NULL); while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL) { + HeapTuple copiedTuple; + int i; + bool isdead; + + LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE); + isdead = HeapTupleSatisfiesVacuum(tuple->t_data, + OldestXmin, + scan->xs_cbuf) == HEAPTUPLE_DEAD; + LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK); + + if(isdead) + { + rewrite_heap_dead_tuple(rwstate, tuple); + continue; + } + /* * We cannot simply pass the tuple to heap_insert(), for several * reasons: * ! * 1. We'd like to squeeze out the values of any dropped columns, both * to save space and to ensure we have no corner-case failures. (It's * possible for example that the new table hasn't got a TOAST table * and so is unable to store any large values of dropped cols.) * ! * 2. The tuple might not even be legal for the new table; this is * currently only known to happen as an after-effect of ALTER TABLE * SET WITHOUT OIDS. * * So, we must reconstruct the tuple from component Datums. */ heap_deformtuple(tuple, oldTupDesc, values, nulls); *************** *** 735,747 **** if (NewHeap->rd_rel->relhasoids) HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple)); ! heap_insert(NewHeap, copiedTuple, mycid, use_wal, false); heap_freetuple(copiedTuple); CHECK_FOR_INTERRUPTS(); } index_endscan(scan); pfree(values); --- 754,768 ---- if (NewHeap->rd_rel->relhasoids) HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple)); ! rewrite_heap_tuple(rwstate, tuple, copiedTuple); heap_freetuple(copiedTuple); CHECK_FOR_INTERRUPTS(); } + end_heap_rewrite(rwstate); + index_endscan(scan); pfree(values); Index: src/backend/commands/rewriteheap.c =================================================================== RCS file: src/backend/commands/rewriteheap.c diff -N src/backend/commands/rewriteheap.c *** /dev/null 1 Jan 1970 00:00:00 -0000 --- src/backend/commands/rewriteheap.c 29 Mar 2007 08:31:00 -0000 *************** *** 0 **** --- 1,543 ---- + /*------------------------------------------------------------------------- + * + * rewriteheap.c + * support functions to rewrite tables. + * + * These functions provide a facility to completely rewrite a heap, while + * preserving visibility information and update chains. + * + * + * INTERFACE + * + * The caller is responsible for creating the new heap, all catalog + * changes, supplying the tuples to be written to the new heap, and + * rebuilding indexes. The caller must hold an AccessExclusiveLock on the + * table. + * + * To use the facility: + * + * begin_heap_rewrite + * while(fetch next tuple) + * { + * if(tuple is dead) + * rewrite_heap_dead_tuple + * else + * { + * do any transformations here if required + * rewrite_heap_tuple + * } + * } + * end_heap_rewrite + * + * The rewrite facility holds the current page it's inserting into pinned + * and locked over rewrite_heap_tuple calls. To avoid deadlock, the caller + * must not access the new relation while a rewrite is in progress. + * + * The contents of the new relation shouldn't be relied on until + * end_heap_rewrite is called. + * + * + * IMPLEMENTATION + * + * While we copy the tuples, we need to keep track of ctid-references to + * handle update chains correctly. Since the new versions of tuples will + * have different ctids than the original ones, if we just copied + * everything to the new table the ctid pointers would be wrong. For each + * ctid reference from A -> B, we can encounter either A first or B first: + * + * If we encounter A first, we'll store the tuple in the unresolved_ctids + * hash table. 
When we later encounter B, we remove A from the hash table, + * fix the ctid to point to the new location of B, and insert both A and B + * to the new heap. + * + * If we encounter B first, we'll put an entry in the old_new_tid_map hash + * table and insert B to the new heap right away. When we later encounter + * A, we get the new location of B from the table. + * + * Note that a tuple in the middle of a chain is both A and B at the same + * time. Entries in the hash tables can be removed as soon as the later + * tuple is encountered. That helps to keep the memory usage down. At the + * end, both tables are usually empty; we should have encountered both A + * and B for each ctid reference. However, if there's a chain like this: + * RECENTLY_DEAD -> DEAD -> LIVE as determined by HeapTupleSatisfiesVacuum, + * we can encounter the dead tuple first, and skip it, and bump into the + * RECENTLY_DEAD tuple later. The RECENTLY_DEAD tuple would be added + * to unresolved_ctids, and stay there until end of the rewrite. + * + * Using in-memory hash tables means that we use some memory for each live + * update chain in the table, from the time we find one end of the + * reference until we find the other end. That shouldn't be a problem in + * practice, but if you do something like an UPDATE without a where-clause + * on a large table, and then run CLUSTER in the same transaction, you + * might run out of memory. It doesn't seem worthwhile to add support to + * spill-to-disk, there shouldn't be that many RECENTLY_DEAD tuples in a + * table under normal circumstances, and even if we do fail halfway through + * a CLUSTER, the old table is still valid. + * + * We can't use the normal heap_insert function to insert into the new + * heap, because heap_insert overwrites the visibility information. + * We use a special-purpose raw_heap_insert function instead, which + * is optimized for bulk inserting a lot of tuples, knowing that we have + * exclusive access to the heap. raw_heap_insert keeps the current buffer + * we're inserting into locked over calls, until it gets full. The page + * is inserted to WAL as a single record. + * + * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group + * Portions Copyright (c) 1994-5, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include "access/heapam.h" + #include "access/transam.h" + #include "access/tuptoaster.h" + #include "commands/rewriteheap.h" + #include "utils/memutils.h" + + /* + * State associated with a rewrite operation. This is opaque to the user + * of the rewrite facility. + */ + typedef struct RewriteStateData + { + Relation rs_new_rel; /* new heap */ + bool rs_use_wal; /* WAL-log inserts to new heap? 
*/ + MemoryContext rs_cxt; /* for hash tables and entries and + * tuples in them */ + TransactionId rs_oldest_xmin; /* oldest xmin used by caller to + * determine tuple visibility */ + Buffer rs_buf; /* current buffer we're inserting + * to, exclusively locked */ + HTAB *rs_unresolved_ctids; + HTAB *rs_old_new_tid_map; + } RewriteStateData; + + /* entry structures for the hash tables */ + + typedef struct UnresolvedCtidData { + ItemPointerData old_ctid; /* t_ctid-pointer of the tuple in old heap */ + ItemPointerData old_tid; /* location in the old heap */ + HeapTuple tuple; /* tuple contents */ + } UnresolvedCtidData; + + typedef UnresolvedCtidData *UnresolvedCtid; + + typedef struct OldToNewMappingData { + ItemPointerData old_tid; /* location in the old heap */ + ItemPointerData new_tid; /* location in the new heap */ + } OldToNewMappingData; + + typedef OldToNewMappingData *OldToNewMapping; + + + /* prototypes for internal functions */ + static void raw_heap_insert(RewriteState state, HeapTuple tup); + + + /* + * Begin a rewrite of a table + * + * new_heap new, empty heap relation to insert tuples to + * oldest_xmin xid used by the caller to determine which tuples + * are dead. + * use_wal should the inserts to the new heap be WAL-logged + * + * Returns an opaque RewriteState, allocated in current memory context, + * to be used in subseuqent calls to the other functions. + */ + RewriteState + begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin, + bool use_wal) + { + RewriteState state; + HASHCTL hash_ctl; + + state = palloc(sizeof(RewriteStateData)); + state->rs_cxt = AllocSetContextCreate(CurrentMemoryContext, + "Table rewrite", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + state->rs_oldest_xmin = oldest_xmin; + state->rs_new_rel = new_heap; + state->rs_use_wal = use_wal; + state->rs_buf = InvalidBuffer; + + /* Initialize hash tables used to track update chains */ + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(ItemPointerData); + hash_ctl.entrysize = sizeof(UnresolvedCtidData); + hash_ctl.hcxt = state->rs_cxt; + hash_ctl.hash = tag_hash; + + state->rs_unresolved_ctids = + hash_create("Rewrite / Unresolved ctids", + 128, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + hash_ctl.entrysize = sizeof(OldToNewMappingData); + + state->rs_old_new_tid_map = + hash_create("Rewrite / Old to new tid map", + 128, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + return state; + } + + /* + * End a rewrite. + * + * state and any other resources are freed. + */ + void + end_heap_rewrite(RewriteState state) + { + HASH_SEQ_STATUS seq_status; + UnresolvedCtid unresolved; + + /* Write any remaining tuples in the unresolvedCtids table. If we have + * any left, they should in fact be dead, but let's err on the safe + * side. 
+ */ + hash_seq_init(&seq_status, state->rs_unresolved_ctids); + + while((unresolved = hash_seq_search(&seq_status)) != NULL) + { + ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid); + raw_heap_insert(state, unresolved->tuple); + + /* don't bother removing and pfreeing the entry, we're going + * to free the whole memory context after the scan anyway + */ + } + + /* WAL-log and unlock the last page */ + if(BufferIsValid(state->rs_buf)) + { + if(state->rs_use_wal) + log_newpage(&state->rs_new_rel->rd_node, + BufferGetBlockNumber(state->rs_buf), + BufferGetPage(state->rs_buf)); + + MarkBufferDirty(state->rs_buf); + + UnlockReleaseBuffer(state->rs_buf); + state->rs_buf = InvalidBuffer; + } + + MemoryContextDelete(state->rs_cxt); + pfree(state); + } + + /* + * Register a dead tuple with an ongoing rewrite. Dead tuples are not + * copied to the new table, but we still make note of them so that we + * can release some resources earlier. + */ + void + rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple) + { + /* If we have already seen an earlier tuple in the update chain that + * points to this tuple, let's forget about that earlier tuple. It's + * in fact dead as well, our simple xmax < OldestXmin test in + * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It + * happens when xmin of a tuple is greater than xmax, which sounds + * counter-intuitive but is perfectly valid. + * + * We don't bother to try to detect the situation the other way + * round, when we encounter the dead tuple first and then the + * recently dead one that points to it. If that happens, we'll + * have some unmatched entries in the unresolvedCtids hash table + * at the end. That can happen anyway, because a vacuum might + * have removed the dead tuple in the chain before us. + */ + hash_search(state->rs_unresolved_ctids, + (void *) &(old_tuple->t_self), + HASH_REMOVE, NULL); + } + + /* + * Add a tuple to the new heap. + * + * Visibility information is copied from the original tuple. + * + * state opaque state as returned by begin_heap_rewrite + * old_tuple original tuple in the old heap + * new_tuple new, rewritten tuple to be inserted to new heap + */ + void + rewrite_heap_tuple(RewriteState state, + HeapTuple old_tuple, HeapTuple new_tuple) + { + MemoryContext saved_cxt; + ItemPointerData old_tid; + + saved_cxt = MemoryContextSwitchTo(state->rs_cxt); + + /* Preserve the visibility information. Also preserve the HEAP_UPDATED + * flag; if the original tuple was an updated row version, so is the + * new one. + */ + memcpy(&new_tuple->t_data->t_choice.t_heap, + &old_tuple->t_data->t_choice.t_heap, + sizeof(HeapTupleFields)); + + new_tuple->t_data->t_infomask &= ~(HEAP_XACT_MASK | HEAP_UPDATED); + new_tuple->t_data->t_infomask |= + old_tuple->t_data->t_infomask & (HEAP_XACT_MASK | HEAP_UPDATED); + + /* Invalid ctid means that ctid should point to the tuple itself. + * We'll override it below if the tuple is part of an update chain. + */ + ItemPointerSetInvalid(&new_tuple->t_data->t_ctid); + + /* When we encounter a tuple that has been updated, check + * the old-to-new mapping hash table. If a match is found there, + * we've already copied the tuple that t_ctid points to, so we + * set the ctid of this tuple point to the new location, and insert + * the new tuple. Otherwise we store the tuple in the unresolved_ctids + * hash table and deal with it later, after inserting the tuple t_ctid + * points to. 
+ */ + if (!(old_tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | + HEAP_IS_LOCKED)) && + !(ItemPointerEquals(&(old_tuple->t_self), + &(old_tuple->t_data->t_ctid)))) + { + OldToNewMapping mapping; + + mapping = (OldToNewMapping) + hash_search(state->rs_old_new_tid_map, + (void *) &(old_tuple->t_data->t_ctid), + HASH_FIND, NULL); + + if (mapping != NULL) + { + /* We've already copied the tuple t_ctid points to. + * Use the new tid of that tuple */ + new_tuple->t_data->t_ctid = mapping->new_tid; + + hash_search(state->rs_old_new_tid_map, + (void *) &(old_tuple->t_data->t_ctid), + HASH_REMOVE, NULL); + } + else + { + /* We haven't seen the tuple t_ctid points to yet. Stash + * it into unresolved_ctids table and deal with it later. + */ + UnresolvedCtid unresolved; + bool found; + + unresolved = hash_search(state->rs_unresolved_ctids, + (void *) &(old_tuple->t_data->t_ctid), + HASH_ENTER, &found); + Assert(!found); + + unresolved->old_tid = old_tuple->t_self; + unresolved->tuple = heap_copytuple(new_tuple); + + MemoryContextSwitchTo(saved_cxt); + return; + } + } + + old_tid = old_tuple->t_self; + + for(;;) + { + ItemPointerData new_tid; + + /* Insert the tuple */ + raw_heap_insert(state, new_tuple); + new_tid = new_tuple->t_self; + + /* + * When we encounter a tuple that's an updated version + * of a row, check if the previous tuple in the update chain + * is still visible to someone. The previous tuple's xmax + * is equal to this tuples xmin, so we can do the same check + * HeapTupleSatisfiesVacuum would do for the previous tuple, + * using xmin of this tuple. We know that xmin of this tuple + * committed, otherwise this tuple would be dead and the caller + * wouldn't have passed it to us in the first place. + */ + if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) && + !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data), + state->rs_oldest_xmin)) + { + /* The previous tuple in the update chain is RECENTLY_DEAD. + * Let's see if we've seen it already. */ + UnresolvedCtid unresolved; + + unresolved = hash_search(state->rs_unresolved_ctids, + (void *) &old_tid, + HASH_FIND, NULL); + + if (unresolved != NULL) + { + /* We have seen and memorized the previous tuple already. + * Now that we know where we inserted the tuple its t_ctid + * points to, fix its t_ctid and insert it to the new heap. + */ + HeapTuple prev_tup = unresolved->tuple; + ItemPointerData prev_old_tid = unresolved->old_tid; + + prev_tup->t_data->t_ctid = new_tid; + + hash_search(state->rs_unresolved_ctids, + (void *) &old_tid, + HASH_REMOVE, NULL); + + /* loop back to insert the previous tuple in the chain */ + old_tid = prev_old_tid; + new_tuple = prev_tup; + continue; + } + else + { + /* Remember the new tid of this tuple. We'll use it to set + * the ctid when we find the previous tuple in the chain + */ + OldToNewMapping mapping; + bool found; + + mapping = hash_search(state->rs_old_new_tid_map, + (void *) &old_tid, + HASH_ENTER, &found); + Assert(!found); + + mapping->new_tid = new_tid; + } + } + break; + } + + MemoryContextSwitchTo(saved_cxt); + } + + /* + * Insert a tuple to the new relation. + * + * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the + * tuple is invalid on entry, it's replaced with the new TID as well. 
+ */ + static void + raw_heap_insert(RewriteState state, HeapTuple new_tup) + { + BlockNumber blkno = InvalidBlockNumber; + Page page = NULL; + Size pageFreeSpace, saveFreeSpace; + Size len; + OffsetNumber newoff; + HeapTuple tup; + + /* + * If the new tuple is too big for storage or contains already toasted + * out-of-line attributes from some other relation, invoke the toaster. + * + * Note: below this point, tup is the data we actually intend to store + * into the relation; new_tup is the caller's original untoasted data. + */ + if (HeapTupleHasExternal(new_tup) || new_tup->t_len > TOAST_TUPLE_THRESHOLD) + tup = toast_insert_or_update(state->rs_new_rel, new_tup, NULL, + state->rs_use_wal, false); + else + tup = new_tup; + + len = MAXALIGN(tup->t_len); /* be conservative */ + + /* + * If we're gonna fail for oversize tuple, do it right away + */ + if (len > MaxHeapTupleSize) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("row is too big: size %lu, maximum size %lu", + (unsigned long) len, + (unsigned long) MaxHeapTupleSize))); + + /* Compute desired extra freespace due to fillfactor option */ + saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel, + HEAP_DEFAULT_FILLFACTOR); + + /* Now we can check to see if there's enough free space here. */ + if (BufferIsValid(state->rs_buf)) + { + blkno = BufferGetBlockNumber(state->rs_buf); + page = (Page) BufferGetPage(state->rs_buf); + pageFreeSpace = PageGetFreeSpace(page); + + if (len + saveFreeSpace > pageFreeSpace) + { + if(state->rs_use_wal) + log_newpage(&state->rs_new_rel->rd_node, blkno, page); + + MarkBufferDirty(state->rs_buf); + + UnlockReleaseBuffer(state->rs_buf); + state->rs_buf = InvalidBuffer; + } + } + + if (!BufferIsValid(state->rs_buf)) + { + /* There's no current buffer to insert to. Extend relation to get + * a new page. + */ + state->rs_buf = ReadBuffer(state->rs_new_rel, P_NEW); + blkno = BufferGetBlockNumber(state->rs_buf); + page = (Page) BufferGetPage(state->rs_buf); + + LockBuffer(state->rs_buf, BUFFER_LOCK_EXCLUSIVE); + + /* + * We need to initialize the empty new page. Double-check that it + * really is empty (this should never happen, but if it does we + * don't want to risk wiping out valid data). + */ + + if (!PageIsNew((PageHeader) page)) + elog(ERROR, "page %u of relation \"%s\" should be empty but is not", + BufferGetBlockNumber(state->rs_buf), + RelationGetRelationName(state->rs_new_rel)); + + PageInit(page, BufferGetPageSize(state->rs_buf), 0); + } + + /* We now have a page pinned and locked to insert the new tuple to */ + + newoff = PageAddItem(page, (Item) tup->t_data, len, + InvalidOffsetNumber, LP_USED); + if (newoff == InvalidOffsetNumber) + elog(ERROR, "failed to add tuple"); + + /* Update t_self to the actual position where it was stored */ + ItemPointerSet(&(new_tup->t_self), blkno, newoff); + + /* Insert the correct position into CTID of the stored tuple, too, + * if the caller didn't supply a valid CTID. + */ + if(!ItemPointerIsValid(&new_tup->t_data->t_ctid)) + { + ItemId newitemid; + HeapTupleHeader onpage_tup; + + newitemid = PageGetItemId(page, newoff); + onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid); + + onpage_tup->t_ctid = new_tup->t_self; + } + + /* If tup is a private copy, release it. 
*/ + if (tup != new_tup) + heap_freetuple(tup); + } Index: src/backend/commands/tablecmds.c =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/tablecmds.c,v retrieving revision 1.218 diff -c -r1.218 tablecmds.c *** src/backend/commands/tablecmds.c 19 Mar 2007 23:38:29 -0000 1.218 --- src/backend/commands/tablecmds.c 20 Mar 2007 18:59:08 -0000 *************** *** 5855,5890 **** { smgrread(src, blkno, buf); - /* XLOG stuff */ if (use_wal) ! { ! xl_heap_newpage xlrec; ! XLogRecPtr recptr; ! XLogRecData rdata[2]; ! ! /* NO ELOG(ERROR) from here till newpage op is logged */ ! START_CRIT_SECTION(); ! ! xlrec.node = dst->smgr_rnode; ! xlrec.blkno = blkno; ! ! rdata[0].data = (char *) &xlrec; ! rdata[0].len = SizeOfHeapNewpage; ! rdata[0].buffer = InvalidBuffer; ! rdata[0].next = &(rdata[1]); ! ! rdata[1].data = (char *) page; ! rdata[1].len = BLCKSZ; ! rdata[1].buffer = InvalidBuffer; ! rdata[1].next = NULL; ! ! recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata); ! ! PageSetLSN(page, recptr); ! PageSetTLI(page, ThisTimeLineID); ! ! END_CRIT_SECTION(); ! } /* * Now write the page. We say isTemp = true even if it's not a temp --- 5855,5862 ---- { smgrread(src, blkno, buf); if (use_wal) ! log_newpage(&dst->smgr_rnode, blkno, page); /* * Now write the page. We say isTemp = true even if it's not a temp Index: src/include/access/heapam.h =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v retrieving revision 1.121 diff -c -r1.121 heapam.h *** src/include/access/heapam.h 29 Mar 2007 00:15:39 -0000 1.121 --- src/include/access/heapam.h 29 Mar 2007 08:27:34 -0000 *************** *** 194,199 **** --- 194,200 ---- extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid, OffsetNumber *offsets, int offcnt); + extern XLogRecPtr log_newpage(RelFileNode *rnode, BlockNumber blk, Page page); /* in common/heaptuple.c */ extern Size heap_compute_data_size(TupleDesc tupleDesc, Index: src/include/commands/rewriteheap.h =================================================================== RCS file: src/include/commands/rewriteheap.h diff -N src/include/commands/rewriteheap.h *** /dev/null 1 Jan 1970 00:00:00 -0000 --- src/include/commands/rewriteheap.h 20 Mar 2007 17:52:15 -0000 *************** *** 0 **** --- 1,29 ---- + /*------------------------------------------------------------------------- + * + * rewriteheap.h + * header file for heap rewrite support functions + * + * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group + * Portions Copyright (c) 1994-5, Regents of the University of California + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + #ifndef REWRITE_HEAP_H + #define REWRITE_HEAP_H + + #include "access/htup.h" + #include "utils/rel.h" + + /* definition is private to rewriteheap.c */ + typedef struct RewriteStateData *RewriteState; + + extern RewriteState begin_heap_rewrite(Relation NewHeap, + TransactionId OldestXmin, bool use_wal); + extern void end_heap_rewrite(RewriteState state); + extern void rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple); + extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple, + HeapTuple newTuple); + + #endif /* REWRITE_H */ Index: src/test/regress/expected/cluster.out =================================================================== RCS file: 
/home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/expected/cluster.out,v retrieving revision 1.17 diff -c -r1.17 cluster.out *** src/test/regress/expected/cluster.out 7 Jul 2005 20:40:01 -0000 1.17 --- src/test/regress/expected/cluster.out 20 Mar 2007 16:50:04 -0000 *************** *** 382,389 **** --- 382,428 ---- 2 (2 rows) + -- Test MVCC-safety of cluster. There isn't much we can do to verify the + -- results with a single backend... + CREATE TABLE clustertest (key int PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "clustertest_pkey" for table "clustertest" + INSERT INTO clustertest VALUES (10); + INSERT INTO clustertest VALUES (20); + INSERT INTO clustertest VALUES (30); + INSERT INTO clustertest VALUES (40); + INSERT INTO clustertest VALUES (50); + -- Test update where the old row version is found first in the scan + UPDATE clustertest SET key = 100 WHERE key = 10; + -- Test update where the new row version is found first in the scan + UPDATE clustertest SET key = 35 WHERE key = 40; + -- Test longer update chain + UPDATE clustertest SET key = 60 WHERE key = 50; + UPDATE clustertest SET key = 70 WHERE key = 60; + UPDATE clustertest SET key = 80 WHERE key = 90; + SELECT * FROM clustertest; + key + ----- + 20 + 30 + 100 + 35 + 70 + (5 rows) + + CLUSTER clustertest_pkey ON clustertest; + SELECT * FROM clustertest; + key + ----- + 20 + 30 + 35 + 70 + 100 + (5 rows) + -- clean up \c - + DROP TABLE clustertest; DROP TABLE clstr_1; DROP TABLE clstr_2; DROP TABLE clstr_3; Index: src/test/regress/sql/cluster.sql =================================================================== RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/sql/cluster.sql,v retrieving revision 1.9 diff -c -r1.9 cluster.sql *** src/test/regress/sql/cluster.sql 7 Jul 2005 20:40:02 -0000 1.9 --- src/test/regress/sql/cluster.sql 20 Mar 2007 17:52:48 -0000 *************** *** 153,160 **** --- 153,187 ---- CLUSTER clstr_1; SELECT * FROM clstr_1; + -- Test MVCC-safety of cluster. There isn't much we can do to verify the + -- results with a single backend... + + CREATE TABLE clustertest (key int PRIMARY KEY); + + INSERT INTO clustertest VALUES (10); + INSERT INTO clustertest VALUES (20); + INSERT INTO clustertest VALUES (30); + INSERT INTO clustertest VALUES (40); + INSERT INTO clustertest VALUES (50); + + -- Test update where the old row version is found first in the scan + UPDATE clustertest SET key = 100 WHERE key = 10; + + -- Test update where the new row version is found first in the scan + UPDATE clustertest SET key = 35 WHERE key = 40; + + -- Test longer update chain + UPDATE clustertest SET key = 60 WHERE key = 50; + UPDATE clustertest SET key = 70 WHERE key = 60; + UPDATE clustertest SET key = 80 WHERE key = 90; + + SELECT * FROM clustertest; + CLUSTER clustertest_pkey ON clustertest; + SELECT * FROM clustertest; + -- clean up \c - + DROP TABLE clustertest; DROP TABLE clstr_1; DROP TABLE clstr_2; DROP TABLE clstr_3;
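
As the comment in the regression test says, a single backend can't actually exercise the MVCC-safety. For reviewers who want to see the behaviour the patch changes, here is a minimal two-session sketch of the pg_dump scenario from the mail above; the table mvcctest and index mvcctest_pkey are made-up names, and the table is assumed to have been created and populated before session 1 starts:

-- Session 1: open a snapshot before the concurrent CLUSTER, the way
-- pg_dump does. The first query fixes the snapshot without touching
-- (and therefore without locking) the table that will be clustered.
BEGIN ISOLATION LEVEL SERIALIZABLE;
SELECT 1;

-- Session 2: rewrite the table while session 1's snapshot is still open.
-- CLUSTER's AccessExclusiveLock doesn't conflict here, because session 1
-- hasn't accessed mvcctest yet.
CLUSTER mvcctest_pkey ON mvcctest;

-- Session 1: without the patch, the tuples copied by CLUSTER carry the
-- clustering transaction's xmin and are invisible to the older snapshot,
-- so this returns no rows; with the patch, the original contents are seen.
SELECT * FROM mvcctest;
COMMIT;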
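
Since the INTERFACE comment anticipates callers other than CLUSTER, here is an illustrative C sketch of how a hypothetical future user, say a table-rewriting variant of VACUUM FULL, might drive the facility with a plain sequential scan instead of the index scan CLUSTER uses. The function name copy_heap_rewrite is made up; everything else follows the pattern the patch uses in cluster.c. This is a sketch under those assumptions, not part of the patch.

/*
 * Hypothetical caller of the rewriteheap facility. The caller is assumed
 * to hold AccessExclusiveLock on both heaps and to take care of catalog
 * changes and index rebuilds itself, as the INTERFACE comment requires.
 */
#include "postgres.h"

#include "access/heapam.h"
#include "commands/rewriteheap.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
#include "utils/tqual.h"

static void
copy_heap_rewrite(Relation old_heap, Relation new_heap, bool use_wal)
{
	TransactionId oldest_xmin;
	RewriteState rwstate;
	HeapScanDesc scan;
	HeapTuple	tuple;

	/* Same dead-tuple cutoff that the rewrite state will use */
	oldest_xmin = GetOldestXmin(old_heap->rd_rel->relisshared, false);

	rwstate = begin_heap_rewrite(new_heap, oldest_xmin, use_wal);

	/* SnapshotAny: we need to see dead and recently-dead tuples too */
	scan = heap_beginscan(old_heap, SnapshotAny, 0, (ScanKey) NULL);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		HeapTuple	copied;
		bool		isdead;

		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
		isdead = HeapTupleSatisfiesVacuum(tuple->t_data, oldest_xmin,
										  scan->rs_cbuf) == HEAPTUPLE_DEAD;
		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);

		if (isdead)
		{
			/* Not copied, but noted so update-chain state can be released */
			rewrite_heap_dead_tuple(rwstate, tuple);
			continue;
		}

		/* No transformation here; a real caller could rewrite the tuple */
		copied = heap_copytuple(tuple);
		rewrite_heap_tuple(rwstate, tuple, copied);
		heap_freetuple(copied);
	}
	heap_endscan(scan);

	/* Writes out (and WAL-logs, if use_wal) the last buffered page */
	end_heap_rewrite(rwstate);
}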