Re: [HACKERS] logical decoding of two-phase transactions

Поиск
Список
Период
Сортировка
От Ajin Cherian
Тема Re: [HACKERS] logical decoding of two-phase transactions
Дата
Msg-id CAFPTHDYbW3vPtz684HXSxvG9pmMky705JXGCXfft-DEkxhFkEw@mail.gmail.com
обсуждение исходный текст
Ответ на Re: [HACKERS] logical decoding of two-phase transactions  (Peter Smith <smithpb2250@gmail.com>)
Ответы Re: [HACKERS] logical decoding of two-phase transactions  (Amit Kapila <amit.kapila16@gmail.com>)
Список pgsql-hackers
On Wed, Oct 7, 2020 at 9:36 AM Peter Smith <smithpb2250@gmail.com> wrote:

> ==========
> Patch V6-0001, File: doc/src/sgml/logicaldecoding.sgml
> ==========
>
> COMMENT/QUESTION
> Section 48.6.1
> @ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks
>  LogicalDecodeTruncateCB truncate_cb;
>  LogicalDecodeCommitCB commit_cb;
>  LogicalDecodeMessageCB message_cb;
> + LogicalDecodeFilterPrepareCB filter_prepare_cb;
>
> Confused by the mixing of terminologies "abort" and "rollback".
> Why is it LogicalDecodeAbortPreparedCB instead of
> LogicalDecodeRollbackPreparedCB?
> Why is it abort_prepared_cb instead of rollback_prepared_cb;?
>
> I thought everything the user sees should be ROLLBACK/rollback (like
> the SQL) regardless of what the internal functions might be called.
>
> ;

Modified.

>
> COMMENT
> Section 48.6.1
> The begin_cb, change_cb and commit_cb callbacks are required, while
> startup_cb, filter_by_origin_cb, truncate_cb, and shutdown_cb are
> optional. If truncate_cb is not set but a TRUNCATE is to be decoded,
> the action will be ignored.
>
> The 1st paragraph beneath the typedef does not mention the newly added
> callbacks to say if they are required or optional.
>
Added a new para for this.
> ;
>
> COMMENT
> Section 48.6.4.5
> Section 48.6.4.6
> Section 48.6.4.7
> @@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct
> LogicalDecodingContext *ctx,
>  </para>
>  </sect3>
>
> + <sect3 id="logicaldecoding-output-plugin-prepare">
> +    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
> +    <sect3 id="logicaldecoding-output-plugin-abort-prepared">
> +<programlisting>
>
> The wording and titles are a bit backwards compared to the others.
> e.g. previously was "Transaction Begin" (not "Begin Transaction") and
> "Transaction End" (not "End Transaction").
>
> So for consistently following the existing IMO should change these new
> titles (and wording) to:
> - "Commit Prepared Transaction Callback" --> "Transaction Commit
> Prepared Callback"
> - "Rollback Prepared Transaction Callback" --> "Transaction Rollback
> Prepared Callback"
> - "whenever a commit prepared transaction has been decoded" -->
> "whenever a transaction commit prepared has been decoded"
> - "whenever a rollback prepared transaction has been decoded." -->
> "whenever a transaction rollback prepared has been decoded."
>
> ;
Updated to this

