Re: [HACKERS] logical decoding of two-phase transactions

From Peter Smith
Subject Re: [HACKERS] logical decoding of two-phase transactions
Msg-id CAHut+Ptfp2+y9wEusTNCvfAbLC5c95oQestCdWEM3kf=+YDMWw@mail.gmail.com
In reply to Re: [HACKERS] logical decoding of two-phase transactions  (Peter Smith <smithpb2250@gmail.com>)
Replies Re: [HACKERS] logical decoding of two-phase transactions  (Ajin Cherian <itsajin@gmail.com>)
List pgsql-hackers
Hello Ajin.

I have gone through the v6 patch changes and have a list of review
comments below.

Apologies for the length of this email - I know that many of the
following comments are trivial, but I figured I should either just
ignore everything cosmetic, or list everything regardless. I chose the
latter.

There may be some duplication where the same review comment is written
for multiple files and/or where the same file is in your multiple
patches.

Kind Regards.
Peter Smith
Fujitsu Australia

[BEGIN]

==========
Patch V6-0001, File: contrib/test_decoding/expected/prepared.out (so
prepared.sql also)
==========

COMMENT
Line 30 - The INSERT INTO test_prepared1 VALUES (2); is kind of
strange because it is not really part of the prior test nor the
following test. Maybe it would be better to have a comment describing
the purpose of this isolated INSERT and to also consume the data from
the slot so it does not get jumbled with the data of the following
(abort) test.

;

COMMENT
Line 53 - Same comment for this test INSERT INTO test_prepared1 VALUES
(4); It kind of has nothing really to do with either the prior (abort)
test nor the following (ddl) test.

;

COMMENT
Line 60 - Seems to check which locks are held for the test_prepared_1
table while the transaction is in progress. Maybe it would be better
to have more comments describing what is expected here and why.

;

COMMENT
Line 88 - There is a comment in the test saying "-- We should see '7'
before '5' in our results since it commits first." but I did not see
any test code that actually verifies that happens.

;

QUESTION
Line 120 - I did not really understand the SQL checking the pg_class.
I expected this would be checking table 'test_prepared1' instead. Can
you explain it?
SELECT 'pg_class' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'pg_class'::regclass;
relation | locktype | mode
----------+----------+------
(0 rows)

;

QUESTION
Line 139 - SET statement_timeout = '1s'; is 1 second short enough
here for this test, or might these statements complete in less than
one second anyhow?

;

QUESTION
Line 163 - How is this testing a SAVEPOINT? Or is it only to check
that the SAVEPOINT command is not part of the replicated changes?

;

COMMENT
Line 175 - Missing underscore in the comment. The code also requires the
underscore: "nodecode" --> "_nodecode"

==========
Patch V6-0001, File: contrib/test_decoding/test_decoding.c
==========

COMMENT
Line 43
@@ -36,6 +40,7 @@ typedef struct
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
+ TransactionId check_xid; /* track abort of this txid */
} TestDecodingData;

The "check_xid" seems a meaningless name. Check what?
IIUC maybe it should be something like "check_xid_aborted".

;

COMMENT
Line 105
@@ -88,6 +93,19 @@ static void
pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 ReorderBufferTXN *txn,
 int nrelations, Relation relations[],
 ReorderBufferChange *change);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,

Remove the extra blank line after these function declarations.

;

COMMENT
Line 149
@@ -116,6 +134,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 cb->stream_change_cb = pg_decode_stream_change;
 cb->stream_message_cb = pg_decode_stream_message;
 cb->stream_truncate_cb = pg_decode_stream_truncate;
+ cb->filter_prepare_cb = pg_decode_filter_prepare;
+ cb->prepare_cb = pg_decode_prepare_txn;
+ cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+ cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
+
 }

There is a confusing mix of terminology where sometimes things are
referred to as ROLLBACK/rollback and other times apparently the same
operation is referred to as ABORT/abort. I do not know the root cause of
this mixture. IIUC maybe the internal functions and protocol generally
use the term "abort", whereas the SQL syntax is "ROLLBACK"... but
where those two terms collide in the middle it gets quite confusing.

At least I thought the names of the "callbacks" which get exposed to
the user (e.g. in the help) might be better if they matched the
SQL.
"abort_prepared_cb" --> "rollback_prepared_cb"

There are similar review comments like this below where the
alternating terms caused me some confusion.

~

Also, remove the extra blank line before the end of the function.

;

COMMENT
Line 267
@@ -227,6 +252,42 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 strVal(elem->arg), elem->defname)));
 }
+ else if (strcmp(elem->defname, "two-phase-commit") == 0)
+ {
+ if (elem->arg == NULL)
+ continue;

IMO the "check-xid" code might be better rearranged so the NULL check
is first instead of if/else.
e.g.
if (elem->arg == NULL)
    ereport(FATAL,
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
        errmsg("check-xid needs an input value")));
~

Also, is it really supposed to be FATAL instead of ERROR? That is not
the same as the other surrounding code.

;

COMMENT
Line 296
if (data->check_xid <= 0)
 ereport(ERROR,
 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 errmsg("Specify positive value for parameter \"%s\","
 " you specified \"%s\"",
 elem->defname, strVal(elem->arg))));

The code checking for <= 0 seems over-complicated. Because the conversion
uses strtoul(), I fail to see how this can ever be < 0. Wouldn't
it be easier to simply test the result of the strtoul() function?

BEFORE: if (errno == EINVAL || errno == ERANGE)
AFTER: if (data->check_xid == 0)
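A minimal sketch of the simpler validation being suggested, written as standalone C rather than against the PostgreSQL sources (the helper name parse_xid_option is hypothetical, and TransactionId is a local stand-in). It also shows why testing only errno is not enough: strtoul() accepts a leading minus sign and returns the negated value in unsigned arithmetic, and where unsigned long is 64 bits wide, a value too large for a 32-bit xid does not set ERANGE.

```c
#include <ctype.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Stand-in for PostgreSQL's TransactionId, a 32-bit unsigned value. */
typedef uint32_t TransactionId;

/*
 * Hypothetical helper: parse a "check-xid" style option value.
 * Rejects empty input, a leading sign or space, trailing garbage,
 * out-of-range values, and 0 (which would be InvalidTransactionId).
 */
static bool
parse_xid_option(const char *value, TransactionId *xid)
{
    char       *endptr;
    unsigned long parsed;

    /* strtoul() silently accepts "-1" and wraps it, so insist on a digit. */
    if (value == NULL || !isdigit((unsigned char) value[0]))
        return false;

    errno = 0;
    parsed = strtoul(value, &endptr, 10);
    if (errno == ERANGE || *endptr != '\0')
        return false;

    /* unsigned long may be 64 bits wide, but an xid is only 32 bits. */
    if (parsed > UINT32_MAX || parsed == 0)
        return false;

    *xid = (TransactionId) parsed;
    return true;
}
```

With this shape, "1234" is accepted, while "0", "-1", "12abc" and "" are all rejected by a single boolean test.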

~

Also, should this be FATAL? Everything else similar is ERROR.

;

COMMENT
(general)
I don't recall seeing any of these decoding options (e.g.
"two-phase-commit", "check-xid") documented anywhere.
So how can a user even know these options exist so they can use them?
Perhaps options should be described on this page?
https://www.postgresql.org/docs/13/functions-admin.html#FUNCTIONS-REPLICATION

;

COMMENT
(general)
"check-xid" is a meaningless option name. Maybe something like
"checked-xid-aborted" is more useful?
Suggest changing the member, the option, and the error messages to
match some better name.

;

COMMENT
Line 314
@@ -238,6 +299,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 }

 ctx->streaming &= enable_streaming;
+ ctx->enable_twophase &= enable_2pc;
 }

The "ctx->enable_twophase" is inconsistent naming with the
"ctx->streaming" member.
"enable_twophase" --> "twophase"

;

COMMENT
Line 374
@@ -297,6 +359,94 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 OutputPluginWrite(ctx, true);
 }

+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here
+ * we demonstrate a simple logic by checking the GID. If the
+ * GID contains the "_nodecode" substring, then we filter
+ * it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Remove the extra preceding blank line.

~

I did not find anything in the help about "_nodecode". Should it be
there, or is this a deliberately undocumented feature?

;

QUESTION
Line 440
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Is this comment wrong?
"ABORT PREPARED" --> "ROLLBACK PREPARED" ??

;

COMMENT
Line 620
@@ -455,6 +605,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 data->xact_wrote_changes = true;

+ /* if check_xid is specified */
+ if (TransactionIdIsValid(data->check_xid))
+ {
+ elog(LOG, "waiting for %u to abort", data->check_xid);
+ while (TransactionIdIsInProgress(dat

The check_xid seems a meaningless name, and the comment "/* if
check_xid is specified */" was not helpful either.
IIUC the purpose of this is to check that the nominated xid is always
rolled back. So the appropriate name may be more like "check-xid-aborted".

;

==========
Patch V6-0001, File: doc/src/sgml/logicaldecoding.sgml
==========

COMMENT/QUESTION
Section 48.6.1
@@ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks
 LogicalDecodeTruncateCB truncate_cb;
 LogicalDecodeCommitCB commit_cb;
 LogicalDecodeMessageCB message_cb;
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;

Confused by the mixing of terminologies "abort" and "rollback".
Why is it LogicalDecodeAbortPreparedCB instead of
LogicalDecodeRollbackPreparedCB?
Why is it abort_prepared_cb instead of rollback_prepared_cb?

I thought everything the user sees should be ROLLBACK/rollback (like
the SQL) regardless of what the internal functions might be called.

;

COMMENT
Section 48.6.1
The begin_cb, change_cb and commit_cb callbacks are required, while
startup_cb, filter_by_origin_cb, truncate_cb, and shutdown_cb are
optional. If truncate_cb is not set but a TRUNCATE is to be decoded,
the action will be ignored.

The 1st paragraph beneath the typedef does not mention the newly added
callbacks to say if they are required or optional.

;

COMMENT
Section 48.6.4.5
Section 48.6.4.6
Section 48.6.4.7
@@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 </para>
 </sect3>

+ <sect3 id="logicaldecoding-output-plugin-prepare">
+    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+    <sect3 id="logicaldecoding-output-plugin-abort-prepared">
+<programlisting>

The wording and titles are a bit backwards compared to the others.
e.g. previously was "Transaction Begin" (not "Begin Transaction") and
"Transaction End" (not "End Transaction").

So, to be consistent with the existing titles, IMO these new titles
(and wording) should change to:
- "Commit Prepared Transaction Callback" --> "Transaction Commit
Prepared Callback"
- "Rollback Prepared Transaction Callback" --> "Transaction Rollback
Prepared Callback"
- "whenever a commit prepared transaction has been decoded" -->
"whenever a transaction commit prepared has been decoded"
- "whenever a rollback prepared transaction has been decoded." -->
"whenever a transaction rollback prepared has been decoded."

;

==========
Patch V6-0001, File: src/backend/replication/logical/decode.c
==========

COMMENT
Line 74
@@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare * parsed);

The 2nd line of DecodePrepare is misaligned by one space.

;

COMMENT
Line 321
@@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 }
 break;
 case XLOG_XACT_PREPARE:
+ {
+ xl_xact_parsed_prepare parsed;
+ xl_xact_prepare *xlrec;
+ /* check that output plugin is capable of twophase decoding */

"twophase" --> "two-phase"

~

Also, add a blank line after the declarations.

;

==========
Patch V6-0001, File: src/backend/replication/logical/logical.c
==========

COMMENT
Line 249
@@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options,
 (ctx->callbacks.stream_message_cb != NULL) ||
 (ctx->callbacks.stream_truncate_cb != NULL);

+ /*
+ * To support two phase logical decoding, we require prepare/commit-prepare/abort-prepare
+ * callbacks. The filter-prepare callback is optional. We however enable two phase logical
+ * decoding when at least one of the methods is enabled so that we can easily identify
+ * missing methods.

The terminology is generally well known as "two-phase" (with the
hyphen) https://en.wikipedia.org/wiki/Two-phase_commit_protocol so
let's be consistent in all the patch code comments. Please search the
code and correct this in all places, including any I may have missed.

"two phase" --> "two-phase"

;

COMMENT
Line 822
@@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }

 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)

"support 2 phase" --> "supports two-phase" in the comment

;

COMMENT
Line 844
The code condition seems strange and/or broken:
if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL)
Because if the flag is false, then this check is skipped.
But then if the callback is also NULL, attempting to call it to
"do the actual work" will dereference a NULL pointer.

~

Also, I wonder whether this check should be the first thing in this
function. If it fails, does it even make sense that all the errcallback
code was already set up?
E.g. errcallback.arg is potentially left pointing to a stack variable
on a stack that no longer exists.
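To illustrate the ordering question, here is a small standalone C model. It is not PostgreSQL code: ErrorContextCallback, error_context_stack and report_error() are simplified stand-ins, and longjmp() plays the role of ereport(ERROR). The NULL check runs first and does not depend on the two-phase flag, so a missing callback is reported before errcallback is pushed. In this model nothing resets error_context_stack after the jump (the real backend error recovery may well do so), which is why the ordering matters here.

```c
#include <setjmp.h>
#include <stddef.h>
#include <stdio.h>

/* Simplified, hypothetical stand-ins for PostgreSQL's error context stack. */
typedef struct ErrorContextCallback
{
    struct ErrorContextCallback *previous;
    void        (*callback) (void *arg);
    void       *arg;
} ErrorContextCallback;

static ErrorContextCallback *error_context_stack = NULL;
static jmp_buf error_jmp;       /* models the non-local exit of ereport(ERROR) */

typedef void (*PrepareCB) (const char *gid);

typedef struct DecodingContext
{
    PrepareCB   prepare_cb;
} DecodingContext;

static void
report_error(const char *msg)
{
    fprintf(stderr, "ERROR: %s\n", msg);
    longjmp(error_jmp, 1);
}

static void
prepare_error_callback(void *arg)
{
    (void) arg;                 /* would add "while decoding PREPARE" context */
}

static void
prepare_cb_wrapper(DecodingContext *ctx, const char *gid)
{
    ErrorContextCallback errcallback;

    /* Check first, unconditionally, before pushing anything on the stack. */
    if (ctx->prepare_cb == NULL)
        report_error("output plugin does not define prepare_cb");

    errcallback.callback = prepare_error_callback;
    errcallback.arg = ctx;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    ctx->prepare_cb(gid);       /* do the actual work */

    error_context_stack = errcallback.previous;
}
```

If the NULL check were moved below the push, the error path in this model would exit with error_context_stack still pointing at the dead errcallback frame.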

;

COMMENT
Line 857
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

"support 2 phase" --> "supports two-phase" in the comment

~

Also, the same potential trouble with the condition:
if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL)
Same question as before: should this check be the first thing in this function?

;

COMMENT
Line 892
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

"support 2 phase" --> "supports two-phase" in the comment

~

The same potential trouble with the condition:
if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL)
Same question as before: should this check be the first thing in this function?

;

COMMENT
Line 1013
@@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 error_context_stack = errcallback.previous;
 }

+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ TransactionId xid, const char *gid)

Fix wording in comment:
"twophase" --> "two-phase transactions"
"twophase transactions" --> "two-phase transactions"

==========
Patch V6-0001, File: src/backend/replication/logical/reorderbuffer.c
==========

COMMENT
Line 255
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ bool txn_prepared);

The alignment is inconsistent. One more space needed before "bool txn_prepared"

;

COMMENT
Line 417
@@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 }

 /* free data that's contained */
+ if (txn->gid != NULL)
+ {
+ pfree(txn->gid);
+ txn->gid = NULL;
+ }

Please keep the blank line before this new code, as it was before.

;

COMMENT
Line 1564
@@ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 }

 /*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them. Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after streaming or
+ * after a PREPARE.

typo "snapshots.If" -> "snapshots. If"

;

COMMENT/QUESTION
Line 1590
@@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 Assert(rbtxn_is_known_subxact(subtxn));
 Assert(subtxn->nsubtxns == 0);

- ReorderBufferTruncateTXN(rb, subtxn);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
 }

There are some code paths here where I did not understand how they
match the comments.
Because this function is recursive, it seems that it may be called
where the 2nd parameter txn is a sub-transaction.

But then this seems at odds with some of the other code comments of
this function, which process the txn without ever testing whether it
really is toplevel or not:

e.g. Line 1593 "/* cleanup changes in the toplevel txn */"
e.g. Line 1632 "They are always stored in the toplevel transaction."

;

COMMENT
Line 1644
@@ -1560,9 +1621,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 * about the toplevel xact (we send the XID in all messages), but we never
 * stream XIDs of empty subxacts.
 */
- if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
 txn->txn_flags |= RBTXN_IS_STREAMED;

+ if (txn_prepared)

/* remove the change from it's containing list */
typo "it's" --> "its"

;

QUESTION
Line 1977
@@ -1880,7 +1965,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 ReorderBufferChange *specinsert)
 {
 /* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn);
+ ReorderBufferTruncateTXN(rb, txn, false);

How do you know the 3rd parameter - i.e. txn_prepared - should be
hardwired false here?
e.g. I thought that maybe rbtxn_prepared(txn) can be true here.

;

COMMENT
Line 2345
@@ -2249,7 +2334,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 break;
 }
 }
-
 /*

Looks like an accidental blank line deletion. This should be put back how it was.

;

COMMENT/QUESTION
Line 2374
@@ -2278,7 +2362,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 }
 }
 else
- rb->commit(rb, txn, commit_lsn);
+ {
+ /*
+ * Call either PREPARE (for twophase transactions) or COMMIT
+ * (for regular ones).

"twophase" --> "two-phase"

~

Also, I was confused by the apparent assumption that streaming and 2PC
are mutually exclusive...
e.g. if a transaction is both streamed AND 2PC, then it won't do rb->prepare()

;

QUESTION
Line 2424
@@ -2319,11 +2412,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 */
 if (streaming)
 {
- ReorderBufferTruncateTXN(rb, txn);
+ ReorderBufferTruncateTXN(rb, txn, false);

 /* Reset the CheckXidAlive */
 CheckXidAlive = InvalidTransactionId;
 }
+ else if (rbtxn_prepared(txn))

I was confused by the exclusiveness of streaming/2PC.
e.g. what if streaming AND 2PC at the same time? Then how can you pass
false as the 3rd param to ReorderBufferTruncateTXN?

;

COMMENT
Line 2463
@@ -2352,17 +2451,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,

 /*
 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
- * abort of the (sub)transaction we are streaming. We need to do the
+ * abort of the (sub)transaction we are streaming or preparing. We need to do the
 * cleanup and return gracefully on this error, see SetupCheckXidLive.
 */

"twoi phase" --> "two-phase"

;

QUESTIONS
Line 2482
@@ -2370,10 +2470,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 errdata = NULL;
 curtxn->concurrent_abort = true;

- /* Reset the TXN so that it is allowed to stream remaining data. */
- ReorderBufferResetTXN(rb, txn, snapshot_now,
- command_id, prev_lsn,
- specinsert);
+ /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
+ if (streaming)

Re: /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
I was confused by the exclusiveness of streaming/2PC.
Is it not possible for the streaming flag and rbtxn_prepared(txn) to both
be true at the same time?

~

elog(LOG, "stopping decoding of %s (%u)",
 txn->gid[0] != '\0'? txn->gid:"", txn->xid);

Is this a safe operation, or do you also need to test that txn->gid is not NULL?
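Note that for a non-NULL gid, txn->gid[0] != '\0' ? txn->gid : "" always yields the same string as txn->gid, so the only case the expression really needs to guard against is a NULL pointer. A minimal standalone sketch of a NULL-safe alternative, using a hypothetical helper gid_for_log() and plain snprintf() in place of elog():

```c
#include <stdint.h>
#include <stdio.h>

/* Stand-in for PostgreSQL's TransactionId. */
typedef uint32_t TransactionId;

/* Hypothetical helper: a printable GID, treating NULL the same as "". */
static const char *
gid_for_log(const char *gid)
{
    return (gid != NULL) ? gid : "";
}

/* Builds the same text as the elog(LOG, ...) call quoted above. */
static int
format_stop_message(char *buf, size_t len, const char *gid, TransactionId xid)
{
    return snprintf(buf, len, "stopping decoding of %s (%u)",
                    gid_for_log(gid), (unsigned int) xid);
}
```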

;

COMMENT
Line 2606
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,

"twophase" --> "two-phase"

;

QUESTION
Line 2655
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,

"This is used to handle COMMIT/ABORT PREPARED"
Should that say "COMMIT/ROLLBACK PREPARED"?

;

COMMENT
Line 2668

"Anyways, 2PC transactions" --> "Anyway, two-phase transactions"

;

COMMENT
Line 2765
@@ -2495,7 +2731,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 /* cosmetic... */
 txn->final_lsn = lsn;

- /* remove potential on-disk data, and deallocate */
+ /*
+ * remove potential on-disk data, and deallocate.
+ *

Remove the blank line between the comment and code.

==========
Patch V6-0001, File: src/include/replication/logical.h
==========

COMMENT
Line 89

"two phase" -> "two-phase"

;

COMMENT
Line 89

For consistency with the previous member naming really the new member
should just be called "twophase" rather than "enable_twophase"

;

==========
Patch V6-0001, File: src/include/replication/output_plugin.h
==========

QUESTION
Line 106

As previously asked, why is the callback function/typedef referred as
AbortPrepared instead of RollbackPrepared?
It does not match the SQL and the function comment, and seems only to
add some unnecessary confusion.

;

==========
Patch V6-0001, File: src/include/replication/reorderbuffer.h
==========

QUESTION
Line 116
@@ -162,9 +163,13 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT 0x0002
 #define RBTXN_IS_SERIALIZED 0x0004
-#define RBTXN_IS_STREAMED 0x0008
-#define RBTXN_HAS_TOAST_INSERT 0x0010
-#define RBTXN_HAS_SPEC_INSERT 0x0020
+#define RBTXN_PREPARE 0x0008
+#define RBTXN_COMMIT_PREPARED 0x0010
+#define RBTXN_ROLLBACK_PREPARED 0x0020
+#define RBTXN_COMMIT 0x0040
+#define RBTXN_IS_STREAMED 0x0080
+#define RBTXN_HAS_TOAST_INSERT 0x0100
+#define RBTXN_HAS_SPEC_INSERT 0x0200

I was wondering why when adding new flags, some of the existing flag
masks were also altered.
I am assuming this is ok because they are never persisted but are only
used in the protocol (??)

;

COMMENT
Line 226
@@ -218,6 +223,15 @@ typedef struct ReorderBufferChange
 ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
 )

+/* is this txn prepared? */
+#define rbtxn_prepared(txn) (txn->txn_flags & RBTXN_PREPARE)
+/* was this prepared txn committed in the meanwhile? */
+#define rbtxn_commit_prepared(txn) (txn->txn_flags & RBTXN_COMMIT_PREPARED)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback_prepared(txn) (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
+/* was this txn committed in the meanwhile? */
+#define rbtxn_commit(txn) (txn->txn_flags & RBTXN_COMMIT)
+

Probably all the "txn->txn_flags" here might be more safely written
with parentheses in the macro like "(txn)->txn_flags".

~

Also, start all comments with a capital letter. And what is the meaning
of "in the meanwhile"?
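For example, the macros could follow the same shape as the existing macro body quoted above (((txn)->txn_flags & RBTXN_IS_STREAMED) != 0), with a parenthesized argument, an explicit != 0, and capitalized comments. This is a standalone sketch with a minimal stand-in struct; the comment wording is only a suggestion. With the unparenthesized form, an argument such as &txns[1] would not even compile, because &txns[1]->txn_flags parses as &(txns[1]->txn_flags).

```c
#include <stdint.h>

/* Hypothetical minimal stand-in for ReorderBufferTXN. */
typedef struct ReorderBufferTXN
{
    uint32_t    txn_flags;
} ReorderBufferTXN;

/* Flag values as in the reorderbuffer.h hunk quoted above. */
#define RBTXN_PREPARE            0x0008
#define RBTXN_COMMIT_PREPARED    0x0010
#define RBTXN_ROLLBACK_PREPARED  0x0020
#define RBTXN_COMMIT             0x0040

/* Is this transaction prepared? */
#define rbtxn_prepared(txn) \
( \
    ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)

/* Has this prepared transaction been committed? */
#define rbtxn_commit_prepared(txn) \
( \
    ((txn)->txn_flags & RBTXN_COMMIT_PREPARED) != 0 \
)

/* Has this prepared transaction been rolled back? */
#define rbtxn_rollback_prepared(txn) \
( \
    ((txn)->txn_flags & RBTXN_ROLLBACK_PREPARED) != 0 \
)

/* Has this transaction been committed? */
#define rbtxn_commit(txn) \
( \
    ((txn)->txn_flags & RBTXN_COMMIT) != 0 \
)
```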

;

COMMENT
Line 410
@@ -390,6 +407,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
 ReorderBufferTXN *txn,
 XLogRecPtr commit_lsn);

The format is inconsistent with all other callback signatures here,
where the 1st arg was on the same line as the typedef.

;

COMMENT
Line 440-442

Excessive blank lines following this change?

;

COMMENT
Line 638
@@ -571,6 +631,15 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);

+bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+ const char *gid);
+bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+ const char *gid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid);

Not aligned consistently with other function prototypes.

;

==========
Patch V6-0003, File: src/backend/access/transam/twophase.c
==========

COMMENT
Line 551
@@ -548,6 +548,37 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 }

 /*
+ * LookupGXact
+ * Check if the prepared transaction with the given GID is around
+ */
+bool
+LookupGXact(const char *gid)

There is potential to refactor/simplify this code:
e.g.

bool
LookupGXact(const char *gid)
{
 int i;
 bool found = false;

 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
 {
  GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
  /* Ignore not-yet-valid GIDs */
  if (gxact->valid && strcmp(gxact->gid, gid) == 0)
  {
   found = true;
   break;
  }
 }
 LWLockRelease(TwoPhaseStateLock);
 return found;
}

;

==========
Patch V6-0003, File: src/backend/replication/logical/proto.c
==========

COMMENT
Line 86
@@ -72,12 +72,17 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
 */
 void
 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)

Since the flags are now used, this code comment is wrong:
"/* send the flags field (unused for now) */"

;

COMMENT
Line 129
@@ -106,6 +115,77 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
 }

 /*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,

"2PC transactions" --> "two-phase commit transactions"

;

COMMENT
Line 133

Assert(strlen(txn->gid) > 0);
Shouldn't that assertion also check that txn->gid is not NULL (to
prevent a NULL dereference in case gid is NULL)?

;

COMMENT
Line 177
+logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data)

prepare_data->prepare_type = flags;
This code may be OK, but it does seem a bit of an abuse of the flags.

e.g. Are they flags, or are they really enum values?
e.g. And if they are effectively enums (it appears they are), then it
seemed inconsistent that |= was used when they were previously
assigned.
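If they really are enum values, something like the following might make that explicit. This is a standalone sketch; the type LogicalRepPrepareType and the PREPARE_TYPE_* names and values are hypothetical, not what the patch defines. It also shows why |= is risky for such values: with these particular numbers, OR-ing PREPARE (1) and COMMIT_PREPARED (2) silently produces ROLLBACK_PREPARED (3).

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical enum; the names and values are illustrative only. */
typedef enum LogicalRepPrepareType
{
    PREPARE_TYPE_PREPARE = 1,
    PREPARE_TYPE_COMMIT_PREPARED = 2,
    PREPARE_TYPE_ROLLBACK_PREPARED = 3
} LogicalRepPrepareType;

/* The wire byte holds exactly one value, so it is compared, never OR-ed. */
static bool
prepare_type_from_wire(uint8_t byte, LogicalRepPrepareType *type)
{
    switch (byte)
    {
        case PREPARE_TYPE_PREPARE:
        case PREPARE_TYPE_COMMIT_PREPARED:
        case PREPARE_TYPE_ROLLBACK_PREPARED:
            *type = (LogicalRepPrepareType) byte;
            return true;
        default:
            return false;       /* unknown value: caller should raise an error */
    }
}
```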

;

==========
Patch V6-0003, File: src/backend/replication/logical/worker.c
==========

COMMENT
Line 757
@@ -749,6 +753,141 @@ apply_handle_commit(StringInfo s)
 pgstat_report_activity(STATE_IDLE, NULL);
 }

+static void
+apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data)
+{
+ Assert(prepare_data->prepare_lsn == remote_final_lsn);

Missing function comment to say this is called from apply_handle_prepare.

;

COMMENT
Line 798
+apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data)

Missing function comment to say this is called from apply_handle_prepare.

;

COMMENT
Line 824
+apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data)

Missing function comment to say this is called from apply_handle_prepare.

==========
Patch V6-0003, File: src/backend/replication/pgoutput/pgoutput.c
==========

COMMENT
Line 50
@@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
 ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 RepOriginId origin_id);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);

The parameter indentation (2nd lines) does not match everything else
in this context.

;

COMMENT
Line 152
@@ -143,6 +149,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 cb->change_cb = pgoutput_change;
 cb->truncate_cb = pgoutput_truncate;
 cb->commit_cb = pgoutput_commit_txn;
+
+ cb->prepare_cb = pgoutput_prepare_txn;
+ cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+ cb->abort_prepared_cb = pgoutput_abort_prepared_txn;

Remove the unnecessary blank line.

;

QUESTION
Line 386
@@ -373,7 +383,49 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 OutputPluginUpdateProgress(ctx);

 OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_commit(ctx->out, txn, commit_lsn);
+ logicalrep_write_commit(ctx->out, txn, commit_lsn, true);

Is the is_commit parameter of logicalrep_write_commit ever passed as false?
If yes, where?
If not, then what is the point of it?

;

COMMENT
Line 408
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Since this whole function is identical to pgoutput_prepare_txn, it might be
better to either
1. just leave this as a wrapper to delegate to that function
2. remove this one entirely and assign the callback to the common
pgoutput_prepare_txn
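Both options sketched in standalone C, assuming (as the comment implies) that all three callbacks share the same signature. The types below are minimal stand-ins rather than the real PostgreSQL definitions, and the nwrites counter stands in for the real output logic.

```c
#include <stdint.h>

/* Minimal, hypothetical stand-ins for the PostgreSQL types. */
typedef uint64_t XLogRecPtr;
typedef struct LogicalDecodingContext { int nwrites; } LogicalDecodingContext;
typedef struct ReorderBufferTXN { const char *gid; } ReorderBufferTXN;

typedef void (*PrepareLikeCB) (LogicalDecodingContext *ctx,
                               ReorderBufferTXN *txn, XLogRecPtr lsn);

typedef struct OutputPluginCallbacks
{
    PrepareLikeCB prepare_cb;
    PrepareLikeCB commit_prepared_cb;
    PrepareLikeCB abort_prepared_cb;
} OutputPluginCallbacks;

/* Common implementation shared by all three callbacks. */
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                     XLogRecPtr lsn)
{
    (void) txn;
    (void) lsn;
    ctx->nwrites++;             /* stands in for the real write logic */
}

/* Option 1: keep a named function that simply delegates. */
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
                             ReorderBufferTXN *txn, XLogRecPtr lsn)
{
    pgoutput_prepare_txn(ctx, txn, lsn);
}

static void
init_callbacks(OutputPluginCallbacks *cb)
{
    cb->prepare_cb = pgoutput_prepare_txn;
    cb->commit_prepared_cb = pgoutput_commit_prepared_txn;  /* option 1 */
    cb->abort_prepared_cb = pgoutput_prepare_txn;           /* option 2 */
}
```

Option 1 keeps a distinct symbol (useful if the behaviors are expected to diverge later), while option 2 avoids the extra function entirely.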

;

COMMENT
Line 419
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Since this whole function is identical to pgoutput_prepare_txn, it might be
better to either
1. just leave this as a wrapper to delegate to that function
2. remove this one entirely and assign the callback to the common
pgoutput_prepare_txn

;

COMMENT
Line 419
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Shouldn't this comment say "ROLLBACK PREPARED"?

;

==========
Patch V6-0003, File: src/include/replication/logicalproto.h
==========

QUESTION
Line 101
@@ -87,20 +87,55 @@ typedef struct LogicalRepBeginData
 TransactionId xid;
 } LogicalRepBeginData;

+/* Commit (and abort) information */

#define LOGICALREP_IS_ABORT 0x02
Is there a good reason why this is not called:
#define LOGICALREP_IS_ROLLBACK 0x02

;

COMMENT
Line 105

((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT))

Macros would be safer if flags are in parentheses
(((flags) == LOGICALREP_IS_COMMIT) || ((flags) == LOGICALREP_IS_ABORT))

;

COMMENT
Line 115

Unexpected whitespace for the typedef
"} LogicalRepPrepareData;"

;

COMMENT
Line 122
/* prepare can be exactly one of PREPARE, [COMMIT|ABORT] PREPARED*/
#define PrepareFlagsAreValid(flags) \
 ((flags == LOGICALREP_IS_PREPARE) || \
 (flags == LOGICALREP_IS_COMMIT_PREPARED) || \
 (flags == LOGICALREP_IS_ROLLBACK_PREPARED))

There is a confusing mixture of the ABORT and ROLLBACK terms in the macros and comments
"[COMMIT|ABORT] PREPARED" --> "[COMMIT|ROLLBACK] PREPARED"

~

Also, it would be safer if flags are in parentheses
 (((flags) == LOGICALREP_IS_PREPARE) || \
 ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \
 ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED))
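A standalone demonstration of why the parentheses matter (the flag values and the "low nibble" encoding here are hypothetical, chosen only for illustration). When the argument is an expression such as raw & 0x0F, the unparenthesized macro expands to raw & 0x0F == LOGICALREP_IS_PREPARE, which C parses as raw & (0x0F == LOGICALREP_IS_PREPARE) because == binds tighter than &. GCC will typically warn about this with -Wparentheses, but it still compiles.

```c
/* Illustrative values only; the real ones are defined by the patch. */
#define LOGICALREP_IS_PREPARE            0x01
#define LOGICALREP_IS_COMMIT_PREPARED    0x02
#define LOGICALREP_IS_ROLLBACK_PREPARED  0x04

/* As in the patch: the macro argument is not parenthesized. */
#define PrepareFlagsAreValidUnsafe(flags) \
    ((flags == LOGICALREP_IS_PREPARE) || \
     (flags == LOGICALREP_IS_COMMIT_PREPARED) || \
     (flags == LOGICALREP_IS_ROLLBACK_PREPARED))

/* Suggested form: every use of the argument is parenthesized. */
#define PrepareFlagsAreValid(flags) \
    (((flags) == LOGICALREP_IS_PREPARE) || \
     ((flags) == LOGICALREP_IS_COMMIT_PREPARED) || \
     ((flags) == LOGICALREP_IS_ROLLBACK_PREPARED))

/* Hypothetical decoding step: the low nibble carries the prepare type. */
static int
prepare_type_is_valid(unsigned int raw)
{
    return PrepareFlagsAreValid(raw & 0x0F);
}
```

With raw = 0x12, the safe macro sees 0x02 and accepts it, while the unsafe one evaluates 0x12 & 0 for every comparison and rejects it.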

;

==========
Patch V6-0003, File: src/test/subscription/t/020_twophase.pl
==========

COMMENT
Line 131 - # check inserts are visible

Isn't this supposed to be checking for rows 12 and 13, instead of 11 and 12?

;

==========
Patch V6-0004, File: contrib/test_decoding/test_decoding.c
==========

COMMENT
Line 81
@@ -78,6 +78,15 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
 ReorderBufferTXN *txn,
 XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static

All these functions have a 3rd parameter called commit_lsn, even
though the functions are not commit-related. It seems like a cut/paste
error.

;

COMMENT
Line 142
@@ -130,6 +139,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 cb->stream_start_cb = pg_decode_stream_start;
 cb->stream_stop_cb = pg_decode_stream_stop;
 cb->stream_abort_cb = pg_decode_stream_abort;
+ cb->stream_prepare_cb = pg_decode_stream_prepare;
+ cb->stream_commit_prepared_cb = pg_decode_stream_commit_prepared;
+ cb->stream_abort_prepared_cb = pg_decode_stream_abort_prepared;
 cb->stream_commit_cb = pg_decode_stream_commit;

Can the "cb->stream_abort_prepared_cb" be changed to
"cb->stream_rollback_prepared_cb"?

;

COMMENT
Line 827
@@ -812,6 +824,78 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 }

 static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ TestDecodingData *data = ctx->output_plugin_pr

The commit_lsn (3rd parameter) is unused and seems like a cut/paste name error.

;

COMMENT
Line 875
+pg_decode_stream_abort_prepared(LogicalDecodingContext *ctx,

The commit_lsn (3rd parameter) is unused and seems like a cut/paste name error.

;

==========
Patch V6-0004, File: doc/src/sgml/logicaldecoding.sgml
==========

COMMENT
48.6.1
@@ -396,6 +396,9 @@ typedef struct OutputPluginCallbacks
 LogicalDecodeStreamStartCB stream_start_cb;
 LogicalDecodeStreamStopCB stream_stop_cb;
 LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
+ LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb;
+ LogicalDecodeStreamAbortPreparedCB stream_abort_prepared_cb;

Same question as in previous review comments: why use the
terminology "abort" instead of "rollback"?

;

COMMENT
48.6.1
@@ -418,7 +421,9 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 in-progress transactions. The <function>stream_start_cb</function>,
 <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
 <function>stream_commit_cb</function> and <function>stream_change_cb</function>
- are required, while <function>stream_message_cb</function> and
+ are required, while <function>stream_message_cb</function>,
+ <function>stream_prepare_cb</function>, <function>stream_commit_prepared_cb</function>,
+ <function>stream_abort_prepared_cb</function>,

Missing "and".
... "stream_abort_prepared_cb, stream_truncate_cb are optional." -->
"stream_abort_prepared_cb, and stream_truncate_cb are optional."

;

COMMENT
Section 48.6.4.16
Section 48.6.4.17
Section 48.6.4.18
@@ -839,6 +844,45 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
 </para>
 </sect3>

+ <sect3 id="logicaldecoding-output-plugin-stream-prepare">
+ <title>Stream Prepare Callback</title>
+ <para>
+ The <function>stream_prepare_cb</function> callback is called to prepare
+ a previously streamed transaction as part of a two phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-commit-prepared">
+ <title>Stream Commit Prepared Callback</title>
+ <para>
+ The <function>stream_commit_prepared_cb</function> callback is called to commit prepared
+ a previously streamed transaction as part of a two phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-abort-prepared">
+ <title>Stream Abort Prepared Callback</title>
+ <para>
+ The <function>stream_abort_prepared_cb</function> callback is called to abort prepared
+ a previously streamed transaction as part of a two phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ </para>
+ </sect3>

1. Everywhere it says "two phase" commit, it should consistently say
"two-phase" commit (with the hyphen).

2. Search for the "abort_lsn" parameter. It seems to be overused
(cut/paste error) even where the API is unrelated to abort.

3. 48.6.4.17 and 48.6.4.18
Is this wording ok? Is the word "prepared" even necessary here?
- "... called to commit prepared a previously streamed transaction ..."
- "... called to abort prepared a previously streamed transaction ..."

;

COMMENT
Section 48.9
@@ -1017,9 +1061,13 @@ OutputPluginWrite(ctx, true);
 When streaming an in-progress transaction, the changes (and messages) are
 streamed in blocks demarcated by <function>stream_start_cb</function>
 and <function>stream_stop_cb</function> callbacks. Once all the decoded
- changes are transmitted, the transaction is committed using the
- <function>stream_commit_cb</function> callback (or possibly aborted using
- the <function>stream_abort_cb</function> callback).
+ changes are transmitted, the transaction can be committed using the
+ the <function>stream_commit_cb</function> callback

"two phase" --> "two-phase"

~

Also, missing period at the end of the sentence.
"or aborted using the stream_abort_prepared_cb" --> "or aborted using
the stream_abort_prepared_cb."

;

==========
Patch V6-0004, File: src/backend/replication/logical/logical.c
==========

COMMENT
Line 84
@@ -81,6 +81,12 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 XLogRecPtr last_lsn);
 static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);

The 3rd parameter is always "commit_lsn", even for APIs unrelated to
commit, so it seems like a cut/paste error.

;

COMMENT
Line 1246
@@ -1231,6 +1243,105 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }

 static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;

Misnamed parameter "commit_lsn"?

~

Also, Line 1272
There seems to be some missing integrity checking to make sure the
callback is not NULL.
A NULL callback will cause a NULL pointer dereference when the
wrapper attempts to call it.
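Something along these lines is what I had in mind. It is only a
sketch with stand-in types: the real wrapper would report the problem
with ereport(ERROR, ...), as the existing required streaming callback
checks in logical.c appear to do, whereas here it returns a status
code so it can run standalone. The callback_required argument and the
WRAPPER_* codes are hypothetical; they just show both choices (error
if the callback is required, silently skip if it is optional), since
the docs in this patch list stream_prepare_cb as optional.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-ins for the PostgreSQL types (not the real definitions). */
typedef uint64_t XLogRecPtr;
typedef struct ReorderBufferTXN { uint32_t xid; } ReorderBufferTXN;
typedef struct LogicalDecodingContext LogicalDecodingContext;

typedef void (*LogicalDecodeStreamPrepareCB) (LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

struct LogicalDecodingContext
{
    struct
    {
        LogicalDecodeStreamPrepareCB stream_prepare_cb;
    }           callbacks;      /* trimmed to the one member needed here */
    int         calls;          /* demo only: counts callback invocations */
};

typedef struct ReorderBuffer
{
    LogicalDecodingContext *private_data;
} ReorderBuffer;

/* Sketch-only status codes; the backend would ereport(ERROR, ...) instead. */
enum { WRAPPER_OK = 0, WRAPPER_SKIPPED = 1, WRAPPER_MISSING_REQUIRED = -1 };

static int
stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                          XLogRecPtr prepare_lsn, int callback_required)
{
    LogicalDecodingContext *ctx = cache->private_data;

    assert(ctx != NULL);

    /* Never call through a NULL function pointer. */
    if (ctx->callbacks.stream_prepare_cb == NULL)
    {
        if (callback_required)
        {
            fprintf(stderr,
                    "logical streaming requires a stream_prepare_cb callback\n");
            return WRAPPER_MISSING_REQUIRED;
        }
        return WRAPPER_SKIPPED;     /* optional callback: nothing to do */
    }

    ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    return WRAPPER_OK;
}

/* Demo callback used to exercise the non-NULL path. */
static void
count_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
              XLogRecPtr prepare_lsn)
{
    (void) txn;
    (void) prepare_lsn;
    ctx->calls++;
}
```

The same check would apply to stream_commit_prepared_cb_wrapper and
stream_abort_prepared_cb_wrapper.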

;

COMMENT
Line 1305
+static void
+stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

There seems to be some missing integrity checking to make sure the
callback is not NULL.
A NULL callback will cause a NULL pointer dereference when the
wrapper attempts to call it.

;

COMMENT
Line 1312
+static void
+stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

Misnamed parameter "commit_lsn"?

~

Also, Line 1338
There seems to be some missing integrity checking to make sure the
callback is not NULL.
A NULL callback will cause a NULL pointer dereference when the
wrapper attempts to call it.


==========
Patch V6-0004, File: src/backend/replication/logical/reorderbuffer.c
==========

COMMENT
Line 2684
@@ -2672,15 +2681,31 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
 strcpy(txn->gid, gid);

- if (is_commit)
+ if (rbtxn_is_streamed(txn))
 {
- txn->txn_flags |= RBTXN_COMMIT_PREPARED;
- rb->commit_prepared(rb, txn, commit_lsn);
+ if (is_commit)
+ {
+ txn->txn_flags |= RBTXN_COMMIT_PREPARED;

The setting/checking of the flags could be refactored if you wanted to
write less code:
e.g.
if (is_commit)
 txn->txn_flags |= RBTXN_COMMIT_PREPARED;
else
 txn->txn_flags |= RBTXN_ROLLBACK_PREPARED;

if (rbtxn_is_streamed(txn) && rbtxn_commit_prepared(txn))
 rb->stream_commit_prepared(rb, txn, commit_lsn);
else if (rbtxn_is_streamed(txn) && rbtxn_rollback_prepared(txn))
 rb->stream_abort_prepared(rb, txn, commit_lsn);
else if (rbtxn_commit_prepared(txn))
 rb->commit_prepared(rb, txn, commit_lsn);
else if (rbtxn_rollback_prepared(txn))
 rb->abort_prepared(rb, txn, commit_lsn);

;

==========
Patch V6-0004, File: src/include/replication/output_plugin.h
==========

COMMENT
Line 171
@@ -157,6 +157,33 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
 XLogRecPtr abort_lsn);

 /*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit and only when
+ * two-phased commits are supported
+ */

1. Missing period in all these comments.

2. Is the part that says "and only when two-phased commits are
supported" necessary? It seems redundant since the comment already
says it is called as part of a two-phase commit.

;

==========
Patch V6-0004, File: src/include/replication/reorderbuffer.h
==========

COMMENT
Line 467
@@ -466,6 +466,24 @@ typedef void (*ReorderBufferStreamAbortCB) (
 ReorderBufferTXN *txn,
 XLogRecPtr abort_lsn);

+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitPreparedCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortPreparedCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);

Cut/paste error? The same comment "prepare streamed transaction
callback signature" is repeated 3 times.
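i.e. something like the following, where each typedef gets its own
comment. The stand-in types at the top are only there so the fragment
compiles on its own, and the LSN parameter names (prepare_lsn,
abort_lsn) are my suggestions, not names from the patch:

```c
#include <stdint.h>

/* Simplified stand-ins so the fragment compiles on its own. */
typedef uint64_t XLogRecPtr;
typedef struct ReorderBuffer ReorderBuffer;
typedef struct ReorderBufferTXN ReorderBufferTXN;

/* prepare streamed transaction callback signature */
typedef void (*ReorderBufferStreamPrepareCB) (
    ReorderBuffer *rb,
    ReorderBufferTXN *txn,
    XLogRecPtr prepare_lsn);

/* commit prepared streamed transaction callback signature */
typedef void (*ReorderBufferStreamCommitPreparedCB) (
    ReorderBuffer *rb,
    ReorderBufferTXN *txn,
    XLogRecPtr commit_lsn);

/* abort (rollback) prepared streamed transaction callback signature */
typedef void (*ReorderBufferStreamAbortPreparedCB) (
    ReorderBuffer *rb,
    ReorderBufferTXN *txn,
    XLogRecPtr abort_lsn);
```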


[END]


