Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From Kyotaro HORIGUCHI
Subject Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING
Msg-id 20170718.181213.206979369.horiguchi.kyotaro@lab.ntt.co.jp
In response to Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING  (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>)
List pgsql-hackers
Thank you for the comments.

At Mon, 17 Jul 2017 16:09:04 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <9897.1500322144@sss.pgh.pa.us>
> Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> writes:
> > This is the rebased and revised version of the previous patch.
> 
> A few more comments:
> 
> * Per the spec for CacheRegisterSyscacheCallback, a zero hash value means
> to flush all associated state.  This isn't handling that case correctly.

Right, fixed.

> Even when you are given a specific hash value, I think exiting after
> finding one match is incorrect: there could be multiple connections
> to the same server with different user mappings, or vice versa.

Sure. I wrongly assumed that a key hash value pins down a single entry
in the connection cache. Thank you for pointing that out.
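To make the two fixes concrete, here is a minimal standalone sketch (not the patch itself) of the matching rule: a zero hash value marks every live entry, and a specific hash value marks all entries that match, without stopping at the first hit. The names SketchEntry, sketch_inval_callback and the SKETCH_* ids are hypothetical stand-ins for ConnCacheEntry, pgfdw_inval_callback and the real syscache ids, and the array replaces the hash table only to keep the example self-contained.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the two syscache ids the callback is registered on. */
enum
{
    SKETCH_FOREIGNSERVEROID = 1,
    SKETCH_USERMAPPINGOID = 2
};

/* Simplified model of a connection cache entry (assumption, not the real struct). */
typedef struct SketchEntry
{
    bool        connected;          /* models entry->conn != NULL */
    bool        invalidated;        /* reconnect needed before next use */
    uint32_t    server_hashvalue;   /* hash of the foreign server oid */
    uint32_t    mapping_hashvalue;  /* hash of the user mapping oid */
} SketchEntry;

/*
 * Mark every live entry affected by an invalidation message.
 * hashvalue == 0 means "flush everything"; otherwise all entries whose
 * stored hash matches are marked, not just the first one found.
 * Returns the number of entries newly marked, purely for illustration.
 */
static int
sketch_inval_callback(SketchEntry *entries, size_t nentries,
                      int cacheid, uint32_t hashvalue)
{
    int         newly_marked = 0;
    size_t      i;

    assert(cacheid == SKETCH_FOREIGNSERVEROID ||
           cacheid == SKETCH_USERMAPPINGOID);

    for (i = 0; i < nentries; i++)
    {
        SketchEntry *entry = &entries[i];
        bool        match;

        /* entries without a connection have nothing to invalidate */
        if (!entry->connected)
            continue;

        match = (hashvalue == 0) ||
            (cacheid == SKETCH_FOREIGNSERVEROID &&
             entry->server_hashvalue == hashvalue) ||
            (cacheid == SKETCH_USERMAPPINGOID &&
             entry->mapping_hashvalue == hashvalue);

        if (match && !entry->invalidated)
        {
            entry->invalidated = true;
            newly_marked++;
        }
    }

    return newly_marked;
}
```

With two entries sharing one server hash and a third on another server, a server invalidation marks the first two, and a later zero hash value marks the remaining one.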

> * I'm not really sure that it's worth the trouble to pay attention
> to the hash value at all.  Very few other syscache callbacks do,
> and the pg_server/pg_user_mapping catalogs don't seem likely to
> have higher than average traffic.

I agree with these points. However, there is one more consideration:
reconnection is far more expensive than re-looking up a system catalog,
or maybe even than replanning. For now I chose to avoid the possibility
of causing a massive number of simultaneous reconnections.
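Related to the cost concern, the attached patch only marks an entry in the callback and defers the actual disconnect to the next GetConnection() call made outside a transaction. A minimal standalone sketch of that deferral follows. SketchLazyEntry and sketch_get_connection are hypothetical names, and an integer id stands in for the PGconn pointer, so this models the control flow only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified model of a cache entry; conn_id stands in for PGconn * (assumption). */
typedef struct SketchLazyEntry
{
    int         conn_id;        /* 0 models entry->conn == NULL */
    int         xact_depth;     /* 0 means no remote transaction is open */
    bool        invalidated;    /* set by the invalidation callback */
} SketchLazyEntry;

/*
 * Return a usable connection id for the entry.
 *
 * An invalidated connection is dropped only when no transaction is using
 * it; otherwise it is kept until a later call. A dropped or missing
 * connection is re-established with a fresh id taken from *next_conn_id.
 */
static int
sketch_get_connection(SketchLazyEntry *entry, int *next_conn_id)
{
    assert(entry != NULL && next_conn_id != NULL);

    /* deferred disconnect: only outside a transaction */
    if (entry->conn_id != 0 && entry->invalidated && entry->xact_depth == 0)
        entry->conn_id = 0;

    if (entry->conn_id == 0)
    {
        /* reset per-connection state, then "connect" */
        entry->xact_depth = 0;
        entry->invalidated = false;
        entry->conn_id = ++(*next_conn_id);
    }

    return entry->conn_id;
}
```

An entry invalidated while xact_depth is nonzero keeps its connection, and the reconnect happens on the first call after the transaction has ended.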

> * Personally I'd be inclined to register the callbacks at the same
> time the hashtables are created, which is a pattern we use elsewhere
> ... in fact, postgres_fdw itself does it that way for the transaction
> related callbacks, so it seems a bit weird that you are going in a
> different direction for these callbacks.  That way avoids the need to
> depend on a _PG_init function and means that the callbacks don't have to
> worry about the hashtables not being valid.

Sure, that seems more reasonable than the current way. I changed how the
callbacks are registered in the attached patch.
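As a rough standalone illustration of that pattern (registering the callbacks at the moment the cache is first created), consider the sketch below. Everything in it is hypothetical: the tiny registry stands in for CacheRegisterSyscacheCallback, and cache ids 1 and 2 stand in for FOREIGNSERVEROID and USERMAPPINGOID.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_CALLBACKS 8

typedef void (*SketchInvalCallback) (int cacheid, unsigned int hashvalue);

/* Tiny stand-in for the backend's callback registry (assumption). */
static struct
{
    int         cacheid;
    SketchInvalCallback func;
}           sketch_callbacks[SKETCH_MAX_CALLBACKS];
static int  sketch_ncallbacks = 0;

static bool sketch_cache_created = false;   /* models ConnectionHash != NULL */
static int  sketch_inval_calls = 0;

static void
sketch_register_callback(int cacheid, SketchInvalCallback func)
{
    assert(sketch_ncallbacks < SKETCH_MAX_CALLBACKS);
    sketch_callbacks[sketch_ncallbacks].cacheid = cacheid;
    sketch_callbacks[sketch_ncallbacks].func = func;
    sketch_ncallbacks++;
}

/* The callback can rely on the cache existing, because of when it is registered. */
static void
sketch_on_inval(int cacheid, unsigned int hashvalue)
{
    (void) cacheid;
    (void) hashvalue;
    assert(sketch_cache_created);
    sketch_inval_calls++;
}

/* First use creates the cache and registers the callbacks exactly once. */
static void
sketch_get_connection(void)
{
    if (!sketch_cache_created)
    {
        sketch_cache_created = true;
        sketch_register_callback(1, sketch_on_inval);   /* server catalog */
        sketch_register_callback(2, sketch_on_inval);   /* user mapping catalog */
    }
}

/* Deliver an invalidation message to every callback registered for cacheid. */
static void
sketch_fire(int cacheid, unsigned int hashvalue)
{
    int         i;

    for (i = 0; i < sketch_ncallbacks; i++)
        if (sketch_callbacks[i].cacheid == cacheid)
            sketch_callbacks[i].func(cacheid, hashvalue);
}
```

Because nothing is registered before the cache exists, an invalidation that arrives earlier simply finds no callback, and repeated calls do not register duplicates.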

>  Also, it seems a bit
> pointless to have separate layers of postgresMappingSysCallback and
> InvalidateConnectionForMapping functions.

It used to be one function, but it seemed a bit weird that the
function is called from two places (two catalogs) and then branches
according to the caller. I don't have a firm opinion on this, so I
changed it.

As a result, the changes to postgres_fdw.c have disappeared.

> * How about some regression test cases?  You couldn't really exercise
> cross-session invalidation easily, but I don't think you need to.

Ha Ha. You got me. I will add some test cases for this in the
next version. Thanks.


Ashutosh, I'll register this in the next CF after providing a
regression test, thanks.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 54,62 ----
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;     /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is required */
+     uint32        server_hashvalue;    /* hash value of foreign server oid */
+     uint32        mapping_hashvalue;  /* hash value of user mapping oid */
  } ConnCacheEntry;
  
  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 73,79 ----
  
  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 83,89 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 136,145 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum)0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum)0);
      }
  
      /* Set flag that we did GetConnection during the current transaction */
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }
  
      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);
  
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
--- 154,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * key is already filled in, flags will be initialized at the time of
!          * making a new connection, so just clear conn here.
!          */
          entry->conn = NULL;
      }
  
      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);
  
+ 
+     /*
+      * This connection is no longer valid. Disconnect such connections if no
+      * transaction is running.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         /* reconnected immediately, so the message is "reconnecting"  */
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+ 
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 195,206 ----
          entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->invalidated = false;
+         entry->changing_xact_state = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
          entry->conn = connect_pg_server(server, user);
  
          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 304,321 ----
      return conn;
  }
  
+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+ 
+ 
  /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 469,519 ----
  }
  
  /*
+  * Connection invalidation callback function
+  *
+  * Mark connections to be disconnected if they depend on a foreign server
+  * or user mapping whose options have been modified.
+  *
+  * Although most invalidation callbacks blow away all the related stuff
+  * regardless of the given hashvalue, blowing away all existing connections
+  * from this server could cause a massive number of simultaneous
+  * reconnections to all the remotes. We restrict which connections are
+  * broken to avoid such a mess.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values, but that would be too much work for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn == NULL)
+             continue;
+ 
+         if (hashvalue == 0)
+             entry->invalidated = true;
+         else
+         {
+             Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+ 
+             if ((cacheid == FOREIGNSERVEROID &&
+                  entry->server_hashvalue == hashvalue) ||
+                 (cacheid == USERMAPPINGOID &&
+                  entry->mapping_hashvalue == hashvalue))
+                 entry->invalidated = true;
+         }
+     }
+ }
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }
  
--- 862,868 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }
  
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     if (!entry->changing_xact_state)
          return;
  
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 996,1009 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;
  
+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+ 
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
