3.3. Запросы

Когда все операции миграции были выполнены успешно, можно проверить, как выполняются запросы в распределённой схеме.

3.3.1. Запрос q1

Запрос q1 довольно прост, он возвращает бронирование с указанным номером:

SELECT *
  FROM bookings.bookings b
WHERE b.book_ref = '0824C5';

Для обычной СУБД PostgreSQL и ключа сегментирования ticket_no этот запрос выполняется сравнительно быстро. Скорость запроса для ключа сегментирования book_ref зависит от сегмента, в котором он выполняется. Если он выполняется в сегменте, где физически нет данных, Shardman отправляет запрос в другой сегмент, что вызывает задержку по времени из-за сетевого взаимодействия.

3.3.2. Запрос q2

В результате запроса q2 возвращаются все билеты из указанного бронирования:

SELECT t.*
FROM bookings.bookings b
JOIN bookings.tickets t
  ON t.book_ref = b.book_ref
WHERE b.book_ref = '0824C5';

С помощью ключа сегментирования book_ref запрос передаётся в сегменты, и глобальная таблица объединяется с секциями сегментированной таблицы:

Foreign Scan (actual rows=2 loops=1)
  Relations: (bookings_2_fdw b) INNER JOIN (tickets_2_fdw t)
  Network: FDW bytes sent=433 received=237

Посмотрите на план запроса для ключа сегментирования ticket_no:

Append (actual rows=2 loops=1)
  Network: FDW bytes sent=1263 received=205
  ->  Nested Loop (actual rows=1 loops=1)
        ->  Seq Scan on tickets_0 t_1 (actual rows=1 loops=1)
              Filter: (book_ref = '0824C5'::bpchar)
              Rows Removed by Filter: 207092
        ->  Index Only Scan using bookings_pkey on bookings b (actual rows=1 loops=1)
              Index Cond: (book_ref = '0824C5'::bpchar)
              Heap Fetches: 0
  ->  Async Foreign Scan (actual rows=1 loops=1)
        Relations: (tickets_1_fdw t_2) INNER JOIN (bookings b)
        Network: FDW bytes sent=421 received=205
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (tickets_2_fdw t_3) INNER JOIN (bookings b)
        Network: FDW bytes sent=421
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (tickets_3_fdw t_4) INNER JOIN (bookings b)
        Network: FDW bytes sent=421

План содержит узлы Async Foreign Scan, что означает сетевой обмен данными между узлом-источником запроса и сегментами, то есть данные получаются из сегментов и окончательная обработка выполняется на узле-источнике запроса.

Посмотрите на строку Network. Хорошим критерием оптимальности выполнения запросов в сегментах может быть значение received. Чем меньше это значение, тем лучше сегменты справляются с выполнением распределённых запросов. Большая часть обработки выполняется удалённо, и на узел-источник запроса возвращается уже готовый для дальнейшей обработки результат.

Вариант с ключом сегментирования book_ref выглядит значительно лучше, так как этот ключ уже присутствует в таблице с номерами билетов.

План запроса, выполняемого на произвольном узле, будет таким:

Foreign Scan (actual rows=2 loops=1)
  Relations: (bookings_2_fdw b) INNER JOIN (tickets_2_fdw t)
  Network: FDW bytes sent=433 received=237

Происходит сетевой обмен только с одним сегментом, в котором выполняется запрос. Это сегмент shard-3 и секция tickets_2 таблицы tickets на четвёртом узле.

Если этот запрос выполняется в сегменте, где физически присутствуют данные, то скорость выполнения будет ещё выше.

Посмотрите на план:

Nested Loop (actual rows=2 loops=1)
    ->  Index Only Scan using bookings_2_pkey on bookings_2
    ->  Bitmap Heap Scan on tickets_2
          ->  Bitmap Index Scan on tickets_2_book_ref_idx

В данном случае нет необходимости в сетевом обмене данными, так как запрашиваемый данные находятся в том же сегменте, в котором выполняется запрос.

В некоторых случаях, выбор сегмента для запуска запроса имеет смысл. Зная как работает функция распределения, можно реализовать данную логику на уровне приложения и на основании ключа сегментирования отправлять некоторые запросы сразу в сегмент, где содержатся требуемые данные.

3.3.3. Запрос q3

В результате запроса q3 находятся все рейсы для одного из билетов в выбранном ранее бронировании:

SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf
  ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no = '0005435126781';

