Обсуждение: easy task: concurrent select-updates
Hi All, I have a trivial task. There is a table with messages queue, let's say "msg_queue". There are a few processes and each of them is taking one message from this table at a time to transmit into communication channel. I've done it my way, but I have postgresql's messages about deadlocks and a lot of warnings. I my program, every process is doing approx the following procedure: SELECT ... FROM msg_queue WHERE busy = false ORDER BY ... LIMIT 1; if a message was found: BEGIN; SELECT id FROM msg_queue WHERE id = ... AND busy = false FOR SHARE; UPDATE msg_queue SET busy = true, channel_id = ... WHERE id = ... AND busy = false; COMMIT; I do understand that this way is stupid, but I have not came with anything else yet. Could somebody share ideas how to do this so the same message 100% WOULD NOT be transmitted over two or more channels. Sorry for the newbie question! Best regards, Nick.
one important addition: the message cannot be removed from queue table until it is transmitted, so DELETE is not an option :) > Hi All, > > I have a trivial task. There is a table with messages queue, let's say > "msg_queue". > There are a few processes and each of them is taking one message from > this table at a time to transmit into communication channel. > I've done it my way, but I have postgresql's messages about deadlocks > and a lot of warnings. > > I my program, every process is doing approx the following procedure: > SELECT ... FROM msg_queue WHERE busy = false ORDER BY ... LIMIT 1; > if a message was found: > BEGIN; > SELECT id FROM msg_queue WHERE id = ... AND busy = false FOR SHARE; > UPDATE msg_queue SET busy = true, channel_id = ... WHERE id = ... AND > busy = false; > COMMIT; > > > I do understand that this way is stupid, but I have not came with > anything else yet. > Could somebody share ideas how to do this so the same message 100% > WOULD NOT be transmitted over two or more channels. > Sorry for the newbie question! > > Best regards, Nick. >
Nickolay wrote: > one important addition: the message cannot be removed from queue table > until it is transmitted, so DELETE is not an option :) >> Hi All, >> >> I have a trivial task. There is a table with messages queue, let's say >> "msg_queue". >> There are a few processes and each of them is taking one message from >> this table at a time to transmit into communication channel. >> I've done it my way, but I have postgresql's messages about deadlocks >> and a lot of warnings. >> >> I my program, every process is doing approx the following procedure: >> SELECT ... FROM msg_queue WHERE busy = false ORDER BY ... LIMIT 1; >> if a message was found: >> BEGIN; >> SELECT id FROM msg_queue WHERE id = ... AND busy = false FOR SHARE; >> UPDATE msg_queue SET busy = true, channel_id = ... WHERE id = ... AND >> busy = false; >> COMMIT; >> >> >> I do understand that this way is stupid, but I have not came with >> anything else yet. >> Could somebody share ideas how to do this so the same message 100% >> WOULD NOT be transmitted over two or more channels. >> Sorry for the newbie question! >> >> Best regards, Nick. >> > > how about this: andy=# create table msg (id integer, busy boolean, message text); CREATE TABLE andy=# insert into msg values (1, false, 'message one'); INSERT 0 1 andy=# insert into msg values (2, false, 'message two'); INSERT 0 1 CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql AS $function$ declare rec record; begin for rec in select id from msg where busy = false order by id loop update msg set busy = true where id = rec.id and busy = false; if found then return rec.id; end if; end loop; return -1; end; $function$ It returns -1 if no message found. Not 100% sure, but a quick two session test seemed to work. -Andy
> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql > AS $function$ > declare > rec record; > begin > for rec in select id from msg where busy = false order by id loop > update msg set busy = true where id = rec.id and busy = false; > if found then > return rec.id; > end if; > end loop; > return -1; > end; > $function$ I think you could also do something roughly similar in a statement by using a RETURNING clause on the update, such as: update msg set busy = true where id = (select min(id) from msg where busy = false) returning *; Cheers, Kevin
Kevin McConnell wrote: >> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql >> AS $function$ >> declare >> rec record; >> begin >> for rec in select id from msg where busy = false order by id loop >> update msg set busy = true where id = rec.id and busy = false; >> if found then >> return rec.id; >> end if; >> end loop; >> return -1; >> end; >> $function$ > > I think you could also do something roughly similar in a statement by > using a RETURNING clause on the update, such as: > > update msg set busy = true where id = (select min(id) from msg where > busy = false) returning *; > > Cheers, > Kevin > I had thought of that, but you'd need to add one thing, in the update ' and busy = false ', cuz two people may get the sameid from the select min(id). update msg set busy = true where busy = false and id = (select min(id) from msg where busy = false) returning *; but then you'd have to fire it over-and-over until you actually got a row updated. Seemed easer to put the loop in function, then you can: select id from getmsg(); -Andy
Kevin McConnell wrote: >> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql >> AS $function$ >> declare >> rec record; >> begin >> for rec in select id from msg where busy =alse order by id loop >> update msg set busy =rue where id = rec.id and busy = false; >> if found then >> return rec.id; >> end if; >> end loop; >> return -1; >> end; >> $function$ >> > > I think you could also do something roughly similar in a statement by > using a RETURNING clause on the update, such as: > > update msg set busy =rue where id = (select min(id) from msg where > busy =alse) returning *; > > Cheers, > Kevin > Thank you guys! But what's min(id) for? Is it neccessary? Is there any chance I can replace min(id) to LIMIT 1? Best regards, Nick.
Andy Colson wrote: > Kevin McConnell wrote: >> I think you could also do something roughly similar in a statement by >> using a RETURNING clause on the update, such as: >> >> update msg set busy = true where id = (select min(id) from msg where >> busy = false) returning *; >> > > I had thought of that, but you'd need to add one thing, in the update > ' and busy = false ', cuz two people may get the same id from the > select min(id). > > update msg set busy = true where busy = false and id = (select min(id) > from msg where busy = false) returning *; > > but then you'd have to fire it over-and-over until you actually got a > row updated. > > Seemed easer to put the loop in function, then you can: > > select id from getmsg(); > Thanks a lot for your solution! It works great for now. Here is the thing I did following your advice: CREATE TYPE queued_msg_row AS (id bigint ,sender character varying ,"text" text ... ,msg_type integer); CREATE OR REPLACE FUNCTION public.get_queued_msg (_route_id integer ,_channel_id integer) RETURNS queued_msg_row LANGUAGE plpgsql AS $function$ declare rec queued_msg_row; begin for rec in SELECT id,sender,"text", ... , msg_type FROM msg_queue WHERE busy=false AND route_id=_route_id ORDER BY priority DESC, date_time ASC LIMIT 10 loop UPDATE msg_queue SET busy=true, channel_id=_channel_id WHERE id = rec.id AND busy=false; if found then return rec; end if; end loop; return NULL; end; $function$ The only problem that remains is that this function returns an empty row when it should return NULL (no row), but that's not a critical issue. Best regards, Nick.
Nickolay wrote: > Kevin McConnell wrote: >>> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE >>> plpgsql >>> AS $function$ >>> declare >>> rec record; >>> begin >>> for rec in select id from msg where busy =alse order by id loop >>> update msg set busy =rue where id = rec.id and busy = >>> false; >>> if found then >>> return rec.id; >>> end if; >>> end loop; >>> return -1; >>> end; >>> $function$ >>> >> >> I think you could also do something roughly similar in a statement by >> using a RETURNING clause on the update, such as: >> >> update msg set busy =rue where id = (select min(id) from msg where >> busy =alse) returning *; >> >> Cheers, >> Kevin >> > > Thank you guys! But what's min(id) for? Is it neccessary? Is there any > chance I can replace min(id) to LIMIT 1? > > Best regards, Nick. > min(id) finds the smallest id in the table. We made the assumption that you wanted to get the messages out order by id fromsmallest to largest. LIMIT 1 would be ok if you didnt care what order the messages were processed in. -Andy