Re: Logical Replication WIP

Поиск
Список
Период
Сортировка
От Andres Freund
Тема Re: Logical Replication WIP
Дата
Msg-id 20160913224850.6s6z6x7i2lvq5dma@alap3.anarazel.de
обсуждение исходный текст
Ответ на Re: Logical Replication WIP  (Petr Jelinek <petr@2ndquadrant.com>)
Ответы Re: Logical Replication WIP  (Petr Jelinek <petr@2ndquadrant.com>)
Список pgsql-hackers
Hi,

First read through the current version. Hence no real architectural
comments.

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:

> diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
> new file mode 100644
> index 0000000..e0c719d
> --- /dev/null
> +++ b/src/backend/commands/publicationcmds.c
> @@ -0,0 +1,761 @@
> +/*-------------------------------------------------------------------------
> + *
> + * publicationcmds.c
> + *        publication manipulation
> + *
> + * Copyright (c) 2015, PostgreSQL Global Development Group
> + *
> + * IDENTIFICATION
> + *        publicationcmds.c
>

Not that I'm a fan of this line in the first place, but it usually
includes the full path (e.g. src/backend/commands/publicationcmds.c).


> +static void
> +check_replication_permissions(void)
> +{
> +    if (!superuser() && !has_rolreplication(GetUserId()))
> +        ereport(ERROR,
> +                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
> +                 (errmsg("must be superuser or replication role to manipulate publications"))));
> +}

Do we want to require owner privileges for replication roles? I'd say
no, but I want to raise the question.


> +ObjectAddress
> +CreatePublication(CreatePublicationStmt *stmt)
> +{
> +    Relation    rel;
> +    ObjectAddress myself;
> +    Oid            puboid;
> +    bool        nulls[Natts_pg_publication];
> +    Datum        values[Natts_pg_publication];
> +    HeapTuple    tup;
> +    bool        replicate_insert_given;
> +    bool        replicate_update_given;
> +    bool        replicate_delete_given;
> +    bool        replicate_insert;
> +    bool        replicate_update;
> +    bool        replicate_delete;
> +
> +    check_replication_permissions();
> +
> +    rel = heap_open(PublicationRelationId, RowExclusiveLock);
> +
> +    /* Check if name is used */
> +    puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
> +    if (OidIsValid(puboid))
> +    {
> +        ereport(ERROR,
> +                (errcode(ERRCODE_DUPLICATE_OBJECT),
> +                 errmsg("publication \"%s\" already exists",
> +                        stmt->pubname)));
> +    }
> +
> +    /* Form a tuple. */
> +    memset(values, 0, sizeof(values));
> +    memset(nulls, false, sizeof(nulls));
> +
> +    values[Anum_pg_publication_pubname - 1] =
> +        DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
> +
> +    parse_publication_options(stmt->options,
> +                              &replicate_insert_given, &replicate_insert,
> +                              &replicate_update_given, &replicate_update,
> +                              &replicate_delete_given, &replicate_delete);
> +
> +    values[Anum_pg_publication_puballtables - 1] =
> +        BoolGetDatum(stmt->for_all_tables);
> +    values[Anum_pg_publication_pubreplins - 1] =
> +        BoolGetDatum(replicate_insert);
> +    values[Anum_pg_publication_pubreplupd - 1] =
> +        BoolGetDatum(replicate_update);
> +    values[Anum_pg_publication_pubrepldel - 1] =
> +        BoolGetDatum(replicate_delete);
> +
> +    tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
> +
> +    /* Insert tuple into catalog. */
> +    puboid = simple_heap_insert(rel, tup);
> +    CatalogUpdateIndexes(rel, tup);
> +    heap_freetuple(tup);
> +
> +    ObjectAddressSet(myself, PublicationRelationId, puboid);
> +
> +    /* Make the changes visible. */
> +    CommandCounterIncrement();
> +
> +    if (stmt->tables)
> +    {
> +        List       *rels;
> +
> +        Assert(list_length(stmt->tables) > 0);
> +
> +        rels = GatherTableList(stmt->tables);
> +        PublicationAddTables(puboid, rels, true, NULL);
> +        CloseTables(rels);
> +    }
> +    else if (stmt->for_all_tables || stmt->schema)
> +    {
> +        List       *rels;
> +
> +        rels = GatherTables(stmt->schema);
> +        PublicationAddTables(puboid, rels, true, NULL);
> +        CloseTables(rels);
> +    }

Isn't this (and ALTER) racy? What happens if tables are concurrently
created? This session wouldn't necessarily see the tables, and other
sessions won't see for_all_tables/schema. Evaluating
for_all_tables/all_in_schema when the publication is used would solve
that problem.

