Re: Logical Replication WIP

From: Petr Jelinek
Subject: Re: Logical Replication WIP
Date:
Msg-id: 1798f7d3-a4e2-5c39-6bfe-b08b812da0f5@2ndquadrant.com
In response to: Re: Logical Replication WIP  (Andres Freund <andres@anarazel.de>)
List: pgsql-hackers
On 14/09/16 18:21, Andres Freund wrote:
> (continuing, uh, a bit happier)
>
> On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:
>
>> +/*
>> + * Relcache invalidation callback for our relation map cache.
>> + */
>> +static void
>> +logicalreprelmap_invalidate_cb(Datum arg, Oid reloid)
>> +{
>> +    LogicalRepRelMapEntry  *entry;
>> +
>> +    /* Just to be sure. */
>> +    if (LogicalRepRelMap == NULL)
>> +        return;
>> +
>> +    if (reloid != InvalidOid)
>> +    {
>> +        HASH_SEQ_STATUS status;
>> +
>> +        hash_seq_init(&status, LogicalRepRelMap);
>> +
>> +        /* TODO, use inverse lookup hastable? */
>
> *hashtable
>
>> +        while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
>> +        {
>> +            if (entry->reloid == reloid)
>> +                entry->reloid = InvalidOid;
>
> can't we break here?
>

Probably.
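
Something like this should do, I think (just a sketch; it assumes a given local relation appears at most once in the map, and note that hash_seq_term() is needed when leaving the seq scan early):

    while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
    {
        if (entry->reloid == reloid)
        {
            entry->reloid = InvalidOid;

            /* Done with the scan; release it before breaking out. */
            hash_seq_term(&status);
            break;
        }
    }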

>
>> +/*
>> + * Initialize the relation map cache.
>> + */
>> +static void
>> +remoterelmap_init(void)
>> +{
>> +    HASHCTL        ctl;
>> +
>> +    /* Make sure we've initialized CacheMemoryContext. */
>> +    if (CacheMemoryContext == NULL)
>> +        CreateCacheMemoryContext();
>> +
>> +    /* Initialize the hash table. */
>> +    MemSet(&ctl, 0, sizeof(ctl));
>> +    ctl.keysize = sizeof(uint32);
>> +    ctl.entrysize = sizeof(LogicalRepRelMapEntry);
>> +    ctl.hcxt = CacheMemoryContext;
>
> Wonder if this (and similar code earlier) should try to do everything in
> a sub-context of CacheMemoryContext instead. That'd make some issues
> easier to track down.

Sure, I don't see why not.
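
For example (a sketch only; the child context name is made up):

    static MemoryContext LogicalRepRelMapContext = NULL;

    if (LogicalRepRelMapContext == NULL)
        LogicalRepRelMapContext =
            AllocSetContextCreate(CacheMemoryContext,
                                  "LogicalRepRelMapContext",
                                  ALLOCSET_DEFAULT_SIZES);

    /* Initialize the hash table in the sub-context. */
    MemSet(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(uint32);
    ctl.entrysize = sizeof(LogicalRepRelMapEntry);
    ctl.hcxt = LogicalRepRelMapContext;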

>
>> +/*
>> + * Open the local relation associated with the remote one.
>> + */
>> +static LogicalRepRelMapEntry *
>> +logicalreprel_open(uint32 remoteid, LOCKMODE lockmode)
>> +{
>> +    LogicalRepRelMapEntry  *entry;
>> +    bool        found;
>> +
>> +    if (LogicalRepRelMap == NULL)
>> +        remoterelmap_init();
>> +
>> +    /* Search for existing entry. */
>> +    entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
>> +                        HASH_FIND, &found);
>> +
>> +    if (!found)
>> +        elog(FATAL, "cache lookup failed for remote relation %u",
>> +             remoteid);
>> +
>> +    /* Need to update the local cache? */
>> +    if (!OidIsValid(entry->reloid))
>> +    {
>> +        Oid            nspid;
>> +        Oid            relid;
>> +        int            i;
>> +        TupleDesc    desc;
>> +        LogicalRepRelation *remoterel;
>> +
>> +        remoterel = &entry->remoterel;
>> +
>> +        nspid = LookupExplicitNamespace(remoterel->nspname, false);
>> +        if (!OidIsValid(nspid))
>> +            ereport(FATAL,
>> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>> +                     errmsg("the logical replication target %s not found",
>> +                            quote_qualified_identifier(remoterel->nspname,
>> +                                                       remoterel->relname))));
>> +        relid = get_relname_relid(remoterel->relname, nspid);
>> +        if (!OidIsValid(relid))
>> +            ereport(FATAL,
>> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>> +                     errmsg("the logical replication target %s not found",
>> +                            quote_qualified_identifier(remoterel->nspname,
>> +                                                       remoterel->relname))));
>> +
>> +        entry->rel = heap_open(relid, lockmode);
>
> This seems rather racy. I think this really instead needs something akin
> to RangeVarGetRelidExtended().

Maybe, I am not sure if it really matters here given how it's used, but 
I can change that.
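
Something along these lines, perhaps (a sketch; it resolves and locks the name in one step instead of the LookupExplicitNamespace()/get_relname_relid()/heap_open() sequence):

    RangeVar   *rv;
    Oid         relid;

    rv = makeRangeVar(remoterel->nspname, remoterel->relname, -1);

    /* Resolve the name and take the lock, guarding against concurrent DDL. */
    relid = RangeVarGetRelid(rv, lockmode, true);
    if (!OidIsValid(relid))
        ereport(FATAL,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("the logical replication target %s not found",
                        quote_qualified_identifier(remoterel->nspname,
                                                   remoterel->relname))));

    /* Already locked above, so no extra lock here. */
    entry->rel = heap_open(relid, NoLock);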

>
>> +/*
>> + * Executor state preparation for evaluation of constraint expressions,
>> + * indexes and triggers.
>> + *
>> + * This is based on similar code in copy.c
>> + */
>> +static EState *
>> +create_estate_for_relation(LogicalRepRelMapEntry *rel)
>> +{
>> +    EState       *estate;
>> +    ResultRelInfo *resultRelInfo;
>> +    RangeTblEntry *rte;
>> +
>> +    estate = CreateExecutorState();
>> +
>> +    rte = makeNode(RangeTblEntry);
>> +    rte->rtekind = RTE_RELATION;
>> +    rte->relid = RelationGetRelid(rel->rel);
>> +    rte->relkind = rel->rel->rd_rel->relkind;
>> +    estate->es_range_table = list_make1(rte);
>> +
>> +    resultRelInfo = makeNode(ResultRelInfo);
>> +    InitResultRelInfo(resultRelInfo, rel->rel, 1, 0);
>> +
>> +    estate->es_result_relations = resultRelInfo;
>> +    estate->es_num_result_relations = 1;
>> +    estate->es_result_relation_info = resultRelInfo;
>> +
>> +    /* Triggers might need a slot */
>> +    if (resultRelInfo->ri_TrigDesc)
>> +        estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
>> +
>> +    return estate;
>> +}
>
> Ugh, we do this for every single change? That's pretty darn heavy.
>

I plan to add caching, but I haven't come up with a good way of doing that yet.

>
>> +/*
>> + * Check if the local attribute is present in relation definition used
>> + * by upstream and hence updated by the replication.
>> + */
>> +static bool
>> +physatt_in_attmap(LogicalRepRelMapEntry *rel, int attid)
>> +{
>> +    AttrNumber    i;
>> +
>> +    /* Fast path for tables that are same on upstream and downstream. */
>> +    if (attid < rel->remoterel.natts && rel->attmap[attid] == attid)
>> +        return true;
>> +
>> +    /* Try to find the attribute in the map. */
>> +    for (i = 0; i < rel->remoterel.natts; i++)
>> +        if (rel->attmap[i] == attid)
>> +            return true;
>> +
>> +    return false;
>> +}
>
> Shouldn't we rather try to keep an attribute map that always can map
> remote attribute numbers to local ones? That doesn't seem hard on a
> first blush? But I might be missing something here.
>




>
>> +static void
>> +FillSlotDefaults(LogicalRepRelMapEntry *rel, EState *estate,
>> +                 TupleTableSlot *slot)
>> +{
>
> Why is this using a different naming scheme?
>

Because I originally wanted to put it into the executor.

>> +/*
>> + * Handle INSERT message.
>> + */
>> +static void
>> +handle_insert(StringInfo s)
>> +{
>> +    LogicalRepRelMapEntry *rel;
>> +    LogicalRepTupleData    newtup;
>> +    LogicalRepRelId        relid;
>> +    EState               *estate;
>> +    TupleTableSlot       *remoteslot;
>> +    MemoryContext        oldctx;
>> +
>> +    ensure_transaction();
>> +
>> +    relid = logicalrep_read_insert(s, &newtup);
>> +    rel = logicalreprel_open(relid, RowExclusiveLock);
>> +
>> +    /* Initialize the executor state. */
>> +    estate = create_estate_for_relation(rel);
>> +    remoteslot = ExecInitExtraTupleSlot(estate);
>> +    ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->rel));
>
> This seems incredibly expensive for replicating a lot of rows.

You mean because of create_estate_for_relation()?

>
>> +/*
>> + * Search the relation 'rel' for tuple using the replication index.
>> + *
>> + * If a matching tuple is found lock it with lockmode, fill the slot with its
>> + * contents and return true, return false is returned otherwise.
>> + */
>> +static bool
>> +tuple_find_by_replidx(Relation rel, LockTupleMode lockmode,
>> +                      TupleTableSlot *searchslot, TupleTableSlot *slot)
>> +{
>> +    HeapTuple        scantuple;
>> +    ScanKeyData        skey[INDEX_MAX_KEYS];
>> +    IndexScanDesc    scan;
>> +    SnapshotData    snap;
>> +    TransactionId    xwait;
>> +    Oid                idxoid;
>> +    Relation        idxrel;
>> +    bool            found;
>> +
>> +    /* Open REPLICA IDENTITY index.*/
>> +    idxoid = RelationGetReplicaIndex(rel);
>> +    if (!OidIsValid(idxoid))
>> +    {
>> +        elog(ERROR, "could not find configured replica identity for table \"%s\"",
>> +             RelationGetRelationName(rel));
>> +        return false;
>> +    }
>> +    idxrel = index_open(idxoid, RowExclusiveLock);
>> +
>> +    /* Start an index scan. */
>> +    InitDirtySnapshot(snap);
>> +    scan = index_beginscan(rel, idxrel, &snap,
>> +                           RelationGetNumberOfAttributes(idxrel),
>> +                           0);
>> +
>> +    /* Build scan key. */
>> +    build_replindex_scan_key(skey, rel, idxrel, searchslot);
>> +
>> +retry:
>> +    found = false;
>> +
>> +    index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
>> +
>> +    /* Try to find the tuple */
>> +    if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
>> +    {
>> +        found = true;
>> +        ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
>> +        ExecMaterializeSlot(slot);
>> +
>> +        xwait = TransactionIdIsValid(snap.xmin) ?
>> +            snap.xmin : snap.xmax;
>> +
>> +        /*
>> +         * If the tuple is locked, wait for locking transaction to finish
>> +         * and retry.
>> +         */
>> +        if (TransactionIdIsValid(xwait))
>> +        {
>> +            XactLockTableWait(xwait, NULL, NULL, XLTW_None);
>> +            goto retry;
>> +        }
>> +    }
>
> Hm. So we potentially find multiple tuples here, and lock all of
> them. but then only use one for the update.
>

That's not how that code reads to me; only the first tuple returned by the scan is stored and waited on (it's an if, not a loop over the scan), and on a lock conflict the scan is simply retried.

>
>> +static List *
>> +get_subscription_list(void)
>> +{
>> +    List       *res = NIL;
>> +    Relation    rel;
>> +    HeapScanDesc scan;
>> +    HeapTuple    tup;
>> +    MemoryContext resultcxt;
>> +
>> +    /* This is the context that we will allocate our output data in */
>> +    resultcxt = CurrentMemoryContext;
>> +
>> +    /*
>> +     * Start a transaction so we can access pg_database, and get a snapshot.
>> +     * We don't have a use for the snapshot itself, but we're interested in
>> +     * the secondary effect that it sets RecentGlobalXmin.  (This is critical
>> +     * for anything that reads heap pages, because HOT may decide to prune
>> +     * them even if the process doesn't attempt to modify any tuples.)
>> +     */
>
>> +    StartTransactionCommand();
>> +    (void) GetTransactionSnapshot();
>> +
>> +    rel = heap_open(SubscriptionRelationId, AccessShareLock);
>> +    scan = heap_beginscan_catalog(rel, 0, NULL);
>> +
>> +    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
>> +    {
>> +        Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
>> +        Subscription   *sub;
>> +        MemoryContext    oldcxt;
>> +
>> +        /*
>> +         * Allocate our results in the caller's context, not the
>> +         * transaction's. We do this inside the loop, and restore the original
>> +         * context at the end, so that leaky things like heap_getnext() are
>> +         * not called in a potentially long-lived context.
>> +         */
>> +        oldcxt = MemoryContextSwitchTo(resultcxt);
>> +
>> +        sub = (Subscription *) palloc(sizeof(Subscription));
>> +        sub->oid = HeapTupleGetOid(tup);
>> +        sub->dbid = subform->subdbid;
>> +        sub->enabled = subform->subenabled;
>> +
>> +        /* We don't fill fields we are not intereste in. */
>> +        sub->name = NULL;
>> +        sub->conninfo = NULL;
>> +        sub->slotname = NULL;
>> +        sub->publications = NIL;
>> +
>> +        res = lappend(res, sub);
>> +        MemoryContextSwitchTo(oldcxt);
>> +    }
>> +
>> +    heap_endscan(scan);
>> +    heap_close(rel, AccessShareLock);
>> +
>> +    CommitTransactionCommand();
>
> Hm. this doesn't seem quite right from a locking pov. What if, in the
> middle of this, a new subscription is created?
>

It will be picked up in the next iteration of the main loop anyway. We don't need a perfectly stable world view here, just a snapshot of it to work with.

>
> Hadn't I previously read about always streaming data to disk first?
>
>> @@ -0,0 +1,674 @@
>> +/*-------------------------------------------------------------------------
>> + * tablesync.c
>> + *       PostgreSQL logical replication
>> + *
>> + * Copyright (c) 2012-2016, PostgreSQL Global Development Group
>> + *
>> + * IDENTIFICATION
>> + *      src/backend/replication/logical/tablesync.c
>> + *
>> + * NOTES
>> + *      This file contains code for initial table data synchronization for
>> + *      logical replication.
>> + *
>> + *    The initial data synchronization is done separately for each table,
>> + *    in separate apply worker that only fetches the initial snapshot data
>> + *    from the provider and then synchronizes the position in stream with
>> + *    the main apply worker.
>
> Why? I guess that's because it allows to incrementally add tables, with
> acceptable overhead.
>

Yes, I need to document the reasoning more here. It enables us to copy multiple 
tables in parallel (in the future). It's also needed for adding tables 
after the initial sync, as you say.

>
>> + *    The stream position synchronization works in multiple steps.
>> + *     - sync finishes copy and sets table state as SYNCWAIT and waits
>> + *       for state to change in a loop
>> + *     - apply periodically checks unsynced tables for SYNCWAIT, when it
>> + *       appears it will compare its position in the stream with the
>> + *       SYNCWAIT position and decides to either set it to CATCHUP when
>> + *       the apply was infront (and wait for the sync to do the catchup),
>> + *       or set the state to SYNCDONE if the sync was infront or in case
>> + *       both sync and apply are at the same position it will set it to
>> + *       READY and stops tracking it
>
> I'm not quite following here.
>

It's hard for me to explain, I guess; that's why the flow diagram is 
underneath. The point is to reach the same LSN for the table before the main 
apply process can take over the replication of that table. There are two 
possible scenarios:
a) either apply has replayed more of the stream than sync did, in which case 
sync needs to ask apply to wait for it a bit (which blocks replication for a 
short while),
b) or sync has replayed more of the stream than apply, in which case apply 
needs to track the table for a while (and not apply changes to it) until it 
reaches the position where sync stopped; once it reaches that point it can 
apply changes to it the same as to any other table.
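
Condensed, the decision the apply worker makes when it sees SYNCWAIT is roughly this (a sketch using the names from the code quoted further below):

    /*
     * end_lsn is where apply currently is; tstate->lsn is where the sync
     * worker stopped when it set SYNCWAIT.
     */
    if (end_lsn > tstate->lsn)
        tstate->state = SUBREL_STATE_CATCHUP;   /* a) sync must catch up to apply */
    else if (end_lsn < tstate->lsn)
        tstate->state = SUBREL_STATE_SYNCDONE;  /* b) apply keeps tracking the table */
    else
        tstate->state = SUBREL_STATE_READY;     /* already at the same position */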

>> + *     - if the state was set to CATCHUP sync will read the stream and
>> + *       apply changes until it catches up to the specified stream
>> + *       position and then sets state to READY and signals apply that it
>> + *       can stop waiting and exits, if the state was set to something
>> + *       else than CATCHUP the sync process will simply end
>> + *     - if the state was set to SYNCDONE by apply, the apply will
>> + *       continue tracking the table until it reaches the SYNCDONE stream
>> + *       position at which point it sets state to READY and stops tracking
>> + *
>> + *    Example flows look like this:
>> + *     - Apply is infront:
>> + *          sync:8   -> set SYNCWAIT
>> + *        apply:10 -> set CATCHUP
>> + *        sync:10  -> set ready
>> + *          exit
>> + *        apply:10
>> + *          stop tracking
>> + *          continue rep
>> + *    - Sync infront:
>> + *        sync:10
>> + *          set SYNCWAIT
>> + *        apply:8
>> + *          set SYNCDONE
>> + *        sync:10
>> + *          exit
>> + *        apply:10
>> + *          set READY
>> + *          stop tracking
>> + *          continue rep
>
> This definitely needs to be expanded a bit. Where are we tracking how
> far replication has progressed on individual tables? Are we creating new
> slots for syncing? Is there any parallelism in syncing?
>

Yes: new slots, tracking is in pg_subscription_rel, and parallelism is not 
there yet, but the design is ready for expanding it (I currently 
artificially limit the number of sync workers to one to limit potential 
bugs, but AFAIK it could just be bumped to more and it should work).

>> +/*
>> + * Exit routine for synchronization worker.
>> + */
>> +static void
>> +finish_sync_worker(char *slotname)
>> +{
>> +    LogicalRepWorker   *worker;
>> +    RepOriginId            originid;
>> +    MemoryContext        oldctx = CurrentMemoryContext;
>> +
>> +    /*
>> +     * Drop the replication slot on remote server.
>> +     * We want to continue even in the case that the slot on remote side
>> +     * is already gone. This means that we can leave slot on the remote
>> +     * side but that can happen for other reasons as well so we can't
>> +     * really protect against that.
>> +     */
>> +    PG_TRY();
>> +    {
>> +        wrcapi->drop_slot(wrchandle, slotname);
>> +    }
>> +    PG_CATCH();
>> +    {
>> +        MemoryContext    ectx;
>> +        ErrorData       *edata;
>> +
>> +        ectx = MemoryContextSwitchTo(oldctx);
>> +        /* Save error info */
>> +        edata = CopyErrorData();
>> +        MemoryContextSwitchTo(ectx);
>> +        FlushErrorState();
>> +
>> +        ereport(WARNING,
>> +                (errmsg("there was problem dropping the replication slot "
>> +                        "\"%s\" on provider", slotname),
>> +                 errdetail("The error was: %s", edata->message),
>> +                 errhint("You may have to drop it manually")));
>> +        FreeErrorData(edata);
>
> ISTM we really should rather return success/failure here, and not throw
> an error inside the libpqwalreceiver stuff.  I kind of wonder if we
> actually can get rid of this indirection.
>

Yeah, I can do success/failure. Not sure what you mean by indirection, though.
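
The call site would then become something like this (a sketch, assuming drop_slot() is changed to return success/failure instead of throwing):

    /* Report failure but carry on; the remote slot may already be gone. */
    if (!wrcapi->drop_slot(wrchandle, slotname))
        ereport(WARNING,
                (errmsg("there was a problem dropping the replication slot \"%s\" on the provider",
                        slotname),
                 errhint("You may have to drop it manually.")));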

>> +         * to ensure that we are not behind it (it's going to wait at this
>> +         * point for the change of state). Once we are infront or at the same
>> +         * position as the synchronization proccess we can signal it to
>> +         * finish the catchup.
>> +         */
>> +        if (tstate->state == SUBREL_STATE_SYNCWAIT)
>> +        {
>> +            if (end_lsn > tstate->lsn)
>> +            {
>> +                /*
>> +                 * Apply is infront, tell sync to catchup. and wait until
>> +                 * it does.
>> +                 */
>> +                tstate->state = SUBREL_STATE_CATCHUP;
>> +                tstate->lsn = end_lsn;
>> +                StartTransactionCommand();
>> +                SetSubscriptionRelState(MyLogicalRepWorker->subid,
>> +                                        tstate->relid, tstate->state,
>> +                                        tstate->lsn);
>> +                CommitTransactionCommand();
>> +
>> +                /* Signal the worker as it may be waiting for us. */
>> +                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
>> +                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
>> +                                                tstate->relid);
>> +                if (worker && worker->proc)
>> +                    SetLatch(&worker->proc->procLatch);
>> +                LWLockRelease(LogicalRepWorkerLock);
>
> Different parts of this file use different lock level to set the
> latch. Why?
>

The latch itself does not need the lock, so I'm not really following what you 
mean. The lock here is for the benefit of logicalrep_worker_find().

>
>> +                if (wait_for_sync_status_change(tstate))
>> +                    Assert(tstate->state == SUBREL_STATE_READY);
>> +            }
>> +            else
>> +            {
>> +                /*
>> +                 * Apply is either behind in which case sync worker is done
>> +                 * but apply needs to keep tracking the table until it
>> +                 * catches up to where sync finished.
>> +                 * Or apply and sync are at the same position in which case
>> +                 * table can be switched to standard replication mode
>> +                 * immediately.
>> +                 */
>> +                if (end_lsn < tstate->lsn)
>> +                    tstate->state = SUBREL_STATE_SYNCDONE;
>> +                else
>> +                    tstate->state = SUBREL_STATE_READY;
>> +
>
> What I'm failing to understand is how this can be done under
> concurrency. You probably thought about this, but it should really be
> explained somewhere.

Well, if the original state was syncdone (the previous branch), the 
apply won't actually do any work until the state changes (and it can 
only change to either syncdone or ready at that point), so there is no 
real concurrency. If we reach this branch then either the sync worker already 
exited (if it set the state to syncdone), or it's not doing anything and 
is waiting for apply to set the state to ready, in which case there is also 
no concurrency.

>> +        /*
>> +         * In case table is supposed to be synchronizing but the
>> +         * synchronization worker is not running, start it.
>> +         * Limit the number of launched workers here to one (for now).
>> +         */
>
> Hm. That seems problematic for online upgrade type cases, we might never
> be catch up that way...
>

You mean the limit to 1? That's just because I didn't get to creating a 
GUC for configuring this yet.
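
Roughly what I have in mind (a sketch; the GUC name, group, and default are provisional):

    int    max_sync_workers_per_subscription = 1;

    /* in guc.c's ConfigureNamesInt[] */
    {
        {"max_sync_workers_per_subscription", PGC_SIGHUP, RESOURCES_ASYNCHRONOUS,
            gettext_noop("Maximum number of table synchronization workers per subscription."),
            NULL,
        },
        &max_sync_workers_per_subscription,
        1, 0, MAX_BACKENDS,
        NULL, NULL, NULL
    },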

>
>
>> +                /*
>> +                 * We want to do the table data sync in single
>> +                 * transaction so do not close the transaction opened
>> +                 * above.
>> +                 * There will be no BEGIN or COMMIT messages coming via
>> +                 * logical replication while the copy table command is
>> +                 * running so start the transaction here.
>> +                 * Note the memory context for data handling will still
>> +                 * be done using ensure_transaction called by the insert
>> +                 * handler.
>> +                 */
>> +                StartTransactionCommand();
>> +
>> +                /*
>> +                 * Don't allow parallel access other than SELECT while
>> +                 * the initial contents are being copied.
>> +                 */
>> +                rel = heap_open(tstate.relid, ExclusiveLock);
>
> Why do we want to allow access at all?
>

I didn't see a reason not to allow SELECTs.

>
>
>> @@ -87,6 +92,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>>      cb->commit_cb = pgoutput_commit_txn;
>>      cb->filter_by_origin_cb = pgoutput_origin_filter;
>>      cb->shutdown_cb = pgoutput_shutdown;
>> +    cb->tuple_cb = pgoutput_tuple;
>> +    cb->list_tables_cb = pgoutput_list_tables;
>>  }
>
> What are these new, and undocumented callbacks actually doing? And why
> is this integrated into logical decoding?
>

In the initial email I said that I am not very happy with this 
design; that's still true, because they don't belong in decoding.

>
>>  /*
>> + * Handle LIST_TABLES command.
>> + */
>> +static void
>> +SendTableList(ListTablesCmd *cmd)
>> +{
>
> Ugh.
>
>
> I really dislike this kind of command. I think we should instead change
> things around, allowing to issue normal SQL via the replication
> command. We'll have to error out for running sql for non-database
> connected replication connections, but that seems fine.
>

Note that per offline discussion we agreed to do this stuff over a normal 
connection for now.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services


