49.6. Модули вывода логического декодирования #
Пример модуля вывода можно найти в подкаталоге contrib/test_decoding
в дереве исходного кода Postgres Pro.
49.6.1. Функция инициализации #
Модуль вывода загружается в результате динамической загрузки разделяемой библиотеки (при этом в качестве имени библиотеки задаётся имя модуля). Для нахождения библиотеки применяется обычный путь поиска библиотек. В этой библиотеке должна быть функция _PG_output_plugin_init
, которая показывает, что библиотека на самом деле представляет собой модуль вывода, и устанавливает требуемые обработчики модуля вывода. Этой функции передаётся структура, в которой должны быть заполнены указатели на функции-обработчики отдельных действий.
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
Обработчики begin_cb
, change_cb
и commit_cb
должны устанавливаться обязательно, а startup_cb
, truncate_cb
, message_cb
, filter_by_origin_cb
и shutdown_cb
могут отсутствовать. Если truncate_cb
не установлен, но потребуется декодировать операцию TRUNCATE
, она будет проигнорирована.
Модуль вывода может также определять функции для поддержки потоковой передачи больших транзакций во время их выполнения. Функции stream_start_cb
, stream_stop_cb
, stream_abort_cb
, stream_commit_cb
и stream_change_cb
являются обязательными, а stream_message_cb
и stream_truncate_cb
— необязательными. Функция stream_prepare_cb
также является обязательной, если модуль вывода поддерживает двухфазную фиксацию.
Модуль вывода также может определять функции для поддержки двухфазной фиксации, позволяющие декодировать действия в PREPARE TRANSACTION
. Обработчики begin_prepare_cb
, prepare_cb
, commit_prepared_cb
и rollback_prepared_cb
являются обязательными, а filter_prepare_cb
— нет. Обработчик stream_prepare_cb
также является обязательным, если модуль вывода поддерживает потоковую передачу больших выполняющихся транзакций.
49.6.2. Возможности #
Для декодирования, форматирования и вывода изменений модули вывода могут использовать практически всю обычную инфраструктуру сервера, включая вызов функций вывода типов. К отношениям разрешается доступ только на чтение, если только эти отношения были созданы программой initdb
в схеме pg_catalog
, либо помечены как пользовательские таблицы каталогов командами
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
Обратите внимание, что доступ к пользовательским таблицам каталога или обычным системным таблицам каталога в подключаемых модулях вывода должен осуществляться только с использованием функций сканирования systable_*
. При попытке использовать функции сканирования heap_*
произойдёт ошибка. Кроме того, любые действия, которые требуют присваивания идентификатора транзакции, запрещаются. В частности, к этим действиям относятся операции записи в таблицы, изменения DDL и вызов pg_current_xact_id()
.
49.6.3. Режимы вывода #
Обработчики в модуле вывода могут передавать данные потребителю в практически любых форматах. Для некоторых вариантов использования, например, просмотра изменений через SQL, вывод информации в типах, которые могут содержать произвольные данные (например, bytea
), может быть неудобоваримым. Если модуль вывода выводит только текстовые данные в кодировке сервера, он может объявить это, установив в OutputPluginOptions.output_type
значение OUTPUT_PLUGIN_TEXTUAL_OUTPUT
вместо OUTPUT_PLUGIN_BINARY_OUTPUT
в обработчике запуска. В этом случае все данные должны быть в кодировке сервера, чтобы их можно было передать в значении типа text
. Это контролируется в сборках с включёнными проверочными утверждениями.
49.6.4. Обработчики в модуле вывода #
Модуль вывода уведомляется о происходящих изменениях через различные обработчики, которые он должен установить.
Параллельные транзакции декодируются в порядке фиксации, при этом между обратными вызовами begin
и commit
декодируются только изменения, относящиеся к определённой транзакции. Явно или неявно отменённые транзакции никогда не декодируются. Успешные точки сохранения включаются в содержащую их транзакцию в том порядке, в котором они выполнялись в этой транзакции. Транзакция, подготовленная для двухфазной фиксации командой PREPARE TRANSACTION
, также будет декодирована, если модуль вывода предоставил требуемые обработчики для декодирования. Возможно, что текущая подготовленная декодируемая транзакция будет прервана параллельно выполненной командой ROLLBACK PREPARED
. В этом случае будет прервано и логическое декодирование этой транзакции. Все изменения такой транзакции пропускаются после обнаружения прерывания и вызова обработчика prepare_cb
. Таким образом даже при блокирующем прерывании модулю вывода предоставляется достаточно информации, чтобы он мог правильно обработать ROLLBACK PREPARED
после декодирования.
Примечание
Декодироваться будут только те транзакции, которые уже успешно сброшены на диск. Вследствие этого, COMMIT
может не декодироваться в следующем сразу за ним вызове pg_logical_slot_get_changes()
, когда synchronous_commit
имеет значение off
.
49.6.4.1. Обработчик запуска #
Необязательный обработчик startup_cb
вызывается, когда слот репликации создаётся или через него запрашивается передача изменений, независимо от того, в каком количестве изменения готовы к передаче.
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
Параметр is_init
будет равен true, когда слот репликации создаётся, и false в противном случае. Параметр options
указывает на структуру параметров, которые могут устанавливать модули вывода:
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
В поле output_type
должно быть значение OUTPUT_PLUGIN_TEXTUAL_OUTPUT
или OUTPUT_PLUGIN_BINARY_OUTPUT
. См. также Подраздел 49.6.3. Если поле receive_rewrites
равно true, модуль вывода также будет вызываться для изменений, связанных с перезаписью кучи при определённых операциях DDL. Эти изменения представляют интерес для модулей, осуществляющих репликацию DDL, но для их обработки может потребоваться особый подход.
Обработчик запуска должен проверить параметры, представленные в ctx->output_plugin_options
. Если модулю вывода требуется поддерживать состояние, он может сохранить его в ctx->output_plugin_private
.
49.6.4.2. Обработчик выключения #
Необязательный обработчик shutdown_cb
вызывается, когда ранее активный слот репликации перестаёт использоваться, так что ресурсы, занятые модулем вывода, можно освободить. При этом слот не обязательно удаляется, прекращается только потоковая передача через него.
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
49.6.4.3. Обработчик начала транзакции #
Обязательный обработчик begin_cb
вызывается, когда декодируется начало зафиксированной транзакции. Прерванные транзакции и их содержимое никогда не декодируется.
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
Параметр txn
содержит метаинформацию о транзакции, в частности её идентификатор и время её фиксирования.
49.6.4.4. Обработчик завершения транзакции #
Обязательный обработчик commit_cb
вызывается, когда декодируется фиксирование транзакции. Перед этим обработчиком будет вызываться обработчик change_cb
для всех изменённых строк (если строки были изменены).
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
49.6.4.5. Обработчик изменения #
Обязательный обработчик change_cb
вызывается для каждого отдельного изменения строки в транзакции, производимого командами INSERT
, UPDATE
или DELETE
. Даже если команда изменила несколько строк сразу, этот обработчик будет вызываться для каждой отдельной строки. Обработчик change_cb
может обращаться к системным или пользовательским таблицам каталога, чтобы дополнить выводимую информацию об изменении строки. В случае декодирования подготовленной (но ещё не зафиксированной) транзакции или декодирования незафиксированной транзакции этот обработчик изменений также может выдать ошибку из-за одновременного отката той же самой транзакции. В этом случае логическое декодирование этой прерванной транзакции корректно останавливается.
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
Параметры ctx
и txn
имеют то же содержимое, что и для обработчиков begin_cb
и commit_cb
; дополнительный дескриптор отношения relation
указывает на отношение, к которому принадлежит строка, а структура change
описывает передаваемое изменение строки.
Примечание
В процессе логического декодирования могут быть обработаны изменения только в таблицах, не являющихся нежурналируемыми (см. описание UNLOGGED
) или временными (см. описание TEMPORARY
или TEMP
).
49.6.4.6. Обработчик опустошения #
Необязательный обработчик truncate_cb
вызывается для команды TRUNCATE
.
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Он получает те же параметры, что и change_cb
. Но так как операции TRUNCATE
в таблицах, связанных внешними ключами, должны выполняться одновременно, данный обработчик получает на вход не одно отношение, а массив отношений. За подробностями обратитесь к описанию оператора TRUNCATE.
49.6.4.7. Обработчик фильтрации источника #
Необязательный обработчик filter_by_origin_cb
вызывается, чтобы отметить, интересуют ли модуль вывода изменения, воспроизводимые из указанного источника (origin_id
).
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
В параметре ctx
передаётся та же информация, что и для других обработчиков. Чтобы отметить, что изменения, поступающие из переданного узла, не представляют интереса, модуль должен вернуть true, вследствие чего эти изменения будут фильтроваться; в противном случае он должен вернуть false. Другие обработчики для фильтруемых транзакций и изменений вызываться не будут.
Это полезно при реализации каскадной или разнонаправленной репликации. Фильтрация по источнику в таких конфигурациях позволяет предотвратить передачу взад-вперёд одних и тех же изменений. Хотя информацию об источнике можно также извлечь из транзакций и изменений, фильтрация с помощью этого обработчика гораздо более эффективна.
49.6.4.8. Обработчик произвольных сообщений #
Необязательный обработчик message_cb
вызывается при получении сообщения логического декодирования.
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
Параметр txn
содержит метаинформацию о транзакции, включая время её фиксации и её XID. Заметьте, однако, что в нём может передаваться NULL, когда сообщение нетранзакционное и транзакции, в которой было выдано сообщение, ещё не назначен XID. В параметре lsn
отмечается позиция сообщения в WAL. Параметр transactional
показывает, было ли сообщение передано как транзакционное. Подобно обработчику изменений, в случае декодирования подготовленной (но ещё не зафиксированной) транзакции или декодирования незафиксированной транзакции этот обработчик также может выдать ошибку из-за одновременного отката той же самой транзакции. В этом случае логическое декодирование этой прерванной транзакции корректно останавливается. В параметре prefix
передаётся некоторый префикс (завершающийся нулём), по которому текущий модуль может выделять интересующие его сообщения. И наконец, параметр message
содержит само сообщение размером message_size
байт.
Необходимо дополнительно позаботиться о том, чтобы префикс, определяющий интересующие модуль вывода сообщения, был уникальным. Удачным выбором обычно будет имя расширения или самого модуля вывода.
49.6.4.9. Обработчик фильтра подготовки #
Необязательный обработчик filter_prepare_cb
вызывается, чтобы определить, следует ли декодировать данные, составляющие текущую фиксируемую двухфазную транзакцию, на этапе подготовки или позже — в виде обычной однофазной транзакции на этапе COMMIT PREPARED
. Если декодирование следует пропустить, обработчик должен выдать true
, а иначе — false
. Если обработчик не определён, предполагается false
(т. е. фильтрация отсутствует — все транзакции, использующие двухфазную фиксацию, декодируются также в две фазы).
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
Параметр ctx
имеет то же содержимое, что и в других обработчиках. Параметры xid
и gid
предоставляют два разных способа идентификации транзакции. Последующая команда COMMIT PREPARED
или ROLLBACK PREPARED
передаёт оба идентификатора, предоставляя модулю вывода возможность выбора, какой использовать.
Обработчик может вызываться несколько раз в течение одной декодируемой транзакции и должен выдавать один и тот же постоянный ответ для данной пары xid
и gid
при каждом вызове.
49.6.4.10. Обработчик начала подготовленной транзакции #
Обязательный обработчик begin_prepare_cb
вызывается при декодировании начала подготовленной транзакции. Переданное в параметре txn
поле gid
позволяет обработчику определить, получал ли этот модуль уже данную команду PREPARE
, — если да, он может либо выдать ошибку, либо пропустить оставшиеся изменения транзакции.
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
49.6.4.11. Обработчик подготовки транзакции #
Обязательный обработчик prepare_cb
вызывается при декодировании транзакции, подготовленной для двухфазной фиксации. Перед этим обработчиком будет вызываться обработчик change_cb
для всех изменённых строк (если строки были изменены). В этом обработчике может использоваться поле gid
, являющееся частью параметра txn
.
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
49.6.4.12. Обработчик фиксации подготовленной транзакции #
Обязательный обработчик commit_prepared_cb
вызывается при декодировании команды COMMIT PREPARED
транзакции. В этом обработчике может использоваться поле gid
, являющееся частью параметра txn
.
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
49.6.4.13. Обработчик отмены подготовленной транзакции #
Обязательный обработчик rollback_prepared_cb
вызывается при декодировании команды ROLLBACK PREPARED
транзакции. В этом обработчике может использоваться поле gid
, которое является частью параметра txn
. Проанализировав параметры prepare_end_lsn
и prepare_time
, модуль декодирования может определить, получал ли он PREPARE TRANSACTION
, и если да — выполнить отмену транзакции, а в противном случае пропустить эту операцию. Одного поля gid
для этого недостаточно, так как на нижестоящем узле может быть подготовленная транзакция с таким же строковым идентификатором.
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
49.6.4.14. Обработчик начала потока #
Обязательный обработчик stream_start_cb
вызывается, когда открывается блок передаваемых в потоке изменений выполняющейся транзакции.
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
49.6.4.15. Обработчик остановки потока #
Обязательный обработчик stream_stop_cb
вызывается, когда закрывается блок передаваемых в потоке изменений выполняющейся транзакции.
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
49.6.4.16. Обработчик прерывания потока #
Обязательный обработчик stream_abort_cb
вызывается для прерывания передаваемой в потоке транзакции.
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
49.6.4.17. Обработчик подготовки потока #
Обработчик stream_prepare_cb
вызывается для подготовки ранее переданной в потоке транзакции как части двухфазной фиксации. Этот обработчик требуется, когда плагин вывода поддерживает как потоковую передачу больших текущих транзакций, так и двухфазные фиксации.
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
49.6.4.18. Обработчик фиксации потока #
Обязательный обработчик stream_commit_cb
вызывается для фиксации ранее переданной в потоке транзакции.
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
49.6.4.19. Обработчик изменений в потоке #
Обязательный обработчик stream_change_cb
вызывается, когда передаётся изменение в блоке изменений (границы которого обозначают вызовы stream_start_cb
и stream_stop_cb
). Фактические изменения не отображаются, поскольку транзакция может быть прервана позже, а изменения для прерванных транзакций не декодируются.
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
49.6.4.20. Обработчик сообщения потока #
Необязательный обработчик stream_message_cb
вызывается, когда передаётся произвольное сообщение в блоке изменений (границы которого обозначают вызовы stream_start_cb
и stream_stop_cb
). Содержимое транзакционных сообщений не отображается, поскольку транзакция может быть прервана позже, а изменения для прерванных транзакций не декодируются.
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
49.6.4.21. Обработчик команды опустошения в потоке #
Необязательный обработчик truncate_cb
вызывается для команды TRUNCATE
в блоке изменений (границы которого обозначают вызовы stream_start_cb
и stream_stop_cb
).
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
Параметры этого обработчика аналогичны параметрам change_cb
. Но так как операции TRUNCATE
в таблицах, связанных внешними ключами, должны выполняться одновременно, данный обработчик получает на вход не одно отношение, а массив отношений. За подробностями обратитесь к описанию оператора TRUNCATE.
49.6.5. Функции для формирования вывода #
Чтобы действительно вывести данные, модули вывода могут записывать их в буфер StringInfo
через ctx->out
, внутри обработчиков begin_cb
, commit_cb
или change_cb
. Прежде чем записывать данные в этот буфер, необходимо вызвать OutputPluginPrepareWrite(ctx, last_write)
, а завершив запись в буфер, нужно вызвать OutputPluginWrite(ctx, last_write)
, чтобы собственно произвести запись. Параметр last_write
указывает, была ли эта определённая операция записи последней в данном обработчике.
Следующий пример показывает, как вывести данные для потребителя модуля вывода:
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);