H.5. wal2json — преобразование изменений из WAL в JSON с помощью логического декодирования #
Модуль wal2json — это расширение Postgres Pro для логического декодирования, которое преобразует изменения базы данных из журнала предзаписи (WAL) в формат JSON. Postgres Pro имеет доступ к кортежам, сгенерированным командами INSERT и UPDATE. В зависимости от выбора варианта идентификации реплики также могут быть доступны предыдущие версии строк для команд UPDATE и DELETE. Изменения можно получить либо по протоколу потоковой репликации (слоты логической репликации), либо через специальный SQL API.
Формат версии 1 создаёт JSON-объект для каждой транзакции. Этот объект содержит все новые и старые кортежи. Также есть возможность включить в него такие свойства, как метки времени транзакций, имена, дополненные именами схем, типы данных и идентификаторы транзакций.
Формат версии 2 создаёт JSON-объект для каждого кортежа с необязательными JSON-объектами, обозначающими начало и конец транзакции. Также можно включить разные свойства транзакции.
H.5.1. Установка и настройка #
wal2json поставляется вместе с Postgres Pro Enterprise в виде отдельного пакета wal2json-ent-17 (подробные инструкции по установке приведены в Главе 17). После установки Postgres Pro Enterprise выполните следующие действия, чтобы подготовить wal2json к работе:
Включите логическое декодирование, задав для параметра
wal_levelзначениеlogicalв файлеpostgresql.conf.Дополнительно вы можете задать значения для параметров
max_replication_slotsиmax_wal_senders.Перезапустите сервер баз данных, чтобы изменения вступили в силу.
H.5.2. Параметры #
Модуль wal2json предоставляет различные параметры для управления логическим декодированием:
include-xidsДобавить идентификатор транзакции к каждому набору изменений в JSON-выводе. Значение по умолчанию —
off.include-timestampДобавить
timestampк каждому набору изменений в JSON-выводе. Значение по умолчанию —off.include-schemasДобавить имя схемы к каждой записи об изменениях в JSON-выводе. Значение по умолчанию —
on.include-typesДобавить
typeк каждой записи об изменениях в JSON-выводе. Значение по умолчанию —on.include-typmodДобавить модификатор типа для столбцов, у которых они есть. Значение по умолчанию —
on.include-type-oidsДобавить OID типа. Значение по умолчанию — off.
include-domain-data-typeЗаменить доменное имя нижележащим типом данных. Значение по умолчанию —
off.include-column-positionsДобавить позицию столбца (
pg_attribute.attnum). Значение по умолчанию —off.include-originДобавить источник каждого изменения. Значение по умолчанию —
off.include-not-nullДобавить информацию, помечен ли столбец как
not nullв полеcolumnoptionalsJSON-вывода. Значение по умолчанию —off.include-defaultДобавить выражения по умолчанию в JSON-вывод. Значение по умолчанию —
off.include-pkДобавить информацию о первичном ключе, включая имена столбцов и типы данных, в поле
pkJSON-вывода. Значение по умолчанию —off.numeric-data-types-as-stringПреобразовать значения числовых типов данных в строки в JSON-выводе. Спецификация JSON не поддерживает
InfinityиNaNкак допустимые числовые значения. Для чисел двойной точности могут возникнуть потенциальные проблемы совместимости . Значение по умолчанию —off.pretty-printДобавить пробелы и отступы в структуру JSON-вывода. Значение по умолчанию —
off.write-in-chunksВключить запись JSON-вывода небольшими фрагментами вместо целого набора изменений. Этот параметр используется только когда для
format-versionзадано значение1. Значение по умолчанию —off.include-lsnДобавить поле
nextlsnк каждому набору изменений. Значение по умолчанию —off.include-transactionДобавить записи о начале и конце каждой транзакции в JSON-вывод. Значение по умолчанию —
off.filter-originsИсключить изменения из указанных источников. По умолчанию этот параметр пуст, что означает, что источники не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.
filter-tablesИсключить строки из указанных таблиц. По умолчанию этот параметр пуст, что означает, что таблицы не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений. Имена таблиц должны быть дополнены схемой. Имя вида
*.fooозначает таблицуfooво всех схемах, а имя видаbar.*означает все таблицы в схемеbar. Специальные символы (пробел, одинарная кавычка, запятая, точка, звёздочка) должны быть экранированы обратной косой чертой. Схема и таблица чувствительны к регистру. Например, таблица"public"."Foo bar"должна быть указана какpublic.Foo\ bar.add-tablesВключить строки только из указанных таблиц. По умолчанию этот параметр пуст, что означает, что включены все таблицы из всех схем. Для этого параметра действуют те же правила, что и для
filter-tables.filter-msg-prefixesИсключить сообщения, чей префикс указан в значении параметра. По умолчанию этот параметр пуст, что означает, что сообщения не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.
add-msg-prefixesВключить только те сообщения, чей префикс указан в значении параметра. По умолчанию используются все префиксы. Задаётся в виде разделённого запятыми списка значений.
wal2jsonприменяетfilter-msg-prefixesдо этого параметра.format-versionОпределить, какой формат вывода использовать. Значение по умолчанию —
1.actionsОпределить, какие операции будут включены в JSON-вывод. По умолчанию используются все действия (
INSERT,UPDATE,DELETEиTRUNCATE). Однако, если используетсяformat-version = 1, операцияTRUNCATEне включается (в целях обратной совместимости).
H.5.3. Примеры #
Существует два способа получить изменения (JSON-объекты) в wal2json: через функции, вызываемые в SQL или с помощью pg_recvlogical.
H.5.3.1. pg_recvlogical #
Ниже представлен пример, как получить JSON-объекты в wal2json с помощью pg_recvlogical. Помимо вышеуказанной настройки необходимо настроить подключение для репликации, чтобы использовать pg_recvlogical. Начиная с Postgres Pro версии 10, логическая репликация сопоставляет обычные записи с именем базы данных или ключевыми словами, такими как all.
Чтобы настроить подключение для репликации и параметры базы данных:
Добавьте правило подключения для репликации в файле
pg_hba.conf:local mydatabase myuser trustДополнительно можно задать
max_wal_sendersв файлеpostgresql.conf:max_wal_senders = 1Перезапустите сервер баз данных, если изменили
max_wal_senders.
Чтобы получить JSON-объекты в wal2json:
Откройте терминал и подключитесь к базе данных:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -В другом терминале:
$ cat /tmp/example1.sql CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); BEGIN; INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table1_with_pk (b, c) VALUES('Replication', now()); SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered'); SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered'); DELETE FROM table1_with_pk WHERE a < 3; SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction'); INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir'); -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; DROP TABLE table1_with_pk; DROP TABLE table1_without_pk; $ psql -At -f /tmp/example1.sql postgres CREATE TABLE CREATE TABLE BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78BFC828 3/78BFC880 DELETE 2 3/78BFC990 INSERT 0 1 UPDATE 1 COMMIT DROP TABLE DROP TABLEВывод в первом терминале может выглядеть так:
$ psql -At -f /tmp/example2.sql postgres CREATE TABLE CREATE TABLE init BEGIN INSERT 0 1 INSERT 0 1 INSERT 0 1 3/78C2CA50 3/78C2CAA8 DELETE 2 3/78C2CBD8 INSERT 0 1 UPDATE 1 COMMIT { "change": [ { "kind": "message", "transactional": false, "prefix": "wal2json", "content": "this non-transactional message will be delivered even if you rollback the transaction" } ] } psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing { "change": [ { "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"] } ,{ "kind": "message", "transactional": true, "prefix": "wal2json", "content": "this message will be delivered" } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "insert", "schema": "public", "table": "table2_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] } stop DROP TABLE DROP TABLEЧтобы удалить слот в первом терминале:
Ctrl+C $ pg_recvlogical -d postgres --slot test_slot --drop-slot
H.5.3.2. Вызов SQL-функций #
Ниже представлены примеры, как получить изменения в wal2json с помощью SQL.
Если для format-version задано значение 1, скрипт может выглядеть так:
$ cat /tmp/example2.sql
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table2_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;
Ожидаемый вывод может выглядеть так:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
Если для format-version задано значение 2, скрипт может выглядеть так:
$ cat /tmp/example3.sql
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table3_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;
Ожидаемый вывод может выглядеть так:
$ psql -At -f /tmp/example3.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE