On Saturday, November 19, 2022 6:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Fri, Nov 18, 2022 at 7:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Wed, Nov 16, 2022 at 1:50 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > On Tuesday, November 15, 2022 7:58 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> > >
> > > I noticed that I didn't add CHECK_FOR_INTERRUPTS while retrying send
> message.
> > > So, attach the new version which adds that. Also attach the 0004
> > > patch that restarts logical replication with temporarily disabling
> > > the parallel apply if failed to apply a transaction in parallel apply worker.
> > >
> >
> > Few comments on v48-0001
Thanks for the comments !
> > ======================
> >
>
> I have made quite a few changes in the comments, added some new comments,
> and made other cosmetic changes in the attached patch. The is atop v48-0001*.
> If these look okay to you, please include them in the next version. Apart from
> these, I have a few more comments on
> v48-0001*
Thanks, I have checked and merge them.
> 1.
> +static bool
> +pa_can_start(TransactionId xid)
> +{
> + if (!TransactionIdIsValid(xid))
> + return false;
>
> The caller (see caller of pa_start_worker) already has a check that xid passed
> here is valid, so I think this should be an Assert unless I am missing something in
> which case it is better to add a comment here.
Changed to an Assert().
> 2. Will it be better to rename pa_start_worker() as
> pa_allocate_worker() because it sometimes gets the worker from the pool and
> also allocate the hash entry for worker info? That will even match the
> corresponding pa_free_worker().
Agreed and changed.
> 3.
> +pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
> {
> ...
> +
> + oldctx = MemoryContextSwitchTo(ApplyContext);
> + subxactlist = lappend_xid(subxactlist, current_xid);
> + MemoryContextSwitchTo(oldctx);
> ...
>
> Why do we need to allocate this list in a permanent context? IIUC, we need to
> use this to maintain subxacts so that it can be later used to find the given
> subxact at the time of rollback to savepoint in the current in-progress
> transaction, so why do we need it beyond the transaction being applied? If
> there is a reason for the same, it would be better to add some comments for
> the same.
I think you are right, I changed to use TopTransactionContext here.
> 4.
> +pa_stream_abort(LogicalRepStreamAbortData *abort_data)
> {
> ...
> +
> + for (i = list_length(subxactlist) - 1; i >= 0; i--) { TransactionId
> + xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
> +
> + if (xid_tmp == subxid)
> + {
> + found = true;
> + break;
> + }
> + }
> +
> + if (found)
> + {
> + RollbackToSavepoint(spname);
> + CommitTransactionCommand();
> + subxactlist = list_truncate(subxactlist, i + 1); }
>
> I was thinking whether we can have an Assert(false) for the not found case but it
> seems if all the changes of a subxact have been skipped then probably subxid
> corresponding to "rollback to savepoint" won't be found in subxactlist and we
> don't need to do anything for it. If that is the case, then probably adding a
> comment for it would be a good idea, otherwise, we can probably have
> Assert(false) in the else case.
Yes, we might not find the xid for an empty subtransaction. I added some comments
here for the same.
Apart from above, I also addressed the comments in [1] and fixed a bug that
parallel worker exits silently while the leader cannot detect that. In the
latest patch, the parallel apply worker will send a notify('X') message to
leader so that leader can detect the exit.
Here is the new version patch.
[1] https://www.postgresql.org/message-id/CAA4eK1KWgReYbpwEMh1H1ohHoYirv4Aa%3D6v13MutCF9NvHTc5A%40mail.gmail.com
Best regards,
Hou zj