Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING

From: Kyotaro HORIGUCHI
Subject: Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING
Msg-id: 20170721.142551.115979579.horiguchi.kyotaro@lab.ntt.co.jp
In response to: Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTER USER MAPPING  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses: Re: [HACKERS] PgFDW connection invalidation by ALTER SERVER/ALTERUSER MAPPING  (Ashutosh Bapat <ashutosh.bapat@enterprisedb.com>)
List: pgsql-hackers

At Thu, 20 Jul 2017 18:15:42 -0400, Tom Lane <tgl@sss.pgh.pa.us> wrote in <18927.1500588942@sss.pgh.pa.us>
> This seems like overkill.  We can test it reasonably easily within the
> existing framework, as shown in the attached patch.  I'm also fairly

It checks for a disconnection caused within a single session. I
thought that the inter-process behavior was important (since I had
forgotten about it in the previous version), but a single-session
test is reasonable enough if we can rely on the fact that it surely
works through the invalidation mechanism.

In short, I agree with the test in your patch.

> concerned that what you're showing here would be unstable in the buildfarm
> as a result of race conditions between the multiple sessions.

Sure. That is what I meant by 'fragile'.

> I made some cosmetic updates to the code patch, as well.

Thank you for leaving the hashvalue stuff and revising the comment.

By the way, I had mistakenly left the following code in the
previous patch.

+     /* hashvalue == 0 means a cache reset, must clear all state */
+     if (hashvalue == 0)
+       entry->invalidated = true;
+     else if ((cacheid == FOREIGNSERVEROID &&
+           entry->server_hashvalue == hashvalue) ||
+          (cacheid == USERMAPPINGOID &&
+           entry->mapping_hashvalue == hashvalue))
+       entry->invalidated = true;

The redundancy is there because an earlier version used a
switch-case in the else block. That is no longer the case, so the
two branches are not reasonable. I'd like to change it as follows.

+     /* hashvalue == 0 means a cache reset, must clear all state */
+     if ((hashvalue == 0) ||
+         ((cacheid == FOREIGNSERVEROID &&
+           entry->server_hashvalue == hashvalue) ||
+          (cacheid == USERMAPPINGOID &&
+           entry->mapping_hashvalue == hashvalue)))
+       entry->invalidated = true;

The attached patch differs only in this point.
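
As a quick sanity check that the merged condition behaves the same as the two-branch form, here is a minimal standalone sketch. It is not part of the patch: the enum values, the `EntryHashes` struct and the function names are hypothetical stand-ins for the real definitions in utils/syscache.h and connection.c, and the check only covers a small grid of inputs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the syscache identifiers; the real values
 * come from utils/syscache.h. */
enum { FOREIGNSERVEROID = 1, USERMAPPINGOID = 2 };

/* Reduced stand-in for the two hash fields of ConnCacheEntry. */
typedef struct
{
    uint32_t server_hashvalue;
    uint32_t mapping_hashvalue;
} EntryHashes;

/* Two-branch form that was left over in the previous patch. */
static bool
needs_inval_two_branch(const EntryHashes *e, int cacheid, uint32_t hashvalue)
{
    if (hashvalue == 0)
        return true;
    else if ((cacheid == FOREIGNSERVEROID &&
              e->server_hashvalue == hashvalue) ||
             (cacheid == USERMAPPINGOID &&
              e->mapping_hashvalue == hashvalue))
        return true;
    return false;
}

/* Single-condition form proposed as the replacement. */
static bool
needs_inval_merged(const EntryHashes *e, int cacheid, uint32_t hashvalue)
{
    return hashvalue == 0 ||
        (cacheid == FOREIGNSERVEROID && e->server_hashvalue == hashvalue) ||
        (cacheid == USERMAPPINGOID && e->mapping_hashvalue == hashvalue);
}

/* Returns true if both forms agree on every combination in a small grid. */
static bool
forms_agree(void)
{
    const uint32_t vals[] = {0, 1, 7, 42};
    const int ids[] = {FOREIGNSERVEROID, USERMAPPINGOID};
    const size_t nvals = sizeof(vals) / sizeof(vals[0]);

    for (size_t s = 0; s < nvals; s++)
        for (size_t m = 0; m < nvals; m++)
            for (size_t h = 0; h < nvals; h++)
                for (size_t c = 0; c < 2; c++)
                {
                    EntryHashes e = {vals[s], vals[m]};

                    if (needs_inval_two_branch(&e, ids[c], vals[h]) !=
                        needs_inval_merged(&e, ids[c], vals[h]))
                        return false;
                }
    return true;
}
```

Calling `forms_agree()` from a test harness is enough to exercise both forms; the grid is deliberately tiny and is only meant to show that the rewrite is a pure simplification of the boolean expression.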

> I think this is actually a bug fix, and should not wait for the next
> commitfest.

Agreed.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/contrib/postgres_fdw/connection.c
--- b/contrib/postgres_fdw/connection.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "pgstat.h"
  #include "storage/latch.h"
  #include "utils/hsearch.h"
+ #include "utils/inval.h"
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  
***************
*** 48,58 **** typedef struct ConnCacheEntry
--- 49,63 ----
  {
      ConnCacheKey key;           /* hash key (must be first) */
      PGconn     *conn;           /* connection to foreign server, or NULL */
+     /* Remaining fields are invalid when conn is NULL: */
      int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
                                   * one level of subxact open, etc */
      bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      bool        have_error;     /* have any subxacts aborted in this xact? */
      bool        changing_xact_state;    /* xact state change in process */
+     bool        invalidated;    /* true if reconnect is pending */
+     uint32      server_hashvalue;   /* hash value of foreign server OID */
+     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
  } ConnCacheEntry;
  
  /*
***************
*** 69,74 **** static bool xact_got_connection = false;
--- 74,80 ----
  
  /* prototypes of private functions */
  static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
+ static void disconnect_pg_server(ConnCacheEntry *entry);
  static void check_conn_params(const char **keywords, const char **values);
  static void configure_remote_session(PGconn *conn);
  static void do_sql_command(PGconn *conn, const char *sql);
***************
*** 78,83 **** static void pgfdw_subxact_callback(SubXactEvent event,
--- 84,90 ----
                         SubTransactionId mySubid,
                         SubTransactionId parentSubid,
                         void *arg);
+ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
  static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  static bool pgfdw_cancel_query(PGconn *conn);
  static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
***************
*** 130,135 **** GetConnection(UserMapping *user, bool will_prep_stmt)
--- 137,146 ----
           */
          RegisterXactCallback(pgfdw_xact_callback, NULL);
          RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
+         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                       pgfdw_inval_callback, (Datum) 0);
+         CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                       pgfdw_inval_callback, (Datum) 0);
      }
  
      /* Set flag that we did GetConnection during the current transaction */
***************
*** 144,161 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /* initialize new hashtable entry (key is already filled in) */
          entry->conn = NULL;
-         entry->xact_depth = 0;
-         entry->have_prep_stmt = false;
-         entry->have_error = false;
-         entry->changing_xact_state = false;
      }
  
      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);
  
      /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
--- 155,182 ----
      entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
      if (!found)
      {
!         /*
!          * We need only clear "conn" here; remaining fields will be filled
!          * later when "conn" is set.
!          */
          entry->conn = NULL;
      }
  
      /* Reject further use of connections which failed abort cleanup. */
      pgfdw_reject_incomplete_xact_state_change(entry);
  
      /*
+      * If the connection needs to be remade due to invalidation, disconnect as
+      * soon as we're out of all transactions.
+      */
+     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
+     {
+         elog(DEBUG3, "closing connection %p for option changes to take effect",
+              entry->conn);
+         disconnect_pg_server(entry);
+     }
+ 
+     /*
       * We don't check the health of cached connection here, because it would
       * require some overhead.  Broken connection will be detected when the
       * connection is actually used.
***************
*** 164,178 **** GetConnection(UserMapping *user, bool will_prep_stmt)
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will be left in a valid empty state.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);
  
!         entry->xact_depth = 0;    /* just to be sure */
          entry->have_prep_stmt = false;
          entry->have_error = false;
          entry->conn = connect_pg_server(server, user);
  
          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
--- 185,208 ----
      /*
       * If cache entry doesn't have a connection, we have to establish a new
       * connection.  (If connect_pg_server throws an error, the cache entry
!      * will remain in a valid empty state, ie conn == NULL.)
       */
      if (entry->conn == NULL)
      {
          ForeignServer *server = GetForeignServer(user->serverid);
  
!         /* Reset all transient state fields, to be sure all are clean */
!         entry->xact_depth = 0;
          entry->have_prep_stmt = false;
          entry->have_error = false;
+         entry->changing_xact_state = false;
+         entry->invalidated = false;
+         entry->server_hashvalue =
+             GetSysCacheHashValue1(FOREIGNSERVEROID, server->serverid);
+         entry->mapping_hashvalue =
+             GetSysCacheHashValue1(USERMAPPINGOID, user->umid);
+ 
+         /* Now try to make the connection */
          entry->conn = connect_pg_server(server, user);
  
          elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
***************
*** 277,282 **** connect_pg_server(ForeignServer *server, UserMapping *user)
--- 307,325 ----
  }
  
  /*
+  * Disconnect any open connection for a connection cache entry.
+  */
+ static void
+ disconnect_pg_server(ConnCacheEntry *entry)
+ {
+     if (entry->conn != NULL)
+     {
+         PQfinish(entry->conn);
+         entry->conn = NULL;
+     }
+ }
+ 
+ /*
   * For non-superusers, insist that the connstr specify a password.  This
   * prevents a password from being picked up from .pgpass, a service file,
   * the environment, etc.  We don't want the postgres user's passwords
***************
*** 777,785 **** pgfdw_xact_callback(XactEvent event, void *arg)
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             PQfinish(entry->conn);
!             entry->conn = NULL;
!             entry->changing_xact_state = false;
          }
      }
  
--- 820,826 ----
              entry->changing_xact_state)
          {
              elog(DEBUG3, "discarding connection %p", entry->conn);
!             disconnect_pg_server(entry);
          }
      }
  
***************
*** 897,902 **** pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
--- 938,984 ----
  }
  
  /*
+  * Connection invalidation callback function
+  *
+  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
+  * mark connections depending on that entry as needing to be remade.
+  * We can't immediately destroy them, since they might be in the midst of
+  * a transaction, but we'll remake them at the next opportunity.
+  *
+  * Although most cache invalidation callbacks blow away all the related stuff
+  * regardless of the given hashvalue, connections are expensive enough that
+  * it's worth trying to avoid that.
+  *
+  * NB: We could avoid unnecessary disconnection more strictly by examining
+  * individual option values, but it seems too much effort for the gain.
+  */
+ static void
+ pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
+ {
+     HASH_SEQ_STATUS scan;
+     ConnCacheEntry *entry;
+ 
+     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
+ 
+     /* ConnectionHash must exist already, if we're registered */
+     hash_seq_init(&scan, ConnectionHash);
+     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+     {
+         /* Ignore invalid entries */
+         if (entry->conn == NULL)
+             continue;
+ 
+         /* hashvalue == 0 means a cache reset, must clear all state */
+         if (hashvalue == 0 ||
+             ((cacheid == FOREIGNSERVEROID &&
+               entry->server_hashvalue == hashvalue) ||
+              (cacheid == USERMAPPINGOID &&
+               entry->mapping_hashvalue == hashvalue)))
+             entry->invalidated = true;
+     }
+ }
+ 
+ /*
   * Raise an error if the given connection cache entry is marked as being
   * in the middle of an xact state change.  This should be called at which no
   * such change is expected to be in progress; if one is found to be in
***************
*** 913,921 **** pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     if (!entry->changing_xact_state)
          return;
  
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
--- 995,1008 ----
      Form_pg_user_mapping umform;
      ForeignServer *server;
  
!     /* nothing to do for inactive entries and entries of sane state */
!     if (entry->conn == NULL || !entry->changing_xact_state)
          return;
  
+     /* make sure this entry is inactive */
+     disconnect_pg_server(entry);
+ 
+     /* find server name to be shown in the message below */
      tup = SearchSysCache1(USERMAPPINGOID,
                            ObjectIdGetDatum(entry->key));
      if (!HeapTupleIsValid(tup))
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
***************
*** 191,196 **** ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
--- 191,233 ----
   public | ft_pg_type | loopback  | (schema_name 'pg_catalog', table_name 'pg_type') | 
  (6 rows)
  
+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+   c3   |              c4              
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+ 
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  database "no such database" does not exist
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4              
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+ 
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ERROR:  could not connect to server "loopback"
+ DETAIL:  FATAL:  role "no such user" does not exist
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+   c3   |              c4              
+ -------+------------------------------
+  00001 | Fri Jan 02 00:00:00 1970 PST
+ (1 row)
+ 
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
*** a/contrib/postgres_fdw/sql/postgres_fdw.sql
--- b/contrib/postgres_fdw/sql/postgres_fdw.sql
***************
*** 195,200 **** ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
--- 195,220 ----
  ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  \det+
  
+ -- Test that alteration of server options causes reconnection
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work
+ ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ DO $d$
+     BEGIN
+         EXECUTE $$ALTER SERVER loopback
+             OPTIONS (SET dbname '$$||current_database()||$$')$$;
+     END;
+ $d$;
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+ 
+ -- Test that alteration of user mapping options causes reconnection
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (ADD user 'no such user');
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should fail
+ ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
+   OPTIONS (DROP user);
+ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
+ 
  -- Now we should be able to run ANALYZE.
  -- To exercise multiple code paths, we use local stats on ft1
  -- and remote-estimate mode on ft2.
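
The lifecycle the patch sets up (mark the entry in the invalidation callback, then drop and remake the connection on the next GetConnection call once no transaction is open) can be sketched outside the backend as below. This is only an illustration under stated assumptions: `MockEntry`, `mock_inval_callback`, `mock_get_connection`, the integer `conn_id` standing in for `PGconn *`, and the enum values are all hypothetical and do not exist in postgres_fdw.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the syscache identifiers. */
enum { FOREIGNSERVEROID = 1, USERMAPPINGOID = 2 };

/* Reduced model of ConnCacheEntry; conn_id == 0 means "no connection". */
typedef struct
{
    int      conn_id;
    int      xact_depth;
    bool     invalidated;
    uint32_t server_hashvalue;
    uint32_t mapping_hashvalue;
} MockEntry;

static int next_conn_id = 1;

/* Callback model: only marks matching live entries, never closes them. */
static void
mock_inval_callback(MockEntry *entries, size_t n, int cacheid,
                    uint32_t hashvalue)
{
    assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);

    for (size_t i = 0; i < n; i++)
    {
        MockEntry *e = &entries[i];

        if (e->conn_id == 0)
            continue;           /* ignore entries without a connection */

        /* hashvalue == 0 models a full cache reset */
        if (hashvalue == 0 ||
            (cacheid == FOREIGNSERVEROID &&
             e->server_hashvalue == hashvalue) ||
            (cacheid == USERMAPPINGOID &&
             e->mapping_hashvalue == hashvalue))
            e->invalidated = true;
    }
}

/* GetConnection model: drop a stale connection only outside transactions,
 * then (re)connect if the entry has no connection. */
static int
mock_get_connection(MockEntry *e, uint32_t server_hash, uint32_t mapping_hash)
{
    if (e->conn_id != 0 && e->invalidated && e->xact_depth == 0)
        e->conn_id = 0;         /* stands in for disconnect_pg_server() */

    if (e->conn_id == 0)
    {
        /* reset transient state before making the new connection */
        e->xact_depth = 0;
        e->invalidated = false;
        e->server_hashvalue = server_hash;
        e->mapping_hashvalue = mapping_hash;
        e->conn_id = next_conn_id++;
    }
    return e->conn_id;
}
```

In this model an entry invalidated while `xact_depth` is nonzero keeps its connection until the depth returns to zero, which matches the comment in the callback about not destroying connections in the midst of a transaction.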
