Re: [HACKERS] logical decoding of two-phase transactions
От | Ajin Cherian |
---|---|
Тема | Re: [HACKERS] logical decoding of two-phase transactions |
Дата | |
Msg-id | CAFPTHDYbW3vPtz684HXSxvG9pmMky705JXGCXfft-DEkxhFkEw@mail.gmail.com обсуждение исходный текст |
Ответ на | Re: [HACKERS] logical decoding of two-phase transactions (Peter Smith <smithpb2250@gmail.com>) |
Ответы |
Re: [HACKERS] logical decoding of two-phase transactions
(Amit Kapila <amit.kapila16@gmail.com>)
|
Список | pgsql-hackers |
On Wed, Oct 7, 2020 at 9:36 AM Peter Smith <smithpb2250@gmail.com> wrote: > ========== > Patch V6-0001, File: doc/src/sgml/logicaldecoding.sgml > ========== > > COMMENT/QUESTION > Section 48.6.1 > @ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks > LogicalDecodeTruncateCB truncate_cb; > LogicalDecodeCommitCB commit_cb; > LogicalDecodeMessageCB message_cb; > + LogicalDecodeFilterPrepareCB filter_prepare_cb; > > Confused by the mixing of terminologies "abort" and "rollback". > Why is it LogicalDecodeAbortPreparedCB instead of > LogicalDecodeRollbackPreparedCB? > Why is it abort_prepared_cb instead of rollback_prepared_cb;? > > I thought everything the user sees should be ROLLBACK/rollback (like > the SQL) regardless of what the internal functions might be called. > > ; Modified. > > COMMENT > Section 48.6.1 > The begin_cb, change_cb and commit_cb callbacks are required, while > startup_cb, filter_by_origin_cb, truncate_cb, and shutdown_cb are > optional. If truncate_cb is not set but a TRUNCATE is to be decoded, > the action will be ignored. > > The 1st paragraph beneath the typedef does not mention the newly added > callbacks to say if they are required or optional. > Added a new para for this. > ; > > COMMENT > Section 48.6.4.5 > Section 48.6.4.6 > Section 48.6.4.7 > @@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct > LogicalDecodingContext *ctx, > </para> > </sect3> > > + <sect3 id="logicaldecoding-output-plugin-prepare"> > + <sect3 id="logicaldecoding-output-plugin-commit-prepared"> > + <sect3 id="logicaldecoding-output-plugin-abort-prepared"> > +<programlisting> > > The wording and titles are a bit backwards compared to the others. > e.g. previously was "Transaction Begin" (not "Begin Transaction") and > "Transaction End" (not "End Transaction"). > > So for consistently following the existing IMO should change these new > titles (and wording) to: > - "Commit Prepared Transaction Callback" --> "Transaction Commit > Prepared Callback" > - "Rollback Prepared Transaction Callback" --> "Transaction Rollback > Prepared Callback" > - "whenever a commit prepared transaction has been decoded" --> > "whenever a transaction commit prepared has been decoded" > - "whenever a rollback prepared transaction has been decoded." --> > "whenever a transaction rollback prepared has been decoded." > > ; Updated to this > > ========== > Patch V6-0001, File: src/backend/replication/logical/decode.c > ========== > > COMMENT > Line 74 > @@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext > *ctx, XLogRecordBuffer *buf, > xl_xact_parsed_commit *parsed, TransactionId xid); > static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, > xl_xact_parsed_abort *parsed, TransactionId xid); > +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, > + xl_xact_parsed_prepare * parsed); > > The 2nd line of DecodePrepare is misaligned by one space. > > ; > > COMMENT > Line 321 > @@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, > XLogRecordBuffer *buf) > } > break; > case XLOG_XACT_PREPARE: > + { > + xl_xact_parsed_prepare parsed; > + xl_xact_prepare *xlrec; > + /* check that output plugin is capable of twophase decoding */ > > "twophase" --> "two-phase" > > ~ > > Also, add a blank line after the declarations. > > ; > > ========== > Patch V6-0001, File: src/backend/replication/logical/logical.c > ========== > > COMMENT > Line 249 > @@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options, > (ctx->callbacks.stream_message_cb != NULL) || > (ctx->callbacks.stream_truncate_cb != NULL); > > + /* > + * To support two phase logical decoding, we require > prepare/commit-prepare/abort-prepare > + * callbacks. The filter-prepare callback is optional. We however > enable two phase logical > + * decoding when at least one of the methods is enabled so that we > can easily identify > + * missing methods. > > The terminology is generally well known as "two-phase" (with the > hyphen) https://en.wikipedia.org/wiki/Two-phase_commit_protocol so > let's be consistent for all the patch code comments. Please search the > code and correct this in all places, even where I might have missed to > identify it. > > "two phase" --> "two-phase" > > ; > > COMMENT > Line 822 > @@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > } > > static void > +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > + XLogRecPtr prepare_lsn) > > "support 2 phase" --> "supports two-phase" in the comment > > ; > > COMMENT > Line 844 > Code condition seems strange and/or broken. > if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL) > Because if the flag is null then this condition is skipped. > But then if the callback was also NULL then attempting to call it to > "do the actual work" will give NPE. > > ~ > > Also, I wonder should this check be the first thing in this function? > Because if it fails does it even make sense that all the errcallback > code was set up? > E.g errcallback.arg potentially is left pointing to a stack variable > on a stack that no longer exists. > Updated accordingly. > ; > > COMMENT > Line 857 > +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > > "support 2 phase" --> "supports two-phase" in the comment > > ~ > > Also, Same potential trouble with the condition: > if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL) > Same as previously asked. Should this check be first thing in this function? > > ; > > COMMENT > Line 892 > +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > > "support 2 phase" --> "supports two-phase" in the comment > > ~ > > Same potential trouble with the condition: > if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL) > Same as previously asked. Should this check be the first thing in this function? > > ; > > COMMENT > Line 1013 > @@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > error_context_stack = errcallback.previous; > } > > +static bool > +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > + TransactionId xid, const char *gid) > > Fix wording in comment: > "twophase" --> "two-phase transactions" > "twophase transactions" --> "two-phase transactions" > Updated accordingly. > ========== > Patch V6-0001, File: src/backend/replication/logical/reorderbuffer.c > ========== > > COMMENT > Line 255 > @@ -251,7 +251,8 @@ static Size > ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn > static void ReorderBufferRestoreChange(ReorderBuffer *rb, > ReorderBufferTXN *txn, > char *change); > static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, > ReorderBufferTXN *txn); > -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); > +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, > + bool txn_prepared); > > The alignment is inconsistent. One more space needed before "bool txn_prepared" > > ; > > COMMENT > Line 417 > @@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn) > } > > /* free data that's contained */ > + if (txn->gid != NULL) > + { > + pfree(txn->gid); > + txn->gid = NULL; > + } > > Should add the blank link before this new code, as it was before. > > ; > > COMMENT > Line 1564 > @ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn) > } > > /* > - * Discard changes from a transaction (and subtransactions), after streaming > - * them. Keep the remaining info - transactions, tuplecids, invalidations and > - * snapshots. > + * Discard changes from a transaction (and subtransactions), either > after streaming or > + * after a PREPARE. > > typo "snapshots.If" -> "snapshots. If" > > ; Updated Accordingly. > > COMMENT/QUESTION > Line 1590 > @@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn) > Assert(rbtxn_is_known_subxact(subtxn)); > Assert(subtxn->nsubtxns == 0); > > - ReorderBufferTruncateTXN(rb, subtxn); > + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); > } > > There are some code paths here I did not understand how they match the comments. > Because this function is recursive it seems that it may be called > where the 2nd parameter txn is a sub-transaction. > > But then this seems at odds with some of the other code comments of > this function which are processing the txn without ever testing is it > really toplevel or not: > > e.g. Line 1593 "/* cleanup changes in the toplevel txn */" > e.g. Line 1632 "They are always stored in the toplevel transaction." > > ; I see that another commit in between has updated this now. > > COMMENT > Line 1644 > @@ -1560,9 +1621,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn) > * about the toplevel xact (we send the XID in all messages), but we never > * stream XIDs of empty subxacts. > */ > - if ((!txn->toptxn) || (txn->nentries_mem != 0)) > + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) > txn->txn_flags |= RBTXN_IS_STREAMED; > > + if (txn_prepared) > > /* remove the change from it's containing list */ > typo "it's" --> "its" Updated. > > ; > > QUESTION > Line 1977 > @@ -1880,7 +1965,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn, > ReorderBufferChange *specinsert) > { > /* Discard the changes that we just streamed */ > - ReorderBufferTruncateTXN(rb, txn); > + ReorderBufferTruncateTXN(rb, txn, false); > > How do you know the 3rd parameter - i.e. txn_prepared - should be > hardwired false here? > e.g. I thought that maybe rbtxn_prepared(txn) can be true here. > > ; This particular function is only called when streaming and not when handling a prepared transaction. > > COMMENT > Line 2345 > @@ -2249,7 +2334,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn, > break; > } > } > - > /* > > Looks like accidental blank line deletion. This should be put back how it was > > ; > > COMMENT/QUESTION > Line 2374 > @@ -2278,7 +2362,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn, > } > } > else > - rb->commit(rb, txn, commit_lsn); > + { > + /* > + * Call either PREPARE (for twophase transactions) or COMMIT > + * (for regular ones). > > "twophase" --> "two-phase" > > ~ Updated. > > Also, I was confused by the apparent assumption of exclusiveness of > streaming and 2PC... > e.g. what if streaming AND 2PC then it won't do rb->prepare() > > ; > > QUESTION > Line 2424 > @@ -2319,11 +2412,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn, > */ > if (streaming) > { > - ReorderBufferTruncateTXN(rb, txn); > + ReorderBufferTruncateTXN(rb, txn, false); > > /* Reset the CheckXidAlive */ > CheckXidAlive = InvalidTransactionId; > } > + else if (rbtxn_prepared(txn)) > > I was confused by the exclusiveness of streaming/2PC. > e.g. what if streaming AND 2PC at same time - how can you pass false > as 3rd param to ReorderBufferTruncateTXN? ReorderBufferProcessTXN can only be called when streaming individual commands and not for streaming a prepare or a commit, Streaming of prepare and commit would be handled as part of ReorderBufferStreamCommit. > > ; > > COMMENT > Line 2463 > @@ -2352,17 +2451,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn, > > /* > * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent > - * abort of the (sub)transaction we are streaming. We need to do the > + * abort of the (sub)transaction we are streaming or preparing. We > need to do the > * cleanup and return gracefully on this error, see SetupCheckXidLive. > */ > > "twoi phase" --> "two-phase" > > ; > > QUESTIONS > Line 2482 > @@ -2370,10 +2470,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, > ReorderBufferTXN *txn, > errdata = NULL; > curtxn->concurrent_abort = true; > > - /* Reset the TXN so that it is allowed to stream remaining data. */ > - ReorderBufferResetTXN(rb, txn, snapshot_now, > - command_id, prev_lsn, > - specinsert); > + /* If streaming, reset the TXN so that it is allowed to stream > remaining data. */ > + if (streaming) > > Re: /* If streaming, reset the TXN so that it is allowed to stream > remaining data. */ > I was confused by the exclusiveness of streaming/2PC. > Is it not possible for streaming flags and rbtxn_prepared(txn) true at > the same time? Same as above. > > ~ > > elog(LOG, "stopping decoding of %s (%u)", > txn->gid[0] != '\0'? txn->gid:"", txn->xid); > > Is this a safe operation, or do you also need to test txn->gid is not NULL? Since this is in code where it is not streaming and therefore rbtxn_prepared(txn), so gid has to be NOT NULL. > > ; > > COMMENT > Line 2606 > +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, > > "twophase" --> "two-phase" > > ; > > QUESTION > Line 2655 > +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, > > "This is used to handle COMMIT/ABORT PREPARED" > Should that say "COMMIT/ROLLBACK PREPARED"? > > ; > > COMMENT > Line 2668 > > "Anyways, 2PC transactions" --> "Anyway, two-phase transactions" > > ; > > COMMENT > Line 2765 > @@ -2495,7 +2731,13 @@ ReorderBufferAbort(ReorderBuffer *rb, > TransactionId xid, XLogRecPtr lsn) > /* cosmetic... */ > txn->final_lsn = lsn; > > - /* remove potential on-disk data, and deallocate */ > + /* > + * remove potential on-disk data, and deallocate. > + * > > Remove the blank between the comment and code. > > ========== > Patch V6-0001, File: src/include/replication/logical.h > ========== > > COMMENT > Line 89 > > "two phase" -> "two-phase" > > ; > > COMMENT > Line 89 > > For consistency with the previous member naming really the new member > should just be called "twophase" rather than "enable_twophase" > > ; Updated accordingly. > > ========== > Patch V6-0001, File: src/include/replication/output_plugin.h > ========== > > QUESTION > Line 106 > > As previously asked, why is the callback function/typedef referred as > AbortPrepared instead of RollbackPrepared? > It does not match the SQL and the function comment, and seems only to > add some unnecessary confusion. > > ; > > ========== > Patch V6-0001, File: src/include/replication/reorderbuffer.h > ========== > > QUESTION > Line 116 > @@ -162,9 +163,13 @@ typedef struct ReorderBufferChange > #define RBTXN_HAS_CATALOG_CHANGES 0x0001 > #define RBTXN_IS_SUBXACT 0x0002 > #define RBTXN_IS_SERIALIZED 0x0004 > -#define RBTXN_IS_STREAMED 0x0008 > -#define RBTXN_HAS_TOAST_INSERT 0x0010 > -#define RBTXN_HAS_SPEC_INSERT 0x0020 > +#define RBTXN_PREPARE 0x0008 > +#define RBTXN_COMMIT_PREPARED 0x0010 > +#define RBTXN_ROLLBACK_PREPARED 0x0020 > +#define RBTXN_COMMIT 0x0040 > +#define RBTXN_IS_STREAMED 0x0080 > +#define RBTXN_HAS_TOAST_INSERT 0x0100 > +#define RBTXN_HAS_SPEC_INSERT 0x0200 > > I was wondering why when adding new flags, some of the existing flag > masks were also altered. > I am assuming this is ok because they are never persisted but are only > used in the protocol (??) > > ; > > COMMENT > Line 226 > @@ -218,6 +223,15 @@ typedef struct ReorderBufferChange > ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ > ) > > +/* is this txn prepared? */ > +#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE) > +/* was this prepared txn committed in the meanwhile? */ > +#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED) > +/* was this prepared txn aborted in the meanwhile? */ > +#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED) > +/* was this txn committed in the meanwhile? */ > +#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT) > + > > Probably all the "txn->txn_flags" here might be more safely written > with parentheses in the macro like "(txn)->txn_flags". > > ~ > > Also, Start all comments with capital. And what is the meaning "in the > meanwhile?" > > ; > > COMMENT > Line 410 > @@ -390,6 +407,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb, > ReorderBufferTXN *txn, > XLogRecPtr commit_lsn); > > The format is inconsistent with all other callback signatures here, > where the 1st arg was on the same line as the typedef. > > ; > > COMMENT > Line 440-442 > > Excessive blank lines following this change? > > ; > > COMMENT > Line 638 > @@ -571,6 +631,15 @@ void > ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, > XLog > bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); > bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); > > +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, > + const char *gid); > +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid, > + const char *gid); > +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, > + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, > + TimestampTz commit_time, > + RepOriginId origin_id, XLogRecPtr origin_lsn, > + char *gid); > > Not aligned consistently with other function prototypes. > > ; Updated > > ========== > Patch V6-0003, File: src/backend/access/transam/twophase.c > ========== > > COMMENT > Line 551 > @@ -548,6 +548,37 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held) > } > > /* > + * LookupGXact > + * Check if the prepared transaction with the given GID is around > + */ > +bool > +LookupGXact(const char *gid) > > There is potential to refactor/simplify this code: > e.g. > > bool > LookupGXact(const char *gid) > { > int i; > bool found = false; > > LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); > for (i = 0; i < TwoPhaseState->numPrepXacts; i++) > { > GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; > /* Ignore not-yet-valid GIDs */ > if (gxact->valid && strcmp(gxact->gid, gid) == 0) > { > found = true; > break; > } > } > LWLockRelease(TwoPhaseStateLock); > return found; > } > > ; > Updated accordingly. > ========== > Patch V6-0003, File: src/backend/replication/logical/proto.c > ========== > > COMMENT > Line 86 > @@ -72,12 +72,17 @@ logicalrep_read_begin(StringInfo in, > LogicalRepBeginData *begin_data) > */ > void > logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, > - XLogRecPtr commit_lsn) > > Since now the flags are used the code comment is wrong. > "/* send the flags field (unused for now) */" > > ; > > COMMENT > Line 129 > @ -106,6 +115,77 @@ logicalrep_read_commit(StringInfo in, > LogicalRepCommitData *commit_data) > } > > /* > + * Write PREPARE to the output stream. > + */ > +void > +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, > > "2PC transactions" --> "two-phase commit transactions" > > ; Updated > > COMMENT > Line 133 > > Assert(strlen(txn->gid) > 0); > Shouldn't that assertion also check txn->gid is not NULL (to prevent > NPE in case gid was NULL) In this case txn->gid has to be non NULL. > > ; > > COMMENT > Line 177 > +logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data) > > prepare_data->prepare_type = flags; > This code may be OK but it does seem a bit of an abuse of the flags. > > e.g. Are they flags or are the really enum values? > e.g. And if they are effectively enums (it appears they are) then > seemed inconsistent that |= was used when they were previously > assigned. > > ; I have not updated this as according to Amit this might require refactoring again. > > ========== > Patch V6-0003, File: src/backend/replication/logical/worker.c > ========== > > COMMENT > Line 757 > @@ -749,6 +753,141 @@ apply_handle_commit(StringInfo s) > pgstat_report_activity(STATE_IDLE, NULL); > } > > +static void > +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data) > +{ > + Assert(prepare_data->prepare_lsn == remote_final_lsn); > > Missing function comment to say this is called from apply_handle_prepare. > > ; > > COMMENT > Line 798 > +apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data) > > Missing function comment to say this is called from apply_handle_prepare. > > ; > > COMMENT > Line 824 > +apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data) > > Missing function comment to say this is called from apply_handle_prepare. > Updated. > ========== > Patch V6-0003, File: src/backend/replication/pgoutput/pgoutput.c > ========== > > COMMENT > Line 50 > @@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, > ReorderBufferChange *change); > static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, > RepOriginId origin_id); > +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); > > The parameter indentation (2nd lines) does not match everything else > in this context. > > ; > > COMMENT > Line 152 > @@ -143,6 +149,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) > cb->change_cb = pgoutput_change; > cb->truncate_cb = pgoutput_truncate; > cb->commit_cb = pgoutput_commit_txn; > + > + cb->prepare_cb = pgoutput_prepare_txn; > + cb->commit_prepared_cb = pgoutput_commit_prepared_txn; > + cb->abort_prepared_cb = pgoutput_abort_prepared_txn; > > Remove the unnecessary blank line. > > ; > > QUESTION > Line 386 > @@ -373,7 +383,49 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, > ReorderBufferTXN *txn, > OutputPluginUpdateProgress(ctx); > > OutputPluginPrepareWrite(ctx, true); > - logicalrep_write_commit(ctx->out, txn, commit_lsn); > + logicalrep_write_commit(ctx->out, txn, commit_lsn, true); > > Is the is_commit parameter of logicalrep_write_commit ever passed as false? > If yes, where? > If no, the what is the point of it? It was dead code from an earlier version. I have removed it, updated accordingly. > > ; > > COMMENT > Line 408 > +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, > ReorderBufferTXN *txn, > > Since all this function is identical to pg_output_prepare it might be > better to either > 1. just leave this as a wrapper to delegate to that function > 2. remove this one entirely and assign the callback to the common > pgoutput_prepare_txn > > ; I have not changed this as this might require re-factoring according to Amit. > > COMMENT > Line 419 > +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, > > Since all this function is identical to pg_output_prepare if might be > better to either > 1. just leave this as a wrapper to delegate to that function > 2. remove this one entirely and assign the callback to the common > pgoutput_prepare_tx > > ; Same as above. > > COMMENT > Line 419 > +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, > > Shouldn't this comment say be "ROLLBACK PREPARED"? > > ; Updated. > > ========== > Patch V6-0003, File: src/include/replication/logicalproto.h > ========== > > QUESTION > Line 101 > @@ -87,20 +87,55 @@ typedef struct LogicalRepBeginData > TransactionId xid; > } LogicalRepBeginData; > > +/* Commit (and abort) information */ > > #define LOGICALREP_IS_ABORT 0x02 > Is there a good reason why this is not called: > #define LOGICALREP_IS_ROLLBACK 0x02 > > ; Removed. > > COMMENT > Line 105 > > ((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT)) > > Macros would be safer if flags are in parentheses > (((flags) == LOGICALREP_IS_COMMIT) || ((flags) == LOGICALREP_IS_ABORT)) > > ; > > COMMENT > Line 115 > > Unexpected whitespace for the typedef > "} LogicalRepPrepareData;" > > ; > > COMMENT > Line 122 > /* prepare can be exactly one of PREPARE, [COMMIT|ABORT] PREPARED*/ > #define PrepareFlagsAreValid(flags) \ > ((flags == LOGICALREP_IS_PREPARE) || \ > (flags == LOGICALREP_IS_COMMIT_PREPARED) || \ > (flags == LOGICALREP_IS_ROLLBACK_PREPARED)) > > There is confusing mixture in macros and comments of ABORT and ROLLBACK terms > "[COMMIT|ABORT] PREPARED" --> "[COMMIT|ROLLBACK] PREPARED" > > ~ > > Also, it would be safer if flags are in parentheses > (((flags) == LOGICALREP_IS_PREPARE) || \ > ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \ > ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED)) > > ; updated. > > ========== > Patch V6-0003, File: src/test/subscription/t/020_twophase.pl > ========== > > COMMENT > Line 131 - # check inserts are visible > > Isn't this supposed to be checking for rows 12 and 13, instead of 11 and 12? > > ; Updated. > > ========== > Patch V6-0004, File: contrib/test_decoding/test_decoding.c > ========== > > COMMENT > Line 81 > @@ -78,6 +78,15 @@ static void > pg_decode_stream_stop(LogicalDecodingContext *ctx, > static void pg_decode_stream_abort(LogicalDecodingContext *ctx, > ReorderBufferTXN *txn, > XLogRecPtr abort_lsn); > +static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > +static > > All these functions have a 3rd parameter called commit_lsn. Even > though the functions are not commit related. It seems like a cut/paste > error. > > ; > > COMMENT > Line 142 > @@ -130,6 +139,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) > cb->stream_start_cb = pg_decode_stream_start; > cb->stream_stop_cb = pg_decode_stream_stop; > cb->stream_abort_cb = pg_decode_stream_abort; > + cb->stream_prepare_cb = pg_decode_stream_prepare; > + cb->stream_commit_prepared_cb = pg_decode_stream_commit_prepared; > + cb->stream_abort_prepared_cb = pg_decode_stream_abort_prepared; > cb->stream_commit_cb = pg_decode_stream_commit; > > Can the "cb->stream_abort_prepared_cb" be changed to > "cb->stream_rollback_prepared_cb"? > > ; > > COMMENT > Line 827 > @@ -812,6 +824,78 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, > } > > static void > +pg_decode_stream_prepare(LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn) > +{ > + TestDecodingData *data = ctx->output_plugin_pr > > The commit_lsn (3rd parameter) is unused and seems like a cut/paste name error. > > ; > > COMMENT > Line 875 > +pg_decode_stream_abort_prepared(LogicalDecodingContext *ctx, > > The commit_lsn (3rd parameter) is unused and seems like a cut/paste name error. > > ; > Updated. > ========== > Patch V6-0004, File: doc/src/sgml/logicaldecoding.sgml > ========== > > COMMENT > 48.6.1 > @@ -396,6 +396,9 @@ typedef struct OutputPluginCallbacks > LogicalDecodeStreamStartCB stream_start_cb; > LogicalDecodeStreamStopCB stream_stop_cb; > LogicalDecodeStreamAbortCB stream_abort_cb; > + LogicalDecodeStreamPrepareCB stream_prepare_cb; > + LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb; > + LogicalDecodeStreamAbortPreparedCB stream_abort_prepared_cb; > > Same question from previous review comments - why using the > terminology "abort" instead of "rollback" > > ; > > COMMENT > 48.6.1 > @@ -418,7 +421,9 @@ typedef void (*LogicalOutputPluginInit) (struct > OutputPluginCallbacks *cb); > in-progress transactions. The <function>stream_start_cb</function>, > <function>stream_stop_cb</function>, <function>stream_abort_cb</function>, > <function>stream_commit_cb</function> and <function>stream_change_cb</function> > - are required, while <function>stream_message_cb</function> and > + are required, while <function>stream_message_cb</function>, > + <function>stream_prepare_cb</function>, > <function>stream_commit_prepared_cb</function>, > + <function>stream_abort_prepared_cb</function>, > > Missing "and". > ... "stream_abort_prepared_cb, stream_truncate_cb are optional." --> > "stream_abort_prepared_cb, and stream_truncate_cb are optional." > > ; > > COMMENT > Section 48.6.4.16 > Section 48.6.4.17 > Section 48.6.4.18 > @@ -839,6 +844,45 @@ typedef void (*LogicalDecodeStreamAbortCB) > (struct LogicalDecodingContext *ctx, > </para> > </sect3> > > + <sect3 id="logicaldecoding-output-plugin-stream-prepare"> > + <title>Stream Prepare Callback</title> > + <para> > + The <function>stream_prepare_cb</function> callback is called to prepare > + a previously streamed transaction as part of a two phase commit. > +<programlisting> > +typedef void (*LogicalDecodeStreamPrepareCB) (struct > LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, > + XLogRecPtr abort_lsn); > +</programlisting> > + </para> > + </sect3> > + > + <sect3 id="logicaldecoding-output-plugin-stream-commit-prepared"> > + <title>Stream Commit Prepared Callback</title> > + <para> > + The <function>stream_commit_prepared_cb</function> callback is > called to commit prepared > + a previously streamed transaction as part of a two phase commit. > +<programlisting> > +typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct > LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, > + XLogRecPtr abort_lsn); > +</programlisting> > + </para> > + </sect3> > + > + <sect3 id="logicaldecoding-output-plugin-stream-abort-prepared"> > + <title>Stream Abort Prepared Callback</title> > + <para> > + The <function>stream_abort_prepared_cb</function> callback is called > to abort prepared > + a previously streamed transaction as part of a two phase commit. > +<programlisting> > +typedef void (*LogicalDecodeStreamAbortPreparedCB) (struct > LogicalDecodingContext *ctx, > + ReorderBufferTXN *txn, > + XLogRecPtr abort_lsn); > +</programlisting> > + </para> > + </sect3> > > 1. Everywhere it says "two phase" commit should be consistently > replaced to say "two-phase" commit (with the hyphen) > > 2. Search for "abort_lsn" parameter. It seems to be overused > (cut/paste error) even when the API is unrelated to abort > > 3. 48.6.4.17 and 48.6.4.18 > Is this wording ok? Is the word "prepared" even necessary here? > - "... called to commit prepared a previously streamed transaction ..." > - "... called to abort prepared a previously streamed transaction ..." > > ; Updated accordingly. > > COMMENT > Section 48.9 > @@ -1017,9 +1061,13 @@ OutputPluginWrite(ctx, true); > When streaming an in-progress transaction, the changes (and messages) are > streamed in blocks demarcated by <function>stream_start_cb</function> > and <function>stream_stop_cb</function> callbacks. Once all the decoded > - changes are transmitted, the transaction is committed using the > - <function>stream_commit_cb</function> callback (or possibly aborted using > - the <function>stream_abort_cb</function> callback). > + changes are transmitted, the transaction can be committed using the > + the <function>stream_commit_cb</function> callback > > "two phase" --> "two-phase" > > ~ > > Also, Missing period on end of sentence. > "or aborted using the stream_abort_prepared_cb" --> "or aborted using > the stream_abort_prepared_cb." > > ; Updated accordingly. > > ========== > Patch V6-0004, File: src/backend/replication/logical/logical.c > ========== > > COMMENT > Line 84 > @@ -81,6 +81,12 @@ static void stream_stop_cb_wrapper(ReorderBuffer > *cache, ReorderBufferTXN *txn, > XLogRecPtr last_lsn); > static void stream_abort_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > XLogRecPtr abort_lsn); > +static void stream_prepare_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > +static void stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > +static void stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > > The 3rd parameter is always "commit_lsn" even for API unrelated to > commit, so seems like cut/paste error. > > ; > > COMMENT > Line 1246 > @@ -1231,6 +1243,105 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, > ReorderBufferTXN *txn, > } > > static void > +stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn) > +{ > + LogicalDecodingContext *ctx = cache->private_data; > + LogicalErrorCallbackState state; > > Misnamed parameter "commit_lsn" ? > > ~ > > Also, Line 1272 > There seem to be some missing integrity checking to make sure the > callback is not NULL. > A null callback will give NPE when wrapper attempts to call it > > ; > > COMMENT > Line 1305 > +static void > +stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > > There seem to be some missing integrity checking to make sure the > callback is not NULL. > A null callback will give NPE when wrapper attempts to call it. > > ; > > COMMENT > Line 1312 > +static void > +stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, > > Misnamed parameter "commit_lsn" ? > > ~ > > Also, Line 1338 > There seem to be some missing integrity checking to make sure the > callback is not NULL. > A null callback will give NPE when wrapper attempts to call it. > Updated accordingly. > > ========== > Patch V6-0004, File: src/backend/replication/logical/reorderbuffer.c > ========== > > COMMENT > Line 2684 > @@ -2672,15 +2681,31 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, > TransactionId xid, > txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ > strcpy(txn->gid, gid); > > - if (is_commit) > + if (rbtxn_is_streamed(txn)) > { > - txn->txn_flags |= RBTXN_COMMIT_PREPARED; > - rb->commit_prepared(rb, txn, commit_lsn); > + if (is_commit) > + { > + txn->txn_flags |= RBTXN_COMMIT_PREPARED; > > The setting/checking of the flags could be refactored if you wanted to > write less code: > e.g. > if (is_commit) > txn->txn_flags |= RBTXN_COMMIT_PREPARED; > else > txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; > > if (rbtxn_is_streamed(txn) && rbtxn_commit_prepared(txn)) > rb->stream_commit_prepared(rb, txn, commit_lsn); > else if (rbtxn_is_streamed(txn) && rbtxn_rollback_prepared(txn)) > rb->stream_abort_prepared(rb, txn, commit_lsn); > else if (rbtxn_commit_prepared(txn)) > rb->commit_prepared(rb, txn, commit_lsn); > else if (rbtxn_rollback_prepared(txn)) > rb->abort_prepared(rb, txn, commit_lsn); > > ; Updated accordingly. > > ========== > Patch V6-0004, File: src/include/replication/output_plugin.h > ========== > > COMMENT > Line 171 > @@ -157,6 +157,33 @@ typedef void (*LogicalDecodeStreamAbortCB) > (struct LogicalDecodingContext *ctx, > XLogRecPtr abort_lsn); > > /* > + * Called to prepare changes streamed to remote node from in-progress > + * transaction. This is called as part of a two-phase commit and only when > + * two-phased commits are supported > + */ > > 1. Missing period all these comments. > > 2. Is the part that says "and only where two-phased commits are > supported" necessary to say? Is seems redundant since comments already > says called as part of a two-phase commit. > > ; > > ========== > Patch V6-0004, File: src/include/replication/reorderbuffer.h > ========== > > COMMENT > Line 467 > @@ -466,6 +466,24 @@ typedef void (*ReorderBufferStreamAbortCB) ( > ReorderBufferTXN *txn, > XLogRecPtr abort_lsn); > > +/* prepare streamed transaction callback signature */ > +typedef void (*ReorderBufferStreamPrepareCB) ( > + ReorderBuffer *rb, > + ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > + > +/* prepare streamed transaction callback signature */ > +typedef void (*ReorderBufferStreamCommitPreparedCB) ( > + ReorderBuffer *rb, > + ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > + > +/* prepare streamed transaction callback signature */ > +typedef void (*ReorderBufferStreamAbortPreparedCB) ( > + ReorderBuffer *rb, > + ReorderBufferTXN *txn, > + XLogRecPtr commit_lsn); > > Cut/paste error - repeated same comment 3 times? > Updated Accordingly. > > [END] > > I believe I have addressed all of Peter's comments. Peter, do have a look and let me know if I missed anything or if you find anythinge else. Thanks for your comments, much appreciated. regards, Ajin Cherian Fujitsu Australia
Вложения
В списке pgsql-hackers по дате отправления:
Предыдущее
От: Julien RouhaudДата:
Сообщение: Re: Feature improvement: can we add queryId for pg_catalog.pg_stat_activity view?