24.2. Интеграция с Elasticsearch и Elastic APM #

Интеграция с внешними источниками данных Elasticsearch используется для чтения журналов, записываемых туда утилитой pgpro-otel-collector.

Интеграция включает в себя компоненты, перечисленные ниже.

pgpro-otel-collector #

Агент мониторинга, предоставляющий следующую функциональность:

  • собирает журналы активности с экземпляров СУБД Postgres Pro

  • передаёт журналы активности в Elastic APM для Elasticsearch

Elastic APM Elasticsearch #

Система мониторинга производительности приложений на основе Elastic Stack, предоставляющая следующую функциональность:

  • принимает данные от агента мониторинга и преобразует их в формат документа ES

  • отправляет преобразованные данные в Elasticsearch

Elasticsearch #

Система хранения журналов активности, предоставляющая следующую функциональность:

  • принимает журналы активности от системы мониторинга производительности приложений

  • хранит журналы активности согласно внутренним параметрам хранения

  • предоставляет интерфейс для получения журналов активности

PPEM #

Система Postgres Pro Enterprise Manager, предоставляющая следующую функциональность:

  • обращается к системе Elasticsearch для получения журналов активности экземпляров СУБД

  • предоставляет пользователю интерфейс мониторинга в виде текстовых данных на основе журналов активности

Процесс интеграции состоит из следующих этапов:

Дополнительная настройка агента не требуется.

24.2.1. Настройка Elastiсsearch #

  1. Установите сервер Elastic APM по стандартной документации.

  2. Интегрируйте сервер Elastic APM с Elastiсsearch по стандартной документации.

  3. Настройте ingest-конвейер (pipeline) pgpro-otel-collector.

    Это необходимо для совместимости полей документов (журналов) со схемой именования полей Elastiсsearch Common Schema (ECS).

    Пример настройки конвейера (оба запроса следует последовательно выполнить в Kibana Developer Tools):

    PUT _ingest/pipeline/postgrespro-otelcol-enrich-logs
    {
      "description": "Enrich PostgresPro Otel collector logs",
      "processors": [
        {
          "rename": {
            "if": "ctx?.labels?.message != null",
            "field": "labels.message",
            "target_field": "message",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.pid != null",
            "field": "labels.pid",
            "target_field": "process.pid",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.error_severity != null",
            "field": "labels.error_severity",
            "target_field": "log.level",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.user != null",
            "field": "labels.user",
            "target_field": "user.name",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.session_start != null",
            "field": "labels.session_start",
            "target_field": "session.start_time",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.session_id != null",
            "field": "labels.session_id",
            "target_field": "session.id",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.numeric_labels?.tx_id != null",
            "field": "numeric_labels.tx_id",
            "target_field": "transaction.id",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.log_file_name != null",
            "field": "labels.log_file_name",
            "target_field": "log.file.path",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "rename": {
            "if": "ctx?.labels?.dbname != null",
            "field": "labels.dbname",
            "target_field": "db.name",
            "ignore_failure": true,
            "ignore_missing": false,
            "override": true
          }
        },
        {
          "gsub": {
            "if": "ctx?.service?.node?.name != null",
            "field": "service.node.name",
            "target_field": "host.name",
            "pattern": ":.+$",
            "replacement": "",
            "ignore_failure": true,
            "ignore_missing": false
          }
        },
        {
          "remove": {
            "field": [
              "observer.version",
              "observer.hostname",
              "service.language.name"
            ],
            "ignore_failure": true
          }
        },
        {
          "remove": {
            "field": "agent.version",
            "if": "ctx?.agent?.version == \"unknown\"",
            "ignore_failure": true
          }
        }
      ]
    }
    PUT _ingest/pipeline/logs-apm.app@custom
    {
      "processors": [
        {
          "pipeline": {
            "name": "postgrespro-otelcol-enrich-logs"
          }
        }
      ]
    }

24.2.2. Настройка pgpro-otel-collector для Elastiсsearch #

  1. Включите и настройте ресивер filelog.

    Пример настройки ресивера для сценария, когда журналы PostgreSQL генерируются в формате JSON:

    receivers:
      filelog:
        include:
        - /var/log/postgresql/*.json
        operators:
        - parse_ints: true
          timestamp:
            layout: '%Y-%m-%d %H:%M:%S.%L %Z'
            layout_type: strptime
            parse_from: attributes.timestamp
          type: json_parser
        - field: attributes.timestamp
          type: remove
        retry_on_failure:
          enabled: true
          initial_interval: 1s
          max_elapsed_time: 5m
          max_interval: 30s
        start_at: end
  2. Настройте процессоры:

    processors:
      attributes/convert:
        actions:
        - action: convert
          converted_type: string
          key: query_id
        - action: convert
          converted_type: string
          key: pid
      resource:
        attributes:
        - action: upsert
          key: service.name
          value: postgresql
        - action: upsert
          key: service.instance.id
          value: postgresql-01.example.org:5432

    Где:

    • service.name — ключ для именования потока данных (data stream) и, как следствие, индексов.

    • service.instance.id — ключ для идентификации экземпляра.

    • Для журналов в формате JSON обязательно конвертировать поле query_id в строку, так как числовое значение некорректно отображается в ES.

    Важно

    Для хранения данных используются так называемые потоки данных (data streams). Целевой поток выбирается автоматически и имеет форматlogs-apm.app.service.name-namespace.

    Значение service.name указывается при настройке коллектора в списке processors.resource.attributes элементом key: service.name.

    Значение namespace определяется элементом с ключом service.environment. В приведённой настройке он не передаётся, поэтому подставляется значение по умолчанию default. Таким образом, при использовании приведённой настройки журналы активности будут храниться в потоке с именем logs-apm.app.postgresql-default.

  3. Настройте отправку журналов через otlphttpexporter и настройте конвейер:

    exporters:
      otlphttp/elastic_logs:
        compression: gzip
        endpoint: https://elasticsearch-apm.example.org
        tls:
          insecure_skip_verify: false
    
    service:
      extensions: []
      pipelines:
        logs:
          receivers:
          - filelog
          processors:
          - resource
          - attributes/convert
          exporters:
          - otlphttp/elastic_logs
  4. Запустите коллектор и проверьте публикацию метрик на стороне коллектора:

    # systemctl start pgpro-otel-collector
    
    # systemctl status pgpro-otel-collector
    ● pgpro-otel-collector.service - PostgresPro OpenTelemetry Collector
        Loaded: loaded (/lib/systemd/system/pgpro-otel-collector.service; enabled; preset: enabled)
        Active: active (running) since Thu 2025-03-20 01:18:08 MSK; 4h 13min ago
      Main PID: 6991 (pgpro-otel-coll)
        Tasks: 8 (limit: 3512)
        Memory: 119.3M
          CPU: 2min 49.311s
        CGroup: /system.slice/pgpro-otel-collector.service
                └─6991 /usr/bin/pgpro-otel-collector --config /etc/pgpro-otel-collector/basic.yml
    
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.366656,"msg":"Setting up own telemetry..."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.367178,"msg":"Skipped telemetry setup."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.3679142,"msg":"Development component. May change in the future.","kind":"receiver","name":"postgrespro","data_type":"metrics"}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.3494158,"caller":"envprovider@v1.16.0/provider.go:59","msg":"Configuration references unset environment variable","name":"POSTGRESQL_P>
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481084,"msg":"Starting pgpro-otel-collector...","Version":"v0.5.0","NumCPU":1}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4481149,"msg":"Starting extensions..."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"warn","ts":1742422688.4483361,"msg":"Using the 0.0.0.0 address exposes this server to every network interface, which may facilitate Denial of Service attack>
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.4515307,"msg":"Starting stanza receiver","kind":"receiver","name":"filelog","data_type":"logs"}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.451749,"msg":"Everything is ready. Begin running and processing data."}
    Mar 20 01:18:08 postgresql-01.example.org pgpro-otel-collector[6991]: {"level":"info","ts":1742422688.6523068,"msg":"Started watching file","kind":"receiver","name":"filelog","data_type":"logs","component":"fileconsumer","path":"/var/log/postgresql/postgresql-2025-03-20.json"}

24.2.3. Проверка наличия журналов в Elasticsearch #

  • После настройки отправки журналов из pgpro-otel-collector убедитесь, что система Elasticsearch получает метрики.

    Для проверки можно выполнить запрос к хранилищу с помощью утилиты curl.

    Пример запроса:

    curl -s -XGET "https://elasticsearch.example.org:9200/logs-apm.app.postgresql-default/_search?size=10" -H 'Content-Type: application/json' -d'
    {
      "_source": ["message","service.node.name","@timestamp"],
      "sort": [
        { "@timestamp": "desc" }
      ],
      "query": {
            "bool": {
                "filter": [
                    { "term":{"service.node.name":"postgresql-01.example.org:5432" }}]
            }
        }
    }'

    Где:

    • https://elasticsearch.example.org:9200 — URL системы хранения журналов.

    • logs-apm.app.postgresql-default — имя потока данных (data stream) для поиска.

    • size=10 ограничивает количество возвращаемых журналов.

    • "_source": ["message","service.node.name","@timestamp"] — запрашиваемые поля.

    Пример ответа:

    {
      "took": 18,
      "timed_out": false,
      "_shards": {
        "total": 11,
        "successful": 11,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 10000,
          "relation": "gte"
        },
        "max_score": null,
        "hits": [
          {
            "_index": ".ds-logs-apm.app.postgresql-default-2025.03.19-000379",
            "_id": "qmuArJUB2PKtie47RffA",
            "_score": null,
            "_source": {
              "message": "checkpoint complete: wrote 2038 buffers (16.6%); 0 WAL file(s) added, 0 removed, 10 recycled; write=269.563 s, sync=1.192 s, total=270.962 s; sync files=246, longest=0.677 s, average=0.005 s; distance=162419 kB, estimate=174180 kB; lsn=6/62000850, redo lsn=6/583C4DD8",
              "@timestamp": "2025-03-19T03:44:01.336Z",
              "service": {
                "node": {
                  "name": "postgresql-01.example.org:5432"
                }
              }
            },
            "sort": [
              1742355841336
            ]
          }
        ]
      }
    }

24.2.4. Настройка источника данных журналов #

  1. В навигационной панели перейдите в ИнфраструктураИсточники данныхХранилища сообщений.

  2. В правом верхнем углу страницы нажмите Создать хранилище.

  3. Укажите параметры хранилища журналов (помеченные звёздочкой параметры являются обязательными):

    • Система хранения сообщений: тип системы хранения журналов.

      Выберите Elasticsearch.

    • Название: уникальное имя хранилища журналов. Например, Elasticsearch.

    • URL: сетевой адрес для подключения к хранилищу журналов. Например, https://elasticsearch.example.org.

    • Elasticsearch index: Имя индекса (потока) для поисковых запросов.

      Укажите logs-apm.app.postgresql-default.

    • Пользователь: уникальное имя пользователя, если используется аутентификация.

    • Пароль: пароль пользователя, если используется аутентификация.

    • Описание: описание хранилища журналов.

    • Сделать источником данных по умолчанию: указывает, следует ли использовать хранилище журналов по умолчанию для всех запросов, связанных с получением журналов активности.

24.2.5. Проверка работы хранилища журналов #

  1. В навигационной панели перейдите в МониторингЖурнал сообщений.

    Отобразится таблица журналов.

  2. Чтобы сбросить фильтры, нажмите Сбросить всё над таблицей.

  3. Проверьте, что новые журналы активности отображаются в таблице.