48.1. Примеры логического декодирования
Следующий пример демонстрирует управление логическим декодированием на уровне SQL.
Прежде чем вы сможете использовать логическое декодирование, вы должны установить в wal_level значение logical
, а в max_replication_slots значение, не меньшее 1. После этого вы должны подключиться к целевой базе данных (в следующем примере, это postgres
) как суперпользователь.
postgres=# -- Создать слот с именем 'regression_slot', использующий модуль вывода 'test_decoding' postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); slot_name | lsn -----------------+----------- regression_slot | 0/16B1970 (1 row) postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots; slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn -----------------+---------------+-----------+----------+--------+-------------+----------------- regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440 (1 row) postgres=# -- Пока никакие изменения не видны postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----+-----+------ (0 rows) postgres=# CREATE TABLE data(id serial primary key, data text); CREATE TABLE postgres=# -- DDL не реплицируется, поэтому видна только транзакция postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+-------------- 0/BA2DA58 | 10297 | BEGIN 10297 0/BA5A5A0 | 10297 | COMMIT 10297 (2 rows) postgres=# -- Когда изменения прочитаны, они считаются обработанными и уже не выдаются postgres=# -- в последующем вызове: postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----+-----+------ (0 rows) postgres=# BEGIN; postgres=*# INSERT INTO data(data) VALUES('1'); postgres=*# INSERT INTO data(data) VALUES('2'); postgres=*# COMMIT; postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A688 | 10298 | BEGIN 10298 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1' 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2' 0/BA5A8A8 | 10298 | COMMIT 10298 (4 rows) postgres=# INSERT INTO data(data) VALUES('3'); postgres=# -- Также можно заглянуть вперёд в потоке изменений, не считывая эти изменения postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A8E0 | 10299 | BEGIN 10299 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' 0/BA5A990 | 10299 | COMMIT 10299 (3 rows) postgres=# -- Следующий вызов pg_logical_slot_peek_changes() снова возвращает те же изменения postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A8E0 | 10299 | BEGIN 10299 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' 0/BA5A990 | 10299 | COMMIT 10299 (3 rows) postgres=# -- Модулю вывода можно передать параметры, влияющие на форматирование postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on'); lsn | xid | data -----------+-------+--------------------------------------------------------- 0/BA5A8E0 | 10299 | BEGIN 10299 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3' 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04) (3 rows) postgres=# -- Не забудьте удалить слот, который вам больше не нужен, чтобы он postgres=# -- не потреблял ресурсы сервера: postgres=# SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot ----------------------- (1 row)
Следующий пример показывает, как можно управлять логическим декодированием средствами протокола потоковой репликации, используя программу pg_recvlogical, включённую в дистрибутив PostgreSQL. Для этого нужно, чтобы конфигурация аутентификации клиентов допускала подключения для репликации (см. Подраздел 26.2.5.1) и чтобы значение max_wal_senders
было достаточно большим и позволило установить дополнительное подключение.
$ pg_recvlogical -d postgres --slot=test --create-slot $ pg_recvlogical -d postgres --slot=test --start -f - Control+Z $ psql -d postgres -c "INSERT INTO data(data) VALUES('4');" $ fg BEGIN 693 table public.data: INSERT: id[integer]:4 data[text]:'4' COMMIT 693 Control+C $ pg_recvlogical -d postgres --slot=test --drop-slot