>
> ==========
> Patch V6-0001, File: src/backend/replication/logical/decode.c
> ==========
>
> COMMENT
> Line 74
> @@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext
> *ctx, XLogRecordBuffer *buf,
>  xl_xact_parsed_commit *parsed, TransactionId xid);
>  static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
>  xl_xact_parsed_abort *parsed, TransactionId xid);
> +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> + xl_xact_parsed_prepare * parsed);
>
> The 2nd line of DecodePrepare is misaligned by one space.
>
> ;
>
> COMMENT
> Line 321
> @@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx,
> XLogRecordBuffer *buf)
>  }
>  break;
>  case XLOG_XACT_PREPARE:
> + {
> + xl_xact_parsed_prepare parsed;
> + xl_xact_prepare *xlrec;
> + /* check that output plugin is capable of twophase decoding */
>
> "twophase" --> "two-phase"
>
> ~
>
> Also, add a blank line after the declarations.
>
> ;
>
> ==========
> Patch V6-0001, File: src/backend/replication/logical/logical.c
> ==========
>
> COMMENT
> Line 249
> @@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options,
>  (ctx->callbacks.stream_message_cb != NULL) ||
>  (ctx->callbacks.stream_truncate_cb != NULL);
>
> + /*
> + * To support two phase logical decoding, we require
> prepare/commit-prepare/abort-prepare
> + * callbacks. The filter-prepare callback is optional. We however
> enable two phase logical
> + * decoding when at least one of the methods is enabled so that we
> can easily identify
> + * missing methods.
>
> The terminology is generally well known as "two-phase" (with the
> hyphen) https://en.wikipedia.org/wiki/Two-phase_commit_protocol so
> let's be consistent for all the patch code comments. Please search the
> code and correct this in all places, even where I might have missed to
> identify it.
>
> "two phase" --> "two-phase"
>
> ;
>
> COMMENT
> Line 822
> @@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
>  }
>
>  static void
> +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> + XLogRecPtr prepare_lsn)
>
> "support 2 phase" --> "supports two-phase" in the comment
>
> ;
>
> COMMENT
> Line 844
> Code condition seems strange and/or broken.
> if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL)
> Because if the flag is null then this condition is skipped.
> But then if the callback was also NULL then attempting to call it to
> "do the actual work" will give NPE.
>
> ~
>
> Also, I wonder should this check be the first thing in this function?
> Because if it fails does it even make sense that all the errcallback
> code was set up?
> E.g errcallback.arg potentially is left pointing to a stack variable
> on a stack that no longer exists.
>

Updated accordingly.

> ;
>
> COMMENT
> Line 857
> +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> "support 2 phase" --> "supports two-phase" in the comment
>
> ~
>
> Also, Same potential trouble with the condition:
> if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL)
> Same as previously asked. Should this check be first thing in this function?
>
> ;
>
> COMMENT
> Line 892
> +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> "support 2 phase" --> "supports two-phase" in the comment
>
> ~
>
> Same potential trouble with the condition:
> if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL)
> Same as previously asked. Should this check be the first thing in this function?
>
> ;
>
> COMMENT
> Line 1013
> @@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
>  error_context_stack = errcallback.previous;
>  }
>
> +static bool
> +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> + TransactionId xid, const char *gid)
>
> Fix wording in comment:
> "twophase" --> "two-phase transactions"
> "twophase transactions" --> "two-phase transactions"
>

Updated accordingly.

> ==========
> Patch V6-0001, File: src/backend/replication/logical/reorderbuffer.c
> ==========
>
> COMMENT
> Line 255
> @@ -251,7 +251,8 @@ static Size
> ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
>  static void ReorderBufferRestoreChange(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>  char *change);
>  static void ReorderBufferRestoreCleanup(ReorderBuffer *rb,
> ReorderBufferTXN *txn);
> -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
> +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> + bool txn_prepared);
>
> The alignment is inconsistent. One more space needed before "bool txn_prepared"
>
> ;
>
> COMMENT
> Line 417
> @@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn)
>  }
>
>  /* free data that's contained */
> + if (txn->gid != NULL)
> + {
> + pfree(txn->gid);
> + txn->gid = NULL;
> + }
>
> Should add the blank link before this new code, as it was before.
>
> ;
>
> COMMENT
> Line 1564
> @ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn)
>  }
>
>  /*
> - * Discard changes from a transaction (and subtransactions), after streaming
> - * them. Keep the remaining info - transactions, tuplecids, invalidations and
> - * snapshots.
> + * Discard changes from a transaction (and subtransactions), either
> after streaming or
> + * after a PREPARE.
>
> typo "snapshots.If" -> "snapshots. If"
>
> ;

Updated Accordingly.

>
> COMMENT/QUESTION
> Line 1590
> @@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn)
>  Assert(rbtxn_is_known_subxact(subtxn));
>  Assert(subtxn->nsubtxns == 0);
>
> - ReorderBufferTruncateTXN(rb, subtxn);
> + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
>  }
>
> There are some code paths here I did not understand how they match the comments.
> Because this function is recursive it seems that it may be called
> where the 2nd parameter txn is a sub-transaction.
>
> But then this seems at odds with some of the other code comments of
> this function which are processing the txn without ever testing is it
> really toplevel or not:
>
> e.g. Line 1593 "/* cleanup changes in the toplevel txn */"
> e.g. Line 1632 "They are always stored in the toplevel transaction."
>
> ;

I see that another commit in between has updated this now.
>
> COMMENT
> Line 1644
> @@ -1560,9 +1621,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn)
>  * about the toplevel xact (we send the XID in all messages), but we never
>  * stream XIDs of empty subxacts.
>  */
> - if ((!txn->toptxn) || (txn->nentries_mem != 0))
> + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
>  txn->txn_flags |= RBTXN_IS_STREAMED;
>
> + if (txn_prepared)
>
> /* remove the change from it's containing list */
> typo "it's" --> "its"

