F.48. pgpro_queue — управление очередями сообщений #

Расширение pgpro_queue включено в состав Postgres Pro Enterprise как стандартное расширение для управления очередями сообщений. Оно обеспечивает надёжное взаимодействие между приложениями с использованием очередей в базе данных.

pgpro_queue позволяет управлять очередями сообщений непосредственно в базе данных. С помощью pgpro_queue можно создавать очереди, добавлять в них сообщения и эффективно обрабатывать их, обеспечивая целостность и надёжность данных. Сообщение из очереди обрабатывается одним из потребителей. Полученное из очереди сообщение становится недоступным для других потребителей.

Благодаря тому, что pgpro_queue обеспечивает интегрированное в базу данных управление очередями сообщений, стандартные возможности, такие как восстановление транзакций, перезапуск после сбоя и синхронизация с резервным сервером, полностью поддерживаются.

Сообщения в очереди обрабатываются на основе назначенных приоритетов, что позволяет обрабатывать сообщения с более высоким приоритетом раньше остальных, даже если они поступили позже.

Диаграмма ниже демонстрирует практический пример запроса на создание файла в формате PDF. Вместо немедленной обработки такие запросы могут быть отправлены в очередь, что позволяет эффективно справляться с большим объёмом задач, улучшать пользовательский опыт благодаря асинхронной обработке и обеспечивать большую гибкость за счёт раздельной архитектуры.

Рисунок F.1. Пример создания PDF-файла


F.48.1. Установка и удаление #

Расширение pgpro_queue включено в состав Postgres Pro Enterprise. Установив Postgres Pro Enterprise, выполните следующие действия, чтобы подготовить pgpro_queue:

  1. Добавьте pgpro_queue в параметр shared_preload_libraries в файле postgresql.conf:

    shared_preload_libraries = 'pgpro_queue'
  2. Создайте расширение pgpro_queue, выполнив следующий запрос:

    CREATE EXTENSION pgpro_queue;

    Будет создан набор API-функций в текущей схеме.

  3. Инициализируйте расширение:

    SELECT pgpro_queue_initialize();

    Эта функция создаёт выделенную схему с именем pgpro_queue_data, в которой хранятся все служебные объекты, такие как таблицы метаданных, представления и таблицы для очередей. При инициализации объекты очереди отделяются от самого расширения, что обеспечивает корректную репликацию и выгрузку базы данных.

Чтобы удалить все объекты, созданные расширением, выполните следующую команду (доступно только суперпользователям):

SELECT pgpro_queue_deinitialize();

Эта функция выполняет каскадное удаление схемы pgpro_queue_data.

F.48.2. Справка #

F.48.2.1. Параметры конфигурации #

pgpro_queue.launcher_database (text) #

Указывает имя базы данных, к которой подключается процесс запуска. Несуществующие базы данных и пустые строки не принимаются. Значение по умолчанию — postgres. Этот параметр можно установить только при запуске сервера.

pgpro_queue.database_with_managed_retry (text) #

Указывает список имён баз данных, разделённых запятыми, для которых включена логика управления повторными попытками. Если имя базы данных отсутствует в списке, указано неправильно или используется недопустимый разделитель, все сообщения, неудачно обработанные в рамках транзакции (то есть отправленные на повторную попытку выполнения при откате), будут удалены, независимо от параметров очереди или сообщения. Значение по умолчанию — postgres. Этот параметр можно установить как при запуске сервера, так и при перезагрузке конфигурации.

pgpro_queue.shared_retry_list_size (integer) #

Указывает размер хеш-таблицы в общей памяти для хранения списка повторных запросов (Shared Retry Pending List). Этот параметр в сущности определяет максимальное количество запросов на повторную попытку, которые процессы пользователя могут одновременно ставить в очередь для обработки процессом запуска. Значение по умолчанию — 10000.

F.48.2.2. Функции #

F.48.2.2.1. Процедуры управления очередями #