Чтобы выбрать конкретный фрагмент для выполнения запроса, как описано в Подраздел 3.3.2 обратите внимание, что при использовании ключа сегментирования ticket_no выполнение запроса будет более оптимальным в сегменте, содержащем секцию с данными. Планировщик знает, что сегмент содержит все данные, необходимые для объединения таблиц, поэтому никакого сетевого взаимодействия между сегментами не происходит.

Для ключа сегментирования book_ref обратите внимание, что по номеру бронирования можно вычислить номер билета и запросить его сразу с «правильного» сегмента.

В таком случае запрос выглядит следующим образом:

SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf
  ON tf.ticket_no = t.ticket_no
  AND t.book_ref = tf.book_ref
WHERE t.ticket_no = '0005435126781'
AND tf.book_ref = '0824C5';

Запрос выполняется медленнее в сегменте, не содержащем секцию с искомыми данными:

Foreign Scan (actual rows=6 loops=1)
  Relations: (tickets_1_fdw t) INNER JOIN (ticket_flights_1_fdw tf)
  Network: FDW bytes sent=434 received=369

Сетевая связь между сегментами присутствует в плане, так как он содержит узел Foreign Scan.

Важность указания в запросе ключа сегментирования, в случае с book_ref, можно проиллюстрировать таким запросом:

SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf
  ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no = '0005435126781'
AND tf.book_ref = '0824C5';

Здесь ключ сегментирования намеренно не указан в join. Взглянем на план:

Nested Loop (actual rows=6 loops=1)
  Network: FDW bytes sent=1419 received=600
  ->  Foreign Scan on ticket_flights_2_fdw tf (actual rows=6 loops=1)
        Network: FDW bytes sent=381 received=395
  ->  Append (actual rows=1 loops=6)
        Network: FDW bytes sent=1038 received=205
        ->  Seq Scan on tickets_0 t_1 (actual rows=0 loops=6)
              Filter: (ticket_no = '0005435126781'::bpchar)
              Rows Removed by Filter: 207273
        ->  Async Foreign Scan on tickets_1_fdw t_2 (actual rows=0 loops=6)
              Network: FDW bytes sent=346 received=205
        ->  Async Foreign Scan on tickets_2_fdw t_3 (actual rows=1 loops=6)
              Network: FDW bytes sent=346
        ->  Async Foreign Scan on tickets_3_fdw t_4 (actual rows=0 loops=6)
              Network: FDW bytes sent=346

В отличие от предыдущих примеров заметны отклонения. Здесь запрос выполнялся на всех узлах, причём индекс при этом не использовался, поэтому для того, чтобы вернуть всего 6 строк, Shardman был вынужден просканировать последовательно секции таблицы tickets целиком, вернуть результат источнику запроса, после чего выполнить соединение (join) с таблицей ticket_flights. Наличие узлов Async Foreign Scan говорит о том, что в сегментах выполняется последовательное сканирование таблицы tickets.

3.3.4. Запрос q4

В результате данного запроса возвращаются все перелёты по всем билетам указанным в бронировании. Есть несколько вариантов, как его выполнить: включить в условии WHERE подзапрос с указанием номера бронирования, перечислить явно в конструкции IN номера билетов или использовать конструкцию WHERE... OR. Ниже показано, как будут работать все перечисленные варианты.

SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf
  ON tf.ticket_no = t.ticket_no
WHERE t.ticket_no IN (
   SELECT t.ticket_no
     FROM bookings.bookings b
     JOIN bookings.tickets  t
       ON t.book_ref = b.book_ref
    WHERE b.book_ref = '0824C5'
);

Данный пример — попытка выполнить запрос из нераспределённой БД. Но он выполняется одинаково плохо для обоих вариантов ключей сегментирования.

План запроса будет примерно таким:

Hash Join (actual rows=12 loops=1)
  Hash Cond: (tf.ticket_no = t.ticket_no)
  ->  Append (actual rows=2360335 loops=1)
        ->  Async Foreign Scan on ticket_flights_0_fdw tf_1 (actual rows=589983 loops=1)
        ->  Async Foreign Scan on ticket_flights_1_fdw tf_2 (actual rows=590175 loops=1)
        ->  Seq Scan on ticket_flights_2 tf_3 (actual rows=590174 loops=1)
        ->  Async Foreign Scan on ticket_flights_3_fdw tf_4 (actual rows=590003 loops=1)
  ->  Hash (actual rows=2 loops=1)
        Buckets: 1024  Batches: 1  Memory Usage: 9kB
        ->  Hash Semi Join (actual rows=2 loops=1)
              Hash Cond: (t.ticket_no = t_5.ticket_no)
              ->  Append (actual rows=829071 loops=1)
                    ->  Async Foreign Scan on tickets_0_fdw t_1 (actual rows=207273 loops=1)
                    ->  Async Foreign Scan on tickets_1_fdw t_2 (actual rows=207058 loops=1)
                    ->  Seq Scan on tickets_2 t_3 (actual rows=207431 loops=1)
                    ->  Async Foreign Scan on tickets_3_fdw t_4 (actual rows=207309 loops=1)
              ->  Hash (actual rows=2 loops=1)
                    Buckets: 1024  Batches: 1  Memory Usage: 9kB
                    ->  Nested Loop (actual rows=2 loops=1)
                          ->  Index Only Scan using tickets_2_pkey on tickets_2 t_5
                          ->  Materialize (actual rows=1 loops=2)
                                ->  Index Only Scan using bookings_2_pkey on bookings_2 b
 

