Re: Proposal: Generic WAL logical messages

From: Petr Jelinek
Subject: Re: Proposal: Generic WAL logical messages
Message-ID: 56F1428A.7080506@2ndquadrant.com
In reply to: Re: Proposal: Generic WAL logical messages (Andres Freund <andres@anarazel.de>)
Replies: Re: Proposal: Generic WAL logical messages (Andres Freund <andres@anarazel.de>)
List: pgsql-hackers

On 22/03/16 12:47, Andres Freund wrote:
> On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
>
>> +
>> +    <sect3 id="logicaldecoding-output-plugin-message">
>> +     <title>Generic Message Callback</title>
>> +
>> +     <para>
>> +      The optional <function>message_cb</function> callback is called whenever
>> +      a logical decoding message has been decoded.
>> +<programlisting>
>> +typedef void (*LogicalDecodeMessageCB) (
>> +    struct LogicalDecodingContext *,
>> +    ReorderBufferTXN *txn,
>> +    XLogRecPtr message_lsn,
>> +    const char *prefix,
>> +    Size message_size,
>> +    const char *message
>> +);
>
> I see you removed the transactional parameter. I'm doubtful that that's
> a good idea: It seems like it'd be rather helpful to pass the
> transaction for a nontransaction message that's emitted while an xid was
> assigned?
>

Hmm, but won't that give the output plugin even transactions that were
later aborted? That seems like quite different behavior from how the txn
parameter works everywhere else.
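To make the concern concrete, here is a toy model in plain C. It is not PostgreSQL code, and every name in it (ToyTxn, ToyPlugin, toy_decode_message and so on) is hypothetical. Transactional messages are queued per transaction and reach the plugin only on commit, while non-transactional messages are delivered as soon as they are decoded, before it is known whether the surrounding transaction will commit. Passing the txn along with the latter would hand the plugin a transaction that may still abort.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_MSGS 8

/* Hypothetical toy types; not PostgreSQL's ReorderBuffer structures. */
typedef struct ToyTxn
{
    unsigned    xid;
    const char *queued[TOY_MAX_MSGS];  /* transactional messages awaiting commit */
    int         nqueued;
} ToyTxn;

typedef struct ToyPlugin
{
    const char *msgs[TOY_MAX_MSGS];    /* what the output plugin has seen */
    unsigned    xids[TOY_MAX_MSGS];    /* 0 means "delivered without a txn" */
    int         nmsgs;
} ToyPlugin;

static void
toy_deliver(ToyPlugin *p, unsigned xid, const char *msg)
{
    assert(p->nmsgs < TOY_MAX_MSGS);   /* toy capacity limit */
    p->msgs[p->nmsgs] = msg;
    p->xids[p->nmsgs] = xid;
    p->nmsgs++;
}

/* Called when a message record is decoded. */
static void
toy_decode_message(ToyPlugin *p, ToyTxn *txn, bool transactional,
                   const char *msg)
{
    if (transactional)
    {
        assert(txn->nqueued < TOY_MAX_MSGS);
        txn->queued[txn->nqueued++] = msg;  /* hold until commit or abort */
    }
    else
        toy_deliver(p, 0, msg);  /* right away: the txn's fate is still unknown */
}

/* On commit, replay the queued transactional messages with their xid. */
static void
toy_commit(ToyPlugin *p, ToyTxn *txn)
{
    for (int i = 0; i < txn->nqueued; i++)
        toy_deliver(p, txn->xid, txn->queued[i]);
    txn->nqueued = 0;
}

/* On abort, queued transactional messages are simply discarded. */
static void
toy_abort(ToyTxn *txn)
{
    txn->nqueued = 0;
}
```

In this model the non-transactional message is visible to the plugin even though its transaction later aborts; only the transactional path is tied to commit.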

>
>> +/*
>> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
>> + */
>> +static void
>> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
>> +    SnapBuild  *builder = ctx->snapshot_builder;
>> +    XLogReaderState *r = buf->record;
>> +    uint8        info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
>> +    xl_logical_message *message;
>> +
>> +    if (info != XLOG_LOGICAL_MESSAGE)
>> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
>> +
>> +    message = (xl_logical_message *) XLogRecGetData(r);
>> +
>> +    if (message->transactional)
>> +    {
>> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
>> +            return;
>> +
>> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
>> +                                  buf->endptr,
>> +                                  message->message, /* first part of message is prefix */
>> +                                  message->message_size,
>> +                                  message->message + message->prefix_size);
>> +    }
>> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
>> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
>> +    {
>> +        volatile Snapshot    snapshot_now;
>> +        ReorderBuffer       *rb = ctx->reorder;
>> +
>> +        /* setup snapshot to allow catalog access */
>> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
>> +        SetupHistoricSnapshot(snapshot_now, NULL);
>> +        rb->message(rb, NULL, buf->origptr, message->message,
>> +                    message->message_size,
>> +                    message->message + message->prefix_size);
>> +        TeardownHistoricSnapshot(false);
>> +    }
>> +}
>
> A number of things:
> 1) The SnapBuildProcessChange needs to be toplevel, not just for
>     transactional messages - we can't yet necessarily build a snapshot.

Nope, the snapshot state is checked in the else-if branch.

> 2) I'm inclined to move even the non-transactional stuff to reorderbuffer.

Well, it's not doing anything with the reorderbuffer, but sure, it can be
done (I didn't do it in the attached patch, though).

> 3) This lacks error handling, we surely don't want to error out while
>     still having the historic snapshot setup
> 4) Without 3) the volatile is bogus.
> 5) Misses a ReorderBufferProcessXid() call.

Fixed (all 3 above).
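Items 3 and 4 come down to the usual setjmp()/longjmp() rules that PostgreSQL's PG_TRY()/PG_CATCH() macros are built on: the historic snapshot has to be torn down on the error path too, and a local variable that is modified after setjmp() and read after longjmp() must be volatile, otherwise its value is indeterminate. The sketch below shows only that pattern with plain setjmp/longjmp; setup_snapshot(), teardown_snapshot(), raise_error() and call_with_snapshot() are hypothetical stand-ins, not the code in the attached patch.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; not PostgreSQL's snapshot or elog API. */
static bool     historic_snapshot_active = false;
static jmp_buf *error_handler = NULL;   /* innermost "try" block */

static void setup_snapshot(void)    { historic_snapshot_active = true; }
static void teardown_snapshot(void) { historic_snapshot_active = false; }

/* Stand-in for elog(ERROR): unwind to the innermost handler. */
static void
raise_error(void)
{
    assert(error_handler != NULL);
    longjmp(*error_handler, 1);
}

/* Example callbacks: one returns normally, one errors out. */
static void ok_cb(void)      { }
static void failing_cb(void) { raise_error(); }

/*
 * Run cb with the snapshot installed and always tear it down again.
 * Returns false if cb raised an error; real code would re-throw
 * (PG_RE_THROW()) after cleaning up instead of swallowing the error.
 */
static bool
call_with_snapshot(void (*cb) (void), int *stage_out)
{
    jmp_buf         local;
    jmp_buf        *saved = error_handler;  /* not modified after setjmp() */
    volatile int    stage = 0;  /* written after setjmp() and read after
                                 * longjmp(), so it must be volatile */
    bool            ok;

    setup_snapshot();
    if (setjmp(local) == 0)
    {
        error_handler = &local;
        stage = 1;              /* about to call the callback */
        cb();
        stage = 2;              /* callback returned normally */
        ok = true;
    }
    else
        ok = false;             /* reached via longjmp() from cb */

    error_handler = saved;      /* pop our handler on both paths */
    teardown_snapshot();        /* never leave the snapshot installed */
    *stage_out = stage;
    return ok;
}
```

Without the error handling there is no setjmp() at all, which is why the volatile qualifier on its own does nothing useful (item 4).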

>
>
>> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>>                   change->data.tp.oldtuple = NULL;
>>               }
>>               break;
>> +        case REORDER_BUFFER_CHANGE_MESSAGE:
>> +            if (change->data.msg.prefix != NULL)
>> +                pfree(change->data.msg.prefix);
>> +            change->data.msg.prefix = NULL;
>> +            if (change->data.msg.message != NULL)
>> +                pfree(change->data.msg.message);
>> +            change->data.msg.message = NULL;
>> +            break;
>
> Hm, this will have some overhead, but I guess the messages won't be
> super frequent, and usually not very large.

Yeah, but since we don't really know the size of future messages, it's
hard to have a preallocated buffer for this, so I don't know how else to do it.

>
>> +/*
>> + * Queue message into a transaction so it can be processed upon commit.
>> + */
>> +void
>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>> +                          const char *prefix, Size msg_sz, const char *msg)
>> +{
>> +    ReorderBufferChange *change;
>> +
>> +    Assert(xid != InvalidTransactionId);
>> +
>> +    change = ReorderBufferGetChange(rb);
>> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>> +    change->data.msg.prefix = pstrdup(prefix);
>> +    change->data.msg.message_size = msg_sz;
>> +    change->data.msg.message = palloc(msg_sz);
>> +    memcpy(change->data.msg.message, msg, msg_sz);
>> +
>> +    ReorderBufferQueueChange(rb, xid, lsn, change);
>> +}
>
> I'm not sure right now if there's any guarantee that the current memory
> context is meaningful here? IIRC other long-lived allocations explicitly
> use a context?
>

I didn't find any explicit guarantee, so I added one.
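The pattern in question is the one PostgreSQL expresses with MemoryContextSwitchTo(): switch to a context whose lifetime you control, allocate, then switch back to the caller's context. The toy below models it with a hypothetical bump allocator (ToyContext, toy_switch_to, toy_alloc, toy_queue_message are all illustrative names); it only shows why the copy must not land in whatever context happens to be current, not which context the patch actually uses.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical bump allocator standing in for a memory context. */
typedef struct ToyContext
{
    char    buf[256];
    size_t  used;
} ToyContext;

static ToyContext *current_context = NULL;

/* Make ctx current and return the previously current context. */
static ToyContext *
toy_switch_to(ToyContext *ctx)
{
    ToyContext *old = current_context;

    current_context = ctx;
    return old;
}

/* Allocate from whatever context is current. */
static void *
toy_alloc(size_t sz)
{
    ToyContext *ctx = current_context;
    void       *p;

    assert(ctx != NULL && ctx->used + sz <= sizeof(ctx->buf));
    p = ctx->buf + ctx->used;
    ctx->used += sz;
    return p;
}

/* Resetting a context frees everything allocated in it. */
static void
toy_reset(ToyContext *ctx)
{
    ctx->used = 0;
}

/* Copy a message into long_lived, regardless of the caller's context. */
static char *
toy_queue_message(ToyContext *long_lived, const char *msg, size_t msg_sz)
{
    ToyContext *old = toy_switch_to(long_lived);
    char       *copy = toy_alloc(msg_sz);

    memcpy(copy, msg, msg_sz);
    toy_switch_to(old);         /* restore the caller's context */
    return copy;
}
```

If the copy had been made in a short-lived per-record context, resetting that context would leave the queued change pointing at freed memory.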

>> +        case REORDER_BUFFER_CHANGE_MESSAGE:
>> +            {
>> +                char       *data;
>> +                size_t        prefix_size = strlen(change->data.msg.prefix) + 1;
>> +
>> +                sz += prefix_size + change->data.msg.message_size;
>> +                ReorderBufferSerializeReserve(rb, sz);
>> +
>> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
>> +                memcpy(data, change->data.msg.prefix,
>> +                       prefix_size);
>> +                memcpy(data + prefix_size, change->data.msg.message,
>> +                       change->data.msg.message_size);
>> +                break;
>> +            }
>
> Can you please include the sizes of the blocks explicitly, rather than
> relying on 0 termination?
>

Okay, I see I did that in the WAL record; no idea why I didn't do the same here.
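A minimal sketch of what "explicit sizes" can look like, assuming a layout of [prefix_size][message_size][prefix bytes][message bytes]. The layout and the helper names serialize_message() and deserialize_message() are assumptions for illustration, not necessarily what the revised patch writes. The reader never calls strlen(), so a message payload containing NUL bytes round-trips intact.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef size_t Size;            /* matches PostgreSQL's typedef */

/*
 * Hypothetical layout:
 *   [Size prefix_size][Size message_size][prefix bytes][message bytes]
 * prefix_size includes the prefix's terminating NUL, so the reader can use
 * the prefix as a C string in place.  Returns bytes written, 0 if too small.
 */
static size_t
serialize_message(char *out, size_t out_cap, const char *prefix,
                  const char *message, Size message_size)
{
    Size    prefix_size;
    size_t  total;
    char   *p = out;

    assert(prefix != NULL && message != NULL);
    prefix_size = strlen(prefix) + 1;
    total = 2 * sizeof(Size) + prefix_size + message_size;
    if (total > out_cap)
        return 0;

    memcpy(p, &prefix_size, sizeof(Size));
    p += sizeof(Size);
    memcpy(p, &message_size, sizeof(Size));
    p += sizeof(Size);
    memcpy(p, prefix, prefix_size);
    p += prefix_size;
    memcpy(p, message, message_size);
    return total;
}

/* Read the sizes first, then point into the buffer using them. */
static void
deserialize_message(const char *in, const char **prefix,
                    const char **message, Size *message_size)
{
    Size        prefix_size;
    const char *p = in;

    memcpy(&prefix_size, p, sizeof(Size));
    p += sizeof(Size);
    memcpy(message_size, p, sizeof(Size));
    p += sizeof(Size);
    *prefix = p;
    p += prefix_size;
    *message = p;
}
```

The sizes are copied with memcpy() rather than read through a cast pointer, so the buffer does not need to be aligned for Size.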

>
>> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>>   PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>>   PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>>   PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
>> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
>
> Did you consider doing this via the standby rmgr instead?
>

Yes, in one of the first versions I did that, but Simon didn't like it
in his review, as this has nothing to do with standby.

--
   Petr Jelinek                  http://www.2ndQuadrant.com/
   PostgreSQL Development, 24x7 Support, Training & Services
