Re: invalid tid errors in latest 7.3.4 stable.

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: invalid tid errors in latest 7.3.4 stable.
Дата
Msg-id 9129.1065044188@sss.pgh.pa.us
обсуждение исходный текст
Ответ на invalid tid errors in latest 7.3.4 stable.  (Wade Klaver <archeron@wavefire.com>)
Ответы Re: invalid tid errors in latest 7.3.4 stable.  (Stephan Szabo <sszabo@megazone.bigpanda.com>)
Список pgsql-hackers
Stephan Szabo <sszabo@megazone.bigpanda.com> writes:
> On Tue, 30 Sep 2003, Tom Lane wrote:
>> I think I can implement it and it will act as stated in my proposal.
>> Whether people like the proposed behavior is the big question in my
>> mind.

> I think it's more reasonable than the current behavior or any of
> the others we've hit along the way, and we have to pretty much choose
> now if we want to change it for 7.4.

I've committed the attached patch.  One thing I wanted to double-check
with you is that the SELECT FOR UPDATES done in the noaction cases are
being correctly handled.  I think it is correct to do them with the
current snapshot rather than the start-of-transaction snap; do you
agree?  Also, I did not propagate the crosscheck support into
heap_mark4update, meaning that these SELECT FOR UPDATEs won't complain
if they find a row that was inserted later than the start of the
serializable transaction.  I'm not totally sure if they should or not;
what do you think?
        regards, tom lane