Такой план говорит о том, что Shardman справился с подзапросом в WHERE, и был вынужден запросить все строки таблиц tickets и ticket_flights, после чего выполнить их обработку на узле источнике запроса, что крайне неэффективно. Попробуем другие варианты:

Для запроса с ключом сегментирования ticket_no:

    SELECT tf.*, t.*
    FROM bookings.tickets t
    JOIN bookings.ticket_flights tf
      ON tf.ticket_no = t.ticket_no
    WHERE t.ticket_no IN ('0005435126781','0005435126782');
    

план будет таким:

    Append (actual rows=12 loops=1)
      Network: FDW bytes sent=1098 received=1656
      ->  Async Foreign Scan (actual rows=6 loops=1)
            Relations: (tickets_0_fdw t_1) INNER JOIN (ticket_flights_0_fdw tf_1)
            Network: FDW bytes sent=549 received=1656
      ->  Async Foreign Scan (actual rows=6 loops=1)
            Relations: (tickets_1_fdw t_2) INNER JOIN (ticket_flights_1_fdw tf_2)
            Network: FDW bytes sent=549
    

Этот план эффективнее, запрос выполняется на двух сегментах из четырёх, и требуется лишь выполнить Append для полученных результатов.

Вновь обратим внимание, что book_ref присутствует в обеих таблицах — tickets и ticket_flights. Запрос для ключа сегментирования book_ref:

SELECT tf.*, t.*
FROM bookings.tickets t
JOIN bookings.ticket_flights tf
ON tf.ticket_no = t.ticket_no
AND tf.book_ref = t.book_ref
WHERE t.book_ref = '0824C5';

при этом план запроса:

Foreign Scan (actual rows=12 loops=1)
  Relations: (tickets_2_fdw t) INNER JOIN (ticket_flights_2_fdw tf)
  Network: FDW bytes sent=547 received=1717

Это отличный результат: запрос был модифицирован для эффективного выполнения в распределённой схеме.

3.3.5. Запрос q5

Это небольшой аналитический запрос, который возвращает имена и номера билетов тех пассажиров, которые прошли регистрацию первыми.

SELECT t.passenger_name, t.ticket_no
FROM bookings.tickets t
JOIN bookings.boarding_passes bp
  ON bp.ticket_no = t.ticket_no
GROUP BY t.passenger_name, t.ticket_no
HAVING max(bp.boarding_no) = 1
AND count(*) > 1;

Данный запрос выполняется одинаково медленно для обоих вариантов ключей сегментирования. План запроса с ключом сегментирования book_ref выглядит так:

HashAggregate (actual rows=424 loops=1)
  Group Key: t.ticket_no
  Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
  Batches: 85  Memory Usage: 4265kB  Disk Usage: 112008kB
  Rows Removed by Filter: 700748
  Network: FDW bytes sent=1215 received=77111136
  ->  Append (actual rows=1894295 loops=1)
        Network: FDW bytes sent=1215 received=77111136
        ->  Async Foreign Scan (actual rows=473327 loops=1)
              Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
              Network: FDW bytes sent=404 received=813128
        ->  Async Foreign Scan (actual rows=472632 loops=1)
              Relations: (tickets_1_fdw t_2) INNER JOIN (boarding_passes_1_fdw bp_2)
              Network: FDW bytes sent=404
        ->  Async Foreign Scan (actual rows=475755 loops=1)
              Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
              Network: FDW bytes sent=407
        ->  Hash Join (actual rows=472581 loops=1)
              Hash Cond: (bp_4.ticket_no = t_4.ticket_no)
              Network: FDW bytes received=28841344
              ->  Seq Scan on boarding_passes_3 bp_4 (actual rows=472581 loops=1)
              ->  Hash (actual rows=207118 loops=1)
                    Buckets: 65536  Batches: 4  Memory Usage: 3654kB
                    Network: FDW bytes received=9176680
                    ->  Seq Scan on tickets_3 t_4 (actual rows=207118 loops=1)
                          Network: FDW bytes received=9176680

