F.53. pgpro_queue — управление очередями сообщений #
Расширение pgpro_queue включено в состав Postgres Pro Enterprise как стандартное расширение для управления очередями сообщений. Оно обеспечивает надёжное взаимодействие между приложениями с использованием очередей в базе данных.
pgpro_queue позволяет управлять очередями сообщений непосредственно в базе данных. С помощью pgpro_queue можно создавать очереди, добавлять в них сообщения и эффективно обрабатывать их, обеспечивая целостность и надёжность данных. Сообщение из очереди обрабатывается одним из потребителей. Полученное из очереди сообщение становится недоступным для других потребителей.
pgpro_queue также поддерживает управление очередями сообщений для подтранзакций и автономных транзакций, даже если они имеют несколько уровней вложенности.
Благодаря тому, что pgpro_queue обеспечивает интегрированное в базу данных управление очередями сообщений, стандартные возможности, такие как восстановление транзакций, перезапуск после сбоя и синхронизация с резервным сервером, полностью поддерживаются.
Сообщения в очереди обрабатываются на основе назначенных приоритетов, что позволяет обрабатывать сообщения с более высоким приоритетом раньше остальных, даже если они поступили позже.
Примечание
Расширение pgpro_queue может быть использовано только для транзакций, запущенных на уровне изоляции READ COMMITTED.
Диаграмма ниже демонстрирует практический пример запроса на создание файла в формате PDF. Вместо немедленной обработки такие запросы могут быть отправлены в очередь, что позволяет эффективно справляться с большим объёмом задач, улучшать пользовательский опыт благодаря асинхронной обработке и обеспечивать большую гибкость за счёт раздельной архитектуры.
Рисунок F.1. Пример создания PDF-файла
Помимо возможностей базового управления очередями сообщений, описанных выше, расширение pgpro_queue также использует точки обмена, чтобы обеспечить эффективное распределение сообщений по модели «издатель-подписчик». Согласно данной модели сообщения сначала публикуются в точке обмена — компоненте, который принимает сообщения, а затем направляет их в одну или несколько очередей. Маршрутизация определяется следующими элементами:
привязки: связи между точкой обмена и очередью
тип точки обмена: алгоритм маршрутизации
ключи маршрутизации: атрибуты сообщения, которые определяют адресата или категорию
Этот метод позволяет отправлять сообщения нескольким получателям одновременно вместо отправки сообщений напрямую в конкретные очереди, предназначенные для отдельных получателей. Он также позволяет получателям выборочно получать сообщения из определённых очередей путём создания подписок.
F.53.1. Установка и удаление #
Расширение pgpro_queue включено в состав Postgres Pro Enterprise. Установив Postgres Pro Enterprise, выполните следующие действия, чтобы подготовить pgpro_queue:
Добавьте
pgpro_queueв параметр shared_preload_libraries в файлеpostgresql.conf:shared_preload_libraries = 'pgpro_queue'
Создайте расширение
pgpro_queue, выполнив следующий запрос:CREATE EXTENSION pgpro_queue;
Будет создан набор API-функций в текущей схеме.
Инициализируйте расширение:
SELECT queue_initialize();
Эта функция создаёт выделенную схему с именем
pgpro_queue_data, в которой хранятся все служебные объекты, такие как таблицы метаданных, представления и таблицы для очередей. При инициализации объекты очереди отделяются от самого расширения, что обеспечивает корректную репликацию и выгрузку базы данных.
Чтобы удалить все объекты, созданные расширением, выполните следующую команду (доступно только суперпользователям):
SELECT queue_deinitialize();
Эта функция выполняет каскадное удаление схемы pgpro_queue_data.
F.53.2. Особенности для подготовленных транзакций #
При использовании pgpro_queue для подготовленных транзакций обратите внимание на следующие особенности:
Если транзакция, которая вызывает функцию
READ_MESSAGE(), подготавливается для двухфазной фиксации с использованиемPREPARE TRANSACTION, соответствующее сообщение блокируется в таблице очереди, пока вы не выполните командуCOMMIT PREPAREDилиROLLBACK PREPARED. Другими словами, сообщение будет показываться в таблице очереди, но не будет доступно для дальнейших операций чтения.Для pgpro_queue команда
COMMIT PREPAREDработает так же, как и обычная командаCOMMIT. Если сообщение было успешно прочитано в подготовленной транзакции, то это сообщение удаляется из таблицы очереди после выполнения командыCOMMIT PREPARED.В отличие от команды
ROLLBACK, командаROLLBACK PREPAREDне включает логику управления повторными попытками обработки сообщения, прочитанного в подготовленной транзакции. Вместо этогоROLLBACK PREPAREDтолько разблокирует сообщение в таблице очереди.
F.53.3. Справка #
F.53.3.1. Параметры конфигурации #
pgpro_queue.database_with_managed_retry(text) #Указывает список имён баз данных, разделённых запятыми, для которых включена логика управления повторными попытками. Если имя базы данных отсутствует в списке, указано неправильно или используется недопустимый разделитель, все сообщения, неудачно обработанные в рамках транзакции (то есть отправленные на повторную попытку выполнения при откате), будут удалены, независимо от параметров очереди или сообщения. Будьте осторожны, когда задаёте этот параметр, потому что некорректное изменение может вызвать потерю сообщений из очереди повторных запросов. Значение по умолчанию —
postgres. Этот параметр можно установить как при запуске сервера, так и при перезагрузке конфигурации.pgpro_queue.shared_retry_list_size(integer) #Указывает размер хеш-таблицы в общей памяти для хранения списка повторных запросов (Shared Retry Pending List). Этот параметр в сущности определяет максимальное количество запросов на повторную попытку, которые процессы пользователя могут одновременно ставить в очередь для обработки процессом запуска. Значение по умолчанию — 10000.
F.53.3.2. Функции #
F.53.3.2.1. Процедуры управления очередями #
pgpro_queue предоставляет следующие процедуры для управления очередями, которые может использовать только суперпользователь, создавший расширение pgpro_queue.
-
CREATE PROCEDURE CREATE_QUEUE( IN queue_name name, IN queue_type char DEFAULT 'N'::char, IN queue_dlq name DEFAULT null, IN queue_retries int DEFAULT 10, IN queue_retry_delay int DEFAULT 30 )# Создаёт очередь в существующей таблице очереди. Владелец очереди может ограничить доступ других пользователей к очереди, отказав им в доступе к таблице очереди на уровне списка управления доступом или выполнив процедуры
reset_queue_accessиrevoke_queue_access. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию.queue_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания, и его длина не должна превышать 54 символа.queue_type: Тип новой очереди. Допустимые значения:Nдля обычной очереди (по умолчанию) иDдля очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)queue_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)queue_retries: Максимальное количество повторных попыток. Значение по умолчанию — 10.queue_retry_delay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.
-
CREATE PROCEDURE ALTER_QUEUE( IN queue_name name, IN new_type char DEFAULT null, IN new_dlq name DEFAULT null, IN new_retries int DEFAULT null, IN new_retrydelay int DEFAULT null )# Изменяет параметры существующей очереди. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию. Это изменение не повлияет на уже существующие сообщения. Изменять параметры очереди может только создавший её владелец.
queue_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.new_type: Новый тип очереди. Допустимые значения:Nдля обычной очереди (по умолчанию) иDдля очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)new_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)new_retries: Максимальное количество повторных попыток.new_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.
-
CREATE PROCEDURE DROP_QUEUE(IN queue_name name)# Удаляет очередь с указанным именем.
-
CREATE FUNCTION GET_QUEUE_TABLE(IN queue_name name)# Возвращает OID таблицы очереди, используемой для очереди.
-
CREATE PROCEDURE RESET_QUEUE_ACCESS(IN queue_name name)# Ограничивает доступ к очереди для всех пользователей, кроме владельца очереди. Эту процедуру может выполнить только владелец очереди. Если суперпользователь, который создал расширение, удалён, процедура не выполняется.
-
CREATE PROCEDURE GRANT_QUEUE_ACCESS( IN queue_name name, IN role_name name )
CREATE PROCEDURE GRANT_QUEUE_ACCESS( IN queue_name name, IN role_oid oid )# Предоставляет указанной роли доступ к очереди.
queue_name: Имя очереди.role_name: Имя роли.role_oid: OID роли.
-
CREATE PROCEDURE REVOKE_QUEUE_ACCESS( IN queue_name name, IN role_name name )
CREATE PROCEDURE REVOKE_QUEUE_ACCESS( IN queue_name name, IN role_oid oid )# Отзывает право доступа к очереди у указанной роли.
F.53.3.2.2. Процедуры для вставки сообщений #
pgpro_queue предоставляет следующие процедуры для отправки сообщений в очередь. Не рекомендуется совмещать строки сообщений в форматах XML и JSONB в одной и той же очереди.
-
CREATE PROCEDURE INSERT_MESSAGE( IN q_name name, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE INSERT_MESSAGE_XML( IN q_name name, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )# Вставляет в очередь сообщение в формате JSON/XML.
q_name: Имя очереди.q_msg_body: Строка сообщения (JSONB или XML).q_msg_priority: Приоритет сообщения. Чем ниже значение, тем выше приоритет. Значение по умолчанию — 0, что означает высший приоритет.q_msg_properties: Свойств по умолчанию нет.q_msg_retries: Максимальное количество повторных попыток. Если не указано, используется значение по умолчанию из очереди.q_msg_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK. Если не указано, используется значение по умолчанию из очереди.q_msg_enable_time: Время задержки сообщения.
F.53.3.2.3. Функции для чтения сообщений #
pgpro_queue предоставляет следующие функции для чтения сообщений из очереди.
-
CREATE FUNCTION READ_MESSAGE( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS jsonb
CREATE FUNCTION READ_MESSAGE_XML( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS xml
CREATE FUNCTION READ_MESSAGE_ANY( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS RECORD# Извлекает сообщение из очереди в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.
q_name: Имя очереди.q_msg_hfilter: Фильтр заголовков, применяемый дополнительно к стандартному фильтру.q_msg_pfilter: Фильтр свойств, применяемый дополнительно к стандартному фильтру.q_jsonb: Возвращаемое значение тела сообщения в формате JSON.q_xml: Возвращаемое значение тела сообщения в формате XML.
-
CREATE FUNCTION READ_MESSAGE_BY_ID( IN q_name name, IN msgid bigint ) RETURNS jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML( IN q_name name, IN msgid bigint ) RETURNS xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY( IN q_name name, IN msgid bigint ) RETURNS RECORD# Извлекает конкретное сообщение из очереди по его ID в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.
q_name: Имя очереди.msgid: ID сообщения.body: Возвращаемое значение тела сообщения в формате JSON.body_xml: Возвращаемое значение тела сообщения в формате XML.
F.53.3.2.4. Функции для управления точками обмена #
pgpro_queue предоставляет следующие функции и процедуры для управления точками обмена.
-
CREATE FUNCTION CREATE_EXCHANGE( IN e_name name, IN e_type text )# Создаёт точку обмена с указанным именем и типом. Владелец точки обмена может ограничить доступ других пользователей к ней с помощью процедуры
revoke_publish_access.e_name: Имя точки обмена, которое может содержать только буквы, цифры и подчёркивания, его длина не должна превышать 63 символа.e_type: тип новой точки обмена. Возможные значения:fanout(по умолчанию): направляет сообщения во все привязанные очереди.direct: направляет сообщения в очереди по ключу маршрутизации.
-
CREATE FUNCTION DROP_EXCHANGE( IN e_name name, IN do_cascade bool )# Удаляет указанную точку обмена.
e_name: Имя точки обмена.do_cascade: Если установлено значениеtrue, удаляются также все привязанные очереди. Если установлено значениеfalse, функция завершится ошибкой, если к точке обмена привязаны какие-либо очереди.
-
CREATE FUNCTION BIND_QUEUE( IN q_name name, IN e_name name, IN e_key text )# Привязывает очередь к точке обмена. Для привязки к точкам обмена типа
directиспользуются ключи маршрутизации. Одну очередь можно привязать к нескольким точкам обмена.Примечание
Если очередь удаляется функцией DROP_QUEUE(), автоматически удаляются и все её привязки к точкам обмена.
q_name: Имя очереди.e_name: Имя точки обмена.e_key: Ключ маршрутизации для привязки. Значение NULL (по умолчанию) используется только для точек обмена типаfanout.
-
CREATE FUNCTION UNBIND_QUEUE( IN q_name name, IN e_name name, IN e_key text )# Удаляет привязку между очередью и точкой обмена. Если указан ключ маршрутизации, из привязки удаляется только этот ключ. При удалении последнего ключа удаляется привязка очереди.
q_name: Имя очереди.e_name: Имя точки обмена.e_key: Ключ маршрутизации, который необходимо удалить. Если задано значениеNULL(по умолчанию), привязка очереди удаляется полностью.
-
CREATE PROCEDURE GRANT_PUBLISH_ACCESS( IN exchange_name name, IN role_name name )# Предоставляет указанной роли права
INSERT,UPDATEиDELETEна точку обмена и таблицу привязок.exchange_name: Имя точки обмена.role_name: Имя роли.
-
CREATE PROCEDURE REVOKE_PUBLISH_ACCESS( IN exchange_name name, IN role_name name )# Отзывает у указанной роли все права на точку обмена и таблицу привязок.
exchange_name: Имя точки обмена.role_name: Имя роли.
F.53.3.2.5. Процедуры для публикации сообщений в точках обмена #
pgpro_queue предоставляет следующие процедуры для публикации сообщений в точках обмена. Не рекомендуется совмещать строки сообщений в форматах XML и JSONB в одной и той же очереди.
-
CREATE PROCEDURE PUBLISH_MESSAGE( IN e_name name, IN e_routing_key text, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE PUBLISH_MESSAGE_XML( IN e_name name, IN e_routing_key text, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )# Публикует в точке обмена сообщение в формате JSON/XML.
e_name: Имя точки обмена.e_routing_key: Ключ маршрутизации.q_msg_body: Строка сообщения (JSONB или XML).q_msg_priority: Приоритет сообщения. Чем ниже значение, тем выше приоритет. Значение по умолчанию — 0, что означает высший приоритет.q_msg_properties: Свойства сообщения, например, название кодировки или информация по маршрутизации на уровне приложения. По умолчанию никакие свойства не заданы.q_msg_retries: Максимальное количество повторных попыток. Если не указано, используется значение по умолчанию из очереди.q_msg_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполненияROLLBACK. Если не указано, используется значение по умолчанию из очереди.q_msg_enable_time: Время задержки сообщения.
F.53.4. Авторы #
Postgres Professional, Москва, Россия