H.3. wal2json — преобразование изменений из WAL в JSON с помощью логического декодирования #

Модуль wal2json — это расширение Postgres Pro для логического декодирования, которое преобразует изменения базы данных из журнала предзаписи (WAL) в формат JSON. Postgres Pro имеет доступ к кортежам, сгенерированным командами INSERT и UPDATE. В зависимости от выбора варианта идентификации реплики также могут быть доступны предыдущие версии строк для команд UPDATE и DELETE. Изменения можно получить либо по протоколу потоковой репликации (слоты логической репликации), либо через специальный SQL API.

Формат версии 1 создаёт JSON-объект для каждой транзакции. Этот объект содержит все новые и старые кортежи. Также есть возможность включить в него такие свойства, как метки времени транзакций, имена, дополненные именами схем, типы данных и идентификаторы транзакций.

Формат версии 2 создаёт JSON-объект для каждого кортежа с необязательными JSON-объектами, обозначающими начало и конец транзакции. Также можно включить разные свойства транзакции.

H.3.1. Установка и настройка #

wal2json поставляется вместе с Postgres Pro Standard в виде отдельного пакета wal2json-std-17 (подробные инструкции по установке приведены в Главе 16). После установки Postgres Pro Standard выполните следующие действия, чтобы подготовить wal2json к работе:

  1. Включите логическое декодирование, задав для параметра wal_level значение logical в файле postgresql.conf.

    Дополнительно вы можете задать значения для параметров max_replication_slots и max_wal_senders.

  2. Перезапустите сервер баз данных, чтобы изменения вступили в силу.

H.3.2. Параметры #

Модуль wal2json предоставляет различные параметры для управления логическим декодированием:

include-xids

Добавить идентификатор транзакции к каждому набору изменений в JSON-выводе. Значение по умолчанию — off.

include-timestamp

Добавить timestamp к каждому набору изменений в JSON-выводе. Значение по умолчанию — off.

include-schemas

Добавить имя схемы к каждой записи об изменениях в JSON-выводе. Значение по умолчанию — on.

include-types

Добавить type к каждой записи об изменениях в JSON-выводе. Значение по умолчанию — on.

include-typmod

Добавить модификатор типа для столбцов, у которых они есть. Значение по умолчанию — on.

include-type-oids

Добавить OID типа. Значение по умолчанию — off.

include-domain-data-type

Заменить доменное имя нижележащим типом данных. Значение по умолчанию — off.

include-column-positions

Добавить позицию столбца (pg_attribute.attnum). Значение по умолчанию — off.

include-origin

Добавить источник каждого изменения. Значение по умолчанию — off.

include-not-null

Добавить информацию, помечен ли столбец как not null в поле columnoptionals JSON-вывода. Значение по умолчанию — off.

include-default

Добавить выражения по умолчанию в JSON-вывод. Значение по умолчанию — off.

include-pk

Добавить информацию о первичном ключе, включая имена столбцов и типы данных, в поле pk JSON-вывода. Значение по умолчанию — off.

numeric-data-types-as-string

Преобразовать значения числовых типов данных в строки в JSON-выводе. Спецификация JSON не поддерживает Infinity и NaN как допустимые числовые значения. Для чисел двойной точности могут возникнуть потенциальные проблемы совместимости . Значение по умолчанию — off.

pretty-print

Добавить пробелы и отступы в структуру JSON-вывода. Значение по умолчанию — off.

write-in-chunks

Включить запись JSON-вывода небольшими фрагментами вместо целого набора изменений. Этот параметр используется только когда для format-version задано значение 1. Значение по умолчанию — off.

include-lsn

Добавить поле nextlsn к каждому набору изменений. Значение по умолчанию — off.

include-transaction

Добавить записи о начале и конце каждой транзакции в JSON-вывод. Значение по умолчанию — off.

filter-origins

Исключить изменения из указанных источников. По умолчанию этот параметр пуст, что означает, что источники не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.

filter-tables

Исключить строки из указанных таблиц. По умолчанию этот параметр пуст, что означает, что таблицы не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений. Имена таблиц должны быть дополнены схемой. Имя вида *.foo означает таблицу foo во всех схемах, а имя вида bar.* означает все таблицы в схеме bar. Специальные символы (пробел, одинарная кавычка, запятая, точка, звёздочка) должны быть экранированы обратной косой чертой. Схема и таблица чувствительны к регистру. Например, таблица "public"."Foo bar" должна быть указана как public.Foo\ bar.

add-tables