Updated.
>
> ;
>
> QUESTION
> Line 1977
> @@ -1880,7 +1965,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>  ReorderBufferChange *specinsert)
>  {
>  /* Discard the changes that we just streamed */
> - ReorderBufferTruncateTXN(rb, txn);
> + ReorderBufferTruncateTXN(rb, txn, false);
>
> How do you know the 3rd parameter - i.e. txn_prepared - should be
> hardwired false here?
> e.g. I thought that maybe rbtxn_prepared(txn) can be true here.
>
> ;

This particular function is only called when streaming and not when
handling a prepared transaction.
>
> COMMENT
> Line 2345
> @@ -2249,7 +2334,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>  break;
>  }
>  }
> -
>  /*
>
> Looks like accidental blank line deletion. This should be put back how it was
>
> ;
>
> COMMENT/QUESTION
> Line 2374
> @@ -2278,7 +2362,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>  }
>  }
>  else
> - rb->commit(rb, txn, commit_lsn);
> + {
> + /*
> + * Call either PREPARE (for twophase transactions) or COMMIT
> + * (for regular ones).
>
> "twophase" --> "two-phase"
>
> ~
Updated.

>
> Also, I was confused by the apparent assumption of exclusiveness of
> streaming and 2PC...
> e.g. what if streaming AND 2PC then it won't do rb->prepare()
>
> ;
>
> QUESTION
> Line 2424
> @@ -2319,11 +2412,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>  */
>  if (streaming)
>  {
> - ReorderBufferTruncateTXN(rb, txn);
> + ReorderBufferTruncateTXN(rb, txn, false);
>
>  /* Reset the CheckXidAlive */
>  CheckXidAlive = InvalidTransactionId;
>  }
> + else if (rbtxn_prepared(txn))
>
> I was confused by the exclusiveness of streaming/2PC.
> e.g. what if streaming AND 2PC at same time - how can you pass false
> as 3rd param to ReorderBufferTruncateTXN?

ReorderBufferProcessTXN can only be called when streaming individual
commands and not for streaming a prepare or a commit, Streaming of
prepare and commit would be handled as part of
ReorderBufferStreamCommit.

>
> ;
>
> COMMENT
> Line 2463
> @@ -2352,17 +2451,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>
>  /*
>  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
> - * abort of the (sub)transaction we are streaming. We need to do the
> + * abort of the (sub)transaction we are streaming or preparing. We
> need to do the
>  * cleanup and return gracefully on this error, see SetupCheckXidLive.
>  */
>
> "twoi phase" --> "two-phase"
>
> ;
>
> QUESTIONS
> Line 2482
> @@ -2370,10 +2470,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
>  errdata = NULL;
>  curtxn->concurrent_abort = true;
>
> - /* Reset the TXN so that it is allowed to stream remaining data. */
> - ReorderBufferResetTXN(rb, txn, snapshot_now,
> - command_id, prev_lsn,
> - specinsert);
> + /* If streaming, reset the TXN so that it is allowed to stream
> remaining data. */
> + if (streaming)
>
> Re: /* If streaming, reset the TXN so that it is allowed to stream
> remaining data. */
> I was confused by the exclusiveness of streaming/2PC.
> Is it not possible for streaming flags and rbtxn_prepared(txn) true at
> the same time?

Same as above.

>
> ~
>
> elog(LOG, "stopping decoding of %s (%u)",
>  txn->gid[0] != '\0'? txn->gid:"", txn->xid);
>
> Is this a safe operation, or do you also need to test txn->gid is not NULL?

Since this is in code where it is not streaming and therefore
rbtxn_prepared(txn), so gid has to be NOT NULL.

