Refactoring xlogutils.c
От | Heikki Linnakangas |
---|---|
Тема | Refactoring xlogutils.c |
Дата | |
Msg-id | 484E9489.3020107@enterprisedb.com обсуждение исходный текст |
Ответы |
Re: Refactoring xlogutils.c
Re: Refactoring xlogutils.c |
Список | pgsql-patches |
Attached is an updated version of my patch to refactor the XLogOpenRelation/XLogReadBuffer interface, in preparation for the relation forks patch, and subsequently the FSM rewrite patch. I'm satisfied enough with it that I plan to commit it in a few days, barring objections. Summary of changes: - XLogOpenRelation is gone. XLogReadBuffer now takes a RelFileNode as argument, instead of Relation. Fix all callers. - For the routines that need a fake, or lightweight, Relation struct anyway, there's a new function called XLogFakeRelcacheEntry(RelFileNode), that returns a palloc'd Relation struct. - Add ReadBufferWithoutRelcache variant of ReadBuffer, that takes RelFileNode instead of Relation as argument. This is what XLogReadBuffer uses internally. The only user-visible changes are these error message changes: 1. "invalid page header in block %u of relation %s" and "unexpected data beyond EOF in block %u of relation \"%s\" messages, emitted from ReadBuffer, no longer print the relation name, but only its relfilenode. (This is because relation name is no longer conveniently available where that message is emitted. It could be passed there if necessary, but it doesn't seem worth the extra complexity) 2. elog("failed to additem to index page in \"%s\""), in gistutil.c. For similar reasons, no longer prints the relation name. Unfortunately, gistfillbuffer() doesn't know the relfilenode either. Again we could pass it, but it doesn't seem worth the extra complexity, given that this is just a "should never happen" elog, and the user should usually know which index is at fault from the context information. -- Heikki Linnakangas EnterpriseDB http://www.enterprisedb.com *** a/src/backend/access/gin/ginxlog.c --- b/src/backend/access/gin/ginxlog.c *************** *** 71,82 **** static void ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { RelFileNode *node = (RelFileNode *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; ! reln = XLogOpenRelation(*node); ! buffer = XLogReadBuffer(reln, GIN_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 71,80 ---- ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { RelFileNode *node = (RelFileNode *) XLogRecGetData(record); Buffer buffer; Page page; ! buffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 94,105 **** ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) { ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record); ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree)); - Relation reln; Buffer buffer; Page page; ! reln = XLogOpenRelation(data->node); ! buffer = XLogReadBuffer(reln, data->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 92,101 ---- { ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record); ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree)); Buffer buffer; Page page; ! buffer = XLogReadBuffer(data->node, data->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 118,124 **** static void ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) { ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; --- 114,119 ---- *************** *** 126,133 **** ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(data->node); ! buffer = XLogReadBuffer(reln, data->blkno, false); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 121,127 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(data->node, data->blkno, false); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 228,253 **** static void ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) { ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record); - Relation reln; Buffer lbuffer, rbuffer; Page lpage, rpage; uint32 flags = 0; - reln = XLogOpenRelation(data->node); - if (data->isLeaf) flags |= GIN_LEAF; if (data->isData) flags |= GIN_DATA; ! lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit); Assert(BufferIsValid(lbuffer)); lpage = (Page) BufferGetPage(lbuffer); GinInitBuffer(lbuffer, flags); ! rbuffer = XLogReadBuffer(reln, data->rblkno, true); Assert(BufferIsValid(rbuffer)); rpage = (Page) BufferGetPage(rbuffer); GinInitBuffer(rbuffer, flags); --- 222,244 ---- ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) { ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record); Buffer lbuffer, rbuffer; Page lpage, rpage; uint32 flags = 0; if (data->isLeaf) flags |= GIN_LEAF; if (data->isData) flags |= GIN_DATA; ! lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit); Assert(BufferIsValid(lbuffer)); lpage = (Page) BufferGetPage(lbuffer); GinInitBuffer(lbuffer, flags); ! rbuffer = XLogReadBuffer(data->node, data->rblkno, true); Assert(BufferIsValid(rbuffer)); rpage = (Page) BufferGetPage(rbuffer); GinInitBuffer(rbuffer, flags); *************** *** 319,325 **** ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (data->isRootSplit) { ! Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, false); Page rootPage = BufferGetPage(rootBuf); GinInitBuffer(rootBuf, flags & ~GIN_LEAF); --- 310,316 ---- if (data->isRootSplit) { ! Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false); Page rootPage = BufferGetPage(rootBuf); GinInitBuffer(rootBuf, flags & ~GIN_LEAF); *************** *** 352,358 **** static void ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) { ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; --- 343,348 ---- *************** *** 360,367 **** ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(data->node); ! buffer = XLogReadBuffer(reln, data->blkno, false); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 350,356 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(data->node, data->blkno, false); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 403,417 **** static void ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) { ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; - reln = XLogOpenRelation(data->node); - if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! buffer = XLogReadBuffer(reln, data->blkno, false); page = BufferGetPage(buffer); Assert(GinPageIsData(page)); GinPageGetOpaque(page)->flags = GIN_DELETED; --- 392,403 ---- ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) { ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record); Buffer buffer; Page page; if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! buffer = XLogReadBuffer(data->node, data->blkno, false); page = BufferGetPage(buffer); Assert(GinPageIsData(page)); GinPageGetOpaque(page)->flags = GIN_DELETED; *************** *** 423,429 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) if (!(record->xl_info & XLR_BKP_BLOCK_2)) { ! buffer = XLogReadBuffer(reln, data->parentBlkno, false); page = BufferGetPage(buffer); Assert(GinPageIsData(page)); Assert(!GinPageIsLeaf(page)); --- 409,415 ---- if (!(record->xl_info & XLR_BKP_BLOCK_2)) { ! buffer = XLogReadBuffer(data->node, data->parentBlkno, false); page = BufferGetPage(buffer); Assert(GinPageIsData(page)); Assert(!GinPageIsLeaf(page)); *************** *** 436,442 **** ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) { ! buffer = XLogReadBuffer(reln, data->leftBlkno, false); page = BufferGetPage(buffer); Assert(GinPageIsData(page)); GinPageGetOpaque(page)->rightlink = data->rightLink; --- 422,428 ---- if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) { ! buffer = XLogReadBuffer(data->node, data->leftBlkno, false); page = BufferGetPage(buffer); Assert(GinPageIsData(page)); GinPageGetOpaque(page)->rightlink = data->rightLink; *************** *** 557,565 **** ginContinueSplit(ginIncompleteSplit *split) * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno, * split->leftBlkno, split->rightBlkno); */ ! reln = XLogOpenRelation(split->node); ! buffer = XLogReadBuffer(reln, split->leftBlkno, false); if (split->rootBlkno == GIN_ROOT_BLKNO) { --- 543,551 ---- * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno, * split->leftBlkno, split->rightBlkno); */ ! buffer = XLogReadBuffer(split->node, split->leftBlkno, false); ! reln = XLogFakeRelcacheEntry(split->node); if (split->rootBlkno == GIN_ROOT_BLKNO) { *************** *** 581,586 **** ginContinueSplit(ginIncompleteSplit *split) --- 567,574 ---- GinPageGetOpaque(page)->maxoff))->key; } + pfree(reln); + btree.rightblkno = split->rightBlkno; stack.blkno = split->leftBlkno; *** a/src/backend/access/gist/gist.c --- b/src/backend/access/gist/gist.c *************** *** 462,468 **** gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) if (!is_leaf) PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); ! gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); MarkBufferDirty(state->stack->buffer); --- 462,468 ---- if (!is_leaf) PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); ! gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); MarkBufferDirty(state->stack->buffer); *************** *** 1008,1014 **** gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke START_CRIT_SECTION(); GISTInitBuffer(buffer, 0); ! gistfillbuffer(r, page, itup, len, FirstOffsetNumber); MarkBufferDirty(buffer); --- 1008,1014 ---- START_CRIT_SECTION(); GISTInitBuffer(buffer, 0); ! gistfillbuffer(page, itup, len, FirstOffsetNumber); MarkBufferDirty(buffer); *** a/src/backend/access/gist/gistutil.c --- b/src/backend/access/gist/gistutil.c *************** *** 27,37 **** static Datum attrS[INDEX_MAX_KEYS]; static bool isnullS[INDEX_MAX_KEYS]; /* ! * Write itup vector to page, has no control of free space */ ! OffsetNumber ! gistfillbuffer(Relation r, Page page, IndexTuple *itup, ! int len, OffsetNumber off) { OffsetNumber l = InvalidOffsetNumber; int i; --- 27,36 ---- static bool isnullS[INDEX_MAX_KEYS]; /* ! * Write itup vector to page, has no control of free space. */ ! void ! gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off) { OffsetNumber l = InvalidOffsetNumber; int i; *************** *** 42,55 **** gistfillbuffer(Relation r, Page page, IndexTuple *itup, for (i = 0; i < len; i++) { ! l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), ! off, false, false); if (l == InvalidOffsetNumber) ! elog(ERROR, "failed to add item to index page in \"%s\"", ! RelationGetRelationName(r)); off++; } - return l; } /* --- 41,53 ---- for (i = 0; i < len; i++) { ! Size sz = IndexTupleSize(itup[i]); ! l = PageAddItem(page, (Item) itup[i], sz, off, false, false); if (l == InvalidOffsetNumber) ! elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes", ! i, len, sz); off++; } } /* *** a/src/backend/access/gist/gistvacuum.c --- b/src/backend/access/gist/gistvacuum.c *************** *** 403,409 **** gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion) } else /* enough free space */ ! gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber); } } --- 403,409 ---- } else /* enough free space */ ! gistfillbuffer(tempPage, addon, curlenaddon, InvalidOffsetNumber); } } *** a/src/backend/access/gist/gistxlog.c --- b/src/backend/access/gist/gistxlog.c *************** *** 189,195 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); PageUpdateRecord xlrec; - Relation reln; Buffer buffer; Page page; --- 189,194 ---- *************** *** 208,215 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) decodePageUpdateRecord(&xlrec, record); ! reln = XLogOpenRelation(xlrec.data->node); ! buffer = XLogReadBuffer(reln, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); --- 207,213 ---- decodePageUpdateRecord(&xlrec, record); ! buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); *************** *** 234,240 **** gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) /* add tuples */ if (xlrec.len > 0) ! gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber); /* * special case: leafpage, nothing to insert, nothing to delete, then --- 232,238 ---- /* add tuples */ if (xlrec.len > 0) ! gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); /* * special case: leafpage, nothing to insert, nothing to delete, then *************** *** 262,268 **** static void gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) { gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; --- 260,265 ---- *************** *** 270,277 **** gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(xldata->node); ! buffer = XLogReadBuffer(reln, xldata->blkno, false); if (!BufferIsValid(buffer)) return; --- 267,273 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; *************** *** 319,332 **** static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { PageSplitRecord xlrec; - Relation reln; Buffer buffer; Page page; int i; int flags; decodePageSplitRecord(&xlrec, record); - reln = XLogOpenRelation(xlrec.data->node); flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ --- 315,326 ---- *************** *** 334,340 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { NewPage *newpage = xlrec.page + i; ! buffer = XLogReadBuffer(reln, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 328,334 ---- { NewPage *newpage = xlrec.page + i; ! buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 342,348 **** gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) GISTInitBuffer(buffer, flags); /* and fill it */ ! gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); --- 336,342 ---- GISTInitBuffer(buffer, flags); /* and fill it */ ! gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); *************** *** 361,372 **** static void gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { RelFileNode *node = (RelFileNode *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; ! reln = XLogOpenRelation(*node); ! buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 355,364 ---- gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { RelFileNode *node = (RelFileNode *) XLogRecGetData(record); Buffer buffer; Page page; ! buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 602,608 **** gistContinueInsert(gistIncompleteInsert *insert) lenitup; Relation index; ! index = XLogOpenRelation(insert->node); /* * needed vector itup never will be more than initial lenblkno+2, because --- 594,600 ---- lenitup; Relation index; ! index = XLogFakeRelcacheEntry(insert->node); /* * needed vector itup never will be more than initial lenblkno+2, because *************** *** 624,630 **** gistContinueInsert(gistIncompleteInsert *insert) * it was split root, so we should only make new root. it can't be * simple insert into root, we should replace all content of root. */ ! Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true); gistnewroot(index, buffer, itup, lenitup, NULL); UnlockReleaseBuffer(buffer); --- 616,622 ---- * it was split root, so we should only make new root. it can't be * simple insert into root, we should replace all content of root. */ ! Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true); gistnewroot(index, buffer, itup, lenitup, NULL); UnlockReleaseBuffer(buffer); *************** *** 703,709 **** gistContinueInsert(gistIncompleteInsert *insert) LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); ! gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber); numbuffer++; if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) --- 695,701 ---- LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); ! gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber); numbuffer++; if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) *************** *** 749,755 **** gistContinueInsert(gistIncompleteInsert *insert) for (j = 0; j < ntodelete; j++) PageIndexTupleDelete(pages[0], todelete[j]); ! gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber); rdata = formUpdateRdata(index->rd_node, buffers[0], todelete, ntodelete, --- 741,747 ---- for (j = 0; j < ntodelete; j++) PageIndexTupleDelete(pages[0], todelete[j]); ! gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber); rdata = formUpdateRdata(index->rd_node, buffers[0], todelete, ntodelete, *************** *** 794,799 **** gistContinueInsert(gistIncompleteInsert *insert) --- 786,793 ---- } } + pfree(index); + ereport(LOG, (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", insert->node.spcNode, insert->node.dbNode, insert->node.relNode), *** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** *** 3944,3950 **** static void heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move) { xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; OffsetNumber *end; --- 3944,3949 ---- *************** *** 3958,3965 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(xlrec->node); ! buffer = XLogReadBuffer(reln, xlrec->block, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); --- 3957,3963 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xlrec->node, xlrec->block, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); *************** *** 3980,3986 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move) Assert(nunused >= 0); /* Update all item pointers per the record, and repair fragmentation */ ! heap_page_prune_execute(reln, buffer, redirected, nredirected, nowdead, ndead, nowunused, nunused, --- 3978,3984 ---- Assert(nunused >= 0); /* Update all item pointers per the record, and repair fragmentation */ ! heap_page_prune_execute(buffer, redirected, nredirected, nowdead, ndead, nowunused, nunused, *************** *** 4002,4016 **** heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record) { xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; - Relation reln; Buffer buffer; Page page; if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(xlrec->node); ! buffer = XLogReadBuffer(reln, xlrec->block, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); --- 4000,4012 ---- { xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; Buffer buffer; Page page; if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xlrec->node, xlrec->block, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); *************** *** 4050,4056 **** static void heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record) { xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; --- 4046,4051 ---- *************** *** 4058,4065 **** heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record) * Note: the NEWPAGE log record is used for both heaps and indexes, so do * not do anything that assumes we are touching a heap. */ ! reln = XLogOpenRelation(xlrec->node); ! buffer = XLogReadBuffer(reln, xlrec->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 4053,4059 ---- * Note: the NEWPAGE log record is used for both heaps and indexes, so do * not do anything that assumes we are touching a heap. */ ! buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 4076,4082 **** static void heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) { xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; OffsetNumber offnum; --- 4070,4075 ---- *************** *** 4086,4093 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(xlrec->target.node); ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) --- 4079,4085 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) *************** *** 4133,4139 **** static void heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) { xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; OffsetNumber offnum; --- 4125,4130 ---- *************** *** 4149,4159 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; - reln = XLogOpenRelation(xlrec->target.node); - if (record->xl_info & XLOG_HEAP_INIT_PAGE) { ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), true); Assert(BufferIsValid(buffer)); --- 4140,4148 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; if (record->xl_info & XLOG_HEAP_INIT_PAGE) { ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), true); Assert(BufferIsValid(buffer)); *************** *** 4163,4169 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) } else { ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) --- 4152,4158 ---- } else { ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) *************** *** 4216,4222 **** static void heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update) { xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); - Relation reln = XLogOpenRelation(xlrec->target.node); Buffer buffer; bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) == ItemPointerGetBlockNumber(&(xlrec->target.tid))); --- 4205,4210 ---- *************** *** 4242,4248 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update) /* Deal with old tuple version */ ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) --- 4230,4236 ---- /* Deal with old tuple version */ ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) *************** *** 4317,4323 **** newt:; if (record->xl_info & XLOG_HEAP_INIT_PAGE) { ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->newtid)), true); Assert(BufferIsValid(buffer)); --- 4305,4311 ---- if (record->xl_info & XLOG_HEAP_INIT_PAGE) { ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->newtid)), true); Assert(BufferIsValid(buffer)); *************** *** 4327,4333 **** newt:; } else { ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->newtid)), false); if (!BufferIsValid(buffer)) --- 4315,4321 ---- } else { ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->newtid)), false); if (!BufferIsValid(buffer)) *************** *** 4399,4405 **** static void heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) { xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; OffsetNumber offnum; --- 4387,4392 ---- *************** *** 4409,4416 **** heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! reln = XLogOpenRelation(xlrec->target.node); ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) --- 4396,4402 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) *************** *** 4458,4464 **** static void heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) { xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record); - Relation reln = XLogOpenRelation(xlrec->target.node); Buffer buffer; Page page; OffsetNumber offnum; --- 4444,4449 ---- *************** *** 4470,4476 **** heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) --- 4455,4461 ---- if (record->xl_info & XLR_BKP_BLOCK_1) return; ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (!BufferIsValid(buffer)) *** a/src/backend/access/heap/pruneheap.c --- b/src/backend/access/heap/pruneheap.c *************** *** 225,231 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin, * and update the page's hint bit about whether it has free line * pointers. */ ! heap_page_prune_execute(relation, buffer, prstate.redirected, prstate.nredirected, prstate.nowdead, prstate.ndead, prstate.nowunused, prstate.nunused, --- 225,231 ---- * and update the page's hint bit about whether it has free line * pointers. */ ! heap_page_prune_execute(buffer, prstate.redirected, prstate.nredirected, prstate.nowdead, prstate.ndead, prstate.nowunused, prstate.nunused, *************** *** 696,702 **** heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum) * arguments are identical to those of log_heap_clean(). */ void ! heap_page_prune_execute(Relation reln, Buffer buffer, OffsetNumber *redirected, int nredirected, OffsetNumber *nowdead, int ndead, OffsetNumber *nowunused, int nunused, --- 696,702 ---- * arguments are identical to those of log_heap_clean(). */ void ! heap_page_prune_execute(Buffer buffer, OffsetNumber *redirected, int nredirected, OffsetNumber *nowdead, int ndead, OffsetNumber *nowunused, int nunused, *** a/src/backend/access/nbtree/nbtxlog.c --- b/src/backend/access/nbtree/nbtxlog.c *************** *** 150,156 **** _bt_restore_page(Page page, char *from, int len) } static void ! _bt_restore_meta(Relation reln, XLogRecPtr lsn, BlockNumber root, uint32 level, BlockNumber fastroot, uint32 fastlevel) { --- 150,156 ---- } static void ! _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn, BlockNumber root, uint32 level, BlockNumber fastroot, uint32 fastlevel) { *************** *** 159,165 **** _bt_restore_meta(Relation reln, XLogRecPtr lsn, BTMetaPageData *md; BTPageOpaque pageop; ! metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true); Assert(BufferIsValid(metabuf)); metapg = BufferGetPage(metabuf); --- 159,165 ---- BTMetaPageData *md; BTPageOpaque pageop; ! metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true); Assert(BufferIsValid(metabuf)); metapg = BufferGetPage(metabuf); *************** *** 194,200 **** btree_xlog_insert(bool isleaf, bool ismeta, XLogRecPtr lsn, XLogRecord *record) { xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; char *datapos; --- 194,199 ---- *************** *** 220,230 **** btree_xlog_insert(bool isleaf, bool ismeta, if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf) return; /* nothing to do */ - reln = XLogOpenRelation(xlrec->target.node); - if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! buffer = XLogReadBuffer(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (BufferIsValid(buffer)) --- 219,227 ---- if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf) return; /* nothing to do */ if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), false); if (BufferIsValid(buffer)) *************** *** 251,257 **** btree_xlog_insert(bool isleaf, bool ismeta, } if (ismeta) ! _bt_restore_meta(reln, lsn, md.root, md.level, md.fastroot, md.fastlevel); --- 248,254 ---- } if (ismeta) ! _bt_restore_meta(xlrec->target.node, lsn, md.root, md.level, md.fastroot, md.fastlevel); *************** *** 265,271 **** btree_xlog_split(bool onleft, bool isroot, XLogRecPtr lsn, XLogRecord *record) { xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); - Relation reln; Buffer rbuf; Page rpage; BTPageOpaque ropaque; --- 262,267 ---- *************** *** 277,284 **** btree_xlog_split(bool onleft, bool isroot, Item left_hikey = NULL; Size left_hikeysz = 0; - reln = XLogOpenRelation(xlrec->node); - datapos = (char *) xlrec + SizeOfBtreeSplit; datalen = record->xl_len - SizeOfBtreeSplit; --- 273,278 ---- *************** *** 328,334 **** btree_xlog_split(bool onleft, bool isroot, } /* Reconstruct right (new) sibling from scratch */ ! rbuf = XLogReadBuffer(reln, xlrec->rightsib, true); Assert(BufferIsValid(rbuf)); rpage = (Page) BufferGetPage(rbuf); --- 322,328 ---- } /* Reconstruct right (new) sibling from scratch */ ! rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true); Assert(BufferIsValid(rbuf)); rpage = (Page) BufferGetPage(rbuf); *************** *** 369,375 **** btree_xlog_split(bool onleft, bool isroot, */ if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! Buffer lbuf = XLogReadBuffer(reln, xlrec->leftsib, false); if (BufferIsValid(lbuf)) { --- 363,369 ---- */ if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! Buffer lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false); if (BufferIsValid(lbuf)) { *************** *** 439,445 **** btree_xlog_split(bool onleft, bool isroot, /* Fix left-link of the page to the right of the new right sibling */ if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2)) { ! Buffer buffer = XLogReadBuffer(reln, xlrec->rnext, false); if (BufferIsValid(buffer)) { --- 433,439 ---- /* Fix left-link of the page to the right of the new right sibling */ if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2)) { ! Buffer buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false); if (BufferIsValid(buffer)) { *************** *** 468,474 **** static void btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) { xl_btree_delete *xlrec; - Relation reln; Buffer buffer; Page page; BTPageOpaque opaque; --- 462,467 ---- *************** *** 477,484 **** btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record) return; xlrec = (xl_btree_delete *) XLogRecGetData(record); ! reln = XLogOpenRelation(xlrec->node); ! buffer = XLogReadBuffer(reln, xlrec->block, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); --- 470,476 ---- return; xlrec = (xl_btree_delete *) XLogRecGetData(record); ! buffer = XLogReadBuffer(xlrec->node, xlrec->block, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); *************** *** 517,523 **** static void btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) { xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record); - Relation reln; BlockNumber parent; BlockNumber target; BlockNumber leftsib; --- 509,514 ---- *************** *** 526,532 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) Page page; BTPageOpaque pageop; - reln = XLogOpenRelation(xlrec->target.node); parent = ItemPointerGetBlockNumber(&(xlrec->target.tid)); target = xlrec->deadblk; leftsib = xlrec->leftblk; --- 517,522 ---- *************** *** 535,541 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) /* parent page */ if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! buffer = XLogReadBuffer(reln, parent, false); if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); --- 525,531 ---- /* parent page */ if (!(record->xl_info & XLR_BKP_BLOCK_1)) { ! buffer = XLogReadBuffer(xlrec->target.node, parent, false); if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); *************** *** 581,587 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) /* Fix left-link of right sibling */ if (!(record->xl_info & XLR_BKP_BLOCK_2)) { ! buffer = XLogReadBuffer(reln, rightsib, false); if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); --- 571,577 ---- /* Fix left-link of right sibling */ if (!(record->xl_info & XLR_BKP_BLOCK_2)) { ! buffer = XLogReadBuffer(xlrec->target.node, rightsib, false); if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); *************** *** 607,613 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) { if (leftsib != P_NONE) { ! buffer = XLogReadBuffer(reln, leftsib, false); if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); --- 597,603 ---- { if (leftsib != P_NONE) { ! buffer = XLogReadBuffer(xlrec->target.node, leftsib, false); if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); *************** *** 630,636 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) } /* Rewrite target page as empty deleted page */ ! buffer = XLogReadBuffer(reln, target, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 620,626 ---- } /* Rewrite target page as empty deleted page */ ! buffer = XLogReadBuffer(xlrec->target.node, target, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 655,661 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record) memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage, sizeof(xl_btree_metadata)); ! _bt_restore_meta(reln, lsn, md.root, md.level, md.fastroot, md.fastlevel); } --- 645,651 ---- memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage, sizeof(xl_btree_metadata)); ! _bt_restore_meta(xlrec->target.node, lsn, md.root, md.level, md.fastroot, md.fastlevel); } *************** *** 672,685 **** static void btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) { xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record); - Relation reln; Buffer buffer; Page page; BTPageOpaque pageop; BlockNumber downlink = 0; ! reln = XLogOpenRelation(xlrec->node); ! buffer = XLogReadBuffer(reln, xlrec->rootblk, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 662,673 ---- btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) { xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record); Buffer buffer; Page page; BTPageOpaque pageop; BlockNumber downlink = 0; ! buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 711,717 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); ! _bt_restore_meta(reln, lsn, xlrec->rootblk, xlrec->level, xlrec->rootblk, xlrec->level); --- 699,705 ---- MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); ! _bt_restore_meta(xlrec->node, lsn, xlrec->rootblk, xlrec->level, xlrec->rootblk, xlrec->level); *************** *** 904,912 **** btree_xlog_cleanup(void) foreach(l, incomplete_actions) { bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l); - Relation reln; - reln = XLogOpenRelation(action->node); if (action->is_split) { /* finish an incomplete split */ --- 892,898 ---- *************** *** 917,930 **** btree_xlog_cleanup(void) BTPageOpaque lpageop, rpageop; bool is_only; ! lbuf = XLogReadBuffer(reln, action->leftblk, false); /* failure is impossible because we wrote this page earlier */ if (!BufferIsValid(lbuf)) elog(PANIC, "btree_xlog_cleanup: left block unfound"); lpage = (Page) BufferGetPage(lbuf); lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage); ! rbuf = XLogReadBuffer(reln, action->rightblk, false); /* failure is impossible because we wrote this page earlier */ if (!BufferIsValid(rbuf)) elog(PANIC, "btree_xlog_cleanup: right block unfound"); --- 903,917 ---- BTPageOpaque lpageop, rpageop; bool is_only; + Relation reln; ! lbuf = XLogReadBuffer(action->node, action->leftblk, false); /* failure is impossible because we wrote this page earlier */ if (!BufferIsValid(lbuf)) elog(PANIC, "btree_xlog_cleanup: left block unfound"); lpage = (Page) BufferGetPage(lbuf); lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage); ! rbuf = XLogReadBuffer(action->node, action->rightblk, false); /* failure is impossible because we wrote this page earlier */ if (!BufferIsValid(rbuf)) elog(PANIC, "btree_xlog_cleanup: right block unfound"); *************** *** 934,951 **** btree_xlog_cleanup(void) /* if the pages are all of their level, it's a only-page split */ is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop); _bt_insert_parent(reln, lbuf, rbuf, NULL, action->is_root, is_only); } else { /* finish an incomplete deletion (of a half-dead page) */ Buffer buf; ! buf = XLogReadBuffer(reln, action->delblk, false); if (BufferIsValid(buf)) if (_bt_pagedel(reln, buf, NULL, true) == 0) elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed"); } } incomplete_actions = NIL; --- 921,946 ---- /* if the pages are all of their level, it's a only-page split */ is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop); + reln = XLogFakeRelcacheEntry(action->node); _bt_insert_parent(reln, lbuf, rbuf, NULL, action->is_root, is_only); + pfree(reln); } else { /* finish an incomplete deletion (of a half-dead page) */ Buffer buf; ! buf = XLogReadBuffer(action->node, action->delblk, false); if (BufferIsValid(buf)) + { + Relation reln; + + reln = XLogFakeRelcacheEntry(action->node); if (_bt_pagedel(reln, buf, NULL, true) == 0) elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed"); + pfree(reln); + } } } incomplete_actions = NIL; *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 2840,2846 **** CleanupBackupHistory(void) static void RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) { - Relation reln; Buffer buffer; Page page; BkpBlock bkpb; --- 2840,2845 ---- *************** *** 2856,2863 **** RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) memcpy(&bkpb, blk, sizeof(BkpBlock)); blk += sizeof(BkpBlock); ! reln = XLogOpenRelation(bkpb.node); ! buffer = XLogReadBuffer(reln, bkpb.block, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 2855,2861 ---- memcpy(&bkpb, blk, sizeof(BkpBlock)); blk += sizeof(BkpBlock); ! buffer = XLogReadBuffer(bkpb.node, bkpb.block, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *************** *** 5064,5072 **** StartupXLOG(void) BACKUP_LABEL_FILE, BACKUP_LABEL_OLD))); } ! /* Start up the recovery environment */ ! XLogInitRelationCache(); ! for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { if (RmgrTable[rmid].rm_startup != NULL) --- 5062,5068 ---- BACKUP_LABEL_FILE, BACKUP_LABEL_OLD))); } ! /* Initialize resource managers */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { if (RmgrTable[rmid].rm_startup != NULL) *************** *** 5330,5340 **** StartupXLOG(void) * allows some extra error checking in xlog_redo. */ CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); - - /* - * Close down recovery environment - */ - XLogCloseRelationCache(); } /* --- 5326,5331 ---- *** a/src/backend/access/transam/xlogutils.c --- b/src/backend/access/transam/xlogutils.c *************** *** 190,195 **** XLogCheckInvalidPages(void) --- 190,198 ---- if (foundone) elog(PANIC, "WAL contains references to invalid pages"); + + hash_destroy(invalid_page_tab); + invalid_page_tab = NULL; } *************** *** 218,244 **** XLogCheckInvalidPages(void) * at the end of WAL replay.) */ Buffer ! XLogReadBuffer(Relation reln, BlockNumber blkno, bool init) { ! BlockNumber lastblock = RelationGetNumberOfBlocks(reln); Buffer buffer; Assert(blkno != P_NEW); if (blkno < lastblock) { /* page exists in file */ ! if (init) ! buffer = ReadOrZeroBuffer(reln, blkno); ! else ! buffer = ReadBuffer(reln, blkno); } else { /* hm, page doesn't exist in file */ if (!init) { ! log_invalid_page(reln->rd_node, blkno, false); return InvalidBuffer; } /* OK to extend the file */ --- 221,260 ---- * at the end of WAL replay.) */ Buffer ! XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init) { ! BlockNumber lastblock; Buffer buffer; + SMgrRelation smgr; Assert(blkno != P_NEW); + /* Open the relation at smgr level */ + smgr = smgropen(rnode); + + /* + * Create the target file if it doesn't already exist. This lets us cope + * if the replay sequence contains writes to a relation that is later + * deleted. (The original coding of this routine would instead suppress + * the writes, but that seems like it risks losing valuable data if the + * filesystem loses an inode during a crash. Better to write the data + * until we are actually told to delete the file.) + */ + smgrcreate(smgr, false, true); + + lastblock = smgrnblocks(smgr); + if (blkno < lastblock) { /* page exists in file */ ! buffer = ReadBufferWithoutRelcache(rnode, false, blkno, init); } else { /* hm, page doesn't exist in file */ if (!init) { ! log_invalid_page(rnode, blkno, false); return InvalidBuffer; } /* OK to extend the file */ *************** *** 249,255 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init) { if (buffer != InvalidBuffer) ReleaseBuffer(buffer); ! buffer = ReadBuffer(reln, P_NEW); lastblock++; } Assert(BufferGetBlockNumber(buffer) == blkno); --- 265,271 ---- { if (buffer != InvalidBuffer) ReleaseBuffer(buffer); ! buffer = ReadBufferWithoutRelcache(rnode, false, P_NEW, false); lastblock++; } Assert(BufferGetBlockNumber(buffer) == blkno); *************** *** 265,271 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init) if (PageIsNew((PageHeader) page)) { UnlockReleaseBuffer(buffer); ! log_invalid_page(reln->rd_node, blkno, true); return InvalidBuffer; } } --- 281,287 ---- if (PageIsNew((PageHeader) page)) { UnlockReleaseBuffer(buffer); ! log_invalid_page(rnode, blkno, true); return InvalidBuffer; } } *************** *** 275,500 **** XLogReadBuffer(Relation reln, BlockNumber blkno, bool init) /* ! * Lightweight "Relation" cache --- this substitutes for the normal relcache ! * during XLOG replay. */ ! ! typedef struct XLogRelDesc ! { ! RelationData reldata; ! struct XLogRelDesc *lessRecently; ! struct XLogRelDesc *moreRecently; ! } XLogRelDesc; ! ! typedef struct XLogRelCacheEntry { ! RelFileNode rnode; ! XLogRelDesc *rdesc; ! } XLogRelCacheEntry; ! static HTAB *_xlrelcache; ! static XLogRelDesc *_xlrelarr = NULL; ! static Form_pg_class _xlpgcarr = NULL; ! static int _xlast = 0; ! static int _xlcnt = 0; ! ! #define _XLOG_RELCACHESIZE 512 ! ! static void ! _xl_init_rel_cache(void) ! { ! HASHCTL ctl; ! ! _xlcnt = _XLOG_RELCACHESIZE; ! _xlast = 0; ! _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt); ! memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt); ! _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt); ! memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt); ! ! _xlrelarr[0].moreRecently = &(_xlrelarr[0]); ! _xlrelarr[0].lessRecently = &(_xlrelarr[0]); ! ! memset(&ctl, 0, sizeof(ctl)); ! ctl.keysize = sizeof(RelFileNode); ! ctl.entrysize = sizeof(XLogRelCacheEntry); ! ctl.hash = tag_hash; ! ! _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE, ! &ctl, HASH_ELEM | HASH_FUNCTION); ! } ! ! static void ! _xl_remove_hash_entry(XLogRelDesc *rdesc) ! { ! Form_pg_class tpgc = rdesc->reldata.rd_rel; ! XLogRelCacheEntry *hentry; ! ! rdesc->lessRecently->moreRecently = rdesc->moreRecently; ! rdesc->moreRecently->lessRecently = rdesc->lessRecently; ! ! hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache, ! (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL); ! if (hentry == NULL) ! elog(PANIC, "_xl_remove_hash_entry: file was not found in cache"); ! ! RelationCloseSmgr(&(rdesc->reldata)); ! ! memset(rdesc, 0, sizeof(XLogRelDesc)); ! memset(tpgc, 0, sizeof(FormData_pg_class)); ! rdesc->reldata.rd_rel = tpgc; ! } ! ! static XLogRelDesc * ! _xl_new_reldesc(void) ! { ! XLogRelDesc *res; ! ! _xlast++; ! if (_xlast < _xlcnt) ! { ! _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]); ! return &(_xlrelarr[_xlast]); ! } ! ! /* reuse */ ! res = _xlrelarr[0].moreRecently; ! ! _xl_remove_hash_entry(res); ! ! _xlast--; ! return res; ! } ! ! ! void ! XLogInitRelationCache(void) ! { ! _xl_init_rel_cache(); ! invalid_page_tab = NULL; ! } ! ! void ! XLogCloseRelationCache(void) ! { ! HASH_SEQ_STATUS status; ! XLogRelCacheEntry *hentry; ! ! if (!_xlrelarr) ! return; ! ! hash_seq_init(&status, _xlrelcache); ! ! while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL) ! _xl_remove_hash_entry(hentry->rdesc); ! ! hash_destroy(_xlrelcache); ! ! free(_xlrelarr); ! free(_xlpgcarr); ! ! _xlrelarr = NULL; ! } /* ! * Open a relation during XLOG replay * ! * Note: this once had an API that allowed NULL return on failure, but it ! * no longer does; any failure results in elog(). */ Relation ! XLogOpenRelation(RelFileNode rnode) { ! XLogRelDesc *res; ! XLogRelCacheEntry *hentry; ! bool found; ! hentry = (XLogRelCacheEntry *) ! hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL); ! if (hentry) ! { ! res = hentry->rdesc; ! res->lessRecently->moreRecently = res->moreRecently; ! res->moreRecently->lessRecently = res->lessRecently; ! } ! else ! { ! res = _xl_new_reldesc(); ! ! sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode); ! ! res->reldata.rd_node = rnode; ! ! /* ! * We set up the lockRelId in case anything tries to lock the dummy ! * relation. Note that this is fairly bogus since relNode may be ! * different from the relation's OID. It shouldn't really matter ! * though, since we are presumably running by ourselves and can't have ! * any lock conflicts ... ! */ ! res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode; ! res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode; ! ! hentry = (XLogRelCacheEntry *) ! hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found); ! ! if (found) ! elog(PANIC, "xlog relation already present on insert into cache"); ! ! hentry->rdesc = res; ! ! res->reldata.rd_targblock = InvalidBlockNumber; ! res->reldata.rd_smgr = NULL; ! RelationOpenSmgr(&(res->reldata)); ! ! /* ! * Create the target file if it doesn't already exist. This lets us ! * cope if the replay sequence contains writes to a relation that is ! * later deleted. (The original coding of this routine would instead ! * return NULL, causing the writes to be suppressed. But that seems ! * like it risks losing valuable data if the filesystem loses an inode ! * during a crash. Better to write the data until we are actually ! * told to delete the file.) ! */ ! smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true); ! } ! res->moreRecently = &(_xlrelarr[0]); ! res->lessRecently = _xlrelarr[0].lessRecently; ! _xlrelarr[0].lessRecently = res; ! res->lessRecently->moreRecently = res; ! return &(res->reldata); } /* * Drop a relation during XLOG replay * ! * This is called when the relation is about to be deleted; we need to ensure ! * that there is no dangling smgr reference in the xlog relation cache. ! * ! * Currently, we don't bother to physically remove the relation from the ! * cache, we just let it age out normally. ! * ! * This also takes care of removing any open "invalid-page" records for ! * the relation. */ void XLogDropRelation(RelFileNode rnode) { ! XLogRelCacheEntry *hentry; ! ! hentry = (XLogRelCacheEntry *) ! hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL); ! ! if (hentry) ! { ! XLogRelDesc *rdesc = hentry->rdesc; ! ! RelationCloseSmgr(&(rdesc->reldata)); ! } forget_invalid_pages(rnode, 0); } --- 291,365 ---- /* ! * Struct actually returned by XLogFakeRelcacheEntry, though the declared ! * return type is Relation. */ ! typedef struct { ! RelationData reldata; /* Note: this must be first */ ! FormData_pg_class pgc; ! } FakeRelCacheEntryData; ! typedef FakeRelCacheEntryData *FakeRelCacheEntry; /* ! * Create a fake relation cache entry for a physical relation * ! * It's often convenient to use the same functions in XLOG replay as in the ! * main codepath, but those functions typically work with a relcache entry. ! * We don't have a working relation cache during XLOG replay, but this ! * function can be used to create a fake relcache entry instead. Only the ! * fields related to physical storage, like rd_rel, are initialized, so the ! * fake entry is only usable in low-level operations. ! * ! * Caller must pfree() the returned entry. */ Relation ! XLogFakeRelcacheEntry(RelFileNode rnode) { ! FakeRelCacheEntry fakeentry; ! Relation rel; ! /* ! * We allocate the RelationData struct and all related space in one ! * block, so that the caller can free it with one simple pfree call. ! */ ! fakeentry = palloc0(sizeof(FakeRelCacheEntryData)); ! rel = (Relation) fakeentry; ! rel->rd_rel = &fakeentry->pgc; ! rel->rd_node = rnode; ! /* We don't know the name of the relation; use relfilenode instead */ ! sprintf(RelationGetRelationName(rel), "%u", rnode.relNode); ! ! /* ! * We set up the lockRelId in case anything tries to lock the dummy ! * relation. Note that this is fairly bogus since relNode may be ! * different from the relation's OID. It shouldn't really matter ! * though, since we are presumably running by ourselves and can't have ! * any lock conflicts ... ! */ ! rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode; ! rel->rd_lockInfo.lockRelId.relId = rnode.relNode; ! rel->rd_targblock = InvalidBlockNumber; ! rel->rd_smgr = NULL; ! return rel; } /* * Drop a relation during XLOG replay * ! * This is called when the relation is about to be deleted; we need to remove ! * any open "invalid-page" records for the relation. */ void XLogDropRelation(RelFileNode rnode) { ! /* Tell smgr to forget about this relation as well */ ! smgrclosenode(rnode); forget_invalid_pages(rnode, 0); } *************** *** 507,524 **** XLogDropRelation(RelFileNode rnode) void XLogDropDatabase(Oid dbid) { ! HASH_SEQ_STATUS status; ! XLogRelCacheEntry *hentry; ! ! hash_seq_init(&status, _xlrelcache); ! ! while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL) ! { ! XLogRelDesc *rdesc = hentry->rdesc; ! ! if (hentry->rnode.dbNode == dbid) ! RelationCloseSmgr(&(rdesc->reldata)); ! } forget_invalid_pages_db(dbid); } --- 372,385 ---- void XLogDropDatabase(Oid dbid) { ! /* ! * This is unnecessarily heavy-handed, as it will close SMgrRelation ! * objects for other databases as well. DROP DATABASE occurs seldom ! * enough that it's not worth introducing a variant of smgrclose for ! * just this purpose. XXX: Or should we rather leave the smgr entries ! * dangling? ! */ ! smgrcloseall(); forget_invalid_pages_db(dbid); } *************** *** 526,533 **** XLogDropDatabase(Oid dbid) /* * Truncate a relation during XLOG replay * ! * We don't need to do anything to the fake relcache, but we do need to ! * clean up any open "invalid-page" records for the dropped pages. */ void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks) --- 387,393 ---- /* * Truncate a relation during XLOG replay * ! * We need to clean up any open "invalid-page" records for the dropped pages. */ void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks) *** a/src/backend/commands/sequence.c --- b/src/backend/commands/sequence.c *************** *** 1332,1338 **** void seq_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - Relation reln; Buffer buffer; Page page; char *item; --- 1332,1337 ---- *************** *** 1343,1350 **** seq_redo(XLogRecPtr lsn, XLogRecord *record) if (info != XLOG_SEQ_LOG) elog(PANIC, "seq_redo: unknown op code %u", info); ! reln = XLogOpenRelation(xlrec->node); ! buffer = XLogReadBuffer(reln, 0, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); --- 1342,1348 ---- if (info != XLOG_SEQ_LOG) elog(PANIC, "seq_redo: unknown op code %u", info); ! buffer = XLogReadBuffer(xlrec->node, 0, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); *** a/src/backend/storage/buffer/bufmgr.c --- b/src/backend/storage/buffer/bufmgr.c *************** *** 76,84 **** static bool IsForInput; static volatile BufferDesc *PinCountWaitBuf = NULL; ! static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum, ! bool zeroPage, ! BufferAccessStrategy strategy); static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy); static void PinBuffer_Locked(volatile BufferDesc *buf); static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner); --- 76,85 ---- static volatile BufferDesc *PinCountWaitBuf = NULL; ! static Buffer ReadBuffer_relcache(Relation reln, BlockNumber blockNum, ! bool zeroPage, BufferAccessStrategy strategy); ! static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf, BlockNumber blockNum, ! bool zeroPage, BufferAccessStrategy strategy, bool *hit); static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy); static void PinBuffer_Locked(volatile BufferDesc *buf); static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner); *************** *** 89,95 **** static bool StartBufferIO(volatile BufferDesc *buf, bool forInput); static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits); static void buffer_write_error_callback(void *arg); ! static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum, BufferAccessStrategy strategy, bool *foundPtr); static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); --- 90,96 ---- static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits); static void buffer_write_error_callback(void *arg); ! static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, BlockNumber blockNum, BufferAccessStrategy strategy, bool *foundPtr); static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); *************** *** 114,120 **** static void AtProcExit_Buffers(int code, Datum arg); Buffer ReadBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_common(reln, blockNum, false, NULL); } /* --- 115,121 ---- Buffer ReadBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_relcache(reln, blockNum, false, NULL); } /* *************** *** 125,131 **** Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum, BufferAccessStrategy strategy) { ! return ReadBuffer_common(reln, blockNum, false, strategy); } /* --- 126,132 ---- ReadBufferWithStrategy(Relation reln, BlockNumber blockNum, BufferAccessStrategy strategy) { ! return ReadBuffer_relcache(reln, blockNum, false, strategy); } /* *************** *** 142,182 **** ReadBufferWithStrategy(Relation reln, BlockNumber blockNum, Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_common(reln, blockNum, true, NULL); } /* ! * ReadBuffer_common -- common logic for ReadBuffer variants */ static Buffer ! ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, ! BufferAccessStrategy strategy) { volatile BufferDesc *bufHdr; Block bufBlock; bool found; bool isExtend; ! bool isLocalBuf; /* Make sure we will have room to remember the buffer pin */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); isExtend = (blockNum == P_NEW); - isLocalBuf = reln->rd_istemp; - - /* Open it at the smgr level if not already done */ - RelationOpenSmgr(reln); /* Substitute proper block number if caller asked for P_NEW */ if (isExtend) ! blockNum = smgrnblocks(reln->rd_smgr); ! ! pgstat_count_buffer_read(reln); if (isLocalBuf) { ReadLocalBufferCount++; ! bufHdr = LocalBufferAlloc(reln, blockNum, &found); if (found) LocalBufferHitCount++; } --- 143,221 ---- Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum) { ! return ReadBuffer_relcache(reln, blockNum, true, NULL); } /* ! * ReadBufferWithoutRelcache -- like ReadBuffer, but can be used to read ! * read a page without having a relcache entry. If zeroPage is true, ! * this behaves like ReadOrZeroBuffer rather than ReadBuffer. ! */ ! Buffer ! ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp, ! BlockNumber blockNum, bool zeroPage) ! { ! bool hit; ! ! SMgrRelation smgr = smgropen(rnode); ! return ReadBuffer_common(smgr, isTemp, blockNum, zeroPage, NULL, &hit); ! } ! ! /* ! * ReadBuffer_relcache -- common logic for ReadBuffer-variants that ! * operate on a Relation. ! */ ! static Buffer ! ReadBuffer_relcache(Relation reln, BlockNumber blockNum, ! bool zeroPage, BufferAccessStrategy strategy) ! { ! bool hit; ! Buffer buf; ! ! /* Open it at the smgr level if not already done */ ! RelationOpenSmgr(reln); ! ! /* ! * Read the buffer, and update pgstat counters to reflect a cache ! * hit or miss. ! */ ! pgstat_count_buffer_read(reln); ! buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, ! zeroPage, strategy, &hit); ! if (hit) ! pgstat_count_buffer_hit(reln); ! return buf; ! } ! ! /* ! * ReadBuffer_common -- common logic for all ReadBuffer variants ! * ! * *hit is set to true if the request was satisfied from shared buffer cache. */ static Buffer ! ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, BlockNumber blockNum, ! bool zeroPage, BufferAccessStrategy strategy, bool *hit) { volatile BufferDesc *bufHdr; Block bufBlock; bool found; bool isExtend; ! ! *hit = false; /* Make sure we will have room to remember the buffer pin */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); isExtend = (blockNum == P_NEW); /* Substitute proper block number if caller asked for P_NEW */ if (isExtend) ! blockNum = smgrnblocks(smgr); if (isLocalBuf) { ReadLocalBufferCount++; ! bufHdr = LocalBufferAlloc(smgr, blockNum, &found); if (found) LocalBufferHitCount++; } *************** *** 188,194 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, * lookup the buffer. IO_IN_PROGRESS is set if the requested block is * not currently in memory. */ ! bufHdr = BufferAlloc(reln, blockNum, strategy, &found); if (found) BufferHitCount++; } --- 227,233 ---- * lookup the buffer. IO_IN_PROGRESS is set if the requested block is * not currently in memory. */ ! bufHdr = BufferAlloc(smgr, blockNum, strategy, &found); if (found) BufferHitCount++; } *************** *** 201,207 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, if (!isExtend) { /* Just need to update stats before we exit */ ! pgstat_count_buffer_hit(reln); if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; --- 240,246 ---- if (!isExtend) { /* Just need to update stats before we exit */ ! *hit = true; if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; *************** *** 225,232 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); if (!PageIsNew((PageHeader) bufBlock)) ereport(ERROR, ! (errmsg("unexpected data beyond EOF in block %u of relation \"%s\"", ! blockNum, RelationGetRelationName(reln)), errhint("This has been seen to occur with buggy kernels; consider updating your system."))); /* --- 264,271 ---- bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); if (!PageIsNew((PageHeader) bufBlock)) ereport(ERROR, ! (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u", ! blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode), errhint("This has been seen to occur with buggy kernels; consider updating your system."))); /* *************** *** 278,285 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, { /* new buffers are zero-filled */ MemSet((char *) bufBlock, 0, BLCKSZ); ! smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock, ! reln->rd_istemp); } else { --- 317,324 ---- { /* new buffers are zero-filled */ MemSet((char *) bufBlock, 0, BLCKSZ); ! smgrextend(smgr, blockNum, (char *) bufBlock, ! isLocalBuf); } else { *************** *** 290,296 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, if (zeroPage) MemSet((char *) bufBlock, 0, BLCKSZ); else ! smgrread(reln->rd_smgr, blockNum, (char *) bufBlock); /* check for garbage data */ if (!PageHeaderIsValid((PageHeader) bufBlock)) { --- 329,335 ---- if (zeroPage) MemSet((char *) bufBlock, 0, BLCKSZ); else ! smgrread(smgr, blockNum, (char *) bufBlock); /* check for garbage data */ if (!PageHeaderIsValid((PageHeader) bufBlock)) { *************** *** 298,312 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, { ereport(WARNING, (errcode(ERRCODE_DATA_CORRUPTED), ! errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page", ! blockNum, RelationGetRelationName(reln)))); MemSet((char *) bufBlock, 0, BLCKSZ); } else ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), ! errmsg("invalid page header in block %u of relation \"%s\"", ! blockNum, RelationGetRelationName(reln)))); } } --- 337,356 ---- { ereport(WARNING, (errcode(ERRCODE_DATA_CORRUPTED), ! errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page", ! blockNum, ! smgr->smgr_rnode.spcNode, ! smgr->smgr_rnode.dbNode, ! smgr->smgr_rnode.relNode))); MemSet((char *) bufBlock, 0, BLCKSZ); } else ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), ! errmsg("invalid page header in block %u of relation %u/%u/%u", ! blockNum, smgr->smgr_rnode.spcNode, ! smgr->smgr_rnode.dbNode, ! smgr->smgr_rnode.relNode))); } } *************** *** 347,353 **** ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage, * No locks are held either at entry or exit. */ static volatile BufferDesc * ! BufferAlloc(Relation reln, BlockNumber blockNum, BufferAccessStrategy strategy, bool *foundPtr) --- 391,397 ---- * No locks are held either at entry or exit. */ static volatile BufferDesc * ! BufferAlloc(SMgrRelation smgr, BlockNumber blockNum, BufferAccessStrategy strategy, bool *foundPtr) *************** *** 364,370 **** BufferAlloc(Relation reln, bool valid; /* create a tag so we can lookup the buffer */ ! INIT_BUFFERTAG(newTag, reln, blockNum); /* determine its hash code and partition lock ID */ newHash = BufTableHashCode(&newTag); --- 408,414 ---- bool valid; /* create a tag so we can lookup the buffer */ ! INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum); /* determine its hash code and partition lock ID */ newHash = BufTableHashCode(&newTag); *** a/src/backend/storage/buffer/localbuf.c --- b/src/backend/storage/buffer/localbuf.c *************** *** 61,67 **** static Block GetLocalBufferStorage(void); * (hence, usage_count is always advanced). */ BufferDesc * ! LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) { BufferTag newTag; /* identity of requested block */ LocalBufferLookupEnt *hresult; --- 61,67 ---- * (hence, usage_count is always advanced). */ BufferDesc * ! LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr) { BufferTag newTag; /* identity of requested block */ LocalBufferLookupEnt *hresult; *************** *** 70,76 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) int trycounter; bool found; ! INIT_BUFFERTAG(newTag, reln, blockNum); /* Initialize local buffers if first request in this session */ if (LocalBufHash == NULL) --- 70,76 ---- int trycounter; bool found; ! INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum); /* Initialize local buffers if first request in this session */ if (LocalBufHash == NULL) *************** *** 87,93 **** LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag)); #ifdef LBDEBUG fprintf(stderr, "LB ALLOC (%u,%d) %d\n", ! RelationGetRelid(reln), blockNum, -b - 1); #endif /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) --- 87,93 ---- Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag)); #ifdef LBDEBUG fprintf(stderr, "LB ALLOC (%u,%d) %d\n", ! smgr->smgr_rnode.relNode, blockNum, -b - 1); #endif /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) *** a/src/backend/storage/smgr/md.c --- b/src/backend/storage/smgr/md.c *************** *** 208,216 **** mdcreate(SMgrRelation reln, bool isRedo) char *path; File fd; - if (isRedo && reln->md_fd != NULL) - return; /* created and opened already... */ - Assert(reln->md_fd == NULL); path = relpath(reln->smgr_rnode); --- 208,213 ---- *** a/src/backend/storage/smgr/smgr.c --- b/src/backend/storage/smgr/smgr.c *************** *** 330,335 **** smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo) --- 330,338 ---- xl_smgr_create xlrec; PendingRelDelete *pending; + if (isRedo && reln->md_fd != NULL) + return; /* created and opened already... */ + /* * We may be using the target table space for the first time in this * database, so create a per-database subdirectory if needed. *** a/src/backend/utils/init/flatfiles.c --- b/src/backend/utils/init/flatfiles.c *************** *** 704,715 **** BuildFlatFiles(bool database_only) rel_authid, rel_authmem; - /* - * We don't have any hope of running a real relcache, but we can use the - * same fake-relcache facility that WAL replay uses. - */ - XLogInitRelationCache(); - /* Need a resowner to keep the heapam and buffer code happy */ owner = ResourceOwnerCreate(NULL, "BuildFlatFiles"); CurrentResourceOwner = owner; --- 704,709 ---- *************** *** 719,727 **** BuildFlatFiles(bool database_only) rnode.dbNode = 0; rnode.relNode = DatabaseRelationId; ! /* No locking is needed because no one else is alive yet */ ! rel_db = XLogOpenRelation(rnode); write_database_file(rel_db, true); if (!database_only) { --- 713,727 ---- rnode.dbNode = 0; rnode.relNode = DatabaseRelationId; ! /* ! * We don't have any hope of running a real relcache, but we can use the ! * same fake-relcache facility that WAL replay uses. ! * ! * No locking is needed because no one else is alive yet. ! */ ! rel_db = XLogFakeRelcacheEntry(rnode); write_database_file(rel_db, true); + pfree(rel_db); if (!database_only) { *************** *** 729,749 **** BuildFlatFiles(bool database_only) rnode.spcNode = GLOBALTABLESPACE_OID; rnode.dbNode = 0; rnode.relNode = AuthIdRelationId; ! rel_authid = XLogOpenRelation(rnode); /* hard-wired path to pg_auth_members */ rnode.spcNode = GLOBALTABLESPACE_OID; rnode.dbNode = 0; rnode.relNode = AuthMemRelationId; ! rel_authmem = XLogOpenRelation(rnode); write_auth_file(rel_authid, rel_authmem); } CurrentResourceOwner = NULL; ResourceOwnerDelete(owner); - - XLogCloseRelationCache(); } --- 729,749 ---- rnode.spcNode = GLOBALTABLESPACE_OID; rnode.dbNode = 0; rnode.relNode = AuthIdRelationId; ! rel_authid = XLogFakeRelcacheEntry(rnode); /* hard-wired path to pg_auth_members */ rnode.spcNode = GLOBALTABLESPACE_OID; rnode.dbNode = 0; rnode.relNode = AuthMemRelationId; ! rel_authmem = XLogFakeRelcacheEntry(rnode); write_auth_file(rel_authid, rel_authmem); + pfree(rel_authid); + pfree(rel_authmem); } CurrentResourceOwner = NULL; ResourceOwnerDelete(owner); } *** a/src/include/access/gist_private.h --- b/src/include/access/gist_private.h *************** *** 284,291 **** extern bool gistfitpage(IndexTuple *itvec, int len); extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace); extern void gistcheckpage(Relation rel, Buffer buf); extern Buffer gistNewBuffer(Relation r); ! extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, ! int len, OffsetNumber off); extern IndexTuple *gistextractpage(Page page, int *len /* out */ ); extern IndexTuple *gistjoinvector( IndexTuple *itvec, int *len, --- 284,291 ---- extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace); extern void gistcheckpage(Relation rel, Buffer buf); extern Buffer gistNewBuffer(Relation r); ! extern void gistfillbuffer(Page page, IndexTuple *itup, int len, ! OffsetNumber off); extern IndexTuple *gistextractpage(Page page, int *len /* out */ ); extern IndexTuple *gistjoinvector( IndexTuple *itvec, int *len, *** a/src/include/access/heapam.h --- b/src/include/access/heapam.h *************** *** 124,130 **** extern void heap_page_prune_opt(Relation relation, Buffer buffer, extern int heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin, bool redirect_move, bool report_stats); ! extern void heap_page_prune_execute(Relation reln, Buffer buffer, OffsetNumber *redirected, int nredirected, OffsetNumber *nowdead, int ndead, OffsetNumber *nowunused, int nunused, --- 124,130 ---- extern int heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin, bool redirect_move, bool report_stats); ! extern void heap_page_prune_execute(Buffer buffer, OffsetNumber *redirected, int nredirected, OffsetNumber *nowdead, int ndead, OffsetNumber *nowunused, int nunused, *** a/src/include/access/xlogutils.h --- b/src/include/access/xlogutils.h *************** *** 12,29 **** #define XLOG_UTILS_H #include "storage/buf.h" #include "utils/rel.h" - extern void XLogInitRelationCache(void); extern void XLogCheckInvalidPages(void); - extern void XLogCloseRelationCache(void); ! extern Relation XLogOpenRelation(RelFileNode rnode); extern void XLogDropRelation(RelFileNode rnode); extern void XLogDropDatabase(Oid dbid); extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks); ! extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init); #endif --- 12,29 ---- #define XLOG_UTILS_H #include "storage/buf.h" + #include "storage/smgr.h" #include "utils/rel.h" extern void XLogCheckInvalidPages(void); ! extern Relation XLogFakeRelcacheEntry(RelFileNode rnode); ! extern void XLogDropRelation(RelFileNode rnode); extern void XLogDropDatabase(Oid dbid); extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks); ! extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init); #endif *** a/src/include/storage/buf_internals.h --- b/src/include/storage/buf_internals.h *************** *** 18,23 **** --- 18,24 ---- #include "storage/buf.h" #include "storage/lwlock.h" #include "storage/shmem.h" + #include "storage/smgr.h" #include "storage/spin.h" #include "utils/rel.h" *************** *** 75,83 **** typedef struct buftag (a).blockNum = InvalidBlockNumber \ ) ! #define INIT_BUFFERTAG(a,xx_reln,xx_blockNum) \ ( \ ! (a).rnode = (xx_reln)->rd_node, \ (a).blockNum = (xx_blockNum) \ ) --- 76,84 ---- (a).blockNum = InvalidBlockNumber \ ) ! #define INIT_BUFFERTAG(a,xx_rnode,xx_blockNum) \ ( \ ! (a).rnode = (xx_rnode), \ (a).blockNum = (xx_blockNum) \ ) *************** *** 201,207 **** extern int BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id); extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode); /* localbuf.c */ ! extern BufferDesc *LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr); extern void MarkLocalBufferDirty(Buffer buffer); extern void DropRelFileNodeLocalBuffers(RelFileNode rnode, --- 202,208 ---- extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode); /* localbuf.c */ ! extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, BlockNumber blockNum, bool *foundPtr); extern void MarkLocalBufferDirty(Buffer buffer); extern void DropRelFileNodeLocalBuffers(RelFileNode rnode, *** a/src/include/storage/bufmgr.h --- b/src/include/storage/bufmgr.h *************** *** 145,150 **** extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); --- 145,152 ---- extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum, BufferAccessStrategy strategy); extern Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum); + extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp, + BlockNumber blockNum, bool zeroPage); extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern void MarkBufferDirty(Buffer buffer);
В списке pgsql-patches по дате отправления:
Предыдущее
От: Alvaro HerreraДата:
Сообщение: Re: Tentative patch for making DROP put dependency info in DETAIL