Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

From: Dave Cramer
Subject: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk
Msg-id CADK3HH+WcV+YbpQf7ZAC8BsySw90vwS=8cGx-GdBF_=GOeOFcg@mail.gmail.com
In reply to: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk  (Andres Freund <andres@anarazel.de>)
Responses: Re: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk  (David Steele <david@pgmasters.net>)
List: pgsql-hackers
Andres,

Thanks for looking at this. FYI, I did not originally write this patch, and the original author has not replied to requests.
JDBC could use this, and I assume others could as well.

That said, I'm certainly open to suggestions on how to do this.

Craig, do you have any other ideas?

Dave Cramer


On Fri, 15 Feb 2019 at 22:01, Andres Freund <andres@anarazel.de> wrote:
Hi,

On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:
> From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
> From: Dave Cramer <davecramer@gmail.com>
> Date: Fri, 30 Nov 2018 18:23:49 -0500
> Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender
>
> ---
>  src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
>  1 file changed, 30 insertions(+), 6 deletions(-)
>
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 46edb52..93f2648 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
>       sendTimeLineValidUpto = state->currTLIValidUntil;
>       sendTimeLineNextTLI = state->nextTLI;

> +     /*
> +     * If the client sent CopyDone while we were waiting,
> +     * bail out so we can wind up the decoding session.
> +     */
> +     if (streamingDoneSending)
> +             return -1;
> +
> +      /* more than one block available */
>       /* make sure we have enough WAL available */
>       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);

> @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
>                * It's important to do this check after the recomputation of
>                * RecentFlushPtr, so we can send all remaining data before shutting
>                * down.
> -              */
> -             if (got_STOPPING)
> +              *
> +              * We'll also exit here if the client sent CopyDone because it wants
> +              * to return to command mode.
> +             */
> +
> +             if (got_STOPPING || streamingDoneReceiving)
>                       break;

>               /*
> @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
>       }
>  }

> -/* Main loop of walsender process that streams the WAL over Copy messages. */
> +/*
> + * Main loop of walsender process that streams the WAL over Copy messages.
> + *
> + * The send_data callback must enqueue complete CopyData messages to libpq
> + * using pq_putmessage_noblock or similar, since the walsender loop may send
> + * CopyDone then exit and return to command mode in response to a client
> + * CopyDone between calls to send_data.
> + */

Wait, how is it ok to send CopyDone before all the pending data has been
sent out?
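
To make the ordering concern concrete, here is a minimal model of one possible reading: on a single ordered connection, CopyDone ('c') is only safe to queue at a CopyData ('d') message boundary, and no CopyData may follow it. This is a sketch under that assumption, not walsender code; OutBuf, queue_copy_data and queue_copy_done are hypothetical names invented for the illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of a sender's outgoing stream; not walsender code. */
typedef struct OutBuf
{
    char   wire[64];     /* message type bytes in the order they reach the wire */
    size_t len;
    int    mid_message;  /* nonzero while a CopyData message is half-written */
    int    done_sent;    /* nonzero once CopyDone has been queued */
} OutBuf;

static void
outbuf_init(OutBuf *b)
{
    assert(b != NULL);
    memset(b, 0, sizeof(*b));   /* also keeps wire NUL-terminated */
}

/* Queue one complete CopyData message; refused once CopyDone is queued. */
static int
queue_copy_data(OutBuf *b)
{
    if (b->done_sent || b->len + 1 >= sizeof(b->wire))
        return 0;
    b->wire[b->len++] = 'd';
    return 1;
}

/* Queue CopyDone only at a message boundary, and only once. */
static int
queue_copy_done(OutBuf *b)
{
    if (b->mid_message || b->done_sent || b->len + 1 >= sizeof(b->wire))
        return 0;
    b->wire[b->len++] = 'c';
    b->done_sent = 1;
    return 1;
}
```

In this model, anything already queued as a complete message still precedes CopyDone on the wire. What the model cannot show is data that has been decoded but not yet queued, which is the part the question above is asking about.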



> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index 23466ba..66b6e90 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>               rb->begin(rb, txn);

>               iterstate = ReorderBufferIterTXNInit(rb, txn);
> -             while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
> +             while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
> +                        (rb->continue_decoding_cb == NULL ||
> +                             rb->continue_decoding_cb()))
>               {
>                       Relation        relation = NULL;
>                       Oid                     reloid;

> @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>               ReorderBufferIterTXNFinish(rb, iterstate);
>               iterstate = NULL;

> -             /* call commit callback */
> -             rb->commit(rb, txn, commit_lsn);
> +             if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
> +             {
> +                     /* call commit callback */
> +                     rb->commit(rb, txn, commit_lsn);
> +             }


I'm doubtful it's ok to simply stop in the middle of a transaction.
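
A small model of the control flow in the hunk above shows what a consumer would observe. It mirrors only the shape of the patched loop (check the callback before each change, and again before commit). replay_txn, continue_while_budget and the 'B'/'I'/'C' labels are hypothetical and are not part of reorderbuffer.c.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef int (*continue_cb_t)(void);

/* Hypothetical stand-in for "has the client asked us to stop yet?". */
static int continue_budget = 0;

static int
continue_while_budget(void)
{
    if (continue_budget > 0)
    {
        continue_budget--;
        return 1;
    }
    return 0;
}

/*
 * Replay one transaction of nchanges changes into out (NUL-terminated).
 * 'B' = begin, 'I' = one change, 'C' = commit; the letters are labels only.
 */
static void
replay_txn(int nchanges, continue_cb_t cb, char *out, size_t outsz)
{
    size_t n = 0;
    int    i;

    assert(out != NULL && nchanges >= 0 && outsz >= (size_t) nchanges + 3);

    out[n++] = 'B';
    /* Same shape as the patched while condition: stop as soon as cb says so. */
    for (i = 0; i < nchanges && (cb == NULL || cb()); i++)
        out[n++] = 'I';
    /* Commit is emitted only if the callback still allows decoding. */
    if (cb == NULL || cb())
        out[n++] = 'C';
    out[n] = '\0';
}
```

With no callback, a three-change transaction yields "BIIIC". With a callback that stops after two changes, the output is "BII": a begin and a partial set of changes with no commit, which is the state the output plugin and the client are left to deal with.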



> @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,

>       CHECK_FOR_INTERRUPTS();

> -     /* Try to flush pending output to the client */
> -     if (pq_flush_if_writable() != 0)
> -             WalSndShutdown();
> -
> -     /* Try taking fast path unless we get too close to walsender timeout. */
> -     if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> -                                                                               wal_sender_timeout / 2) &&
> -             !pq_is_send_pending())
> -     {
> -             return;
> -     }

As somebody else commented on the thread, I'm also doubtful this is
ok. This'll introduce significant additional blocking unless I'm missing
something?
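
For reference, the early return that the hunk removes reduces to the predicate below. This is a sketch with plain millisecond integers standing in for the timestamp arithmetic; takes_fast_path and its parameters are hypothetical names, and send_pending represents the state after the flush attempt that precedes the check.

```c
#include <assert.h>

/*
 * Returns nonzero when the removed code would have returned early:
 * less than half the timeout has elapsed since the last client reply,
 * and nothing is left waiting in the send buffer.
 */
static int
takes_fast_path(long now_ms, long last_reply_ms, long timeout_ms,
                int send_pending)
{
    return now_ms < last_reply_ms + timeout_ms / 2 && !send_pending;
}
```

With the check gone, a call that would have satisfied this predicate instead falls through to the slow-path loop shown in the surrounding diff context, which appears to be the source of the extra blocking.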



>       /* If we have pending write here, go to slow path */
>       for (;;)
> @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
>                       break;

>               /*
> -              * We only send regular messages to the client for full decoded
> +              * If we have received CopyDone from the client, sent CopyDone
> +              * ourselves, it's time to exit streaming.
> +              */
> +             if (!IsStreamingActive()) {
> +                     break;
> +             }

Wrong formatting.


I wonder if the saner approach here isn't to support query cancellations
or something in that vein, and then handle the error.

Greetings,

Andres Freund
