47.1. Примеры логического декодирования

Следующий пример демонстрирует управление логическим декодированием на уровне SQL.

Прежде чем вы сможете использовать логическое декодирование, вы должны установить в wal_level значение logical, а в max_replication_slots значение, не меньшее 1. После этого вы должны подключиться к целевой базе данных (в следующем примере, это postgres) как суперпользователь.

postgres=# -- Создать слот с именем 'regression_slot', использующий модуль вывода 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    | xlog_position
-----------------+---------------
 regression_slot | 0/16B1970
(1 row)

postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+-----------------
 regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408   | 0/16A4440
(1 row)

postgres=# -- Пока никакие изменения не видны
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

postgres=# -- DDL не реплицируется, поэтому видна только транзакция
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location  | xid |    data
-----------+-----+------------
 0/16D5D48 | 688 | BEGIN 688
 0/16E0380 | 688 | COMMIT 688
(2 rows)

postgres=# -- Когда изменения прочитаны, они считаются обработанными и уже не выдаются
postgres=# -- в последующем вызове:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location  | xid |                     data
-----------+-----+-----------------------------------------------
 0/16E0478 | 689 | BEGIN 689
 0/16E0478 | 689 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 0/16E0580 | 689 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 0/16E0650 | 689 | COMMIT 689
(4 rows)

postgres=# INSERT INTO data(data) VALUES('3');

postgres=# -- Также можно заглянуть вперёд в потоке изменений, не считывая эти изменения
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
 location  | xid |                     data
-----------+-----+-----------------------------------------------
 0/16E09C0 | 690 | BEGIN 690
 0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/16E0B90 | 690 | COMMIT 690
(3 rows)

postgres=# -- Следующий вызов pg_logical_slot_peek_changes() снова возвращает те же изменения
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
 location  | xid |                     data
-----------+-----+-----------------------------------------------
 0/16E09C0 | 690 | BEGIN 690
 0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/16E0B90 | 690 | COMMIT 690
(3 rows)

postgres=# -- Модулю вывода можно передать параметры, влияющие на форматирование
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
 location  | xid |                     data
-----------+-----+-----------------------------------------------
 0/16E09C0 | 690 | BEGIN 690
 0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01)
(3 rows)

postgres=# -- Не забудьте удалить слот, который вам больше не нужен, чтобы он
postgres=# -- не потреблял ресурсы сервера:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
-----------------------

(1 row)

Следующий пример показывает, как можно управлять логическим декодированием средствами протокола потоковой репликации, используя программу pg_recvlogical, включённую в дистрибутив PostgreSQL. Для этого нужно, чтобы конфигурация аутентификации клиентов допускала подключения для репликации (см. Подраздел 26.2.5.1) и чтобы значение max_wal_senders было достаточно большим и позволило установить дополнительное подключение.

$ pg_recvlogical -d postgres --slot test --create-slot
$ pg_recvlogical -d postgres --slot test --start -f -
Control+Z
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
Control+C
$ pg_recvlogical -d postgres --slot test --drop-slot