Re: Fixing memory leaks in postgres_fdw

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: Fixing memory leaks in postgres_fdw
Дата
Msg-id 216286.1748202797@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Fixing memory leaks in postgres_fdw  (Tom Lane <tgl@sss.pgh.pa.us>)
Ответы Re: Fixing memory leaks in postgres_fdw
Список pgsql-hackers
Etsuro Fujita <etsuro.fujita@gmail.com> writes:
> On Sat, May 24, 2025 at 10:10 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> I thought of fixing this by using a memory context reset callback
>> to ensure that the PGresult is cleaned up when the executor's context
>> goes away, and that seems to work nicely (see 0001 attached).
>> However, I feel like this is just a POC, because now that we have that
>> concept we might be able to use it elsewhere in postgres_fdw to
>> eliminate most or even all of its reliance on PG_TRY.  That should be
>> faster as well as much less bug-prone.  But I'm putting it up at this
>> stage for comments, in case anyone thinks this is not the direction to
>> head in.

> I think that that is a good idea; +1 for removing the reliance not
> only in DirectModify but in other places.  I think that that would be
> also useful if extending batch INSERT to cases with RETURNING data in
> postgres_fdw.

Here is an attempt at making a bulletproof fix by having all backend
users of libpq go through a wrapper layer that provides the memory
context callback.  Perhaps this is more code churn than we want to
accept; I'm not sure.  I thought about avoiding most of the niggling
code changes by adding

#define PGresult BEPGresult
#define PQclear BEPQclear
#define PQresultStatus BEPQresultStatus

and so forth at the bottom of the new header file, but I'm afraid
that would create a lot of confusion.

There is a lot yet to do towards getting rid of no-longer-needed
PG_TRYs and other complication, but I decided to stop here pending
comments on the notational decisions I made.

One point that people might find particularly dubious is that
I put the new stuff into a new header file libpq-be-fe.h, rather
than adding it to libpq-be-fe-helpers.h which would seem more
obvious.  The reason for that is the code layout in postgres_fdw.
postgres_fdw.h needs to include libpq-fe.h to get the PGresult
typedef, and with these changes it instead needs to get BEPGresult.
But only connection.c currently includes libpq-be-fe-helpers.h,
and I didn't like the idea of making all of postgres_fdw's .c
files include that.  Maybe that's not worth worrying about though.

The 0002 patch is the same as before.

            regards, tom lane

From 2fc53191595be88592fcd83b8b78c5849b623049 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sun, 25 May 2025 15:16:23 -0400
Subject: [PATCH v2 1/2] Fix memory leakage in postgres_fdw's DirectModify code
 path.

postgres_fdw tries to use PG_TRY blocks to ensure that it will
eventually free the PGresult created by the remote modify command.
However, it's fundamentally impossible for this scheme to work
reliably when there's RETURNING data, because the query could fail
in between invocations of postgres_fdw's DirectModify methods.
There is at least one instance of exactly this situation in the
regression tests, and the ensuing session-lifespan leak is visible
under Valgrind.

We can improve matters by using a memory context reset callback
attached to the ExecutorState context.  That ensures that the
PGresult will be freed when the ExecutorState context is torn
down, even if control never reaches postgresEndDirectModify.
Also, by switching to this approach, we can eliminate some
PG_TRY overhead since it's no longer necessary to be so
cautious about errors.

This patch adds infrastructure that wraps each PGresult in a
"BEPGresult" that provides the reset callback.  Code using this
abstraction is inherently memory-safe (as long as it attaches
the reset callbacks to an appropriate memory context).  The
amount of notational churn is slightly annoying, but I think
that it's probably worth it to forestall future leaks.

So far I created the infrastructure and made relevant code
use it, but I have not done very much towards making the
caller simplifications that should now be possible.  I did
remove a few PQclear calls in error-exit paths that are now
no longer necessary.  But we should be able to remove a lot
of uses of PG_TRY and "volatile" variable markings, as well
as simplify error handling, now that it's not necessary to be
so paranoid about freeing PGresults before throwing an error.
This state of the patch is good for reviewing the notational
choices I made, though.

Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/2976982.1748049023@sss.pgh.pa.us
---
 contrib/dblink/dblink.c                       | 159 +++++++-------
 contrib/postgres_fdw/connection.c             |  54 ++---
 contrib/postgres_fdw/postgres_fdw.c           | 186 ++++++++--------
 contrib/postgres_fdw/postgres_fdw.h           |  10 +-
 .../libpqwalreceiver/libpqwalreceiver.c       | 145 +++++--------
 src/backend/utils/mmgr/mcxt.c                 |  39 +++-
 src/include/libpq/libpq-be-fe-helpers.h       |  68 +++---
 src/include/libpq/libpq-be-fe.h               | 203 ++++++++++++++++++
 src/include/utils/palloc.h                    |   2 +
 src/tools/pgindent/typedefs.list              |   1 +
 10 files changed, 531 insertions(+), 336 deletions(-)
 create mode 100644 src/include/libpq/libpq-be-fe.h

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 98d4e3d7dac..873a6a8f6d7 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -85,8 +85,8 @@ typedef struct storeInfo
     MemoryContext tmpcontext;
     char      **cstrs;
     /* temp storage for results to avoid leaks on exception */
-    PGresult   *last_res;
-    PGresult   *cur_res;
+    BEPGresult *last_res;
+    BEPGresult *cur_res;
 } storeInfo;

 /*
@@ -95,14 +95,14 @@ typedef struct storeInfo
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
-                              PGresult *res);
+                              BEPGresult *res);
 static void materializeQueryResult(FunctionCallInfo fcinfo,
                                    PGconn *conn,
                                    const char *conname,
                                    const char *sql,
                                    bool fail);
-static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
-static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
+static BEPGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
+static void storeRow(volatile storeInfo *sinfo, BEPGresult *res, bool first);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
@@ -120,7 +120,7 @@ static char *generate_relation_name(Relation rel);
 static void dblink_connstr_check(const char *connstr);
 static bool dblink_connstr_has_pw(const char *connstr);
 static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
-static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
+static void dblink_res_error(PGconn *conn, const char *conname, BEPGresult *res,
                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
 static char *get_connect_string(const char *servername);
 static char *escape_param_str(const char *str);
@@ -171,11 +171,11 @@ xpstrdup(const char *in)
 }

 pg_noreturn static void
-dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
+dblink_res_internalerror(PGconn *conn, BEPGresult *res, const char *p2)
 {
     char       *msg = pchomp(PQerrorMessage(conn));

-    PQclear(res);
+    BEPQclear(res);
     elog(ERROR, "%s: %s", p2, msg);
 }

@@ -400,7 +400,7 @@ PG_FUNCTION_INFO_V1(dblink_open);
 Datum
 dblink_open(PG_FUNCTION_ARGS)
 {
-    PGresult   *res = NULL;
+    BEPGresult *res = NULL;
     PGconn       *conn;
     char       *curname = NULL;
     char       *sql = NULL;
@@ -456,9 +456,9 @@ dblink_open(PG_FUNCTION_ARGS)
     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     {
         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+        if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
             dblink_res_internalerror(conn, res, "begin error");
-        PQclear(res);
+        BEPQclear(res);
         rconn->newXactForCursor = true;

         /*
@@ -475,14 +475,14 @@ dblink_open(PG_FUNCTION_ARGS)

     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
-    if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
     {
         dblink_res_error(conn, conname, res, fail,
                          "while opening cursor \"%s\"", curname);
         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     }

-    PQclear(res);
+    BEPQclear(res);
     PG_RETURN_TEXT_P(cstring_to_text("OK"));
 }

@@ -494,7 +494,7 @@ Datum
 dblink_close(PG_FUNCTION_ARGS)
 {
     PGconn       *conn;
-    PGresult   *res = NULL;
+    BEPGresult *res = NULL;
     char       *curname = NULL;
     char       *conname = NULL;
     StringInfoData buf;
@@ -544,14 +544,14 @@ dblink_close(PG_FUNCTION_ARGS)

     /* close the cursor */
     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
-    if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
     {
         dblink_res_error(conn, conname, res, fail,
                          "while closing cursor \"%s\"", curname);
         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     }

-    PQclear(res);
+    BEPQclear(res);

     /* if we started a transaction, decrement cursor count */
     if (rconn->newXactForCursor)
@@ -564,9 +564,9 @@ dblink_close(PG_FUNCTION_ARGS)
             rconn->newXactForCursor = false;

             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
-            if (PQresultStatus(res) != PGRES_COMMAND_OK)
+            if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
                 dblink_res_internalerror(conn, res, "commit error");
-            PQclear(res);
+            BEPQclear(res);
         }
     }

@@ -580,7 +580,7 @@ PG_FUNCTION_INFO_V1(dblink_fetch);
 Datum
 dblink_fetch(PG_FUNCTION_ARGS)
 {
-    PGresult   *res = NULL;
+    BEPGresult *res = NULL;
     char       *conname = NULL;
     remoteConn *rconn = NULL;
     PGconn       *conn = NULL;
@@ -646,18 +646,17 @@ dblink_fetch(PG_FUNCTION_ARGS)
      * memory context.
      */
     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK &&
+        BEPQresultStatus(res) != PGRES_TUPLES_OK)
     {
         dblink_res_error(conn, conname, res, fail,
                          "while fetching from cursor \"%s\"", curname);
         return (Datum) 0;
     }
