F.48. pgpro_queue — управление очередями сообщений #
Расширение pgpro_queue включено в состав Postgres Pro Enterprise как стандартное расширение для управления очередями сообщений. Оно обеспечивает надёжное взаимодействие между приложениями с использованием очередей в базе данных.
pgpro_queue позволяет управлять очередями сообщений непосредственно в базе данных. С помощью pgpro_queue можно создавать очереди, добавлять в них сообщения и эффективно обрабатывать их, обеспечивая целостность и надёжность данных. Сообщение из очереди обрабатывается одним из потребителей. Полученное из очереди сообщение становится недоступным для других потребителей.
Благодаря тому, что pgpro_queue обеспечивает интегрированное в базу данных управление очередями сообщений, стандартные возможности, такие как восстановление транзакций, перезапуск после сбоя и синхронизация с резервным сервером, полностью поддерживаются.
Сообщения в очереди обрабатываются на основе назначенных приоритетов, что позволяет обрабатывать сообщения с более высоким приоритетом раньше остальных, даже если они поступили позже.
Диаграмма ниже демонстрирует практический пример запроса на создание файла в формате PDF. Вместо немедленной обработки такие запросы могут быть отправлены в очередь, что позволяет эффективно справляться с большим объёмом задач, улучшать пользовательский опыт благодаря асинхронной обработке и обеспечивать большую гибкость за счёт раздельной архитектуры.
Рисунок F.1. Пример создания PDF-файла
F.48.1. Установка и удаление #
Расширение pgpro_queue включено в состав Postgres Pro Enterprise. Установив Postgres Pro Enterprise, выполните следующие действия, чтобы подготовить pgpro_queue:
Добавьте
pgpro_queue
в параметр shared_preload_libraries в файлеpostgresql.conf
:shared_preload_libraries = 'pgpro_queue'
Создайте расширение
pgpro_queue
, выполнив следующий запрос:CREATE EXTENSION pgpro_queue;
Будет создан набор API-функций в текущей схеме.
Инициализируйте расширение:
SELECT pgpro_queue_initialize();
Эта функция создаёт выделенную схему с именем
pgpro_queue_data
, в которой хранятся все служебные объекты, такие как таблицы метаданных, представления и таблицы для очередей. При инициализации объекты очереди отделяются от самого расширения, что обеспечивает корректную репликацию и выгрузку базы данных.
Чтобы удалить все объекты, созданные расширением, выполните следующую команду (доступно только суперпользователям):
SELECT pgpro_queue_deinitialize();
Эта функция выполняет каскадное удаление схемы pgpro_queue_data
.
F.48.2. Справка #
F.48.2.1. Параметры конфигурации #
pgpro_queue.launcher_database
(text
) #Указывает имя базы данных, к которой подключается процесс запуска. Несуществующие базы данных и пустые строки не принимаются. Значение по умолчанию —
postgres
. Этот параметр можно установить только при запуске сервера.pgpro_queue.database_with_managed_retry
(text
) #Указывает список имён баз данных, разделённых запятыми, для которых включена логика управления повторными попытками. Если имя базы данных отсутствует в списке, указано неправильно или используется недопустимый разделитель, все сообщения, неудачно обработанные в рамках транзакции (то есть отправленные на повторную попытку выполнения при откате), будут удалены, независимо от параметров очереди или сообщения. Значение по умолчанию —
postgres
. Этот параметр можно установить как при запуске сервера, так и при перезагрузке конфигурации.pgpro_queue.shared_retry_list_size
(integer
) #Указывает размер хеш-таблицы в общей памяти для хранения списка повторных запросов (Shared Retry Pending List). Этот параметр в сущности определяет максимальное количество запросов на повторную попытку, которые процессы пользователя могут одновременно ставить в очередь для обработки процессом запуска. Значение по умолчанию — 10000.
F.48.2.2. Функции #
F.48.2.2.1. Процедуры управления очередями #
pgpro_queue предоставляет следующие процедуры для управления очередями.
-
CREATE PROCEDURE CREATE_QUEUE( IN q_name name, IN q_type char DEFAULT 'N'::char, IN q_dlq name DEFAULT null, IN q_retries int DEFAULT 10, IN q_retrydelay int DEFAULT 30 )
# Создаёт очередь в существующей таблице очереди. Владелец очереди может ограничить доступ других пользователей к очереди, отказав им в доступе к таблице очереди на уровне списка управления доступом. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию.
q_name
: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.q_type
: Тип новой очереди. Допустимые значения:N
для обычной очереди (по умолчанию) иD
для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)q_dlq
: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)q_retries
: Максимальное количество повторных попыток. Значение по умолчанию — 10.q_retrydelay
: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK
. Укажите 0, чтобы повторить обработку сообщения немедленно.
-
CREATE PROCEDURE ALTER_QUEUE( IN q_name name, IN new_type char DEFAULT null, IN new_dlq name DEFAULT null, IN new_retries int DEFAULT null, IN new_retrydelay int DEFAULT null )
# Изменяет параметры существующей очереди. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию. Это изменение не повлияет на уже существующие сообщения. Изменять параметры очереди может только создавший её владелец.
q_name
: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.new_type
: Новый тип очереди. Допустимые значения:N
для обычной очереди (по умолчанию) иD
для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)new_dlq
: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)new_retries
: Максимальное количество повторных попыток.new_retrydelay
: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK
. Укажите 0, чтобы повторить обработку сообщения немедленно.
-
CREATE PROCEDURE DROP_QUEUE(IN q_name name)
# Удаляет очередь с указанным именем.
-
CREATE FUNCTION GET_QUEUE_TABLE(IN q_name name)
# Возвращает OID таблицы очереди, используемой для очереди.
F.48.2.2.2. Процедуры для вставки сообщений #
pgpro_queue предоставляет следующие процедуры для отправки сообщений в очередь. Не рекомендуется совмещать строки сообщений в форматах XML и JSONB в одной и той же очереди.
-
CREATE PROCEDURE INSERT_MESSAGE( IN q_name name, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE INSERT_MESSAGE_XML( IN q_name name, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
# Вставляет в очередь сообщение в формате JSON/XML.
q_name
: Имя очереди.q_msg_body
: Строка сообщения (JSONB или XML).q_msg_priority
: Приоритет сообщения.q_msg_properties
: Свойств по умолчанию нет.q_msg_retries
: Максимальное количество повторных попыток. Если не указано, используется значение по умолчанию из очереди.q_msg_retrydelay
: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK
. Если не указано, используется значение по умолчанию из очереди.q_msg_enable_time
: Время задержки сообщения.
F.48.2.2.3. Функции для чтения сообщений #
pgpro_queue предоставляет следующие функции для чтения сообщений из очереди.
-
CREATE FUNCTION READ_MESSAGE( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_XML( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_ANY( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null, OUT q_jsonb jsonb, OUT q_xml xml ) RETURNS SETOF RECORD
# Извлекает сообщение из очереди в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.
q_name
: Имя очереди.q_msg_hfilter
: Фильтр заголовков, применяемый дополнительно к стандартному фильтру.q_msg_pfilter
: Фильтр свойств, применяемый дополнительно к стандартному фильтру.q_jsonb
: Возвращаемое значение тела сообщения в формате JSON.q_xml
: Возвращаемое значение тела сообщения в формате XML.
-
CREATE FUNCTION READ_MESSAGE_BY_ID( IN q_name name, IN msgid bigint ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML( IN q_name name, IN msgid bigint ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY( IN q_name name, IN msgid bigint, OUT body jsonb, OUT body_xml xml ) RETURNS SETOF RECORD
# Извлекает конкретное сообщение из очереди по его ID в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.
q_name
: Имя очереди.msgid
: ID сообщения.body
: Возвращаемое значение тела сообщения в формате JSON.body_xml
: Возвращаемое значение тела сообщения в формате XML.
F.48.3. Авторы #
Postgres Professional, Москва, Россия