>
> ;
>
> COMMENT
> Line 2606
> +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
>
> "twophase" --> "two-phase"
>
> ;
>
> QUESTION
> Line 2655
> +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
>
> "This is used to handle COMMIT/ABORT PREPARED"
> Should that say "COMMIT/ROLLBACK PREPARED"?
>
> ;
>
> COMMENT
> Line 2668
>
> "Anyways, 2PC transactions" --> "Anyway, two-phase transactions"
>
> ;
>
> COMMENT
> Line 2765
> @@ -2495,7 +2731,13 @@ ReorderBufferAbort(ReorderBuffer *rb,
> TransactionId xid, XLogRecPtr lsn)
>  /* cosmetic... */
>  txn->final_lsn = lsn;
>
> - /* remove potential on-disk data, and deallocate */
> + /*
> + * remove potential on-disk data, and deallocate.
> + *
>
> Remove the blank between the comment and code.
>
> ==========
> Patch V6-0001, File: src/include/replication/logical.h
> ==========
>
> COMMENT
> Line 89
>
> "two phase" -> "two-phase"
>
> ;
>
> COMMENT
> Line 89
>
> For consistency with the previous member naming really the new member
> should just be called "twophase" rather than "enable_twophase"
>
> ;

Updated accordingly.

>
> ==========
> Patch V6-0001, File: src/include/replication/output_plugin.h
> ==========
>
> QUESTION
> Line 106
>
> As previously asked, why is the callback function/typedef referred as
> AbortPrepared instead of RollbackPrepared?
> It does not match the SQL and the function comment, and seems only to
> add some unnecessary confusion.
>
> ;
>
> ==========
> Patch V6-0001, File: src/include/replication/reorderbuffer.h
> ==========
>
> QUESTION
> Line 116
> @@ -162,9 +163,13 @@ typedef struct ReorderBufferChange
>  #define RBTXN_HAS_CATALOG_CHANGES 0x0001
>  #define RBTXN_IS_SUBXACT 0x0002
>  #define RBTXN_IS_SERIALIZED 0x0004
> -#define RBTXN_IS_STREAMED 0x0008
> -#define RBTXN_HAS_TOAST_INSERT 0x0010
> -#define RBTXN_HAS_SPEC_INSERT 0x0020
> +#define RBTXN_PREPARE 0x0008
> +#define RBTXN_COMMIT_PREPARED 0x0010
> +#define RBTXN_ROLLBACK_PREPARED 0x0020
> +#define RBTXN_COMMIT 0x0040
> +#define RBTXN_IS_STREAMED 0x0080
> +#define RBTXN_HAS_TOAST_INSERT 0x0100
> +#define RBTXN_HAS_SPEC_INSERT 0x0200
>
> I was wondering why when adding new flags, some of the existing flag
> masks were also altered.
> I am assuming this is ok because they are never persisted but are only
> used in the protocol (??)
>
> ;
>
> COMMENT
> Line 226
> @@ -218,6 +223,15 @@ typedef struct ReorderBufferChange
>  ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
>  )
>
> +/* is this txn prepared? */
> +#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE)
> +/* was this prepared txn committed in the meanwhile? */
> +#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED)
> +/* was this prepared txn aborted in the meanwhile? */
> +#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
> +/* was this txn committed in the meanwhile? */
> +#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT)
> +
>
> Probably all the "txn->txn_flags" here might be more safely written
> with parentheses in the macro like "(txn)->txn_flags".
>
> ~
>
> Also, Start all comments with capital. And what is the meaning "in the
> meanwhile?"
>
> ;
>
> COMMENT
> Line 410
> @@ -390,6 +407,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
>  ReorderBufferTXN *txn,
>  XLogRecPtr commit_lsn);
>
> The format is inconsistent with all other callback signatures here,
> where the 1st arg was on the same line as the typedef.
>
> ;
>
> COMMENT
> Line 440-442
>
> Excessive blank lines following this change?
>
> ;
>
> COMMENT
> Line 638
> @@ -571,6 +631,15 @@ void
> ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid,
> XLog
>  bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
>  bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
>
> +bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
> + const char *gid);
> +bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
> + const char *gid);
> +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
> + XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
> + TimestampTz commit_time,
> + RepOriginId origin_id, XLogRecPtr origin_lsn,
> + char *gid);
>
> Not aligned consistently with other function prototypes.
>
> ;
Updated
>
> ==========
> Patch V6-0003, File: src/backend/access/transam/twophase.c
> ==========
>
> COMMENT
> Line 551
> @@ -548,6 +548,37 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
>  }
>
>  /*
> + * LookupGXact
> + * Check if the prepared transaction with the given GID is around
> + */
> +bool
> +LookupGXact(const char *gid)
>
> There is potential to refactor/simplify this code:
> e.g.
>
> bool
> LookupGXact(const char *gid)
> {
>  int i;
>  bool found = false;
>
>  LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
>  for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
>  {
>   GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
>   /* Ignore not-yet-valid GIDs */
>   if (gxact->valid && strcmp(gxact->gid, gid) == 0)
>   {
>    found = true;
>    break;
>   }
>  }
>  LWLockRelease(TwoPhaseStateLock);
>  return found;
> }
>
> ;
>
Updated accordingly.

