Re: [HACKERS] logical decoding of two-phase transactions

From Peter Smith
Subject Re: [HACKERS] logical decoding of two-phase transactions
Date
Msg-id CAHut+Pto6OFj=bm5ORvKRFMzidaeTbw0iP13u6Rd6cCeEz5D0w@mail.gmail.com
In reply to Re: [HACKERS] logical decoding of two-phase transactions  (Ajin Cherian <itsajin@gmail.com>)
Responses RE: [HACKERS] logical decoding of two-phase transactions  ("tanghy.fnst@fujitsu.com" <tanghy.fnst@fujitsu.com>)
List pgsql-hackers
On Sun, May 16, 2021 at 12:07 AM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Thu, May 13, 2021 at 7:50 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > Please find attached the latest patch set v75*
> >
> > Differences from v74* are:
> >
> > * Rebased to HEAD @ today.
> >
> > * v75 also addresses some of the feedback comments from Vignesh [1].
>
> Adding a patch to this patch-set that avoids empty transactions from
> being sent to the subscriber/replica. This patch is based on the
> logic that was proposed for empty transactions in the thread [1]. This
> patch uses that patch and handles empty prepared transactions
> as well. So, this will avoid empty prepared transactions from being
> sent to the subscriber/replica. This patch also avoids sending
> COMMIT PREPARED /ROLLBACK PREPARED if the prepared transaction was
> skipped provided the COMMIT /ROLLBACK happens
> prior to a restart of the walsender. If the COMMIT/ROLLBACK PREPARED
> happens after a restart, it will not be able know that the
> prepared transaction prior to the restart was not sent, in this case
> the apply worker of the subscription will check if a prepare of the
> same type exists
> and if it does not, it will silently ignore the COMMIT PREPARED
> (ROLLBACK PREPARED logic was already doing this).
> Do have a look and let me know if you have any comments.
>
> [1] - https://www.postgresql.org/message-id/CAFPTHDYegcoS3xjGBj0XHfcdZr6Y35%2BYG1jq79TBD1VCkK7v3A%40mail.gmail.com
>

Hi Ajin.

I have applied the latest patch set v76*.

The patches applied cleanly.

All of the make, make check, and TAP subscription tests passed.

Below are my REVIEW COMMENTS for the v76-0003 part.

==========

1.  File: doc/src/sgml/logicaldecoding.sgml

1.1

@@ -862,11 +862,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
       The required <function>commit_prepared_cb</function> callback is called
       whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
       The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> in which case
+      it can apply the rollback, otherwise, it can skip the rollback
operation. The
+      <parameter>gid</parameter> alone is not sufficient because the downstream
+      node can have a prepared transaction with same identifier.

This is in the commit prepared section, but the new text talks about
the rollback ("it can apply the rollback", "skip the rollback
operation").
Is this text deliberate, or is it a cut/paste error?

==========

2. File: src/backend/replication/pgoutput/pgoutput.c

2.1

@@ -76,6 +78,7 @@ static void
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,

 static bool publications_valid;
 static bool in_streaming;
+static bool in_prepared_txn;

I am wondering why this is a module static flag. That makes it look like
it somehow applies globally to all the functions in this scope, but
really I think this is just a txn property, right?
- e.g. why not use another member of the private TXN data instead? or
- e.g. why not use rbtxn_prepared(txn) macro?
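To make the first alternative concrete, here is a minimal standalone
sketch in plain C (not PostgreSQL code). MockTxn and MockTxnData are
hypothetical stand-ins for ReorderBufferTXN and PGOutputTxnData, and the
is_prepared member is an assumed name that does not exist in the patch.
The point is only that the flag travels with the transaction, so two
transactions in flight cannot see each other's state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ReorderBufferTXN; only the field used here. */
typedef struct MockTxn
{
	void	   *output_plugin_private;
} MockTxn;

/* Per-transaction plugin state with an assumed is_prepared member. */
typedef struct MockTxnData
{
	bool		sent_begin_txn;
	bool		is_prepared;	/* would replace the static in_prepared_txn */
} MockTxnData;

typedef enum
{
	BEGIN_NONE,					/* BEGIN was already sent earlier */
	BEGIN_PLAIN,				/* ordinary BEGIN */
	BEGIN_PREPARE				/* BEGIN PREPARE */
} BeginKind;

/* Decide which BEGIN variant to emit when the first change arrives. */
static BeginKind
begin_kind_for_first_change(MockTxn *txn)
{
	MockTxnData *data = (MockTxnData *) txn->output_plugin_private;

	assert(data != NULL);
	if (data->sent_begin_txn)
		return BEGIN_NONE;

	data->sent_begin_txn = true;
	return data->is_prepared ? BEGIN_PREPARE : BEGIN_PLAIN;
}
```

With this shape the decision depends only on the txn that was passed in,
not on whichever begin callback happened to run last.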

----------

2.2

@@ -404,10 +410,32 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ (void)txn; /* keep compiler quiet */

Since the arg "txn" is now actually used, I guess the added statement to
"keep compiler quiet" is redundant and should be removed.

----------

2.3

+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;

  OutputPluginPrepareWrite(ctx, !send_replication_origin);
  logicalrep_write_begin(ctx->out, txn);
+ data->sent_begin_txn = true;


Would it be worth adding Assert(data); here?

----------

2.4

@@ -422,8 +450,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

Would it be worth adding Assert(data); here also?

----------

2.5
@@ -422,8 +450,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /* skip COMMIT message if nothing was sent */
+ if (!data->sent_begin_txn)
+ return;

Shouldn't this code also be freeing that allocated data? I think you
do free it in similar functions later in this patch.
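A minimal standalone sketch of what I mean, in plain C rather than
PostgreSQL code. MockTxn and MockTxnData are hypothetical stand-ins,
calloc/free stand in for MemoryContextAllocZero/pfree, and the
live_allocs counter exists only so that a leak would be observable here.
The flag is read first, then the data is released on both the "skip" and
the "send" path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

typedef struct MockTxn
{
	void	   *output_plugin_private;
} MockTxn;

/* Number of MockTxnData allocations not yet freed (sketch-only). */
static int	live_allocs = 0;

/* Stand-in for the begin callback: allocate zeroed per-txn data. */
static void
mock_begin_txn(MockTxn *txn)
{
	MockTxnData *data = (MockTxnData *) calloc(1, sizeof(MockTxnData));

	assert(data != NULL);
	live_allocs++;
	txn->output_plugin_private = data;
}

/*
 * Stand-in for the commit callback. Returns true if a COMMIT message
 * would be written, false if it is skipped. Either way the per-txn data
 * is released exactly once.
 */
static bool
mock_commit_txn(MockTxn *txn)
{
	MockTxnData *data = (MockTxnData *) txn->output_plugin_private;
	bool		sent_begin;

	assert(data != NULL);
	sent_begin = data->sent_begin_txn;

	free(data);
	live_allocs--;
	txn->output_plugin_private = NULL;

	return sent_begin;
}
```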

----------

2.6

@@ -435,10 +469,31 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
 static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->sent_begin_txn = false;
+ txn->output_plugin_private = data;
+ in_prepared_txn = true;
+}

Apart from setting in_prepared_txn = true; this is identical to the
code in pgoutput_begin_txn, so you could consider just calling that
function here to avoid duplicating the data allocation and the big
comment. Or maybe this way is better - I am not sure.
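Roughly what the delegation could look like, as a standalone sketch in
plain C (not PostgreSQL code). MockTxn, MockTxnData and the mock_*
function names are hypothetical, and calloc stands in for
MemoryContextAllocZero. Since a zeroing allocator already leaves
sent_begin_txn as false, the explicit assignment is not needed in the
sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

typedef struct MockTxn
{
	void	   *output_plugin_private;
} MockTxn;

/* Mirrors the module-level flag used by the patch. */
static bool in_prepared_txn = false;

/* Stand-in for pgoutput_begin_txn: allocate the per-txn data only. */
static void
mock_begin_txn(MockTxn *txn)
{
	/* calloc zero-fills, so sent_begin_txn starts out false */
	MockTxnData *data = (MockTxnData *) calloc(1, sizeof(MockTxnData));

	assert(data != NULL);
	txn->output_plugin_private = data;
}

/* Stand-in for pgoutput_begin_prepare_txn: delegate, then set the flag. */
static void
mock_begin_prepare_txn(MockTxn *txn)
{
	mock_begin_txn(txn);
	in_prepared_txn = true;
}
```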

----------

2.7

+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;

  OutputPluginPrepareWrite(ctx, !send_replication_origin);
  logicalrep_write_begin_prepare(ctx->out, txn);
+ data->sent_begin_txn = true;

Would it be worth adding Assert(data); here also?

----------

2.8

@@ -453,11 +508,18 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr prepare_lsn)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

Would it be worth adding Assert(data); here also?

----------

2.9

@@ -465,12 +527,28 @@ pgoutput_prepare_txn(LogicalDecodingContext
*ctx, ReorderBufferTXN *txn,
  */
 static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data && !data->sent_begin_txn)
+ {
+ pfree(data);
+ return;
+ }
+
+ if (data)
+ pfree(data);
  OutputPluginPrepareWrite(ctx, true);

I think this pfree logic could be simplified so that the free is done
in just one place, e.g. like:

if (data)
{
    bool skip = !data->sent_begin_txn;
    pfree(data);
    if (skip)
        return;
}

BTW, is it even possible to get in this function with NULL private
data? Perhaps that should be an Assert(data) ?
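Going one step further, the same logic could live in a small helper
shared by the commit prepared and rollback prepared callbacks. This is
only a standalone sketch in plain C: MockTxn, MockTxnData and
release_txn_data_should_skip are hypothetical names, and free stands in
for pfree. The helper tolerates NULL data, matching the "if (data)"
checks in the patch; if NULL turns out to be impossible, the check could
become an assertion instead.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

typedef struct MockTxn
{
	void	   *output_plugin_private;
} MockTxn;

/*
 * Release the per-transaction data (if any) and report whether the
 * caller should skip its message because no BEGIN was ever sent.
 */
static bool
release_txn_data_should_skip(MockTxn *txn)
{
	MockTxnData *data = (MockTxnData *) txn->output_plugin_private;
	bool		skip = false;

	if (data != NULL)
	{
		skip = !data->sent_begin_txn;
		free(data);
		txn->output_plugin_private = NULL;
	}

	return skip;
}
```

Each callback would then start with something like
"if (release_txn_data_should_skip(txn)) return;".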

----------

2.10

@@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     XLogRecPtr prepare_end_lsn,
     TimestampTz prepare_time)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data && !data->sent_begin_txn)
+ {
+ pfree(data);
+ return;
+ }
+
+ if (data)
+ pfree(data);

Same comment as above for refactoring the pfree logic.

----------

2.11

@@ -483,8 +561,22 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     XLogRecPtr prepare_end_lsn,
     TimestampTz prepare_time)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data && !data->sent_begin_txn)
+ {
+ pfree(data);
+ return;
+ }
+
+ if (data)
+ pfree(data);

Is that comment correct or cut/paste error? Why does it say "COMMIT PREPARED" ?

----------

2.12

@@ -613,6 +705,7 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Relation relation, ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
  MemoryContext old;

Would it be worth adding Assert(txndata); here also?

----------

2.13

@@ -750,6 +852,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
    int nrelations, Relation relations[], ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
  MemoryContext old;

Would it be worth adding Assert(txndata); here also?

----------

2.14

@@ -813,11 +925,15 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  const char *message)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
  TransactionId xid = InvalidTransactionId;

  if (!data->messages)
  return;

+ if (txn && txn->output_plugin_private)
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
  /*
  * Remember the xid for the message in streaming mode. See
  * pgoutput_change.
@@ -825,6 +941,19 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  if (in_streaming)
  xid = txn->xid;

+ /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+ if (!in_streaming && transactional)
+ {
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+ if (!txndata->sent_begin_txn)
+ {
+ if (!in_prepared_txn)
+ pgoutput_begin(ctx, txn);
+ else
+ pgoutput_begin_prepare(ctx, txn);
+ }
+ }

That code:
+ if (txn && txn->output_plugin_private)
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
looked misplaced to me.

Shouldn't all of that be moved inside the if block:
+ if (!in_streaming && transactional)

And when you do that maybe the condition can be simplified because you could
Assert(txn);
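
A standalone sketch of the shape I have in mind, in plain C rather than
PostgreSQL code. MockTxn, MockTxnData, mock_message and the begins_sent
counter are hypothetical, and the counter stands in for the calls to
pgoutput_begin / pgoutput_begin_prepare. I am assuming, based on the
"if (txn && ...)" check in the patch, that txn can be NULL for
non-transactional messages, so it is only dereferenced inside the
transactional branch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

typedef struct MockTxn
{
	void	   *output_plugin_private;
} MockTxn;

/* Counts how many BEGIN messages the sketch would have emitted. */
static int	begins_sent = 0;

static void
mock_message(MockTxn *txn, bool in_streaming, bool transactional)
{
	/* output BEGIN if we haven't yet; not for streaming or
	 * non-transactional messages */
	if (!in_streaming && transactional)
	{
		MockTxnData *txndata;

		assert(txn != NULL);
		txndata = (MockTxnData *) txn->output_plugin_private;
		assert(txndata != NULL);

		if (!txndata->sent_begin_txn)
		{
			begins_sent++;		/* stands in for sending BEGIN */
			txndata->sent_begin_txn = true;
		}
	}

	/* the message itself would be written here */
}
```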

==========

3. File src/include/replication/pgoutput.h

3.1

@@ -30,4 +30,9 @@ typedef struct PGOutputData
  bool two_phase;
 } PGOutputData;

+typedef struct PGOutputTxnData
+{
+ bool sent_begin_txn; /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+

Why is this typedef here? IIUC it is only used inside pgoutput.c,
so shouldn't it be declared in that file instead?

----------

3.2

@@ -30,4 +30,9 @@ typedef struct PGOutputData
  bool two_phase;
 } PGOutputData;

+typedef struct PGOutputTxnData
+{
+ bool sent_begin_txn; /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+

That is a new typedef, so maybe your patch should also add it to
src/tools/pgindent/typedefs.list.

----------
Kind Regards,
Peter Smith.
Fujitsu Australia


