Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk

From: Andres Freund
Subject: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk
Msg-id: 20190216030150.yn6bqmcfqc2wenbz@alap3.anarazel.de
In reply to: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk  (Dave Cramer <davecramer@gmail.com>)
Responses: Re: Reviving the "Stopping logical replication protocol" patch from Vladimir Gordichuk  (Dave Cramer <davecramer@gmail.com>)
List: pgsql-hackers
Hi,

On 2018-12-03 06:38:43 -0500, Dave Cramer wrote:
> From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001
> From: Dave Cramer <davecramer@gmail.com>
> Date: Fri, 30 Nov 2018 18:23:49 -0500
> Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender
> 
> ---
>  src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------
>  1 file changed, 30 insertions(+), 6 deletions(-)
> 
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 46edb52..93f2648 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
>      sendTimeLineValidUpto = state->currTLIValidUntil;
>      sendTimeLineNextTLI = state->nextTLI;
>  
> +    /*
> +    * If the client sent CopyDone while we were waiting,
> +    * bail out so we can wind up the decoding session.
> +    */
> +    if (streamingDoneSending)
> +        return -1;
> +
> +     /* more than one block available */
>      /* make sure we have enough WAL available */
>      flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
>  
> @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc)
>           * It's important to do this check after the recomputation of
>           * RecentFlushPtr, so we can send all remaining data before shutting
>           * down.
> -         */
> -        if (got_STOPPING)
> +         *
> +         * We'll also exit here if the client sent CopyDone because it wants
> +         * to return to command mode.
> +        */
> +
> +        if (got_STOPPING || streamingDoneReceiving)
>              break;
>  
>          /*
> @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void)
>      }
>  }
>  
> -/* Main loop of walsender process that streams the WAL over Copy messages. */
> +/*
> + * Main loop of walsender process that streams the WAL over Copy messages.
> + *
> + * The send_data callback must enqueue complete CopyData messages to libpq
> + * using pq_putmessage_noblock or similar, since the walsender loop may send
> + * CopyDone then exit and return to command mode in response to a client
> + * CopyDone between calls to send_data.
> + */

Wait, how is it ok to send CopyDone before all the pending data has been
sent out?
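
One way to picture the ordering concern is the toy model below. It is only a sketch under stated assumptions: `SenderState`, `flush_some` and `maybe_send_copy_done` are hypothetical names invented for illustration and are not walsender code. The model replies with CopyDone only after the client has asked to stop and every queued byte has been written out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the sender's output state; not PostgreSQL code. */
typedef struct SenderState
{
    size_t      pending_bytes;      /* queued CopyData not yet written out */
    bool        client_sent_done;   /* the client's CopyDone has arrived */
    bool        server_sent_done;   /* our own CopyDone has been sent */
} SenderState;

/* Pretend to write up to max_bytes of queued output to the socket. */
static void
flush_some(SenderState *s, size_t max_bytes)
{
    size_t      n = s->pending_bytes < max_bytes ? s->pending_bytes : max_bytes;

    s->pending_bytes -= n;
}

/*
 * Send CopyDone only when the client asked for it and nothing is queued.
 * Returns true once CopyDone has been sent.
 */
static bool
maybe_send_copy_done(SenderState *s)
{
    assert(s != NULL);

    if (s->server_sent_done)
        return true;
    if (!s->client_sent_done || s->pending_bytes > 0)
        return false;           /* keep flushing first */

    s->server_sent_done = true;
    return true;
}
```

In this model a caller keeps alternating `flush_some` and `maybe_send_copy_done` until the latter returns true, so CopyDone can never overtake data that was already queued.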



> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index 23466ba..66b6e90 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>          rb->begin(rb, txn);
>  
>          iterstate = ReorderBufferIterTXNInit(rb, txn);
> -        while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
> +        while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL &&
> +               (rb->continue_decoding_cb == NULL ||
> +                rb->continue_decoding_cb()))
>          {
>              Relation    relation = NULL;
>              Oid            reloid;

> @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>          ReorderBufferIterTXNFinish(rb, iterstate);
>          iterstate = NULL;
>  
> -        /* call commit callback */
> -        rb->commit(rb, txn, commit_lsn);
> +        if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb())
> +        {
> +            /* call commit callback */
> +            rb->commit(rb, txn, commit_lsn);
> +        }


I'm doubtful it's ok to simply stop in the middle of a transaction.



> @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
>  
>      CHECK_FOR_INTERRUPTS();
>  
> -    /* Try to flush pending output to the client */
> -    if (pq_flush_if_writable() != 0)
> -        WalSndShutdown();
> -
> -    /* Try taking fast path unless we get too close to walsender timeout. */
> -    if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
> -                                          wal_sender_timeout / 2) &&
> -        !pq_is_send_pending())
> -    {
> -        return;
> -    }

As somebody else commented on the thread, I'm also doubtful this is
ok. This'll introduce significant additional blocking unless I'm missing
something?
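
For reference, the condition that the removed lines tested can be restated as a standalone predicate. This is a simplified sketch, not PostgreSQL code: `millis_t` and `can_take_fast_path` are hypothetical names, and plain millisecond integers stand in for `TimestampTz`. It only mirrors the shape of the deleted check: return early when less than half the timeout has elapsed since the last client reply and no output is pending.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in: plain milliseconds instead of TimestampTz. */
typedef int64_t millis_t;

/*
 * Mirrors the removed fast-path test: true when we are still in the first
 * half of the timeout window and there is no unsent output.
 */
static bool
can_take_fast_path(millis_t now, millis_t last_reply,
                   millis_t timeout_ms, bool send_pending)
{
    assert(timeout_ms >= 0);

    return now < last_reply + timeout_ms / 2 && !send_pending;
}
```

Without such an early return, every call falls through to the slow-path loop that follows, which is where the additional blocking would come from.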



>      /* If we have pending write here, go to slow path */
>      for (;;)
> @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc)
>              break;
>  
>          /*
> -         * We only send regular messages to the client for full decoded
> +         * If we have received CopyDone from the client, sent CopyDone
> +         * ourselves, it's time to exit streaming.
> +         */
> +        if (!IsStreamingActive()) {
> +            break;
> +        }

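Wrong formatting: PostgreSQL style puts the opening brace on its own line,
and a single-statement body does not need braces.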


I wonder if the saner approach here isn't to support query cancellations
or something in that vein, and then handle the error.

Greetings,

Andres Freund

