H.3. wal2json — преобразование изменений из WAL в JSON с помощью логического декодирования #
Модуль wal2json — это расширение Postgres Pro для логического декодирования, которое преобразует изменения базы данных из журнала предзаписи (WAL) в формат JSON
. Postgres Pro имеет доступ к кортежам, сгенерированным командами INSERT
и UPDATE
. В зависимости от выбора варианта идентификации реплики также могут быть доступны предыдущие версии строк для команд UPDATE
и DELETE
. Изменения можно получить либо по протоколу потоковой репликации (слоты логической репликации), либо через специальный SQL API.
Формат версии 1 создаёт JSON-объект для каждой транзакции. Этот объект содержит все новые и старые кортежи. Также есть возможность включить в него такие свойства, как метки времени транзакций, имена, дополненные именами схем, типы данных и идентификаторы транзакций.
Формат версии 2 создаёт JSON-объект для каждого кортежа с необязательными JSON-объектами, обозначающими начало и конец транзакции. Также можно включить разные свойства транзакции.
H.3.1. Установка и настройка #
wal2json поставляется вместе с Postgres Pro Standard в виде отдельного пакета wal2json-std-17
(подробные инструкции по установке приведены в Главе 16). После установки Postgres Pro Standard выполните следующие действия, чтобы подготовить wal2json к работе:
Включите логическое декодирование, задав для параметра
wal_level
значениеlogical
в файлеpostgresql.conf
.Дополнительно вы можете задать значения для параметров
max_replication_slots
иmax_wal_senders
.Перезапустите сервер баз данных, чтобы изменения вступили в силу.
H.3.2. Параметры #
Модуль wal2json предоставляет различные параметры для управления логическим декодированием:
include-xids
Добавить идентификатор транзакции к каждому набору изменений в JSON-выводе. Значение по умолчанию —
off
.include-timestamp
Добавить
timestamp
к каждому набору изменений в JSON-выводе. Значение по умолчанию —off
.include-schemas
Добавить имя схемы к каждой записи об изменениях в JSON-выводе. Значение по умолчанию —
on
.include-types
Добавить
type
к каждой записи об изменениях в JSON-выводе. Значение по умолчанию —on
.include-typmod
Добавить модификатор типа для столбцов, у которых они есть. Значение по умолчанию —
on
.include-type-oids
Добавить OID типа. Значение по умолчанию — off.
include-domain-data-type
Заменить доменное имя нижележащим типом данных. Значение по умолчанию —
off
.include-column-positions
Добавить позицию столбца (
pg_attribute.attnum
). Значение по умолчанию —off
.include-origin
Добавить источник каждого изменения. Значение по умолчанию —
off
.include-not-null
Добавить информацию, помечен ли столбец как
not null
в полеcolumnoptionals
JSON-вывода. Значение по умолчанию —off
.include-default
Добавить выражения по умолчанию в JSON-вывод. Значение по умолчанию —
off
.include-pk
Добавить информацию о первичном ключе, включая имена столбцов и типы данных, в поле
pk
JSON-вывода. Значение по умолчанию —off
.numeric-data-types-as-string
Преобразовать значения числовых типов данных в строки в JSON-выводе. Спецификация JSON не поддерживает
Infinity
иNaN
как допустимые числовые значения. Для чисел двойной точности могут возникнуть потенциальные проблемы совместимости . Значение по умолчанию —off
.pretty-print
Добавить пробелы и отступы в структуру JSON-вывода. Значение по умолчанию —
off
.write-in-chunks
Включить запись JSON-вывода небольшими фрагментами вместо целого набора изменений. Этот параметр используется только когда для
format-version
задано значение1
. Значение по умолчанию —off
.include-lsn
Добавить поле
nextlsn
к каждому набору изменений. Значение по умолчанию —off
.include-transaction
Добавить записи о начале и конце каждой транзакции в JSON-вывод. Значение по умолчанию —
off
.filter-origins
Исключить изменения из указанных источников. По умолчанию этот параметр пуст, что означает, что источники не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.
filter-tables
Исключить строки из указанных таблиц. По умолчанию этот параметр пуст, что означает, что таблицы не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений. Имена таблиц должны быть дополнены схемой. Имя вида
*.foo
означает таблицуfoo
во всех схемах, а имя видаbar.*
означает все таблицы в схемеbar
. Специальные символы (пробел, одинарная кавычка, запятая, точка, звёздочка) должны быть экранированы обратной косой чертой. Схема и таблица чувствительны к регистру. Например, таблица"public"."Foo bar"
должна быть указана какpublic.Foo\ bar
.add-tables
Включить строки только из указанных таблиц. По умолчанию этот параметр пуст, что означает, что включены все таблицы из всех схем. Для этого параметра действуют те же правила, что и для
filter-tables
.filter-msg-prefixes
Исключить сообщения, чей префикс указан в значении параметра. По умолчанию этот параметр пуст, что означает, что сообщения не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.
add-msg-prefixes
Включить только те сообщения, чей префикс указан в значении параметра. По умолчанию используются все префиксы. Задаётся в виде разделённого запятыми списка значений.
wal2json
применяетfilter-msg-prefixes
до этого параметра.format-version
Определить, какой формат вывода использовать. Значение по умолчанию —
1
.actions
Определить, какие операции будут включены в JSON-вывод. По умолчанию используются все действия (
INSERT
,UPDATE
,DELETE
иTRUNCATE
). Однако, если используетсяformat-version = 1
, операцияTRUNCATE
не включается (в целях обратной совместимости).
H.3.3. Примеры #
Существует два способа получить изменения (JSON-объекты) в wal2json: через функции, вызываемые в SQL или с помощью pg_recvlogical.
H.3.3.1. pg_recvlogical #
Ниже представлен пример, как получить JSON-объекты в wal2json с помощью pg_recvlogical. Помимо вышеуказанной настройки необходимо настроить подключение для репликации, чтобы использовать pg_recvlogical. Начиная с Postgres Pro версии 10, логическая репликация сопоставляет обычные записи с именем базы данных или ключевыми словами, такими как all
.
Чтобы настроить подключение для репликации и параметры базы данных:
Добавьте правило подключения для репликации в файле
pg_hba.conf
:local mydatabase myuser trust
Дополнительно можно задать
max_wal_senders
в файлеpostgresql.conf
:max_wal_senders = 1
Перезапустите сервер баз данных, если изменили
max_wal_senders
.
Чтобы получить JSON-объекты в wal2json:
Откройте терминал и подключитесь к базе данных:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
В другом терминале:
$ cat /tmp/example1.sql CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); BEGIN; INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table1_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table1_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir'); -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; DROP TABLE table1_with_pk; DROP TABLE table1_without_pk; $ psql -At -f /tmp/example1.sql postgres CREATE TABLE CREATE TABLE BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78BFC828 3/78BFC880 DELETE 2 3/78BFC990 INSERT 0 1 UPDATE 1 COMMIT DROP TABLE DROP TABLE
Вывод в первом терминале может выглядеть так:
$ psql -At -f /tmp/example2.sql postgres CREATE TABLE CREATE TABLE init BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78C2CA50 3/78C2CAA8 DELETE 2 3/78C2CBD8 INSERT 0 1 UPDATE 1 COMMIT { "change": [ { "kind": "message", "transactional": false, "prefix": "wal2json", "content": "this non-transactional message will be delivered even if you rollback the transaction" } ] } psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing { "change": [ { "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"] } ,{ "kind": "message", "transactional": true, "prefix": "wal2json", "content": "this message will be delivered" } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "insert", "schema": "public", "table": "table2_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] } stop DROP TABLE DROP TABLE
Чтобы удалить слот в первом терминале:
Ctrl+C $ pg_recvlogical -d postgres --slot test_slot --drop-slot
H.3.3.2. Вызов SQL-функций #
Ниже представлены примеры, как получить изменения в wal2json с помощью SQL.
Если для format-version
задано значение 1
, скрипт может выглядеть так:
$ cat /tmp/example2.sql CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); BEGIN; INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table2_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table2_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir'); -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json'); SELECT 'stop' FROM pg_drop_replication_slot('test_slot'); DROP TABLE table2_with_pk; DROP TABLE table2_without_pk;
Ожидаемый вывод может выглядеть так:
$ psql -At -f /tmp/example2.sql postgres CREATE TABLE CREATE TABLE init BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78C2CA50 3/78C2CAA8 DELETE 2 3/78C2CBD8 INSERT 0 1 UPDATE 1 COMMIT { "change": [ { "kind": "message", "transactional": false, "prefix": "wal2json", "content": "this non-transactional message will be delivered even if you rollback the transaction" } ] } psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing { "change": [ { "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"] } ,{ "kind": "message", "transactional": true, "prefix": "wal2json", "content": "this message will be delivered" } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "insert", "schema": "public", "table": "table2_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] } stop DROP TABLE DROP TABLE
Если для format-version
задано значение 2
, скрипт может выглядеть так:
$ cat /tmp/example3.sql CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); BEGIN; INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table3_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table3_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir'); -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json'); SELECT 'stop' FROM pg_drop_replication_slot('test_slot'); DROP TABLE table3_with_pk; DROP TABLE table3_without_pk;
Ожидаемый вывод может выглядеть так:
$ psql -At -f /tmp/example3.sql postgres CREATE TABLE CREATE TABLE init BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78CB8F30 3/78CB8F88 DELETE 2 3/78CB90B8 INSERT 0 1 UPDATE 1 COMMIT psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk" {"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"} {"action":"B"} {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"} {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]} {"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]} {"action":"C"} stop DROP TABLE DROP TABLE