-    else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+    else if (BEPQresultStatus(res) == PGRES_COMMAND_OK)
     {
         /* cursor does not exist - closed already or bad name */
-        PQclear(res);
+        BEPQclear(res);
         ereport(ERROR,
                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
                  errmsg("cursor \"%s\" does not exist", curname)));
@@ -793,13 +792,13 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
         else
         {
             /* async result retrieval, do it the old way */
-            PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
+            BEPGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);

             /* NULL means we're all done with the async results */
             if (res)
             {
-                if (PQresultStatus(res) != PGRES_COMMAND_OK &&
-                    PQresultStatus(res) != PGRES_TUPLES_OK)
+                if (BEPQresultStatus(res) != PGRES_COMMAND_OK &&
+                    BEPQresultStatus(res) != PGRES_TUPLES_OK)
                 {
                     dblink_res_error(conn, conname, res, fail,
                                      "while executing query");
@@ -853,12 +852,12 @@ prepTuplestoreResult(FunctionCallInfo fcinfo)
 }

 /*
- * Copy the contents of the PGresult into a tuplestore to be returned
+ * Copy the contents of the BEPGresult into a tuplestore to be returned
  * as the result of the current function.
- * The PGresult will be released in this function.
+ * The BEPGresult will be released in this function.
  */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
+materializeResult(FunctionCallInfo fcinfo, PGconn *conn, BEPGresult *res)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

@@ -872,7 +871,7 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
         int            ntuples;
         int            nfields;

-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        if (BEPQresultStatus(res) == PGRES_COMMAND_OK)
         {
             is_sql_cmd = true;

@@ -888,7 +887,7 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
         }
         else
         {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+            Assert(BEPQresultStatus(res) == PGRES_TUPLES_OK);

             is_sql_cmd = false;

@@ -913,8 +912,8 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)

             /* make sure we have a persistent copy of the tupdesc */
             tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+            ntuples = BEPQntuples(res);
+            nfields = BEPQnfields(res);
         }

         /*
@@ -960,15 +959,15 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)

                     for (i = 0; i < nfields; i++)
                     {
-                        if (PQgetisnull(res, row, i))
+                        if (BEPQgetisnull(res, row, i))
                             values[i] = NULL;
                         else
-                            values[i] = PQgetvalue(res, row, i);
+                            values[i] = BEPQgetvalue(res, row, i);
                     }
                 }
                 else
                 {
-                    values[0] = PQcmdStatus(res);
+                    values[0] = BEPQcmdStatus(res);
                 }

                 /* build the tuple and put it into the tuplestore. */
@@ -983,7 +982,7 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     PG_FINALLY();
     {
         /* be sure to release the libpq result */
-        PQclear(res);
+        BEPQclear(res);
     }
     PG_END_TRY();
 }
@@ -1004,7 +1003,7 @@ materializeQueryResult(FunctionCallInfo fcinfo,
                        bool fail)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;
     volatile storeInfo sinfo = {0};

     /* prepTuplestoreResult must have been called previously */
@@ -1022,22 +1021,21 @@ materializeQueryResult(FunctionCallInfo fcinfo,
         /* execute query, collecting any tuples into the tuplestore */
         res = storeQueryResult(&sinfo, conn, sql);

-        if (!res ||
-            (PQresultStatus(res) != PGRES_COMMAND_OK &&
-             PQresultStatus(res) != PGRES_TUPLES_OK))
+        if (BEPQresultStatus(res) != PGRES_COMMAND_OK &&
+            BEPQresultStatus(res) != PGRES_TUPLES_OK)
         {
             /*
              * dblink_res_error will clear the passed PGresult, so we need
              * this ugly dance to avoid doing so twice during error exit
              */
-            PGresult   *res1 = res;
+            BEPGresult *res1 = res;

             res = NULL;
             dblink_res_error(conn, conname, res1, fail,
                              "while executing query");
             /* if fail isn't set, we'll return an empty query result */
         }
-        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        else if (BEPQresultStatus(res) == PGRES_COMMAND_OK)
         {
             /*
              * storeRow didn't get called, so we need to convert the command
@@ -1065,22 +1063,22 @@ materializeQueryResult(FunctionCallInfo fcinfo,
             rsinfo->setDesc = tupdesc;
             MemoryContextSwitchTo(oldcontext);

-            values[0] = PQcmdStatus(res);
+            values[0] = BEPQcmdStatus(res);

             /* build the tuple and put it into the tuplestore. */
             tuple = BuildTupleFromCStrings(attinmeta, values);
             tuplestore_puttuple(tupstore, tuple);

-            PQclear(res);
+            BEPQclear(res);
             res = NULL;
         }
         else
         {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+            Assert(BEPQresultStatus(res) == PGRES_TUPLES_OK);
             /* storeRow should have created a tuplestore */
             Assert(rsinfo->setResult != NULL);

-            PQclear(res);
+            BEPQclear(res);
             res = NULL;
         }

@@ -1089,21 +1087,21 @@ materializeQueryResult(FunctionCallInfo fcinfo,
             MemoryContextDelete(sinfo.tmpcontext);
         sinfo.tmpcontext = NULL;

-        PQclear(sinfo.last_res);
+        BEPQclear(sinfo.last_res);
         sinfo.last_res = NULL;
-        PQclear(sinfo.cur_res);
+        BEPQclear(sinfo.cur_res);
         sinfo.cur_res = NULL;
     }
     PG_CATCH();
     {
         /* be sure to release any libpq result we collected */
-        PQclear(res);
-        PQclear(sinfo.last_res);
-        PQclear(sinfo.cur_res);
+        BEPQclear(res);
+        BEPQclear(sinfo.last_res);
+        BEPQclear(sinfo.cur_res);
         /* and clear out any pending data in libpq */
         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
                NULL)
-            PQclear(res);
+            BEPQclear(res);
         PG_RE_THROW();
     }
     PG_END_TRY();
