Discussion: Logical replication (pgoutput plugin) in streaming mode: peek() always starts from the beginning of the transaction, not from the latest stream block

Hi everyone,

When using logical replication with the pgoutput plugin on PG 16, we do the following:
  1) SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'false')
  2) Get the LSN of the last row (the Commit)
  3) SELECT * FROM pg_replication_slot_advance('test_slot_v1', <Commit LSN>);  
  4) Repeat.
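A minimal Python sketch of steps 1 to 4, assuming the three callables are thin wrappers (hypothetical, not shown) around the peek query, the consumer's own processing, and pg_replication_slot_advance(), and that each row arrives as an (lsn, xid, data) tuple with data as bytes. Since step 2 takes the LSN of the Commit, the sketch looks for the last Commit ('C', 0x43) message rather than blindly taking the last row.

```python
from typing import Callable, Optional, Sequence, Tuple

# (lsn, xid, data) as returned by pg_logical_slot_peek_binary_changes();
# lsn is the textual pg_lsn form, data is the raw pgoutput message.
Row = Tuple[str, int, bytes]


def consume_once(
    peek: Callable[[], Sequence[Row]],   # hypothetical wrapper around the peek query
    handle: Callable[[Row], None],       # hypothetical: process one message
    advance: Callable[[str], None],      # hypothetical wrapper around pg_replication_slot_advance()
) -> Optional[str]:
    """One iteration of steps 1-3 with streaming = 'false'.

    Handles every row up to and including the last Commit ('C') message,
    then advances the slot to that Commit's LSN. Returns the LSN advanced
    to, or None if the peek returned no complete transaction.
    """
    rows = peek()
    commit_positions = [i for i, (_lsn, _xid, data) in enumerate(rows) if data[:1] == b"C"]
    if not commit_positions:
        return None
    last = commit_positions[-1]
    for row in rows[: last + 1]:
        handle(row)
    commit_lsn = rows[last][0]
    advance(commit_lsn)
    return commit_lsn
```

Step 4 is then a plain loop around consume_once(); a None result only means there was no complete transaction to consume yet.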

This works perfectly well with streaming = false. With streaming turned on, we expect the same thing to happen, except that the LSN passed to pg_replication_slot_advance() is that of a Stream End record, and the next call to pg_logical_slot_peek_binary_changes() should then return the subsequent Stream Start record. Instead, the stream starts over from the beginning of the transaction (its first Stream Start record). Observe:

*** Demo starts ***
*** Initially there are no changes, peek() returns nothing: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
 lsn | xid | data
-----+-----+------
(0 rows)

*** Slot status: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/98CE060   | 2/98CE060
(1 rows)


*** Now make some changes (delete then insert a bunch of records) and call peek()          ***
*** The predicate filters out Delete and Insert records, leaving Stream Start (\x53 = S),  ***
*** Relation (\x52 = R), Stream End (\x45 = E), and Stream Commit (\x63 = c)               ***

abinitio=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/A222A20  | 1112 | \x530000045801
 2/A222A20  | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/C141BE8  | 1112 | \x45
 2/C141C28  | 1112 | \x530000045800
 2/DF598D8  | 1112 | \x45
 2/DF59950  | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)
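The first byte of each data value identifies the pgoutput message type, and the remaining bytes follow the layouts in the PostgreSQL "Logical Replication Message Formats" documentation (which names the 'E' message Stream Stop; it is the same record called Stream End here). The Python sketch below decodes the messages left by the filter; the helper names are illustrative only. Applied to the rows above, it shows that \x530000045801 is the first Stream Start segment of xid 1112, and that the Stream Commit for xid 1112 carries an end LSN of 2/DF59950, matching the lsn column of its row.

```python
import struct

# First byte of a pgoutput message -> message name (only the types relevant here).
MESSAGE_TYPES = {
    b"B": "Begin", b"C": "Commit", b"R": "Relation",
    b"I": "Insert", b"U": "Update", b"D": "Delete",
    b"S": "Stream Start", b"E": "Stream Stop",
    b"c": "Stream Commit", b"A": "Stream Abort",
}


def message_type(data: bytes) -> str:
    return MESSAGE_TYPES.get(data[:1], "Other")


def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN the way pg_lsn prints it, e.g. 2/DF59950."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def parse_stream_start(data: bytes) -> tuple:
    """Stream Start: Byte1('S'), Int32 xid, Int8 first-segment flag (1 = first)."""
    if data[:1] != b"S":
        raise ValueError("not a Stream Start message")
    xid, first = struct.unpack(">IB", data[1:6])
    return xid, first == 1


def parse_stream_commit(data: bytes) -> dict:
    """Stream Commit: Byte1('c'), Int32 xid, Int8 flags (unused),
    Int64 commit LSN, Int64 end LSN, Int64 commit timestamp."""
    if data[:1] != b"c":
        raise ValueError("not a Stream Commit message")
    xid, _flags, commit_lsn, end_lsn, _ts = struct.unpack(">IBQQq", data[1:30])
    return {"xid": xid, "commit_lsn": format_lsn(commit_lsn), "end_lsn": format_lsn(end_lsn)}


print(parse_stream_start(bytes.fromhex("530000045801")))  # (1112, True)
print(parse_stream_commit(bytes.fromhex(
    "630000045800000000020df59918000000020df599500002aca72900f8a8")))
# {'xid': 1112, 'commit_lsn': '2/DF59918', 'end_lsn': '2/DF59950'}
```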

*** It was a peek() so the status is unchanged: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/98CE060   | 2/98CE060
(1 rows)

*** Now advance the slot to the first Stream End record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/C141BE8');
  slot_name   |  end_lsn
--------------+-----------
 test_slot_v1 | 2/C141BE8
(1 row)

*** confirmed_flush_lsn is updated as expected: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
  slot_name   | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
 test_slot_v1 | f      | 2/9B09D10   | 2/C141BE8
(1 rows)

*** Now peek() again. It is starting from earlier than confirmed_flush_lsn: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/A222A20  | 1112 | \x530000045801
 2/A222A20  | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/C141BE8  | 1112 | \x45
 2/C141C28  | 1112 | \x530000045800
 2/DF598D8  | 1112 | \x45
 2/DF59950  | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)
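This can be checked mechanically. pg_lsn values are 64-bit positions printed as two hexadecimal halves, so they have to be compared numerically rather than as strings (as text, "2/108918D0" sorts before "2/C141BE8" even though it is the later position). A small Python sketch, using the lsn column of the second peek() and the slot values reported just before it:

```python
def parse_lsn(text: str) -> int:
    """Convert a pg_lsn string such as '2/C141BE8' into a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


confirmed_flush_lsn = parse_lsn("2/C141BE8")
restart_lsn = parse_lsn("2/9B09D10")

# lsn column of the second peek() above, in output order.
second_peek_lsns = [
    "2/A222A20", "2/A222A20", "2/C141BE8", "2/C141C28", "2/DF598D8",
    "2/DF59950", "2/DF59950", "2/DF59950", "2/108918D0", "2/108918D0",
    "2/131E1310", "2/131E1310", "2/137D7768", "2/137E8448",
]

before_flush = [lsn for lsn in second_peek_lsns if parse_lsn(lsn) < confirmed_flush_lsn]
print(before_flush)  # ['2/A222A20', '2/A222A20']
print(restart_lsn < parse_lsn(second_peek_lsns[0]))  # True
```

So two rows (the Stream Start and Relation of xid 1112) lie strictly below confirmed_flush_lsn, plus the Stream End sitting exactly at it, and all of them are still at or after restart_lsn.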

*** Next advance to the Stream Commit record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/DF59950');
  slot_name   |  end_lsn
--------------+-----------
 test_slot_v1 | 2/DF59950
(1 row)

*** This time the peek() starts from the correct LSN: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
    lsn     | xid  |                                                                 data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
 2/DF59950  | 1114 | \x530000045a01
 2/DF59950  | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
 2/108918D0 | 1114 | \x45
 2/108918D0 | 1114 | \x530000045a00
 2/131E1310 | 1114 | \x45
 2/131E1310 | 1114 | \x530000045a00
 2/137D7768 | 1114 | \x45
 2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(8 rows)

*** End of demo ***
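Going only by this demo, advancing to a Stream End LSN did not stop the in-progress transaction from being re-sent, whereas advancing to the Stream Commit LSN did. As a workaround, a client could restrict its advance target to commit-type messages. The Python sketch below assumes exactly that (Commit 'C' when streaming is off, Stream Commit 'c' when it is on); it is an inference from the output above, not documented behavior, and it deliberately ignores Stream Abort ('A'), which the demo does not exercise.

```python
from typing import Optional, Sequence, Tuple

Row = Tuple[str, int, bytes]  # (lsn, xid, data) from the peek function

# Assumption drawn from the demo: only commit-type messages are safe advance targets.
SAFE_ADVANCE_TYPES = (b"C", b"c")


def safe_advance_lsn(rows: Sequence[Row]) -> Optional[str]:
    """Return the lsn of the last Commit / Stream Commit row, or None if there is none."""
    for lsn, _xid, data in reversed(rows):
        if data[:1] in SAFE_ADVANCE_TYPES:
            return lsn
    return None
```

For the first peek() in the demo this picks 2/137E8448; for output that contains only Stream Start, Relation and Stream End rows it returns None, meaning there is nothing safe to advance to yet.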

The question is whether this is by design or a bug. If it is by design, could someone explain how this is meant to be used? It is not clear.

With upto_nchanges set to NULL it does eventually work: once the transaction completes, we get a Stream Commit record and can advance past it. In the meantime, however, every peek() re-sends the stream blocks we have already seen, so we end up ingesting a lot of duplicate records that we then have to deal with. With upto_nchanges set to a non-NULL value we are stuck: peek() returns one or more Stream blocks until the number of returned rows exceeds upto_nchanges, and then returns the same blocks over and over again, because we cannot advance and never get to see the Stream Commit record.
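For the non-NULL upto_nchanges case, one possible client-side mitigation is to grow the limit whenever a peek() returns no Commit or Stream Commit, so that the transaction's commit eventually falls inside the window. In the Python sketch below, peek(limit) is a hypothetical callable assumed to run pg_logical_slot_peek_binary_changes() with upto_nchanges = limit, and the starting limit and cap are arbitrary. This does not avoid re-reading the in-progress transaction on every call; it only avoids looping forever on the same blocks.

```python
from typing import Callable, Optional, Sequence, Tuple

Row = Tuple[str, int, bytes]  # (lsn, xid, data)


def peek_until_commit(
    peek: Callable[[int], Sequence[Row]],  # hypothetical: peek with upto_nchanges = limit
    start_limit: int = 1000,               # arbitrary illustrative values
    max_limit: int = 1_000_000,
) -> Tuple[Sequence[Row], Optional[str]]:
    """Double upto_nchanges until the peeked rows contain a commit-type message.

    Returns (rows, lsn_of_last_commit), or (rows, None) if the available WAL
    ran out or the cap was reached before any Commit ('C') / Stream Commit ('c').
    """
    limit = start_limit
    while True:
        rows = peek(limit)
        commit_lsns = [lsn for lsn, _xid, data in rows if data[:1] in (b"C", b"c")]
        if commit_lsns:
            return rows, commit_lsns[-1]
        # Fewer rows than the limit presumably means decoding reached the end of
        # the available WAL, so a larger limit would not return anything new.
        if len(rows) < limit or limit >= max_limit:
            return rows, None
        limit *= 2
```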

Thank you.

Guillaume.