Patch to allow multiple table locks in "Unison"

Поиск
Список
Период
Сортировка
От Neil Padgett
Тема Patch to allow multiple table locks in "Unison"
Дата
Msg-id 3B5C70A3.ED2DCB70@redhat.com
обсуждение исходный текст
Ответы Re: Patch to allow multiple table locks in "Unison"  (Tom Lane <tgl@sss.pgh.pa.us>)
Список pgsql-patches
Below find a patch to allow the syntax:

LOCK TABLE tab1, tab2, tab3 [...]

as outlined in the TODO item Commands::Allow LOCK TABLE tab1, tab2, tab3
so all tables locked in unison.

The behaviour is as follows: Behaviour of locking a single table is
unchanged. For multiple tables, the tables are locked in series. The
request blocks until a lock on all tables is obtained, each in turn. The
tables are locked in OID order always -- this should prevent
deadlocking. (See the comments for details.) The single table lock case
is handled separately to save making the sort call, etc. under what will
likely remain the most common case.

The only possible downside to this system is that starvation is
possible, but to no greater an extent than with explicitly serial
locking (i.e. LOCK a; LOCK b;). However, unlike when users issue
multiple separate requests, deadlock is prevented if everyone uses the
new syntax to obtain multiple locks.

I think I've hit all the necessary places to change things -- however,
this being my first patch to PostgreSQL, I'd especially like some
feedback on whether I've missed something, or whether it is the custom
to do something differently from the way I've done it.

Neil
Red Hat Database Team

--
Neil Padgett
Red Hat Canada Ltd.                       E-Mail:  npadgett@redhat.com
2323 Yonge Street, Suite #300,
Toronto, ON  M4P 2C9

Index: src/backend/commands/command.c
===================================================================
RCS file:
/home/projects/pgsql/cvsroot/pgsql/src/backend/commands/command.c,v
retrieving revision 1.136
diff -c -p -r1.136 command.c
*** src/backend/commands/command.c    2001/07/16 05:06:57    1.136
--- src/backend/commands/command.c    2001/07/23 16:24:33
*************** needs_toast_table(Relation rel)
*** 1985,1991 ****
--- 1985,2008 ----
      return (tuple_length > TOAST_TUPLE_THRESHOLD);
  }

+ /*
+  * qsort_compareRelOnOID
+  *
+  * qsort() comparator that orders Relation entries by ascending relation OID.
+  * Used when sorting relations -- see LockTableCommand()
+  */

+ static int
+ qsort_compareRelOnOID(const void *a, const void *b)
+ {
+     Oid lhs = RelationGetRelid(*((const Relation *) a));
+     Oid rhs = RelationGetRelid(*((const Relation *) b));
+
+     if (lhs != rhs)
+         return (lhs < rhs) ? -1 : 1;
+     return 0;
+ }
+
  /*
   *
   * LOCK TABLE
*************** needs_toast_table(Relation rel)
*** 1994,2019 ****
  void
  LockTableCommand(LockStmt *lockstmt)
  {
!     Relation    rel;
!     int            aclresult;

!     rel = heap_openr(lockstmt->relname, NoLock);

!     if (rel->rd_rel->relkind != RELKIND_RELATION)
!         elog(ERROR, "LOCK TABLE: %s is not a table", lockstmt->relname);

!     if (lockstmt->mode == AccessShareLock)
!         aclresult = pg_aclcheck(lockstmt->relname, GetUserId(), ACL_SELECT);
!     else
!         aclresult = pg_aclcheck(lockstmt->relname, GetUserId(),
!                                 ACL_UPDATE | ACL_DELETE);

!     if (aclresult != ACLCHECK_OK)
!         elog(ERROR, "LOCK TABLE: permission denied");

!     LockRelation(rel, lockstmt->mode);

!     heap_close(rel, NoLock);    /* close rel, keep lock */
  }


