Re: Open 7.4 items

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: Open 7.4 items
Дата
Msg-id 14775.1065387804@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: Open 7.4 items  (Stephan Szabo <sszabo@megazone.bigpanda.com>)
Ответы Re: Open 7.4 items  (Bruce Momjian <pgman@candle.pha.pa.us>)
Re: Open 7.4 items  (Stephan Szabo <sszabo@megazone.bigpanda.com>)
Список pgsql-hackers
Stephan Szabo <sszabo@megazone.bigpanda.com> writes:
> It's not cleaned up, but yes. It appears to work for the simple tests I've
> done and should fall back if the permissions don't work to do a single
> query on both tables.

Here's my code-reviewed version of the patch.  Anyone else want to take
a look?

> I wasn't sure what to do about some of the spi error conditions.  For many
> of them I'm just returning false now so that it will try the other
> mechanism in case that might work. I'm not really sure if that's worth it,
> or if I should just elog(ERROR) and give up.

I think you may as well keep it the same as the other RI routines and
just elog() on SPI error.  If SPI is broken, the trigger procedure is
gonna fail too.

I changed that, consolidated the error-reporting code, and fixed a couple
other little issues, notably:

* The generated query applied ONLY to the FK table but not the PK table.
  I assume this was just an oversight.

* The query is now run using SPI_execp_current and selecting "current"
  snapshot.  Without this, we could fail in a serializable transaction
  if someone else has already committed changes to either relation.
  For example:

    create pk and fk tables;

    begin serializable xact;
    insert into pk values(1);
    insert into fk values(1);

                        begin;
                        insert into fk values(2);
                        commit;

    alter table fk add foreign key ...;

  The ALTER will not be blocked from acquiring exclusive lock, since
  T2 already committed.  But if we run the query in the serializable
  snapshot, it won't see the violating row fk=2.

The old trigger-based check avoids this error because the scan loop uses
SnapshotNow to select live rows from the FK table.  There is a dual race
condition where T2 deletes a row from the PK table.  In current CVS tip
this will be detected and reported as a serialization failure, because
T1 won't be able to get SELECT FOR UPDATE lock on the deleted row.  With
the proposed patch you'll instead see a "no such key" failure, which I
think is fine, even though it nominally violates serializability.

Comments?  Can anyone else do a code review (Jan??)?

            regards, tom lane

