Re: [HACKERS] logical decoding of two-phase transactions

From: Nikhil Sontakke
Subject: Re: [HACKERS] logical decoding of two-phase transactions
Msg-id: CAMGcDxc9p7envO8t+29j=NdQXHoba6P1btfU0d3Xeiz7zT-Mvw@mail.gmail.com
In reply to: Re: [HACKERS] logical decoding of two-phase transactions  (Andres Freund <andres@anarazel.de>)
Responses: Re: [HACKERS] logical decoding of two-phase transactions  (Andres Freund <andres@anarazel.de>)
List: pgsql-hackers
Hi Andres,

> First off: This patch has way too many different types of changes as
> part of one huge commit. This needs to be split into several
> pieces. First the cleanups (e.g. the fields -> flag changes), then the
> individual infrastructure pieces (like the twophase.c changes, best
> split into several pieces as well, the locking stuff), then the main
> feature, then support for it in the output plugin.  Each should have an
> individual explanation about why the change is necessary and not a bad
> idea.
>

Ok, I will break this patch into multiple logical pieces and re-submit.

>
> On 2018-02-06 17:50:40 +0530, Nikhil Sontakke wrote:
>> @@ -46,6 +48,9 @@ typedef struct
>>       bool            skip_empty_xacts;
>>       bool            xact_wrote_changes;
>>       bool            only_local;
>> +     bool            twophase_decoding;
>> +     bool            twophase_decode_with_catalog_changes;
>> +     int                     decode_delay; /* seconds to sleep after every change record */
>
> This seems too big a crock to add just for testing. It'll also make the
> testing timing dependent...
>

The idea *was* to make the testing timing dependent. We want to
simulate the case where a rollback is issued by another backend while
decoding is still in progress, and the delay gives us a reliable
window in which to exercise exactly that scenario.


>>  } TestDecodingData;
>
>>  void
>>  _PG_init(void)
>> @@ -85,9 +106,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>>       cb->begin_cb = pg_decode_begin_txn;
>>       cb->change_cb = pg_decode_change;
>>       cb->commit_cb = pg_decode_commit_txn;
>> +     cb->abort_cb = pg_decode_abort_txn;
>
>>       cb->filter_by_origin_cb = pg_decode_filter;
>>       cb->shutdown_cb = pg_decode_shutdown;
>>       cb->message_cb = pg_decode_message;
>> +     cb->filter_prepare_cb = pg_filter_prepare;
>> +     cb->filter_decode_txn_cb = pg_filter_decode_txn;
>> +     cb->prepare_cb = pg_decode_prepare_txn;
>> +     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
>> +     cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
>>  }
>
> Why does this introduce both abort_cb and abort_prepared_cb? That seems
> to conflate two separate features.
>

Consider the case where we have a bunch of change records to apply for
a transaction. We send a "BEGIN" and then start decoding the change
records one by one. If a rollback is encountered while we are still
decoding, it makes no sense to keep decoding and sending the remaining
change records, so we immediately send a regular ABORT. We cannot send
"ROLLBACK PREPARED" here because the transaction was never prepared on
the subscriber. The "ROLLBACK PREPARED" callback, on the other hand, is
needed for the case where a prepared transaction gets rolled back and
that rollback is encountered during normal WAL processing.

Please take a look at "contrib/test_decoding/t/001_twophase.pl" where
this test case is exercised.
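
To make the distinction concrete, here is a minimal sketch of what the
two callbacks emit on the test_decoding side (simplified; the exact
signatures in the patch may differ slightly):

static void
pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                    XLogRecPtr abort_lsn)
{
    /* decoding was cut short by a concurrent rollback: plain ABORT */
    OutputPluginPrepareWrite(ctx, true);
    appendStringInfoString(ctx->out, "ABORT");
    OutputPluginWrite(ctx, true);
}

static void
pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                             XLogRecPtr abort_lsn, const char *gid)
{
    /* the transaction was already prepared downstream: ROLLBACK PREPARED */
    OutputPluginPrepareWrite(ctx, true);
    appendStringInfo(ctx->out, "ROLLBACK PREPARED '%s'", gid);
    OutputPluginWrite(ctx, true);
}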

>
>> +/* Filter out unnecessary two-phase transactions */
>> +static bool
>> +pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>> +                                     TransactionId xid, const char *gid)
>> +{
>> +     TestDecodingData *data = ctx->output_plugin_private;
>> +
>> +     /* treat all transactions as one-phase */
>> +     if (!data->twophase_decoding)
>> +             return true;
>> +
>> +     if (txn && txn_has_catalog_changes(txn) &&
>> +                     !data->twophase_decode_with_catalog_changes)
>> +             return true;
>
> What? I'm INCREDIBLY doubtful this is a sane thing to expose to output
> plugins. As in, unless I hear a very very convincing reason I'm strongly
> opposed.
>

These booleans are specific to the test_decoding plugin.

They are useful for testing decoding in various scenarios: with
two-phase decoding enabled or disabled, with catalog changes allowed or
disallowed, and so on. Please take a look at
"contrib/test_decoding/sql/prepared.sql" for the various scenarios.


>
>> +/*
>> + * Check if we should continue to decode this transaction.
>> + *
>> + * If it has aborted in the meanwhile, then there's no sense
>> + * in decoding and sending the rest of the changes, we might
>> + * as well ask the subscribers to abort immediately.
>> + *
>> + * This should be called if we are streaming a transaction
>> + * before it's committed or if we are decoding a 2PC
>> + * transaction. Otherwise we always decode committed
>> + * transactions
>> + *
>> + * Additional checks can be added here, as needed
>> + */
>> +static bool
>> +pg_filter_decode_txn(LogicalDecodingContext *ctx,
>> +                                        ReorderBufferTXN *txn)
>> +{
>> +     /*
>> +      * Due to caching, repeated TransactionIdDidAbort calls
>> +      * shouldn't be that expensive
>> +      */
>> +     if (txn != NULL &&
>> +                     TransactionIdIsValid(txn->xid) &&
>> +                     TransactionIdDidAbort(txn->xid))
>> +                     return true;
>> +
>> +     /* if txn is NULL, filter it out */
>
> Why can this be NULL?
>

Depending on parameters passed to the ReorderBufferTXNByXid()
function, the txn might be NULL in some cases, especially during
restarts.
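
Roughly, the calling pattern is something like this (not the exact code
from the patch, just to illustrate how a NULL txn arises):

/* look the transaction up without creating it if it isn't known yet */
txn = ReorderBufferTXNByXid(rb, xid, false /* create */ , NULL,
                            InvalidXLogRecPtr, false /* create_as_top */ );

/*
 * After a restart the xid may never have been assembled in this
 * reorder buffer, so txn can legitimately be NULL here and the filter
 * callback has to cope with that.
 */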

>> +     return (txn != NULL)? false:true;
>> +}
>
>
> This definitely shouldn't be a task for each output plugin. Even if we
> want to make this configurable, I'm doubtful that it's a good idea to do
> so here - make its much less likely to hit edge cases.
>

Agreed, I will try to add it to the core logical decoding handling.

>
>
>>  static bool
>>  pg_decode_filter(LogicalDecodingContext *ctx,
>>                                RepOriginId origin_id)
>> @@ -409,8 +622,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>>       }
>>       data->xact_wrote_changes = true;
>>
>> +     if (!LogicalLockTransaction(txn))
>> +             return;
>
> It really really can't be right that this is exposed to output plugins.
>

This was discussed in the other thread
(http://www.postgresql-archive.org/Logical-Decoding-and-HeapTupleSatisfiesVacuum-assumptions-td5998294i20.html).
Any catalog access in a plugin needs to interlock with concurrent
aborts. This is only a problem if the transaction is a prepared one or
a yet-uncommitted one; in the vast majority of cases this function will
do nothing at all.
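
As a rough sketch, the pattern a plugin is expected to follow around
any catalog access is the following (LogicalUnlockTransaction() being
the counterpart the patch provides):

if (!LogicalLockTransaction(txn))
    return;             /* txn was rolled back concurrently, stop decoding */

/* ... syscache/catalog lookups needed to decode this change ... */

LogicalUnlockTransaction(txn);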


>
>> +     /* if decode_delay is specified, sleep with above lock held */
>> +     if (data->decode_delay > 0)
>> +     {
>> +             elog(LOG, "sleeping for %d seconds", data->decode_delay);
>> +             pg_usleep(data->decode_delay * 1000000L);
>> +     }
>
> Really not on board.
>

Again, this is specific to the test_decoding plugin. We want to test
the interlocking code for concurrent abort handling, which has to wait
for plugins that are in the locked state before allowing the rollback
to go ahead. Please take a look at "contrib/test_decoding/t/001_twophase.pl"
and the "Waiting for backends to abort" string.

>
>
>
>> @@ -1075,6 +1077,21 @@ EndPrepare(GlobalTransaction gxact)
>>       Assert(hdr->magic == TWOPHASE_MAGIC);
>>       hdr->total_len = records.total_len + sizeof(pg_crc32c);
>>
>> +     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
>> +                               replorigin_session_origin != DoNotReplicateId);
>> +
>> +     if (replorigin)
>> +     {
>> +             Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
>> +             hdr->origin_lsn = replorigin_session_origin_lsn;
>> +             hdr->origin_timestamp = replorigin_session_origin_timestamp;
>> +     }
>> +     else
>> +     {
>> +             hdr->origin_lsn = InvalidXLogRecPtr;
>> +             hdr->origin_timestamp = 0;
>> +     }
>> +
>>       /*
>>        * If the data size exceeds MaxAllocSize, we won't be able to read it in
>>        * ReadTwoPhaseFile. Check for that now, rather than fail in the case
>> @@ -1107,7 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
>>       XLogBeginInsert();
>>       for (record = records.head; record != NULL; record = record->next)
>>               XLogRegisterData(record->data, record->len);
>> +
>> +     XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
>> +
>
> Can we perhaps merge a bit of the code with the plain commit path on
> this?
>

Given that the PREPARE and ROLLBACK PREPARED handling is totally
separate from the regular commit code path, wouldn't that be a little
difficult?

>
>>       gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
>> +
>> +     if (replorigin)
>> +             /* Move LSNs forward for this replication origin */
>> +             replorigin_session_advance(replorigin_session_origin_lsn,
>> +                                                                gxact->prepare_end_lsn);
>> +
>
> Why is it ok to do this at PREPARE time? I guess the theory is that the
> origin LSN is going to be from the sources PREPARE too? If so, this
> needs to be commented upon here.
>

Ok, will add a comment.

>
>> +/*
>> + * ParsePrepareRecord
>> + */
>> +void
>> +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
>> +{
>> +     TwoPhaseFileHeader *hdr;
>> +     char *bufptr;
>> +
>> +     hdr = (TwoPhaseFileHeader *) xlrec;
>> +     bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
>> +
>> +     parsed->origin_lsn = hdr->origin_lsn;
>> +     parsed->origin_timestamp = hdr->origin_timestamp;
>> +     parsed->twophase_xid = hdr->xid;
>> +     parsed->dbId = hdr->database;
>> +     parsed->nsubxacts = hdr->nsubxacts;
>> +     parsed->ncommitrels = hdr->ncommitrels;
>> +     parsed->nabortrels = hdr->nabortrels;
>> +     parsed->nmsgs = hdr->ninvalmsgs;
>> +
>> +     strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
>> +     bufptr += MAXALIGN(hdr->gidlen);
>> +
>> +     parsed->subxacts = (TransactionId *) bufptr;
>> +     bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
>> +
>> +     parsed->commitrels = (RelFileNode *) bufptr;
>> +     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
>> +
>> +     parsed->abortrels = (RelFileNode *) bufptr;
>> +     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
>> +
>> +     parsed->msgs = (SharedInvalidationMessage *) bufptr;
>> +     bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
>> +}
>
> So this is now basically a commit record. I quite dislike duplicating
> things this way. Can't we make commit records versatile enough to
> represent this without problems?
>

Maybe we can. We have already re-used existing records for
XLOG_XACT_COMMIT_PREPARED and XLOG_XACT_ABORT_PREPARED. We can add a
flag to existing commit records to indicate that it's a PREPARE and
not a COMMIT.
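
Something along these lines, purely as an illustration
(XACT_XINFO_HAS_PREPARE does not exist today and the bit value is
picked arbitrarily):

/* hypothetical extra xinfo bit marking a commit-format record as a PREPARE */
#define XACT_XINFO_HAS_PREPARE      (1U << 15)

/* while building the record at PREPARE time */
xl_xinfo.xinfo |= XACT_XINFO_HAS_PREPARE;

/* while decoding */
if (parsed.xinfo & XACT_XINFO_HAS_PREPARE)
{
    /* treat this record as a PREPARE, not a COMMIT */
}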

>
>>  /*
>>   * Reads 2PC data from xlog. During checkpoint this data will be moved to
>> @@ -1365,7 +1428,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
>>   * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
>>   */
>>  void
>> -FinishPreparedTransaction(const char *gid, bool isCommit)
>> +FinishPreparedTransaction(const char *gid, bool isCommit, bool missing_ok)
>>  {
>>       GlobalTransaction gxact;
>>       PGPROC     *proc;
>> @@ -1386,8 +1449,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
>>       /*
>>        * Validate the GID, and lock the GXACT to ensure that two backends do not
>>        * try to commit the same GID at once.
>> +      *
>> +      * During logical decoding, on the apply side, it's possible that a prepared
>> +      * transaction got aborted while decoding. In that case, we stop the
>> +      * decoding and abort the transaction immediately. However the ROLLBACK
>> +      * prepared processing still reaches the subscriber. In that case it's ok
>> +      * to have a missing gid
>>        */
>> -     gxact = LockGXact(gid, GetUserId());
>> +     gxact = LockGXact(gid, GetUserId(), missing_ok);
>> +     if (gxact == NULL)
>> +     {
>> +             Assert(missing_ok && !isCommit);
>> +             return;
>> +     }
>
> I'm very doubtful it is sane to handle this at such a low level.
>

FinishPreparedTransaction() is called directly from ProcessUtility. If
not here, where else could we do this?

>
>> @@ -2358,6 +2443,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
>>       Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
>>       TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
>>
>> +     if (origin_id != InvalidRepOriginId)
>> +     {
>> +             /* recover apply progress */
>> +             replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
>> +                                                false /* backward */ , false /* WAL */ );
>> +     }
>> +
>
> It's unclear to me why this is necessary / a good idea?
>

Keeping PREPARE handling as close to regular COMMIT handling seems
like a good idea, no?

>
>
>>               case XLOG_XACT_PREPARE:
>> +                     {
>> +                             xl_xact_parsed_prepare parsed;
>>
>> -                     /*
>> -                      * Currently decoding ignores PREPARE TRANSACTION and will just
>> -                      * decode the transaction when the COMMIT PREPARED is sent or
>> -                      * throw away the transaction's contents when a ROLLBACK PREPARED
>> -                      * is received. In the future we could add code to expose prepared
>> -                      * transactions in the changestream allowing for a kind of
>> -                      * distributed 2PC.
>> -                      */
>> -                     ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
>> +                             /* check that output plugin is capable of twophase decoding */
>> +                             if (!ctx->enable_twophase)
>> +                             {
>> +                                     ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
>> +                                     break;
>> +                             }
>> +
>> +                             /* ok, parse it */
>> +                             ParsePrepareRecord(XLogRecGetInfo(buf->record),
>> +                                                                XLogRecGetData(buf->record), &parsed);
>> +
>> +                             /* does output plugin want this particular transaction? */
>> +                             if (ctx->callbacks.filter_prepare_cb &&
>> +                                     ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
>> +                                                                  parsed.twophase_gid))
>> +                             {
>> +                                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
>> +                                                                                     buf->origptr);
>
> We're calling ReorderBufferProcessXid() on two different xids in
> different branches, is that intentional?
>

I don't think that's intentional. Maybe Stas can also provide his views on this?

>> +     if (TransactionIdIsValid(parsed->twophase_xid) &&
>> +             ReorderBufferTxnIsPrepared(ctx->reorder,
>> +                                                                parsed->twophase_xid, parsed->twophase_gid))
>> +     {
>> +             Assert(xid == parsed->twophase_xid);
>> +             /* we are processing COMMIT PREPARED */
>> +             ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
>> +                                     commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
>> +     }
>> +     else
>> +     {
>> +             /* replay actions of all transaction + subtransactions in order */
>> +             ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
>> +                                                     commit_time, origin_id, origin_lsn);
>> +     }
>> +}
>
> Why do we want this via the same routine?
>

As I mentioned above, xl_xact_parsed_commit handles both regular
commits and "COMMIT PREPARED", which is why a single routine is used
for both.

>
>
>> +bool
>> +LogicalLockTransaction(ReorderBufferTXN *txn)
>> +{
>> +     bool    ok = false;
>> +
>> +     /*
>> +      * Prepared transactions and uncommitted transactions
>> +      * that have modified catalogs need to interlock with
>> +      * concurrent rollback to ensure that there are no
>> +      * issues while decoding
>> +      */
>> +
>> +     if (!txn_has_catalog_changes(txn))
>> +             return true;
>> +
>> +     /*
>> +      * Is it a prepared txn? Similar checks for uncommitted
>> +      * transactions when we start supporting them
>> +      */
>> +     if (!txn_prepared(txn))
>> +             return true;
>> +
>> +     /* check cached status */
>> +     if (txn_commit(txn))
>> +             return true;
>> +     if (txn_rollback(txn))
>> +             return false;
>> +
>> +     /*
>> +      * Find the PROC that is handling this XID and add ourself as a
>> +      * decodeGroupMember
>> +      */
>> +     if (MyProc->decodeGroupLeader == NULL)
>> +     {
>> +             PGPROC *proc = BecomeDecodeGroupLeader(txn->xid, txn_prepared(txn));
>> +
>> +             /*
>> +              * If decodeGroupLeader is NULL, then the only possibility
>> +              * is that the transaction completed and went away
>> +              */
>> +             if (proc == NULL)
>> +             {
>> +                     Assert(!TransactionIdIsInProgress(txn->xid));
>> +                     if (TransactionIdDidCommit(txn->xid))
>> +                     {
>> +                             txn->txn_flags |= TXN_COMMIT;
>> +                             return true;
>> +                     }
>> +                     else
>> +                     {
>> +                             txn->txn_flags |= TXN_ROLLBACK;
>> +                             return false;
>> +                     }
>> +             }
>> +
>> +             /* Add ourself as a decodeGroupMember */
>> +             if (!BecomeDecodeGroupMember(proc, proc->pid, txn_prepared(txn)))
>> +             {
>> +                     Assert(!TransactionIdIsInProgress(txn->xid));
>> +                     if (TransactionIdDidCommit(txn->xid))
>> +                     {
>> +                             txn->txn_flags |= TXN_COMMIT;
>> +                             return true;
>> +                     }
>> +                     else
>> +                     {
>> +                             txn->txn_flags |= TXN_ROLLBACK;
>> +                             return false;
>> +                     }
>> +             }
>> +     }
>
> Are we ok with this low-level lock / pgproc stuff happening outside of
> procarray / lock related files?  Where is the locking scheme documented?
>

Some details are in src/include/storage/proc.h, where these fields have
been added.

The implementation is similar to the existing lockGroupLeader one and
uses the same locking mechanism, via LockHashPartitionLockByProc.
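
Roughly, the member-side handshake mirrors what lock groups do today.
A sketch of what BecomeDecodeGroupMember() does under that partition
lock (the dlist field names are shorthand and may differ from the
patch):

/* 'leader' is the PGPROC currently handling txn->xid */
LWLock     *leader_lwlock = LockHashPartitionLockByProc(leader);

LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
/* join the decode group so a concurrent rollback has to wait for us */
MyProc->decodeGroupLeader = leader;
dlist_push_tail(&leader->decodeGroupMembers, &MyProc->decodeGroupLink);
LWLockRelease(leader_lwlock);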

>
>
>> +/* ReorderBufferTXN flags */
>> +#define TXN_HAS_CATALOG_CHANGES 0x0001
>> +#define TXN_IS_SUBXACT          0x0002
>> +#define TXN_SERIALIZED          0x0004
>> +#define TXN_PREPARE             0x0008
>> +#define TXN_COMMIT_PREPARED     0x0010
>> +#define TXN_ROLLBACK_PREPARED   0x0020
>> +#define TXN_COMMIT              0x0040
>> +#define TXN_ROLLBACK            0x0080
>> +
>> +/* does the txn have catalog changes */
>> +#define txn_has_catalog_changes(txn) (txn->txn_flags & TXN_HAS_CATALOG_CHANGES)
>> +/* is the txn known as a subxact? */
>> +#define txn_is_subxact(txn)          (txn->txn_flags & TXN_IS_SUBXACT)
>> +/*
>> + * Has this transaction been spilled to disk?  It's not always possible to
>> + * deduce that fact by comparing nentries with nentries_mem, because e.g.
>> + * subtransactions of a large transaction might get serialized together
>> + * with the parent - if they're restored to memory they'd have
>> + * nentries_mem == nentries.
>> + */
>> +#define txn_is_serialized(txn)       (txn->txn_flags & TXN_SERIALIZED)
>> +/* is this txn prepared? */
>> +#define txn_prepared(txn)            (txn->txn_flags & TXN_PREPARE)
>> +/* was this prepared txn committed in the meanwhile? */
>> +#define txn_commit_prepared(txn)     (txn->txn_flags & TXN_COMMIT_PREPARED)
>> +/* was this prepared txn aborted in the meanwhile? */
>> +#define txn_rollback_prepared(txn)   (txn->txn_flags & TXN_ROLLBACK_PREPARED)
>> +/* was this txn committed in the meanwhile? */
>> +#define txn_commit(txn)              (txn->txn_flags & TXN_COMMIT)
>> +/* was this prepared txn aborted in the meanwhile? */
>> +#define txn_rollback(txn)            (txn->txn_flags & TXN_ROLLBACK)
>> +
>
> These txn_* names seem too generic imo - fairly likely to conflict with
> other pieces of code imo.
>

Happy to add an RBTXN prefix to all of them for clarity, e.g.:

/* ReorderBufferTXN flags */
#define RBTXN_HAS_CATALOG_CHANGES 0x0001

I will soon submit multiple patches with the cleanups discussed above.

Regards,
Nikhils
-- 
 Nikhil Sontakke                   http://www.2ndQuadrant.com/
 PostgreSQL/Postgres-XL Development, 24x7 Support, Training & Services

