Re: Minimal logical decoding on standbys

From: Andres Freund
Subject: Re: Minimal logical decoding on standbys
Message-ID: 20210406180231.qsnkyrgrm7gtxb73@alap3.anarazel.de
In reply to: Re: Minimal logical decoding on standbys  ("Drouvot, Bertrand" <bdrouvot@amazon.com>)
Responses: Re: Minimal logical decoding on standbys (3 replies)
List: pgsql-hackers

Hi,

On 2021-04-06 14:30:29 +0200, Drouvot, Bertrand wrote:
> From 827295f74aff9c627ee722f541a6c7cc6d4133cf Mon Sep 17 00:00:00 2001
> From: bdrouvotAWS <bdrouvot@amazon.com>
> Date: Tue, 6 Apr 2021 11:59:23 +0000
> Subject: [PATCH v15 1/5] Allow logical decoding on standby.
> 
> Allow a logical slot to be created on standby. Restrict its usage
> or its creation if wal_level on primary is less than logical.
> During slot creation, it's restart_lsn is set to the last replayed
> LSN. Effectively, a logical slot creation on standby waits for an
> xl_running_xact record to arrive from primary. Conflicting slots
> would be handled in next commits.
>
> Andres Freund and Amit Khandekar.

I think more people have worked on this by now...

Does this strike you as an accurate description?

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas

> --- a/src/backend/replication/logical/logical.c
> +++ b/src/backend/replication/logical/logical.c
> @@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
>                  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>                   errmsg("logical decoding requires a database connection")));
>  
> -    /* ----
> -     * TODO: We got to change that someday soon...
> -     *
> -     * There's basically three things missing to allow this:
> -     * 1) We need to be able to correctly and quickly identify the timeline a
> -     *      LSN belongs to
> -     * 2) We need to force hot_standby_feedback to be enabled at all times so
> -     *      the primary cannot remove rows we need.
> -     * 3) support dropping replication slots referring to a database, in
> -     *      dbase_redo. There can't be any active ones due to HS recovery
> -     *      conflicts, so that should be relatively easy.
> -     * ----
> -     */
>      if (RecoveryInProgress())
> -        ereport(ERROR,
> -                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
> -                 errmsg("logical decoding cannot be used while in recovery")));

Maybe I am just missing something right now, and maybe I'm being a bit
overly pedantic, but I don't immediately see how 0001 is correct without
0002 and 0003? I think it'd be better to first introduce the conflict
information, then check for conflicts, and only after that allow
decoding on standbys?


> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
> index 6f8810e149..6a21cba362 100644
> --- a/src/backend/access/transam/xlog.c
> +++ b/src/backend/access/transam/xlog.c
> @@ -5080,6 +5080,17 @@ LocalProcessControlFile(bool reset)
>      ReadControlFile();
>  }
>  
> +/*
> + * Get the wal_level from the control file. For a standby, this value should be
> + * considered as its active wal_level, because it may be different from what
> + * was originally configured on standby.
> + */
> +WalLevel
> +GetActiveWalLevel(void)
> +{
> +    return ControlFile->wal_level;
> +}
> +

This strikes me as error-prone - there's nothing in the function name
indicating that this should mainly (only?) be used during recovery...


> +        if (SlotIsPhysical(slot))
> +            restart_lsn = GetRedoRecPtr();
> +        else if (RecoveryInProgress())
> +        {
> +            restart_lsn = GetXLogReplayRecPtr(NULL);
> +            /*
> +             * Replay pointer may point one past the end of the record. If that
> +             * is a XLOG page boundary, it will not be a valid LSN for the
> +             * start of a record, so bump it up past the page header.
> +             */
> +            if (!XRecOffIsValid(restart_lsn))
> +            {
> +                if (restart_lsn % XLOG_BLCKSZ != 0)
> +                    elog(ERROR, "invalid replay pointer");
> +
> +                /* For the first page of a segment file, it's a long header */
> +                if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
> +                    restart_lsn += SizeOfXLogLongPHD;
> +                else
> +                    restart_lsn += SizeOfXLogShortPHD;
> +            }
> +        }

This seems like a layering violation to me. I don't think stuff like
this should be outside of xlog[reader].c, and definitely not in
ReplicationSlotReserveWal().

Relevant discussion (which totally escaped my mind):
https://postgr.es/m/CAJ3gD9csOr0LoYoMK9NnfBk0RZmvHXcJAFWFd2EuL%3DNOfz7PVA%40mail.gmail.com


> +        else
> +            restart_lsn = GetXLogInsertRecPtr();
> +
> +        SpinLockAcquire(&slot->mutex);
> +        slot->data.restart_lsn = restart_lsn;
> +        SpinLockRelease(&slot->mutex);
> +
>          if (!RecoveryInProgress() && SlotIsLogical(slot))
>          {
>              XLogRecPtr    flushptr;
>  
> -            /* start at current insert position */
> -            restart_lsn = GetXLogInsertRecPtr();
> -            SpinLockAcquire(&slot->mutex);
> -            slot->data.restart_lsn = restart_lsn;
> -            SpinLockRelease(&slot->mutex);
> -
>              /* make sure we have enough information to start */
>              flushptr = LogStandbySnapshot();
>  
>              /* and make sure it's fsynced to disk */
>              XLogFlush(flushptr);
>          }
> -        else
> -        {
> -            restart_lsn = GetRedoRecPtr();
> -            SpinLockAcquire(&slot->mutex);
> -            slot->data.restart_lsn = restart_lsn;
> -            SpinLockRelease(&slot->mutex);
> -        }
>  
>          /* prevent WAL removal as fast as possible */
>          ReplicationSlotsComputeRequiredLSN();

I think I'd move the LogStandbySnapshot() piece out of the entire
loop. There's no reason for logging multiple ones if we then just end up
failing because of the XLogGetLastRemovedSegno() check.


> diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
> index 178d49710a..6c4c26c2fe 100644
> --- a/src/include/access/heapam_xlog.h
> +++ b/src/include/access/heapam_xlog.h
> @@ -239,6 +239,7 @@ typedef struct xl_heap_update
>   */
>  typedef struct xl_heap_clean
>  {
> +    bool        onCatalogTable;
>      TransactionId latestRemovedXid;
>      uint16        nredirected;
>      uint16        ndead;
> @@ -254,6 +255,7 @@ typedef struct xl_heap_clean
>   */
>  typedef struct xl_heap_cleanup_info
>  {
> +    bool        onCatalogTable;
>      RelFileNode node;
>      TransactionId latestRemovedXid;
>  } xl_heap_cleanup_info;
> @@ -334,6 +336,7 @@ typedef struct xl_heap_freeze_tuple
>   */
>  typedef struct xl_heap_freeze_page
>  {
> +    bool        onCatalogTable;
>      TransactionId cutoff_xid;
>      uint16        ntuples;
>  } xl_heap_freeze_page;
> @@ -348,6 +351,7 @@ typedef struct xl_heap_freeze_page
>   */
>  typedef struct xl_heap_visible
>  {
> +    bool        onCatalogTable;
>      TransactionId cutoff_xid;
>      uint8        flags;
>  } xl_heap_visible;

Reminder to self: This needs a WAL version bump.

> diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
> index 9a3a03e520..3405070d63 100644
> --- a/src/include/utils/rel.h
> +++ b/src/include/utils/rel.h
> @@ -16,6 +16,7 @@
>  
>  #include "access/tupdesc.h"
>  #include "access/xlog.h"
> +#include "catalog/catalog.h"
>  #include "catalog/pg_class.h"
>  #include "catalog/pg_index.h"
>  #include "catalog/pg_publication.h"

Not clear why this is in this patch?



> diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
> index 5ba776e789..03c5dbea48 100644
> --- a/src/backend/postmaster/pgstat.c
> +++ b/src/backend/postmaster/pgstat.c
> @@ -2928,6 +2928,24 @@ pgstat_send_archiver(const char *xlog, bool failed)
>      pgstat_send(&msg, sizeof(msg));
>  }
>  
> +/* ----------
> + * pgstat_send_droplogicalslot() -
> + *
> + *    Tell the collector about a logical slot being dropped
> + *    due to conflict.
> + * ----------
> + */
> +void
> +pgstat_send_droplogicalslot(Oid dbOid)
> +{
> +    PgStat_MsgRecoveryConflict msg;
> +
> +    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
> +    msg.m_databaseid = dbOid;
> +    msg.m_reason = PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT;
> +    pgstat_send(&msg, sizeof(msg));
> +}

Why do we have this in addition to pgstat_report_replslot_drop()? ISTM
that we should instead add a reason parameter to
pgstat_report_replslot_drop()?
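
Roughly what I have in mind, as a standalone sketch rather than actual
pgstat code. The enum, the struct, and the sketch_ names are all
hypothetical; the real function would send a message to the stats
collector instead of bumping counters in a struct passed by the caller.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical drop reasons; not existing PostgreSQL identifiers. */
typedef enum SketchSlotDropReason
{
    SKETCH_SLOT_DROP_REQUESTED,         /* ordinary, user-requested drop */
    SKETCH_SLOT_DROP_RECOVERY_CONFLICT  /* dropped due to a recovery conflict */
} SketchSlotDropReason;

/* Stand-in for whatever the stats machinery would accumulate. */
typedef struct SketchSlotDropStats
{
    unsigned long drops_total;
    unsigned long drops_conflict;
} SketchSlotDropStats;

/*
 * One reporting entry point for all slot drops; the reason decides which
 * additional counter gets bumped.
 */
void
sketch_report_replslot_drop(SketchSlotDropStats *stats,
                            const char *slotname,
                            SketchSlotDropReason reason)
{
    assert(stats != NULL && slotname != NULL);

    stats->drops_total++;
    if (reason == SKETCH_SLOT_DROP_RECOVERY_CONFLICT)
        stats->drops_conflict++;
}
```

That way there is one place that knows about slot drops, and the conflict
case is just another caller passing a different reason.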


> +/*
> + * Resolve recovery conflicts with logical slots.
> + *
> + * When xid is valid, it means that rows older than xid might have been
> + * removed.

I don't think the past tense is correct - the rows better not be removed
yet on the standby, otherwise we'd potentially do something random in
decoding.


> diff --git a/src/test/recovery/t/024_standby_logical_decoding_xmins.pl b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
> new file mode 100644
> index 0000000000..d654d79526
> --- /dev/null
> +++ b/src/test/recovery/t/024_standby_logical_decoding_xmins.pl
> @@ -0,0 +1,272 @@
> +# logical decoding on a standby : ensure xmins are appropriately updated
> +
> +use strict;
> +use warnings;
> +
> +use PostgresNode;
> +use TestLib;
> +use Test::More tests => 23;
> +use RecursiveCopy;
> +use File::Copy;
> +use Time::HiRes qw(usleep);

Several of these don't actually seem to be used?


> +########################
> +# Initialize master node
> +########################

(I'll rename these to primary/replica)


> +$node_master->init(allows_streaming => 1, has_archiving => 1);
> +$node_master->append_conf('postgresql.conf', q{
> +wal_level = 'logical'
> +max_replication_slots = 4
> +max_wal_senders = 4
> +log_min_messages = 'debug2'
> +log_error_verbosity = verbose
> +# very promptly terminate conflicting backends
> +max_standby_streaming_delay = '2s'
> +});

Why is this done on the primary, rather than on the standby?


> +################################
> +# Catalog xmins should advance after standby logical slot fetches the changes.
> +################################
> +
> +# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
> +# we hold down xmin.

I don't know what that means.


> +$node_master->safe_psql('postgres', qq[CREATE TABLE catalog_increase_1();]);
> +$node_master->safe_psql('postgres', 'CREATE TABLE test_table(id serial primary key, blah text)');
> +for my $i (0 .. 2000)
> +{
> +    $node_master->safe_psql('postgres', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
> +}

Forking 2000 psql processes is pretty expensive, especially on slower
machines. What is this supposed to test?


> +($ret, $stdout, $stderr) = $node_standby->psql('postgres',
> +    qq[SELECT data FROM pg_logical_slot_get_changes('$standby_slotname', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts','1', 'include-timestamp', '0')]);
> +is($ret, 0, 'replay of big series succeeded');
> +isnt($stdout, '', 'replayed some rows');

Nothing is being replayed...



> +######################
> +# Upstream oldestXid should not go past downstream catalog_xmin
> +######################
> +
> +# First burn some xids on the master in another DB, so we push the master's
> +# nextXid ahead.
> +foreach my $i (1 .. 100)
> +{
> +    $node_master->safe_psql('postgres', 'SELECT txid_current()');
> +}
> +
> +# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
> +# past our needed xmin. The only way we have visibility into that is to force
> +# a checkpoint.
> +$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
> +foreach my $dbname ('template1', 'postgres', 'postgres', 'template0')
> +{
> +    $node_master->safe_psql($dbname, 'VACUUM FREEZE');
> +}
> +$node_master->safe_psql('postgres', 'CHECKPOINT');
> +IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
> +    or die "pg_controldata failed with $?";
> +my @checkpoint = split('\n', $stdout);
> +my $oldestXid = '';
> +foreach my $line (@checkpoint)
> +{
> +    if ($line =~ qr/^Latest checkpoint's oldestXID:\s+(\d+)/)
> +    {
> +        $oldestXid = $1;
> +    }
> +}
> +die 'no oldestXID found in checkpoint' unless $oldestXid;
> +
> +cmp_ok($oldestXid, "<=", $node_standby->slot($standby_slotname)->{'catalog_xmin'},
> +       'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
> +
> +$node_master->safe_psql('postgres',
> +    "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
> +

I am thinking of removing this test. It doesn't seem to test anything
really related to the issue at hand, and seems complicated (needing to
update datallowconn, manually triggering checkpoints, parsing
pg_controldata output).


> +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
> +# given boolean condition to be true to ensure we've reached a quiescent state
> +sub wait_for_xmins
> +{
> +    my ($node, $slotname, $check_expr) = @_;
> +
> +    $node->poll_query_until(
> +        'postgres', qq[
> +        SELECT $check_expr
> +        FROM pg_catalog.pg_replication_slots
> +        WHERE slot_name = '$slotname';
> +    ]) or die "Timed out waiting for slot xmins to advance";
> +}
> +
> +# Verify that pg_stat_database_conflicts.confl_logicalslot has been updated
> +sub check_confl_logicalslot
> +{
> +    ok( $node_standby->poll_query_until(
> +        'postgres',
> +        "select (confl_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
> +        'confl_logicalslot updated') or die "Timed out waiting confl_logicalslot to be updated";
> +}
> +

Given that this hardcodes a specific number of conflicting slots etc,
there doesn't seem to be much point in making this a function...


> +# Acquire one of the standby logical slots created by create_logical_slots()
> +sub make_slot_active
> +{
> +    my $slot_user_handle;
> +
> +    # make sure activeslot is in use
> +    print "starting pg_recvlogical\n";
> +    $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node_standby->connstr('testdb'), '-S', 'activeslot','-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
> +
> +    while (!$node_standby->slot('activeslot')->{'active_pid'})
> +    {
> +        usleep(100_000);
> +        print "waiting for slot to become active\n";
> +    }
> +    return $slot_user_handle;
> +}

It's a bad idea to not have timeouts in things like this - if there's a
problem, it'll lead to the test never returning. Things like
poll_query_until() have timeouts to deal with this, but this doesn't.


> +# Check if all the slots on standby are dropped. These include the 'activeslot'
> +# that was acquired by make_slot_active(), and the non-active 'dropslot'.
> +sub check_slots_dropped
> +{
> +    my ($slot_user_handle) = @_;
> +    my $return;
> +
> +    is($node_standby->slot('dropslot')->{'slot_type'}, '', 'dropslot on standby dropped');
> +    is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
> +
> +    # our client should've terminated in response to the walsender error
> +    eval {
> +        $slot_user_handle->finish;
> +    };
> +    $return = $?;
> +    cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero\n");
> +    if ($return) {
> +        like($stderr, qr/conflict with recovery/, 'recvlogical recovery conflict');
> +        like($stderr, qr/must be dropped/, 'recvlogical error detail');
> +    }

Why do we need to use eval{} for things like checking if a program
finished?


> @@ -297,6 +297,24 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
>       may consume changes from a slot at any given time.
>      </para>
>  
> +    <para>
> +     A logical replication slot can also be created on a hot standby. To prevent
> +     <command>VACUUM</command> from removing required rows from the system
> +     catalogs, <varname>hot_standby_feedback</varname> should be set on the
> +     standby. In spite of that, if any required rows get removed, the slot gets
> +     dropped. Existing logical slots on standby also get dropped if wal_level
> +     on primary is reduced to less than 'logical'.
> +    </para>

I think this should add that it's very advisable to use a physical slot
between primary and standby. Otherwise hot_standby_feedback will work,
but only while the connection is alive - as soon as it breaks, a node
gets restarted, ...

Greetings,

Andres Freund