*** src/backend/commands/tablecmds.c.orig    Thu Oct  2 15:24:52 2003
--- src/backend/commands/tablecmds.c    Sun Oct  5 16:29:51 2003
***************
*** 3455,3460 ****
--- 3455,3467 ----
      int            count;

      /*
+      * See if we can do it with a single LEFT JOIN query.  A FALSE result
+      * indicates we must proceed with the fire-the-trigger method.
+      */
+     if (RI_Initial_Check(fkconstraint, rel, pkrel))
+         return;
+
+     /*
       * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
       * as if that tuple had just been inserted.  If any of those fail, it
       * should ereport(ERROR) and that's that.
*** src/backend/utils/adt/ri_triggers.c.orig    Wed Oct  1 17:30:52 2003
--- src/backend/utils/adt/ri_triggers.c    Sun Oct  5 16:42:37 2003
***************
*** 40,45 ****
--- 40,46 ----
  #include "rewrite/rewriteHandler.h"
  #include "utils/lsyscache.h"
  #include "utils/typcache.h"
+ #include "utils/acl.h"
  #include "miscadmin.h"


***************
*** 164,170 ****
                   Datum *vals, char *nulls);
  static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                     Relation pk_rel, Relation fk_rel,
!                    HeapTuple violator, bool spi_err);


  /* ----------
--- 165,172 ----
                   Datum *vals, char *nulls);
  static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                     Relation pk_rel, Relation fk_rel,
!                    HeapTuple violator, TupleDesc tupdesc,
!                    bool spi_err);


  /* ----------
***************
*** 2540,2546 ****
--- 2542,2743 ----
  }


+ /* ----------
+  * RI_Initial_Check -
+  *
+  *    Check an entire table for non-matching values using a single query.
+  *    This is not a trigger procedure, but is called during ALTER TABLE
+  *    ADD FOREIGN KEY to validate the initial table contents.
+  *
+  *    We expect that an exclusive lock has been taken on rel and pkrel;
+  *    hence, we do not need to lock individual rows for the check.
+  *
+  *    If the check fails because the current user doesn't have permissions
+  *    to read both tables, return false to let our caller know that they will
+  *    need to do something else to check the constraint.
+  * ----------
+  */
+ bool
+ RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
+ {
+     const char *constrname = fkconstraint->constr_name;
+     char        querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
+                         (MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)];
+     char        pkrelname[MAX_QUOTED_REL_NAME_LEN];
+     char        relname[MAX_QUOTED_REL_NAME_LEN];
+     char        attname[MAX_QUOTED_NAME_LEN];
+     char        fkattname[MAX_QUOTED_NAME_LEN];
+     const char *sep;
+     List        *list;
+     List        *list2;
+     int            spi_result;
+     void        *qplan;
+
+     /*
+      * Check to make sure current user has enough permissions to do the
+      * test query.  (If not, caller can fall back to the trigger method,
+      * which works because it changes user IDs on the fly.)
+      *
+      * XXX are there any other show-stopper conditions to check?
+      */
+     if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+         return false;
+     if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+         return false;
+
+     /*----------
+      * The query string built is:
+      *  SELECT fk.keycols FROM ONLY relname fk
+      *   LEFT OUTER JOIN ONLY pkrelname pk
+      *   ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+      *   WHERE pk.pkkeycol1 IS NULL AND
+      * For MATCH unspecified:
+      *   (fk.keycol1 IS NOT NULL [AND ...])
+      * For MATCH FULL:
+      *   (fk.keycol1 IS NOT NULL [OR ...])
+      *----------
+      */
+
+     sprintf(querystr, "SELECT ");
+     sep="";
+     foreach(list, fkconstraint->fk_attrs)
+     {
+         quoteOneName(attname, strVal(lfirst(list)));
+         snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                  "%sfk.%s", sep, attname);
+         sep = ", ";
+     }
+
+     quoteRelationName(pkrelname, pkrel);
+     quoteRelationName(relname, rel);
+     snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+              " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
+              relname, pkrelname);
+
+     sep="";
+     for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs;
+          list != NIL && list2 != NIL;
+          list=lnext(list), list2=lnext(list2))
+     {
+         quoteOneName(attname, strVal(lfirst(list)));
+         quoteOneName(fkattname, strVal(lfirst(list2)));
+         snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                  "%spk.%s=fk.%s",
+                  sep, attname, fkattname);
+         sep = " AND ";
+     }
+     /*
+      * It's sufficient to test any one pk attribute for null to detect a
+      * join failure.
+      */
+     quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
+     snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+              ") WHERE pk.%s IS NULL AND (", attname);
+
+     sep="";
+     foreach(list, fkconstraint->fk_attrs)
+     {
+         quoteOneName(attname, strVal(lfirst(list)));
+         snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                  "%sfk.%s IS NOT NULL",
+                  sep, attname);
+         switch (fkconstraint->fk_matchtype)
+         {
+             case FKCONSTR_MATCH_UNSPECIFIED:
+                 sep=" AND ";
+                 break;
+             case FKCONSTR_MATCH_FULL:
+                 sep=" OR ";
+                 break;
+             case FKCONSTR_MATCH_PARTIAL:
+                 ereport(ERROR,
+                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                          errmsg("MATCH PARTIAL not yet implemented")));
+                 break;
+             default:
+                 elog(ERROR, "unrecognized match type: %d",
+                      fkconstraint->fk_matchtype);
+                 break;
+         }
+     }
+     snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+              ")");
+
+     if (SPI_connect() != SPI_OK_CONNECT)
+         elog(ERROR, "SPI_connect failed");
+
+     /*
+      * Generate the plan.  We don't need to cache it, and there are no
+      * arguments to the plan.
+      */
+     qplan = SPI_prepare(querystr, 0, NULL);
+
+     /*
+      * Run the plan.  For safety we force a current query snapshot to be
+      * used.  (In serializable mode, this arguably violates serializability,
+      * but we really haven't got much choice.)  We need at most one tuple
+      * returned, so pass limit = 1.
+      */
+     spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);

