Re: LISTEN, select/poll question/clarification

Поиск
Список
Период
Сортировка
От Daniele Varrazzo
Тема Re: LISTEN, select/poll question/clarification
Дата
Msg-id CA+mi_8ak6F=V5gs8YDgLf2juCGE9ZM2nan3OL2Y2Rg7sAj0kjw@mail.gmail.com
обсуждение исходный текст
Ответ на LISTEN, select/poll question/clarification  (Israel Brewster <israel@ravnalaska.net>)
Ответы Re: LISTEN, select/poll question/clarification  (Israel Brewster <israel@ravnalaska.net>)
Список psycopg
On Tue, Dec 5, 2017 at 6:31 PM, Israel Brewster <israel@ravnalaska.net> wrote:
> I am using the PostgreSQL PubSub feature with psycopg2 and gevent in the
> following manner (dbconn is, of course, a psycopg2 connection object on
> which LISTEN has been called):
>
> while True:
>     if gevent.select.select([dbconn], [], []) != ([], [], []):
>         dbconn.poll()
>         gevent.spawn(process_result,dbconn.notifies)

>
>
> Which works fine. Now my understanding of how this all works is that when
> dbconn.poll() is called, it should pull in *all* pending NOTIFYs and append
> them to the dbconn.notifies object. So lets say I have NOTIFYs coming in at
> a rate of 1 per second. I would think that if I added a "gevent.sleep(5)" to
> the end of the above while loop, then each time through I should have 5
> notifies in the dbconn.notifies list, since it has been 5 seconds since I
> last checked. However, that doesn't appear to be the case - rather, no mater
> how long a sleep I put in, I still only get *one* item in the notifies list,
> making me think that I am missing data.

Uhm, from the top of my very rusty familiarity with gevent, I think
that gevent.select() is woken up as soon as the notify is received,
and that exactly because you have gevent.sleep(5) that is saying "I
don't have anything else to do for 5 seconds, so take a look if there
is any other fd that needs attention".

    while True:
        if gevent.select.select([dbconn], [], []) != ([], [], []):
            dbconn.poll()
            gevent.spawn(process_result,dbconn.notifies)
>
> Can someone explain why this is? Why am I not getting 5 at a time under that
> scenario?
>
> In case someone was wondering, the reasoning behind adding the sleep() lies
> in that process_result function. Due to the looping it contains, it can
> process, say, 5 data points in one call much more efficiently than it can
> process those same 5 data points in 5 calls of 1 data point each. So rather
> than run it every time a data point comes in, I would like to let the data
> "pile up" as it were for a short period of time before processing. I was
> thinking I could easily accomplish this by simply waiting some appropriate
> period of time between poll() ing the database for new NOTIFYs, but
> apparently that's not working. The select() is there because the data isn't
> coming in at a regular speed like in my example, but rather it could come in
> much faster, or not at all for a while, depending on the time of day.

At the most simple (and still very IIRC), you can just ignore select
and poll every 5 seconds, and you should receive the batches.

    while True:
        gevent.sleep(5)
        dbconn.poll()
        if dbconn.notifies:
            gevent.spawn(process_result,dbconn.notifies)

A bit more complex, you can do something at application level, such as
pushing your notifies in a queue as fast as they are received by
select(), and on the other side of the queue having a consumer stuck
on get(): as soon as it receives an object, the consumer can sleep for
5 seconds, then get_nowait() until the queue is empty to gather
everything received while it was napping, and call process_result()
with the whole set.


-- Daniele


В списке psycopg по дате отправления:

Предыдущее
От: Israel Brewster
Дата:
Сообщение: LISTEN, select/poll question/clarification
Следующее
От: Israel Brewster
Дата:
Сообщение: Re: LISTEN, select/poll question/clarification