Включить строки только из указанных таблиц. По умолчанию этот параметр пуст, что означает, что включены все таблицы из всех схем. Для этого параметра действуют те же правила, что и для filter-tables.

filter-msg-prefixes

Исключить сообщения, чей префикс указан в значении параметра. По умолчанию этот параметр пуст, что означает, что сообщения не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.

add-msg-prefixes

Включить только те сообщения, чей префикс указан в значении параметра. По умолчанию используются все префиксы. Задаётся в виде разделённого запятыми списка значений. wal2json применяет filter-msg-prefixes до этого параметра.

format-version

Определить, какой формат вывода использовать. Значение по умолчанию — 1.

actions

Определить, какие операции будут включены в JSON-вывод. По умолчанию используются все действия (INSERT, UPDATE, DELETE и TRUNCATE). Однако, если используется format-version = 1, операция TRUNCATE не включается (в целях обратной совместимости).

H.3.3. Примеры #

Существует два способа получить изменения (JSON-объекты) в wal2json: через функции, вызываемые в SQL или с помощью pg_recvlogical.

H.3.3.1. pg_recvlogical #

Ниже представлен пример, как получить JSON-объекты в wal2json с помощью pg_recvlogical. Помимо вышеуказанной настройки необходимо настроить подключение для репликации, чтобы использовать pg_recvlogical. Начиная с Postgres Pro версии 10, логическая репликация сопоставляет обычные записи с именем базы данных или ключевыми словами, такими как all.

Чтобы настроить подключение для репликации и параметры базы данных:

  1. Добавьте правило подключения для репликации в файле pg_hba.conf:

              local    mydatabase      myuser                     trust
            
  2. Дополнительно можно задать max_wal_senders в файле postgresql.conf:

              max_wal_senders = 1
            
  3. Перезапустите сервер баз данных, если изменили max_wal_senders.

Чтобы получить JSON-объекты в wal2json:

  1. Откройте терминал и подключитесь к базе данных:

             $ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
             $ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
            
  2. В другом терминале:

            $ cat /tmp/example1.sql
            CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
            CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
    
            BEGIN;
            INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
            INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
            INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
            SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
            SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
            DELETE FROM table1_with_pk WHERE a < 3;
            SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
    
            INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
            -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
            UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
            COMMIT;
    
            DROP TABLE table1_with_pk;
            DROP TABLE table1_without_pk;
    
            $ psql -At -f /tmp/example1.sql postgres
            CREATE TABLE
            CREATE TABLE
            BEGIN
            INSERT 0 1
            INSERT 0 1
            INSERT 0 1
            3/78BFC828
            3/78BFC880
            DELETE 2
            3/78BFC990
            INSERT 0 1
            UPDATE 1
            COMMIT
            DROP TABLE
            DROP TABLE
           
  3. Вывод в первом терминале может выглядеть так:

              $ psql -At -f /tmp/example2.sql postgres
              CREATE TABLE
              CREATE TABLE
              init
              BEGIN
              INSERT 0 1
              INSERT 0 1
              INSERT 0 1
              3/78C2CA50
              3/78C2CAA8
              DELETE 2
              3/78C2CBD8
              INSERT 0 1
              UPDATE 1
              COMMIT
              {
                  "change": [
                      {
                          "kind": "message",
                          "transactional": false,
                          "prefix": "wal2json",
                          "content": "this non-transactional message will be delivered even if you rollback the transaction"
                      }
                  ]
              }
              psql:/tmp/example2.sql:17: WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
              {
                "change": [
                  {
                    "kind": "insert",
                    "schema": "public",
                    "table": "table2_with_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                    "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
                  }
                  ,{
                    "kind": "insert",
                    "schema": "public",
                    "table": "table2_with_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                    "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
                  }
                  ,{
                    "kind": "insert",
                    "schema": "public",
                    "table": "table2_with_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
                    "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
                  }
                      ,{
                          "kind": "message",
                          "transactional": true,
                          "prefix": "wal2json",
                          "content": "this message will be delivered"
                      }
                  ,{
                    "kind": "delete",
                    "schema": "public",
                    "table": "table2_with_pk",
                    "oldkeys": {
                      "keynames": ["a", "c"],
                      "keytypes": ["integer", "timestamp without time zone"],
                      "keyvalues": [1, "2018-03-27 12:05:29.914496"]
                    }
                  }
                  ,{
                    "kind": "delete",
                    "schema": "public",
                    "table": "table2_with_pk",
                    "oldkeys": {
                      "keynames": ["a", "c"],
                      "keytypes": ["integer", "timestamp without time zone"],
                      "keyvalues": [2, "2018-03-27 12:05:29.914496"]
                    }
                  }
                  ,{
                    "kind": "insert",
                    "schema": "public",
                    "table": "table2_without_pk",
                    "columnnames": ["a", "b", "c"],
                    "columntypes": ["integer", "numeric(5,2)", "text"],
                    "columnvalues": [1, 2.34, "Tapir"]
                  }
                ]
              }
              stop
              DROP TABLE
              DROP TABLE
            
  4. Чтобы удалить слот в первом терминале:

            Ctrl+C
            $ pg_recvlogical -d postgres --slot test_slot --drop-slot
           

