F.51. pgpro_queue — управление очередями сообщений #

Расширение pgpro_queue включено в состав Postgres Pro Enterprise как стандартное расширение для управления очередями сообщений. Оно обеспечивает надёжное взаимодействие между приложениями с использованием очередей в базе данных.

pgpro_queue позволяет управлять очередями сообщений непосредственно в базе данных. С помощью pgpro_queue можно создавать очереди, добавлять в них сообщения и эффективно обрабатывать их, обеспечивая целостность и надёжность данных. Сообщение из очереди обрабатывается одним из потребителей. Полученное из очереди сообщение становится недоступным для других потребителей.

pgpro_queue также поддерживает управление очередями сообщений для подтранзакций и автономных транзакций, даже если они имеют несколько уровней вложенности.

Благодаря тому, что pgpro_queue обеспечивает интегрированное в базу данных управление очередями сообщений, стандартные возможности, такие как восстановление транзакций, перезапуск после сбоя и синхронизация с резервным сервером, полностью поддерживаются.

Сообщения в очереди обрабатываются на основе назначенных приоритетов, что позволяет обрабатывать сообщения с более высоким приоритетом раньше остальных, даже если они поступили позже.

Примечание

Расширение pgpro_queue может быть использовано только для транзакций, запущенных на уровне изоляции READ COMMITTED.

Диаграмма ниже демонстрирует практический пример запроса на создание файла в формате PDF. Вместо немедленной обработки такие запросы могут быть отправлены в очередь, что позволяет эффективно справляться с большим объёмом задач, улучшать пользовательский опыт благодаря асинхронной обработке и обеспечивать большую гибкость за счёт раздельной архитектуры.

Рисунок F.1. Пример создания PDF-файла


F.51.1. Установка и удаление #

Расширение pgpro_queue включено в состав Postgres Pro Enterprise. Установив Postgres Pro Enterprise, выполните следующие действия, чтобы подготовить pgpro_queue:

  1. Добавьте pgpro_queue в параметр shared_preload_libraries в файле postgresql.conf:

    shared_preload_libraries = 'pgpro_queue'
  2. Создайте расширение pgpro_queue, выполнив следующий запрос:

    CREATE EXTENSION pgpro_queue;

    Будет создан набор API-функций в текущей схеме.

  3. Инициализируйте расширение:

     SELECT queue_initialize();

    Эта функция создаёт выделенную схему с именем pgpro_queue_data, в которой хранятся все служебные объекты, такие как таблицы метаданных, представления и таблицы для очередей. При инициализации объекты очереди отделяются от самого расширения, что обеспечивает корректную репликацию и выгрузку базы данных.

Чтобы удалить все объекты, созданные расширением, выполните следующую команду (доступно только суперпользователям):

SELECT queue_deinitialize();

Эта функция выполняет каскадное удаление схемы pgpro_queue_data.

F.51.2. Особенности для подготовленных транзакций #

При использовании pgpro_queue для подготовленных транзакций обратите внимание на следующие особенности:

  • Если транзакция, которая вызывает функцию READ_MESSAGE(), подготавливается для двухфазной фиксации с использованием PREPARE TRANSACTION, соответствующее сообщение блокируется в таблице очереди, пока вы не выполните команду COMMIT PREPARED или ROLLBACK PREPARED. Другими словами, сообщение будет показываться в таблице очереди, но не будет доступно для дальнейших операций чтения.

  • Для pgpro_queue команда COMMIT PREPARED работает так же, как и обычная команда COMMIT. Если сообщение было успешно прочитано в подготовленной транзакции, то это сообщение удаляется из таблицы очереди после выполнения команды COMMIT PREPARED.

  • В отличие от команды ROLLBACK, команда ROLLBACK PREPARED не включает логику управления повторными попытками обработки сообщения, прочитанного в подготовленной транзакции. Вместо этого ROLLBACK PREPARED только разблокирует сообщение в таблице очереди.

F.51.3. Справка #

F.51.3.1. Параметры конфигурации #

pgpro_queue.database_with_managed_retry (text) #

Указывает список имён баз данных, разделённых запятыми, для которых включена логика управления повторными попытками. Если имя базы данных отсутствует в списке, указано неправильно или используется недопустимый разделитель, все сообщения, неудачно обработанные в рамках транзакции (то есть отправленные на повторную попытку выполнения при откате), будут удалены, независимо от параметров очереди или сообщения. Будьте осторожны, когда задаёте этот параметр, потому что некорректное изменение может вызвать потерю сообщений из очереди повторных запросов. Значение по умолчанию — postgres. Этот параметр можно установить как при запуске сервера, так и при перезагрузке конфигурации.

