Re: [BUGS] ON DELETE SET NULL clauses do error when more than two columns are referenced to one table

From: Tom Lane
Subject: Re: [BUGS] ON DELETE SET NULL clauses do error when more than two columns are referenced to one table
Msg-id: 10277.1187118440@sss.pgh.pa.us
List: pgsql-patches

I wrote:
> Heikki Linnakangas <heikki@enterprisedb.com> writes:
>> I'm not sure what to do about this. We could change the order the
>> triggers are fired to breadth-first. If all the setnull triggers were
>> executed first, there would be no problem. But that seems like a pretty
>> big change, and I'm afraid it might have other unintended consequences.

> I think it's not so much that they should be "breadth first" as that the
> updates generated by the triggers shouldn't count as their own
> sub-statements.  The trigger events generated by those updates need to
> go at the end of the outer statement's trigger queue.

Actually, Heikki's description is isomorphic to mine ...
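
To make the ordering difference concrete, here is a toy model of the two behaviors. It is only a sketch and not PostgreSQL code: every name in it (EventQueue, run_statement, run_outer, and so on) is hypothetical. Events 1 and 2 stand in for two setnull triggers fired by one outer statement, and each of them issues a nested "UPDATE" that queues a check event (11 and 12). With fire_triggers set for the nested statement, the nested events get their own queue level and fire immediately (depth first). With it cleared, they are appended to the outer statement's queue and fire only after both setnull events have run.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_EVENTS 16
#define MAX_DEPTH  4

/* One queue of pending AFTER-trigger events per query nesting level. */
typedef struct
{
    int     ids[MAX_EVENTS];
    size_t  n;      /* events queued so far */
    size_t  done;   /* events already fired */
} EventQueue;

static EventQueue query_stack[MAX_DEPTH];
static int    query_depth = -1;
static int    fired[MAX_EVENTS];    /* firing order, kept for inspection */
static size_t nfired = 0;
static bool   nested_fire_triggers = false;

static void end_query(void);

static void begin_query(void)
{
    query_depth++;
    assert(query_depth < MAX_DEPTH);
    query_stack[query_depth].n = 0;
    query_stack[query_depth].done = 0;
}

/* Append an event to the queue of the current (innermost) level. */
static void queue_event(int id)
{
    EventQueue *q = &query_stack[query_depth];

    assert(q->n < MAX_EVENTS);
    q->ids[q->n++] = id;
}

/* Run a "statement" that queues the given events. */
static void run_statement(const int *ids, size_t n, bool fire_triggers)
{
    if (fire_triggers)
        begin_query();      /* events get their own queue level */
    for (size_t i = 0; i < n; i++)
        queue_event(ids[i]);
    if (fire_triggers)
        end_query();        /* fire them now, then pop the level */
}

/* Toy trigger: ids below 10 issue a nested statement queuing id + 10. */
static void fire(int id)
{
    fired[nfired++] = id;
    if (id < 10)
    {
        int child = id + 10;

        run_statement(&child, 1, nested_fire_triggers);
    }
}

static void end_query(void)
{
    EventQueue *q = &query_stack[query_depth];

    /* Loop rather than make one pass: a trigger may append more events. */
    while (q->done < q->n)
        fire(q->ids[q->done++]);
    query_depth--;
}

/* Outer statement with two setnull-like events (1 and 2). */
static void run_outer(bool nested_fire)
{
    static const int ids[] = {1, 2};

    nfired = 0;
    query_depth = -1;
    nested_fire_triggers = nested_fire;
    run_statement(ids, 2, true);
}
```

Calling run_outer(true) leaves the firing order 1, 11, 2, 12 in fired[], so the first check runs before the second setnull has been applied. Calling run_outer(false) gives 1, 2, 11, 12. The loop in end_query() matters in the second case, because the queue grows while it is being drained.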

I coded this up (patch attached, sans regression test additions) and
it seems to work: Pavel's and Heikki's test cases do what is expected.
The regression tests pass modulo this diff:

*** ./expected/foreign_key.out    Tue Jul 17 13:45:28 2007
--- ./results/foreign_key.out    Tue Aug 14 14:39:38 2007
***************
*** 646,652 ****
  UPDATE PKTABLE set ptest2=5 where ptest2=2;
  ERROR:  insert or update on table "fktable" violates foreign key constraint "constrname3"
  DETAIL:  Key (ftest1,ftest2,ftest3)=(1,-1,3) is not present in table "pktable".
- CONTEXT:  SQL statement "UPDATE ONLY "public"."fktable" SET "ftest2" = DEFAULT WHERE $1 OPERATOR(pg_catalog.=) "ftest1" AND $2 OPERATOR(pg_catalog.=) "ftest2" AND $3 OPERATOR(pg_catalog.=) "ftest3""
  -- Try to update something that will set default
  UPDATE PKTABLE set ptest1=0, ptest2=5, ptest3=10 where ptest2=2;
  UPDATE PKTABLE set ptest2=10 where ptest2=4;
--- 646,651 ----

which is happening because the error is no longer detected while we are
inside the first RI trigger's UPDATE command, but only while handling
triggers queued for the outer query.  This is a little bit annoying IMHO
because the connection of the error message to the original query isn't
very obvious, and seeing the RI update query might help make it a bit
clearer.  OTOH the RI update query is ugly enough that I'm not sure the
average user would be helped by the CONTEXT line.  In any case it seems
difficult to avoid this change.

Another problem is that, because the indirectly-fired RI triggers are
ultimately run in a context where their target table is not the direct
target of the current query, trigger.c doesn't have any rangetable entry
to connect them to (this is why the patch has to touch trigger.c, as it
formerly considered that an error case).  This has a performance cost
(extra heap_opens) that is probably usually negligible, but maybe not
always.  Also, as the patch stands EXPLAIN ANALYZE would fail to report
the runtime of such triggers separately --- they'd be included in the
bottom-line total runtime but not listed by themselves.  (In current
code they aren't listed separately either ... but they'd be charged to
the parent trigger that caused them to be fired, which is better than
nothing.)
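
The lookup-with-fallback logic described above can be sketched in isolation as follows. This is a simplified stand-alone model, not the trigger.c code: ToyRel, RelLookup, find_or_open, toy_open and release are hypothetical names, and toy_open merely counts calls to stand in for the extra heap_open. The point is the ownership flag: a relation found among the query's result relations is borrowed, while one that had to be opened locally must later be closed by whoever opened it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int RelId;             /* hypothetical stand-in for a relation OID */

typedef struct
{
    RelId   id;
} ToyRel;                               /* hypothetical stand-in for Relation */

typedef struct
{
    ToyRel *rel;
    bool    locally_opened;             /* true means the caller must close rel */
} RelLookup;

static int    toy_open_count = 0;       /* models the cost of extra opens */
static int    toy_close_count = 0;
static ToyRel opened_slot;              /* storage for the one locally opened rel */

/* Models opening the relation ourselves (the "hard way"). */
static ToyRel *toy_open(RelId id)
{
    toy_open_count++;
    opened_slot.id = id;
    return &opened_slot;
}

/*
 * Look for 'wanted' among the query's result relations; if it is not
 * there (or there are none), fall back to opening it locally.
 */
static RelLookup find_or_open(ToyRel *const *result_rels, size_t nrels,
                              RelId wanted)
{
    RelLookup   out = {NULL, true};     /* assume the hard way until found */

    for (size_t i = 0; i < nrels; i++)
    {
        if (result_rels[i]->id == wanted)
        {
            out.rel = result_rels[i];   /* borrowed, not owned */
            out.locally_opened = false;
            break;
        }
    }
    if (out.locally_opened)
        out.rel = toy_open(wanted);
    return out;
}

/* Release only what was opened locally; borrowed entries are left alone. */
static void release(RelLookup *lk)
{
    if (lk->locally_opened && lk->rel != NULL)
        toy_close_count++;
    lk->rel = NULL;
}
```

In this model a hit costs nothing extra, and each miss costs one open and one close, which corresponds to the performance concern mentioned above for indirectly fired triggers.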

I am tempted to try to fix these last two problems by having
afterTriggerInvokeEvents() dynamically add tables to the executor's
range table when it's told to fire triggers for tables that aren't
already listed there.  That's getting a bit hairy for a back-patchable
fix though.  I'm thinking of trying that in HEAD but applying this
patch as-is to 8.0 through 8.2.  (Note that 7.x doesn't exhibit the
bug, mainly because it never tried to fire any triggers earlier than
end-of-interactive-command.)

Comments?

            regards, tom lane

Index: src/backend/commands/trigger.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.216
diff -c -r1.216 trigger.c
*** src/backend/commands/trigger.c    17 Jul 2007 17:45:28 -0000    1.216
--- src/backend/commands/trigger.c    14 Aug 2007 18:48:11 -0000
***************
*** 2332,2337 ****
--- 2332,2338 ----
      AfterTriggerEvent event,
                  prev_event;
      MemoryContext per_tuple_context;
+     bool        locally_opened = false;
      Relation    rel = NULL;
      TriggerDesc *trigdesc = NULL;
      FmgrInfo   *finfo = NULL;
***************
*** 2364,2369 ****
--- 2365,2383 ----
               */
              if (rel == NULL || rel->rd_id != event->ate_relid)
              {
+                 if (locally_opened)
+                 {
+                     /* close prior rel if any */
+                     if (rel)
+                         heap_close(rel, NoLock);
+                     if (trigdesc)
+                         FreeTriggerDesc(trigdesc);
+                     if (finfo)
+                         pfree(finfo);
+                     Assert(instr == NULL);        /* never used in this case */
+                 }
+                 locally_opened = true;
+
                  if (estate)
                  {
                      /* Find target relation among estate's result rels */
***************
*** 2375,2402 ****
                      while (nr > 0)
                      {
                          if (rInfo->ri_RelationDesc->rd_id == event->ate_relid)
                              break;
                          rInfo++;
                          nr--;
                      }
-                     if (nr <= 0)    /* should not happen */
-                         elog(ERROR, "could not find relation %u among query result relations",
-                              event->ate_relid);
-                     rel = rInfo->ri_RelationDesc;
-                     trigdesc = rInfo->ri_TrigDesc;
-                     finfo = rInfo->ri_TrigFunctions;
-                     instr = rInfo->ri_TrigInstrument;
                  }
!                 else
                  {
!                     /* Hard way: we manage the resources for ourselves */
!                     if (rel)
!                         heap_close(rel, NoLock);
!                     if (trigdesc)
!                         FreeTriggerDesc(trigdesc);
!                     if (finfo)
!                         pfree(finfo);
!                     Assert(instr == NULL);        /* never used in this case */

                      /*
                       * We assume that an appropriate lock is still held by the
--- 2389,2410 ----
                      while (nr > 0)
                      {
                          if (rInfo->ri_RelationDesc->rd_id == event->ate_relid)
+                         {
+                             rel = rInfo->ri_RelationDesc;
+                             trigdesc = rInfo->ri_TrigDesc;
+                             finfo = rInfo->ri_TrigFunctions;
+                             instr = rInfo->ri_TrigInstrument;
+                             locally_opened = false;
                              break;
+                         }
                          rInfo++;
                          nr--;
                      }
                  }
!
!                 if (locally_opened)
                  {
!                     /* Hard way: open target relation for ourselves */

                      /*
                       * We assume that an appropriate lock is still held by the
***************
*** 2421,2426 ****
--- 2429,2435 ----
                          palloc0(trigdesc->numtriggers * sizeof(FmgrInfo));

                      /* Never any EXPLAIN info in this case */
+                     instr = NULL;
                  }
              }

***************
*** 2471,2477 ****
      events->tail = prev_event;

      /* Release working resources */
!     if (!estate)
      {
          if (rel)
              heap_close(rel, NoLock);
--- 2480,2486 ----
      events->tail = prev_event;

      /* Release working resources */
!     if (locally_opened)
      {
          if (rel)
              heap_close(rel, NoLock);
***************
*** 2600,2610 ****
       * IMMEDIATE: all events we have decided to defer will be available for it
       * to fire.
       *
       * If we find no firable events, we don't have to increment
       * firing_counter.
       */
      events = &afterTriggers->query_stack[afterTriggers->query_depth];
!     if (afterTriggerMarkEvents(events, &afterTriggers->events, true))
      {
          CommandId    firing_id = afterTriggers->firing_counter++;

--- 2609,2621 ----
       * IMMEDIATE: all events we have decided to defer will be available for it
       * to fire.
       *
+      * We loop in case a trigger queues more events.
+      *
       * If we find no firable events, we don't have to increment
       * firing_counter.
       */
      events = &afterTriggers->query_stack[afterTriggers->query_depth];
!     while (afterTriggerMarkEvents(events, &afterTriggers->events, true))
      {
          CommandId    firing_id = afterTriggers->firing_counter++;

***************
*** 2648,2654 ****
          ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

      /*
!      * Run all the remaining triggers.    Loop until they are all gone, just in
       * case some trigger queues more for us to do.
       */
      while (afterTriggerMarkEvents(events, NULL, false))
--- 2659,2665 ----
          ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

      /*
!      * Run all the remaining triggers.    Loop until they are all gone, in
       * case some trigger queues more for us to do.
       */
      while (afterTriggerMarkEvents(events, NULL, false))
***************
*** 3211,3217 ****
      {
          AfterTriggerEventList *events = &afterTriggers->events;

!         if (afterTriggerMarkEvents(events, NULL, true))
          {
              CommandId    firing_id = afterTriggers->firing_counter++;

--- 3222,3228 ----
      {
          AfterTriggerEventList *events = &afterTriggers->events;

!         while (afterTriggerMarkEvents(events, NULL, true))
          {
              CommandId    firing_id = afterTriggers->firing_counter++;

Index: src/backend/executor/spi.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/executor/spi.c,v
retrieving revision 1.179
diff -c -r1.179 spi.c
*** src/backend/executor/spi.c    27 Apr 2007 22:05:47 -0000    1.179
--- src/backend/executor/spi.c    14 Aug 2007 18:48:11 -0000
***************
*** 39,47 ****
  static int _SPI_execute_plan(SPIPlanPtr plan,
                    Datum *Values, const char *Nulls,
                    Snapshot snapshot, Snapshot crosscheck_snapshot,
!                   bool read_only, long tcount);

! static int    _SPI_pquery(QueryDesc *queryDesc, long tcount);

  static void _SPI_error_callback(void *arg);

--- 39,47 ----
  static int _SPI_execute_plan(SPIPlanPtr plan,
                    Datum *Values, const char *Nulls,
                    Snapshot snapshot, Snapshot crosscheck_snapshot,
!                   bool read_only, bool fire_triggers, long tcount);

! static int    _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount);

  static void _SPI_error_callback(void *arg);

***************
*** 316,322 ****

      res = _SPI_execute_plan(&plan, NULL, NULL,
                              InvalidSnapshot, InvalidSnapshot,
!                             read_only, tcount);

      _SPI_end_call(true);
      return res;
--- 316,322 ----

      res = _SPI_execute_plan(&plan, NULL, NULL,
                              InvalidSnapshot, InvalidSnapshot,
!                             read_only, true, tcount);

      _SPI_end_call(true);
      return res;
***************
*** 349,355 ****
      res = _SPI_execute_plan(plan,
                              Values, Nulls,
                              InvalidSnapshot, InvalidSnapshot,
!                             read_only, tcount);

      _SPI_end_call(true);
      return res;
--- 349,355 ----
      res = _SPI_execute_plan(plan,
                              Values, Nulls,
                              InvalidSnapshot, InvalidSnapshot,
!                             read_only, true, tcount);

      _SPI_end_call(true);
      return res;
***************
*** 364,372 ****

  /*
   * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
!  * the caller to specify exactly which snapshots to use.  This is currently
!  * not documented in spi.sgml because it is only intended for use by RI
!  * triggers.
   *
   * Passing snapshot == InvalidSnapshot will select the normal behavior of
   * fetching a new snapshot for each query.
--- 364,375 ----

  /*
   * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
!  * the caller to specify exactly which snapshots to use.  Also, the caller
!  * may specify that AFTER triggers should be queued as part of the outer
!  * query rather than being fired immediately at the end of the command.
!  *
!  * This is currently not documented in spi.sgml because it is only intended
!  * for use by RI triggers.
   *
   * Passing snapshot == InvalidSnapshot will select the normal behavior of
   * fetching a new snapshot for each query.
***************
*** 375,381 ****
  SPI_execute_snapshot(SPIPlanPtr plan,
                       Datum *Values, const char *Nulls,
                       Snapshot snapshot, Snapshot crosscheck_snapshot,
!                      bool read_only, long tcount)
  {
      int            res;

--- 378,384 ----
  SPI_execute_snapshot(SPIPlanPtr plan,
                       Datum *Values, const char *Nulls,
                       Snapshot snapshot, Snapshot crosscheck_snapshot,
!                      bool read_only, bool fire_triggers, long tcount)
  {
      int            res;

***************
*** 392,398 ****
      res = _SPI_execute_plan(plan,
                              Values, Nulls,
                              snapshot, crosscheck_snapshot,
!                             read_only, tcount);

      _SPI_end_call(true);
      return res;
--- 395,401 ----
      res = _SPI_execute_plan(plan,
                              Values, Nulls,
                              snapshot, crosscheck_snapshot,
!                             read_only, fire_triggers, tcount);

      _SPI_end_call(true);
      return res;
***************
*** 1428,1439 ****
   *        behavior of taking a new snapshot for each query.
   * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
   * read_only: TRUE for read-only execution (no CommandCounterIncrement)
   * tcount: execution tuple-count limit, or 0 for none
   */
  static int
  _SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                    Snapshot snapshot, Snapshot crosscheck_snapshot,
!                   bool read_only, long tcount)
  {
      volatile int my_res = 0;
      volatile uint32 my_processed = 0;
--- 1431,1444 ----
   *        behavior of taking a new snapshot for each query.
   * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
   * read_only: TRUE for read-only execution (no CommandCounterIncrement)
+  * fire_triggers: TRUE to fire AFTER triggers at end of query (normal case);
+  *        FALSE means any AFTER triggers are postponed to end of outer query
   * tcount: execution tuple-count limit, or 0 for none
   */
  static int
  _SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                    Snapshot snapshot, Snapshot crosscheck_snapshot,
!                   bool read_only, bool fire_triggers, long tcount)
  {
      volatile int my_res = 0;
      volatile uint32 my_processed = 0;
***************
*** 1589,1595 ****
                                              crosscheck_snapshot,
                                              dest,
                                              paramLI, false);
!                     res = _SPI_pquery(qdesc, canSetTag ? tcount : 0);
                      FreeQueryDesc(qdesc);
                  }
                  else
--- 1594,1601 ----
                                              crosscheck_snapshot,
                                              dest,
                                              paramLI, false);
!                     res = _SPI_pquery(qdesc, fire_triggers,
!                                       canSetTag ? tcount : 0);
                      FreeQueryDesc(qdesc);
                  }
                  else
***************
*** 1680,1686 ****
  }

  static int
! _SPI_pquery(QueryDesc *queryDesc, long tcount)
  {
      int            operation = queryDesc->operation;
      int            res;
--- 1686,1692 ----
  }

  static int
! _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount)
  {
      int            operation = queryDesc->operation;
      int            res;
***************
*** 1726,1732 ****
          ResetUsage();
  #endif

!     AfterTriggerBeginQuery();

      ExecutorStart(queryDesc, 0);

--- 1732,1739 ----
          ResetUsage();
  #endif

!     if (fire_triggers)
!         AfterTriggerBeginQuery();

      ExecutorStart(queryDesc, 0);

***************
*** 1743,1749 ****
      }

      /* Take care of any queued AFTER triggers */
!     AfterTriggerEndQuery(queryDesc->estate);

      ExecutorEnd(queryDesc);

--- 1750,1757 ----
      }

      /* Take care of any queued AFTER triggers */
!     if (fire_triggers)
!         AfterTriggerEndQuery(queryDesc->estate);

      ExecutorEnd(queryDesc);

Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.95
diff -c -r1.95 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c    5 Jun 2007 21:31:06 -0000    1.95
--- src/backend/utils/adt/ri_triggers.c    14 Aug 2007 18:48:12 -0000
***************
*** 2774,2780 ****
                                        NULL, NULL,
                                        CopySnapshot(GetLatestSnapshot()),
                                        InvalidSnapshot,
!                                       true, 1);

      /* Check result */
      if (spi_result != SPI_OK_SELECT)
--- 2774,2780 ----
                                        NULL, NULL,
                                        CopySnapshot(GetLatestSnapshot()),
                                        InvalidSnapshot,
!                                       true, false, 1);

      /* Check result */
      if (spi_result != SPI_OK_SELECT)
***************
*** 3308,3314 ****
      spi_result = SPI_execute_snapshot(qplan,
                                        vals, nulls,
                                        test_snapshot, crosscheck_snapshot,
!                                       false, limit);

      /* Restore UID */
      SetUserId(save_uid);
--- 3308,3314 ----
      spi_result = SPI_execute_snapshot(qplan,
                                        vals, nulls,
                                        test_snapshot, crosscheck_snapshot,
!                                       false, false, limit);

      /* Restore UID */
      SetUserId(save_uid);
Index: src/include/executor/spi.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/executor/spi.h,v
retrieving revision 1.62
diff -c -r1.62 spi.h
*** src/include/executor/spi.h    25 Jul 2007 12:22:53 -0000    1.62
--- src/include/executor/spi.h    14 Aug 2007 18:48:12 -0000
***************
*** 104,110 ****
                       Datum *Values, const char *Nulls,
                       Snapshot snapshot,
                       Snapshot crosscheck_snapshot,
!                      bool read_only, long tcount);
  extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
  extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
                                       int cursorOptions);
--- 104,110 ----
                       Datum *Values, const char *Nulls,
                       Snapshot snapshot,
                       Snapshot crosscheck_snapshot,
!                      bool read_only, bool fire_triggers, long tcount);
  extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
  extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
                                       int cursorOptions);
