Re: [HACKERS] logical decoding of two-phase transactions

From: vignesh C
Subject: Re: [HACKERS] logical decoding of two-phase transactions
Msg-id: CALDaNm0u=QGwd7jDAj-4u=7vvPn5rarFjBMCgfiJbDte55CWAA@mail.gmail.com
In reply to: Re: [HACKERS] logical decoding of two-phase transactions  (Peter Smith <smithpb2250@gmail.com>)
Responses: Re: [HACKERS] logical decoding of two-phase transactions  (Ajin Cherian <itsajin@gmail.com>)
Re: [HACKERS] logical decoding of two-phase transactions  (Peter Smith <smithpb2250@gmail.com>)
List: pgsql-hackers
On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > Please find attached the latest patch set v73*
> >
> > Differences from v72* are:
> >
> > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly)
> >
> > * Minor documentation correction for protocol messages for Commit Prepared ('K')
> >
> > * Non-functional code tidy (mostly proto.c) to reduce overloading
> > different meanings to same member names for prepare/commit times.
>
>
> Please find attached a re-posting of patch set v73*
>
> This is the same as yesterday's v73 but with a contrib module compile
> error fixed.

A few comments on the
v73-0002-Add-prepare-API-support-for-streaming-transactio.patch patch:
1) The error messages for ALTER SUBSCRIPTION ... DROP PUBLICATION
differ slightly depending on which unsupported option is given; we
could make them consistent:
postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
(refresh = false, copy_data=true, two_phase=true);
ERROR:  unrecognized subscription parameter: "copy_data"
postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
(refresh = false, two_phase=true, streaming=true);
ERROR:  cannot alter two_phase option

2) We are sending txn->xid twice in logicalrep_write_stream_prepare;
I feel we should send it only once:
+       /* transaction ID */
+       Assert(TransactionIdIsValid(txn->xid));
+       pq_sendint32(out, txn->xid);
+
+       /* send the flags field */
+       pq_sendbyte(out, flags);
+
+       /* send fields */
+       pq_sendint64(out, prepare_lsn);
+       pq_sendint64(out, txn->end_lsn);
+       pq_sendint64(out, txn->u_op_time.prepare_time);
+       pq_sendint32(out, txn->xid);
+
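To illustrate point 2, here is a minimal standalone sketch of the field order with the xid written only once. It does not use the real pqformat API: Buf, put_u8, put_u32, put_u64 and write_stream_prepare are hypothetical stand-ins for StringInfo, pq_sendbyte, pq_sendint32, pq_sendint64 and the patch's function. Only the fields visible in the hunk above are modeled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed-size output buffer standing in for a StringInfo. */
typedef struct Buf
{
    uint8_t data[64];
    size_t  len;
} Buf;

/* Append one byte; the assert is the only bounds check in this sketch. */
static void put_u8(Buf *b, uint8_t v)
{
    assert(b->len < sizeof b->data);
    b->data[b->len++] = v;
}

/* Append a 32-bit value in network (big-endian) byte order. */
static void put_u32(Buf *b, uint32_t v)
{
    for (int shift = 24; shift >= 0; shift -= 8)
        put_u8(b, (uint8_t) (v >> shift));
}

/* Append a 64-bit value in network (big-endian) byte order. */
static void put_u64(Buf *b, uint64_t v)
{
    put_u32(b, (uint32_t) (v >> 32));
    put_u32(b, (uint32_t) v);
}

/* Write the fields from the quoted hunk, sending the xid exactly once. */
static void write_stream_prepare(Buf *out, uint32_t xid, uint8_t flags,
                                 uint64_t prepare_lsn, uint64_t end_lsn,
                                 int64_t prepare_time)
{
    put_u32(out, xid);                      /* transaction ID, sent once */
    put_u8(out, flags);                     /* flags field */
    put_u64(out, prepare_lsn);
    put_u64(out, end_lsn);
    put_u64(out, (uint64_t) prepare_time);
    /* no trailing xid here */
}
```

With the trailing xid dropped, the modeled part of the message is 29 bytes (4 + 1 + 8 + 8 + 8) rather than 33.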

3) We could remove the local variable xid and return prepare_data->xid instead:
+TransactionId
+logicalrep_read_stream_prepare(StringInfo in,
LogicalRepPreparedTxnData *prepare_data)
+{
+       TransactionId xid;
+       uint8           flags;
+
+       xid = pq_getmsgint(in, 4);
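A rough standalone sketch of what point 3 could look like: the reader stores the xid straight into the output struct and returns that field, with no separate local. Reader, get_u8, get_u32, get_u64, PreparedTxnData and read_stream_prepare are hypothetical stand-ins, not the patch's code. The field order is assumed to match the write side quoted in point 2 with the duplicate xid removed.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical read cursor standing in for a StringInfo. */
typedef struct Reader
{
    const uint8_t *data;
    size_t         len;
    size_t         pos;
} Reader;

/* Hypothetical, trimmed-down stand-in for LogicalRepPreparedTxnData. */
typedef struct PreparedTxnData
{
    uint32_t xid;
    uint64_t prepare_lsn;
    uint64_t end_lsn;
    int64_t  prepare_time;
} PreparedTxnData;

/* Read one byte; the assert is the only bounds check in this sketch. */
static uint8_t get_u8(Reader *r)
{
    assert(r->pos < r->len);
    return r->data[r->pos++];
}

/* Read a 32-bit value in network (big-endian) byte order. */
static uint32_t get_u32(Reader *r)
{
    uint32_t v = 0;

    for (int i = 0; i < 4; i++)
        v = (v << 8) | get_u8(r);
    return v;
}

/* Read a 64-bit value in network (big-endian) byte order. */
static uint64_t get_u64(Reader *r)
{
    uint64_t hi = get_u32(r);
    uint64_t lo = get_u32(r);

    return (hi << 32) | lo;
}

/* Fill prepare_data and return its xid; no separate local xid is needed. */
static uint32_t read_stream_prepare(Reader *in, PreparedTxnData *prepare_data)
{
    prepare_data->xid = get_u32(in);
    (void) get_u8(in);                      /* flags; ignored in this sketch */
    prepare_data->prepare_lsn = get_u64(in);
    prepare_data->end_lsn = get_u64(in);
    prepare_data->prepare_time = (int64_t) get_u64(in);

    return prepare_data->xid;
}
```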

4) Here the comment can be placed directly above apply_spooled_messages for better readability:
+       /*
+        * 1. Replay all the spooled operations - Similar code as for
+        * apply_handle_stream_commit (i.e. non two-phase stream commit)
+        */
+
+       ensure_transaction();
+
+       nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn);
+

5) Similarly, the comment below can be placed directly above PrepareTransactionBlock:
+       /*
+        * 2. Mark the transaction as prepared. - Similar code as for
+        * apply_handle_prepare (i.e. two-phase non-streamed prepare)
+        */
+
+       /*
+        * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+        * called within the PrepareTransactionBlock below.
+        */
+       BeginTransactionBlock();
+       CommitTransactionCommand();
+
+       /*
+        * Update origin state so we can restart streaming from correct position
+        * in case of crash.
+        */
+       replorigin_session_origin_lsn = prepare_data.end_lsn;
+       replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+       PrepareTransactionBlock(gid);
+       CommitTransactionCommand();
+
+       pgstat_report_stat(false);

6) There is a lot of common code between apply_handle_stream_prepare
and apply_handle_prepare; if possible, try to move it into a common
function so that future fixes do not have to be made in both places.
+       /*
+        * 2. Mark the transaction as prepared. - Similar code as for
+        * apply_handle_prepare (i.e. two-phase non-streamed prepare)
+        */
+
+       /*
+        * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+        * called within the PrepareTransactionBlock below.
+        */
+       BeginTransactionBlock();
+       CommitTransactionCommand();
+
+       /*
+        * Update origin state so we can restart streaming from correct position
+        * in case of crash.
+        */
+       replorigin_session_origin_lsn = prepare_data.end_lsn;
+       replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+       PrepareTransactionBlock(gid);
+       CommitTransactionCommand();
+
+       pgstat_report_stat(false);
+
+       store_flush_position(prepare_data.end_lsn);
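One possible shape for point 6, as a standalone sketch rather than a patch: the block from BeginTransactionBlock through the second CommitTransactionCommand moves into one helper that both handlers call. The helper name prepare_common, the two handler names and PreparedTxnData are hypothetical. The functions named after backend routines are local stubs with simplified signatures that only record call order. Which steps are really identical in both handlers would need checking against the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stubs: record the call order instead of touching a real backend. */
static char trace[256];

static void note(const char *what)
{
    strncat(trace, what, sizeof trace - strlen(trace) - 1);
    strncat(trace, ";", sizeof trace - strlen(trace) - 1);
}

static void BeginTransactionBlock(void)    { note("begin"); }
static void CommitTransactionCommand(void) { note("commit_cmd"); }
static void PrepareTransactionBlock(const char *gid) { (void) gid; note("prepare"); }

static uint64_t replorigin_session_origin_lsn;
static int64_t  replorigin_session_origin_timestamp;

/* Hypothetical, trimmed-down stand-in for LogicalRepPreparedTxnData. */
typedef struct PreparedTxnData
{
    uint64_t end_lsn;
    int64_t  prepare_time;
} PreparedTxnData;

/* Hypothetical shared helper: the steps duplicated in the quoted hunk. */
static void prepare_common(const PreparedTxnData *prepare_data, const char *gid)
{
    assert(gid != NULL);

    /* Balance the EndTransactionBlock done inside PrepareTransactionBlock. */
    BeginTransactionBlock();
    CommitTransactionCommand();

    /* Update origin state so streaming can restart correctly after a crash. */
    replorigin_session_origin_lsn = prepare_data->end_lsn;
    replorigin_session_origin_timestamp = prepare_data->prepare_time;

    PrepareTransactionBlock(gid);
    CommitTransactionCommand();
}

/* Non-streamed prepare: only the handler-specific step stays here. */
static void handle_prepare(const PreparedTxnData *d, const char *gid)
{
    note("ensure_txn");
    prepare_common(d, gid);
}

/* Streamed prepare: replay the spooled changes, then the shared steps. */
static void handle_stream_prepare(const PreparedTxnData *d, const char *gid)
{
    note("replay_spooled");
    prepare_common(d, gid);
}
```

Each handler keeps only what is specific to it, so a later fix to the prepare sequence is made in one place.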

7) "two-phase commit" is slightly misleading in this comment; we can
just mention streaming prepare:
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+                                                       ReorderBufferTXN *txn,
+                                                       XLogRecPtr prepare_lsn)

8) Should we include an Assert on in_streaming, similar to the other
pgoutput_stream_* functions?
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+                                                       ReorderBufferTXN *txn,
+                                                       XLogRecPtr prepare_lsn)
+{
+       Assert(rbtxn_is_streamed(txn));
+
+       OutputPluginUpdateProgress(ctx);
+       OutputPluginPrepareWrite(ctx, true);
+       logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+       OutputPluginWrite(ctx, true);
+}

9) Here too, we can verify that the transaction was streamed by
checking pg_stat_replication_slots:
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*),
count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed
on subscriber, and extra columns contain local defaults');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');

Regards,
Vignesh