Обратите внимание на довольно большой объём передачи данных по сети между сегментами. Попробуйте улучшить запрос, добавив book_ref как дополнительное условие для соединения таблиц:

SELECT t.passenger_name, t.ticket_no
FROM bookings.tickets t
JOIN bookings.boarding_passes bp
  ON bp.ticket_no = t.ticket_no
  AND bp.book_ref=t.book_ref -- <= added book_ref
GROUP BY t.passenger_name, t.ticket_no
HAVING max(bp.boarding_no) = 1
AND count(*) > 1;

Посмотрите на план запроса:

GroupAggregate (actual rows=424 loops=1)
  Group Key: t.passenger_name, t.ticket_no
  Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
  Rows Removed by Filter: 700748
  Network: FDW bytes sent=1424 received=77092816
  ->  Merge Append (actual rows=1894295 loops=1)
        Sort Key: t.passenger_name, t.ticket_no
        Network: FDW bytes sent=1424 received=77092816
        ->  Foreign Scan (actual rows=472757 loops=1)
              Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
              Network: FDW bytes sent=472 received=2884064
        ->  Sort (actual rows=472843 loops=1)
              Sort Key: t_2.passenger_name, t_2.ticket_no
              Sort Method: external merge  Disk: 21152kB
              Network: FDW bytes received=22753536
              ->  Hash Join (actual rows=472843 loops=1)
                    Hash Cond: ((bp_2.ticket_no = t_2.ticket_no) AND (bp_2.book_ref = t_2.book_ref))
                    Network: FDW bytes received=22753536
                    ->  Seq Scan on boarding_passes_1 bp_2 (actual rows=472843 loops=1)
                    ->  Hash (actual rows=207058 loops=1)
                          Buckets: 65536  Batches: 8  Memory Usage: 2264kB
                          Network: FDW bytes received=22753536
                          ->  Seq Scan on tickets_1 t_2 (actual rows=207058 loops=1)
                                Network: FDW bytes received=22753536
        ->  Foreign Scan (actual rows=474715 loops=1)
              Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
              Network: FDW bytes sent=476 received=2884120
        ->  Foreign Scan (actual rows=473980 loops=1)
              Relations: (tickets_3_fdw t_4) INNER JOIN (boarding_passes_3_fdw bp_4)
              Network: FDW bytes sent=476 received=25745384

Ситуация заметно улучшилась, результат был получен на узле-источнике запроса был получен, и затем произведена окончательная фильтрация, группировка и объединение данных.

Для ключа сегментирования ticket_no план исходного запроса выглядит так:

HashAggregate (actual rows=424 loops=1)
  Group Key: t.ticket_no
  Filter: ((max(bp.boarding_no) = 1) AND (count(*) > 1))
  Batches: 85  Memory Usage: 4265kB  Disk Usage: 111824kB
  Rows Removed by Filter: 700748
  Network: FDW bytes sent=1188 received=77103620
  ->  Append (actual rows=1894295 loops=1)
        Network: FDW bytes sent=1188 received=77103620
        ->  Async Foreign Scan (actual rows=473327 loops=1)
              Relations: (tickets_0_fdw t_1) INNER JOIN (boarding_passes_0_fdw bp_1)
              Network: FDW bytes sent=394
        ->  Hash Join (actual rows=472632 loops=1)
              Hash Cond: (bp_2.ticket_no = t_2.ticket_no)
              Network: FDW bytes received=77103620
              ->  Seq Scan on boarding_passes_1 bp_2 (actual rows=472632 loops=1)
              ->  Hash (actual rows=206712 loops=1)
                    Buckets: 65536  Batches: 4  Memory Usage: 3654kB
                    Network: FDW bytes received=23859576
                    ->  Seq Scan on tickets_1 t_2 (actual rows=206712 loops=1)
                          Network: FDW bytes received=23859576
        ->  Async Foreign Scan (actual rows=475755 loops=1)
              Relations: (tickets_2_fdw t_3) INNER JOIN (boarding_passes_2_fdw bp_3)
              Network: FDW bytes sent=397
        ->  Async Foreign Scan (actual rows=472581 loops=1)
              Relations: (tickets_3_fdw t_4) INNER JOIN (boarding_passes_3_fdw bp_4)
              Network: FDW bytes sent=397

