Discussion: CDC/ETL system on top of logical replication with pgoutput, custom client


CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi there, hope to find you well.

I'm attempting to develop a CDC solution on top of Postgres, currently on version 12 (latest minor), with a custom client, and I'm running into data loss caused by out-of-order logical replication messages.

The problem is as follows: postgres streams A, B, D, G, K, I, P logical replication events, upon exit signal we stop consuming new events at LSN K, and we wait 30s for out-of-order events. Let's say that we only got A (and K ofc), so in the following 30s we get B, D; however, for whatever reason, G never arrived. As with pgoutput-based logical replication we have no way to calculate the next LSN, we have no idea that G was missing, so we assumed that it all arrived, committing K to the postgres slot and shutting down. In the next run, our worker will start receiving data from K forward, and G is lost forever...
Meanwhile postgres moves forward with archiving and we can't go back to check if we lost anything. And even if we could, it would be extremely inefficient.

In sum, the issue comes from the fact that postgres will stream events with unordered LSNs on high transactional systems, and that pgoutput doesn't have access to enough information to calculate the next or last LSN, so we have no way to check if we received all the data that we are supposed to receive, risking committing an offset that we shouldn't because preceding data hasn't arrived yet.

It seems very odd to me that none of the open-source CDC projects that I looked into care about this. They always assume that the next LSN received is... well, the next one, and commit that one, so upon restart they are vulnerable to the same issue. So... either I'm missing something... or we have a generalized assumption causing data loss under certain conditions all over.

Am I missing any postgres mechanism that will allow me to at least detect that I'm missing data?

Thanks in advance for any clues on how to deal with this. It has been driving me nuts.
Regards,
José Neves

Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Amit Kapila
Date:
On Mon, Jul 31, 2023 at 3:06 PM José Neves <rafaneves3@msn.com> wrote:
>
> Hi there, hope to find you well.
>
> I'm attempting to develop a CDC solution on top of Postgres, currently on version 12 (latest minor), with a custom client, and I'm running into data loss caused by out-of-order logical replication messages.
>
> The problem is as follows: postgres streams A, B, D, G, K, I, P logical replication events, upon exit signal we stop consuming new events at LSN K, and we wait 30s for out-of-order events. Let's say that we only got A (and K ofc), so in the following 30s we get B, D; however, for whatever reason, G never arrived. As with pgoutput-based logical replication we have no way to calculate the next LSN, we have no idea that G was missing, so we assumed that it all arrived, committing K to the postgres slot and shutting down. In the next run, our worker will start receiving data from K forward, and G is lost forever...
> Meanwhile postgres moves forward with archiving and we can't go back to check if we lost anything. And even if we could, it would be extremely inefficient.
>
> In sum, the issue comes from the fact that postgres will stream events with unordered LSNs on high transactional systems, and that pgoutput doesn't have access to enough information to calculate the next or last LSN, so we have no way to check if we received all the data that we are supposed to receive, risking committing an offset that we shouldn't because preceding data hasn't arrived yet.
>

As per my understanding, we stream the data in the commit LSN order
and for a particular transaction, all the changes are sent in their LSN
order. Now, it is possible that for a parallel transaction, we send
some changes from a prior LSN after sending the commit of another
transaction. Say we have changes as follows:

T-1
change1 LSN1-1000
change2 LSN2-2000
commit  LSN3-3000

T-2
change1 LSN1-500
change2 LSN2-1500
commit  LSN3-4000

In such a case, all the changes including the commit of T-1 are sent
and then all the changes including the commit of T-2 are sent. So, one
can say that some of the changes from T-2 from a prior LSN arrived after
T-1's commit but that shouldn't be a problem because if restart
happens after we received partial T-2, we should receive the entire
T-2.
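To make that concrete, a minimal sketch (in Go, with a hypothetical decoded-event shape rather than the real pgoutput wire format) of a consumer that buffers changes per transaction and treats a transaction, and its commit LSN, as received only when the commit itself arrives:

package main

import "fmt"

// Hypothetical decoded events, for illustration only; real pgoutput
// messages carry more fields and a different wire format.
type Event struct {
    Kind string // "BEGIN", "INSERT", "UPDATE", "DELETE", "COMMIT"
    Xid  uint32
    LSN  uint64
}

// Consumer buffers changes per transaction and considers a transaction
// received only once its COMMIT arrives; only then is the commit LSN a
// candidate for acknowledgement back to the slot.
type Consumer struct {
    open map[uint32][]Event
}

func (c *Consumer) Handle(ev Event) {
    switch ev.Kind {
    case "BEGIN":
        c.open[ev.Xid] = []Event{}
    case "COMMIT":
        changes := c.open[ev.Xid]
        delete(c.open, ev.Xid)
        // Transaction complete: apply it downstream, then ev.LSN (the
        // commit LSN) is safe to acknowledge.
        fmt.Printf("apply xid=%d (%d changes), ack commit LSN %d\n", ev.Xid, len(changes), ev.LSN)
    default:
        c.open[ev.Xid] = append(c.open[ev.Xid], ev)
    }
}

func main() {
    c := &Consumer{open: map[uint32][]Event{}}
    // The example above: T-1 (xid 1) commits at 3000, T-2 (xid 2) at 4000;
    // the stream delivers T-1 fully, then T-2 fully.
    for _, ev := range []Event{
        {"BEGIN", 1, 1000}, {"INSERT", 1, 1000}, {"UPDATE", 1, 2000}, {"COMMIT", 1, 3000},
        {"BEGIN", 2, 500}, {"INSERT", 2, 500}, {"UPDATE", 2, 1500}, {"COMMIT", 2, 4000},
    } {
        c.Handle(ev)
    }
}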

It is possible that you are seeing something else but if so then
please try to share a more concrete example.

--
With Regards,
Amit Kapila.



RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi Amit, thanks for the reply.

In our worker (custom pg replication client), we care only about INSERT, UPDATE, and DELETE operations, which - sure - may be part of the issue.
I can only replicate this with production-level load, so it's not easy to get a real example, but as I understand the issue (and building upon your exposition), we are seeing the following:


T-1
INSERT LSN1-1000
UPDATE LSN2-2000
UPDATE LSN3-3000
COMMIT  LSN4-4000

T-2
INSERT LSN1-500
UPDATE LSN2-1500
UPDATE LSN3-2500
COMMIT  LSN4-5500

If we miss LSN3-3000 - let's say, due to a bad network - and we already received all other LSNs, we will commit LSN4-5500 to Postgres before restarting. LSN3-3000 will never be reattempted. And there are a couple of issues with this scenario:
1. We have no way to match LSN operations with the respective commit, as they have unordered offsets. Assuming that all of them were received in order, we would commit all data with the commit message LSN4-4000 as other events would match the transaction start and end LSN interval of it.
2. Still, we have no way to verify that we got all data for a given transaction; we would never notice that LSN3-3000 of the first transaction is missing until we look at and analyze the resulting data.

So the question: how can we prevent our worker from committing LSN4-5500 without receiving LSN3-3000? Do we even have enough information out of pgoutput to do that?
PS: when I say bad network, my suspicion is that this situation may be caused by network saturation during high-QPS periods. Data will still arrive eventually, but by that time our worker is no longer listening.

Thanks again. Regards,
José Neves

Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: "Euler Taveira"
Date:
On Sat, Jul 29, 2023, at 8:07 PM, José Neves wrote:
I'm attempting to develop a CDC solution on top of Postgres, currently on version 12 (latest minor), with a custom client, and I'm running into data loss caused by out-of-order logical replication messages.

Can you provide a test case to show this issue? Did you try in a newer version?

The problem is as follows: postgres streams A, B, D, G, K, I, P logical replication events, upon exit signal we stop consuming new events at LSN K, and we wait 30s for out-of-order events. Let's say that we only got A (and K ofc), so in the following 30s we get B, D; however, for whatever reason, G never arrived. As with pgoutput-based logical replication we have no way to calculate the next LSN, we have no idea that G was missing, so we assumed that it all arrived, committing K to the postgres slot and shutting down. In the next run, our worker will start receiving data from K forward, and G is lost forever...
Meanwhile postgres moves forward with archiving and we can't go back to check if we lost anything. And even if we could, it would be extremely inefficient.

Logical decoding provides the changes to the output plugin at commit time. You
mentioned the logical replication events but didn't say which are part of the
same transaction. Let's say A, B, D and K are changes from the same transaction
and G, I and P are changes from another transaction. The first transaction will
be available when it processes K. The second transaction will be provided when
the logical decoding processes P.

You didn't say how your consumer is working. Are you sure your consumer doesn't
get the second transaction? If your consumer is advancing the replication slot
*after* receiving K (using pg_replication_slot_advance), it is doing it wrong.
Another common problem with a consumer is that it uses
pg_logical_slot_get_changes() but *before* using the data it crashes; in this
case, the data is lost.
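A sketch of the safe ordering for the SQL-function interface (Go, with a hypothetical DSN, driver choice, and storage step; pg_logical_slot_peek_changes leaves the changes in the slot, so the slot is advanced only after everything is durably stored):

package main

import (
    "database/sql"
    "log"

    _ "github.com/lib/pq" // hypothetical choice of driver
)

func main() {
    db, err := sql.Open("postgres", "dbname=euler sslmode=disable") // assumed DSN
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Peek, don't consume: the data stays in the slot if we crash below.
    rows, err := db.Query(
        `SELECT lsn, xid, data FROM pg_logical_slot_peek_changes('repslot1', NULL, NULL, 'format-version', '2')`)
    if err != nil {
        log.Fatal(err)
    }
    var lastLSN string
    for rows.Next() {
        var lsn, data string
        var xid int64
        if err := rows.Scan(&lsn, &xid, &data); err != nil {
            log.Fatal(err)
        }
        storeDurably(lsn, xid, data) // hypothetical: write to the target system
        // Transactions are emitted whole, so the last row of a full peek
        // is a commit record.
        lastLSN = lsn
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
    // Only now is it safe to discard the changes from the slot.
    if lastLSN != "" {
        if _, err := db.Exec(`SELECT pg_replication_slot_advance('repslot1', $1::pg_lsn)`, lastLSN); err != nil {
            log.Fatal(err)
        }
    }
}

func storeDurably(lsn string, xid int64, data string) {}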

It is hard to say where the problem is if you didn't provide enough information
about the consumer logic and the WAL information (pg_waldump output) around the
time you detect the data loss.

In sum, the issue comes from the fact that postgres will stream events with unordered LSNs on high transactional systems, and that pgoutput doesn't have access to enough information to calculate the next or last LSN, so we have no way to check if we received all the data that we are supposed to receive, risking committing an offset that we shouldn't because preceding data hasn't arrived yet.

It seems very odd to me that none of the open-source CDC projects that I looked into care about this. They always assume that the next LSN received is... well, the next one, and commit that one, so upon restart they are vulnerable to the same issue. So... either I'm missing something... or we have a generalized assumption causing data loss under certain conditions all over.

Let me illustrate the current behavior. Let's say there are 3 concurrent
transactions.

Session A
==========

euler=# SELECT pg_create_logical_replication_slot('repslot1', 'wal2json');
pg_create_logical_replication_slot 
------------------------------------
(repslot1,0/369DF088)
(1 row)

euler=# create table foo (a int primary key);
CREATE TABLE
euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(1, 2);
INSERT 0 2

Session B
==========

euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(11, 12);
INSERT 0 2

Session C
==========

euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(21, 22);
INSERT 0 2

Session A
==========

euler=*# INSERT INTO foo (a) VALUES(3);
INSERT 0 1

Session B
==========

euler=*# INSERT INTO foo (a) VALUES(13);
INSERT 0 1

Session C
==========

euler=*# INSERT INTO foo (a) VALUES(23);
INSERT 0 1
euler=*# COMMIT;
COMMIT

Session B
==========

euler=*# COMMIT;
COMMIT

Session A
==========

euler=*# COMMIT;
COMMIT

The output is:

euler=# SELECT * FROM pg_logical_slot_peek_changes('repslot1', NULL, NULL, 'format-version', '2', 'include-types', '0');
    lsn     |  xid   |                                        data                                        
------------+--------+------------------------------------------------------------------------------------
0/369E4800 | 454539 | {"action":"B"}
0/36A05088 | 454539 | {"action":"C"}
0/36A05398 | 454542 | {"action":"B"}
0/36A05398 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":21}]}
0/36A05418 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":22}]}
0/36A05658 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":23}]}
0/36A057C0 | 454542 | {"action":"C"}
0/36A05258 | 454541 | {"action":"B"}
0/36A05258 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":11}]}
0/36A052D8 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":12}]}
0/36A05598 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":13}]}
0/36A057F0 | 454541 | {"action":"C"}
0/36A050C0 | 454540 | {"action":"B"}
0/36A050C0 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":1}]}
0/36A051A0 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":2}]}
0/36A054D8 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":3}]}
0/36A05820 | 454540 | {"action":"C"}
(17 rows)

Since session C committed first, it is the first transaction available to the
output plugin (wal2json). Transaction 454541 (session B) is the next one
available because it committed after session C (transaction 454542). You can
also notice that the first transaction to start (session A, 454540) is the
last one available.

Your consumer cannot rely on LSN position or xid to track the progress.
Instead, Postgres provides a replication progress mechanism [1] to do it.
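For a custom client on a walsender connection, the client-side half of that progress tracking is the Standby Status Update message, which acknowledges the flushed commit LSN and lets the server advance the slot's confirmed position. A minimal sketch of building that message with Go's standard library, per the documented layout ('r', three 64-bit LSNs, a microsecond timestamp since 2000-01-01, and a reply flag):

package main

import (
    "encoding/binary"
    "fmt"
    "time"
)

// Protocol timestamps count microseconds since 2000-01-01 00:00:00 UTC.
var postgresEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// standbyStatusUpdate builds the 'r' message a client sends on the
// replication connection to report progress. flushLSN should be the last
// *commit* LSN that is durably stored downstream.
func standbyStatusUpdate(writeLSN, flushLSN, applyLSN uint64, replyRequested bool) []byte {
    buf := make([]byte, 1+8+8+8+8+1)
    buf[0] = 'r'
    binary.BigEndian.PutUint64(buf[1:], writeLSN)
    binary.BigEndian.PutUint64(buf[9:], flushLSN)
    binary.BigEndian.PutUint64(buf[17:], applyLSN)
    binary.BigEndian.PutUint64(buf[25:], uint64(time.Since(postgresEpoch).Microseconds()))
    if replyRequested {
        buf[33] = 1
    }
    return buf
}

func main() {
    // Acknowledge the last commit LSN from the session above (0/36A05820).
    m := standbyStatusUpdate(0x36A05820, 0x36A05820, 0x36A05820, false)
    fmt.Printf("standby status update: % x\n", m)
}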




--
Euler Taveira

RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi Euler, thank you for your reply.

Your output is exactly what mine does not look like: I don't have such an order - that is - not only under heavy load.
Conditions in which this occurs make it challenging to provide detailed information, and it will also take a while to trigger. I've sent a previous email explaining what my output looks like, from a previous debug.

I can gather more information if need be, but I was interested in this bit:
> Instead, Postgres provides a replication progress mechanism [1] to do it.
It's not 100% clear to me what that would look like at the code level; can you provide a high-level algorithm of how such code would work? For reference, our implementation - to the bones - is very similar to this: https://adam-szpilewicz.pl/cdc-replication-from-postgresql-using-go-golang

Thanks for your help. Regards,
José Neves

Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Andres Freund
Date:
Hi,

On 2023-07-31 14:16:22 +0000, José Neves wrote:
> Hi Amit, thanks for the reply.
>
> In our worker (custom pg replication client), we care only about INSERT,
> UPDATE, and DELETE operations, which - sure - may be part of the issue.

That seems likely. Postgres streams out changes in commit order, not in order
of the changes having been made (that'd not work due to rollbacks etc). If you
just disregard transactions entirely, you'll get something bogus after
retries.

You don't need to store the details for each commit in the target system, just
up to which LSN you have processed *commit records*. E.g. if you have received
and safely stored up to commit 0/1000, you need to remember that.
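A minimal sketch of that bookkeeping (Go, with a hypothetical checkpoint file): the only state that must survive a restart is the last commit LSN whose transaction was fully and durably stored downstream.

package main

import (
    "encoding/binary"
    "fmt"
    "os"
)

// saveCheckpoint durably records the last fully processed *commit* LSN.
// Write-to-temp-then-rename keeps the file valid if we crash mid-write;
// a production version would also fsync the file and directory.
func saveCheckpoint(path string, commitLSN uint64) error {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, commitLSN)
    tmp := path + ".tmp"
    if err := os.WriteFile(tmp, buf, 0o644); err != nil {
        return err
    }
    return os.Rename(tmp, path)
}

// loadCheckpoint returns the LSN to resume from; zero means "start from
// the slot's confirmed position".
func loadCheckpoint(path string) (uint64, error) {
    buf, err := os.ReadFile(path)
    if os.IsNotExist(err) {
        return 0, nil
    }
    if err != nil {
        return 0, err
    }
    return binary.BigEndian.Uint64(buf), nil
}

func main() {
    _ = saveCheckpoint("cdc.ckpt", 0x1000) // after commit 0/1000 is safely stored
    lsn, _ := loadCheckpoint("cdc.ckpt")
    fmt.Printf("resume from %X/%X\n", uint32(lsn>>32), uint32(lsn))
}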


Are you using the 'streaming' mode / option to pgoutput?


> 1. We have no way to match LSN operations with the respective commit, as
> they have unordered offsets.

Not sure what you mean with "unordered offsets"?


> Assuming that all of them were received in order, we would commit all data with the commit message LSN4-4000 as other events would match the transaction start and end LSN interval of it.

Logical decoding sends out changes in a deterministic order and you won't see
out of order data when using TCP (the entire connection can obviously fail
though).

Andres



RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi Andres, thanks for your reply.

Ok, if I understood you correctly, I start to see where my logic is faulty. Just to make sure that I got it right, taking the following example again:

T-1
INSERT LSN1-1000
UPDATE LSN2-2000
UPDATE LSN3-3000
COMMIT  LSN4-4000

T-2
INSERT LSN1-500
UPDATE LSN2-1500
UPDATE LSN3-2500
COMMIT  LSN4-5500

Where data will arrive in this order:

INSERT LSN1-500
INSERT LSN1-1000
UPDATE LSN2-1500
UPDATE LSN2-2000
UPDATE LSN3-2500
UPDATE LSN3-3000
COMMIT  LSN4-4000
COMMIT  LSN4-5500

You are saying that the LSN3-3000 will never be missing, either the entire connection will fail at that point, or all should be received in the expected order (which is different from the "numeric order" of LSNs). If the connection is down, upon restart, I will receive the entire T-1 transaction again (well, all example data again).
In addition to that, if I commit LSN4-4000, even tho that LSN has a "bigger numeric value" than the ones representing INSERT and UPDATE events on T-2, I will be receiving the entire T-2 transaction again, as the LSN4-5500 is still uncommitted. 
This makes sense to me, but just to be extra clear, I will never receive a transaction commit before receiving all other events for that transaction.
Are these statements correct?

>Are you using the 'streaming' mode / option to pgoutput?
No.

>Not sure what you mean with "unordered offsets"?
Ordered: EB53/E0D88188, EB53/E0D88189, EB53/E0D88190
Unordered: EB53/E0D88190, EB53/E0D88188, EB53/E0D88189

Extra question: When I get a begin message, I get a transaction starting at LSN-1000, and a transaction ending at LSN-2000. But as the example above shows, I can have data points from other transactions with LSNs in that interval. I have no way to identify to which transaction they belong, correct?

Thanks again. Regards,
José Neves


Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Andres Freund
Date:
Hi,

On 2023-07-31 21:25:06 +0000, José Neves wrote:
> Ok, if I understood you correctly, I start to see where my logic is faulty. Just to make sure that I got it right, taking the following example again:
>
> T-1
> INSERT LSN1-1000
> UPDATE LSN2-2000
> UPDATE LSN3-3000
> COMMIT  LSN4-4000
>
> T-2
> INSERT LSN1-500
> UPDATE LSN2-1500
> UPDATE LSN3-2500
> COMMIT  LSN4-5500
>
> Where data will arrive in this order:
>
> INSERT LSN1-500
> INSERT LSN1-1000
> UPDATE LSN2-1500
> UPDATE LSN2-2000
> UPDATE LSN3-2500
> UPDATE LSN3-3000
> COMMIT  LSN4-4000
> COMMIT  LSN4-5500

No, they won't arrive in that order. They will arrive as:

BEGIN
INSERT LSN1-1000
UPDATE LSN2-2000
UPDATE LSN3-3000
COMMIT  LSN4-4000
BEGIN
INSERT LSN1-500
UPDATE LSN2-1500
UPDATE LSN3-2500
COMMIT  LSN4-5500

Because T1 committed before T2. Changes are only streamed out at commit /
prepare transaction (*). Within a transaction, they however *will* be ordered
by LSN.

(*) Unless you use streaming mode, in which case it'll all be more
complicated, as you'll also receive changes for transactions that might still
abort.
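Since both invariants hold for non-streaming pgoutput, a consumer can cheaply assert them while reading and treat any violation as a broken stream (e.g. a bug in the client's framing) rather than silently continuing; a sketch, reusing the hypothetical event shape from the earlier example:

package main

import "fmt"

// Same hypothetical event shape as the earlier sketch.
type Event struct {
    Kind string
    Xid  uint32
    LSN  uint64
}

// Validator asserts the two ordering invariants described above: change
// LSNs are non-decreasing within a transaction, and commit LSNs strictly
// increase across transactions.
type Validator struct {
    lastChangeLSN uint64
    lastCommitLSN uint64
    inTxn         bool
}

func (v *Validator) Check(ev Event) error {
    switch ev.Kind {
    case "BEGIN":
        v.inTxn, v.lastChangeLSN = true, 0
    case "COMMIT":
        if ev.LSN <= v.lastCommitLSN {
            return fmt.Errorf("commit LSN %d not after previous commit %d", ev.LSN, v.lastCommitLSN)
        }
        v.lastCommitLSN, v.inTxn = ev.LSN, false
    default:
        if !v.inTxn {
            return fmt.Errorf("change at LSN %d outside a transaction", ev.LSN)
        }
        if ev.LSN < v.lastChangeLSN {
            return fmt.Errorf("change LSN %d out of order within transaction", ev.LSN)
        }
        v.lastChangeLSN = ev.LSN
    }
    return nil
}

func main() {
    v := &Validator{}
    for _, ev := range []Event{
        {"BEGIN", 1, 1000}, {"INSERT", 1, 1000}, {"UPDATE", 1, 2000}, {"UPDATE", 1, 3000}, {"COMMIT", 1, 4000},
        {"BEGIN", 2, 500}, {"INSERT", 2, 500}, {"UPDATE", 2, 1500}, {"UPDATE", 2, 2500}, {"COMMIT", 2, 5500},
    } {
        if err := v.Check(ev); err != nil {
            fmt.Println("stream violation:", err)
            return
        }
    }
    fmt.Println("stream ok")
}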


> You are saying that the LSN3-3000 will never be missing, either the entire
> connection will fail at that point, or all should be received in the
> expected order (which is different from the "numeric order" of LSNs).

I'm not quite sure what you mean with the "different from the numeric order"
bit...


> If the connection is down, upon restart, I will receive the entire T-1
> transaction again (well, all example data again).

Yes, unless you already acknowledged receipt up to LSN4-4000 and/or are only
asking for newer transactions when reconnecting.


> In addition to that, if I commit LSN4-4000, even tho that LSN has a "bigger
> numeric value" than the ones representing INSERT and UPDATE events on T-2, I
> will be receiving the entire T-2 transaction again, as the LSN4-5500 is
> still uncommitted.

I don't quite know what you mean with "commit LSN4-4000" here.


> This makes sense to me, but just to be extra clear, I will never receive a
> transaction commit before receiving all other events for that transaction.

Correct.

Greetings,

Andres Freund



RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi Andres.

Owh, I see the error of my way... :(

By ignoring commits, and committing individual operation LSNs, I was effectively rolling back the subscription. In the previous example, if I committed the LSN of the first insert of the second transaction (LSN1-500), I was basically telling Postgres to send everything again, including the already processed T1.

> what you mean with the "different from the numeric order"
I'm probably lacking terminology. I mean that LSN4-5500 > LSN4-4000 > LSN3-3000 > LSN3-2500...

But, if I'm understanding correctly, I can only rely on the incremental sequence to be true for the commit events. Which explains my pain.
The world makes sense again.

Thank you very much. I will try to implement this new logic, and hopefully won't bug you again with this issue.
Regards,
José Neves


RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi there, hope to find you all well.

A follow-up on this. Indeed, a new commit-based approach solved my missing data issues.
But, getting back to the previous examples, how are server times expected to be logged for the xlogs containing these records?

With these 2 transactions:
T-1
INSERT LSN1-1000
UPDATE LSN2-2000
UPDATE LSN3-3000
COMMIT  LSN4-4000

T-2
INSERT LSN1-500
UPDATE LSN2-1500
UPDATE LSN3-2500
COMMIT  LSN4-5500

Arriving this way:
BEGIN
INSERT LSN1-1000
UPDATE LSN2-2000
UPDATE LSN3-3000
COMMIT  LSN4-4000

BEGIN
INSERT LSN1-500
UPDATE LSN2-1500
UPDATE LSN3-2500
COMMIT  LSN4-5500

Are server times for them expected to be:

BEGIN
INSERT LSN1-1000 - 2
UPDATE LSN2-2000 - 4
UPDATE LSN3-3000 - 6
COMMIT  LSN4-4000 - 7

BEGIN
INSERT LSN1-500 - 1
UPDATE LSN2-1500 - 3
UPDATE LSN3-2500 - 5
COMMIT  LSN4-5500 - 8

Or:

BEGIN
INSERT LSN1-1000 - 1
UPDATE LSN2-2000 - 2
UPDATE LSN3-3000 - 3
COMMIT  LSN4-4000 - 4

BEGIN
INSERT LSN1-500 - 5
UPDATE LSN2-1500 - 6
UPDATE LSN3-2500 - 7
COMMIT  LSN4-5500 - 8

I'm asking because although I'm no longer missing data, I have a second async process that can fail (publishing data to an event messaging service), and therefore there is a possibility of data duplication. Worse, I have to split large transactions, as message sizes are limited. It would be nice if I could rely on server-time timestamps to discard duplicated data...

Thanks.
Regards,
José Neves


Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Amit Kapila
Date:
On Sun, Aug 6, 2023 at 7:54 PM José Neves <rafaneves3@msn.com> wrote:
>
> A follow-up on this. Indeed, a new commit-based approach solved my missing data issues.
> But, getting back to the previous examples, how are server times expected to be logged for the xlogs containing these records?
>

I think it should be based on commit_time because as far as I see we
can only get that on the client.
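For reference, pgoutput delivers the commit timestamp up front in each transaction's Begin message ('B', final LSN, commit timestamp, xid), so the client has it before any change arrives. A minimal decoding sketch following the documented layout:

package main

import (
    "encoding/binary"
    "fmt"
    "time"
)

// Protocol timestamps count microseconds since 2000-01-01 00:00:00 UTC.
var postgresEpoch = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)

// BeginMessage mirrors pgoutput's Begin message. Every change in the
// transaction shares this one commit timestamp.
type BeginMessage struct {
    FinalLSN   uint64
    CommitTime time.Time
    Xid        uint32
}

func parseBegin(msg []byte) (BeginMessage, error) {
    if len(msg) < 21 || msg[0] != 'B' {
        return BeginMessage{}, fmt.Errorf("not a Begin message")
    }
    micros := int64(binary.BigEndian.Uint64(msg[9:17]))
    return BeginMessage{
        FinalLSN:   binary.BigEndian.Uint64(msg[1:9]),
        CommitTime: postgresEpoch.Add(time.Duration(micros) * time.Microsecond),
        Xid:        binary.BigEndian.Uint32(msg[17:21]),
    }, nil
}

func main() {
    // A made-up Begin message: final LSN 0x1000, timestamp 0, xid 42.
    msg := make([]byte, 21)
    msg[0] = 'B'
    binary.BigEndian.PutUint64(msg[1:9], 0x1000)
    binary.BigEndian.PutUint32(msg[17:21], 42)
    b, err := parseBegin(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("xid=%d commit LSN=%X commit time=%s\n", b.Xid, b.FinalLSN, b.CommitTime.UTC())
}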

--
With Regards,
Amit Kapila.



RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi Amit.

Humm, that's... challenging. I faced some issues after "the fix" because I had a couple of transactions with 25k updates, and I had to split them to be able to push to our event messaging system, as our max message size is 10MB. Relying on commit time would mean that all operations in a transaction have the same timestamp. If something goes wrong while my worker is pushing that transaction's data chunks, I will duplicate some data in the next run, so this wouldn't allow me to deal with data duplication.
Is there any other way that you see to deal with it?

Right now I only see an option, which is to store all processed LSNs on the other side of the ETL. I'm trying to avoid that overhead.
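One hypothetical scheme that avoids storing every LSN downstream, building on the chunking described above: chunks of a transaction are produced in a deterministic order, so keying each chunk by (commit LSN, chunk index) lets the sink keep only the last applied key and discard replays. A sketch:

package main

import "fmt"

// Hypothetical chunk key: a commit LSN plus the chunk's index within the
// transaction. Chunks arrive in a deterministic order, so the sink can
// discard any chunk whose key is <= the last key it applied.
type ChunkKey struct {
    CommitLSN uint64
    Seq       int
}

func (k ChunkKey) LessOrEqual(o ChunkKey) bool {
    if k.CommitLSN != o.CommitLSN {
        return k.CommitLSN < o.CommitLSN
    }
    return k.Seq <= o.Seq
}

// apply is the sink side: idempotent because replayed chunks compare
// <= lastApplied and are skipped.
func apply(lastApplied *ChunkKey, key ChunkKey, payload string) {
    if key.LessOrEqual(*lastApplied) {
        fmt.Println("duplicate, skipping:", payload)
        return
    }
    fmt.Println("applying:", payload)
    *lastApplied = key
}

func main() {
    last := ChunkKey{}
    apply(&last, ChunkKey{CommitLSN: 4000, Seq: 0}, "T-1 chunk 0")
    apply(&last, ChunkKey{CommitLSN: 4000, Seq: 1}, "T-1 chunk 1")
    // Worker crashed and re-sent after restart:
    apply(&last, ChunkKey{CommitLSN: 4000, Seq: 1}, "T-1 chunk 1")
    apply(&last, ChunkKey{CommitLSN: 5500, Seq: 0}, "T-2 chunk 0")
}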

Thanks.
Regards,
José Neves


Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Amit Kapila
Date:
On Mon, Aug 7, 2023 at 1:46 PM José Neves <rafaneves3@msn.com> wrote:
>
> Humm, that's... challenging. I faced some issues after "the fix" because I had a couple of transactions with 25k updates, and I had to split them to be able to push to our event messaging system, as our max message size is 10MB. Relying on commit time would mean that all operations in a transaction have the same timestamp. If something goes wrong while my worker is pushing that transaction's data chunks, I will duplicate some data in the next run, so this wouldn't allow me to deal with data duplication.
> Is there any other way that you see to deal with it?
>
> Right now I only see an option, which is to store all processed LSNs on the other side of the ETL. I'm trying to avoid that overhead.
>

Sorry, I don't understand your system enough to give you suggestions,
but if you have any questions related to how logical replication works
then I might be able to help.

--
With Regards,
Amit Kapila.



RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Hi there, hope to find you well.

I have a follow-up question to this already long thread.

Upon deploying my PostgreSQL logical-replication-fed application on a stale database, I ended up running out of space, as the replication slot is held back until the next time we receive a data-changing event and advance to that new LSN offset.
I think that the solution for this is to advance our LSN offset every time a keep-alive message is received ('k' // 107).
My doubt is: can keep-alive messages be received in between open transaction events? I think not, but I would like your input to be extra sure, because if this happens and I commit that offset, I may again introduce faulty logic leading to data loss.

In sum, something like this wouldn't happen:
BEGIN LSN001
INSERT LSN002
KEEP LIVE LSN003
UPDATE LSN004
COMMIT LSN005

Correct? It has to be either:

KEEP LIVE LSN001
BEGIN LSN002
INSERT LSN003

UPDATE LSN004
COMMIT LSN005

Or:
BEGIN LSN001
INSERT LSN002

UPDATE LSN004
COMMIT LSN005
KEEP LIVE LSN006

LSNXXX are mere representations of LSN offsets.

Thank you again.
Regards,
José Neves



Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Ashutosh Bapat
Date:
On Tue, Oct 24, 2023 at 8:53 PM José Neves <rafaneves3@msn.com> wrote:
>
> Hi there, hope to find you well.
>
> I have a follow-up question to this already long thread.
>
> Upon deploying my PostgreSQL logical-replication-fed application on a stale database, I ended up running out of space, as the replication slot is held back until the next time we receive a data-changing event and advance to that new LSN offset.
> I think that the solution for this is to advance our LSN offset every time a keep-alive message is received ('k' // 107).
> My doubt is: can keep-alive messages be received in between open transaction events? I think not, but I would like your input to be extra sure, because if this happens and I commit that offset, I may again introduce faulty logic leading to data loss.
>
> In sum, something like this wouldn't happen:
> BEGIN LSN001
> INSERT LSN002
> KEEP LIVE LSN003
> UPDATE LSN004
> COMMIT LSN005
>

If the downstream acknowledges receipt of LSN003 and saves it locally
and crashes, upon restart the upstream will resend all the
transactions that committed after LSN003 including the one ended at
LSN005. So this is safe.
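For reference, the primary keepalive message has a fixed documented layout: 'k', the server's current end-of-WAL LSN, a timestamp (microseconds since 2000-01-01), and a flag requesting an immediate reply. A minimal parsing sketch (Go standard library only):

package main

import (
    "encoding/binary"
    "errors"
    "fmt"
)

// PrimaryKeepalive mirrors the documented 'k' message layout.
type PrimaryKeepalive struct {
    EndWAL         uint64 // server's current end of WAL
    TimestampMicro uint64 // microseconds since 2000-01-01 00:00:00 UTC
    ReplyRequested bool
}

func parseKeepalive(msg []byte) (PrimaryKeepalive, error) {
    if len(msg) < 18 || msg[0] != 'k' {
        return PrimaryKeepalive{}, errors.New("not a primary keepalive message")
    }
    return PrimaryKeepalive{
        EndWAL:         binary.BigEndian.Uint64(msg[1:9]),
        TimestampMicro: binary.BigEndian.Uint64(msg[9:17]),
        ReplyRequested: msg[17] != 0,
    }, nil
}

func main() {
    // A made-up keepalive: server end-of-WAL 0x5500, reply requested.
    msg := make([]byte, 18)
    msg[0] = 'k'
    binary.BigEndian.PutUint64(msg[1:9], 0x5500)
    msg[17] = 1
    ka, err := parseKeepalive(msg)
    if err != nil {
        panic(err)
    }
    // Per the answer above: keepalives never arrive inside a transaction's
    // events, so when no transaction is open it is safe to acknowledge
    // ka.EndWAL as flushed, letting the slot advance on a stale database.
    fmt.Printf("end of WAL %X, reply requested: %v\n", ka.EndWAL, ka.ReplyRequested)
}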

--
Best Wishes,
Ashutosh



RE: CDC/ETL system on top of logical replication with pgoutput, custom client

From: José Neves
Date:
Ok, I see. In that situation it is safe indeed, as the offset is lower than the current transaction's commit.
But I think that I asked the wrong question. I guess the right question is: can we receive a keep-alive message with an LSN offset bigger than the commit of the open or following transactions?
Something like:

BEGIN LSN001
INSERT LSN002
KEEP LIVE LSN006
UPDATE LSN004
COMMIT LSN005

Or:

KEEP LIVE LSN006
BEGIN LSN001
INSERT LSN002
UPDATE LSN004
COMMIT LSN005
KEEP LIVE LSN007

Or is the sequence ensured not only between commits but also with keep-alive messaging?


Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: Ashutosh Bapat
Date:
On Wed, Oct 25, 2023 at 4:23 PM José Neves <rafaneves3@msn.com> wrote:
>
> Ok, I see. In that situation it is safe indeed, as the offset is lower than the current transaction's commit.
> But I think that I asked the wrong question. I guess the right question is: can we receive a keep-alive message
> with an LSN offset bigger than the commit of the open or following transactions?
> Something like:
>
> BEGIN LSN001
> INSERT LSN002
> KEEP LIVE LSN006
> UPDATE LSN004
> COMMIT LSN005
>
> Or:
>
> KEEP LIVE LSN006
> BEGIN LSN001
> INSERT LSN002
> UPDATE LSN004
> COMMIT LSN005
> KEEP LIVE LSN007
>

AFAIU the code in walsender, this isn't possible. A keepalive sends the
LSN of the last WAL record it read (sentPtr). Upon reading a commit
WAL record, the whole transaction is decoded; until that point sentPtr
is not updated.

Please take a look at XLogSendLogical(void) and the places where
WalSndKeepaliveIfNecessary() is called.

--
Best Wishes,
Ashutosh Bapat