Discussion: easy task: concurrent select-updates


easy task: concurrent select-updates

From: Nickolay
Date:
Hi All,

I have a trivial task. There is a table holding a message queue, let's say
"msg_queue".
There are a few processes, and each of them takes one message at a time
from this table to transmit into a communication channel.
I've done it my way, but I get PostgreSQL messages about deadlocks
and a lot of warnings.

In my program, every process follows approximately this procedure:
SELECT ... FROM msg_queue WHERE busy = false ORDER BY ... LIMIT 1;
if a message was found:
    BEGIN;
    SELECT id FROM msg_queue WHERE id = ... AND busy = false FOR SHARE;
    UPDATE msg_queue SET busy = true, channel_id = ... WHERE id = ... AND busy = false;
    COMMIT;


I do understand that this way is stupid, but I have not come up with
anything better yet.
Could somebody share ideas on how to do this so that the same message is
guaranteed NOT to be transmitted over two or more channels?
Sorry for the newbie question!

Best regards, Nick.

Re: easy task: concurrent select-updates

From: Nickolay
Date:
One important addition: the message cannot be removed from the queue table
until it is transmitted, so DELETE is not an option :)


Re: easy task: concurrent select-updates

From: Andy Colson
Date:


How about this:

andy=# create table msg (id integer, busy boolean, message text);
CREATE TABLE
andy=# insert into msg values (1, false, 'message one');
INSERT 0 1
andy=# insert into msg values (2, false, 'message two');
INSERT 0 1


CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql
AS $function$
declare
        rec record;
begin
        -- walk the free messages in id order
        for rec in select id from msg where busy = false order by id loop
                -- re-check busy so a row claimed by another session is skipped
                update msg set busy = true where id = rec.id and busy = false;
                if found then
                        return rec.id;
                end if;
        end loop;
        return -1;
end;
$function$;



It returns -1 if no message is found.  I'm not 100% sure, but a quick two-session test seemed to work.

-Andy


Re: easy task: concurrent select-updates

From: Kevin McConnell
Date:
> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql
> AS $function$
> declare
>       rec record;
> begin
>       for rec in select id from msg where busy = false order by id loop
>               update msg set busy = true where id = rec.id and busy = false;
>               if found then
>                       return rec.id;
>               end if;
>       end loop;
>       return -1;
> end;
> $function$

I think you could also do something roughly similar in a statement by
using a RETURNING clause on the update, such as:

  update msg set busy = true where id = (select min(id) from msg where busy = false) returning *;

Cheers,
Kevin

Re: easy task: concurrent select-updates

From: Andy Colson
Date:
Kevin McConnell wrote:
>> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql
>> AS $function$
>> declare
>>       rec record;
>> begin
>>       for rec in select id from msg where busy = false order by id loop
>>               update msg set busy = true where id = rec.id and busy = false;
>>               if found then
>>                       return rec.id;
>>               end if;
>>       end loop;
>>       return -1;
>> end;
>> $function$
>
> I think you could also do something roughly similar in a statement by
> using a RETURNING clause on the update, such as:
>
>   update msg set busy = true where id = (select min(id) from msg where
> busy = false) returning *;
>
> Cheers,
> Kevin
>

I had thought of that, but you'd need to add one thing to the update, ' and busy = false ', because two sessions may
get the same id from the select min(id).

update msg set busy = true where busy = false and id = (select min(id) from msg where busy = false) returning *;

But then you'd have to fire it over and over until you actually got a row updated.

It seemed easier to put the loop in a function; then you can:

select getmsg();

-Andy
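
On PostgreSQL 9.5 or later, the retry problem described above can also be avoided in a single statement. The sketch below is not from the thread: it swaps in `FOR UPDATE SKIP LOCKED`, and it assumes the simplified `msg` table created earlier. Treat it as one possible approach rather than a drop-in replacement for the function.

```sql
-- Assumes the msg(id, busy, message) table from earlier in the thread.
-- FOR UPDATE SKIP LOCKED requires PostgreSQL 9.5 or later.
UPDATE msg
   SET busy = true
 WHERE id = (
         SELECT id
           FROM msg
          WHERE busy = false
          ORDER BY id
          LIMIT 1
            FOR UPDATE SKIP LOCKED  -- ignore rows another session has locked
       )
RETURNING id, message;
```

If every free row is locked by another session, or the queue is empty, the statement updates nothing and returns no rows, so the caller can simply treat an empty result as "no message".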

Re: easy task: concurrent select-updates

From: Nickolay
Date:
Kevin McConnell wrote:
>> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE plpgsql
>> AS $function$
>> declare
>>       rec record;
>> begin
>>       for rec in select id from msg where busy = false order by id loop
>>               update msg set busy = true where id = rec.id and busy = false;
>>               if found then
>>                       return rec.id;
>>               end if;
>>       end loop;
>>       return -1;
>> end;
>> $function$
>>
>
> I think you could also do something roughly similar in a statement by
> using a RETURNING clause on the update, such as:
>
>   update msg set busy = true where id = (select min(id) from msg where
> busy = false) returning *;
>
> Cheers,
> Kevin
>

Thank you guys! But what's min(id) for? Is it necessary? Is there any
chance I can replace min(id) with LIMIT 1?

Best regards, Nick.

Re: easy task: concurrent select-updates

From: Nickolay
Date:
Andy Colson wrote:
> Kevin McConnell wrote:
>> I think you could also do something roughly similar in a statement by
>> using a RETURNING clause on the update, such as:
>>
>>   update msg set busy = true where id = (select min(id) from msg where
>> busy = false) returning *;
>>
>
> I had thought of that, but you'd need to add one thing to the update,
> ' and busy = false ', because two sessions may get the same id from the
> select min(id).
>
> update msg set busy = true where busy = false and id = (select min(id)
> from msg where busy = false) returning *;
>
> But then you'd have to fire it over and over until you actually got a
> row updated.
>
> It seemed easier to put the loop in a function; then you can:
>
> select getmsg();
>


Thanks a lot for your solution! It works great for now.
Here is the thing I did following your advice:

CREATE TYPE queued_msg_row AS
    (id bigint
    ,sender character varying
    ,"text" text
    ...
    ,msg_type integer);

CREATE OR REPLACE FUNCTION public.get_queued_msg
(_route_id    integer
,_channel_id  integer)
RETURNS queued_msg_row LANGUAGE plpgsql
AS $function$
declare
      rec queued_msg_row;
begin
  for rec in SELECT id,sender,"text", ... , msg_type
                    FROM msg_queue WHERE busy=false AND route_id=_route_id
                    ORDER BY priority DESC, date_time ASC LIMIT 10 loop
    UPDATE msg_queue SET busy=true, channel_id=_channel_id WHERE id = rec.id AND busy=false;
    if found then
      return rec;
    end if;
  end loop;
  return NULL;
end;
$function$;

The only problem that remains is that this function returns an empty row
when it should return NULL (no row), but that's not a critical issue.

Best regards, Nick.
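
One way to get "no row" instead of an empty row is to declare the function as returning a set, so that it can finish without emitting anything. The following is only a sketch: it uses the simplified `msg` table from earlier in the thread rather than the full `msg_queue` definition, and `get_next_msg` is a hypothetical name.

```sql
-- Sketch only: works against msg(id, busy, message), not msg_queue.
-- get_next_msg is a hypothetical name used for illustration.
CREATE OR REPLACE FUNCTION public.get_next_msg()
RETURNS SETOF msg LANGUAGE plpgsql
AS $function$
declare
    rec msg%rowtype;
begin
    for rec in select * from msg where busy = false order by id loop
        update msg set busy = true where id = rec.id and busy = false;
        if found then
            rec.busy := true;  -- reflect the claim in the returned row
            return next rec;   -- emit exactly one row
            return;            -- and stop
        end if;
    end loop;
    return;                    -- nothing claimed: the result set is empty
end;
$function$;

-- Returns one row when a message was claimed, zero rows otherwise.
SELECT * FROM get_next_msg();
```

The caller can then check the row count rather than testing each column for NULL.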

Re: easy task: concurrent select-updates

From: Andy Colson
Date:
Nickolay wrote:
> Kevin McConnell wrote:
>>> CREATE OR REPLACE FUNCTION public.getmsg() RETURNS integer LANGUAGE
>>> plpgsql
>>> AS $function$
>>> declare
>>>       rec record;
>>> begin
>>>       for rec in select id from msg where busy = false order by id loop
>>>               update msg set busy = true where id = rec.id and busy =
>>> false;
>>>               if found then
>>>                       return rec.id;
>>>               end if;
>>>       end loop;
>>>       return -1;
>>> end;
>>> $function$
>>>
>>
>> I think you could also do something roughly similar in a statement by
>> using a RETURNING clause on the update, such as:
>>
>>   update msg set busy = true where id = (select min(id) from msg where
>> busy = false) returning *;
>>
>> Cheers,
>> Kevin
>>
>
> Thank you guys! But what's min(id) for? Is it necessary? Is there any
> chance I can replace min(id) with LIMIT 1?
>
> Best regards, Nick.
>

min(id) finds the smallest id in the table.  We made the assumption that you wanted to get the messages out in id order,
from smallest to largest.

LIMIT 1 on its own would be OK if you didn't care what order the messages were processed in.

-Andy
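
If ordering does matter, LIMIT 1 can still be used as long as it is paired with an ORDER BY. The statement below is a sketch against the simplified `msg` table from earlier in the thread; it should pick the same row as the min(id) version, and the ORDER BY could list other columns (for example the priority and date_time columns Nick mentions) instead of id.

```sql
-- Same ordering as min(id), written with ORDER BY ... LIMIT 1.
-- Assumes the msg(id, busy, message) table from earlier in the thread.
UPDATE msg
   SET busy = true
 WHERE busy = false  -- re-check, in case another session claimed the row first
   AND id = (SELECT id FROM msg WHERE busy = false ORDER BY id LIMIT 1)
RETURNING *;
```

As with the min(id) statement, this may update zero rows when two sessions race for the same id, so the caller still has to retry or wrap it in a loop.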