> +/*
> + * Gather all tables optinally filtered by schema name.
> + * The gathered tables are locked in access share lock mode.
> + */
> +static List *
> +GatherTables(char *nspname)
> +{
> +    Oid            nspid = InvalidOid;
> +    List       *rels = NIL;
> +    Relation    rel;
> +    SysScanDesc scan;
> +    ScanKeyData key[1];
> +    HeapTuple    tup;
> +
> +    /* Resolve and validate the schema if specified */
> +    if (nspname)
> +    {
> +        nspid = LookupExplicitNamespace(nspname, false);
> +        if (IsSystemNamespace(nspid) || IsToastNamespace(nspid))
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                     errmsg("only tables in user schemas can be added to publication"),
> +                     errdetail("%s is a system schema", strVal(nspname))));
> +    }

Why are we restricting pg_catalog here? There's a bunch of extensions
creating objects therein, and we allow that. Seems better to just rely
on the IsSystemClass check for that below.

> +/*
> + * Gather Relations based o provided by RangeVar list.
> + * The gathered tables are locked in access share lock mode.
> + */

Why access share? Shouldn't we make this ShareUpdateExclusive or
similar, to prevent schema changes?


> +static List *
> +GatherTableList(List *tables)
> +{
> +    List       *relids = NIL;
> +    List       *rels = NIL;
> +    ListCell   *lc;
> +
> +    /*
> +     * Open, share-lock, and check all the explicitly-specified relations
> +     */
> +    foreach(lc, tables)
> +    {
> +        RangeVar   *rv = lfirst(lc);
> +        Relation    rel;
> +        bool        recurse = interpretInhOption(rv->inhOpt);
> +        Oid            myrelid;
> +
> +        rel = heap_openrv(rv, AccessShareLock);
> +        myrelid = RelationGetRelid(rel);
> +        /* don't throw error for "foo, foo" */
> +        if (list_member_oid(relids, myrelid))
> +        {
> +            heap_close(rel, AccessShareLock);
> +            continue;
> +        }
> +        rels = lappend(rels, rel);
> +        relids = lappend_oid(relids, myrelid);
> +
> +        if (recurse)
> +        {
> +            ListCell   *child;
> +            List       *children;
> +
> +            children = find_all_inheritors(myrelid, AccessShareLock,
> +                                           NULL);
> +
> +            foreach(child, children)
> +            {
> +                Oid            childrelid = lfirst_oid(child);
> +
> +                if (list_member_oid(relids, childrelid))
> +                    continue;
> +
> +                /* find_all_inheritors already got lock */
> +                rel = heap_open(childrelid, NoLock);
> +                rels = lappend(rels, rel);
> +                relids = lappend_oid(relids, childrelid);
> +            }
> +        }
> +    }

Hm, can't this yield duplicates, when both an inherited and a top level
relation are specified?


> @@ -713,6 +714,25 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
>      ObjectAddressSet(address, RelationRelationId, relationId);
>  
>      /*
> +     * If the newly created relation is a table and there are publications
> +     * which were created as FOR ALL TABLES, we want to add the relation
> +     * membership to those publications.
> +     */
> +
> +    if (relkind == RELKIND_RELATION)
> +    {
> +        List       *pubids = GetAllTablesPublications();
> +        ListCell   *lc;
> +
> +        foreach(lc, pubids)
> +        {
> +            Oid    pubid = lfirst_oid(lc);
> +
> +            publication_add_relation(pubid, rel, false);
> +        }
> +    }
> +

Hm, this has the potential to noticeably slow down table creation.

> +publication_opt_item:
> +            IDENT
> +                {
> +                    /*
> +                     * We handle identifiers that aren't parser keywords with
> +                     * the following special-case codes, to avoid bloating the
> +                     * size of the main parser.
> +                     */
> +                    if (strcmp($1, "replicate_insert") == 0)
> +                        $$ = makeDefElem("replicate_insert",
> +                                         (Node *)makeInteger(TRUE), @1);
> +                    else if (strcmp($1, "noreplicate_insert") == 0)
> +                        $$ = makeDefElem("replicate_insert",
> +                                         (Node *)makeInteger(FALSE), @1);
> +                    else if (strcmp($1, "replicate_update") == 0)
> +                        $$ = makeDefElem("replicate_update",
> +                                         (Node *)makeInteger(TRUE), @1);
> +                    else if (strcmp($1, "noreplicate_update") == 0)
> +                        $$ = makeDefElem("replicate_update",
> +                                         (Node *)makeInteger(FALSE), @1);
> +                    else if (strcmp($1, "replicate_delete") == 0)
> +                        $$ = makeDefElem("replicate_delete",
> +                                         (Node *)makeInteger(TRUE), @1);
> +                    else if (strcmp($1, "noreplicate_delete") == 0)
> +                        $$ = makeDefElem("replicate_delete",
> +                                         (Node *)makeInteger(FALSE), @1);
> +                    else
> +                        ereport(ERROR,
> +                                (errcode(ERRCODE_SYNTAX_ERROR),
> +                                 errmsg("unrecognized publication option \"%s\"", $1),
> +                                     parser_errposition(@1)));
> +                }
> +        ;

I'm kind of inclined to do this checking at execution (or transform)
time instead.  That allows extensions to add options, and handle them in
utility hooks.

> +
> +/* ----------------
> + *        pg_publication_rel definition.  cpp turns this into
> + *        typedef struct FormData_pg_publication_rel
> + *
> + * ----------------
> + */
> +#define PublicationRelRelationId                6106
> +
> +CATALOG(pg_publication_rel,6106)
> +{
> +    Oid        pubid;                /* Oid of the publication */
> +    Oid        relid;                /* Oid of the relation */
> +} FormData_pg_publication_rel;

Hm. Do we really want this to have an oid? Won't that significantly
increase our oid consumption, especially if multiple publications are
present?  It seems entirely sufficient to identify rows in here
using (pubid, relid).


> +ObjectAddress
> +CreateSubscription(CreateSubscriptionStmt *stmt)
> +{
> +    Relation    rel;
> +    ObjectAddress myself;
> +    Oid            subid;
> +    bool        nulls[Natts_pg_subscription];
> +    Datum        values[Natts_pg_subscription];
> +    HeapTuple    tup;
> +    bool        enabled_given;
> +    bool        enabled;
> +    char       *conninfo;
> +    List       *publications;
> +
> +    check_subscription_permissions();
> +
> +    rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
> +
> +    /* Check if name is used */
> +    subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
> +                            CStringGetDatum(stmt->subname));
> +    if (OidIsValid(subid))
> +    {
> +        ereport(ERROR,
> +                (errcode(ERRCODE_DUPLICATE_OBJECT),
> +                 errmsg("subscription \"%s\" already exists",
> +                        stmt->subname)));
> +    }
> +
> +    /* Parse and check options. */
> +    parse_subscription_options(stmt->options, &enabled_given, &enabled,
> +                               &conninfo, &publications);
> +
> +    /* TODO: improve error messages here. */
> +    if (conninfo == NULL)
> +        ereport(ERROR,
> +                (errcode(ERRCODE_SYNTAX_ERROR),
> +                 errmsg("connection not specified")));

It probably also makes sense to parse the conninfo here to verify that it
looks sane, although that's fairly annoying to do, because the relevant
code is in libpq :(


> diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
> index 65230e2..f3d54c8 100644
> --- a/src/backend/nodes/copyfuncs.c
> +++ b/src/backend/nodes/copyfuncs.c

I think you might be missing outfuncs support.

> +
> +CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHEMA_MACRO
> +{
> +    Oid            subdbid;            /* Database the subscription is in. */
> +    NameData    subname;        /* Name of the subscription */
> +    bool        subenabled;        /* True if the subsription is enabled (running) */

Not sure what "running" means here.

> +#ifdef CATALOG_VARLEN            /* variable-length fields start here */
> +    text        subconninfo;    /* Connection string to the provider */
> +    NameData    subslotname;    /* Slot name on provider */
> +
> +    name        subpublications[1];    /* List of publications subscribed to */
> +#endif
> +} FormData_pg_subscription;

> +    <varlistentry>
> +     <term>
> +      publication_names
> +     </term>
> +     <listitem>
> +      <para>
> +       Comma separated list of publication names for which to subscribe
> +       (receive changes). See
> +       <xref linkend="logical-replication-publication"> for more info.
> +      </para>
> +     </listitem>
> +    </varlistentry>
> +   </variablelist>

Do we need to specify an escaping scheme here?

> +  <para>
> +   Every DML message contains arbitraty relation id, which can be mapped to

Typo: "arbitraty"


> +<listitem>
> +<para>
> +                Commit timestamp of the transaction.
> +</para>
> +</listitem>
> +</varlistentry>

Perhaps mention that it's relative to the PostgreSQL epoch (2000-01-01)?



> +<variablelist>
> +<varlistentry>
> +<term>
> +        Byte1('O')
> +</term>
> +<listitem>
> +<para>
> +                Identifies the message as an origin message.
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        Int64
> +</term>
> +<listitem>
> +<para>
> +                The LSN of the commit on the origin server.
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        Int8
> +</term>
> +<listitem>
> +<para>
> +                Length of the origin name (including the NULL-termination
> +                character).
> +</para>
> +</listitem>
> +</varlistentry>

Should this explain that there could be multiple origin messages (when
replay switched origins during an xact)?

> +<para>
> +                Relation name.
> +</para>
> +</listitem>
> +</varlistentry>
> +</variablelist>
> +
> +</para>
> +
> +<para>
> +This message is always followed by Attributes message.
> +</para>

What's the point of having this separate from the relation message?

> +<varlistentry>
> +<term>
> +        Byte1('C')
> +</term>
> +<listitem>
> +<para>
> +                Start of column block.
> +</para>
> +</listitem>

"block"?

> +</varlistentry><varlistentry>
> +<term>
> +        Int8
> +</term>
> +<listitem>
> +<para>
> +                Flags for the column. Currently can be either 0 for no flags
> +                or one which marks the column as part of the key.
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        Int8
> +</term>
> +<listitem>
> +<para>
> +                Length of column name (including the NULL-termination
> +                character).
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        String
> +</term>
> +<listitem>
> +<para>
> +                Name of the column.
> +</para>
> +</listitem>
> +</varlistentry>

Huh, no type information?



> +<varlistentry>
> +<term>
> +        Byte1('O')
> +</term>
> +<listitem>
> +<para>
> +                Identifies the following TupleData message as the old tuple
> +                (deleted tuple).
> +</para>
> +</listitem>
> +</varlistentry>

Should we distinguish between the old key and the old tuple?


> +#define IS_REPLICA_IDENTITY    1

Defining this in the c file doesn't seem particularly useful?



> +/*
> + * Read transaction BEGIN from the stream.
> + */
> +void
> +logicalrep_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
> +                      TimestampTz *committime, TransactionId *remote_xid)
> +{
> +    /* read fields */
> +    *remote_lsn = pq_getmsgint64(in);
> +    Assert(*remote_lsn != InvalidXLogRecPtr);
> +    *committime = pq_getmsgint64(in);
> +    *remote_xid = pq_getmsgint(in, 4);
> +}

For network-exposed code it seems better not to use Assert(), and to
error out instead.


> +/*
> + * Write UPDATE to the output stream.
> + */
> +void
> +logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
> +                       HeapTuple newtuple)
> +{
> +    pq_sendbyte(out, 'U');        /* action UPDATE */
> +
> +    /* use Oid as relation identifier */
> +    pq_sendint(out, RelationGetRelid(rel), 4);

I wonder if there's a way that could bite us. What happens if there's an
oid wraparound, and a relation is dropped? Then a new relation could end
up with the same id. Maybe this is answered somewhere further down.


> +/*
> + * Write a tuple to the outputstream, in the most efficient format possible.
> + */
> +static void
> +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
> +{

> +    /* Write the values */
> +    for (i = 0; i < desc->natts; i++)
> +    {
> +        outputstr =    OidOutputFunctionCall(typclass->typoutput, values[i]);

Odd spacing.



> +/*
> + * Initialize this plugin
> + */
> +static void
> +pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
> +                  bool is_init)
> +{
> +    PGOutputData   *data = palloc0(sizeof(PGOutputData));
> +    int                client_encoding;
> +
> +    /* Create our memory context for private allocations. */
> +    data->context = AllocSetContextCreate(ctx->context,
> +                                          "logical replication output context",
> +                                          ALLOCSET_DEFAULT_MINSIZE,
> +                                          ALLOCSET_DEFAULT_INITSIZE,
> +                                          ALLOCSET_DEFAULT_MAXSIZE);
> +
> +    ctx->output_plugin_private = data;
> +
> +    /*
> +     * This is replication start and not slot initialization.
> +     *
> +     * Parse and validate options passed by the client.
> +     */
> +    if (!is_init)
> +    {
> +        /* We can only do binary */
> +        if (opt->output_type != OUTPUT_PLUGIN_BINARY_OUTPUT)
> +            ereport(ERROR,
> +                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                     errmsg("only binary mode is supported for logical replication protocol")));

Shouldn't you just set opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
or is the goal just to emit a better error message?

> +
> +/*
> + * COMMIT callback
> + */
> +static void
> +pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> +                     XLogRecPtr commit_lsn)
> +{
> +    OutputPluginPrepareWrite(ctx, true);
> +    logicalrep_write_commit(ctx->out, txn, commit_lsn);
> +    OutputPluginWrite(ctx, true);
> +}

Hm, so we don't reset the context for these...

> +/*
> + * Sends the decoded DML over wire.
> + */
> +static void
> +pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> +                Relation relation, ReorderBufferChange *change)
> +{

> +    /* Avoid leaking memory by using and resetting our own context */
> +    old = MemoryContextSwitchTo(data->context);
> +
> +    /*
> +     * Write the relation schema if the current schema haven't been sent yet.
> +     */
> +    if (!relentry->schema_sent)
> +    {
> +        OutputPluginPrepareWrite(ctx, false);
> +        logicalrep_write_rel(ctx->out, relation);
> +        OutputPluginWrite(ctx, false);
> +        relentry->schema_sent = true;
> +    }
> +
> +    /* Send the data */
> +    switch (change->action)
> +    {
...
> +    /* Cleanup */
> +    MemoryContextSwitchTo(old);
> +    MemoryContextReset(data->context);
> +}

IIRC there were some pfree() calls in the called functions. It's probably
better to remove those and rely on this context reset.

> +/*
> + * Load publications from the list of publication names.
> + */
> +static List *
> +LoadPublications(List *pubnames)
> +{
> +    List       *result = NIL;
> +    ListCell   *lc;
> +
> +    foreach (lc, pubnames)
> +    {
> +        char           *pubname = (char *) lfirst(lc);
> +        Publication       *pub = GetPublicationByName(pubname, false);
> +
> +        result = lappend(result, pub);
> +    }
> +
> +    return result;
> +}

Why are we doing this eagerly? On systems with a lot of relations
this'll suck up a fair amount of memory, without much need?

> +/*
> + * Remove all the entries from our relation cache.
> + */
> +static void
> +destroy_rel_sync_cache(void)
> +{
> +    HASH_SEQ_STATUS        status;
> +    RelationSyncEntry  *entry;
> +
> +    if (RelationSyncCache == NULL)
> +        return;
> +
> +    hash_seq_init(&status, RelationSyncCache);
> +
> +    while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
> +    {
> +        if (hash_search(RelationSyncCache, (void *) &entry->relid,
> +                        HASH_REMOVE, NULL) == NULL)
> +            elog(ERROR, "hash table corrupted");
> +    }
> +
> +    RelationSyncCache = NULL;
> +}

Any reason not to just destroy the hash table instead?





> +enum {
> +    PARAM_UNRECOGNISED,
> +    PARAM_PROTOCOL_VERSION,
> +    PARAM_ENCODING,
> +    PARAM_PG_VERSION,
> +    PARAM_PUBLICATION_NAMES,
> +} OutputPluginParamKey;
> +
> +typedef struct {
> +    const char * const paramname;
> +    int    paramkey;
> +} OutputPluginParam;
> +
> +/* Oh, if only C had switch on strings */
> +static OutputPluginParam param_lookup[] = {
> +    {"proto_version", PARAM_PROTOCOL_VERSION},
> +    {"encoding", PARAM_ENCODING},
> +    {"pg_version", PARAM_PG_VERSION},
> +    {"publication_names", PARAM_PUBLICATION_NAMES},
> +    {NULL, PARAM_UNRECOGNISED}
> +};
> +
> +
> +/*
> + * Read parameters sent by client at startup and store recognised
> + * ones in the parameters PGOutputData.
> + *
> + * The data must have all client-supplied parameter fields zeroed,
> + * such as by memset or palloc0, since values not supplied
> + * by the client are not set.
> + */
> +void
> +pgoutput_process_parameters(List *options, PGOutputData *data)
> +{
> +    ListCell    *lc;
> +
> +    /* Examine all the other params in the message. */
> +    foreach(lc, options)
> +    {
> +        DefElem    *elem = lfirst(lc);
> +        Datum        val;
> +
> +        Assert(elem->arg == NULL || IsA(elem->arg, String));
> +
> +        /* Check each param, whether or not we recognise it */
> +        switch(get_param_key(elem->defname))
> +        {
> +            case PARAM_PROTOCOL_VERSION:
> +                val = get_param_value(elem, OUTPUT_PARAM_TYPE_UINT32, false);
> +                data->protocol_version = DatumGetUInt32(val);
> +                break;
> +
> +            case PARAM_ENCODING:
> +                val = get_param_value(elem, OUTPUT_PARAM_TYPE_STRING, false);
> +                data->client_encoding = DatumGetCString(val);
> +                break;
> +
> +            case PARAM_PG_VERSION:
> +                val = get_param_value(elem, OUTPUT_PARAM_TYPE_UINT32, false);
> +                data->client_pg_version = DatumGetUInt32(val);
> +                break;
> +
> +            case PARAM_PUBLICATION_NAMES:
> +                val = get_param_value(elem, OUTPUT_PARAM_TYPE_STRING, false);
> +                if (!SplitIdentifierString(DatumGetCString(val), ',',
> +                                           &data->publication_names))
> +                    ereport(ERROR,
> +                            (errcode(ERRCODE_INVALID_NAME),
> +                             errmsg("invalid publication name syntax")));
> +
> +                break;
> +
> +            default:
> +                ereport(ERROR,
> +                        (errmsg("Unrecognised pgoutput parameter %s",
> +                                elem->defname)));
> +                break;
> +        }
> +    }
> +}
> +
> +/*
> + * Look up a param name to find the enum value for the
> + * param, or PARAM_UNRECOGNISED if not found.
> + */
> +static int
> +get_param_key(const char * const param_name)
> +{
> +    OutputPluginParam *param = &param_lookup[0];
> +
> +    do {
> +        if (strcmp(param->paramname, param_name) == 0)
> +            return param->paramkey;
> +        param++;
> +    } while (param->paramname != NULL);
> +
> +    return PARAM_UNRECOGNISED;
> +}

I'm not following why this isn't just one routine with a chain of
else if (strcmp() == 0)
blocks?


> From 2241471aec03de553126c2d5fc012fcba1ecf50d Mon Sep 17 00:00:00 2001
> From: Petr Jelinek <pjmodos@pjmodos.net>
> Date: Wed, 6 Jul 2016 13:59:23 +0200
> Subject: [PATCH 4/6] Make libpqwalreceiver reentrant
> 
> ---
>  .../libpqwalreceiver/libpqwalreceiver.c            | 328 ++++++++++++++-------
>  src/backend/replication/walreceiver.c              |  67 +++--
>  src/include/replication/walreceiver.h              |  75 +++--
>  3 files changed, 306 insertions(+), 164 deletions(-)
> 
> diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> index f1c843e..5da4474 100644
> --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> @@ -25,6 +25,7 @@
>  #include "miscadmin.h"
>  #include "replication/walreceiver.h"
>  #include "utils/builtins.h"
> +#include "utils/pg_lsn.h"
>  
>  #ifdef HAVE_POLL_H
>  #include <poll.h>
> @@ -38,62 +39,83 @@
>  
>  PG_MODULE_MAGIC;
>  
> -void        _PG_init(void);
> +struct WalReceiverConnHandle {
> +    /* Current connection to the primary, if any */
> +    PGconn *streamConn;
> +    /* Buffer for currently read records */
> +    char   *recvBuf;
> +};

newline before {

> -/* Current connection to the primary, if any */
> -static PGconn *streamConn = NULL;
> -
> -/* Buffer for currently read records */
> -static char *recvBuf = NULL;

Yuck, this indeed seems better.

>  
>  /*
> - * Module load callback
> + * Module initialization callback
>   */
> -void
> -_PG_init(void)
> +WalReceiverConnHandle *
> +_PG_walreceirver_conn_init(WalReceiverConnAPI *wrcapi)
>  {
> -    /* Tell walreceiver how to reach us */
> -    if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
> -        walrcv_readtimelinehistoryfile != NULL ||
> -        walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
> -        walrcv_receive != NULL || walrcv_send != NULL ||
> -        walrcv_disconnect != NULL)
> -        elog(ERROR, "libpqwalreceiver already loaded");
> -    walrcv_connect = libpqrcv_connect;
> -    walrcv_get_conninfo = libpqrcv_get_conninfo;
> -    walrcv_identify_system = libpqrcv_identify_system;
> -    walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
> -    walrcv_startstreaming = libpqrcv_startstreaming;
> -    walrcv_endstreaming = libpqrcv_endstreaming;
> -    walrcv_receive = libpqrcv_receive;
> -    walrcv_send = libpqrcv_send;
> -    walrcv_disconnect = libpqrcv_disconnect;
> +    WalReceiverConnHandle *handle;
> +
> +    handle = palloc0(sizeof(WalReceiverConnHandle));
> +
> +    /* Tell caller how to reach us */
> +    wrcapi->connect = libpqrcv_connect;
> +    wrcapi->get_conninfo = libpqrcv_get_conninfo;
> +    wrcapi->identify_system = libpqrcv_identify_system;
> +    wrcapi->readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
> +    wrcapi->create_slot = libpqrcv_create_slot;
> +    wrcapi->startstreaming_physical = libpqrcv_startstreaming_physical;
> +    wrcapi->startstreaming_logical = libpqrcv_startstreaming_logical;
> +    wrcapi->endstreaming = libpqrcv_endstreaming;
> +    wrcapi->receive = libpqrcv_receive;
> +    wrcapi->send = libpqrcv_send;
> +    wrcapi->disconnect = libpqrcv_disconnect;
> +
> +    return handle;
>  }

This, however, I'm not following. Why do we need multiple copies of this?
And why aren't we doing the assignments in _PG_init?  It seems better to
just allocate one WalRcvCallbacks struct globally and assign all these as
constants.  Then the establishment function can just return all these
(as part of a bigger struct).


(skipped logical rep docs)

> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
> index 8acdff1..34007d3 100644
> --- a/doc/src/sgml/reference.sgml
> +++ b/doc/src/sgml/reference.sgml
> @@ -54,11 +54,13 @@
>     &alterOperatorClass;
>     &alterOperatorFamily;
>     &alterPolicy;
> +   &alterPublication;
>     &alterRole;
>     &alterRule;
>     &alterSchema;
>     &alterSequence;
>     &alterServer;
> +   &alterSubscription;
>     &alterSystem;
>     &alterTable;
>     &alterTableSpace;
> @@ -100,11 +102,13 @@
>     &createOperatorClass;
>     &createOperatorFamily;
>     &createPolicy;
> +   &createPublication;
>     &createRole;
>     &createRule;
>     &createSchema;
>     &createSequence;
>     &createServer;
> +   &createSubscription;
>     &createTable;
>     &createTableAs;
>     &createTableSpace;
> @@ -144,11 +148,13 @@
>     &dropOperatorFamily;
>     &dropOwned;
>     &dropPolicy;
> +   &dropPublication;
>     &dropRole;
>     &dropRule;
>     &dropSchema;
>     &dropSequence;
>     &dropServer;
> +   &dropSubscription;
>     &dropTable;
>     &dropTableSpace;
>     &dropTSConfig;

Hm, shouldn't all these have been registered in the earlier patch?



> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
> index d29d3f9..f2052b8 100644
> --- a/src/backend/commands/subscriptioncmds.c
> +++ b/src/backend/commands/subscriptioncmds.c

This sure is a lot of yanking around of previously added code.  At least
some of it looks like it should really have been part of the earlier
commit.


> @@ -327,6 +431,18 @@ DropSubscriptionById(Oid subid)
>  {
>      Relation    rel;
>      HeapTuple    tup;
> +    Datum        datum;
> +    bool        isnull;
> +    char       *subname;
> +    char       *conninfo;
> +    char       *slotname;
> +    RepOriginId    originid;
> +    MemoryContext            tmpctx,
> +                            oldctx;
> +    WalReceiverConnHandle  *wrchandle = NULL;
> +    WalReceiverConnAPI       *wrcapi = NULL;
> +    walrcvconn_init_fn        walrcvconn_init;
> +    LogicalRepWorker       *worker;
>  
>      check_subscription_permissions();
>  
> @@ -337,9 +453,135 @@ DropSubscriptionById(Oid subid)
>      if (!HeapTupleIsValid(tup))
>          elog(ERROR, "cache lookup failed for subscription %u", subid);
>  
> +    /*
> +     * Create temporary memory context to keep copy of subscription
> +     * info needed later in the execution.
> +     */
> +    tmpctx = AllocSetContextCreate(TopMemoryContext,
> +                                          "DropSubscription Ctx",
> +                                          ALLOCSET_DEFAULT_MINSIZE,
> +                                          ALLOCSET_DEFAULT_INITSIZE,
> +                                          ALLOCSET_DEFAULT_MAXSIZE);
> +    oldctx = MemoryContextSwitchTo(tmpctx);
> +
> +    /* Get subname */
> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
> +                            Anum_pg_subscription_subname, &isnull);
> +    Assert(!isnull);
> +    subname = pstrdup(NameStr(*DatumGetName(datum)));
> +
> +    /* Get conninfo */
> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
> +                            Anum_pg_subscription_subconninfo, &isnull);
> +    Assert(!isnull);
> +    conninfo = pstrdup(TextDatumGetCString(datum));
> +
> +    /* Get slotname */
> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
> +                            Anum_pg_subscription_subslotname, &isnull);
> +    Assert(!isnull);
> +    slotname = pstrdup(NameStr(*DatumGetName(datum)));
> +
> +    MemoryContextSwitchTo(oldctx);
> +
> +    /* Remove the tuple from catalog. */
>      simple_heap_delete(rel, &tup->t_self);
>  
> -    ReleaseSysCache(tup);
> +    /* Protect against launcher restarting the worker. */
> +    LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
>  
> -    heap_close(rel, RowExclusiveLock);
> +    /* Kill the apply worker so that the slot becomes accessible. */
> +    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +    worker = logicalrep_worker_find(subid);
> +    if (worker)
> +        logicalrep_worker_stop(worker);
> +    LWLockRelease(LogicalRepWorkerLock);
> +
> +    /* Wait for apply process to die. */
> +    for (;;)
> +    {
> +        int    rc;
> +
> +        CHECK_FOR_INTERRUPTS();
> +
> +        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +        if (logicalrep_worker_count(subid) < 1)
> +        {
> +            LWLockRelease(LogicalRepWorkerLock);
> +            break;
> +        }
> +        LWLockRelease(LogicalRepWorkerLock);
> +
> +        /* Wait for more work. */
> +        rc = WaitLatch(&MyProc->procLatch,
> +                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                       1000L);
> +
> +        /* emergency bailout if postmaster has died */
> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);
> +    }

