Re: PostgreSQL as advanced job queuing system

Поиск
Список
Период
Сортировка
От Peter J. Holzer
Тема Re: PostgreSQL as advanced job queuing system
Дата
Msg-id 20240322130624.s6wcjrmwmmkkdchu@hjp.at
обсуждение исходный текст
Ответ на PostgreSQL as advanced job queuing system  (ushi <ushi@mailbox.org>)
Список pgsql-general
On 2024-03-22 12:43:40 +0100, ushi wrote:
> i am playing with the idea to implement a job queuing system using
> PostgreSQL. To meet requirements the system needs to offer some advanced
> features compared to "classic" queuing systems:
>
> - users can create new queues at any time
> - queues have priorities
> - priorities of queues can change at any time
> - jobs in queues with the highest priority should be processed first
>
> A simple schema could look like this:
>
>    create table queues (
>      id integer primary key,
>      priority integer not null default 0
>    );
>
>    create table jobs (
>      id serial primary key,
>      queue_id integer not null references queues(id)
>    );
>
> Right now i am stuck writing an efficient query for dequeuing. This is what i got so far:
>
>    insert into queues (id, priority) values (1, 0), (2, 1), (3, 1);
>    insert into jobs (queue_id) select 1 from generate_series(1, 1000000);
>    insert into jobs (queue_id) select 2 from generate_series(1, 1000000);
>    insert into jobs (queue_id) select 3 from generate_series(1, 1000000);
>
[...]
>
> This is my naive query obeying priority. This is very slow and i could not get it any faster:
>
>    select *
>    from jobs
>    join queues on queues.id = jobs.queue_id
>    order by priority desc
>    limit 1
>    for update of jobs skip locked;
>    --    id    | queue_id | id | priority
>    -- ---------+----------+----+----------
>    --  1000001 |        2 |  2 |        1
>    -- (1 row)
>    --
>    -- Time: 1116.631 ms (00:01.117)
>
> Here is the query plan:
>
>   --                                                                QUERY PLAN
>   --
>
-----------------------------------------------------------------------------------------------------------------------------------------
>   --  Limit  (cost=66225.61..66225.63 rows=1 width=28) (actual time=1333.139..1333.142 rows=1 loops=1)
>   --    ->  LockRows  (cost=66225.61..103725.61 rows=3000000 width=28) (actual time=1333.137..1333.139 rows=1
loops=1)
>   --          ->  Sort  (cost=66225.61..73725.61 rows=3000000 width=28) (actual time=1333.110..1333.112 rows=1
loops=1)
>   --                Sort Key: queues.priority DESC
>   --                Sort Method: external merge  Disk: 111568kB
                                   ^^^^^^^^^^^^^^^^^^^^
                                   You might want to increase work_mem.
>   --                ->  Hash Join  (cost=60.85..51225.61 rows=3000000 width=28) (actual time=0.064..660.317
rows=3000000loops=1) 
[...]
>   --  Planning Time: 0.430 ms
>   --  Execution Time: 1347.448 ms
>   -- (13 rows)

Is 3 queues with 1 million jobs each a realistic scenario in your case?

If you have 3000 queues with 1000 jobs each instead the planner can use
an index on queues(priority):

hjp=> explain(analyze)   select *
   from jobs
   join queues on queues.id = jobs.queue_id
   order by priority desc, jobs.id asc
   limit 1
;

╔═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                         QUERY PLAN
                                     ║ 

╟─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║ Limit  (cost=1222.42..1222.47 rows=1 width=16) (actual time=9.128..9.129 rows=1 loops=1)
                                     ║ 
║   ->  Incremental Sort  (cost=1222.42..159673.21 rows=3000000 width=16) (actual time=9.127..9.127 rows=1 loops=1)
                                     ║ 
║         Sort Key: queues.priority DESC, jobs.id
                                     ║ 
║         Presorted Key: queues.priority
                                     ║ 
║         Full-sort Groups: 1  Sort Method: top-N heapsort  Average Memory: 25kB  Peak Memory: 25kB
                                     ║ 
║         Pre-sorted Groups: 1  Sort Method: top-N heapsort  Average Memory: 25kB  Peak Memory: 25kB
                                     ║ 
║         ->  Nested Loop  (cost=0.71..107171.21 rows=3000000 width=16) (actual time=0.037..5.824 rows=27001 loops=1)
                                     ║ 
║               ->  Index Scan Backward using queues_priority_idx on queues  (cost=0.28..121.21 rows=3000 width=8)
(actualtime=0.018..0.059 rows=28 loops=1) ║ 
║               ->  Index Only Scan using jobs_queue_id_id_idx on jobs  (cost=0.43..25.68 rows=1000 width=8) (actual
time=0.011..0.119rows=964 loops=28)     ║ 
║                     Index Cond: (queue_id = queues.id)
                                     ║ 
║                     Heap Fetches: 0
                                     ║ 
║ Planning Time: 0.362 ms
                                     ║ 
║ Execution Time: 9.158 ms
                                     ║ 

╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
(13 rows)

hjp=> \d queues
                 Table "public.queues"
╔══════════╤═════════╤═══════════╤══════════╤═════════╗
║  Column  │  Type   │ Collation │ Nullable │ Default ║
╟──────────┼─────────┼───────────┼──────────┼─────────╢
║ id       │ integer │           │ not null │         ║
║ priority │ integer │           │ not null │ 0       ║
╚══════════╧═════════╧═══════════╧══════════╧═════════╝
Indexes:
    "queues_pkey" PRIMARY KEY, btree (id)
    "queues_priority_idx" btree (priority)
Referenced by:
    TABLE "jobs" CONSTRAINT "jobs_queue_id_fkey" FOREIGN KEY (queue_id) REFERENCES queues(id)

hjp=> \d jobs
                              Table "public.jobs"
╔══════════╤═════════╤═══════════╤══════════╤══════════════════════════════════╗
║  Column  │  Type   │ Collation │ Nullable │             Default              ║
╟──────────┼─────────┼───────────┼──────────┼──────────────────────────────────╢
║ id       │ integer │           │ not null │ nextval('jobs_id_seq'::regclass) ║
║ queue_id │ integer │           │ not null │                                  ║
╚══════════╧═════════╧═══════════╧══════════╧══════════════════════════════════╝
Indexes:
    "jobs_pkey" PRIMARY KEY, btree (id)
    "jobs_queue_id_id_idx" btree (queue_id, id)
    "jobs_queue_id_idx" btree (queue_id)
Foreign-key constraints:
    "jobs_queue_id_fkey" FOREIGN KEY (queue_id) REFERENCES queues(id)


If you do have very few very long queues it might be faster to query
each queue separately.

        hp

--
   _  | Peter J. Holzer    | Story must make more sense than reality.
|_|_) |                    |
| |   | hjp@hjp.at         |    -- Charles Stross, "Creative writing
__/   | http://www.hjp.at/ |       challenge!"

Вложения

В списке pgsql-general по дате отправления:

Предыдущее
От: Laurenz Albe
Дата:
Сообщение: Re: uncommitted xmin 3100586 from before xid cutoff 10339367 needs to be frozen
Следующее
От: Fred Habash
Дата:
Сообщение: Re: PostgreSQL as advanced job queuing system