Discussion: Proposal: Generic WAL logical messages

Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Hello,

I would like to introduce the concept of generic WAL logical messages.

These messages just create a WAL record with arbitrary data as specified
by the user. For standard WAL replay, these are basically noop, but in
logical decoding they are decoded and a special callback of the
output plugin is called for them.

These messages can be either transactional (decoded on commit) or
non-transactional (decoded immediately). Each message has a prefix to
differentiate between individual plugins. The prefix has to be
registered exactly once (in a similar manner to security label providers)
to avoid conflicts between plugins.

There are three main use-cases for these:
a) reliable communication between two nodes in multi-master setup - WAL
already handles correctly the recovery etc so we don't have to invent
new complex code to handle crashes in middle of operations

b) out of order messaging in logical replication - if you want to send
something to other node immediately without having to wait for commit
for example to acquire remote lock, this can be used for that

c) "queue tables" - combination of this feature (there is SQL interface)
and before triggers can be used to create tables for which all inserts
only go to the WAL so they can behave like queue without having to store
the data in the table itself (kind of the opposite of unlogged tables)

Initial implementation of this proposal is attached.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Alvaro Herrera
Date:
Petr Jelinek wrote:

> These messages just create WAL record with arbitrary data as specified by
> user. For standard WAL replay, these are basically noop, but in logical
> decoding they are decoded and the special callback of the output plugin
> is called for them.

I had a quick look at this and I couldn't find anything worthy of
comment -- seems a reasonably straightforward addition to stuff that's
mostly already there.

Process-wise, I don't understand why Andres is mentioned as co-author.
Did he actually write part of the patch, or advise on the design?
Who is reviewing the patch?

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
> Process-wise, I don't understand why Andres is mentioned as co-author.
> Did he actually write part of the patch, or advise on the design?
> Who is reviewing the patch?

It's extracted & extended from BDR, where I added that feature (to
implement distributed locking).



Re: Proposal: Generic WAL logical messages

From
Simon Riggs
Date:
On 1 January 2016 at 03:59, Petr Jelinek <petr@2ndquadrant.com> wrote:
 
> I would like to introduce concept of generic WAL logical messages.

Couple of points...

* Genenric misspelled

* You call them "logical messages" here, but standby messages in code. But they only apply to logical decoding, so "logical message" seems a better name. Can we avoid calling them "messages" cos that will get confusing.
 
> For standard WAL replay, these are basically noop

We should document that.
 
> These messages can be both transactional (decoded on commit) or
> non-transactional (decoded immediately). Each message has prefix to
> differentiate between individual plugins. The prefix has to be registered
> exactly once (in similar manner as security label providers) to avoid
> conflicts between plugins.

I'm not sure what "transactional" means, nor is that documented. (Conversely, I think "immediate" fairly clear)

Are they fired only on commit? (Guess so)
Are they fired in the original order, if multiple messages in same transaction? (Hope so)
Are they fired as they come in the original message sequence, or before anything else or after everything else? For example, cache invalidation messages are normally fired right at the end of a transaction, no matter when they were triggered.

--
Simon Riggs                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 2016-01-07 17:22, Simon Riggs wrote:
>
> * You call them "logical messages" here, but standby messages in code.
> But they only apply to logical decoding, so "logical message" seems a
> better name. Can we avoid calling them "messages" cos that will get
> confusing.
>

Yes it's slightly confusing, the "Standby" in the code is mostly for 
consistency with other "Standby*" stuff in neighbouring code, but that 
can be changed. I don't have better name than "messages" though, 
"records" is too generic.

>     For standard WAL replay, these are basically noop
>
>
> We should document that.

Okay.

>
>     These messages can be both transactional (decoded on commit) or
>     non-transactional (decoded immediately). Each message has prefix to
>     differentiate between individual plugins. The prefix has to be
>     registered exactly once (in similar manner as security label
>     providers) to avoid conflicts between plugins.
>
>
> I'm not sure what "transactional" means, nor is that documented.
> (Conversely, I think "immediate" fairly clear)
>
> Are they fired only on commit? (Guess so)
> Are they fired in the original order, if multiple messages in same
> transaction? (Hope so)
> Are they fired as they come in the original message sequence, or before
> anything else or after everything else? For example, cache invalidation
> messages are normally fired right at the end of a transaction, no matter
> when they were triggered.
>

A transactional message is added to the stream the same way as any DML
operation and has the same properties (processed on commit, original
order, duplicate messages are delivered as they are).

The immediate kind is, as you say, obvious: such messages get to logical
decoding immediately, without any regard to what's happening around them
(WAL order is still preserved though).

Will make this clearer in the docs.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
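
The ordering rules described in this reply can be modelled in a few lines. The following is only a toy sketch under stated assumptions: `Delivered`, `TxnQueue`, `emit_message` and `commit_txn` are hypothetical names invented for illustration, not part of the patch, and a single in-progress transaction stands in for the reorder buffer. It shows a non-transactional message being handed over as soon as it is read, while transactional messages wait for commit and then come out in their original order.

```c
#include <stddef.h>

#define MAX_MSGS 16

/* Hypothetical stand-in for the decoded output: messages in delivery order. */
typedef struct Delivered
{
    const char *items[MAX_MSGS];
    size_t      count;
} Delivered;

/* Hypothetical stand-in for the messages queued by one open transaction. */
typedef struct TxnQueue
{
    const char *items[MAX_MSGS];
    size_t      count;
} TxnQueue;

/* Append one message to the output, silently dropping it if the toy buffer is full. */
static void
deliver(Delivered *out, const char *msg)
{
    if (out->count < MAX_MSGS)
        out->items[out->count++] = msg;
}

/*
 * Non-transactional messages are delivered right away; transactional ones
 * are queued with the transaction, the way a DML change would be.
 */
static void
emit_message(Delivered *out, TxnQueue *txn, int transactional, const char *msg)
{
    if (!transactional)
    {
        deliver(out, msg);
        return;
    }
    if (txn->count < MAX_MSGS)
        txn->items[txn->count++] = msg;
}

/* On commit, queued messages are delivered in the order they were emitted. */
static void
commit_txn(Delivered *out, TxnQueue *txn)
{
    for (size_t i = 0; i < txn->count; i++)
        deliver(out, txn->items[i]);
    txn->count = 0;
}
```

Emitting a transactional "t1", a non-transactional "n1" and a transactional "t2" and then committing would deliver n1 first, followed by t1 and t2.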



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Hi,

here is an updated version of this patch, calling the messages logical
(decoding) messages consistently everywhere and removing any connection
to standby messages. Moving this to its own module gave me a place to
write some brief explanation about this so the code documentation has
hopefully improved as well.

The functionality itself didn't change.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Alexander Korotkov
Date:
Hi, Petr!

On Sat, Jan 23, 2016 at 1:22 AM, Petr Jelinek <petr@2ndquadrant.com> wrote:
> here is updated version of this patch, calling the messages logical
> (decoding) messages consistently everywhere and removing any connection
> to standby messages. Moving this to its own module gave me place to
> write some brief explanation about this so the code documentation has
> hopefully improved as well.
>
> The functionality itself didn't change.

I'd like to mention that I have an upcoming patch named generic WAL records.
It has to be a distinct feature from your generic WAL logical messages, though. Theoretically, we could have generic messages with arbitrary content that both have a custom WAL replay function and are decoded by an output plugin. But a custom WAL replay function would let an extension bug break recovery, archiving and physical replication, and that doesn't seem acceptable. This is why we have to develop these as separate features.

Should we think more about naming? Do two kinds of generic records confuse people?

------
Alexander Korotkov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
 

Re: Proposal: Generic WAL logical messages

From
Simon Riggs
Date:
On 29 January 2016 at 21:11, Alexander Korotkov <a.korotkov@postgrespro.ru> wrote:
> Hi, Petr!
>
> On Sat, Jan 23, 2016 at 1:22 AM, Petr Jelinek <petr@2ndquadrant.com> wrote:
>> here is updated version of this patch, calling the messages logical
>> (decoding) messages consistently everywhere and removing any connection
>> to standby messages. Moving this to its own module gave me place to
>> write some brief explanation about this so the code documentation has
>> hopefully improved as well.
>>
>> The functionality itself didn't change.
>
> I'd like to mention that there is my upcoming patch which is named generic
> WAL records. But it has to be distinct feature from your generic WAL
> logical messages. Theoretically, we could have generic messages with
> arbitrary content and both having custom WAL replay function and being
> decoded by output plugin. But custom WAL replay function would let
> extension bug break recovery, archiving and physical replication. And that
> doesn't seem to be acceptable. This is why we have to develop these as
> separate features.
>
> Should we think more about naming? Does two kinds of generic records
> confuse people?

Logical messages

Generic WAL records

Seems like I can tell them apart. Worth checking, but I think we're OK.

--
Simon Riggs                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: Proposal: Generic WAL logical messages

From
Alexander Korotkov
Date:
On Sat, Jan 30, 2016 at 11:58 AM, Simon Riggs <simon@2ndquadrant.com> wrote:
> On 29 January 2016 at 21:11, Alexander Korotkov <a.korotkov@postgrespro.ru> wrote:
>> Hi, Petr!
>>
>> On Sat, Jan 23, 2016 at 1:22 AM, Petr Jelinek <petr@2ndquadrant.com> wrote:
>>> here is updated version of this patch, calling the messages logical
>>> (decoding) messages consistently everywhere and removing any connection
>>> to standby messages. Moving this to its own module gave me place to
>>> write some brief explanation about this so the code documentation has
>>> hopefully improved as well.
>>>
>>> The functionality itself didn't change.
>>
>> I'd like to mention that there is my upcoming patch which is named generic
>> WAL records. But it has to be distinct feature from your generic WAL
>> logical messages. Theoretically, we could have generic messages with
>> arbitrary content and both having custom WAL replay function and being
>> decoded by output plugin. But custom WAL replay function would let
>> extension bug break recovery, archiving and physical replication. And that
>> doesn't seem to be acceptable. This is why we have to develop these as
>> separate features.
>>
>> Should we think more about naming? Does two kinds of generic records
>> confuse people?
>
> Logical messages
>
> Generic WAL records
>
> Seems like I can tell them apart. Worth checking, but I think we're OK.

I was worrying because topic name is "Generic WAL logical messages". But if we name them just "Logical messages" then it's OK for me.

------

Alexander Korotkov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company 

Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 1 February 2016 at 09:45, Alexander Korotkov
<a.korotkov@postgrespro.ru> wrote:
> On Sat, Jan 30, 2016 at 11:58 AM, Simon Riggs <simon@2ndquadrant.com> wrote:
>>
>> On 29 January 2016 at 21:11, Alexander Korotkov
>> <a.korotkov@postgrespro.ru> wrote:
>>>
>>> Should we think more about naming? Does two kinds of generic records
>>> confuse people?
>>
>>
>> Logical messages
>>
>> Generic WAL records
>>
>> Seems like I can tell them apart. Worth checking, but I think we're OK.
>
>
> I was worrying because topic name is "Generic WAL logical messages". But if
> we name them just "Logical messages" then it's OK for me.
>

Yeah the patch talks about logical messages, I use different title in
mailing list and CF to make it more clear on first sight what this
actually is technically.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services



Re: Proposal: Generic WAL logical messages

From
Artur Zakirov
Date:
On 23.01.2016 01:22, Petr Jelinek wrote:
> Hi,
>
> here is updated version of this patch, calling the messages logical
> (decoding) messages consistently everywhere and removing any connection
> to standby messages. Moving this to its own module gave me place to
> write some brief explanation about this so the code documentation has
> hopefully improved as well.
>
> The functionality itself didn't change.
>
>
>
>

Hello,

It seems that you forgot the regression tests for test_decoding. There is an
entry in test_decoding/Makefile, but the files
sql/messages.sql and expected/messages.out are missing. However, they were
included in the first version of the patch.

-- 
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
>
> Hello,
>
> It seems that you forgot the regression tests for test_decoding. There is an
> entry in test_decoding/Makefile, but the files
> sql/messages.sql and expected/messages.out are missing. However, they were
> included in the first version of the patch.
>

Hi, yes, git add missing.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
accomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now?  Also, iterating through a
linked list every time something is logged doesn't seem very satisfying?


On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
> +SET synchronous_commit = on;
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
> + ?column? 
> +----------
> + init
> +(1 row)

> +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
> + ?column? 
> +----------
> + msg1
> +(1 row)

Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?

> +      <row>
> +       <entry id="pg-logical-send-message-text">
> +        <indexterm>
> +         <primary>pg_logical_send_message</primary>
> +        </indexterm>
> +        <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> +       </entry>
> +       <entry>
> +        void
> +       </entry>
> +       <entry>
> +        Write text logical decoding message. This can be used to pass generic
> +        messages to logical decoding plugins through WAL. The parameter
> +        <parameter>transactional</parameter> specifies if the message should
> +        be part of current transaction or if it should be written and decoded
> +        immediately. The <parameter>prefix</parameter> has to be prefix which
> +        was registered by a plugin. The <parameter>content</parameter> is
> +        content of the message.
> +       </entry>
> +      </row>

It's not decoded immediately, even if emitted non-transactionally.

> +    <sect3 id="logicaldecoding-output-plugin-message">
> +     <title>Generic Message Callback</title>
> +
> +     <para>
> +      The optional <function>message_cb</function> callback is called whenever
> +      a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    bool transactional,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

We should at least document what txn is set to if not transactional.

> +void
> +logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> +{
> +    char               *rec = XLogRecGetData(record);
> +    xl_logical_message *xlrec = (xl_logical_message *) rec;
> +
> +    appendStringInfo(buf, "%s message size %zu bytes",
> +                     xlrec->transactional ? "transactional" : "nontransactional",
> +                     xlrec->message_size);
> +}

Shouldn't we check
           uint8         info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
           if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
here?

> +const char *
> +logicalmsg_identify(uint8 info)
> +{
> +    return NULL;
> +}

Huh?


> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> +                          bool transactional, const char *prefix, Size msg_sz,
> +                          const char *msg)
> +{
> +    ReorderBufferTXN *txn = NULL;
> +
> +    if (transactional)
> +    {
> +        ReorderBufferChange *change;
> +
> +        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> +
> +        Assert(xid != InvalidTransactionId);
> +        Assert(txn != NULL);
> +
> +        change = ReorderBufferGetChange(rb);
> +        change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +        change->data.msg.transactional = true;
> +        change->data.msg.prefix = pstrdup(prefix);
> +        change->data.msg.message_size = msg_sz;
> +        change->data.msg.message = palloc(msg_sz);
> +        memcpy(change->data.msg.message, msg, msg_sz);
> +
> +        ReorderBufferQueueChange(rb, xid, lsn, change);
> +    }
> +    else
> +    {
> +        rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> +    }
> +}


This approach prohibits catalog access when processing a nontransactional
message as there's no snapshot set up.

> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                char       *data;
> +                size_t        prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> +                sz += prefix_size + change->data.msg.message_size;
> +                ReorderBufferSerializeReserve(rb, sz);
> +
> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> +                memcpy(data, change->data.msg.prefix,
> +                       prefix_size);
> +                memcpy(data + prefix_size, change->data.msg.message,
> +                       change->data.msg.message_size);
> +                break;
> +            }
>          case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
>              {
>                  Snapshot    snap;
> @@ -2354,6 +2415,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
>                  data += len;
>              }
>              break;
> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                Size        message_size = change->data.msg.message_size;
> +                Size        prefix_size = strlen(data) + 1;
> +
> +                change->data.msg.prefix = pstrdup(data);
> +                change->data.msg.message = palloc(message_size);
> +                memcpy(change->data.msg.message, data + prefix_size,
> +                       message_size);
> +
> +                data += prefix_size + message_size;
> +            }

Please add a test exercising these paths.
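
The two quoted hunks spill a message change to disk as the NUL-terminated prefix followed by the raw message bytes, and restore it by reading them back in the same order. A minimal standalone sketch of a round trip over that layout follows. It is an illustration under stated assumptions, not the patch's test: `MsgChange`, `msg_serialize` and `msg_restore` are hypothetical names, plain `malloc` stands in for `palloc`, and the message size is passed in explicitly because the quoted restore hunk reads it from the change itself.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the message part of a reorder buffer change. */
typedef struct MsgChange
{
    char   *prefix;         /* NUL-terminated string */
    size_t  message_size;   /* number of payload bytes */
    char   *message;        /* payload, not NUL-terminated */
} MsgChange;

/* Bytes needed to serialize the prefix (with its NUL) plus the payload. */
static size_t
msg_serialized_size(const MsgChange *c)
{
    return strlen(c->prefix) + 1 + c->message_size;
}

/* Write prefix, then payload, into buf; returns the number of bytes written. */
static size_t
msg_serialize(const MsgChange *c, char *buf)
{
    size_t prefix_size = strlen(c->prefix) + 1;

    memcpy(buf, c->prefix, prefix_size);
    memcpy(buf + prefix_size, c->message, c->message_size);
    return prefix_size + c->message_size;
}

/*
 * Rebuild a change from serialized data. Returns the number of bytes
 * consumed, or 0 if allocation failed.
 */
static size_t
msg_restore(MsgChange *c, const char *data, size_t message_size)
{
    size_t prefix_size = strlen(data) + 1;

    c->prefix = malloc(prefix_size);
    c->message = malloc(message_size > 0 ? message_size : 1);
    if (c->prefix == NULL || c->message == NULL)
    {
        free(c->prefix);
        free(c->message);
        c->prefix = NULL;
        c->message = NULL;
        return 0;
    }
    memcpy(c->prefix, data, prefix_size);
    memcpy(c->message, data + prefix_size, message_size);
    c->message_size = message_size;
    return prefix_size + message_size;
}
```

A payload containing an embedded NUL byte is a useful case for such a test, since only the prefix is treated as a C string.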


Greetings,

Andres Freund



Re: Proposal: Generic WAL logical messages

From
Artur Zakirov
Date:
Hello,

On 27.02.2016 03:05, Andres Freund wrote:
> Hi,
>
> I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
> not much documentation about what it actually is supposed to
> accomplish. Afaics you're basically forced to use
> shared_preload_libraries with it right now?  Also, iterating through a
> linked list every time something is logged doesn't seem very satisfying?
>

I did some tests with a simple plugin. I used event triggers
to send messages. It works, but I agree with Andres. We have problems if
the plugin is not loaded. For example, if you execute the query:

SELECT 'msg2' FROM pg_logical_send_message(false, 'test', 'msg2');

you will get the error (if the plugin which should register the prefix is
not loaded yet):

ERROR:  standby message prefix "test" is not registered

A stylistic note (logical.c):

> +static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> +                               XLogRecPtr message_lsn,
> +                               bool transactional, const char *prefix,
> +                               Size sz, const char *message)
> +{
> +    LogicalDecodingContext *ctx = cache->private_data;
> +    LogicalErrorCallbackState state;
> +    ErrorContextCallback errcallback;

It should be written in the following way:

static void
message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                   XLogRecPtr message_lsn,
                   bool transactional, const char *prefix,
                   Size sz, const char *message)
{
    LogicalDecodingContext *ctx = cache->private_data;
    LogicalErrorCallbackState state;
    ErrorContextCallback errcallback;

-- 
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Hi,

thanks for looking Andres,


On 27/02/16 01:05, Andres Freund wrote:
> Hi,
>
> I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
> not much documentation about what it actually is supposed to
> accomplish. Afaics you're basically forced to use
> shared_preload_libraries with it right now?  Also, iterating through a
> linked list every time something is logged doesn't seem very satisfying?
>

Well, my reasoning there was to stop multiple plugins from using the same
prefix, and for that you need some kind of registry. Making this a shared
catalog seemed like huge overkill given the potentially transient nature
of output plugins and this was the best I could come up with. And yes, it
requires you to load your plugin before you can log a message for it.

I am not married to this solution, so if you have better ideas or if you
think the clashes are not a real issue, we can change it.
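
For readers without the patch at hand, the registry idea under discussion can be pictured roughly as below. This is a toy sketch under stated assumptions, not the code behind `RegisterStandbyMsgPrefix()`: `PrefixEntry`, `register_prefix` and `prefix_is_registered` are hypothetical names. It illustrates the two properties raised in the review: a prefix can be claimed only once, and every lookup walks the linked list, so a message for a prefix whose plugin has not been loaded yet is rejected.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical registry node; the caller owns the storage. */
typedef struct PrefixEntry
{
    const char         *prefix;
    struct PrefixEntry *next;
} PrefixEntry;

/* Linear search through the list; cost grows with the number of prefixes. */
static int
prefix_is_registered(const PrefixEntry *head, const char *prefix)
{
    for (const PrefixEntry *e = head; e != NULL; e = e->next)
    {
        if (strcmp(e->prefix, prefix) == 0)
            return 1;
    }
    return 0;
}

/*
 * Link entry at the head of the list. Returns 1 on success, or 0 if some
 * other entry already claimed the same prefix.
 */
static int
register_prefix(PrefixEntry **head, PrefixEntry *entry)
{
    if (prefix_is_registered(*head, entry->prefix))
        return 0;
    entry->next = *head;
    *head = entry;
    return 1;
}
```

In this model, emitting a message would first call `prefix_is_registered` and fail when it returns 0, which corresponds to the "is not registered" error reported earlier in the thread.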

>
> On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
>> +SET synchronous_commit = on;
>> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
>> + ?column?
>> +----------
>> + init
>> +(1 row)
>
>> +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
>> + ?column?
>> +----------
>> + msg1
>> +(1 row)
>
> Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?
>
>> +      <row>
>> +       <entry id="pg-logical-send-message-text">
>> +        <indexterm>
>> +         <primary>pg_logical_send_message</primary>
>> +        </indexterm>
>> +        <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
>> +       </entry>
>> +       <entry>
>> +        void
>> +       </entry>
>> +       <entry>
>> +        Write text logical decoding message. This can be used to pass generic
>> +        messages to logical decoding plugins through WAL. The parameter
>> +        <parameter>transactional</parameter> specifies if the message should
>> +        be part of current transaction or if it should be written and decoded
>> +        immediately. The <parameter>prefix</parameter> has to be prefix which
>> +        was registered by a plugin. The <parameter>content</parameter> is
>> +        content of the message.
>> +       </entry>
>> +      </row>
>
> It's not decoded immediately, even if emitted non-transactionally.
>

Okay, immediately is somewhat misleading. How does "should be written 
immediately and decoded as soon as logical decoding reads the WAL 
record" sound ?

>> +    <sect3 id="logicaldecoding-output-plugin-message">
>> +     <title>Generic Message Callback</title>
>> +
>> +     <para>
>> +      The optional <function>message_cb</function> callback is called whenever
>> +      a logical decoding message has been decoded.
>> +<programlisting>
>> +typedef void (*LogicalDecodeMessageCB) (
>> +    struct LogicalDecodingContext *,
>> +    ReorderBufferTXN *txn,
>> +    XLogRecPtr message_lsn,
>> +    bool transactional,
>> +    const char *prefix,
>> +    Size message_size,
>> +    const char *message
>> +);
>
> We should at least document what txn is set to if not transactional.
>

Will do (it's NULL).
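
Since txn is NULL for non-transactional messages, a callback has to check it before dereferencing. The sketch below is a toy illustration of that check only: `ToyTxn` and `format_message` are hypothetical stand-ins, not `ReorderBufferTXN` or any output plugin's real callback, and the output format is made up.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the transaction handle passed to the callback. */
typedef struct ToyTxn
{
    uint32_t xid;
} ToyTxn;

/*
 * Format one message into buf. txn may be NULL (non-transactional message),
 * so the xid is printed only when a transaction is present. The payload is
 * printed with an explicit length because it is not NUL-terminated.
 */
static int
format_message(char *buf, size_t buflen, const ToyTxn *txn,
               const char *prefix, size_t message_size, const char *message)
{
    if (txn != NULL)
        return snprintf(buf, buflen, "xid %u prefix %s: %.*s",
                        (unsigned) txn->xid, prefix,
                        (int) message_size, message);

    return snprintf(buf, buflen, "no-txn prefix %s: %.*s",
                    prefix, (int) message_size, message);
}
```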

>> +void
>> +logicalmsg_desc(StringInfo buf, XLogReaderState *record)
>> +{
>> +    char               *rec = XLogRecGetData(record);
>> +    xl_logical_message *xlrec = (xl_logical_message *) rec;
>> +
>> +    appendStringInfo(buf, "%s message size %zu bytes",
>> +                     xlrec->transactional ? "transactional" : "nontransactional",
>> +                     xlrec->message_size);
>> +}
>
> Shouldn't we check
>            uint8         info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
>            if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
> here?
>

I thought it's kinda pointless, but we seem to be doing it in other 
places so will add.

>
>> +void
>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>> +                          bool transactional, const char *prefix, Size msg_sz,
>> +                          const char *msg)
>> +{
>> +    ReorderBufferTXN *txn = NULL;
>> +
>> +    if (transactional)
>> +    {
>> +        ReorderBufferChange *change;
>> +
>> +        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
>> +
>> +        Assert(xid != InvalidTransactionId);
>> +        Assert(txn != NULL);
>> +
>> +        change = ReorderBufferGetChange(rb);
>> +        change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>> +        change->data.msg.transactional = true;
>> +        change->data.msg.prefix = pstrdup(prefix);
>> +        change->data.msg.message_size = msg_sz;
>> +        change->data.msg.message = palloc(msg_sz);
>> +        memcpy(change->data.msg.message, msg, msg_sz);
>> +
>> +        ReorderBufferQueueChange(rb, xid, lsn, change);
>> +    }
>> +    else
>> +    {
>> +        rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
>> +    }
>> +}
>
>
> This approach prohibits catalog access when processing a nontransactional
> message as there's no snapshot set up.
>

Hmm, I do see usefulness in having a snapshot, although I wonder if that
does not kill the point of non-transactional messages. The question is then
which snapshot the message should see: base_snapshot of the
transaction? That would mean we'd have to call SnapBuildProcessChange
for non-transactional messages, which we currently avoid. Alternatively
we could probably invent a lighter version of that interface that would
just make sure builder->snapshot is valid and if not then build it, but
I am honestly not sure if that's a win or not.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
Hi,

On 2016-02-28 22:44:12 +0100, Petr Jelinek wrote:
> On 27/02/16 01:05, Andres Freund wrote:
> >I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
> >not much documentation about what it actually is supposed to
> >accomplish. Afaics you're basically forced to use
> >shared_preload_libraries with it right now?  Also, iterating through a
> >linked list every time something is logged doesn't seem very satisfying?
> >
> 
> Well, my reasoning there was to stop multiple plugins from using same prefix
> and for that you need some kind of registry. Making this a shared catalog
> seemed like huge overkill given the potentially transient nature of output
> plugins and this was the best I could come up with. And yes it requires you
> to load your plugin before you can log a message for it.

I think right now that's a solution that's worse than the problem.  I'm
inclined to define the problem away with something like "The prefix
should be unique across different users of the messaging facility. Using
the extension name often is a good choice.".


> >>+void
> >>+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> >>+{
> >>+    char               *rec = XLogRecGetData(record);
> >>+    xl_logical_message *xlrec = (xl_logical_message *) rec;
> >>+
> >>+    appendStringInfo(buf, "%s message size %zu bytes",
> >>+                     xlrec->transactional ? "transactional" : "nontransactional",
> >>+                     xlrec->message_size);
> >>+}
> >
> >Shouldn't we check
> >           uint8         info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
> >           if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
> >here?
> >
> 
> I thought it's kinda pointless, but we seem to be doing it in other places
> so will add.

It leads to a segfault or something similar when adding further message
types, without a compiler warning. So it seems to be a good idea to be
slightly careful.
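
The concern here is that a describe routine which casts the record payload without looking at the record type will misread any record type added later. A minimal standalone sketch of the guarded version follows. The constants and names (`INFO_MASK_RESERVED`, `REC_LOGICAL_MESSAGE`, `LogicalMessageRec`, `describe_record`) are hypothetical stand-ins chosen for illustration, not PostgreSQL's definitions.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical values: low bits reserved, high bits carry the record type. */
#define INFO_MASK_RESERVED   0x0F
#define REC_LOGICAL_MESSAGE  0x00

/* Hypothetical stand-in for the fixed header of a logical message record. */
typedef struct LogicalMessageRec
{
    uint8_t transactional;
    size_t  message_size;
} LogicalMessageRec;

/*
 * Describe a record. The payload is interpreted as a LogicalMessageRec only
 * after the type bits confirm that is what it holds; for any other type the
 * buffer is left empty and 0 is returned.
 */
static int
describe_record(char *buf, size_t buflen, uint8_t raw_info, const void *data)
{
    uint8_t info = raw_info & (uint8_t) ~INFO_MASK_RESERVED;

    if (info == REC_LOGICAL_MESSAGE)
    {
        const LogicalMessageRec *rec = data;

        return snprintf(buf, buflen, "%s message size %zu bytes",
                        rec->transactional ? "transactional" : "nontransactional",
                        rec->message_size);
    }

    if (buflen > 0)
        buf[0] = '\0';
    return 0;
}
```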


> >
> >>+void
> >>+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> >>+                          bool transactional, const char *prefix, Size msg_sz,
> >>+                          const char *msg)
> >>+{
> >>+    ReorderBufferTXN *txn = NULL;
> >>+
> >>+    if (transactional)
> >>+    {
> >>+        ReorderBufferChange *change;
> >>+
> >>+        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> >>+
> >>+        Assert(xid != InvalidTransactionId);
> >>+        Assert(txn != NULL);
> >>+
> >>+        change = ReorderBufferGetChange(rb);
> >>+        change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> >>+        change->data.msg.transactional = true;
> >>+        change->data.msg.prefix = pstrdup(prefix);
> >>+        change->data.msg.message_size = msg_sz;
> >>+        change->data.msg.message = palloc(msg_sz);
> >>+        memcpy(change->data.msg.message, msg, msg_sz);
> >>+
> >>+        ReorderBufferQueueChange(rb, xid, lsn, change);
> >>+    }
> >>+    else
> >>+    {
> >>+        rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> >>+    }
> >>+}
> >
> >
> >This approach prohibits catalog access when processing a nontransactional
> >message as there's no snapshot set up.
> >
> 
> Hmm I do see usefulness in having snapshot, although I wonder if that does
> not kill the point of non-transactional messages.

I don't see how it would? It'd obviously have to be the catalog/historic
snapshot a transaction would have had if it started in that moment in
the original WAL stream?


> Question is then though which snapshot should the message see,
> base_snapshot of transaction?

Well, there'll not be a transaction, but something like snapbuild.c's
->snapshot ought to do the trick.


> That would mean we'd have to call SnapBuildProcessChange for
> non-transactional messages which we currently avoid. Alternatively we
> could probably invent lighter version of that interface that would
> just make sure builder->snapshot is valid and if not then build it

I think the latter is probably the direction we should go in.


> I am honestly not sure if that's a win or not.

I think it'll be confusing (bug inducing) if there's no snapshot for
non-transactional messages but for transactional ones, and it'll
severely limit the usefulness of the interface.


Andres



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Hi,

attached is the newest version of the patch.

I removed the registry, renamed the 'send' to 'emit', documented the
callback parameters properly. I also added the test to ddl.sql for the
serialization and deserialization (and of course found a bug there) and
in general fixed all the stuff Andres reported.

(see more inline)

On 28/02/16 22:55, Andres Freund wrote:
>
>
>>>
>>>> +void
>>>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>>>> +                          bool transactional, const char *prefix, Size msg_sz,
>>>> +                          const char *msg)
>>>> +{
>>>> +    ReorderBufferTXN *txn = NULL;
>>>> +
>>>> +    if (transactional)
>>>> +    {
>>>> +        ReorderBufferChange *change;
>>>> +
>>>> +        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
>>>> +
>>>> +        Assert(xid != InvalidTransactionId);
>>>> +        Assert(txn != NULL);
>>>> +
>>>> +        change = ReorderBufferGetChange(rb);
>>>> +        change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>>>> +        change->data.msg.transactional = true;
>>>> +        change->data.msg.prefix = pstrdup(prefix);
>>>> +        change->data.msg.message_size = msg_sz;
>>>> +        change->data.msg.message = palloc(msg_sz);
>>>> +        memcpy(change->data.msg.message, msg, msg_sz);
>>>> +
>>>> +        ReorderBufferQueueChange(rb, xid, lsn, change);
>>>> +    }
>>>> +    else
>>>> +    {
>>>> +        rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
>>>> +    }
>>>> +}
>>>
>>>
>>> This approach prohibits catalog access when processing a nontransaction
>>> message as there's no snapshot set up.
>>>
>>
>> Hmm I do see usefulness in having snapshot, although I wonder if that does
>> not kill the point of non-transactional messages.
>
> I don't see how it would? It'd obviously have to be the catalog/historic
> snapshot a transaction would have had if it started in that moment in
> the original WAL stream?
>
>
>> Question is then though which snapshot should the message see,
>> base_snapshot of transaction?
>
> Well, there'll not be a transaction, but something like snapbuild.c's
> ->snapshot ought to do the trick.
>

OK, I added an interface which returns either the existing snapshot or
builds a new one; that seems like the most reasonable thing to do to me.

>
>> That would mean we'd have to call SnapBuildProcessChange for
>> non-transactional messages which we currently avoid. Alternatively we
>> could probably invent lighter version of that interface that would
>> just make sure builder->snapshot is valid and if not then build it
>
> I think the latter is probably the direction we should go in.
>
>
>> I am honestly not sure if that's a win or not.
>
> I think it'll be confusing (bug inducing) if there's no snapshot for
> non-transactional messages but for transactional ones, and it'll
> severely limit the usefulness of the interface.
>

No, no, I meant I am not sure whether a special interface is a win over
just using SnapBuildProcessChange() in practice.


--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Artur Zakirov
Date:
I think here

> +const char *
> +logicalmsg_identify(uint8 info)
> +{
> +    if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
> +        return "MESSAGE";
> +
> +    return NULL;
> +}

we should use brackets

const char *
logicalmsg_identify(uint8 info)
{
    if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
        return "MESSAGE";

    return NULL;
}

Because of operator precedence
(http://en.cppreference.com/w/c/language/operator_precedence), == binds
more tightly than &, so without the parentheses the condition does not
test what was intended.
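
To make the precedence point concrete, here is a minimal standalone sketch. The DEMO_* constants and the demo_* function names are made up for illustration, and the constant values are assumptions rather than something taken from the patch. The first function spells out how the compiler groups the unparenthesized condition; the second is the parenthesized form suggested above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical stand-ins for XLR_INFO_MASK and XLOG_LOGICAL_MESSAGE;
 * the values are assumed for this demo.
 */
#define DEMO_INFO_MASK        0x0F
#define DEMO_LOGICAL_MESSAGE  0x00

/* How "info & ~MASK == MSG" is grouped: == binds more tightly than &. */
const char *
demo_identify_as_parsed(uint8_t info)
{
    if (info & (~DEMO_INFO_MASK == DEMO_LOGICAL_MESSAGE))
        return "MESSAGE";
    return NULL;
}

/* The intended test: mask first, then compare. */
const char *
demo_identify_fixed(uint8_t info)
{
    if ((info & ~DEMO_INFO_MASK) == DEMO_LOGICAL_MESSAGE)
        return "MESSAGE";
    return NULL;
}

/* Small self-check; call it from main() to run the demo. */
void
demo_check_precedence(void)
{
    /* The comparison yields 0, and info & 0 is always false. */
    assert(demo_identify_as_parsed(0x00) == NULL);
    assert(demo_identify_fixed(0x00) != NULL);
    assert(demo_identify_fixed(0x10) == NULL);
}
```

With these assumed values the first form can never return "MESSAGE", whatever info is passed in.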

On 01.03.2016 00:10, Petr Jelinek wrote:
> Hi,
>
> attached is the newest version of the patch.
>
> I removed the registry, renamed the 'send' to 'emit', documented the
> callback parameters properly. I also added the test to ddl.sql for the
> serialization and deserialization (and of course found a bug there) and
> in general fixed all the stuff Andres reported.
>
> (see more inline)

-- 
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 08/03/16 21:21, Artur Zakirov wrote:
> I think here
>
>> +const char *
>> +logicalmsg_identify(uint8 info)
>> +{
>> +    if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
>> +        return "MESSAGE";
>> +
>> +    return NULL;
>> +}
>
> we should use brackets
>
> const char *
> logicalmsg_identify(uint8 info)
> {
>      if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
>          return "MESSAGE";
>
>      return NULL;
> }
>

Correct, fixed, thanks.

I also rebased this as there was conflict after the fixes to logical
decoding by Andres.


--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
David Steele
Date:
On 3/9/16 6:58 PM, Petr Jelinek wrote:
> On 08/03/16 21:21, Artur Zakirov wrote:
>> I think here
>>
>>> +const char *
>>> +logicalmsg_identify(uint8 info)
>>> +{
>>> +    if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
>>> +        return "MESSAGE";
>>> +
>>> +    return NULL;
>>> +}
>>
>> we should use brackets
>>
>> const char *
>> logicalmsg_identify(uint8 info)
>> {
>>      if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
>>          return "MESSAGE";
>>
>>      return NULL;
>> }
>>
> 
> Correct, fixed, thanks.
> 
> I also rebased this as there was conflict after the fixes to logical
> decoding by Andres.

This patch applies cleanly and is ready for review with no outstanding
issues that I can see.  Simon and Artur, you are both signed up as
reviewers.  Care to take a crack at it?

Thanks,
-- 
-David
david@pgmasters.net



Re: Proposal: Generic WAL logical messages

From
Artur Zakirov
Date:
On 16.03.2016 18:56, David Steele wrote:
>
> This patch applies cleanly and is ready for review with no outstanding
> issues that I can see.  Simon and Artur, you are both signed up as
> reviewers.  Care to take a crack at it?
>
> Thanks,
>

I have tested the patch once again and have looked at the code. It looks
good to me. I have no further comments.

The patch works correctly. I tested it with a plugin that writes DDL
queries to the WAL using a hook and replicates these queries on
subscribers.

If Simon has no objections, the patch can be marked as "Ready for Committer".

-- 
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company



Re: Proposal: Generic WAL logical messages

From
Tomas Vondra
Date:
Hi,

a few comments about the last version of the patch:


1) LogicalDecodeMessageCB

Do we actually need the 'transactional' parameter here? I mean, having 
the 'txn' should be enough, as
    transactional = (txn != NULL)

Of course, having a simple flag is more convenient.


2) pg_logical_emit_message_bytea / pg_logical_emit_message_text

The comment before _bytea is wrong - it's just a copy'n'paste from the 
preceding function (pg_logical_slot_peek_binary_changes). _text has no 
comment at all, but it's true it's just a simple _bytea wrapper.

The main issue here however is that the functions are not defined as 
strict, but ignore the possibility that the parameters might be NULL. So 
for example this crashes the backend

SELECT pg_logical_emit_message(NULL::boolean, NULL::text, NULL::text);


3) ReorderBufferQueueMessage

No comment. Not a big deal I guess, the method is simple enough, but why
break the rule when all the other methods around have at least a
short one?


4) ReorderBufferChange

The new struct in the 'union' would probably deserve at least a short 
comment explaining the purpose (just like the other structs around).


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Proposal: Generic WAL logical messages

From
Craig Ringer
Date:
On 17 March 2016 at 19:08, Artur Zakirov <a.zakirov@postgrespro.ru> wrote:
> On 16.03.2016 18:56, David Steele wrote:
>> This patch applies cleanly and is ready for review with no outstanding
>> issues that I can see.  Simon and Artur, you are both signed up as
>> reviewers.  Care to take a crack at it?
>>
>> Thanks,
>
> I have tested the patch once again and have looked at the code. It looks good to me. I have no further comments.
>
> The patch works correctly. I tested it with a plugin that writes DDL queries to the WAL using a hook and replicates these queries on subscribers.

Would you mind sharing the plugin here? I could add it to src/test/modules and add some t/ tests so it runs under the TAP test framework.


--
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

Re: Proposal: Generic WAL logical messages

From
Artur Zakirov
Date:
On 17.03.2016 15:42, Craig Ringer wrote:
>
>
> Would you mind sharing the plugin here? I could add it to
> src/test/modules and add some t/ tests so it runs under the TAP test
> framework.
>
>
> --
>   Craig Ringer http://www.2ndQuadrant.com/
>   PostgreSQL Development, 24x7 Support, Training & Services

Of course. I attached it.

On the provider node, you need to set shared_preload_libraries to
'pg_ddl_decode'.

On a subscriber node, you need to:
1 - install pg_ddl_decode with the command:

CREATE EXTENSION pg_ddl_decode;

2 - call the function:

SELECT ddl_create_slot('host=providerhost port=5432 dbname=db');

3 - after running some DDL queries on the provider, manually execute the
following command on the subscriber:

SELECT ddl_get_changes('host=providerhost port=5432 dbname=db');


I should warn that this plugin's code is ugly in places, and
ddl_get_changes() has an infinite loop. But it shows the concept of DDL
replication.

This plugin also uses some functions from pglogical_output and pglogical.


Earlier I ran tests with pglogical. I attached the patch for pglogical.

--
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company

Attachments

Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Hi,

thanks for review.

On 17/03/16 13:36, Tomas Vondra wrote:
> Hi,
>
> a few comments about the last version of the patch:
>
>
> 1) LogicalDecodeMessageCB
>
> Do we actually need the 'transactional' parameter here? I mean, having
> the 'txn' should be enough, as
>
>      transactional = (txn != NULL)
>

Agreed. Same goes for the ReorderBufferChange struct btw, only
transactional messages go there so no point in marking them as such.

>
>
> 2) pg_logical_emit_message_bytea / pg_logical_emit_message_text
>
> The comment before _bytea is wrong - it's just a copy'n'paste from the
> preceding function (pg_logical_slot_peek_binary_changes). _text has no
> comment at all, but it's true it's just a simple _bytea wrapper.
>

Heh, blind.

> The main issue here however is that the functions are not defined as
> strict, but ignore the possibility that the parameters might be NULL. So
> for example this crashes the backend
>
> SELECT pg_logical_emit_message(NULL::boolean, NULL::text, NULL::text);
>

Good point.

>
> 3) ReorderBufferQueueMessage
>
> No comment. Not a big deal I guess, the method is simple enough, but why
> to break the rule when all the other methods around have at least a
> short one?
>

Yeah, I am sometimes not sure whether there is a point in adding a comment
to tiny, straightforward functions that do more or less the same as the one
above. But it's a public API, so it's probably better to have one.

>
> 4) ReorderBufferChange
>
> The new struct in the 'union' would probably deserve at least a short
> comment explaining the purpose (just like the other structs around).
>


Okay.

Updated version attached.

(BTW please try to CC author of the patch when reviewing)


--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Craig Ringer
Date:
On 18 March 2016 at 20:36, Artur Zakirov <a.zakirov@postgrespro.ru> wrote:
> On 17.03.2016 15:42, Craig Ringer wrote:
>> Would you mind sharing the plugin here? I could add it to
>> src/test/modules and add some t/ tests so it runs under the TAP test
>> framework.
>>
>> --
>>   Craig Ringer http://www.2ndQuadrant.com/
>>   PostgreSQL Development, 24x7 Support, Training & Services
>
> Of course. I attached it.

Thanks for that.

Since it incorporates fairly significant chunks of code from a number of places and is really a proof of concept rather than a demo/example, I don't think it can really be included as a TAP test module, not without more rewriting than I currently have time for anyway.

> But it shows the concept of DDL replication.

Yeah, a part of it anyway. As I outlined in another thread some time ago, getting DDL replication right is fairly tricky and requires more changes than just capturing the raw DDL (or even deparsed DDL) and sending it over the wire.

It's a useful proof of concept, but way too big to use as an in-tree regression test for logical WAL messages as I was hoping.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Just noticed there is a missing symlink in pg_xlogdump.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> Just noticed there is a missing symlink in pg_xlogdump.

>  create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
>  create mode 120000 src/bin/pg_xlogdump/logicalmsgdesc.c

Uh, src/bin/pg_xlogdump/logicalmsgdesc.c shouldn't be there. The symlink
is supposed to be automatically created by the Makefile.

Were you perhaps confused because it showed up in git status? If so,
that's probably because it isn't in
src/bin/pg_xlogdump/.gitignore. Perhaps we should change that file to
ignore *desc.c?

> +      <row>
> +       <entry id="pg-logical-emit-message-text">
> +        <indexterm>
> +         <primary>pg_logical_emit_message</primary>
> +        </indexterm>
> +        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> +       </entry>
> +       <entry>
> +        void
> +       </entry>
> +       <entry>
> +        Write text logical decoding message. This can be used to pass generic
> +        messages to logical decoding plugins through WAL. The parameter
> +        <parameter>transactional</parameter> specifies if the message should
> +        be part of current transaction or if it should be written immediately
> +        and decoded as soon as the logical decoding reads the record. The
> +        <parameter>prefix</parameter> is textual prefix used by the logical
> +        decoding plugins to easily recognize interesting messages for them.
> +        The <parameter>content</parameter> is the text of the message.
> +       </entry>
> +      </row>

s/write/emit/?

> +
> +    <sect3 id="logicaldecoding-output-plugin-message">
> +     <title>Generic Message Callback</title>
> +
> +     <para>
> +      The optional <function>message_cb</function> callback is called whenever
> +      a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

I see you removed the transactional parameter. I'm doubtful that that's
a good idea: It seems like it'd be rather helpful to pass the
transaction for a nontransaction message that's emitted while an xid was
assigned?


> +/*
> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> + */
> +static void
> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> +    SnapBuild  *builder = ctx->snapshot_builder;
> +    XLogReaderState *r = buf->record;
> +    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> +    xl_logical_message *message;
> +
> +    if (info != XLOG_LOGICAL_MESSAGE)
> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> +
> +    message = (xl_logical_message *) XLogRecGetData(r);
> +
> +    if (message->transactional)
> +    {
> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> +            return;
> +
> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> +                                  buf->endptr,
> +                                  message->message, /* first part of message is prefix */
> +                                  message->message_size,
> +                                  message->message + message->prefix_size);
> +    }
> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
> +    {
> +        volatile Snapshot    snapshot_now;
> +        ReorderBuffer       *rb = ctx->reorder;
> +
> +        /* setup snapshot to allow catalog access */
> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> +        SetupHistoricSnapshot(snapshot_now, NULL);
> +        rb->message(rb, NULL, buf->origptr, message->message,
> +                    message->message_size,
> +                    message->message + message->prefix_size);
> +        TeardownHistoricSnapshot(false);
> +    }
> +}

A number of things:
1) The SnapBuildProcessChange needs to be toplevel, not just for
   transactional messages - we can't yet necessarily build a snapshot.
2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
3) This lacks error handling, we surely don't want to error out while
   still having the historic snapshot setup
4) Without 3) the volatile is bogus.
5) Misses a ReorderBufferProcessXid() call.
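
Points 3) and 4) can be illustrated with a generic standalone sketch. It is not PostgreSQL code: plain setjmp()/longjmp() stand in for the backend's error handling (PG_TRY/PG_CATCH are built on the same mechanism), the demo_* names are hypothetical, and a flag stands in for the historic snapshot. A real implementation would normally re-raise the error after cleanup, which this sketch omits.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>

static jmp_buf demo_error_env;        /* where a "raised error" lands */
static bool    demo_snapshot_active;  /* stands in for the historic snapshot */

typedef void (*demo_message_cb) (void);

/* A callback that succeeds, and one that "errors out" via longjmp(). */
void demo_cb_ok(void) { }
void demo_cb_fail(void) { longjmp(demo_error_env, 1); }

/*
 * Run cb with the "snapshot" set up and tear it down on every path.
 * Returns true if the callback completed without raising an error.
 */
bool
demo_call_with_snapshot(demo_message_cb cb)
{
    /*
     * Modified between setjmp() and a possible longjmp() and read
     * afterwards, so it has to be volatile to have a defined value.
     */
    volatile bool in_callback = false;

    demo_snapshot_active = true;
    if (setjmp(demo_error_env) == 0)
    {
        in_callback = true;
        cb();
        in_callback = false;
    }
    /* Reached on both the normal and the error path. */
    demo_snapshot_active = false;

    return !in_callback;
}

/* Small self-check; call it from main() to run the demo. */
void
demo_check_cleanup(void)
{
    assert(demo_call_with_snapshot(demo_cb_ok));
    assert(!demo_snapshot_active);
    assert(!demo_call_with_snapshot(demo_cb_fail));
    assert(!demo_snapshot_active);
}
```

Without the setjmp() branch there is no path on which the local could be read after a non-local jump, which is the sense in which a volatile qualifier without error handling serves no purpose.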

> + * Every message carries prefix to avoid conflicts between different decoding
> + * plugins. The prefix has to be registered before the message using that
> + * prefix can be written to XLOG. The prefix can be registered exactly once to
> + * avoid situation where multiple third party extensions try to use same
> + * prefix.

Outdated afaics?


> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>                  change->data.tp.oldtuple = NULL;
>              }
>              break;
> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            if (change->data.msg.prefix != NULL)
> +                pfree(change->data.msg.prefix);
> +            change->data.msg.prefix = NULL;
> +            if (change->data.msg.message != NULL)
> +                pfree(change->data.msg.message);
> +            change->data.msg.message = NULL;
> +            break;

Hm, this will have some overhead, but I guess the messages won't be
super frequent, and usually not very large.

> +/*
> + * Queue message into a transaction so it can be processed upon commit.
> + */
> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> +                          const char *prefix, Size msg_sz, const char *msg)
> +{
> +    ReorderBufferChange *change;
> +
> +    Assert(xid != InvalidTransactionId);
> +
> +    change = ReorderBufferGetChange(rb);
> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +    change->data.msg.prefix = pstrdup(prefix);
> +    change->data.msg.message_size = msg_sz;
> +    change->data.msg.message = palloc(msg_sz);
> +    memcpy(change->data.msg.message, msg, msg_sz);
> +
> +    ReorderBufferQueueChange(rb, xid, lsn, change);
> +}

I'm not sure right now if there's any guarantee that the current memory
context is meaningful here? IIRC other long-lived allocations explicitly
use a context?

> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                char       *data;
> +                size_t        prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> +                sz += prefix_size + change->data.msg.message_size;
> +                ReorderBufferSerializeReserve(rb, sz);
> +
> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> +                memcpy(data, change->data.msg.prefix,
> +                       prefix_size);
> +                memcpy(data + prefix_size, change->data.msg.message,
> +                       change->data.msg.message_size);
> +                break;
> +            }

Can you please include the sizes of the blocks explicitly, rather than
relying on 0 termination?
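
One way to read this request is to store each block's length in front of it, so the reader never has to scan for a terminator. The sketch below is a standalone illustration, not the patch's code: the DemoMessage type, the demo_* functions and the [size][bytes][size][bytes] layout are all assumptions made for the example.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical in-memory form of a queued message. */
typedef struct DemoMessage
{
    const char *prefix;        /* prefix bytes */
    size_t      prefix_size;   /* explicit length, terminator included */
    const char *message;       /* payload, may contain NUL bytes */
    size_t      message_size;  /* explicit payload length */
} DemoMessage;

/* Bytes needed for the layout [prefix_size][prefix][message_size][message]. */
size_t
demo_serialized_size(const DemoMessage *m)
{
    return sizeof(size_t) + m->prefix_size + sizeof(size_t) + m->message_size;
}

/* Write m into out, which must hold demo_serialized_size(m) bytes. */
void
demo_serialize(char *out, const DemoMessage *m)
{
    memcpy(out, &m->prefix_size, sizeof(size_t));
    out += sizeof(size_t);
    memcpy(out, m->prefix, m->prefix_size);
    out += m->prefix_size;
    memcpy(out, &m->message_size, sizeof(size_t));
    out += sizeof(size_t);
    memcpy(out, m->message, m->message_size);
}

/* Read the layout back; the result points into the buffer, no strlen(). */
void
demo_deserialize(const char *in, DemoMessage *m)
{
    memcpy(&m->prefix_size, in, sizeof(size_t));
    in += sizeof(size_t);
    m->prefix = in;
    in += m->prefix_size;
    memcpy(&m->message_size, in, sizeof(size_t));
    in += sizeof(size_t);
    m->message = in;
}

/* Round-trip self-check; call it from main() to run the demo. */
void
demo_check_roundtrip(void)
{
    const char  payload[] = {'a', '\0', 'b'};   /* embedded NUL on purpose */
    DemoMessage in = {"demo", 5, payload, sizeof(payload)};
    DemoMessage out;
    char       *buf = malloc(demo_serialized_size(&in));

    assert(buf != NULL);
    demo_serialize(buf, &in);
    demo_deserialize(buf, &out);
    assert(out.prefix_size == 5 && memcmp(out.prefix, "demo", 5) == 0);
    assert(out.message_size == 3 && memcmp(out.message, payload, 3) == 0);
    free(buf);
}
```

The payload in the self-check contains a NUL byte on purpose: with explicit sizes it survives the round trip unchanged.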


> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>  PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>  PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)

Did you consider doing this via the standby rmgr instead?

> +typedef struct xl_logical_message
> +{
> +    bool        transactional;                    /* is message transactional? */
> +    size_t        prefix_size;                    /* length of prefix */
> +    size_t        message_size;                    /* size of the message */
> +    char        message[FLEXIBLE_ARRAY_MEMBER];    /* message including the null
> +                                                 * terminated prefx of length
> +                                                 * prefix_size */
> +} xl_logical_message;
>

"prefx".

Greetings,

Andres Freund



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 22/03/16 12:47, Andres Freund wrote:
> On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
>
>> +
>> +    <sect3 id="logicaldecoding-output-plugin-message">
>> +     <title>Generic Message Callback</title>
>> +
>> +     <para>
>> +      The optional <function>message_cb</function> callback is called whenever
>> +      a logical decoding message has been decoded.
>> +<programlisting>
>> +typedef void (*LogicalDecodeMessageCB) (
>> +    struct LogicalDecodingContext *,
>> +    ReorderBufferTXN *txn,
>> +    XLogRecPtr message_lsn,
>> +    const char *prefix,
>> +    Size message_size,
>> +    const char *message
>> +);
>
> I see you removed the transactional parameter. I'm doubtful that that's
> a good idea: It seems like it'd be rather helpful to pass the
> transaction for a nontransaction message that's emitted while an xid was
> assigned?
>

Hmm but won't that give the output plugin even transactions that were
later aborted? That seems quite different behavior from how the txn
parameter works everywhere else.

>
>> +/*
>> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
>> + */
>> +static void
>> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
>> +    SnapBuild  *builder = ctx->snapshot_builder;
>> +    XLogReaderState *r = buf->record;
>> +    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
>> +    xl_logical_message *message;
>> +
>> +    if (info != XLOG_LOGICAL_MESSAGE)
>> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
>> +
>> +    message = (xl_logical_message *) XLogRecGetData(r);
>> +
>> +    if (message->transactional)
>> +    {
>> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
>> +            return;
>> +
>> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
>> +                                  buf->endptr,
>> +                                  message->message, /* first part of message is prefix */
>> +                                  message->message_size,
>> +                                  message->message + message->prefix_size);
>> +    }
>> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
>> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
>> +    {
>> +        volatile Snapshot    snapshot_now;
>> +        ReorderBuffer       *rb = ctx->reorder;
>> +
>> +        /* setup snapshot to allow catalog access */
>> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
>> +        SetupHistoricSnapshot(snapshot_now, NULL);
>> +        rb->message(rb, NULL, buf->origptr, message->message,
>> +                    message->message_size,
>> +                    message->message + message->prefix_size);
>> +        TeardownHistoricSnapshot(false);
>> +    }
>> +}
>
> A number of things:
> 1) The SnapBuildProcessChange needs to be toplevel, not just for
>     transactional messages - we can't yet necessarily build a snapshot.

Nope, the snapshot state is checked in the else if.

> 2) I'm inclined to move even the non-transactional stuff to reorderbuffer.

Well, it's not doing anything with reorderbuffer but sure it can be done
(didn't do it in the attached though).

> 3) This lacks error handling, we surely don't want to error out while
>     still having the historic snapshot setup
> 4) Without 3) the volatile is bogus.
> 5) Misses a ReorderBufferProcessXid() call.

Fixed (all 3 above).

>
>
>> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>>                   change->data.tp.oldtuple = NULL;
>>               }
>>               break;
>> +        case REORDER_BUFFER_CHANGE_MESSAGE:
>> +            if (change->data.msg.prefix != NULL)
>> +                pfree(change->data.msg.prefix);
>> +            change->data.msg.prefix = NULL;
>> +            if (change->data.msg.message != NULL)
>> +                pfree(change->data.msg.message);
>> +            change->data.msg.message = NULL;
>> +            break;
>
> Hm, this will have some overhead, but I guess the messages won't be
> super frequent, and usually not very large.

Yeah but since we don't really know the size of the future messages it's
hard to have some preallocated buffer for this so I dunno how else to do it.

>
>> +/*
>> + * Queue message into a transaction so it can be processed upon commit.
>> + */
>> +void
>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>> +                          const char *prefix, Size msg_sz, const char *msg)
>> +{
>> +    ReorderBufferChange *change;
>> +
>> +    Assert(xid != InvalidTransactionId);
>> +
>> +    change = ReorderBufferGetChange(rb);
>> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>> +    change->data.msg.prefix = pstrdup(prefix);
>> +    change->data.msg.message_size = msg_sz;
>> +    change->data.msg.message = palloc(msg_sz);
>> +    memcpy(change->data.msg.message, msg, msg_sz);
>> +
>> +    ReorderBufferQueueChange(rb, xid, lsn, change);
>> +}
>
> I'm not sure right now if there's any guarantee that the current memory
> context is meaningful here? IIRC other long-lived allocations explicitly
> use a context?
>

I didn't find any explicit guarantee so I added one.

>> +        case REORDER_BUFFER_CHANGE_MESSAGE:
>> +            {
>> +                char       *data;
>> +                size_t        prefix_size = strlen(change->data.msg.prefix) + 1;
>> +
>> +                sz += prefix_size + change->data.msg.message_size;
>> +                ReorderBufferSerializeReserve(rb, sz);
>> +
>> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
>> +                memcpy(data, change->data.msg.prefix,
>> +                       prefix_size);
>> +                memcpy(data + prefix_size, change->data.msg.message,
>> +                       change->data.msg.message_size);
>> +                break;
>> +            }
>
> Can you please include the sizes of the blocks explicitly, rather than
> relying on 0 termination?
>

Okay, I see I did that in WAL, no idea why I didn't do the same here.
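
For context, the WAL record struct quoted earlier in the thread carries prefix_size and message_size next to a single flexible array that holds the prefix followed by the payload. Below is a minimal standalone sketch of that kind of layout. The demo_logical_message type and the demo_* helpers are simplified, hypothetical stand-ins, and plain malloc() replaces the backend allocator.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-in for the xl_logical_message struct quoted in the thread. */
typedef struct demo_logical_message
{
    bool    transactional;  /* is the message transactional? */
    size_t  prefix_size;    /* length of prefix, terminating NUL included */
    size_t  message_size;   /* length of the payload */
    char    message[];      /* prefix first, payload right after it */
} demo_logical_message;

/* Build one record; the caller frees it with free(). Returns NULL on OOM. */
demo_logical_message *
demo_build(bool transactional, const char *prefix,
           const char *payload, size_t payload_size)
{
    size_t  prefix_size = strlen(prefix) + 1;
    demo_logical_message *rec;

    rec = malloc(offsetof(demo_logical_message, message) +
                 prefix_size + payload_size);
    if (rec == NULL)
        return NULL;

    rec->transactional = transactional;
    rec->prefix_size = prefix_size;
    rec->message_size = payload_size;
    memcpy(rec->message, prefix, prefix_size);
    memcpy(rec->message + prefix_size, payload, payload_size);
    return rec;
}

/* The first part of the array is the prefix. */
const char *
demo_prefix(const demo_logical_message *rec)
{
    return rec->message;
}

/* The payload starts prefix_size bytes into the array. */
const char *
demo_payload(const demo_logical_message *rec)
{
    return rec->message + rec->prefix_size;
}

/* Small self-check; call it from main() to run the demo. */
void
demo_check_layout(void)
{
    demo_logical_message *rec = demo_build(true, "test", "hello", 5);

    assert(rec != NULL);
    assert(rec->prefix_size == 5 && rec->message_size == 5);
    assert(strcmp(demo_prefix(rec), "test") == 0);
    assert(memcmp(demo_payload(rec), "hello", 5) == 0);
    free(rec);
}
```

The two accessors mirror the message->message and message->message + message->prefix_size expressions used in the quoted DecodeLogicalMsgOp().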

>
>> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>>   PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>>   PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>>   PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
>> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
>
> Did you consider doing this via the standby rmgr instead?
>

Yes in one of the first versions I did that but Simon didn't like that
in his review as this has nothing to do with standby.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote:
> On 22/03/16 12:47, Andres Freund wrote:
> >On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> >
> >>+
> >>+    <sect3 id="logicaldecoding-output-plugin-message">
> >>+     <title>Generic Message Callback</title>
> >>+
> >>+     <para>
> >>+      The optional <function>message_cb</function> callback is called whenever
> >>+      a logical decoding message has been decoded.
> >>+<programlisting>
> >>+typedef void (*LogicalDecodeMessageCB) (
> >>+    struct LogicalDecodingContext *,
> >>+    ReorderBufferTXN *txn,
> >>+    XLogRecPtr message_lsn,
> >>+    const char *prefix,
> >>+    Size message_size,
> >>+    const char *message
> >>+);
> >
> >I see you removed the transactional parameter. I'm doubtful that that's
> >a good idea: It seems like it'd be rather helpful to pass the
> >transaction for a nontransaction message that's emitted while an xid was
> >assigned?
> >
> 
> Hmm but won't that give the output plugin even transactions that were later
> aborted? That seems quite different behavior from how the txn parameter
> works everywhere else.

Seems harmless to me if called out.


> >
> >>+/*
> >>+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> >>+ */
> >>+static void
> >>+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> >>+{
> >>+    SnapBuild  *builder = ctx->snapshot_builder;
> >>+    XLogReaderState *r = buf->record;
> >>+    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> >>+    xl_logical_message *message;
> >>+
> >>+    if (info != XLOG_LOGICAL_MESSAGE)
> >>+        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> >>+
> >>+    message = (xl_logical_message *) XLogRecGetData(r);
> >>+
> >>+    if (message->transactional)
> >>+    {
> >>+        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> >>+            return;
> >>+
> >>+        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> >>+                                  buf->endptr,
> >>+                                  message->message, /* first part of message is prefix */
> >>+                                  message->message_size,
> >>+                                  message->message + message->prefix_size);
> >>+    }
> >>+    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> >>+             !SnapBuildXactNeedsSkip(builder, buf->origptr))
> >>+    {
> >>+        volatile Snapshot    snapshot_now;
> >>+        ReorderBuffer       *rb = ctx->reorder;
> >>+
> >>+        /* setup snapshot to allow catalog access */
> >>+        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> >>+        SetupHistoricSnapshot(snapshot_now, NULL);
> >>+        rb->message(rb, NULL, buf->origptr, message->message,
> >>+                    message->message_size,
> >>+                    message->message + message->prefix_size);
> >>+        TeardownHistoricSnapshot(false);
> >>+    }
> >>+}
> >
> >A number of things:
> >1) The SnapBuildProcessChange needs to be toplevel, not just for
> >    transactional messages - we can't yet necessarily build a snapshot.
> 
> Nope, the snapshot state is checked in the else if.
> 
> >2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
> 
> Well, it's not doing anything with reorderbuffer but sure it can be done
> (didn't do it in the attached though).

I think there'll be some interactions if we ever do some parts in
parallel and such.  I'd rather keep decode.c to only do the lowest level
stuff.

Greetings,

Andres Freund



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 22/03/16 14:11, Andres Freund wrote:
> On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote:
>> On 22/03/16 12:47, Andres Freund wrote:
>>> On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
>>>
>>>> +
>>>> +    <sect3 id="logicaldecoding-output-plugin-message">
>>>> +     <title>Generic Message Callback</title>
>>>> +
>>>> +     <para>
>>>> +      The optional <function>message_cb</function> callback is called whenever
>>>> +      a logical decoding message has been decoded.
>>>> +<programlisting>
>>>> +typedef void (*LogicalDecodeMessageCB) (
>>>> +    struct LogicalDecodingContext *,
>>>> +    ReorderBufferTXN *txn,
>>>> +    XLogRecPtr message_lsn,
>>>> +    const char *prefix,
>>>> +    Size message_size,
>>>> +    const char *message
>>>> +);
>>>
>>> I see you removed the transactional parameter. I'm doubtful that that's
>>> a good idea: It seems like it'd be rather helpful to pass the
>>> transaction for a nontransaction message that's emitted while an xid was
>>> assigned?
>>>
>>
>> Hmm but won't that give the output plugin even transactions that were later
>> aborted? That seems quite different behavior from how the txn parameter
>> works everywhere else.
>
> Seems harmless to me if called out.
>

All right, after some consideration I agree.

>
>>>
>>>> +/*
>>>> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
>>>> + */
>>>> +static void
>>>> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>>>> +{
>>>> +    SnapBuild  *builder = ctx->snapshot_builder;
>>>> +    XLogReaderState *r = buf->record;
>>>> +    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
>>>> +    xl_logical_message *message;
>>>> +
>>>> +    if (info != XLOG_LOGICAL_MESSAGE)
>>>> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
>>>> +
>>>> +    message = (xl_logical_message *) XLogRecGetData(r);
>>>> +
>>>> +    if (message->transactional)
>>>> +    {
>>>> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
>>>> +            return;
>>>> +
>>>> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
>>>> +                                  buf->endptr,
>>>> +                                  message->message, /* first part of message is prefix */
>>>> +                                  message->message_size,
>>>> +                                  message->message + message->prefix_size);
>>>> +    }
>>>> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
>>>> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
>>>> +    {
>>>> +        volatile Snapshot    snapshot_now;
>>>> +        ReorderBuffer       *rb = ctx->reorder;
>>>> +
>>>> +        /* setup snapshot to allow catalog access */
>>>> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
>>>> +        SetupHistoricSnapshot(snapshot_now, NULL);
>>>> +        rb->message(rb, NULL, buf->origptr, message->message,
>>>> +                    message->message_size,
>>>> +                    message->message + message->prefix_size);
>>>> +        TeardownHistoricSnapshot(false);
>>>> +    }
>>>> +}
>>>
>>> A number of things:
>>> 1) The SnapBuildProcessChange needs to be toplevel, not just for
>>>     transactional messages - we can't yet necessarily build a snapshot.
>>
>> Nope, the snapshot state is checked in the else if.
>>
>>> 2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
>>
>> Well, it's not doing anything with reorderbuffer but sure it can be done
>> (didn't do it in the attached though).
>
> I think there'll be some interactions if we ever do some parts in
> parallel and such.  I'd rather keep decode.c to only do the lowest level
> stuff.
>

Did it that way but I do like the resulting code less.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Alvaro Herrera
Date:
Petr Jelinek wrote:

> +++ b/contrib/test_decoding/sql/messages.sql
> @@ -0,0 +1,17 @@
> +-- predictability
> +SET synchronous_commit = on;
> +
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
> +
> +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
> +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
> +
> +BEGIN;
> +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
> +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
> +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
> +COMMIT;
> +
> +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
> +
> +SELECT 'init' FROM pg_drop_replication_slot('regression_slot');

No tests for a rolled back transaction?

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 23/03/16 14:17, Alvaro Herrera wrote:
> Petr Jelinek wrote:
>
>> +++ b/contrib/test_decoding/sql/messages.sql
>> @@ -0,0 +1,17 @@
>> +-- predictability
>> +SET synchronous_commit = on;
>> +
>> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
>> +
>> +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
>> +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
>> +
>> +BEGIN;
>> +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
>> +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
>> +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
>> +COMMIT;
>> +
>> +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
>> +
>> +SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
>
> No tests for a rolled back transaction?
>

Good point, probably worth testing especially since we have
non-transactional messages.
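
The behavior such a test would pin down can be pictured with a small
stand-alone model. This is a sketch only, not code from the patch: the
ToyDecoder type and the toy_* functions are hypothetical, and it assumes
the semantics stated at the start of the thread (transactional messages
are decoded at commit, non-transactional ones immediately).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_MSGS 16

/*
 * Hypothetical toy decoder state: what the output plugin has seen so far,
 * plus transactional messages still waiting for their commit.
 */
typedef struct ToyDecoder
{
    const char *delivered[TOY_MAX_MSGS];
    size_t      ndelivered;
    const char *queued[TOY_MAX_MSGS];
    size_t      nqueued;
} ToyDecoder;

/* Non-transactional messages go out immediately; transactional ones wait. */
static void
toy_emit(ToyDecoder *d, bool transactional, const char *msg)
{
    if (transactional)
    {
        assert(d->nqueued < TOY_MAX_MSGS);
        d->queued[d->nqueued++] = msg;
    }
    else
    {
        assert(d->ndelivered < TOY_MAX_MSGS);
        d->delivered[d->ndelivered++] = msg;
    }
}

/* Commit replays the queued messages in order; abort throws them away. */
static void
toy_finish_xact(ToyDecoder *d, bool committed)
{
    size_t      i;

    for (i = 0; committed && i < d->nqueued; i++)
    {
        assert(d->ndelivered < TOY_MAX_MSGS);
        d->delivered[d->ndelivered++] = d->queued[i];
    }
    d->nqueued = 0;
}
```

In this model, emitting one transactional and one non-transactional
message and then calling toy_finish_xact(&d, false) leaves only the
non-transactional message delivered, which is the case a rollback test
would need to cover.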

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
Hi,

I rebased this patch on top of current master as the generic WAL commit
added some conflicting changes. Also fixed a couple of typos in comments
and added a non-ASCII message to the test.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Simon Riggs
Date:
On 4 April 2016 at 05:23, Petr Jelinek <petr@2ndquadrant.com> wrote:
 
> I rebased this patch on top of current master as the generic WAL commit added some conflicting changes. Also fixed a couple of typos in comments and added a non-ASCII message to the test.

This looks good to me, so have marked it Ready For Committer.

I marked myself as Committer to show there was interest in this. If anyone else would like to commit it, I am happy for you to do so. 

--
Simon Riggs                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: Proposal: Generic WAL logical messages

From
Craig Ringer
Date:
Committed. https://commitfest.postgresql.org/9/468/


Interesting issue, mainly because the "ť" char it complains about (utf-8 0xc5 0xa5) is accepted in the SELECT that generates the record. If it's valid input it should be valid output, right? We didn't change the client_encoding in the meantime. It makes sense though:

initdb on that animal says:

The database cluster will be initialized with locale "English_United States.1252".
The default database encoding has accordingly been set to "WIN1252".


The regress script in question sets:

SET client_encoding = 'utf8';

but we're apparently round-tripping the data through the database encoding at some point, then converting back to client_encoding for output.

Presumably that's when we're forming the text 'data' column in the tuplestore produced by the get changes function, which will be in the database encoding.

So setting client_encoding is not sufficient to make this work and the non-7-bit-ascii part should be removed from the test, since it's not allowed on all machines.


In some ways it seems like the argument to pg_logical_emit_message(...) should be 'bytea'. That'd be much more convenient for application use. But then it's a pain when using it via the text-format SQL interface calls, where we've got no sensible way to output it.

--
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

Re: Proposal: Generic WAL logical messages

From
Tom Lane
Date:
Craig Ringer <craig@2ndquadrant.com> writes:
> Interesting issue. Mainly because the "ť" char it complains about
> (utf-8 0xc5 0xa5) is accepted in the SELECT that generates the record.

Uh, no, actually it's the SELECT that's failing.

> The regress script in question sets:
> SET client_encoding = 'utf8';
> but we're apparently round-tripping the data through the database encoding
> at some point, then converting back to client_encoding for output.

The conversion to DB encoding will happen the instant the query string
reaches the database.  You can set client_encoding to whatever you want,
but the only characters that can appear in queries are those that exist
in both the client encoding and the database encoding.
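
As a rough illustration of that point, the sketch below decodes the two
UTF-8 bytes mentioned above (0xc5 0xa5) and checks whether the resulting
code point has any Windows-1252 representation. It is a stand-alone toy,
not PostgreSQL's conversion code; the toy_* names are hypothetical, and
the table assumes the standard Windows-1252 layout, in which only the
0x80..0x9F range differs from Latin-1.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Decode one two-byte UTF-8 sequence (110xxxxx 10xxxxxx) to a code point. */
static uint32_t
toy_utf8_decode2(unsigned char b0, unsigned char b1)
{
    assert((b0 & 0xE0) == 0xC0 && (b1 & 0xC0) == 0x80);
    return ((uint32_t) (b0 & 0x1F) << 6) | (uint32_t) (b1 & 0x3F);
}

/* Code points that Windows-1252 places in its 0x80..0x9F range. */
static const uint32_t toy_cp1252_extras[] = {
    0x20AC, 0x201A, 0x0192, 0x201E, 0x2026, 0x2020, 0x2021, 0x02C6,
    0x2030, 0x0160, 0x2039, 0x0152, 0x017D, 0x2018, 0x2019, 0x201C,
    0x201D, 0x2022, 0x2013, 0x2014, 0x02DC, 0x2122, 0x0161, 0x203A,
    0x0153, 0x017E, 0x0178
};

/* True if the code point has a Windows-1252 representation. */
static bool
toy_in_win1252(uint32_t cp)
{
    size_t      i;
    size_t      n = sizeof(toy_cp1252_extras) / sizeof(toy_cp1252_extras[0]);

    /* ASCII and the upper Latin-1 range map one-to-one. */
    if (cp <= 0x7F || (cp >= 0xA0 && cp <= 0xFF))
        return true;
    for (i = 0; i < n; i++)
        if (toy_cp1252_extras[i] == cp)
            return true;
    return false;
}
```

Here toy_utf8_decode2(0xC5, 0xA5) yields U+0165, which is neither in the
Latin-1 range nor in the extras table, so the character cannot survive
conversion to a WIN1252 database no matter what client_encoding is set to.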

> In some ways it seems like the argument to pg_logical_emit_message(...) should
> be 'bytea'. That'd be much more convenient for application use. But then
> it's a pain when using it via the text-format SQL interface calls, where
> we've got no sensible way to output it.

Well, that's something worth thinking about.  I assume that
pg_logical_slot_get_changes could be executed in a database different from
the one where a change was originated?  What's going to happen if a string
in WAL contains characters unrepresentable in that database?  Do we even
have logic in there that will attempt to perform the necessary conversion?
And it is *necessary*, not optional, if you are going to claim that the
output of pg_logical_slot_get_changes is of type text.
        regards, tom lane



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> > In some ways it seems like the argument to pg_logical_emit_message(...) should
> > be 'bytea'. That'd be much more convenient for application use. But then
> > it's a pain when using it via the text-format SQL interface calls, where
> > we've got no sensible way to output it.

There's a bytea version.

> Well, that's something worth thinking about.  I assume that
> pg_logical_slot_get_changes could be executed in a database different from
> the one where a change was originated?

You can execute it, but you'll get an error:

    if (slot->data.database != MyDatabaseId)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 (errmsg("replication slot \"%s\" was not created in this database",
                         NameStr(slot->data.name)))));
 


Greetings,

Andres Freund



Re: Proposal: Generic WAL logical messages

From
Tom Lane
Date:
Andres Freund <andres@anarazel.de> writes:
> On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
>> Well, that's something worth thinking about.  I assume that
>> pg_logical_slot_get_changes could be executed in a database different from
>> the one where a change was originated?

> You can execute it, but you'll get an error:

Oh good.  I was afraid we had an unrecognized can o' worms here.
        regards, tom lane



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-04-06 16:20:29 +0200, Andres Freund wrote:
> On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> > > In some ways it seems like the argument to pg_logical_emit_message(...) should
> > > be 'bytea'. That'd be much more convenient for application use. But then
> > > it's a pain when using it via the text-format SQL interface calls, where
> > > we've got no sensible way to output it.
> 
> There's a bytea version.
> 
> > Well, that's something worth thinking about.  I assume that
> > pg_logical_slot_get_changes could be executed in a database different from
> > the one where a change was originated?
> 
> You can execute it, but you'll get an error:
>     if (slot->data.database != MyDatabaseId)
>         ereport(ERROR,
>                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>           (errmsg("replication slot \"%s\" was not created in this database",
>                   NameStr(slot->data.name)))));

Or so I thought. A look at the code shows a lack of database filtering
in DecodeLogicalMsgOp().  I think it also misses a FilterByOrigin()
check.
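
The two missing checks can be pictured with a small stand-alone model.
This is a sketch under stated assumptions, not the decode.c code: the
ToyMessage and ToySlot types and the toy_* functions are hypothetical
stand-ins, and it assumes the record carries the OID of the database it
was emitted in together with a replication origin id.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL's Oid and RepOriginId types. */
typedef uint32_t ToyOid;
typedef uint16_t ToyOriginId;

/* Toy view of a decoded logical message record (assumed fields). */
typedef struct ToyMessage
{
    ToyOid      db_id;          /* database the message was emitted in */
    ToyOriginId origin_id;      /* replication origin of the record */
    bool        transactional;
} ToyMessage;

/* Toy view of the slot doing the decoding. */
typedef struct ToySlot
{
    ToyOid      database;       /* database the slot was created in */
    /* returns true when records from this origin should be skipped */
    bool        (*filter_by_origin) (ToyOriginId origin_id);
} ToySlot;

/* Example origin filter: skip everything that did not originate locally. */
static bool
toy_skip_nonlocal(ToyOriginId origin_id)
{
    return origin_id != 0;
}

/* Decide whether a message should reach the output plugin at all. */
static bool
toy_should_decode(const ToySlot *slot, const ToyMessage *msg)
{
    assert(slot != NULL && msg != NULL);

    /* database filter: ignore messages emitted in another database */
    if (msg->db_id != slot->database)
        return false;

    /* origin filter: let the plugin-style callback veto the record */
    if (slot->filter_by_origin != NULL &&
        slot->filter_by_origin(msg->origin_id))
        return false;

    return true;
}
```

Both checks run before any transactional or non-transactional handling,
so a message from another database never reaches the output plugin.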

Greetings,

Andres Freund



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-04-06 10:24:52 -0400, Tom Lane wrote:
> Andres Freund <andres@anarazel.de> writes:
> > On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> >> Well, that's something worth thinking about.  I assume that
> >> pg_logical_slot_get_changes could be executed in a database different from
> >> the one where a change was originated?
> 
> > You can execute it, but you'll get an error:
> 
> Oh good.  I was afraid we had an unrecognized can o' worms here.

As posted nearby, there's a hole in that defense; for the messages
only. Pretty easy to solve though.

Allowing logical decoding from a different database would have a lot of
problems; primarily we couldn't actually look up any catalog state. But
even leaving that aside, it'd end up being pretty hard to interpret
database contents without knowledge about encoding.

Andres



Re: Proposal: Generic WAL logical messages

From
Simon Riggs
Date:
On 6 April 2016 at 15:29, Andres Freund <andres@anarazel.de> wrote:
> On 2016-04-06 10:24:52 -0400, Tom Lane wrote:
> > Andres Freund <andres@anarazel.de> writes:
> > > On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> > >> Well, that's something worth thinking about.  I assume that
> > >> pg_logical_slot_get_changes could be executed in a database different from
> > >> the one where a change was originated?
> >
> > > You can execute it, but you'll get an error:
> >
> > Oh good.  I was afraid we had an unrecognized can o' worms here.
>
> As posted nearby, there's a hole in that defense; for the messages
> only. Pretty easy to solve though.

My instinct was to put in a test for non-ascii text; even if we can't keep that test, it has highlighted a hole we wouldn't have spotted for a while, so I'll call that "good catch" then.

Perhaps easy to solve, but how do we test it is solved?
 
--
Simon Riggs                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
> Perhaps easy to solve, but how do we test it is solved?

Maybe something like

-- drain
pg_logical_slot_get_changes(...);
-- generate message in different database, to ensure it's not processed
-- in this database
\c template1
SELECT pg_logical_emit_message(...);
\c postgres
-- check
pg_logical_slot_get_changes(..);

It's a bit ugly to hardcode database names :/

Andres



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 06/04/16 17:55, Andres Freund wrote:
> On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
>> Perhaps easy to solve, but how do we test it is solved?
>
> Maybe something like
>
> -- drain
> pg_logical_slot_get_changes(...);
> -- generate message in different database, to ensure it's not processed
> -- in this database
> \c template1
> SELECT pg_logical_emit_message(...);
> \c postgres
> -- check
> pg_logical_slot_get_changes(..);
>
> It's a bit ugly to hardcode database names :/
>

Attached patch adds filtering of both database and origin. Added tests
with slightly less hardcoding than what you proposed above.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Michael Paquier
Date:
On Thu, Apr 7, 2016 at 12:55 AM, Andres Freund <andres@anarazel.de> wrote:
> On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
>> Perhaps easy to solve, but how do we test it is solved?
>
> Maybe something like
>
> -- drain
> pg_logical_slot_get_changes(...);
> -- generate message in different database, to ensure it's not processed
> -- in this database
> \c template1
> SELECT pg_logical_emit_message(...);
> \c postgres
> -- check
> pg_logical_slot_get_changes(..);
>
> It's a bit ugly to hardcode database names :/

When running installcheck, there is no way to be sure that databases
template1 and/or postgres exist on a server, so this test would fail
because of that.
-- 
Michael



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:

On April 7, 2016 2:26:41 AM GMT+02:00, Michael Paquier <michael.paquier@gmail.com> wrote:
>On Thu, Apr 7, 2016 at 12:55 AM, Andres Freund <andres@anarazel.de>
>wrote:
>> On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
>>> Perhaps easy to solve, but how do we test it is solved?
>>
>> Maybe something like
>>
>> -- drain
>> pg_logical_slot_get_changes(...);
>> -- generate message in different database, to ensure it's not
>processed
>> -- in this database
>> \c template1
>> SELECT pg_logical_emit_message(...);
>> \c postgres
>> -- check
>> pg_logical_slot_get_changes(..);
>>
>> It's a bit ugly to hardcode database names :/
>
>When running installcheck, there is no way to be sure that databases
>template1 and/or postgres exist on a server, so this test would fail
>because of that.

No need to hardcode postgres, see Petr's reply. I'm not concerned about
template1 not being there - if you tinkered with things at that level
it's unlikely that tests will succeed.  Also, remember, this is in a test
cluster created by the regression script, and there's no installcheck
support anyway (because of the required settings for logical decoding).
 
-- 
Sent from my Android device with K-9 Mail. Please excuse my brevity.



Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
Hi,

On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:
> Attached patch adds filtering of both database and origin. Added tests with
> slightly less hardcoding than what you proposed above.

Not a fan of creating & dropping another database - that's really rather
expensive. And we're in a target that doesn't support installcheck, so
relying on template1's existence seems fine...


> diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
> index 70130e9..a5b13c5 100644
> --- a/contrib/test_decoding/expected/messages.out
> +++ b/contrib/test_decoding/expected/messages.out
> @@ -1,6 +1,5 @@
>  -- predictability
>  SET synchronous_commit = on;
> -SET client_encoding = 'utf8';

I guess that's just from the previous test with czech text?

Greetings,

Andres Freund



Re: Proposal: Generic WAL logical messages

From
Petr Jelinek
Date:
On 07/04/16 12:26, Andres Freund wrote:
> Hi,
>
> On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:
>> Attached patch adds filtering of both database and origin. Added tests with
>> slightly less hardcoding than what you proposed above.
>
> Not a fan of creating & dropping another database - that's really rather
> expensive. And we're in a target that doesn't support installcheck, so
> relying on template1's existence seems fine...
>

Makes sense, changed that bit.

>
>> diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
>> index 70130e9..a5b13c5 100644
>> --- a/contrib/test_decoding/expected/messages.out
>> +++ b/contrib/test_decoding/expected/messages.out
>> @@ -1,6 +1,5 @@
>>   -- predictability
>>   SET synchronous_commit = on;
>> -SET client_encoding = 'utf8';
>
> I guess that's just from the previous test with czech text?
>

Yeah, it's cleanup after d25379eb23383f1d2f969e65e0332b47c19aea94.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services

Attachments

Re: Proposal: Generic WAL logical messages

From
Andres Freund
Date:
On 2016-04-07 19:53:56 +0200, Petr Jelinek wrote:
> On 07/04/16 12:26, Andres Freund wrote:
> >Hi,
> >
> >On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:
> >>Attached patch adds filtering of both database and origin. Added tests with
> >>slightly less hardcoding than what you proposed above.
> >
> >Not a fan of creating & dropping another database - that's really rather
> >expensive. And we're in a target that doesn't support installcheck, so
> >relying on template1's existence seems fine...
> >
> 
> Makes sense, changed that bit.

I've pushed this.  I also noticed that both this patch (that's ok,
committers commonly do that, but a reminder is nice) and the original
commit omitted to bump XLOG_PAGE_MAGIC.  The original commit also
omitted bumping catversion, btw.

Thanks for the patch.

Andres