*** src/backend/access/heap/heapam.c.orig    Thu Sep 25 10:22:54 2003
--- src/backend/access/heap/heapam.c    Wed Oct  1 16:02:27 2003
***************
*** 1207,1220 ****  * NB: do not call this directly unless you are prepared to deal with  * concurrent-update
conditions. Use simple_heap_delete instead.  *  * Normal, successful return value is HeapTupleMayBeUpdated, which  *
actuallymeans we did delete it.  Failure return codes are  * HeapTupleSelfUpdated, HeapTupleUpdated, or
HeapTupleBeingUpdated
!  * (the last only possible if wait == false).  */ int heap_delete(Relation relation, ItemPointer tid,
!             ItemPointer ctid, CommandId cid, bool wait) {     ItemId        lp;     HeapTupleData tp;
--- 1207,1229 ----  * NB: do not call this directly unless you are prepared to deal with  * concurrent-update
conditions. Use simple_heap_delete instead.  *
 
+  *    relation - table to be modified
+  *    tid - TID of tuple to be deleted
+  *    ctid - output parameter, used only for failure case (see below)
+  *    cid - delete command ID to use in verifying tuple visibility
+  *    crosscheck - if not SnapshotAny, also check tuple against this
+  *    wait - true if should wait for any conflicting update to commit/abort
+  *  * Normal, successful return value is HeapTupleMayBeUpdated, which  * actually means we did delete it.  Failure
returncodes are  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
 
!  * (the last only possible if wait == false).  On a failure return,
!  * *ctid is set to the ctid link of the target tuple (possibly a later
!  * version of the row).  */ int heap_delete(Relation relation, ItemPointer tid,
!             ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) {     ItemId        lp;
HeapTupleDatatp;
 
***************
*** 1240,1246 ****     tp.t_tableOid = relation->rd_id;  l1:
!     result = HeapTupleSatisfiesUpdate(&tp, cid);      if (result == HeapTupleInvisible)     {
--- 1249,1255 ----     tp.t_tableOid = relation->rd_id;  l1:
!     result = HeapTupleSatisfiesUpdate(tp.t_data, cid);      if (result == HeapTupleInvisible)     {
***************
*** 1278,1283 ****
--- 1287,1300 ----         else             result = HeapTupleUpdated;     }
+ 
+     if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated)
+     {
+         /* Perform additional check for serializable RI updates */
+         if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck))
+             result = HeapTupleUpdated;
+     }
+      if (result != HeapTupleMayBeUpdated)     {         Assert(result == HeapTupleSelfUpdated ||
***************
*** 1378,1384 ****      result = heap_delete(relation, tid,                          &ctid,
!                          GetCurrentCommandId(),                          true /* wait for commit */);     switch
(result)    {
 
--- 1395,1401 ----      result = heap_delete(relation, tid,                          &ctid,
!                          GetCurrentCommandId(), SnapshotAny,                          true /* wait for commit */);
switch (result)     {
 
***************
*** 1407,1420 ****  * NB: do not call this directly unless you are prepared to deal with  * concurrent-update
conditions. Use simple_heap_update instead.  *  * Normal, successful return value is HeapTupleMayBeUpdated, which  *
actuallymeans we *did* update it.  Failure return codes are  * HeapTupleSelfUpdated, HeapTupleUpdated, or
HeapTupleBeingUpdated
!  * (the last only possible if wait == false).  */ int heap_update(Relation relation, ItemPointer otid, HeapTuple
newtup,
!             ItemPointer ctid, CommandId cid, bool wait) {     ItemId        lp;     HeapTupleData oldtup;
--- 1424,1449 ----  * NB: do not call this directly unless you are prepared to deal with  * concurrent-update
conditions. Use simple_heap_update instead.  *
 
+  *    relation - table to be modified
+  *    otid - TID of old tuple to be replaced
+  *    newtup - newly constructed tuple data to store
+  *    ctid - output parameter, used only for failure case (see below)
+  *    cid - update command ID to use in verifying old tuple visibility
+  *    crosscheck - if not SnapshotAny, also check old tuple against this
+  *    wait - true if should wait for any conflicting update to commit/abort
+  *  * Normal, successful return value is HeapTupleMayBeUpdated, which  * actually means we *did* update it.  Failure
returncodes are  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
 
!  * (the last only possible if wait == false).  On a failure return,
!  * *ctid is set to the ctid link of the old tuple (possibly a later
!  * version of the row).
!  * On success, newtup->t_self is set to the TID where the new tuple
!  * was inserted.  */ int heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
!             ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) {     ItemId        lp;
HeapTupleDataoldtup;
 
***************
*** 1450,1456 ****      */  l2:
!     result = HeapTupleSatisfiesUpdate(&oldtup, cid);      if (result == HeapTupleInvisible)     {
--- 1479,1485 ----      */  l2:
!     result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid);      if (result == HeapTupleInvisible)     {
***************
*** 1488,1493 ****
--- 1517,1530 ----         else             result = HeapTupleUpdated;     }
+ 
+     if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated)
+     {
+         /* Perform additional check for serializable RI updates */
+         if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck))
+             result = HeapTupleUpdated;
+     }
+      if (result != HeapTupleMayBeUpdated)     {         Assert(result == HeapTupleSelfUpdated ||
***************
*** 1718,1724 ****      result = heap_update(relation, otid, tup,                          &ctid,
!                          GetCurrentCommandId(),                          true /* wait for commit */);     switch
(result)    {
 
--- 1755,1761 ----      result = heap_update(relation, otid, tup,                          &ctid,
!                          GetCurrentCommandId(), SnapshotAny,                          true /* wait for commit */);
switch (result)     {
 
***************
*** 1767,1773 ****     tuple->t_len = ItemIdGetLength(lp);  l3:
!     result = HeapTupleSatisfiesUpdate(tuple, cid);      if (result == HeapTupleInvisible)     {
--- 1804,1810 ----     tuple->t_len = ItemIdGetLength(lp);  l3:
!     result = HeapTupleSatisfiesUpdate(tuple->t_data, cid);      if (result == HeapTupleInvisible)     {
*** src/backend/commands/async.c.orig    Mon Sep 15 19:33:39 2003
--- src/backend/commands/async.c    Wed Oct  1 15:36:02 2003
***************
*** 537,543 ****                  */                 result = heap_update(lRel, &lTuple->t_self, rTuple,
                     &ctid,
 
!                                      GetCurrentCommandId(),                                      false /* no wait for
commit*/);                 switch (result)                 {
 
--- 537,543 ----                  */                 result = heap_update(lRel, &lTuple->t_self, rTuple,
                     &ctid,
 
!                                      GetCurrentCommandId(), SnapshotAny,                                      false
/*no wait for commit */);                 switch (result)                 {
 
*** src/backend/executor/execMain.c.orig    Thu Sep 25 14:58:35 2003
--- src/backend/executor/execMain.c    Wed Oct  1 17:22:30 2003
***************
*** 104,111 ****  * field of the QueryDesc is filled in to describe the tuples that will be  * returned, and the
internalfields (estate and planstate) are set up.  *
 
!  * If useSnapshotNow is true, run the query with SnapshotNow time qual rules
!  * instead of the normal use of QuerySnapshot.  *  * If explainOnly is true, we are not actually intending to run the
plan, * only to set up for EXPLAIN; so skip unwanted side-effects.
 
--- 104,117 ----  * field of the QueryDesc is filled in to describe the tuples that will be  * returned, and the
internalfields (estate and planstate) are set up.  *
 
!  * If useCurrentSnapshot is true, run the query with the latest available
!  * snapshot, instead of the normal QuerySnapshot.  Also, if it's an update
!  * or delete query, check that the rows to be updated or deleted would be
!  * visible to the normal QuerySnapshot.  (This is a special-case behavior
!  * needed for referential integrity updates in serializable transactions.
!  * We must check all currently-committed rows, but we want to throw a
!  * can't-serialize error if any rows that would need updates would not be
!  * visible under the normal serializable snapshot.)  *  * If explainOnly is true, we are not actually intending to
runthe plan,  * only to set up for EXPLAIN; so skip unwanted side-effects.
 
***************
*** 115,121 ****  * ----------------------------------------------------------------  */ void
! ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly) {     EState       *estate;
MemoryContextoldcontext;
 
--- 121,127 ----  * ----------------------------------------------------------------  */ void
! ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot, bool explainOnly) {     EState       *estate;
MemoryContextoldcontext;
 
***************
*** 157,171 ****      * the life of this query, even if it outlives the current command and      * current snapshot.
 */
 
!     if (useSnapshotNow)     {
!         estate->es_snapshot = SnapshotNow;
!         estate->es_snapshot_cid = GetCurrentCommandId();     }     else     {         estate->es_snapshot =
CopyQuerySnapshot();
!         estate->es_snapshot_cid = estate->es_snapshot->curcid;     }      /*
--- 163,180 ----      * the life of this query, even if it outlives the current command and      * current snapshot.
 */
 
!     if (useCurrentSnapshot)     {
!         /* RI update/delete query --- must use an up-to-date snapshot */
!         estate->es_snapshot = CopyCurrentSnapshot();
!         /* crosscheck updates/deletes against transaction snapshot */
!         estate->es_crosscheck_snapshot = CopyQuerySnapshot();     }     else     {
+         /* normal query --- use query snapshot, no crosscheck */         estate->es_snapshot = CopyQuerySnapshot();
!         estate->es_crosscheck_snapshot = SnapshotAny;     }      /*
***************
*** 1118,1124 ****                      tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
test= heap_mark4update(erm->relation, &tuple, &buffer,
 
!                                             estate->es_snapshot_cid);                     ReleaseBuffer(buffer);
              switch (test)                     {
 
--- 1127,1133 ----                      tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
test= heap_mark4update(erm->relation, &tuple, &buffer,
 
!                                             estate->es_snapshot->curcid);                     ReleaseBuffer(buffer);
                  switch (test)                     {
 
***************
*** 1278,1284 ****     if (estate->es_into_relation_descriptor != NULL)     {
heap_insert(estate->es_into_relation_descriptor,tuple,
 
!                     estate->es_snapshot_cid);         IncrAppended();     } 
--- 1287,1293 ----     if (estate->es_into_relation_descriptor != NULL)     {
heap_insert(estate->es_into_relation_descriptor,tuple,
 
!                     estate->es_snapshot->curcid);         IncrAppended();     } 
***************
*** 1354,1360 ****      * insert the tuple      */     newId = heap_insert(resultRelationDesc, tuple,
!                         estate->es_snapshot_cid);      IncrAppended();     (estate->es_processed)++;
--- 1363,1369 ----      * insert the tuple      */     newId = heap_insert(resultRelationDesc, tuple,
!                         estate->es_snapshot->curcid);      IncrAppended();     (estate->es_processed)++;
***************
*** 1406,1412 ****         bool        dodelete;          dodelete = ExecBRDeleteTriggers(estate, resultRelInfo,
tupleid,
!                                         estate->es_snapshot_cid);          if (!dodelete)            /* "do nothing"
*/            return;
 
--- 1415,1421 ----         bool        dodelete;          dodelete = ExecBRDeleteTriggers(estate, resultRelInfo,
tupleid,
!                                         estate->es_snapshot->curcid);          if (!dodelete)            /* "do
nothing"*/             return;
 
***************
*** 1418,1424 **** ldelete:;     result = heap_delete(resultRelationDesc, tupleid,                          &ctid,
!                          estate->es_snapshot_cid,                          true /* wait for commit */);     switch
(result)    {
 
--- 1427,1434 ---- ldelete:;     result = heap_delete(resultRelationDesc, tupleid,                          &ctid,
!                          estate->es_snapshot->curcid,
!                          estate->es_crosscheck_snapshot,                          true /* wait for commit */);
switch(result)     {
 
***************
*** 1517,1523 ****          newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
  tupleid, tuple,
 
!                                         estate->es_snapshot_cid);          if (newtuple == NULL)    /* "do nothing"
*/            return;
 
--- 1527,1533 ----          newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
  tupleid, tuple,
 
!                                         estate->es_snapshot->curcid);          if (newtuple == NULL)    /* "do
nothing"*/             return;
 
***************
*** 1553,1559 ****      */     result = heap_update(resultRelationDesc, tupleid, tuple,
&ctid,
!                          estate->es_snapshot_cid,                          true /* wait for commit */);     switch
(result)    {
 
--- 1563,1570 ----      */     result = heap_update(resultRelationDesc, tupleid, tuple,
&ctid,
!                          estate->es_snapshot->curcid,
!                          estate->es_crosscheck_snapshot,                          true /* wait for commit */);
switch(result)     {
 
***************
*** 2039,2045 ****      */     epqstate->es_direction = ForwardScanDirection;     epqstate->es_snapshot =
estate->es_snapshot;
!     epqstate->es_snapshot_cid = estate->es_snapshot_cid;     epqstate->es_range_table = estate->es_range_table;
epqstate->es_result_relations= estate->es_result_relations;     epqstate->es_num_result_relations =
estate->es_num_result_relations;
--- 2050,2056 ----      */     epqstate->es_direction = ForwardScanDirection;     epqstate->es_snapshot =
estate->es_snapshot;
!     epqstate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot;     epqstate->es_range_table =
estate->es_range_table;    epqstate->es_result_relations = estate->es_result_relations;
epqstate->es_num_result_relations= estate->es_num_result_relations;
 
*** src/backend/executor/execUtils.c.orig    Thu Sep 25 14:58:35 2003
--- src/backend/executor/execUtils.c    Wed Oct  1 15:35:55 2003
***************
*** 178,184 ****      */     estate->es_direction = ForwardScanDirection;     estate->es_snapshot = SnapshotNow;
!     estate->es_snapshot_cid = FirstCommandId;     estate->es_range_table = NIL;      estate->es_result_relations =
NULL;
--- 178,184 ----      */     estate->es_direction = ForwardScanDirection;     estate->es_snapshot = SnapshotNow;
!     estate->es_crosscheck_snapshot = SnapshotAny; /* means no crosscheck */     estate->es_range_table = NIL;
estate->es_result_relations= NULL;
 
*** src/backend/executor/nodeSubplan.c.orig    Thu Sep 25 14:58:35 2003
--- src/backend/executor/nodeSubplan.c    Wed Oct  1 17:22:30 2003
***************
*** 709,715 ****     sp_estate->es_tupleTable =         ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
 sp_estate->es_snapshot = estate->es_snapshot;
 
!     sp_estate->es_snapshot_cid = estate->es_snapshot_cid;     sp_estate->es_instrument = estate->es_instrument;
/*
--- 709,715 ----     sp_estate->es_tupleTable =         ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
 sp_estate->es_snapshot = estate->es_snapshot;
 
!     sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot;     sp_estate->es_instrument =
estate->es_instrument;     /*
 
*** src/backend/executor/nodeSubqueryscan.c.orig    Thu Sep 25 14:58:35 2003
--- src/backend/executor/nodeSubqueryscan.c    Wed Oct  1 17:22:31 2003
***************
*** 177,183 ****     sp_estate->es_tupleTable =         ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
 sp_estate->es_snapshot = estate->es_snapshot;
 
!     sp_estate->es_snapshot_cid = estate->es_snapshot_cid;     sp_estate->es_instrument = estate->es_instrument;
/*
--- 177,183 ----     sp_estate->es_tupleTable =         ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
 sp_estate->es_snapshot = estate->es_snapshot;
 
!     sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot;     sp_estate->es_instrument =
estate->es_instrument;     /*
 
*** src/backend/executor/spi.c.orig    Thu Sep 25 14:58:35 2003
--- src/backend/executor/spi.c    Wed Oct  1 16:33:44 2003
***************
*** 33,43 ****  static int    _SPI_execute(const char *src, int tcount, _SPI_plan *plan); static int
_SPI_pquery(QueryDesc*queryDesc, bool runit,
 
!                         bool useSnapshotNow, int tcount);  static int _SPI_execute_plan(_SPI_plan *plan,
               Datum *Values, const char *Nulls,
 
!                              bool useSnapshotNow, int tcount);  static void _SPI_cursor_operation(Portal portal, bool
forward,int count,                       DestReceiver *dest);
 
--- 33,43 ----  static int    _SPI_execute(const char *src, int tcount, _SPI_plan *plan); static int
_SPI_pquery(QueryDesc*queryDesc, bool runit,
 
!                         bool useCurrentSnapshot, int tcount);  static int _SPI_execute_plan(_SPI_plan *plan,
                   Datum *Values, const char *Nulls,
 
!                              bool useCurrentSnapshot, int tcount);  static void _SPI_cursor_operation(Portal portal,
boolforward, int count,                       DestReceiver *dest);
 
***************
*** 245,256 **** }  /*
!  * SPI_execp_now -- identical to SPI_execp, except that we use SnapshotNow
!  * instead of the normal QuerySnapshot.  This is currently not documented
!  * in spi.sgml because it is only intended for use by RI triggers.  */ int
! SPI_execp_now(void *plan, Datum *Values, const char *Nulls, int tcount) {     int            res; 
--- 245,258 ---- }  /*
!  * SPI_execp_current -- identical to SPI_execp, except that we expose the
!  * Executor option to use a current snapshot instead of the normal
!  * QuerySnapshot.  This is currently not documented in spi.sgml because
!  * it is only intended for use by RI triggers.  */ int
! SPI_execp_current(void *plan, Datum *Values, const char *Nulls,
!                   bool useCurrentSnapshot, int tcount) {     int            res; 
***************
*** 264,270 ****     if (res < 0)         return res; 
!     res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, true, tcount);      _SPI_end_call(true);     return
res;
--- 266,273 ----     if (res < 0)         return res; 
!     res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls,
!                             useCurrentSnapshot, tcount);      _SPI_end_call(true);     return res;
***************
*** 1124,1130 ****  static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
!                   bool useSnapshotNow, int tcount) {     List       *query_list_list = plan->qtlist;     List
*plan_list= plan->ptlist;
 
--- 1127,1133 ----  static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
!                   bool useCurrentSnapshot, int tcount) {     List       *query_list_list = plan->qtlist;     List
 *plan_list = plan->ptlist;
 
***************
*** 1195,1201 ****             {                 qdesc = CreateQueryDesc(queryTree, planTree, dest,
                   paramLI, false);
 
!                 res = _SPI_pquery(qdesc, true, useSnapshotNow,                                   queryTree->canSetTag
?tcount : 0);                 if (res < 0)                     return res;
 
--- 1198,1204 ----             {                 qdesc = CreateQueryDesc(queryTree, planTree, dest,
                   paramLI, false);
 
!                 res = _SPI_pquery(qdesc, true, useCurrentSnapshot,
queryTree->canSetTag? tcount : 0);                 if (res < 0)                     return res;
 
***************
*** 1208,1214 **** }  static int
! _SPI_pquery(QueryDesc *queryDesc, bool runit, bool useSnapshotNow, int tcount) {     int            operation =
queryDesc->operation;    int            res;
 
--- 1211,1218 ---- }  static int
! _SPI_pquery(QueryDesc *queryDesc, bool runit,
!             bool useCurrentSnapshot, int tcount) {     int            operation = queryDesc->operation;     int
    res;
 
***************
*** 1245,1251 ****         ResetUsage(); #endif 
!     ExecutorStart(queryDesc, useSnapshotNow, false);      ExecutorRun(queryDesc, ForwardScanDirection, (long)
tcount);
 
--- 1249,1255 ----         ResetUsage(); #endif 
!     ExecutorStart(queryDesc, useCurrentSnapshot, false);      ExecutorRun(queryDesc, ForwardScanDirection, (long)
tcount);
 
*** src/backend/storage/ipc/sinval.c.orig    Wed Sep 24 14:54:01 2003
--- src/backend/storage/ipc/sinval.c    Wed Oct  1 16:02:42 2003
***************
*** 330,339 ****      * lastBackend would be sufficient.  But it seems better to do the      * malloc while not holding
thelock, so we can't look at lastBackend.      *
 
!      * if (snapshot->xip != NULL) no need to free and reallocate xip;
!      *
!      * We can reuse the old xip array, because MaxBackends does not change at
!      * runtime.      */     if (snapshot->xip == NULL)     {
--- 330,339 ----      * lastBackend would be sufficient.  But it seems better to do the      * malloc while not holding
thelock, so we can't look at lastBackend.      *
 
!      * This does open a possibility for avoiding repeated malloc/free:
!      * since MaxBackends does not change at runtime, we can simply reuse
!      * the previous xip array if any.  (This relies on the fact that all
!      * calls pass static SnapshotData structs.)      */     if (snapshot->xip == NULL)     {
*** src/backend/utils/adt/ri_triggers.c.orig    Mon Sep 29 13:44:31 2003
--- src/backend/utils/adt/ri_triggers.c    Wed Oct  1 16:49:40 2003
***************
*** 157,162 ****
--- 157,163 ---- static bool ri_PerformCheck(RI_QueryKey *qkey, void *qplan,                 Relation fk_rel, Relation
pk_rel,                HeapTuple old_tuple, HeapTuple new_tuple,
 
+                 bool detectNewRows,                 int expect_OK, const char *constrname); static void
ri_ExtractValues(RI_QueryKey*qkey, int key_idx,                  Relation rel, HeapTuple tuple,
 
***************
*** 276,281 ****
--- 277,283 ----         ri_PerformCheck(&qkey, qplan,                         fk_rel, pk_rel,
NULL,NULL,
 
+                         false,                         SPI_OK_SELECT,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
 
***************
*** 433,438 ****
--- 435,441 ----     ri_PerformCheck(&qkey, qplan,                     fk_rel, pk_rel,                     NULL,
new_row,
+                     false,                     SPI_OK_SELECT,                     tgargs[RI_CONSTRAINT_NAME_ARGNO]);

***************
*** 594,599 ****
--- 597,603 ----     result = ri_PerformCheck(&qkey, qplan,                              fk_rel, pk_rel,
             old_row, NULL,
 
+                              true,        /* treat like update */                              SPI_OK_SELECT, NULL);
   if (SPI_finish() != SPI_OK_FINISH)
 
***************
*** 752,757 ****
--- 756,762 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
          old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_SELECT,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 942,947 ****
--- 947,953 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
          old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_SELECT,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 1102,1107 ****
--- 1108,1114 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_DELETE,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 1285,1290 ****
--- 1292,1298 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, new_row,
 
+                             true, /* must detect new rows */                             SPI_OK_UPDATE,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 1453,1458 ****
--- 1461,1467 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_SELECT,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 1633,1638 ****
--- 1642,1648 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_SELECT,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 1802,1807 ****
--- 1812,1818 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_UPDATE,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 2019,2024 ****
--- 2030,2036 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_UPDATE,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 2188,2193 ****
--- 2200,2206 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_UPDATE,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 2392,2397 ****
--- 2405,2411 ----             ri_PerformCheck(&qkey, qplan,                             fk_rel, pk_rel,
            old_row, NULL,
 
+                             true, /* must detect new rows */                             SPI_OK_UPDATE,
             tgargs[RI_CONSTRAINT_NAME_ARGNO]); 
 
***************
*** 2788,2798 ****
--- 2802,2814 ---- ri_PerformCheck(RI_QueryKey *qkey, void *qplan,                 Relation fk_rel, Relation pk_rel,
            HeapTuple old_tuple, HeapTuple new_tuple,
 
+                 bool detectNewRows,                 int expect_OK, const char *constrname) {     Relation
query_rel,                source_rel;     int            key_idx;
 
+     bool        useCurrentSnapshot;     int            limit;     int            spi_result;     AclId
save_uid;
***************
*** 2842,2850 ****                          vals, nulls);     } 
!     /* Switch to proper UID to perform check as */
!     save_uid = GetUserId();
!     SetUserId(RelationGetForm(query_rel)->relowner);      /*      * If this is a select query (e.g., for a 'no
action'or 'restrict'
 
--- 2858,2882 ----                          vals, nulls);     } 
!     /*
!      * In READ COMMITTED mode, we just need to make sure the regular query
!      * snapshot is up-to-date, and we will see all rows that could be
!      * interesting.  In SERIALIZABLE mode, we can't update the regular query
!      * snapshot.  If the caller passes detectNewRows == false then it's okay
!      * to do the query with the transaction snapshot; otherwise we tell the
!      * executor to force a current snapshot (and error out if it finds any
!      * rows under current snapshot that wouldn't be visible per the
!      * transaction snapshot).
!      */
!     if (XactIsoLevel == XACT_SERIALIZABLE)
!     {
!         useCurrentSnapshot = detectNewRows;
!     }
!     else
!     {
!         SetQuerySnapshot();
!         useCurrentSnapshot = false;
!     }      /*      * If this is a select query (e.g., for a 'no action' or 'restrict'
***************
*** 2854,2872 ****      */     limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0; 
!     /*
!      * Run the plan, using SnapshotNow time qual rules so that we can see
!      * all committed tuples, even those committed after our own transaction
!      * or query started.
!      */
!     spi_result = SPI_execp_now(qplan, vals, nulls, limit);      /* Restore UID */     SetUserId(save_uid);      /*
Checkresult */     if (spi_result < 0)
 
!         elog(ERROR, "SPI_execp_now returned %d", spi_result);      if (expect_OK >= 0 && spi_result != expect_OK)
   ri_ReportViolation(qkey, constrname ? constrname : "",
 
--- 2886,2905 ----      */     limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0; 
!     /* Switch to proper UID to perform check as */
!     save_uid = GetUserId();
!     SetUserId(RelationGetForm(query_rel)->relowner);
! 
!     /* Finally we can run the query. */
!     spi_result = SPI_execp_current(qplan, vals, nulls,
!                                    useCurrentSnapshot, limit);      /* Restore UID */     SetUserId(save_uid);
/*Check result */     if (spi_result < 0)
 
!         elog(ERROR, "SPI_execp_current returned %d", spi_result);      if (expect_OK >= 0 && spi_result != expect_OK)
       ri_ReportViolation(qkey, constrname ? constrname : "",
 
*** src/backend/utils/time/tqual.c.orig    Thu Sep 25 14:58:35 2003
--- src/backend/utils/time/tqual.c    Wed Oct  1 16:02:52 2003
***************
*** 26,39 **** #include "storage/sinval.h" #include "utils/tqual.h" 
! 
! static SnapshotData SnapshotDirtyData;
! Snapshot    SnapshotDirty = &SnapshotDirtyData;
!  static SnapshotData QuerySnapshotData; static SnapshotData SerializableSnapshotData; Snapshot    QuerySnapshot =
NULL;Snapshot    SerializableSnapshot = NULL;  /* These are updated by GetSnapshotData: */ TransactionId RecentXmin =
InvalidTransactionId;
--- 26,44 ---- #include "storage/sinval.h" #include "utils/tqual.h" 
! /*
!  * The SnapshotData structs are static to simplify memory allocation
!  * (see the hack in GetSnapshotData to avoid repeated malloc/free).
!  */ static SnapshotData QuerySnapshotData; static SnapshotData SerializableSnapshotData;
+ static SnapshotData CurrentSnapshotData;
+ static SnapshotData SnapshotDirtyData;
+ 
+ /* Externally visible pointers to valid snapshots: */ Snapshot    QuerySnapshot = NULL; Snapshot
SerializableSnapshot= NULL;
 
+ Snapshot    SnapshotDirty = &SnapshotDirtyData;  /* These are updated by GetSnapshotData: */ TransactionId RecentXmin
=InvalidTransactionId;
 
***************
*** 387,396 ****  *    CurrentCommandId.  */ int
! HeapTupleSatisfiesUpdate(HeapTuple htuple, CommandId curcid) {
-     HeapTupleHeader tuple = htuple->t_data;
-      if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))     {         if (tuple->t_infomask & HEAP_XMIN_INVALID)
--- 392,399 ----  *    CurrentCommandId.  */ int
! HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid) {     if (!(tuple->t_infomask &
HEAP_XMIN_COMMITTED))    {         if (tuple->t_infomask & HEAP_XMIN_INVALID)
 
***************
*** 1024,1029 ****
--- 1027,1068 ---- }  /*
+  * CopyCurrentSnapshot
+  *        Make a snapshot that is up-to-date as of the current instant,
+  *        and return a copy.
+  *
+  * The copy is palloc'd in the current memory context.
+  */
+ Snapshot
+ CopyCurrentSnapshot(void)
+ {
+     Snapshot    currentSnapshot;
+     Snapshot    snapshot;
+ 
+     if (QuerySnapshot == NULL)    /* should not be first call in xact */
+         elog(ERROR, "no snapshot has been set");
+ 
+     /* Update the static struct */
+     currentSnapshot = GetSnapshotData(&CurrentSnapshotData, false);
+     currentSnapshot->curcid = GetCurrentCommandId();
+ 
+     /* Make a copy */
+     snapshot = (Snapshot) palloc(sizeof(SnapshotData));
+     memcpy(snapshot, currentSnapshot, sizeof(SnapshotData));
+     if (snapshot->xcnt > 0)
+     {
+         snapshot->xip = (TransactionId *)
+             palloc(snapshot->xcnt * sizeof(TransactionId));
+         memcpy(snapshot->xip, currentSnapshot->xip,
+                snapshot->xcnt * sizeof(TransactionId));
+     }
+     else
+         snapshot->xip = NULL;
+ 
+     return snapshot;
+ }
+ 
+ /*  * FreeXactSnapshot  *        Free snapshot(s) at end of transaction.  */
***************
*** 1031,1038 **** FreeXactSnapshot(void) {     /*
!      * We do not free(QuerySnapshot->xip); or
!      * free(SerializableSnapshot->xip); they will be reused soon      */     QuerySnapshot = NULL;
SerializableSnapshot= NULL;
 
--- 1070,1078 ---- FreeXactSnapshot(void) {     /*
!      * We do not free the xip arrays for the snapshot structs;
!      * they will be reused soon.  So this is now just a state
!      * change to prevent outside callers from accessing the snapshots.      */     QuerySnapshot = NULL;
SerializableSnapshot= NULL;
 
*** src/include/access/heapam.h.orig    Mon Sep 15 19:33:43 2003
--- src/include/access/heapam.h    Wed Oct  1 15:35:11 2003
***************
*** 155,163 ****  extern Oid    heap_insert(Relation relation, HeapTuple tup, CommandId cid); extern int
heap_delete(Relationrelation, ItemPointer tid, ItemPointer ctid,
 
!             CommandId cid, bool wait); extern int heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
!             ItemPointer ctid, CommandId cid, bool wait); extern int heap_mark4update(Relation relation, HeapTuple
tup,                 Buffer *userbuf, CommandId cid); 
 
--- 155,163 ----  extern Oid    heap_insert(Relation relation, HeapTuple tup, CommandId cid); extern int
heap_delete(Relationrelation, ItemPointer tid, ItemPointer ctid,
 
!             CommandId cid, Snapshot crosscheck, bool wait); extern int heap_update(Relation relation, ItemPointer
otid,HeapTuple tup,
 
!             ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait); extern int heap_mark4update(Relation
relation,HeapTuple tup,                  Buffer *userbuf, CommandId cid); 
 
*** src/include/executor/executor.h.orig    Thu Sep 25 14:58:35 2003
--- src/include/executor/executor.h    Wed Oct  1 15:35:05 2003
***************
*** 85,91 **** /*  * prototypes from functions in execMain.c  */
! extern void ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow,                           bool explainOnly);
externTupleTableSlot *ExecutorRun(QueryDesc *queryDesc,             ScanDirection direction, long count);
 
--- 85,91 ---- /*  * prototypes from functions in execMain.c  */
! extern void ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot,                           bool explainOnly);
externTupleTableSlot *ExecutorRun(QueryDesc *queryDesc,             ScanDirection direction, long count);
 
*** src/include/executor/spi.h.orig    Thu Sep 25 14:58:36 2003
--- src/include/executor/spi.h    Wed Oct  1 16:33:39 2003
***************
*** 84,91 **** extern int    SPI_exec(const char *src, int tcount); extern int SPI_execp(void *plan, Datum *values,
constchar *Nulls,           int tcount);
 
! extern int SPI_execp_now(void *plan, Datum *values, const char *Nulls,
!           int tcount); extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes); extern void
*SPI_saveplan(void*plan); extern int    SPI_freeplan(void *plan);
 
--- 84,91 ---- extern int    SPI_exec(const char *src, int tcount); extern int SPI_execp(void *plan, Datum *values,
constchar *Nulls,           int tcount);
 
! extern int SPI_execp_current(void *plan, Datum *values, const char *Nulls,
!                              bool useCurrentSnapshot, int tcount); extern void *SPI_prepare(const char *src, int
nargs,Oid *argtypes); extern void *SPI_saveplan(void *plan); extern int    SPI_freeplan(void *plan);
 
*** src/include/nodes/execnodes.h.orig    Thu Sep 25 14:58:36 2003
--- src/include/nodes/execnodes.h    Wed Oct  1 15:35:00 2003
***************
*** 286,292 ****     /* Basic state for all query types: */     ScanDirection es_direction; /* current scan direction
*/    Snapshot    es_snapshot;    /* time qual to use */
 
!     CommandId    es_snapshot_cid;    /* CommandId component of time qual */     List       *es_range_table; /* List
ofRangeTableEntrys */      /* Info about target table for insert/update/delete queries: */
 
--- 286,292 ----     /* Basic state for all query types: */     ScanDirection es_direction; /* current scan direction
*/    Snapshot    es_snapshot;    /* time qual to use */
 
!     Snapshot    es_crosscheck_snapshot;    /* crosscheck time qual for RI */     List       *es_range_table; /* List
ofRangeTableEntrys */      /* Info about target table for insert/update/delete queries: */
 
*** src/include/utils/tqual.h.orig    Thu Sep 25 14:58:36 2003
--- src/include/utils/tqual.h    Wed Oct  1 16:02:21 2003
***************
*** 100,106 **** extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple); extern bool
HeapTupleSatisfiesSnapshot(HeapTupleHeadertuple,                            Snapshot snapshot);
 
! extern int HeapTupleSatisfiesUpdate(HeapTuple tuple,                          CommandId curcid); extern HTSV_Result
HeapTupleSatisfiesVacuum(HeapTupleHeadertuple,                          TransactionId OldestXmin);
 
--- 100,106 ---- extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple); extern bool
HeapTupleSatisfiesSnapshot(HeapTupleHeadertuple,                            Snapshot snapshot);
 
! extern int HeapTupleSatisfiesUpdate(HeapTupleHeader tuple,                          CommandId curcid); extern
HTSV_ResultHeapTupleSatisfiesVacuum(HeapTupleHeader tuple,                          TransactionId OldestXmin);
 
***************
*** 108,113 ****
--- 108,114 ---- extern Snapshot GetSnapshotData(Snapshot snapshot, bool serializable); extern void
SetQuerySnapshot(void);extern Snapshot CopyQuerySnapshot(void);
 
+ extern Snapshot CopyCurrentSnapshot(void); extern void FreeXactSnapshot(void);  #endif   /* TQUAL_H */


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andrew Dunstan
Дата:
Сообщение: Re: initdb
Следующее
От: Tom Lane
Дата:
Сообщение: Re: FreeSpaceMap hashtable out of memory