Видно, что соединение данных таблиц выполняется в сегментах, а на узле-источнике запроса выполняется фильтрация, группировка и агрегирование полученных данных. В данном случае не нужно изменять исходный запрос.

3.3.6. Запрос q6

В результате данного запроса для каждого билета, забронированного неделю назад от текущего момента, выводятся входящие в него перелёты вместе с запасом времени на пересадку на следующий рейс.

SELECT tf.ticket_no,f.departure_airport,
      f.arrival_airport,f.scheduled_arrival,
      lead(f.scheduled_departure) OVER w AS next_departure,
      lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
FROM bookings.bookings b
JOIN bookings.tickets t
 ON t.book_ref = b.book_ref
JOIN bookings.ticket_flights tf
 ON tf.ticket_no = t.ticket_no
JOIN bookings.flights f
 ON tf.flight_id = f.flight_id
WHERE b.book_date = bookings.now()::date - INTERVAL '7 day'

WINDOW w AS (
PARTITION BY tf.ticket_no
ORDER BY f.scheduled_departure);

Для этого запроса тип столбца book_date из timestamptz должен быть приведён к типу date. При приведении типа PostgreSQL приводит тип данных столбца к типу данных, указанному в условии фильтрации, но не наоборот. Поэтому Shardman сначала должен получить все данные из других сегментов, привести тип, и только после этого применить правило фильтрации. План запроса будет выглядеть примерно так:

WindowAgg (actual rows=26 loops=1)
  Network: FDW bytes sent=1750 received=113339240
  ->  Sort (actual rows=26 loops=1)
        Sort Key: tf.ticket_no, f.scheduled_departure
        Sort Method: quicksort  Memory: 27kB
        Network: FDW bytes sent=1750 received=113339240
        ->  Append (actual rows=26 loops=1)
              Network: FDW bytes sent=1750 received=113339240
              ->  Hash Join (actual rows=10 loops=1)
                    Hash Cond: (t_1.book_ref = b.book_ref)
                    Network: FDW bytes sent=582 received=37717376
              ->  Hash Join (actual rows=6 loops=1)
                    Hash Cond: (t_2.book_ref = b.book_ref)
                    Network: FDW bytes sent=582 received=37700608
              ->  Hash Join (actual rows=2 loops=1)
                    Hash Cond: (t_3.book_ref = b.book_ref)
                    Network: FDW bytes sent=586 received=37921256
              ->  Nested Loop (actual rows=8 loops=1)
                    ->  Nested Loop (actual rows=8 loops=1)
                          ->  Hash Join (actual rows=2 loops=1)
                                Hash Cond: (t_4.book_ref = b.book_ref)
                                ->  Seq Scan on tickets_3 t_4 (actual rows=207118 loops=1)
                    ->  Index Scan using flights_pkey on flights f (actual rows=1 loops=8)
                          Index Cond: (flight_id = tf_4.flight_id)

Обратите внимание на количество байт, полученных от других сегментов кластера и на последовательное сканирование таблицы tickets. Попробуем переписать запрос так, чтобы можно было избежать приведения.

Интервал будет вычисляться не на уровне базы данных, а на уровне приложения, в запросу сразу будут передаваться данные типа timestamptz. Кроме того, может помочь создание дополнительного индекса:

CREATE INDEX if not exists bookings_date_idx ON bookings.bookings(book_date);

Запрос с ключом сегментирования book_ref будет выглядеть так:

SELECT tf.ticket_no,f.departure_airport,
      f.arrival_airport,f.scheduled_arrival,
      lead(f.scheduled_departure) OVER w AS next_departure,
      lead(f.scheduled_departure) OVER w - f.scheduled_arrival AS gap
FROM bookings.bookings b
JOIN bookings.tickets t
 ON t.book_ref = b.book_ref
JOIN bookings.ticket_flights tf
 ON tf.ticket_no = t.ticket_no
AND tf.book_ref = t.book_ref -- <= added book_ref
JOIN bookings.flights f
 ON tf.flight_id = f.flight_id
WHERE b.book_date = '2016-10-06 14:00:00+00'
WINDOW w AS (
PARTITION BY tf.ticket_no
ORDER BY f.scheduled_departure);

У данный запроса будет уже другой план:

