Re: shared memory message queues

From: Robert Haas
Subject: Re: shared memory message queues
Date:
Msg-id: CA+TgmobVcsMTbSSFGkB8L7MfgeFODArZhRzvjabTxPUZ3quABQ@mail.gmail.com
In reply to: Re: shared memory message queues  (Andres Freund <andres@2ndquadrant.com>)
Responses: Re: shared memory message queues
List: pgsql-hackers
On Fri, Dec 20, 2013 at 4:04 PM, Andres Freund <andres@2ndquadrant.com> wrote:
> * shm_mq_minimum_size should be const

OK.

> * the MAXALIGNing in shm_mq_create looks odd. We're computing
>   data_offset using MAXALIGN, determine the ring size using it, but then
>   set mq_ring_offset to data_offset - offsetof(shm_mq, mq_ring)?

So the goals are that the size of the ring should be a multiple of
MAXIMUM_ALIGNOF and that the start of the ring should be MAXALIGN'd.
To ensure that this is always the case, we may need to throw away a
few bytes from the end (depending on whether the size is a nice round
number), which we do by setting size = MAXALIGN_DOWN(size).

We might also need to throw away a few bytes from the beginning
(depending on whether offsetof(shm_mq, mq_ring) is a nice round
number).  We set data_offset to be the number of bytes from the
beginning of the chunk of memory (aka "address", aka the start of the
message queue header) to the beginning of the ring buffer.
mq->mq_ring_offset is the distance from mq->mq_ring to the beginning
of the ring buffer.  So mq->mq_ring_offset will be zero if
offsetof(shm_mq, mq_ring) is MAXALIGN'd; if not, mq->mq_ring_offset
will be the number of bytes that we have to skip past
mq->mq_ring to reach a MAXALIGN'd address.
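
In code form, the layout computation amounts to something like this (a
sketch of what's described above, not a verbatim excerpt from the patch;
mq_ring_size is my guess at the field holding the ring length):

    /* Inside shm_mq_create(): "size" is the total space, starting at the header. */
    Size        data_offset;

    size = MAXALIGN_DOWN(size);         /* ring size must be a multiple of MAXIMUM_ALIGNOF */
    data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));      /* first MAXALIGN'd byte past the header */

    mq->mq_ring_size = size - data_offset;                   /* bytes actually usable for the ring */
    mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);   /* skip from mq_ring to ring start */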

> * same problem as the toc stuff with a dynamic number of spinlocks.

Yeah. :-(

> * you don't seem to to initialize the spinlock anywhere. That's clearly
>   not ok, independent of the spinlock implementation.

Gah, I always forget to do that.

> * The comments around shm_mq/shm_mq_handle are a clear improvement. I'd
>   be pretty happy if you additionally add something akin to 'This struct
>   describes the shared state of a queue' and 'Backend-local reference to
>   a queue'.

OK.

> * s/MQH_BUFSIZE/MQH_INITIAL_BUFSIZE/, I was already wondering whether
>   there's a limit on the max number of bytes.

OK.

> * I think shm_mq_attach()'s documentation should mention that we'll
>   initially and later allocate memory in the current
>   CurrentMemoryContext when attaching. That will likely require some
>   care for some callers. Yes, it's documented in a bigger comment
>   somewhere else, but still.

OK.
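
For callers that do care, the standard trick is to switch contexts
around the attach, since both the initial and the later allocations
come from whatever CurrentMemoryContext is at that point (a sketch,
assuming the shm_mq_attach(mq, seg, handle) signature from the patch):

    /* mq and seg come from setting up the queue and its DSM segment elsewhere. */
    MemoryContext oldcontext;
    shm_mq_handle *mqh;

    /* Make the handle's allocations live as long as the queue itself. */
    oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    mqh = shm_mq_attach(mq, seg, NULL);
    MemoryContextSwitchTo(oldcontext);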

> * I don't really understand the mqh_consume_pending stuff. If we just
>   read a message spanning from the beginning to the end of the
>   ringbuffer, without wrapping, we might not confirm the read in
>   shm_mq_receive() until the next time it is called? Do I understand that
>   correctly?
> I am talking about the following and other similar pieces of code:
> +               if (rb >= needed)
> +               {
> +                       /*
> +                        * Technically, we could consume the message length information at
> +                        * this point, but the extra write to shared memory wouldn't be
> +                        * free and in most cases we would reap no benefit.
> +                        */
> +                       mqh->mqh_consume_pending = needed;
> +                       *nbytesp = nbytes;
> +                       *datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
> +                       return SHM_MQ_SUCCESS;
> +               }

To send a message, we pad it out to a MAXALIGN boundary and then
preface it with a MAXALIGN'd length word.  So, for example, suppose
that you decide to send "Hello."  That message is 6 characters, and
MAXALIGN is 8.  So we're going to consume 16 bytes of space in the
buffer: 8 bytes for the length word and then another 8 bytes for the
message, of which only the first 6 will actually contain valid data.
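
In other words (with MAXALIGN = 8):

    /* Ring space consumed by sending the 6-byte message "Hello." */
    Size    length_word = MAXALIGN(sizeof(uint64));     /* 8 bytes */
    Size    payload     = MAXALIGN(6);                  /* 6 -> 8 bytes, last 2 are padding */
    Size    total       = length_word + payload;        /* 16 bytes */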

Now, when the receiver sees that message, he's going to return a
pointer *directly into the buffer* to the caller of shm_mq_receive().
He therefore cannot mark the whole message as read, because if he
does, the sender might overwrite it with new data before the caller
finishes examining it, which would be bad.  So what he does instead is
set mqh->mqh_consume_pending to the number of bytes of data that
should be acknowledged as read when the *next* call to
shm_mq_receive() is made.  By that time, the caller is required to
either be done with the previous message, or have copied it.
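
So the start of the next shm_mq_receive() call effectively does this (a
simplified sketch; shm_mq_inc_bytes_read() here stands for the read-side
counterpart of shm_mq_inc_bytes_written(), whatever it is actually
called):

    /* mq points to the shared shm_mq this handle is attached to. */
    if (mqh->mqh_consume_pending > 0)
    {
        /*
         * The caller is now done with (or has copied) the previous message,
         * so its ring space -- length word plus padded payload -- can be
         * handed back to the sender.
         */
        shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
        mqh->mqh_consume_pending = 0;
    }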

The point that the comment is making is that shm_mq_receive() is
actually returning a pointer to the message, not the length word that
precedes it.  So while that function can't free the buffer space
containing the message to which it's returning a pointer, it *could*
free the buffer space used by the length word that precedes it.  In
other words, the message is consuming 16 bytes (8 for length word and
8 for payload) and the first 8 of those could be freed immediately,
without waiting for the next call.

The benefit of doing that is that it would allow those 8 bytes to be
used immediately by the sender.  But 8 bytes is not much, and if the
sender is blocked on a full queue, waking him up to only send 8 bytes
is kind of pathetic.

> * ISTM the messages around needing to use the same arguments for
>   send/receive again after SHM_MQ_WOULD_BLOCK could stand to be more
>   forceful. "In this case, the caller should call this function again
>   after the process latch has been set." doesn't make it all that clear
>   that failure to do so will corrupt the next message. Maybe there also
>   should be an assert checking that the size is the same when retrying
>   as when starting a partial read?

Well, I didn't want to say "must", because the caller could just give
up on the queue altogether.  I've added an additional note along those
lines to shm_mq_send().  shm_mq_receive() has no similar constraint;
the message size and payload are both out parameters for that
function.

In terms of an Assert, this would catch at least some attempts to
decrease the message size between one call and the next:

    /* Write the actual data bytes into the buffer. */
    Assert(mqh->mqh_partial_message_bytes <= nbytes);

I don't see what more than that we can check.  We could store nbytes
and data someplace just for the purpose of double-checking them, but
that seems like overkill to me.  This is a fairly common protocol for
non-blocking communication and if callers can't follow it, tough luck
for them.
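
To spell the protocol out, a non-blocking sender ends up looking
roughly like this (a sketch only, assuming the shm_mq_send(mqh, nbytes,
data, nowait) signature and the usual WaitLatch()/ResetLatch() calls;
not copied from the patch):

    for (;;)
    {
        shm_mq_result res = shm_mq_send(mqh, nbytes, data, true);  /* nowait */

        if (res == SHM_MQ_SUCCESS)
            break;                  /* entire message queued */
        if (res == SHM_MQ_DETACHED)
            elog(ERROR, "receiver has detached from the message queue");

        /*
         * SHM_MQ_WOULD_BLOCK: wait for the receiver to set our latch, then
         * retry with exactly the same nbytes/data.
         */
        WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
        ResetLatch(&MyProc->procLatch);
        CHECK_FOR_INTERRUPTS();
    }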

> * You have some CHECK_FOR_INTERRUPTS(), are you sure the code is safe to
>   longjmp out from there in all the cases? E.g. I am not sure it is for
>   shm_mq_send_bytes(), when we've first written a partial message, done
>   a shm_mq_inc_bytes_written() and then go into the available == 0
>   branch and jump out.

Hmm.  I guess I was assuming that jumping out via an interrupt would
result in detaching from the queue, after which its state is moot.  I
agree that if somebody jumps out of a queue read or write and then
tries to use the queue thereafter, they are in for a world of hurt.
For my use cases, that doesn't matter, so I'm not excited about
changing it.  But being able to process interrupts there *is*
important to me, because for example the user backend might send a
query cancel to a worker stuck at that point.

> * Do I understand it correctly that mqh->mqh_counterparty_attached is
>   just sort of a cache, and we'll still detect a detached counterparty
>   when we're actually sending/receiving?

Yeah.  It's best to think of attaching and detaching as two separate
things.  Detach is something that can only happen after you've
attached in the first place.  That flag is checking whether we know
that the counterparty attached *at some point*, not whether they have
subsequently detached.  It becomes true when we first confirm that
they have attached and never afterward becomes false.
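
i.e., conceptually (pure sketch; the shared-state check is a made-up
placeholder, not a real function):

    if (!mqh->mqh_counterparty_attached)
    {
        /* Placeholder for inspecting the shared queue state. */
        if (counterparty_now_visible_in_shared_memory(mqh))
            mqh->mqh_counterparty_attached = true;      /* one-way transition */
        else
            return SHM_MQ_WOULD_BLOCK;                  /* or block, if !nowait */
    }

    /* A subsequent detach is still detected during the actual send/receive. */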

> One more thing:
> static uint64
> shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
> {
>         uint64  v;
>
>         SpinLockAcquire(&mq->mq_mutex);
>         v = mq->mq_bytes_written;
>         *detached = mq->mq_detached;
>         SpinLockRelease(&mq->mq_mutex);
>
>         return mq->mq_bytes_written;
> }
>
> Note how you're returning mq->mq_bytes_written instead of v.

Oh, dear.  That's rather embarrassing.
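
The fix, of course, is just to return the value that was read while
holding the spinlock:

    static uint64
    shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
    {
        uint64      v;

        SpinLockAcquire(&mq->mq_mutex);
        v = mq->mq_bytes_written;
        *detached = mq->mq_detached;
        SpinLockRelease(&mq->mq_mutex);

        return v;       /* not mq->mq_bytes_written, which rereads outside the lock */
    }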

Incremental (incremental-shm-mq.patch) and full (shm-mq-v3.patch)
patches attached.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

