Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING

From Kyotaro HORIGUCHI
Subject Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING
Msg-id 20170714.173424.239554763.horiguchi.kyotaro@lab.ntt.co.jp
In reply to Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
Responses Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
Thank you for the comments.

At Thu, 13 Jul 2017 16:54:42 +0530, Ashutosh Bapat <ashutosh.bapat@enterprisedb.com> wrote in
<CAFjFpRd0yz3v0rL2yxmr95e_iDntkeQia9709KXaHLyVcZ=_mQ@mail.gmail.com>
> On Thu, Jul 13, 2017 at 2:53 PM, Kyotaro HORIGUCHI
> <horiguchi.kyotaro@lab.ntt.co.jp> wrote:
> > Hello, moved to pgsql-hackers.
> >
> > This is the rebased and revised version of the previous patch.
> >
> > At Thu, 13 Jul 2017 13:42:49 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in
> > <20170713.134249.97825982.horiguchi.kyotaro@lab.ntt.co.jp>
> > This patch is postgres_fdw-private, but it's annoying that the hash
> > value of the syscache is handled in connection.c. If we are allowed to add
> > something to the core for this feature, I could add a new member
> > to FdwRoutine to notify invalidation of mapping and server by
> > OID. (Of course, that would not be back-patchable, though.)
> >
> > Does anyone have opinions or suggestions?
> >
> 
> The patch and the idea look good to me. I haven't reviewed it
> thoroughly though.
> 
> I noticed a typo, "suporious"; I think you meant "spurious"? Probably

Right, that was a bad typo. I replaced it with "unnecessary", which
is more appropriate here.

> that comment should be part of the function which marks the connection
> as invalid e.g. InvalidateConnectionForMapping().

Agreed. It had been there originally, but I had somehow moved it
elsewhere. I have moved it back to where it used to be.

> pgfdw_xact_callback() reports the reason for disconnection while
> closing a connection. Maybe we want to report the reason for
> disconnection here as well. Also, maybe we want to create a function

Agreed. I had placed a LOG message there but removed it. Now it
emits DEBUG3 messages as shown below.

| DEBUG: closing connection 0xxxx for option changes to take effect
| DEBUG: new postgres_fdw connection 0xxxx for server ".." (user mapping oid 

> to discard connection from an entry so that we consistently do the
> same things while discarding a connection.

Sure. There are now two places where a connection is closed intentionally.

I'm a bit uneasy that many members of the entry are reset in
so many places. Since the validity of an entry is determined only
by conn, it is enough to clear the flags of ConnCacheEntry at
connection creation time. In exchange,
pgfdw_reject_incomplete_xact_state_change no longer complains
about an inactive (conn == NULL) entry. I think this is safe, but
I'm not fully confident.
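
The idea in the paragraph above (an entry is live only while conn is set, the invalidation callback merely marks it, and the flags are reset at connection creation) can be illustrated with a small standalone model. This is only a sketch, not code from the patch: SketchEntry, sketch_invalidate_server, sketch_get_connection and the integer conn_id standing in for PGconn * are all hypothetical names. Unlike the loop in the patch, which stops at the first match, the scan here keeps going; that is a choice of the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for ConnCacheEntry; conn_id replaces PGconn *. */
typedef struct SketchEntry
{
    int         conn_id;            /* 0 plays the role of conn == NULL */
    int         xact_depth;         /* 0 means no remote transaction is open */
    bool        invalidated;        /* set by the invalidation scan */
    uint32_t    server_hashvalue;   /* hash of the foreign server OID */
    uint32_t    mapping_hashvalue;  /* hash of the user mapping OID */
} SketchEntry;

static int  next_conn_id = 1;       /* source of fake connection handles */

/* Mark live entries whose server hash matches; nothing is closed here. */
static int
sketch_invalidate_server(SketchEntry *entries, size_t n, uint32_t hashvalue)
{
    int         marked = 0;

    assert(entries != NULL);
    for (size_t i = 0; i < n; i++)
    {
        if (entries[i].conn_id != 0 &&
            entries[i].server_hashvalue == hashvalue)
        {
            entries[i].invalidated = true;
            marked++;
        }
    }
    return marked;
}

/* Return a usable connection handle, reconnecting lazily if needed. */
static int
sketch_get_connection(SketchEntry *entry)
{
    assert(entry != NULL);

    /* Drop a stale connection only when no transaction is using it. */
    if (entry->conn_id != 0 && entry->invalidated && entry->xact_depth == 0)
        entry->conn_id = 0;

    if (entry->conn_id == 0)
    {
        /* Flags are reset only here, at connection creation time. */
        entry->xact_depth = 0;
        entry->invalidated = false;
        entry->conn_id = next_conn_id++;
    }
    return entry->conn_id;
}
```

In this model, calling sketch_get_connection() on an invalidated entry while xact_depth is non-zero keeps the old handle; the reconnect happens on the first call after the transaction has ended.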

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 53,58 **** typedef struct ConnCacheEntry
--- 53,61 ----
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;       /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is required */
+     uint32        server_hashvalue;    /* hash value of foreign server oid */
+     uint32        mapping_hashvalue;  /* hash value of user mapping oid */
  } ConnCacheEntry;
  
  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 72,78 ----
  
  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 144,160 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }
  
      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);
  
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
--- 148,176 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * key is already filled in, flags will be initialized at the time of
!          * making a new connection, so just clear conn here.
!          */
          entry->conn = NULL;
      }
  
      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);
  
+ 
+     /*
+      * This connection is no longer valid. Disconnect such connections if no
+      * transaction is running.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         /* reconnected immediately, so the message is "reconnecting"  */
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+ 
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
***************
*** 173,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 189,200 ----
          entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->invalidated = false;
+         entry->changing_xact_state = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
          entry->conn = connect_pg_server(server, user);
  
          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 276,281 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 298,315 ----
      return conn;
  }
  
+ /* disconnect the connection for a connection cache entry */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+ 
+ 
  /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
***************
*** 429,434 **** ReleaseConnection(PGconn *conn)
--- 463,527 ----
  }
  
  /*
+  * Connection invalidation functions.
+  *
+  * Changes to options of a foreign server or user mapping that a connection
+  * depends on require the connection to be closed at the next opportunity. The
+  * parameter is the hash value of the target syscache entry, as passed to the
+  * syscache invalidation callback.
+  *
+  * NB: We could avoid unnecessary disconnection by examining individual option
+  * values but it would be too much work for the gain.
+  */
+ 
+ /* Connection invalidation by modifying foreign server options. */
+ void
+ InvalidateConnectionForServer(uint32 server_hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn != NULL &&
+             entry->server_hashvalue == server_hashvalue)
+         {
+             entry->invalidated = true;
+             hash_seq_term(&scan);
+             break;
+         }
+     }
+ }
+ 
+ /* Connection invalidation by modifying user mapping options. */
+ void
+ InvalidateConnectionForMapping(uint32 mapping_hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     if (!ConnectionHash)
+         return;
+ 
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         if (entry->conn != NULL &&
+             entry->mapping_hashvalue == mapping_hashvalue)
+         {
+             entry->invalidated = true;
+             hash_seq_term(&scan);
+             break;
+         }
+     }
+ }
+ 
+ 
+ /*
   * Assign a "unique" number for a cursor.
   *
   * These really only need to be unique per connection within a transaction.
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }
  
--- 870,876 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }
  
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     if (!entry->changing_xact_state)
          return;
  
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 1004,1017 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;
  
+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+ 
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 36,41 ****
--- 36,43 ----
  #include "parser/parsetree.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
+ #include "utils/inval.h"
+ #include "utils/syscache.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/rel.h"
***************
*** 420,425 **** static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
--- 422,429 ----
                    const PgFdwRelationInfo *fpinfo_o,
                    const PgFdwRelationInfo *fpinfo_i);
  
+ void        _PG_init(void);
+ void        _PG_fini(void);
  
  /*
   * Foreign-data wrapper handler function: return a struct with pointers
***************
*** 476,481 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 480,514 ----
  }
  
  /*
+  *  Callback functions for connection invalidation by syscache invalidation
+  */
+ static void
+ postgresServerSysCallback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     InvalidateConnectionForServer(hashvalue);
+ }
+ 
+ static void
+ postgresMappingSysCallback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     InvalidateConnectionForMapping(hashvalue);
+ }
+ 
+ void
+ _PG_init(void)
+ {
+     CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                   postgresServerSysCallback, (Datum)0);
+     CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                   postgresMappingSysCallback, (Datum)0);
+ }
+ 
+ void
+ _PG_fini(void)
+ {
+ }
+ 
+ /*
   * postgresGetForeignRelSize
   *        Estimate # of rows and width of the result of the scan
   *
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 117,122 **** extern void reset_transmission_modes(int nestlevel);
--- 117,124 ----
  /* in connection.c */
  extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
  extern void ReleaseConnection(PGconn *conn);
+ extern void InvalidateConnectionForServer(uint32 server_hashvalue);
+ extern void InvalidateConnectionForMapping(uint32 mapping_hashvalue);
  extern unsigned int GetCursorNumber(PGconn *conn);
  extern unsigned int GetPrepStmtNumber(PGconn *conn);
  extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
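
For readers unfamiliar with the callback mechanism that _PG_init() hooks into in the patch above, the following toy model shows the general shape: callbacks are registered per cache id and later invoked with the hash value of the changed entry. It is a sketch under assumptions, not PostgreSQL's implementation; every sketch_* name, the SketchCallbackSlot type and the fixed-size table are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_CALLBACKS 8

/* Same general shape as the callbacks in the patch: (arg, cacheid, hash). */
typedef void (*SketchInvalCallback) (void *arg, int cacheid,
                                     uint32_t hashvalue);

typedef struct SketchCallbackSlot
{
    int         cacheid;        /* which cache this callback watches */
    SketchInvalCallback func;   /* function to call on invalidation */
    void       *arg;            /* opaque argument handed back to func */
} SketchCallbackSlot;

static SketchCallbackSlot sketch_callbacks[SKETCH_MAX_CALLBACKS];
static int  sketch_callback_count = 0;

/* Register a callback for one cache id; returns -1 if the table is full. */
static int
sketch_register_callback(int cacheid, SketchInvalCallback func, void *arg)
{
    assert(func != NULL);
    if (sketch_callback_count >= SKETCH_MAX_CALLBACKS)
        return -1;
    sketch_callbacks[sketch_callback_count].cacheid = cacheid;
    sketch_callbacks[sketch_callback_count].func = func;
    sketch_callbacks[sketch_callback_count].arg = arg;
    sketch_callback_count++;
    return 0;
}

/* Invoke every callback registered for cacheid; returns how many ran. */
static int
sketch_fire_invalidation(int cacheid, uint32_t hashvalue)
{
    int         fired = 0;

    for (int i = 0; i < sketch_callback_count; i++)
    {
        if (sketch_callbacks[i].cacheid == cacheid)
        {
            sketch_callbacks[i].func(sketch_callbacks[i].arg,
                                     cacheid, hashvalue);
            fired++;
        }
    }
    return fired;
}

/* Example callback: remember the last hash value that was reported. */
static void
sketch_record_hash(void *arg, int cacheid, uint32_t hashvalue)
{
    (void) cacheid;             /* unused in this example */
    *(uint32_t *) arg = hashvalue;
}
```

The callback itself does very little work, which mirrors the patch: the callbacks only mark cache entries, and the actual disconnect is deferred to the next GetConnection() call.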