WindowAgg (actual rows=18 loops=1)
  Network: FDW bytes sent=2268 received=892
  ->  Sort (actual rows=18 loops=1)
        Sort Key: tf.ticket_no, f.scheduled_departure
        Sort Method: quicksort  Memory: 26kB
        Network: FDW bytes sent=2268 received=892
        ->  Append (actual rows=18 loops=1)
              Network: FDW bytes sent=2268 received=892
              ->  Nested Loop (actual rows=4 loops=1)
                    ->  Nested Loop (actual rows=4 loops=1)
                          ->  Nested Loop (actual rows=1 loops=1)
                                ->  Bitmap Heap Scan on bookings_0 b_1
                                      Heap Blocks: exact=1
                                      ->  Bitmap Index Scan on bookings_0_book_date_idx
                                ->  Index Only Scan using tickets_0_pkey on tickets_0 t_1
                                      Index Cond: (book_ref = b_1.book_ref)
                                      Heap Fetches: 0
                          ->  Index Only Scan using ticket_flights_0_pkey on ticket_flights_0 tf_1
                                Heap Fetches: 0
                    ->  Index Scan using flights_pkey on flights f (actual rows=1 loops=4)
                          Index Cond: (flight_id = tf_1.flight_id)
              ->  Async Foreign Scan (actual rows=14 loops=1)
                    Network: FDW bytes sent=754 received=892
              ->  Async Foreign Scan (actual rows=0 loops=1)
                    Network: FDW bytes sent=757 -- received=0!
              ->  Async Foreign Scan (actual rows=0 loops=1)
                    Network: FDW bytes sent=757 -- received=0!

Он гораздо эффективнее. Во-первых, таблица не сканируется целиком, используется Index Only Scan. Во-вторых, видно, насколько снизился объём переданных по сети данных между узлами.

3.3.7. Запрос q7

Допустим, нужна статистика по количеству пассажиров на одно бронирование. Для её получения сначала посчитаем количество пассажиров в каждом бронировании, а затем — количество бронирований с каждым вариантом количества пассажиров.

SELECT tt.cnt, count(*)
FROM (
   SELECT count(*) cnt
   FROM bookings.tickets t
   GROUP BY t.book_ref
   ) tt
GROUP BY tt.cnt
ORDER BY tt.cnt;

В результате данного запроса будут обработаны все данные таблиц tickets и bookings, поэтому не получится избежать интенсивного обмена данными по сети между сегментами кластера. Также обратите внимание, что значение параметра work_mem должно быть достаточно большим во избежание использования диска при соединении таблиц. Измените значение параметра work_mem в кластере:

shardmanctl set work_mem='256MB';

План запроса с ключом сегментирования ticket_no:

GroupAggregate (actual rows=5 loops=1)
  Group Key: tt.cnt
  Network: FDW bytes sent=798 received=18338112
  ->  Sort (actual rows=593433 loops=1)
        Sort Key: tt.cnt
        Sort Method: quicksort  Memory: 57030kB
        Network: FDW bytes sent=798 received=18338112
        ->  Subquery Scan on tt (actual rows=593433 loops=1)
              Network: FDW bytes sent=798 received=18338112
              ->  Finalize HashAggregate (actual rows=593433 loops=1)
                    Group Key: t.book_ref
                    Batches: 1  Memory Usage: 81953kB
                    Network: FDW bytes sent=798 received=18338112
                    ->  Append (actual rows=763806 loops=1)
                          Network: FDW bytes sent=798 received=18338112
                          ->  Async Foreign Scan (actual rows=190886 loops=1)
                                Relations: Aggregate on (tickets_0_fdw t)
                                Network: FDW bytes sent=266 received=1558336
                          ->  Async Foreign Scan (actual rows=190501 loops=1)
                                Relations: Aggregate on (tickets_1_fdw t_1)
                                Network: FDW bytes sent=266
                          ->  Async Foreign Scan (actual rows=191589 loops=1)
                                Relations: Aggregate on (tickets_2_fdw t_2)
                                Network: FDW bytes sent=266
                          ->  Partial HashAggregate (actual rows=190830 loops=1)
                                Group Key: t_3.book_ref
                                Batches: 1  Memory Usage: 36881kB
                                Network: FDW bytes received=4981496
                                ->  Seq Scan on tickets_3 t_3 (actual rows=207118 loops=1)
                                      Network: FDW bytes received=4981496

План запроса с ключом сегментирования book_ref:

