Re: Logical Replication WIP

From Petr Jelinek
Subject Re: Logical Replication WIP
Date
Msg-id a94c5e98-0ac2-d0a1-e1de-eaf8f371e631@2ndquadrant.com
In response to Re: Logical Replication WIP  (Andres Freund <andres@anarazel.de>)
Responses Re: Logical Replication WIP  (Andres Freund <andres@anarazel.de>)
List pgsql-hackers
On 14/09/16 00:48, Andres Freund wrote:
>
> First read through the current version. Hence no real architectural
> comments.

Hi,

Thanks for looking!

>
> On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:
>
>> diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
>> new file mode 100644
>> index 0000000..e0c719d
>> --- /dev/null
>> +++ b/src/backend/commands/publicationcmds.c
>> @@ -0,0 +1,761 @@
>> +/*-------------------------------------------------------------------------
>> + *
>> + * publicationcmds.c
>> + *        publication manipulation
>> + *
>> + * Copyright (c) 2015, PostgreSQL Global Development Group
>> + *
>> + * IDENTIFICATION
>> + *        publicationcmds.c
>>
>
> Not that I'm a fan of this line in the first place, but usually it does
> include the path.
>

Yes, I don't bother with it in the WIP version though, because that way I 
won't forget to update it when the patch is getting close to ready, in 
case there were renames in the meantime.

>> +static void
>> +check_replication_permissions(void)
>> +{
>> +    if (!superuser() && !has_rolreplication(GetUserId()))
>> +        ereport(ERROR,
>> +                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
>> +                 (errmsg("must be superuser or replication role to manipulate publications"))));
>> +}
>
> Do we want to require owner privileges for replication roles? I'd say
> no, but want to raise the question.
>

No. We might want to invent some kind of publish role later, so that we 
can do logical replication with higher granularity, but requiring owner 
privileges for the replication role does not make sense. And I think 
higher-granularity ACLs are something for follow-up patches.

>
>> +ObjectAddress
>> +CreatePublication(CreatePublicationStmt *stmt)
>> +{
>> +    Relation    rel;
>> +    ObjectAddress myself;
>> +    Oid            puboid;
>> +    bool        nulls[Natts_pg_publication];
>> +    Datum        values[Natts_pg_publication];
>> +    HeapTuple    tup;
>> +    bool        replicate_insert_given;
>> +    bool        replicate_update_given;
>> +    bool        replicate_delete_given;
>> +    bool        replicate_insert;
>> +    bool        replicate_update;
>> +    bool        replicate_delete;
>> +
>> +    check_replication_permissions();
>> +
>> +    rel = heap_open(PublicationRelationId, RowExclusiveLock);
>> +
>> +    /* Check if name is used */
>> +    puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
>> +    if (OidIsValid(puboid))
>> +    {
>> +        ereport(ERROR,
>> +                (errcode(ERRCODE_DUPLICATE_OBJECT),
>> +                 errmsg("publication \"%s\" already exists",
>> +                        stmt->pubname)));
>> +    }
>> +
>> +    /* Form a tuple. */
>> +    memset(values, 0, sizeof(values));
>> +    memset(nulls, false, sizeof(nulls));
>> +
>> +    values[Anum_pg_publication_pubname - 1] =
>> +        DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
>> +
>> +    parse_publication_options(stmt->options,
>> +                              &replicate_insert_given, &replicate_insert,
>> +                              &replicate_update_given, &replicate_update,
>> +                              &replicate_delete_given, &replicate_delete);
>> +
>> +    values[Anum_pg_publication_puballtables - 1] =
>> +        BoolGetDatum(stmt->for_all_tables);
>> +    values[Anum_pg_publication_pubreplins - 1] =
>> +        BoolGetDatum(replicate_insert);
>> +    values[Anum_pg_publication_pubreplupd - 1] =
>> +        BoolGetDatum(replicate_update);
>> +    values[Anum_pg_publication_pubrepldel - 1] =
>> +        BoolGetDatum(replicate_delete);
>> +
>> +    tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
>> +
>> +    /* Insert tuple into catalog. */
>> +    puboid = simple_heap_insert(rel, tup);
>> +    CatalogUpdateIndexes(rel, tup);
>> +    heap_freetuple(tup);
>> +
>> +    ObjectAddressSet(myself, PublicationRelationId, puboid);
>> +
>> +    /* Make the changes visible. */
>> +    CommandCounterIncrement();
>> +
>> +    if (stmt->tables)
>> +    {
>> +        List       *rels;
>> +
>> +        Assert(list_length(stmt->tables) > 0);
>> +
>> +        rels = GatherTableList(stmt->tables);
>> +        PublicationAddTables(puboid, rels, true, NULL);
>> +        CloseTables(rels);
>> +    }
>> +    else if (stmt->for_all_tables || stmt->schema)
>> +    {
>> +        List       *rels;
>> +
>> +        rels = GatherTables(stmt->schema);
>> +        PublicationAddTables(puboid, rels, true, NULL);
>> +        CloseTables(rels);
>> +    }
>
> Isn't this (and ALTER) racy? What happens if tables are concurrently
> created? This session wouldn't necessarily see the tables, and other
> sessions won't see for_all_tables/schema.   Evaluating
> for_all_tables/all_in_schema when the publication is used, would solve
> that problem.

Well, yes it is. It's technically not a problem for all_in_schema, as 
that's just shorthand for TABLE a, b, c, d, etc., where future tables 
don't matter (and should be added manually, unless we want to change that 
behavior to act more like for_all_tables just with a schema filter, which 
I wouldn't be against). But for for_all_tables it's a problem, I agree.

Based on offline discussion I'll move the check to the actual DML 
operation instead of DDL, and have for_all_tables be evaluated when the 
publication is used, not when it is defined.
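
To make that concrete, membership would be decided lazily at decode time 
rather than recorded at DDL time. A rough sketch (the function and helper 
names here are illustrative, not from the patch as posted):

```c
/*
 * Hypothetical sketch: evaluate FOR ALL TABLES when the publication is
 * used, so concurrently created tables are picked up automatically and
 * the DDL-time race goes away.
 */
static bool
publication_contains_relation(Publication *pub, Oid relid)
{
	if (pub->alltables)
		return true;	/* every user table is implicitly a member */

	/* otherwise fall back to the explicit membership catalog */
	return list_member_oid(GetRelationPublications(relid), pub->oid);
}
```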

>
>> +/*
>> + * Gather all tables optinally filtered by schema name.
>> + * The gathered tables are locked in access share lock mode.
>> + */
>> +static List *
>> +GatherTables(char *nspname)
>> +{
>> +    Oid            nspid = InvalidOid;
>> +    List       *rels = NIL;
>> +    Relation    rel;
>> +    SysScanDesc scan;
>> +    ScanKeyData key[1];
>> +    HeapTuple    tup;
>> +
>> +    /* Resolve and validate the schema if specified */
>> +    if (nspname)
>> +    {
>> +        nspid = LookupExplicitNamespace(nspname, false);
>> +        if (IsSystemNamespace(nspid) || IsToastNamespace(nspid))
>> +            ereport(ERROR,
>> +                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
>> +                     errmsg("only tables in user schemas can be added to publication"),
>> +                     errdetail("%s is a system schema", strVal(nspname))));
>> +    }
>
> Why are we restricting pg_catalog here? There's a bunch of extensions
> creating objects therein, and we allow that. Seems better to just rely
> on the IsSystemClass check for that below.
>

Makes sense.

>> +/*
>> + * Gather Relations based o provided by RangeVar list.
>> + * The gathered tables are locked in access share lock mode.
>> + */
>
> Why access share? Shouldn't we make this ShareUpdateExclusive or
> similar, to prevent schema changes?
>

Hm, I thought AccessShare would be enough to prevent the schema changes 
that matter to us (which is basically just DROP, afaik).

>
>> +static List *
>> +GatherTableList(List *tables)
>> +{
>> +    List       *relids = NIL;
>> +    List       *rels = NIL;
>> +    ListCell   *lc;
>> +
>> +    /*
>> +     * Open, share-lock, and check all the explicitly-specified relations
>> +     */
>> +    foreach(lc, tables)
>> +    {
>> +        RangeVar   *rv = lfirst(lc);
>> +        Relation    rel;
>> +        bool        recurse = interpretInhOption(rv->inhOpt);
>> +        Oid            myrelid;
>> +
>> +        rel = heap_openrv(rv, AccessShareLock);
>> +        myrelid = RelationGetRelid(rel);
>> +        /* don't throw error for "foo, foo" */
>> +        if (list_member_oid(relids, myrelid))
>> +        {
>> +            heap_close(rel, AccessShareLock);
>> +            continue;
>> +        }
>> +        rels = lappend(rels, rel);
>> +        relids = lappend_oid(relids, myrelid);
>> +
>> +        if (recurse)
>> +        {
>> +            ListCell   *child;
>> +            List       *children;
>> +
>> +            children = find_all_inheritors(myrelid, AccessShareLock,
>> +                                           NULL);
>> +
>> +            foreach(child, children)
>> +            {
>> +                Oid            childrelid = lfirst_oid(child);
>> +
>> +                if (list_member_oid(relids, childrelid))
>> +                    continue;
>> +
>> +                /* find_all_inheritors already got lock */
>> +                rel = heap_open(childrelid, NoLock);
>> +                rels = lappend(rels, rel);
>> +                relids = lappend_oid(relids, childrelid);
>> +            }
>> +        }
>> +    }
>
> Hm, can't this yield duplicates, when both an inherited and a top level
> relation are specified?
>

Hmm, possible; I'll add the same duplicate check as I do above.

>
>> @@ -713,6 +714,25 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
>>      ObjectAddressSet(address, RelationRelationId, relationId);
>>
>>      /*
>> +     * If the newly created relation is a table and there are publications
>> +     * which were created as FOR ALL TABLES, we want to add the relation
>> +     * membership to those publications.
>> +     */
>> +
>> +    if (relkind == RELKIND_RELATION)
>> +    {
>> +        List       *pubids = GetAllTablesPublications();
>> +        ListCell   *lc;
>> +
>> +        foreach(lc, pubids)
>> +        {
>> +            Oid    pubid = lfirst_oid(lc);
>> +
>> +            publication_add_relation(pubid, rel, false);
>> +        }
>> +    }
>> +
>
> Hm, this has the potential to noticeably slow down table creation.
>

I doubt it's going to be noticeable given all the work CREATE TABLE 
already does, but it certainly won't make it any faster. Since we agreed 
to move the check to DML, this will be removed as well.

>> +publication_opt_item:
>> +            IDENT
>> +                {
>> +                    /*
>> +                     * We handle identifiers that aren't parser keywords with
>> +                     * the following special-case codes, to avoid bloating the
>> +                     * size of the main parser.
>> +                     */
>> +                    if (strcmp($1, "replicate_insert") == 0)
>> +                        $$ = makeDefElem("replicate_insert",
>> +                                         (Node *)makeInteger(TRUE), @1);
>> +                    else if (strcmp($1, "noreplicate_insert") == 0)
>> +                        $$ = makeDefElem("replicate_insert",
>> +                                         (Node *)makeInteger(FALSE), @1);
>> +                    else if (strcmp($1, "replicate_update") == 0)
>> +                        $$ = makeDefElem("replicate_update",
>> +                                         (Node *)makeInteger(TRUE), @1);
>> +                    else if (strcmp($1, "noreplicate_update") == 0)
>> +                        $$ = makeDefElem("replicate_update",
>> +                                         (Node *)makeInteger(FALSE), @1);
>> +                    else if (strcmp($1, "replicate_delete") == 0)
>> +                        $$ = makeDefElem("replicate_delete",
>> +                                         (Node *)makeInteger(TRUE), @1);
>> +                    else if (strcmp($1, "noreplicate_delete") == 0)
>> +                        $$ = makeDefElem("replicate_delete",
>> +                                         (Node *)makeInteger(FALSE), @1);
>> +                    else
>> +                        ereport(ERROR,
>> +                                (errcode(ERRCODE_SYNTAX_ERROR),
>> +                                 errmsg("unrecognized publication option \"%s\"", $1),
>> +                                     parser_errposition(@1)));
>> +                }
>> +        ;
>
> I'm kind of inclined to do this checking at execution (or transform)
> time instead.  That allows extension to add options, and handle them in
> utility hooks.
>

That's an interesting point. I prefer the parsing to be done in gram.y, 
but it might be worth moving it for extensibility. Although there are so 
far other barriers to that.

>> +
>> +/* ----------------
>> + *        pg_publication_rel definition.  cpp turns this into
>> + *        typedef struct FormData_pg_publication_rel
>> + *
>> + * ----------------
>> + */
>> +#define PublicationRelRelationId                6106
>> +
>> +CATALOG(pg_publication_rel,6106)
>> +{
>> +    Oid        pubid;                /* Oid of the publication */
>> +    Oid        relid;                /* Oid of the relation */
>> +} FormData_pg_publication_rel;
>
> Hm. Do we really want this to have an oid? Won't that significantly,
> especially if multiple publications are present, increase our oid
> consumption?  It seems entirely sufficient to identify rows in here
> using (pubid, relid).
>

It could, but I'll have to check and possibly fix the dependency code; I 
vaguely remember that there is some part of it that assumes suboid is 
only used for relation columns and nothing else.

>
>> +ObjectAddress
>> +CreateSubscription(CreateSubscriptionStmt *stmt)
>> +{
>> +    Relation    rel;
>> +    ObjectAddress myself;
>> +    Oid            subid;
>> +    bool        nulls[Natts_pg_subscription];
>> +    Datum        values[Natts_pg_subscription];
>> +    HeapTuple    tup;
>> +    bool        enabled_given;
>> +    bool        enabled;
>> +    char       *conninfo;
>> +    List       *publications;
>> +
>> +    check_subscription_permissions();
>> +
>> +    rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
>> +
>> +    /* Check if name is used */
>> +    subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
>> +                            CStringGetDatum(stmt->subname));
>> +    if (OidIsValid(subid))
>> +    {
>> +        ereport(ERROR,
>> +                (errcode(ERRCODE_DUPLICATE_OBJECT),
>> +                 errmsg("subscription \"%s\" already exists",
>> +                        stmt->subname)));
>> +    }
>> +
>> +    /* Parse and check options. */
>> +    parse_subscription_options(stmt->options, &enabled_given, &enabled,
>> +                               &conninfo, &publications);
>> +
>> +    /* TODO: improve error messages here. */
>> +    if (conninfo == NULL)
>> +        ereport(ERROR,
>> +                (errcode(ERRCODE_SYNTAX_ERROR),
>> +                 errmsg("connection not specified")));
>
> Probably also makes sense to parse the conninfo here to verify it looks
> saen.  Although that's fairly annoying to do, because the relevant code
> is libpq :(
>

Well, the connection is eventually used (in later patches), so maybe 
that's not a problem.

>
>> diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
>> index 65230e2..f3d54c8 100644
>> --- a/src/backend/nodes/copyfuncs.c
>> +++ b/src/backend/nodes/copyfuncs.c
>
> I think you might be missing outfuncs support.
>

I thought that we don't do outfuncs for DDL?

>> +
>> +CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHEMA_MACRO
>> +{
>> +    Oid            subdbid;            /* Database the subscription is in. */
>> +    NameData    subname;        /* Name of the subscription */
>> +    bool        subenabled;        /* True if the subsription is enabled (running) */
>
> Not sure what "running" means here.

It's a very terse way of saying that enabled means the worker should be running.


>> +    <varlistentry>
>> +     <term>
>> +      publication_names
>> +     </term>
>> +     <listitem>
>> +      <para>
>> +       Comma separated list of publication names for which to subscribe
>> +       (receive changes). See
>> +       <xref linkend="logical-replication-publication"> for more info.
>> +      </para>
>> +     </listitem>
>> +    </varlistentry>
>> +   </variablelist>
>
> Do we need to specify an escaping scheme here?
>

Probably, as we allow whatever Name allows.

>
>> +<listitem>
>> +<para>
>> +                Commit timestamp of the transaction.
>> +</para>
>> +</listitem>
>> +</varlistentry>
>
> Perhaps mention it's relative to postgres epoch?
>

Already done in my local working copy.

>
>
>> +<variablelist>
>> +<varlistentry>
>> +<term>
>> +        Byte1('O')
>> +</term>
>> +<listitem>
>> +<para>
>> +                Identifies the message as an origin message.
>> +</para>
>> +</listitem>
>> +</varlistentry>
>> +<varlistentry>
>> +<term>
>> +        Int64
>> +</term>
>> +<listitem>
>> +<para>
>> +                The LSN of the commit on the origin server.
>> +</para>
>> +</listitem>
>> +</varlistentry>
>> +<varlistentry>
>> +<term>
>> +        Int8
>> +</term>
>> +<listitem>
>> +<para>
>> +                Length of the origin name (including the NULL-termination
>> +                character).
>> +</para>
>> +</listitem>
>> +</varlistentry>
>
> Should this explain that there could be mulitple origin messages (when
> replay switched origins during an xact)?
>

Makes sense.

>> +<para>
>> +                Relation name.
>> +</para>
>> +</listitem>
>> +</varlistentry>
>> +</variablelist>
>> +
>> +</para>
>> +
>> +<para>
>> +This message is always followed by Attributes message.
>> +</para>
>
> What's the point of having this separate from the relation message?
>

It's not separate; it's part of the relation message, but the 
documentation does not make that very clear.

>> +<varlistentry>
>> +<term>
>> +        Byte1('C')
>> +</term>
>> +<listitem>
>> +<para>
>> +                Start of column block.
>> +</para>
>> +</listitem>
>
> "block"?
>

Block, message part, sub-message; I am not sure what to call something 
that repeats inside a message.

>> +</varlistentry><varlistentry>
>> +<term>
>> +        Int8
>> +</term>
>> +<listitem>
>> +<para>
>> +                Flags for the column. Currently can be either 0 for no flags
>> +                or one which marks the column as part of the key.
>> +</para>
>> +</listitem>
>> +</varlistentry>
>> +<varlistentry>
>> +<term>
>> +        Int8
>> +</term>
>> +<listitem>
>> +<para>
>> +                Length of column name (including the NULL-termination
>> +                character).
>> +</para>
>> +</listitem>
>> +</varlistentry>
>> +<varlistentry>
>> +<term>
>> +        String
>> +</term>
>> +<listitem>
>> +<para>
>> +                Name of the column.
>> +</para>
>> +</listitem>
>> +</varlistentry>
>
> Huh, no type information?
>

It's not necessary for text-format transfer; it will be if we ever add 
binary data transfer, but that will require a protocol version bump anyway.


>> +<varlistentry>
>> +<term>
>> +        Byte1('O')
>> +</term>
>> +<listitem>
>> +<para>
>> +                Identifies the following TupleData message as the old tuple
>> +                (deleted tuple).
>> +</para>
>> +</listitem>
>> +</varlistentry>
>
> Should we discern between old key and old tuple?
>

Yes, otherwise it will be hard to support REPLICA IDENTITY FULL.
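
One way to discern the two cases on the wire would be a second message 
byte; a sketch (the 'K' byte and the `logicalrep_write_tuple` call are 
assumptions here, not part of the patch's current protocol):

```c
/*
 * Send 'K' when only the replica identity key columns of the old tuple
 * are logged, 'O' when the full old tuple is available because the
 * relation uses REPLICA IDENTITY FULL.
 */
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
	pq_sendbyte(out, 'O');		/* full old tuple follows */
else
	pq_sendbyte(out, 'K');		/* old key tuple follows */
logicalrep_write_tuple(out, rel, oldtuple);
```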

>
>> +/*
>> + * Read transaction BEGIN from the stream.
>> + */
>> +void
>> +logicalrep_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
>> +                      TimestampTz *committime, TransactionId *remote_xid)
>> +{
>> +    /* read fields */
>> +    *remote_lsn = pq_getmsgint64(in);
>> +    Assert(*remote_lsn != InvalidXLogRecPtr);
>> +    *committime = pq_getmsgint64(in);
>> +    *remote_xid = pq_getmsgint(in, 4);
>> +}
>
> In network exposed stuff it seems better not to use assert, and error
> out instead.
>

Okay

>
>> +/*
>> + * Write UPDATE to the output stream.
>> + */
>> +void
>> +logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
>> +                       HeapTuple newtuple)
>> +{
>> +    pq_sendbyte(out, 'U');        /* action UPDATE */
>> +
>> +    /* use Oid as relation identifier */
>> +    pq_sendint(out, RelationGetRelid(rel), 4);
>
> Wonder if there's a way that could screw us. What happens if there's an
> oid wraparound, and a relation is dropped? Then a new relation could end
> up with same id. Maybe answered somewhere further down.
>

It should not; we'll know we haven't sent the message for the new table 
yet, so we'll send a new Relation message first.

>> +
>> +/*
>> + * COMMIT callback
>> + */
>> +static void
>> +pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>> +                     XLogRecPtr commit_lsn)
>> +{
>> +    OutputPluginPrepareWrite(ctx, true);
>> +    logicalrep_write_commit(ctx->out, txn, commit_lsn);
>> +    OutputPluginWrite(ctx, true);
>> +}
>
> Hm, so we don't reset the context for these...
>

What?

>> +/*
>> + * Sends the decoded DML over wire.
>> + */
>> +static void
>> +pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>> +                Relation relation, ReorderBufferChange *change)
>> +{
>
>> +    /* Avoid leaking memory by using and resetting our own context */
>> +    old = MemoryContextSwitchTo(data->context);
>> +
>> +    /*
>> +     * Write the relation schema if the current schema haven't been sent yet.
>> +     */
>> +    if (!relentry->schema_sent)
>> +    {
>> +        OutputPluginPrepareWrite(ctx, false);
>> +        logicalrep_write_rel(ctx->out, relation);
>> +        OutputPluginWrite(ctx, false);
>> +        relentry->schema_sent = true;
>> +    }
>> +
>> +    /* Send the data */
>> +    switch (change->action)
>> +    {
> ...
>> +    /* Cleanup */
>> +    MemoryContextSwitchTo(old);
>> +    MemoryContextReset(data->context);
>> +}
>
> IIRC there were some pfree's in called functions. It's probably better
> to remove those and rely on this.
>

Only write_tuple calls pfree, mostly because we may call it twice for a 
single tuple and it might allocate a lot of data.
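
For reference, the pattern the quoted code already follows would make 
those individual pfree()s unnecessary; everything allocated while 
handling one change lives in the private context and one reset releases 
it all (`data->context` is assumed to be created at plugin startup, as in 
the patch):

```c
/* Per-change memory management in the output plugin callback. */
old = MemoryContextSwitchTo(data->context);

OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, newtuple);	/* may palloc freely */
OutputPluginWrite(ctx, true);

MemoryContextSwitchTo(old);
MemoryContextReset(data->context);	/* frees all per-change allocations */
```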

>> +/*
>> + * Load publications from the list of publication names.
>> + */
>> +static List *
>> +LoadPublications(List *pubnames)
>> +{
>> +    List       *result = NIL;
>> +    ListCell   *lc;
>> +
>> +    foreach (lc, pubnames)
>> +    {
>> +        char           *pubname = (char *) lfirst(lc);
>> +        Publication       *pub = GetPublicationByName(pubname, false);
>> +
>> +        result = lappend(result, pub);
>> +    }
>> +
>> +    return result;
>> +}
>
> Why are we doing this eagerly? On systems with a lot of relations
> this'll suck up a fair amount of memory, without much need?
>

I don't follow; it only reads the publications, not the relations in 
them. The reason we do it eagerly is to validate that the requested 
publications actually exist.

>> +/*
>> + * Remove all the entries from our relation cache.
>> + */
>> +static void
>> +destroy_rel_sync_cache(void)
>> +{
>> +    HASH_SEQ_STATUS        status;
>> +    RelationSyncEntry  *entry;
>> +
>> +    if (RelationSyncCache == NULL)
>> +        return;
>> +
>> +    hash_seq_init(&status, RelationSyncCache);
>> +
>> +    while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
>> +    {
>> +        if (hash_search(RelationSyncCache, (void *) &entry->relid,
>> +                        HASH_REMOVE, NULL) == NULL)
>> +            elog(ERROR, "hash table corrupted");
>> +    }
>> +
>> +    RelationSyncCache = NULL;
>> +}
>
> Any reason not to just destroy the hash table instead?
>

Missed that we have an API for that.
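
With dynahash's hash_destroy the whole function collapses to (assuming no 
other state hangs off individual entries):

```c
/*
 * Remove all the entries from our relation cache by dropping the whole
 * hash table; the next use rebuilds it from scratch.
 */
static void
destroy_rel_sync_cache(void)
{
	if (RelationSyncCache == NULL)
		return;

	hash_destroy(RelationSyncCache);	/* frees table and all entries */
	RelationSyncCache = NULL;
}
```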

>>
>>  /*
>> - * Module load callback
>> + * Module initialization callback
>>   */
>> -void
>> -_PG_init(void)
>> +WalReceiverConnHandle *
>> +_PG_walreceirver_conn_init(WalReceiverConnAPI *wrcapi)
>>  {
>> -    /* Tell walreceiver how to reach us */
>> -    if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
>> -        walrcv_readtimelinehistoryfile != NULL ||
>> -        walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
>> -        walrcv_receive != NULL || walrcv_send != NULL ||
>> -        walrcv_disconnect != NULL)
>> -        elog(ERROR, "libpqwalreceiver already loaded");
>> -    walrcv_connect = libpqrcv_connect;
>> -    walrcv_get_conninfo = libpqrcv_get_conninfo;
>> -    walrcv_identify_system = libpqrcv_identify_system;
>> -    walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
>> -    walrcv_startstreaming = libpqrcv_startstreaming;
>> -    walrcv_endstreaming = libpqrcv_endstreaming;
>> -    walrcv_receive = libpqrcv_receive;
>> -    walrcv_send = libpqrcv_send;
>> -    walrcv_disconnect = libpqrcv_disconnect;
>> +    WalReceiverConnHandle *handle;
>> +
>> +    handle = palloc0(sizeof(WalReceiverConnHandle));
>> +
>> +    /* Tell caller how to reach us */
>> +    wrcapi->connect = libpqrcv_connect;
>> +    wrcapi->get_conninfo = libpqrcv_get_conninfo;
>> +    wrcapi->identify_system = libpqrcv_identify_system;
>> +    wrcapi->readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
>> +    wrcapi->create_slot = libpqrcv_create_slot;
>> +    wrcapi->startstreaming_physical = libpqrcv_startstreaming_physical;
>> +    wrcapi->startstreaming_logical = libpqrcv_startstreaming_logical;
>> +    wrcapi->endstreaming = libpqrcv_endstreaming;
>> +    wrcapi->receive = libpqrcv_receive;
>> +    wrcapi->send = libpqrcv_send;
>> +    wrcapi->disconnect = libpqrcv_disconnect;
>> +
>> +    return handle;
>>  }
>
> This however I'm not following. Why do we need multiple copies of this?
> And why aren't we doing the assignments in _PG_init?  Seems better to
> just allocate one WalRcvCalllbacks globally and assign all these as
> constants.  Then the establishment function can just return all these
> (as part of a bigger struct).
>

Meh. If I understand you correctly, that will make the access a bit more 
ugly (multiple layers of structs).
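
For reference, the suggested alternative would look roughly like this 
(field names as in the patch; the constant initializer is the new part, 
and this is just a sketch of the suggestion, not agreed-on code):

```c
/* One global, constant set of callbacks, assigned once at compile time. */
static const WalReceiverConnAPI PQWalReceiverFunctions = {
	.connect = libpqrcv_connect,
	.get_conninfo = libpqrcv_get_conninfo,
	.identify_system = libpqrcv_identify_system,
	.readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
	.create_slot = libpqrcv_create_slot,
	.startstreaming_physical = libpqrcv_startstreaming_physical,
	.startstreaming_logical = libpqrcv_startstreaming_logical,
	.endstreaming = libpqrcv_endstreaming,
	.receive = libpqrcv_receive,
	.send = libpqrcv_send,
	.disconnect = libpqrcv_disconnect
};
```

The establishment function would then just return a handle carrying a 
pointer to this struct, which is the "multiple layers of structs" being 
objected to above.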

>
> (skipped logical rep docs)
>
>> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
>> index 8acdff1..34007d3 100644
>> --- a/doc/src/sgml/reference.sgml
>> +++ b/doc/src/sgml/reference.sgml
>> @@ -54,11 +54,13 @@
>>     &alterOperatorClass;
>>     &alterOperatorFamily;
>>     &alterPolicy;
>> +   &alterPublication;
>>     &alterRole;
>>     &alterRule;
>>     &alterSchema;
>>     &alterSequence;
>>     &alterServer;
>> +   &alterSubscription;
>>     &alterSystem;
>>     &alterTable;
>>     &alterTableSpace;
>> @@ -100,11 +102,13 @@
>>     &createOperatorClass;
>>     &createOperatorFamily;
>>     &createPolicy;
>> +   &createPublication;
>>     &createRole;
>>     &createRule;
>>     &createSchema;
>>     &createSequence;
>>     &createServer;
>> +   &createSubscription;
>>     &createTable;
>>     &createTableAs;
>>     &createTableSpace;
>> @@ -144,11 +148,13 @@
>>     &dropOperatorFamily;
>>     &dropOwned;
>>     &dropPolicy;
>> +   &dropPublication;
>>     &dropRole;
>>     &dropRule;
>>     &dropSchema;
>>     &dropSequence;
>>     &dropServer;
>> +   &dropSubscription;
>>     &dropTable;
>>     &dropTableSpace;
>>     &dropTSConfig;
>
> Hm, shouldn't all these have been registered in the earlier patch?
>

Yeah, all the rebasing sometimes produces artefacts.

>
>
>> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
>> index d29d3f9..f2052b8 100644
>> --- a/src/backend/commands/subscriptioncmds.c
>> +++ b/src/backend/commands/subscriptioncmds.c
>
> This sure is a lot of yanking around of previously added code.  At least
> some of it looks like it should really have been part of the earlier
> commit.
>

True, but it depends on the previous patch ... scratches head ... hmm, 
although libpqwalreceiver actually does not depend on anything, so it 
could be the first patch in the series; then this code could be moved to 
the patch which adds subscriptions.

>
>> @@ -327,6 +431,18 @@ DropSubscriptionById(Oid subid)
>>  {
>>      Relation    rel;
>>      HeapTuple    tup;
>> +    Datum        datum;
>> +    bool        isnull;
>> +    char       *subname;
>> +    char       *conninfo;
>> +    char       *slotname;
>> +    RepOriginId    originid;
>> +    MemoryContext            tmpctx,
>> +                            oldctx;
>> +    WalReceiverConnHandle  *wrchandle = NULL;
>> +    WalReceiverConnAPI       *wrcapi = NULL;
>> +    walrcvconn_init_fn        walrcvconn_init;
>> +    LogicalRepWorker       *worker;
>>
>>      check_subscription_permissions();
>>
>> @@ -337,9 +453,135 @@ DropSubscriptionById(Oid subid)
>>      if (!HeapTupleIsValid(tup))
>>          elog(ERROR, "cache lookup failed for subscription %u", subid);
>>
>> +    /*
>> +     * Create temporary memory context to keep copy of subscription
>> +     * info needed later in the execution.
>> +     */
>> +    tmpctx = AllocSetContextCreate(TopMemoryContext,
>> +                                          "DropSubscription Ctx",
>> +                                          ALLOCSET_DEFAULT_MINSIZE,
>> +                                          ALLOCSET_DEFAULT_INITSIZE,
>> +                                          ALLOCSET_DEFAULT_MAXSIZE);
>> +    oldctx = MemoryContextSwitchTo(tmpctx);
>> +
>> +    /* Get subname */
>> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
>> +                            Anum_pg_subscription_subname, &isnull);
>> +    Assert(!isnull);
>> +    subname = pstrdup(NameStr(*DatumGetName(datum)));
>> +
>> +    /* Get conninfo */
>> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
>> +                            Anum_pg_subscription_subconninfo, &isnull);
>> +    Assert(!isnull);
>> +    conninfo = pstrdup(TextDatumGetCString(datum));
>> +
>> +    /* Get slotname */
>> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
>> +                            Anum_pg_subscription_subslotname, &isnull);
>> +    Assert(!isnull);
>> +    slotname = pstrdup(NameStr(*DatumGetName(datum)));
>> +
>> +    MemoryContextSwitchTo(oldctx);
>> +
>> +    /* Remove the tuple from catalog. */
>>      simple_heap_delete(rel, &tup->t_self);
>>
>> -    ReleaseSysCache(tup);
>> +    /* Protect against launcher restarting the worker. */
>> +    LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
>>
>> -    heap_close(rel, RowExclusiveLock);
>> +    /* Kill the apply worker so that the slot becomes accessible. */
>> +    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
>> +    worker = logicalrep_worker_find(subid);
>> +    if (worker)
>> +        logicalrep_worker_stop(worker);
>> +    LWLockRelease(LogicalRepWorkerLock);
>> +
>> +    /* Wait for apply process to die. */
>> +    for (;;)
>> +    {
>> +        int    rc;
>> +
>> +        CHECK_FOR_INTERRUPTS();
>> +
>> +        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
>> +        if (logicalrep_worker_count(subid) < 1)
>> +        {
>> +            LWLockRelease(LogicalRepWorkerLock);
>> +            break;
>> +        }
>> +        LWLockRelease(LogicalRepWorkerLock);
>> +
>> +        /* Wait for more work. */
>> +        rc = WaitLatch(&MyProc->procLatch,
>> +                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
>> +                       1000L);
>> +
>> +        /* emergency bailout if postmaster has died */
>> +        if (rc & WL_POSTMASTER_DEATH)
>> +            proc_exit(1);
>> +
>> +        ResetLatch(&MyProc->procLatch);
>> +    }
>
> I'm really far from convinced this is the right layer to perform these
> operations.  Previously these routines were low level catalog
> manipulation routines. Now they're certainly not.
>

Well, I do want this to happen when the DDL is executed, so that I can 
inform the user about failures. I can move this code to a separate 
function, but it will still be executed in this layer.

>
>> +    /*
>> +     * Now that the catalog update is done, try to reserve slot at the
>> +     * provider node using replication connection.
>> +     */
>> +    wrcapi = palloc0(sizeof(WalReceiverConnAPI));
>> +
>> +    walrcvconn_init = (walrcvconn_init_fn)
>> +        load_external_function("libpqwalreceiver",
>> +                               "_PG_walreceirver_conn_init", false, NULL);
>> +
>> +    if (walrcvconn_init == NULL)
>> +        elog(ERROR, "libpqwalreceiver does not declare _PG_walreceirver_conn_init symbol");
>
> This does rather reinforce my opinion that the _PG_init removal in
> libpqwalreceiver isn't useful.

I don't see how it helps; you said we'd still return the struct from some 
interface, so this would be more or less the same?

>
>> diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
>> index 699c934..fc998cd 100644
>> --- a/src/backend/postmaster/bgworker.c
>> +++ b/src/backend/postmaster/bgworker.c
>> @@ -93,6 +93,9 @@ struct BackgroundWorkerHandle
>>
>>  static BackgroundWorkerArray *BackgroundWorkerData;
>>
>> +/* Enables registration of internal background workers. */
>> +bool internal_bgworker_registration_in_progress = false;
>> +
>>  /*
>>   * Calculate shared memory needed.
>>   */
>> @@ -745,7 +748,8 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
>>          ereport(DEBUG1,
>>           (errmsg("registering background worker \"%s\"", worker->bgw_name)));
>>
>> -    if (!process_shared_preload_libraries_in_progress)
>> +    if (!process_shared_preload_libraries_in_progress &&
>> +        !internal_bgworker_registration_in_progress)
>>      {
>>          if (!IsUnderPostmaster)
>>              ereport(LOG,
>
> Ugh.
>
>
>
>
>>  /*
>> + * Register internal background workers.
>> + *
>> + * This is here mainly because the permanent bgworkers are normally allowed
>> + * to be registered only when share preload libraries are loaded which does
>> + * not work for the internal ones.
>> + */
>> +static void
>> +register_internal_bgworkers(void)
>> +{
>> +    internal_bgworker_registration_in_progress = true;
>> +
>> +    /* Register the logical replication worker launcher if appropriate. */
>> +    if (!IsBinaryUpgrade && max_logical_replication_workers > 0)
>> +    {
>> +        BackgroundWorker bgw;
>> +
>> +        bgw.bgw_flags =    BGWORKER_SHMEM_ACCESS |
>> +            BGWORKER_BACKEND_DATABASE_CONNECTION;
>> +        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
>> +        bgw.bgw_main = ApplyLauncherMain;
>> +        snprintf(bgw.bgw_name, BGW_MAXLEN,
>> +                 "logical replication launcher");
>> +        bgw.bgw_restart_time = 5;
>> +        bgw.bgw_notify_pid = 0;
>> +        bgw.bgw_main_arg = (Datum) 0;
>> +
>> +        RegisterBackgroundWorker(&bgw);
>> +    }
>> +
>> +    internal_bgworker_registration_in_progress = false;
>> +}
>
> Who says these flags are right for everyone?  If we indeed want to go
> through bgworkers here, I think you'll have to generallize this a bit,
> so we don't check for max_logical_replication_workers and such here.  We
> could e.g. have the shared memory sizing hooks set up a chain of
> registrations.
>

It could be more generalized, I agree; this is more of a WIP hack.

I would like to make a special version of RegisterBackgroundWorker 
called something like RegisterInternalBackgroundWorker that does 
something similar to the above function (obviously the if should be 
moved to the caller of that function). The main point here is to be 
able to register a static worker without an extension.

>
>
>> -static void
>> +static char *
>>  libpqrcv_identify_system(WalReceiverConnHandle *handle,
>> -                         TimeLineID *primary_tli)
>> +                         TimeLineID *primary_tli,
>> +                         char **dbname)
>>  {
>> +    char       *sysid;
>>      PGresult   *res;
>> -    char       *primary_sysid;
>> -    char        standby_sysid[32];
>>
>>      /*
>>       * Get the system identifier and timeline ID as a DataRow message from the
>> @@ -231,24 +234,19 @@ libpqrcv_identify_system(WalReceiverConnHandle *handle,
>>                   errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
>>                             ntuples, nfields, 3, 1)));
>>      }
>> -    primary_sysid = PQgetvalue(res, 0, 0);
>> +    sysid = pstrdup(PQgetvalue(res, 0, 0));
>>      *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
>> -
>> -    /*
>> -     * Confirm that the system identifier of the primary is the same as ours.
>> -     */
>> -    snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
>> -             GetSystemIdentifier());
>> -    if (strcmp(primary_sysid, standby_sysid) != 0)
>> +    if (dbname)
>>      {
>> -        primary_sysid = pstrdup(primary_sysid);
>> -        PQclear(res);
>> -        ereport(ERROR,
>> -                (errmsg("database system identifier differs between the primary and standby"),
>> -                 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
>> -                           primary_sysid, standby_sysid)));
>> +        if (PQgetisnull(res, 0, 3))
>> +            *dbname = NULL;
>> +        else
>> +            *dbname = pstrdup(PQgetvalue(res, 0, 3));
>>      }
>> +
>>      PQclear(res);
>> +
>> +    return sysid;
>>  }
>>
>>  /*
>> @@ -274,7 +272,7 @@ libpqrcv_create_slot(WalReceiverConnHandle *handle, char *slotname,
>>
>>      if (PQresultStatus(res) != PGRES_TUPLES_OK)
>>      {
>> -        elog(FATAL, "could not crate replication slot \"%s\": %s\n",
>> +        elog(ERROR, "could not crate replication slot \"%s\": %s\n",
>>               slotname, PQerrorMessage(handle->streamConn));
>>      }
>>
>> @@ -287,6 +285,28 @@ libpqrcv_create_slot(WalReceiverConnHandle *handle, char *slotname,
>>      return snapshot;
>>  }
>>
>> +/*
>> + * Drop replication slot.
>> + */
>> +static void
>> +libpqrcv_drop_slot(WalReceiverConnHandle *handle, char *slotname)
>> +{
>> +    PGresult       *res;
>> +    char            cmd[256];
>> +
>> +    snprintf(cmd, sizeof(cmd),
>> +             "DROP_REPLICATION_SLOT \"%s\"", slotname);
>> +
>> +    res = libpqrcv_PQexec(handle, cmd);
>> +
>> +    if (PQresultStatus(res) != PGRES_COMMAND_OK)
>> +    {
>> +        elog(ERROR, "could not drop replication slot \"%s\": %s\n",
>> +             slotname, PQerrorMessage(handle->streamConn));
>> +    }
>> +
>> +    PQclear(res);
>> +}
>
>
> Given that the earlier commit to libpqwalreciever added a lot of this
> information, it doesn't seem right to change it again here.
>

Why? It's pretty unrelated to the previous change, which is basically 
just refactoring; this one actually adds new functionality.

-- 
Petr Jelinek                  http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services


