Discussion: easy task: concurrent select-updates
Hi All,

I have a trivial task. There is a table holding a message queue, let's say "msg_queue". There are a few processes, and each of them takes one message from this table at a time to transmit into a communication channel. I've done it my way, but I get PostgreSQL messages about deadlocks and a lot of warnings.

In my program, every process does approximately the following:

SELECT ... FROM msg_queue WHERE busy = false ORDER BY ... LIMIT 1;
if a message was found:
BEGIN;
SELECT id FROM msg_queue WHERE id = ... AND busy = false FOR SHARE;
UPDATE msg_queue SET busy = true, channel_id = ... WHERE id = ... AND busy = false;
COMMIT;

I do understand that this way is stupid, but I have not come up with anything else yet. Could somebody share ideas on how to do this so that the same message is 100% guaranteed NOT to be transmitted over two or more channels?
Sorry for the newbie question!

Best regards, Nick.
one important addition: the message cannot be removed from queue table until it is transmitted, so DELETE is not an option :)
Nickolay wrote:
> one important addition: the message cannot be removed from queue table
> until it is transmitted, so DELETE is not an option :)
how about this:
andy=# create table msg (id integer, busy boolean, message text);
CREATE TABLE
andy=# insert into msg values (1, false, 'message one');
INSERT 0 1
andy=# insert into msg values (2, false, 'message two');
INSERT 0 1
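CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql
AS $function$
declare
    rec record;
begin
    -- walk the free messages in id order
    for rec in select id from msg where busy = false order by id loop
        -- the repeated busy = false check makes the claim atomic: if another
        -- session took this row first, no row is updated and found is false
        update msg set busy = true where id = rec.id and busy = false;
        if found then
            return rec.id;
        end if;
    end loop;
    -- no free message
    return -1;
end;
$function$;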
It returns -1 if no message is found. Not 100% sure, but a quick two-session test seemed to work.
-Andy
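The two-session check mentioned above can be sketched roughly as follows. This is only an illustration: it assumes the two rows inserted into msg above, the default READ COMMITTED isolation level, and two separate psql sessions (labelled A and B here). Under REPEATABLE READ or SERIALIZABLE, session B would most likely fail with a serialization error instead of moving on.

```sql
-- Session A
begin;
select getmsg();   -- claims message 1; the row stays locked until A commits

-- Session B, started while A's transaction is still open
begin;
select getmsg();   -- blocks inside the function on the update of row 1

-- Session A
commit;

-- Session B: the wait ends, the update on id = 1 matches no row
-- (busy is already true), the loop moves on and the call returns 2
commit;
```

If session A rolls back instead of committing, session B's update on row 1 succeeds and B gets message 1.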
I think you could also do something roughly similar in a single statement by using a RETURNING clause on the update, such as:

update msg set busy = true
where id = (select min(id) from msg where busy = false)
returning *;

Cheers,
Kevin
Kevin McConnell wrote:
> I think you could also do something roughly similar in a statement by
> using a RETURNING clause on the update, such as:
>
> update msg set busy = true where id = (select min(id) from msg where
> busy = false) returning *;

I had thought of that, but you'd need to add one thing in the update, ' and busy = false ', cuz two people may get the same id from the select min(id):

update msg set busy = true
where busy = false
  and id = (select min(id) from msg where busy = false)
returning *;

But then you'd have to fire it over and over until you actually got a row updated.

It seemed easier to put the loop in a function; then you can:

select getmsg() as id;

-Andy
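As a side note, on PostgreSQL 9.5 or later the retry can usually be avoided by locking the candidate row in the subquery with FOR UPDATE SKIP LOCKED. This is a different technique from the ones proposed in this thread, shown here only as a sketch against the example msg table:

```sql
-- Claim the lowest-id free message in one statement.
-- Rows currently locked by another session are skipped instead of waited on,
-- so two sessions running this at the same time pick different rows.
update msg set busy = true
where id = (
    select id
    from msg
    where busy = false
    order by id
    limit 1
    for update skip locked
)
returning *;
```

The statement returns no rows when every free message is either taken or currently locked by another session, so the caller can treat an empty result as "nothing to do right now".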
Kevin McConnell wrote:
> I think you could also do something roughly similar in a statement by
> using a RETURNING clause on the update, such as:
>
> update msg set busy = true where id = (select min(id) from msg where
> busy = false) returning *;

Thank you guys! But what's min(id) for? Is it necessary? Is there any chance I can replace min(id) with LIMIT 1?

Best regards, Nick.
Andy Colson wrote:
> Kevin McConnell wrote:
>> I think you could also do something roughly similar in a statement by
>> using a RETURNING clause on the update, such as:
>>
>> update msg set busy = true where id = (select min(id) from msg where
>> busy = false) returning *;
>>
>
> I had thought of that, but you'd need to add one thing, in the update
> ' and busy = false ', cuz two people may get the same id from the
> select min(id).
>
> update msg set busy = true where busy = false and id = (select min(id)
> from msg where busy = false) returning *;
>
> but then you'd have to fire it over-and-over until you actually got a
> row updated.
>
> Seemed easier to put the loop in function, then you can:
>
> select getmsg() as id;
>
Thanks a lot for your solution! It works great for now.
Here is the thing I did following your advice:
CREATE TYPE queued_msg_row AS
(id bigint
,sender character varying
,"text" text
...
,msg_type integer);
CREATE OR REPLACE FUNCTION public.get_queued_msg
    (_route_id integer
    ,_channel_id integer)
RETURNS queued_msg_row LANGUAGE plpgsql
AS $function$
declare
    rec queued_msg_row;
begin
    for rec in SELECT id,sender,"text", ... , msg_type
               FROM msg_queue WHERE busy=false AND route_id=_route_id
               ORDER BY priority DESC, date_time ASC LIMIT 10 loop
        UPDATE msg_queue SET busy=true, channel_id=_channel_id
        WHERE id = rec.id AND busy=false;
        if found then
            return rec;
        end if;
    end loop;
    return NULL;
end;
$function$;
The only problem that remains is that this function returns an empty row
when it should return NULL (no row), but that's not a critical issue.
Best regards, Nick.
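On the empty-row issue: a function declared to return a single composite value always yields exactly one row when called in FROM, so a NULL result shows up as a row of NULLs. One possible way around it is to declare the function as RETURNS SETOF, which may return zero rows. The sketch below uses the simpler msg table from earlier in the thread rather than msg_queue; the function name getmsg_row is made up for illustration.

```sql
CREATE OR REPLACE FUNCTION getmsg_row() RETURNS SETOF msg LANGUAGE plpgsql
AS $function$
declare
    rec msg%rowtype;
begin
    for rec in select * from msg where busy = false order by id loop
        update msg set busy = true where id = rec.id and busy = false;
        if found then
            rec.busy := true;   -- reflect the update in the returned copy
            return next rec;    -- emit the claimed row
            return;             -- and stop after the first one
        end if;
    end loop;
    return;                     -- nothing claimed: the result set is empty
end;
$function$;

-- One row if a message was claimed, zero rows otherwise
select * from getmsg_row();
```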
Nickolay wrote:
> Thank you guys! But what's min(id) for? Is it necessary? Is there any
> chance I can replace min(id) with LIMIT 1?

min(id) finds the smallest id among the rows that are not busy. We made the assumption that you wanted to get the messages out ordered by id, from smallest to largest. LIMIT 1 would be OK if you didn't care what order the messages were processed in.

-Andy
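For what it's worth, LIMIT 1 only loses the ordering when it is used without ORDER BY. Combined with ORDER BY id it picks the same row as min(id), and it also extends to orderings that min() cannot express, such as the priority and date_time ordering used in get_queued_msg. A sketch against the example msg table:

```sql
-- Same effect as the min(id) version: the ORDER BY makes LIMIT 1 deterministic.
-- The outer busy = false check is still needed, and the statement may still
-- update no row if another session claimed the same message first.
update msg set busy = true
where busy = false
  and id = (select id from msg where busy = false order by id limit 1)
returning *;
```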