@@ -1112,12 +1110,12 @@ materializeQueryResult(FunctionCallInfo fcinfo,
 /*
  * Execute query, and send any result rows to sinfo->tuplestore.
  */
-static PGresult *
+static BEPGresult *
 storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
 {
     bool        first = true;
     int            nestlevel = -1;
-    PGresult   *res;
+    BEPGresult *res;

     if (!PQsendQuery(conn, sql))
         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
@@ -1133,7 +1131,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
         if (!sinfo->cur_res)
             break;

-        if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
+        if (BEPQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
         {
             /* got one row from possibly-bigger resultset */

@@ -1147,18 +1145,18 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)

             storeRow(sinfo, sinfo->cur_res, first);

-            PQclear(sinfo->cur_res);
+            BEPQclear(sinfo->cur_res);
             sinfo->cur_res = NULL;
             first = false;
         }
         else
         {
             /* if empty resultset, fill tuplestore header */
-            if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
+            if (first && BEPQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
                 storeRow(sinfo, sinfo->cur_res, first);

             /* store completed result at last_res */
-            PQclear(sinfo->last_res);
+            BEPQclear(sinfo->last_res);
             sinfo->last_res = sinfo->cur_res;
             sinfo->cur_res = NULL;
             first = true;
@@ -1181,9 +1179,9 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
  * (in this case the PGresult might contain either zero or one row).
  */
 static void
-storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
+storeRow(volatile storeInfo *sinfo, BEPGresult *res, bool first)
 {
-    int            nfields = PQnfields(res);
+    int            nfields = BEPQnfields(res);
     HeapTuple    tuple;
     int            i;
     MemoryContext oldcontext;
@@ -1243,7 +1241,7 @@ storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
         MemoryContextSwitchTo(oldcontext);

         /* Done if empty resultset */
-        if (PQntuples(res) == 0)
+        if (BEPQntuples(res) == 0)
             return;

         /*
@@ -1256,7 +1254,7 @@ storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
     }

     /* Should have a single-row result if we get here */
-    Assert(PQntuples(res) == 1);
+    Assert(BEPQntuples(res) == 1);

     /*
      * Do the following work in a temp context that we reset after each tuple.
@@ -1270,10 +1268,10 @@ storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
      */
     for (i = 0; i < nfields; i++)
     {
-        if (PQgetisnull(res, 0, i))
+        if (BEPQgetisnull(res, 0, i))
             sinfo->cstrs[i] = NULL;
         else
-            sinfo->cstrs[i] = PQgetvalue(res, 0, i);
+            sinfo->cstrs[i] = BEPQgetvalue(res, 0, i);
     }

     /* Convert row to a tuple, and add it to the tuplestore */
@@ -1412,7 +1410,7 @@ dblink_exec(PG_FUNCTION_ARGS)

     PG_TRY();
     {
-        PGresult   *res = NULL;
+        BEPGresult *res = NULL;
         char       *sql = NULL;
         char       *conname = NULL;
         bool        fail = true;    /* default to backward compatible behavior */
@@ -1455,9 +1453,8 @@ dblink_exec(PG_FUNCTION_ARGS)
             dblink_conn_not_avail(conname);

         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
-        if (!res ||
-            (PQresultStatus(res) != PGRES_COMMAND_OK &&
-             PQresultStatus(res) != PGRES_TUPLES_OK))
+        if (BEPQresultStatus(res) != PGRES_COMMAND_OK &&
+            BEPQresultStatus(res) != PGRES_TUPLES_OK)
         {
             dblink_res_error(conn, conname, res, fail,
                              "while executing command");
@@ -1468,18 +1465,18 @@ dblink_exec(PG_FUNCTION_ARGS)
              */
             sql_cmd_status = cstring_to_text("ERROR");
         }
-        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        else if (BEPQresultStatus(res) == PGRES_COMMAND_OK)
         {
             /*
              * and save a copy of the command status string to return as our
              * result tuple
              */
-            sql_cmd_status = cstring_to_text(PQcmdStatus(res));
-            PQclear(res);
+            sql_cmd_status = cstring_to_text(BEPQcmdStatus(res));
+            BEPQclear(res);
         }
         else
         {
-            PQclear(res);
+            BEPQclear(res);
             ereport(ERROR,
                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
                      errmsg("statement returning results not allowed")));
@@ -2788,15 +2785,15 @@ dblink_connstr_check(const char *connstr)
  * the resulting string should be worded like "while <some action>"
  */
 static void
-dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
+dblink_res_error(PGconn *conn, const char *conname, BEPGresult *res,
                  bool fail, const char *fmt,...)
 {
     int            level;
-    char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
-    char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
-    char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
-    char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
-    char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+    char       *pg_diag_sqlstate = BEPQresultErrorField(res, PG_DIAG_SQLSTATE);
+    char       *pg_diag_message_primary = BEPQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+    char       *pg_diag_message_detail = BEPQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+    char       *pg_diag_message_hint = BEPQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+    char       *pg_diag_context = BEPQresultErrorField(res, PG_DIAG_CONTEXT);
     int            sqlstate;
     char       *message_primary;
     char       *message_detail;
@@ -2838,7 +2835,7 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
      * leaking all the strings too, but those are in palloc'd memory that will
      * get cleaned up eventually.
      */
-    PQclear(res);
+    BEPQclear(res);

     /*
      * Format the basic errcontext string.  Below, we'll add on something
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 304f3c20f83..488adb685b5 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -164,7 +164,7 @@ static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
                                          bool ignore_errors);
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
                                      TimestampTz retrycanceltime,
-                                     PGresult **result, bool *timed_out);
+                                     BEPGresult **result, bool *timed_out);
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
                                       List **pending_entries,
@@ -818,7 +818,7 @@ do_sql_command_begin(PGconn *conn, const char *sql)
 static void
 do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
 {
-    PGresult   *res;
+    BEPGresult *res;

     /*
      * If requested, consume whatever data is available from the socket. (Note
@@ -829,9 +829,9 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
     if (consume_input && !PQconsumeInput(conn))
         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     res = pgfdw_get_result(conn);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         pgfdw_report_error(ERROR, res, conn, true, sql);
-    PQclear(res);
+    BEPQclear(res);
 }

 /*
@@ -937,7 +937,7 @@ GetPrepStmtNumber(PGconn *conn)
  *
  * Caller is responsible for the error handling on the result.
  */
-PGresult *
+BEPGresult *
 pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 {
     /* First, process a pending asynchronous request, if any. */
@@ -954,7 +954,7 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
  *
  * Caller is responsible for the error handling on the result.
  */
-PGresult *
+BEPGresult *
 pgfdw_get_result(PGconn *conn)
 {
     return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
@@ -974,17 +974,17 @@ pgfdw_get_result(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+pgfdw_report_error(int elevel, BEPGresult *res, PGconn *conn,
                    bool clear, const char *sql)
 {
     /* If requested, PGresult must be released before leaving this function. */
     PG_TRY();
     {
-        char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
-        char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
-        char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
-        char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
-        char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+        char       *diag_sqlstate = BEPQresultErrorField(res, PG_DIAG_SQLSTATE);
+        char       *message_primary = BEPQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+        char       *message_detail = BEPQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+        char       *message_hint = BEPQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+        char       *message_context = BEPQresultErrorField(res, PG_DIAG_CONTEXT);
         int            sqlstate;

         if (diag_sqlstate)
@@ -1017,7 +1017,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
     PG_FINALLY();
     {
         if (clear)
-            PQclear(res);
+            BEPQclear(res);
     }
     PG_END_TRY();
 }
@@ -1048,7 +1048,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
     hash_seq_init(&scan, ConnectionHash);
     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
     {
-        PGresult   *res;
+        BEPGresult *res;

         /* Ignore cache entry if no open connection right now */
         if (entry->conn == NULL)
@@ -1101,7 +1101,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                     {
                         res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
                                                NULL);
-                        PQclear(res);
+                        BEPQclear(res);
                     }
                     entry->have_prep_stmt = false;
                     entry->have_error = false;
@@ -1461,7 +1461,7 @@ static bool
 pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
                        TimestampTz retrycanceltime, bool consume_input)
 {
-    PGresult   *result;
+    BEPGresult *result;
     bool        timed_out;

     /*
@@ -1494,7 +1494,7 @@ pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,

         return false;
     }
-    PQclear(result);
+    BEPQclear(result);

     return true;
 }
@@ -1554,7 +1554,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
                              TimestampTz endtime, bool consume_input,
                              bool ignore_errors)
 {
-    PGresult   *result;
+    BEPGresult *result;
     bool        timed_out;

     Assert(query != NULL);
@@ -1585,12 +1585,12 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     }

     /* Issue a warning if not successful. */
-    if (PQresultStatus(result) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(result) != PGRES_COMMAND_OK)
     {
         pgfdw_report_error(WARNING, result, conn, true, query);
         return ignore_errors;
     }
-    PQclear(result);
+    BEPQclear(result);

     return true;
 }
@@ -1612,11 +1612,11 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
 static bool
 pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
                          TimestampTz retrycanceltime,
-                         PGresult **result,
+                         BEPGresult **result,
                          bool *timed_out)
 {
     volatile bool failed = false;
-    PGresult   *volatile last_res = NULL;
+    BEPGresult *volatile last_res = NULL;

     *result = NULL;
     *timed_out = false;
@@ -1628,7 +1628,7 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,

         for (;;)
         {
-            PGresult   *res;
+            BEPGresult *res;

             while (PQisBusy(conn))
             {
@@ -1696,24 +1696,24 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
                 }
             }

-            res = PQgetResult(conn);
+            BEPQwrap(res, PQgetResult(conn), CurrentMemoryContext);
             if (res == NULL)
                 break;            /* query is complete */

-            PQclear(last_res);
+            BEPQclear(last_res);
             last_res = res;
         }
 exit:    ;
     }
     PG_CATCH();
     {
-        PQclear(last_res);
+        BEPQclear(last_res);
         PG_RE_THROW();
     }
     PG_END_TRY();

     if (failed)
-        PQclear(last_res);
+        BEPQclear(last_res);
     else
         *result = last_res;
     return failed;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 331f3fc088d..f6e06ae0610 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -237,7 +237,7 @@ typedef struct PgFdwDirectModifyState
     const char **param_values;    /* textual values of query parameters */

     /* for storing result tuples */
-    PGresult   *result;            /* result for query */
+    BEPGresult *result;            /* result for query */
     int            num_tuples;        /* # of result tuples */
     int            next_tuple;        /* index of next one to return */
     Relation    resultRel;        /* relcache entry for the target relation */
@@ -476,7 +476,7 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                                              TupleTableSlot **slots,
                                              int numSlots);
 static void store_returning_result(PgFdwModifyState *fmstate,
-                                   TupleTableSlot *slot, PGresult *res);
+                                   TupleTableSlot *slot, BEPGresult *res);
 static void finish_foreign_modify(PgFdwModifyState *fmstate);
 static void deallocate_query(PgFdwModifyState *fmstate);
 static List *build_remote_returning(Index rtindex, Relation rel,
@@ -505,12 +505,12 @@ static int    postgresAcquireSampleRowsFunc(Relation relation, int elevel,
                                           HeapTuple *rows, int targrows,
                                           double *totalrows,
                                           double *totaldeadrows);
-static void analyze_row_processor(PGresult *res, int row,
+static void analyze_row_processor(BEPGresult *res, int row,
                                   PgFdwAnalyzeState *astate);
 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
 static void fetch_more_data_begin(AsyncRequest *areq);
 static void complete_pending_request(AsyncRequest *areq);
-static HeapTuple make_tuple_from_result_row(PGresult *res,
+static HeapTuple make_tuple_from_result_row(BEPGresult *res,
                                             int row,
                                             Relation rel,
                                             AttInMetadata *attinmeta,
@@ -1650,7 +1650,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 {
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     char        sql[64];
-    PGresult   *res;
+    BEPGresult *res;

     /* If we haven't created the cursor yet, nothing to do. */
     if (!fsstate->cursor_exists)
@@ -1707,9 +1707,9 @@ postgresReScanForeignScan(ForeignScanState *node)
      * without releasing the PGresult.
      */
     res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
-    PQclear(res);
+    BEPQclear(res);

     /* Now force a fresh FETCH. */
     fsstate->tuples = NULL;
@@ -2817,7 +2817,7 @@ postgresEndDirectModify(ForeignScanState *node)
         return;

     /* Release PGresult */
-    PQclear(dmstate->result);
+    BEPQclear(dmstate->result);

     /* Release remote connection */
     ReleaseConnection(dmstate->conn);
@@ -3601,7 +3601,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
                     double *rows, int *width,
                     Cost *startup_cost, Cost *total_cost)
 {
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;

     /* PGresult must be released before leaving this function. */
     PG_TRY();
@@ -3614,7 +3614,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
          * Execute EXPLAIN remotely.
          */
         res = pgfdw_exec_query(conn, sql, NULL);
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+        if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, sql);

         /*
@@ -3622,7 +3622,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
          * left paren from the end of the line to avoid being confused by
          * other uses of parentheses.
          */
-        line = PQgetvalue(res, 0, 0);
+        line = BEPQgetvalue(res, 0, 0);
         p = strrchr(line, '(');
         if (p == NULL)
             elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
@@ -3633,7 +3633,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
     }
     PG_FINALLY();
     {
-        PQclear(res);
+        BEPQclear(res);
     }
     PG_END_TRY();
 }
@@ -3736,7 +3736,7 @@ create_cursor(ForeignScanState *node)
     const char **values = fsstate->param_values;
     PGconn       *conn = fsstate->conn;
     StringInfoData buf;
-    PGresult   *res;
+    BEPGresult *res;

     /* First, process a pending asynchronous request, if any. */
     if (fsstate->conn_state->pendingAreq)
@@ -3779,14 +3779,11 @@ create_cursor(ForeignScanState *node)

     /*
      * Get the result, and check for success.
-     *
-     * We don't use a PG_TRY block here, so be careful not to throw error
-     * without releasing the PGresult.
      */
     res = pgfdw_get_result(conn);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
-    PQclear(res);
+    BEPQclear(res);

     /* Mark the cursor as created, and show no tuples have been retrieved */
     fsstate->cursor_exists = true;
@@ -3807,7 +3804,7 @@ static void
 fetch_more_data(ForeignScanState *node)
 {
     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;
     MemoryContext oldcontext;

     /*
@@ -3835,7 +3832,7 @@ fetch_more_data(ForeignScanState *node)
              */
             res = pgfdw_get_result(conn);
             /* On error, report the original query, not the FETCH. */
-            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+            if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
                 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);

             /* Reset per-connection state */
@@ -3851,12 +3848,12 @@ fetch_more_data(ForeignScanState *node)

             res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
             /* On error, report the original query, not the FETCH. */
-            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+            if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
                 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
         }

         /* Convert the data into HeapTuples */
-        numrows = PQntuples(res);
+        numrows = BEPQntuples(res);
         fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
         fsstate->num_tuples = numrows;
         fsstate->next_tuple = 0;
@@ -3883,7 +3880,7 @@ fetch_more_data(ForeignScanState *node)
     }
     PG_FINALLY();
     {
-        PQclear(res);
+        BEPQclear(res);
     }
     PG_END_TRY();

@@ -3956,7 +3953,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
              PgFdwConnState *conn_state)
 {
     char        sql[64];
-    PGresult   *res;
+    BEPGresult *res;

     snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);

@@ -3965,9 +3962,9 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
      * without releasing the PGresult.
      */
     res = pgfdw_exec_query(conn, sql, conn_state);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         pgfdw_report_error(ERROR, res, conn, true, sql);
-    PQclear(res);
+    BEPQclear(res);
 }

 /*
@@ -4106,7 +4103,7 @@ execute_foreign_modify(EState *estate,
     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
     ItemPointer ctid = NULL;
     const char **p_values;
-    PGresult   *res;
+    BEPGresult *res;
     int            n_rows;
     StringInfoData sql;

@@ -4178,12 +4175,9 @@ execute_foreign_modify(EState *estate,

     /*
      * Get the result, and check for success.
-     *
-     * We don't use a PG_TRY block here, so be careful not to throw error
-     * without releasing the PGresult.
      */
     res = pgfdw_get_result(fmstate->conn);
-    if (PQresultStatus(res) !=
+    if (BEPQresultStatus(res) !=
         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
         pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);

@@ -4191,15 +4185,15 @@ execute_foreign_modify(EState *estate,
     if (fmstate->has_returning)
     {
         Assert(*numSlots == 1);
-        n_rows = PQntuples(res);
+        n_rows = BEPQntuples(res);
         if (n_rows > 0)
             store_returning_result(fmstate, slots[0], res);
     }
     else
-        n_rows = atoi(PQcmdTuples(res));
+        n_rows = atoi(BEPQcmdTuples(res));

     /* And clean up */
-    PQclear(res);
+    BEPQclear(res);

     MemoryContextReset(fmstate->temp_cxt);

@@ -4220,7 +4214,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 {
     char        prep_name[NAMEDATALEN];
     char       *p_name;
-    PGresult   *res;
+    BEPGresult *res;

     /*
      * The caller would already have processed a pending asynchronous request
@@ -4248,14 +4242,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)

     /*
      * Get the result, and check for success.
-     *
-     * We don't use a PG_TRY block here, so be careful not to throw error
-     * without releasing the PGresult.
      */
     res = pgfdw_get_result(fmstate->conn);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
-    PQclear(res);
+    BEPQclear(res);

     /* This action shows that the prepare has been done. */
     fmstate->p_name = p_name;
@@ -4351,7 +4342,7 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
  */
 static void
 store_returning_result(PgFdwModifyState *fmstate,
-                       TupleTableSlot *slot, PGresult *res)
+                       TupleTableSlot *slot, BEPGresult *res)
 {
     PG_TRY();
     {
@@ -4372,7 +4363,7 @@ store_returning_result(PgFdwModifyState *fmstate,
     }
     PG_CATCH();
     {
-        PQclear(res);
+        BEPQclear(res);
         PG_RE_THROW();
     }
     PG_END_TRY();
@@ -4404,7 +4395,7 @@ static void
 deallocate_query(PgFdwModifyState *fmstate)
 {
     char        sql[64];
-    PGresult   *res;
+    BEPGresult *res;

     /* do nothing if the query is not allocated */
     if (!fmstate->p_name)
@@ -4417,9 +4408,9 @@ deallocate_query(PgFdwModifyState *fmstate)
      * without releasing the PGresult.
      */
     res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
-    PQclear(res);
+    BEPQclear(res);
     pfree(fmstate->p_name);
     fmstate->p_name = NULL;
 }
@@ -4563,6 +4554,7 @@ execute_dml_stmt(ForeignScanState *node)
     ExprContext *econtext = node->ss.ps.ps_ExprContext;
     int            numParams = dmstate->numParams;
     const char **values = dmstate->param_values;
+    MemoryContext oldctx;

     /* First, process a pending asynchronous request, if any. */
     if (dmstate->conn_state->pendingAreq)
@@ -4591,20 +4583,22 @@ execute_dml_stmt(ForeignScanState *node)
     /*
      * Get the result, and check for success.
      *
-     * We don't use a PG_TRY block here, so be careful not to throw error
-     * without releasing the PGresult.
+     * The result potentially needs to survive across multiple executor row
+     * cycles, so keep it in the context where the dmstate is.
      */
+    oldctx = MemoryContextSwitchTo(GetMemoryChunkContext(dmstate));
     dmstate->result = pgfdw_get_result(dmstate->conn);
-    if (PQresultStatus(dmstate->result) !=
+    MemoryContextSwitchTo(oldctx);
+    if (BEPQresultStatus(dmstate->result) !=
         (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
         pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
                            dmstate->query);

     /* Get the number of rows affected. */
     if (dmstate->has_returning)
-        dmstate->num_tuples = PQntuples(dmstate->result);
+        dmstate->num_tuples = BEPQntuples(dmstate->result);
     else
-        dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
+        dmstate->num_tuples = atoi(BEPQcmdTuples(dmstate->result));
 }

 /*
@@ -4660,7 +4654,7 @@ get_returning_data(ForeignScanState *node)
         }
         PG_CATCH();
         {
-            PQclear(dmstate->result);
+            BEPQclear(dmstate->result);
             PG_RE_THROW();
         }
         PG_END_TRY();
@@ -4950,7 +4944,7 @@ postgresAnalyzeForeignTable(Relation relation,
     UserMapping *user;
     PGconn       *conn;
     StringInfoData sql;
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;

     /* Return the row-analysis function pointer */
     *func = postgresAcquireSampleRowsFunc;
@@ -4980,16 +4974,16 @@ postgresAnalyzeForeignTable(Relation relation,
     PG_TRY();
     {
         res = pgfdw_exec_query(conn, sql.data, NULL);
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+        if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, sql.data);

-        if (PQntuples(res) != 1 || PQnfields(res) != 1)
+        if (BEPQntuples(res) != 1 || BEPQnfields(res) != 1)
             elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
-        *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+        *totalpages = strtoul(BEPQgetvalue(res, 0, 0), NULL, 10);
     }
     PG_FINALLY();
     {
-        PQclear(res);
+        BEPQclear(res);
     }
     PG_END_TRY();

@@ -5012,7 +5006,7 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
     UserMapping *user;
     PGconn       *conn;
     StringInfoData sql;
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;
     volatile double reltuples = -1;
     volatile char relkind = 0;

@@ -5037,18 +5031,18 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
     PG_TRY();
     {
         res = pgfdw_exec_query(conn, sql.data, NULL);
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+        if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, sql.data);

-        if (PQntuples(res) != 1 || PQnfields(res) != 2)
+        if (BEPQntuples(res) != 1 || BEPQnfields(res) != 2)
             elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
-        reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
-        relkind = *(PQgetvalue(res, 0, 1));
+        reltuples = strtod(BEPQgetvalue(res, 0, 0), NULL);
+        relkind = *(BEPQgetvalue(res, 0, 1));
     }
     PG_FINALLY();
     {
         if (res)
-            PQclear(res);
+            BEPQclear(res);
     }
     PG_END_TRY();

@@ -5093,7 +5087,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
     double        reltuples;
     unsigned int cursor_number;
     StringInfoData sql;
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;
     ListCell   *lc;

     /* Initialize workspace state */
@@ -5277,9 +5271,9 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
         int            fetch_size;

         res = pgfdw_exec_query(conn, sql.data, NULL);
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
+        if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
             pgfdw_report_error(ERROR, res, conn, false, sql.data);
-        PQclear(res);
+        BEPQclear(res);
         res = NULL;

         /*
@@ -5330,15 +5324,15 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
             /* Fetch some rows */
             res = pgfdw_exec_query(conn, fetch_sql, NULL);
             /* On error, report the original query, not the FETCH. */
-            if (PQresultStatus(res) != PGRES_TUPLES_OK)
+            if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
                 pgfdw_report_error(ERROR, res, conn, false, sql.data);

             /* Process whatever we got. */
-            numrows = PQntuples(res);
+            numrows = BEPQntuples(res);
             for (i = 0; i < numrows; i++)
                 analyze_row_processor(res, i, &astate);

-            PQclear(res);
+            BEPQclear(res);
             res = NULL;

             /* Must be EOF if we didn't get all the rows requested. */
@@ -5351,7 +5345,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
     }
     PG_CATCH();
     {
-        PQclear(res);
+        BEPQclear(res);
         PG_RE_THROW();
     }
     PG_END_TRY();
@@ -5388,7 +5382,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  *     - Subsequently, replace already-sampled tuples randomly.
  */
 static void
-analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
+analyze_row_processor(BEPGresult *res, int row, PgFdwAnalyzeState *astate)
 {
     int            targrows = astate->targrows;
     int            pos;            /* array index to store tuple in */
@@ -5466,7 +5460,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
     UserMapping *mapping;
     PGconn       *conn;
     StringInfoData buf;
-    PGresult   *volatile res = NULL;
+    BEPGresult *volatile res = NULL;
     int            numrows,
                 i;
     ListCell   *lc;
@@ -5513,16 +5507,16 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
         deparseStringLiteral(&buf, stmt->remote_schema);

         res = pgfdw_exec_query(conn, buf.data, NULL);
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+        if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, buf.data);

-        if (PQntuples(res) != 1)
+        if (BEPQntuples(res) != 1)
             ereport(ERROR,
                     (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
                      errmsg("schema \"%s\" is not present on foreign server \"%s\"",
                             stmt->remote_schema, server->servername)));

-        PQclear(res);
+        BEPQclear(res);
         res = NULL;
         resetStringInfo(&buf);

@@ -5630,15 +5624,15 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)

         /* Fetch the data */
         res = pgfdw_exec_query(conn, buf.data, NULL);
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+        if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
             pgfdw_report_error(ERROR, res, conn, false, buf.data);

         /* Process results */
-        numrows = PQntuples(res);
+        numrows = BEPQntuples(res);
         /* note: incrementation of i happens in inner loop's while() test */
         for (i = 0; i < numrows;)
         {
-            char       *tablename = PQgetvalue(res, i, 0);
+            char       *tablename = BEPQgetvalue(res, i, 0);
             bool        first_item = true;

             resetStringInfo(&buf);
@@ -5657,20 +5651,20 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
                 char       *collnamespace;

                 /* If table has no columns, we'll see nulls here */
-                if (PQgetisnull(res, i, 1))
+                if (BEPQgetisnull(res, i, 1))
                     continue;

-                attname = PQgetvalue(res, i, 1);
-                typename = PQgetvalue(res, i, 2);
-                attnotnull = PQgetvalue(res, i, 3);
-                attdefault = PQgetisnull(res, i, 4) ? NULL :
-                    PQgetvalue(res, i, 4);
-                attgenerated = PQgetisnull(res, i, 5) ? NULL :
-                    PQgetvalue(res, i, 5);
-                collname = PQgetisnull(res, i, 6) ? NULL :
-                    PQgetvalue(res, i, 6);
-                collnamespace = PQgetisnull(res, i, 7) ? NULL :
-                    PQgetvalue(res, i, 7);
+                attname = BEPQgetvalue(res, i, 1);
+                typename = BEPQgetvalue(res, i, 2);
+                attnotnull = BEPQgetvalue(res, i, 3);
+                attdefault = BEPQgetisnull(res, i, 4) ? NULL :
+                    BEPQgetvalue(res, i, 4);
+                attgenerated = BEPQgetisnull(res, i, 5) ? NULL :
+                    BEPQgetvalue(res, i, 5);
+                collname = BEPQgetisnull(res, i, 6) ? NULL :
+                    BEPQgetvalue(res, i, 6);
+                collnamespace = BEPQgetisnull(res, i, 7) ? NULL :
+                    BEPQgetvalue(res, i, 7);

                 if (first_item)
                     first_item = false;
@@ -5717,7 +5711,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
                     appendStringInfoString(&buf, " NOT NULL");
             }
             while (++i < numrows &&
-                   strcmp(PQgetvalue(res, i, 0), tablename) == 0);
+                   strcmp(BEPQgetvalue(res, i, 0), tablename) == 0);

             /*
              * Add server name and table-level options.  We specify remote
@@ -5739,7 +5733,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
     }
     PG_FINALLY();
     {
-        PQclear(res);
+        BEPQclear(res);
     }
     PG_END_TRY();

@@ -7585,7 +7579,7 @@ complete_pending_request(AsyncRequest *areq)
  * context such as ANALYZE, or if we're processing a non-scan query node.
  */
 static HeapTuple
-make_tuple_from_result_row(PGresult *res,
+make_tuple_from_result_row(BEPGresult *res,
                            int row,
                            Relation rel,
                            AttInMetadata *attinmeta,
@@ -7604,7 +7598,7 @@ make_tuple_from_result_row(PGresult *res,
     ListCell   *lc;
     int            j;

-    Assert(row < PQntuples(res));
+    Assert(row < BEPQntuples(res));

     /*
      * Do the following work in a temp context that we reset after each tuple.
@@ -7651,10 +7645,10 @@ make_tuple_from_result_row(PGresult *res,
         char       *valstr;

         /* fetch next column's textual value */
-        if (PQgetisnull(res, row, j))
+        if (BEPQgetisnull(res, row, j))
             valstr = NULL;
         else
-            valstr = PQgetvalue(res, row, j);
+            valstr = BEPQgetvalue(res, row, j);

         /*
          * convert value to internal representation
@@ -7696,7 +7690,7 @@ make_tuple_from_result_row(PGresult *res,
      * Check we got the expected number of columns.  Note: j == 0 and
      * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
      */
-    if (j > 0 && j != PQnfields(res))
+    if (j > 0 && j != BEPQnfields(res))
         elog(ERROR, "remote query result does not match the foreign table");

     /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 81358f3bde7..afc56186191 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -15,7 +15,7 @@

 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
-#include "libpq-fe.h"
+#include "libpq/libpq-be-fe.h"
 #include "nodes/execnodes.h"
 #include "nodes/pathnodes.h"
 #include "utils/relcache.h"
@@ -163,10 +163,10 @@ extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
-extern PGresult *pgfdw_get_result(PGconn *conn);
-extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
-                                  PgFdwConnState *state);
-extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+extern BEPGresult *pgfdw_get_result(PGconn *conn);
+extern BEPGresult *pgfdw_exec_query(PGconn *conn, const char *query,
+                                    PgFdwConnState *state);
+extern void pgfdw_report_error(int elevel, BEPGresult *res, PGconn *conn,
                                bool clear, const char *sql);

 /* in option.c */
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7b4ddf7a8f5..272fa926d92 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -238,19 +238,19 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
      */
     if (!replication || logical)
     {
-        PGresult   *res;
+        BEPGresult *res;

         res = libpqsrv_exec(conn->streamConn,
                             ALWAYS_SECURE_SEARCH_PATH_SQL,
                             WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-        if (PQresultStatus(res) != PGRES_TUPLES_OK)
+        if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
         {
-            PQclear(res);
+            BEPQclear(res);
             *err = psprintf(_("could not clear search path: %s"),
                             pchomp(PQerrorMessage(conn->streamConn)));
             goto bad_connection;
         }
-        PQclear(res);
+        BEPQclear(res);
     }

     conn->logical = logical;
@@ -407,7 +407,7 @@ libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
 static char *
 libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 {
-    PGresult   *res;
+    BEPGresult *res;
     char       *primary_sysid;

     /*
@@ -417,35 +417,26 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
     res = libpqsrv_exec(conn->streamConn,
                         "IDENTIFY_SYSTEM",
                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-    if (PQresultStatus(res) != PGRES_TUPLES_OK)
-    {
-        PQclear(res);
+    if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("could not receive database system identifier and timeline ID from "
                         "the primary server: %s",
                         pchomp(PQerrorMessage(conn->streamConn)))));
-    }

     /*
      * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
      * 9.4 and onwards.
      */
-    if (PQnfields(res) < 3 || PQntuples(res) != 1)
-    {
-        int            ntuples = PQntuples(res);
-        int            nfields = PQnfields(res);
-
-        PQclear(res);
+    if (BEPQnfields(res) < 3 || BEPQntuples(res) != 1)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("invalid response from primary server"),
                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more
fields.",
-                           ntuples, nfields, 1, 3)));
-    }
-    primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
-    *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
-    PQclear(res);
+                           BEPQntuples(res), BEPQnfields(res), 1, 3)));
+    primary_sysid = pstrdup(BEPQgetvalue(res, 0, 0));
+    *primary_tli = pg_strtoint32(BEPQgetvalue(res, 0, 1));
+    BEPQclear(res);

     return primary_sysid;
 }
@@ -518,7 +509,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
                         const WalRcvStreamOptions *options)
 {
     StringInfoData cmd;
-    PGresult   *res;
+    BEPGresult *res;

     Assert(options->logical == conn->logical);
     Assert(options->slotname || !options->logical);
@@ -598,20 +589,17 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     pfree(cmd.data);

-    if (PQresultStatus(res) == PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) == PGRES_COMMAND_OK)
     {
-        PQclear(res);
+        BEPQclear(res);
         return false;
     }
-    else if (PQresultStatus(res) != PGRES_COPY_BOTH)
-    {
-        PQclear(res);
+    else if (BEPQresultStatus(res) != PGRES_COPY_BOTH)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("could not start WAL streaming: %s",
                         pchomp(PQerrorMessage(conn->streamConn)))));