Sort (actual rows=5 loops=1)
  Sort Key: (count(*))
  Sort Method: quicksort  Memory: 25kB
  Network: FDW bytes sent=798 received=14239951
  ->  HashAggregate (actual rows=5 loops=1)
        Group Key: (count(*))
        Batches: 1  Memory Usage: 40kB
        Network: FDW bytes sent=798 received=14239951
        ->  Append (actual rows=593433 loops=1)
              Network: FDW bytes sent=798 received=14239951
              ->  GroupAggregate (actual rows=148504 loops=1)
                    Group Key: t.book_ref
                    ->  Index Only Scan using tickets_0_book_ref_idx on tickets_0 t (rows=207273)
                          Heap Fetches: 0
              ->  Async Foreign Scan (actual rows=148256 loops=1)
                    Relations: Aggregate on (tickets_1_fdw t_1)
                    Network: FDW bytes sent=266 received=1917350
              ->  Async Foreign Scan (actual rows=148270 loops=1)
                    Relations: Aggregate on (tickets_2_fdw t_2)
                    Network: FDW bytes sent=266
              ->  Async Foreign Scan (actual rows=148403 loops=1)
                    Relations: Aggregate on (tickets_3_fdw t_3)
                    Network: FDW bytes sent=266

Видно, что планы запросов различаются прежде всего порядком соединения таблиц и вычислением агрегатов.

Для ключа сегментирования ticket_no принимаются все частично агрегированные данные объединяемых таблиц (17 Мб), а вся остальная обработка выполняется на узле-источнике запроса.

Для ключа сегментирования book_ref, поскольку он включён в запрос, большая часть вычислений агрегатов выполняется на узлах, и на узел-источник запроса возвращается только результат (13 МБ), который затем финально обрабатывается.

3.3.8. Запрос q8

Этот запрос отвечает на следующие вопросы: какие сочетания имён и фамилий встречаются чаще всего и как понять какую долю от числа всех пассажиров составляют такие сочетания? Для получения результата в запросе используется оконная функция:

SELECT passenger_name,
      round( 100.0 * cnt / sum(cnt) OVER (), 2)
   AS percent
FROM (
 SELECT passenger_name,
        count(*) cnt
 FROM bookings.tickets
 GROUP BY passenger_name
) t
ORDER BY percent DESC;

В обоих случаях план запроса будет выглядеть примерно так:

Sort (actual rows=27909 loops=1)
  Sort Key: (round(((100.0 * ((count(*)))::numeric) / sum((count(*))) OVER (?)), 2)) DESC
  Sort Method: quicksort  Memory: 3076kB
  Network: FDW bytes sent=816 received=2376448
  ->  WindowAgg (actual rows=27909 loops=1)
        Network: FDW bytes sent=816 received=2376448
        ->  Finalize HashAggregate (actual rows=27909 loops=1)
              Group Key: tickets.passenger_name
              Batches: 1  Memory Usage: 5649kB
              Network: FDW bytes sent=816 received=2376448
              ->  Append (actual rows=74104 loops=1)
                    Network: FDW bytes sent=816 received=2376448
                    ->  Partial HashAggregate (actual rows=18589 loops=1)
                          Group Key: tickets.passenger_name
                          Batches: 1  Memory Usage: 2833kB
                          ->  Seq Scan on tickets_0 tickets (actual rows=207273 loops=1)
                    ->  Async Foreign Scan (actual rows=18435 loops=1)
                          Relations: Aggregate on (tickets_1_fdw tickets_1)
                          Network: FDW bytes sent=272 received=2376448
                    ->  Async Foreign Scan (actual rows=18567 loops=1)
                          Relations: Aggregate on (tickets_2_fdw tickets_2)
                          Network: FDW bytes sent=272
                    ->  Async Foreign Scan (actual rows=18513 loops=1)
                          Relations: Aggregate on (tickets_3_fdw tickets_3)
                          Network: FDW bytes sent=272

В плане запроса видно, что предварительная обработка данных, соединение таблиц и частичное агрегирование выполняется в сегментах, а финальная обработка — на узле-источнике запроса.

3.3.9. Запрос q9

Данный запрос отвечает на следующие вопросы: кто летел позавчера рейсом Москва (SVO) — Новосибирск (OVB) на месте 1A, и когда он забронировал себе билет? «Позавчера» считается от функции booking.now, а не от текущей даты. Исходный запрос в нераспределённой схеме выглядит так:

SELECT
   t.passenger_name,
   b.book_date v
FROM bookings b
JOIN tickets t ON
   t.book_ref = b.book_ref
JOIN boarding_passes bp
   ON bp.ticket_no = t.ticket_no
JOIN flights f ON
   f.flight_id = bp.flight_id
WHERE f.departure_airport = 'SVO'
AND f.arrival_airport = 'OVB'
AND f.scheduled_departure::date = bookings.now()::date - INTERVAL '2 day'
AND bp.seat_no = '1A';