> ==========
> Patch V6-0003, File: src/backend/replication/logical/proto.c
> ==========
>
> COMMENT
> Line 86
> @@ -72,12 +72,17 @@ logicalrep_read_begin(StringInfo in,
> LogicalRepBeginData *begin_data)
>  */
>  void
>  logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
> - XLogRecPtr commit_lsn)
>
> Since now the flags are used the code comment is wrong.
> "/* send the flags field (unused for now) */"
>
> ;
>
> COMMENT
> Line 129
> @ -106,6 +115,77 @@ logicalrep_read_commit(StringInfo in,
> LogicalRepCommitData *commit_data)
>  }
>
>  /*
> + * Write PREPARE to the output stream.
> + */
> +void
> +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
>
> "2PC transactions" --> "two-phase commit transactions"
>
> ;

Updated
>
> COMMENT
> Line 133
>
> Assert(strlen(txn->gid) > 0);
> Shouldn't that assertion also check txn->gid is not NULL (to prevent
> NPE in case gid was NULL)

In this case txn->gid has to be non NULL.

>
> ;
>
> COMMENT
> Line 177
> +logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data)
>
> prepare_data->prepare_type = flags;
> This code may be OK but it does seem a bit of an abuse of the flags.
>
> e.g. Are they flags or are the really enum values?
> e.g. And if they are effectively enums (it appears they are) then
> seemed inconsistent that |= was used when they were previously
> assigned.
>
> ;

I have not updated this as according to Amit this might require
refactoring again.
>
> ==========
> Patch V6-0003, File: src/backend/replication/logical/worker.c
> ==========
>
> COMMENT
> Line 757
> @@ -749,6 +753,141 @@ apply_handle_commit(StringInfo s)
>  pgstat_report_activity(STATE_IDLE, NULL);
>  }
>
> +static void
> +apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data)
> +{
> + Assert(prepare_data->prepare_lsn == remote_final_lsn);
>
> Missing function comment to say this is called from apply_handle_prepare.
>
> ;
>
> COMMENT
> Line 798
> +apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data)
>
> Missing function comment to say this is called from apply_handle_prepare.
>
> ;
>
> COMMENT
> Line 824
> +apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data)
>
> Missing function comment to say this is called from apply_handle_prepare.
>

Updated.

> ==========
> Patch V6-0003, File: src/backend/replication/pgoutput/pgoutput.c
> ==========
>
> COMMENT
> Line 50
> @@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
>  ReorderBufferChange *change);
>  static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
>  RepOriginId origin_id);
> +static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
>
> The parameter indentation (2nd lines) does not match everything else
> in this context.
>
> ;
>
> COMMENT
> Line 152
> @@ -143,6 +149,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>  cb->change_cb = pgoutput_change;
>  cb->truncate_cb = pgoutput_truncate;
>  cb->commit_cb = pgoutput_commit_txn;
> +
> + cb->prepare_cb = pgoutput_prepare_txn;
> + cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
> + cb->abort_prepared_cb = pgoutput_abort_prepared_txn;
>
> Remove the unnecessary blank line.
>
> ;
>
> QUESTION
> Line 386
> @@ -373,7 +383,49 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>  OutputPluginUpdateProgress(ctx);
>
>  OutputPluginPrepareWrite(ctx, true);
> - logicalrep_write_commit(ctx->out, txn, commit_lsn);
> + logicalrep_write_commit(ctx->out, txn, commit_lsn, true);
>
> Is the is_commit parameter of logicalrep_write_commit ever passed as false?
> If yes, where?
> If no, the what is the point of it?

It was dead code from an earlier version. I have removed it, updated
accordingly.

>
> ;
>
> COMMENT
> Line 408
> +pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>
> Since all this function is identical to pg_output_prepare it might be
> better to either
> 1. just leave this as a wrapper to delegate to that function
> 2. remove this one entirely and assign the callback to the common
> pgoutput_prepare_txn
>
> ;

I have not changed this as this might require re-factoring according to Amit.

>
> COMMENT
> Line 419
> +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>
> Since all this function is identical to pg_output_prepare if might be
> better to either
> 1. just leave this as a wrapper to delegate to that function
> 2. remove this one entirely and assign the callback to the common
> pgoutput_prepare_tx
>
> ;

Same as above.

>
> COMMENT
> Line 419
> +pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>
> Shouldn't this comment say be "ROLLBACK PREPARED"?
>
> ;

Updated.
>
> ==========
> Patch V6-0003, File: src/include/replication/logicalproto.h
> ==========
>
> QUESTION
> Line 101
> @@ -87,20 +87,55 @@ typedef struct LogicalRepBeginData
>  TransactionId xid;
>  } LogicalRepBeginData;
>
> +/* Commit (and abort) information */
>
> #define LOGICALREP_IS_ABORT 0x02
> Is there a good reason why this is not called:
> #define LOGICALREP_IS_ROLLBACK 0x02
>
> ;