-    }
-    PQclear(res);
+    BEPQclear(res);
     return true;
 }

@@ -622,7 +610,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 static void
 libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 {
-    PGresult   *res;
+    BEPGresult *res;

     /*
      * Send copy-end message.  As in libpqsrv_exec, this could theoretically
@@ -647,26 +635,26 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
      */
     res = libpqsrv_get_result(conn->streamConn,
                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-    if (PQresultStatus(res) == PGRES_TUPLES_OK)
+    if (BEPQresultStatus(res) == PGRES_TUPLES_OK)
     {
         /*
          * Read the next timeline's ID. The server also sends the timeline's
          * starting point, but it is ignored.
          */
-        if (PQnfields(res) < 2 || PQntuples(res) != 1)
+        if (BEPQnfields(res) < 2 || BEPQntuples(res) != 1)
             ereport(ERROR,
                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
                      errmsg("unexpected result set after end-of-streaming")));
-        *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
-        PQclear(res);
+        *next_tli = pg_strtoint32(BEPQgetvalue(res, 0, 0));
+        BEPQclear(res);

         /* the result set should be followed by CommandComplete */
         res = libpqsrv_get_result(conn->streamConn,
                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     }
-    else if (PQresultStatus(res) == PGRES_COPY_OUT)
+    else if (BEPQresultStatus(res) == PGRES_COPY_OUT)
     {
-        PQclear(res);
+        BEPQclear(res);

         /* End the copy */
         if (PQendcopy(conn->streamConn))
@@ -680,12 +668,12 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     }

-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("error reading result of streaming command: %s",
                         pchomp(PQerrorMessage(conn->streamConn)))));