--- 2011,2123 ----
  void
  LockTableCommand(LockStmt *lockstmt)
  {
!     int         relCnt;
!
!     relCnt = length(lockstmt->rellist);
!     /* Locks every relation named in lockstmt->rellist in lockstmt->mode.
!        A single relation is handled specially to avoid overhead on what is
!        likely the most common case */
!
!     if (relCnt == 1)
!     {
!
!         /* Locking a single table */
!
!         Relation    rel;
!         int            aclresult;
!         char *relname;
!
!         /* The list and the names in it belong to the caller's parse tree,
!            so they are only read here and never freed */
!         relname = strVal(lfirst(lockstmt->rellist));
!
!         rel = heap_openr(relname, NoLock);
!
!         if (rel->rd_rel->relkind != RELKIND_RELATION)
!             elog(ERROR, "LOCK TABLE: %s is not a table", relname);
!
!         if (lockstmt->mode == AccessShareLock)
!             aclresult = pg_aclcheck(relname, GetUserId(),
!                                     ACL_SELECT);
!         else
!             aclresult = pg_aclcheck(relname, GetUserId(),
!                                     ACL_UPDATE | ACL_DELETE);
!
!         if (aclresult != ACLCHECK_OK)
!             elog(ERROR, "LOCK TABLE: permission denied");
!
!         LockRelation(rel, lockstmt->mode);
!
!         /* Close the relation; passing NoLock leaves the lock just taken
!            in place */
!         heap_close(rel, NoLock);
!     }
!     else
!     {
!         List *p;
!         Relation *RelationArray;
!         Relation *pRel;
!
!         /* Locking multiple tables */
!
!         /* Create an array of relations */
!
!         RelationArray = palloc(relCnt * sizeof(Relation));
!         pRel = RelationArray;
!
!         /* Iterate over the list and populate the relation array */
!
!         foreach(p, lockstmt->rellist)
!         {
!             char* relname = strVal(lfirst(p));
!             int            aclresult;
!
!             *pRel = heap_openr(relname, NoLock);
!
!             if ((*pRel)->rd_rel->relkind != RELKIND_RELATION)
!                 elog(ERROR, "LOCK TABLE: %s is not a table",
!                      relname);
!
!             if (lockstmt->mode == AccessShareLock)
!                 aclresult = pg_aclcheck(relname, GetUserId(),
!                                         ACL_SELECT);
!             else
!                 aclresult = pg_aclcheck(relname, GetUserId(),
!                                         ACL_UPDATE | ACL_DELETE);
!
!             if (aclresult != ACLCHECK_OK)
!                 elog(ERROR, "LOCK TABLE: permission denied");
!
!             /* relname stays owned by the parse tree, so it is not freed */
!             pRel++;
!         }
!
!         /* Now, sort the list on OID. OIDs come in very handy here, since
!            they provide a unique unchanging ordering for the relations.
!            Further, new relations always have a higher OID. Hence, they
!            are perfect for our purpose here -- if we always lock relations
!            in OID order, we can avoid deadlock. */

!         qsort((void *) RelationArray, (size_t) relCnt,
!               sizeof(Relation), qsort_compareRelOnOID);

!         /* Now, lock all the relations, closing each after it is locked
!            (Keeping the locks)
!          */

!         for(pRel = RelationArray;
!             pRel < RelationArray + relCnt;
!             pRel++)
!             {
!                 LockRelation(*pRel, lockstmt->mode);

!                 heap_close(*pRel, NoLock);
!             }

!         /* Free the relation array */

!         pfree(RelationArray);
!     }
  }