Removed.

>
> COMMENT
> Line 105
>
> ((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT))
>
> Macros would be safer if flags are in parentheses
> (((flags) == LOGICALREP_IS_COMMIT) || ((flags) == LOGICALREP_IS_ABORT))
>
> ;
>
> COMMENT
> Line 115
>
> Unexpected whitespace for the typedef
> "} LogicalRepPrepareData;"
>
> ;
>
> COMMENT
> Line 122
> /* prepare can be exactly one of PREPARE, [COMMIT|ABORT] PREPARED*/
> #define PrepareFlagsAreValid(flags) \
>  ((flags == LOGICALREP_IS_PREPARE) || \
>  (flags == LOGICALREP_IS_COMMIT_PREPARED) || \
>  (flags == LOGICALREP_IS_ROLLBACK_PREPARED))
>
> There is confusing mixture in macros and comments of ABORT and ROLLBACK terms
> "[COMMIT|ABORT] PREPARED" --> "[COMMIT|ROLLBACK] PREPARED"
>
> ~
>
> Also, it would be safer if flags are in parentheses
>  (((flags) == LOGICALREP_IS_PREPARE) || \
>  ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \
>  ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED))
>
> ;

updated.

>
> ==========
> Patch V6-0003, File: src/test/subscription/t/020_twophase.pl
> ==========
>
> COMMENT
> Line 131 - # check inserts are visible
>
> Isn't this supposed to be checking for rows 12 and 13, instead of 11 and 12?
>
> ;