-    PQclear(res);
+    BEPQclear(res);

     /* Verify that there are no more results */
     res = libpqsrv_get_result(conn->streamConn,
@@ -705,7 +693,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
                                  TimeLineID tli, char **filename,
                                  char **content, int *len)
 {
-    PGresult   *res;
+    BEPGresult *res;
     char        cmd[64];

     Assert(!conn->logical);
@@ -717,33 +705,24 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
     res = libpqsrv_exec(conn->streamConn,
                         cmd,
                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-    if (PQresultStatus(res) != PGRES_TUPLES_OK)
-    {
-        PQclear(res);
+    if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("could not receive timeline history file from "
                         "the primary server: %s",
                         pchomp(PQerrorMessage(conn->streamConn)))));
-    }
-    if (PQnfields(res) != 2 || PQntuples(res) != 1)
-    {
-        int            ntuples = PQntuples(res);
-        int            nfields = PQnfields(res);
-
-        PQclear(res);
+    if (BEPQnfields(res) != 2 || BEPQntuples(res) != 1)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("invalid response from primary server"),
                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
-                           ntuples, nfields)));
-    }
-    *filename = pstrdup(PQgetvalue(res, 0, 0));
+                           BEPQntuples(res), BEPQnfields(res))));
+    *filename = pstrdup(BEPQgetvalue(res, 0, 0));