pgpro_queue.shared_retry_list_size (integer) #

Указывает размер хеш-таблицы в общей памяти для хранения списка повторных запросов (Shared Retry Pending List). Этот параметр в сущности определяет максимальное количество запросов на повторную попытку, которые процессы пользователя могут одновременно ставить в очередь для обработки процессом запуска. Значение по умолчанию — 10000.

F.51.3.2. Функции #

F.51.3.2.1. Процедуры управления очередями #

pgpro_queue предоставляет следующие процедуры для управления очередями.

CREATE PROCEDURE CREATE_QUEUE( IN q_name name, IN q_type char DEFAULT 'N'::char, IN q_dlq name DEFAULT null, IN q_retries int DEFAULT 10, IN q_retrydelay int DEFAULT 30 ) #

Создаёт очередь в существующей таблице очереди. Владелец очереди может ограничить доступ других пользователей к очереди, отказав им в доступе к таблице очереди на уровне списка управления доступом. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию.

  • q_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.

  • q_type: Тип новой очереди. Допустимые значения: N для обычной очереди (по умолчанию) и D для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)

  • q_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)

  • q_retries: Максимальное количество повторных попыток. Значение по умолчанию — 10.

  • q_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.

CREATE PROCEDURE ALTER_QUEUE( IN q_name name, IN new_type char DEFAULT null, IN new_dlq name DEFAULT null, IN new_retries int DEFAULT null, IN new_retrydelay int DEFAULT null ) #

Изменяет параметры существующей очереди. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию. Это изменение не повлияет на уже существующие сообщения. Изменять параметры очереди может только создавший её владелец.

  • q_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.

  • new_type: Новый тип очереди. Допустимые значения: N для обычной очереди (по умолчанию) и D для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)

  • new_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)

  • new_retries: Максимальное количество повторных попыток.

  • new_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.

CREATE PROCEDURE DROP_QUEUE(IN q_name name) #

Удаляет очередь с указанным именем.

CREATE FUNCTION GET_QUEUE_TABLE(IN q_name name) #

Возвращает OID таблицы очереди, используемой для очереди.

F.51.3.2.2. Процедуры для вставки сообщений #

pgpro_queue предоставляет следующие процедуры для отправки сообщений в очередь. Не рекомендуется совмещать строки сообщений в форматах XML и JSONB в одной и той же очереди.

CREATE PROCEDURE INSERT_MESSAGE( IN q_name name, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE INSERT_MESSAGE_XML( IN q_name name, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null ) #

Вставляет в очередь сообщение в формате JSON/XML.

  • q_name: Имя очереди.

  • q_msg_body: Строка сообщения (JSONB или XML).

  • q_msg_priority: Приоритет сообщения. Чем ниже значение, тем выше приоритет. Значение по умолчанию — 0, что означает высший приоритет.

  • q_msg_properties: Свойств по умолчанию нет.

  • q_msg_retries: Максимальное количество повторных попыток. Если не указано, используется значение по умолчанию из очереди.

  • q_msg_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Если не указано, используется значение по умолчанию из очереди.

  • q_msg_enable_time: Время задержки сообщения.

F.51.3.2.3. Функции для чтения сообщений #

pgpro_queue предоставляет следующие функции для чтения сообщений из очереди.

CREATE FUNCTION READ_MESSAGE( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_XML( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_ANY( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null, OUT q_jsonb jsonb, OUT q_xml xml ) RETURNS SETOF RECORD #

Извлекает сообщение из очереди в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.

  • q_name: Имя очереди.

  • q_msg_hfilter: Фильтр заголовков, применяемый дополнительно к стандартному фильтру.

  • q_msg_pfilter: Фильтр свойств, применяемый дополнительно к стандартному фильтру.

  • q_jsonb: Возвращаемое значение тела сообщения в формате JSON.

  • q_xml: Возвращаемое значение тела сообщения в формате XML.

CREATE FUNCTION READ_MESSAGE_BY_ID( IN q_name name, IN msgid bigint ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML( IN q_name name, IN msgid bigint ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY( IN q_name name, IN msgid bigint, OUT body jsonb, OUT body_xml xml ) RETURNS SETOF RECORD #

Извлекает конкретное сообщение из очереди по его ID в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.

  • q_name: Имя очереди.

  • msgid: ID сообщения.

  • body: Возвращаемое значение тела сообщения в формате JSON.

  • body_xml: Возвращаемое значение тела сообщения в формате XML.

F.51.4. Авторы #

Postgres Professional, Москва, Россия