Re: Logical Replication WIP

From: Andres Freund
Subject: Re: Logical Replication WIP
Date:
Msg-id: 20160914162128.pemvaxjvd47qjrkt@alap3.anarazel.de
In response to: Re: Logical Replication WIP  (Petr Jelinek <petr@2ndquadrant.com>)
Responses: Re: Logical Replication WIP  (Peter Eisentraut <peter.eisentraut@2ndquadrant.com>)
           Re: Logical Replication WIP  (Petr Jelinek <petr@2ndquadrant.com>)
List: pgsql-hackers
(continuing, uh, a bit happier)

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:

> +/*
> + * Relcache invalidation callback for our relation map cache.
> + */
> +static void
> +logicalreprelmap_invalidate_cb(Datum arg, Oid reloid)
> +{
> +    LogicalRepRelMapEntry  *entry;
> +
> +    /* Just to be sure. */
> +    if (LogicalRepRelMap == NULL)
> +        return;
> +
> +    if (reloid != InvalidOid)
> +    {
> +        HASH_SEQ_STATUS status;
> +
> +        hash_seq_init(&status, LogicalRepRelMap);
> +
> +        /* TODO, use inverse lookup hastable? */

*hashtable

> +        while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
> +        {
> +            if (entry->reloid == reloid)
> +                entry->reloid = InvalidOid;

can't we break here?
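
Something like this, assuming there's at most one map entry per local
reloid (note that bailing out of a hash_seq_search() loop early requires
hash_seq_term()):

    while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
    {
        if (entry->reloid == reloid)
        {
            entry->reloid = InvalidOid;
            hash_seq_term(&status);
            break;
        }
    }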


> +/*
> + * Initialize the relation map cache.
> + */
> +static void
> +remoterelmap_init(void)
> +{
> +    HASHCTL        ctl;
> +
> +    /* Make sure we've initialized CacheMemoryContext. */
> +    if (CacheMemoryContext == NULL)
> +        CreateCacheMemoryContext();
> +
> +    /* Initialize the hash table. */
> +    MemSet(&ctl, 0, sizeof(ctl));
> +    ctl.keysize = sizeof(uint32);
> +    ctl.entrysize = sizeof(LogicalRepRelMapEntry);
> +    ctl.hcxt = CacheMemoryContext;

Wonder if this (and similar code earlier) should try to do everything in
a sub-context of CacheMemoryContext instead. That'd make some issues
easier to track down.
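
I.e. something like this (the context name is made up), so the map's
allocations show up as their own line in MemoryContextStats():

    static MemoryContext LogicalRepRelMapContext = NULL;

    if (LogicalRepRelMapContext == NULL)
        LogicalRepRelMapContext =
            AllocSetContextCreate(CacheMemoryContext,
                                  "LogicalRepRelMapContext",
                                  ALLOCSET_DEFAULT_SIZES);

    MemSet(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(uint32);
    ctl.entrysize = sizeof(LogicalRepRelMapEntry);
    ctl.hcxt = LogicalRepRelMapContext;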

> +/*
> + * Open the local relation associated with the remote one.
> + */
> +static LogicalRepRelMapEntry *
> +logicalreprel_open(uint32 remoteid, LOCKMODE lockmode)
> +{
> +    LogicalRepRelMapEntry  *entry;
> +    bool        found;
> +
> +    if (LogicalRepRelMap == NULL)
> +        remoterelmap_init();
> +
> +    /* Search for existing entry. */
> +    entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
> +                        HASH_FIND, &found);
> +
> +    if (!found)
> +        elog(FATAL, "cache lookup failed for remote relation %u",
> +             remoteid);
> +
> +    /* Need to update the local cache? */
> +    if (!OidIsValid(entry->reloid))
> +    {
> +        Oid            nspid;
> +        Oid            relid;
> +        int            i;
> +        TupleDesc    desc;
> +        LogicalRepRelation *remoterel;
> +
> +        remoterel = &entry->remoterel;
> +
> +        nspid = LookupExplicitNamespace(remoterel->nspname, false);
> +        if (!OidIsValid(nspid))
> +            ereport(FATAL,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("the logical replication target %s not found",
> +                            quote_qualified_identifier(remoterel->nspname,
> +                                                       remoterel->relname))));
> +
> +        relid = get_relname_relid(remoterel->relname, nspid);
> +        if (!OidIsValid(relid))
> +            ereport(FATAL,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("the logical replication target %s not found",
> +                            quote_qualified_identifier(remoterel->nspname,
> +                                                       remoterel->relname))));
> +
> +        entry->rel = heap_open(relid, lockmode);

This seems rather racy. I think this really instead needs something akin
to RangeVarGetRelidExtended().
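
Untested sketch of what I mean, reusing the existing error wording:

    RangeVar   *rv;
    Oid         relid;

    rv = makeRangeVar(remoterel->nspname, remoterel->relname, -1);

    /* Lock is taken during the lookup, so the relation can't go away. */
    relid = RangeVarGetRelid(rv, lockmode, true /* missing_ok */);
    if (!OidIsValid(relid))
        ereport(FATAL,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("the logical replication target %s not found",
                        quote_qualified_identifier(remoterel->nspname,
                                                   remoterel->relname))));

    entry->rel = heap_open(relid, NoLock);    /* already locked above */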

> +/*
> + * Executor state preparation for evaluation of constraint expressions,
> + * indexes and triggers.
> + *
> + * This is based on similar code in copy.c
> + */
> +static EState *
> +create_estate_for_relation(LogicalRepRelMapEntry *rel)
> +{
> +    EState       *estate;
> +    ResultRelInfo *resultRelInfo;
> +    RangeTblEntry *rte;
> +
> +    estate = CreateExecutorState();
> +
> +    rte = makeNode(RangeTblEntry);
> +    rte->rtekind = RTE_RELATION;
> +    rte->relid = RelationGetRelid(rel->rel);
> +    rte->relkind = rel->rel->rd_rel->relkind;
> +    estate->es_range_table = list_make1(rte);
> +
> +    resultRelInfo = makeNode(ResultRelInfo);
> +    InitResultRelInfo(resultRelInfo, rel->rel, 1, 0);
> +
> +    estate->es_result_relations = resultRelInfo;
> +    estate->es_num_result_relations = 1;
> +    estate->es_result_relation_info = resultRelInfo;
> +
> +    /* Triggers might need a slot */
> +    if (resultRelInfo->ri_TrigDesc)
> +        estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
> +
> +    return estate;
> +}

Ugh, we do this for every single change? That's pretty darn heavy.


> +/*
> + * Check if the local attribute is present in relation definition used
> + * by upstream and hence updated by the replication.
> + */
> +static bool
> +physatt_in_attmap(LogicalRepRelMapEntry *rel, int attid)
> +{
> +    AttrNumber    i;
> +
> +    /* Fast path for tables that are same on upstream and downstream. */
> +    if (attid < rel->remoterel.natts && rel->attmap[attid] == attid)
> +        return true;
> +
> +    /* Try to find the attribute in the map. */
> +    for (i = 0; i < rel->remoterel.natts; i++)
> +        if (rel->attmap[i] == attid)
> +            return true;
> +
> +    return false;
> +}

Shouldn't we rather try to keep an attribute map that always can map
remote attribute numbers to local ones? That doesn't seem hard on a
first blush? But I might be missing something here.
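
I.e. something like this (names made up), with attmap[] indexed by the
remote attribute number and holding the corresponding local one:

    static AttrNumber
    remote_att_to_local(LogicalRepRelMapEntry *rel, int remoteattnum)
    {
        if (remoteattnum < 0 || remoteattnum >= rel->remoterel.natts)
            return InvalidAttrNumber;
        return rel->attmap[remoteattnum];
    }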


> +/*
> + * Executes default values for columns for which we can't map to remote
> + * relation columns.
> + *
> + * This allows us to support tables which have more columns on the downstream
> + * than on the upsttream.
> + */

Typo: upsttream.


> +static void
> +FillSlotDefaults(LogicalRepRelMapEntry *rel, EState *estate,
> +                 TupleTableSlot *slot)
> +{

Why is this using a different naming scheme?


> +/*
> + * Handle COMMIT message.
> + *
> + * TODO, support tracking of multiple origins
> + */
> +static void
> +handle_commit(StringInfo s)
> +{
> +    XLogRecPtr        commit_lsn;
> +    XLogRecPtr        end_lsn;
> +    TimestampTz        commit_time;
> +
> +    logicalrep_read_commit(s, &commit_lsn, &end_lsn, &commit_time);

Perhaps this (and related routines) should rather be

    LogicalRepCommitData commit_data;
    logicalrep_read_commit(s, &commit_data);

etc? That way the data can transparently be enhanced.

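I.e. roughly this, with whichever fields we end up needing:

    typedef struct LogicalRepCommitData
    {
        XLogRecPtr    commit_lsn;
        XLogRecPtr    end_lsn;
        TimestampTz   committime;
    } LogicalRepCommitData;

    extern void logicalrep_read_commit(StringInfo in,
                                       LogicalRepCommitData *commit_data);
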
> +    Assert(commit_lsn == replorigin_session_origin_lsn);
> +    Assert(commit_time == replorigin_session_origin_timestamp);
> +
> +    if (IsTransactionState())
> +    {
> +        FlushPosition *flushpos;
> +
> +        CommitTransactionCommand();
> +        MemoryContextSwitchTo(CacheMemoryContext);
> +
> +        /* Track commit lsn  */
> +        flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
> +        flushpos->local_end = XactLastCommitEnd;
> +        flushpos->remote_end = end_lsn;
> +
> +        dlist_push_tail(&lsn_mapping, &flushpos->node);
> +        MemoryContextSwitchTo(ApplyContext);

Seems like it should be in a separate function.
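
E.g. (helper name made up), with handle_commit() calling it right after
CommitTransactionCommand():

    static void
    store_flush_position(XLogRecPtr remote_end_lsn)
    {
        FlushPosition *flushpos;
        MemoryContext  oldctx;

        /* Keep the mapping in a long-lived context, as the original does. */
        oldctx = MemoryContextSwitchTo(CacheMemoryContext);
        flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
        flushpos->local_end = XactLastCommitEnd;
        flushpos->remote_end = remote_end_lsn;
        dlist_push_tail(&lsn_mapping, &flushpos->node);
        MemoryContextSwitchTo(oldctx);
    }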


> +/*
> + * Handle INSERT message.
> + */
> +static void
> +handle_insert(StringInfo s)
> +{
> +    LogicalRepRelMapEntry *rel;
> +    LogicalRepTupleData    newtup;
> +    LogicalRepRelId        relid;
> +    EState               *estate;
> +    TupleTableSlot       *remoteslot;
> +    MemoryContext        oldctx;
> +
> +    ensure_transaction();
> +
> +    relid = logicalrep_read_insert(s, &newtup);
> +    rel = logicalreprel_open(relid, RowExclusiveLock);
> +
> +    /* Initialize the executor state. */
> +    estate = create_estate_for_relation(rel);
> +    remoteslot = ExecInitExtraTupleSlot(estate);
> +    ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->rel));

This seems incredibly expensive for replicating a lot of rows.

> +    /* Process and store remote tuple in the slot */
> +    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> +    SlotStoreCStrings(remoteslot, newtup.values);
> +    FillSlotDefaults(rel, estate, remoteslot);
> +    MemoryContextSwitchTo(oldctx);
> +
> +    PushActiveSnapshot(GetTransactionSnapshot());
> +    ExecOpenIndices(estate->es_result_relation_info, false);
> +
> +    ExecInsert(NULL, /* mtstate is only used for onconflict handling which we don't support atm */
> +               remoteslot,
> +               remoteslot,
> +               NIL,
> +               ONCONFLICT_NONE,
> +               estate,
> +               false);

I have *severe* doubts about just using the (newly) exposed functions
1:1 here.


> +/*
> + * Search the relation 'rel' for tuple using the replication index.
> + *
> + * If a matching tuple is found lock it with lockmode, fill the slot with its
> + * contents and return true, return false is returned otherwise.
> + */
> +static bool
> +tuple_find_by_replidx(Relation rel, LockTupleMode lockmode,
> +                      TupleTableSlot *searchslot, TupleTableSlot *slot)
> +{
> +    HeapTuple        scantuple;
> +    ScanKeyData        skey[INDEX_MAX_KEYS];
> +    IndexScanDesc    scan;
> +    SnapshotData    snap;
> +    TransactionId    xwait;
> +    Oid                idxoid;
> +    Relation        idxrel;
> +    bool            found;
> +
> +    /* Open REPLICA IDENTITY index.*/
> +    idxoid = RelationGetReplicaIndex(rel);
> +    if (!OidIsValid(idxoid))
> +    {
> +        elog(ERROR, "could not find configured replica identity for table \"%s\"",
> +             RelationGetRelationName(rel));
> +        return false;
> +    }
> +    idxrel = index_open(idxoid, RowExclusiveLock);
> +
> +    /* Start an index scan. */
> +    InitDirtySnapshot(snap);
> +    scan = index_beginscan(rel, idxrel, &snap,
> +                           RelationGetNumberOfAttributes(idxrel),
> +                           0);
> +
> +    /* Build scan key. */
> +    build_replindex_scan_key(skey, rel, idxrel, searchslot);
> +
> +retry:
> +    found = false;
> +
> +    index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
> +
> +    /* Try to find the tuple */
> +    if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
> +    {
> +        found = true;
> +        ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
> +        ExecMaterializeSlot(slot);
> +
> +        xwait = TransactionIdIsValid(snap.xmin) ?
> +            snap.xmin : snap.xmax;
> +
> +        /*
> +         * If the tuple is locked, wait for locking transaction to finish
> +         * and retry.
> +         */
> +        if (TransactionIdIsValid(xwait))
> +        {
> +            XactLockTableWait(xwait, NULL, NULL, XLTW_None);
> +            goto retry;
> +        }
> +    }

Hm. So we potentially find multiple tuples here, and lock all of them,
but then only use one for the update.



> +static List *
> +get_subscription_list(void)
> +{
> +    List       *res = NIL;
> +    Relation    rel;
> +    HeapScanDesc scan;
> +    HeapTuple    tup;
> +    MemoryContext resultcxt;
> +
> +    /* This is the context that we will allocate our output data in */
> +    resultcxt = CurrentMemoryContext;
> +
> +    /*
> +     * Start a transaction so we can access pg_database, and get a snapshot.
> +     * We don't have a use for the snapshot itself, but we're interested in
> +     * the secondary effect that it sets RecentGlobalXmin.  (This is critical
> +     * for anything that reads heap pages, because HOT may decide to prune
> +     * them even if the process doesn't attempt to modify any tuples.)
> +     */

> +    StartTransactionCommand();
> +    (void) GetTransactionSnapshot();
> +
> +    rel = heap_open(SubscriptionRelationId, AccessShareLock);
> +    scan = heap_beginscan_catalog(rel, 0, NULL);
> +
> +    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> +    {
> +        Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
> +        Subscription   *sub;
> +        MemoryContext    oldcxt;
> +
> +        /*
> +         * Allocate our results in the caller's context, not the
> +         * transaction's. We do this inside the loop, and restore the original
> +         * context at the end, so that leaky things like heap_getnext() are
> +         * not called in a potentially long-lived context.
> +         */
> +        oldcxt = MemoryContextSwitchTo(resultcxt);
> +
> +        sub = (Subscription *) palloc(sizeof(Subscription));
> +        sub->oid = HeapTupleGetOid(tup);
> +        sub->dbid = subform->subdbid;
> +        sub->enabled = subform->subenabled;
> +
> +        /* We don't fill fields we are not intereste in. */
> +        sub->name = NULL;
> +        sub->conninfo = NULL;
> +        sub->slotname = NULL;
> +        sub->publications = NIL;
> +
> +        res = lappend(res, sub);
> +        MemoryContextSwitchTo(oldcxt);
> +    }
> +
> +    heap_endscan(scan);
> +    heap_close(rel, AccessShareLock);
> +
> +    CommitTransactionCommand();

Hm. This doesn't seem quite right from a locking pov. What if, in the
middle of this, a new subscription is created?



> +void
> +logicalrep_worker_stop(LogicalRepWorker *worker)
> +{
> +    Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> +    /* Check that the worker is up and what we expect. */
> +    if (!worker->proc)
> +        return;
> +    if (!IsBackendPid(worker->proc->pid))
> +        return;
> +
> +    /* Terminate the worker. */
> +    kill(worker->proc->pid, SIGTERM);
> +
> +    LWLockRelease(LogicalRepLauncherLock);
> +
> +    /* Wait for it to detach. */
> +    for (;;)
> +    {
> +        int    rc = WaitLatch(&MyProc->procLatch,
> +                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                           1000L);
> +
> +        /* emergency bailout if postmaster has died */
> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);
> +
> +        CHECK_FOR_INTERRUPTS();
> +
> +        if (!worker->proc)
> +            return;
> +    }
> +}

indentation here seems screwed up.



> +static void
> +xacthook_signal_launcher(XactEvent event, void *arg)
> +{
> +    switch (event)
> +    {
> +        case XACT_EVENT_COMMIT:
> +            if (xacthook_do_signal_launcher)
> +                ApplyLauncherWakeup();
> +            break;
> +        default:
> +            /* We're not interested in other tx events */
> +            break;
> +    }
> +}

> +void
> +ApplyLauncherWakeupOnCommit(void)
> +{
> +    if (!xacthook_do_signal_launcher)
> +    {
> +        RegisterXactCallback(xacthook_signal_launcher, NULL);
> +        xacthook_do_signal_launcher = true;
> +    }
> +}

Hm. This seems like it really should be an AtCommit_* routine instead.
This also needs more docs.
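
Rough sketch of the AtCommit-style variant (names made up);
CommitTransaction()/AbortTransaction() would call AtEOXact_ApplyLauncher()
explicitly instead of going through RegisterXactCallback():

    static bool on_commit_launcher_wakeup = false;

    void
    ApplyLauncherWakeupOnCommit(void)
    {
        on_commit_launcher_wakeup = true;
    }

    void
    AtEOXact_ApplyLauncher(bool isCommit)
    {
        if (isCommit && on_commit_launcher_wakeup)
            ApplyLauncherWakeup();
        on_commit_launcher_wakeup = false;
    }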


Hadn't I previously read about always streaming data to disk first?

> @@ -0,0 +1,674 @@
> +/*-------------------------------------------------------------------------
> + * tablesync.c
> + *       PostgreSQL logical replication
> + *
> + * Copyright (c) 2012-2016, PostgreSQL Global Development Group
> + *
> + * IDENTIFICATION
> + *      src/backend/replication/logical/tablesync.c
> + *
> + * NOTES
> + *      This file contains code for initial table data synchronization for
> + *      logical replication.
> + *
> + *    The initial data synchronization is done separately for each table,
> + *    in separate apply worker that only fetches the initial snapshot data
> + *    from the provider and then synchronizes the position in stream with
> + *    the main apply worker.

Why? I guess that's because it allows adding tables incrementally, with
acceptable overhead.


> + *    The stream position synchronization works in multiple steps.
> + *     - sync finishes copy and sets table state as SYNCWAIT and waits
> + *       for state to change in a loop
> + *     - apply periodically checks unsynced tables for SYNCWAIT, when it
> + *       appears it will compare its position in the stream with the
> + *       SYNCWAIT position and decides to either set it to CATCHUP when
> + *       the apply was infront (and wait for the sync to do the catchup),
> + *       or set the state to SYNCDONE if the sync was infront or in case
> + *       both sync and apply are at the same position it will set it to
> + *       READY and stops tracking it

I'm not quite following here.

> + *     - if the state was set to CATCHUP sync will read the stream and
> + *       apply changes until it catches up to the specified stream
> + *       position and then sets state to READY and signals apply that it
> + *       can stop waiting and exits, if the state was set to something
> + *       else than CATCHUP the sync process will simply end
> + *     - if the state was set to SYNCDONE by apply, the apply will
> + *       continue tracking the table until it reaches the SYNCDONE stream
> + *       position at which point it sets state to READY and stops tracking
> + *
> + *    Example flows look like this:
> + *     - Apply is infront:
> + *          sync:8   -> set SYNCWAIT
> + *        apply:10 -> set CATCHUP
> + *        sync:10  -> set ready
> + *          exit
> + *        apply:10
> + *          stop tracking
> + *          continue rep
> + *    - Sync infront:
> + *        sync:10
> + *          set SYNCWAIT
> + *        apply:8
> + *          set SYNCDONE
> + *        sync:10
> + *          exit
> + *        apply:10
> + *          set READY
> + *          stop tracking
> + *          continue rep

This definitely needs to be expanded a bit. Where are we tracking how
far replication has progressed on individual tables? Are we creating new
slots for syncing? Is there any parallelism in syncing?

> +/*
> + * Exit routine for synchronization worker.
> + */
> +static void
> +finish_sync_worker(char *slotname)
> +{
> +    LogicalRepWorker   *worker;
> +    RepOriginId            originid;
> +    MemoryContext        oldctx = CurrentMemoryContext;
> +
> +    /*
> +     * Drop the replication slot on remote server.
> +     * We want to continue even in the case that the slot on remote side
> +     * is already gone. This means that we can leave slot on the remote
> +     * side but that can happen for other reasons as well so we can't
> +     * really protect against that.
> +     */
> +    PG_TRY();
> +    {
> +        wrcapi->drop_slot(wrchandle, slotname);
> +    }
> +    PG_CATCH();
> +    {
> +        MemoryContext    ectx;
> +        ErrorData       *edata;
> +
> +        ectx = MemoryContextSwitchTo(oldctx);
> +        /* Save error info */
> +        edata = CopyErrorData();
> +        MemoryContextSwitchTo(ectx);
> +        FlushErrorState();
> +
> +        ereport(WARNING,
> +                (errmsg("there was problem dropping the replication slot "
> +                        "\"%s\" on provider", slotname),
> +                 errdetail("The error was: %s", edata->message),
> +                 errhint("You may have to drop it manually")));
> +        FreeErrorData(edata);

ISTM we really should rather return success/failure here, and not throw
an error inside the libpqwalreceiver stuff.  I kind of wonder if we
actually can get rid of this indirection.
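
I.e., assuming we changed drop_slot to report failure via its return value
rather than throwing (which it currently doesn't), the caller would shrink
to something like:

    if (!wrcapi->drop_slot(wrchandle, slotname))
        ereport(WARNING,
                (errmsg("could not drop the replication slot \"%s\" on provider",
                        slotname),
                 errhint("You may have to drop it manually.")));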


> +    /* Find the main apply worker and signal it. */
> +    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
> +    worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid);
> +    if (worker && worker->proc)
> +        SetLatch(&worker->proc->procLatch);
> +    LWLockRelease(LogicalRepWorkerLock);

I'd rather do the SetLatch outside of the critical section.
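
I.e. roughly:

    LogicalRepWorker *worker;
    PGPROC           *apply_proc = NULL;

    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid);
    if (worker)
        apply_proc = worker->proc;
    LWLockRelease(LogicalRepWorkerLock);

    if (apply_proc)
        SetLatch(&apply_proc->procLatch);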

> +static bool
> +wait_for_sync_status_change(TableState *tstate)
> +{
> +    int        rc;
> +    char    state = tstate->state;
> +
> +    while (!got_SIGTERM)
> +    {
> +        StartTransactionCommand();
> +        tstate->state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                tstate->relid,
> +                                                &tstate->lsn,
> +                                                true);
> +        CommitTransactionCommand();
> +
> +        /* Status record was removed. */
> +        if (tstate->state == SUBREL_STATE_UNKNOWN)
> +            return false;
> +
> +        if (tstate->state != state)
> +            return true;
> +
> +        rc = WaitLatch(&MyProc->procLatch,
> +                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                       10000L);
> +
> +        /* emergency bailout if postmaster has died */
> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);

broken indentation.


> +/*
> + * Read the state of the tables in the subscription and update our table
> + * state list.
> + */
> +static void
> +reread_sync_state(Oid relid)
> +{
> +    dlist_mutable_iter    iter;
> +    Relation    rel;
> +    HeapTuple    tup;
> +    ScanKeyData    skey[2];
> +    HeapScanDesc    scan;
> +
> +    /* Clean the old list. */
> +    dlist_foreach_modify(iter, &table_states)
> +    {
> +        TableState *tstate = dlist_container(TableState, node, iter.cur);
> +
> +        dlist_delete(iter.cur);
> +        pfree(tstate);
> +    }
> +
> +    /*
> +     * Fetch all the subscription relation states that are not marked as
> +     * ready and push them into our table state tracking list.
> +     */
> +    rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
> +
> +    ScanKeyInit(&skey[0],
> +                Anum_pg_subscription_rel_subid,
> +                BTEqualStrategyNumber, F_OIDEQ,
> +                ObjectIdGetDatum(MyLogicalRepWorker->subid));
> +
> +    if (OidIsValid(relid))
> +    {
> +        ScanKeyInit(&skey[1],
> +                    Anum_pg_subscription_rel_subrelid,
> +                    BTEqualStrategyNumber, F_OIDEQ,
> +                    ObjectIdGetDatum(relid));
> +    }
> +    else
> +    {
> +        ScanKeyInit(&skey[1],
> +                    Anum_pg_subscription_rel_substate,
> +                    BTEqualStrategyNumber, F_CHARNE,
> +                    CharGetDatum(SUBREL_STATE_READY));
> +    }
> +
> +    scan = heap_beginscan_catalog(rel, 2, skey);

Hm. So this is a seqscan. Shouldn't we make this use an index (depending
on which branch is taken above)?
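
For the by-relid branch that could look something like this (the index
macro name is made up, and the substate inequality branch would of course
still need a seqscan or a filter):

    SysScanDesc sscan;

    sscan = systable_beginscan(rel, SubscriptionRelSubidSubrelidIndexId,
                               true, NULL, 2, skey);
    while (HeapTupleIsValid(tup = systable_getnext(sscan)))
    {
        /* same per-tuple processing as below */
    }
    systable_endscan(sscan);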


> +    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> +    {
> +        Form_pg_subscription_rel    subrel;
> +        TableState       *tstate;
> +        MemoryContext    oldctx;
> +
> +        subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
> +
> +        /* Allocate the tracking info in a permament memory context. */

s/permament/permanent/

> +/*
> + * Handle table synchronization cooperation from the synchroniation
> + * worker.
> + */
> +static void
> +process_syncing_tables_sync(char *slotname, XLogRecPtr end_lsn)
> +{
> +    TableState *tstate;
> +    TimeLineID    tli;
> +
> +    Assert(!IsTransactionState());
> +
> +    /*
> +     * Synchronization workers don't keep track of all synchronization
> +     * tables, they only care about their table.
> +     */
> +    if (!table_states_valid)
> +    {
> +        StartTransactionCommand();
> +        reread_sync_state(MyLogicalRepWorker->relid);
> +        CommitTransactionCommand();
> +    }
> +
> +    /* Somebody removed table underneath this worker, nothing more to do. */
> +    if (dlist_is_empty(&table_states))
> +    {
> +        wrcapi->endstreaming(wrchandle, &tli);
> +        finish_sync_worker(slotname);
> +    }
> +
> +    /* Check if we are done with catchup now. */
> +    tstate = dlist_container(TableState, node, dlist_head_node(&table_states));
> +    if (tstate->state == SUBREL_STATE_CATCHUP)
> +    {
> +        Assert(tstate->lsn != InvalidXLogRecPtr);
> +
> +        if (tstate->lsn == end_lsn)
> +        {
> +            tstate->state = SUBREL_STATE_READY;
> +            tstate->lsn = InvalidXLogRecPtr;
> +            /* Update state of the synchronization. */
> +            StartTransactionCommand();
> +            SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                    tstate->relid, tstate->state,
> +                                    tstate->lsn);
> +            CommitTransactionCommand();
> +
> +            wrcapi->endstreaming(wrchandle, &tli);
> +            finish_sync_worker(slotname);
> +        }
> +        return;
> +    }
> +}

The return inside the if is a bit weird. Makes one think it might be a
loop or such.


> +/*
> + * Handle table synchronization cooperation from the apply worker.
> + */
> +static void
> +process_syncing_tables_apply(char *slotname, XLogRecPtr end_lsn)
> +{
> +    dlist_mutable_iter    iter;
> +
> +    Assert(!IsTransactionState());
> +
> +    if (!table_states_valid)
> +    {
> +        StartTransactionCommand();
> +        reread_sync_state(InvalidOid);
> +        CommitTransactionCommand();
> +    }

So this pattern is repeated a bunch of times, maybe we can encapsulate
that somewhat? Maybe like ensure_sync_state_valid() or such?
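
I.e.:

    static void
    ensure_sync_state_valid(Oid relid)
    {
        if (table_states_valid)
            return;

        StartTransactionCommand();
        reread_sync_state(relid);
        CommitTransactionCommand();
    }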


> +    dlist_foreach_modify(iter, &table_states)
> +    {
> +        TableState *tstate = dlist_container(TableState, node, iter.cur);
> +        bool        start_worker;
> +        LogicalRepWorker   *worker;
> +
> +        /*
> +         * When the synchronization process is at the cachup phase we need

s/cachup/catchup/


> +         * to ensure that we are not behind it (it's going to wait at this
> +         * point for the change of state). Once we are infront or at the same
> +         * position as the synchronization proccess we can signal it to
> +         * finish the catchup.
> +         */
> +        if (tstate->state == SUBREL_STATE_SYNCWAIT)
> +        {
> +            if (end_lsn > tstate->lsn)
> +            {
> +                /*
> +                 * Apply is infront, tell sync to catchup. and wait until
> +                 * it does.
> +                 */
> +                tstate->state = SUBREL_STATE_CATCHUP;
> +                tstate->lsn = end_lsn;
> +                StartTransactionCommand();
> +                SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                        tstate->relid, tstate->state,
> +                                        tstate->lsn);
> +                CommitTransactionCommand();
> +
> +                /* Signal the worker as it may be waiting for us. */
> +                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
> +                                                tstate->relid);
> +                if (worker && worker->proc)
> +                    SetLatch(&worker->proc->procLatch);
> +                LWLockRelease(LogicalRepWorkerLock);

Different parts of this file use different lock levels to set the
latch. Why?


> +                if (wait_for_sync_status_change(tstate))
> +                    Assert(tstate->state == SUBREL_STATE_READY);
> +            }
> +            else
> +            {
> +                /*
> +                 * Apply is either behind in which case sync worker is done
> +                 * but apply needs to keep tracking the table until it
> +                 * catches up to where sync finished.
> +                 * Or apply and sync are at the same position in which case
> +                 * table can be switched to standard replication mode
> +                 * immediately.
> +                 */
> +                if (end_lsn < tstate->lsn)
> +                    tstate->state = SUBREL_STATE_SYNCDONE;
> +                else
> +                    tstate->state = SUBREL_STATE_READY;
> +

What I'm failing to understand is how this can be done under
concurrency. You probably thought about this, but it should really be
explained somewhere.


> +                StartTransactionCommand();
> +                SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                        tstate->relid, tstate->state,
> +                                        tstate->lsn);
> +                CommitTransactionCommand();
> +
> +                /* Signal the worker as it may be waiting for us. */
> +                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
> +                                                tstate->relid);
> +                if (worker && worker->proc)
> +                    SetLatch(&worker->proc->procLatch);
> +                LWLockRelease(LogicalRepWorkerLock);

Oh, and again, please set latches outside of the lock.


> +        else if (tstate->state == SUBREL_STATE_SYNCDONE &&
> +                 end_lsn >= tstate->lsn)
> +        {
> +            /*
> +             * Apply catched up to the position where table sync finished,
> +             * mark the table as ready for normal replication.
> +             */

Sentence needs to be rephrased a bit.

> +        /*
> +         * In case table is supposed to be synchronizing but the
> +         * synchronization worker is not running, start it.
> +         * Limit the number of launched workers here to one (for now).
> +         */

Hm. That seems problematic for online-upgrade-type cases; we might never
catch up that way...


> +/*
> + * Start syncing the table in the sync worker.
> + */
> +char *
> +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
> +{
> +    StringInfoData    s;
> +    TableState        tstate;
> +    MemoryContext    oldctx;
> +    char           *slotname;
> +
> +    /* Check the state of the table synchronization. */
> +    StartTransactionCommand();
> +    tstate.relid = MyLogicalRepWorker->relid;
> +    tstate.state = GetSubscriptionRelState(MySubscription->oid, tstate.relid,
> +                                           &tstate.lsn, false);
> +
> +    /*
> +     * Build unique slot name.
> +     * TODO: protect against too long slot name.
> +     */
> +    oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> +    initStringInfo(&s);
> +    appendStringInfo(&s, "%s_sync_%s", MySubscription->slotname,
> +                     get_rel_name(tstate.relid));
> +    slotname = s.data;

Is this memory freed somewhere?


> +                /*
> +                 * We want to do the table data sync in single
> +                 * transaction so do not close the transaction opened
> +                 * above.
> +                 * There will be no BEGIN or COMMIT messages coming via
> +                 * logical replication while the copy table command is
> +                 * running so start the transaction here.
> +                 * Note the memory context for data handling will still
> +                 * be done using ensure_transaction called by the insert
> +                 * handler.
> +                 */
> +                StartTransactionCommand();
> +
> +                /*
> +                 * Don't allow parallel access other than SELECT while
> +                 * the initial contents are being copied.
> +                 */
> +                rel = heap_open(tstate.relid, ExclusiveLock);

Why do we want to allow access at all?



> @@ -87,6 +92,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>      cb->commit_cb = pgoutput_commit_txn;
>      cb->filter_by_origin_cb = pgoutput_origin_filter;
>      cb->shutdown_cb = pgoutput_shutdown;
> +    cb->tuple_cb = pgoutput_tuple;
> +    cb->list_tables_cb = pgoutput_list_tables;
>  }

What are these new, and undocumented callbacks actually doing? And why
is this integrated into logical decoding?


>  /*
> + * Handle LIST_TABLES command.
> + */
> +static void
> +SendTableList(ListTablesCmd *cmd)
> +{

Ugh.


I really dislike this kind of command. I think we should instead change
things around, allowing normal SQL to be issued via the replication
command. We'll have to error out when SQL is run on replication
connections that aren't connected to a database, but that seems fine.


Andres


