Discussion: Performance issues with parallelism and LIMIT
Hi hackers,

While migrating from PostgreSQL 14 to 15, we encountered the following performance degradation caused by commit 46846433a03dff: "shm_mq: Update mq_bytes_written less often", discussion in [1].

The batching can make queries with a LIMIT clause run significantly slower compared to PostgreSQL 14, because neither the ring buffer write position is updated, nor the latch to inform the leader that there's data available is set, before a worker's queue is 1/4th full. This can be seen in the number of rows produced by a parallel worker. In the worst case, the data set is large and all rows needed to answer the query appear early, but together they are not big enough to fill the queue to 1/4th (e.g. when the LIMIT and the tuple sizes are small). Here is an example to reproduce the problem.

CREATE TABLE t(id1 INT, id2 INT, id3 INT, id4 INT, id5 INT);
INSERT INTO t(id1, id2, id3, id4, id5) SELECT i%1000, i, i, i, i FROM generate_series(1, 10000000) AS i;
ANALYZE t;
SET parallel_tuple_cost = 0;
SET parallel_setup_cost = 0;
SET min_parallel_table_scan_size = 0;
SET max_parallel_workers_per_gather = 8;
EXPLAIN ANALYZE VERBOSE SELECT id2 FROM t WHERE id1 = 100 LIMIT 100;

PostgreSQL 15:

Limit (cost=0.00..797.43 rows=100 width=4) (actual time=65.083..69.207 rows=100 loops=1)
  Output: id2
  -> Gather (cost=0.00..79320.18 rows=9947 width=4) (actual time=65.073..68.417 rows=100 loops=1)
     Output: id2
     Workers Planned: 8
     Workers Launched: 7
     -> Parallel Seq Scan on public.t (cost=0.00..79320.18 rows=1243 width=4) (actual time=0.204..33.049 rows=100 loops=7)
        Output: id2
        Filter: (t.id1 = 100)
        Rows Removed by Filter: 99345
        Worker 0: actual time=0.334..32.284 rows=100 loops=1
        Worker 1: actual time=0.060..32.680 rows=100 loops=1
        Worker 2: actual time=0.637..33.954 rows=98 loops=1
        Worker 3: actual time=0.136..33.301 rows=100 loops=1
        Worker 4: actual time=0.140..31.942 rows=100 loops=1
        Worker 5: actual time=0.062..33.673 rows=100 loops=1
        Worker 6: actual time=0.062..33.512 rows=100 loops=1
Planning Time: 0.113 ms
Execution Time: 69.772 ms

PostgreSQL 14:

Limit (cost=0.00..797.75 rows=100 width=4) (actual time=30.602..38.459 rows=100 loops=1)
  Output: id2
  -> Gather (cost=0.00..79320.18 rows=9943 width=4) (actual time=30.592..37.669 rows=100 loops=1)
     Output: id2
     Workers Planned: 8
     Workers Launched: 7
     -> Parallel Seq Scan on public.t (cost=0.00..79320.18 rows=1243 width=4) (actual time=0.221..5.181 rows=15 loops=7)
        Output: id2
        Filter: (t.id1 = 100)
        Rows Removed by Filter: 15241
        Worker 0: actual time=0.129..4.840 rows=15 loops=1
        Worker 1: actual time=0.125..4.924 rows=15 loops=1
        Worker 2: actual time=0.314..5.249 rows=17 loops=1
        Worker 3: actual time=0.252..5.341 rows=15 loops=1
        Worker 4: actual time=0.163..5.179 rows=15 loops=1
        Worker 5: actual time=0.422..5.248 rows=15 loops=1
        Worker 6: actual time=0.139..5.489 rows=16 loops=1
Planning Time: 0.084 ms
Execution Time: 38.880 ms

I had a quick look at the code and I started wondering if we can't achieve the same performance improvement without batching by e.g.:

- Only set the latch if new data is written to an empty queue. Otherwise, the leader should anyway keep trying to read from the queues without waiting for the latch, so there is no need to set the latch again.

- Reorganize struct shm_mq. There seems to be false sharing happening between at least mq_ring_size and the atomics and potentially also between the atomics. I'm wondering if that's not the root cause of the "slow atomics" observed in [1]? I'm happy to do some profiling.
Alternatively, we could always set the latch if numberTuples in ExecutePlan() is reasonably low. To do so, the DestReceiver's receive() method would only need an additional "force flush" argument.

A slightly different but related problem is when some workers have already produced enough rows to answer the LIMIT query, but other workers are still running without producing any new rows. In that case the "already done" workers will stop running even though they haven't reached 1/4th of the queue size, because the for-loop in execMain.c bails out on the following condition:

if (numberTuples && numberTuples == current_tuple_count)
    break;

Subsequently, the leader will end the plan and then wait in the Gather node for all workers to shut down. However, workers still running but not producing any new rows will never reach the following condition in execMain.c to check if they're supposed to stop (the shared memory queue dest receiver will return false on detached queues):

/*
 * If we are not able to send the tuple, we assume the destination
 * has closed and no more tuples can be sent. If that's the case,
 * end the loop.
 */
if (!dest->receiveSlot(slot, dest))
    break;

Reproduction steps for this problem are below. Here the worker getting the first table page will be done right away, but the query takes as long as it takes to scan all pages of the entire table.

CREATE TABLE bar (col INT);
INSERT INTO bar SELECT generate_series(1, 5000000);
SET max_parallel_workers_per_gather = 8;
EXPLAIN ANALYZE VERBOSE SELECT col FROM bar WHERE col = 1 LIMIT 1;

Limit (cost=0.00..1.10 rows=1 width=4) (actual time=32.289..196.200 rows=1 loops=1)
  Output: col
  -> Gather (cost=0.00..30939.03 rows=28208 width=4) (actual time=32.278..196.176 rows=1 loops=1)
     Output: col
     Workers Planned: 8
     Workers Launched: 7
     -> Parallel Seq Scan on public.bar (cost=0.00..30939.03 rows=3526 width=4) (actual time=137.251..137.255 rows=0 loops=7)
        Output: col
        Filter: (bar.col = 1)
        Rows Removed by Filter: 713769
        Worker 0: actual time=160.177..160.181 rows=0 loops=1
        Worker 1: actual time=160.111..160.115 rows=0 loops=1
        Worker 2: actual time=0.043..0.047 rows=1 loops=1
        Worker 3: actual time=160.040..160.044 rows=0 loops=1
        Worker 4: actual time=160.167..160.171 rows=0 loops=1
        Worker 5: actual time=160.018..160.022 rows=0 loops=1
        Worker 6: actual time=160.201..160.205 rows=0 loops=1
Planning Time: 0.087 ms
Execution Time: 196.247 ms

We would need something similar to CHECK_FOR_INTERRUPTS() which returns a NULL slot if a parallel worker is supposed to stop execution (we could e.g. check if the queue got detached). Or could we amend CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue got detached?

Jasper Smit, Spiros Agathos and Dimos Stamatakis helped work on this.

[1] https://www.postgresql.org/message-id/flat/CAFiTN-tVXqn_OG7tHNeSkBbN%2BiiCZTiQ83uakax43y1sQb2OBA%40mail.gmail.com

--
David Geier
(ServiceNow)
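For a back-of-the-envelope feel for the worst case described above, here is a small, self-contained C calculation (not PostgreSQL code; the 64 kB per-worker tuple queue size and the 16-byte per-row queue footprint are illustrative assumptions, not measurements from the report):

#include <stdio.h>

int
main(void)
{
    const int queue_size = 65536;   /* assumed per-worker tuple queue size in bytes */
    const int tuple_size = 16;      /* assumed bytes queued per result row */
    const int limit = 100;          /* rows the leader actually needs */

    int flush_threshold = queue_size / 4;   /* batching: flush once 1/4 of the queue is used */
    int bytes_for_limit = limit * tuple_size;

    if (bytes_for_limit < flush_threshold)
        printf("LIMIT %d queues only %d bytes, below the %d byte flush threshold:\n"
               "the worker keeps scanning and the leader keeps waiting.\n",
               limit, bytes_for_limit, flush_threshold);
    else
        printf("LIMIT %d fills more than 1/4 of the queue, so the leader is woken in time.\n",
               limit);
    return 0;
}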
On 2/1/23 14:41, David Geier wrote: > Hi hackers, > > While migrating from PostgreSQL 14 to 15, we encountered the following > performance degradation caused by commit 46846433a03dff: "shm_mq: Update > mq_bytes_written less often", discussion in [1]. > > The batching can make queries with a LIMIT clause run significantly > slower compared to PostgreSQL 14, because neither the ring buffer write > position is updated, nor the latch to inform the leader that there's > data available is set, before a worker's queue is 1/4th full. This can > be seen in the number of rows produced by a parallel worker. Worst-case, > the data set is large and all rows to answer the query appear early, but > are not big enough to fill the queue to 1/4th (e.g. when the LIMIT and > the tuple sizes are small). Here is an example to reproduce the problem. > Yeah, this is a pretty annoying regression. We already can hit poor behavior when matching rows are not distributed uniformly in the tables (which is what LIMIT costing assumes), and this makes it more likely to hit similar issues. A bit like when doing many HTTP requests makes it more likely to hit at least one 99% outlier. > ... > > I had a quick look at the code and I started wondering if we can't > achieve the same performance improvement without batching by e.g.: > > - Only set the latch if new data is written to an empty queue. > Otherwise, the leader should anyways keep try reading from the queues > without waiting for the latch, so no need to set the latch again. > > - Reorganize struct shm_mq. There seems to be false sharing happening > between at least mq_ring_size and the atomics and potentially also > between the atomics. I'm wondering if the that's not the root cause of > the "slow atomics" observed in [1]? I'm happy to do some profiling. > > Alternatively, we could always set the latch if numberTuples in > ExecutePlan() is reasonably low. To do so, the DestReceiver's receive() > method would only need an additional "force flush" argument. > No opinion on these options, but worth a try. Alternatively, we could try the usual doubling approach - start with a low threshold (and set the latch frequently), and then gradually increase it up to the 1/4. That should work both for queries expecting only few rows and those producing a lot of data. > > A slightly different but related problem is when some workers have > already produced enough rows to answer the LIMIT query, but other > workers are still running without producing any new rows. In that case > the "already done" workers will stop running even though they haven't > reached 1/4th of the queue size, because the for-loop in execMain.c > bails out in the following condition: > > if (numberTuples && numberTuples == current_tuple_count) > break; > > Subsequently, the leader will end the plan and then wait in the Gather > node for all workers to shutdown. However, workers still running but not > producing any new rows will never reach the following condition in > execMain.c to check if they're supposed to stop (the shared memory queue > dest receiver will return false on detached queues): > > /* > * If we are not able to send the tuple, we assume the > destination > * has closed and no more tuples can be sent. If that's the > case, > * end the loop. > */ > if (!dest->receiveSlot(slot, dest)) > break; > > Reproduction steps for this problem are below. Here the worker getting > the first table page will be done right away, but the query takes as > long as it takes to scan all pages of the entire table. 
> Ouch! > ... > > We would need something similar to CHECK_FOR_INTERRUPTS() which returns > a NULL slot if a parallel worker is supposed to stop execution (we could > e.g. check if the queue got detached). Or could we amend > CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue > got detached? > That sounds reasonable, but I'm not very familiar the leader-worker communication, so no opinion on how it should be done. regards -- Tomas Vondra EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
Hi,

On 2/8/23 11:42, Tomas Vondra wrote:
> On 2/1/23 14:41, David Geier wrote:
>
> Yeah, this is a pretty annoying regression. We already can hit poor
> behavior when matching rows are not distributed uniformly in the tables
> (which is what LIMIT costing assumes), and this makes it more likely to
> hit similar issues. A bit like when doing many HTTP requests makes it
> more likely to hit at least one 99% outlier.

Are you talking about the use of ordering vs filtering indexes in queries where there's both an ORDER BY and a filter present (e.g. using an ordering index but then all rows passing the filter are at the end of the table)? If not, can you elaborate a bit more on that and maybe give an example?

> No opinion on these options, but worth a try. Alternatively, we could
> try the usual doubling approach - start with a low threshold (and set
> the latch frequently), and then gradually increase it up to the 1/4.
>
> That should work both for queries expecting only few rows and those
> producing a lot of data.

I was thinking about this variant as well. One more alternative would be latching the leader once a worker has produced 1/Nth of the LIMIT where N is the number of workers. Both variants have the disadvantage that there are still corner cases where the latch is set too late; but it would for sure be much better than what we have today.

I also did some profiling and - at least on my development laptop with 8 physical cores - the original example motivating the batching change is slower with batching than with batching disabled by commenting out:

if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

SET parallel_tuple_cost TO 0;
CREATE TABLE b (a int);
INSERT INTO b SELECT generate_series(1, 200000000);
ANALYZE b;
EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b;

Gather (cost=1000.00..1200284.61 rows=200375424 width=4) (actual rows=200000000 loops=1)
  Workers Planned: 7
  Workers Launched: 7
  -> Parallel Seq Scan on b (cost=0.00..1199284.61 rows=28625061 width=4) (actual rows=25000000 loops=8)

Always latch: 19055 ms
Batching: 19575 ms

If I find some time, I'll play around a bit more and maybe propose a patch.

>> ...
>>
>> We would need something similar to CHECK_FOR_INTERRUPTS() which returns
>> a NULL slot if a parallel worker is supposed to stop execution (we could
>> e.g. check if the queue got detached). Or could we amend
>> CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue
>> got detached?
>>
> That sounds reasonable, but I'm not very familiar the leader-worker
> communication, so no opinion on how it should be done.

I think an extra macro that needs to be called from dozens of places to check if parallel execution is supposed to end is the least preferred approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if we cannot actively signal the workers that they should stop.

--
David Geier
(ServiceNow)
On 2/20/23 19:18, David Geier wrote: > Hi, > > On 2/8/23 11:42, Tomas Vondra wrote: >> On 2/1/23 14:41, David Geier wrote: >> >> Yeah, this is a pretty annoying regression. We already can hit poor >> behavior when matching rows are not distributed uniformly in the tables >> (which is what LIMIT costing assumes), and this makes it more likely to >> hit similar issues. A bit like when doing many HTTP requests makes it >> more likely to hit at least one 99% outlier. > Are you talking about the use of ordering vs filtering indexes in > queries where there's both an ORDER BY and a filter present (e.g. using > an ordering index but then all rows passing the filter are at the end of > the table)? If not, can you elaborate a bit more on that and maybe give > an example. Yeah, roughly. I don't think the explicit ORDER BY is a requirement for this to happen - it's enough when the part of the plan below LIMIT produces many rows, but the matching rows are at the end. >> No opinion on these options, but worth a try. Alternatively, we could >> try the usual doubling approach - start with a low threshold (and set >> the latch frequently), and then gradually increase it up to the 1/4. >> >> That should work both for queries expecting only few rows and those >> producing a lot of data. > > I was thinking about this variant as well. One more alternative would be > latching the leader once a worker has produced 1/Nth of the LIMIT where > N is the number of workers. Both variants have the disadvantage that > there are still corner cases where the latch is set too late; but it > would for sure be much better than what we have today. > > I also did some profiling and - at least on my development laptop with 8 > physical cores - the original example, motivating the batching change is > slower than when it's disabled by commenting out: > > if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2)) > > SET parallel_tuple_cost TO 0; > CREATE TABLE b (a int); > INSERT INTO b SELECT generate_series(1, 200000000); > ANALYZE b; > EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b; > > Gather (cost=1000.00..1200284.61 rows=200375424 width=4) (actual > rows=200000000 loops=1) > Workers Planned: 7 > Workers Launched: 7 > -> Parallel Seq Scan on b (cost=0.00..1199284.61 rows=28625061 > width=4) (actual rows=25000000 loops=8) > > Always latch: 19055 ms > Batching: 19575 ms > > If I find some time, I'll play around a bit more and maybe propose a patch. > OK. Once you have a WIP patch maybe share it and I'll try to do some profiling too. >>> ... >>> >>> We would need something similar to CHECK_FOR_INTERRUPTS() which returns >>> a NULL slot if a parallel worker is supposed to stop execution (we could >>> e.g. check if the queue got detached). Or could we amend >>> CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue >>> got detached? >>> >> That sounds reasonable, but I'm not very familiar the leader-worker >> communication, so no opinion on how it should be done. > > I think an extra macro that needs to be called from dozens of places to > check if parallel execution is supposed to end is the least preferred > approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if > we cannot actively signal the workers that they should stop. > IMHO if this requires adding another macro to a bunch of ad hoc places is rather inconvenient. It'd be much better to fix this in a localized manner (especially as it seems related to a fairly specific place). 
regards -- Tomas Vondra EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
On 2/22/23 13:22, Tomas Vondra wrote: > ... > >>> No opinion on these options, but worth a try. Alternatively, we could >>> try the usual doubling approach - start with a low threshold (and set >>> the latch frequently), and then gradually increase it up to the 1/4. >>> >>> That should work both for queries expecting only few rows and those >>> producing a lot of data. >> >> I was thinking about this variant as well. One more alternative would be >> latching the leader once a worker has produced 1/Nth of the LIMIT where >> N is the number of workers. Both variants have the disadvantage that >> there are still corner cases where the latch is set too late; but it >> would for sure be much better than what we have today. >> >> I also did some profiling and - at least on my development laptop with 8 >> physical cores - the original example, motivating the batching change is >> slower than when it's disabled by commenting out: >> >> if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2)) >> >> SET parallel_tuple_cost TO 0; >> CREATE TABLE b (a int); >> INSERT INTO b SELECT generate_series(1, 200000000); >> ANALYZE b; >> EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b; >> >> Gather (cost=1000.00..1200284.61 rows=200375424 width=4) (actual >> rows=200000000 loops=1) >> Workers Planned: 7 >> Workers Launched: 7 >> -> Parallel Seq Scan on b (cost=0.00..1199284.61 rows=28625061 >> width=4) (actual rows=25000000 loops=8) >> >> Always latch: 19055 ms >> Batching: 19575 ms >> >> If I find some time, I'll play around a bit more and maybe propose a patch. >> > > OK. Once you have a WIP patch maybe share it and I'll try to do some > profiling too. > >>>> ... >>>> >>>> We would need something similar to CHECK_FOR_INTERRUPTS() which returns >>>> a NULL slot if a parallel worker is supposed to stop execution (we could >>>> e.g. check if the queue got detached). Or could we amend >>>> CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue >>>> got detached? >>>> >>> That sounds reasonable, but I'm not very familiar the leader-worker >>> communication, so no opinion on how it should be done. >> >> I think an extra macro that needs to be called from dozens of places to >> check if parallel execution is supposed to end is the least preferred >> approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if >> we cannot actively signal the workers that they should stop. >> > > IMHO if this requires adding another macro to a bunch of ad hoc places > is rather inconvenient. It'd be much better to fix this in a localized > manner (especially as it seems related to a fairly specific place). > David, do you still plan to try fixing these issues? I have a feeling those issues may be fairly common but often undetected, or just brushed of as "slow query" (AFAICS it was only caught thanks to comparing timings before/after upgrade). Would be great to improve this. regards -- Tomas Vondra EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
Hi Tomas! I've finally got time again to work on PostgreSQL. On 03.11.2023 21:48, Tomas Vondra wrote: > On 2/22/23 13:22, Tomas Vondra wrote: >> ... >> >>>> No opinion on these options, but worth a try. Alternatively, we could >>>> try the usual doubling approach - start with a low threshold (and set >>>> the latch frequently), and then gradually increase it up to the 1/4. >>>> >>>> That should work both for queries expecting only few rows and those >>>> producing a lot of data. >>> I was thinking about this variant as well. One more alternative would be >>> latching the leader once a worker has produced 1/Nth of the LIMIT where >>> N is the number of workers. Both variants have the disadvantage that >>> there are still corner cases where the latch is set too late; but it >>> would for sure be much better than what we have today. Or always latching when a LIMIT is present. When a LIMIT is present, it's much more likely that the latency hurts than that it doesn't. >>> I also did some profiling and - at least on my development laptop with 8 >>> physical cores - the original example, motivating the batching change is >>> slower than when it's disabled by commenting out: >>> >>> if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2)) >>> >>> SET parallel_tuple_cost TO 0; >>> CREATE TABLE b (a int); >>> INSERT INTO b SELECT generate_series(1, 200000000); >>> ANALYZE b; >>> EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b; >>> >>> Gather (cost=1000.00..1200284.61 rows=200375424 width=4) (actual >>> rows=200000000 loops=1) >>> Workers Planned: 7 >>> Workers Launched: 7 >>> -> Parallel Seq Scan on b (cost=0.00..1199284.61 rows=28625061 >>> width=4) (actual rows=25000000 loops=8) >>> >>> Always latch: 19055 ms >>> Batching: 19575 ms >>> >>> If I find some time, I'll play around a bit more and maybe propose a patch. I've also remeasured the shared memory latching with and without the 1/4th check using the original example from [1]. Apart from the code line mentioned by you, I also commented out the check on the consumer side: if (mqh->mqh_consume_pending > mq->mq_ring_size / 4) On my dev laptop (i9-13950HX) the runtimes are pretty much the same with 8 workers (16-17 seconds with some variance). It would be great to understand when this truly helps, if at all, to see if we need some smartness to latch the consumer or if we can just remove the 1/4th check. If this turns out to be more involved we could also move this discussion into a separate thread and have this thread focus on stopping the parallel workers early, see below. >> >> OK. Once you have a WIP patch maybe share it and I'll try to do some >> profiling too. >> >>>>> ... >>>>> >>>>> We would need something similar to CHECK_FOR_INTERRUPTS() which returns >>>>> a NULL slot if a parallel worker is supposed to stop execution (we could >>>>> e.g. check if the queue got detached). Or could we amend >>>>> CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue >>>>> got detached? >>>>> >>>> That sounds reasonable, but I'm not very familiar the leader-worker >>>> communication, so no opinion on how it should be done. >>> >>> I think an extra macro that needs to be called from dozens of places to >>> check if parallel execution is supposed to end is the least preferred >>> approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if >>> we cannot actively signal the workers that they should stop. >>> >> >> IMHO if this requires adding another macro to a bunch of ad hoc places >> is rather inconvenient. 
It'd be much better to fix this in a localized >> manner (especially as it seems related to a fairly specific place). I've written up a draft patch that instructs workers to stop, once the leader has gotten enough rows according to the LIMIT clause. I'm using SendProcSignal() to inform the workers to take action and stop executing ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see a good way of doing this differently which is not much more intrusive. The patch is incomplete (comments, support for Gather Merge, more testing, etc.) but I'm mostly interested at this point if the overall approach is deemed fine. I first tried to use TerminateBackgroundWorker() but postmaster.c then logs the worker termination and also some of the cleanup code needed for proper instrumentation doesn't run any longer in the parallel workers. With the patch applied, the query from the first mail of this thread runs in a few milliseconds. That it still takes that long is because forking, plan (de-)serialization and remaining initialization are fairly heavy weight. With threads, the "fork" time would already much lower and no (de-)serialization would be necessary. In the process-based architecture it would be interesting to think about adding a parallel worker pool. > > David, do you still plan to try fixing these issues? I have a feeling > those issues may be fairly common but often undetected, or just brushed > of as "slow query" (AFAICS it was only caught thanks to comparing > timings before/after upgrade). Would be great to improve this. I completely agree. And while they look like corner cases, if the workload is diverse enough they will be encountered (both findings are from the field). If it's then a query that runs frequently enough it causes a real issue that is hard to be diagnosed by the DBA. [1] https://www.postgresql.org/message-id/flat/CAFiTN-tVXqn_OG7tHNeSkBbN%2BiiCZTiQ83uakax43y1sQb2OBA%40mail.gmail.com -- David Geier
Hi David, Sorry for not responding to this thread earlier. On 9/2/25 13:38, David Geier wrote: > Hi Tomas! > > I've finally got time again to work on PostgreSQL. > > On 03.11.2023 21:48, Tomas Vondra wrote: >> On 2/22/23 13:22, Tomas Vondra wrote: >>> ... >>> >>>>> No opinion on these options, but worth a try. Alternatively, we could >>>>> try the usual doubling approach - start with a low threshold (and set >>>>> the latch frequently), and then gradually increase it up to the 1/4. >>>>> >>>>> That should work both for queries expecting only few rows and those >>>>> producing a lot of data. >>>> I was thinking about this variant as well. One more alternative would be >>>> latching the leader once a worker has produced 1/Nth of the LIMIT where >>>> N is the number of workers. Both variants have the disadvantage that >>>> there are still corner cases where the latch is set too late; but it >>>> would for sure be much better than what we have today. > > Or always latching when a LIMIT is present. When a LIMIT is present, > it's much more likely that the latency hurts than that it doesn't. > I think something like this is probably the way to go ... >>>> I also did some profiling and - at least on my development laptop with 8 >>>> physical cores - the original example, motivating the batching change is >>>> slower than when it's disabled by commenting out: >>>> >>>> if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2)) >>>> >>>> SET parallel_tuple_cost TO 0; >>>> CREATE TABLE b (a int); >>>> INSERT INTO b SELECT generate_series(1, 200000000); >>>> ANALYZE b; >>>> EXPLAIN (ANALYZE, TIMING OFF) SELECT * FROM b; >>>> >>>> Gather (cost=1000.00..1200284.61 rows=200375424 width=4) (actual >>>> rows=200000000 loops=1) >>>> Workers Planned: 7 >>>> Workers Launched: 7 >>>> -> Parallel Seq Scan on b (cost=0.00..1199284.61 rows=28625061 >>>> width=4) (actual rows=25000000 loops=8) >>>> >>>> Always latch: 19055 ms >>>> Batching: 19575 ms >>>> >>>> If I find some time, I'll play around a bit more and maybe propose a patch. > > I've also remeasured the shared memory latching with and without the > 1/4th check using the original example from [1]. Apart from the code > line mentioned by you, I also commented out the check on the consumer side: > > if (mqh->mqh_consume_pending > mq->mq_ring_size / 4) > > On my dev laptop (i9-13950HX) the runtimes are pretty much the same with > 8 workers (16-17 seconds with some variance). It would be great to > understand when this truly helps, if at all, to see if we need some > smartness to latch the consumer or if we can just remove the 1/4th check. > > If this turns out to be more involved we could also move this discussion > into a separate thread and have this thread focus on stopping the > parallel workers early, see below. > Yeah. It's a good question if this is really necessary. The number of signals we can send between processes is hot huge (like ~200k/s per process). I'd guess it might matter for data sets that fit into shared buffers, with very narrow rows. Or something like that. But as you say, better to move that discussion into a separate thread. >>> >>> OK. Once you have a WIP patch maybe share it and I'll try to do some >>> profiling too. >>> >>>>>> ... >>>>>> >>>>>> We would need something similar to CHECK_FOR_INTERRUPTS() which returns >>>>>> a NULL slot if a parallel worker is supposed to stop execution (we could >>>>>> e.g. check if the queue got detached). 
Or could we amend >>>>>> CHECK_FOR_INTERRUPTS() to just stop the worker gracefully if the queue >>>>>> got detached? >>>>>> >>>>> That sounds reasonable, but I'm not very familiar the leader-worker >>>>> communication, so no opinion on how it should be done. >>>> >>>> I think an extra macro that needs to be called from dozens of places to >>>> check if parallel execution is supposed to end is the least preferred >>>> approach. I'll read up more on how CHECK_FOR_INTERRUPTS() works and if >>>> we cannot actively signal the workers that they should stop. >>>> >>> >>> IMHO if this requires adding another macro to a bunch of ad hoc places >>> is rather inconvenient. It'd be much better to fix this in a localized >>> manner (especially as it seems related to a fairly specific place). > > I've written up a draft patch that instructs workers to stop, once the > leader has gotten enough rows according to the LIMIT clause. I'm using > SendProcSignal() to inform the workers to take action and stop executing > ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see > a good way of doing this differently which is not much more intrusive. > The patch is incomplete (comments, support for Gather Merge, more > testing, etc.) but I'm mostly interested at this point if the overall > approach is deemed fine. > > I first tried to use TerminateBackgroundWorker() but postmaster.c then > logs the worker termination and also some of the cleanup code needed for > proper instrumentation doesn't run any longer in the parallel workers. > > With the patch applied, the query from the first mail of this thread > runs in a few milliseconds. That it still takes that long is because > forking, plan (de-)serialization and remaining initialization are fairly > heavy weight. With threads, the "fork" time would already much lower and > no (de-)serialization would be necessary. In the process-based > architecture it would be interesting to think about adding a parallel > worker pool. > Unfortunately, I don't think this patch is the way to go. When I apply it, I get: ERROR: InstrEndLoop called on running node CONTEXT: parallel worker And I very much doubt inventing a new ad hoc way to signal workers is the right solution (even if there wasn't the InstrEndLoop issue). What I think we should do is much simpler - make the threshold in shm_mq dynamic, start with a very low value and gradually ramp up (up to 1/4). So we'd have if (mqh->mqh_consume_pending > threshold) We might start with threshold = (mq->mq_ring_size / 1024) or maybe some fixed value, list thredhold = 128 And on every signal we'd double it, capping it to 1/4 of mq_ring_size. threshold = Min(threshold * 2, mq->mq_ring_size / 1024); This is very similar to other places doing this gradual ramp up, like in the prefetching / read_stream, etc. It allows fast termination for low LIMIT values, but quickly amortizes the cost for high LIMIT values. regards -- Tomas Vondra
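A minimal, self-contained sketch of the ramp-up idea described above (illustrative only; the 128-byte start value and the shape of the check come from the mail, and the cap is written here as ring_size / 4, i.e. the "1/4 of mq_ring_size" the prose describes, which is presumably what the quoted Min() expression is meant to say):

#include <stdio.h>

#define Min(a, b) ((a) < (b) ? (a) : (b))

int
main(void)
{
    const int ring_size = 65536;   /* assumed tuple queue ring size in bytes */
    const int cap = ring_size / 4; /* never defer more than 1/4 of the ring */
    int threshold = 128;           /* small initial flush threshold */

    /* Each time the worker wakes the leader, the threshold doubles up to the cap. */
    for (int flush = 1; threshold < cap; flush++)
    {
        threshold = Min(threshold * 2, cap);
        printf("flush %d: wake the leader, next flush after %d bytes\n",
               flush, threshold);
    }
    printf("steady state: flush every %d bytes (1/4 of the ring)\n", threshold);
    return 0;
}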
On 11/13/25 23:36, Tomas Vondra wrote:
> ...
>
> What I think we should do is much simpler - make the threshold in shm_mq
> dynamic, start with a very low value and gradually ramp up (up to 1/4).
> So we'd have
>
> if (mqh->mqh_consume_pending > threshold)
>
> We might start with
>
> threshold = (mq->mq_ring_size / 1024)
>
> or maybe some fixed value, list
>
> thredhold = 128
>
> And on every signal we'd double it, capping it to 1/4 of mq_ring_size.
>
> threshold = Min(threshold * 2, mq->mq_ring_size / 1024);
>
> This is very similar to other places doing this gradual ramp up, like in
> the prefetching / read_stream, etc. It allows fast termination for low
> LIMIT values, but quickly amortizes the cost for high LIMIT values.
>
I gave this a try today, to see if it can actually solve the regression.
Attached is a WIP patch, and a set of benchmarking scripts. On my ryzen
machine I got this (timings of the queries):
fill dataset | 14 15 16 17 18 patched
-----------------------------------------------------------------
10 random | 64.1 319.3 328.7 340.5 344.3 79.5
sequential | 54.6 323.4 347.5 350.5 399.2 78.3
100 random | 11.8 42.9 42.3 42.3 68.5 18.6
sequential | 10.0 44.3 45.0 44.3 60.6 20.0
Clearly 15 is a significant regression, with timings ~4x higher. And the
patch improves that quite a bit. It's not down all the way back to 14,
there's still ~10ms regression, for some reason.
Also, I didn't measure if this patch causes some other regressions for
other queries. I don't think it does, but maybe there's some weird
corner case affected.
regards
--
Tomas Vondra
Hi Tomas! On 13.11.2025 23:36, Tomas Vondra wrote: > Sorry for not responding to this thread earlier. No worries. Thanks for looking at it! >>>> IMHO if this requires adding another macro to a bunch of ad hoc places >>>> is rather inconvenient. It'd be much better to fix this in a localized >>>> manner (especially as it seems related to a fairly specific place). >> >> I've written up a draft patch that instructs workers to stop, once the >> leader has gotten enough rows according to the LIMIT clause. I'm using >> SendProcSignal() to inform the workers to take action and stop executing >> ExecutePlan(). I've implemented the stopping via sigsetjmp. I cannot see >> a good way of doing this differently which is not much more intrusive. >> The patch is incomplete (comments, support for Gather Merge, more >> testing, etc.) but I'm mostly interested at this point if the overall >> approach is deemed fine. >> >> I first tried to use TerminateBackgroundWorker() but postmaster.c then >> logs the worker termination and also some of the cleanup code needed for >> proper instrumentation doesn't run any longer in the parallel workers. >> >> With the patch applied, the query from the first mail of this thread >> runs in a few milliseconds. That it still takes that long is because >> forking, plan (de-)serialization and remaining initialization are fairly >> heavy weight. With threads, the "fork" time would already much lower and >> no (de-)serialization would be necessary. In the process-based >> architecture it would be interesting to think about adding a parallel >> worker pool. >> > > Unfortunately, I don't think this patch is the way to go. When I apply > it, I get: > > ERROR: InstrEndLoop called on running node > CONTEXT: parallel worker > Ooops. That can likely be fixed. > And I very much doubt inventing a new ad hoc way to signal workers is > the right solution (even if there wasn't the InstrEndLoop issue). > > What I think we should do is much simpler - make the threshold in shm_mq > dynamic, start with a very low value and gradually ramp up (up to 1/4). > So we'd have > > if (mqh->mqh_consume_pending > threshold) > > We might start with > > threshold = (mq->mq_ring_size / 1024) > > or maybe some fixed value, list > > thredhold = 128 > > And on every signal we'd double it, capping it to 1/4 of mq_ring_size. > > threshold = Min(threshold * 2, mq->mq_ring_size / 1024); > > This is very similar to other places doing this gradual ramp up, like in > the prefetching / read_stream, etc. It allows fast termination for low > LIMIT values, but quickly amortizes the cost for high LIMIT values. That's a different problem though, isn't it? The original thread contained two problems: (1) signaling the queue to late and (2) workers stopping to late in the presence of LIMIT if they're not finding any rows in their portion of the data. Changing the queue thresholds is a solution for (1) but not for (2). For (2) we need a mechanism to instruct the parallel workers to stop when we find that other parallel workers have already produced enough rows to answer the query. An alternative mechanism that might work is using some stop_worker boolean in shared memory that we check in CHECK_FOR_INTERRUPTS(). stop_worker is set to true by the leader as soon as it has collected enough tuples. But then CHECK_FOR_INTERRUPTS() would have to have access to the parallel context, which might also be a bit ugly. -- David Geier
On 11/14/25 19:20, David Geier wrote: > Hi Tomas! > > On 13.11.2025 23:36, Tomas Vondra wrote: >> ... >> >> Unfortunately, I don't think this patch is the way to go. When I apply >> it, I get: >> >> ERROR: InstrEndLoop called on running node >> CONTEXT: parallel worker >> > > Ooops. That can likely be fixed. > >> And I very much doubt inventing a new ad hoc way to signal workers is >> the right solution (even if there wasn't the InstrEndLoop issue). >> >> What I think we should do is much simpler - make the threshold in shm_mq >> dynamic, start with a very low value and gradually ramp up (up to 1/4). >> So we'd have >> >> if (mqh->mqh_consume_pending > threshold) >> >> We might start with >> >> threshold = (mq->mq_ring_size / 1024) >> >> or maybe some fixed value, list >> >> thredhold = 128 >> >> And on every signal we'd double it, capping it to 1/4 of mq_ring_size. >> >> threshold = Min(threshold * 2, mq->mq_ring_size / 1024); >> >> This is very similar to other places doing this gradual ramp up, like in >> the prefetching / read_stream, etc. It allows fast termination for low >> LIMIT values, but quickly amortizes the cost for high LIMIT values. > > That's a different problem though, isn't it? The original thread > contained two problems: (1) signaling the queue to late and (2) workers > stopping to late in the presence of LIMIT if they're not finding any > rows in their portion of the data. > > Changing the queue thresholds is a solution for (1) but not for (2). For > (2) we need a mechanism to instruct the parallel workers to stop when we > find that other parallel workers have already produced enough rows to > answer the query. > Good point, I completely forgot about (2). > An alternative mechanism that might work is using some stop_worker > boolean in shared memory that we check in CHECK_FOR_INTERRUPTS(). > stop_worker is set to true by the leader as soon as it has collected > enough tuples. But then CHECK_FOR_INTERRUPTS() would have to have access > to the parallel context, which might also be a bit ugly. > Hmmm, yeah. We already do have shared state for the parallel scan. Do you think we could maybe integrate that into that? So the scan would just "finished" for all the workers ... regards -- Tomas Vondra
Hi Tomas! On 14.11.2025 17:00, Tomas Vondra wrote: > On 11/13/25 23:36, Tomas Vondra wrote: >> ... >> >> What I think we should do is much simpler - make the threshold in shm_mq >> dynamic, start with a very low value and gradually ramp up (up to 1/4). >> So we'd have >> >> if (mqh->mqh_consume_pending > threshold) >> >> We might start with >> >> threshold = (mq->mq_ring_size / 1024) >> >> or maybe some fixed value, list >> >> thredhold = 128 >> >> And on every signal we'd double it, capping it to 1/4 of mq_ring_size. >> >> threshold = Min(threshold * 2, mq->mq_ring_size / 1024); >> >> This is very similar to other places doing this gradual ramp up, like in >> the prefetching / read_stream, etc. It allows fast termination for low >> LIMIT values, but quickly amortizes the cost for high LIMIT values. >> > > I gave this a try today, to see if it can actually solve the regression. > Attached is a WIP patch, and a set of benchmarking scripts. On my ryzen > machine I got this (timings of the queries): > > fill dataset | 14 15 16 17 18 patched > ----------------------------------------------------------------- > 10 random | 64.1 319.3 328.7 340.5 344.3 79.5 > sequential | 54.6 323.4 347.5 350.5 399.2 78.3 > 100 random | 11.8 42.9 42.3 42.3 68.5 18.6 > sequential | 10.0 44.3 45.0 44.3 60.6 20.0 > > Clearly 15 is a significant regression, with timings ~4x higher. And the > patch improves that quite a bit. It's not down all the way back to 14, > there's still ~10ms regression, for some reason. > > Also, I didn't measure if this patch causes some other regressions for > other queries. I don't think it does, but maybe there's some weird > corner case affected. > > > regards > Thanks for working on that. This is certainly an improvement. It doesn't work always though. You can still get into the situation where enough data is waiting in the queues to satisfy the limit but the threshold hasn't been reached and also won't be reached anymore because no more rows will match. I'm especially passionate about that case because currently you can get arbitrarily bad query runtimes with big data sets and small LIMITs. As shared previously in this thread, I cannot reproduce any slowdown when deactivating the late latching. The test used a very narrow row (single INT) and the data set fit into shared memory. I've only tried with 8 parallel workers. Could you test if you can reproduce the slowdown, in case you have a machine with more cores at hand? If we can somehow reproduce the original problem, I would also like to check if there's not other issues at play that can be fixed differently (e.g. false sharing). If that optimization is truly necessary, how about always latching if a LIMIT clause is present? Or in the presence of a LIMIT clause, keeping the row count of totally produced rows in shared memory and latching in all workers once the LIMIT has been reached? The overhead of changing the shared atomic should be neglectable for reasonable LIMITs. Another alternative would be periodically latching. Given that the minimum runtime of any parallel query is a few dozen milliseconds due to forking and plan (de-)serialization, we could live with latching only say every millisecond or so. -- David Geier
Hi Tomas! On 15.11.2025 00:00, Tomas Vondra wrote: > On 11/14/25 19:20, David Geier wrote: >> >> Ooops. That can likely be fixed. >> I'll take a look at why this happens in the next few days, if you think this approach generally has a chance to be accepted. See below. >>> And I very much doubt inventing a new ad hoc way to signal workers is >>> the right solution (even if there wasn't the InstrEndLoop issue). >>> > > Good point, I completely forgot about (2). > In that light, could you take another look at my patch? Some clarifications: I'm not inventing a new way to signal workers but I'm using the existing SendProcSignal() machinery to inform parallel workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and the corresponding functions to handle it from ProcessInterrupts(). What is "new" is how I'm stopping the parallel workers once they've received the stop signal: the challenge is that the workers need to actually jump out of whatever they are doing - even if they aren't producing any rows at this point; but e.g. are scanning a table somewhere deep down in ExecScan() / SeqNext(). The only way I can see to make this work, without a huge patch that adds new code all over the place, is to instruct process termination from inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun() function so that all parallel worker cleanup code still runs as if the worker had run to completion. I've tried to end the process without it, but that caused all sorts of fallout (instrumentation not collected, postmaster thinking the process stopped unexpectedly, ...). Instead of siglongjmp-ing we could maybe call some parallel worker shutdown function but that would require access to the parallel worker state variables, which are currently not globally accessible. -- David Geier
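Purely to illustrate the control flow described here (plain POSIX C, nothing below is PostgreSQL code; the signal, the function names and the loop are made up for the example): the handler only sets a flag, a CHECK_FOR_INTERRUPTS()-like check does the siglongjmp at a well-defined point, and the code after sigsetjmp() still runs the normal shutdown path.

#include <setjmp.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static sigjmp_buf stop_jmp;
static volatile sig_atomic_t stop_requested = 0;

static void
handle_stop(int signo)
{
    stop_requested = 1;             /* handler only sets a flag */
}

static void
check_for_stop(void)
{
    if (stop_requested)
        siglongjmp(stop_jmp, 1);    /* bail out at a safe check point */
}

int
main(void)
{
    signal(SIGALRM, handle_stop);
    alarm(1);                       /* stand-in for the leader's stop signal */

    if (sigsetjmp(stop_jmp, 1) == 0)
    {
        for (long row = 0;; row++)  /* stand-in for the executor loop */
        {
            check_for_stop();
            usleep(1000);           /* pretend to look at one tuple */
        }
    }

    /* Reached after the long jump: run the usual worker shutdown code. */
    printf("stopped early, shutting down cleanly\n");
    return 0;
}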
On 11/18/25 15:06, David Geier wrote: > Hi Tomas! > > On 15.11.2025 00:00, Tomas Vondra wrote: >> On 11/14/25 19:20, David Geier wrote: >>> >>> Ooops. That can likely be fixed. >>> > > I'll take a look at why this happens the next days, if you think this > approach generally has a chance to be accepted. See below. > >>>> And I very much doubt inventing a new ad hoc way to signal workers is >>>> the right solution (even if there wasn't the InstrEndLoop issue). >>>> >> >> Good point, I completely forgot about (2). >> > > In that light, could you take another look at my patch? > > Some clarifications: I'm not inventing a new way to signal workers but > I'm using the existing SendProcSignal() machinery to inform parallel > workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and > the corresponding functions to handle it from ProcessInterrupts(). > Sure, but I still don't quite see the need to do all this. > What is "new" is how I'm stopping the parallel workers once they've > received the stop signal: the challenge is that the workers need to > actually jump out of whatever they are doing - even if they aren't > producing any rows at this point; but e.g. are scanning a table > somewhere deep down in ExecScan() / SeqNext(). > > The only way I can see to make this work, without a huge patch that adds > new code all over the place, is to instruct process termination from > inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun() > function so that all parallel worker cleanup code still runs as if the > worker processed to completion. I've tried to end the process without > but that caused all sorts of fallout (instrumentation not collected, > postmaster thinking the process stopped unexpectedly, ...). > > Instead of siglongjmp-ing we could maybe call some parallel worker > shutdown function but that would require access to the parallel worker > state variables, which are currently not globally accessible. > But why? The leader and workers already share state - the parallel scan state (for the parallel-aware scan on the "driving" table). Why couldn't the leader set a flag in the scan, and force it to end in workers? Which AFAICS should lead to workers terminating shortly after that. All the code / handling is already in place. It will need a bit of new code in the parallel scans, but but not much I think. regards -- Tomas Vondra
Hi Tomas! On 18.11.2025 15:59, Tomas Vondra wrote: >> >> Some clarifications: I'm not inventing a new way to signal workers but >> I'm using the existing SendProcSignal() machinery to inform parallel >> workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and >> the corresponding functions to handle it from ProcessInterrupts(). >> > > Sure, but I still don't quite see the need to do all this. > >> What is "new" is how I'm stopping the parallel workers once they've >> received the stop signal: the challenge is that the workers need to >> actually jump out of whatever they are doing - even if they aren't >> producing any rows at this point; but e.g. are scanning a table >> somewhere deep down in ExecScan() / SeqNext(). >> >> The only way I can see to make this work, without a huge patch that adds >> new code all over the place, is to instruct process termination from >> inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun() >> function so that all parallel worker cleanup code still runs as if the >> worker processed to completion. I've tried to end the process without >> but that caused all sorts of fallout (instrumentation not collected, >> postmaster thinking the process stopped unexpectedly, ...). >> >> Instead of siglongjmp-ing we could maybe call some parallel worker >> shutdown function but that would require access to the parallel worker >> state variables, which are currently not globally accessible. >> > > But why? The leader and workers already share state - the parallel scan > state (for the parallel-aware scan on the "driving" table). Why couldn't > the leader set a flag in the scan, and force it to end in workers? Which > AFAICS should lead to workers terminating shortly after that. > > All the code / handling is already in place. It will need a bit of new > code in the parallel scans, but but not much I think. > But this would only work for the SeqScan case, wouldn't it? The parallel worker might equally well be executing other code which doesn't produce tuples, such as parallel index scan, a big sort, building a hash table, etc. I thought this is not a viable solution because it would need changes in all these places. -- David Geier
On 11/18/25 16:07, David Geier wrote: > Hi Tomas! > > On 18.11.2025 15:59, Tomas Vondra wrote: >>> >>> Some clarifications: I'm not inventing a new way to signal workers but >>> I'm using the existing SendProcSignal() machinery to inform parallel >>> workers to stop. I just added another signal PROCSIG_PARALLEL_STOP and >>> the corresponding functions to handle it from ProcessInterrupts(). >>> >> >> Sure, but I still don't quite see the need to do all this. >> >>> What is "new" is how I'm stopping the parallel workers once they've >>> received the stop signal: the challenge is that the workers need to >>> actually jump out of whatever they are doing - even if they aren't >>> producing any rows at this point; but e.g. are scanning a table >>> somewhere deep down in ExecScan() / SeqNext(). >>> >>> The only way I can see to make this work, without a huge patch that adds >>> new code all over the place, is to instruct process termination from >>> inside ProcessInterrupts(). I'm siglongjmp-ing out of the ExecutorRun() >>> function so that all parallel worker cleanup code still runs as if the >>> worker processed to completion. I've tried to end the process without >>> but that caused all sorts of fallout (instrumentation not collected, >>> postmaster thinking the process stopped unexpectedly, ...). >>> >>> Instead of siglongjmp-ing we could maybe call some parallel worker >>> shutdown function but that would require access to the parallel worker >>> state variables, which are currently not globally accessible. >>> >> >> But why? The leader and workers already share state - the parallel scan >> state (for the parallel-aware scan on the "driving" table). Why couldn't >> the leader set a flag in the scan, and force it to end in workers? Which >> AFAICS should lead to workers terminating shortly after that. >> >> All the code / handling is already in place. It will need a bit of new >> code in the parallel scans, but but not much I think. >> > > But this would only work for the SeqScan case, wouldn't it? The parallel > worker might equally well be executing other code which doesn't produce > tuples, such as parallel index scan, a big sort, building a hash table, etc. > > I thought this is not a viable solution because it would need changes in > all these places. It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. I don't think you'd need code in other plans, because all parallel plans have one "driving" table. Maybe that's not enough, not sure. If we want to terminate "immediately" (and not when getting the next tuple from a scan on the driving table), we'd need a different solution. regards -- Tomas Vondra
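As a stand-in for what "set a flag in the shared scan state" could look like (C11 atomics and threads instead of PostgreSQL shared memory and parallel scan descriptors; every name here is illustrative, not proposed code): the leader flips one shared flag once it has its LIMIT rows, and each worker's scan loop polls it.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

static atomic_bool scan_finished;   /* stand-in for a flag in the shared scan state */

static void *
worker(void *arg)
{
    long scanned = 0;

    /* Stand-in for the per-tuple loop of a parallel-aware scan. */
    while (!atomic_load(&scan_finished))
    {
        scanned++;                  /* pretend to look at one tuple */
        usleep(10);
    }
    printf("worker %ld stopped early after %ld tuples\n",
           (long) (intptr_t) arg, scanned);
    return NULL;
}

int
main(void)
{
    pthread_t workers[4];

    for (long i = 0; i < 4; i++)
        pthread_create(&workers[i], NULL, worker, (void *) (intptr_t) i);

    usleep(100 * 1000);             /* leader "collects" its LIMIT rows */
    atomic_store(&scan_finished, true);

    for (int i = 0; i < 4; i++)
        pthread_join(workers[i], NULL);
    return 0;
}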
On 18.11.2025 16:40, Tomas Vondra wrote: >>> >>> But why? The leader and workers already share state - the parallel scan >>> state (for the parallel-aware scan on the "driving" table). Why couldn't >>> the leader set a flag in the scan, and force it to end in workers? Which >>> AFAICS should lead to workers terminating shortly after that. >>> >>> All the code / handling is already in place. It will need a bit of new >>> code in the parallel scans, but but not much I think. >>> >> >> But this would only work for the SeqScan case, wouldn't it? The parallel >> worker might equally well be executing other code which doesn't produce >> tuples, such as parallel index scan, a big sort, building a hash table, etc. >> >> I thought this is not a viable solution because it would need changes in >> all these places. > > It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. > I don't think you'd need code in other plans, because all parallel plans > have one "driving" table. > > Maybe that's not enough, not sure. If we want to terminate "immediately" > (and not when getting the next tuple from a scan on the driving table), > we'd need a different solution. A sort node for example makes this no longer work. As soon as the sort node pulled all rows from its driving table, the sort node becomes the driving table for its parent nodes. If no more tables are involved in the plan from that point on, early termination no longer works. -- David Geier
David Geier <geidav.pg@gmail.com> writes:
> On 18.11.2025 16:40, Tomas Vondra wrote:
>> It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index.
>> I don't think you'd need code in other plans, because all parallel plans
>> have one "driving" table.
> A sort node for example makes this no longer work. As soon as the sort
> node pulled all rows from its driving table, the sort node becomes the
> driving table for its parent nodes. If no more tables are involved in
> the plan from that point on, early termination no longer works.
You're assuming that the planner will insert Gather nodes at arbitrary
places in the plan, which isn't true. If it does generate plans that
are problematic from this standpoint, maybe the answer is "don't
parallelize in exactly that way".
regards, tom lane
On 11/18/25 17:51, Tom Lane wrote: > David Geier <geidav.pg@gmail.com> writes: >> On 18.11.2025 16:40, Tomas Vondra wrote: >>> It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. >>> I don't think you'd need code in other plans, because all parallel plans >>> have one "driving" table. > >> A sort node for example makes this no longer work. As soon as the sort >> node pulled all rows from its driving table, the sort node becomes the >> driving table for its parent nodes. If no more tables are involved in >> the plan from that point on, early termination no longer works. > > You're assuming that the planner will insert Gather nodes at arbitrary > places in the plan, which isn't true. If it does generate plans that > are problematic from this standpoint, maybe the answer is "don't > parallelize in exactly that way". > I think David has a point that nodes that "buffer" tuples (like Sort or HashAgg) would break the approach making this the responsibility of the parallel-aware scan. I don't see anything particularly wrong with such plans - plans with partial aggregation often look like that. Maybe this should be the responsibility of execProcnode.c, not the various nodes? It'd be nice to show this in EXPLAIN (that some of the workers were terminated early, before processing all the data). regards -- Tomas Vondra
On 18.11.2025 18:31, Tomas Vondra wrote: > On 11/18/25 17:51, Tom Lane wrote: >> David Geier <geidav.pg@gmail.com> writes: >>> On 18.11.2025 16:40, Tomas Vondra wrote: >>>> It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. >>>> I don't think you'd need code in other plans, because all parallel plans >>>> have one "driving" table. >> >> You're assuming that the planner will insert Gather nodes at arbitrary >> places in the plan, which isn't true. If it does generate plans that >> are problematic from this standpoint, maybe the answer is "don't >> parallelize in exactly that way". >> > > I think David has a point that nodes that "buffer" tuples (like Sort or > HashAgg) would break the approach making this the responsibility of the > parallel-aware scan. I don't see anything particularly wrong with such > plans - plans with partial aggregation often look like that. > > Maybe this should be the responsibility of execProcnode.c, not the > various nodes? > I like that idea, even though it would still not work while a node is doing the crunching. That is after it has pulled all rows and before it can return the first row. During this time the node won't call ExecProcNode(). But that seems like an acceptable limitation. At least it keeps working above "buffer" nodes. I'll give this idea a try. Then we can contrast this approach with the approach in my initial patch. > It'd be nice to show this in EXPLAIN (that some of the workers were > terminated early, before processing all the data). Inspectability on that end seems useful. Maybe only with VERBOSE, similarly to the extended per-worker information. -- David Geier
On 11/18/25 19:35, David Geier wrote: > > On 18.11.2025 18:31, Tomas Vondra wrote: >> On 11/18/25 17:51, Tom Lane wrote: >>> David Geier <geidav.pg@gmail.com> writes: >>>> On 18.11.2025 16:40, Tomas Vondra wrote: >>>>> It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. >>>>> I don't think you'd need code in other plans, because all parallel plans >>>>> have one "driving" table. >>> >>> You're assuming that the planner will insert Gather nodes at arbitrary >>> places in the plan, which isn't true. If it does generate plans that >>> are problematic from this standpoint, maybe the answer is "don't >>> parallelize in exactly that way". >>> >> >> I think David has a point that nodes that "buffer" tuples (like Sort or >> HashAgg) would break the approach making this the responsibility of the >> parallel-aware scan. I don't see anything particularly wrong with such >> plans - plans with partial aggregation often look like that. >> >> Maybe this should be the responsibility of execProcnode.c, not the >> various nodes? >> > > I like that idea, even though it would still not work while a node is > doing the crunching. That is after it has pulled all rows and before it > can return the first row. During this time the node won't call > ExecProcNode(). > True. Perhaps we could provide a function nodes could call in suitable places to check whether to end? Actually, how does canceling queries with parallel workers work? Is that done similarly to what your patch did? > But that seems like an acceptable limitation. At least it keeps working > above "buffer" nodes. > > I'll give this idea a try. Then we can contrast this approach with the > approach in my initial patch. > >> It'd be nice to show this in EXPLAIN (that some of the workers were >> terminated early, before processing all the data). > > Inspectability on that end seems useful. Maybe only with VERBOSE, > similarly to the extended per-worker information. > Maybe, no opinion. But it probably needs to apply to all nodes in the parallel worker, right? Or maybe it's even a per-worker detail. regards -- Tomas Vondra
On 18.11.2025 20:37, Tomas Vondra wrote: > > > On 11/18/25 19:35, David Geier wrote: >> >> On 18.11.2025 18:31, Tomas Vondra wrote: >>> On 11/18/25 17:51, Tom Lane wrote: >>>> David Geier <geidav.pg@gmail.com> writes: >>>>> On 18.11.2025 16:40, Tomas Vondra wrote: >>>>>> It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. >>>>>> I don't think you'd need code in other plans, because all parallel plans >>>>>> have one "driving" table. >>>> >>>> You're assuming that the planner will insert Gather nodes at arbitrary >>>> places in the plan, which isn't true. If it does generate plans that >>>> are problematic from this standpoint, maybe the answer is "don't >>>> parallelize in exactly that way". >>>> >>> >>> I think David has a point that nodes that "buffer" tuples (like Sort or >>> HashAgg) would break the approach making this the responsibility of the >>> parallel-aware scan. I don't see anything particularly wrong with such >>> plans - plans with partial aggregation often look like that. >>> >>> Maybe this should be the responsibility of execProcnode.c, not the >>> various nodes? I hadn't realized that this approach has the same limitation: ExecProcNode() is only called when e.g. heap_getnextslot() or index_getnext_slot() have found a qualifying tuple. Otherwise, they just keep crunching without returning. >> I like that idea, even though it would still not work while a node is >> doing the crunching. That is after it has pulled all rows and before it >> can return the first row. During this time the node won't call >> ExecProcNode(). >> > True. Perhaps we could provide a function nodes could call in suitable > places to check whether to end? This function would then also be required by the base relation scans. And we would have to call it more or less in the same places CHECK_FOR_INTERRUPTS() is called today. Beyond that, code such as heap_getnextslot() or index_getnext_slot() doesn't have access to the PlanState which would contain the stop flag. So that would have to be propagated downwards as well. All of that would make for a fairly big patch, which the initial patch avoids. > > Actually, how does canceling queries with parallel workers work? Is that > done similarly to what your patch did? Parallel workers use the same mechanism as normal backends, except that parallel workers quit instead of waiting for the next query. The flow is as follows: parallel workers catch SIGINT via StatementCancelHandler() which sets QueryCancelPending = true. When ProcessInterrupts() is called the next time, it will elog(ERROR) out. BackgroundWorkerMain() will catch the error and proc_exit(). This mechanism is very similar to what I have in my patch, with the difference that (1) I use SendProcSignal() to inform the workers to stop and (2) that I added another sigsetjmp target around ExecutorRun() to be able to bail but still call all the shutdown code. (1) is necessary to be able to distinguish between query cancel and early stopping but not cancelling. (2) is necessary because the parallel shutdown code needs to be called before exiting. I tried to piggy back on the existing error handling mechanism by siglongjmp(*PG_exception_stack, 1) and skipping EmitErrorReport() there if got_stopped == true. That gave me the following error: ERROR: lost connection to parallel worker >> Inspectability on that end seems useful. Maybe only with VERBOSE, >> similarly to the extended per-worker information.
> Maybe, no opinion. But it probably needs to apply to all nodes in the > parallel worker, right? Or maybe it's even a per-worker detail. I thought of making it a per-worker detail such as time or number of rows returned. Let's discuss this again, once we've settled on a solution. -- David Geier
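For readers following along, a rough sketch of the mechanism described above. The names are invented here (only got_stopped is mentioned in the mail), this is not the actual patch, and the follow-ups below discuss why jumping out of the executor like this turns out to be problematic:

#include "postgres.h"
#include <setjmp.h>
#include <signal.h>
#include "executor/executor.h"

static sigjmp_buf parallel_stop_jmp;
static volatile sig_atomic_t got_stopped = false;

/* set from the procsignal handler when the leader requests an early stop */
void
HandleParallelWorkerStopSignal(void)
{
	got_stopped = true;
}

/* called from CHECK_FOR_INTERRUPTS()-like safe points during execution */
void
BailOutIfStopped(void)
{
	if (got_stopped)
		siglongjmp(parallel_stop_jmp, 1);
}

static void
RunPlanInParallelWorker(QueryDesc *queryDesc)
{
	if (sigsetjmp(parallel_stop_jmp, 0) == 0)
	{
		/* argument list as on current master; older branches also take an
		 * execute_once flag */
		ExecutorRun(queryDesc, ForwardScanDirection, 0);
	}

	/*
	 * Reached after a normal finish as well as after an early bail-out,
	 * so instrumentation is collected and the tuple queue is detached
	 * through the regular shutdown path.
	 */
	ExecutorFinish(queryDesc);
	ExecutorEnd(queryDesc);
}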
On 11/19/25 13:28, David Geier wrote: > On 18.11.2025 20:37, Tomas Vondra wrote: >> >> >> On 11/18/25 19:35, David Geier wrote: >>> >>> On 18.11.2025 18:31, Tomas Vondra wrote: >>>> On 11/18/25 17:51, Tom Lane wrote: >>>>> David Geier <geidav.pg@gmail.com> writes: >>>>>> On 18.11.2025 16:40, Tomas Vondra wrote: >>>>>>> It'd need code in the parallel-aware scans, i.e. seqscan, bitmap, index. >>>>>>> I don't think you'd need code in other plans, because all parallel plans >>>>>>> have one "driving" table. >>>>> >>>>> You're assuming that the planner will insert Gather nodes at arbitrary >>>>> places in the plan, which isn't true. If it does generate plans that >>>>> are problematic from this standpoint, maybe the answer is "don't >>>>> parallelize in exactly that way". >>>>> >>>> >>>> I think David has a point that nodes that "buffer" tuples (like Sort or >>>> HashAgg) would break the approach making this the responsibility of the >>>> parallel-aware scan. I don't see anything particularly wrong with such >>>> plans - plans with partial aggregation often look like that. >>>> >>>> Maybe this should be the responsibility of execProcnode.c, not the >>>> various nodes? > I hadn't realized that this approach has the same limitation: > ExecProcNode() is only called when e.g. heap_getnextslot() or > index_getnext_slot() have found a qualifying tuple. Otherwise, they just > keep crunching without returning. > Right, that's why I suggested to have a function the nodes would call in suitable places. >>> I like that idea, even though it would still not work while a node is >>> doing the crunching. That is after it has pulled all rows and before it >>> can return the first row. During this time the node won't call >>> ExecProcNode(). >>> >> True. Perhaps we could provide a function nodes could call in suitable >> places to check whether to end? > This function would then also be required by the base relation scans. > And we would have to call it more or less in the same places > CHECK_FOR_INTERRUPTS() is called today. > Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to manipulate the executor state. Maybe you could do some magic with siglongjmp(), but I have a "funny" feeling about that - I wouldn't be surprised if that interfered with elog(), which is the only other place using siglongjmp() AFAICS. Which is why I suggested maybe it should be handled in execProcnode (which would take care of cases where we produce a tuple), and then let nodes optionally check the flag too (through a new function). I haven't tried doing this, so maybe I'm missing something ... > Beyond that, code such as heap_getnextslot() or index_getnext_slot() don't > have access to the PlanState which would contain the stop flag. So that > would have to be propagated downwards as well. > > All of that would make for a fairly big patch, which the initial patch > avoids. > Right. I don't think we can set the flag in plan/executor state, because that's not available in signal handler / ProcessInterrupts() ... It'd need to be a global variable, I guess. >> >> Actually, how does canceling queries with parallel workers work? Is that >> done similarly to what your patch did? > Parallel workers use the same mechanism as normal backends, except that > parallel workers quit instead of waiting for the next query. > > The flow is as follows: parallel workers catch SIGINT via > StatementCancelHandler() which sets QueryCancelPending = true. When > ProcessInterrupts() is called the next time, it will elog(ERROR) out.
> BackgroundWorkerMain() will catch the error and proc_exit(). > > This mechanism is very similar to what I have in my patch, with the > difference that (1) I use SendProcSignal() to inform the workers to stop > and (2) that I added another sigsetjmp target around ExecutorRun() to be > able to bail but still call all the shutdown code. > OK > (1) is necessary to be able to distinguish between query cancel and > early stopping but not cancelling. > > (2) is necessary because the parallel shutdown code needs to be called > before exiting. I tried to piggy back on the existing error handling > mechanism by siglongjmp(*PG_exception_stack, 1) and there to not calling > EmitErrorReport() if got_stopped == true. That gave me the following error: > > ERROR: lost connection to parallel worker > Not sure. I have my doubts about using siglongjmp() for this. >>> Inspectability on that end seems useful. Maybe only with VERBOSE, >>> similarly to the extended per-worker information. >>> >> >> Maybe, no opinion. But it probably needs to apply to all nodes in the >> parallel worker, right? Or maybe it's even a per-worker detail. > I thought to make it a per-worker detail such as time or number of rows > returned. Let's discuss this again, once we've settled on a solution. > OK regards -- Tomas Vondra
On 19.11.2025 21:03, Tomas Vondra wrote: > Right, that's why I suggested to have a function the nodes would call in > suitable places. > >>>> I like that idea, even though it would still not work while a node is >>>> doing the crunching. That is after it has pulled all rows and before it >>>> can return the first row. During this time the node won't call >>>> ExecProcNode(). >>>> >>> True. Perhaps we could provide a function nodes could call in suitable >>> places to check whether to end? >> This function would then also be required by the base relation scans. >> And we would have to call it more or less in the same places >> CHECK_FOR_INTERRUPTS() is called today. >> > > Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to > manipulate the executor state. Maybe you could do some magic with > siglongjmp(), but I have "funny" feeling about that - I wouldn't be > surprised if that interfered with elog(), which is the only other place > using siglongjmp() AFAICS. You had the right intuition. siglongjmp-ing out leaves per-node instrumentation state and CurrentMemoryContext in an unexpected state. Example: jumping out of the executor after we've called InstrStartNode() but before we call InstrStopNode(). Subsequently calling InstrEndLoop() will give the error you encountered. A similar problem exists for CurrentMemoryContext, which is not properly reset. I didn't encounter these issues during my testing because they're largely timing dependent: execution can end before the other workers have started executing, so the stopping logic didn't kick in. Both issues can be accounted for when jumping out, but this seems somewhat hacky. > Which is why I suggested maybe it should be handled in execProcnode > (which would take care of cases where we produce a tuple), and then let > nodes to optionally check the flag too (through a new function). > > I haven't tried doing this, so maybe I'm missing something ... > >> Beyond that, code such as heap_getnextslot() or index_getnext_slot() don't >> have access to the PlanState which would contain the stop flag. So that >> would have to be propagated downwards as well. >> >> All of that would make for a fairly big patch, which the initial patch >> avoids. This turned out to be false. See below. > Right. I don't think we can set the flag in plan/executor state, because > that's not available in signal handler / ProcessInterrupts() ... It'd > need to be a global variable, I guess. What we can do is use a global variable. That also makes checking the flag a lot easier because we don't need to pass it around through multiple abstraction layers. What needs to be taken care of, though, is to only bail from scans that are actually initiated by plan nodes. There are many places in the code that use e.g. the table AM API directly. We don't want to bail from these scans. Without a way to flag whether a scan should bail, e.g. ScanPgRelation() would return no tuple and the relcache code would start failing. The new patch accounts for that by introducing a new TableScanDescData flag SO_OBEY_PARALLEL_WORKER_STOP, which indicates whether the scan should honor a parallel worker stop request. Stopping is broadcast to all parallel workers via SendProcSignal(). The stop variable is checked with the new CHECK_FOR_PARALLEL_WORKER_STOP() macro. In the PoC implementation I've only changed nodeSeqScan.c for now. If there's agreement to pursue this approach, I'll change the other places as well. Naming can likely still be improved. -- David Geier
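Since the PoC patch itself is in the attachment, here is only a rough sketch of how the described mechanism might look; everything below is guessed from the description, and the names, the bit position of the flag, and the call sites may differ from the actual patch:

#include "postgres.h"
#include <signal.h>
#include "access/tableam.h"
#include "nodes/execnodes.h"

/* set from the procsignal handler when a stop is requested */
extern volatile sig_atomic_t ParallelWorkerStopPending;

/* new ScanOptions bit; the bit position is arbitrary in this sketch */
#define SO_OBEY_PARALLEL_WORKER_STOP	(1 << 9)

/* only scans opted in by plan nodes bail out; catalog scans keep running */
#define CHECK_FOR_PARALLEL_WORKER_STOP(scan) \
	(ParallelWorkerStopPending && \
	 ((scan)->rs_flags & SO_OBEY_PARALLEL_WORKER_STOP))

/* e.g. in nodeSeqScan.c: report "no more tuples" once a stop is pending */
static TupleTableSlot *
SeqNextSketch(SeqScanState *node)
{
	/* assume the scan descriptor has already been set up */
	TableScanDesc scan = node->ss.ss_currentScanDesc;
	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
	EState	   *estate = node->ss.ps.state;

	if (CHECK_FOR_PARALLEL_WORKER_STOP(scan))
		return NULL;	/* behave as if the scan were exhausted */

	if (table_scan_getnextslot(scan, estate->es_direction, slot))
		return slot;

	return NULL;
}

The new flag would be set only for scans started by plan nodes, so direct users of the table AM API (catalog lookups and the like) are unaffected.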
On 26.11.2025 09:15, David Geier wrote: > What we can do is use a global variable. That also makes checking the > flag a lot easier because we don't need to pass it around through > multiple abstraction layers. > > What needs to be taken care of though is to only bail from scans that > are actually initiated from plan nodes. There are many places in the > code that use e.g. the table AM API directly. We don't want to bail from > these scans. Without flagging if a scan should bail or not, e.g. > ScanPgRelation() will return no tuple and therefore relcache code starts > failing. > > The new patch accounts for that by introducing a new TableScanDescData > flag SO_OBEY_PARALLEL_WORKER_STOP, which indicates if the scan should > adhere to the parallel worker stop or not. > > Stopping is broadcasted to all parallel workers via SendProcSignal(). > The stop variable is checked with the new > CHECK_FOR_PARALLEL_WORKER_STOP() macro. > > In the PoC implementation I've for now only changed nodeSeqScan.c. If > there's agreement to pursue this approach, I'll change the other places > as well. Naming can also likely be still improved. I missed attaching the example I used for testing. CREATE TABLE bar (col INT); INSERT INTO bar SELECT generate_series(1, 50000000); ANALYZE bar; SET parallel_leader_participation = OFF; SET synchronize_seqscans = OFF; EXPLAIN ANALYZE VERBOSE SELECT col FROM bar WHERE col = 1 LIMIT 1; I disabled synchronize_seqscans to make the test deterministic. I disabled parallel_leader_participation to make sure one of the workers finds the first row and quits. Running with parallel_leader_participation enabled revealed one more issue: if the leader finds the row before the parallel workers have started up, the stop signal is lost and the workers don't stop early. Instead of SendProcSignal() we can store a flag in shared memory that indicates that the leader already has enough rows. I'll give this approach a try. -- David Geier
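For illustration, a sketch of what the shared-memory flag could look like (names invented here; where exactly it lives in the per-query DSM segment is an open design question). Unlike a signal, the flag persists, so a worker that only attaches after the leader has already satisfied the LIMIT still sees it on its first check:

#include "postgres.h"
#include "port/atomics.h"

/* would be placed in the DSM segment set up for the parallel query and
 * initialized with pg_atomic_init_u32(&flag->stop, 0) by the leader */
typedef struct ParallelStopFlag
{
	pg_atomic_uint32	stop;	/* 0 = keep running, 1 = stop early */
} ParallelStopFlag;

/* leader side, e.g. once the Gather node has produced enough rows */
static void
RequestParallelWorkerStop(ParallelStopFlag *flag)
{
	pg_atomic_write_u32(&flag->stop, 1);
}

/* worker side, polled from CHECK_FOR_PARALLEL_WORKER_STOP() */
static inline bool
ParallelWorkerShouldStop(ParallelStopFlag *flag)
{
	return pg_atomic_read_u32(&flag->stop) != 0;
}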
On 11/26/25 09:15, David Geier wrote: > On 19.11.2025 21:03, Tomas Vondra wrote: > >> Right, that's why I suggested to have a function the nodes would call in >> suitable places. >> >>>>> I like that idea, even though it would still not work while a node is >>>>> doing the crunching. That is after it has pulled all rows and before it >>>>> can return the first row. During this time the node won't call >>>>> ExecProcNode(). >>>>> >>>> True. Perhaps we could provide a function nodes could call in suitable >>>> places to check whether to end? >>> This function would then also be required by the base relation scans. >>> And we would have to call it more or less in the same places >>> CHECK_FOR_INTERRUPTS() is called today. >>> >> >> Yes, but I don't think CHECK_FOR_INTERRUPTS() would be a good place to >> manipulate the executor state. Maybe you could do some magic with >> siglongjmp(), but I have "funny" feeling about that - I wouldn't be >> surprised if that interfered with elog(), which is the only other place >> using siglongjmp() AFAICS. > You had the right intuition. siglongjmp-ing out leaves behind per-node > instrumentation state and CurrentMemoryContext in an unexpected state. > > Example: jumping out of the executor, after we've called > InstrStartNode(), but before we call InstrStopNode(). Subsequently > calling InstrEndLoop() will give the error you encountered. A similar > problem exists for CurrentMemoryContext which is not properly reset. > > I didn't encounter these issues during my testing because they're > largely timing dependent. Execution can end before the other workers > have started executing. So the stopping logic didn't kick in. > > Both issues can be accounted for when jumping out but this seems > somewhat hacky. > The question is if these are the only two such issues possible, and I'm afraid the answer is "no" :-( The question is whether "exiting" from any place calling CFI leaves the execution state in a valid state. Valid enough that we can call ExecEndNode() on all the nodes, including the one from which we exited. But I don't think we can rely on that. A node can do multiple steps, interleaved with CFI calls, without expecting that execution stops partway through. I assume this would cause a lot of issues ... >> Which is why I suggested maybe it should be handled in execProcnode >> (which would take care of cases where we produce a tuple), and then let >> nodes to optionally check the flag too (through a new function). >> >> I haven't tried doing this, so maybe I'm missing something ... >> >>> Beyond that, code such as heap_getnextslot() or index_getnext_slot() don't >>> have access to the PlanState which would contain the stop flag. So that >>> would have to be propagated downwards as well. >>> >>> All of that would make for a fairly big patch, which the initial patch >>> avoids. > This turned out to be false. See below. > >> Right. I don't think we can set the flag in plan/executor state, because >> that's not available in signal handler / ProcessInterrupts() ... It'd >> need to be a global variable, I guess. > What we can do is use a global variable. That also makes checking the > flag a lot easier because we don't need to pass it around through > multiple abstraction layers. > > What needs to be taken care of though is to only bail from scans that > are actually initiated from plan nodes. There are many places in the > code that use e.g. the table AM API directly. We don't want to bail from > these scans. Without flagging if a scan should bail or not, e.g.
> ScanPgRelation() will return no tuple and therefore relcache code starts > failing. > > The new patch accounts for that by introducing a new TableScanDescData > flag SO_OBEY_PARALLEL_WORKER_STOP, which indicates if the scan should > adhere to the parallel worker stop or not. > > Stopping is broadcasted to all parallel workers via SendProcSignal(). > The stop variable is checked with the new > CHECK_FOR_PARALLEL_WORKER_STOP() macro. > > In the PoC implementation I've for now only changed nodeSeqScan.c. If > there's agreement to pursue this approach, I'll change the other places > as well. Naming can also likely be still improved. > This assumes we need to "exit" only from a heapam scan. That's true for the example, but is that enough in general? What if the worker already finished its plan, and is now busy doing something else expensive? Could be a big sort, aggregation, ... Can we do something about these cases too? regards -- Tomas Vondra