Re: Handle infinite recursion in logical replication setup

From: vignesh C
Subject: Re: Handle infinite recursion in logical replication setup
Date:
Msg-id CALDaNm0HcoP79FVZp+vmJRoa5i87CKrZv4SBmn+aOf34_Ng-eA@mail.gmail.com
In reply to: Re: Handle infinite recursion in logical replication setup  (Peter Smith <smithpb2250@gmail.com>)
List: pgsql-hackers
On Fri, Jun 3, 2022 at 11:01 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Please find below my review comments for patch v17-0002:
>
> ======
>
> 1. Commit message
>
> This patch adds a new SUBSCRIPTION parameter "origin". It Specifies whether
> the subscription will request the publisher to only send changes that
> originated locally, or to send any changes regardless of origin.
>
> "It Specifies" -> "It specifies"

Modified

> ~~~
>
> 2. Commit message
>
> Usage:
> CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
> PUBLICATION pub1 with (origin = local);
>
> "with" -> "WITH"

Modified

> ======
>
> 3. doc/src/sgml/catalogs.sgml
>
> +      <para>
> +       Possible origin values are <literal>local</literal>,
> +       <literal>any</literal>, or <literal>NULL</literal> if none is specified.
> +       If <literal>local</literal>, the subscription will request the
> +       publisher to only send changes that originated locally. If
> +       <literal>any</literal>, the publisher sends any changes regardless of
> +       their origin.
> +      </para></entry>
>
> Should this also mention that NULL (default) is equivalent to 'any'?
>
> SUGGESTION
> If <literal>any</literal> (or <literal>NULL</literal>), the publisher
> sends any changes regardless of their origin.

Modified

> ======
>
> 4. src/backend/catalog/pg_subscription.c
>
> @@ -72,6 +72,16 @@ GetSubscription(Oid subid, bool missing_ok)
>   sub->twophasestate = subform->subtwophasestate;
>   sub->disableonerr = subform->subdisableonerr;
>
> + datum = SysCacheGetAttr(SUBSCRIPTIONOID,
> + tup,
> + Anum_pg_subscription_suborigin,
> + &isnull);
> +
> + if (!isnull)
> + sub->origin = TextDatumGetCString(datum);
> + else
> + sub->origin = NULL;
> +
>   /* Get conninfo */
>   datum = SysCacheGetAttr(SUBSCRIPTIONOID,
>   tup,
>
> Missing comment like the nearby code has:
> /* Get origin */

Modified

> ======
>
> 5. src/backend/replication/logical/worker.c
>
> +/* Macro for comparing string fields that might be NULL */
> +#define equalstr(a, b) \
> + (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b))
> +
>
> Should that have some extra parens for the macro args?
> e.g. "strcmp((a), (b))"

Modified

> ~~~
>
> 6. src/backend/replication/logical/worker.c - maybe_reread_subscription
>
> @@ -3059,6 +3063,7 @@ maybe_reread_subscription(void)
>   strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
>   newsub->binary != MySubscription->binary ||
>   newsub->stream != MySubscription->stream ||
> + equalstr(newsub->origin, MySubscription->origin) ||
>   newsub->owner != MySubscription->owner ||
>   !equal(newsub->publications, MySubscription->publications))
>   {
>
> Is that right? Shouldn't it say !equalstr(...)?

Modified
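
For illustration, comments 5 and 6 can be combined into one standalone sketch. It is not the patch text: it only shows the macro with each argument parenthesized and the negated form that the change check needs. The helper name origin_changed is hypothetical and does not exist in the patch.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * NULL-safe string equality with every macro argument parenthesized.
 * Two NULLs compare equal; a NULL never equals a non-NULL string.
 */
#define equalstr(a, b) \
	(((a) != NULL && (b) != NULL) ? (strcmp((a), (b)) == 0) : ((a) == (b)))

/*
 * Hypothetical helper (assumption, not in the patch): report whether the
 * origin setting differs between the old and new subscription definitions.
 * Note the negation: a reread is needed when the strings are NOT equal.
 */
static bool
origin_changed(const char *old_origin, const char *new_origin)
{
	return !equalstr(old_origin, new_origin);
}
```

Without the negation, the check would report a change exactly when nothing had changed, which is the problem raised in comment 6.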

> ======
>
> 7. src/backend/replication/pgoutput/pgoutput.c - parse_output_parameters
>
> @@ -380,6 +382,16 @@ parse_output_parameters(List *options, PGOutputData *data)
>
>   data->two_phase = defGetBoolean(defel);
>   }
> + else if (strcmp(defel->defname, "origin") == 0)
> + {
> + if (origin_option_given)
> + ereport(ERROR,
> + (errcode(ERRCODE_SYNTAX_ERROR),
> + errmsg("conflicting or redundant options")));
> + origin_option_given = true;
> +
> + data->origin = defGetString(defel);
> + }
>
> Should this function also be validating that the origin parameter
> value is only permitted to be one of "local" or "any"?

Modified
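
As a rough sketch of the kind of check comment 7 asks for, the value can be tested against the two permitted strings before it is stored. The function name origin_value_is_valid is hypothetical, and this standalone version returns a bool where backend code would presumably raise an error; it does not claim to match what the v18 patch does.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (assumption, not in the patch): accept only the two
 * origin values discussed in this thread, "local" and "any".
 */
static bool
origin_value_is_valid(const char *value)
{
	if (value == NULL)
		return false;

	return strcmp(value, "local") == 0 || strcmp(value, "any") == 0;
}
```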

> ~~~
>
> 8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_origin_filter
>
> @@ -1698,12 +1710,20 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>  }
>
>  /*
> - * Currently we always forward.
> + * Return true if the data source (origin) is remote and user has requested
> + * only local data, false otherwise.
>   */
>  static bool
>  pgoutput_origin_filter(LogicalDecodingContext *ctx,
>      RepOriginId origin_id)
>  {
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +
> + if (data->origin &&
> + (strcmp(data->origin, "local") == 0) &&
> + origin_id != InvalidRepOriginId)
> + return true;
> +
>   return false;
>  }
>
> 8a.
> Could rewrite the condition by putting the strcmp last so you can
> avoid doing unnecessary strcmp.
>
> e.g.
> + if (data->origin &&
> + origin_id != InvalidRepOriginId &&
> + strcmp(data->origin, "local") == 0)

I have avoided the strcmp altogether by caching the parsed value, as
suggested in 8b, so I have not made this particular change.

>
> 8b.
> I also wondered if it might be worth considering caching the origin
> parameter value when it was parsed so that you can avoid doing any
> strcmp at all during this function. Because otherwise this might be
> called millions of times, right?

Modified
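
The caching idea in 8b can be sketched in isolation as follows. This is an assumption-laden illustration, not the patch: the Sketch* names and the only_local field are hypothetical, and SketchOriginId with SKETCH_INVALID_ORIGIN_ID merely stands in for PostgreSQL's RepOriginId and InvalidRepOriginId.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins (assumptions) for RepOriginId and InvalidRepOriginId. */
typedef uint16_t SketchOriginId;
#define SKETCH_INVALID_ORIGIN_ID 0

/* Hypothetical per-plugin state holding the parsed option as a flag. */
typedef struct SketchOutputData
{
	bool		only_local;
} SketchOutputData;

/* Done once, when the option is parsed: the only place strcmp runs. */
static void
sketch_set_origin(SketchOutputData *data, const char *origin)
{
	data->only_local = (origin != NULL && strcmp(origin, "local") == 0);
}

/*
 * Called per change: return true (filter out) when only local data was
 * requested and the change carries a valid, i.e. remote, origin id.
 */
static bool
sketch_origin_filter(const SketchOutputData *data, SketchOriginId origin_id)
{
	return data->only_local && origin_id != SKETCH_INVALID_ORIGIN_ID;
}
```

With the string comparison moved to parse time, the per-change callback reduces to two cheap tests, which matters if it is called millions of times.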

> ======
>
> 9. src/include/catalog/pg_subscription.h
>
> @@ -87,6 +87,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
>
>   /* List of publications subscribed to */
>   text subpublications[1] BKI_FORCE_NOT_NULL;
> +
> + /* Publish the data originated from the specified origin */
> + text suborigin;
>  #endif
>  } FormData_pg_subscription;
>
> SUGGESTION (for the comment text)
> /* Only publish data originating from the specified origin */

Modified

> ~~~
>
> 10. src/include/catalog/pg_subscription.h - Subscription
>
> @@ -118,6 +121,9 @@ typedef struct Subscription
>   char    *slotname; /* Name of the replication slot */
>   char    *synccommit; /* Synchronous commit setting for worker */
>   List    *publications; /* List of publication names to subscribe to */
> + char    *origin; /* Publish the data originated from the
> + * specified origin */
> +
>  } Subscription;
>
> 10a.
> Reword comment same as suggested in review comment #9

Modified

> 10b.
> Remove spurious blank line

Modified

> ======
>
> 11. src/include/replication/walreceiver.h
>
> @@ -183,6 +183,8 @@ typedef struct
>   bool streaming; /* Streaming of large transactions */
>   bool twophase; /* Streaming of two-phase transactions at
>   * prepare time */
> + char    *origin; /* Publish the data originated from the
> + * specified origin */
>   } logical;
>
> Reword comment same as suggested in review comment #9

Modified

> ======
>
> 12. src/test/subscription/t/032_origin.pl
>
> +# Test logical replication using origin option.
>
> # Test the CREATE SUBSCRIPTION 'origin' parameter

Modified

> ~~~
>
> 13. src/test/subscription/t/032_origin.pl
>
> +# check that the data published from node_C to node_B is not sent to node_A
> +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
> +is($result, qq(11
> +12), 'Remote data originating from another node (not the publisher) is not replicated when origin option is local'
> +);
>
> "option" -> "parameter"

Modified

Thanks for the comments. The v18 patch attached at [1] has the fixes
for them.
[1] - https://www.postgresql.org/message-id/CALDaNm0Haovukx2q7Yd987Sm8fbQ0nsh8F0EWaO_qsw0uObGBQ%40mail.gmail.com

Regards,
Vignesh