Как уже объяснялось в описании Запроса q6, использование INTERVAL вызовет приведение типов. Поэтому следует сразу от него избавиться и переписать запрос для ключа сегментирования book_ref следующим образом:

SELECT
   t.passenger_name,
   b.book_date v
FROM bookings b
JOIN tickets t ON
   t.book_ref = b.book_ref
JOIN boarding_passes bp
   ON bp.ticket_no = t.ticket_no
   AND bp.book_ref = b.book_ref -- <= added book_ref
JOIN flights f ON
   f.flight_id = bp.flight_id
WHERE f.departure_airport = 'SVO'
AND f.arrival_airport = 'OVB'
AND f.scheduled_departure
  BETWEEN '2016-10-11 14:00:00+00' AND '2016-10-13 14:00:00+00'
AND bp.seat_no = '1A';

Также создайте пару вспомогательных индексов:

CREATE INDEX idx_boarding_passes_seats
     ON boarding_passes((seat_no::text));
CREATE INDEX idx_flights_sched_dep
     ON flights(departure_airport,arrival_airport,scheduled_departure);

В результате должен получиться достаточно эффективный план запроса:

Append (actual rows=1 loops=1)
  Network: FDW bytes sent=2484 received=102
  ->  Nested Loop (actual rows=1 loops=1)
        Join Filter: (bp_1.ticket_no = t_1.ticket_no)
        Rows Removed by Join Filter: 1
        ->  Nested Loop (actual rows=1 loops=1)
              ->  Hash Join (actual rows=1 loops=1)
                    Hash Cond: (bp_1.flight_id = f.flight_id)
                    ->  Bitmap Heap Scan on boarding_passes_0 bp_1 (actual rows=4919 loops=1)
                          Recheck Cond: ((seat_no)::text = '1A'::text)
                          Heap Blocks: exact=2632
                          ->  Bitmap Index Scan on boarding_passes_0_seat_no_idx (actual rows=4919)
                                Index Cond: ((seat_no)::text = '1A'::text)
                    ->  Hash (actual rows=2 loops=1)
                          Buckets: 1024  Batches: 1  Memory Usage: 9kB
                          ->  Bitmap Heap Scan on flights f (actual rows=2 loops=1)
                                Recheck Cond:
                     ((departure_airport = 'SVO'::bpchar) AND (arrival_airport = 'OVB'::bpchar) AND
                     (scheduled_departure >= '2016-10-11 14:00:00+00'::timestamp with time zone) AND
                     (scheduled_departure < '2016-10-13 14:00:00+00'::timestamp with time zone))
                                Heap Blocks: exact=2
                                ->  Bitmap Index Scan on idx_flights_sched_dep (actual rows=2 loops=1)
                                      Index Cond:
                                   ((departure_airport = 'SVO'::bpchar) AND
                                   (arrival_airport = 'OVB'::bpchar) AND
                       (scheduled_departure >= '2016-10-11 14:00:00+00'::timestamp with time zone) AND
                       (scheduled_departure <= '2016-10-13 14:00:00+00'::timestamp with time zone))
              ->  Index Scan using bookings_0_pkey on bookings_0 b_1 (actual rows=1 loops=1)
                    Index Cond: (book_ref = bp_1.book_ref)
        ->  Index Scan using tickets_0_book_ref_idx on tickets_0 t_1 (actual rows=2 loops=1)
              Index Cond: (book_ref = b_1.book_ref)
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (((boarding_passes_1_fdw bp_2) INNER JOIN (flights f)) INNER JOIN (tickets_1_fdw t_2)) INNER JOIN (bookings_1_fdw b_2)
        Network: FDW bytes sent=826 received=68
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (((boarding_passes_2_fdw bp_3) INNER JOIN (flights f)) INNER JOIN (tickets_2_fdw t_3)) INNER JOIN (bookings_2_fdw b_3)
        Network: FDW bytes sent=829 received=34
  ->  Async Foreign Scan (actual rows=0 loops=1)
        Relations: (((boarding_passes_3_fdw bp_4) INNER JOIN (flights f)) INNER JOIN (tickets_3_fdw t_4)) INNER JOIN (bookings_3_fdw b_4)
        Network: FDW bytes sent=829

В данном плане видно, что вся работа по соединению таблиц была выполнена в сегментах, на узел-источник запроса вернулся уже готовый результат, не содержащий строк, так как данные были локализованы в том единственном сегменте, где выполнялся запрос.

Если бы данный запрос выполнялся в другом сегменте, план был бы таким же, но данные для итоговой обработки результата были бы получены от сегмента, содержащего данные.