Updated.
>
> ==========
> Patch V6-0004, File: contrib/test_decoding/test_decoding.c
> ==========
>
> COMMENT
> Line 81
> @@ -78,6 +78,15 @@ static void
> pg_decode_stream_stop(LogicalDecodingContext *ctx,
>  static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
>  ReorderBufferTXN *txn,
>  XLogRecPtr abort_lsn);
> +static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
> +static
>
> All these functions have a 3rd parameter called commit_lsn. Even
> though the functions are not commit related. It seems like a cut/paste
> error.
>
> ;
>
> COMMENT
> Line 142
> @@ -130,6 +139,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>  cb->stream_start_cb = pg_decode_stream_start;
>  cb->stream_stop_cb = pg_decode_stream_stop;
>  cb->stream_abort_cb = pg_decode_stream_abort;
> + cb->stream_prepare_cb = pg_decode_stream_prepare;
> + cb->stream_commit_prepared_cb = pg_decode_stream_commit_prepared;
> + cb->stream_abort_prepared_cb = pg_decode_stream_abort_prepared;
>  cb->stream_commit_cb = pg_decode_stream_commit;
>
> Can the "cb->stream_abort_prepared_cb" be changed to
> "cb->stream_rollback_prepared_cb"?
>
> ;
>
> COMMENT
> Line 827
> @@ -812,6 +824,78 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
>  }
>
>  static void
> +pg_decode_stream_prepare(LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn)
> +{
> + TestDecodingData *data = ctx->output_plugin_pr
>
> The commit_lsn (3rd parameter) is unused and seems like a cut/paste name error.
>
> ;
>
> COMMENT
> Line 875
> +pg_decode_stream_abort_prepared(LogicalDecodingContext *ctx,
>
> The commit_lsn (3rd parameter) is unused and seems like a cut/paste name error.
>
> ;
>

Updated.

> ==========
> Patch V6-0004, File: doc/src/sgml/logicaldecoding.sgml
> ==========
>
> COMMENT
> 48.6.1
> @@ -396,6 +396,9 @@ typedef struct OutputPluginCallbacks
>  LogicalDecodeStreamStartCB stream_start_cb;
>  LogicalDecodeStreamStopCB stream_stop_cb;
>  LogicalDecodeStreamAbortCB stream_abort_cb;
> + LogicalDecodeStreamPrepareCB stream_prepare_cb;
> + LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb;
> + LogicalDecodeStreamAbortPreparedCB stream_abort_prepared_cb;
>
> Same question from previous review comments - why using the
> terminology "abort" instead of "rollback"
>
> ;
>
> COMMENT
> 48.6.1
> @@ -418,7 +421,9 @@ typedef void (*LogicalOutputPluginInit) (struct
> OutputPluginCallbacks *cb);
>  in-progress transactions. The <function>stream_start_cb</function>,
>  <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
>  <function>stream_commit_cb</function> and <function>stream_change_cb</function>
> - are required, while <function>stream_message_cb</function> and
> + are required, while <function>stream_message_cb</function>,
> + <function>stream_prepare_cb</function>,
> <function>stream_commit_prepared_cb</function>,
> + <function>stream_abort_prepared_cb</function>,
>
> Missing "and".
> ... "stream_abort_prepared_cb, stream_truncate_cb are optional." -->
> "stream_abort_prepared_cb, and stream_truncate_cb are optional."
>
> ;
>
> COMMENT
> Section 48.6.4.16
> Section 48.6.4.17
> Section 48.6.4.18
> @@ -839,6 +844,45 @@ typedef void (*LogicalDecodeStreamAbortCB)
> (struct LogicalDecodingContext *ctx,
>  </para>
>  </sect3>
>
> + <sect3 id="logicaldecoding-output-plugin-stream-prepare">
> + <title>Stream Prepare Callback</title>
> + <para>
> + The <function>stream_prepare_cb</function> callback is called to prepare
> + a previously streamed transaction as part of a two phase commit.
> +<programlisting>
> +typedef void (*LogicalDecodeStreamPrepareCB) (struct
> LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + XLogRecPtr abort_lsn);
> +</programlisting>
> + </para>
> + </sect3>
> +
> + <sect3 id="logicaldecoding-output-plugin-stream-commit-prepared">
> + <title>Stream Commit Prepared Callback</title>
> + <para>
> + The <function>stream_commit_prepared_cb</function> callback is
> called to commit prepared
> + a previously streamed transaction as part of a two phase commit.
> +<programlisting>
> +typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct
> LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + XLogRecPtr abort_lsn);
> +</programlisting>
> + </para>
> + </sect3>
> +
> + <sect3 id="logicaldecoding-output-plugin-stream-abort-prepared">
> + <title>Stream Abort Prepared Callback</title>
> + <para>
> + The <function>stream_abort_prepared_cb</function> callback is called
> to abort prepared
> + a previously streamed transaction as part of a two phase commit.
> +<programlisting>
> +typedef void (*LogicalDecodeStreamAbortPreparedCB) (struct
> LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn,
> + XLogRecPtr abort_lsn);
> +</programlisting>
> + </para>
> + </sect3>
>
> 1. Everywhere it says "two phase" commit should be consistently
> replaced to say "two-phase" commit (with the hyphen)
>
> 2. Search for "abort_lsn" parameter. It seems to be overused
> (cut/paste error) even when the API is unrelated to abort
>
> 3. 48.6.4.17 and 48.6.4.18
> Is this wording ok? Is the word "prepared" even necessary here?
> - "... called to commit prepared a previously streamed transaction ..."
> - "... called to abort prepared a previously streamed transaction ..."
>
> ;

Updated accordingly.

>
> COMMENT
> Section 48.9
> @@ -1017,9 +1061,13 @@ OutputPluginWrite(ctx, true);
>  When streaming an in-progress transaction, the changes (and messages) are
>  streamed in blocks demarcated by <function>stream_start_cb</function>
>  and <function>stream_stop_cb</function> callbacks. Once all the decoded
> - changes are transmitted, the transaction is committed using the
> - <function>stream_commit_cb</function> callback (or possibly aborted using
> - the <function>stream_abort_cb</function> callback).
> + changes are transmitted, the transaction can be committed using the
> + the <function>stream_commit_cb</function> callback
>
> "two phase" --> "two-phase"
>
> ~
>
> Also, Missing period on end of sentence.
> "or aborted using the stream_abort_prepared_cb" --> "or aborted using
> the stream_abort_prepared_cb."
>
> ;

Updated accordingly.

>
> ==========
> Patch V6-0004, File: src/backend/replication/logical/logical.c
> ==========
>
> COMMENT
> Line 84
> @@ -81,6 +81,12 @@ static void stream_stop_cb_wrapper(ReorderBuffer
> *cache, ReorderBufferTXN *txn,
>  XLogRecPtr last_lsn);
>  static void stream_abort_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
>  XLogRecPtr abort_lsn);
> +static void stream_prepare_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
> +static void stream_commit_prepared_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
> +static void stream_abort_prepared_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
>
> The 3rd parameter is always "commit_lsn" even for API unrelated to
> commit, so seems like cut/paste error.
>
> ;
>
> COMMENT
> Line 1246
> @@ -1231,6 +1243,105 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
>  }
>
>  static void
> +stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn)
> +{
> + LogicalDecodingContext *ctx = cache->private_data;
> + LogicalErrorCallbackState state;
>
> Misnamed parameter "commit_lsn" ?
>
> ~
>
> Also, Line 1272
> There seem to be some missing integrity checking to make sure the
> callback is not NULL.
> A null callback will give NPE when wrapper attempts to call it
>
> ;
>
> COMMENT
> Line 1305
> +static void
> +stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> There seem to be some missing integrity checking to make sure the
> callback is not NULL.
> A null callback will give NPE when wrapper attempts to call it.
>
> ;
>
> COMMENT
> Line 1312
> +static void
> +stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> Misnamed parameter "commit_lsn" ?
>
> ~
>
> Also, Line 1338
> There seem to be some missing integrity checking to make sure the
> callback is not NULL.
> A null callback will give NPE when wrapper attempts to call it.
>

Updated accordingly.

>
> ==========
> Patch V6-0004, File: src/backend/replication/logical/reorderbuffer.c
> ==========
>
> COMMENT
> Line 2684
> @@ -2672,15 +2681,31 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb,
> TransactionId xid,
>  txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
>  strcpy(txn->gid, gid);
>
> - if (is_commit)
> + if (rbtxn_is_streamed(txn))
>  {
> - txn->txn_flags |= RBTXN_COMMIT_PREPARED;
> - rb->commit_prepared(rb, txn, commit_lsn);
> + if (is_commit)
> + {
> + txn->txn_flags |= RBTXN_COMMIT_PREPARED;
>
> The setting/checking of the flags could be refactored if you wanted to
> write less code:
> e.g.
> if (is_commit)
>  txn->txn_flags |= RBTXN_COMMIT_PREPARED;
> else
>  txn->txn_flags |= RBTXN_ROLLBACK_PREPARED;
>
> if (rbtxn_is_streamed(txn) && rbtxn_commit_prepared(txn))
>  rb->stream_commit_prepared(rb, txn, commit_lsn);
> else if (rbtxn_is_streamed(txn) && rbtxn_rollback_prepared(txn))
>  rb->stream_abort_prepared(rb, txn, commit_lsn);
> else if (rbtxn_commit_prepared(txn))
>  rb->commit_prepared(rb, txn, commit_lsn);
> else if (rbtxn_rollback_prepared(txn))
>  rb->abort_prepared(rb, txn, commit_lsn);
>
> ;

Updated accordingly.

>
> ==========
> Patch V6-0004, File: src/include/replication/output_plugin.h
> ==========
>
> COMMENT
> Line 171
> @@ -157,6 +157,33 @@ typedef void (*LogicalDecodeStreamAbortCB)
> (struct LogicalDecodingContext *ctx,
>  XLogRecPtr abort_lsn);
>
>  /*
> + * Called to prepare changes streamed to remote node from in-progress
> + * transaction. This is called as part of a two-phase commit and only when
> + * two-phased commits are supported
> + */
>
> 1. Missing period all these comments.
>
> 2. Is the part that says "and only where two-phased commits are
> supported" necessary to say? Is seems redundant since comments already
> says called as part of a two-phase commit.
>
> ;
>
> ==========
> Patch V6-0004, File: src/include/replication/reorderbuffer.h
> ==========
>
> COMMENT
> Line 467
> @@ -466,6 +466,24 @@ typedef void (*ReorderBufferStreamAbortCB) (
>  ReorderBufferTXN *txn,
>  XLogRecPtr abort_lsn);
>
> +/* prepare streamed transaction callback signature */
> +typedef void (*ReorderBufferStreamPrepareCB) (
> + ReorderBuffer *rb,
> + ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
> +
> +/* prepare streamed transaction callback signature */
> +typedef void (*ReorderBufferStreamCommitPreparedCB) (
> + ReorderBuffer *rb,
> + ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
> +
> +/* prepare streamed transaction callback signature */
> +typedef void (*ReorderBufferStreamAbortPreparedCB) (
> + ReorderBuffer *rb,
> + ReorderBufferTXN *txn,
> + XLogRecPtr commit_lsn);
>
> Cut/paste error - repeated same comment 3 times?
>

Updated Accordingly.

>
> [END]
>
>

I believe I have addressed all of Peter's comments. Peter, do have a
look and let me know if I missed anything or if you find anythinge
else. Thanks for your comments, much appreciated.

regards,
Ajin Cherian
Fujitsu Australia

Вложения

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Julien Rouhaud
Дата:
Сообщение: Re: Feature improvement: can we add queryId for pg_catalog.pg_stat_activity view?
Следующее
От: Amit Kapila
Дата:
Сообщение: Re: Parallel Inserts in CREATE TABLE AS