H.3.3.2. Вызов SQL-функций #

Ниже представлены примеры, как получить изменения в wal2json с помощью SQL.

Если для format-version задано значение 1, скрипт может выглядеть так:

        $ cat /tmp/example2.sql
        CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
        CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

        SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

        BEGIN;
        INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
        INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
        INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
        SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
        SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
        DELETE FROM table2_with_pk WHERE a < 3;
        SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

        INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
        -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
        UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
        COMMIT;

        SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
        SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

        DROP TABLE table2_with_pk;
        DROP TABLE table2_without_pk;
       

Ожидаемый вывод может выглядеть так:

        $ psql -At -f /tmp/example2.sql postgres
        CREATE TABLE
        CREATE TABLE
        init
        BEGIN
        INSERT 0 1
        INSERT 0 1
        INSERT 0 1
        3/78C2CA50
        3/78C2CAA8
        DELETE 2
        3/78C2CBD8
        INSERT 0 1
        UPDATE 1
        COMMIT
        {
            "change": [
                {
                    "kind": "message",
                    "transactional": false,
                    "prefix": "wal2json",
                    "content": "this non-transactional message will be delivered even if you rollback the transaction"
                }
            ]
        }
        psql:/tmp/example2.sql:17: WARNING:  table "table2_without_pk" without primary key or replica identity is nothing
        {
          "change": [
            {
              "kind": "insert",
              "schema": "public",
              "table": "table2_with_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
              "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
            }
            ,{
              "kind": "insert",
              "schema": "public",
              "table": "table2_with_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
              "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
            }
            ,{
              "kind": "insert",
              "schema": "public",
              "table": "table2_with_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
              "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
            }
                ,{
                    "kind": "message",
                    "transactional": true,
                    "prefix": "wal2json",
                    "content": "this message will be delivered"
                }
            ,{
              "kind": "delete",
              "schema": "public",
              "table": "table2_with_pk",
              "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [1, "2018-03-27 12:05:29.914496"]
              }
            }
            ,{
              "kind": "delete",
              "schema": "public",
              "table": "table2_with_pk",
              "oldkeys": {
                "keynames": ["a", "c"],
                "keytypes": ["integer", "timestamp without time zone"],
                "keyvalues": [2, "2018-03-27 12:05:29.914496"]
              }
            }
            ,{
              "kind": "insert",
              "schema": "public",
              "table": "table2_without_pk",
              "columnnames": ["a", "b", "c"],
              "columntypes": ["integer", "numeric(5,2)", "text"],
              "columnvalues": [1, 2.34, "Tapir"]
            }
          ]
        }
        stop
        DROP TABLE
        DROP TABLE
       

Если для format-version задано значение 2, скрипт может выглядеть так:

        $ cat /tmp/example3.sql
        CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
        CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

        SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');

        BEGIN;
        INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
        INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
        INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
        SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
        SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
        DELETE FROM table3_with_pk WHERE a < 3;
        SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');

        INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
        -- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
        UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
        COMMIT;

        SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
        SELECT 'stop' FROM pg_drop_replication_slot('test_slot');

        DROP TABLE table3_with_pk;
        DROP TABLE table3_without_pk;
       

Ожидаемый вывод может выглядеть так:

        $ psql -At -f /tmp/example3.sql postgres
        CREATE TABLE
        CREATE TABLE
        init
        BEGIN
        INSERT 0 1
        INSERT 0 1
        INSERT 0 1
        3/78CB8F30
        3/78CB8F88
        DELETE 2
        3/78CB90B8
        INSERT 0 1
        UPDATE 1
        COMMIT
        psql:/tmp/example3.sql:20: WARNING:  no tuple identifier for UPDATE in table "public"."table3_without_pk"
        {"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
        {"action":"B"}
        {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
        {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
        {"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
        {"action":"C"}
        stop
        DROP TABLE
        DROP TABLE