Re: [HACKERS] Logical Replication WIP

From Andres Freund
Subject Re: [HACKERS] Logical Replication WIP
Msg-id 20161213014155.pv5xkw7rerat5paq@alap3.anarazel.de
In reply to Re: [HACKERS] Logical Replication WIP  (Petr Jelinek <petr.jelinek@2ndquadrant.com>)
List pgsql-hackers
On 2016-12-10 08:48:55 +0100, Petr Jelinek wrote:

> diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
> new file mode 100644
> index 0000000..e3560b7
> --- /dev/null
> +++ b/src/backend/catalog/pg_publication.c
> +
> +Datum pg_get_publication_tables(PG_FUNCTION_ARGS);

Don't we usually put these in a header?

> +/*
> + * Insert new publication / relation mapping.
> + */
> +ObjectAddress
> +publication_add_relation(Oid pubid, Relation targetrel,
> +                         bool if_not_exists)
> +{
> +    Relation    rel;
> +    HeapTuple    tup;
> +    Datum        values[Natts_pg_publication_rel];
> +    bool        nulls[Natts_pg_publication_rel];
> +    Oid            relid = RelationGetRelid(targetrel);
> +    Oid            prrelid;
> +    Publication *pub = GetPublication(pubid);
> +    ObjectAddress    myself,
> +                    referenced;
> +
> +    rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
> +
> +    /* Check for duplicates */

Maybe mention that that check is racy, but a unique index protects
against the race?


> +    /* Insert tuple into catalog. */
> +    prrelid = simple_heap_insert(rel, tup);
> +    CatalogUpdateIndexes(rel, tup);
> +    heap_freetuple(tup);
> +
> +    ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
> +
> +    /* Add dependency on the publication */
> +    ObjectAddressSet(referenced, PublicationRelationId, pubid);
> +    recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
> +
> +    /* Add dependency on the relation */
> +    ObjectAddressSet(referenced, RelationRelationId, relid);
> +    recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
> +
> +    /* Close the table. */
> +    heap_close(rel, RowExclusiveLock);

I'm not quite sure about the policy, but shouldn't we call
InvokeObjectPostCreateHook etc. here?


> +/*
> + * Gets list of relation oids for a publication.
> + *
> + * This should only be used for normal publications, the FOR ALL TABLES
> + * should use GetAllTablesPublicationRelations().
> + */
> +List *
> +GetPublicationRelations(Oid pubid)
> +{
> +    List           *result;
> +    Relation        pubrelsrel;
> +    ScanKeyData        scankey;
> +    SysScanDesc        scan;
> +    HeapTuple        tup;
> +
> +    /* Find all publications associated with the relation. */
> +    pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
> +
> +    ScanKeyInit(&scankey,
> +                Anum_pg_publication_rel_prpubid,
> +                BTEqualStrategyNumber, F_OIDEQ,
> +                ObjectIdGetDatum(pubid));
> +
> +    scan = systable_beginscan(pubrelsrel, PublicationRelMapIndexId, true,
> +                              NULL, 1, &scankey);
> +
> +    result = NIL;
> +    while (HeapTupleIsValid(tup = systable_getnext(scan)))
> +    {
> +        Form_pg_publication_rel        pubrel;
> +
> +        pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
> +
> +        result = lappend_oid(result, pubrel->prrelid);
> +    }
> +
> +    systable_endscan(scan);
> +    heap_close(pubrelsrel, NoLock);

In other parts of this patch you drop the lock at close, but not here?


> +    heap_close(rel, NoLock);
> +
> +    return result;
> +}

The lock isn't dropped here either.


> +/*
> + * Gets list of all relation published by FOR ALL TABLES publication(s).
> + */
> +List *
> +GetAllTablesPublicationRelations(void)
> +{
> +    Relation    classRel;
> +    ScanKeyData key[1];
> +    HeapScanDesc scan;
> +    HeapTuple    tuple;
> +    List       *result = NIL;
> +
> +    classRel = heap_open(RelationRelationId, AccessShareLock);

> +    heap_endscan(scan);
> +    heap_close(classRel, AccessShareLock);
> +
> +    return result;
> +}

But here the lock is dropped.


Btw, why are matviews not publishable?

> +/*
> + * Get Publication using name.
> + */
> +Publication *
> +GetPublicationByName(const char *pubname, bool missing_ok)
> +{
> +    Oid            oid;
> +
> +    oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
> +    if (!OidIsValid(oid))
> +    {
> +        if (missing_ok)
> +            return NULL;
> +
> +        ereport(ERROR,
> +                (errcode(ERRCODE_UNDEFINED_OBJECT),
> +                 errmsg("publication \"%s\" does not exist", pubname)));
> +    }
> +
> +    return GetPublication(oid);
> +}

That's racy... Also, shouldn't we specify how to deal with the
returned memory for functions returning Publication *?



> diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
> new file mode 100644
> index 0000000..954b2bd
> --- /dev/null
> +++ b/src/backend/commands/publicationcmds.c
> @@ -0,0 +1,613 @@

> +/*
> + * Create new publication.
> + */
> +ObjectAddress
> +CreatePublication(CreatePublicationStmt *stmt)
> +{
> +    Relation    rel;

> +
> +    values[Anum_pg_publication_puballtables - 1] =
> +        BoolGetDatum(stmt->for_all_tables);
> +    values[Anum_pg_publication_pubinsert - 1] =
> +        BoolGetDatum(publish_insert);
> +    values[Anum_pg_publication_pubupdate - 1] =
> +        BoolGetDatum(publish_update);
> +    values[Anum_pg_publication_pubdelete - 1] =
> +        BoolGetDatum(publish_delete);

I remain convinced that a different representation would be
better. There'll be more options over time (truncate, DDL at least).
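
To illustrate what a different representation could look like, here is a
minimal standalone sketch using a single bitmask instead of one boolean
per action. All names (PUB_ACTION_*, pubactions_union, pubactions_has) are
hypothetical and not part of the patch; the TRUNCATE bit is only there to
show how a later action could be added without a new column.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical flag bits; none of these names exist in the patch. */
typedef enum PubAction
{
    PUB_ACTION_INSERT   = 1 << 0,
    PUB_ACTION_UPDATE   = 1 << 1,
    PUB_ACTION_DELETE   = 1 << 2,
    PUB_ACTION_TRUNCATE = 1 << 3    /* a later addition needs no new column */
} PubAction;

#define PUB_ACTION_ALL \
    (PUB_ACTION_INSERT | PUB_ACTION_UPDATE | \
     PUB_ACTION_DELETE | PUB_ACTION_TRUNCATE)

/* Union of the actions published by two publications. */
static inline uint32_t
pubactions_union(uint32_t a, uint32_t b)
{
    return a | b;
}

/* True if the given single action is contained in the set. */
static inline bool
pubactions_has(uint32_t actions, PubAction action)
{
    assert(action != 0);
    return (actions & (uint32_t) action) != 0;
}
```

With something like this, the "everything is replicated" early exit in
GetRelationPublicationActions() would become a single comparison against
PUB_ACTION_ALL.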


> +static void
> +AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
> +                       HeapTuple tup)
> +{
> +    bool        publish_insert_given;
> +    bool        publish_update_given;
> +    bool        publish_delete_given;
> +    bool        publish_insert;
> +    bool        publish_update;
> +    bool        publish_delete;
> +    ObjectAddress        obj;
> +
> +    parse_publication_options(stmt->options,
> +                              &publish_insert_given, &publish_insert,
> +                              &publish_update_given, &publish_update,
> +                              &publish_delete_given, &publish_delete);

You could pass it a struct instead...
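
Roughly what I mean, as a standalone sketch rather than a drop-in
replacement: the six output pointers collapse into one struct. The names
PublicationOptions, OptionSketch and parse_publication_options_sketch are
hypothetical, the option names "insert"/"update"/"delete" are placeholders,
and a plain array stands in for the List of DefElem nodes so that this
compiles outside the tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical: one "given" flag and one value per option. */
typedef struct PublicationOptions
{
    bool        publish_insert_given;
    bool        publish_insert;
    bool        publish_update_given;
    bool        publish_update;
    bool        publish_delete_given;
    bool        publish_delete;
} PublicationOptions;

/* Stand-in for a DefElem: option name plus boolean value. */
typedef struct OptionSketch
{
    const char *name;
    bool        value;
} OptionSketch;

/*
 * Fill *opts from the option array.  Returns false on an unknown or
 * repeated option; real backend code would ereport(ERROR) instead.
 */
static bool
parse_publication_options_sketch(const OptionSketch *options, size_t noptions,
                                 PublicationOptions *opts)
{
    assert(opts != NULL);
    memset(opts, 0, sizeof(*opts));

    for (size_t i = 0; i < noptions; i++)
    {
        bool       *given;
        bool       *value;

        if (strcmp(options[i].name, "insert") == 0)
        {
            given = &opts->publish_insert_given;
            value = &opts->publish_insert;
        }
        else if (strcmp(options[i].name, "update") == 0)
        {
            given = &opts->publish_update_given;
            value = &opts->publish_update;
        }
        else if (strcmp(options[i].name, "delete") == 0)
        {
            given = &opts->publish_delete_given;
            value = &opts->publish_delete;
        }
        else
            return false;       /* unknown option */

        if (*given)
            return false;       /* option specified twice */

        *given = true;
        *value = options[i].value;
    }

    return true;
}
```

Adding a new option then only touches the struct and the parser, not every
caller's argument list.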


> +static List *
> +OpenTableList(List *tables)
> +{
> +    List       *relids = NIL;
> +    List       *rels = NIL;
> +    ListCell   *lc;
> +
> +    /*
> +     * Open, share-lock, and check all the explicitly-specified relations
> +     */
> +    foreach(lc, tables)
> +    {
> +        RangeVar   *rv = lfirst(lc);
> +        Relation    rel;
> +        bool        recurse = interpretInhOption(rv->inhOpt);
> +        Oid            myrelid;
> +
> +        rel = heap_openrv(rv, ShareUpdateExclusiveLock);
> +        myrelid = RelationGetRelid(rel);
> +        /* filter out duplicates when user specifies "foo, foo" */
> +        if (list_member_oid(relids, myrelid))
> +        {
> +            heap_close(rel, ShareUpdateExclusiveLock);
> +            continue;
> +        }

This is a quadratic algorithm - that could bite us... Not sure if we
need to care.  If we want to fix it, one approach would be to use
RangeVarGetRelid() instead, and then do a qsort/deduplicate before
actually opening the relations.
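
The sort/deduplicate step could look roughly like the standalone sketch
below. It assumes the names have already been resolved to OIDs (which is
what RangeVarGetRelid() would provide); the Oid typedef is a local
stand-in, and oid_cmp_sketch and sort_and_dedupe_oids are hypothetical
names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef unsigned int Oid;       /* local stand-in for PostgreSQL's Oid */

/* qsort comparator for Oids; avoids overflow from plain subtraction. */
static int
oid_cmp_sketch(const void *a, const void *b)
{
    Oid         oa = *(const Oid *) a;
    Oid         ob = *(const Oid *) b;

    return (oa > ob) - (oa < ob);
}

/*
 * Sort the array in place and squeeze out duplicates.
 * Returns the number of distinct OIDs, which end up at the front.
 */
static size_t
sort_and_dedupe_oids(Oid *oids, size_t n)
{
    size_t      out;

    assert(oids != NULL || n == 0);

    if (n < 2)
        return n;

    qsort(oids, n, sizeof(Oid), oid_cmp_sketch);

    out = 1;
    for (size_t i = 1; i < n; i++)
    {
        /* After sorting, duplicates are adjacent. */
        if (oids[i] != oids[out - 1])
            oids[out++] = oids[i];
    }

    return out;
}
```

That is O(n log n) instead of O(n^2), at the price of opening the
relations in OID order rather than in the order the user wrote them.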

>  
> -def_elem:    ColLabel '=' def_arg
> +def_elem:    def_key '=' def_arg
>                  {
>                      $$ = makeDefElem($1, (Node *) $3, @1);
>                  }
> -            | ColLabel
> +            | def_key
>                  {
>                      $$ = makeDefElem($1, NULL, @1);
>                  }
>          ;

> +def_key:
> +            ColLabel                        { $$ = $1; }
> +            | ColLabel ColLabel                { $$ = psprintf("%s %s", $1, $2); }
> +        ;
> +

Not quite sure what this is about?  Doesn't that change the accepted
syntax in a bunch of places?


> @@ -2337,6 +2338,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
>      bms_free(relation->rd_indexattr);
>      bms_free(relation->rd_keyattr);
>      bms_free(relation->rd_idattr);
> +    if (relation->rd_pubactions)
> +        pfree(relation->rd_pubactions);
>      if (relation->rd_options)
>          pfree(relation->rd_options);
>      if (relation->rd_indextuple)
> @@ -4992,6 +4995,67 @@ RelationGetExclusionInfo(Relation indexRelation,
>      MemoryContextSwitchTo(oldcxt);
>  }
>  
> +/*
> + * Get publication actions for the given relation.
> + */
> +struct PublicationActions *
> +GetRelationPublicationActions(Relation relation)
> +{
> +    List       *puboids;
> +    ListCell   *lc;
> +    MemoryContext        oldcxt;
> +    PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
> +
> +    if (relation->rd_pubactions)
> +        return memcpy(pubactions, relation->rd_pubactions,
> +                      sizeof(PublicationActions));
> +
> +    /* Fetch the publication membership info. */
> +    puboids = GetRelationPublications(RelationGetRelid(relation));
> +    puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
> +
> +    foreach(lc, puboids)
> +    {
> +        Oid            pubid = lfirst_oid(lc);
> +        HeapTuple    tup;
> +        Form_pg_publication pubform;
> +
> +        tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
> +
> +        if (!HeapTupleIsValid(tup))
> +            elog(ERROR, "cache lookup failed for publication %u", pubid);
> +
> +        pubform = (Form_pg_publication) GETSTRUCT(tup);
> +
> +        pubactions->pubinsert |= pubform->pubinsert;
> +        pubactions->pubupdate |= pubform->pubupdate;
> +        pubactions->pubdelete |= pubform->pubdelete;
> +
> +        ReleaseSysCache(tup);
> +
> +        /*
> +         * If we know everything is replicated, there is no point to check
> +         * for other publications.
> +         */
> +        if (pubactions->pubinsert && pubactions->pubupdate &&
> +            pubactions->pubdelete)
> +            break;
> +    }
> +
> +    if (relation->rd_pubactions)
> +    {
> +        pfree(relation->rd_pubactions);
> +        relation->rd_pubactions = NULL;
> +    }
> +
> +    /* Now save copy of the actions in the relcache entry. */
> +    oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
> +    relation->rd_pubactions = palloc(sizeof(PublicationActions));
> +    memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
> +    MemoryContextSwitchTo(oldcxt);
> +
> +    return pubactions;
> +}


Hm. Do we actually have enough cache invalidation support to make this
cached version correct?  I haven't seen anything in that regard. It seems
to mean that all changes to an ALL TABLES publication need to do a
global relcache invalidation?

- Andres


