[HACKERS] Assorted leaks and weirdness in parallel execution

From Tom Lane
Subject [HACKERS] Assorted leaks and weirdness in parallel execution
Date
Msg-id 8670.1504192177@sss.pgh.pa.us
Responses Re: [HACKERS] Assorted leaks and weirdness in parallel execution  (Robert Haas <robertmhaas@gmail.com>)
List pgsql-hackers
I complained a couple of weeks ago that nodeGatherMerge looked like it
leaked a lot of memory when commanded to rescan.  Attached are three
proposed patches that, in combination, demonstrably result in zero
leakage across repeated rescans.

The first thing I noticed when I started digging into this was that
there was some leakage in TopMemoryContext, which seemed pretty weird.
What it turned out to be was on_dsm_detach callback registration records.
This happens because, although the comments for shm_mq_attach() claim
that shm_mq_detach() will free the shm_mq_handle, it does no such thing,
and it doesn't worry about canceling the on_dsm_detach registration
either.  So over repeated attach/detach cycles, we leak shm_mq_handles
and also callback registrations.  This isn't just a memory leak: it
means that, whenever we finally do detach from the DSM segment, we'll
execute a bunch of shm_mq_detach() calls pointed at shm_mq structs that
were detached long ago and have since been reused.  That seems incredibly
dangerous.  It manages not to fail at the moment because our stylized use
of DSM means that a shm_mq would only ever be re-used as another shm_mq;
so the only real effect is that our last counterparty process, if still
attached, would receive N SetLatch events rather than just one.  But it's
going to crash and burn someday.
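
To make the failure mode above concrete, here is a minimal, self-contained
sketch in plain C.  It is a toy model, not PostgreSQL code: toy_segment,
toy_queue, and every toy_* function are hypothetical stand-ins for
dsm_segment, shm_mq, and the attach/detach routines.  It assumes only what
the paragraph describes: attach registers a cleanup callback, and the
buggy detach never cancels it.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_CALLBACKS 16

/* Hypothetical stand-in for a shm_mq living in shared memory. */
typedef struct toy_queue
{
    int         detach_notifications;   /* times the counterparty was signalled */
} toy_queue;

typedef void (*toy_callback) (void *arg);

/* Hypothetical stand-in for a DSM segment's on-detach callback list. */
typedef struct toy_segment
{
    toy_callback callbacks[TOY_MAX_CALLBACKS];
    void       *args[TOY_MAX_CALLBACKS];
    int         ncallbacks;
} toy_segment;

static void
toy_notify_detach(void *arg)
{
    ((toy_queue *) arg)->detach_notifications++;
}

/* Attach: register a cleanup callback on the segment. */
static void
toy_attach(toy_segment *seg, toy_queue *q)
{
    assert(seg->ncallbacks < TOY_MAX_CALLBACKS);
    seg->callbacks[seg->ncallbacks] = toy_notify_detach;
    seg->args[seg->ncallbacks] = q;
    seg->ncallbacks++;
}

/* Buggy detach: signals the counterparty but leaves the registration. */
static void
toy_detach_leaky(toy_segment *seg, toy_queue *q)
{
    (void) seg;                 /* registration is never cancelled */
    toy_notify_detach(q);
}

/* Segment teardown: run every callback that is still registered. */
static void
toy_segment_detach(toy_segment *seg)
{
    while (seg->ncallbacks > 0)
    {
        seg->ncallbacks--;
        seg->callbacks[seg->ncallbacks] (seg->args[seg->ncallbacks]);
    }
}

/*
 * Run the given number of attach/detach cycles against one reused queue
 * slot, then tear the segment down.  Returns how many stale callbacks
 * fired against the final occupant of the slot.
 */
int
toy_stale_callbacks_after(int cycles)
{
    toy_segment seg = {{0}, {0}, 0};
    toy_queue   q = {0};
    int         i;

    for (i = 0; i < cycles; i++)
    {
        toy_attach(&seg, &q);
        toy_detach_leaky(&seg, &q);
        q.detach_notifications = 0;     /* slot is reused for a fresh queue */
    }
    toy_segment_detach(&seg);
    return q.detach_notifications;
}
```

In this model, toy_stale_callbacks_after(3) returns 3: one leftover
callback per cycle fires at teardown, which mirrors the "N SetLatch
events" effect described above.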

For extra fun, the error MQs were never explicitly detached from,
just left to rot until on_dsm_detach time, although we did pfree the
shm_mq_handles out from under them.

So the first patch attached cleans this up by making shm_mq_detach
do what it was advertised to, i.e., fully reverse what shm_mq_attach
does.  That means it needs to take a shm_mq_handle, not a bare shm_mq,
but that actually makes the callers cleaner anyway.  (With this patch,
there are no callers of shm_mq_get_queue(); should we remove that?)
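
The shape of a detach that fully reverses attach can be sketched in
isolation.  The following is a toy model in plain C, not the patch itself:
the sketch_* types and functions are hypothetical, malloc/free stand in for
palloc/pfree, and the callback list is reduced to an array of queue
pointers.  The order of operations (notify, cancel the registration,
release the handle) follows the first patch's shm_mq_detach.

```c
#include <assert.h>
#include <stdlib.h>

#define SKETCH_MAX_CALLBACKS 16

/* Hypothetical stand-in for a shm_mq. */
typedef struct sketch_queue
{
    int         detached;
} sketch_queue;

/* Hypothetical stand-in for a DSM segment's callback registrations. */
typedef struct sketch_segment
{
    sketch_queue *registered[SKETCH_MAX_CALLBACKS];
    int         nregistered;
} sketch_segment;

/* Hypothetical stand-in for a backend-private shm_mq_handle. */
typedef struct sketch_handle
{
    sketch_queue *queue;
    sketch_segment *segment;    /* may be NULL: then nothing is registered */
} sketch_handle;

sketch_handle *
sketch_attach(sketch_queue *q, sketch_segment *seg)
{
    sketch_handle *h = malloc(sizeof(sketch_handle));

    assert(h != NULL);
    h->queue = q;
    h->segment = seg;
    if (seg != NULL)
    {
        assert(seg->nregistered < SKETCH_MAX_CALLBACKS);
        seg->registered[seg->nregistered++] = q;
    }
    return h;
}

/* Cancel the most recent registration matching q, if there is one. */
static void
sketch_cancel_callback(sketch_segment *seg, sketch_queue *q)
{
    int         i;

    for (i = seg->nregistered - 1; i >= 0; i--)
    {
        if (seg->registered[i] == q)
        {
            seg->registered[i] = seg->registered[--seg->nregistered];
            return;
        }
    }
}

/* Detach undoes everything attach did, in three steps. */
void
sketch_detach(sketch_handle *h)
{
    h->queue->detached = 1;     /* 1. notify the counterparty */
    if (h->segment != NULL)     /* 2. cancel the cleanup registration */
        sketch_cancel_callback(h->segment, h->queue);
    free(h);                    /* 3. release the handle itself */
}

/* Returns the number of registrations left after repeated cycles. */
int
sketch_registrations_after(int cycles)
{
    sketch_segment seg = {{0}, 0};
    sketch_queue q = {0};
    int         i;

    for (i = 0; i < cycles; i++)
        sketch_detach(sketch_attach(&q, &seg));
    return seg.nregistered;
}
```

With a symmetric detach, sketch_registrations_after() returns 0 for any
cycle count up to the toy array's capacity, so nothing stale is left to
fire when the segment finally goes away.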

The second patch cleans up assorted garden-variety leaks when
rescanning a GatherMerge node, by having it allocate its work
arrays just once and then re-use them across rescans.
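
The allocate-once, reset-per-scan pattern can be illustrated with a small
stand-alone C sketch.  Everything here is hypothetical (toy_merge_state
and the toy_merge_* functions are not PostgreSQL names), and calloc/free
stand in for palloc0/pfree.  The split loosely mirrors the setup/init
division the second patch introduces.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical per-node state; not the real GatherMergeState. */
typedef struct toy_merge_state
{
    int        *work;           /* work array, sized for the upper bound */
    int         capacity;       /* upper bound on readers, fixed at setup */
    int         nused;          /* entries in use for the current scan */
    int         allocations;    /* number of allocations made so far */
} toy_merge_state;

/* One-time setup: allocate for the maximum possible number of readers. */
void
toy_merge_setup(toy_merge_state *st, int max_readers)
{
    st->work = calloc((size_t) max_readers, sizeof(int));
    assert(st->work != NULL);
    st->capacity = max_readers;
    st->nused = 0;
    st->allocations++;
}

/* Per-scan init: reset the existing storage instead of allocating anew. */
void
toy_merge_init(toy_merge_state *st, int nreaders)
{
    int         i;

    assert(nreaders <= st->capacity);   /* setup must have made enough room */
    for (i = 0; i < nreaders; i++)
        st->work[i] = 0;
    st->nused = nreaders;
}

/* Returns the total number of allocations after the given rescans. */
int
toy_allocations_after_rescans(int rescans)
{
    toy_merge_state st = {NULL, 0, 0, 0};
    int         i;
    int         result;

    toy_merge_setup(&st, 4);
    for (i = 0; i < rescans; i++)
        toy_merge_init(&st, 1 + i % 4); /* reader count may vary per scan */
    result = st.allocations;
    free(st.work);
    return result;
}
```

However many rescans are run, the allocation count stays at one; the cost
is that the arrays are sized for the upper bound rather than for the
number of readers a given scan actually gets.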

The last patch fixes the one remaining leak I saw after applying the
first two patches, namely that execParallel.c leaks the array palloc'd
by ExecParallelSetupTupleQueues --- just the array storage, not any of
the shm_mq_handles it points to.  The given patch just adds a pfree
to ExecParallelFinish, but TBH I find this pretty unsatisfactory.
It seems like a significant modularity violation that execParallel.c
is responsible for creating those shm_mqs but not for cleaning them up.
That cleanup currently happens as a result of DestroyTupleQueueReader
calls done by nodeGather.c or nodeGatherMerge.c.  I'm tempted to
propose that we should move both the creation and the destruction of
the TupleQueueReaders into execParallel.c; the current setup is not
just weird but requires duplicative coding in the Gather nodes.
(That would make it more difficult to do the early reader destruction
that nodeGather currently does, but I am not sure we care about that.)
Another thing that seems like a poor factorization choice is that
DestroyTupleQueueReader is charged with doing shm_mq_detach even though
tqueue.c did not do the shm_mq_attach ... should we rethink that?
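
For reference, the narrow fix in the third patch amounts to the following
pattern, shown here as a toy C sketch.  The names toy_exec_info,
toy_handle, and toy_exec_finish are hypothetical, and malloc/free stand in
for palloc/pfree.  Only the array of pointers is released; the objects it
points to are assumed to be cleaned up by whoever destroys the readers.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a shm_mq_handle. */
typedef struct toy_handle
{
    int         id;
} toy_handle;

/* Hypothetical stand-in for the relevant part of ParallelExecutorInfo. */
typedef struct toy_exec_info
{
    toy_handle **tqueue;        /* array of pointers; pointees owned elsewhere */
    int         nworkers;
} toy_exec_info;

/* Free only the array storage, and make a repeated call harmless. */
void
toy_exec_finish(toy_exec_info *info)
{
    if (info->tqueue != NULL)
        free(info->tqueue);
    info->tqueue = NULL;
}

/* Returns 1 if finishing twice is safe and leaves the handles untouched. */
int
toy_finish_is_idempotent(void)
{
    toy_handle  handles[2] = {{1}, {2}};
    toy_exec_info info;

    info.nworkers = 2;
    info.tqueue = malloc(2 * sizeof(toy_handle *));
    assert(info.tqueue != NULL);
    info.tqueue[0] = &handles[0];
    info.tqueue[1] = &handles[1];

    toy_exec_finish(&info);
    toy_exec_finish(&info);     /* second call is a no-op */

    return info.tqueue == NULL && handles[0].id == 1 && handles[1].id == 2;
}
```

The sketch also shows why the arrangement feels awkward: the code that
frees the array has no say over the lifetime of what the array points to.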

Comments?

            regards, tom lane

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 17b1038..ce1b907 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -480,7 +480,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
              */
             any_registrations_failed = true;
             pcxt->worker[i].bgwhandle = NULL;
-            pfree(pcxt->worker[i].error_mqh);
+            shm_mq_detach(pcxt->worker[i].error_mqh);
             pcxt->worker[i].error_mqh = NULL;
         }
     }
@@ -612,7 +612,7 @@ DestroyParallelContext(ParallelContext *pcxt)
             {
                 TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

-                pfree(pcxt->worker[i].error_mqh);
+                shm_mq_detach(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
             }
         }
@@ -861,7 +861,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)

         case 'X':                /* Terminate, indicating clean exit */
             {
-                pfree(pcxt->worker[i].error_mqh);
+                shm_mq_detach(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
                 break;
             }
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 4c4fcf5..cb262d8 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -578,7 +578,7 @@ tqueueShutdownReceiver(DestReceiver *self)
 {
     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;

-    shm_mq_detach(shm_mq_get_queue(tqueue->queue));
+    shm_mq_detach(tqueue->queue);
 }

 /*
@@ -650,7 +650,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 void
 DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-    shm_mq_detach(shm_mq_get_queue(reader->queue));
+    shm_mq_detach(reader->queue);
     if (reader->typmodmap != NULL)
         hash_destroy(reader->typmodmap);
     /* Is it worth trying to free substructure of the remap tree? */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 8fbc038..e1a24b6 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -21,7 +21,6 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"

-static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
 static pid_t pq_mq_parallel_master_pid = 0;
@@ -56,7 +55,6 @@ void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
     PqCommMethods = &PqCommMqMethods;
-    pq_mq = shm_mq_get_queue(mqh);
     pq_mq_handle = mqh;
     whereToSendOutput = DestRemote;
     FrontendProtocol = PG_PROTOCOL_LATEST;
@@ -70,7 +68,6 @@ pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 static void
 pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
 {
-    pq_mq = NULL;
     pq_mq_handle = NULL;
     whereToSendOutput = DestNone;
 }
@@ -135,9 +132,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
      */
     if (pq_mq_busy)
     {
-        if (pq_mq != NULL)
-            shm_mq_detach(pq_mq);
-        pq_mq = NULL;
+        if (pq_mq_handle != NULL)
+            shm_mq_detach(pq_mq_handle);
         pq_mq_handle = NULL;
         return EOF;
     }
@@ -148,7 +144,7 @@ mq_putmessage(char msgtype, const char *s, size_t len)
      * be generated late in the shutdown sequence, after all DSMs have already
      * been detached.
      */
-    if (pq_mq == NULL)
+    if (pq_mq_handle == NULL)
         return 0;

     pq_mq_busy = true;
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f45a67c..5429711 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -83,7 +83,9 @@ struct shm_mq
  * This structure is a backend-private handle for access to a queue.
  *
  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
- * a pointer to the dynamic shared memory segment that contains it.
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
  *
  * If this queue is intended to connect the current process with a background
  * worker that started it, the user can pass a pointer to the worker handle
@@ -139,6 +141,7 @@ struct shm_mq_handle
     MemoryContext mqh_context;
 };

+static void shm_mq_detach_internal(shm_mq *mq);
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
                   const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
@@ -766,6 +769,25 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)

 /*
  * Detach a shared message queue.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+    /* Notify counterparty that we're outta here. */
+    shm_mq_detach_internal(mqh->mqh_queue);
+
+    /* Cancel on_dsm_detach callback, if any. */
+    if (mqh->mqh_segment)
+        cancel_on_dsm_detach(mqh->mqh_segment,
+                             shm_mq_detach_callback,
+                             PointerGetDatum(mqh->mqh_queue));
+
+    /* Release memory allocated by shm_mq_attach. */
+    pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
  *
  * The purpose of this function is to make sure that the process
  * with which we're communicating doesn't block forever waiting for us to
@@ -773,9 +795,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
  * detaches, the receiver can read any messages remaining in the queue;
  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much.  We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
  */
-void
-shm_mq_detach(shm_mq *mq)
+static void
+shm_mq_detach_internal(shm_mq *mq)
 {
     volatile shm_mq *vmq = mq;
     PGPROC       *victim;
@@ -1193,5 +1219,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 {
     shm_mq       *mq = (shm_mq *) DatumGetPointer(arg);

-    shm_mq_detach(mq);
+    shm_mq_detach_internal(mq);
 }
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 02a93e0..97f0280 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -63,7 +63,7 @@ extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);

 /* Break connection. */
-extern void shm_mq_detach(shm_mq *);
+extern void shm_mq_detach(shm_mq_handle *mqh);

 /* Get the shm_mq from handle. */
 extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 67da5ff..0ffe120 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -55,8 +55,10 @@ static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
 static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
                   bool nowait, bool *done);
-static void gather_merge_init(GatherMergeState *gm_state);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static void gather_merge_setup(GatherMergeState *gm_state);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void gather_merge_clear_tuples(GatherMergeState *gm_state);
 static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                       bool nowait);
 static void load_tuple_array(GatherMergeState *gm_state, int reader);
@@ -149,14 +151,17 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     }

     /*
-     * store the tuple descriptor into gather merge state, so we can use it
-     * later while initializing the gather merge slots.
+     * Store the tuple descriptor into gather merge state, so we can use it
+     * while initializing the gather merge slots.
      */
     if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
         hasoid = false;
     tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
     gm_state->tupDesc = tupDesc;

+    /* Now allocate the workspace for gather merge */
+    gather_merge_setup(gm_state);
+
     return gm_state;
 }

@@ -340,6 +345,9 @@ ExecReScanGatherMerge(GatherMergeState *node)
     /* Make sure any existing workers are gracefully shut down */
     ExecShutdownGatherMergeWorkers(node);

+    /* Free any unused tuples, so we don't leak memory across rescans */
+    gather_merge_clear_tuples(node);
+
     /* Mark node so that shared state will be rebuilt at next call */
     node->initialized = false;
     node->gm_initialized = false;
@@ -370,49 +378,86 @@ ExecReScanGatherMerge(GatherMergeState *node)
 }

 /*
- * Initialize the Gather merge tuple read.
+ * Set up the data structures that we'll need for Gather Merge.
  *
- * Pull at least a single tuple from each worker + leader and set up the heap.
+ * We allocate these once on the basis of gm->num_workers, which is an
+ * upper bound for the number of workers we'll actually have.  During
+ * a rescan, we reset the structures to empty.  This approach simplifies
+ * not leaking memory across rescans.
  */
 static void
-gather_merge_init(GatherMergeState *gm_state)
+gather_merge_setup(GatherMergeState *gm_state)
 {
-    int            nreaders = gm_state->nreaders;
-    bool        nowait = true;
+    GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
+    int            nreaders = gm->num_workers;
     int            i;

     /*
      * Allocate gm_slots for the number of workers + one more slot for leader.
-     * Last slot is always for leader. Leader always calls ExecProcNode() to
-     * read the tuple which will return the TupleTableSlot. Later it will
-     * directly get assigned to gm_slot. So just initialize leader gm_slot
-     * with NULL. For other slots, code below will call
-     * ExecInitExtraTupleSlot() to create a slot for the worker's results.
+     * Last slot is always for leader.  Leader always calls ExecProcNode() to
+     * read the tuple, and then stores it directly into its gm_slots entry.
+     * For other slots, code below will call ExecInitExtraTupleSlot() to
+     * create a slot for the worker's results.
      */
-    gm_state->gm_slots =
-        palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
-    gm_state->gm_slots[gm_state->nreaders] = NULL;
-
-    /* Initialize the tuple slot and tuple array for each worker */
-    gm_state->gm_tuple_buffers =
-        (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-                                        gm_state->nreaders);
-    for (i = 0; i < gm_state->nreaders; i++)
+    gm_state->gm_slots = (TupleTableSlot **)
+        palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
+
+    /* Allocate the tuple slot and tuple array for each worker */
+    gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
+        palloc0(nreaders * sizeof(GMReaderTupleBuffer));
+
+    for (i = 0; i < nreaders; i++)
     {
         /* Allocate the tuple array with length MAX_TUPLE_STORE */
         gm_state->gm_tuple_buffers[i].tuple =
             (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);

-        /* Initialize slot for worker */
+        /* Initialize tuple slot for worker */
         gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
         ExecSetSlotDescriptor(gm_state->gm_slots[i],
                               gm_state->tupDesc);
     }

     /* Allocate the resources for the merge */
-    gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+    gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
                                             heap_compare_slots,
                                             gm_state);
+}
+
+/*
+ * Initialize the Gather Merge.
+ *
+ * Reset data structures to ensure they're empty.  Then pull at least one
+ * tuple from each worker + leader (or set its "done" indicator), and set up
+ * the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+    int            nreaders = gm_state->nreaders;
+    bool        nowait = true;
+    int            i;
+
+    /* Assert that gather_merge_setup made enough space */
+    Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
+
+    /* Reset leader's tuple slot to empty */
+    gm_state->gm_slots[nreaders] = NULL;
+
+    /* Reset the tuple slot and tuple array for each worker */
+    for (i = 0; i < nreaders; i++)
+    {
+        /* Reset tuple array to empty */
+        gm_state->gm_tuple_buffers[i].nTuples = 0;
+        gm_state->gm_tuple_buffers[i].readCounter = 0;
+        /* Reset done flag to not-done */
+        gm_state->gm_tuple_buffers[i].done = false;
+        /* Ensure output slot is empty */
+        ExecClearTuple(gm_state->gm_slots[i]);
+    }
+
+    /* Reset binary heap to empty */
+    binaryheap_reset(gm_state->gm_heap);

     /*
      * First, try to read a tuple from each worker (including leader) in
@@ -467,23 +512,23 @@ reread:
 }

 /*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slot, and any unused pending tuples,
+ * for each gather merge input.
  */
 static void
-gather_merge_clear_slots(GatherMergeState *gm_state)
+gather_merge_clear_tuples(GatherMergeState *gm_state)
 {
     int            i;

     for (i = 0; i < gm_state->nreaders; i++)
     {
-        pfree(gm_state->gm_tuple_buffers[i].tuple);
+        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
+
+        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
+            heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+
         ExecClearTuple(gm_state->gm_slots[i]);
     }
-
-    /* Free tuple array as we don't need it any more */
-    pfree(gm_state->gm_tuple_buffers);
-    /* Free the binaryheap, which was created for sort */
-    binaryheap_free(gm_state->gm_heap);
 }

 /*
@@ -526,7 +571,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
     if (binaryheap_empty(gm_state->gm_heap))
     {
         /* All the queues are exhausted, and so is the heap */
-        gather_merge_clear_slots(gm_state);
+        gather_merge_clear_tuples(gm_state);
         return NULL;
     }
     else
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c713b85..3234900 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -753,11 +753,16 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
     for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
         InstrAccumParallelQuery(&pei->buffer_usage[i]);

-    /* Finally, accumulate instrumentation, if any. */
+    /* Accumulate instrumentation, if any. */
     if (pei->instrumentation)
         ExecParallelRetrieveInstrumentation(pei->planstate,
                                             pei->instrumentation);

+    /* Clean up assorted storage. */
+    if (pei->tqueue)
+        pfree(pei->tqueue);
+    pei->tqueue = NULL;
+
     pei->finished = true;
 }


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