pgpro_queue предоставляет следующие процедуры для управления очередями.

CREATE PROCEDURE CREATE_QUEUE( IN q_name name, IN q_type char DEFAULT 'N'::char, IN q_dlq name DEFAULT null, IN q_retries int DEFAULT 10, IN q_retrydelay int DEFAULT 30 ) #

Создаёт очередь в существующей таблице очереди. Владелец очереди может ограничить доступ других пользователей к очереди, отказав им в доступе к таблице очереди на уровне списка управления доступом. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию.

  • q_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.

  • q_type: Тип новой очереди. Допустимые значения: N для обычной очереди (по умолчанию) и D для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)

  • q_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)

  • q_retries: Максимальное количество повторных попыток. Значение по умолчанию — 10.

  • q_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.

CREATE PROCEDURE ALTER_QUEUE( IN q_name name, IN new_type char DEFAULT null, IN new_dlq name DEFAULT null, IN new_retries int DEFAULT null, IN new_retrydelay int DEFAULT null ) #

Изменяет параметры существующей очереди. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию. Это изменение не повлияет на уже существующие сообщения. Изменять параметры очереди может только создавший её владелец.

  • q_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.

  • new_type: Новый тип очереди. Допустимые значения: N для обычной очереди (по умолчанию) и D для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)

  • new_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)

  • new_retries: Максимальное количество повторных попыток.

  • new_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.

CREATE PROCEDURE DROP_QUEUE(IN q_name name) #

Удаляет очередь с указанным именем.

CREATE FUNCTION GET_QUEUE_TABLE(IN q_name name) #

Возвращает OID таблицы очереди, используемой для очереди.

F.48.2.2.2. Процедуры для вставки сообщений #

pgpro_queue предоставляет следующие процедуры для отправки сообщений в очередь. Не рекомендуется совмещать строки сообщений в форматах XML и JSONB в одной и той же очереди.

CREATE PROCEDURE INSERT_MESSAGE( IN q_name name, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE INSERT_MESSAGE_XML( IN q_name name, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null ) #

Вставляет в очередь сообщение в формате JSON/XML.

  • q_name: Имя очереди.

  • q_msg_body: Строка сообщения (JSONB или XML).

  • q_msg_priority: Приоритет сообщения.

  • q_msg_properties: Свойств по умолчанию нет.

  • q_msg_retries: Максимальное количество повторных попыток. Если не указано, используется значение по умолчанию из очереди.

  • q_msg_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Если не указано, используется значение по умолчанию из очереди.

  • q_msg_enable_time: Время задержки сообщения.

F.48.2.2.3. Функции для чтения сообщений #

pgpro_queue предоставляет следующие функции для чтения сообщений из очереди.

CREATE FUNCTION READ_MESSAGE( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_XML( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_ANY( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null, OUT q_jsonb jsonb, OUT q_xml xml ) RETURNS SETOF RECORD #

Извлекает сообщение из очереди в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.

  • q_name: Имя очереди.

  • q_msg_hfilter: Фильтр заголовков, применяемый дополнительно к стандартному фильтру.

  • q_msg_pfilter: Фильтр свойств, применяемый дополнительно к стандартному фильтру.

  • q_jsonb: Возвращаемое значение тела сообщения в формате JSON.

  • q_xml: Возвращаемое значение тела сообщения в формате XML.

CREATE FUNCTION READ_MESSAGE_BY_ID( IN q_name name, IN msgid bigint ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML( IN q_name name, IN msgid bigint ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY( IN q_name name, IN msgid bigint, OUT body jsonb, OUT body_xml xml ) RETURNS SETOF RECORD #

Извлекает конкретное сообщение из очереди по его ID в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.

  • q_name: Имя очереди.

  • msgid: ID сообщения.

  • body: Возвращаемое значение тела сообщения в формате JSON.

  • body_xml: Возвращаемое значение тела сообщения в формате XML.

F.48.3. Авторы #

Postgres Professional, Москва, Россия