Re: LISTEN, select/poll question/clarification

From: Israel Brewster
Subject: Re: LISTEN, select/poll question/clarification
Msg-id 3FBF1AFC-920F-40F8-AF65-975EE21642C9@ravnalaska.net
In reply to: Re: LISTEN, select/poll question/clarification  (Daniele Varrazzo <daniele.varrazzo@gmail.com>)
List: psycopg
On Dec 5, 2017, at 10:25 AM, Daniele Varrazzo <daniele.varrazzo@gmail.com> wrote:

On Tue, Dec 5, 2017 at 6:31 PM, Israel Brewster <israel@ravnalaska.net> wrote:
I am using the PostgreSQL PubSub feature with psycopg2 and gevent in the
following manner (dbconn is, of course, a psycopg2 connection object on
which LISTEN has been called):

while True:
    if gevent.select.select([dbconn], [], []) != ([], [], []):
        dbconn.poll()
        gevent.spawn(process_result, dbconn.notifies)



Which works fine. Now my understanding of how this all works is that when
dbconn.poll() is called, it should pull in *all* pending NOTIFYs and append
them to the dbconn.notifies object. So let's say I have NOTIFYs coming in at
a rate of 1 per second. I would think that if I added a "gevent.sleep(5)" to
the end of the above while loop, then each time through I should have 5
notifies in the dbconn.notifies list, since it has been 5 seconds since I
last checked. However, that doesn't appear to be the case - rather, no matter
how long a sleep I put in, I still only get *one* item in the notifies list,
making me think that I am missing data.
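
To make that expectation concrete, here is a minimal sketch of draining everything a single poll() has collected. It assumes the documented psycopg2 behaviour that poll() appends received notifications to the connection's notifies list; drain_notifies and FakeConnection are hypothetical names used only for illustration, and the fake class merely stands in for a real connection so the snippet runs without a database.

```python
def drain_notifies(conn):
    """Poll the connection and return every pending notification as a list."""
    conn.poll()  # on a psycopg2 connection this fills conn.notifies
    batch = []
    while conn.notifies:
        # Pop each item so the same notification is never handed out twice.
        batch.append(conn.notifies.pop(0))
    return batch


class FakeConnection:
    """Hypothetical stand-in for a psycopg2 connection (illustration only)."""

    def __init__(self, pending):
        self._pending = list(pending)
        self.notifies = []

    def poll(self):
        # Move everything "received" so far into the notifies list.
        self.notifies.extend(self._pending)
        self._pending.clear()
```

Popping the items also matters because the notifies list is not emptied for you; passing the list itself to a spawned greenlet leaves the old entries in place for the next iteration.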

Uhm, from the top of my very rusty familiarity with gevent, I think
that gevent.select() is woken up as soon as the notify is received,
and that this happens exactly because you have gevent.sleep(5), which
is saying "I don't have anything else to do for 5 seconds, so take a
look if there is any other fd that needs attention".

Hmmm, I'm not so sure about that. I would think that select would only wake up if the thread was actually on the select; that is, it would block at select (while allowing other greenlets to run) until the NOTIFY was received, then move on. Once it has moved on, like when it gets to the sleep(), I wouldn't think it would even know about the select any more. More to the point, I would think the sleep would pause the *entire* greenlet, in this case the while True loop (or, rather, the function containing said loop, but since that is the entire function, same difference), for the specified time before waking up. I could certainly be wrong though, I'm no gevent expert :-)


Can someone explain why this is? Why am I not getting 5 at a time under that
scenario?

In case someone was wondering, the reasoning behind adding the sleep() lies
in that process_result function. Due to the looping it contains, it can
process, say, 5 data points in one call much more efficiently than it can
process those same 5 data points in 5 calls of 1 data point each. So rather
than run it every time a data point comes in, I would like to let the data
"pile up" as it were for a short period of time before processing. I was
thinking I could easily accomplish this by simply waiting some appropriate
period of time between poll()ing the database for new NOTIFYs, but
apparently that's not working. The select() is there because the data isn't
coming in at a regular speed like in my example, but rather it could come in
much faster, or not at all for a while, depending on the time of day.

At its simplest (and still very much IIRC), you can just ignore select
and poll every 5 seconds, and you should receive the batches.

   while True:
       gevent.sleep(5)
       dbconn.poll()
       if dbconn.notifies:
           gevent.spawn(process_result,dbconn.notifies)

Conceptually sound; however, the actual numbers I'm dealing with are somewhat faster (the sleep is more like 0.5 seconds; I just used these to make an easy example), so I like the idea of it not doing anything if no data is received (at night, for example, there will be little or no data for extended periods).


A bit more complex, you can do something at application level, such as
pushing your notifies in a queue as fast as they are received by
select(), and on the other side of the queue having a consumer stuck
on get(): as soon as it receives an object, the consumer can sleep for
5 seconds, then get_nowait() until the queue is empty to gather
everything received while it was napping, and call process_result()
with the whole set.
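
A rough sketch of the consumer side of that idea, under a few assumptions: collect_batch is a hypothetical helper name, and the standard-library queue module is used so the snippet runs on its own. In a gevent program you would presumably swap in gevent.queue.Queue and gevent.sleep, which as far as I know offer the same get()/get_nowait() interface.

```python
import queue
import time


def collect_batch(q, delay, sleep=time.sleep):
    """Block until one item arrives, wait `delay` seconds, then drain the queue."""
    batch = [q.get()]  # blocks while the queue is empty, so idle periods do no work
    sleep(delay)       # let further items pile up while we nap
    while True:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            # Nothing left: the batch holds everything received so far.
            return batch
```

The listener loop would then only q.put() each notification as it arrives, while a separate consumer repeatedly calls process_result(collect_batch(q, 0.5)).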

That seems to fit the bill nicely. Kinda seems like the same idea as what I was going for, but with a local queue rather than the "remote" select, so more predictable/controllable.

Thanks!



-- Daniele