-    *len = PQgetlength(res, 0, 1);
+    *len = BEPQgetlength(res, 0, 1);
     *content = palloc(*len);
-    memcpy(*content, PQgetvalue(res, 0, 1), *len);
-    PQclear(res);
+    memcpy(*content, BEPQgetvalue(res, 0, 1), *len);
+    BEPQclear(res);
 }

 /*
@@ -804,20 +783,20 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     }
     if (rawlen == -1)            /* end-of-streaming or error */
     {
-        PGresult   *res;
+        BEPGresult *res;

         res = libpqsrv_get_result(conn->streamConn,
                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        if (BEPQresultStatus(res) == PGRES_COMMAND_OK)
         {
-            PQclear(res);
+            BEPQclear(res);

             /* Verify that there are no more results. */
             res = libpqsrv_get_result(conn->streamConn,
                                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
             if (res != NULL)
             {
-                PQclear(res);
+                BEPQclear(res);

                 /*
                  * If the other side closed the connection orderly (otherwise
@@ -835,19 +814,16 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,

             return -1;
         }
-        else if (PQresultStatus(res) == PGRES_COPY_IN)
+        else if (BEPQresultStatus(res) == PGRES_COPY_IN)
         {
-            PQclear(res);
+            BEPQclear(res);
             return -1;
         }
         else
-        {
-            PQclear(res);
             ereport(ERROR,
                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
                      errmsg("could not receive data from WAL stream: %s",
                             pchomp(PQerrorMessage(conn->streamConn)))));
-        }
     }
     if (rawlen < -1)
         ereport(ERROR,
@@ -886,7 +862,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
                      bool temporary, bool two_phase, bool failover,
                      CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 {
-    PGresult   *res;
+    BEPGresult *res;
     StringInfoData cmd;
     char       *snapshot;
     int            use_new_options_syntax;
@@ -970,25 +946,22 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     pfree(cmd.data);

-    if (PQresultStatus(res) != PGRES_TUPLES_OK)
-    {
-        PQclear(res);
+    if (BEPQresultStatus(res) != PGRES_TUPLES_OK)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("could not create replication slot \"%s\": %s",
                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));
-    }

     if (lsn)
         *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
-                                                   CStringGetDatum(PQgetvalue(res, 0, 1))));
+                                                   CStringGetDatum(BEPQgetvalue(res, 0, 1))));

-    if (!PQgetisnull(res, 0, 2))
-        snapshot = pstrdup(PQgetvalue(res, 0, 2));
+    if (!BEPQgetisnull(res, 0, 2))
+        snapshot = pstrdup(BEPQgetvalue(res, 0, 2));
     else
         snapshot = NULL;

-    PQclear(res);
+    BEPQclear(res);

     return snapshot;
 }
@@ -1001,7 +974,7 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
                     const bool *failover, const bool *two_phase)
 {
     StringInfoData cmd;
-    PGresult   *res;
+    BEPGresult *res;

     initStringInfo(&cmd);
     appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
@@ -1024,13 +997,13 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     pfree(cmd.data);

-    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+    if (BEPQresultStatus(res) != PGRES_COMMAND_OK)
         ereport(ERROR,
                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
                  errmsg("could not alter replication slot \"%s\": %s",
                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));

-    PQclear(res);
+    BEPQclear(res);
 }

 /*
@@ -1046,12 +1019,12 @@ libpqrcv_get_backend_pid(WalReceiverConn *conn)
  * Convert tuple query result to tuplestore.
  */
 static void
-libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
+libpqrcv_processTuples(BEPGresult *pgres, WalRcvExecResult *walres,
                        const int nRetTypes, const Oid *retTypes)
 {
     int            tupn;
     int            coln;
-    int            nfields = PQnfields(pgres);
+    int            nfields = BEPQnfields(pgres);
     HeapTuple    tuple;
     AttInMetadata *attinmeta;
     MemoryContext rowcontext;
@@ -1071,11 +1044,11 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
     walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
     for (coln = 0; coln < nRetTypes; coln++)
         TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
-                           PQfname(pgres, coln), retTypes[coln], -1, 0);
+                           BEPQfname(pgres, coln), retTypes[coln], -1, 0);
     attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);

     /* No point in doing more here if there were no tuples returned. */
-    if (PQntuples(pgres) == 0)
+    if (BEPQntuples(pgres) == 0)
         return;

     /* Create temporary context for local allocations. */
@@ -1084,7 +1057,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
                                        ALLOCSET_DEFAULT_SIZES);

     /* Process returned rows. */
-    for (tupn = 0; tupn < PQntuples(pgres); tupn++)
+    for (tupn = 0; tupn < BEPQntuples(pgres); tupn++)
     {
         char       *cstrs[MaxTupleAttributeNumber];

@@ -1098,10 +1071,10 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
          */
         for (coln = 0; coln < nfields; coln++)
         {
-            if (PQgetisnull(pgres, tupn, coln))
+            if (BEPQgetisnull(pgres, tupn, coln))
                 cstrs[coln] = NULL;
             else
-                cstrs[coln] = PQgetvalue(pgres, tupn, coln);
+                cstrs[coln] = BEPQgetvalue(pgres, tupn, coln);
         }

         /* Convert row to a tuple, and add it to the tuplestore */
@@ -1125,7 +1098,7 @@ static WalRcvExecResult *
 libpqrcv_exec(WalReceiverConn *conn, const char *query,
               const int nRetTypes, const Oid *retTypes)
 {
-    PGresult   *pgres = NULL;
+    BEPGresult *pgres = NULL;
     WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
     char       *diag_sqlstate;

@@ -1138,7 +1111,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
                           query,
                           WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);

-    switch (PQresultStatus(pgres))
+    switch (BEPQresultStatus(pgres))
     {
         case PGRES_TUPLES_OK:
         case PGRES_SINGLE_TUPLE:
@@ -1180,7 +1153,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
         case PGRES_BAD_RESPONSE:
             walres->status = WALRCV_ERROR;
             walres->err = pchomp(PQerrorMessage(conn->streamConn));
-            diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
+            diag_sqlstate = BEPQresultErrorField(pgres, PG_DIAG_SQLSTATE);
             if (diag_sqlstate)
                 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
                                                  diag_sqlstate[1],
@@ -1190,7 +1163,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
             break;
     }

-    PQclear(pgres);
+    BEPQclear(pgres);

     return walres;
 }
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index 15fa4d0a55e..ce01dce9861 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -560,9 +560,7 @@ MemoryContextDeleteChildren(MemoryContext context)
  * the specified context, since that means it will automatically be freed
  * when no longer needed.
  *
- * There is no API for deregistering a callback once registered.  If you
- * want it to not do anything anymore, adjust the state pointed to by its
- * "arg" to indicate that.
+ * Note that callers can assume this cannot fail.
  */
 void
 MemoryContextRegisterResetCallback(MemoryContext context,
@@ -577,6 +575,41 @@ MemoryContextRegisterResetCallback(MemoryContext context,
     context->isReset = false;
 }

+/*
+ * MemoryContextUnregisterResetCallback
+ *        Undo the effects of MemoryContextRegisterResetCallback.
+ *
+ * This can be used if a callback's effects are no longer required
+ * at some point before the context has been reset/deleted.  It is the
+ * caller's responsibility to pfree the callback struct (if needed).
+ *
+ * An assertion failure occurs if the callback was not registered.
+ * We could alternatively define that case as a no-op, but that seems too
+ * likely to mask programming errors such as passing the wrong context.
+ */
+void
+MemoryContextUnregisterResetCallback(MemoryContext context,
+                                     MemoryContextCallback *cb)
+{
+    MemoryContextCallback *prev,
+               *cur;
+
+    Assert(MemoryContextIsValid(context));
+
+    for (prev = NULL, cur = context->reset_cbs; cur != NULL;
+         prev = cur, cur = cur->next)
+    {
+        if (cur != cb)
+            continue;
+        if (prev)
+            prev->next = cur->next;
+        else
+            context->reset_cbs = cur->next;
+        return;
+    }
+    Assert(false);
+}
+
 /*
  * MemoryContextCallResetCallbacks
  *        Internal function to call all registered callbacks for context.
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 16205b824fa..04b3104ab49 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -37,10 +37,10 @@
  * otherwise, but perhaps still protects against a few mistakes...
  */
 #ifdef BUILDING_DLL
-#error "libpq may not be used code directly built into the backend"
+#error "libpq may not be used in code directly built into the backend"
 #endif

-#include "libpq-fe.h"
+#include "libpq/libpq-be-fe.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
@@ -50,8 +50,8 @@

 static inline void libpqsrv_connect_prepare(void);
 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
-static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
-static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
+static inline BEPGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
+static inline BEPGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);


 /*
@@ -252,7 +252,7 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
  * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
  * notably, PQexec() would silently discard any prior query results.
  */
-static inline PGresult *
+static inline BEPGresult *
 libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
 {
     if (!PQsendQuery(conn, query))
@@ -265,7 +265,7 @@ libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
  *
  * See notes at libpqsrv_exec().
  */
-static inline PGresult *
+static inline BEPGresult *
 libpqsrv_exec_params(PGconn *conn,
                      const char *command,
                      int nParams,
@@ -286,53 +286,44 @@ libpqsrv_exec_params(PGconn *conn,
  * Like PQexec(), loop over PQgetResult() until it returns NULL or another
  * terminal state.  Return the last non-NULL result or the terminal state.
  */
-static inline PGresult *
+static inline BEPGresult *
 libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
 {
-    PGresult   *volatile lastResult = NULL;
+    BEPGresult *lastResult = NULL;

-    /* In what follows, do not leak any PGresults on an error. */
-    PG_TRY();
+    for (;;)
     {
-        for (;;)
-        {
-            /* Wait for, and collect, the next PGresult. */
-            PGresult   *result;
+        /* Wait for, and collect, the next PGresult. */
+        BEPGresult *result;

-            result = libpqsrv_get_result(conn, wait_event_info);
-            if (result == NULL)
-                break;            /* query is complete, or failure */
+        result = libpqsrv_get_result(conn, wait_event_info);
+        if (result == NULL)
+            break;                /* query is complete, or failure */

-            /*
-             * Emulate PQexec()'s behavior of returning the last result when
-             * there are many.
-             */
-            PQclear(lastResult);
-            lastResult = result;
+        /*
+         * Emulate PQexec()'s behavior of returning the last result when there
+         * are many.
+         */
+        BEPQclear(lastResult);
+        lastResult = result;

-            if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-                PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-                PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-                PQstatus(conn) == CONNECTION_BAD)
-                break;
-        }
-    }
-    PG_CATCH();
-    {
-        PQclear(lastResult);
-        PG_RE_THROW();
+        if (BEPQresultStatus(lastResult) == PGRES_COPY_IN ||
+            BEPQresultStatus(lastResult) == PGRES_COPY_OUT ||
+            BEPQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+            PQstatus(conn) == CONNECTION_BAD)
+            break;
     }
-    PG_END_TRY();
-
     return lastResult;
 }

 /*
  * Perform the equivalent of PQgetResult(), but watch for interrupts.
  */
-static inline PGresult *
+static inline BEPGresult *
 libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
 {
+    BEPGresult *bres;
+
     /*
      * Collect data until PQgetResult is ready to get the result without
      * blocking.
@@ -364,7 +355,8 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
     }

     /* Now we can collect and return the next PGresult */
-    return PQgetResult(conn);
+    BEPQwrap(bres, PQgetResult(conn), CurrentMemoryContext);
+    return bres;
 }

 /*
diff --git a/src/include/libpq/libpq-be-fe.h b/src/include/libpq/libpq-be-fe.h
new file mode 100644
index 00000000000..d4e88e46938
--- /dev/null
+++ b/src/include/libpq/libpq-be-fe.h
@@ -0,0 +1,203 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq-be-fe.h
+ *      Wrapper functions for using libpq in extensions
+ *
+ * Code built directly into the backend is not allowed to link to libpq
+ * directly. Extension code is allowed to use libpq however. One of the
+ * main risks in doing so is leaking the malloc-allocated structures
+ * returned by libpq, causing a process-lifespan memory leak.
+ *
+ * This file provides wrapper objects to help in building memory-safe code.
+ * A PGresult object wrapped this way acts much as if it were palloc'd:
+ * it will go away when the specified context is reset or deleted.
+ * We might later extend the concept to other objects such as PGconns.
+ *
+ * See also the libpq-be-fe-helpers.h file, which provides additional
+ * facilities built on top of this one.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/libpq-be-fe.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQ_BE_FE_H
+#define LIBPQ_BE_FE_H
+
+/*
+ * Despite the name, BUILDING_DLL is set only when building code directly part
+ * of the backend. Which also is where libpq isn't allowed to be
+ * used. Obviously this doesn't protect against libpq-fe.h getting included
+ * otherwise, but perhaps still protects against a few mistakes...
+ */
+#ifdef BUILDING_DLL
+#error "libpq may not be used in code directly built into the backend"
+#endif
+
+#include "libpq-fe.h"
+
+/*
+ * Memory-context-safe wrapper object for a PGresult.  In general,
+ * one of these should be allocated within the indicated memory context.
+ * (Do not, for example, put it on the stack.)  Safe usage is to allocate
+ * the struct, then call libpq to get the PGresult, then immediately call
+ * libpqsrv_wrap_result to establish the reset callback that makes it safe.
+ * Use BEPQclear() to free a wrapped PGresult.
+ */
+typedef struct BEPGresult
+{
+    PGresult   *res;            /* the wrapped PGresult, or NULL if none yet */
+    MemoryContext ctx;            /* the MemoryContext it's attached to */
+    MemoryContextCallback cb;    /* the callback that implements freeing */
+} BEPGresult;
+
+
+/*
+ * Set up a previously-allocated BEPGresult to wrap the given PGresult,
+ * attaching it to the specified memory context (which had better contain
+ * the BEPGresult).
+ */
+static inline void
+libpqsrv_wrap_result(BEPGresult *bres, PGresult *res, MemoryContext ctx)
+{
+    bres->res = res;
+    bres->ctx = ctx;
+    bres->cb.func = (MemoryContextCallbackFunction) PQclear;
+    bres->cb.arg = res;
+    MemoryContextRegisterResetCallback(ctx, &bres->cb);
+}
+
+/*
+ * Wrapper macro to simplify converting existing calls of PQgetResult()
+ * and similar functions.  "action" is an expression that returns a
+ * PGresult or NULL.  "bres" receives a pointer to a BEPGresult allocated
+ * in "ctx", or NULL if "action" returned NULL.
+ */
+#define BEPQwrap(bres, action, ctx) \
+    do { \
+        MemoryContext ctx_ = (ctx); \
+        PGresult *res_; \
+        bres = (BEPGresult *) MemoryContextAlloc(ctx_, sizeof(BEPGresult)); \
+        res_ = (action); \
+        if (res_) \
+            libpqsrv_wrap_result(bres, res_, ctx_); \
+        else \
+        { \
+            pfree(bres); \
+            bres = NULL; \
+        } \
+    } while (0)
+
+/*
+ * Free a wrapped PGresult, after detaching it from the memory context.
+ * Like PQclear(), allow the argument to be NULL.
+ */
+static inline void
+BEPQclear(BEPGresult *bres)
+{
+    if (bres)
+    {
+        MemoryContextUnregisterResetCallback(bres->ctx, &bres->cb);
+        PQclear(bres->res);
+        pfree(bres);
+    }
+}
+
+/*
+ * Accessor functions for BEPGresult.  While it's not necessary to use these,
+ * they emulate the behavior of the underlying libpq functions when passed
+ * a NULL pointer.  This is particularly important for PQresultStatus, which
+ * is often the first check on a result.
+ */
+
+static inline ExecStatusType
+BEPQresultStatus(const BEPGresult *res)
+{
+    if (!res)
+        return PGRES_FATAL_ERROR;
+    return PQresultStatus(res->res);
+}
+
+static inline char *
+BEPQresultErrorMessage(const BEPGresult *res)
+{
+    if (!res)
+        return "";
+    return PQresultErrorMessage(res->res);
+}
+
+static inline char *
+BEPQresultErrorField(const BEPGresult *res, int fieldcode)
+{
+    if (!res)
+        return NULL;
+    return PQresultErrorField(res->res, fieldcode);
+}
+
+static inline char *
+BEPQcmdStatus(const BEPGresult *res)
+{
+    if (!res)
+        return NULL;
+    return PQcmdStatus(res->res);
+}
+
+static inline int
+BEPQntuples(const BEPGresult *res)
+{
+    if (!res)
+        return 0;
+    return PQntuples(res->res);
+}
+
+static inline int
+BEPQnfields(const BEPGresult *res)
+{
+    if (!res)
+        return 0;
+    return PQnfields(res->res);
+}
+
+static inline char *
+BEPQgetvalue(const BEPGresult *res, int tup_num, int field_num)
+{
+    if (!res)
+        return NULL;
+    return PQgetvalue(res->res, tup_num, field_num);
+}
+
+static inline int
+BEPQgetlength(const BEPGresult *res, int tup_num, int field_num)
+{
+    if (!res)
+        return 0;
+    return PQgetlength(res->res, tup_num, field_num);
+}
+
+static inline int
+BEPQgetisnull(const BEPGresult *res, int tup_num, int field_num)
+{
+    if (!res)
+        return 1;                /* pretend it is null */
+    return PQgetisnull(res->res, tup_num, field_num);
+}
+
+static inline char *
+BEPQfname(const BEPGresult *res, int field_num)
+{
+    if (!res)
+        return NULL;
+    return PQfname(res->res, field_num);
+}
+
+static inline char *
+BEPQcmdTuples(const BEPGresult *res)
+{
+    if (!res)
+        return "";
+    return PQcmdTuples(res->res);
+}
+
+#endif                            /* LIBPQ_BE_FE_H */
diff --git a/src/include/utils/palloc.h b/src/include/utils/palloc.h
index e1b42267b22..039b9cba61a 100644
--- a/src/include/utils/palloc.h
+++ b/src/include/utils/palloc.h
@@ -133,6 +133,8 @@ MemoryContextSwitchTo(MemoryContext context)
 /* Registration of memory context reset/delete callbacks */
 extern void MemoryContextRegisterResetCallback(MemoryContext context,
                                                MemoryContextCallback *cb);
+extern void MemoryContextUnregisterResetCallback(MemoryContext context,
+                                                 MemoryContextCallback *cb);

 /*
  * These are like standard strdup() except the copied string is
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a8346cda633..d0a9b3dbf68 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -180,6 +180,7 @@ AutoVacOpts
 AutoVacuumShmemStruct
 AutoVacuumWorkItem
 AutoVacuumWorkItemType
+BEPGresult
 BF_ctx
 BF_key
 BF_word
--
2.43.5

From db31951db472a4e315cd9b23d5289affe09be2f2 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sun, 25 May 2025 15:17:27 -0400
Subject: [PATCH v2 2/2] Silence leakage complaint about postgres_fdw's
 InitPgFdwOptions.

Valgrind complains that the PQconninfoOption array returned by libpq
is leaked.  We apparently believed that we could suppress that warning
by storing that array's address in a static variable.  However, modern
C compilers are bright enough to optimize the static variable away.

We could escalate that arms race by making the variable global.
But on the whole it seems better to revise the code so that it
can free libpq's result properly.  The only thing that costs
us is copying the parameter-name keywords; which seems like a
pretty negligible cost in a function that runs at most once per
process.
---
 contrib/postgres_fdw/option.c | 33 ++++++++++++---------------------
 1 file changed, 12 insertions(+), 21 deletions(-)

diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index c2f936640bc..d6fa89bad93 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -21,6 +21,7 @@
 #include "libpq/libpq-be.h"
 #include "postgres_fdw.h"
 #include "utils/guc.h"
+#include "utils/memutils.h"
 #include "utils/varlena.h"

 /*
@@ -39,12 +40,6 @@ typedef struct PgFdwOption
  */
 static PgFdwOption *postgres_fdw_options;

-/*
- * Valid options for libpq.
- * Allocated and filled in InitPgFdwOptions.
- */
-static PQconninfoOption *libpq_options;
-
 /*
  * GUC parameters
  */
@@ -239,6 +234,7 @@ static void
 InitPgFdwOptions(void)
 {
     int            num_libpq_opts;
+    PQconninfoOption *libpq_options;
     PQconninfoOption *lopt;
     PgFdwOption *popt;

@@ -307,8 +303,8 @@ InitPgFdwOptions(void)
      * Get list of valid libpq options.
      *
      * To avoid unnecessary work, we get the list once and use it throughout
-     * the lifetime of this backend process.  We don't need to care about
-     * memory context issues, because PQconndefaults allocates with malloc.
+     * the lifetime of this backend process.  Hence, we'll allocate it in
+     * TopMemoryContext.
      */
     libpq_options = PQconndefaults();
     if (!libpq_options)            /* assume reason for failure is OOM */
@@ -325,19 +321,11 @@ InitPgFdwOptions(void)
     /*
      * Construct an array which consists of all valid options for
      * postgres_fdw, by appending FDW-specific options to libpq options.
-     *
-     * We use plain malloc here to allocate postgres_fdw_options because it
-     * lives as long as the backend process does.  Besides, keeping
-     * libpq_options in memory allows us to avoid copying every keyword
-     * string.
      */
     postgres_fdw_options = (PgFdwOption *)
-        malloc(sizeof(PgFdwOption) * num_libpq_opts +
-               sizeof(non_libpq_options));
-    if (postgres_fdw_options == NULL)
-        ereport(ERROR,
-                (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
-                 errmsg("out of memory")));
+        MemoryContextAlloc(TopMemoryContext,
+                           sizeof(PgFdwOption) * num_libpq_opts +
+                           sizeof(non_libpq_options));

     popt = postgres_fdw_options;
     for (lopt = libpq_options; lopt->keyword; lopt++)
@@ -355,8 +343,8 @@ InitPgFdwOptions(void)
         if (strncmp(lopt->keyword, "oauth_", strlen("oauth_")) == 0)
             continue;

-        /* We don't have to copy keyword string, as described above. */
-        popt->keyword = lopt->keyword;
+        popt->keyword = MemoryContextStrdup(TopMemoryContext,
+                                            lopt->keyword);

         /*
          * "user" and any secret options are allowed only on user mappings.
@@ -371,6 +359,9 @@ InitPgFdwOptions(void)
         popt++;
     }

+    /* Done with libpq's output structure. */
+    PQconninfoFree(libpq_options);
+
     /* Append FDW-specific options and dummy terminator. */
     memcpy(popt, non_libpq_options, sizeof(non_libpq_options));
 }
--
2.43.5


В списке pgsql-hackers по дате отправления: