Thread: logical replication empty transactions

logical replication empty transactions

From
Jeff Janes
Date:
After setting up logical replication of a slowly changing table using the built-in pub/sub facility, I noticed way more network traffic than made sense.  Looking into it, I see that every transaction in that database on the master gets sent to the replica.  99.999+% of them are empty transactions (a 'B' message and a 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables.  Is doing it this way necessary for some reason?  Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?

There is a comment for WalSndPrepareWrite which seems to foreshadow such a thing, but I don't really see how to use it in this case.  I want to drop two messages, not one.
 
 * Don't do anything lasting in here, it's quite possible that nothing will be done
 * with the data.

This applies to all versions which have support for pub/sub, including the recent commits to 13dev.

I've searched through the voluminous mailing list threads for when this feature was being presented to see if it was already discussed, but since every word I can think to search on occurs in virtually every message in the threads in some context or another, I didn't have much luck.

Cheers,

Jeff

Re: logical replication empty transactions

From
Euler Taveira
Date:
On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.janes@gmail.com> wrote:
>
> After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense.  Looking into I see that every transaction in that database on the master gets sent to the replica.  99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables.  Is doing it this way necessary for some reason?  Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?
>
That is not optimal. Those empty transactions are a waste of bandwidth.
We can suppress them if no changes will be sent. test_decoding
implements "skip empty transaction" as you described above and I did
something similar to it. Patch is attached.
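
For readers without the patch at hand, the "hold 'B' until something else comes along" idea can be sketched roughly as follows. This is a toy model in Python, not the attached patch and not PostgreSQL's output plugin code; the class `LazyBeginEncoder`, its methods, and the tuple-based messages are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Message = Tuple[str, int]  # (kind, xid); kind is 'B', 'C', or a change such as 'I'


@dataclass
class LazyBeginEncoder:
    """Toy encoder that holds back 'B' until a publishable change shows up."""
    sent: List[Message] = field(default_factory=list)
    pending_begin: Optional[int] = None  # xid whose 'B' has not been sent yet
    skipped: int = 0                     # number of empty transactions dropped

    def begin(self, xid: int) -> None:
        # Do not transmit anything yet; only remember that a transaction is open.
        self.pending_begin = xid

    def change(self, xid: int, kind: str, published: bool) -> None:
        if not published:
            return  # table is outside the publication: nothing to send
        if self.pending_begin is not None:
            # First real change of this transaction: emit the deferred 'B'.
            self.sent.append(("B", self.pending_begin))
            self.pending_begin = None
        self.sent.append((kind, xid))

    def commit(self, xid: int) -> None:
        if self.pending_begin is not None:
            # 'B' was never sent, so drop both 'B' and 'C'.
            self.pending_begin = None
            self.skipped += 1
            return
        self.sent.append(("C", xid))


enc = LazyBeginEncoder()

enc.begin(1)
enc.change(1, "I", published=False)  # touches only a non-replicated table
enc.commit(1)

enc.begin(2)
enc.change(2, "I", published=True)   # touches a published table
enc.commit(2)

print(enc.sent)
print(enc.skipped)
```

Only the second transaction reaches the wire in this model; the first one is counted as skipped.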


--
   Euler Taveira                                   Timbira -
http://www.timbira.com.br/
   PostgreSQL: Consultoria, Desenvolvimento, Suporte 24x7 e Treinamento


Re: logical replication empty transactions

From
Jeff Janes
Date:
On Fri, Nov 8, 2019 at 8:59 PM Euler Taveira <euler@timbira.com.br> wrote:
> On Mon, Oct 21, 2019 at 21:20, Jeff Janes
> <jeff.janes@gmail.com> wrote:
> >
> > After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense.  Looking into I see that every transaction in that database on the master gets sent to the replica.  99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables.  Is doing it this way necessary for some reason?  Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?
> >
> That is not optimal. Those empty transactions is a waste of bandwidth.
> We can suppress them if no changes will be sent. test_decoding
> implements "skip empty transaction" as you described above and I did
> something similar to it. Patch is attached.

Thanks.  I didn't think it would be that simple, because I thought we would need some way to fake an acknowledgement for any dropped empty transactions, to keep the LSN advancing and allow WAL to get recycled on the master.  But it turns out to be the opposite.  While your patch cuts the network traffic by a lot, there is still a lot of traffic.  Now it is keepalives, rather than 'B' and 'C'.  I don't know why I am getting a few hundred keepalives every second when the timeouts are at their defaults, but it is better than several thousand 'B' and 'C'.
 
My setup here was just to create, publish, and subscribe to an inactive dummy table, while having pgbench running on the master (with unpublished tables).  I have not created an intentionally slow network, but I am testing it over wifi, which is inherently kind of slow.

Cheers,

Jeff

Re: logical replication empty transactions

From
Dilip Kumar
Date:
On Sat, Nov 9, 2019 at 7:29 AM Euler Taveira <euler@timbira.com.br> wrote:
>
> On Mon, Oct 21, 2019 at 21:20, Jeff Janes
> <jeff.janes@gmail.com> wrote:
> >
> > After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense.  Looking into I see that every transaction in that database on the master gets sent to the replica.  99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables.  Is doing it this way necessary for some reason?  Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?
> >
> That is not optimal. Those empty transactions is a waste of bandwidth.
> We can suppress them if no changes will be sent. test_decoding
> implements "skip empty transaction" as you described above and I did
> something similar to it. Patch is attached.

I think this significantly reduces the network bandwidth for empty
transactions.  I have briefly reviewed the patch and it looks good to
me.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Mon, Mar 2, 2020 at 9:01 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Sat, Nov 9, 2019 at 7:29 AM Euler Taveira <euler@timbira.com.br> wrote:
> >
> > On Mon, Oct 21, 2019 at 21:20, Jeff Janes
> > <jeff.janes@gmail.com> wrote:
> > >
> > > After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense.  Looking into I see that every transaction in that database on the master gets sent to the replica.  99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables.  Is doing it this way necessary for some reason?  Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?
> > >
> > That is not optimal. Those empty transactions is a waste of bandwidth.
> > We can suppress them if no changes will be sent. test_decoding
> > implements "skip empty transaction" as you described above and I did
> > something similar to it. Patch is attached.
>
> I think this significantly reduces the network bandwidth for empty
> transactions.  I have briefly reviewed the patch and it looks good to
> me.
>

One thing that is not clear to me is how will we advance restart_lsn
if we don't send any empty xact in a system where there are many such
xacts?  IIRC, the restart_lsn is advanced based on confirmed_flush lsn
sent by subscriber.  After this change, the subscriber won't be able
to send the confirmed_flush and for a long time, we won't be able to
advance restart_lsn.  Is that correct?  If so, why do we think that is
acceptable?  One might argue that restart_lsn will be advanced as soon
as we send the first non-empty xact, but not sure if that is good
enough.  What do you think?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Dilip Kumar
Date:
On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Mar 2, 2020 at 9:01 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Sat, Nov 9, 2019 at 7:29 AM Euler Taveira <euler@timbira.com.br> wrote:
> > >
> > > On Mon, Oct 21, 2019 at 21:20, Jeff Janes
> > > <jeff.janes@gmail.com> wrote:
> > > >
> > > > After setting up logical replication of a slowly changing table using the built in pub/sub facility, I noticed way more network traffic than made sense.  Looking into I see that every transaction in that database on the master gets sent to the replica.  99.999+% of them are empty transactions ('B' message and 'C' message with nothing in between) because the transactions don't touch any tables in the publication, only non-replicated tables.  Is doing it this way necessary for some reason?  Couldn't we hold the transmission of 'B' until something else comes along, and then if that next thing is 'C' drop both of them?
> > > >
> > > That is not optimal. Those empty transactions is a waste of bandwidth.
> > > We can suppress them if no changes will be sent. test_decoding
> > > implements "skip empty transaction" as you described above and I did
> > > something similar to it. Patch is attached.
> >
> > I think this significantly reduces the network bandwidth for empty
> > transactions.  I have briefly reviewed the patch and it looks good to
> > me.
> >
>
> One thing that is not clear to me is how will we advance restart_lsn
> if we don't send any empty xact in a system where there are many such
> xacts?  IIRC, the restart_lsn is advanced based on confirmed_flush lsn
> sent by subscriber.  After this change, the subscriber won't be able
> to send the confirmed_flush and for a long time, we won't be able to
> advance restart_lsn.  Is that correct, if so, why do we think that is
> acceptable?  One might argue that restart_lsn will be advanced as soon
> as we send the first non-empty xact, but not sure if that is good
> enough.  What do you think?

It seems like a valid point.  One idea could be that we track the
last commit LSN which we streamed, and if the confirmed flush location
is already greater than that, then even if we skip sending the
commit message we can advance the confirmed flush location locally.
Logically, it should not cause any problem because we have already got
the confirmation for whatever we have streamed so far.  So for the other
commits (which we are skipping), we can advance it locally because
we are sure that we don't have any streamed commit which is not yet
confirmed by the subscriber.   This is just my thought, but if we
think from the code and design perspective then it might complicate
things and sound hackish.
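
A rough sketch of this bookkeeping, written in Python purely as an illustration: `SlotProgress` and its fields are hypothetical names, LSNs are modelled as plain integers, and the `>=` comparison is a simplifying assumption (the real walsender and slot code is in C and looks nothing like this).

```python
from dataclasses import dataclass


@dataclass
class SlotProgress:
    confirmed_flush: int = 0       # last LSN acknowledged or locally advanced
    last_streamed_commit: int = 0  # in-memory only: commit LSN of the last xact actually sent

    def on_commit(self, commit_lsn: int, is_empty: bool) -> bool:
        """Return True if this commit has to be streamed to the subscriber."""
        if not is_empty:
            self.last_streamed_commit = commit_lsn
            return True
        # Empty transaction: advance locally only when nothing that was
        # streamed is still waiting for confirmation from the subscriber.
        if self.confirmed_flush >= self.last_streamed_commit:
            self.confirmed_flush = max(self.confirmed_flush, commit_lsn)
        return False

    def on_subscriber_ack(self, flush_lsn: int) -> None:
        # Feedback from the subscriber never moves the position backwards.
        self.confirmed_flush = max(self.confirmed_flush, flush_lsn)


slot = SlotProgress()
slot.on_commit(100, is_empty=True)   # nothing outstanding: advance locally
slot.on_commit(200, is_empty=False)  # streamed; now waiting for the subscriber
slot.on_commit(300, is_empty=True)   # skipped, but 200 is still unconfirmed
held_back = slot.confirmed_flush     # position did not move past the pending commit
slot.on_subscriber_ack(200)
slot.on_commit(400, is_empty=True)   # nothing outstanding again: advance locally

print(held_back, slot.confirmed_flush)
```

The point of the model is that the position never jumps over a streamed commit that the subscriber has not yet confirmed.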

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Tue, Mar 3, 2020 at 9:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> >
> > One thing that is not clear to me is how will we advance restart_lsn
> > if we don't send any empty xact in a system where there are many such
> > xacts?  IIRC, the restart_lsn is advanced based on confirmed_flush lsn
> > sent by subscriber.  After this change, the subscriber won't be able
> > to send the confirmed_flush and for a long time, we won't be able to
> > advance restart_lsn.  Is that correct, if so, why do we think that is
> > acceptable?  One might argue that restart_lsn will be advanced as soon
> > as we send the first non-empty xact, but not sure if that is good
> > enough.  What do you think?
>
> It seems like a valid point.  One idea could be that we can track the
> last commit LSN which we streamed and if the confirmed flush location
> is already greater than that then even if we skip the sending the
> commit message we can increase the confirm flush location locally.
> Logically, it should not cause any problem because once we have got
> the confirmation for whatever we have streamed so far.  So for other
> commits(which we are skipping), we can we advance it locally because
> we are sure that we don't have any streamed commit which is not yet
> confirmed by the subscriber.
>

Will this work after restart?  Do you want to persist the information
of last streamed commit LSN?

>   This is just my thought, but if we
> think from the code and design perspective then it might complicate
> the things and sounds hackish.
>

Another idea could be that we stream the transaction after some
threshold number (say 100 or anything we think is reasonable) of empty
xacts.  This will reduce the traffic without tinkering with the core
design too much.
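
As a minimal sketch of the count-based idea, assuming the counter resets whenever anything is sent (the proposal above does not specify this), and using the hypothetical name `EmptyXactThrottle`:

```python
class EmptyXactThrottle:
    """Send every non-empty xact, plus one empty xact per `threshold` skipped."""

    def __init__(self, threshold: int = 100) -> None:
        self.threshold = threshold
        self.skipped = 0  # empty xacts seen since the last transmission

    def should_send(self, is_empty: bool) -> bool:
        if not is_empty:
            self.skipped = 0  # assumption: any real traffic resets the counter
            return True
        self.skipped += 1
        if self.skipped >= self.threshold:
            self.skipped = 0
            return True       # let one empty xact through so the subscriber can ack
        return False


throttle = EmptyXactThrottle(threshold=100)
sent = sum(throttle.should_send(is_empty=True) for _ in range(1000))
print(sent)
```

With a threshold of 100, a run of 1000 empty transactions results in 10 transmissions in this model instead of 1000.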

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Dilip Kumar
Date:
On Tue, Mar 3, 2020 at 1:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Tue, Mar 3, 2020 at 9:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > >
> > > One thing that is not clear to me is how will we advance restart_lsn
> > > if we don't send any empty xact in a system where there are many such
> > > xacts?  IIRC, the restart_lsn is advanced based on confirmed_flush lsn
> > > sent by subscriber.  After this change, the subscriber won't be able
> > > to send the confirmed_flush and for a long time, we won't be able to
> > > advance restart_lsn.  Is that correct, if so, why do we think that is
> > > acceptable?  One might argue that restart_lsn will be advanced as soon
> > > as we send the first non-empty xact, but not sure if that is good
> > > enough.  What do you think?
> >
> > It seems like a valid point.  One idea could be that we can track the
> > last commit LSN which we streamed and if the confirmed flush location
> > is already greater than that then even if we skip the sending the
> > commit message we can increase the confirm flush location locally.
> > Logically, it should not cause any problem because once we have got
> > the confirmation for whatever we have streamed so far.  So for other
> > commits(which we are skipping), we can we advance it locally because
> > we are sure that we don't have any streamed commit which is not yet
> > confirmed by the subscriber.
> >
>
> Will this work after restart?  Do you want to persist the information
> of last streamed commit LSN?

We will not persist the last streamed commit LSN; this variable is in
memory just to track whether we have got confirmation up to that
location or not.  Once we have confirmation up to that location, and if
we are not streaming any transaction (because those are empty
transactions), then we can just advance the confirmed flush location,
and based on that we can update the restart point as well, and those
will be persisted.  Basically, "last streamed commit LSN" is just a
marker that there is still something pending to be confirmed by the
subscriber, so until then we cannot simply advance the confirmed flush
location or restart point based on the empty transactions.  But if
there is nothing pending to be confirmed, we can advance.  So if we are
streaming then we will get confirmation from the subscriber; otherwise we
can advance it locally.  So, in either case, the confirmed flush
location and restart point will keep moving.

>
> >   This is just my thought, but if we
> > think from the code and design perspective then it might complicate
> > the things and sounds hackish.
> >
>
> Another idea could be that we stream the transaction after some
> threshold number (say 100 or anything we think is reasonable) of empty
> xacts.  This will reduce the traffic without tinkering with the core
> design too much.

Yeah, this could also be an option.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Tue, Mar 3, 2020 at 2:17 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Tue, Mar 3, 2020 at 1:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Tue, Mar 3, 2020 at 9:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > >
> > > On Mon, Mar 2, 2020 at 4:56 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > >
> > > >
> > > > One thing that is not clear to me is how will we advance restart_lsn
> > > > if we don't send any empty xact in a system where there are many such
> > > > xacts?  IIRC, the restart_lsn is advanced based on confirmed_flush lsn
> > > > sent by subscriber.  After this change, the subscriber won't be able
> > > > to send the confirmed_flush and for a long time, we won't be able to
> > > > advance restart_lsn.  Is that correct, if so, why do we think that is
> > > > acceptable?  One might argue that restart_lsn will be advanced as soon
> > > > as we send the first non-empty xact, but not sure if that is good
> > > > enough.  What do you think?
> > >
> > > It seems like a valid point.  One idea could be that we can track the
> > > last commit LSN which we streamed and if the confirmed flush location
> > > is already greater than that then even if we skip the sending the
> > > commit message we can increase the confirm flush location locally.
> > > Logically, it should not cause any problem because once we have got
> > > the confirmation for whatever we have streamed so far.  So for other
> > > commits(which we are skipping), we can we advance it locally because
> > > we are sure that we don't have any streamed commit which is not yet
> > > confirmed by the subscriber.
> > >
> >
> > Will this work after restart?  Do you want to persist the information
> > of last streamed commit LSN?
>
> We will not persist the last streamed commit LSN, this variable is in
> memory just to track whether we have got confirmation up to that
> location or not,  once we have confirmation up to that location and if
> we are not streaming any transaction (because those are empty
> transactions) then we can just advance the confirmed flush location
> and based on that we can update the restart point as well and those
> will be persisted.  Basically, "last streamed commit LSN" is just a
> marker that their still something pending to be confirmed from the
> subscriber so until that we can not simply advance the confirm flush
> location or restart point based on the empty transactions.  But, if
> there is nothing pending to be confirmed we can advance.  So if we are
> streaming then we will get confirmation from subscriber otherwise we
> can advance it locally.  So, in either case, the confirmed flush
> location and restart point will keep moving.
>

Okay, so this might work out, but it might look a bit ad-hoc.

> >
> > >   This is just my thought, but if we
> > > think from the code and design perspective then it might complicate
> > > the things and sounds hackish.
> > >
> >
> > Another idea could be that we stream the transaction after some
> > threshold number (say 100 or anything we think is reasonable) of empty
> > xacts.  This will reduce the traffic without tinkering with the core
> > design too much.
>
> Yeah, this could be also an option.
>

Okay.

Peter E, Petr J, others, do you have any opinion on what is the best
way forward for this thread?  I think it would be really good if we
can reduce the network traffic due to these empty transactions.

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Euler Taveira
Date:
On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:

> Another idea could be that we stream the transaction after some
> threshold number (say 100 or anything we think is reasonable) of empty
> xacts.  This will reduce the traffic without tinkering with the core
> design too much.


Amit, I suggest an interval to control this setting. Time is something we can control; the number of transactions isn't (it depends on the workload). The pg_stat_replication query interval is usually not in milliseconds; however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.


Regards,
 

--
Euler Taveira                 http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
<euler.taveira@2ndquadrant.com> wrote:
>
> On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:
>>
>>
>> Another idea could be that we stream the transaction after some
>> threshold number (say 100 or anything we think is reasonable) of empty
>> xacts.  This will reduce the traffic without tinkering with the core
>> design too much.
>>
>>
> Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.
>

Do you mean to say that if for some threshold interval we didn't
stream any transaction, then we can send the next empty transaction to
the subscriber?  If so, then isn't it possible that the empty xacts
happen irregularly after the specified interval and then we still end
up sending them all?  I might be missing something here, so can you
please explain your idea in detail?  Basically, how will it work and
how will it solve the problem?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Dilip Kumar
Date:
On Wed, Mar 4, 2020 at 9:12 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
> <euler.taveira@2ndquadrant.com> wrote:
> >
> > On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:
> >>
> >>
> >> Another idea could be that we stream the transaction after some
> >> threshold number (say 100 or anything we think is reasonable) of empty
> >> xacts.  This will reduce the traffic without tinkering with the core
> >> design too much.
> >>
> >>
> > Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.
> >
>
> Do you mean to say that if for some threshold interval we didn't
> stream any transaction, then we can send the next empty transaction to
> the subscriber?  If so, then isn't it possible that the empty xacts
> happen irregularly after the specified interval and then we still end
> up sending them all.  I might be missing something here, so can you
> please explain your idea in detail?  Basically, how will it work and
> how will it solve the problem.

IMHO, the threshold should be based on the commit LSN.  The main
reason we want to send empty transactions after a certain
transaction count/duration is that we want the restart_lsn to keep moving
forward, so that if we need to restart the replication slot we don't
need to process a lot of extra WAL.  So assume we set the threshold
based on transaction count; then there is still a possibility that we
might process a few very big transactions, and we will have to process
them again after the restart.  OTOH, if we set it based on an interval,
then even if there is not much work going on, we still end up sending
the empty transactions, as pointed out by Amit.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Wed, Mar 4, 2020 at 9:12 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
> > <euler.taveira@2ndquadrant.com> wrote:
> > >
> > > On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >>
> > >>
> > >> Another idea could be that we stream the transaction after some
> > >> threshold number (say 100 or anything we think is reasonable) of empty
> > >> xacts.  This will reduce the traffic without tinkering with the core
> > >> design too much.
> > >>
> > >>
> > > Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.
> > >
> >
> > Do you mean to say that if for some threshold interval we didn't
> > stream any transaction, then we can send the next empty transaction to
> > the subscriber?  If so, then isn't it possible that the empty xacts
> > happen irregularly after the specified interval and then we still end
> > up sending them all.  I might be missing something here, so can you
> > please explain your idea in detail?  Basically, how will it work and
> > how will it solve the problem.
>
> IMHO, the threshold should be based on the commit LSN.  Our main
> reason we want to send empty transactions after a certain
> transaction/duration is that we want the restart_lsn to be moving
> forward so that if we need to restart the replication slot we don't
> need to process a lot of extra WAL.  So assume we set the threshold
> based on transaction count then there is still a possibility that we
> might process a few very big transactions then we will have to process
> them again after the restart.
>

Won't the subscriber eventually send the flush location for the large
transactions which will move the restart_lsn?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Dilip Kumar
Date:
On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Wed, Mar 4, 2020 at 9:12 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > On Wed, Mar 4, 2020 at 7:17 AM Euler Taveira
> > > <euler.taveira@2ndquadrant.com> wrote:
> > > >
> > > > On Tue, 3 Mar 2020 at 05:24, Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > >>
> > > >>
> > > >> Another idea could be that we stream the transaction after some
> > > >> threshold number (say 100 or anything we think is reasonable) of empty
> > > >> xacts.  This will reduce the traffic without tinkering with the core
> > > >> design too much.
> > > >>
> > > >>
> > > > Amit, I suggest an interval to control this setting. Time is something we have control; transactions aren't (depending on workload). pg_stat_replication query interval usually is not milliseconds, however, you can execute thousands of transactions in a second. If we agree on that idea I can add it to the patch.
> > > >
> > >
> > > Do you mean to say that if for some threshold interval we didn't
> > > stream any transaction, then we can send the next empty transaction to
> > > the subscriber?  If so, then isn't it possible that the empty xacts
> > > happen irregularly after the specified interval and then we still end
> > > up sending them all.  I might be missing something here, so can you
> > > please explain your idea in detail?  Basically, how will it work and
> > > how will it solve the problem.
> >
> > IMHO, the threshold should be based on the commit LSN.  Our main
> > reason we want to send empty transactions after a certain
> > transaction/duration is that we want the restart_lsn to be moving
> > forward so that if we need to restart the replication slot we don't
> > need to process a lot of extra WAL.  So assume we set the threshold
> > based on transaction count then there is still a possibility that we
> > might process a few very big transactions then we will have to process
> > them again after the restart.
> >
>
> Won't the subscriber eventually send the flush location for the large
> transactions which will move the restart_lsn?

I meant large empty transactions (basically, ones for which we cannot send
anything to the subscriber).  So my point was about the case where there are
only large transactions in the system which we cannot stream because those
tables are not published.  Then keeping a threshold based on transaction
count will not help much, because even if we don't reach the
transaction count threshold, we still might need to process a lot of
data again if we don't stream the commit for the empty transactions.  So
instead of tracking the transaction count, can we track the LSN, and if the
LSN difference since we last streamed some change crosses the threshold, then
we stream the next empty transaction?
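
The LSN-distance variant could be sketched like this. Again this is only a toy model under stated assumptions: `LsnDistanceThrottle` and `max_gap_bytes` are hypothetical names, LSNs are treated as byte offsets held in integers, and the 16 MiB gap is an arbitrary value picked for the example.

```python
MIB = 1024 * 1024


class LsnDistanceThrottle:
    """Send an empty xact once enough WAL has passed since the last transmission."""

    def __init__(self, max_gap_bytes: int) -> None:
        self.max_gap = max_gap_bytes
        self.last_sent_lsn = 0  # commit LSN of the last xact that was streamed

    def should_send(self, commit_lsn: int, is_empty: bool) -> bool:
        # Non-empty xacts are always sent; empty ones only when the WAL
        # distance since the last transmission reaches the threshold.
        if not is_empty or commit_lsn - self.last_sent_lsn >= self.max_gap:
            self.last_sent_lsn = commit_lsn
            return True
        return False


throttle = LsnDistanceThrottle(max_gap_bytes=16 * MIB)

# Four empty transactions committing at 1, 10, 20 and 21 MiB of WAL.
commit_lsns = [1 * MIB, 10 * MIB, 20 * MIB, 21 * MIB]
decisions = [throttle.should_send(lsn, is_empty=True) for lsn in commit_lsns]
print(decisions)
```

Unlike a transaction counter, this reacts to a handful of very large unpublished transactions: only the commit at 20 MiB is sent here, because it is the first one at least 16 MiB past the last transmission.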

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Mar 4, 2020 at 11:16 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > >
> > >
> > > IMHO, the threshold should be based on the commit LSN.  Our main
> > > reason we want to send empty transactions after a certain
> > > transaction/duration is that we want the restart_lsn to be moving
> > > forward so that if we need to restart the replication slot we don't
> > > need to process a lot of extra WAL.  So assume we set the threshold
> > > based on transaction count then there is still a possibility that we
> > > might process a few very big transactions then we will have to process
> > > them again after the restart.
> > >
> >
> > Won't the subscriber eventually send the flush location for the large
> > transactions which will move the restart_lsn?
>
> I meant large empty transactions (basically we can not send anything
> to the subscriber).  So my point was if there are only large
> transactions in the system which we can not stream because those
> tables are not published.  Then keeping threshold based on transaction
> count will not help much because even if we don't reach the
> transaction count threshold, we still might need to process a lot of
> data if we don't stream the commit for the empty transactions.  So
> instead of tracking transaction count can we track LSN,   and LSN
> different since we last stream some change cross the threshold then we
> will stream the next empty transaction.
>

You have a point, and it may be better to base the threshold on LSN
if we want to keep any threshold, but basing it on transaction count
seems to be a bit more straightforward.  Let us see if anyone else has
an opinion on this matter.

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Dilip Kumar
Date:
On Wed, Mar 4, 2020 at 3:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Mar 4, 2020 at 11:16 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> >
> > On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > >
> > > On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > > >
> > > >
> > > > IMHO, the threshold should be based on the commit LSN.  Our main
> > > > reason we want to send empty transactions after a certain
> > > > transaction/duration is that we want the restart_lsn to be moving
> > > > forward so that if we need to restart the replication slot we don't
> > > > need to process a lot of extra WAL.  So assume we set the threshold
> > > > based on transaction count then there is still a possibility that we
> > > > might process a few very big transactions then we will have to process
> > > > them again after the restart.
> > > >
> > >
> > > Won't the subscriber eventually send the flush location for the large
> > > transactions which will move the restart_lsn?
> >
> > I meant large empty transactions (basically we can not send anything
> > to the subscriber).  So my point was if there are only large
> > transactions in the system which we can not stream because those
> > tables are not published.  Then keeping threshold based on transaction
> > count will not help much because even if we don't reach the
> > transaction count threshold, we still might need to process a lot of
> > data if we don't stream the commit for the empty transactions.  So
> > instead of tracking transaction count can we track LSN,   and LSN
> > different since we last stream some change cross the threshold then we
> > will stream the next empty transaction.
> >
>
> You have a point and it may be better to keep threshold based on LSN
> if we want to keep any threshold, but keeping on transaction count
> seems to be a bit straightforward.  Let us see if anyone else has any
> opinion on this matter?

OK, that makes sense.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Mar 4, 2020 at 4:04 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Wed, Mar 4, 2020 at 3:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Wed, Mar 4, 2020 at 11:16 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > >
> > > On Wed, Mar 4, 2020 at 10:50 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > > >
> > > > On Wed, Mar 4, 2020 at 9:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
> > > > >
> > > > >
> > > > > IMHO, the threshold should be based on the commit LSN.  Our main
> > > > > reason we want to send empty transactions after a certain
> > > > > transaction/duration is that we want the restart_lsn to be moving
> > > > > forward so that if we need to restart the replication slot we don't
> > > > > need to process a lot of extra WAL.  So assume we set the threshold
> > > > > based on transaction count then there is still a possibility that we
> > > > > might process a few very big transactions then we will have to process
> > > > > them again after the restart.
> > > > >
> > > >
> > > > Won't the subscriber eventually send the flush location for the large
> > > > transactions which will move the restart_lsn?
> > >
> > > I meant large empty transactions (basically we can not send anything
> > > to the subscriber).  So my point was if there are only large
> > > transactions in the system which we can not stream because those
> > > tables are not published.  Then keeping threshold based on transaction
> > > count will not help much because even if we don't reach the
> > > transaction count threshold, we still might need to process a lot of
> > > data if we don't stream the commit for the empty transactions.  So
> > > instead of tracking transaction count can we track LSN,   and LSN
> > > different since we last stream some change cross the threshold then we
> > > will stream the next empty transaction.
> > >
> >
> > You have a point and it may be better to keep threshold based on LSN
> > if we want to keep any threshold, but keeping on transaction count
> > seems to be a bit straightforward.  Let us see if anyone else has any
> > opinion on this matter?
>
> Ok, that make sense.
>

Euler, can we try to update the patch based on the number of
transactions threshold and see how it works?

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: logical replication empty transactions

From
Euler Taveira
Date:
On Thu, 5 Mar 2020 at 05:45, Amit Kapila <amit.kapila16@gmail.com> wrote:
> Euler, can we try to update the patch based on the number of
> transactions threshold and see how it works?

Will do.


--
Euler Taveira                 http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Re: logical replication empty transactions

From
Craig Ringer
Date:
On Mon, 2 Mar 2020 at 19:26, Amit Kapila <amit.kapila16@gmail.com> wrote:

> One thing that is not clear to me is how will we advance restart_lsn
> if we don't send any empty xact in a system where there are many such
> xacts?

Same way we already do it for writes that are not replicated over
logical replication, like vacuum work etc. The upstream sends feedback
with reply-requested. The downstream replies. The upstream advances
confirmed_flush_lsn, and that lazily updates restart_lsn.

The bigger issue here is that if you don't send empty txns on logical
replication you don't get an eager, timely response from the
replica(s), which delays synchronous replication. You need to send
empty txns when synchronous replication is enabled, or instead poke
the walsender to force immediate feedback with reply requested.


-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 2ndQuadrant - PostgreSQL Solutions for the Enterprise



Re: logical replication empty transactions

From
Andres Freund
Date:
Hi,

On 2020-03-06 13:53:02 +0800, Craig Ringer wrote:
> On Mon, 2 Mar 2020 at 19:26, Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> > One thing that is not clear to me is how will we advance restart_lsn
> > if we don't send any empty xact in a system where there are many such
> > xacts?
> 
> Same way we already do it for writes that are not replicated over
> logical replication, like vacuum work etc. The upstream sends feedback
> with reply-requested. The downstream replies. The upstream advances
> confirmed_flush_lsn, and that lazily updates restart_lsn.

It'll still delay it a bit.


> The bigger issue here is that if you don't send empty txns on logical
> replication you don't get an eager, timely response from the
> replica(s), which delays synchronous replication. You need to send
> empty txns when synchronous replication is enabled, or instead poke
> the walsender to force immediate feedback with reply requested.

Somewhat independent from the issue at hand: It'd be really good if we
could evolve the syncrep framework to support per-database waiting... It
shouldn't be that hard, and the current situation sucks quite a bit (and
yes, I'm to blame).

I'm not quite sure what you mean by "poke the walsender"? Kinda sounds
like sending a signal, but decoding happens inside the walsender,
so there's no need for that. Do you just mean somehow requesting that
walsender sends a feedback message?

To address the volume we could:

1a) Introduce a pgoutput message type to indicate that the LSN has
  advanced, without needing separate BEGIN/COMMIT. Right now BEGIN is
  21 bytes, COMMIT is 26. But we really don't need that much here. A
  single message should do the trick.

1b) Add a LogicalOutputPluginWriterUpdateProgress parameter (and
  possibly rename) that indicates that we are intentionally "ignoring"
  WAL. For walsender that callback then could check if it could just
  forward the position of the client (if it was entirely caught up
  before), or if it should send a feedback request (if syncrep is
  enabled, or distance is big).

2) Reduce the rate of 'empty transaction'/feedback request messages. If
  we know that we're not going to be blocked waiting for more WAL, or
  blocked sending messages out to the network, we don't immediately need
  to send out the messages. Instead we could continue decoding until
  there's actual data, or until we're going to get blocked.

  We could e.g. have a new LogicalDecodingContext callback that is
  called whenever WalSndWaitForWal() would wait. That'd check if there's
  a pending "need" to send out a 'empty transaction'/feedback request
  message. The "need" flag would get cleared whenever we send out data
  bearing an LSN for other reasons.
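
Option 2 above (a pending "need" flag that is flushed only when the sender is about to block) can be sketched as follows. This is a standalone illustration under stated assumptions, not walsender code: `ProgressState`, `skip_empty_txn`, `send_data`, and `before_wait_for_wal` are hypothetical names, and the last one only stands in for a callback invoked where WalSndWaitForWal() would wait.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr. */
typedef uint64_t Lsn;

/* Hypothetical per-sender state; names are illustrative only. */
typedef struct ProgressState
{
    bool     feedback_needed;  /* skipped WAL not yet reported downstream */
    Lsn      skipped_upto;     /* end of the WAL decoded but not sent */
    Lsn      last_sent_lsn;    /* LSN carried by the last message sent */
    unsigned messages_sent;    /* counter, for illustration */
} ProgressState;

/* An empty transaction was decoded: remember it, send nothing yet. */
static void
skip_empty_txn(ProgressState *s, Lsn end_lsn)
{
    assert(end_lsn >= s->skipped_upto);
    s->feedback_needed = true;
    s->skipped_upto = end_lsn;
}

/* Real data went out; it carries an LSN, so the pending need is cleared. */
static void
send_data(ProgressState *s, Lsn end_lsn)
{
    s->last_sent_lsn = end_lsn;
    s->messages_sent++;
    s->feedback_needed = false;
}

/*
 * Called just before the sender would block waiting for more WAL.
 * Returns true if a single progress/feedback-request message was sent.
 */
static bool
before_wait_for_wal(ProgressState *s)
{
    if (!s->feedback_needed)
        return false;
    s->last_sent_lsn = s->skipped_upto;
    s->messages_sent++;
    s->feedback_needed = false;
    return true;
}
```

Any number of skipped empty transactions collapse into at most one message per wait, and none at all if real data goes out first.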

Greetings,

Andres Freund



Re: logical replication empty transactions

From
Craig Ringer
Date:
On Tue, 10 Mar 2020 at 02:30, Andres Freund <andres@anarazel.de> wrote:
> Hi,
>
> On 2020-03-06 13:53:02 +0800, Craig Ringer wrote:
> > On Mon, 2 Mar 2020 at 19:26, Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > > One thing that is not clear to me is how will we advance restart_lsn
> > > if we don't send any empty xact in a system where there are many such
> > > xacts?
> >
> > Same way we already do it for writes that are not replicated over
> > logical replication, like vacuum work etc. The upstream sends feedback
> > with reply-requested. The downstream replies. The upstream advances
> > confirmed_flush_lsn, and that lazily updates restart_lsn.
>
> It'll still delay it a bit.

Right, but we don't generally care because there's no sync rep txn waiting for confirmation. If we lose progress due to a crash it doesn't matter. It does delay removal of old WAL a little, but it hardly matters.
 
> Somewhat independent from the issue at hand: It'd be really good if we
> could evolve the syncrep framework to support per-database waiting... It
> shouldn't be that hard, and the current situation sucks quite a bit (and
> yes, I'm to blame).

Hardly, you just didn't get the chance to fix that on top of the umpteen other things you had to change to make all the logical stuff work. You didn't break it, just didn't implement every single possible enhancement all at once. Shocking, I tell you.


> I'm not quite sure what you mean by "poke the walsender"? Kinda sounds
> like sending a signal, but decoding happens inside after the walsender,
> so there's no need for that. Do you just mean somehow requesting that
> walsender sends a feedback message?

Right. I had in mind something like sending a ProcSignal via our funky multiplexed signal mechanism to ask the walsender to immediately generate a keepalive message with a reply-requested flag, then set the walsender's latch so we wake it promptly.
 
> To address the volume we could:
>
> 1a) Introduce a pgoutput message type to indicate that the LSN has
>   advanced, without needing separate BEGIN/COMMIT. Right now BEGIN is
>   21 bytes, COMMIT is 26. But we really don't need that much here. A
>   single message should do the trick.

It would. Is it worth caring though? Especially since it seems rather unlikely that the actual network data volume of begin/commit msgs will be much of a concern. It's not like we're PITRing logical streams, and if we did, we could just filter out empty commits on the receiver side.

That message pretty much already exists in the form of a walsender keepalive anyway so we might as well re-use that and not upset the protocol.
 
> 1b) Add a LogicalOutputPluginWriterUpdateProgress parameter (and
>   possibly rename) that indicates that we are intentionally "ignoring"
>   WAL. For walsender that callback then could check if it could just
>   forward the position of the client (if it was entirely caught up
>   before), or if it should send a feedback request (if syncrep is
>   enabled, or distance is big).

I can see something like that being very useful, because at present only the output plugin knows if a txn is "empty" as far as that particular slot and output plugin is concerned. The reorder buffering mechanism cannot do relation-level filtering before it sends the changes to the output plugin during ReorderBufferCommit, since it only knows about relfilenodes not relation oids. And the output plugin might be doing finer grained filtering using row-filter expressions or who knows what else.

But as described above that will only help for txns done in DBs other than the one the logical slot is for or txns known to have an empty ReorderBuffer when the commit is seen.

If there's a txn in the slot's db with a non-empty reorderbuffer, the output plugin won't know if the txn is empty or not until it finishes processing all callbacks and sees the commit for the txn. So it will generally have emitted the Begin message on the wire by the time it knows it has nothing useful to say. And Pg won't know that this txn is empty as far as this output plugin with this particular slot, set of output plugin params, and current user-catalog state is concerned, so it won't have any way to call the output plugin's "update progress" callback instead of the usual begin/change/commit callbacks.

But I think we can already skip empty txns unless sync-rep is enabled with no core changes, and send empty txns as walsender keepalives instead, by altering only output plugins, like this:

* Stash the BEGIN data in the plugin's LogicalDecodingContext.output_plugin_private when the plugin's begin callback is called; don't write anything to the output stream.
* Write out the BEGIN message lazily when any other callback generates a message that does need to be written out.
* If no BEGIN has been written by the time the COMMIT callback is called, discard the COMMIT too. Check whether sync rep is enabled: if it is, call LogicalDecodingContext.update_progress from within the output plugin's commit handler (probably by calling OutputPluginUpdateProgress()); otherwise just ignore the commit entirely.
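
The three steps above can be sketched as a small standalone state machine. This is only an illustration under stated assumptions, not pgoutput code: `PluginState` stands in for whatever the plugin keeps in output_plugin_private, the `on_*` functions stand in for the plugin callbacks, and the counters replace real writes to the output stream. How the plugin learns that sync rep is enabled is assumed, not shown.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical plugin-private state, standing in for output_plugin_private. */
typedef struct PluginState
{
    bool     begin_pending;     /* BEGIN stashed but not written yet */
    uint32_t stashed_xid;       /* data needed to write BEGIN later */
    bool     sync_rep_enabled;  /* assumed to be known to the plugin */
    unsigned msgs_written;      /* messages put on the wire */
    unsigned progress_updates;  /* progress updates sent instead */
} PluginState;

/* Begin callback: stash the data, write nothing. */
static void
on_begin(PluginState *st, uint32_t xid)
{
    assert(!st->begin_pending);     /* previous transaction must be finished */
    st->begin_pending = true;
    st->stashed_xid = xid;
}

/* Write the stashed BEGIN if it has not gone out yet. */
static void
flush_begin(PluginState *st)
{
    if (st->begin_pending)
    {
        st->msgs_written++;         /* BEGIN */
        st->begin_pending = false;
    }
}

/* Change callback: 'published' says whether the change passes the filter. */
static void
on_change(PluginState *st, bool published)
{
    if (!published)
        return;
    flush_begin(st);
    st->msgs_written++;             /* the change itself */
}

/* Commit callback. Returns true if a COMMIT message was written. */
static bool
on_commit(PluginState *st)
{
    if (st->begin_pending)
    {
        /* Empty transaction: drop BEGIN and COMMIT together. */
        st->begin_pending = false;
        if (st->sync_rep_enabled)
            st->progress_updates++; /* stand-in for a progress update */
        return false;
    }
    st->msgs_written++;             /* COMMIT */
    return true;
}
```

A transaction whose changes are all filtered out produces no messages, only a progress update when sync rep is enabled; a transaction with two published changes produces four messages (BEGIN, two changes, COMMIT).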

>   We could e.g. have a new LogicalDecodingContext callback that is
>   called whenever WalSndWaitForWal() would wait. That'd check if there's
>   a pending "need" to send out a 'empty transaction'/feedback request
>   message. The "need" flag would get cleared whenever we send out data
>   bearing an LSN for other reasons.

I can see that being handy, yes. But it won't necessarily help with the sync rep issue, since other sync rep txns may continue to generate WAL while others wait for commit-confirmations that won't come from the logical replica.

While we're speaking of adding output plugin hooks, I keep on trying to think of a sensible way to do a plugin-defined reply handler, so the downstream end can send COPY BOTH messages of some new msgkind back to the walsender, which will pass them to the output plugin if it implements the appropriate handle_reply_message (or whatever) callback. That much is trivial to implement. Where I keep getting a bit stuck is whether there's a sensible snapshot that can be set to call the output plugin reply handler with. We wouldn't want to switch to a current non-historic snapshot because of all the cache flushes that'd cause, but there isn't necessarily a valid and safe historic snapshot to set when we're not within ReorderBufferCommit, is there?

I'd love to get rid of the need to "connect back" to a provider over plain libpq connections to communicate with it. The ability to run SQL on the walsender conn helps. But really, so much more would be possible if we could just have the downstream end *reply* on the same connection using COPY BOTH, much like it sends replay progress updates right now. It'd let us manage relation/attribute/type metadata caches better for example. 

Thoughts?

--
 Craig Ringer                   http://www.2ndQuadrant.com/
 2ndQuadrant - PostgreSQL Solutions for the Enterprise

Re: logical replication empty transactions

From
Ajin Cherian
Date:
The patch no longer applies, because of additions in the test source. Otherwise, I have tested the patch and confirmed
that updates and deletes on tables with deferred primary keys work with logical replication.

The new status of this patch is: Waiting on Author

Re: logical replication empty transactions

From
Ajin Cherian
Date:
Sorry, I replied in the wrong thread. Please ignore above mail.

Re: logical replication empty transactions

From
Rahila Syed
Date:

Hi,

Please see below my review of 0001-Skip-empty-transactions-for-logical-replication.patch.

The make check passes.


 +               /* output BEGIN if we haven't yet */
 +               if (!data->xact_wrote_changes)
 +                       pgoutput_begin(ctx, txn);
 +
 +               data->xact_wrote_changes = true;
 +
IMO, the xact_wrote_changes flag is better set inside the if block, as it does not need to
be set again on subsequent calls to the same function.


> * Stash BEGIN data in plugin's LogicalDecodingContext.output_plugin_private when plugin's begin callback called, don't write anything to the outstream
> * Write out BEGIN message lazily when any other callback generates a message that does need to be written out
> * If no BEGIN written by the time COMMIT callback called, discard the COMMIT too. Check if sync rep enabled. if it is, call LogicalDecodingContext.update_progress from within the output plugin commit handler, otherwise just ignore the commit totally. Probably by calling OutputPluginUpdateProgress().


I think the code in the patch is similar to what has been described by Craig in the above snippet, 
except instead of stashing the BEGIN message and sending the message lazily, it simply maintains a flag
in LogicalDecodingContext.output_plugin_private which defers calling output plugin's begin callback,
until any other callback actually generates a remote write. 

Also, the patch does not contain the last part where he describes having OutputPluginUpdateProgress()
for synchronous replication enabled transactions. 
However, some basic testing suggests that the patch does not have any notable adverse effect on
either the replication lag or the sync_rep performance. 

I performed tests by setting up publisher and subscriber on the same machine with synchronous_commit = on and
ran pgbench -c 12 -j 6 -T 300 on unpublished pgbench tables. 

I see that  confirmed_flush_lsn is catching up just fine without any notable delay as compared to the test results without
the patch.

Also, the TPS for synchronous replication of empty txns with and without the patch remains similar.

Having said that, these are initial findings and I understand better performance tests are required to measure
reduction in consumption of network bandwidth and impact on synchronous replication and replication lag.

Thank you,
Rahila Syed



Re: logical replication empty transactions

From
Michael Paquier
Date:
On Wed, Jul 29, 2020 at 08:08:06PM +0530, Rahila Syed wrote:
> The make check passes.

Since then, the patch has been failing to apply and waiting on author,
and the thread died six weeks or so ago, so I am marking it as RwF in
the CF.
--
Michael

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:


On Thu, Sep 17, 2020 at 3:29 PM Michael Paquier <michael@paquier.xyz> wrote:
> On Wed, Jul 29, 2020 at 08:08:06PM +0530, Rahila Syed wrote:
> > The make check passes.
>
> Since then, the patch is failing to apply, waiting on author and the
> thread has died 6 weeks or so ago, so I am marking it as RwF in the
> CF.


I've rebased the patch and made changes so that the patch supports "streaming in-progress transactions" and handling of logical decoding
messages (transactional and non-transactional).
I see that this patch not only makes sure that empty transactions are not sent but also calls OutputPluginUpdateProgress when an empty
transaction is skipped; as a result, confirmed_flush_lsn keeps moving. I also see no hangs when a synchronous standby is configured.
Do let me know your thoughts on this patch. 

regards,
Ajin Cherian
Fujitsu Australia
Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:


On Thu, Apr 15, 2021 at 1:29 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> I've rebased the patch and made changes so that the patch supports "streaming in-progress transactions" and handling of logical decoding
> messages (transactional and non-transactional).
> I see that this patch not only makes sure that empty transactions are not sent but also does call OutputPluginUpdateProgress when an empty
> transaction is not sent, as a result the confirmed_flush_lsn is kept moving. I also see no hangs when synchronous_standby is configured.
> Do let me know your thoughts on this patch.


Removed some debug logs and typos. 

regards,
Ajin Cherian
Fujitsu Australia 
Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
On Thu, Apr 15, 2021 at 4:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
>
>
> On Thu, Apr 15, 2021 at 1:29 PM Ajin Cherian <itsajin@gmail.com> wrote:
>>
>>
>> I've rebased the patch and made changes so that the patch supports "streaming in-progress transactions" and handling of logical decoding
>> messages (transactional and non-transactional).
>> I see that this patch not only makes sure that empty transactions are not sent but also does call OutputPluginUpdateProgress when an empty
>> transaction is not sent, as a result the confirmed_flush_lsn is kept moving. I also see no hangs when synchronous_standby is configured.
>> Do let me know your thoughts on this patch.

REVIEW COMMENTS

I applied this patch to today's HEAD and successfully ran "make check"
and also the subscription TAP tests.

Here are some review comments:

------

1. The patch v3 applied OK but with whitespace warnings

[postgres@CentOS7-x64 oss_postgres_2PC]$ git apply
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98:
indent with spaces.
    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99:
indent with spaces.
    if (!data->xact_wrote_changes && !in_streaming && transactional)
warning: 2 lines add whitespace errors.

------

2. Please create a CF entry in [1] for this patch.

------

3. Patch comment

The comment  describes the problem and then suddenly just says
"Postpone the BEGIN message until the first change."

I suggest changing it to say more like... "(blank line) This patch
addresses the above problem by postponing the BEGIN message until the
first change."

------

4. pgoutput.h

Maybe for consistency with the context member, the comment for the new
member should be to the right instead of above it?

@@ -20,6 +20,9 @@ typedef struct PGOutputData
  MemoryContext context; /* private memory context for transient
  * allocations */

+ /* flag indicating whether messages have previously been sent */
+ bool        xact_wrote_changes;
+

------

5. pgoutput.h

+ /* flag indicating whether messages have previously been sent */

"previously been sent" --> "already been sent" ??

------

6. pgoutput.h - misleading member name

Actually, now that I have read all the rest of the code and how this
member is used, I feel that this name is very misleading. e.g. in the
"streaming" case you are still writing changes but never set this
member at all, so it does not always mean what it says.

I feel a better name for this would be something like
"sent_begin_txn". Then if you have sent BEGIN it is true. If you
haven't sent BEGIN it is false. It eliminates all ambiguity naming it
this way instead.

(This makes my feedback #5 redundant because the comment will be a bit
different if you do this).

------

7. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {

I guess that you still needed to pass the txn because that is how the
API is documented, right?

But I am wondering if you ought to flag it as unused so you won't get
some buildfarm (BF) machine giving warnings about it.

e.g. Syntax like this?

pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) {
(void)txn;
...

------

8. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputData *data = ctx->output_plugin_private;
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->xact_wrote_changes = false;
+ elog(LOG,"Holding of begin");
+}

Why is this loglevel LOG? Looks like leftover debugging.

------

9. pgoutput.c - function pgoutput_commit_txn

@@ -384,8 +401,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputData *data = ctx->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /* skip COMMIT message if nothing was sent */
+ if (!data->xact_wrote_changes)
+ return;
+

In the case where you decided to do nothing does it make sense that
you still called the function OutputPluginUpdateProgress(ctx); ?
I thought perhaps that your new check should come first so this call
would never happen.

------

10. pgoutput.c - variable declarations without casts

+ PGOutputData *data = ctx->output_plugin_private;

I noticed the new stack variables you declare have no casts.

This differs from the existing code which always looks like:
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

There are a couple of examples of this so please search new code to
find them all.

------

11. pgoutput.c - function pgoutput_change

@@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Assert(false);
  }

+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

If the variable is renamed as previously suggested, then the assignment
data->sent_begin_txn = true; can be made in just one common place,
inside the pgoutput_begin function.

------

12. pgoutput.c - pgoutput_truncate function

@@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested, then the assignment
data->sent_begin_txn = true; can be made in just one common place,
inside the pgoutput_begin function.

------

13. pgoutput.c - pgoutput_message

@@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  if (in_streaming)
  xid = txn->xid;

+    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+    if (!data->xact_wrote_changes && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested, then the assignment
data->sent_begin_txn = true; can be made in just one common place,
inside the pgoutput_begin function.

------

14. Test Code.

I noticed that there is no test code specifically for seeing if empty
transactions get sent or not. Is it possible to write such a test or
is this traffic improvement only observable using the debugger?

------
[1] https://commitfest.postgresql.org/33/

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:


On Mon, Apr 19, 2021 at 6:22 PM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some review comments:

------

1. The patch v3 applied OK but with whitespace warnings

[postgres@CentOS7-x64 oss_postgres_2PC]$ git apply
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98:
indent with spaces.
    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99:
indent with spaces.
    if (!data->xact_wrote_changes && !in_streaming && transactional)
warning: 2 lines add whitespace errors.

------

Fixed.
 

2. Please create a CF entry in [1] for this patch.

------

3. Patch comment

The comment  describes the problem and then suddenly just says
"Postpone the BEGIN message until the first change."

I suggest changing it to say more like... "(blank line) This patch
addresses the above problem by postponing the BEGIN message until the
first change."

------

 
Updated.
 
4. pgoutput.h

Maybe for consistency with the context member, the comment for the new
member should be to the right instead of above it?

@@ -20,6 +20,9 @@ typedef struct PGOutputData
  MemoryContext context; /* private memory context for transient
  * allocations */

+ /* flag indicating whether messages have previously been sent */
+ bool        xact_wrote_changes;
+

------

5. pgoutput.h

+ /* flag indicating whether messages have previously been sent */

"previously been sent" --> "already been sent" ??

------

6. pgoutput.h - misleading member name

Actually, now that I have read all the rest of the code and how this
member is used I feel that this name is very misleading. e.g. For
"streaming" case then you still are writing changes but are not
setting this member at all - therefore it does not always mean what it
says.

I feel a better name for this would be something like
"sent_begin_txn". Then if you have sent BEGIN it is true. If you
haven't sent BEGIN it is false. It eliminates all ambiguity naming it
this way instead.

(This makes my feedback #5 redundant because the comment will be a bit
different if you do this).

------

Fixed above comments.

7. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {

I guess that you still needed to pass the txn because that is how the
API is documented, right?

But I am wondering if you ought to flag it as unused so you wont get
some BF machine giving warnings about it.

e.g. Syntax like this?

pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) {
(void)txn;
...
 
Updated.
------

8. pgoutput.c - function pgoutput_begin_txn

@@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputData *data = ctx->output_plugin_private;
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->xact_wrote_changes = false;
+ elog(LOG,"Holding of begin");
+}

Why is this loglevel LOG? Looks like leftover debugging.

Removed. 

------

9. pgoutput.c - function pgoutput_commit_txn

@@ -384,8 +401,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputData *data = ctx->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /* skip COMMIT message if nothing was sent */
+ if (!data->xact_wrote_changes)
+ return;
+

In the case where you decided to do nothing does it make sense that
you still called the function OutputPluginUpdateProgress(ctx); ?
I thought perhaps that your new check should come first so this call
would never happen.

Even though the empty transaction is not sent, the LSN is tracked as decoded, hence the progress needs to be updated.


------

10. pgoutput.c - variable declarations without casts

+ PGOutputData *data = ctx->output_plugin_private;

I noticed the new stack variable you declare have no casts.

This differs from the existing code which always looks like:
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

There are a couple of examples of this so please search new code to
find them all.

-----

Fixed.
 
11. pgoutput.c - function pgoutput_change

@@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Assert(false);
  }

+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------
 
Updated. 

12. pgoutput.c - pgoutput_truncate function

@@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ /* output BEGIN if we haven't yet */
+ if (!data->xact_wrote_changes && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

13. pgoutput.c - pgoutput_message

@@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  if (in_streaming)
  xid = txn->xid;

+    /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+    if (!data->xact_wrote_changes && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ data->xact_wrote_changes = true;
+ }

(same comment as above)

If the variable is renamed as previously suggested then the assignment
data->sent_BEGIN_txn = true; can be assigned in just 1 common place
INSIDE the pgoutput_begin function.

------

Fixed. 

14. Test Code.

I noticed that there is no test code specifically for seeing if empty
transactions get sent or not. Is it possible to write such a test or
is this traffic improvement only observable using the debugger?


The 020_messages.pl actually has a test case for tracking empty messages, even though it is part of the messages test.

regards,
Ajin Cherian
Fujitsu Australia 
Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
An earlier comment from Anders:
> We could e.g. have a new LogicalDecodingContext callback that is
> called whenever WalSndWaitForWal() would wait. That'd check if there's
> a pending "need" to send out a 'empty transaction'/feedback request
> message. The "need" flag would get cleared whenever we send out data
> bearing an LSN for other reasons.
>

I think the current Keep Alive messages already achieve this by
sending the current LSN as part of the Keep Alive messages.
    /* construct the message... */
    resetStringInfo(&output_message);
    pq_sendbyte(&output_message, 'k');
    pq_sendint64(&output_message, sentPtr); <=== Last sent WAL LSN
    pq_sendint64(&output_message, GetCurrentTimestamp());
    pq_sendbyte(&output_message, requestReply ? 1 : 0);

I'm not sure if anything more is required to keep empty transactions
updated as part of synchronous replicas. If my understanding on this
is not correct, let me know.
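
For reference, the layout that snippet produces can be sketched in standalone C. This is only an illustration: build_keepalive, put_int64 and get_int64 are hypothetical helpers, and the sketch assumes the 64-bit fields go out in network byte order, as pq_sendint64 does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Size of the sketched message: 1 type byte + 8 + 8 + 1 reply byte. */
#define KEEPALIVE_LEN 18

/* Write a 64-bit value in network byte order (big-endian). */
static void
put_int64(uint8_t *dst, uint64_t value)
{
    for (int i = 0; i < 8; i++)
        dst[i] = (uint8_t) (value >> (56 - 8 * i));
}

/* Read back a 64-bit big-endian value. */
static uint64_t
get_int64(const uint8_t *src)
{
    uint64_t    value = 0;

    for (int i = 0; i < 8; i++)
        value = (value << 8) | src[i];
    return value;
}

/*
 * Fill buf (at least KEEPALIVE_LEN bytes) with 'k', the last sent WAL
 * position, a timestamp, and the reply-requested flag.
 */
static size_t
build_keepalive(uint8_t *buf, uint64_t sent_ptr, int64_t now, int request_reply)
{
    assert(buf != NULL);
    buf[0] = 'k';
    put_int64(buf + 1, sent_ptr);
    put_int64(buf + 9, (uint64_t) now);
    buf[17] = request_reply ? 1 : 0;
    return KEEPALIVE_LEN;
}
```

A receiver that reads bytes 1 to 8 of such a message learns the sender's last sent WAL position even when no transaction data was sent.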

regards,
Ajin Cherian
Fujitsu Australia



Re: logical replication empty transactions

From
Peter Smith
Date:
On Fri, Apr 23, 2021 at 3:46 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
>
>
> On Mon, Apr 19, 2021 at 6:22 PM Peter Smith <smithpb2250@gmail.com> wrote:
>>
>>
>> Here are some review comments:
>>
>> ------
>>
>> 1. The patch v3 applied OK but with whitespace warnings
>>
>> [postgres@CentOS7-x64 oss_postgres_2PC]$ git apply
>> ../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch
>> ../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:98:
>> indent with spaces.
>>     /* output BEGIN if we haven't yet, avoid for streaming and
>> non-transactional messages */
>> ../patches_misc/v3-0001-Skip-empty-transactions-for-logical-replication.patch:99:
>> indent with spaces.
>>     if (!data->xact_wrote_changes && !in_streaming && transactional)
>> warning: 2 lines add whitespace errors.
>>
>> ------
>
>
> Fixed.
>
>>
>>
>> 2. Please create a CF entry in [1] for this patch.
>>
>> ------
>>
>> 3. Patch comment
>>
>> The comment  describes the problem and then suddenly just says
>> "Postpone the BEGIN message until the first change."
>>
>> I suggest changing it to say more like... "(blank line) This patch
>> addresses the above problem by postponing the BEGIN message until the
>> first change."
>>
>> ------
>>
>
> Updated.
>
>>
>> 4. pgoutput.h
>>
>> Maybe for consistency with the context member, the comment for the new
>> member should be to the right instead of above it?
>>
>> @@ -20,6 +20,9 @@ typedef struct PGOutputData
>>   MemoryContext context; /* private memory context for transient
>>   * allocations */
>>
>> + /* flag indicating whether messages have previously been sent */
>> + bool        xact_wrote_changes;
>> +
>>
>> ------
>>
>> 5. pgoutput.h
>>
>> + /* flag indicating whether messages have previously been sent */
>>
>> "previously been sent" --> "already been sent" ??
>>
>> ------
>>
>> 6. pgoutput.h - misleading member name
>>
>> Actually, now that I have read all the rest of the code and how this
>> member is used I feel that this name is very misleading. e.g. For
>> "streaming" case then you still are writing changes but are not
>> setting this member at all - therefore it does not always mean what it
>> says.
>>
>> I feel a better name for this would be something like
>> "sent_begin_txn". Then if you have sent BEGIN it is true. If you
>> haven't sent BEGIN it is false. It eliminates all ambiguity naming it
>> this way instead.
>>
>> (This makes my feedback #5 redundant because the comment will be a bit
>> different if you do this).
>>
>> ------
>
>
> Fixed above comments.
>>
>>
>> 7. pgoutput.c - function pgoutput_begin_txn
>>
>> @@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
>> OutputPluginOptions *opt,
>>  static void
>>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>>  {
>>
>> I guess that you still needed to pass the txn because that is how the
>> API is documented, right?
>>
>> But I am wondering if you ought to flag it as unused so you wont get
>> some BF machine giving warnings about it.
>>
>> e.g. Syntax like this?
>>
>> pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN * txn) {
>> (void)txn;
>> ...
>
>
> Updated.
>>
>> ------
>>
>> 8. pgoutput.c - function pgoutput_begin_txn
>>
>> @@ -345,6 +345,23 @@ pgoutput_startup(LogicalDecodingContext *ctx,
>> OutputPluginOptions *opt,
>>  static void
>>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>>  {
>> + PGOutputData *data = ctx->output_plugin_private;
>> +
>> + /*
>> + * Don't send BEGIN message here. Instead, postpone it until the first
>> + * change. In logical replication, a common scenario is to replicate a set
>> + * of tables (instead of all tables) and transactions whose changes were on
>> + * table(s) that are not published will produce empty transactions. These
>> + * empty transactions will send BEGIN and COMMIT messages to subscribers,
>> + * using bandwidth on something with little/no use for logical replication.
>> + */
>> + data->xact_wrote_changes = false;
>> + elog(LOG,"Holding of begin");
>> +}
>>
>> Why is this loglevel LOG? Looks like leftover debugging.
>
>
> Removed.
>>
>>
>> ------
>>
>> 9. pgoutput.c - function pgoutput_commit_txn
>>
>> @@ -384,8 +401,14 @@ static void
>>  pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>>   XLogRecPtr commit_lsn)
>>  {
>> + PGOutputData *data = ctx->output_plugin_private;
>> +
>>   OutputPluginUpdateProgress(ctx);
>>
>> + /* skip COMMIT message if nothing was sent */
>> + if (!data->xact_wrote_changes)
>> + return;
>> +
>>
>> In the case where you decided to do nothing does it make sense that
>> you still called the function OutputPluginUpdateProgress(ctx); ?
>> I thought perhaps that your new check should come first so this call
>> would never happen.
>
>
> Even though the empty transaction is not sent, the LSN is tracked as decoded, hence the progress needs to be updated.
>
>>
>> ------
>>
>> 10. pgoutput.c - variable declarations without casts
>>
>> + PGOutputData *data = ctx->output_plugin_private;
>>
>> I noticed the new stack variable you declare have no casts.
>>
>> This differs from the existing code which always looks like:
>> PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
>>
>> There are a couple of examples of this so please search new code to
>> find them all.
>>
>> -----
>
>
> Fixed.
>
>>
>> 11. pgoutput.c - function pgoutput_change
>>
>> @@ -551,6 +574,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
>> ReorderBufferTXN *txn,
>>   Assert(false);
>>   }
>>
>> + /* output BEGIN if we haven't yet */
>> + if (!data->xact_wrote_changes && !in_streaming)
>> + {
>> + pgoutput_begin(ctx, txn);
>> + data->xact_wrote_changes = true;
>> + }
>>
>> If the variable is renamed as previously suggested then the assignment
>> data->sent_BEGIN_txn = true; can be assigned in just 1 common place
>> INSIDE the pgoutput_begin function.
>>
>> ------
>
>
> Updated.
>>
>>
>> 12. pgoutput.c - pgoutput_truncate function
>>
>> @@ -693,6 +723,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
>> ReorderBufferTXN *txn,
>>
>>   if (nrelids > 0)
>>   {
>> + /* output BEGIN if we haven't yet */
>> + if (!data->xact_wrote_changes && !in_streaming)
>> + {
>> + pgoutput_begin(ctx, txn);
>> + data->xact_wrote_changes = true;
>> + }
>>
>> (same comment as above)
>>
>> If the variable is renamed as previously suggested then the assignment
>> data->sent_BEGIN_txn = true; can be assigned in just 1 common place
>> INSIDE the pgoutput_begin function.
>>
>> 13. pgoutput.c - pgoutput_message
>>
>> @@ -725,6 +762,13 @@ pgoutput_message(LogicalDecodingContext *ctx,
>> ReorderBufferTXN *txn,
>>   if (in_streaming)
>>   xid = txn->xid;
>>
>> +    /* output BEGIN if we haven't yet, avoid for streaming and
>> non-transactional messages */
>> +    if (!data->xact_wrote_changes && !in_streaming && transactional)
>> + {
>> + pgoutput_begin(ctx, txn);
>> + data->xact_wrote_changes = true;
>> + }
>>
>> (same comment as above)
>>
>> If the variable is renamed as previously suggested then the assignment
>> data->sent_BEGIN_txn = true; can be assigned in just 1 common place
>> INSIDE the pgoutput_begin function.
>>
>> ------
>
>
> Fixed.
>>
>>
>> 14. Test Code.
>>
>> I noticed that there is no test code specifically for seeing if empty
>> transactions get sent or not. Is it possible to write such a test or
>> is this traffic improvement only observable using the debugger?
>>
>
> The 020_messages.pl actually has a test case for tracking empty messages even though it is part of the messages test.
>
> regards,
> Ajin Cherian
> Fujitsu Australia

Thanks for addressing my v3 review comments above.

I tested the latest v4.

The v4 patch applied cleanly.

make check-world completed successfully.

So this v4 patch LGTM, apart from the following 2 nitpick comments:

======

1. Suggest to add a blank line after the (void)txn; ?

@@ -345,10 +345,29 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ (void)txn; /* keep compiler quiet */
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first


======

2. Unnecessary statement blocks?

AFAIK those { } are not the usual PG code-style when there is only one
statement, so suggest to remove them.

Applies to 3 places:

@@ -551,6 +576,12 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Assert(false);
  }

+ /* output BEGIN if we haven't yet */
+ if (!data->sent_begin_txn && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ }

@@ -693,6 +724,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ /* output BEGIN if we haven't yet */
+ if (!data->sent_begin_txn && !in_streaming)
+ {
+ pgoutput_begin(ctx, txn);
+ }

@@ -725,6 +762,12 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  if (in_streaming)
  xid = txn->xid;

+ /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+ if (!data->sent_begin_txn && !in_streaming && transactional)
+ {
+ pgoutput_begin(ctx, txn);
+ }

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Apr 26, 2021 at 4:29 PM Peter Smith <smithpb2250@gmail.com> wrote:

> The v4 patch applied cleanly.
>
> make check-world completed successfully.
>
> So this patch v4 looks LGTM, apart from the following 2 nitpick comments:
>
> ======
>
> 1. Suggest to add a blank line after the (void)txn; ?
>
> @@ -345,10 +345,29 @@ pgoutput_startup(LogicalDecodingContext *ctx,
> OutputPluginOptions *opt,
>  static void
>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +
> + (void)txn; /* keep compiler quiet */
> + /*
> + * Don't send BEGIN message here. Instead, postpone it until the first
>
>

Fixed.

> ======
>
> 2. Unnecessary statement blocks?
>
> AFAIK those { } are not the usual PG code-style when there is only one
> statement, so suggest to remove them.
>
> Applies to 3 places:
>
> @@ -551,6 +576,12 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   Assert(false);
>   }
>
> + /* output BEGIN if we haven't yet */
> + if (!data->sent_begin_txn && !in_streaming)
> + {
> + pgoutput_begin(ctx, txn);
> + }
>
> @@ -693,6 +724,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>
>   if (nrelids > 0)
>   {
> + /* output BEGIN if we haven't yet */
> + if (!data->sent_begin_txn && !in_streaming)
> + {
> + pgoutput_begin(ctx, txn);
> + }
>
> @@ -725,6 +762,12 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   if (in_streaming)
>   xid = txn->xid;
>
> + /* output BEGIN if we haven't yet, avoid for streaming and
> non-transactional messages */
> + if (!data->sent_begin_txn && !in_streaming && transactional)
> + {
> + pgoutput_begin(ctx, txn);
> + }
>

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Tue, Apr 27, 2021 at 1:49 PM Ajin Cherian <itsajin@gmail.com> wrote:

Rebased the patch as it was no longer applying.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
vignesh C
Date:
On Tue, May 25, 2021 at 6:36 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Tue, Apr 27, 2021 at 1:49 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> Rebased the patch as it was no longer applying.

Thanks for the updated patch. A few comments:
1) I'm not sure whether tests can be added for skipping empty
transactions; if possible, please add a few.

2) We could add some debug level log messages for the transaction that
will be skipped.

3) You could keep this variable below the other bool variables in the structure:
+       bool        sent_begin_txn;     /* flag indicating whether begin
+                                        * has already been sent */
+

4) You can split the comment across multiple lines, as it exceeds 80 chars
+       /* output BEGIN if we haven't yet, avoid for streaming and
non-transactional messages */
+       if (!data->sent_begin_txn && !in_streaming && transactional)
+               pgoutput_begin(ctx, txn);

Regards,
Vignesh



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Thu, May 27, 2021 at 8:58 PM vignesh C <vignesh21@gmail.com> wrote:

> Thanks for the updated patch, few comments:
> 1) I'm not sure if we could add some tests for skip empty
> transactions, if possible add a few tests.
>
Added a few tests for prepared transactions; the existing test in
020_messages.pl also covers regular transactions.

> 2) We could add some debug level log messages for the transaction that
> will be skipped.

Added.

>
> 3) You could keep this variable below the other bool variables in the structure:
> +       bool        sent_begin_txn;     /* flag indicating whether begin
> +                                        * has already been sent */
> +

I've moved this variable around, so this comment is no longer valid.

>
> 4) You can split the comments to multi-line as it exceeds 80 chars
> +       /* output BEGIN if we haven't yet, avoid for streaming and
> non-transactional messages */
> +       if (!data->sent_begin_txn && !in_streaming && transactional)
> +               pgoutput_begin(ctx, txn);

Done.

I've had to rebase the patch after a recent commit by Amit Kapila that
added support for two-phase commits in pub-sub [1].
Also I've modified the patch to also skip replicating empty prepared
transactions. Do let me know if you have any comments.

regards,
Ajin Cherian
Fujitsu Australia
[1]- https://www.postgresql.org/message-id/CAHut+PueG6u3vwG8DU=JhJiWa2TwmZ=bDqPchZkBky7ykzA7MA@mail.gmail.com

Attachments

RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
On Wednesday, July 14, 2021 9:30 PM Ajin Cherian <itsajin@gmail.com> wrote:
> I've had to rebase the patch after a recent commit by Amit Kapila of supporting
> two-phase commits in pub-sub [1].
> Also I've modified the patch to also skip replicating empty prepared
> transactions. Do let me know if you have any comments.
Hi

I have started testing this patch; in the meantime, here is some quick, minor feedback.

(1) pg_logical_slot_get_binary_changes() params.

Technically, it looks better to pass proto_version 3 and the two_phase option to the function
when testing empty prepares. As far as I can tell, proto_version 1 doesn't support 2PC.
[1] says "The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
are available since protocol version 3." So, if the test is meant to check skipping of empty *prepares*,
I suggest updating the proto_version and setting two_phase to 'on'.

+##############################
+# Test empty prepares
+##############################
...
+# peek at the contents of the slot
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0)
+       FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub')
+));

(2) The following message should probably start with a lowercase letter.
There are other similar places in the patch for this.

+               elog(DEBUG1, "Skipping replication of an empty transaction");

[1] - https://www.postgresql.org/docs/devel/protocol-logicalrep-message-formats.html


Best Regards,
    Takamichi Osumi


Re: logical replication empty transactions

From
Peter Smith
Date:
Hi Ajin,

I have reviewed the v7 patch and given my feedback comments below.

Apply OK
Build OK
make check OK
TAP (subscriptions) make check OK
Build PG Docs (html) OK

Although I made lots of review comments below, the important point is
that none of them are functional - they are only minor re-wordings
and some code refactoring that I thought would make the code simpler
and/or easier to read. YMMV, so please feel free to disagree with any
of them.

//////////

1a. Commit Comment - wording

BEFORE
This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE message until the first change.

AFTER
This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE messages until the first change is encountered.

------

1b. Commit Comment - wording

BEFORE
While processing a COMMIT message or a PREPARE message, if there is no
other change for that transaction, do not send COMMIT message or
PREPARE message.

AFTER
If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message.

------

2. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
       The required <function>commit_prepared_cb</function> callback is called
       whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
       The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> in which case
+      it can commit the transaction, otherwise, it can skip the commit. The
+      <parameter>gid</parameter> alone is not sufficient because the downstream
+      node can have a prepared transaction with the same identifier.

=>

(some minor rewording of the last part)

AFTER:

The parameters <parameter>prepare_end_lsn</parameter> and
<parameter>prepare_time</parameter> can be used to check if the plugin
has received this <command>PREPARE TRANSACTION</command> or not. If
yes, it can commit the transaction, otherwise, it can skip the commit.
The <parameter>gid</parameter> alone is not sufficient to determine
this because the downstream node may already have a prepared
transaction with the same identifier.
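
To illustrate why the gid alone is ambiguous, here is a minimal sketch of a downstream lookup that matches on all three values. PreparedEntry and find_prepared are hypothetical names for illustration only, not part of the patch or of PostgreSQL:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record a downstream node might keep per prepared transaction. */
typedef struct PreparedEntry
{
    const char *gid;                /* global transaction identifier */
    uint64_t    prepare_end_lsn;    /* end LSN of the upstream PREPARE */
    int64_t     prepare_time;       /* upstream prepare timestamp */
} PreparedEntry;

/*
 * Return the entry that matches gid AND prepare_end_lsn AND prepare_time,
 * or NULL if there is none. A gid-only match is deliberately not enough.
 */
static const PreparedEntry *
find_prepared(const PreparedEntry *entries, size_t n, const char *gid,
              uint64_t prepare_end_lsn, int64_t prepare_time)
{
    assert(gid != NULL);

    for (size_t i = 0; i < n; i++)
    {
        if (strcmp(entries[i].gid, gid) == 0 &&
            entries[i].prepare_end_lsn == prepare_end_lsn &&
            entries[i].prepare_time == prepare_time)
            return &entries[i];
    }
    return NULL;
}
```

If the lookup returns NULL, the downstream never received this particular PREPARE (for example because it was empty and skipped), so the commit can be skipped even though an unrelated local transaction may be using the same gid.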


------

3. src/backend/replication/logical/proto.c - whitespace

@@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in,
LogicalRepCommitPreparedTxnData *
  elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

  /* read fields */
+ prepare_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR,"prepare_end_lsn is not set in commit prepared message");

=>

There is missing space before the 2nd elog param.

------

4. src/backend/replication/logical/worker.c - comment typos

  /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
+ * It is possible that we haven't received the prepare because
+ * the transaction did not have any changes relevant to this
+ * subscription and was essentially an empty prepare. In which case,
+ * the walsender is optimized to drop the empty transaction and the
+ * accompanying prepare. Silently ignore if we don't find the prepared
+ * transaction.
  */

4a. =>

"and was essentially an empty prepare" --> "so was essentially an empty prepare"

4b. =>

"In which case" --> "In this case"

------

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

@@ -410,10 +417,32 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->sent_begin_txn = false;
+ txn->output_plugin_private = data;
+}

=>

I felt that since this message postponement is now the new behaviour
of this function then probably this should all be a function level
comment instead of the comment being in the body of the function

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Even though it is kind of obvious, it is probably better to provide a
function comment here too

------

7. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_txn

@@ -428,8 +457,22 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr commit_lsn)
 {
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+ bool            skip;
+
+ Assert(data);
+ skip = !data->sent_begin_txn;
+ pfree(data);
+ txn->output_plugin_private = NULL;
  OutputPluginUpdateProgress(ctx);

+ /* skip COMMIT message if nothing was sent */
+ if (skip)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction");
+ return;
+ }
+

7a. =>

I felt that the comment "skip COMMIT message if nothing was sent"
should be done at the point where you *decide* to skip or not. So you
could either move that comment to where the skip variable is assigned.
Or (my preference) leave the comment where it is but change the
variable to be sent_begin = data->sent_begin_txn; and test !sent_begin.

------

Regardless I think the comment should be elaborated a bit to describe
the reason more.

7b. =>

BEFORE
/* skip COMMIT message if nothing was sent */

AFTER
/* If a BEGIN message was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT message too.
*/
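
One detail of the hunk above worth spelling out is the lifetime of the per-transaction data: the flag has to be read before the data is freed. A minimal sketch, using calloc/free in place of the memory-context allocator and hypothetical Sketch* names:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical per-transaction state, mirroring the idea of PGOutputTxnData. */
typedef struct SketchTxnData
{
    bool        sent_begin_txn;
} SketchTxnData;

/* Hypothetical transaction handle carrying a plugin-private pointer. */
typedef struct SketchTxn
{
    void       *output_plugin_private;
} SketchTxn;

/* BEGIN callback: allocate zeroed state, so sent_begin_txn starts as false. */
static void
sketch_begin_txn(SketchTxn *txn)
{
    SketchTxnData *data = calloc(1, sizeof(SketchTxnData));

    assert(data != NULL);
    txn->output_plugin_private = data;
}

/* Called when the deferred BEGIN is finally written out. */
static void
sketch_send_begin(SketchTxn *txn)
{
    SketchTxnData *data = txn->output_plugin_private;

    assert(data != NULL);
    data->sent_begin_txn = true;
}

/* COMMIT callback: returns true if a COMMIT message should be sent. */
static bool
sketch_commit_txn(SketchTxn *txn)
{
    SketchTxnData *data = txn->output_plugin_private;
    bool        sent_begin;

    assert(data != NULL);
    sent_begin = data->sent_begin_txn;  /* read the flag before freeing */
    free(data);
    txn->output_plugin_private = NULL;

    /* No BEGIN means no relevant changes, so the COMMIT is skipped too. */
    return sent_begin;
}
```

Naming the local after what it holds (sent_begin) keeps the later check readable as "if BEGIN was not sent, skip".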

------

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare_txn

@@ -441,10 +484,28 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
 static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ /*
+ * Don't send BEGIN PREPARE message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN PREPARE and COMMIT PREPARED messages
+ * to subscribers, using bandwidth on something with little/no use
+ * for logical replication.
+ */
+ pgoutput_begin_txn(ctx, txn);
+}

8a. =>

Like previously, I felt that this big comment should be at the
function level of pgoutput_begin_prepare_txn instead of in the body of
the function.

------

8b. =>

And then the body comment would be something simple like:

/* Delegate to assign the begin sent flag as false same as for the
BEGIN message. */
pgoutput_begin_txn(ctx, txn);

------

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare

+
+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Probably this needs a function comment.

------

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn

@@ -459,8 +520,18 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  XLogRecPtr prepare_lsn)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(data);
  OutputPluginUpdateProgress(ctx);

+ /* skip PREPARE message if nothing was sent */
+ if (!data->sent_begin_txn)

=>

Maybe elaborate on that "skip PREPARE message if nothing was sent"
comment in a way similar to my review comment 7b. For example,

AFTER
/* If the BEGIN was not yet sent, then it means there were no relevant
changes encountered, so we can skip the PREPARE message too. */

------

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn

@@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext
*ctx, ReorderBufferTXN *txn,
  */
 static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
 {
+ PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data)

=>

Similar to previous review comment 10, I think the reason for the skip
should be elaborated a little bit. For example,

AFTER
/* If the BEGIN PREPARE was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT PREPARED
message too. */

------

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn

=> Similar as for pgoutput_commit_prepared_txn (see review comment 11)

------

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Relation relation, ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
  MemoryContext old;
  RelationSyncEntry *relentry;
  TransactionId xid = InvalidTransactionId;
  Relation ancestor = NULL;

+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+
  if (!is_publishable_relation(relation))
  return;

13a. =>

I felt the streaming logic with the txndata is a bit confusing. I
think it would be easier to have another local variable (sent_begin)
and use it like this...

bool sent_begin;
if (in_streaming)
{
    sent_begin = true;
}
else
{
    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    Assert(txndata);
    sent_begin = txndata->sent_begin_txn;
}

...

------

+ /* output BEGIN if we haven't yet */

13b. =>

I thought the comment is not quite right

AFTER
/* Output BEGIN / BEGIN PREPARE if we haven't yet */

------

+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ if (rbtxn_prepared(txn))
+ pgoutput_begin_prepare(ctx, txn);
+ else
+ pgoutput_begin(ctx, txn);
+ }
+

13.c =>

If you introduce the variable (as suggested in 13a) this code becomes
much simpler:

AFTER

if (!sent_begin)
{
    if (rbtxn_prepared(txn))
        pgoutput_begin_prepare(ctx, txn);
    else
        pgoutput_begin(ctx, txn);
}
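
Putting 13a and 13c together, the decision can be sketched as one small standalone function. This is only an illustration under simplifying assumptions: SketchBeginKind, SketchTxnData and sketch_begin_needed are hypothetical, and the real code would call pgoutput_begin_prepare() or pgoutput_begin() instead of returning an enum.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Which deferred message, if any, must be emitted before a change. */
typedef enum SketchBeginKind
{
    SKETCH_NONE,
    SKETCH_BEGIN,
    SKETCH_BEGIN_PREPARE
} SketchBeginKind;

/* Hypothetical per-transaction state. */
typedef struct SketchTxnData
{
    bool        sent_begin_txn;
} SketchTxnData;

/*
 * Decide what to send before the first change of a transaction.
 * In streaming mode no BEGIN is deferred, so txndata may be NULL there.
 */
static SketchBeginKind
sketch_begin_needed(bool in_streaming, bool prepared, SketchTxnData *txndata)
{
    bool        sent_begin;

    if (in_streaming)
        sent_begin = true;
    else
    {
        assert(txndata != NULL);
        sent_begin = txndata->sent_begin_txn;
    }

    if (sent_begin)
        return SKETCH_NONE;

    /* Record the send in one place so later changes do not repeat it. */
    txndata->sent_begin_txn = true;
    return prepared ? SKETCH_BEGIN_PREPARE : SKETCH_BEGIN;
}
```

The same helper shape would serve the change, truncate and message callbacks, which is the duplication the review comments are pointing at.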


------

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

=>

All the similar review comments made for pgoutput_change (13a, 13b, 13c)
apply to pgoutput_truncate here also.

------

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  const char *message)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
  TransactionId xid = InvalidTransactionId;


=>

This variable should be declared in the block where it is used,
similar to the suggestion 13a.

Also is it just an accidental omission that you did Assert(txndata)
for all the other places but not here?

------

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Jul 19, 2021 at 3:24 PM Peter Smith <smithpb2250@gmail.com> wrote:

> 1a. Commit Comment - wording
>
updated.
>
> 1b. Commit Comment - wording
>
updated.

> 2. doc/src/sgml/logicaldecoding.sgml - wording
>
> @@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
> LogicalDecodingContext *ctx,
>        The required <function>commit_prepared_cb</function> callback is called
>        whenever a transaction <command>COMMIT PREPARED</command> has
> been decoded.
>        The <parameter>gid</parameter> field, which is part of the
> -      <parameter>txn</parameter> parameter, can be used in this callback.
> +      <parameter>txn</parameter> parameter, can be used in this callback. The
> +      parameters <parameter>prepare_end_lsn</parameter> and
> +      <parameter>prepare_time</parameter> can be used to check if the plugin
> +      has received this <command>PREPARE TRANSACTION</command> in which case
> +      it can commit the transaction, otherwise, it can skip the commit. The
> +      <parameter>gid</parameter> alone is not sufficient because the downstream
> +      node can have a prepared transaction with the same identifier.
>
> =>
>
> (some minor rewording of the last part)

updated.

>
> 3. src/backend/replication/logical/proto.c - whitespace
>
> @@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in,
> LogicalRepCommitPreparedTxnData *
>   elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
>
>   /* read fields */
> + prepare_data->prepare_end_lsn = pq_getmsgint64(in);
> + if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
> + elog(ERROR,"prepare_end_lsn is not set in commit prepared message");
>
> =>
>
> There is missing space before the 2nd elog param.
>

fixed.

>
> 4a. =>
>
> "and was essentially an empty prepare" --> "so was essentially an empty prepare"
>
> 4b. =>
>
> "In which case" --> "In this case"
>
> ------

fixed.

> I felt that since this message postponement is now the new behaviour
> of this function then probably this should all be a function level
> comment instead of the comment being in the body of the function
>
> ------
>
> 6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin
>
> +
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>
> =>
>
> Even though it is kind of obvious, it is probably better to provide a
> function comment here too
>
> ------

Changed accordingly.

>

> I felt that the comment "skip COMMIT message if nothing was sent"
> should be done at the point where you *decide* to skip or not. So you
> could either move that comment to where the skip variable is assigned.
> Or (my preference) leave the comment where it is but change the
> variable name to be sent_begin = !data->sent_begin_txn;
>

Updated the comment to where the skip variable is assigned.


> ------
>
> Regardless I think the comment should be elaborated a bit to describe
> the reason more.
>
> 7b. =>
>
> BEFORE
> /* skip COMMIT message if nothing was sent */
>
> AFTER
> /* If a BEGIN message was not yet sent, then it means there were no
> relevant changes encountered, so we can skip the COMMIT message too.
> */
>

Updated accordingly.


> ------

> Like previously, I felt that this big comment should be at the
> function level of pgoutput_begin_prepare_txn instead of in the body of
> the function.
>
> ------
>
> 8b. =>
>
> And then the body comment would be something simple like:
>
> /* Delegate to assign the begin sent flag as false same as for the
> BEGIN message. */
> pgoutput_begin_txn(ctx, txn);
>

Updated accordingly.

> ------
>
> 9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare
>
> +
> +static void
> +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>
> =>
>
> Probably this needs a function comment.
>

Updated.

> ------
>
> 10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn
>
> @@ -459,8 +520,18 @@ static void
>  pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>   XLogRecPtr prepare_lsn)
>  {
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + Assert(data);
>   OutputPluginUpdateProgress(ctx);
>
> + /* skip PREPARE message if nothing was sent */
> + if (!data->sent_begin_txn)
>
> =>
>
> Maybe elaborate on that "skip PREPARE message if nothing was sent"
> comment in a way similar to my review comment 7b. For example,
>
> AFTER
> /* If the BEGIN was not yet sent, then it means there were no relevant
> changes encountered, so we can skip the PREPARE message too. */
>

Updated.

> ------
>
> 11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn
>
> @@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext
> *ctx, ReorderBufferTXN *txn,
>   */
>  static void
>  pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> - XLogRecPtr commit_lsn)
> + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> + TimestampTz prepare_time)
>  {
> + PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> + /*
> + * skip sending COMMIT PREPARED message if prepared transaction
> + * has not been sent.
> + */
> + if (data)
>
> =>
>
> Similar to previous review comment 10, I think the reason for the skip
> should be elaborated a little bit. For example,
>
> AFTER
> /* If the BEGIN PREPARE was not yet sent, then it means there were no
> relevant changes encountered, so we can skip the COMMIT PREPARED
> message too. */
>
> ------

Updated accordingly.

>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn
>
> => Same as for pgoutput_commit_prepared_txn (see review comment 11)
>
> ------

Updated.

>
> 13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change
>
> @@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   Relation relation, ReorderBufferChange *change)
>  {
>   PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>   MemoryContext old;
>   RelationSyncEntry *relentry;
>   TransactionId xid = InvalidTransactionId;
>   Relation ancestor = NULL;
>
> + /* If not streaming, should have setup txndata as part of
> BEGIN/BEGIN PREPARE */
> + if (!in_streaming)
> + Assert(txndata);
> +
>   if (!is_publishable_relation(relation))
>   return;
>
> 13a. =>
>
> I felt the streaming logic with the txndata is a bit confusing. I
> think it would be easier to have another local variable (sent_begin)
> and use it like this...
>
> bool sent_begin;
> if (in_streaming)
> {
>     sent_begin = true;
> }
> else
> {
>     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>     Assert(txndata);
>     sent_begin = txndata->sent_begin_txn;
> }
>

I did not make this change because, when streaming, "sent_begin"
is not actually true, so it seemed incorrect to code it
that way. Instead, I have modified the comment to mention that
streaming transactions do not send BEGIN / BEGIN PREPARE.

> ...
>
> ------
>
> + /* output BEGIN if we haven't yet */
>
> 13b. =>
>
> I thought the comment is not quite right
>
> AFTER
> /* Output BEGIN / BEGIN PREPARE if we haven't yet */
>
> ------

Updated.

>
> + if (!in_streaming && !txndata->sent_begin_txn)
> + {
> + if (rbtxn_prepared(txn))
> + pgoutput_begin_prepare(ctx, txn);
> + else
> + pgoutput_begin(ctx, txn);
> + }
> +
>
> 13.c =>
>
> If you introduce the variable (as suggested in 13a) this code becomes
> much simpler:
>

Skipped this. (reason mentioned above)

> ------
>
> 14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
>
> =>
>
> All the similar review comments made for pgoutput_change (13a, 13b, 13c)
> apply to pgoutput_truncate here also.
>
> ------

Updated.

>
> 15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message
>
> @@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   const char *message)
>  {
>   PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata;
>   TransactionId xid = InvalidTransactionId;
>
>
> =>
>
> This variable should be declared in the block where it is used,
> similar to the suggestion 13a.
>
> Also is it just an accidental omission that you did Assert(txndata)
> for all the other places but not here?
>

Moved location of the variable and added an assert.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Thu, Jul 15, 2021 at 3:50 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
> I have started to test this patch and will give you some minor, quick feedback.
>
> (1) pg_logical_slot_get_binary_changes() params.
>
> Technically, it looks better to pass proto_version 3 and the two_phase option to the function
> to test empty prepares. As I understand it, proto_version 1 doesn't support 2PC.
> [1] says "The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
> are available since protocol version 3." So, if the test wants to skip empty *prepares*,
> I suggest updating the proto_version and setting two_phase to 'on'.

Updated accordingly.

> (2) The following sentences may start with a lowercase letter.
> There are other similar codes for this.
>
> +               elog(DEBUG1, "Skipping replication of an empty transaction");

Fixed this.

I've addressed these comments in version 8 of the patch.

regards,
Ajin Cherian
Fujitsu Australia



Re: logical replication empty transactions

From
Peter Smith
Date:
Hi Ajin.

I have reviewed the v8 patch and my feedback comments are below:

//////////

1. Apply v8 gave multiple whitespace warnings.

------

2. Commit comment - wording

If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message. This means that pgoutput will skip BEGIN / COMMIT
or BEGIN PREPARE / PREPARE  messages for transactions that are empty.

=>

Shouldn't this also mention some other messages that may be skipped?
- COMMIT PREPARED
- ROLLBACK PREPARED

------

3. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,20 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
       The required <function>commit_prepared_cb</function> callback is called
       whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
       The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this <command>PREPARE TRANSACTION</command> command or not.
+      If yes, it can commit the transaction, otherwise, it can skip the commit.
+      The <parameter>gid</parameter> alone is not sufficient to determine this
+      because the downstream may already have a prepared transaction with the
+      same identifier.

=>

Typo: Should that say "downstream node" instead of just "downstream" ?

------

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
callback comment

@@ -406,14 +413,38 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,

 /*
  * BEGIN callback
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on

=>

Typo: "BEGIN callback" --> "BEGIN callback." (with the period).

And, I think maybe it will be better if it has a separating blank line too.

e.g.

/*
 * BEGIN callback.
 *
 * Don't send BEGIN ....

(NOTE: this review comment applies to other callback function comments
too, so please hunt them all down)
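
As an illustration of the postponed-BEGIN idea described in that callback comment, here is a minimal standalone sketch in C. It is not the patch code: TxnState, Stream, emit and the txn_* functions are hypothetical stand-ins for PGOutputTxnData and the pgoutput callbacks, and messages are modelled as single characters.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-transaction state (PGOutputTxnData in the patch). */
typedef struct TxnState
{
    bool        sent_begin;     /* has BEGIN been emitted for this transaction? */
} TxnState;

/* Hypothetical message sink: records one character per protocol message. */
typedef struct Stream
{
    char        msgs[16];
    size_t      len;
} Stream;

static void
emit(Stream *s, char kind)
{
    assert(s->len < sizeof(s->msgs) - 1);
    s->msgs[s->len++] = kind;
    s->msgs[s->len] = '\0';
}

/* BEGIN callback: only reset the flag; nothing is sent yet. */
static void
txn_begin(TxnState *txn)
{
    txn->sent_begin = false;
}

/* Change callback: ignore unpublished tables, send the deferred BEGIN once. */
static void
txn_change(TxnState *txn, Stream *s, bool published)
{
    if (!published)
        return;
    if (!txn->sent_begin)
    {
        emit(s, 'B');
        txn->sent_begin = true;
    }
    emit(s, 'I');
}

/* COMMIT callback: returns false when the COMMIT is skipped. */
static bool
txn_commit(TxnState *txn, Stream *s)
{
    if (!txn->sent_begin)
        return false;           /* no BEGIN was sent, so the transaction was empty */
    emit(s, 'C');
    return true;
}
```

In this model, a transaction that only touches unpublished tables produces no messages at all, while a transaction with published changes produces BEGIN, the changes, and COMMIT.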

------

5. src/backend/replication/pgoutput/pgoutput.c - data / txndata

 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));

=>

There is some inconsistent naming of the local variable in the patch.
Sometimes it is called "data"; Sometimes it is called "txdata" etc. It
would be better to just stick with the same variable name everywhere.

(NOTE: this comment applies to several places in this patch)

------

6. src/backend/replication/pgoutput/pgoutput.c - Strange way to use Assert

+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+

=>

This style of Assert code seemed strange to me. In production mode
isn't that going to evaluate to some condition with a ((void) true)
body? IMO it might be better to just include the streaming check as
part of the Assert. For example:

BEFORE
if (!in_streaming)
Assert(txndata);

AFTER
Assert(in_streaming || txndata);

(NOTE: This same review comment applies in at least 3 places in this
patch, so please hunt them all down)
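
To show that the two forms check the same condition, here is a small standalone sketch using plain C assert rather than PostgreSQL's Assert. The function names are hypothetical and exist only for this comparison.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Condition checked by the BEFORE form: only evaluated when not streaming. */
static bool
holds_before(bool in_streaming, const void *txndata)
{
    if (!in_streaming)
        return txndata != NULL;
    return true;
}

/* Condition checked by the AFTER form: a single boolean expression. */
static bool
holds_after(bool in_streaming, const void *txndata)
{
    return in_streaming || txndata != NULL;
}

/* The AFTER form as it would appear at a call site. */
static void
check_txndata(bool in_streaming, const void *txndata)
{
    assert(in_streaming || txndata != NULL);
}
```

The only case either form rejects is "not streaming and txndata is NULL". The single-expression form has the added property that nothing is left behind at the call site when assertions are compiled out.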

------

7. src/backend/replication/pgoutput/pgoutput.c - comment wording

@@ -677,6 +810,18 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Assert(false);
  }

+ /*
+ * output BEGIN / BEGIN PREPARE if we haven't yet,
+     * while streaming no need to send BEGIN / BEGIN PREPARE.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)

=>

English not really that comment is. The comment should also start with
uppercase.

(NOTE: This same comment was in couple of places in the patch)

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Thu, Jul 22, 2021 at 6:11 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Hi Ajin.
>
> I have reviewed the v8 patch and my feedback comments are below:
>
> //////////
>
> 1. Apply v8 gave multiple whitespace warnings.
>
> ------
>
> 2. Commit comment - wording
>
> If (when processing a COMMIT / PREPARE message) we find there had been
> no other change for that transaction, then do not send the COMMIT /
> PREPARE message. This means that pgoutput will skip BEGIN / COMMIT
> or BEGIN PREPARE / PREPARE  messages for transactions that are empty.
>
> =>
>
> Shouldn't this also mention some other messages that may be skipped?
> - COMMIT PREPARED
> - ROLLBACK PREPARED
>

Updated.

> ------
>
> 3. doc/src/sgml/logicaldecoding.sgml - wording
>
> @@ -884,11 +884,20 @@ typedef void (*LogicalDecodePrepareCB) (struct
> LogicalDecodingContext *ctx,
>        The required <function>commit_prepared_cb</function> callback is called
>        whenever a transaction <command>COMMIT PREPARED</command> has
> been decoded.
>        The <parameter>gid</parameter> field, which is part of the
> -      <parameter>txn</parameter> parameter, can be used in this callback.
> +      <parameter>txn</parameter> parameter, can be used in this callback. The
> +      parameters <parameter>prepare_end_lsn</parameter> and
> +      <parameter>prepare_time</parameter> can be used to check if the plugin
> +      has received this <command>PREPARE TRANSACTION</command> command or not.
> +      If yes, it can commit the transaction, otherwise, it can skip the commit.
> +      The <parameter>gid</parameter> alone is not sufficient to determine this
> +      because the downstream may already have a prepared transaction with the
> +      same identifier.
>
> =>
>
> Typo: Should that say "downstream node" instead of just "downstream" ?
>
> ------

Updated.

>
> 4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
> callback comment
>
> @@ -406,14 +413,38 @@ pgoutput_startup(LogicalDecodingContext *ctx,
> OutputPluginOptions *opt,
>
>  /*
>   * BEGIN callback
> + * Don't send BEGIN message here. Instead, postpone it until the first
> + * change. In logical replication, a common scenario is to replicate a set
> + * of tables (instead of all tables) and transactions whose changes were on
>
> =>
>
> Typo: "BEGIN callback" --> "BEGIN callback." (with the period).
>
> And, I think maybe it will be better if it has a separating blank line too.
>
> e.g.
>
> /*
>  * BEGIN callback.
>  *
>  * Don't send BEGIN ....
>
> (NOTE: this review comment applies to other callback function comments
> too, so please hunt them all down)
>
> ------

Updated.

>
> 5. src/backend/replication/pgoutput/pgoutput.c - data / txndata
>
>  static void
>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> + PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
> + sizeof(PGOutputTxnData));
>
> =>
>
> There is some inconsistent naming of the local variable in the patch.
> Sometimes it is called "data"; Sometimes it is called "txdata" etc. It
> would be better to just stick with the same variable name everywhere.
>
> (NOTE: this comment applies to several places in this patch)
>
> ------

I've renamed all variables of type PGOutputTxnData to txndata. Note that
there is another structure, PGOutputData, whose variables still use the
name data.

>
> 6. src/backend/replication/pgoutput/pgoutput.c - Strange way to use Assert
>
> + /* If not streaming, should have setup txndata as part of
> BEGIN/BEGIN PREPARE */
> + if (!in_streaming)
> + Assert(txndata);
> +
>
> =>
>
> This style of Assert code seemed strange to me. In production mode
> isn't that going to evaluate to some condition with a ((void) true)
> body? IMO it might be better to just include the streaming check as
> part of the Assert. For example:
>
> BEFORE
> if (!in_streaming)
> Assert(txndata);
>
> AFTER
> Assert(in_streaming || txndata);
>
> (NOTE: This same review comment applies in at least 3 places in this
> patch, so please hunt them all down)
>

Updated.

> ------
>
> 7. src/backend/replication/pgoutput/pgoutput.c - comment wording
>
> @@ -677,6 +810,18 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   Assert(false);
>   }
>
> + /*
> + * output BEGIN / BEGIN PREPARE if we haven't yet,
> +     * while streaming no need to send BEGIN / BEGIN PREPARE.
> + */
> + if (!in_streaming && !txndata->sent_begin_txn)
>
> =>
>
> English not really that comment is. The comment should also start with
> uppercase.
>
> (NOTE: This same comment was in couple of places in the patch)
>

Updated.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
I have reviewed the v9 patch and my feedback comments are below:

//////////

1. Apply v9 gave multiple whitespace warnings

$ git apply v9-0001-Skip-empty-transactions-for-logical-replication.patch
v9-0001-Skip-empty-transactions-for-logical-replication.patch:479:
indent with spaces.
    * If the BEGIN PREPARE was not yet sent, then it means there were no
v9-0001-Skip-empty-transactions-for-logical-replication.patch:480:
indent with spaces.
    * relevant changes encountered, so we can skip the ROLLBACK PREPARED
v9-0001-Skip-empty-transactions-for-logical-replication.patch:481:
indent with spaces.
    * messsage too.
v9-0001-Skip-empty-transactions-for-logical-replication.patch:482:
indent with spaces.
    */
warning: 4 lines add whitespace errors.

------

2. Commit comment - wording

pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions which were skipped.

=>

Is that correct? Or did you mean to say:

AFTER
pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions that are empty.

------

3. src/backend/replication/pgoutput/pgoutput.c - typo

+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * messsage too.
+ */

Typo: "messsage" --> "message"

(NOTE this same typo is in 2 places)

------

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Greg Nancarrow
Date:
On Thu, Jul 22, 2021 at 11:37 PM Ajin Cherian <itsajin@gmail.com> wrote:
>

I have some minor comments on the v9 patch:

(1) Several whitespace warnings on patch application

(2) Suggested patch comment change:

BEFORE:
The current logical replication behaviour is to send every transaction to
subscriber even though the transaction is empty (because it does not
AFTER:
The current logical replication behaviour is to send every transaction to
subscriber even though the transaction might be empty (because it does not

(3) Comment needed for added struct defn:

typedef struct PGOutputTxnData

(4) Improve comment.

Can you add a comma (or add words) in the below sentence, so we know
how to read it?

+ /*
+ * Delegate to assign the begin sent flag as false same as for the
+ * BEGIN message.
+ */


Regards,
Greg Nancarrow
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Fri, Jul 23, 2021 at 10:26 AM Greg Nancarrow <gregn4422@gmail.com> wrote:
>
> On Thu, Jul 22, 2021 at 11:37 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
>
> I have some minor comments on the v9 patch:
>
> (1) Several whitespace warnings on patch application
>

Fixed.

> (2) Suggested patch comment change:
>
> BEFORE:
> The current logical replication behaviour is to send every transaction to
> subscriber even though the transaction is empty (because it does not
> AFTER:
> The current logical replication behaviour is to send every transaction to
> subscriber even though the transaction might be empty (because it does not
>
Changed accordingly.

> (3) Comment needed for added struct defn:
>
> typedef struct PGOutputTxnData
>

Added.

> (4) Improve comment.
>
> Can you add a comma (or add words) in the below sentence, so we know
> how to read it?
>

Updated.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Fri, Jul 23, 2021 at 10:13 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> I have reviewed the v9 patch and my feedback comments are below:
>
> //////////
>
> 1. Apply v9 gave multiple whitespace warnings

Fixed.

>
> ------
>
> 2. Commit comment - wording
>
> pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
> for transactions which were skipped.
>
> =>
>
> Is that correct? Or did you mean to say:
>
> AFTER
> pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
> for transactions that are empty.
>
> ------

Updated.

>
> 3. src/backend/replication/pgoutput/pgoutput.c - typo
>
> + /*
> + * If the BEGIN PREPARE was not yet sent, then it means there were no
> + * relevant changes encountered, so we can skip the COMMIT PREPARED
> + * messsage too.
> + */
>
> Typo: "messsage" --> "message"
>
> (NOTE this same typo is in 2 places)
>
Fixed.

I have made these changes in v10 of the patch.

regards,
Ajin Cherian
Fujitsu Australia



Re: logical replication empty transactions

From
Peter Smith
Date:
I have reviewed the v10 patch.

Apply / build / test was all OK.

Just one review comment:

//////////

1. Typo

@@ -130,6 +132,17 @@ typedef struct RelationSyncEntry
  TupleConversionMap *map;
 } RelationSyncEntry;

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN or BEGIN PREPARE. BEGIN or BEGIN PREPARE
+ * is only sent when the first change in a transaction is processed.
+ * This make it possible to skip transactions that are empty.
+ */

=>

typo: "make it possible" --> "makes it possible"

------

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Fri, Jul 23, 2021 at 7:38 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> I have reviewed the v10 patch.
>
> Apply / build / test was all OK.
>
> Just one review comment:
>
> //////////
>
> 1. Typo
>
> @@ -130,6 +132,17 @@ typedef struct RelationSyncEntry
>   TupleConversionMap *map;
>  } RelationSyncEntry;
>
> +/*
> + * Maintain a per-transaction level variable to track whether the
> + * transaction has sent BEGIN or BEGIN PREPARE. BEGIN or BEGIN PREPARE
> + * is only sent when the first change in a transaction is processed.
> + * This make it possible to skip transactions that are empty.
> + */
>
> =>
>
> typo: "make it possible" --> "makes it possible"
>

fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
FYI - I have checked the v11 patch. Everything applies, builds, and
tests OK for me, and I have no more review comments. So v11 LGTM.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Greg Nancarrow
Date:
On Fri, Jul 23, 2021 at 8:09 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> fixed.


The v11 patch LGTM.

Regards,
Greg Nancarrow
Fujitsu Australia



RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
On Friday, July 23, 2021 7:10 PM Ajin Cherian <itsajin@gmail.com> wrote:
> On Fri, Jul 23, 2021 at 7:38 PM Peter Smith <smithpb2250@gmail.com> wrote:
> >
> > I have reviewed the v10 patch.
The patch v11 looks good to me as well. 
Thanks for addressing my past comments.


Best Regards,
    Takamichi Osumi


Re: logical replication empty transactions

From
Peter Smith
Date:
Hi Ajin.

I have spent some time studying how your "empty transaction" (v11)
patch will affect network traffic and transaction throughput.

BLUF
====

For my test environment the general observations with the patch applied are:
- There is a potentially large reduction of network traffic (depends
on the number of empty transactions sent)
- Transaction throughput improved up to 7% (average ~2% across
mixtures) for Synchronous mode
- Transaction throughput improved up to 7% (average ~3% across
mixtures) for NOT Synchronous mode

So this patch LGTM.


TEST INFORMATION
================

Overview
-------------

1. There are 2 similar tables. One table is published; the other is not.

2. Equivalent simple SQL operations are performed on these tables. E.g.
- INSERT/UPDATE/DELETE using normal COMMIT
- INSERT/UPDATE/DELETE using 2PC COMMIT PREPARED

3. pgbench is used to measure the throughput for different mixes of
empty and not-empty transactions sent. E.g.
- 0% are empty
- 25% are empty
- 50% are empty
- 75% are empty
- 100% are empty

4. The apply_dispatch code has been temporarily modified to log the
number of protocol messages/bytes being processed.
- At the conclusion of the test run the logs are processed to extract
the numbers.

5. Each test run is 15 minutes elapsed time.

6. The tests are repeated without/with your patch applied
- So, there are 2 (without/with patch) x 5 (different mixes) = 10 test results
- Transaction throughput results are from pgbench
- Protocol message bytes are extracted from the logs (from modified
apply_dispatch)

7. Also, the entire set of 10 test cases was repeated with the
synchronous_standby_names setting enabled/disabled.
- Enabled, so the results are for total round-trip processing of the pub/sub.
- Disabled, so there is no waiting at the publisher side.


Configuration
-------------------

My environment is a single test machine with 2 PG instances (for pub and sub).

Using default configs except:

PUB-node
- wal_level = logical
- max_wal_senders = 10
- logical_decoding_work_mem = 64kB
- checkpoint_timeout = 30min
- min_wal_size = 10GB
- max_wal_size = 20GB
- shared_buffers = 2GB
- synchronous_standby_names = 'sync_sub' (for synchronous testing only)

SUB-node
- max_worker_processes = 11
- max_logical_replication_workers = 10
- checkpoint_timeout = 30min
- min_wal_size = 10GB
- max_wal_size = 20GB
- shared_buffers = 2GB

SQL files
-------------

Contents of test_empty_not_published.sql:

-- Operations for table not published
BEGIN;
INSERT INTO test_tab_nopub VALUES(1, 'foo');
UPDATE test_tab_nopub SET b = 'bar' WHERE a = 1;
DELETE FROM test_tab_nopub WHERE a = 1;
COMMIT;

-- 2PC operations for table not published
BEGIN;
INSERT INTO test_tab_nopub VALUES(2, 'fizz');
UPDATE test_tab_nopub SET b = 'bang' WHERE a = 2;
DELETE FROM test_tab_nopub WHERE a = 2;
PREPARE TRANSACTION 'gid_nopub';
COMMIT PREPARED 'gid_nopub';

~~

Contents of test_empty_published.sql:

(same as above but the table is called test_tab)


SQL Tables
----------------

(tables are the same apart from the name)

CREATE TABLE test_tab (a int primary key, b text, c timestamptz
DEFAULT now(), d bigint DEFAULT 999);

CREATE TABLE test_tab_nopub (a int primary key, b text, c timestamptz
DEFAULT now(), d bigint DEFAULT 999);


Example pgbench command
-----------------------

(this example is showing a test for a 25% mix of empty transactions)

pgbench -s 100 -T 900 -c 1 -f test_empty_not_published.sql@5 -f test_empty_published.sql@15 test_pub
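
The 25% figure follows from the script weights: pgbench picks each script with probability equal to its weight divided by the sum of all weights. The arithmetic can be sketched in C as below; empty_fraction is a hypothetical helper, not part of pgbench.

```c
#include <assert.h>

/*
 * Fraction of script runs drawn from the "empty" (not published) script,
 * given the two @weights passed to pgbench. Both scripts contain the same
 * number of transactions, so this is also the fraction of empty transactions.
 */
static double
empty_fraction(int weight_empty, int weight_nonempty)
{
    assert(weight_empty >= 0 && weight_nonempty >= 0);
    assert(weight_empty + weight_nonempty > 0);
    return (double) weight_empty / (double) (weight_empty + weight_nonempty);
}
```

With the weights from the example command, empty_fraction(5, 15) is 5 / 20 = 0.25. Other mixes can be obtained the same way, for example weights 10 and 10 for a 50% mix.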


RESULTS / OBSERVATIONS
======================

Synchronous Mode
----------------

- As the percentage mix of empty transactions increases, so does the
transaction throughput. I assume this is because we are using
synchronous mode; so when there is less waiting time, then there is
more time available for transaction processing

- The performance was generally similar before/after the patch, but
there was an observed throughput improvement of ~2% (averaged across
all mixes)

- The number of protocol bytes is associated with the number of
transactions that are processed during the test time of 15 minutes.
This adds up to a significant number of bytes even when the
transactions are empty.

- For the unpatched code as the transaction rate increases, then so
does the number of traffic bytes.

- The patch improves this significantly by eliminating all the empty
transaction traffic.

- Before the patch, even "empty transactions" are processing some
bytes, so it can never reach zero. After the patch, empty transaction
traffic is eliminated entirely.


NOT Synchronous Mode
--------------------

- Since there is no synchronous waiting for round trips, the
transaction throughput is generally consistent regardless of the empty
transaction mix.

- There is a hint of a small overall improvement in throughput as the
empty transaction mix approaches 100%. In my test environment
both the pub/sub nodes are using the same machine/CPU, so my guess is
that when less CPU is spent processing messages in the Apply
Worker, more CPU is available to pump transactions at the
publisher side.

- The patch transaction throughput seems ~3% better than for
non-patched. This might also be attributable to the same reason
mentioned above - less CPU spent processing empty messages at the
subscriber side leaves more CPU available to pump transactions from
the publisher side.

- The number of protocol bytes is associated with the number of
transactions that are processed during the test time of 15 minutes.

- Because the transaction throughput is consistent, the traffic of
protocol bytes here is determined mainly by the proportion of "empty
transactions" in the mixture.

- Before the patch, even “empty transactions” are processing some
bytes, so it can never reach zero. After the patch, the empty
transaction traffic is eliminated entirely.


ATTACHMENTS
===========

PSA

A1. A PDF version of my test report (also includes raw result data)
A2. Sync: Graph of Transaction throughput
A3. Sync: Graph of Protocol bytes (total)
A4. Sync: Graph of Protocol bytes (per transaction)
A5. Not-Sync: Graph of Transaction throughput
A6. Not-Sync: Graph of Protocol bytes (total)
A7. Not-Sync: Graph of Protocol bytes (per transaction)

------
Kind Regards,
Peter Smith.
Fujitsu Australia.

Attachments

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
>

Let's first split the patch for prepared and non-prepared cases as
that will help to focus on each of them separately. BTW, why haven't
you considered implementing point 1b as explained by Andres in his
email [1]? I think we can send a keepalive message in case of
synchronous replication when we skip an empty transaction; otherwise,
responses to transactions in synchronous_commit mode might be delayed.
I think in the tests done in the thread, it might not have been shown
because we are already sending keepalives too frequently. But what if
someone disables wal_sender_timeout or sets it to a very large value?
See WalSndKeepaliveIfNecessary. The other thing you might want to look
at is if the reason for frequent keepalives is the same as described
in the email [2].

Few other miscellaneous comments:
1.
static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
 {
+ PGOutputTxnData    *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
  OutputPluginUpdateProgress(ctx);

+ /*
+ * If the BEGIN PREPARE was not yet sent, then it means there were no
+ * relevant changes encountered, so we can skip the COMMIT PREPARED
+ * message too.
+ */
+ if (txndata)
+ {
+ bool skip = !txndata->sent_begin_txn;
+ pfree(txndata);
+ txn->output_plugin_private = NULL;

How is this supposed to work after the restart when prepared is sent
before the restart and we are just sending commit_prepared after
restart? Won't this lead to sending commit_prepared even when the
corresponding prepare is not sent? Can we think of a better way to
deal with this?
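
The concern can be modelled with a small C sketch. It assumes, since the quoted hunk is cut off, that the skip variable leads to an early return and that a NULL txndata falls through to sending the message. TxnState and sends_commit_prepared are hypothetical names, not patch code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct TxnState
{
    bool        sent_begin;     /* was BEGIN PREPARE sent for this transaction? */
} TxnState;

/*
 * Returns true when COMMIT PREPARED would be sent.
 * Assumption: in-memory state is lost across a restart, so txndata is NULL
 * when only the COMMIT PREPARED is decoded after the restart.
 */
static bool
sends_commit_prepared(const TxnState *txndata)
{
    assert(txndata == NULL || txndata->sent_begin == true || txndata->sent_begin == false);
    if (txndata != NULL)
        return txndata->sent_begin; /* skip only when known to be empty */
    return true;                    /* no state: the message is sent */
}
```

Under these assumptions, an empty prepared transaction is skipped only while its state is still in memory. After a restart the state is gone and COMMIT PREPARED goes out even though the PREPARE was never sent, which is the case the question is about.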

2.
@@ -222,8 +224,10 @@ logicalrep_write_commit_prepared(StringInfo out,
ReorderBufferTXN *txn,
  pq_sendbyte(out, flags);

  /* send fields */
+ pq_sendint64(out, prepare_end_lsn);
  pq_sendint64(out, commit_lsn);
  pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);

Doesn't this mean a change of protocol? And how is it supposed to work
when, say, the publisher is 15 and the subscriber is 14, which I think
works without such a change?


[1] - https://www.postgresql.org/message-id/20200309183018.tzkzwu635sd366ej%40alap3.anarazel.de
[2] - https://www.postgresql.org/message-id/CALtH27cip5uQNJb4uHjLXtx1R52ELqXVfcP9fhHr%3DAvFo1dtqw%40mail.gmail.com

-- 
With Regards,
Amit Kapila.



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
>
> Let's first split the patch for prepared and non-prepared cases as
> that will help to focus on each of them separately.

As a first shot, I have split the patch into prepared and non-prepared cases.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
On Sat, Aug 7, 2021 at 12:01 AM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > >
> >
> > Let's first split the patch for prepared and non-prepared cases as
> > that will help to focus on each of them separately.
>
> As a first shot, I have split the patch into prepared and non-prepared cases,

I have reviewed the v12* split patch set.

Apply / build / test was all OK

Below are my code review comments (mostly cosmetic).

//////////

Comments for v12-0001
=====================

1. Patch comment

=>

This comment as-is might have been OK before the 2PC code was
committed, but now that the 2PC is part of the HEAD perhaps this
comment needs to be expanded just to say this patch is ONLY for fixing
empty transactions for the cases of non-"streaming" and
non-"two_phase", and the other kinds will be tackled separately.

------

2. src/backend/replication/pgoutput/pgoutput.c - PGOutputTxnData comment

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN or BEGIN PREPARE. BEGIN or BEGIN PREPARE
+ * is only sent when the first change in a transaction is processed.
+ * This makes it possible to skip transactions that are empty.
+ */

=>

Maybe this is true for the combined v12-0001/v12-0002 case but just
for the v12-0001 patch I think it is not right to imply that some
skipping of the BEGIN_PREPARE is possible, because IIUC it isn't
implemented in *this* patch.
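
For readers following the thread, the deferred-BEGIN idea described in the
PGOutputTxnData comment can be illustrated with a small standalone sketch.
This is not the patch's code: TxnState and the txn_* functions are
hypothetical stand-ins that only count messages, under the assumption that
BEGIN is written lazily on the first published change and COMMIT is dropped
when no BEGIN was ever written.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for PGOutputTxnData: one flag per transaction. */
typedef struct TxnState
{
    bool sent_begin;    /* has BEGIN been written to the stream yet? */
    int  messages_out;  /* number of protocol messages emitted so far */
} TxnState;

/* BEGIN callback: remember the transaction, but send nothing yet. */
static void
txn_begin(TxnState *txn)
{
    txn->sent_begin = false;
    txn->messages_out = 0;
}

/* Change callback: emit the deferred BEGIN first, then the change. */
static void
txn_change(TxnState *txn, bool relation_is_published)
{
    if (!relation_is_published)
        return;                 /* filtered out, nothing to send */

    if (!txn->sent_begin)
    {
        txn->messages_out++;    /* the deferred BEGIN */
        txn->sent_begin = true;
    }
    txn->messages_out++;        /* the change itself */
}

/* COMMIT callback: returns true when the whole transaction was skipped. */
static bool
txn_commit(TxnState *txn)
{
    if (!txn->sent_begin)
        return true;            /* empty: no BEGIN was sent, so skip COMMIT */

    txn->messages_out++;        /* COMMIT */
    return false;
}
```

In this sketch a transaction that only touches unpublished relations emits
zero messages, while one with two published changes emits four (BEGIN, two
changes, COMMIT).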

------

3. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn whitespace

+ PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));

=>

Misaligned indentation?

------

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change brackets

+ /*
+ * Output BEGIN if we haven't yet, unless streaming.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, txn);
+ }

=>

The brackets are not needed for the if with a single statement.

------

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
brackets/comment

+ /*
+ * output BEGIN if we haven't yet,
+ * while streaming no need to send BEGIN / BEGIN PREPARE.
+ */
+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, txn);
+ }

5a. =>

Same as review comment 4. The brackets are not needed for the if with
a single statement.

5b. =>

Notice this code is the same as cited in review comment 4. So probably
the code comment should be consistent/same also?

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message brackets

+ Assert(txndata);
+ if (!txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, txn);
+ }

=>

The brackets are not needed for the if with a single statement.

------

7. typedefs.list

=> The structure PGOutputTxnData was added in v12-0001, so the
typedefs.list probably should also be updated.

//////////

Comments for v12-0002
=====================

8. Patch comment

This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE messages until the first change is encountered.
If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message. This means that pgoutput will skip BEGIN / COMMIT
or BEGIN PREPARE / PREPARE  messages for transactions that are empty.
pgoutput will also skip COMMIT PREPARED and ROLLBACK PREPARED messages
for transactions that are empty.

8a. =>

I’m not sure this comment is 100% correct for this specific patch. The
whole BEGIN/COMMIT was already handled by the v12-0001 patch, right?
So I thought this comment should really only mention BEGIN PREPARE
and COMMIT PREPARED.

8b. =>

I think there should also be some mention that this patch is not
handling the "streaming" case of empty tx at all.

------

9. src/backend/replication/logical/proto.c - protocol version

@@ -248,8 +250,10 @@ logicalrep_write_commit_prepared(StringInfo out,
ReorderBufferTXN *txn,
  pq_sendbyte(out, flags);

  /* send fields */
+ pq_sendint64(out, prepare_end_lsn);
  pq_sendint64(out, commit_lsn);
  pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);
  pq_sendint64(out, txn->xact_time.commit_time);
  pq_sendint32(out, txn->xid);

=>

I agree with a previous feedback comment from Amit - Probably there is
some protocol version requirement/implications here because the
message format has been changed in logicalrep_write_commit_prepared
and logicalrep_read_commit_prepared.

e.g. Does this code need to be cognisant of the version and behave
differently accordingly?
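
One way to picture "behave differently according to the version" is a
writer that appends the extra fields only when the negotiated protocol
version is new enough. The sketch below is purely illustrative: DemoBuf,
demo_send_int64, demo_write_commit_prepared and the version constant are
all hypothetical, and the field layout is reduced to the four LSN/time
fields discussed above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical version number; NOT a real protocol constant. */
#define DEMO_PROTO_WITH_PREPARE_FIELDS 4

/* Hypothetical output buffer that just records the fields in order. */
typedef struct DemoBuf
{
    uint64_t fields[8];
    size_t   nfields;
} DemoBuf;

static void
demo_send_int64(DemoBuf *out, uint64_t v)
{
    assert(out->nfields < 8);
    out->fields[out->nfields++] = v;
}

/*
 * Write a reduced COMMIT PREPARED message.  Older peers get the old
 * layout (commit_lsn, end_lsn); newer peers also get prepare_end_lsn
 * and prepare_time.
 */
static void
demo_write_commit_prepared(DemoBuf *out, int proto_version,
                           uint64_t prepare_end_lsn, uint64_t commit_lsn,
                           uint64_t end_lsn, uint64_t prepare_time)
{
    bool extended = proto_version >= DEMO_PROTO_WITH_PREPARE_FIELDS;

    out->nfields = 0;
    if (extended)
        demo_send_int64(out, prepare_end_lsn);
    demo_send_int64(out, commit_lsn);
    demo_send_int64(out, end_lsn);
    if (extended)
        demo_send_int64(out, prepare_time);
}
```

The reader on the subscriber side would need the same version check, so
that both ends agree on the layout.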

------

10. src/backend/replication/pgoutput/pgoutput.c -
pgoutput_begin_prepare flag moved?

+ Assert(txndata);
  OutputPluginPrepareWrite(ctx, !send_replication_origin);
  logicalrep_write_begin_prepare(ctx->out, txn);
+ txndata->sent_begin_txn = true;

  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
  send_replication_origin);

  OutputPluginWrite(ctx, true);
- txndata->sent_begin_txn = true;
- txn->output_plugin_private = txndata;
 }

=>

In the v12-0001 patch, you set the begin_txn flags AFTER the
OutputPluginWrite, but in the v12-0002 you set them BEFORE the
OutputPluginWrite. Why the difference? Maybe it should be consistent?

------

11. src/test/subscription/t/021_twophase.pl - proto_version tests needed?

Does this need some other tests to make sure the older proto_version
is still usable? Refer also to the review comment 9.

------

12. src/tools/pgindent/typedefs.list - PGOutputTxnData

 PGOutputData
+PGOutputTxnData
 PGPROC

=>

This change looks good, but I think it should have been done in
v12-0001 and not here in v12-0002.

------

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
>
> Let's first split the patch for prepared and non-prepared cases as
> that will help to focus on each of them separately. BTW, why haven't
> you considered implementing point 1b as explained by Andres in his
> email [1]? I think we can send a keepalive message in case of
> synchronous replication when we skip an empty transaction, otherwise,
> it might delay in responding to transactions synchronous_commit mode.
> I think in the tests done in the thread, it might not have been shown
> because we are already sending keepalives too frequently. But what if
> someone disables wal_sender_timeout or kept it to a very large value?
> See WalSndKeepaliveIfNecessary. The other thing you might want to look
> at is if the reason for frequent keepalives is the same as described
> in the email [2].
>

I have tried to address this comment by modifying the
ctx->update_progress callback function (WalSndUpdateProgress) provided
for plugins. I have added an option by which the callback can specify
whether it wants to send keepalives. When the callback is called with
that option set, the walsender sets a flag, force_keep_alive_syncrep.
The walsender checks this flag in the for loop of WalSndWaitForWal and,
if synchronous replication is enabled, sends a keepalive. Currently
this logic is added as an else branch to the logic that is already
there in WalSndWaitForWal, which is probably considered unnecessary and
a source of the keepalive flood that you talked about. So, I can change
that according to how that fix shapes up there. I have also added an
extern function in syncrep.c that makes it possible for the walsender
to query whether synchronous replication is turned on.

The reason I had to set a flag and rely on WalSndWaitForWal to send the
keepalive in its next iteration is that I tried doing this directly
when a commit is skipped, but it didn't work. When the commit is being
decoded, sentPtr is at the commit LSN, so the keepalive would be sent
for the commit LSN, but the syncrep wait is waiting for the end_lsn of
the transaction, which is the next LSN. So, sending a keepalive at the
moment the commit is decoded doesn't seem to solve the problem of the
waiting synchronous reply.
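
The flag handoff described above can be sketched in isolation as follows.
This is a simplified illustration, not the walsender code: the demo_*
names are hypothetical, the keepalive is reduced to a counter, and
clearing the flag after each loop iteration is an assumption made here so
that one skipped transaction triggers at most one keepalive.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for walsender state. */
static bool demo_force_keepalive = false;   /* set by the progress callback */
static int  demo_keepalives_sent = 0;       /* counts simulated keepalives */

/*
 * Progress callback, invoked while the commit is decoded.  It does not
 * send anything itself; it only records that a keepalive is wanted.
 */
static void
demo_update_progress(bool send_keepalive)
{
    if (send_keepalive)
        demo_force_keepalive = true;
}

/*
 * One iteration of the wait loop, which runs after decoding has moved
 * past the commit record.  The keepalive is sent here, and only when
 * synchronous replication is enabled.
 */
static void
demo_wait_loop_iteration(bool syncrep_enabled)
{
    if (demo_force_keepalive && syncrep_enabled)
        demo_keepalives_sent++;

    /* Assumption: the request is one-shot and is cleared either way. */
    demo_force_keepalive = false;
}
```

The point of the indirection is timing: by the time the loop runs, the
send position has advanced beyond the commit record, which is what the
waiting backend needs to see acknowledged.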

> Few other miscellaneous comments:
> 1.
> static void
>  pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> - XLogRecPtr commit_lsn)
> + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> + TimestampTz prepare_time)
>  {
> + PGOutputTxnData    *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
>   OutputPluginUpdateProgress(ctx);
>
> + /*
> + * If the BEGIN PREPARE was not yet sent, then it means there were no
> + * relevant changes encountered, so we can skip the COMMIT PREPARED
> + * message too.
> + */
> + if (txndata)
> + {
> + bool skip = !txndata->sent_begin_txn;
> + pfree(txndata);
> + txn->output_plugin_private = NULL;
>
> How is this supposed to work after the restart when prepared is sent
> before the restart and we are just sending commit_prepared after
> restart? Won't this lead to sending commit_prepared even when the
> corresponding prepare is not sent? Can we think of a better way to
> deal with this?
>

I have tried to resolve this by adding logic in worker.c to silently
ignore spurious commit_prepareds. But this change required checking
whether the prepare exists on the subscriber before attempting the
commit_prepared, and the current API that checks this requires the
prepare time and the transaction end_lsn. For this I had to change the
protocol of commit_prepared, and I understand that this would break
backward compatibility between subscriber and publisher (you have
raised this issue as well). I am not sure how else to handle this; let
me know if you have any other ideas. One option could be to have
another API to check whether the prepare exists on the subscriber with
the prepared 'gid' alone, without checking prepare_time or end_lsn.
Let me know if this idea works.

I have left out the patch 0002 for prepared transactions until we
arrive at a decision on how to address the above issue.

Peter,
I have also addressed the comments you've raised on patch 0001, please
have a look and confirm.

Regards,
Ajin Cherian
Fujitsu Australia.

Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
On Fri, Aug 13, 2021 at 9:01 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > >
> >
> > Let's first split the patch for prepared and non-prepared cases as
> > that will help to focus on each of them separately. BTW, why haven't
> > you considered implementing point 1b as explained by Andres in his
> > email [1]? I think we can send a keepalive message in case of
> > synchronous replication when we skip an empty transaction, otherwise,
> > it might delay in responding to transactions synchronous_commit mode.
> > I think in the tests done in the thread, it might not have been shown
> > because we are already sending keepalives too frequently. But what if
> > someone disables wal_sender_timeout or kept it to a very large value?
> > See WalSndKeepaliveIfNecessary. The other thing you might want to look
> > at is if the reason for frequent keepalives is the same as described
> > in the email [2].
> >
>
> I have tried to address the comment here by modifying the
> ctx->update_progress callback function (WalSndUpdateProgress) provided
> for plugins. I have added an option
> by which the callback can specify if it wants to send keep_alives. And
> when the callback is called with that option set, walsender updates a
> flag force_keep_alive_syncrep.
> The Walsender in the WalSndWaitForWal for loop, checks this flag and
> if synchronous replication is enabled, then sends a keep alive.
> Currently this logic
> is added as an else to the current logic that is already there in
> WalSndWaitForWal, which is probably considered unnecessary and a
> source of the keep alive flood
> that you talked about. So, I can change that according to how that fix
> shapes up there. I have also added an extern function in syncrep.c
> that makes it possible
> for walsender to query if synchronous replication is turned on.
>
> The reason I had to turn on a flag and rely on the WalSndWaitForWal to
> send the keep alive in its next iteration is because I tried doing
> this directly when a
> commit is skipped but it didn't work. The reason for this is that when
> the commit is being decoded the sentptr at the moment is at the commit
> LSN and the keep alive
> will be sent for the commit LSN but the syncrep wait is waiting for
> end_lsn of the transaction which is the next LSN. So, sending a keep
> alive at the moment the
> commit is decoded doesn't seem to solve the problem of the waiting
> synchronous reply.
>
> > Few other miscellaneous comments:
> > 1.
> > static void
> >  pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> > ReorderBufferTXN *txn,
> > - XLogRecPtr commit_lsn)
> > + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> > + TimestampTz prepare_time)
> >  {
> > + PGOutputTxnData    *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> > +
> >   OutputPluginUpdateProgress(ctx);
> >
> > + /*
> > + * If the BEGIN PREPARE was not yet sent, then it means there were no
> > + * relevant changes encountered, so we can skip the COMMIT PREPARED
> > + * message too.
> > + */
> > + if (txndata)
> > + {
> > + bool skip = !txndata->sent_begin_txn;
> > + pfree(txndata);
> > + txn->output_plugin_private = NULL;
> >
> > How is this supposed to work after the restart when prepared is sent
> > before the restart and we are just sending commit_prepared after
> > restart? Won't this lead to sending commit_prepared even when the
> > corresponding prepare is not sent? Can we think of a better way to
> > deal with this?
> >
>
> I have tried to resolve this by adding logic in worker,c to silently
> ignore spurious commit_prepareds. But this change required checking if
> the prepare exists on the
> subscriber before attempting the commit_prepared but the current API
> that checks this requires prepare time and transaction end_lsn. But
> for this I had to
> change the protocol of commit_prepared, and I understand that this
> would break backward compatibility between subscriber and publisher
> (you have raised this issue as well).
> I am not sure how else to handle this, let me know if you have any
> other ideas. One option could be to have another API to check if the
> prepare exists on the subscriber with
> the prepared 'gid' alone, without checking prepare_time or end_lsn.
> Let me know if this idea works.
>
> I have left out the patch 0002 for prepared transactions until we
> arrive at a decision on how to address the above issue.
>
> Peter,
> I have also addressed the comments you've raised on patch 0001, please
> have a look and confirm.

I have reviewed the v13-0001 patch.

Apply / build / test was all OK

Below are my code review comments.

//////////

Comments for v13-0001
=====================

1. Patch comment

=>

Probably this comment should include some description for the new
"keepalive" logic as well.

------

2. src/backend/replication/syncrep.c - new function

@@ -330,6 +330,18 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 }

 /*
+ * Check if Sync Rep is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ if (SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined)
+ return true;
+ else
+ return false;
+}
+

2a. Function comment =>

Why abbreviations in the comment? Why not say "synchronous
replication" instead of "Sync Rep".

~~

2b. if/else =>

Remove the if/else. e.g.

return SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined;

~~

2c. Call the new function =>

There is some existing similar code in SyncRepWaitForLSN(), e.g.

if (!SyncRepRequested() ||
!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
return;

Now that you have a new function you maybe can call it from here, e.g.

if (!SyncRepEnabled())
return;

------

3. src/backend/replication/walsender.c - whitespace

+ if (send_keep_alive)
+ force_keep_alive_syncrep = true;
+
+

=>

Extra blank line?

------

4. src/backend/replication/walsender.c - call keepalive

  if (MyWalSnd->flush < sentPtr &&
  MyWalSnd->write < sentPtr &&
  !waiting_for_ping_response)
+ {
  WalSndKeepalive(false);
+ }
+ else
+ {
+ if (force_keep_alive_syncrep && SyncRepEnabled())
+ WalSndKeepalive(false);
+ }


4a. Move the SyncRepEnabled() call =>

I think it is not necessary to call SyncRepEnabled() here. Instead,
it might be better if this is called back when you assign the
force_keep_alive_syncrep flag. So change the WalSndUpdateProgress,
e.g.

BEFORE
if (send_keep_alive)
  force_keep_alive_syncrep = true;
AFTER
force_keep_alive_syncrep = send_keep_alive && SyncRepEnabled();

Note: that assignment also deserves a big comment to say what it is doing.

~~

4b. Change the if/else =>

If you make the change for 4a, then perhaps the keepalive if/else is
overkill and could be changed, e.g.

if (force_keep_alive_syncrep ||
  MyWalSnd->flush < sentPtr &&
  MyWalSnd->write < sentPtr &&
  !waiting_for_ping_response)
  WalSndKeepalive(false);
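
As a standalone illustration of the combined condition, the predicate can
be written with explicit parentheses around the && group, which makes the
intended grouping visible and avoids compiler warnings about mixing || and
&&. The function name demo_need_keepalive and its parameters are
hypothetical; in the walsender these values come from MyWalSnd and file
level variables.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Decide whether a keepalive should be sent: either one was explicitly
 * requested, or the standby's flush and write positions are both behind
 * the sent position and no ping reply is outstanding.
 */
static bool
demo_need_keepalive(bool force_keepalive,
                    uint64_t flush, uint64_t write, uint64_t sent_ptr,
                    bool waiting_for_ping_response)
{
    return force_keepalive ||
        (flush < sent_ptr &&
         write < sent_ptr &&
         !waiting_for_ping_response);
}
```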

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Aug 16, 2021 at 4:44 PM Peter Smith <smithpb2250@gmail.com> wrote:

> I have reviewed the v13-0001 patch.
>
> Apply / build / test was all OK
>
> Below are my code review comments.
>
> //////////
>
> Comments for v13-0001
> =====================
>
> 1. Patch comment
>
> =>
>
> Probably this comment should include some description for the new
> "keepalive" logic as well.

Added.

>
> ------
>
> 2. src/backend/replication/syncrep.c - new function
>
> @@ -330,6 +330,18 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
>  }
>
>  /*
> + * Check if Sync Rep is enabled
> + */
> +bool
> +SyncRepEnabled(void)
> +{
> + if (SyncRepRequested() && ((volatile WalSndCtlData *)
> WalSndCtl)->sync_standbys_defined)
> + return true;
> + else
> + return false;
> +}
> +
>
> 2a. Function comment =>
>
> Why abbreviations in the comment? Why not say "synchronous
> replication" instead of "Sync Rep".
>

Changed.

> ~~
>
> 2b. if/else =>
>
> Remove the if/else. e.g.
>
> return SyncRepRequested() && ((volatile WalSndCtlData *)
> WalSndCtl)->sync_standbys_defined;
>
> ~~

Changed.

>
> 2c. Call the new function =>
>
> There is some existing similar code in SyncRepWaitForLSN(), e.g.
>
> if (!SyncRepRequested() ||
> !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
> return;
>
> Now that you have a new function you maybe can call it from here, e.g.
>
> if (!SyncRepEnabled())
> return;
>

Updated.

> ------
>
> 3. src/backend/replication/walsender.c - whitespace
>
> + if (send_keep_alive)
> + force_keep_alive_syncrep = true;
> +
> +
>
> =>
>
> Extra blank line?

Removed.

>
> ------
>
> 4. src/backend/replication/walsender.c - call keepalive
>
>   if (MyWalSnd->flush < sentPtr &&
>   MyWalSnd->write < sentPtr &&
>   !waiting_for_ping_response)
> + {
>   WalSndKeepalive(false);
> + }
> + else
> + {
> + if (force_keep_alive_syncrep && SyncRepEnabled())
> + WalSndKeepalive(false);
> + }
>
>
> 4a. Move the SynRepEnabled() call =>
>
> I think it is not necessary to call the SynRepEnabled() here. Instead,
> it might be better if this is called back when you assign the
> force_keep_alive_syncrep flag. So change the WalSndUpdateProgress,
> e.g.
>
> BEFORE
> if (send_keep_alive)
>   force_keep_alive_syncrep = true;
> AFTER
> force_keep_alive_syncrep = send_keep_alive && SyncRepEnabled();
>
> Note: Also, that assignment also deserves a big comment to say what it is doing.
>
> ~~

Changed.

>
> 4b. Change the if/else =>
>
> If you make the change for 4a. then perhaps the keepalive if/else is
> overkill and could be changed.e.g.
>
> if (force_keep_alive_syncrep ||
>   MyWalSnd->flush < sentPtr &&
>   MyWalSnd->write < sentPtr &&
>   !waiting_for_ping_response)
>   WalSndKeepalive(false);
>

Changed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
I reviewed the v14-0001 patch.

All my previous comments have been addressed.

Apply / build / test was all OK.

------

More review comments:

1. Params names in the function declarations should match the rest of the code.

1a. src/include/replication/logical.h

@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite
LogicalOutputPluginWriterPrepareWrite;

 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct
LogicalDecodingContext *lr,
  XLogRecPtr Ptr,
- TransactionId xid
+ TransactionId xid,
+ bool send_keep_alive

=>
Change "send_keep_alive" --> "send_keepalive"

~~

1b. src/include/replication/output_plugin.h

@@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext
*ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx,
bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext
*ctx, bool send_keep_alive);

=>
Change "send_keep_alive" --> "send_keepalive"

------

2. Comment should be capitalized - src/backend/replication/walsender.c

@@ -170,6 +170,9 @@ static TimestampTz last_reply_timestamp = 0;
 /* Have we sent a heartbeat message asking for reply, since last reply? */
 static bool waiting_for_ping_response = false;

+/* force keep alive when skipping transactions in synchronous
replication mode */
+static bool force_keepalive_syncrep = false;

=>
"force" --> "Force"

------

Otherwise, v14-0001 LGTM.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Wed, Aug 25, 2021 at 5:15 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> I reviewed the v14-0001 patch.
>
> All my previous comments have been addressed.
>
> Apply / build / test was all OK.
>
> ------
>
> More review comments:
>
> 1. Params names in the function declarations should match the rest of the code.
>
> 1a. src/include/replication/logical.h
>
> @@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite
> LogicalOutputPluginWriterPrepareWrite;
>
>  typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct
> LogicalDecodingContext *lr,
>   XLogRecPtr Ptr,
> - TransactionId xid
> + TransactionId xid,
> + bool send_keep_alive
>
> =>
> Change "send_keep_alive" --> "send_keepalive"
>
> ~~
>
> 1b. src/include/replication/output_plugin.h
>
> @@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks
>  /* Functions in replication/logical/logical.c */
>  extern void OutputPluginPrepareWrite(struct LogicalDecodingContext
> *ctx, bool last_write);
>  extern void OutputPluginWrite(struct LogicalDecodingContext *ctx,
> bool last_write);
> -extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
> +extern void OutputPluginUpdateProgress(struct LogicalDecodingContext
> *ctx, bool send_keep_alive);
>
> =>
> Change "send_keep_alive" --> "send_keepalive"
>
> ------
>
> 2. Comment should be capitalized - src/backend/replication/walsender.c
>
> @@ -170,6 +170,9 @@ static TimestampTz last_reply_timestamp = 0;
>  /* Have we sent a heartbeat message asking for reply, since last reply? */
>  static bool waiting_for_ping_response = false;
>
> +/* force keep alive when skipping transactions in synchronous
> replication mode */
> +static bool force_keepalive_syncrep = false;
>
> =>
> "force" --> "Force"
>
> ------
>
> Otherwise, v14-0001 LGTM.
>

Thanks for the comments. Addressed them in the attached patch.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Wed, Sep 1, 2021 at 8:57 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> Thanks for the comments. Addressed them in the attached patch.
>
> regards,
> Ajin Cherian
> Fujitsu Australia

Minor update to rebase the patch so that it applies clean on HEAD.

regards,
Ajin Cherian

Attachments

RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:
> Minor update to rebase the patch so that it applies clean on HEAD.
Hi, thanks for your rebase.

Several comments.

(1) the commit message

"
transactions, keepalive messages are sent to keep the LSN locations updated
on the standby.
This patch does not skip empty transactions that are "streaming" or "two-phase".
"

I suggest that one blank line might be needed before the last paragraph.

(2) Could you please remove the pair of curly brackets around the single statement below?

@@ -1546,10 +1557,13 @@ WalSndWaitForWal(XLogRecPtr loc)
                 * otherwise idle, this keepalive will trigger a reply. Processing the
                 * reply will update these MyWalSnd locations.
                 */
-               if (MyWalSnd->flush < sentPtr &&
+               if (force_keepalive_syncrep ||
+                       (MyWalSnd->flush < sentPtr &&
                        MyWalSnd->write < sentPtr &&
-                       !waiting_for_ping_response)
+                       !waiting_for_ping_response))
+               {
                        WalSndKeepalive(false);
+               }


(3) Is it this patch's responsibility to initialize the data in pgoutput_begin_prepare_txn?

@@ -433,6 +487,8 @@ static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
        bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+                                                            sizeof(PGOutputTxnData));

        OutputPluginPrepareWrite(ctx, !send_replication_origin);
        logicalrep_write_begin_prepare(ctx->out, txn);


Even if we need this initialization for either the non-streaming case
or the non-two_phase case, there can be another issue.
We don't free the allocated memory for this data, right?
There's only one place that frees it in the entire patch,
which is in pgoutput_commit_txn(). So, a
corresponding free of the memory looks necessary
in the two-phase commit functions.

(4) Better placement of SyncRepEnabled.

IIUC, SyncRepEnabled is called not only by the walsender but also by other backends
via CommitTransaction -> RecordTransactionCommit -> SyncRepWaitForLSN.
So the place where the prototype for SyncRepEnabled was added does not seem appropriate,
strictly speaking, or it requires a comment like /* called by wal sender or other backends */.

@@ -90,6 +90,7 @@ extern void SyncRepCleanupAtProcExit(void);
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
+extern bool SyncRepEnabled(void);

Even if we intend it to be used only by the walsender, the current location
of SyncRepEnabled in syncrep.c might not be ideal.
This file seems to have a section for functions for wal sender processes,
and the place where you wrote it is not in that section.

In src/backend/replication/syncrep.c, see the comment below.
/*
 * ===========================================================
 * Synchronous Replication functions for wal sender processes
 * ===========================================================
 */

(5) Minor consistency issue in how pairs of messages are written.

@@ -777,6 +846,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        Oid                *relids;
        TransactionId xid = InvalidTransactionId;

+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);


In the commit message, it is written as below.
...
skip BEGIN / COMMIT messages for transactions that are empty. The patch
...

In that case, there are spaces before and after the slash in "BEGIN / COMMIT".
So I suggest unifying all of these for consistency.

Best Regards,
    Takamichi Osumi


RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:
> Minor update to rebase the patch so that it applies clean on HEAD.
Hi, let me share some additional comments on v16.


(1) comment of pgoutput_change

@@ -630,11 +688,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                Relation relation, ReorderBufferChange *change)
 {
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        MemoryContext old;
        RelationSyncEntry *relentry;
        TransactionId xid = InvalidTransactionId;
        Relation        ancestor = NULL;

+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);
+

In my humble opinion, the comment should not mention BEGIN PREPARE,
because this patch's scope doesn't include two-phase commit.
(We could add this in another patch to extend the scope after the commit?)

This also applies to pgoutput_truncate's comment.

(2) "keep alive" should be "keepalive" in WalSndUpdateProgress

        /*
+        * When skipping empty transactions in synchronous replication, we need
+        * to send a keep alive to keep the MyWalSnd locations updated.
+        */
+       force_keepalive_syncrep = send_keepalive && SyncRepEnabled();
+

Also, this applies to the comment for force_keepalive_syncrep.

(3) The second sentence in the comment of pgoutput_message should end with a period.

@@ -845,6 +923,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        if (in_streaming)
                xid = txn->xid;

+       /*
+        * Output BEGIN if we haven't yet.
+        * Avoid for streaming and non-transactional messages

(4) "begin" can be changed to "BEGIN" in the comment of PGOutputTxnData definition.

In the entire patch, when we express BEGIN message,
we use capital letters "BEGIN" except for one place.
We can apply the same to this place as well.

+typedef struct PGOutputTxnData
+{
+       bool sent_begin_txn;    /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+

(5) inconsistent way to write Assert statements with blank lines

In the below case, it'd be better to insert one blank line
after the Assert();

+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
        bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

+       Assert(txndata);
        OutputPluginPrepareWrite(ctx, !send_replication_origin);


(6) The new code in pgoutput_commit_txn looks slightly messy

@@ -419,7 +455,25 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                        XLogRecPtr commit_lsn)
 {
-       OutputPluginUpdateProgress(ctx);
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+       bool            skip;
+
+       Assert(txndata);
+
+       /*
+        * If a BEGIN message was not yet sent, then it means there were no relevant
+        * changes encountered, so we can skip the COMMIT message too.
+        */
+       skip = !txndata->sent_begin_txn;
+       pfree(txndata);
+       txn->output_plugin_private = NULL;
+       OutputPluginUpdateProgress(ctx, skip);

Could we refactor this new part?
IMO, freeing the data structure at the top
of the function seems odd.

One idea is to move that part out
into a new function, something like the one below.

static bool
txn_sent_begin(ReorderBufferTXN *txn)
{
    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    bool            needs_skip;

    Assert(txndata);

    needs_skip = !txndata->sent_begin_txn;

    pfree(txndata);
    txn->output_plugin_private = NULL;

    return needs_skip;
}

FYI, I had a look at the v12-0002-Skip-empty-prepared-transactions-for-logical-rep.patch
for reference of pgoutput_rollback_prepared_txn and pgoutput_commit_prepared_txn.
It looks like this kind of function could serve future extensions as well.
What do you think?
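
For illustration only, here is a minimal standalone sketch of the deferred-BEGIN idea discussed above. It is not the patch itself: TxnData, Txn, begin_txn, change, txn_was_empty and commit_txn are hypothetical stand-ins for PGOutputTxnData, ReorderBufferTXN and the pgoutput callbacks, and "sending" a message is modelled as bumping a counter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct TxnData
{
    bool        sent_begin_txn; /* has BEGIN been sent yet? */
} TxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct Txn
{
    void       *output_plugin_private;
    int         messages_sent;  /* BEGIN + changes + COMMIT written so far */
} Txn;

/* BEGIN callback: allocate per-transaction state, write nothing yet. */
static void
begin_txn(Txn *txn)
{
    txn->output_plugin_private = calloc(1, sizeof(TxnData));
    assert(txn->output_plugin_private != NULL);
}

/* Change callback: write the deferred BEGIN before the first relevant change. */
static void
change(Txn *txn, bool relation_is_published)
{
    TxnData    *txndata = (TxnData *) txn->output_plugin_private;

    assert(txndata != NULL);

    if (!relation_is_published)
        return;                 /* filtered out, transaction may stay empty */

    if (!txndata->sent_begin_txn)
    {
        txn->messages_sent++;   /* BEGIN */
        txndata->sent_begin_txn = true;
    }
    txn->messages_sent++;       /* the change itself */
}

/* Read the flag and release the state in one place; true means "skip COMMIT". */
static bool
txn_was_empty(Txn *txn)
{
    TxnData    *txndata = (TxnData *) txn->output_plugin_private;
    bool        skip;

    assert(txndata != NULL);

    skip = !txndata->sent_begin_txn;
    free(txndata);
    txn->output_plugin_private = NULL;

    return skip;
}

/* COMMIT callback: returns true when the COMMIT message was skipped. */
static bool
commit_txn(Txn *txn)
{
    bool        skip = txn_was_empty(txn);

    if (!skip)
        txn->messages_sent++;   /* COMMIT */

    return skip;
}
```

With this shape, a transaction that only touches unpublished relations produces no messages at all, while a transaction with two published changes produces BEGIN, two changes and COMMIT.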

Best Regards,
    Takamichi Osumi


Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Wed, Jan 26, 2022 at 8:33 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
>
> On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > Minor update to rebase the patch so that it applies clean on HEAD.
> Hi, thanks for your rebase.
>
> Several comments.
>
> (1) the commit message
>
> "
> transactions, keepalive messages are sent to keep the LSN locations updated
> on the standby.
> This patch does not skip empty transactions that are "streaming" or "two-phase".
> "
>
> I suggest that one blank line might be needed before the last paragraph.

Changed.

>
> (2) Could you please remove one pair of curly brackets for one sentence below ?
>
> @@ -1546,10 +1557,13 @@ WalSndWaitForWal(XLogRecPtr loc)
>                  * otherwise idle, this keepalive will trigger a reply. Processing the
>                  * reply will update these MyWalSnd locations.
>                  */
> -               if (MyWalSnd->flush < sentPtr &&
> +               if (force_keepalive_syncrep ||
> +                       (MyWalSnd->flush < sentPtr &&
>                         MyWalSnd->write < sentPtr &&
> -                       !waiting_for_ping_response)
> +                       !waiting_for_ping_response))
> +               {
>                         WalSndKeepalive(false);
> +               }
>
>

Changed.

> (3) Is it this patch's responsibility to initialize the data in pgoutput_begin_prepare_txn ?
>
> @@ -433,6 +487,8 @@ static void
>  pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
>         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
> +       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
> +                                                            sizeof(PGOutputTxnData));
>
>         OutputPluginPrepareWrite(ctx, !send_replication_origin);
>         logicalrep_write_begin_prepare(ctx->out, txn);
>
>
> Even if we need this initialization for either non streaming case
> or non two_phase case, there can be another issue.
> We don't free the allocated memory for this data, right ?
> There's only one place to use free in the entire patch,
> which is in the pgoutput_commit_txn(). So,
> corresponding free of memory looked necessary
> in the two phase commit functions.
>

Actually it is required for begin_prepare to set the data type, so
that the checks in the pgoutput_change can make sure that
the begin prepare is sent. I've also added a free in commit_prepared code.

> (4) SyncRepEnabled's better alignment.
>
> IIUC, SyncRepEnabled is called not only by the walsender but also by other backends
> via CommitTransaction -> RecordTransactionCommit -> SyncRepWaitForLSN.
> Then, the place to add the prototype function for SyncRepEnabled seems not appropriate,
> strictly speaking or requires a comment like /* called by wal sender or other backends */.
>
> @@ -90,6 +90,7 @@ extern void SyncRepCleanupAtProcExit(void);
>  /* called by wal sender */
>  extern void SyncRepInitConfig(void);
>  extern void SyncRepReleaseWaiters(void);
> +extern bool SyncRepEnabled(void);
>
> Even if we intend it is only used by the walsender, the current code place
> of SyncRepEnabled in the syncrep.c might not be perfect.
> In this file, seemingly we have a section for functions for wal sender processes
> and the place where you wrote it is not here.
>
> at src/backend/replication/syncrep.c, find a comment below.
> /*
>  * ===========================================================
>  * Synchronous Replication functions for wal sender processes
>  * ===========================================================
>  */

Changed.
>
> (5) minor alignment for expressing a couple of messages.
>
> @@ -777,6 +846,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>         Oid                *relids;
>         TransactionId xid = InvalidTransactionId;
>
> +       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
> +       Assert(in_streaming || txndata);
>
>
> In the commit message, the way you write is below.
> ...
> skip BEGIN / COMMIT messages for transactions that are empty. The patch
> ...
>
> In this case, we have spaces back and forth for "BEGIN / COMMIT".
> Then, I suggest to unify all of those to show better alignment.

fixed.

regards,
Ajin Cherian

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Thu, Jan 27, 2022 at 12:16 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
>
> On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > Minor update to rebase the patch so that it applies clean on HEAD.
> Hi, let me share some additional comments on v16.
>
>
> (1) comment of pgoutput_change
>
> @@ -630,11 +688,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                                 Relation relation, ReorderBufferChange *change)
>  {
>         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>         MemoryContext old;
>         RelationSyncEntry *relentry;
>         TransactionId xid = InvalidTransactionId;
>         Relation        ancestor = NULL;
>
> +       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
> +       Assert(in_streaming || txndata);
> +
>
> In my humble opinion, the comment should not touch BEGIN PREPARE,
> because this patch's scope doesn't include two phase commit.
> (We could add this in another patch to extend the scope after the commit ?)
>

We have to include BEGIN PREPARE as well, as the txndata has to be
set up. The only difference is that we will not skip empty transactions
in the BEGIN PREPARE case.

> This applies to pgoutput_truncate's comment.
>
> (2) "keep alive" should be "keepalive" in WalSndUpdateProgress
>
>         /*
> +        * When skipping empty transactions in synchronous replication, we need
> +        * to send a keep alive to keep the MyWalSnd locations updated.
> +        */
> +       force_keepalive_syncrep = send_keepalive && SyncRepEnabled();
> +
>
> Also, this applies to the comment for force_keepalive_syncrep.

Fixed.

>
> (3) Should finish the second sentence with period in the comment of pgoutput_message.
>
> @@ -845,6 +923,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>         if (in_streaming)
>                 xid = txn->xid;
>
> +       /*
> +        * Output BEGIN if we haven't yet.
> +        * Avoid for streaming and non-transactional messages
>

Fixed.

> (4) "begin" can be changed to "BEGIN" in the comment of PGOutputTxnData definition.
>
> In the entire patch, when we express BEGIN message,
> we use capital letters "BEGIN" except for one place.
> We can apply the same to this place as well.
>
> +typedef struct PGOutputTxnData
> +{
> +       bool sent_begin_txn;    /* flag indicating whether begin has been sent */
> +} PGOutputTxnData;
> +
>

Fixed.

> (5) inconsistent way to write Assert statements with blank lines
>
> In the below case, it'd be better to insert one blank line
> after the Assert();
>
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> +{
>         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
> +       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>
> +       Assert(txndata);
>         OutputPluginPrepareWrite(ctx, !send_replication_origin);
>
>

Fixed.

> (6) new codes in the pgoutput_commit_txn looks messy slightly
>
> @@ -419,7 +455,25 @@ static void
>  pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                                         XLogRecPtr commit_lsn)
>  {
> -       OutputPluginUpdateProgress(ctx);
> +       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +       bool            skip;
> +
> +       Assert(txndata);
> +
> +       /*
> +        * If a BEGIN message was not yet sent, then it means there were no relevant
> +        * changes encountered, so we can skip the COMMIT message too.
> +        */
> +       skip = !txndata->sent_begin_txn;
> +       pfree(txndata);
> +       txn->output_plugin_private = NULL;
> +       OutputPluginUpdateProgress(ctx, skip);
>
> Could we conduct a refactoring for this new part ?
> IMO, writing codes to free the data structure at the top
> of function seems weird.
>
> One idea is to export some part there
> and write a new function, something like below.
>
> static bool
> txn_sent_begin(ReorderBufferTXN *txn)
> {
>     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>     bool            needs_skip;
>
>     Assert(txndata);
>
>     needs_skip = !txndata->sent_begin_txn;
>
>     pfree(txndata);
>     txn->output_plugin_private = NULL;
>
>     return needs_skip;
> }
>
> FYI, I had a look at the v12-0002-Skip-empty-prepared-transactions-for-logical-rep.patch
> for reference of pgoutput_rollback_prepared_txn and pgoutput_commit_prepared_txn.
> Looks this kind of function might work for future extensions as well.
> What did you think ?

I changed it a bit, but I'd hold off on a comprehensive rewrite until
a future patch supports skipping empty transactions in two-phase
transactions and streaming transactions.

regards,
Ajin Cherian



RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
On Thursday, January 27, 2022 9:57 PM Ajin Cherian <itsajin@gmail.com> wrote:
Hi, thanks for your patch update.


> On Wed, Jan 26, 2022 at 8:33 PM osumi.takamichi@fujitsu.com
> <osumi.takamichi@fujitsu.com> wrote:
> >
> > On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > (3) Is it this patch's responsibility to initialize the data in pgoutput_begin_prepare_txn ?
> >
> > @@ -433,6 +487,8 @@ static void
> >  pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> >  {
> >         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
> > +       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
> > +                                                            sizeof(PGOutputTxnData));
> >
> >         OutputPluginPrepareWrite(ctx, !send_replication_origin);
> >         logicalrep_write_begin_prepare(ctx->out, txn);
> >
> >
> > Even if we need this initialization for either non streaming case or
> > non two_phase case, there can be another issue.
> > We don't free the allocated memory for this data, right ?
> > There's only one place to use free in the entire patch, which is in
> > the pgoutput_commit_txn(). So, corresponding free of memory looked
> > necessary in the two phase commit functions.
> >
> 
> Actually it is required for begin_prepare to set the data type, so that the checks
> in the pgoutput_change can make sure that the begin prepare is sent. I've also
> added a free in commit_prepared code.
Okay, but if we choose the design in which this patch takes
care of the initialization in pgoutput_begin_prepare_txn(),
we need another free in pgoutput_rollback_prepared_txn().
Could you please add code similar to that in pgoutput_commit_prepared_txn() there as well?
If we simply execute ROLLBACK PREPARED for a non-streaming transaction,
we don't free it.
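
To illustrate the pairing I mean, here is a minimal standalone sketch. It is only a model under stated assumptions: TxnData and Txn are hypothetical stand-ins for PGOutputTxnData and ReorderBufferTXN, calloc/free replace the memory-context calls, release_txndata is a made-up helper name, and live_allocations exists only to make a leak visible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Illustrative leak counter; not something the real plugin has. */
static int  live_allocations = 0;

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct TxnData
{
    bool        sent_begin_txn;
} TxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct Txn
{
    void       *output_plugin_private;
} Txn;

/* BEGIN PREPARE callback: allocates the per-transaction state. */
static void
begin_prepare_txn(Txn *txn)
{
    txn->output_plugin_private = calloc(1, sizeof(TxnData));
    assert(txn->output_plugin_private != NULL);
    live_allocations++;
}

/*
 * Shared cleanup (hypothetical helper). The state may legitimately be
 * absent, for example when the prepare was decoded before a restart.
 */
static void
release_txndata(Txn *txn)
{
    if (txn->output_plugin_private == NULL)
        return;

    free(txn->output_plugin_private);
    txn->output_plugin_private = NULL;
    live_allocations--;
}

/* Both ways of finishing a prepared transaction must release the state. */
static void
commit_prepared_txn(Txn *txn)
{
    release_txndata(txn);
}

static void
rollback_prepared_txn(Txn *txn)
{
    release_txndata(txn);
}
```

If only the commit path called the cleanup, every ROLLBACK PREPARED would leave one allocation behind in this model.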


Some other new minor comments.

(a) Use "synchronous replication" instead of "Synchronous Replication"

In syncrep.c, ordinary comments usually use the former.

 /*
+ * Check if Synchronous Replication is enabled
+ */

(b) Move the two pgoutput_truncate lines below into the branch taken when nrelids > 0.

@@ -770,6 +850,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
        MemoryContext old;
        RelationSyncEntry *relentry;
        int                     i;
@@ -777,6 +858,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        Oid                *relids;
        TransactionId xid = InvalidTransactionId;

+       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+       Assert(in_streaming || txndata);
+

(c) fix indent with spaces (for the one sentence of SyncRepEnabled)

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
 }

 /*
+ * Check if Synchronous Replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+    return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}
+
+/*

This can be detected by git am.


Best Regards,
    Takamichi Osumi


Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Sun, Jan 30, 2022 at 7:04 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
>
> On Thursday, January 27, 2022 9:57 PM Ajin Cherian <itsajin@gmail.com> wrote:
> Hi, thanks for your patch update.
>
>
> > On Wed, Jan 26, 2022 at 8:33 PM osumi.takamichi@fujitsu.com
> > <osumi.takamichi@fujitsu.com> wrote:
> > >
> > > On Tuesday, January 11, 2022 6:43 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > > (3) Is it this patch's responsibility to initialize the data in pgoutput_begin_prepare_txn ?
> > >
> > > @@ -433,6 +487,8 @@ static void
> > >  pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> > >  {
> > >         bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
> > > +       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
> > > +                                                            sizeof(PGOutputTxnData));
> > >
> > >         OutputPluginPrepareWrite(ctx, !send_replication_origin);
> > >         logicalrep_write_begin_prepare(ctx->out, txn);
> > >
> > >
> > > Even if we need this initialization for either non streaming case or
> > > non two_phase case, there can be another issue.
> > > We don't free the allocated memory for this data, right ?
> > > There's only one place to use free in the entire patch, which is in
> > > the pgoutput_commit_txn(). So, corresponding free of memory looked
> > > necessary in the two phase commit functions.
> > >
> >
> > Actually it is required for begin_prepare to set the data type, so that the checks
> > in the pgoutput_change can make sure that the begin prepare is sent. I've also
> > added a free in commit_prepared code.
> Okay, but if we choose the design that this patch takes
> care of the initialization in pgoutput_begin_prepare_txn(),
> we need another free in pgoutput_rollback_prepared_txn().
> Could you please add some codes similar to pgoutput_commit_prepared_txn() to the same ?
> If we simply execute rollback prepared for non streaming transaction,
> we don't free it.
>

Fixed.

>
> Some other new minor comments.
>
> (a) can be "synchronous replication", instead of "Synchronous Replication"
>
> When we have a look at the syncrep.c, we use the former usually in
> a normal comment.
>
>  /*
> + * Check if Synchronous Replication is enabled
> + */

Fixed.

>
> (b) move below pgoutput_truncate two codes to the case where if nrelids > 0.
>
> @@ -770,6 +850,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                                   int nrelations, Relation relations[], ReorderBufferChange *change)
>  {
>         PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +       PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
>         MemoryContext old;
>         RelationSyncEntry *relentry;
>         int                     i;
> @@ -777,6 +858,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>         Oid                *relids;
>         TransactionId xid = InvalidTransactionId;
>
> +       /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
> +       Assert(in_streaming || txndata);
> +
>

Fixed.

> (c) fix indent with spaces (for the one sentence of SyncRepEnabled)
>
> @@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
>  }
>
>  /*
> + * Check if Synchronous Replication is enabled
> + */
> +bool
> +SyncRepEnabled(void)
> +{
> +    return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
> +}
> +
> +/*
>
> This can be detected by git am.
>

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
Hi,


Thank you for updating the patch.

I'll quote one of the past discussions
in order to move this thread forward and make it more active.
On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:
> On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > On Fri, Jul 23, 2021 at 3:39 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > >
> >
> > Let's first split the patch for prepared and non-prepared cases as
> > that will help to focus on each of them separately. BTW, why haven't
> > you considered implementing point 1b as explained by Andres in his
> > email [1]? I think we can send a keepalive message in case of
> > synchronous replication when we skip an empty transaction, otherwise,
> > it might delay in responding to transactions synchronous_commit mode.
> > I think in the tests done in the thread, it might not have been shown
> > because we are already sending keepalives too frequently. But what if
> > someone disables wal_sender_timeout or kept it to a very large value?
> > See WalSndKeepaliveIfNecessary. The other thing you might want to look
> > at is if the reason for frequent keepalives is the same as described
> > in the email [2].
> >
> 
> I have tried to address the comment here by modifying the
> ctx->update_progress callback function (WalSndUpdateProgress) provided
> for plugins. I have added an option
> by which the callback can specify if it wants to send keep_alives. And when
> the callback is called with that option set, walsender updates a flag
> force_keep_alive_syncrep.
> The Walsender in the WalSndWaitForWal for loop, checks this flag and if
> synchronous replication is enabled, then sends a keep alive.
> Currently this logic
> is added as an else to the current logic that is already there in
> WalSndWaitForWal, which is probably considered unnecessary and a source of
> the keep alive flood that you talked about. So, I can change that according to
> how that fix shapes up there. I have also added an extern function in syncrep.c
> that makes it possible for walsender to query if synchronous replication is
> turned on.
Moving the keepalive to the point where the commit is decoded
didn't look impossible to me, although my suggestion
may be ad hoc.

After sentPtr is initialized (from the confirmed_flush LSN),
it is updated from logical_decoding_ctx->reader->EndRecPtr in XLogSendLogical.
In XLogSendLogical, we update it after we execute LogicalDecodingProcessRecord.
Because of this order, the current implementation has to wait for the next
iteration to send a keepalive in WalSndWaitForWal.

But I felt we could use the end_lsn passed to ReorderBufferCommit to update
sentPtr. The end_lsn is the same LSN as ctx->reader->EndRecPtr,
so this means updating sentPtr earlier for the commit case.
Then, if the transaction is empty in synchronous mode, we could
send the keepalive in WalSndUpdateProgress directly,
instead of setting the force_keepalive_syncrep flag to true.
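
As a rough sketch of what "send the keepalive directly" could look like, here is a tiny standalone model. It is an assumption-laden illustration, not walsender code: WalSenderSim and update_progress are hypothetical names, sync_rep_enabled stands in for SyncRepEnabled(), and sending a keepalive is modelled as incrementing a counter.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the walsender state relevant here. */
typedef struct WalSenderSim
{
    bool        sync_rep_enabled;   /* stand-in for SyncRepEnabled() */
    int         keepalives_sent;    /* stand-in for WalSndKeepalive() calls */
} WalSenderSim;

/*
 * Progress callback model: when an empty transaction was skipped and
 * synchronous replication is in use, send the keepalive right away
 * instead of setting a flag for a later loop iteration.
 */
static void
update_progress(WalSenderSim *ws, bool skipped_xact)
{
    /* ordinary progress tracking would happen here */

    if (skipped_xact && ws->sync_rep_enabled)
        ws->keepalives_sent++;
}
```

In this model no flag survives between calls, which is the point of the suggestion: the decision and the send happen in the same place.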


Best Regards,
    Takamichi Osumi


RE: logical replication empty transactions

From
"osumi.takamichi@fujitsu.com"
Date:
Hi


I'll quote one other remaining discussion from this thread again
to draw more attention from the community.
On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:
> On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> > Few other miscellaneous comments:
> > 1.
> > static void
> >  pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> > ReorderBufferTXN *txn,
> > - XLogRecPtr commit_lsn)
> > + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, TimestampTz
> > + prepare_time)
> >  {
> > + PGOutputTxnData    *txndata = (PGOutputTxnData *)
> txn->output_plugin_private;
> > +
> >   OutputPluginUpdateProgress(ctx);
> >
> > + /*
> > + * If the BEGIN PREPARE was not yet sent, then it means there were no
> > + * relevant changes encountered, so we can skip the COMMIT PREPARED
> > + * message too.
> > + */
> > + if (txndata)
> > + {
> > + bool skip = !txndata->sent_begin_txn; pfree(txndata);
> > + txn->output_plugin_private = NULL;
> >
> > How is this supposed to work after the restart when prepared is sent
> > before the restart and we are just sending commit_prepared after
> > restart? Won't this lead to sending commit_prepared even when the
> > corresponding prepare is not sent? Can we think of a better way to
> > deal with this?
> >
> 
> I have tried to resolve this by adding logic in worker.c to silently ignore spurious
> commit_prepareds. But this change required checking if the prepare exists on
> the subscriber before attempting the commit_prepared but the current API that
> checks this requires prepare time and transaction end_lsn. But for this I had to
> change the protocol of commit_prepared, and I understand that this would
> break backward compatibility between subscriber and publisher (you have
> raised this issue as well).
> I am not sure how else to handle this, let me know if you have any other ideas.
I feel that if we don't want to change the protocol of commit_prepared,
the publisher alone has to judge whether the prepare was empty or not
after the restart.
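
To make the restart problem concrete, here is a small standalone sketch. It is a simplified model, not the patch: TxnData is a hypothetical stand-in for PGOutputTxnData, and Action and commit_prepared_action are made-up names. A NULL pointer models the in-memory state being gone after a restart.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct TxnData
{
    bool        sent_begin_txn; /* was BEGIN PREPARE sent? */
} TxnData;

typedef enum Action
{
    ACTION_SKIP,
    ACTION_SEND
} Action;

/*
 * Decide what to do with COMMIT PREPARED using only publisher-side,
 * in-memory state. With no state (for example after a restart) the
 * publisher cannot tell whether the prepare was empty, so it has to send.
 */
static Action
commit_prepared_action(const TxnData *txndata)
{
    if (txndata == NULL)
        return ACTION_SEND;

    return txndata->sent_begin_txn ? ACTION_SEND : ACTION_SKIP;
}
```

The NULL case is the hazard: if the empty prepare was skipped before the restart, this model still sends COMMIT PREPARED for a transaction the subscriber never saw.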

One idea I had at the beginning was to reuse the existing mechanism
that spills a ReorderBufferSerializeTXN object to local disk:
postpone the cleanup of the prepared txn object, spill the transaction's data
when the walsender exits without having seen the commit prepared,
and then restore it after the restart in DecodePrepare.
However, this idea is fundamentally not crash-safe. That is,
if the publisher crashes before spilling the empty prepared transaction,
we fail to detect that the prepare was empty and end up sending the commit_prepared
even though the subscriber never received the prepare data.
So I concluded that the spill mechanism doesn't work for this purpose.

Another idea would be to create an empty file under pg_replslot/slotname
with a prefix different from "xid" in DecodePrepare before the shutdown
if the prepare was empty, bypass the cleanup of the serialized txns,
and check for the file's existence after the restart. But this is pretty ad hoc, and I wasn't sure
whether addressing the restart corner case is a strong enough justification
for creating this new file format.

Therefore, in my humble opinion, the idea of a protocol change slightly wins,
since its impact would not be big. We introduced
protocol version 3 in the devel version, so the number of users should be small.


Best Regards,
    Takamichi Osumi


Re: logical replication empty transactions

From
Amit Kapila
Date:
On Mon, Jan 31, 2022 at 6:18 PM Ajin Cherian <itsajin@gmail.com> wrote:
>

Few comments:
=============
1. Is there any particular reason why the patch is not skipping empty
xacts for streaming (in-progress) transactions, as noted in the commit
message as well?

2.
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(txndata);

I think here you can add an assert for sent_begin_txn to be always false?

3.
+/*
+ * Send BEGIN.
+ * This is where the BEGIN is actually sent. This is called
+ * while processing the first change of the transaction.
+ */

Have an empty line between the first two lines to ensure consistency
with nearby comments. Also, the formatting of these lines appears
awkward, either run pgindent or make sure lines are not too short.

4. Do we really need to make any changes in PREPARE
transaction-related functions if we can't skip in that case? I think
you can have a check so that, if the output plugin private variable is
not set, the special optimization for sending BEGIN is ignored.

-- 
With Regards,
Amit Kapila.



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Feb 16, 2022 at 8:45 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:

[ideas to skip empty prepare/commit_prepare ....]

>
> I feel if we don't want to change the protocol of commit_prepared,
> we need to make the publisher solely judge whether the prepare was empty or not,
> after the restart.
>
> One idea I thought at the beginning was to utilize and apply
> the existing mechanism to spill ReorderBufferSerializeTXN object to local disk,
> by postponing the prepare txn object cleanup and when the walsender exits
> and commit prepared didn't come, spilling the transaction's data,
> then restoring it after the restart in the DecodePrepare.
> However, this idea wasn't crash-safe fundamentally. It means,
> if the publisher crashes before spilling the empty prepare transaction,
> we fail to detect the prepare was empty and come down to send the commit_prepared
> in the situation where the subscriber didn't get the prepare data again.
> So, I thought to utilize the spill mechanism didn't work for this purpose.
>
> Another idea would be, to create an empty file under the pg_replslot/slotname
> with a prefix different from "xid"  in the DecodePrepare before the shutdown
> if the prepare was empty, and bypass the cleanup of the serialized txns
> and check the existence after the restart. But, this is pretty ad-hoc and I wasn't sure
> if to address the corner case of the restart has the strong enough justification
> to create this new file format.
>

I think for this idea to work you need to create such an empty file
each time we skip empty prepare as the system might crash after
prepare and we won't get time to create such a file. I don't think it
is advisable to do I/O to save the network message.

> Therefore, in my humble opinion, the idea of protocol change slightly wins,
> since the impact of the protocol change would not be big. We introduced
> the protocol version 3 in the devel version and the number of users should be little.
>

There is also the cost of the additional check (whether prepared xact
exists) at the time of processing each commit prepared message. I
think if we want to go in this direction then it is better to do it
via a subscription parameter (say skip_empty_prepare_xact or something
like that) so that we can pay the additional cost of such a check
conditionally when such a parameter is set by the user. I feel for now
we can document in comments why we can't skip empty prepared
transactions, and maybe mention these ideas as worth exploring to
implement the same. OTOH, if multiple people agree on such a solution,
we can even try to implement it and see if that works.

-- 
With Regards,
Amit Kapila.



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Wed, Feb 16, 2022 at 2:15 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
> Another idea would be, to create an empty file under the pg_replslot/slotname
> with a prefix different from "xid"  in the DecodePrepare before the shutdown
> if the prepare was empty, and bypass the cleanup of the serialized txns
> and check the existence after the restart. But, this is pretty ad-hoc and I wasn't sure
> if to address the corner case of the restart has the strong enough justification
> to create this new file format.
>

Yes, this doesn't look very efficient.

> Therefore, in my humble opinion, the idea of protocol change slightly wins,
> since the impact of the protocol change would not be big. We introduced
> the protocol version 3 in the devel version and the number of users should be little.

Yes, but we don't want to break backward compatibility for this small
added optimization.

Amit,

I will work on your comments.

regards,
Ajin Cherian
Fujitsu Australia



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Thu, Feb 17, 2022 at 4:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Jan 31, 2022 at 6:18 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
>
> Few comments:
> =============
>

One more comment:
@@ -1546,10 +1557,11 @@ WalSndWaitForWal(XLogRecPtr loc)
  * otherwise idle, this keepalive will trigger a reply. Processing the
  * reply will update these MyWalSnd locations.
  */
- if (MyWalSnd->flush < sentPtr &&
+ if (force_keepalive_syncrep ||
+ (MyWalSnd->flush < sentPtr &&
  MyWalSnd->write < sentPtr &&
- !waiting_for_ping_response)
- WalSndKeepalive(false);
+ !waiting_for_ping_response))
+ WalSndKeepalive(false);

Will this allow syncrep to proceed in case we are skipping the
transaction? Won't we need to send a feedback message with
'requestReply' true in this case, as we release syncrep waiters while
processing the standby message (see
ProcessStandbyReplyMessage->SyncRepReleaseWaiters)? Without
'requestReply', the subscriber might not send any message and the
syncrep won't proceed. Why did you decide to delay sending this message
till WalSndWaitForWal()? It may not be called for each transaction.
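
For reference, the patched condition from the hunk above can be written as a pure function, which makes its behaviour easy to check in isolation. This is only a sketch: Lsn is a stand-in for XLogRecPtr, and should_send_keepalive and its parameter names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/*
 * Mirrors the patched "if" in WalSndWaitForWal: the force flag bypasses
 * both the LSN comparison and the waiting_for_ping_response check.
 */
static bool
should_send_keepalive(bool force_keepalive_syncrep,
                      Lsn flush, Lsn write_lsn, Lsn sent_ptr,
                      bool waiting_for_ping_response)
{
    return force_keepalive_syncrep ||
        (flush < sent_ptr &&
         write_lsn < sent_ptr &&
         !waiting_for_ping_response);
}
```

Note that the predicate only decides whether a keepalive goes out; whether that keepalive asks for a reply is a separate argument to WalSndKeepalive, which is the part the question above is about.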

I feel we should try to devise a test case to test this sync
replication mechanism such that without this particular change the
sync rep transaction waits momentarily but with this change it doesn't
wait. I am not entirely sure whether we can devise an automated test,
as this is a timing-related issue, but I guess we can at least manually
try to produce a case.

-- 
With Regards,
Amit Kapila.



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Tue, Feb 8, 2022 at 5:27 AM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
>
> On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
> > wrote:
> Changing the timing to send the keepalive to the decoding commit
> timing didn't look impossible to me, although my suggestion
> can be ad-hoc.
>
> After the initialization of sentPtr(by confirmed_flush lsn),
> sentPtr is updated from logical_decoding_ctx->reader->EndRecPtr in XLogSendLogical.
> In the XLogSendLogical, we update it after we execute LogicalDecodingProcessRecord.
> This order leads to the current implementation to wait the next iteration
> to send a keepalive in WalSndWaitForWal.
>
> But, I felt we can utilize end_lsn passed to ReorderBufferCommit for updating
> sentPtr. The end_lsn is the lsn same as the ctx->reader->EndRecPtr,
> which means advancing the timing to update the sentPtr for the commit case.
> Then if the transaction is empty in synchronous mode,
> send the keepalive in WalSndUpdateProgress directly,
> instead of having the force_keepalive_syncrep flag and having it true.
>

You have a point in that we don't need to delay sending this message
till next WalSndWaitForWal() but I don't see why we need to change
anything about update of sentPtr.


-- 
With Regards,
Amit Kapila.



RE: logical replication empty transactions

От
"osumi.takamichi@fujitsu.com"
Дата:
On Friday, February 18, 2022 6:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Tue, Feb 8, 2022 at 5:27 AM osumi.takamichi@fujitsu.com
> <osumi.takamichi@fujitsu.com> wrote:
> >
> > On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > > On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
> > > wrote:
> > Changing the timing to send the keepalive to the decoding commit
> > timing didn't look impossible to me, although my suggestion can be
> > ad-hoc.
> >
> > After the initialization of sentPtr(by confirmed_flush lsn), sentPtr
> > is updated from logical_decoding_ctx->reader->EndRecPtr in
> > XLogSendLogical.
> > In the XLogSendLogical, we update it after we execute
> > LogicalDecodingProcessRecord.
> > This order leads to the current implementation to wait the next
> > iteration to send a keepalive in WalSndWaitForWal.
> >
> > But, I felt we can utilize end_lsn passed to ReorderBufferCommit for
> > updating sentPtr. The end_lsn is the lsn same as the
> > ctx->reader->EndRecPtr, which means advancing the timing to update the
> > sentPtr for the commit case.
> > Then if the transaction is empty in synchronous mode, send the
> > keepalive in WalSndUpdateProgress directly, instead of having the
> > force_keepalive_syncrep flag and having it true.
> >
> 
> You have a point in that we don't need to delay sending this message till next
> WalSndWaitForWal() but I don't see why we need to change anything about
> update of sentPtr.
Yeah, you're right.
Now I think we don't need the update of sentPtr to send a keepalive.

I thought we can send a keepalive message
after its update in XLogSendLogical or any appropriate place for it after the existing update.


Best Regards,
    Takamichi Osumi


Re: logical replication empty transactions

От
Amit Kapila
Дата:
On Fri, Feb 18, 2022 at 3:06 PM osumi.takamichi@fujitsu.com
<osumi.takamichi@fujitsu.com> wrote:
>
> On Friday, February 18, 2022 6:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> > On Tue, Feb 8, 2022 at 5:27 AM osumi.takamichi@fujitsu.com
> > <osumi.takamichi@fujitsu.com> wrote:
> > >
> > > On Friday, August 13, 2021 8:01 PM Ajin Cherian <itsajin@gmail.com> wrote:
> > > > On Mon, Aug 2, 2021 at 7:20 PM Amit Kapila <amit.kapila16@gmail.com>
> > > > wrote:
> > > Changing the timing to send the keepalive to the decoding commit
> > > timing didn't look impossible to me, although my suggestion can be
> > > ad-hoc.
> > >
> > > After the initialization of sentPtr(by confirmed_flush lsn), sentPtr
> > > is updated from logical_decoding_ctx->reader->EndRecPtr in
> > > XLogSendLogical.
> > > In the XLogSendLogical, we update it after we execute
> > > LogicalDecodingProcessRecord.
> > > This order leads to the current implementation to wait the next
> > > iteration to send a keepalive in WalSndWaitForWal.
> > >
> > > But, I felt we can utilize end_lsn passed to ReorderBufferCommit for
> > > updating sentPtr. The end_lsn is the lsn same as the
> > > ctx->reader->EndRecPtr, which means advancing the timing to update the
> > > sentPtr for the commit case.
> > > Then if the transaction is empty in synchronous mode, send the
> > > keepalive in WalSndUpdateProgress directly, instead of having the
> > > force_keepalive_syncrep flag and having it true.
> > >
> >
> > You have a point in that we don't need to delay sending this message till next
> > WalSndWaitForWal() but I don't see why we need to change anything about
> > update of sentPtr.
> Yeah, you're right.
> Now I think we don't need the update of sentPtr to send a keepalive.
>
> I thought we can send a keepalive message
> after its update in XLogSendLogical or any appropriate place for it after the existing update.
>

Yeah, I think there could be multiple ways: (a) We can send such a
keepalive in WalSndUpdateProgress() itself by using ctx->write_location.
For this, we need to modify WalSndKeepalive() to take sentPtr as
input. (b) Set some flag in WalSndUpdateProgress() and then do it
somewhere in WalSndLoop, probably in WalSndKeepaliveIfNecessary. Or
maybe there is another, better way.

-- 
With Regards,
Amit Kapila.



Re: logical replication empty transactions

От
Peter Smith
Дата:
FYI - the latest v18 patch no longer applies due to a recent push [1].

------
[1] https://github.com/postgres/postgres/commit/52e4f0cd472d39d07732b99559989ea3b615be78

Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

От
Ajin Cherian
Дата:
On Thu, Feb 17, 2022 at 9:42 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Mon, Jan 31, 2022 at 6:18 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
>
> Few comments:
> =============
> 1. Is there any particular reason why the patch is not skipping empty xacts
> for streaming (in-progress) transactions as noted in the commit
> message as well?
>

I have added support for skipping streaming transactions.

> 2.
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> +{
>   bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + Assert(txndata);
>
> I think here you can add an assert for sent_begin_txn to be always false?
>

Added.

> 3.
> +/*
> + * Send BEGIN.
> + * This is where the BEGIN is actually sent. This is called
> + * while processing the first change of the transaction.
> + */
>
> Have an empty line between the first two lines to ensure consistency
> with nearby comments. Also, the formatting of these lines appears
> awkward, either run pgindent or make sure lines are not too short.
>

Changed.

> 4. Do we really need to make any changes in PREPARE
> transaction-related functions if we can't skip in that case? I think you
> can have a check if the output plugin private variable is not set then
> ignore special optimization for sending begin.
>

I have modified this as well.

I have also rebased the patch after it did not apply due to a new commit.

I will next work on testing and improving the keepalive logic while
skipping transactions.

regards,
Ajin Cherian
Fujitsu Australia

Вложения

RE: logical replication empty transactions

От
"wangw.fnst@fujitsu.com"
Дата:
On Wed, Feb 23, 2022 at 10:58 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
Few comments to V19-0001:

1. I think we should adjust the alignment format.
git am ../v19-0001-Skip-empty-transactions-for-logical-replication.patch
.git/rebase-apply/patch:197: indent with spaces.
    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
.git/rebase-apply/patch:198: indent with spaces.
    * is sent. If not, send now.
.git/rebase-apply/patch:199: indent with spaces.
    */
.git/rebase-apply/patch:201: indent with spaces.
       pgoutput_send_stream_start(ctx, toptxn);
.git/rebase-apply/patch:204: indent with spaces.
       pgoutput_begin(ctx, toptxn);
warning: 5 lines add whitespace errors.

2. Structure member initialization.
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+    PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+                                                         sizeof(PGOutputTxnData));
+
+    txndata->sent_begin_txn = false;
+    txn->output_plugin_private = txndata;
+}
Do we need to set sent_stream_start and sent_any_stream to false here?

3. Maybe we should add Assert(txndata) in the other functions as well, as
is already done in pgoutput_commit_txn.

4. In addition, I think we should keep a unified style.
a). log style (maybe first one is better.)
First style  : "Skipping replication of an empty transaction in XXX"
Second style : "skipping replication of an empty transaction"
b) flag name (maybe second one is better.)
First style  : variable "sent_begin_txn" in function pgoutput_stream_*.
Second style : variable "skip" in function pgoutput_commit_txn.


Regards,
Wang wei

Re: logical replication empty transactions

От
Peter Smith
Дата:
Hi. Here are my review comments for the v19 patch.

======

1. Commit message

The current logical replication behavior is to send every transaction to
subscriber even though the transaction is empty (because it does not
contain changes from the selected publications).

SUGGESTION
"to subscriber even though" --> "to the subscriber even if"

~~~

2. Commit message

This patch addresses the above problem by postponing the BEGIN message
until the first change. While processing a COMMIT message,
if there is no other change for that transaction,
do not send COMMIT message. It means that pgoutput will
skip BEGIN/COMMIT messages for transactions that are empty.

SUGGESTION
"if there is" --> "if there was"
"do not send COMMIT message" --> "do not send the COMMIT message"
"It means that pgoutput" --> "This means that pgoutput"

~~~

3. Commit message

Shouldn't there be some similar description about using a lazy send
mechanism for STREAM START?
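
For reference, the lazy-send idea described in the commit message can be
sketched as a tiny standalone state machine. This is only an
illustration under my own assumptions: TxnState and the txn_* functions
are hypothetical names, and "sending" is modeled as incrementing a
counter rather than writing protocol messages.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical per-transaction state; messages_sent counts protocol messages. */
typedef struct TxnState
{
	bool		sent_begin;		/* has BEGIN actually gone out? */
	int			messages_sent;
} TxnState;

/* BEGIN callback: remember that nothing has been sent, but send nothing. */
static void
txn_begin(TxnState *t)
{
	t->sent_begin = false;
}

/* First change sends the deferred BEGIN, then the change itself. */
static void
txn_change(TxnState *t)
{
	if (!t->sent_begin)
	{
		t->messages_sent++;		/* deferred BEGIN */
		t->sent_begin = true;
	}
	t->messages_sent++;			/* the change */
}

/* COMMIT is dropped when BEGIN was never sent; returns whether it was sent. */
static bool
txn_commit(TxnState *t)
{
	if (!t->sent_begin)
		return false;			/* empty transaction: skip BEGIN/COMMIT */
	t->messages_sent++;			/* COMMIT */
	return true;
}
```

An empty transaction produces zero messages here, while a transaction
with two changes produces four (BEGIN, two changes, COMMIT). The same
shape would apply to STREAM START if it is deferred in the same way.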

~~~

4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData
+{
+   bool sent_begin_txn;    /* flag indicating whether BEGIN has been sent */
+   bool sent_stream_start; /* flag indicating if stream start has been sent */
+   bool sent_any_stream;   /* flag indicating if any stream has been sent */
+} PGOutputTxnData;
+

The struct comment looks stale because it doesn't mention anything
about the similar lazy send mechanism for STREAM_START.

~~~

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+ PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txndata->sent_begin_txn = false;
+ txn->output_plugin_private = txndata;
+}

You don’t need to assign the other members 'sent_stream_start',
'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
but for the same reason you did not really need to assign the
'sent_begin_txn' flag either.

I guess for consistency maybe it is better to (a)  set all of them or
(b) set none of them. I prefer (b).
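
A small standalone sketch of why option (b) is safe, using calloc() as a
stand-in for MemoryContextAllocZero() (both return zero-filled memory).
The alloc_txndata() helper is hypothetical; the struct is copied from
the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct PGOutputTxnData
{
	bool		sent_begin_txn;
	bool		sent_stream_start;
	bool		sent_any_stream;
} PGOutputTxnData;

/*
 * calloc() stands in for MemoryContextAllocZero(): the returned memory is
 * all zero bits, so every bool member already reads as false.
 */
static PGOutputTxnData *
alloc_txndata(void)
{
	return calloc(1, sizeof(PGOutputTxnData));
}
```

No member needs an explicit "= false" after the allocation, and a
member added later is covered automatically.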

~~~

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

I feel the 'pgoutput_begin' function is not well named. It makes some
of the code where they are called look quite confusing.

For streaming there is:
1. pgoutput_stream_start (does not send)
2. pgoutput_send_stream_start (does send)
so it is very clear.

OTOH there are
3. pgoutput_begin_txn (does not send)
4. pgoutput_begin (does send)

For consistency I think the 'pgoutput_begin' name should be changed to
include "send" verb
1. pgoutput_begin_txn (does not send)
2. pgoutput_send_begin_txn (does send)

~~~

7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  if (schema_sent)
  return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;
+
+   /*
+    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+    * is sent. If not, send now.
+    */
+   if (in_streaming && !txndata->sent_stream_start)
+       pgoutput_send_stream_start(ctx, toptxn);
+   else if (txndata && !txndata->sent_begin_txn)
+   {
+       pgoutput_begin(ctx, toptxn);
+   }
+

How come the in_streaming case is not checking for a NULL txndata
before referencing it? Even if it is OK to do that, some more comments
or assertions might help for this piece of code.
(Stop-Press: see later comments #9, #10)

~~~

8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  if (schema_sent)
  return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;
+
+   /*
+    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+    * is sent. If not, send now.
+    */

What part of this code is doing anything about "BEGIN PREPARE" ?

~~~

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Assert(false);
  }

+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * Output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);
+

The above code fragment looks more like what I was expecting should
be in 'maybe_send_schema'.

If you expand it out (and tweak the comments) it can become much less
complex looking IMO

e.g.

if (in_streaming)
{
/* If streaming, send STREAM START if we haven't yet */
if (txndata && !txndata->sent_stream_start)
pgoutput_send_stream_start(ctx, txn);
}
else
{
/* If not streaming, send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_begin(ctx, txn);
}

Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
then the code can be made even simpler.

~~~

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

@@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,

  if (nrelids > 0)
  {
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);

So now I have seen almost identical code repeated in 3 places so I am
beginning to think these should just be encapsulated in some common
function to call to do the deferred "send". Thoughts?
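
Something along these lines is what I have in mind. This is a
standalone sketch, not a patch: send_deferred_start() is a hypothetical
name, the two send_* stubs only flip the flag and bump a counter, and
the sketch does not model pgoutput_send_stream_start also setting
sent_begin_txn.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct PGOutputTxnData
{
	bool		sent_begin_txn;
	bool		sent_stream_start;
} PGOutputTxnData;

/* Counters standing in for messages actually written to the stream. */
static int	begin_sent_count = 0;
static int	stream_start_sent_count = 0;

/* Stub for the function that really sends BEGIN. */
static void
send_begin(PGOutputTxnData *txndata)
{
	txndata->sent_begin_txn = true;
	begin_sent_count++;
}

/* Stub for the function that really sends STREAM START. */
static void
send_stream_start(PGOutputTxnData *txndata)
{
	txndata->sent_stream_start = true;
	stream_start_sent_count++;
}

/*
 * Common helper for the change, truncate and message callbacks: send the
 * deferred STREAM START or BEGIN exactly once, depending on the mode.
 */
static void
send_deferred_start(PGOutputTxnData *txndata, bool in_streaming)
{
	if (txndata == NULL)
		return;					/* nothing was deferred for this txn */

	if (in_streaming)
	{
		if (!txndata->sent_stream_start)
			send_stream_start(txndata);
	}
	else
	{
		if (!txndata->sent_begin_txn)
			send_begin(txndata);
	}
}
```

Each of the three call sites would then shrink to a single call, and
the NULL handling lives in one place.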

~~~

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  if (in_streaming)
  xid = txn->xid;

+ /*
+ * Output BEGIN if we haven't yet.
+ * Avoid for streaming and non-transactional messages.
+ */
+ if (in_streaming || transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ else if (transactional)
+ {
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_begin(ctx, txn);
+ }
+ }

Does that comment at the top of that code fragment accurately match
this code? It seemed a bit muddled/stale to me.

~~~

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start

  /*
+ * Don't actually send stream start here, instead set a flag that indicates
+ * that stream start hasn't been sent and wait for the first actual change
+ * for this stream to be sent and then send stream start. This is done
+ * to avoid sending empty streams without any changes.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+ txndata->sent_begin_txn = false;
+ txndata->sent_any_stream = false;
+ txn->output_plugin_private = txndata;
+ }

IMO there is no need to set the members – just let the
MemoryContextAllocZero take care of all that. Then the code is simpler
and it also saves wondering if anything was accidentally missed.

~~~

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

+pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+   ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+
+ /*
  * If we already sent the first stream for this transaction then don't
  * send the origin id in the subsequent streams.
  */
- if (rbtxn_is_streamed(txn))
+ if (txndata->sent_any_stream)
  send_replication_origin = false;

Given this usage, I wonder if there is a better name for the txndata
member - e.g. 'sent_first_stream' ?

~~~

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

- /* we're streaming a chunk of transaction now */
- in_streaming = true;
+ /*
+ * Set the flags that indicate that changes were sent as part of
+ * the transaction and the stream.
+ */
+ txndata->sent_begin_txn = txndata->sent_stream_start = true;
+ txndata->sent_any_stream = true;

Why is this setting member 'sent_begin_txn' true also? It seems odd to
say so because the BEGIN was not actually sent at all, right?

~~~

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort

@@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,

  /* determine the toplevel transaction */
  toptxn = (txn->toptxn) ? txn->toptxn : txn;
+ txndata = toptxn->output_plugin_private;
+ sent_begin_txn = txndata->sent_begin_txn;
+
+ if (txn->toptxn == NULL)
+ {
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
+ return;
+ }

I didn't really understand why this code is checking the
'sent_begin_txn' member instead of the 'sent_stream_start' member?

~~~

16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit

@@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct
LogicalDecodingContext *ctx,
  Assert(!in_streaming);
  Assert(rbtxn_is_streamed(txn));

- OutputPluginUpdateProgress(ctx);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ /* If no changes were part of this transaction then drop the commit */
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
+ return;
+ }

(Same as previous comment #15). I didn't really understand why this
code is checking the 'sent_begin_txn' member instead of the
'sent_stream_start' member?

~~~

17. src/backend/replication/syncrep.c - SyncRepEnabled

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
 }

 /*
+ * Check if synchronous replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ return SyncRepRequested() && ((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined;
+}

That code was once inline in 'SyncRepWaitForLSN' before it was turned
into a function, and there is a long comment in SyncRepWaitForLSN
describing the risks of this logic. e.g.

<quote>
... If it's true, we need to check it again
* later while holding the lock, to check the flag and operate the sync
* rep queue atomically. This is necessary to avoid the race condition
* described in SyncRepUpdateSyncStandbysDefined().
</quote>

This same function is now called from walsender.c. I think maybe it is
OK but please confirm it.

Anyway, the point is maybe this SyncRepEnabled function should be
better commented to make some reference about the race concerns of the
original comment. Otherwise some future caller of this function may be
unaware of it and come to grief.

-------
Kind Regards,
Peter Smith.
Fujitsu Australia



Re: logical replication empty transactions

От
Ajin Cherian
Дата:
On Fri, Feb 18, 2022 at 9:27 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
>
> Yeah, I think there could be multiple ways (a) We can send such a keep
> alive in WalSndUpdateProgress() itself by using ctx->write_location.
> For this, we need to modify WalSndKeepalive() to take sentPtr as
> input. (b) set some flag in WalSndUpdateProgress() and then do it
> somewhere in WalSndLoop probably in WalSndKeepaliveIfNecessary, or
> maybe there is another better way.
>

Thanks for the suggestions, Amit and Osumi-san. I experimented with both
suggestions but finally decided to use (a): modifying WalSndKeepalive()
to optionally take an LSN as input, and passing in
ctx->write_location.

I also verified that if I block the WalSndKeepalive() in
WalSndWaitForWal, then my new code sends the keepalive
when skipping transactions and the syncrep gets back feedback.
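
Roughly, the idea behind (a) looks like the standalone sketch below.
It is not the patch itself: KeepaliveMsg and build_keepalive() are
hypothetical names used for illustration, and the only behavior shown is
the choice of which LSN goes into the keepalive.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* stand-in for the PostgreSQL type */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical in-memory form of a keepalive message. */
typedef struct KeepaliveMsg
{
	XLogRecPtr	wal_end;		/* LSN reported to the subscriber */
	bool		request_reply;
} KeepaliveMsg;

/*
 * If the caller passes a valid LSN (e.g. the write location of a skipped
 * transaction), report that; otherwise fall back to sentPtr as before.
 */
static KeepaliveMsg
build_keepalive(bool request_reply, XLogRecPtr write_ptr, XLogRecPtr sent_ptr)
{
	KeepaliveMsg msg;

	msg.wal_end = (write_ptr == InvalidXLogRecPtr) ? sent_ptr : write_ptr;
	msg.request_reply = request_reply;
	return msg;
}
```

Existing callers would pass InvalidXLogRecPtr and keep their current
behavior; only the skip path passes an explicit LSN.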

I will address comments from Peter and Wang in my next patch update.

regards,
Ajin Cherian
Fujitsu Australia

Вложения

Re: logical replication empty transactions

От
Ajin Cherian
Дата:
On Fri, Feb 25, 2022 at 9:17 PM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Hi. Here are my review comments for the v19 patch.
>
> ======
>
> 1. Commit message
>
> The current logical replication behavior is to send every transaction to
> subscriber even though the transaction is empty (because it does not
> contain changes from the selected publications).
>
> SUGGESTION
> "to subscriber even though" --> "to the subscriber even if"

Fixed.

>
> ~~~
>
> 2. Commit message
>
> This patch addresses the above problem by postponing the BEGIN message
> until the first change. While processing a COMMIT message,
> if there is no other change for that transaction,
> do not send COMMIT message. It means that pgoutput will
> skip BEGIN/COMMIT messages for transactions that are empty.
>
> SUGGESTION
> "if there is" --> "if there was"
> "do not send COMMIT message" --> "do not send the COMMIT message"
> "It means that pgoutput" --> "This means that pgoutput"
>
> ~~~

Fixed.

>
> 3. Commit message
>
> Shouldn't there be some similar description about using a lazy send
> mechanism for STREAM START?
>
> ~~~

Added.

>
> 4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData
>
> +/*
> + * Maintain a per-transaction level variable to track whether the
> + * transaction has sent BEGIN. BEGIN is only sent when the first
> + * change in a transaction is processed. This makes it possible
> + * to skip transactions that are empty.
> + */
> +typedef struct PGOutputTxnData
> +{
> +   bool sent_begin_txn;    /* flag indicating whether BEGIN has been sent */
> +   bool sent_stream_start; /* flag indicating if stream start has been sent */
> +   bool sent_any_stream;   /* flag indicating if any stream has been sent */
> +} PGOutputTxnData;
> +
>
> The struct comment looks stale because it doesn't mention anything
> about the similar lazy send mechanism for STREAM_START.
>
> ~~~

Added.

>
> 5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
>
>  static void
>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> + PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
> + sizeof(PGOutputTxnData));
> +
> + txndata->sent_begin_txn = false;
> + txn->output_plugin_private = txndata;
> +}
>
> You don’t need to assign the other members 'sent_stream_start',
> 'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
> but for the same reason you did not really need to assign the
> 'sent_begin_txn' flag either.
>
> I guess for consistency maybe it is better to (a)  set all of them or
> (b) set none of them. I prefer (b).
>
> ~~~

Did (b)


>
> 6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin
>
> I feel the 'pgoutput_begin' function is not well named. It makes some
> of the code where they are called look quite confusing.
>
> For streaming there is:
> 1. pgoutput_stream_start (does not send)
> 2. pgoutput_send_stream_start (does send)
> so it is very clear.
>
> OTOH there are
> 3. pgoutput_begin_txn (does not send)
> 4. pgoutput_begin (does send)
>
> For consistency I think the 'pgoutput_begin' name should be changed to
> include "send" verb
> 1. pgoutput_begin_txn (does not send)
> 2. pgoutput_send_begin_txn (does send)
>
> ~~~

Changed as mentioned.

>
> 7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema
>
> @@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
>   if (schema_sent)
>   return;
>
> +   /* set up txndata */
> +   txndata = toptxn->output_plugin_private;
> +
> +   /*
> +    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> +    * is sent. If not, send now.
> +    */
> +   if (in_streaming && !txndata->sent_stream_start)
> +       pgoutput_send_stream_start(ctx, toptxn);
> +   else if (txndata && !txndata->sent_begin_txn)
> +   {
> +       pgoutput_begin(ctx, toptxn);
> +   }
> +
>
> How come the in_streaming case is not checking for a NULL txndata
> before referencing it? Even if it is OK to do that, some more comments
> or assertions might help for this piece of code.
> (Stop-Press: see later comments #9, #10)
>
> ~~~

Updated.

>
> 8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema
>
> @@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
>   if (schema_sent)
>   return;
>
> +   /* set up txndata */
> +   txndata = toptxn->output_plugin_private;
> +
> +   /*
> +    * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> +    * is sent. If not, send now.
> +    */
>
> What part of this code is doing anything about "BEGIN PREPARE" ?
>
> ~~~

Removed that reference.

>
> 9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change
>
> @@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   Assert(false);
>   }
>
> + /* If streaming, send STREAM START if we haven't yet */
> + if (in_streaming && (txndata && !txndata->sent_stream_start))
> + pgoutput_send_stream_start(ctx, txn);
> + /*
> + * Output BEGIN if we haven't yet, unless streaming.
> + */
> + else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
> + pgoutput_begin(ctx, txn);
> +
>
> The above code fragment looks more like what I was expecting should
> be in 'maybe_send_schema'.
>
> If you expand it out (and tweak the comments) it can become much less
> complex looking IMO
>
> e.g.
>
> if (in_streaming)
> {
> /* If streaming, send STREAM START if we haven't yet */
> if (txndata && !txndata->sent_stream_start)
> pgoutput_send_stream_start(ctx, txn);
> }
> else
> {
> /* If not streaming, send BEGIN if we haven't yet */
> if (txndata && !txndata->sent_begin_txn)
> pgoutput_begin(ctx, txn);
> }
>
> Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
> then the code can be made even simpler.
>

Chose your example.

> ~~~
>
> 10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
>
> @@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>
>   if (nrelids > 0)
>   {
> + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + /* If streaming, send STREAM START if we haven't yet */
> + if (in_streaming && (txndata && !txndata->sent_stream_start))
> + pgoutput_send_stream_start(ctx, txn);
> + /*
> + * output BEGIN if we haven't yet, unless streaming.
> + */
> + else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
> + pgoutput_begin(ctx, txn);
>
> So now I have seen almost identical code repeated in 3 places so I am
> beginning to think these should just be encapsulated in some common
> function to call to do the deferred "send". Thoughts?
>
> ~~~

Not sure if we want to add a function call overhead.

>
> 11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message
>
> @@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   if (in_streaming)
>   xid = txn->xid;
>
> + /*
> + * Output BEGIN if we haven't yet.
> + * Avoid for streaming and non-transactional messages.
> + */
> + if (in_streaming || transactional)
> + {
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + /* If streaming, send STREAM START if we haven't yet */
> + if (in_streaming && (txndata && !txndata->sent_stream_start))
> + pgoutput_send_stream_start(ctx, txn);
> + else if (transactional)
> + {
> + if (txndata && !txndata->sent_begin_txn)
> + pgoutput_begin(ctx, txn);
> + }
> + }
>
> Does that comment at the top of that code fragment accurately match
> this code? It seemed a bit muddled/stale to me.
>
> ~~~

Fixed.

>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start
>
>   /*
> + * Don't actually send stream start here, instead set a flag that indicates
> + * that stream start hasn't been sent and wait for the first actual change
> + * for this stream to be sent and then send stream start. This is done
> + * to avoid sending empty streams without any changes.
> + */
> + if (txndata == NULL)
> + {
> + txndata =
> + MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
> + txndata->sent_begin_txn = false;
> + txndata->sent_any_stream = false;
> + txn->output_plugin_private = txndata;
> + }
>
> IMO there is no need to set the members – just let the
> MemoryContextAllocZero take care of all that. Then the code is simpler
> and it also saves wondering if anything was accidentally missed.
>

Fixed.

> ~~~
>
> 13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start
>
> +pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
> +   ReorderBufferTXN *txn)
> +{
> + bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> +
> + /*
>   * If we already sent the first stream for this transaction then don't
>   * send the origin id in the subsequent streams.
>   */
> - if (rbtxn_is_streamed(txn))
> + if (txndata->sent_any_stream)
>   send_replication_origin = false;
>
> Given this usage, I wonder if there is a better name for the txndata
> member - e.g. 'sent_first_stream' ?
>
> ~~~

Changed.

>
> 14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start
>
> - /* we're streaming a chunk of transaction now */
> - in_streaming = true;
> + /*
> + * Set the flags that indicate that changes were sent as part of
> + * the transaction and the stream.
> + */
> + txndata->sent_begin_txn = txndata->sent_stream_start = true;
> + txndata->sent_any_stream = true;
>
> Why is this setting member 'sent_begin_txn' true also? It seems odd to
> say so because the BEGIN was not actually sent at all, right?
>
> ~~~

You can have transactions that are partially streamed and partially
not. So if a transaction started out as streaming but is then
replicated as part of the commit, we shouldn't send a "begin" again
when the remaining changes are decoded.

>
> 15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort
>
> @@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
>
>   /* determine the toplevel transaction */
>   toptxn = (txn->toptxn) ? txn->toptxn : txn;
> + txndata = toptxn->output_plugin_private;
> + sent_begin_txn = txndata->sent_begin_txn;
> +
> + if (txn->toptxn == NULL)
> + {
> + pfree(txndata);
> + txn->output_plugin_private = NULL;
> + }
> +
> + if (!sent_begin_txn)
> + {
> + elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
> + return;
> + }
>
> I didn't really understand why this code is checking the
> 'sent_begin_txn' member instead of the 'sent_stream_start' member?
>

Yes, changed this to check "sent_first_stream".

> ~~~
>
> 16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit
>
> @@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct
> LogicalDecodingContext *ctx,
>   Assert(!in_streaming);
>   Assert(rbtxn_is_streamed(txn));
>
> - OutputPluginUpdateProgress(ctx);
> + pfree(txndata);
> + txn->output_plugin_private = NULL;
> +
> + /* If no changes were part of this transaction then drop the commit */
> + if (!sent_begin_txn)
> + {
> + elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
> + return;
> + }
>
> (Same as previous comment #15). I didn't really understand why this
> code is checking the 'sent_begin_txn' member instead of the
> 'sent_stream_start' member?
>
> ~~~

Changed.

>
> 17. src/backend/replication/syncrep.c - SyncRepEnabled
>
> @@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
>  }
>
>  /*
> + * Check if synchronous replication is enabled
> + */
> +bool
> +SyncRepEnabled(void)
> +{
> + return SyncRepRequested() && ((volatile WalSndCtlData *)
> WalSndCtl)->sync_standbys_defined;
> +}
>
> That code was once inline in 'SyncRepWaitForLSN' before it was turned
> into a function, and there is a long comment in SyncRepWaitForLSN
> describing the risks of this logic. e.g.
>
> <quote>
> ... If it's true, we need to check it again
> * later while holding the lock, to check the flag and operate the sync
> * rep queue atomically. This is necessary to avoid the race condition
> * described in SyncRepUpdateSyncStandbysDefined().
> </quote>
>
> This same function is now called from walsender.c. I think maybe it is
> OK but please confirm it.
>
> Anyway, the point is maybe this SyncRepEnabled function should be
> better commented to make some reference about the race concerns of the
> original comment. Otherwise some future caller of this function may be
> unaware of it and come to grief.
>

Leaving this for now, not sure what wording is appropriate to use here.
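
For reference, the pattern that the quoted SyncRepWaitForLSN comment describes (an unlocked read of the flag, then a re-check while holding the lock) can be sketched roughly as below. This is a stand-alone illustration using a pthread mutex; DemoSyncCtl and demo_enqueue_if_sync are hypothetical names and not PostgreSQL code.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for shared walsender control data. */
typedef struct DemoSyncCtl
{
    pthread_mutex_t lock;
    volatile bool   sync_standbys_defined;
    int             queued_waiters;
} DemoSyncCtl;

/*
 * Queue the caller as a waiter only if sync replication is configured.
 * Returns true if the caller was queued.
 */
bool
demo_enqueue_if_sync(DemoSyncCtl *ctl)
{
    bool queued = false;

    /* Fast path: unlocked read. The value may be stale. */
    if (!ctl->sync_standbys_defined)
        return false;

    pthread_mutex_lock(&ctl->lock);

    /* Re-check under the lock so the flag and the queue change together. */
    if (ctl->sync_standbys_defined)
    {
        ctl->queued_waiters++;
        queued = true;
    }

    pthread_mutex_unlock(&ctl->lock);
    return queued;
}
```

A caller that uses only the unlocked read gets a hint, not a guarantee, which is the risk the reviewer wants a comment on SyncRepEnabled to mention.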

On Wed, Feb 23, 2022 at 5:24 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:
>
> On Feb, Wed 23, 2022 at 10:58 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
> Few comments to V19-0001:
>
> 1. I think we should adjust the alignment format.
> git am ../v19-0001-Skip-empty-transactions-for-logical-replication.patch
> .git/rebase-apply/patch:197: indent with spaces.
>     * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> .git/rebase-apply/patch:198: indent with spaces.
>     * is sent. If not, send now.
> .git/rebase-apply/patch:199: indent with spaces.
>     */
> .git/rebase-apply/patch:201: indent with spaces.
>        pgoutput_send_stream_start(ctx, toptxn);
> .git/rebase-apply/patch:204: indent with spaces.
>        pgoutput_begin(ctx, toptxn);
> warning: 5 lines add whitespace errors.

Fixed.


>
> 2. Structure member initialization.
>  static void
>  pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>  {
> +       PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
> +                                                            sizeof(PGOutputTxnData));
> +
> +       txndata->sent_begin_txn = false;
> +       txn->output_plugin_private = txndata;
> +}
> Do we need to set sent_stream_start and sent_any_stream to false here?

Fixed

>
> 3. Maybe we should add Assert(txndata) like function pgoutput_commit_txn in
> other functions.
>
> 4. In addition, I think we should keep a unified style.
> a). log style (maybe first one is better.)
> First style  : "Skipping replication of an empty transaction in XXX"
> Second style : "skipping replication of an empty transaction"
> b) flag name (maybe second one is better.)
> First style  : variable "sent_begin_txn" in function pgoutput_stream_*.
> Second style : variable "skip" in function pgoutput_commit_txn.
>

Fixed.

Regards,
Ajin Cherian
Fujitsu Australia

Attachments

RE: logical replication empty transactions

From
"shiy.fnst@fujitsu.com"
Date:
Hi,

Here are some comments on the v21 patch.

1.
+            WalSndKeepalive(false, 0);

Maybe we can use InvalidXLogRecPtr here, instead of 0.

2.
+    pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);

Similarly, should we use XLogRecPtrIsInvalid()?

3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
             Assert(false);
     }
 
+   if (in_streaming)
+    {
+        /* If streaming, send STREAM START if we haven't yet */
+        if (txndata && !txndata->sent_stream_start)
+        pgoutput_send_stream_start(ctx, txn);
+    }
+    else
+    {
+        /* If not streaming, send BEGIN if we haven't yet */
+        if (txndata && !txndata->sent_begin_txn)
+        pgoutput_send_begin(ctx, txn);
+    }
+
+
     /* Avoid leaking memory by using and resetting our own context */
     old = MemoryContextSwitchTo(data->context);


I am not sure if it is suitable to send begin or stream_start here, because the
row filter is not checked yet. That means, empty transactions caused by row
filter are not skipped.

4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
                             ReorderBufferTXN *txn,
                             XLogRecPtr prepare_lsn)
 {
+    PGOutputTxnData *txndata = txn->output_plugin_private;
+    bool            sent_begin_txn = txndata->sent_begin_txn;
+
     Assert(rbtxn_is_streamed(txn));
 
-    OutputPluginUpdateProgress(ctx);
+    pfree(txndata);
+    txn->output_plugin_private = NULL;
+
+    if (!sent_begin_txn)
+    {
+        elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+        return;
+    }
+
+    OutputPluginUpdateProgress(ctx, false);
     OutputPluginPrepareWrite(ctx, true);
     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
     OutputPluginWrite(ctx, true);

I notice that the patch skips the STREAM PREPARE of an empty streamed
transaction. This causes an error on the subscriber side when the transaction
is later committed on the publisher side, so I think we'd better not do that.

For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)

-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;

-- subscriber 
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);

-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';

The error message in subscriber log:
ERROR:  prepared transaction with identifier "pg_gid_16391_722" does not exist


Regards,
Shi yu

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Wed, Mar 2, 2022 at 1:01 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> 4.
> @@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
>                                                         ReorderBufferTXN *txn,
>                                                         XLogRecPtr prepare_lsn)
>  {
> +       PGOutputTxnData *txndata = txn->output_plugin_private;
> +       bool                    sent_begin_txn = txndata->sent_begin_txn;
> +
>         Assert(rbtxn_is_streamed(txn));
>
> -       OutputPluginUpdateProgress(ctx);
> +       pfree(txndata);
> +       txn->output_plugin_private = NULL;
> +
> +       if (!sent_begin_txn)
> +       {
> +               elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
> +               return;
> +       }
> +
> +       OutputPluginUpdateProgress(ctx, false);
>         OutputPluginPrepareWrite(ctx, true);
>         logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
>         OutputPluginWrite(ctx, true);
>
> I notice that the patch skips stream prepared transaction, this would cause an
> error on subscriber side when committing this transaction on publisher side, so
> I think we'd better not do that.
>
> For example:
> (set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
> postgresql.conf)
>
> -- publisher
> create table test (a int, b text, primary key(a));
> create table test2 (a int, b text, primary key(a));
> create publication pub for table test;
>
> -- subscriber
> create table test (a int, b text, primary key(a));
> create table test2 (a int, b text, primary key(a));
> create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);
>
> -- publisher
> begin;
> INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
> prepare transaction 't';
> commit prepared 't';
>
> The error message in subscriber log:
> ERROR:  prepared transaction with identifier "pg_gid_16391_722" does not exist
>

Thanks for the test. I guess this mixed streaming + two-phase case runs
into the same problem that was there while skipping two-phase
transactions. If the eventual COMMIT PREPARED comes after a restart,
there is no way of knowing whether the original transaction was skipped,
so we can't know whether the COMMIT PREPARED needs to be sent. I tried
not skipping the "stream prepare", but that causes a crash in the apply
worker as it tries to find the non-existent streamed file. We could add
logic to silently ignore a spurious "stream prepare", but that might not
be ideal. Any thoughts on how to address this? Otherwise, we will need
to avoid skipping streamed transactions as well.
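
To make the failure mode concrete, here is a small stand-alone model of it. It is only a sketch: DemoSubscriber and the demo_* functions are hypothetical, and it assumes (as described above) that the publisher cannot remember across a restart whether it skipped the prepare, so COMMIT PREPARED is always sent.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define DEMO_MAX_GIDS 8
#define DEMO_GID_LEN  64

/* Hypothetical subscriber state: the gids of prepared transactions. */
typedef struct DemoSubscriber
{
    char gids[DEMO_MAX_GIDS][DEMO_GID_LEN];
    int  ngids;
} DemoSubscriber;

void
demo_subscriber_init(DemoSubscriber *sub)
{
    memset(sub, 0, sizeof(*sub));
}

/* Apply PREPARE on the subscriber: remember the gid. */
static void
demo_apply_prepare(DemoSubscriber *sub, const char *gid)
{
    if (sub->ngids < DEMO_MAX_GIDS)
        snprintf(sub->gids[sub->ngids++], DEMO_GID_LEN, "%s", gid);
}

/* Apply COMMIT PREPARED: fails if the gid was never prepared here. */
static bool
demo_apply_commit_prepared(DemoSubscriber *sub, const char *gid)
{
    for (int i = 0; i < sub->ngids; i++)
    {
        if (strcmp(sub->gids[i], gid) == 0)
        {
            /* Forget the gid by moving the last entry into its slot. */
            memmove(sub->gids[i], sub->gids[sub->ngids - 1], DEMO_GID_LEN);
            sub->ngids--;
            return true;
        }
    }
    return false;               /* "prepared transaction ... does not exist" */
}

/*
 * Replicate one two-phase transaction. With skip_empty set, the prepare of
 * an empty transaction is dropped, but COMMIT PREPARED is still sent.
 */
bool
demo_replicate_two_phase(DemoSubscriber *sub, const char *gid,
                         bool has_changes, bool skip_empty)
{
    if (has_changes || !skip_empty)
        demo_apply_prepare(sub, gid);

    return demo_apply_commit_prepared(sub, gid);
}
```

In this model the only failing combination is an empty transaction with skipping enabled, which matches the error in the test above.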

regards,
Ajin Cherian
Fujitsu Australia



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Wed, Mar 2, 2022 at 1:01 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> Hi,
>
> Here are some comments on the v21 patch.
>
> 1.
> +                       WalSndKeepalive(false, 0);
>
> Maybe we can use InvalidXLogRecPtr here, instead of 0.
>

Fixed.

> 2.
> +       pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);
>
> Similarly, should we use XLogRecPtrIsInvalid()?

Fixed

>
> 3.
> @@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                         Assert(false);
>         }
>
> +   if (in_streaming)
> +       {
> +               /* If streaming, send STREAM START if we haven't yet */
> +               if (txndata && !txndata->sent_stream_start)
> +               pgoutput_send_stream_start(ctx, txn);
> +       }
> +       else
> +       {
> +               /* If not streaming, send BEGIN if we haven't yet */
> +               if (txndata && !txndata->sent_begin_txn)
> +               pgoutput_send_begin(ctx, txn);
> +       }
> +
> +
>         /* Avoid leaking memory by using and resetting our own context */
>         old = MemoryContextSwitchTo(data->context);
>
>
> I am not sure if it is suitable to send begin or stream_start here, because the
> row filter is not checked yet. That means, empty transactions caused by row
> filter are not skipped.
>

Moved the check down, so that row_filters are taken into account.
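
Roughly, the effect of moving the check can be sketched as below. This is a simplified stand-alone model, not the patch itself: DemoTxn, demo_row_filter and the message counter are hypothetical, and the filter condition is made up for illustration.

```c
#include <stdbool.h>

/* Hypothetical per-transaction state. */
typedef struct DemoTxn
{
    bool sent_begin;
    int  messages_sent;
} DemoTxn;

/* Made-up row filter: publish only rows with a positive value. */
static bool
demo_row_filter(int value)
{
    return value > 0;
}

/* Process one change: BEGIN is sent only once a row passes the filter. */
void
demo_change(DemoTxn *txn, int value)
{
    if (!demo_row_filter(value))
        return;                 /* filtered out: nothing is sent */

    if (!txn->sent_begin)
    {
        txn->sent_begin = true;
        txn->messages_sent++;   /* BEGIN */
    }
    txn->messages_sent++;       /* the change itself */
}

/* COMMIT is sent only if BEGIN was sent. */
void
demo_commit(DemoTxn *txn)
{
    if (txn->sent_begin)
        txn->messages_sent++;   /* COMMIT */
}
```

With the check placed before the filter, a transaction whose rows are all filtered out would still produce a BEGIN/COMMIT pair; with this ordering it produces nothing.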

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
I have split the patch into two. I have kept the logic of skipping
streaming changes in the second patch.
I will work on the second patch once we can figure out a solution for
the COMMIT PREPARED after restart problem.

regards,
Ajin Cherian

Attachments

Re: logical replication empty transactions

From
Peter Smith
Date:
On Fri, Mar 4, 2022 at 12:41 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> I have split the patch into two. I have kept the logic of skipping
> streaming changes in the second patch.
> I will work on the second patch once we can figure out a solution for
> the COMMIT PREPARED after restart problem.
>

Please see below my review comments for the first patch only (v23-0001)

======

1. Patch failed to apply cleanly - whitespace warnings.

git apply ../patches_misc/v23-0001-Skip-empty-transactions-for-logical-replication.patch
../patches_misc/v23-0001-Skip-empty-transactions-for-logical-replication.patch:68:
trailing whitespace.
 * change in a transaction is processed. This makes it possible
warning: 1 line adds whitespace errors.

~~~

2. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData

I felt that this comment describes only the bool member, but because it
is the structure comment it should also say something about the
structure itself. E.g. it should say that it is only allocated by
pgoutput_begin_txn() and that it is accessible via
txn->output_plugin_private. Maybe also mention the subtle implication
that if this is NULL then it means the tx can't be 2PC etc...

~~~

3. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_begin

+/*
+ * Send BEGIN.
+ *
+ * This is where the BEGIN is actually sent. This is called while processing
+ * the first change of the transaction.
+ */
+static void
+pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

IMO there is no need to repeat "This is where the BEGIN is actually
sent.", because "Send BEGIN." already said the same thing :-)

~~~

4. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_txn

+ /*
+ * If a BEGIN message was not yet sent, then it means there were no relevant
+ * changes encountered, so we can skip the COMMIT message too.
+ */
+ sent_begin_txn = txndata->sent_begin_txn;
+ txn->output_plugin_private = NULL;
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+
+ pfree(txndata);

Not quite sure why this pfree is positioned where it is (after that
function call). I felt this should be a couple of lines up so txndata
is freed as soon as you had no more use for it (i.e. after you copied
the bool from it)

~~~

5. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +658,13 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  if (schema_sent)
  return;

+   /* set up txndata */
+   txndata = toptxn->output_plugin_private;

The comment does not quite feel right. Nothing is "setting up" anything.
Really, all this does is assign a reference to the tx private data.
Probably better with no comment at all?

~~~

6. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

I observed that every call to the maybe_send_schema function also has
adjacent code that already checks whether to call the
pgoutput_send_begin function.

So then I am wondering is the added logic to the maybe_send_schema
even needed at all? It looks a bit redundant. Thoughts?

~~~

7. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -1141,6 +1212,7 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  Relation relation, ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
  MemoryContext old;

Maybe it is worth deferring this assignment until after the row-filter
check. Otherwise, you are maybe doing it for nothing and IIRC this is
hot code so the less you do here the better. OTOH a single assignment
probably amounts to almost nothing.

~~~

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

@@ -1354,6 +1438,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
    int nrelations, Relation relations[], ReorderBufferChange *change)
 {
  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
  MemoryContext old;

This variable declaration should be done later in the block where it
is assigned.

~~~

9. src/backend/replication/pgoutput/pgoutput.c - suggestion

I notice there are quite a few places in the patch that look like:

+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+

It might be worth considering encapsulating all those in a helper function like:
pgoutput_maybe_send_begin(ctx, txn)

It would certainly be a lot tidier.
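
A minimal sketch of what such a helper might look like, using stand-in types so that it compiles on its own. DemoTxn, DemoTxnData and the demo_* names are hypothetical; in the patch the equivalent would take the LogicalDecodingContext and ReorderBufferTXN.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct DemoTxnData
{
    bool sent_begin_txn;
} DemoTxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct DemoTxn
{
    void *output_plugin_private;
    int   begins_sent;          /* counts BEGIN messages, for illustration */
} DemoTxn;

/* Stand-in for pgoutput_send_begin(): emit BEGIN and record that we did. */
static void
demo_send_begin(DemoTxn *txn)
{
    DemoTxnData *txndata = (DemoTxnData *) txn->output_plugin_private;

    txn->begins_sent++;
    txndata->sent_begin_txn = true;
}

/* The suggested helper: send BEGIN only if it has not been sent yet. */
void
demo_maybe_send_begin(DemoTxn *txn)
{
    DemoTxnData *txndata = (DemoTxnData *) txn->output_plugin_private;

    /* A NULL private pointer means no per-transaction state was set up. */
    if (txndata != NULL && !txndata->sent_begin_txn)
        demo_send_begin(txn);
}
```

Each call site then shrinks to a single call, and the NULL check lives in one place.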

~~~

10. src/backend/replication/syncrep.c - SyncRepEnabled

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
 }

 /*
+ * Check if synchronous replication is enabled
+ */
+bool
+SyncRepEnabled(void)

Missing period for that function comment.

------
Kind Regards,
Peter Smith.
Fujitsu Australia



RE: logical replication empty transactions

From
"shiy.fnst@fujitsu.com"
Date:
On Fri, Mar 4, 2022 9:41 AM Ajin Cherian <itsajin@gmail.com> wrote:
> 
> I have split the patch into two. I have kept the logic of skipping
> streaming changes in the second patch.
> I will work on the second patch once we can figure out a solution for
> the COMMIT PREPARED after restart problem.
> 

Thanks for updating the patch.

A comment on v23-0001 patch.

@@ -1429,6 +1520,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     if (in_streaming)
         xid = txn->xid;
 
+    /*
+     * Output BEGIN if we haven't yet.
+     * Avoid for non-transactional messages.
+     */
+    if (in_streaming || transactional)
+    {
+        PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+        /* Send BEGIN if we haven't yet */
+        if (txndata && !txndata->sent_begin_txn)
+            pgoutput_send_begin(ctx, txn);
+    }
+
     OutputPluginPrepareWrite(ctx, true);
     logicalrep_write_message(ctx->out,
                              xid,

I think we don't need to send BEGIN if in_streaming is true, right? The first
patch doesn't skip streamed transaction, so should we modify
+    if (in_streaming || transactional)
to
+    if (!in_streaming && transactional)
?
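
The difference between the two conditions is easiest to see side by side. The two functions below are only an illustration of the boolean logic; the function names are hypothetical.

```c
#include <stdbool.h>

/* Condition as written in v23-0001. */
bool
demo_needs_begin_v23(bool in_streaming, bool transactional)
{
    return in_streaming || transactional;
}

/* Proposed condition: never send BEGIN while streaming. */
bool
demo_needs_begin_proposed(bool in_streaming, bool transactional)
{
    return !in_streaming && transactional;
}
```

The two agree whenever in_streaming is false and differ only when it is true, where the v23 condition would still try to send BEGIN.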

Regards,
Shi yu


Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Mar 7, 2022 at 7:50 PM shiy.fnst@fujitsu.com
<shiy.fnst@fujitsu.com> wrote:
>
> On Fri, Mar 4, 2022 9:41 AM Ajin Cherian <itsajin@gmail.com> wrote:
> >
> > I have split the patch into two. I have kept the logic of skipping
> > streaming changes in the second patch.
> > I will work on the second patch once we can figure out a solution for
> > the COMMIT PREPARED after restart problem.
> >
>
> Thanks for updating the patch.
>
> A comment on v23-0001 patch.
>
> @@ -1429,6 +1520,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>         if (in_streaming)
>                 xid = txn->xid;
>
> +       /*
> +        * Output BEGIN if we haven't yet.
> +        * Avoid for non-transactional messages.
> +        */
> +       if (in_streaming || transactional)
> +       {
> +               PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> +               /* Send BEGIN if we haven't yet */
> +               if (txndata && !txndata->sent_begin_txn)
> +                       pgoutput_send_begin(ctx, txn);
> +       }
> +
>         OutputPluginPrepareWrite(ctx, true);
>         logicalrep_write_message(ctx->out,
>                                                          xid,
>
> I think we don't need to send BEGIN if in_streaming is true, right? The first
> patch doesn't skip streamed transaction, so should we modify
> +       if (in_streaming || transactional)
> to
> +       if (!in_streaming && transactional)
> ?
>

Fixed.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Mon, Mar 7, 2022 at 11:44 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> Fixed.
>
> regards,
> Ajin Cherian
> Fujitsu Australia

Rebased the patch and fixed some whitespace errors.
regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Mar 16, 2022 at 12:33 PM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Mon, Mar 7, 2022 at 11:44 PM Ajin Cherian <itsajin@gmail.com> wrote:
> >
> > Fixed.
> >

Review comments/suggestions:
=========================
1. Isn't it sufficient to call pgoutput_send_begin from
maybe_send_schema as that is commonplace for all others and is always
the first message we send? If so, I think we can remove it from other
places?
2. Can we write some comments in pgoutput.c to explain why we don't skip
streaming or prepared empty transactions, along with some possible
solutions (the protocol change and additional subscription parameter
discussed in this thread [1])?
3. Can we add a simple test for it in one of the existing test
files(say in 001_rep_changes.pl)?
4. I think we can drop the skip streaming patch as we can't do that for now.


-- 
With Regards,
Amit Kapila.



Re: logical replication empty transactions

From
Ajin Cherian
Date:
On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> Review comments/suggestions:
> =========================
> 1. Isn't it sufficient to call pgoutput_send_begin from
> maybe_send_schema as that is commonplace for all others and is always
> the first message we send? If so, I think we can remove it from other
> places?

I've done it the other way: I've removed it from maybe_send_schema, as
we always call pgoutput_send_begin prior to calling maybe_send_schema.

> 2. Can we write some comments to explain why we don't skip streaming
> or prepared empty transactions and some possible solutions (the
> protocol change and additional subscription parameter as discussed
> [1]) as discussed in this thread pgoutput.c?

I've added comments in the headers of pgoutput_begin_prepare_txn() and
pgoutput_stream_start().

> 3. Can we add a simple test for it in one of the existing test
> files(say in 001_rep_changes.pl)?

Added a simple test.

> 4. I think we can drop the skip streaming patch as we can't do that for now.

Dropped.

In addition, I have added a few more comments explaining why sending BEGIN
is delayed in pgoutput_change until the row filter has been checked, and I
also ran pgindent.

regards,
Ajin Cherian
Fujitsu Australia

Attachments

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Sat, Mar 19, 2022 at 9:10 AM Ajin Cherian <itsajin@gmail.com> wrote:
>
> On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> > 3. Can we add a simple test for it in one of the existing test
> > files(say in 001_rep_changes.pl)?
>
> added a simple test.
>

This doesn't verify that the transaction is skipped. I think we should
extend this test to check for a DEBUG message in the logs (you probably
need to set log_min_messages to DEBUG1 for this test). As an example,
you can check the patch [1]. Also, it seems you have added
wait_for_catchup() twice by mistake.

Few other comments:
=================
1. Let's keep the parameter name as skipped_empty_xact in
OutputPluginUpdateProgress so as to not confuse with the other patch's
[2] keep_alive parameter. I think in this case we must send the
keep_alive message so as to not make the syncrep wait whereas in the
other patch we only need to send it periodically based on
wal_sender_timeout parameter.
2. The new function SyncRepEnabled() seems confusing to me as the
comments in SyncRepWaitForLSN() clearly state why we need to first
read the parameter 'sync_standbys_defined' without any lock then read
it again with a lock if the parameter is true. So, I just put that
check back and also added a similar check in WalSndUpdateProgress.
3.
@@ -1392,11 +1481,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
  continue;

  relids[nrelids++] = relid;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
  maybe_send_schema(ctx, change, relation, relentry);
  }

  if (nrelids > 0)
  {
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+

Why do we need to try sending the begin in the second check? I think
it should be sufficient to do it in the above loop.

I have made these and a number of other changes in the attached patch.
Do let me know what you think of the attached?

[1] - https://www.postgresql.org/message-id/CAA4eK1JbLRj6pSUENfDFsqj0%2BadNob_%3DRPXpnUnWFBskVi5JhA%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/CAA4eK1LGnaPuWs2M4sDfpd6JQZjoh4DGAsgUvNW%3DOr8i9z6K8w%40mail.gmail.com

-- 
With Regards,
Amit Kapila.

Attachments

RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Monday, March 21, 2022 6:01 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Sat, Mar 19, 2022 at 9:10 AM Ajin Cherian <itsajin@gmail.com> wrote:
> >
> > On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > > 3. Can we add a simple test for it in one of the existing test
> > > files(say in 001_rep_changes.pl)?
> >
> > added a simple test.
> >
> 
> This doesn't verify if the transaction is skipped. I think we should
> extend this test to check for a DEBUG message in the Logs (you need to
> probably set log_min_messages to DEBUG1 for this test). As an example,
> you can check the patch [1]. Also, it seems by mistake you have added
> wait_for_catchup() twice.

I added a testcase to check the DEBUG message.

> Few other comments:
> =================
> 1. Let's keep the parameter name as skipped_empty_xact in
> OutputPluginUpdateProgress so as to not confuse with the other patch's
> [2] keep_alive parameter. I think in this case we must send the
> keep_alive message so as to not make the syncrep wait whereas in the
> other patch we only need to send it periodically based on
> wal_sender_timeout parameter.
> 2. The new function SyncRepEnabled() seems confusing to me as the
> comments in SyncRepWaitForLSN() clearly state why we need to first
> read the parameter 'sync_standbys_defined' without any lock then read
> it again with a lock if the parameter is true. So, I just put that
> check back and also added a similar check in WalSndUpdateProgress.
> 3.
> @@ -1392,11 +1481,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>   continue;
> 
>   relids[nrelids++] = relid;
> +
> + /* Send BEGIN if we haven't yet */
> + if (txndata && !txndata->sent_begin_txn)
> + pgoutput_send_begin(ctx, txn);
>   maybe_send_schema(ctx, change, relation, relentry);
>   }
> 
>   if (nrelids > 0)
>   {
> + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + /* Send BEGIN if we haven't yet */
> + if (txndata && !txndata->sent_begin_txn)
> + pgoutput_send_begin(ctx, txn);
> +
> 
> Why do we need to try sending the begin in the second check? I think
> it should be sufficient to do it in the above loop.
> 
> I have made these and a number of other changes in the attached patch.
> Do let me know what you think of the attached?

The changes look good to me.
I did some basic tests for the patch and didn't find any other problems.

Attached is the new version of the patch.

Best regards,
Hou zj

Attachments

RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
> On Monday, March 21, 2022 6:01 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> >
> > On Sat, Mar 19, 2022 at 9:10 AM Ajin Cherian <itsajin@gmail.com> wrote:
> > >
> > > On Thu, Mar 17, 2022 at 10:43 PM Amit Kapila
> > > <amit.kapila16@gmail.com>
> > wrote:
> > >
> > > > 3. Can we add a simple test for it in one of the existing test
> > > > files(say in 001_rep_changes.pl)?
> > >
> > > added a simple test.
> > >
> >
> > This doesn't verify if the transaction is skipped. I think we should
> > extend this test to check for a DEBUG message in the Logs (you need to
> > probably set log_min_messages to DEBUG1 for this test). As an example,
> > you can check the patch [1]. Also, it seems by mistake you have added
> > wait_for_catchup() twice.
> 
> I added a testcase to check the DEBUG message.
> 
> > Few other comments:
> > =================
> > 1. Let's keep the parameter name as skipped_empty_xact in
> > OutputPluginUpdateProgress so as to not confuse with the other patch's
> > [2] keep_alive parameter. I think in this case we must send the
> > keep_alive message so as to not make the syncrep wait whereas in the
> > other patch we only need to send it periodically based on
> > wal_sender_timeout parameter.
> > 2. The new function SyncRepEnabled() seems confusing to me as the
> > comments in SyncRepWaitForLSN() clearly state why we need to first
> > read the parameter 'sync_standbys_defined' without any lock then read
> > it again with a lock if the parameter is true. So, I just put that
> > check back and also added a similar check in WalSndUpdateProgress.
> > 3.
> > @@ -1392,11 +1481,21 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> > ReorderBufferTXN *txn,
> >   continue;
> >
> >   relids[nrelids++] = relid;
> > +
> > + /* Send BEGIN if we haven't yet */
> > + if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx,
> > + txn);
> >   maybe_send_schema(ctx, change, relation, relentry);
> >   }
> >
> >   if (nrelids > 0)
> >   {
> > + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> > +
> > + /* Send BEGIN if we haven't yet */
> > + if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx,
> > + txn);
> > +
> >
> > Why do we need to try sending the begin in the second check? I think
> > it should be sufficient to do it in the above loop.
> >
> > I have made these and a number of other changes in the attached patch.
> > Do let me know what you think of the attached?
> 
> The changes look good to me.
> And I did some basic tests for the patch and didn’t find some other problems.
> 
> Attach the new version patch.

Oh, sorry, I posted the wrong patch, here is the correct one.

Best regards,
Hou zj

Attachments

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> > On Monday, March 21, 2022 6:01 PM Amit Kapila <amit.kapila16@gmail.com>
> > wrote:
>
> Oh, sorry, I posted the wrong patch, here is the correct one.
>

The test change looks good to me. I think additionally we can verify
that the record is not reflected in the subscriber table. Apart from
that, I had made minor changes mostly in the comments in the attached
patch. If those look okay to you, please include those in the next
version.


-- 
With Regards,
Amit Kapila.

Attachments

RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Tuesday, March 22, 2022 7:50 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > > On Monday, March 21, 2022 6:01 PM Amit Kapila
> > > <amit.kapila16@gmail.com>
> > > wrote:
> >
> > Oh, sorry, I posted the wrong patch, here is the correct one.
> >
> 
> The test change looks good to me. I think additionally we can verify that the
> record is not reflected in the subscriber table. Apart from that, I had made
> minor changes mostly in the comments in the attached patch. If those look
> okay to you, please include those in the next version.

Thanks, the changes look good to me, I merged the diff patch.

Attached is the new version of the patch, which includes the following changes:

- Fix a typo
- Change the requestreply flag of the newly added WalSndKeepalive to false,
  because the subscriber can judge whether it's necessary to post a reply based
  on the received LSN.
- Add a testcase to make sure there is no data on the subscriber side when the
  transaction is skipped.
- Change the name of flag skipped_empty_xact to skipped_xact which seems more
  understandable.
- Merge Amit's suggested changes.
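
As a rough illustration of the keepalive behaviour described in this thread (a keepalive is sent for a skipped transaction only when synchronous replication is in use, and with requestreply set to false), here is a simplified stand-alone model. DemoWalSender and demo_update_progress are hypothetical names and this is not the walsender code.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the walsender state relevant here. */
typedef struct DemoWalSender
{
    bool sync_standbys_defined; /* is synchronous replication configured? */
    int  keepalives_sent;
    bool last_request_reply;    /* requestreply flag of the last keepalive */
} DemoWalSender;

/*
 * Progress update after a transaction. If the transaction was skipped and
 * sync replication is configured, send a keepalive so that a backend
 * waiting on syncrep is not left waiting for a reply that never comes.
 */
void
demo_update_progress(DemoWalSender *ws, bool skipped_xact)
{
    if (skipped_xact && ws->sync_standbys_defined)
    {
        /* No reply is requested; the subscriber decides based on the LSN. */
        ws->last_request_reply = false;
        ws->keepalives_sent++;
    }
}
```

In asynchronous mode nothing is sent for a skipped transaction, which is where most of the bandwidth saving comes from.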


Best regards,
Hou zj

Attachments

RE: logical replication empty transactions

From
"shiy.fnst@fujitsu.com"
Date:
On Thursday, March 24, 2022 11:19 AM Hou, Zhijie/侯 志杰 <houzj.fnst@fujitsu.com> wrote:
> 
> Attach the new version patch which include the following changes:
> 
> - Fix a typo
> - Change the requestreply flag of the newly added WalSndKeepalive to false,
>   because the subscriber can judge whether it's necessary to post a reply
>   based on the received LSN.
> - Add a testcase to make sure there is no data in subscriber side when the
>   transaction is skipped.
> - Change the name of flag skipped_empty_xact to skipped_xact which seems
>   more understandable.
> - Merge Amit's suggested changes.
> 

Hi,

This patch skips sending BEGIN/COMMIT messages for empty transactions and so
saves network bandwidth. I ran a test to see how much it affects bandwidth.

The test follows the earlier test by Peter[1]. I temporarily modified the
code in worker.c to log the length of the data received by the subscriber (after
calling walrcv_receive()). At the end of the test run, the logs are
processed to extract the numbers.

[1] https://www.postgresql.org/message-id/CAHut%2BPuyqcDJO0X2BxY%2B9ycF%2Bew3x77FiCbTJQGnLDbNmMASZQ%40mail.gmail.com

The number of transactions is fixed (1000), and I tested different proportions
of empty transactions: 0%, 25%, 50%, 75%, and 100%. In synchronous replication
mode the patch sends a keepalive message when it skips an empty transaction, so
I tested both synchronous and asynchronous replication.

The results are as follows; the bar chart is attached.

Sync replication - size of data sent, by percentage of empty transactions
--------------------------------------------------------------------
            0%      25%     50%     75%     100%
HEAD        335211  281655  223661  170271  115108
patched     335217  256617  173878  98095   18108

Async replication - size of data sent, by percentage of empty transactions
--------------------------------------------------------------------
            0%      25%     50%     75%     100%
HEAD        339379  285835  236343  184227  115000
patched     335077  260953  180022  113333  18126


The details of the test are also attached.

Summary of results:
In both synchronous and asynchronous replication mode, the more empty
transactions there are, the larger the improvement. Even when there are no
empty transactions, I can't see any overhead.
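
As a rough check of the numbers above, the saving can be expressed as a percentage of HEAD. The sketch below is only illustrative arithmetic on the sync-replication rows of the table; size_saved_pct(), sync_saved_pct(), and the array names are hypothetical and not part of the test scripts.

```c
#include <assert.h>

/* Sync-replication sizes copied from the table above (0%..100% empty). */
static const long head_size[5]    = {335211, 281655, 223661, 170271, 115108};
static const long patched_size[5] = {335217, 256617, 173878, 98095, 18108};

/* Percentage of data saved by the patched build relative to HEAD. */
static double
size_saved_pct(long head, long patched)
{
    assert(head > 0);
    return 100.0 * (double) (head - patched) / (double) head;
}

/* Saving for one column of the table; col runs from 0 (0%) to 4 (100%). */
static double
sync_saved_pct(int col)
{
    assert(col >= 0 && col < 5);
    return size_saved_pct(head_size[col], patched_size[col]);
}
```

For the 100% column, sync_saved_pct(4) comes out at roughly 84%, while for the 0% column the result is close to zero, which matches the summary above.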

Regards,
Shi yu

Attachments

RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Thursday, March 24, 2022 11:19 AM houzj.fnst@fujitsu.com wrote:
> On Tuesday, March 22, 2022 7:50 PM Amit Kapila <amit.kapila16@gmail.com>
> wrote:
> > On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > > On Monday, March 21, 2022 6:01 PM Amit Kapila
> > > > <amit.kapila16@gmail.com>
> > > > wrote:
> > >
> > > Oh, sorry, I posted the wrong patch, here is the correct one.
> > >
> >
> > The test change looks good to me. I think additionally we can verify
> > that the record is not reflected in the subscriber table. Apart from
> > that, I had made minor changes mostly in the comments in the attached
> > patch. If those look okay to you, please include those in the next version.
> 
> Thanks, the changes look good to me, I merged the diff patch.
> 
> Attach the new version patch which include the following changes:
> 
> - Fix a typo
> - Change the requestreply flag of the newly added WalSndKeepalive to false,
>   because the subscriber can judge whether it's necessary to post a reply
>   based on the received LSN.
> - Add a testcase to make sure there is no data in subscriber side when the
>   transaction is skipped.
> - Change the name of flag skipped_empty_xact to skipped_xact which seems
>   more understandable.
> - Merge Amit's suggested changes.
> 

I reviewed the newly added keepalive message further and confirmed that
it is necessary to send it in sync mode.

+    if (skipped_xact &&
+        SyncRepRequested() &&
+        ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+        WalSndKeepalive(false, ctx->write_location);

In sync replication, the publisher needs a reply from the subscriber to
release the waiting backend. With the patch applied, we no longer send empty
transactions to the subscriber, so without this keepalive message we would not
get a reply. The walsender usually invokes WalSndWaitForWal(), which also
sends a keepalive message to the subscriber, so we could get a reply and
release the wait that way. But WalSndWaitForWal() is not invoked for every
record: when reading a page, we don't invoke WalSndWaitForWal() if the record
is already in our buffer[1].

[1] ReadPageInternal(
...
    /* check whether we have all the requested data already */
    if (targetSegNo == state->seg.ws_segno &&
        targetPageOff == state->segoff && reqLen <= state->readLen)
        return state->readLen;
...

Based on the above, without the newly added keepalive message in the
patch, the transaction could wait somewhat longer to finish.

For example, I ran the following experiment to confirm this:
1. Set LOG_SNAPSHOT_INTERVAL_MS and checkpoint_timeout to larger values to
   make sure no extra WAL is generated that could affect the test.
2. Attach a debugger to the walsender and stop it in WalSndWaitForWal().
3. Start two clients and modify an unpublished table:
postgres1 # INSERT INTO not_rep VALUES(1);
----        waiting
postgres2 # INSERT INTO not_rep VALUES(1);
----        waiting
4. Release the walsender. It does not send a keepalive to the
   subscriber until it has handled both of the above transactions, which means
   the two transactions wait until both of them have been decoded. This
   behavior doesn't look good and is inconsistent with the current
   behavior (a transaction finishes after it is decoded, or after it is sent
   to the subscriber if necessary).

So, I think the newly added keepalive message makes sense.
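
To make the condition in the hunk above easier to reason about, here is a minimal standalone sketch of the same decision. It is not the walsender code: SenderState, its fields, and need_keepalive_after_skip() are hypothetical names that only model skipped_xact, SyncRepRequested(), and sync_standbys_defined.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the state the walsender consults. */
typedef struct SenderState
{
    bool sync_rep_requested;     /* models SyncRepRequested() */
    bool sync_standbys_defined;  /* models WalSndCtl->sync_standbys_defined */
} SenderState;

/*
 * Return true when a keepalive should follow the end of a transaction.
 * Only skipped (empty) transactions need it, and only when a backend may
 * be waiting for a synchronous standby to confirm the commit LSN.
 */
static bool
need_keepalive_after_skip(const SenderState *st, bool skipped_xact)
{
    assert(st != NULL);
    return skipped_xact &&
           st->sync_rep_requested &&
           st->sync_standbys_defined;
}
```

In this model, an asynchronous setup never triggers the extra keepalive, and a transaction that was actually sent does not need one because its COMMIT message already prompts a reply.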

Best regards,
Hou zj

RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Friday, March 25, 2022 8:31 AM houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com> wrote:
> On Thursday, March 24, 2022 11:19 AM houzj.fnst@fujitsu.com wrote:
> > On Tuesday, March 22, 2022 7:50 PM Amit Kapila <amit.kapila16@gmail.com>
> > wrote:
> > > On Tue, Mar 22, 2022 at 7:25 AM houzj.fnst@fujitsu.com
> > > <houzj.fnst@fujitsu.com> wrote:
> > > >
> > > > > On Monday, March 21, 2022 6:01 PM Amit Kapila
> > > > > <amit.kapila16@gmail.com>
> > > > > wrote:
> > > >
> > > > Oh, sorry, I posted the wrong patch, here is the correct one.
> > > >
> > >
> > > The test change looks good to me. I think additionally we can verify
> > > that the record is not reflected in the subscriber table. Apart from
> > > that, I had made minor changes mostly in the comments in the attached
> > > patch. If those look okay to you, please include those in the next version.
> >
> > Thanks, the changes look good to me, I merged the diff patch.
> >
> > Attach the new version patch which include the following changes:
> >
> > - Fix a typo
> > - Change the requestreply flag of the newly added WalSndKeepalive to false,
> >   because the subscriber can judge whether it's necessary to post a reply
> >   based on the received LSN.
> > - Add a testcase to make sure there is no data in subscriber side when the
> >   transaction is skipped.
> > - Change the name of flag skipped_empty_xact to skipped_xact which seems
> >   more understandable.
> > - Merge Amit's suggested changes.
> >
> 
> I did some more review for the newly added keepalive message and confirmed
> that it's necessary to send this in sync mode.

Since commit 75b1521 added decoding of sequences to logical
replication, this patch needs to send the begin message in
pgoutput_sequence when necessary.

Attached is the new version of the patch with this change.
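
The deferred-BEGIN idea that this change extends to pgoutput_sequence can be sketched as a small state machine. This is a simplified model and not pgoutput code: TxnState, ensure_begin(), on_change(), and on_commit() are hypothetical names, with sent_begin playing the role of the patch's sent_begin_txn flag. The point it illustrates is that every callback that emits data has to run the ensure-begin step first.

```c
#include <stdbool.h>

/* Hypothetical per-transaction state kept by the output plugin. */
typedef struct TxnState
{
    bool sent_begin;   /* has BEGIN been emitted for this transaction? */
    int  messages;     /* messages emitted so far, for illustration only */
} TxnState;

/* BEGIN callback: remember the transaction but send nothing yet. */
static void
on_begin(TxnState *txn)
{
    txn->sent_begin = false;
    txn->messages = 0;
}

/* Emit the delayed BEGIN the first time any data is about to be sent. */
static void
ensure_begin(TxnState *txn)
{
    if (!txn->sent_begin)
    {
        txn->messages++;        /* the BEGIN message */
        txn->sent_begin = true;
    }
}

/* Any data-carrying callback (change, truncate, sequence, ...). */
static void
on_change(TxnState *txn)
{
    ensure_begin(txn);
    txn->messages++;            /* the change itself */
}

/* COMMIT callback: returns true if the transaction was skipped as empty. */
static bool
on_commit(TxnState *txn)
{
    if (!txn->sent_begin)
        return true;            /* nothing was sent, so drop COMMIT too */
    txn->messages++;            /* the COMMIT message */
    return false;
}
```

A callback that forgets the ensure_begin() step would send data without a preceding BEGIN, which is why a newly added callback such as the sequence one has to be covered as well.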

Best regards,
Hou zj



Attachments

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Fri, Mar 25, 2022 at 12:50 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> Attach the new version patch with this change.
>

Few comments:
=================
1. I think we can move the keep_alive check after the tracklag record
check to keep it consistent with another patch [1].
2. Add the comment about the new parameter skipped_xact atop
WalSndUpdateProgress.
3. I think we need to call pq_flush_if_writable after sending a
keepalive message to avoid delaying sync transactions.

[1]:
https://www.postgresql.org/message-id/OS3PR01MB6275C64F264662E84D2FB7AE9E1D9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

-- 
With Regards,
Amit Kapila.



RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Monday, March 28, 2022 3:08 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Fri, Mar 25, 2022 at 12:50 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Attach the new version patch with this change.
> >
> 
> Few comments:

Thanks for the comments.

> =================
> 1. I think we can move the keep_alive check after the tracklag record
> check to keep it consistent with another patch [1].

Changed.

> 2. Add the comment about the new parameter skipped_xact atop
> WalSndUpdateProgress.

Added.

> 3. I think we need to call pq_flush_if_writable after sending a
> keepalive message to avoid delaying sync transactions.

Agreed.
If we don't flush the data, the keepalive might be flushed later than before.
The reply would then also arrive later, and the release of the sync wait could
be delayed.

Attached is the new version of the patch, which addresses the above comments.
The patch also adds a loop after the newly added keepalive message
to make sure the message is actually flushed to the client, as is
done in WalSndWriteData.
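
The flush-until-drained idea can be sketched in isolation. This is a toy model under stated assumptions, not the walsender implementation: OutBuf, toy_flush_if_writable(), and drain_pending() are hypothetical, and real code would wait for the socket to become writable between attempts instead of looping immediately.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical output buffer; models a non-blocking socket send queue. */
typedef struct OutBuf
{
    size_t pending;        /* bytes queued but not yet written */
    size_t max_per_write;  /* bytes the "socket" accepts per attempt */
} OutBuf;

/* One non-blocking flush attempt: write what fits, keep the rest queued. */
static void
toy_flush_if_writable(OutBuf *buf)
{
    size_t n = buf->pending < buf->max_per_write ? buf->pending
                                                 : buf->max_per_write;

    buf->pending -= n;
}

/* Repeat flush attempts until nothing is pending; returns attempts made. */
static int
drain_pending(OutBuf *buf)
{
    int attempts = 0;

    assert(buf->max_per_write > 0);   /* otherwise the loop cannot finish */
    while (buf->pending > 0)
    {
        toy_flush_if_writable(buf);
        attempts++;
    }
    return attempts;
}
```

A single flush attempt may leave part of the keepalive queued, so only the loop guarantees that the message has left the buffer before the sender moves on.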

Best regards,
Hou zj


Attachments

Re: logical replication empty transactions

From
Masahiko Sawada
Date:
On Mon, Mar 28, 2022 at 9:22 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Monday, March 28, 2022 3:08 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Fri, Mar 25, 2022 at 12:50 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > Attach the new version patch with this change.
> > >
> >
> > Few comments:
>
> Thanks for the comments.
>
> > =================
> > 1. I think we can move the keep_alive check after the tracklag record
> > check to keep it consistent with another patch [1].
>
> Changed.
>
> > 2. Add the comment about the new parameter skipped_xact atop
> > WalSndUpdateProgress.
>
> Added.
>
> > 3. I think we need to call pq_flush_if_writable after sending a
> > keepalive message to avoid delaying sync transactions.
>
> Agreed.
> If we don’t flush the data, we might flush the keepalive later than before. And
> we could get the reply later as well and then the release of syncwait could be
> delayed.
>
> Attach the new version patch which addressed the above comments.
> The patch also adds a loop after the newly added keepalive message
> to make sure the message is actually flushed to the client like what
> did in WalSndWriteData.
>

Thank you for updating the patch!

Some comments:

+       if (skipped_xact &&
+               SyncRepRequested() &&
+               ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+       {
+               WalSndKeepalive(false, ctx->write_location);

I think we can use 'lsn' since it is actually ctx->write_location.

---
+       if (!sent_begin_txn)
+       {
+               elog(DEBUG1, "Skipped replication of an empty transaction with XID: %u", txn->xid);
+               return;
+       }

The log message should start with lowercase.

---
+# Note that the current location of the log file is not grabbed immediately
+# after reloading the configuration, but after sending one SQL command to
+# the node so as we are sure that the reloading has taken effect.
+$log_location = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);

I think we should get the log location of the publisher node, not
subscriber node.

Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/



RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Tuesday, March 29, 2022 3:20 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> 
> Some comments:

Thanks for the comments!

> 
> +       if (skipped_xact &&
> +               SyncRepRequested() &&
> +               ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
> +       {
> +               WalSndKeepalive(false, ctx->write_location);
> 
> I think we can use 'lsn' since it is actually ctx->write_location.

Agreed, and changed.

> ---
> +       if (!sent_begin_txn)
> +       {
> +               elog(DEBUG1, "Skipped replication of an empty transaction with XID: %u", txn->xid);
> +               return;
> +       }
> 
> The log message should start with lowercase.

Changed.

> ---
> +# Note that the current location of the log file is not grabbed immediately
> +# after reloading the configuration, but after sending one SQL command to
> +# the node so as we are sure that the reloading has taken effect.
> +$log_location = -s $node_subscriber->logfile;
> +
> +$node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)");
> +
> +$node_publisher->wait_for_catchup('tap_sub');
> +
> +$logfile = slurp_file($node_publisher->logfile, $log_location);
> 
> I think we should get the log location of the publisher node, not subscriber
> node.

Changed.

Attached is the new version of the patch, which addresses the
above comments and slightly adjusts some code comments.

Best regards,
Hou zj

Attachments

Re: logical replication empty transactions

From
Amit Kapila
Date:
On Tue, Mar 29, 2022 at 2:05 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> Attach the new version patch which addressed the
> above comments and slightly adjusted some code comments.
>

The patch looks good to me. One minor suggestion is to change the
function name ProcessPendingWritesAndTimeOut() to
ProcessPendingWrites().

-- 
With Regards,
Amit Kapila.



RE: logical replication empty transactions

From
"houzj.fnst@fujitsu.com"
Date:
On Tuesday, March 29, 2022 5:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> 
> On Tue, Mar 29, 2022 at 2:05 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Attach the new version patch which addressed the above comments and
> > slightly adjusted some code comments.
> >
> 
> The patch looks good to me. One minor suggestion is to change the function
> name ProcessPendingWritesAndTimeOut() to ProcessPendingWrites().

Thanks for the comment.
Attached is the new version of the patch with this change.

Best regards,
Hou zj

Attachments

RE: logical replication empty transactions

From
"shiy.fnst@fujitsu.com"
Date:
On Tue, Mar 29, 2022 5:15 PM Hou, Zhijie/侯 志杰 <houzj.fnst@fujitsu.com> wrote:
> 
> Thanks for the comment.
> Attach the new version patch with this change.
> 

Hi,

I ran a performance test, based on the v32 patch, to see how this patch
affects performance when publishing empty transactions.

In this test, I used synchronous logical replication and published a table with
no operations on it. The test uses pgbench; each run takes 15 minutes, and I took
the median of 3 runs. The database was dropped and recreated after each run.

The results are as follows; the bar chart is attached. The details of the test
are also attached.

TPS - publishing empty transactions (scale factor 1)
--------------------------------------------------------------------
            4 threads   8 threads   16 threads
HEAD        4818.2837   4353.6243   3888.5995
patched     5111.2936   4555.1629   4024.4286


TPS - publishing empty transactions (scale factor 100)
--------------------------------------------------------------------
            4 threads   8 threads   16 threads
HEAD        9066.6465   16118.0453  21485.1207
patched     9357.3361   16638.6409  24503.6829

There is an improvement of more than 3% after applying this patch, and in the
best case, it improves by 14%, which looks good to me.
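
The range quoted above can be reproduced from the two tables. The following is only illustrative arithmetic, not part of the benchmark: tps_gain_pct(), gain_range(), and the array names are hypothetical.

```c
#include <assert.h>

#define NCASES 6

/* TPS values copied from the two tables above (scale 1, then scale 100). */
static const double head_tps[NCASES] = {
    4818.2837, 4353.6243, 3888.5995, 9066.6465, 16118.0453, 21485.1207
};
static const double patched_tps[NCASES] = {
    5111.2936, 4555.1629, 4024.4286, 9357.3361, 16638.6409, 24503.6829
};

/* Relative TPS gain of patched over HEAD, in percent. */
static double
tps_gain_pct(double head, double patched)
{
    assert(head > 0.0);
    return 100.0 * (patched - head) / head;
}

/* Smallest and largest gain across all six measurements. */
static void
gain_range(double *min_out, double *max_out)
{
    double min = tps_gain_pct(head_tps[0], patched_tps[0]);
    double max = min;

    for (int i = 1; i < NCASES; i++)
    {
        double g = tps_gain_pct(head_tps[i], patched_tps[i]);

        if (g < min)
            min = g;
        if (g > max)
            max = g;
    }
    *min_out = min;
    *max_out = max;
}
```

With these inputs the smallest gain is a little over 3% (scale factor 100, 4 threads) and the largest is about 14% (scale factor 100, 16 threads).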

Regards,
Shi yu

Attachments

Re: logical replication empty transactions

From
Masahiko Sawada
Date:
On Tue, Mar 29, 2022 at 6:15 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:
>
> On Tuesday, March 29, 2022 5:12 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Tue, Mar 29, 2022 at 2:05 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > Attach the new version patch which addressed the above comments and
> > > slightly adjusted some code comments.
> > >
> >
> > The patch looks good to me. One minor suggestion is to change the function
> > name ProcessPendingWritesAndTimeOut() to ProcessPendingWrites().
>
> Thanks for the comment.
> Attach the new version patch with this change.
>

Thank you for updating the patch. Looks good to me.

Regards,

-- 
Masahiko Sawada
EDB:  https://www.enterprisedb.com/



Re: logical replication empty transactions

From
Amit Kapila
Date:
On Wed, Mar 30, 2022 at 7:15 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Tue, Mar 29, 2022 at 6:15 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > Thanks for the comment.
> > Attach the new version patch with this change.
> >
>
> Thank you for updating the patch. Looks good to me.
>

Pushed.

-- 
With Regards,
Amit Kapila.