+     /* Check result */
+     if (spi_result != SPI_OK_SELECT)
+         elog(ERROR, "SPI_execp_current returned %d", spi_result);
+
+     /* Did we find a tuple violating the constraint? */
+     if (SPI_processed > 0)
+     {
+         HeapTuple    tuple = SPI_tuptable->vals[0];
+         TupleDesc    tupdesc = SPI_tuptable->tupdesc;
+         int            nkeys = length(fkconstraint->fk_attrs);
+         int            i;
+         RI_QueryKey    qkey;
+
+         /*
+          * If it's MATCH FULL, and there are any nulls in the FK keys,
+          * complain about that rather than the lack of a match.  MATCH FULL
+          * disallows partially-null FK rows.
+          */
+         if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
+         {
+             bool    isnull = false;
+
+             for (i = 1; i <= nkeys; i++)
+             {
+                 (void) SPI_getbinval(tuple, tupdesc, i, &isnull);
+                 if (isnull)
+                     break;
+             }
+             if (isnull)
+                 ereport(ERROR,
+                         (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                          errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
+                                 RelationGetRelationName(rel),
+                                 constrname),
+                          errdetail("MATCH FULL does not allow mixing of null and nonnull key values.")));
+         }
+
+         /*
+          * Although we didn't cache the query, we need to set up a fake
+          * query key to pass to ri_ReportViolation.
+          */
+         MemSet(&qkey, 0, sizeof(qkey));
+         qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
+         qkey.nkeypairs = nkeys;
+         for (i = 0; i < nkeys; i++)
+             qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
+
+         ri_ReportViolation(&qkey, constrname,
+                            pkrel, rel,
+                            tuple, tupdesc,
+                            false);
+     }
+
+     if (SPI_finish() != SPI_OK_FINISH)
+         elog(ERROR, "SPI_finish failed");
+
+     return true;
+ }


  /* ----------
***************
*** 2905,2910 ****
--- 3102,3108 ----
          ri_ReportViolation(qkey, constrname ? constrname : "",
                             pk_rel, fk_rel,
                             new_tuple ? new_tuple : old_tuple,
+                            NULL,
                             true);

      /* XXX wouldn't it be clearer to do this part at the caller? */
***************
*** 2913,2918 ****
--- 3111,3117 ----
          ri_ReportViolation(qkey, constrname,
                             pk_rel, fk_rel,
                             new_tuple ? new_tuple : old_tuple,
+                            NULL,
                             false);

      return SPI_processed != 0;
***************
*** 2950,2956 ****
  static void
  ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                     Relation pk_rel, Relation fk_rel,
!                    HeapTuple violator, bool spi_err)
  {
  #define BUFLENGTH    512
      char        key_names[BUFLENGTH];
--- 3149,3156 ----
  static void
  ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                     Relation pk_rel, Relation fk_rel,
!                    HeapTuple violator, TupleDesc tupdesc,
!                    bool spi_err)
  {
  #define BUFLENGTH    512
      char        key_names[BUFLENGTH];
***************
*** 2958,2964 ****
      char       *name_ptr = key_names;
      char       *val_ptr = key_values;
      bool        onfk;
-     Relation    rel;
      int            idx,
                  key_idx;

--- 3158,3163 ----
***************
*** 2972,2989 ****
                   errhint("This is most likely due to a rule having rewritten the query.")));

      /*
!      * rel is set to where the tuple description is coming from.
       */
      onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
      if (onfk)
      {
-         rel = fk_rel;
          key_idx = RI_KEYPAIR_FK_IDX;
      }
      else
      {
-         rel = pk_rel;
          key_idx = RI_KEYPAIR_PK_IDX;
      }

      /*
--- 3171,3191 ----
                   errhint("This is most likely due to a rule having rewritten the query.")));

      /*
!      * Determine which relation to complain about.  If tupdesc wasn't
!      * passed by caller, assume the violator tuple came from there.
       */
      onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
      if (onfk)
      {
          key_idx = RI_KEYPAIR_FK_IDX;
+         if (tupdesc == NULL)
+             tupdesc = fk_rel->rd_att;
      }
      else
      {
          key_idx = RI_KEYPAIR_PK_IDX;
+         if (tupdesc == NULL)
+             tupdesc = pk_rel->rd_att;
      }

      /*
***************
*** 3008,3015 ****
          char       *name,
                     *val;

!         name = SPI_fname(rel->rd_att, fnum);
!         val = SPI_getvalue(violator, rel->rd_att, fnum);
          if (!val)
              val = "null";

--- 3210,3217 ----
          char       *name,
                     *val;

!         name = SPI_fname(tupdesc, fnum);
!         val = SPI_getvalue(violator, tupdesc, fnum);
          if (!val)
              val = "null";

*** src/include/commands/trigger.h.orig    Sun Aug  3 23:01:31 2003
--- src/include/commands/trigger.h    Sun Oct  5 16:11:44 2003
***************
*** 197,201 ****
--- 197,204 ----
   * in utils/adt/ri_triggers.c
   */
  extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
+ extern bool RI_Initial_Check(FkConstraint *fkconstraint,
+                              Relation rel,
+                              Relation pkrel);

  #endif   /* TRIGGER_H */

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Bruce Momjian
Дата:
Сообщение: Re: PQfnumber and quoted identifiers
Следующее
От: Bruce Momjian
Дата:
Сообщение: Re: Open 7.4 items