I'm really far from convinced this is the right layer to perform these
operations.  Previously these routines were low-level catalog
manipulation routines. Now they're certainly not.


> +    /* Remove the origin trakicking. */

Typo: "trakicking" should be "tracking".



> +    /*
> +     * Now that the catalog update is done, try to reserve slot at the
> +     * provider node using replication connection.
> +     */
> +    wrcapi = palloc0(sizeof(WalReceiverConnAPI));
> +
> +    walrcvconn_init = (walrcvconn_init_fn)
> +        load_external_function("libpqwalreceiver",
> +                               "_PG_walreceirver_conn_init", false, NULL);
> +
> +    if (walrcvconn_init == NULL)
> +        elog(ERROR, "libpqwalreceiver does not declare _PG_walreceirver_conn_init symbol");

This does rather reinforce my opinion that the _PG_init removal in
libpqwalreceiver isn't useful.

> diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
> index 699c934..fc998cd 100644
> --- a/src/backend/postmaster/bgworker.c
> +++ b/src/backend/postmaster/bgworker.c
> @@ -93,6 +93,9 @@ struct BackgroundWorkerHandle
>  
>  static BackgroundWorkerArray *BackgroundWorkerData;
>  
> +/* Enables registration of internal background workers. */
> +bool internal_bgworker_registration_in_progress = false;
> +
>  /*
>   * Calculate shared memory needed.
>   */
> @@ -745,7 +748,8 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
>          ereport(DEBUG1,
>           (errmsg("registering background worker \"%s\"", worker->bgw_name)));
>  
> -    if (!process_shared_preload_libraries_in_progress)
> +    if (!process_shared_preload_libraries_in_progress &&
> +        !internal_bgworker_registration_in_progress)
>      {
>          if (!IsUnderPostmaster)
>              ereport(LOG,

Ugh.




>  /*
> + * Register internal background workers.
> + *
> + * This is here mainly because the permanent bgworkers are normally allowed
> + * to be registered only when share preload libraries are loaded which does
> + * not work for the internal ones.
> + */
> +static void
> +register_internal_bgworkers(void)
> +{
> +    internal_bgworker_registration_in_progress = true;
> +
> +    /* Register the logical replication worker launcher if appropriate. */
> +    if (!IsBinaryUpgrade && max_logical_replication_workers > 0)
> +    {
> +        BackgroundWorker bgw;
> +
> +        bgw.bgw_flags =    BGWORKER_SHMEM_ACCESS |
> +            BGWORKER_BACKEND_DATABASE_CONNECTION;
> +        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
> +        bgw.bgw_main = ApplyLauncherMain;
> +        snprintf(bgw.bgw_name, BGW_MAXLEN,
> +                 "logical replication launcher");
> +        bgw.bgw_restart_time = 5;
> +        bgw.bgw_notify_pid = 0;
> +        bgw.bgw_main_arg = (Datum) 0;
> +
> +        RegisterBackgroundWorker(&bgw);
> +    }
> +
> +    internal_bgworker_registration_in_progress = false;
> +}

Who says these flags are right for everyone?  If we indeed want to go
through bgworkers here, I think you'll have to generalize this a bit,
so we don't check for max_logical_replication_workers and such here.  We
could e.g. have the shared memory sizing hooks set up a chain of
registrations.



> -static void
> +static char *
>  libpqrcv_identify_system(WalReceiverConnHandle *handle,
> -                         TimeLineID *primary_tli)
> +                         TimeLineID *primary_tli,
> +                         char **dbname)
>  {
> +    char       *sysid;
>      PGresult   *res;
> -    char       *primary_sysid;
> -    char        standby_sysid[32];
>  
>      /*
>       * Get the system identifier and timeline ID as a DataRow message from the
> @@ -231,24 +234,19 @@ libpqrcv_identify_system(WalReceiverConnHandle *handle,
>                   errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more
fields.",
>                             ntuples, nfields, 3, 1)));
>      }
> -    primary_sysid = PQgetvalue(res, 0, 0);
> +    sysid = pstrdup(PQgetvalue(res, 0, 0));
>      *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
> -
> -    /*
> -     * Confirm that the system identifier of the primary is the same as ours.
> -     */
> -    snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
> -             GetSystemIdentifier());
> -    if (strcmp(primary_sysid, standby_sysid) != 0)
> +    if (dbname)
>      {
> -        primary_sysid = pstrdup(primary_sysid);
> -        PQclear(res);
> -        ereport(ERROR,
> -                (errmsg("database system identifier differs between the primary and standby"),
> -                 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
> -                           primary_sysid, standby_sysid)));
> +        if (PQgetisnull(res, 0, 3))
> +            *dbname = NULL;
> +        else
> +            *dbname = pstrdup(PQgetvalue(res, 0, 3));
>      }
> +
>      PQclear(res);
> +
> +    return sysid;
>  }
>  
>  /*
> @@ -274,7 +272,7 @@ libpqrcv_create_slot(WalReceiverConnHandle *handle, char *slotname,
>  
>      if (PQresultStatus(res) != PGRES_TUPLES_OK)
>      {
> -        elog(FATAL, "could not crate replication slot \"%s\": %s\n",
> +        elog(ERROR, "could not crate replication slot \"%s\": %s\n",
>               slotname, PQerrorMessage(handle->streamConn));
>      }
>  
> @@ -287,6 +285,28 @@ libpqrcv_create_slot(WalReceiverConnHandle *handle, char *slotname,
>      return snapshot;
>  }
>  
> +/*
> + * Drop replication slot.
> + */
> +static void
> +libpqrcv_drop_slot(WalReceiverConnHandle *handle, char *slotname)
> +{
> +    PGresult       *res;
> +    char            cmd[256];
> +
> +    snprintf(cmd, sizeof(cmd),
> +             "DROP_REPLICATION_SLOT \"%s\"", slotname);
> +
> +    res = libpqrcv_PQexec(handle, cmd);
> +
> +    if (PQresultStatus(res) != PGRES_COMMAND_OK)
> +    {
> +        elog(ERROR, "could not drop replication slot \"%s\": %s\n",
> +             slotname, PQerrorMessage(handle->streamConn));
> +    }
> +
> +    PQclear(res);
> +}


Given that the earlier commit to libpqwalreceiver added a lot of this
information, it doesn't seem right to change it again here.



> +typedef struct LogicalRepRelMapEntry {

The opening brace comes too early; it should go on its own line.


Ok, running out of time. See you soon I guess ;)

Andres



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Tom Lane
Дата:
Сообщение: Re: kqueue
Следующее
От: Robert Haas
Дата:
Сообщение: Re: autonomous transactions