On Mon, Dec 26, 2022 at 6:33 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> ---
> + if (!pa_can_start(xid))
> + return;
> +
> + /* First time through, initialize parallel apply worker state hashtable. */
> + if (!ParallelApplyTxnHash)
> + {
> + HASHCTL ctl;
> +
> + MemSet(&ctl, 0, sizeof(ctl));
> + ctl.keysize = sizeof(TransactionId);
> + ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
> + ctl.hcxt = ApplyContext;
> +
> + ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
> + 16, &ctl,
> + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
> + }
> +
> + /*
> + * It's necessary to reread the subscription information before assigning
> + * the transaction to a parallel apply worker. Otherwise, the leader may
> + * not be able to reread the subscription information if streaming
> + * transactions keep coming and are handled by parallel apply workers.
> + */
> + maybe_reread_subscription();
>
> pa_can_start() checks whether the skiplsn is an invalid LSN, and
> then maybe_reread_subscription() could update the skiplsn to a valid
> value. As the comments in pa_can_start() say, that won't work. I think
> we should call maybe_reread_subscription() in
> apply_handle_stream_start() before calling pa_allocate_worker().
>
But I think a similar thing can happen if we start the worker and
then call maybe_reread_subscription() before the transaction ends. I
think we should call maybe_reread_subscription() only when we are
reasonably sure that we are going to enter parallel mode; otherwise,
the leader worker will call it later anyway.
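
To make the ordering question concrete, here is a rough standalone
sketch. It is not PostgreSQL code: every name with a sim_ or Sim prefix
is made up for illustration, and it models only the skiplsn check,
assuming that a valid skiplsn is what makes pa_can_start() refuse.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins; none of these are the real PostgreSQL definitions. */
typedef uint64_t SimLsn;
#define SIM_INVALID_LSN ((SimLsn) 0)

typedef struct SimSubscription
{
    SimLsn skiplsn;          /* cached value the leader currently sees */
    SimLsn catalog_skiplsn;  /* value a reread would pick up */
} SimSubscription;

/* Models maybe_reread_subscription(): refresh the cached skiplsn. */
static void
sim_reread_subscription(SimSubscription *sub)
{
    assert(sub != NULL);
    sub->skiplsn = sub->catalog_skiplsn;
}

/* Models only the skiplsn test in pa_can_start(). */
static bool
sim_pa_can_start(const SimSubscription *sub)
{
    assert(sub != NULL);
    return sub->skiplsn == SIM_INVALID_LSN;
}

/*
 * Order in the quoted hunk: check first, reread afterwards.
 * Returns true if the transaction would go to a parallel worker.
 */
static bool
sim_check_then_reread(SimSubscription *sub)
{
    if (!sim_pa_can_start(sub))
        return false;
    sim_reread_subscription(sub);   /* skiplsn may become valid here */
    return true;                    /* decision used the old skiplsn */
}

/* Proposed order: reread first, then check. */
static bool
sim_reread_then_check(SimSubscription *sub)
{
    sim_reread_subscription(sub);
    return sim_pa_can_start(sub);
}
```

With a cached skiplsn that is invalid and a catalog value that is valid,
sim_check_then_reread() still picks the parallel path, while
sim_reread_then_check() does not. The sketch does not cover a reread
that happens after the worker has started, which is the case I am
describing above.
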
--
With Regards,
Amit Kapila.