I wrote:
> I think a better fix for the opclass API would be to do what I suggested
> there:
>> * Perhaps it'd be a good idea to move the loop over scankeys to inside
>> the opclass consistent methods, ie call them just once to check all the
>> scankeys. Then we could meaningfully define zero scankeys as a full
>> index scan, and we would also get rid of redundant value reconstruction
>> work when there's more than one scankey.
I've committed that ...
> I'm less sure about what to do to store nulls, but one idea is to have a
> separate SPGiST tree storing only nulls and descending from its own root
> page, similar to the idea in this patch of having a separate root page
> for nulls. It'd be a tad less efficient than GIN-based storage for
> large numbers of nulls, but you probably don't want to use SPGiST to
> index columns with lots of nulls anyway.
... and attached is a WIP patch that handles nulls as a separate SPGiST
tree. Tuple layouts are the same as before, but each page is marked as
to whether it stores nulls or non-nulls. The cache mechanism is
modified to keep separate sets of cached pages for the regular and nulls
trees. I'm fairly happy with this from a code cleanliness point of
view, and it passes the regression tests included in your patch (which
I didn't append here). It still needs doc updates, and also I think
that the WAL logic is probably broken --- the page-stores-nulls flag
will probably need to be added to most of SPGiST's WAL record types.
regards, tom lane
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 85704762a6f242d48298f24b2d894f2cc5db9790..0fe57ac97bdffeff5bac3ca93e423e415125ac56 100644
*** a/src/backend/access/spgist/spgdoinsert.c
--- b/src/backend/access/spgist/spgdoinsert.c
*************** addLeafTuple(Relation index, SpGistState
*** 224,230 ****
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
! current->blkno == SPGIST_HEAD_BLKNO)
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
--- 224,230 ----
START_CRIT_SECTION();
if (current->offnum == InvalidOffsetNumber ||
! SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
*************** checkSplitConditions(Relation index, SpG
*** 337,343 ****
n = 0,
totalSize = 0;
! if (current->blkno == SPGIST_HEAD_BLKNO)
{
/* return impossible values to force split */
*nToSplit = BLCKSZ;
--- 337,343 ----
n = 0,
totalSize = 0;
! if (SpGistBlockIsRoot(current->blkno))
{
/* return impossible values to force split */
*nToSplit = BLCKSZ;
*************** checkSplitConditions(Relation index, SpG
*** 386,392 ****
static void
moveLeafs(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
! SpGistLeafTuple newLeafTuple)
{
int i,
nDelete,
--- 386,392 ----
static void
moveLeafs(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
! SpGistLeafTuple newLeafTuple, bool isNulls)
{
int i,
nDelete,
*************** moveLeafs(Relation index, SpGistState *s
*** 451,457 ****
}
/* Find a leaf page that will hold them */
! nbuf = SpGistGetBuffer(index, GBUF_LEAF, size, &xlrec.newPage);
npage = BufferGetPage(nbuf);
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
--- 451,458 ----
}
/* Find a leaf page that will hold them */
! nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
! size, &xlrec.newPage);
npage = BufferGetPage(nbuf);
nblkno = BufferGetBlockNumber(nbuf);
Assert(nblkno != current->blkno);
*************** checkAllTheSame(spgPickSplitIn *in, spgP
*** 674,680 ****
static bool
doPickSplit(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
! SpGistLeafTuple newLeafTuple, int level, bool isNew)
{
bool insertedNew = false;
spgPickSplitIn in;
--- 675,682 ----
static bool
doPickSplit(Relation index, SpGistState *state,
SPPageDesc *current, SPPageDesc *parent,
! SpGistLeafTuple newLeafTuple,
! int level, bool isNulls, bool isNew)
{
bool insertedNew = false;
spgPickSplitIn in;
*************** doPickSplit(Relation index, SpGistState
*** 737,743 ****
nToInsert = 0;
nToDelete = 0;
spaceToDelete = 0;
! if (current->blkno == SPGIST_HEAD_BLKNO)
{
/*
* We are splitting the root (which up to now is also a leaf page).
--- 739,745 ----
nToInsert = 0;
nToDelete = 0;
spaceToDelete = 0;
! if (SpGistBlockIsRoot(current->blkno))
{
/*
* We are splitting the root (which up to now is also a leaf page).
*************** doPickSplit(Relation index, SpGistState
*** 813,838 ****
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
in.nTuples++;
- /*
- * Perform split using user-defined method.
- */
memset(&out, 0, sizeof(out));
! procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
! FunctionCall2Coll(procinfo,
! index->rd_indcollation[0],
! PointerGetDatum(&in),
! PointerGetDatum(&out));
! /*
! * Form new leaf tuples and count up the total space needed.
! */
! totalLeafSizes = 0;
! for (i = 0; i < in.nTuples; i++)
{
! newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
! out.leafTupleDatums[i]);
! totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
/*
--- 815,867 ----
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
in.nTuples++;
memset(&out, 0, sizeof(out));
! if (!isNulls)
! {
! /*
! * Perform split using user-defined method.
! */
! procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
! FunctionCall2Coll(procinfo,
! index->rd_indcollation[0],
! PointerGetDatum(&in),
! PointerGetDatum(&out));
! /*
! * Form new leaf tuples and count up the total space needed.
! */
! totalLeafSizes = 0;
! for (i = 0; i < in.nTuples; i++)
! {
! newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
! out.leafTupleDatums[i],
! false);
! totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
! }
! }
! else
{
! /*
! * Perform dummy split that puts all tuples into one node.
! * checkAllTheSame will override this and force allTheSame mode.
! */
! out.hasPrefix = false;
! out.nNodes = 1;
! out.nodeLabels = NULL;
! out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
!
! /*
! * Form new leaf tuples and count up the total space needed.
! */
! totalLeafSizes = 0;
! for (i = 0; i < in.nTuples; i++)
! {
! newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
! (Datum) 0,
! true);
! totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
! }
}
/*
*************** doPickSplit(Relation index, SpGistState
*** 872,882 ****
for (i = 0; i < out.nNodes; i++)
{
Datum label = (Datum) 0;
! bool isnull = (out.nodeLabels == NULL);
! if (!isnull)
label = out.nodeLabels[i];
! nodes[i] = spgFormNodeTuple(state, label, isnull);
}
innerTuple = spgFormInnerTuple(state,
out.hasPrefix, out.prefixDatum,
--- 901,911 ----
for (i = 0; i < out.nNodes; i++)
{
Datum label = (Datum) 0;
! bool labelisnull = (out.nodeLabels == NULL);
! if (!labelisnull)
label = out.nodeLabels[i];
! nodes[i] = spgFormNodeTuple(state, label, labelisnull);
}
innerTuple = spgFormInnerTuple(state,
out.hasPrefix, out.prefixDatum,
*************** doPickSplit(Relation index, SpGistState
*** 914,920 ****
*/
xlrec.initInner = false;
if (parent->buffer != InvalidBuffer &&
! parent->blkno != SPGIST_HEAD_BLKNO &&
(SpGistPageGetFreeSpace(parent->page, 1) >=
innerTuple->size + sizeof(ItemIdData)))
{
--- 943,949 ----
*/
xlrec.initInner = false;
if (parent->buffer != InvalidBuffer &&
! !SpGistBlockIsRoot(parent->blkno) &&
(SpGistPageGetFreeSpace(parent->page, 1) >=
innerTuple->size + sizeof(ItemIdData)))
{
*************** doPickSplit(Relation index, SpGistState
*** 925,931 ****
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer(index,
! GBUF_INNER_PARITY(parent->blkno + 1),
innerTuple->size + sizeof(ItemIdData),
&xlrec.initInner);
}
--- 954,961 ----
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer(index,
! GBUF_INNER_PARITY(parent->blkno + 1) |
! (isNulls ? GBUF_NULLS : 0),
innerTuple->size + sizeof(ItemIdData),
&xlrec.initInner);
}
*************** doPickSplit(Relation index, SpGistState
*** 958,964 ****
* must all go somewhere else.
*----------
*/
! if (current->blkno != SPGIST_HEAD_BLKNO)
currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
else
currentFreeSpace = 0; /* prevent assigning any tuples to current */
--- 988,994 ----
* must all go somewhere else.
*----------
*/
! if (!SpGistBlockIsRoot(current->blkno))
currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
else
currentFreeSpace = 0; /* prevent assigning any tuples to current */
*************** doPickSplit(Relation index, SpGistState
*** 996,1002 ****
int curspace;
int newspace;
! newLeafBuffer = SpGistGetBuffer(index, GBUF_LEAF,
Min(totalLeafSizes,
SPGIST_PAGE_CAPACITY),
&xlrec.initDest);
--- 1026,1033 ----
int curspace;
int newspace;
! newLeafBuffer = SpGistGetBuffer(index,
! GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
Min(totalLeafSizes,
SPGIST_PAGE_CAPACITY),
&xlrec.initDest);
*************** doPickSplit(Relation index, SpGistState
*** 1091,1097 ****
* the root; in that case there's no need because we'll re-init the page
* below. We do this first to make room for reinserting new leaf tuples.
*/
! if (current->blkno != SPGIST_HEAD_BLKNO)
{
/*
* Init buffer instead of deleting individual tuples, but only if
--- 1122,1128 ----
* the root; in that case there's no need because we'll re-init the page
* below. We do this first to make room for reinserting new leaf tuples.
*/
! if (!SpGistBlockIsRoot(current->blkno))
{
/*
* Init buffer instead of deleting individual tuples, but only if
*************** doPickSplit(Relation index, SpGistState
*** 1102,1108 ****
nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
PageGetMaxOffsetNumber(current->page))
{
! SpGistInitBuffer(current->buffer, SPGIST_LEAF);
xlrec.initSrc = true;
}
else if (isNew)
--- 1133,1140 ----
nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
PageGetMaxOffsetNumber(current->page))
{
! SpGistInitBuffer(current->buffer,
! SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
xlrec.initSrc = true;
}
else if (isNew)
*************** doPickSplit(Relation index, SpGistState
*** 1317,1326 ****
* Splitting root page, which was a leaf but now becomes inner page
* (and so "current" continues to point at it)
*/
! Assert(current->blkno == SPGIST_HEAD_BLKNO);
Assert(redirectTuplePos == InvalidOffsetNumber);
! SpGistInitBuffer(current->buffer, 0);
xlrec.initInner = true;
xlrec.blknoInner = current->blkno;
--- 1349,1358 ----
* Splitting root page, which was a leaf but now becomes inner page
* (and so "current" continues to point at it)
*/
! Assert(SpGistBlockIsRoot(current->blkno));
Assert(redirectTuplePos == InvalidOffsetNumber);
! SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
xlrec.initInner = true;
xlrec.blknoInner = current->blkno;
*************** spgAddNodeAction(Relation index, SpGistS
*** 1461,1466 ****
--- 1493,1501 ----
XLogRecData rdata[5];
spgxlogAddNode xlrec;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/* Construct new inner tuple with additional node */
newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
*************** spgAddNodeAction(Relation index, SpGistS
*** 1527,1533 ****
* allow only one inner tuple on the root page, and spgFormInnerTuple
* always checks that inner tuples don't exceed the size of a page.
*/
! if (current->blkno == SPGIST_HEAD_BLKNO)
elog(ERROR, "cannot enlarge root tuple any more");
Assert(parent->buffer != InvalidBuffer);
--- 1562,1568 ----
* allow only one inner tuple on the root page, and spgFormInnerTuple
* always checks that inner tuples don't exceed the size of a page.
*/
! if (SpGistBlockIsRoot(current->blkno))
elog(ERROR, "cannot enlarge root tuple any more");
Assert(parent->buffer != InvalidBuffer);
*************** spgSplitNodeAction(Relation index, SpGis
*** 1657,1662 ****
--- 1692,1700 ----
spgxlogSplitTuple xlrec;
Buffer newBuffer = InvalidBuffer;
+ /* Should not be applied to nulls */
+ Assert(!SpGistPageStoresNulls(current->page));
+
/*
* Construct new prefix tuple, containing a single node with the
* specified label. (We'll update the node's downlink to point to the
*************** spgSplitNodeAction(Relation index, SpGis
*** 1709,1715 ****
* For the space calculation, note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry.
*/
! if (current->blkno == SPGIST_HEAD_BLKNO ||
SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
{
--- 1747,1753 ----
* For the space calculation, note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry.
*/
! if (SpGistBlockIsRoot(current->blkno) ||
SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
{
*************** spgSplitNodeAction(Relation index, SpGis
*** 1804,1810 ****
*/
void
spgdoinsert(Relation index, SpGistState *state,
! ItemPointer heapPtr, Datum datum)
{
int level = 0;
Datum leafDatum;
--- 1842,1848 ----
*/
void
spgdoinsert(Relation index, SpGistState *state,
! ItemPointer heapPtr, Datum datum, bool isnull)
{
int level = 0;
Datum leafDatum;
*************** spgdoinsert(Relation index, SpGistState
*** 1817,1823 ****
* value to be inserted is not toasted; FormIndexDatum doesn't guarantee
* that.
*/
! if (state->attType.attlen == -1)
datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
leafDatum = datum;
--- 1855,1861 ----
* value to be inserted is not toasted; FormIndexDatum doesn't guarantee
* that.
*/
! if (!isnull && state->attType.attlen == -1)
datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
leafDatum = datum;
*************** spgdoinsert(Relation index, SpGistState
*** 1828,1835 ****
* If it isn't gonna fit, and the opclass can't reduce the datum size by
* suffixing, bail out now rather than getting into an endless loop.
*/
! leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
! SpGistGetTypeSize(&state->attType, leafDatum);
if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
ereport(ERROR,
--- 1866,1876 ----
* If it isn't gonna fit, and the opclass can't reduce the datum size by
* suffixing, bail out now rather than getting into an endless loop.
*/
! if (!isnull)
! leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
! SpGistGetTypeSize(&state->attType, leafDatum);
! else
! leafSize = SGDTSIZE + sizeof(ItemIdData);
if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
ereport(ERROR,
*************** spgdoinsert(Relation index, SpGistState
*** 1840,1847 ****
RelationGetRelationName(index)),
errhint("Values larger than a buffer page cannot be indexed.")));
! /* Initialize "current" to the root page */
! current.blkno = SPGIST_HEAD_BLKNO;
current.buffer = InvalidBuffer;
current.page = NULL;
current.offnum = FirstOffsetNumber;
--- 1881,1888 ----
RelationGetRelationName(index)),
errhint("Values larger than a buffer page cannot be indexed.")));
! /* Initialize "current" to the appropriate root page */
! current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
current.buffer = InvalidBuffer;
current.page = NULL;
current.offnum = FirstOffsetNumber;
*************** spgdoinsert(Relation index, SpGistState
*** 1873,1882 ****
* for doPickSplit to always have a leaf page at hand; so just
* quietly limit our request to a page size.
*/
! current.buffer = SpGistGetBuffer(index, GBUF_LEAF,
! Min(leafSize,
! SPGIST_PAGE_CAPACITY),
! &isNew);
current.blkno = BufferGetBlockNumber(current.buffer);
}
else if (parent.buffer == InvalidBuffer ||
--- 1914,1924 ----
* for doPickSplit to always have a leaf page at hand; so just
* quietly limit our request to a page size.
*/
! current.buffer =
! SpGistGetBuffer(index,
! GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
! Min(leafSize, SPGIST_PAGE_CAPACITY),
! &isNew);
current.blkno = BufferGetBlockNumber(current.buffer);
}
else if (parent.buffer == InvalidBuffer ||
*************** spgdoinsert(Relation index, SpGistState
*** 1892,1904 ****
}
current.page = BufferGetPage(current.buffer);
if (SpGistPageIsLeaf(current.page))
{
SpGistLeafTuple leafTuple;
int nToSplit,
sizeToSplit;
! leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
--- 1934,1952 ----
}
current.page = BufferGetPage(current.buffer);
+ /* should not arrive at a page of the wrong type */
+ if (isnull ? !SpGistPageStoresNulls(current.page) :
+ SpGistPageStoresNulls(current.page))
+ elog(ERROR, "SPGiST index page %u has wrong nulls flag",
+ current.blkno);
+
if (SpGistPageIsLeaf(current.page))
{
SpGistLeafTuple leafTuple;
int nToSplit,
sizeToSplit;
! leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
*************** spgdoinsert(Relation index, SpGistState
*** 1918,1931 ****
* chain to another leaf page rather than splitting it.
*/
Assert(!isNew);
! moveLeafs(index, state, ¤t, &parent, leafTuple);
break; /* we're done */
}
else
{
/* picksplit */
if (doPickSplit(index, state, ¤t, &parent,
! leafTuple, level, isNew))
break; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
--- 1966,1979 ----
* chain to another leaf page rather than splitting it.
*/
Assert(!isNew);
! moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
break; /* we're done */
}
else
{
/* picksplit */
if (doPickSplit(index, state, ¤t, &parent,
! leafTuple, level, isnull, isNew))
break; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
*************** spgdoinsert(Relation index, SpGistState
*** 1972,1982 ****
memset(&out, 0, sizeof(out));
! procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
! FunctionCall2Coll(procinfo,
! index->rd_indcollation[0],
! PointerGetDatum(&in),
! PointerGetDatum(&out));
if (innerTuple->allTheSame)
{
--- 2020,2039 ----
memset(&out, 0, sizeof(out));
! if (!isnull)
! {
! /* use user-defined choose method */
! procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
! FunctionCall2Coll(procinfo,
! index->rd_indcollation[0],
! PointerGetDatum(&in),
! PointerGetDatum(&out));
! }
! else
! {
! /* force "match" action (to insert to random subnode) */
! out.resultType = spgMatchNode;
! }
if (innerTuple->allTheSame)
{
*************** spgdoinsert(Relation index, SpGistState
*** 2001,2009 ****
/* Adjust level as per opclass request */
level += out.result.matchNode.levelAdd;
/* Replace leafDatum and recompute leafSize */
! leafDatum = out.result.matchNode.restDatum;
! leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
! SpGistGetTypeSize(&state->attType, leafDatum);
/*
* Loop around and attempt to insert the new leafDatum
--- 2058,2069 ----
/* Adjust level as per opclass request */
level += out.result.matchNode.levelAdd;
/* Replace leafDatum and recompute leafSize */
! if (!isnull)
! {
! leafDatum = out.result.matchNode.restDatum;
! leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
! SpGistGetTypeSize(&state->attType, leafDatum);
! }
/*
* Loop around and attempt to insert the new leafDatum
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index cbcf655674ac5434fab090fcef0c4ca134bbbf94..8ff9245e179ac71063804b79db08cb9ca35a2c9e 100644
*** a/src/backend/access/spgist/spginsert.c
--- b/src/backend/access/spgist/spginsert.c
*************** spgistBuildCallback(Relation index, Heap
*** 38,55 ****
bool *isnull, bool tupleIsAlive, void *state)
{
SpGistBuildState *buildstate = (SpGistBuildState *) state;
! /* SPGiST doesn't index nulls */
! if (*isnull == false)
! {
! /* Work in temp context, and reset it after each tuple */
! MemoryContext oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
! spgdoinsert(index, &buildstate->spgstate, &htup->t_self, *values);
! MemoryContextSwitchTo(oldCtx);
! MemoryContextReset(buildstate->tmpCtx);
! }
}
/*
--- 38,52 ----
bool *isnull, bool tupleIsAlive, void *state)
{
SpGistBuildState *buildstate = (SpGistBuildState *) state;
+ MemoryContext oldCtx;
! /* Work in temp context, and reset it after each tuple */
! oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
! spgdoinsert(index, &buildstate->spgstate, &htup->t_self, *values, *isnull);
! MemoryContextSwitchTo(oldCtx);
! MemoryContextReset(buildstate->tmpCtx);
}
/*
*************** spgbuild(PG_FUNCTION_ARGS)
*** 65,84 ****
double reltuples;
SpGistBuildState buildstate;
Buffer metabuffer,
! rootbuffer;
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
/*
! * Initialize the meta page and root page
*/
metabuffer = SpGistNewBuffer(index);
rootbuffer = SpGistNewBuffer(index);
Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
! Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_HEAD_BLKNO);
START_CRIT_SECTION();
--- 62,84 ----
double reltuples;
SpGistBuildState buildstate;
Buffer metabuffer,
! rootbuffer,
! nullbuffer;
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
/*
! * Initialize the meta page and root pages
*/
metabuffer = SpGistNewBuffer(index);
rootbuffer = SpGistNewBuffer(index);
+ nullbuffer = SpGistNewBuffer(index);
Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
! Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO);
! Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO);
START_CRIT_SECTION();
*************** spgbuild(PG_FUNCTION_ARGS)
*** 86,91 ****
--- 86,93 ----
MarkBufferDirty(metabuffer);
SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
MarkBufferDirty(rootbuffer);
+ SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
+ MarkBufferDirty(nullbuffer);
if (RelationNeedsWAL(index))
{
*************** spgbuild(PG_FUNCTION_ARGS)
*** 104,115 ****
--- 106,120 ----
PageSetTLI(BufferGetPage(metabuffer), ThisTimeLineID);
PageSetLSN(BufferGetPage(rootbuffer), recptr);
PageSetTLI(BufferGetPage(rootbuffer), ThisTimeLineID);
+ PageSetLSN(BufferGetPage(nullbuffer), recptr);
+ PageSetTLI(BufferGetPage(nullbuffer), ThisTimeLineID);
}
END_CRIT_SECTION();
UnlockReleaseBuffer(metabuffer);
UnlockReleaseBuffer(rootbuffer);
+ UnlockReleaseBuffer(nullbuffer);
/*
* Now insert all the heap data into the index
*************** spgbuildempty(PG_FUNCTION_ARGS)
*** 159,169 ****
/* Likewise for the root page. */
SpGistInitPage(page, SPGIST_LEAF);
! smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_HEAD_BLKNO,
(char *) page, true);
if (XLogIsNeeded())
log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
! SPGIST_HEAD_BLKNO, page);
/*
* An immediate sync is required even if we xlog'd the pages, because the
--- 164,183 ----
/* Likewise for the root page. */
SpGistInitPage(page, SPGIST_LEAF);
! smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO,
(char *) page, true);
if (XLogIsNeeded())
log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
! SPGIST_ROOT_BLKNO, page);
!
! /* Likewise for the null-tuples root page. */
! SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
!
! smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO,
! (char *) page, true);
! if (XLogIsNeeded())
! log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
! SPGIST_NULL_BLKNO, page);
/*
* An immediate sync is required even if we xlog'd the pages, because the
*************** spginsert(PG_FUNCTION_ARGS)
*** 194,203 ****
MemoryContext oldCtx;
MemoryContext insertCtx;
- /* SPGiST doesn't index nulls */
- if (*isnull)
- PG_RETURN_BOOL(false);
-
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
"SP-GiST insert temporary context",
ALLOCSET_DEFAULT_MINSIZE,
--- 208,213 ----
*************** spginsert(PG_FUNCTION_ARGS)
*** 207,213 ****
initSpGistState(&spgstate, index);
! spgdoinsert(index, &spgstate, ht_ctid, *values);
SpGistUpdateMetaPage(index);
--- 217,223 ----
initSpGistState(&spgstate, index);
! spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull);
SpGistUpdateMetaPage(index);
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 99b0852611fbc7454d3e1ef04d7446d89f12b380..7a3a96230d176b6d13e78a771373a55c04802f4f 100644
*** a/src/backend/access/spgist/spgscan.c
--- b/src/backend/access/spgist/spgscan.c
***************
*** 23,28 ****
--- 23,31 ----
#include "utils/memutils.h"
+ typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
+ Datum leafValue, bool isnull, bool recheck);
+
typedef struct ScanStackEntry
{
Datum reconstructedValue; /* value reconstructed from parent */
*************** resetSpGistScanOpaque(SpGistScanOpaque s
*** 66,79 ****
freeScanStack(so);
! Assert(!so->searchNulls); /* XXX fixme */
if (so->searchNonNulls)
{
/* Stack a work item to scan the non-null index entries */
startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
! ItemPointerSet(&startEntry->ptr, SPGIST_HEAD_BLKNO, FirstOffsetNumber);
! so->scanStack = list_make1(startEntry);
}
if (so->want_itup)
--- 69,88 ----
freeScanStack(so);
! if (so->searchNulls)
! {
! /* Stack a work item to scan the null index entries */
! startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
! ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
! so->scanStack = lappend(so->scanStack, startEntry);
! }
if (so->searchNonNulls)
{
/* Stack a work item to scan the non-null index entries */
startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
! ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
! so->scanStack = lappend(so->scanStack, startEntry);
}
if (so->want_itup)
*************** spgrestrpos(PG_FUNCTION_ARGS)
*** 243,264 ****
}
/*
! * Test whether a leaf datum satisfies all the scan keys
*
* *leafValue is set to the reconstructed datum, if provided
* *recheck is set true if any of the operators are lossy
*/
static bool
! spgLeafTest(Relation index, SpGistScanOpaque so, Datum leafDatum,
int level, Datum reconstructedValue,
Datum *leafValue, bool *recheck)
{
bool result;
spgLeafConsistentIn in;
spgLeafConsistentOut out;
FmgrInfo *procinfo;
MemoryContext oldCtx;
/* use temp context for calling leaf_consistent */
oldCtx = MemoryContextSwitchTo(so->tempCxt);
--- 252,286 ----
}
/*
! * Test whether a leaf tuple satisfies all the scan keys
*
* *leafValue is set to the reconstructed datum, if provided
* *recheck is set true if any of the operators are lossy
*/
static bool
! spgLeafTest(Relation index, SpGistScanOpaque so,
! SpGistLeafTuple leafTuple, bool isnull,
int level, Datum reconstructedValue,
Datum *leafValue, bool *recheck)
{
bool result;
+ Datum leafDatum;
spgLeafConsistentIn in;
spgLeafConsistentOut out;
FmgrInfo *procinfo;
MemoryContext oldCtx;
+ if (isnull)
+ {
+ /* Should not have arrived on a nulls page unless nulls are wanted */
+ Assert(so->searchNulls);
+ *leafValue = (Datum) 0;
+ *recheck = false;
+ return true;
+ }
+
+ leafDatum = SGLTDATUM(leafTuple, &so->state);
+
/* use temp context for calling leaf_consistent */
oldCtx = MemoryContextSwitchTo(so->tempCxt);
*************** spgLeafTest(Relation index, SpGistScanOp
*** 295,301 ****
*/
static void
spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
! void (*storeRes) (SpGistScanOpaque, ItemPointer, Datum, bool))
{
Buffer buffer = InvalidBuffer;
bool reportedSome = false;
--- 317,323 ----
*/
static void
spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
! storeRes_func storeRes)
{
Buffer buffer = InvalidBuffer;
bool reportedSome = false;
*************** spgWalk(Relation index, SpGistScanOpaque
*** 306,311 ****
--- 328,334 ----
BlockNumber blkno;
OffsetNumber offset;
Page page;
+ bool isnull;
/* Pull next to-do item from the list */
if (so->scanStack == NIL)
*************** redirect:
*** 336,341 ****
--- 359,366 ----
page = BufferGetPage(buffer);
+ isnull = SpGistPageStoresNulls(page) ? true : false;
+
if (SpGistPageIsLeaf(page))
{
SpGistLeafTuple leafTuple;
*************** redirect:
*** 343,349 ****
Datum leafValue = (Datum) 0;
bool recheck = false;
! if (blkno == SPGIST_HEAD_BLKNO)
{
/* When root is a leaf, examine all its tuples */
for (offset = FirstOffsetNumber; offset <= max; offset++)
--- 368,374 ----
Datum leafValue = (Datum) 0;
bool recheck = false;
! if (SpGistBlockIsRoot(blkno))
{
/* When root is a leaf, examine all its tuples */
for (offset = FirstOffsetNumber; offset <= max; offset++)
*************** redirect:
*** 359,371 ****
Assert(ItemPointerIsValid(&leafTuple->heapPtr));
if (spgLeafTest(index, so,
! SGLTDATUM(leafTuple, &so->state),
stackEntry->level,
stackEntry->reconstructedValue,
&leafValue,
&recheck))
{
! storeRes(so, &leafTuple->heapPtr, leafValue, recheck);
reportedSome = true;
}
}
--- 384,397 ----
Assert(ItemPointerIsValid(&leafTuple->heapPtr));
if (spgLeafTest(index, so,
! leafTuple, isnull,
stackEntry->level,
stackEntry->reconstructedValue,
&leafValue,
&recheck))
{
! storeRes(so, &leafTuple->heapPtr,
! leafValue, isnull, recheck);
reportedSome = true;
}
}
*************** redirect:
*** 404,416 ****
Assert(ItemPointerIsValid(&leafTuple->heapPtr));
if (spgLeafTest(index, so,
! SGLTDATUM(leafTuple, &so->state),
stackEntry->level,
stackEntry->reconstructedValue,
&leafValue,
&recheck))
{
! storeRes(so, &leafTuple->heapPtr, leafValue, recheck);
reportedSome = true;
}
--- 430,443 ----
Assert(ItemPointerIsValid(&leafTuple->heapPtr));
if (spgLeafTest(index, so,
! leafTuple, isnull,
stackEntry->level,
stackEntry->reconstructedValue,
&leafValue,
&recheck))
{
! storeRes(so, &leafTuple->heapPtr,
! leafValue, isnull, recheck);
reportedSome = true;
}
*************** redirect:
*** 468,478 ****
memset(&out, 0, sizeof(out));
! procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
! FunctionCall2Coll(procinfo,
! index->rd_indcollation[0],
! PointerGetDatum(&in),
! PointerGetDatum(&out));
MemoryContextSwitchTo(oldCtx);
--- 495,517 ----
memset(&out, 0, sizeof(out));
! if (!isnull)
! {
! /* use user-defined inner consistent method */
! procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
! FunctionCall2Coll(procinfo,
! index->rd_indcollation[0],
! PointerGetDatum(&in),
! PointerGetDatum(&out));
! }
! else
! {
! /* force all children to be visited */
! out.nNodes = in.nNodes;
! out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
! for (i = 0; i < in.nNodes; i++)
! out.nodeNumbers[i] = i;
! }
MemoryContextSwitchTo(oldCtx);
*************** redirect:
*** 524,530 ****
/* storeRes subroutine for getbitmap case */
static void
storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
! Datum leafValue, bool recheck)
{
tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
so->ntids++;
--- 563,569 ----
/* storeRes subroutine for getbitmap case */
static void
storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
! Datum leafValue, bool isnull, bool recheck)
{
tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
so->ntids++;
*************** spggetbitmap(PG_FUNCTION_ARGS)
*** 551,557 ****
/* storeRes subroutine for gettuple case */
static void
storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
! Datum leafValue, bool recheck)
{
Assert(so->nPtrs < MaxIndexTuplesPerPage);
so->heapPtrs[so->nPtrs] = *heapPtr;
--- 590,596 ----
/* storeRes subroutine for gettuple case */
static void
storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
! Datum leafValue, bool isnull, bool recheck)
{
Assert(so->nPtrs < MaxIndexTuplesPerPage);
so->heapPtrs[so->nPtrs] = *heapPtr;
*************** storeGettuple(SpGistScanOpaque so, ItemP
*** 562,569 ****
* Reconstruct desired IndexTuple. We have to copy the datum out of
* the temp context anyway, so we may as well create the tuple here.
*/
- bool isnull = false;
-
so->indexTups[so->nPtrs] = index_form_tuple(so->indexTupDesc,
&leafValue,
&isnull);
--- 601,606 ----
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 1f88562be78e7c103f11814efdc4b802e8eb9767..0cae5e6f3bcfb44fc4eb107d93cd552a9744eed3 100644
*** a/src/backend/access/spgist/spgutils.c
--- b/src/backend/access/spgist/spgutils.c
*************** SpGistNewBuffer(Relation index)
*** 148,157 ****
break; /* nothing known to FSM */
/*
! * The root page shouldn't ever be listed in FSM, but just in case it
! * is, ignore it.
*/
! if (blkno == SPGIST_HEAD_BLKNO)
continue;
buffer = ReadBuffer(index, blkno);
--- 148,157 ----
break; /* nothing known to FSM */
/*
! * The fixed pages shouldn't ever be listed in FSM, but just in case
! * one is, ignore it.
*/
! if (SpGistBlockIsFixed(blkno))
continue;
buffer = ReadBuffer(index, blkno);
*************** SpGistUpdateMetaPage(Relation index)
*** 226,234 ****
}
/* Macro to select proper element of lastUsedPages cache depending on flags */
! #define GET_LUP(c, f) (((f) & GBUF_LEAF) ? \
! &(c)->lastUsedPages.leafPage : \
! &(c)->lastUsedPages.innerPage[(f) & GBUF_PARITY_MASK])
/*
* Allocate and initialize a new buffer of the type and parity specified by
--- 226,233 ----
}
/* Macro to select proper element of lastUsedPages cache depending on flags */
! /* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */
! #define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
/*
* Allocate and initialize a new buffer of the type and parity specified by
*************** static Buffer
*** 254,268 ****
allocNewBuffer(Relation index, int flags)
{
SpGistCache *cache = spgGetCache(index);
for (;;)
{
Buffer buffer;
buffer = SpGistNewBuffer(index);
! SpGistInitBuffer(buffer, (flags & GBUF_LEAF) ? SPGIST_LEAF : 0);
! if (flags & GBUF_LEAF)
{
/* Leaf pages have no parity concerns, so just use it */
return buffer;
--- 253,273 ----
allocNewBuffer(Relation index, int flags)
{
SpGistCache *cache = spgGetCache(index);
+ uint16 pageflags = 0;
+
+ if ((flags & GBUF_PARITY_MASK) == GBUF_LEAF)
+ pageflags |= SPGIST_LEAF;
+ if (flags & GBUF_NULLS)
+ pageflags |= SPGIST_NULLS;
for (;;)
{
Buffer buffer;
buffer = SpGistNewBuffer(index);
! SpGistInitBuffer(buffer, pageflags);
! if (pageflags & SPGIST_LEAF)
{
/* Leaf pages have no parity concerns, so just use it */
return buffer;
*************** allocNewBuffer(Relation index, int flags
*** 270,278 ****
else
{
BlockNumber blkno = BufferGetBlockNumber(buffer);
! int blkParity = blkno % 3;
! if ((flags & GBUF_PARITY_MASK) == blkParity)
{
/* Page has right parity, use it */
return buffer;
--- 275,285 ----
else
{
BlockNumber blkno = BufferGetBlockNumber(buffer);
! int blkFlags = GBUF_INNER_PARITY(blkno);
! if (pageflags & SPGIST_NULLS)
! blkFlags |= GBUF_NULLS;
! if (flags == blkFlags)
{
/* Page has right parity, use it */
return buffer;
*************** allocNewBuffer(Relation index, int flags
*** 280,287 ****
else
{
/* Page has wrong parity, record it in cache and try again */
! cache->lastUsedPages.innerPage[blkParity].blkno = blkno;
! cache->lastUsedPages.innerPage[blkParity].freeSpace =
PageGetExactFreeSpace(BufferGetPage(buffer));
UnlockReleaseBuffer(buffer);
}
--- 287,294 ----
else
{
/* Page has wrong parity, record it in cache and try again */
! cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
! cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
PageGetExactFreeSpace(BufferGetPage(buffer));
UnlockReleaseBuffer(buffer);
}
*************** SpGistGetBuffer(Relation index, int flag
*** 314,323 ****
* we try to keep 100-fillfactor% available for adding tuples that are
* related to the ones already on it. But fillfactor mustn't cause an
* error for requests that would otherwise be legal.
*/
! needSpace += RelationGetTargetPageFreeSpace(index,
! SPGIST_DEFAULT_FILLFACTOR);
! needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
/* Get the cache entry for this flags setting */
lup = GET_LUP(cache, flags);
--- 321,335 ----
* we try to keep 100-fillfactor% available for adding tuples that are
* related to the ones already on it. But fillfactor mustn't cause an
* error for requests that would otherwise be legal.
+ *
+ * There's no point in leaving fill space on a nulls page, though.
*/
! if (!(flags & GBUF_NULLS))
! {
! needSpace += RelationGetTargetPageFreeSpace(index,
! SPGIST_DEFAULT_FILLFACTOR);
! needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
! }
/* Get the cache entry for this flags setting */
lup = GET_LUP(cache, flags);
*************** SpGistGetBuffer(Relation index, int flag
*** 329,336 ****
return allocNewBuffer(index, flags);
}
! /* root page should never be in cache */
! Assert(lup->blkno != SPGIST_HEAD_BLKNO);
/* If cached freeSpace isn't enough, don't bother looking at the page */
if (lup->freeSpace >= needSpace)
--- 341,348 ----
return allocNewBuffer(index, flags);
}
! /* fixed pages should never be in cache */
! Assert(!SpGistBlockIsFixed(lup->blkno));
/* If cached freeSpace isn't enough, don't bother looking at the page */
if (lup->freeSpace >= needSpace)
*************** SpGistGetBuffer(Relation index, int flag
*** 355,361 ****
if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
{
/* OK to initialize the page */
! SpGistInitBuffer(buffer, (flags & GBUF_LEAF) ? SPGIST_LEAF : 0);
lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
*isNew = true;
return buffer;
--- 367,379 ----
if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
{
/* OK to initialize the page */
! uint16 pageflags = 0;
!
! if ((flags & GBUF_PARITY_MASK) == GBUF_LEAF)
! pageflags |= SPGIST_LEAF;
! if (flags & GBUF_NULLS)
! pageflags |= SPGIST_NULLS;
! SpGistInitBuffer(buffer, pageflags);
lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
*isNew = true;
return buffer;
*************** SpGistGetBuffer(Relation index, int flag
*** 365,372 ****
* Check that page is of right type and has enough space. We must
* recheck this since our cache isn't necessarily up to date.
*/
! if ((flags & GBUF_LEAF) ? SpGistPageIsLeaf(page) :
! !SpGistPageIsLeaf(page))
{
int freeSpace = PageGetExactFreeSpace(page);
--- 383,392 ----
* Check that page is of right type and has enough space. We must
* recheck this since our cache isn't necessarily up to date.
*/
! if ((((flags & GBUF_PARITY_MASK) == GBUF_LEAF) ?
! SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
! ((flags & GBUF_NULLS) ?
! SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
{
int freeSpace = PageGetExactFreeSpace(page);
*************** SpGistSetLastUsedPage(Relation index, Bu
*** 407,420 ****
BlockNumber blkno = BufferGetBlockNumber(buffer);
int flags;
! /* Never enter the root page in cache, though */
! if (blkno == SPGIST_HEAD_BLKNO)
return;
if (SpGistPageIsLeaf(page))
flags = GBUF_LEAF;
else
flags = GBUF_INNER_PARITY(blkno);
lup = GET_LUP(cache, flags);
--- 427,442 ----
BlockNumber blkno = BufferGetBlockNumber(buffer);
int flags;
! /* Never enter fixed pages (root pages) in cache, though */
! if (SpGistBlockIsFixed(blkno))
return;
if (SpGistPageIsLeaf(page))
flags = GBUF_LEAF;
else
flags = GBUF_INNER_PARITY(blkno);
+ if (SpGistPageStoresNulls(page))
+ flags |= GBUF_NULLS;
lup = GET_LUP(cache, flags);
*************** void
*** 459,464 ****
--- 481,487 ----
SpGistInitMetapage(Page page)
{
SpGistMetaPageData *metadata;
+ int i;
SpGistInitPage(page, SPGIST_META);
metadata = SpGistPageGetMeta(page);
*************** SpGistInitMetapage(Page page)
*** 466,475 ****
metadata->magicNumber = SPGIST_MAGIC_NUMBER;
/* initialize last-used-page cache to empty */
! metadata->lastUsedPages.innerPage[0].blkno = InvalidBlockNumber;
! metadata->lastUsedPages.innerPage[1].blkno = InvalidBlockNumber;
! metadata->lastUsedPages.innerPage[2].blkno = InvalidBlockNumber;
! metadata->lastUsedPages.leafPage.blkno = InvalidBlockNumber;
}
/*
--- 489,496 ----
metadata->magicNumber = SPGIST_MAGIC_NUMBER;
/* initialize last-used-page cache to empty */
! for (i = 0; i < SPGIST_CACHED_PAGES; i++)
! metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
}
/*
*************** memcpyDatum(void *target, SpGistTypeDesc
*** 533,549 ****
* Construct a leaf tuple containing the given heap TID and datum value
*/
SpGistLeafTuple
! spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, Datum datum)
{
SpGistLeafTuple tup;
unsigned int size;
/* compute space needed (note result is already maxaligned) */
! size = SGLTHDRSZ + SpGistGetTypeSize(&state->attType, datum);
/*
* Ensure that we can replace the tuple with a dead tuple later. This
! * test is unnecessary given current tuple layouts, but let's be safe.
*/
if (size < SGDTSIZE)
size = SGDTSIZE;
--- 554,573 ----
* Construct a leaf tuple containing the given heap TID and datum value
*/
SpGistLeafTuple
! spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
! Datum datum, bool isnull)
{
SpGistLeafTuple tup;
unsigned int size;
/* compute space needed (note result is already maxaligned) */
! size = SGLTHDRSZ;
! if (!isnull)
! size += SpGistGetTypeSize(&state->attType, datum);
/*
* Ensure that we can replace the tuple with a dead tuple later. This
! * test is unnecessary when !isnull, but let's be safe.
*/
if (size < SGDTSIZE)
size = SGDTSIZE;
*************** spgFormLeafTuple(SpGistState *state, Ite
*** 554,560 ****
tup->size = size;
tup->nextOffset = InvalidOffsetNumber;
tup->heapPtr = *heapPtr;
! memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
return tup;
}
--- 578,585 ----
tup->size = size;
tup->nextOffset = InvalidOffsetNumber;
tup->heapPtr = *heapPtr;
! if (!isnull)
! memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
return tup;
}
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 4598ea8d67fd42854c59a29624683efea91ac451..8bd259063084ce413157efc9ecb8b2b082770ff1 100644
*** a/src/backend/access/spgist/spgvacuum.c
--- b/src/backend/access/spgist/spgvacuum.c
*************** vacuumLeafPage(spgBulkDeleteState *bds,
*** 307,313 ****
}
/*
! * Vacuum the root page when it is a leaf
*
* On the root, we just delete any dead leaf tuples; no fancy business
*/
--- 307,313 ----
}
/*
! * Vacuum a root page when it is also a leaf
*
* On the root, we just delete any dead leaf tuples; no fancy business
*/
*************** vacuumLeafRoot(spgBulkDeleteState *bds,
*** 321,326 ****
--- 321,327 ----
OffsetNumber i,
max = PageGetMaxOffsetNumber(page);
+ xlrec.blkno = BufferGetBlockNumber(buffer);
xlrec.nDelete = 0;
/* Scan page, identify tuples to delete, accumulate stats */
*************** spgvacuumpage(spgBulkDeleteState *bds, B
*** 537,543 ****
}
else if (SpGistPageIsLeaf(page))
{
! if (blkno == SPGIST_HEAD_BLKNO)
{
vacuumLeafRoot(bds, index, buffer);
/* no need for vacuumRedirectAndPlaceholder */
--- 538,544 ----
}
else if (SpGistPageIsLeaf(page))
{
! if (SpGistBlockIsRoot(blkno))
{
vacuumLeafRoot(bds, index, buffer);
/* no need for vacuumRedirectAndPlaceholder */
*************** spgvacuumpage(spgBulkDeleteState *bds, B
*** 560,566 ****
* put a new tuple. Otherwise, check for empty/deletable page, and
* make sure FSM knows about it.
*/
! if (blkno != SPGIST_HEAD_BLKNO)
{
/* If page is now empty, mark it deleted */
if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
--- 561,567 ----
* put a new tuple. Otherwise, check for empty/deletable page, and
* make sure FSM knows about it.
*/
! if (!SpGistBlockIsRoot(blkno))
{
/* If page is now empty, mark it deleted */
if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
*************** spgvacuumscan(spgBulkDeleteState *bds)
*** 598,604 ****
/* Finish setting up spgBulkDeleteState */
initSpGistState(&bds->spgstate, index);
bds->OldestXmin = GetOldestXmin(true, false);
! bds->lastFilledBlock = SPGIST_HEAD_BLKNO;
/*
* Reset counts that will be incremented during the scan; needed in case
--- 599,605 ----
/* Finish setting up spgBulkDeleteState */
initSpGistState(&bds->spgstate, index);
bds->OldestXmin = GetOldestXmin(true, false);
! bds->lastFilledBlock = SPGIST_ROOT_BLKNO;
/*
* Reset counts that will be incremented during the scan; needed in case
*************** spgvacuumscan(spgBulkDeleteState *bds)
*** 619,625 ****
* delete some deletable tuples. See more extensive comments about
* this in btvacuumscan().
*/
! blkno = SPGIST_HEAD_BLKNO;
for (;;)
{
/* Get the current relation length */
--- 620,626 ----
* delete some deletable tuples. See more extensive comments about
* this in btvacuumscan().
*/
! blkno = SPGIST_ROOT_BLKNO;
for (;;)
{
/* Get the current relation length */
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index daa8ae300bae9125c9c4732cdf65185c5779dec2..139dfaf12a007aa80c0d105a833d55a706a5a118 100644
*** a/src/backend/access/spgist/spgxlog.c
--- b/src/backend/access/spgist/spgxlog.c
*************** spgRedoCreateIndex(XLogRecPtr lsn, XLogR
*** 84,90 ****
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
! buffer = XLogReadBuffer(*node, SPGIST_HEAD_BLKNO, true);
Assert(BufferIsValid(buffer));
SpGistInitBuffer(buffer, SPGIST_LEAF);
page = (Page) BufferGetPage(buffer);
--- 84,90 ----
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
! buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
SpGistInitBuffer(buffer, SPGIST_LEAF);
page = (Page) BufferGetPage(buffer);
*************** spgRedoCreateIndex(XLogRecPtr lsn, XLogR
*** 92,97 ****
--- 92,106 ----
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
+
+ buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true);
+ Assert(BufferIsValid(buffer));
+ SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
+ page = (Page) BufferGetPage(buffer);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ UnlockReleaseBuffer(buffer);
}
static void
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 545,551 ****
*/
bbi = 0;
! if (xldata->blknoSrc == SPGIST_HEAD_BLKNO)
{
/* when splitting root, we touch it only in the guise of new inner */
srcBuffer = InvalidBuffer;
--- 554,560 ----
*/
bbi = 0;
! if (SpGistBlockIsRoot(xldata->blknoSrc))
{
/* when splitting root, we touch it only in the guise of new inner */
srcBuffer = InvalidBuffer;
*************** spgRedoPickSplit(XLogRecPtr lsn, XLogRec
*** 709,715 ****
if (xldata->blknoParent == InvalidBlockNumber)
{
/* no parent cause we split the root */
! Assert(xldata->blknoInner == SPGIST_HEAD_BLKNO);
}
else if (xldata->blknoInner != xldata->blknoParent)
{
--- 718,724 ----
if (xldata->blknoParent == InvalidBlockNumber)
{
/* no parent cause we split the root */
! Assert(SpGistBlockIsRoot(xldata->blknoInner));
}
else if (xldata->blknoInner != xldata->blknoParent)
{
*************** spgRedoVacuumRoot(XLogRecPtr lsn, XLogRe
*** 842,848 ****
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(xldata->node, SPGIST_HEAD_BLKNO, false);
if (BufferIsValid(buffer))
{
page = BufferGetPage(buffer);
--- 851,857 ----
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
! buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (BufferIsValid(buffer))
{
page = BufferGetPage(buffer);
*************** spg_desc(StringInfo buf, uint8 xl_info,
*** 1039,1045 ****
break;
case XLOG_SPGIST_VACUUM_ROOT:
out_target(buf, ((spgxlogVacuumRoot *) rec)->node);
! appendStringInfo(buf, "vacuum leaf tuples on root page");
break;
case XLOG_SPGIST_VACUUM_REDIRECT:
out_target(buf, ((spgxlogVacuumRedirect *) rec)->node);
--- 1048,1055 ----
break;
case XLOG_SPGIST_VACUUM_ROOT:
out_target(buf, ((spgxlogVacuumRoot *) rec)->node);
! appendStringInfo(buf, "vacuum leaf tuples on root page %u",
! ((spgxlogVacuumRoot *) rec)->blkno);
break;
case XLOG_SPGIST_VACUUM_REDIRECT:
out_target(buf, ((spgxlogVacuumRedirect *) rec)->node);
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index 76ea5a1578fc8c5e44b8194be8c33f120f365a7d..42decd6e0ade5146c38c1dec4499bc28a1d9cadf 100644
*** a/src/include/access/spgist_private.h
--- b/src/include/access/spgist_private.h
***************
*** 21,28 ****
/* Page numbers of fixed-location pages */
! #define SPGIST_METAPAGE_BLKNO (0)
! #define SPGIST_HEAD_BLKNO (1)
/*
* Contents of page special space on SPGiST index pages
--- 21,35 ----
/* Page numbers of fixed-location pages */
! #define SPGIST_METAPAGE_BLKNO (0) /* metapage */
! #define SPGIST_ROOT_BLKNO (1) /* root for normal entries */
! #define SPGIST_NULL_BLKNO (2) /* root for null-value entries */
! #define SPGIST_LAST_FIXED_BLKNO SPGIST_NULL_BLKNO
!
! #define SpGistBlockIsRoot(blkno) \
! ((blkno) == SPGIST_ROOT_BLKNO || (blkno) == SPGIST_NULL_BLKNO)
! #define SpGistBlockIsFixed(blkno) \
! ((BlockNumber) (blkno) <= (BlockNumber) SPGIST_LAST_FIXED_BLKNO)
/*
* Contents of page special space on SPGiST index pages
*************** typedef SpGistPageOpaqueData *SpGistPage
*** 42,56 ****
#define SPGIST_META (1<<0)
#define SPGIST_DELETED (1<<1)
#define SPGIST_LEAF (1<<2)
#define SpGistPageGetOpaque(page) ((SpGistPageOpaque) PageGetSpecialPointer(page))
#define SpGistPageIsMeta(page) (SpGistPageGetOpaque(page)->flags & SPGIST_META)
#define SpGistPageIsDeleted(page) (SpGistPageGetOpaque(page)->flags & SPGIST_DELETED)
#define SpGistPageSetDeleted(page) (SpGistPageGetOpaque(page)->flags |= SPGIST_DELETED)
- #define SpGistPageSetNonDeleted(page) (SpGistPageGetOpaque(page)->flags &= ~SPGIST_DELETED)
#define SpGistPageIsLeaf(page) (SpGistPageGetOpaque(page)->flags & SPGIST_LEAF)
! #define SpGistPageSetLeaf(page) (SpGistPageGetOpaque(page)->flags |= SPGIST_LEAF)
! #define SpGistPageSetInner(page) (SpGistPageGetOpaque(page)->flags &= ~SPGIST_LEAF)
/*
* The page ID is for the convenience of pg_filedump and similar utilities,
--- 49,62 ----
#define SPGIST_META (1<<0)
#define SPGIST_DELETED (1<<1)
#define SPGIST_LEAF (1<<2)
+ #define SPGIST_NULLS (1<<3)
#define SpGistPageGetOpaque(page) ((SpGistPageOpaque) PageGetSpecialPointer(page))
#define SpGistPageIsMeta(page) (SpGistPageGetOpaque(page)->flags & SPGIST_META)
#define SpGistPageIsDeleted(page) (SpGistPageGetOpaque(page)->flags & SPGIST_DELETED)
#define SpGistPageSetDeleted(page) (SpGistPageGetOpaque(page)->flags |= SPGIST_DELETED)
#define SpGistPageIsLeaf(page) (SpGistPageGetOpaque(page)->flags & SPGIST_LEAF)
! #define SpGistPageStoresNulls(page) (SpGistPageGetOpaque(page)->flags & SPGIST_NULLS)
/*
* The page ID is for the convenience of pg_filedump and similar utilities,
*************** typedef struct SpGistLastUsedPage
*** 71,80 ****
int freeSpace; /* its free space (could be obsolete!) */
} SpGistLastUsedPage;
typedef struct SpGistLUPCache
{
! SpGistLastUsedPage innerPage[3]; /* one per triple-parity group */
! SpGistLastUsedPage leafPage;
} SpGistLUPCache;
/*
--- 77,88 ----
int freeSpace; /* its free space (could be obsolete!) */
} SpGistLastUsedPage;
+ /* Note: indexes in cachedPage[] match flag assignments for SpGistGetBuffer */
+ #define SPGIST_CACHED_PAGES 8
+
typedef struct SpGistLUPCache
{
! SpGistLastUsedPage cachedPage[SPGIST_CACHED_PAGES];
} SpGistLUPCache;
/*
*************** typedef struct SpGistMetaPageData
*** 86,92 ****
SpGistLUPCache lastUsedPages; /* shared storage of last-used info */
} SpGistMetaPageData;
! #define SPGIST_MAGIC_NUMBER (0xBA0BABED)
#define SpGistPageGetMeta(p) \
((SpGistMetaPageData *) PageGetContents(p))
--- 94,100 ----
SpGistLUPCache lastUsedPages; /* shared storage of last-used info */
} SpGistMetaPageData;
! #define SPGIST_MAGIC_NUMBER (0xBA0BABEE)
#define SpGistPageGetMeta(p) \
((SpGistMetaPageData *) PageGetContents(p))
*************** typedef SpGistNodeTupleData *SpGistNodeT
*** 266,272 ****
* node (which must be on the same page). But when the root page is a leaf
* page, we don't chain its tuples, so nextOffset is always 0 on the root.
*
! * size must be a multiple of MAXALIGN
*/
typedef struct SpGistLeafTupleData
{
--- 274,283 ----
* node (which must be on the same page). But when the root page is a leaf
* page, we don't chain its tuples, so nextOffset is always 0 on the root.
*
! * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE
! * so that the tuple can be converted to REDIRECT status later. (This
! * restriction only adds bytes for the null-datum case, otherwise alignment
! * restrictions force it anyway.)
*/
typedef struct SpGistLeafTupleData
{
*************** typedef struct spgxlogVacuumLeaf
*** 553,561 ****
typedef struct spgxlogVacuumRoot
{
! /* vacuum root page when it is a leaf */
RelFileNode node;
uint16 nDelete; /* number of tuples to delete */
spgxlogState stateSrc;
--- 564,573 ----
typedef struct spgxlogVacuumRoot
{
! /* vacuum a root page when it is also a leaf */
RelFileNode node;
+ BlockNumber blkno; /* block number to clean */
uint16 nDelete; /* number of tuples to delete */
spgxlogState stateSrc;
*************** typedef struct spgxlogVacuumRedirect
*** 580,589 ****
* page in the same triple-parity group as the specified block number.
* (Typically, this should be GBUF_INNER_PARITY(parentBlockNumber + 1)
* to follow the rule described in spgist/README.)
*/
#define GBUF_PARITY_MASK 0x03
! #define GBUF_LEAF 0x04
#define GBUF_INNER_PARITY(x) ((x) % 3)
/* spgutils.c */
extern SpGistCache *spgGetCache(Relation index);
--- 592,605 ----
* page in the same triple-parity group as the specified block number.
* (Typically, this should be GBUF_INNER_PARITY(parentBlockNumber + 1)
* to follow the rule described in spgist/README.)
+ * In addition, GBUF_NULLS can be OR'd in to get a page for storage of
+ * null-valued tuples.
+ * Note: these flag values are used as indexes into lastUsedPages.
*/
#define GBUF_PARITY_MASK 0x03
! #define GBUF_LEAF 0x03
#define GBUF_INNER_PARITY(x) ((x) % 3)
+ #define GBUF_NULLS 0x04
/* spgutils.c */
extern SpGistCache *spgGetCache(Relation index);
*************** extern void SpGistInitBuffer(Buffer b, u
*** 598,604 ****
extern void SpGistInitMetapage(Page page);
extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum);
extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
! ItemPointer heapPtr, Datum datum);
extern SpGistNodeTuple spgFormNodeTuple(SpGistState *state,
Datum label, bool isnull);
extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
--- 614,621 ----
extern void SpGistInitMetapage(Page page);
extern unsigned int SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum);
extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
! ItemPointer heapPtr,
! Datum datum, bool isnull);
extern SpGistNodeTuple spgFormNodeTuple(SpGistState *state,
Datum label, bool isnull);
extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
*************** extern void spgPageIndexMultiDelete(SpGi
*** 621,626 ****
int firststate, int reststate,
BlockNumber blkno, OffsetNumber offnum);
extern void spgdoinsert(Relation index, SpGistState *state,
! ItemPointer heapPtr, Datum datum);
#endif /* SPGIST_PRIVATE_H */
--- 638,643 ----
int firststate, int reststate,
BlockNumber blkno, OffsetNumber offnum);
extern void spgdoinsert(Relation index, SpGistState *state,
! ItemPointer heapPtr, Datum datum, bool isnull);
#endif /* SPGIST_PRIVATE_H */
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index 9aac9e953b3696c16bb3c0db7879e259b2ef23e0..0d7ed6857e832c0dd4a02bef4a5e2c9dfaa39124 100644
*** a/src/include/catalog/pg_am.h
--- b/src/include/catalog/pg_am.h
*************** DESCR("GiST index access method");
*** 129,135 ****
DATA(insert OID = 2742 ( gin 0 5 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan
ginendscanginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
DESCR("GIN index access method");
#define GIN_AM_OID 2742
! DATA(insert OID = 4000 ( spgist 0 5 f f f f f f f f f f f 0 spginsert spgbeginscan spggettuple spggetbitmap
spgrescanspgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn
spgcostestimatespgoptions ));
DESCR("SP-GiST index access method");
#define SPGIST_AM_OID 4000
--- 129,135 ----
DATA(insert OID = 2742 ( gin 0 5 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan
ginendscanginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
DESCR("GIN index access method");
#define GIN_AM_OID 2742
! DATA(insert OID = 4000 ( spgist 0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap
spgrescanspgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn
spgcostestimatespgoptions ));
DESCR("SP-GiST index access method");
#define SPGIST_AM_OID 4000