Index: src/backend/nodes/copyfuncs.c
===================================================================
RCS file:
/home/projects/pgsql/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v
retrieving revision 1.148
diff -c -p -r1.148 copyfuncs.c
*** src/backend/nodes/copyfuncs.c    2001/07/16 19:07:37    1.148
--- src/backend/nodes/copyfuncs.c    2001/07/23 16:24:33
*************** _copyLockStmt(LockStmt *from)
*** 2425,2432 ****
  {
      LockStmt   *newnode = makeNode(LockStmt);

!     if (from->relname)
!         newnode->relname = pstrdup(from->relname);
      newnode->mode = from->mode;

      return newnode;
--- 2425,2432 ----
  {
      LockStmt   *newnode = makeNode(LockStmt);

!     Node_Copy(from, newnode, rellist);
!
      newnode->mode = from->mode;

      return newnode;
Index: src/backend/nodes/equalfuncs.c
===================================================================
RCS file:
/home/projects/pgsql/cvsroot/pgsql/src/backend/nodes/equalfuncs.c,v
retrieving revision 1.96
diff -c -p -r1.96 equalfuncs.c
*** src/backend/nodes/equalfuncs.c    2001/07/16 19:07:38    1.96
--- src/backend/nodes/equalfuncs.c    2001/07/23 16:24:33
*************** _equalDropUserStmt(DropUserStmt *a, Drop
*** 1283,1289 ****
  static bool
  _equalLockStmt(LockStmt *a, LockStmt *b)
  {
!     if (!equalstr(a->relname, b->relname))
          return false;
      if (a->mode != b->mode)
          return false;
--- 1283,1289 ----
  static bool
  _equalLockStmt(LockStmt *a, LockStmt *b)
  {
!     if (!equal(a->rellist, b->rellist))
          return false;
      if (a->mode != b->mode)
          return false;
Index: src/backend/parser/gram.y
===================================================================
RCS file: /home/projects/pgsql/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.238
diff -c -p -r2.238 gram.y
*** src/backend/parser/gram.y    2001/07/16 19:07:40    2.238
--- src/backend/parser/gram.y    2001/07/23 16:24:36
*************** DeleteStmt:  DELETE FROM relation_expr w
*** 3280,3290 ****
                  }
          ;

! LockStmt:    LOCK_P opt_table relation_name opt_lock
                  {
                      LockStmt *n = makeNode(LockStmt);

!                     n->relname = $3;
                      n->mode = $4;
                      $$ = (Node *)n;
                  }
--- 3280,3290 ----
                  }
          ;

! LockStmt:    LOCK_P opt_table relation_name_list opt_lock
                  {
                      LockStmt *n = makeNode(LockStmt);

!                     n->rellist = $3;
                      n->mode = $4;
                      $$ = (Node *)n;
                  }
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file:
/home/projects/pgsql/cvsroot/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.136
diff -c -p -r1.136 parsenodes.h
*** src/include/nodes/parsenodes.h    2001/07/16 19:07:40    1.136
--- src/include/nodes/parsenodes.h    2001/07/23 16:24:37
*************** typedef struct VariableResetStmt
*** 760,766 ****
  typedef struct LockStmt
  {
      NodeTag        type;
!     char       *relname;        /* relation to lock */
      int            mode;            /* lock mode */
  } LockStmt;

--- 760,766 ----
  typedef struct LockStmt
  {
      NodeTag        type;
!     List       *rellist;        /* names of relations to lock */
      int            mode;            /* lock mode */
  } LockStmt;

Index: src/interfaces/ecpg/preproc/preproc.y
===================================================================
RCS file:
/home/projects/pgsql/cvsroot/pgsql/src/interfaces/ecpg/preproc/preproc.y,v
retrieving revision 1.146
diff -c -p -r1.146 preproc.y
*** src/interfaces/ecpg/preproc/preproc.y    2001/07/16 05:07:00    1.146
--- src/interfaces/ecpg/preproc/preproc.y    2001/07/23 16:24:40
*************** DeleteStmt:  DELETE FROM relation_expr w
*** 2421,2427 ****
                  }
          ;

! LockStmt:  LOCK_P opt_table relation_name opt_lock
                  {
                      $$ = cat_str(4, make_str("lock"), $2, $3, $4);
                  }
--- 2421,2427 ----
                  }
          ;

! LockStmt:  LOCK_P opt_table relation_name_list opt_lock
                  {
                      $$ = cat_str(4, make_str("lock"), $2, $3, $4);
                  }

В списке pgsql-patches по дате отправления:

Предыдущее
От: Tom Lane
Дата:
Сообщение: Re: pg_hba.conf caching
Следующее
От: Bruce Momjian
Дата:
Сообщение: Re: pg_hba.conf caching